Skip to content

Commit

Permalink
controlflow/branch segment (#34)
Browse files Browse the repository at this point in the history
* pipeline: use pointer receivers and separate instanciation from startup

* pipeline: rework instanciation, add initial subpipeline segment

* flowfilter: add ability to subscribe to dropped flows

* subpipeline: add conditional pipelines

* rename blackhole to drop

* rename noop to pass

* give filter/drop the ability to report drops for subpipeline

* simplify drop handling in elephant segment

* give segments from filter group their own base type

* change subpipeline segment into a conditional branch segment

* add deprecation notice for skip segment

* fix deadlock on drops within either branch

* fix tests

* remove skip and re-simplify rewire methods

* greatly simplify branch segment by having empty pipelines replace themselves with a single pass segment

* fix accidental close of nil channels in filter segments

* setup github action for testing

* make testing on github action more silent

* fail a test on purpose

* Revert "fail a test on purpose"

This reverts commit 82e9628.

* remove logging setup from all tests

* fix panic on exit when two or more filter segments were part of the if branch

* further simplify drop outputs of filter segments

* make the branch segment test a bit less dummy

* add some basic testing for branch segment

* add more tests using a generator segment to detect deadlocks in branch segment
  • Loading branch information
debugloop authored Feb 17, 2022
1 parent a07403e commit b67ce16
Show file tree
Hide file tree
Showing 44 changed files with 586 additions and 629 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: go test

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2

- uses: actions/setup-go@v2
with:
go-version: 1.17

- run: go test ./...
81 changes: 69 additions & 12 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,64 @@ conditional, limiting payload data, and multiple receivers.
Segments in this group have the ability to change the sequence of segments any
given flow traverses.

#### blackhole
The `blackhole` segment is used to drain a pipeline, effectively starting a new
pipeline after it. In conjunction with `skip`, this can act as a `flowfilter`.
#### branch
The `branch` segment is used to select the further progression of the pipeline
between to branches. To this end, it uses additional syntax that other segments
do not have access to, namely the `if`, `then` and `else` keys which can
contain lists of segments that constitute embedded pipelines.

The any of these three keys may be empty and they are by default. The `if`
segments receive the flows entering the `branch` segment unconditionally. If
the segments in `if` proceed any flow from the input all the way to the end of
the `if` segments, this flow will be moved on to the `then` segments. If flows
are dropped at any point within the `if` segments, they will be moved on to the
`else` branch immediately, shortcutting the traversal of the `if` segments. Any
edits made to flows during the `if` segments will be persisted in either
branch, `then` and `else`, as well as after the flows passed from the `branch`
segment into consecutive segments. Dropping flows behaves regularly in both
branches, but note that flows can not be dropped within the `if` branch
segments, as this is taken as a cue to move them into the `else` branch.

If any of these three lists of segments (or subpipelines) is empty, the
`branch` segment will behave as if this subpipeline consisted of a single
`pass` segment.

Instead of a minimal example, the following more elaborate one highlights all
TCP flows while printing to standard output and keeps only these highlighted
ones in a sqlite export:

```
- segment: branch
if:
- segment: flowfilter
config:
filter: proto tcp
- segment: elephant
then:
- segment: printflowdump
config:
highlight: 1
else:
- segment: printflowdump
- segment: drop
```
- segment: blackhole
- segment: sqlite
config:
filename: tcponly.sqlite
```

[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/controlflow/blackhole)
[examples using this segment](https://github.com/search?q=%22segment%3A+blackhole%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code)
[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/controlflow/branch)
[examples using this segment](https://github.com/search?q=%22segment%3A+branch%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code)


#### skip

*DEPRECATION NOTICE*: This segment will be deprecated in a future version of
flowpipeline. In any but the most convoluted examples, the `branch` segment
documented directly above is the clearer and more legible choice. This will
also greatly simplify the setup internals of segments.


The `skip` segment is used to conditionally skip over segments behind it. For
instance, in front of a export segment a condition such as `proto tcp` with a
skip value of `1` would result in any TCP flows not being exported by the
Expand Down Expand Up @@ -142,6 +188,17 @@ Segments in this group all drop flows, i.e. remove them from the pipeline from
this segment on. Fields in individual flows are never modified, only used as
criteria.

#### drop
The `drop` segment is used to drain a pipeline, effectively starting a new
pipeline after it. In conjunction with `skip`, this can act as a `flowfilter`.

```
- segment: drop
```

[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/filter/drop)
[examples using this segment](https://github.com/search?q=%22segment%3A+drop%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code)

#### elephant
The `elephant` segment uses a configurable sliding window to determine flow
statistics at runtime and filter out unremarkable flows from the pipeline. This
Expand Down Expand Up @@ -701,8 +758,8 @@ for an application.

This is for internally used segments only.

#### noop
The `noop` segment serves as a heavily annotated template for new segments. So
#### pass
The `pass` segment serves as a heavily annotated template for new segments. So
does this piece of documentation. Aside from summarizing what a segment does,
it should include a description of all the parameters it accepts as well as any
caveats users should be aware of.
Expand All @@ -712,12 +769,12 @@ Roadmap:
* and here

```
- segment: noop
- segment: pass
# the lines below are optional and set to default
config:
jk: this segment actually has no config at all, its just for this template
```

[any additional links](https://bwnet.belwue.de)
[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/noop)
[examples using this segment](https://github.com/search?q=%22segment%3A+noop%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code)
[godoc](https://pkg.go.dev/github.com/bwNetFlow/flowpipeline/segments/pass)
[examples using this segment](https://github.com/search?q=%22segment%3A+pass%22+extension%3Ayml+repo%3AbwNetFlow%2Fflowpipeline%2Fexamples&type=Code)
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ test:
go test ./... -cover

bench:
@go test -bench=. -benchtime=1ns ./segments/noop | grep "cpu:"
@go test -bench=. -benchtime=1ns ./segments/pass | grep "cpu:"
@echo "results:"
@go test -bench=. -run=Bench ./... | grep -E "^Bench" | awk '{fps = 1/(($$3)/1e9); sub(/Benchmark/, "", $$1); sub(/-.*/, "", $$1); printf("%15s: %8s ns/flow, %7.0f flows/s\n", tolower($$1), $$3, fps)}'
@rm segments/output/sqlite/bench.sqlite
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import (

_ "github.com/bwNetFlow/flowpipeline/segments/alert/http"

_ "github.com/bwNetFlow/flowpipeline/segments/controlflow/blackhole"
_ "github.com/bwNetFlow/flowpipeline/segments/controlflow/skip"
_ "github.com/bwNetFlow/flowpipeline/segments/controlflow/branch"

_ "github.com/bwNetFlow/flowpipeline/segments/export/influx"
_ "github.com/bwNetFlow/flowpipeline/segments/export/prometheus"

_ "github.com/bwNetFlow/flowpipeline/segments/filter/drop"
_ "github.com/bwNetFlow/flowpipeline/segments/filter/elephant"
_ "github.com/bwNetFlow/flowpipeline/segments/filter/flowfilter"

Expand All @@ -40,7 +40,7 @@ import (
_ "github.com/bwNetFlow/flowpipeline/segments/modify/remoteaddress"
_ "github.com/bwNetFlow/flowpipeline/segments/modify/snmp"

_ "github.com/bwNetFlow/flowpipeline/segments/noop"
_ "github.com/bwNetFlow/flowpipeline/segments/pass"

_ "github.com/bwNetFlow/flowpipeline/segments/output/csv"
_ "github.com/bwNetFlow/flowpipeline/segments/output/json"
Expand Down Expand Up @@ -69,6 +69,7 @@ func main() {
return
}
pipe := pipeline.NewFromConfig(config)
pipe.Start()
pipe.AutoDrain()

sigs := make(chan os.Signal, 1)
Expand Down
40 changes: 30 additions & 10 deletions pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@ import (
"strconv"

"github.com/bwNetFlow/flowpipeline/segments"
"github.com/bwNetFlow/flowpipeline/segments/controlflow/branch"
"gopkg.in/yaml.v2"
)

// A config representation of a segment. It is intended to look like this:
// - segment: noop
// - segment: pass
// config:
// key: value
// foo: bar
// This struct has the appropriate yaml tags inline.
type SegmentRepr struct {
Name string `yaml:"segment"` // to be looked up with a registry
Config map[string]string `yaml:"config"` // to be expanded by our instance
Name string `yaml:"segment"` // to be looked up with a registry
Config map[string]string `yaml:"config"` // to be expanded by our instance
If []SegmentRepr `yaml:"if,omitempty,flow"` // only used by group segment
Then []SegmentRepr `yaml:"then,omitempty,flow"` // only used by group segment
Else []SegmentRepr `yaml:"else,omitempty,flow"` // only used by group segment
}

// Returns the SegmentRepr's Config with all its variables expanded. It tries
Expand Down Expand Up @@ -47,24 +51,40 @@ func (s *SegmentRepr) ExpandedConfig() map[string]string {
// initializes a Pipeline with them.
func NewFromConfig(config []byte) *Pipeline {
// parse a list of SegmentReprs from yaml
pipelineRepr := new([]SegmentRepr)
segmentReprs := new([]SegmentRepr)

err := yaml.Unmarshal(config, &pipelineRepr)
err := yaml.Unmarshal(config, &segmentReprs)
if err != nil {
log.Fatalf("[error] Error parsing configuration YAML: %v", err)
}

segments := SegmentsFromRepr(segmentReprs)

// we have SegmentReprs parsed, instanciate them as actual Segments
segmentList := make([]segments.Segment, len(*pipelineRepr))
for i, segmentrepr := range *pipelineRepr {
segmenttype := segments.LookupSegment(segmentrepr.Name) // a typed nil instance
return New(segments...)
}

// Creates a list of Segments from their config representations. Handles
// recursive definitions found in Segments.
func SegmentsFromRepr(segmentReprs *[]SegmentRepr) []segments.Segment {
segmentList := make([]segments.Segment, len(*segmentReprs))
for i, segmentrepr := range *segmentReprs {
segmentTemplate := segments.LookupSegment(segmentrepr.Name) // a typed nil instance
// the Segment's New method knows how to handle our config
segment := segmenttype.New(segmentrepr.ExpandedConfig())
segment := segmentTemplate.New(segmentrepr.ExpandedConfig())
switch segment := segment.(type) { // handle special segments
case *branch.Branch:
segment.ImportBranches(
New(SegmentsFromRepr(&segmentrepr.If)...),
New(SegmentsFromRepr(&segmentrepr.Then)...),
New(SegmentsFromRepr(&segmentrepr.Else)...),
)
}
if segment != nil {
segmentList[i] = segment
} else {
log.Fatalf("[error] Configured segment '%s' could not be initialized properly, see previous messages.", segmentrepr.Name)
}
}
return New(segmentList...)
return segmentList
}
66 changes: 52 additions & 14 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"sync"

"github.com/bwNetFlow/flowpipeline/segments"
"github.com/bwNetFlow/flowpipeline/segments/filter/drop"
"github.com/bwNetFlow/flowpipeline/segments/filter/elephant"
"github.com/bwNetFlow/flowpipeline/segments/filter/flowfilter"
"github.com/bwNetFlow/flowpipeline/segments/pass"
flow "github.com/bwNetFlow/protobuf/go"
)

Expand All @@ -15,17 +19,48 @@ import (
type Pipeline struct {
In chan *flow.FlowMessage
Out <-chan *flow.FlowMessage
Drop chan *flow.FlowMessage
wg *sync.WaitGroup
SegmentList []segments.Segment `yaml: segments`
SegmentList []segments.Segment
}

func (pipeline *Pipeline) GetInput() chan *flow.FlowMessage {
return pipeline.In
}

func (pipeline *Pipeline) GetOutput() <-chan *flow.FlowMessage {
return pipeline.Out
}

func (pipeline *Pipeline) GetDrop() <-chan *flow.FlowMessage {
if pipeline.Drop != nil {
return pipeline.Drop
}
pipeline.Drop = make(chan *flow.FlowMessage)
// Subscribe to drops from special segments, namely all based on
// BaseFilterSegment grouped in the filter directory.
for _, segment := range pipeline.SegmentList {
switch typedSegment := segment.(type) {
case *drop.Drop:
typedSegment.SubscribeDrops(pipeline.Drop)
case *elephant.Elephant:
typedSegment.SubscribeDrops(pipeline.Drop)
case *flowfilter.FlowFilter:
typedSegment.SubscribeDrops(pipeline.Drop)
}
}
// If there are no filter/* segments, this channel will never have
// messages available.
return pipeline.Drop
}

// Starts up a goroutine specific to this Pipeline which reads any message from
// the Out channel and discards it. This is a convenience function to enable
// having a segment at the end of the pipeline handle all results, i.e. having
// no post-pipeline processing.
func (pipeline Pipeline) AutoDrain() {
func (pipeline *Pipeline) AutoDrain() {
go func() {
for _ = range pipeline.Out {
for range pipeline.Out {
}
log.Println("[info] Pipeline closed, auto draining finished.")
}()
Expand All @@ -35,7 +70,7 @@ func (pipeline Pipeline) AutoDrain() {
// segments to propagate this close event through the full pipeline,
// terminating all segment goroutines and thus releasing the waitgroup.
// Blocking.
func (pipeline Pipeline) Close() {
func (pipeline *Pipeline) Close() {
defer func() {
recover() // in case In is already closed
pipeline.wg.Wait()
Expand All @@ -47,19 +82,22 @@ func (pipeline Pipeline) Close() {
// therein. Initialization includes creating any intermediate channels and
// wiring up the segments in the segmentList with them.
func New(segmentList ...segments.Segment) *Pipeline {
wg := sync.WaitGroup{}
// the following loops need to be split so that Rewire can work with
// non-nil channels from further ahead in the pipeline... important for
// skip segments
if len(segmentList) == 0 {
segmentList = []segments.Segment{&pass.Pass{}}
}
channels := make([]chan *flow.FlowMessage, len(segmentList)+1)
channels[0] = make(chan *flow.FlowMessage)
for i, _ := range segmentList {
for i, segment := range segmentList {
channels[i+1] = make(chan *flow.FlowMessage)
segment.Rewire(channels[i], channels[i+1])
}
for i, segment := range segmentList {
segment.Rewire(channels, uint(i), uint(i+1))
wg.Add(1)
go segment.Run(&wg)
return &Pipeline{In: channels[0], Out: channels[len(channels)-1], wg: &sync.WaitGroup{}, SegmentList: segmentList}
}

// Starts the Pipeline by starting all segment goroutines therein.
func (pipeline *Pipeline) Start() {
for _, segment := range pipeline.SegmentList {
pipeline.wg.Add(1)
go segment.Run(pipeline.wg)
}
return &Pipeline{In: channels[0], Out: channels[len(channels)-1], wg: &wg, SegmentList: segmentList}
}
Loading

0 comments on commit b67ce16

Please sign in to comment.