Skip to content

Commit

Permalink
NIFI-5452: Allow ignore block locality in HDFS
Browse files Browse the repository at this point in the history
Fixed formatting. Fallback to autoboxing

This closes apache#3652.
  • Loading branch information
belugabehr authored and joewitt committed Oct 28, 2019
1 parent 5fd8df5 commit dccbde4
Showing 1 changed file with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package org.apache.nifi.processors.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
Expand Down Expand Up @@ -61,6 +63,7 @@
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -170,6 +173,17 @@ public class PutHDFS extends AbstractHadoopProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final PropertyDescriptor IGNORE_LOCALITY = new PropertyDescriptor.Builder()
.name("Ignore Locality")
.displayName("Ignore Locality")
.description(
"Directs the HDFS system to ignore locality rules so that data is distributed randomly throughout the cluster")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

private static final Set<Relationship> relationships;

static {
Expand Down Expand Up @@ -199,6 +213,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
props.add(REMOTE_OWNER);
props.add(REMOTE_GROUP);
props.add(COMPRESSION_CODEC);
props.add(IGNORE_LOCALITY);
return props;
}

Expand Down Expand Up @@ -313,8 +328,18 @@ public void process(InputStream in) throws IOException {
if (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) {
fos = hdfs.append(copyFile, bufferSize);
} else {
fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);

final Boolean ignoreLocality = context.getProperty(IGNORE_LOCALITY).asBoolean();
if (ignoreLocality) {
cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
}

fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
null, null);
}

if (codec != null) {
fos = codec.createOutputStream(fos);
}
Expand Down

0 comments on commit dccbde4

Please sign in to comment.