#!/bin/env python
#
# LUNARC HPC Desktop On-Demand graphical launch tool
# Copyright (C) 2017-2022 LUNARC, Lund University
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Jobs module
This module provides classes for supported job types.
"""
import os
import sys
import subprocess
import time
import urllib.parse as up
from subprocess import Popen, PIPE, STDOUT
[docs]
def find_remote_port(url):
    """Extract the port number from a URL.

    Args:
        url: URL string, for example ``"http://node:8888/?token=abc"``.

    Returns:
        The port as an ``int``, or ``-1`` when the URL carries no usable
        port (missing, empty, non-numeric or out of range).
    """
    try:
        # The parsed ``port`` attribute copes with ``user:pw@host:port``
        # user-info and bracketed IPv6 hosts, both of which defeat a plain
        # split of the network location on ":".
        port = up.urlparse(url).port
    except ValueError:
        # Raised for malformed network locations and invalid port values.
        return -1

    return -1 if port is None else port
[docs]
class Job(object):
    """Description of a SLURM job together with its generated batch script.

    Settings are stored as plain attributes. Call :meth:`update` after
    changing them to regenerate ``script``; ``str(job)`` returns the
    script generated most recently.
    """

    def __init__(self, account="", partition="", time="00:60:00"):
        """Assign default settings and generate the initial script.

        Args:
            account: Account passed to ``-A``; omitted when empty.
            partition: Partition passed to ``-p``; omitted when empty.
            time: Value passed to ``--time``.
        """
        # Values supplied by the caller.
        self.account = account
        self.partition = partition
        self.time = time

        # Bookkeeping for the job itself.
        self.id = -1
        self.status = ""
        self.name = "gui_interactive"
        self.magic = "#!/bin/bash"

        # Resource settings; negative numbers and empty strings mean the
        # corresponding option is not written to the script.
        self.nodes = ""
        self.node = ""
        self.submitNode = False
        self.nodeCount = 1
        self.tasksPerNode = 1
        self.cpusPerNode = -1
        self.memory = -1
        self.gres = ""
        self.exclusive = False
        self.oversubscribe = False
        self.constraints = []

        # Script content: generated lines, user lines and the joined text.
        self.module_list = []
        self.scriptLines = []
        self.customLines = []
        self.script = ""

        # Output / update processing state.
        self._process_output = False
        self.update_processing = False
        self.processing_description = ""

        self._create_script()

    def add_constraint(self, constraint):
        """Append a constraint (feature) to the job."""
        self.constraints.append(constraint)

    def clear_constraints(self):
        """Forget all constraints added so far."""
        self.constraints = []

    def add_script(self, line):
        """Append a line to the generated part of the script."""
        self.scriptLines.append(line)

    def add_option(self, option):
        """Append an ``#SBATCH`` directive carrying *option*."""
        self.add_script("#SBATCH " + option)

    def add_module(self, name, version=""):
        """Register a module to be loaded, optionally with a version."""
        self.module_list.append([name, version])

    def clear_script(self):
        """Reset generated lines, custom lines, constraints and modules."""
        self.scriptLines = []
        self.customLines = []
        self.constraints = []
        self.module_list = []

    def _create_script(self):
        """Rebuild ``scriptLines`` and ``script`` from the current settings."""
        self.scriptLines = []

        # Interpreter line followed by a blank separator.
        for header_line in (self.magic, ""):
            self.add_script(header_line)

        if self.account != "":
            self.add_option("-A %s" % self.account)

        # An explicit node request takes the place of the partition option.
        if self.submitNode:
            self.add_option("-w %s" % self.node)
        elif self.partition != "":
            self.add_option("-p %s" % self.partition)

        if self.nodeCount >= 0:
            self.add_option("-N %d" % self.nodeCount)

        if self.tasksPerNode >= 0:
            self.add_option("--ntasks-per-node=%d" % self.tasksPerNode)

        self.add_option("--time=%s" % self.time)

        if self.gres != "":
            self.add_option("--gres=%s" % self.gres)

        if self.memory > 0:
            self.add_option("--mem=%d" % self.memory)

        # Plain on/off switches.
        switches = (
            (self.exclusive, "--exclusive"),
            (self.oversubscribe, "--oversubscribe"),
        )
        for enabled, switch in switches:
            if enabled:
                self.add_option(switch)

        # Several constraints are combined with "&".
        constraint_count = len(self.constraints)
        if constraint_count == 1:
            self.add_option("--constraint=%s" % self.constraints[0])
        elif constraint_count > 1:
            self.add_option("--constraint=%s" % "&".join(self.constraints))

        self.add_option("-J %s" % self.name)

        # Diagnostic preamble echoed at the start of the job, framed by
        # blank lines.
        preamble = (
            "",
            'echo "Starting at `date`"',
            'echo "Running on hosts: $SLURM_NODELIST"',
            'echo "Running on $SLURM_NNODES nodes."',
            'echo "Running on $SLURM_NPROCS processors."',
            'echo "SLURM JobID $SLURM_JOB_ID processors."',
            'echo "Node has $SLURM_CPUS_ON_NODE processors."',
            'echo "Node has $SLURM_MEM_PER_NODE total memory."',
            'echo "Node has $SLURM_MEM_PER_CPU memory per cpu."',
            'echo "Current working directory is `pwd`"',
            'echo "Current path is $PATH"',
            '',
        )
        for preamble_line in preamble:
            self.add_script(preamble_line)

        for entry in self.module_list:
            name, version = entry[0], entry[1]
            if version == "":
                target = name
            else:
                target = '%s/%s' % (name, version)
            self.add_script('module load %s' % target)

        # Custom lines always follow the generated ones.
        self.script = "\n".join([*self.scriptLines, *self.customLines])

    def add_custom_script(self, line):
        """Append a line to the custom part of the script."""
        self.customLines.append(line)

    def update(self):
        """Regenerate the script from the current settings."""
        self._create_script()

    def set_process_output(self, flag):
        """Store whether job output should be processed."""
        self._process_output = flag

    def get_process_output(self):
        """Return whether job output should be processed."""
        return self._process_output

    def do_process_output(self, output_lines):
        """Hook for inspecting job output; does nothing in the base class."""
        pass

    def do_update_processing(self):
        """Hook for periodic update work; does nothing in the base class."""
        pass

    def __str__(self):
        """Return the most recently generated script."""
        return self.script

    process_output = property(get_process_output, set_process_output)
