Source code for orchestra.workflow

from importlib import import_module

from django.conf import settings

from orchestra.core.errors import InvalidSlugValue
from orchestra.core.errors import SlugUniquenessError


[docs]class Workflow(): """ Workflows represent execution graphs of human and machine steps. Attributes: slug (str): Unique identifier for the workflow. name (str): Human-readable name for the workflow. description (str): A longer description of the workflow. steps (dict): Steps comprising the workflow. """ def __init__(self, **kwargs): self.slug = kwargs.get('slug') if len(self.slug) > 200: raise InvalidSlugValue('Slug value should be less than 200 chars') self.name = kwargs.get('name') self.description = kwargs.get('description') self.steps = {}
[docs] def add_step(self, step): """ Add `step` to the workflow. Args: step (orchestra.workflow.Step): The step to be added. Returns: None Raises: orchestra.core.errors.InvalidSlugValue: Step slug should have fewer than 200 characters. orchestra.core.errors.SlugUniquenessError: Step slug has already been used in this workflow. """ if len(step.slug) > 200: raise InvalidSlugValue('Slug value should be less than 200 chars') if step.slug in self.steps: raise SlugUniquenessError('Slug value already taken') self.steps[step.slug] = step
[docs] def get_steps(self): """ Return all steps for the workflow. Args: None Returns: steps ([orchestra.workflow.Step]): List of steps for the workflow. """ return self.steps.values()
[docs] def get_step_slugs(self): """ Return all step slugs for the workflow. Args: None Returns: slugs ([str]): List of step slugs for the workflow. """ return self.steps.keys()
[docs] def get_step(self, slug): """ Return the specified step from the workflow. Args: slug (str): The slug of the desired step. Returns: step (orchestra.workflow.Step): The specified step from the workflow. """ return self.steps[slug]
[docs] def get_human_steps(self): """ Return steps from the workflow with a human `worker_type`. Args: None Returns: steps ([orchestra.workflow.Step]): Steps from the workflow with a human `worker_type`.. """ return [step for slug, step in self.steps.items() if step.worker_type == Step.WorkerType.HUMAN]
def __str__(self): return self.slug def __unicode__(self): return self.slug
[docs]class Step(): """ Steps represent nodes on a workflow execution graph. Attributes: slug (str): Unique identifier for the step. name (str): Human-readable name for the step. description (str): A longer description of the step. worker_type (orchestra.workflow.Step.WorkerType): Indicates whether the policy is for a human or machine. creation_depends_on ([str]): Slugs for steps on which this step's creation depends. submission_depends_on ([str]): Slugs for steps on which this step's submission depends. function (function): Function to execute during step. Should be present only for machine tasks required_certifications ([str]): Slugs for certifications required for a worker to pick up tasks based on this step. """
[docs] class WorkerType: """Specifies whether step is performed by human or machine""" HUMAN = 0 MACHINE = 1
def __init__(self, **kwargs): self.slug = kwargs.get('slug') self.name = kwargs.get('name') self.description = kwargs.get('description') self.worker_type = kwargs.get('worker_type') self.creation_depends_on = kwargs.get('creation_depends_on') or [] self.submission_depends_on = kwargs.get('submission_depends_on') or [] self.function = kwargs.get('function') self.required_certifications = kwargs.get( 'required_certifications') or [] # Example: {'policy': 'previously_completed_steps', 'step': ['design']} self.assignment_policy = (kwargs.get('assignment_policy') or get_default_policy(self.worker_type, 'assignment_policy')) # Example: {'policy': 'sampled_review', 'rate': .25, 'max_reviews': 2} self.review_policy = (kwargs.get('review_policy') or get_default_policy(self.worker_type, 'review_policy')) # Example: {'html_blob': 'http://some_url', # 'javascript_includes': [url1, url2], # 'css_includes': [url1, url2]} self.user_interface = kwargs.get('user_interface') or {} def __str__(self): return self.slug def __unicode__(self): return self.slug
[docs]def get_workflows(): """ Return all stored workflows. Args: None Returns: workflows ([orchestra.workflow.Workflow]): A dict of all workflows keyed by slug. """ workflows = {} for backend_module, variable in settings.ORCHESTRA_PATHS: backend_module = import_module(backend_module) workflow = getattr(backend_module, variable) if workflow.slug in workflows: raise SlugUniquenessError('Repeated slug value for workflows.') workflows[workflow.slug] = workflow return workflows
[docs]def get_workflow_by_slug(slug): """ Return the workflow specified by `slug`. Args: slug (str): The slug of the desired workflow. Returns: workflow (orchestra.workflow.Workflow): The corresponding workflow object. """ return get_workflows()[slug]
[docs]def get_workflow_choices(): """ Return workflow data formatted as `choices` for a model field. Args: None Returns: workflow_choices (tuple): A tuple of tuples containing each workflow slug and human-readable name. """ workflows = get_workflows() choices = [] for slug, workflow in workflows.items(): choices.append((slug, workflow.name)) return tuple(choices)
[docs]def get_step_choices(): """ Return step data formatted as `choices` for a model field. Args: None Returns: step_choices (tuple): A tuple of tuples containing each step slug and human-readable name. """ choices = [] for slug, workflow in iter(get_workflows().items()): for step in workflow.get_steps(): choices.append((step.slug, step.name)) return tuple(choices)
[docs]def get_default_policy(worker_type, policy_name): """ Return the default value for a specified policy. Args: worker_type (orchestra.workflow.Step.WorkerType): Indicates whether the policy is for a human or machine. policy_name (str): The specified policy identifier. Returns: default_policy (dict): A dict containing the default policy for the worker type and policy name specified. """ default_policies = { 'assignment_policy': {'policy': 'anyone_certified'}, 'review_policy': {'policy': 'sampled_review', 'rate': 1, 'max_reviews': 1} } if worker_type == Step.WorkerType.HUMAN: return default_policies[policy_name] else: return {}