From e4d431a97619303c3b3e40c75044bae502d4ff74 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Sat, 29 Mar 2025 08:37:13 -0400 Subject: [PATCH] Make guard to have only one pending reconnect ControlConnection.reconnect could be calld from many places in parallel. When it happens you get to see streak for reconnects happning one by one. This commit adds guard that makes sure there is only one pending reconnect. --- cassandra/cluster.py | 91 ++++++++-------------- tests/integration/standard/test_cluster.py | 55 +++++++++++++ 2 files changed, 89 insertions(+), 57 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 17e113e7aa..6077fc270a 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -3546,30 +3546,6 @@ class UserTypeDoesNotExist(Exception): pass -class _ControlReconnectionHandler(_ReconnectionHandler): - """ - Internal - """ - - def __init__(self, control_connection, *args, **kwargs): - _ReconnectionHandler.__init__(self, *args, **kwargs) - self.control_connection = weakref.proxy(control_connection) - - def try_reconnect(self): - return self.control_connection._reconnect_internal() - - def on_reconnection(self, connection): - self.control_connection._set_new_connection(connection) - - def on_exception(self, exc, next_delay): - # TODO only overridden to add logging, so add logging - if isinstance(exc, AuthenticationFailed): - return False - else: - log.debug("Error trying to reconnect control connection: %r", exc) - return True - - def _watch_callback(obj_weakref, method_name, *args, **kwargs): """ A callback handler for the ControlConnection that tolerates @@ -3662,6 +3638,7 @@ def __init__(self, cluster, timeout, self._reconnection_handler = None self._reconnection_lock = RLock() + self._reconnection_pending = False self._event_schedule_times = {} @@ -3695,6 +3672,8 @@ def _connect_host_in_lbp(self): ) for host in lbp.make_query_plan(): + if self._is_shutdown: + break try: return (self._try_connect(host), None) except ConnectionException as exc: @@ -3818,44 +3797,47 @@ def reconnect(self): if self._is_shutdown: return + with self._reconnection_lock: + if self._reconnection_pending: + return + self._reconnection_pending = True + self._submit(self._reconnect) - def _reconnect(self): + def _reconnect(self, schedule = None): log.debug("[control connection] Attempting to reconnect") + if self._is_shutdown: + return + try: self._set_new_connection(self._reconnect_internal()) + self._reconnection_pending = False + return except NoHostAvailable: - # make a retry schedule (which includes backoff) - schedule = self._cluster.reconnection_policy.new_schedule() + log.debug("[control connection] Reconnection plan is exhausted, scheduling new reconnection attempt") + except Exception as ex: + log.debug("[control connection] Unexpected exception during reconnect, scheduling new reconnection attempt: %s", ex) - with self._reconnection_lock: + if schedule is None: + schedule = self._cluster.reconnection_policy.new_schedule() - # cancel existing reconnection attempts - if self._reconnection_handler: - self._reconnection_handler.cancel() + try: + next_delay = next(schedule) + except StopIteration: + # the schedule has been exhausted + schedule = self._cluster.reconnection_policy.new_schedule() + try: + next_delay = next(schedule) + except StopIteration: + next_delay = 0 - # when a connection is successfully made, _set_new_connection - # will be called with the new connection and then our - # _reconnection_handler will be cleared out - self._reconnection_handler = _ControlReconnectionHandler( - self, self._cluster.scheduler, schedule, - self._get_and_set_reconnection_handler, - new_handler=None) - self._reconnection_handler.start() - except Exception: - log.debug("[control connection] error reconnecting", exc_info=True) - raise + if self._is_shutdown: + return - def _get_and_set_reconnection_handler(self, new_handler): - """ - Called by the _ControlReconnectionHandler when a new connection - is successfully created. Clears out the _reconnection_handler on - this ControlConnection. - """ - with self._reconnection_lock: - old = self._reconnection_handler - self._reconnection_handler = new_handler - return old + if next_delay == 0: + self._submit(self._reconnect) + else: + self._cluster.scheduler.schedule(next_delay, partial(self._reconnect, schedule)) def _submit(self, *args, **kwargs): try: @@ -3866,11 +3848,6 @@ def _submit(self, *args, **kwargs): return None def shutdown(self): - # stop trying to reconnect (if we are) - with self._reconnection_lock: - if self._reconnection_handler: - self._reconnection_handler.cancel() - with self._lock: if self._is_shutdown: return diff --git a/tests/integration/standard/test_cluster.py b/tests/integration/standard/test_cluster.py index cdfc7c1b82..e57b5387f2 100644 --- a/tests/integration/standard/test_cluster.py +++ b/tests/integration/standard/test_cluster.py @@ -326,6 +326,61 @@ def test_invalid_protocol_negotation(self): cluster.connect() cluster.shutdown() + def test_control_connection_reconnect(self): + """ + Ensure clusters that connect on a keyspace, do + """ + cassandra.cluster.log.setLevel(logging.DEBUG) + + cluster = TestCluster() + _ = cluster.connect() + + cluster.control_connection._reconnect_internal = Mock(wraps=cluster.control_connection._reconnect_internal) + + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + + while cluster.control_connection._reconnection_pending: + time.sleep(0.1) + + self.assertFalse(cluster.control_connection._connection.is_closed) + self.assertFalse(cluster.control_connection._connection.is_defunct) + self.assertTrue(cluster.control_connection.refresh_schema()) + cluster.control_connection._reconnect_internal.assert_called_once() + + def test_control_connection_reconnect_rescheduled(self): + """ + Ensure clusters that connect on a keyspace, do + """ + cassandra.cluster.log.setLevel(logging.DEBUG) + + cluster = TestCluster() + _ = cluster.connect() + + original_reconnect_internal = cluster.control_connection._reconnect_internal + def _throw(*args): + cluster.control_connection._reconnect_internal = Mock(wraps=original_reconnect_internal) + raise NoHostAvailable("Unable to connect to any servers") + + cluster.scheduler.schedule = Mock(wraps=cluster.scheduler.schedule) + cluster.control_connection._reconnect_internal = _throw + + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + cluster.control_connection.reconnect() + + while cluster.control_connection._reconnection_pending: + time.sleep(0.1) + + self.assertFalse(cluster.control_connection._connection.is_closed) + self.assertFalse(cluster.control_connection._connection.is_defunct) + self.assertTrue(cluster.control_connection.refresh_schema()) + cluster.control_connection._reconnect_internal.assert_called_once() + cluster.scheduler.schedule.assert_called_once() + def test_connect_on_keyspace(self): """ Ensure clusters that connect on a keyspace, do