Skip to content

Commit

Permalink
Fix NPE due to negative offset
Browse files Browse the repository at this point in the history
  • Loading branch information
discanto committed Sep 2, 2020
1 parent be35886 commit 6556783
Showing 1 changed file with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package de.azapps.kafkabackup.common.offset;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.azapps.kafkabackup.sink.BackupSinkTask;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -19,6 +22,7 @@
import java.util.stream.Collectors;

public class OffsetSink {
private static final Logger log = LoggerFactory.getLogger(OffsetSink.class);
private final Path targetDir;
private final Map<TopicPartition, OffsetStoreFile> topicOffsets = new HashMap<>();
private List<String> consumerGroups = new ArrayList<>();
Expand Down Expand Up @@ -62,6 +66,10 @@ private void syncOffsetsForGroup(String consumerGroup) throws IOException {
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicOffsetsAndMetadata.entrySet()) {
TopicPartition tp = entry.getKey();
OffsetAndMetadata offsetAndMetadata = entry.getValue();
if (offsetAndMetadata == null) {
log.warn("OffsetAndMetadata not available, negative offset? for tp {}", tp);
return;
}

if (validTopic(tp.topic())) {
if (!this.topicOffsets.containsKey(tp)) {
Expand Down

0 comments on commit 6556783

Please sign in to comment.