Skip to content

Commit

Permalink
Use smart buffering (#520)
Browse files Browse the repository at this point in the history
Summary:

when reading the subprocess stdout to prevent extra new lines in output

Inspired by https://github.com/uber-go/zap/blob/0ba452dbe15478739ad4bab1067706018a3062c6/zapio/writer.go#L100

---
ttpforge reads stdout/stderr inaccurately causing such lines in logs:

```
INFO	[STDERR] curl: (56) Received HTT
INFO	[STDERR] P code 407 from proxy after CONNECT
```
the code: https://fburl.com/code/52hzjy4j

Differential Revision: D66702400
  • Loading branch information
inesusvet authored and facebook-github-bot committed Dec 4, 2024
1 parent 102c30c commit d49962c
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 22 deletions.
79 changes: 57 additions & 22 deletions pkg/blocks/iocapture.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,77 @@ import (
"github.com/facebookincubator/ttpforge/pkg/logging"
)

type zapWriter struct {
prefix string
type bufferedWriter struct {
buff bytes.Buffer
writer io.Writer
}

func (z *zapWriter) Write(b []byte) (int, error) {
n := len(b)
// extra-defensive programming :P
if n <= 0 {
return 0, nil
func (bw *bufferedWriter) Write(b []byte) (n int, err error) {
n = len(b)
for len(b) > 0 {
b = bw.writeLine(b)
}
return n, nil
}

// strip trailing newline
if b[n-1] == '\n' {
b = b[:n-1]
func (bw *bufferedWriter) writeLine(line []byte) (remaining []byte) {
idx := bytes.IndexByte(line, '\n')
if idx < 0 {
// If there are no newlines, buffer the entire string.
bw.buff.Write(line)
return nil
}

// split lines
lines := bytes.Split(b, []byte{'\n'})
for _, line := range lines {
logging.L().Info(z.prefix, string(line))
// Split on the newline, buffer and flush the left.
line, remaining = line[:idx], line[idx+1:]

// Fast path: if we don't have a partial message from a previous write
// in the buffer, skip the buffer and log directly.
if bw.buff.Len() == 0 {
bw.log(line)
return remaining
}
return n, nil

bw.buff.Write(line)
bw.log(bw.buff.Bytes())
bw.buff.Reset()
return remaining
}

func (bw *bufferedWriter) log(line []byte) {
bw.writer.Write(line)
}

func (bw *bufferedWriter) Close() error {
if bw.buff.Len() != 0 {
bw.log(bw.buff.Bytes())
bw.buff.Reset()
}
return nil
}

type zapWriter struct {
prefix string
}

func (zw *zapWriter) Write(p []byte) (n int, err error) {
logging.L().Info(zw.prefix, string(p))
return len(p), nil
}

func streamAndCapture(cmd exec.Cmd, stdout, stderr io.Writer) (*ActResult, error) {
if stdout == nil {
stdout = &zapWriter{
prefix: "[STDOUT] ",
stdout = &bufferedWriter{
writer: &zapWriter{
prefix: "[STDOUT] ",
},
}
}
if stderr == nil {
stderr = &zapWriter{
prefix: "[STDERR] ",
stderr = &bufferedWriter{
writer: &zapWriter{
prefix: "[STDERR] ",
},
}
}

Expand All @@ -75,8 +113,5 @@ func streamAndCapture(cmd exec.Cmd, stdout, stderr io.Writer) (*ActResult, error
result := ActResult{}
result.Stdout = outStr
result.Stderr = errStr
if err != nil {
return nil, err
}
return &result, nil
}
94 changes: 94 additions & 0 deletions pkg/blocks/iocapture_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright © 2024-present, Meta Platforms, Inc. and affiliates
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

package blocks

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

type mockWriter struct {
memory []byte
}

func (mw *mockWriter) Write(p []byte) (n int, err error) {
mw.memory = append(mw.memory, p...)
return len(p), nil
}

func (mw *mockWriter) GetMemory() []byte {
return mw.memory
}

func TestBufferedWriter(t *testing.T) {
testCases := []struct {
name string
textChunks []string
wantError bool
}{
{
name: "Finished line",
textChunks: []string{"Hello\n"},
wantError: false,
}, {
name: "Unfinished line",
textChunks: []string{"Hello, world", "!\n"},
wantError: false,
}, {
name: "No last newline",
textChunks: []string{"Hello,\nworld", "!\nfoobar"},
wantError: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockWriter := &mockWriter{}
zw := &bufferedWriter{
writer: mockWriter,
}
var totalBytesWritten int
for _, text := range tc.textChunks {
bytesWritten, err := zw.Write([]byte(text))
totalBytesWritten += bytesWritten
if tc.wantError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}
zw.Close()

expectedTotalBytesWritten := 0
for _, text := range tc.textChunks {
expectedTotalBytesWritten += len(text)
}
assert.Equal(t, expectedTotalBytesWritten, totalBytesWritten)
expectedBytesWritten := []byte{}
for _, text := range tc.textChunks {
text = strings.ReplaceAll(text, "\n", "")
expectedBytesWritten = append(expectedBytesWritten, []byte(text)...)
}
assert.Equal(t, expectedBytesWritten, mockWriter.GetMemory())
})
}
}

0 comments on commit d49962c

Please sign in to comment.