dplutils.pipeline.ray.RayStreamGraphExecutor
- class dplutils.pipeline.ray.RayStreamGraphExecutor(*args, ray_poll_timeout: int = 20, **kwargs)
Ray-based implementation of a stream graph executor.
All task outputs are kept in the Ray object store and are de-serialized only as needed for execution, until they are yielded by run(), at which point they are de-serialized on the driver. This executor will attempt to pack the cluster (schedule as many tasks as available resources allow), irrespective of any other workloads.
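As a rough illustration of this data flow, the sketch below is a stdlib-only analogy, not Ray or dplutils code. The hypothetical `ToyObjectStore` stands in for Ray's object store by keeping pickled bytes behind integer references, and the hypothetical `run_pipeline` generator plays the role of run(): intermediate outputs exist only as references, are loaded only when the next stage needs them, and a final batch is de-serialized on the "driver" only when it is yielded.

```python
import pickle
from itertools import count
from typing import Any, Callable, Iterable, Iterator


class ToyObjectStore:
    """Hypothetical stand-in for Ray's object store: serialized bytes keyed by an integer ref."""

    def __init__(self) -> None:
        self._data: dict[int, bytes] = {}
        self._ids = count()

    def put(self, obj: Any) -> int:
        ref = next(self._ids)
        self._data[ref] = pickle.dumps(obj)  # serialized once, on write
        return ref

    def get(self, ref: int) -> Any:
        return pickle.loads(self._data[ref])  # de-serialized only when a consumer needs it


def run_pipeline(store: ToyObjectStore,
                 sources: Iterable[Any],
                 stages: list[Callable[[Any], Any]]) -> Iterator[Any]:
    """Hypothetical executor loop: intermediates stay as refs; final outputs load on yield."""
    for batch in sources:
        ref = store.put(batch)
        for stage in stages:
            # "Worker side": load the input, run the stage, store the output as a new ref.
            ref = store.put(stage(store.get(ref)))
        # "Driver side": the final result is de-serialized only at yield time.
        yield store.get(ref)


store = ToyObjectStore()
out = list(run_pipeline(store, [[1, 2], [3]], [lambda b: [x * 10 for x in b], sum]))
print(out)  # [30, 30]
```

In real Ray code the references would be `ObjectRef`s returned by remote tasks, and the final de-serialization would be a `ray.get` call.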
Note
The Ray cluster will be initialized with default settings when run() is called, if it has not already been initialized.
- Parameters:
  - ray_poll_timeout (int, default 20) – After scheduling as many tasks as can fit, the executor calls ray.wait on all pending tasks for up to ray_poll_timeout seconds. The timeout gives it an opportunity to re-evaluate cluster resources in case the cluster has expanded since the last scheduling loop.
  - *args – Passed through to StreamingGraphExecutor.
  - **kwargs – Passed through to StreamingGraphExecutor.
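The scheduling loop that ray_poll_timeout controls can be sketched with the standard library, again as an analogy rather than the executor's actual code. Here `concurrent.futures.wait(..., timeout=..., return_when=FIRST_COMPLETED)` stands in for `ray.wait`, which by default returns as soon as one reference is ready or the timeout expires. The `capacity` callable is a hypothetical stand-in for querying cluster resources and is re-read on every pass, so growth is picked up on the next loop.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable


def _task(delay: float) -> float:
    time.sleep(delay)  # stand-in for real work
    return delay


def schedule_and_poll(work: list[float],
                      capacity: Callable[[], int],
                      poll_timeout: float) -> list[float]:
    """Hypothetical analogue of a schedule-then-poll loop.

    Assumes capacity() eventually returns at least 1; otherwise the loop spins.
    """
    queue = list(work)
    pending: set[Future] = set()
    results: list[float] = []
    with ThreadPoolExecutor(max_workers=8) as pool:
        while queue or pending:
            # Schedule as many tasks as currently "fit"; capacity is re-read each pass.
            while queue and len(pending) < capacity():
                pending.add(pool.submit(_task, queue.pop(0)))
            # Block until any task finishes or poll_timeout seconds pass.
            done, pending = wait(pending, timeout=poll_timeout,
                                 return_when=FIRST_COMPLETED)
            results.extend(f.result() for f in done)
    return results


print(sorted(schedule_and_poll([0.03, 0.01, 0.02], capacity=lambda: 2, poll_timeout=0.5)))
# [0.01, 0.02, 0.03]
```

Because the wait in this sketch returns early whenever any task finishes, the timeout only matters when nothing completes: a smaller value lets newly added capacity be used sooner, at the cost of more frequent wake-ups.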
Methods
- __init__(*args[, ray_poll_timeout])
- enqueue_tasks()
- execute() – Execute the task graph in batches.
- execute_until_output()
- from_graph(graph)
- get_pending()
- is_task_ready(remote_task) – Return true if pending task is ready.
- output_batch_transform(batch)
- poll_tasks(remote_task_list) – Wait for any change in status to pending_task_list.
- pre_execute()
- resolve_completed()
- run() – Validate and run the pipeline.
- set_config([coord, value, from_yaml]) – Set task configuration options for this instance.
- set_config_from_dict(config)
- set_context(key, value)
- source_generator_fun()
- split_batch_submit(stream_batch, max_rows) – Submit a task to split batch into at most max_rows.
- task_exhausted([task])
- task_resolve_output(remote_task) – Return a StreamBatch from completed task.
- task_submit(task, df_list) – Run or arrange for the running of task.
- task_submittable(task, rank) – Preflight check if task can be submitted.
- validate()
- writeto(outdir[, partition_by_task, ...]) – Run pipeline, writing results to parquet table.
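split_batch_submit is summarized as splitting a batch into at most max_rows, read here as at most max_rows rows per piece. The exact splitting strategy is not documented on this page, so the helper below is only a hypothetical sketch of that contract on a plain sequence: consecutive chunks of at most max_rows rows, ⌈n / max_rows⌉ of them in total. The real method submits the split as a task rather than running it inline; if the batch holds a pandas DataFrame (an assumption), the same slicing could use `df.iloc[i:i + max_rows]`.

```python
from typing import Sequence, TypeVar

T = TypeVar("T")


def split_rows(rows: Sequence[T], max_rows: int) -> list[Sequence[T]]:
    """Hypothetical helper: split rows into consecutive chunks of at most max_rows each."""
    if max_rows < 1:
        raise ValueError("max_rows must be at least 1")
    # range() steps by max_rows, so only the last chunk can be shorter.
    return [rows[i:i + max_rows] for i in range(0, len(rows), max_rows)]


print(split_rows(list(range(7)), 3))  # [[0, 1, 2], [3, 4, 5], [6]]
```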
Attributes
- run_id
- tasks_idx