diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 18977902b03b..e5b1c72ed4c8 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -535,14 +535,19 @@ void ExportPartitionManifestUpdatingTask::poll() ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetWatch); - std::string status; - if (!zk->tryGetWatch(fs::path(entry_path) / "status", status, nullptr, status_watch_callback)) + std::string status_string; + if (!zk->tryGetWatch(fs::path(entry_path) / "status", status_string, nullptr, status_watch_callback)) { LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing status", key); continue; } - bool is_pending = status == "PENDING"; + const auto status = magic_enum::enum_cast(status_string); + if (!status) + { + LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Invalid status {} for task {}, skipping", status_string, key); + continue; + } /// if we have the cleanup lock, try to cleanup /// if we successfully cleaned it up, early exit @@ -556,25 +561,20 @@ void ExportPartitionManifestUpdatingTask::poll() key, metadata, now, - is_pending, entries_by_key); + *status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, + entries_by_key); if (cleanup_successful) continue; } - if (!is_pending) - { - LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: status is not PENDING", key); - continue; - } - if (has_local_entry_and_is_up_to_date) { LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key); continue; } - addTask(metadata, key, entries_by_key); + addTask(metadata, *status, key, entries_by_key); } /// Remove entries that were deleted by someone else @@ -587,22 +587,30 @@ void ExportPartitionManifestUpdatingTask::poll() void ExportPartitionManifestUpdatingTask::addTask( const ExportReplicatedMergeTreePartitionManifest & metadata, + ExportReplicatedMergeTreePartitionTaskEntry::Status status, const std::string & key, auto & entries_by_key ) { std::vector part_references; - for (const auto & part_name : metadata.parts) + /// If the status is PENDING, we grab references to the data parts to prevent them from being deleted from the disk + /// Otherwise, the operation has already been completed and there is no need to keep the data parts alive + /// You might also ask: why bother adding tasks that have already been completed (i.e, status != PENDING)? + /// The reason is the `replicated_partition_exports` table in the local only mode might miss entries if they are not added here. + if (status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) { - if (const auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated})) + for (const auto & part_name : metadata.parts) { - part_references.push_back(part); + if (const auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated})) + { + part_references.push_back(part); + } } } /// Insert or update entry. The multi_index container automatically maintains both indexes. - auto entry = ExportReplicatedMergeTreePartitionTaskEntry {metadata, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, std::move(part_references)}; + auto entry = ExportReplicatedMergeTreePartitionTaskEntry {metadata, status, std::move(part_references)}; auto it = entries_by_key.find(key); if (it != entries_by_key.end()) entries_by_key.replace(it, entry); diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h index 99078e486cb3..855ecc334c09 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB { @@ -31,6 +32,7 @@ class ExportPartitionManifestUpdatingTask void addTask( const ExportReplicatedMergeTreePartitionManifest & metadata, + ExportReplicatedMergeTreePartitionTaskEntry::Status status, const std::string & key, auto & entries_by_key );