Skip to content

Commit

Permalink
(WIP) Create unified wrapper for TCP and QUIC
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Sep 3, 2024
1 parent d9dcc4c commit 403d302
Show file tree
Hide file tree
Showing 5 changed files with 413 additions and 337 deletions.
25 changes: 13 additions & 12 deletions net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@ license.workspace = true
version.workspace = true

[dependencies]
bytecodec = "0.4.15"
bytes = "1.1.0"
futures-util = { workspace = true }
bytecodec = "0.4.15"
bytes = "1.1.0"
futures-util = { workspace = true }
pin-project-lite = { workspace = true }
quinn = "0.11.4"
rand = { package = "ouisync-rand", path = "../rand" }
rcgen = { workspace = true }
socket2 = "0.5.7" # To be able to setsockopts before a socket is bound
stun_codec = "0.3.4"
thiserror = "1.0.31"
tokio = { workspace = true, features = ["io-util", "macros", "net", "rt-multi-thread", "sync"] }
turmoil = { workspace = true, optional = true }
yamux = "0.13.3"
quinn = "0.11.4"
rand = { package = "ouisync-rand", path = "../rand" }
rcgen = { workspace = true }
socket2 = "0.5.7" # To be able to setsockopts before a socket is bound
stun_codec = "0.3.4"
thiserror = "1.0.31"
tokio = { workspace = true, features = ["io-util", "macros", "net", "rt-multi-thread", "sync"] }
tokio-util = { workspace = true, features = ["compat"] }
turmoil = { workspace = true, optional = true }
yamux = "0.13.3"

[dev-dependencies]
anyhow = { workspace = true }
Expand Down
123 changes: 56 additions & 67 deletions net/examples/peer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::{Context, Result};
use anyhow::Result;
use clap::{Parser, ValueEnum};
use ouisync_net::{
quic,
tcp::{self, TcpStream},
connection::{Acceptor, Connection, Connector, RecvStream, SendStream},
quic, tcp,
};
use std::{
future,
Expand All @@ -11,7 +11,7 @@ use std::{
time::Duration,
};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
io::{AsyncReadExt, AsyncWriteExt},
task, time,
};

