This is an automated email from the ASF dual-hosted git repository.

jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 978d969f CASSANALYTICS-128: Add flag to allow bulk write to indexed tables (#181)
978d969f is described below

commit 978d969f3ceb01f8eb34194d88d0a896989b2660
Author: Josh McKenzie <[email protected]>
AuthorDate: Fri Mar 13 15:58:18 2026 -0400

    CASSANALYTICS-128: Add flag to allow bulk write to indexed tables (#181)
    
    Patch by Josh McKenzie; reviewed by Jyothsna Konisa and Shailaja Koppu for CASSANALYTICS-128
---
 CHANGES.txt                                        |  1 +
 .../bulkwriter/AbstractBulkWriterContext.java      |  3 +-
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  2 ++
 .../cassandra/spark/bulkwriter/TableSchema.java    | 19 ++++++++--
 .../cassandra/spark/bulkwriter/WriterOptions.java  |  8 +++++
 .../spark/bulkwriter/TableSchemaTest.java          | 11 ++++++
 .../spark/bulkwriter/TableSchemaTestCommon.java    | 42 ++++++++++++++++++++--
 7 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d2e4573c..d0e3c524 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
  * Fixing CdcTests.testMockedCdc broken due to incorrect position update in 
BufferingCommitLogReader (CASSANALYTICS-127)
  * Commitlog reading not progressing in CDC due to incorrect 
CommitLogReader.isFullyRead (CASSANALYTICS-124)
  * Incorrect hash code calculation in PartitionUpdateWrapper.Digest 
(CASSANALYTICS-125)
+ * Add flag to allow bulk write to indexed tables (CASSANALYTICS-128)
  * Assign data file start offset based on BTI index (CASSANALYTICS-121)
  * Quote identifiers option must be set to true if ttl has mixed case column 
name (CASSANALYTICS-120)
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
index 6acbb2fc..b6eff3c8 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/AbstractBulkWriterContext.java
@@ -321,7 +321,8 @@ public abstract class AbstractBulkWriterContext implements 
BulkWriterContext, Kr
                                conf.getTTLOptions(),
                                conf.getTimestampOptions(),
                                lowestCassandraVersion,
-                               job().qualifiedTableName().quoteIdentifiers());
+                               job().qualifiedTableName().quoteIdentifiers(),
+                               conf.skipSecondaryIndexCheck);
     }
 
     @NotNull
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 3f52cd74..68f5636b 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -127,6 +127,7 @@ public class BulkSparkConf implements Serializable
     public final int commitThreadsPerInstance;
     public final double importCoordinatorTimeoutMultiplier;
     public boolean quoteIdentifiers;
+    public final boolean skipSecondaryIndexCheck;
     protected final String keystorePassword;
     protected final String keystorePath;
     protected final String keystoreBase64Encoded;
@@ -207,6 +208,7 @@ public class BulkSparkConf implements Serializable
         this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(), 
null);
         this.timestamp = MapUtils.getOrDefault(options, 
WriterOptions.TIMESTAMP.name(), null);
         this.quoteIdentifiers = MapUtils.getBoolean(options, 
WriterOptions.QUOTE_IDENTIFIERS.name(), false, "quote identifiers");
+        this.skipSecondaryIndexCheck = MapUtils.getBoolean(options, 
WriterOptions.SKIP_SECONDARY_INDEX_CHECK.name(), false, "skip secondary index 
check");
         int storageClientConcurrency = MapUtils.getInt(options, 
WriterOptions.STORAGE_CLIENT_CONCURRENCY.name(),
                                                        
DEFAULT_STORAGE_CLIENT_CONCURRENCY, "storage client concurrency");
         long storageClientKeepAliveSeconds = MapUtils.getLong(options, 
WriterOptions.STORAGE_CLIENT_THREAD_KEEP_ALIVE_SECONDS.name(),
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 8f0bbcef..b198af98 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -69,7 +69,8 @@ public class TableSchema
                        TTLOption ttlOption,
                        TimestampOption timestampOption,
                        String lowestCassandraVersion,
-                       boolean quoteIdentifiers)
+                       boolean quoteIdentifiers,
+                       boolean skipSecondaryIndexCheck)
     {
         this.writeMode = writeMode;
         this.ttlOption = ttlOption;
@@ -79,7 +80,21 @@ public class TableSchema
         this.quoteIdentifiers = quoteIdentifiers;
 
         validateDataFrameCompatibility(dfSchema, tableInfo);
-        validateNoSecondaryIndexes(tableInfo);
+        // If a table has indexes on it, some process outside the bulk writer (application, DB, etc.) is responsible
+        // for rebuilding the indexes after the bulk write completes; Cassandra does this today as part of the SSTable
+        // import process. 2i and SAI behave differently with respect to whether stale data is served during the
+        // index build. Ultimately we want the bulk writer to also write native SAI index files alongside SSTables,
+        // but until then, skipping this check is acceptable for users who understand these consequences.
+        if (!skipSecondaryIndexCheck)
+        {
+            validateNoSecondaryIndexes(tableInfo);
+        }
+        else if (tableInfo.hasSecondaryIndex())
+        {
+            LOGGER.warn("Bulk writing to tables with SecondaryIndexes will have an asynchronous index rebuild "
+                      + "take place automatically after writing. Reads against the index during this time "
+                      + "window will produce inconsistent or stale results until index rebuild is complete.");
+        }
         validateUserAddedColumns(lowestCassandraVersion, quoteIdentifiers, 
ttlOption, timestampOption);
 
         this.createStatement = getCreateStatement(tableInfo);
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index 6574e278..445ab206 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -132,4 +132,12 @@ public enum WriterOptions implements WriterOption
      * - a failure otherwise
      */
     JOB_TIMEOUT_SECONDS,
+    /**
+     * Option to bypass the secondary index validation check during bulk write job setup.
+     * By default, bulk writes to tables with secondary indexes are rejected. Setting this option to {@code true}
+     * allows bulk writes to proceed on tables that have secondary indexes. The bulk writer does not write index
+     * files itself; the indexes are rebuilt asynchronously when Cassandra imports the written SSTables, and reads
+     * served by those indexes may return stale or inconsistent results until that rebuild completes.
+     */
+    SKIP_SECONDARY_INDEX_CHECK,
 }
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index 266caed5..138bbffe 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -270,6 +270,17 @@ public class TableSchemaTest
                 .hasMessage("Bulkwriter doesn't support secondary indexes");
     }
 
