Skip to content

Commit

Permalink
[FLINK-20805][table][build] Apply spotless formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Dec 30, 2020
1 parent 2b5e6f5 commit 481ccff
Show file tree
Hide file tree
Showing 35 changed files with 552 additions and 612 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
*/
public interface AggsHandleFunction extends AggsHandleFunctionBase {

/**
* Gets the result of the aggregation from the current accumulators.
* @return the final result (saved in a row) of the current accumulators.
*/
RowData getValue() throws Exception;
/**
* Gets the result of the aggregation from the current accumulators.
*
* @return the final result (saved in a row) of the current accumulators.
*/
RowData getValue() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,74 +27,71 @@
/**
* The base class for handling aggregate or table aggregate functions.
*
* <p>It is code generated to handle all {@link AggregateFunction}s and
* {@link TableAggregateFunction}s together in an aggregation.
* <p>It is code generated to handle all {@link AggregateFunction}s and {@link
* TableAggregateFunction}s together in an aggregation.
*
* <p>It is the entry point for aggregate operators to operate all {@link AggregateFunction}s and
* {@link TableAggregateFunction}s.
*/
public interface AggsHandleFunctionBase extends Function {

/**
* Initialization method for the function. It is called before the actual working methods.
*/
void open(StateDataViewStore store) throws Exception;
/** Initialization method for the function. It is called before the actual working methods. */
void open(StateDataViewStore store) throws Exception;

/**
* Accumulates the input values to the accumulators.
* @param input input values bundled in a row
*/
void accumulate(RowData input) throws Exception;
/**
* Accumulates the input values to the accumulators.
*
* @param input input values bundled in a row
*/
void accumulate(RowData input) throws Exception;

/**
* Retracts the input values from the accumulators.
* @param input input values bundled in a row
*/
void retract(RowData input) throws Exception;
/**
* Retracts the input values from the accumulators.
*
* @param input input values bundled in a row
*/
void retract(RowData input) throws Exception;

/**
* Merges the other accumulators into current accumulators.
*
* @param accumulators The other row of accumulators
*/
void merge(RowData accumulators) throws Exception;
/**
* Merges the other accumulators into current accumulators.
*
* @param accumulators The other row of accumulators
*/
void merge(RowData accumulators) throws Exception;

/**
* Set the current accumulators (saved in a row) which contains the current aggregated results.
* In streaming: accumulators are store in the state, we need to restore aggregate buffers from state.
* In batch: accumulators are store in the hashMap, we need to restore aggregate buffers from hashMap.
*
* @param accumulators current accumulators
*/
void setAccumulators(RowData accumulators) throws Exception;
/**
* Set the current accumulators (saved in a row) which contains the current aggregated results.
* In streaming: accumulators are store in the state, we need to restore aggregate buffers from
* state. In batch: accumulators are store in the hashMap, we need to restore aggregate buffers
* from hashMap.
*
* @param accumulators current accumulators
*/
void setAccumulators(RowData accumulators) throws Exception;

/**
* Resets all the accumulators.
*/
void resetAccumulators() throws Exception;
/** Resets all the accumulators. */
void resetAccumulators() throws Exception;

/**
* Gets the current accumulators (saved in a row) which contains the current
* aggregated results.
* @return the current accumulators
*/
RowData getAccumulators() throws Exception;
/**
* Gets the current accumulators (saved in a row) which contains the current aggregated results.
*
* @return the current accumulators
*/
RowData getAccumulators() throws Exception;

/**
* Initializes the accumulators and save them to a accumulators row.
*
* @return a row of accumulators which contains the aggregated results
*/
RowData createAccumulators() throws Exception;
/**
* Initializes the accumulators and save them to a accumulators row.
*
* @return a row of accumulators which contains the aggregated results
*/
RowData createAccumulators() throws Exception;

/**
* Cleanup for the retired accumulators state.
*/
void cleanup() throws Exception;
/** Cleanup for the retired accumulators state. */
void cleanup() throws Exception;

/**
* Tear-down method for this function. It can be used for clean up work.
* By default, this method does nothing.
*/
void close() throws Exception;
/**
* Tear-down method for this function. It can be used for clean up work. By default, this method
* does nothing.
*/
void close() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,75 +30,81 @@

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Utilities to compile a generated code to a Class.
*/
/** Utilities to compile a generated code to a Class. */
public final class CompileUtils {

// used for logging the generated codes to a same place
private static final Logger CODE_LOG = LoggerFactory.getLogger(CompileUtils.class);
// used for logging the generated codes to a same place
private static final Logger CODE_LOG = LoggerFactory.getLogger(CompileUtils.class);

/**
* Cache of compile, Janino generates a new Class Loader and a new Class file every compile
* (guaranteeing that the class name will not be repeated). This leads to multiple tasks of
* the same process that generate a large number of duplicate class, resulting in a large
* number of Meta zone GC (class unloading), resulting in performance bottlenecks. So we add
* a cache to avoid this problem.
*/
protected static final Cache<String, Cache<ClassLoader, Class>> COMPILED_CACHE = CacheBuilder
.newBuilder()
.maximumSize(100) // estimated cache size
.build();
/**
* Cache of compile, Janino generates a new Class Loader and a new Class file every compile
* (guaranteeing that the class name will not be repeated). This leads to multiple tasks of the
* same process that generate a large number of duplicate class, resulting in a large number of
* Meta zone GC (class unloading), resulting in performance bottlenecks. So we add a cache to
* avoid this problem.
*/
protected static final Cache<String, Cache<ClassLoader, Class>> COMPILED_CACHE =
CacheBuilder.newBuilder()
.maximumSize(100) // estimated cache size
.build();

/**
* Compiles a generated code to a Class.
* @param cl the ClassLoader used to load the class
* @param name the class name
* @param code the generated code
* @param <T> the class type
* @return the compiled class
*/
@SuppressWarnings("unchecked")
public static <T> Class<T> compile(ClassLoader cl, String name, String code) {
try {
Cache<ClassLoader, Class> compiledClasses = COMPILED_CACHE.get(name,
() -> CacheBuilder.newBuilder().maximumSize(5).weakKeys().softValues().build());
return compiledClasses.get(cl, () -> doCompile(cl, name, code));
} catch (Exception e) {
throw new FlinkRuntimeException(e.getMessage(), e);
}
}
/**
* Compiles a generated code to a Class.
*
* @param cl the ClassLoader used to load the class
* @param name the class name
* @param code the generated code
* @param <T> the class type
* @return the compiled class
*/
@SuppressWarnings("unchecked")
public static <T> Class<T> compile(ClassLoader cl, String name, String code) {
try {
Cache<ClassLoader, Class> compiledClasses =
COMPILED_CACHE.get(
name,
() ->
CacheBuilder.newBuilder()
.maximumSize(5)
.weakKeys()
.softValues()
.build());
return compiledClasses.get(cl, () -> doCompile(cl, name, code));
} catch (Exception e) {
throw new FlinkRuntimeException(e.getMessage(), e);
}
}

private static <T> Class<T> doCompile(ClassLoader cl, String name, String code) {
checkNotNull(cl, "Classloader must not be null.");
CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
SimpleCompiler compiler = new SimpleCompiler();
compiler.setParentClassLoader(cl);
try {
compiler.cook(code);
} catch (Throwable t) {
System.out.println(addLineNumber(code));
throw new InvalidProgramException(
"Table program cannot be compiled. This is a bug. Please file an issue.", t);
}
try {
//noinspection unchecked
return (Class<T>) compiler.getClassLoader().loadClass(name);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Can not load class " + name, e);
}
}
private static <T> Class<T> doCompile(ClassLoader cl, String name, String code) {
checkNotNull(cl, "Classloader must not be null.");
CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
SimpleCompiler compiler = new SimpleCompiler();
compiler.setParentClassLoader(cl);
try {
compiler.cook(code);
} catch (Throwable t) {
System.out.println(addLineNumber(code));
throw new InvalidProgramException(
"Table program cannot be compiled. This is a bug. Please file an issue.", t);
}
try {
//noinspection unchecked
return (Class<T>) compiler.getClassLoader().loadClass(name);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Can not load class " + name, e);
}
}

/**
* To output more information when an error occurs.
* Generally, when cook fails, it shows which line is wrong. This line number starts at 1.
*/
private static String addLineNumber(String code) {
String[] lines = code.split("\n");
StringBuilder builder = new StringBuilder();
for (int i = 0; i < lines.length; i++) {
builder.append("/* ").append(i + 1).append(" */").append(lines[i]).append("\n");
}
return builder.toString();
}
/**
* To output more information when an error occurs. Generally, when cook fails, it shows which
* line is wrong. This line number starts at 1.
*/
private static String addLineNumber(String code) {
String[] lines = code.split("\n");
StringBuilder builder = new StringBuilder();
for (int i = 0; i < lines.length; i++) {
builder.append("/* ").append(i + 1).append(" */").append(lines[i]).append("\n");
}
return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@

package org.apache.flink.table.runtime.generated;

/**
* Describes a generated {@link AggsHandleFunction}.
*/
/** Describes a generated {@link AggsHandleFunction}. */
public class GeneratedAggsHandleFunction extends GeneratedClass<AggsHandleFunction> {

private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 1L;

public GeneratedAggsHandleFunction(String className, String code, Object[] references) {
super(className, code, references);
}
public GeneratedAggsHandleFunction(String className, String code, Object[] references) {
super(className, code, references);
}
}
Loading

0 comments on commit 481ccff

Please sign in to comment.