1414from cassandra .auth import PlainTextAuthProvider
1515from cassandra .cluster import Cluster
1616from cassandra .policies import (
17+ AddressTranslator ,
18+ DCAwareRoundRobinPolicy ,
1719 ExponentialBackoffRetryPolicy ,
1820 FallthroughRetryPolicy ,
1921 RoundRobinPolicy ,
4042logger = logging .getLogger (__name__ )
4143
4244
45+ class _FixedAddressTranslator (AddressTranslator ):
46+ """Translates all discovered node addresses to a fixed host address.
47+
48+ Used in Docker/NAT environments where gossip-discovered internal IPs are unreachable.
49+ """
50+
51+ def __init__ (self , address : str ) -> None :
52+ self ._address = address
53+
54+ def translate (self , addr : str ) -> str : # noqa: ARG002
55+ return self ._address
56+
57+
4358class ScyllaDBExceptionHandlerMixin :
4459 """Mixin class to handle ScyllaDB/Cassandra exceptions in a consistent way."""
4560
@@ -182,13 +197,10 @@ def _create_cluster(self) -> Any:
182197
183198 # Configure load balancing policy with optional datacenter awareness
184199 if self .config .LOCAL_DC :
185- from cassandra .policies import DCAwareRoundRobinPolicy
186-
187200 base_policy = DCAwareRoundRobinPolicy (local_dc = self .config .LOCAL_DC )
201+ load_balancing_policy = TokenAwarePolicy (base_policy )
188202 else :
189- base_policy = RoundRobinPolicy ()
190-
191- load_balancing_policy = TokenAwarePolicy (base_policy )
203+ load_balancing_policy = TokenAwarePolicy (RoundRobinPolicy ())
192204
193205 if self .config .RETRY_POLICY == "FALLTHROUGH" :
194206 retry_policy = FallthroughRetryPolicy ()
@@ -203,6 +215,12 @@ def _create_cluster(self) -> Any:
203215 if self .config .DISABLE_SHARD_AWARENESS :
204216 shard_aware_options = {"disable" : True }
205217
218+ # Address translation for Docker/NAT environments where gossip-discovered
219+ # internal container IPs are unreachable from the host
220+ address_translator = None
221+ if self .config .ADDRESS_TRANSLATION_ENABLED :
222+ address_translator = _FixedAddressTranslator (self .config .CONTACT_POINTS [0 ])
223+
206224 # Cluster is from cassandra.cluster, properly typed
207225 cluster = Cluster (
208226 contact_points = self .config .CONTACT_POINTS ,
@@ -214,12 +232,17 @@ def _create_cluster(self) -> Any:
214232 load_balancing_policy = load_balancing_policy ,
215233 default_retry_policy = retry_policy ,
216234 shard_aware_options = shard_aware_options ,
235+ address_translator = address_translator ,
217236 )
218237
219238 # Configure connection pool settings
220239 if cluster .profile_manager is not None :
221240 profile = cluster .profile_manager .default
222241 profile .request_timeout = self .config .REQUEST_TIMEOUT
242+ # Configure connection pool limits per host
243+ profile .max_connections_per_host = self .config .MAX_CONNECTIONS_PER_HOST
244+ profile .min_connections_per_host = self .config .MIN_CONNECTIONS_PER_HOST
245+ profile .core_connections_per_host = self .config .CORE_CONNECTIONS_PER_HOST
223246
224247 # Set pool configuration
225248 cluster .connection_class .max_requests_per_connection = self .config .MAX_REQUESTS_PER_CONNECTION
@@ -328,10 +351,23 @@ def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> None:
328351 keyspace (str): The name of the keyspace to create.
329352 replication_factor (int): The replication factor. Defaults to 1.
330353 """
331- query = f"""
332- CREATE KEYSPACE IF NOT EXISTS { keyspace }
333- WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': { replication_factor } }}
334- """
354+ # Use configured replication strategy
355+ if self .config .REPLICATION_STRATEGY == "NetworkTopologyStrategy" and self .config .REPLICATION_CONFIG :
356+ # Build replication config for NetworkTopologyStrategy
357+ replication_parts = ["'class': 'NetworkTopologyStrategy'" ]
358+ for dc , rf in self .config .REPLICATION_CONFIG .items ():
359+ replication_parts .append (f"'{ dc } ': { rf } " )
360+ replication_str = ", " .join (replication_parts )
361+ query = f"""
362+ CREATE KEYSPACE IF NOT EXISTS { keyspace }
363+ WITH replication = {{{ replication_str } }}
364+ """
365+ else :
366+ # Use SimpleStrategy (default)
367+ query = f"""
368+ CREATE KEYSPACE IF NOT EXISTS { keyspace }
369+ WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': { replication_factor } }}
370+ """
335371 try :
336372 self .execute (query )
337373 except Exception as e :
@@ -765,13 +801,10 @@ def _create_cluster(self) -> Any:
765801
766802 # Configure load balancing policy with optional datacenter awareness
767803 if self .config .LOCAL_DC :
768- from cassandra .policies import DCAwareRoundRobinPolicy
769-
770804 base_policy = DCAwareRoundRobinPolicy (local_dc = self .config .LOCAL_DC )
805+ load_balancing_policy = TokenAwarePolicy (base_policy )
771806 else :
772- base_policy = RoundRobinPolicy ()
773-
774- load_balancing_policy = TokenAwarePolicy (base_policy )
807+ load_balancing_policy = TokenAwarePolicy (RoundRobinPolicy ())
775808
776809 if self .config .RETRY_POLICY == "FALLTHROUGH" :
777810 retry_policy = FallthroughRetryPolicy ()
@@ -786,6 +819,12 @@ def _create_cluster(self) -> Any:
786819 if self .config .DISABLE_SHARD_AWARENESS :
787820 shard_aware_options = {"disable" : True }
788821
822+ # Address translation for Docker/NAT environments where gossip-discovered
823+ # internal container IPs are unreachable from the host
824+ address_translator = None
825+ if self .config .ADDRESS_TRANSLATION_ENABLED :
826+ address_translator = _FixedAddressTranslator (self .config .CONTACT_POINTS [0 ])
827+
789828 # Cluster is from cassandra.cluster, properly typed
790829 cluster = Cluster (
791830 contact_points = self .config .CONTACT_POINTS ,
@@ -797,12 +836,17 @@ def _create_cluster(self) -> Any:
797836 load_balancing_policy = load_balancing_policy ,
798837 default_retry_policy = retry_policy ,
799838 shard_aware_options = shard_aware_options ,
839+ address_translator = address_translator ,
800840 )
801841
802842 # Configure connection pool settings
803843 if cluster .profile_manager is not None :
804844 profile = cluster .profile_manager .default
805845 profile .request_timeout = self .config .REQUEST_TIMEOUT
846+ # Configure connection pool limits per host
847+ profile .max_connections_per_host = self .config .MAX_CONNECTIONS_PER_HOST
848+ profile .min_connections_per_host = self .config .MIN_CONNECTIONS_PER_HOST
849+ profile .core_connections_per_host = self .config .CORE_CONNECTIONS_PER_HOST
806850
807851 # Set pool configuration
808852 cluster .connection_class .max_requests_per_connection = self .config .MAX_REQUESTS_PER_CONNECTION
@@ -925,10 +969,23 @@ async def create_keyspace(self, keyspace: str, replication_factor: int = 1) -> N
925969 keyspace (str): The name of the keyspace to create.
926970 replication_factor (int): The replication factor. Defaults to 1.
927971 """
928- query = f"""
929- CREATE KEYSPACE IF NOT EXISTS { keyspace }
930- WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': { replication_factor } }}
931- """
972+ # Use configured replication strategy
973+ if self .config .REPLICATION_STRATEGY == "NetworkTopologyStrategy" and self .config .REPLICATION_CONFIG :
974+ # Build replication config for NetworkTopologyStrategy
975+ replication_parts = ["'class': 'NetworkTopologyStrategy'" ]
976+ for dc , rf in self .config .REPLICATION_CONFIG .items ():
977+ replication_parts .append (f"'{ dc } ': { rf } " )
978+ replication_str = ", " .join (replication_parts )
979+ query = f"""
980+ CREATE KEYSPACE IF NOT EXISTS { keyspace }
981+ WITH replication = {{{ replication_str } }}
982+ """
983+ else :
984+ # Use SimpleStrategy (default)
985+ query = f"""
986+ CREATE KEYSPACE IF NOT EXISTS { keyspace }
987+ WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': { replication_factor } }}
988+ """
932989 try :
933990 await self .execute (query )
934991 except Exception as e :
0 commit comments