Skip to content

Commit

Permalink
[FLINK-24635][examples] Fix deprecations in state machine example
Browse files Browse the repository at this point in the history
  • Loading branch information
sjwiesman committed Nov 16, 2021
1 parent dbcce67 commit cfada41
Showing 1 changed file with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,31 @@

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.examples.statemachine.dfa.State;
import org.apache.flink.streaming.examples.statemachine.event.Alert;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializationSchema;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
* Main class of the state machine example. This class implements the streaming application that
* receives the stream of events and evaluates a state machine (per originating address) to validate
Expand Down Expand Up @@ -140,7 +146,17 @@ public static void main(String[] args) throws Exception {
if (outputFile == null) {
alerts.print();
} else {
alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
alerts.sinkTo(
FileSink.<Alert>forRowFormat(
new Path(outputFile), new SimpleStringEncoder<>())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(MemorySize.ofMebiBytes(1))
.withRolloverInterval(Duration.ofSeconds(10))
.build())
.build())
.setParallelism(1)
.name("output");
}

// trigger program execution
Expand Down

0 comments on commit cfada41

Please sign in to comment.