This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9bedf98b6e Send client warnings when writing to a large partition
9bedf98b6e is described below

commit 9bedf98b6eb30fbefe00f7513eaf374fc7236e98
Author: minal-kyada <[email protected]>
AuthorDate: Sun Jan 11 23:27:45 2026 -0800

    Send client warnings when writing to a large partition
    
    Patch by Minal Kyada; reviewed by David Capwell and marcuse for CASSANDRA-17258
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   5 +
 .../cassandra/config/DatabaseDescriptor.java       |  80 +++++-
 .../org/apache/cassandra/db/MessageParams.java     |  17 ++
 .../apache/cassandra/db/MutationVerbHandler.java   |  15 +-
 .../org/apache/cassandra/db/WriteThresholds.java   | 112 ++++++++
 .../apache/cassandra/metrics/KeyspaceMetrics.java  |   7 +
 .../org/apache/cassandra/metrics/TableMetrics.java |   6 +
 .../cassandra/metrics/TopPartitionTracker.java     |  20 +-
 src/java/org/apache/cassandra/net/ParamType.java   |   5 +-
 .../service/AbstractWriteResponseHandler.java      |  31 +-
 .../DatacenterSyncWriteResponseHandler.java        |  22 +-
 .../org/apache/cassandra/service/StorageProxy.java |  11 +
 .../apache/cassandra/service/StorageService.java   |  35 +++
 .../cassandra/service/StorageServiceMBean.java     |  11 +-
 .../cassandra/service/WriteResponseHandler.java    |  13 +-
 .../reads/thresholds/CoordinatorWarnings.java      |  94 ++----
 .../thresholds/CoordinatorWarningsState.java       | 155 ++++++++++
 .../thresholds/CoordinatorWriteWarnings.java       | 188 ++++++++++++
 .../service/writes/thresholds/WarnCounter.java     |  48 ++++
 .../writes/thresholds/WriteThresholdCounter.java   |  90 ++++++
 .../thresholds/WriteThresholdMapSerializer.java    |  68 +++++
 .../writes/thresholds/WriteWarningContext.java     |  67 +++++
 .../writes/thresholds/WriteWarningsSnapshot.java   |  95 +++++++
 .../org/apache/cassandra/transport/Dispatcher.java |  11 +
 test/conf/cassandra.yaml                           |   5 +
 .../distributed/impl/CoordinatorHelper.java        |  14 +-
 .../cassandra/distributed/impl/Instance.java       |   5 +
 .../thresholds/AbstractWriteThresholdWarning.java  | 315 +++++++++++++++++++++
 .../test/thresholds/ReplicaWarningTest.java        | 123 ++++++++
 .../test/thresholds/WriteSizeWarningTest.java      |  96 +++++++
 .../test/thresholds/WriteTombstoneWarningTest.java |  99 +++++++
 .../cassandra/config/DatabaseDescriptorTest.java   | 168 ++++++++++-
 .../service/writes/thresholds/WarnCounterTest.java | 147 ++++++++++
 .../writes/thresholds/WriteWarningContextTest.java | 205 ++++++++++++++
 .../thresholds/WriteWarningsSnapshotTest.java      | 199 +++++++++++++
 36 files changed, 2489 insertions(+), 94 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 68e5778058..e9de27af5c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Send client warnings when writing to a large partition (CASSANDRA-17258)
  * Harden the possible range of values for max dictionary size and max total sample size for dictionary training (CASSANDRA-21194)
  * Implement a guardrail ensuring that minimum training frequency parameter is provided in ZstdDictionaryCompressor (CASSANDRA-21192)
  * Replace manual referencing with ColumnFamilyStore.selectAndReference when training a dictionary (CASSANDRA-21188)
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index c6c0351f08..0ac9c691a0 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -579,6 +579,11 @@ public class Config
     public volatile int tombstone_warn_threshold = 1000;
     public volatile int tombstone_failure_threshold = 100000;
 
+    public volatile boolean write_thresholds_enabled = false;
+    public volatile DataStorageSpec.LongBytesBound write_size_warn_threshold = 
null;
+    public volatile DurationSpec.LongMillisecondsBound 
coordinator_write_warn_interval = new 
DurationSpec.LongMillisecondsBound("1000ms");
+    public volatile int write_tombstone_warn_threshold = -1;
+
     public TombstonesMetricGranularity 
tombstone_read_purgeable_metric_granularity = 
TombstonesMetricGranularity.disabled;
 
     public final ReplicaFilteringProtectionOptions 
replica_filtering_protection = new ReplicaFilteringProtectionOptions();
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9999913edd..59ffb9f02a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -931,7 +931,7 @@ public class DatabaseDescriptor
 
         applyConcurrentValidations(conf);
         applyRepairCommandPoolSize(conf);
-        applyReadThresholdsValidations(conf);
+        applyThresholdsValidations(conf);
 
         if (conf.concurrent_materialized_view_builders <= 0)
             throw new 
ConfigurationException("concurrent_materialized_view_builders should be 
strictly greater than 0, but was " + 
conf.concurrent_materialized_view_builders, false);
@@ -1311,11 +1311,19 @@ public class DatabaseDescriptor
     }
 
     @VisibleForTesting
-    static void applyReadThresholdsValidations(Config config)
+    static void applyThresholdsValidations(Config config)
     {
+        // Validate read thresholds
         validateReadThresholds("coordinator_read_size", 
config.coordinator_read_size_warn_threshold, 
config.coordinator_read_size_fail_threshold);
         validateReadThresholds("local_read_size", 
config.local_read_size_warn_threshold, config.local_read_size_fail_threshold);
         validateReadThresholds("row_index_read_size", 
config.row_index_read_size_warn_threshold, 
config.row_index_read_size_fail_threshold);
+
+        // Write threshold warning depends on top_partitions tracking
+        if (config.write_thresholds_enabled && !config.top_partitions_enabled)
+            logger.warn("Write thresholds require top partitions tracking to 
be enabled");
+
+        validateWriteSizeThreshold(config.write_size_warn_threshold, 
config.min_tracked_partition_size);
+        
validateWriteTombstoneThresholdRange(config.write_tombstone_warn_threshold, 
config.min_tracked_partition_tombstone_count);
     }
 
     private static void validateReadThresholds(String name, 
DataStorageSpec.LongBytesBound warn, DataStorageSpec.LongBytesBound fail)
@@ -1326,6 +1334,25 @@ public class DatabaseDescriptor
                                                            name + 
"_warn_threshold", warn));
     }
 
