# Source code for lhpcdt.lrms

#!/bin/env python
#
# LUNARC HPC Desktop On-Demand graphical launch tool
# Copyright (C) 2017-2023 LUNARC, Lund University
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Base classes for interacting with resource management systems."""

import os
import subprocess
import time
import datetime
import getpass
import sys

from subprocess import Popen, PIPE, STDOUT

from . import hostlist
from . import config
from . import jobs


def execute_cmd(cmd):
    """Run *cmd* through the shell and return everything it printed.

    Parameters
    ----------
    cmd : str
        Command line passed to the shell (``shell=True``). Callers must not
        interpolate untrusted text into it.

    Returns
    -------
    str
        The command's stdout with stderr merged into it, decoded as text.
        The exit status of the command is ignored.
    """
    # subprocess.run waits for the child and closes the pipe itself; the
    # previous Popen/read/wait sequence left the stdout handle open.
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    return completed.stdout
class GrantFile:
    """Class for accessing LUNARC grantfiles.

    A grantfile is a comma separated text file. Lines starting with ``#`` are
    comments. Each data line has exactly six fields::

        name,start_date,end_date,core_hours,partition,pi#user1 user2 ...

    Dates use the ``YYYYMMDD`` format. The last field holds the PI, a ``#``
    separator and a whitespace separated list of user names.
    """

    def __init__(self, filename):
        """Class constructor.

        Parameters
        ----------
        filename : str
            Path of the grantfile. It is parsed immediately.
        """
        self.filename = filename
        self.verbose = True
        self._parse_grantfile()

    def _parse_grantfile(self):
        """Parse the grantfile into ``self.projects``.

        ``self.projects`` maps a project name to a dict with the keys
        ``start_date`` and ``end_date`` (raw ``YYYYMMDD`` strings),
        ``core_hours`` (int), ``partition``, ``pi`` and ``users`` (list of
        str). Comment lines and lines without exactly six fields are skipped.
        """
        with open(self.filename, "r") as f:
            lines = f.readlines()

        self.projects = {}

        for line in lines:
            if line.startswith("#"):
                continue
            items = line.split(",")
            if len(items) != 6:
                continue

            # partition() instead of split("#")[1]: a field without a "#"
            # separator now yields an empty user list instead of IndexError.
            pi, _, users = items[5].partition("#")

            self.projects[items[0]] = {
                "start_date": items[1],
                "end_date": items[2],
                "core_hours": int(items[3]),
                "partition": items[4],
                # strip() drops the trailing newline when there is no "#".
                "pi": pi.strip(),
                "users": users.split(),
            }

    def query_active_projects(self, user):
        """Return the active projects that list *user* as a member.

        A project is active when the current local time lies strictly between
        its start and end dates. Both dates are parsed as midnight of the
        given day. Progress is printed when ``self.verbose`` is true.

        Parameters
        ----------
        user : str
            User name to look up.

        Returns
        -------
        list of str
            Names of the active projects, in grantfile order.
        """
        active_projects = []

        for project, info in self.projects.items():
            if user not in info["users"]:
                continue

            if self.verbose:
                print("Found user %s in project %s in grantfile %s" %
                      (user, project, self.filename))

            start_date = datetime.datetime.strptime(info["start_date"], "%Y%m%d")
            end_date = datetime.datetime.strptime(info["end_date"], "%Y%m%d")
            current_date = datetime.datetime.today()

            if self.verbose:
                print("Project lifetime: %s-%s" % (start_date, end_date))

            if start_date < current_date < end_date:
                if self.verbose:
                    print("Project is ACTIVE")
                active_projects.append(project)
            elif self.verbose:
                # Also reached for projects that have not started yet.
                print("Project is EXPIRED")

        return active_projects
class Queue(object):
    """Class for encapsulating a SLURM queue.

    Example of the queue information this class collects::

        JOBID PARTITION     NAME   USER   STATE       TIME  TIMELIMIT NODES NODELIST(REASON)
        2981700     brand 5cpu.scr kurs16 RUNNING 2-02:39:48 6-00:00:00     1 an225
    """

    def __init__(self):
        """Create an empty queue; call :meth:`update` to populate it."""
        # Field names, in the same order as the fields of squeueFormat.
        self.squeueParams = ["jobid", "partition", "name", "user", "state",
                             "time", "timelimit", "nodes", "nodelist",
                             "timeleft", "deps", "account", "cpus",
                             "features", "timestart"]

        # squeue --format string; ";" separates the fields so they can be split.
        self.squeueFormat = "%.7i;%.9P;%.20j;%.12u;%.8T;%.10M;%.9l;%.6D;%R;%L;%E;%14a;%4C;%.12f;%S"

        self.jobList = []
        self.jobs = {}
        self.userJobs = {}
        self.running_jobs = {}
        self.pending_jobs = {}
        self.max_nodes = -1
        self.max_cpus = -1

    def job_info(self, jobid):
        """Return the raw ``scontrol show job`` output for *jobid*."""
        return execute_cmd('scontrol show job %s' % jobid)

    def update(self):
        """Refresh the queue state from ``squeue``.

        Rebuilds these attributes:

        * ``jobs``: jobid -> field dict.
        * ``jobList``: list of field dicts, each including ``jobid``.
        * ``userJobs``: user -> jobid -> field dict.
        * ``running_jobs`` / ``pending_jobs``: jobid -> field dict.
        * ``max_nodes`` / ``max_cpus``: maxima over all listed jobs.

        Lines with fewer fields than ``squeueParams`` are skipped.
        """
        output = execute_cmd(
            'squeue --noheader --format="%s"' % self.squeueFormat)

        self.max_nodes = -1
        self.max_cpus = -1
        self.jobs = {}
        self.jobList = []
        self.userJobs = {}
        # Previously never reset, so finished jobs stayed listed forever.
        self.running_jobs = {}
        self.pending_jobs = {}

        for line in output.split("\n"):
            parts = line.split(";")
            # Skips blank lines, error messages and truncated rows, which
            # previously raised IndexError.
            if len(parts) < len(self.squeueParams):
                continue

            job_id = parts[0].strip()
            fields = {name: value.strip()
                      for name, value in zip(self.squeueParams[1:], parts[1:])}

            self.jobs.setdefault(job_id, {}).update(fields)
            job = {"jobid": job_id, **fields}

            if job["state"] == "RUNNING":
                self.running_jobs[job_id] = job
            elif job["state"] == "PENDING":
                self.pending_jobs[job_id] = job

            self.max_nodes = max(self.max_nodes, int(job["nodes"]))
            self.max_cpus = max(self.max_cpus, int(job["cpus"]))

            self.jobList.append(job)

            user = self.jobs[job_id]["user"]
            self.userJobs.setdefault(user, {})[job_id] = self.jobs[job_id]
class Slurm(object):
    """SLURM interface class.

    Wraps the SLURM command line tools ``sinfo``, ``scontrol``, ``squeue``,
    ``sbatch`` and ``scancel``, which are invoked through the shell.
    """

    def __init__(self):
        """Slurm constructor."""
        self.partitions = []   # partition names found by query_partitions()
        self.node_lists = {}   # partition name -> list of expanded node names
        self.verbose = True

    def __include_part(self, part, exclude_set):
        """Return False if any pattern in *exclude_set* is a substring of *part*."""
        return not any(pattern in part for pattern in exclude_set)

    def query_partitions(self, exclude_set=frozenset()):
        """Query partitions in slurm.

        Parses the default ``sinfo`` output, skipping the header line. The
        partition name is read from column 1 and the node list from column 6.
        A trailing ``*`` (the default-partition marker) is removed from
        names. Partitions whose name contains any pattern in *exclude_set*
        are ignored.

        Fills ``self.partitions`` with unique names (order not preserved) and
        ``self.node_lists`` with partition -> expanded node names.
        """
        p = Popen("sinfo", stdout=PIPE, stderr=PIPE,
                  shell=True, universal_newlines=True)
        sinfo_lines = p.communicate()[0].split("\n")

        self.partitions = []
        self.node_lists = {}

        for line in sinfo_lines[1:]:
            columns = line.split()
            # Blank or truncated lines previously raised IndexError.
            if len(columns) < 6:
                continue

            part_name = columns[0].strip()
            node_list = columns[5]
            if part_name.endswith("*"):
                part_name = part_name[:-1]

            if not self.__include_part(part_name, exclude_set):
                continue

            self.partitions.append(part_name)
            # A partition can appear on several sinfo lines (one per node
            # state), so its node lists are concatenated.
            self.node_lists.setdefault(part_name, []).extend(
                hostlist.expand_hostlist(node_list))

        self.partitions = list(set(self.partitions))

    @staticmethod
    def _key_value_pairs(line):
        """Yield ``(name, value)`` for every ``Name=Value`` token in *line*.

        Tokens are separated by single spaces; tokens without ``=`` are
        ignored. Only the first ``=`` separates name from value, so values
        such as ``CfgTRES=cpu=16,mem=94000M`` are kept intact (they were
        previously truncated at the second ``=``).
        """
        for token in line.strip().split(" "):
            if "=" in token:
                name, value = token.split("=", 1)
                yield name, value

    def query_node(self, node):
        """Query information on a single node.

        Runs ``scontrol show node <node>`` and returns a dict mapping each
        field name to its value (as str). Example output::

            NodeName=eg24 Arch=x86_64 CoresPerSocket=8
               CPUAlloc=0 CPUErr=0 CPUTot=16 CPULoad=0.01
               AvailableFeatures=rack-f1,kepler,mem96GB,gpu8k20
               ActiveFeatures=rack-f1,kepler,mem96GB,gpu8k20
               Gres=gpu:k20:6
               NodeAddr=eg24 NodeHostName=eg24 Version=17.02
               RealMemory=94000 AllocMem=0 FreeMem=94163 Sockets=2 Boards=1
               State=IDLE ThreadsPerCore=1 TmpDisk=0 Weight=1
               Partitions=lvis
               CfgTRES=cpu=16,mem=94000M
        """
        p = Popen("scontrol show node %s" % node, stdout=PIPE, stderr=PIPE,
                  shell=True, universal_newlines=True)
        output = p.communicate()[0]

        node_dict = {}
        for line in output.split("\n"):
            node_dict.update(self._key_value_pairs(line))
        return node_dict

    def query_nodes(self):
        """Query information on all nodes.

        Runs ``scontrol show nodes -o`` and returns a dict mapping node name
        to a dict of that node's other ``Name=Value`` fields. Fields seen
        before the first ``NodeName`` are ignored.
        """
        p = Popen("scontrol show nodes -o", stdout=PIPE, stderr=PIPE,
                  shell=True, universal_newlines=True)
        output = p.communicate()[0]

        node_dict = {}
        current_node = None
        for line in output.split("\n"):
            for name, value in self._key_value_pairs(line):
                if name == "NodeName":
                    current_node = {}
                    node_dict[value] = current_node
                elif current_node is not None:
                    current_node[name] = value
        return node_dict

    def __include_feature(self, feature, exclude_set):
        """Return False if any pattern in *exclude_set* is a substring of *feature*."""
        return not any(pattern in feature for pattern in exclude_set)

    def query_features(self, part, exclude_set=frozenset()):
        """Query the active features of nodes in partition *part*.

        A node counts for *part* when *part* is one of the comma separated
        names in its ``Partitions`` field. Features containing any pattern in
        *exclude_set* are dropped.

        Returns
        -------
        list of str
            Unique feature names (order not preserved).
        """
        if self.verbose:
            # The partition name was previously never substituted for "%s".
            print("Please wait, querying nodes for features %s ..." % part)

        node_info = self.query_nodes()

        feature_list = []
        for info in node_info.values():
            partitions = info.get("Partitions")
            # Nodes may belong to several partitions ("Partitions=a,b").
            if partitions is None or part not in partitions.split(","):
                continue
            for feature in info.get("ActiveFeatures", "").split(","):
                if feature and self.__include_feature(feature, exclude_set):
                    feature_list.append(feature)

        if self.verbose:
            print("Done.")

        return list(set(feature_list))

    def query_gres(self, part):
        """Query the generic resources (GRES) of nodes in partition *part*.

        Requires :meth:`query_partitions` to have filled ``self.node_lists``;
        raises KeyError for a partition that is not listed there.

        Returns
        -------
        list of str
            Unique ``Gres`` entries of the partition's nodes (order not
            preserved).
        """
        gres_list = []
        for node in self.node_lists[part]:
            node_info = self.query_node(node)
            gres_list.extend(
                g for g in node_info.get("Gres", "").split(",") if g)
        return list(set(gres_list))

    def submit(self, job):
        """Submit *job* to SLURM with ``sbatch``.

        ``job.script`` is fed to ``sbatch`` on stdin. In debug mode the script
        is also written to ``$HOME/gfxjob.sh``. Note that the working
        directory of the whole process is changed to ``$HOME`` so that the
        job is submitted from there.

        Returns
        -------
        bool
            True with ``job.id`` set to the new job id on success. Otherwise
            ``job.id`` is set to -1 and False is returned.
        """
        cfg = config.GfxConfig.create()
        home_dir = os.getenv("HOME")

        if cfg.debug_mode:
            debug_script_filename = os.path.join(home_dir, "gfxjob.sh")
            with open(debug_script_filename, "w") as submit_script:
                submit_script.write(job.script)

        # Submit from user home dir.
        os.chdir(home_dir)

        p = Popen("sbatch", stdout=PIPE, stdin=PIPE, stderr=PIPE,
                  shell=True, universal_newlines=True)
        sbatch_output = p.communicate(input=job.script)[0].strip()

        # Expected form: "Submitted batch job <id>".
        if "Submitted batch" in sbatch_output:
            job.id = int(sbatch_output.split()[3])
            return True

        job.id = -1
        return False

    def job_status(self, job):
        """Refresh status fields of *job* from ``squeue``.

        Sets ``job.status`` (squeue compact state, e.g. ``PD`` or ``R``),
        ``job.nodes``, ``job.timeLeft``, ``job.timeRunning`` and
        ``job.timeLimit``. All of them are set to "" when the job is not
        pending or running.
        """
        p = Popen("squeue -j " + str(job.id) + " -t PD,R -h -o '%t;%N;%L;%M;%l'",
                  stdout=PIPE, stderr=PIPE, shell=True, universal_newlines=True)
        fields = p.communicate()[0].strip().split(";")

        if len(fields) >= 5:
            job.status = fields[0]
            job.nodes = fields[1]
            job.timeLeft = fields[2]
            job.timeRunning = fields[3]
            job.timeLimit = fields[4]
        else:
            job.status = ""
            job.nodes = ""
            job.timeLeft = ""
            job.timeRunning = ""
            job.timeLimit = ""

    def cancel_job_with_id(self, jobid):
        """Cancel the job *jobid*; return the exit status of ``scancel``."""
        return subprocess.call("scancel %d" % (jobid), shell=True)

    def cancel_job(self, job):
        """Cancel *job* and reset its ``id``/``status``.

        Returns the exit status of ``scancel``, or -1 when the command could
        not be built or run (e.g. ``job.id`` missing or not an integer).
        """
        try:
            result = subprocess.call("scancel %d" % (job.id), shell=True)
        except (AttributeError, TypeError, ValueError, OSError):
            # Was a bare "except:", which also swallowed KeyboardInterrupt.
            return -1
        job.id = -1
        job.status = ""
        return result

    def job_output(self, job):
        """Return the lines of ``$HOME/slurm-<id>.out`` while *job* runs.

        Returns an empty list when the job is not running or the file does
        not exist; the latter case is also reported on stdout.
        """
        if not self.is_running(job):
            return []

        output_filename = os.path.join(
            os.environ["HOME"], "slurm-%d.out" % job.id)
        if not os.path.exists(output_filename):
            print("Couldn't find: "+output_filename)
            return []

        with open(output_filename, "r") as output_file:
            return output_file.readlines()

    def wait_for_start(self, job, timeout=None, poll_interval=1):
        """Block until *job* is running.

        The job status is polled every *poll_interval* seconds. With
        ``timeout=None`` (the default) this waits indefinitely. In that case
        it never returns if the job leaves the queue without starting.

        Returns
        -------
        bool
            True once the job runs, False if *timeout* seconds elapsed first.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        self.job_status(job)
        while job.status != "R":
            if deadline is not None and time.monotonic() >= deadline:
                return False
            time.sleep(poll_interval)
            self.job_status(job)
        return True

    def is_running(self, job):
        """Query if job is running (squeue state ``R``)."""
        self.job_status(job)
        return job.status == "R"

    def has_started(self, job):
        """Query if job has started."""
        self.job_status(job)
        return job.status == "R"

    def is_waiting(self, job):
        """Query if job is in a non-running state."""
        self.job_status(job)
        return job.status != "R"
