diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java index 3f6463a07aed5..9cbda50efc5b3 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java @@ -42,6 +42,7 @@ public RichCombineToGroupCombineWrapper(F wrappedFunction) { @Override public void open(Configuration config) throws Exception { + wrappedFunction.setRuntimeContext(getRuntimeContext()); wrappedFunction.open(config); } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala index 330386b1e93e0..4dbaeea46bab0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala @@ -70,7 +70,7 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction) LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + Thread.currentThread().getContextClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala index bc0c16385a04b..ced14504e8e37 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala @@ -46,7 +46,7 @@ class DataSetAggFunction( LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala index 3b3be708c3c9e..f2eb3d943dbfc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala @@ -47,7 +47,7 @@ class DataSetFinalAggFunction( LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala index fc3366bd31691..744a739a394d2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala @@ -48,7 +48,7 @@ class DataSetPreAggFunction(genAggregations: GeneratedAggregationsFunction) LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala index 372fc0d27c11e..0d54de62335a7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala @@ -72,7 +72,7 @@ class DataSetSessionWindowAggReduceGroupFunction( LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala index 666bfee19348e..35e814264a7f0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala @@ -59,7 +59,7 @@ class DataSetSessionWindowAggregatePreProcessor( LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala index 3af7969a37c06..f2987a74b5e39 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala @@ -68,7 +68,7 @@ class DataSetSlideTimeWindowAggReduceGroupFunction( LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala index 2da838fe8abf7..6a9d63122746e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala @@ -67,7 +67,7 @@ class DataSetSlideWindowAggReduceCombineFunction( LOG.debug(s"Compiling AggregateHelper: $genPreAggregations.name \n\n " + s"Code:\n$genPreAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genPreAggregations.name, genPreAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala index 474a09b5d45a4..f96e8416bf147 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala @@ -63,7 +63,7 @@ class DataSetSlideWindowAggReduceGroupFunction( LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala index 22fe389a5b250..f4d347a9ad7d0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala @@ -50,7 +50,7 @@ class DataSetTumbleCountWindowAggReduceGroupFunction( LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala index 9eeab33c45b1d..a3a72ae47677b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala @@ -66,7 +66,7 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction( LOG.debug(s"Compiling AggregateHelper: $genPreAggregations.name \n\n " + s"Code:\n$genPreAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genPreAggregations.name, genPreAggregations.code) LOG.debug("Instantiating AggregateHelper.") diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala index 4e9214850a485..14e89adca33da 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala @@ -62,7 +62,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + s"Code:\n$genAggregations.code") val clazz = compile( - getClass.getClassLoader, + getRuntimeContext.getUserCodeClassLoader, genAggregations.name, genAggregations.code) LOG.debug("Instantiating AggregateHelper.")