+    @ParameterizedTest
+    
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+    public void testSecondaryIndexAllowedWithSkipCheck(String cassandraVersion)
+    {
+        TableSchema schema = getValidSchemaBuilder(cassandraVersion)
+                .withHasSecondaryIndex()
+                .withSkipSecondaryIndexCheck()
+                .build();
+        assertThat(schema).isNotNull();
+    }
+
     @ParameterizedTest
     
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
     public void testMixedCaseTTLColumnNameWithoutQuoteIdentifiersFails(String 
cassandraVersion)
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index cff2e6f4..4122d94d 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -191,6 +191,8 @@ public final class TableSchemaTestCommon
         private TTLOption ttlOption = TTLOption.forever();
         private TimestampOption timestampOption = TimestampOption.now();
         private boolean quoteIdentifiers = false;
+        private boolean skipSecondaryIndexCheck = false;
+        private boolean hasSecondaryIndex = false;
 
         public MockTableSchemaBuilder(CassandraBridge bridge)
         {
@@ -270,6 +272,18 @@ public final class TableSchemaTestCommon
             return this;
         }
 
+        public MockTableSchemaBuilder withSkipSecondaryIndexCheck()
+        {
+            this.skipSecondaryIndexCheck = true;
+            return this;
+        }
+
+        public MockTableSchemaBuilder withHasSecondaryIndex()
+        {
+            this.hasSecondaryIndex = true;
+            return this;
+        }
+
         private ImmutableMap<String, CqlField.CqlType> 
addColumnToCqlColumns(ImmutableMap<String, CqlField.CqlType> currentColumns,
                                                                              
String columnName,
                                                                              
String cqlType)
@@ -315,8 +329,16 @@ public final class TableSchemaTestCommon
                                                                                
 partitionKeyColumnTypes,
                                                                                
 primaryKeyColumnNames,
                                                                                
 cassandraVersion,
-                                                                               
 quoteIdentifiers);
-            return new TableSchema(dataFrameSchema, tableInfoProvider, 
writeMode, ttlOption, timestampOption, cassandraVersion, quoteIdentifiers);
+                                                                               
 quoteIdentifiers,
+                                                                               
 hasSecondaryIndex);
+            return new TableSchema(dataFrameSchema,
+                                   tableInfoProvider,
+                                   writeMode,
+                                   ttlOption,
+                                   timestampOption,
+                                   cassandraVersion,
+                                   quoteIdentifiers,
+                                   skipSecondaryIndexCheck);
         }
     }
 
@@ -333,6 +355,7 @@ public final class TableSchemaTestCommon
         Map<String, CqlField.CqlType> columns;
         private final String cassandraVersion;
         private final boolean quoteIdentifiers;
+        private final boolean hasSecondaryIndex;
 
         public MockTableInfoProvider(CassandraBridge bridge,
                                      ImmutableMap<String, CqlField.CqlType> 
cqlColumns,
@@ -341,6 +364,18 @@ public final class TableSchemaTestCommon
                                      String[] primaryKeyColumnNames,
                                      String cassandraVersion,
                                      boolean quoteIdentifiers)
+        {
+            this(bridge, cqlColumns, partitionKeyColumns, 
partitionKeyColumnTypes, primaryKeyColumnNames, cassandraVersion, 
quoteIdentifiers, false);
+        }
+
+        public MockTableInfoProvider(CassandraBridge bridge,
+                                     ImmutableMap<String, CqlField.CqlType> 
cqlColumns,
+                                     String[] partitionKeyColumns,
+                                     ColumnType[] partitionKeyColumnTypes,
+                                     String[] primaryKeyColumnNames,
+                                     String cassandraVersion,
+                                     boolean quoteIdentifiers,
+                                     boolean hasSecondaryIndex)
         {
             this.bridge = bridge;
             this.cqlColumns = cqlColumns;
@@ -350,6 +385,7 @@ public final class TableSchemaTestCommon
             columns = cqlColumns;
             this.cassandraVersion = 
cassandraVersion.replaceAll("(\\w+-)*cassandra-", "");
             this.quoteIdentifiers = quoteIdentifiers;
+            this.hasSecondaryIndex = hasSecondaryIndex;
             this.uniqueTableName = TEST_TABLE_PREFIX + 
TEST_TABLE_ID.getAndIncrement();
         }
 
@@ -439,7 +475,7 @@ public final class TableSchemaTestCommon
         @Override
         public boolean hasSecondaryIndex()
         {
-            return false;
+            return hasSecondaryIndex;
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to