Skip to content

Commit

Permalink
feat: add zstd and lz4 compression
Browse files Browse the repository at this point in the history
  • Loading branch information
bxfjb authored and 房成进 committed Nov 20, 2023
1 parent 06644d2 commit a0039f7
Show file tree
Hide file tree
Showing 23 changed files with 2,025 additions and 936 deletions.
28 changes: 28 additions & 0 deletions client/compression/Compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/XiaoMi/talos-sdk-golang/thrift/message"
"github.com/XiaoMi/talos-sdk-golang/utils"

"github.com/DataDog/zstd"
"github.com/bkaradzic/go-lz4"
xSnappy "github.com/eapache/go-xerial-snappy"
)

Expand Down Expand Up @@ -62,6 +64,19 @@ func DoCompress(messageList []*message.Message,
writer.Write(messageSerializedBuffer.Bytes())
writer.Close()
messageBlockData = append(messageBlockData, gzipBuf.Bytes()...)
case message.MessageCompressionType_ZSTD:
result, err := zstd.Compress(nil, messageSerializedBuffer.Bytes())
if err != nil {
return nil, err
}
messageBlockData = append(messageBlockData, result...)
case message.MessageCompressionType_LZ4:
compressed := make([]byte, 0)
compressed, err := lz4.Encode(nil, messageSerializedBuffer.Bytes())
if err != nil {
return nil, err
}
messageBlockData = append(messageBlockData, compressed...)
default:
err := fmt.Errorf("unsupport compression type")
return nil, err
Expand Down Expand Up @@ -120,6 +135,19 @@ func DoDecompress(messageBlock *message.MessageBlock,
return nil, err
}
messageBlockData = bytes.NewBuffer(messageByteSlice)
case message.MessageCompressionType_ZSTD:
result, err := zstd.Decompress(nil, messageBlock.GetMessageBlock())
if err != nil {
return nil, err
}
messageBlockData = bytes.NewBuffer(result)
case message.MessageCompressionType_LZ4:
decompressed := make([]byte, 0)
decompressed, err := lz4.Decode(nil, messageBlock.GetMessageBlock())
if err != nil {
return nil, err
}
messageBlockData = bytes.NewBuffer(decompressed)
}

