Skip to content

Commit

Permalink
Go: XREAD. (#2882)
Browse files Browse the repository at this point in the history
* Go: `XREAD`.

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Jan 15, 2025
1 parent ea67717 commit aa8760b
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 4 deletions.
85 changes: 85 additions & 0 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,91 @@ func (client *baseClient) XAddWithOptions(
return handleStringOrNilResponse(result)
}

// Reads entries from the given streams.
//
// Note:
//
// When in cluster mode, all keys in `keysAndIds` must map to the same hash slot.
//
// See [valkey.io] for details.
//
// Parameters:
//
// keysAndIds - A map of keys and entry IDs to read from.
//
// Return value:
// A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if
// a key does not exist or does not contain requiested entries.
//
// For example:
//
// result, err := client.XRead({"stream1": "0-0", "stream2": "0-1"})
// err == nil: true
// result: map[string]map[string][][]string{
// "stream1": {"0-1": {{"field1", "value1"}}, "0-2": {{"field2", "value2"}, {"field2", "value3"}}},
// "stream2": {},
// }
//
// [valkey.io]: https://valkey.io/commands/xread/
func (client *baseClient) XRead(keysAndIds map[string]string) (map[string]map[string][][]string, error) {
return client.XReadWithOptions(keysAndIds, options.NewXReadOptions())
}

// Reads entries from the given streams.
//
// Note:
//
// When in cluster mode, all keys in `keysAndIds` must map to the same hash slot.
//
// See [valkey.io] for details.
//
// Parameters:
//
// keysAndIds - A map of keys and entry IDs to read from.
// options - Options detailing how to read the stream.
//
// Return value:
// A `map[string]map[string][][]string` of stream keys to a map of stream entry IDs mapped to an array entries or `nil` if
// a key does not exist or does not contain requiested entries.
//
// For example:
//
// options := options.NewXReadOptions().SetBlock(100500)
// result, err := client.XReadWithOptions({"stream1": "0-0", "stream2": "0-1"}, options)
// err == nil: true
// result: map[string]map[string][][]string{
// "stream1": {"0-1": {{"field1", "value1"}}, "0-2": {{"field2", "value2"}, {"field2", "value3"}}},
// "stream2": {},
// }
//
// [valkey.io]: https://valkey.io/commands/xread/
func (client *baseClient) XReadWithOptions(
keysAndIds map[string]string,
options *options.XReadOptions,
) (map[string]map[string][][]string, error) {
args := make([]string, 0, 5+2*len(keysAndIds))
optionArgs, _ := options.ToArgs()
args = append(args, optionArgs...)

// Note: this loop iterates in an indeterminate order, but it is OK for that case
keys := make([]string, 0, len(keysAndIds))
values := make([]string, 0, len(keysAndIds))
for key := range keysAndIds {
keys = append(keys, key)
values = append(values, keysAndIds[key])
}
args = append(args, "STREAMS")
args = append(args, keys...)
args = append(args, values...)

result, err := client.executeCommand(C.XRead, args)
if err != nil {
return nil, err
}

return handleXReadResponse(result)
}

func (client *baseClient) ZAdd(
key string,
membersScoreMap map[string]float64,
Expand Down
39 changes: 36 additions & 3 deletions go/api/options/stream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewXAddOptions() *XAddOptions {
return &XAddOptions{}
}

// New entry will be added with this `id.
// New entry will be added with this `id`.
func (xao *XAddOptions) SetId(id string) *XAddOptions {
xao.id = id
return xao
Expand All @@ -47,7 +47,6 @@ func (xao *XAddOptions) SetTrimOptions(options *XTrimOptions) *XAddOptions {

func (xao *XAddOptions) ToArgs() ([]string, error) {
args := []string{}
var err error
if xao.makeStream == triStateBoolFalse {
args = append(args, "NOMKSTREAM")
}
Expand All @@ -63,7 +62,7 @@ func (xao *XAddOptions) ToArgs() ([]string, error) {
} else {
args = append(args, "*")
}
return args, err
return args, nil
}

// Optional arguments for `XTrim` and `XAdd` in [StreamCommands]
Expand Down Expand Up @@ -116,3 +115,37 @@ func (xTrimOptions *XTrimOptions) ToArgs() ([]string, error) {
}
return args, nil
}

// Optional arguments for `XRead` in [StreamCommands]
type XReadOptions struct {
count, block int64
}

// Create new empty `XReadOptions`
func NewXReadOptions() *XReadOptions {
return &XReadOptions{-1, -1}
}

// The maximal number of elements requested. Equivalent to `COUNT` in the Valkey API.
func (xro *XReadOptions) SetCount(count int64) *XReadOptions {
xro.count = count
return xro
}

// If set, the request will be blocked for the set amount of milliseconds or until the server has
// the required number of entries. A value of `0` will block indefinitely. Equivalent to `BLOCK` in the Valkey API.
func (xro *XReadOptions) SetBlock(block int64) *XReadOptions {
xro.block = block
return xro
}

func (xro *XReadOptions) ToArgs() ([]string, error) {
args := []string{}
if xro.count >= 0 {
args = append(args, "COUNT", utils.IntToString(xro.count))
}
if xro.block >= 0 {
args = append(args, "BLOCK", utils.IntToString(xro.block))
}
return args, nil
}
105 changes: 105 additions & 0 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import "C"

import (
"fmt"
"reflect"
"unsafe"
)

Expand Down Expand Up @@ -496,3 +497,107 @@ func convertToResultStringArray(input []interface{}) ([]Result[string], error) {
}
return result, nil
}

// get type of T
func getType[T any]() reflect.Type {
var zero [0]T
return reflect.TypeOf(zero).Elem()
}

// convert (typecast) untyped response into a typed value
// for example, an arbitrary array `[]interface{}` into `[]string`
type responseConverter interface {
convert(data interface{}) (interface{}, error)
}

// convert maps, T - type of the value, key is string
type mapConverter[T any] struct {
next responseConverter
}

func (node mapConverter[T]) convert(data interface{}) (interface{}, error) {
result := make(map[string]T)

for key, value := range data.(map[string]interface{}) {
if node.next == nil {
valueT, ok := value.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", value, getType[T]())}
}
result[key] = valueT
} else {
val, err := node.next.convert(value)
if err != nil {
return nil, err
}
valueT, ok := val.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", valueT, getType[T]())}
}
result[key] = valueT
}
}

return result, nil
}

