Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 [pagination] Fix stream pagination #518

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/20241114171420.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:bug: `[pagination]` Fix stream pagination
4 changes: 2 additions & 2 deletions utils/collection/pagination/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type IIterator interface {
GetNext() (interface{}, error)
}

// IStaticPage defines a generic page for a collection.
// IStaticPage defines a generic page for a collection. A page is marked as static when it cannot retrieve next pages on its own.
type IStaticPage interface {
// HasNext states whether more pages are accessible.
HasNext() bool
Expand Down Expand Up @@ -78,7 +78,7 @@ type IPaginatorAndPageFetcher interface {
FetchNextPage(ctx context.Context, currentPage IStaticPage) (IStaticPage, error)
}

// IGenericStreamPaginator is an iterator over a stream. A stream is a collection without any know ending.
// IGenericStreamPaginator is an iterator over a stream. A stream is a collection without any known ending.
type IGenericStreamPaginator interface {
IGenericPaginator
// DryUp indicates to the stream that it will soon run out.
Expand Down
45 changes: 23 additions & 22 deletions utils/collection/pagination/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,29 @@ func (s *AbstractStreamPaginator) Close() error {
}

func (s *AbstractStreamPaginator) HasNext() bool {
if s.AbstractPaginator.HasNext() {
s.timeReachLast.Store(time.Now())
return true
}
page, err := s.AbstractPaginator.FetchCurrentPage()
if err != nil {
return false
}
stream, ok := page.(IStaticPageStream)
if !ok {
return false
}
if !stream.HasFuture() {
return false
}
if s.IsRunningDry() {
if time.Since(s.timeReachLast.Load()) >= s.timeOut {
for {
if s.AbstractPaginator.HasNext() {
s.timeReachLast.Store(time.Now())
return true
}
page, err := s.AbstractPaginator.FetchCurrentPage()
if err != nil {
return false
}
stream, ok := page.(IStaticPageStream)
if !ok {
return false
}
if !stream.HasFuture() {
return false
}
if s.IsRunningDry() {
if time.Since(s.timeReachLast.Load()) >= s.timeOut {
return false
}
} else {
s.timeReachLast.Store(time.Now())
}
future, err := s.FetchFuturePage(s.GetContext(), stream)
if err != nil {
return false
Expand All @@ -61,10 +65,9 @@ func (s *AbstractStreamPaginator) HasNext() bool {
if err != nil {
return false
}
} else {
s.timeReachLast.Store(time.Now())

parallelisation.SleepWithContext(s.GetContext(), s.backoff)
}
return s.AbstractPaginator.HasNext()
}

func (s *AbstractStreamPaginator) GetNext() (interface{}, error) {
Expand All @@ -78,9 +81,7 @@ func (s *AbstractStreamPaginator) GetNext() (interface{}, error) {
err = fmt.Errorf("%w: there is not any next item", commonerrors.ErrNotFound)
return nil, err
}

parallelisation.SleepWithContext(s.GetContext(), s.backoff)

}
}

Expand Down
153 changes: 153 additions & 0 deletions utils/collection/pagination/stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package pagination

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ARM-software/golang-utils/utils/commonerrors"
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
)

func TestStreamPaginator(t *testing.T) {
tests := []struct {
paginator func(context.Context, IStaticPageStream) (IGenericStreamPaginator, error)
name string
generateFunc func() (firstPage IStream, itemTotal int64, err error)
dryOut bool
}{
{
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
paginator, err := NewStaticPageStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStaticPageStream, error) {
return collection, nil
}, func(fCtx context.Context, current IStaticPage) (IStaticPage, error) {
c, err := toDynamicPage(current)
if err != nil {
return nil, err
}
return c.GetNext(fCtx)
}, func(fCtx context.Context, current IStaticPageStream) (IStaticPageStream, error) {
s, err := toDynamicStream(current)
if err != nil {
return nil, err
}
return s.GetFuture(fCtx)
})
return paginator, err
},
generateFunc: GenerateMockStreamWithEnding,
name: "stream paginator over a stream of static pages with known ending",
},
{
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
paginator, err := NewStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) {
return toDynamicStream(collection)
})
return paginator, err
},
generateFunc: GenerateMockStreamWithEnding,
name: "stream paginator over a stream of dynamic pages but with a known ending",
},
{
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
paginator, err := NewStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) {
return toDynamicStream(collection)
})
return paginator, err
},
name: "stream paginator over a running dry stream of dynamic pages",
generateFunc: GenerateMockStream,
dryOut: true,
},
{
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
paginator, err := NewStaticPageStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStaticPageStream, error) {
return collection, nil
}, func(fCtx context.Context, current IStaticPage) (IStaticPage, error) {
c, err := toDynamicPage(current)
if err != nil {
return nil, err
}
return c.GetNext(fCtx)
}, func(fCtx context.Context, current IStaticPageStream) (IStaticPageStream, error) {
s, err := toDynamicStream(current)
if err != nil {
return nil, err
}
return s.GetFuture(fCtx)
})
if paginator != nil {
// Indicate the stream will run out.
err = paginator.DryUp()
}
return paginator, err
},
name: "stream paginator over a running dry stream of static pages",
generateFunc: GenerateMockStream,
dryOut: true,
},
}

for te := range tests {
test := tests[te]
for i := 0; i < 10; i++ {
mockPages, expectedCount, err := test.generateFunc()
require.NoError(t, err)
t.Run(fmt.Sprintf("%v-#%v-[%v items]", test.name, i, expectedCount), func(t *testing.T) {
paginator, err := test.paginator(context.TODO(), mockPages)
require.NoError(t, err)
count := int64(0)
for {
if !paginator.HasNext() {
break
}
count += 1
item, err := paginator.GetNext()
require.NoError(t, err)
require.NotNil(t, item)
mockItem, ok := item.(*MockItem)
require.True(t, ok)
assert.Equal(t, int(count-1), mockItem.Index)
if count >= expectedCount%2 {
require.NoError(t, paginator.DryUp())
}
}
assert.Equal(t, expectedCount, count)
})
}
}
}

func TestEmptyStream(t *testing.T) {
mockPages, expectedCount, err := GenerateMockEmptyStream()
require.NoError(t, err)
require.Zero(t, expectedCount)
require.NotNil(t, mockPages)
paginator, err := NewStreamPaginator(context.Background(), time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) {
return toDynamicStream(mockPages)
})
require.NoError(t, err)
assert.False(t, paginator.HasNext())
assert.False(t, paginator.IsRunningDry())
item, err := paginator.GetNext()
errortest.AssertError(t, err, commonerrors.ErrNotFound)
assert.Nil(t, item)
}

func TestDryOutStream(t *testing.T) {
mockPages, expectedCount, err := GenerateMockEmptyStream()
require.NoError(t, err)
require.Zero(t, expectedCount)
require.NotNil(t, mockPages)
paginator, err := NewStreamPaginator(context.Background(), time.Millisecond, 10*time.Millisecond, func(context.Context) (IStream, error) {
return toDynamicStream(mockPages)
})
require.NoError(t, err)
require.NoError(t, paginator.DryUp())
assert.False(t, paginator.HasNext())
assert.True(t, paginator.IsRunningDry())
}
Loading
Loading