Skip to content

Commit

Permalink
v0.5.1-beta
Browse files Browse the repository at this point in the history
  • Loading branch information
prezha committed Nov 26, 2021
1 parent 8a72d59 commit 3ed166d
Show file tree
Hide file tree
Showing 9 changed files with 1,397 additions and 1 deletion.
10 changes: 9 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
*.dll
*.so
*.dylib
out/
cosmos-flooder
*.tar.gz
*.tgz

# Test binary, built with `go test -c`
*.test
Expand All @@ -12,4 +16,8 @@
*.out

# Dependency directories (remove the comment below to include it)
# vendor/
vendor/

cudos-data/
logs/
data/
149 changes: 149 additions & 0 deletions cmd/account.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
Copyright © 2021 prezha
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cmd

import (
"fmt"
"strconv"
"strings"
"sync"
"time"
)

type account struct {
m *sync.Mutex // ensure exclusive account access to control sequence in concurrent and sequential runs; use account.lock() and account.unlock()
id int
name string
address string
mnemonic string
sequence uint
}

func (a *account) lock() {
a.m.Lock()
}

func (a *account) unlock() {
a.m.Unlock()
}

// add adds amount from sponsor referencing note within 10 seconds (timeout)
func (a *account) add(amount string, sponsor *account, note string) error {
job := job{"topup", flags{sponsor, a.address, amount, fees, gas, note, chainId, "", broadcastMode, keyringBackend, "", sponsor.sequence, ""}, time.Now().Add(10 * time.Second), false}
return job.execute()
}

// load loads up to qty accounts that have prefix in their name
func load(qty int, prefix string) (map[int]*account, error) {
var out, out2 []byte
var err error
if out, _, err = run(fmt.Sprintf("%s keys list --keyring-backend %s --output json", bin, keyringBackend), keyringPassphrase); err != nil {
return nil, trimerr(err)
}
// make sure '.[]|.name,"\t",.address,"\n"' is tied together as one argument!
if out2, _, err = run(`jq -j .[]|.name,"\t",.address,"\n"`, string(out)); err != nil {
return nil, fmt.Errorf("parsing %s: %v", out, err)
}

accs := strings.Split(string(out2), "\n")

accounts := map[int]*account{}
for _, acc := range accs {
if len(acc) > 1 {
name := strings.TrimSpace(strings.Split(string(acc), string('\t'))[0])
if strings.HasPrefix(name, prefix) {
if n := strings.Split(name, prefix); len(n) > 1 {
ord, err := strconv.Atoi(n[1])
if err != nil {
return nil, fmt.Errorf("parsing index: %v", err)
}

addr := strings.TrimSpace(strings.Split(string(acc), string('\t'))[1])
seq, err := sequenceFromBC(addr, lcdNode)
if err != nil {
fmt.Printf("error getting account sequence (skipping!): %v\n", err)
continue
}

accounts[ord] = &account{&sync.Mutex{}, ord, name, addr, "", seq}
if len(accounts) == qty {
return accounts, nil
}
}
}
}
}

return accounts, nil
}

// charge creates qty new accounts each with name prefix and balance from sponsor
func charge(qty int, prefix string, balance string, sponsor *account, endpoints map[int]*account) (map[int]*account, error) {
for i := 0; i < qty; i++ {
acc, err := newAccount(prefix, i)
if err != nil {
return nil, err
}
if err := acc.add(balance, sponsor, "charging"); err != nil {
return nil, err
}
endpoints[i] = acc
// time.Sleep(100 * time.Millisecond) // prevent overloading
}
return endpoints, nil
}

// newAccount returns address of newly created account initialised with name ("prefix-index")
// in case that the account with the same name exists, if will be replaced with new one
func newAccount(prefix string, index int) (acc *account, err error) {
name := fmt.Sprintf("%s%03d", prefix, index)

var out []byte

// create new account
if out, _, err = run(fmt.Sprintf("%s keys add %s --keyring-backend %s --output json", bin, name, keyringBackend), "y"); err != nil { // TODO: add support for keyringPassphrase for non-"test" keyringBackend?
return nil, err
}
// make sure '[.address,.mnemonic]|@tsv' is tied together as one argument!
if out, _, err = run("jq -r [.address,.mnemonic]|@tsv", string(out)); err != nil {
return nil, err
}

if len(out) > 1 {
address := strings.TrimSpace(strings.Split(string(out), string('\t'))[0])
mnemonic := strings.TrimSpace(strings.Split(string(out), string('\t'))[1])
return &account{&sync.Mutex{}, index, name, address, mnemonic, 0}, nil // make sure caller can always call m.Unlock()
}

return nil, fmt.Errorf("failed to create new account: %v", err)
}

