Skip to content

Commit

Permalink
feat: stream encoder (#75)
Browse files Browse the repository at this point in the history
* feat: add stream encoder

* docs: update usage.md
  • Loading branch information
muktihari authored Dec 28, 2023
1 parent 0b893d3 commit 88a8072
Show file tree
Hide file tree
Showing 5 changed files with 324 additions and 15 deletions.
63 changes: 62 additions & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Table of Contents:
- [Encode RAW Protocol Messages](#Encode-RAW-Protocol-Messages)
- [Encode Common File Types](#Encode-Common-File-Types)
- [Available Encode Options](#Available-Encode-Options)
- [Stream Encoder](#Stream-Encoder)

## Decoding

Expand Down Expand Up @@ -348,12 +349,19 @@ func main() {
1. **WithIgnoreChecksum**: directs the decoder to ignore the checksum, which is useful when we want to retrieve the data without considering its integrity.

Example:
Example:

```go
dec := decoder.New(f, decoder.WithIgnoreChecksum())
```

1. **WithNoComponentExpansion**: directs the Decoder to not expand the components.

Example:

```go
dec := decoder.New(f, decoder.WithNoComponentExpansion())
```

## Encoding

Note: By default, Encoder use protocol version 1.0 (proto.V1), if you want to use protocol version 2.0 (proto.V2), please specify it using Encode Option: WithProtocolVersion. See [Available Encode Options](#Available-Encode-Options)
Expand Down Expand Up @@ -622,3 +630,56 @@ Example decoding FIT file into common file `Activity File`, edit the manufacture
```

Note: we can only use either WithCompressedTimestampHeader or WithNormalHeader, can't use both at the same time.

### Stream Encoder

This is a new feature to enable encode per message basis or in streaming fashion rather than bulk per `proto.Fit`. To enable this, the Encoder's Writer should either implement io.WriterAt or io.WriteSeeker, since we need to be able to update FileHeader (the first 14 bytes) for every sequence completed. This is another building block that we can use.

```go
package main

import (
"os"
"time"

"github.com/muktihari/fit/encoder"
"github.com/muktihari/fit/factory"
"github.com/muktihari/fit/kit/datetime"
"github.com/muktihari/fit/profile/untyped/fieldnum"
"github.com/muktihari/fit/profile/untyped/mesgnum"
)

func main() {
f, err := os.OpenFile("Activity.fit", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o777)
if err != nil {
panic(err)
}
defer f.Close()

// f (*os.File) is even implementing both io.WriterAt and io.WriteSeeker
streamEnc, err := encoder.New(f).StreamEncoder()
if err != nil {
panic(err)
}

// Simplified example, writing only this mesg.
mesg := factory.CreateMesgOnly(mesgnum.FileId).WithFields(
factory.CreateField(mesgnum.FileId, fieldnum.FileIdTimeCreated).WithValue(datetime.ToUint32(time.Now())),
)

// Write per message, we can use this to write message as it arrives.
// For example, message retrieved from decoder's Listener can be write right away
// without waiting all messages to be received.
if err := streamEnc.WriteMessage(&mesg); err != nil {
panic(err)
}

/* Write more messages */

// This should be invoked for every sequence of FIT File (not every message) to finalize.
if err := streamEnc.SequenceCompleted(); err != nil {
panic(err)
}
}

```
37 changes: 27 additions & 10 deletions encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/muktihari/fit/kit/hash/crc16"
"github.com/muktihari/fit/profile"
"github.com/muktihari/fit/profile/typedef"
"github.com/muktihari/fit/profile/untyped/fieldnum"
"github.com/muktihari/fit/profile/untyped/mesgnum"
"github.com/muktihari/fit/proto"
)
Expand All @@ -25,6 +24,8 @@ var (
ErrNilWriter = errors.New("nil writer")
ErrEmptyMessages = errors.New("empty messages")
ErrMissingFileId = errors.New("missing file_id mesg")

ErrWriterAtOrWriteSeekerIsExpected = errors.New("io.WriterAt or io.WriteSeeker is expected")
)

const (
Expand Down Expand Up @@ -232,7 +233,8 @@ func (e *Encoder) encodeWithDirectUpdateStrategy(fit *proto.Fit) error {
if err := e.encodeMessages(e.w, fit.Messages); err != nil {
return err
}
if err := e.encodeCRC(&fit.CRC); err != nil {
fit.CRC = e.crc16.Sum16()
if err := e.encodeCRC(); err != nil {
return err
}
if err := e.updateHeader(&fit.FileHeader); err != nil {
Expand All @@ -252,9 +254,11 @@ func (e *Encoder) encodeWithEarlyCheckStrategy(fit *proto.Fit) error {
if err := e.encodeMessages(e.w, fit.Messages); err != nil {
return err
}
if err := e.encodeCRC(&fit.CRC); err != nil {
fit.CRC = e.crc16.Sum16()
if err := e.encodeCRC(); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -425,7 +429,7 @@ func (e *Encoder) encodeMessage(w io.Writer, mesg *proto.Message) error {
}

func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) {
field := mesg.FieldByNum(fieldnum.TimestampCorrelationTimestamp)
field := mesg.FieldByNum(proto.FieldNumTimestamp)
if field == nil {
return
}
Expand Down Expand Up @@ -456,7 +460,7 @@ func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) {

timeOffset := byte(timestamp & proto.CompressedTimeMask)
mesg.Header |= proto.MesgCompressedHeaderMask | timeOffset
mesg.RemoveFieldByNum(fieldnum.TimestampCorrelationTimestamp)
mesg.RemoveFieldByNum(proto.FieldNumTimestamp)
}

// writeMessage writes buf (marshaled message) to given w, and counts the total data size of all messages.
Expand All @@ -473,9 +477,7 @@ func (e *Encoder) writeMessage(w io.Writer, buf []byte) error {
return nil
}

func (e *Encoder) encodeCRC(crc *uint16) error {
*crc = e.crc16.Sum16() // update calculated crc to fit's CRC

func (e *Encoder) encodeCRC() error {
b := make([]byte, 2)
binary.LittleEndian.PutUint16(b, e.crc16.Sum16())

Expand Down Expand Up @@ -521,7 +523,8 @@ func (e *Encoder) encodeWithDirectUpdateStrategyWithContext(ctx context.Context,
if err := e.encodeMessagesWithContext(ctx, e.w, fit.Messages); err != nil {
return err
}
if err := e.encodeCRC(&fit.CRC); err != nil {
fit.CRC = e.crc16.Sum16()
if err := e.encodeCRC(); err != nil {
return err
}
if err := e.updateHeader(&fit.FileHeader); err != nil {
Expand All @@ -540,7 +543,8 @@ func (e *Encoder) encodeWithEarlyCheckStrategyWithContext(ctx context.Context, f
if err := e.encodeMessagesWithContext(ctx, e.w, fit.Messages); err != nil {
return err
}
if err := e.encodeCRC(&fit.CRC); err != nil {
fit.CRC = e.crc16.Sum16()
if err := e.encodeCRC(); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -570,3 +574,16 @@ func (e *Encoder) encodeMessagesWithContext(ctx context.Context, w io.Writer, me

return nil
}

// StreamEncoder turns this Encoder into StreamEncoder to encode per message basis or in streaming fashion.
// It returns an error if the Encoder's Writer does not implement io.WriterAt or io.WriteSeeker.
// After invoking this method, it is recommended not to use the Encoder to avoid undefined behavior.
func (e *Encoder) StreamEncoder() (*StreamEncoder, error) {
switch e.w.(type) {
case io.WriterAt, io.WriteSeeker:
return &StreamEncoder{enc: e}, nil
default:
return nil, fmt.Errorf("could not convert encoder into stream encoder: %w",
ErrWriterAtOrWriteSeekerIsExpected)
}
}
43 changes: 39 additions & 4 deletions encoder/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ type mockWriteSeeker struct {
}

var (
fnWriteOK = fnWriter(func(b []byte) (n int, err error) { return 0, nil })
fnWriteErr = fnWriter(func(b []byte) (n int, err error) { return 0, io.EOF })
fnWriteAtOK = fnWriterAt(func(p []byte, offset int64) (n int, err error) { return 0, nil })
fnWriteOK = fnWriter(func(p []byte) (n int, err error) { return len(p), nil })
fnWriteErr = fnWriter(func(p []byte) (n int, err error) { return 0, io.EOF })
fnWriteAtOK = fnWriterAt(func(p []byte, offset int64) (n int, err error) { return len(p), nil })
fnWriteAtErr = fnWriterAt(func(p []byte, offset int64) (n int, err error) { return 0, io.EOF })
fnSeekOK = fnSeeker(func(offset int64, whence int) (n int64, err error) { return 0, nil })
fnSeekErr = fnSeeker(func(offset int64, whence int) (n int64, err error) { return 0, io.EOF })
Expand Down Expand Up @@ -351,7 +351,7 @@ func makeEncodeWithEarlyCheckStrategy() []encodeWithEarlyCheckStrategyTestCase {
name: "encode crc error",
fit: &proto.Fit{Messages: []proto.Message{{}}},
w: func() io.Writer {
fnInstances := []io.Writer{fnWriteOK, fnWriteOK, fnWriteErr}
fnInstances := []io.Writer{fnWriteOK, fnWriteOK, fnWriteOK, fnWriteErr}
index := 0

return fnWriter(func(b []byte) (n int, err error) {
Expand Down Expand Up @@ -935,6 +935,41 @@ func TestEncodeMessagesWithContext(t *testing.T) {
}
}

func TestStreamEncoder(t *testing.T) {
tt := []struct {
name string
w io.Writer
err error
}{
{
name: "writer is io.WriterAt",
w: mockWriterAt{},
},
{
name: "writer is io.WriteSeeker",
w: mockWriteSeeker{},
},
{
name: "writer is pure io.Writer",
w: fnWriteOK,
err: ErrWriterAtOrWriteSeekerIsExpected,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
_, err := New(tc.w).StreamEncoder()
if !errors.Is(err, tc.err) {
t.Errorf("expected err: %v, got: %v", tc.err, err)
}
if err != nil {
return
}

})
}
}

func createFitForBenchmark(recodSize int) *proto.Fit {
now := time.Now()
fit := new(proto.Fit)
Expand Down
48 changes: 48 additions & 0 deletions encoder/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 The Fit SDK for Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package encoder

import (
"fmt"

"github.com/muktihari/fit/proto"
)

// StreamEncoder is one layer above Encoder to enable encoding in streaming fashion.
// This will only valid when the Writer given to the Encoder should either implement io.WriterAt or io.WriteSeeker.
// This can only be created using (*Encoder).StreamEncoder() method.
type StreamEncoder struct {
enc *Encoder
fileHeader proto.FileHeader
fileHeaderWritten bool
}

// WriteMessage writes message to the writer, it will auto write FileHeader when
// - This method is invoked on the first time of use.
// - This method is called right after SequenceCompleted method has been called.
func (e *StreamEncoder) WriteMessage(mesg *proto.Message) error {
if !e.fileHeaderWritten {
e.fileHeader = e.enc.defaultFileHeader
if err := e.enc.encodeHeader(&e.fileHeader); err != nil {
return err
}
e.fileHeaderWritten = true
}
return e.enc.encodeMessage(e.enc.w, mesg)
}

// SequenceCompleted finalises the FIT File by updating its FileHeader's DataSize & CRC, as well as the File's CRC.
// This will also reset variables so that the StreamEncoder can be used for the next sequence of FIT file.
func (e *StreamEncoder) SequenceCompleted() error {
if err := e.enc.encodeCRC(); err != nil {
return fmt.Errorf("could not encode crc: %w", err)
}
if err := e.enc.updateHeader(&e.fileHeader); err != nil {
return fmt.Errorf("could not update header: %w", err)
}
e.fileHeaderWritten = false
e.enc.reset()
return nil
}
Loading

0 comments on commit 88a8072

Please sign in to comment.