
Commit 1272609
[INLONG-11386][TubeMQ] Use local files to save consumer group offset information (#11395)

* [INLONG-11386][TubeMQ] Use local files to save consumer group offset information


Co-authored-by: gosonzhang <[email protected]>
gosonzhang authored Oct 23, 2024
1 parent 8ec51cb commit 1272609
Showing 31 changed files with 2,425 additions and 357 deletions.
8 changes: 0 additions & 8 deletions docker/kubernetes/templates/tubemq-broker-ini-configmap.yaml
@@ -38,12 +38,4 @@ data:
loadMessageStoresInParallel=true
consumerRegTimeoutMs=35000
-[zookeeper]
-zkNodeRoot=/tubemq
-zkServerAddr={{ template "inlong.zookeeper.hostname" . }}:{{ .Values.zookeeper.ports.client }}
-zkSessionTimeoutMs=30000
-zkConnectionTimeoutMs=30000
-zkSyncTimeMs=5000
-zkCommitPeriodMs=5000
-zkCommitFailRetries=10
{{- end }}
17 changes: 0 additions & 17 deletions inlong-tubemq/conf/broker.ini
@@ -41,23 +41,6 @@ loadMessageStoresInParallel=true
; timeout of consumer heartbeat, optional; default is 30s
consumerRegTimeoutMs=35000


-[zookeeper]
-; root path of TubeMQ znodes on ZK
-zkNodeRoot=/tubemq
-; connect string of ZK servers
-zkServerAddr=localhost:2181
-; timeout of ZK heartbeat; default is 30000ms
-zkSessionTimeoutMs=30000
-; timeout of ZK connection; default is 30000ms
-zkConnectionTimeoutMs=30000
-; sync time on ZK; default is 5000ms
-zkSyncTimeMs=5000
-; interval to commits data on ZK; default is 5000ms
-zkCommitPeriodMs=5000
-; maximum retry times when commits data on ZK fails
-zkCommitFailRetries=10

[audit]
; whether to enable data report by audit sdk
auditEnable=false
@@ -84,4 +84,8 @@ public class TBaseConstants {
public static final long CFG_MIN_META_FORCE_UPDATE_PERIOD = 1 * 60 * 1000;
public static final long CFG_STATS_MIN_SNAPSHOT_PERIOD_MS = 2000;

public static final int OFFSET_HISTORY_RECORD_SHORT_VERSION = 2;
public static final int OFFSET_HISTORY_RECORD_VERSION_3 = 3;

public static final int OFFSET_TOPIC_PUBLISH_RECORD_VERSION_1 = 1;
}
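The constants above version the broker's offset-record formats, so files written by an older broker stay readable after an upgrade. As a rough illustration of how such version tags are typically used (the OffsetRecordVersions helper and the string layout below are hypothetical, not code from this commit):

import org.apache.inlong.tubemq.corebase.TBaseConstants;

// Hypothetical sketch of version-tagged record handling; only the
// TBaseConstants values come from this commit.
public final class OffsetRecordVersions {

    // Stamp newly written offset-history records with the latest version.
    public static String encode(String recordBody) {
        return TBaseConstants.OFFSET_HISTORY_RECORD_VERSION_3 + ":" + recordBody;
    }

    // Readers accept both the short (v2) and the current (v3) layouts,
    // so records written before an upgrade remain parseable.
    public static boolean isSupported(int version) {
        return version == TBaseConstants.OFFSET_HISTORY_RECORD_SHORT_VERSION
                || version == TBaseConstants.OFFSET_HISTORY_RECORD_VERSION_3;
    }

    private OffsetRecordVersions() {
    }
}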
@@ -36,6 +36,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;

import static java.lang.Math.abs;

/**
@@ -58,6 +60,12 @@ public class BrokerConfig extends AbstractFileConfig {
// master service address
private String masterAddressList;
private String primaryPath;
private boolean enableWriteOffset2Zk = false;
private String offsetStgFilePath;
private long grpOffsetStgExpMs = TServerConstants.CFG_DEF_GROUP_OFFSETS_STG_EXPIRED_DUR_MS;
private long offsetStgCacheFlushMs = 5000L;
private long offsetStgFileSyncMs = offsetStgCacheFlushMs + 1000L;
private long offsetStgSyncDurWarnMs = 20000L;
// tcp write service thread count
private int tcpWriteServiceThread =
Runtime.getRuntime().availableProcessors() * 2;
@@ -112,7 +120,7 @@ public class BrokerConfig extends AbstractFileConfig {
private int rowLockWaitDurMs =
TServerConstants.CFG_ROWLOCK_DEFAULT_DURATION;
// zookeeper config
-private ZKConfig zkConfig = new ZKConfig();
+private ZKConfig zkConfig;
// tls config
private TLSConfig tlsConfig = new TLSConfig();
// audit configure
@@ -214,7 +222,7 @@ public long getGroupOffsetScanDurMs() {
}

@Override
-protected void loadFileSectAttributes(final Ini iniConf) {
+protected void loadFileSectAttributes(Ini iniConf) {
this.loadBrokerSectConf(iniConf);
this.tlsConfig = this.loadTlsSectConf(iniConf,
TBaseConstants.META_DEFAULT_BROKER_TLS_PORT);
@@ -258,10 +266,58 @@ private void loadBrokerSectConf(final Ini iniConf) {
throw new IllegalArgumentException("Require primaryPath not Blank!");
}
this.primaryPath = brokerSect.get("primaryPath").trim();
if (this.primaryPath.endsWith(File.separator)) {
this.primaryPath = this.primaryPath.substring(0,
this.primaryPath.length() - File.separator.length());
if (this.primaryPath.isEmpty()) {
throw new IllegalArgumentException(new StringBuilder(256)
.append("Parameter primaryPath(")
.append(brokerSect.get("primaryPath").trim())
.append(" only include separator! ").toString());
}
}
if (TStringUtils.isBlank(brokerSect.get("hostName"))) {
throw new IllegalArgumentException(new StringBuilder(256).append("hostName is null or Blank in ")
.append(SECT_TOKEN_BROKER).append(" section!").toString());
}
// enableWriteOffset2Zk
if (TStringUtils.isNotBlank(brokerSect.get("enableWriteOffset2Zk"))) {
this.enableWriteOffset2Zk = getBoolean(brokerSect, "enableWriteOffset2Zk");
}
if (TStringUtils.isBlank(brokerSect.get("offsetStgFilePath"))) {
this.offsetStgFilePath = this.primaryPath;
} else {
this.offsetStgFilePath = brokerSect.get("offsetStgFilePath").trim();
if (this.offsetStgFilePath.endsWith(File.separator)) {
this.offsetStgFilePath = this.offsetStgFilePath.substring(0,
this.offsetStgFilePath.length() - File.separator.length());
if (this.offsetStgFilePath.isEmpty()) {
throw new IllegalArgumentException(new StringBuilder(256)
.append("Parameter offsetStgFilePath(")
.append(brokerSect.get("offsetStgFilePath").trim())
.append(" only include separator! ").toString());
}
}
}
if (TStringUtils.isNotBlank(brokerSect.get("offsetStgCacheFlushMs"))) {
this.offsetStgCacheFlushMs =
Math.min(getLong(brokerSect, "offsetStgCacheFlushMs"), 1000L);
this.offsetStgFileSyncMs = this.offsetStgCacheFlushMs + 1000L;
}
if (TStringUtils.isNotBlank(brokerSect.get("grpOffsetStgExpMs"))) {
this.grpOffsetStgExpMs = Math.min(getLong(brokerSect, "grpOffsetStgExpMs"),
TServerConstants.CFG_MIN_GROUP_OFFSETS_STG_EXPIRED_DUR_MS);
}
if (TStringUtils.isNotBlank(brokerSect.get("offsetStgFileSyncMs"))) {
this.offsetStgFileSyncMs =
Math.min(getLong(brokerSect, "offsetStgFileSyncMs"),
this.offsetStgCacheFlushMs + 1000L);
}
if (TStringUtils.isNotBlank(brokerSect.get("offsetStgSyncDurWarnMs"))) {
this.offsetStgSyncDurWarnMs =
Math.min(getLong(brokerSect, "offsetStgSyncDurWarnMs"),
this.offsetStgFileSyncMs + 1000L);
}
if (TStringUtils.isNotBlank(brokerSect.get("defEthName"))) {
this.defEthName = brokerSect.get("defEthName").trim();
}
@@ -522,4 +578,27 @@ public String getMasterAddressList() {
return masterAddressList;
}

public boolean isEnableWriteOffset2Zk() {
return enableWriteOffset2Zk;
}

public String getOffsetStgFilePath() {
return offsetStgFilePath;
}

public long getGrpOffsetStgExpMs() {
return grpOffsetStgExpMs;
}

public long getOffsetStgCacheFlushMs() {
return offsetStgCacheFlushMs;
}

public long getOffsetStgFileSyncMs() {
return offsetStgFileSyncMs;
}

public long getOffsetStgSyncDurWarnMs() {
return offsetStgSyncDurWarnMs;
}
}
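These settings are read from the [broker] section of broker.ini (via brokerSect above). From the field initializers, the defaults are: cache flush 5000ms, file sync 6000ms, sync-duration warning 20000ms. The following is a minimal sketch of the ordering the floors enforce, assuming the Math.max floor semantics above; the standalone validator class is not part of the commit:

// Illustrative check of the timing relationships implied by the config
// loading above: cacheFlushMs >= 1000, fileSyncMs >= cacheFlushMs + 1000,
// warnMs >= fileSyncMs + 1000. This class is a hypothetical sketch.
public final class OffsetStgTimingCheck {

    public static void validate(long cacheFlushMs, long fileSyncMs, long warnMs) {
        if (cacheFlushMs < 1000L) {
            throw new IllegalArgumentException("offsetStgCacheFlushMs must be >= 1000ms");
        }
        if (fileSyncMs < cacheFlushMs + 1000L) {
            throw new IllegalArgumentException(
                    "offsetStgFileSyncMs must be >= offsetStgCacheFlushMs + 1000ms");
        }
        if (warnMs < fileSyncMs + 1000L) {
            throw new IllegalArgumentException(
                    "offsetStgSyncDurWarnMs must be >= offsetStgFileSyncMs + 1000ms");
        }
    }

    private OffsetStgTimingCheck() {
    }
}

With the defaults above, validate(5000L, 6000L, 20000L) passes: the cache is flushed first, the file is synced shortly after, and a warning fires only when a sync takes far longer than expected.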
@@ -54,7 +54,7 @@
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
import org.apache.inlong.tubemq.server.broker.offset.OffsetService;
-import org.apache.inlong.tubemq.server.broker.offset.offsetstorage.OffsetStorageInfo;
+import org.apache.inlong.tubemq.server.broker.offset.OffsetStorageInfo;
import org.apache.inlong.tubemq.server.broker.stats.BrokerSrvStatsHolder;
import org.apache.inlong.tubemq.server.broker.stats.TrafficStatsService;
import org.apache.inlong.tubemq.server.broker.stats.audit.AuditUtils;
@@ -60,6 +60,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -128,8 +129,8 @@ public TubeBroker(final BrokerConfig tubeConfig) throws Exception {
this.tubeConfig = tubeConfig;
this.brokerId = generateBrokerClientId();
BrokerJMXHolder.registerMXBean();
-this.metadataManager = new BrokerMetadataManager();
-this.offsetManager = new DefaultOffsetManager(tubeConfig);
+this.metadataManager = new BrokerMetadataManager(tubeConfig.getGrpOffsetStgExpMs());
+this.offsetManager = new DefaultOffsetManager(tubeConfig, this.metadataManager);
this.storeManager = new MessageStoreManager(this, tubeConfig);
this.serverAuthHandler = new SimpleCertificateBrokerHandler(this);
this.offsetRecordService =
@@ -227,6 +228,7 @@ public void start() throws Exception {
this.shutdown.set(false);
// register to master, and heartbeat to master.
this.register2Master();
this.offsetManager.start();
this.scheduledExecutorService.scheduleAtFixedRate(
new Runnable() {

@@ -403,7 +405,8 @@ private void procConfigFromHeartBeat(StringBuilder strBuff,

@Override
public void run() {
-storeManager.removeTopicStore();
+Set<String> removedTopics = storeManager.removeTopicStore();
+offsetManager.cleanRmvTopicInfo(removedTopics);
}
}.start();
}
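The start() path now brings up the offset manager explicitly, and the config-reload path pairs store removal with offset cleanup. Below is a condensed sketch of that handoff, using the removeTopicStore() and cleanRmvTopicInfo() signatures from this diff; the wrapper class itself is illustrative only:

import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.inlong.tubemq.server.broker.offset.OffsetService;

import java.util.Set;

// Condensed view of the remove-then-clean coordination shown above;
// this wrapper class is a sketch, not part of the commit.
final class RemovedTopicCleaner {

    private final MessageStoreManager storeManager;
    private final OffsetService offsetManager;

    RemovedTopicCleaner(MessageStoreManager storeManager, OffsetService offsetManager) {
        this.storeManager = storeManager;
        this.offsetManager = offsetManager;
    }

    void removeAndClean() {
        // removeTopicStore() now reports which topics were actually removed;
        // it may return null when another removal pass is already running.
        Set<String> removedTopics = storeManager.removeTopicStore();
        if (removedTopics != null && !removedTopics.isEmpty()) {
            // drop the cached consumer-group offset records for those topics
            offsetManager.cleanRmvTopicInfo(removedTopics);
        }
    }
}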
@@ -69,9 +69,11 @@ public class BrokerMetadataManager implements MetadataManager {
private final Map<String/* topic */, TopicMetadata> removedTopicConfigMap =
new ConcurrentHashMap<>();
private long lastRptBrokerMetaConfId = 0;
// group offset storage expire ms
private final long grpOffsetStgExpMs;

-public BrokerMetadataManager() {
-
+public BrokerMetadataManager(long grpOffsetStgExpMs) {
+    this.grpOffsetStgExpMs = grpOffsetStgExpMs;
}

@Override
@@ -109,6 +111,11 @@ public List<String> getTopics() {
return topics;
}

@Override
public boolean isTopicExisted(String topicName) {
return topics.contains(topicName);
}

@Override
public String getDefDeletePolicy() {
return brokerDefMetadata.getDeletePolicy();
@@ -173,6 +180,11 @@ public Map<String, TopicMetadata> getRemovedTopicConfigMap() {
return removedTopicConfigMap;
}

@Override
public long getGrpOffsetsStgExpMs() {
return grpOffsetStgExpMs;
}

/**
* Get hard removed topics. Hard removed means the disk files are deleted and cannot be recovered.
* Topics are deleted in two phases: first the topic's files are marked for deletion, then the disk files are removed.
@@ -46,6 +46,8 @@ void addPropertyChangeListener(String propertyName,

List<String> getTopics();

boolean isTopicExisted(String topicName);

TopicMetadata getTopicMetadata(String topic);

BrokerDefMetadata getBrokerDefMetadata();
@@ -83,4 +85,6 @@ void addPropertyChangeListener(String propertyName,
String getTopicDeletePolicy(String topic);

Map<String, TopicMetadata> getTopicConfigMap();

long getGrpOffsetsStgExpMs();
}
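The two interface additions give offset storage what it needs to expire stale records: a per-topic liveness check and the configured retention window. A hedged sketch of how a caller might combine them follows; the sweeper class and the shape of its record map are hypothetical, only the MetadataManager methods come from this diff:

import org.apache.inlong.tubemq.server.broker.metadata.MetadataManager;

import java.util.Map;

// Hypothetical use of the new interface methods: drop offset records whose
// topic no longer exists once they are older than the configured expiry.
final class StaleOffsetSweeper {

    private final MetadataManager metadataManager;

    StaleOffsetSweeper(MetadataManager metadataManager) {
        this.metadataManager = metadataManager;
    }

    // lastUpdateMsByTopic: last offset-update timestamp per topic;
    // offsetRecords: the store's per-topic offset records (shape assumed).
    void sweep(Map<String, Long> lastUpdateMsByTopic, Map<String, ?> offsetRecords) {
        long expMs = metadataManager.getGrpOffsetsStgExpMs();
        long now = System.currentTimeMillis();
        for (Map.Entry<String, Long> entry : lastUpdateMsByTopic.entrySet()) {
            if (!metadataManager.isTopicExisted(entry.getKey())
                    && now - entry.getValue() > expMs) {
                offsetRecords.remove(entry.getKey());
            }
        }
    }
}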
@@ -28,8 +28,7 @@
import org.apache.inlong.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.offset.OffsetCsmRecord;
-import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
+import org.apache.inlong.tubemq.server.broker.offset.topicpub.TopicPubInfo;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.broker.utils.TopicPubStoreInfo;
import org.apache.inlong.tubemq.server.common.TStatusConstants;
@@ -199,16 +198,15 @@ public void close() {
}

@Override
-public List<String> removeTopicStore() {
+public Set<String> removeTopicStore() {
if (isRemovingTopic.get()) {
return null;
}
if (!isRemovingTopic.compareAndSet(false, true)) {
return null;
}
try {
-List<String> removedTopics =
-        new ArrayList<>();
+Set<String> removedTopics = new HashSet<>();
Map<String, TopicMetadata> removedTopicMap =
this.metadataManager.getRemovedTopicConfigMap();
if (removedTopicMap.isEmpty()) {
@@ -411,8 +409,8 @@ public Map<String, ConcurrentHashMap<Integer, MessageStore>> getMessageStores()
@Override
public Map<String, Map<Integer, TopicPubStoreInfo>> getTopicPublishInfos(
Set<String> topicSet) {
-MessageStore store = null;
-TopicMetadata topicMetadata = null;
+MessageStore store;
+TopicMetadata topicMetadata;
Set<String> qryTopicSet = new HashSet<>();
Map<String, Map<Integer, TopicPubStoreInfo>> topicPubStoreInfoMap = new HashMap<>();
Map<String, TopicMetadata> confTopicInfo = metadataManager.getTopicConfigMap();
@@ -460,45 +458,39 @@ public Map<String, Map<Integer, TopicPubStoreInfo>> getTopicPublishInfos(
}

/**
- * Query topic's publish info.
+ * Query all topics' publish info.
 *
- * @param groupOffsetMap query's topic set
 * @return the current topic's offset info
 *
 */
@Override
-public void getTopicPublishInfos(Map<String, OffsetHistoryInfo> groupOffsetMap) {
-    MessageStore store = null;
-    Map<Integer, MessageStore> storeMap;
-    Map<String, Map<Integer, OffsetCsmRecord>> topicOffsetMap;
-    for (Map.Entry<String, OffsetHistoryInfo> entry : groupOffsetMap.entrySet()) {
-        if (entry == null || entry.getKey() == null || entry.getValue() == null) {
-            continue;
-        }
-        topicOffsetMap = entry.getValue().getOffsetMap();
-        // Get offset records by topic
-        for (Map.Entry<String, Map<Integer, OffsetCsmRecord>> entryTopic : topicOffsetMap.entrySet()) {
-            if (entryTopic == null
-                    || entryTopic.getKey() == null
-                    || entryTopic.getValue() == null) {
-                continue;
-            }
-            // Get message store instance
-            storeMap = dataStores.get(entryTopic.getKey());
-            if (storeMap == null) {
-                continue;
-            }
-            for (Map.Entry<Integer, OffsetCsmRecord> entryRcd : entryTopic.getValue().entrySet()) {
-                store = storeMap.get(entryRcd.getValue().getStoreId());
-                if (store == null) {
-                    continue;
-                }
-                // Append current max, min offset
-                entryRcd.getValue().addStoreInfo(store.getIndexMinOffset(),
-                        store.getIndexMaxOffset(), store.getDataMinOffset(),
-                        store.getDataMaxOffset());
-            }
-        }
-    }
-}
+public Map<String, TopicPubInfo> getTopicPublishInfos() {
+    TopicPubInfo topicPubInfo;
+    Map<String, TopicPubInfo> result = new HashMap<>();
+    for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry : dataStores.entrySet()) {
+        if (entry == null
+                || entry.getKey() == null
+                || entry.getValue() == null
+                || entry.getValue().isEmpty()) {
+            continue;
+        }
+        for (Map.Entry<Integer, MessageStore> entry1 : entry.getValue().entrySet()) {
+            if (entry1 == null
+                    || entry1.getKey() == null
+                    || entry1.getValue() == null) {
+                continue;
+            }
+            topicPubInfo = result.get(entry.getKey());
+            if (topicPubInfo == null) {
+                topicPubInfo = new TopicPubInfo(entry.getKey());
+                result.put(entry.getKey(), topicPubInfo);
+            }
+            topicPubInfo.addStorePubInfo(entry1.getKey(), entry1.getValue().getPartitionNum(),
+                    entry1.getValue().getIndexMinOffset(), entry1.getValue().getIndexMaxOffset(),
+                    entry1.getValue().getDataMinOffset(), entry1.getValue().getDataMaxOffset());
+        }
+    }
+    return result;
+}

private Set<File> getLogDirSet(final BrokerConfig tubeConfig) throws IOException {
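With the no-arg getTopicPublishInfos() in place, a caller can take a whole-broker publish-offset snapshot in one call, which is what the file-based offset history needs. A hedged usage sketch follows; only the method and TopicPubInfo come from this commit, the snapshot printer and TopicPubInfo's string form are assumptions:

import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.inlong.tubemq.server.broker.offset.topicpub.TopicPubInfo;

import java.util.Map;

// Illustrative consumer of the new no-arg getTopicPublishInfos().
final class PubInfoSnapshot {

    static void dump(MessageStoreManager storeManager) {
        Map<String, TopicPubInfo> pubInfos = storeManager.getTopicPublishInfos();
        for (Map.Entry<String, TopicPubInfo> entry : pubInfos.entrySet()) {
            // each TopicPubInfo aggregates per-store index/data offset ranges
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}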
… (remaining changed files not shown)
