Skip to content

Commit

Permalink
Broke Pulsar-IO::HDFS into 3.x and 2.x versions (apache#2966)
Browse files Browse the repository at this point in the history
### Motivation

A user was attempting to use the existing HDFS connector to connect to a 2.x version of HDFS, but the current connector only supported 3.x version of HDFS. 

### Modifications

To address this issue, we renamed the current HDFS connector to HDFS3, and created a new 2.x compatible connector named HDFS2. 

The code in both of these are nearly identical with 2 notable exceptions. First and foremost, they both use different versions of the Hadoop-client library. And secondly, the HDFS2 version creates the FSDataOutputStream object directly, whereas the HDFS3 version leverages the FSDataOutputStreamBuilder class for this purpose, as it is the preferred method going forward.

### Result

There will be be support for connecting to both 2.x and 3.x version of HDFS. However, there MAY BE some library conflicts in the released jar due to the different versions of the same library in the different modules. Hopefully the NAR packaging will address this.
  • Loading branch information
david-streamlio authored and sijie committed Dec 16, 2018
1 parent 18b2a20 commit 2ccf7ff
Show file tree
Hide file tree
Showing 55 changed files with 2,075 additions and 81 deletions.
74 changes: 74 additions & 0 deletions pulsar-io/hdfs2/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io</artifactId>
<version>2.3.0-SNAPSHOT</version>
</parent>
<artifactId>pulsar-io-hdfs2</artifactId>
<name>Pulsar IO :: Hdfs2</name>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs;
package org.apache.pulsar.io.hdfs2;

import java.io.Serializable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs;
package org.apache.pulsar.io.hdfs2;

import java.io.IOException;
import java.lang.ref.WeakReference;
Expand All @@ -42,7 +42,7 @@
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.pulsar.io.hdfs.sink.HdfsSinkConfig;
import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig;

/**
* A Simple abstract class for HDFS connectors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs;
package org.apache.pulsar.io.hdfs2;

/**
* An enumeration of compression codecs available for HDFS.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs;
package org.apache.pulsar.io.hdfs2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs;
package org.apache.pulsar.io.hdfs2;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs;
package org.apache.pulsar.io.hdfs2;
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs2.sink;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.hdfs2.AbstractHdfsConnector;
import org.apache.pulsar.io.hdfs2.HdfsResources;

/**
* A Simple abstract class for HDFS sink.
* Users need to implement extractKeyValue function to use this sink.
*/
public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {

protected HdfsSinkConfig hdfsSinkConfig;
protected BlockingQueue<Record<V>> unackedRecords;
protected HdfsSyncThread<V> syncThread;
private Path path;
private FSDataOutputStream hdfsStream;

public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
protected abstract void createWriter() throws IOException;

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
hdfsSinkConfig = HdfsSinkConfig.load(config);
hdfsSinkConfig.validate();
connectorConfig = hdfsSinkConfig;
unackedRecords = new LinkedBlockingQueue<Record<V>> (hdfsSinkConfig.getMaxPendingRecords());
connectToHdfs();
createWriter();
launchSyncThread();
}

@Override
public void close() throws Exception {
syncThread.halt();
syncThread.join(0);
}

protected final void connectToHdfs() throws IOException {
try {
HdfsResources resources = hdfsResources.get();

if (resources.getConfiguration() == null) {
resources = this.resetHDFSResources(hdfsSinkConfig);
hdfsResources.set(resources);
}
} catch (IOException ex) {
hdfsResources.set(new HdfsResources(null, null, null));
throw ex;
}
}

protected FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException {
if (hdfsStream == null) {
Path path = getPath();
FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation());
hdfsStream = fs.exists(path) ? fs.append(path) : fs.create(path);
}
return hdfsStream;
}

protected final Path getPath() {
if (path == null) {
String ext = "";
if (StringUtils.isNotBlank(hdfsSinkConfig.getFileExtension())) {
ext = hdfsSinkConfig.getFileExtension();
} else if (getCompressionCodec() != null) {
ext = getCompressionCodec().getDefaultExtension();
}

path = new Path(FilenameUtils.concat(hdfsSinkConfig.getDirectory(),
hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext));
}
return path;
}

protected final void launchSyncThread() throws IOException {
syncThread = new HdfsSyncThread<V>(getHdfsStream(), unackedRecords, hdfsSinkConfig.getSyncInterval());
syncThread.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs.sink;
package org.apache.pulsar.io.hdfs2.sink;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
Expand All @@ -34,7 +34,7 @@
import lombok.experimental.Accessors;

import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.io.hdfs.AbstractHdfsConfig;
import org.apache.pulsar.io.hdfs2.AbstractHdfsConfig;

/**
* Configuration object for all HDFS Sink components.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs.sink;
package org.apache.pulsar.io.hdfs2.sink;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs.sink.seq;
package org.apache.pulsar.io.hdfs2.sink;
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs.sink.seq;
package org.apache.pulsar.io.hdfs2.sink.seq;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -30,7 +30,7 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs.sink.seq;
package org.apache.pulsar.io.hdfs2.sink.seq;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs.sink.seq;
package org.apache.pulsar.io.hdfs2.sink.seq;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs.sink.text;
package org.apache.pulsar.io.hdfs2.sink.seq;
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs.sink.text;
package org.apache.pulsar.io.hdfs2.sink.text;

import java.io.BufferedOutputStream;
import java.io.IOException;
Expand All @@ -26,7 +26,7 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs.sink.text;
package org.apache.pulsar.io.hdfs2.sink.text;

import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.hdfs2.sink.text;
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
# under the License.
#

name: hdfs
description: Writes data into HDFS
sinkClass: org.apache.pulsar.io.hdfs.sink.text.HdfsStringSink
name: hdfs2
description: Writes data into HDFS 2.x
sinkClass: org.apache.pulsar.io.hdfs2.sink.text.HdfsStringSink
Loading

0 comments on commit 2ccf7ff

Please sign in to comment.