dplutils.pipeline.PipelineTask

class dplutils.pipeline.PipelineTask(name: str, func: typing.Callable[[pandas.core.frame.DataFrame, ...], pandas.core.frame.DataFrame], context_kwargs: dict[str, str] = <factory>, kwargs: dict[str, typing.Any] = <factory>, num_gpus: int = None, num_cpus: int = 1, resources: dict[str, typing.Any] = <factory>, batch_size: int = None)

Container representing a task and its runtime configuration and dependencies.

The PipelineTask represents metadata about a task within a particular pipeline, the execution of which is handled by a PipelineExecutor. Each task function is expected to take a pandas DataFrame as its first positional argument and return a DataFrame. Any additional arguments should be keyword arguments (required or optional), which the executor passes to the function via kwargs and context_kwargs. The values set in a PipelineTask are the defaults for a task and may be updated by a particular pipeline invocation.
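As a minimal sketch of a function with the shape described above, assuming pandas is installed: `scale_column` and its parameters `column` and `factor` are hypothetical names chosen for illustration and are not part of dplutils.

```python
import pandas as pd


def scale_column(df: pd.DataFrame, *, column: str, factor: float = 1.0) -> pd.DataFrame:
    """Hypothetical task function: dataframe first, everything else by keyword."""
    out = df.copy()  # avoid mutating the caller's batch
    out[column] = out[column] * factor
    return out


batch = pd.DataFrame({"x": [1, 2, 3]})
result = scale_column(batch, column="x", factor=2.0)
```

Going by the constructor signature above, such a function could then be wrapped as `PipelineTask('scale', func=scale_column, kwargs={'column': 'x'})`.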

The __call__ method can be used to return a new task with updated default parameters, for example to change the name and keyword arguments:

>>> MyTask = PipelineTask('task', func=myfunc)
>>> MyNewTask = MyTask('reconf', kwargs=dict(arg=value))
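
The `<factory>` defaults in the signature suggest a dataclass. Under that assumption, the copy-with-updates behaviour of `__call__` could be sketched with `dataclasses.replace`. `SketchTask` is a hypothetical stand-in with only a few fields, not the dplutils implementation.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Optional


@dataclass
class SketchTask:
    """Hypothetical stand-in for PipelineTask with only a few fields."""

    name: str
    func: Callable[..., Any]
    kwargs: dict[str, Any] = field(default_factory=dict)
    num_cpus: int = 1

    def __call__(self, name: Optional[str] = None, **changes: Any) -> "SketchTask":
        # Return a copy with updated defaults; the original task is untouched.
        return replace(self, name=name or self.name, **changes)


def myfunc(df, arg=None):
    return df


my_task = SketchTask("task", func=myfunc)
my_new_task = my_task("reconf", kwargs={"arg": 42})
```

Because the call returns a new object, `my_task` keeps its original name and empty `kwargs`, so one base task can be reconfigured several times without the variants affecting each other.
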
Parameters:
  • name – name of task for reference in pipeline and configuration.

  • func – the callable to execute task operations.

  • context_kwargs – a mapping from keyword argument names of func to pipeline context keys. Used for reusing arguments that are not task-specific. For example, if several steps access a context key data, then setting context_kwargs to {'data_in': 'data'} tells the executor to pass the value of the context element data as the data_in argument of func at runtime.

  • kwargs – a dict of task-specific keyword arguments to pass to func as **kwargs.

  • num_cpus – number of CPUs requested for the task.

  • num_gpus – number of GPUs requested for the task.

  • resources – dict of any additional resources to pass to the executor.

  • batch_size – ideal batch size for this workload.
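
To illustrate the context_kwargs example above, the following sketch builds the final keyword arguments from plain dictionaries. The helper `resolve`, the `threshold` argument, and the context value are hypothetical; the library's own logic lives in `resolve_kwargs` and may differ, in particular in how it treats a name that appears in both kwargs and context_kwargs.

```python
from typing import Any


def resolve(
    kwargs: dict[str, Any],
    context_kwargs: dict[str, str],
    context: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper: build the keyword arguments passed to func."""
    # Each entry maps a func argument name to the context key holding its value.
    from_context = {arg: context[key] for arg, key in context_kwargs.items()}
    # Assumption of this sketch only: context values win on a name clash.
    return {**kwargs, **from_context}


context = {"data": "/path/to/data"}
final = resolve({"threshold": 0.5}, {"data_in": "data"}, context)
```

Here `final` holds both the task-specific `threshold` and a `data_in` entry whose value was looked up under the context key `data`.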

__init__(name: str, func: typing.Callable[[pandas.core.frame.DataFrame, ...], pandas.core.frame.DataFrame], context_kwargs: dict[str, str] = <factory>, kwargs: dict[str, typing.Any] = <factory>, num_gpus: int = None, num_cpus: int = 1, resources: dict[str, typing.Any] = <factory>, batch_size: int = None) -> None

Methods

  • __init__(name, func[, context_kwargs, ...])

  • resolve_kwargs(context) – return a dict of final keyword arguments for the given context.

  • validate(context) – validate the arguments of func given context prior to run.
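
One way a pre-run check in the spirit of validate could work is to bind the resolved keyword arguments against the signature of func. The sketch below uses `inspect.signature` from the standard library. `check_call` and `task_func` are hypothetical, and this does not reflect how dplutils actually performs validation.

```python
import inspect
from typing import Any, Callable


def check_call(func: Callable[..., Any], kwargs: dict[str, Any]) -> None:
    """Hypothetical check: raise ValueError if func cannot accept kwargs."""
    sig = inspect.signature(func)
    try:
        # None is a placeholder for the dataframe passed as first positional argument.
        sig.bind(None, **kwargs)
    except TypeError as err:
        raise ValueError(f"invalid arguments for {func.__name__}: {err}") from err


def task_func(df, *, data_in, threshold=0.5):
    return df


check_call(task_func, {"data_in": "/path/to/data"})  # passes silently

try:
    check_call(task_func, {"threshold": 0.1})  # required data_in is missing
    error = None
except ValueError as err:
    error = str(err)
```

Checking arguments before any batch is processed means a missing or misspelled keyword surfaces at configuration time instead of partway through a run.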

Attributes

batch_size

num_cpus

num_gpus

name

func

context_kwargs

kwargs

resources