Skip to content

Add usage example #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
/target
/**/target
Cargo.lock
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ tracing = "0.1.26"
x25519-dalek = "1.1.1"
xoodoo = "0.1.0"
zeroize = "1.3.0"

[workspace]
members = ["examples/bench"]
19 changes: 19 additions & 0 deletions examples/bench/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "bench"
version = "0.1.0"
edition = "2018"
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
anyhow = "1.0.22"
bytes = "1"
hdrhistogram = { version = "7.2", default-features = false }
quinn = "0.9.1"
clap = { version = "3.2", features = ["derive"] }
tokio = { version = "1.0.1", features = ["rt", "sync"] }
tracing = "0.1.10"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["env-filter", "fmt", "ansi", "time", "local-time"] }
quinn-noise = { path = "../.." }
rand = "0.7.3"
ed25519-dalek = "1.0.1"
212 changes: 212 additions & 0 deletions examples/bench/src/bin/bulk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use std::{
net::SocketAddr,
sync::{Arc, Mutex},
time::Instant,
};

use anyhow::{Context, Result};
use clap::Parser;
use tokio::sync::Semaphore;
use tracing::{info, trace};

use bench::{
configure_tracing_subscriber, connect_client, drain_stream, rt, send_data_on_stream,
server_endpoint,
stats::{Stats, TransferResult},
Opt,
};

fn main() {
let opt = Opt::parse();
configure_tracing_subscriber();

let mut csprng = rand::rngs::OsRng {};
let keypair: ed25519_dalek::Keypair = ed25519_dalek::Keypair::generate(&mut csprng);
let public_key: ed25519_dalek::PublicKey = keypair.public;

let runtime = rt();
let (server_addr, endpoint) = server_endpoint(&runtime, keypair, &opt);

let server_thread = std::thread::spawn(move || {
if let Err(e) = runtime.block_on(server(endpoint, opt)) {
eprintln!("server failed: {:#}", e);
}
});

let mut handles = Vec::new();
for _ in 0..opt.clients {
handles.push(std::thread::spawn(move || {
let runtime = rt();
match runtime.block_on(client(server_addr, public_key, opt)) {
Ok(stats) => Ok(stats),
Err(e) => {
eprintln!("client failed: {:#}", e);
Err(e)
}
}
}));
}

for (id, handle) in handles.into_iter().enumerate() {
// We print all stats at the end of the test sequentially to avoid
// them being garbled due to being printed concurrently
if let Ok(stats) = handle.join().expect("client thread") {
stats.print(id);
}
}

server_thread.join().expect("server thread");
}

async fn server(endpoint: quinn::Endpoint, opt: Opt) -> Result<()> {
let mut server_tasks = Vec::new();

// Handle only the expected amount of clients
for _ in 0..opt.clients {
let handshake = endpoint.accept().await.unwrap();
let connection = handshake.await.context("handshake failed")?;

server_tasks.push(tokio::spawn(async move {
loop {
let (mut send_stream, mut recv_stream) = match connection.accept_bi().await {
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
Err(e) => {
eprintln!("accepting stream failed: {:?}", e);
break;
}
Ok(stream) => stream,
};
trace!("stream established");

let _: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
drain_stream(&mut recv_stream, opt.read_unordered).await?;
send_data_on_stream(&mut send_stream, opt.download_size).await?;
Ok(())
});
}

if opt.stats {
println!("\nServer connection stats:\n{:#?}", connection.stats());
}
}));
}

// Await all the tasks. We have to do this to prevent the runtime getting dropped
// and all server tasks to be cancelled
for handle in server_tasks {
if let Err(e) = handle.await {
eprintln!("Server task error: {:?}", e);
};
}

Ok(())
}

async fn client(
server_addr: SocketAddr,
remote_public_key: ed25519_dalek::PublicKey,
opt: Opt,
) -> Result<ClientStats> {
let (endpoint, connection) = connect_client(server_addr, remote_public_key, opt).await?;

let start = Instant::now();

let connection = Arc::new(connection);

let mut stats = ClientStats::default();
let mut first_error = None;

let sem = Arc::new(Semaphore::new(opt.max_streams));
let results = Arc::new(Mutex::new(Vec::new()));
for _ in 0..opt.streams {
let permit = sem.clone().acquire_owned().await.unwrap();
let results = results.clone();
let connection = connection.clone();
tokio::spawn(async move {
let result =
handle_client_stream(connection, opt.upload_size, opt.read_unordered).await;
info!("stream finished: {:?}", result);
results.lock().unwrap().push(result);
drop(permit);
});
}

// Wait for remaining streams to finish
let _ = sem.acquire_many(opt.max_streams as u32).await.unwrap();

for result in results.lock().unwrap().drain(..) {
match result {
Ok((upload_result, download_result)) => {
stats.upload_stats.stream_finished(upload_result);
stats.download_stats.stream_finished(download_result);
}
Err(e) => {
if first_error.is_none() {
first_error = Some(e);
}
}
}
}

stats.upload_stats.total_duration = start.elapsed();
stats.download_stats.total_duration = start.elapsed();

// Explicit close of the connection, since handles can still be around due
// to `Arc`ing them
connection.close(0u32.into(), b"Benchmark done");

endpoint.wait_idle().await;

if opt.stats {
println!("\nClient connection stats:\n{:#?}", connection.stats());
}

match first_error {
None => Ok(stats),
Some(e) => Err(e),
}
}

async fn handle_client_stream(
connection: Arc<quinn::Connection>,
upload_size: u64,
read_unordered: bool,
) -> Result<(TransferResult, TransferResult)> {
let start = Instant::now();

let (mut send_stream, mut recv_stream) = connection
.open_bi()
.await
.context("failed to open stream")?;

send_data_on_stream(&mut send_stream, upload_size).await?;

let upload_result = TransferResult::new(start.elapsed(), upload_size);

let start = Instant::now();
let size = drain_stream(&mut recv_stream, read_unordered).await?;
let download_result = TransferResult::new(start.elapsed(), size as u64);

Ok((upload_result, download_result))
}

#[derive(Default)]
struct ClientStats {
upload_stats: Stats,
download_stats: Stats,
}

impl ClientStats {
pub fn print(&self, client_id: usize) {
println!();
println!("Client {} stats:", client_id);

if self.upload_stats.total_size != 0 {
self.upload_stats.print("upload");
}

if self.download_stats.total_size != 0 {
self.download_stats.print("download");
}
}
}
Loading