diff --git a/changes/20241114171420.bugfix b/changes/20241114171420.bugfix new file mode 100644 index 0000000000..3060525660 --- /dev/null +++ b/changes/20241114171420.bugfix @@ -0,0 +1 @@ +:bug: `[pagination]` Fix stream pagination diff --git a/utils/collection/pagination/interfaces.go b/utils/collection/pagination/interfaces.go index a4bd7ba431..6d72a8fed4 100644 --- a/utils/collection/pagination/interfaces.go +++ b/utils/collection/pagination/interfaces.go @@ -22,7 +22,7 @@ type IIterator interface { GetNext() (interface{}, error) } -// IStaticPage defines a generic page for a collection. +// IStaticPage defines a generic page for a collection. A page is marked as static when it cannot retrieve next pages on its own. type IStaticPage interface { // HasNext states whether more pages are accessible. HasNext() bool @@ -78,7 +78,7 @@ type IPaginatorAndPageFetcher interface { FetchNextPage(ctx context.Context, currentPage IStaticPage) (IStaticPage, error) } -// IGenericStreamPaginator is an iterator over a stream. A stream is a collection without any know ending. +// IGenericStreamPaginator is an iterator over a stream. A stream is a collection without any known ending. type IGenericStreamPaginator interface { IGenericPaginator // DryUp indicates to the stream that it will soon run out. diff --git a/utils/collection/pagination/stream.go b/utils/collection/pagination/stream.go index 3316472c50..082ecf442d 100644 --- a/utils/collection/pagination/stream.go +++ b/utils/collection/pagination/stream.go @@ -34,25 +34,29 @@ func (s *AbstractStreamPaginator) Close() error { } func (s *AbstractStreamPaginator) HasNext() bool { - if s.AbstractPaginator.HasNext() { - s.timeReachLast.Store(time.Now()) - return true - } - page, err := s.AbstractPaginator.FetchCurrentPage() - if err != nil { - return false - } - stream, ok := page.(IStaticPageStream) - if !ok { - return false - } - if !stream.HasFuture() { - return false - } - if s.IsRunningDry() { - if time.Since(s.timeReachLast.Load()) >= s.timeOut { + for { + if s.AbstractPaginator.HasNext() { + s.timeReachLast.Store(time.Now()) + return true + } + page, err := s.AbstractPaginator.FetchCurrentPage() + if err != nil { return false } + stream, ok := page.(IStaticPageStream) + if !ok { + return false + } + if !stream.HasFuture() { + return false + } + if s.IsRunningDry() { + if time.Since(s.timeReachLast.Load()) >= s.timeOut { + return false + } + } else { + s.timeReachLast.Store(time.Now()) + } future, err := s.FetchFuturePage(s.GetContext(), stream) if err != nil { return false @@ -61,10 +65,9 @@ func (s *AbstractStreamPaginator) HasNext() bool { if err != nil { return false } - } else { - s.timeReachLast.Store(time.Now()) + + parallelisation.SleepWithContext(s.GetContext(), s.backoff) } - return s.AbstractPaginator.HasNext() } func (s *AbstractStreamPaginator) GetNext() (interface{}, error) { @@ -78,9 +81,7 @@ func (s *AbstractStreamPaginator) GetNext() (interface{}, error) { err = fmt.Errorf("%w: there is not any next item", commonerrors.ErrNotFound) return nil, err } - parallelisation.SleepWithContext(s.GetContext(), s.backoff) - } } diff --git a/utils/collection/pagination/stream_test.go b/utils/collection/pagination/stream_test.go new file mode 100644 index 0000000000..6a11d455d2 --- /dev/null +++ b/utils/collection/pagination/stream_test.go @@ -0,0 +1,153 @@ +package pagination + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ARM-software/golang-utils/utils/commonerrors" + "github.com/ARM-software/golang-utils/utils/commonerrors/errortest" +) + +func TestStreamPaginator(t *testing.T) { + tests := []struct { + paginator func(context.Context, IStaticPageStream) (IGenericStreamPaginator, error) + name string + generateFunc func() (firstPage IStream, itemTotal int64, err error) + dryOut bool + }{ + { + paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) { + paginator, err := NewStaticPageStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStaticPageStream, error) { + return collection, nil + }, func(fCtx context.Context, current IStaticPage) (IStaticPage, error) { + c, err := toDynamicPage(current) + if err != nil { + return nil, err + } + return c.GetNext(fCtx) + }, func(fCtx context.Context, current IStaticPageStream) (IStaticPageStream, error) { + s, err := toDynamicStream(current) + if err != nil { + return nil, err + } + return s.GetFuture(fCtx) + }) + return paginator, err + }, + generateFunc: GenerateMockStreamWithEnding, + name: "stream paginator over a stream of static pages with known ending", + }, + { + paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) { + paginator, err := NewStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) { + return toDynamicStream(collection) + }) + return paginator, err + }, + generateFunc: GenerateMockStreamWithEnding, + name: "stream paginator over a stream of dynamic pages but with a known ending", + }, + { + paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) { + paginator, err := NewStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) { + return toDynamicStream(collection) + }) + return paginator, err + }, + name: "stream paginator over a running dry stream of dynamic pages", + generateFunc: GenerateMockStream, + dryOut: true, + }, + { + paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) { + paginator, err := NewStaticPageStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStaticPageStream, error) { + return collection, nil + }, func(fCtx context.Context, current IStaticPage) (IStaticPage, error) { + c, err := toDynamicPage(current) + if err != nil { + return nil, err + } + return c.GetNext(fCtx) + }, func(fCtx context.Context, current IStaticPageStream) (IStaticPageStream, error) { + s, err := toDynamicStream(current) + if err != nil { + return nil, err + } + return s.GetFuture(fCtx) + }) + if paginator != nil { + // Indicate the stream will run out. + err = paginator.DryUp() + } + return paginator, err + }, + name: "stream paginator over a running dry stream of static pages", + generateFunc: GenerateMockStream, + dryOut: true, + }, + } + + for te := range tests { + test := tests[te] + for i := 0; i < 10; i++ { + mockPages, expectedCount, err := test.generateFunc() + require.NoError(t, err) + t.Run(fmt.Sprintf("%v-#%v-[%v items]", test.name, i, expectedCount), func(t *testing.T) { + paginator, err := test.paginator(context.TODO(), mockPages) + require.NoError(t, err) + count := int64(0) + for { + if !paginator.HasNext() { + break + } + count += 1 + item, err := paginator.GetNext() + require.NoError(t, err) + require.NotNil(t, item) + mockItem, ok := item.(*MockItem) + require.True(t, ok) + assert.Equal(t, int(count-1), mockItem.Index) + if count >= expectedCount%2 { + require.NoError(t, paginator.DryUp()) + } + } + assert.Equal(t, expectedCount, count) + }) + } + } +} + +func TestEmptyStream(t *testing.T) { + mockPages, expectedCount, err := GenerateMockEmptyStream() + require.NoError(t, err) + require.Zero(t, expectedCount) + require.NotNil(t, mockPages) + paginator, err := NewStreamPaginator(context.Background(), time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) { + return toDynamicStream(mockPages) + }) + require.NoError(t, err) + assert.False(t, paginator.HasNext()) + assert.False(t, paginator.IsRunningDry()) + item, err := paginator.GetNext() + errortest.AssertError(t, err, commonerrors.ErrNotFound) + assert.Nil(t, item) +} + +func TestDryOutStream(t *testing.T) { + mockPages, expectedCount, err := GenerateMockEmptyStream() + require.NoError(t, err) + require.Zero(t, expectedCount) + require.NotNil(t, mockPages) + paginator, err := NewStreamPaginator(context.Background(), time.Millisecond, 10*time.Millisecond, func(context.Context) (IStream, error) { + return toDynamicStream(mockPages) + }) + require.NoError(t, err) + require.NoError(t, paginator.DryUp()) + assert.False(t, paginator.HasNext()) + assert.True(t, paginator.IsRunningDry()) +} diff --git a/utils/collection/pagination/testing.go b/utils/collection/pagination/testing.go index 142e38f18c..3a31f5fc6d 100644 --- a/utils/collection/pagination/testing.go +++ b/utils/collection/pagination/testing.go @@ -8,13 +8,12 @@ package pagination import ( "context" "fmt" - "math/rand" - "time" "github.com/go-faker/faker/v4" "github.com/ARM-software/golang-utils/utils/commonerrors" "github.com/ARM-software/golang-utils/utils/parallelisation" + "github.com/ARM-software/golang-utils/utils/safecast" ) type MockItem struct { @@ -131,7 +130,7 @@ func (m *MockPage) SetIndexes(firstIndex int) { } func (m *MockPage) GetItemCount() (int64, error) { - return int64(len(m.elements)), nil + return safecast.ToInt64(len(m.elements)), nil } func (m *MockPage) HasFuture() bool { @@ -154,25 +153,31 @@ func GenerateEmptyPage() IStream { return &MockPage{} } -func GenerateMockPage() (IStream, error) { - random := rand.New(rand.NewSource(time.Now().Unix())) //nolint:gosec //causes G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec), So disable gosec as this is just for testing +func GenerateMockPage() (IStream, int64, error) { + randoms, err := faker.RandomInt(0, 50) + if err != nil { + return nil, 0, err + } + n := randoms[2] page := GenerateEmptyPage().(*MockPage) - n := random.Intn(50) //nolint:gosec //causes G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec), So disable gosec as this is just for testing for i := 0; i < n; i++ { - err := page.AppendItem(GenerateMockItem()) - if err != nil { - return nil, err + subErr := page.AppendItem(GenerateMockItem()) + if subErr != nil { + return nil, 0, subErr } } - return page, nil + return page, safecast.ToInt64(n), nil } func GenerateMockCollection() (firstPage IStream, itemTotal int64, err error) { - random := rand.New(rand.NewSource(time.Now().Unix())) //nolint:gosec //causes G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec), So disable gosec as this is just for testing - n := random.Intn(50) //nolint:gosec //causes G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec), So disable gosec as this is just for testing + randoms, err := faker.RandomInt(0, 50) + if err != nil { + return + } + n := randoms[1] var next IStream for i := 0; i < n; i++ { - currentPage, subErr := GenerateMockPage() + currentPage, _, subErr := GenerateMockPage() if subErr != nil { err = subErr return @@ -197,19 +202,23 @@ func GenerateMockCollection() (firstPage IStream, itemTotal int64, err error) { next = firstPage } if firstPage == nil { - return + firstPage = GenerateEmptyPage() } mockP := firstPage.(*MockPage) mockP.SetIndexes(0) return } +// GenerateMockStream creates a mock stream which could never end (as in, a future link will be always present) func GenerateMockStream() (firstPage IStream, itemTotal int64, err error) { - random := rand.New(rand.NewSource(time.Now().Unix())) //nolint:gosec //causes G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec), So disable gosec as this is just for testing - n := random.Intn(50) //nolint:gosec //causes G404: Use of weak random number generator (math/rand instead of crypto/rand) (gosec), So disable gosec as this is just for testing + randoms, err := faker.RandomInt(1, 3) + if err != nil { + return + } + n := randoms[0] var future IStream for i := 0; i < n; i++ { - currentPage, subErr := GenerateMockPage() + currentPage, _, subErr := GenerateMockPage() if subErr != nil { err = subErr return @@ -236,8 +245,90 @@ func GenerateMockStream() (firstPage IStream, itemTotal int64, err error) { future = firstPage } if firstPage == nil { + firstPage = GenerateEmptyPage() + } + mockP := firstPage.(*MockPage) + mockP.SetIndexes(0) + return +} + +// GenerateMockStreamWithEnding generates a stream which will end itself (as in the future link will disappear). +func GenerateMockStreamWithEnding() (firstPage IStream, itemTotal int64, err error) { + randoms, err := faker.RandomInt(1, 50) + if err != nil { return } + n := randoms[0] + var future IStream + for i := 0; i < n; i++ { + currentPage, _, subErr := GenerateMockPage() + if subErr != nil { + err = subErr + return + } + currentCount, subErr := currentPage.GetItemCount() + if subErr != nil { + err = subErr + return + } + itemTotal += currentCount + + mockP := currentPage.(*MockPage) + if future == nil { + subErr = mockP.SetNext(GenerateEmptyPage()) + } else { + subErr = mockP.SetFuture(future) + } + if subErr != nil { + err = subErr + return + } + + firstPage = currentPage + future = firstPage + } + if firstPage == nil { + firstPage = GenerateEmptyPage() + } + mockP := firstPage.(*MockPage) + mockP.SetIndexes(0) + return +} + +// GenerateMockEmptyStream generates an empty stream (as in stream of pages with no element). +func GenerateMockEmptyStream() (firstPage IStream, itemTotal int64, err error) { + randoms, err := faker.RandomInt(1, 50) + if err != nil { + return + } + n := randoms[0] + var future IStream + for i := 0; i < n; i++ { + currentPage := GenerateEmptyPage() + currentCount, subErr := currentPage.GetItemCount() + if subErr != nil { + err = subErr + return + } + itemTotal += currentCount + + mockP := currentPage.(*MockPage) + if future == nil { + subErr = mockP.SetNext(GenerateEmptyPage()) + } else { + subErr = mockP.SetFuture(future) + } + if subErr != nil { + err = subErr + return + } + + firstPage = currentPage + future = firstPage + } + if firstPage == nil { + firstPage = GenerateEmptyPage() + } mockP := firstPage.(*MockPage) mockP.SetIndexes(0) return diff --git a/utils/collection/pagination/testing_test.go b/utils/collection/pagination/testing_test.go new file mode 100644 index 0000000000..3f21fbd2ef --- /dev/null +++ b/utils/collection/pagination/testing_test.go @@ -0,0 +1,124 @@ +package pagination + +import ( + "context" + "fmt" + "testing" + + "github.com/go-faker/faker/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ARM-software/golang-utils/utils/safecast" +) + +func TestGenerateEmptyPage(t *testing.T) { + page := GenerateEmptyPage() + require.NotNil(t, page) + assert.False(t, page.HasNext()) + assert.False(t, page.HasFuture()) + count, err := page.GetItemCount() + require.NoError(t, err) + assert.Equal(t, int64(0), count) +} + +func TestGenerateMockPage(t *testing.T) { + r, err := faker.RandomInt(2, 50) + require.NoError(t, err) + for i := 0; i < r[0]; i++ { + page, count, err := GenerateMockPage() + require.NoError(t, err) + t.Run(fmt.Sprintf("%d_items#%d", i, count), func(t *testing.T) { + require.NotNil(t, page) + intCount, err := page.GetItemCount() + require.NoError(t, err) + assert.Equal(t, count, intCount) + }) + } +} + +func TestGenerateMockCollection(t *testing.T) { + r, err := faker.RandomInt(2, 50) + require.NoError(t, err) + for i := 0; i < r[0]; i++ { + firstPage, count, err := GenerateMockCollection() + require.NoError(t, err) + t.Run(fmt.Sprintf("%d_items#%d", i, count), func(t *testing.T) { + require.NotNil(t, firstPage) + cCount, err := firstPage.GetItemCount() + require.NoError(t, err) + assert.True(t, cCount <= count) + size := safecast.ToInt64(cCount) + page := firstPage.(IPage) + for { + if !page.HasNext() { + break + } + page, err = page.GetNext(context.Background()) + require.NoError(t, err) + cCount, err := page.GetItemCount() + require.NoError(t, err) + size += cCount + } + assert.Equal(t, count, size) + }) + } +} + +func TestGenerateMockStream(t *testing.T) { + r, err := faker.RandomInt(2, 50) + require.NoError(t, err) + for i := 0; i < r[0]; i++ { + firstPage, count, err := GenerateMockStream() + require.NoError(t, err) + t.Run(fmt.Sprintf("%d_items#%d", i, count), func(t *testing.T) { + require.NotNil(t, firstPage) + cCount, err := firstPage.GetItemCount() + require.NoError(t, err) + assert.True(t, cCount <= count) + size := safecast.ToInt64(cCount) + page := firstPage + for { + if !page.HasNext() && !page.HasFuture() { + break + } + if page.HasNext() { + nextPage, err := page.GetNext(context.Background()) + require.NoError(t, err) + page = nextPage.(IStream) + } else { + page, err = page.GetFuture(context.Background()) + require.NoError(t, err) + } + + cCount, err := page.GetItemCount() + require.NoError(t, err) + size += cCount + } + assert.Equal(t, count, size) + }) + } +} + +func TestNewMockPageIterator(t *testing.T) { + type args struct { + page *MockPage + } + tests := []struct { + name string + args args + want IIterator + wantErr assert.ErrorAssertionFunc + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewMockPageIterator(tt.args.page) + if !tt.wantErr(t, err, fmt.Sprintf("NewMockPageIterator(%v)", tt.args.page)) { + return + } + assert.Equalf(t, tt.want, got, "NewMockPageIterator(%v)", tt.args.page) + }) + } +}