Expand Down Expand Up @@ -68,79 +68,36 @@ enum Proto {

async fn run_client(options: &Options) -> Result<()> {
let addr: SocketAddr = (options.addr.unwrap_or(DEFAULT_CONNECT_ADDR), options.port).into();
let connector = match options.proto {
Proto::Tcp => {
let (connector, _) = tcp::configure((DEFAULT_BIND_ADDR, 0).into())?;
Connector::Tcp(connector)
}
Proto::Quic => {
let (connector, _, _) = quic::configure((DEFAULT_BIND_ADDR, 0).into())?;
Connector::Quic(connector)
}
};

match options.proto {
Proto::Tcp => run_tcp_client(addr, options.count).await,
Proto::Quic => run_quic_client(addr, options.count).await,
}
}

async fn run_tcp_client(addr: SocketAddr, count: Option<usize>) -> Result<()> {
let stream = TcpStream::connect(addr).await?;
run_client_connection(stream, count).await
}

async fn run_quic_client(addr: SocketAddr, count: Option<usize>) -> Result<()> {
let (connector, _, _) = quic::configure((Ipv4Addr::UNSPECIFIED, 0).into())?;
let connection = connector.connect(addr).await?;
run_client_connection(connection, count).await
}

async fn run_server(options: &Options) -> Result<()> {
let bind_addr: SocketAddr = (options.addr.unwrap_or(DEFAULT_BIND_ADDR), options.port).into();

match options.proto {
Proto::Tcp => run_tcp_server(bind_addr).await,
Proto::Quic => run_quic_server(bind_addr).await,
}
}

async fn run_tcp_server(addr: SocketAddr) -> Result<()> {
let (_, acceptor) = tcp::configure(addr)?;
println!("bound to {}", acceptor.local_addr());

loop {
let (stream, addr) = acceptor.accept().await?;
task::spawn(run_server_connection(stream, addr));
}
}

async fn run_quic_server(addr: SocketAddr) -> Result<()> {
let (_, acceptor, _) = quic::configure(addr)?;
println!("bound to {}", acceptor.local_addr());

loop {
let connection = acceptor
.accept()
.await
.context("failed to accept")?
.complete()
.await?;
let addr = *connection.remote_addr();
task::spawn(run_server_connection(connection, addr));
}
}
let mut connection = connector.connect(addr).await?;
let (mut tx, mut rx) = connection.open().await?;

async fn run_client_connection<T: AsyncRead + AsyncWrite + Unpin>(
mut stream: T,
count: Option<usize>,
) -> Result<()> {
println!("connected");

let message = "hello world";
let mut i = 0;

loop {
if count.map(|count| i >= count).unwrap_or(false) {
if options.count.map(|count| i >= count).unwrap_or(false) {
break;
}

i = i.saturating_add(1);

println!("sending \"{message}\"");
write_message(&mut stream, message).await?;
write_message(&mut tx, message).await?;

let response = read_message(&mut stream).await?;
let response = read_message(&mut rx).await?;
println!("received \"{response}\"");

time::sleep(SEND_DELAY).await;
Expand All @@ -149,11 +106,43 @@ async fn run_client_connection<T: AsyncRead + AsyncWrite + Unpin>(
future::pending().await
}

async fn run_server_connection<T: AsyncRead + AsyncWrite + Unpin>(mut stream: T, addr: SocketAddr) {
async fn run_server(options: &Options) -> Result<()> {
let bind_addr: SocketAddr = (options.addr.unwrap_or(DEFAULT_BIND_ADDR), options.port).into();

let acceptor = match options.proto {
Proto::Tcp => {
let (_, acceptor) = tcp::configure(bind_addr)?;
Acceptor::Tcp(acceptor)
}
Proto::Quic => {
let (_, acceptor, _) = quic::configure(bind_addr)?;
Acceptor::Quic(acceptor)
}
};

println!("bound to {}", acceptor.local_addr());

loop {
let connection = acceptor.accept().await?.await?;
task::spawn(run_server_connection(connection));
}
}

async fn run_server_connection(mut connection: Connection) {
let addr = connection.remote_addr();

println!("[{}] accepted", addr);

let (mut tx, mut rx) = match connection.accept().await {
Ok(stream) => stream,
Err(error) => {
println!("[{}] accept stream failed: {}", addr, error);
return;
}
};

loop {
let message = match read_message(&mut stream).await {
let message = match read_message(&mut rx).await {
Ok(message) => message,
Err(error) => {
println!("[{}] read failed: {}", addr, error);
Expand All @@ -163,7 +152,7 @@ async fn run_server_connection<T: AsyncRead + AsyncWrite + Unpin>(mut stream: T,

println!("[{}] received \"{}\"", addr, message);

match write_message(&mut stream, "ok").await {
match write_message(&mut tx, "ok").await {
Ok(_) => (),
Err(error) => {
println!("[{}] write failed: {}", addr, error);
Expand All @@ -175,14 +164,14 @@ async fn run_server_connection<T: AsyncRead + AsyncWrite + Unpin>(mut stream: T,
println!("[{}] closed", addr);
}

async fn read_message<T: AsyncRead + Unpin>(reader: &mut T) -> Result<String> {
async fn read_message(reader: &mut RecvStream) -> Result<String> {
let size = reader.read_u32().await? as usize;
let mut buffer = vec![0; size];
reader.read_exact(&mut buffer).await?;
Ok(String::from_utf8(buffer)?)
}

async fn write_message<T: AsyncWrite + Unpin>(writer: &mut T, message: &str) -> Result<()> {
async fn write_message(writer: &mut SendStream, message: &str) -> Result<()> {
writer.write_u32(message.len() as u32).await?;
writer.write_all(message.as_bytes()).await?;
Ok(())
Expand Down
Loading

0 comments on commit 403d302

Please sign in to comment.