From 89b7b6fd4536e0c7b3031435116b9567d8a9e0c5 Mon Sep 17 00:00:00 2001 From: Yihang Wang Date: Sat, 20 Apr 2024 20:05:57 +0800 Subject: [PATCH] refactor: unexport unnecessary functions and types --- main.py | 0 pkg/utils/io.go | 18 +++++++++++ scheduler.go | 76 +++++++++++++++++------------------------------ scheduler_test.go | 24 +++++++-------- status.go | 22 +++++++------- task.go | 6 ++-- 6 files changed, 72 insertions(+), 74 deletions(-) create mode 100644 main.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..e69de29 diff --git a/pkg/utils/io.go b/pkg/utils/io.go index 6a0a08d..8b60b05 100644 --- a/pkg/utils/io.go +++ b/pkg/utils/io.go @@ -5,6 +5,7 @@ import ( "io" "log/slog" "os" + "path/filepath" "strings" ) @@ -104,3 +105,20 @@ type DiscardCloser struct { func (wc DiscardCloser) Close() error { return nil } + +func OpenFile(path string) (io.WriteCloser, error) { + switch path { + case "-": + return DiscardCloser{Writer: os.Stdout}, nil + case "": + return DiscardCloser{Writer: io.Discard}, nil + default: + // Create folder + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, err + } + // Open file + return os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + } +} diff --git a/scheduler.go b/scheduler.go index 65ccf01..885363c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -3,10 +3,7 @@ package gojob import ( "encoding/json" "fmt" - "io" "log/slog" - "os" - "path/filepath" "sync" "sync/atomic" "time" @@ -15,13 +12,13 @@ import ( "github.com/google/uuid" ) -type Metadata map[string]interface{} +type schedulerMetadata map[string]interface{} // Scheduler is a task scheduler type Scheduler struct { id string numWorkers int - metadata Metadata + metadata schedulerMetadata maxRetries int maxRuntimePerTaskSeconds int @@ -32,10 +29,10 @@ type Scheduler struct { isStarted atomic.Bool currentIndex atomic.Int64 - statusManager *StatusManager + statusManager *statusManager - taskChan chan *BasicTask - resultChans []chan *BasicTask + taskChan chan *basicTask + resultChans []chan *basicTask resultFilePath string statusFilePath string @@ -47,14 +44,14 @@ type Scheduler struct { recorderWg *sync.WaitGroup } -type SchedulerOption func(*Scheduler) error +type schedulerOption func(*Scheduler) error -func New(options ...SchedulerOption) *Scheduler { +func New(options ...schedulerOption) *Scheduler { id := uuid.New().String() svr := &Scheduler{ id: id, numWorkers: 1, - metadata: Metadata{"id": id}, + metadata: schedulerMetadata{"id": id}, maxRetries: 4, maxRuntimePerTaskSeconds: 16, @@ -65,10 +62,10 @@ func New(options ...SchedulerOption) *Scheduler { isStarted: atomic.Bool{}, currentIndex: atomic.Int64{}, - statusManager: NewStatusManager(), + statusManager: newStatusManager(), - taskChan: make(chan *BasicTask), - resultChans: []chan *BasicTask{}, + taskChan: make(chan *basicTask), + resultChans: []chan *basicTask{}, resultFilePath: "result.json", statusFilePath: "status.json", @@ -90,7 +87,7 @@ func New(options ...SchedulerOption) *Scheduler { svr.recorderWg.Add(3) chanRecorder(svr.resultFilePath, svr.ResultChan(), svr.recorderWg) chanRecorder(svr.statusFilePath, svr.StatusChan(), svr.recorderWg) - metadataChan := make(chan Metadata) + metadataChan := make(chan schedulerMetadata) chanRecorder(svr.metadataFilePath, metadataChan, svr.recorderWg) metadataChan <- svr.metadata close(metadataChan) @@ -102,7 +99,7 @@ func New(options ...SchedulerOption) *Scheduler { } // SetNumShards sets the number of shards, default is 1 which means no sharding -func WithNumShards(numShards int64) SchedulerOption { +func WithNumShards(numShards int64) schedulerOption { return func(s *Scheduler) error { if numShards <= 0 { return fmt.Errorf("numShards must be greater than 0") @@ -113,7 +110,7 @@ func WithNumShards(numShards int64) SchedulerOption { } // SetShard sets the shard (from 0 to NumShards-1) -func WithShard(shard int64) SchedulerOption { +func WithShard(shard int64) schedulerOption { return func(s *Scheduler) error { if shard < 0 || shard >= s.numShards { return fmt.Errorf("shard must be in [0, NumShards)") @@ -124,7 +121,7 @@ func WithShard(shard int64) SchedulerOption { } // SetNumWorkers sets the number of workers -func WithNumWorkers(numWorkers int) SchedulerOption { +func WithNumWorkers(numWorkers int) schedulerOption { return func(s *Scheduler) error { if numWorkers <= 0 { return fmt.Errorf("numWorkers must be greater than 0") @@ -135,7 +132,7 @@ func WithNumWorkers(numWorkers int) SchedulerOption { } // SetMaxRetries sets the max retries -func WithMaxRetries(maxRetries int) SchedulerOption { +func WithMaxRetries(maxRetries int) schedulerOption { return func(s *Scheduler) error { if maxRetries <= 0 { return fmt.Errorf("maxRetries must be greater than 0") @@ -146,7 +143,7 @@ func WithMaxRetries(maxRetries int) SchedulerOption { } // SetMaxRuntimePerTaskSeconds sets the max runtime per task seconds -func WithMaxRuntimePerTaskSeconds(maxRuntimePerTaskSeconds int) SchedulerOption { +func WithMaxRuntimePerTaskSeconds(maxRuntimePerTaskSeconds int) schedulerOption { return func(s *Scheduler) error { if maxRuntimePerTaskSeconds <= 0 { return fmt.Errorf("maxRuntimePerTaskSeconds must be greater than 0") @@ -157,7 +154,7 @@ func WithMaxRuntimePerTaskSeconds(maxRuntimePerTaskSeconds int) SchedulerOption } // WithTotalTasks sets the total number of tasks, and calculates the number of tasks for this shard -func WithTotalTasks(numTotalTasks int64) SchedulerOption { +func WithTotalTasks(numTotalTasks int64) schedulerOption { return func(s *Scheduler) error { // Check if NumShards is set and is greater than 0 if s.numShards <= 0 { @@ -187,7 +184,7 @@ func WithTotalTasks(numTotalTasks int64) SchedulerOption { } // AddMetadata adds metadata -func WithMetadata(key string, value interface{}) SchedulerOption { +func WithMetadata(key string, value interface{}) schedulerOption { return func(s *Scheduler) error { s.metadata[key] = value return nil @@ -195,7 +192,7 @@ func WithMetadata(key string, value interface{}) SchedulerOption { } // WithResultFilePath sets the file path for results -func WithResultFilePath(path string) SchedulerOption { +func WithResultFilePath(path string) schedulerOption { return func(s *Scheduler) error { s.resultFilePath = path return nil @@ -203,14 +200,14 @@ func WithResultFilePath(path string) SchedulerOption { } // WithStatusFilePath sets the file path for status -func WithStatusFilePath(path string) SchedulerOption { +func WithStatusFilePath(path string) schedulerOption { return func(s *Scheduler) error { s.statusFilePath = path return nil } } -func WithPrometheusPushGateway(url string, job string) SchedulerOption { +func WithPrometheusPushGateway(url string, job string) schedulerOption { return func(s *Scheduler) error { s.prometheusPushGatewayUrl = url s.prometheusPushGatewayJob = job @@ -219,7 +216,7 @@ func WithPrometheusPushGateway(url string, job string) SchedulerOption { } // WithMetadataFilePath sets the file path for metadata -func WithMetadataFilePath(path string) SchedulerOption { +func WithMetadataFilePath(path string) schedulerOption { return func(s *Scheduler) error { s.metadataFilePath = path return nil @@ -227,8 +224,8 @@ func WithMetadataFilePath(path string) SchedulerOption { } // chanRecorder records the channel to a given file path -func chanRecorder[T *BasicTask | Status | Metadata](path string, ch <-chan T, wg *sync.WaitGroup) { - fd, err := openFile(path) +func chanRecorder[T *basicTask | Status | schedulerMetadata](path string, ch <-chan T, wg *sync.WaitGroup) { + fd, err := utils.OpenFile(path) if err != nil { slog.Error("error occured while opening file", slog.String("path", path), slog.String("error", err.Error())) return @@ -248,8 +245,8 @@ func chanRecorder[T *BasicTask | Status | Metadata](path string, ch <-chan T, wg // ResultChan returns a newly created channel to receive results // Everytime ResultChan is called, a new channel is created, and the results are written to all channels // This is useful for multiple consumers (e.g. writing to multiple files) -func (s *Scheduler) ResultChan() <-chan *BasicTask { - c := make(chan *BasicTask) +func (s *Scheduler) ResultChan() <-chan *basicTask { + c := make(chan *basicTask) s.resultChans = append(s.resultChans, c) return c } @@ -273,7 +270,7 @@ func (s *Scheduler) Submit(task Task) { index := s.currentIndex.Load() if (index % s.numShards) == s.shard { s.taskWg.Add(1) - s.taskChan <- NewBasicTask(index, s.id, task) + s.taskChan <- newBasicTask(index, s.id, task) } s.currentIndex.Add(1) } @@ -340,20 +337,3 @@ func (s *Scheduler) Worker() { s.taskWg.Done() } } - -func openFile(path string) (io.WriteCloser, error) { - switch path { - case "-": - return utils.DiscardCloser{Writer: os.Stdout}, nil - case "": - return utils.DiscardCloser{Writer: io.Discard}, nil - default: - // Create folder - dir := filepath.Dir(path) - if err := os.MkdirAll(dir, 0755); err != nil { - return nil, err - } - // Open file - return os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) - } -} diff --git a/scheduler_test.go b/scheduler_test.go index 300f389..3360344 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -12,41 +12,41 @@ import ( "github.com/WangYihang/gojob" ) -type SafeWriter struct { +type safeWriter struct { writer *strings.Builder lock sync.Mutex } -func NewSafeWriter() *SafeWriter { - return &SafeWriter{ +func newSafeWriter() *safeWriter { + return &safeWriter{ writer: new(strings.Builder), lock: sync.Mutex{}, } } -func (sw *SafeWriter) WriteString(s string) { +func (sw *safeWriter) WriteString(s string) { sw.lock.Lock() defer sw.lock.Unlock() sw.writer.WriteString(s) } -func (sw *SafeWriter) String() string { +func (sw *safeWriter) String() string { return sw.writer.String() } -type Task struct { +type schedulerTestTask struct { I int - writer *SafeWriter + writer *safeWriter } -func NewTask(i int, writer *SafeWriter) *Task { - return &Task{ +func newTask(i int, writer *safeWriter) *schedulerTestTask { + return &schedulerTestTask{ I: i, writer: writer, } } -func (t *Task) Do() error { +func (t *schedulerTestTask) Do() error { t.writer.WriteString(fmt.Sprintf("%d\n", t.I)) return nil } @@ -84,14 +84,14 @@ func TestSharding(t *testing.T) { }, } for _, tc := range testcases { - safeWriter := NewSafeWriter() + safeWriter := newSafeWriter() scheduler := gojob.New( gojob.WithNumShards(tc.numShards), gojob.WithShard(tc.shard), gojob.WithResultFilePath(""), ).Start() for i := 0; i < 16; i++ { - scheduler.Submit(NewTask(i, safeWriter)) + scheduler.Submit(newTask(i, safeWriter)) } scheduler.Wait() output := safeWriter.String() diff --git a/status.go b/status.go index f7e36eb..05d2e4e 100644 --- a/status.go +++ b/status.go @@ -13,7 +13,7 @@ type Status struct { NumTotal int64 `json:"num_total"` } -type StatusManager struct { +type statusManager struct { numFailed int64 numSucceed int64 numTotal int64 @@ -23,21 +23,21 @@ type StatusManager struct { statusChans []chan Status } -func NewStatusManager() *StatusManager { - return &StatusManager{ +func newStatusManager() *statusManager { + return &statusManager{ mutex: &sync.Mutex{}, ticker: time.NewTicker(1 * time.Second), statusChans: []chan Status{}, } } -func (sm *StatusManager) Start() { +func (sm *statusManager) Start() { for range sm.ticker.C { sm.notify() } } -func (sm *StatusManager) Stop() { +func (sm *statusManager) Stop() { sm.notify() sm.ticker.Stop() for _, ch := range sm.statusChans { @@ -45,31 +45,31 @@ func (sm *StatusManager) Stop() { } } -func (sm *StatusManager) IncFailed() { +func (sm *statusManager) IncFailed() { sm.mutex.Lock() sm.numFailed++ sm.mutex.Unlock() } -func (sm *StatusManager) IncSucceed() { +func (sm *statusManager) IncSucceed() { sm.mutex.Lock() sm.numSucceed++ sm.mutex.Unlock() } -func (sm *StatusManager) SetTotal(total int64) { +func (sm *statusManager) SetTotal(total int64) { sm.mutex.Lock() sm.numTotal = total sm.mutex.Unlock() } -func (sm *StatusManager) StatusChan() <-chan Status { +func (sm *statusManager) StatusChan() <-chan Status { ch := make(chan Status) sm.statusChans = append(sm.statusChans, ch) return ch } -func (sm *StatusManager) Snapshot() Status { +func (sm *statusManager) Snapshot() Status { sm.mutex.Lock() status := Status{ Timestamp: time.Now().Format(time.RFC3339), @@ -82,7 +82,7 @@ func (sm *StatusManager) Snapshot() Status { return status } -func (sm *StatusManager) notify() { +func (sm *statusManager) notify() { status := sm.Snapshot() for _, ch := range sm.statusChans { ch <- status diff --git a/task.go b/task.go index 1e4d896..9847f3d 100644 --- a/task.go +++ b/task.go @@ -10,7 +10,7 @@ type Task interface { Do() error } -type BasicTask struct { +type basicTask struct { Index int64 `json:"index"` RunID string `json:"run_id"` ID string `json:"id"` @@ -21,8 +21,8 @@ type BasicTask struct { Error string `json:"error"` } -func NewBasicTask(index int64, runID string, task Task) *BasicTask { - return &BasicTask{ +func newBasicTask(index int64, runID string, task Task) *basicTask { + return &basicTask{ Index: index, RunID: runID, ID: uuid.New().String(),