CASSANDRA-17258: Add write threshold warnings for large partitions and tombstones #4556
minal-kyada wants to merge 15 commits into apache:trunk
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java (outdated, resolved)
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java (outdated, resolved)
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java (outdated, resolved)
force-pushed from 19d42eb to e145c0b
if (!Paxos.isInRangeAndShouldProcess(from, agreed.update.partitionKey(), agreed.update.metadata(), false))
    return null;

WriteThresholds.checkWriteThresholds(agreed.update, agreed.update.partitionKey());
There was a problem hiding this comment.
I'm not sure this would actually work in practice:
if (executeOnSelf)
{
    ExecutorPlus executor = PAXOS_COMMIT_REQ.stage.executor();
    if (async) executor.execute(this::executeOnSelf);
    else executor.maybeExecuteImmediately(this::executeOnSelf);
}
In some code paths we do a blocking execute; in others we execute asynchronously and ignore when it completes.
There was a problem hiding this comment.
As discussed, we will hold off on making these changes in this patch.
src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java (resolved)
src/java/org/apache/cassandra/service/writes/thresholds/WarnCounter.java (resolved)
force-pushed from 803344e to d49ca1c
src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java (outdated, resolved)
force-pushed from 0763316 to e8941a9
src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java (outdated, resolved)
src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java (outdated, resolved)
test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java (resolved)
test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java (resolved)
test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java (outdated, resolved)
test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java (outdated, resolved)
force-pushed from 64ebdb9 to b918756
src/java/org/apache/cassandra/service/reads/thresholds/CoordinatorWarnings.java (outdated, resolved)
src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java (outdated, resolved)
src/java/org/apache/cassandra/service/writes/thresholds/WarnCounter.java (outdated, resolved)
src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshot.java (outdated, resolved)
src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java (resolved)
src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java (outdated, resolved)
src/java/org/apache/cassandra/service/WriteResponseHandler.java (outdated, resolved)
test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java (outdated, resolved)
...tributed/org/apache/cassandra/distributed/test/thresholds/AbstractWriteThresholdWarning.java (outdated, resolved)
@Test
public void testMergeSelfWithSelf()
{
    qt().forAll(all()).check(snapshot -> snapshot.merge(snapshot).equals(snapshot));

@Test
public void testMergeCommutative()
{
    qt().forAll(all(), all()).check((a, b) -> a.merge(b).equals(b.merge(a)));
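For illustration: the two properties tested above (merging a snapshot with itself is a no-op, and merge order does not matter) hold automatically when merge is built from set union and maximum. The sketch below uses a hypothetical `Counter` type that is not the patch's `WriteWarningsSnapshot`; the fields (`instances`, `maxValue`) are assumptions chosen only to show the idea.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a warning counter; not a class from the patch.
final class Counter
{
    final Set<String> instances; // replicas that reported a warning
    final long maxValue;         // largest value reported by any replica

    Counter(Set<String> instances, long maxValue)
    {
        this.instances = Collections.unmodifiableSet(new TreeSet<>(instances));
        this.maxValue = maxValue;
    }

    // Set union and Math.max are both idempotent and commutative, so merge is too.
    Counter merge(Counter other)
    {
        Set<String> union = new TreeSet<>(instances);
        union.addAll(other.instances);
        return new Counter(union, Math.max(maxValue, other.maxValue));
    }

    @Override
    public boolean equals(Object o)
    {
        if (!(o instanceof Counter)) return false;
        Counter that = (Counter) o;
        return maxValue == that.maxValue && instances.equals(that.instances);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(instances, maxValue);
    }
}
```

A merge that summed counts instead of taking a union or maximum would fail the self-merge property, which is the kind of regression these property tests are meant to catch.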
dcapwell left a comment:
My approval assumes we fix https://github.com/apache/cassandra/pull/4556/changes#r2748568224 and the other comment in that file
force-pushed from b918756 to edf8225
dcapwell left a comment:
+1 to latest changes.
+1 to the patch
long sizeWarnBytes = sizeWarnThreshold != null ? sizeWarnThreshold.toBytes() : -1;

for (PartitionUpdate update : mutation.getPartitionUpdates())
There was a problem hiding this comment.
Since we only need to check once per table, I think we can iterate over mutation.getTableIds() here instead; the number of updated tables is at most the total number of updates.
Also, once we have warned for both tombstones and size, we can stop iterating.
There was a problem hiding this comment.
I am checking partition-level thresholds, not table-level ones. If I iterate over mutation.getTableIds(), won't the context change completely?
There was a problem hiding this comment.
All PartitionUpdates in a Mutation are for the same partition, so we can just grab mutation.key() and check that for the tables
There was a problem hiding this comment.
public static void checkWriteThresholds(Mutation mutation)
{
    if (!DatabaseDescriptor.isDaemonInitialized() || !DatabaseDescriptor.getWriteThresholdsEnabled())
        return;
    DataStorageSpec.LongBytesBound sizeWarnThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold();
    int tombstoneWarnThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold();
    if (sizeWarnThreshold == null && tombstoneWarnThreshold == -1)
        return;
    long sizeWarnBytes = sizeWarnThreshold != null ? sizeWarnThreshold.toBytes() : -1;
    DecoratedKey key = mutation.key();
    boolean warnedSize = false;
    boolean warnedTombstone = false;
    for (TableId tableId : mutation.getTableIds())
    {
        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tableId);
        if (cfs == null || cfs.topPartitions == null)
            continue;
        TableMetadata metadata = cfs.metadata();
        if (sizeWarnBytes != -1 && !warnedSize)
        {
            long estimatedSize = cfs.topPartitions.topSizes().getEstimate(key);
            if (estimatedSize > sizeWarnBytes)
            {
                MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize);
                noSpamLogger.warn("Write to {} partition {} triggered size warning; " +
                                  "estimated size is {} bytes, threshold is {} bytes (see write_size_warn_threshold)",
                                  metadata, metadata.partitionKeyType.toCQLString(key.getKey()), estimatedSize, sizeWarnBytes);
                warnedSize = true;
            }
        }
        if (tombstoneWarnThreshold != -1 && !warnedTombstone)
        {
            long estimatedTombstones = cfs.topPartitions.topTombstones().getEstimate(key);
            if (estimatedTombstones > tombstoneWarnThreshold)
            {
                MessageParams.add(ParamType.WRITE_TOMBSTONE_WARN, (int) estimatedTombstones);
                noSpamLogger.warn("Write to {} partition {} triggered tombstone warning; " +
                                  "estimated tombstone count is {}, threshold is {} (see write_tombstone_warn_threshold)",
                                  metadata, metadata.partitionKeyType.toCQLString(key.getKey()), estimatedTombstones, tombstoneWarnThreshold);
                warnedTombstone = true;
            }
        }
    }
}
I was wondering: once it warns for any table, the flag is set and it won't warn for subsequent tables. However, in the existing code, we ensure that we always report the worst offender across all partition updates.
I thought that if a mutation touches multiple tables, we'd want to know about the biggest problem. If the goal is just to warn once irrespective of who the culprit is, then yes, we can proceed with these changes. Thoughts?
There was a problem hiding this comment.
We only set MessageParams.add(ParamType.WRITE_TOMBSTONE_WARN, (int) estimatedTombstones), not which table it relates to, and the actual number probably doesn't matter.
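To make the trade-off in this thread concrete, here is a small sketch contrasting the two policies: stop at the first table whose estimate exceeds the threshold, or scan all tables and report the largest. The class `WarnSelection`, its method names, and the table names are hypothetical and only model the per-table estimates as a map; nothing here is from the patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical model: per-table estimates for one partition key, in iteration order.
final class WarnSelection
{
    // Policy 1: warn once, using the first table whose estimate exceeds the threshold.
    static OptionalLong firstOffender(Map<String, Long> estimates, long threshold)
    {
        for (long estimate : estimates.values())
            if (estimate > threshold)
                return OptionalLong.of(estimate);
        return OptionalLong.empty();
    }

    // Policy 2: scan every table and report the largest estimate above the threshold.
    static OptionalLong worstOffender(Map<String, Long> estimates, long threshold)
    {
        return estimates.values().stream()
                        .mapToLong(Long::longValue)
                        .filter(estimate -> estimate > threshold)
                        .max();
    }

    public static void main(String[] args)
    {
        Map<String, Long> estimates = new LinkedHashMap<>();
        estimates.put("ks.t1", 150L);
        estimates.put("ks.t2", 900L);
        System.out.println(firstOffender(estimates, 100));
        System.out.println(worstOffender(estimates, 100));
    }
}
```

Both policies agree on whether a warning is raised at all; they differ only in the value attached to it, which is the point made above about the reported number.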
{
    MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize);
    noSpamLogger.warn("Write to {} partition {} triggered size warning; " +
                      "estimated size is {} bytes, threshold is {} bytes (see write_size_warn_threshold)",
There was a problem hiding this comment.
we should log the keyspace and table here
private static void validateWriteThreshold(String name, int value)
{
    if (value < -1)
There was a problem hiding this comment.
We should validate that the write thresholds are larger than min_tracked_partition_size and min_tracked_partition_tombstone_count; otherwise we might miss some partitions we'd want to warn for.
We should also make sure that top_partitions_enabled is true if write_thresholds_enabled is (or at least log a warning otherwise).
There was a problem hiding this comment.
Sure, I have fixed this with the new changes now!
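A minimal sketch of the kind of check requested above, assuming the convention visible in the patch that -1 disables a threshold. The class `WriteThresholdValidation`, the method signature, and the choice to accept a threshold equal to the tracking minimum are all assumptions for illustration; the actual fix in the patch may differ.

```java
// Hypothetical validation helper; not the code that was added to the patch.
final class WriteThresholdValidation
{
    // Returns an error message, or null when the combination is acceptable.
    // warnThreshold == -1 means "disabled", so no further checks apply.
    static String validate(String name, long warnThreshold, long minTracked)
    {
        if (warnThreshold < -1)
            return name + " must be -1 (disabled) or non-negative, got " + warnThreshold;
        // A partition below minTracked is never tracked, so a lower warn threshold
        // would leave a range of partitions that can never trigger a warning.
        if (warnThreshold != -1 && warnThreshold < minTracked)
            return name + " (" + warnThreshold + ") is below the tracking minimum (" + minTracked + ")";
        return null;
    }
}
```

Whether a violation should reject the configuration or only log a warning is a policy choice left open here, as it is in the comment above for top_partitions_enabled.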
protected abstract long totalWarnings();
protected abstract void assertWarnings(List<String> warnings);
protected abstract void populateTopPartitions(int pk, long value);
protected abstract void clearTopPartitions();
There was a problem hiding this comment.
The implementations of this method are both empty; it can be removed.
public static void update(IMutation mutation, WriteWarningsSnapshot snapshot)
{
    if (snapshot.isEmpty())
    {
There was a problem hiding this comment.
nit: no need for braces when there is only a single statement
public static WriteWarningsSnapshot create(ThresholdCounter writeSize, ThresholdCounter writeTombstone)
{
    if (writeSize == ThresholdCounter.empty() && writeTombstone == ThresholdCounter.empty())
There was a problem hiding this comment.
Maybe add a method isEmpty() in ThresholdCounter?
private @Nullable final Supplier<Mutation> hintOnFailure;
private volatile WriteWarningContext warningContext;
private static final AtomicReferenceFieldUpdater<AbstractWriteResponseHandler, WriteWarningContext> warningsUpdater
    = AtomicReferenceFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, WriteWarningContext.class, "warningContext");
There was a problem hiding this comment.
nit; follow the code style here (see above, line 88/89 + 91/92)
{
    if (other == null || other == EMPTY)
        return this;
    return WriteWarningsSnapshot.create(
There was a problem hiding this comment.
I think the recommended code style here is something like:
return WriteWarningsSnapshot.create(writeSize.merge(other.writeSize),
                                    writeTombstone.merge(other.writeTombstone));
{
    MessageParams.reset();

    boolean trackWriteWarnings = description instanceof Mutation && handler instanceof AbstractWriteResponseHandler;
There was a problem hiding this comment.
We should probably also check if DatabaseDescriptor.getWriteThresholdsEnabled() is true here, to avoid that slightly expensive block below
There was a problem hiding this comment.
Ah, missed this important check! Thank you!
public void onResponse(Message<T> m)
{
    if (m != null)
There was a problem hiding this comment.
looks like m == null indicates a response from the local host, should we instead assume from = FBUtilities.getBroadcastAddressAndPort() if m==null in this block?
There was a problem hiding this comment.
I doubt it, mainly for three reasons:
- For local writes, the warning context is already updated before onResponse is called in StorageProxy
- By the time onResponse(null) is called, the params have already been captured
- We can't capture params again in onResponse because MessageParams will be reset in the finally block
Thoughts? I can add a clarifying comment if that helps.
There was a problem hiding this comment.
I mistakenly thought that I was resetting MessageParams before calling the handler method; since MessageParams is thread-local, it would then not have been accessible in the handler.
But I just realized that I reset it in the finally block and the handler is called before that, so the handler can still read MessageParams from the thread-local, and I can achieve unified handling.
I was skeptical that the thread-local would work, but I tested the changes.
The handler gets the MessageParams and they are propagated correctly to the caller, so it works fine.
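The ordering described above can be sketched as follows: the handler runs on the same thread as the local write and before the finally block, so it still sees the thread-local params. `ThreadLocalParamsDemo` and its methods are hypothetical stand-ins that imitate a thread-local parameter bag; they are not the `MessageParams` API, and the sketch assumes the handler is invoked synchronously on the writing thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical imitation of a thread-local parameter bag; not Cassandra's MessageParams.
final class ThreadLocalParamsDemo
{
    private static final ThreadLocal<Map<String, Object>> PARAMS = ThreadLocal.withInitial(HashMap::new);

    static void add(String key, Object value) { PARAMS.get().put(key, value); }
    static Map<String, Object> capture() { return new HashMap<>(PARAMS.get()); }
    static void reset() { PARAMS.get().clear(); }

    // Runs a local write, then calls the handler on the same thread before the reset.
    static void applyLocally(Runnable write, Consumer<Map<String, Object>> handler)
    {
        reset();
        try
        {
            write.run();               // the write may add warning params
            handler.accept(capture()); // same thread, not yet reset: params are visible
        }
        finally
        {
            reset();                   // after this point the params are gone
        }
    }
}
```

If the handler were instead run on another thread, or after the finally block, `capture()` would return an empty map, so this pattern depends on the synchronous call order.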
force-pushed from 081deed to 13e5a50
Warnings warnings = STATE.mutable();
if (warnings == EMPTY) return;

for (PartitionUpdate update : mutation.getPartitionUpdates())
There was a problem hiding this comment.
can iterate mutation.getTableIds() here as well
But I think it is incorrect - if we write to multiple tables and only one of the tables warns, we'll set the same snapshot on all tables
params = m != null ? m.header.params() : MessageParams.capture();

if (WriteWarningContext.isSupported(params.keySet()))
    getWarningContext().updateCounters(params, from);
There was a problem hiding this comment.
this also needs to be done in DatacenterSyncWriteResponseHandler#onResponse otherwise EACH_QUORUM queries won't warn
Summary
Implements replica-side write threshold warnings to detect and warn about writes to large partitions or partitions with many tombstones. This follows the same architecture as the existing read threshold warnings but is warning-only (no blocking behavior).
Related Work
This implementation follows the pattern established by read threshold warnings but adapts it for write operations with a warning-only approach.
patch by Minal Kyada; reviewed by TBD for CASSANDRA-17258