This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9bedf98b6e Send client warnings when writing to a large partition
9bedf98b6e is described below
commit 9bedf98b6eb30fbefe00f7513eaf374fc7236e98
Author: minal-kyada <[email protected]>
AuthorDate: Sun Jan 11 23:27:45 2026 -0800
Send client warnings when writing to a large partition
Patch by Minal Kyada; reviewed by David Capwell and marcuse for
CASSANDRA-17258
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 5 +
.../cassandra/config/DatabaseDescriptor.java | 80 +++++-
.../org/apache/cassandra/db/MessageParams.java | 17 ++
.../apache/cassandra/db/MutationVerbHandler.java | 15 +-
.../org/apache/cassandra/db/WriteThresholds.java | 112 ++++++++
.../apache/cassandra/metrics/KeyspaceMetrics.java | 7 +
.../org/apache/cassandra/metrics/TableMetrics.java | 6 +
.../cassandra/metrics/TopPartitionTracker.java | 20 +-
src/java/org/apache/cassandra/net/ParamType.java | 5 +-
.../service/AbstractWriteResponseHandler.java | 31 +-
.../DatacenterSyncWriteResponseHandler.java | 22 +-
.../org/apache/cassandra/service/StorageProxy.java | 11 +
.../apache/cassandra/service/StorageService.java | 35 +++
.../cassandra/service/StorageServiceMBean.java | 11 +-
.../cassandra/service/WriteResponseHandler.java | 13 +-
.../reads/thresholds/CoordinatorWarnings.java | 94 ++----
.../thresholds/CoordinatorWarningsState.java | 155 ++++++++++
.../thresholds/CoordinatorWriteWarnings.java | 188 ++++++++++++
.../service/writes/thresholds/WarnCounter.java | 48 ++++
.../writes/thresholds/WriteThresholdCounter.java | 90 ++++++
.../thresholds/WriteThresholdMapSerializer.java | 68 +++++
.../writes/thresholds/WriteWarningContext.java | 67 +++++
.../writes/thresholds/WriteWarningsSnapshot.java | 95 +++++++
.../org/apache/cassandra/transport/Dispatcher.java | 11 +
test/conf/cassandra.yaml | 5 +
.../distributed/impl/CoordinatorHelper.java | 14 +-
.../cassandra/distributed/impl/Instance.java | 5 +
.../thresholds/AbstractWriteThresholdWarning.java | 315 +++++++++++++++++++++
.../test/thresholds/ReplicaWarningTest.java | 123 ++++++++
.../test/thresholds/WriteSizeWarningTest.java | 96 +++++++
.../test/thresholds/WriteTombstoneWarningTest.java | 99 +++++++
.../cassandra/config/DatabaseDescriptorTest.java | 168 ++++++++++-
.../service/writes/thresholds/WarnCounterTest.java | 147 ++++++++++
.../writes/thresholds/WriteWarningContextTest.java | 205 ++++++++++++++
.../thresholds/WriteWarningsSnapshotTest.java | 199 +++++++++++++
36 files changed, 2489 insertions(+), 94 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 68e5778058..e9de27af5c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Send client warnings when writing to a large partition (CASSANDRA-17258)
* Harden the possible range of values for max dictionary size and max total
sample size for dictionary training (CASSANDRA-21194)
* Implement a guardrail ensuring that minimum training frequency parameter is
provided in ZstdDictionaryCompressor (CASSANDRA-21192)
* Replace manual referencing with ColumnFamilyStore.selectAndReference when
training a dictionary (CASSANDRA-21188)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index c6c0351f08..0ac9c691a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -579,6 +579,11 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+ public volatile boolean write_thresholds_enabled = false;
+ public volatile DataStorageSpec.LongBytesBound write_size_warn_threshold =
null;
+ public volatile DurationSpec.LongMillisecondsBound
coordinator_write_warn_interval = new
DurationSpec.LongMillisecondsBound("1000ms");
+ public volatile int write_tombstone_warn_threshold = -1;
+
public TombstonesMetricGranularity
tombstone_read_purgeable_metric_granularity =
TombstonesMetricGranularity.disabled;
public final ReplicaFilteringProtectionOptions
replica_filtering_protection = new ReplicaFilteringProtectionOptions();
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9999913edd..59ffb9f02a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -931,7 +931,7 @@ public class DatabaseDescriptor
applyConcurrentValidations(conf);
applyRepairCommandPoolSize(conf);
- applyReadThresholdsValidations(conf);
+ applyThresholdsValidations(conf);
if (conf.concurrent_materialized_view_builders <= 0)
throw new
ConfigurationException("concurrent_materialized_view_builders should be
strictly greater than 0, but was " +
conf.concurrent_materialized_view_builders, false);
@@ -1311,11 +1311,19 @@ public class DatabaseDescriptor
}
@VisibleForTesting
- static void applyReadThresholdsValidations(Config config)
+ static void applyThresholdsValidations(Config config)
{
+ // Validate read thresholds
validateReadThresholds("coordinator_read_size",
config.coordinator_read_size_warn_threshold,
config.coordinator_read_size_fail_threshold);
validateReadThresholds("local_read_size",
config.local_read_size_warn_threshold, config.local_read_size_fail_threshold);
validateReadThresholds("row_index_read_size",
config.row_index_read_size_warn_threshold,
config.row_index_read_size_fail_threshold);
+
+ // Write threshold warning depends on top_partitions tracking
+ if (config.write_thresholds_enabled && !config.top_partitions_enabled)
+ logger.warn("Write thresholds require top partitions tracking to
be enabled");
+
+ validateWriteSizeThreshold(config.write_size_warn_threshold,
config.min_tracked_partition_size);
+
validateWriteTombstoneThresholdRange(config.write_tombstone_warn_threshold,
config.min_tracked_partition_tombstone_count);
}
private static void validateReadThresholds(String name,
DataStorageSpec.LongBytesBound warn, DataStorageSpec.LongBytesBound fail)
@@ -1326,6 +1334,25 @@ public class DatabaseDescriptor
name +
"_warn_threshold", warn));
}
+ private static void
validateWriteSizeThreshold(DataStorageSpec.LongBytesBound writeSizeWarn,
DataStorageSpec.LongBytesBound minTrackedSize)
+ {
+ if (writeSizeWarn != null && minTrackedSize != null)
+ {
+ if (writeSizeWarn.toBytes() < minTrackedSize.toBytes())
+ throw new
ConfigurationException(String.format("write_size_warn_threshold (%s) cannot be
less than min_tracked_partition_size (%s)", writeSizeWarn, minTrackedSize));
+ }
+ }
+
+ private static void validateWriteTombstoneThresholdRange(int
writeTombstoneWarn, long minTrackedTombstoneCount)
+ {
+ if (writeTombstoneWarn < -1)
+ throw new
ConfigurationException(String.format("write_tombstone_warn_threshold (%d) must
be -1 (disabled) or >= 0", writeTombstoneWarn));
+
+ if (writeTombstoneWarn != -1 && writeTombstoneWarn <
minTrackedTombstoneCount)
+ throw new
ConfigurationException(String.format("write_tombstone_warn_threshold (%d)
cannot be less than min_tracked_partition_tombstone_count (%d)",
+ writeTombstoneWarn,
minTrackedTombstoneCount));
+ }
+
public static GuardrailsOptions getGuardrailsConfig()
{
return guardrails;
@@ -5482,6 +5509,55 @@ public class DatabaseDescriptor
conf.row_index_read_size_fail_threshold = value;
}
+ public static boolean getWriteThresholdsEnabled()
+ {
+ return conf.write_thresholds_enabled;
+ }
+
+ public static void setWriteThresholdsEnabled(boolean enabled)
+ {
+ if (enabled && !conf.top_partitions_enabled)
+ logger.warn("Write thresholds require top partitions tracking to
be enabled");
+ logger.info("updating write_thresholds_enabled to {}", enabled);
+ conf.write_thresholds_enabled = enabled;
+ }
+
+ @Nullable
+ public static DataStorageSpec.LongBytesBound getWriteSizeWarnThreshold()
+ {
+ return conf.write_size_warn_threshold;
+ }
+
+ public static void setWriteSizeWarnThreshold(@Nullable
DataStorageSpec.LongBytesBound value)
+ {
+ validateWriteSizeThreshold(value, conf.min_tracked_partition_size);
+ logger.info("updating write_size_warn_threshold to {}", value);
+ conf.write_size_warn_threshold = value;
+ }
+
+ public static DurationSpec.LongMillisecondsBound
getCoordinatorWriteWarnInterval()
+ {
+ return conf.coordinator_write_warn_interval;
+ }
+
+ public static void
setCoordinatorWriteWarnInterval(DurationSpec.LongMillisecondsBound ms)
+ {
+ logger.info("updating coordinator_write_warn_interval to {}", ms);
+ conf.coordinator_write_warn_interval = ms;
+ }
+
+ public static int getWriteTombstoneWarnThreshold()
+ {
+ return conf.write_tombstone_warn_threshold;
+ }
+
+ public static void setWriteTombstoneWarnThreshold(int threshold)
+ {
+ validateWriteTombstoneThresholdRange(threshold,
conf.min_tracked_partition_tombstone_count);
+ logger.info("updating write_tombstone_warn_threshold to {}",
threshold);
+ conf.write_tombstone_warn_threshold = threshold;
+ }
+
public static int getDefaultKeyspaceRF()
{
return conf.default_keyspace_rf;
diff --git a/src/java/org/apache/cassandra/db/MessageParams.java
b/src/java/org/apache/cassandra/db/MessageParams.java
index 689e3673ab..6556cf3c09 100644
--- a/src/java/org/apache/cassandra/db/MessageParams.java
+++ b/src/java/org/apache/cassandra/db/MessageParams.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
+import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
@@ -66,6 +67,22 @@ public class MessageParams
get().clear();
}
+ /**
+ * Capture the current MessageParams for use across threads.
+ * This returns a copy of the current ThreadLocal params, which can be
safely
+ * passed to async callbacks that may run on different threads.
+ *
+ * @return immutable copy of current params, or empty map if none
+ */
+ public static Map<ParamType, Object> capture()
+ {
+ Map<ParamType, Object> current = local.get();
+ if (current == null || current.isEmpty())
+ return Collections.emptyMap();
+
+ return new EnumMap<>(current);
+ }
+
public static <T> Message<T> addToMessage(Message<T> message)
{
return message.withParams(get());
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 3312a12b65..f4244c84e1 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.db;
+import java.util.Map;
+
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ForwardingInfo;
@@ -33,10 +35,13 @@ public class MutationVerbHandler extends
AbstractMutationVerbHandler<Mutation>
{
public static final MutationVerbHandler instance = new
MutationVerbHandler();
- private void respond(Message<?> respondTo, InetAddressAndPort
respondToAddress)
+ private void respond(Message<?> respondTo, InetAddressAndPort
respondToAddress, Map<ParamType, Object> params)
{
Tracing.trace("Enqueuing response to {}", respondToAddress);
- MessagingService.instance().send(respondTo.emptyResponse(),
respondToAddress);
+ Message<?> response = respondTo.emptyResponse();
+ if (!params.isEmpty())
+ response = response.withParams(params);
+ MessagingService.instance().send(response, respondToAddress);
}
private void failed()
@@ -53,9 +58,10 @@ public class MutationVerbHandler extends
AbstractMutationVerbHandler<Mutation>
return;
}
+ MessageParams.reset();
message.payload.validateSize(MessagingService.current_version,
ENTRY_OVERHEAD_SIZE);
+ WriteThresholds.checkWriteThresholds(message.payload);
- // Check if there were any forwarding headers in this message
ForwardingInfo forwardTo = message.forwardTo();
if (forwardTo != null)
forwardToLocalNodes(message, forwardTo);
@@ -73,7 +79,8 @@ public class MutationVerbHandler extends
AbstractMutationVerbHandler<Mutation>
protected void applyMutation(Message<Mutation> message, InetAddressAndPort
respondToAddress)
{
- message.payload.applyFuture().addCallback(o -> respond(message,
respondToAddress), wto -> failed());
+ Map<ParamType, Object> params = MessageParams.capture();
+ message.payload.applyFuture().addCallback(o -> respond(message,
respondToAddress, params), wto -> failed());
}
private static void forwardToLocalNodes(Message<Mutation> originalMessage,
ForwardingInfo forwardTo)
diff --git a/src/java/org/apache/cassandra/db/WriteThresholds.java
b/src/java/org/apache/cassandra/db/WriteThresholds.java
new file mode 100644
index 0000000000..ab6737f1ea
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/WriteThresholds.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.TopPartitionTracker;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Utility class for checking write threshold warnings on replicas.
+ * CASSANDRA-17258: paxos and accord do complex thread hand off and custom
write logic which makes this patch complex, so was deferred
+ */
+public class WriteThresholds
+{
+ private static final Logger logger =
LoggerFactory.getLogger(WriteThresholds.class);
+ private static final NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+ /**
+ * Check write thresholds for a mutation by comparing the estimated
partition size and tombstone count
+ * from {@link TopPartitionTracker} against configured warn thresholds. If
a threshold is breached,
+ * a warning is logged and the corresponding {@link ParamType} is added to
{@link MessageParams}
+ * for propagation back to the coordinator.
+ *
+ * @param mutation the mutation containing one or more partition updates
+ */
+ public static void checkWriteThresholds(Mutation mutation)
+ {
+ if (!DatabaseDescriptor.isDaemonInitialized() ||
!DatabaseDescriptor.getWriteThresholdsEnabled())
+ return;
+
+ DataStorageSpec.LongBytesBound sizeWarnThreshold =
DatabaseDescriptor.getWriteSizeWarnThreshold();
+ int tombstoneWarnThreshold =
DatabaseDescriptor.getWriteTombstoneWarnThreshold();
+
+ if (sizeWarnThreshold == null && tombstoneWarnThreshold == -1)
+ return;
+
+ long sizeWarnBytes = sizeWarnThreshold != null ?
sizeWarnThreshold.toBytes() : -1;
+ DecoratedKey key = mutation.key();
+
+ Map<TableId, Long> sizeWarnings = null;
+ Map<TableId, Long> tombstoneWarnings = null;
+
+ for (TableId tableId : mutation.getTableIds())
+ {
+ ColumnFamilyStore cfs =
Schema.instance.getColumnFamilyStoreInstance(tableId);
+ if (cfs == null || cfs.topPartitions == null)
+ continue;
+
+ TableMetadata metadata = cfs.metadata();
+ if (sizeWarnBytes != -1)
+ {
+ long estimatedSize =
cfs.topPartitions.topSizes().getEstimate(key);
+ if (estimatedSize > sizeWarnBytes)
+ {
+ if (sizeWarnings == null)
+ sizeWarnings = new HashMap<>();
+ sizeWarnings.put(tableId, estimatedSize);
+ noSpamLogger.warn("Write to {} partition {} triggered size
warning; " +
+ "estimated size is {} bytes, threshold
is {} bytes (see write_size_warn_threshold)",
+ metadata,
metadata.partitionKeyType.toCQLString(key.getKey()), estimatedSize,
sizeWarnBytes);
+ }
+ }
+
+ if (tombstoneWarnThreshold != -1)
+ {
+ long estimatedTombstones =
cfs.topPartitions.topTombstones().getEstimate(key);
+ if (estimatedTombstones > tombstoneWarnThreshold)
+ {
+ if (tombstoneWarnings == null)
+ tombstoneWarnings = new HashMap<>();
+ tombstoneWarnings.put(tableId, estimatedTombstones);
+ noSpamLogger.warn("Write to {} partition {} triggered
tombstone warning; " +
+ "estimated tombstone count is {},
threshold is {} (see write_tombstone_warn_threshold)",
+ metadata,
metadata.partitionKeyType.toCQLString(key.getKey()), estimatedTombstones,
tombstoneWarnThreshold);
+ }
+ }
+ }
+
+ if (sizeWarnings != null)
+ MessageParams.add(ParamType.WRITE_SIZE_WARN, sizeWarnings);
+ if (tombstoneWarnings != null)
+ MessageParams.add(ParamType.WRITE_TOMBSTONE_WARN,
tombstoneWarnings);
+ }
+}
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 6f533defff..4491d64e8b 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -191,6 +191,10 @@ public class KeyspaceMetrics
public final Meter tooManySSTableIndexesReadWarnings;
public final Meter tooManySSTableIndexesReadAborts;
+
+ public final Meter writeSizeWarnings;
+ public final Meter writeTombstoneWarnings;
+
public final Meter bytesAnticompacted;
public final Meter bytesMutatedAnticompaction;
public final Meter bytesPreviewed;
@@ -309,6 +313,9 @@ public class KeyspaceMetrics
tooManySSTableIndexesReadWarnings =
createKeyspaceMeter("TooManySSTableIndexesReadWarnings");
tooManySSTableIndexesReadAborts =
createKeyspaceMeter("TooManySSTableIndexesReadAborts");
+ writeSizeWarnings = createKeyspaceMeter("WriteSizeWarnings");
+ writeTombstoneWarnings = createKeyspaceMeter("WriteTombstoneWarnings");
+
formatSpecificGauges = createFormatSpecificGauges(keyspace);
outOfRangeTokenReads = createKeyspaceCounter("ReadOutOfRangeToken");
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a1ee119240..9e00988da7 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -307,6 +307,9 @@ public class TableMetrics
public final TableMeter tooManySSTableIndexesReadWarnings;
public final TableMeter tooManySSTableIndexesReadAborts;
+ public final TableMeter writeSizeWarnings;
+ public final TableMeter writeTombstoneWarnings;
+
public final ImmutableMap<SSTableFormat<?, ?>, ImmutableMap<String,
Gauge<? extends Number>>> formatSpecificGauges;
// Time spent building SSTableIntervalTree when constructing a new View
under the Tracker lock
@@ -907,6 +910,9 @@ public class TableMetrics
tooManySSTableIndexesReadWarnings =
createTableMeter("TooManySSTableIndexesReadWarnings",
cfs.keyspace.metric.tooManySSTableIndexesReadWarnings);
tooManySSTableIndexesReadAborts =
createTableMeter("TooManySSTableIndexesReadAborts",
cfs.keyspace.metric.tooManySSTableIndexesReadAborts);
+ writeSizeWarnings = createTableMeter("WriteSizeWarnings",
cfs.keyspace.metric.writeSizeWarnings);
+ writeTombstoneWarnings = createTableMeter("WriteTombstoneWarnings",
cfs.keyspace.metric.writeTombstoneWarnings);
+
viewSSTableIntervalTree =
createLatencyMetrics("ViewSSTableIntervalTree",
cfs.keyspace.metric.viewSSTableIntervalTree);
formatSpecificGauges = createFormatSpecificGauges(cfs);
diff --git a/src/java/org/apache/cassandra/metrics/TopPartitionTracker.java
b/src/java/org/apache/cassandra/metrics/TopPartitionTracker.java
index a5d5090394..eba0f14344 100644
--- a/src/java/org/apache/cassandra/metrics/TopPartitionTracker.java
+++ b/src/java/org/apache/cassandra/metrics/TopPartitionTracker.java
@@ -32,6 +32,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import com.carrotsearch.hppc.ObjectLongHashMap;
+import com.carrotsearch.hppc.ObjectLongMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.concurrent.ScheduledExecutors;
@@ -197,6 +199,7 @@ public class TopPartitionTracker implements Closeable
public static class TopHolder
{
public final NavigableSet<TopPartition> top;
+ public final ObjectLongMap<DecoratedKey> lookup;
private final int maxTopPartitionCount;
private final long minTrackedValue;
private final Collection<Range<Token>> ranges;
@@ -205,14 +208,15 @@ public class TopPartitionTracker implements Closeable
private TopHolder(int maxTopPartitionCount, long minTrackedValue,
Collection<Range<Token>> ranges)
{
- this(maxTopPartitionCount, minTrackedValue, new TreeSet<>(),
ranges, 0);
+ this(maxTopPartitionCount, minTrackedValue, new TreeSet<>(), new
ObjectLongHashMap<>(), ranges, 0);
}
- private TopHolder(int maxTopPartitionCount, long minTrackedValue,
NavigableSet<TopPartition> top, Collection<Range<Token>> ranges, long
lastUpdate)
+ private TopHolder(int maxTopPartitionCount, long minTrackedValue,
NavigableSet<TopPartition> top, ObjectLongMap<DecoratedKey> lookup,
Collection<Range<Token>> ranges, long lastUpdate)
{
this.maxTopPartitionCount = maxTopPartitionCount;
this.minTrackedValue = minTrackedValue;
this.top = top;
+ this.lookup = lookup;
this.ranges = ranges;
this.lastUpdate = lastUpdate;
}
@@ -224,6 +228,7 @@ public class TopPartitionTracker implements Closeable
this.maxTopPartitionCount = maxTopPartitionCount;
this.minTrackedValue = minTrackedValue;
top = new TreeSet<>();
+ this.lookup = new ObjectLongHashMap<>();
this.ranges = null;
this.lastUpdate = storedTopPartitions.lastUpdated;
@@ -243,9 +248,11 @@ public class TopPartitionTracker implements Closeable
private void track(TopPartition tp)
{
top.add(tp);
+ lookup.put(tp.key, tp.value);
while (top.size() > maxTopPartitionCount)
{
- top.pollLast();
+ TopPartition p = top.pollLast();
+ lookup.remove(p.key);
currentMinValue = top.last().value;
}
currentMinValue = Math.min(tp.value, currentMinValue);
@@ -274,7 +281,12 @@ public class TopPartitionTracker implements Closeable
private TopHolder cloneForMerging(long lastUpdate)
{
- return new TopHolder(maxTopPartitionCount, minTrackedValue, new
TreeSet<>(top), ranges, lastUpdate);
+ return new TopHolder(maxTopPartitionCount, minTrackedValue, new
TreeSet<>(top), new ObjectLongHashMap<>(lookup), ranges, lastUpdate);
+ }
+
+ public long getEstimate(DecoratedKey dk)
+ {
+ return lookup.getOrDefault(dk, 0L);
}
public String toString()
diff --git a/src/java/org/apache/cassandra/net/ParamType.java
b/src/java/org/apache/cassandra/net/ParamType.java
index 2367b1a390..5d3c92316f 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
import javax.annotation.Nullable;
import org.apache.cassandra.io.IVersionedSerializer;
+import
org.apache.cassandra.service.writes.thresholds.WriteThresholdMapSerializer;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.Int32Serializer;
import org.apache.cassandra.utils.Int64Serializer;
@@ -54,7 +55,9 @@ public enum ParamType
ROW_INDEX_READ_SIZE_WARN (13, Int64Serializer.serializer),
CUSTOM_MAP (14, CustomParamsSerializer.serializer),
TOO_MANY_REFERENCED_INDEXES_WARN (16, Int32Serializer.serializer),
- TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer);
+ TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer),
+ WRITE_SIZE_WARN (18,
WriteThresholdMapSerializer.serializer),
+ WRITE_TOMBSTONE_WARN (19,
WriteThresholdMapSerializer.serializer);
final int id;
final IVersionedSerializer serializer;
diff --git
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 5a0d16cf59..434f25055b 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -48,6 +49,9 @@ import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.service.writes.thresholds.CoordinatorWriteWarnings;
+import org.apache.cassandra.service.writes.thresholds.WriteWarningContext;
+import org.apache.cassandra.service.writes.thresholds.WriteWarningsSnapshot;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.concurrent.Condition;
@@ -91,6 +95,9 @@ public abstract class AbstractWriteResponseHandler<T>
implements RequestCallback
private volatile Map<InetAddressAndPort, RequestFailureReason>
failureReasonByEndpoint;
private final Dispatcher.RequestTime requestTime;
private @Nullable final Supplier<Mutation> hintOnFailure;
+ private volatile WriteWarningContext warningContext;
+ private static final
AtomicReferenceFieldUpdater<AbstractWriteResponseHandler, WriteWarningContext>
warningsUpdater =
+
AtomicReferenceFieldUpdater.newUpdater(AbstractWriteResponseHandler.class,
WriteWarningContext.class, "warningContext");
/**
* Delegate to another WriteResponseHandler or possibly this one to track
if the ideal consistency level was reached.
@@ -176,7 +183,29 @@ public abstract class AbstractWriteResponseHandler<T>
implements RequestCallback
}
if (replicaPlan.stillAppliesTo(ClusterMetadata.current()))
- return;
+ {
+ if (warningContext != null)
+ {
+ WriteWarningsSnapshot snapshot = warningContext.snapshot();
+ if (!snapshot.isEmpty() && hintOnFailure != null)
+ CoordinatorWriteWarnings.update(hintOnFailure.get(),
snapshot);
+ }
+ }
+ }
+
+ protected WriteWarningContext getWarningContext()
+ {
+ WriteWarningContext current;
+ while (true)
+ {
+ current = warningContext;
+ if (current != null)
+ return current;
+
+ current = new WriteWarningContext();
+ if (warningsUpdater.compareAndSet(this, null, current))
+ return current;
+ }
}
private void throwTimeout()
diff --git
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 689b5f100d..b897298dbb 100644
---
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -24,6 +24,7 @@ import java.util.function.Supplier;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.MessageParams;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.locator.Locator;
@@ -31,6 +32,8 @@ import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.service.writes.thresholds.WriteWarningContext;
import org.apache.cassandra.transport.Dispatcher;
/**
@@ -79,9 +82,22 @@ public class DatacenterSyncWriteResponseHandler<T> extends
AbstractWriteResponse
{
try
{
- String dataCenter = message == null
- ? locator.local().datacenter
- : locator.location(message.from()).datacenter;
+ Map<ParamType, Object> params;
+ String dataCenter;
+
+ if (message != null)
+ {
+ params = message.header.params();
+ dataCenter = locator.location(message.from()).datacenter;
+ }
+ else
+ {
+ params = MessageParams.capture();
+ dataCenter = locator.local().datacenter;
+ }
+
+ if (WriteWarningContext.isSupported(params.keySet()))
+ getWarningContext().updateCounters(params);
responses.get(dataCenter).getAndDecrement();
acks.incrementAndGet();
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index ba660858a4..cc97da02aa 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -82,6 +82,7 @@ import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.RejectException;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.TruncateRequest;
+import org.apache.cassandra.db.WriteThresholds;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
import org.apache.cassandra.db.partitions.FilteredPartition;
@@ -2023,7 +2024,13 @@ public class StorageProxy implements StorageProxyMBean
{
try
{
+ MessageParams.reset();
+
+ boolean trackWriteWarnings = description instanceof
Mutation && handler instanceof AbstractWriteResponseHandler &&
DatabaseDescriptor.getWriteThresholdsEnabled();
+ if (trackWriteWarnings)
+ WriteThresholds.checkWriteThresholds((Mutation)
description);
runnable.run();
+
handler.onResponse(null);
}
catch (Exception ex)
@@ -2032,6 +2039,10 @@ public class StorageProxy implements StorageProxyMBean
logger.error("Failed to apply mutation locally : ",
ex);
handler.onFailure(FBUtilities.getBroadcastAddressAndPort(),
RequestFailure.forException(ex));
}
+ finally
+ {
+ MessageParams.reset();
+ }
}
@Override
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index dd0fe6e569..bac2342c12 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4758,6 +4758,17 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
logger.info("updated tombstone_warn_threshold to {}", threshold);
}
+ public int getWriteTombstoneWarnThreshold()
+ {
+ return DatabaseDescriptor.getWriteTombstoneWarnThreshold();
+ }
+
+ public void setWriteTombstoneWarnThreshold(int threshold)
+ {
+ DatabaseDescriptor.setWriteTombstoneWarnThreshold(threshold);
+ logger.info("updated write_tombstone_warn_threshold to {}", threshold);
+ }
+
public int getTombstoneFailureThreshold()
{
return DatabaseDescriptor.getTombstoneFailureThreshold();
@@ -5328,6 +5339,30 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
DatabaseDescriptor.setLocalReadSizeWarnThreshold(parseDataStorageSpec(threshold));
}
+ @Override
+ public boolean getWriteThresholdsEnabled()
+ {
+ return DatabaseDescriptor.getWriteThresholdsEnabled();
+ }
+
+ @Override
+ public void setWriteThresholdsEnabled(boolean value)
+ {
+ DatabaseDescriptor.setWriteThresholdsEnabled(value);
+ }
+
+ @Override
+ public String getWriteTooLargeWarnThreshold()
+ {
+ return toString(DatabaseDescriptor.getWriteSizeWarnThreshold());
+ }
+
+ @Override
+ public void setWriteTooLargeWarnThreshold(String threshold)
+ {
+
DatabaseDescriptor.setWriteSizeWarnThreshold(parseDataStorageSpec(threshold));
+ }
+
@Override
public String getLocalReadTooLargeAbortThreshold()
{
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index d0929df8da..058bc8955f 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1046,7 +1046,10 @@ public interface StorageServiceMBean extends
NotificationEmitter
public int getTombstoneWarnThreshold();
/** Sets the threshold for warning queries with many tombstones */
public void setTombstoneWarnThreshold(int tombstoneDebugThreshold);
-
+    /** Returns the tombstone count threshold beyond which a write generates a client warning */
+    public int getWriteTombstoneWarnThreshold();
+    /** Sets the tombstone count threshold beyond which a write generates a client warning */
+    public void setWriteTombstoneWarnThreshold(int writeTombstoneDebugThreshold);
/** Returns the threshold for abandoning queries with many tombstones */
public int getTombstoneFailureThreshold();
/** Sets the threshold for abandoning queries with many tombstones */
@@ -1312,6 +1315,12 @@ public interface StorageServiceMBean extends
NotificationEmitter
public String getRowIndexReadSizeAbortThreshold();
public void setRowIndexReadSizeAbortThreshold(String value);
+ public boolean getWriteThresholdsEnabled();
+ public void setWriteThresholdsEnabled(boolean value);
+
+ public String getWriteTooLargeWarnThreshold();
+ public void setWriteTooLargeWarnThreshold(String value);
+
public void setDefaultKeyspaceReplicationFactor(int value);
public int getDefaultKeyspaceReplicationFactor();
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 2a3665712b..d99aa3989f 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -17,16 +17,21 @@
*/
package org.apache.cassandra.service;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.MessageParams;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.service.writes.thresholds.WriteWarningContext;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.FBUtilities;
@@ -58,7 +63,13 @@ public class WriteResponseHandler<T> extends
AbstractWriteResponseHandler<T>
public void onResponse(Message<T> m)
{
- replicaPlan.collectSuccess(m == null ?
FBUtilities.getBroadcastAddressAndPort() : m.from());
+ InetAddressAndPort from = m == null ?
FBUtilities.getBroadcastAddressAndPort() : m.from();
+ Map<ParamType, Object> params = m != null ? m.header.params() :
MessageParams.capture();
+
+ if (WriteWarningContext.isSupported(params.keySet()))
+ getWarningContext().updateCounters(params);
+
+ replicaPlan.collectSuccess(from);
if (responsesUpdater.decrementAndGet(this) == 0)
signal();
//Must be last after all subclass processing
diff --git
a/src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java
b/src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java
index fdb68a56a8..7ea6b3c07a 100644
---
a/src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java
+++
b/src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java
@@ -31,8 +31,7 @@ import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.ClientWarn;
-
-import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.service.thresholds.CoordinatorWarningsState;
import static
org.apache.cassandra.config.CassandraRelevantProperties.READS_THRESHOLDS_COORDINATOR_DEFENSIVE_CHECKS_ENABLED;
@@ -41,34 +40,38 @@ public class CoordinatorWarnings
private static final Logger logger =
LoggerFactory.getLogger(CoordinatorWarnings.class);
private static final boolean ENABLE_DEFENSIVE_CHECKS =
READS_THRESHOLDS_COORDINATOR_DEFENSIVE_CHECKS_ENABLED.getBoolean();
- // when .init() is called set the STATE to be INIT; this is to lazy
allocate the map only when warnings are generated
private static final Map<ReadCommand, WarningsSnapshot> INIT =
Collections.emptyMap();
- private static final FastThreadLocal<Map<ReadCommand, WarningsSnapshot>>
STATE = new FastThreadLocal<>();
- private CoordinatorWarnings() {}
+ private static final CoordinatorWarningsState<Map<ReadCommand,
WarningsSnapshot>> STATE =
+ new CoordinatorWarningsState<>("CoordinatorWarnings",
+ INIT,
+ null, // reset() calls threadLocal.remove()
when emptySentinel is null
+ HashMap::new,
+ IgnoreMap::get,
+ logger,
+ ENABLE_DEFENSIVE_CHECKS);
+
+ private CoordinatorWarnings()
+ {
+ }
public static void init()
{
- logger.trace("CoordinatorTrackWarnings.init()");
- if (STATE.get() != null)
- {
- if (ENABLE_DEFENSIVE_CHECKS)
- throw new AssertionError("CoordinatorTrackWarnings.init called
while state is not null: " + STATE.get());
- return;
- }
- STATE.set(INIT);
+ STATE.init();
}
public static void reset()
{
- logger.trace("CoordinatorTrackWarnings.reset()");
- STATE.remove();
+ STATE.reset();
}
public static void update(ReadCommand cmd, WarningsSnapshot snapshot)
{
- logger.trace("CoordinatorTrackWarnings.update({}, {})",
cmd.metadata(), snapshot);
- Map<ReadCommand, WarningsSnapshot> map = mutable();
+ if (logger.isTraceEnabled())
+ logger.trace("CoordinatorTrackWarnings.update({}, {})",
cmd.metadata(), snapshot);
+
+ Map<ReadCommand, WarningsSnapshot> map = STATE.mutable();
+
WarningsSnapshot previous = map.get(cmd);
WarningsSnapshot update = WarningsSnapshot.merge(previous, snapshot);
if (update == null) // null happens when the merge had null input or
EMPTY input... remove the command from the map
@@ -79,8 +82,17 @@ public class CoordinatorWarnings
public static void done()
{
- Map<ReadCommand, WarningsSnapshot> map = readonly();
- logger.trace("CoordinatorTrackWarnings.done() with state {}", map);
+ STATE.processAndReset(CoordinatorWarnings::processWarnings);
+ }
+
+ private static void processWarnings(Map<ReadCommand, WarningsSnapshot> map)
+ {
+ if (map == INIT || map.isEmpty())
+ return;
+
+ if (logger.isTraceEnabled())
+ logger.trace("CoordinatorTrackWarnings.done() with state {}", map);
+
map.forEach((command, merged) -> {
ColumnFamilyStore cfs =
Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
// race condition when dropping tables, also happens in unit tests
as Schema may be bypassed
@@ -101,50 +113,6 @@ public class CoordinatorWarnings
recordAborts(merged.indexReadSSTablesCount, cql, loggableTokens,
cfs.metric.tooManySSTableIndexesReadAborts,
WarningsSnapshot::tooManyIndexesReadAbortMessage);
recordWarnings(merged.indexReadSSTablesCount, cql, loggableTokens,
cfs.metric.tooManySSTableIndexesReadWarnings,
WarningsSnapshot::tooManyIndexesReadWarnMessage);
});
-
- // reset the state to block from double publishing
- clearState();
- }
-
- private static Map<ReadCommand, WarningsSnapshot> mutable()
- {
- Map<ReadCommand, WarningsSnapshot> map = STATE.get();
- if (map == null)
- {
- if (ENABLE_DEFENSIVE_CHECKS)
- throw new AssertionError("CoordinatorTrackWarnings.mutable
calling without calling .init() first");
- // set map to an "ignore" map; dropping all mutations
- // since init was not called, it isn't clear that the state will
be cleaned up, so avoid populating
- map = IgnoreMap.get();
- }
- else if (map == INIT)
- {
- map = new HashMap<>();
- STATE.set(map);
- }
- return map;
- }
-
- private static Map<ReadCommand, WarningsSnapshot> readonly()
- {
- Map<ReadCommand, WarningsSnapshot> map = STATE.get();
- if (map == null)
- {
- if (ENABLE_DEFENSIVE_CHECKS)
- throw new AssertionError("CoordinatorTrackWarnings.readonly
calling without calling .init() first");
- // since init was not called, it isn't clear that the state will
be cleaned up, so avoid populating
- map = Collections.emptyMap();
- }
- return map;
- }
-
- private static void clearState()
- {
- Map<ReadCommand, WarningsSnapshot> map = STATE.get();
- if (map == null || map == INIT)
- return;
- // map is mutable, so set to INIT
- STATE.set(INIT);
}
// utility interface to let callers use static functions
diff --git
a/src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java
b/src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java
new file mode 100644
index 0000000000..1ace6701eb
--- /dev/null
+++
b/src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.thresholds;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+
+import io.netty.util.concurrent.FastThreadLocal;
+
+/**
+ * Generic ThreadLocal state manager for coordinator warnings.
+ * Provides lifecycle management (init, update, done, reset) for accumulating
+ * warnings across a client request.
+ *
+ * @param <S> the type of state to manage (e.g., Map, custom holder class)
+ */
+public class CoordinatorWarningsState<S>
+{
+ private final FastThreadLocal<S> threadLocal;
+ private final S initSentinel;
+ private final S emptySentinel;
+ private final Supplier<S> stateFactory;
+ private final Supplier<S> fallbackSupplier;
+ private final Logger logger;
+ private final String name;
+ private final boolean enableDefensiveChecks;
+
+ /**
+ * Creates a new coordinator warnings state manager.
+ *
+ * @param name descriptive name for logging
+ * @param initSentinel sentinel value indicating initialized but empty
state
+ * @param emptySentinel sentinel value indicating cleared state
+ * @param stateFactory factory to create new mutable state instances
+ * @param fallbackSupplier supplies fallback state if init() was not called
+ * @param logger logger for diagnostic messages
+ * @param enableDefensiveChecks whether to enable defensive checks
+ */
+ public CoordinatorWarningsState(String name,
+ S initSentinel,
+ @Nullable S emptySentinel,
+ Supplier<S> stateFactory,
+ Supplier<S> fallbackSupplier,
+ Logger logger,
+ boolean enableDefensiveChecks)
+ {
+ this.name = name;
+ this.initSentinel = initSentinel;
+ this.emptySentinel = emptySentinel;
+ this.stateFactory = stateFactory;
+ this.fallbackSupplier = fallbackSupplier;
+ this.logger = logger;
+ this.enableDefensiveChecks = enableDefensiveChecks;
+ this.threadLocal = new FastThreadLocal<>();
+ }
+
+ /**
+ * Initialize state for this thread. Must be called at the start of a
client request.
+ */
+ public void init()
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("{}.init()", name);
+
+ if (enableDefensiveChecks)
+ {
+ S current = threadLocal.get();
+ if (current != null)
+ throw new AssertionError(name + ".init called while state is
not null: " + current);
+ }
+ threadLocal.set(initSentinel);
+ }
+
+ /**
+ * Reset/clear state for this thread.
+ */
+ public void reset()
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("{}.reset()", name);
+
+ if (emptySentinel != null)
+ threadLocal.set(emptySentinel);
+ else
+ threadLocal.remove();
+ }
+
+ /**
+ * Gets the mutable state for this thread, lazily creating it if needed.
+ * Transitions from initSentinel to a new mutable instance on first access.
+ *
+ * @return mutable state instance
+ */
+ public S mutable()
+ {
+ S state = threadLocal.get();
+
+ if (state == null)
+ {
+ if (enableDefensiveChecks)
+ throw new AssertionError(name + " accessing state without
calling .init() first");
+ return fallbackSupplier.get();
+ }
+
+ if (state == initSentinel)
+ {
+ state = stateFactory.get();
+ threadLocal.set(state);
+ }
+
+ return state;
+ }
+
+ /**
+ * Process accumulated state and reset.
+ * Must be called at the end of a client request.
+ * The reset will occur even if the processor throws an exception.
+ *
+ * @param processor processes the state
+ */
+ public void processAndReset(Consumer<S> processor)
+ {
+ try
+ {
+ S state = threadLocal.get();
+ if (state == null || state == initSentinel)
+ return;
+
+ processor.accept(state);
+ }
+ finally
+ {
+ reset();
+ }
+ }
+}
\ No newline at end of file
diff --git
a/src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java
b/src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java
new file mode 100644
index 0000000000..7ae35be257
--- /dev/null
+++
b/src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.thresholds.CoordinatorWarningsState;
+import org.apache.cassandra.utils.Clock;
+
+public class CoordinatorWriteWarnings
+{
+ private static final Logger logger =
LoggerFactory.getLogger(CoordinatorWriteWarnings.class);
+
+ private static final Warnings INIT = new Warnings();
+ private static final Warnings EMPTY = new Warnings();
+ private static final AtomicLong nextWarn = new AtomicLong(0);
+
+ private static boolean timeForWarn()
+ {
+ DurationSpec.LongMillisecondsBound interval =
DatabaseDescriptor.getCoordinatorWriteWarnInterval();
+ if (interval == null || interval.toMilliseconds() == 0)
+ return true;
+
+ long now = Clock.Global.nanoTime();
+ long next = nextWarn.get();
+ return now > next && nextWarn.compareAndSet(next, now +
TimeUnit.MILLISECONDS.toNanos(interval.toMilliseconds()));
+ }
+
+ private static final CoordinatorWarningsState<Warnings> STATE =
+ new CoordinatorWarningsState<>("CoordinatorWriteWarnings",
+ INIT,
+ EMPTY,
+ Warnings::new,
+ () -> EMPTY,
+ logger,
+ false);
+
+ /**
+ * Initialize coordinator write warnings for this thread. Must be called
at the start of a client request.
+ */
+ public static void init()
+ {
+ STATE.init();
+ }
+
+ /**
+ * Update warnings for a partition after receiving responses from replicas.
+ *
+ * @param mutation the mutation that was written
+ * @param snapshot the aggregated warnings from replicas
+ */
+ public static void update(IMutation mutation, WriteWarningsSnapshot
snapshot)
+ {
+ if (snapshot.isEmpty()) return;
+
+ Warnings warnings = STATE.mutable();
+ if (warnings == EMPTY) return;
+
+ warnings.merge(mutation.key(), snapshot);
+ }
+
+ /**
+ * Process accumulated warnings: send to client and update metrics.
+ * Must be called at the end of a client request.
+ */
+ public static void done()
+ {
+ STATE.processAndReset(CoordinatorWriteWarnings::processWarnings);
+ }
+
+ /**
+ * Reset/clear warnings for this thread.
+ */
+ public static void reset()
+ {
+ STATE.reset();
+ }
+
+ private static void processWarnings(Warnings warnings)
+ {
+
+ if (warnings.snapshot == null || warnings.snapshot.isEmpty() ||
warnings.partitionKey == null)
+ return;
+
+ if (!timeForWarn())
+ return;
+
+ WriteWarningsSnapshot snapshot = warnings.snapshot;
+
+ for (Map.Entry<TableId, Long> entry :
snapshot.writeSize.tableValues.entrySet())
+ {
+ TableId tableId = entry.getKey();
+ ColumnFamilyStore cfs =
Schema.instance.getColumnFamilyStoreInstance(tableId);
+ if (cfs == null)
+ {
+ logger.warn("ColumnFamilyStore is null for table {},
skipping", tableId);
+ continue;
+ }
+
+ TableMetadata metadata = cfs.metadata();
+ String partitionKey =
metadata.partitionKeyType.toCQLString(warnings.partitionKey.getKey());
+ String msg = String.format("Write to %s.%s partition %s: %s",
+ metadata.keyspace,
+ metadata.name,
+ partitionKey,
+
WriteWarningsSnapshot.writeSizeWarnMessage(entry.getValue()));
+ ClientWarn.instance.warn(msg);
+ logger.warn(msg);
+ cfs.metric.writeSizeWarnings.mark();
+ }
+
+ for (Map.Entry<TableId, Long> entry :
snapshot.writeTombstone.tableValues.entrySet())
+ {
+ TableId tableId = entry.getKey();
+ ColumnFamilyStore cfs =
Schema.instance.getColumnFamilyStoreInstance(tableId);
+ if (cfs == null)
+ {
+ logger.warn("ColumnFamilyStore is null for table {},
skipping", tableId);
+ continue;
+ }
+
+ TableMetadata metadata = cfs.metadata();
+ String partitionKey =
metadata.partitionKeyType.toCQLString(warnings.partitionKey.getKey());
+ String msg = String.format("Write to %s.%s partition %s: %s",
+ metadata.keyspace,
+ metadata.name,
+ partitionKey,
+
WriteWarningsSnapshot.writeTombstoneWarnMessage(entry.getValue()));
+ ClientWarn.instance.warn(msg);
+ logger.warn(msg);
+ cfs.metric.writeTombstoneWarnings.mark();
+ }
+ }
+
+ /**
+ * Internal state holder for accumulated warnings.
+ * A Mutation is always for a single partition key, so we store it once
+ * and track warnings per table within that partition.
+ */
+ private static class Warnings
+ {
+ @Nullable
+ DecoratedKey partitionKey;
+
+ @Nullable
+ WriteWarningsSnapshot snapshot;
+
+ void merge(DecoratedKey partitionKey, WriteWarningsSnapshot snapshot)
+ {
+ if (this.partitionKey == null)
+ this.partitionKey = partitionKey;
+
+ this.snapshot = this.snapshot == null ? snapshot :
this.snapshot.merge(snapshot);
+ }
+ }
+}
diff --git
a/src/java/org/apache/cassandra/service/writes/thresholds/WarnCounter.java
b/src/java/org/apache/cassandra/service/writes/thresholds/WarnCounter.java
new file mode 100644
index 0000000000..78518fe8df
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/writes/thresholds/WarnCounter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.schema.TableId;
+
+public class WarnCounter
+{
+ private final ConcurrentHashMap<TableId, AtomicLong> tableValues = new
ConcurrentHashMap<>();
+
+ void addWarning(Map<TableId, Long> incoming)
+ {
+ for (Map.Entry<TableId, Long> entry : incoming.entrySet())
+ tableValues.computeIfAbsent(entry.getKey(), k -> new AtomicLong())
+ .accumulateAndGet(entry.getValue(), Math::max);
+ }
+
+ public WriteThresholdCounter snapshot()
+ {
+ ImmutableMap.Builder<TableId, Long> builder = ImmutableMap.builder();
+ for (Map.Entry<TableId, AtomicLong> entry : tableValues.entrySet())
+ builder.put(entry.getKey(), entry.getValue().get());
+
+ return WriteThresholdCounter.create(builder.build());
+ }
+}
diff --git
a/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdCounter.java
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdCounter.java
new file mode 100644
index 0000000000..a57baa01e5
--- /dev/null
+++
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdCounter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.schema.TableId;
+
+public class WriteThresholdCounter
+{
+ private static final WriteThresholdCounter EMPTY = new
WriteThresholdCounter(ImmutableMap.of());
+ public final ImmutableMap<TableId, Long> tableValues;
+
+ private WriteThresholdCounter(ImmutableMap<TableId, Long> tableValues)
+ {
+ this.tableValues = tableValues;
+ }
+
+ public static WriteThresholdCounter empty()
+ {
+ return EMPTY;
+ }
+
+ public boolean isEmpty()
+ {
+ return tableValues.isEmpty();
+ }
+
+ public static WriteThresholdCounter create(Map<TableId, Long> snapshot)
+ {
+ if (snapshot.isEmpty())
+ return EMPTY;
+ return new WriteThresholdCounter(ImmutableMap.copyOf(snapshot));
+ }
+
+ public WriteThresholdCounter merge(WriteThresholdCounter other)
+ {
+ if (other == EMPTY)
+ return this;
+ if (this == EMPTY)
+ return other;
+ Map<TableId, Long> merged = new HashMap<>(tableValues);
+ for (Map.Entry<TableId, Long> entry : other.tableValues.entrySet())
+ merged.merge(entry.getKey(), entry.getValue(), Math::max);
+ return new WriteThresholdCounter(ImmutableMap.copyOf(merged));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ WriteThresholdCounter that = (WriteThresholdCounter) o;
+ return Objects.equals(tableValues, that.tableValues);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(tableValues);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "WriteThresholdCounter{tableValues=" + tableValues + '}';
+ }
+}
diff --git
a/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdMapSerializer.java
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdMapSerializer.java
new file mode 100644
index 0000000000..bc2cc863ad
--- /dev/null
+++
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdMapSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
+
+public class WriteThresholdMapSerializer implements
IVersionedSerializer<Map<TableId, Long>>
+{
+ public static final WriteThresholdMapSerializer serializer = new
WriteThresholdMapSerializer();
+
+ @Override
+ public void serialize(Map<TableId, Long> t, DataOutputPlus out, int
version) throws IOException
+ {
+ out.writeUnsignedVInt32(t.size());
+ for (Map.Entry<TableId, Long> entry : t.entrySet())
+ {
+ entry.getKey().serialize(out);
+ out.writeUnsignedVInt(entry.getValue());
+ }
+ }
+
+ @Override
+ public Map<TableId, Long> deserialize(DataInputPlus in, int version)
throws IOException
+ {
+ int size = in.readUnsignedVInt32();
+ Map<TableId, Long> result = Maps.newHashMapWithExpectedSize(size);
+ for (int i = 0; i < size; i++)
+ result.put(TableId.deserialize(in), in.readUnsignedVInt());
+ return result;
+ }
+
+ @Override
+ public long serializedSize(Map<TableId, Long> t, int version)
+ {
+ long size = TypeSizes.sizeofUnsignedVInt(t.size());
+ for(Map.Entry<TableId, Long> entry : t.entrySet())
+ {
+ size += entry.getKey().serializedSize();
+ size += TypeSizes.sizeofUnsignedVInt(entry.getValue());
+ }
+ return size;
+ }
+}
diff --git
a/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningContext.java
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningContext.java
new file mode 100644
index 0000000000..1d46b15e3b
--- /dev/null
+++
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * Accumulates write warning information from replica responses.
+ * Similar to WarningContext but for write operations (warnings only, no
aborts).
+ */
+public class WriteWarningContext
+{
+ private static final EnumSet<ParamType> SUPPORTED = EnumSet.of(
+ ParamType.WRITE_SIZE_WARN,
+ ParamType.WRITE_TOMBSTONE_WARN
+ );
+
+ final WarnCounter writeSize = new WarnCounter();
+ final WarnCounter writeTombstone = new WarnCounter();
+
+ public static boolean isSupported(Set<ParamType> keys)
+ {
+ return !Collections.disjoint(keys, SUPPORTED);
+ }
+
+ /**
+ * Update counters from replica response parameters. Writes never abort,
so this always returns without throwing.
+ */
+ public void updateCounters(Map<ParamType, Object> params)
+ {
+ Object value = params.get(ParamType.WRITE_SIZE_WARN);
+ if (value != null)
+ writeSize.addWarning((Map<TableId, Long>) value);
+
+ value = params.get(ParamType.WRITE_TOMBSTONE_WARN);
+ if (value != null)
+ writeTombstone.addWarning((Map<TableId, Long>) value);
+ }
+
+ public WriteWarningsSnapshot snapshot()
+ {
+ return WriteWarningsSnapshot.create(writeSize.snapshot(),
writeTombstone.snapshot());
+ }
+}
diff --git
a/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshot.java
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshot.java
new file mode 100644
index 0000000000..a8195ea5b8
--- /dev/null
+++
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshot.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.Objects;
+
+/**
+ * Immutable snapshot of write warnings. Simpler than WarningsSnapshot since
writes never abort (warnings only).
+ */
+public class WriteWarningsSnapshot
+{
+ private static final WriteWarningsSnapshot EMPTY = new
WriteWarningsSnapshot(WriteThresholdCounter.empty(),
WriteThresholdCounter.empty());
+
+ public final WriteThresholdCounter writeSize;
+ public final WriteThresholdCounter writeTombstone;
+
+ private WriteWarningsSnapshot(WriteThresholdCounter writeSize,
WriteThresholdCounter writeTombstone)
+ {
+ this.writeSize = writeSize;
+ this.writeTombstone = writeTombstone;
+ }
+
+ public static WriteWarningsSnapshot create(WriteThresholdCounter
writeSize, WriteThresholdCounter writeTombstone)
+ {
+ if (writeSize.isEmpty() && writeTombstone.isEmpty())
+ return EMPTY;
+ return new WriteWarningsSnapshot(writeSize, writeTombstone);
+ }
+
+ public boolean isEmpty()
+ {
+ return this == EMPTY;
+ }
+
+ public WriteWarningsSnapshot merge(WriteWarningsSnapshot other)
+ {
+ if (other == null || other == EMPTY)
+ return this;
+ if (this == EMPTY)
+ return other;
+ return WriteWarningsSnapshot.create(writeSize.merge(other.writeSize),
writeTombstone.merge(other.writeTombstone));
+ }
+
+ public static String writeSizeWarnMessage(long bytes)
+ {
+ return String.format("Write to large partition; estimated size is %d
bytes (see write_size_warn_threshold)", bytes);
+ }
+
+ public static String writeTombstoneWarnMessage(long tombstones)
+ {
+ return String.format("Write to partition with many tombstones;
estimated count is %d (see write_tombstone_warn_threshold)", tombstones);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ WriteWarningsSnapshot that = (WriteWarningsSnapshot) o;
+ return Objects.equals(writeSize, that.writeSize) &&
Objects.equals(writeTombstone, that.writeTombstone);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(writeSize, writeTombstone);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "WriteWarningsSnapshot{" +
+ "writeSize=" + writeSize +
+ ", writeTombstone=" + writeTombstone +
+ '}';
+ }
+}
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java
b/src/java/org/apache/cassandra/transport/Dispatcher.java
index ccec6817c3..3f6468c218 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
+import org.apache.cassandra.service.writes.thresholds.CoordinatorWriteWarnings;
import org.apache.cassandra.transport.ClientResourceLimits.Overload;
import org.apache.cassandra.transport.Flusher.FlushItem;
import org.apache.cassandra.transport.messages.ErrorMessage;
@@ -382,7 +383,10 @@ public class Dispatcher implements
CQLMessageHandler.MessageConsumer<Message.Req
// even if ClientWarn is disabled, still setup
CoordinatorTrackWarnings, as this will populate metrics and
// emit logs on the server; the warnings will just be ignored and not
sent to the client
if (request.isTrackable())
+ {
CoordinatorWarnings.init();
+ CoordinatorWriteWarnings.init();
+ }
switch (backpressure)
{
@@ -425,7 +429,10 @@ public class Dispatcher implements
CQLMessageHandler.MessageConsumer<Message.Req
Message.Response response = request.execute(qstate, requestTime);
if (request.isTrackable())
+ {
CoordinatorWarnings.done();
+ CoordinatorWriteWarnings.done();
+ }
response.setStreamId(request.getStreamId());
response.setWarnings(ClientWarn.instance.getWarnings());
@@ -448,7 +455,10 @@ public class Dispatcher implements
CQLMessageHandler.MessageConsumer<Message.Req
JVMStabilityInspector.inspectThrowable(t);
if (request.isTrackable())
+ {
CoordinatorWarnings.done();
+ CoordinatorWriteWarnings.done();
+ }
Predicate<Throwable> handler =
ExceptionHandlers.getUnexpectedExceptionHandler(channel, true);
ErrorMessage error = ErrorMessage.fromException(t, handler);
@@ -459,6 +469,7 @@ public class Dispatcher implements
CQLMessageHandler.MessageConsumer<Message.Req
finally
{
CoordinatorWarnings.reset();
+ CoordinatorWriteWarnings.reset();
ClientWarn.instance.resetWarnings();
}
}
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 20c1d2fd70..07200f2d16 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -67,6 +67,11 @@ local_read_size_warn_threshold: 4096KiB
local_read_size_fail_threshold: 8192KiB
row_index_read_size_warn_threshold: 4096KiB
row_index_read_size_fail_threshold: 8192KiB
+write_thresholds_enabled: true
+write_size_warn_threshold: 4096KiB
+write_tombstone_warn_threshold: 1000
+min_tracked_partition_size: 1MiB
+min_tracked_partition_tombstone_count: 500
accord:
enabled: true
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java
b/test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java
index 414b30e05c..2100217d22 100644
---
a/test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java
+++
b/test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
+import org.apache.cassandra.service.writes.thresholds.CoordinatorWriteWarnings;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -62,6 +63,7 @@ public class CoordinatorHelper
// warnings as it sets a new State instance on the ThreadLocal.
ClientWarn.instance.captureWarnings();
CoordinatorWarnings.init();
+ CoordinatorWriteWarnings.init();
try
{
ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
@@ -76,19 +78,27 @@ public class CoordinatorHelper
requestTime);
// Collect warnings reported during the query.
CoordinatorWarnings.done();
- if (res != null)
- res.setWarnings(ClientWarn.instance.getWarnings());
+ CoordinatorWriteWarnings.done();
+
+ // Convert null result to ResultMessage.Void, matching
QueryProcessor.processStatement() behavior
+ // This is necessary to attach warnings to INSERT/UPDATE/DELETE
statements which return null
+ if (res == null)
+ res = new ResultMessage.Void();
+
+ res.setWarnings(ClientWarn.instance.getWarnings());
return RowUtil.toQueryResult(res);
}
catch (Exception | Error e)
{
CoordinatorWarnings.done();
+ CoordinatorWriteWarnings.done();
throw e;
}
finally
{
CoordinatorWarnings.reset();
+ CoordinatorWriteWarnings.reset();
ClientWarn.instance.resetWarnings();
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index ac1da245d5..791465eb60 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -141,6 +141,7 @@ import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData;
import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
import org.apache.cassandra.service.snapshot.SnapshotManager;
+import org.apache.cassandra.service.writes.thresholds.CoordinatorWriteWarnings;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamTransferTask;
@@ -303,12 +304,14 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
{
ClientWarn.instance.captureWarnings();
CoordinatorWarnings.init();
+ CoordinatorWriteWarnings.init();
try
{
QueryHandler.Prepared prepared =
QueryProcessor.prepareInternal(query);
ResultMessage result =
prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
QueryProcessor.makeInternalOptions(prepared.statement, args));
CoordinatorWarnings.done();
+ CoordinatorWriteWarnings.done();
if (result != null)
result.setWarnings(ClientWarn.instance.getWarnings());
@@ -317,11 +320,13 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
catch (Exception | Error e)
{
CoordinatorWarnings.done();
+ CoordinatorWriteWarnings.done();
throw e;
}
finally
{
CoordinatorWarnings.reset();
+ CoordinatorWriteWarnings.reset();
ClientWarn.instance.resetWarnings();
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/thresholds/AbstractWriteThresholdWarning.java
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/AbstractWriteThresholdWarning.java
new file mode 100644
index 0000000000..36f4e2c4bf
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/AbstractWriteThresholdWarning.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.thresholds;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.SimpleStatement;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.test.JavaDriverUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;

import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for write threshold warning distributed tests.
+ * Tests coordinator-side warning aggregation from replica responses.
+ */
+public abstract class AbstractWriteThresholdWarning extends TestBaseImpl
+{
+ protected static ICluster<IInvokableInstance> CLUSTER;
+ protected static com.datastax.driver.core.Cluster JAVA_DRIVER;
+ protected static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+ @BeforeClass
+ public static void setupClass() throws IOException
+ {
+ Cluster.Builder builder = Cluster.build(3);
+ builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL,
Feature.GOSSIP));
+ CLUSTER = builder.start();
+ JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+ JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+ }
+
+ protected abstract long totalWarnings();
+ protected abstract void assertWarnings(List<String> warnings);
+ protected abstract void populateTopPartitions(int pk, long value);
+
+ @Before
+ public void setup()
+ {
+ CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+ CLUSTER.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+ CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck
int, v blob, PRIMARY KEY (pk, ck))");
+ }
+
+ @Test
+ public void noTopPartitionsFeatureEnabled()
+ {
+ noTopPartitionsTest(true);
+ }
+
+ @Test
+ public void noTopPartitionsFeatureDisabled()
+ {
+ noTopPartitionsTest(false);
+ }
+
+ private void noTopPartitionsTest(boolean featureEnabled)
+ {
+ enable(featureEnabled);
+
+ CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk,
ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+
+ // Should have no warnings regardless of feature state
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+ assertThat(result.warnings()).isEmpty();
+
+ assertWarningsCount(0);
+ }
+
+ @Test
+ public void topPartitionsExistFeatureDisabled()
+ {
+ // Populate TopPartitionTracker with high value
+ populateTopPartitions(1, getWarnThreshold() * 2);
+
+ enable(false);
+
+ // Write to the top partition
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+
+ // Should have no warnings when feature is disabled
+ assertThat(result.warnings()).isEmpty();
+ assertWarningsCount(0);
+ }
+
+ @Test
+ public void topPartitionExistsFeatureEnabledPartitionIsTop()
+ {
+ // Populate TopPartitionTracker with value above warn threshold
+ populateTopPartitions(1, getWarnThreshold() * 2);
+
+ enable(true);
+
+ // Write to the top partition
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+
+ // Should have warning
+ assertWarnings(result.warnings());
+ assertWarningsCount(1);
+ }
+
+ @Test
+ public void topPartitionExistsFeatureEnabledPartitionNotTop()
+ {
+ // Populate TopPartitionTracker for pk=1 with high value
+ populateTopPartitions(1, getWarnThreshold() * 2);
+
+ enable(true);
+
+ // Write to a different partition (pk=2) that's not in
TopPartitionTracker
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (2, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+
+ // Should have no warnings - pk=2 is not tracked
+ assertThat(result.warnings()).isEmpty();
+ assertWarningsCount(0);
+ }
+
+ @Test
+ public void topPartitionBelowThreshold()
+ {
+ // Populate TopPartitionTracker with value BELOW warn threshold
+ populateTopPartitions(1, getWarnThreshold() / 2);
+
+ enable(true);
+
+ // Write to the partition
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+
+ // Should have no warnings - value is below threshold
+ assertThat(result.warnings()).isEmpty();
+ assertWarningsCount(0);
+ }
+
+ @Test
+ public void multipleWritesToTopPartition()
+ {
+ // Populate TopPartitionTracker
+ populateTopPartitions(1, getWarnThreshold() * 2);
+
+ enable(true);
+
+ // First write - should warn
+ SimpleQueryResult result1 = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+ assertWarnings(result1.warnings());
+ assertWarningsCount(1);
+
+ // Second write - should warn again
+ SimpleQueryResult result2 = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+ assertWarnings(result2.warnings());
+ assertWarningsCount(2);
+ }
+
+ @Test
+ public void mixedTopAndNonTopPartitions()
+ {
+ populateTopPartitions(1, getWarnThreshold() * 2);
+
+ enable(true);
+
+ SimpleQueryResult result1 = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+ assertWarnings(result1.warnings());
+ assertWarningsCount(1);
+
+ SimpleQueryResult result2 = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (2, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+ assertThat(result2.warnings()).isEmpty();
+ assertWarningsCount(1);
+
+ SimpleQueryResult result3 = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+ assertWarnings(result3.warnings());
+ assertWarningsCount(2);
+ }
+
+ @Test
+ public void javaDriverWarnings()
+ {
+ // Populate TopPartitionTracker
+ populateTopPartitions(1, getWarnThreshold() * 2);
+
+ enable(true);
+
+ // Write using Java driver
+ ResultSet result = JAVA_DRIVER_SESSION.execute(
+ new SimpleStatement("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v)
VALUES (1, 1, ?)", bytes(512))
+
.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
+
+ // Should have warnings
+ assertWarnings(result.getExecutionInfo().getWarnings());
+ }
+
+ @Test
+ public void warningMessageContainsTableIdentifier()
+ {
+ populateTopPartitions(1, getWarnThreshold() * 2);
+ enable(true);
+
+ SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+
+ List<String> warnings = result.warnings();
+ assertThat(warnings).hasSize(1);
+ // Warning must identify the specific table that breached the threshold
+ assertThat(warnings.get(0)).contains(KEYSPACE + ".tbl");
+ }
+
+ @Test
+ public void rateLimitingThrottlesWarnings() throws InterruptedException
+ {
+ populateTopPartitions(1, getWarnThreshold() * 2);
+ enable(true);
+
+ // Enable rate limiting with 1 second interval
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() ->
+
DatabaseDescriptor.setCoordinatorWriteWarnInterval(new
DurationSpec.LongMillisecondsBound("1000ms"))));
+
+ try
+ {
+ // First write should produce a warning
+ SimpleQueryResult result1 =
CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+ assertWarnings(result1.warnings());
+
+ // Second write immediately after should be rate-limited (no
warning)
+ SimpleQueryResult result2 =
CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)",
+ ConsistencyLevel.ALL, bytes(512));
+ assertThat(result2.warnings()).isEmpty();
+
+ Thread.sleep(1500);
+
+ // Third write should produce a warning again
+ SimpleQueryResult result3 =
CLUSTER.coordinator(1).executeWithResult(
+ "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3,
?)",
+ ConsistencyLevel.ALL, bytes(512));
+ assertWarnings(result3.warnings());
+ }
+ finally
+ {
+ // Reset to no rate limiting for other tests
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() ->
+
DatabaseDescriptor.setCoordinatorWriteWarnInterval(new
DurationSpec.LongMillisecondsBound("0ms"))));
+ }
+ }
+
+ protected static void enable(boolean value)
+ {
+ CLUSTER.stream().forEach(i -> i.runOnInstance(() ->
DatabaseDescriptor.setWriteThresholdsEnabled(value)));
+ }
+
+ protected static ByteBuffer bytes(int size)
+ {
+ return ByteBuffer.wrap(new byte[size]);
+ }
+
+ private void assertWarningsCount(int expected)
+ {
+ assertThat(totalWarnings()).as("warnings").isEqualTo(expected);
+ }
+
+ protected abstract long getWarnThreshold();
+}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/thresholds/ReplicaWarningTest.java
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/ReplicaWarningTest.java
new file mode 100644
index 0000000000..c1dc4e4a7a
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/ReplicaWarningTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.thresholds;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.metrics.TopPartitionTracker;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
/**
 * Verifies that write threshold warnings are raised only for the table whose
 * partitions are actually tracked: a batch touching tbl1/tbl2/tbl3 must warn
 * exactly once (for tbl1, the only table with tracked top partitions), in both
 * single-DC and multi-DC clusters.
 */
public class ReplicaWarningTest extends TestBaseImpl
{
    @Test
    public void testMultiTableWrite() throws IOException
    {
        // 50B thresholds guarantee the tracked sizes (100+) exceed the warn threshold
        try (Cluster cluster = init(Cluster.build(3)
                                           .withConfig(c -> c.set("min_tracked_partition_size", "50B")
                                                             .set("write_thresholds_enabled", "true")
                                                             .set("write_size_warn_threshold", "50B")
                                                             .with(Feature.NATIVE_PROTOCOL))
                                           .start()))
        {
            createTables(cluster);
            // Only one replica is populated; a single replica response must be enough to warn
            populateTopPartitions(cluster.get(3));
            assertOneWarning(cluster, ConsistencyLevel.ALL);
        }
    }

    @Test
    public void testMultiDCWrite() throws IOException
    {
        try (Cluster cluster = init(Cluster.build(6)
                                           .withRacks(2, 3)
                                           .withConfig(c -> c.set("min_tracked_partition_size", "50B")
                                                             .set("write_thresholds_enabled", "true")
                                                             .set("write_size_warn_threshold", "50B")
                                                             .with(Feature.NATIVE_PROTOCOL))
                                           .start()))
        {
            cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class':'NetworkTopologyStrategy', 'datacenter1' : 3, 'datacenter2': 3}"));
            createTables(cluster);
            // Populate replicas on nodes 4 and 5 (presumably in datacenter2 given
            // withRacks(2, 3) — TODO confirm node->DC mapping) so warnings must
            // cross DC boundaries under EACH_QUORUM
            for (int node : new int[] {4, 5})
                populateTopPartitions(cluster.get(node));
            assertOneWarning(cluster, ConsistencyLevel.EACH_QUORUM);
        }
    }

    /** Three identically-shaped tables; only tbl1 ever gets tracked partitions. */
    private static void createTables(Cluster cluster)
    {
        cluster.schemaChange(withKeyspace("create table %s.tbl1 (id int primary key)"));
        cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)"));
        cluster.schemaChange(withKeyspace("create table %s.tbl3 (id int primary key)"));
    }

    /** Track keys 0..9 of tbl1 with sizes 100..109 bytes — all above the 50B warn threshold. */
    private static void populateTopPartitions(IInvokableInstance instance)
    {
        instance.runOnInstance(() -> {
            TopPartitionTracker tpt = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl1").topPartitions;
            for (int i = 0; i < 10; i++)
            {
                DecoratedKey key = Murmur3Partitioner.instance.decorateKey(Int32Type.instance.fromString(String.valueOf(i)));
                tpt.topSizes().track(key, 100 + i);
            }
        });
    }

    /** Execute a 3-table batch and assert exactly one warning, which must not mention tbl2/tbl3. */
    private static void assertOneWarning(Cluster cluster, ConsistencyLevel cl)
    {
        com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint((String) cluster.get(1).config().get("rpc_address"));

        try (com.datastax.driver.core.Cluster c = builder.build(); Session session = c.connect())
        {
            BatchStatement bs = new BatchStatement();
            bs.add(new SimpleStatement(withKeyspace("insert into %s.tbl1 (id) values (1)")));
            bs.add(new SimpleStatement(withKeyspace("insert into %s.tbl2 (id) values (1)")));
            bs.add(new SimpleStatement(withKeyspace("insert into %s.tbl3 (id) values (1)")));
            ResultSet res = session.execute(bs.setConsistencyLevel(cl));

            List<String> warnings = res.getExecutionInfo().getWarnings();
            // only `tbl1` has any tracked top partitions, should only warn for that
            assertEquals(1, warnings.size());
            for (String warn : warnings)
            {
                assertFalse(warn.contains("tbl2"));
                assertFalse(warn.contains("tbl3"));
            }
        }
    }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteSizeWarningTest.java
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteSizeWarningTest.java
new file mode 100644
index 0000000000..c9176efa34
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteSizeWarningTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.thresholds;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.MEBIBYTES;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
 * Distributed tests for write size threshold warnings.
 * Tests that writes to large partitions (tracked in TopPartitionTracker) trigger warnings.
 * The scenarios themselves live in {@code AbstractWriteThresholdWarning}; this
 * class binds them to the partition-size threshold.
 */
public class WriteSizeWarningTest extends AbstractWriteThresholdWarning
{
    // 5 MiB; the base-class tests seed tracked partitions at 2x this value (10 MiB)
    private static final long WARN_THRESHOLD_BYTES = 5 * 1024 * 1024;

    @BeforeClass
    public static void setupClass() throws IOException
    {
        AbstractWriteThresholdWarning.setupClass();

        // Setup write size threshold after cluster init
        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
            DatabaseDescriptor.setWriteSizeWarnThreshold(new DataStorageSpec.LongBytesBound(5, MEBIBYTES));
            // Minimum tracked partition size (1 MiB) must be below the tracked
            // test value (10 MiB) so the partitions are actually tracked
            DatabaseDescriptor.setMinTrackedPartitionSizeInBytes(new DataStorageSpec.LongBytesBound(1, MEBIBYTES));
            DatabaseDescriptor.setWriteThresholdsEnabled(true);
            // 0ms interval = no rate limiting by default; the rate-limiting test sets its own
            DatabaseDescriptor.setCoordinatorWriteWarnInterval(new DurationSpec.LongMillisecondsBound("0ms"));
        }));
    }

    @Override
    protected long getWarnThreshold()
    {
        return WARN_THRESHOLD_BYTES;
    }

    /** Seed the size tracker for {@code pk} on every node with {@code sizeBytes}. */
    @Override
    protected void populateTopPartitions(int pk, long sizeBytes)
    {
        CLUSTER.stream().forEach(node -> node.runOnInstance(() -> {
            var key = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(pk));
            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
            cfs.topPartitions.topSizes().track(key, sizeBytes);
        }));
    }

    /** Cluster-wide total of the keyspace-level WriteSizeWarnings counter. */
    @Override
    protected long totalWarnings()
    {
        return CLUSTER.stream()
                      .mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.WriteSizeWarnings." + KEYSPACE))
                      .sum();
    }

    /** The single warning must name the table and reference write_size_warn_threshold. */
    @Override
    protected void assertWarnings(List<String> warnings)
    {
        assertThat(warnings).hasSize(1);
        assertThat(warnings.get(0))
        .contains(KEYSPACE + ".tbl")
        .contains("large partition")
        .contains("estimated size is")
        .contains("bytes")
        .contains("write_size_warn_threshold");
    }
}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteTombstoneWarningTest.java
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteTombstoneWarningTest.java
new file mode 100644
index 0000000000..0d4ba64662
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteTombstoneWarningTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.thresholds;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
 * Distributed tests for write tombstone threshold warnings.
 * Tests that writes to partitions with many tombstones (tracked in TopPartitionTracker) trigger warnings.
 * The scenarios themselves live in {@code AbstractWriteThresholdWarning}; this
 * class binds them to the tombstone-count threshold.
 */
public class WriteTombstoneWarningTest extends AbstractWriteThresholdWarning
{
    // Base-class tests seed tracked partitions at 2x this count (2000 tombstones)
    private static final long WARN_THRESHOLD_COUNT = 1000;

    @BeforeClass
    public static void setupClass() throws IOException
    {
        // Setup write tombstone threshold after cluster init
        AbstractWriteThresholdWarning.setupClass();

        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> {
            // Set minimum tracked count first, before the threshold (validation requires threshold >= min)
            DatabaseDescriptor.setMinTrackedPartitionTombstoneCount(100);
            DatabaseDescriptor.setWriteTombstoneWarnThreshold((int) WARN_THRESHOLD_COUNT);
            // 0ms interval = no rate limiting by default; the rate-limiting test sets its own
            DatabaseDescriptor.setCoordinatorWriteWarnInterval(new DurationSpec.LongMillisecondsBound("0ms"));
        }));
    }

    @Override
    protected long getWarnThreshold()
    {
        return WARN_THRESHOLD_COUNT;
    }

    /** Seed the tombstone tracker for {@code pk} on every node with {@code tombstoneCount}. */
    @Override
    protected void populateTopPartitions(int pk, long tombstoneCount)
    {
        CLUSTER.stream().forEach(node -> node.runOnInstance(() -> {
            // Get the DecoratedKey for the partition
            var key = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(pk));

            // Get the ColumnFamilyStore
            ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");

            // Populate TopPartitionTracker with the tombstone count
            if (cfs.topPartitions != null)
            {
                cfs.topPartitions.topTombstones().track(key, tombstoneCount);
            }
        }));
    }

    /** Cluster-wide total of the keyspace-level WriteTombstoneWarnings counter. */
    @Override
    protected long totalWarnings()
    {
        return CLUSTER.stream()
                      .mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.WriteTombstoneWarnings." + KEYSPACE))
                      .sum();
    }

    /** The single warning must name the table and reference write_tombstone_warn_threshold. */
    @Override
    protected void assertWarnings(List<String> warnings)
    {
        assertThat(warnings).hasSize(1);
        assertThat(warnings.get(0))
        .contains(KEYSPACE + ".tbl")
        .contains("many tombstones")
        .contains("estimated count is")
        .contains("write_tombstone_warn_threshold");
    }
}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 678f6746f1..2a44e763e0 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@ -50,6 +50,7 @@ import static
org.apache.cassandra.config.CassandraRelevantProperties.ALLOW_UNLI
import static
org.apache.cassandra.config.CassandraRelevantProperties.CONFIG_LOADER;
import static
org.apache.cassandra.config.CassandraRelevantProperties.PARTITIONER;
import static
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.KIBIBYTES;
+import static
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.MEBIBYTES;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -687,7 +688,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.coordinator_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
conf.coordinator_read_size_fail_threshold = new
DataStorageSpec.LongBytesBound(1, KIBIBYTES);
- Assertions.assertThatThrownBy(() ->
DatabaseDescriptor.applyReadThresholdsValidations(conf))
+ Assertions.assertThatThrownBy(() ->
DatabaseDescriptor.applyThresholdsValidations(conf))
.isInstanceOf(ConfigurationException.class)
.hasMessage("coordinator_read_size_fail_threshold (1KiB)
must be greater than or equal to coordinator_read_size_warn_threshold (2KiB)");
}
@@ -698,7 +699,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.coordinator_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
conf.coordinator_read_size_fail_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
- DatabaseDescriptor.applyReadThresholdsValidations(conf);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
}
@Test
@@ -707,7 +708,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.coordinator_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
conf.coordinator_read_size_fail_threshold = null;
- DatabaseDescriptor.applyReadThresholdsValidations(conf);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
}
@Test
@@ -716,7 +717,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.coordinator_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(0, KIBIBYTES);
conf.coordinator_read_size_fail_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
- DatabaseDescriptor.applyReadThresholdsValidations(conf);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
}
// local read
@@ -727,7 +728,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.local_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
conf.local_read_size_fail_threshold = new
DataStorageSpec.LongBytesBound(1, KIBIBYTES);
- Assertions.assertThatThrownBy(() ->
DatabaseDescriptor.applyReadThresholdsValidations(conf))
+ Assertions.assertThatThrownBy(() ->
DatabaseDescriptor.applyThresholdsValidations(conf))
.isInstanceOf(ConfigurationException.class)
.hasMessage("local_read_size_fail_threshold (1KiB) must be
greater than or equal to local_read_size_warn_threshold (2KiB)");
}
@@ -738,7 +739,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.local_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
conf.local_read_size_fail_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
- DatabaseDescriptor.applyReadThresholdsValidations(conf);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
}
@Test
@@ -747,7 +748,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.local_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
conf.local_read_size_fail_threshold = null;
- DatabaseDescriptor.applyReadThresholdsValidations(conf);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
}
@Test
@@ -756,7 +757,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.local_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(0, KIBIBYTES);
conf.local_read_size_fail_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
- DatabaseDescriptor.applyReadThresholdsValidations(conf);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
}
// row index entry
@@ -767,7 +768,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.row_index_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
conf.row_index_read_size_fail_threshold = new
DataStorageSpec.LongBytesBound(1, KIBIBYTES);
- Assertions.assertThatThrownBy(() ->
DatabaseDescriptor.applyReadThresholdsValidations(conf))
+ Assertions.assertThatThrownBy(() ->
DatabaseDescriptor.applyThresholdsValidations(conf))
.isInstanceOf(ConfigurationException.class)
.hasMessage("row_index_read_size_fail_threshold (1KiB) must
be greater than or equal to row_index_read_size_warn_threshold (2KiB)");
}
@@ -778,7 +779,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.row_index_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
conf.row_index_read_size_fail_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
- DatabaseDescriptor.applyReadThresholdsValidations(conf);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
}
@Test
@@ -787,7 +788,7 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.row_index_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
conf.row_index_read_size_fail_threshold = null;
- DatabaseDescriptor.applyReadThresholdsValidations(conf);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
}
@Test
@@ -796,7 +797,150 @@ public class DatabaseDescriptorTest
Config conf = new Config();
conf.row_index_read_size_warn_threshold = new
DataStorageSpec.LongBytesBound(0, KIBIBYTES);
conf.row_index_read_size_fail_threshold = new
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
- DatabaseDescriptor.applyReadThresholdsValidations(conf);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
+ }
+
+ // write thresholds
+ @Test
+ public void testWriteSizeWarnThresholdEnabled()
+ {
+ Config conf = new Config();
+ conf.write_size_warn_threshold = new
DataStorageSpec.LongBytesBound(10, MEBIBYTES);
+ conf.write_thresholds_enabled = true;
+
DatabaseDescriptor.setWriteSizeWarnThreshold(conf.write_size_warn_threshold);
+
DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+
assertThat(DatabaseDescriptor.getWriteSizeWarnThreshold()).isEqualTo(conf.write_size_warn_threshold);
+ assertThat(DatabaseDescriptor.getWriteThresholdsEnabled()).isTrue();
+ }
+
+ @Test
+ public void testWriteSizeWarnThresholdDisabled()
+ {
+ Config conf = new Config();
+ conf.write_size_warn_threshold = null;
+ conf.write_thresholds_enabled = false;
+
DatabaseDescriptor.setWriteSizeWarnThreshold(conf.write_size_warn_threshold);
+
DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+ assertThat(DatabaseDescriptor.getWriteSizeWarnThreshold()).isNull();
+ assertThat(DatabaseDescriptor.getWriteThresholdsEnabled()).isFalse();
+ }
+
+ @Test
+ public void testWriteTombstoneWarnThresholdEnabled()
+ {
+ Config conf = new Config();
+ conf.write_tombstone_warn_threshold = 1000;
+ conf.write_thresholds_enabled = true;
+
DatabaseDescriptor.setWriteTombstoneWarnThreshold(conf.write_tombstone_warn_threshold);
+
DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+
assertThat(DatabaseDescriptor.getWriteTombstoneWarnThreshold()).isEqualTo(1000);
+ assertThat(DatabaseDescriptor.getWriteThresholdsEnabled()).isTrue();
+ }
+
+ @Test
+ public void testWriteTombstoneWarnThresholdDisabled()
+ {
+ Config conf = new Config();
+ conf.write_tombstone_warn_threshold = -1;
+ conf.write_thresholds_enabled = false;
+
DatabaseDescriptor.setWriteTombstoneWarnThreshold(conf.write_tombstone_warn_threshold);
+
DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+
assertThat(DatabaseDescriptor.getWriteTombstoneWarnThreshold()).isEqualTo(-1);
+ assertThat(DatabaseDescriptor.getWriteThresholdsEnabled()).isFalse();
+ }
+
+ @Test
+ public void testWriteThresholdsCanBeSetIndependently()
+ {
+ Config conf = new Config();
+ // Set size threshold only
+ conf.write_size_warn_threshold = new DataStorageSpec.LongBytesBound(5,
MEBIBYTES);
+ conf.write_tombstone_warn_threshold = -1;
+ conf.write_thresholds_enabled = true;
+
DatabaseDescriptor.setWriteSizeWarnThreshold(conf.write_size_warn_threshold);
+
DatabaseDescriptor.setWriteTombstoneWarnThreshold(conf.write_tombstone_warn_threshold);
+
DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+
assertThat(DatabaseDescriptor.getWriteSizeWarnThreshold()).isEqualTo(conf.write_size_warn_threshold);
+
assertThat(DatabaseDescriptor.getWriteTombstoneWarnThreshold()).isEqualTo(-1);
+
+ // Set tombstone threshold only
+ conf.write_size_warn_threshold = null;
+ conf.write_tombstone_warn_threshold = 500;
+
DatabaseDescriptor.setWriteSizeWarnThreshold(conf.write_size_warn_threshold);
+
DatabaseDescriptor.setWriteTombstoneWarnThreshold(conf.write_tombstone_warn_threshold);
+ assertThat(DatabaseDescriptor.getWriteSizeWarnThreshold()).isNull();
+
assertThat(DatabaseDescriptor.getWriteTombstoneWarnThreshold()).isEqualTo(500);
+ }
+
+ @Test
+ public void testWriteSizeWarnThresholdValidation()
+ {
+ Config conf = new Config();
+
+ // Error: warn < min
+ conf.write_size_warn_threshold = new DataStorageSpec.LongBytesBound(1,
MEBIBYTES);
+ conf.min_tracked_partition_size = new
DataStorageSpec.LongBytesBound(5, MEBIBYTES);
+ assertThatThrownBy(() ->
DatabaseDescriptor.applyThresholdsValidations(conf))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessageContaining("write_size_warn_threshold (1MiB) cannot be less
than min_tracked_partition_size (5MiB)");
+
+ // Valid: warn == min
+ conf.write_size_warn_threshold = new DataStorageSpec.LongBytesBound(5,
MEBIBYTES);
+ conf.min_tracked_partition_size = new
DataStorageSpec.LongBytesBound(5, MEBIBYTES);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
+
+ // Valid: warn > min
+ conf.write_size_warn_threshold = new
DataStorageSpec.LongBytesBound(10, MEBIBYTES);
+ conf.min_tracked_partition_size = new
DataStorageSpec.LongBytesBound(5, MEBIBYTES);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
+
+ // Valid: null values
+ conf.write_size_warn_threshold = null;
+ conf.min_tracked_partition_size = new
DataStorageSpec.LongBytesBound(5, MEBIBYTES);
+ DatabaseDescriptor.applyThresholdsValidations(conf);
+
+ conf.write_size_warn_threshold = new
DataStorageSpec.LongBytesBound(10, MEBIBYTES);
+ conf.min_tracked_partition_size = null;
+ DatabaseDescriptor.applyThresholdsValidations(conf);
+
+ conf.write_size_warn_threshold = null;
+ conf.min_tracked_partition_size = null;
+ DatabaseDescriptor.applyThresholdsValidations(conf);
+ }
+
+ @Test
+ public void testWriteTombstoneWarnThresholdValidation()
+ {
+ Config conf = new Config();
+
+ // Error: threshold < -1
+ conf.write_tombstone_warn_threshold = -2;
+ conf.min_tracked_partition_tombstone_count = 100;
+ assertThatThrownBy(() ->
DatabaseDescriptor.applyThresholdsValidations(conf))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessageContaining("write_tombstone_warn_threshold (-2) must be -1
(disabled) or >= 0");
+
+ // Valid: threshold == -1 (disabled)
+ conf.write_tombstone_warn_threshold = -1;
+ conf.min_tracked_partition_tombstone_count = 100;
+ DatabaseDescriptor.applyThresholdsValidations(conf);
+
+ // Error: warn < min
+ conf.write_tombstone_warn_threshold = 50;
+ conf.min_tracked_partition_tombstone_count = 100;
+ assertThatThrownBy(() ->
DatabaseDescriptor.applyThresholdsValidations(conf))
+ .isInstanceOf(ConfigurationException.class)
+ .hasMessageContaining("write_tombstone_warn_threshold (50) cannot be
less than min_tracked_partition_tombstone_count (100)");
+
+ // Valid: warn == min
+ conf.write_tombstone_warn_threshold = 100;
+ conf.min_tracked_partition_tombstone_count = 100;
+ DatabaseDescriptor.applyThresholdsValidations(conf);
+
+ // Valid: warn > min
+ conf.write_tombstone_warn_threshold = 200;
+ conf.min_tracked_partition_tombstone_count = 100;
+ DatabaseDescriptor.applyThresholdsValidations(conf);
}
@Test
diff --git
a/test/unit/org/apache/cassandra/service/writes/thresholds/WarnCounterTest.java
b/test/unit/org/apache/cassandra/service/writes/thresholds/WarnCounterTest.java
new file mode 100644
index 0000000000..192833c8e3
--- /dev/null
+++
b/test/unit/org/apache/cassandra/service/writes/thresholds/WarnCounterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.cassandra.schema.TableId;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class WarnCounterTest
+{
+ private static final TableId TABLE1 = TableId.fromUUID(new UUID(0, 1));
+ private static final TableId TABLE2 = TableId.fromUUID(new UUID(0, 2));
+
+ @Test
+ public void testAddWarningSingleTable()
+ {
+ WarnCounter counter = new WarnCounter();
+ counter.addWarning(map(TABLE1, 1024L));
+
+ WriteThresholdCounter snapshot = counter.snapshot();
+ assertThat(snapshot.tableValues).containsEntry(TABLE1, 1024L);
+ assertThat(snapshot.tableValues).hasSize(1);
+ }
+
+ @Test
+ public void testAddWarningMultipleTables()
+ {
+ WarnCounter counter = new WarnCounter();
+ counter.addWarning(map(TABLE1, 1024L, TABLE2, 2048L));
+
+ WriteThresholdCounter snapshot = counter.snapshot();
+ assertThat(snapshot.tableValues).containsEntry(TABLE1, 1024L);
+ assertThat(snapshot.tableValues).containsEntry(TABLE2, 2048L);
+ }
+
+ @Test
+ public void testMaxValueTrackedPerTable()
+ {
+ WarnCounter counter = new WarnCounter();
+ counter.addWarning(map(TABLE1, 1024L));
+ counter.addWarning(map(TABLE1, 2048L));
+ counter.addWarning(map(TABLE1, 512L));
+
+ WriteThresholdCounter snapshot = counter.snapshot();
+ assertThat(snapshot.tableValues).containsEntry(TABLE1, 2048L);
+ }
+
+ @Test
+ public void testMaxValueIndependentPerTable()
+ {
+ WarnCounter counter = new WarnCounter();
+ counter.addWarning(map(TABLE1, 1000L, TABLE2, 500L));
+ counter.addWarning(map(TABLE1, 500L, TABLE2, 1000L));
+
+ WriteThresholdCounter snapshot = counter.snapshot();
+ assertThat(snapshot.tableValues).containsEntry(TABLE1, 1000L);
+ assertThat(snapshot.tableValues).containsEntry(TABLE2, 1000L);
+ }
+
+ @Test
+ public void testNewTableAddedOnSubsequentCall()
+ {
+ WarnCounter counter = new WarnCounter();
+ counter.addWarning(map(TABLE1, 1024L));
+ counter.addWarning(map(TABLE2, 2048L));
+
+ WriteThresholdCounter snapshot = counter.snapshot();
+ assertThat(snapshot.tableValues).containsEntry(TABLE1, 1024L);
+ assertThat(snapshot.tableValues).containsEntry(TABLE2, 2048L);
+ }
+
+ @Test
+ public void testSnapshotIsImmutable()
+ {
+ WarnCounter counter = new WarnCounter();
+ counter.addWarning(map(TABLE1, 1024L));
+
+ WriteThresholdCounter snapshot1 = counter.snapshot();
+
+ counter.addWarning(map(TABLE1, 2048L));
+ counter.addWarning(map(TABLE2, 512L));
+
+ WriteThresholdCounter snapshot2 = counter.snapshot();
+
+ // snapshot1 should not be affected by subsequent addWarning calls
+ assertThat(snapshot1.tableValues).containsEntry(TABLE1, 1024L);
+ assertThat(snapshot1.tableValues).doesNotContainKey(TABLE2);
+
+ assertThat(snapshot2.tableValues).containsEntry(TABLE1, 2048L);
+ assertThat(snapshot2.tableValues).containsEntry(TABLE2, 512L);
+ }
+
+ @Test
+ public void testEmptyCounter()
+ {
+ WarnCounter counter = new WarnCounter();
+
+ WriteThresholdCounter snapshot = counter.snapshot();
+ assertThat(snapshot.isEmpty()).isTrue();
+ assertThat(snapshot.tableValues).isEmpty();
+ }
+
+ @Test
+ public void testEmptyMapDoesNothing()
+ {
+ WarnCounter counter = new WarnCounter();
+ counter.addWarning(new HashMap<>());
+
+ assertThat(counter.snapshot().isEmpty()).isTrue();
+ }
+
+ private static Map<TableId, Long> map(TableId t1, long v1)
+ {
+ Map<TableId, Long> m = new HashMap<>();
+ m.put(t1, v1);
+ return m;
+ }
+
+ private static Map<TableId, Long> map(TableId t1, long v1, TableId t2,
long v2)
+ {
+ Map<TableId, Long> m = new HashMap<>();
+ m.put(t1, v1);
+ m.put(t2, v2);
+ return m;
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningContextTest.java
b/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningContextTest.java
new file mode 100644
index 0000000000..ec631ad6bd
--- /dev/null
+++
b/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningContextTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.TableId;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class WriteWarningContextTest
+{
+ private static final TableId TABLE1 = TableId.fromUUID(new UUID(0, 1));
+ private static final TableId TABLE2 = TableId.fromUUID(new UUID(0, 2));
+
+ @Test
+ public void testIsSupported()
+ {
+
assertThat(WriteWarningContext.isSupported(EnumSet.of(ParamType.WRITE_SIZE_WARN))).isTrue();
+
assertThat(WriteWarningContext.isSupported(EnumSet.of(ParamType.WRITE_TOMBSTONE_WARN))).isTrue();
+ assertThat(WriteWarningContext.isSupported(EnumSet.of(
+ ParamType.WRITE_SIZE_WARN,
+ ParamType.WRITE_TOMBSTONE_WARN
+ ))).isTrue();
+
+ // Read threshold params are not supported
+ assertThat(WriteWarningContext.isSupported(EnumSet.of(
+ ParamType.TOMBSTONE_WARNING,
+ ParamType.TOMBSTONE_FAIL
+ ))).isFalse();
+
+
assertThat(WriteWarningContext.isSupported(EnumSet.noneOf(ParamType.class))).isFalse();
+ }
+
+ @Test
+ public void testUpdateCountersWithSingleCall()
+ {
+ WriteWarningContext context = new WriteWarningContext();
+
+ Map<ParamType, Object> params = new HashMap<>();
+ params.put(ParamType.WRITE_SIZE_WARN, tableMap(TABLE1, 1024L));
+ params.put(ParamType.WRITE_TOMBSTONE_WARN, tableMap(TABLE1, 500L));
+
+ context.updateCounters(params);
+
+ WriteWarningsSnapshot snapshot = context.snapshot();
+ assertThat(snapshot.writeSize.tableValues).containsEntry(TABLE1,
1024L);
+ assertThat(snapshot.writeTombstone.tableValues).containsEntry(TABLE1,
500L);
+ }
+
+ @Test
+ public void testUpdateCountersFromMultipleCalls()
+ {
+ WriteWarningContext context = new WriteWarningContext();
+
+ // First call reports TABLE1 with lower values
+ Map<ParamType, Object> params1 = new HashMap<>();
+ params1.put(ParamType.WRITE_SIZE_WARN, tableMap(TABLE1, 1024L));
+ params1.put(ParamType.WRITE_TOMBSTONE_WARN, tableMap(TABLE1, 500L));
+ context.updateCounters(params1);
+
+ // Second call reports TABLE1 with higher values and TABLE2
+ Map<ParamType, Object> params2 = new HashMap<>();
+ params2.put(ParamType.WRITE_SIZE_WARN, tableMap(TABLE1, 2048L));
+ params2.put(ParamType.WRITE_TOMBSTONE_WARN, tableMap(TABLE2, 1000L));
+ context.updateCounters(params2);
+
+ WriteWarningsSnapshot snapshot = context.snapshot();
+
+ // Max values per table should be taken
+ assertThat(snapshot.writeSize.tableValues).containsEntry(TABLE1,
2048L);
+ assertThat(snapshot.writeTombstone.tableValues).containsEntry(TABLE1,
500L);
+ assertThat(snapshot.writeTombstone.tableValues).containsEntry(TABLE2,
1000L);
+ }
+
+ @Test
+ public void testUpdateCountersWithPartialWarnings()
+ {
+ WriteWarningContext context = new WriteWarningContext();
+
+ // First call only reports size warning
+ Map<ParamType, Object> params1 = new HashMap<>();
+ params1.put(ParamType.WRITE_SIZE_WARN, tableMap(TABLE1, 1024L));
+ context.updateCounters(params1);
+
+ // Second call only reports tombstone warning
+ Map<ParamType, Object> params2 = new HashMap<>();
+ params2.put(ParamType.WRITE_TOMBSTONE_WARN, tableMap(TABLE2, 500L));
+ context.updateCounters(params2);
+
+ WriteWarningsSnapshot snapshot = context.snapshot();
+
+ assertThat(snapshot.writeSize.tableValues).containsEntry(TABLE1,
1024L);
+ assertThat(snapshot.writeTombstone.tableValues).containsEntry(TABLE2,
500L);
+ }
+
+ @Test
+ public void testUpdateCountersWithEmptyParams()
+ {
+ WriteWarningContext context = new WriteWarningContext();
+
+ context.updateCounters(new HashMap<>());
+
+ assertThat(context.snapshot().isEmpty()).isTrue();
+ }
+
+ @Test
+ public void testUpdateCountersWithUnsupportedParams()
+ {
+ WriteWarningContext context = new WriteWarningContext();
+
+ Map<ParamType, Object> params = new HashMap<>();
+ params.put(ParamType.TOMBSTONE_WARNING, 100);
+ params.put(ParamType.TOMBSTONE_FAIL, 200);
+ context.updateCounters(params);
+
+ assertThat(context.snapshot().isEmpty()).isTrue();
+ }
+
+ @Test
+ public void testUpdateCountersMaxValueTrackedPerTable()
+ {
+ WriteWarningContext context = new WriteWarningContext();
+
+ context.updateCounters(paramsSize(tableMap(TABLE1, 1024L)));
+ context.updateCounters(paramsSize(tableMap(TABLE1, 2048L)));
+ context.updateCounters(paramsSize(tableMap(TABLE1, 512L)));
+
+ WriteWarningsSnapshot snapshot = context.snapshot();
+ assertThat(snapshot.writeSize.tableValues).containsEntry(TABLE1,
2048L);
+ }
+
+ @Test
+ public void testSnapshotIsImmutable()
+ {
+ WriteWarningContext context = new WriteWarningContext();
+
+ context.updateCounters(paramsSize(tableMap(TABLE1, 1024L)));
+ WriteWarningsSnapshot snapshot1 = context.snapshot();
+
+ context.updateCounters(paramsSize(tableMap(TABLE1, 2048L)));
+ context.updateCounters(paramsSize(tableMap(TABLE2, 512L)));
+ WriteWarningsSnapshot snapshot2 = context.snapshot();
+
+ // First snapshot should not be affected by subsequent calls
+ assertThat(snapshot1.writeSize.tableValues).containsEntry(TABLE1,
1024L);
+ assertThat(snapshot1.writeSize.tableValues).doesNotContainKey(TABLE2);
+
+ assertThat(snapshot2.writeSize.tableValues).containsEntry(TABLE1,
2048L);
+ assertThat(snapshot2.writeSize.tableValues).containsEntry(TABLE2,
512L);
+ }
+
+ @Test
+ public void testSizeAndTombstoneTrackedIndependently()
+ {
+ WriteWarningContext context = new WriteWarningContext();
+
+ Map<ParamType, Object> params = new HashMap<>();
+ params.put(ParamType.WRITE_SIZE_WARN, tableMap(TABLE1,
Long.MAX_VALUE));
+ params.put(ParamType.WRITE_TOMBSTONE_WARN, tableMap(TABLE2, 500L));
+
+ context.updateCounters(params);
+
+ WriteWarningsSnapshot snapshot = context.snapshot();
+ assertThat(snapshot.writeSize.tableValues).containsEntry(TABLE1,
Long.MAX_VALUE);
+ assertThat(snapshot.writeTombstone.tableValues).containsEntry(TABLE2,
500L);
+ assertThat(snapshot.writeSize.tableValues).doesNotContainKey(TABLE2);
+
assertThat(snapshot.writeTombstone.tableValues).doesNotContainKey(TABLE1);
+ }
+
+ private static Map<TableId, Long> tableMap(TableId tableId, long value)
+ {
+ Map<TableId, Long> m = new HashMap<>();
+ m.put(tableId, value);
+ return m;
+ }
+
+ private static Map<ParamType, Object> paramsSize(Map<TableId, Long>
sizeMap)
+ {
+ Map<ParamType, Object> params = new HashMap<>();
+ params.put(ParamType.WRITE_SIZE_WARN, sizeMap);
+ return params;
+ }
+}
\ No newline at end of file
diff --git
a/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshotTest.java
b/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshotTest.java
new file mode 100644
index 0000000000..c22ec7df3c
--- /dev/null
+++
b/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshotTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+import org.quicktheories.impl.Constraint;
+
+import org.apache.cassandra.schema.TableId;
+
+import static
org.apache.cassandra.service.writes.thresholds.WriteWarningsSnapshot.create;
+import static
org.apache.cassandra.service.writes.thresholds.WriteWarningsSnapshot.writeSizeWarnMessage;
+import static
org.apache.cassandra.service.writes.thresholds.WriteWarningsSnapshot.writeTombstoneWarnMessage;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.quicktheories.QuickTheory.qt;
+
+public class WriteWarningsSnapshotTest
+{
+ private static final TableId TABLE1 = TableId.fromUUID(new UUID(0, 1));
+ private static final TableId TABLE2 = TableId.fromUUID(new UUID(0, 2));
+ private static final WriteWarningsSnapshot EMPTY_SNAPSHOT =
create(WriteThresholdCounter.empty(), WriteThresholdCounter.empty());
+
+ @Test
+ public void testCreateWithNonEmptyCounters()
+ {
+ WriteThresholdCounter sizeCounter =
WriteThresholdCounter.create(map(TABLE1, 1024L));
+ WriteThresholdCounter tombstoneCounter =
WriteThresholdCounter.create(map(TABLE2, 500L));
+ WriteWarningsSnapshot snapshot = create(sizeCounter, tombstoneCounter);
+
+ assertThat(snapshot.isEmpty()).isFalse();
+ assertThat(snapshot.writeSize).isEqualTo(sizeCounter);
+ assertThat(snapshot.writeTombstone).isEqualTo(tombstoneCounter);
+ }
+
+ @Test
+ public void testMergeWithNull()
+ {
+ WriteThresholdCounter sizeCounter =
WriteThresholdCounter.create(map(TABLE1, 2048L));
+ WriteWarningsSnapshot snapshot = create(sizeCounter,
WriteThresholdCounter.empty());
+ WriteWarningsSnapshot result = snapshot.merge(null);
+
+ assertThat(result).isEqualTo(snapshot);
+ }
+
+ @Test
+ public void testMergeSelfWithSelf()
+ {
+ qt().forAll(all()).check(snapshot ->
snapshot.merge(snapshot).equals(snapshot));
+ }
+
+ @Test
+ public void testMergeNonOverlappingTables()
+ {
+ WriteWarningsSnapshot snapshot1 = create(
+ WriteThresholdCounter.create(map(TABLE1, 1024L)),
+ WriteThresholdCounter.create(map(TABLE1, 100L))
+ );
+
+ WriteWarningsSnapshot snapshot2 = create(
+ WriteThresholdCounter.create(map(TABLE2, 2048L)),
+ WriteThresholdCounter.create(map(TABLE2, 200L))
+ );
+
+ WriteWarningsSnapshot merged = snapshot1.merge(snapshot2);
+
+ assertThat(merged.writeSize.tableValues).containsEntry(TABLE1, 1024L);
+ assertThat(merged.writeSize.tableValues).containsEntry(TABLE2, 2048L);
+ assertThat(merged.writeTombstone.tableValues).containsEntry(TABLE1,
100L);
+ assertThat(merged.writeTombstone.tableValues).containsEntry(TABLE2,
200L);
+ }
+
+ @Test
+ public void testMergeOverlappingTablesTakesMax()
+ {
+ WriteWarningsSnapshot snapshot1 = create(
+ WriteThresholdCounter.create(map(TABLE1, 3000L)),
+ WriteThresholdCounter.empty()
+ );
+
+ WriteWarningsSnapshot snapshot2 = create(
+ WriteThresholdCounter.create(map(TABLE1, 4000L)),
+ WriteThresholdCounter.empty()
+ );
+
+ WriteWarningsSnapshot merged = snapshot1.merge(snapshot2);
+
+ assertThat(merged.writeSize.tableValues).containsEntry(TABLE1, 4000L);
+ assertThat(merged.writeSize.tableValues).hasSize(1);
+ }
+
+ @Test
+ public void testMergeDifferentThresholdTypes()
+ {
+ WriteWarningsSnapshot snapshot1 = create(
+ WriteThresholdCounter.create(map(TABLE1, 5000L)),
+ WriteThresholdCounter.empty()
+ );
+
+ WriteWarningsSnapshot snapshot2 = create(
+ WriteThresholdCounter.empty(),
+ WriteThresholdCounter.create(map(TABLE2, 300L))
+ );
+
+ WriteWarningsSnapshot merged = snapshot1.merge(snapshot2);
+
+ assertThat(merged.writeSize.tableValues).containsEntry(TABLE1, 5000L);
+ assertThat(merged.writeTombstone.tableValues).containsEntry(TABLE2,
300L);
+ }
+
+ @Test
+ public void testWriteSizeWarnMessage()
+ {
+ String message = writeSizeWarnMessage(1048576L);
+ assertThat(message).isEqualTo("Write to large partition; estimated
size is 1048576 bytes (see write_size_warn_threshold)");
+ }
+
+ @Test
+ public void testWriteTombstoneWarnMessage()
+ {
+ String message = writeTombstoneWarnMessage(500L);
+ assertThat(message).isEqualTo("Write to partition with many
tombstones; estimated count is 500 (see write_tombstone_warn_threshold)");
+ }
+
+ @Test
+ public void testMergeCommutative()
+ {
+ qt().forAll(all(), all()).check((a, b) ->
a.merge(b).equals(b.merge(a)));
+ }
+
+ @Test
+ public void testMergeAssociative()
+ {
+ qt().forAll(all(), all(), all()).check((a, b, c) ->
a.merge(b).merge(c).equals(a.merge(b.merge(c))));
+ }
+
+ private static Gen<WriteWarningsSnapshot> all()
+ {
+ Gen<Boolean> isEmpty = SourceDSL.booleans().all();
+ Gen<WriteWarningsSnapshot> nonEmpty = nonEmpty();
+ Gen<WriteWarningsSnapshot> gen = rs -> isEmpty.generate(rs) ?
EMPTY_SNAPSHOT : nonEmpty.generate(rs);
+ return gen.describedAs(WriteWarningsSnapshot::toString);
+ }
+
+ private static Gen<WriteWarningsSnapshot> nonEmpty()
+ {
+ Gen<WriteThresholdCounter> counter = counter();
+ Gen<WriteWarningsSnapshot> gen = rs ->
+ {
+ WriteThresholdCounter writeSize = counter.generate(rs);
+ WriteThresholdCounter writeTombstone = counter.generate(rs);
+ return create(writeSize, writeTombstone);
+ };
+ return gen.assuming(snapshot ->
!snapshot.isEmpty()).describedAs(WriteWarningsSnapshot::toString);
+ }
+
+ private static Gen<WriteThresholdCounter> counter()
+ {
+ Gen<Boolean> isEmpty = SourceDSL.booleans().all();
+ Constraint maxValue = Constraint.between(1, Long.MAX_VALUE);
+ Gen<WriteThresholdCounter> gen = rs ->
+ {
+ if (isEmpty.generate(rs))
+ return WriteThresholdCounter.empty();
+ Map<TableId, Long> values = new HashMap<>();
+ values.put(TABLE1, rs.next(maxValue));
+ if (rs.next(Constraint.between(0, 1)) == 1)
+ values.put(TABLE2, rs.next(maxValue));
+ return WriteThresholdCounter.create(values);
+ };
+ return gen.describedAs(WriteThresholdCounter::toString);
+ }
+
+ private static Map<TableId, Long> map(TableId t1, long v1)
+ {
+ Map<TableId, Long> m = new HashMap<>();
+ m.put(t1, v1);
+ return m;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]