diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml new file mode 100644 index 0000000..c384298 --- /dev/null +++ b/.github/workflows/go.yml @@ -0,0 +1,20 @@ +name: go test + +on: + push: + branches: [ master ] + pull_request: + branches: [ master ] + +jobs: + + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - uses: actions/setup-go@v2 + with: + go-version: 1.17 + + - run: go test ./... diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 2a81ab4..6a40fe1 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -74,18 +74,64 @@ conditional, limiting payload data, and multiple receivers. Segments in this group have the ability to change the sequence of segments any given flow traverses. -#### blackhole -The `blackhole` segment is used to drain a pipeline, effectively starting a new -pipeline after it. In conjunction with `skip`, this can act as a `flowfilter`. +#### branch +The `branch` segment is used to select the further progression of the pipeline +between to branches. To this end, it uses additional syntax that other segments +do not have access to, namely the `if`, `then` and `else` keys which can +contain lists of segments that constitute embedded pipelines. + +The any of these three keys may be empty and they are by default. The `if` +segments receive the flows entering the `branch` segment unconditionally. If +the segments in `if` proceed any flow from the input all the way to the end of +the `if` segments, this flow will be moved on to the `then` segments. If flows +are dropped at any point within the `if` segments, they will be moved on to the +`else` branch immediately, shortcutting the traversal of the `if` segments. Any +edits made to flows during the `if` segments will be persisted in either +branch, `then` and `else`, as well as after the flows passed from the `branch` +segment into consecutive segments. Dropping flows behaves regularly in both +branches, but note that flows can not be dropped within the `if` branch +segments, as this is taken as a cue to move them into the `else` branch. + +If any of these three lists of segments (or subpipelines) is empty, the +`branch` segment will behave as if this subpipeline consisted of a single +`pass` segment. + +Instead of a minimal example, the following more elaborate one highlights all +TCP flows while printing to standard output and keeps only these highlighted +ones in a sqlite export: + +``` +- segment: branch + if: + - segment: flowfilter + config: + filter: proto tcp + - segment: elephant + then: + - segment: printflowdump + config: + highlight: 1 + else: + - segment: printflowdump + - segment: drop -``` -- segment: blackhole +- segment: sqlite + config: + filename: tcponly.sqlite ``` -[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/controlflow/blackhole) -[examples using this segment](https://github.com/search?q=%22segment%3A+blackhole%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code) +[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/controlflow/branch) +[examples using this segment](https://github.com/search?q=%22segment%3A+branch%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code) + #### skip + +*DEPRECATION NOTICE*: This segment will be deprecated in a future version of +flowpipeline. In any but the most convoluted examples, the `branch` segment +documented directly above is the clearer and more legible choice. This will +also greatly simplify the setup internals of segments. + + The `skip` segment is used to conditionally skip over segments behind it. For instance, in front of a export segment a condition such as `proto tcp` with a skip value of `1` would result in any TCP flows not being exported by the @@ -142,6 +188,17 @@ Segments in this group all drop flows, i.e. remove them from the pipeline from this segment on. Fields in individual flows are never modified, only used as criteria. +#### drop +The `drop` segment is used to drain a pipeline, effectively starting a new +pipeline after it. In conjunction with `skip`, this can act as a `flowfilter`. + +``` +- segment: drop +``` + +[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/filter/drop) +[examples using this segment](https://github.com/search?q=%22segment%3A+drop%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code) + #### elephant The `elephant` segment uses a configurable sliding window to determine flow statistics at runtime and filter out unremarkable flows from the pipeline. This @@ -701,8 +758,8 @@ for an application. This is for internally used segments only. -#### noop -The `noop` segment serves as a heavily annotated template for new segments. So +#### pass +The `pass` segment serves as a heavily annotated template for new segments. So does this piece of documentation. Aside from summarizing what a segment does, it should include a description of all the parameters it accepts as well as any caveats users should be aware of. @@ -712,12 +769,12 @@ Roadmap: * and here ``` -- segment: noop +- segment: pass # the lines below are optional and set to default config: jk: this segment actually has no config at all, its just for this template ``` [any additional links](https://bwnet.belwue.de) -[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/noop) -[examples using this segment](https://github.com/search?q=%22segment%3A+noop%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code) +[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/pass) +[examples using this segment](https://github.com/search?q=%22segment%3A+pass%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code) diff --git a/Makefile b/Makefile index 1fe7c91..b5f6da8 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ test: go test ./... -cover bench: - @go test -bench=. -benchtime=1ns ./segments/noop | grep "cpu:" + @go test -bench=. -benchtime=1ns ./segments/pass | grep "cpu:" @echo "results:" @go test -bench=. -run=Bench ./... | grep -E "^Bench" | awk '{fps = 1/(($$3)/1e9); sub(/Benchmark/, "", $$1); sub(/-.*/, "", $$1); printf("%15s: %8s ns/flow, %7.0f flows/s\n", tolower($$1), $$3, fps)}' @rm segments/output/sqlite/bench.sqlite diff --git a/main.go b/main.go index 46b95ea..5ec763e 100644 --- a/main.go +++ b/main.go @@ -17,12 +17,12 @@ import ( _ "github.com/bwNetFlow/flowpipeline/segments/alert/http" - _ "github.com/bwNetFlow/flowpipeline/segments/controlflow/blackhole" - _ "github.com/bwNetFlow/flowpipeline/segments/controlflow/skip" + _ "github.com/bwNetFlow/flowpipeline/segments/controlflow/branch" _ "github.com/bwNetFlow/flowpipeline/segments/export/influx" _ "github.com/bwNetFlow/flowpipeline/segments/export/prometheus" + _ "github.com/bwNetFlow/flowpipeline/segments/filter/drop" _ "github.com/bwNetFlow/flowpipeline/segments/filter/elephant" _ "github.com/bwNetFlow/flowpipeline/segments/filter/flowfilter" @@ -40,7 +40,7 @@ import ( _ "github.com/bwNetFlow/flowpipeline/segments/modify/remoteaddress" _ "github.com/bwNetFlow/flowpipeline/segments/modify/snmp" - _ "github.com/bwNetFlow/flowpipeline/segments/noop" + _ "github.com/bwNetFlow/flowpipeline/segments/pass" _ "github.com/bwNetFlow/flowpipeline/segments/output/csv" _ "github.com/bwNetFlow/flowpipeline/segments/output/json" @@ -69,6 +69,7 @@ func main() { return } pipe := pipeline.NewFromConfig(config) + pipe.Start() pipe.AutoDrain() sigs := make(chan os.Signal, 1) diff --git a/pipeline/config.go b/pipeline/config.go index dfb4522..f1359a0 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -7,18 +7,22 @@ import ( "strconv" "github.com/bwNetFlow/flowpipeline/segments" + "github.com/bwNetFlow/flowpipeline/segments/controlflow/branch" "gopkg.in/yaml.v2" ) // A config representation of a segment. It is intended to look like this: -// - segment: noop +// - segment: pass // config: // key: value // foo: bar // This struct has the appropriate yaml tags inline. type SegmentRepr struct { - Name string `yaml:"segment"` // to be looked up with a registry - Config map[string]string `yaml:"config"` // to be expanded by our instance + Name string `yaml:"segment"` // to be looked up with a registry + Config map[string]string `yaml:"config"` // to be expanded by our instance + If []SegmentRepr `yaml:"if,omitempty,flow"` // only used by group segment + Then []SegmentRepr `yaml:"then,omitempty,flow"` // only used by group segment + Else []SegmentRepr `yaml:"else,omitempty,flow"` // only used by group segment } // Returns the SegmentRepr's Config with all its variables expanded. It tries @@ -47,24 +51,40 @@ func (s *SegmentRepr) ExpandedConfig() map[string]string { // initializes a Pipeline with them. func NewFromConfig(config []byte) *Pipeline { // parse a list of SegmentReprs from yaml - pipelineRepr := new([]SegmentRepr) + segmentReprs := new([]SegmentRepr) - err := yaml.Unmarshal(config, &pipelineRepr) + err := yaml.Unmarshal(config, &segmentReprs) if err != nil { log.Fatalf("[error] Error parsing configuration YAML: %v", err) } + segments := SegmentsFromRepr(segmentReprs) + // we have SegmentReprs parsed, instanciate them as actual Segments - segmentList := make([]segments.Segment, len(*pipelineRepr)) - for i, segmentrepr := range *pipelineRepr { - segmenttype := segments.LookupSegment(segmentrepr.Name) // a typed nil instance + return New(segments...) +} + +// Creates a list of Segments from their config representations. Handles +// recursive definitions found in Segments. +func SegmentsFromRepr(segmentReprs *[]SegmentRepr) []segments.Segment { + segmentList := make([]segments.Segment, len(*segmentReprs)) + for i, segmentrepr := range *segmentReprs { + segmentTemplate := segments.LookupSegment(segmentrepr.Name) // a typed nil instance // the Segment's New method knows how to handle our config - segment := segmenttype.New(segmentrepr.ExpandedConfig()) + segment := segmentTemplate.New(segmentrepr.ExpandedConfig()) + switch segment := segment.(type) { // handle special segments + case *branch.Branch: + segment.ImportBranches( + New(SegmentsFromRepr(&segmentrepr.If)...), + New(SegmentsFromRepr(&segmentrepr.Then)...), + New(SegmentsFromRepr(&segmentrepr.Else)...), + ) + } if segment != nil { segmentList[i] = segment } else { log.Fatalf("[error] Configured segment '%s' could not be initialized properly, see previous messages.", segmentrepr.Name) } } - return New(segmentList...) + return segmentList } diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 7040ecc..59cd137 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -6,6 +6,10 @@ import ( "sync" "github.com/bwNetFlow/flowpipeline/segments" + "github.com/bwNetFlow/flowpipeline/segments/filter/drop" + "github.com/bwNetFlow/flowpipeline/segments/filter/elephant" + "github.com/bwNetFlow/flowpipeline/segments/filter/flowfilter" + "github.com/bwNetFlow/flowpipeline/segments/pass" flow "github.com/bwNetFlow/protobuf/go" ) @@ -15,17 +19,48 @@ import ( type Pipeline struct { In chan *flow.FlowMessage Out <-chan *flow.FlowMessage + Drop chan *flow.FlowMessage wg *sync.WaitGroup - SegmentList []segments.Segment `yaml: segments` + SegmentList []segments.Segment +} + +func (pipeline *Pipeline) GetInput() chan *flow.FlowMessage { + return pipeline.In +} + +func (pipeline *Pipeline) GetOutput() <-chan *flow.FlowMessage { + return pipeline.Out +} + +func (pipeline *Pipeline) GetDrop() <-chan *flow.FlowMessage { + if pipeline.Drop != nil { + return pipeline.Drop + } + pipeline.Drop = make(chan *flow.FlowMessage) + // Subscribe to drops from special segments, namely all based on + // BaseFilterSegment grouped in the filter directory. + for _, segment := range pipeline.SegmentList { + switch typedSegment := segment.(type) { + case *drop.Drop: + typedSegment.SubscribeDrops(pipeline.Drop) + case *elephant.Elephant: + typedSegment.SubscribeDrops(pipeline.Drop) + case *flowfilter.FlowFilter: + typedSegment.SubscribeDrops(pipeline.Drop) + } + } + // If there are no filter/* segments, this channel will never have + // messages available. + return pipeline.Drop } // Starts up a goroutine specific to this Pipeline which reads any message from // the Out channel and discards it. This is a convenience function to enable // having a segment at the end of the pipeline handle all results, i.e. having // no post-pipeline processing. -func (pipeline Pipeline) AutoDrain() { +func (pipeline *Pipeline) AutoDrain() { go func() { - for _ = range pipeline.Out { + for range pipeline.Out { } log.Println("[info] Pipeline closed, auto draining finished.") }() @@ -35,7 +70,7 @@ func (pipeline Pipeline) AutoDrain() { // segments to propagate this close event through the full pipeline, // terminating all segment goroutines and thus releasing the waitgroup. // Blocking. -func (pipeline Pipeline) Close() { +func (pipeline *Pipeline) Close() { defer func() { recover() // in case In is already closed pipeline.wg.Wait() @@ -47,19 +82,22 @@ func (pipeline Pipeline) Close() { // therein. Initialization includes creating any intermediate channels and // wiring up the segments in the segmentList with them. func New(segmentList ...segments.Segment) *Pipeline { - wg := sync.WaitGroup{} - // the following loops need to be split so that Rewire can work with - // non-nil channels from further ahead in the pipeline... important for - // skip segments + if len(segmentList) == 0 { + segmentList = []segments.Segment{&pass.Pass{}} + } channels := make([]chan *flow.FlowMessage, len(segmentList)+1) channels[0] = make(chan *flow.FlowMessage) - for i, _ := range segmentList { + for i, segment := range segmentList { channels[i+1] = make(chan *flow.FlowMessage) + segment.Rewire(channels[i], channels[i+1]) } - for i, segment := range segmentList { - segment.Rewire(channels, uint(i), uint(i+1)) - wg.Add(1) - go segment.Run(&wg) + return &Pipeline{In: channels[0], Out: channels[len(channels)-1], wg: &sync.WaitGroup{}, SegmentList: segmentList} +} + +// Starts the Pipeline by starting all segment goroutines therein. +func (pipeline *Pipeline) Start() { + for _, segment := range pipeline.SegmentList { + pipeline.wg.Add(1) + go segment.Run(pipeline.wg) } - return &Pipeline{In: channels[0], Out: channels[len(channels)-1], wg: &wg, SegmentList: segmentList} } diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 6bef350..881b7ce 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -4,13 +4,18 @@ import ( "testing" "github.com/bwNetFlow/flowpipeline/segments" - "github.com/bwNetFlow/flowpipeline/segments/noop" + "github.com/bwNetFlow/flowpipeline/segments/pass" flow "github.com/bwNetFlow/protobuf/go" + + _ "github.com/bwNetFlow/flowpipeline/segments/filter/drop" + _ "github.com/bwNetFlow/flowpipeline/segments/modify/dropfields" + _ "github.com/bwNetFlow/flowpipeline/segments/testing/generator" ) func TestPipelineBuild(t *testing.T) { - segmentList := []segments.Segment{&noop.NoOp{}, &noop.NoOp{}} + segmentList := []segments.Segment{&pass.Pass{}, &pass.Pass{}} pipeline := New(segmentList...) + pipeline.Start() pipeline.In <- &flow.FlowMessage{Type: 3} fmsg := <-pipeline.Out if fmsg.Type != 3 { @@ -19,8 +24,9 @@ func TestPipelineBuild(t *testing.T) { } func TestPipelineTeardown(t *testing.T) { - segmentList := []segments.Segment{&noop.NoOp{}, &noop.NoOp{}} + segmentList := []segments.Segment{&pass.Pass{}, &pass.Pass{}} pipeline := New(segmentList...) + pipeline.Start() pipeline.AutoDrain() pipeline.In <- &flow.FlowMessage{Type: 3} pipeline.Close() // fail test on halting ;) @@ -28,13 +34,99 @@ func TestPipelineTeardown(t *testing.T) { func TestPipelineConfigSuccess(t *testing.T) { pipeline := NewFromConfig([]byte(`--- -- segment: noop +- segment: pass config: foo: $baz bar: $0`)) + pipeline.Start() pipeline.In <- &flow.FlowMessage{Type: 3} fmsg := <-pipeline.Out if fmsg.Type != 3 { t.Error("Pipeline built from config is not working.") } } + +func Test_Branch_passthrough(t *testing.T) { + pipeline := NewFromConfig([]byte(`--- +- segment: branch + if: + - segment: flowfilter + config: + filter: proto tcp + then: + - segment: dropfields + config: + policy: drop + fields: InIf + else: + - segment: dropfields + config: + policy: drop + fields: OutIf +`)) + pipeline.Start() + pipeline.In <- &flow.FlowMessage{Proto: 6, InIf: 1, OutIf: 1} + fmsg := <-pipeline.Out + if fmsg.Proto != 6 || fmsg.InIf == 1 || fmsg.OutIf != 1 { + t.Errorf("Branch segment did not work correctly, state is Proto %d, InIf %d, OutIf %d, should be (6, 0, 1).", fmsg.Proto, fmsg.InIf, fmsg.OutIf) + } + pipeline.In <- &flow.FlowMessage{Proto: 42, InIf: 1, OutIf: 1} + fmsg = <-pipeline.Out + if fmsg.Proto != 42 || fmsg.InIf != 1 || fmsg.OutIf == 1 { + t.Errorf("Branch segment did not work correctly, state is Proto %d, InIf %d, OutIf %d, should be (42, 1, 0).", fmsg.Proto, fmsg.InIf, fmsg.OutIf) + } +} + +func Test_Branch_DeadlockFreeGeneration_If(t *testing.T) { + pipeline := NewFromConfig([]byte(`--- +- segment: branch + if: + - segment: generator + - segment: flowfilter + config: + filter: proto tcp + then: + - segment: dropfields + config: + policy: drop + fields: Bytes +`)) + pipeline.Start() + pipeline.In <- &flow.FlowMessage{Proto: 42, Bytes: 42} + for i := 0; i < 5; i++ { + fmsg := <-pipeline.Out + if fmsg.Proto == 6 && fmsg.Bytes != 0 { + t.Errorf("Branch segment did not work correctly, state is Proto %d, Bytes %d, should be (6, 0).", fmsg.Proto, fmsg.Bytes) + } else if fmsg.Proto == 42 && fmsg.Bytes != 42 { + t.Errorf("Branch segment did not work correctly, state is Proto %d, Bytes %d, should be (42, 42).", fmsg.Proto, fmsg.Bytes) + } + } +} + +func Test_Branch_DeadlockFreeGeneration_Then(t *testing.T) { + pipeline := NewFromConfig([]byte(`--- +- segment: branch + then: + - segment: generator +`)) + pipeline.Start() + pipeline.In <- &flow.FlowMessage{Proto: 42, Bytes: 42} + for i := 0; i < 5; i++ { + // no checks, not timeouting is enough + <-pipeline.Out + } +} + +func Test_Branch_DeadlockFreeGeneration_Else(t *testing.T) { + pipeline := NewFromConfig([]byte(`--- +- segment: branch + else: + - segment: generator +`)) + pipeline.Start() + pipeline.In <- &flow.FlowMessage{Proto: 42, Bytes: 42} + for i := 0; i < 5; i++ { + // no checks, not timeouting is enough + <-pipeline.Out + } +} diff --git a/segments/alert/http/http_test.go b/segments/alert/http/http_test.go index 3321907..55467c7 100644 --- a/segments/alert/http/http_test.go +++ b/segments/alert/http/http_test.go @@ -1,25 +1,12 @@ package http import ( - "log" - "os" "testing" "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Http Segment test, passthrough test func TestSegment_Http_passthrough(t *testing.T) { result := segments.TestSegment("http", map[string]string{"url": "http://localhost:8000"}, diff --git a/segments/controlflow/blackhole/blackhole.go b/segments/controlflow/blackhole/blackhole.go deleted file mode 100644 index 7ed1aa2..0000000 --- a/segments/controlflow/blackhole/blackhole.go +++ /dev/null @@ -1,30 +0,0 @@ -package blackhole - -import ( - "sync" - - "github.com/bwNetFlow/flowpipeline/segments" -) - -type Blackhole struct { - segments.BaseSegment -} - -func (segment Blackhole) New(config map[string]string) segments.Segment { - return &Blackhole{} -} - -func (segment *Blackhole) Run(wg *sync.WaitGroup) { - defer func() { - close(segment.Out) - wg.Done() - }() - - for range segment.In { - } -} - -func init() { - segment := &Blackhole{} - segments.RegisterSegment("blackhole", segment) -} diff --git a/segments/controlflow/branch/branch.go b/segments/controlflow/branch/branch.go new file mode 100644 index 0000000..34c7b06 --- /dev/null +++ b/segments/controlflow/branch/branch.go @@ -0,0 +1,109 @@ +package branch + +import ( + "log" + "sync" + + "github.com/bwNetFlow/flowpipeline/segments" + flow "github.com/bwNetFlow/protobuf/go" +) + +// This mirrors the proper implementation in the pipeline package. This +// duplication is to avoid the import cycle. +type Pipeline interface { + Start() + Close() + GetInput() chan *flow.FlowMessage + GetOutput() <-chan *flow.FlowMessage + GetDrop() <-chan *flow.FlowMessage +} + +type Branch struct { + segments.BaseSegment + condition Pipeline + then_branch Pipeline + else_branch Pipeline +} + +func (segment Branch) New(config map[string]string) segments.Segment { + return &Branch{} +} + +func (segment *Branch) ImportBranches(condition interface{}, then_branch interface{}, else_branch interface{}) { + segment.condition = condition.(Pipeline) + segment.then_branch = then_branch.(Pipeline) + segment.else_branch = else_branch.(Pipeline) +} + +func (segment *Branch) Run(wg *sync.WaitGroup) { + if segment.condition == nil || segment.then_branch == nil || segment.else_branch == nil { + log.Println("[error] Branch: Uninitialized branches. This is expected during standalone testing of this package. The actual test is done as part of the pipeline package, as this segment embeds further pipelines.") + return + } + defer func() { + segment.condition.Close() + segment.then_branch.Close() + segment.else_branch.Close() + close(segment.Out) + wg.Done() + }() + + go segment.condition.Start() + go segment.then_branch.Start() + go segment.else_branch.Start() + + go func() { // drain our output + from_then := segment.then_branch.GetOutput() + from_else := segment.else_branch.GetOutput() + for { + select { + case msg, ok := <-from_then: + if !ok { + from_then = nil + } else { + segment.Out <- msg + } + case msg, ok := <-from_else: + if !ok { + from_else = nil + } else { + segment.Out <- msg + } + } + if from_then == nil && from_else == nil { + return + } + } + }() + go func() { // move anything from conditional to our two branches + from_condition_out := segment.condition.GetOutput() + from_condition_drop := segment.condition.GetDrop() + for { + select { + case msg, ok := <-from_condition_out: + if !ok { + from_condition_out = nil + } else { + segment.then_branch.GetInput() <- msg + } + case msg, ok := <-from_condition_drop: + if !ok { + from_condition_drop = nil + } else { + segment.else_branch.GetInput() <- msg + } + } + if from_condition_out == nil && from_condition_drop == nil { + return + } + } + }() + for msg := range segment.In { // connect our own input to conditional + segment.condition.GetInput() <- msg + } +} + +func init() { + segment := &Branch{} + segments.RegisterSegment("branch", segment) +} diff --git a/segments/controlflow/branch/branch_test.go b/segments/controlflow/branch/branch_test.go new file mode 100644 index 0000000..01928ba --- /dev/null +++ b/segments/controlflow/branch/branch_test.go @@ -0,0 +1,30 @@ +package branch + +import ( + "log" + "sync" + "testing" + + "github.com/bwNetFlow/flowpipeline/segments" + flow "github.com/bwNetFlow/protobuf/go" +) + +// Branch Segment test, passthrough test +// This does not work currently, as segment tests are scoped for segment +// package only, and this specific segment requires some pipeline +// initialization, which would lead to an import cycle. Thus, this test +// confirms that it fails silently, and this segment is instead tested from the +// pipeline package test files. +func TestSegment_Branch_passthrough(t *testing.T) { + segment := segments.LookupSegment("branch").New(map[string]string{}).(*Branch) + if segment == nil { + log.Fatal("[error] Configured segment 'branch' could not be initialized properly, see previous messages.") + } + in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) + segment.Rewire(in, out) + + wg := &sync.WaitGroup{} + wg.Add(1) + // this would timeout if it worked properly, instead it logs an error and returns + segment.Run(wg) +} diff --git a/segments/controlflow/skip/skip.go b/segments/controlflow/skip/skip.go deleted file mode 100644 index 8a14df8..0000000 --- a/segments/controlflow/skip/skip.go +++ /dev/null @@ -1,94 +0,0 @@ -package skip - -import ( - "log" - "strconv" - "sync" - - "github.com/bwNetFlow/flowfilter/parser" - "github.com/bwNetFlow/flowfilter/visitors" - "github.com/bwNetFlow/flowpipeline/segments" - flow "github.com/bwNetFlow/protobuf/go" -) - -type Skip struct { - segments.BaseSegment - AltOut chan<- *flow.FlowMessage // in addition to the BaseSegment declarations, a second Out is provided - - Condition string // optional, default "", if this is evaluated as true, a skip will occur - Skip uint // optional, default 1, the number of segments to skip after this one when condition is true - Invert bool // optional, default false, whether to invert the conditional behaviour - - expression *parser.Expression -} - -func (segment Skip) New(config map[string]string) segments.Segment { - var err error - var skip uint = 1 - // Skip parameter is parsed but not validated, as that can only be done - // in the context of a pipeline, i.e. when using Rewire. - if parsedSkip, err := strconv.ParseUint(config["skip"], 10, 32); err == nil { - skip = uint(parsedSkip) - } else { - if config["skip"] != "" { - log.Println("[error] Skip: Could not parse 'skip' parameter, using default 1.") - } else { - log.Println("[info] Skip: 'skip' set to default 1.") - } - } - - invert, err := strconv.ParseBool(config["invert"]) - if err != nil { - log.Println("[info] Skip: 'invert' set to default 'false'.") - } - - newSegment := &Skip{ - Skip: skip, - Invert: invert, - Condition: config["condition"], - } - - newSegment.expression, err = parser.Parse(config["condition"]) - if err != nil { - log.Printf("[error] Skip: Syntax error in condition expression: %v", err) - return nil - } - filter := &visitors.Filter{} - if _, err := filter.CheckFlow(newSegment.expression, &flow.FlowMessage{}); err != nil { - log.Printf("[error] Skip: Semantic error in filter expression: %v", err) - return nil - } - return newSegment -} - -func (segment *Skip) Run(wg *sync.WaitGroup) { - defer func() { - close(segment.Out) - wg.Done() - }() - - filter := &visitors.Filter{} - for msg := range segment.In { - if match, _ := filter.CheckFlow(segment.expression, msg); match != segment.Invert { - segment.AltOut <- msg - } else { - segment.Out <- msg - } - } -} - -// Override default Rewire method for this segment. -func (segment *Skip) Rewire(chans []chan *flow.FlowMessage, in uint, out uint) { - segment.In = chans[in] - segment.Out = chans[out] - if out+segment.Skip > uint(len(chans))-1 { - log.Printf("[error] Skip: Configuration error, skipping the next %d segments is beyond the pipelines end, defaulting to 0.", segment.Skip) - segment.Skip = 0 - } - segment.AltOut = chans[out+segment.Skip] -} - -func init() { - segment := &Skip{} - segments.RegisterSegment("skip", segment) -} diff --git a/segments/controlflow/skip/skip_test.go b/segments/controlflow/skip/skip_test.go deleted file mode 100644 index e192c97..0000000 --- a/segments/controlflow/skip/skip_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package skip - -import ( - "log" - "os" - "sync" - "testing" - - "github.com/bwNetFlow/flowpipeline/segments" - flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" -) - -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - -// Skip Segment test, passthrough test -func TestSegment_Skip_passthrough(t *testing.T) { - result := segments.TestSegment("skip", map[string]string{}, - &flow.FlowMessage{Type: 3}) - if result.Type != 3 { - t.Error("Segment Skip is not working.") - } -} - -// Skip Segment test, skip 1 test -func TestSegment_Skip_skip(t *testing.T) { - segment := segments.LookupSegment("skip").New(map[string]string{"skip": "1", "invert": "0", "condition": "port 3"}) - - in, out, altout := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out, altout}, 0, 1) - - wg := &sync.WaitGroup{} - wg.Add(1) - go segment.Run(wg) - - in <- &flow.FlowMessage{SrcPort: 3} - close(in) - select { - case _ = <-out: - t.Error("Segment Skip is not skipping when it is supposed to.") - case result := <-altout: - if result.SrcPort != 3 { - t.Error("Segment Skip is skipping correctly but mangling the data.") - } - } - wg.Wait() -} - -// Skip Segment test, no skip 1 test -func TestSegment_Skip_noskip(t *testing.T) { - segment := segments.LookupSegment("skip").New(map[string]string{"skip": "1", "invert": "0", "condition": "port 4"}) - - in, out, altout := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out, altout}, 0, 1) - - wg := &sync.WaitGroup{} - wg.Add(1) - go segment.Run(wg) - - in <- &flow.FlowMessage{SrcPort: 3} - close(in) - select { - case result := <-out: - if result.SrcPort != 3 { - t.Error("Segment Skip is skipping correctly but mangling the data.") - } - case _ = <-altout: - t.Error("Segment Skip is not skipping when it is supposed to.") - } - wg.Wait() - -} diff --git a/segments/export/influx/influx_test.go b/segments/export/influx/influx_test.go index 8882f53..b201f6d 100644 --- a/segments/export/influx/influx_test.go +++ b/segments/export/influx/influx_test.go @@ -1,25 +1,12 @@ package influx import ( - "log" - "os" "testing" "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Influx Segment test, passthrough test only func TestSegment_Influx_passthrough(t *testing.T) { result := segments.TestSegment("influx", map[string]string{"org": "testorg", "bucket": "testbucket", "token": "testtoken"}, diff --git a/segments/export/prometheus/prometheus_test.go b/segments/export/prometheus/prometheus_test.go index 74ee637..5eee61b 100644 --- a/segments/export/prometheus/prometheus_test.go +++ b/segments/export/prometheus/prometheus_test.go @@ -1,25 +1,12 @@ package prometheus import ( - "log" - "os" "testing" "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Prometheus Segment test, passthrough test only func TestSegment_PrometheusExporter_passthrough(t *testing.T) { result := segments.TestSegment("prometheus", map[string]string{"endpoint": ":8080"}, diff --git a/segments/filter/drop/drop.go b/segments/filter/drop/drop.go new file mode 100644 index 0000000..95ad5bb --- /dev/null +++ b/segments/filter/drop/drop.go @@ -0,0 +1,33 @@ +package drop + +import ( + "sync" + + "github.com/bwNetFlow/flowpipeline/segments" +) + +type Drop struct { + segments.BaseFilterSegment +} + +func (segment Drop) New(config map[string]string) segments.Segment { + return &Drop{} +} + +func (segment *Drop) Run(wg *sync.WaitGroup) { + defer func() { + close(segment.Out) + wg.Done() + }() + + for msg := range segment.In { + if segment.Drops != nil { + segment.Drops <- msg + } + } +} + +func init() { + segment := &Drop{} + segments.RegisterSegment("drop", segment) +} diff --git a/segments/filter/elephant/elephant.go b/segments/filter/elephant/elephant.go index dc96d46..0e78432 100644 --- a/segments/filter/elephant/elephant.go +++ b/segments/filter/elephant/elephant.go @@ -13,7 +13,7 @@ import ( ) type Elephant struct { - segments.BaseSegment + segments.BaseFilterSegment Aspect string // optional, one of "bytes", "bps", "packets", or "pps", default is "bytes", determines which aspect qualifies a flow as an elephant Percentile float64 // optional, default is 99.00, determines the cutoff percentile for flows being dropped by this segment, i.e. 95.00 corresponds to outputting the top 5% only // TODO: add option to get bottom percent? @@ -156,8 +156,13 @@ func (segment *Elephant) Run(wg *sync.WaitGroup) { if aspect >= threshold { log.Printf("[debug] Elephant: Found elephant with size %d (>=%f)", msg.Bytes, threshold) segment.Out <- msg + continue } } + // implicit "(if inRampup || aspect < threshold) && ..." due to the continue 3 lines above + if segment.Drops != nil { + segment.Drops <- msg + } } } diff --git a/segments/filter/elephant/elephant_test.go b/segments/filter/elephant/elephant_test.go index 891847a..7917e66 100644 --- a/segments/filter/elephant/elephant_test.go +++ b/segments/filter/elephant/elephant_test.go @@ -9,19 +9,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Elephant Segment test, passthrough test func TestSegment_Elephant_passthrough(t *testing.T) { segment := segments.LookupSegment("elephant").New(map[string]string{}) @@ -30,13 +19,15 @@ func TestSegment_Elephant_passthrough(t *testing.T) { } in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) go segment.Run(wg) in <- &flow.FlowMessage{Bytes: 10} + <-out + in <- &flow.FlowMessage{Bytes: 9} in <- &flow.FlowMessage{Bytes: 100} result := <-out if result.Bytes != 100 { @@ -54,7 +45,7 @@ func BenchmarkElephant(b *testing.B) { segment := Elephant{} in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/filter/flowfilter/flowfilter.go b/segments/filter/flowfilter/flowfilter.go index 6ecab0f..7a62165 100644 --- a/segments/filter/flowfilter/flowfilter.go +++ b/segments/filter/flowfilter/flowfilter.go @@ -14,7 +14,7 @@ import ( ) type FlowFilter struct { - segments.BaseSegment + segments.BaseFilterSegment Filter string // optional, default is empty expression *parser.Expression @@ -52,6 +52,8 @@ func (segment *FlowFilter) Run(wg *sync.WaitGroup) { for msg := range segment.In { if match, _ := filter.CheckFlow(segment.expression, msg); match { segment.Out <- msg + } else if segment.Drops != nil { + segment.Drops <- msg } } } diff --git a/segments/filter/flowfilter/flowfilter_test.go b/segments/filter/flowfilter/flowfilter_test.go index 65f0c76..3a6ff13 100644 --- a/segments/filter/flowfilter/flowfilter_test.go +++ b/segments/filter/flowfilter/flowfilter_test.go @@ -10,19 +10,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // FlowFilter Segment testing is basic, the filtering itself is tested in the flowfilter repo func TestSegment_FlowFilter_accept(t *testing.T) { result := segments.TestSegment("flowfilter", map[string]string{"filter": "proto 4"}, @@ -56,7 +45,7 @@ func BenchmarkFlowFilter(b *testing.B) { segment := FlowFilter{}.New(map[string]string{"filter": "port <50"}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/input/bpf/bpf_test.go b/segments/input/bpf/bpf_test.go index de5dad9..9bd4a89 100644 --- a/segments/input/bpf/bpf_test.go +++ b/segments/input/bpf/bpf_test.go @@ -1,23 +1,5 @@ package bpf -import ( - "log" - "os" - "testing" - - "github.com/hashicorp/logutils" -) - -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Bpf Segment test, passthrough test TODO: how to guarantee device presence on any host // func TestSegment_Bpf_passthrough(t *testing.T) { // result := segments.TestSegment("bpf", map[string]string{"device": "eth0"}, diff --git a/segments/input/goflow/goflow_test.go b/segments/input/goflow/goflow_test.go index e113969..e07df5b 100644 --- a/segments/input/goflow/goflow_test.go +++ b/segments/input/goflow/goflow_test.go @@ -1,25 +1,12 @@ package goflow import ( - "log" - "os" "testing" "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Goflow Segment test, passthrough test only, functionality is tested by Goflow package func TestSegment_Goflow_passthrough(t *testing.T) { result := segments.TestSegment("goflow", map[string]string{"port": "2055"}, diff --git a/segments/input/kafkaconsumer/kafkaconsumer_test.go b/segments/input/kafkaconsumer/kafkaconsumer_test.go index 73d8ee7..203ec6d 100644 --- a/segments/input/kafkaconsumer/kafkaconsumer_test.go +++ b/segments/input/kafkaconsumer/kafkaconsumer_test.go @@ -1,23 +1,9 @@ package kafkaconsumer import ( - "log" - "os" "testing" - - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - func TestSegment_KafkaConsumer_instanciation(t *testing.T) { kafkaConsumer := &KafkaConsumer{} result := kafkaConsumer.New(map[string]string{}) diff --git a/segments/input/stdin/stdin_test.go b/segments/input/stdin/stdin_test.go index 8cef0c4..ce2fccf 100644 --- a/segments/input/stdin/stdin_test.go +++ b/segments/input/stdin/stdin_test.go @@ -10,19 +10,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // StdIn Segment test, passthrough test only func TestSegment_StdIn_passthrough(t *testing.T) { result := segments.TestSegment("stdin", map[string]string{}, @@ -42,7 +31,7 @@ func BenchmarkStdin(b *testing.B) { } in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/modify/addcid/addcid_test.go b/segments/modify/addcid/addcid_test.go index 1c20adb..2c0888d 100644 --- a/segments/modify/addcid/addcid_test.go +++ b/segments/modify/addcid/addcid_test.go @@ -9,19 +9,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // AddCid Segment tests are thorough and try every combination func TestSegment_AddCid_noLocalAddrKeep(t *testing.T) { result := segments.TestSegment("addcid", map[string]string{"filename": "../../../examples/enricher/customer_subnets.csv"}, @@ -71,7 +60,7 @@ func BenchmarkAddCid(b *testing.B) { segment := AddCid{}.New(map[string]string{"filename": "../../../examples/enricher/customer_subnets.csv"}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/modify/anonymize/anonymize_test.go b/segments/modify/anonymize/anonymize_test.go index d8b0bbf..c7c8274 100644 --- a/segments/modify/anonymize/anonymize_test.go +++ b/segments/modify/anonymize/anonymize_test.go @@ -10,19 +10,8 @@ import ( cryptopan "github.com/Yawning/cryptopan" "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Influx Segment test, passthrough test only func TestSegment_Influx_passthrough(t *testing.T) { result := segments.TestSegment("anonymize", map[string]string{"key": "testkey123jfh789fhj456ezhskila73"}, @@ -49,7 +38,7 @@ func BenchmarkAnonymize(b *testing.B) { } in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/modify/dropfields/dropfields_test.go b/segments/modify/dropfields/dropfields_test.go index 157d14b..b852685 100644 --- a/segments/modify/dropfields/dropfields_test.go +++ b/segments/modify/dropfields/dropfields_test.go @@ -9,19 +9,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // DropFields Segment tests are thorough and try every combination func TestSegment_DropFields_policyKeep(t *testing.T) { result := segments.TestSegment("dropfields", map[string]string{"policy": "keep", "fields": "DstAddr"}, @@ -49,7 +38,7 @@ func BenchmarkDropFields(b *testing.B) { segment := DropFields{}.New(map[string]string{"policy": "drop", "fields": "SrcAddr"}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/modify/geolocation/geolocation_test.go b/segments/modify/geolocation/geolocation_test.go index 9ae4152..5c6beeb 100644 --- a/segments/modify/geolocation/geolocation_test.go +++ b/segments/modify/geolocation/geolocation_test.go @@ -9,19 +9,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // GeoLocation Segment tests are thorough and try every combination func TestSegment_GeoLocation_noRemoteAddrKeep(t *testing.T) { result := segments.TestSegment("geolocation", map[string]string{"filename": "../../../examples/enricher/GeoLite2-Country-Test.mmdb"}, @@ -71,7 +60,7 @@ func BenchmarkGeoLocation(b *testing.B) { segment := GeoLocation{}.New(map[string]string{"filename": "../../../examples/enricher/GeoLite2-Country-Test.mmdb"}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/modify/normalize/normalize_test.go b/segments/modify/normalize/normalize_test.go index 7c16d1d..176f0b2 100644 --- a/segments/modify/normalize/normalize_test.go +++ b/segments/modify/normalize/normalize_test.go @@ -9,19 +9,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Normalize Segment test, in-flow SampleingRate test func TestSegment_Normalize_inFlowSamplingRate(t *testing.T) { result := segments.TestSegment("normalize", map[string]string{}, @@ -57,7 +46,7 @@ func BenchmarkNormalize(b *testing.B) { segment := Normalize{}.New(map[string]string{}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/modify/protomap/protomap_test.go b/segments/modify/protomap/protomap_test.go index 8867188..7b44f25 100644 --- a/segments/modify/protomap/protomap_test.go +++ b/segments/modify/protomap/protomap_test.go @@ -9,19 +9,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Protomap Segment test, passthrough test only func TestSegment_protomap_passthrough(t *testing.T) { result := segments.TestSegment("protomap", map[string]string{}, @@ -48,7 +37,7 @@ func BenchmarkProtomap(b *testing.B) { segment := Protomap{}.New(map[string]string{}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/modify/remoteaddress/remoteaddress_test.go b/segments/modify/remoteaddress/remoteaddress_test.go index bf961e2..cb54940 100644 --- a/segments/modify/remoteaddress/remoteaddress_test.go +++ b/segments/modify/remoteaddress/remoteaddress_test.go @@ -9,19 +9,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // RemoteAddress Segment testing is basically checking whether switch/case is working okay... func TestSegment_RemoteAddress(t *testing.T) { result := segments.TestSegment("remoteaddress", map[string]string{"policy": "border"}, @@ -47,7 +36,7 @@ func BenchmarkRemoteAddress(b *testing.B) { segment := RemoteAddress{}.New(map[string]string{"policy": "cidr", "filename": "../../../examples/enricher/customer_subnets.csv"}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/modify/snmp/snmp_test.go b/segments/modify/snmp/snmp_test.go index 37c4869..c1d8571 100644 --- a/segments/modify/snmp/snmp_test.go +++ b/segments/modify/snmp/snmp_test.go @@ -1,23 +1,9 @@ package snmp import ( - "log" - "os" "testing" - - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // SNMPInterface Segment test // TODO: find a way to run this elsewhere, as this currently only works by // having the local 161/udp port forwarded to some router. diff --git a/segments/noop/noop_test.go b/segments/noop/noop_test.go deleted file mode 100644 index db1a188..0000000 --- a/segments/noop/noop_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package noop - -import ( - "io/ioutil" - "log" - "os" - "sync" - "testing" - - "github.com/bwNetFlow/flowpipeline/segments" - flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" -) - -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - -// NoOp Segment test, passthrough test -func TestSegment_NoOp_passthrough(t *testing.T) { - result := segments.TestSegment("noop", map[string]string{}, - &flow.FlowMessage{Type: 3}) - if result.Type != 3 { - t.Error("Segment NoOp is not working.") - } -} - -// NoOp Segment benchmark passthrough -func BenchmarkNoOp(b *testing.B) { - log.SetOutput(ioutil.Discard) - os.Stdout, _ = os.Open(os.DevNull) - - segment := NoOp{} - - in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) - - wg := &sync.WaitGroup{} - wg.Add(1) - go segment.Run(wg) - - for n := 0; n < b.N; n++ { - in <- &flow.FlowMessage{} - _ = <-out - } - close(in) -} diff --git a/segments/output/csv/csv_test.go b/segments/output/csv/csv_test.go index b20bc82..44db24c 100644 --- a/segments/output/csv/csv_test.go +++ b/segments/output/csv/csv_test.go @@ -10,19 +10,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Csv Segment test, passthrough test func TestSegment_Csv_passthrough(t *testing.T) { result := segments.TestSegment("csv", map[string]string{}, @@ -41,7 +30,7 @@ func BenchmarkCsv(b *testing.B) { segment := Csv{}.New(map[string]string{}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/output/json/json_test.go b/segments/output/json/json_test.go index 0f3dca6..d89b85c 100644 --- a/segments/output/json/json_test.go +++ b/segments/output/json/json_test.go @@ -9,22 +9,11 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Json Segment test, passthrough test only func TestSegment_Json_passthrough(t *testing.T) { - result := segments.TestSegment("stdout", map[string]string{}, + result := segments.TestSegment("json", map[string]string{}, &flow.FlowMessage{}) if result == nil { t.Error("Segment Json is not passing through flows.") @@ -39,7 +28,7 @@ func BenchmarkJson(b *testing.B) { segment := Json{}.New(map[string]string{}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/output/kafkaproducer/kafkaproducer_test.go b/segments/output/kafkaproducer/kafkaproducer_test.go index 488145f..70632fa 100644 --- a/segments/output/kafkaproducer/kafkaproducer_test.go +++ b/segments/output/kafkaproducer/kafkaproducer_test.go @@ -1,23 +1,9 @@ package kafkaproducer import ( - "log" - "os" "testing" - - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - func TestSegment_KafkaProducer_instanciation(t *testing.T) { kafkaProducer := &KafkaProducer{} result := kafkaProducer.New(map[string]string{}) diff --git a/segments/output/sqlite/sqlite_test.go b/segments/output/sqlite/sqlite_test.go index e405553..6592950 100644 --- a/segments/output/sqlite/sqlite_test.go +++ b/segments/output/sqlite/sqlite_test.go @@ -9,19 +9,8 @@ import ( // "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Sqlite Segment test, passthrough test only func TestSegment_Sqlite_passthrough(t *testing.T) { // result := segments.TestSegment("sqlite", map[string]string{"filename": "test.sqlite"}, @@ -32,7 +21,7 @@ func TestSegment_Sqlite_passthrough(t *testing.T) { segment := Sqlite{}.New(map[string]string{"filename": "test.sqlite"}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) @@ -53,7 +42,7 @@ func BenchmarkSqlite_1000(b *testing.B) { segment := Sqlite{}.New(map[string]string{"filename": "bench.sqlite"}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) @@ -74,7 +63,7 @@ func BenchmarkSqlite_10000(b *testing.B) { segment := Sqlite{}.New(map[string]string{"filename": "bench.sqlite", "batchsize": "10000"}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) @@ -95,7 +84,7 @@ func BenchmarkSqlite_100000(b *testing.B) { segment := Sqlite{}.New(map[string]string{"filename": "bench.sqlite", "batchsize": "100000"}) in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/noop/noop.go b/segments/pass/pass.go similarity index 83% rename from segments/noop/noop.go rename to segments/pass/pass.go index ed81c2a..78d4872 100644 --- a/segments/noop/noop.go +++ b/segments/pass/pass.go @@ -1,6 +1,6 @@ // Serves as a template for new segments and forwards flows, otherwise does // nothing. -package noop +package pass import ( "sync" @@ -8,18 +8,18 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" ) -// The NoOp Segment is considered a template for any additional Segments, as it +// The Pass Segment is considered a template for any additional Segments, as it // showcases the exact implementation. -type NoOp struct { +type Pass struct { segments.BaseSegment // always embed this, no need to repeat I/O chan code // add any additional fields here } // Every Segment must implement a New method, even if there isn't any config // it is interested in. -func (segment NoOp) New(config map[string]string) segments.Segment { +func (segment Pass) New(config map[string]string) segments.Segment { // do config stuff here, add it to fields maybe - return &NoOp{} + return &Pass{} } // The main goroutine of any Segment. Any Run method must: @@ -30,7 +30,7 @@ func (segment NoOp) New(config map[string]string) segments.Segment { // Usually, when using a range over In in combination with below defer, nothing // will go wrong. However, some segments have a legitimate use case for using // `for {}`, in which case care must be taken to keep draining In. -func (segment *NoOp) Run(wg *sync.WaitGroup) { +func (segment *Pass) Run(wg *sync.WaitGroup) { defer func() { // This defer clause is important and needs to be present in // any Segment.Run method in some form, but with at least the @@ -48,6 +48,6 @@ func (segment *NoOp) Run(wg *sync.WaitGroup) { // callable from config. An unregistered Segment will only be available using // the API. func init() { - segment := &NoOp{} - segments.RegisterSegment("noop", segment) + segment := &Pass{} + segments.RegisterSegment("pass", segment) } diff --git a/segments/pass/pass_test.go b/segments/pass/pass_test.go new file mode 100644 index 0000000..49dbbdc --- /dev/null +++ b/segments/pass/pass_test.go @@ -0,0 +1,42 @@ +package pass + +import ( + "io/ioutil" + "log" + "os" + "sync" + "testing" + + "github.com/bwNetFlow/flowpipeline/segments" + flow "github.com/bwNetFlow/protobuf/go" +) + +// Pass Segment test, passthrough test +func TestSegment_Pass(t *testing.T) { + result := segments.TestSegment("pass", map[string]string{}, + &flow.FlowMessage{Type: 3}) + if result.Type != 3 { + t.Error("Segment Pass is not working.") + } +} + +// Pass Segment benchmark passthrough +func BenchmarkPass(b *testing.B) { + log.SetOutput(ioutil.Discard) + os.Stdout, _ = os.Open(os.DevNull) + + segment := Pass{} + + in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) + segment.Rewire(in, out) + + wg := &sync.WaitGroup{} + wg.Add(1) + go segment.Run(wg) + + for n := 0; n < b.N; n++ { + in <- &flow.FlowMessage{} + _ = <-out + } + close(in) +} diff --git a/segments/print/count/count_test.go b/segments/print/count/count_test.go index 6938009..1f3ae41 100644 --- a/segments/print/count/count_test.go +++ b/segments/print/count/count_test.go @@ -1,25 +1,12 @@ package count import ( - "log" - "os" "testing" "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // Count Segment test, passthrough test only func TestSegment_Count_passthrough(t *testing.T) { result := segments.TestSegment("count", map[string]string{"prefix": "Test: "}, diff --git a/segments/print/printdots/printdots_test.go b/segments/print/printdots/printdots_test.go index 5c73cab..3b15060 100644 --- a/segments/print/printdots/printdots_test.go +++ b/segments/print/printdots/printdots_test.go @@ -1,25 +1,12 @@ package printdots import ( - "log" - "os" "testing" "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // PrintDots Segment test, passthrough test only func TestSegment_PrintDots_passthrough(t *testing.T) { result := segments.TestSegment("printdots", map[string]string{"flowsPerDot": "100"}, diff --git a/segments/print/printflowdump/printflowdump_test.go b/segments/print/printflowdump/printflowdump_test.go index eb089b0..7903be5 100644 --- a/segments/print/printflowdump/printflowdump_test.go +++ b/segments/print/printflowdump/printflowdump_test.go @@ -9,19 +9,8 @@ import ( "github.com/bwNetFlow/flowpipeline/segments" flow "github.com/bwNetFlow/protobuf/go" - "github.com/hashicorp/logutils" ) -func TestMain(m *testing.M) { - log.SetOutput(&logutils.LevelFilter{ - Levels: []logutils.LogLevel{"info", "warning", "error"}, - MinLevel: logutils.LogLevel("info"), - Writer: os.Stderr, - }) - code := m.Run() - os.Exit(code) -} - // PrintFlowdump Segment test, passthrough test only func TestSegment_PrintFlowdump_passthrough(t *testing.T) { result := segments.TestSegment("printflowdump", map[string]string{}, @@ -39,7 +28,7 @@ func BenchmarkPrintFlowdump(b *testing.B) { segment := PrintFlowdump{} in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) diff --git a/segments/segments.go b/segments/segments.go index fea4943..f1f6b63 100644 --- a/segments/segments.go +++ b/segments/segments.go @@ -49,7 +49,7 @@ func TestSegment(name string, config map[string]string, msg *flow.FlowMessage) * } in, out := make(chan *flow.FlowMessage), make(chan *flow.FlowMessage) - segment.Rewire([]chan *flow.FlowMessage{in, out}, 0, 1) + segment.Rewire(in, out) wg := &sync.WaitGroup{} wg.Add(1) @@ -67,9 +67,9 @@ func TestSegment(name string, config map[string]string, msg *flow.FlowMessage) * // them. In general, Segments should embed the BaseSegment to provide the // Rewire function and the associated vars. type Segment interface { - New(config map[string]string) Segment // for reading the provided config - Run(wg *sync.WaitGroup) // goroutine, must close(segment.Out) when segment.In is closed - Rewire(chans []chan *flow.FlowMessage, in uint, out uint) // embed this using BaseSegment + New(config map[string]string) Segment // for reading the provided config + Run(wg *sync.WaitGroup) // goroutine, must close(segment.Out) when segment.In is closed + Rewire(in chan *flow.FlowMessage, out chan *flow.FlowMessage) // embed this using BaseSegment } // Serves as a basis for any Segment implementations. Segments embedding this @@ -80,13 +80,28 @@ type BaseSegment struct { Out chan<- *flow.FlowMessage } +// An extended basis for Segment implementations in the filter group. It +// contains the necessities to process filtered (dropped) flows. +type BaseFilterSegment struct { + BaseSegment + Drops chan<- *flow.FlowMessage +} + +// Set a return channel for dropped flow messages. Segments need to be wary of +// this channel closing when producing messages to this channel. This method is +// only called by the flowpipeline tool from the controlflow/branch segment to +// implement the then/else branches, otherwise this functionality is unused. +func (segment *BaseFilterSegment) SubscribeDrops(drops chan<- *flow.FlowMessage) { + segment.Drops = drops +} + // This function rewires this Segment with the provided channels. This is // typically called only by pipeline.New() and present in any Segment // implementation embedding the BaseSegment. // The peculiar implementation of passing the full channel list and providing // indexes is due to the fact that controlflow segments may want to skip // segments and thus need to have all later references available as well. -func (segment *BaseSegment) Rewire(chans []chan *flow.FlowMessage, in uint, out uint) { - segment.In = chans[in] - segment.Out = chans[out] +func (segment *BaseSegment) Rewire(in chan *flow.FlowMessage, out chan *flow.FlowMessage) { + segment.In = in + segment.Out = out } diff --git a/segments/testing/generator/generator.go b/segments/testing/generator/generator.go new file mode 100644 index 0000000..a5978b5 --- /dev/null +++ b/segments/testing/generator/generator.go @@ -0,0 +1,40 @@ +package generator + +import ( + "sync" + + "github.com/bwNetFlow/flowpipeline/segments" + flow "github.com/bwNetFlow/protobuf/go" +) + +type Generator struct { + segments.BaseSegment +} + +func (segment Generator) New(config map[string]string) segments.Segment { + return &Generator{} +} + +func (segment *Generator) Run(wg *sync.WaitGroup) { + defer func() { + close(segment.Out) + wg.Done() + }() + + for { + select { + case msg, ok := <-segment.In: + if !ok { + return + } + segment.Out <- msg + default: + segment.Out <- &flow.FlowMessage{Proto: 6, Bytes: 42, Note: "generated test flow"} + } + } +} + +func init() { + segment := &Generator{} + segments.RegisterSegment("generator", segment) +}