Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Pending status returns immediately from send and subscribe transaction statuses #6737

Open
wants to merge 21 commits into
base: master
Choose a base branch
from

Conversation

Guitarheroua
Copy link
Contributor

@Guitarheroua Guitarheroua commented Nov 19, 2024

Closes #6573, part of #6767

This PR implements functionality to return a Pending status immediately upon the start of a subscription.

@codecov-commenter
Copy link

codecov-commenter commented Nov 19, 2024

Codecov Report

Attention: Patch coverage is 32.77311% with 80 lines in your changes missing coverage. Please review.

Project coverage is 41.13%. Comparing base (0f4013e) to head (a162ba2).

Files with missing lines Patch % Lines
access/mock/api.go 0.00% 44 Missing ⚠️
.../access/rpc/backend/backend_stream_transactions.go 57.57% 23 Missing and 5 partials ⚠️
cmd/util/cmd/run-script/cmd.go 0.00% 7 Missing ⚠️
access/handler.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##           master    #6737   +/-   ##
=======================================
  Coverage   41.12%   41.13%           
=======================================
  Files        2107     2107           
  Lines      185335   185422   +87     
=======================================
+ Hits        76228    76278   +50     
- Misses     102706   102739   +33     
- Partials     6401     6405    +4     
Flag Coverage Δ
unittests 41.13% <32.77%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Guitarheroua Guitarheroua self-assigned this Nov 20, 2024
@Guitarheroua Guitarheroua marked this pull request as ready for review November 21, 2024 07:42
Copy link
Contributor

@peterargue peterargue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a few small comments, but otherwise looks good

cmd/util/cmd/run-script/cmd.go Outdated Show resolved Hide resolved
engine/access/rpc/backend/backend_stream_transactions.go Outdated Show resolved Hide resolved
engine/access/rpc/backend/backend_stream_transactions.go Outdated Show resolved Hide resolved
engine/access/rpc/backend/backend_stream_transactions.go Outdated Show resolved Hide resolved
Copy link
Contributor

@peterargue peterargue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a couple more comments, but otherwise looks good.

engine/access/rpc/backend/backend_stream_transactions.go Outdated Show resolved Hide resolved
engine/access/rpc/backend/backend_stream_transactions.go Outdated Show resolved Hide resolved
Copy link
Contributor

@peterargue peterargue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good other than missing irrecoverable error

