Skip to content

Commit 2253a0a

Browse files
author
Michael Eden
authored
Merge pull request #111 from illegalprime/feat/tokio
Add tokio support.
2 parents 5c534de + b51db78 commit 2253a0a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+4136
-1525
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
/target
22
/Cargo.lock
33

4+
# emacs
5+
*.#*.rs
6+
47
# Windows image file caches
58
Thumbs.db
69
ehthumbs.db

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ before_script:
77

88
script:
99
- cargo fmt -- --write-mode=diff
10-
- cargo build --features nightly
10+
- ./scripts/build-all.sh
1111
- cargo test --features nightly
1212
- cargo bench --features nightly
1313

Cargo.toml

+18-13
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,24 @@ license = "MIT"
1818

1919
[dependencies]
2020
hyper = "^0.10.6"
21-
unicase = "^1.0"
22-
url = "^1.0"
23-
bitflags = "^0.8"
24-
rand = "^0.3"
25-
byteorder = "^1.0"
26-
sha1 = "^0.2"
27-
openssl = { version = "^0.9.10", optional = true }
28-
base64 = "^0.5"
29-
30-
[dev-dependencies]
31-
serde_json = "^1.0"
21+
unicase = "1.0"
22+
url = "1.0"
23+
bitflags = "0.8"
24+
rand = "0.3"
25+
byteorder = "1.0"
26+
sha1 = "0.2"
27+
base64 = "0.5"
28+
futures = { version = "0.1", optional = true }
29+
tokio-core = { version = "0.1", optional = true }
30+
tokio-io = { version = "^0.1.2", optional = true }
31+
tokio-tls = { version = "0.1", optional = true }
32+
bytes = { version = "0.4", optional = true }
33+
native-tls = { version = "^0.1.2", optional = true }
3234

3335
[features]
34-
default = ["ssl"]
35-
ssl = ["openssl"]
36+
default = ["sync", "sync-ssl", "async", "async-ssl"]
37+
sync = []
38+
sync-ssl = ["native-tls", "sync"]
39+
async = ["tokio-core", "tokio-io", "bytes", "futures"]
40+
async-ssl = ["native-tls", "tokio-tls", "async"]
3641
nightly = ["hyper/nightly"]

autobahn/client-results.json

+6-6
Original file line numberDiff line numberDiff line change
@@ -1723,7 +1723,7 @@
17231723
"reportfile": "rust_websocket_case_3_2.json"
17241724
},
17251725
"3.3": {
1726-
"behavior": "NON-STRICT",
1726+
"behavior": "OK",
17271727
"behaviorClose": "OK",
17281728
"duration": 2,
17291729
"remoteCloseCode": null,
@@ -2773,28 +2773,28 @@
27732773
"reportfile": "rust_websocket_case_6_3_2.json"
27742774
},
27752775
"6.4.1": {
2776-
"behavior": "NON-STRICT",
2776+
"behavior": "OK",
27772777
"behaviorClose": "OK",
27782778
"duration": 2002,
27792779
"remoteCloseCode": null,
27802780
"reportfile": "rust_websocket_case_6_4_1.json"
27812781
},
27822782
"6.4.2": {
2783-
"behavior": "NON-STRICT",
2783+
"behavior": "OK",
27842784
"behaviorClose": "OK",
27852785
"duration": 2002,
27862786
"remoteCloseCode": null,
27872787
"reportfile": "rust_websocket_case_6_4_2.json"
27882788
},
27892789
"6.4.3": {
2790-
"behavior": "NON-STRICT",
2790+
"behavior": "OK",
27912791
"behaviorClose": "OK",
27922792
"duration": 2003,
27932793
"remoteCloseCode": null,
27942794
"reportfile": "rust_websocket_case_6_4_3.json"
27952795
},
27962796
"6.4.4": {
2797-
"behavior": "NON-STRICT",
2797+
"behavior": "OK",
27982798
"behaviorClose": "OK",
27992799
"duration": 2002,
28002800
"remoteCloseCode": null,
@@ -3634,4 +3634,4 @@
36343634
"reportfile": "rust_websocket_case_9_8_6.json"
36353635
}
36363636
}
3637-
}
3637+
}

autobahn/server-results.json

