consume as part of consumer group consumes more than max-messages #198

Open
bzakhar opened this issue May 23, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@bzakhar

bzakhar commented May 23, 2024

Current behavior:

kafkactl consume -g group_name --max-messages 1 topic.name

advances the consumer offset of group_name by more than 1 while outputting only one message. Simple reproducer:

$ kafkactl create topic test.topic -p 1
$ for i in {1..100}; do kafkactl produce test.topic -s -v $i; done
$ kafkactl consume -g test_cg --from-beginning --max-messages 1 test.topic
$ kafkactl describe cg test_cg
$ kafkactl consume -g test_cg --max-messages 1 test.topic
$ kafkactl describe cg test_cg
# repeat a few times
# ...

You will notice that messages are skipped: the consumer group offset advances by more than 1 after each invocation of kafkactl consume. In my case:

$ kafkactl create topic test.topic -p 1
$ for i in {1..100}; do kafkactl produce test.topic -s -v $i; done
$ kafkactl consume --from-beginning -g test_cg --max-messages 1 test.topic
1
$ kafkactl describe cg test_cg
TOPIC          PARTITION     NEWEST_OFFSET     OLDEST_OFFSET     CONSUMER_OFFSET     LEAD     LAG
test.topic     0             100               0                 2                   2        98

CLIENT_HOST     CLIENT_ID     TOPIC     ASSIGNED_PARTITIONS

$ kafkactl consume -g test_cg --max-messages 1 test.topic
3
$ kafkactl describe cg test_cg
TOPIC          PARTITION     NEWEST_OFFSET     OLDEST_OFFSET     CONSUMER_OFFSET     LEAD     LAG
test.topic     0             100               0                 4                   4        96

CLIENT_HOST     CLIENT_ID     TOPIC     ASSIGNED_PARTITIONS

$ kafkactl consume -g test_cg --max-messages 1 test.topic
5
$ kafkactl describe cg test_cg
TOPIC          PARTITION     NEWEST_OFFSET     OLDEST_OFFSET     CONSUMER_OFFSET     LEAD     LAG
test.topic     0             100               0                 7                   7        93

CLIENT_HOST     CLIENT_ID     TOPIC     ASSIGNED_PARTITIONS

$ kafkactl consume -g test_cg --max-messages 1 test.topic
8
# with -V; unimportant output omitted, but note the "drop message" log entries
[kafkactl] 2024/05/23 18:18:56 group consumer initialized
[kafkactl] 2024/05/23 18:18:56 waiting for group consumer
[sarama  ] 2024/05/23 18:18:56 consumer/broker/2 accumulated 1 new subscriptions
[sarama  ] 2024/05/23 18:18:56 consumer/broker/2 added subscription to test.topic/0
10
[kafkactl] 2024/05/23 18:18:56 drop message
[kafkactl] 2024/05/23 18:18:56 stop consume claim via channel
[kafkactl] 2024/05/23 18:18:56 drop message
[sarama  ] 2024/05/23 18:18:56 consumergroup/test_cg loop check partition number goroutine will exit, topics [test.topic]
[sarama  ] 2024/05/23 18:18:56 consumer/broker/2 closed dead subscription to test.topic/0
[sarama  ] 2024/05/23 18:18:56 consumergroup/session/kafkactl-minio-57df5166-e975-42f5-99dc-f30d0fd4eaeb/9 heartbeat loop stopped
[sarama  ] 2024/05/23 18:18:56 consumergroup/session/kafkactl-minio-57df5166-e975-42f5-99dc-f30d0fd4eaeb/9 released
[kafkactl] 2024/05/23 18:18:56 waiting for deserialization
[kafkactl] 2024/05/23 18:18:56 deserialization finished
[kafkactl] 2024/05/23 18:18:56 closing consumer
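The number of skipped messages per run can be read off the transcript above. The following is a minimal sketch of that arithmetic, assuming the first run starts at offset 0 (new group, --from-beginning, OLDEST_OFFSET 0) and each later run starts at the previously reported CONSUMER_OFFSET. The `run` type and `skipped` helper are illustrative names, not part of kafkactl.

```go
package main

import "fmt"

// run records one invocation from the transcript (hypothetical helper type).
type run struct {
	startOffset     int64 // committed offset before the invocation (assumed)
	printed         int64 // number of messages kafkactl printed
	committedOffset int64 // CONSUMER_OFFSET reported by describe cg afterwards
}

// skipped returns how many messages were marked as consumed without being printed.
func skipped(r run) int64 {
	return r.committedOffset - r.startOffset - r.printed
}

func main() {
	// Values taken from the three describe cg outputs above.
	runs := []run{
		{startOffset: 0, printed: 1, committedOffset: 2},
		{startOffset: 2, printed: 1, committedOffset: 4},
		{startOffset: 4, printed: 1, committedOffset: 7},
	}
	for i, r := range runs {
		fmt.Printf("run %d: offset %d -> %d, printed %d, skipped %d\n",
			i+1, r.startOffset, r.committedOffset, r.printed, skipped(r))
	}
}
```

For the three runs this gives 1, 1 and 2 skipped messages, which matches the printed values 1, 3, 5 and 8.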

Expected behavior:

Each invocation of kafkactl consume with the --max-messages parameter and a consumer group advances that consumer group's offset by no more than the value of --max-messages, so that no messages are skipped during consumption.

Version tested:

cmd.info{version:"v5.0.6", buildTime:"2024-03-14T10:08:45Z", gitCommit:"d7f78e0", goVersion:"go1.21.8", compiler:"gc", platform:"linux/amd64"}

d-rk added the bug label Jun 4, 2024
@bzakhar
Author

bzakhar commented Sep 16, 2024

Is this bug planned to be addressed any time soon? I do not know enough Go to figure out how to fix it myself, but it looks like draining the channel after consuming max-messages is intentional (see the "drop message" log entries), and simply not draining it might fix the issue.
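To make the hypothesis concrete, here is a small standard-library simulation. It is not kafkactl's actual code, and whether kafkactl really marks drained messages is an assumption. The `message` type, `consume` and `prefetched` are hypothetical names. The sketch only shows that if messages drained after the limit are also marked, the committed offset runs ahead of what was printed.

```go
package main

import "fmt"

// message is a stand-in for a fetched Kafka record (hypothetical type).
type message struct {
	offset int64
	value  string
}

// consume reads msgs until the channel is closed, prints at most limit messages
// and returns the offset that would be committed (last marked offset + 1).
// With markDropped set, messages drained after the limit are marked as well,
// which models the behavior suspected in this issue.
func consume(msgs <-chan message, start int64, limit int, markDropped bool) (printed []string, commit int64) {
	commit = start
	for m := range msgs {
		if len(printed) < limit {
			printed = append(printed, m.value)
			commit = m.offset + 1
			continue
		}
		// Limit reached: this message is dropped without being printed.
		if markDropped {
			commit = m.offset + 1
		}
	}
	return printed, commit
}

// prefetched returns a closed channel holding n messages starting at offset start,
// imitating records the client had already fetched when the limit was hit.
func prefetched(start int64, n int) <-chan message {
	ch := make(chan message, n)
	for i := 0; i < n; i++ {
		off := start + int64(i)
		ch <- message{offset: off, value: fmt.Sprint(off + 1)}
	}
	close(ch)
	return ch
}

func main() {
	_, marked := consume(prefetched(0, 3), 0, 1, true)
	_, unmarked := consume(prefetched(0, 3), 0, 1, false)
	fmt.Println("commit when dropped messages are marked:", marked)
	fmt.Println("commit when only printed messages are marked:", unmarked)
}
```

Under these assumptions the drain itself is harmless; what matters is that only printed messages are marked before the offset is committed.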

@d-rk
Collaborator

d-rk commented Sep 17, 2024

The problem is not that easy to solve: Suppose you have a topic with 4 partitions and you want to read only a single message. In order to know on which partition the next message was produced, you will have to read one message from all partitions and then compare their timestamps. Finally, you have to mark only one of these messages as consumed.

This will lead to a bigger change in how the consumption of messages is implemented. Hence, I cannot predict when I will have time to look into it.

@bzakhar
Author

bzakhar commented Sep 17, 2024

"In order to know on which partition the next message was produced, you will have to read one message from all partitions and then compare their timestamps. Finally, you have to mark only one of these messages as consumed." That is not how it works. The broker decides where a returning consumer should pick up, based on where it left off and the order of messages in the topic. In other words, the logic you describe is implemented on the broker side; consumers don't have to bother with it.

If there is just one consumer in the consumer group, it will consume messages from a topic in order of arrival regardless of whether the topic is partitioned. If there are multiple consumers in a group, order at the topic level simply can't be guaranteed (but it is still guaranteed within each partition). Ordered consumption from a topic requires a single partition, or a key that constrains keyed messages to a single partition where order is guaranteed. This is well-known behavior.

The use case here is really simple: read messages from a topic in whatever order the broker returns them and stop after reading N. There is no assumption or expectation that this will be done concurrently in more than one invocation of the program. If it is, the invoker is expected to understand the relationship between multiple consumers in a group and multiple partitions in a topic, and how this affects the order of consumption.
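The use case described in this comment can be sketched without a Kafka client. The snippet below is a hedged illustration, not a proposed patch: `delivered` and `takeN` are hypothetical names, and the slice stands in for records in whatever order the client happened to receive them. The point is that offsets are tracked per partition and cover emitted records only.

```go
package main

import "fmt"

// delivered is a stand-in for a record as handed over by the client (hypothetical type).
type delivered struct {
	partition int32
	offset    int64
	value     string
}

// takeN emits the first n records in delivery order and returns, per partition,
// the next offset to commit. Records after the n-th are never marked.
func takeN(stream []delivered, n int) (emitted []string, commits map[int32]int64) {
	commits = make(map[int32]int64)
	for _, r := range stream {
		if len(emitted) == n {
			break // stop without marking anything that was not emitted
		}
		emitted = append(emitted, r.value)
		commits[r.partition] = r.offset + 1
	}
	return emitted, commits
}

func main() {
	stream := []delivered{
		{partition: 0, offset: 5, value: "a"},
		{partition: 2, offset: 9, value: "b"},
		{partition: 0, offset: 6, value: "c"},
		{partition: 1, offset: 3, value: "d"},
		{partition: 3, offset: 0, value: "e"},
	}
	emitted, commits := takeN(stream, 3)
	fmt.Println("emitted:", emitted)
	fmt.Println("offsets to commit per partition:", commits)
}
```

Partitions whose records were received but not emitted do not appear in the commit map, so their offsets stay where they were and the next invocation can pick those records up.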
