
Commit

PR citusdata#6728 / commit 7
Remove unused old metadata sync methods.
aykut-bozkurt committed Mar 30, 2023
1 parent 1fb3de1 commit f8fb20c
Showing 6 changed files with 0 additions and 665 deletions.
62 changes: 0 additions & 62 deletions src/backend/distributed/commands/dependencies.c
@@ -529,68 +529,6 @@ GetAllDependencyCreateDDLCommands(const List *dependencies)
}


/*
* ReplicateAllObjectsToNodeCommandList returns commands to replicate all
* previously marked objects to a worker node. The function also sets
* clusterHasDistributedFunction if there are any distributed functions.
*/
List *
ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort)
{
/* since we are executing ddl commands disable propagation first, primarily for mx */
List *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION);

/*
* collect all dependencies in creation order and get their ddl commands
*/
List *dependencies = GetDistributedObjectAddressList();

/*
* Depending on changes in the environment, such as the enable_metadata_sync GUC,
* there might be objects in the distributed object address list that should
* currently not be propagated by Citus as they are 'not supported'.
*/
dependencies = FilterObjectAddressListByPredicate(dependencies,
&SupportedDependencyByCitus);

/*
* When dependency lists are getting longer we see a delay in the creation time on the
* workers. We would like to inform the user. Currently we warn for lists greater than
* 100 items, where 100 is an arbitrarily chosen number. If we find it too high or too
* low we can adjust this based on experience.
*/
if (list_length(dependencies) > 100)
{
ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", nodeName,
nodePort),
errdetail("There are %d objects to replicate, depending on your "
"environment this might take a while",
list_length(dependencies))));
}

dependencies = OrderObjectAddressListInDependencyOrder(dependencies);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
{
if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL))
{
/*
* we expect extension-owned objects to be created as a result
* of the extension being created.
*/
continue;
}

ddlCommands = list_concat(ddlCommands,
GetDependencyCreateDDLCommands(dependency));
}

ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);

return ddlCommands;
}
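
The deleted function produced a flat list of SQL commands: DDL propagation is switched off, one CREATE command is appended per marked object in dependency order, and propagation is switched back on at the end. A rough sketch of that command list follows; the object names are hypothetical and the exact statements behind DISABLE_DDL_PROPAGATION / ENABLE_DDL_PROPAGATION are assumed rather than quoted from this diff.

SET citus.enable_ddl_propagation TO 'off';
-- one CREATE command per previously marked object, in dependency order, e.g.:
CREATE SCHEMA IF NOT EXISTS app_schema;
CREATE TYPE app_schema.order_status AS ENUM ('new', 'done');
SET citus.enable_ddl_propagation TO 'on';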


/*
* ShouldPropagate determines if we should be propagating anything
*/
295 changes: 0 additions & 295 deletions src/backend/distributed/metadata/metadata_sync.c
@@ -237,81 +237,6 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS)
}


/*
* SyncNodeMetadataToNode is the internal API for
* start_metadata_sync_to_node().
*/
void
SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort)
{
char *escapedNodeName = quote_literal_cstr(nodeNameString);

CheckCitusVersion(ERROR);
EnsureCoordinator();
EnsureModificationsCanRun();

EnsureSequentialModeMetadataOperations();

LockRelationOid(DistNodeRelationId(), ExclusiveLock);

WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("you cannot sync metadata to a non-existent node"),
errhint("First, add the node with SELECT citus_add_node"
"(%s,%d)", escapedNodeName, nodePort)));
}

if (!workerNode->isActive)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("you cannot sync metadata to an inactive node"),
errhint("First, activate the node with "
"SELECT citus_activate_node(%s,%d)",
escapedNodeName, nodePort)));
}

if (NodeIsCoordinator(workerNode))
{
ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains "
"metadata, skipping syncing the metadata",
nodeNameString, nodePort)));
return;
}

UseCoordinatedTransaction();

/*
* One would normally expect to set hasmetadata first and then sync the metadata.
* However, at this point we reverse that order: we first set metadatasynced and
* then hasmetadata, since setting columns for nodes with metadatasynced==false
* could cause errors. (See ErrorIfAnyMetadataNodeOutOfSync)
* We can safely do that because we are in a coordinated transaction and the
* changes are only visible to our own transaction.
* If anything goes wrong, we are going to roll back all the changes.
*/
workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(
true));

if (!NodeIsPrimary(workerNode))
{
/*
* If this is a secondary node we can't actually sync metadata to it; we assume
* the primary node is receiving metadata.
*/
return;
}

/* fail if metadata synchronization doesn't succeed */
bool raiseInterrupts = true;
SyncNodeMetadataSnapshotToNode(workerNode, raiseInterrupts);
}
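
SyncNodeMetadataToNode was the internal implementation behind the start_metadata_sync_to_node() UDF. A hedged usage sketch of the SQL entry points mentioned above, with placeholder node name and port and assuming a Citus coordinator:

-- add the worker first, as the error hint above suggests
SELECT citus_add_node('worker-1', 5432);
-- then trigger the (now removed) sync path through its SQL-facing UDF
SELECT start_metadata_sync_to_node('worker-1', 5432);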


/*
* SyncCitusTableMetadata syncs citus table metadata to worker nodes with metadata.
* Our definition of metadata includes the shell table and its inter relations with
@@ -803,114 +728,6 @@ NodeMetadataCreateCommands(void)
}


/*
* DistributedObjectMetadataSyncCommandList returns the necessary commands to create
* pg_dist_object entries on the new node.
*/
List *
DistributedObjectMetadataSyncCommandList(void)
{
HeapTuple pgDistObjectTup = NULL;
Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock);
Relation pgDistObjectIndexRel = index_open(DistObjectPrimaryKeyIndexId(),
AccessShareLock);
TupleDesc pgDistObjectDesc = RelationGetDescr(pgDistObjectRel);

List *objectAddressList = NIL;
List *distArgumentIndexList = NIL;
List *colocationIdList = NIL;
List *forceDelegationList = NIL;

/* It is not strictly necessary to read the tuples in order.
* However, it is useful to get consistent behavior, both for regression
* tests and also in production systems.
*/
SysScanDesc pgDistObjectScan = systable_beginscan_ordered(pgDistObjectRel,
pgDistObjectIndexRel, NULL,
0, NULL);
while (HeapTupleIsValid(pgDistObjectTup = systable_getnext_ordered(pgDistObjectScan,
ForwardScanDirection)))
{
Form_pg_dist_object pg_dist_object = (Form_pg_dist_object) GETSTRUCT(
pgDistObjectTup);

ObjectAddress *address = palloc(sizeof(ObjectAddress));

ObjectAddressSubSet(*address, pg_dist_object->classid, pg_dist_object->objid,
pg_dist_object->objsubid);

bool distributionArgumentIndexIsNull = false;
Datum distributionArgumentIndexDatum =
heap_getattr(pgDistObjectTup,
Anum_pg_dist_object_distribution_argument_index,
pgDistObjectDesc,
&distributionArgumentIndexIsNull);
int32 distributionArgumentIndex = DatumGetInt32(distributionArgumentIndexDatum);

bool colocationIdIsNull = false;
Datum colocationIdDatum =
heap_getattr(pgDistObjectTup,
Anum_pg_dist_object_colocationid,
pgDistObjectDesc,
&colocationIdIsNull);
int32 colocationId = DatumGetInt32(colocationIdDatum);

bool forceDelegationIsNull = false;
Datum forceDelegationDatum =
heap_getattr(pgDistObjectTup,
Anum_pg_dist_object_force_delegation,
pgDistObjectDesc,
&forceDelegationIsNull);
bool forceDelegation = DatumGetBool(forceDelegationDatum);

objectAddressList = lappend(objectAddressList, address);

if (distributionArgumentIndexIsNull)
{
distArgumentIndexList = lappend_int(distArgumentIndexList,
INVALID_DISTRIBUTION_ARGUMENT_INDEX);
}
else
{
distArgumentIndexList = lappend_int(distArgumentIndexList,
distributionArgumentIndex);
}

if (colocationIdIsNull)
{
colocationIdList = lappend_int(colocationIdList,
INVALID_COLOCATION_ID);
}
else
{
colocationIdList = lappend_int(colocationIdList, colocationId);
}

if (forceDelegationIsNull)
{
forceDelegationList = lappend_int(forceDelegationList, NO_FORCE_PUSHDOWN);
}
else
{
forceDelegationList = lappend_int(forceDelegationList, forceDelegation);
}
}

systable_endscan_ordered(pgDistObjectScan);
index_close(pgDistObjectIndexRel, AccessShareLock);
relation_close(pgDistObjectRel, NoLock);

char *workerMetadataUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
distArgumentIndexList,
colocationIdList,
forceDelegationList);
List *commandList = list_make1(workerMetadataUpdateCommand);

return commandList;
}
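
The deleted function scanned pg_dist_object through its primary-key index so the generated command was deterministic across runs. A rough SQL equivalent of that read, using the column names implied by the Anum_pg_dist_object_* attributes above and assuming the primary key covers (classid, objid, objsubid):

SELECT classid, objid, objsubid,
       distribution_argument_index, colocationid, force_delegation
FROM pg_catalog.pg_dist_object
ORDER BY classid, objid, objsubid;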


/*
* CitusTableMetadataCreateCommandList returns the set of commands necessary to
* create the given distributed table metadata on a worker.
@@ -4081,118 +3898,6 @@ ColocationGroupDeleteCommand(uint32 colocationId)
}


/*
* ColocationGroupCreateCommandList returns the full list of commands for syncing
* pg_dist_colocation.
*/
List *
ColocationGroupCreateCommandList(void)
{
bool hasColocations = false;

StringInfo colocationGroupCreateCommand = makeStringInfo();
appendStringInfo(colocationGroupCreateCommand,
"WITH colocation_group_data (colocationid, shardcount, "
"replicationfactor, distributioncolumntype, "
"distributioncolumncollationname, "
"distributioncolumncollationschema) AS (VALUES ");

Relation pgDistColocation = table_open(DistColocationRelationId(), AccessShareLock);
Relation colocationIdIndexRel = index_open(DistColocationIndexId(), AccessShareLock);

/*
* It is not strictly necessary to read the tuples in order.
* However, it is useful to get consistent behavior, both for regression
* tests and also in production systems.
*/
SysScanDesc scanDescriptor =
systable_beginscan_ordered(pgDistColocation, colocationIdIndexRel,
NULL, 0, NULL);

HeapTuple colocationTuple = systable_getnext_ordered(scanDescriptor,
ForwardScanDirection);

while (HeapTupleIsValid(colocationTuple))
{
if (hasColocations)
{
appendStringInfo(colocationGroupCreateCommand, ", ");
}

hasColocations = true;

Form_pg_dist_colocation colocationForm =
(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);

appendStringInfo(colocationGroupCreateCommand,
"(%d, %d, %d, %s, ",
colocationForm->colocationid,
colocationForm->shardcount,
colocationForm->replicationfactor,
RemoteTypeIdExpression(colocationForm->distributioncolumntype));

/*
* For collations, include the names in the VALUES section and then
* join with pg_collation.
*/
Oid distributionColumCollation = colocationForm->distributioncolumncollation;
if (distributionColumCollation != InvalidOid)
{
Datum collationIdDatum = ObjectIdGetDatum(distributionColumCollation);
HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum);

if (HeapTupleIsValid(collationTuple))
{
Form_pg_collation collationform =
(Form_pg_collation) GETSTRUCT(collationTuple);
char *collationName = NameStr(collationform->collname);
char *collationSchemaName = get_namespace_name(
collationform->collnamespace);

appendStringInfo(colocationGroupCreateCommand,
"%s, %s)",
quote_literal_cstr(collationName),
quote_literal_cstr(collationSchemaName));

ReleaseSysCache(collationTuple);
}
else
{
appendStringInfo(colocationGroupCreateCommand,
"NULL, NULL)");
}
}
else
{
appendStringInfo(colocationGroupCreateCommand,
"NULL, NULL)");
}

colocationTuple = systable_getnext_ordered(scanDescriptor, ForwardScanDirection);
}

systable_endscan_ordered(scanDescriptor);
index_close(colocationIdIndexRel, AccessShareLock);
table_close(pgDistColocation, AccessShareLock);

if (!hasColocations)
{
return NIL;
}

appendStringInfo(colocationGroupCreateCommand,
") SELECT pg_catalog.citus_internal_add_colocation_metadata("
"colocationid, shardcount, replicationfactor, "
"distributioncolumntype, coalesce(c.oid, 0)) "
"FROM colocation_group_data d LEFT JOIN pg_collation c "
"ON (d.distributioncolumncollationname = c.collname "
"AND d.distributioncolumncollationschema::regnamespace"
" = c.collnamespace)");

return list_make1(colocationGroupCreateCommand->data);
}
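
Pieced together from the appendStringInfo calls above, the deleted function emitted a single WITH ... VALUES query feeding citus_internal_add_colocation_metadata. A sketch with one hypothetical colocation group; the literal produced by RemoteTypeIdExpression is assumed here to be a type name cast to regtype:

WITH colocation_group_data (colocationid, shardcount, replicationfactor,
                            distributioncolumntype, distributioncolumncollationname,
                            distributioncolumncollationschema) AS
  (VALUES (1, 32, 1, 'integer'::regtype, NULL, NULL))
SELECT pg_catalog.citus_internal_add_colocation_metadata(
         colocationid, shardcount, replicationfactor,
         distributioncolumntype, coalesce(c.oid, 0))
FROM colocation_group_data d
LEFT JOIN pg_collation c
  ON (d.distributioncolumncollationname = c.collname
      AND d.distributioncolumncollationschema::regnamespace = c.collnamespace);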


/*
* SetMetadataSyncNodesFromNodeList sets list of nodes that needs to be metadata
* synced among given node list into metadataSyncContext.

