Skip to content

Commit

Permalink
consume topics by topic group
Browse files Browse the repository at this point in the history
  • Loading branch information
房成进 committed Aug 1, 2022
1 parent 5bc07fa commit 244e66b
Show file tree
Hide file tree
Showing 23 changed files with 14,819 additions and 3,082 deletions.
1 change: 1 addition & 0 deletions admin/Admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Admin interface {
ListTopics() ([]*topic.TopicInfo, error)
ListTopicsByOrgId(orgId string) ([]*topic.Topic, error)
ListTopicsInfo() ([]*topic.Topic, error)
LookupTopics(request *message.LookupTopicsRequest) (*message.LookupTopicsResponse, error)
GetTopicOffset(request *message.GetTopicOffsetRequest) ([]*message.OffsetInfo, error)
GetPartitionOffset(request *message.GetPartitionOffsetRequest) (*message.OffsetInfo, error)
GetScheduleInfo(request *message.GetScheduleInfoRequest) (map[*topic.TopicAndPartition]string, error)
Expand Down
16 changes: 13 additions & 3 deletions admin/TalosAdmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type TalosAdmin struct {
credential *auth.Credential
}

func (a *TalosAdmin) ListTopicsByOrgId(orgId string) ([]*topic.Topic, error) {
panic("implement me")
}

func NewTalosAdmin(clientFactory *client.TalosClientFactory) *TalosAdmin {
return &TalosAdmin{
topicClient: clientFactory.NewTopicClientDefault(),
Expand Down Expand Up @@ -87,9 +91,9 @@ func (a *TalosAdmin) ListTopicsInfo() ([]*topic.Topic, error) {
return listTopicsInfoResponse.GetTopicList(), err
}

func (a *TalosAdmin) ListTopicsByOrgId(orgId string) ([]*topic.Topic, error) {
listTopicsResponse, err := a.metricClient.ListTopicsByOrgId(orgId)
return listTopicsResponse.GetTopicList(), err
func (a *TalosAdmin) LookupTopics(request *message.LookupTopicsRequest) (
*message.LookupTopicsResponse, error) {
return a.messageClient.LookupTopics(request)
}

func (a *TalosAdmin) GetTopicOffset(request *message.GetTopicOffsetRequest) (
Expand All @@ -104,6 +108,12 @@ func (a *TalosAdmin) GetPartitionOffset(request *message.GetPartitionOffsetReque
return getPartitionOffsetResponse.GetOffsetInfo(), err
}

func (a *TalosAdmin) DescribeTopicGroup(request *topic.DescribeTopicGroupRequest) (
*topic.TopicGroup, error) {
getPartitionOffsetResponse, err := a.topicClient.DescribeTopicGroup(request)
return getPartitionOffsetResponse.GetTopicGroup(), err
}

func (a *TalosAdmin) GetScheduleInfo(request *message.GetScheduleInfoRequest) (
map[*topic.TopicAndPartition]string, error) {
getScheduleInfoResponse, err := a.messageClient.GetScheduleInfo(request)
Expand Down
156 changes: 146 additions & 10 deletions client/TalosClientFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,67 @@ type TopicClientProxy struct {
clockOffset int64
}

func (p *TopicClientProxy) CreateTopicGroup(request *topic.CreateTopicGroupRequest) (
r *topic.CreateTopicGroupResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=createTopicGroup")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.CreateTopicGroup(request)

}

func (p *TopicClientProxy) DescribeTopicGroup(request *topic.DescribeTopicGroupRequest) (
r *topic.DescribeTopicGroupResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=describeTopicGroup")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.DescribeTopicGroup(request)
}

func (p *TopicClientProxy) DeleteTopicGroup(
request *topic.DeleteTopicGroupRequest) (err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=deleteTopicGroup")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.DeleteTopicGroup(request)
}

func (p *TopicClientProxy) ListTopicGroup(request *topic.ListTopicGroupRequest) (
r *topic.ListTopicGroupResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=listTopicGroup")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.ListTopicGroup(request)
}

func (p *TopicClientProxy) CreateReplicationTopic(request *topic.CreateReplicationTopicRequest) (
r *topic.CreateTopicResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=createReplicationTopic")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.CreateReplicationTopic(request)
}

func (p *TopicClientProxy) GetTopicAttribute(request *topic.GetTopicAttributeRequest) (
r *topic.GetTopicAttributeResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=getTopicAttribute")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.GetTopicAttribute(request)
}

func (p *TopicClientProxy) GetServiceVersion() (r *common.Version, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=getServerVersion")
Expand Down Expand Up @@ -338,6 +399,56 @@ type MessageClientProxy struct {
clockOffset int64
}

func (p *MessageClientProxy) Prepare(request *message.PrepareRequest) (
r *message.PrepareResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=prepare")
defer transport.Close()
client := message.NewMessageServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.Prepare(request)
}

func (p *MessageClientProxy) Commit(request *message.CommitRequest) (
r *message.CommitResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=commit")
defer transport.Close()
client := message.NewMessageServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.Commit(request)
}

func (p *MessageClientProxy) Rollback(request *message.RollbackRequest) (
r *message.RollbackResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=rollback")
defer transport.Close()
client := message.NewMessageServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.Rollback(request)
}

func (p *MessageClientProxy) GetUnkownStateTransaction(request *message.GetUnkownStateTransactionRequest) (
r *message.GetUnkownStateTransactionResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=getUnkownStateTransaction")
defer transport.Close()
client := message.NewMessageServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.GetUnkownStateTransaction(request)
}

func (p *MessageClientProxy) LookupTopics(request *message.LookupTopicsRequest) (
r *message.LookupTopicsResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=lookupTopics")
defer transport.Close()
client := message.NewMessageServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.LookupTopics(request)
}

func (p *MessageClientProxy) GetServiceVersion() (r *common.Version, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=getServerVersion")
Expand Down Expand Up @@ -424,6 +535,41 @@ type ConsumerClientProxy struct {
clockOffset int64
}

func (p *ConsumerClientProxy) LockWorkerForMultiTopics(request *consumer.MultiTopicsLockWorkerRequest) (r *consumer.MultiTopicsLockWorkerResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=lockWorkerForMultiTopics")
defer transport.Close()
client := consumer.NewConsumerServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.LockWorkerForMultiTopics(request)
}

func (p *ConsumerClientProxy) CheckRegister(request *consumer.CheckRegisterRequest) (r *consumer.CheckRegisterResponse, err error) {
panic("implement me")
}

func (p *ConsumerClientProxy) RenewForMultiTopics(request *consumer.MultiTopicsRenewRequest) (r *consumer.MultiTopicsRenewResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=renewForMultiTopics")
defer transport.Close()
client := consumer.NewConsumerServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.RenewForMultiTopics(request)
}

func (p *ConsumerClientProxy) QueryWorkerForMultiTopics(request *consumer.MultiTopicsQueryWorkerRequest) (r *consumer.MultiTopicsQueryWorkerResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=queryWorkerForMultiTopics")
defer transport.Close()
client := consumer.NewConsumerServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.QueryWorkerForMultiTopics(request)
}

func (p *ConsumerClientProxy) DeleteConsumerGroup(request *consumer.DeleteConsumerGroupRequest) (err error) {
panic("implement me")
}

func (p *ConsumerClientProxy) GetServiceVersion() (r *common.Version, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=getServerVersion")
Expand Down Expand Up @@ -680,16 +826,6 @@ func (p *MetricClientProxy) ValidClientVersion(clientVersion *common.Version) (e
return client.ValidClientVersion(clientVersion)
}

func (p *MetricClientProxy) ListTopicsByOrgId(orgId string) (
r *topic.ListTopicsInfoResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=listTopicsByOrgId")
defer transport.Close()
client := metric.NewMetricServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.ListTopicsByOrgId(orgId)
}

func (p *MetricClientProxy) ListTopics() (r *topic.ListTopicsInfoResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=listTopics")
Expand Down
16 changes: 8 additions & 8 deletions consumer/PartitionFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func (f *PartitionFetcher) GetCurState() TaskState {
func (f *PartitionFetcher) updateState(targetState TaskState) bool {
f.fetcherLock.Lock()
defer f.fetcherLock.Unlock()
f.log.Infof("PartitionFetcher for Partition: %d update status from: %s to %s",
f.partitionId, f.curState.String(), targetState.String())
f.log.Infof("PartitionFetcher for Topic: %s Partition: %d update status from: %s to %s",
f.topicAndPartition.TopicName, f.partitionId, f.curState.String(), targetState.String())

switch targetState {
case INIT:
Expand Down Expand Up @@ -230,8 +230,8 @@ func (f *PartitionFetcher) Lock() {
if f.updateState(LOCKED) {
f.wg.Add(1)
go f.fetcherStateMachine()
f.log.Infof("Worker: %s invoke partition: %d to 'LOCKED', try to serve it.",
f.workerId, f.partitionId)
f.log.Infof("Worker: %s invoke topic: %s partition: %d to 'LOCKED', try to serve it.",
f.workerId, f.topicAndPartition.TopicName, f.partitionId)
}
}

Expand Down Expand Up @@ -307,12 +307,12 @@ func (f *PartitionFetcher) stealPartition() bool {
f.log.Errorf("lock partition failed: %s", err.Error())
return false
}
f.log.Infof("Worker: %s success to lock partitions: %d",
f.workerId, f.partitionId)
f.log.Infof("Worker: %s success to lock topic: %s partitions: %d",
f.workerId, f.topicAndPartition.TopicName, f.partitionId)
return true
}
f.log.Errorf("Worker: %s failed to lock partitions: %d",
f.workerId, f.partitionId)
f.log.Errorf("Worker: %s failed to lock topic: %s partition: %d",
f.workerId, f.topicAndPartition.TopicName, f.partitionId)
return false
}

Expand Down
20 changes: 19 additions & 1 deletion consumer/TalosConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,15 @@ func (p WorkerPairs) Len() int {

// descending sort
func (p WorkerPairs) Less(i, j int) bool {
return p[i].hasPartitionNum > p[j].hasPartitionNum
if p[i].hasPartitionNum > p[j].hasPartitionNum {
return true
}

if p[i].hasPartitionNum < p[j].hasPartitionNum {
return false
}

return p[i].workerId > p[j].workerId
}

func (p WorkerPairs) Swap(i, j int) {
Expand Down Expand Up @@ -403,6 +411,16 @@ func (c *TalosConsumer) makeBalance() {
hasList := c.getHasList()
has := len(hasList)

// release all reduced partition and wait next cycle to balance
if len(hasList) > 0 && int(hasList[len(hasList) - 1]) >= c.partitionNumber {
for _, partitionId := range hasList {
if int(partitionId) >= c.partitionNumber {
toReleaseList = append(toReleaseList, partitionId)
}
}
break
}

// workerNum > partitionNum, idle workers have no match target, do nothing
if i >= len(targetList) {
break
Expand Down
16 changes: 16 additions & 0 deletions consumer/TalosConsumerConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

type TalosConsumerConfig struct {
partitionCheckInterval int64
topicPatternCheckInterval int64
workerInfoCheckInterval int64
renewCheckInterval int64
renewMaxRetry int64
Expand Down Expand Up @@ -53,6 +54,9 @@ func initConsumerConfig(props *utils.Properties) *TalosConsumerConfig {
partitionCheckInterval, _ := strconv.ParseInt(props.GetProperty(
GALAXY_TALOS_CONSUMER_CHECK_PARTITION_INTERVAL,
strconv.Itoa(GALAXY_TALOS_CONSUMER_CHECK_PARTITION_INTERVAL_DEFAULT)), 10, 64)
topicPatternCheckInterval, _ := strconv.ParseInt(props.GetProperty(
GALAXY_TALOS_CONSUMER_CHECK_TOPIC_PATTERN_INTERVAL,
strconv.Itoa(GALAXY_TALOS_CONSUMER_CHECK_TOPIC_PATTERN_INTERVAL_DEFAULT)), 10, 64)
workerInfoCheckInterval, _ := strconv.ParseInt(props.GetProperty(
GALAXY_TALOS_CONSUMER_CHECK_WORKER_INFO_INTERVAL,
strconv.Itoa(GALAXY_TALOS_CONSUMER_CHECK_WORKER_INFO_INTERVAL_DEFAULT)), 10, 64)
Expand Down Expand Up @@ -98,6 +102,7 @@ func initConsumerConfig(props *utils.Properties) *TalosConsumerConfig {

return &TalosConsumerConfig{
partitionCheckInterval: partitionCheckInterval,
topicPatternCheckInterval: topicPatternCheckInterval,
workerInfoCheckInterval: workerInfoCheckInterval,
renewCheckInterval: renewCheckInterval,
renewMaxRetry: renewMaxRetry,
Expand Down Expand Up @@ -125,6 +130,14 @@ func (c *TalosConsumerConfig) CheckParameter() error {
return err
}

err = utils.CheckParameterRange(GALAXY_TALOS_CONSUMER_CHECK_TOPIC_PATTERN_INTERVAL,
c.topicPatternCheckInterval,
GALAXY_TALOS_CONSUMER_CHECK_TOPIC_PATTERN_INTERVAL_MINIMUM,
GALAXY_TALOS_CONSUMER_CHECK_TOPIC_PATTERN_INTERVAL_MAXIMUM)
if err != nil {
return err
}

err = utils.CheckParameterRange(GALAXY_TALOS_CONSUMER_CHECK_WORKER_INFO_INTERVAL,
c.workerInfoCheckInterval,
GALAXY_TALOS_CONSUMER_CHECK_WORKER_INFO_INTERVAL_MINIMUM,
Expand Down Expand Up @@ -191,6 +204,9 @@ func (c *TalosConsumerConfig) CheckParameter() error {
return nil
}

func (c *TalosConsumerConfig) GetTopicPatternCheckInterval() int64 {
return c.topicPatternCheckInterval
}
func (c *TalosConsumerConfig) GetPartitionCheckInterval() int64 {
return c.partitionCheckInterval
}
Expand Down
8 changes: 8 additions & 0 deletions consumer/TalosConsumerConfigKey.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ const (
GALAXY_TALOS_CONSUMER_CHECK_PARTITION_INTERVAL_MINIMUM = 1000 * 60
GALAXY_TALOS_CONSUMER_CHECK_PARTITION_INTERVAL_MAXIMUM = 1000 * 60 * 3

/**
* The consumer check all authorized topics that match given pattern
*/
GALAXY_TALOS_CONSUMER_CHECK_TOPIC_PATTERN_INTERVAL = "galaxy.talos.consumer.check.topic.pattern.interval"
GALAXY_TALOS_CONSUMER_CHECK_TOPIC_PATTERN_INTERVAL_DEFAULT = 5 * 60 * 1000
GALAXY_TALOS_CONSUMER_CHECK_TOPIC_PATTERN_INTERVAL_MINIMUM = 3 * 60 * 1000
GALAXY_TALOS_CONSUMER_CHECK_TOPIC_PATTERN_INTERVAL_MAXIMUM = 10 * 60 * 1000

/**
* The consumer check alive worker info and their serving partitions interval
*/
Expand Down
4 changes: 2 additions & 2 deletions consumer/TalosMessageReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (r *TalosMessageReader) InitStartOffset() error {
r.lastCommitOffset = atomic.LoadInt64(r.startOffset) - 1
r.finishedOffset = r.lastCommitOffset
}
r.log.Infof("Init startOffset: %d lastCommitOffset: %d for partition: %d ",
atomic.LoadInt64(r.startOffset), r.lastCommitOffset,
r.log.Infof("Init startOffset: %d lastCommitOffset: %d for topic: %s partition: %d ",
atomic.LoadInt64(r.startOffset), r.lastCommitOffset, r.topicAndPartition.GetTopicName(),
r.topicAndPartition.GetPartitionId())
return nil
}
Expand Down
Loading

0 comments on commit 244e66b

Please sign in to comment.