From 1be96c31d31189a66999d982dc6c7d6956411f94 Mon Sep 17 00:00:00 2001
From: Ekaterina Pavlova
Date: Wed, 30 Oct 2024 18:25:33 +0300
Subject: [PATCH] cli: adjust index file creation in `upload-bin`

In case of an incomplete search result, it will try to find each missed
OID and process it. In case of duplicates, the first one found ends up
in the index file.

Close #3647

Signed-off-by: Ekaterina Pavlova
---
 cli/util/uploader.go | 152 +++++++++++++++++++++++++++++++------------
 1 file changed, 111 insertions(+), 41 deletions(-)

diff --git a/cli/util/uploader.go b/cli/util/uploader.go
index d716d9b092..08fd724734 100644
--- a/cli/util/uploader.go
+++ b/cli/util/uploader.go
@@ -351,10 +351,13 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 	var (
 		errCh                 = make(chan error)
-		buffer                = make([]byte, indexFileSize*oidSize)
 		oidCh                 = make(chan oid.ID, indexFileSize)
 		oidFetcherToProcessor = make(chan struct{}, indexFileSize)
+		// processedIndices: position in index file -> block index.
+		processedIndices = sync.Map{}
+		buffer           = make([]byte, indexFileSize*oidSize)
+		emptyOid         = make([]byte, oidSize)
 	)
 	defer close(oidCh)
@@ -382,59 +385,64 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 				}
 				return
 			}
-			offset := (uint(blockIndex) % indexFileSize) * oidSize
-			id.Encode(buffer[offset:])
+			indexFilePosition := uint(blockIndex) % indexFileSize
+			if _, exists := processedIndices.LoadOrStore(int(indexFilePosition), blockIndex); !exists {
+				id.Encode(buffer[indexFilePosition*oidSize:])
+			}
 			oidFetcherToProcessor <- struct{}{}
 		}
 	}()
 }
 
+	// Helper to wait for all objects to be processed.
+	waitForCompletion := func(expectedCount int, errCh <-chan error, completionCh <-chan struct{}) error {
+		completed := 0
+		for completed < expectedCount {
+			select {
+			case err := <-errCh:
+				return err
+			case <-completionCh:
+				completed++
+				if completed%1000 == 0 {
+					fmt.Fprintf(ctx.App.Writer, "Processed %d objects out of %d\n", completed, expectedCount)
+				}
+			}
+		}
+		return nil
+	}
+
+	// Main processing loop for each index file.
 	for i := existingIndexCount; i < expectedIndexCount; i++ {
 		startIndex := i * indexFileSize
 		endIndex := startIndex + indexFileSize
-		go func() {
-			for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
-				remaining := int(endIndex) - j
-				end := j + min(searchBatchSize, remaining)
-
-				prm = client.PrmObjectSearch{}
-				filters = object.NewSearchFilters()
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
-				prm.SetFilters(filters)
-				var objIDs []oid.ID
-				err := retry(func() error {
-					var errSearchIndex error
-					objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
-					return errSearchIndex
-				})
-
+		objIDs, err := searchObj(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches)
+		if err != nil {
+			return err
+		}
+		for _, id := range objIDs {
+			oidCh <- id
+		}
+		if err = waitForCompletion(len(objIDs), errCh, oidFetcherToProcessor); err != nil {
+			return err
+		}
+		// Check if there are any empty OIDs in the created index file.
+		// This could happen if searchObj returned an incomplete result or
+		// there were duplicates. In this case, we need to retry the search.
+		oidNum := 0
+		for idx := range indexFileSize {
+			if _, exists := processedIndices.Load(int(idx)); !exists {
+				objIDs, err = searchObj(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1)
 				if err != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
-					default:
-					}
-					return
+					return err
 				}
-
+				oidNum += len(objIDs)
 				for _, id := range objIDs {
 					oidCh <- id
 				}
 			}
-		}()
-
-		var completed int
-	waitLoop:
-		for {
-			select {
-			case err := <-errCh:
-				return err
-			case <-oidFetcherToProcessor:
-				completed++
-				if completed == int(indexFileSize) {
-					break waitLoop
-				}
-			}
+		}
+		if err = waitForCompletion(oidNum, errCh, oidFetcherToProcessor); err != nil {
+			return err
 		}
 		// Check if there are any empty oids in the created index file.
 		// This could happen if object payload is empty ->
@@ -448,7 +456,7 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 			*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
 			*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
 		}
-		err := retry(func() error {
+		err = retry(func() error {
 			return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
 		})
 		if err != nil {
@@ -459,6 +467,68 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 	return nil
 }
 
+// searchObj searches in parallel for objects with attribute GE startIndex and LT endIndex.
+func searchObj(ctx context.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int) ([]oid.ID, error) {
+	var (
+		res   []oid.ID
+		mu    sync.Mutex
+		wg    sync.WaitGroup
+		errCh = make(chan error)
+		sem   = make(chan struct{}, maxParallelSearches)
+	)
+
+	for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case err := <-errCh:
+			return nil, err
+		case sem <- struct{}{}:
+		}
+
+		wg.Add(1)
+		go func(j int) {
+			defer wg.Done()
+			defer func() { <-sem }()
+
+			remaining := int(endIndex) - j
+			end := j + min(searchBatchSize, remaining)
+
+			prm := client.PrmObjectSearch{}
+			filters := object.NewSearchFilters()
+			filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
+			filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
+			prm.SetFilters(filters)
+
+			var objIDs []oid.ID
+			err := retry(func() error {
+				var errSearchIndex error
+				objIDs, errSearchIndex = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
+				return errSearchIndex
+			})
+			if err != nil {
+				select {
+				case errCh <- fmt.Errorf("failed to search for objects from %d to %d: %w", j, end, err):
+				default:
+				}
+				return
+			}
+
+			mu.Lock()
+			res = append(res, objIDs...)
+			mu.Unlock()
+		}(j)
+	}
+	wg.Wait()
+
+	select {
+	case err := <-errCh:
+		return nil, err
+	default:
+		return res, nil
+	}
+}
+
 // uploadObj uploads the block to the container using the pool.
 func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, HomomorphicHashingDisabled bool) error {
 	var (
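
The de-duplication in the patch hinges on sync.Map.LoadOrStore: it stores a value only when the key is absent, so when two block objects map to the same index-file position only the first OID found is encoded into the buffer, and positions that were never stored are exactly the ones to re-search. A minimal standalone sketch of that scheme (a toy indexFileSize and integer "OIDs" stand in for the real CLI values and oid.ID):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const indexFileSize = 4 // toy value; the real size comes from the CLI configuration
	var processed sync.Map
	buffer := make([]int, indexFileSize) // stands in for the real []byte OID buffer

	// Simulated search results: block 1 is duplicated, block 2 is missing.
	for _, blockIndex := range []int{0, 1, 1, 3} {
		pos := blockIndex % indexFileSize
		// LoadOrStore succeeds only for the first occurrence of a position,
		// so the first OID found wins and duplicates are ignored.
		if _, exists := processed.LoadOrStore(pos, blockIndex); !exists {
			buffer[pos] = blockIndex
		}
	}

	// Positions that never made it into the map are the "empty OIDs"
	// that trigger the per-index retry search in the patch.
	for pos := 0; pos < indexFileSize; pos++ {
		if _, ok := processed.Load(pos); !ok {
			fmt.Printf("position %d is missing, retry the search\n", pos)
		}
	}
	fmt.Println("buffer:", buffer) // [0 1 0 3]
}

LoadOrStore makes the first-found-wins choice atomic, which matters here because several fetcher goroutines write into the buffer concurrently.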
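searchObj itself is a bounded fan-out: a buffered channel acts as a semaphore capping concurrent searches at maxParallelSearches, the first failure wins via a non-blocking send on an error channel, and results are merged under a mutex. A simplified, self-contained sketch of that pattern (the search stub stands in for the retry-wrapped neofs.ObjectSearch call; the sketch buffers errCh for simplicity, and all sizes are arbitrary):

package main

import (
	"context"
	"fmt"
	"sync"
)

// search stands in for the retry-wrapped neofs.ObjectSearch call.
func search(ctx context.Context, from, to int) ([]int, error) {
	ids := make([]int, 0, to-from)
	for i := from; i < to; i++ {
		ids = append(ids, i)
	}
	return ids, nil
}

func parallelSearch(ctx context.Context, start, end, batch, maxParallel int) ([]int, error) {
	var (
		res   []int
		mu    sync.Mutex
		wg    sync.WaitGroup
		errCh = make(chan error, 1)
		sem   = make(chan struct{}, maxParallel) // semaphore bounding the fan-out
	)
	for j := start; j < end; j += batch {
		// Acquiring the semaphore here also gives cancellation and the
		// first reported error a chance to stop launching new searches.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case err := <-errCh:
			return nil, err
		case sem <- struct{}{}:
		}
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			defer func() { <-sem }()
			ids, err := search(ctx, j, min(j+batch, end))
			if err != nil {
				select {
				case errCh <- err:
				default: // another error is already pending; drop this one
				}
				return
			}
			mu.Lock()
			res = append(res, ids...)
			mu.Unlock()
		}(j)
	}
	wg.Wait()
	// A straggler may have failed after the loop stopped polling errCh.
	select {
	case err := <-errCh:
		return nil, err
	default:
		return res, nil
	}
}

func main() {
	ids, err := parallelSearch(context.Background(), 0, 100, 16, 4)
	fmt.Println(len(ids), err) // 100 <nil>
}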