Skip to content

Commit

Permalink
feat(3376) Process condition's pipeline logs
Browse files Browse the repository at this point in the history
  • Loading branch information
EmilienLeroux committed Mar 29, 2023
1 parent 8a85f97 commit f529f3a
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 12 deletions.
79 changes: 67 additions & 12 deletions pkg/entry/mongo/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,22 @@ type AppLogDocument struct {
ContainerId string `bson:"container_id"`
}

type ConditionPipelineLogDocument struct {
LogDocument `bson:",inline"`
ConditionExecutionId string `bson:"condition_execution_id"`
ConditionNodeId string `bson:"condition_node_id"`
PipelineExecutionId string `bson:"pipeline_execution_id"`
}

func Convert(ctx context.Context, ts time.Time, record map[interface{}]interface{}) (LogEntry, error) {
var doc LogEntry

if isJobLog(record) {
doc = &JobLogDocument{}
} else {
} else if isAppLog(record) {
doc = &AppLogDocument{}
} else {
doc = &ConditionPipelineLogDocument{}
}

if err := doc.Populate(ctx, ts, record); err != nil {
Expand All @@ -66,24 +75,32 @@ func Convert(ctx context.Context, ts time.Time, record map[interface{}]interface
}

const (
LogKey = "log"
StreamKey = "stream"
TimeKey = "time"
LogPrefixKey = "log_prefix"
JobExecutionIDKey = "job_execution_id"
ContainerIDKey = "container_id"
AppExecutionIDKey = "app_execution_id"
AppIDKey = "app_id"
ProjectIDKey = "project_id"
CustomerKey = "customer"
PlatformIDKey = "platform_id"
LogKey = "log"
StreamKey = "stream"
TimeKey = "time"
LogPrefixKey = "log_prefix"
JobExecutionIDKey = "job_execution_id"
ContainerIDKey = "container_id"
AppExecutionIDKey = "app_execution_id"
AppIDKey = "app_id"
ConditionExecutionIDKey = "condition_execution_id"
ConditionNodeIDKey = "condition_node_id"
PipelineExecutionIDKey = "pipeline_execution_id"
ProjectIDKey = "project_id"
CustomerKey = "customer"
PlatformIDKey = "platform_id"
)

func isJobLog(record map[interface{}]interface{}) bool {
_, err := parse.ExtractStringValue(record, JobExecutionIDKey)
return err == nil
}

func isAppLog(record map[interface{}]interface{}) bool {
_, err := parse.ExtractStringValue(record, AppExecutionIDKey)
return err == nil
}

func (d *AppLogDocument) Populate(ctx context.Context, ts time.Time, record map[interface{}]interface{}) error {
err := d.LogDocument.Populate(ctx, ts, record)
if err != nil {
Expand Down Expand Up @@ -122,6 +139,30 @@ func (d *JobLogDocument) Populate(ctx context.Context, ts time.Time, record map[
return d.generateObjectID()
}

func (d *ConditionPipelineLogDocument) Populate(ctx context.Context, ts time.Time, record map[interface{}]interface{}) error {
err := d.LogDocument.Populate(ctx, ts, record)
if err != nil {
return fmt.Errorf("populate: %w", err)
}

d.ConditionExecutionId, err = parse.ExtractStringValue(record, ConditionExecutionIDKey)
if err != nil {
return fmt.Errorf("parse %s: %w", ConditionExecutionIDKey, err)
}

d.ConditionNodeId, err = parse.ExtractStringValue(record, ConditionNodeIDKey)
if err != nil {
return fmt.Errorf("parse %s: %w", ConditionNodeIDKey, err)
}

d.PipelineExecutionId, err = parse.ExtractStringValue(record, PipelineExecutionIDKey)
if err != nil {
return fmt.Errorf("parse %s: %w", PipelineExecutionIDKey, err)
}

return d.generateObjectID()
}

func cleanLogContent(content string) string {
return strings.TrimSuffix(content, "\n")
}
Expand Down Expand Up @@ -270,3 +311,17 @@ func (d *AppLogDocument) SaveTo(collection *mgo.Collection) error {

return nil
}

func (d *ConditionPipelineLogDocument) SaveTo(collection *mgo.Collection) error {
if _, err := collection.UpsertId(d.Id, d); err != nil {
return fmt.Errorf("upsert %s: %w", d.Id, err)
}

indexes := []string{"condition_execution_id", "time"}

if err := collection.EnsureIndexKey(indexes...); err != nil {
return fmt.Errorf("ensure indexes %v: %w", indexes, err)
}

return nil
}
30 changes: 30 additions & 0 deletions pkg/entry/mongo/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,36 @@ var _ = Describe("Convert document", func() {
})
})

Describe("ConditionPipeline with all fields", func() {
var entry map[interface{}]interface{}

BeforeEach(func() {
entry = map[interface{}]interface{}{
mongo.LogKey: stringEntry("log"),
mongo.StreamKey: stringEntry("stream"),
mongo.TimeKey: timeEntry(time.Now()),
mongo.ConditionExecutionIDKey: stringEntry("conditionExecutionID"),
mongo.ConditionNodeIDKey: stringEntry("conditionNodeID"),
mongo.PipelineExecutionIDKey: stringEntry("pipelineExecutionID"),
mongo.ProjectIDKey: stringEntry("projectID"),
mongo.CustomerKey: stringEntry("customer"),
mongo.PlatformIDKey: stringEntry("platformID"),
}
})

It("Should work", func() {
d, err := mongo.Convert(ctx, time.Now(), entry)
Expect(err).ToNot(HaveOccurred())
Expect(d).ToNot(BeNil())
Expect(d).To(BeAssignableToTypeOf(&mongo.ConditionPipelineLogDocument{}))
document := d.(*mongo.ConditionPipelineLogDocument)
Expect(document.ConditionExecutionId).To(BeEquivalentTo(stringEntry("conditionExecutionID")))
Expect(document.ConditionNodeId).To(BeEquivalentTo(stringEntry("conditionNodeID")))
Expect(document.PipelineExecutionId).To(BeEquivalentTo(stringEntry("pipelineExecutionID")))
Expect(document.Customer).To(BeEquivalentTo(entry[mongo.CustomerKey]))
})
})

Context("With missing field", func() {
var entry map[interface{}]interface{}

Expand Down

0 comments on commit f529f3a

Please sign in to comment.