Skip to content

Commit

Permalink
Merge pull request #6 from holinov/master
Browse files Browse the repository at this point in the history
Added static auth provider and custom response transformer to getObject
  • Loading branch information
tampler authored Jul 12, 2020
2 parents 6fd3ab4 + af558f9 commit 9766abe
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 54 deletions.
85 changes: 49 additions & 36 deletions src/main/scala/AwsLink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package zio_aws_s3

import java.nio.file.{ Paths }
import java.nio.file.Paths
import java.util.concurrent.CompletableFuture

import scala.collection.JavaConverters._
import com.github.ghik.silencer.silent

import zio.{ Has, IO, Task, ZIO, ZLayer }
import zio.{ Has, IO, RLayer, Task, URLayer, ZIO, ZLayer }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{
Expand All @@ -39,6 +39,7 @@ import software.amazon.awssdk.services.s3.model.{
GetObjectRequest,
GetObjectResponse,
Grant,
Grantee,
ListBucketsResponse,
ListObjectsV2Request,
ListObjectsV2Response,
Expand All @@ -50,22 +51,35 @@ import software.amazon.awssdk.services.s3.model.{
PutObjectResponse,
Type
}

import java.net.URLEncoder
import java.nio.charset.StandardCharsets
import java.util.{ List => JList }
import java.net.URI

import software.amazon.awssdk.auth.credentials.{ AwsCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.core.async.AsyncResponseTransformer

object AwsAgent {
def createClient(region: Region, endpointOverride: Option[String] = None): Task[S3AsyncClient] = {
def createClient(region: Region, endpointOverride: Option[String] = None): Task[S3AsyncClient] = Task {
val initBuilder = S3AsyncClient.builder.region(region)
val client = endpointOverride
endpointOverride
.map(URI.create)
.map(initBuilder.endpointOverride)
.getOrElse(initBuilder)
.build
}

Task(client)
def createClientWithCreds(
region: Region,
creds: AwsCredentials,
endpointOverride: Option[String] = None
): Task[S3AsyncClient] = Task {
val initBuilder = S3AsyncClient.builder.region(region).credentialsProvider(StaticCredentialsProvider.create(creds))
endpointOverride
.map(URI.create)
.map(initBuilder.endpointOverride)
.getOrElse(initBuilder)
.build
}
}

Expand All @@ -78,10 +92,10 @@ package object AwsApp {
val s3: S3AsyncClient
}

val any: ZLayer[ExtDeps, Nothing, ExtDeps] =
ZLayer.requires[ExtDeps]
val any: URLayer[ExtDeps, ExtDeps] = ZLayer.requires[ExtDeps]

val live = ZLayer.fromFunction((curr: S3AsyncClient) => new ExtDeps.Service { val s3 = curr })
val live: URLayer[S3AsyncClient, Has[Service]] =
ZLayer.fromFunction((curr: S3AsyncClient) => new ExtDeps.Service { val s3: S3AsyncClient = curr })
}

type AwsLink = Has[AwsLink.Service[Any]]
Expand All @@ -90,10 +104,10 @@ package object AwsApp {

trait Service[R] extends GenericLink[R] {}

val any: ZLayer[AwsLink, Nothing, AwsLink] =
val any: URLayer[AwsLink, AwsLink] =
ZLayer.requires[AwsLink]

val live: ZLayer[ExtDeps, Throwable, AwsLink] = ZLayer.fromService { (deps: ExtDeps.Service) =>
val live: RLayer[ExtDeps, AwsLink] = ZLayer.fromService { (deps: ExtDeps.Service) =>
new Service[Any] {

def createBucket(buck: String): Task[CreateBucketResponse] =
Expand All @@ -103,6 +117,7 @@ package object AwsApp {
callback
)
}

def delBucket(buck: String): Task[DeleteBucketResponse] =
IO.effectAsync[Throwable, DeleteBucketResponse] { callback =>
processResponse(
Expand Down Expand Up @@ -137,34 +152,27 @@ package object AwsApp {
for {
list <- listBucketObjects(buck, prefix)
keys = list.contents.asScala.map(_.key).toList
_ = println(s">>>>>> Total keys found for prefix ${prefix}: ${keys.size}")
} yield keys

def lookupObject(buck: String, prefix: String, key: String): Task[Boolean] =
for {
list <- listBucketObjects(buck, prefix)
newKey = prefix + "/" + key
res = list.contents.asScala
.filter(_.key == newKey)
.nonEmpty
_ = println(res)
res = list.contents.asScala.exists(_.key == newKey)
} yield res

def getObjectAcl(buck: String, key: String): Task[GetObjectAclResponse] = {
val req = GetObjectAclRequest.builder.bucket(buck).key(key).build

println(s">>>>>> Get ACL for key: ${key}")

IO.effectAsync[Throwable, GetObjectAclResponse](callback =>
processResponse(deps.s3.getObjectAcl(req), callback)
)
.mapError(_ => new Throwable("Failed Processing CopyObjectResponse"))
.orElseFail(new Throwable("Failed Processing CopyObjectResponse"))
}

def putObjectAcl(buck: String, key: String, owner: Owner, grants: JList[Grant]): Task[PutObjectAclResponse] =
for {
acl <- Task.effect(AccessControlPolicy.builder.owner(owner).grants(grants).build)
_ = println(s">>>>>> PUT ACL for key: ${key}")
req <- Task.effect(PutObjectAclRequest.builder.bucket(buck).key(key).accessControlPolicy(acl).build)
rsp <- IO
.effectAsync[Throwable, PutObjectAclResponse] { callback =>
Expand All @@ -180,10 +188,10 @@ package object AwsApp {
} yield ()

def blockPack(buck: String, prefix: String): Task[Unit] =
putPackAcl(buck, prefix, true).as(())
putPackAcl(buck, prefix, block = true).unit

def unblockPack(buck: String, prefix: String): Task[Unit] =
putPackAcl(buck, prefix, false).as(())
putPackAcl(buck, prefix, block = false).unit

def getPackAcl(buck: String, prefix: String): Task[List[GetObjectAclResponse]] =
for {
Expand All @@ -199,18 +207,17 @@ package object AwsApp {
buck,
keys.head
) // read ACL for the first element in a pack. Assume all others have the same ACL in the pack
owner = acl.owner // Evaluate owner and grants to avoid multiple calls
grGrant <- Task.effect(
Grant
.builder()
.grantee { bld =>
.grantee { bld: Grantee.Builder =>
bld
.id("dev-assets")
.`type`(Type.CANONICAL_USER)
.displayName("DEV Assets User")
}
.permission(Permission.FULL_CONTROL)
.grantee { bld =>
.grantee { bld: Grantee.Builder =>
bld
.`type`(Type.GROUP)
.uri("http://acs.amazonaws.com/groups/global/AllUsers")
Expand All @@ -232,9 +239,6 @@ package object AwsApp {
val src = URLEncoder.encode(buck + "/" + srcKey, StandardCharsets.UTF_8.toString)
val dst = URLEncoder.encode(buck + "/" + dstKey, StandardCharsets.UTF_8.toString)

println(s">>>>>> Copy Src Link: ${src}")
println(s">>>>>> Copy Dst link: ${dst}")

for {
req <- IO.effect(
CopyObjectRequest.builder
Expand All @@ -248,7 +252,7 @@ package object AwsApp {
.effectAsync[Throwable, CopyObjectResponse] { callback =>
processResponse(deps.s3.copyObject(req), callback)
}
.mapError(_ => new Throwable("Failed Processing CopyObjectResponse"))
.orElseFail(new Throwable("Failed Processing CopyObjectResponse"))
} yield rsp
}

Expand All @@ -268,6 +272,17 @@ package object AwsApp {
)
}

def getObject[G](
buck: String,
key: String,
transformer: AsyncResponseTransformer[GetObjectResponse, G]
): AwsTask[G] = IO.effectAsync { callback =>
processResponse(
deps.s3.getObject(GetObjectRequest.builder.bucket(buck).key(key).build, transformer),
callback
)
}

def delObject(buck: String, key: String): Task[DeleteObjectResponse] =
IO.effectAsync[Throwable, DeleteObjectResponse] { callback =>
processResponse(
Expand All @@ -282,17 +297,15 @@ package object AwsApp {
_ <- Task.foreach(keys)(key => delObject(buck, key))
} yield ()

@silent("discarded non-Unit value")
def processResponse[T](
fut: CompletableFuture[T],
callback: Task[T] => Unit
): Unit =
def processResponse[T](fut: CompletableFuture[T], callback: Task[T] => Unit): Unit = {
fut.handle[Unit] { (response, err) =>
err match {
case null => callback(IO.succeed(response))
case ex => callback(IO.fail(ex))
}
}: Unit
}
()
}
}
}

Expand Down
31 changes: 13 additions & 18 deletions src/main/scala/GenericLink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,12 @@

package zio_aws_s3

import zio.{ RIO }
// import software.amazon.awssdk.regions.Region
// import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{
CopyObjectResponse,
CreateBucketResponse,
DeleteBucketResponse,
DeleteObjectResponse,
GetObjectAclResponse,
GetObjectResponse,
Grant,
ListBucketsResponse,
ListObjectsV2Response,
Owner,
PutObjectAclResponse,
PutObjectResponse
}

import software.amazon.awssdk.core.async.AsyncResponseTransformer
import zio.RIO
import java.util.{ List => JList }

import software.amazon.awssdk.services.s3.model._

trait GenericLink[R] {
type AwsTask[+A] = RIO[R, A]

Expand Down Expand Up @@ -109,6 +95,15 @@ trait GenericLink[R] {
*/
def getObject(buck: String, key: String, file: String): AwsTask[GetObjectResponse]

/**
* Get a file with a key from a Bucket using custom response transformer
*/
def getObject[G](
buck: String,
key: String,
transformer: AsyncResponseTransformer[GetObjectResponse, G]
): AwsTask[G]

/**
* Delete object by key from a Bucket
*/
Expand Down

0 comments on commit 9766abe

Please sign in to comment.