// convert arrays, T - type of the value
type arrayConverter[T any] struct {
next responseConverter
}

func (node arrayConverter[T]) convert(data interface{}) (interface{}, error) {
arrData := data.([]interface{})
result := make([]T, 0, len(arrData))
for _, value := range arrData {
if node.next == nil {
valueT, ok := value.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", value, getType[T]())}
}
result = append(result, valueT)
} else {
val, err := node.next.convert(value)
if err != nil {
return nil, err
}
valueT, ok := val.(T)
if !ok {
return nil, &RequestError{fmt.Sprintf("Unexpected type received: %T, expected: %v", valueT, getType[T]())}
}
result = append(result, valueT)
}
}

return result, nil
}

// TODO: convert sets

func handleXReadResponse(response *C.struct_CommandResponse) (map[string]map[string][][]string, error) {
defer C.free_command_response(response)
data, err := parseMap(response)
if err != nil {
return nil, err
}
if data == nil {
return nil, nil
}

converters := mapConverter[map[string][][]string]{
mapConverter[[][]string]{
arrayConverter[[]string]{
arrayConverter[string]{},
},
},
}

res, err := converters.convert(data)
if err != nil {
return nil, err
}
if result, ok := res.(map[string]map[string][][]string); ok {
return result, nil
}
return nil, &RequestError{fmt.Sprintf("unexpected type received: %T", res)}
}
4 changes: 4 additions & 0 deletions go/api/stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,9 @@ type StreamCommands interface {
// [valkey.io]: https://valkey.io/commands/xlen/
XLen(key string) (int64, error)

XRead(keysAndIds map[string]string) (map[string]map[string][][]string, error)

XReadWithOptions(keysAndIds map[string]string, options *options.XReadOptions) (map[string]map[string][][]string, error)

XDel(key string, ids []string) (int64, error)
}
76 changes: 76 additions & 0 deletions go/integTest/shared_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4105,6 +4105,82 @@ func (suite *GlideTestSuite) TestXAddWithOptions() {
})
}

