From 326fd7d4cc2e25cbc2213bd24e84deb9519bb8e1 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Tue, 7 May 2019 17:16:44 +0100 Subject: [PATCH] [LocalFileStream] extract as trait --- .../net/kemitix/s3thorp/LocalFileStream.scala | 35 +++++++++++++++++++ src/main/scala/net/kemitix/s3thorp/Sync.scala | 28 ++------------- 2 files changed, 37 insertions(+), 26 deletions(-) create mode 100644 src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala diff --git a/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala b/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala new file mode 100644 index 0000000..45b1610 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala @@ -0,0 +1,35 @@ +package net.kemitix.s3thorp + +import java.nio.file.{DirectoryStream, Files, Path} +import fs2.Stream +import scala.collection.JavaConverters._ +import cats.effect.IO + +trait LocalFileStream { + + def streamDirectoryPaths(path: Path): Stream[IO, Path] = + { + Stream.eval(IO(path)). + flatMap(openDirectory). + flatMap(recurseIntoSubDirectories) + } + + private def acquire: Path => IO[DirectoryStream[Path]] = + p => IO(Files.newDirectoryStream(p)) + + private def release: DirectoryStream[Path] => IO[Unit] = + ds => IO(ds.close()) + + private def openDirectory: Path => Stream[IO, Path] = + p => Stream.bracket(acquire(p))(release). + map(ds => ds.iterator()). + map(ji => ji.asScala). + flatMap(it => Stream.fromIterator[IO, Path](it)) + + private def recurseIntoSubDirectories: Path => Stream[IO, Path] = + p => + if (p.toFile.isDirectory) streamDirectoryPaths(p) + else Stream(p) + + +} diff --git a/src/main/scala/net/kemitix/s3thorp/Sync.scala b/src/main/scala/net/kemitix/s3thorp/Sync.scala index c1668d7..574cc2e 100644 --- a/src/main/scala/net/kemitix/s3thorp/Sync.scala +++ b/src/main/scala/net/kemitix/s3thorp/Sync.scala @@ -1,6 +1,6 @@ package net.kemitix.s3thorp -import java.nio.file.{DirectoryStream, Files, Path, Paths} +import java.nio.file.{Path, Paths} import java.time.Instant import cats.effect._ @@ -10,7 +10,7 @@ import net.kemitix.s3thorp.Main.putStrLn import scala.collection.JavaConverters._ import scala.concurrent.Promise -object Sync { +object Sync extends LocalFileStream { def apply(c: Config): IO[Unit] = for { _ <- putStrLn(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}") _ <- { @@ -21,30 +21,6 @@ object Sync { } } yield () - private def streamDirectoryPaths(path: Path): Stream[IO, Path] = { - - def acquire: Path => IO[DirectoryStream[Path]] = - p => IO(Files.newDirectoryStream(p)) - - def release: DirectoryStream[Path] => IO[Unit] = - ds => IO(ds.close()) - - def openDirectory: Path => Stream[IO, Path] = - p => Stream.bracket(acquire(p))(release). - map(ds => ds.iterator()). - map(ji => ji.asScala). - flatMap(it => Stream.fromIterator[IO, Path](it)) - - def recurseIntoSubDirectories: Path => Stream[IO, Path] = - p => - if (p.toFile.isDirectory) streamDirectoryPaths(p) - else Stream(p) - - Stream.eval(IO(path)). - flatMap(openDirectory). - flatMap(recurseIntoSubDirectories) - } - type LocalPath = Path type RemotePath = String type Hash = String // an MD5 hash