@@ -96,8 +96,7 @@ func (b *backendSubscribeTransactions) SubscribeTransactionStatusesFromLatest(
) subscription.Subscription {
header, err := b.txLocalDataProvider.state.Sealed().Head()
if err != nil {
b.log.Error().Err(err).Msg("failed to retrieve latest block")
return subscription.NewFailedSubscription(err, "failed to retrieve latest block")
irrecoverable.Throw(ctx, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please wrap this with some context so it's obvious why the node crashed

@peterargue peterargue requested a review from durkmurder January 2, 2025 17:55
// - txID: The identifier of the transaction to monitor.
// - startBlockID: The block ID from which to start monitoring.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SubscribeTransactionStatusesFromStartBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
SubscribeTransactionStatusesFromBlockID(ctx context.Context, txID flow.Identifier, startBlockID flow.Identifier, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription

To me it seems that "Start" can be omitted but that is just a preference for shorter function names.

// - txID: The unique identifier of the transaction to monitor.
// - startHeight: The block height from which to start monitoring.
// - requiredEventEncodingVersion: The version of event encoding required for the subscription.
SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
SubscribeTransactionStatusesFromStartHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription
SubscribeTransactionStatusesFromHeight(ctx context.Context, txID flow.Identifier, startHeight uint64, requiredEventEncodingVersion entities.EventEncodingVersion) subscription.Subscription

Comment on lines +217 to +220
// SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block height. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block height. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
// SubscribeTransactionStatusesFromStartHeight subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block height. The subscription streams status updates until the transaction
// reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). When the transaction reaches one of
// these final states, the subscription will automatically terminate.

Comment on lines +206 to +209
// SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block ID. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block ID. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
// SubscribeTransactionStatusesFromStartBlockID subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the specified block ID. The subscription streams status updates until the transaction
// reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). When the transaction reaches one of
// these final states, the subscription will automatically terminate.

Comment on lines +228 to +231
// SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the latest block. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the latest block. The subscription streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). When the transaction reaches one of
// these final statuses, the subscription will automatically terminate.
// SubscribeTransactionStatusesFromLatest subscribes to transaction status updates for a given transaction ID.
// Monitoring begins from the latest block. The subscription streams status updates until the transaction
// reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). When the transaction reaches one of
// these final states, the subscription will automatically terminate.

Comment on lines +238 to +241
// SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates.
// Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription
// automatically terminates.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates.
// Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction
// reaches a final state (TransactionStatusSealed or TransactionStatusExpired). Once a final status is reached, the subscription
// automatically terminates.
// SendAndSubscribeTransactionStatuses sends a transaction to the execution node and subscribes to its status updates.
// Monitoring begins from the reference block saved in the transaction itself and streams status updates until the transaction
// reaches the final state ([flow.TransactionStatusSealed] or [flow.TransactionStatusExpired]). Once the final status has been reached, the subscription
// automatically terminates.

irrecoverable.Throw(ctx, err)
}
return nil, rpc.ConvertStorageError(err)
if txInfo.Status, err = b.getTransactionStatus(ctx, txInfo, prevTxStatus); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤯 I find this logic very hard to follow if you are not familiar with it. It includes a lot of state changes and mutating fields. I am wondering if we can make it easier to follow by restructuring the code path a bit.

A proposal to this:

// createSubscription initializes a subscription for monitoring a transaction's status.
// If the start height cannot be determined, a failed subscription is returned.
func (b *backendSubscribeTransactions) createSubscription(
	ctx context.Context,
	txID flow.Identifier,
	startBlockID flow.Identifier,
	startBlockHeight uint64,
	referenceBlockID flow.Identifier,
	requiredEventEncodingVersion entities.EventEncodingVersion,
	shouldTriggerPending bool,
) subscription.Subscription {
	var nextHeight uint64
	var err error

	// Get height to start subscription from
	if startBlockID == flow.ZeroID {
		if nextHeight, err = b.blockTracker.GetStartHeightFromHeight(startBlockHeight); err != nil {
			b.log.Debug().Err(err).Uint64("block_height", startBlockHeight).Msg("failed to get start height")
			return subscription.NewFailedSubscription(err, "failed to get start height")
		}
	} else {
		if nextHeight, err = b.blockTracker.GetStartHeightFromBlockID(startBlockID); err != nil {
			b.log.Debug().Err(err).Str("block_id", startBlockID.String()).Msg("failed to get start height")
			return subscription.NewFailedSubscription(err, "failed to get start height")
		}
	}

	return b.subscriptionHandler.Subscribe(ctx, nextHeight, b.getTransactionStatusResponse(txID, referenceBlockID, requiredEventEncodingVersion, shouldTriggerPending))
}

I have moved creation of metadata closer to the usage, it's easier to understand that way why it's valid. I had hard time understanding how are we mutating the txInfo field.

// getTransactionStatusResponse returns a callback function that produces transaction status
// subscription responses based on new blocks.
func (b *backendSubscribeTransactions) getTransactionStatusResponse(
	txID, referenceBlockID flow.Identifier,
	requiredEventEncodingVersion entities.EventEncodingVersion,
	shouldTriggerPending bool,
) func(context.Context, uint64) (interface{}, error) {
	txInfo := &transactionSubscriptionMetadata{
		TransactionResult: &access.TransactionResult{
			TransactionID: txID,
			BlockID:       flow.ZeroID,
		},
		txReferenceBlockID:   referenceBlockID,
		blockWithTx:          nil,
		eventEncodingVersion: requiredEventEncodingVersion,
	}
	triggerPendingOnce := atomic.NewBool(false)
	return func(ctx context.Context, height uint64) (interface{}, error) {
		err := b.checkBlockReady(height)
		if err != nil {
			return nil, err
		}

		if shouldTriggerPending && triggerPendingOnce.CompareAndSwap(false, true) {
			// The status of the first pending transaction should be returned immediately, as the transaction has already been sent.
			// This should occur only once for each subscription.
			txInfo.Status = flow.TransactionStatusPending
			return b.generateResultsWithMissingStatuses(txInfo, flow.TransactionStatusUnknown)
		}

		if txInfo.IsFinal() {
			return nil, fmt.Errorf("transaction final status %s already reported: %w", txInfo.Status.String(), subscription.ErrEndOfData)
		}

		// If on this step transaction block not available, search for it.
		if txInfo.blockWithTx == nil {
			// Search for transaction`s block information.
			txInfo.blockWithTx,
				txInfo.BlockID,
				txInfo.BlockHeight,
				txInfo.CollectionID,
				err = b.searchForTransactionBlockInfo(height, txInfo)

			if err != nil {
				if errors.Is(err, storage.ErrNotFound) {
					return nil, fmt.Errorf("could not find block %d in storage: %w", height, subscription.ErrBlockNotReady)
				}

				if !errors.Is(err, ErrTransactionNotInBlock) {
					return nil, status.Errorf(codes.Internal, "could not get block %d: %v", height, err)
				}
			}
		}

		// Get old status here, as it could be replaced by status from founded tx result
		prevTxStatus := txInfo.Status

		// Check, if transaction executed and transaction result already available
		if txInfo.blockWithTx == nil {
			txInfo.Status, err = b.txLocalDataProvider.DeriveUnknownTransactionStatus(txInfo.txReferenceBlockID)
			if err != nil {
				if !errors.Is(err, state.ErrUnknownSnapshotReference) {
					irrecoverable.Throw(ctx, err)
				}
				return nil, rpc.ConvertStorageError(err)
			}
		} else if !txInfo.IsExecuted() {
			txResult, err := b.searchForTransactionResult(ctx, txInfo)
			if err != nil {
				return nil, status.Errorf(codes.Internal, "failed to get execution result for block %s: %v", txInfo.BlockID, err)
			}

			// If transaction result was found, fully replace it in metadata. New transaction status already included in result.
			if txResult != nil {
				txInfo.TransactionResult = txResult
			}
		}

		if prevTxStatus == txInfo.Status {
			// When a block with the transaction is available, it is possible to receive a new transaction status while
			// searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction
			// statuses are the same, the current transaction status should be retrieved.
			txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockWithTx.Height, txInfo.IsExecuted())
			if err != nil {
				if !errors.Is(err, state.ErrUnknownSnapshotReference) {
					irrecoverable.Throw(ctx, err)
				}
				return nil, rpc.ConvertStorageError(err)
			}
		}

		// If the old and new transaction statuses are still the same, the status change should not be reported, so
		// return here with no response.
		if prevTxStatus == txInfo.Status {
			return nil, nil
		}

		return b.generateResultsWithMissingStatuses(txInfo, prevTxStatus)
	}
}

In this section I got rid from inner functions to avoid extra levels of search, they are quite simple and I believe it's easier to understand the logic when they are inlined. Additionally, I have removed extra fields and refactored other functions to pass only bare minimum data.

This lead to the simplifications to the structure:

// transactionSubscriptionMetadata holds data representing the status state for each transaction subscription.
type transactionSubscriptionMetadata struct {
	*access.TransactionResult
	txReferenceBlockID   flow.Identifier
	blockWithTx          *flow.Header
	eventEncodingVersion entities.EventEncodingVersion
}

To support new code path I have added helper functions:

func (r *TransactionResult) IsExecuted() bool {
	return r.Status == flow.TransactionStatusExecuted || r.Status == flow.TransactionStatusSealed
}

func (r *TransactionResult) IsFinal() bool {
	return r.Status == flow.TransactionStatusSealed || r.Status == flow.TransactionStatusExpired
}

Hopefully this proposal can make the underlying implementation simpler.
@peterargue would like to hear your thoughts on this as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Access] Pending response from send and subscribe should return immediately
4 participants