-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathtransport.py
161 lines (126 loc) · 4.43 KB
/
transport.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from utils import get_hostname
class Transport(object):
    """
    Base class for the network transport of data for a CasperFpga object.

    Methods that raise NotImplementedError here have no generic
    implementation and must be provided by a child class. Methods whose
    body is a plain `pass` are optional hooks that do nothing by default.
    """

    def __init__(self, **kwargs):
        """
        Initialise the Transport object.

        :param kwargs: keyword arguments passed unchanged to get_hostname,
            which supplies the host and bitstream stored on this object
        """
        self.host, self.bitstream = get_hostname(**kwargs)
        # Not known at construction time; listdev() reports from this
        # mapping once something else has filled it in.
        self.memory_devices = None
        # Programming bookkeeping; every field starts out empty.
        self.prog_info = {'last_uploaded': '', 'last_programmed': '',
                          'system_name': ''}

    def connect(self, timeout=None):
        """
        Connect to the platform. Does nothing in the base class.

        :param timeout: unused in the base class
        :return: None
        """
        pass

    def is_running(self):
        """
        Is the FPGA programmed and running?
        - Implemented in the child

        :return: True or False
        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def is_connected(self):
        """
        Is the transport layer connected to the platform?
        - Implemented in the child

        :return: True or False
        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def test_connection(self):
        """
        Test the connection to the FPGA - i.e. is the casper FPGA connected?

        The base class does not touch the scratchpad itself; it simply
        returns whatever is_connected() reports, so it raises
        NotImplementedError unless the child implements is_connected().

        :return: Boolean - True/False - Success/Fail
        """
        return self.is_connected()

    def ping(self):
        """
        Use the 'watchdog' request to ping the FPGA host.
        - Implemented in the child

        :return: True or False
        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def disconnect(self):
        """
        Disconnect from the platform. Does nothing in the base class.

        :return: None
        """
        pass

    def read(self, device_name, size, offset=0):
        """
        Read `size` bytes from register `device_name`.
        Start reading from `offset` bytes from `device_name`'s base address.
        Return the read data as a big-endian binary string.
        - Implemented in the child

        :param device_name: Name of device to be read
        :type device_name: String
        :param size: Number of bytes to read
        :type size: Integer
        :param offset: Offset from which to begin read, in bytes
        :type offset: Integer
        :return: Big-endian binary string
        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def blindwrite(self, device_name, data, offset=0):
        """
        Write binary data to `device_name`, starting at `offset` bytes from
        `device_name`'s base address.
        - Implemented in the child

        :param device_name: Name of device to be written
        :type device_name: String
        :param data: Data to write
        :type data: Big-endian binary string
        :param offset: Offset from which to begin write, in bytes
        :type offset: Integer
        :return: None
        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def listdev(self):
        """
        Get a list of the memory bus items in this design.

        :return: a list of memory device names; empty if memory_devices
            has not been populated yet
        """
        # memory_devices is None until it is filled in after construction;
        # report "no devices" instead of failing with AttributeError.
        if self.memory_devices is None:
            return []
        # Materialise the keys so the caller always receives a real list,
        # as documented, rather than a dict view.
        return list(self.memory_devices.keys())

    def deprogram(self):
        """
        Deprogram the FPGA connected by this transport
        - Implemented in the child

        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def set_igmp_version(self, version):
        """
        Set the IGMP version. Does nothing in the base class.

        :param version: unused in the base class
        :return: None
        """
        pass

    def upload_to_ram_and_program(self, filename, port=-1, timeout=10,
                                  wait_complete=True, skip_verification=False):
        """
        Upload an FPG file to RAM and then program the FPGA.
        - Implemented in the child

        :param filename: the file to upload
        :param port: the port to use on the rx end, -1 means a random port
        :param timeout: how long to wait, seconds
        :param wait_complete: wait for the transaction to complete, return
            after upload if False
        :param skip_verification: don't verify the image after uploading it
        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def upload_to_flash(self, binary_file, port=-1, force_upload=False,
                        timeout=30, wait_complete=True):
        """
        Upload the provided binary file to the flash filesystem.
        - Implemented in the child

        :param binary_file: filename of the binary file to upload
        :param port: host-side port, -1 means a random port will be used
        :param force_upload: upload the binary even if it already exists
            on the host
        :param timeout: upload timeout, in seconds
        :param wait_complete: wait for the upload to complete, or just
            kick it off
        :raises NotImplementedError: always, in the base class
        """
        raise NotImplementedError

    def get_system_information_from_transport(self):
        """
        Report what this transport knows about the running system.

        :return: a tuple of (self.bitstream, None); the base class has
            nothing to offer for the second element
        """
        # NOTE(review): the second element is presumably filled in by
        # child classes that can supply more than a bitstream - confirm
        # against the callers.
        return self.bitstream, None

    def post_get_system_information(self):
        """
        Cleanup run after get_system_information.
        Does nothing in the base class.

        :return: None
        """
        pass
# end