Skip to content

Commit

Permalink
Add CSV data transformation module (akka#213)
Browse files Browse the repository at this point in the history
* Alpakka Issue akka#66: CSV component

* Alpakka Issue akka#66: revised CSV parser

* Alpakka Issue akka#60: CSV parsing stage

* wait for line end before issuing line

As the byte string may not contain a whole line, the parser needs to keep reading until a line end is reached.

* Add Java API and JUnit test; add a bit of documentation

* Introduce CsvToMap stage; more documentation

* Parse line even without line end at upstream finish

* Add Java API for CsvToMap; more documentation

* More restricted API, incorporated comments by @johanandren

* Format sequence as CSV in ByteString

* Add Scala CSV formatting stage

* Add Java API for CSV formatting; more docs

* Separate enums for Java and Scala DSLs

* Use Flow.fromGraph to construct flow

* Rename CsvFraming to CsvParsing

* Check for Byte Order Mark and ignore it for UTF-8

* Emit Byte Order Marks in formatting; CsvFormatting is just a map

* Byte Order Mark for Java API

* Add line number to error messages; sample files exported from third party software

* Use Charset directly instead of name

* csv: autoformatted files

* simplified dependency declaration

Fixes akka#60.
  • Loading branch information
ennru authored and jrudolph committed Apr 12, 2017
1 parent 0f7a409 commit b062d15
Show file tree
Hide file tree
Showing 30 changed files with 2,056 additions and 1 deletion.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Root project located in the build's base directory; it aggregates the listed sub-projects.
// NOTE(review): the two `.aggregate(...)` lines look like the before/after sides of a diff —
// the second one is the first with `csv` added. Confirm only one is meant to remain.
lazy val alpakka = project
.in(file("."))
.enablePlugins(PublishUnidoc)
.aggregate(amqp, awslambda, cassandra, dynamodb, files, ftp, googleCloudPubSub, hbase, jms, mqtt, s3, simpleCodecs, sqs, sse)
.aggregate(amqp, awslambda, cassandra, csv, dynamodb, files, ftp, googleCloudPubSub, hbase, jms, mqtt, s3, simpleCodecs, sqs, sse)

lazy val amqp = project
.enablePlugins(AutomateHeaderPlugin)
Expand All @@ -24,6 +24,13 @@ lazy val cassandra = project
Dependencies.Cassandra
)

// CSV sub-project: enables the header plugin, sets the project name and adds the
// settings defined in `Dependencies.Csv`.
lazy val csv = project
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-stream-alpakka-csv",
Dependencies.Csv
)

