dplutils.pipeline.PipelineTask
- class dplutils.pipeline.PipelineTask(name: str, func: ~typing.Callable[[~pandas.core.frame.DataFrame, ...], ~pandas.core.frame.DataFrame], context_kwargs: dict[str, str] = <factory>, kwargs: dict[str, ~typing.Any] = <factory>, num_gpus: int = None, num_cpus: int = 1, resources: dict[str, ~typing.Any] = <factory>, batch_size: int = None)
Container representing a task and its runtime configuration and dependencies.
The PipelineTask represents metadata about a task within a particular pipeline, the execution of which is handled by a PipelineExecutor. Each task function is expected to take a pandas DataFrame as its first positional argument and produce a DataFrame. Any additional arguments should be keyword arguments (required or otherwise) and are expected to be passed to the function by the executor via kwargs and context_kwargs. The values set in a PipelineTask represent the defaults for a task, which may be updated by a particular pipeline invocation.
The __call__ method can be used to return a new task with updated default parameters, for example to customize the name:
>>> MyTask = PipelineTask('task', func=myfunc)
>>> MyNewTask = MyTask('reconf', kwargs=dict(arg=value))
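The reconfiguration pattern described above can be sketched without importing dplutils. In this minimal sketch, `SketchTask` is a hypothetical stand-in dataclass, not the library's implementation, and a list of dicts stands in for a pandas DataFrame so the snippet runs with the standard library only. It assumes that calling a task copies it with overridden fields; the real `__call__` signature may differ.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Optional


@dataclass
class SketchTask:
    """Hypothetical stand-in for PipelineTask; not the dplutils implementation."""

    name: str
    func: Callable[..., Any]
    kwargs: dict[str, Any] = field(default_factory=dict)
    num_cpus: int = 1

    def __call__(self, name: Optional[str] = None, **overrides: Any) -> "SketchTask":
        # Return a copy with updated defaults; the original task is untouched.
        if name is not None:
            overrides["name"] = name
        return replace(self, **overrides)


def scale(rows, factor=1):
    # Stand-in task function: the batch comes first, extras are keyword arguments.
    return [{**row, "x": row["x"] * factor} for row in rows]


base = SketchTask("task", func=scale)
reconf = base("reconf", kwargs=dict(factor=3))

print(base.name, base.kwargs)
print(reconf.name, reconf.kwargs)
print(reconf.func([{"x": 2}], **reconf.kwargs))
```

Returning a copy rather than mutating in place means one base task can be reused under several names and settings within the same pipeline.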
- Parameters:
name – name of task for reference in pipeline and configuration.
func – the callable to execute task operations.
context_kwargs – a mapping from keyword argument names of func to pipeline context keys. Used for reusing arguments that are not task-specific. For example, if several steps access a context key data, then setting context_kwargs to {"data_in": "data"} indicates that the executor should pass the value of the context element data to the data_in argument of func at runtime.
kwargs – a dict of task-specific kwargs to pass to function as **kwargs.
num_cpus – CPU allocations requested for task
num_gpus – GPU allocations requested for task
resources – dict of any additional resources to pass to the executor
batch_size – ideal batch size for this workload.
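How kwargs and context_kwargs might combine into the final keyword arguments can be sketched as follows. `resolve_kwargs_sketch` is a hypothetical helper written for illustration, not the library's `resolve_kwargs`. The merge order (context values applied on top of task kwargs) and the placeholder context value are assumptions.

```python
from typing import Any


def resolve_kwargs_sketch(
    kwargs: dict[str, Any],
    context_kwargs: dict[str, str],
    context: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper; the merge order here is an assumption."""
    resolved = dict(kwargs)  # start from the task-specific defaults
    for arg_name, context_key in context_kwargs.items():
        # Look up the shared value in the pipeline context and pass it
        # to func under the argument name func expects.
        resolved[arg_name] = context[context_key]
    return resolved


context = {"data": "/shared/input.parquet"}  # placeholder value
final = resolve_kwargs_sketch(
    kwargs={"threshold": 0.5},
    context_kwargs={"data_in": "data"},
    context=context,
)
print(final)  # {'threshold': 0.5, 'data_in': '/shared/input.parquet'}
```

The context key `data` is shared across steps, while each task chooses the argument name (`data_in` here) under which its function receives the value.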
- __init__(name: str, func: ~typing.Callable[[~pandas.core.frame.DataFrame, ...], ~pandas.core.frame.DataFrame], context_kwargs: dict[str, str] = <factory>, kwargs: dict[str, ~typing.Any] = <factory>, num_gpus: int = None, num_cpus: int = 1, resources: dict[str, ~typing.Any] = <factory>, batch_size: int = None) → None
Methods
__init__(name, func[, context_kwargs, ...])
resolve_kwargs(context) – Return a dict of final keyword arguments for the given context.
validate(context) – Validate the arguments of func given context prior to run.
Attributes
batch_size
num_cpus
num_gpus
name
func
context_kwargs
kwargs
resources
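A pre-run check in the spirit of validate(context) could look like the sketch below. This is an assumption about what such validation involves, not the dplutils implementation: `validate_sketch` is a hypothetical function that uses the standard-library `inspect.signature` to confirm that the resolved keyword arguments fit the signature of func.

```python
import inspect
from typing import Any, Callable


def validate_sketch(func: Callable[..., Any], resolved_kwargs: dict[str, Any]) -> None:
    """Hypothetical pre-run check; not the dplutils implementation."""
    signature = inspect.signature(func)
    try:
        # A placeholder object stands in for the DataFrame passed positionally.
        signature.bind(object(), **resolved_kwargs)
    except TypeError as exc:
        raise ValueError(f"invalid arguments for {func.__name__}: {exc}") from exc


def clean(df, data_in, threshold=0.5):
    # Stand-in task function with one required and one optional keyword.
    return df


validate_sketch(clean, {"data_in": "path", "threshold": 0.1})  # passes silently

try:
    validate_sketch(clean, {"threshold": 0.1})  # required data_in is missing
except ValueError as err:
    print(err)
```

Checking the signature before execution surfaces a missing or misspelled argument at configuration time instead of partway through a run.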