forked from digibyte/digibyte
-
Notifications
You must be signed in to change notification settings - Fork 65
/
Copy pathp2p_invalid_block.py
executable file
·156 lines (125 loc) · 6.32 KB
/
p2p_invalid_block.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python3
# Copyright (c) 2021-2022 The DigiByte Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test node responses to invalid blocks.
In this test we connect to one node over p2p, and test block requests:
1) Valid blocks should be requested and become chain tip.
2) Invalid block with duplicated transaction should be re-requested.
3) Invalid block with bad coinbase value should be rejected and not
re-requested.
4) Invalid block due to future timestamp is later accepted when that timestamp
becomes valid.
"""
import copy
import time
from test_framework.blocktools import create_block, create_coinbase, create_tx_with_script
from test_framework.messages import COIN
from test_framework.p2p import P2PDataStore
from test_framework.test_framework import DigiByteTestFramework
from test_framework.util import assert_equal
MAX_FUTURE_BLOCK_TIME = 2 * 60 * 60
class InvalidBlockRequestTest(DigiByteTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.setup_clean_chain = True
self.extra_args = [["[email protected]"]]
def run_test(self):
# Add p2p connection to node0
node = self.nodes[0] # convenience reference to the node
peer = node.add_p2p_connection(P2PDataStore())
best_block = node.getblock(node.getbestblockhash())
tip = int(node.getbestblockhash(), 16)
height = best_block["height"] + 1
block_time = best_block["time"] + 1
self.log.info("Create a new block with an anyone-can-spend coinbase")
height = 1
block = create_block(tip, create_coinbase(height), block_time)
block.solve()
# Save the coinbase for later
block1 = block
tip = block.sha256
peer.send_blocks_and_test([block1], node, success=True)
self.log.info("Mature the block.")
self.generatetoaddress(node, 100, node.get_deterministic_priv_key().address)
best_block = node.getblock(node.getbestblockhash())
tip = int(node.getbestblockhash(), 16)
height = best_block["height"] + 1
block_time = best_block["time"] + 1
# Use merkle-root malleability to generate an invalid block with
# same blockheader (CVE-2012-2459).
# Manufacture a block with 3 transactions (coinbase, spend of prior
# coinbase, spend of that spend). Duplicate the 3rd transaction to
# leave merkle root and blockheader unchanged but invalidate the block.
# For more information on merkle-root malleability see src/consensus/merkle.cpp.
self.log.info("Test merkle root malleability.")
block2 = create_block(tip, create_coinbase(height), block_time)
block_time += 1
# b'0x51' is OP_TRUE
tx1 = create_tx_with_script(block1.vtx[0], 0, script_sig=b'\x51', amount=50 * COIN)
tx2 = create_tx_with_script(tx1, 0, script_sig=b'\x51', amount=50 * COIN)
block2.vtx.extend([tx1, tx2])
block2.hashMerkleRoot = block2.calc_merkle_root()
block2.rehash()
block2.solve()
orig_hash = block2.sha256
block2_orig = copy.deepcopy(block2)
# Mutate block 2
block2.vtx.append(tx2)
assert_equal(block2.hashMerkleRoot, block2.calc_merkle_root())
assert_equal(orig_hash, block2.rehash())
assert block2_orig.vtx != block2.vtx
peer.send_blocks_and_test([block2], node, success=False, reject_reason='bad-txns-duplicate')
# Check transactions for duplicate inputs (CVE-2018-17144)
self.log.info("Test duplicate input block.")
block2_dup = copy.deepcopy(block2_orig)
block2_dup.vtx[2].vin.append(block2_dup.vtx[2].vin[0])
block2_dup.vtx[2].rehash()
block2_dup.hashMerkleRoot = block2_dup.calc_merkle_root()
block2_dup.rehash()
block2_dup.solve()
peer.send_blocks_and_test([block2_dup], node, success=False, reject_reason='bad-txns-inputs-duplicate')
self.log.info("Test very broken block.")
block3 = create_block(tip, create_coinbase(height), block_time)
block_time += 1
block3.vtx[0].vout[0].nValue = 73000 * COIN # Too high!
block3.vtx[0].sha256 = None
block3.vtx[0].calc_sha256()
block3.hashMerkleRoot = block3.calc_merkle_root()
block3.rehash()
block3.solve()
peer.send_blocks_and_test([block3], node, success=False, reject_reason='bad-cb-amount')
# Complete testing of CVE-2012-2459 by sending the original block.
# It should be accepted even though it has the same hash as the mutated one.
self.log.info("Test accepting original block after rejecting its mutated version.")
peer.send_blocks_and_test([block2_orig], node, success=True, timeout=5)
# Update tip info
height += 1
block_time += 1
tip = int(block2_orig.hash, 16)
# Complete testing of CVE-2018-17144, by checking for the inflation bug.
# Create a block that spends the output of a tx in a previous block.
block4 = create_block(tip, create_coinbase(height), block_time)
tx3 = create_tx_with_script(tx2, 0, script_sig=b'\x51', amount=50 * COIN)
# Duplicates input
tx3.vin.append(tx3.vin[0])
tx3.rehash()
block4.vtx.append(tx3)
block4.hashMerkleRoot = block4.calc_merkle_root()
block4.rehash()
block4.solve()
self.log.info("Test inflation by duplicating input")
peer.send_blocks_and_test([block4], node, success=False, reject_reason='bad-txns-inputs-duplicate')
self.log.info("Test accepting identical block after rejecting it due to a future timestamp.")
t = int(time.time())
node.setmocktime(t)
# Set block time +1 second past max future validity
block = create_block(tip, create_coinbase(height), t + MAX_FUTURE_BLOCK_TIME + 1)
block.hashMerkleRoot = block.calc_merkle_root()
block.solve()
# Need force_send because the block will get rejected without a getdata otherwise
peer.send_blocks_and_test([block], node, force_send=True, success=False, reject_reason='time-too-new')
node.setmocktime(t + 1)
peer.send_blocks_and_test([block], node, success=True)
if __name__ == '__main__':
InvalidBlockRequestTest().main()