skoppu22 commented on code in PR #169:
URL: 
https://github.com/apache/cassandra-analytics/pull/169#discussion_r2988955393


##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/SSTableVersionAnalyzer.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge;
+
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Analyzes SSTable versions on a cluster to determine the appropriate
+ * Cassandra bridge to load for bulk write operations.
+ *
+ * <p>This class provides logic to select Cassandra bridge based on the 
highest SSTable
+ * version detected on the cluster and the user's requested format 
preference.</p>
+ */
+public final class SSTableVersionAnalyzer
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SSTableVersionAnalyzer.class);
+
+    private SSTableVersionAnalyzer()
+    {
+        // Utility class
+    }
+
+    /**
+     * Determines which CassandraVersion bridge to load based on:
+     * - Highest SSTable version detected on cluster
+     * - User's format preference
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions found on 
cluster nodes
+     * @param requestedFormat User's requested format, example: "big" or "bti"
+     * @param cassandraVersion Cassandra version string for fallback
+     * @param isSSTableVersionBasedBridgeDisabled flag to disable sstable 
version based bridge determination
+     * @return CassandraVersion enum indicating which bridge to load
+     * @throws IllegalArgumentException if format is invalid, or cluster 
doesn't support requested format,
+     *                                  or SSTable versions are empty/unknown
+     */
+    public static CassandraVersion determineBridgeVersionForWrite(Set<String> 
sstableVersionsOnCluster,
+                                                                  String 
requestedFormat,
+                                                                  String 
cassandraVersion,
+                                                                  boolean 
isSSTableVersionBasedBridgeDisabled)
+    {
+        // Check for fallback mode
+        if (isSSTableVersionBasedBridgeDisabled)
+        {
+            LOGGER.info("SSTable version-based bridge selection is disabled 
via configuration. " +
+                        "Using cassandra.version for bridge selection: {}", 
cassandraVersion);
+            return CassandraVersion.fromVersion(cassandraVersion)
+                                   .orElseThrow(() -> new 
UnsupportedOperationException(
+                                   String.format("Unsupported Cassandra 
version: %s", cassandraVersion)));
+        }
+
+        // Validate SSTable versions are present
+        ensureSSTableVersionsNotEmpty(sstableVersionsOnCluster);
+
+        // Find highest Cassandra version based on SSTable versions
+        CassandraVersion highestCassandraVersion = 
findHighestCassandraVersion(sstableVersionsOnCluster);
+
+        // Check if highestCassandraVersion supports the requested format
+        boolean supportsRequestedFormat = 
highestCassandraVersion.getNativeSStableVersions()
+            .stream()
+            .anyMatch(v -> v.startsWith(requestedFormat + "-"));
+
+        if (supportsRequestedFormat)
+        {
+            return highestCassandraVersion;
+        }
+        else
+        {
+            throw new IllegalArgumentException(
+                String.format("Cluster does not support requested SSTable 
format '%s'. " +
+                              "Bridge version determined is %s, which only 
supports formats: %s",
+                    requestedFormat, highestCassandraVersion.versionName(),
+                    highestCassandraVersion.sstableFormats()));
+        }
+    }
+
+    /**
+     * Determines which CassandraVersion bridge to load for read operations 
based on:
+     * - Highest SSTable version detected on cluster
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions found on 
cluster nodes
+     * @param cassandraVersion Cassandra version string for fallback
+     * @param isSSTableVersionBasedBridgeDisabled flag to disable sstable 
version based bridge determination
+     * @return CassandraVersion enum indicating which bridge to load
+     * @throws IllegalArgumentException if SSTable versions are empty/unknown
+     */
+    public static CassandraVersion determineBridgeVersionForRead(Set<String> 
sstableVersionsOnCluster,
+                                                                 String 
cassandraVersion,
+                                                                 boolean 
isSSTableVersionBasedBridgeDisabled)
+    {
+        // Check for fallback mode
+        if (isSSTableVersionBasedBridgeDisabled)
+        {
+            LOGGER.info("SSTable version-based bridge selection is disabled 
via configuration. " +
+                        "Using cassandra.version for bridge selection: {}", 
cassandraVersion);
+            return CassandraVersion.fromVersion(cassandraVersion)
+                                   .orElseThrow(() -> new 
UnsupportedOperationException(
+                                   String.format("Unsupported Cassandra 
version: %s", cassandraVersion)));
+        }
+
+        // Validate SSTable versions are present
+        ensureSSTableVersionsNotEmpty(sstableVersionsOnCluster);
+
+        // Find highest Cassandra version based on SSTable versions
+        CassandraVersion bridgeVersion = 
findHighestCassandraVersion(sstableVersionsOnCluster);
+
+        LOGGER.debug("Determined bridge version {} for read based on SSTable 
versions on cluster: {}",
+                     bridgeVersion.versionName(), sstableVersionsOnCluster);
+
+        return bridgeVersion;
+    }
+
+    /**
+     * Ensures that SSTable versions from cluster are not null or empty.
+     *
+     * @param sstableVersionsOnCluster Set of SSTable versions to validate
+     * @throws IllegalStateException if versions are null or empty
+     */
+    private static void ensureSSTableVersionsNotEmpty(Set<String> 
sstableVersionsOnCluster)
+    {
+        if (sstableVersionsOnCluster == null || 
sstableVersionsOnCluster.isEmpty())
+        {
+            throw new IllegalStateException(String.format(
+                "Unable to retrieve SSTable versions from cluster. " +

Review Comment:
   Here we are asking the user to run the job using the fallback mechanism if 
they think that's appropriate. We do not use the fallback mechanism when 
spark.cassandra_analytics.bridge.disable_sstable_version_based is false. There 
must be a reason why SSTable-version-based bridge selection is failing. The user 
can evaluate the cluster state and retry with the fallback mechanism only if they 
think that's fine for their use case or cluster situation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to