[docs]
class PlaceHolderJob(Job):
    """Placeholder job acting as master process.

    Its only payload is an endless shell loop that sleeps.
    """

    def __init__(self, account="", partition="", time="00:30:00"):
        """Create the job and generate a script that sleeps in a loop."""
        Job.__init__(self, account=account, partition=partition, time=time)

        keep_alive_loop = 'while true; do sleep 60; done'
        self.add_custom_script(keep_alive_loop)

        # Regenerate so that the custom line becomes part of ``script``.
        self.update()
# Bash snippet that initialises conda from the installation pointed to by
# the EBROOTANACONDA3 environment variable. It first tries the output of
# "conda shell.bash hook", then falls back to sourcing
# etc/profile.d/conda.sh, and finally to prepending the installation's bin
# directory to PATH. JupyterLabJob appends this text to the job script when
# its init_conda attribute is set.
# NOTE(review): the snippet uses a bare "return" when EBROOTANACONDA3 is
# unset, which presumably only works when the text is sourced or wrapped in
# a function - confirm how it behaves when inlined into a batch script.
conda_initialise_script = """# >>> conda initialize >>>
# !! Contents within this block are managed by 'conda init' !!
if [ -z "${EBROOTANACONDA3}" ]; then
echo "You need to load the Anaconda3 module before sourcing this script."
return
fi
__conda_setup="$(${EBROOTANACONDA3}/bin/conda 'shell.bash' 'hook' 2> /dev/null)"
if [ $? -eq 0 ]; then
eval "$__conda_setup"
else
if [ -f "${EBROOTANACONDA3}/etc/profile.d/conda.sh" ]; then
. "${EBROOTANACONDA3}/etc/profile.d/conda.sh"
else
export PATH="${EBROOTANACONDA3}/bin:$PATH"
fi
fi
unset __conda_setup
# <<< conda initialize <<<
"""
[docs]
class JupyterNotebookJob(Job):
    """Job that starts a Jupyter notebook server and watches for its URL."""

    def __init__(self, account="", partition="", time="00:30:00",
                 notebook_module="Anaconda3", use_localhost=False,
                 conda_env="", conda_source_env=""):
        """Create the job and queue the notebook start-up commands.

        Args:
            account: Account passed on to :class:`Job`.
            partition: Partition passed on to :class:`Job`.
            time: Wall time passed on to :class:`Job`.
            notebook_module: Module to load, or several names separated
                by commas.
            use_localhost: When true the server is started without
                ``--ip=$HOSTNAME``.
            conda_env: Conda environment to activate; skipped when empty.
            conda_source_env: Script to ``source`` before activating the
                environment; skipped when empty (the default).
        """
        Job.__init__(self, account, partition, time)

        self.use_localhost = use_localhost
        self.notebook_url = ""
        # Defined up front so the attribute can be read before a URL has
        # been seen in the job output; -1 means "not known yet".
        self.notebook_port = -1
        self.process_output = True
        self.processing_description = "Waiting for notebook instance to start."
        self.notebook_module = notebook_module
        self.conda_source_env = conda_source_env
        self.conda_env = conda_env

        # One code path for single and comma-separated module names;
        # surrounding whitespace and empty entries are dropped.
        for module in self.notebook_module.split(","):
            module = module.strip()
            if module != "":
                self.add_module(module)

        self.add_custom_script("unset XDG_RUNTIME_DIR")

        if self.conda_source_env != "":
            self.add_custom_script("source %s" % self.conda_source_env)

        if self.conda_env != "":
            self.add_custom_script("conda activate %s" % self.conda_env)

        if self.use_localhost:
            self.add_custom_script('jupyter-notebook --no-browser')
        else:
            self.add_custom_script('jupyter-notebook --no-browser --ip=$HOSTNAME')

        # NOTE(review): these two lines come after the server command in
        # the script, so they presumably only run once the server has
        # exited - confirm that this ordering is intended.
        self.add_custom_script("module list")
        self.add_custom_script("which python")

    def on_notebook_url_found(self, url):
        """Event method called when the notebook URL has been found."""
        print("Notebook found: "+url)

    @staticmethod
    def _extract_url(line):
        """Return the URL at the end of *line*, or "" when it has none."""
        for scheme in ("http:", "https:"):
            start = line.find(scheme)
            if start != -1:
                return line[start:].strip()
        return ""

    def do_process_output(self, output_lines):
        """Scan job output for the notebook URL.

        The first line that contains ``?token=``, does not mention
        ``127.0.0.1`` and carries an http(s) URL is used. It sets
        ``notebook_url`` and ``notebook_port`` (8888 when the URL has no
        port), switches ``process_output`` off and calls
        :meth:`on_notebook_url_found`.

        Args:
            output_lines: Iterable of output lines from the job.
        """
        Job.do_process_output(self, output_lines)

        if not self.process_output:
            return

        for line in output_lines:
            if "?token=" not in line or "127.0.0.1" in line:
                continue

            url = self._extract_url(line)
            if url == "":
                # A token without a URL is of no use; keep looking.
                continue

            port = find_remote_port(url)
            self.notebook_port = port if port != -1 else 8888
            self.notebook_url = url
            self.process_output = False
            self.on_notebook_url_found(self.notebook_url)
            break
