Implement operator chaining #804

Merged: mwylde merged 14 commits into master from chaining on Dec 10, 2024

@mwylde mwylde commented Dec 9, 2024

Currently each operator is planned as its own node in the dataflow graph, with queues or the network stack between nodes. This PR adds a new capability to the dataflow system: the ability to chain multiple operators together in a single node.

[Image: chaining]

This has a number of benefits:

  • Reduces memory used by the queues
  • Reduces overhead of producing/consuming from the queue
  • Fewer tasks to be scheduled and managed by the controller

For large and complex pipelines the effect can be substantial, significantly reducing the pipeline's memory requirements (particularly at larger queue sizes).

Chaining is implemented with a new type of node, ChainedOperator. Currently all operators aside from sources are wrapped in a ChainedOperator, which may have one or more elements in the chain. Data flows between links in a chain by function call rather than through a queue. This is made possible by a new Collector trait and its implementor ChainCollector: when ChainCollector receives a record batch, it immediately calls the handle method of the next operator with that batch. Signal messages are propagated the same way, with care taken to ensure that each signal is properly interleaved with the data messages produced while handling it.
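
To make the function-call handoff concrete, here is a minimal sketch of the idea. It is not the code from this PR: the `Batch` alias, the synchronous method signatures, the `sink` field, the `process` helper, and the `AddOne` / `KeepEven` operators are all hypothetical simplifications (the real operators work on record batches, and signal handling is omitted entirely). Only the names ChainedOperator, Collector, ChainCollector, and handle come from the description above.

```rust
// Hypothetical stand-in for a record batch.
type Batch = Vec<i64>;

/// Anything an operator can emit batches into.
trait Collector {
    fn collect(&mut self, batch: Batch);
}

/// An operator consumes a batch and emits its results to a collector.
trait Operator {
    fn handle(&mut self, batch: Batch, out: &mut dyn Collector);
}

/// Collector that hands each batch straight to the next operator in the
/// chain with a plain function call, instead of pushing it onto a queue.
struct ChainCollector<'a> {
    rest: &'a mut [Box<dyn Operator>], // remaining links in the chain
    sink: &'a mut Vec<Batch>,          // where batches leaving the chain go
}

impl Collector for ChainCollector<'_> {
    fn collect(&mut self, batch: Batch) {
        match self.rest.split_first_mut() {
            // There is a next link: call its handle method immediately,
            // giving it a collector that covers the links after it.
            Some((next, rest)) => {
                let mut out = ChainCollector { rest, sink: &mut *self.sink };
                next.handle(batch, &mut out);
            }
            // End of the chain: the batch leaves the node.
            None => self.sink.push(batch),
        }
    }
}

/// A single dataflow node holding one or more chained operators.
struct ChainedOperator {
    ops: Vec<Box<dyn Operator>>,
}

impl ChainedOperator {
    /// Pushes one batch through every link and returns what leaves the chain.
    fn process(&mut self, batch: Batch) -> Vec<Batch> {
        let mut sink = Vec::new();
        let mut head = ChainCollector { rest: self.ops.as_mut_slice(), sink: &mut sink };
        head.collect(batch);
        sink
    }
}

/// Example operator: adds one to every value.
struct AddOne;
impl Operator for AddOne {
    fn handle(&mut self, batch: Batch, out: &mut dyn Collector) {
        let mapped: Batch = batch.into_iter().map(|x| x + 1).collect();
        out.collect(mapped);
    }
}

/// Example operator: keeps only even values.
struct KeepEven;
impl Operator for KeepEven {
    fn handle(&mut self, batch: Batch, out: &mut dyn Collector) {
        let kept: Batch = batch.into_iter().filter(|x| x % 2 == 0).collect();
        out.collect(kept);
    }
}
```

In this sketch, a chain built from `AddOne` followed by `KeepEven` processes a batch in one call stack, with no buffering between the two operators. That is the property that saves queue memory and the producer/consumer overhead listed above.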

Currently an operator will be chained into its predecessor if:

  • The predecessor is not a source
  • The current node is not a sink
  • The predecessor has exactly one downstream node
  • The current node has exactly one upstream node
  • The edge from the predecessor to the current node is a forward edge
  • The predecessor has the same parallelism as the current node
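
The conditions above can be read as a single predicate over a node, its predecessor, and the edge between them. The following is a sketch under assumed types, not the planner code from this PR: `Role`, `EdgeType` (including the `Shuffle` variant), `Node`, its fields, and `can_chain` are hypothetical names chosen for illustration.

```rust
/// Hypothetical classification of a node in the dataflow graph.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Role {
    Source,
    Operator,
    Sink,
}

/// Hypothetical edge kinds; only forward edges are eligible for chaining.
#[derive(Clone, Copy, PartialEq, Debug)]
enum EdgeType {
    Forward,
    Shuffle,
}

/// Hypothetical summary of the facts the chaining rules look at.
struct Node {
    role: Role,
    parallelism: usize,
    upstream_count: usize,
    downstream_count: usize,
}

/// Returns true if `current` may be chained into `pred`, following the
/// six conditions listed in the PR description.
fn can_chain(pred: &Node, current: &Node, edge: EdgeType) -> bool {
    pred.role != Role::Source                        // predecessor is not a source
        && current.role != Role::Sink                // current node is not a sink
        && pred.downstream_count == 1                // predecessor has one downstream node
        && current.upstream_count == 1               // current node has one upstream node
        && edge == EdgeType::Forward                 // edge is a forward edge
        && pred.parallelism == current.parallelism   // same parallelism
}
```

Under these assumptions, two ordinary operators with parallelism 4 joined by a forward edge would chain, while the same pair joined by a shuffle edge, or a pair whose predecessor is a source, would stay as separate nodes.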

In the future we may relax the restriction on source/sink chaining.

Chaining is initially disabled by default. It can be enabled with the configuration setting pipeline.chaining.enabled = true.

@mwylde mwylde merged commit cde8210 into master Dec 10, 2024
6 checks passed
@mwylde mwylde deleted the chaining branch December 10, 2024 18:15