diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/HashLookup.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/HashLookup.scala new file mode 100644 index 0000000..b6332d9 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/HashLookup.scala @@ -0,0 +1,11 @@ +package net.kemitix.s3thorp.awssdk + +import net.kemitix.s3thorp.Sync.{LastModified, MD5Hash, RemoteKey} + +/** + * A list of objects and their MD5 hash values. + */ +case class HashLookup(byHash: Map[MD5Hash, (RemoteKey, LastModified)], + byKey: Map[RemoteKey, (MD5Hash, LastModified)]) { + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala index 8ba970f..fb14728 100644 --- a/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala @@ -4,22 +4,49 @@ import cats.effect.IO import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient import net.kemitix.s3thorp.Sync._ import software.amazon.awssdk.core.async.AsyncRequestBody -import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, PutObjectRequest} +import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Object} + +import scala.collection.JavaConverters._ private class ThorpS3Client(s3Client: S3CatsIOClient) extends S3Client { - def objectHead(bucket: Bucket, remoteKey: RemoteKey) = { - val request = HeadObjectRequest.builder().bucket(bucket).key(remoteKey).build() + def objectHead(bucket: Bucket, remoteKey: RemoteKey): IO[Option[(MD5Hash, LastModified)]] = { + val request = HeadObjectRequest.builder() + .bucket(bucket) + .key(remoteKey) + .build() s3Client.headObject(request).attempt.map { - case Right(response) => Some((response.eTag(), response.lastModified())) + case Right(r) => Some((r.eTag(), r.lastModified())) case Left(_) => None } } def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = { - val request = PutObjectRequest.builder().bucket(bucket).key(remoteKey).build() + val request = PutObjectRequest.builder() + .bucket(bucket) + .key(remoteKey) + .build() val body = AsyncRequestBody.fromFile(localFile) - s3Client.putObject(request, body).map{response => Right(response.eTag())} + s3Client.putObject(request, body).map(r => Right(r.eTag())) + } + + private def asHashLookup: Stream[S3Object] => HashLookup = + os => HashLookup(byHash(os), byKey(os)) + + private def byHash(os: Stream[S3Object]) = + os.map{o => (o.eTag, (o.key, o.lastModified))}.toMap + + private def byKey(os: Stream[S3Object]) = + os.map{o => (o.key(), (o.eTag(), o.lastModified()))}.toMap + + def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = { + val request = ListObjectsV2Request.builder() + .bucket(bucket) + .prefix(prefix) + .build() + s3Client.listObjectsV2(request) + .map(r => r.contents.asScala.toStream) + .map(asHashLookup) } } diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala new file mode 100644 index 0000000..5f8a8d3 --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala @@ -0,0 +1,49 @@ +package net.kemitix.s3thorp.awssdk + +import java.time.Instant +import scala.collection.JavaConverters._ + +import cats.effect.IO +import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient +import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient +import org.scalatest.FunSpec +import software.amazon.awssdk.services.s3 +import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response, S3Object} + +class ThorpS3ClientSuite extends FunSpec { + + describe("listObjectsInPrefix") { + val h1 = "hash1" + val k1 = "key1" + val lm1 = Instant.now + val o1 = S3Object.builder.eTag(h1).key(k1).lastModified(lm1).build + val h2 = "hash2" + val k2 = "key2" + val lm2 = Instant.now.minusSeconds(200) + val o2 = S3Object.builder.eTag(h2).key(k2).lastModified(lm2).build + val myFakeResponse: IO[ListObjectsV2Response] = IO{ + ListObjectsV2Response.builder() + .contents(List(o1, o2).asJava) + .build() + } + val subject = new ThorpS3Client(new S3CatsIOClient { + override val underlying: S3AsyncClient = new S3AsyncClient { + override val underlying: s3.S3AsyncClient = new s3.S3AsyncClient { + override def serviceName(): String = "fake-s3-client" + + override def close(): Unit = () + } + } + override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request) = + myFakeResponse + }) + it("should build list of hash lookups") { + val result: HashLookup = subject.listObjects("bucket", "prefix").unsafeRunSync() + val expected = HashLookup( + Map(h1 -> (k1, lm1), h2 -> (k2, lm2)), + Map(k1 -> (h1, lm1), k2 -> (h2, lm2))) + assertResult(expected)(result) + } + } + +}