Source code for magic_call._scheduler

"""An abstraction around `concurrent.futures.ThreadPoolExecutor`."""
from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor


[docs]class Task: pass
[docs]class Scheduler: def __init__(self, max_workers=None): self._executor = _ThreadPoolExecutor(max_workers=max_workers)
[docs] def create_task(self, function, *args, **attrs): """ ``*args`` are passed to function ("task" is passed as first argument) ``**attrs`` are assigned as attributes to "task" """ # NB: Using circular dependencies will cause deadlock! # TODO: dataclass? task = Task() assert '_dependencies' not in attrs for k, v in attrs.items(): setattr(task, k, v) assert not hasattr(task, 'future') task.future = self._executor.submit(_wrapper, function, task, *args) return task
def _wrapper(function, task, *args): # TODO: handle keeping dependencies alive with weak references? assert not hasattr(task, '_dependencies') task._dependencies = [] for arg in args: if not isinstance(arg, Task): continue # NB: This waits until all dependencies are done, but it also forces # exceptions to show themselves (for easier debugging). arg.future.result() # The result itself is discarded task._dependencies.append(arg) # Keep dependencies alive return function(task, *args)