for i := int32(0); i < messageNumber; i++ {
Expand Down
4 changes: 2 additions & 2 deletions consumer/TalosTopicsConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func NewDefaultTalosMultiTopicsConsumer(consumerGroupName string, consumerConfig
}

func (c *TalosTopicsConsumer) getTopicAndScheduleInfo() error {
response, err := c.talosAdmin.LookupTopics(&message.LookupTopicsRequest{TopicPattern: c.topicPattern})
response, err := c.talosAdmin.LookupTopics(&message.LookupTopicsRequest{TopicGroup: c.topicGroup})
if err != nil {
return err
}
Expand Down Expand Up @@ -733,7 +733,7 @@ func (c *TalosTopicsConsumer) CheckTopicPatternTask() {
}
}
response, err := c.scheduleInfoCache.GetOrCreateMessageClient(topicAndPartition).LookupTopics(
&message.LookupTopicsRequest{TopicPattern: c.topicPattern})
&message.LookupTopicsRequest{TopicGroup: c.topicGroup})
if err != nil {
c.log.Errorf("Exception in CheckTopicsTask: %s", err.Error())
return
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
module github.com/XiaoMi/talos-sdk-golang

go 1.12
go 1.14

require (
github.com/DataDog/zstd v1.5.5
github.com/bkaradzic/go-lz4 v1.0.0
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
github.com/gofrs/uuid v3.2.0+incompatible
github.com/golang/mock v1.3.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk=
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
10 changes: 7 additions & 3 deletions producer/TalosProducerConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func initProducerConfig(props *utils.Properties) (*TalosProducerConfig, error) {
compressionType := props.GetProperty(
GALAXY_TALOS_PRODUCER_COMPRESSION_TYPE,
GALAXY_TALOS_PRODUCER_COMPRESSION_TYPE_DEFAULT)
if compressionType != "NONE" && compressionType != "SNAPPY" &&
if compressionType != "NONE" && compressionType != "SNAPPY" && compressionType != "ZSTD" && compressionType != "LZ4" &&
compressionType != "GZIP" {
return nil, fmt.Errorf("Unsupported Compression Type: %v ", compressionType)
}
Expand Down Expand Up @@ -201,12 +201,16 @@ func (p *TalosProducerConfig) GetCompressionType() message.MessageCompressionTyp
return message.MessageCompressionType_NONE
} else if p.compressionType == "SNAPPY" {
return message.MessageCompressionType_SNAPPY
} else if p.compressionType == "GZIP" {
return message.MessageCompressionType_GZIP
} else if p.compressionType == "ZSTD" {
return message.MessageCompressionType_ZSTD
} else {
err := utils.CheckArgument(p.compressionType == "GZIP")
err := utils.CheckArgument(p.compressionType == "LZ4")
if err != nil {
return message.MessageCompressionType(0)
}
return message.MessageCompressionType_GZIP
return message.MessageCompressionType_LZ4
}
}

Expand Down
2 changes: 1 addition & 1 deletion producer/TalosProducerConfigKey.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const (
GALAXY_TALOS_PRODUCER_WAIT_PARTITION_WORKING_TIME_DEFAULT = 200

/**
* The producer compression type, right now suport "NONE", "SNAPPY" and "GZIP";
* The producer compression type, right now suport "NONE", "SNAPPY", "GZIP", "ZSTD" and "LZ4";
* default is "SNAPPY";
*/
GALAXY_TALOS_PRODUCER_COMPRESSION_TYPE = "galaxy.talos.producer.compression.type"
Expand Down
46 changes: 46 additions & 0 deletions test/client/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,49 @@ func TestGzip(t *testing.T) {
}

}

func TestCompression(t *testing.T) {
messageList := setUp()
compressionTypeList := []message.MessageCompressionType{
message.MessageCompressionType_NONE,
message.MessageCompressionType_SNAPPY,
message.MessageCompressionType_GZIP,
message.MessageCompressionType_ZSTD,
message.MessageCompressionType_LZ4,
}
for _, compressionType := range compressionTypeList {
messageBlock, err := compression.Compress(messageList, compressionType)
if err != nil {
t.Errorf("compression type error")
}
if int32(len(messageList)) != messageBlock.GetMessageNumber() {
t.Errorf("message number error")
}

startOffset := int64(1234)
appendTimestamp := int64(1110000)
unHandledMessageNumber := int64(117)

messageBlock.StartMessageOffset = startOffset
messageBlock.AppendTimestamp = &appendTimestamp
t.Logf("CompressionType: %s, message BlockSize: %v", messageBlock.CompressionType.String(), len(messageBlock.GetMessageBlock()))

verifyMessageList, err := compression.DoDecompress(messageBlock, unHandledMessageNumber)
if len(verifyMessageList) != len(messageList) || err != nil {
t.Errorf("decompress error: wrong size or unKnow")
}
for i := 0; i < len(messageList); i++ {
msg := messageList[i]
verifyMsg := verifyMessageList[i].GetMessage()
if msg.GetPartitionKey() != verifyMsg.GetPartitionKey() ||
msg.GetCreateTimestamp() != verifyMsg.GetCreateTimestamp() ||
appendTimestamp != verifyMsg.GetAppendTimestamp() ||
msg.GetSequenceNumber() != verifyMsg.GetSequenceNumber() ||
!reflect.DeepEqual(msg.GetMessage(), verifyMsg.GetMessage()) ||
(startOffset+int64(i)) != verifyMessageList[i].GetMessageOffset() ||
verifyMessageList[i].GetUnHandledMessageNumber() != (unHandledMessageNumber+int64(len(messageList))-1-int64(i)) {
t.Errorf("decompress message not equal to source message")
}
}
}
}
38 changes: 20 additions & 18 deletions test/client/scheduleinfocache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ package client
import (
"testing"

"talos-sdk-golang/thrift/message"
"talos-sdk-golang/thrift/topic"
"github.com/XiaoMi/talos-sdk-golang/thrift/message"
"github.com/XiaoMi/talos-sdk-golang/thrift/topic"

"talos-sdk-golang/utils"
"github.com/XiaoMi/talos-sdk-golang/utils"

"talos-sdk-golang/client"
"talos-sdk-golang/testos-sdk-golang/test/mock_client"
"talos-sdk-golang/testos-sdk-golang/test/mock_message"
"github.com/XiaoMi/talos-sdk-golang/client"
"github.com/XiaoMi/talos-sdk-golang/test/mock_client"
"github.com/XiaoMi/talos-sdk-golang/test/mock_message"

"github.com/golang/mock/gomock"
log4go "github.com/sirupsen/logrus"
Expand All @@ -35,7 +35,8 @@ const (
)

func TestNewAndGetScheduleInfoCache(t *testing.T) {
log4go.Close()
//log4go.Close()
var logger log4go.Logger
properties := utils.NewProperties()
properties.SetProperty("galaxy.talos.service.endpoint", "https://talos.api.xiaomi.com")
clientConfig := client.NewTalosClientConfigByProperties(properties)
Expand All @@ -54,25 +55,26 @@ func TestNewAndGetScheduleInfoCache(t *testing.T) {
defer ctrl.Finish()
mockMessageClient := mock_message.NewMockMessageService(ctrl)
mockMessageClient.EXPECT().GetScheduleInfo(gomock.Any()).Return(getScheduleInfoResponse, nil).Times(1)
mockClientFactory := mock_client.NewMockTalosClient(ctrl)
mockClientFactory := mock_client.NewMockTalosClientFactoryInterface(ctrl)

//test new scheduleinfocache
scheduleInfoCache0 := client.NewScheduleInfoCache(topicTalosResourceName0,
clientConfig, mockMessageClient, mockClientFactory)
scheduleInfoCache0 := client.NewAutoLocationScheduleInfoCache(topicTalosResourceName0,
clientConfig, mockMessageClient, mockClientFactory, &logger)

scheduleInfoCache1 := scheduleInfoCache0.GetScheduleInfoCache(topicTalosResourceName0)
scheduleInfoCache1 := scheduleInfoCache0.GetScheduleInfo(topicTalosResourceName0)
assert.NotNil(t, scheduleInfoCache1)
assert.Equal(t, scheduleInfoCache0, scheduleInfoCache1)

scheduleInfoCache2 := scheduleInfoCache0.GetScheduleInfoCache(topicTalosResourceName1)
scheduleInfoCache2 := scheduleInfoCache0.GetScheduleInfo(topicTalosResourceName1)
assert.Nil(t, scheduleInfoCache2)
assert.NotEqual(t, scheduleInfoCache0, scheduleInfoCache2)

}

func TestGetOrCreatMessageClient(t *testing.T) {
//log4go.LoadConfiguration("log4go.xml")
defer log4go.Close()
//defer log4go.Close()
var logger log4go.Logger
properties := utils.NewProperties()
properties.SetProperty("galaxy.talos.service.endpoint", "https://talos.api.xiaomi.com")
clientConfig := client.NewTalosClientConfigByProperties(properties)
Expand Down Expand Up @@ -111,18 +113,18 @@ func TestGetOrCreatMessageClient(t *testing.T) {
mockMessageClient2.EXPECT().GetScheduleInfo(gomock.Any()).Return(getScheduleInfoResponse, nil).Times(2)
//mockMessageClient3 := mock_message.NewMockMessageService(ctrl)

mockClientFactory := mock_client.NewMockTalosClient(ctrl)
mockClientFactory := mock_client.NewMockTalosClientFactoryInterface(ctrl)
mockClientFactory.EXPECT().NewMessageClient("http://" + host0).Return(mockMessageClient1)
//mockClientFactory.EXPECT().NewMessageClient("http://" + host1).Return(mockMessageClient2)
//mockClientFactory.EXPECT().NewMessageClient("http://" + host2).Return(mockMessageClient3)

scheduleInfoCache := client.NewScheduleInfoCache(topicTalosResourceName1,
clientConfig, mockMessageClient, mockClientFactory)
scheduleInfoCache := client.NewAutoLocationScheduleInfoCache(topicTalosResourceName1,
clientConfig, mockMessageClient, mockClientFactory, &logger)

assert.Equal(t, mockMessageClient, scheduleInfoCache.GetOrCreateMessageClient(topicAndPartition0))
assert.Equal(t, mockMessageClient1, scheduleInfoCache.GetOrCreateMessageClient(topicAndPartition1))

scheduleInfoCache2 := client.NewScheduleInfoCache(topicTalosResourceName1,
clientConfig, mockMessageClient2, mockClientFactory)
scheduleInfoCache2 := client.NewAutoLocationScheduleInfoCache(topicTalosResourceName1,
clientConfig, mockMessageClient2, mockClientFactory, &logger)
scheduleInfoCache2.UpdateScheduleInfoCache()
}
6 changes: 3 additions & 3 deletions test/client/serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (

"reflect"

"talos-sdk-golang/thrift/message"
"github.com/XiaoMi/talos-sdk-golang/thrift/message"

"talos-sdk-golang/utils"
"github.com/XiaoMi/talos-sdk-golang/utils"

"talos-sdk-golang/client/serialization"
"github.com/XiaoMi/talos-sdk-golang/client/serialization"

"github.com/stretchr/testify/assert"
)
Expand Down
2 changes: 1 addition & 1 deletion test/mock_admin/mock_TalosAdmin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a0039f7

Please sign in to comment.