Broadway.prepare_messages/2 callback always receives exactly one message #131

Closed
jvoegele opened this issue Dec 11, 2023 · 16 comments

@jvoegele

The Broadway docs for the Broadway.prepare_messages/2 callback state:

> The length of the list of messages received by this callback is based on the min_demand/max_demand configuration in the processor.

However, in our application using BroadwayRabbitMQ.Producer, it seems as if the length of the list of messages is always exactly one, regardless of the min_demand/max_demand configuration in the processor. Furthermore, it doesn't seem to matter whether there are only a few messages in the RabbitMQ queue being used or very many. Whenever I IO.inspect(length(messages), label: "prepare_messages batch size") in my prepare_messages callback, it always prints prepare_messages batch size: 1.

Is this a bug in BroadwayRabbitMQ.Producer? Or have I misconfigured something? Or is there something else happening here that I'm not correctly understanding?

For reference, here is the Broadway topology for the pipeline in question:

[
  producers: [
    %{
      name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.Producer,
      concurrency: 1
    }
  ],
  processors: [
    %{
      name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.Processor_default,
      concurrency: 2,
      processor_key: :default
    }
  ],
  batchers: [
    %{
      name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.BatchProcessor_derived_stats,
      concurrency: 4,
      batcher_key: :derived_stats,
      batcher_name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.Batcher_derived_stats
    }
  ]
]

And below is the (elided) configuration being passed to Broadway.start_link. Note that :qos and most other optional configs are not being overridden, and that the processor is configured with min_demand: 10, max_demand: 20, which should result in a batch of 10 messages being passed to prepare_messages.

    producer: [
      module:
        {BroadwayRabbitMQ.Producer,
         [
           bindings: [
             {"***elided exchange***", [routing_key: "***elided***"]}
           ],
           on_failure: :reject,
           metadata: [
             :delivery_tag, :exchange, :routing_key, :content_type,
             :content_encoding, :headers, :correlation_id, :message_id,
             :timestamp, :type, :app_id
           ],
           declare: [durable: true],
           queue: "***elided***",
           connection: "***elided***"
         ]}
    ],
    processors: [
      default: [
        concurrency: 2,
        min_demand: 10,
        max_demand: 20
      ]
    ],
    batchers: [
      derived_stats: [
        concurrency: 4,
        batch_size: 20,
        batch_timeout: 1_000
      ]
    ]

@josevalim
Member

I am almost sure the RabbitMQ producer is push-based, so it is receiving the messages from the AMQP connection. So perhaps it is something that needs to be configured there to receive bigger batches?

@jvoegele
Author

> I am almost sure the RabbitMQ producer is push-based, so it is receiving the messages from the AMQP connection. So perhaps it is something that needs to be configured there to receive bigger batches?

Thank you for the very prompt reply, Jose!

My understanding is that this is controlled by the :prefetch_count (and/or :prefetch_size) configuration, which is part of the :qos option that can be passed to the BroadwayRabbitMQ.Producer. However, in our code we are not explicitly setting any :prefetch_count, or :prefetch_size, or any :qos options, so we are using the default value of prefetch_count: 50.

@whatyouhide
Collaborator

@jvoegele prefetch_count and prefetch_size don't really work on Elixir messages. Those work on RabbitMQ messages and bytes going from RabbitMQ to the AMQP connection (channel, actually) in your application. Then, the channel still delivers messages one by one to the process that is consuming from that channel, which in this case is the BroadwayRabbitMQ.Producer. The producer always produces one message at a time as soon as it gets it:

{:noreply, [message], state}

So, you'll see more than one message only if your processors/batchers start processing messages more slowly, which will result in messages queueing up in the producer (because of demand and all the GenStage stuff) and then being delivered at once.

Does that make sense?
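
To make the buffering concrete, here is a toy model in plain Elixir. It is not GenStage or Broadway code: `DemandModel`, `emit/2`, and `ask/2` are made-up names, and real GenStage demand handling has more moving parts. It only illustrates why a producer that emits one message at a time yields single-message lists while the consumer keeps up, and larger lists once the consumer falls behind.

```elixir
defmodule DemandModel do
  @moduledoc "Toy model of a demand-driven producer buffer (illustration only)."

  # buffer: events waiting in the producer; demand: events the consumer asked for.
  def new, do: %{buffer: [], demand: 0}

  # The producer emits a single event, much like returning
  # {:noreply, [message], state}. Returns {delivered_events, new_state}.
  def emit(state, event) do
    dispatch(%{state | buffer: state.buffer ++ [event]})
  end

  # The consumer asks for `n` more events. Returns {delivered_events, new_state}.
  def ask(state, n) do
    dispatch(%{state | demand: state.demand + n})
  end

  # Deliver as many buffered events as the outstanding demand allows.
  defp dispatch(%{buffer: buffer, demand: demand} = state) do
    count = min(demand, length(buffer))
    {out, rest} = Enum.split(buffer, count)
    {out, %{state | buffer: rest, demand: demand - count}}
  end
end

# Fast consumer: demand is already pending, so every event is delivered alone.
fast = DemandModel.new()
{[], fast} = DemandModel.ask(fast, 10)
{[:m1], fast} = DemandModel.emit(fast, :m1)
{[:m2], _fast} = DemandModel.emit(fast, :m2)

# Slow consumer: no demand is pending, so events pile up in the buffer and
# are delivered together once the consumer asks again.
slow = DemandModel.new()
{[], slow} = DemandModel.emit(slow, :a)
{[], slow} = DemandModel.emit(slow, :b)
{[], slow} = DemandModel.emit(slow, :c)
{[:a, :b, :c], _slow} = DemandModel.ask(slow, 10)
```

In this model the size of a delivered list depends on how many events are buffered when demand arrives, not on how much demand was requested.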

@jvoegele
Copy link
Author

Thank you, @whatyouhide, that does make sense.

Is there anything in the Broadway or BroadwayRabbitMQ docs that explains this in any way? It seems like there's a bit of a disconnect between the Broadway docs that state that prepare_messages batch size is based on min_demand/max_demand of the processor stage, and what you've explained above where that batch size is based on whether the processors and/or batchers are falling behind in processing.

Also, is this behavior consistent with other Broadway producers, or is this a specific "quirk" in the BroadwayRabbitMQ.Producer?

Thank you for your help.

@whatyouhide
Collaborator

whatyouhide commented Dec 11, 2023

> batch size is based on whether the processors and/or batchers are falling behind in processing.

This is not really generic, it's more specific to how AMQP (the lib) works and how the Broadway RabbitMQ producer works, as you mentioned later. But any improvement to the doc is super appreciated, PRs are more than welcome 🙃

@cigrainger

I'm not sure how I didn't come across this before, but I've been struggling with this as well! I sent a PR but now that I see this it's clear that I didn't really clear it up 😬.

Has anyone figured out a workaround? @LostKobrakai suggested here that it might be a matter of adding an aggregator in between? Perhaps a custom producer that uses this producer but pools?

Another option would be to add a size + timeout set of options here to accumulate batches. Would you be open to a PR that does this? The behaviour would be identical to the batcher behaviour in Broadway (send the messages once either N messages have accumulated or the timeout has been reached). The default could be 1 message and 0ms timeout to match current behaviour.

@josevalim
Member

I have pushed more docs to this.

> Another option would be to add a size + timeout set of options here to accumulate batches. Would you be open to a PR that does this?

First we need to discuss why you need this feature. It feels like you want prepare_messages to work as some sort of batcher, but that's not a guarantee that it provides nor one that it can provide. It is a convenience for looking at all messages at once as part of the fan-out stage of your pipeline. If you want to guarantee batching, then you need to move the logic to the batcher.

@josevalim
Member

> First we need to discuss why you need this feature. It feels like you want prepare_messages to work as some sort of batcher, but that's not a guarantee that it provides nor one that it can provide.

For example, even if you tell the producer to batch, it may create a batch of 4, but if you then have four processors, each of them will still receive a single message. We really can't batch effectively at the same time as we fan out.

@cigrainger

This makes sense, but it's because I need to hit the database to prepare messages. Each message is an identifier and I need to fetch data from both Elasticsearch and Postgres before doing work in handle_message. The docs in Broadway suggest this:

> For example, if you need to query the database, instead of doing it once per message, you can do it on this callback.

So I think I'm understanding it as you described: 'a convenience for looking at all messages at once as part of the fan-out stage of your pipeline', but the behaviour today (one message at a time no matter what) precludes that.

> For example, even if you tell the producer to batch, it may create a batch of 4, but if you then have four processors, each of them will still receive a single message. We really can't batch effectively at the same time as we fan out.

This would be fine! 100% expected. Right now I have one processor and no matter what min/max demand I set, no matter what happens, I see one message at a time in prepare_messages.

@cigrainger

It's actually causing ack timeout problems for me that I've had to work around. Because the docs don't really reflect this:

> prefetch_count and prefetch_size don't really work on Elixir messages

I've been trying to increase prefetch_count to no avail. In fact, the docs suggest setting it to max_demand * producers despite the above. What happens for me is that this translates to a bunch of unacked messages that time out. But the alternative is that I try to work with one message at a time coming from the producer and increase throughput by increasing the number of processors, and then my db gets overloaded with single-identifier requests and the whole thing grinds to a halt. So I'm stuck between a rock and a hard place 😬.

I could try to just use a different producer like SQS, but right now we're not running this on AWS and I'd love to keep it that way. What you suggested at the top of the thread:

> So perhaps it is something that needs to be configured there to receive bigger batches?

Makes sense! And the docs say this is possible, but then it seems from the line @whatyouhide referenced above that these opts are basically no-ops?

@josevalim
Member

QoS controls what happens over the network, but not at the message level. In any case it is clear this is a documentation issue. prepare_messages is a best-effort optimization and, in the case of RabbitMQ, it does not happen. You need to move the logic to handle_batch.

@josevalim
Member

Another approach is for you to introduce an intermediate process, called by handle_message, that does very simple batching before reaching out to Elasticsearch or the database.

@cigrainger

cigrainger commented Jan 19, 2024

> You need to move the logic to handle_batch.

If I do that then I lose the ability to direct to different batchers based on information in handle_message -- I noop some messages, send some to another queue, and upsert some to ES.

> Another approach is for you to introduce an intermediate process

I'll try this approach. Similar to Nx.Serving right? Batch size/timeout, keep track of the caller for each message, process the batch, then respond to the caller with the results? This seems pretty efficient -- I think I could go quite wide with the number of processors.

Thanks for the help!

@josevalim
Member

> I'll try this approach. Similar to Nx.Serving right? Batch size/timeout, keep track of the caller for each message, process the batch, then respond to the caller with the results?

Yes, it should be relatively straightforward. You will keep all requests in a list and have a counter.

  1. Start with an empty list and the counter at 0
  2. Once you receive the first entry, start a timeout
  3. Either the counter gets to a limit or the timeout triggers
  4. Once that happens, clear the timer (if any), process the batch, and send the responses back with GenServer.reply/2
  5. Go back to 1

A bonus would be to start a task to process the batch and have the task send the replies back to everyone waiting.
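
The numbered steps can be sketched roughly as follows. This is a minimal sketch, not code from Broadway or from anyone in this thread: the module name `MicroBatcher`, the `fetch/3` function, and the `:fun`, `:max_size`, and `:max_wait` options are all hypothetical, and `:fun` is assumed to take a list of ids and return a map of id to result. It processes the batch inline rather than in a task, so the "bonus" above is left out.

```elixir
defmodule MicroBatcher do
  @moduledoc """
  Sketch of an intermediate batching process. Callers block in `fetch/3`
  until the batch containing their request has been processed.
  """
  use GenServer

  # opts: :fun (required), :max_size, :max_wait (milliseconds), optional :name.
  def start_link(opts) do
    {server_opts, init_opts} = Keyword.split(opts, [:name])
    GenServer.start_link(__MODULE__, init_opts, server_opts)
  end

  def fetch(server, id, timeout \\ 5_000) do
    GenServer.call(server, {:fetch, id}, timeout)
  end

  @impl true
  def init(opts) do
    {:ok,
     %{
       fun: Keyword.fetch!(opts, :fun),
       max_size: Keyword.get(opts, :max_size, 10),
       max_wait: Keyword.get(opts, :max_wait, 50),
       # Step 1: start with an empty list and the counter at 0.
       pending: [],
       count: 0,
       timer: nil
     }}
  end

  @impl true
  def handle_call({:fetch, id}, from, state) do
    # Step 2: the first entry of a batch starts the timeout.
    state = if state.count == 0, do: start_timer(state), else: state
    state = %{state | pending: [{from, id} | state.pending], count: state.count + 1}

    # Step 3: flush as soon as the counter reaches the limit...
    if state.count >= state.max_size do
      {:noreply, flush(state)}
    else
      {:noreply, state}
    end
  end

  @impl true
  # ...or when the timeout belonging to the current batch fires.
  def handle_info({:flush, tag}, %{timer: {_ref, tag}} = state), do: {:noreply, flush(state)}
  # A timeout left over from a batch that was already flushed by size.
  def handle_info({:flush, _stale}, state), do: {:noreply, state}

  defp start_timer(state) do
    tag = make_ref()
    ref = Process.send_after(self(), {:flush, tag}, state.max_wait)
    %{state | timer: {ref, tag}}
  end

  # Step 4: clear the timer, process the batch, reply to every waiting caller.
  defp flush(%{timer: {ref, _tag}} = state) do
    Process.cancel_timer(ref)
    entries = Enum.reverse(state.pending)
    results = state.fun.(Enum.map(entries, fn {_from, id} -> id end))
    Enum.each(entries, fn {from, id} -> GenServer.reply(from, Map.get(results, id)) end)
    # Step 5: go back to an empty list and the counter at 0.
    %{state | pending: [], count: 0, timer: nil}
  end
end
```

From handle_message, each processor would call something like `MicroBatcher.fetch(server, message.data)` and block until its batch has been processed. Because every caller blocks, a batch can never hold more entries than there are processors calling concurrently, so `:max_size` only makes sense at or below the processor concurrency. If `:fun` raises, the server crashes and the waiting callers exit, which a real implementation would want to handle.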

@josevalim
Member

I will close this, as I pushed docs to broadway. :)

@cigrainger

Thanks again! The above will definitely work for us, nearly got it going already :). 100% a documentation issue.


4 participants