[ReactiveS3Client] Added

This commit is contained in:
Paul Campbell 2019-05-08 08:35:25 +01:00
parent ae3243ad21
commit 387904d1af
6 changed files with 28 additions and 9 deletions

View file

@ -12,7 +12,7 @@ object Main extends IOApp {
val defaultConfig: Config = val defaultConfig: Config =
Config("(none)", "", Paths.get(".").toFile) Config("(none)", "", Paths.get(".").toFile)
val sync = new Sync() val sync = new Sync(new ReactiveS3Client())
def program(args: List[String]): IO[ExitCode] = def program(args: List[String]): IO[ExitCode] =
for { for {

View file

@ -0,0 +1,20 @@
package net.kemitix.s3thorp
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.HeadObjectRequest
import software.amazon.awssdk.services.s3.{S3AsyncClient => JavaS3AsyncClient}
class ReactiveS3Client extends S3Client {
val s3Client = S3AsyncClient(JavaS3AsyncClient.create())
override def objectHead(bucket: String, key: String) = {
IO.fromFuture(IO(
s3Client.headObject(HeadObjectRequest.builder()
.bucket(bucket)
.key(key)
.build()))).
map(r => (r.eTag(), r.lastModified()))
}
}

View file

@ -1,9 +1,10 @@
package net.kemitix.s3thorp package net.kemitix.s3thorp
import cats.effect.IO
import net.kemitix.s3thorp.Sync.{Hash, LastModified} import net.kemitix.s3thorp.Sync.{Hash, LastModified}
trait S3Client { trait S3Client {
def objectHead(bucket: String, key: String): (Hash, LastModified) def objectHead(bucket: String, key: String): IO[(Hash, LastModified)]
} }

View file

@ -18,7 +18,7 @@ trait S3MetaDataEnricher extends S3Client {
Stream.eval(for { Stream.eval(for {
_ <- putStrLn(s"enrich: $file") _ <- putStrLn(s"enrich: $file")
key = fileToString(file) key = fileToString(file)
head <- IO(objectHead(c.bucket, key)) head <- objectHead(c.bucket, key)
(hash, lastModified) = head (hash, lastModified) = head
} yield S3MetaData(file, key, hash, lastModified)) } yield S3MetaData(file, key, hash, lastModified))
} }

View file

@ -1,19 +1,18 @@
package net.kemitix.s3thorp package net.kemitix.s3thorp
import java.io.File import java.io.File
import java.nio.file.Path
import java.time.Instant import java.time.Instant
import cats.effect._ import cats.effect._
import fs2.Stream import fs2.Stream
import net.kemitix.s3thorp.Main.putStrLn import net.kemitix.s3thorp.Main.putStrLn
import net.kemitix.s3thorp.Sync.{Hash, LastModified}
import scala.concurrent.Promise import scala.concurrent.Promise
class Sync extends LocalFileStream with S3MetaDataEnricher { class Sync(s3Client: S3Client) extends LocalFileStream with S3MetaDataEnricher {
override def objectHead(bucket: String, key: String): (Hash, LastModified) = ??? override def objectHead(bucket: String, key: String)=
s3Client.objectHead(bucket, key)
def run(c: Config): IO[Unit] = for { def run(c: Config): IO[Unit] = for {
_ <- putStrLn(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}") _ <- putStrLn(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}")

View file

@ -3,7 +3,6 @@ package net.kemitix.s3thorp
import java.io.File import java.io.File
import java.nio.file.Paths import java.nio.file.Paths
import net.kemitix.s3thorp.Sync.{Hash, LastModified}
import org.scalatest.FunSpec import org.scalatest.FunSpec
class S3MetaDataEnricherSuite extends FunSpec { class S3MetaDataEnricherSuite extends FunSpec {
@ -34,6 +33,6 @@ class S3MetaDataEnricherSuite extends FunSpec {
} }
} }
} }
override def objectHead(bucket: String, key: String): (Hash, LastModified) = ??? override def objectHead(bucket: String, key: String) = ???
} }
} }