-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathsnap.py
372 lines (348 loc) · 16.2 KB
/
snap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
from __future__ import print_function
import logging
import time
from memory import Memory
import bitfield
from register import Register
# Module-level logger, named after this module; used by Snap for debug output.
LOGGER = logging.getLogger(__name__)
class Snap(Memory):
    """
    Snap blocks are triggered/controlled blocks of RAM on FPGAs.

    A Snap wraps the capture BRAM (``<name>_bram``) together with the
    control registers that are looked up by name on the parent:
    ``<name>_ctrl``, ``<name>_status``, ``<name>_trig_offset``,
    ``<name>_val`` and ``<name>_tr_en_cnt``.
    """

    def __init__(self, parent, name, width_bits, address, length_bytes,
                 device_info=None):
        """
        Create a snapshot block with a single default field called 'data'.

        :param parent: the object that owns this snap; its
            ``memory_devices`` mapping and ``read`` method are used later
            to link control registers and to read the BRAM
        :param name: the unique name of this snap block
        :param width_bits: width of one captured word, in bits
        :param address: address handed on to Memory
        :param length_bytes: length of the snap memory, in bytes
        :param device_info: dictionary of information about this device
        """
        super(Snap, self).__init__(name=name, width_bits=width_bits,
                                   address=address, length_bytes=length_bytes)
        self.parent = parent
        self.block_info = device_info
        # until a bitsnap redefines the layout, the whole word is one field
        self.field_add(bitfield.Field(name='data', numtype=0,
                                      width_bits=self.width_bits,
                                      binary_pt=0, lsb_offset=0))
        # 'register' entries are filled in by _link_control_registers()
        self.control_registers = {
            'control': {'register': None, 'name': self.name + '_ctrl'},
            'status': {'register': None, 'name': self.name + '_status'},
            'trig_offset': {'register': None,
                            'name': self.name + '_trig_offset'},
            'extra_value': {'register': None, 'name': self.name + '_val'},
            'tr_en_cnt': {'register': None,
                          'name': self.name + '_tr_en_cnt'}}
        # lazy %-args: the message is only built if debug logging is on
        LOGGER.debug('New Snap %s', self)

    @classmethod
    def from_device_info(cls, parent, device_name, device_info,
                         memorymap_dict, **kwargs):
        """
        Process device info and the memory map to get all necessary
        info and return a Snap instance.

        :param parent: the Casperfpga on which this snap is found
        :param device_name: the unique device name
        :param device_info: information about this device; must contain
            'data_width' and 'nsamples'
        :param memorymap_dict: a dictionary containing the device memory map
        :param kwargs: accepted for call compatibility, not used
        :return: a Snap object
        :raises RuntimeError: if the length from the memory map does not
            match the length computed from the device info
        """
        address, length_bytes = -1, -1
        bram_name = device_name + '_bram'
        if bram_name in memorymap_dict:
            address = memorymap_dict[bram_name]['address']
            length_bytes = memorymap_dict[bram_name]['bytes']
        word_bits = int(device_info['data_width'])
        # floor division keeps the byte count an integer on Python 3 as well
        num_bytes = pow(2, int(device_info['nsamples'])) * (word_bits // 8)
        if length_bytes == -1:
            # no memory map entry: fall back to the computed length
            length_bytes = num_bytes
        if length_bytes != num_bytes:
            raise RuntimeError('%s has mask length_bytes %d bytes, '
                               'but mem map length_bytes %d bytes' %
                               (device_name, num_bytes, length_bytes))
        return cls(parent, device_name, width_bits=word_bits, address=address,
                   length_bytes=length_bytes, device_info=device_info)

    def post_create_update(self, raw_system_info):
        """
        Update the device with information not available at creation.

        :param raw_system_info: dictionary of device information, keyed by
            device name
        """
        # is this snap block inside a bitsnap block?
        for dev_name, dev_info in raw_system_info.items():
            if dev_name == '':
                continue
            if (dev_info['tag'] == 'casper:bitsnap' and
                    self.name == dev_name + '_ss'):
                self.update_from_bitsnap(dev_info)
                break
        # find control registers for this snap block
        self._link_control_registers(raw_system_info)

    def update_from_bitsnap(self, info):
        """
        Update this device with information from a bitsnap container.

        Replaces the current fields with the ones described by the
        bitsnap's 'io_names', 'io_widths', 'io_types' and 'io_bps'.

        :type self: Snap
        :param info: device information dictionary containing Simulink block
            information
        :raises ValueError: if the bitsnap width or length does not match
            this snap
        :raises RuntimeError: if a per-field list is neither of length one
            nor as long as the list of names
        """
        clean_fields = bitfield.clean_fields
        self.block_info = info
        if self.width_bits != int(info['snap_data_width']):
            raise ValueError('Snap and matched bitsnap widths do not match.')
        # floor division keeps the byte count an integer on Python 3 as well
        samples_bytes = (pow(2, int(info['snap_nsamples'])) *
                         (self.width_bits // 8))
        if self.length_bytes != samples_bytes:
            raise ValueError('Snap and matched bitsnap lengths do not match.')
        fields = {'names': clean_fields(self.name, 'snapshot',
                                        info['io_names']),
                  'widths': clean_fields(self.name, 'snapshot',
                                         info['io_widths']),
                  'types': clean_fields(self.name, 'snapshot',
                                        info['io_types']),
                  'bps': clean_fields(self.name, 'snapshot',
                                      info['io_bps'])}
        for fld in ['names', 'widths', 'types', 'bps']:
            fields[fld].reverse()
        self.fields_clear()
        len_names = len(fields['names'])
        for fld in ['widths', 'types', 'bps']:
            # convert the number-based fields to integers
            for n, val in enumerate(fields[fld]):
                try:
                    intvalue = int(val)
                except ValueError:
                    # NOTE(review): eval() runs arbitrary code taken from
                    # the design info; kept because the values may be
                    # expressions, but only safe for trusted design files.
                    intvalue = eval(val)
                fields[fld][n] = intvalue
            # accommodate new snapshots where the fields may have length one
            len_fld = len(fields[fld])
            if len_fld != len_names:
                if len_fld != 1:
                    raise RuntimeError('%i names, but %i %s?' % (
                        len_names, len_fld, fld))
                fields[fld] = [fields[fld][0]] * len_names
        # construct the fields and add them to this BitField
        for ctr, name in enumerate(fields['names']):
            field = bitfield.Field(name=name,
                                   numtype=fields['types'][ctr],
                                   width_bits=fields['widths'][ctr],
                                   binary_pt=fields['bps'][ctr],
                                   lsb_offset=-1)
            self.field_add(field, auto_offset=True)

    def _link_control_registers(self, raw_device_info):
        """
        Link available registers to this snapshot block's control registers.

        :param raw_device_info: Information about the device in raw form
        :raises RuntimeError: if the control or status register is missing,
            or if an extra value register is specified but not found
        """
        for controlreg in self.control_registers.values():
            try:
                reg = self.parent.memory_devices[controlreg['name']]
            except KeyError:
                # optional registers may simply not exist in the design
                continue
            assert isinstance(reg, Register)
            controlreg['register'] = reg
        # set up the control register fields
        if (self.control_registers['control']['register'] is None) or \
                (self.control_registers['status']['register'] is None):
            raise RuntimeError('Critical control registers for snap %s '
                               'missing.' % self.name)
        # older block info uses 'value' where newer uses 'snap_value'
        if 'value' in self.block_info:
            self.block_info['snap_value'] = self.block_info['value']
        if self.block_info['snap_value'] == 'on':
            if self.control_registers['extra_value']['register'] is None:
                raise RuntimeError('snap %s extra value register specified, '
                                   'but not found. Problem.' % self.name)
            extra_reg = self.control_registers['extra_value']
            extra_info = raw_device_info[extra_reg['name']]
            extra_info['mode'] = 'fields of arbitrary size'
            if 'extra_names' in self.block_info:
                extra_info['names'] = self.block_info['extra_names']
                extra_info['bitwidths'] = self.block_info['extra_widths']
                extra_info['arith_types'] = self.block_info['extra_types']
                extra_info['bin_pts'] = self.block_info['extra_bps']
            else:
                # no field description given: one 32-bit field named 'reg'
                extra_info['names'] = '[reg]'
                extra_info['bitwidths'] = '[32]'
                extra_info['arith_types'] = '[0]'
                extra_info['bin_pts'] = '[0]'
            extra_reg['register'].process_info(extra_info)

    def arm(self, man_trig=False, man_valid=False, offset=-1,
            circular_capture=False):
        """
        Arm the snapshot block.

        Writes the control word twice: first with bit 0 clear, then with
        bit 0 set. The trigger offset register is only written when
        offset is not negative.

        :param man_trig: value for bit 1 of the control word
        :type man_trig: bool
        :param man_valid: value for bit 2 of the control word
        :type man_valid: bool
        :param offset: trigger offset; negative means leave it unchanged
        :type offset: int
        :param circular_capture: value for bit 3 of the control word
        :type circular_capture: bool
        """
        ctrl_reg = self.control_registers['control']['register']
        if offset >= 0:
            self.control_registers['trig_offset']['register'].write_int(offset)
        flags = (man_trig << 1) + (man_valid << 2) + (circular_capture << 3)
        ctrl_reg.write_int(0 + flags)
        ctrl_reg.write_int(1 + flags)

    def print_snap(self, limit_lines=-1, **kwargs):
        """
        Read and print a snap block.

        :param limit_lines: maximum number of lines to print; values of
            zero or less mean no limit
        :param offset: trigger offset
        :param man_valid: force valid to be true
        :param man_trig: force a trigger now
        :param circular_capture: enable circular capture
        :param timeout: time out after this many seconds
        :param read_nowait: do not wait for the snap to finish reading
        """
        snapdata = self.read(**kwargs)
        data = snapdata['data']
        # dict views cannot be indexed on Python 3, so take the first key
        # through an iterator; an empty result prints no rows at all
        first_key = next(iter(data), None)
        num_rows = 0 if first_key is None else len(data[first_key])
        sorted_keys = sorted(data.keys())
        for ctr in range(num_rows):
            # check before printing so exactly limit_lines rows are shown
            if (limit_lines > 0) and (ctr >= limit_lines):
                break
            print('%5i ' % ctr, end='')
            for key in sorted_keys:
                print('{}({})\t'.format(key, data[key][ctr]), end='')
            print('')
        print('Capture offset: %i' % snapdata['offset'])

    def read(self, **kwargs):
        """
        Override Memory.read to handle the extra value register.

        :param offset: trigger offset
        :param man_valid: force valid to be true
        :param man_trig: force a trigger now
        :param circular_capture: enable circular capture
        :param timeout: time out after this many seconds
        :param read_nowait: do not wait for the snap to finish reading
        :return: a dictionary with keys 'data' (processed data), 'offset',
            'timestamp' and 'extra_value'
        """
        rawdata, rawtime = self.read_raw(**kwargs)
        processed = self._process_data(rawdata['data'])
        return {'data': processed, 'offset': rawdata.get('offset', 0),
                'timestamp': rawtime,
                'extra_value': rawdata['extra_value']}

    def read_raw(self, **kwargs):
        """
        Read snap data from the memory device.

        :param man_trig: force a trigger now (default False)
        :param man_valid: force valid to be true (default False)
        :param timeout: seconds to wait for the capture to finish; a
            negative value waits indefinitely (default -1)
        :param offset: trigger offset (default -1)
        :param read_nowait: do not wait for the capture to finish
            (default False)
        :param circular_capture: enable circular capture (default False)
        :param arm: arm the snap before reading (default True)
        :return: a tuple of (dictionary with keys 'extra_value', 'data',
            'length' and 'offset', time at which the data was read or -1
            if nothing was read)
        :raises RuntimeError: on an unknown kwarg, on arming options given
            together with arm=False, if the capture did not complete, or
            if the captured length differs from the snap length
        """
        snapsetup = {
            'man_trig': False,
            'man_valid': False,
            'timeout': -1,
            'offset': -1,
            'read_nowait': False,
            'circular_capture': False,
            'arm': True
        }
        for kkey in kwargs:
            if kkey not in snapsetup:
                raise RuntimeError('Invalid kwarg for '
                                   'snap read_raw(): %s' % kkey)
        snapsetup.update(kwargs)
        arm_options = ('man_trig', 'man_valid', 'offset', 'circular_capture')
        if snapsetup['arm']:
            self.arm(man_trig=snapsetup['man_trig'],
                     man_valid=snapsetup['man_valid'],
                     offset=snapsetup['offset'],
                     circular_capture=snapsetup['circular_capture'])
        elif any(req in kwargs for req in arm_options):
            raise RuntimeError('Additional kwargs to snapshot read_raw() '
                               'will have no effect if arm=False '
                               'is specified.')
        status_reg = self.control_registers['status']['register']
        done = snapsetup['read_nowait']
        start_time = time.time()
        # TODO - what would a sensible option be to check addr?
        # the default of zero is probably not right
        addr = 0
        # bit 31 of the status word is set while the capture is running;
        # the low 31 bits hold the current address
        while (not done) and \
                ((time.time() - start_time) < snapsetup['timeout'] or
                 (snapsetup['timeout'] < 0)):
            addr = status_reg.read_uint()
            done = not bool(addr & 0x80000000)
        if snapsetup['read_nowait']:
            addr = self.length_bytes
        bram_dmp = {'extra_value': None, 'data': [],
                    'length': addr & 0x7fffffff, 'offset': 0}
        # read the status again to see whether the address has settled
        status_val = status_reg.read_uint()
        now_status = bool(status_val & 0x80000000)
        now_addr = status_val & 0x7fffffff
        if (not snapsetup['read_nowait']) and \
                ((bram_dmp['length'] != now_addr) or
                 (bram_dmp['length'] == 0) or now_status):
            # if address is still changing, then the snap block didn't
            # finish capturing, so an error is raised
            error_info = 'timeout %2.2f seconds. Addr at stop time: %i. ' \
                         'Now: Still running :%s, addr: %i.' % (
                             snapsetup['timeout'], bram_dmp['length'],
                             'yes' if now_status else 'no', now_addr)
            if bram_dmp['length'] != now_addr:
                raise RuntimeError('Snap %s error: Address still changing '
                                   'after %s' % (self.name, error_info))
            elif bram_dmp['length'] == 0:
                raise RuntimeError('Snap %s error: Returned 0 bytes after '
                                   '%s' % (self.name, error_info))
            else:
                raise RuntimeError('Snap %s error: %s' % (
                    self.name, error_info))
        if snapsetup['circular_capture']:
            val = self.control_registers['tr_en_cnt']['register'].read_uint()
            bram_dmp['offset'] = val - bram_dmp['length']
        else:
            bram_dmp['offset'] = 0
        if bram_dmp['length'] == 0:
            bram_dmp['data'] = []
            datatime = -1
        else:
            bram_dmp['data'] = self.parent.read(self.name + '_bram',
                                                bram_dmp['length'])
            datatime = time.time()
        # NOTE(review): the default offset of -1 is added here as well,
        # shifting a circular-capture offset by one - confirm intended.
        bram_dmp['offset'] += snapsetup['offset']
        if bram_dmp['offset'] < 0:
            bram_dmp['offset'] = 0
        if bram_dmp['length'] != self.length_bytes:
            # both numbers are reported in bytes, as the message states
            raise RuntimeError('%s.read_uint() - expected %i bytes, got %i' % (
                self.name, self.length_bytes, bram_dmp['length']))
        # read the extra value
        ev_reg = self.control_registers['extra_value']['register']
        if ev_reg is not None:
            bram_dmp['extra_value'] = ev_reg.read()
        return bram_dmp, datatime

    def __str__(self):
        """Return the snap name followed by its block information."""
        return '%s: %s' % (self.name, self.block_info)

    def __repr__(self):
        """Return the class name and snap name, separated by a colon."""
        return '%s:%s' % (self.__class__.__name__, self.name)

    @staticmethod
    def packetise_snapdata(data, eof_key='eof', packet_length=-1, dv_key=None):
        """
        Use the given EOF key to packetise a dictionary of snap data

        Samples after the last EOF do not form a complete packet and are
        not returned.

        :param data: a dictionary containing snap block data; every value
            is indexed up to the length of data[eof_key]
        :param eof_key: the key used to identify the packet boundaries - the
            eof comes on the LAST VALID word in a packet
        :param packet_length: if not -1, the number of samples every packet
            must contain; checked as the packets are created
        :param dv_key: the key used to identify which data samples are
            valid; samples where it is zero are skipped
        :return: a list of packets, each a dictionary with the same keys as
            data
        :raises PacketLengthError: (defined locally, subclass of Exception)
            if a packet does not have the expected length
        """
        class PacketLengthError(Exception):
            pass

        current_packet = {}
        packets = []
        for ctr in range(len(data[eof_key])):
            if dv_key is not None and data[dv_key][ctr] == 0:
                continue
            for key, values in data.items():
                current_packet.setdefault(key, []).append(values[ctr])
            if current_packet[eof_key][-1]:
                if packet_length != -1:
                    if len(current_packet[eof_key]) != packet_length:
                        raise PacketLengthError(
                            'Expected {}, got {} at location {}.'.format(
                                packet_length, len(current_packet[eof_key]),
                                ctr))
                packets.append(current_packet)
                current_packet = {}
        return packets