Skip to content

Commit

Permalink
dag: initial color support
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidGamba committed Oct 9, 2024
1 parent d527aa6 commit ffca682
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 21 deletions.
33 changes: 19 additions & 14 deletions dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,13 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"sync"
"time"

"github.com/DavidGamba/go-getoptions"
)

var Logger = log.New(os.Stderr, "", log.LstdFlags)

type (
ID string

Expand Down Expand Up @@ -79,6 +76,11 @@ type (
bufferOutput bool
bufferWriter io.Writer
bufferMutex sync.Mutex
UseColor bool
InfoColor string
InfoBoldColor string
ErrorColor string
ErrorBoldColor string
}

runStatus int
Expand Down Expand Up @@ -196,6 +198,10 @@ func NewGraph(name string) *Graph {
Vertices: make(map[ID]*Vertex),
errs: &Errors{name, make([]error, 0)},
maxParallel: 1_000_000,
InfoColor: "34",
InfoBoldColor: "36;1",
ErrorColor: "31",
ErrorBoldColor: "35;1",
}
}

Expand Down Expand Up @@ -424,7 +430,7 @@ LOOP:
g.Vertices[iderr.ID].status = runDone
if iderr.Error != nil {
err := fmt.Errorf("Task %s:%s error: %w", g.Name, iderr.ID, iderr.Error)
Logger.Printf("%s\n", err)
Logger.Printf(g.colorError("Task ")+g.colorErrorBold("%s:%s")+g.colorError(" error: %s\n"), g.Name, iderr.ID, iderr.Error)
if !errors.Is(iderr.Error, ErrorSkipParents) {
g.errs.Errors = append(g.errs.Errors, err)
continue
Expand All @@ -443,8 +449,8 @@ LOOP:
if handledContext {
break
}
Logger.Printf("Cancelation received or time out reached, allowing in-progress tasks to finish, skipping the rest.\n")
g.errs.Errors = append(g.errs.Errors, fmt.Errorf("cancelation received or time out reached"))
Logger.Print(g.colorError("Cancellation received or time out reached, allowing in-progress tasks to finish, skipping the rest.\n"))
g.errs.Errors = append(g.errs.Errors, fmt.Errorf("cancellation received or time out reached"))
handledContext = true
default:
break
Expand All @@ -456,7 +462,7 @@ LOOP:
}
if v.status == runSkip {
v.status = runInProgress
Logger.Printf("Skipped Task %s:%s\n", g.Name, v.ID)
Logger.Printf(g.colorError("Skipped Task ")+g.colorErrorBold("%s:%s\n"), g.Name, v.ID)
go func(done chan IDErr, v *Vertex) {
done <- IDErr{v.ID, nil}
}(done, v)
Expand All @@ -472,7 +478,7 @@ LOOP:
go func(ctx context.Context, done chan IDErr, v *Vertex) {
semaphore <- struct{}{}
defer func() { <-semaphore }()
Logger.Printf("Running Task %s:%s\n", g.Name, v.ID)
Logger.Printf(g.colorInfo("Running Task ")+g.colorInfoBold("%s:%s\n"), g.Name, v.ID)
start := time.Now()
v.Task.Lock()
defer v.Task.Unlock()
Expand All @@ -492,21 +498,20 @@ LOOP:
_, _ = combinedBuffer.WriteTo(g.bufferWriter)
g.bufferMutex.Unlock()
}
Logger.Printf("Completed Task %s:%s in %s\n", g.Name, v.ID, durationStr(time.Since(start)))
Logger.Printf(g.colorInfo("Completed Task ")+g.colorInfoBold("%s:%s")+g.colorInfo(" in %s\n"), g.Name, v.ID, durationStr(time.Since(start)))
if err == nil {
break
}
if err != nil && i < v.Retries {
err = fmt.Errorf("Task %s:%s error: %w", g.Name, v.ID, err)
Logger.Printf("%s", err)
Logger.Printf("Retrying (%d/%d) Task %s:%s\n", i+1, v.Retries, g.Name, v.ID)
Logger.Printf(g.colorError("Task ")+g.colorErrorBold("%s:%s")+g.colorError(" error: %s"), g.Name, v.ID, err)
Logger.Printf(g.colorInfo("Retrying (%d/%d) Task %s:%s\n"), i+1, v.Retries, g.Name, v.ID)
}
}
done <- IDErr{v.ID, err}
}(ctx, done, v)
}
}
Logger.Printf("Completed %s Run in %s\n", g.Name, durationStr(time.Since(runStart)))
Logger.Printf(g.colorInfo("Completed ")+g.colorInfoBold("%s")+g.colorInfo(" Run in %s\n"), g.Name, durationStr(time.Since(runStart)))

