Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Planner/Utils.h>
#include <Processors/Sources/RemoteSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <QueryPipeline/narrowPipe.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/extractTableFunctionFromSelectQuery.h>
#include <Storages/buildQueryTreeForShard.h>
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Planner/Utils.h>
Expand Down Expand Up @@ -115,11 +117,14 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
using Base = InDepthQueryTreeVisitorWithContext<SearcherVisitor>;
using Base::Base;

explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, ContextPtr context) : Base(context), types(types_) {}
explicit SearcherVisitor(std::unordered_set<QueryTreeNodeType> types_, size_t entry_, ContextPtr context)
: Base(context)
, types(types_)
, entry(entry_) {}

bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/)
{
return getSubqueryDepth() <= 2 && !passed_node;
return getSubqueryDepth() <= 2 && !passed_node && !current_entry;
}

void enterImpl(QueryTreeNodePtr & node)
Expand All @@ -130,13 +135,19 @@ class SearcherVisitor : public InDepthQueryTreeVisitorWithContext<SearcherVisito
auto node_type = node->getNodeType();

if (types.contains(node_type))
passed_node = node;
{
++current_entry;
if (current_entry == entry)
passed_node = node;
}
}

QueryTreeNodePtr getNode() const { return passed_node; }

private:
std::unordered_set<QueryTreeNodeType> types;
size_t entry;
size_t current_entry = 0;
QueryTreeNodePtr passed_node;
};

Expand Down Expand Up @@ -203,33 +214,42 @@ Converts
localtable as t
ON s3.key == t.key

to
to (object_storage_cluster_join_mode='local')

SELECT s3.c1, s3.c2, s3.key
FROM
s3Cluster(...) AS s3

or (object_storage_cluster_join_mode='global')

SELECT s3.c1, s3.c2, t.c3
FROM
s3Cluster(...) as s3
JOIN
values('key UInt32, data String', (1, 'one'), (2, 'two'), ...) as t
ON s3.key == t.key
*/
void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
ASTPtr & query_to_send,
QueryTreeNodePtr query_tree,
SelectQueryInfo query_info,
const ContextPtr & context)
{
auto object_storage_cluster_join_mode = context->getSettingsRef()[Setting::object_storage_cluster_join_mode];
switch (object_storage_cluster_join_mode)
{
case ObjectStorageClusterJoinMode::LOCAL:
{
auto info = getQueryTreeInfo(query_tree, context);
auto info = getQueryTreeInfo(query_info.query_tree, context);

if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto modified_query_tree = query_tree->clone();
auto modified_query_tree = query_info.query_tree->clone();

SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context);
left_table_expression_searcher.visit(modified_query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function node");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find left table function node");

QueryTreeNodePtr query_tree_distributed;

Expand All @@ -242,7 +262,7 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
}
else if (info.has_cross_join)
{
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, context);
SearcherVisitor join_searcher({QueryTreeNodeType::CROSS_JOIN}, 1, context);
join_searcher.visit(modified_query_tree);
auto cross_join_node = join_searcher.getNode();
if (!cross_join_node)
Expand Down Expand Up @@ -297,8 +317,25 @@ void IStorageCluster::updateQueryWithJoinToSendIfNeeded(
return;
}
case ObjectStorageClusterJoinMode::GLOBAL:
// TODO
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "`Global` mode for `object_storage_cluster_join_mode` setting is unimplemented for now");
{
auto info = getQueryTreeInfo(query_info.query_tree, context);

if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
{
auto modified_query_tree = query_info.query_tree->clone();

rewriteJoinToGlobalJoin(modified_query_tree, context);
modified_query_tree = buildQueryTreeForShard(
query_info.planner_context,
modified_query_tree,
/*allow_global_join_for_right_table*/ true,
/*find_cross_join*/ true);
query_to_send = queryNodeToDistributedSelectQuery(modified_query_tree);
send_external_tables = true;
}

return;
}
case ObjectStorageClusterJoinMode::ALLOW: // Do nothing special
return;
}
Expand Down Expand Up @@ -336,7 +373,7 @@ void IStorageCluster::read(
SharedHeader sample_block;
ASTPtr query_to_send = query_info.query;

updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info, context);

if (settings[Setting::allow_experimental_analyzer])
{
Expand Down Expand Up @@ -374,6 +411,10 @@ void IStorageCluster::read(

auto this_ptr = std::static_pointer_cast<IStorageCluster>(shared_from_this());

std::optional<Tables> external_tables = std::nullopt;
if (send_external_tables && query_info.planner_context && query_info.planner_context->getMutableQueryContext())
external_tables = query_info.planner_context->getMutableQueryContext()->getExternalTables();

auto reading = std::make_unique<ReadFromCluster>(
column_names,
query_info,
Expand All @@ -384,7 +425,8 @@ void IStorageCluster::read(
std::move(query_to_send),
processed_stage,
cluster,
log);
log,
external_tables);

query_plan.addStep(std::move(reading));
}
Expand Down Expand Up @@ -502,7 +544,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
external_tables.has_value() ? *external_tables : Tables(),
processed_stage,
nullptr,
RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)});
Expand Down Expand Up @@ -540,7 +582,7 @@ IStorageCluster::QueryTreeInfo IStorageCluster::getQueryTreeInfo(QueryTreeNodePt
info.has_cross_join = true;
}

SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, context);
SearcherVisitor left_table_expression_searcher({QueryTreeNodeType::TABLE, QueryTreeNodeType::TABLE_FUNCTION}, 1, context);
left_table_expression_searcher.visit(query_tree);
auto table_function_node = left_table_expression_searcher.getNode();
if (!table_function_node)
Expand Down Expand Up @@ -573,11 +615,14 @@ QueryProcessingStage::Enum IStorageCluster::getQueryProcessingStage(
{
if (!context->getSettingsRef()[Setting::allow_experimental_analyzer])
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=true");
"object_storage_cluster_join_mode!='allow' is not supported without allow_experimental_analyzer=false");
Comment on lines 617 to +618

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 Badge Correct inverted analyzer requirement in error message

This branch is entered when allow_experimental_analyzer is disabled, but the message says the mode is unsupported without allow_experimental_analyzer=false. That inverts the required value and points users to the wrong configuration change when they hit this exception; the message should say allow_experimental_analyzer=true, as it did before this change.

Useful? React with 👍 / 👎.


auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
if (object_storage_cluster_join_mode == ObjectStorageClusterJoinMode::LOCAL)
{
auto info = getQueryTreeInfo(query_info.query_tree, context);
if (info.has_join || info.has_cross_join || info.has_local_columns_in_where)
return QueryProcessingStage::Enum::FetchColumns;
}
}

/// Initiator executes query on remote node.
Expand Down
8 changes: 6 additions & 2 deletions src/Storages/IStorageCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class IStorageCluster : public IStorage

protected:
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, SelectQueryInfo query_info, const ContextPtr & context);

virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}

Expand Down Expand Up @@ -108,6 +108,7 @@ class IStorageCluster : public IStorage

LoggerPtr log;
String cluster_name;
bool send_external_tables = false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep external-table send flag scoped to a single read

send_external_tables is stored as mutable state on IStorageCluster, but it is set from one query path and never reset before subsequent reads. After a GLOBAL-mode query sets it, later queries on the same storage instance can still forward external tables even when the rewrite is not active; and because this field is non-atomic shared state, concurrent reads can race on it. This creates cross-query state leakage and a risk of undefined behavior when reads run concurrently in production.

Useful? React with 👍 / 👎.


struct QueryTreeInfo
{
Expand Down Expand Up @@ -137,7 +138,8 @@ class ReadFromCluster : public SourceStepWithFilter
ASTPtr query_to_send_,
QueryProcessingStage::Enum processed_stage_,
ClusterPtr cluster_,
LoggerPtr log_)
LoggerPtr log_,
std::optional<Tables> external_tables_)
: SourceStepWithFilter(
std::move(sample_block),
column_names_,
Expand All @@ -149,6 +151,7 @@ class ReadFromCluster : public SourceStepWithFilter
, processed_stage(processed_stage_)
, cluster(std::move(cluster_))
, log(log_)
, external_tables(external_tables_)
{
}

Expand All @@ -160,6 +163,7 @@ class ReadFromCluster : public SourceStepWithFilter
LoggerPtr log;

std::optional<RemoteQueryExecutor::Extension> extension;
std::optional<Tables> external_tables;

void createExtension(const ActionsDAG::Node * predicate);
ContextPtr updateSettings(const Settings & settings);
Expand Down
43 changes: 37 additions & 6 deletions src/Storages/buildQueryTreeForShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace Setting
extern const SettingsBool prefer_global_in_and_join;
extern const SettingsBool enable_add_distinct_to_in_subqueries;
extern const SettingsInt64 optimize_const_name_size;
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
}

namespace ErrorCodes
Expand Down Expand Up @@ -120,8 +121,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
using Base = InDepthQueryTreeVisitorWithContext<DistributedProductModeRewriteInJoinVisitor>;
using Base::Base;

explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_)
explicit DistributedProductModeRewriteInJoinVisitor(const ContextPtr & context_, bool find_cross_join_)
: Base(context_)
, find_cross_join(find_cross_join_)
{}

