Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,14 +535,19 @@ void ExportPartitionManifestUpdatingTask::poll()

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetWatch);
std::string status;
if (!zk->tryGetWatch(fs::path(entry_path) / "status", status, nullptr, status_watch_callback))
std::string status_string;
if (!zk->tryGetWatch(fs::path(entry_path) / "status", status_string, nullptr, status_watch_callback))
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing status", key);
continue;
}

bool is_pending = status == "PENDING";
const auto status = magic_enum::enum_cast<ExportReplicatedMergeTreePartitionTaskEntry::Status>(status_string);
if (!status)
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Invalid status {} for task {}, skipping", status_string, key);
continue;
}

/// if we have the cleanup lock, try to cleanup
/// if we successfully cleaned it up, early exit
Expand All @@ -556,25 +561,20 @@ void ExportPartitionManifestUpdatingTask::poll()
key,
metadata,
now,
is_pending, entries_by_key);
*status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING,
entries_by_key);

if (cleanup_successful)
continue;
}

if (!is_pending)
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: status is not PENDING", key);
continue;
}

if (has_local_entry_and_is_up_to_date)
{
LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key);
continue;
}

addTask(metadata, key, entries_by_key);
addTask(metadata, *status, key, entries_by_key);
}

/// Remove entries that were deleted by someone else
Expand All @@ -587,22 +587,30 @@ void ExportPartitionManifestUpdatingTask::poll()

void ExportPartitionManifestUpdatingTask::addTask(
const ExportReplicatedMergeTreePartitionManifest & metadata,
ExportReplicatedMergeTreePartitionTaskEntry::Status status,
const std::string & key,
auto & entries_by_key
)
{
std::vector<DataPartPtr> part_references;

for (const auto & part_name : metadata.parts)
/// If the status is PENDING, we grab references to the data parts to prevent them from being deleted from the disk
/// Otherwise, the operation has already been completed and there is no need to keep the data parts alive
/// You might also ask: why bother adding tasks that have already been completed (i.e, status != PENDING)?
/// The reason is the `replicated_partition_exports` table in the local only mode might miss entries if they are not added here.
if (status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING)
{
if (const auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}))
for (const auto & part_name : metadata.parts)
{
part_references.push_back(part);
if (const auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}))
{
part_references.push_back(part);
}
}
}

/// Insert or update entry. The multi_index container automatically maintains both indexes.
auto entry = ExportReplicatedMergeTreePartitionTaskEntry {metadata, ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, std::move(part_references)};
auto entry = ExportReplicatedMergeTreePartitionTaskEntry {metadata, status, std::move(part_references)};
auto it = entries_by_key.find(key);
if (it != entries_by_key.end())
entries_by_key.replace(it, entry);
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <string>
#include <unordered_set>
#include <Storages/System/StorageSystemReplicatedPartitionExports.h>
#include <Storages/ExportReplicatedMergeTreePartitionTaskEntry.h>
namespace DB
{

Expand All @@ -31,6 +32,7 @@ class ExportPartitionManifestUpdatingTask

void addTask(
const ExportReplicatedMergeTreePartitionManifest & metadata,
ExportReplicatedMergeTreePartitionTaskEntry::Status status,
const std::string & key,
auto & entries_by_key
);
Expand Down
Loading