Skip to content

Commit a3eaf42

Browse files
committed
Fix problem with running multiple code interpreters at the same time
1 parent 911028d commit a3eaf42

File tree

2 files changed

+18
-24
lines changed

2 files changed

+18
-24
lines changed

python/e2b_code_interpreter/main.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ def __init__(
5252

5353

5454
class JupyterExtension:
55-
_default_kernel_id: Optional[str] = None
56-
_connected_kernels: Dict[str, Future[JupyterKernelWebSocket]] = {}
5755

5856
def __init__(self, sandbox: CodeInterpreter, timeout: Optional[float] = TIMEOUT):
5957
self._sandbox = sandbox
6058
self._kernel_id_set = Future()
6159
self._start_connecting_to_default_kernel(timeout=timeout)
60+
self._connected_kernels: Dict[str, Future[JupyterKernelWebSocket]] = {}
61+
self._default_kernel_id: Optional[str] = None
6262

6363
def exec_cell(
6464
self,

python/e2b_code_interpreter/messaging.py

+16-22
Original file line numberDiff line numberDiff line change
@@ -43,30 +43,29 @@ def __init__(
4343
self.on_result = on_result
4444

4545

46-
class JupyterKernelWebSocket(BaseModel):
47-
model_config = ConfigDict(arbitrary_types_allowed=True)
46+
class JupyterKernelWebSocket:
4847

49-
url: str
50-
51-
_cells: Dict[str, CellExecution] = {}
52-
_waiting_for_replies: Dict[str, DeferredFuture] = PrivateAttr(default_factory=dict)
53-
_queue_in: Queue = PrivateAttr(default_factory=Queue)
54-
_queue_out: Queue = PrivateAttr(default_factory=Queue)
55-
_process_cleanup: List[Callable[[], Any]] = PrivateAttr(default_factory=list)
56-
_closed: bool = PrivateAttr(default=False)
48+
def __init__(self, url: str):
49+
self.url = url
50+
self._cells: Dict[str, CellExecution] = {}
51+
self._waiting_for_replies: Dict[str, DeferredFuture] = {}
52+
self._queue_in = Queue()
53+
self._queue_out = Queue()
54+
self._stopped = threading.Event()
5755

5856
def process_messages(self):
59-
while True:
60-
data = self._queue_out.get()
57+
while not self._stopped.is_set():
58+
if self._queue_out.empty():
59+
time.sleep(0.01)
60+
continue
6161

62+
data = self._queue_out.get()
6263
logger.debug(f"WebSocket received message: {data}".strip())
6364
self._receive_message(json.loads(data))
6465
self._queue_out.task_done()
6566

6667
def connect(self, timeout: float = TIMEOUT):
6768
started = threading.Event()
68-
stopped = threading.Event()
69-
self._process_cleanup.append(stopped.set)
7069

7170
threading.Thread(
7271
target=self.process_messages, daemon=True, name="e2b-process-messages"
@@ -78,7 +77,7 @@ def connect(self, timeout: float = TIMEOUT):
7877
queue_in=self._queue_in,
7978
queue_out=self._queue_out,
8079
started=started,
81-
stopped=stopped,
80+
stopped=self._stopped
8281
).run,
8382
daemon=True,
8483
name="e2b-code-interpreter-websocket",
@@ -91,7 +90,7 @@ def connect(self, timeout: float = TIMEOUT):
9190
while (
9291
not started.is_set()
9392
and time.time() - start_time < timeout
94-
and not self._closed
93+
and not self._stopped.is_set()
9594
):
9695
time.sleep(0.1)
9796

@@ -245,12 +244,7 @@ def _receive_message(self, data: dict):
245244

246245
def close(self):
247246
logger.debug("Closing WebSocket")
248-
self._closed = True
249-
250-
for cancel in self._process_cleanup:
251-
cancel()
252-
253-
self._process_cleanup.clear()
247+
self._stopped.set()
254248

255249
for handler in self._waiting_for_replies.values():
256250
logger.debug(f"Cancelling waiting for execution result for {handler}")

0 commit comments

Comments
 (0)