diff --git a/src/main/scala/net/kemitix/s3thorp/ReactiveS3Client.scala b/src/main/scala/net/kemitix/s3thorp/ReactiveS3Client.scala index a8cd732..fd55068 100644 --- a/src/main/scala/net/kemitix/s3thorp/ReactiveS3Client.scala +++ b/src/main/scala/net/kemitix/s3thorp/ReactiveS3Client.scala @@ -17,6 +17,6 @@ class ReactiveS3Client extends S3Client { for { response <- s3Client.headObject(request) // TODO catch 404 error when key doesn't exist - } yield (response.eTag(), response.lastModified()) + } yield Some((response.eTag(), response.lastModified())) } } diff --git a/src/main/scala/net/kemitix/s3thorp/S3Client.scala b/src/main/scala/net/kemitix/s3thorp/S3Client.scala index af049a9..41e0220 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3Client.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3Client.scala @@ -5,6 +5,6 @@ import net.kemitix.s3thorp.Sync.{Hash, LastModified} trait S3Client { - def objectHead(bucket: String, key: String): IO[(Hash, LastModified)] + def objectHead(bucket: String, key: String): IO[Option[(Hash, LastModified)]] } diff --git a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala index f2741f7..976da82 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala @@ -12,14 +12,16 @@ trait S3MetaDataEnricher extends S3Client { s"${c.prefix}/${c.source.toPath.relativize(file.toPath)}" } - def enrichWithS3MetaData(c: Config): File => Stream[IO, S3MetaData] = { + def enrichWithS3MetaData(c: Config): File => Stream[IO, Either[File, S3MetaData]] = { val fileToString = generateKey(c)_ file => Stream.eval(for { _ <- putStrLn(s"enrich: $file") key = fileToString(file) head <- objectHead(c.bucket, key) - (hash, lastModified) = head - } yield S3MetaData(file, key, hash, lastModified)) + } yield head.map { + case (hash,lastModified) => + Right(S3MetaData(file, key, hash, lastModified)) + }.getOrElse(Left(file))) } } diff --git a/src/main/scala/net/kemitix/s3thorp/Sync.scala b/src/main/scala/net/kemitix/s3thorp/Sync.scala index 4e99918..fa5b0cb 100644 --- a/src/main/scala/net/kemitix/s3thorp/Sync.scala +++ b/src/main/scala/net/kemitix/s3thorp/Sync.scala @@ -24,12 +24,15 @@ class Sync(s3Client: S3Client) extends LocalFileStream with S3MetaDataEnricher { } } yield () - private def uploadRequiredFilter: S3MetaData => Stream[IO, File] = s3Metadata => Stream.eval(for { - _ <- putStrLn(s"upload required: ${s3Metadata.localFile}") - //md5File(localFile) - //filter(localHash => options.force || localHash != metadataHash) - } yield s3Metadata.localFile) - + private def uploadRequiredFilter: Either[File, S3MetaData] => Stream[IO, File] = { + case Left(file) => Stream(file) + case Right(s3Metadata) => + Stream.eval(for { + _ <- putStrLn(s"upload required: ${s3Metadata.localFile}") + //md5File(localFile) + //filter(localHash => options.force || localHash != metadataHash) + } yield s3Metadata.localFile) + } private def performUpload: File => Stream[IO, Promise[Unit]] = file => Stream.eval(for { _ <- putStrLn(s"upload: $file") diff --git a/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala b/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala index cf975fb..df71a3f 100644 --- a/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala @@ -11,11 +11,11 @@ class SyncSuite extends FunSpec { val hash = "hash" val lastModified = Instant.now() val sync = new Sync(new S3Client { - override def objectHead(bucket: String, key: String) = IO((hash, lastModified)) + override def objectHead(bucket: String, key: String) = IO(Some((hash, lastModified))) }) describe("objectHead") { it("return the hash and lastModified expected") { - assertResult((hash, lastModified))(sync.objectHead("", "").unsafeRunSync()) + assertResult(Some((hash, lastModified)))(sync.objectHead("", "").unsafeRunSync()) } } }