Skip to content

Commit 6978275

Browse files
dvora-hchayim
andauthored
Client Side Caching: Alpha support (#3038)
* CSC * get keys from command * fix review comments * return respone in execute_command * fix tests * fix comments * linters --------- Co-authored-by: Chayim <[email protected]>
1 parent 0113034 commit 6978275

File tree

14 files changed

+492
-82
lines changed

14 files changed

+492
-82
lines changed

redis/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from redis import asyncio # noqa
44
from redis.backoff import default_backoff
5+
from redis.cache import _LocalChace
56
from redis.client import Redis, StrictRedis
67
from redis.cluster import RedisCluster
78
from redis.connection import (
@@ -61,6 +62,7 @@ def int_or_str(value):
6162
VERSION = tuple([99, 99, 99])
6263

6364
__all__ = [
65+
"_LocalChace",
6466
"AuthenticationError",
6567
"AuthenticationWrongNumberOfArgsError",
6668
"BlockingConnectionPool",

redis/asyncio/client.py

+2
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,7 @@ async def _disconnect_raise(self, conn: Connection, error: Exception):
597597
async def execute_command(self, *args, **options):
598598
"""Execute a command and return a parsed response"""
599599
await self.initialize()
600+
options.pop("keys", None) # the keys are used only for client side caching
600601
pool = self.connection_pool
601602
command_name = args[0]
602603
conn = self.connection or await pool.get_connection(command_name, **options)
@@ -1275,6 +1276,7 @@ def multi(self):
12751276
def execute_command(
12761277
self, *args, **kwargs
12771278
) -> Union["Pipeline", Awaitable["Pipeline"]]:
1279+
kwargs.pop("keys", None) # the keys are used only for client side caching
12781280
if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
12791281
return self.immediate_execute_command(*args, **kwargs)
12801282
return self.pipeline_execute_command(*args, **kwargs)

redis/asyncio/cluster.py

+2
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
682682
:raises RedisClusterException: if target_nodes is not provided & the command
683683
can't be mapped to a slot
684684
"""
685+
kwargs.pop("keys", None) # the keys are used only for client side caching
685686
command = args[0]
686687
target_nodes = []
687688
target_nodes_specified = False
@@ -1447,6 +1448,7 @@ def execute_command(
14471448
or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
14481449
- Rest of the kwargs are passed to the Redis connection
14491450
"""
1451+
kwargs.pop("keys", None) # the keys are used only for client side caching
14501452
self._command_stack.append(
14511453
PipelineCommand(len(self._command_stack), *args, **kwargs)
14521454
)

redis/asyncio/sentinel.py

+1
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ async def execute_command(self, *args, **kwargs):
220220
once - If set to True, then execute the resulting command on a single
221221
node at random, rather than across the entire sentinel cluster.
222222
"""
223+
kwargs.pop("keys", None) # the keys are used only for client side caching
223224
once = bool(kwargs.get("once", False))
224225
if "once" in kwargs.keys():
225226
kwargs.pop("once")

redis/cache.py

+326
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
import random
2+
import time
3+
from collections import OrderedDict, defaultdict
4+
from enum import Enum
5+
from typing import List
6+
7+
from redis.typing import KeyT, ResponseT
8+
9+
DEFAULT_EVICTION_POLICY = "lru"
10+
11+
12+
DEFAULT_BLACKLIST = [
13+
"BF.CARD",
14+
"BF.DEBUG",
15+
"BF.EXISTS",
16+
"BF.INFO",
17+
"BF.MEXISTS",
18+
"BF.SCANDUMP",
19+
"CF.COMPACT",
20+
"CF.COUNT",
21+
"CF.DEBUG",
22+
"CF.EXISTS",
23+
"CF.INFO",
24+
"CF.MEXISTS",
25+
"CF.SCANDUMP",
26+
"CMS.INFO",
27+
"CMS.QUERY",
28+
"DUMP",
29+
"EXPIRETIME",
30+
"FT.AGGREGATE",
31+
"FT.ALIASADD",
32+
"FT.ALIASDEL",
33+
"FT.ALIASUPDATE",
34+
"FT.CURSOR",
35+
"FT.EXPLAIN",
36+
"FT.EXPLAINCLI",
37+
"FT.GET",
38+
"FT.INFO",
39+
"FT.MGET",
40+
"FT.PROFILE",
41+
"FT.SEARCH",
42+
"FT.SPELLCHECK",
43+
"FT.SUGGET",
44+
"FT.SUGLEN",
45+
"FT.SYNDUMP",
46+
"FT.TAGVALS",
47+
"FT._ALIASADDIFNX",
48+
"FT._ALIASDELIFX",
49+
"HRANDFIELD",
50+
"JSON.DEBUG",
51+
"PEXPIRETIME",
52+
"PFCOUNT",
53+
"PTTL",
54+
"SRANDMEMBER",
55+
"TDIGEST.BYRANK",
56+
"TDIGEST.BYREVRANK",
57+
"TDIGEST.CDF",
58+
"TDIGEST.INFO",
59+
"TDIGEST.MAX",
60+
"TDIGEST.MIN",
61+
"TDIGEST.QUANTILE",
62+
"TDIGEST.RANK",
63+
"TDIGEST.REVRANK",
64+
"TDIGEST.TRIMMED_MEAN",
65+
"TOPK.INFO",
66+
"TOPK.LIST",
67+
"TOPK.QUERY",
68+
"TOUCH",
69+
"TTL",
70+
]
71+
72+
73+
DEFAULT_WHITELIST = [
74+
"BITCOUNT",
75+
"BITFIELD_RO",
76+
"BITPOS",
77+
"EXISTS",
78+
"GEODIST",
79+
"GEOHASH",
80+
"GEOPOS",
81+
"GEORADIUSBYMEMBER_RO",
82+
"GEORADIUS_RO",
83+
"GEOSEARCH",
84+
"GET",
85+
"GETBIT",
86+
"GETRANGE",
87+
"HEXISTS",
88+
"HGET",
89+
"HGETALL",
90+
"HKEYS",
91+
"HLEN",
92+
"HMGET",
93+
"HSTRLEN",
94+
"HVALS",
95+
"JSON.ARRINDEX",
96+
"JSON.ARRLEN",
97+
"JSON.GET",
98+
"JSON.MGET",
99+
"JSON.OBJKEYS",
100+
"JSON.OBJLEN",
101+
"JSON.RESP",
102+
"JSON.STRLEN",
103+
"JSON.TYPE",
104+
"LCS",
105+
"LINDEX",
106+
"LLEN",
107+
"LPOS",
108+
"LRANGE",
109+
"MGET",
110+
"SCARD",
111+
"SDIFF",
112+
"SINTER",
113+
"SINTERCARD",
114+
"SISMEMBER",
115+
"SMEMBERS",
116+
"SMISMEMBER",
117+
"SORT_RO",
118+
"STRLEN",
119+
"SUBSTR",
120+
"SUNION",
121+
"TS.GET",
122+
"TS.INFO",
123+
"TS.RANGE",
124+
"TS.REVRANGE",
125+
"TYPE",
126+
"XLEN",
127+
"XPENDING",
128+
"XRANGE",
129+
"XREAD",
130+
"XREVRANGE",
131+
"ZCARD",
132+
"ZCOUNT",
133+
"ZDIFF",
134+
"ZINTER",
135+
"ZINTERCARD",
136+
"ZLEXCOUNT",
137+
"ZMSCORE",
138+
"ZRANGE",
139+
"ZRANGEBYLEX",
140+
"ZRANGEBYSCORE",
141+
"ZRANK",
142+
"ZREVRANGE",
143+
"ZREVRANGEBYLEX",
144+
"ZREVRANGEBYSCORE",
145+
"ZREVRANK",
146+
"ZSCORE",
147+
"ZUNION",
148+
]
149+
150+
_RESPONSE = "response"
151+
_KEYS = "keys"
152+
_CTIME = "ctime"
153+
_ACCESS_COUNT = "access_count"
154+
155+
156+
class EvictionPolicy(Enum):
157+
LRU = "lru"
158+
LFU = "lfu"
159+
RANDOM = "random"
160+
161+
162+
class _LocalChace:
163+
"""
164+
A caching mechanism for storing redis commands and their responses.
165+
166+
Args:
167+
max_size (int): The maximum number of commands to be stored in the cache.
168+
ttl (int): The time-to-live for each command in seconds.
169+
eviction_policy (EvictionPolicy): The eviction policy to use for removing commands when the cache is full.
170+
171+
Attributes:
172+
max_size (int): The maximum number of commands to be stored in the cache.
173+
ttl (int): The time-to-live for each command in seconds.
174+
eviction_policy (EvictionPolicy): The eviction policy used for cache management.
175+
cache (OrderedDict): The ordered dictionary to store commands and their metadata.
176+
key_commands_map (defaultdict): A mapping of keys to the set of commands that use each key.
177+
commands_ttl_list (list): A list to keep track of the commands in the order they were added. # noqa
178+
"""
179+
180+
def __init__(
181+
self, max_size: int, ttl: int, eviction_policy: EvictionPolicy, **kwargs
182+
):
183+
self.max_size = max_size
184+
self.ttl = ttl
185+
self.eviction_policy = eviction_policy
186+
self.cache = OrderedDict()
187+
self.key_commands_map = defaultdict(set)
188+
self.commands_ttl_list = []
189+
190+
def set(self, command: str, response: ResponseT, keys_in_command: List[KeyT]):
191+
"""
192+
Set a redis command and its response in the cache.
193+
194+
Args:
195+
command (str): The redis command.
196+
response (ResponseT): The response associated with the command.
197+
keys_in_command (List[KeyT]): The list of keys used in the command.
198+
"""
199+
if len(self.cache) >= self.max_size:
200+
self._evict()
201+
self.cache[command] = {
202+
_RESPONSE: response,
203+
_KEYS: keys_in_command,
204+
_CTIME: time.monotonic(),
205+
_ACCESS_COUNT: 0, # Used only for LFU
206+
}
207+
self._update_key_commands_map(keys_in_command, command)
208+
self.commands_ttl_list.append(command)
209+
210+
def get(self, command: str) -> ResponseT:
211+
"""
212+
Get the response for a redis command from the cache.
213+
214+
Args:
215+
command (str): The redis command.
216+
217+
Returns:
218+
ResponseT: The response associated with the command, or None if the command is not in the cache. # noqa
219+
"""
220+
if command in self.cache:
221+
if self._is_expired(command):
222+
self.delete(command)
223+
self._update_access(command)
224+
return self.cache[command]["response"]
225+
226+
def delete(self, command: str):
227+
"""
228+
Delete a redis command and its metadata from the cache.
229+
230+
Args:
231+
command (str): The redis command to be deleted.
232+
"""
233+
if command in self.cache:
234+
keys_in_command = self.cache[command].get("keys")
235+
self._del_key_commands_map(keys_in_command, command)
236+
self.commands_ttl_list.remove(command)
237+
del self.cache[command]
238+
239+
def delete_many(self, commands):
240+
pass
241+
242+
def flush(self):
243+
"""Clear the entire cache, removing all redis commands and metadata."""
244+
self.cache.clear()
245+
self.key_commands_map.clear()
246+
self.commands_ttl_list = []
247+
248+
def _is_expired(self, command: str) -> bool:
249+
"""
250+
Check if a redis command has expired based on its time-to-live.
251+
252+
Args:
253+
command (str): The redis command.
254+
255+
Returns:
256+
bool: True if the command has expired, False otherwise.
257+
"""
258+
if self.ttl == 0:
259+
return False
260+
return time.monotonic() - self.cache[command]["ctime"] > self.ttl
261+
262+
def _update_access(self, command: str):
263+
"""
264+
Update the access information for a redis command based on the eviction policy.
265+
266+
Args:
267+
command (str): The redis command.
268+
"""
269+
if self.eviction_policy == EvictionPolicy.LRU:
270+
self.cache.move_to_end(command)
271+
elif self.eviction_policy == EvictionPolicy.LFU:
272+
self.cache[command]["access_count"] = (
273+
self.cache.get(command, {}).get("access_count", 0) + 1
274+
)
275+
self.cache.move_to_end(command)
276+
elif self.eviction_policy == EvictionPolicy.RANDOM:
277+
pass # Random eviction doesn't require updates
278+
279+
def _evict(self):
280+
"""Evict a redis command from the cache based on the eviction policy."""
281+
if self._is_expired(self.commands_ttl_list[0]):
282+
self.delete(self.commands_ttl_list[0])
283+
elif self.eviction_policy == EvictionPolicy.LRU:
284+
self.cache.popitem(last=False)
285+
elif self.eviction_policy == EvictionPolicy.LFU:
286+
min_access_command = min(
287+
self.cache, key=lambda k: self.cache[k].get("access_count", 0)
288+
)
289+
self.cache.pop(min_access_command)
290+
elif self.eviction_policy == EvictionPolicy.RANDOM:
291+
random_command = random.choice(list(self.cache.keys()))
292+
self.cache.pop(random_command)
293+
294+
def _update_key_commands_map(self, keys: List[KeyT], command: str):
295+
"""
296+
Update the key_commands_map with command that uses the keys.
297+
298+
Args:
299+
keys (List[KeyT]): The list of keys used in the command.
300+
command (str): The redis command.
301+
"""
302+
for key in keys:
303+
self.key_commands_map[key].add(command)
304+
305+
def _del_key_commands_map(self, keys: List[KeyT], command: str):
306+
"""
307+
Remove a redis command from the key_commands_map.
308+
309+
Args:
310+
keys (List[KeyT]): The list of keys used in the redis command.
311+
command (str): The redis command.
312+
"""
313+
for key in keys:
314+
self.key_commands_map[key].remove(command)
315+
316+
def invalidate(self, key: KeyT):
317+
"""
318+
Invalidate (delete) all redis commands associated with a specific key.
319+
320+
Args:
321+
key (KeyT): The key to be invalidated.
322+
"""
323+
if key not in self.key_commands_map:
324+
return
325+
for command in self.key_commands_map[key]:
326+
self.delete(command)

0 commit comments

Comments
 (0)