class Accounting:
    """Collect job states and exit codes from ``sacct``.

    Handy command for manual inspection::

        sacct -o jobid%20,jobname%50,state%30,exitcode%20
    """

    def __init__(self):
        # job id -> step -> [state, exit_code, ...]. The step key is "first"
        # for the job record itself, otherwise the text after the first "."
        # in the job id column.
        self.job_status = {}
        self.query_job_accouting()

    def query_job_accouting(self):
        """Rebuild ``self.job_status`` from ``sacct -p -n -b`` output.

        Each output line is split on ``|``. Column 0 is the job id (optionally
        ``<id>.<step>``), column 1 the state and column 2 the exit code.
        Lines with fewer than three columns are skipped.
        """
        output = execute_cmd("sacct -p -n -b")

        # Reset so that repeated queries do not append duplicate entries.
        self.job_status = {}

        for line in output.split("\n"):
            items = line.split("|")
            # Lines with 2 columns previously raised IndexError below.
            if len(items) < 3:
                continue

            job_parts = items[0].split(".")
            job_id = job_parts[0]
            job_step = job_parts[1] if len(job_parts) > 1 else "first"

            steps = self.job_status.setdefault(job_id, {})
            steps.setdefault(job_step, []).extend([items[1], items[2]])

    # Correctly spelled alias; the original (misspelled) name is kept for
    # existing callers.
    query_job_accounting = query_job_accouting
