forked from hashgraph/hedera-sdk-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
executable.go
151 lines (123 loc) · 3.42 KB
/
executable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package hedera
import (
"context"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"math"
"time"
"github.com/hashgraph/hedera-sdk-go/v2/proto"
"google.golang.org/grpc"
)
const maxAttempts = 10
type executionState uint32
const (
executionStateRetry executionState = 0
executionStateFinished executionState = 1
executionStateError executionState = 2
)
type method struct {
query func(
context.Context,
*proto.Query,
...grpc.CallOption,
) (*proto.Response, error)
transaction func(
context.Context,
*proto.Transaction,
...grpc.CallOption,
) (*proto.TransactionResponse, error)
}
type response struct {
query *proto.Response
transaction *proto.TransactionResponse
}
type intermediateResponse struct {
query *proto.Response
transaction TransactionResponse
}
type protoRequest struct {
query *proto.Query
transaction *proto.Transaction
}
type QueryHeader struct {
header *proto.QueryHeader
}
type request struct {
query *Query
transaction *Transaction
}
func execute(
client *Client,
request request,
shouldRetry func(request, response) executionState,
makeRequest func(request) protoRequest,
advanceRequest func(request),
getNodeAccountID func(request) AccountID,
getMethod func(request, *channel) method,
mapStatusError func(request, response) error,
mapResponse func(request, response, AccountID, protoRequest) (intermediateResponse, error),
) (intermediateResponse, error) {
maxAttempts := 10
var attempt int64
var errPersistent error
if request.query != nil {
maxAttempts = request.query.maxRetry
} else {
maxAttempts = request.transaction.maxRetry
}
for attempt = int64(0); attempt < int64(maxAttempts); attempt++ {
protoRequest := makeRequest(request)
nodeAccountID := getNodeAccountID(request)
node, ok := client.network.networkNodes[nodeAccountID]
if !ok {
return intermediateResponse{}, ErrInvalidNodeAccountIDSet{nodeAccountID}
}
node.inUse()
channel, err := node.getChannel()
if err != nil {
return intermediateResponse{}, err
}
method := getMethod(request, channel)
advanceRequest(request)
resp := response{}
if !node.isHealthy() {
node.wait()
}
if method.query != nil {
resp.query, err = method.query(context.TODO(), protoRequest.query)
} else {
resp.transaction, err = method.transaction(context.TODO(), protoRequest.transaction)
}
if err != nil {
errPersistent = err
if grpcErr, ok := status.FromError(err); ok && (grpcErr.Code() == codes.Unavailable || grpcErr.Code() == codes.ResourceExhausted) {
node.increaseDelay()
continue
}
return intermediateResponse{}, errors.Wrapf(errPersistent, "retry %d/%d", attempt, maxAttempts)
}
node.decreaseDelay()
retry := shouldRetry(request, resp)
switch retry {
case executionStateRetry:
if attempt <= int64(maxAttempts) {
delayForAttempt(attempt)
continue
} else {
errPersistent = mapStatusError(request, resp)
break
}
case executionStateError:
return intermediateResponse{}, mapStatusError(request, resp)
case executionStateFinished:
return mapResponse(request, resp, node.accountID, protoRequest)
}
}
return intermediateResponse{}, errors.Wrapf(errPersistent, "retry %d/%d", attempt, maxAttempts)
}
func delayForAttempt(attempt int64) {
// 0.1s, 0.2s, 0.4s, 0.8s, ...
ms := int64(math.Floor(50 * math.Pow(2, float64(attempt))))
time.Sleep(time.Duration(ms) * time.Millisecond)
}