Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 单行长日志截断上报优化 #37

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion filebeat/input/log/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (h *FileHarvester) newLogFileReader() (reader.Reader, error) {
// for the worst case scenario where incoming UTF32 characters are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bloat or potential OOM with unexpectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := h.config.MaxBytes * 4
encReaderMaxBytes := h.config.MaxBytes

r, err = readfile.NewEncodeReader(reader, readfile.Config{
Codec: h.encoding,
Expand All @@ -683,6 +683,7 @@ func (h *FileHarvester) newLogFileReader() (reader.Reader, error) {
h.config.DockerJSON.ForceCRI,
h.config.DockerJSON.CRIFlags,
h.config.IsLudicrousModeActivated(),
h.config.MaxBytes,
)
}

Expand Down
53 changes: 33 additions & 20 deletions libbeat/reader/readfile/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) {

// Next reads the next line until the new line character
func (r *LineReader) Next() ([]byte, int, error) {
// This loop is need in case advance detects an line ending which turns out
// This loop is needed in case advance detects a line ending which turns out
// not to be one when decoded. If that is the case, reading continues.
for {
// read next 'potential' line from input buffer/reader
Expand All @@ -97,7 +97,7 @@ func (r *LineReader) Next() ([]byte, int, error) {
end -= len(r.nl)
}

sz, err := r.decode(end)
sz, err := r.decode(end, false)
if err != nil {
logp.Err("Error decoding line: %s", err)
// In case of error increase size by unencoded length
Expand Down Expand Up @@ -203,36 +203,46 @@ func (r *LineReader) advance() error {
// Check if buffer has newLine character
idx = r.findInBufferIndex(r.inOffset, r.nl)

// If max bytes limit per line is set, then drop the lines that are longer
// 超出最大限制的长日志处理
if r.maxBytes != 0 {
// If newLine is found, drop the lines longer than maxBytes
for idx != -1 && idx > r.maxBytes {
skipped := idx + len(r.nl)
r.skippedByteCount += skipped
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
err = r.inBuffer.Advance(skipped)
// 如果已找到最后一个换行符索引位置,且超出最大限制,则找到第一行单独处理
if idx != -1 && idx > r.maxBytes {
var err error
firstIdx := r.inBuffer.IndexFrom(r.inOffset, r.nl)
if firstIdx > r.maxBytes {
_, err = r.decode(r.maxBytes, true)
r.skippedByteCount += firstIdx + len(r.nl) - r.maxBytes
} else {
_, err = r.decode(firstIdx+len(r.nl), false)
}
err = r.inBuffer.Advance(firstIdx + len(r.nl))
r.inBuffer.Reset()
r.inOffset = 0
idx = r.findInBufferIndex(r.inOffset, r.nl)
return err
}

// If newLine is not found and the incoming data buffer exceeded max bytes limit, then skip until the next newLine
// 如果未找到最后一个换行符索引位置,且超出最大限制,则分截断上报,仅处理最大限制字节数
if idx == -1 && r.inBuffer.Len() > r.maxBytes {
skipped, err := r.skipUntilNewLine(buf)
r.skippedByteCount += skipped
sz, err := r.decode(r.maxBytes, true)
if err != nil {
logp.Err("Error skipping until new line, err: %v", err)
return err
logp.Err("Error decoding line: %s", err)
// In case of error increase size by unencoded length
sz = r.maxBytes
}
logp.Warn("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, skipped)
idx = r.findInBufferIndex(r.inOffset, r.nl)
err = r.inBuffer.Advance(sz)
r.inBuffer.Reset()
r.inOffset = 0

// 跳过该行剩余字节
skipped, err := r.skipUntilNewLine(buf)
r.skippedByteCount += skipped
return err
}
}
}

// found encoded byte sequence for '\n' in buffer
// -> decode input sequence into outBuffer
sz, err := r.decode(idx + len(r.nl))
sz, err := r.decode(idx+len(r.nl), false)
if err != nil {
logp.Err("Error decoding line: %s", err)
// In case of error increase size by unencoded length
Expand Down Expand Up @@ -296,7 +306,7 @@ func (r *LineReader) skipUntilNewLine(buf []byte) (int, error) {
return skipped, nil
}

func (r *LineReader) decode(end int) (int, error) {
func (r *LineReader) decode(end int, addNl bool) (int, error) {
var err error
buffer := make([]byte, 1024)
inBytes := r.inBuffer.Bytes()
Expand All @@ -323,5 +333,8 @@ func (r *LineReader) decode(end int) (int, error) {
}

r.byteCount += start
if addNl {
r.outBuffer.Write(r.nl)
}
return start, err
}
57 changes: 34 additions & 23 deletions libbeat/reader/readfile/line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//go:build !integration
// +build !integration

package readfile
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestReaderEncodings(t *testing.T) {
}

// create line reader
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, unlimited})
reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, unlimited, true})
if err != nil {
t.Errorf("failed to initialize reader: %v", err)
continue
Expand Down Expand Up @@ -189,7 +190,7 @@ func testReadLines(t *testing.T, inputLines [][]byte) {
// initialize reader
buffer := bytes.NewBuffer(inputStream)
codec, _ := encoding.Plain(buffer)
reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), unlimited})
reader, err := NewLineReader(buffer, Config{codec, buffer.Len(), unlimited, true})
if err != nil {
t.Fatalf("Error initializing reader: %v", err)
}
Expand Down Expand Up @@ -261,7 +262,7 @@ func setupTestMaxBytesLimit(lineMaxLimit, lineLen int, nl []byte) (lines []strin
if randomBool(rnd) {
// Boundary to the lineMaxLimit
if randomBool(rnd) {
sz = randomInt(rnd, lineMaxLimit-1, lineMaxLimit+1)
sz = randomInt(rnd, lineMaxLimit-1, lineMaxLimit*4)
} else {
sz = randomInt(rnd, 0, lineLen)
}
Expand All @@ -287,10 +288,10 @@ func setupTestMaxBytesLimit(lineMaxLimit, lineLen int, nl []byte) (lines []strin
func TestMaxBytesLimit(t *testing.T) {
const (
enc = "plain"
numberOfLines = 102
bufferSize = 1024
lineMaxLimit = 3012
lineLen = 5720 // exceeds lineMaxLimit
numberOfLines = 20
bufferSize = 100
lineMaxLimit = 200
lineLen = 200 // exceeds lineMaxLimit
)

codecFactory, ok := encoding.FindEncoding(enc)
Expand All @@ -309,7 +310,7 @@ func TestMaxBytesLimit(t *testing.T) {
}

// Create line reader
reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, lineMaxLimit})
reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, lineMaxLimit, true})
if err != nil {
t.Fatal("failed to initialize reader:", err)
}
Expand All @@ -328,24 +329,34 @@ func TestMaxBytesLimit(t *testing.T) {

// Find the next expected line from the original test array
var line string
for ; idx < len(lines); idx++ {
// Expected to be dropped
if len(lines[idx]) > lineMaxLimit {
continue
var originLine string
var linesLen int
firstIdx := bytes.Index(b, nl)
for firstIdx != -1 {
for ; idx < len(lines); idx++ {

originLine = lines[idx]
// Expected to be dropped
if len(lines[idx]) > lineMaxLimit {
originLine = lines[idx]
lines[idx] = lines[idx][:lineMaxLimit]
}
line = lines[idx]
idx++
break
}
firstBytes := b[:firstIdx]
s := string(firstBytes)
if line != s {
t.Fatalf("lines do not match, expected: %s got: %s", line, s)
}
line = lines[idx]
idx++
break
linesLen += len(originLine) + len(nl)
b = b[firstIdx+len(nl):]
firstIdx = bytes.Index(b, nl)
}

gotLen := n - len(nl)
s := string(b[:len(b)-len(nl)])
if len(line) != gotLen {
t.Fatalf("invalid line length, expected: %d got: %d", len(line), gotLen)
if linesLen != n {
t.Fatalf("invalid line length, expected: %d got: %d", len(line), n)
}

if line != s {
t.Fatalf("lines do not match, expected: %s got: %s", line, s)
}
}
}
73 changes: 59 additions & 14 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ type DockerJSONReader struct {
lineBuffer []byte
lineBufferBytes int

totalBytes int
totalBytes int
maxBytes int
lineFinished bool
}

type LogLine struct {
Expand All @@ -64,14 +66,16 @@ type LogLine struct {
}

// New creates a new reader renaming a field
func New(r reader.Reader, stream string, partial bool, forceCRI bool, CRIFlags bool, batchMode bool) *DockerJSONReader {
func New(r reader.Reader, stream string, partial bool, forceCRI bool, CRIFlags bool, batchMode bool, maxBytes int) *DockerJSONReader {
reader := DockerJSONReader{
stream: stream,
partial: partial,
reader: r,
forceCRI: forceCRI,
criflags: CRIFlags,
batchMode: batchMode,
stream: stream,
partial: partial,
reader: r,
forceCRI: forceCRI,
criflags: CRIFlags,
batchMode: batchMode,
maxBytes: maxBytes,
lineFinished: false,
}

if runtime.GOOS == "windows" {
Expand Down Expand Up @@ -273,6 +277,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {

func (p *DockerJSONReader) next() (reader.Message, error) {
var nbytes int
p.lineFinished = false
for {
message, err := p.reader.Next()

Expand Down Expand Up @@ -305,7 +310,17 @@ func (p *DockerJSONReader) next() (reader.Message, error) {
if err != nil {
return message, err
}
message.Content = append(message.Content, next.Content...)
// 当行buffer与当前msg内容相加超过最大字节数,将buffer中的内容截断返回
if len(message.Content)+len(next.Content) > p.maxBytes {
// 计算截断位置
truncateIdx := p.maxBytes - len(message.Content)
// 未截断部分写入texts,清空缓冲区
message.Content = append(message.Content, next.Content[:truncateIdx]...)
p.lineFinished = true
}
if !p.lineFinished {
message.Content = append(message.Content, next.Content...)
}
}

if p.stream != "all" && p.stream != logLine.Stream {
Expand All @@ -319,6 +334,7 @@ func (p *DockerJSONReader) next() (reader.Message, error) {
func (p *DockerJSONReader) batchNext() (reader.Message, error) {

for {

message, err := p.reader.Next()

message.Bytes += p.totalBytes
Expand All @@ -334,7 +350,6 @@ func (p *DockerJSONReader) batchNext() (reader.Message, error) {

// 当前位置
var offset int

for offset < len(buffer) {
// 继续解析下一行日志
idx := bytes.Index(buffer[offset:], []byte{'\n'})
Expand All @@ -347,7 +362,9 @@ func (p *DockerJSONReader) batchNext() (reader.Message, error) {

var logLine LogLine

content, err := p.batchParseLine(buffer[offset:offset+idx], &logLine)
// json解析前原始日志
rawContent := buffer[offset : offset+idx]
content, err := p.batchParseLine(rawContent, &logLine)

// 指针往前推移
offset += idx
Expand All @@ -362,16 +379,44 @@ func (p *DockerJSONReader) batchNext() (reader.Message, error) {
if p.lineBuffer == nil {
p.lineBuffer = make([]byte, 0, len(content)*4)
}
p.lineBuffer = append(p.lineBuffer, content...)
p.lineBufferBytes += idx
// 当行buffer与当前msg内容相加超过最大字节数,将buffer中的内容截断返回
if len(p.lineBuffer)+len(content) > p.maxBytes {
// 计算截断位置
truncateIdx := p.maxBytes - len(p.lineBuffer)

// 未截断部分写入texts,清空缓冲区
texts = append(texts, append(p.lineBuffer, content[:truncateIdx]...))

// 清空缓冲区,此行标记为返回结束
p.lineBuffer = nil
p.lineBufferBytes = 0
p.lineFinished = true
} else if !p.lineFinished {
p.lineBuffer = append(p.lineBuffer, content...)
p.lineBufferBytes += idx
}
continue
}

if p.stream == "all" || p.stream == logLine.Stream {
texts = append(texts, append(p.lineBuffer, content...))
if !p.lineFinished {
if len(p.lineBuffer)+len(content) > p.maxBytes {
// 计算截断位置
truncateIdx := p.maxBytes - len(p.lineBuffer)

// 未截断部分写入texts,清空缓冲区
texts = append(texts, append(p.lineBuffer, content[:truncateIdx]...))

// 清空缓冲区,此行标记为读取结束
p.lineFinished = true
} else {
texts = append(texts, append(p.lineBuffer, content...))
}
}
}

// 行日志已经结束,直接清空缓存
p.lineFinished = false
p.lineBuffer = nil
p.lineBufferBytes = 0
}
Expand Down
Loading