/*-------------------------------------------------------------------------
 *
 * recursive_planning.c
 *
 * Logic for calling the postgres planner recursively for CTEs and
 * non-pushdownable subqueries in distributed queries.
 *
 * PostgreSQL with Citus can execute the following types of queries:
 *
 * - Postgres queries on local tables and functions.
 *
 *   These queries can use all SQL features, but they may not reference
 *   distributed tables.
 *
 * - Router queries that can be executed on a single node by replacing
 *   table names with shard names.
 *
 *   These queries can use nearly all SQL features, but only if they have
 *   a single-valued filter on the distribution column.
 *
 * - Multi-shard queries that can be executed by performing a task for each
 *   shard in a distributed table and performing a merge step.
 *
 *   These queries have limited SQL support. They may only include
 *   subqueries if the subquery can be executed on each shard by replacing
 *   table names with shard names and concatenating the result.
 *
 *   These queries have very limited SQL support and only support basic
 *   inner joins and subqueries without joins.
 *
 * To work around the limitations of these planners, we recursively call
 * the planner for CTEs and unsupported subqueries to obtain a list of
 * subplans.
 *
 * During execution, each subplan is executed separately through the method
 * that is appropriate for that query. The results are written to temporary
 * files on the workers. In the original query, the CTEs and subqueries are
 * replaced by mini-subqueries that read from the temporary files.
 *
 * This allows almost all SQL to be directly or indirectly supported,
 * because if all subqueries that contain distributed tables have been
 * replaced then what remains is a router query which can use nearly all
 * SQL features.
 *
 * Copyright (c) Citus Data, Inc.
*------------------------------------------------------------------------- */ #include "postgres.h" #include "distributed/pg_version_constants.h" #include "funcapi.h" #include "catalog/pg_type.h" #include "catalog/pg_class.h" #include "distributed/citus_nodes.h" #include "distributed/citus_ruleutils.h" #include "distributed/commands/multi_copy.h" #include "distributed/distributed_planner.h" #include "distributed/errormessage.h" #include "distributed/local_distributed_join_planner.h" #include "distributed/listutils.h" #include "distributed/log_utils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/query_colocation_checker.h" #include "distributed/query_pushdown_planning.h" #include "distributed/recursive_planning.h" #include "distributed/relation_restriction_equivalence.h" #include "distributed/log_utils.h" #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" #include "lib/stringinfo.h" #include "optimizer/clauses.h" #include "optimizer/optimizer.h" #include "optimizer/planner.h" #include "optimizer/prep.h" #include "parser/parsetree.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "nodes/nodes.h" #include "nodes/nodeFuncs.h" #include "nodes/pg_list.h" #include "nodes/primnodes.h" #include "nodes/pathnodes.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/lsyscache.h" /* * RecursivePlanningContext is used to recursively plan subqueries * and CTEs, pull results to the coordinator, and push it back into * the workers. 
*/
struct RecursivePlanningContextInternal
{
	/*
	 * Set to 0 by GenerateSubplansForSubqueriesAndCTEs; presumably the
	 * subquery nesting level during recursive planning -- TODO confirm.
	 */
	int level;

	/*
	 * Id of the plan being built. Used to build intermediate result ids
	 * (GenerateResultId) and in DEBUG1 messages.
	 */
	uint64 planId;

	/*
	 * Computed once up front for the whole query. When true, the per-query
	 * non-colocated subquery checks are skipped.
	 */
	bool allDistributionKeysInQueryAreEqual; /* used for some optimizations */

	/*
	 * DistributedSubPlans generated so far. RecursivelyPlanCTEs derives the id
	 * of a new subplan from the current length of this list.
	 */
	List *subPlanList;

	/* restriction information collected by the planner for the whole query */
	PlannerRestrictionContext *plannerRestrictionContext;
};

/*
 * track depth of current recursive planner query; incremented on entry to and
 * decremented on exit from GenerateSubplansForSubqueriesAndCTEs
 */
static int recursivePlanningDepth = 0;


/*
 * CteReferenceWalkerContext is used to collect CTE references in
 * CteReferenceListWalker.
 */
typedef struct CteReferenceWalkerContext
{
	/* RecursivelyPlanCTEs starts the walk with level -1 */
	int level;

	/* collected RangeTblEntry nodes that reference CTEs */
	List *cteReferenceList;
} CteReferenceWalkerContext;

/*
 * VarLevelsUpWalkerContext is used to find Vars in a (sub)query that
 * refer to upper levels and therefore cannot be planned separately.
 */
typedef struct VarLevelsUpWalkerContext
{
	/* presumably the current query nesting depth of the walk -- TODO confirm */
	int level;
} VarLevelsUpWalkerContext;


/* local function forward declarations */
static DeferredErrorMessage * RecursivelyPlanSubqueriesAndCTEs(Query *query,
																RecursivePlanningContext *
																context);
static bool ShouldRecursivelyPlanNonColocatedSubqueries(Query *subquery,
														RecursivePlanningContext *
														context);
static bool ContainsSubquery(Query *query);
static bool ShouldRecursivelyPlanOuterJoins(RecursivePlanningContext *context);
static void RecursivelyPlanNonColocatedSubqueries(Query *subquery,
												  RecursivePlanningContext *context);
static void RecursivelyPlanNonColocatedJoinWalker(Node *joinNode,
												  ColocatedJoinChecker *
												  colocatedJoinChecker,
												  RecursivePlanningContext *
												  recursivePlanningContext);
static void RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
														 ColocatedJoinChecker *
														 colocatedJoinChecker,
														 RecursivePlanningContext *
														 recursivePlanningContext);
static bool RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
														 RecursivePlanningContext *context);
static void RecursivelyPlanDistributedJoinNode(Node *node, Query *query,
											   RecursivePlanningContext *context);
static bool IsRTERefRecurring(RangeTblRef *rangeTableRef, Query *query);
static List * SublinkListFromWhere(Query *originalQuery);
static bool ExtractSublinkWalker(Node *node, List **sublinkList);
static bool ShouldRecursivelyPlanSublinks(Query *query);
static bool RecursivelyPlanAllSubqueries(Node *node,
										 RecursivePlanningContext *planningContext);
static DeferredErrorMessage * RecursivelyPlanCTEs(Query *query,
												  RecursivePlanningContext *context);
static bool RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context);
static bool ShouldRecursivelyPlanSubquery(Query *subquery,
										  RecursivePlanningContext *context);
static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery,
												  PlannerRestrictionContext *
												  restrictionContext);
static bool ShouldRecursivelyPlanSetOperation(Query *query,
											  RecursivePlanningContext *context);
static bool RecursivelyPlanSubquery(Query *subquery,
									RecursivePlanningContext *planningContext);
static void RecursivelyPlanSetOperations(Query *query, Node *node,
										 RecursivePlanningContext *context);
static bool IsLocalTableRteOrMatView(Node *node);
static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId,
													 Query *subPlanQuery);
static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context);
static bool ContainsReferencesToOuterQueryWalker(Node *node,
												 VarLevelsUpWalkerContext *context);
static bool NodeContainsSubqueryReferencingOuterQuery(Node *node);
static void WrapFunctionsInSubqueries(Query *query);
static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry);
static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry);
static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
												 List *columnAliasList,
												 Const *resultIdConst, Oid functionOid,
												 bool useBinaryCopyFormat);
static void UpdateVarNosInNode(Node *node, Index newVarNo);
static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry,
								   List *outerSubqueryTargetList);
static List * GenerateRequiredColNamesFromTargetList(List *targetList);
static char * GetRelationNameAndAliasName(RangeTblEntry *rangeTablentry);

/*
 * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs.
* The function returns the subplans if necessary. For the details of when/how
 * subplans are generated, see RecursivelyPlanSubqueriesAndCTEs().
 *
 * Note that the input originalQuery is modified in place if any subplans are
 * generated.
 *
 * The file-level recursivePlanningDepth counter is incremented on entry and
 * decremented both on the normal return path and before a deferred error is
 * raised.
 *
 * NOTE(review): an ERROR thrown directly from inside the recursive planning
 * (rather than returned as a DeferredErrorMessage) skips both decrements;
 * presumably the counter is reset elsewhere on abort -- confirm.
 */
