Skip to content

Commit b87c5b7

Browse files
committed
Add executor to zimscraperlib
1 parent ea6505f commit b87c5b7

File tree

2 files changed

+459
-0
lines changed

2 files changed

+459
-0
lines changed

src/zimscraperlib/executor.py

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
import datetime
2+
import queue
3+
import threading
4+
from collections.abc import Callable
5+
6+
from zimscraperlib import logger
7+
8+
# Set to True when the interpreter is shutting down; workers check it (via
# submit) so no new task is accepted past that point.
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while reading/mutating _shutdown in submit().
_global_shutdown_lock = threading.Lock()
12+
13+
14+
def excepthook(args):  # pragma: no cover
    """Log exceptions that escape a thread, instead of printing to stderr.

    Installed as `threading.excepthook` below; receives a
    `threading.ExceptHookArgs` with `thread`, `exc_type` and `exc_value`.
    """
    logger.error(f"UNHANDLED Exception in {args.thread.name}: {args.exc_type}")
    logger.exception(args.exc_value)


# route unhandled thread exceptions through our logger
threading.excepthook = excepthook
20+
21+
22+
class ScraperExecutor(queue.Queue):
    """Custom FIFO queue based Executor that's less generic than ThreadPoolExec one

    Providing more flexibility for the use cases we're interested about:
    - halt immediately (sort of) upon exception (if requested)
    - able to join() then restart later to accommodate successive steps

    See: https://github.com/python/cpython/blob/3.8/Lib/concurrent/futures/thread.py
    """

    def __init__(
        self,
        queue_size: int = 10,
        nb_workers: int = 1,
        executor_name: str = "executor",
        thread_deadline_sec: int = 60,
    ):
        """Create the executor (workers are only spawned by start()).

        queue_size: max number of pending tasks before submit() blocks
        nb_workers: number of worker threads started by start()
        executor_name: prefix used for worker thread names and log messages
        thread_deadline_sec: max seconds join() waits for each worker
        """
        super().__init__(queue_size)
        self.executor_name = executor_name
        self._shutdown_lock = threading.Lock()
        self.nb_workers = nb_workers
        self.exceptions: list[Exception] = []
        self.thread_deadline_sec = thread_deadline_sec
        # executor is considered "dead" until start() is called, so that
        # submit() before start() raises RuntimeError (not AttributeError)
        self._shutdown = True
        self.no_more = False
        self._workers: set[threading.Thread] = set()

    @property
    def exception(self):
        """First exception raised in any thread, if any (None otherwise)"""
        return self.exceptions[0] if self.exceptions else None

    @property
    def alive(self):
        """whether it should continue running"""
        return not self._shutdown

    def submit(self, task: Callable, **kwargs):
        """Submit a callable and its kwargs for execution in one of the workers

        Special kwargs consumed by the worker (not passed to the task):
        - raises: record the task's exception and shut down the executor
        - callback: callable invoked after each task completes
        - dont_release: don't task_done() automatically (caller will)

        Raises RuntimeError if the executor is dead, joined, or the
        interpreter is shutting down.
        """
        with self._shutdown_lock, _global_shutdown_lock:
            if not self.alive:
                raise RuntimeError("cannot submit task to dead executor")
            if self.no_more:
                raise RuntimeError(
                    "cannot submit task to a joined executor, restart it first"
                )
            if _shutdown:
                raise RuntimeError(  # pragma: no cover
                    "cannot submit task after interpreter shutdown"
                )

        # block with a timeout so we can notice a concurrent join() and bail out
        while True:
            try:
                self.put((task, kwargs), block=True, timeout=3.0)
            except queue.Full:
                if self.no_more:
                    # rarely happens except if submit and join are done in different
                    # threads, but we need this to escape the while loop
                    break  # pragma: no cover
            else:
                break

    def start(self):
        """Enable executor, starting requested amount of workers

        Workers are started always, not provisioned dynamically"""
        self.drain()
        self._workers = set()
        self.no_more = False
        self._shutdown = False
        self.exceptions[:] = []

        for n in range(self.nb_workers):
            t = threading.Thread(target=self.worker, name=f"{self.executor_name}-{n}")
            t.daemon = True
            t.start()
            self._workers.add(t)

    def worker(self):
        """Worker loop: consume tasks from the queue until shutdown or join"""
        while self.alive or self.no_more:
            try:
                func, kwargs = self.get(block=True, timeout=2.0)
            except queue.Empty:
                if self.no_more:
                    break
                continue
            except TypeError:  # pragma: no cover
                # received None from the queue. most likely shuting down
                return

            # executor-level options; must not reach the task itself
            raises = kwargs.pop("raises", False)
            callback = kwargs.pop("callback", None)
            dont_release = kwargs.pop("dont_release", False)

            try:
                func(**kwargs)
            except Exception as exc:
                logger.error(f"Error processing {func} with {kwargs=}")
                logger.exception(exc)
                if raises:  # to cover when raises = False
                    self.exceptions.append(exc)
                    self.shutdown()
            finally:
                # user will manually release the queue for this task.
                # most likely in a libzim-written callback
                if not dont_release:
                    self.task_done()
                if callback:
                    callback()

    def drain(self):
        """Empty the queue without processing the tasks (tasks will be lost)"""
        while True:
            try:
                self.get_nowait()
            except queue.Empty:
                break

    def join(self):
        """Await completion of workers, requesting them to stop taking new task"""
        logger.debug(f"joining all threads for {self.executor_name}")
        self.no_more = True
        for t in self._workers:
            deadline = datetime.datetime.now(tz=datetime.UTC) + datetime.timedelta(
                seconds=self.thread_deadline_sec
            )
            logger.debug(f"Giving {t.name} {self.thread_deadline_sec}s to join")
            # join with a short timeout so we can re-check the deadline;
            # t.join() itself blocks, no extra sleep is needed
            while t.is_alive() and datetime.datetime.now(tz=datetime.UTC) < deadline:
                t.join(1)
            if t.is_alive():
                logger.debug(f"Thread {t.name} is not joining. Skipping…")
            else:
                logger.debug(f"Thread {t.name} joined")
        logger.debug(f"all threads joined for {self.executor_name}")

    def shutdown(self, *, wait=True):
        """stop the executor, either somewhat immediately or awaiting completion"""
        logger.debug(f"shutting down {self.executor_name} with {wait=}")
        with self._shutdown_lock:
            self._shutdown = True

        # Drain all work items from the queue
        if not wait:
            self.drain()
        if wait:
            self.join()

0 commit comments

Comments
 (0)