Replace use of Path with use of File
This commit is contained in:
parent
04b5d4eded
commit
b87495cfb5
7 changed files with 35 additions and 38 deletions
|
@ -1,10 +1,10 @@
|
||||||
package net.kemitix.s3thorp
|
package net.kemitix.s3thorp
|
||||||
|
|
||||||
import net.kemitix.s3thorp.Sync.{Bucket, LocalPath}
|
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile}
|
||||||
|
|
||||||
case class Config(bucket: Bucket = "",
|
case class Config(bucket: Bucket = "",
|
||||||
prefix: String = "",
|
prefix: String = "",
|
||||||
source: LocalPath
|
source: LocalFile
|
||||||
) {
|
) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,35 +1,29 @@
|
||||||
package net.kemitix.s3thorp
|
package net.kemitix.s3thorp
|
||||||
|
|
||||||
import java.nio.file.{DirectoryStream, Files, Path}
|
import java.io.File
|
||||||
|
|
||||||
import fs2.Stream
|
import fs2.Stream
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
|
|
||||||
trait LocalFileStream {
|
trait LocalFileStream {
|
||||||
|
|
||||||
def streamDirectoryPaths(path: Path): Stream[IO, Path] =
|
def streamDirectoryPaths(file: File): Stream[IO, File] =
|
||||||
{
|
{
|
||||||
Stream.eval(IO(path)).
|
Stream.eval(IO(file)).
|
||||||
flatMap(openDirectory).
|
flatMap(file => Stream.fromIterator[IO, File](dirPaths(file))).
|
||||||
flatMap(recurseIntoSubDirectories)
|
flatMap(recurseIntoSubDirectories)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def acquire: Path => IO[DirectoryStream[Path]] =
|
private def dirPaths(file: File): Iterator[File] = {
|
||||||
p => IO(Files.newDirectoryStream(p))
|
Option(file.listFiles).map(_.iterator).
|
||||||
|
getOrElse(throw new IllegalArgumentException(s"Directory not found $file"))
|
||||||
|
}
|
||||||
|
|
||||||
private def release: DirectoryStream[Path] => IO[Unit] =
|
private def recurseIntoSubDirectories: File => Stream[IO, File] =
|
||||||
ds => IO(ds.close())
|
file =>
|
||||||
|
if (file.isDirectory) streamDirectoryPaths(file)
|
||||||
private def openDirectory: Path => Stream[IO, Path] =
|
else Stream(file)
|
||||||
p => Stream.bracket(acquire(p))(release).
|
|
||||||
map(ds => ds.iterator()).
|
|
||||||
map(ji => ji.asScala).
|
|
||||||
flatMap(it => Stream.fromIterator[IO, Path](it))
|
|
||||||
|
|
||||||
private def recurseIntoSubDirectories: Path => Stream[IO, Path] =
|
|
||||||
p =>
|
|
||||||
if (p.toFile.isDirectory) streamDirectoryPaths(p)
|
|
||||||
else Stream(p)
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ object Main extends IOApp {
|
||||||
def putStrLn(value: String) = IO { println(value) }
|
def putStrLn(value: String) = IO { println(value) }
|
||||||
|
|
||||||
val defaultConfig: Config =
|
val defaultConfig: Config =
|
||||||
Config("(none)", "", Paths.get(".").toAbsolutePath)
|
Config("(none)", "", Paths.get(".").toFile)
|
||||||
|
|
||||||
def program(args: List[String]): IO[ExitCode] =
|
def program(args: List[String]): IO[ExitCode] =
|
||||||
for {
|
for {
|
||||||
|
|
|
@ -15,7 +15,7 @@ object ParseArgs {
|
||||||
programName("S3Thorp"),
|
programName("S3Thorp"),
|
||||||
head("s3thorp"),
|
head("s3thorp"),
|
||||||
opt[String]('s', "source")
|
opt[String]('s', "source")
|
||||||
.action((str, c) => c.copy(source = Paths.get(str)))
|
.action((str, c) => c.copy(source = Paths.get(str).toFile))
|
||||||
.required()
|
.required()
|
||||||
.text("Source directory to sync to S3"),
|
.text("Source directory to sync to S3"),
|
||||||
opt[String]('b', "bucket")
|
opt[String]('b', "bucket")
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package net.kemitix.s3thorp
|
package net.kemitix.s3thorp
|
||||||
|
|
||||||
import net.kemitix.s3thorp.Sync.{Hash, LastModified, LocalPath, RemotePath}
|
import net.kemitix.s3thorp.Sync.{Hash, LastModified, LocalFile, RemotePath}
|
||||||
|
|
||||||
case class S3MetaData(localPath: LocalPath,
|
case class S3MetaData(localFile: LocalFile,
|
||||||
remotePath: RemotePath,
|
remotePath: RemotePath,
|
||||||
remoteHash: Hash,
|
remoteHash: Hash,
|
||||||
remoteLastModified: LastModified)
|
remoteLastModified: LastModified)
|
|
@ -1,18 +1,19 @@
|
||||||
package net.kemitix.s3thorp
|
package net.kemitix.s3thorp
|
||||||
|
|
||||||
import java.nio.file.Path
|
import java.io.File
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
import fs2.Stream
|
import fs2.Stream
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
import Main.putStrLn
|
import Main.putStrLn
|
||||||
|
|
||||||
trait S3MetaDataEnricher extends S3Client {
|
trait S3MetaDataEnricher extends S3Client {
|
||||||
|
|
||||||
def enrichWithS3MetaData: Path => Stream[IO, S3MetaData] =
|
def enrichWithS3MetaData: File => Stream[IO, S3MetaData] =
|
||||||
path => Stream.eval(for {
|
file => Stream.eval(for {
|
||||||
_ <- putStrLn(s"enrich: $path")
|
_ <- putStrLn(s"enrich: $file")
|
||||||
// HEAD(bucket, prefix, relative(path))
|
// HEAD(bucket, prefix, relative(file))
|
||||||
// create blank S3MetaData records (sealed trait?)
|
// create blank S3MetaData records (sealed trait?)
|
||||||
} yield S3MetaData(path, "", "", Instant.now()))
|
} yield S3MetaData(file, "", "", Instant.now()))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package net.kemitix.s3thorp
|
package net.kemitix.s3thorp
|
||||||
|
|
||||||
|
import java.io.File
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
|
@ -21,19 +22,20 @@ object Sync extends LocalFileStream with S3MetaDataEnricher {
|
||||||
} yield ()
|
} yield ()
|
||||||
|
|
||||||
type Bucket = String // the S3 bucket name
|
type Bucket = String // the S3 bucket name
|
||||||
type LocalPath = Path // fully qualified path to a file or directory
|
type LocalFile = File // the file or directory
|
||||||
type RemotePath = String // path within an S3 bucket
|
type RemotePath = String // path within an S3 bucket
|
||||||
type Hash = String // an MD5 hash
|
type Hash = String // an MD5 hash
|
||||||
type LastModified = Instant // or scala equivalent
|
type LastModified = Instant // or scala equivalent
|
||||||
|
|
||||||
private def uploadRequiredFilter: S3MetaData => Stream[IO, Path] = s3Metadata => Stream.eval(for {
|
private def uploadRequiredFilter: S3MetaData => Stream[IO, File] = s3Metadata => Stream.eval(for {
|
||||||
_ <- putStrLn(s"upload required: ${s3Metadata.localPath}")
|
_ <- putStrLn(s"upload required: ${s3Metadata.localFile}")
|
||||||
//md5File(localFile)
|
//md5File(localFile)
|
||||||
//filter(localHash => options.force || localHash != metadataHash)
|
//filter(localHash => options.force || localHash != metadataHash)
|
||||||
} yield s3Metadata.localPath)
|
} yield s3Metadata.localFile)
|
||||||
|
|
||||||
private def performUpload: Path => Stream[IO, Promise[Unit]] = path => Stream.eval(for {
|
private def performUpload: File => Stream[IO, Promise[Unit]] =
|
||||||
_ <- putStrLn(s"upload: $path")
|
file => Stream.eval(for {
|
||||||
|
_ <- putStrLn(s"upload: $file")
|
||||||
// upload
|
// upload
|
||||||
p = Promise[Unit]()
|
p = Promise[Unit]()
|
||||||
} yield p)
|
} yield p)
|
||||||
|
|
Loading…
Reference in a new issue