Skip to content
Open
6 changes: 6 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ class IDataLakeMetadata : boost::noncopyable

virtual void modifyFormatSettings(FormatSettings &, const Context &) const {}

/// Whether this metadata implementation can handle TRUNCATE TABLE.
virtual bool supportsTruncate() const { return false; }
/// Remove all rows from the table (for Iceberg-like lakes: by committing a new,
/// empty snapshot). Default implementation rejects the operation; overriders
/// must also override supportsTruncate() to advertise support.
virtual void truncate(ContextPtr /*context*/, std::shared_ptr<DataLake::ICatalog> /*catalog*/, const StorageID & /*storage_id*/)
{
    throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncation is not supported by {} metadata", getName());
}

static constexpr bool supportsTotalRows() { return false; }
virtual std::optional<size_t> totalRows(ContextPtr) const { return {}; }
static constexpr bool supportsTotalBytes() { return false; }
Expand Down
82 changes: 81 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
#include <Core/UUID.h>
#include <DataTypes/DataTypeSet.h>
#include <Formats/FormatFilterInfo.h>
#include <Formats/FormatParserSharedResources.h>
#include <Formats/ReadSchemaUtils.h>
#include <Functions/FunctionFactory.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/tuple.h>
#include <Processors/Formats/ISchemaReader.h>
Expand Down Expand Up @@ -531,6 +531,86 @@ void IcebergMetadata::mutate(
);
}

/// Truncate the Iceberg table by committing a new snapshot that references an
/// empty manifest list (i.e. zero data files), then writing the bumped
/// metadata.json and — when a catalog is attached — committing the new
/// metadata location to it.
void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id)
{
    /// Truncate rides on the experimental Iceberg write path, so it is gated
    /// behind the same setting as INSERT.
    if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value)
    {
        throw Exception(
            ErrorCodes::SUPPORT_IS_DISABLED,
            "Iceberg truncate is experimental. "
            "To allow its usage, enable setting allow_experimental_insert_into_iceberg");
    }

    /// Load the current table state; the new (empty) snapshot is chained onto it.
    auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context);
    auto metadata_object = getMetadataJSONObject(
        actual_table_state_snapshot.metadata_file_path,
        object_storage,
        persistent_components.metadata_cache,
        context,
        log,
        persistent_components.metadata_compression_method,
        persistent_components.table_uuid);

    /// 0 is used as the parent id when the table has no snapshot yet.
    Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(0);

    /// Normalize the table path to the leading/trailing-slash form expected by
    /// FileNamesGenerator.
    auto config_path = persistent_components.table_path;
    if (config_path.empty() || config_path.back() != '/')
        config_path += "/";
    if (!config_path.starts_with('/'))
        config_path = '/' + config_path;

    FileNamesGenerator filename_generator;
    if (!context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata])
    {
        filename_generator = FileNamesGenerator(
            config_path, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format);
    }
    else
    {
        /// Full-path mode: prefix generated file names with the table's
        /// "location" taken from the current metadata.
        auto bucket = metadata_object->getValue<String>(Iceberg::f_location);
        if (bucket.empty() || bucket.back() != '/')
            bucket += "/";
        filename_generator = FileNamesGenerator(
            bucket, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format);
    }

    /// Bump the metadata version and derive the new metadata file names.
    Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1;
    filename_generator.setVersion(new_metadata_version);
    auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName();

    /// Append a snapshot with zero added files/records/bytes/partitions/deletes
    /// to metadata_object; the returned names identify its manifest list.
    auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata(
        filename_generator, metadata_name, parent_snapshot_id,
        /* added_files */ 0, /* added_records */ 0, /* added_files_size */ 0,
        /* num_partitions */ 0, /* added_delete_files */ 0, /* num_deleted_rows */ 0);

    /// The manifest-list layout differs between Iceberg format v1 and v2;
    /// per the spec, a missing "format-version" field means v1.
    int format_version = 1;
    if (metadata_object->has("format-version"))
        format_version = metadata_object->getValue<int>("format-version");
    bool is_v2 = (format_version == 2);

    /// Write a manifest list referencing no manifest files: the new snapshot
    /// thus contains no data, which is exactly what truncation means.
    auto write_settings = context->getWriteSettings();
    auto buf = object_storage->writeObject(
        StoredObject(storage_manifest_list_name),
        WriteMode::Rewrite,
        std::nullopt,
        DBMS_DEFAULT_BUFFER_SIZE,
        write_settings
    );
    generateManifestList(filename_generator, metadata_object, object_storage, context, {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, is_v2);
    buf->finalize();

    /// Persist the updated metadata.json and, for catalog-managed tables,
    /// commit the metadata-pointer switch. A failed catalog commit must be
    /// surfaced — otherwise readers would keep seeing the old, non-truncated
    /// snapshot while the storage already holds orphaned files.
    String metadata_content = dumpMetadataObjectToString(metadata_object);
    writeMessageToFile(metadata_content, storage_metadata_name, object_storage, context, "*", "", persistent_components.metadata_compression_method);
    if (catalog)
    {
        const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName());
        bool success = catalog->updateMetadata(namespace_name, table_name, storage_metadata_name, metadata_object);
        if (!success)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to commit Iceberg truncate update to catalog.");
    }
}

