Skip to content

Commit

Permalink
[FLINK-20370][table] part2: introduce 'table.exec.sink.keyed-shuffle'…
Browse files Browse the repository at this point in the history
… option to automatically key-by on the sink's primary key if the sink's parallelism differs from its input's, for insert-only input

This closes apache#17939
  • Loading branch information
lincoln-lil authored Dec 8, 2021
1 parent a4299a2 commit 9e76585
Show file tree
Hide file tree
Showing 12 changed files with 715 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@
<td><p>Enum</p></td>
<td>Determines whether string values for columns with CHAR(&lt;precision&gt;)/VARCHAR(&lt;precision&gt;) types will be trimmed or padded (only for CHAR(&lt;precision&gt;)), so that their length will match the one defined by the precision of their respective CHAR/VARCHAR column type.<br /><br />Possible values:<ul><li>"IGNORE": Don't apply any trimming and padding, and instead ignore the CHAR/VARCHAR precision directive.</li><li>"TRIM_PAD": Trim and pad string values to match the length defined by the CHAR/VARCHAR precision.</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sink.keyed-shuffle</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">AUTO</td>
<td><p>Enum</p></td>
<td>In order to minimize the distributed disorder problem when writing data into a table with primary keys, which many users suffer from, Flink will automatically add a keyed shuffle by default when the sink's parallelism differs from the upstream operator and the upstream is append-only. This works only when the upstream ensures the order of multiple records on the primary key; if not, the added shuffle cannot solve the problem (in this situation, a more proper way is to consider a deduplication operation for the source first, or to use an upsert source with a primary key definition which truly reflects the records' evolution).<br />By default, the keyed shuffle will be added when the sink's parallelism differs from the upstream operator. You can set it to no shuffle (NONE) or force shuffle (FORCE).<br /><br />Possible values:<ul><li>"NONE"</li><li>"AUTO"</li><li>"FORCE"</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sink.legacy-cast-behaviour</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">ENABLED</td>
<td><p>Enum</p></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,25 @@ public class ExecutionConfigOptions {
+ "or force materialization(FORCE).")
.build());

/**
 * Controls whether a keyed shuffle on the sink table's primary key is inserted before the
 * sink, to avoid out-of-order writes when the sink's parallelism differs from the upstream
 * operator's. See {@link SinkKeyedShuffle} for the available strategies; defaults to AUTO.
 */
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<SinkKeyedShuffle> TABLE_EXEC_SINK_KEYED_SHUFFLE =
        key("table.exec.sink.keyed-shuffle")
                .enumType(SinkKeyedShuffle.class)
                .defaultValue(SinkKeyedShuffle.AUTO)
                .withDescription(
                        Description.builder()
                                .text(
                                        "In order to minimize the distributed disorder problem when writing data into a table with primary keys, which many users suffer from, "
                                                + "Flink will automatically add a keyed shuffle by default when the sink's parallelism differs from the upstream operator and the upstream is append-only. "
                                                + "This works only when the upstream ensures the order of multiple records on the primary key; if not, the added shuffle cannot solve "
                                                + "the problem (in this situation, a more proper way is to consider a deduplication operation for the source first, or to use an "
                                                + "upsert source with a primary key definition which truly reflects the records' evolution).")
                                .linebreak()
                                .text(
                                        "By default, the keyed shuffle will be added when the sink's parallelism differs from the upstream operator. "
                                                + "You can set it to no shuffle (NONE) or force shuffle (FORCE).")
                                .build());

// ------------------------------------------------------------------------
// Sort Options
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -463,6 +482,20 @@ public enum UpsertMaterialize {
FORCE
}

/**
 * Strategy for shuffling records by the sink table's primary key before the sink operator.
 * Read from the {@code table.exec.sink.keyed-shuffle} option; a keyed shuffle keeps all
 * changes for the same key in a single sink subtask when the sink's parallelism differs
 * from the upstream operator's.
 */
@PublicEvolving
public enum SinkKeyedShuffle {

    /** Never add a keyed shuffle before the sink. */
    NONE,

    /**
     * Add a keyed shuffle only when the sink's parallelism differs from the upstream
     * operator's and the input is append-only.
     */
    AUTO,

    /**
     * Always add a keyed shuffle, except when both the sink and the upstream operator run
     * with parallelism 1 (a single parallelism cannot produce distributed disorder).
     */
    FORCE
}