if len(g.errs.Errors) != 0 {
return g.errs
Expand Down Expand Up @@ -569,7 +574,7 @@ func (g *Graph) getNextVertex() (*Vertex, bool, bool) {

// skipParents - Marks all Vertex parents as runDone
func skipParents(v *Vertex) {
Logger.Printf("skip parents for %s\n", v.ID)
// Logger.Printf("skip parents for %s\n", v.ID)
for _, c := range v.Parents {
c.status = runSkip
skipParents(c)
Expand Down
14 changes: 7 additions & 7 deletions dag/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func TestDagTaskErrorRetry(t *testing.T) {
}
}

func TestDagContexDone(t *testing.T) {
func TestDagContextDone(t *testing.T) {
buf := setupLogging()
t.Cleanup(func() { t.Log(buf.String()) })

Expand Down Expand Up @@ -526,20 +526,20 @@ func TestDagContexDone(t *testing.T) {
if len(errs.Errors) != 5 {
t.Fatalf("Unexpected error size, %d: %s\n", len(errs.Errors), err)
}
if errs.Errors[0].Error() != "cancelation received or time out reached" {
t.Fatalf("Unexpected error: %s\n", errs.Errors[0])
if errs.Errors[0].Error() != "cancellation received or time out reached" {
t.Fatalf("Unexpected error 0: %s\n", errs.Errors[0])
}
if !errors.Is(errs.Errors[1], ErrorTaskSkipped) {
t.Fatalf("Unexpected error: %s\n", errs.Errors[1])
t.Fatalf("Unexpected error 1: %s\n", errs.Errors[1])
}
if !errors.Is(errs.Errors[2], ErrorTaskSkipped) {
t.Fatalf("Unexpected error: %s\n", errs.Errors[1])
t.Fatalf("Unexpected error 2: %s\n", errs.Errors[2])
}
if !errors.Is(errs.Errors[3], ErrorTaskSkipped) {
t.Fatalf("Unexpected error: %s\n", errs.Errors[1])
t.Fatalf("Unexpected error 3: %s\n", errs.Errors[3])
}
if !errors.Is(errs.Errors[4], ErrorTaskSkipped) {
t.Fatalf("Unexpected error: %s\n", errs.Errors[1])
t.Fatalf("Unexpected error 4: %s\n", errs.Errors[4])
}
if len(results) > 4 {
t.Errorf("Wrong list: %v\n", results)
Expand Down
69 changes: 69 additions & 0 deletions dag/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package dag

import (
"fmt"
"io"
"log"
"os"

"strings"
)

// var Logger = NewColorLogger()

var Logger = log.New(os.Stderr, "", log.LstdFlags)

type ColorLogger struct {
logger *log.Logger
Color string
}

func NewColorLogger() *ColorLogger {
return &ColorLogger{
logger: log.New(os.Stderr, "", log.LstdFlags),
Color: "35",
}
}

func (l *ColorLogger) Printf(format string, v ...interface{}) {
if l.Color == "" {
l.logger.Printf(format, v...)
return
} else {
// l.logger.Printf(fmt.Sprintf("\033[%sm%s\033[0m", l.Color, format), v...)
// l.logger.Printf(fmt.Sprintf("\033[34m%sx", format), v...)
l.logger.Printf("\033[%sm%s\033[0m", l.Color, fmt.Sprintf(format, v...))
}
}

func (l *ColorLogger) SetOutput(w io.Writer) {
l.logger.SetOutput(w)
}

func (g *Graph) colorInfo(format string) string {
return g.color(g.InfoColor, format)
}

func (g *Graph) colorInfoBold(format string) string {
return g.color(g.InfoBoldColor, format)
}

func (g *Graph) colorError(format string) string {
return g.color(g.ErrorColor, format)
}

func (g *Graph) colorErrorBold(format string) string {
return g.color(g.ErrorBoldColor, format)
}

func (g *Graph) color(color string, format string) string {
if g.UseColor {
return format
}
if !strings.HasSuffix(format, "\n") {
return fmt.Sprintf("\033[%sm%s\033[0m", color, format)
}

format = format[:len(format)-1]
return fmt.Sprintf("\033[%sm%s\033[0m\n", color, format)
}

0 comments on commit ffca682

Please sign in to comment.