[docs]
class JupyterLabJob(Job):
    """Job that starts a JupyterLab server and watches for its URL."""

    def __init__(self, account="", partition="", time="00:30:00",
                 jupyterlab_module="Anaconda3", use_localhost=False,
                 conda_env="", init_conda=False):
        """Create the job and queue the JupyterLab start-up commands.

        Args:
            account: Account passed on to :class:`Job`.
            partition: Partition passed on to :class:`Job`.
            time: Wall time passed on to :class:`Job`.
            jupyterlab_module: Module to load, or several names separated
                by commas.
            use_localhost: When true the server is started without
                ``--ip=$HOSTNAME``.
            conda_env: Conda environment to activate; skipped when empty.
            init_conda: When true, ``conda_initialise_script`` is added to
                the job script before the environment is activated.
                Defaults to false.
        """
        Job.__init__(self, account, partition, time)

        self.use_localhost = use_localhost
        self.notebook_url = ""
        # Defined up front so the attribute can be read before a URL has
        # been seen in the job output; -1 means "not known yet".
        self.notebook_port = -1
        self.process_output = True
        self.processing_description = "Waiting for notebook instance to start."
        self.jupyterlab_module = jupyterlab_module
        self.init_conda = init_conda
        self.conda_env = conda_env

        # One code path for single and comma-separated module names;
        # surrounding whitespace and empty entries are dropped.
        for module in self.jupyterlab_module.split(","):
            module = module.strip()
            if module != "":
                self.add_module(module)

        self.add_custom_script("unset XDG_RUNTIME_DIR")

        if self.init_conda:
            self.add_custom_script(conda_initialise_script)

        if self.conda_env != "":
            self.add_custom_script("conda activate %s" % self.conda_env)

        if self.use_localhost:
            self.add_custom_script('jupyter-lab --no-browser')
        else:
            self.add_custom_script('jupyter-lab --no-browser --ip=$HOSTNAME')

        # NOTE(review): these two lines come after the server command in
        # the script, so they presumably only run once the server has
        # exited - confirm that this ordering is intended.
        self.add_custom_script("module list")
        self.add_custom_script("which python")

    def on_notebook_url_found(self, url):
        """Event method called when the lab URL has been found."""
        print("Lab found: "+url)

    @staticmethod
    def _extract_url(line):
        """Return the URL at the end of *line*, or "" when it has none."""
        for scheme in ("http:", "https:"):
            start = line.find(scheme)
            if start != -1:
                return line[start:].strip()
        return ""

    def do_process_output(self, output_lines):
        """Scan job output for the JupyterLab URL.

        The first line that contains ``?token=``, does not mention
        ``127.0.0.1`` and carries an http(s) URL is used. It sets
        ``notebook_url`` and ``notebook_port`` (8888 when the URL has no
        port), switches ``process_output`` off and calls
        :meth:`on_notebook_url_found`.

        Args:
            output_lines: Iterable of output lines from the job.
        """
        Job.do_process_output(self, output_lines)

        if not self.process_output:
            return

        for line in output_lines:
            if "?token=" not in line or "127.0.0.1" in line:
                continue

            url = self._extract_url(line)
            if url == "":
                # A token without a URL is of no use; keep looking.
                continue

            port = find_remote_port(url)
            self.notebook_port = port if port != -1 else 8888
            self.notebook_url = url
            self.process_output = False
            self.on_notebook_url_found(self.notebook_url)
            break
