Skip to content

Commit

Permalink
[FLINK-14007][python][docs] Add documentation for how to use Java use…
Browse files Browse the repository at this point in the history
…r-defined source/sink in Python API

This closes apache#10515.
  • Loading branch information
hequn8128 committed Dec 12, 2019
1 parent ab4c31c commit 98dd5dc
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 38 deletions.
58 changes: 39 additions & 19 deletions docs/dev/table/sourceSinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -714,10 +714,11 @@ connector.debug=true

For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors](connect.html) for sources, sinks, and formats as a reference.

A connector for `MySystem` in our example can extend `ConnectorDescriptor` as shown below:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

A custom descriptor can be defined by extending the `ConnectorDescriptor` class.

{% highlight java %}
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import java.util.HashMap;
Expand All @@ -743,9 +744,25 @@ public class MySystemConnector extends ConnectorDescriptor {
}
}
{% endhighlight %}

The descriptor can then be used to create a table with the table environment.

{% highlight java %}
StreamTableEnvironment tableEnv = // ...

tableEnv
.connect(new MySystemConnector(true))
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MySystemTable");
{% endhighlight %}

</div>

<div data-lang="scala" markdown="1">

A custom descriptor can be defined by extending the `ConnectorDescriptor` class.

{% highlight scala %}
import org.apache.flink.table.descriptors.ConnectorDescriptor
import java.util.HashMap
Expand All @@ -763,25 +780,9 @@ class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system
}
}
{% endhighlight %}
</div>
</div>

The descriptor can then be used in the API as follows:
The descriptor can then be used to create a table with the table environment.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamTableEnvironment tableEnv = // ...

tableEnv
.connect(new MySystemConnector(true))
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MySystemTable");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val tableEnv: StreamTableEnvironment = // ...

Expand All @@ -791,7 +792,26 @@ tableEnv
.inAppendMode()
.createTemporaryTable("MySystemTable")
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

You can use a Java `TableFactory` from Python using the `CustomConnectorDescriptor`.

{% highlight python %}
s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

custom_connector = CustomConnectorDescriptor('my-system', 1, False)
st_env\
.connect(custom_connector.property("connector.debug", "true")) \
.with_schema(...) \
.in_append_mode()\
.create_temporary_table("MySystemTable")
{% endhighlight %}
</div>

</div>

{% top %}
56 changes: 37 additions & 19 deletions docs/dev/table/sourceSinks.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -714,10 +714,11 @@ connector.debug=true

For a type-safe, programmatic approach with explanatory Scaladoc/Javadoc, the Table & SQL API offers descriptors in `org.apache.flink.table.descriptors` that translate into string-based properties. See the [built-in descriptors](connect.html) for sources, sinks, and formats as a reference.

A connector for `MySystem` in our example can extend `ConnectorDescriptor` as shown below:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

A custom descriptor can be defined by extending the `ConnectorDescriptor` class.

{% highlight java %}
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import java.util.HashMap;
Expand All @@ -743,9 +744,24 @@ public class MySystemConnector extends ConnectorDescriptor {
}
}
{% endhighlight %}

The descriptor can then be used to create a table with the table environment.

{% highlight java %}
StreamTableEnvironment tableEnv = // ...

tableEnv
.connect(new MySystemConnector(true))
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MySystemTable");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">

A custom descriptor can be defined by extending the `ConnectorDescriptor` class.

{% highlight scala %}
import org.apache.flink.table.descriptors.ConnectorDescriptor
import java.util.HashMap
Expand All @@ -763,25 +779,9 @@ class MySystemConnector(isDebug: Boolean) extends ConnectorDescriptor("my-system
}
}
{% endhighlight %}
</div>
</div>

The descriptor can then be used in the API as follows:
The descriptor can then be used to create a table with the table environment.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamTableEnvironment tableEnv = // ...

tableEnv
.connect(new MySystemConnector(true))
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MySystemTable");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val tableEnv: StreamTableEnvironment = // ...

Expand All @@ -791,7 +791,25 @@ tableEnv
.inAppendMode()
.createTemporaryTable("MySystemTable")
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

You can use a Java `TableFactory` from Python using the `CustomConnectorDescriptor`.

{% highlight python %}
s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

custom_connector = CustomConnectorDescriptor('my-system', 1, False)
st_env\
.connect(custom_connector.property("connector.debug", "true")) \
.with_schema(...) \
.in_append_mode()\
.create_temporary_table("MySystemTable")
{% endhighlight %}
</div>
</div>


{% top %}

0 comments on commit 98dd5dc

Please sign in to comment.