diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index 015ae25b6b984..ca6fbd6f390a8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -82,7 +82,6 @@ import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService; import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl; import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl; -import org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig; @@ -272,9 +271,6 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query /** */ private final InjectResourcesService injectSvc; - /** */ - private final AtomicBoolean udfQryWarned = new AtomicBoolean(); - /** */ private volatile boolean started; @@ -545,8 +541,6 @@ private List parseAndProcessQuery( ) throws IgniteSQLException { ensureTransactionModeSupported(qryCtx); - checkUdfQuery(); - SchemaPlus schema = schemaHolder.schema(schemaName); assert schema != null : "Schema not found: " + schemaName; @@ -704,29 +698,6 @@ private void ensureTransactionModeSupported(@Nullable QueryContext qryCtx) { IgniteTxManager.ensureTransactionModeSupported(ctx.cache().context().tm().tx(ver).isolation()); } - /** Checks that query is initiated by UDF and print message to log if needed. */ - private void checkUdfQuery() { - if (udfQryWarned.get()) - return; - - if (Thread.currentThread().getName().startsWith(AbstractQueryTaskExecutor.THREAD_PREFIX) - && udfQryWarned.compareAndSet(false, true)) { - if (taskExecutor instanceof QueryBlockingTaskExecutor) { - log.info("Detected query initiated by user-defined function. " + - "In some circumstances, this can lead to thread pool starvation and deadlock. Ensure that " + - "the pool size is properly configured (property IgniteConfiguration.QueryThreadPoolSize). " + - "The pool size should be greater than the maximum number of concurrent queries initiated by UDFs."); - } - else { - log.warning("Detected query initiated by user-defined function. " + - "When a striped query task executor (the default configuration) is used, tasks for such queries " + - "can be assigned to the same thread as that held by the initial query, which can lead to a " + - "deadlock. To switch to a blocking tasks executor, set the following parameter: " + - "-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true."); - } - } - } - /** */ private T processQuery( @Nullable QueryContext qryCtx, diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index 497520f96ef05..24e3f8002c0f4 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.calcite.plan.Context; @@ -66,6 +67,8 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node; import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox; +import org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor; +import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.GlobalMemoryTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker; import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker; @@ -202,6 +205,9 @@ public class ExecutionServiceImpl extends AbstractService implements Execut /** */ private InjectResourcesService injectSvc; + /** Limit for nested queries, initiated by UDF. */ + private final AtomicInteger udfQryLimit = new AtomicInteger(); + /** */ private final Map fragmentPlanCache = new GridBoundedConcurrentLinkedHashMap<>(1024); @@ -465,6 +471,8 @@ public void injectService(InjectResourcesService injectSvc) { memoryTracker = cfg.getGlobalMemoryQuota() > 0 ? new GlobalMemoryTracker(cfg.getGlobalMemoryQuota()) : NoOpMemoryTracker.INSTANCE; + udfQryLimit.set(ctx.config().getQueryThreadPoolSize() - 1); + init(); } @@ -574,6 +582,38 @@ private FieldsQueryCursor> executeDdl(RootQuery qry, DdlPlan plan) } } + /** + * Checks that query is initiated by UDF. + * + * @return {@code True} if query is initiated by UDF (in this case UDF query limit is affected). + * @throws IgniteSQLException If query execution can lead to deadlocks. + */ + private boolean checkUdfQuery() { + if (Thread.currentThread().getName().startsWith(AbstractQueryTaskExecutor.THREAD_PREFIX)) { + if (taskExecutor instanceof QueryBlockingTaskExecutor) { + if (udfQryLimit.getAndDecrement() <= 0) { + udfQryLimit.getAndIncrement(); + + throw new IgniteSQLException("Detected thread pool starvation by queries initiated by " + + "user-defined functions. Starting more queries from UDF will lead to deadlock. Ensure that " + + "the pool size is properly configured (property IgniteConfiguration.QueryThreadPoolSize). " + + "The pool size should be greater than the maximum number of concurrent queries initiated by UDFs."); + } + + return true; + } + else { + throw new IgniteSQLException("Detected query initiated by user-defined function. " + + "When a striped query task executor (the default configuration) is used, tasks for such queries " + + "can be assigned to the same thread as that held by the initial query, which can lead to a " + + "deadlock. To avoid deadlocks switch to a blocking tasks executor (set the parameter: " + + "-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true)"); + } + } + + return false; + } + /** */ private ListFieldsQueryCursor mapAndExecutePlan( RootQuery qry, @@ -594,207 +634,218 @@ private ListFieldsQueryCursor mapAndExecutePlan( checkPermissions(fragment.root()); } - // Local execution - Fragment fragment = F.first(fragments); + boolean udfQry = checkUdfQuery(); + + try { + // Local execution + Fragment fragment = F.first(fragments); - if (U.assertionsEnabled()) { - assert fragment != null; + if (U.assertionsEnabled()) { + assert fragment != null; - FragmentMapping mapping = execPlan.mapping(fragment); + FragmentMapping mapping = execPlan.mapping(fragment); - assert mapping != null; + assert mapping != null; - List nodes = mapping.nodeIds(); + List nodes = mapping.nodeIds(); - assert nodes != null && (nodes.size() == 1 && F.first(nodes).equals(localNodeId()) || nodes.isEmpty()) + assert nodes != null && (nodes.size() == 1 && F.first(nodes).equals(localNodeId()) || nodes.isEmpty()) : "nodes=" + nodes + ", localNode=" + localNodeId(); - } + } - long timeout = qry.remainingTime(); + long timeout = qry.remainingTime(); - if (timeout == 0) { - throw new IgniteSQLException("The query was cancelled due to timeout", IgniteQueryErrorCode.QUERY_CANCELED, - new QueryCancelledException()); - } + if (timeout == 0) { + throw new IgniteSQLException("The query was cancelled due to timeout", IgniteQueryErrorCode.QUERY_CANCELED, + new QueryCancelledException()); + } - FragmentDescription fragmentDesc = new FragmentDescription( - fragment.fragmentId(), - execPlan.mapping(fragment), - execPlan.target(fragment), - execPlan.remotes(fragment)); - - MemoryTracker qryMemoryTracker = qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()); - - final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(), ctx.cache().context()); - - ExecutionContext ectx = new ExecutionContext<>( - qry.context(), - taskExecutor(), - injectSvc, - qry.id(), - locNodeId, - locNodeId, - mapCtx.topologyVersion(), - fragmentDesc, - handler, - qryMemoryTracker, - createIoTracker(locNodeId, qry.localQueryId()), - timeout, - qryParams, - userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries())); - - Node node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(), - exchangeService(), failureProcessor()).go(fragment.root()); - - qry.run(ectx, execPlan, plan.fieldsMetadata(), node); - - Map fragmentsPerNode = fragments.stream() - .skip(1) - .flatMap(f -> f.mapping().nodeIds().stream()) - .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); - - // Start remote execution. - for (int i = 1; i < fragments.size(); i++) { - fragment = fragments.get(i); - fragmentDesc = new FragmentDescription( + FragmentDescription fragmentDesc = new FragmentDescription( fragment.fragmentId(), execPlan.mapping(fragment), execPlan.target(fragment), execPlan.remotes(fragment)); - Throwable ex = null; - byte[] parametersMarshalled = null; - - for (UUID nodeId : fragmentDesc.nodeIds()) { - if (ex != null) - qry.onResponse(nodeId, fragment.fragmentId(), ex); - else { - try { - SessionContextImpl sesCtx = qry.context().unwrap(SessionContextImpl.class); - - QueryProperties props = qry.context().unwrap(QueryProperties.class); - boolean keepBinaryMode = props == null || props.keepBinary(); - - QueryStartRequest req = new QueryStartRequest( - qry.id(), - qry.localQueryId(), - qry.context().schemaName(), - fragment.serialized(), - ectx.topologyVersion(), - fragmentDesc, - fragmentsPerNode.get(nodeId).intValue(), - qry.parameters(), - parametersMarshalled, - timeout, - ectx.getQryTxEntries(), - sesCtx == null ? null : sesCtx.attributes(), - keepBinaryMode - ); - - messageService().send(nodeId, req); - - // Avoid marshaling of the same parameters for other nodes. - if (parametersMarshalled == null) - parametersMarshalled = req.parametersMarshalled(); - } - catch (Throwable e) { - qry.onResponse(nodeId, fragment.fragmentId(), ex = e); + MemoryTracker qryMemoryTracker = qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()); + + final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(), ctx.cache().context()); + + ExecutionContext ectx = new ExecutionContext<>( + qry.context(), + taskExecutor(), + injectSvc, + qry.id(), + locNodeId, + locNodeId, + mapCtx.topologyVersion(), + fragmentDesc, + handler, + qryMemoryTracker, + createIoTracker(locNodeId, qry.localQueryId()), + timeout, + qryParams, + userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries())); + + Node node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(), + exchangeService(), failureProcessor()).go(fragment.root()); + + qry.run(ectx, execPlan, plan.fieldsMetadata(), node); + + Map fragmentsPerNode = fragments.stream() + .skip(1) + .flatMap(f -> f.mapping().nodeIds().stream()) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + + QueryProperties qryProps = qry.context().unwrap(QueryProperties.class); + boolean keepBinary = qryProps == null || qryProps.keepBinary(); + + // Start remote execution. + for (int i = 1; i < fragments.size(); i++) { + fragment = fragments.get(i); + fragmentDesc = new FragmentDescription( + fragment.fragmentId(), + execPlan.mapping(fragment), + execPlan.target(fragment), + execPlan.remotes(fragment)); + + Throwable ex = null; + byte[] parametersMarshalled = null; + + for (UUID nodeId : fragmentDesc.nodeIds()) { + if (ex != null) + qry.onResponse(nodeId, fragment.fragmentId(), ex); + else { + try { + SessionContextImpl sesCtx = qry.context().unwrap(SessionContextImpl.class); + + QueryStartRequest req = new QueryStartRequest( + qry.id(), + qry.localQueryId(), + qry.context().schemaName(), + fragment.serialized(), + ectx.topologyVersion(), + fragmentDesc, + fragmentsPerNode.get(nodeId).intValue(), + qry.parameters(), + parametersMarshalled, + timeout, + ectx.getQryTxEntries(), + sesCtx == null ? null : sesCtx.attributes(), + keepBinary + ); + + messageService().send(nodeId, req); + + // Avoid marshaling of the same parameters for other nodes. + if (parametersMarshalled == null) + parametersMarshalled = req.parametersMarshalled(); + } + catch (Throwable e) { + qry.onResponse(nodeId, fragment.fragmentId(), ex = e); + } } } } - } - - if (perfStatProc.enabled()) { - perfStatProc.queryProperty( - GridCacheQueryType.SQL_FIELDS, - qry.initiatorNodeId(), - qry.localQueryId(), - "Query plan", - plan.textPlan() - ); - } - if (ctx.query().runningQueryManager().planHistoryTracker().enabled()) { - ctx.query().runningQueryManager().planHistoryTracker().addPlan( - plan.textPlan(), - qry.sql(), - qry.context().schemaName(), - qry.context().isLocal(), - CalciteQueryEngineConfiguration.ENGINE_NAME - ); - } + if (perfStatProc.enabled()) { + perfStatProc.queryProperty( + GridCacheQueryType.SQL_FIELDS, + qry.initiatorNodeId(), + qry.localQueryId(), + "Query plan", + plan.textPlan() + ); + } - QueryProperties qryProps = qry.context().unwrap(QueryProperties.class); + if (ctx.query().runningQueryManager().planHistoryTracker().enabled()) { + ctx.query().runningQueryManager().planHistoryTracker().addPlan( + plan.textPlan(), + qry.sql(), + qry.context().schemaName(), + qry.context().isLocal(), + CalciteQueryEngineConfiguration.ENGINE_NAME + ); + } - Function fieldConverter = (qryProps == null || qryProps.keepBinary()) ? null : - o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false, true, null); + Function fieldConverter = keepBinary ? null : + o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false, true, null); + + HeavyQueriesTracker.ResultSetChecker resultSetChecker = ctx.query().runningQueryManager() + .heavyQueriesTracker().resultSetChecker(qry); + + Function, List> rowConverter; + + // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return result to cursor. + if (qryProps != null && qryProps.cacheName() != null && evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) { + ClusterNode locNode = ctx.discovery().localNode(); + UUID subjId = SecurityUtils.securitySubjectId(ctx); + + rowConverter = row -> { + evtMgr.record(new CacheQueryReadEvent<>( + locNode, + "SQL fields query result set row read.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.SQL_FIELDS.name(), + qryProps.cacheName(), + null, + qry.sql(), + null, + null, + qry.parameters(), + subjId, + null, + null, + null, + null, + row)); + + resultSetChecker.checkOnFetchNext(); + + return row; + }; + } + else { + rowConverter = row -> { + resultSetChecker.checkOnFetchNext(); - HeavyQueriesTracker.ResultSetChecker resultSetChecker = ctx.query().runningQueryManager() - .heavyQueriesTracker().resultSetChecker(qry); + return row; + }; + } - Function, List> rowConverter; + Runnable onClose = () -> { + if (udfQry) // Restore UDF queries limit. + udfQryLimit.getAndIncrement(); + + if (perfStatProc.enabled()) { + perfStatProc.queryRowsProcessed( + GridCacheQueryType.SQL_FIELDS, + qry.initiatorNodeId(), + qry.localQueryId(), + "Fetched", + resultSetChecker.fetchedSize() + ); + } - // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return result to cursor. - if (qryProps != null && qryProps.cacheName() != null && evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) { - ClusterNode locNode = ctx.discovery().localNode(); - UUID subjId = SecurityUtils.securitySubjectId(ctx); + resultSetChecker.checkOnClose(); + }; - rowConverter = row -> { - evtMgr.record(new CacheQueryReadEvent<>( - locNode, - "SQL fields query result set row read.", - EVT_CACHE_QUERY_OBJECT_READ, - CacheQueryType.SQL_FIELDS.name(), - qryProps.cacheName(), - null, - qry.sql(), - null, - null, - qry.parameters(), - subjId, - null, - null, - null, - null, - row)); + Iterator> it = iteratorsHolder().iterator(new ConvertingClosableIterator<>(qry.iterator(), ectx, + fieldConverter, rowConverter, onClose)); - resultSetChecker.checkOnFetchNext(); + // Make yet another tracking layer for cursor.getAll(), so tracking hierarchy will look like: + // Row tracker -> Cursor memory tracker -> Query memory tracker -> Global memory tracker. + // It's required, since query memory tracker can be closed concurrently during getAll() and + // tracked data for cursor can be lost without additional tracker. + MemoryTracker curMemoryTracker = QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota()); - return row; - }; + return new ListFieldsQueryCursor<>(plan, it, ectx, curMemoryTracker); } - else { - rowConverter = row -> { - resultSetChecker.checkOnFetchNext(); + catch (Exception e) { + if (udfQry) // Restore UDF queries limit. + udfQryLimit.getAndIncrement(); - return row; - }; + throw e; } - - Runnable onClose = () -> { - if (perfStatProc.enabled()) { - perfStatProc.queryRowsProcessed( - GridCacheQueryType.SQL_FIELDS, - qry.initiatorNodeId(), - qry.localQueryId(), - "Fetched", - resultSetChecker.fetchedSize() - ); - } - - resultSetChecker.checkOnClose(); - }; - - Iterator> it = new ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx, - fieldConverter, rowConverter, onClose); - - // Make yet another tracking layer for cursor.getAll(), so tracking hierarchy will look like: - // Row tracker -> Cursor memory tracker -> Query memory tracker -> Global memory tracker. - // It's required, since query memory tracker can be closed concurrently during getAll() and - // tracked data for cursor can be lost without additional tracker. - MemoryTracker curMemoryTracker = QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota()); - - return new ListFieldsQueryCursor<>(plan, it, ectx, curMemoryTracker); } /** */ diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java index 7e64f3735759e..f60fb59014d8c 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java @@ -174,7 +174,7 @@ protected QueryContext queryContext() { } /** - * Asserts that executeSql throws an exception. + * Asserts that query throws an exception. * * @param sql Query. * @param cls Exception class. @@ -184,6 +184,18 @@ protected void assertThrows(String sql, Class cls, String m assertThrowsAnyCause(log, () -> sql(sql, args), cls, msg); } + /** + * Asserts that query throws an exception. + * + * @param ignite Ignite instance. + * @param sql Query. + * @param cls Exception class. + * @param msg Error message. + */ + protected void assertThrows(IgniteEx ignite, String sql, Class cls, String msg, Object... args) { + assertThrowsAnyCause(log, () -> sql(ignite, sql, args), cls, msg); + } + /** */ protected void createAndPopulateTable() { createAndPopulateTable(client, 2, CacheMode.PARTITIONED); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java index dc72fa9b314dc..5eb2f511ec0a8 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java @@ -22,6 +22,7 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.Statement; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -68,11 +69,11 @@ import org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; -import org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker; import org.apache.ignite.internal.util.GridTestClockTimer; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.metric.MetricRegistry; import org.apache.ignite.spi.metric.LongMetric; @@ -103,6 +104,8 @@ import static org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_QRY_HIST_VIEW; import static org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME; import static org.apache.ignite.internal.util.lang.GridFunc.first; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.runMultiThreaded; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** @@ -119,7 +122,7 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { private static final int BIG_RESULT_SET_THRESHOLD = 10_000; /** */ - private static final int POOL_SIZE = 2; + private static final int POOL_SIZE = 5; /** */ private ListeningTestLogger log; @@ -921,73 +924,72 @@ private void checkStarvation() throws Exception { /** */ @Test - public void testUdfQueryWarningStripedExecutor() throws Exception { - assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof StripedQueryTaskExecutor); + public void testUdfQueryDeadlockDetectionStripedExecutor() throws Exception { + IgniteEx ignite = grid(0); + + assertTrue(queryProcessor(ignite).taskExecutor() instanceof StripedQueryTaskExecutor); + + client.getOrCreateCache(new CacheConfiguration(DEFAULT_CACHE_NAME) + .setSqlFunctionClasses(FunctionsLibrary.class) + .setSqlSchema("PUBLIC") + ); + + // Expect message with tips about switching to query blocking task executor. + String expMsg = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR + "=true"; - checkUdfQueryWarning("-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true"); + // Check that error is thrown for UDF initiated query. + assertThrows(ignite, "SELECT innerSql(?, ?, ?)", IgniteSQLException.class, + expMsg, ignite.name(), DEFAULT_CACHE_NAME, "SELECT 'Test'"); } /** */ @Test @WithSystemProperty(key = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, value = "true") - public void testUdfQueryWarningBlockingExecutor() throws Exception { - assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof QueryBlockingTaskExecutor); + public void testUdfQueryDeadlockDetectionBlockingExecutor() throws Exception { + IgniteEx ignite = grid(0); - checkUdfQueryWarning("IgniteConfiguration.QueryThreadPoolSize"); - } + assertTrue(queryProcessor(ignite).taskExecutor() instanceof QueryBlockingTaskExecutor); - /** */ - private void checkUdfQueryWarning(String tipsMsg) throws Exception { client.getOrCreateCache(new CacheConfiguration(DEFAULT_CACHE_NAME) .setSqlFunctionClasses(FunctionsLibrary.class) .setSqlSchema("PUBLIC") ); - LogListener logLsnr1 = LogListener.matches("Detected query initiated by user-defined function.").build(); - LogListener logLsnr2 = LogListener.matches(tipsMsg).build(); - - log.registerListener(logLsnr1); - log.registerListener(logLsnr2); - - // Check that message is not printed for regular query. - sql(grid(0), "SELECT ?", "Test"); - - assertFalse(logLsnr1.check()); - assertFalse(logLsnr2.check()); - - // Check that message is printed for UDF initiated query. - IgniteInternalFuture fut = GridTestUtils.runAsync(() -> sql(grid(0), "SELECT innerSql(?, ?, ?)", - grid(0).name(), DEFAULT_CACHE_NAME, "Test")); - - assertTrue(logLsnr1.check(1_000L)); - assertTrue(logLsnr2.check()); - - cancelAllQueriesAndWaitForCompletion(grid(0), fut); + FunctionsLibrary.latch = new CountDownLatch(POOL_SIZE); - // Check that message is printed only once. - logLsnr1.reset(); - logLsnr2.reset(); + List>>> futs = new ArrayList<>(POOL_SIZE); - fut = GridTestUtils.runAsync(() -> sql(grid(0), "SELECT innerSql(?, ?, ?)", - grid(0).name(), DEFAULT_CACHE_NAME, "Test")); + for (int i = 0; i < POOL_SIZE; i++) { + futs.add(runAsync(() -> sql(ignite, + "SELECT countDownLatch(), waitLatch(1000), innerSql(?, ?, ?)", + ignite.name(), DEFAULT_CACHE_NAME, "SELECT cast(sleep(500) AS varchar)"))); + } - assertFalse(logLsnr1.check(1_000L)); - assertFalse(logLsnr2.check()); + // Expect message with tips about query pool size. + String expMsg = "IgniteConfiguration.QueryThreadPoolSize"; + boolean errFound = false; - cancelAllQueriesAndWaitForCompletion(grid(0), fut); - } + // Check that concurrent inner queries allow to occupy all thread pool except one thread. + for (IgniteInternalFuture>> fut : futs) { + try { + assertEquals(F.asList(true, true, "TRUE"), fut.get(5_000L).get(0)); + } + catch (Exception e) { + assertTrue("Unexpected error: " + e, X.hasCause(e, expMsg, IgniteSQLException.class)); + assertFalse(errFound); + errFound = true; + } + } - /** */ - private void cancelAllQueriesAndWaitForCompletion(IgniteEx ignite, IgniteInternalFuture qryFut) { - ignite.context().query().runningQueryManager().runningSqlQueries().forEach(GridRunningQueryInfo::cancel); + assertTrue(errFound); - try { - // Wait for future completion, it can be successful or unsuccessful. - qryFut.get(); - } - catch (Exception ignore) { - // No-op. - } + // Check that POOL_SIZE - 1 concurrent inner queries can't block the execution. + runMultiThreaded(() -> { + for (int i = 0; i < 1000; i++) { + assertEquals("Test", sql("SELECT innerSql(?, ?, ?)", + ignite.name(), DEFAULT_CACHE_NAME, "SELECT 'Test'").get(0).get(0)); + } + }, POOL_SIZE - 1, "async-sql"); } /** Verifies that user-defined query initiator ID is present in the SQL_QUERY_HISTORY system view and logs. */ @@ -1125,6 +1127,14 @@ public static boolean waitLatch(long time) { return true; } + /** */ + @QuerySqlFunction + public static boolean countDownLatch() { + latch.countDown(); + + return true; + } + /** */ @QuerySqlFunction public static boolean sleep(int sleep) { @@ -1137,10 +1147,10 @@ public static boolean sleep(int sleep) { /** */ @QuerySqlFunction - public static String innerSql(String ignite, String cache, String val) { + public static String innerSql(String ignite, String cache, String sql) { return (String)Ignition.ignite(ignite) .cache(cache) - .query(new SqlFieldsQuery("SELECT ?").setArgs(val)) + .query(new SqlFieldsQuery(sql)) .getAll().get(0).get(0); } }