Source code for troika.controllers.base

"""Base controller class"""

import logging
import os
import pathlib
import shutil
import tempfile

from .. import ConfigurationError, InvocationError, RunError, hook, site
from ..directives import ALIASES, translators
from ..generator import Generator
from ..parser import DirectiveParser, MultiParser, ParseError, ShebangParser

_logger = logging.getLogger(__name__)


[docs]class Controller: """Main controller Parameters ---------- config: :py:class:`troika.config.Config` Configuration args: :py:class:`argparse.Namespace` Command-line arguments logfile: path-like or None Path to the log file """ __type_name__ = "base" def __init__(self, config, args, logfile): self.config = config self.args = args self.logfile = logfile self.site = None self.default_shebang = None self.unknown_directive = "warn" self.script_data = {}
[docs] def __repr__(self): """Return a printable representation""" return "Controller"
[docs] def submit(self, script, user, output, dryrun=False): """Process a 'submit' command The script and job ID are interpreted according to the site. Parameters ---------- script: path-like Path to the job script user: Remote user name output: path-like Path to the job output file dryrun: bool If True, do not submit, only report what would be done Returns ------- int Return code (0 for success) """ with self.action_context(parse_script=script) as context: pp_script = self.generate_script(script, user, output) hook.pre_submit(self.site, script, output, dryrun) self.site.submit(pp_script, user, output, dryrun) return context.status
[docs] def monitor(self, script, user, output=None, jid=None, dryrun=False): """Process a 'monitor' command The script and job ID are interpreted according to the site. If no job ID is provided, it will be inferred. Parameters ---------- script: path-like Path to the job script user: str Remote user name output: path-like or None Path to the job output file jid: str or None Job ID dryrun: bool If True, do not do anything, only report what would be done Returns ------- int Return code (0 for success) """ with self.action_context() as context: self.site.monitor(script, user, output, jid, dryrun) return context.status
[docs] def kill(self, script, user, output=None, jid=None, dryrun=False): """Process a 'kill' command The script and job ID are interpreted according to the site. If no job ID is provided, it will be inferred. Parameters ---------- script: path-like Path to the job script user: str Remote user name output: path-like or None Path to the job output file jid: str or None Job ID dryrun: bool If True, do not kill, only report what would be done Returns ------- int Return code (0 for success) """ with self.action_context() as context: jid, cancel_status = self.site.kill(script, user, output, jid, dryrun) hook.post_kill(self.site, script, output, jid, cancel_status, dryrun) return context.status
[docs] def check_connection(self, timeout=None, dryrun=False): """Process a 'check-connection' command Parameters ---------- timeout: int If set, consider the connection is not working if no response after this number of seconds dryrun: bool If True, do not do anything but print the command that would be executed Returns ------- bool True if the connection is able to execute commands """ with self.action_context() as context: working = self.site.check_connection(timeout=timeout, dryrun=dryrun) if not working: context.status = 1 return context.status == 0
[docs] def list_sites(self): """Process a 'list-sites' command Yields ------- ``(name, type, connection)`` """ yield from site.list_sites(self.config)
class ActionContext: def __init__(self, controller, *args, **kwargs): self._controller = controller self._controller.setup(*args, **kwargs) def __enter__(self): self.status = None return self def __exit__(self, exc_type, exc, traceback): if exc is None: if self.status is None: self.status = 0 swallow = False else: if self.status is None or self.status == 0: self.status = 1 if isinstance(exc, ConfigurationError): _logger.critical("Configuration error: %s", exc) elif isinstance(exc, InvocationError): _logger.critical("Invocation error: %s", exc) elif isinstance(exc, RunError): _logger.critical("%s", exc) else: _logger.error( "Unhandled exception", exc_info=(exc_type, exc, traceback) ) swallow = True try: self._controller.teardown(self.status) except Exception as e: # An error during the at-exit hooks should _not_ be reported as # failure of the operation (e.g. submission failure) _logger.error("Exception during teardown/exit", exc_info=e) return swallow
[docs] def action_context(self, *args, **kwargs): """Create a context manager for executing an action The arguments are passed to :py:meth:`setup` when entering the context manager """ return self.ActionContext(self, *args, **kwargs)
[docs] def setup(self, parse_script=None): """Set up the controller Parameters ---------- parse_script: path-like, optional Parse this script """ self.site = self._get_site() if parse_script is not None: self.parse_script(parse_script) hook.setup_hooks(self.config, self.args.site) res = hook.at_startup(self.args.action, self.site, self.args) if any(res): raise SystemExit(1) self.default_shebang = self.site.config.get("default_shebang", None) self.unknown_directive = self.site.config.get("unknown_directive", "warn")
[docs] def teardown(self, sts=0): """Tear down the controller Parameters ---------- sts: int Exit status """ hook.at_exit(self.args.action, self.site, self.args, sts, self.logfile)
[docs] def parse_script(self, script): """Parse the script Parameters ---------- script: path-like Script to parse """ dir_parser = DirectiveParser(aliases=ALIASES) parsers = [("directives", dir_parser)] native = self.site.get_native_parser() if native is not None: parsers.append(("native", native)) parsers.append(("shebang", ShebangParser())) parser = MultiParser(parsers) body = self.run_parser(script, parser) self.script_data.update(parser.data) self.script_data["body"] = body dir_defines = getattr(self.args, "define", []) dir_overrides = dir_parser.parse_directive_args(dir_defines) self.script_data["directives"].update(dir_overrides)
[docs] def run_parser(self, script, parser): """Run the parser on the script Parameters ---------- script: path-like Path to the script parser: :py:class:`troika.parser.Parser` Parser to use Returns ------- file-like Script body """ script = pathlib.Path(script) stmp = tempfile.SpooledTemporaryFile( max_size=1024**3, mode="w+b", dir=script.parent, prefix=script.name ) with open(script, "rb") as sin: try: for lineno, line in enumerate(sin, start=1): drop = parser.feed(line) if not drop: stmp.write(line) except ParseError as e: raise ParseError(f"in {script!s}, line {lineno} {e!s}") from e stmp.seek(0) return stmp
[docs] def generate_script(self, script, user, output): """Generate the post-processed script Parameters ---------- script: path-like Path to the script user: str or None Remote user name output: path-like Path to the job output file Returns ------- path-like Path to the post-processed script """ if ( self.default_shebang is not None and self.script_data.get("shebang", None) is None ): self.script_data["shebang"] = self.default_shebang.encode("utf-8") directive_prefix, directive_translate = self.site.get_directive_translation() generator = Generator( directive_prefix, directive_translate, self.unknown_directive ) self.script_data["directives"]["output_file"] = os.fsencode(output) self.script_data = translators(self.script_data, self.config, self.site) return self.run_generator(script, generator)
[docs] def run_generator(self, script, generator): """Generate the script file Parameters ---------- script: path-like Path to the script generator: :py:class:`troika.generator.Generator` Generator to run Returns ------- path-like Path to the generated script file """ script = pathlib.Path(script) orig_script = script.with_suffix(script.suffix + ".orig") if orig_script.exists(): _logger.warning( "Backup script file %r already exists, " + "overwriting", str(orig_script), ) with tempfile.NamedTemporaryFile( mode="w+b", delete=False, dir=script.parent, prefix=script.name ) as sout: sout.writelines(generator.generate(self.script_data)) for line in self.script_data["body"]: sout.write(line) new_script = pathlib.Path(sout.name) shutil.copymode(script, new_script) shutil.copy2(script, orig_script) new_script.replace(script) _logger.debug("Script generated. Original script saved to %r", str(orig_script)) return script
[docs] def _get_site(self): """Select the site and create the corresponding :py:class:`troika.sites.base.Site` instance""" return site.get_site(self.config, self.args.site, self.args.user)