-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #221 from LedgerHQ/fbe/swap_ton
Add tests for TON swap
- Loading branch information
Showing
372 changed files
with
1,370 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import sys | ||
import base64 | ||
from pathlib import Path | ||
from fastcrc import crc16 | ||
from enum import IntEnum | ||
|
||
from ragger.backend.interface import BackendInterface, RAPDU | ||
from ragger.bip import pack_derivation_path | ||
from ragger.utils import create_currency_config | ||
from ragger.error import ExceptionRAPDU | ||
|
||
TON_CONF = create_currency_config("TON", "TON") | ||
|
||
TON_DERIVATION_PATH = "m/44'/607'/0'/0'/0'/0'" | ||
TON_PACKED_DERIVATION_PATH = pack_derivation_path(TON_DERIVATION_PATH) | ||
DEVICE_PUBLIC_KEY = bytes.fromhex("9aff66dcc01b79787f6038c8070b8f7b9f27e381297c846a59f743bb075ed61c") | ||
|
||
SW_SWAP_FAILURE = 0xB009 | ||
|
||
MAX_APDU_LEN: int = 255 | ||
CLA = 0xE0 | ||
|
||
class Bounceability(IntEnum): | ||
BOUNCEABLE = 0x11 | ||
NON_BOUNCEABLE = 0x51 | ||
|
||
class WorkchainID(IntEnum): | ||
BASE_CHAIN = 0x00 | ||
MASTER_CHAIN = 0xff | ||
|
||
def craft_address(bounceability: Bounceability, workchain_id: WorkchainID, device_public_key: bytes): | ||
payload = b'' | ||
payload += int.to_bytes(bounceability, length=1, byteorder='big') | ||
payload += int.to_bytes(workchain_id, length=1, byteorder='big') | ||
payload += device_public_key | ||
payload += int.to_bytes(crc16.xmodem(payload), length=2, byteorder='big') | ||
return base64.b64encode(payload) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from tonsdk.boc import Builder | ||
|
||
|
||
class MyBuilder(Builder): | ||
def store_maybe_ref(self, src): | ||
if src is not None: | ||
self.store_bit(1) | ||
self.store_ref(src) | ||
else: | ||
self.store_bit(0) | ||
|
||
return self | ||
|
||
def store_string_tail(self, bs: bytes): | ||
store_max = self.bits.get_free_bits() // 8 | ||
self.store_bytes(bs[:store_max]) | ||
if len(bs) > store_max: | ||
inner = begin_cell().store_string_tail(bs[store_max:]).end_cell() | ||
self.store_ref(inner) | ||
|
||
return self | ||
|
||
|
||
def begin_cell(): | ||
return MyBuilder() |
211 changes: 211 additions & 0 deletions
211
test/python/apps/ton_application_client/ton_command_sender.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
from enum import IntEnum, IntFlag | ||
from typing import Generator, Optional | ||
from contextlib import contextmanager | ||
|
||
from ragger.backend.interface import BackendInterface, RAPDU | ||
from ragger.bip import pack_derivation_path | ||
|
||
from .ton_utils import split_message | ||
|
||
|
||
MAX_APDU_LEN: int = 255 | ||
|
||
CLA: int = 0xE0 | ||
|
||
class P1(IntEnum): | ||
P1_NONE = 0x00 | ||
|
||
P1_CONFIRM = 0x01 | ||
|
||
P1_NON_CONFIRM = 0x00 | ||
|
||
class P2(IntFlag): | ||
P2_NONE = 0x00 | ||
|
||
P2_FIRST = 0x01 | ||
|
||
P2_MORE = 0x02 | ||
|
||
class InsType(IntEnum): | ||
GET_VERSION = 0x03 | ||
GET_APP_NAME = 0x04 | ||
GET_PUBLIC_KEY = 0x05 | ||
SIGN_TX = 0x06 | ||
GET_ADDRESS_PROOF = 0x08 | ||
SIGN_DATA = 0x09 | ||
GET_APP_SETTINGS = 0x0A | ||
|
||
class Errors(IntEnum): | ||
SW_DENY = 0x6985 | ||
SW_WRONG_P1P2 = 0x6A86 | ||
SW_WRONG_DATA_LENGTH = 0x6A87 | ||
SW_INS_NOT_SUPPORTED = 0x6D00 | ||
SW_CLA_NOT_SUPPORTED = 0x6E00 | ||
SW_WRONG_RESPONSE_LENGTH = 0xB000 | ||
SW_DISPLAY_ADDRESS_FAIL = 0xB002 | ||
SW_DISPLAY_AMOUNT_FAIL = 0xB003 | ||
SW_WRONG_TX_LENGTH = 0xB004 | ||
SW_TX_PARSING_FAIL = 0xB010 | ||
SW_WRONG_SIGN_DATA_LENGTH = 0xB005 | ||
SW_SIGN_DATA_PARSING_FAIL = 0xB011 | ||
SW_BAD_STATE = 0xB007 | ||
SW_SIGNATURE_FAIL = 0xB008 | ||
SW_REQUEST_TOO_LONG = 0xB00B | ||
SW_BAD_BIP32_PATH = 0XB0BD | ||
SW_BLIND_SIGNING_DISABLED = 0xBD00 | ||
|
||
class AddressDisplayFlags(IntFlag): | ||
NONE = 0 | ||
TESTNET = 1 | ||
MASTERCHAIN = 2 | ||
|
||
|
||
class BoilerplateCommandSender: | ||
def __init__(self, backend: BackendInterface) -> None: | ||
self.backend = backend | ||
|
||
|
||
def get_app_and_version(self) -> RAPDU: | ||
return self.backend.exchange(cla=0xB0, # specific CLA for BOLOS | ||
ins=0x01, # specific INS for get_app_and_version | ||
p1=P1.P1_NONE, | ||
p2=P2.P2_NONE, | ||
data=b"") | ||
|
||
|
||
def get_version(self) -> RAPDU: | ||
return self.backend.exchange(cla=CLA, | ||
ins=InsType.GET_VERSION, | ||
p1=P1.P1_NONE, | ||
p2=P2.P2_NONE, | ||
data=b"") | ||
|
||
|
||
def get_app_name(self) -> RAPDU: | ||
return self.backend.exchange(cla=CLA, | ||
ins=InsType.GET_APP_NAME, | ||
p1=P1.P1_NONE, | ||
p2=P2.P2_NONE, | ||
data=b"") | ||
|
||
|
||
def get_public_key(self, path: str) -> RAPDU: | ||
return self.backend.exchange(cla=CLA, | ||
ins=InsType.GET_PUBLIC_KEY, | ||
p1=P1.P1_NON_CONFIRM, | ||
p2=AddressDisplayFlags.NONE, | ||
data=pack_derivation_path(path)) | ||
|
||
|
||
def get_app_settings(self) -> RAPDU: | ||
return self.backend.exchange(cla=CLA, | ||
ins=InsType.GET_APP_SETTINGS, | ||
p1=P1.P1_NONE, | ||
p2=P2.P2_NONE, | ||
data=b"") | ||
|
||
|
||
@contextmanager | ||
def get_public_key_with_confirmation(self, | ||
path: str, | ||
display_flags: AddressDisplayFlags, | ||
is_v3r2: bool = False, | ||
subwallet_id: int = 698983191, | ||
) -> Generator[None, None, None]: | ||
specifiers = bytes([]) | ||
if is_v3r2 or subwallet_id != 698983191: | ||
display_flags |= 4 | ||
specifiers = b"".join([ | ||
bytes([1 if is_v3r2 else 0]), | ||
subwallet_id.to_bytes(4, byteorder="big"), | ||
]) | ||
with self.backend.exchange_async(cla=CLA, | ||
ins=InsType.GET_PUBLIC_KEY, | ||
p1=P1.P1_CONFIRM, | ||
p2=display_flags, | ||
data=b"".join([ | ||
pack_derivation_path(path), | ||
specifiers, | ||
])) as response: | ||
yield response | ||
|
||
@contextmanager | ||
def get_address_proof(self, | ||
path: str, | ||
display_flags: AddressDisplayFlags, | ||
domain: str, | ||
timestamp: int, | ||
payload: bytes, | ||
is_v3r2: bool = False, | ||
subwallet_id: int = 698983191) -> Generator[None, None, None]: | ||
domain_b = bytes(domain, "utf8") | ||
specifiers = bytes([]) | ||
if is_v3r2 or subwallet_id != 698983191: | ||
display_flags |= 4 | ||
specifiers = b"".join([ | ||
bytes([1 if is_v3r2 else 0]), | ||
subwallet_id.to_bytes(4, byteorder="big"), | ||
]) | ||
req_bytes = b"".join([ | ||
pack_derivation_path(path), | ||
specifiers, | ||
bytes([len(domain_b)]), | ||
domain_b, | ||
timestamp.to_bytes(8, byteorder="big"), | ||
payload | ||
]) | ||
with self.backend.exchange_async(cla=CLA, | ||
ins=InsType.GET_ADDRESS_PROOF, | ||
p1=P1.P1_CONFIRM, | ||
p2=display_flags, | ||
data=req_bytes) as response: | ||
yield response | ||
|
||
@contextmanager | ||
def sign_tx(self, path: str, transaction: bytes) -> Generator[None, None, None]: | ||
self.backend.exchange(cla=CLA, | ||
ins=InsType.SIGN_TX, | ||
p1=P1.P1_NONE, | ||
p2=(P2.P2_FIRST | P2.P2_MORE), | ||
data=pack_derivation_path(path)) | ||
messages = split_message(transaction, MAX_APDU_LEN) | ||
|
||
for msg in messages[:-1]: | ||
self.backend.exchange(cla=CLA, | ||
ins=InsType.SIGN_TX, | ||
p1=P1.P1_NONE, | ||
p2=P2.P2_MORE, | ||
data=msg) | ||
|
||
with self.backend.exchange_async(cla=CLA, | ||
ins=InsType.SIGN_TX, | ||
p1=P1.P1_NONE, | ||
p2=P2.P2_NONE, | ||
data=messages[-1]) as response: | ||
yield response | ||
|
||
@contextmanager | ||
def sign_data(self, path: str, data: bytes) -> Generator[None, None, None]: | ||
self.backend.exchange(cla=CLA, | ||
ins=InsType.SIGN_DATA, | ||
p1=P1.P1_NONE, | ||
p2=(P2.P2_FIRST | P2.P2_MORE), | ||
data=pack_derivation_path(path)) | ||
messages = split_message(data, MAX_APDU_LEN) | ||
|
||
for msg in messages[:-1]: | ||
self.backend.exchange(cla=CLA, | ||
ins=InsType.SIGN_DATA, | ||
p1=P1.P1_NONE, | ||
p2=P2.P2_MORE, | ||
data=msg) | ||
|
||
with self.backend.exchange_async(cla=CLA, | ||
ins=InsType.SIGN_DATA, | ||
p1=P1.P1_NONE, | ||
p2=P2.P2_NONE, | ||
data=messages[-1]) as response: | ||
yield response | ||
|
||
def get_async_response(self) -> Optional[RAPDU]: | ||
return self.backend.last_async_response |
76 changes: 76 additions & 0 deletions
76
test/python/apps/ton_application_client/ton_response_unpacker.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
from typing import Tuple | ||
from struct import unpack | ||
|
||
# remainder, data_len, data | ||
def pop_sized_buf_from_buffer(buffer:bytes, size:int) -> Tuple[bytes, bytes]: | ||
return buffer[size:], buffer[0:size] | ||
|
||
# remainder, data_len, data | ||
def pop_size_prefixed_buf_from_buf(buffer:bytes) -> Tuple[bytes, int, bytes]: | ||
data_len = buffer[0] | ||
return buffer[1+data_len:], data_len, buffer[1:data_len+1] | ||
|
||
# Unpack from response: | ||
# response = app_name (var) | ||
def unpack_get_app_name_response(response: bytes) -> str: | ||
return response.decode("ascii") | ||
|
||
# Unpack from response: | ||
# response = MAJOR (1) | ||
# MINOR (1) | ||
# PATCH (1) | ||
def unpack_get_version_response(response: bytes) -> Tuple[int, int, int]: | ||
assert len(response) == 3 | ||
major, minor, patch = unpack("BBB", response) | ||
return (major, minor, patch) | ||
|
||
# Unpack from response: | ||
# response = format_id (1) | ||
# app_name_raw_len (1) | ||
# app_name_raw (var) | ||
# version_raw_len (1) | ||
# version_raw (var) | ||
# unused_len (1) | ||
# unused (var) | ||
def unpack_get_app_and_version_response(response: bytes) -> Tuple[str, str]: | ||
response, _ = pop_sized_buf_from_buffer(response, 1) | ||
response, _, app_name_raw = pop_size_prefixed_buf_from_buf(response) | ||
response, _, version_raw = pop_size_prefixed_buf_from_buf(response) | ||
response, _, _ = pop_size_prefixed_buf_from_buf(response) | ||
|
||
assert len(response) == 0 | ||
|
||
return app_name_raw.decode("ascii"), version_raw.decode("ascii") | ||
|
||
def unpack_sign_tx_response(response: bytes) -> Tuple[bytes, bytes]: | ||
response, sig_len, sig = pop_size_prefixed_buf_from_buf(response) | ||
response, hash_len, hash_b = pop_size_prefixed_buf_from_buf(response) | ||
|
||
assert sig_len == len(sig) | ||
assert hash_len == len(hash_b) | ||
|
||
assert len(response) == 0 | ||
|
||
return sig, hash_b | ||
|
||
def unpack_sign_data_response(response: bytes) -> Tuple[bytes, bytes]: | ||
response, sig_len, sig = pop_size_prefixed_buf_from_buf(response) | ||
response, hash_len, hash_b = pop_size_prefixed_buf_from_buf(response) | ||
|
||
assert sig_len == len(sig) | ||
assert hash_len == len(hash_b) | ||
|
||
assert len(response) == 0 | ||
|
||
return sig, hash_b | ||
|
||
def unpack_proof_response(response: bytes) -> Tuple[bytes, bytes]: | ||
response, sig_len, sig = pop_size_prefixed_buf_from_buf(response) | ||
response, hash_len, hash_b = pop_size_prefixed_buf_from_buf(response) | ||
|
||
assert sig_len == len(sig) | ||
assert hash_len == len(hash_b) | ||
|
||
assert len(response) == 0 | ||
|
||
return sig, hash_b |
Oops, something went wrong.