@@ -85,6 +85,8 @@ def start(self):
85
85
"""Enable executor, starting requested amount of workers
86
86
87
87
Workers are started always, not provisioned dynamically"""
88
+ logger .debug (f"Starting { self .executor_name } with { self .nb_workers } threads" )
89
+
88
90
self .drain ()
89
91
self ._workers : set [threading .Thread ] = set ()
90
92
self .no_more = False
@@ -139,26 +141,30 @@ def drain(self):
139
141
140
142
def join (self ):
141
143
"""Await completion of workers, requesting them to stop taking new task"""
142
- logger .debug (f"joining all threads for { self .executor_name } " )
144
+ logger .debug (
145
+ f"joining all threads for { self .executor_name } ; threads have "
146
+ f"{ self .thread_deadline_sec } s to join before we give-up waiting for them"
147
+ )
143
148
self .no_more = True
144
- for num , t in enumerate (self ._workers ):
145
- deadline = datetime .datetime .now (tz = datetime .UTC ) + datetime .timedelta (
146
- seconds = self .thread_deadline_sec
147
- )
148
- logger .debug (
149
- f"Giving { self .executor_name } -{ num } { self .thread_deadline_sec } s to join"
150
- )
149
+ deadline = datetime .datetime .now (tz = datetime .UTC ) + datetime .timedelta (
150
+ seconds = self .thread_deadline_sec
151
+ )
152
+ alive_threads = list (filter (lambda t : t .is_alive (), self ._workers ))
153
+ for t in filter (lambda t : t not in alive_threads , self ._workers ):
154
+ logger .debug (f"Thread { t .name } is already dead. Skipping…" )
155
+ while (
156
+ len (alive_threads ) > 0 and datetime .datetime .now (tz = datetime .UTC ) < deadline
157
+ ):
151
158
e = threading .Event ()
152
- while t .is_alive () and datetime .datetime .now (tz = datetime .UTC ) < deadline :
153
- t .join (1 )
154
- e .wait (timeout = 2 )
155
- if t .is_alive ():
156
- logger .debug (
157
- f"Thread { self .executor_name } -{ num } is not joining. Skipping…"
158
- )
159
- else :
160
- logger .debug (f"Thread { self .executor_name } -{ num } joined" )
161
- logger .debug (f"all threads joined for { self .executor_name } " )
159
+ for t in alive_threads :
160
+ t .join (0.1 ) # just indicate to the thread that we want to stop
161
+ e .wait (2 ) # wait a bit more to let things cool down
162
+ for t in filter (lambda t : not t .is_alive (), alive_threads ):
163
+ logger .debug (f"Thread { t .name } joined" )
164
+ alive_threads .remove (t )
165
+ for t in alive_threads :
166
+ logger .debug (f"Thread { t .name } never joined. Skipping…" )
167
+ logger .debug (f"join completed for { self .executor_name } " )
162
168
163
169
def shutdown (self , * , wait = True ):
164
170
"""stop the executor, either somewhat immediately or awaiting completion"""
0 commit comments