From 2153f109aa1ea5e34178f640d3baf2d06293139f Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Mon, 6 May 2019 22:07:23 +0100 Subject: [PATCH] [sync] sketch out rest of sync process --- src/main/scala/net/kemitix/s3thorp/Sync.scala | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/src/main/scala/net/kemitix/s3thorp/Sync.scala b/src/main/scala/net/kemitix/s3thorp/Sync.scala index 4d604c7..ee29d1f 100644 --- a/src/main/scala/net/kemitix/s3thorp/Sync.scala +++ b/src/main/scala/net/kemitix/s3thorp/Sync.scala @@ -2,21 +2,21 @@ package net.kemitix.s3thorp import java.nio.file.{DirectoryStream, Files, Path, Paths} -import scala.collection.JavaConverters._ -import fs2.Stream import cats.effect._ -import Main.putStrLn +import fs2.Stream +import net.kemitix.s3thorp.Main.putStrLn + +import scala.collection.JavaConverters._ +import scala.concurrent.Promise object Sync { def apply(c: Config): IO[Unit] = for { _ <- putStrLn(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}") _ <- { - - // a stream of files in the source directory - val pathStream: Stream[IO, Path] = streamDirectoryPaths(Paths.get(c.source)) - - - IO.unit + streamDirectoryPaths(Paths.get(c.source)).flatMap( + enrichWithS3MetaData).flatMap( + uploadRequiredFilter).flatMap( + performUpload).compile.drain } } yield () @@ -43,4 +43,23 @@ object Sync { flatMap(openDirectory). flatMap(recurseIntoSubDirectories) } + + case class S3MetaData() + + private def enrichWithS3MetaData: Path => Stream[IO, S3MetaData] = path => { + // HEAD(bucket, prefix, relative(path)) + // create blank S3MetaData records (sealed trait?) + } + + private def uploadRequiredFilter: S3MetaData => Stream[IO, Path] = + s3Metadata => { + //md5File(localFile) + //filter(localHash => options.force || localHash != metadataHash) + } + + private def performUpload: Path => Stream[IO, Promise[Unit]] = + path => { + // upload + IO.unit + } }