[LocalFileStream] extract as trait
This commit is contained in:
parent
38edf68f43
commit
326fd7d4cc
2 changed files with 37 additions and 26 deletions
35
src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala
Normal file
35
src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
package net.kemitix.s3thorp
|
||||||
|
|
||||||
|
import java.nio.file.{DirectoryStream, Files, Path}
|
||||||
|
import fs2.Stream
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
|
import cats.effect.IO
|
||||||
|
|
||||||
|
trait LocalFileStream {
|
||||||
|
|
||||||
|
def streamDirectoryPaths(path: Path): Stream[IO, Path] =
|
||||||
|
{
|
||||||
|
Stream.eval(IO(path)).
|
||||||
|
flatMap(openDirectory).
|
||||||
|
flatMap(recurseIntoSubDirectories)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def acquire: Path => IO[DirectoryStream[Path]] =
|
||||||
|
p => IO(Files.newDirectoryStream(p))
|
||||||
|
|
||||||
|
private def release: DirectoryStream[Path] => IO[Unit] =
|
||||||
|
ds => IO(ds.close())
|
||||||
|
|
||||||
|
private def openDirectory: Path => Stream[IO, Path] =
|
||||||
|
p => Stream.bracket(acquire(p))(release).
|
||||||
|
map(ds => ds.iterator()).
|
||||||
|
map(ji => ji.asScala).
|
||||||
|
flatMap(it => Stream.fromIterator[IO, Path](it))
|
||||||
|
|
||||||
|
private def recurseIntoSubDirectories: Path => Stream[IO, Path] =
|
||||||
|
p =>
|
||||||
|
if (p.toFile.isDirectory) streamDirectoryPaths(p)
|
||||||
|
else Stream(p)
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -1,6 +1,6 @@
|
||||||
package net.kemitix.s3thorp
|
package net.kemitix.s3thorp
|
||||||
|
|
||||||
import java.nio.file.{DirectoryStream, Files, Path, Paths}
|
import java.nio.file.{Path, Paths}
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
import cats.effect._
|
import cats.effect._
|
||||||
|
@ -10,7 +10,7 @@ import net.kemitix.s3thorp.Main.putStrLn
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.concurrent.Promise
|
import scala.concurrent.Promise
|
||||||
|
|
||||||
object Sync {
|
object Sync extends LocalFileStream {
|
||||||
def apply(c: Config): IO[Unit] = for {
|
def apply(c: Config): IO[Unit] = for {
|
||||||
_ <- putStrLn(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}")
|
_ <- putStrLn(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}")
|
||||||
_ <- {
|
_ <- {
|
||||||
|
@ -21,30 +21,6 @@ object Sync {
|
||||||
}
|
}
|
||||||
} yield ()
|
} yield ()
|
||||||
|
|
||||||
private def streamDirectoryPaths(path: Path): Stream[IO, Path] = {
|
|
||||||
|
|
||||||
def acquire: Path => IO[DirectoryStream[Path]] =
|
|
||||||
p => IO(Files.newDirectoryStream(p))
|
|
||||||
|
|
||||||
def release: DirectoryStream[Path] => IO[Unit] =
|
|
||||||
ds => IO(ds.close())
|
|
||||||
|
|
||||||
def openDirectory: Path => Stream[IO, Path] =
|
|
||||||
p => Stream.bracket(acquire(p))(release).
|
|
||||||
map(ds => ds.iterator()).
|
|
||||||
map(ji => ji.asScala).
|
|
||||||
flatMap(it => Stream.fromIterator[IO, Path](it))
|
|
||||||
|
|
||||||
def recurseIntoSubDirectories: Path => Stream[IO, Path] =
|
|
||||||
p =>
|
|
||||||
if (p.toFile.isDirectory) streamDirectoryPaths(p)
|
|
||||||
else Stream(p)
|
|
||||||
|
|
||||||
Stream.eval(IO(path)).
|
|
||||||
flatMap(openDirectory).
|
|
||||||
flatMap(recurseIntoSubDirectories)
|
|
||||||
}
|
|
||||||
|
|
||||||
type LocalPath = Path
|
type LocalPath = Path
|
||||||
type RemotePath = String
|
type RemotePath = String
|
||||||
type Hash = String // an MD5 hash
|
type Hash = String // an MD5 hash
|
||||||
|
|
Loading…
Reference in a new issue