Skip to content

Commit

Permalink
Merge pull request akka#1890 from michalbogacz/file-zip-support
Browse files Browse the repository at this point in the history
Added flow for creating file ZIP archive
  • Loading branch information
2m authored Aug 27, 2019
2 parents c473198 + 2e8b971 commit e48dd81
Show file tree
Hide file tree
Showing 12 changed files with 555 additions and 0 deletions.
16 changes: 16 additions & 0 deletions docs/src/main/paradox/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,19 @@ Java
### Example: content-based rotation with compression to SFTP file

This example can be found in the @ref:[self-contained example documentation section](examples/ftp-samples.md#example-rotate-data-stream-over-to-multiple-compressed-files-on-sftp-server).

## ZIP Archive

The @scala[@scaladoc[Archive](akka.stream.alpakka.file.scaladsl.Archive$)] @java[@scaladoc[Archive](akka.stream.alpakka.file.javadsl.Archive$)]
contains flow for compressing multiple files into one ZIP file.

Result of flow can be send to sink even before whole ZIP file is created, so size of resulting ZIP archive
is not limited to memory size.

This example usage shows compressing files from disk.

Scala
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #sample }

Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample }
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import akka.annotation.InternalApi
import akka.util.ByteString

/**
* INTERNAL API
*
* ArchiveZipFlow operates on ByteString. But it is required to inform ZipOutputStream when each file starts and ends.
* For this, special starting and ending ByteString is added.
*/
@InternalApi private[file] object FileByteStringSeparators {
private val startFileWord = "$START$"
private val endFileWord = "$END$"
private val separator: Char = '|'

def createStartingByteString(path: String): ByteString =
ByteString(s"$startFileWord$separator$path")

def createEndingByteString(): ByteString =
ByteString(endFileWord)

def isStartingByteString(b: ByteString): Boolean =
b.utf8String.startsWith(startFileWord)

def isEndingByteString(b: ByteString): Boolean =
b.utf8String == endFileWord

def getPathFromStartingByteString(b: ByteString): String = {
val splitted = b.utf8String.split(separator)
if (splitted.length == 1) {
""
} else if (splitted.length == 2) {
splitted.tail.head
} else {
splitted.tail.mkString(separator.toString)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import java.util.zip.{ZipEntry, ZipOutputStream}

import akka.annotation.InternalApi
import akka.event.Logging
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.{ByteString, ByteStringBuilder}

/**
* INTERNAL API
*/
@InternalApi private[file] final class ZipArchiveFlowStage(
val shape: FlowShape[ByteString, ByteString]
) extends GraphStageLogic(shape) {

private def in = shape.in
private def out = shape.out

private val builder = new ByteStringBuilder()
private val zip = new ZipOutputStream(builder.asOutputStream)
private var emptyStream = true

setHandler(
out,
new OutHandler {
override def onPull(): Unit =
if (isClosed(in)) {
emptyStream = true
completeStage()
} else {
pull(in)
}
}
)

setHandler(
in,
new InHandler {
override def onPush(): Unit = {
emptyStream = false
val element = grab(in)
element match {
case b: ByteString if FileByteStringSeparators.isStartingByteString(b) =>
val name = FileByteStringSeparators.getPathFromStartingByteString(b)
zip.putNextEntry(new ZipEntry(name))
case b: ByteString if FileByteStringSeparators.isEndingByteString(b) =>
zip.closeEntry()
case b: ByteString =>
val array = b.toArray
zip.write(array, 0, array.length)
}
zip.flush()
val result = builder.result
if (result.nonEmpty) {
builder.clear()
push(out, result)
} else {
pull(in)
}
}

override def onUpstreamFinish(): Unit = {
if (!emptyStream) {
zip.close()
push(out, builder.result)
builder.clear()
}
super.onUpstreamFinish()
}

}
)

}

/**
* INTERNAL API
*/
@InternalApi private[file] final class ZipArchiveFlow extends GraphStage[FlowShape[ByteString, ByteString]] {

val in: Inlet[ByteString] = Inlet(Logging.simpleName(this) + ".in")
val out: Outlet[ByteString] = Outlet(Logging.simpleName(this) + ".out")

override def initialAttributes: Attributes =
Attributes.name(Logging.simpleName(this))

override val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new ZipArchiveFlowStage(shape)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

/**
* INTERNAL API
*/
@InternalApi private[file] object ZipArchiveManager {

def zipFlow(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] = {
val archiveZipFlow = new ZipArchiveFlow()
Flow[(ArchiveMetadata, Source[ByteString, Any])]
.flatMapConcat {
case (metadata, stream) =>
val prependElem = Source.single(FileByteStringSeparators.createStartingByteString(metadata.filePath))
val appendElem = Source.single(FileByteStringSeparators.createEndingByteString())
stream.prepend(prependElem).concat(appendElem)
}
.via(archiveZipFlow)
}

}
31 changes: 31 additions & 0 deletions file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.file.javadsl

import akka.NotUsed
import akka.stream.alpakka.file.{scaladsl, ArchiveMetadata}
import akka.stream.javadsl.Flow
import akka.util.ByteString
import akka.japi.Pair
import akka.stream.javadsl.Source

/**
* Java API.
*/
object Archive {

/**
* Flow for compressing multiple files into one ZIP file.
*/
def zip(): Flow[Pair[ArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] =
Flow
.create[Pair[ArchiveMetadata, Source[ByteString, NotUsed]]]()
.map(func(pair => (pair.first, pair.second.asScala)))
.via(scaladsl.Archive.zip().asJava)

private def func[T, R](f: T => R) = new akka.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}
}
14 changes: 14 additions & 0 deletions file/src/main/scala/akka/stream/alpakka/file/model.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.file

final class ArchiveMetadata private (
val filePath: String
)

object ArchiveMetadata {
def apply(filePath: String): ArchiveMetadata = new ArchiveMetadata(filePath)
def create(filePath: String): ArchiveMetadata = new ArchiveMetadata(filePath)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream.alpakka.file.scaladsl

import akka.NotUsed
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.impl.archive.ZipArchiveManager
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

/**
* Scala API.
*/
object Archive {

/**
* Flow for compressing multiple files into one ZIP file.
*/
def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] =
ZipArchiveManager.zipFlow()

}
35 changes: 35 additions & 0 deletions file/src/test/java/docs/javadsl/ArchiveHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2016-2019 Lightbend Inc. <http://www.lightbend.com>
*/

package docs.javadsl;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class ArchiveHelper {

public void createReferenceZipFile(List<Path> inputFilePaths, String resultFileName)
throws Exception {
FileOutputStream fout = new FileOutputStream(resultFileName);
ZipOutputStream zout = new ZipOutputStream(fout);
for (Path inputFilePath : inputFilePaths) {
File fileToZip = inputFilePath.toFile();
FileInputStream fis = new FileInputStream(fileToZip);
ZipEntry zipEntry = new ZipEntry(fileToZip.getName());
zout.putNextEntry(zipEntry);
byte[] bytes = new byte[1024];
int length;
while ((length = fis.read(bytes)) >= 0) {
zout.write(bytes, 0, length);
}
fis.close();
}
zout.close();
}
}
Loading

0 comments on commit e48dd81

Please sign in to comment.