Source code for snaketools.snaketools

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Provide code supporting the running and automating of Snakemake rules."""

# Imports
from collections import OrderedDict
from pathlib import Path
import textwrap

import munch

from logzero import logger as log  # noqa: F401

from snaketools import errors as e


__all__ = ["apply_template", "pathify_by_key_ends", "SnakeRun", "SnakeRule", "recode_graph", "rewrite_snakefile_no_rules"]


[docs]class SnakeRun(object): """Initialize and manage information common to the whole run."""
[docs] def __init__(self, cfg, snakefile): """Initialize common information for a run.""" assert isinstance(cfg, dict) common = cfg["COMMON"] self.snakefile = snakefile self.globals = munch.Munch() self.cfg = cfg self.name = common["RUN_NAME"] try: self.interim_dir = common["INTERIM_DIR"] except KeyError: self.interim_dir = None self.out_dir = Path("{base_dir}/{run_name}".format(base_dir=common["OUT_DIR"], run_name=self.name)) self.pretty_names = {} self.log_dir = self.out_dir / "logs" self.rules = munch.Munch()
[docs]class SnakeRule(object): """Manage the initialization and deployment of rule-specific information."""
[docs] def __init__(self, run, name, pretty_name=None): """Initialize logs, inputs, outputs, params, etc for a single rule.""" assert isinstance(run, SnakeRun) if pretty_name is None: pretty_name = name self.run = run self.name = name.lower() self.pretty_name = pretty_name self.run.pretty_names[self.name] = pretty_name self.log_dir = run.log_dir / self.name self.log = self.log_dir / "{name}.log".format(name=self.name) self.out_dir = run.out_dir / self.name self.i = munch.Munch() # inputs self.o = munch.Munch() # outputs self.p = munch.Munch() # params self.extra = munch.Munch() # params self.run.rules[name] = self self._import_config_dict()
[docs] def _import_config_dict(self): """Import configuration values set for this rule so they are directly accessable as attributes.""" try: for key, val in self.run.cfg[self.name.upper()].items(): self.__setattr__(key, val) self.cfg = True except KeyError: self.cfg = False
[docs]def apply_template(template, keywords): """Return a list of strings of form ``template`` with values in ``keywords`` inserted. Args: template (``str``): a string containing keywords (``{kw_name}``). keywords (``dict``-like): dict with keys of appropriate keyword names and values as equal length ORDERED lists with the correct values to be inserted. """ # Check lengths of keywords list_lens = set([len(x) for x in keywords.values()]) if len(list_lens) != 1: raise e.ValidationError("keywords dict must contain values of constant length.") formatted = [] for i in range(len(list(keywords.values())[0])): args = {k: v[i] for k, v in keywords.items()} formatted.append(template.format(**args)) return formatted
def pathify_this(key): """Return `True` if the value associated with this key should be pathified.""" pathify_these = {"PATH", "FILE", "DIR"} return bool(key.split("_")[-1] in pathify_these)
[docs]def pathify_by_key_ends(dictionary): """Return a dict that has had all values with keys containing the suffixes: '_FILE', '_PATH' or '_DIR' converted to Path() instances. Args: dictionary (dict-like): Usually the loaded, processed config file as a `dict`. Returns: dict-like: Modified version of the input. """ for key, value in dictionary.items(): if isinstance(value, dict): pathify_by_key_ends(value) elif key.endswith("_PATH") or key.endswith("_DIR"): dictionary[key] = Path(value) return dictionary
# DAG and rulegraph stuff def digest_node_line(line): """Return OrderedDict of relevant line parts.""" line = line.strip() d = OrderedDict() d["num"], fields = line.split('[') fields = fields.replace('rounded,dashed', 'rounded-dashed') fields = fields.rstrip('];').split(',') fields[-1] = fields[-1].replace('rounded-dashed', 'rounded,dashed') for field in fields: key, value = field.split('=') d[key.strip()] = value.strip().replace('"', '').replace("'", "") return d def should_ignore_line(line, strings_to_ignore): """Return true if line contains a rule name in `rule_names`.""" for string in strings_to_ignore: if string in line: return True return False
[docs]def recode_graph(dot, new_dot, pretty_names, rules_to_drop, color=None, use_pretty_names=True): """Change `dot` label info to pretty_names and alter styling.""" if color is None: color = "#50D0FF" node_patterns_to_drop = [] with open(dot, mode='r') as dot: with open(new_dot, mode='w') as new_dot: for line in dot: if '[label = "' in line: # Add pretty names and single color IF pretty names are provided. data = digest_node_line(line=line) rule_name = data['label'] if use_pretty_names: pretty_name = textwrap.fill(pretty_names[rule_name], width=40).replace('\n', '\\n') full_name = "[{rule_name}]\\n{pretty_name}".format(rule_name=rule_name, pretty_name=pretty_name) data['label'] = full_name data['color'] = color else: pass fields = ', '.join(['{k} = "{v}"'.format(k=k, v=v) for k, v in data.items()][1:]) if should_ignore_line(line, strings_to_ignore=rules_to_drop): node_patterns_to_drop.append("\t{num} ->".format(num=data['num'])) node_patterns_to_drop.append("-> {num}\n".format(num=data['num'])) continue new_line = """\t{num}[{fields}];\n""".format(num=data['num'], fields=fields) new_dot.write(new_line) else: if should_ignore_line(line, strings_to_ignore=node_patterns_to_drop): continue elif "fontname=sans" in line: line = line.replace("fontname=sans", "fontname=Cantarell") line = line.replace("fontsize=10", "fontsize=11") new_dot.write(line) else: new_dot.write(line)
[docs]def rewrite_snakefile_no_rules(infile, outfile): """Write new file, omitting the snakemake grammar sections.""" def rule_declaration(line): return line.startswith("rule") def startswith_indent(line): return line.startswith(" ") def get_line_after_rule(file): for line in file: if not startswith_indent(line): return line infile = Path(infile) outfile = Path(outfile) with outfile.open('w') as out, infile.open('r') as snek: for line in snek: if not rule_declaration(line): out.write(line) else: out.write(get_line_after_rule(line))