/** Determine if CAST operates using the legacy behaviour or the new one. */
@Deprecated
public enum LegacyCastBehaviour implements DescribedEnum {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,31 +120,48 @@ protected Transformation<Object> createSinkTransformation(
int rowtimeFieldIndex,
boolean upsertMaterialize) {
final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner.getFlinkContext());
final ChangelogMode changelogMode = tableSink.getChangelogMode(inputChangelogMode);
final ResolvedSchema schema = tableSinkSpec.getCatalogTable().getResolvedSchema();

final SinkRuntimeProvider runtimeProvider =
tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));

final RowType physicalRowType = getPhysicalRowType(schema);

final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, schema);

final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider);
final int inputParallelism = inputTransform.getParallelism();
final boolean inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT);
final boolean hasPk = primaryKeys.length > 0;

if (!inputInsertOnly && sinkParallelism != inputParallelism && !hasPk) {
throw new TableException(
String.format(
"The sink for table '%s' has a configured parallelism of %s, while the input parallelism is %s. "
+ "Since the configured parallelism is different from the input's parallelism and "
+ "the changelog mode is not insert-only, a primary key is required but could not "
+ "be found.",
tableSinkSpec.getObjectIdentifier().asSummaryString(),
sinkParallelism,
inputParallelism));
}

// only add materialization if input has change
final boolean needMaterialization = !inputInsertOnly && upsertMaterialize;

Transformation<RowData> sinkTransform =
applyConstraintValidations(
inputTransform, planner.getTableConfig(), physicalRowType);

sinkTransform =
applyKeyBy(
changelogMode,
sinkTransform,
primaryKeys,
sinkParallelism,
upsertMaterialize);
if (hasPk) {
sinkTransform =
applyKeyBy(
planner.getTableConfig(),
sinkTransform,
primaryKeys,
sinkParallelism,
inputParallelism,
inputInsertOnly,
needMaterialization);
}

if (upsertMaterialize) {
if (needMaterialization) {
sinkTransform =
applyUpsertMaterialize(
sinkTransform,
Expand Down Expand Up @@ -280,26 +297,29 @@ private int deriveSinkParallelism(
* messages.
*/
private Transformation<RowData> applyKeyBy(
ChangelogMode changelogMode,
TableConfig config,
Transformation<RowData> inputTransform,
int[] primaryKeys,
int sinkParallelism,
boolean upsertMaterialize) {
final int inputParallelism = inputTransform.getParallelism();
if ((inputParallelism == sinkParallelism || changelogMode.containsOnly(RowKind.INSERT))
&& !upsertMaterialize) {
return inputTransform;
int inputParallelism,
boolean inputInsertOnly,
boolean needMaterialize) {
final ExecutionConfigOptions.SinkKeyedShuffle sinkShuffleByPk =
config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_KEYED_SHUFFLE);
boolean sinkKeyBy = false;
switch (sinkShuffleByPk) {
case NONE:
break;
case AUTO:
sinkKeyBy = inputInsertOnly && sinkParallelism != inputParallelism;
break;
case FORCE:
// single parallelism has no problem
sinkKeyBy = sinkParallelism != 1 || inputParallelism != 1;
break;
}
if (primaryKeys.length == 0) {
throw new TableException(
String.format(
"The sink for table '%s' has a configured parallelism of %s, while the input parallelism is %s. "
+ "Since the configured parallelism is different from the input's parallelism and "
+ "the changelog mode is not insert-only, a primary key is required but could not "
+ "be found.",
tableSinkSpec.getObjectIdentifier().asSummaryString(),
sinkParallelism,
inputParallelism));
if (!sinkKeyBy && !needMaterialize) {
return inputTransform;
}

final RowDataKeySelector selector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1676,7 +1676,7 @@ public Optional<Integer> getParallelism() {
+ "' yet.");
sinkFunction = new RetractingSinkFunction(tableName, converter);
}
return SinkFunctionProvider.of(sinkFunction);
return SinkFunctionProvider.of(sinkFunction, this.parallelism);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ org.apache.flink.formats.testcsv.TestCsvFormatFactory
org.apache.flink.table.planner.factories.TestValuesTableFactory
org.apache.flink.table.planner.factories.TestFileFactory
org.apache.flink.table.planner.factories.TableFactoryHarness$Factory
org.apache.flink.table.planner.plan.stream.sql.TestTableFactory
Loading

0 comments on commit 9e76585

Please sign in to comment.