void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands)
{
for (const auto & command : commands)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class IcebergMetadata : public IDataLakeMetadata
bool supportsUpdate() const override { return true; }
bool supportsWrites() const override { return true; }
bool supportsParallelInsert() const override { return true; }
/// Iceberg tables can be truncated by committing an empty snapshot.
bool supportsTruncate() const override { return true; }

/// Commit a new snapshot referencing an empty manifest list, removing all rows;
/// publishes the new metadata to `catalog` when one is attached.
void truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) override;

IcebergHistory getHistory(ContextPtr local_context) const;

Expand Down
10 changes: 7 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ void StorageObjectStorage::commitExportPartitionTransaction(const String & trans
void StorageObjectStorage::truncate(
const ASTPtr & /* query */,
const StorageMetadataPtr & /* metadata_snapshot */,
ContextPtr /* context */,
ContextPtr context,
TableExclusiveLockHolder & /* table_holder */)
{
const auto path = configuration->getRawPath();
Expand All @@ -639,8 +639,12 @@ void StorageObjectStorage::truncate(

if (configuration->isDataLakeConfiguration())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Truncate is not supported for data lake engine");
auto * data_lake_metadata = getExternalMetadata(context);
if (!data_lake_metadata || !data_lake_metadata->supportsTruncate())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine");

data_lake_metadata->truncate(context, catalog, getStorageID());
return;
}

if (path.hasGlobs())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python3

from pyiceberg.catalog import load_catalog
from helpers.config_cluster import minio_secret_key, minio_access_key
import uuid
import pyarrow as pa
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import LongType, StringType
from pyiceberg.partitioning import PartitionSpec

# REST catalog endpoint as reachable from the test-runner host
# (presumably the port published by the docker cluster — confirm against cluster config).
BASE_URL_LOCAL_RAW = "http://localhost:8182"
# Name under which the REST catalog is registered.
CATALOG_NAME = "demo"

def load_catalog_impl(started_cluster):
    """Connect to the REST Iceberg catalog, pointing S3 access at the cluster's MinIO."""
    minio_ip = started_cluster.get_instance_ip("minio")
    catalog_properties = {
        "uri": BASE_URL_LOCAL_RAW,
        "type": "rest",
        "s3.endpoint": f"http://{minio_ip}:9000",
        "s3.access-key-id": minio_access_key,
        "s3.secret-access-key": minio_secret_key,
    }
    return load_catalog(CATALOG_NAME, **catalog_properties)


def test_iceberg_truncate(started_cluster_iceberg_no_spark):
    """TRUNCATE TABLE on an Iceberg table via ClickHouse, validated cross-engine.

    Creates a table through PyIceberg, populates it, truncates it through
    ClickHouse's DataLakeCatalog engine, and asserts both engines see an
    empty table afterwards.
    """
    instance = started_cluster_iceberg_no_spark.instances["node1"]
    catalog = load_catalog_impl(started_cluster_iceberg_no_spark)

    # 1. Setup PyIceberg Namespace and Table (unique namespace so reruns don't collide)
    namespace = f"clickhouse_truncate_{uuid.uuid4().hex}"
    catalog.create_namespace(namespace)

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="val", field_type=StringType(), required=False),
    )

    table_name = "test_truncate"
    table = catalog.create_table(
        identifier=f"{namespace}.{table_name}",
        schema=schema,
        location=f"s3://warehouse-rest/{namespace}.{table_name}",
        partition_spec=PartitionSpec(),
    )

    # 2. Populate Data
    df = pa.Table.from_pylist([
        {"id": 1, "val": "A"},
        {"id": 2, "val": "B"},
        {"id": 3, "val": "C"},
    ])
    table.append(df)

    # Validate data is in iceberg
    assert len(table.scan().to_arrow()) == 3

    # 3. Setup ClickHouse Database
    # Use the shared credential constant instead of a hardcoded 'minio' literal,
    # consistent with load_catalog_impl above.
    instance.query(f"DROP DATABASE IF EXISTS {namespace}")
    instance.query(
        f"""
CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', '{minio_access_key}', '{minio_secret_key}')
SETTINGS
    catalog_type='rest',
    warehouse='demo',
    storage_endpoint='http://minio:9000/warehouse-rest';
""",
        settings={"allow_database_iceberg": 1}
    )

    # 4. Formulate the ClickHouse Table Identifier
    # MUST wrap the inner table name in backticks so ClickHouse parses the Iceberg namespace correctly
    ch_table_identifier = f"`{namespace}.{table_name}`"

    # Assert data from ClickHouse
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 3

    # 5. Truncate Table via ClickHouse (gated behind the experimental write setting)
    instance.query(
        f"TRUNCATE TABLE {namespace}.{ch_table_identifier}",
        settings={"allow_experimental_insert_into_iceberg": 1}
    )

    # Assert truncated from ClickHouse
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0

    # 6. Cross-Engine Validation using PyIceberg
    # Refresh table state to pick up the new v<N>.metadata.json written by ClickHouse
    table.refresh()

    # Assert PyIceberg reads the empty snapshot successfully
    assert len(table.scan().to_arrow()) == 0

    # Cleanup
    instance.query(f"DROP DATABASE {namespace}")
Loading