Multi exec on cluster #3611
base: master
Conversation
Force-pushed from dee69d2 to 9937904
@elena-kolevska this needs workflow approval, thanks
Adds support for transactions based on multi/watch/exec on clusters. Transactions in this mode are limited to a single hash slot. Contributed-by: Scopely <[email protected]>
Force-pushed from 9937904 to f9cc550
while True:
    try:
        if watches:
            pipe.watch(*watches)
The current implementation doesn't provide an option to execute commands between WATCH and MULTI, which makes it impossible to reproduce the following case:
WATCH mykey
val = GET mykey
val = val + 1
MULTI
SET mykey $val
EXEC
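For reference, the standalone client supports this pattern by letting you issue commands on the pipeline between watch() and multi(). A minimal sketch using the standalone redis-py API (the key name and increment are illustrative):

```python
import redis

r = redis.Redis()

with r.pipeline() as pipe:
    pipe.watch("mykey")                # WATCH mykey
    val = int(pipe.get("mykey") or 0)  # immediate-mode GET while watching
    pipe.multi()                       # MULTI
    pipe.set("mykey", val + 1)         # SET mykey $val
    pipe.execute()                     # EXEC; raises WatchError if mykey changed
```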
        func_value = func(pipe)
        exec_value = pipe.execute()
        return func_value if value_from_callable else exec_value
    except WatchError:
We definitely want to restrict the number of retries regardless of the exception type.
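A rough sketch of one way to bound the retries, reusing the names from the snippet above; the attempt limit and the final exception are illustrative, not the PR's implementation:

```python
# Hypothetical sketch: cap the number of attempts regardless of the
# exception type. MAX_TRANSACTION_ATTEMPTS is an illustrative constant.
MAX_TRANSACTION_ATTEMPTS = 3

for _attempt in range(MAX_TRANSACTION_ATTEMPTS):
    try:
        if watches:
            pipe.watch(*watches)
        func_value = func(pipe)
        exec_value = pipe.execute()
        return func_value if value_from_callable else exec_value
    except WatchError:
        continue  # retry, but only up to MAX_TRANSACTION_ATTEMPTS times
raise WatchError("Transaction gave up after too many optimistic-locking retries")
```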
redis/cluster.py (Outdated)
self.transaction_connection = None

if not self.transaction_connection:
    self.transaction_connection = get_connection(redis_node, ("INFO",))
The second argument is deprecated and can simply be omitted.
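That is, assuming get_connection accepts just the node, the call could simply become:

```python
# Suggested simplification: drop the deprecated second argument.
self.transaction_connection = get_connection(redis_node)
```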
self._validate_watch()

if slot_number is not None:
    if self.pipeline_slots and slot_number not in self.pipeline_slots:
This looks a bit confusing to me: a transaction should be executed against exactly one hash slot, so why do we check against a set of slots?
retry_on_timeout is not set,
or the error is not a TimeoutError
"""
if not conn:
This is a mutually exclusive check; conn is required at this point.
def _execute_transaction(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    if len(self.pipeline_slots) > 1:
As I mentioned above, this looks confusing; I think it makes sense to keep the transaction slot as a separate variable.
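A rough sketch of that suggestion; the transaction_slot attribute and the method name are hypothetical, and the exception choice is illustrative:

```python
from redis.exceptions import RedisClusterException


def _check_transaction_slot(self, slot_number: int) -> None:
    # Hypothetical helper: remember the single slot used by this transaction
    # and reject any command that hashes to a different slot.
    if self.transaction_slot is None:
        self.transaction_slot = slot_number
    elif slot_number != self.transaction_slot:
        raise RedisClusterException(
            "All keys in a cluster transaction must map to the same hash slot"
        )
```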
if self.watching:
    self.reset()
    raise WatchError(
        "A ConnectionError occurred on while watching one or more keys"
Here you might get other connection errors as well, so it would be better to extract the error type and include it in the message; you can get the type name with type(error).__name__.
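For example, a minimal adjustment along these lines:

```python
# Include the concrete error type in the message, as suggested.
raise WatchError(
    f"A {type(error).__name__} occurred while watching one or more keys"
)
```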
if self.cluster_error:
    raise RedisClusterException("Cluster error ocurred while watching keys")

if self.slot_migrating or self.cluster_error:
I would suggest using a well-defined default set of connection-related error types, and checking whether the raised error belongs to this set, rather than relying on the cluster_error flag.
You could improve error handling by splitting the current ERRORS_ALLOW_RETRY tuple into two distinct groups:
- One for errors that require cluster reinitialization due to connectivity issues (e.g., ConnectionError, OSError, ClusterDownError).
- Another for logical errors like MOVED and ASK, which indicate slot redirection.
This separation would allow for more precise control over error handling.
Currently, both slot migration errors and general cluster errors trigger the same behavior—waiting for multiple failures before reinitializing the cluster. This could lead to unnecessary delays in reinitialization when a cluster error occurs. Additionally, relying on the cluster_error flag might lead to unexpected behavior if an error is raised in a context where that flag hasn’t been properly set.
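A sketch of the suggested split; the two group names are illustrative (only ERRORS_ALLOW_RETRY exists in redis/cluster.py today), and the exact membership of each group is an assumption:

```python
from redis.exceptions import (
    AskError,
    ClusterDownError,
    ConnectionError,
    MovedError,
    TimeoutError,
)

# Connectivity-related errors: retry and reinitialize the cluster topology.
CONNECTION_ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, ClusterDownError, OSError)

# Logical redirection errors (MOVED/ASK): follow the redirect, no full
# topology reinitialization needed.
REDIRECTION_ERRORS_ALLOW_RETRY = (MovedError, AskError)
```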
def _execute_transaction_with_retries(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    retry = Retry(
The Retry object should be provided through client initialization rather than being hardcoded within private methods.
Hardcoding it forces all users of the library to rely on a specific retry configuration, without offering any flexibility to customize it externally.
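For context, redis-py already lets callers pass a Retry object when constructing the client, so the pipeline could simply reuse it; the backoff and attempt count below are illustrative:

```python
from redis.backoff import ExponentialBackoff
from redis.cluster import RedisCluster
from redis.retry import Retry

# Retry policy configured once, at client construction, instead of being
# hardcoded inside private pipeline methods.
rc = RedisCluster(
    host="localhost",
    port=7000,
    retry=Retry(ExponentialBackoff(), 3),
)
```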
self.cluster_error = False
self.executing = False

def _execute_transaction_with_retries(
I think we should have just one _execute_transaction, and the retry behavior should depend on the retry object configured on the client. If a retry object with several retries is configured, the flow should be retried; if we have zero retries and NoBackoff configured, we will have just one execution of the error handler to clean up the state.
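A sketch of how the client's configured Retry object could drive this; call_with_retry is the existing Retry API, while the attribute and handler names here are hypothetical:

```python
def _execute_transaction_with_retries(self, stack, raise_on_error):
    # Illustrative only: delegate retry policy to the client's Retry object.
    # With zero retries / NoBackoff this runs once and the failure callback
    # becomes the single cleanup pass before the error propagates.
    return self._retry.call_with_retry(
        lambda: self._execute_transaction(stack, raise_on_error),
        self._cleanup_transaction_state,  # hypothetical error handler
    )
```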
Please note:

- RedisCluster pipelines currently only support key-based commands.
- The pipeline gets its 'read_from_replicas' value from the
The new preferred approach is to configure load_balancing_strategy; starting from this release, read_from_replicas is deprecated.
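For example (assuming the LoadBalancingStrategy enum exported from redis.cluster; the chosen strategy is just one of the available options):

```python
from redis.cluster import LoadBalancingStrategy, RedisCluster

# Replica reads configured via load_balancing_strategy instead of the
# deprecated read_from_replicas flag.
rc = RedisCluster(
    host="localhost",
    port=7000,
    load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN,
)
```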
self.transaction_connection: Optional[Connection] = None
self.pipeline_slots: Set[int] = set()
self.slot_migrating = False
self.cluster_error = False
I suggest not relying on this property to trigger cluster topology reinitialization; we could miss a place where it should be set to True.
)

def immediate_execute_command(self, *args, **options):
    retry = Retry(
Same comment as in _execute_transaction_with_retries for the Retry object.
Pull Request check-list
Please make sure to review and check all of these items:
NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.
Description of change
This adds support for transactions in cluster clients. These transactions are based on multi/watch/exec, same as for the standalone client, but are limited to a single hash slot.
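A hypothetical usage sketch based on this description; it assumes pipeline(transaction=True) mirrors the standalone API, and uses hash tags ({user:1}) to force both keys into the same hash slot, as the feature requires:

```python
from redis.cluster import RedisCluster

rc = RedisCluster(host="localhost", port=7000)

# Both keys share the {user:1} hash tag, so they map to one slot and can be
# wrapped in a single MULTI/EXEC on the node that owns that slot.
with rc.pipeline(transaction=True) as pipe:
    pipe.set("{user:1}:balance", 100)
    pipe.incr("{user:1}:visits")
    pipe.execute()
```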