[pulsar-io-hdfs2] Add config to create subdirectory from current time (apache#7771)

### Motivation

Adding a subdirectory associated with the current time will make it easier to process HDFS files in batch.

For example, a user can create multiple running sink instances with the `yyyy-MM-dd-hh` pattern and stop all instances at the next hour. The files in that subdirectory will then contain all messages consumed during the hour.
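For illustration only (not part of this change), a minimal standalone sketch of the subdirectory names such a pattern produces:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class SubdirectoryNameSketch {
    public static void main(String[] args) {
        DateTimeFormatter hourly = DateTimeFormatter.ofPattern("yyyy-MM-dd-hh");
        // 15:00 on 2020-08-11 formats as "2020-08-11-03" because 'hh' is the
        // 12-hour clock hour; 'HH' would give the 24-hour value "2020-08-11-15".
        System.out.println(LocalDateTime.of(2020, 8, 11, 15, 0).format(hourly));
    }
}
```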

### Modifications

- Add a `subdirectoryPattern` field to `HdfsSinkConfig`
- Update some simple tests for `HdfsSinkConfig`
- Update the docs of the HDFS2 sink

### Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)
BewareMyPower authored Aug 11, 2020
1 parent 0e67fc5 commit 569b8f9
Showing 5 changed files with 45 additions and 7 deletions.
@@ -19,10 +19,13 @@
package org.apache.pulsar.io.hdfs2.sink;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,13 +42,15 @@
* A Simple abstract class for HDFS sink.
* Users need to implement extractKeyValue function to use this sink.
*/
@Slf4j
public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {

protected HdfsSinkConfig hdfsSinkConfig;
protected BlockingQueue<Record<V>> unackedRecords;
protected HdfsSyncThread<V> syncThread;
private Path path;
private FSDataOutputStream hdfsStream;
private DateTimeFormatter subdirectoryFormatter;

public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
protected abstract void createWriter() throws IOException;
@@ -56,6 +61,9 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
hdfsSinkConfig.validate();
connectorConfig = hdfsSinkConfig;
unackedRecords = new LinkedBlockingQueue<Record<V>> (hdfsSinkConfig.getMaxPendingRecords());
if (hdfsSinkConfig.getSubdirectoryPattern() != null) {
subdirectoryFormatter = DateTimeFormatter.ofPattern(hdfsSinkConfig.getSubdirectoryPattern());
}
connectToHdfs();
createWriter();
launchSyncThread();
@@ -99,8 +107,13 @@ protected final Path getPath() {
ext = getCompressionCodec().getDefaultExtension();
}

String directory = hdfsSinkConfig.getDirectory();
if (subdirectoryFormatter != null) {
directory = FilenameUtils.concat(directory, LocalDateTime.now().format(subdirectoryFormatter));
}
path = new Path(FilenameUtils.concat(directory,
hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext));
log.info("Create path: {}", path);
}
return path;
}
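For reference, a self-contained sketch of how the subdirectory and file name are composed when `subdirectoryPattern` is set; the values and class name here are hypothetical, not the connector's actual code path:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.apache.commons.io.FilenameUtils;

public class PathCompositionSketch {
    public static void main(String[] args) {
        // Hypothetical stand-ins for the HdfsSinkConfig fields used above.
        String directory = "/foo/bar";
        String filenamePrefix = "prefix";
        String ext = ".log";
        DateTimeFormatter subdirectoryFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");

        // Same order of operations as getPath(): append the formatted
        // subdirectory to the base directory, then append the file name.
        String dir = FilenameUtils.concat(directory, LocalDateTime.now().format(subdirectoryFormatter));
        String fileName = filenamePrefix + "-" + System.currentTimeMillis() + ext;
        System.out.println(FilenameUtils.concat(dir, fileName));
        // e.g. /foo/bar/2020-08-11/prefix-1597120000000.log
    }
}
```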
@@ -24,6 +24,8 @@
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

import lombok.Data;
@@ -73,6 +75,14 @@ public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable {
*/
private int maxPendingRecords = Integer.MAX_VALUE;

/**
* A subdirectory associated with the creation time of the sink.
* The pattern is the date-time pattern of {@link AbstractHdfsConfig#getDirectory()}'s subdirectory.
*
* @see java.time.format.DateTimeFormatter for pattern's syntax
*/
private String subdirectoryPattern;

public static HdfsSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
@@ -87,16 +97,24 @@ public static HdfsSinkConfig load(Map<String, Object> map) throws IOException {
public void validate() {
super.validate();
if ((StringUtils.isEmpty(fileExtension) && getCompression() == null)
|| StringUtils.isEmpty(filenamePrefix)) {
throw new IllegalArgumentException("Required property not set.");
}

if (syncInterval < 0) {
throw new IllegalArgumentException("Sync Interval cannot be negative");
}

if (maxPendingRecords < 1) {
throw new IllegalArgumentException("Max Pending Records must be a positive integer");
}

if (subdirectoryPattern != null) {
try {
LocalDateTime.of(2020, 1, 1, 12, 0).format(DateTimeFormatter.ofPattern(subdirectoryPattern));
} catch (Exception e) {
throw new IllegalArgumentException(subdirectoryPattern + " is not a valid pattern: " + e.getMessage());
}
}
}
}
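As a side note, a small standalone sketch (assumed class and method names, separate from the connector code above) of why the validation formats a fixed timestamp rather than only calling `DateTimeFormatter.ofPattern`: some patterns are accepted by `ofPattern` but still fail when formatting a `LocalDateTime`, such as the zone-based pattern `VV`.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PatternCheckSketch {
    static void checkPattern(String subdirectoryPattern) {
        try {
            // Formatting a fixed timestamp catches both syntactically invalid
            // patterns and patterns a LocalDateTime cannot satisfy (e.g. "VV").
            LocalDateTime.of(2020, 1, 1, 12, 0).format(DateTimeFormatter.ofPattern(subdirectoryPattern));
        } catch (Exception e) {
            throw new IllegalArgumentException(subdirectoryPattern + " is not a valid pattern: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        checkPattern("yyyy-MM-dd");     // passes
        try {
            checkPattern("VV");         // accepted by ofPattern, but LocalDateTime has no zone to format
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```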
@@ -44,6 +44,7 @@ public final void loadFromYamlFileTest() throws IOException {
assertEquals("/foo/bar", config.getDirectory());
assertEquals("prefix", config.getFilenamePrefix());
assertEquals(Compression.SNAPPY, config.getCompression());
assertEquals("yyyy-MM-dd", config.getSubdirectoryPattern());
}

@Test
@@ -53,13 +54,15 @@ public final void loadFromMapTest() throws IOException {
map.put("directory", "/foo/bar");
map.put("filenamePrefix", "prefix");
map.put("compression", "SNAPPY");
map.put("subdirectoryPattern", "yy-MM-dd");

HdfsSinkConfig config = HdfsSinkConfig.load(map);
assertNotNull(config);
assertEquals("core-site.xml", config.getHdfsConfigResources());
assertEquals("/foo/bar", config.getDirectory());
assertEquals("prefix", config.getFilenamePrefix());
assertEquals(Compression.SNAPPY, config.getCompression());
assertEquals("yy-MM-dd", config.getSubdirectoryPattern());
}

@Test
3 changes: 2 additions & 1 deletion pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
@@ -21,5 +21,6 @@
"hdfsConfigResources": "core-site.xml",
"directory": "/foo/bar",
"filenamePrefix": "prefix",
"compression": "SNAPPY"
"compression": "SNAPPY",
"subdirectoryPattern": "yyyy-MM-dd"
}
5 changes: 4 additions & 1 deletion site2/docs/io-hdfs2-sink.md
@@ -26,6 +26,7 @@ The configuration of the HDFS2 sink connector has the following properties.
| `separator` | char|false |None |The character used to separate records in a text file. <br/><br/>If no value is provided, the contents from all records are concatenated together in one continuous byte array. |
| `syncInterval` | long| false |0| The interval between calls to flush data to HDFS disk in milliseconds. |
| `maxPendingRecords` | int | false | Integer.MAX_VALUE | The maximum number of records held in memory before acking. <br/><br/>Setting this property to 1 causes every record to be sent to disk before it is acked.<br/><br/>Setting this property to a higher value allows buffering records before flushing them to disk. |
| `subdirectoryPattern` | String | false | None | A subdirectory associated with the creation time of the sink.<br/>Its value is the date-time pattern of the subdirectory created under `directory`.<br/><br/>See [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html) for the pattern syntax. |

### Example

@@ -39,7 +40,8 @@ Before using the HDFS2 sink connector, you need to create a configuration file t
"directory": "/foo/bar",
"filenamePrefix": "prefix",
"fileExtension": ".log",
"compression": "SNAPPY"
"compression": "SNAPPY",
"subdirectoryPattern": "yyyy-MM-dd"
}
```

@@ -52,4 +54,5 @@ Before using the HDFS2 sink connector, you need to create a configuration file t
filenamePrefix: "prefix"
fileExtension: ".log"
compression: "SNAPPY"
subdirectoryPattern: "yyyy-MM-dd"
```
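With either configuration above, the sink writes its files under a dated subdirectory, for example `/foo/bar/2020-08-11/prefix-1597120000000.log` (date and timestamp shown for illustration only).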
