Introduce intermediate result broadcasting
In plain words, each distributed plan pulls the necessary intermediate
results to the worker nodes that the plan hits. This is primarily useful
in three ways. 

(i) If the distributed plan that uses intermediate
result(s) is a router query, then the intermediate results are
broadcast to a single node only.

(ii) If a distributed plan consists only of intermediate results, which
is not uncommon, the intermediate results are broadcast to a single
node only.

(iii) If a distributed query hits a subset of the shards on multiple
workers, the intermediate results are broadcast only to the relevant
node(s).

The final item (iii) is crucial for append/range-distributed tables,
where distributed queries typically hit a small subset of
shards/workers.

To do this, for each query for which Citus creates a distributed plan, we
keep track of the subPlans used in the queryTree and save them in the
distributed plan. Just before executing the subPlans, Citus records every
worker node that the distributed plan hits, and marks each subPlan to be
broadcast to those nodes. Citus then applies this operation recursively to
every subPlan that is itself a distributed plan, since those plans may
access different subPlans, which have to be recorded as well.
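
As a condensed, non-verbatim sketch (based on ExecuteSubPlans in the diff
below, with the DestReceiver setup and the actual execution elided), the
new flow looks roughly like this:

#include "postgres.h"

#include "nodes/pg_list.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/intermediate_results.h"
#include "distributed/multi_physical_planner.h"

/* simplified sketch; see ExecuteSubPlans in the diff for the real code */
static void
BroadcastSubPlanResults(DistributedPlan *distributedPlan)
{
	/* maps each intermediate result id to the worker nodes that read it */
	HTAB *intermediateResultsHash = MakeIntermediateResultHTAB();
	ListCell *subPlanCell = NULL;

	/*
	 * Recursively walk the distributed plan and its subPlans, recording
	 * every worker node that each intermediate result must reach.
	 */
	RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan);

	foreach(subPlanCell, distributedPlan->subPlanList)
	{
		DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
		char *resultId = GenerateResultId(distributedPlan->planId,
										  subPlan->subPlanId);

		/* broadcast the result only to the nodes that actually use it */
		List *workerNodeList =
			FindAllWorkerNodesUsingSubplan(intermediateResultsHash, resultId);

		/* ... execute the subPlan into a receiver over workerNodeList ... */
		(void) workerNodeList;
	}
}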
hanefi authored Nov 20, 2019
1 parent b7fef5c commit d82f3e9
Showing 24 changed files with 2,046 additions and 85 deletions.
86 changes: 86 additions & 0 deletions src/backend/distributed/executor/citus_custom_scan.c
@@ -398,3 +398,89 @@ ScanStateGetExecutorState(CitusScanState *scanState)
{
return scanState->customScanState.ss.ps.state;
}


/*
* FetchCitusCustomScanIfExists traverses the given plan tree and returns a
* Citus CustomScan if the tree contains one, and NULL otherwise.
*/
CustomScan *
FetchCitusCustomScanIfExists(Plan *plan)
{
CustomScan *customScan = NULL;

if (plan == NULL)
{
return NULL;
}

if (IsCitusCustomScan(plan))
{
return (CustomScan *) plan;
}

customScan = FetchCitusCustomScanIfExists(plan->lefttree);

if (customScan == NULL)
{
customScan = FetchCitusCustomScanIfExists(plan->righttree);
}

return customScan;
}


/*
* IsCitusPlan returns whether a Plan contains a CustomScan generated by Citus
* by recursively walking through the plan tree.
*/
bool
IsCitusPlan(Plan *plan)
{
if (plan == NULL)
{
return false;
}

if (IsCitusCustomScan(plan))
{
return true;
}

return IsCitusPlan(plan->lefttree) || IsCitusPlan(plan->righttree);
}


/*
* IsCitusCustomScan returns whether Plan node is a CustomScan generated by Citus.
*/
bool
IsCitusCustomScan(Plan *plan)
{
CustomScan *customScan = NULL;
Node *privateNode = NULL;

if (plan == NULL)
{
return false;
}

if (!IsA(plan, CustomScan))
{
return false;
}

customScan = (CustomScan *) plan;
if (list_length(customScan->custom_private) == 0)
{
return false;
}

privateNode = (Node *) linitial(customScan->custom_private);
if (!CitusIsA(privateNode, DistributedPlan))
{
return false;
}

return true;
}
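
For context, a hypothetical call site for these now-exported helpers could
look like the following (the name InspectPlannedStmt is illustrative only;
the real callers live in the executor):

/* Hypothetical caller: check whether a planned statement contains a
 * Citus CustomScan and, if so, fetch its DistributedPlan. */
static void
InspectPlannedStmt(PlannedStmt *plannedStmt)
{
	if (IsCitusPlan(plannedStmt->planTree))
	{
		CustomScan *citusScan =
			FetchCitusCustomScanIfExists(plannedStmt->planTree);
		Node *privateNode = (Node *) linitial(citusScan->custom_private);
		DistributedPlan *distributedPlan = (DistributedPlan *) privateNode;

		elog(DEBUG1, "found Citus plan " UINT64_FORMAT,
			 distributedPlan->planId);
	}
}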
68 changes: 0 additions & 68 deletions src/backend/distributed/executor/multi_executor.c
@@ -58,8 +58,6 @@ bool SortReturning = false;


/* local function forward declarations */
static bool IsCitusPlan(Plan *plan);
static bool IsCitusCustomScan(Plan *plan);
static Relation StubRelation(TupleDesc tupleDescriptor);
static bool AlterTableConstraintCheck(QueryDesc *queryDesc);
static bool IsLocalReferenceTableJoinPlan(PlannedStmt *plan);
@@ -189,72 +187,6 @@ CitusExecutorRun(QueryDesc *queryDesc,
}


/*
* IsCitusPlan returns whether a Plan contains a CustomScan generated by Citus
* by recursively walking through the plan tree.
*/
static bool
IsCitusPlan(Plan *plan)
{
if (plan == NULL)
{
return false;
}

if (IsCitusCustomScan(plan))
{
return true;
}

if (plan->lefttree != NULL && IsCitusPlan(plan->lefttree))
{
return true;
}

if (plan->righttree != NULL && IsCitusPlan(plan->righttree))
{
return true;
}

return false;
}


/*
* IsCitusCustomScan returns whether Plan node is a CustomScan generated by Citus.
*/
static bool
IsCitusCustomScan(Plan *plan)
{
CustomScan *customScan = NULL;
Node *privateNode = NULL;

if (plan == NULL)
{
return false;
}

if (!IsA(plan, CustomScan))
{
return false;
}

customScan = (CustomScan *) plan;
if (list_length(customScan->custom_private) == 0)
{
return false;
}

privateNode = (Node *) linitial(customScan->custom_private);
if (!CitusIsA(privateNode, DistributedPlan))
{
return false;
}

return true;
}


/*
* ReturnTupleFromTuplestore reads the next tuple from the tuple store of the
* given Citus scan node and returns it. It returns null if all tuples are read
47 changes: 35 additions & 12 deletions src/backend/distributed/executor/subplan_execution.c
@@ -10,6 +10,7 @@

#include "postgres.h"

#include "distributed/intermediate_result_pruning.h"
#include "distributed/intermediate_results.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
@@ -35,19 +36,18 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
uint64 planId = distributedPlan->planId;
List *subPlanList = distributedPlan->subPlanList;
ListCell *subPlanCell = NULL;
List *nodeList = NIL;

/* If you're not a worker node, you should write a local file to make sure
* you have the data too */
bool writeLocalFile = GetLocalGroupId() == 0;

HTAB *intermediateResultsHash = NULL;

if (subPlanList == NIL)
{
/* no subplans to execute */
return;
}

intermediateResultsHash = MakeIntermediateResultHTAB();
RecordSubplanExecutionsOnNodes(intermediateResultsHash, distributedPlan);


/*
* Make sure that this transaction has a distributed transaction ID.
*
@@ -56,8 +56,6 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
*/
BeginOrContinueCoordinatedTransaction();

nodeList = ActiveReadableWorkerNodeList();

foreach(subPlanCell, subPlanList)
{
DistributedSubPlan *subPlan = (DistributedSubPlan *) lfirst(subPlanCell);
@@ -66,14 +64,39 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
DestReceiver *copyDest = NULL;
ParamListInfo params = NULL;
EState *estate = NULL;

bool writeLocalFile = false;
char *resultId = GenerateResultId(planId, subPlanId);
List *workerNodeList =
FindAllWorkerNodesUsingSubplan(intermediateResultsHash, resultId);

/*
* Write intermediate results to a local file only if no worker
* node receives them.
*
* This could happen in two cases:
* (a) a subquery in the HAVING clause
* (b) the intermediate result is not used at all, e.g., the
* RETURNING of a modifying CTE is never referenced
*
* For SELECT, Postgres/Citus is clever enough not to execute the CTE
* if it is not used at all, but for modifications we have to execute
* the queries.
*/
if (workerNodeList == NIL)
{
writeLocalFile = true;

if ((LogIntermediateResults && IsLoggableLevel(DEBUG1)) ||
IsLoggableLevel(DEBUG4))
{
elog(DEBUG1, "Subplan %s will be written to local file", resultId);
}
}

SubPlanLevel++;
estate = CreateExecutorState();
copyDest = (DestReceiver *) CreateRemoteFileDestReceiver(resultId, estate,
nodeList,
writeLocalFile);
copyDest = CreateRemoteFileDestReceiver(resultId, estate, workerNodeList,
writeLocalFile);

ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);

58 changes: 57 additions & 1 deletion src/backend/distributed/planner/distributed_planner.c
@@ -20,11 +20,13 @@
#include "distributed/citus_nodes.h"
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/intermediate_results.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/distributed_planner.h"
#include "distributed/query_pushdown_planning.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
@@ -73,6 +75,8 @@ static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQue
bool hasUnresolvedParams,
PlannerRestrictionContext *
plannerRestrictionContext);
static void FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery);
static void RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery);
static DeferredErrorMessage * DeferErrorIfPartitionTableNotSingleReplicated(Oid
relationId);

@@ -617,6 +621,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
List *subPlanList = NIL;
bool hasCtes = originalQuery->cteList != NIL;


if (IsModifyCommand(originalQuery))
{
Oid targetRelationId = InvalidOid;
@@ -652,6 +657,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi

if (distributedPlan->planningError == NULL)
{
FinalizeDistributedPlan(distributedPlan, originalQuery);

return distributedPlan;
}
else
Expand All @@ -672,7 +679,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
plannerRestrictionContext);
if (distributedPlan->planningError == NULL)
{
/* successfully created a router plan */
FinalizeDistributedPlan(distributedPlan, originalQuery);

return distributedPlan;
}
else
@@ -765,6 +773,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
plannerRestrictionContext);
distributedPlan->subPlanList = subPlanList;

FinalizeDistributedPlan(distributedPlan, originalQuery);

return distributedPlan;
}

@@ -775,6 +785,8 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
*/
if (IsModifyCommand(originalQuery))
{
FinalizeDistributedPlan(distributedPlan, originalQuery);

return distributedPlan;
}

@@ -806,10 +818,54 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
/* distributed plan currently should always succeed or error out */
Assert(distributedPlan && distributedPlan->planningError == NULL);

FinalizeDistributedPlan(distributedPlan, originalQuery);

return distributedPlan;
}


/*
* FinalizeDistributedPlan is the final step of distributed planning. The
* function currently only implements an optimization for intermediate
* result pruning.
*/
static void
FinalizeDistributedPlan(DistributedPlan *plan, Query *originalQuery)
{
RecordSubPlansUsedInPlan(plan, originalQuery);
}


/*
* RecordSubPlansUsedInPlan gets a distributed plan and a queryTree, and
* updates the usedSubPlanNodeList of the distributed plan.
*
* The function simply pulls all the subPlans that are used in the queryTree
* with one exception: subPlans in the HAVING clause. The reason is explained
* in the function.
*/
static void
RecordSubPlansUsedInPlan(DistributedPlan *plan, Query *originalQuery)
{
/* first, get all the subplans in the query */
plan->usedSubPlanNodeList = FindSubPlansUsedInNode((Node *) originalQuery);

/*
* Next, remove the subplans used in the HAVING clause, because they
* are only required on the coordinator. Including them in the
* usedSubPlanNodeList would prevent their intermediate results from
* being sent only to the coordinator.
*/
if (originalQuery->hasSubLinks &&
FindNodeCheck(originalQuery->havingQual, IsNodeSubquery))
{
List *subplansInHaving = FindSubPlansUsedInNode(originalQuery->havingQual);

plan->usedSubPlanNodeList =
list_difference(plan->usedSubPlanNodeList, subplansInHaving);
}
}


/*
* EnsurePartitionTableNotReplicated errors out if the input relation is
* a partition table and the table has a replication factor greater than