Skip to content

Commit

Permalink
fix: compatible with new topic group interface
Browse files Browse the repository at this point in the history
  • Loading branch information
fangchengjin committed May 17, 2024
1 parent a0039f7 commit af1b8c7
Show file tree
Hide file tree
Showing 15 changed files with 6,722 additions and 2,392 deletions.
2 changes: 1 addition & 1 deletion admin/Admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Admin interface {
ListTopics() ([]*topic.TopicInfo, error)
ListTopicsByOrgId(orgId string) ([]*topic.Topic, error)
ListTopicsInfo() ([]*topic.Topic, error)
LookupTopics(request *message.LookupTopicsRequest) (*message.LookupTopicsResponse, error)
LookupTopics(request *topic.LookupTopicsRequest) (*topic.LookupTopicsResponse, error)
GetTopicOffset(request *message.GetTopicOffsetRequest) ([]*message.OffsetInfo, error)
GetPartitionOffset(request *message.GetPartitionOffsetRequest) (*message.OffsetInfo, error)
GetScheduleInfo(request *message.GetScheduleInfoRequest) (map[*topic.TopicAndPartition]string, error)
Expand Down
6 changes: 3 additions & 3 deletions admin/TalosAdmin.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ func (a *TalosAdmin) ListTopicsInfo() ([]*topic.Topic, error) {
return listTopicsInfoResponse.GetTopicList(), err
}

func (a *TalosAdmin) LookupTopics(request *message.LookupTopicsRequest) (
*message.LookupTopicsResponse, error) {
return a.messageClient.LookupTopics(request)
func (a *TalosAdmin) LookupTopics(request *topic.LookupTopicsRequest) (
*topic.LookupTopicsResponse, error) {
return a.topicClient.LookupTopics(request)
}

func (a *TalosAdmin) GetTopicOffset(request *message.GetTopicOffsetRequest) (
Expand Down
117 changes: 93 additions & 24 deletions client/TalosClientFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/XiaoMi/talos-sdk-golang/thrift/thrift"
)

//interface for mock
// interface for mock
type TalosClientFactoryInterface interface {
NewTopicClient(url string) topic.TopicService
NewMessageClient(url string) message.MessageService
Expand Down Expand Up @@ -164,7 +164,6 @@ func (p *TopicClientProxy) CreateTopicGroup(request *topic.CreateTopicGroupReque
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.CreateTopicGroup(request)

}

func (p *TopicClientProxy) DescribeTopicGroup(request *topic.DescribeTopicGroupRequest) (
Expand All @@ -187,14 +186,40 @@ func (p *TopicClientProxy) DeleteTopicGroup(
return client.DeleteTopicGroup(request)
}

func (p *TopicClientProxy) ListTopicGroup(request *topic.ListTopicGroupRequest) (
r *topic.ListTopicGroupResponse, err error) {
func (p *TopicClientProxy) UpdateTopicGroup(request *topic.UpdateTopicGroupRequest) (err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=updateTopicGroup")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.UpdateTopicGroup(request)
}

func (p *TopicClientProxy) ListTopicGroup() (r *topic.ListTopicGroupResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=listTopicGroup")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.ListTopicGroup(request)
return client.ListTopicGroup()
}

func (p *TopicClientProxy) LookupTopics(request *topic.LookupTopicsRequest) (r *topic.LookupTopicsResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=lookupTopics")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.LookupTopics(request)
}

func (p *TopicClientProxy) MatchTopics(request *topic.MatchTopicsRequest) (r *topic.MatchTopicsResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=matchTopics")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.MatchTopics(request)
}

func (p *TopicClientProxy) CreateReplicationTopic(request *topic.CreateReplicationTopicRequest) (
Expand All @@ -207,6 +232,15 @@ func (p *TopicClientProxy) CreateReplicationTopic(request *topic.CreateReplicati
return client.CreateReplicationTopic(request)
}

func (p *TopicClientProxy) ChangeReplicationTopicSourceInfo(request *topic.ChangeReplicationTopicSourceInfoRequest) (err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=changeReplicationTopicSourceInfo")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.ChangeReplicationTopicSourceInfo(request)
}

func (p *TopicClientProxy) GetTopicAttribute(request *topic.GetTopicAttributeRequest) (
r *topic.GetTopicAttributeResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
Expand All @@ -217,6 +251,15 @@ func (p *TopicClientProxy) GetTopicAttribute(request *topic.GetTopicAttributeReq
return client.GetTopicAttribute(request)
}

func (p *TopicClientProxy) DeleteTopicAttribute(request *topic.DeleteTopicAttributeRequest) (err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=deleteTopicAttribute")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.DeleteTopicAttribute(request)
}

func (p *TopicClientProxy) GetServiceVersion() (r *common.Version, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=getServerVersion")
Expand Down Expand Up @@ -265,6 +308,15 @@ func (p *TopicClientProxy) ChangeTopicAttribute(request *topic.
return client.ChangeTopicAttribute(request)
}

func (p *TopicClientProxy) ChangeTopicOwnerInfo(request *topic.ChangeTopicOwnerInfoRequest) (err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=changeTopicOwnerInfo")
defer transport.Close()
client := topic.NewTopicServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.ChangeTopicOwnerInfo(request)
}

func (p *TopicClientProxy) DescribeTopic(request *topic.
DescribeTopicRequest) (r *topic.DescribeTopicResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
Expand Down Expand Up @@ -400,6 +452,33 @@ type MessageClientProxy struct {
clockOffset int64
}

func (p *MessageClientProxy) GetTopicCheckpoint(request *message.GetCheckpointRequest) (r *message.GetCheckpointResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=getTopicCheckpoint")
defer transport.Close()
client := message.NewMessageServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.GetTopicCheckpoint(request)
}

func (p *MessageClientProxy) GetPartitionCheckpoint(request *message.GetPartitionCheckpointRequest) (r *message.GetPartitionCheckpointResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=getPartitionCheckpoint")
defer transport.Close()
client := message.NewMessageServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.GetPartitionCheckpoint(request)
}

func (p *MessageClientProxy) DeleteMessageIndex(request *message.DeleteMessageIndexRequest) (err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=deleteMessageIndex")
defer transport.Close()
client := message.NewMessageServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.DeleteMessageIndex(request)
}

func (p *MessageClientProxy) Prepare(request *message.PrepareRequest) (
r *message.PrepareResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
Expand Down Expand Up @@ -440,16 +519,6 @@ func (p *MessageClientProxy) GetUnkownStateTransaction(request *message.GetUnkow
return client.GetUnkownStateTransaction(request)
}

func (p *MessageClientProxy) LookupTopics(request *message.LookupTopicsRequest) (
r *message.LookupTopicsResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=lookupTopics")
defer transport.Close()
client := message.NewMessageServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.LookupTopics(request)
}

func (p *MessageClientProxy) GetServiceVersion() (r *common.Version, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=getServerVersion")
Expand Down Expand Up @@ -536,35 +605,35 @@ type ConsumerClientProxy struct {
clockOffset int64
}

func (p *ConsumerClientProxy) LockWorkerForMultiTopics(request *consumer.MultiTopicsLockWorkerRequest) (r *consumer.MultiTopicsLockWorkerResponse, err error) {
func (p *ConsumerClientProxy) LockWorkerForTopics(request *consumer.TopicsLockWorkerRequest) (r *consumer.TopicsLockWorkerResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=lockWorkerForMultiTopics")
p.clockOffset, "type=lockWorkerForTopics")
defer transport.Close()
client := consumer.NewConsumerServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.LockWorkerForMultiTopics(request)
return client.LockWorkerForTopics(request)
}

func (p *ConsumerClientProxy) CheckRegister(request *consumer.CheckRegisterRequest) (r *consumer.CheckRegisterResponse, err error) {
panic("implement me")
}

func (p *ConsumerClientProxy) RenewForMultiTopics(request *consumer.MultiTopicsRenewRequest) (r *consumer.MultiTopicsRenewResponse, err error) {
func (p *ConsumerClientProxy) RenewForTopics(request *consumer.TopicsRenewRequest) (r *consumer.TopicsRenewResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=renewForMultiTopics")
p.clockOffset, "type=renewForTopics")
defer transport.Close()
client := consumer.NewConsumerServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.RenewForMultiTopics(request)
return client.RenewForTopics(request)
}

func (p *ConsumerClientProxy) QueryWorkerForMultiTopics(request *consumer.MultiTopicsQueryWorkerRequest) (r *consumer.MultiTopicsQueryWorkerResponse, err error) {
func (p *ConsumerClientProxy) QueryWorkerForTopics(request *consumer.TopicsQueryWorkerRequest) (r *consumer.TopicsQueryWorkerResponse, err error) {
transport := p.factory.GetTransportWithClockOffset(nil,
p.clockOffset, "type=queryWorkerForMultiTopics")
p.clockOffset, "type=queryWorkerForTopics")
defer transport.Close()
client := consumer.NewConsumerServiceClientFactory(transport,
thrift.NewTCompactProtocolFactory())
return client.QueryWorkerForMultiTopics(request)
return client.QueryWorkerForTopics(request)
}

func (p *ConsumerClientProxy) DeleteConsumerGroup(request *consumer.DeleteConsumerGroupRequest) (err error) {
Expand Down
Loading

0 comments on commit af1b8c7

Please sign in to comment.