
Commit b868639

[FLINK-18716][python][docs] Remove the deprecated execute and insert_into calls in PyFlink Table API docs

This closes apache#12992.
WeiZhong94 authored and dianfu committed Nov 4, 2020
1 parent 3c613dd commit b868639
Showing 12 changed files with 26 additions and 50 deletions.
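In short, every file below makes the same substitution: the deprecated two-step submit pattern of `Table.insert_into(sink_name)` followed by `TableEnvironment.execute(job_name)` becomes a single `Table.execute_insert(sink_name).wait()` call. A minimal, self-contained sketch of the migration (the environment setup, the `print` sink, and all names here are illustrative, not taken from this commit):

{% highlight python %}
from pyflink.table import EnvironmentSettings, TableEnvironment

# Illustrative setup: a batch environment and a simple print sink.
env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
t_env = TableEnvironment.create(env_settings)
t_env.execute_sql("""
    CREATE TABLE my_sink (
        word STRING,
        cnt BIGINT
    ) WITH (
        'connector' = 'print'
    )
""")

tab = t_env.from_elements([('flink', 1), ('pyflink', 1)], ['word', 'cnt'])

# Deprecated pattern removed by this commit:
#     tab.insert_into('my_sink')
#     t_env.execute('my_job')

# Replacement: execute_insert() submits the job and returns a TableResult;
# wait() blocks until the job finishes.
tab.execute_insert('my_sink').wait()
{% endhighlight %}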
18 changes: 6 additions & 12 deletions docs/dev/python/table_api_tutorial.md
@@ -123,21 +123,17 @@ The table `mySink` has two columns, word and count, and writes data to the file
 
 You can now create a job which reads input from table `mySource`, performs some transformations, and writes the results to table `mySink`.
 
+Finally you must execute the actual Flink Python Table API job.
+All operations, such as creating sources, transformations and sinks, are lazy.
+Only when `execute_insert(sink_name)` is called will the job be submitted for execution.
+
 {% highlight python %}
 from pyflink.table.expressions import lit
 
 tab = t_env.from_path('mySource')
 tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
-   .insert_into('mySink')
-{% endhighlight %}
-
-Finally you must execute the actual Flink Python Table API job.
-All operations, such as creating sources, transformations and sinks are lazy.
-Only when `t_env.execute(job_name)` is called will the job be run.
-
-{% highlight python %}
-t_env.execute("tutorial_job")
+   .execute_insert('mySink').wait()
 {% endhighlight %}
 
 The complete code so far:
@@ -173,9 +169,7 @@ t_env.connect(FileSystem().path('/tmp/output')) \
 tab = t_env.from_path('mySource')
 tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
-   .insert_into('mySink')
-
-t_env.execute("tutorial_job")
+   .execute_insert('mySink').wait()
 {% endhighlight %}

## Executing a Flink Python Table API Program
18 changes: 6 additions & 12 deletions docs/dev/python/table_api_tutorial.zh.md
@@ -127,21 +127,17 @@ t_env.sql_update(my_sink_ddl)
 
 Next, we will show how to create a job that reads data from table `mySource`, applies some transformations, and writes the results to table `mySink`.
 
+Finally, all that remains is to start the Flink Python Table API job. All of the operations above, such as creating the source table,
+applying transformations, and writing to the result table, only build the job's logical graph. Only when `execute_insert(sink_name)` is
+called will the job actually be submitted to a cluster or executed locally.
+
 {% highlight python %}
 from pyflink.table.expressions import lit
 
 tab = t_env.from_path('mySource')
 tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
-   .insert_into('mySink')
-{% endhighlight %}
-
-Finally, all that remains is to start the Flink Python Table API job. All of the operations above, such as creating the source table,
-applying transformations, and writing to the result table, only build the job's logical graph. Only when `t_env.execute(job_name)` is
-called will the job actually be submitted to a cluster or executed locally.
-
-{% highlight python %}
-t_env.execute("python_job")
+   .execute_insert('mySink').wait()
 {% endhighlight %}
 
 The complete code for this tutorial is as follows:
@@ -177,9 +173,7 @@ t_env.connect(FileSystem().path('/tmp/output')) \
 tab = t_env.from_path('mySource')
 tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
-   .insert_into('mySink')
-
-t_env.execute("python_job")
+   .execute_insert('mySink').wait()
 {% endhighlight %}

## Executing a Flink Python Table API Program
@@ -75,7 +75,7 @@ t_env.execute_sql(source_ddl)
 t_env.execute_sql(sink_ddl)
 
 t_env.sql_query("SELECT a FROM source_table") \
-    .insert_into("sink_table")
+    .execute_insert("sink_table").wait()
 {% endhighlight %}
 
 Below is a complete example of how to use a Kafka source/sink and the JSON format in PyFlink.
@@ -122,9 +122,7 @@ def log_processing():
     t_env.execute_sql(sink_ddl)
 
     t_env.sql_query("SELECT a FROM source_table") \
-        .insert_into("sink_table")
-
-    t_env.execute("payment_demo")
+        .execute_insert("sink_table").wait()
 
 
 if __name__ == '__main__':
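As a side note, the `sql_query(...).execute_insert(...)` chain in this file also has a pure-SQL equivalent; a hedged sketch, assuming `source_table` and `sink_table` are registered as in the example above:

{% highlight python %}
# execute_sql() also returns a TableResult, so the same wait() idiom applies.
# Assumes source_table and sink_table are registered as in the example above.
t_env.execute_sql("INSERT INTO sink_table SELECT a FROM source_table").wait()
{% endhighlight %}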
@@ -75,7 +75,7 @@ t_env.execute_sql(source_ddl)
 t_env.execute_sql(sink_ddl)
 
 t_env.sql_query("SELECT a FROM source_table") \
-    .insert_into("sink_table")
+    .execute_insert("sink_table").wait()
 {% endhighlight %}
 
 Below is a complete example of how to use a Kafka source/sink and the JSON format in PyFlink.
@@ -122,9 +122,7 @@ def log_processing():
     t_env.execute_sql(sink_ddl)
 
     t_env.sql_query("SELECT a FROM source_table") \
-        .insert_into("sink_table")
-
-    t_env.execute("payment_demo")
+        .execute_insert("sink_table").wait()
 
 
 if __name__ == '__main__':
2 changes: 1 addition & 1 deletion docs/dev/table/connect.md
@@ -2122,7 +2122,7 @@ table_env.register_table_sink(
 )
 
 table = ...
-table.insert_into("csvOutputTable")
+table.execute_insert("csvOutputTable").wait()
 {% endhighlight %}
 </div>
 </div>
2 changes: 1 addition & 1 deletion docs/dev/table/connect.zh.md
@@ -2118,7 +2118,7 @@ table_env.register_table_sink(
 )
 
 table = ...
-table.insert_into("csvOutputTable")
+table.execute_insert("csvOutputTable").wait()
 {% endhighlight %}
 </div>
 </div>
2 changes: 1 addition & 1 deletion docs/dev/table/streaming/query_configuration.md
@@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable", # table name
                              sink)  # table sink
 
 # emit result Table via a TableSink
-result.execute_insert("outputTable")
+result.execute_insert("outputTable").wait()
 
 {% endhighlight %}
 </div>
2 changes: 1 addition & 1 deletion docs/dev/table/streaming/query_configuration.zh.md
@@ -110,7 +110,7 @@ table_env.register_table_sink("outputTable", # table name
                              sink)  # table sink
 
 # emit result Table via a TableSink
-result.insert_into("outputTable")
+result.execute_insert("outputTable").wait()
 
 {% endhighlight %}
 </div>
4 changes: 1 addition & 3 deletions docs/dev/table/tableApi.md
@@ -143,9 +143,7 @@ t_env.execute_sql(sink_ddl)
 # specify table program
 orders = t_env.from_path("Orders")  # schema (a, b, c, rowtime)
 
-orders.group_by(orders.a).select(orders.a, orders.b.count.alias('cnt')).insert_into("Result")
-
-t_env.execute("python_job")
+orders.group_by("a").select(orders.a, orders.b.count.alias('cnt')).execute_insert("result").wait()
 
 {% endhighlight %}

4 changes: 1 addition & 3 deletions docs/dev/table/tableApi.zh.md
@@ -143,9 +143,7 @@ t_env.execute_sql(sink_ddl)
 # specify table program
 orders = t_env.from_path("Orders")  # schema (a, b, c, rowtime)
 
-orders.group_by(orders.a).select(orders.a, orders.b.count.alias('cnt')).insert_into("Result")
-
-t_env.execute("python_job")
+orders.group_by("a").select(orders.a, orders.b.count.alias('cnt')).execute_insert("result").wait()
 
 {% endhighlight %}

6 changes: 2 additions & 4 deletions docs/ops/python_shell.md
@@ -75,8 +75,7 @@ The example below is a simple program in the Python shell:
 ...     .field("c", DataTypes.STRING()))\
 ...     .create_temporary_table("stream_sink")
 >>> t.select("a + 1, b, c")\
-...     .insert_into("stream_sink")
->>> st_env.execute("stream_job")
+...     .execute_insert("stream_sink").wait()
 >>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
@@ -107,8 +106,7 @@ The example below is a simple program in the Python shell:
 ...     .field("c", DataTypes.STRING()))\
 ...     .create_temporary_table("batch_sink")
 >>> t.select("a + 1, b, c")\
-...     .insert_into("batch_sink")
->>> bt_env.execute("batch_job")
+...     .execute_insert("batch_sink").wait()
 >>> # If the job runs in local mode, you can execute the following code in the Python shell to see the result:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
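The trailing `wait()` is what makes these shell examples correct: `execute_insert()` only submits the job and returns a `TableResult`, so without blocking, the read of `sink_path` could run before the sink file is written. A sketch continuing the batch shell session above (`t`, `bt_env`, and `sink_path` come from that example):

{% highlight python %}
>>> # execute_insert() is asynchronous; keep the TableResult and block on it
>>> # before reading the sink file.
>>> table_result = t.select("a + 1, b, c").execute_insert("batch_sink")
>>> table_result.wait()
>>> with open(sink_path, 'r') as f:
...     print(f.read())
{% endhighlight %}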
6 changes: 2 additions & 4 deletions docs/ops/python_shell.zh.md
@@ -75,8 +75,7 @@ $ pyflink-shell.sh local
 ...     .field("c", DataTypes.STRING()))\
 ...     .create_temporary_table("stream_sink")
 >>> t.select("a + 1, b, c")\
-...     .insert_into("stream_sink")
->>> st_env.execute("stream_job")
+...     .execute_insert("stream_sink").wait()
 >>> # If the job runs in local mode, you can execute the following code to see the result:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
@@ -107,8 +106,7 @@ $ pyflink-shell.sh local
 ...     .field("c", DataTypes.STRING()))\
 ...     .create_temporary_table("batch_sink")
 >>> t.select("a + 1, b, c")\
-...     .insert_into("batch_sink")
->>> bt_env.execute("batch_job")
+...     .execute_insert("batch_sink").wait()
 >>> # If the job runs in local mode, you can execute the following code to see the result:
 >>> with open(sink_path, 'r') as f:
 ...     print(f.read())
