Skip to content

Commit

Permalink
Fixes for list object versions result (akka#2751)
Browse files Browse the repository at this point in the history
* Make Owner fields optional for ListObjectVersionsResult
* Make VersionId optional by detecting null
* Deleting versions with S3.deleteBucketContents is now optional
* Fix S3 Javadoc comments to use `[[akka.Done]]`
  • Loading branch information
mdedetrich authored Oct 13, 2021
1 parent d290e5d commit 3f1b06f
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,10 @@ import scala.xml.NodeSeq
val id = (o \ "ID").text
val displayName = (o \ "DisplayName").text
AWSIdentity(id, displayName)
}.head
}.headOption
val size = (v \\ "Size").text.toLong
val storageClass = (v \ "StorageClass").text
val versionId = (v \ "VersionId").text
val versionId = (v \ "VersionId").headOption.flatMap(x => Utils.emptyStringToOption(x.text))
ListObjectVersionsResultVersions(eTag, isLatest, key, lastModified, owner, size, storageClass, versionId)
}

Expand All @@ -245,8 +245,8 @@ import scala.xml.NodeSeq
val id = (o \ "ID").text
val displayName = (o \ "DisplayName").text
AWSIdentity(id, displayName)
}.head
val versionId = (d \ "VersionId").text
}.headOption
val versionId = (d \ "VersionId").headOption.flatMap(x => Utils.emptyStringToOption(x.text))
DeleteMarkers(isLatest, key, lastModified, owner, versionId)
}

Expand Down
16 changes: 11 additions & 5 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -598,13 +598,18 @@ import scala.util.{Failure, Success, Try}
}
.mapMaterializedValue(_ => NotUsed)

def deleteObjectsByPrefix(bucket: String, prefix: Option[String], s3Headers: S3Headers): Source[Done, NotUsed] =
listBucket(bucket, prefix, s3Headers)
def deleteObjectsByPrefix(bucket: String,
prefix: Option[String],
deleteAllVersions: Boolean,
s3Headers: S3Headers): Source[Done, NotUsed] = {
val baseDelete = listBucket(bucket, prefix, s3Headers)
.flatMapConcat(
listBucketResultContents =>
deleteObject(S3Location(bucket, listBucketResultContents.key), versionId = None, s3Headers)
)
.flatMapConcat { _ =>

if (deleteAllVersions)
baseDelete.flatMapConcat { _ =>
listObjectVersions(bucket, prefix, s3Headers).flatMapConcat {
case (versions, deleteMarkers) =>
val allVersions =
Expand All @@ -613,12 +618,13 @@ import scala.util.{Failure, Success, Try}
.zipN(
allVersions.map {
case (key, versionId) =>
deleteObject(S3Location(bucket, key), versionId = Some(versionId), s3Headers)
deleteObject(S3Location(bucket, key), versionId = versionId, s3Headers)
}
)
.map(_ => Done)
}
}
} else baseDelete
}

