Multi exec on cluster #3611


Draft: wants to merge 3 commits into master from multi-exec-on-cluster

Conversation

@robertosantamaria-scopely commented Apr 21, 2025

Pull Request check-list

Please make sure to review and check all of these items:

  • Do tests and lints pass with this change?
  • Do the CI tests pass with this change (enable it first in your forked repo and wait for the github action build to finish)?
  • Is the new or changed code fully tested?
  • Is a documentation update included (if this change modifies existing APIs, or introduces new ones)?
  • Is there an example added to the examples folder (if applicable)?
  • Was the change added to CHANGES file?

NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.

Description of change

This adds support for transactions in cluster clients. These transactions are based on multi/watch/exec, the same as for the standalone client, but are limited to a single hash slot.
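
A hypothetical usage sketch of what this enables (it assumes the cluster pipeline accepts transaction=True, mirroring the standalone pipeline API; the exact surface may differ in this PR). The {user:1} hash tag keeps both keys in the same hash slot, as required:

```python
from redis.cluster import RedisCluster

rc = RedisCluster(host="localhost", port=7000)

with rc.pipeline(transaction=True) as pipe:
    pipe.incr("{user:1}:visits")
    pipe.set("{user:1}:last_seen", "2025-04-21")
    results = pipe.execute()  # both commands are sent as one MULTI/EXEC block
```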

@robertosantamaria-scopely force-pushed the multi-exec-on-cluster branch 2 times, most recently from dee69d2 to 9937904 on April 22, 2025 at 08:06
@robertosantamaria-scopely (Author)

@elena-kolevska this needs workflow approval, thanks

Adds support for transactions based on multi/watch/exec on clusters.
Transactions in this mode are limited to a single hash slot.

Contributed-by: Scopely <[email protected]>
while True:
    try:
        if watches:
            pipe.watch(*watches)

Collaborator:

The current implementation doesn't provide an option to execute commands between WATCH and MULTI, which makes it impossible to reproduce the following case:

WATCH mykey
val = GET mykey
val = val + 1
MULTI
SET mykey $val
EXEC
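
For reference, the standalone client covers this case because, after watch(), the pipeline runs commands immediately until multi() switches it back to buffering. A minimal sketch using the standalone Pipeline API:

```python
from redis import Redis
from redis.exceptions import WatchError

r = Redis()

with r.pipeline() as pipe:
    while True:
        try:
            pipe.watch("mykey")                # WATCH mykey
            val = int(pipe.get("mykey") or 0)  # immediate-mode GET between WATCH and MULTI
            pipe.multi()                       # switch back to buffered transaction mode
            pipe.set("mykey", val + 1)         # SET mykey $val
            pipe.execute()                     # EXEC
            break
        except WatchError:
            continue                           # mykey changed after WATCH; retry
```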

        func_value = func(pipe)
        exec_value = pipe.execute()
        return func_value if value_from_callable else exec_value
    except WatchError:

Collaborator:

We definitely want to restrict the number of retries, regardless of the exception type.
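
One way to bound the loop is a simple attempt counter; a sketch of the transaction helper under that assumption (the limit name and value are illustrative, not from the PR; the signature mirrors the standalone transaction helper):

```python
from redis.exceptions import WatchError

MAX_TRANSACTION_RETRIES = 3  # illustrative cap, not taken from the PR


def transaction(self, func, *watches, value_from_callable=False):
    """Sketch: retry the callable a bounded number of times on WatchError."""
    last_error = None
    for _ in range(MAX_TRANSACTION_RETRIES):
        try:
            with self.pipeline(transaction=True) as pipe:
                if watches:
                    pipe.watch(*watches)
                func_value = func(pipe)
                exec_value = pipe.execute()
                return func_value if value_from_callable else exec_value
        except WatchError as error:
            last_error = error
            continue
    raise last_error
```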

redis/cluster.py Outdated
self.transaction_connection = None

if not self.transaction_connection:
    self.transaction_connection = get_connection(redis_node, ("INFO",))

Collaborator:

The second argument is deprecated and can simply be omitted.
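
The suggested simplification would look like this (a fragment of the surrounding method, assuming the current get_connection accepts just the node):

```python
if not self.transaction_connection:
    # The command-name argument is deprecated; passing only the node is enough.
    self.transaction_connection = get_connection(redis_node)
```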

self._validate_watch()

if slot_number is not None:
    if self.pipeline_slots and slot_number not in self.pipeline_slots:

Collaborator:

This looks a bit confusing to me: a transaction should be executed against exactly one hash slot, so why do we check against a set of slots?

retry_on_timeout is not set,
or the error is not a TimeoutError
"""
if not conn:

Collaborator:

Mutually exclusive check; conn is required here.

def _execute_transaction(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    if len(self.pipeline_slots) > 1:

Collaborator:

As I mentioned above, this looks confusing; I think it makes sense to keep the transaction slot as a separate variable.
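
A sketch of tracking a single optional slot instead of a set (class and attribute names are illustrative, not from the PR):

```python
from typing import Optional

from redis.exceptions import RedisClusterException


class ClusterTransactionState:
    """Illustrative sketch: track exactly one hash slot per transaction."""

    def __init__(self) -> None:
        self.transaction_slot: Optional[int] = None

    def register_slot(self, slot_number: int) -> None:
        # The first key decides the slot; every later key must agree with it.
        if self.transaction_slot is None:
            self.transaction_slot = slot_number
        elif slot_number != self.transaction_slot:
            raise RedisClusterException(
                "All keys in a cluster transaction must map to the same hash slot"
            )
```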

if self.watching:
    self.reset()
    raise WatchError(
        "A ConnectionError occurred while watching one or more keys"

Collaborator:

Here you might have other connection errors as well, so it would be better to extract the error type and include it in the message: you can get the type name with type(error).__name__.
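
A sketch of the suggested message, using type(error).__name__ (the helper name is illustrative):

```python
from redis.exceptions import WatchError


def _raise_watch_error(self, error: Exception) -> None:
    """Sketch: surface the concrete error type in the WatchError message."""
    self.reset()
    raise WatchError(
        f"A {type(error).__name__} occurred while watching one or more keys"
    ) from error
```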

if self.cluster_error:
    raise RedisClusterException("Cluster error occurred while watching keys")

if self.slot_migrating or self.cluster_error:

Collaborator:

I would suggest using a well-defined default set of connection-related error types, and checking whether the raised error belongs to this set, rather than relying on the cluster_error flag.

You could improve error handling by splitting the current ERRORS_ALLOW_RETRY tuple into two distinct groups:

  • One for errors that require cluster reinitialization due to connectivity issues (e.g., ConnectionError, OSError, ClusterDownError).

  • Another for logical errors like MOVED and ASK, which indicate slot redirection.

This separation would allow for more precise control over error handling.

Currently, both slot migration errors and general cluster errors trigger the same behavior—waiting for multiple failures before reinitializing the cluster. This could lead to unnecessary delays in reinitialization when a cluster error occurs. Additionally, relying on the cluster_error flag might lead to unexpected behavior if an error is raised in a context where that flag hasn’t been properly set.
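A sketch of the suggested split (group names are illustrative; ERRORS_ALLOW_RETRY is the existing tuple in redis/cluster.py):

```python
from redis.exceptions import (
    AskError,
    ClusterDownError,
    ConnectionError,
    MovedError,
)

# Illustrative split of the existing ERRORS_ALLOW_RETRY tuple into two groups.
CONNECTION_ERRORS = (ConnectionError, OSError, ClusterDownError)  # reinitialize topology
SLOT_REDIRECTION_ERRORS = (MovedError, AskError)  # follow the MOVED/ASK redirect


def classify_transaction_error(error: Exception) -> str:
    """Sketch: pick a recovery action based on which group the error belongs to."""
    if isinstance(error, SLOT_REDIRECTION_ERRORS):
        return "refresh-slot-mapping"
    if isinstance(error, CONNECTION_ERRORS):
        return "reinitialize-cluster"
    return "raise"
```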

def _execute_transaction_with_retries(
    self, stack: List["PipelineCommand"], raise_on_error: bool
):
    retry = Retry(

Collaborator:

The Retry object should be provided through client initialization rather than being hardcoded within private methods.
Hardcoding it forces all users of the library to rely on a specific retry configuration, without offering any flexibility to customize it externally.
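
A sketch of taking the retry policy from client configuration instead (the cluster pipeline reusing the client's Retry object is an assumption about this PR, not existing behavior):

```python
from redis.backoff import ExponentialBackoff
from redis.cluster import RedisCluster
from redis.retry import Retry

# The caller chooses the retry policy once, at client construction time...
rc = RedisCluster(
    host="localhost",
    port=7000,
    retry=Retry(ExponentialBackoff(), retries=3),
)

# ...and (hypothetically) the transaction pipeline would reuse rc's retry object
# rather than constructing its own Retry inside a private method.
pipe = rc.pipeline(transaction=True)
```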

self.cluster_error = False
self.executing = False

def _execute_transaction_with_retries(

Collaborator:

I think we should have just one _execute_transaction, and the retry behavior should depend on the retry object configured on the client. If a retry object is configured with several retries, the flow should be retried; if we have zero retries and NoBackoff configured, we will have just one execution of the error handler to clean up the state.
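
A sketch of driving that flow through the configured Retry object via call_with_retry, so zero retries with NoBackoff degrades to a single attempt plus cleanup (the wrapper name is illustrative; _execute_transaction is the low-level method from this PR):

```python
from redis.retry import Retry


def execute_transaction_with_policy(pipe, stack, raise_on_error: bool, retry: Retry):
    """Sketch: one execution path; the configured Retry decides how often it reruns."""

    def _attempt():
        # Delegate to the single low-level implementation.
        return pipe._execute_transaction(stack, raise_on_error)

    def _on_error(error: Exception) -> None:
        # Runs once per failure; with zero retries and NoBackoff this cleanup
        # is the only extra work before the error propagates.
        pipe.reset()

    return retry.call_with_retry(_attempt, _on_error)
```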

Please note:

- RedisCluster pipelines currently only support key-based commands.
- The pipeline gets its 'read_from_replicas' value from the

Collaborator:

The new preferred approach is to configure load_balancing_strategy; starting from this release, read_from_replicas is deprecated.
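
A sketch of the preferred configuration, assuming LoadBalancingStrategy is importable from redis.cluster as in recent releases:

```python
from redis.cluster import RedisCluster, LoadBalancingStrategy

# Preferred: route reads to replicas via a load-balancing strategy instead of
# the deprecated read_from_replicas flag.
rc = RedisCluster(
    host="localhost",
    port=7000,
    load_balancing_strategy=LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
)
```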

self.transaction_connection: Optional[Connection] = None
self.pipeline_slots: Set[int] = set()
self.slot_migrating = False
self.cluster_error = False

Collaborator:

I suggest not relying on this property to trigger cluster topology reinitialization; we can miss a place where we should set it to True.

)

def immediate_execute_command(self, *args, **options):
    retry = Retry(

Collaborator:

Same comment as in _execute_transaction_with_retries for the Retry object.

@petyaslavova petyaslavova added the feature New feature label Apr 29, 2025