struct InFunctionOrJoin
Expand Down Expand Up @@ -157,9 +159,11 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
{
auto * function_node = node->as<FunctionNode>();
auto * join_node = node->as<JoinNode>();
CrossJoinNode * cross_join_node = find_cross_join ? node->as<CrossJoinNode>() : nullptr;

if ((function_node && isNameOfGlobalInFunction(function_node->getFunctionName())) ||
(join_node && join_node->getLocality() == JoinLocality::Global))
(join_node && join_node->getLocality() == JoinLocality::Global) ||
cross_join_node)
{
InFunctionOrJoin in_function_or_join_entry;
in_function_or_join_entry.query_node = node;
Expand Down Expand Up @@ -223,7 +227,9 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
replacement_table_expression->setTableExpressionModifiers(*table_expression_modifiers);
replacement_map.emplace(table_node.get(), std::move(replacement_table_expression));
}
else if ((distributed_product_mode == DistributedProductMode::GLOBAL || getSettings()[Setting::prefer_global_in_and_join]) &&
else if ((distributed_product_mode == DistributedProductMode::GLOBAL ||
getSettings()[Setting::prefer_global_in_and_join] ||
(find_cross_join && getSettings()[Setting::object_storage_cluster_join_mode] == ObjectStorageClusterJoinMode::GLOBAL)) &&
!in_function_or_join_stack.empty())
{
auto * in_or_join_node_to_modify = in_function_or_join_stack.back().query_node.get();
Expand Down Expand Up @@ -257,6 +263,8 @@ class DistributedProductModeRewriteInJoinVisitor : public InDepthQueryTreeVisito
std::vector<InFunctionOrJoin> in_function_or_join_stack;
std::unordered_map<const IQueryTreeNode *, QueryTreeNodePtr> replacement_map;
std::vector<InFunctionOrJoin> global_in_or_join_nodes;

bool find_cross_join = false;
};

/** Replaces large constant values with `__getScalar` function calls to avoid
Expand Down Expand Up @@ -504,14 +512,18 @@ QueryTreeNodePtr getSubqueryFromTableExpression(

}

QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table)
QueryTreeNodePtr buildQueryTreeForShard(
const PlannerContextPtr & planner_context,
QueryTreeNodePtr query_tree_to_modify,
bool allow_global_join_for_right_table,
bool find_cross_join)
{
CollectColumnSourceToColumnsVisitor collect_column_source_to_columns_visitor;
collect_column_source_to_columns_visitor.visit(query_tree_to_modify);

const auto & column_source_to_columns = collect_column_source_to_columns_visitor.getColumnSourceToColumns();

DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext());
DistributedProductModeRewriteInJoinVisitor visitor(planner_context->getQueryContext(), find_cross_join);
visitor.visit(query_tree_to_modify);

auto replacement_map = visitor.getReplacementMap();
Expand Down Expand Up @@ -550,6 +562,24 @@ QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_contex
replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node));
continue;
}
if (auto * cross_join_node = global_in_or_join_node.query_node->as<CrossJoinNode>())
{
auto tables_count = cross_join_node->getTableExpressions().size();
for (size_t i = 1; i < tables_count; ++i)
{
QueryTreeNodePtr join_table_expression = cross_join_node->getTableExpressions()[i];

auto subquery_node = getSubqueryFromTableExpression(join_table_expression, column_source_to_columns, planner_context->getQueryContext());

auto temporary_table_expression_node = executeSubqueryNode(subquery_node,
planner_context->getMutableQueryContext(),
global_in_or_join_node.subquery_depth);
temporary_table_expression_node->setAlias(join_table_expression->getAlias());

replacement_map.emplace(join_table_expression.get(), std::move(temporary_table_expression_node));
}
continue;
}
if (auto * in_function_node = global_in_or_join_node.query_node->as<FunctionNode>())
{
auto & in_function_subquery_node = in_function_node->getArguments().getNodes().at(1);
Expand Down Expand Up @@ -661,7 +691,8 @@ class RewriteJoinToGlobalJoinVisitor : public InDepthQueryTreeVisitorWithContext
{
if (auto * join_node = node->as<JoinNode>())
{
bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join];
bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join]
&& getContext()->getSettingsRef()[Setting::object_storage_cluster_join_mode] != ObjectStorageClusterJoinMode::GLOBAL;
bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression());
if (should_use_global_join)
join_node->setLocality(JoinLocality::Global);
Expand Down
6 changes: 5 additions & 1 deletion src/Storages/buildQueryTreeForShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ using PlannerContextPtr = std::shared_ptr<PlannerContext>;
class Context;
using ContextPtr = std::shared_ptr<const Context>;

QueryTreeNodePtr buildQueryTreeForShard(const PlannerContextPtr & planner_context, QueryTreeNodePtr query_tree_to_modify, bool allow_global_join_for_right_table);
QueryTreeNodePtr buildQueryTreeForShard(
const PlannerContextPtr & planner_context,
QueryTreeNodePtr query_tree_to_modify,
bool allow_global_join_for_right_table,
bool find_cross_join = false);

void rewriteJoinToGlobalJoin(QueryTreeNodePtr query_tree_to_modify, ContextPtr context);

Expand Down
Loading
Loading