Skip to content

Commit 2c5e7a5

Browse files
authored
Merge pull request #40 from nim-lang/feat_non_block_and_nim2
Feat non block and nim2
2 parents 06380aa + 073831a commit 2c5e7a5

File tree

8 files changed

+324
-37
lines changed

8 files changed

+324
-37
lines changed

tests/async_demo.nim

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import ../zmq
2+
import std/[asyncdispatch]
3+
4+
proc asyncpoll() =
5+
# test "asyncZPoller":
6+
block:
7+
const zaddr = "tcp://127.0.0.1:15571"
8+
const zaddr2 = "tcp://127.0.0.1:15572"
9+
var pusher = listen(zaddr, PUSH)
10+
var puller = connect(zaddr, PULL)
11+
12+
var pusher2 = listen(zaddr2, PUSH)
13+
var puller2 = connect(zaddr2, PULL)
14+
var poller: AsyncZPoller
15+
16+
var i = 0
17+
# Register the callback
18+
# assert message received are correct (should be even integer in string format)
19+
var msglist = @["0", "2", "4", "6", "8"]
20+
var msgCount = 0
21+
poller.register(
22+
puller2,
23+
ZMQ_POLLIN,
24+
proc(x: ZSocket) =
25+
let msg = x.receive()
26+
inc(msgCount)
27+
if msglist.contains(msg):
28+
msglist.delete(0)
29+
assert true
30+
else:
31+
assert false
32+
)
33+
# assert message received are correct (should be even integer in string format)
34+
var msglist2 = @["0", "2", "4", "6", "8"]
35+
var msgCount2 = 0
36+
poller.register(
37+
puller,
38+
ZMQ_POLLIN,
39+
proc(x: ZSocket) =
40+
let msg = x.receive()
41+
inc(msgCount2)
42+
if msglist2.contains(msg):
43+
msglist2.delete(0)
44+
assert true
45+
else:
46+
assert false
47+
)
48+
49+
let
50+
N = 10
51+
N_MAX_TIMEOUT = 5
52+
53+
var sndCount = 0
54+
# A client send some message
55+
for i in 0..<N:
56+
if (i mod 2) == 0:
57+
# Can periodically send stuff
58+
pusher.send($i)
59+
pusher2.send($i)
60+
inc(sndCount)
61+
62+
# N_MAX_TIMEOUT is the number of time the poller can timeout before exiting the loop
63+
while i < N_MAX_TIMEOUT:
64+
65+
# I don't recommend a high timeout because it's going to poll for the duration if there is no message in queue
66+
var fut = poller.pollAsync(1)
67+
let r = waitFor fut
68+
if r < 0:
69+
break # error case
70+
elif r == 0:
71+
inc(i)
72+
73+
# No longer polling but some callback may not have finished
74+
while hasPendingOperations():
75+
drain()
76+
77+
assert msgCount == msgCount2
78+
assert msgCount == sndCount
79+
80+
pusher.close()
81+
puller.close()
82+
pusher2.close()
83+
puller2.close()
84+
85+
when isMainModule:
86+
asyncpoll()

tests/tdestruc.nim

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import ../zmq
2+
import std/unittest
3+
4+
proc sockHandler(req, rep: ZConnection, pong: string) =
5+
req.send(pong)
6+
let r = rep.receive()
7+
check r == pong
8+
9+
proc testDestroy() =
10+
const sockaddr = "tcp://127.0.0.1:55001"
11+
12+
test "Destroy & Copy":
13+
let
14+
ping = "ping"
15+
pong = "pong"
16+
17+
var rep = listen(sockaddr, REP)
18+
var req = connect(sockaddr, REQ)
19+
20+
sockHandler(req, rep, ping)
21+
sockHandler(rep, req, pong)
22+
23+
block:
24+
var req2 = req
25+
req2.send(ping)
26+
let r = rep.receive()
27+
check r == ping
28+
29+
rep.send(pong)
30+
block:
31+
var req2 = req
32+
let r = req2.receive()
33+
check r == pong
34+
35+
when isMainModule:
36+
testDestroy()
37+

tests/tzmq.nim

+59-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import ../zmq
2-
import std/[unittest, os]
2+
import std/[unittest, os, times, monotimes]
33
import std/[asyncdispatch, asyncfutures]
44

55
proc reqrep() =
@@ -88,7 +88,8 @@ proc routerdealer() =
8888
test "routerdealer":
8989
const sockaddr = "tcp://127.0.0.1:55001"
9090
var router = listen(sockaddr, mode = ROUTER)
91-
router.setsockopt(RCVTIMEO, 350.cint)
91+
router.setsockopt(RCVTIMEO, 500.cint)
92+
9293
defer: router.close()
9394
var dealer = connect(sockaddr, mode = DEALER)
9495
defer: dealer.close()
@@ -106,12 +107,23 @@ proc routerdealer() =
106107
check dealer.receive() == payload
107108
# Let receive timeout
108109
block:
110+
let start = getMonoTime()
109111
# On receive return empty message
110-
let recv = router.receive()
112+
let
113+
recv = router.receive()
114+
stop = getMonoTime()
115+
elapsed = stop - start
116+
check (elapsed - initDuration(milliseconds=500)) < initDuration(milliseconds=1)
111117
check recv == ""
118+
112119
block:
113120
# On try receive, check flag is flase
114-
let recv = router.tryReceive()
121+
let
122+
start = getMonoTime()
123+
recv = router.waitForReceive(350)
124+
stop = getMonoTime()
125+
elapsed = stop - start
126+
check (elapsed - initDuration(milliseconds=350)) < initDuration(milliseconds=1)
115127
check recv.msgAvailable == false
116128