[docs]
class VMJob(Job):
    """Special job for starting VMs."""

    def __init__(self, account="", partition="", time="00:30:00"):
        """Create the job with the settings used for VM sessions.

        Args:
            account: Account passed on to :class:`Job`.
            partition: Partition passed on to :class:`Job`.
            time: Wall time passed on to :class:`Job`.
        """
        super().__init__(account, partition, time)

        self.notebook_url = ""
        self.process_output = False
        self.processing_description = "Waiting Windows session to become available."
        self.update_processing = True

        # Keeps the job running; prints the date every five seconds.
        self.add_custom_script("while true; do date; sleep 5; done")

        self.hostname = ""
        self.oversubscribe = True
        self.memory = 100
        self.gres = "win10m"

        # Negative values keep -N and --ntasks-per-node out of the script.
        self.nodeCount = -1
        self.tasksPerNode = -1

    def do_update_processing(self):
        """Check for the host file of this job.

        Looks for ``~/.lhpc/vm_host_<id>.ip``. Once the file exists and
        its first line is non-empty, the stripped line is stored in
        ``hostname``, ``update_processing`` is switched off and
        :meth:`on_vm_available` is called. Otherwise nothing changes, so
        the check can simply be repeated later.
        """
        # expanduser still resolves a home directory when HOME is unset,
        # where os.getenv("HOME") would hand None to os.path.join.
        store_dir = os.path.join(os.path.expanduser("~"), ".lhpc")
        job_host_filename = os.path.join(
            store_dir, "vm_host_%s.ip" % str(self.id))

        try:
            with open(job_host_filename) as f:
                hostname = f.readline().strip()
        except FileNotFoundError:
            # Not written yet; try again on the next call.
            return

        if hostname == "":
            # The file exists but has no host name in it (yet).
            return

        self.update_processing = False
        self.hostname = hostname
        self.on_vm_available(hostname)

    def on_vm_available(self, hostname):
        """Callback invoked when the job host file has been found."""
        print("VM vailable: "+hostname)
[docs]
class JobPluginBase(Job):
    """Common ancestor for loadable job plugins."""

    def __init__(self, account="", partition="", time="00:60:00"):
        """Create the job and assign placeholder plugin metadata."""
        super().__init__(account=account, partition=partition, time=time)

        # Placeholder values for the plugin's description and name.
        self.plugin_descr = "Plugin that does nothing"
        self.plugin_name = "Noname"