[awssdk] add listObjects
This commit is contained in:
parent
64bf42921d
commit
74be5ec1ac
3 changed files with 93 additions and 6 deletions
11
src/main/scala/net/kemitix/s3thorp/awssdk/HashLookup.scala
Normal file
11
src/main/scala/net/kemitix/s3thorp/awssdk/HashLookup.scala
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
package net.kemitix.s3thorp.awssdk
|
||||||
|
|
||||||
|
import net.kemitix.s3thorp.Sync.{LastModified, MD5Hash, RemoteKey}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A list of objects and their MD5 hash values.
|
||||||
|
*/
|
||||||
|
case class HashLookup(byHash: Map[MD5Hash, (RemoteKey, LastModified)],
|
||||||
|
byKey: Map[RemoteKey, (MD5Hash, LastModified)]) {
|
||||||
|
|
||||||
|
}
|
|
@ -4,22 +4,49 @@ import cats.effect.IO
|
||||||
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
||||||
import net.kemitix.s3thorp.Sync._
|
import net.kemitix.s3thorp.Sync._
|
||||||
import software.amazon.awssdk.core.async.AsyncRequestBody
|
import software.amazon.awssdk.core.async.AsyncRequestBody
|
||||||
import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, PutObjectRequest}
|
import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Object}
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
private class ThorpS3Client(s3Client: S3CatsIOClient) extends S3Client {
|
private class ThorpS3Client(s3Client: S3CatsIOClient) extends S3Client {
|
||||||
|
|
||||||
def objectHead(bucket: Bucket, remoteKey: RemoteKey) = {
|
def objectHead(bucket: Bucket, remoteKey: RemoteKey): IO[Option[(MD5Hash, LastModified)]] = {
|
||||||
val request = HeadObjectRequest.builder().bucket(bucket).key(remoteKey).build()
|
val request = HeadObjectRequest.builder()
|
||||||
|
.bucket(bucket)
|
||||||
|
.key(remoteKey)
|
||||||
|
.build()
|
||||||
s3Client.headObject(request).attempt.map {
|
s3Client.headObject(request).attempt.map {
|
||||||
case Right(response) => Some((response.eTag(), response.lastModified()))
|
case Right(r) => Some((r.eTag(), r.lastModified()))
|
||||||
case Left(_) => None
|
case Left(_) => None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = {
|
def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = {
|
||||||
val request = PutObjectRequest.builder().bucket(bucket).key(remoteKey).build()
|
val request = PutObjectRequest.builder()
|
||||||
|
.bucket(bucket)
|
||||||
|
.key(remoteKey)
|
||||||
|
.build()
|
||||||
val body = AsyncRequestBody.fromFile(localFile)
|
val body = AsyncRequestBody.fromFile(localFile)
|
||||||
s3Client.putObject(request, body).map{response => Right(response.eTag())}
|
s3Client.putObject(request, body).map(r => Right(r.eTag()))
|
||||||
|
}
|
||||||
|
|
||||||
|
private def asHashLookup: Stream[S3Object] => HashLookup =
|
||||||
|
os => HashLookup(byHash(os), byKey(os))
|
||||||
|
|
||||||
|
private def byHash(os: Stream[S3Object]) =
|
||||||
|
os.map{o => (o.eTag, (o.key, o.lastModified))}.toMap
|
||||||
|
|
||||||
|
private def byKey(os: Stream[S3Object]) =
|
||||||
|
os.map{o => (o.key(), (o.eTag(), o.lastModified()))}.toMap
|
||||||
|
|
||||||
|
def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = {
|
||||||
|
val request = ListObjectsV2Request.builder()
|
||||||
|
.bucket(bucket)
|
||||||
|
.prefix(prefix)
|
||||||
|
.build()
|
||||||
|
s3Client.listObjectsV2(request)
|
||||||
|
.map(r => r.contents.asScala.toStream)
|
||||||
|
.map(asHashLookup)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
package net.kemitix.s3thorp.awssdk
|
||||||
|
|
||||||
|
import java.time.Instant
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
|
import cats.effect.IO
|
||||||
|
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
|
||||||
|
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
|
||||||
|
import org.scalatest.FunSpec
|
||||||
|
import software.amazon.awssdk.services.s3
|
||||||
|
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response, S3Object}
|
||||||
|
|
||||||
|
class ThorpS3ClientSuite extends FunSpec {
|
||||||
|
|
||||||
|
describe("listObjectsInPrefix") {
|
||||||
|
val h1 = "hash1"
|
||||||
|
val k1 = "key1"
|
||||||
|
val lm1 = Instant.now
|
||||||
|
val o1 = S3Object.builder.eTag(h1).key(k1).lastModified(lm1).build
|
||||||
|
val h2 = "hash2"
|
||||||
|
val k2 = "key2"
|
||||||
|
val lm2 = Instant.now.minusSeconds(200)
|
||||||
|
val o2 = S3Object.builder.eTag(h2).key(k2).lastModified(lm2).build
|
||||||
|
val myFakeResponse: IO[ListObjectsV2Response] = IO{
|
||||||
|
ListObjectsV2Response.builder()
|
||||||
|
.contents(List(o1, o2).asJava)
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
val subject = new ThorpS3Client(new S3CatsIOClient {
|
||||||
|
override val underlying: S3AsyncClient = new S3AsyncClient {
|
||||||
|
override val underlying: s3.S3AsyncClient = new s3.S3AsyncClient {
|
||||||
|
override def serviceName(): String = "fake-s3-client"
|
||||||
|
|
||||||
|
override def close(): Unit = ()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request) =
|
||||||
|
myFakeResponse
|
||||||
|
})
|
||||||
|
it("should build list of hash lookups") {
|
||||||
|
val result: HashLookup = subject.listObjects("bucket", "prefix").unsafeRunSync()
|
||||||
|
val expected = HashLookup(
|
||||||
|
Map(h1 -> (k1, lm1), h2 -> (k2, lm2)),
|
||||||
|
Map(k1 -> (h1, lm1), k2 -> (h2, lm2)))
|
||||||
|
assertResult(expected)(result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in a new issue