Skip to content

Commit

Permalink
[FLINK-21738][table] Caching built-in functions for CoreModule and Hi…
Browse files Browse the repository at this point in the history
…veModule

This closes apache#15267
  • Loading branch information
zoucao authored and wuchong committed Mar 28, 2021
1 parent 4782318 commit 912fc8d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class HiveModule implements Module {
private final HiveFunctionDefinitionFactory factory;
private final String hiveVersion;
private final HiveShim hiveShim;
private Set<String> functionNames;

public HiveModule() {
this(HiveShimLoader.getHiveVersion());
Expand All @@ -86,13 +87,17 @@ public HiveModule(String hiveVersion) {
this.hiveVersion = hiveVersion;
this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
this.factory = new HiveFunctionDefinitionFactory(hiveShim);
this.functionNames = new HashSet<>();
}

@Override
public Set<String> listFunctions() {
Set<String> builtInFuncs = hiveShim.listBuiltInFunctions();
builtInFuncs.removeAll(BUILT_IN_FUNC_BLACKLIST);
return builtInFuncs;
// lazy initialize
if (functionNames.isEmpty()) {
functionNames = hiveShim.listBuiltInFunctions();
functionNames.removeAll(BUILT_IN_FUNC_BLACKLIST);
}
return functionNames;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,15 @@ public Set<String> listFunctions() {
* @return an optional of {@link FunctionDefinition}
*/
public Optional<FunctionDefinition> getFunctionDefinition(String name) {
Optional<String> module =
usedModules.stream()
.filter(
n ->
loadedModules.get(n).listFunctions().stream()
.anyMatch(e -> e.equalsIgnoreCase(name)))
.findFirst();
if (module.isPresent()) {
LOG.debug("Got FunctionDefinition '{}' from '{}' module.", name, module.get());
return loadedModules.get(module.get()).getFunctionDefinition(name);
for (String moduleName : usedModules) {
if (loadedModules.get(moduleName).listFunctions().stream()
.anyMatch(name::equalsIgnoreCase)) {
LOG.debug("Got FunctionDefinition '{}' from '{}' module.", name, moduleName);
return loadedModules.get(moduleName).getFunctionDefinition(name);
}
}
LOG.debug("Cannot find FunctionDefinition '{}' from any loaded modules.", name);

LOG.debug("Cannot find FunctionDefinition '{}' from any loaded modules.", name);
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

package org.apache.flink.table.module;

import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
Expand All @@ -29,19 +32,29 @@
/** Module of default core metadata in Flink. */
public class CoreModule implements Module {
public static final CoreModule INSTANCE = new CoreModule();
private final List<BuiltInFunctionDefinition> functionDefinitions;
private Set<String> functionNames;

private CoreModule() {}
private CoreModule() {
this.functionDefinitions = BuiltInFunctionDefinitions.getDefinitions();
this.functionNames = new HashSet<>();
}

@Override
public Set<String> listFunctions() {
return BuiltInFunctionDefinitions.getDefinitions().stream()
.map(f -> f.getName())
.collect(Collectors.toSet());
// lazy initialize
if (functionNames.isEmpty()) {
functionNames =
functionDefinitions.stream()
.map(BuiltInFunctionDefinition::getName)
.collect(Collectors.toSet());
}
return functionNames;
}

@Override
public Optional<FunctionDefinition> getFunctionDefinition(String name) {
return BuiltInFunctionDefinitions.getDefinitions().stream()
return functionDefinitions.stream()
.filter(f -> f.getName().equalsIgnoreCase(name))
.findFirst()
.map(Function.identity());
Expand Down

0 comments on commit 912fc8d

Please sign in to comment.