This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 090dd4fd CASSANALYTICS-20: CassandraDataLayer uses configuration list of IPs instead of the full ring/datacenter (#122)
090dd4fd is described below
commit 090dd4fdc86ea0ca9410140ce72e840e37497df7
Author: Yifan Cai <[email protected]>
AuthorDate: Sat Jun 21 20:45:26 2025 -0700
CASSANALYTICS-20: CassandraDataLayer uses configuration list of IPs instead of the full ring/datacenter (#122)
Patch by Serban Teodorescu, Yifan Cai; Reviewed by Francisco Guerrero, Yifan Cai for CASSANALYTICS-20
---------
Co-authored-by: Serban Teodorescu <[email protected]>
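
For readers skimming the patch: the practical effect is that the Bulk Reader no longer needs every Sidecar host listed in sidecar_contact_points. It now derives the full instance map from the token ring, filtered by the requested datacenter, so a single contact point is enough. A minimal usage sketch follows; the hostname, port, keyspace, and table values are placeholders rather than anything taken from this patch, and only the option names come from the code below.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class SingleContactPointExample
{
    public static void main(String[] args)
    {
        SparkSession spark = SparkSession.builder().appName("bulk-read").getOrCreate();
        Dataset<Row> df = spark.read()
                               .format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
                               .option("sidecar_contact_points", "sidecar-1.example.com") // one host; the rest come from the ring
                               .option("sidecar_port", "9043")
                               .option("keyspace", "ks")
                               .option("table", "tbl")
                               .option("DC", "datacenter1")
                               .load();
        df.show();
    }
}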
---
CHANGES.txt | 1 +
.../cassandra/spark/data/CassandraDataLayer.java | 31 +++++++++++++----
.../apache/cassandra/analytics/BulkReaderTest.java | 27 +++++++++++++++
.../SharedClusterSparkIntegrationTestBase.java | 21 ++++++++++--
.../apache/cassandra/analytics/SparkTestUtils.java | 39 +++++++++++++---------
5 files changed, 94 insertions(+), 25 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 1f074a1f..8fd4e7af 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Use full ring instead of only IPs from configuration (CASSANALYTICS-20)
 * Bulk Reader should dynamically size the Spark job based on estimated table size (CASSANALYTICS-36)
 * Allow getting cassandra role in Spark options for use in Sidecar requests for RBAC (CASSANALYTICS-61)
* Fix NPE in the deserialized CassandraClusterInfoGroup (CASSANALYTICS-59)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 5e7910ae..bf877f61 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -146,7 +146,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
private SslConfig sslConfig;
@VisibleForTesting
- transient Map<String, SidecarInstance> instanceMap;
+ transient Map<String, SidecarInstance> sidecarInstanceMap;
public CassandraDataLayer(@NotNull ClientConfig options,
                              @NotNull Sidecar.ClientConfig sidecarClientConfig,
@@ -220,6 +220,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
this.rfMap = rfMap;
this.timeProvider = timeProvider;
this.maybeQuoteKeyspaceAndTable();
+ this.initSidecarClient();
this.initInstanceMap();
this.startupValidate();
}
@@ -234,7 +235,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
// Load cluster config from options
clusterConfig = initializeClusterConfig(options);
- initInstanceMap();
+ initSidecarClient();
// Get cluster info from Cassandra Sidecar
int effectiveNumberOfCores;
@@ -251,6 +252,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
{
throw new RuntimeException(ThrowableUtils.rootCause(exception));
}
+ initInstanceMap();
LOGGER.info("Initialized Cassandra Bulk Reader with
effectiveNumberOfCores={}", effectiveNumberOfCores);
}
@@ -422,7 +424,21 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
protected void initInstanceMap()
{
-        instanceMap = clusterConfig.stream().collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
+        Preconditions.checkState(tokenPartitioner != null, "tokenPartitioner cannot be absent");
+        sidecarInstanceMap = tokenPartitioner
+                             .ring()
+                             .instances()
+                             .stream()
+                             .filter(instance -> datacenter == null || datacenter.equals(instance.dataCenter()))
+                             .map(CassandraInstance::nodeName)
+                             .distinct()
+                             .map(nodeName -> new SidecarInstanceImpl(nodeName, sidecarClientConfig.effectivePort()))
+                             .collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
+        LOGGER.info("Initialized CassandraDataLayer sidecarInstanceMap numInstances={}", sidecarInstanceMap.size());
+ }
+
+ protected void initSidecarClient()
+ {
try
{
SslConfigSecretsProvider secretsProvider = sslConfig != null
@@ -436,7 +452,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
{
            throw new RuntimeException("Unable to build sidecar client", ioException);
}
-        LOGGER.info("Initialized CassandraDataLayer instanceMap numInstances={}", instanceMap.size());
+ LOGGER.info("Initialized sidecar client");
}
@Override
@@ -516,7 +532,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
                                                          @NotNull Range<BigInteger> range,
                                                          @NotNull CassandraInstance instance)
{
- SidecarInstance sidecarInstance = instanceMap.get(instance.nodeName());
+        SidecarInstance sidecarInstance = sidecarInstanceMap.get(instance.nodeName());
if (sidecarInstance == null)
{
            throw new IllegalStateException("Could not find matching cassandra instance: " + instance.nodeName());
@@ -733,6 +749,7 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
this.rfMap = (Map<String, ReplicationFactor>) in.readObject();
this.timeProvider = new ReaderTimeProvider(in.readInt());
this.maybeQuoteKeyspaceAndTable();
+ this.initSidecarClient();
this.initInstanceMap();
this.startupValidate();
}
@@ -944,10 +961,10 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
LOGGER.info("Clearing snapshot at end of Spark job snapshotName={}
keyspace={} table={} dc={}",
snapshotName, maybeQuotedKeyspace, maybeQuotedTable,
datacenter);
- CountDownLatch latch = new CountDownLatch(clusterConfig.size());
+ CountDownLatch latch = new CountDownLatch(sidecarInstanceMap.size());
try
{
- for (SidecarInstance instance : clusterConfig)
+ for (SidecarInstance instance : sidecarInstanceMap.values())
{
                sidecar.clearSnapshot(instance, maybeQuotedKeyspace, maybeQuotedTable, snapshotName).whenComplete((resp, throwable) -> {
try
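
The final hunk above also re-bases the snapshot-clearing fan-out on sidecarInstanceMap instead of clusterConfig: the latch is sized from the map and counted down as each asynchronous clearSnapshot call completes. Below is a stripped-down, self-contained sketch of that latch pattern; the class, method names, and the stand-in client are hypothetical simplifications, not the patch's actual code.

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SnapshotClearingSketch
{
    // Fan out one async call per instance; count the latch down whether the
    // call succeeded or failed, then wait with a bounded timeout.
    static void clearAll(Collection<String> instances, long timeoutSeconds) throws InterruptedException
    {
        CountDownLatch latch = new CountDownLatch(instances.size());
        for (String instance : instances)
        {
            clearSnapshotAsync(instance).whenComplete((resp, throwable) -> latch.countDown());
        }
        latch.await(timeoutSeconds, TimeUnit.SECONDS);
    }

    // Stand-in for the Sidecar client's asynchronous clearSnapshot call.
    static CompletableFuture<Void> clearSnapshotAsync(String instance)
    {
        return CompletableFuture.completedFuture(null);
    }
}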
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
index 21637e85..7c5ebaff 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkReaderTest.java
@@ -20,6 +20,7 @@
package org.apache.cassandra.analytics;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.Test;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -48,6 +50,13 @@ class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
QualifiedName table2 = uniqueTestTableFullName(TEST_KEYSPACE);
    QualifiedName tableForNullStaticColumn = uniqueTestTableFullName(TEST_KEYSPACE);
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+        return super.testClusterConfiguration().nodesPerDc(2);
+ }
+
@Test
void testDynamicSizingOption()
{
@@ -119,6 +128,24 @@ class BulkReaderTest extends SharedClusterSparkIntegrationTestBase
}
}
+ @Test
+ void testUsingSingleSidecarContactPoint()
+ {
+        String singleSidecar = SparkTestUtils.sidecarInstancesOptionStream(cluster, dnsResolver)
+                                             .limit(1)
+                                             .collect(Collectors.joining());
+ assertThat(cluster.size()).isEqualTo(2);
+        assertThat(singleSidecar.contains(","))
+                .describedAs("should not contain the separator ',' as it should have one single contact point")
+                .isFalse();
+        Dataset<Row> data = bulkReaderDataFrame(table1, Collections.singletonMap("sidecar_contact_points", singleSidecar)).load();
+
+ List<Row> rows = data.collectAsList().stream()
+                             .sorted(Comparator.comparing(row -> row.getInt(0)))
+ .collect(Collectors.toList());
+ assertThat(rows.size()).isEqualTo(DATASET.size());
+ }
+
@Override
protected void initializeSchemaForTest()
{
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
index 9a7d5889..c6a98ca7 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SharedClusterSparkIntegrationTestBase.java
@@ -91,8 +91,25 @@ public abstract class SharedClusterSparkIntegrationTestBase extends SharedCluste
*/
protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName)
{
-        return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(), getOrCreateSparkSession(),
-                                                         tableName);
+        return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(),
+                                                         getOrCreateSparkSession(),
+                                                         tableName, Collections.emptyMap());
+ }
+
+ /**
+     * A preconfigured {@link DataFrameReader} with pre-populated required options that can be overridden
+ * with additional options for every specific test.
+ *
+ * @param tableName the qualified name for the Cassandra table
+ * @param additionalOptions additional options for the data frame
+ * @return a {@link DataFrameReader} for Cassandra bulk reads
+ */
+    protected DataFrameReader bulkReaderDataFrame(QualifiedName tableName, Map<String, String> additionalOptions)
+ {
+        return sparkTestUtils.defaultBulkReaderDataFrame(getOrCreateSparkConf(),
+                                                         getOrCreateSparkSession(),
+                                                         tableName,
+                                                         additionalOptions);
}
/**
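
The overload added above is what lets a test layer its own options over the pre-populated defaults. A hedged caller sketch follows; the subclass and the overridden option value are hypothetical (the real usage is testUsingSingleSidecarContactPoint in BulkReaderTest earlier in this patch), and it assumes the remaining abstract members of the base class are implemented elsewhere.

import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Test;

class OverrideOptionsSketch extends SharedClusterSparkIntegrationTestBase
{
    @Test
    void readsWithAnOverriddenOption()
    {
        // Entries in the additional map win over the defaults populated by SparkTestUtils.
        Dataset<Row> df = bulkReaderDataFrame(uniqueTestTableFullName(TEST_KEYSPACE),
                                              Collections.singletonMap("createSnapshot", "false"))
                          .load();
        df.collectAsList();
    }
}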
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 0e3a1366..5eda84a9 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.analytics;
import java.net.UnknownHostException;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -112,7 +113,8 @@ public class SparkTestUtils
*/
public DataFrameReader defaultBulkReaderDataFrame(SparkConf sparkConf,
SparkSession spark,
- QualifiedName tableName)
+ QualifiedName tableName,
+                                                      Map<String, String> additionalOptions)
{
SQLContext sql = spark.sqlContext();
SparkContext sc = spark.sparkContext();
@@ -121,22 +123,27 @@ public class SparkTestUtils
        int numExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors",
                                            sparkConf.getInt("spark.executor.instances", 1));
int numCores = coresPerExecutor * numExecutors;
+ Map<String, String> options = new HashMap<>();
+        options.put("sidecar_contact_points", sidecarInstancesOption(cluster, dnsResolver));
+ options.put("keyspace", tableName.keyspace());
+ options.put("table", tableName.table());
+ options.put("DC", "datacenter1");
+ options.put("snapshotName", UUID.randomUUID().toString());
+ options.put("createSnapshot", "true");
+        // Shutdown hooks are called after the job ends, and in the case of integration tests
+        // the sidecar is already shut down before this. Since the cluster will be torn
+        // down anyway, the integration job skips clearing snapshots.
+        options.put("clearSnapshotStrategy", "noop");
+        options.put("defaultParallelism", String.valueOf(sc.defaultParallelism()));
+ options.put("numCores", String.valueOf(numCores));
+ options.put("sizing", "default");
+ options.put("sidecar_port", String.valueOf(sidecarPort));
+        // merge in additionalOptions; note that for options with the same name, the entries in additionalOptions are kept
+ options.putAll(additionalOptions);
+
        return sql.read().format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
-                  .option("sidecar_contact_points", sidecarInstancesOption(cluster, dnsResolver))
- .option("keyspace", tableName.keyspace()) // unquoted
- .option("table", tableName.table()) // unquoted
- .option("DC", "datacenter1")
- .option("snapshotName", UUID.randomUUID().toString())
- .option("createSnapshot", "true")
-                  // Shutdown hooks are called after the job ends, and in the case of integration tests
-                  // the sidecar is already shut down before this. Since the cluster will be torn
- // down anyway, the integration job skips clearing snapshots.
- .option("clearSnapshotStrategy", "noop")
- .option("defaultParallelism", sc.defaultParallelism())
- .option("numCores", numCores)
- .option("sizing", "default")
- .options(mtlsTestHelper.mtlOptionMap())
- .option("sidecar_port", sidecarPort);
+ .options(options)
+ .options(mtlsTestHelper.mtlOptionMap());
}
/**
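
One note on the refactor above: the override semantics promised by the "merge in additionalOptions" comment come directly from java.util.Map.putAll, which replaces existing keys. A tiny self-contained illustration:

import java.util.HashMap;
import java.util.Map;

final class OptionMergeSketch
{
    public static void main(String[] args)
    {
        Map<String, String> options = new HashMap<>();
        options.put("sizing", "default");           // default populated by the test utility
        options.putAll(Map.of("sizing", "custom")); // caller-supplied additionalOptions
        System.out.println(options.get("sizing"));  // prints "custom": additionalOptions wins
    }
}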
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]