diff --git a/cmd/demoserver/kvdatamake.go b/cmd/demoserver/kvdatamake.go index 38ae4da9..e3dc02e2 100644 --- a/cmd/demoserver/kvdatamake.go +++ b/cmd/demoserver/kvdatamake.go @@ -66,7 +66,7 @@ func main() { fmt.Printf("the third parameter should be an interger") } } - generateData(folder, recorderNum) + go generateData(folder, recorderNum) } else { fmt.Printf("the args should be : dir [100000]") } @@ -200,7 +200,7 @@ func (s *DataRWServer) ReadLines(req *pb.ReadLinesRequest, stream pb.DataRWServi log.L().Info("receive the request for reading file ") file, err := os.OpenFile(fileName, os.O_RDONLY, 0o666) if err != nil { - log.L().Info("make sure the file exist ", + log.L().Error("make sure the file exist ", zap.String("fileName ", fileName)) fmt.Printf("open file %v failed \n", fileName) return err @@ -261,7 +261,7 @@ func (s *DataRWServer) WriteLines(stream pb.DataRWService_WriteLinesServer) erro } else { index := strings.LastIndex(fileName, "/") if index <= 1 { - log.L().Info("bad file name ", + log.L().Error("bad file name ", zap.Int("index ", index)) return &ErrorInfo{info: " bad file name :" + fileName} } @@ -277,7 +277,7 @@ func (s *DataRWServer) WriteLines(stream pb.DataRWService_WriteLinesServer) erro if err != nil { return &ErrorInfo{info: "create the folder " + folder + " failed"} } - log.L().Info("create the folder ", + log.L().Error("create the folder ", zap.String("folder ", folder)) } // create the file @@ -299,12 +299,12 @@ func (s *DataRWServer) WriteLines(stream pb.DataRWService_WriteLinesServer) erro err = writer.Flush() } if err != nil { - log.L().Info("write data failed ", + log.L().Error("write data failed ", zap.String("error ", err.Error())) } } else if err == io.EOF { - fmt.Printf("receive the eof %v \n", res.Key) + log.L().Info("receive the eof") s.mu.Lock() for _, w := range s.fileWriterMap { w.Flush() diff --git a/executor/cvsTask/cvstask.go b/executor/cvsTask/cvstask.go index 5c1508a8..82202512 100644 --- a/executor/cvsTask/cvstask.go +++ b/executor/cvsTask/cvstask.go @@ -79,18 +79,19 @@ func NewCvsTask(ctx *dcontext.Context, _workerID lib.WorkerID, masterID lib.Mast func (task *cvsTask) InitImpl(ctx context.Context) error { log.L().Info("init the task ", zap.Any("task id :", task.ID())) + task.status = lib.WorkerStatusNormal ctx, task.cancelFn = context.WithCancel(ctx) go func() { err := task.Receive(ctx) if err != nil { - log.L().Info("error happened when reading data from the upstream ", zap.Any("message", err.Error())) + log.L().Error("error happened when reading data from the upstream ", zap.Any("message", err.Error())) task.status = lib.WorkerStatusError } }() go func() { err := task.Send(ctx) if err != nil { - log.L().Info("error happened when writing data to the downstream ", zap.Any("message", err.Error())) + log.L().Error("error happened when writing data to the downstream ", zap.Any("message", err.Error())) task.status = lib.WorkerStatusError } }() @@ -128,20 +129,20 @@ func (task *cvsTask) CloseImpl(ctx context.Context) error { func (task *cvsTask) Receive(ctx context.Context) error { conn, err := grpc.Dial(task.srcHost, grpc.WithInsecure()) if err != nil { - log.L().Info("cann't connect with the source address ", zap.Any("message", task.srcHost)) + log.L().Error("cann't connect with the source address ", zap.Any("message", task.srcHost)) return err } client := pb.NewDataRWServiceClient(conn) defer conn.Close() reader, err := client.ReadLines(ctx, &pb.ReadLinesRequest{FileName: task.srcDir, LineNo: task.index}) if err != nil { - log.L().Info("read data from file failed ", zap.Any("message", task.srcDir)) + log.L().Error("read data from file failed ", zap.Any("message", task.srcDir)) return err } for { reply, err := reader.Recv() if err != nil { - log.L().Info("read data failed", zap.Any("error:", err.Error())) + log.L().Error("read data failed", zap.Error(err)) if !task.isEOF { task.cancelFn() } @@ -149,7 +150,7 @@ func (task *cvsTask) Receive(ctx context.Context) error { } if reply.IsEof { log.L().Info("Reach the end of the file ", zap.Any("fileName:", task.srcDir)) - task.isEOF = true + close(task.buffer) break } strs := strings.Split(reply.Linestr, ",") @@ -161,7 +162,8 @@ func (task *cvsTask) Receive(ctx context.Context) error { return nil case task.buffer <- strPair{firstStr: strs[0], secondStr: strs[1]}: } - time.Sleep(time.Microsecond * 10) + // waiting longer time to read lines slowly + time.Sleep(time.Millisecond * 10) } return nil } @@ -169,25 +171,31 @@ func (task *cvsTask) Receive(ctx context.Context) error { func (task *cvsTask) Send(ctx context.Context) error { conn, err := grpc.Dial(task.dstHost, grpc.WithInsecure()) if err != nil { - log.L().Info("cann't connect with the destination address ", zap.Any("message", task.dstHost)) + log.L().Error("cann't connect with the destination address ", zap.Any("message", task.dstHost)) return err } client := pb.NewDataRWServiceClient(conn) defer conn.Close() writer, err := client.WriteLines(ctx) if err != nil { - log.L().Info("call write data rpc failed ") + log.L().Error("call write data rpc failed", zap.Error(err)) task.status = lib.WorkerStatusError task.cancelFn() return err } for { select { - case kv := <-task.buffer: + case kv, more := <-task.buffer: + if !more { + log.L().Info("Reach the end of the file ") + task.status = lib.WorkerStatusFinished + _, err = writer.CloseAndRecv() + return err + } err := writer.Send(&pb.WriteLinesRequest{FileName: task.dstDir, Key: kv.firstStr, Value: kv.secondStr}) task.counter++ if err != nil { - log.L().Info("call write data rpc failed ") + log.L().Error("call write data rpc failed ", zap.Error(err)) task.status = lib.WorkerStatusError task.cancelFn() return err @@ -195,14 +203,6 @@ func (task *cvsTask) Send(ctx context.Context) error { case <-ctx.Done(): task.status = lib.WorkerStatusError return nil - default: - if task.isEOF { - log.L().Info("Reach the end of the file ") - task.status = lib.WorkerStatusFinished - err = writer.CloseSend() - return err - } - time.Sleep(time.Second) } } diff --git a/executor/server.go b/executor/server.go index 894d521d..ed978515 100644 --- a/executor/server.go +++ b/executor/server.go @@ -400,7 +400,7 @@ func (s *Server) selfRegister(ctx context.Context) (err error) { s.info = &model.NodeInfo{ Type: model.NodeTypeExecutor, ID: model.ExecutorID(resp.ExecutorId), - Addr: s.cfg.WorkerAddr, + Addr: s.cfg.AdvertiseAddr, } log.L().Logger.Info("register successful", zap.Any("info", s.info)) return nil diff --git a/executor/worker/internal/runnables.go b/executor/worker/internal/runnables.go index f8a2720a..36f362ff 100644 --- a/executor/worker/internal/runnables.go +++ b/executor/worker/internal/runnables.go @@ -61,7 +61,7 @@ func (c *RunnableContainer) OnInitialized() { func (c *RunnableContainer) OnStopped() { oldStatus := c.status.Swap(TaskClosing) - if oldStatus != TaskRunning { + if oldStatus != TaskRunning && oldStatus != TaskSubmitted { log.L().Panic("unexpected status", zap.Int32("status", oldStatus)) } } diff --git a/jobmaster/cvsJob/cvsJobMaster.go b/jobmaster/cvsJob/cvsJobMaster.go index 4ecb0be9..aa930edd 100644 --- a/jobmaster/cvsJob/cvsJobMaster.go +++ b/jobmaster/cvsJob/cvsJobMaster.go @@ -39,7 +39,7 @@ func (e *errorInfo) Error() string { } type JobMaster struct { - lib.BaseJobMaster + lib.BaseMaster syncInfo *Config syncFilesInfo map[lib.WorkerID]*workerInfo counter int64 @@ -54,18 +54,16 @@ func init() { registry.GlobalWorkerRegistry().MustRegisterWorkerType(lib.CvsJobMaster, factory) } -func NewCVSJobMaster(ctx *dcontext.Context, workerID lib.WorkerID, masterID lib.MasterID, conf lib.WorkerConfig) *JobMaster { +func NewCVSJobMaster(ctx *dcontext.Context, workerID lib.WorkerID, _ lib.MasterID, conf lib.WorkerConfig) *JobMaster { jm := &JobMaster{} jm.workerID = workerID jm.syncInfo = conf.(*Config) jm.syncFilesInfo = make(map[lib.WorkerID]*workerInfo) deps := ctx.Dependencies - base := lib.NewBaseJobMaster( + base := lib.NewBaseMaster( ctx, jm, - jm, - masterID, workerID, deps.MessageHandlerManager, deps.MessageRouter, @@ -73,7 +71,7 @@ func NewCVSJobMaster(ctx *dcontext.Context, workerID lib.WorkerID, masterID lib. deps.ExecutorClientManager, deps.ServerMasterClient, ) - jm.BaseJobMaster = base + jm.BaseMaster = base log.L().Info("new cvs jobmaster ", zap.Any("id :", jm.workerID)) return jm } @@ -115,20 +113,19 @@ func (jm *JobMaster) Tick(ctx context.Context) error { } status := worker.handle.Status() if status.Code == lib.WorkerStatusNormal { - num, ok := status.Ext.(float64) - if ok { - worker.curLoc = int64(num) - jm.counter += int64(num) - // todo : store the sync progress into the meta store for each file - } + num := status.Ext.(int64) + worker.curLoc = num + jm.counter += num + log.L().Info("cvs job tmp num ", zap.Any("id :", worker.handle.ID()), zap.Int64("counter: ", num)) + // todo : store the sync progress into the meta store for each file } else if status.Code == lib.WorkerStatusFinished { // todo : handle error case here log.L().Info("sync file finished ", zap.Any("message", worker.file)) } else if status.Code == lib.WorkerStatusError { - log.L().Info("sync file failed ", zap.Any("message", worker.file)) + log.L().Error("sync file failed ", zap.Any("message", worker.file)) } } - log.L().Info("cvs job master status ", zap.Any("id :", jm.workerID), zap.Any(" ,counter: ", jm.counter)) + log.L().Info("cvs job master status ", zap.Any("id :", jm.workerID), zap.Int64("counter: ", jm.counter)) return nil } @@ -144,8 +141,7 @@ func (jm *JobMaster) OnWorkerOnline(worker lib.WorkerHandle) error { // todo : add the worker information to the sync files map syncInfo, exist := jm.syncFilesInfo[worker.ID()] if !exist { - log.L().Info("bad worker found", zap.Any("message", worker.ID())) - panic(errorInfo{info: "bad worker "}) + log.L().Panic("bad worker found", zap.Any("message", worker.ID())) } else { log.L().Info("worker online ", zap.Any("fileName", syncInfo.file)) } @@ -195,7 +191,7 @@ func (jm *JobMaster) OnMasterFailover(reason lib.MasterFailoverReason) error { } func (jm *JobMaster) Status() lib.WorkerStatus { - return lib.WorkerStatus{Code: lib.WorkerStatusNormal} + return lib.WorkerStatus{Code: lib.WorkerStatusNormal, Ext: jm.counter} } func (jm *JobMaster) listSrcFiles(ctx context.Context) ([]string, error) {