[s3client] objectHead returns an IO[Option[...]]
If the remote file is missing then return None. S3MetaDataEnricher.enrichWithS3MetaData now returns an IO[Either[File, S3MetaData]]. If objectHead returns None, the this returns the file, otherwise, the Some[Hash, LastModified] from objectHead is used to create the S3MetaData as before.
This commit is contained in:
parent
9107e6f3eb
commit
7af4004c75
5 changed files with 18 additions and 13 deletions
|
@ -17,6 +17,6 @@ class ReactiveS3Client extends S3Client {
|
||||||
for {
|
for {
|
||||||
response <- s3Client.headObject(request)
|
response <- s3Client.headObject(request)
|
||||||
// TODO catch 404 error when key doesn't exist
|
// TODO catch 404 error when key doesn't exist
|
||||||
} yield (response.eTag(), response.lastModified())
|
} yield Some((response.eTag(), response.lastModified()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,6 @@ import net.kemitix.s3thorp.Sync.{Hash, LastModified}
|
||||||
|
|
||||||
trait S3Client {
|
trait S3Client {
|
||||||
|
|
||||||
def objectHead(bucket: String, key: String): IO[(Hash, LastModified)]
|
def objectHead(bucket: String, key: String): IO[Option[(Hash, LastModified)]]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,14 +12,16 @@ trait S3MetaDataEnricher extends S3Client {
|
||||||
s"${c.prefix}/${c.source.toPath.relativize(file.toPath)}"
|
s"${c.prefix}/${c.source.toPath.relativize(file.toPath)}"
|
||||||
}
|
}
|
||||||
|
|
||||||
def enrichWithS3MetaData(c: Config): File => Stream[IO, S3MetaData] = {
|
def enrichWithS3MetaData(c: Config): File => Stream[IO, Either[File, S3MetaData]] = {
|
||||||
val fileToString = generateKey(c)_
|
val fileToString = generateKey(c)_
|
||||||
file =>
|
file =>
|
||||||
Stream.eval(for {
|
Stream.eval(for {
|
||||||
_ <- putStrLn(s"enrich: $file")
|
_ <- putStrLn(s"enrich: $file")
|
||||||
key = fileToString(file)
|
key = fileToString(file)
|
||||||
head <- objectHead(c.bucket, key)
|
head <- objectHead(c.bucket, key)
|
||||||
(hash, lastModified) = head
|
} yield head.map {
|
||||||
} yield S3MetaData(file, key, hash, lastModified))
|
case (hash,lastModified) =>
|
||||||
|
Right(S3MetaData(file, key, hash, lastModified))
|
||||||
|
}.getOrElse(Left(file)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,12 +24,15 @@ class Sync(s3Client: S3Client) extends LocalFileStream with S3MetaDataEnricher {
|
||||||
}
|
}
|
||||||
} yield ()
|
} yield ()
|
||||||
|
|
||||||
private def uploadRequiredFilter: S3MetaData => Stream[IO, File] = s3Metadata => Stream.eval(for {
|
private def uploadRequiredFilter: Either[File, S3MetaData] => Stream[IO, File] = {
|
||||||
_ <- putStrLn(s"upload required: ${s3Metadata.localFile}")
|
case Left(file) => Stream(file)
|
||||||
//md5File(localFile)
|
case Right(s3Metadata) =>
|
||||||
//filter(localHash => options.force || localHash != metadataHash)
|
Stream.eval(for {
|
||||||
} yield s3Metadata.localFile)
|
_ <- putStrLn(s"upload required: ${s3Metadata.localFile}")
|
||||||
|
//md5File(localFile)
|
||||||
|
//filter(localHash => options.force || localHash != metadataHash)
|
||||||
|
} yield s3Metadata.localFile)
|
||||||
|
}
|
||||||
private def performUpload: File => Stream[IO, Promise[Unit]] =
|
private def performUpload: File => Stream[IO, Promise[Unit]] =
|
||||||
file => Stream.eval(for {
|
file => Stream.eval(for {
|
||||||
_ <- putStrLn(s"upload: $file")
|
_ <- putStrLn(s"upload: $file")
|
||||||
|
|
|
@ -11,11 +11,11 @@ class SyncSuite extends FunSpec {
|
||||||
val hash = "hash"
|
val hash = "hash"
|
||||||
val lastModified = Instant.now()
|
val lastModified = Instant.now()
|
||||||
val sync = new Sync(new S3Client {
|
val sync = new Sync(new S3Client {
|
||||||
override def objectHead(bucket: String, key: String) = IO((hash, lastModified))
|
override def objectHead(bucket: String, key: String) = IO(Some((hash, lastModified)))
|
||||||
})
|
})
|
||||||
describe("objectHead") {
|
describe("objectHead") {
|
||||||
it("return the hash and lastModified expected") {
|
it("return the hash and lastModified expected") {
|
||||||
assertResult((hash, lastModified))(sync.objectHead("", "").unsafeRunSync())
|
assertResult(Some((hash, lastModified)))(sync.objectHead("", "").unsafeRunSync())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue