Skip to content

Commit

Permalink
Don't restart scanner renewer on every Next call
Browse files Browse the repository at this point in the history
Most calls to Next are just reading some buffered results. The scanner
renewer only needs to be stopped and started again when we ask for
more results from HBase. Also, we shouldn't start a renewer if we know
we have already fetched the final results.
  • Loading branch information
aaronbee committed Dec 30, 2024
1 parent 7abcffa commit e21effc
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 31 deletions.
3 changes: 2 additions & 1 deletion integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2690,7 +2690,8 @@ func TestScannerTimeout(t *testing.T) {
"org.apache.hadoop.hbase.UnknownScannerException") {
fmt.Println("Error matches: UnknownScannerException")
} else {
t.Fatalf("Error does not match org.apache.hadoop.hbase.UnknownScannerException")
t.Fatalf("Error does not match org.apache.hadoop.hbase.UnknownScannerException, "+
"got: %v", err)
}
}

Expand Down
56 changes: 26 additions & 30 deletions scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,35 @@ func (s *scanner) fetch() ([]*pb.Result, error) {
}

func (s *scanner) peek() (*pb.Result, error) {
if len(s.results) == 0 {
var (
err error
rs []*pb.Result
)
if s.closed {
// done scanning
return nil, io.EOF
}
if len(s.results) > 0 {
return s.results[0], nil
}

rs, err = s.fetch()
if err != nil {
return nil, err
}
if s.closed {
// done scanning
return nil, io.EOF
}

// fetch cannot return zero results
s.results = rs
if s.renewCancel != nil {
// About to send new Scan request to HBase, cancel our
// renewer.
s.renewCancel()
s.renewCancel = nil
}

rs, err := s.fetch()
if err != nil {
return nil, err
}
if !s.closed && s.rpc.RenewInterval() > 0 {
// Start up a renewer
renewCtx, cancel := context.WithCancel(s.rpc.Context())
s.renewCancel = cancel
go s.renewLoop(renewCtx, s.startRow)
}

// fetch cannot return zero results
s.results = rs
return s.results[0], nil
}

Expand Down Expand Up @@ -154,21 +165,6 @@ func toLocalResult(r *pb.Result) *hrpc.Result {
}

func (s *scanner) Next() (*hrpc.Result, error) {
if s.rpc.RenewInterval() > 0 && s.renewCancel != nil {
s.renewCancel()
}

res, err := s.nextInternal()

if err == nil && s.rpc.RenewInterval() > 0 {
renewCtx, cancel := context.WithCancel(s.rpc.Context())
s.renewCancel = cancel
go s.renewLoop(renewCtx, s.startRow)
}
return res, err
}

func (s *scanner) nextInternal() (*hrpc.Result, error) {
var (
result, partial *pb.Result
err error
Expand Down

0 comments on commit e21effc

Please sign in to comment.