This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 978d969f CASSANALYTICS-128: Add flag to allow bulk write to indexed
tables (#181)
978d969f is described below
commit 978d969f3ceb01f8eb34194d88d0a896989b2660
Author: Josh McKenzie <[email protected]>
AuthorDate: Fri Mar 13 15:58:18 2026 -0400
CASSANALYTICS-128: Add flag to allow bulk write to indexed tables (#181)
Patch by Josh McKenzie; reviewed by Jyothsna Konisa and Shailaja Koppu for
CASSANALYTICS-128
---
CHANGES.txt | 1 +
.../bulkwriter/AbstractBulkWriterContext.java | 3 +-
.../cassandra/spark/bulkwriter/BulkSparkConf.java | 2 ++
.../cassandra/spark/bulkwriter/TableSchema.java | 19 ++++++++--
.../cassandra/spark/bulkwriter/WriterOptions.java | 8 +++++
.../spark/bulkwriter/TableSchemaTest.java | 11 ++++++
.../spark/bulkwriter/TableSchemaTestCommon.java | 42 ++++++++++++++++++++--
7 files changed, 80 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d2e4573c..d0e3c524 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
* Fixing CdcTests.testMockedCdc broken due to incorrect position update in
BufferingCommitLogReader (CASSANALYTICS-127)
* Commitlog reading not progressing in CDC due to incorrect
CommitLogReader.isFullyRead (CASSANALYTICS-124)
* Incorrect hash code calculation in PartitionUpdateWrapper.Digest
(CASSANALYTICS-125)
+ * Add flag to allow bulk write to indexed tables (CASSANALYTICS-128)
* Assign data file start offset based on BTI index (CASSANALYTICS-121)
* Quote identifiers option must be set to true if ttl has mixed case column
name (CASSANALYTICS-120)
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
index 6acbb2fc..b6eff3c8 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
@@ -321,7 +321,8 @@ public abstract class AbstractBulkWriterContext implements
BulkWriterContext, Kr
conf.getTTLOptions(),
conf.getTimestampOptions(),
lowestCassandraVersion,
- job().qualifiedTableName().quoteIdentifiers());
+ job().qualifiedTableName().quoteIdentifiers(),
+ conf.skipSecondaryIndexCheck);
}
@NotNull
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 3f52cd74..68f5636b 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -127,6 +127,7 @@ public class BulkSparkConf implements Serializable
public final int commitThreadsPerInstance;
public final double importCoordinatorTimeoutMultiplier;
public boolean quoteIdentifiers;
+ public final boolean skipSecondaryIndexCheck;
protected final String keystorePassword;
protected final String keystorePath;
protected final String keystoreBase64Encoded;
@@ -207,6 +208,7 @@ public class BulkSparkConf implements Serializable
this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(),
null);
this.timestamp = MapUtils.getOrDefault(options,
WriterOptions.TIMESTAMP.name(), null);
this.quoteIdentifiers = MapUtils.getBoolean(options,
WriterOptions.QUOTE_IDENTIFIERS.name(), false, "quote identifiers");
+ this.skipSecondaryIndexCheck = MapUtils.getBoolean(options,
WriterOptions.SKIP_SECONDARY_INDEX_CHECK.name(), false, "skip secondary index
check");
int storageClientConcurrency = MapUtils.getInt(options,
WriterOptions.STORAGE_CLIENT_CONCURRENCY.name(),
DEFAULT_STORAGE_CLIENT_CONCURRENCY, "storage client concurrency");
long storageClientKeepAliveSeconds = MapUtils.getLong(options,
WriterOptions.STORAGE_CLIENT_THREAD_KEEP_ALIVE_SECONDS.name(),
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 8f0bbcef..b198af98 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -69,7 +69,8 @@ public class TableSchema
TTLOption ttlOption,
TimestampOption timestampOption,
String lowestCassandraVersion,
- boolean quoteIdentifiers)
+ boolean quoteIdentifiers,
+ boolean skipSecondaryIndexCheck)
{
this.writeMode = writeMode;
this.ttlOption = ttlOption;
@@ -79,7 +80,21 @@ public class TableSchema
this.quoteIdentifiers = quoteIdentifiers;
validateDataFrameCompatibility(dfSchema, tableInfo);
- validateNoSecondaryIndexes(tableInfo);
+ // If a table has indexes on it, some external process (application,
DB, etc.) is responsible for rebuilding
+ // indexes on the table after the bulk write completes; Cassandra does
this as part of the SSTable import
+ // process today. 2i and SAI have different ergonomics here regarding
whether stale data is served during index build;
+ // ultimately we want the bulk writer to also write native SAI index
files alongside sstables but until
+ // then, this is allowable and fine for users who Know What They're
Doing.
+ if (!skipSecondaryIndexCheck)
+ {
+ validateNoSecondaryIndexes(tableInfo);
+ }
+ else if (tableInfo.hasSecondaryIndex())
+ {
+ LOGGER.warn("Bulk writing to tables with SecondaryIndexes will
have an asynchronous index rebuild "
+ + "take place automatically after writing. Reads against
the index during this time "
+ + "window will produce inconsistent or stale results
until index rebuild is complete.");
+ }
validateUserAddedColumns(lowestCassandraVersion, quoteIdentifiers,
ttlOption, timestampOption);
this.createStatement = getCreateStatement(tableInfo);
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index 6574e278..445ab206 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -132,4 +132,12 @@ public enum WriterOptions implements WriterOption
* - a failure otherwise
*/
JOB_TIMEOUT_SECONDS,
+ /**
+ * Option to bypass the secondary index validation check during bulk write
job setup.
+ * By default, bulk writes to tables with secondary indexes are rejected.
+ * Setting this option to {@code true} allows bulk writes to proceed on
tables that have secondary indexes,
+ * with the understanding that the secondary indexes will NOT be updated
by the bulk write and must be
+ * rebuilt separately after the job completes.
+ */
+ SKIP_SECONDARY_INDEX_CHECK,
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index 266caed5..138bbffe 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -270,6 +270,17 @@ public class TableSchemaTest
.hasMessage("Bulkwriter doesn't support secondary indexes");
}
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+ public void testSecondaryIndexAllowedWithSkipCheck(String cassandraVersion)
+ {
+ TableSchema schema = getValidSchemaBuilder(cassandraVersion)
+ .withHasSecondaryIndex()
+ .withSkipSecondaryIndexCheck()
+ .build();
+ assertThat(schema).isNotNull();
+ }
+
@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
public void testMixedCaseTTLColumnNameWithoutQuoteIdentifiersFails(String
cassandraVersion)
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index cff2e6f4..4122d94d 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -191,6 +191,8 @@ public final class TableSchemaTestCommon
private TTLOption ttlOption = TTLOption.forever();
private TimestampOption timestampOption = TimestampOption.now();
private boolean quoteIdentifiers = false;
+ private boolean skipSecondaryIndexCheck = false;
+ private boolean hasSecondaryIndex = false;
public MockTableSchemaBuilder(CassandraBridge bridge)
{
@@ -270,6 +272,18 @@ public final class TableSchemaTestCommon
return this;
}
+ public MockTableSchemaBuilder withSkipSecondaryIndexCheck()
+ {
+ this.skipSecondaryIndexCheck = true;
+ return this;
+ }
+
+ public MockTableSchemaBuilder withHasSecondaryIndex()
+ {
+ this.hasSecondaryIndex = true;
+ return this;
+ }
+
private ImmutableMap<String, CqlField.CqlType>
addColumnToCqlColumns(ImmutableMap<String, CqlField.CqlType> currentColumns,
String columnName,
String cqlType)
@@ -315,8 +329,16 @@ public final class TableSchemaTestCommon
partitionKeyColumnTypes,
primaryKeyColumnNames,
cassandraVersion,
-
quoteIdentifiers);
- return new TableSchema(dataFrameSchema, tableInfoProvider,
writeMode, ttlOption, timestampOption, cassandraVersion, quoteIdentifiers);
+
quoteIdentifiers,
+
hasSecondaryIndex);
+ return new TableSchema(dataFrameSchema,
+ tableInfoProvider,
+ writeMode,
+ ttlOption,
+ timestampOption,
+ cassandraVersion,
+ quoteIdentifiers,
+ skipSecondaryIndexCheck);
}
}
@@ -333,6 +355,7 @@ public final class TableSchemaTestCommon
Map<String, CqlField.CqlType> columns;
private final String cassandraVersion;
private final boolean quoteIdentifiers;
+ private final boolean hasSecondaryIndex;
public MockTableInfoProvider(CassandraBridge bridge,
ImmutableMap<String, CqlField.CqlType>
cqlColumns,
@@ -341,6 +364,18 @@ public final class TableSchemaTestCommon
String[] primaryKeyColumnNames,
String cassandraVersion,
boolean quoteIdentifiers)
+ {
+ this(bridge, cqlColumns, partitionKeyColumns,
partitionKeyColumnTypes, primaryKeyColumnNames, cassandraVersion,
quoteIdentifiers, false);
+ }
+
+ public MockTableInfoProvider(CassandraBridge bridge,
+ ImmutableMap<String, CqlField.CqlType>
cqlColumns,
+ String[] partitionKeyColumns,
+ ColumnType[] partitionKeyColumnTypes,
+ String[] primaryKeyColumnNames,
+ String cassandraVersion,
+ boolean quoteIdentifiers,
+ boolean hasSecondaryIndex)
{
this.bridge = bridge;
this.cqlColumns = cqlColumns;
@@ -350,6 +385,7 @@ public final class TableSchemaTestCommon
columns = cqlColumns;
this.cassandraVersion =
cassandraVersion.replaceAll("(\\w+-)*cassandra-", "");
this.quoteIdentifiers = quoteIdentifiers;
+ this.hasSecondaryIndex = hasSecondaryIndex;
this.uniqueTableName = TEST_TABLE_PREFIX +
TEST_TABLE_ID.getAndIncrement();
}
@@ -439,7 +475,7 @@ public final class TableSchemaTestCommon
@Override
public boolean hasSecondaryIndex()
{
- return false;
+ return hasSecondaryIndex;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]