This project provides an environment to test classifiers in Apache Flink.
Requirements:

- Java 11
- Apache Flink 1.19.1
Build the JAR file using the following Maven command: `mvn clean package`
Run the JAR using Flink: `flink run <path-to-jar>`
Alternatively, you can use the predefined execution configurations in IntelliJ:
- Deploy Flink App
- Deploy Flink App & Plot
Note: You need to configure environment variables in IntelliJ's run configuration (see Configuration).
| Environment Variable | Description | Required | Default Value / Example (if required) |
|---|---|---|---|
| `FLINK_BIN_PATH` | Absolute path of the Flink 1.19.1 binaries | Yes | `/home/userxyz/flink-1.19.1/bin` |
| `RESULTS_DIRECTORY` | Directory for placing Flink's classification results | Yes | `/home/userxyz/resultsDir` |
| `FLINK_ADDRESS` | URL of the Flink cluster for API calls | No | `localhost` |
| `FLINK_PORT` | Port of the Flink cluster for API calls | No | `8081` |
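If you also need these values inside Java code, a minimal sketch could look like the following. It is only an illustration and assumes plain `System.getenv` access; the class name `EnvConfig` is hypothetical, and the project may instead resolve these variables in its deployment scripts.

```java
// Hypothetical helper, not part of the project: resolves the optional variables
// with their documented defaults and fails fast on the required ones.
import java.util.Optional;

public final class EnvConfig {
    public static void main(String[] args) {
        String flinkBinPath = Optional.ofNullable(System.getenv("FLINK_BIN_PATH"))
                .orElseThrow(() -> new IllegalStateException("FLINK_BIN_PATH is required"));
        String resultsDirectory = Optional.ofNullable(System.getenv("RESULTS_DIRECTORY"))
                .orElseThrow(() -> new IllegalStateException("RESULTS_DIRECTORY is required"));
        String flinkAddress = System.getenv().getOrDefault("FLINK_ADDRESS", "localhost");
        int flinkPort = Integer.parseInt(System.getenv().getOrDefault("FLINK_PORT", "8081"));

        System.out.printf("Flink at %s:%d, binaries in %s, results in %s%n",
                flinkAddress, flinkPort, flinkBinPath, resultsDirectory);
    }
}
```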
To implement a new classifier, extend one of the provided classifier base classes. Next, create a class to define your classifier operator, extending one of the provided operator base classes. See `HoeffdingTree` (classifier) and `VfdtProcessFunction` (operator) for examples.
- The `registerClassifier` method from `BaseProcessFunctionClassifyAndTrain` and `BaseProcessFunctionTrainAndClassify` is generic, as `TypeInformation<C>` requires a concrete class.
- For easier invocation of operators, provide a `ProcessFunction` factory. For example, see the `vfdt(...)` method in `VfdtProcessFactory` and the classifier factory invocations in the `main()` method of `DataStreamJob`; a hypothetical sketch of such a factory follows below.
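The sketch below is illustrative only: it uses Flink's plain `ProcessFunction` and made-up names (`MyClassifierProcessFactory`, `MyClassifierProcessFunction`, the `"my-classifier"` operator name, and a dummy prediction). A real implementation would instead extend this project's base classes (e.g., `BaseProcessFunctionClassifyAndTrain` or `BaseProcessFunctionTrainAndClassify`) and register a concrete classifier such as `HoeffdingTree`, as described above.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Hypothetical factory in the style of VfdtProcessFactory's vfdt(...) method. */
public final class MyClassifierProcessFactory {

    private MyClassifierProcessFactory() {
    }

    /** Wires the classifier operator onto a stream of raw CSV sample lines. */
    public static SingleOutputStreamOperator<String> myClassifier(DataStream<String> samples) {
        return samples
                .process(new MyClassifierProcessFunction())
                .name("my-classifier");
    }

    /** Minimal operator sketch: emit a prediction for each incoming sample. */
    private static class MyClassifierProcessFunction extends ProcessFunction<String, String> {
        @Override
        public void processElement(String sample, Context ctx, Collector<String> out) {
            // Placeholder prediction; a real operator would classify the sample and
            // train the registered classifier on it (or vice versa).
            String predictedLabel = "0";
            out.collect(sample + "," + predictedLabel);
        }
    }
}
```

In the `main()` method of `DataStreamJob`, such a factory would then be invoked as `MyClassifierProcessFactory.myClassifier(inputStream)` on a `DataStream<String>` of samples.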
The dataset files used should follow these conventions:

- Format: `.csv`
- Headers: The first row should be a standard comma-separated list of features (e.g., `period,nswprice,nswdemand,vicprice,vicdemand,transfer,class`).
- Data Rows: Each row should list the feature values separated by `#` and the class label separated by a comma (e.g., `0.0#0.0564#0.439155#0.003467#0.422915#0.414912,1`). A combined example follows below.
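Putting these conventions together, the first lines of a dataset file (with a hypothetical name such as `elec.csv`) would look like this, reusing the header and data row from the examples above:

```
period,nswprice,nswdemand,vicprice,vicdemand,transfer,class
0.0#0.0564#0.439155#0.003467#0.422915#0.414912,1
```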
For the class labels, you must provide a corresponding `.txt` file in the same directory as the dataset. This file should map the class names to integer values. Example for a binary classification dataset, `<dataset>.txt`:

    yes 1
    no 0
You can use the `dataset_file_formatter` script to format any dataset into the required structure.