From c5bf0c3d4a9114c0d67e917a6c6807eae85812d1 Mon Sep 17 00:00:00 2001 From: Mukti Date: Mon, 16 Sep 2024 20:06:00 +0700 Subject: [PATCH] fix: handle compressed timestamp (#438) * fix: decoder on check message's header by mask * fix: encoder on compressing timestamp into header --- decoder/decoder.go | 5 ++- decoder/decoder_test.go | 96 +++++++++++++++++++++++++++++++++++++++++ decoder/raw.go | 2 +- encoder/encoder.go | 25 ++++++----- encoder/encoder_test.go | 74 +++++++++++++++++++++++++------ 5 files changed, 176 insertions(+), 26 deletions(-) diff --git a/decoder/decoder.go b/decoder/decoder.go index b4bda9ae..1d772e6c 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -534,7 +534,10 @@ func (d *Decoder) decodeMessage() error { return err } header := b[0] - if (header & proto.MesgDefinitionMask) == proto.MesgDefinitionMask { + + // NOTE: Compressed Timestamp Header Bit 5-6 is the local message type. + // Bit 6 overlap with MesgDefinitionMask; It's a message definition only if Bit 7 is zero. + if (header & (proto.MesgCompressedHeaderMask | proto.MesgDefinitionMask)) == proto.MesgDefinitionMask { return d.decodeMessageDefinition(header) } return d.decodeMessageData(header) diff --git a/decoder/decoder_test.go b/decoder/decoder_test.go index 38348376..ac3de3cd 100644 --- a/decoder/decoder_test.go +++ b/decoder/decoder_test.go @@ -1263,6 +1263,102 @@ func TestDecodeFileHeader(t *testing.T) { } } +func TestDecodeMessage(t *testing.T) { + now := time.Now() + + tt := []struct { + name string + r io.Reader // must consist of mesgDef and mesg + timestampReference uint32 + mesgDef *proto.MessageDefinition + mesg proto.Message + err error + }{ + { + name: "header with compressed timestamp", + r: bytes.NewBuffer(append( + /* mesgDef */ []byte{67, 0, 0, 21, 0, 3, 3, 4, 134, 0, 1, 0, 1, 1, 0}, + /* mesg */ []byte{0b11100000 | byte(datetime.ToUint32(now)&proto.CompressedTimeMask), 0, 0, 0, 0, 0, 0}..., + )), + timestampReference: datetime.ToUint32(now), + mesgDef: &proto.MessageDefinition{ + Header: 67, + Reserved: 0, + Architecture: 0, + MesgNum: mesgnum.Event, + FieldDefinitions: []proto.FieldDefinition{ + {Num: 3, Size: 4, BaseType: 134}, + {Num: 0, Size: 1, BaseType: 0}, + {Num: 1, Size: 1, BaseType: 0}, + }, + }, + mesg: proto.Message{ + Header: 0b11100000 | byte(datetime.ToUint32(now)&proto.CompressedTimeMask), + Num: mesgnum.Event, + Fields: []proto.Field{ + factory.CreateField(mesgnum.Event, fieldnum.EventTimestamp). + WithValue(datetime.ToUint32(now)), + factory.CreateField(mesgnum.Event, fieldnum.EventData). + WithValue(uint32(0)), + factory.CreateField(mesgnum.Event, fieldnum.EventEvent).WithValue(typedef.EventTimer), + factory.CreateField(mesgnum.Event, fieldnum.EventEventType).WithValue(typedef.EventTypeStart), + }, + }, + }, + } + + for i, tc := range tt { + t.Run(fmt.Sprintf("[%d] %s", i, tc.name), func(t *testing.T) { + dec := New(tc.r) + dec.timestamp = tc.timestampReference + dec.lastTimeOffset = byte(tc.timestampReference & proto.CompressedTimeMask) + for i := 0; i < 2; i++ { + err := dec.decodeMessage() + if !errors.Is(err, tc.err) { + t.Fatalf("expected error: %v, got: %v", tc.err, err) + } + if err != nil { + return + } + } + + var mesgDef *proto.MessageDefinition + for _, v := range dec.localMessageDefinitions { + if v != nil { + mesgDef = v + break + } + } + if diff := cmp.Diff(mesgDef, tc.mesgDef, + cmp.Transformer("MessageDefinition", func(m *proto.MessageDefinition) *proto.MessageDefinition { + if len(m.DeveloperFieldDefinitions) == 0 { + m.DeveloperFieldDefinitions = nil + } + return m + }), + ); diff != "" { + t.Fatal(diff) + } + + if len(dec.messages) == 0 { + t.Fatalf("no message is decoded") + } + + if diff := cmp.Diff(dec.messages[0], tc.mesg, + cmp.Transformer("Message", func(m proto.Message) proto.Message { + if len(m.DeveloperFields) == 0 { + m.DeveloperFields = nil + } + return m + }), + cmp.AllowUnexported(proto.Value{}), + ); diff != "" { + t.Fatal(diff) + } + }) + } +} + func TestDecodeMessageDefinition(t *testing.T) { fit, buf := createFitForTest() diff --git a/decoder/raw.go b/decoder/raw.go index 421c1a8b..decba572 100644 --- a/decoder/raw.go +++ b/decoder/raw.go @@ -136,7 +136,7 @@ func (d *RawDecoder) Decode(r io.Reader, fn func(flag RawFlag, b []byte) error) } // 2. a. Decode Message Definition - if (d.BytesArray[0] & proto.MesgDefinitionMask) == proto.MesgDefinitionMask { + if (d.BytesArray[0] & (proto.MesgCompressedHeaderMask | proto.MesgDefinitionMask)) == proto.MesgDefinitionMask { const fixedSize = uint16(6) // Header + Reserved + Architecture + MesgNum (2 bytes) + n Fields nr, err = io.ReadFull(r, d.BytesArray[1:fixedSize]) n += int64(nr) diff --git a/encoder/encoder.go b/encoder/encoder.go index 5798588e..e9b7f823 100644 --- a/encoder/encoder.go +++ b/encoder/encoder.go @@ -439,6 +439,7 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) { return fmt.Errorf("message validation failed: %w", err) } + var compressed bool if e.options.headerOption == headerOptionCompressedTimestamp { if e.w == io.Discard { // NOTE: Only for calculating data size (Early Check Strategy) @@ -451,7 +452,7 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) { } } prevLen := len(mesg.Fields) - e.compressTimestampIntoHeader(mesg) + compressed = e.compressTimestampIntoHeader(mesg) if prevLen > len(mesg.Fields) { defer func() { // Revert: put timestamp field back at original index mesg.Fields = mesg.Fields[:prevLen] @@ -460,7 +461,7 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) { }() } } else { - e.compressTimestampIntoHeader(mesg) + compressed = e.compressTimestampIntoHeader(mesg) } } @@ -471,9 +472,12 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) { b, _ := mesgDef.MarshalAppend(e.buf[:0]) localMesgNum, isNewMesgDef := e.localMesgNumLRU.Put(b) // This might alloc memory since we need to copy the item. - if e.options.headerOption == headerOptionNormal { - b[0] = (b[0] &^ proto.LocalMesgNumMask) | localMesgNum // Update the message definition header. - mesg.Header = (mesg.Header &^ proto.LocalMesgNumMask) | localMesgNum + + b[0] |= localMesgNum // Update the message definition header. + if compressed { + // TODO: implement compressed timestamp with multiple local messages type. + } else { + mesg.Header |= localMesgNum } var n int @@ -502,28 +506,27 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) { return nil } -func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) { +func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) (ok bool) { timestamp := mesg.FieldValueByNum(proto.FieldNumTimestamp).Uint32() if timestamp == basetype.Uint32Invalid { - return // not supported + return false // not supported } if timestamp < uint32(typedef.DateTimeMin) { - return + return false } // The 5-bit time offset rolls over every 32 seconds, it is necessary that the difference // between timestamp and timestamp reference be measured less than 32 seconds apart. if (timestamp - e.timestampReference) > proto.CompressedTimeMask { e.timestampReference = timestamp - return // Rollover event occurs, keep it as it is. + return false // Rollover event occurs, keep it as it is. } - e.timestampReference = timestamp - timeOffset := byte(timestamp & proto.CompressedTimeMask) mesg.Header |= proto.MesgCompressedHeaderMask | timeOffset mesg.RemoveFieldByNum(proto.FieldNumTimestamp) + return true } func (e *Encoder) newMessageDefinition(mesg *proto.Message) *proto.MessageDefinition { diff --git a/encoder/encoder_test.go b/encoder/encoder_test.go index ab184acc..25257a69 100644 --- a/encoder/encoder_test.go +++ b/encoder/encoder_test.go @@ -1178,9 +1178,12 @@ func TestCompressTimestampInHeader(t *testing.T) { now := time.Now() offset := byte(datetime.ToUint32(now) & proto.CompressedTimeMask) tt := []struct { - name string - mesgs []proto.Message - headers []byte + name string + mesgs []proto.Message + headers []byte + lenFields []int + compresseds []bool + timestampReferences []uint32 }{ { name: "compress timestamp in header happy flow", @@ -1207,7 +1210,16 @@ func TestCompressTimestampInHeader(t *testing.T) { proto.MesgNormalHeaderMask, // record: the message containing timestamp reference prior to the use of compressed header. proto.MesgCompressedHeaderMask | (offset+1)&proto.CompressedTimeMask, proto.MesgCompressedHeaderMask | (offset+2)&proto.CompressedTimeMask, - proto.MesgCompressedHeaderMask | (offset+32)&proto.CompressedTimeMask, + proto.MesgNormalHeaderMask, + }, + lenFields: []int{2, 1, 0, 0, 1}, + compresseds: []bool{false, false, true, true, false}, + timestampReferences: []uint32{ + 0, + datetime.ToUint32(now), + datetime.ToUint32(now), + datetime.ToUint32(now), + datetime.ToUint32(now.Add(32 * time.Second)), }, }, { @@ -1233,6 +1245,14 @@ func TestCompressTimestampInHeader(t *testing.T) { proto.MesgNormalHeaderMask, // record: roll over has occurred, the timestamp is used new timestamp reference. proto.MesgCompressedHeaderMask | (offset+1)&proto.CompressedTimeMask, }, + lenFields: []int{2, 1, 1, 0}, + compresseds: []bool{false, false, false, true}, + timestampReferences: []uint32{ + 0, + datetime.ToUint32(now), + datetime.ToUint32(now.Add(32 * time.Second)), + datetime.ToUint32(now.Add(32 * time.Second)), // same as prev timestamp + }, }, { name: "timestamp less than DateTimeMin", @@ -1249,9 +1269,15 @@ func TestCompressTimestampInHeader(t *testing.T) { proto.MesgNormalHeaderMask, proto.MesgNormalHeaderMask, }, + lenFields: []int{2, 1}, + compresseds: []bool{false, false}, + timestampReferences: []uint32{ + 0, + 0, // less than DateTimeMin do not change timestampReference + }, }, { - name: "timestamp wrong type not uint32 or typedef.DateTime", + name: "timestamp type typedef.DateTime", mesgs: []proto.Message{ {Num: mesgnum.FileId, Fields: []proto.Field{ factory.CreateField(mesgnum.FileId, fieldnum.FileIdManufacturer).WithValue(typedef.ManufacturerGarmin), @@ -1265,9 +1291,15 @@ func TestCompressTimestampInHeader(t *testing.T) { proto.MesgNormalHeaderMask, proto.MesgNormalHeaderMask, }, + lenFields: []int{2, 1}, + compresseds: []bool{false, false}, + timestampReferences: []uint32{ + 0, + datetime.ToUint32(now), // typedef.Datetime will be converted into uint32 in proto.Value + }, }, { - name: "timestamp wrong type not uint32 or typedef.DateTime", + name: "timestamp wrong type not uint32 or typedef.DateTime: time.Time", mesgs: []proto.Message{ {Num: mesgnum.FileId, Fields: []proto.Field{ factory.CreateField(mesgnum.FileId, fieldnum.FileIdManufacturer).WithValue(typedef.ManufacturerGarmin), @@ -1281,19 +1313,35 @@ func TestCompressTimestampInHeader(t *testing.T) { proto.MesgNormalHeaderMask, proto.MesgNormalHeaderMask, }, + lenFields: []int{2, 1}, + compresseds: []bool{false, false}, + timestampReferences: []uint32{ + 0, + 0, + }, }, } - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { + for i, tc := range tt { + t.Run(fmt.Sprintf("[%d] %s", i, tc.name), func(t *testing.T) { enc := New(nil) - for i := range tc.mesgs { - enc.compressTimestampIntoHeader(&tc.mesgs[i]) + for j := range tc.mesgs { + compressed := enc.compressTimestampIntoHeader(&tc.mesgs[j]) + if compressed != tc.compresseds[j] { + t.Errorf("index: %d: expected compressed: %t, got: %t", j, tc.compresseds[j], compressed) + } + if enc.timestampReference != tc.timestampReferences[j] { + t.Errorf("index: %d: expected timestampReference: %d, got: %d", + j, tc.timestampReferences[j], enc.timestampReference) + } } // Now that all message have been processed let's check the header - for i := range tc.mesgs { - if diff := cmp.Diff(tc.mesgs[i].Header, tc.headers[i]); diff != "" { - t.Errorf("index: %d: %s", i, diff) + for j := range tc.mesgs { + if diff := cmp.Diff(tc.mesgs[j].Header, tc.headers[j]); diff != "" { + t.Errorf("index: %d: %s", j, diff) + } + if l := len(tc.mesgs[j].Fields); l != tc.lenFields[j] { + t.Errorf("index: %d: expected len fields: %d, got: %d", j, l, tc.lenFields[j]) } } })