CASSANDRA-17258: Add write threshold warnings for large partitions and tombstones#4556

Open
minal-kyada wants to merge 15 commits into apache:trunk from minal-kyada:cassandra-17258

Conversation

@minal-kyada

Summary

Implements replica-side write threshold warnings to detect and warn about writes to large partitions or partitions with many tombstones. This follows the same architecture as the existing read threshold warnings but is warning-only (no blocking behavior).

Related Work

This implementation follows the pattern established by read threshold warnings but adapts it for write operations with a warning-only approach.

patch by Minal Kyada reviewed by TBD for CASSANDRA-17258

if (!Paxos.isInRangeAndShouldProcess(from, agreed.update.partitionKey(), agreed.update.metadata(), false))
return null;

WriteThresholds.checkWriteThresholds(agreed.update, agreed.update.partitionKey());
Contributor

I'm not sure this would actually work in practice.

if (executeOnSelf)
        {
            ExecutorPlus executor = PAXOS_COMMIT_REQ.stage.executor();
            if (async) executor.execute(this::executeOnSelf);
            else executor.maybeExecuteImmediately(this::executeOnSelf);
        }

In some code paths we do a blocking execute; in others we run it async and ignore when it completes.
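The concern above can be demonstrated in isolation. This is a minimal sketch, not Cassandra code: it assumes only that the warning state lives in a thread-local (as MessageParams does). In the blocking path the check runs on the caller's thread and the warning is visible afterwards; in the async path the work runs on a stage thread, so anything it records in its own thread-local is never seen by the submitting thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadLocalVisibility
{
    // stands in for the thread-local map backing MessageParams
    static final ThreadLocal<String> PARAMS = new ThreadLocal<>();

    public static String runBlocking()
    {
        PARAMS.set("size-warning"); // recorded on the caller's thread
        return PARAMS.get();        // visible: same thread
    }

    public static String runAsync() throws Exception
    {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> PARAMS.set("size-warning")).get(); // recorded on the stage thread
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return PARAMS.get();        // null: the caller's thread-local was never written
    }

    public static void main(String[] args) throws Exception
    {
        PARAMS.remove();
        if (!"size-warning".equals(runBlocking())) throw new AssertionError("blocking path lost the warning");
        PARAMS.remove();
        if (runAsync() != null) throw new AssertionError("async path should not see the warning");
        System.out.println("ok");
    }
}
```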

Author

As discussed, we hold onto making these changes in this patch.

@minal-kyada minal-kyada force-pushed the cassandra-17258 branch 2 times, most recently from 803344e to d49ca1c Compare January 16, 2026 23:11
@minal-kyada minal-kyada force-pushed the cassandra-17258 branch 4 times, most recently from 64ebdb9 to b918756 Compare January 29, 2026 22:49
@minal-kyada minal-kyada requested a review from dcapwell January 30, 2026 20:55
@Test
public void testMergeSelfWithSelf()
{
    qt().forAll(all()).check(snapshot -> snapshot.merge(snapshot).equals(snapshot));
}
Contributor

<3

@Test
public void testMergeCommutative()
{
    qt().forAll(all(), all()).check((a, b) -> a.merge(b).equals(b.merge(a)));
}
Contributor

<3

Contributor

@dcapwell dcapwell left a comment

My approval assumes we fix https://github.com/apache/cassandra/pull/4556/changes#r2748568224 and the other comment in that file

Contributor

@dcapwell dcapwell left a comment

+1 to latest changes.
+1 to the patch


long sizeWarnBytes = sizeWarnThreshold != null ? sizeWarnThreshold.toBytes() : -1;

for (PartitionUpdate update : mutation.getPartitionUpdates())
Member

Since we only need to check once for each table, I think we can iterate over mutation.getTableIds() here instead; the number of updated tables is smaller than or equal to the total number of updates.

Also, once we have warned for both tombstones and size, we can stop iterating

Author

I am checking partition-level thresholds, not table-level. If I iterate over mutation.getTableIds(), won't the context completely change?

Member

All PartitionUpdates in a Mutation are for the same partition, so we can just grab mutation.key() and check that for the tables

Author

@minal-kyada minal-kyada Feb 11, 2026

public static void checkWriteThresholds(Mutation mutation)
    {
        if (!DatabaseDescriptor.isDaemonInitialized() || !DatabaseDescriptor.getWriteThresholdsEnabled())
            return;

        DataStorageSpec.LongBytesBound sizeWarnThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold();
        int tombstoneWarnThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold();

        if (sizeWarnThreshold == null && tombstoneWarnThreshold == -1)
            return;

        long sizeWarnBytes = sizeWarnThreshold != null ? sizeWarnThreshold.toBytes() : -1;

        DecoratedKey key = mutation.key();
        boolean warnedSize = false;
        boolean warnedTombstone = false;

        for (TableId tableId : mutation.getTableIds())
        {
            ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tableId);
            if (cfs == null || cfs.topPartitions == null)
                continue;

            TableMetadata metadata = cfs.metadata();
            if (sizeWarnBytes != -1 && !warnedSize)
            {
                long estimatedSize = cfs.topPartitions.topSizes().getEstimate(key);
                if (estimatedSize > sizeWarnBytes)
                {
                    MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize);
                    noSpamLogger.warn("Write to {} partition {} triggered size warning; " +
                                      "estimated size is {} bytes, threshold is {} bytes (see write_size_warn_threshold)",
                                      metadata, metadata.partitionKeyType.toCQLString(key.getKey()), estimatedSize, sizeWarnBytes);
                    warnedSize = true;
                }
            }

            if (tombstoneWarnThreshold != -1 && !warnedTombstone)
            {
                long estimatedTombstones = cfs.topPartitions.topTombstones().getEstimate(key);
                if (estimatedTombstones > tombstoneWarnThreshold)
                {
                    MessageParams.add(ParamType.WRITE_TOMBSTONE_WARN, (int) estimatedTombstones);
                    noSpamLogger.warn("Write to {} partition {} triggered tombstone warning; " +
                                      "estimated tombstone count is {}, threshold is {} (see write_tombstone_warn_threshold)",
                                      metadata, metadata.partitionKeyType.toCQLString(key.getKey()), estimatedTombstones, tombstoneWarnThreshold);
                    warnedTombstone = true;
                }
            }
        }
    }

I was wondering: once it warns for any table, the flag is set and it won't warn for subsequent tables. However, in the existing code, we ensure that we always report the worst offender across all partition updates.

I thought that if a mutation touches multiple tables, we'd want to know about the biggest problem. If the goal is just to warn once irrespective of who the culprit is, then yes, we can proceed with these changes. Thoughts?
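For comparison, the "biggest problem" alternative discussed above can be sketched as a second pass that keeps the maximum estimate across all tables and warns once for the worst offender. This is a hypothetical simplification, not the patch's code: the table-to-estimate map stands in for the per-table topPartitions lookups.

```java
import java.util.Map;

public class WorstOffender
{
    // Returns the table with the largest estimate above the threshold,
    // or null when no table crossed it.
    static String worstOffender(Map<String, Long> estimatedSizeByTable, long sizeWarnBytes)
    {
        String worstTable = null;
        long worstSize = -1;
        for (Map.Entry<String, Long> e : estimatedSizeByTable.entrySet())
        {
            if (e.getValue() > sizeWarnBytes && e.getValue() > worstSize)
            {
                worstTable = e.getKey();
                worstSize = e.getValue();
            }
        }
        return worstTable;
    }

    public static void main(String[] args)
    {
        Map<String, Long> sizes = Map.of("ks.small", 10L, "ks.big", 500L, "ks.bigger", 900L);
        if (!"ks.bigger".equals(worstOffender(sizes, 100L))) throw new AssertionError();
        if (worstOffender(sizes, 1000L) != null) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Note this still iterates every table, so it trades the early-exit optimization for more informative output.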

Member

We just set MessageParams.add(ParamType.WRITE_TOMBSTONE_WARN, (int) estimatedTombstones), not which table it relates to, and the actual number probably doesn't matter.

{
MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize);
noSpamLogger.warn("Write to {} partition {} triggered size warning; " +
"estimated size is {} bytes, threshold is {} bytes (see write_size_warn_threshold)",
Member

we should log the keyspace and table here


private static void validateWriteThreshold(String name, int value)
{
if (value < -1)
Member

we should validate that the write thresholds are larger than min_tracked_partition_size and min_tracked_partition_tombstone_count, otherwise we might miss some partitions we'd want to warn for.

We should also make sure that top_partitions_enabled is true if write_thresholds_enabled (or at least log a warning otherwise)
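The two checks requested above could look roughly like this. This is a hypothetical sketch: the parameter names (writeSizeWarnBytes, minTrackedPartitionSizeBytes, and so on) are stand-ins, not the real DatabaseDescriptor accessors, and -1 means "disabled" as in the rest of the patch.

```java
public class WriteThresholdValidation
{
    static void validate(long writeSizeWarnBytes, long minTrackedPartitionSizeBytes,
                         int writeTombstoneWarn, int minTrackedPartitionTombstoneCount,
                         boolean writeThresholdsEnabled, boolean topPartitionsEnabled)
    {
        // a warn threshold below the tracking floor can never fire for the
        // partitions that fall between the two values
        if (writeSizeWarnBytes != -1 && writeSizeWarnBytes < minTrackedPartitionSizeBytes)
            throw new IllegalArgumentException("write_size_warn_threshold must be >= min_tracked_partition_size");

        if (writeTombstoneWarn != -1 && writeTombstoneWarn < minTrackedPartitionTombstoneCount)
            throw new IllegalArgumentException("write_tombstone_warn_threshold must be >= min_tracked_partition_tombstone_count");

        // the warnings are driven by top-partition tracking, so they are inert without it
        if (writeThresholdsEnabled && !topPartitionsEnabled)
            System.err.println("write_thresholds_enabled is set but top_partitions_enabled is false; no write warnings will be emitted");
    }

    public static void main(String[] args)
    {
        validate(10_000_000, 1_000_000, 1000, 100, true, true); // sane config: no exception
        try
        {
            validate(500_000, 1_000_000, -1, 100, true, true);  // warn threshold below tracking floor
            throw new AssertionError("expected rejection");
        }
        catch (IllegalArgumentException expected) { }
        System.out.println("ok");
    }
}
```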

Author

Sure, I have fixed this with the new changes now!

protected abstract long totalWarnings();
protected abstract void assertWarnings(List<String> warnings);
protected abstract void populateTopPartitions(int pk, long value);
protected abstract void clearTopPartitions();
Member

the implementations of these methods are both empty; they can be removed

public static void update(IMutation mutation, WriteWarningsSnapshot snapshot)
{
if (snapshot.isEmpty())
{
Member

nit; no need for brackets when there is only a single statement


public static WriteWarningsSnapshot create(ThresholdCounter writeSize, ThresholdCounter writeTombstone)
{
if (writeSize == ThresholdCounter.empty() && writeTombstone == ThresholdCounter.empty())
Member

Maybe add a method isEmpty() in ThresholdCounter?
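The suggested helper might look like the following. This sketch assumes, as the snippet above implies, that ThresholdCounter keeps a shared EMPTY singleton; the field names are illustrative.

```java
public class ThresholdCounter
{
    static final ThresholdCounter EMPTY = new ThresholdCounter(0);
    final long count;

    ThresholdCounter(long count) { this.count = count; }

    static ThresholdCounter empty() { return EMPTY; }

    // reads better at the call site than comparing against empty() twice
    boolean isEmpty() { return this == EMPTY; }

    public static void main(String[] args)
    {
        if (!ThresholdCounter.empty().isEmpty()) throw new AssertionError();
        if (new ThresholdCounter(3).isEmpty()) throw new AssertionError();
        System.out.println("ok");
    }
}
```

With it, the condition above collapses to `writeSize.isEmpty() && writeTombstone.isEmpty()`.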

private @Nullable final Supplier<Mutation> hintOnFailure;
private volatile WriteWarningContext warningContext;
private static final AtomicReferenceFieldUpdater<AbstractWriteResponseHandler, WriteWarningContext> warningsUpdater
= AtomicReferenceFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, WriteWarningContext.class, "warningContext");
Member

nit; follow the code style here (see above, line 88/89 + 91/92)

{
if (other == null || other == EMPTY)
return this;
return WriteWarningsSnapshot.create(
Member

I think the recommended code style here is something like:

        return WriteWarningsSnapshot.create(writeSize.merge(other.writeSize), 
                                            writeTombstone.merge(other.writeTombstone));

{
MessageParams.reset();

boolean trackWriteWarnings = description instanceof Mutation && handler instanceof AbstractWriteResponseHandler;
Member

We should probably also check if DatabaseDescriptor.getWriteThresholdsEnabled() is true here, to avoid that slightly expensive block below

Author

@minal-kyada minal-kyada Feb 9, 2026

Ah, missed this important check! Thank you!


public void onResponse(Message<T> m)
{
if (m != null)
Member

Looks like m == null indicates a response from the local host; should we instead assume from = FBUtilities.getBroadcastAddressAndPort() when m == null in this block?

Author

@minal-kyada minal-kyada Feb 9, 2026

I doubt it, mainly for three reasons:

  1. For local writes, warning context is already updated before onResponse is called in StorageProxy
  2. By the time onResponse(null) is called, the params have already been captured
  3. We can't capture params again in onResponse because MessageParams will be reset in the finally block

Thoughts? I can add a clarifying comment if that helps.

Author

I mistakenly thought I was resetting the MessageParams before calling the handler method; since MessageParams are thread-local, they wouldn't have been accessible in the handler.

But I just realized that I reset them in the finally block, and the handler is called before that. So the handler can still read the MessageParams from the thread-local, and I can achieve unified handling.

I was skeptical about the thread-local working, but I tested the changes: the handler gets the MessageParams and they are propagated correctly to the caller. So it works fine.
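The ordering described above can be shown with a toy version of the flow. This is an illustrative sketch with made-up names, not the Cassandra classes: the handler runs inside the try block, before the finally-block reset of the thread-local params, so it can still copy them; after the reset they are gone.

```java
import java.util.HashMap;
import java.util.Map;

public class CaptureBeforeReset
{
    // stands in for the thread-local map backing MessageParams
    static final ThreadLocal<Map<String, Object>> PARAMS = ThreadLocal.withInitial(HashMap::new);
    static Map<String, Object> seenByHandler;

    // captures the params while they are still set on this thread
    static void handler() { seenByHandler = new HashMap<>(PARAMS.get()); }

    static void applyMutation()
    {
        try
        {
            PARAMS.get().put("WRITE_SIZE_WARN", 123L); // threshold check records a warning
            handler();                                  // runs before the reset below
        }
        finally
        {
            PARAMS.get().clear();                       // reset happens last
        }
    }

    public static void main(String[] args)
    {
        applyMutation();
        if (!seenByHandler.containsKey("WRITE_SIZE_WARN")) throw new AssertionError("handler missed the params");
        if (!PARAMS.get().isEmpty()) throw new AssertionError("params were not reset");
        System.out.println("ok");
    }
}
```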

Warnings warnings = STATE.mutable();
if (warnings == EMPTY) return;

for (PartitionUpdate update : mutation.getPartitionUpdates())
Member

can iterate mutation.getTableIds() here as well

But I think it is incorrect: if we write to multiple tables and only one of the tables warns, we'll set the same snapshot on all tables.

params = m != null ? m.header.params() : MessageParams.capture();

if (WriteWarningContext.isSupported(params.keySet()))
getWarningContext().updateCounters(params, from);
Member

this also needs to be done in DatacenterSyncWriteResponseHandler#onResponse otherwise EACH_QUORUM queries won't warn
