Skip to content

Commit fd98938

Browse files
authored
Merge pull request #28 from Clonkk/master
Async comment, proxy API
2 parents 32a470d + a3da58d commit fd98938

File tree

6 files changed

+341
-174
lines changed

6 files changed

+341
-174
lines changed

examples/ex06_pollermultipart.nim

+1-8
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,6 @@ import ../zmq
66
const address = "tcp://127.0.0.1:5559"
77
const max_msg = 10
88

9-
proc receiveMultipart(socket: ZSocket, flags: ZSendRecvOptions): seq[string] =
10-
# Little trick to receive all multipart message no matter how many parts there is using getsockopt
11-
var hasMore: int = 1
12-
while hasMore > 0:
13-
result.add(socket.receive())
14-
hasMore = getsockopt[int](socket, RCVMORE)
15-
169

1710
proc client() =
1811
var d1 = connect(address, mode = DEALER)
@@ -35,7 +28,7 @@ proc client() =
3528
if res > 0:
3629
for i in 0..<len(poller):
3730
if events(poller[i]):
38-
let buf = receiveMultipart(poller[i].socket, NOFLAGS)
31+
let buf = receiveAll(poller[i].socket, NOFLAGS)
3932
for j, msg in buf.pairs:
4033
echo &"CLIENT> Socket{i} received \"{msg}\""
4134
else:

tests/tzmq.nim

+177-121
Original file line numberDiff line numberDiff line change
@@ -1,134 +1,190 @@
11
import ../zmq
2-
import os
2+
import std/[unittest, os]
33

44
proc reqrep() =
5-
const sockaddr = "tcp://127.0.0.1:55001"
6-
let
7-
ping = "ping"
8-
pong = "pong"
9-
10-
var
11-
rep = listen(sockaddr, REP)
12-
req = connect(sockaddr, REQ)
13-
14-
req.send(ping)
15-
assert rep.receive() == ping
16-
rep.send(pong)
17-
assert req.receive() == pong
18-
19-
rep.close()
20-
req.close()
5+
test "reqrep":
6+
const sockaddr = "tcp://127.0.0.1:55001"
7+
let
8+
ping = "ping"
9+
pong = "pong"
10+
11+
var rep = listen(sockaddr, REP)
12+
defer: rep.close()
13+
var req = connect(sockaddr, REQ)
14+
defer: req.close()
15+
16+
block:
17+
req.send(ping)
18+
let r = rep.receive()
19+
check r == ping
20+
block:
21+
rep.send(pong)
22+
let r = req.receive()
23+
check r == pong
2124

