Skip to content

Commit

Permalink
expose file open options in FileIO.toFile akka#19635
Browse files Browse the repository at this point in the history
  • Loading branch information
onsails authored and rkuhn committed Feb 16, 2016
1 parent 2bb45ce commit 5a5f52c
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
(classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) ::
(classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) ::
(classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) ::
(classOf[scala.collection.immutable.Set[_]], classOf[java.util.Set[_]]) ::
(classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) ::
(classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) ::
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
Expand Down
17 changes: 15 additions & 2 deletions akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package akka.stream.io

import java.io.File
import java.nio.file.StandardOpenOption

import akka.actor.ActorSystem
import akka.stream.impl.ActorMaterializerImpl
Expand Down Expand Up @@ -52,6 +53,17 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
}

"create new file if not exists" in assertAllStagesStopped {
targetFile({ f
val completion = Source(TestByteStrings)
.runWith(FileIO.toFile(f))

val result = Await.result(completion, 3.seconds)
result.count should equal(6006)
checkFileContents(f, TestLines.mkString(""))
}, create = false)
}

"by default write into existing file" in assertAllStagesStopped {
targetFile { f
def write(lines: List[String]) =
Expand All @@ -76,7 +88,7 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
def write(lines: List[String] = TestLines) =
Source(lines)
.map(ByteString(_))
.runWith(FileIO.toFile(f, append = true))
.runWith(FileIO.toFile(f, Set(StandardOpenOption.APPEND)))

val completion1 = write()
val result1 = Await.result(completion1, 3.seconds)
Expand Down Expand Up @@ -129,8 +141,9 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {

}

private def targetFile(block: File Unit) {
private def targetFile(block: File Unit, create: Boolean = true) {
val targetFile = File.createTempFile("synchronous-file-sink", ".tmp")
if (!create) targetFile.delete()
try block(targetFile) finally targetFile.delete()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,28 @@ package akka.stream.impl.io

import java.io.File
import java.nio.channels.FileChannel
import java.util.Collections
import java.nio.file.StandardOpenOption

import akka.Done
import akka.actor.{ Deploy, ActorLogging, Props }
import akka.stream.io.IOResult
import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy }
import akka.util.ByteString

import scala.collection.JavaConverters._
import scala.concurrent.Promise
import scala.util.{ Failure, Success }

/** INTERNAL API */
private[akka] object FileSubscriber {
def props(f: File, completionPromise: Promise[IOResult], bufSize: Int, append: Boolean) = {
def props(f: File, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) = {
require(bufSize > 0, "buffer size must be > 0")
Props(classOf[FileSubscriber], f, completionPromise, bufSize, append).withDeploy(Deploy.local)
Props(classOf[FileSubscriber], f, completionPromise, bufSize, openOptions).withDeploy(Deploy.local)
}

import java.nio.file.StandardOpenOption._
val Write = Collections.singleton(WRITE)
val Append = Collections.singleton(APPEND)
}

/** INTERNAL API */
private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult], bufSize: Int, append: Boolean)
private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption])
extends akka.stream.actor.ActorSubscriber
with ActorLogging {

Expand All @@ -40,8 +37,7 @@ private[akka] class FileSubscriber(f: File, completionPromise: Promise[IOResult]
private var bytesWritten: Long = 0

override def preStart(): Unit = try {
val openOptions = if (append) FileSubscriber.Append else FileSubscriber.Write
chan = FileChannel.open(f.toPath, openOptions)
chan = FileChannel.open(f.toPath, openOptions.asJava)

super.preStart()
} catch {
Expand Down
9 changes: 5 additions & 4 deletions akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package akka.stream.impl.io

import java.io.{ File, OutputStream }
import java.nio.file.StandardOpenOption
import akka.stream.io.IOResult
import akka.stream.impl.SinkModule
import akka.stream.impl.StreamLayout.Module
Expand All @@ -18,26 +19,26 @@ import scala.concurrent.{ Future, Promise }
* Creates simple synchronous (Java 6 compatible) Sink which writes all incoming elements to the given file
* (creating it before hand if necessary).
*/
private[akka] final class FileSink(f: File, append: Boolean, val attributes: Attributes, shape: SinkShape[ByteString])
private[akka] final class FileSink(f: File, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString])
extends SinkModule[ByteString, Future[IOResult]](shape) {

override def create(context: MaterializationContext) = {
val materializer = ActorMaterializer.downcast(context.materializer)
val settings = materializer.effectiveSettings(context.effectiveAttributes)

val ioResultPromise = Promise[IOResult]()
val props = FileSubscriber.props(f, ioResultPromise, settings.maxInputBufferSize, append)
val props = FileSubscriber.props(f, ioResultPromise, settings.maxInputBufferSize, options)
val dispatcher = context.effectiveAttributes.get[Dispatcher](IODispatcher).dispatcher

val ref = materializer.actorOf(context, props.withDispatcher(dispatcher))
(akka.stream.actor.ActorSubscriber[ByteString](ref), ioResultPromise.future)
}

override protected def newInstance(shape: SinkShape[ByteString]): SinkModule[ByteString, Future[IOResult]] =
new FileSink(f, append, attributes, shape)
new FileSink(f, options, attributes, shape)

override def withAttributes(attr: Attributes): Module =
new FileSink(f, append, attr, amendShape(attr))
new FileSink(f, options, attr, amendShape(attr))
}

/**
Expand Down
23 changes: 13 additions & 10 deletions akka-stream/src/main/scala/akka/stream/javadsl/FileIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@
*/
package akka.stream.javadsl

import java.io.{ InputStream, OutputStream, File }
import akka.japi.function
import java.io.File
import java.nio.file.StandardOpenOption
import java.nio.file.StandardOpenOption._
import java.util
import akka.stream.{ scaladsl, javadsl, ActorAttributes }
import akka.stream.io.IOResult
import akka.util.ByteString
import java.util.concurrent.CompletionStage

import scala.collection.JavaConverters._

/**
* Factories to create sinks and sources from files
*/
object FileIO {

/**
* Creates a Sink that writes incoming [[ByteString]] elements to the given file.
* Overwrites existing files, if you want to append to an existing file use [[#file(File, Boolean)]] and
* pass in `true` as the Boolean argument.
* Overwrites existing files, if you want to append to an existing file use [[#file(File, util.Set[StandardOpenOption])]].
*
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
Expand All @@ -28,11 +31,11 @@ object FileIO {
*
* @param f The file to write to
*/
def toFile(f: File): javadsl.Sink[ByteString, CompletionStage[IOResult]] = toFile(f, append = false)
def toFile(f: File): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.FileIO.toFile(f).toCompletionStage())

/**
* Creates a Sink that writes incoming [[ByteString]] elements to the given file and either overwrites
* or appends to it.
* Creates a Sink that writes incoming [[ByteString]] elements to the given file
*
* Materializes a [[java.util.concurrent.CompletionStage]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
Expand All @@ -41,10 +44,10 @@ object FileIO {
* set it for a given Source by using [[ActorAttributes]].
*
* @param f The file to write to
* @param append Whether or not the file should be overwritten or appended to
* @param options File open options
*/
def toFile(f: File, append: Boolean): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.FileIO.toFile(f, append).toCompletionStage())
def toFile(f: File, options: util.Set[StandardOpenOption]): javadsl.Sink[ByteString, CompletionStage[IOResult]] =
new Sink(scaladsl.FileIO.toFile(f, options.asScala.toSet).toCompletionStage())

/**
* Creates a Source from a Files contents.
Expand Down
22 changes: 12 additions & 10 deletions akka-stream/src/main/scala/akka/stream/scaladsl/FileIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,25 @@
*/
package akka.stream.scaladsl

import java.io.{ OutputStream, InputStream, File }
import java.io.File
import java.nio.file.StandardOpenOption
import java.nio.file.StandardOpenOption._

import akka.stream.ActorAttributes
import akka.stream.io.IOResult
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.io._
import akka.stream.io.IOResult
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration._

/**
* Java API: Factories to create sinks and sources from files
*/
object FileIO {

import Source.{ shape sourceShape }
import Sink.{ shape sinkShape }
import Source.{ shape sourceShape }

/**
* Creates a Source from a Files contents.
Expand All @@ -33,23 +34,24 @@ object FileIO {
* It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
* and a possible exception if IO operation was not completed successfully.
*
* @param f the File to read from
* @param f the File to read from
* @param chunkSize the size of each read operation, defaults to 8192
*/
def fromFile(f: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
new Source(new FileSource(f, chunkSize, DefaultAttributes.fileSource, sourceShape("FileSource")))

/**
* Creates a Sink which writes incoming [[ByteString]] elements to the given file and either overwrites
* or appends to it.
* Creates a Sink which writes incoming [[ByteString]] elements to the given file. Overwrites existing files by default.
*
* Materializes a [[Future]] of [[IOResult]] that will be completed with the size of the file (in bytes) at the streams completion,
* and a possible exception if IO operation was not completed successfully.
*
* This source is backed by an Actor which will use the dedicated `akka.stream.blocking-io-dispatcher`,
* unless configured otherwise by using [[ActorAttributes]].
*
* @param f the File to write to
* @param options File open options, defaults to Set(WRITE, CREATE)
*/
def toFile(f: File, append: Boolean = false): Sink[ByteString, Future[IOResult]] =
new Sink(new FileSink(f, append, DefaultAttributes.fileSink, sinkShape("FileSink")))

def toFile(f: File, options: Set[StandardOpenOption] = Set(WRITE, CREATE)): Sink[ByteString, Future[IOResult]] =
new Sink(new FileSink(f, options, DefaultAttributes.fileSink, sinkShape("FileSink")))
}

0 comments on commit 5a5f52c

Please sign in to comment.