sarankk commented on code in PR #169:
URL:
https://github.com/apache/cassandra-analytics/pull/169#discussion_r2984343777
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/cloudstorage/coordinated/CassandraClusterInfoGroup.java:
##########
@@ -395,4 +366,61 @@ Map<String, ClusterInfo> clusterInfoByIdUnsafe()
{
return clusterInfoById;
}
+
+ /**
+ * Retrieves the lowest Cassandra version from all clusters.
+ *
+ * @param conf Bulk Spark configuration
+ * @return lowest Cassandra version string across all clusters
+ */
+ public static String getLowestCassandraVersion(BulkSparkConf conf)
+ {
+ CoordinatedWriteConf coordinatedWriteConf = conf.coordinatedWriteConf();
+ Preconditions.checkArgument(coordinatedWriteConf != null,
+ "CoordinatedWriteConf is required for multi-cluster operations");
+
+ Map<String, String> clusterVersions = new HashMap<>();
+ for (String clusterId : coordinatedWriteConf.clusters().keySet())
+ {
+ String version = CassandraClusterInfo.getLowestCassandraVersion(conf, clusterId);
+ clusterVersions.put(clusterId, version);
+ }
+
+ // Find the lowest version across all clusters
+ List<CassandraVersionFeatures> versions = clusterVersions.values()
+ .stream()
+ .map(CassandraVersionFeatures::cassandraVersionFeaturesFromCassandraVersion)
+ .sorted()
+ .collect(Collectors.toList());
+
+ CassandraVersionFeatures first = versions.get(0);
+ CassandraVersionFeatures last = versions.get(versions.size() - 1);
+ Preconditions.checkState(first.getMajorVersion() == last.getMajorVersion(),
+ "Cluster versions are not compatible. lowest=%s and highest=%s",
+ first.getRawVersionString(), last.getRawVersionString());
+
+ return first.getRawVersionString();
+ }
+
+ /**
+ * Retrieves aggregated SSTable versions from all clusters.
+ *
+ * @param conf Bulk Spark configuration
+ * @return set of SSTable versions present across all clusters
+ */
+ public static Set<String> getSSTableVersionsOnCluster(BulkSparkConf conf)
+ {
+ CoordinatedWriteConf coordinatedWriteConf = conf.coordinatedWriteConf();
+ Preconditions.checkArgument(coordinatedWriteConf != null,
+ "CoordinatedWriteConf is required for multi-cluster operations");
+
+ Set<String> aggregatedSSTableVersions = new HashSet<>();
+ for (String clusterId : coordinatedWriteConf.clusters().keySet())
+ {
+ Set<String> sstableVersions = CassandraClusterInfo.getSSTableVersionsOnCluster(conf, clusterId);
+ aggregatedSSTableVersions.addAll(sstableVersions);
Review Comment:
The check in `getLowestCassandraVersion` that compares the lowest and highest
Cassandra versions is missing here, where we get the bridge based on SSTable
versions.
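
To illustrate, here is a minimal, self-contained sketch of running the same kind of major-version compatibility check before aggregating SSTable versions. Everything in it is an assumption for illustration only: the class `SSTableVersionCheckSketch`, both method names, and the plain-map inputs are hypothetical, and `majorVersion` is a simplified stand-in for `CassandraVersionFeatures`, not the project's API.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not code from the PR.
final class SSTableVersionCheckSketch
{
    // Simplified stand-in for CassandraVersionFeatures: reads the digits before the first '.'
    static int majorVersion(String cassandraVersion)
    {
        int dot = cassandraVersion.indexOf('.');
        String major = dot < 0 ? cassandraVersion : cassandraVersion.substring(0, dot);
        return Integer.parseInt(major.trim());
    }

    // Fails fast when clusters span more than one major version, then unions the SSTable versions.
    static Set<String> aggregateSSTableVersions(Map<String, String> cassandraVersionByCluster,
                                                Map<String, Set<String>> sstableVersionsByCluster)
    {
        if (cassandraVersionByCluster.isEmpty())
        {
            throw new IllegalArgumentException("At least one cluster is required");
        }

        int lowest = Integer.MAX_VALUE;
        int highest = Integer.MIN_VALUE;
        for (String version : cassandraVersionByCluster.values())
        {
            int major = majorVersion(version);
            lowest = Math.min(lowest, major);
            highest = Math.max(highest, major);
        }
        if (lowest != highest)
        {
            throw new IllegalStateException(String.format(
                "Cluster versions are not compatible. lowest major=%d and highest major=%d",
                lowest, highest));
        }

        Set<String> aggregated = new HashSet<>();
        for (Set<String> versions : sstableVersionsByCluster.values())
        {
            aggregated.addAll(versions);
        }
        return aggregated;
    }
}
```

In the PR itself the check would more likely reuse the existing `CassandraVersionFeatures` comparison, perhaps through a shared helper, rather than re-parse version strings as this sketch does.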
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]