dplutils.pipeline.ray.RayStreamGraphExecutor


class dplutils.pipeline.ray.RayStreamGraphExecutor(*args, ray_poll_timeout: int = 20, **kwargs)

Ray-based implementation of stream graph executor.

All task outputs are kept in the Ray object store and are deserialized only as needed for execution, until they are yielded by run(), at which point they are deserialized on the driver.

This executor attempts to pack the cluster with as many tasks as its resources allow, irrespective of any other workloads running on it.
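The object-store behaviour described above can be pictured with a toy, pure-Python analogy. ToyObjectStore and double_task are hypothetical names invented for this sketch; it is not how Ray or dplutils implement the object store. It only illustrates passing serialized references between tasks, deserializing inside a task when the data is needed, and deserializing the final value once on the "driver".

```python
import pickle
import uuid


class ToyObjectStore:
    """Hypothetical stand-in for Ray's object store: values are held as
    pickled bytes and deserialized only when explicitly fetched."""

    def __init__(self):
        self._blobs = {}
        self.deserialize_count = 0

    def put(self, value):
        ref = uuid.uuid4().hex  # opaque reference passed between tasks
        self._blobs[ref] = pickle.dumps(value)
        return ref

    def get(self, ref):
        self.deserialize_count += 1
        return pickle.loads(self._blobs[ref])


def double_task(store, in_ref):
    # A task deserializes its input only because it needs the data to compute,
    # then stores its output back as a reference.
    rows = store.get(in_ref)
    return store.put([r * 2 for r in rows])


store = ToyObjectStore()
ref = store.put([1, 2, 3])
ref = double_task(store, ref)  # intermediate output stays serialized
ref = double_task(store, ref)
final = store.get(ref)  # the "driver" deserializes only the final output
```

Here the intermediate list produced by the first task is never deserialized on the driver; only the two task inputs and the final result are.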

Note

If Ray has not already been initialized, it is initialized with default settings when the pipeline is run.

Parameters:
  • ray_poll_timeout – After scheduling as many tasks as can fit, the executor calls ray.wait on all pending tasks with a timeout of ray_poll_timeout seconds. The timeout gives the executor an opportunity to re-evaluate cluster resources in case the cluster has expanded since the last scheduling loop.

  • *args – Passed through to StreamingGraphExecutor.

  • **kwargs – Passed through to StreamingGraphExecutor.
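The scheduling loop described for ray_poll_timeout might look roughly like the following sketch. It swaps Ray for the standard-library concurrent.futures module so it runs without a cluster: wait(..., timeout=..., return_when=FIRST_COMPLETED) plays the role of ray.wait, and get_capacity is a hypothetical callback standing in for a query of the cluster's currently available resources. run_packed is not part of dplutils.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_packed(work_items, get_capacity, poll_timeout=20.0):
    """Hypothetical sketch: schedule as many items as current capacity allows,
    then wait up to poll_timeout seconds for any of them to finish before
    re-checking capacity. Assumes get_capacity() eventually returns >= 1."""
    queue = list(work_items)  # zero-argument callables
    pending = set()
    results = []
    with ThreadPoolExecutor(max_workers=8) as pool:
        while queue or pending:
            # Re-read capacity on every loop, in case the "cluster" has grown.
            free_slots = max(get_capacity() - len(pending), 0)
            while queue and free_slots > 0:
                pending.add(pool.submit(queue.pop(0)))
                free_slots -= 1
            # Analogous to ray.wait with a timeout: return as soon as any task
            # finishes, otherwise give up after poll_timeout and loop again.
            done, pending = wait(pending, timeout=poll_timeout,
                                 return_when=FIRST_COMPLETED)
            results.extend(f.result() for f in done)
    return results


capacity_calls = []


def growing_capacity():
    # Pretend the cluster gains one slot per scheduling loop, capped at 4.
    capacity_calls.append(1)
    return min(len(capacity_calls), 4)


tasks = [lambda i=i: i * i for i in range(6)]
results = run_packed(tasks, growing_capacity, poll_timeout=0.5)
```

Results arrive in completion order, so compare them sorted. A shorter poll timeout reacts faster to newly added capacity at the cost of more frequent scheduling passes.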

__init__(*args, ray_poll_timeout: int = 20, **kwargs)

Methods

__init__(*args[, ray_poll_timeout])

enqueue_tasks()

execute()

Execute the task graph in batches.

execute_until_output()

from_graph(graph)

get_pending()

is_task_ready(remote_task)

Return True if the pending task is ready.

output_batch_transform(batch)

poll_tasks(remote_task_list)

Wait for any change in status of the tasks in remote_task_list.

pre_execute()

resolve_completed()

run()

Validate and run the pipeline.

set_config([coord, value, from_yaml])

Set task configuration options for this instance.

set_config_from_dict(config)

set_context(key, value)

source_generator_fun()

split_batch_submit(stream_batch, max_rows)

Submit a task that splits stream_batch into batches of at most max_rows rows.

task_exhausted([task])

task_resolve_output(remote_task)

Return a StreamBatch from a completed task.

task_submit(task, df_list)

Run task, or arrange for it to be run.

task_submittable(task, rank)

Preflight check of whether task can be submitted.

validate()

writeto(outdir[, partition_by_task, ...])

Run pipeline, writing results to parquet table.
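The row arithmetic behind split_batch_submit can be sketched as follows, assuming the batch is split into consecutive chunks of at most max_rows rows. A plain list stands in for the batch's data, and split_rows is a hypothetical helper; the actual method submits the split as a remote task and may divide rows differently.

```python
import math


def split_rows(rows, max_rows):
    """Hypothetical helper: split rows into consecutive chunks holding at
    most max_rows items each (the last chunk may be shorter)."""
    if max_rows < 1:
        raise ValueError("max_rows must be >= 1")
    n_chunks = math.ceil(len(rows) / max_rows)
    return [rows[i * max_rows:(i + 1) * max_rows] for i in range(n_chunks)]


chunks = split_rows(list(range(10)), max_rows=4)
# chunks == [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```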

Attributes

run_id

tasks_idx