func (suite *GlideTestSuite) TestXRead() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key1 := "{xread}" + uuid.NewString()
key2 := "{xread}" + uuid.NewString()
key3 := "{xread}" + uuid.NewString()

// key does not exist
read, err := client.XRead(map[string]string{key1: "0-0"})
assert.Nil(suite.T(), err)
assert.Nil(suite.T(), read)

res, err := client.XAddWithOptions(
key1,
[][]string{{"k1_field1", "k1_value1"}, {"k1_field1", "k1_value2"}},
options.NewXAddOptions().SetId("0-1"),
)
assert.Nil(suite.T(), err)
assert.False(suite.T(), res.IsNil())

res, err = client.XAddWithOptions(key2, [][]string{{"k2_field1", "k2_value1"}}, options.NewXAddOptions().SetId("2-0"))
assert.Nil(suite.T(), err)
assert.False(suite.T(), res.IsNil())

// reading ID which does not exist yet
read, err = client.XRead(map[string]string{key1: "100-500"})
assert.Nil(suite.T(), err)
assert.Nil(suite.T(), read)

read, err = client.XRead(map[string]string{key1: "0-0", key2: "0-0"})
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), map[string]map[string][][]string{
key1: {
"0-1": {{"k1_field1", "k1_value1"}, {"k1_field1", "k1_value2"}},
},
key2: {
"2-0": {{"k2_field1", "k2_value1"}},
},
}, read)

// Key exists, but it is not a stream
client.Set(key3, "xread")
_, err = client.XRead(map[string]string{key1: "0-0", key3: "0-0"})
assert.NotNil(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

// ensure that commands doesn't time out even if timeout > request timeout
var testClient api.BaseClient
if _, ok := client.(api.GlideClient); ok {
testClient = suite.client(api.NewGlideClientConfiguration().
WithAddress(&suite.standaloneHosts[0]).
WithUseTLS(suite.tls))
} else {
testClient = suite.clusterClient(api.NewGlideClusterClientConfiguration().
WithAddress(&suite.clusterHosts[0]).
WithUseTLS(suite.tls))
}
read, err = testClient.XReadWithOptions(map[string]string{key1: "0-1"}, options.NewXReadOptions().SetBlock(1000))
assert.Nil(suite.T(), err)
assert.Nil(suite.T(), read)

// with 0 timeout (no timeout) should never time out,
// but we wrap the test with timeout to avoid test failing or stuck forever
finished := make(chan bool)
go func() {
testClient.XReadWithOptions(map[string]string{key1: "0-1"}, options.NewXReadOptions().SetBlock(0))
finished <- true
}()
select {
case <-finished:
assert.Fail(suite.T(), "Infinite block finished")
case <-time.After(3 * time.Second):
}
testClient.Close()
})
}

func (suite *GlideTestSuite) TestZAddAndZAddIncr() {
suite.runWithDefaultClients(func(client api.BaseClient) {
key := uuid.New().String()
Expand Down
2 changes: 1 addition & 1 deletion go/utils/transform_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func MapToString(parameter map[string]string) []string {
return flat
}

// Flattens a map[string, V] to a value-key string array
// Flattens a map[string, V] to a value-key string array like { value1, key1, value2, key2..}
func ConvertMapToValueKeyStringArray[V any](args map[string]V) []string {
result := make([]string, 0, len(args)*2)
for key, value := range args {
Expand Down

0 comments on commit aa8760b

Please sign in to comment.