From c87e01f6b96009f408bfcf0b516bb7c11009ff5d Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Wed, 12 Feb 2025 13:59:37 -0800 Subject: [PATCH 1/6] Add an "invoke formatters" command --- CONTRIBUTING.md | 6 ++++++ tasks.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d87e6ba1c3..f02ebcc43b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -81,6 +81,12 @@ using `invoke standalone-tests`; similarly, RedisCluster tests can be run by usi Each run of tests starts and stops the various dockers required. Sometimes things get stuck, an `invoke clean` can help. +## Linting and Formatting + +Call `invoke linters` to run linters without also running tests. This command will +only report issues, not fix them automatically. Run `invoke formatters` to +automatically format your code. + ## Documentation If relevant, update the code documentation, via docstrings, or in `/docs`. diff --git a/tasks.py b/tasks.py index f7b728aed4..f318da5608 100644 --- a/tasks.py +++ b/tasks.py @@ -33,6 +33,12 @@ def linters(c): run("vulture redis whitelist.py --min-confidence 80") run("flynt --fail-on-change --dry-run tests redis") +@task +def formatters(c): + """Format code""" + run("black --target-version py37 tests redis") + run("isort tests redis") + @task def all_tests(c): From 95693c884e9414bd5c7246e0a3535263c7fb4d7e Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Wed, 12 Feb 2025 13:59:44 -0800 Subject: [PATCH 2/6] Fix async safety when Redis client is used as an async context manager When the async Redis client is used as an async context manager and called from different corotuines, one coroutine can exit, shutting down the client's connection pool, while another coroutine is attempting to use a connection. This results in a connection error, such as: redis.exceptions.ConnectionError: Connection closed by server. Additional locking in `ConnectionPool` resolves the problem but introduces extreme latency due to the locking. Instead, this PR implements a shielded counter that increments as callers enter the async context manager and decrements when they exit. The client then closes its connection pool only after all active contexts exit. Performance is on par with use of the client without a context manager. --- redis/asyncio/client.py | 44 +++++++++++++++++++++--- redis/asyncio/connection.py | 25 +++++++------- tests/test_asyncio/test_usage_counter.py | 18 ++++++++++ 3 files changed, 70 insertions(+), 17 deletions(-) create mode 100644 tests/test_asyncio/test_usage_counter.py diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 7c17938714..7478b8394b 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -362,6 +362,12 @@ def __init__( # on a set of redis commands self._single_conn_lock = asyncio.Lock() + # When used as an async context manager, we need to increment and decrement + # a usage counter so that we can close the connection pool when no one is + # using the client. + self._usage_counter = 0 + self._usage_lock = asyncio.Lock() + def __repr__(self): return ( f"<{self.__class__.__module__}.{self.__class__.__name__}" @@ -562,10 +568,40 @@ def client(self) -> "Redis": ) async def __aenter__(self: _RedisT) -> _RedisT: - return await self.initialize() + """ + Async context manager entry. Increments a usage counter so that the + connection pool is only closed (via aclose()) when no one is using the client. + """ + async with self._usage_lock: + self._usage_counter += 1 + current_usage = self._usage_counter + try: + # Initialize the client (i.e. establish connection, etc.) + return await self.initialize() + except Exception: + # If initialization fails, decrement the counter to keep it in sync + async with self._usage_lock: + self._usage_counter -= 1 + raise + + async def _decrement_usage(self) -> int: + """ + Helper coroutine to decrement the usage counter while holding the lock. + Returns the new value of the usage counter. + """ + async with self._usage_lock: + self._usage_counter -= 1 + return self._usage_counter async def __aexit__(self, exc_type, exc_value, traceback): - await self.aclose() + """ + Async context manager exit. Decrements a usage counter. If this is the + last exit (counter becomes zero), the client closes its connection pool. + """ + current_usage = await asyncio.shield(self._decrement_usage()) + if current_usage == 0: + # This was the last active context, so disconnect the pool. + await asyncio.shield(self.aclose()) _DEL_MESSAGE = "Unclosed Redis client" @@ -1347,9 +1383,7 @@ async def _disconnect_reset_raise(self, conn, error): # indicates the user should retry this transaction. if self.watching: await self.aclose() - raise WatchError( - "A ConnectionError occurred on while watching one or more keys" - ) + raise # if retry_on_error is not set or the error is not one # of the specified error types, raise it if ( diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 4a743ff374..368686268b 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -1157,19 +1157,20 @@ async def disconnect(self, inuse_connections: bool = True): current in use, potentially by other tasks. Otherwise only disconnect connections that are idle in the pool. """ - if inuse_connections: - connections: Iterable[AbstractConnection] = chain( - self._available_connections, self._in_use_connections + async with self._lock: + if inuse_connections: + connections: Iterable[AbstractConnection] = chain( + self._available_connections, self._in_use_connections + ) + else: + connections = self._available_connections + resp = await asyncio.gather( + *(connection.disconnect() for connection in connections), + return_exceptions=True, ) - else: - connections = self._available_connections - resp = await asyncio.gather( - *(connection.disconnect() for connection in connections), - return_exceptions=True, - ) - exc = next((r for r in resp if isinstance(r, BaseException)), None) - if exc: - raise exc + exc = next((r for r in resp if isinstance(r, BaseException)), None) + if exc: + raise exc async def aclose(self) -> None: """Close the pool, disconnecting all connections""" diff --git a/tests/test_asyncio/test_usage_counter.py b/tests/test_asyncio/test_usage_counter.py new file mode 100644 index 0000000000..c100fde3be --- /dev/null +++ b/tests/test_asyncio/test_usage_counter.py @@ -0,0 +1,18 @@ +import asyncio + +import pytest + + +@pytest.mark.asyncio +async def test_usage_counter(create_redis): + r = await create_redis(decode_responses=True) + + async def dummy_task(): + async with r: + await asyncio.sleep(0.01) + + tasks = [dummy_task() for _ in range(20)] + await asyncio.gather(*tasks) + + # After all tasks have completed, the usage counter should be back to zero. + assert r._usage_counter == 0 From 3953ac372dd7374d0d900baa9e5e35ad740e0107 Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Wed, 12 Feb 2025 14:34:46 -0800 Subject: [PATCH 3/6] Remove now unnecessary lock from testing --- redis/asyncio/client.py | 4 +++- redis/asyncio/connection.py | 25 ++++++++++++------------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 7478b8394b..8064f9a7e2 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -1383,7 +1383,9 @@ async def _disconnect_reset_raise(self, conn, error): # indicates the user should retry this transaction. if self.watching: await self.aclose() - raise + raise WatchError( + "A ConnectionError occurred on while watching one or more keys" + ) # if retry_on_error is not set or the error is not one # of the specified error types, raise it if ( diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 368686268b..4a743ff374 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -1157,20 +1157,19 @@ async def disconnect(self, inuse_connections: bool = True): current in use, potentially by other tasks. Otherwise only disconnect connections that are idle in the pool. """ - async with self._lock: - if inuse_connections: - connections: Iterable[AbstractConnection] = chain( - self._available_connections, self._in_use_connections - ) - else: - connections = self._available_connections - resp = await asyncio.gather( - *(connection.disconnect() for connection in connections), - return_exceptions=True, + if inuse_connections: + connections: Iterable[AbstractConnection] = chain( + self._available_connections, self._in_use_connections ) - exc = next((r for r in resp if isinstance(r, BaseException)), None) - if exc: - raise exc + else: + connections = self._available_connections + resp = await asyncio.gather( + *(connection.disconnect() for connection in connections), + return_exceptions=True, + ) + exc = next((r for r in resp if isinstance(r, BaseException)), None) + if exc: + raise exc async def aclose(self) -> None: """Close the pool, disconnecting all connections""" From 34ce3debd09cf460e1959b88d3da09866c0ee84b Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Wed, 12 Feb 2025 15:20:36 -0800 Subject: [PATCH 4/6] Clean up variable --- redis/asyncio/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 8064f9a7e2..25fc40cfb4 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -574,7 +574,6 @@ async def __aenter__(self: _RedisT) -> _RedisT: """ async with self._usage_lock: self._usage_counter += 1 - current_usage = self._usage_counter try: # Initialize the client (i.e. establish connection, etc.) return await self.initialize() From 955df70be75847ae7d55cb0bdcce9bf96c71518c Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Wed, 12 Feb 2025 16:17:32 -0800 Subject: [PATCH 5/6] fix test, apply logic to async cluster client --- redis/asyncio/client.py | 3 +- redis/asyncio/cluster.py | 42 ++++++++++++++++++++++-- tests/test_asyncio/test_usage_counter.py | 5 ++- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 25fc40cfb4..bcebea6740 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -570,7 +570,8 @@ def client(self) -> "Redis": async def __aenter__(self: _RedisT) -> _RedisT: """ Async context manager entry. Increments a usage counter so that the - connection pool is only closed (via aclose()) when no one is using the client. + connection pool is only closed (via aclose()) when no context is using + the client. """ async with self._usage_lock: self._usage_counter += 1 diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 87a2c16afa..b13a9ca6f5 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -379,6 +379,12 @@ def __init__( self._initialize = True self._lock: Optional[asyncio.Lock] = None + # When used as an async context manager, we need to increment and decrement + # a usage counter so that we can close the connection pool when no one is + # using the client. + self._usage_counter = 0 + self._usage_lock = asyncio.Lock() + async def initialize(self) -> "RedisCluster": """Get all nodes from startup nodes & creates connections if not initialized.""" if self._initialize: @@ -415,10 +421,40 @@ async def close(self) -> None: await self.aclose() async def __aenter__(self) -> "RedisCluster": - return await self.initialize() + """ + Async context manager entry. Increments a usage counter so that the + connection pool is only closed (via aclose()) when no context is using + the client. + """ + async with self._usage_lock: + self._usage_counter += 1 + try: + # Initialize the client (i.e. establish connection, etc.) + return await self.initialize() + except Exception: + # If initialization fails, decrement the counter to keep it in sync + async with self._usage_lock: + self._usage_counter -= 1 + raise - async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None: - await self.aclose() + async def _decrement_usage(self) -> int: + """ + Helper coroutine to decrement the usage counter while holding the lock. + Returns the new value of the usage counter. + """ + async with self._usage_lock: + self._usage_counter -= 1 + return self._usage_counter + + async def __aexit__(self, exc_type, exc_value, traceback): + """ + Async context manager exit. Decrements a usage counter. If this is the + last exit (counter becomes zero), the client closes its connection pool. + """ + current_usage = await asyncio.shield(self._decrement_usage()) + if current_usage == 0: + # This was the last active context, so disconnect the pool. + await asyncio.shield(self.aclose()) def __await__(self) -> Generator[Any, None, "RedisCluster"]: return self.initialize().__await__() diff --git a/tests/test_asyncio/test_usage_counter.py b/tests/test_asyncio/test_usage_counter.py index c100fde3be..7e7684071c 100644 --- a/tests/test_asyncio/test_usage_counter.py +++ b/tests/test_asyncio/test_usage_counter.py @@ -1,12 +1,11 @@ import asyncio import pytest +import redis @pytest.mark.asyncio -async def test_usage_counter(create_redis): - r = await create_redis(decode_responses=True) - +async def test_usage_counter(r): async def dummy_task(): async with r: await asyncio.sleep(0.01) From 1580dd44bc50501e4fb48fb2ad95c7f9372cf653 Mon Sep 17 00:00:00 2001 From: Andrew Brookins Date: Thu, 13 Feb 2025 07:45:44 -0800 Subject: [PATCH 6/6] linting --- tests/test_asyncio/test_usage_counter.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_asyncio/test_usage_counter.py b/tests/test_asyncio/test_usage_counter.py index 7e7684071c..566ec6b4d3 100644 --- a/tests/test_asyncio/test_usage_counter.py +++ b/tests/test_asyncio/test_usage_counter.py @@ -1,7 +1,6 @@ import asyncio import pytest -import redis @pytest.mark.asyncio