List *
GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *originalQuery,
									 PlannerRestrictionContext *plannerRestrictionContext)
{
	RecursivePlanningContext context;

	recursivePlanningDepth++;

	/*
	 * Plan subqueries and CTEs that cannot be pushed down by recursively
	 * calling the planner and add the resulting plans to subPlanList.
	 */
	context.level = 0;
	context.planId = planId;
	context.subPlanList = NIL;
	context.plannerRestrictionContext = plannerRestrictionContext;

	/*
	 * Calculating the distribution key equality upfront is a trade-off for us.
	 *
	 * When the originalQuery contains the distribution key equality, we'd be
	 * able to skip further checks for each lower level subquery (i.e., if the
	 * whole query contains distribution key equality, each subquery also
	 * contains distribution key equality.)
	 *
	 * When the originalQuery doesn't contain the distribution key equality,
	 * calculating this wouldn't help us at all; we should individually check
	 * each subquery and subquery joins among subqueries.
	 */
	context.allDistributionKeysInQueryAreEqual =
		AllDistributionKeysInQueryAreEqual(originalQuery, plannerRestrictionContext);

	DeferredErrorMessage *error = RecursivelyPlanSubqueriesAndCTEs(originalQuery,
																	&context);
	if (error != NULL)
	{
		/*
		 * Undo the increment first: RaiseDeferredError() is called with ERROR
		 * and presumably does not return.
		 */
		recursivePlanningDepth--;
		RaiseDeferredError(error, ERROR);
	}

	/* only deparse the rewritten query when DEBUG1 output would be emitted */
	if (context.subPlanList && IsLoggableLevel(DEBUG1))
	{
		StringInfo subPlanString = makeStringInfo();
		pg_get_query_def(originalQuery, subPlanString);
		ereport(DEBUG1, (errmsg(
							 "Plan " UINT64_FORMAT
							 " query after replacing subqueries and CTEs: %s", planId,
							 subPlanString->data)));
	}

	recursivePlanningDepth--;

	return context.subPlanList;
}


/*
 * RecursivelyPlanSubqueriesAndCTEs finds subqueries and CTEs that cannot be pushed down to
 * workers directly and instead plans them by recursively calling the planner and
 * adding the subplan to subPlanList.
 *
 * Subplans are executed prior to the distributed plan and the results are written
 * to temporary files on workers.
 *
 * CTE references are replaced by a subquery on the read_intermediate_result
 * function, which reads from the temporary file.
 *
 * If recursive planning results in an error then the error is returned. Otherwise, the
 * subplans will be added to subPlanList.
 *
 * The steps below run in a fixed order; later steps rely on the query shape
 * produced by earlier ones.
 */
static DeferredErrorMessage *
RecursivelyPlanSubqueriesAndCTEs(Query *query, RecursivePlanningContext *context)
{
	DeferredErrorMessage *error = RecursivelyPlanCTEs(query, context);
	if (error != NULL)
	{
		return error;
	}

	if (SubqueryPushdown)
	{
		/*
		 * When the subquery_pushdown flag is enabled we make some hacks
		 * to push down subqueries with LIMIT. Recursive planning would
		 * valiantly do the right thing and try to recursively plan the
		 * inner subqueries, but we don't really want it to because those
		 * subqueries might not be supported and would be much slower.
		 *
		 * Instead, we skip recursive planning altogether when
		 * subquery_pushdown is enabled.
		 */
		return NULL;
	}

	/* make sure function calls in joins are executed in the coordinator */
	WrapFunctionsInSubqueries(query);

	/* descend into subqueries */
	query_tree_walker(query, RecursivelyPlanSubqueryWalker, context, 0);

	/*
	 * At this point, all CTEs, leaf subqueries containing local tables and
	 * non-pushdownable subqueries have been replaced. We now check for
	 * combinations of subqueries that cannot be pushed down (e.g. certain
	 * UNIONs of subqueries; see ShouldRecursivelyPlanSetOperation).
	 *
	 * This code also runs for the top-level query, which allows us to support
	 * top-level set operations.
	 */
	if (ShouldRecursivelyPlanSetOperation(query, context))
	{
		RecursivelyPlanSetOperations(query, (Node *) query->setOperations, context);
	}

	if (query->havingQual != NULL)
	{
		/* correlated subqueries in HAVING cannot be planned separately */
		if (NodeContainsSubqueryReferencingOuterQuery(query->havingQual))
		{
			return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
								 "Subqueries in HAVING cannot refer to outer query",
								 NULL, NULL);
		}

		RecursivelyPlanAllSubqueries(query->havingQual, context);
	}

	/*
	 * If the query doesn't have distribution key equality,
	 * recursively plan some of its subqueries.
	 */
	if (ShouldRecursivelyPlanNonColocatedSubqueries(query, context))
	{
		RecursivelyPlanNonColocatedSubqueries(query, context);
	}


	if (ShouldConvertLocalTableJoinsToSubqueries(query->rtable))
	{
		/*
		 * Logical planner cannot handle "local_table" [OUTER] JOIN "dist_table", or
		 * a query with local table/citus local table and subquery. We convert local/citus local
		 * tables to a subquery until they can be planned.
		 */
		RecursivelyPlanLocalTableJoins(query, context);
	}

	/*
	 * Similarly, logical planner cannot handle outer joins when the outer rel
	 * is recurring, such as "recurring_rel LEFT JOIN distributed_rel". In that
	 * case, we convert the distributed table into a subquery and recursively
	 * plan the inner side of the outer join. That way, the inner rel gets
	 * converted into an intermediate result and the logical planner can handle
	 * the new query since it's of the form
	 * "recurring_rel LEFT JOIN recurring_rel".
	 */
	if (ShouldRecursivelyPlanOuterJoins(context))
	{
		RecursivelyPlanRecurringTupleOuterJoinWalker((Node *) query->jointree,
													 query, context);
	}

	/*
	 * If the FROM clause is recurring (does not contain a distributed table),
	 * then we cannot have any distributed tables appearing in subqueries in
	 * the SELECT and WHERE clauses.
	 *
	 * We do the sublink conversions at the end of the recursive planning
	 * because earlier steps might have transformed the query into a
	 * shape that needs recursively planning the sublinks.
	 */
	if (ShouldRecursivelyPlanSublinks(query))
	{
		/* replace all subqueries in the WHERE clause */
		if (query->jointree && query->jointree->quals)
		{
			RecursivelyPlanAllSubqueries((Node *) query->jointree->quals, context);
		}

		/* replace all subqueries in the SELECT clause */
		RecursivelyPlanAllSubqueries((Node *) query->targetList, context);
	}

	return NULL;
}


/*
 * GetPlannerRestrictionContext returns the planner restriction context
 * from the given context.
 */
PlannerRestrictionContext *
GetPlannerRestrictionContext(RecursivePlanningContext *recursivePlanningContext)
{
	return recursivePlanningContext->plannerRestrictionContext;
}


/*
 * ShouldRecursivelyPlanNonColocatedSubqueries returns true if the input query contains joins
 * that are not on the distribution key.
 *
 * Note that at the point that this function is called, we've already recursively planned all
 * the leaf subqueries. Thus, we're actually checking whether the joins among the subqueries
 * are on the distribution key or not.
 */
static bool
ShouldRecursivelyPlanNonColocatedSubqueries(Query *subquery,
											RecursivePlanningContext *context)
{
	/*
	 * If the input query already contains the equality, simply return since it is not
	 * possible to find any non colocated subqueries.
	 */
	if (context->allDistributionKeysInQueryAreEqual)
	{
		return false;
	}

	/*
	 * This check helps us in two ways:
	 *   (i) We're not targeting queries that don't include subqueries at all,
	 *       they should go through regular planning.
	 *  (ii) Lower level subqueries are already recursively planned, so we should
	 *       only bother with non-colocated subquery joins, which only happen when
	 *       there are subqueries.
	 */
	if (!ContainsSubquery(subquery))
	{
		return false;
	}

	/* direct joins with local tables are not supported by any of Citus planners */
	if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable,
													  IsLocalTableRteOrMatView))
	{
		return false;
	}

	/*
	 * Finally, check whether this subquery contains distribution key equality or not.
	 */
	if (!AllDistributionKeysInSubqueryAreEqual(subquery,
											   context->plannerRestrictionContext))
	{
		return true;
	}

	return false;
}


/*
 * ContainsSubquery returns true if the input query contains any subqueries
 * in its join tree (FROM clause) or in its WHERE or HAVING clauses.
 */
static bool
ContainsSubquery(Query *query)
{
	return JoinTreeContainsSubquery(query) || WhereOrHavingClauseContainsSubquery(query);
}


/*
 * ShouldRecursivelyPlanOuterJoins returns true if the JoinRestrictionContext
 * that given RecursivePlanningContext holds implies that the query has outer
 * join(s) that might need to be recursively planned.
 *
 * Errors out if the context, its planner restriction context or its join
 * restriction context is NULL.
 */
static bool
ShouldRecursivelyPlanOuterJoins(RecursivePlanningContext *context)
{
	if (!context || !context->plannerRestrictionContext ||
		!context->plannerRestrictionContext->joinRestrictionContext)
	{
		ereport(ERROR, (errmsg("unexpectedly got NULL pointer in recursive "
							   "planning context")));
	}

	return context->plannerRestrictionContext->joinRestrictionContext->hasOuterJoin;
}