def putObject(s3Location: S3Location,
contentType: ContentType,
Expand Down
66 changes: 56 additions & 10 deletions s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ object S3 {
*
* @param bucket the s3 bucket name
* @param key the s3 object key
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[java.lang.Void]] when operation is completed
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteObject(bucket: String, key: String): Source[Done, NotUsed] =
deleteObject(bucket, key, Optional.empty(), S3Headers.empty)
Expand All @@ -152,7 +152,7 @@ object S3 {
* @param bucket the s3 bucket name
* @param key the s3 object key
* @param versionId optional version id of the object
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[java.lang.Void]] when operation is completed
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteObject(bucket: String, key: String, versionId: Optional[String]): Source[Done, NotUsed] =
deleteObject(bucket, key, versionId, S3Headers.empty)
Expand All @@ -164,7 +164,7 @@ object S3 {
* @param key the s3 object key
* @param versionId optional version id of the object
* @param s3Headers any headers you want to add
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[java.lang.Void]] when operation is completed
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteObject(bucket: String,
key: String,
Expand All @@ -179,44 +179,90 @@ object S3 {
* Deletes all keys under the specified bucket
*
* @param bucket the s3 bucket name
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[java.lang.Void]] when operation is completed
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteObjectsByPrefix(bucket: String): Source[Done, NotUsed] =
deleteObjectsByPrefix(bucket, Optional.empty(), S3Headers.empty)
deleteObjectsByPrefix(bucket, Optional.empty(), deleteAllVersions = false, S3Headers.empty)

/**
* Deletes all keys under the specified bucket
*
* @param bucket the s3 bucket name
* @param deleteAllVersions Whether to delete all object versions as well (applies to versioned buckets)
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteObjectsByPrefix(bucket: String, deleteAllVersions: Boolean): Source[Done, NotUsed] =
deleteObjectsByPrefix(bucket, Optional.empty(), deleteAllVersions, S3Headers.empty)

/**
* Deletes all keys which have the given prefix under the specified bucket
*
* @param bucket the s3 bucket name
* @param prefix optional s3 objects prefix
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[java.lang.Void]] when operation is completed
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteObjectsByPrefix(bucket: String, prefix: Optional[String]): Source[Done, NotUsed] =
deleteObjectsByPrefix(bucket, prefix, S3Headers.empty)

/**
* Deletes all keys which have the given prefix under the specified bucket
*
* @param bucket the s3 bucket name
* @param prefix optional s3 objects prefix
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteObjectsByPrefix(bucket: String,
prefix: Optional[String],
deleteAllVersions: Boolean): Source[Done, NotUsed] =
deleteObjectsByPrefix(bucket, prefix, deleteAllVersions, S3Headers.empty)

/**
* Deletes all keys which have the given prefix under the specified bucket
*
* @param bucket the s3 bucket name
* @param prefix optional s3 objects prefix
* @param s3Headers any headers you want to add
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[java.lang.Void]] when operation is completed
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteObjectsByPrefix(bucket: String, prefix: Optional[String], s3Headers: S3Headers): Source[Done, NotUsed] =
S3.deleteObjectsByPrefix(bucket, prefix, deleteAllVersions = false, s3Headers)

/**
* Deletes all keys which have the given prefix under the specified bucket
*
* @param bucket the s3 bucket name
* @param prefix optional s3 objects prefix
* @param s3Headers any headers you want to add
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteObjectsByPrefix(bucket: String,
prefix: Optional[String],
deleteAllVersions: Boolean,
s3Headers: S3Headers): Source[Done, NotUsed] =
S3Stream
.deleteObjectsByPrefix(bucket, Option(prefix.orElse(null)), s3Headers)
.deleteObjectsByPrefix(bucket, Option(prefix.orElse(null)), deleteAllVersions, s3Headers)
.map(_ => Done.getInstance())
.asJava

/**
* Deletes all S3 Objects within the given bucket
*
* @param bucket the s3 bucket name
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[java.lang.Void]] when operation is completed
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteBucketContents(bucket: String): Source[Done, NotUsed] =
S3.deleteBucketContents(bucket, deleteAllVersions = false)

/**
* Deletes all S3 Objects within the given bucket
*
* @param bucket the s3 bucket name
* @param deleteAllVersions Whether to delete all object versions as well (applies to versioned buckets)
* @return A [[akka.stream.javadsl.Source Source]] that will emit [[akka.Done]] when operation is completed
*/
def deleteBucketContents(bucket: String, deleteAllVersions: Boolean): Source[Done, NotUsed] =
S3Stream
.deleteObjectsByPrefix(bucket, None, S3Headers.empty)
.deleteObjectsByPrefix(bucket, None, deleteAllVersions, S3Headers.empty)
.map(_ => Done.getInstance())
.asJava

Expand Down
Loading

0 comments on commit 3f1b06f

Please sign in to comment.