+5-5
Original file line numberDiff line numberDiff line change
@@ -2773,28 +2773,28 @@
27732773
"reportfile": "rust_websocket_case_6_3_2.json"
27742774
},
27752775
"6.4.1": {
2776-
"behavior": "NON-STRICT",
2776+
"behavior": "OK",
27772777
"behaviorClose": "OK",
27782778
"duration": 2002,
27792779
"remoteCloseCode": null,
27802780
"reportfile": "rust_websocket_case_6_4_1.json"
27812781
},
27822782
"6.4.2": {
2783-
"behavior": "NON-STRICT",
2783+
"behavior": "OK",
27842784
"behaviorClose": "OK",
27852785
"duration": 2001,
27862786
"remoteCloseCode": null,
27872787
"reportfile": "rust_websocket_case_6_4_2.json"
27882788
},
27892789
"6.4.3": {
2790-
"behavior": "NON-STRICT",
2790+
"behavior": "OK",
27912791
"behaviorClose": "OK",
27922792
"duration": 2003,
27932793
"remoteCloseCode": null,
27942794
"reportfile": "rust_websocket_case_6_4_3.json"
27952795
},
27962796
"6.4.4": {
2797-
"behavior": "NON-STRICT",
2797+
"behavior": "OK",
27982798
"behaviorClose": "OK",
27992799
"duration": 2003,
28002800
"remoteCloseCode": null,
@@ -3634,4 +3634,4 @@
36343634
"reportfile": "rust_websocket_case_9_8_6.json"
36353635
}
36363636
}
3637-
}
3637+
}

examples/async-autobahn-client.rs

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
extern crate websocket;
2+
extern crate tokio_core;
3+
extern crate futures;
4+
5+
use websocket::{ClientBuilder, OwnedMessage};
6+
use websocket::result::WebSocketError;
7+
use tokio_core::reactor::Core;
8+
use futures::sink::Sink;
9+
use futures::stream::Stream;
10+
use futures::Future;
11+
use futures::future::{self, Loop};
12+
13+
fn main() {
14+
let addr = "ws://127.0.0.1:9001".to_string();
15+
let agent = "rust-websocket";
16+
let mut core = Core::new().unwrap();
17+
let handle = core.handle();
18+
19+
println!("Using fuzzingserver {}", addr);
20+
println!("Using agent {}", agent);
21+
22+
let case_count = get_case_count(addr.clone(), &mut core);
23+
println!("We will be running {} test cases!", case_count);
24+
25+
println!("Running test suite...");
26+
for case_id in 1..(case_count + 1) {
27+
let url = addr.clone() + "/runCase?case=" + &case_id.to_string()[..] + "&agent=" + agent;
28+
29+
let test_case = ClientBuilder::new(&url)
30+
.unwrap()
31+
.async_connect_insecure(&handle)
32+
.and_then(move |(duplex, _)| {
33+
println!("Executing test case: {}/{}", case_id, case_count);
34+
future::loop_fn(duplex, |stream| {
35+
stream.into_future()
36+
.or_else(|(err, stream)| {
37+
println!("Could not receive message: {:?}", err);
38+
stream.send(OwnedMessage::Close(None)).map(|s| (None, s))
39+
})
40+
.and_then(|(msg, stream)| match msg {
41+
Some(OwnedMessage::Text(txt)) => {
42+
stream.send(OwnedMessage::Text(txt))
43+
.map(|s| Loop::Continue(s))
44+
.boxed()
45+
}
46+
Some(OwnedMessage::Binary(bin)) => {
47+
stream.send(OwnedMessage::Binary(bin))
48+
.map(|s| Loop::Continue(s))
49+
.boxed()
50+
}
51+
Some(OwnedMessage::Ping(data)) => {
52+
stream.send(OwnedMessage::Pong(data))
53+
.map(|s| Loop::Continue(s))
54+
.boxed()
55+
}
56+
Some(OwnedMessage::Close(_)) => {
57+
stream.send(OwnedMessage::Close(None))
58+
.map(|_| Loop::Break(()))
59+
.boxed()
60+
}
61+
Some(OwnedMessage::Pong(_)) => {
62+
future::ok(Loop::Continue(stream)).boxed()
63+
}
64+
None => future::ok(Loop::Break(())).boxed(),
65+
})
66+
})
67+
})
68+
.map(move |_| {
69+
println!("Test case {} is finished!", case_id);
70+
})
71+
.or_else(move |err| {
72+
println!("Test case {} ended with an error: {:?}", case_id, err);
73+
Ok(()) as Result<(), ()>
74+
});
75+
76+
core.run(test_case).ok();
77+
}
78+
79+
update_reports(addr.clone(), agent, &mut core);
80+
println!("Test suite finished!");
81+
}
82+
83+
fn get_case_count(addr: String, core: &mut Core) -> usize {
84+
let url = addr + "/getCaseCount";
85+
let err = "Unsupported message in /getCaseCount";
86+
87+
let counter = ClientBuilder::new(&url)
88+
.unwrap()
89+
.async_connect_insecure(&core.handle())
90+
.and_then(|(s, _)| s.into_future().map_err(|e| e.0))
91+
.and_then(|(msg, _)| match msg {
92+
Some(OwnedMessage::Text(txt)) => Ok(txt.parse().unwrap()),
93+
_ => Err(WebSocketError::ProtocolError(err)),
94+
});
95+
core.run(counter).unwrap()
96+
}
97+
98+
fn update_reports(addr: String, agent: &str, core: &mut Core) {
99+
println!("Updating reports...");
100+
let url = addr + "/updateReports?agent=" + agent;
101+
102+
let updater = ClientBuilder::new(&url)
103+
.unwrap()
104+
.async_connect_insecure(&core.handle())
105+
.and_then(|(sink, _)| sink.send(OwnedMessage::Close(None)));
106+
core.run(updater).unwrap();
107+
108+
println!("Reports updated.");
109+
}

