forked from apache/pulsar
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added Pulsar IO connector for local files (apache#2869)
### Motivation Added a Pulsar IO connector for consuming files from the local filesystem ### Modifications Added a new module to the pulsar-io module that includes the Pulsar file connector and its associated classes & tests ### Result After your change, users will be able to consume files from the local filesystem, and have the contents directly published to a Pulsar topic.
- Loading branch information
1 parent
f15afec
commit 209c448
Showing
26 changed files
with
2,185 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
<parent> | ||
<groupId>org.apache.pulsar</groupId> | ||
<artifactId>pulsar-io</artifactId> | ||
<version>2.3.0-SNAPSHOT</version> | ||
</parent> | ||
|
||
<artifactId>pulsar-io-file</artifactId> | ||
<name>Pulsar IO :: File</name> | ||
|
||
<dependencies> | ||
|
||
<dependency> | ||
<groupId>${project.groupId}</groupId> | ||
<artifactId>pulsar-io-core</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.fasterxml.jackson.dataformat</groupId> | ||
<artifactId>jackson-dataformat-yaml</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-databind</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>commons-io</groupId> | ||
<artifactId>commons-io</artifactId> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.commons</groupId> | ||
<artifactId>commons-lang3</artifactId> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.nifi</groupId> | ||
<artifactId>nifi-nar-maven-plugin</artifactId> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
</project> |
108 changes: 108 additions & 0 deletions
108
pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.io.file; | ||
|
||
import java.io.File; | ||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
import java.nio.file.Files; | ||
import java.nio.file.Paths; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.stream.Stream; | ||
|
||
import org.apache.pulsar.io.core.PushSource; | ||
import org.apache.pulsar.io.file.utils.GZipFiles; | ||
import org.apache.pulsar.io.file.utils.ZipFiles; | ||
|
||
/** | ||
* Worker thread that consumes the contents of the files | ||
* and publishes them to a Pulsar topic. | ||
*/ | ||
public class FileConsumerThread extends Thread { | ||
|
||
private final PushSource<byte[]> source; | ||
private final BlockingQueue<File> workQueue; | ||
private final BlockingQueue<File> inProcess; | ||
private final BlockingQueue<File> recentlyProcessed; | ||
|
||
public FileConsumerThread(PushSource<byte[]> source, | ||
BlockingQueue<File> workQueue, | ||
BlockingQueue<File> inProcess, | ||
BlockingQueue<File> recentlyProcessed) { | ||
this.source = source; | ||
this.workQueue = workQueue; | ||
this.inProcess = inProcess; | ||
this.recentlyProcessed = recentlyProcessed; | ||
} | ||
|
||
public void run() { | ||
try { | ||
while (true) { | ||
File file = workQueue.take(); | ||
|
||
boolean added = false; | ||
do { | ||
added = inProcess.add(file); | ||
} while (!added); | ||
|
||
consumeFile(file); | ||
} | ||
} catch (InterruptedException ie) { | ||
// just terminate | ||
} | ||
} | ||
|
||
private void consumeFile(File file) { | ||
final AtomicInteger idx = new AtomicInteger(1); | ||
try (Stream<String> lines = getLines(file)) { | ||
lines.forEachOrdered(line -> process(file, idx.getAndIncrement(), line)); | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
} finally { | ||
|
||
boolean removed = false; | ||
do { | ||
removed = inProcess.remove(file); | ||
} while (!removed); | ||
|
||
boolean added = false; | ||
do { | ||
added = recentlyProcessed.add(file); | ||
} while (!added); | ||
} | ||
} | ||
|
||
private Stream<String> getLines(File file) throws IOException { | ||
if (file == null) { | ||
return null; | ||
} else if (GZipFiles.isGzip(file)) { | ||
return GZipFiles.lines(Paths.get(file.getAbsolutePath())); | ||
} else if (ZipFiles.isZip(file)) { | ||
return ZipFiles.lines(Paths.get(file.getAbsolutePath())); | ||
} else { | ||
return Files.lines(Paths.get(file.getAbsolutePath()), Charset.defaultCharset()); | ||
} | ||
} | ||
|
||
private void process(File srcFile, int lineNumber, String line) { | ||
source.consume(new FileRecord(srcFile, lineNumber, line.getBytes())); | ||
} | ||
|
||
} |
189 changes: 189 additions & 0 deletions
189
pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.io.file; | ||
|
||
import java.io.File; | ||
import java.io.FileFilter; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.util.HashSet; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import java.util.regex.Pattern; | ||
|
||
/** | ||
* Worker thread that checks the configured input directory for | ||
* files that meet the provided filtering criteria, and publishes | ||
* them to a work queue for processing by the FileConsumerThreads. | ||
*/ | ||
public class FileListingThread extends Thread { | ||
|
||
private final AtomicLong queueLastUpdated = new AtomicLong(0L); | ||
private final Lock listingLock = new ReentrantLock(); | ||
private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>(); | ||
private final BlockingQueue<File> workQueue; | ||
private final BlockingQueue<File> inProcess; | ||
private final BlockingQueue<File> recentlyProcessed; | ||
|
||
private final String inputDir; | ||
private final boolean recurseDirs; | ||
private final boolean keepOriginal; | ||
private final long pollingInterval; | ||
|
||
public FileListingThread(FileSourceConfig fileConfig, | ||
BlockingQueue<File> workQueue, | ||
BlockingQueue<File> inProcess, | ||
BlockingQueue<File> recentlyProcessed) { | ||
this.workQueue = workQueue; | ||
this.inProcess = inProcess; | ||
this.recentlyProcessed = recentlyProcessed; | ||
|
||
inputDir = fileConfig.getInputDirectory(); | ||
recurseDirs = Optional.ofNullable(fileConfig.getRecurse()).orElse(true); | ||
keepOriginal = Optional.ofNullable(fileConfig.getKeepFile()).orElse(false); | ||
pollingInterval = Optional.ofNullable(fileConfig.getPollingInterval()).orElse(10000L); | ||
fileFilterRef.set(createFileFilter(fileConfig)); | ||
} | ||
|
||
public void run() { | ||
while (true) { | ||
if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) { | ||
try { | ||
final File directory = new File(inputDir); | ||
final Set<File> listing = performListing(directory, fileFilterRef.get(), recurseDirs); | ||
|
||
if (listing != null && !listing.isEmpty()) { | ||
|
||
// Remove any files that have been or are currently being processed. | ||
listing.removeAll(inProcess); | ||
if (!keepOriginal) { | ||
listing.removeAll(recentlyProcessed); | ||
} | ||
|
||
for (File f: listing) { | ||
if (!workQueue.contains(f)) { | ||
workQueue.offer(f); | ||
} | ||
} | ||
queueLastUpdated.set(System.currentTimeMillis()); | ||
} | ||
|
||
} finally { | ||
listingLock.unlock(); | ||
} | ||
} | ||
|
||
try { | ||
sleep(pollingInterval - 1); | ||
} catch (InterruptedException e) { | ||
// Just ignore | ||
} | ||
} | ||
} | ||
|
||
private Set<File> performListing(final File directory, final FileFilter filter, | ||
final boolean recurseSubdirectories) { | ||
Path p = directory.toPath(); | ||
if (!Files.isWritable(p) || !Files.isReadable(p)) { | ||
throw new IllegalStateException("Directory '" + directory | ||
+ "' does not have sufficient permissions (i.e., not writable and readable)"); | ||
} | ||
final Set<File> queue = new HashSet<>(); | ||
if (!directory.exists()) { | ||
return queue; | ||
} | ||
|
||
final File[] children = directory.listFiles(); | ||
if (children == null) { | ||
return queue; | ||
} | ||
|
||
for (final File child : children) { | ||
if (child.isDirectory()) { | ||
if (recurseSubdirectories) { | ||
queue.addAll(performListing(child, filter, recurseSubdirectories)); | ||
} | ||
} else if (filter.accept(child)) { | ||
queue.add(child); | ||
} | ||
} | ||
|
||
return queue; | ||
} | ||
|
||
private FileFilter createFileFilter(FileSourceConfig fileConfig) { | ||
final long minSize = Optional.ofNullable(fileConfig.getMinimumSize()).orElse(1); | ||
final Double maxSize = Optional.ofNullable(fileConfig.getMaximumSize()).orElse(Double.MAX_VALUE); | ||
final long minAge = Optional.ofNullable(fileConfig.getMinimumFileAge()).orElse(0); | ||
final Long maxAge = Optional.ofNullable(fileConfig.getMaximumFileAge()).orElse(Long.MAX_VALUE); | ||
final boolean ignoreHidden = Optional.ofNullable(fileConfig.getIgnoreHiddenFiles()).orElse(true); | ||
final Pattern filePattern = Pattern.compile(Optional.ofNullable(fileConfig.getFileFilter()).orElse("[^\\.].*")); | ||
final String indir = fileConfig.getInputDirectory(); | ||
final String pathPatternStr = fileConfig.getPathFilter(); | ||
final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr); | ||
|
||
return new FileFilter() { | ||
@Override | ||
public boolean accept(final File file) { | ||
if (minSize > file.length()) { | ||
return false; | ||
} | ||
if (maxSize != null && maxSize < file.length()) { | ||
return false; | ||
} | ||
final long fileAge = System.currentTimeMillis() - file.lastModified(); | ||
if (minAge > fileAge) { | ||
return false; | ||
} | ||
if (maxAge != null && maxAge < fileAge) { | ||
return false; | ||
} | ||
if (ignoreHidden && file.isHidden()) { | ||
return false; | ||
} | ||
if (pathPattern != null) { | ||
Path reldir = Paths.get(indir).relativize(file.toPath()).getParent(); | ||
if (reldir != null && !reldir.toString().isEmpty()) { | ||
if (!pathPattern.matcher(reldir.toString()).matches()) { | ||
return false; | ||
} | ||
} | ||
} | ||
//Verify that we have at least read permissions on the file we're considering grabbing | ||
if (!Files.isReadable(file.toPath())) { | ||
return false; | ||
} | ||
|
||
/* Verify that if we're not keeping original that we have write | ||
* permissions on the directory the file is in | ||
*/ | ||
if (!keepOriginal && !Files.isWritable(file.toPath().getParent())) { | ||
return false; | ||
} | ||
return filePattern.matcher(file.getName()).matches(); | ||
} | ||
}; | ||
} | ||
} |
Oops, something went wrong.