2225
proc pubsub() =
23-
const sockaddr = "tcp://127.0.0.1:55001"
24-
let
25-
topic1 = "topic1"
26-
topic2 = "topic2"
27-
28-
var pub = listen(sockaddr, PUB)
29-
var
30-
broadcast = connect(sockaddr, SUB)
31-
sub1 = connect(sockaddr, SUB)
32-
sub2 = connect(sockaddr, SUB)
33-
# Subscribe to all topic
34-
broadcast.setsockopt(SUBSCRIBE, "")
35-
# Subscribe to topic
36-
sub1.setsockopt(SUBSCRIBE, topic1)
37-
sub2.setsockopt(SUBSCRIBE, topic2)
38-
39-
# Slow-joiner pattern -> PUB / SUB Pattern needs a bit of time to establish connection
40-
sleep(200)
41-
42-
# Topic1
43-
pub.send(topic1, SNDMORE)
44-
pub.send("content1")
45-
block alltopic:
46-
let topic = broadcast.receive()
47-
let msg = broadcast.receive()
48-
assert topic == topic1
49-
assert msg == "content1"
50-
block s1:
51-
let topic = sub1.receive()
52-
let msg = sub1.receive()
53-
assert topic == topic1
54-
assert msg == "content1"
55-
56-
# Topic2
57-
pub.send(topic2, SNDMORE)
58-
pub.send("content2")
59-
block alltopic:
60-
let topic = broadcast.receive()
61-
let msg = broadcast.receive()
62-
assert topic == topic2
63-
assert msg == "content2"
64-
block s2:
65-
let topic = sub2.receive()
66-
let msg = sub2.receive()
67-
assert topic == topic2
68-
assert msg == "content2"
69-
70-
# Broadcast
71-
pub.send("", SNDMORE)
72-
pub.send("content3")
73-
block alltopic:
74-
let topic = broadcast.receive()
75-
let msg = broadcast.receive()
76-
assert topic == ""
77-
assert msg == "content3"
78-
79-
sub1.close()
80-
sub2.close()
81-
broadcast.close()
82-
pub.close()
26+
test "pubsub":
27+
const sockaddr = "tcp://127.0.0.1:55001"
28+
let
29+
topic1 = "topic1"
30+
topic2 = "topic2"
31+
32+
var pub = listen(sockaddr, PUB)
33+
defer: pub.close()
34+
var broadcast = connect(sockaddr, SUB)
35+
defer: broadcast.close()
36+
var sub1 = connect(sockaddr, SUB)
37+
defer: sub1.close()
38+
var sub2 = connect(sockaddr, SUB)
39+
defer: sub2.close()
40+
# Subscribe to all topic
41+
broadcast.setsockopt(SUBSCRIBE, "")
42+
# Subscribe to topic
43+
sub1.setsockopt(SUBSCRIBE, topic1)
44+
sub2.setsockopt(SUBSCRIBE, topic2)
45+
46+
# Slow-joiner pattern -> PUB / SUB Pattern needs a bit of time to establish connection
47+
sleep(200)
48+
49+
# Topic1
50+
pub.send(topic1, SNDMORE)
51+
pub.send("content1")
52+
block alltopic:
53+
let topic = broadcast.receive()
54+
let msg = broadcast.receive()
55+
check topic == topic1
56+
check msg == "content1"
57+
block s1:
58+
let topic = sub1.receive()
59+
let msg = sub1.receive()
60+
check topic == topic1
61+
check msg == "content1"
62+
63+
# Topic2
64+
pub.send(topic2, SNDMORE)
65+
pub.send("content2")
66+
block alltopic:
67+
let topic = broadcast.receive()
68+
let msg = broadcast.receive()
69+
check topic == topic2
70+
check msg == "content2"
71+
block s2:
72+
let topic = sub2.receive()
73+
let msg = sub2.receive()
74+
check topic == topic2
75+
check msg == "content2"
76+
77+
# Broadcast
78+
pub.send("", SNDMORE)
79+
pub.send("content3")
80+
block alltopic:
81+
let topic = broadcast.receive()
82+
let msg = broadcast.receive()
83+
check topic == ""
84+
check msg == "content3"
8385

8486
proc routerdealer() =
85-
const sockaddr = "tcp://127.0.0.1:55001"
86-
var router = listen(sockaddr, mode = ROUTER)
87-
var dealer = connect(sockaddr, mode = DEALER)
88-
89-
let payload = "payload"
90-
# Dealer send a message to router
91-
dealer.send(payload)
92-
# Remove "envelope" of router / dealer
93-
let dealerSocketId = router.receive()
94-
let msg = router.receive()
95-
assert msg == payload
96-
# Reply to the Dealer
97-
router.send(dealerSocketId, SNDMORE)
98-
router.send(payload)
99-
assert dealer.receive() == payload
87+
test "routerdealer":
88+
const sockaddr = "tcp://127.0.0.1:55001"
89+
var router = listen(sockaddr, mode = ROUTER)
90+
router.setsockopt(RCVTIMEO, 350.cint)
91+
defer: router.close()
92+
var dealer = connect(sockaddr, mode = DEALER)
93+
defer: dealer.close()
94+
95+
let payload = "payload"
96+
# Dealer send a message to router
97+
dealer.send(payload)
98+
# Remove "envelope" of router / dealer
99+
let dealerSocketId = router.receive()
100+
let msg = router.receive()
101+
check msg == payload
102+
# Reply to the Dealer
103+
router.send(dealerSocketId, SNDMORE)
104+
router.send(payload)
105+
check dealer.receive() == payload
106+
# Let receive timeout
107+
block:
108+
# On receive return empty message
109+
let recv = router.receive()
110+
check recv == ""
111+
block:
112+
# On try receive, check flag is flase
113+
let recv = router.tryReceive()
114+
check recv.msgAvailable == false
100115

