diff --git a/src/main/scala/net/kemitix/s3thorp/Main.scala b/src/main/scala/net/kemitix/s3thorp/Main.scala index 757db0b..06b2e79 100644 --- a/src/main/scala/net/kemitix/s3thorp/Main.scala +++ b/src/main/scala/net/kemitix/s3thorp/Main.scala @@ -12,7 +12,7 @@ object Main extends IOApp { val defaultConfig: Config = Config("(none)", "", Paths.get(".").toFile) - val sync = new Sync() + val sync = new Sync(new ReactiveS3Client()) def program(args: List[String]): IO[ExitCode] = for { diff --git a/src/main/scala/net/kemitix/s3thorp/ReactiveS3Client.scala b/src/main/scala/net/kemitix/s3thorp/ReactiveS3Client.scala new file mode 100644 index 0000000..4d45820 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/ReactiveS3Client.scala @@ -0,0 +1,20 @@ +package net.kemitix.s3thorp + +import cats.effect.IO +import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient +import software.amazon.awssdk.services.s3.model.HeadObjectRequest +import software.amazon.awssdk.services.s3.{S3AsyncClient => JavaS3AsyncClient} + +class ReactiveS3Client extends S3Client { + + val s3Client = S3AsyncClient(JavaS3AsyncClient.create()) + + override def objectHead(bucket: String, key: String) = { + IO.fromFuture(IO( + s3Client.headObject(HeadObjectRequest.builder() + .bucket(bucket) + .key(key) + .build()))). + map(r => (r.eTag(), r.lastModified())) + } +} diff --git a/src/main/scala/net/kemitix/s3thorp/S3Client.scala b/src/main/scala/net/kemitix/s3thorp/S3Client.scala index 7ce20cf..af049a9 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3Client.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3Client.scala @@ -1,9 +1,10 @@ package net.kemitix.s3thorp +import cats.effect.IO import net.kemitix.s3thorp.Sync.{Hash, LastModified} trait S3Client { - def objectHead(bucket: String, key: String): (Hash, LastModified) + def objectHead(bucket: String, key: String): IO[(Hash, LastModified)] } diff --git a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala index 6ec12ac..f2741f7 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala @@ -18,7 +18,7 @@ trait S3MetaDataEnricher extends S3Client { Stream.eval(for { _ <- putStrLn(s"enrich: $file") key = fileToString(file) - head <- IO(objectHead(c.bucket, key)) + head <- objectHead(c.bucket, key) (hash, lastModified) = head } yield S3MetaData(file, key, hash, lastModified)) } diff --git a/src/main/scala/net/kemitix/s3thorp/Sync.scala b/src/main/scala/net/kemitix/s3thorp/Sync.scala index 7732aaa..4e99918 100644 --- a/src/main/scala/net/kemitix/s3thorp/Sync.scala +++ b/src/main/scala/net/kemitix/s3thorp/Sync.scala @@ -1,19 +1,18 @@ package net.kemitix.s3thorp import java.io.File -import java.nio.file.Path import java.time.Instant import cats.effect._ import fs2.Stream import net.kemitix.s3thorp.Main.putStrLn -import net.kemitix.s3thorp.Sync.{Hash, LastModified} import scala.concurrent.Promise -class Sync extends LocalFileStream with S3MetaDataEnricher { +class Sync(s3Client: S3Client) extends LocalFileStream with S3MetaDataEnricher { - override def objectHead(bucket: String, key: String): (Hash, LastModified) = ??? + override def objectHead(bucket: String, key: String)= + s3Client.objectHead(bucket, key) def run(c: Config): IO[Unit] = for { _ <- putStrLn(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}") diff --git a/src/test/scala/net/kemitix/s3thorp/S3MetaDataEnricherSuite.scala b/src/test/scala/net/kemitix/s3thorp/S3MetaDataEnricherSuite.scala index cd9a830..e33c6d7 100644 --- a/src/test/scala/net/kemitix/s3thorp/S3MetaDataEnricherSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/S3MetaDataEnricherSuite.scala @@ -3,7 +3,6 @@ package net.kemitix.s3thorp import java.io.File import java.nio.file.Paths -import net.kemitix.s3thorp.Sync.{Hash, LastModified} import org.scalatest.FunSpec class S3MetaDataEnricherSuite extends FunSpec { @@ -34,6 +33,6 @@ class S3MetaDataEnricherSuite extends FunSpec { } } } - override def objectHead(bucket: String, key: String): (Hash, LastModified) = ??? + override def objectHead(bucket: String, key: String) = ??? } }