// sequenceFromBC queries bc via lcdNode for account info and returns next available sequence (nonce)
func sequenceFromBC(address string, lcdNode string) (sequence uint, err error) {
var seq int
var out, out2 []byte
if out, _, err = run(fmt.Sprintf("curl -sS http://%s/cosmos/auth/v1beta1/accounts/%s", lcdNode, address), ""); err != nil {
return 0, err
}
if out2, _, err = run("jq -r .account.sequence", string(out)); err != nil {
return 0, fmt.Errorf("parsing sequence %s: %v", out, err)
}
if seq, err = strconv.Atoi(strings.TrimSpace(string(out2))); err != nil {
return 0, fmt.Errorf("parsing sequence %s: %v", out, err)
}
return uint(seq), nil
}
122 changes: 122 additions & 0 deletions cmd/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright © 2021 prezha
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cmd

import (
"fmt"
"math/rand"
"regexp"
"strconv"
"sync"
"time"
)

type job struct {
id string
flags flags
deadline time.Time
stop bool
}

// execute will try to run job
// it will lock sender's account address during execution to reduce 'account sequence mismatch' errors rate
// likewise, it will also preemptively update account sequece value for next use
func (j *job) execute() error {
start := time.Now()

var err error
var out []byte

// handle 'Error: rpc error: code = InvalidArgument desc = account sequence mismatch, expected 157, got 156: incorrect account sequence: invalid request'
re := regexp.MustCompile(`account sequence mismatch, expected ([0-9]+),`) // extract right sequence from error

if j.flags.sender == nil && len(endpoints) > 0 {
i := rand.Intn(len(endpoints))
j.flags.sender = endpoints[i]
// time.Sleep(100 * time.Millisecond) // wait for selected sender's account to stabilise/sync
}
j.flags.sender.lock()
defer j.flags.sender.unlock()

retries := 0
j.flags.sequence = j.flags.sender.sequence // copy sender's account sequence to flags
for time.Now().Before(j.deadline) {
args := j.flags.hydrate()

cmd := fmt.Sprintf("%s %s", subcmd, args)
fmt.Printf("%s job started: %s\n", j.id, cmd)
fmt.Printf("%s sender: %+v\n", j.id, j.flags.sender)
out, _, err = run(cmd, keyringPassphrase)
seq := int(j.flags.sender.sequence)
var err2 error
if err != nil {
if r := re.FindStringSubmatch(trimerr(err).Error()); len(r) > 0 {
if seq, err2 = strconv.Atoi(r[1]); err2 == nil {
fmt.Printf("%s job errored (will retry): %v\n", j.id, trimerr(err))
retries++
j.flags.sequence = uint(seq) // update job's flags sequence
j.flags.sender.sequence = uint(seq) // update sender's account sequence
continue
}
}
return fmt.Errorf("%s job failed (unretryable) after %s: %v", j.id, time.Since(start), trimerr(err))
}
fmt.Printf("%s job completed in %s (retries: %d): %s\n", j.id, time.Since(start), retries, out)
j.flags.sender.sequence = uint(seq) + 1 // automatically set sender's account sequence on successful job completion for next run
return nil
}
return fmt.Errorf("%s job timed out after %s (retries: %d): %v", j.id, time.Since(start), retries, trimerr(err))
}

func spawn(flags flags) {
// delay := time.Duration(b.duration.Nanoseconds() / int64(b.qty))

deadline := time.Now().Add(duration)

if requests < workers { // don't spawn more workers than jobs needed
workers = requests
}
jobs := make(chan job, workers)

var wg sync.WaitGroup
// spawn all workers
for i := 0; i < workers; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()

for j := range jobs {
if j.stop {
close(jobs)
return
}
j.id = fmt.Sprintf("%s.%s", j.id, strconv.Itoa(i)) // amend job id with worker id
if err := j.execute(); err != nil {
fmt.Printf("%v\n", err)
}
}
}(i)
}
// send all jobs to channel, signal the end and wait for completion
for i := 0; i < requests; i++ {
id := fmt.Sprintf("%s-%s", testId, strconv.Itoa(i))
flags.note = id
jobs <- job{id, flags, deadline, false}
}
jobs <- job{stop: true} // signal end of jobs queue for channel to close and stop workers
wg.Wait()
}
Loading

0 comments on commit 3ed166d

Please sign in to comment.