diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 8f3542165eb7..f58bf8d04dcd 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -144,6 +144,12 @@ class IDataLakeMetadata : boost::noncopyable virtual void modifyFormatSettings(FormatSettings &, const Context &) const {} + virtual bool supportsTruncate() const { return false; } + virtual void truncate(ContextPtr /*context*/, std::shared_ptr<DataLake::ICatalog> /*catalog*/, const StorageID & /*storage_id*/) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncation is not supported by {} metadata", getName()); + } + static constexpr bool supportsTotalRows() { return false; } virtual std::optional<size_t> totalRows(ContextPtr) const { return {}; } static constexpr bool supportsTotalBytes() { return false; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 58f2926cc07e..73e811bb5e85 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -9,9 +9,9 @@ #include #include #include -#include #include #include +#include #include #include #include @@ -531,6 +531,86 @@ void IcebergMetadata::mutate( ); } +void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) +{ + if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value) + { + throw Exception( + ErrorCodes::SUPPORT_IS_DISABLED, + "Iceberg truncate is experimental. 
" + "To allow its usage, enable setting allow_experimental_insert_into_iceberg"); + } + + auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context); + auto metadata_object = getMetadataJSONObject( + actual_table_state_snapshot.metadata_file_path, + object_storage, + persistent_components.metadata_cache, + context, + log, + persistent_components.metadata_compression_method, + persistent_components.table_uuid); + + Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(0); + auto config_path = persistent_components.table_path; + if (config_path.empty() || config_path.back() != '/') + config_path += "/"; + if (!config_path.starts_with('/')) + config_path = '/' + config_path; + + FileNamesGenerator filename_generator; + if (!context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata]) + { + filename_generator = FileNamesGenerator( + config_path, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format); + } + else + { + auto bucket = metadata_object->getValue(Iceberg::f_location); + if (bucket.empty() || bucket.back() != '/') + bucket += "/"; + filename_generator = FileNamesGenerator( + bucket, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format); + } + + Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1; + filename_generator.setVersion(new_metadata_version); + auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName(); + + auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata( + filename_generator, metadata_name, parent_snapshot_id, + /* added_files */ 0, /* added_records */ 0, /* added_files_size */ 0, + /* num_partitions */ 0, /* added_delete_files */ 0, /* num_deleted_rows */ 0); + + // generate manifest list with 0 manifest files + 
int format_version = 1; + if (metadata_object->has("format-version")) + format_version = metadata_object->getValue<Int32>("format-version"); + bool is_v2 = (format_version == 2); + + // generate manifest list with 0 manifest files + auto write_settings = context->getWriteSettings(); + auto buf = object_storage->writeObject( + StoredObject(storage_manifest_list_name), + WriteMode::Rewrite, + std::nullopt, + DBMS_DEFAULT_BUFFER_SIZE, + write_settings + ); + generateManifestList(filename_generator, metadata_object, object_storage, context, {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, is_v2); + buf->finalize(); + + String metadata_content = dumpMetadataObjectToString(metadata_object); + writeMessageToFile(metadata_content, storage_metadata_name, object_storage, context, "*", "", persistent_components.metadata_compression_method); + if (catalog) + { + const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName()); + bool success = catalog->updateMetadata(namespace_name, table_name, storage_metadata_name, metadata_object); + if (!success) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to commit Iceberg truncate update to catalog."); + } +} + void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands) { for (const auto & command : commands) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index dcb2b91131bd..9f1b0408d156 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -83,6 +83,9 @@ class IcebergMetadata : public IDataLakeMetadata bool supportsUpdate() const override { return true; } bool supportsWrites() const override { return true; } bool supportsParallelInsert() const override { return true; } + bool supportsTruncate() const override { return true; } + + void truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, 
const StorageID & storage_id) override; IcebergHistory getHistory(ContextPtr local_context) const; diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index f04be5d5f946..7fe3ce25debe 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -625,7 +625,7 @@ void StorageObjectStorage::commitExportPartitionTransaction(const String & trans void StorageObjectStorage::truncate( const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, - ContextPtr /* context */, + ContextPtr context, TableExclusiveLockHolder & /* table_holder */) { const auto path = configuration->getRawPath(); @@ -639,8 +639,12 @@ void StorageObjectStorage::truncate( if (configuration->isDataLakeConfiguration()) { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, - "Truncate is not supported for data lake engine"); + auto * data_lake_metadata = getExternalMetadata(context); + if (!data_lake_metadata || !data_lake_metadata->supportsTruncate()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine"); + + data_lake_metadata->truncate(context, catalog, getStorageID()); + return; } if (path.hasGlobs()) diff --git a/tests/integration/test_storage_iceberg_no_spark/test_iceberg_truncate.py b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_truncate.py new file mode 100644 index 000000000000..42465bb205ad --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_iceberg_truncate.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 + +from pyiceberg.catalog import load_catalog +from helpers.config_cluster import minio_secret_key, minio_access_key +import uuid +import pyarrow as pa +from pyiceberg.schema import Schema, NestedField +from pyiceberg.types import LongType, StringType +from pyiceberg.partitioning import PartitionSpec + +BASE_URL_LOCAL_RAW = "http://localhost:8182" +CATALOG_NAME = "demo" 
+ +def load_catalog_impl(started_cluster): + return load_catalog( + CATALOG_NAME, + **{ + "uri": BASE_URL_LOCAL_RAW, + "type": "rest", + "s3.endpoint": f"http://{started_cluster.get_instance_ip('minio')}:9000", + "s3.access-key-id": minio_access_key, + "s3.secret-access-key": minio_secret_key, + }, + ) + + +def test_iceberg_truncate(started_cluster_iceberg_no_spark): + instance = started_cluster_iceberg_no_spark.instances["node1"] + catalog = load_catalog_impl(started_cluster_iceberg_no_spark) + + # 1. Setup PyIceberg Namespace and Table + namespace = f"clickhouse_truncate_{uuid.uuid4().hex}" + catalog.create_namespace(namespace) + + schema = Schema( + NestedField(field_id=1, name="id", field_type=LongType(), required=False), + NestedField(field_id=2, name="val", field_type=StringType(), required=False), + ) + + table_name = "test_truncate" + table = catalog.create_table( + identifier=f"{namespace}.{table_name}", + schema=schema, + location=f"s3://warehouse-rest/{namespace}.{table_name}", + partition_spec=PartitionSpec(), + ) + + # 2. Populate Data + df = pa.Table.from_pylist([ + {"id": 1, "val": "A"}, + {"id": 2, "val": "B"}, + {"id": 3, "val": "C"}, + ]) + table.append(df) + + # Validate data is in iceberg + assert len(table.scan().to_arrow()) == 3 + + # 3. Setup ClickHouse Database + instance.query(f"DROP DATABASE IF EXISTS {namespace}") + instance.query( + f""" + CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}') + SETTINGS + catalog_type='rest', + warehouse='demo', + storage_endpoint='http://minio:9000/warehouse-rest'; + """, + settings={"allow_database_iceberg": 1} + ) + + # 4. 
Formulate the ClickHouse Table Identifier + # MUST wrap the inner table name in backticks so ClickHouse parses the Iceberg namespace correctly + ch_table_identifier = f"`{namespace}.{table_name}`" + + # Assert data from ClickHouse + assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 3 + + # 5. Truncate Table via ClickHouse + instance.query( + f"TRUNCATE TABLE {namespace}.{ch_table_identifier}", + settings={"allow_experimental_insert_into_iceberg": 1} + ) + + # Assert truncated from ClickHouse + assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0 + + # 6. Cross-Engine Validation using PyIceberg + # Refresh table state to grab the new v.metadata.json you generated + table.refresh() + + # Assert PyIceberg reads the empty snapshot successfully + assert len(table.scan().to_arrow()) == 0 + + # Cleanup + instance.query(f"DROP DATABASE {namespace}") \ No newline at end of file