-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #22 from jumpstarter-dev/dutlink
Implement dutlink driver
- Loading branch information
Showing
18 changed files
with
615 additions
and
172 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import sys | ||
from threading import Thread | ||
|
||
import click | ||
|
||
from jumpstarter.common.utils import serve | ||
from jumpstarter.drivers.dutlink.base import Dutlink | ||
|
||
instance = Dutlink( | ||
name="dutlink", | ||
serial="c415a913", | ||
storage_device="/dev/disk/by-id/usb-SanDisk_Extreme_Pro_52A456790D93-0:0", | ||
) | ||
|
||
|
||
def monitor_power(client): | ||
try: | ||
for reading in client.power.read(): | ||
click.secho(f"{reading}", fg="red") | ||
except Exception: | ||
pass | ||
|
||
|
||
with serve(instance) as client: | ||
click.secho("Connected to Dutlink", fg="red") | ||
Thread(target=monitor_power, args=[client]).start() | ||
with client.console.expect() as expect: | ||
expect.logfile = sys.stdout.buffer | ||
|
||
expect.send("\x02" * 5) | ||
|
||
click.secho("Entering DUT console", fg="red") | ||
expect.send("console\r\n") | ||
expect.expect("Entering console mode") | ||
|
||
client.power.off() | ||
|
||
click.secho("Writing system image", fg="red") | ||
client.storage.write_local_file("/tmp/sdcard.img") | ||
click.secho("Written system image", fg="red") | ||
|
||
client.storage.dut() | ||
|
||
click.secho("Powering on DUT", fg="red") | ||
client.power.on() | ||
|
||
expect.expect("StarFive #") | ||
click.secho("Working around u-boot usb initialization issue", fg="red") | ||
expect.sendline("usb reset") | ||
|
||
expect.expect("StarFive #") | ||
expect.sendline("boot") | ||
|
||
expect.expect("Enter choice:") | ||
click.secho("Selecting boot entry", fg="red") | ||
expect.sendline("1") | ||
|
||
expect.expect("NixOS Stage 1") | ||
|
||
click.secho("Reached initrd", fg="red") | ||
|
||
expect.send("\x02" * 5) | ||
expect.expect("Exiting console mode") | ||
|
||
client.power.off() |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
import os | ||
|
||
from anyio.from_thread import start_blocking_portal | ||
|
||
from jumpstarter.client import client_from_channel | ||
from jumpstarter.common.grpc import insecure_channel | ||
|
||
|
||
def main(): | ||
host = os.environ.get("JUMPSTARTER_HOST", None) | ||
|
||
if host is None: | ||
raise RuntimeError("j command can only be used under jmp shell") | ||
|
||
with start_blocking_portal() as portal: | ||
channel = portal.call(insecure_channel, host) | ||
client = portal.call(client_from_channel, channel, portal) | ||
client.cli()() | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
import os | ||
from collections.abc import AsyncGenerator | ||
from dataclasses import dataclass, field | ||
from pathlib import Path | ||
from uuid import UUID | ||
|
||
import pyudev | ||
import usb.core | ||
import usb.util | ||
from anyio import fail_after, sleep | ||
from anyio.streams.file import FileWriteStream | ||
|
||
from jumpstarter.drivers import Driver, export | ||
from jumpstarter.drivers.composite import CompositeInterface | ||
from jumpstarter.drivers.power import PowerInterface, PowerReading | ||
from jumpstarter.drivers.serial.pyserial import PySerial | ||
from jumpstarter.drivers.storage import StorageMuxInterface | ||
|
||
|
||
@dataclass(kw_only=True) | ||
class DutlinkPower(PowerInterface, Driver): | ||
parent: "Dutlink" | ||
|
||
def control(self, action): | ||
return self.parent.control( | ||
usb.ENDPOINT_OUT, | ||
0x01, | ||
["off", "on", "force-off", "force-on", "rescue"], | ||
action, | ||
None, | ||
) | ||
|
||
@export | ||
def on(self): | ||
return self.control("on") | ||
|
||
@export | ||
def off(self): | ||
return self.control("off") | ||
|
||
@export | ||
async def read(self) -> AsyncGenerator[PowerReading, None]: | ||
prev = None | ||
|
||
while True: | ||
[v, a, _] = self.parent.control( | ||
usb.ENDPOINT_IN, | ||
0x04, | ||
["version", "power", "voltage", "current"], | ||
"power", | ||
None, | ||
).split() | ||
|
||
curr = PowerReading(voltage=float(v[:-1]), current=float(a[:-1])) | ||
|
||
if prev != curr: | ||
prev = curr | ||
yield curr | ||
|
||
await sleep(1) | ||
|
||
|
||
@dataclass(kw_only=True) | ||
class DutlinkStorageMux(StorageMuxInterface, Driver): | ||
parent: "Dutlink" | ||
storage_device: str | ||
|
||
def control(self, action): | ||
return self.parent.control( | ||
usb.ENDPOINT_OUT, | ||
0x02, | ||
["off", "host", "dut"], | ||
action, | ||
None, | ||
) | ||
|
||
@export | ||
def host(self): | ||
return self.control("host") | ||
|
||
@export | ||
def dut(self): | ||
return self.control("dut") | ||
|
||
@export | ||
def off(self): | ||
return self.control("off") | ||
|
||
@export | ||
async def write(self, src: str): | ||
self.control("host") | ||
|
||
with fail_after(20): | ||
while True: | ||
if os.path.exists(self.storage_device): | ||
try: | ||
Path(self.storage_device).write_bytes(b"\0") | ||
except OSError: | ||
pass # wait for device ready | ||
else: | ||
break | ||
|
||
await sleep(1) | ||
|
||
async with await FileWriteStream.from_path(self.storage_device) as stream: | ||
async for chunk in self.resources[UUID(src)]: | ||
await stream.send(chunk) | ||
|
||
|
||
@dataclass(kw_only=True) | ||
class Dutlink(CompositeInterface, Driver): | ||
serial: str | None = field(default=None) | ||
dev: usb.core.Device = field(init=False) | ||
itf: usb.core.Interface = field(init=False) | ||
|
||
storage_device: str | ||
|
||
power: DutlinkPower = field(init=False) | ||
storage: DutlinkStorageMux = field(init=False) | ||
console: PySerial = field(init=False, default=None) | ||
|
||
def items(self, parent=None): | ||
return super().items(parent) + self.power.items(self) + self.storage.items(self) + self.console.items(self) | ||
|
||
def __post_init__(self, *args): | ||
super().__post_init__(*args) | ||
for dev in usb.core.find(idVendor=0x2B23, idProduct=0x1012, find_all=True): | ||
serial = usb.util.get_string(dev, dev.iSerialNumber) | ||
if serial == self.serial or self.serial is None: | ||
self.dev = dev | ||
self.itf = usb.util.find_descriptor( | ||
dev.get_active_configuration(), | ||
bInterfaceClass=0xFF, | ||
bInterfaceSubClass=0x1, | ||
bInterfaceProtocol=0x1, | ||
) | ||
|
||
self.power = DutlinkPower(name="power", parent=self) | ||
self.storage = DutlinkStorageMux(name="storage", parent=self, storage_device=self.storage_device) | ||
|
||
for tty in pyudev.Context().list_devices(subsystem="tty", ID_SERIAL_SHORT=serial): | ||
if self.console is None: | ||
self.console = PySerial(name="console", url=tty.device_node) | ||
else: | ||
raise RuntimeError(f"multiple console found for the dutlink board with serial {serial}") | ||
|
||
if self.console is None: | ||
raise RuntimeError(f"no console found for the dutlink board with serial {serial}") | ||
|
||
return | ||
|
||
raise FileNotFoundError("failed to find dutlink device") | ||
|
||
def control(self, direction, ty, actions, action, value): | ||
if direction == usb.ENDPOINT_IN: | ||
self.dev.ctrl_transfer( | ||
bmRequestType=usb.ENDPOINT_OUT | usb.TYPE_VENDOR | usb.RECIP_INTERFACE, | ||
wIndex=self.itf.bInterfaceNumber, | ||
bRequest=0x00, | ||
) | ||
|
||
op = actions.index(action) | ||
res = self.dev.ctrl_transfer( | ||
bmRequestType=direction | usb.TYPE_VENDOR | usb.RECIP_INTERFACE, | ||
wIndex=self.itf.bInterfaceNumber, | ||
bRequest=ty, | ||
wValue=op, | ||
data_or_wLength=(value if direction == usb.ENDPOINT_OUT else 512), | ||
) | ||
|
||
if direction == usb.ENDPOINT_IN: | ||
return bytes(res).decode("utf-8") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import pytest | ||
|
||
from jumpstarter.common.utils import serve | ||
from jumpstarter.drivers.dutlink.base import Dutlink | ||
|
||
|
||
def test_drivers_dutlink(): | ||
try: | ||
instance = Dutlink( | ||
name="dutlink", | ||
storage_device="/dev/null", | ||
) | ||
except FileNotFoundError: | ||
pytest.skip("dutlink not available") | ||
|
||
with serve(instance) as client: | ||
with client.console.expect() as expect: | ||
expect.send("\x02" * 5) | ||
|
||
expect.send("about\r\n") | ||
expect.expect("Jumpstarter test-harness") | ||
|
||
expect.send("console\r\n") | ||
expect.expect("Entering console mode") | ||
|
||
client.power.off() | ||
|
||
client.storage.write_local_file("/dev/null") | ||
client.storage.dut() | ||
|
||
client.power.on() | ||
|
||
expect.send("\x02" * 5) | ||
expect.expect("Exiting console mode") | ||
|
||
client.power.off() |
Oops, something went wrong.