-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[POC - 3] New commit interface #1042
base: master
Are you sure you want to change the base?
Conversation
0fb7fcb
to
a2f446e
Compare
.flatMap(offsetBatch => | ||
commitTransactionWithOffsets( | ||
offsetBatch, | ||
consumerGroupMetadata = None // TODO Jules |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO @guizmaii: How to pass this information here?
@@ -219,23 +214,16 @@ private[consumer] final class Runloop private ( | |||
if (streams.isEmpty) ZIO.succeed(fulfillResult) | |||
else { | |||
for { | |||
consumerGroupMetadata <- getConsumerGroupMetadataIfAny | |||
/*consumerGroupMetadata*/ _ <- getConsumerGroupMetadataIfAny // TODO Jules: Use: how? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO @guizmaii: Find how to pass this information to the transactional producer
da3ed50
to
b7bc8d7
Compare
} | ||
.transduce(Consumer.offsetBatches) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO @guizmaii: What was .tranduce
doing?
b7bc8d7
to
b1ade41
Compare
@inline | ||
private[zio] def topicPartition(record: ConsumerRecord[_, _]): TopicPartition = | ||
new TopicPartition(record.topic(), record.partition()) | ||
|
||
private[zio] def deserializeWith[R, K, V]( | ||
keyDeserializer: Deserializer[R, K], | ||
valueDeserializer: Deserializer[R, V] | ||
)(record: ConsumerRecord[Array[Byte], Array[Byte]]): RIO[R, ConsumerRecord[K, V]] = | ||
for { | ||
key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key()) | ||
value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value()) | ||
} yield new ConsumerRecord[K, V]( | ||
record.topic(), | ||
record.partition(), | ||
record.offset(), | ||
record.timestamp(), | ||
record.timestampType(), | ||
record.serializedKeySize(), | ||
record.serializedValueSize(), | ||
key, | ||
value, | ||
record.headers(), | ||
record.leaderEpoch() | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO @guizmaii: Move?
1ae11f0
to
ad7fb69
Compare
fc9b7e9
to
2b88dce
Compare
2b88dce
to
a60db8e
Compare
@@ -30,11 +31,11 @@ private[consumer] final class RunloopAccess private ( | |||
diagnostics: Diagnostics | |||
) { | |||
|
|||
private def withRunloopZIO[E]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO @guizmaii: Put back previous signature
@@ -37,6 +38,14 @@ trait Consumer { | |||
timeout: Duration = Duration.Infinity | |||
): Task[Map[TopicPartition, Long]] | |||
|
|||
def commit(record: ConsumerRecord[_, _]): Task[Unit] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is 6 extra methods. I propose we commit only Offset
like objects, not entire ConsumerRecords. Semantically that is better anyway, to 'commit an offset' instead of 'commit a record', the latter suggesting you can ack single records.
It would be nice to have a supertype for the Offset
and the OffsetBatch
, maybe CommittableOffset
. Alpakka has Committable
for the same concept.
@guizmaii Okay with you if we close this PR, being over a year old? I'd still like to explore a |
No description provided.