dplutils.pipeline.stream.StreamingGraphExecutor

class dplutils.pipeline.stream.StreamingGraphExecutor(graph, max_batches: int = None, generator: Callable[[], Generator[DataFrame, None, None]] = None)

Base class for implementing streaming remote graph execution.

This class implements the execute() method of PipelineExecutor and contains the logic needed to schedule tasks, prioritizing the completion of tasks that are closer to the graph's terminal (output) tasks. It supports arbitrary pipeline graphs, including branches and multiple inputs and outputs. By default, each run generates an indefinite stream of input dataframes tagged with a monotonically incrementing batch id.
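The scheduling rule above can be illustrated with a small standalone sketch. It assumes that "closer to terminals" means fewer hops to the nearest task with no downstream edges, and it orders ready tasks by that distance; the exact ranking dplutils computes is not described on this page, and the helpers `distance_to_terminals` and `submission_order` are hypothetical. The example graph has a branch and two outputs.

```python
from collections import deque
from typing import Dict, List


def distance_to_terminals(edges: Dict[str, List[str]]) -> Dict[str, int]:
    """Hypothetical helper (not part of dplutils): hops from each task to its
    nearest terminal, i.e. a task with no downstream edges."""
    nodes = set(edges) | {dst for dsts in edges.values() for dst in dsts}
    upstream: Dict[str, List[str]] = {node: [] for node in nodes}
    for src, dsts in edges.items():
        for dst in dsts:
            upstream[dst].append(src)
    terminals = [node for node in nodes if not edges.get(node)]
    dist = {node: 0 for node in terminals}
    queue = deque(terminals)
    # Multi-source breadth-first search, walking edges backwards from terminals.
    while queue:
        node = queue.popleft()
        for parent in upstream[node]:
            if parent not in dist:
                dist[parent] = dist[node] + 1
                queue.append(parent)
    return dist


def submission_order(ready: List[str], dist: Dict[str, int]) -> List[str]:
    """Hypothetical policy: submit tasks nearest a terminal first (name breaks ties)."""
    return sorted(ready, key=lambda node: (dist[node], node))


# Branching graph with two outputs: load -> clean -> {features -> write, stats}.
edges = {"load": ["clean"], "clean": ["features", "stats"], "features": ["write"]}
dist = distance_to_terminals(edges)
print(submission_order(["load", "features", "clean"], dist))  # ['clean', 'features', 'load']
```

Under this assumed policy, work on batches already deep in the graph is preferred over admitting new input at "load", which presumably keeps the amount of buffered intermediate data bounded.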

Parameters:
  • max_batches – Maximum number of batches from the source generator to feed to the input task(s). The default, None, runs until the source generator is exhausted, or indefinitely if it never is.

  • generator – A callable that, when called, returns a generator that yields dataframes. The driver calls len() on each yielded dataframe to obtain its row count, then splits and batches it according to the input task settings. Each generated dataframe, regardless of size, counts as a single source batch with respect to max_batches.
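To make the generator and max_batches semantics concrete, the following sketch pairs a valid generator callable with a hypothetical driver loop (`feed_input`, not a dplutils function). The loop counts each yielded dataframe as one source batch, tags it with an incrementing batch id, and splits it into chunks of at most max_rows rows. The splitting and tagging details are assumptions for illustration; the sketch requires pandas.

```python
import itertools
from typing import Callable, Generator, Iterator, Optional, Tuple

import pandas as pd


def make_source() -> Generator[pd.DataFrame, None, None]:
    """A valid `generator` argument: each call returns a fresh generator of dataframes."""
    for start, size in [(0, 5), (5, 2), (7, 4)]:
        yield pd.DataFrame({"value": range(start, start + size)})


def feed_input(
    generator: Callable[[], Generator[pd.DataFrame, None, None]],
    max_batches: Optional[int],
    max_rows: int,
) -> Iterator[Tuple[int, pd.DataFrame]]:
    """Hypothetical driver loop (not the dplutils implementation)."""
    source: Iterator[pd.DataFrame] = generator()
    if max_batches is not None:
        # Each yielded dataframe counts as one source batch, whatever its length.
        source = itertools.islice(source, max_batches)
    for batch_id, df in enumerate(source):  # monotonically incrementing batch id
        n_rows = len(df)  # row count obtained via len()
        for start in range(0, n_rows, max_rows):
            yield batch_id, df.iloc[start:start + max_rows]


chunks = list(feed_input(make_source, max_batches=2, max_rows=3))
print([(batch_id, len(df)) for batch_id, df in chunks])  # [(0, 3), (0, 2), (1, 2)]
```

With max_batches=2 the third dataframe is never read, even though the first one was split into two chunks; with max_batches=None the loop runs until make_source is exhausted.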

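Implementations must override the abstract methods for (remote) task submission and polling. The following must be overridden; see their docs for more:

  • is_task_ready()

  • poll_tasks()

  • split_batch_submit()

  • task_resolve_output()

  • task_submit()

  • task_submittable()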
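The submission and polling hooks (task_submittable, task_submit, is_task_ready, poll_tasks, task_resolve_output and split_batch_submit) can be prototyped locally before targeting a remote backend. The sketch below mirrors only their names and argument lists from the method summary and backs them with concurrent.futures threads. It does not import or subclass dplutils: SketchTask, PendingSketch, ThreadPoolBackendSketch and the max_concurrent limit are hypothetical, and task_resolve_output returns the raw result where the real method returns a StreamBatch.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class SketchTask:
    """Hypothetical stand-in for a pipeline task."""
    name: str
    func: Callable[[List[Any]], Any]  # consumes the list of input batches
    max_concurrent: int = 2  # hypothetical per-task concurrency limit


@dataclass
class PendingSketch:
    """Hypothetical handle for an in-flight task."""
    task_name: str
    future: Future


class ThreadPoolBackendSketch:
    """Local-thread versions of the six hooks; not a dplutils subclass."""

    def __init__(self, workers: int = 4) -> None:
        self.pool = ThreadPoolExecutor(max_workers=workers)
        self.in_flight: Dict[str, int] = {}

    def task_submittable(self, task: SketchTask, rank: int) -> bool:
        # Preflight check; this sketch ignores rank and only enforces concurrency.
        return self.in_flight.get(task.name, 0) < task.max_concurrent

    def task_submit(self, task: SketchTask, df_list: List[Any]) -> PendingSketch:
        self.in_flight[task.name] = self.in_flight.get(task.name, 0) + 1
        return PendingSketch(task.name, self.pool.submit(task.func, df_list))

    def is_task_ready(self, pending_task: PendingSketch) -> bool:
        return pending_task.future.done()

    def poll_tasks(self, pending_task_list: List[PendingSketch]) -> None:
        # Block until at least one pending task changes state (here: finishes).
        wait([p.future for p in pending_task_list], return_when=FIRST_COMPLETED)

    def task_resolve_output(self, pending_task: PendingSketch) -> Any:
        # Assumption: returns the raw result rather than a StreamBatch.
        self.in_flight[pending_task.task_name] -= 1
        return pending_task.future.result()

    def split_batch_submit(self, batch: List[Any], max_rows: int) -> Future:
        # Split in the background into chunks of at most max_rows rows.
        return self.pool.submit(
            lambda: [batch[i:i + max_rows] for i in range(0, len(batch), max_rows)]
        )


backend = ThreadPoolBackendSketch()
double = SketchTask("double", lambda dfs: [2 * x for df in dfs for x in df])
pending = backend.task_submit(double, [[1, 2], [3]])
backend.poll_tasks([pending])
print(backend.task_resolve_output(pending))  # [2, 4, 6]
backend.pool.shutdown()
```

A remote backend would replace the thread pool with its own submission and readiness primitives; task_submittable is a natural place for resource or concurrency limits.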

__init__(graph, max_batches: int = None, generator: Callable[[], Generator[DataFrame, None, None]] = None)

Methods

  • __init__(graph[, max_batches, generator])

  • enqueue_tasks()

  • execute() – Execute the task graph in batches.

  • execute_until_output()

  • from_graph(graph)

  • get_pending()

  • is_task_ready(pending_task) – Return True if the pending task is ready.

  • output_batch_transform(batch)

  • poll_tasks(pending_task_list) – Wait for any change in status of the tasks in pending_task_list.

  • pre_execute()

  • resolve_completed()

  • run() – Validate and run the pipeline.

  • set_config([coord, value, from_yaml]) – Set task configuration options for this instance.

  • set_config_from_dict(config)

  • set_context(key, value)

  • source_generator_fun()

  • split_batch_submit(batch, max_rows) – Submit a task to split batch into chunks of at most max_rows rows.

  • task_exhausted([task])

  • task_resolve_output(pending_task) – Return a StreamBatch from the completed task.

  • task_submit(task, df_list) – Run task, or arrange for it to be run.

  • task_submittable(task, rank) – Preflight check of whether task can be submitted.

  • validate()

  • writeto(outdir[, partition_by_task, ...]) – Run the pipeline, writing results to a parquet table.

Attributes

run_id

tasks_idx