Skip to content

Commit

Permalink
a little rearranging
Browse files Browse the repository at this point in the history
  • Loading branch information
rpatel3001 committed Feb 23, 2024
1 parent eee2e81 commit 800c7d5
Showing 1 changed file with 61 additions and 83 deletions.
144 changes: 61 additions & 83 deletions rootfs/scripts/soapy2tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,37 @@
from time import time, sleep
from queue import Queue, Full

def rx_thread(sdr, rxStream, rxcfg, tx_init, inbufs, rxq):
def rx_thread(sdrcfg, rxStream, rxcfg, tx_init, inbufs, rxq):
prctl.set_name("rx")

bufidx = 0
last_cleared = [time()] * len(rxq)

sdr = Device(sdrcfg)
rxStream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32)
sdr.setSampleRate(SOAPY_SDR_RX, 0, rxcfg["rate"])
sdr.setFrequencyCorrection(SOAPY_SDR_RX, 0, rxcfg["ppm"])
sdr.setFrequency(SOAPY_SDR_RX, 0, rxcfg["freq"])

try:
gain = float(environ.get("GAIN", 40))
print(f"[rx] Setting gain to: {gain}")
sdr.setGainMode(SOAPY_SDR_RX, 0, False)
sdr.setGain(SOAPY_SDR_RX, 0, gain)
except ValueError:
gains = dict(kv.split("=") for kv in environ.get("GAIN", "agc=-30" if sdrcfg["driver"] == "sdrplay" else "Automatic=40").split(","))
for g in gains:
if(g.lower() == "agc"):
print("[rx] Enabling AGC")
sdr.setGainMode(SOAPY_SDR_RX, 0, True)
if(gains[g].lower() != "true"):
print("[rx] Setting AGC setpoint to: %f" % float(gains[g]))
sdr.writeSetting("agc_setpoint", float(gains[g]))
else:
print("[rx] Setting gain %s to: %f" % (g, float(gains[g])))
sdr.setGain(SOAPY_SDR_RX, 0, g, float(gains[g]))

sdr.activateStream(rxStream)
atexit.register(sdr.deactivateStream, rxStream)

status = sdr.readStream(rxStream, [inbufs[bufidx]], rxcfg["mtu"])
print(f"[rx] Actual stream transfer size: {status.ret}/{rxcfg['mtu']}")
Expand All @@ -28,6 +51,8 @@ def rx_thread(sdr, rxStream, rxcfg, tx_init, inbufs, rxq):
samps = status.ret
if samps < 0:
print(f"[rx] failed to read stream, restarting thread: {SoapySDR_errToStr(status.ret)}: {status}")
sdr.deactivateStream(rxStream)
sdr.closeStream(rxStream)
return

for i in range(len(rxq)):
Expand Down Expand Up @@ -69,62 +94,62 @@ def fastfilt(sos, x, zi):
zi[s, 1] = (sos[s, 2] * x_n - sos[s, 5] * x[n])


def tx_thread(rxcfg, txcfg, tx_init, inbufs, rxq):
def tx_thread(rxcfg, chancfg, tx_init, inbufs, rxq):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
prctl.set_name(f"tx {txcfg['idx']}")
print(f"[tx {txcfg['idx']}] Listening on port {txcfg['baseport'] + txcfg['idx']}")
print(f"[tx {txcfg['idx']}] Shifting by {(txcfg['fc'] - rxcfg['freq'])/1e6} MHz ({rxcfg['freq']/1e6} to {txcfg['fc']/1e6}) and decimating {rxcfg['rate']/1e6} by {txcfg['deci']} to {rxcfg['rate']/txcfg['deci']/1e6}")
prctl.set_name(f"tx {chancfg['idx']}")
print(f"[tx {chancfg['idx']}] Listening on port {chancfg['baseport'] + chancfg['idx']}")
print(f"[tx {chancfg['idx']}] Shifting by {(chancfg['fc'] - rxcfg['freq'])/1e6} MHz ({rxcfg['freq']/1e6} to {chancfg['fc']/1e6}) and decimating {rxcfg['rate']/1e6} by {chancfg['deci']} to {rxcfg['rate']/chancfg['deci']/1e6}")