/*
 * RecursivelyPlanNonColocatedSubqueries gets a query which includes one or more
 * other subqueries that are not joined on their distribution keys. The function
 * tries to recursively plan some of the subqueries to make the input query
 * executable by Citus.
 *
 * The function picks an anchor subquery and iterates on the remaining subqueries.
 * Whenever it finds a non colocated subquery with the anchor subquery, the function
 * decides to recursively plan the non colocated subquery.
* * The function first handles subqueries in FROM clause (i.e., jointree->fromlist) and then * subqueries in WHERE clause (i.e., jointree->quals). * * The function does not treat outer joins seperately. Thus, we might end up with * a query where the function decides to recursively plan an outer side of an outer * join (i.e., LEFT side of LEFT JOIN). For simplicity, we chose to do so and handle * outer joins with a seperate pass on the join tree. */ static void RecursivelyPlanNonColocatedSubqueries(Query *subquery, RecursivePlanningContext *context) { FromExpr *joinTree = subquery->jointree; /* create the context for the non colocated subquery planning */ PlannerRestrictionContext *restrictionContext = context->plannerRestrictionContext; ColocatedJoinChecker colocatedJoinChecker = CreateColocatedJoinChecker(subquery, restrictionContext); /* * Although this is a rare case, we weren't able to pick an anchor * range table entry, so we cannot continue. */ if (colocatedJoinChecker.anchorRelationRestrictionList == NIL) { return; } /* handle from clause subqueries first */ RecursivelyPlanNonColocatedJoinWalker((Node *) joinTree, &colocatedJoinChecker, context); /* handle subqueries in WHERE clause */ RecursivelyPlanNonColocatedSubqueriesInWhere(subquery, &colocatedJoinChecker, context); } /* * RecursivelyPlanNonColocatedJoinWalker gets a join node and walks over it to find * subqueries that live under the node. * * When a subquery found, it's checked whether the subquery is colocated with the * anchor subquery specified in the nonColocatedJoinContext. If not, * the subquery is recursively planned. 
*/
static void
RecursivelyPlanNonColocatedJoinWalker(Node *joinNode,
									  ColocatedJoinChecker *colocatedJoinChecker,
									  RecursivePlanningContext *recursivePlanningContext)
{
	if (joinNode == NULL)
	{
		return;
	}
	else if (IsA(joinNode, FromExpr))
	{
		FromExpr *fromExpr = (FromExpr *) joinNode;
		ListCell *fromExprCell;

		/*
		 * For each element of the from list, check whether the element is
		 * colocated with the anchor subquery by recursing until we
		 * find the subqueries.
		 */
		foreach(fromExprCell, fromExpr->fromlist)
		{
			Node *fromElement = (Node *) lfirst(fromExprCell);

			RecursivelyPlanNonColocatedJoinWalker(fromElement, colocatedJoinChecker,
												  recursivePlanningContext);
		}
	}
	else if (IsA(joinNode, JoinExpr))
	{
		JoinExpr *joinExpr = (JoinExpr *) joinNode;

		/* recurse into the left subtree */
		RecursivelyPlanNonColocatedJoinWalker(joinExpr->larg, colocatedJoinChecker,
											  recursivePlanningContext);

		/* recurse into the right subtree */
		RecursivelyPlanNonColocatedJoinWalker(joinExpr->rarg, colocatedJoinChecker,
											  recursivePlanningContext);
	}
	else if (IsA(joinNode, RangeTblRef))
	{
		int rangeTableIndex = ((RangeTblRef *) joinNode)->rtindex;
		List *rangeTableList = colocatedJoinChecker->subquery->rtable;
		RangeTblEntry *rte = rt_fetch(rangeTableIndex, rangeTableList);

		/* we're only interested in subqueries for now */
		if (rte->rtekind != RTE_SUBQUERY)
		{
			return;
		}

		/*
		 * If the subquery is not colocated with the anchor subquery,
		 * recursively plan it.
		 */
		Query *subquery = rte->subquery;
		if (!SubqueryColocated(subquery, colocatedJoinChecker))
		{
			RecursivelyPlanSubquery(subquery, recursivePlanningContext);
		}
	}
	else
	{
		/*
		 * A join tree is made up only of FromExpr, JoinExpr and RangeTblRef
		 * nodes, so this branch should never be reached. Raise an error
		 * instead of calling pg_unreachable(): in builds without assertions
		 * that is undefined behavior if it is ever hit. This mirrors how
		 * RecursivelyPlanDistributedJoinNode handles unexpected node types.
		 */
		ereport(ERROR, (errmsg("unexpected join node type (%d)",
							   nodeTag(joinNode))));
	}
}


/*
 * RecursivelyPlanNonColocatedSubqueriesInWhere gets a query and walks over its
 * sublinks to find subqueries that live in the WHERE clause.
 *
 * When a subquery is found, it's checked whether the subquery is colocated with
 * the anchor subquery specified in the colocatedJoinChecker. If not, the
 * subquery is recursively planned.
*/
static void
RecursivelyPlanNonColocatedSubqueriesInWhere(Query *query,
											 ColocatedJoinChecker *colocatedJoinChecker,
											 RecursivePlanningContext *
											 recursivePlanningContext)
{
	/* only the sublinks in jointree->quals are considered here */
	List *sublinkList = SublinkListFromWhere(query);
	ListCell *sublinkCell = NULL;

	foreach(sublinkCell, sublinkList)
	{
		SubLink *sublink = (SubLink *) lfirst(sublinkCell);
		Query *subselect = (Query *) sublink->subselect;

		/* subselect is probably never NULL, but anyway let's keep the check */
		if (subselect == NULL)
		{
			continue;
		}

		if (!SubqueryColocated(subselect, colocatedJoinChecker))
		{
			RecursivelyPlanSubquery(subselect, recursivePlanningContext);
		}
	}
}


/*
 * RecursivelyPlanRecurringTupleOuterJoinWalker descends into a join tree and
 * recursively plans all non-recurring (i.e., distributed) rels that
 * participate in an outer join expression together with a recurring rel,
 * such as in "recurring_rel LEFT JOIN distributed_rel", i.e.,
 * where the recurring rel causes returning recurring tuples from the worker
 * nodes.
 *
 * Returns true if given node is recurring.
 *
 * See RecursivelyPlanDistributedJoinNode() function for the explanation on
 * what it means for a node to be "recurring" or "distributed".
 */
static bool
RecursivelyPlanRecurringTupleOuterJoinWalker(Node *node, Query *query,
											 RecursivePlanningContext *
											 recursivePlanningContext)
{
	if (node == NULL)
	{
		return false;
	}
	else if (IsA(node, FromExpr))
	{
		FromExpr *fromExpr = (FromExpr *) node;
		ListCell *fromExprCell;

		/* search for join trees in each FROM element */
		foreach(fromExprCell, fromExpr->fromlist)
		{
			Node *fromElement = (Node *) lfirst(fromExprCell);

			RecursivelyPlanRecurringTupleOuterJoinWalker(fromElement, query,
														 recursivePlanningContext);
		}

		/*
		 * Can only appear during the top-level call and top-level callers
		 * are not interested in the return value. Even more, we can't tell
		 * whether a FromExpr is recurring or not.
		 */
		return false;
	}
	else if (IsA(node, JoinExpr))
	{
		JoinExpr *joinExpr = (JoinExpr *) node;

		Node *leftNode = joinExpr->larg;
		Node *rightNode = joinExpr->rarg;

		/*
		 * There may be recursively plannable outer joins deeper in the join tree.
		 *
		 * We first handle the sub join trees and then the top level one since the
		 * top level join expression might not require recursive planning after
		 * handling the sub join trees.
		 *
		 * The left subtree is always processed before the right one.
		 */
		bool leftNodeRecurs =
			RecursivelyPlanRecurringTupleOuterJoinWalker(leftNode, query,
														 recursivePlanningContext);
		bool rightNodeRecurs =
			RecursivelyPlanRecurringTupleOuterJoinWalker(rightNode, query,
														 recursivePlanningContext);
		switch (joinExpr->jointype)
		{
			case JOIN_LEFT:
			{
				/* recurring_rel LEFT JOIN distributed_rel */
				if (leftNodeRecurs && !rightNodeRecurs)
				{
					ereport(DEBUG1, (errmsg("recursively planning right side of "
											"the left join since the outer side "
											"is a recurring rel")));
					RecursivelyPlanDistributedJoinNode(rightNode, query,
													   recursivePlanningContext);
				}

				/*
				 * A LEFT JOIN is recurring if the lhs is recurring.
				 * Note that we might have converted the rhs into a recurring
				 * one too if the lhs is recurring, but this anyway has no
				 * effects when deciding whether a LEFT JOIN is recurring.
				 */
				return leftNodeRecurs;
			}

			case JOIN_RIGHT:
			{
				/* distributed_rel RIGHT JOIN recurring_rel */
				if (!leftNodeRecurs && rightNodeRecurs)
				{
					ereport(DEBUG1, (errmsg("recursively planning left side of "
											"the right join since the outer side "
											"is a recurring rel")));
					RecursivelyPlanDistributedJoinNode(leftNode, query,
													   recursivePlanningContext);
				}

				/*
				 * Similar to LEFT JOINs, a RIGHT JOIN is recurring if the rhs
				 * is recurring.
				 */
				return rightNodeRecurs;
			}

			case JOIN_FULL:
			{
				/*
				 * recurring_rel FULL JOIN distributed_rel
				 * distributed_rel FULL JOIN recurring_rel
				 */
				if (leftNodeRecurs && !rightNodeRecurs)
				{
					ereport(DEBUG1, (errmsg("recursively planning right side of "
											"the full join since the other side "
											"is a recurring rel")));
					RecursivelyPlanDistributedJoinNode(rightNode, query,
													   recursivePlanningContext);
				}
				else if (!leftNodeRecurs && rightNodeRecurs)
				{
					ereport(DEBUG1, (errmsg("recursively planning left side of "
											"the full join since the other side "
											"is a recurring rel")));
					RecursivelyPlanDistributedJoinNode(leftNode, query,
													   recursivePlanningContext);
				}

				/*
				 * An OUTER JOIN is recurring if any sides of the join is
				 * recurring. As in other outer join types, it doesn't matter
				 * whether the other side was / became recurring or not.
				 */
				return leftNodeRecurs || rightNodeRecurs;
			}

			case JOIN_INNER:
			{
				/*
				 * We don't need to recursively plan non-outer joins and we
				 * already descended into sub join trees to handle outer joins
				 * buried in them.
				 *
				 * An INNER JOIN is recurring only if both sides are recurring.
				 */
				return leftNodeRecurs && rightNodeRecurs;
			}

			default:
			{
				ereport(ERROR, (errmsg("got unexpected join type (%d) when recursively "
									   "planning a join",
									   joinExpr->jointype)));
			}
		}
	}
	else if (IsA(node, RangeTblRef))
	{
		return IsRTERefRecurring((RangeTblRef *) node, query);
	}
	else
	{
		ereport(ERROR, errmsg("got unexpected node type (%d) when recursively "
							  "planning a join",
							  nodeTag(node)));
	}
}


