Skip to content

Commit

Permalink
[fixup][table-planner] Refactoring CodeGeneratorContext by add user C…
Browse files Browse the repository at this point in the history
…lassLoader
  • Loading branch information
lsyldliu authored and wuchong committed Jun 8, 2022
1 parent 6268c54 commit 0fe20bc
Show file tree
Hide file tree
Showing 121 changed files with 723 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ protected OneInputStreamOperatorTestHarness getTestHarness(Configuration config)
new KeyedOneInputStreamOperatorTestHarness(
operator,
KeySelectorUtil.getRowDataSelector(
getGrouping(), InternalTypeInfo.of(getInputType())),
Thread.currentThread().getContextClassLoader(),
getGrouping(),
InternalTypeInfo.of(getInputType())),
InternalTypeInfo.of(getKeyType()),
1,
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,9 @@ private Projection<RowData, BinaryRowData> createProjection(String name, int[] f
.collect(Collectors.toList()));
final GeneratedProjection generatedProjection =
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
name,
inputType,
forwardedFieldType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,25 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
udfInputType,
udfOutputType,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdafInputProjection",
inputType,
udfInputType,
udafInputOffsets),
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"GroupKey",
inputType,
(RowType) Projection.of(groupingSet).project(inputType),
groupingSet),
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"GroupSet",
inputType,
(RowType) Projection.of(groupingSet).project(inputType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,19 +344,25 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
5000L,
new int[] {0, 1},
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdafInputProjection",
inputRowType,
udfInputType,
udafInputOffsets),
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"GroupKey",
inputRowType,
(RowType) Projection.of(groupingSet).project(inputRowType),
groupingSet),
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"GroupSet",
inputRowType,
(RowType) Projection.of(groupingSet).project(inputRowType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,25 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
3,
true,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdafInputProjection",
inputRowType,
udfInputType,
udafInputOffsets),
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"GroupKey",
inputRowType,
(RowType) Projection.of(groupingSet).project(inputRowType),
groupingSet),
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"GroupSet",
inputRowType,
(RowType) Projection.of(groupingSet).project(inputRowType),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ public OneInputStreamOperatorTestHarness<RowData, RowData> getTestHarness(Config

int[] grouping = new int[] {0};
RowDataKeySelector keySelector =
KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(getInputType()));
KeySelectorUtil.getRowDataSelector(
Thread.currentThread().getContextClassLoader(),
grouping,
InternalTypeInfo.of(getInputType()));
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(
operator, keySelector, keySelector.getProducedType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,9 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
},
UTC_ZONE_ID,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdafInputProjection",
inputType,
udfInputType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
-1,
100L,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdafInputProjection",
inputType,
udfInputType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
3,
1,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdafInputProjection",
inputType,
udfInputType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
3,
3L,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdafInputProjection",
inputType,
udfInputType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ public AbstractArrowPythonAggregateFunctionOperator getTestOperator(
3,
1,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdafInputProjection",
inputType,
udfInputType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,17 @@ public AbstractPythonScalarFunctionOperator getTestOperator(
udfInputType,
udfOutputType,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdfInputProjection",
inputType,
udfInputType,
udfInputOffsets),
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"ForwardedFieldProjection",
inputType,
forwardedFieldType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,17 @@ public ArrowPythonScalarFunctionOperator getTestOperator(
udfInputType,
udfOutputType,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdfInputProjection",
inputType,
udfInputType,
udfInputOffsets),
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"ForwardedFieldProjection",
inputType,
forwardedFieldType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ public PythonTableFunctionOperator getTestOperator(
udfOutputType,
joinRelType,
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(new Configuration()),
new CodeGeneratorContext(
new Configuration(),
Thread.currentThread().getContextClassLoader()),
"UdtfInputProjection",
inputType,
udfInputType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ public FrameworkConfig createFrameworkConfig() {
.sqlToRelConverterConfig(getSqlToRelConverterConfig())
.operatorTable(getSqlOperatorTable(getCalciteConfig()))
// set the executor to evaluate constant expressions
.executor(new ExpressionReducer(context.getTableConfig(), false))
.executor(
new ExpressionReducer(
context.getTableConfig(), context.getClassLoader(), false))
.context(context)
.traitDefs(traitDefs)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void apply(DynamicTableSource tableSource, SourceAbilityContext context)
GeneratedWatermarkGenerator generatedWatermarkGenerator =
WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
context.getTableConfig(),
context.getClassLoader(),
context.getSourceRowType(),
watermarkExpr,
Option.apply("context"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected Transformation<RowData> translateToPlanInternal(
final Transformation<?> sourceTransform = dataStream.getTransformation();
if (needInternalConversion()) {
return ScanUtil.convertToInternalRow(
new CodeGeneratorContext(config),
new CodeGeneratorContext(config, planner.getFlinkContext().getClassLoader()),
(Transformation<Object>) sourceTransform,
fieldIndexes,
sourceType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ protected Transformation<RowData> translateToPlanInternal(
case HASH:
partitioner =
createHashPartitioner(
((HashDistribution) requiredDistribution), inputType, config);
((HashDistribution) requiredDistribution),
inputType,
config,
planner.getFlinkContext().getClassLoader());
parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
break;
case KEEP_INPUT_AS_IS:
Expand All @@ -181,7 +184,8 @@ protected Transformation<RowData> translateToPlanInternal(
createHashPartitioner(
((HashDistribution) inputDistribution),
inputType,
config));
config,
planner.getFlinkContext().getClassLoader()));
}
parallelism = inputTransform.getParallelism();
break;
Expand All @@ -201,15 +205,21 @@ protected Transformation<RowData> translateToPlanInternal(
}

private BinaryHashPartitioner createHashPartitioner(
HashDistribution hashDistribution, RowType inputType, ExecNodeConfig config) {
HashDistribution hashDistribution,
RowType inputType,
ExecNodeConfig config,
ClassLoader classLoader) {
int[] keys = hashDistribution.getKeys();
String[] fieldNames =
Arrays.stream(keys)
.mapToObj(i -> inputType.getFieldNames().get(i))
.toArray(String[]::new);
return new BinaryHashPartitioner(
HashCodeGenerator.generateRowHash(
new CodeGeneratorContext(config), inputType, "HashPartitioner", keys),
new CodeGeneratorContext(config, classLoader),
inputType,
"HashPartitioner",
keys),
fieldNames);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ protected Transformation<RowData> translateToPlanInternal(
final RowType inputRowType = (RowType) inputEdge.getOutputType();
final RowType outputRowType = (RowType) getOutputType();

final CodeGeneratorContext ctx = new CodeGeneratorContext(config);
final CodeGeneratorContext ctx =
new CodeGeneratorContext(config, planner.getFlinkContext().getClassLoader());

final AggregateInfoList aggInfos =
AggregateUtil.transformToBatchAggregateInfoList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,25 @@ protected Transformation<RowData> translateToPlanInternal(

GeneratedJoinCondition condFunc =
JoinUtil.generateConditionFunction(
config, joinSpec.getNonEquiCondition().orElse(null), leftType, rightType);
config,
planner.getFlinkContext().getClassLoader(),
joinSpec.getNonEquiCondition().orElse(null),
leftType,
rightType);

// projection for equals
GeneratedProjection leftProj =
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(config),
new CodeGeneratorContext(
config, planner.getFlinkContext().getClassLoader()),
"HashJoinLeftProjection",
leftType,
keyType,
leftKeys);
GeneratedProjection rightProj =
ProjectionCodeGenerator.generateProjection(
new CodeGeneratorContext(config),
new CodeGeneratorContext(
config, planner.getFlinkContext().getClassLoader()),
"HashJoinRightProjection",
rightType,
keyType,
Expand Down Expand Up @@ -187,6 +193,7 @@ protected Transformation<RowData> translateToPlanInternal(
operator =
LongHashJoinGenerator.gen(
config,
planner.getFlinkContext().getClassLoader(),
hashJoinType,
keyType,
buildType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ protected Transformation<RowData> translateToPlanInternal(
final RowType inputRowType = (RowType) inputEdge.getOutputType();
final HashWindowCodeGenerator hashWindowCodeGenerator =
new HashWindowCodeGenerator(
new CodeGeneratorContext(config),
new CodeGeneratorContext(
config, planner.getFlinkContext().getClassLoader()),
planner.createRelBuilder(),
window,
inputTimeFieldIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ protected Transformation<RowData> translateToPlanInternal(
protected Transformation<RowData> createConversionTransformationIfNeeded(
StreamExecutionEnvironment streamExecEnv,
ExecNodeConfig config,
ClassLoader classLoader,
Transformation<?> sourceTransform,
@Nullable RexNode rowtimeExpression) {
final int[] fieldIndexes = computeIndexMapping(false);
Expand All @@ -99,7 +100,7 @@ protected Transformation<RowData> createConversionTransformationIfNeeded(
TableSourceUtil.fixPrecisionForProducedDataType(
tableSource, (RowType) getOutputType());
return ScanUtil.convertToInternalRow(
new CodeGeneratorContext(config),
new CodeGeneratorContext(config, classLoader),
(Transformation<Object>) sourceTransform,
fieldIndexes,
fixedProducedDataType,
Expand Down
Loading

0 comments on commit 0fe20bc

Please sign in to comment.