Skip to content

Commit 2d77865

Browse files
Add support for SCYLLA_USE_METADATA_ID
SCYLLA_USE_METADATA_ID extension allows using METADATA_ID (which was introduced in CQLv5) in CQLv4. This commit: - Introduce support for SCYLLA_USE_METADATA_ID in protocol extension negotation - Reuse CQLv5 metadata id implemnetation if use_metadata_id feature is enabled
1 parent a401409 commit 2d77865

File tree

3 files changed

+32
-22
lines changed

3 files changed

+32
-22
lines changed

cassandra/connection.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1110,7 +1110,7 @@ def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message,
11101110
# queue the decoder function with the request
11111111
# this allows us to inject custom functions per request to encode, decode messages
11121112
self._requests[request_id] = (cb, decoder, result_metadata)
1113-
msg = encoder(msg, request_id, self.protocol_version, compressor=self.compressor,
1113+
msg = encoder(msg, request_id, self.protocol_version, self.features, compressor=self.compressor,
11141114
allow_beta_protocol_version=self.allow_beta_protocol_version)
11151115

11161116
if self._is_checksumming_enabled:

cassandra/protocol.py

+18-18
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ def __init__(self, cqlversion, options):
421421
self.cqlversion = cqlversion
422422
self.options = options
423423

424-
def send_body(self, f, protocol_version):
424+
def send_body(self, f, protocol_version, protocol_features):
425425
optmap = self.options.copy()
426426
optmap['CQL_VERSION'] = self.cqlversion
427427
write_stringmap(f, optmap)
@@ -456,7 +456,7 @@ class CredentialsMessage(_MessageType):
456456
def __init__(self, creds):
457457
self.creds = creds
458458

459-
def send_body(self, f, protocol_version):
459+
def send_body(self, f, protocol_version, protocol_features):
460460
if protocol_version > 1:
461461
raise UnsupportedOperation(
462462
"Credentials-based authentication is not supported with "
@@ -487,7 +487,7 @@ class AuthResponseMessage(_MessageType):
487487
def __init__(self, response):
488488
self.response = response
489489

490-
def send_body(self, f, protocol_version):
490+
def send_body(self, f, protocol_version, protocol_features):
491491
write_longstring(f, self.response)
492492

493493

@@ -507,7 +507,7 @@ class OptionsMessage(_MessageType):
507507
opcode = 0x05
508508
name = 'OPTIONS'
509509

510-
def send_body(self, f, protocol_version):
510+
def send_body(self, f, protocol_version, protocol_features):
511511
pass
512512

513513

@@ -645,7 +645,7 @@ def __init__(self, query, consistency_level, serial_consistency_level=None,
645645
super(QueryMessage, self).__init__(None, consistency_level, serial_consistency_level, fetch_size,
646646
paging_state, timestamp, False, continuous_paging_options, keyspace)
647647

648-
def send_body(self, f, protocol_version):
648+
def send_body(self, f, protocol_version, protocol_features):
649649
write_longstring(f, self.query)
650650
self._write_query_params(f, protocol_version)
651651

@@ -681,9 +681,9 @@ def _write_query_params(self, f, protocol_version):
681681
else:
682682
super(ExecuteMessage, self)._write_query_params(f, protocol_version)
683683

684-
def send_body(self, f, protocol_version):
684+
def send_body(self, f, protocol_version, protocol_features):
685685
write_string(f, self.query_id)
686-
if ProtocolVersion.uses_prepared_metadata(protocol_version):
686+
if ProtocolVersion.uses_prepared_metadata(protocol_version) or protocol_features.use_metadata_id:
687687
write_string(f, self.result_metadata_id)
688688
self._write_query_params(f, protocol_version)
689689

@@ -734,15 +734,15 @@ class ResultMessage(_MessageType):
734734
def __init__(self, kind):
735735
self.kind = kind
736736

737-
def recv(self, f, protocol_version, user_type_map, result_metadata, column_encryption_policy):
737+
def recv(self, f, protocol_version, protocol_features, user_type_map, result_metadata, column_encryption_policy):
738738
if self.kind == RESULT_KIND_VOID:
739739
return
740740
elif self.kind == RESULT_KIND_ROWS:
741741
self.recv_results_rows(f, protocol_version, user_type_map, result_metadata, column_encryption_policy)
742742
elif self.kind == RESULT_KIND_SET_KEYSPACE:
743743
self.new_keyspace = read_string(f)
744744
elif self.kind == RESULT_KIND_PREPARED:
745-
self.recv_results_prepared(f, protocol_version, user_type_map)
745+
self.recv_results_prepared(f, protocol_version, protocol_features, user_type_map)
746746
elif self.kind == RESULT_KIND_SCHEMA_CHANGE:
747747
self.recv_results_schema_change(f, protocol_version)
748748
else:
@@ -752,7 +752,7 @@ def recv(self, f, protocol_version, user_type_map, result_metadata, column_encry
752752
def recv_body(cls, f, protocol_version, protocol_features, user_type_map, result_metadata, column_encryption_policy):
753753
kind = read_int(f)
754754
msg = cls(kind)
755-
msg.recv(f, protocol_version, user_type_map, result_metadata, column_encryption_policy)
755+
msg.recv(f, protocol_version, protocol_features, user_type_map, result_metadata, column_encryption_policy)
756756
return msg
757757

758758
def recv_results_rows(self, f, protocol_version, user_type_map, result_metadata, column_encryption_policy):
@@ -785,9 +785,9 @@ def decode_row(row):
785785
col_md[3].cql_parameterized_type(),
786786
str(e)))
787787

788-
def recv_results_prepared(self, f, protocol_version, user_type_map):
788+
def recv_results_prepared(self, f, protocol_version, protocol_features, user_type_map):
789789
self.query_id = read_binary_string(f)
790-
if ProtocolVersion.uses_prepared_metadata(protocol_version):
790+
if ProtocolVersion.uses_prepared_metadata(protocol_version) or protocol_features.use_metadata_id:
791791
self.result_metadata_id = read_binary_string(f)
792792
else:
793793
self.result_metadata_id = None
@@ -909,7 +909,7 @@ def __init__(self, query, keyspace=None):
909909
self.query = query
910910
self.keyspace = keyspace
911911

912-
def send_body(self, f, protocol_version):
912+
def send_body(self, f, protocol_version, protocol_features):
913913
write_longstring(f, self.query)
914914

915915
flags = 0x00
@@ -953,7 +953,7 @@ def __init__(self, batch_type, queries, consistency_level,
953953
self.timestamp = timestamp
954954
self.keyspace = keyspace
955955

956-
def send_body(self, f, protocol_version):
956+
def send_body(self, f, protocol_version, protocol_features):
957957
write_byte(f, self.batch_type.value)
958958
write_short(f, len(self.queries))
959959
for prepared, string_or_query_id, params in self.queries:
@@ -1012,7 +1012,7 @@ class RegisterMessage(_MessageType):
10121012
def __init__(self, event_list):
10131013
self.event_list = event_list
10141014

1015-
def send_body(self, f, protocol_version):
1015+
def send_body(self, f, protocol_version, protocol_features):
10161016
write_stringlist(f, self.event_list)
10171017

10181018

@@ -1086,7 +1086,7 @@ def __init__(self, op_type, op_id, next_pages=0):
10861086
self.op_id = op_id
10871087
self.next_pages = next_pages
10881088

1089-
def send_body(self, f, protocol_version):
1089+
def send_body(self, f, protocol_version, protocol_features):
10901090
write_int(f, self.op_type)
10911091
write_int(f, self.op_id)
10921092
if self.op_type == ReviseRequestMessage.RevisionType.PAGING_BACKPRESSURE:
@@ -1122,7 +1122,7 @@ class _ProtocolHandler(object):
11221122
"""Instance of :class:`cassandra.policies.ColumnEncryptionPolicy` in use by this handler"""
11231123

11241124
@classmethod
1125-
def encode_message(cls, msg, stream_id, protocol_version, compressor, allow_beta_protocol_version):
1125+
def encode_message(cls, msg, stream_id, protocol_version, protocol_features,compressor, allow_beta_protocol_version):
11261126
"""
11271127
Encodes a message using the specified frame parameters, and compressor
11281128
@@ -1138,7 +1138,7 @@ def encode_message(cls, msg, stream_id, protocol_version, compressor, allow_beta
11381138
raise UnsupportedOperation("Custom key/value payloads can only be used with protocol version 4 or higher")
11391139
flags |= CUSTOM_PAYLOAD_FLAG
11401140
write_bytesmap(body, msg.custom_payload)
1141-
msg.send_body(body, protocol_version)
1141+
msg.send_body(body, protocol_version, protocol_features)
11421142
body = body.getvalue()
11431143

11441144
# With checksumming, the compression is done at the segment frame encoding

cassandra/protocol_features.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,29 @@
77

88
RATE_LIMIT_ERROR_EXTENSION = "SCYLLA_RATE_LIMIT_ERROR"
99
TABLETS_ROUTING_V1 = "TABLETS_ROUTING_V1"
10+
USE_METADATA_ID = "SCYLLA_USE_METADATA_ID"
1011

1112
class ProtocolFeatures(object):
1213
rate_limit_error = None
1314
shard_id = 0
1415
sharding_info = None
1516
tablets_routing_v1 = False
17+
use_metadata_id = False
1618

17-
def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False):
19+
def __init__(self, rate_limit_error=None, shard_id=0, sharding_info=None, tablets_routing_v1=False, use_metadata_id=False):
1820
self.rate_limit_error = rate_limit_error
1921
self.shard_id = shard_id
2022
self.sharding_info = sharding_info
2123
self.tablets_routing_v1 = tablets_routing_v1
24+
self.use_metadata_id = use_metadata_id
2225

2326
@staticmethod
2427
def parse_from_supported(supported):
25-
rate_limit_error = ProtocolFeatures.maybe_parse_rate_limit_error(supported)
2628
shard_id, sharding_info = ProtocolFeatures.parse_sharding_info(supported)
29+
rate_limit_error = ProtocolFeatures.maybe_parse_rate_limit_error(supported)
2730
tablets_routing_v1 = ProtocolFeatures.parse_tablets_info(supported)
28-
return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1)
31+
use_metadata_id = ProtocolFeatures.parse_metadata_id_info(supported)
32+
return ProtocolFeatures(rate_limit_error, shard_id, sharding_info, tablets_routing_v1, use_metadata_id)
2933

3034
@staticmethod
3135
def maybe_parse_rate_limit_error(supported):
@@ -49,6 +53,8 @@ def add_startup_options(self, options):
4953
options[RATE_LIMIT_ERROR_EXTENSION] = ""
5054
if self.tablets_routing_v1:
5155
options[TABLETS_ROUTING_V1] = ""
56+
if self.use_metadata_id:
57+
options[USE_METADATA_ID] = ""
5258

5359
@staticmethod
5460
def parse_sharding_info(options):
@@ -72,3 +78,7 @@ def parse_sharding_info(options):
7278
@staticmethod
7379
def parse_tablets_info(options):
7480
return TABLETS_ROUTING_V1 in options
81+
82+
@staticmethod
83+
def parse_metadata_id_info(options):
84+
return USE_METADATA_ID in options

0 commit comments

Comments
 (0)