diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index 6d6453fb310a7..5f31d835f05cb 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -229,12 +229,14 @@ boolean shouldRunFetchTask() { *

The correctness can be thought of in the following way. The purpose of wake up * is to let the fetcher thread go to the very beginning of the running loop. * There are three major events in each run of the loop. + * *

    *
  1. pick a task (blocking) *
  2. assign the task to runningTask variable. *
  3. run the runningTask. (blocking) *
- * We don't need to worry about things after step 3 because there is no blocking point + * + *

We don't need to worry about things after step 3 because there is no blocking point * anymore. * *
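To make the correctness argument easier to follow, here is a rough pseudocode sketch of the loop described above (illustrative only — wakeupFlag, taskQueue and runningTask are placeholder names, not the actual SplitFetcher members):

    while (running) {
        // the wakeup flag is checked before every blocking point, so setting the flag
        // and then waking the current blocking call always sends the thread back here
        if (wakeupFlag.getAndSet(false)) {
            continue;
        }
        SplitFetcherTask task = taskQueue.take();   // 1. pick a task (blocking)
        runningTask = task;                         // 2. assign it to the runningTask variable
        task.run();                                 // 3. run the runningTask (blocking)
        runningTask = null;
    }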

We always first set the wakeup flag when waking up the fetcher, then use the diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java index 9a60b5bc0f66b..8e24132a195f7 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java @@ -151,9 +151,8 @@ public CassandraSink setParallelism(int parallelism) { /** * Turns off chaining for this operator so thread co-location will not be * used as an optimization. - *

- *

- * Chaining can be turned off for the whole + * + *

Chaining can be turned off for the whole * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()} * however it is not advised for performance considerations. * diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java index 46d3d37a93572..88298d1109de1 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java @@ -230,12 +230,14 @@ private static Collection splitsToPaths(Collection splits /** * The generic base builder. This builder carries a SELF type to make it convenient to * extend this for subclasses, using the following pattern. + * *

{@code
 	 * public class SubBuilder extends AbstractFileSourceBuilder> {
 	 *     ...
 	 * }
 	 * }
- * That way, all return values from builder method defined here are typed to the sub-class + * + *

That way, all return values from builder methods defined here are typed to the sub-class * type and support fluent chaining. * *
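A generic illustration of the pattern (hypothetical class names, not the actual Flink builders): setters in the base class return the SELF type, so a concrete sub-builder keeps its own type throughout the fluent chain.

    abstract class AbstractBuilder<T, SELF extends AbstractBuilder<T, SELF>> {
        protected long limit;

        @SuppressWarnings("unchecked")
        public SELF setLimit(long limit) {  // defined once in the base builder
            this.limit = limit;
            return (SELF) this;             // statically typed as the concrete sub-builder
        }
    }

    final class SubBuilder<T> extends AbstractBuilder<T, SubBuilder<T>> {
        public SubBuilder<T> subBuilderOnlyOption() {
            return this;
        }
    }

    // Inherited setters can be chained with sub-builder-only methods:
    // new SubBuilder<String>().setLimit(100).subBuilderOnlyOption();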

We don't make the publicly visible builder generic with a SELF type, because it leads to diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java index 3e77e4282f31b..162b434ecab58 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java @@ -46,12 +46,14 @@ * accessed via the Flink's {@link FileSystem} class. * *

Start building a file source via one of the following calls: + * *

- * This creates a {@link FileSource.FileSourceBuilder} on which you can configure all the + * + *

This creates a {@link FileSource.FileSourceBuilder} on which you can configure all the * properties of the file source. * *
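A minimal usage sketch of that builder chain, assuming a StreamFormat<String> implementation is at hand (the format variable and the path below are placeholders):

    StreamFormat<String> format = ...;                 // e.g. a line-by-line text format
    FileSource<String> source =
            FileSource.forRecordStreamFormat(format, new Path("file:///input/dir"))
                    .monitorContinuously(Duration.ofSeconds(10))  // optional: keep watching for new files
                    .build();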

Batch and Streaming

@@ -71,6 +73,7 @@ *

The reading of each file happens through file readers defined by file formats. * These define the parsing logic for the contents of the file. There are multiple classes that * the source supports. Their interfaces trade off simplicity of implementation against flexibility/efficiency. + *

- * In other words, any particular generated id will always be assigned to one and only one subtask. + * + *

In other words, any particular generated id will always be assigned to one and only one subtask. */ @Internal public class TransactionalIdsGenerator { diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java index 7e0d1fbe45622..b70edce4e6f03 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java @@ -26,8 +26,9 @@ * *

Note, one Kafka partition can contain multiple Flink partitions. * - *

Cases: - * # More Flink partitions than kafka partitions + *

There are a couple of cases to consider. + * + *

More Flink partitions than Kafka partitions

*
  * 		Flink Sinks:		Kafka Partitions
  * 			1	---------------->	1
@@ -35,9 +36,11 @@
  * 			3   -------------/
  * 			4	------------/
  * 
- * Some (or all) kafka partitions contain the output of more than one flink partition * - *

Fewer Flink partitions than Kafka + *

Some (or all) Kafka partitions contain the output of more than one Flink partition + + *

Fewer Flink partitions than Kafka

+ * *
  * 		Flink Sinks:		Kafka Partitions
  * 			1	---------------->	1
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
index f7cd0f4f0dfc2..bd2e745af9350 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
@@ -52,7 +52,7 @@
  * a {@link FlinkKafkaShuffleConsumer} together into a {@link FlinkKafkaShuffle}.
  * Here is an example how to use a {@link FlinkKafkaShuffle}.
  *
- * 

{@code
+ * 
{@code
  *	StreamExecutionEnvironment env = ... 					// create execution environment
  * 	DataStream source = env.addSource(...)				// add data stream source
  * 	DataStream dataStream = ...							// some transformation(s) based on source
@@ -91,7 +91,7 @@
  * 			decoupled to three regions: `KafkaShuffleProducer', `KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
  * 			through `PERSISTENT DATA` as shown below. If any region fails the execution, the other two keep progressing.
  *
- * 

+ * 
  *     source -> ... KafkaShuffleProducer -> PERSISTENT DATA -> KafkaShuffleConsumer -> ...
  *                                                |
  *                                                | ----------> KafkaShuffleConsumerReuse -> ...
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
index 19e8db5cdb725..893ba592faebb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java
@@ -29,7 +29,9 @@
  * 
{@code
  * TypeInformation> info = TypeInformation.of(new TypeHint>(){});
  * }
- * or + * + *

or + * *

{@code
  * TypeInformation> info = new TypeHint>(){}.getTypeInfo();
  * }
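The anonymous subclass created by the trailing {} is what makes this work: it records the generic argument in the class file, so the information survives type erasure and can be read back reflectively. A small self-contained sketch (the Tuple2<String, Long> type parameter is just an example):

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class TypeHintExample {
        public static void main(String[] args) {
            // the {} creates an anonymous subclass whose generic superclass signature
            // carries Tuple2<String, Long>, which TypeInformation.of() can extract
            TypeInformation<Tuple2<String, Long>> info =
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
            System.out.println(info);
        }
    }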
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java index 5594f9ebf3b09..3de036d0e91fb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java @@ -41,13 +41,15 @@ public interface KeySelector extends Function, Serializable { * User-defined function that deterministically extracts the key from an object. * *

For example, for a class: + *

 	 * 	public class Word {
 	 * 		String word;
 	 * 		int count;
 	 * 	}
 	 * 
- * The key extractor could return the word as + * + *

The key extractor could return the word as * a key to group all Word objects by the String they contain. * *
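As a minimal sketch of such a key extractor (assuming the Word class above; the Javadoc's own example, referenced just below, shows the same idea):

    KeySelector<Word, String> keyExtractor = new KeySelector<Word, String>() {
        @Override
        public String getKey(Word w) {
            return w.word;   // group all Word objects by the String they contain
        }
    };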

The code would look like this diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java index f9794e6be6379..6d23d245af08b 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileStatus.java @@ -17,7 +17,7 @@ */ -/** +/* * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 6c6d2a44b1c4d..09defff996b18 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -121,34 +121,36 @@ * *

Examples

* - *
    - *
  • For fault-tolerant distributed file systems, data is considered persistent once - * it has been received and acknowledged by the file system, typically by having been replicated - * to a quorum of machines (durability requirement). In addition the absolute file path - * must be visible to all other machines that will potentially access the file (visibility - * requirement). + *

    Fault-tolerant distributed file systems

    * - *

    Whether data has hit non-volatile storage on the storage nodes depends on the specific - * guarantees of the particular file system. + *

    For fault-tolerant distributed file systems, data is considered persistent once + * it has been received and acknowledged by the file system, typically by having been replicated + * to a quorum of machines (durability requirement). In addition the absolute file path + * must be visible to all other machines that will potentially access the file (visibility + * requirement). * - *

    The metadata updates to the file's parent directory are not required to have reached - * a consistent state. It is permissible that some machines see the file when listing the parent - * directory's contents while others do not, as long as access to the file by its absolute path - * is possible on all nodes.

  • + *

    Whether data has hit non-volatile storage on the storage nodes depends on the specific + * guarantees of the particular file system. * - *

  • A local file system must support the POSIX close-to-open semantics. - * Because the local file system does not have any fault tolerance guarantees, no further - * requirements exist. + *

    The metadata updates to the file's parent directory are not required to have reached + * a consistent state. It is permissible that some machines see the file when listing the parent + * directory's contents while others do not, as long as access to the file by its absolute path + * is possible on all nodes. * - *

    The above implies specifically that data may still be in the OS cache when considered - * persistent from the local file system's perspective. Crashes that cause the OS cache to loose - * data are considered fatal to the local machine and are not covered by the local file system's - * guarantees as defined by Flink. + *

    Local file systems

    * - *

    That means that computed results, checkpoints, and savepoints that are written only to - * the local filesystem are not guaranteed to be recoverable from the local machine's failure, - * making local file systems unsuitable for production setups.

  • - *
+ *

A local file system must support the POSIX close-to-open semantics. + * Because the local file system does not have any fault tolerance guarantees, no further + * requirements exist. + * + *

The above implies specifically that data may still be in the OS cache when considered + * persistent from the local file system's perspective. Crashes that cause the OS cache to lose + * data are considered fatal to the local machine and are not covered by the local file system's + * guarantees as defined by Flink. + *

That means that computed results, checkpoints, and savepoints that are written only to + * the local filesystem are not guaranteed to be recoverable from the local machine's failure, + * making local file systems unsuitable for production setups. * *

Updating File Contents

* diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java index 82dbd56ef5bff..3b0ef546653c7 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java @@ -61,7 +61,7 @@ * Note that this code realizes both the byte order swapping and the reinterpret cast access to * get a long from the byte array. * - *

+ * 
  * [Verified Entry Point]
  *   0x00007fc403e19920: sub    $0x18,%rsp
  *   0x00007fc403e19927: mov    %rbp,0x10(%rsp)    ;*synchronization entry
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
index 0874999b1c9cc..bba9731a6817a 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -36,7 +36,7 @@
  *
  * 

This program implements the following SQL equivalent: * - *

{@code
+ * 
{@code
  * SELECT
  *        c_custkey,
  *        c_name,
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index cf9f078367e48..f6003de9e6d2b 100644
--- a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -40,7 +40,7 @@
  *
  * 

This program implements the following SQL equivalent: * - *

{@code
+ * 
{@code
  * SELECT
  *      l_orderkey,
  *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
index 89a6343f9e8f7..c6dd4d50a0cc1 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -33,10 +33,12 @@
  * 

This program connects to a server socket and reads strings from the socket. * The easiest way to try this out is to open a text server (at port 12345) * using the netcat tool via + * *

  * nc -l 12345 on Linux or nc -l -p 12345 on Windows
  * 
- * and run this example with the hostname and the port as arguments. + * + *

and run this example with the hostname and the port as arguments. */ @SuppressWarnings("serial") public class SocketWindowWordCount { diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java index c4253f4d9df29..84c4ea2b13645 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java @@ -53,7 +53,8 @@ *

  • serialization via reflection (ReflectDatumReader / -Writer)
  • *
  • serialization of generic records via GenericDatumReader / -Writer
  • * - * The serializer instantiates them depending on the class of the type it should serialize. + * + *

    The serializer instantiates them depending on the class of the type it should serialize. * *
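As an illustration of that choice (a hedged sketch, not the actual AvroSerializer code), the writer side could pick the Avro DatumWriter from the class of the serialized type:

    // illustrative helper: choose the Avro writer depending on the type's class
    private static <T> DatumWriter<T> createDatumWriter(Class<T> type, Schema schema) {
        if (SpecificRecord.class.isAssignableFrom(type)) {
            return new SpecificDatumWriter<>(schema);   // generated Avro classes
        } else if (GenericRecord.class.isAssignableFrom(type)) {
            return new GenericDatumWriter<>(schema);    // generic records
        } else {
            return new ReflectDatumWriter<>(schema);    // arbitrary POJOs via reflection
        }
    }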

    Important: This serializer is NOT THREAD SAFE, because it reuses the data encoders * and decoders which have buffers that would be shared between the threads if used concurrently diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java index be52a27ea5f24..1a6bec85cb61d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java @@ -100,6 +100,7 @@ public DataSet closeWith(DataSet solutionSetDelta, DataSet newWorkse * Gets the initial solution set. This is the data set on which the delta iteration was started. * *

    Consider the following example: + * *

     	 * {@code
     	 * DataSet solutionSetData = ...;
    @@ -108,7 +109,8 @@ public DataSet closeWith(DataSet solutionSetDelta, DataSet newWorkse
 	 * DeltaIteration iteration = solutionSetData.iterateDelta(worksetData, 10, ...);
     	 * }
     	 * 
    - * The solutionSetData would be the data set returned by {@code iteration.getInitialSolutionSet();}. + * + *

    The solutionSetData would be the data set returned by {@code iteration.getInitialSolutionSet();}. * * @return The data set that forms the initial solution set. */ @@ -121,6 +123,7 @@ public DataSet getInitialSolutionSet() { * iteration. * *

    Consider the following example: + * *

     	 * {@code
     	 * DataSet solutionSetData = ...;
    @@ -129,7 +132,8 @@ public DataSet getInitialSolutionSet() {
 	 * DeltaIteration iteration = solutionSetData.iterateDelta(worksetData, 10, ...);
     	 * }
     	 * 
    - * The worksetData would be the data set returned by {@code iteration.getInitialWorkset();}. + * + *

    The worksetData would be the data set returned by {@code iteration.getInitialWorkset();}. * * @return The data set that forms the initial workset. */ diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java index 74bd9e5acb7a8..608a7e73274bb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Grouping.java @@ -65,11 +65,13 @@ public Grouping(DataSet set, Keys keys) { * Returns the input DataSet of a grouping operation, that is the one before the grouping. This means that * if it is applied directly to the result of a grouping operation, it will cancel its effect. As an example, in the * following snippet: + * *

    {@code
     	 * DataSet notGrouped = input.groupBy().getDataSet();
     	 * DataSet allReduced = notGrouped.reduce()
     	 * }
    - * the {@code groupBy()} is as if it never happened, as the {@code notGrouped} DataSet corresponds + * + *

    the {@code groupBy()} is as if it never happened, as the {@code notGrouped} DataSet corresponds * to the input of the {@code groupBy()} (because of the {@code getDataset()}). * */ @Internal diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java index 49aaf6d84e745..76f29dafa162e 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java @@ -37,7 +37,7 @@ * *

    A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create a {@link NFA}. * - *

    {@code
    + * 
    {@code
      * Pattern pattern = Pattern.begin("start")
      *   .next("middle").subtype(F.class)
      *   .followedBy("end").where(new MyCondition());
    diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
    index d0c47ac791948..5a9ad88fdb142 100644
    --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
    +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
    @@ -26,7 +26,7 @@
     /**
      * A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.
      *
    - * 

      + *
        *
1. Single
2. Looping
3. Times
      6. diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java index baefe844c45f4..efa923549e95a 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java @@ -285,7 +285,7 @@ private CepOperator createCepOperator( /** * Creates a {@link PatternProcessFunction} that as a result will produce Strings as follows: - *
        [timestamp]:[Event.getName]...
        The Event.getName will occur stateNumber times. If the match does not + * {@code [timestamp]:[Event.getName]...}. The Event.getName will occur stateNumber times. If the match does not * contain n-th pattern it will replace this position with "null". * * @param stateNumber number of states in the pattern @@ -298,7 +298,7 @@ private static PatternProcessFunction extractTimestampAndNames(in /** * Creates a {@link PatternProcessFunction} that as a result will produce Strings as follows: - *
        [timestamp]:[Event.getName]...
        The Event.getName will occur stateNumber times. If the match does not + * {@code [timestamp]:[Event.getName]...} The Event.getName will occur stateNumber times. If the match does not * contain n-th pattern it will replace this position with "null". * *

        This function will also apply the same logic for timed out partial matches and emit those results into @@ -318,7 +318,7 @@ private static PatternProcessFunction extractTimestampAndNames( /** * Creates a {@link PatternProcessFunction} that as a result will produce Strings as follows: - *

        [currentProcessingTime]:[Event.getName]...
        The Event.getName will occur stateNumber times. + * {@code [currentProcessingTime]:[Event.getName]...}. The Event.getName will occur stateNumber times. * If the match does not contain n-th pattern it will replace this position with "null". * * @param stateNumber number of states in the pattern @@ -330,7 +330,7 @@ private static PatternProcessFunction extractCurrentProcessingTim /** * Creates a {@link PatternProcessFunction} that as a result will produce Strings as follows: - *
        [currentProcessingTime]:[Event.getName]...
        The Event.getName will occur stateNumber times. + * {@code [currentProcessingTime]:[Event.getName]...}. The Event.getName will occur stateNumber times. * If the match does not contain n-th pattern it will replace this position with "null". * *

        This function will also apply the same logic for timed out partial matches and emit those results into diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 446a2ba676420..bb7aabd1e88d9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -1474,6 +1474,7 @@ public Graph removeVertex(Vertex vertex) { return removeVertices(vertexToBeRemoved); } + /** * Removes the given list of vertices and its edges from the graph. * @@ -1481,7 +1482,6 @@ public Graph removeVertex(Vertex vertex) { * @return the resulted graph containing the initial vertices and edges minus the vertices * and edges removed. */ - public Graph removeVertices(List> verticesToBeRemoved) { return removeVertices(this.context.fromCollection(verticesToBeRemoved)); } diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java index 8b8927ee751de..d53074d565d59 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java @@ -70,7 +70,7 @@ protected void testProgram() throws Exception { /** * A map function that takes a Long value and creates a 2-tuple out of it: - *

        (Long value) -> (value, value)
        . + * {@code (Long value) -> (value, value)}. */ public static final class IdAssigner implements MapFunction> { @Override diff --git a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java index a40095e1e1ec0..87b96d2372333 100644 --- a/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java +++ b/flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/statistics/basicstatistic/MultivariateGaussian.java @@ -104,11 +104,11 @@ public double logpdf(Vector x) { * A) u = log((2*pi)^(-k/2)^ * det(sigma)^(-1/2)^) * B) rootSigmaInv = sqrt(inv(sigma)) = U * D^(-1/2)^ * - *

          + *
            *
          • sigma = U * D * U.t *
          • inv(sigma) = U * inv(D) * U.t = (U * D^(-1/2)^) * (U * D^(-1/2)^).t *
          • sqrt(inv(sigma)) = U * D^(-1/2)^ - *

          + *
        */ private void calculateCovarianceConstants() { int k = this.mean.size(); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 5563f8849303e..82480d7c395ba 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -76,12 +76,14 @@ *

        The WebInterface only displays the "Completed Jobs" page. * *

        The REST API is limited to + * *

          *
        • /config
        • *
        • /joboverview
        • *
        • /jobs/:jobid/*
        • *
        - * and relies on static files that are served by the {@link HistoryServerStaticFileServerHandler}. + * + *

        and relies on static files that are served by the {@link HistoryServerStaticFileServerHandler}. */ public class HistoryServer { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java index fb7e2051b8b32..8cf40882e580b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java @@ -63,7 +63,8 @@ default void notifyCheckpointAborted(long checkpointId) {} *

      7. There is a recovery from a completed checkpoint/savepoint but it contained no state * for the coordinator.
      8. * - * In both cases, the coordinator should reset to an empty (new) state. + * + *

        In both cases, the coordinator should reset to an empty (new) state. */ void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index 99e8237304df8..daa4b4131d2a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -46,7 +46,8 @@ * (and more latency on restore) *

      9. one thread; having multiple threads means more connections, couples with the implementation and increases complexity
      10. * - * Thread-safety: this class is thread-safe when used with a thread-safe {@link ChannelStateWriteRequestExecutor executor} + * + *

        Thread-safety: this class is thread-safe when used with a thread-safe {@link ChannelStateWriteRequestExecutor executor} * (e.g. default {@link ChannelStateWriteRequestExecutorImpl}. */ @Internal diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index 83c63faeac138..23e19e4eb1645 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -57,7 +57,8 @@ *

1. There is at least one {@link #availableMemorySegments}.
2. No subpartition has reached {@link #maxBuffersPerChannel}.
      14. * - * To ensure this contract, the implementation eagerly fetches additional memory segments from {@link NetworkBufferPool} + * + *

        To ensure this contract, the implementation eagerly fetches additional memory segments from {@link NetworkBufferPool} * as long as it hasn't reached {@link #maxNumberOfMemorySegments} or one subpartition reached the quota. */ class LocalBufferPool implements BufferPool { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java index 12478861d530a..bf1f99c038e72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java @@ -48,10 +48,13 @@ * *

        An operation is triggered by sending an HTTP {@code POST} request to a registered {@code url}. The HTTP * request may contain a JSON body to specify additional parameters, e.g., + * *

          * { "target-directory": "/tmp" }
          * 
        - * As written above, the response will contain a request id, e.g., + * + *

        As written above, the response will contain a request id, e.g., + * *

          * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" }
          * 
        @@ -59,6 +62,7 @@ *

        To poll for the status of an ongoing operation, an HTTP {@code GET} request is issued to * {@code url/:triggerid}. If the specified savepoint is still ongoing, * the response will be + * *

          * {
          *     "status": {
        @@ -66,8 +70,10 @@
          *     }
          * }
          * 
        - * If the specified operation has completed, the status id will transition to {@code COMPLETED}, and + * + *

        If the specified operation has completed, the status id will transition to {@code COMPLETED}, and * the response will additionally contain information about the operation result: + * *

          * {
          *     "status": {
        diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
        index cc363d4e11d40..11674bbcf3e58 100644
        --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
        +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
        @@ -64,12 +64,15 @@
          * 

        A savepoint is triggered by sending an HTTP {@code POST} request to * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target * directory of the savepoint, e.g., + * *

          * { "target-directory": "/tmp" }
          * 
        - * If the body is omitted, or the field {@code target-property} is {@code null}, the default + * + *

If the body is omitted, or the field {@code target-directory} is {@code null}, the default + * savepoint directory as specified by {@link CheckpointingOptions#SAVEPOINT_DIRECTORY} will be used. * As written above, the response will contain a request id, e.g., + *

          * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" }
          * 
        @@ -77,6 +80,7 @@ *

        To poll for the status of an ongoing savepoint, an HTTP {@code GET} request is issued to * {@code /jobs/:jobid/savepoints/:savepointtriggerid}. If the specified savepoint is still ongoing, * the response will be + * *

          * {
          *     "status": {
        @@ -84,8 +88,10 @@
          *     }
          * }
          * 
        - * If the specified savepoint has completed, the status id will transition to {@code COMPLETED}, and + * + *

        If the specified savepoint has completed, the status id will transition to {@code COMPLETED}, and * the response will additionally contain information about the savepoint, such as the location: + * *

          * {
          *     "status": {
        diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
        index 0f8665544ff30..558a1b700bb5f 100644
        --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
        +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
        @@ -75,7 +75,8 @@
          *     e.g. to manage the lifecycle of BLOCKING result partitions which can outlive their producers.
          *     The BLOCKING partitions can be consumed multiple times.
          * 
      - * The partitions, which currently still occupy local resources, can be queried with + * + *

      The partitions, which currently still occupy local resources, can be queried with * {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}. * *

      Input gate management.

      diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java index f2511efc6e2a1..18346b3151879 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultSchedulingPipelinedRegionTest.java @@ -85,6 +85,7 @@ public void returnsVertices() { /** * Tests if the consumed inputs of the pipelined regions are computed * correctly using the Job graph below. + * *
       	 *          c
       	 *        /  X
      @@ -92,7 +93,8 @@ public void returnsVertices() {
       	 *       \  /
       	 *        d
       	 * 
      - * Pipelined regions: {a}, {b, c, d, e} + * + *

      Pipelined regions: {a}, {b, c, d, e} */ @Test public void returnsIncidentBlockingPartitions() throws Exception { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java index 5aeeb3466b5a8..d240c7088390c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java @@ -36,7 +36,8 @@ * abstraction represented by this interface. * *

      Initialization

      - * The {@link CheckpointedFunction#initializeState(FunctionInitializationContext)} is called when + * + *

The {@link CheckpointedFunction#initializeState(FunctionInitializationContext)} is called when * the parallel instance of the transformation function is created during distributed execution. * The method gives access to the {@link FunctionInitializationContext} which in turn gives access * to the {@link OperatorStateStore} and {@link KeyedStateStore}. @@ -50,7 +51,8 @@ * keyed state, i.e., when it is applied on a keyed stream (after a {@code keyBy(...)}). * *

      Snapshot

      - * The {@link CheckpointedFunction#snapshotState(FunctionSnapshotContext)} is called whenever a + * + *

      The {@link CheckpointedFunction#snapshotState(FunctionSnapshotContext)} is called whenever a * checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically * make sure that the checkpointed data structures (obtained in the initialization phase) are up * to date for a snapshot to be taken. The given snapshot context gives access to the metadata @@ -60,14 +62,15 @@ * external systems. * *

      Example

      - * The code example below illustrates how to use this interface for a function that keeps counts + * + *

The code example below illustrates how to use this interface for a function that keeps counts * of events per key and per parallel partition (parallel instance of the transformation function * during distributed execution). * The example also handles changes of parallelism, which affect the count-per-parallel-partition by * adding up the counters of partitions that get merged on scale-down. Note that this is a * toy example, but should illustrate the basic skeleton for a stateful function. * - *

      {@code
      + * 
      {@code
        * public class MyFunction implements MapFunction, CheckpointedFunction {
        *
        *     private ReducingState countPerKey;
      @@ -110,15 +113,19 @@
        * 
      * *

      Shortcuts

      - * There are various ways that transformation functions can use state without implementing the + * + *

      There are various ways that transformation functions can use state without implementing the * full-fledged {@code CheckpointedFunction} interface: * *

      Operator State

      - * Checkpointing some state that is part of the function object itself is possible in a simpler way + * + *

      Checkpointing some state that is part of the function object itself is possible in a simpler way * by directly implementing the {@link ListCheckpointed} interface. * *

      Keyed State

      - * Access to keyed state is possible via the {@link RuntimeContext}'s methods: + * + *

      Access to keyed state is possible via the {@link RuntimeContext}'s methods: + * *

      {@code
        * public class CountPerKeyFunction extends RichMapFunction {
        *
      diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
      index da43224521c46..7a39379affe5c 100644
      --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
      +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
      @@ -37,7 +37,8 @@
        * of the state objects, or have multiple named states.
        *
        * 

      State Redistribution

      - * State redistribution happens when the parallelism of the operator is changed. + * + *

      State redistribution happens when the parallelism of the operator is changed. * State redistribution of operator state (to which category the state handled by this * interface belongs) always goes through a checkpoint, so it appears * to the transformation functions like a failure/recovery combination, where recovery happens @@ -50,6 +51,7 @@ *

The following sketch illustrates the state redistribution. The function runs with parallelism * 3. The first two parallel instances of the function return lists with two state elements, * the third one a list with one element. + *

        *    func_1        func_2     func_3
        * +----+----+   +----+----+   +----+
      @@ -58,6 +60,7 @@
        * 
      * *

      Recovering the checkpoint with parallelism = 5 yields the following state assignment: + * *

        * func_1   func_2   func_3   func_4   func_5
        * +----+   +----+   +----+   +----+   +----+
      @@ -65,7 +68,8 @@
        * +----+   +----+   +----+   +----+   +----+
        * 
      - * Recovering the checkpoint with parallelism = 2 yields the following state assignment: + *

      Recovering the checkpoint with parallelism = 2 yields the following state assignment: + * *

        *      func_1          func_2
        * +----+----+----+   +----+----+
      @@ -74,8 +78,10 @@
        * 
      * *

      Example

      - * The following example illustrates how to implement a {@code MapFunction} that counts all elements + * + *

The following example illustrates how to implement a {@code MapFunction} that counts all elements * passing through it, keeping the total count accurate under re-scaling (changes of parallelism): + *

      {@code
        * public class CountingFunction implements MapFunction>, ListCheckpointed {
        *
      diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
      index 1b06649adb399..36fa88f61f418 100644
      --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
      +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java
      @@ -30,7 +30,7 @@
       /**
        * A helper class to apply {@link AsyncFunction} to a data stream.
        *
      - * 

      {@code
      + * 
      {@code
        * DataStream input = ...
        * AsyncFunction> asyncFunc = ...
        *
      diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
      index 51255bcc31825..9e080f87bac29 100644
      --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
      +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
      @@ -476,8 +476,10 @@ public IntervalJoin inProcessingTime() {
       
       		/**
       		 * Specifies the time boundaries over which the join operation works, so that
      +		 *
       		 * 
      leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
      - * By default both the lower and the upper bound are inclusive. This can be configured + * + *

      By default both the lower and the upper bound are inclusive. This can be configured * with {@link IntervalJoined#lowerBoundExclusive()} and * {@link IntervalJoined#upperBoundExclusive()} * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 9f0dc04e49f35..193bcd2a95cb3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -116,7 +116,6 @@ public SingleOutputStreamOperator uid(String uid) { * that changes the automatically generated hashes. In this case, providing the previous hashes * directly through this method (e.g. obtained from old logs) can help to reestablish a lost * mapping from states to their target operator. - *

      * * @param uidHash The user provided hash for this operator. This will become the JobVertexID, * which is shown in the logs and web ui. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index f458c270de1f8..35c8a78ac6c4a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -312,7 +312,7 @@ public StreamExecutionEnvironment setRuntimeMode(final RuntimeExecutionMode exec * defines the number of key groups used for partitioned state. * * @param maxParallelism Maximum degree of parallelism to be used for the program., - * with 0 < maxParallelism <= 2^15 - 1 + * with {@code 0 < maxParallelism <= 2^15 - 1}. */ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) { Preconditions.checkArgument(maxParallelism > 0 && diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 1b050f53d2c0d..16948eff86e11 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -66,23 +66,29 @@ * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction} * which has a parallelism of 1, this operator can have DOP > 1. * - *

      This implementation uses {@link MailboxExecutor} to execute each action and state machine approach. The workflow is the following :

        - *
      1. start in {@link ReaderState#IDLE IDLE}
      2. + *

This implementation uses a {@link MailboxExecutor} to execute each action and follows a state-machine approach. The workflow is the following: + + *

          + *
        1. start in {@link ReaderState#IDLE IDLE} *
        2. upon receiving a split add it to the queue, switch to {@link ReaderState#OPENING OPENING} and enqueue a - * {@link org.apache.flink.streaming.runtime.tasks.mailbox.Mail mail} to process it
        3. - *
        4. open file, switch to {@link ReaderState#READING READING}, read one record, re-enqueue self
        5. - *
        6. if no more records or splits available, switch back to {@link ReaderState#IDLE IDLE}
        7. - *
        - * On close: - *
          - *
        1. if {@link ReaderState#IDLE IDLE} then close immediately
        2. + * {@link org.apache.flink.streaming.runtime.tasks.mailbox.Mail mail} to process it + *
        3. open file, switch to {@link ReaderState#READING READING}, read one record, re-enqueue self + *
        4. if no more records or splits available, switch back to {@link ReaderState#IDLE IDLE} + *
        + * + *

        On close: + * + *

          + *
        1. if {@link ReaderState#IDLE IDLE} then close immediately *
        2. otherwise switch to {@link ReaderState#CLOSING CLOSING}, call {@link MailboxExecutor#yield() yield} in a loop - * until state is {@link ReaderState#CLOSED CLOSED}
        3. - *
        4. {@link MailboxExecutor#yield() yield()} causes remaining records (and splits) to be processed in the same way as above
        5. - *

        + * until state is {@link ReaderState#CLOSED CLOSED} + *
      3. {@link MailboxExecutor#yield() yield()} causes remaining records (and splits) to be processed in the same way as above + *
      + * *

      Using {@link MailboxExecutor} allows to avoid explicit synchronization. At most one mail should be enqueued at any - * given time.

      - *

      Using FSM approach allows to explicitly define states and enforce {@link ReaderState#VALID_TRANSITIONS transitions} between them.

      + * given time. + * + *

      Using FSM approach allows to explicitly define states and enforce {@link ReaderState#VALID_TRANSITIONS transitions} between them. */ @Internal public class ContinuousFileReaderOperator extends AbstractStreamOperator diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java index 8793fe54dc149..4bc459be7391a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java @@ -86,7 +86,8 @@ * * *

      Timestamps and watermarks:

      - * Sources may assign timestamps to elements and may manually emit watermarks. + * + *

      Sources may assign timestamps to elements and may manually emit watermarks. * However, these are only interpreted if the streaming program runs on * {@link TimeCharacteristic#EventTime}. On other time characteristics * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedInputSplit.java index 10b015a217fed..0d5d13d3bf889 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedInputSplit.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedInputSplit.java @@ -24,11 +24,13 @@ /** * An extended {@link InputSplit} that also includes information about: + * *

        *
      • The modification time of the file this split belongs to.
      • *
      • When checkpointing, the state of the split at the moment of the checkpoint.
      • *
      - * This class is used by the {@link ContinuousFileMonitoringFunction} and the + * + *

      This class is used by the {@link ContinuousFileMonitoringFunction} and the * {@link ContinuousFileReaderOperator} to perform continuous file processing. * */ public interface TimestampedInputSplit extends InputSplit, Comparable { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java index a28d05664ca6f..1ea713a119c4c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java @@ -117,12 +117,14 @@ public class InternalTimerServiceImpl implements InternalTimerService { /** * Starts the local {@link InternalTimerServiceImpl} by: + * *

        *
1. Setting the {@code keySerializer} and {@code namespaceSerializer} for the timers it will contain.
      2. *
      3. Setting the {@code triggerTarget} which contains the action to be performed when a timer fires.
      4. *
      5. Re-registering timers that were retrieved after recovering from a node failure, if any.
      6. *
      - * This method can be called multiple times, as long as it is called with the same serializers. + * + *

      This method can be called multiple times, as long as it is called with the same serializers. */ public void startTimerService( TypeSerializer keySerializer, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java index 96060f5dd1043..63cdd6bb21a12 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java @@ -107,11 +107,13 @@ *

    * *

    Note that + * *

      *
    1. user can only see results before a lastCheckpointedOffset, and
    2. *
    3. client will go back to the latest lastCheckpointedOffset when sink restarts,
    4. *
    - * client will never throw away results in user-visible buffer. + * + *

    client will never throw away results in user-visible buffer. * So this communication protocol achieves exactly-once semantics. * *

    In order not to block job finishing/cancelling, if there are still results in sink's buffer when job terminates, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java index 1cb1e5f8e05bb..c5685dd459393 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailbox.java @@ -31,18 +31,21 @@ * have a priority that can be used to retrieve only relevant letters. * *

    Threading model

    - * The mailbox is bound to a mailbox thread passed during creation. Most operations may only occur through that thread. + * + *

    The mailbox is bound to a mailbox thread passed during creation. Most operations may only occur through that thread. * Write operations ({@link #put(Mail)}, {@link #putFirst(Mail)}) can be executed by any thread. All other methods can * only be invoked by the mailbox thread, which is passed upon construction. To verify that the current thread is * allowed to take any mail, use {@link #isMailboxThread()}, but all methods will perform the check themselves and fail * accordingly if called from another thread. * *

    Life cycle

    - * In the open state, the mailbox supports put and take operations. In the quiesced state, the mailbox supports only + * + *

    In the open state, the mailbox supports put and take operations. In the quiesced state, the mailbox supports only * take operations. * *

    Batch

    - * A batch is a local view on the mailbox that does not contain simultaneously added mails similar to iterators of + * + *

A batch is a local view on the mailbox that does not contain simultaneously added mails, similar to iterators of + * copy-on-write collections. * *

    A batch serves two purposes: it reduces synchronization if more than one mail is processable at the time of the diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java index 9ebf1192f1bd8..c76e37777dd40 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java @@ -224,10 +224,9 @@ public List getFullConstraints() { * 'connector' = 'csv' * ) *

diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 9ebf1192f1bd8..c76e37777dd40 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -224,10 +224,9 @@ public List<SqlTableConstraint> getFullConstraints() {
	 *   'connector' = 'csv'
	 * )
	 * </pre>
-	 * is equivalent with query:
-	 *
-	 * "col1, col2, to_timestamp(col2) as col3", caution that the "computed column" operands
-	 * have been reversed.
+	 *
+	 * <p>is equivalent with query "col1, col2, to_timestamp(col2) as col3", caution that the
+	 * "computed column" operands have been reversed.
	 */
	public String getColumnSqlString() {
		SqlPrettyWriter writer = new SqlPrettyWriter(
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
index ce765cc6a8331..45733ba9727e6 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlTableLike.java
@@ -129,7 +129,9 @@ public enum MergingStrategy {
	 * EXCLUDING PARTITIONS
	 * )
	 * }</pre>

-	 * is equivalent to:
+	 *
+	 * <p>is equivalent to:
+	 *
	 * <pre>{@code
	 * LIKE `sourceTable` (
	 *   INCLUDING GENERATED
    diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
    index 14934864f8df7..010f8cae37155 100644
    --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
    +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
    @@ -222,6 +222,7 @@ public static ApiExpression localTimestamp() {
	 * <p>It evaluates: leftEnd >= rightStart && rightEnd >= leftStart
	 *
	 * <p>e.g.
+	 *
	 * <pre>{@code
	 * temporalOverlaps(
	 *      lit("2:55:00").toTime(),
    @@ -230,7 +231,8 @@ public static ApiExpression localTimestamp() {
     	 *      lit(2).hours()
     	 * )
	 * }</pre>
-	 * leads to true
+	 *
+	 * <p>leads to true
	 */
	public static ApiExpression temporalOverlaps(
			Object leftTimePoint,
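A usage sketch of the documented call. The surrounding setup is assumed; lit(...), toTime(), hours() and temporalOverlaps(...) are the calls shown in the Javadoc above, and the operands are the example that evaluates to true.

import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.api.Expressions.temporalOverlaps;

import org.apache.flink.table.api.ApiExpression;

public class TemporalOverlapsExample {
    public static void main(String[] args) {
        // [2:55, 2:55 + 1h] overlaps [3:30, 3:30 + 2h], so this expression evaluates to true
        // when used in a query, e.g. table.select(overlaps).
        ApiExpression overlaps =
                temporalOverlaps(
                        lit("2:55:00").toTime(),
                        lit(1).hours(),
                        lit("3:30:00").toTime(),
                        lit(2).hours());
        System.out.println(overlaps);
    }
}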

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index c0f24b73608dd..02bc158223001 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -136,6 +136,7 @@ public ZoneId getLocalTimeZone() {
	 * is used during conversion.
	 *
	 * <p>Example:
+	 *
	 * <pre>{@code
     	 * TableEnvironment tEnv = ...
     	 * TableConfig config = tEnv.getConfig
    @@ -144,7 +145,9 @@ public ZoneId getLocalTimeZone() {
     	 * tEnv("INSERT INTO testTable VALUES ((1, '2000-01-01 2:00:00'), (2, TIMESTAMP '2000-01-01 2:00:00'))");
     	 * tEnv("SELECT * FROM testTable"); // query with local time zone set to UTC+2
	 * }</pre>
-	 * should produce:
+	 *
+	 * <p>should produce:
+	 *
	 * <pre>
     	 * =============================
     	 *    id   |       tmstmp
    @@ -152,12 +155,16 @@ public ZoneId getLocalTimeZone() {
     	 *    1    | 2000-01-01 2:00:00'
     	 *    2    | 2000-01-01 2:00:00'
	 * </pre>
-	 * If we change the local time zone and query the same table:
+	 *
+	 * <p>If we change the local time zone and query the same table:
+	 *
	 * <pre>{@code
     	 * config.setLocalTimeZone(ZoneOffset.ofHours(0));
     	 * tEnv("SELECT * FROM testTable"); // query with local time zone set to UTC+0
	 * }</pre>
-	 * we should get:
+	 *
+	 * <p>we should get:
+	 *
	 * <pre>
     	 * =============================
     	 *    id   |       tmstmp
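A compact usage sketch of the configuration described above; the table name testTable and the surrounding environment setup are illustrative, the setLocalTimeZone(...) and getConfig() calls are the ones shown in the documentation.

import java.time.ZoneOffset;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LocalTimeZoneExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Session time zone used during conversion, as in the documentation above.
        tEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(2));

        // Presumes a table named testTable was registered as in the documented example;
        // switching the zone to UTC+0 afterwards changes how the same data is rendered.
        tEnv.executeSql("SELECT * FROM testTable");
    }
}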
    diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
    index fbde34eddb5c4..2656a61cdeb61 100644
    --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
    +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
    @@ -96,13 +96,16 @@ static TableEnvironment create(EnvironmentSettings settings) {
	 *
	 * <p>Examples:
	 *
	 * <p>You can use a {@code row(...)} expression to create a composite rows:
+	 *
	 * <pre>{@code
     	 *  tEnv.fromValues(
     	 *      row(1, "ABC"),
     	 *      row(2L, "ABCDE")
     	 *  )
	 * }</pre>
-	 * will produce a Table with a schema as follows:
+	 *
+	 * <p>will produce a Table with a schema as follows:
+	 *
	 * <pre>{@code
     	 *  root
     	 *  |-- f0: BIGINT NOT NULL     // original types INT and BIGINT are generalized to BIGINT
    @@ -130,7 +133,9 @@ static TableEnvironment create(EnvironmentSettings settings) {
     	 *      call(new RowFunction())
     	 *  )
	 * }</pre>
-	 * will produce a Table with a schema as follows:
+	 *
+	 * <p>will produce a Table with a schema as follows:
+	 *
	 * <pre>{@code
     	 *  root
     	 *  |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)>
    @@ -139,6 +144,7 @@ static TableEnvironment create(EnvironmentSettings settings) {
	 * }</pre>
	 *
	 * <p>The row constructor can be dropped to create a table with a single column:
	 *
	 * <p>ROWs that are a result of e.g. a function call are not flattened
+	 *
	 * <pre>{@code
     	 *  tEnv.fromValues(
     	 *      1,
    @@ -146,7 +152,9 @@ static TableEnvironment create(EnvironmentSettings settings) {
     	 *      3
     	 *  )
	 * }</pre>
-	 * will produce a Table with a schema as follows:
+	 *
+	 * <p>will produce a Table with a schema as follows:
+	 *
	 * <pre>{@code
     	 *  root
     	 *  |-- f0: BIGINT NOT NULL
    @@ -169,6 +177,7 @@ default Table fromValues(Object... values) {
     	 * e.g. DECIMAL or naming the columns.
     	 *
	 * <p>Examples:
+	 *
	 * <pre>{@code
     	 *  tEnv.fromValues(
     	 *      DataTypes.ROW(
    @@ -179,7 +188,9 @@ default Table fromValues(Object... values) {
     	 *      row(2L, "ABCDE")
     	 *  )
	 * }</pre>
-	 * will produce a Table with a schema as follows:
+	 *
+	 * <p>will produce a Table with a schema as follows:
+	 *
	 * <pre>{@code
     	 *  root
     	 *  |-- id: DECIMAL(10, 2)
@@ -205,13 +216,16 @@ default Table fromValues(AbstractDataType<?> rowType, Object... values) {
	 *
	 * <p>Examples:
	 *
	 * <p>You can use a {@code row(...)} expression to create a composite rows:
+	 *
	 * <pre>{@code
     	 *  tEnv.fromValues(
     	 *      row(1, "ABC"),
     	 *      row(2L, "ABCDE")
     	 *  )
	 * }</pre>
-	 * will produce a Table with a schema as follows:
+	 *
+	 * <p>will produce a Table with a schema as follows:
+	 *
	 * <pre>{@code
     	 *  root
     	 *  |-- f0: BIGINT NOT NULL     // original types INT and BIGINT are generalized to BIGINT
@@ -228,6 +242,7 @@ default Table fromValues(AbstractDataType<?> rowType, Object... values) {
     	 * {@code row} expressions.
     	 *
	 * <p>ROWs that are a result of e.g. a function call are not flattened
+	 *
	 * <pre>{@code
     	 *  public class RowFunction extends ScalarFunction {
     	 *      {@literal @}DataTypeHint("ROW")
@@ -239,7 +254,9 @@ default Table fromValues(AbstractDataType<?> rowType, Object... values) {
     	 *      call(new RowFunction())
     	 *  )
	 * }</pre>
-	 * will produce a Table with a schema as follows:
+	 *
+	 * <p>will produce a Table with a schema as follows:
+	 *
	 * <pre>{@code
     	 *  root
     	 *  |-- f0: ROW<`f0` BIGINT, `f1` VARCHAR(5)>
@@ -248,6 +265,7 @@ default Table fromValues(AbstractDataType<?> rowType, Object... values) {
	 * }</pre>
	 *
	 * <p>The row constructor can be dropped to create a table with a single column:
	 *
	 * <p>ROWs that are a result of e.g. a function call are not flattened
+	 *
	 * <pre>{@code
     	 *  tEnv.fromValues(
     	 *      lit(1).plus(2),
@@ -255,7 +273,9 @@ default Table fromValues(AbstractDataType<?> rowType, Object... values) {
     	 *      lit(3)
     	 *  )
	 * }</pre>
-	 * will produce a Table with a schema as follows:
+	 *
+	 * <p>will produce a Table with a schema as follows:
+	 *
	 * <pre>{@code
     	 *  root
     	 *  |-- f0: BIGINT NOT NULL
@@ -273,6 +293,7 @@ default Table fromValues(AbstractDataType<?> rowType, Object... values) {
     	 * e.g. DECIMAL or naming the columns.
     	 *
	 * <p>Examples:
+	 *
	 * <pre>{@code
     	 *  tEnv.fromValues(
     	 *      DataTypes.ROW(
@@ -283,7 +304,9 @@ default Table fromValues(AbstractDataType<?> rowType, Object... values) {
     	 *      row(2L, "ABCDE")
     	 *  )
	 * }</pre>
-	 * will produce a Table with a schema as follows:
+	 *
+	 * <p>will produce a Table with a schema as follows:
+	 *
	 * <pre>{@code
     	 *  root
     	 *  |-- id: DECIMAL(10, 2)
@@ -920,7 +943,8 @@ default Table fromValues(AbstractDataType<?> rowType, Object... values) {
     	 *    tEnv.sqlUpdate(query);
     	 *    tEnv.execute("MyJob");
	 * </pre>
-	 * This code snippet creates a job to read data from Kafka source into a CSV sink.
+	 *
+	 * <p>This code snippet creates a job to read data from Kafka source into a CSV sink.
	 *
	 * @param stmt The SQL statement to evaluate.
	 * @deprecated use {@link #executeSql(String)} for single statement,
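To complement the fromValues documentation touched above, a small runnable sketch; the environment setup is illustrative, and the expected schema in the comment follows the Javadoc's own example of generalizing INT and BIGINT.

import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FromValuesExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Values of types INT and BIGINT in the same column are generalized to BIGINT,
        // as described in the documentation above.
        Table values = tEnv.fromValues(row(1, "ABC"), row(2L, "ABCDE"));
        values.printSchema(); // expected per the docs: f0 BIGINT NOT NULL
    }
}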

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
index 7de03c8eb0433..fe1b096917f7c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
@@ -32,10 +32,14 @@
 * initialization configuration such as if the queries should be executed in batch mode.
 *
 * <p>Important: The implementations of this interface should also implement method
- * {@code public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment);}
- * This method will be used when instantiating a {@link org.apache.flink.table.api.TableEnvironment} from a
- * bridging module which enables conversion from/to {@code DataStream} API and requires a pre configured
- * {@code StreamTableEnvironment}.
+ *
+ * <pre>
+ * {@code public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment);}
+ * </pre>
+ *
+ * <p>This method will be used when instantiating a {@link org.apache.flink.table.api.TableEnvironment}
+ * from a bridging module which enables conversion from/to {@code DataStream} API and requires a pre
+ * configured {@code StreamTableEnvironment}.
 */
@Internal
public interface ExecutorFactory extends ComponentFactory {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index 474bf8084fb25..4869b3f57dc84 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -29,12 +29,13 @@
 /**
  * This interface serves two purposes:
+ *

  * <ul>
  * <li>SQL parser via {@link #getParser()} - transforms a SQL string into a Table API specific objects
  * e.g. tree of {@link Operation}s</li>
  * <li>relational planner - provides a way to plan, optimize and transform tree of
  * {@link ModifyOperation} into a runnable form ({@link Transformation})</li>
- * </ul>.
+ * </ul>
  *
  * <p>The Planner is execution agnostic. It is up to the
  * {@link org.apache.flink.table.api.TableEnvironment} to ensure that if any of the
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
index 861ec35ac28f3..2644e0b8377a5 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
@@ -551,28 +551,28 @@ public Builder field(String name, DataType dataType) {
		}

		/**
-		 * Add a computed field which is generated by the given expression.
-		 * This also defines the field name and the data type.
+		 * Add a computed field which is generated by the given expression. This also defines the
+		 * field name and the data type.
		 *

		 * <p>The call order of this method determines the order of fields in the schema.
		 *
-		 * @param name Field name
-		 * @param dataType Field data type
-		 * @param expression Computed column expression, it should be a SQL-style expression whose
-		 *                   identifiers should be all quoted and expanded.
+		 * <p>The returned expression should be a SQL-style expression whose
+		 * identifiers should be all quoted and expanded.
		 *
-		 * It should be expanded because this expression may be persisted
-		 * then deserialized from the catalog, an expanded identifier would
-		 * avoid the ambiguity if there are same name UDF referenced from
-		 * different paths. For example, if there is a UDF named "my_udf" from
-		 * path "my_catalog.my_database", you could pass in an expression like
-		 * "`my_catalog`.`my_database`.`my_udf`(`f0`) + 1";
+		 * <p>It should be expanded because this expression may be persisted then deserialized
+		 * from the catalog, an expanded identifier would avoid the ambiguity if there are same name
+		 * UDF referenced from different paths. For example, if there is a UDF named "my_udf" from
+		 * path "my_catalog.my_database", you could pass in an expression like
+		 * "`my_catalog`.`my_database`.`my_udf`(`f0`) + 1";
		 *
-		 * It should be quoted because user could use a reserved keyword as the
-		 * identifier, and we have no idea if it is quoted when deserialize from
-		 * the catalog, so we force to use quoted identifier here. But framework
-		 * will not check whether it is qualified and quoted or not.
+		 * <p>It should be quoted because user could use a reserved keyword as the identifier,
+		 * and we have no idea if it is quoted when deserialize from the catalog, so we force to use
+		 * quoted identifier here. But framework will not check whether it is qualified and quoted
+		 * or not.
		 *
+		 * @param name Field name
+		 * @param dataType Field data type
+		 * @param expression Computed column expression.
		 */
		public Builder field(String name, DataType dataType, String expression) {
			Preconditions.checkNotNull(name);
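A usage sketch of the builder method documented above, using the expanded and quoted expression from the example; the column names and data types are illustrative.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class ComputedColumnSchemaExample {
    public static void main(String[] args) {
        // The computed column expression uses fully expanded, back-quoted identifiers,
        // as the documentation above recommends.
        TableSchema schema =
                TableSchema.builder()
                        .field("f0", DataTypes.INT())
                        .field(
                                "f1",
                                DataTypes.INT(),
                                "`my_catalog`.`my_database`.`my_udf`(`f0`) + 1")
                        .build();
        System.out.println(schema);
    }
}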

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
index 9e96db889e75d..44c6d802fc252 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sinks/PartitionableTableSink.java
@@ -41,7 +41,8 @@
 * INSERT INTO my_table PARTITION (dt='2019-06-20', country='bar') select a, b, c from my_view
 * </pre>
- * When all the partition columns are set a value in PARTITION clause, it is inserting into a
+ *
+ * <p>When all the partition columns are set a value in PARTITION clause, it is inserting into a
 * static partition. It will writes the query result into a static partition,
 * i.e. {@code dt='2019-06-20', country='bar'}. The user specified static partitions will be told
 * to the sink via {@link #setStaticPartition(Map)}.
@@ -52,7 +53,8 @@
 * INSERT INTO my_table PARTITION (dt='2019-06-20') select a, b, c, country from another_view
 * </pre>
- * When partial partition columns (prefix part of all partition columns) are set a value in
+ *
+ * <p>When partial partition columns (prefix part of all partition columns) are set a value in
 * PARTITION clause, it is writing the query result into a dynamic partition. In the above example,
 * the static partition part is {@code dt='2019-06-20'} which will be told to the sink via
 * {@link #setStaticPartition(Map)}. And the {@code country} is the dynamic partition which will be
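A minimal sketch of what a sink is told for the two INSERT statements above; the class is illustrative, only the setStaticPartition(Map) call mirrors the documented interface method.

import java.util.LinkedHashMap;
import java.util.Map;

class PartitionAwareSinkSketch {
    private final Map<String, String> staticPartitions = new LinkedHashMap<>();

    /** Mirrors PartitionableTableSink#setStaticPartition(Map) as described above. */
    public void setStaticPartition(Map<String, String> partitions) {
        staticPartitions.putAll(partitions);
    }

    public static void main(String[] args) {
        PartitionAwareSinkSketch sink = new PartitionAwareSinkSketch();

        // "INSERT INTO my_table PARTITION (dt='2019-06-20', country='bar') ..." sets all
        // partition columns, so the sink receives a fully static partition.
        Map<String, String> staticPartition = new LinkedHashMap<>();
        staticPartition.put("dt", "2019-06-20");
        staticPartition.put("country", "bar");
        sink.setStaticPartition(staticPartition);

        // For "INSERT INTO my_table PARTITION (dt='2019-06-20') select ..., country ..." only the
        // prefix {dt=2019-06-20} is static; country values are decided per record (dynamic).
    }
}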

 * <p>The following steps summarize the envisioned type inference process. Not all features are implemented
 * or exposed through the API yet (*).
 *
- * <ul>
- * <li>1. Validate number of arguments.
- * <li>2. (*) Apply assignment operators on the call by permuting operands and adding default values. These
- *     are preparations for {@link CallContext}.
- * <li>3. For resolving unknown (NULL) operands: Access the outer wrapping call and try to get its operand
- *     type for the return type of the actual call. E.g. for {@code takes_string(this_function(NULL))}
- *     infer operands from {@code takes_string(NULL)} and use the inferred string type as the return
- *     type of {@code this_function(NULL)}.
- * <li>4. Try infer unknown operands, fail if not possible.
- * <li>5. (*) Check the usage of DEFAULT operands are correct using validator.isOptional().
- * <li>6. Perform input type validation.
- * <li>7. (Optional) Infer accumulator type.
- * <li>8. Infer return type.
- * <li>9. (*) In the planner: Call the strategies again at any point in time to enrich a DataType that
- *     has been created from a logical type with a conversion class.
- * <li>10. (*) In the planner: Check for an implementation evaluation method matching the operands. The
- *     matching happens class-based. Thus, for example, eval(Object) is valid for (INT). Or eval(Object...)
- *     is valid for (INT, STRING). We rely on the conversion classes specified by DataType.
+ * <ul>
+ *   <li>1. Validate number of arguments.
+ *   <li>2. (*) Apply assignment operators on the call by permuting operands and adding default
+ *       values. These are preparations for {@link CallContext}.
+ *   <li>3. For resolving unknown (NULL) operands: Access the outer wrapping call and try to get its
+ *       operand type for the return type of the actual call. E.g. for {@code
+ *       takes_string(this_function(NULL))} infer operands from {@code takes_string(NULL)} and use
+ *       the inferred string type as the return type of {@code this_function(NULL)}.
+ *   <li>4. Try infer unknown operands, fail if not possible.
+ *   <li>5. (*) Check the usage of DEFAULT operands are correct using validator.isOptional().
+ *   <li>6. Perform input type validation.
+ *   <li>7. (Optional) Infer accumulator type.
+ *   <li>8. Infer return type.
+ *   <li>9. (*) In the planner: Call the strategies again at any point in time to enrich a DataType
+ *       that has been created from a logical type with a conversion class.
+ *   <li>10. (*) In the planner: Check for an implementation evaluation method matching the
+ *       operands. The matching happens class-based. Thus, for example, eval(Object) is valid for
+ *       (INT). Or eval(Object...) is valid for (INT, STRING). We rely on the conversion classes
+ *       specified by DataType.
 * </ul>
 */
@Internal
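As a rough illustration of the class-based matching described in step 10, a hypothetical user-defined scalar function (not planner code); the matching statement in the comments follows the list above.

import org.apache.flink.table.functions.ScalarFunction;

public class AnyArgFunction extends ScalarFunction {

    // Per step 10 above, eval(Object) can be matched for a single INT argument,
    // because matching relies on the conversion classes of the data types.
    public String eval(Object singleArgument) {
        return String.valueOf(singleArgument);
    }

    // Likewise, eval(Object...) can be matched for (INT, STRING) and other argument lists.
    public String eval(Object... arguments) {
        return arguments.length + " arguments";
    }
}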
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/XmlOutput.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/XmlOutput.java
index 8ffa9aff2426c..de96b1b0134f8 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/XmlOutput.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/utils/XmlOutput.java
@@ -311,32 +311,34 @@ public void emptyTag(String tagName, XMLAttrVector attributes) {
	}

	/**
-	 * Writes a CDATA section. Such sections always appear on their own line.
-	 * The nature in which the CDATA section is written depends on the actual
-	 * string content with respect to these special characters/sequences:
+	 * Writes a CDATA section. Such sections always appear on their own line. The nature in which
+	 * the CDATA section is written depends on the actual string content with respect to these
+	 * special characters/sequences:
+	 *
	 * <ul>
-	 * <li>&amp;
-	 * <li>&quot;
-	 * <li>&apos;
-	 * <li>&lt;
-	 * <li>&gt;
+	 *   <li>&amp;
+	 *   <li>&quot;
+	 *   <li>&apos;
+	 *   <li>&lt;
+	 *   <li>&gt;
	 * </ul>
-	 * Additionally, the sequence ]]&gt; is special.
+	 *
+	 * <p>Additionally, the sequence ]]&gt; is special.
+	 *
	 * <ul>
-	 * <li>Content containing no special characters will be left as-is.
-	 * <li>Content containing one or more special characters but not the
-	 *     sequence ]]&gt; will be enclosed in a CDATA section.
-	 * <li>Content containing special characters AND at least one
-	 *     ]]&gt; sequence will be left as-is but have all of its
-	 *     special characters encoded as entities.
+	 *   <li>Content containing no special characters will be left as-is.
+	 *   <li>Content containing one or more special characters but not the sequence ]]&gt;
+	 *       will be enclosed in a CDATA section.
+	 *   <li>Content containing special characters AND at least one ]]&gt; sequence
+	 *       will be left as-is but have all of its special characters encoded as entities.
	 * </ul>
-	 * These special treatment rules are required to allow cdata sections
-	 * to contain XML strings which may themselves contain cdata sections.
-	 * Traditional CDATA sections do not nest.
+	 *
+	 * <p>These special treatment rules are required to allow cdata sections to contain XML strings
+	 * which may themselves contain cdata sections. Traditional CDATA sections do not nest.
	 */
	public void cdata(String data) {
		cdata(data, false);
-	}
+	}

	/**
	 * Writes a CDATA section (as {@link #cdata(String)}).
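A small sketch of the three rules described above; this is an illustrative helper, not the XmlOutput implementation.

class CdataRulesSketch {
    static String writeCdata(String content) {
        boolean hasSpecial = content.matches(".*[&\"'<>].*");
        boolean hasCdataEnd = content.contains("]]>");
        if (!hasSpecial) {
            return content;                               // rule 1: left as-is
        } else if (!hasCdataEnd) {
            return "<![CDATA[" + content + "]]>";         // rule 2: wrapped in a CDATA section
        } else {
            return content                                // rule 3: special characters become entities
                    .replace("&", "&amp;")
                    .replace("<", "&lt;")
                    .replace(">", "&gt;")
                    .replace("\"", "&quot;")
                    .replace("'", "&apos;");
        }
    }
}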

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapper.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapper.java
index 6bbef2192a676..e3935d57a1100 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapper.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/TableOperatorWrapper.java
@@ -63,6 +63,7 @@ public class TableOperatorWrapper<OP extends StreamOperator<RowData>> implements
	 * <p>NOTE:The inputs of an operator may not all be in the multiple-input operator, e.g.
	 * The multiple-input operator contains A and J, and A is one of the input of J,
	 * and another input of J is not in the multiple-input operator.
+	 *

       	 * -------
       	 *        \
      @@ -70,7 +71,8 @@ public class TableOperatorWrapper> implements
       	 *        /
       	 * -- A --
       	 * 
      - * For this example, `allInputTypes` contains two input types. + * + *

      For this example, `allInputTypes` contains two input types. */ private final List> allInputTypes; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java index fc096733cc206..04abdff161840 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java @@ -28,10 +28,12 @@ /** * A {@link Trigger} that reacts to event-time timers. * The behavior can be one of the following: - *

        + * + *
          *
        • fire when the watermark passes the end of the window ({@link EventTimeTriggers#afterEndOfWindow()}), - *

        - * In the first case, the trigger can also specify an early and a late trigger. + *
      + * + *

      In the first case, the trigger can also specify an early and a late trigger. * The early trigger will be responsible for specifying when the trigger should fire in the period * between the beginning of the window and the time when the watermark passes the end of the window. * The late trigger takes over after the watermark passes the end of the window, and specifies when diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java index cfae3f72d618a..4f593b25bd041 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/ProcessingTimeTriggers.java @@ -32,14 +32,16 @@ /** * A {@link Trigger} that reacts to processing-time timers. * The behavior can be one of the following: - *

        + * + *
          *
 * <li>fire when the processing time passes the end of the
 * window ({@link ProcessingTimeTriggers#afterEndOfWindow()}),
 * <li>fire when the processing time advances by a certain interval
 * after reception of the first element after the last firing for
 * a given window ({@link ProcessingTimeTriggers#every(Duration)}).
- * </ul>

- * In the first case, the trigger can also specify an early trigger.
+ * </ul>
+ *
      + * + *

+ * <p>In the first case, the trigger can also specify an early trigger.
 * The early trigger will be responsible for specifying when the trigger should fire in the period
 * between the beginning of the window and the time when the processing time passes the end of the window.
 */
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java
index 1026cbf443cd3..ab621caa1167d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ApproximateLocalRecoveryDownstreamITCase.java
@@ -67,10 +67,12 @@ public class ApproximateLocalRecoveryDownstreamITCase extends TestLogger {
	/**
	 * Test the following topology.
+	 *
	 * <pre>

       	 *     (source1/1) -----> (map1/1) -----> (sink1/1)
       	 * 
-	 * (map1/1) fails, (map1/1) and (sink1/1) restart
+	 *

+	 * <p>(map1/1) fails, (map1/1) and (sink1/1) restart
	 */
	@Test
	public void localTaskFailureRecoveryThreeTasks() throws Exception {
@@ -97,12 +99,14 @@ public void localTaskFailureRecoveryThreeTasks() throws Exception {
	/**
	 * Test the following topology.
+	 *
	 * <pre>

       	 *     (source1/1) -----> (map1/2) -----> (sink1/1)
       	 *         |                                ^
       	 *         -------------> (map2/2) ---------|
       	 * 
-	 * (map1/2) fails, (map1/2) and (sink1/1) restart
+	 *

+	 * <p>(map1/2) fails, (map1/2) and (sink1/1) restart
	 */
	@Test
	public void localTaskFailureRecoveryTwoMapTasks() throws Exception {