From b87495cfb5048c290b1648bd1bf8a37d217f973b Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Tue, 7 May 2019 22:27:06 +0100 Subject: [PATCH] Replace use of Path with use of File --- .../scala/net/kemitix/s3thorp/Config.scala | 4 +-- .../net/kemitix/s3thorp/LocalFileStream.scala | 34 ++++++++----------- src/main/scala/net/kemitix/s3thorp/Main.scala | 2 +- .../scala/net/kemitix/s3thorp/ParseArgs.scala | 2 +- .../net/kemitix/s3thorp/S3MetaData.scala | 4 +-- .../kemitix/s3thorp/S3MetaDataEnricher.scala | 13 +++---- src/main/scala/net/kemitix/s3thorp/Sync.scala | 14 ++++---- 7 files changed, 35 insertions(+), 38 deletions(-) diff --git a/src/main/scala/net/kemitix/s3thorp/Config.scala b/src/main/scala/net/kemitix/s3thorp/Config.scala index e1860ea..64610c5 100644 --- a/src/main/scala/net/kemitix/s3thorp/Config.scala +++ b/src/main/scala/net/kemitix/s3thorp/Config.scala @@ -1,10 +1,10 @@ package net.kemitix.s3thorp -import net.kemitix.s3thorp.Sync.{Bucket, LocalPath} +import net.kemitix.s3thorp.Sync.{Bucket, LocalFile} case class Config(bucket: Bucket = "", prefix: String = "", - source: LocalPath + source: LocalFile ) { } diff --git a/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala b/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala index 45b1610..5da8b4b 100644 --- a/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala +++ b/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala @@ -1,35 +1,29 @@ package net.kemitix.s3thorp -import java.nio.file.{DirectoryStream, Files, Path} +import java.io.File + import fs2.Stream -import scala.collection.JavaConverters._ + import cats.effect.IO trait LocalFileStream { - def streamDirectoryPaths(path: Path): Stream[IO, Path] = + def streamDirectoryPaths(file: File): Stream[IO, File] = { - Stream.eval(IO(path)). - flatMap(openDirectory). + Stream.eval(IO(file)). + flatMap(file => Stream.fromIterator[IO, File](dirPaths(file))). flatMap(recurseIntoSubDirectories) } - private def acquire: Path => IO[DirectoryStream[Path]] = - p => IO(Files.newDirectoryStream(p)) + private def dirPaths(file: File): Iterator[File] = { + Option(file.listFiles).map(_.iterator). + getOrElse(throw new IllegalArgumentException(s"Directory not found $file")) + } - private def release: DirectoryStream[Path] => IO[Unit] = - ds => IO(ds.close()) - - private def openDirectory: Path => Stream[IO, Path] = - p => Stream.bracket(acquire(p))(release). - map(ds => ds.iterator()). - map(ji => ji.asScala). - flatMap(it => Stream.fromIterator[IO, Path](it)) - - private def recurseIntoSubDirectories: Path => Stream[IO, Path] = - p => - if (p.toFile.isDirectory) streamDirectoryPaths(p) - else Stream(p) + private def recurseIntoSubDirectories: File => Stream[IO, File] = + file => + if (file.isDirectory) streamDirectoryPaths(file) + else Stream(file) } diff --git a/src/main/scala/net/kemitix/s3thorp/Main.scala b/src/main/scala/net/kemitix/s3thorp/Main.scala index 5d49d30..c2b9df7 100644 --- a/src/main/scala/net/kemitix/s3thorp/Main.scala +++ b/src/main/scala/net/kemitix/s3thorp/Main.scala @@ -10,7 +10,7 @@ object Main extends IOApp { def putStrLn(value: String) = IO { println(value) } val defaultConfig: Config = - Config("(none)", "", Paths.get(".").toAbsolutePath) + Config("(none)", "", Paths.get(".").toFile) def program(args: List[String]): IO[ExitCode] = for { diff --git a/src/main/scala/net/kemitix/s3thorp/ParseArgs.scala b/src/main/scala/net/kemitix/s3thorp/ParseArgs.scala index 8559d09..ccdbd23 100644 --- a/src/main/scala/net/kemitix/s3thorp/ParseArgs.scala +++ b/src/main/scala/net/kemitix/s3thorp/ParseArgs.scala @@ -15,7 +15,7 @@ object ParseArgs { programName("S3Thorp"), head("s3thorp"), opt[String]('s', "source") - .action((str, c) => c.copy(source = Paths.get(str))) + .action((str, c) => c.copy(source = Paths.get(str).toFile)) .required() .text("Source directory to sync to S3"), opt[String]('b', "bucket") diff --git a/src/main/scala/net/kemitix/s3thorp/S3MetaData.scala b/src/main/scala/net/kemitix/s3thorp/S3MetaData.scala index 2c7e0c6..01ec002 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3MetaData.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3MetaData.scala @@ -1,8 +1,8 @@ package net.kemitix.s3thorp -import net.kemitix.s3thorp.Sync.{Hash, LastModified, LocalPath, RemotePath} +import net.kemitix.s3thorp.Sync.{Hash, LastModified, LocalFile, RemotePath} -case class S3MetaData(localPath: LocalPath, +case class S3MetaData(localFile: LocalFile, remotePath: RemotePath, remoteHash: Hash, remoteLastModified: LastModified) \ No newline at end of file diff --git a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala index 071f26d..09eeec6 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala @@ -1,18 +1,19 @@ package net.kemitix.s3thorp -import java.nio.file.Path +import java.io.File import java.time.Instant + import fs2.Stream import cats.effect.IO import Main.putStrLn trait S3MetaDataEnricher extends S3Client { - def enrichWithS3MetaData: Path => Stream[IO, S3MetaData] = - path => Stream.eval(for { - _ <- putStrLn(s"enrich: $path") - // HEAD(bucket, prefix, relative(path)) + def enrichWithS3MetaData: File => Stream[IO, S3MetaData] = + file => Stream.eval(for { + _ <- putStrLn(s"enrich: $file") + // HEAD(bucket, prefix, relative(file)) // create blank S3MetaData records (sealed trait?) - } yield S3MetaData(path, "", "", Instant.now())) + } yield S3MetaData(file, "", "", Instant.now())) } diff --git a/src/main/scala/net/kemitix/s3thorp/Sync.scala b/src/main/scala/net/kemitix/s3thorp/Sync.scala index 8b7b96e..81fb3f2 100644 --- a/src/main/scala/net/kemitix/s3thorp/Sync.scala +++ b/src/main/scala/net/kemitix/s3thorp/Sync.scala @@ -1,5 +1,6 @@ package net.kemitix.s3thorp +import java.io.File import java.nio.file.Path import java.time.Instant @@ -21,19 +22,20 @@ object Sync extends LocalFileStream with S3MetaDataEnricher { } yield () type Bucket = String // the S3 bucket name - type LocalPath = Path // fully qualified path to a file or directory + type LocalFile = File // the file or directory type RemotePath = String // path within an S3 bucket type Hash = String // an MD5 hash type LastModified = Instant // or scala equivalent - private def uploadRequiredFilter: S3MetaData => Stream[IO, Path] = s3Metadata => Stream.eval(for { - _ <- putStrLn(s"upload required: ${s3Metadata.localPath}") + private def uploadRequiredFilter: S3MetaData => Stream[IO, File] = s3Metadata => Stream.eval(for { + _ <- putStrLn(s"upload required: ${s3Metadata.localFile}") //md5File(localFile) //filter(localHash => options.force || localHash != metadataHash) - } yield s3Metadata.localPath) + } yield s3Metadata.localFile) - private def performUpload: Path => Stream[IO, Promise[Unit]] = path => Stream.eval(for { - _ <- putStrLn(s"upload: $path") + private def performUpload: File => Stream[IO, Promise[Unit]] = + file => Stream.eval(for { + _ <- putStrLn(s"upload: $file") // upload p = Promise[Unit]() } yield p)