diff --git a/client.go b/client.go index 7e92511..a259b14 100644 --- a/client.go +++ b/client.go @@ -327,7 +327,7 @@ func (c *client) Close() { } func (c *client) Scan(s *hrpc.Scan) hrpc.Scanner { - return newScanner(c, s) + return newScanner(c, s, c.logger) } func (c *client) Get(g *hrpc.Get) (*hrpc.Result, error) { diff --git a/integration_test.go b/integration_test.go index b64d596..b7dcd23 100644 --- a/integration_test.go +++ b/integration_test.go @@ -2681,16 +2681,17 @@ func TestScannerTimeout(t *testing.T) { } // force lease timeout - time.Sleep(scannerLease) + time.Sleep(scannerLease * 2) _, err = scanner.Next() // lease timeout should return an UnknownScannerException if err != nil && strings.Contains(err.Error(), "org.apache.hadoop.hbase.UnknownScannerException") { - fmt.Println("Error matches: UnknownScannerException") + t.Log("Error matches: UnknownScannerException") } else { - t.Fatalf("Error does not match org.apache.hadoop.hbase.UnknownScannerException") + t.Fatalf("Error does not match org.apache.hadoop.hbase.UnknownScannerException, "+ + "got: %v", err) } } @@ -2730,8 +2731,6 @@ func TestScannerRenewal(t *testing.T) { defer scanner.Close() for i := 0; i < numRows; i++ { rsp, err := scanner.Next() - // Sleep to trigger renewal - time.Sleep(scannerLease) if err != nil { t.Fatalf("Scanner.Next() returned error: %v", err) } @@ -2742,6 +2741,8 @@ func TestScannerRenewal(t *testing.T) { if !bytes.Equal(rsp.Cells[0].Value, expectedValue) { t.Fatalf("Unexpected value. 
Got %v, want %v", rsp.Cells[0].Value, expectedValue) } + // Sleep to trigger renewal + time.Sleep(scannerLease * 2) } // Ensure scanner is exhausted rsp, err := scanner.Next() diff --git a/prometheus.go b/prometheus.go index 76ef540..62eb1ea 100644 --- a/prometheus.go +++ b/prometheus.go @@ -53,4 +53,12 @@ var ( 4.096, 8.192, 13.192, 18.192, 23.192, 28.192, 33.192, }, }) + + scanRenewers = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "gohbase", + Subsystem: "scanner", + Name: "renewer_count", + Help: "Number of active scanner renewers. " + + "A continually increasing value indicates a Scanner leak.", + }) ) diff --git a/scanner.go b/scanner.go index d3fbc90..d1c445b 100644 --- a/scanner.go +++ b/scanner.go @@ -75,24 +75,35 @@ func (s *scanner) fetch() ([]*pb.Result, error) { } func (s *scanner) peek() (*pb.Result, error) { - if len(s.results) == 0 { - var ( - err error - rs []*pb.Result - ) - if s.closed { - // done scanning - return nil, io.EOF - } + if len(s.results) > 0 { + return s.results[0], nil + } - rs, err = s.fetch() - if err != nil { - return nil, err - } + if s.renewCancel != nil { + // About to send new Scan request to HBase, cancel our + // renewer. 
+ s.renewCancel() + s.renewCancel = nil + } - // fetch cannot return zero results - s.results = rs + if s.closed { + // done scanning + return nil, io.EOF + } + + rs, err := s.fetch() + if err != nil { + return nil, err } + if !s.closed && s.rpc.RenewInterval() > 0 { + // Start up a renewer + renewCtx, cancel := context.WithCancel(s.rpc.Context()) + s.renewCancel = cancel + go s.renewLoop(renewCtx, s.startRow) + } + + // fetch cannot return zero results + s.results = rs return s.results[0], nil } @@ -130,7 +141,7 @@ func (s *scanner) coalesce(result, partial *pb.Result) (*pb.Result, bool) { return result, true } -func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner { +func newScanner(c RPCClient, rpc *hrpc.Scan, logger *slog.Logger) *scanner { var sm map[string]int64 if rpc.TrackScanMetrics() { sm = make(map[string]int64) @@ -141,7 +152,7 @@ func newScanner(c RPCClient, rpc *hrpc.Scan) *scanner { startRow: rpc.StartRow(), curRegionScannerID: noScannerID, scanMetrics: sm, - logger: slog.Default(), + logger: logger, } } @@ -154,21 +165,6 @@ func toLocalResult(r *pb.Result) *hrpc.Result { } func (s *scanner) Next() (*hrpc.Result, error) { - if s.rpc.RenewInterval() > 0 && s.renewCancel != nil { - s.renewCancel() - } - - res, err := s.nextInternal() - - if err == nil && s.rpc.RenewInterval() > 0 { - renewCtx, cancel := context.WithCancel(s.rpc.Context()) - s.renewCancel = cancel - go s.renewLoop(renewCtx, s.startRow) - } - return res, err -} - -func (s *scanner) nextInternal() (*hrpc.Result, error) { var ( result, partial *pb.Result err error @@ -385,11 +381,11 @@ func (s *scanner) closeRegionScanner() { } // renews a scanner by resending scan request with renew = true -func (s *scanner) renew(startRow []byte) error { - if err := s.rpc.Context().Err(); err != nil { +func (s *scanner) renew(ctx context.Context, startRow []byte) error { + if err := ctx.Err(); err != nil { return err } - rpc, err := hrpc.NewScanRange(s.rpc.Context(), + rpc, err := hrpc.NewScanRange(ctx, 
s.rpc.Table(), startRow, nil, @@ -405,13 +401,17 @@ func (s *scanner) renew(startRow []byte) error { } func (s *scanner) renewLoop(ctx context.Context, startRow []byte) { + scanRenewers.Inc() t := time.NewTicker(s.rpc.RenewInterval()) - defer t.Stop() + defer func() { + t.Stop() + scanRenewers.Dec() + }() for { select { case <-t.C: - if err := s.renew(startRow); err != nil { + if err := s.renew(ctx, startRow); err != nil { s.logger.Error("error renewing scanner", "err", err) return } diff --git a/scanner_test.go b/scanner_test.go index cf07748..018ae3c 100644 --- a/scanner_test.go +++ b/scanner_test.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "io" + "log/slog" "reflect" "sync" "testing" @@ -120,7 +121,7 @@ func TestScanner(t *testing.T) { } var scannerID uint64 = 42 - scanner := newScanner(c, scan) + scanner := newScanner(c, scan, slog.Default()) s, err := hrpc.NewScanRange(scan.Context(), table, nil, nil, hrpc.NumberOfRows(2)) @@ -412,7 +413,7 @@ func TestScanMetrics(t *testing.T) { t.Fatal(err) } - sc := newScanner(c, scan) + sc := newScanner(c, scan, slog.Default()) c.EXPECT().SendRPC(&scanMatcher{scan: scan}).Return(&pb.ScanResponse{ Results: tcase.results, @@ -488,7 +489,7 @@ func TestErrorFirstFetchNoMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - scanner := newScanner(c, scan) + scanner := newScanner(c, scan, slog.Default()) srange, err := hrpc.NewScanRange(context.Background(), table, nil, nil) if err != nil { @@ -528,7 +529,7 @@ func TestErrorFirstFetchWithMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - scanner := newScanner(c, scan) + scanner := newScanner(c, scan, slog.Default()) srange, err := hrpc.NewScanRange(context.Background(), table, nil, nil, hrpc.TrackScanMetrics()) @@ -570,7 +571,7 @@ func testErrorScanFromID(t *testing.T, scan *hrpc.Scan, out []*hrpc.Result) { defer wg.Wait() var scannerID uint64 = 42 - scanner := newScanner(c, scan) + scanner := newScanner(c, scan, slog.Default()) srange, err := 
hrpc.NewScanRange(scan.Context(), table, nil, nil, scan.Options()...) if err != nil { @@ -679,7 +680,7 @@ func testPartialResults(t *testing.T, scan *hrpc.Scan, expected []*hrpc.Result) } var scannerID uint64 - scanner := newScanner(c, scan) + scanner := newScanner(c, scan, slog.Default()) ctx := scan.Context() for _, partial := range tcase { partial := partial @@ -736,7 +737,7 @@ func TestReversedScanner(t *testing.T) { var scannerID uint64 = 42 - scanner := newScanner(c, scan) + scanner := newScanner(c, scan, slog.Default()) ctx = scan.Context() s, err := hrpc.NewScanRange(ctx, table, nil, nil, hrpc.Reversed()) if err != nil { @@ -819,7 +820,7 @@ func TestScannerWithContextCanceled(t *testing.T) { t.Fatal(err) } - scanner := newScanner(c, scan) + scanner := newScanner(c, scan, slog.Default()) cancel() @@ -839,7 +840,7 @@ func TestScannerClosed(t *testing.T) { t.Fatal(err) } - scanner := newScanner(c, scan) + scanner := newScanner(c, scan, slog.Default()) scanner.Close() _, err = scanner.Next()