examples/async-autobahn-server.rs

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
extern crate websocket;
2+
extern crate futures;
3+
extern crate tokio_core;
4+
5+
use websocket::message::OwnedMessage;
6+
use websocket::server::InvalidConnection;
7+
use websocket::async::Server;
8+
9+
use tokio_core::reactor::Core;
10+
use futures::{Future, Sink, Stream};
11+
12+
fn main() {
13+
let mut core = Core::new().unwrap();
14+
let handle = core.handle();
15+
// bind to the server
16+
let server = Server::bind("127.0.0.1:9002", &handle).unwrap();
17+
18+
// time to build the server's future
19+
// this will be a struct containing everything the server is going to do
20+
21+
// a stream of incoming connections
22+
let f = server.incoming()
23+
// we don't wanna save the stream if it drops
24+
.map_err(|InvalidConnection { error, .. }| error)
25+
.for_each(|(upgrade, addr)| {
26+
// accept the request to be a ws connection
27+
println!("Got a connection from: {}", addr);
28+
let f = upgrade
29+
.accept()
30+
.and_then(|(s, _)| {
31+
// simple echo server impl
32+
let (sink, stream) = s.split();
33+
stream
34+
.take_while(|m| Ok(!m.is_close()))
35+
.filter_map(|m| {
36+
match m {
37+
OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
38+
OwnedMessage::Pong(_) => None,
39+
_ => Some(m),
40+
}
41+
})
42+
.forward(sink)
43+
.and_then(|(_, sink)| {
44+
sink.send(OwnedMessage::Close(None))
45+
})
46+
});
47+
48+
handle.spawn(f.map_err(move |e| println!("{}: '{:?}'", addr, e))
49+
.map(move |_| println!("{} closed.", addr)));
50+
Ok(())
51+
});
52+
53+
core.run(f).unwrap();
54+
}

examples/async-client.rs

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
extern crate websocket;
2+
extern crate futures;
3+
extern crate tokio_core;
4+
5+
use std::thread;
6+
use std::io::stdin;
7+
use tokio_core::reactor::Core;
8+
use futures::future::Future;
9+
use futures::sink::Sink;
10+
use futures::stream::Stream;
11+
use futures::sync::mpsc;
12+
use websocket::result::WebSocketError;
13+
use websocket::{ClientBuilder, OwnedMessage};
14+
15+
const CONNECTION: &'static str = "ws://127.0.0.1:2794";
16+
17+
fn main() {
18+
println!("Connecting to {}", CONNECTION);
19+
let mut core = Core::new().unwrap();
20+
21+
// standard in isn't supported in mio yet, so we use a thread
22+
// see https://github.com/carllerche/mio/issues/321
23+
let (usr_msg, stdin_ch) = mpsc::channel(0);
24+
thread::spawn(move || {
25+
let mut input = String::new();
26+
let mut stdin_sink = usr_msg.wait();
27+
loop {
28+
input.clear();
29+
stdin().read_line(&mut input).unwrap();
30+
let trimmed = input.trim();
31+
32+
let (close, msg) = match trimmed {
33+
"/close" => (true, OwnedMessage::Close(None)),
34+
"/ping" => (false, OwnedMessage::Ping(b"PING".to_vec())),
35+
_ => (false, OwnedMessage::Text(trimmed.to_string())),
36+
};
37+
38+
stdin_sink.send(msg)
39+
.expect("Sending message across stdin channel.");
40+
41+
if close {
42+
break;
43+
}
44+
}
45+
});
46+
47+
let runner = ClientBuilder::new(CONNECTION)
48+
.unwrap()
49+
.add_protocol("rust-websocket")
50+
.async_connect_insecure(&core.handle())
51+
.and_then(|(duplex, _)| {
52+
let (sink, stream) = duplex.split();
53+
stream.filter_map(|message| {
54+
println!("Received Message: {:?}", message);
55+
match message {
56+
OwnedMessage::Close(e) => Some(OwnedMessage::Close(e)),
57+
OwnedMessage::Ping(d) => Some(OwnedMessage::Pong(d)),
58+
_ => None,
59+
}
60+
})
61+
.select(stdin_ch.map_err(|_| WebSocketError::NoDataAvailable))
62+
.forward(sink)
63+
});
64+
core.run(runner).unwrap();
65+
}

0 commit comments

Comments
 (0)