class AccountManager:
    """Query SLURM account associations with ``sacctmgr``."""

    def __init__(self, user=""):
        """Create the manager and query ``sacctmgr`` immediately.

        Parameters
        ----------
        user : str, optional
            User to query; an empty value means the current login user
            (``getpass.getuser()``).
        """
        self.user = user
        self.user_account_dict = {}
        self.account_dict = {}

        if not self.user:
            self.user = getpass.getuser()

        self.query()

    def query(self):
        """Refresh both ``user_account_dict`` and ``account_dict``."""
        self.query_user_account_info()
        self.query_account_info()

    def query_user_account_info(self) -> None:
        """Fill ``user_account_dict`` with the accounts of ``self.user``.

        Parses ``sacctmgr -P show user <user> withassoc``, skipping the header
        line. Column 4 is read as the account and column 5 as the partition.
        Result: account -> {"partitions": [partition, ...]}. The
        ``no_project`` account and rows with fewer than six columns are
        skipped.
        """
        # NOTE(review): self.user is interpolated into a shell command; it
        # comes from the caller or getpass.getuser().
        output = execute_cmd("sacctmgr -P show user %s withassoc" % self.user)

        account_dict = {}
        for line in output.split("\n")[1:]:
            columns = line.split("|")
            # Rows with 2-5 columns previously raised IndexError.
            if len(columns) < 6:
                continue
            account = columns[4]
            if account == "no_project":
                continue
            partition = columns[5]
            account_dict.setdefault(account, {}).setdefault(
                "partitions", []).append(partition)

        self.user_account_dict = account_dict

    def query_account_info(self) -> None:
        """Fill ``account_dict`` from ``sacctmgr -P show user``.

        Skips the header line. Column 0 is read as the user and column 1 as
        the account. Result: account -> {"users": [user, ...]}.
        """
        output = execute_cmd("sacctmgr -P show user")

        account_dict = {}
        for line in output.split("\n")[1:]:
            columns = line.split("|")
            if len(columns) > 1:
                user, account = columns[0], columns[1]
                account_dict.setdefault(account, {}).setdefault(
                    "users", []).append(user)

        self.account_dict = account_dict

    def query_active_projects(self, user):
        """Return the account names found for ``self.user``.

        The *user* argument is accepted for interface compatibility with
        ``GrantFile.query_active_projects`` but is not used.
        """
        return list(self.user_account_dict.keys())


if __name__ == "__main__":
    sacct = Accounting()
    print(sacct.job_status)