outbuf = np.zeros(rxcfg["mtu"] * 2 // txcfg['deci'], np.uint8)
outbuf = np.zeros(rxcfg["mtu"] * 2 // chancfg['deci'], np.uint8)
fdtype = np.complex64

if txcfg['deci'] != 1:
if chancfg['deci'] != 1:
# setup mixer LO
fmix = txcfg["fc"] - rxcfg["freq"] # amount to shift by
fmix = chancfg["fc"] - rxcfg["freq"] # amount to shift by
mixper = int(np.lcm(fmix, rxcfg["rate"]) / fmix) # period of the mixer frequency sampled at the sample rate
mixlen = int(np.ceil(rxcfg["mtu"] / mixper)) * mixper * 2 # the smallest periodic buffer length that fits the max MTU, times 2
mixtime = np.arange(0, mixlen) / rxcfg["rate"] # sample times
mix = np.exp(-1j * 2*np.pi * fmix * mixtime).astype(fdtype) * 255.0 # LO buffer, scaled to int8
offset = 0 # keep track of where in the LO buffer we are
# setup filter
wp = 0.80 / txcfg['deci'] # passband stop (% Nyquist)
ws = 1.20 / txcfg['deci'] # stopband start (% Nyquist)
wp = 0.80 / chancfg['deci'] # passband stop (% Nyquist)
ws = 1.20 / chancfg['deci'] # stopband start (% Nyquist)
rp = 0.75 # passband ripple (dB)
rs = 40 # stopband attenuation (dB)
sos = iirdesign(wp, ws, rp, rs, output="sos").astype(fdtype) # get filter coefficients
zi = sosfilt_zi(sos).astype(fdtype) # calculate initial conditions
# buffer for decimation
decbuf = np.zeros(rxcfg["mtu"] // txcfg['deci'], fdtype)
decbuf = np.zeros(rxcfg["mtu"] // chancfg['deci'], fdtype)

# wait for a connection, then signal RX thread to push to the queue
sock.bind(("0.0.0.0", txcfg["baseport"] + txcfg["idx"]))
sock.bind(("0.0.0.0", chancfg["baseport"] + chancfg["idx"]))
sock.listen()
conn, addr = sock.accept()
tx_init[txcfg["idx"]].set()
tx_init[chancfg["idx"]].set()

with conn:
print(f"[tx {txcfg['idx']}] Connection accepted from {addr}")
print(f"[tx {chancfg['idx']}] Connection accepted from {addr}")
conn.sendall(b"RTL0\x00\x00\x00\x00\x00\x00\x00\x00") # rtl-tcp header

while True:
bufidx, insamps = rxq[txcfg['idx']].get() # receive a buffer index and length
decsamps = insamps // txcfg['deci']
bufidx, insamps = rxq.get() # receive a buffer index and length
decsamps = insamps // chancfg['deci']
outsamps = decsamps * 2
# copy out the received samples
sigbuf = np.array(inbufs[bufidx][:insamps], fdtype, order='C')
if txcfg['deci'] == 1:
if chancfg['deci'] == 1:
# if no decimation, just scale and shift, don't mix/filter
outbuf[:outsamps] = fastscale(sigbuf.view(np.float32))
else:
fastmult(sigbuf, mix[offset:offset+insamps]) # mix with LO
offset = (offset + insamps) % mixper
fastfilt(sos, sigbuf, zi) # filter
decbuf[:decsamps] = sigbuf[:insamps:txcfg['deci']] # decimate
decbuf[:decsamps] = sigbuf[:insamps:chancfg['deci']] # decimate
outbuf[:outsamps] = fastshift(decbuf[:decsamps].view(np.float32)) # shift to uint8 range
try:
conn.sendall(outbuf[:outsamps])
except BaseException:
print(f"[tx {txcfg['idx']}] Disconnected from {addr}")
print(f"[tx {chancfg['idx']}] Disconnected from {addr}")
tx_init[txcfg['idx']].clear()


Expand All @@ -147,75 +172,28 @@ def main():
rxcfg = {}
txcfg = {}

# parse params to open and initialize SoapySDR device + stream
args = dict(kv.split("=") for kv in environ["SOAPY"].split(","))
print(f"[rx] Opening SoapySDR device with parameters: {args}")
sdr = Device(args)
rxStream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32)
atexit.register(sdr.closeStream, rxStream)
sdrcfg = dict(kv.split("=") for kv in environ["SOAPY"].split(","))
print(f"[rx] Using SoapySDR device with parameters: {sdrcfg}")

try:
txcfg["baseport"] = int(environ["BASEPORT"])
except KeyError:
txcfg["baseport"] = 1234
print(f"[main] Starting output channels at port {txcfg['baseport']}")
txcfg["baseport"] = int(environ.get("BASEPORT", 1234))

try:
rxcfg["numbufs"] = int(environ["NUMBUFS"])
except KeyError:
rxcfg["numbufs"] = 100
rxcfg["numbufs"] = int(environ.get("NUMBUFS", 100))
print(f"[main] Using {rxcfg['numbufs']} bufs")

try:
rxcfg["rate"] = int(environ["RATE"])
sdr.setSampleRate(SOAPY_SDR_RX, 0, rxcfg["rate"])
except KeyError:
rxcfg["rate"] = sdr.getSampleRate(SOAPY_SDR_RX, 0)
rxcfg["rate"] = int(environ.get("RATE", 2100000))
print(f"[main] Sampling at {rxcfg['rate']} MHz")

try:
rxcfg["ppm"] = int(environ["PPM"])
sdr.setFrequencyCorrection(SOAPY_SDR_RX, 0, rxcfg["ppm"])
print(f"[main] Using {rxcfg['ppm']} ppm offset")
except KeyError:
pass
rxcfg["ppm"] = int(environ.get("PPM", 0))
print(f"[main] Using {rxcfg['ppm']} ppm offset")

try:
rxcfg["freq"] = int(environ["FREQ"])
sdr.setFrequency(SOAPY_SDR_RX, 0, rxcfg["freq"])
except KeyError:
rxcfg["freq"] = sdr.getFrequency(SOAPY_SDR_RX, 0)
rxcfg["freq"] = int(environ.get("FREQ", 136500000))
print(f"[main] Tuning to {rxcfg['freq']} MHz")

try:
rxcfg["bw"] = int(environ["BANDWIDTH"])
sdr.setBandwidth(SOAPY_SDR_RX, 0, rxcfg["bw"])
print(f"[main] Setting {rxcfg['bw']} MHz bandwidth")
except KeyError:
pass

try:
sdr.setGainMode(SOAPY_SDR_RX, 0, False)
try:
gain = float(environ["GAIN"])
print(f"[rx] Setting gain to: {gain}")
sdr.setGain(SOAPY_SDR_RX, 0, gain)
except ValueError:
gains = dict(kv.split("=") for kv in environ["GAIN"].split(","))
for g in gains:
if(g.lower() == "agc"):
print("[rx] Enabling AGC")
sdr.setGainMode(SOAPY_SDR_RX, 0, True)
if(gains[g].lower() != "true"):
print("[rx] Setting AGC setpoint to: %f" % float(gains[g]))
sdr.writeSetting("agc_setpoint", float(gains[g]))
else:
print("[rx] Setting gain %s to: %f" % (g, float(gains[g])))
sdr.setGain(SOAPY_SDR_RX, 0, g, float(gains[g]))
except KeyError:
pass

sdr = Device(sdrcfg)
rxStream = sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32)
rxcfg["mtu"] = sdr.getStreamMTU(rxStream)
sdr.closeStream(rxStream)
del sdr
print(f"[rx] Using stream MTU: {rxcfg['mtu']}")

inbufs = np.zeros((rxcfg["numbufs"], rxcfg["mtu"]), np.complex64)
Expand All @@ -225,16 +203,16 @@ def main():
# semicolon separated list of comma separated channel settings
# 0: center frequency
# 1: decimation factor
chans = list(tuple(map(int, c.split(","))) for c in environ["CHANS"].split(";"))
chans = list(tuple(map(int, c.split(","))) for c in environ.get("CHANS", f"rxcfg['freq'],1").split(";"))
# start new TX threads for each output channel
for i, (fc, deci) in enumerate(chans):
cfg = {"idx": i, "fc": fc, "deci": deci, **txcfg}
chancfg = {"idx": i, "fc": fc, "deci": deci, **txcfg}
rxq.append(Queue(rxcfg["numbufs"]))
tx_init.append(Event())
Thread(name=f"tx {i}", target=thread_wrapper, args=(tx_thread, rxcfg, cfg, tx_init, inbufs, rxq)).start()
Thread(name=f"tx {i}", target=thread_wrapper, args=(tx_thread, rxcfg, chancfg, tx_init, inbufs, rxq[i])).start()

# start a thread to receive samples from the SDR
rxt = Thread(name="rx", target=thread_wrapper, args=(rx_thread, sdr, rxStream, rxcfg, tx_init, inbufs, rxq))
rxt = Thread(name="rx", target=thread_wrapper, args=(rx_thread, sdrcfg, rxStream, rxcfg, tx_init, inbufs, rxq))
rxt.start()

rxt.join()
Expand Down

0 comments on commit 800c7d5

Please sign in to comment.