/*
 * RecursivelyPlanDistributedJoinNode is a helper function for
 * RecursivelyPlanRecurringTupleOuterJoinWalker that recursively plans given
 * distributed node that is known to be inner side of an outer join.
 *
 * Fails to do so if the distributed join node references the recurring one.
 * In that case, we don't throw an error here but instead we let
 * DeferredErrorIfUnsupportedRecurringTuplesJoin do so for a better error
 * message.
 *
 * We call a node "distributed" if it points to a distributed table or a
 * more complex object (i.e., a join tree or a subquery) that can be pushed
 * down to the worker nodes directly.
For a join, this means that it's either * an INNER join where any side of it is a distributed table / a distributed * sub join tree, or an OUTER join where the outer side is a distributed table * / a distributed sub join tree. */ static void RecursivelyPlanDistributedJoinNode(Node *node, Query *query, RecursivePlanningContext *recursivePlanningContext) { if (IsA(node, JoinExpr)) { /* * This, for example, means that RecursivelyPlanRecurringTupleOuterJoinWalker * needs to plan inner side, i.e., " INNER JOIN ", * of the following join: * LEFT JOIN ( JOIN ) * * XXX: Ideally, we should handle such a sub join tree by moving * it into a subquery "as a whole" but this implies that we need to * rebuild the rtable and re-point all the Vars to the new rtable * indexes, so we've not implemented that yet. * * Instead, we recursively plan all the distributed tables in that * sub join tree. This is much more inefficient than the other * approach (since we lose the opportunity to push-down the whole * sub join tree into the workers) but is easier to implement. */ RecursivelyPlanDistributedJoinNode(((JoinExpr *) node)->larg, query, recursivePlanningContext); RecursivelyPlanDistributedJoinNode(((JoinExpr *) node)->rarg, query, recursivePlanningContext); return; } if (!IsA(node, RangeTblRef)) { ereport(ERROR, (errmsg("unexpected join node type (%d)", nodeTag(node)))); } RangeTblRef *rangeTableRef = (RangeTblRef *) node; if (IsRTERefRecurring(rangeTableRef, query)) { /* * Not the top-level callers but RecursivelyPlanDistributedJoinNode * might call itself for recurring nodes and need to skip them. 
*/ return; } RangeTblEntry *distributedRte = rt_fetch(rangeTableRef->rtindex, query->rtable); if (distributedRte->rtekind == RTE_RELATION) { ereport(DEBUG1, (errmsg("recursively planning distributed relation %s " "since it is part of a distributed join node " "that is outer joined with a recurring rel", GetRelationNameAndAliasName(distributedRte)))); PlannerRestrictionContext *restrictionContext = GetPlannerRestrictionContext(recursivePlanningContext); List *requiredAttributes = RequiredAttrNumbersForRelation(distributedRte, restrictionContext); ReplaceRTERelationWithRteSubquery(distributedRte, requiredAttributes, recursivePlanningContext); } else if (distributedRte->rtekind == RTE_SUBQUERY) { /* * We don't try logging the subquery here because RecursivelyPlanSubquery * will anyway do so if the query doesn't reference the outer query. */ ereport(DEBUG1, (errmsg("recursively planning the distributed subquery " "since it is part of a distributed join node " "that is outer joined with a recurring rel"))); bool recursivelyPlanned = RecursivelyPlanSubquery(distributedRte->subquery, recursivePlanningContext); if (!recursivelyPlanned) { /* * RecursivelyPlanSubquery fails to plan a subquery only if it * contains references to the outer query. This means that, we can't * plan such outer joins (like ) * if it's a LATERAL join where the distributed side is a subquery that * references the outer side, as in, * * SELECT * FROM reference * LEFT JOIN LATERAL * (SELECT * FROM distributed WHERE reference.b > distributed.b) q * USING (a); */ Assert(ContainsReferencesToOuterQuery(distributedRte->subquery)); } } else { /* * We don't expect RecursivelyPlanRecurringTupleOuterJoinWalker to try recursively * plan such an RTE. */ ereport(ERROR, errmsg("got unexpected RTE type (%d) when recursively " "planning a join", distributedRte->rtekind)); } } /* * IsRTERefRecurring returns true if given rte reference points to a recurring * rte. 
* * If an rte points to a table, then we call it recurring if the table is not * a distributed table. Otherwise, e.g., if it points a query, then we call it * recurring if none of the rtes that belongs to the query point to a distributed * table. * * Note that it's safe to assume a subquery is not recurring if we have a rte reference * to a distributed table somewhere in the query tree. For example, considering * the subquery (q) of the the following query: * SELECT * FROM ref LEFT JOIN (SELECT * FROM ref LEFT dist) q, * one might think that it's not appropriate to call IsRTERefRecurring for subquery * (q). However, this is already not the case because this function is called * in the context of recursive planning and hence any query that contains * rtes pointing to distributed tables and that cannot be pushed down to worker * nodes should've been recursively planned already. This is because, the recursive * planner processes the queries in bottom-up fashion. For this reason, the subquery * in the example should've already be converted to the following before we check * the rte reference that points to the subquery (q): * SELECT * FROM ref LEFT JOIN (SELECT * FROM ref LEFT (SELECT * FROM read_intermediate_result()) dist_1) * That way, we wouldn't incorrectly say that (SELECT * FROM ref LEFT dist) is a * distributed subquery (due to having a reference to a distributed table). */ static bool IsRTERefRecurring(RangeTblRef *rangeTableRef, Query *query) { int rangeTableIndex = rangeTableRef->rtindex; List *rangeTableList = query->rtable; RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableIndex, rangeTableList); return !FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry), IsDistributedTableRTE); } /* * SublinkListFromWhere finds the subquery nodes in the where clause of the given query. Note * that the function should be called on the original query given that postgres * standard_planner() may convert the subqueries in WHERE clause to joins. 
*/ static List * SublinkListFromWhere(Query *originalQuery) { FromExpr *joinTree = originalQuery->jointree; List *sublinkList = NIL; if (!joinTree) { return NIL; } Node *queryQuals = joinTree->quals; ExtractSublinkWalker(queryQuals, &sublinkList); return sublinkList; } /* * ExtractSublinkWalker walks over a quals node, and finds all sublinks * in that node. */ static bool ExtractSublinkWalker(Node *node, List **sublinkList) { bool walkerResult = false; if (node == NULL) { return false; } if (IsA(node, SubLink)) { (*sublinkList) = lappend(*sublinkList, node); } else { walkerResult = expression_tree_walker(node, ExtractSublinkWalker, sublinkList); } return walkerResult; } /* * ShouldRecursivelyPlanSublinks returns true if the query has a recurring * FROM clause. */ static bool ShouldRecursivelyPlanSublinks(Query *query) { if (FindNodeMatchingCheckFunctionInRangeTableList(query->rtable, IsDistributedTableRTE)) { /* there is a distributed table in the FROM clause */ return false; } return true; } /* * RecursivelyPlanAllSubqueries descends into an expression tree and recursively * plans all subqueries that contain at least one distributed table. The recursive * planning starts from the top of the input query. */ static bool RecursivelyPlanAllSubqueries(Node *node, RecursivePlanningContext *planningContext) { if (node == NULL) { return false; } if (IsA(node, Query)) { Query *query = (Query *) node; if (FindNodeMatchingCheckFunctionInRangeTableList(query->rtable, IsCitusTableRTE)) { RecursivelyPlanSubquery(query, planningContext); } return false; } return expression_tree_walker(node, RecursivelyPlanAllSubqueries, planningContext); } /* * RecursivelyPlanCTEs plans all CTEs in the query by recursively calling the planner * The resulting plan is added to planningContext->subPlanList and CTE references * are replaced by subqueries that call read_intermediate_result, which reads the * intermediate result of the CTE after it is executed. 
* * Recursive and modifying CTEs are not yet supported and return an error. */ static DeferredErrorMessage * RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext) { ListCell *cteCell = NULL; CteReferenceWalkerContext context = { -1, NIL }; if (query->cteList == NIL) { /* no CTEs, nothing to do */ return NULL; } if (query->hasRecursive) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "recursive CTEs are not supported in distributed " "queries", NULL, NULL); } /* get all RTE_CTEs that point to CTEs from cteList */ CteReferenceListWalker((Node *) query, &context); foreach(cteCell, query->cteList) { CommonTableExpr *cte = (CommonTableExpr *) lfirst(cteCell); char *cteName = cte->ctename; Query *subquery = (Query *) cte->ctequery; uint64 planId = planningContext->planId; List *cteTargetList = NIL; ListCell *rteCell = NULL; int replacedCtesCount = 0; if (ContainsReferencesToOuterQuery(subquery)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "CTEs that refer to other subqueries are not " "supported in multi-shard queries", NULL, NULL); } if (cte->cterefcount == 0 && subquery->commandType == CMD_SELECT) { /* * SELECT CTEs that aren't referenced aren't executed in postgres. * We don't need to generate a subplan for it and can take the rest * of this iteration off. 
*/ continue; } uint32 subPlanId = list_length(planningContext->subPlanList) + 1; if (IsLoggableLevel(DEBUG1)) { StringInfo subPlanString = makeStringInfo(); pg_get_query_def(subquery, subPlanString); ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT "_%u for CTE %s: %s", planId, subPlanId, cteName, subPlanString->data))); } /* build a sub plan for the CTE */ DistributedSubPlan *subPlan = CreateDistributedSubPlan(subPlanId, subquery); planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan); /* build the result_id parameter for the call to read_intermediate_result */ char *resultId = GenerateResultId(planId, subPlanId); if (subquery->returningList) { /* modifying CTE with returning */ cteTargetList = subquery->returningList; } else { /* regular SELECT CTE */ cteTargetList = subquery->targetList; } /* replace references to the CTE with a subquery that reads results */ Query *resultQuery = BuildSubPlanResultQuery(cteTargetList, cte->aliascolnames, resultId); foreach(rteCell, context.cteReferenceList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rteCell); if (rangeTableEntry->rtekind != RTE_CTE) { /* * This RTE pointed to a preceding CTE that was already replaced * by a subplan. */ continue; } if (strncmp(rangeTableEntry->ctename, cteName, NAMEDATALEN) == 0) { /* change the RTE_CTE into an RTE_SUBQUERY */ rangeTableEntry->rtekind = RTE_SUBQUERY; rangeTableEntry->ctename = NULL; rangeTableEntry->ctelevelsup = 0; if (replacedCtesCount == 0) { /* * Replace the first CTE reference with the result query directly. */ rangeTableEntry->subquery = resultQuery; } else { /* * Replace subsequent CTE references with a copy of the result * query. */ rangeTableEntry->subquery = copyObject(resultQuery); } replacedCtesCount++; } } Assert(cte->cterefcount == replacedCtesCount); } /* * All CTEs are now executed through subplans and RTE_CTEs pointing * to the CTE list have been replaced with subqueries. We can now * clear the cteList. 
*/ query->cteList = NIL; return NULL; } /* * RecursivelyPlanSubqueryWalker recursively finds all the Query nodes and * recursively plans if necessary. */ static bool RecursivelyPlanSubqueryWalker(Node *node, RecursivePlanningContext *context) { if (node == NULL) { return false; } if (IsA(node, Query)) { Query *query = (Query *) node; context->level += 1; /* * First, make sure any subqueries and CTEs within this subquery * are recursively planned if necessary. */ DeferredErrorMessage *error = RecursivelyPlanSubqueriesAndCTEs(query, context); if (error != NULL) { RaiseDeferredError(error, ERROR); } context->level -= 1; /* * Recursively plan this subquery if it cannot be pushed down and is * eligible for recursive planning. */ if (ShouldRecursivelyPlanSubquery(query, context)) { RecursivelyPlanSubquery(query, context); } /* we're done, no need to recurse anymore for this query */ return false; } return expression_tree_walker(node, RecursivelyPlanSubqueryWalker, context); } /* * ShouldRecursivelyPlanSubquery decides whether the input subquery should be recursively * planned or not. * * For the details, see the cases in the function. */ static bool ShouldRecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *context) { if (FindNodeMatchingCheckFunctionInRangeTableList(subquery->rtable, IsLocalTableRteOrMatView)) { /* * Postgres can always plan queries that don't require distributed planning. * Note that we need to check this first, otherwise the calls to the many other * Citus planner functions would error our due to local relations. * * TODO: We could only successfully create distributed plans with local tables * when the local tables are on the leaf queries and the upper level queries * do not contain any other local tables. */ } else if (CanPushdownSubquery(subquery, false)) { /* * We should do one more check for the distribution key equality. 
* * If the input query to the planner doesn't contain distribution key equality, * we should further check whether this individual subquery contains or not. * * If all relations are not joined on their distribution keys for the given * subquery, we cannot push push it down and therefore we should try to * recursively plan it. */ if (!context->allDistributionKeysInQueryAreEqual && !AllDistributionKeysInSubqueryAreEqual(subquery, context->plannerRestrictionContext)) { return true; } /* * Citus can pushdown this subquery, no need to recursively * plan which is much more expensive than pushdown. */ return false; } return true; } /* * AllDistributionKeysInSubqueryAreEqual is a wrapper function * for AllDistributionKeysInQueryAreEqual(). Here, we filter the * planner restrictions for the given subquery and do the restriction * equality checks on the filtered restriction. */ static bool AllDistributionKeysInSubqueryAreEqual(Query *subquery, PlannerRestrictionContext *restrictionContext) { /* we don't support distribution eq. checks for CTEs yet */ if (subquery->cteList != NIL) { return false; } PlannerRestrictionContext *filteredRestrictionContext = FilterPlannerRestrictionForQuery(restrictionContext, subquery); bool allDistributionKeysInSubqueryAreEqual = AllDistributionKeysInQueryAreEqual(subquery, filteredRestrictionContext); if (!allDistributionKeysInSubqueryAreEqual) { return false; } return true; } /* * ShouldRecursivelyPlanSetOperation determines whether the leaf queries of a * set operations tree need to be recursively planned in order to support the * query as a whole. */ static bool ShouldRecursivelyPlanSetOperation(Query *query, RecursivePlanningContext *context) { SetOperationStmt *setOperations = (SetOperationStmt *) query->setOperations; if (setOperations == NULL) { return false; } if (context->level == 0) { /* * We cannot push down top-level set operation. Recursively plan the * leaf nodes such that it becomes a router query. 
*/ return true; } if (setOperations->op != SETOP_UNION) { /* * We can only push down UNION operaionts, plan other set operations * recursively. */ return true; } if (DeferErrorIfUnsupportedUnionQuery(query) != NULL) { /* * If at least one leaf query in the union is recurring, then all * leaf nodes need to be recurring. */ return true; } PlannerRestrictionContext *filteredRestrictionContext = FilterPlannerRestrictionForQuery(context->plannerRestrictionContext, query); if (!SafeToPushdownUnionSubquery(query, filteredRestrictionContext)) { /* * The distribution column is not in the same place in all sides * of the union, meaning we cannot determine distribution column * equivalence. Recursive planning is necessary. */ return true; } return false; } /* * RecursivelyPlanSetOperations descends into a tree of set operations * (e.g. UNION, INTERSECTS) and recursively plans all leaf nodes that * contain distributed tables. */ static void RecursivelyPlanSetOperations(Query *query, Node *node, RecursivePlanningContext *context) { if (IsA(node, SetOperationStmt)) { SetOperationStmt *setOperations = (SetOperationStmt *) node; RecursivelyPlanSetOperations(query, setOperations->larg, context); RecursivelyPlanSetOperations(query, setOperations->rarg, context); } else if (IsA(node, RangeTblRef)) { RangeTblRef *rangeTableRef = (RangeTblRef *) node; RangeTblEntry *rangeTableEntry = rt_fetch(rangeTableRef->rtindex, query->rtable); Query *subquery = rangeTableEntry->subquery; if (rangeTableEntry->rtekind == RTE_SUBQUERY && FindNodeMatchingCheckFunction((Node *) subquery, IsDistributedTableRTE)) { RecursivelyPlanSubquery(subquery, context); } } else { ereport(ERROR, (errmsg("unexpected node type (%d) while " "expecting set operations or " "range table references", nodeTag(node)))); } } /* * IsLocalTableRteOrMatView gets a node and returns true if the node is a range * table entry that points to a postgres local or citus local table or to a * materialized view. 
*/ static bool IsLocalTableRteOrMatView(Node *node) { if (node == NULL) { return false; } if (!IsA(node, RangeTblEntry)) { return false; } RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node; if (rangeTableEntry->rtekind != RTE_RELATION) { return false; } if (rangeTableEntry->relkind == RELKIND_VIEW) { return false; } Oid relationId = rangeTableEntry->relid; return IsRelationLocalTableOrMatView(relationId); } /* * IsRelationLocalTableOrMatView returns true if the given relation * is a citus local, local, or materialized view. */ bool IsRelationLocalTableOrMatView(Oid relationId) { if (!IsCitusTable(relationId)) { /* postgres local table or a materialized view */ return true; } else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) { return true; } /* no local table found */ return false; } /* * RecursivelyPlanSubquery recursively plans a query, replaces it with a * result query and returns the subplan. * * Before we recursively plan the given subquery, we should ensure * that the subquery doesn't contain any references to the outer * queries (i.e., such queries cannot be separately planned). In * that case, the function doesn't recursively plan the input query * and immediately returns. Later, the planner decides on what to do * with the query. */ static bool RecursivelyPlanSubquery(Query *subquery, RecursivePlanningContext *planningContext) { uint64 planId = planningContext->planId; Query *debugQuery = NULL; if (ContainsReferencesToOuterQuery(subquery)) { elog(DEBUG2, "skipping recursive planning for the subquery since it " "contains references to outer queries"); return false; } /* * Subquery will go through the standard planner, thus to properly deparse it * we keep its copy: debugQuery. */ if (IsLoggableLevel(DEBUG1)) { debugQuery = copyObject(subquery); } /* * Create the subplan and append it to the list in the planning context. 
*/ int subPlanId = list_length(planningContext->subPlanList) + 1; DistributedSubPlan *subPlan = CreateDistributedSubPlan(subPlanId, subquery); planningContext->subPlanList = lappend(planningContext->subPlanList, subPlan); /* build the result_id parameter for the call to read_intermediate_result */ char *resultId = GenerateResultId(planId, subPlanId); /* * BuildSubPlanResultQuery() can optionally use provided column aliases. * We do not need to send additional alias list for subqueries. */ Query *resultQuery = BuildSubPlanResultQuery(subquery->targetList, NIL, resultId); if (IsLoggableLevel(DEBUG1)) { StringInfo subqueryString = makeStringInfo(); pg_get_query_def(debugQuery, subqueryString); ereport(DEBUG1, (errmsg("generating subplan " UINT64_FORMAT "_%u for subquery %s", planId, subPlanId, subqueryString->data))); } /* finally update the input subquery to point the result query */ *subquery = *resultQuery; return true; } /* * CreateDistributedSubPlan creates a distributed subplan by recursively calling * the planner from the top, which may either generate a local plan or another * distributed plan, which can itself contain subplans. */ static DistributedSubPlan * CreateDistributedSubPlan(uint32 subPlanId, Query *subPlanQuery) { int cursorOptions = 0; if (ContainsReadIntermediateResultFunction((Node *) subPlanQuery)) { /* * Make sure we go through distributed planning if there are * read_intermediate_result calls, even if there are no distributed * tables in the query anymore. * * We cannot perform this check in the planner itself, since that * would also cause the workers to attempt distributed planning. */ cursorOptions |= CURSOR_OPT_FORCE_DISTRIBUTED; } DistributedSubPlan *subPlan = CitusMakeNode(DistributedSubPlan); subPlan->plan = planner(subPlanQuery, NULL, cursorOptions, NULL); subPlan->subPlanId = subPlanId; return subPlan; } /* * CteReferenceListWalker finds all references to CTEs in the top level of a query * and adds them to context->cteReferenceList. 
*/ static bool CteReferenceListWalker(Node *node, CteReferenceWalkerContext *context) { if (node == NULL) { return false; } if (IsA(node, RangeTblEntry)) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node; if (rangeTableEntry->rtekind == RTE_CTE && rangeTableEntry->ctelevelsup == context->level) { context->cteReferenceList = lappend(context->cteReferenceList, rangeTableEntry); } /* caller will descend into range table entry */ return false; } else if (IsA(node, Query)) { Query *query = (Query *) node; context->level += 1; query_tree_walker(query, CteReferenceListWalker, context, QTW_EXAMINE_RTES_BEFORE); context->level -= 1; return false; } else { return expression_tree_walker(node, CteReferenceListWalker, context); } } /* * ContainsReferencesToOuterQuery determines whether the given query contains * anything that points outside of the query itself. Such queries cannot be * planned recursively. */ bool ContainsReferencesToOuterQuery(Query *query) { VarLevelsUpWalkerContext context = { 0 }; int flags = 0; return query_tree_walker(query, ContainsReferencesToOuterQueryWalker, &context, flags); } /* * ContainsReferencesToOuterQueryWalker determines whether the given query * contains any Vars that point more than context->level levels up. * * ContainsReferencesToOuterQueryWalker recursively descends into subqueries * and increases the level by 1 before recursing. 
*/ static bool ContainsReferencesToOuterQueryWalker(Node *node, VarLevelsUpWalkerContext *context) { if (node == NULL) { return false; } if (IsA(node, Var)) { if (((Var *) node)->varlevelsup > context->level) { return true; } return false; } else if (IsA(node, Aggref)) { if (((Aggref *) node)->agglevelsup > context->level) { return true; } } else if (IsA(node, GroupingFunc)) { if (((GroupingFunc *) node)->agglevelsup > context->level) { return true; } return false; } else if (IsA(node, PlaceHolderVar)) { if (((PlaceHolderVar *) node)->phlevelsup > context->level) { return true; } } else if (IsA(node, Query)) { Query *query = (Query *) node; int flags = 0; context->level += 1; bool found = query_tree_walker(query, ContainsReferencesToOuterQueryWalker, context, flags); context->level -= 1; return found; } return expression_tree_walker(node, ContainsReferencesToOuterQueryWalker, context); } /* * NodeContainsSubqueryReferencingOuterQuery determines whether the given node * contains anything that points outside of the query itself. */ static bool NodeContainsSubqueryReferencingOuterQuery(Node *node) { List *sublinks = NIL; ExtractSublinkWalker(node, &sublinks); SubLink *sublink; foreach_ptr(sublink, sublinks) { if (ContainsReferencesToOuterQuery(castNode(Query, sublink->subselect))) { return true; } } return false; } /* * ReplaceRTERelationWithRteSubquery replaces the input rte relation target entry * with a subquery. The function also pushes down the filters to the subquery. * * It then recursively plans the subquery. This subquery is wrapped with another subquery * as a trick to reduce network cost, because we currently don't have an easy way to * skip generating NULL's for non-required columns, and if we create (SELECT a, NULL, NULL FROM table) * then this will be sent over network and NULL's also occupy some space. Instead of this we generate: * (SELECT t.a, NULL, NULL FROM (SELECT a FROM table) t). 
The inner subquery will be recursively planned * but the outer part will not be yet it will still have the NULL columns so that the query is correct. */ void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry, List *requiredAttrNumbers, RecursivePlanningContext *context) { Query *subquery = WrapRteRelationIntoSubquery(rangeTableEntry, requiredAttrNumbers); List *outerQueryTargetList = CreateAllTargetListForRelation(rangeTableEntry->relid, requiredAttrNumbers); List *restrictionList = GetRestrictInfoListForRelation(rangeTableEntry, context->plannerRestrictionContext); List *copyRestrictionList = copyObject(restrictionList); Expr *andedBoundExpressions = make_ands_explicit(copyRestrictionList); subquery->jointree->quals = (Node *) andedBoundExpressions; /* * Originally the quals were pointing to the RTE and its varno * was pointing to its index in rtable. However now we converted the RTE * to a subquery and the quals should be pointing to that subquery, which * is the only RTE in its rtable, hence we update the varnos so that they * point to the subquery RTE. * Originally: rtable: [rte1, current_rte, rte3...] * Now: rtable: [rte1, subquery[current_rte], rte3...] --subquery[current_rte] refers to its rtable. */ Node *quals = subquery->jointree->quals; UpdateVarNosInNode(quals, SINGLE_RTE_INDEX); /* replace the function with the constructed subquery */ rangeTableEntry->rtekind = RTE_SUBQUERY; rangeTableEntry->subquery = subquery; /* * If the relation is inherited, it'll still be inherited as * we've copied it earlier. This is to prevent the newly created * subquery being treated as inherited. 
*/ rangeTableEntry->inh = false; if (IsLoggableLevel(DEBUG1)) { char *relationAndAliasName = GetRelationNameAndAliasName(rangeTableEntry); ereport(DEBUG1, (errmsg("Wrapping relation %s to a subquery", relationAndAliasName))); } /* as we created the subquery, now forcefully recursively plan it */ bool recursivelyPlanned = RecursivelyPlanSubquery(subquery, context); if (!recursivelyPlanned) { ereport(ERROR, (errmsg( "unexpected state: query should have been recursively planned"))); } Query *outerSubquery = CreateOuterSubquery(rangeTableEntry, outerQueryTargetList); rangeTableEntry->subquery = outerSubquery; } /* * GetRelationNameAndAliasName returns the relname + alias name if * alias name exists otherwise only the relname is returned. */ static char * GetRelationNameAndAliasName(RangeTblEntry *rangeTableEntry) { StringInfo str = makeStringInfo(); appendStringInfo(str, "\"%s\"", get_rel_name(rangeTableEntry->relid)); char *aliasName = NULL; if (rangeTableEntry->alias) { aliasName = rangeTableEntry->alias->aliasname; } if (aliasName) { appendStringInfo(str, " \"%s\"", aliasName); } return str->data; } /* * CreateOuterSubquery creates outer subquery which contains * the given range table entry in its rtable. 
*/ static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry, List *outerSubqueryTargetList) { List *innerSubqueryColNames = GenerateRequiredColNamesFromTargetList( outerSubqueryTargetList); Query *outerSubquery = makeNode(Query); outerSubquery->commandType = CMD_SELECT; /* we copy the input rteRelation to preserve the rteIdentity */ RangeTblEntry *innerSubqueryRTE = copyObject(rangeTableEntry); innerSubqueryRTE->eref->colnames = innerSubqueryColNames; outerSubquery->rtable = list_make1(innerSubqueryRTE); /* set the FROM expression to the subquery */ RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); newRangeTableRef->rtindex = 1; outerSubquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); outerSubquery->targetList = outerSubqueryTargetList; return outerSubquery; } /* * GenerateRequiredColNamesFromTargetList generates the required colnames * from the given target list. */ static List * GenerateRequiredColNamesFromTargetList(List *targetList) { TargetEntry *entry = NULL; List *innerSubqueryColNames = NIL; foreach_ptr(entry, targetList) { if (IsA(entry->expr, Var)) { /* * column names of the inner subquery should only contain the * required columns, as in if we choose 'b' from ('a','b') colnames * should be 'a' not ('a','b') */ innerSubqueryColNames = lappend(innerSubqueryColNames, makeString( entry->resname)); } } return innerSubqueryColNames; } /* * UpdateVarNosInNode iterates the Vars in the * given node and updates the varno's as the newVarNo. */ static void UpdateVarNosInNode(Node *node, Index newVarNo) { List *varList = pull_var_clause(node, PVC_RECURSE_AGGREGATES | PVC_RECURSE_PLACEHOLDERS); Var *var = NULL; foreach_ptr(var, varList) { var->varno = newVarNo; } } /* * IsRecursivelyPlannableRelation returns true if the given range table entry * is a relation type that can be converted to a subquery. 
*/ bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry) { if (rangeTableEntry->rtekind != RTE_RELATION) { return false; } return rangeTableEntry->relkind == RELKIND_PARTITIONED_TABLE || rangeTableEntry->relkind == RELKIND_RELATION || rangeTableEntry->relkind == RELKIND_MATVIEW || rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE; } /* * ContainsLocalTableDistributedTableJoin returns true if the input range table list * contains a direct join between local RTE and an RTE that contains a distributed * or reference table. */ bool ContainsLocalTableDistributedTableJoin(List *rangeTableList) { bool containsLocalTable = false; bool containsDistributedTable = false; RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) { if (FindNodeMatchingCheckFunctionInRangeTableList(list_make1(rangeTableEntry), IsDistributedOrReferenceTableRTE)) { containsDistributedTable = true; } else if (IsRecursivelyPlannableRelation(rangeTableEntry) && IsLocalTableRteOrMatView((Node *) rangeTableEntry)) { /* we consider citus local tables as local table */ containsLocalTable = true; } } return containsLocalTable && containsDistributedTable; } /* * WrapFunctionsInSubqueries iterates over all the immediate Range Table Entries * of a query and wraps the functions inside (SELECT * FROM fnc() f) * subqueries, so that those functions will be executed on the coordinator if * necessary. * * We wrap all the functions that are used in joins except the ones that are * laterally joined or have WITH ORDINALITY clauses. * */ static void WrapFunctionsInSubqueries(Query *query) { List *rangeTableList = query->rtable; ListCell *rangeTableCell = NULL; /* * If we have only one function call in a query without any joins, we can * easily decide where to execute it. 
* * If there are some subqueries and/or functions that are joined with a * function, it is not trivial to decide whether we should run this * function in the coordinator or in workers and therefore we may need to * wrap some of those functions in subqueries. * * If we have only one RTE, we leave the parsed query tree as it is. This * also makes sure we do not wrap an already wrapped function call * because we know that there will always be 1 RTE in a wrapped function. * */ if (list_length(rangeTableList) < 2) { return; } /* iterate over all RTEs and wrap them if necessary */ foreach(rangeTableCell, rangeTableList) { RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell); if (ShouldTransformRTE(rangeTableEntry)) { TransformFunctionRTE(rangeTableEntry); } } } /* * TransformFunctionRTE wraps a given function RangeTableEntry * inside a (SELECT * from function() f) subquery. * * The said RangeTableEntry is modified and now points to the new subquery. * */ static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry) { Query *subquery = makeNode(Query); RangeTblRef *newRangeTableRef = makeNode(RangeTblRef); Var *targetColumn = NULL; TargetEntry *targetEntry = NULL; AttrNumber targetColumnIndex = 0; RangeTblFunction *rangeTblFunction = linitial(rangeTblEntry->functions); subquery->commandType = CMD_SELECT; /* copy the input rangeTblEntry to prevent cycles */ RangeTblEntry *newRangeTableEntry = copyObject(rangeTblEntry); /* set the FROM expression to the subquery */ subquery->rtable = list_make1(newRangeTableEntry); newRangeTableRef->rtindex = 1; subquery->jointree = makeFromExpr(list_make1(newRangeTableRef), NULL); /* Determine the result type of the function. 
* * If function return type is not composite or rowtype can't be determined, * tupleDesc is set to null here */ TupleDesc tupleDesc = (TupleDesc) get_expr_result_tupdesc(rangeTblFunction->funcexpr, true); /* * If tupleDesc is not null, we iterate over all the attributes and * create targetEntries * */ if (tupleDesc) { /* * A sample function join that end up here: * * CREATE FUNCTION f(..) RETURNS TABLE(c1 int, c2 text) AS .. ; * SELECT .. FROM table JOIN f(..) ON ( .. ) ; * * We will iterate over Tuple Description attributes. i.e (c1 int, c2 text) */ if (tupleDesc->natts > MaxAttrNumber) { ereport(ERROR, (errmsg("bad number of tuple descriptor attributes"))); } AttrNumber natts = tupleDesc->natts; for (targetColumnIndex = 0; targetColumnIndex < natts; targetColumnIndex++) { FormData_pg_attribute *attribute = TupleDescAttr(tupleDesc, targetColumnIndex); Oid columnType = attribute->atttypid; char *columnName = attribute->attname.data; /* * The indexing of attributes and TupleDesc and varattno differ * * varattno=0 corresponds to whole row * varattno=1 corresponds to first column that is stored in tupDesc->attrs[0] * * That's why we need to add one to the targetColumnIndex * */ targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1, InvalidOid, 0); targetEntry = makeTargetEntry((Expr *) targetColumn, targetColumnIndex + 1, columnName, false); subquery->targetList = lappend(subquery->targetList, targetEntry); } } /* * If tupleDesc is NULL we have 2 different cases: * * 1. The function returns a record but the attributes can not be * determined just by looking at the function definition. In this case the * column names and types must be defined explicitly in the query * * 2. The function returns a non-composite type (e.g. int, text, jsonb ..) 
* */ else { /* create target entries for all columns returned by the function */ ListCell *functionColumnName = NULL; List *functionColumnNames = rangeTblEntry->eref->colnames; foreach(functionColumnName, functionColumnNames) { char *columnName = strVal(lfirst(functionColumnName)); Oid columnType = InvalidOid; /* * If the function returns a set of records, the query needs * to explicitly name column names and types * * Use explicitly defined types in the query if they are * available * */ if (list_length(rangeTblFunction->funccoltypes) > 0) { /* * A sample function join that end up here: * * CREATE FUNCTION get_set_of_records() RETURNS SETOF RECORD AS * $cmd$ * SELECT x, x+1 FROM generate_series(0,4) f(x) * $cmd$ * LANGUAGE SQL; * * SELECT * * FROM table1 JOIN get_set_of_records() AS t2(x int, y int) * ON (id = x); * * Note that the function definition does not have column * names and types. Therefore the user needs to explicitly * state them in the query * */ columnType = list_nth_oid(rangeTblFunction->funccoltypes, targetColumnIndex); } /* use the types in the function definition otherwise */ else { /* * Only functions returning simple types end up here. * A sample function: * * CREATE FUNCTION add(integer, integer) RETURNS integer AS * 'SELECT $1 + $2;' * LANGUAGE SQL; * SELECT * FROM table JOIN add(3,5) sum ON ( .. 
) ; * */ FuncExpr *funcExpr = (FuncExpr *) rangeTblFunction->funcexpr; columnType = funcExpr->funcresulttype; } /* Note that the column k is associated with varattno/resno of k+1 */ targetColumn = makeVar(1, targetColumnIndex + 1, columnType, -1, InvalidOid, 0); targetEntry = makeTargetEntry((Expr *) targetColumn, targetColumnIndex + 1, columnName, false); subquery->targetList = lappend(subquery->targetList, targetEntry); targetColumnIndex++; } } /* replace the function with the constructed subquery */ rangeTblEntry->rtekind = RTE_SUBQUERY; rangeTblEntry->subquery = subquery; } /* * ShouldTransformRTE determines whether a given RTE should bne wrapped in a * subquery. * * Not all functions should be wrapped in a subquery for now. As we support more * functions to be used in joins, the constraints here will be relaxed. * */ static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry) { /* * We should wrap only function rtes that are not LATERAL and * without WITH ORDINALITY clause */ if (rangeTableEntry->rtekind != RTE_FUNCTION || rangeTableEntry->lateral || rangeTableEntry->funcordinality) { return false; } return true; } /* * BuildSubPlanResultQuery returns a query of the form: * * SELECT * * FROM * read_intermediate_result('', ') * AS res (); * * The caller can optionally supply a columnAliasList, which is useful for * CTEs that have column aliases. * * If any of the types in the target list cannot be used in the binary copy format, * then the copy format 'text' is used, otherwise 'binary' is used. 
*/ Query * BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resultId) { Oid functionOid = CitusReadIntermediateResultFuncId(); bool useBinaryCopyFormat = CanUseBinaryCopyFormatForTargetList(targetEntryList); Const *resultIdConst = makeNode(Const); resultIdConst->consttype = TEXTOID; resultIdConst->consttypmod = -1; resultIdConst->constlen = -1; resultIdConst->constvalue = CStringGetTextDatum(resultId); resultIdConst->constbyval = false; resultIdConst->constisnull = false; resultIdConst->location = -1; return BuildReadIntermediateResultsQuery(targetEntryList, columnAliasList, resultIdConst, functionOid, useBinaryCopyFormat); } /* * BuildReadIntermediateResultsArrayQuery returns a query of the form: * * SELECT * * FROM * read_intermediate_results(ARRAY['', ...]::text[], ') * AS res (); * * The caller can optionally supply a columnAliasList, which is useful for * CTEs that have column aliases. * * If useBinaryCopyFormat is true, then 'binary' format is used. Otherwise, * 'text' format is used. */ Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList, List *columnAliasList, List *resultIdList, bool useBinaryCopyFormat) { Oid functionOid = CitusReadIntermediateResultArrayFuncId(); Const *resultIdConst = makeNode(Const); resultIdConst->consttype = TEXTARRAYOID; resultIdConst->consttypmod = -1; resultIdConst->constlen = -1; resultIdConst->constvalue = PointerGetDatum(strlist_to_textarray(resultIdList)); resultIdConst->constbyval = false; resultIdConst->constisnull = false; resultIdConst->location = -1; return BuildReadIntermediateResultsQuery(targetEntryList, columnAliasList, resultIdConst, functionOid, useBinaryCopyFormat); } /* * BuildReadIntermediateResultsQuery is the common code for generating * queries to read from result files. It is used by * BuildReadIntermediateResultsArrayQuery and BuildSubPlanResultQuery. 
*/ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList, Const *resultIdConst, Oid functionOid, bool useBinaryCopyFormat) { List *funcColNames = NIL; List *funcColTypes = NIL; List *funcColTypMods = NIL; List *funcColCollations = NIL; ListCell *targetEntryCell = NULL; List *targetList = NIL; int columnNumber = 1; Oid copyFormatId = BinaryCopyFormatId(); int columnAliasCount = list_length(columnAliasList); /* build the target list and column definition list */ foreach(targetEntryCell, targetEntryList) { TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); Node *targetExpr = (Node *) targetEntry->expr; char *columnName = targetEntry->resname; Oid columnType = exprType(targetExpr); Oid columnTypMod = exprTypmod(targetExpr); Oid columnCollation = exprCollation(targetExpr); if (targetEntry->resjunk) { continue; } funcColNames = lappend(funcColNames, makeString(columnName)); funcColTypes = lappend_int(funcColTypes, columnType); funcColTypMods = lappend_int(funcColTypMods, columnTypMod); funcColCollations = lappend_int(funcColCollations, columnCollation); Var *functionColumnVar = makeNode(Var); functionColumnVar->varno = 1; functionColumnVar->varattno = columnNumber; functionColumnVar->vartype = columnType; functionColumnVar->vartypmod = columnTypMod; functionColumnVar->varcollid = columnCollation; functionColumnVar->varlevelsup = 0; functionColumnVar->varnosyn = 1; functionColumnVar->varattnosyn = columnNumber; functionColumnVar->location = -1; TargetEntry *newTargetEntry = makeNode(TargetEntry); newTargetEntry->expr = (Expr *) functionColumnVar; newTargetEntry->resno = columnNumber; /* * Rename the column only if a column alias is defined. * Notice that column alias count could be less than actual * column count. We only use provided aliases and keep the * original column names if no alias is defined. 
*/ if (columnAliasCount >= columnNumber) { String *columnAlias = (String *) list_nth(columnAliasList, columnNumber - 1); Assert(IsA(columnAlias, String)); newTargetEntry->resname = strVal(columnAlias); } else { newTargetEntry->resname = columnName; } newTargetEntry->resjunk = false; targetList = lappend(targetList, newTargetEntry); columnNumber++; } /* build the citus_copy_format parameter for the call to read_intermediate_result */ if (!useBinaryCopyFormat) { copyFormatId = TextCopyFormatId(); } Const *resultFormatConst = makeNode(Const); resultFormatConst->consttype = CitusCopyFormatTypeId(); resultFormatConst->consttypmod = -1; resultFormatConst->constlen = 4; resultFormatConst->constvalue = ObjectIdGetDatum(copyFormatId); resultFormatConst->constbyval = true; resultFormatConst->constisnull = false; resultFormatConst->location = -1; /* build the call to read_intermediate_result */ FuncExpr *funcExpr = makeNode(FuncExpr); funcExpr->funcid = functionOid; funcExpr->funcretset = true; funcExpr->funcvariadic = false; funcExpr->funcformat = 0; funcExpr->funccollid = 0; funcExpr->inputcollid = 0; funcExpr->location = -1; funcExpr->args = list_make2(resultIdConst, resultFormatConst); /* build the RTE for the call to read_intermediate_result */ RangeTblFunction *rangeTableFunction = makeNode(RangeTblFunction); rangeTableFunction->funccolcount = list_length(funcColNames); rangeTableFunction->funccolnames = funcColNames; rangeTableFunction->funccoltypes = funcColTypes; rangeTableFunction->funccoltypmods = funcColTypMods; rangeTableFunction->funccolcollations = funcColCollations; rangeTableFunction->funcparams = NULL; rangeTableFunction->funcexpr = (Node *) funcExpr; Alias *funcAlias = makeNode(Alias); funcAlias->aliasname = "intermediate_result"; funcAlias->colnames = funcColNames; RangeTblEntry *rangeTableEntry = makeNode(RangeTblEntry); rangeTableEntry->rtekind = RTE_FUNCTION; rangeTableEntry->functions = list_make1(rangeTableFunction); rangeTableEntry->inFromCl = true; 
rangeTableEntry->eref = funcAlias; /* build the join tree using the read_intermediate_result RTE */ RangeTblRef *rangeTableRef = makeNode(RangeTblRef); rangeTableRef->rtindex = 1; FromExpr *joinTree = makeNode(FromExpr); joinTree->fromlist = list_make1(rangeTableRef); /* build the SELECT query */ Query *resultQuery = makeNode(Query); resultQuery->commandType = CMD_SELECT; resultQuery->rtable = list_make1(rangeTableEntry); resultQuery->jointree = joinTree; resultQuery->targetList = targetList; return resultQuery; } /* * GenerateResultId generates the result ID that is used to identify an intermediate * result of the subplan with the given plan ID and subplan ID. */ char * GenerateResultId(uint64 planId, uint32 subPlanId) { StringInfo resultId = makeStringInfo(); appendStringInfo(resultId, UINT64_FORMAT "_%u", planId, subPlanId); return resultId->data; } /* * GeneratingSubplans returns true if we are currently in the process of * generating subplans. */ bool GeneratingSubplans(void) { return recursivePlanningDepth > 0; }