Skip to content

Commit

Permalink
[FLINK-14356][table][formats] Introduce "single-value" format to (de)…
Browse files Browse the repository at this point in the history
…serialize message for single field
  • Loading branch information
hackergin authored and wuchong committed Nov 7, 2020
1 parent f1f25e0 commit 17c3811
Show file tree
Hide file tree
Showing 11 changed files with 981 additions and 1 deletion.
7 changes: 6 additions & 1 deletion docs/dev/table/connectors/formats/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,10 @@ Flink supports the following formats:
<td><a href="{% link dev/table/connectors/formats/orc.md %}">Apache ORC</a></td>
<td><a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
</tr>
<tr>
<td><a href="{% link dev/table/connectors/formats/singleValue.md %}">Single Value</a></td>
<td><a href="{% link dev/table/connectors/kafka.md %}">Apache Kafka</a>,
<a href="{% link dev/table/connectors/filesystem.md %}">Filesystem</a></td>
</tr>
</tbody>
</table>
</table>
78 changes: 78 additions & 0 deletions docs/dev/table/connectors/formats/singleValue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
---
title: "SingleValue Format"
nav-title: SingleValue
nav-parent_id: sql-formats
nav-pos: 7
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<span class="label label-info">Format: Serialization Schema</span>
<span class="label label-info">Format: Deserialization Schema</span>

* This will be replaced by the TOC
{:toc}

The SingleValue format allows to read and write single value include String, byte[] and java primitive type.

How to create a table with SingleValue format
----------------

Here is an example to create a table using Kafka connector and SingleValue format.

<div class="codetabs" markdown="1">
<div data-lang="SQL" markdown="1">
{% highlight sql %}
CREATE TABLE user_behavior (
user_id VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'single-value'
)
{% endhighlight %}
</div>
</div>

Format Options
----------------

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 50%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>format</h5></td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what format to use, here should be 'single-value'.</td>
</tr>
</tbody>
</table>

78 changes: 78 additions & 0 deletions docs/dev/table/connectors/formats/singleValue.zh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
---
title: "SingleValue Format"
nav-title: SingleValue
nav-parent_id: sql-formats
nav-pos: 7
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<span class="label label-info">Format: Serialization Schema</span>
<span class="label label-info">Format: Deserialization Schema</span>

* This will be replaced by the TOC
{:toc}

SingleValue Format用于读写单一类型的消息, 包括String, byte[]和java原始类型.

如何创建基于SingleValue格式的表
----------------

以下是一个使用 Kafka 连接器和 SingleValue 格式创建表的示例。

<div class="codetabs" markdown="1">
<div data-lang="SQL" markdown="1">
{% highlight sql %}
CREATE TABLE user_behavior (
user_id VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'single-value'
)
{% endhighlight %}
</div>
</div>

Format 参数
----------------

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 25%">参数</th>
<th class="text-center" style="width: 8%">是否必须</th>
<th class="text-center" style="width: 7%">默认值</th>
<th class="text-center" style="width: 10%">类型</th>
<th class="text-center" style="width: 50%">描述</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>format</h5></td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>指定使用的格式,此处应为"single-value"。</td>
</tr>
</tbody>
</table>

Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.formats;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;
import java.util.Set;

/**
* SingleValueFormatFactory for single value.
*/
public class SingleValueFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {

public static final String IDENTIFIER = "single-value";

@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
Context context,
ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
validateTableSchema(context.getCatalogTable().getSchema());
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context,
DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
return new SingleValueRowDataDeserialization(
rowType,
rowDataTypeInfo);
}

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
};
}

@Override
public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
Context context, ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
validateTableSchema(context.getCatalogTable().getSchema());
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context,
DataType consumedDataType) {
final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new SingleValueRowDataSerialization(rowType);
}

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
};
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}

private static void validateTableSchema(TableSchema tableSchema) {
long physicalColumnCount = tableSchema.getTableColumns()
.stream()
.filter(TableColumn::isGenerated)
.count();
if (physicalColumnCount > 1) {
throw new ValidationException("Single value should have only one physical column");
}
}

}
Loading

0 comments on commit 17c3811

Please sign in to comment.