Skip to content

Commit

Permalink
Revert "debugging the bug"
Browse files Browse the repository at this point in the history
This reverts commit 605e3a6.
  • Loading branch information
sanyamsinghal committed Dec 17, 2024
1 parent 431ed80 commit fb5079b
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 66 deletions.
2 changes: 0 additions & 2 deletions yb-voyager/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,6 @@ func CleanupChildProcesses() {
}
}
PrintElapsedDuration()
utils.PrintAndLog("cleaned up child processes, returning...")
}

// this function wait for process to exit after signalling it to stop
Expand Down Expand Up @@ -1543,7 +1542,6 @@ func PackAndSendCallhomePayloadOnExit() {
case importDataFileCmd.CommandPath():
packAndSendImportDataFilePayload(EXIT)
}
utils.PrintAndLog("returning from PackAndSendCallhomePayloadOnExit() ...")
}

func updateExportSnapshotDataStatsInPayload(exportDataPayload *callhome.ExportDataPhasePayload) {
Expand Down
10 changes: 2 additions & 8 deletions yb-voyager/cmd/exportData.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func exportDataCommandFn(cmd *cobra.Command, args []string) {
sendPayloadAsPerExporterRole(ERROR)
atexit.Exit(1)
}
utils.PrintAndLog("returned from exportDataCommandFn...")
}

