From 33b11dbf6a15a7011833645bffd34ee82e4fcb27 Mon Sep 17 00:00:00 2001 From: yiqiwang-17 <66986742+yiqiwang-17@users.noreply.github.com> Date: Mon, 16 Dec 2024 14:52:44 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20forwarders=E5=B9=B6=E5=8F=91=E5=86=B2?= =?UTF-8?q?=E7=AA=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- filebeat/input/log/reader.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/filebeat/input/log/reader.go b/filebeat/input/log/reader.go index ceffc6c19ee..09a2d6d2282 100644 --- a/filebeat/input/log/reader.go +++ b/filebeat/input/log/reader.go @@ -358,9 +358,11 @@ func (h *FileHarvester) Run() { case <-tick.C: if len(newForwarders) > 0 { logp.Info("found new forward join, reload file:%s", h.state.Source) + h.forwardersLock.Lock() for _, reuseReader := range newForwarders { h.forwarders[reuseReader.HarvesterID] = reuseReader } + h.forwardersLock.Unlock() newForwarders = make([]*ReuseHarvester, 0) offset, err := h.reloadFileOffset() @@ -378,6 +380,7 @@ func (h *FileHarvester) Run() { h.readerDone.Add(1) go h.loopRead() } else { + h.forwardersLock.Lock() for _, reuseReader := range h.forwarders { select { case <-reuseReader.done: @@ -386,6 +389,7 @@ func (h *FileHarvester) Run() { default: } } + h.forwardersLock.Unlock() } if len(h.forwarders) > 0 { @@ -449,8 +453,13 @@ func (h *FileHarvester) forward(message reader.Message, err error) { message: message, error: err, } - + reuseReaders := make([]*ReuseHarvester, 0) + h.forwardersLock.Lock() for _, reuseReader := range h.forwarders { + reuseReaders = append(reuseReaders, reuseReader) + } + h.forwardersLock.Unlock() + for _, reuseReader := range reuseReaders { select { case <-h.done: return