Skip to content

Commit

Permalink
[FLINK-1081] Add HDFS file-stream source for streaming
Browse files Browse the repository at this point in the history
Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
  • Loading branch information
chiwanpark authored and gyfora committed Jan 25, 2015
1 parent 7aa9a50 commit bf5e39a
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.StreamGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.function.source.FileMonitoringFunction;
import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType;
import org.apache.flink.streaming.api.function.source.FileReadFunction;
import org.apache.flink.streaming.api.function.source.FileSourceFunction;
import org.apache.flink.streaming.api.function.source.FileStreamFunction;
import org.apache.flink.streaming.api.function.source.FromElementsFunction;
import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
Expand Down Expand Up @@ -214,34 +217,30 @@ public DataStreamSource<String> readTextFile(String filePath, String charsetName
}

/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise multiple times(infinite). The file will be read with
* the system's default character set. This functionality can be used for
* testing a topology.
* Creates a DataStream that contains the contents of file created while
* system watches the given path. The file will be read with the system's
* default character set.
*
* @param filePath
* The path of the file, as a URI (e.g.,
* "file:///some/local/file" or "hdfs://host:port/file/path").
* @return The DataStream representing the text file.
* "file:///some/local/file" or "hdfs://host:port/file/path/").
* @param interval
* The interval of file watching.
* @param watchType
* The watch type of file stream. When watchType is
* {@link WatchType.ONLY_NEW_FILES}, the system processes only
* new files. {@link WatchType.REPROCESS_WITH_APPENDED} means
* that the system re-processes all contents of appended file.
* {@link WatchType.PROCESS_ONLY_APPENDED} means that the system
* processes only appended contents of files.
*
* @return The DataStream containing the given directory.
*/
public DataStreamSource<String> readTextStream(String filePath) {
checkIfFileExists(filePath);
return addSource(new FileStreamFunction(filePath), null, "textStream");
}
public DataStream<String> readFileStream(String filePath, long interval, WatchType watchType) {
DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
filePath, interval, watchType));

private static void checkIfFileExists(String filePath) {
File file = new File(filePath);
if (!file.exists()) {
throw new IllegalArgumentException("File not found: " + filePath);
}

if (!file.canRead()) {
throw new IllegalArgumentException("Cannot read file: " + filePath);
}

if (file.isDirectory()) {
throw new IllegalArgumentException("Given path is a directory: " + filePath);
}
return source.flatMap(new FileReadFunction());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.function.source;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);

public enum WatchType {
ONLY_NEW_FILES, // Only new files will be processed.
REPROCESS_WITH_APPENDED, // When some files are appended, all contents of the files will be processed.
PROCESS_ONLY_APPENDED // When some files are appended, only appended contents will be processed.
}

private String path;
private long interval;
private WatchType watchType;

private FileSystem fileSystem;
private long lastModificationTime;
private Map<String, Long> offsetOfFiles;

public FileMonitoringFunction(String path, long interval, WatchType watchType) {
this.path = path;
this.interval = interval;
this.watchType = watchType;

this.lastModificationTime = System.currentTimeMillis();
this.offsetOfFiles = new HashMap<String, Long>();
}

@Override
public void invoke(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
fileSystem = FileSystem.get(new URI(path));

while (true) {
List<String> files = listNewFiles();
for (String filePath : files) {
if (watchType == WatchType.ONLY_NEW_FILES || watchType == WatchType.REPROCESS_WITH_APPENDED) {
collector.collect(new Tuple3<String, Long, Long>(filePath, 0L, -1L));
offsetOfFiles.put(filePath, -1L);
} else if (watchType == WatchType.PROCESS_ONLY_APPENDED) {
long offset = 0;
long fileSize = fileSystem.getFileStatus(new Path(filePath)).getLen();
if (offsetOfFiles.containsKey(filePath)) {
offset = offsetOfFiles.get(filePath);
}

collector.collect(new Tuple3<String, Long, Long>(filePath, offset, fileSize));
offsetOfFiles.put(filePath, fileSize);

LOG.info("File processed: {}, {}, {}", filePath, offset, fileSize);
}
}

Thread.sleep(interval);
}
}

private List<String> listNewFiles() throws IOException {
List<String> files = new ArrayList<String>();
FileStatus[] statuses = fileSystem.listStatus(new Path(path));

for (FileStatus status : statuses) {
Path filePath = status.getPath();
long modificationTime = status.getModificationTime();

if (!isFiltered(filePath, modificationTime)) {
files.add(filePath.toString());
}
}

lastModificationTime = System.currentTimeMillis();

return files;
}

private boolean isFiltered(Path path, long modificationTime) {
String filename = path.getName();

return lastModificationTime > modificationTime // not modified file
|| (watchType == WatchType.ONLY_NEW_FILES && offsetOfFiles.containsKey(path.toString())) // modified file but already processed
|| filename.startsWith(".") // hidden file
|| filename.contains("_COPYING_"); // currently copying file
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,35 @@

package org.apache.flink.streaming.api.function.source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.flink.util.Collector;
public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {

public class FileStreamFunction implements SourceFunction<String> {
private static final long serialVersionUID = 1L;

private final String path;
@Override
public void flatMap(Tuple3<String, Long, Long> value, Collector<String> out) throws Exception {
FSDataInputStream stream = FileSystem.get(new URI(value.f0)).open(new Path(value.f0));
stream.seek(value.f1);

public FileStreamFunction(String path) {
this.path = path;
}
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
String line;

@Override
public void invoke(Collector<String> collector) throws IOException {
while (true) {
BufferedReader br = new BufferedReader(new FileReader(path));
String line = br.readLine();
while (line != null) {
if (!line.equals("")) {
collector.collect(line);
}
line = br.readLine();
try {
while ((line = reader.readLine()) != null && (value.f2 == -1L || stream.getPos() <= value.f2)) {
out.collect(line);
}
br.close();
} finally {
reader.close();
}
}
}

0 comments on commit bf5e39a

Please sign in to comment.