func sendPayloadAsPerExporterRole(status string) {
Expand Down Expand Up @@ -255,13 +254,7 @@ func exportData() bool {
}

ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
utils.PrintAndLog("waiting for the go routines to exit...")
utils.WaitGroup.Wait()
time.Sleep(25 * time.Second)
utils.PrintAndLog("DONE waiting for the go routines to exit...")
}()
defer cancel()
var partitionsToRootTableMap map[string]string
// get initial table list
partitionsToRootTableMap, finalTableList := getInitialTableList()
Expand Down Expand Up @@ -340,6 +333,7 @@ func exportData() bool {
utils.PrintAndLog("table list for data export: %v", tableListToDisplay)

//finalTableList is with leaf partitions and root tables after this in the whole export flow to make all the catalog queries work fine

if changeStreamingIsEnabled(exportType) || useDebezium {
exportPhase = dbzm.MODE_SNAPSHOT
config, tableNametoApproxRowCountMap, err := prepareDebeziumConfig(partitionsToRootTableMap, finalTableList, tablesColumnList, leafPartitions)
Expand Down
86 changes: 35 additions & 51 deletions yb-voyager/cmd/exportDataDebezium.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func debeziumExportData(ctx context.Context, config *dbzm.Config, tableNameToApp
}
progressTracker.UpdateProgress(status)
if !snapshotComplete {
snapshotComplete, err = checkAndHandleSnapshotComplete(ctx, config, status, progressTracker)
snapshotComplete, err = checkAndHandleSnapshotComplete(config, status, progressTracker)
if err != nil {
return fmt.Errorf("failed to check if snapshot is complete: %w", err)
}
Expand All @@ -407,7 +407,7 @@ func debeziumExportData(ctx context.Context, config *dbzm.Config, tableNameToApp
if err != nil {
return fmt.Errorf("failed to read export status: %w", err)
}
snapshotComplete, err = checkAndHandleSnapshotComplete(ctx, config, status, progressTracker)
snapshotComplete, err = checkAndHandleSnapshotComplete(config, status, progressTracker)
if !snapshotComplete || err != nil {
return fmt.Errorf("snapshot was not completed: %w", err)
}
Expand All @@ -417,9 +417,7 @@ func debeziumExportData(ctx context.Context, config *dbzm.Config, tableNameToApp
return nil
}

func reportStreamingProgress(ctx context.Context) {
utils.WaitGroup.Add(1)
defer utils.WaitGroup.Done()
func reportStreamingProgress() {
tableWriter := uilive.New()
headerWriter := tableWriter.Newline()
separatorWriter := tableWriter.Newline()
Expand All @@ -429,61 +427,47 @@ func reportStreamingProgress(ctx context.Context) {
row4Writer := tableWriter.Newline()
footerWriter := tableWriter.Newline()
tableWriter.Start()
// defer tableWriter.Stop()
defer fmt.Printf("returning from streamingprogress uilive\n")
for {
select {
case <-ctx.Done():
utils.PrintAndLog("cancel context called, returning reportStreamingProgress()...")
tableWriter.Stop()
return
default:
fmt.Fprint(tableWriter, color.GreenString("| %-40s | %30s |\n", "---------------------------------------", "-----------------------------"))
fmt.Fprint(headerWriter, color.GreenString("| %-40s | %30s |\n", "Metric", "Value"))
fmt.Fprint(separatorWriter, color.GreenString("| %-40s | %30s |\n", "---------------------------------------", "-----------------------------"))
fmt.Fprint(row1Writer, color.GreenString("| %-40s | %30s |\n", "Total Exported Events", strconv.FormatInt(totalEventCount, 10)))
fmt.Fprint(row2Writer, color.GreenString("| %-40s | %30s |\n", "Total Exported Events (Current Run)", strconv.FormatInt(totalEventCountRun, 10)))
fmt.Fprint(row3Writer, color.GreenString("| %-40s | %30s |\n", "Export Rate(Last 3 min)", strconv.FormatInt(throughputInLast3Min, 10)+"/sec"))
fmt.Fprint(row4Writer, color.GreenString("| %-40s | %30s |\n", "Export Rate(Last 10 min)", strconv.FormatInt(throughputInLast10Min, 10)+"/sec"))
fmt.Fprint(footerWriter, color.GreenString("| %-40s | %30s |\n", "---------------------------------------", "-----------------------------"))
tableWriter.Flush()
time.Sleep(10 * time.Second)
}
fmt.Fprint(tableWriter, color.GreenString("| %-40s | %30s |\n", "---------------------------------------", "-----------------------------"))
fmt.Fprint(headerWriter, color.GreenString("| %-40s | %30s |\n", "Metric", "Value"))
fmt.Fprint(separatorWriter, color.GreenString("| %-40s | %30s |\n", "---------------------------------------", "-----------------------------"))
fmt.Fprint(row1Writer, color.GreenString("| %-40s | %30s |\n", "Total Exported Events", strconv.FormatInt(totalEventCount, 10)))
fmt.Fprint(row2Writer, color.GreenString("| %-40s | %30s |\n", "Total Exported Events (Current Run)", strconv.FormatInt(totalEventCountRun, 10)))
fmt.Fprint(row3Writer, color.GreenString("| %-40s | %30s |\n", "Export Rate(Last 3 min)", strconv.FormatInt(throughputInLast3Min, 10)+"/sec"))
fmt.Fprint(row4Writer, color.GreenString("| %-40s | %30s |\n", "Export Rate(Last 10 min)", strconv.FormatInt(throughputInLast10Min, 10)+"/sec"))
fmt.Fprint(footerWriter, color.GreenString("| %-40s | %30s |\n", "---------------------------------------", "-----------------------------"))
tableWriter.Flush()
time.Sleep(10 * time.Second)
}
}

func calculateStreamingProgress(ctx context.Context) {
func calculateStreamingProgress() {
var err error
for {
select {
case <-ctx.Done():
utils.PrintAndLog("cancel context called, returning calculateStreamingProgress()...")
return
default:
totalEventCount, totalEventCountRun, err = metaDB.GetTotalExportedEventsByExporterRole(exporterRole, runId)
if err != nil {
utils.ErrExit("failed to get total exported count from metadb: %w", err)
}
totalEventCount, totalEventCountRun, err = metaDB.GetTotalExportedEventsByExporterRole(exporterRole, runId)
if err != nil {
utils.ErrExit("failed to get total exported count from metadb: %w", err)
}

throughputInLast3Min, err = metaDB.GetExportedEventsRateInLastNMinutes(runId, 3)
if err != nil {
utils.ErrExit("failed to get export rate from metadb: %w", err)
}
throughputInLast10Min, err = metaDB.GetExportedEventsRateInLastNMinutes(runId, 10)
if err != nil {
utils.ErrExit("failed to get export rate from metadb: %w", err)
}
if disablePb && callhome.SendDiagnostics {
// to not do unneccessary frequent calls to metadb in case we only require this info for callhome
time.Sleep(12 * time.Minute)
} else {
time.Sleep(10 * time.Second)
}
throughputInLast3Min, err = metaDB.GetExportedEventsRateInLastNMinutes(runId, 3)
if err != nil {
utils.ErrExit("failed to get export rate from metadb: %w", err)
}
throughputInLast10Min, err = metaDB.GetExportedEventsRateInLastNMinutes(runId, 10)
if err != nil {
utils.ErrExit("failed to get export rate from metadb: %w", err)
}
if disablePb && callhome.SendDiagnostics {
// to not do unneccessary frequent calls to metadb in case we only require this info for callhome
time.Sleep(12 * time.Minute)
} else {
time.Sleep(10 * time.Second)
}
}

}

func checkAndHandleSnapshotComplete(ctx context.Context, config *dbzm.Config, status *dbzm.ExportStatus, progressTracker *ProgressTracker) (bool, error) {
func checkAndHandleSnapshotComplete(config *dbzm.Config, status *dbzm.ExportStatus, progressTracker *ProgressTracker) (bool, error) {
if !status.SnapshotExportIsComplete() {
return false, nil
}
Expand Down Expand Up @@ -535,10 +519,10 @@ func checkAndHandleSnapshotComplete(ctx context.Context, config *dbzm.Config, st
}
color.Blue("streaming changes to a local queue file...")
if !disablePb || callhome.SendDiagnostics {
go calculateStreamingProgress(ctx)
go calculateStreamingProgress()
}
if !disablePb {
go reportStreamingProgress(ctx)
go reportStreamingProgress()
}
}
return true, nil
Expand Down
1 change: 0 additions & 1 deletion yb-voyager/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ func shouldRunPersistentPreRun(cmd *cobra.Command) bool {
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
cobra.CheckErr(rootCmd.Execute())
utils.PrintAndLog("returning from Execute of cobra command>>>>>>>>>>>>\n")
}

func init() {
Expand Down
4 changes: 0 additions & 4 deletions yb-voyager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,11 @@ func main() {
atexit.Register(cmd.CleanupChildProcesses)
atexit.Register(restoreTerminalState) // ensure terminal is always restored
cmd.Execute()
utils.PrintAndLog("preparing1 to return from main function>>>>>>>")
cmd.PrintElapsedDuration()
utils.PrintAndLog("preparing2 to return from main function>>>>>>>")
if cmd.ProcessShutdownRequested {
utils.PrintAndLog("waiting for exit handlers to complete the cleanup")
time.Sleep(time.Second * 120) // using here larger value than what we have for debezium(100sec)
}
utils.PrintAndLog("returning from main function>>>>>>>")
}

func registerSignalHandlers() {
Expand Down Expand Up @@ -96,5 +93,4 @@ func restoreTerminalState() {
log.Errorf("error restoring terminal: %v\n", err)
}
}
utils.PrintAndLog("restored terminal state, returning...")
}

0 comments on commit fb5079b

Please sign in to comment.