[FLINK-26969][python][examples] Add a few examples of window operation in Python DataStream API

This closes apache#19328.
cun8cun8 authored and dianfu committed Apr 2, 2022
1 parent 0e459bf commit db23add
Showing 7 changed files with 500 additions and 1 deletion.
2 changes: 1 addition & 1 deletion flink-python/pyflink/datastream/tests/test_window.py
@@ -372,7 +372,7 @@ class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, CountW

     def process(self,
                 key: str,
-                content: ProcessWindowFunction.Context,
+                context: ProcessWindowFunction.Context,
                 elements: Iterable[tuple]) -> Iterable[tuple]:
         return [(key, len([e for e in elements]))]

17 changes: 17 additions & 0 deletions flink-python/pyflink/examples/datastream/windowing/__init__.py
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
@@ -0,0 +1,103 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys

import argparse
from typing import Iterable

from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy

from pyflink.common import Types, WatermarkStrategy, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import EventTimeSessionWindows, \
    SessionWindowTimeGapExtractor, TimeWindow


class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[1])


class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
    def extract(self, element: tuple) -> int:
        return element[1]


class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[tuple]) -> Iterable[tuple]:
        return [(key, context.window().start, context.window().end, len([e for e in elements]))]

    def clear(self, context: ProcessWindowFunction.Context) -> None:
        pass


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    data_stream = env.from_collection([
        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the watermark strategy
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())

    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING()) \
        .window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \
        .process(CountWindowProcessFunction(),
                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()]))

    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()
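A side note that is not part of the commit: because the gap extractor above returns the element's own timestamp as the session gap, every sample element opens a window [ts, ts + ts), and if I read Flink's session-merging rule correctly (windows that overlap or touch are merged), all seven elements should collapse into a single session. The standalone sketch below only mimics that merging logic in plain Python; merge_sessions is an illustrative helper, not a PyFlink API.

# Standalone sketch of dynamic-gap session merging (plain Python, no PyFlink).
# Assumption: each element opens a window [ts, ts + gap) and windows that
# overlap or touch are merged, as Flink session windows do.
def merge_sessions(elements):
    # In this example the gap equals the timestamp itself, because both are read
    # from the second tuple field (see MySessionWindowTimeGapExtractor above).
    spans = sorted((ts, ts + ts) for _key, ts in elements)
    merged = []
    for start, end in spans:
        if merged and start <= merged[-1][1]:      # overlaps or touches the last session
            merged[-1][1] = max(merged[-1][1], end)
            merged[-1][2] += 1
        else:
            merged.append([start, end, 1])
    return [tuple(m) for m in merged]


sample = [('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)]
print(merge_sessions(sample))  # expected: [(1, 30, 7)], i.e. one session holding all 7 elements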
@@ -0,0 +1,103 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys

import argparse
from typing import Iterable

from pyflink.datastream.connectors import FileSink, RollingPolicy, OutputFileConfig

from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import EventTimeSessionWindows, \
    SessionWindowTimeGapExtractor, TimeWindow


class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[1])


class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
    def extract(self, element: tuple) -> int:
        return element[1]


class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[tuple]) -> Iterable[tuple]:
        return [(key, context.window().start, context.window().end, len([e for e in elements]))]

    def clear(self, context: ProcessWindowFunction.Context) -> None:
        pass


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    data_stream = env.from_collection([
        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the watermark strategy
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())

    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING()) \
        .window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
        .process(CountWindowProcessFunction(),
                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()]))

    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()
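For orientation (again, not part of the commit): with a fixed 5 ms gap, the sample timestamps 1 through 9 are never more than 4 ms apart and should merge into one session ending at 9 + 5 = 14, while 15 arrives 6 ms after 9 and should open its own session. A hand-computed expectation, assuming the usual "last timestamp plus gap" session end:

# Hand-computed expectation for the fixed-gap pipeline above (not emitted by the commit itself):
expected = [
    ('hi', 1, 14, 6),   # session [1, 14): timestamps 1, 2, 3, 4, 8, 9
    ('hi', 15, 20, 1),  # session [15, 20): timestamp 15 alone
]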
@@ -0,0 +1,97 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys

import argparse
from typing import Iterable

from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy

from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import SlidingEventTimeWindows, TimeWindow


class MyTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[1])


class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[tuple]) -> Iterable[tuple]:
        return [(key, context.window().start, context.window().end, len([e for e in elements]))]

    def clear(self, context: ProcessWindowFunction.Context) -> None:
        pass


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    data_stream = env.from_collection([
        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9), ('hi', 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the watermark strategy
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())

    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING()) \
        .window(SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(2))) \
        .process(CountWindowProcessFunction(),
                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()]))

    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()
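One more aside that is not part of the commit: with a 5 ms window size and a 2 ms slide, each element lands in two or three overlapping windows. The sketch below reproduces, in plain Python, my understanding of Flink's sliding-window assignment rule (latest start = timestamp minus timestamp modulo slide, then step back by the slide while the window still covers the timestamp); assign_sliding_windows is an illustrative helper, not a PyFlink function.

# Illustrative sketch of sliding event-time window assignment (plain Python, offset 0).
# Assumption: windows start at multiples of the slide, and an element belongs to
# every window whose [start, start + size) range contains its timestamp.
def assign_sliding_windows(timestamp, size=5, slide=2):
    windows = []
    start = timestamp - (timestamp % slide)   # latest window start at or before the timestamp
    while start > timestamp - size:           # walk back while the window still covers it
        windows.append((start, start + size))
        start -= slide
    return windows


print(assign_sliding_windows(8))   # [(8, 13), (6, 11), (4, 9)]
print(assign_sliding_windows(15))  # [(14, 19), (12, 17)]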