[awssdk] Rewritten and simplified AWS SDK interface
This commit is contained in:
parent
fae876b554
commit
bc1bffc345
8 changed files with 65 additions and 79 deletions
|
@ -1,9 +0,0 @@
|
||||||
package net.kemitix.s3thorp.awssdk
|
|
||||||
|
|
||||||
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
|
||||||
|
|
||||||
trait CatsIOS3Client extends S3CatsIOClientProvider {
|
|
||||||
|
|
||||||
def s3Client = S3CatsIOClient(underlying)
|
|
||||||
|
|
||||||
}
|
|
|
@ -4,8 +4,9 @@ import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
|
||||||
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
||||||
import software.amazon.awssdk.services.s3.{S3AsyncClient => JavaS3AsyncClient}
|
import software.amazon.awssdk.services.s3.{S3AsyncClient => JavaS3AsyncClient}
|
||||||
|
|
||||||
trait UnderlyingS3AsyncClient extends S3CatsIOClient{
|
trait JavaClientWrapper extends S3CatsIOClient {
|
||||||
|
|
||||||
override val underlying: S3AsyncClient = S3AsyncClient(JavaS3AsyncClient.create)
|
override val underlying: S3AsyncClient =
|
||||||
|
S3AsyncClient(JavaS3AsyncClient.create)
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,9 +0,0 @@
|
||||||
package net.kemitix.s3thorp.awssdk
|
|
||||||
|
|
||||||
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
|
||||||
|
|
||||||
trait S3CatsIOClientProvider extends UnderlyingS3AsyncClient {
|
|
||||||
|
|
||||||
def s3Client: S3CatsIOClient
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,7 +1,8 @@
|
||||||
package net.kemitix.s3thorp.awssdk
|
package net.kemitix.s3thorp.awssdk
|
||||||
|
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
import net.kemitix.s3thorp.Sync.{Bucket, MD5Hash, LastModified, LocalFile, RemoteKey}
|
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
||||||
|
import net.kemitix.s3thorp.Sync.{Bucket, LastModified, LocalFile, MD5Hash, RemoteKey}
|
||||||
|
|
||||||
trait S3Client {
|
trait S3Client {
|
||||||
|
|
||||||
|
@ -13,6 +14,8 @@ trait S3Client {
|
||||||
|
|
||||||
object S3Client {
|
object S3Client {
|
||||||
|
|
||||||
val defaultClient: S3Client = new ReactiveS3Client
|
val defaultClient: S3Client =
|
||||||
|
new ThropS3Client(
|
||||||
|
S3CatsIOClient(new JavaClientWrapper {}.underlying))
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,27 +1,27 @@
|
||||||
package net.kemitix.s3thorp.awssdk
|
package net.kemitix.s3thorp.awssdk
|
||||||
|
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, RemoteKey}
|
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
||||||
|
import net.kemitix.s3thorp.Sync.{Bucket, LastModified, LocalFile, MD5Hash, RemoteKey}
|
||||||
import software.amazon.awssdk.core.async.AsyncRequestBody
|
import software.amazon.awssdk.core.async.AsyncRequestBody
|
||||||
import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, PutObjectRequest}
|
import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, PutObjectRequest}
|
||||||
|
|
||||||
private class ReactiveS3Client
|
private class ThropS3Client(s3Client: S3CatsIOClient) extends S3Client {
|
||||||
extends S3Client
|
|
||||||
with CatsIOS3Client {
|
|
||||||
|
|
||||||
override def objectHead(bucket: Bucket, remoteKey: RemoteKey) = {
|
def objectHead(bucket: Bucket, remoteKey: RemoteKey): IO[Option[(MD5Hash, LastModified)]] = {
|
||||||
val request = HeadObjectRequest.builder().bucket(bucket).key(remoteKey).build()
|
val request = HeadObjectRequest.builder().bucket(bucket).key(remoteKey).build()
|
||||||
s3Client.headObject(request).attempt.map {
|
s3Client.headObject(request).attempt.map {
|
||||||
case Right(response) => Some((response.eTag(), response.lastModified()))
|
case Right(response) => Some((response.eTag(), response.lastModified()))
|
||||||
case Left(_) =>None
|
case Left(_) => None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Unit] = {
|
def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Unit] = {
|
||||||
val request = PutObjectRequest.builder().bucket(bucket).key(remoteKey).build()
|
val request = PutObjectRequest.builder().bucket(bucket).key(remoteKey).build()
|
||||||
val body = AsyncRequestBody.fromFile(localFile)
|
val body = AsyncRequestBody.fromFile(localFile)
|
||||||
for {
|
for {
|
||||||
_ <- s3Client.putObject(request, body)
|
_ <- s3Client.putObject(request, body)
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -12,11 +12,14 @@ class SyncSuite extends FunSpec {
|
||||||
val hash = "hash"
|
val hash = "hash"
|
||||||
val lastModified = Instant.now()
|
val lastModified = Instant.now()
|
||||||
val sync = new Sync(new S3Client with DummyS3Client {
|
val sync = new Sync(new S3Client with DummyS3Client {
|
||||||
override def objectHead(bucket: String, key: String) = IO(Some((hash, lastModified)))
|
override def objectHead(bucket: String, key: String) =
|
||||||
|
IO(Some((hash, lastModified)))
|
||||||
})
|
})
|
||||||
describe("objectHead") {
|
describe("objectHead") {
|
||||||
it("return the hash and lastModified expected") {
|
it("return the hash and lastModified expected") {
|
||||||
assertResult(Some((hash, lastModified)))(sync.objectHead("", "").unsafeRunSync())
|
assertResult(Some((hash, lastModified)))(
|
||||||
|
sync.objectHead("", "").
|
||||||
|
unsafeRunSync())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,48 +0,0 @@
|
||||||
package net.kemitix.s3thorp.awssdk
|
|
||||||
|
|
||||||
import java.time.Instant
|
|
||||||
|
|
||||||
import cats.effect.IO
|
|
||||||
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
|
||||||
import org.scalatest.FunSpec
|
|
||||||
import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, HeadObjectResponse, NoSuchKeyException}
|
|
||||||
|
|
||||||
class ReactiveS3ClientTest extends FunSpec {
|
|
||||||
|
|
||||||
describe("testObjectHead") {
|
|
||||||
def invoke(self: S3Client) = {
|
|
||||||
self.objectHead("bucket", "remoteKey").unsafeRunSync()
|
|
||||||
}
|
|
||||||
|
|
||||||
describe("when underlying client response is okay") {
|
|
||||||
val expectedHash = "hash"
|
|
||||||
val expectedLastModified = Instant.now
|
|
||||||
new ReactiveS3Client { self: S3Client => {
|
|
||||||
it("should return Some(expected values)") {
|
|
||||||
assertResult(Some((expectedHash, expectedLastModified)))(invoke(self))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
override def s3Client: S3CatsIOClient = new S3CatsIOClient with UnderlyingS3AsyncClient {
|
|
||||||
override def headObject(headObjectRequest: HeadObjectRequest): IO[HeadObjectResponse] =
|
|
||||||
IO(HeadObjectResponse.builder().
|
|
||||||
eTag(expectedHash).
|
|
||||||
lastModified(expectedLastModified).
|
|
||||||
build())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
describe("when underlying client throws NoSuchKeyException") {
|
|
||||||
new ReactiveS3Client { self: S3Client =>
|
|
||||||
it("should return None") {
|
|
||||||
assertResult(None)(invoke(self))
|
|
||||||
}
|
|
||||||
override def s3Client: S3CatsIOClient = new S3CatsIOClient with UnderlyingS3AsyncClient {
|
|
||||||
override def headObject(headObjectRequest: HeadObjectRequest): IO[HeadObjectResponse] =
|
|
||||||
throw NoSuchKeyException.builder().build()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
package net.kemitix.s3thorp.awssdk
|
||||||
|
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
|
import cats.effect.IO
|
||||||
|
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
||||||
|
import org.scalatest.FunSpec
|
||||||
|
import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, HeadObjectResponse, NoSuchKeyException}
|
||||||
|
|
||||||
|
class S3ClientSuite extends FunSpec {
|
||||||
|
|
||||||
|
describe("testObjectHead") {
|
||||||
|
def invoke(self: S3Client) = {
|
||||||
|
self.objectHead("bucket", "remoteKey").unsafeRunSync()
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("when underlying client response is okay") {
|
||||||
|
val expectedHash = "hash"
|
||||||
|
val expectedLastModified = Instant.now
|
||||||
|
val underlyingClient = new S3CatsIOClient with JavaClientWrapper {
|
||||||
|
override def headObject(headObjectRequest: HeadObjectRequest) =
|
||||||
|
IO(HeadObjectResponse.builder().
|
||||||
|
eTag(expectedHash).
|
||||||
|
lastModified(expectedLastModified).
|
||||||
|
build())
|
||||||
|
}
|
||||||
|
val s3Client = new ThropS3Client(underlyingClient)
|
||||||
|
it("should return Some(expected values)") {
|
||||||
|
assertResult(Some((expectedHash, expectedLastModified)))(invoke(s3Client))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("when underlying client throws NoSuchKeyException") {
|
||||||
|
val underlyingClient = new S3CatsIOClient with JavaClientWrapper {
|
||||||
|
override def headObject(headObjectRequest: HeadObjectRequest): IO[HeadObjectResponse] =
|
||||||
|
IO(throw NoSuchKeyException.builder().build())
|
||||||
|
}
|
||||||
|
val s3Client = new ThropS3Client(underlyingClient)
|
||||||
|
it("should return None") {
|
||||||
|
assertResult(None)(invoke(s3Client))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue