|
30 | 30 | import email.utils
|
31 | 31 | import errno
|
32 | 32 | import fcntl
|
| 33 | +import gevent |
33 | 34 | import gzip
|
34 | 35 | import logging
|
35 | 36 | import os
|
|
44 | 45 | from filetracker.utils import file_digest
|
45 | 46 |
|
46 | 47 |
|
| 48 | +_LOCK_RETRIES = 20 |
| 49 | +_LOCK_SLEEP_TIME_S = 1 |
| 50 | + |
| 51 | + |
47 | 52 | logger = logging.getLogger(__name__)
|
48 | 53 |
|
49 | 54 |
|
50 | 55 | class FiletrackerFileNotFoundError(Exception):
|
51 | 56 | pass
|
52 | 57 |
|
53 | 58 |
|
| 59 | +class ConcurrentModificationError(Exception): |
| 60 | + """Raised after acquiring lock failed multiple times.""" |
| 61 | + def __init__(self, lock_name): |
| 62 | + message = 'Failed to acquire lock: {}'.format(lock_name) |
| 63 | + super(ConcurrentModificationError, self).__init__(self, message) |
| 64 | + |
| 65 | + |
54 | 66 | class FileStorage(object):
|
55 | 67 | """Manages the whole file storage."""
|
56 | 68 |
|
@@ -422,10 +434,32 @@ def _exclusive_lock(path):
|
422 | 434 | fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o600)
|
423 | 435 |
|
424 | 436 | try:
|
425 |
| - fcntl.flock(fd, fcntl.LOCK_EX) |
426 |
| - yield |
| 437 | + retries_left = _LOCK_RETRIES |
| 438 | + success = False |
| 439 | + |
| 440 | + while retries_left > 0: |
| 441 | + # try to acquire the lock in a loop |
| 442 | + # because gevent doesn't treat flock as IO, |
| 443 | + # so waiting here without yielding would get the worker killed |
| 444 | + try: |
| 445 | + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) |
| 446 | + success = True |
| 447 | + break |
| 448 | + except IOError as e: |
| 449 | + if e.errno in [errno.EAGAIN, errno.EWOULDBLOCK]: |
| 450 | + # This yields execution to other green threads. |
| 451 | + gevent.sleep(_LOCK_SLEEP_TIME_S) |
| 452 | + retries_left -= 1 |
| 453 | + else: |
| 454 | + raise |
| 455 | + |
| 456 | + if success: |
| 457 | + yield |
| 458 | + else: |
| 459 | + raise ConcurrentModificationError(path) |
427 | 460 | finally:
|
428 |
| - fcntl.flock(fd, fcntl.LOCK_UN) |
| 461 | + if success: |
| 462 | + fcntl.flock(fd, fcntl.LOCK_UN) |
429 | 463 | os.close(fd)
|
430 | 464 |
|
431 | 465 |
|
|
0 commit comments