Skip to content

Commit

Permalink
XML subtree flow
Browse files Browse the repository at this point in the history
  • Loading branch information
takezoe committed Jul 8, 2018
1 parent acf6ba6 commit 789c91c
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 1 deletion.
117 changes: 117 additions & 0 deletions xml/src/main/scala/akka/stream/alpakka/xml/Xml.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ package akka.stream.alpakka.xml

import java.nio.charset.Charset
import java.util.Optional
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.stream.XMLOutputFactory

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.{ByteString, ByteStringBuilder}
import com.fasterxml.aalto.stax.InputFactoryImpl
import com.fasterxml.aalto.{AsyncByteArrayFeeder, AsyncXMLInputFactory, AsyncXMLStreamReader}
import org.w3c.dom.Element

import scala.annotation.tailrec
import scala.collection.immutable
Expand Down Expand Up @@ -505,4 +507,119 @@ object Xml {
}
}

/**
* Internal API
*/
private[xml] class Subtree(path: immutable.Seq[String]) extends GraphStage[FlowShape[ParseEvent, Element]] {
val in: Inlet[ParseEvent] = Inlet("XMLSubtree.in")
val out: Outlet[Element] = Outlet("XMLSubtree.out")
override val shape: FlowShape[ParseEvent, Element] = FlowShape(in, out)

private val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument()

private def createElement(start: StartElement): Element = {
val element = start.namespace match {
case Some(ns) => doc.createElementNS(start.localName, ns)
case None => doc.createElement(start.localName)
}
start.attributes.foreach {
case (name, value) =>
element.setAttribute(name, value)
}
element
}

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private var expected = path.toList
private var matchedSoFar: List[String] = Nil
private var elementStack: List[Element] = Nil

override def onPull(): Unit = pull(in)

val matching: InHandler = new InHandler {

override def onPush(): Unit = grab(in) match {
case start: StartElement =>
val element = createElement(start)
elementStack.headOption.foreach { head =>
head.appendChild(element)
}
elementStack = element :: elementStack
pull(in)
case end: EndElement =>
elementStack match {
case head :: Nil =>
expected = matchedSoFar.head :: Nil
matchedSoFar = matchedSoFar.tail
push(out, head)
elementStack = Nil
setHandler(in, partialMatch)
case _ =>
elementStack = elementStack.tail
pull(in)
}
case cdata: CData =>
elementStack.headOption.foreach { element =>
element.appendChild(doc.createCDATASection(cdata.text))
}
pull(in)
case text: TextEvent =>
elementStack.headOption.foreach { element =>
element.appendChild(doc.createTextNode(text.text))
}
pull(in)
case other =>
pull(in)
}
}

if (path.isEmpty) setHandler(in, matching) else setHandler(in, partialMatch)
setHandler(out, this)

lazy val partialMatch: InHandler = new InHandler {

override def onPush(): Unit = grab(in) match {
case start: StartElement =>
if (start.localName == expected.head) {
matchedSoFar = expected.head :: matchedSoFar
expected = expected.tail
if (expected.isEmpty) {
val element = createElement(start)
elementStack = element :: Nil
setHandler(in, matching)
}
} else {
setHandler(in, noMatch)
}
pull(in)
case EndElement(name) =>
expected = matchedSoFar.head :: expected
matchedSoFar = matchedSoFar.tail
pull(in)
case other =>
pull(in)
}

}

lazy val noMatch: InHandler = new InHandler {
var depth = 0

override def onPush(): Unit = grab(in) match {
case start: StartElement =>
depth += 1
pull(in)
case end: EndElement =>
if (depth == 0) setHandler(in, partialMatch)
else depth -= 1
pull(in)
case other =>
pull(in)
}
}

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import akka.NotUsed
import akka.stream.alpakka.xml
import akka.stream.alpakka.xml.ParseEvent
import akka.util.ByteString
import org.w3c.dom.Element

import scala.collection.JavaConverters._

Expand All @@ -33,4 +34,11 @@ object XmlParsing {
*/
def subslice(path: java.util.Collection[String]): akka.stream.javadsl.Flow[ParseEvent, ParseEvent, NotUsed] =
xml.scaladsl.XmlParsing.subslice(path.asScala.map(identity)(collection.breakOut)).asJava

/**
* A Flow that transforms a stream of XML ParseEvents. This stage pushes elements of a certain path in
* the XML document as org.w3c.dom.Element.
*/
def subtree(path: java.util.Collection[String]): akka.stream.javadsl.Flow[ParseEvent, Element, NotUsed] =
xml.scaladsl.XmlParsing.subtree(path.asScala.map(identity)(collection.breakOut)).asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import akka.stream.alpakka.xml.ParseEvent
import akka.stream.alpakka.xml.Xml._
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import org.w3c.dom.Element

import scala.collection.immutable

Expand All @@ -34,4 +35,12 @@ object XmlParsing {
*/
def subslice(path: immutable.Seq[String]): Flow[ParseEvent, ParseEvent, NotUsed] =
Flow.fromGraph(new Subslice(path))

/**
* A Flow that transforms a stream of XML ParseEvents. This stage pushes elements of a certain path in
* the XML document as org.w3c.dom.Element.
*/
def subtree(path: immutable.Seq[String]): Flow[ParseEvent, Element, NotUsed] =
Flow.fromGraph(new Subtree(path))

}
34 changes: 34 additions & 0 deletions xml/src/test/java/akka/stream/alpakka/xml/javadsl/XmlHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.xml.javadsl;

import org.w3c.dom.Document;
import org.w3c.dom.Node;

import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.StringWriter;

public class XmlHelper {

public static String asString(Node node) {
StringWriter writer = new StringWriter();
try {
Transformer trans = TransformerFactory.newInstance().newTransformer();
trans.setOutputProperty(OutputKeys.INDENT, "no");
trans.setOutputProperty(OutputKeys.VERSION, "1.0");
if (!(node instanceof Document)) {
trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
}
trans.transform(new DOMSource(node), new StreamResult(writer));
} catch (final TransformerConfigurationException ex) {
throw new IllegalStateException(ex);
} catch (final TransformerException ex) {
throw new IllegalArgumentException(ex);
}
return writer.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Element;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -22,6 +23,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -86,7 +88,7 @@ public void xmlSubslice() throws InterruptedException, ExecutionException, Timeo
+ " <elem>"
+ " <item>i1</item>"
+ " <item><sub>i2</sub></item>"
+ " <item>i3</item>"
+ " <item>i3</item>"
+ " </elem>"
+ "</doc>";
final CompletionStage<List<ParseEvent>> resultStage =
Expand All @@ -109,6 +111,42 @@ public void xmlSubslice() throws InterruptedException, ExecutionException, Timeo
.get(5, TimeUnit.SECONDS);
}

@Test
public void xmlSubtree() throws InterruptedException, ExecutionException, TimeoutException {

// #subtree
final Sink<String, CompletionStage<List<Element>>> parse =
Flow.<String>create()
.map(ByteString::fromString)
.via(XmlParsing.parser())
.via(XmlParsing.subtree(Arrays.asList("doc", "elem", "item")))
.toMat(Sink.seq(), Keep.right());
// #subtree

// #subtree-usage
final String doc =
"<doc>"
+ " <elem>"
+ " <item>i1</item>"
+ " <item><sub>i2</sub></item>"
+ " <item>i3</item>"
+ " </elem>"
+ "</doc>";
final CompletionStage<List<Element>> resultStage =
Source.single(doc).runWith(parse, materializer);
// #subtree-usage

resultStage
.thenAccept(
(list) -> {
assertThat(
list.stream().map(e -> XmlHelper.asString(e).trim()).collect(Collectors.toList()),
hasItems("<item>i1</item>", "<item><sub>i2</sub></item>", "<item>i3</item>"));
})
.toCompletableFuture()
.get(5, TimeUnit.SECONDS);
}

@BeforeClass
public static void setup() throws Exception {
system = ActorSystem.create();
Expand Down
Loading

0 comments on commit 789c91c

Please sign in to comment.