Skip to content

Commit

Permalink
StringWriter support custom row delimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiaowei authored and pnowojski committed Aug 8, 2019
1 parent 7b70bc9 commit eb4b3b7
Showing 3 changed files with 124 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -41,12 +41,18 @@ public class StringWriter<T> extends StreamWriterBase<T> {

private transient Charset charset;

private final String rowDelimiter;

private static final String DEFAULT_ROW_DELIMITER = "\n";

private byte[] rowDelimiterBytes;

/**
* Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
* strings to bytes.
*/
public StringWriter() {
this("UTF-8");
this("UTF-8", DEFAULT_ROW_DELIMITER);
}

/**
@@ -56,12 +62,25 @@ public StringWriter() {
* @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
*/
public StringWriter(String charsetName) {
this(charsetName, DEFAULT_ROW_DELIMITER);
}

/**
* Creates a new {@code StringWriter} that uses the given charset and row delimiter to convert
* strings to bytes.
*
* @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
* @param rowDelimiter Parameter that specifies which character to use for delimiting rows
*/
public StringWriter(String charsetName, String rowDelimiter) {
this.charsetName = charsetName;
this.rowDelimiter = rowDelimiter;
}

protected StringWriter(StringWriter<T> other) {
super(other);
this.charsetName = other.charsetName;
this.rowDelimiter = other.rowDelimiter;
}

@Override
@@ -70,6 +89,7 @@ public void open(FileSystem fs, Path path) throws IOException {

try {
this.charset = Charset.forName(charsetName);
this.rowDelimiterBytes = rowDelimiter.getBytes(charset);
}
catch (IllegalCharsetNameException e) {
throw new IOException("The charset " + charsetName + " is not valid.", e);
@@ -83,7 +103,7 @@ public void open(FileSystem fs, Path path) throws IOException {
public void write(T element) throws IOException {
FSDataOutputStream outputStream = getStream();
outputStream.write(element.toString().getBytes(charset));
outputStream.write('\n');
outputStream.write(rowDelimiterBytes);
}

@Override
@@ -94,4 +114,8 @@ public StringWriter<T> duplicate() {
String getCharsetName() {
return charsetName;
}

public String getRowDelimiter() {
return rowDelimiter;
}
}
Original file line number Diff line number Diff line change
@@ -55,6 +55,7 @@ public static <T> boolean equals(
StringWriter<T> writer1,
StringWriter<T> writer2) {
return equals((StreamWriterBase) writer1, (StreamWriterBase) writer2) &&
Objects.equals(writer1.getCharsetName(), writer2.getCharsetName());
Objects.equals(writer1.getCharsetName(), writer2.getCharsetName()) &&
Objects.equals(writer1.getRowDelimiter(), writer2.getRowDelimiter());
}
}
Original file line number Diff line number Diff line change
@@ -18,18 +18,57 @@

package org.apache.flink.streaming.connectors.fs;

import org.apache.flink.util.NetUtils;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.StringTokenizer;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;


/**
* Tests for {@link StringWriter}.
*/
public class StringWriterTest {

@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();

private static MiniDFSCluster hdfsCluster;
private static org.apache.hadoop.fs.FileSystem dfs;

private static String outputDir;

@Before
public void createHDFS() throws IOException {
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();

File dataDir = tempFolder.newFolder();

conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
hdfsCluster = builder.build();

dfs = hdfsCluster.getFileSystem();

outputDir = "hdfs://"
+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
}

@Test
public void testDuplicate() {
StringWriter<String> writer = new StringWriter(StandardCharsets.UTF_16.name());
@@ -41,5 +80,62 @@ public void testDuplicate() {
writer.setSyncOnFlush(false);
assertFalse(StreamWriterBaseComparator.equals(writer, other));
assertFalse(StreamWriterBaseComparator.equals(writer, new StringWriter<>()));

}

@Test
public void testMultiRowdelimiters() throws IOException {
String rowDelimiter1 = "\n";
String testDat1 = "A" + rowDelimiter1 + "B" + rowDelimiter1 + "C" + rowDelimiter1 + "D" + rowDelimiter1 + "E";
Path testFile1 = new Path(outputDir + "/test01");
testRowdelimiter(rowDelimiter1, testDat1, StandardCharsets.UTF_8.name(), testFile1);

String rowDelimiter2 = "\r\n";
String testDat2 = "A" + rowDelimiter2 + "B" + rowDelimiter2 + "C" + rowDelimiter2 + "D" + rowDelimiter2 + "E";
Path testFile2 = new Path(outputDir + "/test02");
testRowdelimiter(rowDelimiter2, testDat2, StandardCharsets.UTF_8.name(), testFile2);

String rowDelimiter3 = "*";
String testDat3 = "A" + rowDelimiter3 + "B" + rowDelimiter3 + "C" + rowDelimiter3 + "D" + rowDelimiter3 + "E";
Path testFile3 = new Path(outputDir + "/test03");
testRowdelimiter(rowDelimiter3, testDat3, StandardCharsets.UTF_8.name(), testFile3);

String rowDelimiter4 = "##";
String testDat4 = "A" + rowDelimiter4 + "B" + rowDelimiter4 + "C" + rowDelimiter4 + "D" + rowDelimiter4 + "E";
Path testFile4 = new Path(outputDir + "/test04");
testRowdelimiter(rowDelimiter4, testDat4, StandardCharsets.UTF_8.name(), testFile4);

}

private void testRowdelimiter(String rowDelimiter, String inputData, String charset, Path outputFile) throws IOException {
StringWriter<String> writer = new StringWriter(charset, rowDelimiter);
writer.open(dfs, outputFile);
StringTokenizer lineTokenizer = new StringTokenizer(inputData, rowDelimiter);
while (lineTokenizer.hasMoreTokens()){
writer.write(lineTokenizer.nextToken());
}
writer.close();
FSDataInputStream inStream = dfs.open(outputFile);
byte[] buffer = new byte[inputData.getBytes(charset).length];
readFully(inStream, buffer);
inStream.close();
String outputData = new String(buffer, charset);
Assert.assertEquals(inputData, outputData);

}

private void readFully(InputStream in, byte[] buffer) throws IOException {
int pos = 0;
int remaining = buffer.length;

while (remaining > 0) {
int read = in.read(buffer, pos, remaining);
if (read == -1) {
throw new EOFException();
}

pos += read;
remaining -= read;
}
}
}

0 comments on commit eb4b3b7

Please sign in to comment.