Skip to content

Commit

Permalink
refactor: unexport unnecessary functions and types
Browse files Browse the repository at this point in the history
  • Loading branch information
WangYihang committed Apr 20, 2024
1 parent 869d0d6 commit 89b7b6f
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 74 deletions.
Empty file added main.py
Empty file.
18 changes: 18 additions & 0 deletions pkg/utils/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"log/slog"
"os"
"path/filepath"
"strings"
)

Expand Down Expand Up @@ -104,3 +105,20 @@ type DiscardCloser struct {
func (wc DiscardCloser) Close() error {
return nil
}

func OpenFile(path string) (io.WriteCloser, error) {
switch path {
case "-":
return DiscardCloser{Writer: os.Stdout}, nil
case "":
return DiscardCloser{Writer: io.Discard}, nil
default:
// Create folder
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
// Open file
return os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
}
}
76 changes: 28 additions & 48 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ package gojob
import (
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
Expand All @@ -15,13 +12,13 @@ import (
"github.com/google/uuid"
)

type Metadata map[string]interface{}
type schedulerMetadata map[string]interface{}

// Scheduler is a task scheduler
type Scheduler struct {
id string
numWorkers int
metadata Metadata
metadata schedulerMetadata

maxRetries int
maxRuntimePerTaskSeconds int
Expand All @@ -32,10 +29,10 @@ type Scheduler struct {
isStarted atomic.Bool
currentIndex atomic.Int64

statusManager *StatusManager
statusManager *statusManager

taskChan chan *BasicTask
resultChans []chan *BasicTask
taskChan chan *basicTask
resultChans []chan *basicTask

resultFilePath string
statusFilePath string
Expand All @@ -47,14 +44,14 @@ type Scheduler struct {
recorderWg *sync.WaitGroup
}

type SchedulerOption func(*Scheduler) error
type schedulerOption func(*Scheduler) error

func New(options ...SchedulerOption) *Scheduler {
func New(options ...schedulerOption) *Scheduler {
id := uuid.New().String()
svr := &Scheduler{
id: id,
numWorkers: 1,
metadata: Metadata{"id": id},
metadata: schedulerMetadata{"id": id},

maxRetries: 4,
maxRuntimePerTaskSeconds: 16,
Expand All @@ -65,10 +62,10 @@ func New(options ...SchedulerOption) *Scheduler {
isStarted: atomic.Bool{},
currentIndex: atomic.Int64{},

statusManager: NewStatusManager(),
statusManager: newStatusManager(),

taskChan: make(chan *BasicTask),
resultChans: []chan *BasicTask{},
taskChan: make(chan *basicTask),
resultChans: []chan *basicTask{},

resultFilePath: "result.json",
statusFilePath: "status.json",
Expand All @@ -90,7 +87,7 @@ func New(options ...SchedulerOption) *Scheduler {
svr.recorderWg.Add(3)
chanRecorder(svr.resultFilePath, svr.ResultChan(), svr.recorderWg)
chanRecorder(svr.statusFilePath, svr.StatusChan(), svr.recorderWg)
metadataChan := make(chan Metadata)
metadataChan := make(chan schedulerMetadata)
chanRecorder(svr.metadataFilePath, metadataChan, svr.recorderWg)
metadataChan <- svr.metadata
close(metadataChan)
Expand All @@ -102,7 +99,7 @@ func New(options ...SchedulerOption) *Scheduler {
}

// SetNumShards sets the number of shards, default is 1 which means no sharding
func WithNumShards(numShards int64) SchedulerOption {
func WithNumShards(numShards int64) schedulerOption {
return func(s *Scheduler) error {
if numShards <= 0 {
return fmt.Errorf("numShards must be greater than 0")
Expand All @@ -113,7 +110,7 @@ func WithNumShards(numShards int64) SchedulerOption {
}

// SetShard sets the shard (from 0 to NumShards-1)
func WithShard(shard int64) SchedulerOption {
func WithShard(shard int64) schedulerOption {
return func(s *Scheduler) error {
if shard < 0 || shard >= s.numShards {
return fmt.Errorf("shard must be in [0, NumShards)")
Expand All @@ -124,7 +121,7 @@ func WithShard(shard int64) SchedulerOption {
}

// SetNumWorkers sets the number of workers
func WithNumWorkers(numWorkers int) SchedulerOption {
func WithNumWorkers(numWorkers int) schedulerOption {
return func(s *Scheduler) error {
if numWorkers <= 0 {
return fmt.Errorf("numWorkers must be greater than 0")
Expand All @@ -135,7 +132,7 @@ func WithNumWorkers(numWorkers int) SchedulerOption {
}

// SetMaxRetries sets the max retries
func WithMaxRetries(maxRetries int) SchedulerOption {
func WithMaxRetries(maxRetries int) schedulerOption {
return func(s *Scheduler) error {
if maxRetries <= 0 {
return fmt.Errorf("maxRetries must be greater than 0")
Expand All @@ -146,7 +143,7 @@ func WithMaxRetries(maxRetries int) SchedulerOption {
}

// SetMaxRuntimePerTaskSeconds sets the max runtime per task seconds
func WithMaxRuntimePerTaskSeconds(maxRuntimePerTaskSeconds int) SchedulerOption {
func WithMaxRuntimePerTaskSeconds(maxRuntimePerTaskSeconds int) schedulerOption {
return func(s *Scheduler) error {
if maxRuntimePerTaskSeconds <= 0 {
return fmt.Errorf("maxRuntimePerTaskSeconds must be greater than 0")
Expand All @@ -157,7 +154,7 @@ func WithMaxRuntimePerTaskSeconds(maxRuntimePerTaskSeconds int) SchedulerOption
}

// WithTotalTasks sets the total number of tasks, and calculates the number of tasks for this shard
func WithTotalTasks(numTotalTasks int64) SchedulerOption {
func WithTotalTasks(numTotalTasks int64) schedulerOption {
return func(s *Scheduler) error {
// Check if NumShards is set and is greater than 0
if s.numShards <= 0 {
Expand Down Expand Up @@ -187,30 +184,30 @@ func WithTotalTasks(numTotalTasks int64) SchedulerOption {
}

// AddMetadata adds metadata
func WithMetadata(key string, value interface{}) SchedulerOption {
func WithMetadata(key string, value interface{}) schedulerOption {
return func(s *Scheduler) error {
s.metadata[key] = value
return nil
}
}

// WithResultFilePath sets the file path for results
func WithResultFilePath(path string) SchedulerOption {
func WithResultFilePath(path string) schedulerOption {
return func(s *Scheduler) error {
s.resultFilePath = path
return nil
}
}

// WithStatusFilePath sets the file path for status
func WithStatusFilePath(path string) SchedulerOption {
func WithStatusFilePath(path string) schedulerOption {
return func(s *Scheduler) error {
s.statusFilePath = path
return nil
}
}

func WithPrometheusPushGateway(url string, job string) SchedulerOption {
func WithPrometheusPushGateway(url string, job string) schedulerOption {
return func(s *Scheduler) error {
s.prometheusPushGatewayUrl = url
s.prometheusPushGatewayJob = job
Expand All @@ -219,16 +216,16 @@ func WithPrometheusPushGateway(url string, job string) SchedulerOption {
}

// WithMetadataFilePath sets the file path for metadata
func WithMetadataFilePath(path string) SchedulerOption {
func WithMetadataFilePath(path string) schedulerOption {
return func(s *Scheduler) error {
s.metadataFilePath = path
return nil
}
}

// chanRecorder records the channel to a given file path
func chanRecorder[T *BasicTask | Status | Metadata](path string, ch <-chan T, wg *sync.WaitGroup) {
fd, err := openFile(path)
func chanRecorder[T *basicTask | Status | schedulerMetadata](path string, ch <-chan T, wg *sync.WaitGroup) {
fd, err := utils.OpenFile(path)
if err != nil {
slog.Error("error occured while opening file", slog.String("path", path), slog.String("error", err.Error()))
return
Expand All @@ -248,8 +245,8 @@ func chanRecorder[T *BasicTask | Status | Metadata](path string, ch <-chan T, wg
// ResultChan returns a newly created channel to receive results
// Everytime ResultChan is called, a new channel is created, and the results are written to all channels
// This is useful for multiple consumers (e.g. writing to multiple files)
func (s *Scheduler) ResultChan() <-chan *BasicTask {
c := make(chan *BasicTask)
func (s *Scheduler) ResultChan() <-chan *basicTask {
c := make(chan *basicTask)
s.resultChans = append(s.resultChans, c)
return c
}
Expand All @@ -273,7 +270,7 @@ func (s *Scheduler) Submit(task Task) {
index := s.currentIndex.Load()
if (index % s.numShards) == s.shard {
s.taskWg.Add(1)
s.taskChan <- NewBasicTask(index, s.id, task)
s.taskChan <- newBasicTask(index, s.id, task)
}
s.currentIndex.Add(1)
}
Expand Down Expand Up @@ -340,20 +337,3 @@ func (s *Scheduler) Worker() {
s.taskWg.Done()
}
}

func openFile(path string) (io.WriteCloser, error) {
switch path {
case "-":
return utils.DiscardCloser{Writer: os.Stdout}, nil
case "":
return utils.DiscardCloser{Writer: io.Discard}, nil
default:
// Create folder
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
// Open file
return os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
}
}
24 changes: 12 additions & 12 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,41 @@ import (
"github.com/WangYihang/gojob"
)

type SafeWriter struct {
type safeWriter struct {
writer *strings.Builder
lock sync.Mutex
}

func NewSafeWriter() *SafeWriter {
return &SafeWriter{
func newSafeWriter() *safeWriter {
return &safeWriter{
writer: new(strings.Builder),
lock: sync.Mutex{},
}
}

func (sw *SafeWriter) WriteString(s string) {
func (sw *safeWriter) WriteString(s string) {
sw.lock.Lock()
defer sw.lock.Unlock()
sw.writer.WriteString(s)
}

func (sw *SafeWriter) String() string {
func (sw *safeWriter) String() string {
return sw.writer.String()
}

type Task struct {
type schedulerTestTask struct {
I int
writer *SafeWriter
writer *safeWriter
}

func NewTask(i int, writer *SafeWriter) *Task {
return &Task{
func newTask(i int, writer *safeWriter) *schedulerTestTask {
return &schedulerTestTask{
I: i,
writer: writer,
}
}

func (t *Task) Do() error {
func (t *schedulerTestTask) Do() error {
t.writer.WriteString(fmt.Sprintf("%d\n", t.I))
return nil
}
Expand Down Expand Up @@ -84,14 +84,14 @@ func TestSharding(t *testing.T) {
},
}
for _, tc := range testcases {
safeWriter := NewSafeWriter()
safeWriter := newSafeWriter()
scheduler := gojob.New(
gojob.WithNumShards(tc.numShards),
gojob.WithShard(tc.shard),
gojob.WithResultFilePath(""),
).Start()
for i := 0; i < 16; i++ {
scheduler.Submit(NewTask(i, safeWriter))
scheduler.Submit(newTask(i, safeWriter))
}
scheduler.Wait()
output := safeWriter.String()
Expand Down
Loading

0 comments on commit 89b7b6f

Please sign in to comment.