Skip to content

Commit

Permalink
Merge pull request #255 from martinling/speed-test-fix
Browse files Browse the repository at this point in the history
Support explicit port sharing in speed test
  • Loading branch information
mossmann authored May 17, 2024
2 parents 75cc3c2 + 880dab7 commit 1c2eece
Showing 1 changed file with 25 additions and 8 deletions.
33 changes: 25 additions & 8 deletions luna/gateware/applets/speed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from luna.usb3 import USBSuperSpeedDevice, SuperSpeedStreamInEndpoint

from luna.gateware.platform import NullPin
from luna.gateware.platform.core import LUNAApolloPlatform

from apollo_fpga.gateware.advertiser import ApolloAdvertiser, ApolloAdvertiserRequestHandler

VENDOR_ID = 0x16d0
PRODUCT_ID = 0x0f3b
Expand All @@ -15,16 +18,16 @@
class USBSpeedTestDevice(Elaboratable):
""" Simple device that exchanges data with the host as fast as the hardware can. """

def __init__(self, generate_clocks=True, fs_only=False, phy=None,
def __init__(self, generate_clocks=True, fs_only=False, phy_name=None,
vid=VENDOR_ID, pid=PRODUCT_ID):
self.generate_clocks = generate_clocks
self.fs_only = fs_only
self.phy = phy
self.phy_name = phy_name
self.vid = vid
self.pid = pid
self.max_bulk_packet_size = 64 if fs_only else 512

def create_descriptors(self):
def create_descriptors(self, sharing):
""" Create the descriptors we want to use for our device. """

descriptors = DeviceDescriptorCollection()
Expand Down Expand Up @@ -62,6 +65,12 @@ def create_descriptors(self):
e.bEndpointAddress = BULK_ENDPOINT_NUMBER
e.wMaxPacketSize = self.max_bulk_packet_size

if sharing is not None:
with c.InterfaceDescriptor() as i:
i.bInterfaceNumber = 1
i.bInterfaceClass = 0xFF
i.bInterfaceSubclass = 0x00
i.bInterfaceProtocol = ApolloAdvertiserRequestHandler.PROTOCOL_VERSION

return descriptors

Expand All @@ -74,10 +83,14 @@ def elaborate(self, platform):
m.submodules.clocks = platform.clock_domain_generator()

# Request default PHY unless another was specified.
if self.phy is None:
ulpi = platform.request(platform.default_usb_connection)
phy_name = self.phy_name or platform.default_usb_connection
ulpi = platform.request(phy_name)

# Check if port is shared.
if isinstance(platform, LUNAApolloPlatform):
sharing = platform.port_sharing(phy_name)
else:
ulpi = self.phy
sharing = None

# Create our USB device interface...
m.submodules.usb = usb = USBDevice(bus=ulpi)
Expand All @@ -86,9 +99,13 @@ def elaborate(self, platform):
"fs_only must be set for devices with a full speed only PHY"

# Add our standard control endpoint to the device.
descriptors = self.create_descriptors()
usb.add_standard_control_endpoint(descriptors)
descriptors = self.create_descriptors(sharing)
control_ep = usb.add_standard_control_endpoint(descriptors)

# Add an Apollo advertiser, if needed.
if sharing == "advertising":
adv = m.submodules.adv = ApolloAdvertiser()
control_ep.add_request_handler(adv.default_request_handler(1))

# Add a stream endpoint to our device for Bulk IN transfers.
stream_in_ep = USBStreamInEndpoint(
Expand Down

0 comments on commit 1c2eece

Please sign in to comment.