[FLINK-6442] [table] Add registration for TableSinks and INSERT INTO support for SQL and Table API.

This closes apache#3829.
lincoln-lil authored and fhueske committed Sep 20, 2017
1 parent df5efe9 commit 2cb37cb
Showing 56 changed files with 1,371 additions and 262 deletions.
153 changes: 136 additions & 17 deletions docs/dev/table/common.md
@@ -50,7 +50,7 @@ tableEnv.registerExternalCatalog("extCat", ...);
// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... ");
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);
@@ -77,7 +77,7 @@ tableEnv.registerExternalCatalog("extCat", ...)
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult = tableEnv.sql("SELECT ... FROM table2 ...")
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)
@@ -149,18 +149,18 @@ val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

{% top %}

Register a Table in the Catalog
Register Tables in the Catalog
-------------------------------

A `TableEnvironment` has an internal catalog of tables, organized by table name. Table API or SQL queries can access tables which are registered in the catalog, by referencing them by name.
A `TableEnvironment` maintains a catalog of tables which are registered by name. There are two types of tables, *input tables* and *output tables*. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.

A `TableEnvironment` allows you to register a table from various sources:
An input table can be registered from various sources:

* an existing `Table` object, usually the result of a Table API or SQL query.
* a `TableSource`, which accesses external data, such as a file, database, or messaging system.
* a `DataStream` or `DataSet` from a DataStream or DataSet program.
* a `DataStream` or `DataSet` from a DataStream or DataSet program. Registering a `DataStream` or `DataSet` is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.

Registering a `DataStream` or `DataSet` as a table is discussed in the [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api) section.
An output table can be registered using a `TableSink`.

### Register a Table

@@ -200,7 +200,7 @@ tableEnv.registerTable("projectedTable", projTable)

### Register a TableSource

A `TableSource` provides access to external data which is stored in a storage systems such as a database (MySQL, HBase, ...), a file with specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...).
A `TableSource` provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...).

Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for a list of supported TableSources and instructions for how to build a custom `TableSource`.

@@ -236,6 +236,52 @@ tableEnv.registerTableSource("CsvTable", csvSource)

{% top %}

### Register a TableSink

A registered `TableSink` can be used to [emit the result of a Table API or SQL query](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache \[Parquet, Avro, ORC\], ...).

Flink aims to provide TableSinks for common data formats and storage systems. Please see the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for details about available sinks and instructions for how to implement a custom `TableSink`.

A `TableSink` is registered in a `TableEnvironment` as follows:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...);

// define the field names and types
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
{% endhighlight %}
</div>
</div>

{% top %}

Register an External Catalog
----------------------------

@@ -342,7 +388,7 @@ Flink's SQL integration is based on [Apache Calcite](https://calcite.apache.org)

The [SQL]({{ site.baseurl }}/dev/table/sql.html) document describes Flink's SQL support for streaming and batch tables.

The following example shows how to specify a query and return the result as a Table.
The following example shows how to specify a query and return the result as a `Table`.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -353,7 +399,7 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sql(
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
@@ -373,7 +419,7 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sql("""
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
@@ -387,6 +433,53 @@ Table revenue = tableEnv.sql("""
</div>
</div>

The following example shows how to specify an update query that inserts its result into a registered table.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);

// execute query
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)

// execute query
{% endhighlight %}

</div>
</div>

{% top %}

### Mixing Table API and SQL
@@ -401,12 +494,19 @@ Table API and SQL queries can be easily mixed because both return `Table` object
Emit a Table
------------

In order to emit a `Table`, it can be written to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
A `Table` is emitted by writing it to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g., CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).

A batch `Table` can only be written to a `BatchTableSink`, while a streaming table requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`.
A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Table` requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`.

Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.

There are two ways to emit a table:

1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit.
2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`.
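
The lookup-and-validate behavior of the second method can be pictured with a small, self-contained model. The sketch below is not Flink code: `SinkCatalogSketch`, `registerSink`, and `validateInsert` are hypothetical names, types are represented as plain strings, and it assumes that validation compares field types position by position, which the text above does not specify.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these names are part of Flink's API.
public class SinkCatalogSketch {

    // Registered sink name -> declared field types (plain strings, an assumption).
    private final Map<String, String[]> sinkFieldTypes = new HashMap<>();

    // Hypothetical counterpart of registering a sink under a name with a fixed schema.
    // Field names are only checked for length here; they are not stored.
    public void registerSink(String name, String[] fieldNames, String[] fieldTypes) {
        if (fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("Field names and types differ in length");
        }
        if (sinkFieldTypes.containsKey(name)) {
            throw new IllegalArgumentException("Sink already registered: " + name);
        }
        sinkFieldTypes.put(name, fieldTypes.clone());
    }

    // Hypothetical counterpart of the check done before emitting into a registered sink.
    // Assumption: schemas match when the field types are equal position by position.
    public void validateInsert(String sinkName, String[] resultFieldTypes) {
        String[] expected = sinkFieldTypes.get(sinkName);
        if (expected == null) {
            throw new IllegalArgumentException("No sink registered under: " + sinkName);
        }
        if (!Arrays.equals(expected, resultFieldTypes)) {
            throw new IllegalStateException(
                "Schema mismatch for " + sinkName + ": expected "
                    + Arrays.toString(expected) + " but got "
                    + Arrays.toString(resultFieldTypes));
        }
    }

    public static void main(String[] args) {
        SinkCatalogSketch catalog = new SinkCatalogSketch();
        catalog.registerSink("CsvSinkTable",
            new String[] {"a", "b", "c"},
            new String[] {"INT", "STRING", "LONG"});

        // Matching types: passes silently.
        catalog.validateInsert("CsvSinkTable", new String[] {"INT", "STRING", "LONG"});

        // Mismatching types: rejected before anything would be written.
        try {
            catalog.validateInsert("CsvSinkTable", new String[] {"INT", "LONG", "STRING"});
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the model is the ordering: the schema is fixed when the sink is registered, so a mismatching result is rejected at the time of the insert call instead of surfacing later as malformed output.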

The following example shows how to emit a `Table`:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
@@ -419,9 +519,18 @@ Table result = ...
// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", "|"); // second argument is the field delimiter

// write the result Table to the TableSink
// METHOD 1:
// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink);

// METHOD 2:
// Register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
// Emit the result Table to the registered TableSink via the insertInto() method
result.insertInto("CsvSinkTable");

// execute the program
{% endhighlight %}
</div>
@@ -437,9 +546,18 @@ val result: Table = ...
// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// write the result Table to the TableSink
// METHOD 1:
// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink)

// METHOD 2:
// Register the TableSink with a specific schema
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
// Emit the result Table to the registered TableSink via the insertInto() method
result.insertInto("CsvSinkTable")

// execute the program
{% endhighlight %}
</div>
@@ -458,8 +576,9 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de

A Table API or SQL query is translated when:

* the `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` is called.
* the `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-dataStream-and-dataSet-api)).
* a `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` or `Table.insertInto()` is called.
* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)).

Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
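
The separation between translating a query and executing it can be illustrated with a toy model. The sketch below is not Flink code: `DeferredExecutionSketch`, `emit`, and `execute` are hypothetical stand-ins, with `emit` playing the role of a translating call such as `insertInto()` or `sqlUpdate()` and `execute` playing the role of the environment's `execute()`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these names are part of Flink's API.
public class DeferredExecutionSketch {

    // Work recorded by translating calls but not yet run.
    private final List<Runnable> plan = new ArrayList<>();

    // Stand-in for an external system that receives emitted rows.
    final List<String> output = new ArrayList<>();

    // Stand-in for a translating call: only records the work, writes nothing.
    void emit(String row) {
        plan.add(() -> output.add(row));
    }

    // Stand-in for execute(): runs everything recorded so far, then clears the plan.
    void execute() {
        for (Runnable step : plan) {
            step.run();
        }
        plan.clear();
    }

    public static void main(String[] args) {
        DeferredExecutionSketch env = new DeferredExecutionSketch();
        env.emit("FRANCE,42");
        System.out.println(env.output.size()); // prints 0: nothing has run yet
        env.execute();
        System.out.println(env.output.size()); // prints 1
    }
}
```

In this model, forgetting the final `execute()` call leaves the output empty, which mirrors why the examples above end with an "execute the program" step.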