+    private static void 
validateWriteSizeThreshold(DataStorageSpec.LongBytesBound writeSizeWarn, 
DataStorageSpec.LongBytesBound minTrackedSize)
+    {
+        if (writeSizeWarn != null && minTrackedSize != null)
+        {
+            if (writeSizeWarn.toBytes() < minTrackedSize.toBytes())
+                throw new 
ConfigurationException(String.format("write_size_warn_threshold (%s) cannot be 
less than min_tracked_partition_size (%s)", writeSizeWarn, minTrackedSize));
+        }
+    }
+
+    private static void validateWriteTombstoneThresholdRange(int 
writeTombstoneWarn, long minTrackedTombstoneCount)
+    {
+        if (writeTombstoneWarn < -1)
+            throw new 
ConfigurationException(String.format("write_tombstone_warn_threshold (%d) must 
be -1 (disabled) or >= 0", writeTombstoneWarn));
+
+        if (writeTombstoneWarn != -1 && writeTombstoneWarn < 
minTrackedTombstoneCount)
+            throw new 
ConfigurationException(String.format("write_tombstone_warn_threshold (%d) 
cannot be less than min_tracked_partition_tombstone_count (%d)",
+                                                           writeTombstoneWarn, 
minTrackedTombstoneCount));
+    }
+
     public static GuardrailsOptions getGuardrailsConfig()
     {
         return guardrails;
@@ -5482,6 +5509,55 @@ public class DatabaseDescriptor
         conf.row_index_read_size_fail_threshold = value;
     }
 
+    public static boolean getWriteThresholdsEnabled()
+    {
+        return conf.write_thresholds_enabled;
+    }
+
+    public static void setWriteThresholdsEnabled(boolean enabled)
+    {
+        if (enabled && !conf.top_partitions_enabled)
+            logger.warn("Write thresholds require top partitions tracking to 
be enabled");
+        logger.info("updating write_thresholds_enabled to {}", enabled);
+        conf.write_thresholds_enabled = enabled;
+    }
+
+    @Nullable
+    public static DataStorageSpec.LongBytesBound getWriteSizeWarnThreshold()
+    {
+        return conf.write_size_warn_threshold;
+    }
+
+    public static void setWriteSizeWarnThreshold(@Nullable 
DataStorageSpec.LongBytesBound value)
+    {
+        validateWriteSizeThreshold(value, conf.min_tracked_partition_size);
+        logger.info("updating write_size_warn_threshold to {}", value);
+        conf.write_size_warn_threshold = value;
+    }
+
+    public static DurationSpec.LongMillisecondsBound 
getCoordinatorWriteWarnInterval()
+    {
+        return conf.coordinator_write_warn_interval;
+    }
+
+    public static void 
setCoordinatorWriteWarnInterval(DurationSpec.LongMillisecondsBound ms)
+    {
+        logger.info("updating coordinator_write_warn_interval to {}", ms);
+        conf.coordinator_write_warn_interval = ms;
+    }
+
+    public static int getWriteTombstoneWarnThreshold()
+    {
+        return conf.write_tombstone_warn_threshold;
+    }
+
+    public static void setWriteTombstoneWarnThreshold(int threshold)
+    {
+        validateWriteTombstoneThresholdRange(threshold, 
conf.min_tracked_partition_tombstone_count);
+        logger.info("updating write_tombstone_warn_threshold to {}", 
threshold);
+        conf.write_tombstone_warn_threshold = threshold;
+    }
+
     public static int getDefaultKeyspaceRF()
     {
         return conf.default_keyspace_rf;
diff --git a/src/java/org/apache/cassandra/db/MessageParams.java 
b/src/java/org/apache/cassandra/db/MessageParams.java
index 689e3673ab..6556cf3c09 100644
--- a/src/java/org/apache/cassandra/db/MessageParams.java
+++ b/src/java/org/apache/cassandra/db/MessageParams.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Map;
 
@@ -66,6 +67,22 @@ public class MessageParams
         get().clear();
     }
 
+    /**
+     * Capture the current MessageParams for use across threads.
+     * This returns a copy of the current ThreadLocal params, which can be safely
+     * passed to async callbacks that may run on different threads.
+     *
+     * @return an independent copy of the current params, or an immutable empty map if there are none
+     */
+    public static Map<ParamType, Object> capture()
+    {
+        Map<ParamType, Object> current = local.get();
+        if (current == null || current.isEmpty())
+            return Collections.emptyMap();
+
+        return new EnumMap<>(current);
+    }
+
     public static <T> Message<T> addToMessage(Message<T> message)
     {
         return message.withParams(get());
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 3312a12b65..f4244c84e1 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.db;
 
+import java.util.Map;
+
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.ForwardingInfo;
@@ -33,10 +35,13 @@ public class MutationVerbHandler extends 
AbstractMutationVerbHandler<Mutation>
 {
     public static final MutationVerbHandler instance = new 
MutationVerbHandler();
 
-    private void respond(Message<?> respondTo, InetAddressAndPort 
respondToAddress)
+    private void respond(Message<?> respondTo, InetAddressAndPort 
respondToAddress, Map<ParamType, Object> params)
     {
         Tracing.trace("Enqueuing response to {}", respondToAddress);
-        MessagingService.instance().send(respondTo.emptyResponse(), 
respondToAddress);
+        Message<?> response = respondTo.emptyResponse();
+        if (!params.isEmpty())
+            response = response.withParams(params);
+        MessagingService.instance().send(response, respondToAddress);
     }
 
     private void failed()
@@ -53,9 +58,10 @@ public class MutationVerbHandler extends 
AbstractMutationVerbHandler<Mutation>
             return;
         }
 
+        MessageParams.reset();
         message.payload.validateSize(MessagingService.current_version, 
ENTRY_OVERHEAD_SIZE);
+        WriteThresholds.checkWriteThresholds(message.payload);
 
-        // Check if there were any forwarding headers in this message
         ForwardingInfo forwardTo = message.forwardTo();
         if (forwardTo != null)
             forwardToLocalNodes(message, forwardTo);
@@ -73,7 +79,8 @@ public class MutationVerbHandler extends 
AbstractMutationVerbHandler<Mutation>
 
     protected void applyMutation(Message<Mutation> message, InetAddressAndPort 
respondToAddress)
     {
-        message.payload.applyFuture().addCallback(o -> respond(message, 
respondToAddress), wto -> failed());
+        Map<ParamType, Object> params = MessageParams.capture();
+        message.payload.applyFuture().addCallback(o -> respond(message, 
respondToAddress, params), wto -> failed());
     }
 
     private static void forwardToLocalNodes(Message<Mutation> originalMessage, 
ForwardingInfo forwardTo)
diff --git a/src/java/org/apache/cassandra/db/WriteThresholds.java 
b/src/java/org/apache/cassandra/db/WriteThresholds.java
new file mode 100644
index 0000000000..ab6737f1ea
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/WriteThresholds.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.TopPartitionTracker;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Utility class for checking write threshold warnings on replicas.
+ * CASSANDRA-17258: Paxos and Accord perform complex thread hand-offs and use custom write logic, which would have made this patch considerably more complex, so support for them was deferred.
+ */
+public class WriteThresholds
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(WriteThresholds.class);
+    private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+    /**
+     * Check write thresholds for a mutation by comparing the estimated partition size and tombstone count
+     * from {@link TopPartitionTracker} against configured warn thresholds. If a threshold is breached,
+     * a warning is logged and the corresponding {@link ParamType} is added to {@link MessageParams}
+     * for propagation back to the coordinator.
+     *
+     * @param mutation the mutation containing one or more partition updates
+     */
+    public static void checkWriteThresholds(Mutation mutation)
+    {
+        if (!DatabaseDescriptor.isDaemonInitialized() || 
!DatabaseDescriptor.getWriteThresholdsEnabled())
+            return;
+
+        DataStorageSpec.LongBytesBound sizeWarnThreshold = 
DatabaseDescriptor.getWriteSizeWarnThreshold();
+        int tombstoneWarnThreshold = 
DatabaseDescriptor.getWriteTombstoneWarnThreshold();
+
+        if (sizeWarnThreshold == null && tombstoneWarnThreshold == -1)
+            return;
+
+        long sizeWarnBytes = sizeWarnThreshold != null ? 
sizeWarnThreshold.toBytes() : -1;
+        DecoratedKey key = mutation.key();
+
+        Map<TableId, Long> sizeWarnings = null;
+        Map<TableId, Long> tombstoneWarnings = null;
+
+        for (TableId tableId : mutation.getTableIds())
+        {
+            ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(tableId);
+            if (cfs == null || cfs.topPartitions == null)
+                continue;
+
+            TableMetadata metadata = cfs.metadata();
+            if (sizeWarnBytes != -1)
+            {
+                long estimatedSize = 
cfs.topPartitions.topSizes().getEstimate(key);
+                if (estimatedSize > sizeWarnBytes)
+                {
+                    if (sizeWarnings == null)
+                        sizeWarnings = new HashMap<>();
+                    sizeWarnings.put(tableId, estimatedSize);
+                    noSpamLogger.warn("Write to {} partition {} triggered size 
warning; " +
+                                      "estimated size is {} bytes, threshold 
is {} bytes (see write_size_warn_threshold)",
+                                      metadata, 
metadata.partitionKeyType.toCQLString(key.getKey()), estimatedSize, 
sizeWarnBytes);
+                }
+            }
+
+            if (tombstoneWarnThreshold != -1)
+            {
+                long estimatedTombstones = 
cfs.topPartitions.topTombstones().getEstimate(key);
+                if (estimatedTombstones > tombstoneWarnThreshold)
+                {
+                    if (tombstoneWarnings == null)
+                        tombstoneWarnings = new HashMap<>();
+                    tombstoneWarnings.put(tableId, estimatedTombstones);
+                    noSpamLogger.warn("Write to {} partition {} triggered 
tombstone warning; " +
+                                      "estimated tombstone count is {}, 
threshold is {} (see write_tombstone_warn_threshold)",
+                                      metadata, 
metadata.partitionKeyType.toCQLString(key.getKey()), estimatedTombstones, 
tombstoneWarnThreshold);
+                }
+            }
+        }
+
+        if (sizeWarnings != null)
+            MessageParams.add(ParamType.WRITE_SIZE_WARN, sizeWarnings);
+        if (tombstoneWarnings != null)
+            MessageParams.add(ParamType.WRITE_TOMBSTONE_WARN, 
tombstoneWarnings);
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java 
b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 6f533defff..4491d64e8b 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -191,6 +191,10 @@ public class KeyspaceMetrics
 
     public final Meter tooManySSTableIndexesReadWarnings;
     public final Meter tooManySSTableIndexesReadAborts;
+
+    public final Meter writeSizeWarnings;
+    public final Meter writeTombstoneWarnings;
+
     public final Meter bytesAnticompacted;
     public final Meter bytesMutatedAnticompaction;
     public final Meter bytesPreviewed;
@@ -309,6 +313,9 @@ public class KeyspaceMetrics
         tooManySSTableIndexesReadWarnings = 
createKeyspaceMeter("TooManySSTableIndexesReadWarnings");
         tooManySSTableIndexesReadAborts = 
createKeyspaceMeter("TooManySSTableIndexesReadAborts");
 
+        writeSizeWarnings = createKeyspaceMeter("WriteSizeWarnings");
+        writeTombstoneWarnings = createKeyspaceMeter("WriteTombstoneWarnings");
+
         formatSpecificGauges = createFormatSpecificGauges(keyspace);
 
         outOfRangeTokenReads = createKeyspaceCounter("ReadOutOfRangeToken");
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java 
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index a1ee119240..9e00988da7 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -307,6 +307,9 @@ public class TableMetrics
     public final TableMeter tooManySSTableIndexesReadWarnings;
     public final TableMeter tooManySSTableIndexesReadAborts;
 
+    public final TableMeter writeSizeWarnings;
+    public final TableMeter writeTombstoneWarnings;
+
     public final ImmutableMap<SSTableFormat<?, ?>, ImmutableMap<String, 
Gauge<? extends Number>>> formatSpecificGauges;
 
     // Time spent building SSTableIntervalTree when constructing a new View under the Tracker lock
@@ -907,6 +910,9 @@ public class TableMetrics
         tooManySSTableIndexesReadWarnings = 
createTableMeter("TooManySSTableIndexesReadWarnings", 
cfs.keyspace.metric.tooManySSTableIndexesReadWarnings);
         tooManySSTableIndexesReadAborts = 
createTableMeter("TooManySSTableIndexesReadAborts", 
cfs.keyspace.metric.tooManySSTableIndexesReadAborts);
 
+        writeSizeWarnings = createTableMeter("WriteSizeWarnings", 
cfs.keyspace.metric.writeSizeWarnings);
+        writeTombstoneWarnings = createTableMeter("WriteTombstoneWarnings", 
cfs.keyspace.metric.writeTombstoneWarnings);
+
         viewSSTableIntervalTree = 
createLatencyMetrics("ViewSSTableIntervalTree", 
cfs.keyspace.metric.viewSSTableIntervalTree);
 
         formatSpecificGauges = createFormatSpecificGauges(cfs);
diff --git a/src/java/org/apache/cassandra/metrics/TopPartitionTracker.java 
b/src/java/org/apache/cassandra/metrics/TopPartitionTracker.java
index a5d5090394..eba0f14344 100644
--- a/src/java/org/apache/cassandra/metrics/TopPartitionTracker.java
+++ b/src/java/org/apache/cassandra/metrics/TopPartitionTracker.java
@@ -32,6 +32,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.carrotsearch.hppc.ObjectLongHashMap;
+import com.carrotsearch.hppc.ObjectLongMap;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
@@ -197,6 +199,7 @@ public class TopPartitionTracker implements Closeable
     public static class TopHolder
     {
         public final NavigableSet<TopPartition> top;
+        public final ObjectLongMap<DecoratedKey> lookup;
         private final int maxTopPartitionCount;
         private final long minTrackedValue;
         private final Collection<Range<Token>> ranges;
@@ -205,14 +208,15 @@ public class TopPartitionTracker implements Closeable
 
         private TopHolder(int maxTopPartitionCount, long minTrackedValue, 
Collection<Range<Token>> ranges)
         {
-            this(maxTopPartitionCount, minTrackedValue, new TreeSet<>(), 
ranges, 0);
+            this(maxTopPartitionCount, minTrackedValue, new TreeSet<>(), new 
ObjectLongHashMap<>(), ranges, 0);
         }
 
-        private TopHolder(int maxTopPartitionCount, long minTrackedValue, 
NavigableSet<TopPartition> top, Collection<Range<Token>> ranges, long 
lastUpdate)
+        private TopHolder(int maxTopPartitionCount, long minTrackedValue, 
NavigableSet<TopPartition> top, ObjectLongMap<DecoratedKey> lookup, 
Collection<Range<Token>> ranges, long lastUpdate)
         {
             this.maxTopPartitionCount = maxTopPartitionCount;
             this.minTrackedValue = minTrackedValue;
             this.top = top;
+            this.lookup = lookup;
             this.ranges = ranges;
             this.lastUpdate = lastUpdate;
         }
@@ -224,6 +228,7 @@ public class TopPartitionTracker implements Closeable
             this.maxTopPartitionCount = maxTopPartitionCount;
             this.minTrackedValue = minTrackedValue;
             top = new TreeSet<>();
+            this.lookup = new ObjectLongHashMap<>();
             this.ranges = null;
             this.lastUpdate = storedTopPartitions.lastUpdated;
 
@@ -243,9 +248,11 @@ public class TopPartitionTracker implements Closeable
         private void track(TopPartition tp)
         {
             top.add(tp);
+            lookup.put(tp.key, tp.value);
             while (top.size() > maxTopPartitionCount)
             {
-                top.pollLast();
+                TopPartition p = top.pollLast();
+                lookup.remove(p.key);
                 currentMinValue = top.last().value;
             }
             currentMinValue = Math.min(tp.value, currentMinValue);
@@ -274,7 +281,12 @@ public class TopPartitionTracker implements Closeable
 
         private TopHolder cloneForMerging(long lastUpdate)
         {
-            return new TopHolder(maxTopPartitionCount, minTrackedValue, new 
TreeSet<>(top), ranges, lastUpdate);
+            return new TopHolder(maxTopPartitionCount, minTrackedValue, new 
TreeSet<>(top), new ObjectLongHashMap<>(lookup), ranges, lastUpdate);
+        }
+
+        public long getEstimate(DecoratedKey dk)
+        {
+            return lookup.getOrDefault(dk, 0L);
         }
 
         public String toString()
diff --git a/src/java/org/apache/cassandra/net/ParamType.java 
b/src/java/org/apache/cassandra/net/ParamType.java
index 2367b1a390..5d3c92316f 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
 import javax.annotation.Nullable;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import 
org.apache.cassandra.service.writes.thresholds.WriteThresholdMapSerializer;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.Int32Serializer;
 import org.apache.cassandra.utils.Int64Serializer;
@@ -54,7 +55,9 @@ public enum ParamType
     ROW_INDEX_READ_SIZE_WARN         (13, Int64Serializer.serializer),
     CUSTOM_MAP                       (14, CustomParamsSerializer.serializer),
     TOO_MANY_REFERENCED_INDEXES_WARN (16, Int32Serializer.serializer),
-    TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer);
+    TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer),
+    WRITE_SIZE_WARN                  (18, 
WriteThresholdMapSerializer.serializer),
+    WRITE_TOMBSTONE_WARN             (19, 
WriteThresholdMapSerializer.serializer);
 
     final int id;
     final IVersionedSerializer serializer;
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 5a0d16cf59..434f25055b 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -48,6 +49,9 @@ import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaPlan.ForWrite;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.service.writes.thresholds.CoordinatorWriteWarnings;
+import org.apache.cassandra.service.writes.thresholds.WriteWarningContext;
+import org.apache.cassandra.service.writes.thresholds.WriteWarningsSnapshot;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.concurrent.Condition;
@@ -91,6 +95,9 @@ public abstract class AbstractWriteResponseHandler<T> 
implements RequestCallback
     private volatile Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint;
     private final Dispatcher.RequestTime requestTime;
     private @Nullable final Supplier<Mutation> hintOnFailure;
+    private volatile WriteWarningContext warningContext;
+    private static final 
AtomicReferenceFieldUpdater<AbstractWriteResponseHandler, WriteWarningContext> 
warningsUpdater =
+        
AtomicReferenceFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, 
WriteWarningContext.class, "warningContext");
 
     /**
      * Delegate to another WriteResponseHandler or possibly this one to track if the ideal consistency level was reached.
@@ -176,7 +183,29 @@ public abstract class AbstractWriteResponseHandler<T> 
implements RequestCallback
         }
 
         if (replicaPlan.stillAppliesTo(ClusterMetadata.current()))
-            return;
+        {
+            if (warningContext != null)
+            {
+                WriteWarningsSnapshot snapshot = warningContext.snapshot();
+                if (!snapshot.isEmpty() && hintOnFailure != null)
+                    CoordinatorWriteWarnings.update(hintOnFailure.get(), 
snapshot);
+            }
+        }
+    }
+
+    protected WriteWarningContext getWarningContext()
+    {
+        WriteWarningContext current;
+        while (true)
+        {
+            current = warningContext;
+            if (current != null)
+                return current;
+
+            current = new WriteWarningContext();
+            if (warningsUpdater.compareAndSet(this, null, current))
+                return current;
+        }
     }
 
     private void throwTimeout()
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 689b5f100d..b897298dbb 100644
--- 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -24,6 +24,7 @@ import java.util.function.Supplier;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.MessageParams;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.locator.Locator;
@@ -31,6 +32,8 @@ import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.service.writes.thresholds.WriteWarningContext;
 import org.apache.cassandra.transport.Dispatcher;
 
 /**
@@ -79,9 +82,22 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
     {
         try
         {
-            String dataCenter = message == null
-                                ? locator.local().datacenter
-                                : locator.location(message.from()).datacenter;
+            Map<ParamType, Object> params;
+            String dataCenter;
+
+            if (message != null)
+            {
+                params = message.header.params();
+                dataCenter = locator.location(message.from()).datacenter;
+            }
+            else
+            {
+                params = MessageParams.capture();
+                dataCenter = locator.local().datacenter;
+            }
+
+            if (WriteWarningContext.isSupported(params.keySet()))
+                getWarningContext().updateCounters(params);
 
             responses.get(dataCenter).getAndDecrement();
             acks.incrementAndGet();
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index ba660858a4..cc97da02aa 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -82,6 +82,7 @@ import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.RejectException;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.TruncateRequest;
+import org.apache.cassandra.db.WriteThresholds;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.db.partitions.FilteredPartition;
@@ -2023,7 +2024,13 @@ public class StorageProxy implements StorageProxyMBean
             {
                 try
                 {
+                    MessageParams.reset();
+
+                    boolean trackWriteWarnings = description instanceof 
Mutation && handler instanceof AbstractWriteResponseHandler && 
DatabaseDescriptor.getWriteThresholdsEnabled();
+                    if (trackWriteWarnings)
+                        WriteThresholds.checkWriteThresholds((Mutation) 
description);
                     runnable.run();
+
                     handler.onResponse(null);
                 }
                 catch (Exception ex)
@@ -2032,6 +2039,10 @@ public class StorageProxy implements StorageProxyMBean
                         logger.error("Failed to apply mutation locally : ", 
ex);
                     
handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailure.forException(ex));
                 }
+                finally
+                {
+                    MessageParams.reset();
+                }
             }
 
             @Override
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index dd0fe6e569..bac2342c12 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4758,6 +4758,17 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         logger.info("updated tombstone_warn_threshold to {}", threshold);
     }
 
+    /** Returns the write tombstone warn threshold currently held by {@link DatabaseDescriptor}. */
+    @Override
+    public int getWriteTombstoneWarnThreshold()
+    {
+        return DatabaseDescriptor.getWriteTombstoneWarnThreshold();
+    }
+
+    /**
+     * Stores a new write tombstone warn threshold in {@link DatabaseDescriptor} and logs the change at INFO.
+     *
+     * @param threshold the new threshold value
+     */
+    @Override
+    public void setWriteTombstoneWarnThreshold(int threshold)
+    {
+        DatabaseDescriptor.setWriteTombstoneWarnThreshold(threshold);
+        logger.info("updated write_tombstone_warn_threshold to {}", threshold);
+    }
+
     public int getTombstoneFailureThreshold()
     {
         return DatabaseDescriptor.getTombstoneFailureThreshold();
@@ -5328,6 +5339,30 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         
DatabaseDescriptor.setLocalReadSizeWarnThreshold(parseDataStorageSpec(threshold));
     }
 
+    /** Returns whether write thresholds are enabled, as reported by {@link DatabaseDescriptor}. */
+    @Override
+    public boolean getWriteThresholdsEnabled()
+    {
+        return DatabaseDescriptor.getWriteThresholdsEnabled();
+    }
+
+    /**
+     * Forwards the write-thresholds enabled flag to {@link DatabaseDescriptor}.
+     *
+     * @param value the new flag value
+     */
+    @Override
+    public void setWriteThresholdsEnabled(boolean value)
+    {
+        DatabaseDescriptor.setWriteThresholdsEnabled(value);
+    }
+
+    /** Returns the write size warn threshold from {@link DatabaseDescriptor}, rendered through {@code toString}. */
+    @Override
+    public String getWriteTooLargeWarnThreshold()
+    {
+        return toString(DatabaseDescriptor.getWriteSizeWarnThreshold());
+    }
+
+    /**
+     * Parses {@code threshold} with {@code parseDataStorageSpec} and stores the result as the
+     * write size warn threshold in {@link DatabaseDescriptor}.
+     *
+     * @param threshold textual form of the new threshold
+     */
+    @Override
+    public void setWriteTooLargeWarnThreshold(String threshold)
+    {
+        
DatabaseDescriptor.setWriteSizeWarnThreshold(parseDataStorageSpec(threshold));
+    }
+
     @Override
     public String getLocalReadTooLargeAbortThreshold()
     {
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index d0929df8da..058bc8955f 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1046,7 +1046,10 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     public int getTombstoneWarnThreshold();
     /** Sets the threshold for warning queries with many tombstones */
     public void setTombstoneWarnThreshold(int tombstoneDebugThreshold);
-
+    /** Returns the threshold for warning on writes with many tombstones */
+    public int getWriteTombstoneWarnThreshold();
+    /** Sets the threshold for warning on writes with many tombstones */
+    public void setWriteTombstoneWarnThreshold(int 
writeTombstoneDebugThreshold);
     /** Returns the threshold for abandoning queries with many tombstones */
     public int getTombstoneFailureThreshold();
     /** Sets the threshold for abandoning queries with many tombstones */
@@ -1312,6 +1315,12 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     public String getRowIndexReadSizeAbortThreshold();
     public void setRowIndexReadSizeAbortThreshold(String value);
 
+    /** Returns whether write thresholds are enabled */
+    public boolean getWriteThresholdsEnabled();
+    /** Enables or disables write thresholds */
+    public void setWriteThresholdsEnabled(boolean value);
+
+    /** Returns the write size warn threshold in its string form */
+    public String getWriteTooLargeWarnThreshold();
+    /** Sets the write size warn threshold from its string form */
+    public void setWriteTooLargeWarnThreshold(String value);
+
     public void setDefaultKeyspaceReplicationFactor(int value);
     public int getDefaultKeyspaceReplicationFactor();
 
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 2a3665712b..d99aa3989f 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -17,16 +17,21 @@
  */
 package org.apache.cassandra.service;
 
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Supplier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.MessageParams;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.service.writes.thresholds.WriteWarningContext;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -58,7 +63,13 @@ public class WriteResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
 
     public void onResponse(Message<T> m)
     {
-        replicaPlan.collectSuccess(m == null ? 
FBUtilities.getBroadcastAddressAndPort() : m.from());
+        InetAddressAndPort from = m == null ? 
FBUtilities.getBroadcastAddressAndPort() : m.from();
+        Map<ParamType, Object> params = m != null ? m.header.params() : 
MessageParams.capture();
+
+        if (WriteWarningContext.isSupported(params.keySet()))
+            getWarningContext().updateCounters(params);
+
+        replicaPlan.collectSuccess(from);
         if (responsesUpdater.decrementAndGet(this) == 0)
             signal();
         //Must be last after all subclass processing
diff --git 
a/src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java
 
b/src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java
index fdb68a56a8..7ea6b3c07a 100644
--- 
a/src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java
+++ 
b/src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java
@@ -31,8 +31,7 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.ClientWarn;
-
-import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.service.thresholds.CoordinatorWarningsState;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.READS_THRESHOLDS_COORDINATOR_DEFENSIVE_CHECKS_ENABLED;
 
@@ -41,34 +40,38 @@ public class CoordinatorWarnings
     private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorWarnings.class);
     private static final boolean ENABLE_DEFENSIVE_CHECKS = 
READS_THRESHOLDS_COORDINATOR_DEFENSIVE_CHECKS_ENABLED.getBoolean();
 
-    // when .init() is called set the STATE to be INIT; this is to lazy 
allocate the map only when warnings are generated
     private static final Map<ReadCommand, WarningsSnapshot> INIT = 
Collections.emptyMap();
-    private static final FastThreadLocal<Map<ReadCommand, WarningsSnapshot>> 
STATE = new FastThreadLocal<>();
 
-    private CoordinatorWarnings() {}
+    private static final CoordinatorWarningsState<Map<ReadCommand, 
WarningsSnapshot>> STATE =
+    new CoordinatorWarningsState<>("CoordinatorWarnings",
+                                   INIT,
+                                   null, // reset() calls threadLocal.remove() 
when emptySentinel is null
+                                   HashMap::new,
+                                   IgnoreMap::get,
+                                   logger,
+                                   ENABLE_DEFENSIVE_CHECKS);
+
+    private CoordinatorWarnings()
+    {
+    }
 
     public static void init()
     {
-        logger.trace("CoordinatorTrackWarnings.init()");
-        if (STATE.get() != null)
-        {
-            if (ENABLE_DEFENSIVE_CHECKS)
-                throw new AssertionError("CoordinatorTrackWarnings.init called 
while state is not null: " + STATE.get());
-            return;
-        }
-        STATE.set(INIT);
+        STATE.init();
     }
 
     public static void reset()
     {
-        logger.trace("CoordinatorTrackWarnings.reset()");
-        STATE.remove();
+        STATE.reset();
     }
 
     public static void update(ReadCommand cmd, WarningsSnapshot snapshot)
     {
-        logger.trace("CoordinatorTrackWarnings.update({}, {})", 
cmd.metadata(), snapshot);
-        Map<ReadCommand, WarningsSnapshot> map = mutable();
+        if (logger.isTraceEnabled())
+            logger.trace("CoordinatorTrackWarnings.update({}, {})", 
cmd.metadata(), snapshot);
+
+        Map<ReadCommand, WarningsSnapshot> map = STATE.mutable();
+
         WarningsSnapshot previous = map.get(cmd);
         WarningsSnapshot update = WarningsSnapshot.merge(previous, snapshot);
         if (update == null) // null happens when the merge had null input or 
EMPTY input... remove the command from the map
@@ -79,8 +82,17 @@ public class CoordinatorWarnings
 
     public static void done()
     {
-        Map<ReadCommand, WarningsSnapshot> map = readonly();
-        logger.trace("CoordinatorTrackWarnings.done() with state {}", map);
+        STATE.processAndReset(CoordinatorWarnings::processWarnings);
+    }
+
+    private static void processWarnings(Map<ReadCommand, WarningsSnapshot> map)
+    {
+        if (map == INIT || map.isEmpty())
+            return;
+
+        if (logger.isTraceEnabled())
+            logger.trace("CoordinatorTrackWarnings.done() with state {}", map);
+
         map.forEach((command, merged) -> {
             ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
             // race condition when dropping tables, also happens in unit tests 
as Schema may be bypassed
@@ -101,50 +113,6 @@ public class CoordinatorWarnings
             recordAborts(merged.indexReadSSTablesCount, cql, loggableTokens, 
cfs.metric.tooManySSTableIndexesReadAborts, 
WarningsSnapshot::tooManyIndexesReadAbortMessage);
             recordWarnings(merged.indexReadSSTablesCount, cql, loggableTokens, 
cfs.metric.tooManySSTableIndexesReadWarnings, 
WarningsSnapshot::tooManyIndexesReadWarnMessage);
         });
-
-        // reset the state to block from double publishing
-        clearState();
-    }
-
-    private static Map<ReadCommand, WarningsSnapshot> mutable()
-    {
-        Map<ReadCommand, WarningsSnapshot> map = STATE.get();
-        if (map == null)
-        {
-            if (ENABLE_DEFENSIVE_CHECKS)
-                throw new AssertionError("CoordinatorTrackWarnings.mutable 
calling without calling .init() first");
-            // set map to an "ignore" map; dropping all mutations
-            // since init was not called, it isn't clear that the state will 
be cleaned up, so avoid populating
-            map = IgnoreMap.get();
-        }
-        else if (map == INIT)
-        {
-            map = new HashMap<>();
-            STATE.set(map);
-        }
-        return map;
-    }
-
-    private static Map<ReadCommand, WarningsSnapshot> readonly()
-    {
-        Map<ReadCommand, WarningsSnapshot> map = STATE.get();
-        if (map == null)
-        {
-            if (ENABLE_DEFENSIVE_CHECKS)
-                throw new AssertionError("CoordinatorTrackWarnings.readonly 
calling without calling .init() first");
-            // since init was not called, it isn't clear that the state will 
be cleaned up, so avoid populating
-            map = Collections.emptyMap();
-        }
-        return map;
-    }
-
-    private static void clearState()
-    {
-        Map<ReadCommand, WarningsSnapshot> map = STATE.get();
-        if (map == null || map == INIT)
-            return;
-        // map is mutable, so set to INIT
-        STATE.set(INIT);
     }
 
     // utility interface to let callers use static functions
diff --git 
a/src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java
 
b/src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java
new file mode 100644
index 0000000000..1ace6701eb
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.thresholds;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+
+import io.netty.util.concurrent.FastThreadLocal;
+
+/**
+ * Generic ThreadLocal state manager for coordinator warnings.
+ * Provides lifecycle management ({@link #init()}, {@link #mutable()},
+ * {@link #processAndReset}, {@link #reset()}) for accumulating warnings across a client request.
+ *
+ * @param <S> the type of state to manage (e.g., Map, custom holder class)
+ */
+public class CoordinatorWarningsState<S>
+{
+    private final FastThreadLocal<S> threadLocal;
+    private final S initSentinel;
+    private final S emptySentinel;
+    private final Supplier<S> stateFactory;
+    private final Supplier<S> fallbackSupplier;
+    private final Logger logger;
+    private final String name;
+    private final boolean enableDefensiveChecks;
+
+    /**
+     * Builds a per-thread warnings state holder. No thread-local value is stored here; that
+     * happens in {@link #init()}.
+     *
+     * @param name label used as the prefix of trace and assertion messages
+     * @param initSentinel value stored by {@link #init()}; {@link #mutable()} replaces it with an
+     *                     instance from {@code stateFactory} on first access
+     * @param emptySentinel value stored by {@link #reset()}; when {@code null}, reset removes the
+     *                      thread-local value instead
+     * @param stateFactory creates the mutable state that replaces {@code initSentinel}
+     * @param fallbackSupplier result of {@link #mutable()} when no state is set and defensive checks are off
+     * @param logger destination for trace messages
+     * @param enableDefensiveChecks when true, lifecycle misuse throws {@link AssertionError}
+     */
+    public CoordinatorWarningsState(String name,
+                                    S initSentinel,
+                                    @Nullable S emptySentinel,
+                                    Supplier<S> stateFactory,
+                                    Supplier<S> fallbackSupplier,
+                                    Logger logger,
+                                    boolean enableDefensiveChecks)
+    {
+        // storage first, then the sentinels and suppliers that drive it
+        this.threadLocal = new FastThreadLocal<>();
+        this.initSentinel = initSentinel;
+        this.emptySentinel = emptySentinel;
+        this.stateFactory = stateFactory;
+        this.fallbackSupplier = fallbackSupplier;
+
+        // diagnostics
+        this.name = name;
+        this.logger = logger;
+        this.enableDefensiveChecks = enableDefensiveChecks;
+    }
+
+    /**
+     * Stores the init sentinel as this thread's state. Must be called at the start of a client request.
+     * <p>
+     * The previous value is inspected only when defensive checks are enabled; otherwise whatever
+     * was stored before is overwritten.
+     *
+     * @throws AssertionError if defensive checks are enabled and a state is already present
+     */
+    public void init()
+    {
+        if (logger.isTraceEnabled())
+            logger.trace("{}.init()", name);
+
+        S existing = enableDefensiveChecks ? threadLocal.get() : null;
+        if (existing != null)
+            throw new AssertionError(name + ".init called while state is not null: " + existing);
+
+        threadLocal.set(initSentinel);
+    }
+
+    /**
+     * Clears this thread's state: removes the thread-local value when no empty sentinel was
+     * configured, otherwise stores the empty sentinel.
+     */
+    public void reset()
+    {
+        if (logger.isTraceEnabled())
+            logger.trace("{}.reset()", name);
+
+        if (emptySentinel == null)
+        {
+            threadLocal.remove();
+            return;
+        }
+
+        threadLocal.set(emptySentinel);
+    }
+
+    /**
+     * Returns the state to write into for this thread.
+     * <p>
+     * If the stored value is the init sentinel, a new instance from the state factory is stored
+     * and returned. Any other non-null value is returned as is. With no value stored, the
+     * fallback supplier's result is returned without being stored.
+     *
+     * @return the state for this thread, or the fallback when none is set
+     * @throws AssertionError if no state is set and defensive checks are enabled
+     */
+    public S mutable()
+    {
+        S current = threadLocal.get();
+
+        if (current != null)
+        {
+            if (current != initSentinel)
+                return current;
+
+            // first write after init(): swap the sentinel for a real instance
+            S fresh = stateFactory.get();
+            threadLocal.set(fresh);
+            return fresh;
+        }
+
+        if (enableDefensiveChecks)
+            throw new AssertionError(name + " accessing state without calling .init() first");
+
+        return fallbackSupplier.get();
+    }
+
+    /**
+     * Hands this thread's state to {@code processor}, then calls {@link #reset()}.
+     * Must be called at the end of a client request.
+     * <p>
+     * The processor is skipped when no state is stored or the state is still the init sentinel.
+     * The reset runs in a {@code finally} block, so it happens even if the processor throws.
+     *
+     * @param processor consumer of the accumulated state
+     */
+    public void processAndReset(Consumer<S> processor)
+    {
+        try
+        {
+            S accumulated = threadLocal.get();
+            boolean hasWork = accumulated != null && accumulated != initSentinel;
+            if (hasWork)
+                processor.accept(accumulated);
+        }
+        finally
+        {
+            reset();
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java
 
b/src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java
new file mode 100644
index 0000000000..7ae35be257
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.thresholds.CoordinatorWarningsState;
+import org.apache.cassandra.utils.Clock;
+
+public class CoordinatorWriteWarnings
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorWriteWarnings.class);
+
+    private static final Warnings INIT = new Warnings();
+    private static final Warnings EMPTY = new Warnings();
+    private static final AtomicLong nextWarn = new AtomicLong(0);
+
+    /**
+     * Rate limiter for coordinator write warnings.
+     * <p>
+     * Returns {@code true} unconditionally when the configured interval is null or zero.
+     * Otherwise returns {@code true} only for the caller that sees the deadline in
+     * {@code nextWarn} as passed and wins the compare-and-set moving it one interval ahead.
+     * <p>
+     * NOTE(review): {@code nextWarn} starts at 0, so the first warning is only allowed once the
+     * clock reading is ahead of 0 - confirm {@code Clock.Global.nanoTime()} cannot start negative.
+     *
+     * @return whether the caller may emit warnings now
+     */
+    private static boolean timeForWarn()
+    {
+        DurationSpec.LongMillisecondsBound interval = DatabaseDescriptor.getCoordinatorWriteWarnInterval();
+        if (interval == null || interval.toMilliseconds() == 0)
+            return true;
+
+        long now = Clock.Global.nanoTime();
+        long next = nextWarn.get();
+        // nanosecond clock readings (as with System.nanoTime) are compared by difference rather
+        // than magnitude, so the check stays correct if the value overflows
+        return now - next > 0 && nextWarn.compareAndSet(next, now + TimeUnit.MILLISECONDS.toNanos(interval.toMilliseconds()));
+    }
+
+    private static final CoordinatorWarningsState<Warnings> STATE =
+    new CoordinatorWarningsState<>("CoordinatorWriteWarnings",
+                                   INIT,
+                                   EMPTY,
+                                   Warnings::new,
+                                   () -> EMPTY,
+                                   logger,
+                                   false);
+
+    /**
+     * Initializes coordinator write warnings for this thread by delegating to {@code STATE.init()}.
+     * Must be called at the start of a client request.
+     */
+    public static void init()
+    {
+        STATE.init();
+    }
+
+    /**
+     * Folds a snapshot of replica warnings into this thread's accumulated state.
+     * <p>
+     * Empty snapshots are ignored, and nothing is recorded when the state resolves to
+     * {@code EMPTY} (the value used after a reset and as the fallback when init was not called).
+     *
+     * @param mutation the mutation that was written; its key identifies the partition
+     * @param snapshot the aggregated warnings from replicas
+     */
+    public static void update(IMutation mutation, WriteWarningsSnapshot snapshot)
+    {
+        if (!snapshot.isEmpty())
+        {
+            Warnings target = STATE.mutable();
+            if (target != EMPTY)
+                target.merge(mutation.key(), snapshot);
+        }
+    }
+
+    /**
+     * Passes the accumulated warnings to {@code processWarnings} (client warning, log line and
+     * table metric) through {@code STATE.processAndReset}, which also resets the state.
+     * Must be called at the end of a client request.
+     */
+    public static void done()
+    {
+        STATE.processAndReset(CoordinatorWriteWarnings::processWarnings);
+    }
+
+    /**
+     * Clears the write warnings state for this thread by delegating to {@code STATE.reset()},
+     * without processing anything that was accumulated.
+     */
+    public static void reset()
+    {
+        STATE.reset();
+    }
+
+    /**
+     * Publishes the warnings accumulated for one request.
+     * <p>
+     * Does nothing when there is no snapshot, the snapshot is empty, no partition key was
+     * recorded, or {@link #timeForWarn()} declines. Otherwise every table entry of the write size
+     * counter, then of the write tombstone counter, is sent to the client, logged at WARN, and
+     * counted on the matching table metric. Tables without a ColumnFamilyStore are skipped.
+     *
+     * @param warnings the state accumulated by {@code update}
+     */
+    private static void processWarnings(Warnings warnings)
+    {
+        if (warnings.snapshot == null || warnings.snapshot.isEmpty() || warnings.partitionKey == null)
+            return;
+
+        if (!timeForWarn())
+            return;
+
+        WriteWarningsSnapshot snapshot = warnings.snapshot;
+
+        for (Map.Entry<TableId, Long> entry : snapshot.writeSize.tableValues.entrySet())
+        {
+            ColumnFamilyStore cfs = storeFor(entry.getKey());
+            if (cfs == null)
+                continue;
+
+            emit(cfs, warnings.partitionKey, WriteWarningsSnapshot.writeSizeWarnMessage(entry.getValue()));
+            cfs.metric.writeSizeWarnings.mark();
+        }
+
+        for (Map.Entry<TableId, Long> entry : snapshot.writeTombstone.tableValues.entrySet())
+        {
+            ColumnFamilyStore cfs = storeFor(entry.getKey());
+            if (cfs == null)
+                continue;
+
+            emit(cfs, warnings.partitionKey, WriteWarningsSnapshot.writeTombstoneWarnMessage(entry.getValue()));
+            cfs.metric.writeTombstoneWarnings.mark();
+        }
+    }
+
+    /** Looks up the store for a table; logs at WARN and returns {@code null} when there is none. */
+    @Nullable
+    private static ColumnFamilyStore storeFor(TableId tableId)
+    {
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tableId);
+        if (cfs == null)
+            logger.warn("ColumnFamilyStore is null for table {}, skipping", tableId);
+        return cfs;
+    }
+
+    /** Formats one warning for the given table and partition, sends it to the client and logs it. */
+    private static void emit(ColumnFamilyStore cfs, DecoratedKey key, Object detail)
+    {
+        TableMetadata metadata = cfs.metadata();
+        String partitionKey = metadata.partitionKeyType.toCQLString(key.getKey());
+        String msg = String.format("Write to %s.%s partition %s: %s",
+                                   metadata.keyspace,
+                                   metadata.name,
+                                   partitionKey,
+                                   detail);
+        ClientWarn.instance.warn(msg);
+        logger.warn(msg);
+    }
+
+    /**
+     * Mutable holder for the warnings accumulated on one thread: a single partition key and
+     * one merged snapshot carrying the per-table values.
+     * <p>
+     * NOTE(review): only the first key passed to {@link #merge} is kept. If update() runs for
+     * more than one partition before done(), later keys are not recorded - confirm that callers
+     * only ever accumulate a single partition per request.
+     */
+    private static class Warnings
+    {
+        @Nullable
+        DecoratedKey partitionKey;
+
+        @Nullable
+        WriteWarningsSnapshot snapshot;
+
+        void merge(DecoratedKey partitionKey, WriteWarningsSnapshot snapshot)
+        {
+            // first key wins; later calls never overwrite it
+            if (this.partitionKey == null)
+                this.partitionKey = partitionKey;
+
+            if (this.snapshot == null)
+                this.snapshot = snapshot;
+            else
+                this.snapshot = this.snapshot.merge(snapshot);
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/writes/thresholds/WarnCounter.java 
b/src/java/org/apache/cassandra/service/writes/thresholds/WarnCounter.java
new file mode 100644
index 0000000000..78518fe8df
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/writes/thresholds/WarnCounter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * Tracks, per table, the largest value reported through {@link #addWarning(Map)}.
+ * Values live in a {@link ConcurrentHashMap} of {@link AtomicLong}s that start at zero.
+ */
+public class WarnCounter
+{
+    private final ConcurrentHashMap<TableId, AtomicLong> tableValues = new ConcurrentHashMap<>();
+
+    /**
+     * Raises each table's stored value to the incoming one when the incoming one is larger.
+     *
+     * @param incoming reported values keyed by table
+     */
+    void addWarning(Map<TableId, Long> incoming)
+    {
+        incoming.forEach((table, value) -> {
+            AtomicLong max = tableValues.computeIfAbsent(table, ignored -> new AtomicLong());
+            max.accumulateAndGet(value, Math::max);
+        });
+    }
+
+    /**
+     * Copies the current per-table values into a {@link WriteThresholdCounter}.
+     *
+     * @return a counter built from the values read at call time
+     */
+    public WriteThresholdCounter snapshot()
+    {
+        ImmutableMap.Builder<TableId, Long> copy = ImmutableMap.builder();
+        tableValues.forEach((table, max) -> copy.put(table, max.get()));
+        return WriteThresholdCounter.create(copy.build());
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdCounter.java
 
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdCounter.java
new file mode 100644
index 0000000000..a57baa01e5
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdCounter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * Per-table values held in an {@link ImmutableMap}. Instances come from {@link #create(Map)},
+ * {@link #empty()} or {@link #merge(WriteThresholdCounter)}; an empty input to {@code create}
+ * yields the shared empty instance.
+ */
+public class WriteThresholdCounter
+{
+    private static final WriteThresholdCounter EMPTY = new WriteThresholdCounter(ImmutableMap.of());
+    public final ImmutableMap<TableId, Long> tableValues;
+
+    private WriteThresholdCounter(ImmutableMap<TableId, Long> tableValues)
+    {
+        this.tableValues = tableValues;
+    }
+
+    /**
+     * Wraps a copy of {@code snapshot}.
+     *
+     * @param snapshot values keyed by table
+     * @return the shared empty instance when {@code snapshot} is empty, otherwise a new counter
+     */
+    public static WriteThresholdCounter create(Map<TableId, Long> snapshot)
+    {
+        return snapshot.isEmpty()
+               ? EMPTY
+               : new WriteThresholdCounter(ImmutableMap.copyOf(snapshot));
+    }
+
+    /** Returns the shared empty instance. */
+    public static WriteThresholdCounter empty()
+    {
+        return EMPTY;
+    }
+
+    /** Returns whether this counter holds no table values. */
+    public boolean isEmpty()
+    {
+        return tableValues.isEmpty();
+    }
+
+    /**
+     * Combines two counters, keeping the larger value for tables present in both.
+     *
+     * @param other the counter to combine with
+     * @return this counter when {@code other} is the shared empty instance, {@code other} when
+     *         this is, otherwise a new counter
+     */
+    public WriteThresholdCounter merge(WriteThresholdCounter other)
+    {
+        if (other == EMPTY)
+            return this;
+        if (this == EMPTY)
+            return other;
+
+        Map<TableId, Long> combined = new HashMap<>(tableValues);
+        other.tableValues.forEach((table, value) -> combined.merge(table, value, Math::max));
+        return new WriteThresholdCounter(ImmutableMap.copyOf(combined));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+        if (o == null || o.getClass() != getClass())
+            return false;
+        return Objects.equals(tableValues, ((WriteThresholdCounter) o).tableValues);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(tableValues);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "WriteThresholdCounter{tableValues=" + tableValues + '}';
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdMapSerializer.java
 
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdMapSerializer.java
new file mode 100644
index 0000000000..bc2cc863ad
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteThresholdMapSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * Serializer for a map of {@link TableId} to a long value.
+ * <p>
+ * Layout: the entry count as an unsigned vint, followed by each entry as the serialized table id and
+ * the value as an unsigned vint. The {@code version} argument is accepted but not consulted.
+ */
+public class WriteThresholdMapSerializer implements IVersionedSerializer<Map<TableId, Long>>
+{
+    public static final WriteThresholdMapSerializer serializer = new WriteThresholdMapSerializer();
+
+    /**
+     * Writes the entry count and then every entry in the map's iteration order.
+     */
+    @Override
+    public void serialize(Map<TableId, Long> t, DataOutputPlus out, int version) throws IOException
+    {
+        out.writeUnsignedVInt32(t.size());
+        for (Map.Entry<TableId, Long> tableValue : t.entrySet())
+        {
+            TableId tableId = tableValue.getKey();
+            tableId.serialize(out);
+            out.writeUnsignedVInt(tableValue.getValue());
+        }
+    }
+
+    /**
+     * Reads back a map written by {@link #serialize}; the result is a mutable map.
+     */
+    @Override
+    public Map<TableId, Long> deserialize(DataInputPlus in, int version) throws IOException
+    {
+        int count = in.readUnsignedVInt32();
+        Map<TableId, Long> values = Maps.newHashMapWithExpectedSize(count);
+        for (int remaining = count; remaining > 0; remaining--)
+        {
+            // key first, then value: the same order serialize() writes them
+            TableId tableId = TableId.deserialize(in);
+            values.put(tableId, in.readUnsignedVInt());
+        }
+        return values;
+    }
+
+    /**
+     * Sums the size of the entry count and of every key/value pair, mirroring {@link #serialize}.
+     */
+    @Override
+    public long serializedSize(Map<TableId, Long> t, int version)
+    {
+        long total = TypeSizes.sizeofUnsignedVInt(t.size());
+        for (Map.Entry<TableId, Long> tableValue : t.entrySet())
+        {
+            total += tableValue.getKey().serializedSize();
+            total += TypeSizes.sizeofUnsignedVInt(tableValue.getValue());
+        }
+        return total;
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningContext.java
 
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningContext.java
new file mode 100644
index 0000000000..1d46b15e3b
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * Accumulates write warning information from replica responses.
+ * Similar to WarningContext but for write operations (warnings only, no aborts).
+ */
+public class WriteWarningContext
+{
+    private static final EnumSet<ParamType> SUPPORTED = EnumSet.of(
+        ParamType.WRITE_SIZE_WARN,
+        ParamType.WRITE_TOMBSTONE_WARN
+    );
+
+    final WarnCounter writeSize = new WarnCounter();
+    final WarnCounter writeTombstone = new WarnCounter();
+
+    /**
+     * @return {@code true} when {@code keys} contains at least one of the write warning param types
+     */
+    public static boolean isSupported(Set<ParamType> keys)
+    {
+        boolean noneSupported = Collections.disjoint(keys, SUPPORTED);
+        return !noneSupported;
+    }
+
+    /**
+     * Update counters from replica response parameters. Only warning values are accumulated here;
+     * no failure threshold is evaluated.
+     */
+    public void updateCounters(Map<ParamType, Object> params)
+    {
+        addIfPresent(params, ParamType.WRITE_SIZE_WARN, writeSize);
+        addIfPresent(params, ParamType.WRITE_TOMBSTONE_WARN, writeTombstone);
+    }
+
+    /**
+     * Feeds the value stored under {@code type}, if any, into {@code counter}.
+     */
+    private static void addIfPresent(Map<ParamType, Object> params, ParamType type, WarnCounter counter)
+    {
+        Object tableValues = params.get(type);
+        if (tableValues != null)
+            counter.addWarning((Map<TableId, Long>) tableValues);
+    }
+
+    /**
+     * @return a snapshot built from the current state of both counters
+     */
+    public WriteWarningsSnapshot snapshot()
+    {
+        WriteThresholdCounter size = writeSize.snapshot();
+        WriteThresholdCounter tombstone = writeTombstone.snapshot();
+        return WriteWarningsSnapshot.create(size, tombstone);
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshot.java
 
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshot.java
new file mode 100644
index 0000000000..a8195ea5b8
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshot.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.Objects;
+
+/**
+ * Immutable snapshot of write warnings. Simpler than WarningsSnapshot since writes never abort (warnings only).
+ */
+public class WriteWarningsSnapshot
+{
+    private static final WriteWarningsSnapshot EMPTY = new WriteWarningsSnapshot(WriteThresholdCounter.empty(),
+                                                                                 WriteThresholdCounter.empty());
+
+    public final WriteThresholdCounter writeSize;
+    public final WriteThresholdCounter writeTombstone;
+
+    private WriteWarningsSnapshot(WriteThresholdCounter writeSize, WriteThresholdCounter writeTombstone)
+    {
+        this.writeSize = writeSize;
+        this.writeTombstone = writeTombstone;
+    }
+
+    /**
+     * @return the shared empty snapshot when both counters are empty, otherwise a new snapshot wrapping them
+     */
+    public static WriteWarningsSnapshot create(WriteThresholdCounter writeSize, WriteThresholdCounter writeTombstone)
+    {
+        boolean nothingRecorded = writeSize.isEmpty() && writeTombstone.isEmpty();
+        return nothingRecorded ? EMPTY : new WriteWarningsSnapshot(writeSize, writeTombstone);
+    }
+
+    /**
+     * Identity check against the shared empty snapshot, which is what {@link #create} returns for empty input.
+     */
+    public boolean isEmpty()
+    {
+        return this == EMPTY;
+    }
+
+    /**
+     * Merges both counters with those of {@code other}.
+     *
+     * @param other snapshot to fold in; {@code null} or the empty snapshot leaves this one unchanged
+     * @return this, {@code other}, or a snapshot of the merged counters
+     */
+    public WriteWarningsSnapshot merge(WriteWarningsSnapshot other)
+    {
+        if (other == null || other == EMPTY)
+            return this;
+        if (this == EMPTY)
+            return other;
+
+        WriteThresholdCounter mergedSize = writeSize.merge(other.writeSize);
+        WriteThresholdCounter mergedTombstone = writeTombstone.merge(other.writeTombstone);
+        return WriteWarningsSnapshot.create(mergedSize, mergedTombstone);
+    }
+
+    /**
+     * @return the client-facing warning text for a write-size breach
+     */
+    public static String writeSizeWarnMessage(long bytes)
+    {
+        return String.format("Write to large partition; estimated size is %d bytes (see write_size_warn_threshold)", bytes);
+    }
+
+    /**
+     * @return the client-facing warning text for a tombstone-count breach
+     */
+    public static String writeTombstoneWarnMessage(long tombstones)
+    {
+        return String.format("Write to partition with many tombstones; estimated count is %d (see write_tombstone_warn_threshold)", tombstones);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+        if (o == null || o.getClass() != getClass())
+            return false;
+        WriteWarningsSnapshot rhs = (WriteWarningsSnapshot) o;
+        if (!Objects.equals(writeSize, rhs.writeSize))
+            return false;
+        return Objects.equals(writeTombstone, rhs.writeTombstone);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(writeSize, writeTombstone);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "WriteWarningsSnapshot{writeSize=" + writeSize + ", writeTombstone=" + writeTombstone + '}';
+    }
+}
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java 
b/src/java/org/apache/cassandra/transport/Dispatcher.java
index ccec6817c3..3f6468c218 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.net.FrameEncoder;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
+import org.apache.cassandra.service.writes.thresholds.CoordinatorWriteWarnings;
 import org.apache.cassandra.transport.ClientResourceLimits.Overload;
 import org.apache.cassandra.transport.Flusher.FlushItem;
 import org.apache.cassandra.transport.messages.ErrorMessage;
@@ -382,7 +383,10 @@ public class Dispatcher implements 
CQLMessageHandler.MessageConsumer<Message.Req
         // even if ClientWarn is disabled, still setup 
CoordinatorTrackWarnings, as this will populate metrics and
         // emit logs on the server; the warnings will just be ignored and not 
sent to the client
         if (request.isTrackable())
+        {
             CoordinatorWarnings.init();
+            CoordinatorWriteWarnings.init();
+        }
 
         switch (backpressure)
         {
@@ -425,7 +429,10 @@ public class Dispatcher implements 
CQLMessageHandler.MessageConsumer<Message.Req
         Message.Response response = request.execute(qstate, requestTime);
 
         if (request.isTrackable())
+        {
             CoordinatorWarnings.done();
+            CoordinatorWriteWarnings.done();
+        }
 
         response.setStreamId(request.getStreamId());
         response.setWarnings(ClientWarn.instance.getWarnings());
@@ -448,7 +455,10 @@ public class Dispatcher implements 
CQLMessageHandler.MessageConsumer<Message.Req
             JVMStabilityInspector.inspectThrowable(t);
 
             if (request.isTrackable())
+            {
                 CoordinatorWarnings.done();
+                CoordinatorWriteWarnings.done();
+            }
 
             Predicate<Throwable> handler = 
ExceptionHandlers.getUnexpectedExceptionHandler(channel, true);
             ErrorMessage error = ErrorMessage.fromException(t, handler);
@@ -459,6 +469,7 @@ public class Dispatcher implements 
CQLMessageHandler.MessageConsumer<Message.Req
         finally
         {
             CoordinatorWarnings.reset();
+            CoordinatorWriteWarnings.reset();
             ClientWarn.instance.resetWarnings();
         }
     }
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 20c1d2fd70..07200f2d16 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -67,6 +67,11 @@ local_read_size_warn_threshold: 4096KiB
 local_read_size_fail_threshold: 8192KiB
 row_index_read_size_warn_threshold: 4096KiB
 row_index_read_size_fail_threshold: 8192KiB
+write_thresholds_enabled: true
+write_size_warn_threshold: 4096KiB
+write_tombstone_warn_threshold: 1000
+min_tracked_partition_size: 1MiB
+min_tracked_partition_tombstone_count: 500
 
 accord:
     enabled: true
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java 
b/test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java
index 414b30e05c..2100217d22 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
+import org.apache.cassandra.service.writes.thresholds.CoordinatorWriteWarnings;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -62,6 +63,7 @@ public class CoordinatorHelper
         // warnings as it sets a new State instance on the ThreadLocal.
         ClientWarn.instance.captureWarnings();
         CoordinatorWarnings.init();
+        CoordinatorWriteWarnings.init();
         try
         {
             ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
@@ -76,19 +78,27 @@ public class CoordinatorHelper
                                                  requestTime);
             // Collect warnings reported during the query.
             CoordinatorWarnings.done();
-            if (res != null)
-                res.setWarnings(ClientWarn.instance.getWarnings());
+            CoordinatorWriteWarnings.done();
+
+            // Convert null result to ResultMessage.Void, matching 
QueryProcessor.processStatement() behavior
+            // This is necessary to attach warnings to INSERT/UPDATE/DELETE 
statements which return null
+            if (res == null)
+                res = new ResultMessage.Void();
+
+            res.setWarnings(ClientWarn.instance.getWarnings());
 
             return RowUtil.toQueryResult(res);
         }
         catch (Exception | Error e)
         {
             CoordinatorWarnings.done();
+            CoordinatorWriteWarnings.done();
             throw e;
         }
         finally
         {
             CoordinatorWarnings.reset();
+            CoordinatorWriteWarnings.reset();
             ClientWarn.instance.resetWarnings();
         }
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index ac1da245d5..791465eb60 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -141,6 +141,7 @@ import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.uncommitted.UncommittedTableData;
 import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
 import org.apache.cassandra.service.snapshot.SnapshotManager;
+import org.apache.cassandra.service.writes.thresholds.CoordinatorWriteWarnings;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamReceiveTask;
 import org.apache.cassandra.streaming.StreamTransferTask;
@@ -303,12 +304,14 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
     {
         ClientWarn.instance.captureWarnings();
         CoordinatorWarnings.init();
+        CoordinatorWriteWarnings.init();
         try
         {
             QueryHandler.Prepared prepared = 
QueryProcessor.prepareInternal(query);
             ResultMessage result = 
prepared.statement.executeLocally(QueryProcessor.internalQueryState(),
                                                                      
QueryProcessor.makeInternalOptions(prepared.statement, args));
             CoordinatorWarnings.done();
+            CoordinatorWriteWarnings.done();
 
             if (result != null)
                 result.setWarnings(ClientWarn.instance.getWarnings());
@@ -317,11 +320,13 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         catch (Exception | Error e)
         {
             CoordinatorWarnings.done();
+            CoordinatorWriteWarnings.done();
             throw e;
         }
         finally
         {
             CoordinatorWarnings.reset();
+            CoordinatorWriteWarnings.reset();
             ClientWarn.instance.resetWarnings();
         }
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/thresholds/AbstractWriteThresholdWarning.java
 
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/AbstractWriteThresholdWarning.java
new file mode 100644
index 0000000000..36f4e2c4bf
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/AbstractWriteThresholdWarning.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.thresholds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.JavaDriverUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for write threshold warning distributed tests.
+ * Tests coordinator-side warning aggregation from replica responses.
+ * <p>
+ * Subclasses supply the threshold under test, how to seed the tracked top partitions, and how to
+ * verify the emitted warnings and the running warning count.
+ */
+public abstract class AbstractWriteThresholdWarning extends TestBaseImpl
+{
+    protected static ICluster<IInvokableInstance> CLUSTER;
+    protected static com.datastax.driver.core.Cluster JAVA_DRIVER;
+    protected static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        Cluster.Builder builder = Cluster.build(3);
+        builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
+        CLUSTER = builder.start();
+        JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+        JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+    }
+
+    protected abstract long totalWarnings();
+    protected abstract void assertWarnings(List<String> warnings);
+    protected abstract void populateTopPartitions(int pk, long value);
+    protected abstract long getWarnThreshold();
+
+    @Before
+    public void setup()
+    {
+        CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        CLUSTER.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+        CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v blob, PRIMARY KEY (pk, ck))");
+    }
+
+    @Test
+    public void noTopPartitionsFeatureEnabled()
+    {
+        noTopPartitionsTest(true);
+    }
+
+    @Test
+    public void noTopPartitionsFeatureDisabled()
+    {
+        noTopPartitionsTest(false);
+    }
+
+    private void noTopPartitionsTest(boolean featureEnabled)
+    {
+        enable(featureEnabled);
+
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)",
+                                      ConsistencyLevel.ALL, bytes(512));
+
+        // Nothing was seeded into the tracker, so no warning is expected whatever the feature state
+        SimpleQueryResult result = writeToPartition(1, 2);
+        assertThat(result.warnings()).isEmpty();
+
+        assertWarningsCount(0);
+    }
+
+    @Test
+    public void topPartitionsExistFeatureDisabled()
+    {
+        // Seed the tracker above the warn threshold, then switch the feature off
+        populateTopPartitions(1, getWarnThreshold() * 2);
+        enable(false);
+
+        SimpleQueryResult result = writeToPartition(1, 1);
+
+        // Disabled feature: no warnings
+        assertThat(result.warnings()).isEmpty();
+        assertWarningsCount(0);
+    }
+
+    @Test
+    public void topPartitionExistsFeatureEnabledPartitionIsTop()
+    {
+        // Seed the tracker above the warn threshold
+        populateTopPartitions(1, getWarnThreshold() * 2);
+        enable(true);
+
+        SimpleQueryResult result = writeToPartition(1, 1);
+
+        // Writing to the seeded partition warns
+        assertWarnings(result.warnings());
+        assertWarningsCount(1);
+    }
+
+    @Test
+    public void topPartitionExistsFeatureEnabledPartitionNotTop()
+    {
+        // Only pk=1 is seeded
+        populateTopPartitions(1, getWarnThreshold() * 2);
+        enable(true);
+
+        // pk=2 was never seeded into the tracker
+        SimpleQueryResult result = writeToPartition(2, 1);
+
+        assertThat(result.warnings()).isEmpty();
+        assertWarningsCount(0);
+    }
+
+    @Test
+    public void topPartitionBelowThreshold()
+    {
+        // Seed the tracker BELOW the warn threshold
+        populateTopPartitions(1, getWarnThreshold() / 2);
+        enable(true);
+
+        SimpleQueryResult result = writeToPartition(1, 1);
+
+        // Below threshold: no warnings
+        assertThat(result.warnings()).isEmpty();
+        assertWarningsCount(0);
+    }
+
+    @Test
+    public void multipleWritesToTopPartition()
+    {
+        populateTopPartitions(1, getWarnThreshold() * 2);
+        enable(true);
+
+        // First write warns
+        SimpleQueryResult first = writeToPartition(1, 1);
+        assertWarnings(first.warnings());
+        assertWarningsCount(1);
+
+        // Second write warns again
+        SimpleQueryResult second = writeToPartition(1, 2);
+        assertWarnings(second.warnings());
+        assertWarningsCount(2);
+    }
+
+    @Test
+    public void mixedTopAndNonTopPartitions()
+    {
+        populateTopPartitions(1, getWarnThreshold() * 2);
+        enable(true);
+
+        SimpleQueryResult first = writeToPartition(1, 1);
+        assertWarnings(first.warnings());
+        assertWarningsCount(1);
+
+        SimpleQueryResult second = writeToPartition(2, 1);
+        assertThat(second.warnings()).isEmpty();
+        assertWarningsCount(1);
+
+        SimpleQueryResult third = writeToPartition(1, 2);
+        assertWarnings(third.warnings());
+        assertWarningsCount(2);
+    }
+
+    @Test
+    public void javaDriverWarnings()
+    {
+        populateTopPartitions(1, getWarnThreshold() * 2);
+        enable(true);
+
+        // Same write, but through the Java driver session instead of the in-JVM coordinator
+        SimpleStatement statement = new SimpleStatement("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", bytes(512));
+        statement.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL);
+        ResultSet result = JAVA_DRIVER_SESSION.execute(statement);
+
+        assertWarnings(result.getExecutionInfo().getWarnings());
+    }
+
+    @Test
+    public void warningMessageContainsTableIdentifier()
+    {
+        populateTopPartitions(1, getWarnThreshold() * 2);
+        enable(true);
+
+        List<String> warnings = writeToPartition(1, 1).warnings();
+
+        assertThat(warnings).hasSize(1);
+        // Warning must identify the specific table that breached the threshold
+        assertThat(warnings.get(0)).contains(KEYSPACE + ".tbl");
+    }
+
+    @Test
+    public void rateLimitingThrottlesWarnings() throws InterruptedException
+    {
+        populateTopPartitions(1, getWarnThreshold() * 2);
+        enable(true);
+
+        // Enable rate limiting with 1 second interval
+        setCoordinatorWriteWarnInterval("1000ms");
+
+        try
+        {
+            // First write should produce a warning
+            SimpleQueryResult first = writeToPartition(1, 1);
+            assertWarnings(first.warnings());
+
+            // Second write immediately after should be rate-limited (no warning)
+            SimpleQueryResult second = writeToPartition(1, 2);
+            assertThat(second.warnings()).isEmpty();
+
+            Thread.sleep(1500);
+
+            // Third write should produce a warning again
+            SimpleQueryResult third = writeToPartition(1, 3);
+            assertWarnings(third.warnings());
+        }
+        finally
+        {
+            // Reset to no rate limiting for other tests
+            setCoordinatorWriteWarnInterval("0ms");
+        }
+    }
+
+    /**
+     * Inserts a 512-byte blob at (pk, ck) through node 1 at consistency level ALL and returns the result
+     * together with any client warnings.
+     */
+    private SimpleQueryResult writeToPartition(int pk, int ck)
+    {
+        String insert = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (" + pk + ", " + ck + ", ?)";
+        return CLUSTER.coordinator(1).executeWithResult(insert, ConsistencyLevel.ALL, bytes(512));
+    }
+
+    /**
+     * Applies the given coordinator write-warn interval (e.g. {@code "1000ms"}) on every instance.
+     */
+    private static void setCoordinatorWriteWarnInterval(String interval)
+    {
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() ->
+            DatabaseDescriptor.setCoordinatorWriteWarnInterval(new DurationSpec.LongMillisecondsBound(interval))));
+    }
+
+    /**
+     * Turns the write thresholds feature on or off on every instance.
+     */
+    protected static void enable(boolean value)
+    {
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setWriteThresholdsEnabled(value)));
+    }
+
+    /**
+     * @return a buffer wrapping a new zero-filled array of {@code size} bytes
+     */
+    protected static ByteBuffer bytes(int size)
+    {
+        return ByteBuffer.wrap(new byte[size]);
+    }
+
+    private void assertWarningsCount(int expected)
+    {
+        assertThat(totalWarnings()).as("warnings").isEqualTo(expected);
+    }
+}
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/thresholds/ReplicaWarningTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/ReplicaWarningTest.java
new file mode 100644
index 0000000000..c1dc4e4a7a
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/ReplicaWarningTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.thresholds;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.metrics.TopPartitionTracker;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class ReplicaWarningTest extends TestBaseImpl
+{
+    /**
+     * Starts a 3-node cluster with write thresholds enabled, seeds tracked partition sizes for
+     * {@code tbl1} on node 3 only, and expects exactly one warning for a batch written at {@code ALL}.
+     */
+    @Test
+    public void testMultiTableWrite() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withConfig(config -> config.set("min_tracked_partition_size", "50B")
+                                                                       .set("write_thresholds_enabled", "true")
+                                                                       .set("write_size_warn_threshold", "50B")
+                                                                       .with(Feature.NATIVE_PROTOCOL))
+                                           .start()))
+        {
+            createTables(cluster);
+            IInvokableInstance thirdNode = cluster.get(3);
+            populateTopPartitions(thirdNode);
+            assertOneWarning(cluster, ConsistencyLevel.ALL);
+        }
+    }
+
+    /**
+     * Starts a 6-node cluster, switches the test keyspace to NetworkTopologyStrategy with three replicas in each of
+     * {@code datacenter1} and {@code datacenter2}, seeds tracked partition sizes on nodes 4 and 5, and expects
+     * exactly one warning for a batch written at {@code EACH_QUORUM}.
+     */
+    @Test
+    public void testMultiDCWrite() throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(6)
+                                           .withRacks(2, 3)
+                                           .withConfig(config -> config.set("min_tracked_partition_size", "50B")
+                                                                       .set("write_thresholds_enabled", "true")
+                                                                       .set("write_size_warn_threshold", "50B")
+                                                                       .with(Feature.NATIVE_PROTOCOL))
+                                           .start()))
+        {
+            String alterReplication = withKeyspace("alter keyspace %s with replication = {'class':'NetworkTopologyStrategy', 'datacenter1' : 3, 'datacenter2': 3}");
+            cluster.schemaChange(alterReplication);
+            createTables(cluster);
+            populateTopPartitions(cluster.get(4));
+            populateTopPartitions(cluster.get(5));
+            assertOneWarning(cluster, ConsistencyLevel.EACH_QUORUM);
+        }
+    }
+
+    /**
+     * Creates the three single-column tables ({@code tbl1}, {@code tbl2}, {@code tbl3}) in the test keyspace.
+     *
+     * @param cluster the cluster on which the schema changes are executed
+     */
+    private static void createTables(Cluster cluster)
+    {
+        String[] statements = {
+            withKeyspace("create table %s.tbl1 (id int primary key)"),
+            withKeyspace("create table %s.tbl2 (id int primary key)"),
+            withKeyspace("create table %s.tbl3 (id int primary key)")
+        };
+        for (String ddl : statements)
+            cluster.schemaChange(ddl);
+    }
+
+    /**
+     * Records sizes {@code 100..109} for the keys {@code 0..9} in the top-sizes tracker of {@code tbl1}
+     * on the given instance.
+     *
+     * @param instance the node whose tracker is populated
+     */
+    private static void populateTopPartitions(IInvokableInstance instance)
+    {
+        instance.runOnInstance(() -> {
+            TopPartitionTracker tracker = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl1").topPartitions;
+            for (int id = 0; id < 10; id++)
+            {
+                String idText = String.valueOf(id);
+                DecoratedKey partitionKey = Murmur3Partitioner.instance.decorateKey(Int32Type.instance.fromString(idText));
+                tracker.topSizes().track(partitionKey, 100 + id);
+            }
+        });
+    }
+
+    /**
+     * Connects to node 1 with the Java driver, executes a batch that inserts into {@code tbl1}, {@code tbl2} and
+     * {@code tbl3}, and asserts that exactly one warning is returned and that it mentions neither {@code tbl2}
+     * nor {@code tbl3}.
+     *
+     * @param cluster the in-JVM cluster to connect to
+     * @param cl      the consistency level applied to the batch
+     */
+    private static void assertOneWarning(Cluster cluster, ConsistencyLevel cl)
+    {
+        String contactPoint = (String) cluster.get(1).config().get("rpc_address");
+        com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint(contactPoint);
+
+        try (com.datastax.driver.core.Cluster driver = builder.build(); Session session = driver.connect())
+        {
+            BatchStatement batch = new BatchStatement();
+            batch.add(new SimpleStatement(withKeyspace("insert into %s.tbl1 (id) values (1)")));
+            batch.add(new SimpleStatement(withKeyspace("insert into %s.tbl2 (id) values (1)")));
+            batch.add(new SimpleStatement(withKeyspace("insert into %s.tbl3 (id) values (1)")));
+            ResultSet result = session.execute(batch.setConsistencyLevel(cl));
+
+            // only `tbl1` has any tracked top partitions, should only warn for that
+            List<String> warnings = result.getExecutionInfo().getWarnings();
+            assertEquals(1, warnings.size());
+            warnings.forEach(warning -> {
+                assertFalse(warning.contains("tbl2"));
+                assertFalse(warning.contains("tbl3"));
+            });
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteSizeWarningTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteSizeWarningTest.java
new file mode 100644
index 0000000000..c9176efa34
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteSizeWarningTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.thresholds;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static 
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.MEBIBYTES;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Distributed tests for write size threshold warnings.
+ * Tests that writes to large partitions (tracked in TopPartitionTracker) 
trigger warnings.
+ */
+public class WriteSizeWarningTest extends AbstractWriteThresholdWarning
+{
+    private static final long WARN_THRESHOLD_BYTES = 5 * 1024 * 1024; // 5MB
+
+    /**
+     * Runs the base-class setup, then configures every instance for the size tests: a 5 MiB write size warn
+     * threshold, a 1 MiB minimum tracked partition size (below the warn threshold), write thresholds enabled,
+     * and a coordinator write warn interval of {@code 0ms}.
+     */
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        AbstractWriteThresholdWarning.setupClass();
+
+        // Thresholds are applied after the cluster has been initialised by the base class
+        CLUSTER.stream().forEach(instance -> instance.runOnInstance(() -> {
+            DataStorageSpec.LongBytesBound warnThreshold = new DataStorageSpec.LongBytesBound(5, MEBIBYTES);
+            DataStorageSpec.LongBytesBound minTrackedSize = new DataStorageSpec.LongBytesBound(1, MEBIBYTES);
+            DurationSpec.LongMillisecondsBound warnInterval = new DurationSpec.LongMillisecondsBound("0ms");
+
+            DatabaseDescriptor.setWriteSizeWarnThreshold(warnThreshold);
+            DatabaseDescriptor.setMinTrackedPartitionSizeInBytes(minTrackedSize);
+            DatabaseDescriptor.setWriteThresholdsEnabled(true);
+            DatabaseDescriptor.setCoordinatorWriteWarnInterval(warnInterval);
+        }));
+    }
+
+    /**
+     * @return the warn threshold constant of this test, {@code WARN_THRESHOLD_BYTES} (5 * 1024 * 1024)
+     */
+    @Override
+    protected long getWarnThreshold()
+    {
+        return WARN_THRESHOLD_BYTES;
+    }
+
+    /**
+     * Records the given size for partition {@code pk} in the top-sizes tracker of {@code tbl} on every instance.
+     *
+     * @param pk        integer partition key, decorated with the Murmur3 partitioner
+     * @param sizeBytes the size value handed to the tracker
+     */
+    @Override
+    protected void populateTopPartitions(int pk, long sizeBytes)
+    {
+        CLUSTER.stream().forEach(instance -> instance.runOnInstance(() -> {
+            ColumnFamilyStore table = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+            var partitionKey = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(pk));
+            table.topPartitions.topSizes().track(partitionKey, sizeBytes);
+        }));
+    }
+
+    /**
+     * @return the sum, over all instances, of the keyspace-level {@code WriteSizeWarnings} counter
+     */
+    @Override
+    protected long totalWarnings()
+    {
+        String counterName = "org.apache.cassandra.metrics.keyspace.WriteSizeWarnings." + KEYSPACE;
+        return CLUSTER.stream()
+                      .mapToLong(instance -> instance.metrics().getCounter(counterName))
+                      .sum();
+    }
+
+    /**
+     * Asserts that there is exactly one warning and that it carries the expected fragments of the
+     * large-partition message.
+     *
+     * @param warnings the client warnings returned by a write
+     */
+    @Override
+    protected void assertWarnings(List<String> warnings)
+    {
+        assertThat(warnings).hasSize(1);
+
+        String warning = warnings.get(0);
+        assertThat(warning).contains(KEYSPACE + ".tbl")
+                           .contains("large partition")
+                           .contains("estimated size is")
+                           .contains("bytes")
+                           .contains("write_size_warn_threshold");
+    }
+}
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteTombstoneWarningTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteTombstoneWarningTest.java
new file mode 100644
index 0000000000..0d4ba64662
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/thresholds/WriteTombstoneWarningTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.thresholds;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Distributed tests for write tombstone threshold warnings.
+ * Tests that writes to partitions with many tombstones (tracked in 
TopPartitionTracker) trigger warnings.
+ */
+public class WriteTombstoneWarningTest extends AbstractWriteThresholdWarning
+{
+    private static final long WARN_THRESHOLD_COUNT = 1000;
+
+    /**
+     * Runs the base-class setup, then configures every instance for the tombstone tests: a minimum tracked
+     * tombstone count of 100, a write tombstone warn threshold of {@code WARN_THRESHOLD_COUNT}, and a coordinator
+     * write warn interval of {@code 0ms}.
+     */
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        AbstractWriteThresholdWarning.setupClass();
+
+        // NOTE(review): unlike WriteSizeWarningTest.setupClass, setWriteThresholdsEnabled(true) is not called here;
+        // confirm the tests enable the feature themselves (e.g. through enable(...)).
+        CLUSTER.stream().forEach(instance -> instance.runOnInstance(() -> {
+            // The minimum tracked count goes first: validation requires threshold >= min
+            DatabaseDescriptor.setMinTrackedPartitionTombstoneCount(100);
+
+            int warnThreshold = (int) WARN_THRESHOLD_COUNT;
+            DatabaseDescriptor.setWriteTombstoneWarnThreshold(warnThreshold);
+
+            DurationSpec.LongMillisecondsBound warnInterval = new DurationSpec.LongMillisecondsBound("0ms");
+            DatabaseDescriptor.setCoordinatorWriteWarnInterval(warnInterval);
+        }));
+    }
+
+    /**
+     * @return the warn threshold constant of this test, {@code WARN_THRESHOLD_COUNT} (1000)
+     */
+    @Override
+    protected long getWarnThreshold()
+    {
+        return WARN_THRESHOLD_COUNT;
+    }
+
+    /**
+     * Records the given tombstone count for partition {@code pk} in the top-tombstones tracker of {@code tbl}
+     * on every instance. Instances whose table has no tracker ({@code topPartitions == null}) are left untouched.
+     *
+     * @param pk             integer partition key, decorated with the Murmur3 partitioner
+     * @param tombstoneCount the tombstone count handed to the tracker
+     */
+    @Override
+    protected void populateTopPartitions(int pk, long tombstoneCount)
+    {
+        CLUSTER.stream().forEach(instance -> instance.runOnInstance(() -> {
+            ColumnFamilyStore table = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+            if (table.topPartitions == null)
+                return;
+
+            var partitionKey = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(pk));
+            table.topPartitions.topTombstones().track(partitionKey, tombstoneCount);
+        }));
+    }
+
+    /**
+     * @return the sum, over all instances, of the keyspace-level {@code WriteTombstoneWarnings} counter
+     */
+    @Override
+    protected long totalWarnings()
+    {
+        String counterName = "org.apache.cassandra.metrics.keyspace.WriteTombstoneWarnings." + KEYSPACE;
+        return CLUSTER.stream()
+                      .mapToLong(instance -> instance.metrics().getCounter(counterName))
+                      .sum();
+    }
+
+    /**
+     * Asserts that there is exactly one warning and that it carries the expected fragments of the
+     * many-tombstones message.
+     *
+     * @param warnings the client warnings returned by a write
+     */
+    @Override
+    protected void assertWarnings(List<String> warnings)
+    {
+        assertThat(warnings).hasSize(1);
+
+        String warning = warnings.get(0);
+        assertThat(warning).contains(KEYSPACE + ".tbl")
+                           .contains("many tombstones")
+                           .contains("estimated count is")
+                           .contains("write_tombstone_warn_threshold");
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 678f6746f1..2a44e763e0 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@ -50,6 +50,7 @@ import static 
org.apache.cassandra.config.CassandraRelevantProperties.ALLOW_UNLI
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.CONFIG_LOADER;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.PARTITIONER;
 import static 
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.KIBIBYTES;
+import static 
org.apache.cassandra.config.DataStorageSpec.DataStorageUnit.MEBIBYTES;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -687,7 +688,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.coordinator_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
         conf.coordinator_read_size_fail_threshold = new 
DataStorageSpec.LongBytesBound(1, KIBIBYTES);
-        Assertions.assertThatThrownBy(() -> 
DatabaseDescriptor.applyReadThresholdsValidations(conf))
+        Assertions.assertThatThrownBy(() -> 
DatabaseDescriptor.applyThresholdsValidations(conf))
                   .isInstanceOf(ConfigurationException.class)
                   .hasMessage("coordinator_read_size_fail_threshold (1KiB) 
must be greater than or equal to coordinator_read_size_warn_threshold (2KiB)");
     }
@@ -698,7 +699,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.coordinator_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
         conf.coordinator_read_size_fail_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
-        DatabaseDescriptor.applyReadThresholdsValidations(conf);
+        DatabaseDescriptor.applyThresholdsValidations(conf);
     }
 
     @Test
@@ -707,7 +708,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.coordinator_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
         conf.coordinator_read_size_fail_threshold = null;
-        DatabaseDescriptor.applyReadThresholdsValidations(conf);
+        DatabaseDescriptor.applyThresholdsValidations(conf);
     }
 
     @Test
@@ -716,7 +717,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.coordinator_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(0, KIBIBYTES);
         conf.coordinator_read_size_fail_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
-        DatabaseDescriptor.applyReadThresholdsValidations(conf);
+        DatabaseDescriptor.applyThresholdsValidations(conf);
     }
 
     // local read
@@ -727,7 +728,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.local_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
         conf.local_read_size_fail_threshold = new 
DataStorageSpec.LongBytesBound(1, KIBIBYTES);
-        Assertions.assertThatThrownBy(() -> 
DatabaseDescriptor.applyReadThresholdsValidations(conf))
+        Assertions.assertThatThrownBy(() -> 
DatabaseDescriptor.applyThresholdsValidations(conf))
                   .isInstanceOf(ConfigurationException.class)
                   .hasMessage("local_read_size_fail_threshold (1KiB) must be 
greater than or equal to local_read_size_warn_threshold (2KiB)");
     }
@@ -738,7 +739,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.local_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
         conf.local_read_size_fail_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
-        DatabaseDescriptor.applyReadThresholdsValidations(conf);
+        DatabaseDescriptor.applyThresholdsValidations(conf);
     }
 
     @Test
@@ -747,7 +748,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.local_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
         conf.local_read_size_fail_threshold = null;
-        DatabaseDescriptor.applyReadThresholdsValidations(conf);
+        DatabaseDescriptor.applyThresholdsValidations(conf);
     }
 
     @Test
@@ -756,7 +757,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.local_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(0, KIBIBYTES);
         conf.local_read_size_fail_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
-        DatabaseDescriptor.applyReadThresholdsValidations(conf);
+        DatabaseDescriptor.applyThresholdsValidations(conf);
     }
 
     // row index entry
@@ -767,7 +768,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.row_index_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
         conf.row_index_read_size_fail_threshold = new 
DataStorageSpec.LongBytesBound(1, KIBIBYTES);
-        Assertions.assertThatThrownBy(() -> 
DatabaseDescriptor.applyReadThresholdsValidations(conf))
+        Assertions.assertThatThrownBy(() -> 
DatabaseDescriptor.applyThresholdsValidations(conf))
                   .isInstanceOf(ConfigurationException.class)
                   .hasMessage("row_index_read_size_fail_threshold (1KiB) must 
be greater than or equal to row_index_read_size_warn_threshold (2KiB)");
     }
@@ -778,7 +779,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.row_index_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
         conf.row_index_read_size_fail_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
-        DatabaseDescriptor.applyReadThresholdsValidations(conf);
+        DatabaseDescriptor.applyThresholdsValidations(conf);
     }
 
     @Test
@@ -787,7 +788,7 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.row_index_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
         conf.row_index_read_size_fail_threshold = null;
-        DatabaseDescriptor.applyReadThresholdsValidations(conf);
+        DatabaseDescriptor.applyThresholdsValidations(conf);
     }
 
     @Test
@@ -796,7 +797,150 @@ public class DatabaseDescriptorTest
         Config conf = new Config();
         conf.row_index_read_size_warn_threshold = new 
DataStorageSpec.LongBytesBound(0, KIBIBYTES);
         conf.row_index_read_size_fail_threshold = new 
DataStorageSpec.LongBytesBound(2, KIBIBYTES);
-        DatabaseDescriptor.applyReadThresholdsValidations(conf);
+        DatabaseDescriptor.applyThresholdsValidations(conf);
+    }
+
+    // write thresholds
+    /**
+     * A write size warn threshold and the enabled flag set through {@link DatabaseDescriptor} are returned by
+     * the matching getters. The values in place before the test are restored afterwards so that the mutated
+     * static configuration does not leak into other tests.
+     */
+    @Test
+    public void testWriteSizeWarnThresholdEnabled()
+    {
+        DataStorageSpec.LongBytesBound previousThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold();
+        boolean previousEnabled = DatabaseDescriptor.getWriteThresholdsEnabled();
+        try
+        {
+            Config conf = new Config();
+            conf.write_size_warn_threshold = new DataStorageSpec.LongBytesBound(10, MEBIBYTES);
+            conf.write_thresholds_enabled = true;
+            DatabaseDescriptor.setWriteSizeWarnThreshold(conf.write_size_warn_threshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+            assertThat(DatabaseDescriptor.getWriteSizeWarnThreshold()).isEqualTo(conf.write_size_warn_threshold);
+            assertThat(DatabaseDescriptor.getWriteThresholdsEnabled()).isTrue();
+        }
+        finally
+        {
+            // undo the global changes made above
+            DatabaseDescriptor.setWriteSizeWarnThreshold(previousThreshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(previousEnabled);
+        }
+    }
+
+    /**
+     * A {@code null} write size warn threshold and a disabled flag set through {@link DatabaseDescriptor} are
+     * returned by the matching getters. The values in place before the test are restored afterwards so that the
+     * mutated static configuration does not leak into other tests.
+     */
+    @Test
+    public void testWriteSizeWarnThresholdDisabled()
+    {
+        DataStorageSpec.LongBytesBound previousThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold();
+        boolean previousEnabled = DatabaseDescriptor.getWriteThresholdsEnabled();
+        try
+        {
+            Config conf = new Config();
+            conf.write_size_warn_threshold = null;
+            conf.write_thresholds_enabled = false;
+            DatabaseDescriptor.setWriteSizeWarnThreshold(conf.write_size_warn_threshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+            assertThat(DatabaseDescriptor.getWriteSizeWarnThreshold()).isNull();
+            assertThat(DatabaseDescriptor.getWriteThresholdsEnabled()).isFalse();
+        }
+        finally
+        {
+            // undo the global changes made above
+            DatabaseDescriptor.setWriteSizeWarnThreshold(previousThreshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(previousEnabled);
+        }
+    }
+
+    /**
+     * A write tombstone warn threshold of 1000 and the enabled flag set through {@link DatabaseDescriptor} are
+     * returned by the matching getters. The values in place before the test are restored afterwards so that the
+     * mutated static configuration does not leak into other tests.
+     */
+    @Test
+    public void testWriteTombstoneWarnThresholdEnabled()
+    {
+        int previousThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold();
+        boolean previousEnabled = DatabaseDescriptor.getWriteThresholdsEnabled();
+        try
+        {
+            Config conf = new Config();
+            conf.write_tombstone_warn_threshold = 1000;
+            conf.write_thresholds_enabled = true;
+            DatabaseDescriptor.setWriteTombstoneWarnThreshold(conf.write_tombstone_warn_threshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+            assertThat(DatabaseDescriptor.getWriteTombstoneWarnThreshold()).isEqualTo(1000);
+            assertThat(DatabaseDescriptor.getWriteThresholdsEnabled()).isTrue();
+        }
+        finally
+        {
+            // undo the global changes made above
+            DatabaseDescriptor.setWriteTombstoneWarnThreshold(previousThreshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(previousEnabled);
+        }
+    }
+
+    /**
+     * A write tombstone warn threshold of -1 and a disabled flag set through {@link DatabaseDescriptor} are
+     * returned by the matching getters. The values in place before the test are restored afterwards so that the
+     * mutated static configuration does not leak into other tests.
+     */
+    @Test
+    public void testWriteTombstoneWarnThresholdDisabled()
+    {
+        int previousThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold();
+        boolean previousEnabled = DatabaseDescriptor.getWriteThresholdsEnabled();
+        try
+        {
+            Config conf = new Config();
+            conf.write_tombstone_warn_threshold = -1;
+            conf.write_thresholds_enabled = false;
+            DatabaseDescriptor.setWriteTombstoneWarnThreshold(conf.write_tombstone_warn_threshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+            assertThat(DatabaseDescriptor.getWriteTombstoneWarnThreshold()).isEqualTo(-1);
+            assertThat(DatabaseDescriptor.getWriteThresholdsEnabled()).isFalse();
+        }
+        finally
+        {
+            // undo the global changes made above
+            DatabaseDescriptor.setWriteTombstoneWarnThreshold(previousThreshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(previousEnabled);
+        }
+    }
+
+    /**
+     * The size and tombstone warn thresholds can each be set while the other is unset ({@code null} / -1).
+     * The values in place before the test are restored afterwards so that the mutated static configuration does
+     * not leak into other tests.
+     */
+    @Test
+    public void testWriteThresholdsCanBeSetIndependently()
+    {
+        DataStorageSpec.LongBytesBound previousSizeThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold();
+        int previousTombstoneThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold();
+        boolean previousEnabled = DatabaseDescriptor.getWriteThresholdsEnabled();
+        try
+        {
+            Config conf = new Config();
+            // Set size threshold only
+            conf.write_size_warn_threshold = new DataStorageSpec.LongBytesBound(5, MEBIBYTES);
+            conf.write_tombstone_warn_threshold = -1;
+            conf.write_thresholds_enabled = true;
+            DatabaseDescriptor.setWriteSizeWarnThreshold(conf.write_size_warn_threshold);
+            DatabaseDescriptor.setWriteTombstoneWarnThreshold(conf.write_tombstone_warn_threshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(conf.write_thresholds_enabled);
+            assertThat(DatabaseDescriptor.getWriteSizeWarnThreshold()).isEqualTo(conf.write_size_warn_threshold);
+            assertThat(DatabaseDescriptor.getWriteTombstoneWarnThreshold()).isEqualTo(-1);
+
+            // Set tombstone threshold only
+            conf.write_size_warn_threshold = null;
+            conf.write_tombstone_warn_threshold = 500;
+            DatabaseDescriptor.setWriteSizeWarnThreshold(conf.write_size_warn_threshold);
+            DatabaseDescriptor.setWriteTombstoneWarnThreshold(conf.write_tombstone_warn_threshold);
+            assertThat(DatabaseDescriptor.getWriteSizeWarnThreshold()).isNull();
+            assertThat(DatabaseDescriptor.getWriteTombstoneWarnThreshold()).isEqualTo(500);
+        }
+        finally
+        {
+            // undo the global changes made above
+            DatabaseDescriptor.setWriteSizeWarnThreshold(previousSizeThreshold);
+            DatabaseDescriptor.setWriteTombstoneWarnThreshold(previousTombstoneThreshold);
+            DatabaseDescriptor.setWriteThresholdsEnabled(previousEnabled);
+        }
+    }
+
+    /**
+     * Exercises {@code applyThresholdsValidations} for the size threshold: a warn threshold below the minimum
+     * tracked partition size is rejected, while equal, larger, and {@code null} combinations are accepted.
+     */
+    @Test
+    public void testWriteSizeWarnThresholdValidation()
+    {
+        DataStorageSpec.LongBytesBound oneMiB = new DataStorageSpec.LongBytesBound(1, MEBIBYTES);
+        DataStorageSpec.LongBytesBound fiveMiB = new DataStorageSpec.LongBytesBound(5, MEBIBYTES);
+        DataStorageSpec.LongBytesBound tenMiB = new DataStorageSpec.LongBytesBound(10, MEBIBYTES);
+        Config config = new Config();
+
+        // rejected: warn (1MiB) below min (5MiB)
+        config.write_size_warn_threshold = oneMiB;
+        config.min_tracked_partition_size = fiveMiB;
+        assertThatThrownBy(() -> DatabaseDescriptor.applyThresholdsValidations(config))
+        .isInstanceOf(ConfigurationException.class)
+        .hasMessageContaining("write_size_warn_threshold (1MiB) cannot be less than min_tracked_partition_size (5MiB)");
+
+        // accepted: warn equals min
+        config.write_size_warn_threshold = fiveMiB;
+        config.min_tracked_partition_size = fiveMiB;
+        DatabaseDescriptor.applyThresholdsValidations(config);
+
+        // accepted: warn above min
+        config.write_size_warn_threshold = tenMiB;
+        config.min_tracked_partition_size = fiveMiB;
+        DatabaseDescriptor.applyThresholdsValidations(config);
+
+        // accepted: warn unset
+        config.write_size_warn_threshold = null;
+        config.min_tracked_partition_size = fiveMiB;
+        DatabaseDescriptor.applyThresholdsValidations(config);
+
+        // accepted: min unset
+        config.write_size_warn_threshold = tenMiB;
+        config.min_tracked_partition_size = null;
+        DatabaseDescriptor.applyThresholdsValidations(config);
+
+        // accepted: both unset
+        config.write_size_warn_threshold = null;
+        config.min_tracked_partition_size = null;
+        DatabaseDescriptor.applyThresholdsValidations(config);
+    }
+
+    /**
+     * Exercises {@code applyThresholdsValidations} for the tombstone threshold: values below -1 and values
+     * below the minimum tracked tombstone count are rejected, while -1, equal, and larger values are accepted.
+     */
+    @Test
+    public void testWriteTombstoneWarnThresholdValidation()
+    {
+        final int minTracked = 100;
+        Config config = new Config();
+
+        // rejected: threshold below -1
+        config.write_tombstone_warn_threshold = -2;
+        config.min_tracked_partition_tombstone_count = minTracked;
+        assertThatThrownBy(() -> DatabaseDescriptor.applyThresholdsValidations(config))
+        .isInstanceOf(ConfigurationException.class)
+        .hasMessageContaining("write_tombstone_warn_threshold (-2) must be -1 (disabled) or >= 0");
+
+        // accepted: -1 means disabled
+        config.write_tombstone_warn_threshold = -1;
+        config.min_tracked_partition_tombstone_count = minTracked;
+        DatabaseDescriptor.applyThresholdsValidations(config);
+
+        // rejected: warn (50) below min (100)
+        config.write_tombstone_warn_threshold = 50;
+        config.min_tracked_partition_tombstone_count = minTracked;
+        assertThatThrownBy(() -> DatabaseDescriptor.applyThresholdsValidations(config))
+        .isInstanceOf(ConfigurationException.class)
+        .hasMessageContaining("write_tombstone_warn_threshold (50) cannot be less than min_tracked_partition_tombstone_count (100)");
+
+        // accepted: warn equals min
+        config.write_tombstone_warn_threshold = 100;
+        config.min_tracked_partition_tombstone_count = minTracked;
+        DatabaseDescriptor.applyThresholdsValidations(config);
+
+        // accepted: warn above min
+        config.write_tombstone_warn_threshold = 200;
+        config.min_tracked_partition_tombstone_count = minTracked;
+        DatabaseDescriptor.applyThresholdsValidations(config);
     }
 
     @Test
diff --git 
a/test/unit/org/apache/cassandra/service/writes/thresholds/WarnCounterTest.java 
b/test/unit/org/apache/cassandra/service/writes/thresholds/WarnCounterTest.java
new file mode 100644
index 0000000000..192833c8e3
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/writes/thresholds/WarnCounterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.cassandra.schema.TableId;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class WarnCounterTest
+{
+    private static final TableId TABLE1 = TableId.fromUUID(new UUID(0, 1));
+    private static final TableId TABLE2 = TableId.fromUUID(new UUID(0, 2));
+
+    /** A single warning for one table shows up as the only entry of the snapshot. */
+    @Test
+    public void testAddWarningSingleTable()
+    {
+        WarnCounter warnCounter = new WarnCounter();
+        warnCounter.addWarning(map(TABLE1, 1024L));
+
+        WriteThresholdCounter snap = warnCounter.snapshot();
+        assertThat(snap.tableValues).containsEntry(TABLE1, 1024L);
+        assertThat(snap.tableValues).hasSize(1);
+    }
+
+    /** One warning that covers two tables yields a snapshot entry for each of them. */
+    @Test
+    public void testAddWarningMultipleTables()
+    {
+        WarnCounter warnCounter = new WarnCounter();
+        Map<TableId, Long> warning = map(TABLE1, 1024L, TABLE2, 2048L);
+        warnCounter.addWarning(warning);
+
+        WriteThresholdCounter snap = warnCounter.snapshot();
+        assertThat(snap.tableValues).containsEntry(TABLE1, 1024L);
+        assertThat(snap.tableValues).containsEntry(TABLE2, 2048L);
+    }
+
+    /** After warnings of 1024, 2048 and 512 for the same table, the snapshot holds 2048. */
+    @Test
+    public void testMaxValueTrackedPerTable()
+    {
+        WarnCounter warnCounter = new WarnCounter();
+        for (long value : new long[]{ 1024L, 2048L, 512L })
+            warnCounter.addWarning(map(TABLE1, value));
+
+        WriteThresholdCounter snap = warnCounter.snapshot();
+        assertThat(snap.tableValues).containsEntry(TABLE1, 2048L);
+    }
+
+    /** Two warnings with swapped values per table leave 1000 recorded for both tables. */
+    @Test
+    public void testMaxValueIndependentPerTable()
+    {
+        WarnCounter warnCounter = new WarnCounter();
+        Map<TableId, Long> first = map(TABLE1, 1000L, TABLE2, 500L);
+        Map<TableId, Long> second = map(TABLE1, 500L, TABLE2, 1000L);
+        warnCounter.addWarning(first);
+        warnCounter.addWarning(second);
+
+        WriteThresholdCounter snap = warnCounter.snapshot();
+        assertThat(snap.tableValues).containsEntry(TABLE1, 1000L);
+        assertThat(snap.tableValues).containsEntry(TABLE2, 1000L);
+    }
+
+    /** A table that first appears in a later warning is present in the snapshot next to the earlier one. */
+    @Test
+    public void testNewTableAddedOnSubsequentCall()
+    {
+        WarnCounter warnCounter = new WarnCounter();
+        Map<TableId, Long> first = map(TABLE1, 1024L);
+        Map<TableId, Long> second = map(TABLE2, 2048L);
+        warnCounter.addWarning(first);
+        warnCounter.addWarning(second);
+
+        WriteThresholdCounter snap = warnCounter.snapshot();
+        assertThat(snap.tableValues).containsEntry(TABLE1, 1024L);
+        assertThat(snap.tableValues).containsEntry(TABLE2, 2048L);
+    }
+
+    /** A snapshot taken earlier keeps its contents when further warnings are added to the counter. */
+    @Test
+    public void testSnapshotIsImmutable()
+    {
+        WarnCounter warnCounter = new WarnCounter();
+        warnCounter.addWarning(map(TABLE1, 1024L));
+        WriteThresholdCounter before = warnCounter.snapshot();
+
+        warnCounter.addWarning(map(TABLE1, 2048L));
+        warnCounter.addWarning(map(TABLE2, 512L));
+        WriteThresholdCounter after = warnCounter.snapshot();
+
+        // the earlier snapshot must not see the later addWarning calls
+        assertThat(before.tableValues).containsEntry(TABLE1, 1024L);
+        assertThat(before.tableValues).doesNotContainKey(TABLE2);
+
+        assertThat(after.tableValues).containsEntry(TABLE1, 2048L);
+        assertThat(after.tableValues).containsEntry(TABLE2, 512L);
+    }
+
+    /** A counter that never received a warning produces an empty snapshot. */
+    @Test
+    public void testEmptyCounter()
+    {
+        WriteThresholdCounter snap = new WarnCounter().snapshot();
+
+        assertThat(snap.isEmpty()).isTrue();
+        assertThat(snap.tableValues).isEmpty();
+    }
+
+    /** Adding an empty map leaves the snapshot empty. */
+    @Test
+    public void testEmptyMapDoesNothing()
+    {
+        WarnCounter warnCounter = new WarnCounter();
+        Map<TableId, Long> nothing = new HashMap<>();
+        warnCounter.addWarning(nothing);
+
+        WriteThresholdCounter snap = warnCounter.snapshot();
+        assertThat(snap.isEmpty()).isTrue();
+    }
+
+    /**
+     * @return a new mutable {@link HashMap} holding the single mapping {@code t1 -> v1}
+     */
+    private static Map<TableId, Long> map(TableId t1, long v1)
+    {
+        Map<TableId, Long> values = new HashMap<>();
+        values.put(t1, v1);
+        return values;
+    }
+
+    /**
+     * @return a new mutable {@link HashMap} with {@code t1 -> v1} put first and {@code t2 -> v2} put second
+     */
+    private static Map<TableId, Long> map(TableId t1, long v1, TableId t2, long v2)
+    {
+        Map<TableId, Long> values = new HashMap<>();
+        values.put(t1, v1);
+        values.put(t2, v2);
+        return values;
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningContextTest.java
 
b/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningContextTest.java
new file mode 100644
index 0000000000..ec631ad6bd
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningContextTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.TableId;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link WriteWarningContext}, covering {@code isSupported} and the way
+ * {@code updateCounters} calls are reflected in the snapshots the context returns.
+ */
+public class WriteWarningContextTest
+{
+    private static final TableId TABLE1 = TableId.fromUUID(new UUID(0, 1));
+    private static final TableId TABLE2 = TableId.fromUUID(new UUID(0, 2));
+
+    /**
+     * Sets made up of the write-threshold warn params are reported as supported; a set of
+     * read-threshold params and an empty set are not.
+     */
+    @Test
+    public void testIsSupported()
+    {
+        EnumSet<ParamType> sizeOnly = EnumSet.of(ParamType.WRITE_SIZE_WARN);
+        EnumSet<ParamType> tombstoneOnly = EnumSet.of(ParamType.WRITE_TOMBSTONE_WARN);
+        EnumSet<ParamType> bothWriteParams = EnumSet.of(ParamType.WRITE_SIZE_WARN,
+                                                        ParamType.WRITE_TOMBSTONE_WARN);
+
+        assertThat(WriteWarningContext.isSupported(sizeOnly)).isTrue();
+        assertThat(WriteWarningContext.isSupported(tombstoneOnly)).isTrue();
+        assertThat(WriteWarningContext.isSupported(bothWriteParams)).isTrue();
+
+        // Read threshold params are not supported
+        EnumSet<ParamType> readParams = EnumSet.of(ParamType.TOMBSTONE_WARNING,
+                                                   ParamType.TOMBSTONE_FAIL);
+        assertThat(WriteWarningContext.isSupported(readParams)).isFalse();
+
+        EnumSet<ParamType> nothing = EnumSet.noneOf(ParamType.class);
+        assertThat(WriteWarningContext.isSupported(nothing)).isFalse();
+    }
+
+    /**
+     * A single update carrying both a size and a tombstone map shows up in the matching
+     * counters of the snapshot.
+     */
+    @Test
+    public void testUpdateCountersWithSingleCall()
+    {
+        WriteWarningContext context = new WriteWarningContext();
+
+        context.updateCounters(paramsSizeAndTombstone(tableMap(TABLE1, 1024L),
+                                                      tableMap(TABLE1, 500L)));
+
+        WriteWarningsSnapshot result = context.snapshot();
+        assertThat(result.writeSize.tableValues).containsEntry(TABLE1, 1024L);
+        assertThat(result.writeTombstone.tableValues).containsEntry(TABLE1, 500L);
+    }
+
+    /**
+     * Two successive updates are combined: the snapshot holds the larger value per table
+     * and includes tables that only appeared in the second update.
+     */
+    @Test
+    public void testUpdateCountersFromMultipleCalls()
+    {
+        WriteWarningContext context = new WriteWarningContext();
+
+        // First call reports TABLE1 with lower values
+        context.updateCounters(paramsSizeAndTombstone(tableMap(TABLE1, 1024L),
+                                                      tableMap(TABLE1, 500L)));
+
+        // Second call reports TABLE1 with a higher size value, and tombstones for TABLE2
+        context.updateCounters(paramsSizeAndTombstone(tableMap(TABLE1, 2048L),
+                                                      tableMap(TABLE2, 1000L)));
+
+        WriteWarningsSnapshot result = context.snapshot();
+
+        // Max values per table should be taken
+        assertThat(result.writeSize.tableValues).containsEntry(TABLE1, 2048L);
+        assertThat(result.writeTombstone.tableValues)
+            .containsEntry(TABLE1, 500L)
+            .containsEntry(TABLE2, 1000L);
+    }
+
+    /**
+     * Updates that each carry only one kind of warning still end up in the right counter.
+     */
+    @Test
+    public void testUpdateCountersWithPartialWarnings()
+    {
+        WriteWarningContext context = new WriteWarningContext();
+
+        // First call only reports size warning
+        context.updateCounters(paramsSize(tableMap(TABLE1, 1024L)));
+
+        // Second call only reports tombstone warning
+        Map<ParamType, Object> tombstoneOnly = new HashMap<>();
+        tombstoneOnly.put(ParamType.WRITE_TOMBSTONE_WARN, tableMap(TABLE2, 500L));
+        context.updateCounters(tombstoneOnly);
+
+        WriteWarningsSnapshot result = context.snapshot();
+
+        assertThat(result.writeSize.tableValues).containsEntry(TABLE1, 1024L);
+        assertThat(result.writeTombstone.tableValues).containsEntry(TABLE2, 500L);
+    }
+
+    /**
+     * Updating with an empty params map leaves the snapshot empty.
+     */
+    @Test
+    public void testUpdateCountersWithEmptyParams()
+    {
+        WriteWarningContext context = new WriteWarningContext();
+        Map<ParamType, Object> noParams = new HashMap<>();
+
+        context.updateCounters(noParams);
+
+        assertThat(context.snapshot().isEmpty()).isTrue();
+    }
+
+    /**
+     * Params that are not write-threshold params leave the snapshot empty.
+     */
+    @Test
+    public void testUpdateCountersWithUnsupportedParams()
+    {
+        WriteWarningContext context = new WriteWarningContext();
+
+        Map<ParamType, Object> readParams = new HashMap<>();
+        readParams.put(ParamType.TOMBSTONE_WARNING, 100);
+        readParams.put(ParamType.TOMBSTONE_FAIL, 200);
+        context.updateCounters(readParams);
+
+        assertThat(context.snapshot().isEmpty()).isTrue();
+    }
+
+    /**
+     * Of several size values reported for the same table, the snapshot keeps the largest.
+     */
+    @Test
+    public void testUpdateCountersMaxValueTrackedPerTable()
+    {
+        WriteWarningContext context = new WriteWarningContext();
+
+        // Reported in this order, so the maximum is neither the first nor the last value.
+        for (long reported : new long[]{ 1024L, 2048L, 512L })
+            context.updateCounters(paramsSize(tableMap(TABLE1, reported)));
+
+        WriteWarningsSnapshot result = context.snapshot();
+        assertThat(result.writeSize.tableValues).containsEntry(TABLE1, 2048L);
+    }
+
+    /**
+     * A snapshot taken earlier keeps its contents when the context is updated afterwards.
+     */
+    @Test
+    public void testSnapshotIsImmutable()
+    {
+        WriteWarningContext context = new WriteWarningContext();
+
+        context.updateCounters(paramsSize(tableMap(TABLE1, 1024L)));
+        WriteWarningsSnapshot before = context.snapshot();
+
+        context.updateCounters(paramsSize(tableMap(TABLE1, 2048L)));
+        context.updateCounters(paramsSize(tableMap(TABLE2, 512L)));
+        WriteWarningsSnapshot after = context.snapshot();
+
+        // First snapshot should not be affected by subsequent calls
+        assertThat(before.writeSize.tableValues)
+            .containsEntry(TABLE1, 1024L)
+            .doesNotContainKey(TABLE2);
+
+        assertThat(after.writeSize.tableValues)
+            .containsEntry(TABLE1, 2048L)
+            .containsEntry(TABLE2, 512L);
+    }
+
+    /**
+     * Size and tombstone values for different tables do not leak into each other's counter.
+     */
+    @Test
+    public void testSizeAndTombstoneTrackedIndependently()
+    {
+        WriteWarningContext context = new WriteWarningContext();
+
+        context.updateCounters(paramsSizeAndTombstone(tableMap(TABLE1, Long.MAX_VALUE),
+                                                      tableMap(TABLE2, 500L)));
+
+        WriteWarningsSnapshot result = context.snapshot();
+        assertThat(result.writeSize.tableValues).containsEntry(TABLE1, Long.MAX_VALUE);
+        assertThat(result.writeTombstone.tableValues).containsEntry(TABLE2, 500L);
+        assertThat(result.writeSize.tableValues).doesNotContainKey(TABLE2);
+        assertThat(result.writeTombstone.tableValues).doesNotContainKey(TABLE1);
+    }
+
+    /**
+     * Builds a new mutable map containing a single table-to-value entry.
+     */
+    private static Map<TableId, Long> tableMap(TableId tableId, long value)
+    {
+        Map<TableId, Long> single = new HashMap<>();
+        single.put(tableId, Long.valueOf(value));
+        return single;
+    }
+
+    /**
+     * Builds a new params map holding {@code sizeMap} under {@link ParamType#WRITE_SIZE_WARN}.
+     */
+    private static Map<ParamType, Object> paramsSize(Map<TableId, Long> sizeMap)
+    {
+        Map<ParamType, Object> sizeOnly = new HashMap<>();
+        sizeOnly.put(ParamType.WRITE_SIZE_WARN, sizeMap);
+        return sizeOnly;
+    }
+
+    /**
+     * Builds a new params map holding {@code sizeMap} under {@link ParamType#WRITE_SIZE_WARN}
+     * and {@code tombstoneMap} under {@link ParamType#WRITE_TOMBSTONE_WARN}, inserted in that order.
+     */
+    private static Map<ParamType, Object> paramsSizeAndTombstone(Map<TableId, Long> sizeMap,
+                                                                 Map<TableId, Long> tombstoneMap)
+    {
+        Map<ParamType, Object> both = new HashMap<>();
+        both.put(ParamType.WRITE_SIZE_WARN, sizeMap);
+        both.put(ParamType.WRITE_TOMBSTONE_WARN, tombstoneMap);
+        return both;
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshotTest.java b/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshotTest.java
new file mode 100644
index 0000000000..c22ec7df3c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshotTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Test;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+import org.quicktheories.impl.Constraint;
+
+import org.apache.cassandra.schema.TableId;
+
+import static 
org.apache.cassandra.service.writes.thresholds.WriteWarningsSnapshot.create;
+import static 
org.apache.cassandra.service.writes.thresholds.WriteWarningsSnapshot.writeSizeWarnMessage;
+import static 
org.apache.cassandra.service.writes.thresholds.WriteWarningsSnapshot.writeTombstoneWarnMessage;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.quicktheories.QuickTheory.qt;
+
+/**
+ * Example-based and property-based tests for {@link WriteWarningsSnapshot}: construction,
+ * {@code merge}, and the text of the two warning messages.
+ */
+public class WriteWarningsSnapshotTest
+{
+    private static final TableId TABLE1 = TableId.fromUUID(new UUID(0, 1));
+    private static final TableId TABLE2 = TableId.fromUUID(new UUID(0, 2));
+    private static final WriteWarningsSnapshot EMPTY_SNAPSHOT = create(WriteThresholdCounter.empty(), WriteThresholdCounter.empty());
+
+    /**
+     * A snapshot created from two non-empty counters is non-empty and exposes those counters.
+     */
+    @Test
+    public void testCreateWithNonEmptyCounters()
+    {
+        WriteThresholdCounter size = counterOf(TABLE1, 1024L);
+        WriteThresholdCounter tombstones = counterOf(TABLE2, 500L);
+
+        WriteWarningsSnapshot built = create(size, tombstones);
+
+        assertThat(built.isEmpty()).isFalse();
+        assertThat(built.writeSize).isEqualTo(size);
+        assertThat(built.writeTombstone).isEqualTo(tombstones);
+    }
+
+    /**
+     * Merging with {@code null} yields a snapshot equal to the receiver.
+     */
+    @Test
+    public void testMergeWithNull()
+    {
+        WriteWarningsSnapshot original = create(counterOf(TABLE1, 2048L), WriteThresholdCounter.empty());
+
+        WriteWarningsSnapshot merged = original.merge(null);
+
+        assertThat(merged).isEqualTo(original);
+    }
+
+    /**
+     * Property: merging any generated snapshot with itself yields an equal snapshot.
+     */
+    @Test
+    public void testMergeSelfWithSelf()
+    {
+        qt().forAll(all())
+            .check(s -> s.merge(s).equals(s));
+    }
+
+    /**
+     * Merging snapshots that mention different tables keeps the entries of both.
+     */
+    @Test
+    public void testMergeNonOverlappingTables()
+    {
+        WriteWarningsSnapshot left = create(counterOf(TABLE1, 1024L), counterOf(TABLE1, 100L));
+        WriteWarningsSnapshot right = create(counterOf(TABLE2, 2048L), counterOf(TABLE2, 200L));
+
+        WriteWarningsSnapshot merged = left.merge(right);
+
+        assertThat(merged.writeSize.tableValues)
+            .containsEntry(TABLE1, 1024L)
+            .containsEntry(TABLE2, 2048L);
+        assertThat(merged.writeTombstone.tableValues)
+            .containsEntry(TABLE1, 100L)
+            .containsEntry(TABLE2, 200L);
+    }
+
+    /**
+     * When both snapshots report the same table, the merged value is the larger one and
+     * the table appears only once.
+     */
+    @Test
+    public void testMergeOverlappingTablesTakesMax()
+    {
+        WriteWarningsSnapshot smaller = create(counterOf(TABLE1, 3000L), WriteThresholdCounter.empty());
+        WriteWarningsSnapshot larger = create(counterOf(TABLE1, 4000L), WriteThresholdCounter.empty());
+
+        WriteWarningsSnapshot merged = smaller.merge(larger);
+
+        assertThat(merged.writeSize.tableValues)
+            .containsEntry(TABLE1, 4000L)
+            .hasSize(1);
+    }
+
+    /**
+     * A size-only snapshot merged with a tombstone-only snapshot carries both kinds of values.
+     */
+    @Test
+    public void testMergeDifferentThresholdTypes()
+    {
+        WriteWarningsSnapshot sizeOnly = create(counterOf(TABLE1, 5000L), WriteThresholdCounter.empty());
+        WriteWarningsSnapshot tombstoneOnly = create(WriteThresholdCounter.empty(), counterOf(TABLE2, 300L));
+
+        WriteWarningsSnapshot merged = sizeOnly.merge(tombstoneOnly);
+
+        assertThat(merged.writeSize.tableValues).containsEntry(TABLE1, 5000L);
+        assertThat(merged.writeTombstone.tableValues).containsEntry(TABLE2, 300L);
+    }
+
+    /**
+     * Pins the exact text of the write-size warning message.
+     */
+    @Test
+    public void testWriteSizeWarnMessage()
+    {
+        String expected = "Write to large partition; estimated size is 1048576 bytes (see write_size_warn_threshold)";
+        assertThat(writeSizeWarnMessage(1048576L)).isEqualTo(expected);
+    }
+
+    /**
+     * Pins the exact text of the write-tombstone warning message.
+     */
+    @Test
+    public void testWriteTombstoneWarnMessage()
+    {
+        String expected = "Write to partition with many tombstones; estimated count is 500 (see write_tombstone_warn_threshold)";
+        assertThat(writeTombstoneWarnMessage(500L)).isEqualTo(expected);
+    }
+
+    /**
+     * Property: {@code a.merge(b)} equals {@code b.merge(a)} for generated snapshots.
+     */
+    @Test
+    public void testMergeCommutative()
+    {
+        qt().forAll(all(), all())
+            .check((left, right) -> left.merge(right).equals(right.merge(left)));
+    }
+
+    /**
+     * Property: {@code (a.merge(b)).merge(c)} equals {@code a.merge(b.merge(c))} for generated snapshots.
+     */
+    @Test
+    public void testMergeAssociative()
+    {
+        qt().forAll(all(), all(), all())
+            .check((first, second, third) -> first.merge(second).merge(third).equals(first.merge(second.merge(third))));
+    }
+
+    /**
+     * Generator that yields either {@link #EMPTY_SNAPSHOT} or a non-empty snapshot, chosen by a
+     * generated boolean.
+     */
+    private static Gen<WriteWarningsSnapshot> all()
+    {
+        Gen<Boolean> pickEmpty = SourceDSL.booleans().all();
+        Gen<WriteWarningsSnapshot> populated = nonEmpty();
+        Gen<WriteWarningsSnapshot> either = source ->
+        {
+            // The boolean is drawn first; the populated generator is consulted only when needed.
+            if (pickEmpty.generate(source))
+                return EMPTY_SNAPSHOT;
+            return populated.generate(source);
+        };
+        return either.describedAs(WriteWarningsSnapshot::toString);
+    }
+
+    /**
+     * Generator for snapshots built from two generated counters, restricted to non-empty results.
+     */
+    private static Gen<WriteWarningsSnapshot> nonEmpty()
+    {
+        Gen<WriteThresholdCounter> counters = counter();
+        // Arguments are evaluated left to right: the size counter is drawn before the tombstone counter.
+        Gen<WriteWarningsSnapshot> candidate = source -> create(counters.generate(source), counters.generate(source));
+        return candidate.assuming(s -> !s.isEmpty())
+                        .describedAs(WriteWarningsSnapshot::toString);
+    }
+
+    /**
+     * Generator for counters: either the empty counter, or one with a value for {@code TABLE1}
+     * and, depending on a 0/1 draw, a value for {@code TABLE2} as well.
+     */
+    private static Gen<WriteThresholdCounter> counter()
+    {
+        Gen<Boolean> pickEmpty = SourceDSL.booleans().all();
+        Constraint anyPositive = Constraint.between(1, Long.MAX_VALUE);
+        Gen<WriteThresholdCounter> generated = source ->
+        {
+            if (pickEmpty.generate(source))
+                return WriteThresholdCounter.empty();
+
+            Map<TableId, Long> perTable = new HashMap<>();
+            perTable.put(TABLE1, source.next(anyPositive));
+
+            boolean includeSecondTable = source.next(Constraint.between(0, 1)) == 1;
+            if (includeSecondTable)
+                perTable.put(TABLE2, source.next(anyPositive));
+
+            return WriteThresholdCounter.create(perTable);
+        };
+        return generated.describedAs(WriteThresholdCounter::toString);
+    }
+
+    /**
+     * Builds a counter holding a single value for the given table.
+     */
+    private static WriteThresholdCounter counterOf(TableId table, long value)
+    {
+        return WriteThresholdCounter.create(map(table, value));
+    }
+
+    /**
+     * Builds a new mutable map containing a single table-to-value entry.
+     */
+    private static Map<TableId, Long> map(TableId t1, long v1)
+    {
+        Map<TableId, Long> single = new HashMap<>();
+        single.put(t1, Long.valueOf(v1));
+        return single;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to