lazy val dynamodb = project
.enablePlugins(AutomateHeaderPlugin)
.settings(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.csv.javadsl;

import akka.util.ByteString;

/**
 * Byte Order Mark (BOM) constants for the Java API.
 *
 * <p>A BOM may be placed at the start of a text file to indicate which character encoding
 * is used. Every constant below is read from the Scala DSL's
 * {@code akka.stream.alpakka.csv.scaladsl.ByteOrderMark}.
 *
 * @see <a href="http://www.unicode.org/faq/utf_bom.html#bom1">Unicode FAQ Byte Order Mark</a>
 */
public class ByteOrderMark {
  /** BOM for UTF-8 */
  public static final ByteString UTF_8 = akka.stream.alpakka.csv.scaladsl.ByteOrderMark.UTF_8();

  /** BOM for UTF-16, big-endian */
  public static final ByteString UTF_16_BE = akka.stream.alpakka.csv.scaladsl.ByteOrderMark.UTF_16_BE();

  /** BOM for UTF-16, little-endian */
  public static final ByteString UTF_16_LE = akka.stream.alpakka.csv.scaladsl.ByteOrderMark.UTF_16_LE();

  /** BOM for UTF-32, big-endian */
  public static final ByteString UTF_32_BE = akka.stream.alpakka.csv.scaladsl.ByteOrderMark.UTF_32_BE();

  /** BOM for UTF-32, little-endian */
  public static final ByteString UTF_32_LE = akka.stream.alpakka.csv.scaladsl.ByteOrderMark.UTF_32_LE();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.csv.javadsl;

import akka.NotUsed;
import akka.stream.alpakka.csv.scaladsl.CsvQuotingStyle$;
import akka.stream.javadsl.Flow;
import akka.util.ByteString;
import scala.Option;
import scala.Some;
import scala.collection.JavaConversions;
import scala.collection.immutable.List;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Optional;

/**
* Provides CSV formatting flows that convert a sequence of String into their CSV representation
* in {@see akka.util.ByteString}.
*/
public class CsvFormatting {

public static final char BACKSLASH = '\\';
public static final char COMMA = ',';
public static final char SEMI_COLON = ';';
public static final char COLON = ':';
public static final char TAB = '\t';
public static final char DOUBLE_QUOTE = '"';
public static final String CR_LF = "\r\n";

/**
* Generates standard CSV format (with commas).
*
* @param <T> Any collection implementation
* @return The formatting flow
*/
public static <T extends Collection<String>> Flow<T, ByteString, NotUsed> format() {
return format(COMMA, DOUBLE_QUOTE, BACKSLASH, CR_LF, CsvQuotingStyle.REQUIRED, StandardCharsets.UTF_8, Optional.empty());
}

/**
* Generates CSV with the specified special characters and character set.
*
* @param delimiter Delimiter between columns
* @param quoteChar Quoting character
* @param escapeChar Escape character
* @param endOfLine End of line character sequence
* @param quotingStyle Quote all values or as required
* @param charset Character set to be used
* @param <T> Any collection implementation
* @return The formatting flow
*/
public static <T extends Collection<String>> Flow<T, ByteString, NotUsed> format(char delimiter, char quoteChar, char escapeChar, String endOfLine, CsvQuotingStyle quotingStyle, Charset charset, Optional<ByteString> byteOrderMark) {
akka.stream.alpakka.csv.scaladsl.CsvQuotingStyle qs = CsvQuotingStyle$.MODULE$.asScala(quotingStyle);
Option<ByteString> byteOrderMarkScala = byteOrderMark.<Option<ByteString>>map(Some::apply).orElse(Option.empty());
akka.stream.scaladsl.Flow<List<String>, ByteString, NotUsed> formattingFlow
= akka.stream.alpakka.csv.scaladsl.CsvFormatting
.format(delimiter, quoteChar, escapeChar, endOfLine, qs, charset, byteOrderMarkScala);
return Flow.<T>create()
.map(c -> JavaConversions.collectionAsScalaIterable(c).toList())
.via(formattingFlow);
}
}
31 changes: 31 additions & 0 deletions csv/src/main/java/akka/stream/alpakka/csv/javadsl/CsvParsing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.csv.javadsl;

import akka.NotUsed;
import akka.stream.javadsl.Flow;
import akka.util.ByteString;
import scala.collection.JavaConversions;

import java.util.Collection;

public class CsvParsing {

public static final byte BACKSLASH = '\\';
public static final byte COMMA = ',';
public static final byte SEMI_COLON = ';';
public static final byte COLON = ':';
public static final byte TAB = '\t';
public static final byte DOUBLE_QUOTE = '"';

public static Flow<ByteString, Collection<ByteString>, NotUsed> lineScanner() {
return lineScanner(COMMA, DOUBLE_QUOTE, BACKSLASH);
}

public static Flow<ByteString, Collection<ByteString>, NotUsed> lineScanner(byte delimiter, byte quoteChar, byte escapeChar) {
return akka.stream.alpakka.csv.scaladsl.CsvParsing
.lineScanner(delimiter, quoteChar, escapeChar).asJava()
.map(JavaConversions::asJavaCollection);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.csv.javadsl;

/**
 * Controls which fields are wrapped in quotes when formatting CSV.
 */
public enum CsvQuotingStyle {

  /** Every field is quoted. */
  ALWAYS,

  /** A field is quoted only when it requires quotes. */
  REQUIRED
}
46 changes: 46 additions & 0 deletions csv/src/main/java/akka/stream/alpakka/csv/javadsl/CsvToMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.csv.javadsl;


import akka.stream.alpakka.csv.CsvToMapJavaStage;
import akka.stream.javadsl.Flow;
import akka.util.ByteString;
import scala.Option;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

/**
 * Java API: flows that turn collections of {@code ByteString} into maps keyed by column name.
 *
 * <p>The class is public so that it can be used from outside this package.
 */
public class CsvToMap {

  /**
   * A flow translating incoming {@code Collection<ByteString>} to a {@code Map<String, ByteString>}
   * using the stream's first element's values as keys. The charset to decode {@link ByteString}
   * to {@link String} defaults to UTF-8.
   */
  public static Flow<Collection<ByteString>, Map<String, ByteString>, ?> toMap() {
    return toMap(StandardCharsets.UTF_8);
  }

  /**
   * A flow translating incoming {@code Collection<ByteString>} to a {@code Map<String, ByteString>}
   * using the stream's first element's values as keys.
   *
   * @param charset the charset to decode {@link ByteString} to {@link String}
   */
  public static Flow<Collection<ByteString>, Map<String, ByteString>, ?> toMap(Charset charset) {
    return Flow.fromGraph(new CsvToMapJavaStage(Optional.empty(), charset));
  }

  /**
   * A flow translating incoming {@code Collection<ByteString>} to a {@code Map<String, ByteString>}
   * using the given headers as keys. The stage is created with UTF-8 as its charset.
   *
   * @param headers column names to be used as map keys
   */
  public static Flow<Collection<ByteString>, Map<String, ByteString>, ?> withHeaders(String... headers) {
    return withHeaders(StandardCharsets.UTF_8, headers);
  }

  /**
   * A flow translating incoming {@code Collection<ByteString>} to a {@code Map<String, ByteString>}
   * using the given headers as keys, with an explicit charset (mirrors {@link #toMap(Charset)}).
   *
   * @param charset the charset handed to the underlying stage
   * @param headers column names to be used as map keys
   */
  public static Flow<Collection<ByteString>, Map<String, ByteString>, ?> withHeaders(Charset charset, String... headers) {
    return Flow.fromGraph(new CsvToMapJavaStage(Optional.of(Arrays.asList(headers)), charset));
  }
}
116 changes: 116 additions & 0 deletions csv/src/main/scala/akka/stream/alpakka/csv/CsvFormatter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.csv

import java.nio.charset.{Charset, StandardCharsets}

import akka.stream.alpakka.csv.scaladsl.CsvQuotingStyle
import akka.util.ByteString

import scala.collection.immutable

/**
 * Internal API
 *
 * Renders a row of values as one CSV line encoded with `charset`.
 *
 * A field is wrapped in `quoteChar` when the quoting style is `Always` or when it contains
 * the quote character, the escape character, the delimiter, `\r` or `\n`. Inside a quoted
 * field every quote and escape character is written twice. `null` values produce an empty
 * field.
 *
 * NOTE(review): each fragment is encoded separately; a charset whose encoder prepends a
 * byte order mark on every encode call (presumably e.g. "UTF-16") would repeat it per
 * fragment — confirm before using such charsets.
 */
private[csv] class CsvFormatter(delimiter: Char,
                                quoteChar: Char,
                                escapeChar: Char,
                                endOfLine: String,
                                quotingStyle: CsvQuotingStyle,
                                charset: Charset = StandardCharsets.UTF_8) {

  private[this] val encoding = charset.name()

  // Fixed fragments are encoded once up front and reused for every row.
  private[this] val delimiterBytes = ByteString(delimiter.toString, encoding)
  private[this] val quoteBytes = ByteString(quoteChar.toString, encoding)
  private[this] val doubledQuoteBytes = ByteString(new String(Array(quoteChar, quoteChar)), encoding)
  private[this] val doubledEscapeBytes = ByteString(new String(Array(escapeChar, escapeChar)), encoding)
  private[this] val lineEndBytes = ByteString(endOfLine, encoding)

  private[this] def isQuoteOrEscape(c: Char): Boolean = c == quoteChar || c == escapeChar

  /**
   * Formats `fields` as one CSV line, including the trailing end-of-line sequence.
   * An empty row yields just the end-of-line sequence.
   */
  def toCsv(fields: immutable.Iterable[Any]): ByteString =
    if (fields.isEmpty) lineEndBytes
    else nonEmptyToCsv(fields)

  private def nonEmptyToCsv(fields: immutable.Iterable[Any]) = {
    val out = ByteString.createBuilder

    // Writes `field`, doubling every quote/escape character; `firstSpecial` is the
    // already-known index of the first such character.
    def appendEscaped(field: String, firstSpecial: Int) = {
      var copiedUpTo = 0
      var special = firstSpecial
      while (special >= 0) {
        out ++= ByteString(field.substring(copiedUpTo, special), encoding)
        out ++= (if (field.charAt(special) == quoteChar) doubledQuoteBytes else doubledEscapeBytes)
        copiedUpTo = special + 1
        special = field.indexWhere(isQuoteOrEscape _, copiedUpTo)
      }
      // Remainder after the last doubled character, if any.
      if (copiedUpTo < field.length) {
        out ++= ByteString(field.substring(copiedUpTo), encoding)
      }
    }

    // Writes one field, quoted when needed or when the style demands it.
    def appendField(field: String) =
      requiresQuotesOrSplit(field) match {
        case (mustQuote, firstSpecial) if mustQuote || quotingStyle == CsvQuotingStyle.Always =>
          out ++= quoteBytes
          if (firstSpecial == -1) out ++= ByteString(field, encoding)
          else appendEscaped(field, firstSpecial)
          out ++= quoteBytes
        case _ =>
          out ++= ByteString(field, encoding)
      }

    // A delimiter goes before every element except the first; nulls contribute no bytes.
    var needsDelimiter = false
    fields.foreach { value =>
      if (needsDelimiter) out ++= delimiterBytes
      if (value != null) appendField(value.toString)
      needsDelimiter = true
    }
    out ++= lineEndBytes
    out.result()
  }

  /**
   * Inspects `field` and returns a pair: whether it has to be quoted, and the index of its
   * first quote or escape character (`-1` when it contains none).
   */
  private def requiresQuotesOrSplit(field: String): (Boolean, Int) = {
    val firstSpecial = field.indexWhere(isQuoteOrEscape _)
    val mustQuote =
      firstSpecial != -1 ||
      CsvQuotingStyle.Always == quotingStyle ||
      field.exists((c: Char) => c == '\r' || c == '\n' || c == delimiter)
    (mustQuote, firstSpecial)
  }
}
Loading

0 comments on commit b062d15

Please sign in to comment.