From 22eb30ae0e6aab47aceb5733993128089506d3b0 Mon Sep 17 00:00:00 2001 From: Martin Ling Date: Wed, 7 Feb 2024 23:01:40 +0000 Subject: [PATCH] Remove analyzer gateware, now maintained in Cynthion repository. --- luna/gateware/applets/analyzer.py | 375 ----------------------- luna/gateware/usb/analyzer.py | 487 ------------------------------ tox.ini | 1 - 3 files changed, 863 deletions(-) delete mode 100644 luna/gateware/applets/analyzer.py delete mode 100644 luna/gateware/usb/analyzer.py diff --git a/luna/gateware/applets/analyzer.py b/luna/gateware/applets/analyzer.py deleted file mode 100644 index 8889e889a..000000000 --- a/luna/gateware/applets/analyzer.py +++ /dev/null @@ -1,375 +0,0 @@ -#!/usr/bin/env python3 -# pylint: disable=maybe-no-member -# -# This file is part of LUNA. -# -# Copyright (c) 2020 Great Scott Gadgets -# SPDX-License-Identifier: BSD-3-Clause - -""" Generic USB analyzer backend generator for LUNA. """ - -import time -import errno - - -import usb -from datetime import datetime -from enum import IntEnum, IntFlag - -from amaranth import Signal, Elaboratable, Module -from amaranth.build.res import ResourceError -from usb_protocol.emitters import DeviceDescriptorCollection -from usb_protocol.types import USBRequestType - -from luna.gateware.platform import get_appropriate_platform -from luna.usb2 import USBDevice, USBStreamInEndpoint - -from luna.gateware.usb.request.control import ControlRequestHandler -from luna.gateware.usb.stream import USBInStreamInterface -from luna.gateware.stream.generator import StreamSerializer -from luna.gateware.utils.cdc import synchronize -from luna.gateware.architecture.car import LunaECP5DomainGenerator - -from luna.gateware.interface.ulpi import UTMITranslator -from luna.gateware.usb.analyzer import USBAnalyzer - - -USB_SPEED_HIGH = 0b00 -USB_SPEED_FULL = 0b01 -USB_SPEED_LOW = 0b10 - -USB_VENDOR_ID = 0x1d50 -USB_PRODUCT_ID = 0x615b - -BULK_ENDPOINT_NUMBER = 1 -BULK_ENDPOINT_ADDRESS = 0x80 | BULK_ENDPOINT_NUMBER -MAX_BULK_PACKET_SIZE = 512 - - -class USBAnalyzerState(Elaboratable): - - def __init__(self): - self.current = Signal(8) - self.next = Signal(8) - self.write = Signal() - - def elaborate(self, platform): - m = Module() - with m.If(self.write): - m.d.sync += self.current.eq(self.next) - return m - - -class USBAnalyzerVendorRequests(IntEnum): - GET_STATE = 0 - SET_STATE = 1 - GET_SPEEDS = 2 - - -class USBAnalyzerSupportedSpeeds(IntFlag): - USB_SPEED_AUTO = 0b0001 - USB_SPEED_LOW = 0b0010 - USB_SPEED_FULL = 0b0100 - USB_SPEED_HIGH = 0b1000 - - -class USBAnalyzerVendorRequestHandler(ControlRequestHandler): - - def __init__(self, state): - self.state = state - super().__init__() - - def elaborate(self, platform): - m = Module() - interface = self.interface - - # Create convenience aliases for our interface components. - setup = interface.setup - handshake_generator = interface.handshakes_out - - # Transmitter for small-constant-response requests - m.submodules.transmitter = transmitter = \ - StreamSerializer(data_length=1, domain="usb", stream_type=USBInStreamInterface, max_length_width=1) - - # Handle vendor requests - with m.If(setup.type == USBRequestType.VENDOR): - with m.FSM(domain="usb"): - - # IDLE -- not handling any active request - with m.State('IDLE'): - - # If we've received a new setup packet, handle it. - with m.If(setup.received): - - # Select which vendor we're going to handle. - with m.Switch(setup.request): - - with m.Case(USBAnalyzerVendorRequests.GET_STATE): - m.next = 'GET_STATE' - with m.Case(USBAnalyzerVendorRequests.SET_STATE): - m.next = 'SET_STATE' - with m.Case(USBAnalyzerVendorRequests.GET_SPEEDS): - m.next = 'GET_SPEEDS' - with m.Case(): - m.next = 'UNHANDLED' - - - # GET_STATE -- Fetch the device's state - with m.State('GET_STATE'): - self.handle_simple_data_request(m, transmitter, self.state.current, length=1) - - # SET_STATE -- The host is trying to set our state - with m.State('SET_STATE'): - self.handle_register_write_request(m, self.state.next, self.state.write) - - # GET_SPEEDS -- Fetch the device's supported USB speeds - with m.State('GET_SPEEDS'): - supported_speeds = \ - USBAnalyzerSupportedSpeeds.USB_SPEED_LOW | \ - USBAnalyzerSupportedSpeeds.USB_SPEED_FULL | \ - USBAnalyzerSupportedSpeeds.USB_SPEED_HIGH - self.handle_simple_data_request(m, transmitter, supported_speeds, length=1) - - # UNHANDLED -- we've received a request we're not prepared to handle - with m.State('UNHANDLED'): - - # When we next have an opportunity to stall, do so, - # and then return to idle. - with m.If(interface.data_requested | interface.status_requested): - m.d.comb += handshake_generator.stall.eq(1) - m.next = 'IDLE' - - return m - - -class USBAnalyzerApplet(Elaboratable): - """ Gateware that serves as a generic USB analyzer backend. - - WARNING: This is _incomplete_! It's missing: - - DRAM backing for analysis - """ - - def create_descriptors(self): - """ Create the descriptors we want to use for our device. """ - - descriptors = DeviceDescriptorCollection() - - # - # We'll add the major components of the descriptors we we want. - # The collection we build here will be necessary to create a standard endpoint. - # - - # We'll need a device descriptor... - with descriptors.DeviceDescriptor() as d: - d.idVendor = USB_VENDOR_ID - d.idProduct = USB_PRODUCT_ID - - d.iManufacturer = "LUNA" - d.iProduct = "USB Analyzer" - d.iSerialNumber = "[autodetect serial here]" - d.bcdDevice = 0.02 - - d.bNumConfigurations = 1 - - - # ... and a description of the USB configuration we'll provide. - with descriptors.ConfigurationDescriptor() as c: - - with c.InterfaceDescriptor() as i: - i.bInterfaceNumber = 0 - - with i.EndpointDescriptor() as e: - e.bEndpointAddress = BULK_ENDPOINT_ADDRESS - e.wMaxPacketSize = MAX_BULK_PACKET_SIZE - - - return descriptors - - - def elaborate(self, platform): - m = Module() - - # State register - m.submodules.state = state = USBAnalyzerState() - - # Generate our clock domains. - clocking = LunaECP5DomainGenerator() - m.submodules.clocking = clocking - - # Create our UTMI translator. - ulpi = platform.request("target_phy") - m.submodules.utmi = utmi = UTMITranslator(ulpi=ulpi) - - # Strap our power controls to be in VBUS passthrough by default, - # on the target port. - try: - m.d.comb += [ - platform.request("power_a_port").o .eq(0), - platform.request("pass_through_vbus").o .eq(1), - ] - except ResourceError: - pass - - # Set up our parameters. - m.d.comb += [ - - # Set our mode to non-driving and to the desired speed. - utmi.op_mode .eq(0b01), - utmi.xcvr_select .eq(state.current[1:3]), - - # Disable all of our terminations, as we want to participate in - # passive observation. - utmi.dm_pulldown .eq(0), - utmi.dm_pulldown .eq(0), - utmi.term_select .eq(0) - ] - - # Create our USB uplink interface... - try: - uplink_ulpi = platform.request("control_phy") - except ResourceError: - uplink_ulpi = platform.request("host_phy") - m.submodules.usb = usb = USBDevice(bus=uplink_ulpi) - - # Add our standard control endpoint to the device. - descriptors = self.create_descriptors() - control_endpoint = usb.add_standard_control_endpoint(descriptors) - - # Add our vendor request handler to the control endpoint. - vendor_request_handler = USBAnalyzerVendorRequestHandler(state) - control_endpoint.add_request_handler(vendor_request_handler) - - # Add a stream endpoint to our device. - stream_ep = USBStreamInEndpoint( - endpoint_number=BULK_ENDPOINT_NUMBER, - max_packet_size=MAX_BULK_PACKET_SIZE - ) - usb.add_endpoint(stream_ep) - - # Create a USB analyzer, and connect a register up to its output. - m.submodules.analyzer = analyzer = USBAnalyzer(utmi_interface=utmi) - - m.d.comb += [ - # Connect enable signal to host-controlled state register. - analyzer.capture_enable .eq(state.current[0]), - - # Flush endpoint when analyzer is idle with capture disabled. - stream_ep.flush .eq(analyzer.idle & ~analyzer.capture_enable), - - # Discard data buffered by endpoint when the analyzer discards its data. - stream_ep.discard .eq(analyzer.discarding), - - # USB stream uplink. - stream_ep.stream .stream_eq(analyzer.stream), - - usb.connect .eq(1), - - # LED indicators. - platform.request("led", 0).o .eq(analyzer.capturing), - platform.request("led", 1).o .eq(analyzer.stream.valid), - platform.request("led", 2).o .eq(analyzer.overrun), - - platform.request("led", 3).o .eq(utmi.session_valid), - platform.request("led", 4).o .eq(utmi.rx_active), - platform.request("led", 5).o .eq(utmi.rx_error), - ] - - # Return our elaborated module. - return m - - - -class USBAnalyzerConnection: - """ Class representing a connection to a LUNA USB analyzer. - - This abstracts away connection details, so we can rapidly change the way things - work without requiring changes in e.g. our ViewSB frontend. - """ - - def __init__(self): - """ Creates our connection to the USBAnalyzer. """ - - self._buffer = bytearray() - self._device = None - - - - def build_and_configure(self): - """ Builds the LUNA analyzer applet and configures the FPGA with it. """ - - # Create the USBAnalyzer we want to work with. - analyzer = USBAnalyzerApplet() - - # Build and upload the analyzer. - # FIXME: use a temporary build directory - platform = get_appropriate_platform() - platform.build(analyzer, do_program=True) - - time.sleep(3) - - # For now, we'll use a slow, synchronous connection to the device via pyusb. - # This should be replaced with libusb1 for performance. - end_time = time.time() + 6 - while not self._device: - if time.time() > end_time: - raise RuntimeError('Timeout! The analyzer device did not show up.') - - self._device = usb.core.find(idVendor=USB_VENDOR_ID, idProduct=USB_PRODUCT_ID) - - def start_capture(self): - self._device.ctrl_transfer( - usb.core.util.CTRL_OUT | usb.core.util.CTRL_TYPE_VENDOR | usb.core.util.CTRL_RECIPIENT_DEVICE, - 1, - 1, - 0, - b"", - timeout=5, - ) - - def _fetch_data_into_buffer(self): - """ Attempts a single data read from the analyzer into our buffer. """ - - try: - data = self._device.read(BULK_ENDPOINT_ADDRESS, MAX_BULK_PACKET_SIZE) - self._buffer.extend(data) - except usb.core.USBError as e: - if e.errno == errno.ETIMEDOUT: - pass - else: - raise - - - - def read_raw_packet(self): - """ Reads a raw packet from our USB Analyzer. Blocks until a packet is complete. - - Returns: packet, timestamp, flags: - packet -- The raw packet data, as bytes. - timestamp -- The timestamp at which the packet was taken, in microseconds. - flags -- Flags indicating connection status. Format TBD. - """ - - size = 0 - packet = None - - # Read until we get enough data to determine our packet's size... - while not packet: - while len(self._buffer) < 3: - self._fetch_data_into_buffer() - - # Extract our size from our buffer. - size = (self._buffer.pop(0) << 8) | self._buffer.pop(0) - - # ... and read until we have a packet. - while len(self._buffer) < size: - self._fetch_data_into_buffer() - - # Extract our raw packet... - packet = self._buffer[0:size] - del self._buffer[0:size] - - - # ... and return it. - # TODO: extract and provide status flags - # TODO: generate a timestamp on-device - return packet, datetime.now(), None diff --git a/luna/gateware/usb/analyzer.py b/luna/gateware/usb/analyzer.py deleted file mode 100644 index da4dce62f..000000000 --- a/luna/gateware/usb/analyzer.py +++ /dev/null @@ -1,487 +0,0 @@ -# -# This file is part of LUNA. -# -# Copyright (c) 2020 Great Scott Gadgets -# SPDX-License-Identifier: BSD-3-Clause - -""" Low-level USB analyzer gateware. """ - -import unittest - -from amaranth import Signal, Module, Elaboratable, Memory, Record - -from ..stream import StreamInterface -from ..test import LunaGatewareTestCase, usb_domain_test_case - - -class USBAnalyzer(Elaboratable): - """ Core USB analyzer; backed by a small ringbuffer in FPGA block RAM. - - If you're looking to instantiate a full analyzer, you'll probably want to grab - one of the DRAM-based ringbuffer variants (which are currently forthcoming). - - If you're looking to use this with a ULPI PHY, rather than the FPGA-convenient UTMI interface, - grab the UTMITranslator from `luna.gateware.interface.ulpi`. - - Attributes - ---------- - stream: StreamInterface(), output stream - Stream that carries USB analyzer data. - - idle: Signal(), output - Asserted iff the analyzer is not currently receiving data. - stopped: Signal(), output - Asserted iff the analyzer is stopped and not capturing packets. - overrun: Signal(), output - Asserted iff the analyzer has received more data than it can store in its internal buffer. - Occurs if :attr:``stream`` is not being read quickly enough. - capturing: Signal(), output - Asserted iff the analyzer is currently capturing a packet. - discarding: Signal(), output - Asserted iff the analyzer is discarding the contents of its internal buffer. - - - Parameters - ---------- - utmi_interface: UTMIInterface() - The UTMI interface that carries the data to be analyzed. - mem_depth: int, default=8192 - The depth of the analyzer's local ringbuffer, in bytes. - Must be a power of 2. - """ - - # Current, we'll provide a packet header of 16 bits. - HEADER_SIZE_BITS = 16 - HEADER_SIZE_BYTES = HEADER_SIZE_BITS // 8 - - # Support a maximum payload size of 1024B, plus a 1-byte PID and a 2-byte CRC16. - MAX_PACKET_SIZE_BYTES = 1024 + 1 + 2 - - def __init__(self, *, utmi_interface, mem_depth=65536): - """ - Parameters: - utmi_interface -- A record or elaboratable that presents a UTMI interface. - """ - - self.utmi = utmi_interface - - assert (mem_depth % 2) == 0, "mem_depth must be a power of 2" - - # Internal storage memory. - self.mem = Memory(width=8, depth=mem_depth, name="analysis_ringbuffer") - self.mem_size = mem_depth - - # - # I/O port - # - self.stream = StreamInterface() - - self.capture_enable = Signal() - self.idle = Signal() - self.stopped = Signal() - self.overrun = Signal() - self.capturing = Signal() - self.discarding = Signal() - - # Diagnostic I/O. - self.sampling = Signal() - - - def elaborate(self, platform): - m = Module() - - # Memory read and write ports. - m.submodules.read = mem_read_port = self.mem.read_port(domain="usb") - m.submodules.write = mem_write_port = self.mem.write_port(domain="usb") - - # Store the memory address of our active packet header, which will store - # packet metadata like the packet size. - header_location = Signal.like(mem_write_port.addr) - write_location = Signal.like(mem_write_port.addr) - - # Read FIFO status. - read_location = Signal.like(mem_read_port.addr) - fifo_count = Signal.like(mem_read_port.addr, reset=0) - fifo_new_data = Signal() - - # Current receive status. - packet_size = Signal(16) - - # - # Read FIFO logic. - # - m.d.comb += [ - - # We have data ready whenever there's data in the FIFO. - self.stream.valid .eq((fifo_count != 0) & (self.idle | self.overrun)), - - # Our data_out is always the output of our read port... - self.stream.payload .eq(mem_read_port.data), - - - self.sampling .eq(mem_write_port.en) - ] - - # Once our consumer has accepted our current data, move to the next address. - with m.If(self.stream.ready & self.stream.valid): - m.d.usb += read_location.eq(read_location + 1) - m.d.comb += mem_read_port.addr.eq(read_location + 1) - - with m.Else(): - m.d.comb += mem_read_port.addr .eq(read_location), - - - - # - # FIFO count handling. - # - fifo_full = (fifo_count == self.mem_size) - - data_pop = Signal() - data_push = Signal() - m.d.comb += [ - data_pop .eq(self.stream.ready & self.stream.valid), - data_push .eq(fifo_new_data & ~fifo_full) - ] - - # If discarding data, set the count to zero. - with m.If(self.discarding): - m.d.usb += [ - fifo_count.eq(0), - read_location.eq(0), - write_location.eq(0), - ] - - # If we have both a read and a write, don't update the count, - # as we've both added one and subtracted one. - with m.Elif(data_push & data_pop): - pass - - # Otherwise, add when data's added, and subtract when data's removed. - with m.Elif(data_push): - m.d.usb += fifo_count.eq(fifo_count + 1) - with m.Elif(data_pop): - m.d.usb += fifo_count.eq(fifo_count - 1) - - - # - # Core analysis FSM. - # - with m.FSM(domain="usb") as f: - m.d.comb += [ - self.idle .eq(f.ongoing("AWAIT_START") | f.ongoing("AWAIT_PACKET")), - self.stopped .eq(f.ongoing("AWAIT_START") | f.ongoing("OVERRUN")), - self.overrun .eq(f.ongoing("OVERRUN")), - self.capturing .eq(f.ongoing("CAPTURE_PACKET")), - self.discarding.eq(self.stopped & self.capture_enable), - ] - - # AWAIT_START: wait for capture to be enabled, but don't start mid-packet. - with m.State("AWAIT_START"): - with m.If(self.capture_enable & ~self.utmi.rx_active): - m.next = "AWAIT_PACKET" - - - # AWAIT_PACKET: capture is enabled, wait for a packet to start. - with m.State("AWAIT_PACKET"): - with m.If(~self.capture_enable): - m.next = "AWAIT_START" - with m.Elif(self.utmi.rx_active): - m.next = "CAPTURE_PACKET" - m.d.usb += [ - header_location .eq(write_location), - write_location .eq(write_location + self.HEADER_SIZE_BYTES), - packet_size .eq(0), - ] - - - # Capture data until the packet is complete. - with m.State("CAPTURE_PACKET"): - - byte_received = self.utmi.rx_valid & self.utmi.rx_active - - # Capture data whenever RxValid is asserted. - m.d.comb += [ - mem_write_port.addr .eq(write_location), - mem_write_port.data .eq(self.utmi.rx_data), - mem_write_port.en .eq(byte_received), - fifo_new_data .eq(byte_received), - ] - - # Advance the write pointer each time we receive a bit. - with m.If(byte_received): - m.d.usb += [ - write_location .eq(write_location + 1), - packet_size .eq(packet_size + 1) - ] - - # If this would be filling up our data memory, - # move to the OVERRUN state. - with m.If(fifo_count == self.mem_size - 1 - self.HEADER_SIZE_BYTES): - m.next = "OVERRUN" - - # If we've stopped receiving, move to the "finalize" state. - with m.If(~self.utmi.rx_active): - m.next = "EOP_1" - - # Optimization: if we didn't receive any data, there's no need - # to create a packet. Clear our header from the FIFO and disarm. - with m.If(packet_size == 0): - m.next = "AWAIT_PACKET" - m.d.usb += [ - write_location.eq(header_location) - ] - with m.Else(): - m.next = "EOP_1" - - # EOP: handle the end of the relevant packet. - with m.State("EOP_1"): - - # Now that we're done, add the header to the start of our packet. - # This will take two cycles, currently, as we're using a 2-byte header, - # but we only have an 8-bit write port. - m.d.comb += [ - mem_write_port.addr .eq(header_location), - mem_write_port.data .eq(packet_size[8:16]), - mem_write_port.en .eq(1), - fifo_new_data .eq(1) - ] - m.next = "EOP_2" - - - with m.State("EOP_2"): - - # Add the second byte of our header. - # Note that, if this is an adjacent read, we should have - # just captured our packet header _during_ the stop turnaround. - m.d.comb += [ - mem_write_port.addr .eq(header_location + 1), - mem_write_port.data .eq(packet_size[0:8]), - mem_write_port.en .eq(1), - fifo_new_data .eq(1) - ] - m.next = "AWAIT_PACKET" - - - # BABBLE -- handles the case in which we've received a packet beyond - # the allowable size in the USB spec - with m.State("BABBLE"): - - # Trap here, for now. - pass - - - with m.State("OVERRUN"): - # TODO: we should probably set an overrun flag and then emit an EOP, here? - - # If capture is stopped by the host, reset back to the ready state. - with m.If(~self.capture_enable): - m.next = "AWAIT_START" - - - return m - - - -class USBAnalyzerTest(LunaGatewareTestCase): - - SYNC_CLOCK_FREQUENCY = None - USB_CLOCK_FREQUENCY = 60e6 - - def instantiate_dut(self): - self.utmi = Record([ - ('tx_data', 8), - ('rx_data', 8), - - ('rx_valid', 1), - ('rx_active', 1), - ('rx_error', 1), - ('rx_complete', 1), - ]) - self.analyzer = USBAnalyzer(utmi_interface=self.utmi, mem_depth=128) - return self.analyzer - - - def advance_stream(self, value): - yield self.utmi.rx_data.eq(value) - yield - - - @usb_domain_test_case - def test_single_packet(self): - # Enable capture - yield self.analyzer.capture_enable.eq(1) - yield - - # Ensure we're not capturing until a transaction starts. - self.assertEqual((yield self.dut.capturing), 0) - - # Apply our first input, and validate that we start capturing. - yield self.utmi.rx_active.eq(1) - yield self.utmi.rx_valid.eq(1) - yield self.utmi.rx_data.eq(0) - yield - yield - - # Provide some data. - for i in range(1, 10): - yield from self.advance_stream(i) - self.assertEqual((yield self.dut.capturing), 1) - - # Ensure we're still capturing, _and_ that we have - # data available. - self.assertEqual((yield self.dut.capturing), 1) - - # End our packet. - yield self.utmi.rx_active.eq(0) - yield from self.advance_stream(10) - - # Idle for several cycles. - yield from self.advance_cycles(5) - self.assertEqual((yield self.dut.capturing), 0) - - # Try to read back the capture data, byte by byte. - self.assertEqual((yield self.dut.stream.valid), 1) - - # First, we should get a header with the total data length. - # This should be 0x00, 0x0B; as we captured 11 bytes. - self.assertEqual((yield self.dut.stream.payload), 0) - yield self.dut.stream.ready.eq(1) - yield - - # Validate that we get all of the bytes of the packet we expected. - expected_data = [0x00, 0x0a] + list(range(0, 10)) - for datum in expected_data: - self.assertEqual((yield self.dut.stream.payload), datum) - yield - - # We should now be out of data -- verify that there's no longer data available. - self.assertEqual((yield self.dut.stream.valid), 0) - - - @usb_domain_test_case - def test_short_packet(self): - # Enable capture - yield self.analyzer.capture_enable.eq(1) - yield - - # Apply our first input, and validate that we start capturing. - yield self.utmi.rx_active.eq(1) - yield self.utmi.rx_valid.eq(1) - yield self.utmi.rx_data.eq(0) - yield - - # Provide some data. - yield from self.advance_stream(0xAB) - - # End our packet. - yield self.utmi.rx_active.eq(0) - yield from self.advance_stream(10) - - # Idle for several cycles. - yield from self.advance_cycles(5) - self.assertEqual((yield self.dut.capturing), 0) - - # Try to read back the capture data, byte by byte. - self.assertEqual((yield self.dut.stream.valid), 1) - - # First, we should get a header with the total data length. - # This should be 0x00, 0x01; as we captured 1 byte. - self.assertEqual((yield self.dut.stream.payload), 0) - yield self.dut.stream.ready.eq(1) - yield - - # Validate that we get all of the bytes of the packet we expected. - expected_data = [0x00, 0x01, 0xab] - for datum in expected_data: - self.assertEqual((yield self.dut.stream.payload), datum) - yield - - # We should now be out of data -- verify that there's no longer data available. - self.assertEqual((yield self.dut.stream.valid), 0) - - - - -class USBAnalyzerStackTest(LunaGatewareTestCase): - """ Test that evaluates a full-stack USB analyzer setup. """ - - SYNC_CLOCK_FREQUENCY = None - USB_CLOCK_FREQUENCY = 60e6 - - - def instantiate_dut(self): - - from ..interface.ulpi import UTMITranslator - - self.ulpi = Record([ - ('data', [ - ('i', 8), - ('o', 8), - ('oe', 8), - ]), - ('nxt', 1), - ('stp', 1), - ('dir', [('i', 1)]), - ('clk', 1), - ('rst', 1) - ]) - - # Create a stack of our UTMITranslator and our USBAnalyzer. - # We'll wrap the both in a module to establish a synthetic hierarchy. - m = Module() - m.submodules.translator = self.translator = UTMITranslator(ulpi=self.ulpi, handle_clocking=False) - m.submodules.analyzer = self.analyzer = USBAnalyzer(utmi_interface=self.translator, mem_depth=128) - return m - - - def initialize_signals(self): - - # Ensure the translator doesn't need to perform any register reads/writes - # by default, so we can focus on packet Rx. - yield self.translator.xcvr_select.eq(1) - yield self.translator.dm_pulldown.eq(1) - yield self.translator.dp_pulldown.eq(1) - yield self.translator.use_external_vbus_indicator.eq(0) - - - @usb_domain_test_case - def test_simple_analysis(self): - # Enable capture - yield self.analyzer.capture_enable.eq(1) - yield from self.advance_cycles(10) - - # Start a new packet. - yield self.ulpi.dir.eq(1) - yield self.ulpi.nxt.eq(1) - - # Bus turnaround packet. - yield self.ulpi.data.i.eq(0x80) - yield - - # Provide some data to be captured. - for i in [0x2d, 0x00, 0x10]: - yield self.ulpi.data.i.eq(i) - yield - - # Mark our packet as complete. - yield self.ulpi.dir.eq(0) - yield self.ulpi.nxt.eq(0) - yield - - # Wait for a few cycles, for realism. - yield from self.advance_cycles(10) - - # Read our data out of the PHY. - yield self.analyzer.stream.ready.eq(1) - yield - - # Validate that we got the correct packet out; plus headers. - for i in [0x00, 0x03, 0x2d, 0x00, 0x10]: - self.assertEqual((yield self.analyzer.stream.payload), i) - yield - - - -if __name__ == "__main__": - unittest.main() diff --git a/tox.ini b/tox.ini index c3cfad794..4b2421fb0 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,6 @@ commands = python -m luna.gateware.utils.cdc python -m luna.gateware.debug.ila python -m luna.gateware.stream.generator - python -m luna.gateware.usb.analyzer python -m luna.gateware.usb.stream python -m luna.gateware.usb.usb2.packet python -m luna.gateware.usb.usb2.request