Source code for quest.api.tasks

import sys
import psutil

import pandas as pd
from tornado import gen
from functools import wraps
from distributed import Client, LocalCluster
from concurrent.futures import CancelledError

from ..static import DatasetStatus
from ..util import listify, logger

_cluster = None
tasks = {}
futures = {}


class StartCluster():
    def __init__(self, n_cores=None):
        if n_cores is None:
            n_cores = psutil.cpu_count()-2
        self.cluster = LocalCluster(processes=True, n_workers=1)
        self.client = Client(self.cluster)

    def __exit__(self, type, value, traceback):
        self.cluster.close()


def _get_client():
    global _cluster
    if _cluster is None:
        _cluster = StartCluster()
    return _cluster.client


def add_async(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        async_tasks = kwargs.pop('async_tasks', None)
        if async_tasks:
            client = _get_client()
            future = client.submit(f, *args, **kwargs)
            client.loop.add_callback(add_result_when_done, future)
            futures[future.key] = future
            tasks[future.key] = {
                'fn': f.__name__,
                'args': args,
                'kwargs': kwargs,
                'status': DatasetStatus.PENDING,
                'result': None,
            }
            return future.key
        else:
            return f(*args, **kwargs)
    return wrapper


[docs]def get_pending_tasks(**kwargs): """Return list of pending tasks calls get_tasks with filter -> status=pending, passes through other kwargs (filters={}, expand=None, as_dataframe=None, with_future=None) """ filters = {'status': DatasetStatus.PENDING} if 'filters' in kwargs: kwargs['filters'].update(filters) else: kwargs['filters'] = filters return get_tasks(**kwargs)
[docs]def get_task(task_id, with_future=None): """Get details for a task. Args: task_id (string,Required): id of a task with_future (bool, Optional, Default=None): If true include the task `future` objects in the returned dataframe/dictionary """ task = get_tasks(expand=True, with_future=with_future).get(task_id) if task is None: logger.error('task {} not found'.format(task_id)) return task
[docs]def get_tasks(filters=None, expand=None, as_dataframe=None, with_future=None): """Get all available tasks. Args: filters (dict, Optional, Default=None): filter tasks by one or more of the available filters available filters: * `task_ids` (str or list): task id or list of task ids * `status` (str or list): single status or list of statuses. Must be subset of ['pending', 'cancelled', 'finished', 'lost', 'error'] * `fn` (str): name of the function a task was assigned * `args` (list): list of arguments that were passed to the task function * `kwargs` (dict): dictionary of keyword arguments that were passed to the task function * `result` (object): result of the task function expand (bool, Optional, Default=None): include details of tasks and format as a dict as_dataframe (bool, Optional, Default=None): include details of tasks and format as a pandas dataframe with_future (bool, Optional, Default=None): If true include the task `future` objects in the returned dataframe/dictionary Returns: tasks (list, dict, or pandas dataframe, Default=list): all available tasks """ task_list = tasks if filters: task_ids = listify(filters.pop('task_ids', None)) if task_ids is not None: task_list = {k: v for k, v in task_list.items() if k in task_ids} statuses = listify(filters.pop('status', None)) if statuses is not None: task_list = {k: v for k, v in task_list.items() if v['status'] in statuses} for fk, fv in filters.items(): task_list = {k: v for k, v in task_list.items() if v[fk] == fv} task_list = pd.DataFrame.from_dict(task_list, orient='index') if with_future: task_list['future'] = [futures[k] for k in task_list.index] if not expand and not as_dataframe: task_list = task_list.index.tolist() if expand: task_list = task_list.to_dict(orient='index') return task_list
[docs]def cancel_tasks(task_ids): """Cancel tasks. Args: task_ids (string or list of strings, Required): id of tasks to be cancelled """ task_ids = listify(task_ids) df = get_tasks(with_future=True, as_dataframe=True) futures = df['future'][task_ids].tolist() c = _get_client() c.cancel(futures) return
[docs]def remove_tasks(task_ids=None, status=None): """Remove tasks. Args: task_ids (string or list, Optional, Default=None): tasks with specified id(s) will be removed status (string or list, Optional, Default=None): tasks with specified status(es) will be removed. Valid statuses are: * `cancelled` * `finished` * `lost` * `error` NOTE: `pending` is not a valid option and will be ignored, since pending tasks must be canceled before they can be removed. If no status is specified, remove tasks with status = ['cancelled', 'finished', 'lost', 'error'] from task list """ global tasks if status: status = listify(status) else: status = ['cancelled', 'finished', 'lost', 'error'] if DatasetStatus.PENDING in status: logger.error('cannot remove pending tasks, please cancel them first') status.remove(DatasetStatus.PENDING) task_list = get_tasks(filters={'status': status, 'task_ids': task_ids}) for key in task_list: del tasks[key] del futures[key] return
@gen.coroutine def add_result_when_done(future): try: result = yield future._result() tasks[future.key]['result'] = result except CancelledError as e: tasks[future.key]['result'] = {'error_message': 'task cancelled'} except: tasks[future.key]['result'] = {'error_message': str(sys.exc_info()[0])} tasks[future.key]['status'] = future.status