Skip to content

Commit

Permalink
cli: adjust index file creation in upload-bin
Browse files Browse the repository at this point in the history
In case of incomplete search result it will try to find each missed oid
and process it. In case of duplicates the first found will be in index
file.

Close #3647

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Oct 30, 2024
1 parent 9d26ca9 commit 1be96c3
Showing 1 changed file with 111 additions and 41 deletions.
152 changes: 111 additions & 41 deletions cli/util/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,13 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun

var (
errCh = make(chan error)
buffer = make([]byte, indexFileSize*oidSize)
oidCh = make(chan oid.ID, indexFileSize)
oidFetcherToProcessor = make(chan struct{}, indexFileSize)

// processedIndices: position in index file -> block index.
processedIndices = sync.Map{}
buffer = make([]byte, indexFileSize*oidSize)

Check warning on line 360 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L357-L360

Added lines #L357 - L360 were not covered by tests
emptyOid = make([]byte, oidSize)
)
defer close(oidCh)
Expand Down Expand Up @@ -382,59 +385,64 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
}
return
}
offset := (uint(blockIndex) % indexFileSize) * oidSize
id.Encode(buffer[offset:])
indexFilePosition := uint(blockIndex) % indexFileSize
if _, exists := processedIndices.LoadOrStore(int(indexFilePosition), blockIndex); !exists {
id.Encode(buffer[indexFilePosition*oidSize:])
}

Check warning on line 391 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L388-L391

Added lines #L388 - L391 were not covered by tests
oidFetcherToProcessor <- struct{}{}
}
}()
}

// Helper to wait for all objects to be processed
waitForCompletion := func(expectedCount int, errCh <-chan error, completionCh <-chan struct{}) error {
completed := 0
for completed < expectedCount {
select {
case err := <-errCh:
return err
case <-completionCh:
completed++
if completed%1000 == 0 {
fmt.Fprintf(ctx.App.Writer, "Processed %d objects out of %d\n", completed, expectedCount)
}

Check warning on line 408 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L398-L408

Added lines #L398 - L408 were not covered by tests
}
}
return nil

Check warning on line 411 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L411

Added line #L411 was not covered by tests
}

// Main processing loop for each index file
for i := existingIndexCount; i < expectedIndexCount; i++ {
startIndex := i * indexFileSize
endIndex := startIndex + indexFileSize
go func() {
for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
remaining := int(endIndex) - j
end := j + min(searchBatchSize, remaining)

prm = client.PrmObjectSearch{}
filters = object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)
var objIDs []oid.ID
err := retry(func() error {
var errSearchIndex error
objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
})

objIDs, err := searchObj(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches)
if err != nil {
return err
}
for _, id := range objIDs {
oidCh <- id
}
if err = waitForCompletion(len(objIDs), errCh, oidFetcherToProcessor); err != nil {
return err
}

Check warning on line 427 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L418-L427

Added lines #L418 - L427 were not covered by tests
// Check if there are any empty oid in the created index file.
// This could happen if searchObj returned not all objects, or we have duplicates.
// In this case, we need to retry the search.
oidNum := 0
for idx := range indexFileSize {
if _, exists := processedIndices.Load(int(idx)); !exists {
objIDs, err = searchObj(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1)

Check warning on line 434 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L431-L434

Added lines #L431 - L434 were not covered by tests
if err != nil {
select {
case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
default:
}
return
return err

Check warning on line 436 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L436

Added line #L436 was not covered by tests
}

oidNum += len(objIDs)

Check warning on line 438 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L438

Added line #L438 was not covered by tests
for _, id := range objIDs {
oidCh <- id
}
}
}()

var completed int
waitLoop:
for {
select {
case err := <-errCh:
return err
case <-oidFetcherToProcessor:
completed++
if completed == int(indexFileSize) {
break waitLoop
}
}
}
if err = waitForCompletion(oidNum, errCh, oidFetcherToProcessor); err != nil {
return err

Check warning on line 445 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L444-L445

Added lines #L444 - L445 were not covered by tests
}
// Check if there are any empty oids in the created index file.
// This could happen if object payload is empty ->
Expand All @@ -448,7 +456,7 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
}
err := retry(func() error {
err = retry(func() error {

Check warning on line 459 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L459

Added line #L459 was not covered by tests
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
})
if err != nil {
Expand All @@ -459,6 +467,68 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
return nil
}

// searchObj searches in parallel for objects with attribute GE startIndex and LT endIndex.
func searchObj(ctx context.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int) ([]oid.ID, error) {
var (
res []oid.ID
mu sync.Mutex
wg sync.WaitGroup
errCh = make(chan error)
sem = make(chan struct{}, maxParallelSearches)
)

for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-errCh:
return nil, err
case sem <- struct{}{}:

Check warning on line 486 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L471-L486

Added lines #L471 - L486 were not covered by tests
}

wg.Add(1)
go func(j int) {
defer wg.Done()
defer func() { <-sem }()

Check warning on line 492 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L489-L492

Added lines #L489 - L492 were not covered by tests

remaining := int(endIndex) - j
end := j + min(searchBatchSize, remaining)

prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)

var objIDs []oid.ID
err := retry(func() error {
var errSearchIndex error
objIDs, errSearchIndex = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
})
if err != nil {
select {
case errCh <- fmt.Errorf("failed to search for objects from %d to %d: %w", j, end, err):
default:

Check warning on line 512 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L494-L512

Added lines #L494 - L512 were not covered by tests
}
return

Check warning on line 514 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L514

Added line #L514 was not covered by tests
}

mu.Lock()
res = append(res, objIDs...)
mu.Unlock()

Check warning on line 519 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L517-L519

Added lines #L517 - L519 were not covered by tests
}(j)
}
wg.Wait()

select {
case err := <-errCh:
return nil, err
default:
return res, nil

Check warning on line 528 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L522-L528

Added lines #L522 - L528 were not covered by tests
}
}

// uploadObj uploads the block to the container using the pool.
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, HomomorphicHashingDisabled bool) error {
var (
Expand Down

0 comments on commit 1be96c3

Please sign in to comment.