Skip to content

Commit

Permalink
Merge pull request #151 from hey-car/feature/kinesis-data-recorder
Browse files Browse the repository at this point in the history
Add Kinesis support
  • Loading branch information
zhouzhuojie authored Aug 10, 2018
2 parents b942c88 + 0a2f262 commit 3e8caa2
Show file tree
Hide file tree
Showing 174 changed files with 44,790 additions and 2 deletions.
68 changes: 67 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions docs/flagr_env.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,27 @@ export FLAGR_DB_DBDRIVER=mysql
// results in
Config.DBDriver = "mysql"
```

## Kinesis Authentication

In order to use Flagr with Kinesis, you need to authenticate with AWS.
For that, you can use the standard AWS authentication methods:

### Environment

The most common way of authenticating is via the environment, providing the `AWS_ACCESS_KEY_ID` and the `AWS_SECRET_ACCESS_KEY`. That way Flagr can authenticate with AWS to connect to your Kinesis Stream.

e.g.:
```
AWS_ACCESS_KEY_ID=example123
AWS_SECRET_ACCESS_KEY=example123
AWS_DEFAULT_REGION=eu-central-1
```

More info: https://docs.aws.amazon.com/cli/latest/userguide/cli-environment.html

### Other Alternatives

Alternatively, there are a couple more options for providing credentials for your stream, such as a shared credentials file, container credentials, or instance profiles. Read more about that in the [official AWS documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html#config-settings-and-precedence).

**Important**: Make sure the key is attached to a user that has permissions to push records into the stream.
13 changes: 12 additions & 1 deletion pkg/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ var Config = struct {
// RecorderType - the pipeline to log data records, e.g. Kafka
RecorderType string `env:"FLAGR_RECORDER_TYPE" envDefault:"kafka"`

// RecorderKafkaBrokers and etc. - Kafka related configurations for data records logging (Flagr Metrics)
// Kafka related configurations for data records logging (Flagr Metrics)
RecorderKafkaBrokers string `env:"FLAGR_RECORDER_KAFKA_BROKERS" envDefault:":9092"`
RecorderKafkaCertFile string `env:"FLAGR_RECORDER_KAFKA_CERTFILE" envDefault:""`
RecorderKafkaKeyFile string `env:"FLAGR_RECORDER_KAFKA_KEYFILE" envDefault:""`
Expand All @@ -79,6 +79,17 @@ var Config = struct {
RecorderKafkaEncrypted bool `env:"FLAGR_RECORDER_KAFKA_ENCRYPTED" envDefault:"false"`
RecorderKafkaEncryptionKey string `env:"FLAGR_RECORDER_KAFKA_ENCRYPTION_KEY" envDefault:""`

// Kinesis related configurations for data records logging (Flagr Metrics)
RecorderKinesisStreamName string `env:"FLAGR_RECORDER_KINESIS_STREAM_NAME" envDefault:"flagr-records"`
RecorderKinesisBacklogCount int `env:"FLAGR_RECORDER_KINESIS_BACKLOG_COUNT" envDefault:"500"`
RecorderKinesisMaxConnections int `env:"FLAGR_RECORDER_KINESIS_MAX_CONNECTIONS" envDefault:"24"`
RecorderKinesisFlushInterval time.Duration `env:"FLAGR_RECORDER_KINESIS_FLUSH_INTERVAL" envDefault:"5s"`
RecorderKinesisBatchCount int `env:"FLAGR_RECORDER_KINESIS_BATCH_COUNT" envDefault:"500"`
RecorderKinesisBatchSize int `env:"FLAGR_RECORDER_KINESIS_BATCH_SIZE" envDefault:"0"`
RecorderKinesisAggregateBatchCount int `env:"FLAGR_RECORDER_KINESIS_AGGREGATE_BATCH_COUNT" envDefault:"4294967295"`
RecorderKinesisAggregateBatchSize int `env:"FLAGR_RECORDER_KINESIS_AGGREGATE_BATCH_SIZE" envDefault:"51200"`
RecorderKinesisVerbose bool `env:"FLAGR_RECORDER_KINESIS_VERBOSE" envDefault:"false"`

/**
JWTAuthEnabled enables the JWT Auth
Expand Down
2 changes: 2 additions & 0 deletions pkg/handler/data_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func GetDataRecorder() DataRecorder {
switch recorderType {
case "kafka":
singletonDataRecorder = NewKafkaRecorder()
case "kinesis":
singletonDataRecorder = NewKinesisRecorder()
default:
panic("recorderType not supported")
}
Expand Down
112 changes: 112 additions & 0 deletions pkg/handler/data_recorder_kinesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package handler

import (
"encoding/json"

"github.com/checkr/flagr/pkg/config"
"github.com/checkr/flagr/pkg/util"
"github.com/checkr/flagr/swagger_gen/models"

"github.com/a8m/kinesis-producer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
)

var (
	// newKinesisProducer points at producer.New. Declared as a package-level
	// var rather than called directly so the constructor can be swapped out
	// (presumably stubbed in tests — TODO confirm usage).
	newKinesisProducer = producer.New
)

// kinesisRecorder implements DataRecorder on top of an asynchronous
// batching Kinesis producer (github.com/a8m/kinesis-producer).
type kinesisRecorder struct {
	enabled  bool               // snapshot of config.Config.RecorderEnabled taken at construction
	producer *producer.Producer // async producer that batches and flushes records to the stream
}

// NewKinesisRecorder creates a new Kinesis recorder. It builds an AWS
// Kinesis client from the ambient AWS credentials/region, starts an
// asynchronous batching producer configured from the
// FLAGR_RECORDER_KINESIS_* environment variables, and spawns a goroutine
// that logs (and drops) any delivery failures for the life of the process.
var NewKinesisRecorder = func() DataRecorder {
	// session.New is deprecated in aws-sdk-go; session.Must(session.NewSession(...))
	// surfaces session construction errors immediately at startup instead of
	// silently deferring them to the first API call.
	client := kinesis.New(session.Must(session.NewSession(aws.NewConfig())))

	p := newKinesisProducer(&producer.Config{
		StreamName:          config.Config.RecorderKinesisStreamName,
		Client:              client,
		BacklogCount:        config.Config.RecorderKinesisBacklogCount,
		MaxConnections:      config.Config.RecorderKinesisMaxConnections,
		FlushInterval:       config.Config.RecorderKinesisFlushInterval,
		BatchSize:           config.Config.RecorderKinesisBatchSize,
		BatchCount:          config.Config.RecorderKinesisBatchCount,
		AggregateBatchCount: config.Config.RecorderKinesisAggregateBatchCount,
		AggregateBatchSize:  config.Config.RecorderKinesisAggregateBatchSize,
		Verbose:             config.Config.RecorderKinesisVerbose,
		Logger:              logrus.WithField("producer", "kinesis"),
	})

	p.Start()

	// Drain the failure channel for the process lifetime; records that fail
	// to deliver are logged and dropped (fire-and-forget semantics).
	go func() {
		for err := range p.NotifyFailures() {
			logrus.WithField("kinesis_error", err).Error("error pushing to kinesis")
		}
	}()

	return &kinesisRecorder{
		producer: p,
		enabled:  config.Config.RecorderEnabled,
	}
}

// AsyncRecord enqueues the evaluation result onto the producer's internal
// buffer and returns immediately; the producer batches and flushes records
// in the background. It is a no-op when recording is disabled.
func (k *kinesisRecorder) AsyncRecord(r *models.EvalResult) {
	if !k.enabled {
		return
	}

	kr := &kinesisEvalResult{
		EvalResult: r,
	}

	payload, err := kr.Payload()
	if err != nil {
		logrus.WithField("kinesis_error", err).Error("error marshaling")
		// Bail out: previously execution continued and pushed a frame with
		// an empty payload to the stream.
		return
	}

	messageFrame := kinesisMessageFrame{
		Payload:   string(payload),
		Encrypted: false, // ignoring encryption at this time - https://github.com/checkr/flagr/pull/151#discussion_r208313230
	}

	message, err := messageFrame.encode()
	if err != nil {
		logrus.WithField("kinesis_error", err).Error("error marshaling")
		// Bail out: previously a nil message was still handed to the producer.
		return
	}

	if err := k.producer.Put(message, kr.Key()); err != nil {
		logrus.WithField("kinesis_error", err).Error("error pushing to kinesis")
	}
}

// kinesisEvalResult wraps the swagger-generated models.EvalResult to attach
// Kinesis-specific helpers (Payload, Key) without modifying the generated type.
type kinesisEvalResult struct {
	*models.EvalResult
}

type kinesisMessageFrame struct {
Payload string `json:"payload"`
Encrypted bool `json:"encrypted"`
}

func (kmf *kinesisMessageFrame) encode() ([]byte, error) {
return json.MarshalIndent(kmf, "", " ")
}

// Payload serializes the wrapped EvalResult via its generated
// MarshalBinary implementation.
func (r *kinesisEvalResult) Payload() ([]byte, error) {
	body, err := r.EvalResult.MarshalBinary()
	return body, err
}

// Key generates the Kinesis partition key: the evaluation context's
// entity ID, or the empty string when the result or its context is absent.
func (r *kinesisEvalResult) Key() string {
	if r.EvalResult != nil && r.EvalContext != nil {
		return util.SafeString(r.EvalContext.EntityID)
	}
	return ""
}
Loading

0 comments on commit 3e8caa2

Please sign in to comment.