Skip to content

Commit

Permalink
[hotfix][python] Fix Kafka csv example
Browse files Browse the repository at this point in the history
  • Loading branch information
dianfu committed Nov 1, 2023
1 parent bfe6f19 commit eef2270
Showing 1 changed file with 3 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.csv import CsvRowSerializationSchema
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.formats.csv import CsvRowSerializationSchema, CsvRowDeserializationSchema


# Make sure that the Kafka cluster is started and the topic 'test_csv_topic' is
Expand All @@ -46,9 +45,8 @@ def write_to_kafka(env):


def read_from_kafka(env):
deserialization_schema = JsonRowDeserializationSchema.Builder() \
.type_info(Types.ROW([Types.INT(), Types.STRING()])) \
.build()
type_info = Types.ROW([Types.INT(), Types.STRING()])
deserialization_schema = CsvRowDeserializationSchema.Builder(type_info).build()

kafka_consumer = FlinkKafkaConsumer(
topics='test_csv_topic',
Expand Down

0 comments on commit eef2270

Please sign in to comment.