|
299 | 299 | (io/file (or (System/getenv "XTDB_DATA_DIR") "/var/lib/xtdb")))
|
300 | 300 |
|
301 | 301 | (def nodes (atom {}))
|
| 302 | +(def active-queries-per-node (atom {})) |
| 303 | + |
302 | 304 | (defn- atom-add-node [nodes name]
|
303 | 305 | (if (get nodes name)
|
304 | 306 | ;; Node was already added, so we can just return nodes
|
|
350 | 352 | ;; Node is not there, so nothing to delete
|
351 | 353 | nodes
|
352 | 354 | (do
|
353 |
| - (.close node) |
354 |
| - (run! io/delete-file (reverse (file-seq (io/file node-dir name)))) |
355 | 355 | (dissoc nodes name)))))
|
356 | 356 |
|
357 | 357 | (defn- delete-node [request]
|
358 |
| - (let [name (get-in request [:parameters :body :node])] |
359 |
| - (if (get @nodes name) |
| 358 | + (let [name (get-in request [:parameters :body :node]) |
| 359 | + node (get @nodes name)] |
| 360 | + (if node |
360 | 361 | (do
|
| 362 | + ;; We first remove the node so no new queries can be started |
361 | 363 | (swap! nodes atom-delete-node name)
|
362 |
| - (log/info "Deleted node" name) |
363 |
| - {:status 200, :body {:deleted true}}) |
| 364 | + (log/info "Removed node" name "from the list of nodes") |
| 365 | + (.close node) |
| 366 | + (loop [] |
| 367 | + (let [num-queries (get @active-queries-per-node name 0)] |
| 368 | + (if (= num-queries 0) |
| 369 | + (do |
| 370 | + ;; No queries are running anymore so we can delete the files. |
| 371 | + (run! io/delete-file (reverse (file-seq (io/file node-dir name)))) |
| 372 | + (log/info "Deleted files for node" name) |
| 373 | + {:status 200, :body {:deleted true}}) |
| 374 | + (do |
| 375 | + (log/info "Sleeping 100 ms because of" num-queries "active queries for node" name) |
| 376 | + (Thread/sleep 100) |
| 377 | + (recur)))))) |
364 | 378 | {:status 404, :body {:error "Node not found"}})))
|
365 | 379 |
|
| 380 | +(defn- active-query-inc [active-queries name] |
| 381 | + (assoc active-queries name (inc (get active-queries name 0)))) |
| 382 | + |
| 383 | +(defn- active-query-dec [active-queries name] |
| 384 | + (assoc active-queries name (dec (get active-queries name)))) |
| 385 | + |
366 | 386 | (defn- lookup-node [handler]
|
367 | 387 | (fn [request]
|
368 | 388 | (let [node-name (get-in request [:path-params :node])]
|
369 | 389 | (if node-name
|
370 | 390 | (let [node (get @nodes node-name)]
|
371 | 391 | (if node
|
372 |
| - (handler (assoc request :xtdb-node node)) |
| 392 | + (do |
| 393 | + (swap! active-queries-per-node active-query-inc node-name) |
| 394 | + (let [ret (handler (assoc request :xtdb-node node))] |
| 395 | + (swap! active-queries-per-node active-query-dec node-name) |
| 396 | + ret)) |
373 | 397 | {:status 404, :body {:error "Node not found"}}))
|
374 | 398 | (handler request)))))
|
375 | 399 |
|
|
0 commit comments