101116
proc inproc_sharectx() =
102-
# AFAIK, inproc only works for Linux
103-
when defined(linux):
104-
# Check sharing context works for inproc
117+
test "inproc":
118+
# AFAIK, inproc only works for Linux
119+
when defined(linux):
120+
# Check sharing context works for inproc
121+
let
122+
inprocpath = getTempDir() / "nimzmq"
123+
sockaddr = "inproc://" & inprocpath
124+
var
125+
server = listen(sockaddr, PAIR)
126+
client = connect(sockaddr, PAIR, server.context)
127+
128+
client.send("Hello")
129+
check server.receive() == "Hello"
130+
server.send("World")
131+
check client.receive() == "World"
132+
133+
client.close()
134+
server.close()
135+
136+
else:
137+
discard
138+
139+
proc pairpair() =
140+
test "pairpair_sndmore":
141+
const sockaddr = "tcp://127.0.0.1:55001"
105142
let
106-
inprocpath = getTempDir() / "nimzmq"
107-
sockaddr = "inproc://" & inprocpath
108-
var
109-
server = listen(sockaddr, PAIR)
110-
client = connect(sockaddr, PAIR, server.context)
111-
112-
client.send("Hello")
113-
assert server.receive() == "Hello"
114-
server.send("World")
115-
assert client.receive() == "World"
116-
117-
client.close()
118-
server.close()
119-
120-
else:
121-
discard
143+
ping = "ping"
144+
pong = "pong"
145+
146+
var pairs = @[listen(sockaddr, PAIR), connect(sockaddr, PAIR)]
147+
pairs[1].setsockopt(RCVTIMEO, 500.cint)
148+
block:
149+
pairs[0].send(ping, SNDMORE)
150+
pairs[0].send(ping, SNDMORE)
151+
pairs[0].send(ping)
152+
153+
block:
154+
let content = pairs[1].tryReceive()
155+
check content.msgAvailable
156+
check content.moreAvailable
157+
check content.msg == ping
158+
159+
block:
160+
let content = pairs[1].tryReceive()
161+
check content.msgAvailable
162+
check content.moreAvailable
163+
check content.msg == ping
164+
165+
block:
166+
let content = pairs[1].tryReceive()
167+
check content.msgAvailable
168+
check (not content.moreAvailable)
169+
check content.msg == ping
170+
171+
block:
172+
let content = pairs[1].tryReceive()
173+
check (not content.msgAvailable)
174+
175+
block:
176+
let msgs = [pong, pong, pong]
177+
pairs[1].sendAll(msgs)
178+
let contents = pairs[0].receiveAll()
179+
check contents == msgs
180+
181+
for p in pairs.mitems:
182+
p.close()
122183

123184
when isMainModule:
124-
block reqrep:
125-
reqrep()
126-
block pubsub:
127-
pubsub()
128-
block inproc:
129-
inproc_sharectx()
130-
block routerdealer:
131-
routerdealer()
132-
block poller:
133-
discard
185+
reqrep()
186+
pubsub()
187+
inproc_sharectx()
188+
routerdealer()
189+
pairpair()
134190

zmq.nimble

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Package
22

3-
version = "1.1.0"
3+
version = "1.2.0"
44
author = "Andreas Rumpf"
55
description = "ZeroMQ wrapper"
66
license = "MIT"

zmq/asynczmq.nim

+11-2
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,15 @@ import ./bindings
44

55
proc receiveAsync*(conn: ZConnection): Future[string] =
66
## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run.
7+
## `receiveAsync()` allows other async tasks to run in those cases.
8+
##
9+
## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket
10+
##
11+
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
712
let fut = newFuture[string]("receiveAsync")
813
result = fut
914

10-
proc cb(fd: AsyncFD): bool {.closure,gcsafe.} =
15+
proc cb(fd: AsyncFD): bool {.closure, gcsafe.} =
1116
result = true
1217

1318
# ignore if already finished
@@ -33,13 +38,17 @@ proc receiveAsync*(conn: ZConnection): Future[string] =
3338
proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWAIT): Future[void] =
3439
## `send()` is blocking for some connection types (e.g. PUSH, DEALER).
3540
## `sendAsync()` allows other async tasks to run in those cases.
41+
##
42+
## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket
43+
##
44+
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
3645
let fut = newFuture[void]("sendAsync")
3746
result = fut
3847

3948
let status = getsockopt[cint](conn, ZSockOptions.EVENTS)
4049
if (status and ZMQ_POLLOUT) == 0:
4150
# wait until queue available
42-
proc cb(fd: AsyncFD): bool {.closure,gcsafe.} =
51+
proc cb(fd: AsyncFD): bool {.closure, gcsafe.} =
4352
result = true
4453

4554
# ignore if already finished

0 commit comments

Comments
 (0)