117129
proc inproc_sharectx() =
@@ -146,31 +158,32 @@ proc pairpair() =
146158

147159
var pairs = @[listen(sockaddr, PAIR), connect(sockaddr, PAIR)]
148160
pairs[1].setsockopt(RCVTIMEO, 500.cint)
161+
149162
block:
150163
pairs[0].send(ping, SNDMORE)
151164
pairs[0].send(ping, SNDMORE)
152165
pairs[0].send(ping)
153166

154167
block:
155-
let content = pairs[1].tryReceive()
168+
let content = pairs[1].waitForReceive()
156169
check content.msgAvailable
157170
check content.moreAvailable
158171
check content.msg == ping
159172

160173
block:
161-
let content = pairs[1].tryReceive()
174+
let content = pairs[1].waitForReceive()
162175
check content.msgAvailable
163176
check content.moreAvailable
164177
check content.msg == ping
165178

166179
block:
167-
let content = pairs[1].tryReceive()
180+
let content = pairs[1].waitForReceive()
168181
check content.msgAvailable
169182
check (not content.moreAvailable)
170183
check content.msg == ping
171184

172185
block:
173-
let content = pairs[1].tryReceive()
186+
let content = pairs[1].waitForReceive()
174187
check (not content.msgAvailable)
175188

176189
block:
@@ -300,6 +313,43 @@ proc async_pub_sub() =
300313
test "async pub_sub":
301314
check count == N_mSGS
302315

316+
proc non_blocking_recv() =
317+
const sockaddr = "tcp://127.0.0.1:55001"
318+
test "non-blocking receive":
319+
var router = listen(sockaddr, mode = ROUTER)
320+
let res = router.tryReceive()
321+
check res == (false, false, "")
322+
323+
var dealer = connect(sockaddr, mode = DEALER)
324+
let payload = "payload"
325+
block:
326+
# Dealer send a message to router
327+
dealer.send(payload)
328+
329+
block:
330+
# Remove "envelope" of router / dealer
331+
let dealerSocketId = router.receive()
332+
let res = router.waitForReceive(250)
333+
check res.msgAvailable
334+
check not res.moreAvailable
335+
check res.msg == payload
336+
337+
block:
338+
# Remove "envelope" of router / dealer
339+
let
340+
start = getMonoTime()
341+
res = router.waitForReceive(250)
342+
stop = getMonoTime()
343+
elapsed = stop - start
344+
check (elapsed - initDuration(milliseconds=250)) < initDuration(milliseconds=1)
345+
346+
check not res.msgAvailable
347+
check not res.moreAvailable
348+
check res.msg == ""
349+
350+
router.close(250)
351+
dealer.close(250)
352+
303353
when isMainModule:
304354
reqrep()
305355
pubsub()
@@ -308,3 +358,4 @@ when isMainModule:
308358
pairpair()
309359
async_pub_sub()
310360
asyncpoll()
361+
non_blocking_recv()

zmq.nimble

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
# Package
22

3-
version = "1.4.0"
3+
version = "1.5.0"
44
author = "Andreas Rumpf"
55
description = "ZeroMQ wrapper"
66
license = "MIT"
77

88
# Dependencies
9-
requires "nim >= 0.18.0"
9+
requires "nim >= 1.4.0"
1010

1111
task buildexamples, "Compile all examples":
1212
withDir "examples":
@@ -17,5 +17,5 @@ task buildexamples, "Compile all examples":
1717
selfExec("cpp --mm:orc -d:release " & fstr)
1818

1919
task gendoc, "Generate documentation":
20-
exec("nimble doc --project zmq.nim --out:docs/")
20+
exec("nim doc --mm:orc --project --out:docs/ zmq.nim")
2121

zmq/asynczmq.nim

+26-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import std/[asyncdispatch]
1+
import std/[asyncdispatch, selectors]
22
import ./connections
33
import ./bindings
44
import ./poller
@@ -14,9 +14,31 @@ type
1414
proc len*(poller: AsyncZPoller): int =
1515
result = poller.zpoll.len()
1616

17-
proc `=destroy`*(obj: var AsyncZPoller) =
18-
if hasPendingOperations():
19-
drain(500)
17+
proc waitAll(obj: AsyncZPoller) {.raises: [].} =
18+
# Is there a more elegant to do this ?
19+
# We want a helper function that will not raises to avoid excpetion in =destroy hooks
20+
21+
try:
22+
23+
while hasPendingOperations():
24+
drain(500)
25+
26+
except ValueError:
27+
discard
28+
29+
except OSError:
30+
discard
31+
32+
except ref IOSelectorsException:
33+
discard
34+
35+
except Exception:
36+
discard
37+
38+
proc `=destroy`*(obj: AsyncZPoller) =
39+
obj.waitAll()
40+
`=destroy`(obj.cb)
41+
`=destroy`(obj.zpoll)
2042

2143
proc register*(poller: var AsyncZPoller, sock: ZSocket, event: int, cb: AsyncZPollCB) =
2244
## Register ZSocket function

zmq/bindings.nim

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
{.deadCodeElim: on.}
21
when defined(windows):
32
const
43
zmqdll* = "(lib|)zmq.dll"

0 commit comments

Comments
 (0)