diff --git a/ci/jobs/scripts/workflow_hooks/pr_body_check.py b/ci/jobs/scripts/workflow_hooks/pr_body_check.py index 315d162cd994..6c5695dcc620 100644 --- a/ci/jobs/scripts/workflow_hooks/pr_body_check.py +++ b/ci/jobs/scripts/workflow_hooks/pr_body_check.py @@ -60,6 +60,11 @@ def check_changelog_entry(category, pr_body: str) -> str: if not title or not body: print("WARNING: Failed to get PR title or body, read from environment") body = Info().pr_body + labels = Info().pr_labels + + if "release" in labels or "release-lts" in labels: + print("NOTE: Release PR detected, skipping changelog entry check") + sys.exit(0) error, category = get_category(body) if error or not category: diff --git a/ci/jobs/scripts/workflow_hooks/pr_labels_and_category.py b/ci/jobs/scripts/workflow_hooks/pr_labels_and_category.py index f8f339adc810..1f9d59fa7451 100644 --- a/ci/jobs/scripts/workflow_hooks/pr_labels_and_category.py +++ b/ci/jobs/scripts/workflow_hooks/pr_labels_and_category.py @@ -181,6 +181,9 @@ def check_labels(category, info): if __name__ == "__main__": info = Info() + if Labels.RELEASE in info.pr_labels or Labels.RELEASE_LTS in info.pr_labels: + print("NOTE: Release PR detected, skipping changelog category check") + sys.exit(0) error, category = get_category(info.pr_body) if not category or error: print(f"ERROR: {error}") diff --git a/cmake/autogenerated_versions.txt b/cmake/autogenerated_versions.txt index 23b38f3f296b..c9ed99820b38 100644 --- a/cmake/autogenerated_versions.txt +++ b/cmake/autogenerated_versions.txt @@ -2,13 +2,13 @@ # NOTE: VERSION_REVISION has nothing common with DBMS_TCP_PROTOCOL_VERSION, # only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes. -SET(VERSION_REVISION 54508) +SET(VERSION_REVISION 54509) SET(VERSION_MAJOR 26) SET(VERSION_MINOR 1) -SET(VERSION_PATCH 3) -SET(VERSION_GITHASH 53779390caa67a65a2368cdbb7533ed925608ba9) -SET(VERSION_DESCRIBE v26.1.3.20001.altinityantalya) -SET(VERSION_STRING 26.1.3.20001.altinityantalya) +SET(VERSION_PATCH 4) +SET(VERSION_GITHASH 5549f2acae95c6d627654f50e212a85d059a55f9) +SET(VERSION_DESCRIBE v26.1.4.20001.altinityantalya) +SET(VERSION_STRING 26.1.4.20001.altinityantalya) # end of autochange SET(VERSION_TWEAK 20001) diff --git a/src/Access/IAccessStorage.cpp b/src/Access/IAccessStorage.cpp index fdc96a6a9e0b..9039056f740d 100644 --- a/src/Access/IAccessStorage.cpp +++ b/src/Access/IAccessStorage.cpp @@ -547,7 +547,7 @@ std::optional IAccessStorage::authenticateImpl( { if (auto user = tryRead(*id)) { - AuthResult auth_result { .user_id = *id }; + AuthResult auth_result { .user_id = *id, .user_name = credentials.getUserName() }; if (!isAddressAllowed(*user, address)) throwAddressNotAllowed(address); diff --git a/src/Access/IAccessStorage.h b/src/Access/IAccessStorage.h index bb38e8ec3d1c..0d44ef5cb9be 100644 --- a/src/Access/IAccessStorage.h +++ b/src/Access/IAccessStorage.h @@ -33,6 +33,9 @@ struct AuthResult /// Session settings received from authentication server (if any) SettingsChanges settings{}; AuthenticationData authentication_data {}; + /// Username determined by the access storage during authentication, + /// should be treated as the authenticated user name + String user_name; }; /// Contains entities, i.e. instances of classes derived from IAccessEntity. diff --git a/src/Access/LDAPAccessStorage.cpp b/src/Access/LDAPAccessStorage.cpp index 44f75f72b5fd..939b99396c30 100644 --- a/src/Access/LDAPAccessStorage.cpp +++ b/src/Access/LDAPAccessStorage.cpp @@ -490,7 +490,7 @@ std::optional LDAPAccessStorage::authenticateImpl( } if (id) - return AuthResult{ .user_id = *id, .authentication_data = AuthenticationData(AuthenticationType::LDAP) }; + return AuthResult{ .user_id = *id, .authentication_data = AuthenticationData(AuthenticationType::LDAP), .user_name = credentials.getUserName() }; return std::nullopt; } diff --git a/src/Access/TokenAccessStorage.cpp b/src/Access/TokenAccessStorage.cpp index e17bc7159cef..53a6313aefc7 100644 --- a/src/Access/TokenAccessStorage.cpp +++ b/src/Access/TokenAccessStorage.cpp @@ -583,7 +583,7 @@ std::optional TokenAccessStorage::authenticateImpl( } if (id) - return AuthResult{ .user_id = *id, .authentication_data = AuthenticationData(AuthenticationType::JWT) }; + return AuthResult{ .user_id = *id, .authentication_data = AuthenticationData(AuthenticationType::JWT), .user_name = credentials.getUserName() }; return std::nullopt; } diff --git a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp index 296f29f9e094..6d26a7839927 100644 --- a/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp +++ b/src/Analyzer/Passes/FunctionToSubcolumnsPass.cpp @@ -366,6 +366,18 @@ std::map, NodeToSubcolumnTransformer> node_transfor NameAndTypePair column{ctx.column.name + ".null", std::make_shared()}; if (sourceHasColumn(ctx.column_source, column.name) || !canOptimizeToSubcolumn(ctx.column_source, column.name)) return; + + /// For nested Nullable types (e.g. Nullable(Tuple(... Nullable(T) ...))), + /// the .null subcolumn in storage is Nullable(UInt8), not UInt8. + /// Using it with a hardcoded UInt8 type causes a type mismatch at runtime. + if (auto * table_node = ctx.column_source->as()) + { + auto actual = table_node->getStorageSnapshot()->tryGetColumn( + GetColumnsOptions(GetColumnsOptions::All).withRegularSubcolumns(), column.name); + if (actual && actual->type->isNullable()) + return; + } + node = std::make_shared(column, ctx.column_source); }, }, @@ -377,6 +389,16 @@ std::map, NodeToSubcolumnTransformer> node_transfor NameAndTypePair column{ctx.column.name + ".null", std::make_shared()}; if (sourceHasColumn(ctx.column_source, column.name) || !canOptimizeToSubcolumn(ctx.column_source, column.name)) return; + + /// Same guard as isNull above: nested Nullable .null subcolumn may itself be Nullable. + if (auto * table_node = ctx.column_source->as()) + { + auto actual = table_node->getStorageSnapshot()->tryGetColumn( + GetColumnsOptions(GetColumnsOptions::All).withRegularSubcolumns(), column.name); + if (actual && actual->type->isNullable()) + return; + } + auto & function_arguments_nodes = function_node.getArguments().getNodes(); function_arguments_nodes = {std::make_shared(column, ctx.column_source)}; diff --git a/src/Analyzer/Passes/InverseDictionaryLookupPass.cpp b/src/Analyzer/Passes/InverseDictionaryLookupPass.cpp index 6acc7c00da36..bf31d710cc7c 100644 --- a/src/Analyzer/Passes/InverseDictionaryLookupPass.cpp +++ b/src/Analyzer/Passes/InverseDictionaryLookupPass.cpp @@ -15,6 +15,8 @@ #include +#include +#include #include #include @@ -140,6 +142,12 @@ class InverseDictionaryLookupVisitor : public InDepthQueryTreeVisitorWithContext if (getSettings()[Setting::rewrite_in_to_join]) return; + /// This rewrite turns `dictGet(...)` predicates into `IN (SELECT ... FROM dictionary(...))`. + /// The `dictionary()` table function requires `CREATE TEMPORARY TABLE`; if that grant is missing, + /// skip the optimization to avoid `ACCESS_DENIED`. + if (!getContext()->getAccess()->isGranted(AccessType::CREATE_TEMPORARY_TABLE)) + return; + auto * node_function = node->as(); if (!node_function) diff --git a/src/Columns/ColumnConst.h b/src/Columns/ColumnConst.h index 3f4317df2a21..3b2833c71bc8 100644 --- a/src/Columns/ColumnConst.h +++ b/src/Columns/ColumnConst.h @@ -122,11 +122,12 @@ class ColumnConst final : public COWHelper, ColumnCon } #if !defined(DEBUG_OR_SANITIZER_BUILD) - void insertRangeFrom(const IColumn &, size_t /*start*/, size_t length) override + void insertRangeFrom(const IColumn & src, size_t /*start*/, size_t length) override #else - void doInsertRangeFrom(const IColumn &, size_t /*start*/, size_t length) override + void doInsertRangeFrom(const IColumn & src, size_t /*start*/, size_t length) override #endif { + chassert(!typeid_cast(&src) || data->compareAt(0, 0, *typeid_cast(src).data, -1) == 0); s += length; } diff --git a/src/Columns/ColumnDynamic.h b/src/Columns/ColumnDynamic.h index b12107240751..9837acc4f309 100644 --- a/src/Columns/ColumnDynamic.h +++ b/src/Columns/ColumnDynamic.h @@ -228,7 +228,7 @@ class ColumnDynamic final : public COWHelper, Colum ColumnPtr permute(const Permutation & perm, size_t limit) const override { - return create(variant_column_ptr->permute(perm, limit), variant_info, max_dynamic_types, global_max_dynamic_types); + return create(variant_column_ptr->permute(perm, limit), variant_info, max_dynamic_types, global_max_dynamic_types, statistics); } ColumnPtr index(const IColumn & indexes, size_t limit) const override @@ -332,6 +332,12 @@ class ColumnDynamic final : public COWHelper, Colum void forEachSubcolumn(ColumnCallback callback) const override { callback(variant_column); } + /// Dynamic columns manage their own variant_info type metadata. + /// The default convertToFullIfNeeded recurses into subcolumns and strips LowCardinality + /// from variant columns, but cannot update variant_info, creating column/type mismatches. + /// Override to skip recursion — Dynamic is a self-contained typed container. + [[nodiscard]] IColumn::Ptr convertToFullIfNeeded() const override { return getPtr(); } + void forEachMutableSubcolumnRecursively(RecursiveMutableColumnCallback callback) override { callback(*variant_column); diff --git a/src/Columns/ColumnLowCardinality.cpp b/src/Columns/ColumnLowCardinality.cpp index 2a9dfbbb856d..24cc935448b5 100644 --- a/src/Columns/ColumnLowCardinality.cpp +++ b/src/Columns/ColumnLowCardinality.cpp @@ -287,6 +287,11 @@ char * ColumnLowCardinality::serializeValueIntoMemory(size_t n, char * memory, c return getDictionary().serializeValueIntoMemory(getIndexes().getUInt(n), memory, settings); } +std::optional ColumnLowCardinality::getSerializedValueSize(size_t n, const IColumn::SerializationSettings * settings) const +{ + return getDictionary().getSerializedValueSize(getIndexes().getUInt(n), settings); +} + void ColumnLowCardinality::collectSerializedValueSizes(PaddedPODArray & sizes, const UInt8 * is_null, const IColumn::SerializationSettings * settings) const { /// nullable is handled internally. diff --git a/src/Columns/ColumnLowCardinality.h b/src/Columns/ColumnLowCardinality.h index af56ab0a994f..3faf52e8a3ed 100644 --- a/src/Columns/ColumnLowCardinality.h +++ b/src/Columns/ColumnLowCardinality.h @@ -105,6 +105,8 @@ class ColumnLowCardinality final : public COWHelper getSerializedValueSize(size_t n, const IColumn::SerializationSettings * settings) const override; + void collectSerializedValueSizes(PaddedPODArray & sizes, const UInt8 * is_null, const IColumn::SerializationSettings * settings) const override; void deserializeAndInsertFromArena(ReadBuffer & in, const IColumn::SerializationSettings * settings) override; diff --git a/src/Columns/ColumnObject.cpp b/src/Columns/ColumnObject.cpp index 425fd6a1ac0d..f4cca256e403 100644 --- a/src/Columns/ColumnObject.cpp +++ b/src/Columns/ColumnObject.cpp @@ -1213,7 +1213,7 @@ ColumnPtr ColumnObject::permute(const Permutation & perm, size_t limit) const permuted_dynamic_paths[path] = column->permute(perm, limit); auto permuted_shared_data = shared_data->permute(perm, limit); - return ColumnObject::create(permuted_typed_paths, permuted_dynamic_paths, permuted_shared_data, max_dynamic_paths, global_max_dynamic_paths, max_dynamic_types); + return ColumnObject::create(permuted_typed_paths, permuted_dynamic_paths, permuted_shared_data, max_dynamic_paths, global_max_dynamic_paths, max_dynamic_types, statistics); } ColumnPtr ColumnObject::index(const IColumn & indexes, size_t limit) const diff --git a/src/Columns/ColumnUnique.h b/src/Columns/ColumnUnique.h index 66538d49a024..749f0ba0b844 100644 --- a/src/Columns/ColumnUnique.h +++ b/src/Columns/ColumnUnique.h @@ -90,6 +90,7 @@ class ColumnUnique final : public COWHelpergetBool(n); } bool isNullAt(size_t n) const override { return is_nullable && n == getNullValueIndex(); } void collectSerializedValueSizes(PaddedPODArray & sizes, const UInt8 * is_null, const IColumn::SerializationSettings * settings) const override; + std::optional getSerializedValueSize(size_t n, const IColumn::SerializationSettings * settings) const override; std::string_view serializeValueIntoArena(size_t n, Arena & arena, char const *& begin, const IColumn::SerializationSettings * settings) const override; char * serializeValueIntoMemory(size_t n, char * memory, const IColumn::SerializationSettings * settings) const override; void skipSerializedInArena(ReadBuffer & in) const override; @@ -456,6 +457,22 @@ void ColumnUnique::collectSerializedValueSizes(PaddedPODArraycollectSerializedValueSizes(sizes, nullptr, settings); } +template +std::optional ColumnUnique::getSerializedValueSize( + size_t n, const IColumn::SerializationSettings * settings) const +{ + if (is_nullable) + { + if (n == getNullValueIndex()) + return 1; + auto nested_size = column_holder->getSerializedValueSize(n, settings); + if (!nested_size) + return std::nullopt; + return 1 + *nested_size; + } + return column_holder->getSerializedValueSize(n, settings); +} + template std::string_view ColumnUnique::serializeValueIntoArena( size_t n, Arena & arena, char const *& begin, const IColumn::SerializationSettings * settings) const diff --git a/src/Columns/ColumnVariant.cpp b/src/Columns/ColumnVariant.cpp index ba6205467842..7f69887c0f3c 100644 --- a/src/Columns/ColumnVariant.cpp +++ b/src/Columns/ColumnVariant.cpp @@ -215,6 +215,8 @@ ColumnVariant::ColumnVariant(DB::MutableColumnPtr local_discriminators_, DB::Mut global_to_local_discriminators[local_to_global_discriminators[i]] = i; } } + + validateState(); } namespace @@ -1840,5 +1842,35 @@ void ColumnVariant::fixDynamicStructure() variant->fixDynamicStructure(); } +void ColumnVariant::validateState() const +{ + const auto & local_discriminators_data = getLocalDiscriminators(); + const auto & offsets_data = getOffsets(); + if (local_discriminators_data.size() != offsets_data.size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of discriminators and offsets should be equal, but {} and {} were given", local_discriminators_data.size(), offsets_data.size()); + + std::vector actual_variant_sizes(variants.size()); + for (size_t i = 0; i != variants.size(); ++i) + actual_variant_sizes[i] = variants[i]->size(); + + std::vector expected_variant_sizes(variants.size(), 0); + for (size_t i = 0; i != local_discriminators_data.size(); ++i) + { + auto local_discr = local_discriminators_data[i]; + if (local_discr != NULL_DISCRIMINATOR) + { + ++expected_variant_sizes[local_discr]; + if (offsets_data[i] >= actual_variant_sizes[local_discr]) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Offset at position {} is {}, but variant {} ({}) has size {}", i, offsets_data[i], static_cast(local_discr), variants[local_discr]->getName(), variants[local_discr]->size()); + } + } + + for (size_t i = 0; i != variants.size(); ++i) + { + if (variants[i]->size() != expected_variant_sizes[i]) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Variant {} ({}) has size {}, but expected {}", i, variants[i]->getName(), variants[i]->size(), expected_variant_sizes[i]); + } +} + } diff --git a/src/Columns/ColumnVariant.h b/src/Columns/ColumnVariant.h index e61790f79ce3..958ff939205b 100644 --- a/src/Columns/ColumnVariant.h +++ b/src/Columns/ColumnVariant.h @@ -260,6 +260,12 @@ class ColumnVariant final : public COWHelper, Colum void forEachMutableSubcolumnRecursively(RecursiveMutableColumnCallback callback) override; void forEachSubcolumn(ColumnCallback callback) const override; void forEachSubcolumnRecursively(RecursiveColumnCallback callback) const override; + + /// Variant columns pair variant sub-columns with DataTypeVariant's sorted type list. + /// The default convertToFullIfNeeded recurses into sub-columns and strips LowCardinality + /// from variant columns, but cannot update the corresponding DataTypeVariant, creating + /// column/type position mismatches. Override to skip recursion. + [[nodiscard]] IColumn::Ptr convertToFullIfNeeded() const override { return getPtr(); } bool structureEquals(const IColumn & rhs) const override; ColumnPtr compress(bool force_compression) const override; double getRatioOfDefaultRows(double sample_ratio) const override; @@ -348,6 +354,8 @@ class ColumnVariant final : public COWHelper, Colum void takeDynamicStructureFromColumn(const ColumnPtr & source_column) override; void fixDynamicStructure() override; + void validateState() const; + private: void insertFromImpl(const IColumn & src_, size_t n, const std::vector * global_discriminators_mapping); void insertRangeFromImpl(const IColumn & src_, size_t start, size_t length, const std::vector * global_discriminators_mapping, const Discriminator * skip_discriminator); diff --git a/src/Columns/IColumn.h b/src/Columns/IColumn.h index 6707ad12d2d0..030c8e223fa9 100644 --- a/src/Columns/IColumn.h +++ b/src/Columns/IColumn.h @@ -130,7 +130,32 @@ class IColumn : public COW [[nodiscard]] virtual Ptr convertToFullIfNeeded() const { - return convertToFullColumnIfConst()->convertToFullColumnIfReplicated()->convertToFullColumnIfSparse()->convertToFullColumnIfLowCardinality(); + Ptr converted = convertToFullColumnIfConst() + ->convertToFullColumnIfReplicated() + ->convertToFullColumnIfSparse() + ->convertToFullColumnIfLowCardinality(); + + Columns new_subcolumns; + bool any_changed = false; + + converted->forEachSubcolumn([&](const WrappedPtr & subcolumn) + { + auto new_sub = subcolumn->convertToFullIfNeeded(); + any_changed |= (new_sub.get() != subcolumn.get()); + new_subcolumns.push_back(std::move(new_sub)); + }); + + if (!any_changed) + return converted; + + auto mutable_column = IColumn::mutate(std::move(converted)); + size_t i = 0; + mutable_column->forEachMutableSubcolumn([&](WrappedPtr & subcolumn) + { + subcolumn = std::move(new_subcolumns[i++]); + }); + + return std::move(mutable_column); } /// Creates empty column with the same type. diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 8d13fcc20143..bad8d1df3882 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -809,6 +809,15 @@ void ZooKeeper::sendThread() HistogramMetrics::KeeperClientQueueDuration, std::chrono::duration_cast(dequeue_ts - info.request->enqueue_ts).count()); + if (info.watch) + info.request->has_watch = true; + + if (info.request->add_root_path) + info.request->addRootPath(args.chroot); + + /// Insert into operations AFTER mutating the request (has_watch, addRootPath) + /// to avoid a data race: receiveThread reads from operations concurrently, + /// and the request object is shared via shared_ptr. if (info.request->xid != close_xid) { CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest); @@ -817,17 +826,11 @@ void ZooKeeper::sendThread() operations[info.request->xid] = info; } - if (info.watch) - info.request->has_watch = true; - if (requests_queue.isFinished()) { break; } - if (info.request->add_root_path) - info.request->addRootPath(args.chroot); - info.request->probably_sent = true; info.request->write(getWriteBuffer(), use_xid_64); flushWriteBuffer(); diff --git a/src/Common/setThreadName.h b/src/Common/setThreadName.h index 7c0ad6a6ce61..9bfa3e407737 100644 --- a/src/Common/setThreadName.h +++ b/src/Common/setThreadName.h @@ -65,6 +65,7 @@ namespace DB M(HASHED_DICT_DTOR, "HashedDictDtor") \ M(HASHED_DICT_LOAD, "HashedDictLoad") \ M(HTTP_HANDLER, "HTTPHandler") \ + M(ICEBERG_ITERATOR, "IcebergIter") \ M(INTERSERVER_HANDLER, "IntersrvHandler") \ M(IO_URING_MONITOR, "IoUringMonitr") \ M(KEEPER_HANDLER, "KeeperHandler") \ diff --git a/src/Coordination/KeeperContext.cpp b/src/Coordination/KeeperContext.cpp index b007b0b849c2..0e8e2b559cf8 100644 --- a/src/Coordination/KeeperContext.cpp +++ b/src/Coordination/KeeperContext.cpp @@ -204,7 +204,7 @@ bool diskValidator(const Poco::Util::AbstractConfiguration & config, const std:: { "s3"sv, "s3_plain"sv, - "local"sv + "local"sv, }; if (std::all_of( diff --git a/src/Core/BaseSettings.h b/src/Core/BaseSettings.h index ced47d431b66..c1006a34571e 100644 --- a/src/Core/BaseSettings.h +++ b/src/Core/BaseSettings.h @@ -605,6 +605,10 @@ void BaseSettings::readBinary(ReadBuffer & in) size_t index = accessor.find(name); std::ignore = BaseSettingsHelpers::readFlags(in); + + if (index == static_cast(-1)) + BaseSettingsHelpers::throwSettingNotFound(name); + accessor.readBinary(*this, index, in); } } diff --git a/src/DataTypes/Serializations/SerializationObject.cpp b/src/DataTypes/Serializations/SerializationObject.cpp index 58546a7cf731..865575e8a4f3 100644 --- a/src/DataTypes/Serializations/SerializationObject.cpp +++ b/src/DataTypes/Serializations/SerializationObject.cpp @@ -227,8 +227,10 @@ void SerializationObject::enumerateStreams(EnumerateStreamsSettings & settings, { shared_data_serialization_version = SerializationObjectSharedData::SerializationVersion(settings.object_shared_data_serialization_version); /// Avoid creating buckets in shared data for Wide part if shared data is empty. - if (settings.data_part_type != MergeTreeDataPartType::Wide || !column_object->getStatistics() || !column_object->getStatistics()->shared_data_paths_statistics.empty()) + if (settings.data_part_type != MergeTreeDataPartType::Wide || !column_object->getStatistics() + || !column_object->getStatistics()->shared_data_paths_statistics.empty()) num_buckets = settings.object_shared_data_buckets; + } shared_data_serialization = std::make_shared(shared_data_serialization_version, dynamic_type, num_buckets); diff --git a/src/DataTypes/Serializations/SerializationObjectSharedData.cpp b/src/DataTypes/Serializations/SerializationObjectSharedData.cpp index b56c2807684b..98fd4b4ed8c1 100644 --- a/src/DataTypes/Serializations/SerializationObjectSharedData.cpp +++ b/src/DataTypes/Serializations/SerializationObjectSharedData.cpp @@ -684,10 +684,10 @@ std::shared_ptr SerializationO structure_state.last_granule_structure.clear(); size_t rows_to_read = limit + rows_offset; - StructureGranule current_granule; - std::swap(structure_state.last_granule_structure, current_granule); while (rows_to_read != 0) { + auto & current_granule = structure_state.last_granule_structure; + /// Calculate remaining rows in current granule that can be read. size_t remaining_rows_in_granule = current_granule.num_rows - current_granule.limit - current_granule.offset; @@ -738,12 +738,7 @@ std::shared_ptr SerializationO } result->push_back(current_granule); - current_granule.clear(); } - - /// Remember the state of the last read granule because it can be partially read. - if (!result->empty()) - structure_state.last_granule_structure = result->back(); } /// Add deserialized data into cache. diff --git a/src/DataTypes/Serializations/SerializationSparse.cpp b/src/DataTypes/Serializations/SerializationSparse.cpp index 2aa2755936c8..9aa2322c52ae 100644 --- a/src/DataTypes/Serializations/SerializationSparse.cpp +++ b/src/DataTypes/Serializations/SerializationSparse.cpp @@ -402,8 +402,15 @@ void SerializationSparse::deserializeBinaryBulkWithMultipleStreams( auto & values_column = column_sparse.getValuesPtr(); settings.path.push_back(Substream::SparseElements); + /// We cannot use column from substream cache during deserialization of sparse values column, because + /// sparse values column must always contain default value at the first row that is added during ColumnSparse + /// creation. Using column from substream cache will lead to loss of this value and unexpected column size. + /// So, we should set insert_only_rows_in_current_range_from_substreams_cache flag to true + /// to insert only rows in current range from substream cache instead of using the whole cached column if any. + auto values_settings = settings; + values_settings.insert_only_rows_in_current_range_from_substreams_cache = true; nested->deserializeBinaryBulkWithMultipleStreams( - values_column, skipped_values_rows, num_read_offsets, settings, state_sparse->nested, cache); + values_column, skipped_values_rows, num_read_offsets, values_settings, state_sparse->nested, cache); settings.path.pop_back(); if (offsets_column->size() + 1 != values_column->size()) diff --git a/src/DataTypes/Serializations/SerializationStringSize.cpp b/src/DataTypes/Serializations/SerializationStringSize.cpp index b4f7538c250f..c5920e54ddf0 100644 --- a/src/DataTypes/Serializations/SerializationStringSize.cpp +++ b/src/DataTypes/Serializations/SerializationStringSize.cpp @@ -63,14 +63,14 @@ void SerializationStringSize::deserializeBinaryBulkStatePrefix( { auto string_state = std::make_shared(); - /// If there is no state cache (e.g. StorageLog), we must always read the full string data. Without cached - /// state, we cannot know in advance whether the string data will be needed later, and the string size has - /// to be derived from the data itself. + /// Without a states cache (e.g. StorageLog) we must always read the full + /// string data, because the state is not shared with SerializationString + /// and we cannot know whether the full column will also be read. /// - /// As a result, the subsequent deserialization relies on the substream cache to correctly share the string - /// data across subcolumns. We do not support an optimization that deserializes only the size substream in - /// this case, and therefore we must always populate the substream cache with the string data rather than - /// the size-only substream. + /// With a states cache (MergeTree), we default to sizes-only reading. + /// If SerializationString also reads this column, its + /// deserializeBinaryBulkStatePrefix will find this shared state and + /// upgrade need_string_data to true. if (!cache) string_state->need_string_data = true; state = string_state; @@ -125,7 +125,15 @@ void SerializationStringSize::deserializeWithStringData( serialization_string.deserializeBinaryBulk(*string_state.column->assumeMutable(), *stream, rows_offset, limit, avg_value_size_hint); num_read_rows = string_state.column->size() - prev_size; - addColumnWithNumReadRowsToSubstreamsCache(cache, settings.path, string_state.column, num_read_rows); + + /// Cache only the current range's data, not the entire accumulated column. + /// string_state.column accumulates data across marks (it is persistent state), + /// so on mark 1+ it contains elements from all previous marks plus the current one. + /// If we cache the full accumulated column with num_read_rows < column->size(), + /// insertDataFromCachedColumn will see the size mismatch and replace the result + /// column entirely (e.g. ColumnSparse's values), breaking invariants. + auto column_for_cache = string_state.column->cut(prev_size, num_read_rows); + addColumnWithNumReadRowsToSubstreamsCache(cache, settings.path, column_for_cache, num_read_rows); if (settings.update_avg_value_size_hint_callback) settings.update_avg_value_size_hint_callback(settings.path, *string_state.column); diff --git a/src/DataTypes/Serializations/SerializationVariant.cpp b/src/DataTypes/Serializations/SerializationVariant.cpp index 673652f62033..5689c25bbba0 100644 --- a/src/DataTypes/Serializations/SerializationVariant.cpp +++ b/src/DataTypes/Serializations/SerializationVariant.cpp @@ -259,6 +259,9 @@ void SerializationVariant::serializeBinaryBulkWithMultipleStreamsAndUpdateVarian size_t & total_size_of_variants) const { const ColumnVariant & col = assert_cast(column); + if (offset == 0) + col.validateState(); + if (const size_t size = col.size(); limit == 0 || offset + limit > size) limit = size - offset; @@ -317,9 +320,17 @@ void SerializationVariant::serializeBinaryBulkWithMultipleStreamsAndUpdateVarian addVariantElementToPath(settings.path, i); /// We can use the same offset/limit as for whole Variant column if (i == non_empty_global_discr) - variant_serializations[i]->serializeBinaryBulkWithMultipleStreams(col.getVariantByGlobalDiscriminator(i), offset, limit, settings, variant_state->variant_states[i]); + { + const auto & variant_column = col.getVariantByGlobalDiscriminator(i); + if (variant_column.size() < offset + limit) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Variant {} has less rows ({}) than expected rows to serialize ({})", variant_names[i], variant_column.size(), offset + limit); + + variant_serializations[i]->serializeBinaryBulkWithMultipleStreams(variant_column, offset, limit, settings, variant_state->variant_states[i]); + } else + { variant_serializations[i]->serializeBinaryBulkWithMultipleStreams(col.getVariantByGlobalDiscriminator(i), col.getVariantByGlobalDiscriminator(i).size(), 0, settings, variant_state->variant_states[i]); + } settings.path.pop_back(); } variants_statistics[variant_names[non_empty_global_discr]] += limit; @@ -444,6 +455,10 @@ void SerializationVariant::serializeBinaryBulkWithMultipleStreamsAndUpdateVarian settings.path.push_back(Substream::VariantElements); for (size_t i = 0; i != variant_serializations.size(); ++i) { + const auto & variant_column = col.getVariantByGlobalDiscriminator(i); + if (variant_column.size() < variant_offsets_and_limits[i].first + variant_offsets_and_limits[i].second) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Variant {} has less rows ({}) than expected rows to serialize ({})", variant_names[i], variant_column.size(), variant_offsets_and_limits[i].first + variant_offsets_and_limits[i].second); + addVariantElementToPath(settings.path, i); variant_serializations[i]->serializeBinaryBulkWithMultipleStreams( col.getVariantByGlobalDiscriminator(i), @@ -648,6 +663,9 @@ void SerializationVariant::deserializeBinaryBulkWithMultipleStreams( last_non_empty_discr = i; } + if (col.getVariantByLocalDiscriminator(i).size() < variant_limits[i]) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of variant {} is expected to be not less than {} according to discriminators, but it is {}", variant_names[i], variant_limits[i], col.getVariantByLocalDiscriminator(i).size()); + variant_offsets.push_back(col.getVariantByLocalDiscriminator(i).size() - variant_limits[i]); } @@ -685,6 +703,8 @@ void SerializationVariant::deserializeBinaryBulkWithMultipleStreams( addColumnWithNumReadRowsToSubstreamsCache(cache, settings.path, col.getOffsetsPtr(), col.getOffsetsPtr()->size() - prev_size); } settings.path.pop_back(); + + col.validateState(); } std::pair, std::vector> SerializationVariant::deserializeCompactDiscriminators( diff --git a/src/Disks/DiskObjectStorage/MetadataStorages/Plain/MetadataStorageFromPlainObjectStorage.cpp b/src/Disks/DiskObjectStorage/MetadataStorages/Plain/MetadataStorageFromPlainObjectStorage.cpp index 74143b276188..b15a201d087d 100644 --- a/src/Disks/DiskObjectStorage/MetadataStorages/Plain/MetadataStorageFromPlainObjectStorage.cpp +++ b/src/Disks/DiskObjectStorage/MetadataStorages/Plain/MetadataStorageFromPlainObjectStorage.cpp @@ -114,7 +114,7 @@ std::vector MetadataStorageFromPlainObjectStorage::listDirectory(co RelativePathsWithMetadata files; std::string absolute_key = key_prefix; - if (!absolute_key.ends_with('/')) + if (!absolute_key.empty() && !absolute_key.ends_with('/')) absolute_key += '/'; object_storage->listObjects(absolute_key, files, 0); diff --git a/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.cpp b/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.cpp index 2518369a2574..f9b6bf9ff454 100644 --- a/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.cpp +++ b/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include @@ -44,8 +43,6 @@ namespace ProfileEvents extern const Event DiskAzurePutRequestThrottlerSleepMicroseconds; } -namespace fs = std::filesystem; - namespace DB { @@ -142,16 +139,18 @@ static std::shared_ptr getManagedIde ContainerClientWrapper::ContainerClientWrapper(RawContainerClient client_, String blob_prefix_) : client(std::move(client_)), blob_prefix(std::move(blob_prefix_)) { + if (!blob_prefix.empty() && !blob_prefix.ends_with('/')) + blob_prefix += '/'; } BlobClient ContainerClientWrapper::GetBlobClient(const String & blob_name) const { - return client.GetBlobClient(blob_prefix / blob_name); + return client.GetBlobClient(blob_prefix + blob_name); } BlockBlobClient ContainerClientWrapper::GetBlockBlobClient(const String & blob_name) const { - return client.GetBlockBlobClient(blob_prefix / blob_name); + return client.GetBlockBlobClient(blob_prefix + blob_name); } BlobContainerPropertiesRespones ContainerClientWrapper::GetProperties() const @@ -162,17 +161,16 @@ BlobContainerPropertiesRespones ContainerClientWrapper::GetProperties() const ListBlobsPagedResponse ContainerClientWrapper::ListBlobs(const ListBlobsOptions & options) const { auto new_options = options; - new_options.Prefix = blob_prefix / options.Prefix.ValueOr(""); + new_options.Prefix = blob_prefix + options.Prefix.ValueOr(""); auto response = client.ListBlobs(new_options); - String blob_prefix_str = blob_prefix / ""; for (auto & blob : response.Blobs) { - if (!blob.Name.starts_with(blob_prefix_str)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected prefix '{}' in blob name '{}'", blob_prefix_str, blob.Name); + if (!blob.Name.starts_with(blob_prefix)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected prefix '{}' in blob name '{}'", blob_prefix, blob.Name); - blob.Name = blob.Name.substr(blob_prefix_str.size()); + blob.Name = blob.Name.substr(blob_prefix.size()); } return response; @@ -490,7 +488,9 @@ Endpoint processEndpoint(const Poco::Util::AbstractConfiguration & config, const if (config.has(config_prefix + ".endpoint_subpath")) { String endpoint_subpath = config.getString(config_prefix + ".endpoint_subpath"); - prefix = fs::path(prefix) / endpoint_subpath; + if (!prefix.empty() && !prefix.ends_with('/')) + prefix += '/'; + prefix += endpoint_subpath; } } else if (config.has(config_prefix + ".connection_string")) diff --git a/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h b/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h index c90aece5008b..a2e681c797e3 100644 --- a/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h +++ b/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureBlobStorageCommon.h @@ -130,7 +130,7 @@ class ContainerClientWrapper private: RawContainerClient client; - fs::path blob_prefix; + String blob_prefix; }; using ContainerClient = ContainerClientWrapper; diff --git a/src/Formats/JSONExtractTree.cpp b/src/Formats/JSONExtractTree.cpp index 5ba02597d1f5..86f37c5b6e72 100644 --- a/src/Formats/JSONExtractTree.cpp +++ b/src/Formats/JSONExtractTree.cpp @@ -1855,7 +1855,7 @@ class ObjectJSONNode : public JSONExtractTreeNode if (shouldSkipPath(current_path, insert_settings)) return true; - if (element.isObject() && !typed_path_nodes.contains(current_path)) + if (element.isObject() && (!typed_path_nodes.contains(current_path) || (format_settings.json.type_json_allow_duplicated_key_with_literal_and_nested_object && hasTypedPathWithPrefix(current_path + ".")))) { std::unordered_map> visited_keys; for (auto [key, value] : element.getObject()) @@ -1915,7 +1915,7 @@ class ObjectJSONNode : public JSONExtractTreeNode { if (!format_settings.json.type_json_skip_duplicated_paths) { - error = fmt::format("Duplicate path found during parsing JSON object: {}. You can enable setting type_json_skip_duplicated_paths to skip duplicated paths during insert", current_path); + error = fmt::format("Duplicate path found during parsing JSON object: {}. You can enable setting type_json_skip_duplicated_paths to skip duplicated paths during insert or setting type_json_allow_duplicated_key_with_literal_and_nested_object to allow duplicated path with literal and nested object", current_path); return false; } } @@ -2025,6 +2025,17 @@ class ObjectJSONNode : public JSONExtractTreeNode return settings; } + bool hasTypedPathWithPrefix(const String & prefix) const + { + for (const auto & [path, _] : typed_paths_types) + { + if (path.starts_with(prefix)) + return true; + } + + return false; + } + std::unordered_map typed_paths_types; std::unordered_map>> typed_path_nodes; std::unordered_set paths_to_skip; diff --git a/src/Functions/FunctionsConversion.h b/src/Functions/FunctionsConversion.h index 6edfb4620c40..0f8063bbaaa5 100644 --- a/src/Functions/FunctionsConversion.h +++ b/src/Functions/FunctionsConversion.h @@ -4934,10 +4934,6 @@ class FunctionCast final : public IFunctionBase using Word = std::conditional_t>; ColumnPtr src_col = arguments.front().column; - - if (nullable_source) - src_col = nullable_source->getNestedColumnPtr(); - const auto * col_array = checkAndGetColumn(src_col.get()); if (!col_array) @@ -4950,15 +4946,19 @@ class FunctionCast final : public IFunctionBase const size_t bytes_per_fixedstring = DataTypeQBit::bitsToBytes(n); const size_t padded_dimension = bytes_per_fixedstring * 8; - /// Verify array size matches expected QBit size + /// Use the null map to skip NULL rows — their nested arrays may have default (empty) values + /// that don't match the expected dimension, but the result will be masked by NULL anyway. + const NullMap * null_map = nullable_source ? &nullable_source->getNullMapData() : nullptr; + + /// Verify array size matches expected QBit size (skip NULL rows) size_t prev_offset = 0; - for (auto off : offsets) + for (size_t row = 0; row < arrays_count; ++row) { - size_t array_size = off - prev_offset; - if (array_size != n) + size_t array_size = offsets[row] - prev_offset; + if (!(null_map && (*null_map)[row]) && array_size != n) throw Exception( ErrorCodes::SIZES_OF_ARRAYS_DONT_MATCH, "Array arguments must have size {} for QBit conversion, got {}", n, array_size); - prev_offset = off; + prev_offset = offsets[row]; } /// Handle empty input column @@ -4982,8 +4982,19 @@ class FunctionCast final : public IFunctionBase } prev_offset = 0; - for (auto off : offsets) + for (size_t row = 0; row < arrays_count; ++row) { + auto off = offsets[row]; + + /// For NULL rows, insert default (zero) values — the result will be masked by NULL. + if (null_map && (*null_map)[row]) + { + for (size_t j = 0; j < size; ++j) + assert_cast(*tuple_columns[j]).insertDefault(); + prev_offset = off; + continue; + } + /// Insert default values for each FixedString column and keep pointers to them std::vector row_ptrs(size); for (size_t j = 0; j < size; ++j) @@ -5033,11 +5044,16 @@ class FunctionCast final : public IFunctionBase { const auto & col_array = assert_cast(*arguments.front().column); + /// Don't propagate nullable_source into array elements — the inner data column + /// has a different size (total elements vs. number of rows), and the original + /// nullable_source column may have a different type than the converted column. ColumnsWithTypeAndName nested_columns{{col_array.getDataPtr(), from_nested_type, ""}}; - auto converted_nested = nested_function(nested_columns, to_nested_type, nullable_source, nested_columns.front().column->size()); + auto converted_nested = nested_function(nested_columns, to_nested_type, nullptr, nested_columns.front().column->size()); auto converted_array = ColumnArray::create(converted_nested, col_array.getOffsetsPtr()); ColumnsWithTypeAndName converted_arguments{{std::move(converted_array), std::make_shared(to_nested_type), ""}}; + /// Pass nullable_source so that convertArrayToQBit can use the null map + /// to skip NULL rows (whose nested arrays may have default/empty values). return convertArrayToQBit(converted_arguments, result_type, nullable_source, dimension, element_size); }; } diff --git a/src/Functions/concat.cpp b/src/Functions/concat.cpp index 0e168240bdb3..46a2df3cd441 100644 --- a/src/Functions/concat.cpp +++ b/src/Functions/concat.cpp @@ -141,8 +141,13 @@ class ConcatImpl : public IFunction } else { - /// A non-String/non-FixedString-type argument: use the default serialization to convert it to String - auto full_column = column->convertToFullIfNeeded(); + /// A non-String/non-FixedString-type argument: use the default serialization to convert it to String. + /// Only strip top-level wrappers (Const, Sparse, LowCardinality) without recursing into subcolumns. + /// Using the recursive convertToFullIfNeeded would strip LowCardinality from inside + /// compound types like Variant while the type is not updated, creating a type/column mismatch. + auto full_column = column->convertToFullColumnIfConst() + ->convertToFullColumnIfSparse() + ->convertToFullColumnIfLowCardinality(); auto serialization = arguments[i].type->getDefaultSerialization(); auto converted_col_str = ColumnString::create(); ColumnStringHelpers::WriteHelper write_helper(*converted_col_str, column->size()); diff --git a/src/Functions/concatWithSeparator.cpp b/src/Functions/concatWithSeparator.cpp index ae0430efffbf..d9e4172d65f8 100644 --- a/src/Functions/concatWithSeparator.cpp +++ b/src/Functions/concatWithSeparator.cpp @@ -132,8 +132,12 @@ class ConcatWithSeparatorImpl : public IFunction } else { - /// A non-String/non-FixedString-type argument: use the default serialization to convert it to String - auto full_column = column->convertToFullIfNeeded(); + /// A non-String/non-FixedString-type argument: use the default serialization to convert it to String. + /// Only strip top-level wrappers (Const, Sparse, LowCardinality) without recursing into subcolumns. + /// Using the recursive convertToFullIfNeeded would strip LowCardinality from inside + /// compound types like Variant while the type is not updated, creating a type/column mismatch. + auto full_column + = column->convertToFullColumnIfConst()->convertToFullColumnIfSparse()->convertToFullColumnIfLowCardinality(); chassert(full_column->size() == input_rows_count); diff --git a/src/Functions/format.cpp b/src/Functions/format.cpp index 68ee19507ef8..346b016f123f 100644 --- a/src/Functions/format.cpp +++ b/src/Functions/format.cpp @@ -100,8 +100,12 @@ class FormatFunction : public IFunction } else { - /// A non-String/non-FixedString-type argument: use the default serialization to convert it to String - auto full_column = column->convertToFullIfNeeded(); + /// A non-String/non-FixedString-type argument: use the default serialization to convert it to String. + /// Only strip top-level wrappers (Const, Sparse, LowCardinality) without recursing into subcolumns. + /// Using the recursive convertToFullIfNeeded would strip LowCardinality from inside + /// compound types like Variant while the type is not updated, creating a type/column mismatch. + auto full_column + = column->convertToFullColumnIfConst()->convertToFullColumnIfSparse()->convertToFullColumnIfLowCardinality(); auto serialization = arguments[i].type->getDefaultSerialization(); auto converted_col_str = ColumnString::create(); ColumnStringHelpers::WriteHelper write_helper(*converted_col_str, column->size()); diff --git a/src/Functions/icebergBucketTransform.cpp b/src/Functions/icebergBucketTransform.cpp index bdf9290e7466..7756e690b392 100644 --- a/src/Functions/icebergBucketTransform.cpp +++ b/src/Functions/icebergBucketTransform.cpp @@ -75,7 +75,7 @@ class FunctionIcebergHash : public IFunction WhichDataType which(type); - if (isBool(type) || which.isInteger() || which.isDate()) + if (isBool(type) || which.isInteger() || which.isDate32() || which.isDate()) { for (size_t i = 0; i < input_rows_count; ++i) { diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index 04a190a863b5..f15b02100f91 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -923,6 +923,9 @@ static ColumnWithTypeAndName executeActionForPartialResult( case ActionsDAG::ActionType::ARRAY_JOIN: { auto key = arguments.at(0); + if (!key.column) + break; + key.column = key.column->convertToFullColumnIfConst(); const auto * array = getArrayJoinColumnRawPtr(key.column); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 091b71716579..bc75c583fc30 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1367,6 +1367,36 @@ String Context::getFilesystemCacheUser() const return shared->filesystem_cache_user; } +DatabaseAndTable Context::getOrCacheStorage(const StorageID & id, std::function storage_getter, std::optional * exception) const +{ + auto & shard = storage_cache.shards[StorageCache::shardIndex(id)]; + std::lock_guard lock(shard.mutex); + + if (auto it = shard.set.find(id); it != shard.set.end()) + { + DatabaseAndTable storage = DatabaseCatalog::instance().tryGetByUUID(it->uuid); + if (exception && !storage.second) + exception->emplace(Exception( + ErrorCodes::UNKNOWN_TABLE, + "Table {} does not exist anymore - maybe it was dropped", + id.getNameForLogs())); + return storage; + } + + auto storage = storage_getter(); + + if (storage.second) + { + const auto & new_id = storage.second->getStorageID(); + if (new_id.hasUUID()) + { + shard.set.insert(new_id); + } + } + + return storage; +} + std::unordered_map Context::getWarnings() const { std::unordered_map common_warnings; diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 10d7cd3d2a4f..00b4d8b9fb85 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -293,6 +293,13 @@ struct IRuntimeFilterLookup; using RuntimeFilterLookupPtr = std::shared_ptr; RuntimeFilterLookupPtr createRuntimeFilterLookup(); +class QueryMetadataCache; +using QueryMetadataCachePtr = std::shared_ptr; +using QueryMetadataCacheWeakPtr = std::weak_ptr; + +using DatabasePtr = std::shared_ptr; +using DatabaseAndTable = std::pair; + /// An empty interface for an arbitrary object that may be attached by a shared pointer /// to query context, when using ClickHouse as a library. struct IHostContext @@ -556,6 +563,29 @@ class ContextData /// mutation tasks of one mutation executed against different parts of the same table. PreparedSetsCachePtr prepared_sets_cache; + struct StorageCache + { + static constexpr size_t NumShards = 6; + + struct Shard + { + std::mutex mutex; + std::unordered_set< + StorageID, + StorageID::DatabaseAndTableNameHash, + StorageID::DatabaseAndTableNameEqual + > set; + }; + + std::array shards; + + static size_t shardIndex(const StorageID & id) + { + return StorageID::DatabaseAndTableNameHash{}(id) & (NumShards - 1); + } + }; + + mutable StorageCache storage_cache; /// Cache for reverse lookups of serialized dictionary keys used in `dictGetKeys` function. /// This is a per query cache and not shared across queries. mutable ReverseLookupCachePtr reverse_lookup_cache; @@ -679,6 +709,8 @@ class Context: public ContextData, public std::enable_shared_from_this String getFilesystemCachesPath() const; String getFilesystemCacheUser() const; + DatabaseAndTable getOrCacheStorage(const StorageID & id, std::function storage_getter, std::optional * exception) const; + // Get the disk used by databases to store metadata files. std::shared_ptr getDatabaseDisk() const; diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 118c26084fd3..f88615ca8429 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -83,6 +83,7 @@ namespace ErrorCodes namespace Setting { extern const SettingsBool fsync_metadata; + extern const SettingsBool allow_experimental_analyzer; } namespace MergeTreeSetting @@ -377,6 +378,7 @@ DatabaseAndTable DatabaseCatalog::getTableImpl( return {}; } + bool analyzer = context_->getSettingsRef()[Setting::allow_experimental_analyzer]; if (table_id.hasUUID()) { /// Shortcut for tables which have persistent UUID @@ -395,7 +397,8 @@ DatabaseAndTable DatabaseCatalog::getTableImpl( } return {}; } - else + /// In old analyzer resolving done in multiple places, so we ignore TABLE_UUID_MISMATCH error. + else if (!analyzer) { const auto & table_storage_id = db_and_table.second->getStorageID(); if (db_and_table.first->getDatabaseName() != table_id.database_name || @@ -1075,21 +1078,27 @@ bool DatabaseCatalog::isDictionaryExist(const StorageID & table_id) const StoragePtr DatabaseCatalog::getTable(const StorageID & table_id, ContextPtr local_context) const { std::optional exc; - auto res = getTableImpl(table_id, local_context, &exc); - if (!res.second) + auto table = local_context->hasQueryContext() ? + local_context->getQueryContext()->getOrCacheStorage(table_id, [&](){ return getTableImpl(table_id, local_context, &exc); }, &exc).second : + getTableImpl(table_id, local_context, &exc).second; + if (!table) throw Exception(*exc); - return res.second; + return table; } StoragePtr DatabaseCatalog::tryGetTable(const StorageID & table_id, ContextPtr local_context) const { - return getTableImpl(table_id, local_context, nullptr).second; + return local_context->hasQueryContext() ? + local_context->getQueryContext()->getOrCacheStorage(table_id, [&](){ return getTableImpl(table_id, local_context, nullptr); }, nullptr).second : + getTableImpl(table_id, local_context, nullptr).second; } DatabaseAndTable DatabaseCatalog::getDatabaseAndTable(const StorageID & table_id, ContextPtr local_context) const { std::optional exc; - auto res = getTableImpl(table_id, local_context, &exc); + auto res = local_context->hasQueryContext() ? + local_context->getQueryContext()->getOrCacheStorage(table_id, [&](){ return getTableImpl(table_id, local_context, &exc); }, &exc) : + getTableImpl(table_id, local_context, &exc); if (!res.second) throw Exception(*exc); return res; @@ -1097,7 +1106,9 @@ DatabaseAndTable DatabaseCatalog::getDatabaseAndTable(const StorageID & table_id DatabaseAndTable DatabaseCatalog::tryGetDatabaseAndTable(const StorageID & table_id, ContextPtr local_context) const { - return getTableImpl(table_id, local_context, nullptr); + return local_context->hasQueryContext() ? + local_context->getQueryContext()->getOrCacheStorage(table_id, [&](){ return getTableImpl(table_id, local_context, nullptr); }, nullptr) : + getTableImpl(table_id, local_context, nullptr); } void DatabaseCatalog::loadMarkedAsDroppedTables() diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp index e0491f94261f..f257c46f7e70 100644 --- a/src/Interpreters/GraceHashJoin.cpp +++ b/src/Interpreters/GraceHashJoin.cpp @@ -541,6 +541,17 @@ class GraceHashJoin::DelayedBlocks : public IBlocksStream if (not_processed) { auto res = not_processed->next(); + if (res.is_last && res.next_block) + { + res.next_block->filterBySelector(); + auto next_block = std::move(*res.next_block).getSourceBlock(); + if (next_block.rows() > 0) + { + auto new_res = hash_join->joinBlock(std::move(next_block)); + std::lock_guard lock(extra_block_mutex); + not_processed_results.emplace_back(std::move(new_res)); + } + } if (!res.is_last) { std::lock_guard lock(extra_block_mutex); @@ -615,6 +626,17 @@ class GraceHashJoin::DelayedBlocks : public IBlocksStream auto res = hash_join->joinBlock(block); auto next = res->next(); + if (next.is_last && next.next_block) + { + next.next_block->filterBySelector(); + auto next_block = std::move(*next.next_block).getSourceBlock(); + if (next_block.rows() > 0) + { + auto new_res = hash_join->joinBlock(std::move(next_block)); + std::lock_guard lock(extra_block_mutex); + not_processed_results.emplace_back(std::move(new_res)); + } + } if (!next.is_last) { std::lock_guard lock(extra_block_mutex); diff --git a/src/Interpreters/HashJoin/HashJoinMethods.h b/src/Interpreters/HashJoin/HashJoinMethods.h index d8476e7226c4..a3647b129757 100644 --- a/src/Interpreters/HashJoin/HashJoinMethods.h +++ b/src/Interpreters/HashJoin/HashJoinMethods.h @@ -193,7 +193,7 @@ class HashJoinMethods /// First to collect all matched rows refs by join keys, then filter out rows which are not true in additional filter expression. template - static size_t joinRightColumnsWithAddtitionalFilter( + static size_t joinRightColumnsWithAdditionalFilter( std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, diff --git a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h index 2792f9c23a49..bbf9eebf5b2f 100644 --- a/src/Interpreters/HashJoin/HashJoinMethodsImpl.h +++ b/src/Interpreters/HashJoin/HashJoinMethodsImpl.h @@ -308,7 +308,7 @@ size_t HashJoinMethods::joinRightColumnsSwitchMu if (added_columns.additional_filter_expression) { const bool mark_per_row_used = join_features.right || join_features.full || mapv.size() > 1; - return joinRightColumnsWithAddtitionalFilter( + return joinRightColumnsWithAdditionalFilter( std::forward>(key_getter_vector), mapv, added_columns, @@ -817,7 +817,7 @@ static ColumnPtr buildAdditionalFilter( template template -size_t HashJoinMethods::joinRightColumnsWithAddtitionalFilter( +size_t HashJoinMethods::joinRightColumnsWithAdditionalFilter( std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 23982cc61e7d..5cf92f9052bf 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -819,10 +819,16 @@ InterpreterSelectQuery::InterpreterSelectQuery( const auto parts = assert_cast(*storage_snapshot->data).parts; + /// Just attempting to read statistics files on disk can increase query latencies + /// First check the in-memory metadata if statistics are present at all + auto estimator = storage_snapshot->metadata->hasStatistics() + ? storage->getConditionSelectivityEstimator(*parts, context) + : nullptr; + MergeTreeWhereOptimizer where_optimizer{ std::move(column_compressed_sizes), storage_snapshot, - storage->getConditionSelectivityEstimator(parts ? *parts : RangesInDataParts{}, context), + estimator, queried_columns, supported_prewhere_columns, log}; diff --git a/src/Interpreters/Session.cpp b/src/Interpreters/Session.cpp index 84b1c0212755..a159d59db690 100644 --- a/src/Interpreters/Session.cpp +++ b/src/Interpreters/Session.cpp @@ -378,10 +378,10 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So LOG_DEBUG(log, "Authenticating user '{}' from {}", credentials_.getUserName(), address.toString()); + AuthResult auth_result; try { - auto auth_result = - global_context->getAccessControl().authenticate(credentials_, address.host(), getClientInfo()); + auth_result = global_context->getAccessControl().authenticate(credentials_, address.host(), getClientInfo()); user_id = auth_result.user_id; user_authenticated_with = auth_result.authentication_data; settings_from_auth_server = auth_result.settings; @@ -402,8 +402,9 @@ void Session::authenticate(const Credentials & credentials_, const Poco::Net::So throw; } - prepared_client_info->current_user = credentials_.getUserName(); - prepared_client_info->authenticated_user = credentials_.getUserName(); + chassert(!auth_result.user_name.empty()); + prepared_client_info->current_user = auth_result.user_name; + prepared_client_info->authenticated_user = auth_result.user_name; prepared_client_info->current_address = std::make_shared(address); } diff --git a/src/Interpreters/Set.cpp b/src/Interpreters/Set.cpp index ae51edb19ee9..f170024bf583 100644 --- a/src/Interpreters/Set.cpp +++ b/src/Interpreters/Set.cpp @@ -116,8 +116,10 @@ DataTypes Set::getElementTypes(DataTypes types, bool transform_null_in) { for (auto & type : types) { - if (const auto * low_cardinality_type = typeid_cast(type.get())) - type = low_cardinality_type->getDictionaryType(); + /// Strip LowCardinality recursively to match what setHeader/insertFromColumns do: + /// insertFromColumns calls convertToFullIfNeeded which recursively strips LC from + /// compound types like Tuple(LowCardinality(T), ...). + type = recursiveRemoveLowCardinality(type); if (!transform_null_in) type = removeNullable(type); @@ -155,10 +157,15 @@ void Set::setHeader(const ColumnsWithTypeAndName & header) if (const auto * low_cardinality_type = typeid_cast(data_types.back().get())) { data_types.back() = low_cardinality_type->getDictionaryType(); - set_elements_types.back() = low_cardinality_type->getDictionaryType(); materialized_columns.emplace_back(key_columns.back()->convertToFullColumnIfLowCardinality()); key_columns.back() = materialized_columns.back().get(); } + + /// Strip LowCardinality recursively from set_elements_types so they match what + /// convertToFullIfNeeded (which is recursive) does to columns in insertFromColumns. + /// Without this, compound types like Tuple(LowCardinality(T), ...) keep LowCardinality + /// in the type while the column has it stripped, causing type/column mismatches later. + set_elements_types.back() = recursiveRemoveLowCardinality(set_elements_types.back()); } /// We will insert to the Set only keys, where all components are not NULL. diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 335af7339bd5..2c5f56bc9025 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -187,9 +187,17 @@ Chunk Squashing::squash(std::vector && input_chunks) for (size_t i = 0; i != num_columns; ++i) { + /// Materialize ColumnConst before concatenation, because ColumnConst::insertRangeFrom + /// ignores the source value and just increments the row count + if (isColumnConst(*mutable_columns[i])) + { + mutable_columns[i] = IColumn::mutate(mutable_columns[i]->convertToFullColumnIfConst()); + for (auto & column : source_columns_list[i]) + column = column->convertToFullColumnIfConst(); + } if (!have_same_serialization[i]) { - mutable_columns[i] = removeSpecialRepresentations(mutable_columns[i]->convertToFullColumnIfConst())->assumeMutable(); + mutable_columns[i] = IColumn::mutate(removeSpecialRepresentations(mutable_columns[i]->convertToFullColumnIfConst())); for (auto & column : source_columns_list[i]) column = removeSpecialRepresentations(column->convertToFullColumnIfConst()); } diff --git a/src/Processors/QueryPlan/Optimizations/optimizeJoin.cpp b/src/Processors/QueryPlan/Optimizations/optimizeJoin.cpp index f8c6d57bc804..b754e2ad96cd 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeJoin.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeJoin.cpp @@ -240,13 +240,13 @@ RelationStats estimateReadRowsCount(QueryPlan::Node & node, const ActionsDAG::No if (reading->getContext()->getSettingsRef()[Setting::use_statistics]) { - if (auto estimator_ = reading->getConditionSelectivityEstimator()) + if (auto estimator = reading->getConditionSelectivityEstimator()) { auto prewhere_info = reading->getPrewhereInfo(); const ActionsDAG::Node * prewhere_node = prewhere_info ? static_cast(prewhere_info->prewhere_actions.tryFindInOutputs(prewhere_info->prewhere_column_name)) : nullptr; - auto relation_profile = estimator_->estimateRelationProfile(reading->getStorageMetadata(), filter, prewhere_node); + auto relation_profile = estimator->estimateRelationProfile(reading->getStorageMetadata(), filter, prewhere_node); RelationStats stats {.estimated_rows = relation_profile.rows, .column_stats = relation_profile.column_stats, .table_name = table_display_name}; LOG_TRACE(getLogger("optimizeJoin"), "estimate statistics {}", dumpStatsForLogs(stats)); return stats; diff --git a/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp b/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp index 1765222d2e81..b5f497845de1 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeReadInOrder.cpp @@ -236,16 +236,24 @@ void buildSortingDAG(QueryPlan::Node & node, std::optional & dag, Fi /// Should ignore limit if there is filtering. limit = 0; - //std::cerr << "====== Adding prewhere " << std::endl; appendExpression(dag, prewhere_info->prewhere_actions); if (const auto * filter_expression = dag->tryFindInOutputs(prewhere_info->prewhere_column_name)) appendFixedColumnsFromFilterExpression(*filter_expression, fixed_columns); + } + if (const auto row_level_filter = reading->getRowLevelFilter()) + { + /// Should ignore limit if there is filtering. + limit = 0; + + appendExpression(dag, row_level_filter->actions); + if (const auto * filter_expression = dag->tryFindInOutputs(row_level_filter->column_name)) + appendFixedColumnsFromFilterExpression(*filter_expression, fixed_columns); + } return; } - if (typeid_cast(step) || typeid_cast(step)) { limit = 0; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 5133d725c50a..827dd04f3b68 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1825,6 +1825,30 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(bool return analyzed_result_ptr; } +namespace +{ + +/// Check if all columns of all useful skip indexes are also part of the primary key. +/// When true, skip indexes cannot cause incorrect FINAL results (since PK-based filtering cannot drop parts with overlapping key ranges), +/// so the `findPKRangesForFinalAfterSkipIndex` recovery pass can be skipped. +bool areAllSkipIndexColumnsInPrimaryKey(const Names & primary_key_columns, const UsefulSkipIndexes & skip_indexes) +{ + NameSet primary_key_columns_set(primary_key_columns.begin(), primary_key_columns.end()); + + for (const auto & skip_index : skip_indexes.useful_indices) + { + for (const auto & column : skip_index.index->index.column_names) + { + if (!primary_key_columns_set.contains(column)) + return false; + } + } + + return true; +} + +} + void ReadFromMergeTree::buildIndexes( std::optional & indexes, const ActionsDAG * filter_actions_dag_, @@ -1961,13 +1985,13 @@ void ReadFromMergeTree::buildIndexes( indexes->use_skip_indexes_for_disjunctions = settings[Setting::use_skip_indexes_for_disjunctions] && skip_indexes.useful_indices.size() > 1 - && !indexes->key_condition_rpn_template->hasOnlyConjunctions(); + && !indexes->key_condition_rpn_template->hasOnlyConjunctions() + && indexes->key_condition_rpn_template->getRPN().size() <= MergeTreeDataSelectExecutor::MAX_BITS_FOR_PARTIAL_DISJUNCTION_RESULT; indexes->use_skip_indexes_if_final_exact_mode = indexes->use_skip_indexes && !skip_indexes.empty() && query_info_.isFinal() && settings[Setting::use_skip_indexes_if_final_exact_mode] - && !areSkipIndexColumnsInPrimaryKey(primary_key_column_names, skip_indexes, - indexes->key_condition_rpn_template->hasOnlyConjunctions()); + && !areAllSkipIndexColumnsInPrimaryKey(primary_key_column_names, skip_indexes); { std::vector index_sizes; index_sizes.reserve(skip_indexes.useful_indices.size()); @@ -2027,11 +2051,41 @@ void ReadFromMergeTree::buildIndexes( indexes->skip_indexes = std::move(skip_indexes); } +void ReadFromMergeTree::deferFiltersAfterFinalIfNeeded() +{ + if (!isQueryWithFinal()) + return; + + const auto & settings = context->getSettingsRef(); + bool defer_row_policy = settings[Setting::apply_row_policy_after_final] && query_info.row_level_filter; + bool defer_prewhere = settings[Setting::apply_prewhere_after_final] && query_info.prewhere_info; + + /// row policy must run before prewhere. If row policy touches non-sorting-key columns, defer prewhere too + if (defer_row_policy && query_info.prewhere_info) + { + const auto & sorting_key_columns = storage_snapshot->metadata->getSortingKeyColumns(); + NameSet sorting_key_columns_set(sorting_key_columns.begin(), sorting_key_columns.end()); + + auto required = query_info.row_level_filter->actions.getRequiredColumnsNames(); + bool all_in_sorting_key = std::all_of( + required.begin(), required.end(), + [&](const auto & col) { return sorting_key_columns_set.contains(col); }); + if (!all_in_sorting_key) + defer_prewhere = true; + } + + if (defer_row_policy) + deferred_row_level_filter = query_info.row_level_filter; + if (defer_prewhere) + deferred_prewhere_info = query_info.prewhere_info; +} + void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes) { if (!indexes) { - auto dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes, query_info.buildNodeNameToInputNodeColumn()); + auto node_name_to_input = query_info.buildNodeNameToInputNodeColumn(); + auto dag = ActionsDAG::buildFilterActionsDAG(added_filter_nodes.nodes, node_name_to_input); filter_actions_dag = dag ? std::make_shared(std::move(*dag)) : nullptr; /// NOTE: Currently we store two DAGs for analysis: @@ -2041,9 +2095,44 @@ void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes) if (filter_actions_dag) query_info.filter_actions_dag = filter_actions_dag; + /// don't let deferred filters participate in index analysis + /// otherwise partition pruning / skip indexes could drop data that FINAL still needs + const ActionsDAG * index_filter_dag = query_info.filter_actions_dag.get(); + std::shared_ptr index_filter_dag_without_deferred; + + deferFiltersAfterFinalIfNeeded(); + if (deferred_row_level_filter || deferred_prewhere_info) + { + /// build a separate DAG for index analysis, without the deferred filter nodes + NameSet deferred_column_names; + if (deferred_row_level_filter) + deferred_column_names.insert(deferred_row_level_filter->column_name); + if (deferred_prewhere_info) + deferred_column_names.insert(deferred_prewhere_info->prewhere_column_name); + + std::vector index_nodes; + for (const auto * node : added_filter_nodes.nodes) + { + if (!deferred_column_names.contains(node->result_name)) + index_nodes.push_back(node); + } + + auto idx_dag = ActionsDAG::buildFilterActionsDAG(index_nodes, node_name_to_input); + if (idx_dag) + index_filter_dag_without_deferred = std::make_shared(std::move(*idx_dag)); + /// nullptr is fine here: all filters are deferred, nothing left for indexes + index_filter_dag = index_filter_dag_without_deferred.get(); + + LOG_DEBUG( + log, + "Excluding deferred filters from index analysis: row_policy={}, prewhere={}", + deferred_row_level_filter != nullptr, + deferred_prewhere_info != nullptr); + } + buildIndexes( indexes, - query_info.filter_actions_dag.get(), + index_filter_dag, data, getParts(), vector_search_parameters, @@ -2972,98 +3061,57 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons storage_snapshot->data = std::make_unique(); } - const auto & local_settings = context->getSettingsRef(); - /// Check if we should apply row policy and prewhere after FINAL instead of during reading /// (for correct behavior with ReplacingMergeTree where row policy should not affect which row "wins" during deduplication) /// also PREWHERE must always be executed after row policy, so if row policy is deferred, prewhere must be too - FilterDAGInfoPtr deferred_row_level_filter; - PrewhereInfoPtr deferred_prewhere_info; - - if (isQueryWithFinal()) + if (deferred_row_level_filter || deferred_prewhere_info) { - bool defer_row_policy = local_settings[Setting::apply_row_policy_after_final] && query_info.row_level_filter; - bool defer_prewhere = local_settings[Setting::apply_prewhere_after_final] && query_info.prewhere_info; - - /// If row policy is deferred and uses non-sorting-key columns, prewhere must also be deferred - /// to maintain correct execution order (row policy before prewhere). - if (defer_row_policy && query_info.prewhere_info) - { - const auto & sorting_key_columns = storage_snapshot->metadata->getSortingKeyColumns(); - NameSet sorting_key_columns_set(sorting_key_columns.begin(), sorting_key_columns.end()); + if (deferred_row_level_filter) + query_info.row_level_filter = nullptr; + if (deferred_prewhere_info) + query_info.prewhere_info = nullptr; - auto filter_required_columns = query_info.row_level_filter->actions.getRequiredColumnsNames(); - bool all_columns_in_sorting_key = true; - for (const auto & col : filter_required_columns) - { - if (!sorting_key_columns_set.contains(col)) - { - all_columns_in_sorting_key = false; - break; - } - } - /// If row policy uses non-sorting-key columns, we must defer prewhere too - if (!all_columns_in_sorting_key) - defer_prewhere = true; - } + /// Ensure columns required by deferred filters are included in the columns to read + /// Without this, SELECT x would fail if row policy uses column y + NameSet columns_to_read_set(result.column_names_to_read.begin(), result.column_names_to_read.end()); + NameSet all_columns_set(all_column_names.begin(), all_column_names.end()); - if (defer_row_policy || defer_prewhere) + auto add_required_columns = [&](const Names & required_columns) { - /// Save the filters and clear them from query_info so they won't be applied during reading - if (defer_row_policy) - { - deferred_row_level_filter = query_info.row_level_filter; - query_info.row_level_filter = nullptr; - } - - if (defer_prewhere) - { - deferred_prewhere_info = query_info.prewhere_info; - query_info.prewhere_info = nullptr; - } - - /// Ensure columns required by deferred filters are included in the columns to read. - /// Without this, SELECT x would fail if row policy uses column y. - NameSet columns_to_read_set(result.column_names_to_read.begin(), result.column_names_to_read.end()); - NameSet all_columns_set(all_column_names.begin(), all_column_names.end()); - - auto add_required_columns = [&](const Names & required_columns) + for (const auto & col : required_columns) { - for (const auto & col : required_columns) + if (!columns_to_read_set.contains(col)) { - if (!columns_to_read_set.contains(col)) - { - result.column_names_to_read.push_back(col); - columns_to_read_set.insert(col); - } - if (!all_columns_set.contains(col)) - { - all_column_names.push_back(col); - all_columns_set.insert(col); - } + result.column_names_to_read.push_back(col); + columns_to_read_set.insert(col); } - }; + if (!all_columns_set.contains(col)) + { + all_column_names.push_back(col); + all_columns_set.insert(col); + } + } + }; - if (deferred_row_level_filter) - add_required_columns(deferred_row_level_filter->actions.getRequiredColumnsNames()); + if (deferred_row_level_filter) + add_required_columns(deferred_row_level_filter->actions.getRequiredColumnsNames()); - if (deferred_prewhere_info) - add_required_columns(deferred_prewhere_info->prewhere_actions.getRequiredColumnsNames()); + if (deferred_prewhere_info) + add_required_columns(deferred_prewhere_info->prewhere_actions.getRequiredColumnsNames()); - /// Recreate output_header without the deferred filters since they will be applied after FINAL - output_header = std::make_shared(MergeTreeSelectProcessor::transformHeader( - storage_snapshot->getSampleBlockForColumns(all_column_names), - defer_row_policy ? nullptr : query_info.row_level_filter, - defer_prewhere ? nullptr : query_info.prewhere_info)); + /// Recreate output_header without the deferred filters since they will be applied after FINAL + output_header = std::make_shared(MergeTreeSelectProcessor::transformHeader( + storage_snapshot->getSampleBlockForColumns(all_column_names), + query_info.row_level_filter, + query_info.prewhere_info)); - LOG_DEBUG( - log, - "Deferring filters to after FINAL: row_policy={}, prewhere={}. columns_to_read={}", - defer_row_policy, - defer_prewhere, - fmt::join(result.column_names_to_read, ",")); - } + LOG_DEBUG( + log, + "Deferring filters to after FINAL: row_policy={}, prewhere={}. columns_to_read={}", + deferred_row_level_filter != nullptr, + deferred_prewhere_info != nullptr, + fmt::join(result.column_names_to_read, ",")); } shared_virtual_fields.emplace("_sample_factor", result.sampling.used_sample_factor); @@ -3263,49 +3311,32 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons } /// apply row policy after FINAL if needed (must be applied before prewhere) - if (deferred_row_level_filter) + auto add_deferred_filter = [&pipe](ActionsDAG filter_dag, const String & column_name, bool remove_column) { - /// Clone the filter DAG and add all inputs to outputs to preserve them - auto filter_dag = deferred_row_level_filter->actions.clone(); - - /// Get all input column names and add them to outputs NameSet input_names; for (const auto * input : filter_dag.getInputs()) input_names.insert(input->result_name); restoreDAGInputs(filter_dag, input_names); - auto row_level_filter_actions = std::make_shared(std::move(filter_dag)); - pipe.addSimpleTransform([&](const SharedHeader & header) + auto actions = std::make_shared(std::move(filter_dag)); + pipe.addSimpleTransform([&, actions](const SharedHeader & header) { - return std::make_shared( - header, - row_level_filter_actions, - deferred_row_level_filter->column_name, - deferred_row_level_filter->do_remove_column); + return std::make_shared(header, actions, column_name, remove_column); }); - } + }; + + if (deferred_row_level_filter) + add_deferred_filter( + deferred_row_level_filter->actions.clone(), + deferred_row_level_filter->column_name, + deferred_row_level_filter->do_remove_column); /// apply deferred PREWHERE after row policy if (deferred_prewhere_info) - { - /// Clone the prewhere DAG and add all inputs to outputs to preserve them - auto prewhere_dag = deferred_prewhere_info->prewhere_actions.clone(); - - NameSet input_names; - for (const auto * input : prewhere_dag.getInputs()) - input_names.insert(input->result_name); - restoreDAGInputs(prewhere_dag, input_names); - - auto prewhere_actions = std::make_shared(std::move(prewhere_dag)); - pipe.addSimpleTransform([&](const SharedHeader & header) - { - return std::make_shared( - header, - prewhere_actions, - deferred_prewhere_info->prewhere_column_name, - deferred_prewhere_info->remove_prewhere_column); - }); - } + add_deferred_filter( + deferred_prewhere_info->prewhere_actions.clone(), + deferred_prewhere_info->prewhere_column_name, + deferred_prewhere_info->remove_prewhere_column); Block cur_header = pipe.getHeader(); @@ -3451,6 +3482,15 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const expression->describeActions(format_settings.out, prefix); } + if (deferred_prewhere_info || deferred_row_level_filter) + { + format_settings.out << prefix << "Deferred filters (applied after FINAL)" << '\n'; + if (deferred_row_level_filter) + format_settings.out << prefix << " Deferred row level filter column: " << deferred_row_level_filter->column_name << '\n'; + if (deferred_prewhere_info) + format_settings.out << prefix << " Deferred prewhere filter column: " << deferred_prewhere_info->prewhere_column_name << '\n'; + } + if (virtual_row_conversion) { format_settings.out << prefix << "Virtual row conversions" << '\n'; @@ -3499,6 +3539,16 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const if (prewhere_info_map) map.add("Prewhere info", std::move(prewhere_info_map)); + if (deferred_prewhere_info || deferred_row_level_filter) + { + auto deferred_map = std::make_unique(); + if (deferred_row_level_filter) + deferred_map->add("Deferred row level filter column", deferred_row_level_filter->column_name); + if (deferred_prewhere_info) + deferred_map->add("Deferred prewhere filter column", deferred_prewhere_info->prewhere_column_name); + map.add("Deferred filters (applied after FINAL)", std::move(deferred_map)); + } + if (virtual_row_conversion) map.add("Virtual row conversions", virtual_row_conversion->toTree()); } @@ -3873,27 +3923,14 @@ bool ReadFromMergeTree::isSkipIndexAvailableForTopK(const String & sort_column) return false; } -/// Check if any/all columns with the given skip indexes are also part of the primary key -bool ReadFromMergeTree::areSkipIndexColumnsInPrimaryKey(const Names & primary_key_columns, const UsefulSkipIndexes & skip_indexes, bool any_one) -{ - NameSet primary_key_columns_set(primary_key_columns.begin(), primary_key_columns.end()); - - for (const auto & skip_index : skip_indexes.useful_indices) - { - for (const auto & column : skip_index.index->index.column_names) - { - if (primary_key_columns_set.contains(column) && any_one) - return true; - else if (!primary_key_columns_set.contains(column) && !any_one) - return false; - } - } - - return !any_one; -} ConditionSelectivityEstimatorPtr ReadFromMergeTree::getConditionSelectivityEstimator() const { + /// Just attempting to read statistics files on disk can increase query latencies + /// First check the in-memory metadata if statistics are present at all + if (!getStorageMetadata()->hasStatistics()) + return nullptr; + return data.getConditionSelectivityEstimator(getParts(), getContext()); } diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index a2f48822298a..205df9b947bc 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -320,7 +320,6 @@ class ReadFromMergeTree final : public SourceStepWithFilter bool allow_query_condition_cache_, bool supports_skip_indexes_on_data_read); - static bool areSkipIndexColumnsInPrimaryKey(const Names & primary_key_columns, const UsefulSkipIndexes & skip_indexes, bool any_one); AnalysisResultPtr selectRangesToRead(bool find_exact_ranges = false) const; @@ -399,6 +398,8 @@ class ReadFromMergeTree final : public SourceStepWithFilter std::unique_ptr keepOnlyRequiredColumnsAndCreateLazyReadStep(const NameSet & required_outputs); void addStartingPartOffsetAndPartOffset(bool & added_part_starting_offset, bool & added_part_offset); + void deferFiltersAfterFinalIfNeeded(); + private: MergeTreeSettingsPtr data_settings; MergeTreeReaderSettings reader_settings; @@ -426,6 +427,10 @@ class ReadFromMergeTree final : public SourceStepWithFilter /// Pre-computed value, needed to trigger sets creating for PK mutable std::optional indexes; + /// Row policy / prewhere deferred to after FINAL, if needed + FilterDAGInfoPtr deferred_row_level_filter; + PrewhereInfoPtr deferred_prewhere_info; + LoggerPtr log; UInt64 selected_parts = 0; UInt64 selected_rows = 0; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 191f886413ba..0e41ee4da2da 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -10330,6 +10330,9 @@ bool MergeTreeData::supportsTrivialCountOptimization(const StorageSnapshotPtr & const auto & snapshot_data = assert_cast(*storage_snapshot->data); const auto & mutations_snapshot = snapshot_data.mutations_snapshot; + if (!mutations_snapshot) + return !settings[Setting::apply_mutations_on_fly] && !settings[Setting::apply_patch_parts]; + return !mutations_snapshot->hasDataMutations() && !mutations_snapshot->hasPatchParts(); } diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 2dd77f0c3fdf..53c691a8c8d2 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -30,6 +30,7 @@ #include #include +#include #include #include #include @@ -562,8 +563,21 @@ std::optional> MergeTreeDataSelectExecutor::filterPar break; } } + if (!has_virtual_column_input) + { + /// If no virtual columns are referenced the the filter is a constant expression. + /// A constant-false filter (e.g. pushed down from a UNION ALL branch that can never + /// match) must still exclude all parts, so check for that before skipping. + const auto & output_node = *dag->getOutputs().front(); + if (output_node.column) + { + ConstantFilterDescription filter_description(*output_node.column); + if (filter_description.always_false) + return std::unordered_set{}; + } return {}; + } auto start_time = std::chrono::steady_clock::now(); diff --git a/src/Storages/MergeTree/PatchParts/applyPatches.cpp b/src/Storages/MergeTree/PatchParts/applyPatches.cpp index 0cccf8f0abfb..6b8c2fcdcc70 100644 --- a/src/Storages/MergeTree/PatchParts/applyPatches.cpp +++ b/src/Storages/MergeTree/PatchParts/applyPatches.cpp @@ -230,9 +230,13 @@ IColumn::Patch CombinedPatchBuilder::createPatchForColumn(const String & column_ for (const auto & patch_block : all_patch_blocks) { + const auto & patch_column = patch_block.getByName(column_name).column; + if (!patch_column) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Column {} has null data in patch block", column_name); + IColumn::Patch::Source source = { - .column = *patch_block.getByName(column_name).column, + .column = *patch_column, .versions = getColumnUInt64Data(patch_block, PartDataVersionColumn::name), }; @@ -266,8 +270,8 @@ Block getUpdatedHeader(const PatchesToApply & patches, const NameSet & updated_c for (const auto & column : patch->patch_blocks[0]) { - /// Ignore columns that are not updated. - if (!updated_columns.contains(column.name)) + /// Ignore columns that are not updated or have no data. + if (!updated_columns.contains(column.name) || !column.column) header.erase(column.name); } @@ -293,7 +297,7 @@ bool canApplyPatchesRaw(const PatchesToApply & patches) { for (const auto & column : patch->patch_blocks.front()) { - if (!isPatchPartSystemColumn(column.name) && !canApplyPatchInplace(*column.column)) + if (!isPatchPartSystemColumn(column.name) && column.column && !canApplyPatchInplace(*column.column)) return false; } } @@ -325,9 +329,16 @@ void applyPatchesToBlockRaw( chassert(patch_to_apply->patch_blocks.size() == 1); const auto & patch_block = patch_to_apply->patch_blocks.front(); + if (!patch_block.has(result_column.name)) + continue; + + const auto & patch_column = patch_block.getByName(result_column.name).column; + if (!patch_column) + continue; + IColumn::Patch::Source source = { - .column = *patch_block.getByName(result_column.name).column, + .column = *patch_column, .versions = getColumnUInt64Data(patch_block, PartDataVersionColumn::name), }; diff --git a/src/Storages/MergeTree/tests/gtest_trivial_count_null_snapshot.cpp b/src/Storages/MergeTree/tests/gtest_trivial_count_null_snapshot.cpp new file mode 100644 index 000000000000..37b0785a61e0 --- /dev/null +++ b/src/Storages/MergeTree/tests/gtest_trivial_count_null_snapshot.cpp @@ -0,0 +1,130 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::Setting +{ +extern const SettingsBool apply_mutations_on_fly; +extern const SettingsBool apply_patch_parts; +} + +/// Test that MergeTreeData::supportsTrivialCountOptimization does not crash +/// when the storage snapshot has a SnapshotData with null mutations_snapshot. +/// +/// This is a regression test for a SEGFAULT (Cloud incident #1317) where +/// `createStorageSnapshot(metadata, context, /*without_data=*/true)` produces +/// a SnapshotData with `mutations_snapshot = nullptr`, and a code path in the +/// Planner (subquery building for sets) could pass such a snapshot to +/// `supportsTrivialCountOptimization`, which dereferenced it unconditionally. + +TEST(SupportsTrivialCountOptimization, NullMutationsSnapshot) +{ + using namespace DB; + + MainThreadStatus::getInstance(); + tryRegisterFunctions(); + tryRegisterAggregateFunctions(); + + getActivePartsLoadingThreadPool().initializeWithDefaultSettingsIfNotInitialized(); + getOutdatedPartsLoadingThreadPool().initializeWithDefaultSettingsIfNotInitialized(); + getUnexpectedPartsLoadingThreadPool().initializeWithDefaultSettingsIfNotInitialized(); + getPartsCleaningThreadPool().initializeWithDefaultSettingsIfNotInitialized(); + + const auto & context_holder = getContext(); + auto context = Context::createCopy(context_holder.context); + + /// Build minimal StorageInMemoryMetadata for a MergeTree with one UInt64 column and tuple() ORDER BY. + StorageInMemoryMetadata metadata; + + ColumnsDescription columns; + columns.add(ColumnDescription("a", std::make_shared())); + metadata.setColumns(columns); + + /// ORDER BY tuple() — empty sorting key. + auto order_by_ast = makeASTFunction("tuple"); + metadata.sorting_key = KeyDescription::getSortingKeyFromAST(order_by_ast, metadata.columns, context, {}); + metadata.primary_key = KeyDescription::getKeyFromAST(order_by_ast, metadata.columns, context); + metadata.primary_key.definition_ast = nullptr; + + /// PARTITION BY — empty partition key. + metadata.partition_key = KeyDescription::getKeyFromAST(nullptr, metadata.columns, context); + + auto minmax_columns = metadata.getColumnsRequiredForPartitionKey(); + auto partition_key = metadata.partition_key.expression_list_ast->clone(); + metadata.minmax_count_projection.emplace( + ProjectionDescription::getMinMaxCountProjection(columns, partition_key, minmax_columns, metadata.primary_key, context)); + + auto storage_settings = std::make_unique(context->getMergeTreeSettings()); + + /// Create the StorageMergeTree with ATTACH mode to skip sanity checks. + auto storage = std::make_shared( + StorageID("test_db", "test_table"), + "store/test_trivial_count/", + metadata, + LoadingStrictnessLevel::ATTACH, + context, + /*date_column_name=*/"", + MergeTreeData::MergingParams{}, + std::move(storage_settings)); + + auto metadata_snapshot = storage->getInMemoryMetadataPtr(); + + /// Case 1: Null StorageSnapshot entirely (already handled by existing code before our fix). + { + StorageSnapshotPtr null_snapshot = nullptr; + EXPECT_NO_THROW(storage->supportsTrivialCountOptimization(null_snapshot, context)); + } + + /// Case 2: SnapshotData with null mutations_snapshot — this is the crash scenario. + /// createStorageSnapshot(metadata, context, /*without_data=*/true) produces exactly this. + { + auto snapshot_data = std::make_unique(); + /// mutations_snapshot is default-constructed to nullptr. + ASSERT_EQ(snapshot_data->mutations_snapshot, nullptr); + + auto snapshot = std::make_shared(*storage, metadata_snapshot, std::move(snapshot_data)); + EXPECT_NO_THROW(storage->supportsTrivialCountOptimization(snapshot, context)); + + /// With default settings (apply_mutations_on_fly=false, apply_patch_parts=true), + /// the function should return false because !true == false. + auto snapshot_data2 = std::make_unique(); + auto snapshot2 = std::make_shared(*storage, metadata_snapshot, std::move(snapshot_data2)); + EXPECT_FALSE(storage->supportsTrivialCountOptimization(snapshot2, context)); + } + + /// Case 3: SnapshotData with null mutations_snapshot and apply_patch_parts=false. + { + auto modified_context = Context::createCopy(context); + modified_context->setSetting("apply_patch_parts", Field(false)); + modified_context->setSetting("apply_mutations_on_fly", Field(false)); + + auto snapshot_data = std::make_unique(); + auto snapshot = std::make_shared(*storage, metadata_snapshot, std::move(snapshot_data)); + EXPECT_TRUE(storage->supportsTrivialCountOptimization(snapshot, modified_context)); + } + + /// Case 4: Verify the real getStorageSnapshotWithoutData path produces the same. + { + auto without_data_snapshot = storage->getStorageSnapshotWithoutData(metadata_snapshot, context); + ASSERT_NE(without_data_snapshot, nullptr); + ASSERT_NE(without_data_snapshot->data, nullptr); + + const auto & data = assert_cast(*without_data_snapshot->data); + EXPECT_EQ(data.mutations_snapshot, nullptr); + + EXPECT_NO_THROW(storage->supportsTrivialCountOptimization(without_data_snapshot, context)); + } + + storage->flushAndShutdown(); +} diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp index df5b9bd1be3e..fe3467190f83 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.cpp @@ -215,11 +215,11 @@ std::optional SingleThreadIcebergKeysIterator::next() return manifest_file_entry; } case PruningReturnStatus::MIN_MAX_INDEX_PRUNED: { - ++min_max_index_pruned_files; + ProfileEvents::increment(ProfileEvents::IcebergMinMaxIndexPrunedFiles, 1); break; } case PruningReturnStatus::PARTITION_PRUNED: { - ++partition_pruned_files; + ProfileEvents::increment(ProfileEvents::IcebergPartitionPrunedFiles, 1); break; } } @@ -235,14 +235,6 @@ std::optional SingleThreadIcebergKeysIterator::next() return std::nullopt; } -SingleThreadIcebergKeysIterator::~SingleThreadIcebergKeysIterator() -{ - if (partition_pruned_files > 0) - ProfileEvents::increment(ProfileEvents::IcebergPartitionPrunedFiles, partition_pruned_files); - if (min_max_index_pruned_files > 0) - ProfileEvents::increment(ProfileEvents::IcebergMinMaxIndexPrunedFiles, min_max_index_pruned_files); -} - SingleThreadIcebergKeysIterator::SingleThreadIcebergKeysIterator( ObjectStoragePtr object_storage_, ContextPtr local_context_, @@ -343,8 +335,9 @@ IcebergIterator::IcebergIterator( std::sort(equality_deletes_files.begin(), equality_deletes_files.end()); std::sort(position_deletes_files.begin(), position_deletes_files.end()); producer_task.emplace( - [this]() + [this, thread_group = CurrentThread::getGroup()]() { + DB::ThreadGroupSwitcher switcher(thread_group, DB::ThreadName::ICEBERG_ITERATOR); while (!blocking_queue.isFinished()) { std::optional entry; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h index 794a6a2fddd4..ac3cabfa0149 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h @@ -51,8 +51,6 @@ class SingleThreadIcebergKeysIterator std::optional next(); - ~SingleThreadIcebergKeysIterator(); - private: ObjectStoragePtr object_storage; std::shared_ptr filter_dag; @@ -75,9 +73,6 @@ class SingleThreadIcebergKeysIterator std::optional current_pruner; const Iceberg::ManifestFileContentType manifest_file_content_type; - - size_t min_max_index_pruned_files = 0; - size_t partition_pruned_files = 0; }; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp index 89e345306491..4efa9b46615a 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp @@ -241,7 +241,7 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name, Cont if (type_name == f_double) return std::make_shared(); if (type_name == f_date) - return std::make_shared(); + return std::make_shared(); if (type_name == f_time) return std::make_shared(); if (type_name == f_timestamp) diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index 9cd1e284618b..28bec5f1cf32 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -634,6 +634,16 @@ bool StorageInMemoryMetadata::hasSelectQuery() const return select.select_query != nullptr; } +bool StorageInMemoryMetadata::hasStatistics() const +{ + for (const auto & column : columns) + { + if (!column.statistics.empty()) + return true; + } + return false; +} + namespace { using NamesAndTypesMap = HashMapWithSavedHash; diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h index 8e5eb81abbb7..0fcf803433e7 100644 --- a/src/Storages/StorageInMemoryMetadata.h +++ b/src/Storages/StorageInMemoryMetadata.h @@ -279,6 +279,9 @@ struct StorageInMemoryMetadata const SelectQueryDescription & getSelectQuery() const; bool hasSelectQuery() const; + /// If any of the columns has statistics. + bool hasStatistics() const; + /// Get version of metadata int32_t getMetadataVersion() const { return metadata_version; } diff --git a/tests/integration/test_keeper_azure_s3_plain/__init__.py b/tests/integration/test_keeper_azure_s3_plain/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_keeper_azure_s3_plain/test.py b/tests/integration/test_keeper_azure_s3_plain/test.py new file mode 100644 index 000000000000..99b37cf0f98c --- /dev/null +++ b/tests/integration/test_keeper_azure_s3_plain/test.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 + +import logging +import os +import time + +import pytest +from azure.storage.blob import BlobServiceClient + +import helpers.keeper_utils as keeper_utils +from helpers.cluster import ClickHouseCluster + +CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__)) + + +def generate_config(azurite_port): + path = os.path.join(CURRENT_TEST_DIR, "_gen/keeper_config.xml") + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w") as f: + f.write( + f""" + + trace + + + + + local + /var/lib/clickhouse/coordination/logs/ + + + s3_plain + azure_blob_storage + http://azurite1:{azurite_port}/devstoreaccount1/cont + logs + devstoreaccount1 + Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== + + + local + /var/lib/clickhouse/coordination/snapshots/ + + + + + false + 9181 + 1 + false + + 5000 + 10000 + trace + + 100000 + 10 + 1 + 3 + + log_azure_plain + log_local + snapshot_local + + + 1 + node + 9234 + + + +""" + ) + return path + + +@pytest.fixture(scope="module") +def started_cluster(): + cluster = ClickHouseCluster(__file__) + try: + config_path = generate_config(cluster.azurite_port) + cluster.add_instance( + "node", + main_configs=[config_path], + stay_alive=True, + with_azurite=True, + ) + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def get_azure_container_client(started_cluster): + """Get an Azure container client for the Azurite instance.""" + port = started_cluster.env_variables["AZURITE_PORT"] + connection_string = ( + f"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;" + f"AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;" + f"BlobEndpoint=http://127.0.0.1:{port}/devstoreaccount1;" + ) + blob_service_client = BlobServiceClient.from_connection_string(connection_string) + return blob_service_client.get_container_client("cont") + + +def list_azure_blobs(started_cluster, prefix=""): + """List blobs in the Azure container via the Python SDK (from the host).""" + container_client = get_azure_container_client(started_cluster) + return [ + blob.name + for blob in container_client.list_blobs( + name_starts_with=prefix or None + ) + ] + + +def cleanup(started_cluster, node): + """Remove all Azure blobs and local coordination data for a clean run.""" + node.stop_clickhouse() + + container_client = get_azure_container_client(started_cluster) + for blob in container_client.list_blobs(): + container_client.delete_blob(blob.name) + + # Remove local coordination directories. + node.exec_in_container( + ["rm", "-rf", "/var/lib/clickhouse/coordination/logs"] + ) + node.exec_in_container( + ["rm", "-rf", "/var/lib/clickhouse/coordination/snapshots"] + ) + + node.start_clickhouse() + keeper_utils.wait_until_connected(started_cluster, node) + + +def test_keeper_logs_azure_s3_plain(started_cluster): + """Keeper writes logs to Azure s3_plain disk; after restart it must be able + to list and read them back.""" + + node = started_cluster.instances["node"] + cleanup(started_cluster, node) + + # Create enough Keeper entries to trigger multiple log file rotations. + # With stale_log_gap=10 and rotate_log_storage_interval=3, creating 30+ + # entries should produce several log files that get rotated to Azure. + zk = keeper_utils.get_fake_zk(started_cluster, "node") + try: + zk.create("/test_azure") + for i in range(30): + zk.create(f"/test_azure/node_{i:04d}", b"data") + finally: + zk.stop() + zk.close() + + # Wait for log rotation to move old log files to Azure. + deadline = time.time() + 60 + azure_blobs = [] + while time.time() < deadline: + azure_blobs = list_azure_blobs(started_cluster, prefix="logs/") + if azure_blobs: + break + time.sleep(1) + + logging.info(f"Azure blobs after log rotation: {azure_blobs}") + assert azure_blobs, "Expected Keeper log files in Azure after rotation" + + # Restart Keeper. On startup, it must list the Azure disk to discover + # old log files and replay them. With the fs::path bug, + # ContainerClientWrapper::ListBlobs receives prefix "/" which replaces + # the blob_prefix "logs" entirely, so Azure returns zero blobs. + node.restart_clickhouse() + keeper_utils.wait_until_connected(started_cluster, node) + + # All data must be accessible. If Azure listing is broken, the old logs + # are invisible and znodes written before the last log rotation are lost. + zk = keeper_utils.get_fake_zk(started_cluster, "node") + try: + children = sorted(zk.get_children("/test_azure")) + assert len(children) == 30, ( + f"Expected 30 znodes after restart, got {len(children)}. " + "Azure s3_plain listing likely returned empty results." + ) + for child in children: + data, _ = zk.get(f"/test_azure/{child}") + assert data == b"data" + finally: + zk.stop() + zk.close() diff --git a/tests/integration/test_statistics_cache/test.py b/tests/integration/test_statistics_cache/test.py index d47e6ca5e414..892ee7232af7 100644 --- a/tests/integration/test_statistics_cache/test.py +++ b/tests/integration/test_statistics_cache/test.py @@ -351,13 +351,8 @@ def test_drop_statistics_means_no_load_and_bypass_still_loads(): _query(ch1, "SELECT count() FROM drop_tbl WHERE v>0.99 AND k>=0 " "SETTINGS use_statistics_cache=0, log_comment='drop-bypass' FORMAT Null") - _assert_load(ch1, "drop-bypass") - - _wait_hit( - ch1, "drop-hit", - "SELECT count() FROM drop_tbl WHERE v>0.99 AND k>=0 " - "SETTINGS use_statistics_cache=1, log_comment='drop-hit' FORMAT Null" - ) + # after optimization, there is no load after `drop statistics` + _assert_hit(ch1, "drop-bypass") def test_per_replica_cache_and_restart_needed(): table = _create_rep(0) diff --git a/tests/integration/test_storage_iceberg_with_spark/test_dates.py b/tests/integration/test_storage_iceberg_with_spark/test_dates.py new file mode 100644 index 000000000000..3bb0c8f1bc6b --- /dev/null +++ b/tests/integration/test_storage_iceberg_with_spark/test_dates.py @@ -0,0 +1,74 @@ +import pytest + +from helpers.iceberg_utils import ( + create_iceberg_table, + get_uuid_str, + default_upload_directory, + default_download_directory +) + + +def test_date_reads(started_cluster_iceberg_with_spark): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + storage_type = 's3' + expected_rows=2 + expected_date_1='2299-12-31\n' + expected_date_2='1900-01-13\n' + + + TABLE_NAME = ( + "test_date_reads_" + + storage_type + + "_" + + get_uuid_str() + ) + + spark.sql( + f""" + CREATE TABLE {TABLE_NAME} ( + number INT, + date_col DATE + ) + USING iceberg + """ + ) + spark.sql( + f""" INSERT INTO {TABLE_NAME} VALUES(1,DATE '2299-12-31') """ + ) + spark.sql( + f""" INSERT INTO {TABLE_NAME} VALUES(2,DATE '1900-01-13') """ + ) + + files = default_upload_directory( + started_cluster_iceberg_with_spark, + storage_type, + f"/iceberg_data/default/{TABLE_NAME}/", + f"/iceberg_data/default/{TABLE_NAME}/", + ) + + + create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster_iceberg_with_spark) + rows_in_ch = int( + instance.query( + f"SELECT count() FROM {TABLE_NAME}", + ) + ) + + assert rows_in_ch == expected_rows, f"Expected {expected_rows} rows, but got {rows_in_ch}" + + ret_date_1 = ( + instance.query( + f"SELECT date_col FROM {TABLE_NAME} where number=1", + ) + ) + + assert ret_date_1==expected_date_1, f"Expected {expected_date_1} rows, but got {ret_date_1}" + + ret_date_2 = ( + instance.query( + f"SELECT date_col FROM {TABLE_NAME} where number=2", + ) + ) + + assert ret_date_2==expected_date_2, f"Expected {expected_date_2} rows, but got {ret_date_2}" diff --git a/tests/integration/test_storage_iceberg_with_spark/test_schema_inference.py b/tests/integration/test_storage_iceberg_with_spark/test_schema_inference.py index 64725d275b4c..7085782b2818 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_schema_inference.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_schema_inference.py @@ -61,7 +61,7 @@ def test_schema_inference(started_cluster_iceberg_with_spark, format_version, st ["decimalC1", "Nullable(Decimal(10, 3))"], ["decimalC2", "Nullable(Decimal(20, 10))"], ["decimalC3", "Nullable(Decimal(38, 30))"], - ["dateC", "Nullable(Date)"], + ["dateC", "Nullable(Date32)"], ["timestampC", "Nullable(DateTime64(6, 'UTC'))"], ["stringC", "Nullable(String)"], ["binaryC", "Nullable(String)"], diff --git a/tests/integration/test_storage_iceberg_with_spark/test_types.py b/tests/integration/test_storage_iceberg_with_spark/test_types.py index 7df6cda3dcf6..1dd605098279 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_types.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_types.py @@ -81,7 +81,7 @@ def test_types(started_cluster_iceberg_with_spark, format_version, storage_type) [ ["a", "Nullable(Int32)"], ["b", "Nullable(String)"], - ["c", "Nullable(Date)"], + ["c", "Nullable(Date32)"], ["d", "Array(Nullable(String))"], ["e", "Nullable(Bool)"], ] @@ -104,7 +104,7 @@ def test_types(started_cluster_iceberg_with_spark, format_version, storage_type) [ ["a", "Nullable(Int32)"], ["b", "Nullable(String)"], - ["c", "Nullable(Date)"], + ["c", "Nullable(Date32)"], ["d", "Array(Nullable(String))"], ["e", "Nullable(Bool)"], ] @@ -127,7 +127,7 @@ def test_types(started_cluster_iceberg_with_spark, format_version, storage_type) [ ["a", "Nullable(Int32)"], ["b", "Nullable(String)"], - ["c", "Nullable(Date)"], + ["c", "Nullable(Date32)"], ["d", "Array(Nullable(String))"], ["e", "Nullable(Bool)"], ] diff --git a/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py b/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py index 604a0c79848d..7667d4dff2ee 100644 --- a/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py +++ b/tests/integration/test_storage_iceberg_with_spark/test_writes_with_partitioned_table.py @@ -82,4 +82,4 @@ def execute_spark_query(query: str): f.write(b"3") df = spark.read.format("iceberg").load(f"/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}").collect() - assert len(df) == 10 \ No newline at end of file + assert len(df) == 10 diff --git a/tests/queries/0_stateless/03400_auto_minmax_final_sign_bug.reference b/tests/queries/0_stateless/03400_auto_minmax_final_sign_bug.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03400_auto_minmax_final_sign_bug.sql b/tests/queries/0_stateless/03400_auto_minmax_final_sign_bug.sql new file mode 100644 index 000000000000..e8514bd11152 --- /dev/null +++ b/tests/queries/0_stateless/03400_auto_minmax_final_sign_bug.sql @@ -0,0 +1,38 @@ +-- Reproduces a bug where mixed PK and non-PK minmax skip indexes break FINAL queries. +-- +-- The minmax index on `sign` evaluates the `sign = 1` filter and drops the part +-- containing the delete marker (sign = -1). The recovery mechanism +-- (`findPKRangesForFinalAfterSkipIndex`) is disabled because the minmax index on +-- `key` (a PK column) is also useful, causing `areSkipIndexColumnsInPrimaryKey` +-- to return true. + +DROP TABLE IF EXISTS t_final_minmax; + +CREATE TABLE t_final_minmax +( + key Int32, + value String, + sign Int8, + ver UInt64, + INDEX idx_key key TYPE minmax, + INDEX idx_sign sign TYPE minmax +) +ENGINE = ReplacingMergeTree(ver) +PARTITION BY key +ORDER BY key; + +SYSTEM STOP MERGES t_final_minmax; + +-- Part 1: original row +INSERT INTO t_final_minmax VALUES (2, 'original', 1, 1); + +-- Part 2: delete marker (higher version, sign = -1) +INSERT INTO t_final_minmax VALUES (2, '', -1, 2); + +-- FINAL should keep only the row with highest version (ver=2, sign=-1). +-- Then sign = 1 filter should exclude it, returning empty result. +-- Bug: idx_sign drops the delete-marker part, and the recovery is disabled +-- because idx_key (on a PK column) is also useful. +SELECT value FROM t_final_minmax FINAL WHERE key = 2 AND sign = 1; + +DROP TABLE t_final_minmax; diff --git a/tests/queries/0_stateless/03742_apply_row_policy_after_final.reference b/tests/queries/0_stateless/03742_apply_row_policy_after_final.reference index 4fc6365b89b2..5c0acae733c0 100644 --- a/tests/queries/0_stateless/03742_apply_row_policy_after_final.reference +++ b/tests/queries/0_stateless/03742_apply_row_policy_after_final.reference @@ -41,3 +41,9 @@ --- PREWHERE before FINAL 1 2 + +--- filter before FINAL +1 aaa 1 +2 bbb 1 +--- deferred +2 bbb 1 diff --git a/tests/queries/0_stateless/03742_apply_row_policy_after_final.sql b/tests/queries/0_stateless/03742_apply_row_policy_after_final.sql index c47862917d9c..b0a68cc7cc31 100644 --- a/tests/queries/0_stateless/03742_apply_row_policy_after_final.sql +++ b/tests/queries/0_stateless/03742_apply_row_policy_after_final.sql @@ -132,3 +132,31 @@ SELECT '--- PREWHERE before FINAL'; SELECT x FROM tab_final FINAL PREWHERE y != 'ccc' ORDER BY x; DROP TABLE tab_final; + +-- test that partition pruning works OK with row policy +-- When row policy column in the partition key, minmax index must not prune partitions that FINAL needs for correct deduplication +SELECT ''; + +DROP TABLE IF EXISTS tab_part; +DROP ROW POLICY IF EXISTS pol_part ON tab_part; + +CREATE TABLE tab_part (x UInt32, y String, version UInt32) +ENGINE = ReplacingMergeTree(version) PARTITION BY y ORDER BY x; + +INSERT INTO tab_part VALUES (1, 'aaa', 1), (2, 'bbb', 1); +INSERT INTO tab_part VALUES (1, 'ccc', 2); + +CREATE ROW POLICY pol_part ON tab_part USING y != 'ccc' TO ALL; + +SET apply_row_policy_after_final = 0; +SELECT '--- filter before FINAL'; +-- partition 'ccc' pruned, FINAL sees (1,'aaa',1) and (2,'bbb',1) +SELECT * FROM tab_part FINAL ORDER BY x; + +SET apply_row_policy_after_final = 1; +SELECT '--- deferred'; +-- all partitions read, FINAL picks (1,'ccc',2) then row policy removes it +SELECT * FROM tab_part FINAL ORDER BY x; + +DROP ROW POLICY pol_part ON tab_part; +DROP TABLE tab_part; diff --git a/tests/queries/0_stateless/03823_skip_index_disjunction_oob.reference b/tests/queries/0_stateless/03823_skip_index_disjunction_oob.reference index ec635144f600..b62923296e54 100644 --- a/tests/queries/0_stateless/03823_skip_index_disjunction_oob.reference +++ b/tests/queries/0_stateless/03823_skip_index_disjunction_oob.reference @@ -1 +1,2 @@ 9 +9 diff --git a/tests/queries/0_stateless/03823_skip_index_disjunction_oob.sql b/tests/queries/0_stateless/03823_skip_index_disjunction_oob.sql index 27f02d16feae..123a9c15105c 100644 --- a/tests/queries/0_stateless/03823_skip_index_disjunction_oob.sql +++ b/tests/queries/0_stateless/03823_skip_index_disjunction_oob.sql @@ -31,4 +31,12 @@ SETTINGS use_primary_key = 0, use_skip_indexes_on_data_read = 0, use_skip_indexes_for_disjunctions = 1, use_query_condition_cache = 0, optimize_min_equality_disjunction_chain_length = 100; +SELECT count() FROM t_skip_index_disj_oob +WHERE (a = 1 AND b = 1) OR (a = 2 AND b = 2) OR (a = 3 AND b = 3) + OR (a = 4 AND b = 4) OR (a = 5 AND b = 5) OR (a = 6 AND b = 6) + OR (a = 7 AND b = 7) OR (a = 8 AND b = 8) OR (a = 9 AND b = 9) +SETTINGS use_primary_key = 0, use_skip_indexes_on_data_read = 1, + use_skip_indexes_for_disjunctions = 1, use_query_condition_cache = 0, + optimize_min_equality_disjunction_chain_length = 100; + DROP TABLE t_skip_index_disj_oob; diff --git a/tests/queries/0_stateless/03904_fix_empty_statistics_load.reference b/tests/queries/0_stateless/03904_fix_empty_statistics_load.reference new file mode 100644 index 000000000000..573541ac9702 --- /dev/null +++ b/tests/queries/0_stateless/03904_fix_empty_statistics_load.reference @@ -0,0 +1 @@ +0 diff --git a/tests/queries/0_stateless/03904_fix_empty_statistics_load.sql b/tests/queries/0_stateless/03904_fix_empty_statistics_load.sql new file mode 100644 index 000000000000..370ccf49efc3 --- /dev/null +++ b/tests/queries/0_stateless/03904_fix_empty_statistics_load.sql @@ -0,0 +1,45 @@ +-- Test for issue #96068 + +SET use_statistics = 1; + +DROP TABLE IF EXISTS tab; + +-- The table has no manually or automatically created statistics +CREATE TABLE tab +( + u64 UInt64, + u64_tdigest UInt64, + u64_minmax UInt64, + u64_countmin UInt64, + f64 Float64, + b Bool, + s String, +) Engine = MergeTree() ORDER BY tuple() PARTITION BY u64_minmax +SETTINGS min_bytes_for_wide_part = 0, auto_statistics_types = ''; + +-- Insert looooots of parts (1000) +INSERT INTO tab +SELECT number % 1000, + number % 1000, + number % 99, + number % 1000, + number % 1000, + number % 2, + toString(number % 1000) +FROM system.numbers LIMIT 10000; + +SELECT * FROM tab +WHERE u64_countmin > 3500 and u64_countmin < 3600 +FORMAT NULL +SETTINGS use_statistics_cache = 0, log_comment = '03904_empty'; + +SYSTEM FLUSH LOGS query_log; + +-- Expect that no statistics were loaded from disk +SELECT ProfileEvents['LoadedStatisticsMicroseconds'] +FROM system.query_log +WHERE type = 'QueryFinish' AND current_database = currentDatabase() AND log_comment = '03904_empty' +ORDER BY event_time_microseconds DESC +LIMIT 1; + +DROP TABLE IF EXISTS tab; diff --git a/tests/queries/0_stateless/03915_exchange_tables_race.reference b/tests/queries/0_stateless/03915_exchange_tables_race.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03915_exchange_tables_race.sh b/tests/queries/0_stateless/03915_exchange_tables_race.sh new file mode 100755 index 000000000000..3b95f7480d2f --- /dev/null +++ b/tests/queries/0_stateless/03915_exchange_tables_race.sh @@ -0,0 +1,27 @@ +#!/usr/bin/env bash +# Tags: no-ordinary-database + +set -e + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +${CLICKHOUSE_CLIENT} --multiquery <&1 | grep LOGICAL_ERROR) & + ${CLICKHOUSE_CLIENT} --query "EXCHANGE TABLES tbl_03007_1 AND tbl_03007_2" & +done + +wait + +${CLICKHOUSE_CLIENT} --multiquery < __table1.y String : 0 + INPUT : 1 -> __table1.x UInt32 : 1 + INPUT : 2 -> __table1.version UInt32 : 2 + ALIAS __table1.y :: 0 -> y String : 3 + ALIAS __table1.x :: 1 -> x UInt32 : 0 + ALIAS __table1.version :: 2 -> version UInt32 : 1 +Positions: 0 3 1 + Sorting (Sorting for ORDER BY) + Prefix sort description: __table1.x ASC + Result sort description: __table1.x ASC + Expression ((Before ORDER BY + (Projection + Change column names to column identifiers))) + Actions: INPUT : 0 -> y String : 0 + INPUT : 1 -> x UInt32 : 1 + INPUT : 2 -> version UInt32 : 2 + ALIAS y :: 0 -> __table1.y String : 3 + ALIAS x :: 1 -> __table1.x UInt32 : 0 + ALIAS version :: 2 -> __table1.version UInt32 : 1 + Positions: 0 3 1 + ReadFromMergeTree (default.tab) + ReadType: InOrder + Parts: 3 + Granules: 3 + Prewhere info + Need filter: 1 + Prewhere filter + Prewhere filter column: notEquals(y, \'ccc\'_String) (removed) + Actions: INPUT : 0 -> y String : 0 + COLUMN Const(String) -> \'ccc\'_String String : 1 + FUNCTION notEquals(y : 0, \'ccc\'_String :: 1) -> notEquals(y, \'ccc\'_String) UInt8 : 2 + Positions: 2 0 + Row level filter + Row level filter column: notEquals(y, \'ccc\'_String) (removed) + Actions: INPUT : 0 -> y String : 0 + COLUMN Const(String) -> \'ccc\'_String String : 1 + FUNCTION notEquals(y : 0, \'ccc\'_String :: 1) -> notEquals(y, \'ccc\'_String) UInt8 : 2 + Positions: 2 0 + Deferred filters (applied after FINAL) + Deferred row level filter column: notEquals(y, \'ccc\'_String) + Deferred prewhere filter column: notEquals(y, \'ccc\'_String) += full plan: nothing deferred = +Expression (Project names) +Actions: INPUT : 0 -> __table1.y String : 0 + INPUT : 1 -> __table1.x UInt32 : 1 + INPUT : 2 -> __table1.version UInt32 : 2 + ALIAS __table1.y :: 0 -> y String : 3 + ALIAS __table1.x :: 1 -> x UInt32 : 0 + ALIAS __table1.version :: 2 -> version UInt32 : 1 +Positions: 0 3 1 + Sorting (Sorting for ORDER BY) + Prefix sort description: __table1.x ASC + Result sort description: __table1.x ASC + Expression ((Before ORDER BY + (Projection + Change column names to column identifiers))) + Actions: INPUT : 0 -> y String : 0 + INPUT : 1 -> x UInt32 : 1 + INPUT : 2 -> version UInt32 : 2 + ALIAS y :: 0 -> __table1.y String : 3 + ALIAS x :: 1 -> __table1.x UInt32 : 0 + ALIAS version :: 2 -> __table1.version UInt32 : 1 + Positions: 0 3 1 + ReadFromMergeTree (default.tab) + ReadType: InOrder + Parts: 3 + Granules: 3 + Prewhere info + Need filter: 1 + Prewhere filter + Prewhere filter column: notEquals(y, \'ccc\'_String) (removed) + Actions: INPUT : 0 -> y String : 0 + COLUMN Const(String) -> \'ccc\'_String String : 1 + FUNCTION notEquals(y : 0, \'ccc\'_String :: 1) -> notEquals(y, \'ccc\'_String) UInt8 : 2 + Positions: 2 0 + Row level filter + Row level filter column: notEquals(y, \'ccc\'_String) (removed) + Actions: INPUT : 0 -> y String : 0 + COLUMN Const(String) -> \'ccc\'_String String : 1 + FUNCTION notEquals(y : 0, \'ccc\'_String :: 1) -> notEquals(y, \'ccc\'_String) UInt8 : 2 + Positions: 2 0 += row policy deferred = + Deferred filters (applied after FINAL) + Deferred row level filter column: notEquals(y, \'ccc\'_String) += row policy not deferred = += prewhere deferred = + Deferred filters (applied after FINAL) + Deferred prewhere filter column: notEquals(y, \'ccc\'_String) += prewhere not deferred = += both deferred = + Deferred filters (applied after FINAL) + Deferred row level filter column: notEquals(y, \'ccc\'_String) + Deferred prewhere filter column: notEquals(y, \'ccc\'_String) += row policy on non-sorting-key defers prewhere = + Deferred filters (applied after FINAL) + Deferred row level filter column: notEquals(y, \'ccc\'_String) + Deferred prewhere filter column: notEquals(y, \'ccc\'_String) += no FINAL - no deferred = diff --git a/tests/queries/0_stateless/03928_deferred_filters_after_final_in_explain.sql b/tests/queries/0_stateless/03928_deferred_filters_after_final_in_explain.sql new file mode 100644 index 000000000000..a22db44ecfb7 --- /dev/null +++ b/tests/queries/0_stateless/03928_deferred_filters_after_final_in_explain.sql @@ -0,0 +1,47 @@ +-- Tags: no-parallel +-- test that EXPLAIN shows deferred filter information for apply_prewhere_after_final / apply_row_policy_after_final + +DROP TABLE IF EXISTS tab; +DROP ROW POLICY IF EXISTS pol1 ON tab; + +CREATE TABLE tab (x UInt32, y String, version UInt32) ENGINE = ReplacingMergeTree(version) ORDER BY x; + +INSERT INTO tab SELECT 1, 'aaa', 1; +INSERT INTO tab SELECT 2, 'bbb', 1; +INSERT INTO tab SELECT 1, 'ccc', 2; + +CREATE ROW POLICY pol1 ON tab USING y != 'ccc' TO ALL; + +SET enable_analyzer = 1; + +SELECT '= full plan: both deferred ='; +EXPLAIN actions=1 SELECT * FROM tab FINAL PREWHERE y != 'ccc' ORDER BY x +SETTINGS apply_row_policy_after_final=1, apply_prewhere_after_final=1, optimize_read_in_order=1; + +SELECT '= full plan: nothing deferred ='; +EXPLAIN actions=1 SELECT * FROM tab FINAL PREWHERE y != 'ccc' ORDER BY x +SETTINGS apply_row_policy_after_final=0, apply_prewhere_after_final=0, optimize_read_in_order=1; + +SELECT '= row policy deferred ='; +SELECT explain FROM (EXPLAIN actions=1 SELECT * FROM tab FINAL ORDER BY x SETTINGS apply_row_policy_after_final=1, apply_prewhere_after_final=0) WHERE explain LIKE '%Deferred%'; + +SELECT '= row policy not deferred ='; +SELECT explain FROM (EXPLAIN actions=1 SELECT * FROM tab FINAL ORDER BY x SETTINGS apply_row_policy_after_final=0, apply_prewhere_after_final=0) WHERE explain LIKE '%Deferred%'; + +SELECT '= prewhere deferred ='; +SELECT explain FROM (EXPLAIN actions=1 SELECT * FROM tab FINAL PREWHERE y != 'ccc' ORDER BY x SETTINGS apply_prewhere_after_final=1, apply_row_policy_after_final=0) WHERE explain LIKE '%Deferred%'; + +SELECT '= prewhere not deferred ='; +SELECT explain FROM (EXPLAIN actions=1 SELECT * FROM tab FINAL PREWHERE y != 'ccc' ORDER BY x SETTINGS apply_prewhere_after_final=0, apply_row_policy_after_final=0) WHERE explain LIKE '%Deferred%'; + +SELECT '= both deferred ='; +SELECT explain FROM (EXPLAIN actions=1 SELECT * FROM tab FINAL PREWHERE y != 'ccc' ORDER BY x SETTINGS apply_row_policy_after_final=1, apply_prewhere_after_final=1) WHERE explain LIKE '%Deferred%'; + +SELECT '= row policy on non-sorting-key defers prewhere ='; +SELECT explain FROM (EXPLAIN actions=1 SELECT * FROM tab FINAL PREWHERE y != 'ccc' ORDER BY x SETTINGS apply_row_policy_after_final=1, apply_prewhere_after_final=0) WHERE explain LIKE '%Deferred%'; + +SELECT '= no FINAL - no deferred ='; +SELECT explain FROM (EXPLAIN actions=1 SELECT * FROM tab ORDER BY x SETTINGS apply_row_policy_after_final=1, apply_prewhere_after_final=0) WHERE explain LIKE '%Deferred%'; + +DROP ROW POLICY pol1 ON tab; +DROP TABLE tab; diff --git a/tests/queries/0_stateless/03928_isNull_nullable_tuple_subcolumn.reference b/tests/queries/0_stateless/03928_isNull_nullable_tuple_subcolumn.reference new file mode 100644 index 000000000000..223af8a37894 --- /dev/null +++ b/tests/queries/0_stateless/03928_isNull_nullable_tuple_subcolumn.reference @@ -0,0 +1,6 @@ +800 800 +200 200 +800 +200 +OK +0 diff --git a/tests/queries/0_stateless/03928_isNull_nullable_tuple_subcolumn.sql b/tests/queries/0_stateless/03928_isNull_nullable_tuple_subcolumn.sql new file mode 100644 index 000000000000..2789a04d108c --- /dev/null +++ b/tests/queries/0_stateless/03928_isNull_nullable_tuple_subcolumn.sql @@ -0,0 +1,58 @@ +-- Tags: no-ordinary-database + +-- Regression test for LOGICAL_ERROR: Bad cast from ColumnNullable to ColumnVector +-- The FunctionToSubcolumnsPass replaced isNull(col) with reading the .null subcolumn, +-- hardcoding its type as UInt8. But for Nullable(Tuple(... Nullable(T) ...)), +-- the .null subcolumn in storage is Nullable(UInt8), causing a type mismatch. + +SET allow_experimental_nullable_tuple_type = 1; + +DROP TABLE IF EXISTS t_nullable_tuple; + +CREATE TABLE t_nullable_tuple +( + `tup` Nullable(Tuple(u Nullable(UInt64), s Nullable(String))) +) +ENGINE = MergeTree +ORDER BY tuple() +SETTINGS ratio_of_defaults_for_sparse_serialization = 0, nullable_serialization_version = 'allow_sparse', min_bytes_for_wide_part = 0; + +INSERT INTO t_nullable_tuple SELECT if((number % 5) = 0, (number, toString(number)), NULL) FROM numbers(1000); + +-- These used to cause LOGICAL_ERROR in debug/sanitizer builds. +SELECT sum(toUInt64(isNull(tup.s))) AS null_s, sum(toUInt64(isNull(tup.u))) AS null_u FROM t_nullable_tuple FORMAT Null; +SELECT sum(toUInt64(isNotNull(tup.s))) AS notnull_s, sum(toUInt64(isNotNull(tup.u))) AS notnull_u FROM t_nullable_tuple FORMAT Null; + +-- Also test without inner Nullable — this exercises the normal optimization path +-- and should produce correct results (800 out of 1000 tuples are NULL). +DROP TABLE t_nullable_tuple; + +CREATE TABLE t_nullable_tuple +( + `tup` Nullable(Tuple(u UInt64, s String)) +) +ENGINE = MergeTree +ORDER BY tuple() +SETTINGS ratio_of_defaults_for_sparse_serialization = 0, nullable_serialization_version = 'allow_sparse', min_bytes_for_wide_part = 0; + +INSERT INTO t_nullable_tuple SELECT if((number % 5) = 0, (number, toString(number)), NULL) FROM numbers(1000); + +SELECT sum(toUInt64(isNull(tup.s))) AS null_s, sum(toUInt64(isNull(tup.u))) AS null_u FROM t_nullable_tuple; +SELECT sum(toUInt64(isNotNull(tup.s))) AS notnull_s, sum(toUInt64(isNotNull(tup.u))) AS notnull_u FROM t_nullable_tuple; +SELECT count() FROM t_nullable_tuple WHERE isNull(tup.s); +SELECT count() FROM t_nullable_tuple WHERE isNotNull(tup.s); + +SELECT 'OK'; + +DROP TABLE t_nullable_tuple; + +-- Simplified reproducer from https://github.com/ClickHouse/ClickHouse/pull/97582#issuecomment-3939227260 +DROP TABLE IF EXISTS t0; +CREATE TABLE t0 (c0 Nullable(Tuple(c1 Nullable(Int32)))) ENGINE = MergeTree ORDER BY tuple(); +INSERT INTO t0 VALUES ((1,)); + +SELECT count() +FROM t0 +WHERE c0.c1 IS NULL AND c0 IS NULL; + +DROP TABLE t0; diff --git a/tests/queries/0_stateless/03928_json_advanced_shared_data_bug.reference b/tests/queries/0_stateless/03928_json_advanced_shared_data_bug.reference new file mode 100644 index 000000000000..c0d2ee3f3b2a --- /dev/null +++ b/tests/queries/0_stateless/03928_json_advanced_shared_data_bug.reference @@ -0,0 +1,30 @@ +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{} +{} +{} +{} +{} +{} +{} +{} +{} +{} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} +{"a":[{"b":42}]} diff --git a/tests/queries/0_stateless/03928_json_advanced_shared_data_bug.sql b/tests/queries/0_stateless/03928_json_advanced_shared_data_bug.sql new file mode 100644 index 000000000000..c54c8378ae3c --- /dev/null +++ b/tests/queries/0_stateless/03928_json_advanced_shared_data_bug.sql @@ -0,0 +1,16 @@ +DROP TABLE IF EXISTS test; + +CREATE TABLE test +( + `json` JSON(max_dynamic_paths = 1) +) +ENGINE = MergeTree +ORDER BY tuple() +SETTINGS min_bytes_for_wide_part = 1, min_rows_for_wide_part = 1, write_marks_for_substreams_in_compact_parts = 1, object_serialization_version = 'v3', object_shared_data_serialization_version = 'advanced', object_shared_data_serialization_version_for_zero_level_parts = 'advanced', object_shared_data_buckets_for_wide_part = 1, index_granularity = 100; + +INSERT INTO test SELECT multiIf(number < 10, '{"a" : [{"b" : 42}]}', number < 20, '{}', '{"a" : [{"b" : 42}]}') from numbers(30); + +SELECT * FROM test SETTINGS max_block_size=10; + +DROP TABLE test; + diff --git a/tests/queries/0_stateless/03977_rollup_lowcardinality_nullable_in_tuple.reference b/tests/queries/0_stateless/03977_rollup_lowcardinality_nullable_in_tuple.reference new file mode 100644 index 000000000000..e8183f05f5db --- /dev/null +++ b/tests/queries/0_stateless/03977_rollup_lowcardinality_nullable_in_tuple.reference @@ -0,0 +1,3 @@ +1 +1 +1 diff --git a/tests/queries/0_stateless/03977_rollup_lowcardinality_nullable_in_tuple.sql b/tests/queries/0_stateless/03977_rollup_lowcardinality_nullable_in_tuple.sql new file mode 100644 index 000000000000..8a45db9ac7c5 --- /dev/null +++ b/tests/queries/0_stateless/03977_rollup_lowcardinality_nullable_in_tuple.sql @@ -0,0 +1,16 @@ +-- Regression test: ROLLUP with LowCardinality(Nullable) inside Nullable(Tuple) caused +-- a logical error due to incorrect getSerializedValueSize in ColumnLowCardinality/ColumnUnique. +-- The serialized size didn't account for the null flag byte, causing key deserialization corruption. + +SET allow_experimental_nullable_tuple_type = 1; +SET allow_suspicious_low_cardinality_types = 1; +SET enable_analyzer = 1; + +DROP TABLE IF EXISTS t_rollup_lc_nullable; + +CREATE TABLE t_rollup_lc_nullable (value Nullable(Tuple(LowCardinality(Nullable(Int64))))) ENGINE = Memory; +INSERT INTO t_rollup_lc_nullable VALUES ((NULL)); + +SELECT 1 FROM t_rollup_lc_nullable GROUP BY value, 'foo' WITH ROLLUP; + +DROP TABLE t_rollup_lc_nullable; diff --git a/tests/queries/0_stateless/03983_inverse_dictionary_lookup_rbac.reference b/tests/queries/0_stateless/03983_inverse_dictionary_lookup_rbac.reference new file mode 100644 index 000000000000..aa47d0d46d47 --- /dev/null +++ b/tests/queries/0_stateless/03983_inverse_dictionary_lookup_rbac.reference @@ -0,0 +1,2 @@ +0 +0 diff --git a/tests/queries/0_stateless/03983_inverse_dictionary_lookup_rbac.sh b/tests/queries/0_stateless/03983_inverse_dictionary_lookup_rbac.sh new file mode 100755 index 000000000000..3ae909c78580 --- /dev/null +++ b/tests/queries/0_stateless/03983_inverse_dictionary_lookup_rbac.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +user="user_${CLICKHOUSE_TEST_UNIQUE_NAME}" +table_name="table_${CLICKHOUSE_TEST_UNIQUE_NAME}" +dict_name="dict_${CLICKHOUSE_TEST_UNIQUE_NAME}" +db_name="${CLICKHOUSE_DATABASE}" + +${CLICKHOUSE_CLIENT} -nm --query " + DROP USER IF EXISTS ${user}; + DROP TABLE IF EXISTS ${table_name} SYNC; + DROP DICTIONARY IF EXISTS ${dict_name}; + + CREATE TABLE ${table_name} + ( + id UInt64, + value_id UInt64 + ) + ENGINE = MergeTree + ORDER BY id; + + INSERT INTO ${table_name} VALUES (1, 1), (2, 2); + + CREATE DICTIONARY ${dict_name} + ( + id UInt64, + name String + ) + PRIMARY KEY id + SOURCE(NULL()) + LAYOUT(HASHED_ARRAY()) + LIFETIME(0); + + CREATE USER ${user} IDENTIFIED WITH no_password; + GRANT SELECT ON ${db_name}.${table_name} TO ${user}; + GRANT dictGet ON ${db_name}.${dict_name} TO ${user}; +" + +# Ensure user doesn't have CREATE TEMPORARY TABLE privileges. +${CLICKHOUSE_CLIENT} --user="${user}" -nm --query " + SELECT count() FROM dictionary('${db_name}.${dict_name}'); -- { serverError ACCESS_DENIED } +" + +# Baseline: optimization disabled should work. +${CLICKHOUSE_CLIENT} --user="${user}" -nm --query " + SELECT count() + FROM ${db_name}.${table_name} + WHERE dictGet('${db_name}.${dict_name}', 'name', value_id) = 'abc' + SETTINGS enable_analyzer = 1, optimize_inverse_dictionary_lookup = 0; +" + +# Regression check: with optimization enabled this used to fail with ACCESS_DENIED. +${CLICKHOUSE_CLIENT} --user="${user}" -nm --query " + SELECT count() + FROM ${db_name}.${table_name} + WHERE dictGet('${db_name}.${dict_name}', 'name', value_id) = 'abc' + SETTINGS enable_analyzer = 1, optimize_inverse_dictionary_lookup = 1; +" + +${CLICKHOUSE_CLIENT} -nm --query " + DROP USER IF EXISTS ${user}; + DROP TABLE IF EXISTS ${table_name} SYNC; + DROP DICTIONARY IF EXISTS ${dict_name}; +" diff --git a/tests/queries/0_stateless/03987_concat_variant_low_cardinality.reference b/tests/queries/0_stateless/03987_concat_variant_low_cardinality.reference new file mode 100644 index 000000000000..bcf190ed2048 --- /dev/null +++ b/tests/queries/0_stateless/03987_concat_variant_low_cardinality.reference @@ -0,0 +1,2 @@ +a[(1,2),3] +7[(1,100000000000000000000),0,2] diff --git a/tests/queries/0_stateless/03987_concat_variant_low_cardinality.sql b/tests/queries/0_stateless/03987_concat_variant_low_cardinality.sql new file mode 100644 index 000000000000..f7d619c83382 --- /dev/null +++ b/tests/queries/0_stateless/03987_concat_variant_low_cardinality.sql @@ -0,0 +1,13 @@ +-- Concat with Variant type containing LowCardinality caused a LOGICAL_ERROR +-- because convertToFullIfNeeded recursively stripped LowCardinality from inside the Variant column +-- while the type was not updated, creating a type/column mismatch in serialization. +-- https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=97581&sha=320e7c9d8876b04a971bd26214b9ac0ab433c250&name_0=PR&name_1=AST%20fuzzer%20%28amd_debug%29 + +SET allow_suspicious_low_cardinality_types = 1; +SET allow_suspicious_types_in_group_by = 1; +SET allow_not_comparable_types_in_comparison_functions = 1; +SET enable_analyzer = 1; + +SELECT concat('a', [(1, 2), toLowCardinality(3)]); +SELECT concat(7, [(1., 100000000000000000000.), isNull((NULL, 1048577)), toLowCardinality(toNullable(2))]); +SELECT DISTINCT [CAST('2', 'UInt64')] * 7 FROM numbers(5) WHERE equals(7, number) GROUP BY GROUPING SETS ((isNullable(toLowCardinality(2))), (concat(7, [(1., 100000000000000000000.), isNull((NULL, 1048577)), toLowCardinality(toNullable(2))])), (1)) QUALIFY isNotDistinctFrom(assumeNotNull(2), isNull(2)); diff --git a/tests/queries/0_stateless/03988_concat_with_separator_format_variant_low_cardinality.reference b/tests/queries/0_stateless/03988_concat_with_separator_format_variant_low_cardinality.reference new file mode 100644 index 000000000000..de24c39432b2 --- /dev/null +++ b/tests/queries/0_stateless/03988_concat_with_separator_format_variant_low_cardinality.reference @@ -0,0 +1,12 @@ +{0:(0,1),(4):5},x,y +{1:(1,1),(4):5},x,y +{2:(2,1),(4):5},x,y +[(0,2),3],x,y +[(1,2),3],x,y +[(2,2),3],x,y +{0:(0,1),(4):5} x y +{1:(1,1),(4):5} x y +{2:(2,1),(4):5} x y +[(0,2),3] x y +[(1,2),3] x y +[(2,2),3] x y diff --git a/tests/queries/0_stateless/03988_concat_with_separator_format_variant_low_cardinality.sql b/tests/queries/0_stateless/03988_concat_with_separator_format_variant_low_cardinality.sql new file mode 100644 index 000000000000..602a33828ca5 --- /dev/null +++ b/tests/queries/0_stateless/03988_concat_with_separator_format_variant_low_cardinality.sql @@ -0,0 +1,11 @@ +-- concatWithSeparator and format had the same bug as concat (fixed in #97654): +-- convertToFullIfNeeded recursively stripped LowCardinality from inside a Variant column +-- while the type was not updated, creating a type/column mismatch in serialization. + +SET allow_suspicious_low_cardinality_types = 1; + +SELECT concatWithSeparator(',', map(number, tuple(number, toLowCardinality(toUInt128(1))), tuple(4), 5), 'x', 'y') FROM numbers(3); +SELECT concatWithSeparator(',', [(number, 2), toLowCardinality(3)], 'x', 'y') FROM numbers(3); + +SELECT format('{} {} {}', map(number, tuple(number, toLowCardinality(toUInt128(1))), tuple(4), 5), 'x', 'y') FROM numbers(3); +SELECT format('{} {} {}', [(number, 2), toLowCardinality(3)], 'x', 'y') FROM numbers(3); diff --git a/tests/queries/0_stateless/03988_grace_hash_join_leftover_blocks.reference b/tests/queries/0_stateless/03988_grace_hash_join_leftover_blocks.reference new file mode 100644 index 000000000000..3894b6e08587 --- /dev/null +++ b/tests/queries/0_stateless/03988_grace_hash_join_leftover_blocks.reference @@ -0,0 +1,6 @@ +9 +9 +9 +9 +9 +9 diff --git a/tests/queries/0_stateless/03988_grace_hash_join_leftover_blocks.sql b/tests/queries/0_stateless/03988_grace_hash_join_leftover_blocks.sql new file mode 100644 index 000000000000..521ca6af3cf3 --- /dev/null +++ b/tests/queries/0_stateless/03988_grace_hash_join_leftover_blocks.sql @@ -0,0 +1,79 @@ +DROP TABLE IF EXISTS test; +DROP TABLE IF EXISTS test2; + +SET max_joined_block_size_rows = 5; +SET enable_analyzer = 1; + +CREATE TABLE test +( + c0 Int, + c1 Date +) +ENGINE = MergeTree() +ORDER BY (c1); + +INSERT INTO test (c0, c1) VALUES +(1,'1995-01-28'), +(1,'1995-01-29'), +(1,'1995-01-30'); + +CREATE TABLE test2 +( + c0 Int, + c1 Date +) +ENGINE = MergeTree() +ORDER BY (c0); + +INSERT INTO test2 (c1, c0) VALUES +('1992-12-14',1), +('1992-12-14',1), +('1989-05-06',1); + +SELECT + count() +FROM test +LEFT JOIN test2 + ON test.c0 = test2.c0 + AND test.c1 >= test2.c1 +SETTINGS join_algorithm='parallel_hash'; + +SELECT + count() +FROM test2 +LEFT JOIN test + ON test.c0 = test2.c0 + AND test.c1 >= test2.c1 +SETTINGS join_algorithm='grace_hash', grace_hash_join_initial_buckets = 2; + +SELECT + count() +FROM test +RIGHT JOIN test2 + ON test.c0 = test2.c0 + AND test.c1 >= test2.c1 +SETTINGS join_algorithm='parallel_hash'; + +SELECT + count() +FROM test2 +RIGHT JOIN test + ON test.c0 = test2.c0 + AND test.c1 >= test2.c1 +SETTINGS join_algorithm='grace_hash', grace_hash_join_initial_buckets = 2; + +SELECT + count() +FROM test +FULL JOIN test2 + ON test.c0 = test2.c0 + AND test.c1 >= test2.c1 +SETTINGS join_algorithm='parallel_hash'; + +SELECT + count() +FROM test2 +FULL JOIN test + ON test.c0 = test2.c0 + AND test.c1 >= test2.c1 +SETTINGS join_algorithm='grace_hash', grace_hash_join_initial_buckets = 2; \ No newline at end of file diff --git a/tests/queries/0_stateless/03988_zookeeper_send_receive_race.reference b/tests/queries/0_stateless/03988_zookeeper_send_receive_race.reference new file mode 100644 index 000000000000..d86bac9de59a --- /dev/null +++ b/tests/queries/0_stateless/03988_zookeeper_send_receive_race.reference @@ -0,0 +1 @@ +OK diff --git a/tests/queries/0_stateless/03988_zookeeper_send_receive_race.sh b/tests/queries/0_stateless/03988_zookeeper_send_receive_race.sh new file mode 100755 index 000000000000..8b4a8a3d92f4 --- /dev/null +++ b/tests/queries/0_stateless/03988_zookeeper_send_receive_race.sh @@ -0,0 +1,42 @@ +#!/usr/bin/env bash +# Tags: zookeeper, no-fasttest + +# Regression test for a data race in ZooKeeper client between sendThread and receiveThread. +# +# sendThread used to mutate the request (addRootPath, has_watch) AFTER copying it +# into the operations map, while receiveThread could concurrently read from the +# same shared request object via the operations map. This caused a data race on +# the request's path string (std::string reallocation during addRootPath vs +# concurrent getPath() read), leading to SIGBUS/use-after-free crashes. +# +# Under TSAN this test reliably detects the race before the fix. +# The key is to generate many concurrent ZooKeeper requests through the server's +# shared ZK session so sendThread and receiveThread are both actively working on +# the operations map at the same time. + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q " + DROP TABLE IF EXISTS t_zk_race; + CREATE TABLE t_zk_race (key UInt64) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_zk_race', 'r1') + ORDER BY key; +" + +ZK_PATH="/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/t_zk_race" + +# Flood the server's shared ZK connection with concurrent reads from +# system.zookeeper. Each SELECT issues ZK list/get requests that go through +# sendThread (addRootPath + operations map insert) and receiveThread +# (operations map read for timeout + response handling) on the same session. +# +# Use clickhouse-benchmark for maximum ZK operations/sec on a single session. +# --timelimit ensures the test runs long enough for TSAN to catch the race. +echo "SELECT count() FROM system.zookeeper WHERE path = '$ZK_PATH' FORMAT Null" | \ + ${CLICKHOUSE_BENCHMARK} --concurrency 30 --iterations 100000 --timelimit 10 2>&1 | grep -q "Executed" || true + +echo "OK" + +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS t_zk_race" diff --git a/tests/queries/0_stateless/03989_set_low_cardinality_in_tuple.reference b/tests/queries/0_stateless/03989_set_low_cardinality_in_tuple.reference new file mode 100644 index 000000000000..b7596799bb84 --- /dev/null +++ b/tests/queries/0_stateless/03989_set_low_cardinality_in_tuple.reference @@ -0,0 +1,6 @@ +hello 1 +world 42 +simple +nullable 5 +kept 1 +multi 10 diff --git a/tests/queries/0_stateless/03989_set_low_cardinality_in_tuple.sql b/tests/queries/0_stateless/03989_set_low_cardinality_in_tuple.sql new file mode 100644 index 000000000000..2face437a129 --- /dev/null +++ b/tests/queries/0_stateless/03989_set_low_cardinality_in_tuple.sql @@ -0,0 +1,68 @@ +-- Regression test: LowCardinality inside Tuple in IN subquery caused LOGICAL_ERROR +-- because Set::setHeader only stripped top-level LowCardinality from set_elements_types +-- while convertToFullIfNeeded (recursive since #97493) stripped it from inner columns, +-- creating a column/type mismatch in KeyCondition::tryPrepareSetColumnsForIndex. +SET allow_suspicious_low_cardinality_types = 1; + +-- Exact reproducer from AST fuzzer CI report +DROP TABLE IF EXISTS table2__fuzz_36; +DROP TABLE IF EXISTS table1__fuzz_47; +CREATE TABLE table2__fuzz_36 (id1 Nullable(Date), id2 LowCardinality(Int8)) ENGINE = Memory AS SELECT 1, 1; +CREATE TABLE table1__fuzz_47 (id1 Nullable(Int8), id2 Decimal(76, 58)) ENGINE = MergeTree ORDER BY id1 SETTINGS allow_nullable_key = 1 AS SELECT 1, 1; +SELECT * FROM table1__fuzz_47 WHERE (id1, id2) IN (SELECT tuple(id2, id1) FROM table2__fuzz_36); -- { serverError ILLEGAL_COLUMN } +DROP TABLE table2__fuzz_36; +DROP TABLE table1__fuzz_47; + +-- LowCardinality(String) inside tuple from subquery +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t2 (a LowCardinality(String), b Int32) ENGINE = Memory AS SELECT 'hello', 1; +CREATE TABLE t1 (a String, b Int32) ENGINE = MergeTree ORDER BY a AS SELECT 'hello', 1; +SELECT * FROM t1 WHERE (a, b) IN (SELECT tuple(a, b) FROM t2); +DROP TABLE t1; +DROP TABLE t2; + +-- Multiple LowCardinality columns inside tuple, composite key +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t2 (a LowCardinality(String), b LowCardinality(Int32)) ENGINE = Memory AS SELECT 'world', 42; +CREATE TABLE t1 (a String, b Int32) ENGINE = MergeTree ORDER BY (a, b) AS SELECT 'world', 42; +SELECT * FROM t1 WHERE (a, b) IN (SELECT tuple(a, b) FROM t2); +DROP TABLE t1; +DROP TABLE t2; + +-- Simple LowCardinality column in IN subquery (non-tuple case) +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t2 (a LowCardinality(String)) ENGINE = Memory AS SELECT 'simple'; +CREATE TABLE t1 (a String) ENGINE = MergeTree ORDER BY a AS SELECT 'simple'; +SELECT * FROM t1 WHERE a IN (SELECT a FROM t2); +DROP TABLE t1; +DROP TABLE t2; + +-- LowCardinality(Nullable) inside tuple +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t2 (a LowCardinality(Nullable(String)), b Int32) ENGINE = Memory AS SELECT 'nullable', 5; +CREATE TABLE t1 (a Nullable(String), b Int32) ENGINE = MergeTree ORDER BY b AS SELECT 'nullable', 5; +SELECT * FROM t1 WHERE (a, b) IN (SELECT tuple(a, b) FROM t2); +DROP TABLE t1; +DROP TABLE t2; + +-- NOT IN with LowCardinality in tuple +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t2 (a LowCardinality(String), b Int32) ENGINE = Memory AS SELECT 'excluded', 99; +CREATE TABLE t1 (a String, b Int32) ENGINE = MergeTree ORDER BY a AS SELECT 'kept', 1; +SELECT * FROM t1 WHERE (a, b) NOT IN (SELECT tuple(a, b) FROM t2); +DROP TABLE t1; +DROP TABLE t2; + +-- Non-tuple subquery with multiple LowCardinality columns +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t2 (a LowCardinality(String), b Int32) ENGINE = Memory AS SELECT 'multi', 10; +CREATE TABLE t1 (a String, b Int32) ENGINE = MergeTree ORDER BY a AS SELECT 'multi', 10; +SELECT * FROM t1 WHERE (a, b) IN (SELECT a, b FROM t2); +DROP TABLE t1; +DROP TABLE t2; diff --git a/tests/queries/0_stateless/03990_set_dynamic_low_cardinality_join_runtime_filter.reference b/tests/queries/0_stateless/03990_set_dynamic_low_cardinality_join_runtime_filter.reference new file mode 100644 index 000000000000..257e563266b4 --- /dev/null +++ b/tests/queries/0_stateless/03990_set_dynamic_low_cardinality_join_runtime_filter.reference @@ -0,0 +1 @@ +102 diff --git a/tests/queries/0_stateless/03990_set_dynamic_low_cardinality_join_runtime_filter.sql b/tests/queries/0_stateless/03990_set_dynamic_low_cardinality_join_runtime_filter.sql new file mode 100644 index 000000000000..7987744f999f --- /dev/null +++ b/tests/queries/0_stateless/03990_set_dynamic_low_cardinality_join_runtime_filter.sql @@ -0,0 +1,17 @@ +-- Regression test: Dynamic column with LowCardinality variant and enable_join_runtime_filters +-- caused LOGICAL_ERROR "Bad cast from type DB::ColumnVector to DB::ColumnLowCardinality" +-- because convertToFullIfNeeded recursively stripped LowCardinality from Dynamic's internal +-- variant columns without updating variant_info type metadata, causing column/type mismatches +-- in Set::appendSetElements when serializing values into shared variant storage. +-- max_threads=1 is needed to reliably trigger the runtime filter code path. +-- https://github.com/ClickHouse/ClickHouse/issues/97847 +DROP TABLE IF EXISTS t0; +CREATE TABLE t0 (`c0` Dynamic(max_types=1)) ENGINE = MergeTree ORDER BY tuple(); +SYSTEM STOP MERGES t0; +INSERT INTO t0 SETTINGS allow_suspicious_low_cardinality_types=1 SELECT 'str_' || toString(number) FROM numbers(100); +INSERT INTO t0 SETTINGS allow_suspicious_low_cardinality_types=1 VALUES (1::LowCardinality(Int32)), (2::LowCardinality(Int32)); +SELECT count() FROM ( + SELECT t0.c0 FROM (SELECT NULL AS c0) AS v0 RIGHT JOIN t0 USING (c0) + SETTINGS allow_dynamic_type_in_join_keys=1, enable_join_runtime_filters=1, max_threads=1 +); +DROP TABLE t0; diff --git a/tests/queries/0_stateless/03991_set_variant_low_cardinality_in_global_in.reference b/tests/queries/0_stateless/03991_set_variant_low_cardinality_in_global_in.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03991_set_variant_low_cardinality_in_global_in.sql b/tests/queries/0_stateless/03991_set_variant_low_cardinality_in_global_in.sql new file mode 100644 index 000000000000..b7fcbf4c8a73 --- /dev/null +++ b/tests/queries/0_stateless/03991_set_variant_low_cardinality_in_global_in.sql @@ -0,0 +1,12 @@ +-- Regression test: GLOBAL IN with UNION ALL that produces Variant containing LowCardinality +-- caused LOGICAL_ERROR "Bad cast from type DB::ColumnFixedString to DB::ColumnLowCardinality" +-- because convertToFullIfNeeded recursively stripped LowCardinality from ColumnVariant's +-- internal variant columns without updating the corresponding DataTypeVariant, creating +-- column/type position mismatches in KeyCondition::tryPrepareSetColumnsForIndex. +-- https://github.com/ClickHouse/ClickHouse/issues/97854 +SET enable_analyzer=1; +DROP TABLE IF EXISTS t_variant_lc; +CREATE TABLE t_variant_lc (`id` Decimal(76, 70), `value` Int128) ENGINE = MergeTree ORDER BY (id, value); +INSERT INTO t_variant_lc SELECT number, number FROM numbers(10); +SELECT id FROM t_variant_lc WHERE (value, id) GLOBAL IN (SELECT toFixedString(toLowCardinality('not a number'), 12), * UNION ALL SELECT DISTINCT toLowCardinality(5), toString(number) FROM numbers(5)); -- { serverError CANNOT_PARSE_TEXT } +DROP TABLE t_variant_lc; diff --git a/tests/queries/0_stateless/04003_array_join_in_filter_outer_to_inner_join.reference b/tests/queries/0_stateless/04003_array_join_in_filter_outer_to_inner_join.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/04003_array_join_in_filter_outer_to_inner_join.sql b/tests/queries/0_stateless/04003_array_join_in_filter_outer_to_inner_join.sql new file mode 100644 index 000000000000..3d63ae1a423d --- /dev/null +++ b/tests/queries/0_stateless/04003_array_join_in_filter_outer_to_inner_join.sql @@ -0,0 +1,19 @@ +-- Regression: segfault in executeActionForPartialResult when filter expression contains arrayJoin +-- and the convertOuterJoinToInnerJoin optimization tries to evaluate the filter with partial (null) arguments. + +SET enable_analyzer = 1; +SELECT DISTINCT + 2, + 1048575 +FROM numbers(1) AS l, + numbers(2, isZeroOrNull(assumeNotNull(1))) AS r +ANY INNER JOIN r AS alias37 ON equals(alias37.number, r.number) +RIGHT JOIN l AS alias44 ON equals(alias44.number, alias37.number) +ANY INNER JOIN alias44 AS alias48 ON equals(alias48.number, r.number) +ANY RIGHT JOIN r AS alias52 ON equals(alias52.number, alias37.number) +WHERE equals(isNull(toLowCardinality(toUInt128(2))), arrayJoin([*, 13, 13, 13, toNullable(13), 13])) +GROUP BY + materialize(1), + isNull(toUInt128(2)), + and(and(1048575, isZeroOrNull(1), isNullable(isNull(1))), materialize(13), isNull(toUInt256(materialize(2))), *, and(*, and(1, nan, isNull(isNull(1)), isZeroOrNull(1), 1048575), 13)) +WITH CUBE;