Skip to content

Multi exec on cluster #3611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ vagrant/.vagrant
.cache
.eggs
.idea
.vscode
.coverage
env
venv
Expand Down
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Support transactions in ClusterPipeline
* Removing support for RedisGraph module. RedisGraph support is deprecated since Redis Stack 7.2 (https://redis.com/blog/redisgraph-eol/)
* Fix lock.extend() typedef to accept float TTL extension
* Update URL in the readme linking to Redis University
Expand Down
53 changes: 39 additions & 14 deletions docs/advanced_features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,45 @@ the server.
... pipe.set('foo1', 'bar1').get('foo1').execute()
[True, b'bar1']

Please note: - RedisCluster pipelines currently only support key-based
commands. - The pipeline gets its ‘read_from_replicas’ value from the
cluster’s parameter. Thus, if read from replications is enabled in the
cluster instance, the pipeline will also direct read commands to
replicas. - The ‘transaction’ option is NOT supported in cluster-mode.
In non-cluster mode, the ‘transaction’ option is available when
executing pipelines. This wraps the pipeline commands with MULTI/EXEC
commands, and effectively turns the pipeline commands into a single
transaction block. This means that all commands are executed
sequentially without any interruptions from other clients. However, in
cluster-mode this is not possible, because commands are partitioned
according to their respective destination nodes. This means that we can
not turn the pipeline commands into one transaction block, because in
most cases they are split up into several smaller pipelines.
Please note:

- RedisCluster pipelines currently only support key-based commands.
- The pipeline gets its ‘read_from_replicas’ value from the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new preferred aproach is to configure load_balancing_strategy, starting from this release the read_from_replicas is deprecated.

cluster’s parameter. Thus, if read from replications is enabled in
the cluster instance, the pipeline will also direct read commands to
replicas.


Transactions in clusters
~~~~~~~~~~~~~~~~~~~~~~~~

Transactions are supported in cluster-mode with one caveat: all keys of
all commands issued on a transaction pipeline must reside on the
same slot. This is similar to the limitation of multikey commands in
cluster. The reason behind this is that the Redis engine does not offer
a mechanism to block or exchange key data across nodes on the fly. A
client may add some logic to abstract engine limitations when running
on a cluster, such as the pipeline behavior explained on the previous
block, but there is no simple way that a client can enforce atomicity
across nodes on a distributed system.

The compromise of limiting the transaction pipeline to same-slot keys
is exactly that: a compromise. While this behavior is differnet from
non-transactional cluster pipelines, it simplifies migration of clients
from standalone to cluster under some circumstances. Note that application
code that issues multi/exec commands on a standalone client without
embedding them within a pipeline would eventually get ‘AttributeError’s.
With this approach, if the application uses ‘client.pipeline(transaction=True)’,
then switching the client with a cluster-aware instance would simplify
code changes (to some extent). This may be true for application code that
makes use of hash keys, since its transactions may are already be
mapping all commands to the same slot.

An alternative is some kind of two-step commit solution, where a slot
validation is run before the actual commands are run. This could work
with controlled node maintenance but does not cover single node failures.



Publish / Subscribe
-------------------
Expand Down
6 changes: 6 additions & 0 deletions redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
BusyLoadingError,
ChildDeadlockedError,
ConnectionError,
CrossSlotTransactionError,
DataError,
InvalidPipelineStack,
InvalidResponse,
OutOfMemoryError,
PubSubError,
ReadOnlyError,
RedisClusterException,
RedisError,
ResponseError,
TimeoutError,
Expand Down Expand Up @@ -56,15 +59,18 @@ def int_or_str(value):
"ConnectionError",
"ConnectionPool",
"CredentialProvider",
"CrossSlotTransactionError",
"DataError",
"from_url",
"default_backoff",
"InvalidPipelineStack",
"InvalidResponse",
"OutOfMemoryError",
"PubSubError",
"ReadOnlyError",
"Redis",
"RedisCluster",
"RedisClusterException",
"RedisError",
"ResponseError",
"Sentinel",
Expand Down
15 changes: 12 additions & 3 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from redis.commands.core import Script
from redis.connection import (
AbstractConnection,
Connection,
ConnectionPool,
SSLConnection,
UnixDomainSocketConnection,
Expand Down Expand Up @@ -1297,9 +1298,15 @@ class Pipeline(Redis):

UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

def __init__(self, connection_pool, response_callbacks, transaction, shard_hint):
def __init__(
self,
connection_pool: ConnectionPool,
response_callbacks,
transaction,
shard_hint,
):
self.connection_pool = connection_pool
self.connection = None
self.connection: Optional[Connection] = None
self.response_callbacks = response_callbacks
self.transaction = transaction
self.shard_hint = shard_hint
Expand Down Expand Up @@ -1434,7 +1441,9 @@ def pipeline_execute_command(self, *args, **options) -> "Pipeline":
self.command_stack.append((args, options))
return self

def _execute_transaction(self, connection, commands, raise_on_error) -> List:
def _execute_transaction(
self, connection: Connection, commands, raise_on_error
) -> List:
cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
all_cmds = connection.pack_commands(
[args for args, options in cmds if EMPTY_RESPONSE not in options]
Expand Down
Loading