forked from linkedin/goavro
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmap.go
307 lines (285 loc) · 10.9 KB
/
map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
// Copyright [2017] LinkedIn Corp. Licensed under the Apache License, Version
// 2.0 (the "License"); you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
package goavro
import (
"errors"
"fmt"
"io"
"math"
"reflect"
)
func makeMapCodec(st map[string]*Codec, namespace string, schemaMap map[string]interface{}) (*Codec, error) {
// map type must have values
valueSchema, ok := schemaMap["values"]
if !ok {
return nil, errors.New("Map ought to have values key")
}
valueCodec, err := buildCodec(st, namespace, valueSchema)
if err != nil {
return nil, fmt.Errorf("Map values ought to be valid Avro type: %s", err)
}
return &Codec{
typeName: &name{"map", nullNamespace},
nativeFromBinary: func(buf []byte) (interface{}, []byte, error) {
var err error
var value interface{}
// block count and block size
if value, buf, err = longNativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary map block count: %s", err)
}
blockCount := value.(int64)
if blockCount < 0 {
// NOTE: A negative block count implies there is a long encoded
// block size following the negative block count. We have no use
// for the block size in this decoder, so we read and discard
// the value.
if blockCount == math.MinInt64 {
// The minimum number for any signed numerical type can
// never be made positive
return nil, nil, fmt.Errorf("cannot decode binary map with block count: %d", blockCount)
}
blockCount = -blockCount // convert to its positive equivalent
if _, buf, err = longNativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary map block size: %s", err)
}
}
// Ensure block count does not exceed some sane value.
if blockCount > MaxBlockCount {
return nil, nil, fmt.Errorf("cannot decode binary map when block count exceeds MaxBlockCount: %d > %d", blockCount, MaxBlockCount)
}
// NOTE: While the attempt of a RAM optimization shown below is not
// necessary, many encoders will encode all items in a single block.
// We can optimize amount of RAM allocated by runtime for the array
// by initializing the array for that number of items.
mapValues := make(map[string]interface{}, blockCount)
for blockCount != 0 {
// Decode `blockCount` datum values from buffer
for i := int64(0); i < blockCount; i++ {
// first decode the key string
if value, buf, err = stringNativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary map key: %s", err)
}
key := value.(string) // string decoder always returns a string
if _, ok := mapValues[key]; ok {
return nil, nil, fmt.Errorf("cannot decode binary map: duplicate key: %q", key)
}
// then decode the value
if value, buf, err = valueCodec.nativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary map value for key %q: %s", key, err)
}
mapValues[key] = value
}
// Decode next blockCount from buffer, because there may be more blocks
if value, buf, err = longNativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary map block count: %s", err)
}
blockCount = value.(int64)
if blockCount < 0 {
// NOTE: A negative block count implies there is a long
// encoded block size following the negative block count. We
// have no use for the block size in this decoder, so we
// read and discard the value.
if blockCount == math.MinInt64 {
// The minimum number for any signed numerical type can
// never be made positive
return nil, nil, fmt.Errorf("cannot decode binary map with block count: %d", blockCount)
}
blockCount = -blockCount // convert to its positive equivalent
if _, buf, err = longNativeFromBinary(buf); err != nil {
return nil, nil, fmt.Errorf("cannot decode binary map block size: %s", err)
}
}
// Ensure block count does not exceed some sane value.
if blockCount > MaxBlockCount {
return nil, nil, fmt.Errorf("cannot decode binary map when block count exceeds MaxBlockCount: %d > %d", blockCount, MaxBlockCount)
}
}
return mapValues, buf, nil
},
binaryFromNative: func(buf []byte, datum interface{}) ([]byte, error) {
mapValues, err := convertMap(datum)
if err != nil {
return nil, fmt.Errorf("cannot encode binary map: %s", err)
}
keyCount := int64(len(mapValues))
var alreadyEncoded, remainingInBlock int64
for k, v := range mapValues {
if remainingInBlock == 0 { // start a new block
remainingInBlock = keyCount - alreadyEncoded
if remainingInBlock > MaxBlockCount {
// limit block count to MacBlockCount
remainingInBlock = MaxBlockCount
}
buf, _ = longBinaryFromNative(buf, remainingInBlock)
}
// only fails when given non string, so elide error checking
buf, _ = stringBinaryFromNative(buf, k)
// encode the value
if buf, err = valueCodec.binaryFromNative(buf, v); err != nil {
return nil, fmt.Errorf("cannot encode binary map value for key %q: %v: %s", k, v, err)
}
remainingInBlock--
alreadyEncoded++
}
return longBinaryFromNative(buf, 0) // append tailing 0 block count to signal end of Map
},
nativeFromTextual: func(buf []byte) (interface{}, []byte, error) {
return genericMapTextDecoder(buf, valueCodec, nil) // codecFromKey == nil
},
textualFromNative: func(buf []byte, datum interface{}) ([]byte, error) {
return genericMapTextEncoder(buf, datum, valueCodec, nil)
},
}, nil
}
// genericMapTextDecoder decodes a JSON text blob to a native Go map, using the
// codecs from codecFromKey, and if a key is not found in that map, from
// defaultCodec if provided. If defaultCodec is nil, this function returns an
// error if it encounters a map key that is not present in codecFromKey. If
// codecFromKey is nil, every map value will be decoded using defaultCodec, if
// possible.
func genericMapTextDecoder(buf []byte, defaultCodec *Codec, codecFromKey map[string]*Codec) (map[string]interface{}, []byte, error) {
var value interface{}
var err error
var b byte
lencodec := len(codecFromKey)
mapValues := make(map[string]interface{}, lencodec)
if buf, err = advanceAndConsume(buf, '{'); err != nil {
return nil, nil, err
}
if buf, _ = advanceToNonWhitespace(buf); len(buf) == 0 {
return nil, nil, io.ErrShortBuffer
}
// NOTE: Special case empty map
if buf[0] == '}' {
return mapValues, buf[1:], nil
}
// NOTE: Also terminates when read '}' byte.
for len(buf) > 0 {
// decode key string
value, buf, err = stringNativeFromTextual(buf)
if err != nil {
return nil, nil, fmt.Errorf("cannot decode textual map: expected key: %s", err)
}
key := value.(string)
// Is key already used?
if _, ok := mapValues[key]; ok {
return nil, nil, fmt.Errorf("cannot decode textual map: duplicate key: %q", key)
}
// Find a codec for the key
fieldCodec := codecFromKey[key]
if fieldCodec == nil {
fieldCodec = defaultCodec
}
if fieldCodec == nil {
return nil, nil, fmt.Errorf("cannot decode textual map: cannot determine codec: %q", key)
}
// decode colon
if buf, err = advanceAndConsume(buf, ':'); err != nil {
return nil, nil, err
}
// decode value
if buf, _ = advanceToNonWhitespace(buf); len(buf) == 0 {
return nil, nil, io.ErrShortBuffer
}
value, buf, err = fieldCodec.nativeFromTextual(buf)
if err != nil {
return nil, nil, err
}
// set map value for key
mapValues[key] = value
// either comma or closing curly brace
if buf, _ = advanceToNonWhitespace(buf); len(buf) == 0 {
return nil, nil, io.ErrShortBuffer
}
switch b = buf[0]; b {
case '}':
return mapValues, buf[1:], nil
case ',':
// no-op
default:
return nil, nil, fmt.Errorf("cannot decode textual map: expected ',' or '}'; received: %q", b)
}
// NOTE: consume comma from above
if buf, _ = advanceToNonWhitespace(buf[1:]); len(buf) == 0 {
return nil, nil, io.ErrShortBuffer
}
}
return nil, nil, io.ErrShortBuffer
}
// genericMapTextEncoder encodes a native Go map to a JSON text blob, using the
// codecs from codecFromKey, and if a key is not found in that map, from
// defaultCodec if provided. If defaultCodec is nil, this function returns an
// error if it encounters a map key that is not present in codecFromKey. If
// codecFromKey is nil, every map value will be encoded using defaultCodec, if
// possible.
func genericMapTextEncoder(buf []byte, datum interface{}, defaultCodec *Codec, codecFromKey map[string]*Codec) ([]byte, error) {
mapValues, err := convertMap(datum)
if err != nil {
return nil, fmt.Errorf("cannot encode textual map: %s", err)
}
var atLeastOne bool
buf = append(buf, '{')
for key, value := range mapValues {
atLeastOne = true
// Find a codec for the key
fieldCodec := codecFromKey[key]
if fieldCodec == nil {
fieldCodec = defaultCodec
}
if fieldCodec == nil {
return nil, fmt.Errorf("cannot encode textual map: cannot determine codec: %q", key)
}
// Encode key string
buf, err = stringTextualFromNative(buf, key)
if err != nil {
return nil, err
}
buf = append(buf, ':')
// Encode value
buf, err = fieldCodec.textualFromNative(buf, value)
if err != nil {
// field was specified in datum; therefore its value was invalid
return nil, fmt.Errorf("cannot encode textual map: value for %q does not match its schema: %s", key, err)
}
buf = append(buf, ',')
}
if atLeastOne {
return append(buf[:len(buf)-1], '}'), nil
}
return append(buf, '}'), nil
}
// convertMap converts datum to map[string]interface{} if possible.
func convertMap(datum interface{}) (map[string]interface{}, error) {
mapValues, ok := datum.(map[string]interface{})
if ok {
return mapValues, nil
}
// NOTE: When given a map of any other type, zip values to items as a
// convenience to client.
v := reflect.ValueOf(datum)
if v.Kind() != reflect.Map {
return nil, fmt.Errorf("cannot create map[string]interface{}: expected map[string]...; received: %T", datum)
}
// NOTE: Two better alternatives to the current algorithm are:
// (1) mutate the reflection tuple underneath to convert the
// map[string]int, for example, to map[string]interface{}, with
// O(1) complexity.
// (2) use copy builtin to zip the data items over with O(n) complexity,
// but more efficient than what's below.
mapValues = make(map[string]interface{}, v.Len())
for _, key := range v.MapKeys() {
k, ok := key.Interface().(string)
if !ok {
// bail when map key type is not string
return nil, fmt.Errorf("cannot create map[string]interface{}: expected map[string]...; received: %T", datum)
}
mapValues[string(k)] = v.MapIndex(key).Interface()
}
return mapValues, nil
}