dplutils.pipeline.stream.StreamingGraphExecutor
- class dplutils.pipeline.stream.StreamingGraphExecutor(graph, max_batches: int = None, generator: Callable[[], Generator[DataFrame, None, None]] = None)
Base class for implementing streaming remote graph execution.
This class implements the execute() method of PipelineExecutor and contains the logic needed to schedule tasks, prioritizing the completion of tasks that are closer to terminals. It supports arbitrary pipeline graphs with branches, multiple inputs, and multiple outputs. By default, each run generates an indefinite stream of input dataframes, each tagged with a monotonically incrementing batch id.
- Parameters:
max_batches – maximum number of batches from the source generator to feed to the input task(s). The default, None, means run until the source generator is exhausted, or indefinitely if it never is.
generator – A callable that, when called, returns a generator that yields dataframes. The driver calls len() on each yielded dataframe to obtain its number of rows, then splits and batches it according to the input task settings. Each generated dataframe, regardless of size, counts as a single source batch with respect to max_batches.
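The max_batches and generator semantics can be illustrated with a small, library-independent sketch. It assumes, for illustration only, that splitting means slicing each yielded frame into chunks of at most max_rows rows, and that every chunk keeps the batch id of the source frame it came from. Plain lists stand in for pandas DataFrames because only len() and slicing are used. The names source_batches and default_source are hypothetical, not part of dplutils, and the combining half of "split and batch" is not modelled.

```python
from itertools import count, islice
from typing import Callable, Iterator, Optional, Sequence, Tuple


def default_source() -> Iterator[list]:
    """Hypothetical stand-in for the default source: an endless stream of
    single-row "dataframes" (plain lists here)."""
    for i in count():
        yield [i]


def source_batches(
    generator: Callable[[], Iterator[Sequence]],
    max_batches: Optional[int] = None,
    max_rows: Optional[int] = None,
) -> Iterator[Tuple[int, Sequence]]:
    """Yield (batch_id, chunk) pairs from the source generator.

    Each object yielded by the generator counts as one source batch toward
    max_batches, whatever its length. It is then sliced into chunks of at
    most max_rows rows (no splitting when max_rows is None).
    """
    # islice(..., None) applies no limit, so max_batches=None runs until the
    # generator is exhausted (or forever, for an endless generator).
    for batch_id, df in enumerate(islice(generator(), max_batches)):
        n = len(df)  # only len() is needed to learn the row count
        step = max_rows or n or 1
        for start in range(0, n, step):
            yield batch_id, df[start:start + step]


def three_frames():
    yield [1, 2, 3, 4, 5]
    yield [6]
    yield [7, 8, 9]


# Two source batches are consumed; the first is split into three chunks.
print(list(source_batches(three_frames, max_batches=2, max_rows=2)))
# [(0, [1, 2]), (0, [3, 4]), (0, [5]), (1, [6])]
```

With real DataFrames, df.iloc[start:start + step] is the idiomatic positional row slice. Note that this sketch emits no chunks for an empty frame, although it still counts toward max_batches.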
Implementations must override the abstract methods for (remote) task submission and polling. The following must be overridden; see their docs for more:
- __init__(graph, max_batches: int = None, generator: Callable[[], Generator[DataFrame, None, None]] = None)
Methods
- __init__(graph[, max_batches, generator])
- enqueue_tasks()
- execute(): Execute the task graph in batches.
- execute_until_output()
- from_graph(graph)
- get_pending()
- is_task_ready(pending_task): Return true if pending task is ready.
- output_batch_transform(batch)
- poll_tasks(pending_task_list): Wait for any change in status to pending_task_list.
- pre_execute()
- resolve_completed()
- run(): Validate and run the pipeline.
- set_config([coord, value, from_yaml]): Set task configuration options for this instance.
- set_config_from_dict(config)
- set_context(key, value)
- source_generator_fun()
- split_batch_submit(batch, max_rows): Submit a task to split batch into at most max_rows.
- task_exhausted([task])
- task_resolve_output(pending_task): Return a StreamBatch from completed task.
- task_submit(task, df_list): Run or arrange for the running of task.
- task_submittable(task, rank): Preflight check if task can be submitted.
- validate()
- writeto(outdir[, partition_by_task, ...]): Run pipeline, writing results to parquet table.
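The submission and polling hooks listed under Methods can be pictured with a local backend built on Python's concurrent.futures. This is a hedged sketch and not a subclass of the real class. ToyTask and ThreadPoolHooks are hypothetical names. The sketch assumes a Future can play the role of a "pending task", and it returns raw results where the real task_resolve_output returns a StreamBatch.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from dataclasses import dataclass
from typing import Callable, List, Sequence


@dataclass
class ToyTask:
    # Hypothetical task: a name and a function applied to a list of input batches
    name: str
    func: Callable[[List[Sequence]], Sequence]


class ThreadPoolHooks:
    """Hypothetical local backend mirroring the hook signatures of
    StreamingGraphExecutor. It is not the dplutils API."""

    def __init__(self, max_workers: int = 2):
        self.pool = ThreadPoolExecutor(max_workers=max_workers)

    def task_submittable(self, task: ToyTask, rank: int) -> bool:
        # Preflight check; this toy backend accepts every submission
        return True

    def task_submit(self, task: ToyTask, df_list: List[Sequence]) -> Future:
        # Arrange for the task to run in the pool; the Future is the pending task
        return self.pool.submit(task.func, df_list)

    def split_batch_submit(self, batch: Sequence, max_rows: int) -> Future:
        # Split in the pool too, into chunks of at most max_rows rows
        def split(b: Sequence) -> List[Sequence]:
            return [b[i:i + max_rows] for i in range(0, len(b), max_rows)]

        return self.pool.submit(split, batch)

    def is_task_ready(self, pending_task: Future) -> bool:
        return pending_task.done()

    def poll_tasks(self, pending_task_list: List[Future]) -> None:
        # Block until at least one pending task finishes (or is cancelled)
        if pending_task_list:
            wait(pending_task_list, return_when=FIRST_COMPLETED)

    def task_resolve_output(self, pending_task: Future):
        # Returns the raw result; re-raises any exception raised by the task
        return pending_task.result()


hooks = ThreadPoolHooks()
concat = ToyTask("concat", lambda dfs: [row for df in dfs for row in df])
pending = [hooks.task_submit(concat, [[1, 2], [3]]), hooks.split_batch_submit([1, 2, 3, 4, 5], 2)]
while not all(hooks.is_task_ready(p) for p in pending):
    hooks.poll_tasks([p for p in pending if not hooks.is_task_ready(p)])
print([hooks.task_resolve_output(p) for p in pending])
# [[1, 2, 3], [[1, 2], [3, 4], [5]]]
hooks.pool.shutdown()
```

The polling loop is a simplified stand-in for the driver's scheduling loop. It blocks in poll_tasks instead of busy-waiting, which is the role the poll_tasks description ("Wait for any change in status") suggests.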
Attributes
- run_id
- tasks_idx
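The class description says scheduling prioritizes completing tasks that are closer to terminals. One possible way to express that rule is to rank ready tasks by their shortest distance, in edges, to any terminal task. The sketch below is an assumption and not the library's actual scheduler. The graph is a plain successor dict (an assumed representation), it must be acyclic, and distance_to_terminal and prioritize are hypothetical helpers.

```python
from collections import deque
from typing import Dict, Hashable, Iterable, List


def distance_to_terminal(successors: Dict[Hashable, List[Hashable]]) -> Dict[Hashable, int]:
    """Edges from each node to its nearest terminal (a node with no successors),
    computed with a multi-source BFS over the reversed graph. Assumes a DAG."""
    nodes = set(successors) | {s for succ in successors.values() for s in succ}
    predecessors: Dict[Hashable, List[Hashable]] = {n: [] for n in nodes}
    for node, succ in successors.items():
        for s in succ:
            predecessors[s].append(node)
    dist = {n: 0 for n in nodes if not successors.get(n)}  # terminals start at 0
    queue = deque(dist)
    while queue:
        node = queue.popleft()
        for p in predecessors[node]:
            if p not in dist:  # BFS reaches each node first via its shortest path
                dist[p] = dist[node] + 1
                queue.append(p)
    return dist


def prioritize(ready: Iterable[Hashable], dist: Dict[Hashable, int]) -> List[Hashable]:
    # Tasks nearest a terminal first, so in-flight batches drain before new work starts
    return sorted(ready, key=lambda t: dist[t])


# A branching graph with two outputs ("model" and "summary").
graph = {
    "load": ["clean"],
    "clean": ["featurize", "summary"],
    "featurize": ["model"],
    "model": [],
    "summary": [],
}
print(prioritize(["load", "featurize", "summary"], distance_to_terminal(graph)))
# ['summary', 'featurize', 'load']
```

Shortest distance is only one choice. Longest distance to a terminal, or remaining downstream work, would be equally reasonable measures under different assumptions about the pipeline.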