Convert Config to full ZIO effect module (#134)
* [config] new module * [config] stub module * [domain] Rename domain.Config as domain.LegacyConfig * [config] Move LegacyConfig to config module * [config] Move config parsing and validation into module * [config] Complete migration to module for Config * [config] Config You should not name methods after their defining object * [config] Rename LegacyConfig as Configuration Also remove redundant uses * [core] LocalFileStream Refactoring * [changelog] update
This commit is contained in:
parent
96a83e6c3e
commit
ccefd286f9
43 changed files with 517 additions and 327 deletions
|
@ -20,6 +20,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
|
||||||
- [internal] Replace Monocle with local SimpleLens implementation (#121)
|
- [internal] Replace Monocle with local SimpleLens implementation (#121)
|
||||||
- [internal] Don't use String as key in Map for hashes (#124)
|
- [internal] Don't use String as key in Map for hashes (#124)
|
||||||
- [internal] Convert Storage to full ZIO effect module (#133)
|
- [internal] Convert Storage to full ZIO effect module (#133)
|
||||||
|
- [internal] Convert Config to full ZIO effect module (#134)
|
||||||
|
|
||||||
** Dependencies
|
** Dependencies
|
||||||
|
|
||||||
|
|
11
build.sbt
11
build.sbt
|
@ -58,6 +58,7 @@ val zioDependencies = Seq(
|
||||||
)
|
)
|
||||||
|
|
||||||
// cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> domain
|
// cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> domain
|
||||||
|
// core -> config -> domain
|
||||||
|
|
||||||
lazy val thorp = (project in file("."))
|
lazy val thorp = (project in file("."))
|
||||||
.settings(commonSettings)
|
.settings(commonSettings)
|
||||||
|
@ -67,7 +68,6 @@ lazy val cli = (project in file("cli"))
|
||||||
.settings(commonSettings)
|
.settings(commonSettings)
|
||||||
.settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main"))
|
.settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main"))
|
||||||
.settings(applicationSettings)
|
.settings(applicationSettings)
|
||||||
.settings(commandLineParsing)
|
|
||||||
.settings(testDependencies)
|
.settings(testDependencies)
|
||||||
.enablePlugins(BuildInfoPlugin)
|
.enablePlugins(BuildInfoPlugin)
|
||||||
.settings(
|
.settings(
|
||||||
|
@ -101,6 +101,7 @@ lazy val core = (project in file("core"))
|
||||||
.settings(testDependencies)
|
.settings(testDependencies)
|
||||||
.dependsOn(`storage-api`)
|
.dependsOn(`storage-api`)
|
||||||
.dependsOn(domain % "compile->compile;test->test")
|
.dependsOn(domain % "compile->compile;test->test")
|
||||||
|
.dependsOn(config)
|
||||||
|
|
||||||
lazy val `storage-api` = (project in file("storage-api"))
|
lazy val `storage-api` = (project in file("storage-api"))
|
||||||
.settings(commonSettings)
|
.settings(commonSettings)
|
||||||
|
@ -114,6 +115,14 @@ lazy val console = (project in file("console"))
|
||||||
.settings(assemblyJarName in assembly := "console.jar")
|
.settings(assemblyJarName in assembly := "console.jar")
|
||||||
.dependsOn(domain)
|
.dependsOn(domain)
|
||||||
|
|
||||||
|
lazy val config = (project in file("config"))
|
||||||
|
.settings(commonSettings)
|
||||||
|
.settings(zioDependencies)
|
||||||
|
.settings(testDependencies)
|
||||||
|
.settings(commandLineParsing)
|
||||||
|
.settings(assemblyJarName in assembly := "config.jar")
|
||||||
|
.dependsOn(domain)
|
||||||
|
|
||||||
lazy val domain = (project in file("domain"))
|
lazy val domain = (project in file("domain"))
|
||||||
.settings(commonSettings)
|
.settings(commonSettings)
|
||||||
.settings(assemblyJarName in assembly := "domain.jar")
|
.settings(assemblyJarName in assembly := "domain.jar")
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
package net.kemitix.thorp.cli
|
package net.kemitix.thorp.cli
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Config
|
||||||
import net.kemitix.thorp.console.Console
|
import net.kemitix.thorp.console.Console
|
||||||
import net.kemitix.thorp.storage.aws.S3Storage
|
import net.kemitix.thorp.storage.aws.S3Storage
|
||||||
import zio.{App, ZIO}
|
import zio.{App, ZIO}
|
||||||
|
|
||||||
object Main extends App {
|
object Main extends App {
|
||||||
|
|
||||||
object LiveThorpApp extends S3Storage.Live with Console.Live
|
object LiveThorpApp extends S3Storage.Live with Console.Live with Config.Live
|
||||||
|
|
||||||
override def run(args: List[String]): ZIO[Environment, Nothing, Int] =
|
override def run(args: List[String]): ZIO[Environment, Nothing, Int] =
|
||||||
Program
|
Program
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
package net.kemitix.thorp.cli
|
package net.kemitix.thorp.cli
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config._
|
||||||
import net.kemitix.thorp.console._
|
import net.kemitix.thorp.console._
|
||||||
import net.kemitix.thorp.core.CoreTypes.CoreProgram
|
import net.kemitix.thorp.core.CoreTypes.CoreProgram
|
||||||
import net.kemitix.thorp.core._
|
import net.kemitix.thorp.core._
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent
|
import net.kemitix.thorp.domain.StorageQueueEvent
|
||||||
import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService
|
import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService
|
||||||
import zio.{UIO, ZIO}
|
import zio.ZIO
|
||||||
|
|
||||||
trait Program {
|
trait Program {
|
||||||
|
|
||||||
|
@ -14,27 +15,26 @@ trait Program {
|
||||||
def run(args: List[String]): CoreProgram[Unit] = {
|
def run(args: List[String]): CoreProgram[Unit] = {
|
||||||
for {
|
for {
|
||||||
cli <- CliArgs.parse(args)
|
cli <- CliArgs.parse(args)
|
||||||
|
config <- ConfigurationBuilder.buildConfig(cli)
|
||||||
|
_ <- setConfiguration(config)
|
||||||
_ <- ZIO.when(showVersion(cli))(putStrLn(version))
|
_ <- ZIO.when(showVersion(cli))(putStrLn(version))
|
||||||
_ <- ZIO.when(!showVersion(cli))(execute(cli).catchAll(handleErrors))
|
_ <- ZIO.when(!showVersion(cli))(execute.catchAll(handleErrors))
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
}
|
||||||
|
|
||||||
private def showVersion: ConfigOptions => Boolean =
|
private def showVersion: ConfigOptions => Boolean =
|
||||||
cli => ConfigQuery.showVersion(cli)
|
cli => ConfigQuery.showVersion(cli)
|
||||||
|
|
||||||
private def execute(cliOptions: ConfigOptions) = {
|
private def execute = {
|
||||||
for {
|
for {
|
||||||
plan <- PlanBuilder.createPlan(defaultHashService, cliOptions)
|
plan <- PlanBuilder.createPlan(defaultHashService)
|
||||||
batchMode <- isBatchMode(cliOptions)
|
batchMode <- isBatchMode
|
||||||
archive <- UnversionedMirrorArchive.default(batchMode, plan.syncTotals)
|
archive <- UnversionedMirrorArchive.default(batchMode, plan.syncTotals)
|
||||||
events <- applyPlan(archive, plan)
|
events <- applyPlan(archive, plan)
|
||||||
_ <- SyncLogging.logRunFinished(events)
|
_ <- SyncLogging.logRunFinished(events)
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
}
|
||||||
|
|
||||||
private def isBatchMode(cliOptions: ConfigOptions) =
|
|
||||||
UIO(ConfigQuery.batchMode(cliOptions))
|
|
||||||
|
|
||||||
private def handleErrors(throwable: Throwable) =
|
private def handleErrors(throwable: Throwable) =
|
||||||
for {
|
for {
|
||||||
_ <- putStrLn("There were errors:")
|
_ <- putStrLn("There were errors:")
|
||||||
|
|
|
@ -1,8 +1,7 @@
|
||||||
package net.kemitix.thorp.cli
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
|
|
||||||
import net.kemitix.thorp.core.{ConfigOption, ConfigOptions}
|
|
||||||
import scopt.OParser
|
import scopt.OParser
|
||||||
import zio.Task
|
import zio.Task
|
||||||
|
|
50
config/src/main/scala/net/kemitix/thorp/config/Config.scala
Normal file
50
config/src/main/scala/net/kemitix/thorp/config/Config.scala
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
|
import net.kemitix.thorp.domain.{Bucket, Filter, RemoteKey, Sources}
|
||||||
|
import zio.{UIO, ZIO}
|
||||||
|
|
||||||
|
trait Config {
|
||||||
|
val config: Config.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
object Config {
|
||||||
|
|
||||||
|
trait Service {
|
||||||
|
def setConfiguration(config: Configuration): ZIO[Config, Nothing, Unit]
|
||||||
|
def isBatchMode: ZIO[Config, Nothing, Boolean]
|
||||||
|
def bucket: ZIO[Config, Nothing, Bucket]
|
||||||
|
def prefix: ZIO[Config, Nothing, RemoteKey]
|
||||||
|
def sources: ZIO[Config, Nothing, Sources]
|
||||||
|
def filters: ZIO[Config, Nothing, List[Filter]]
|
||||||
|
}
|
||||||
|
|
||||||
|
trait Live extends Config {
|
||||||
|
|
||||||
|
val config: Service = new Service {
|
||||||
|
private val configRef = new AtomicReference(Configuration())
|
||||||
|
override def setConfiguration(
|
||||||
|
config: Configuration): ZIO[Config, Nothing, Unit] =
|
||||||
|
UIO(configRef.set(config))
|
||||||
|
|
||||||
|
override def bucket: ZIO[Config, Nothing, Bucket] =
|
||||||
|
UIO(configRef.get).map(_.bucket)
|
||||||
|
|
||||||
|
override def sources: ZIO[Config, Nothing, Sources] =
|
||||||
|
UIO(configRef.get).map(_.sources)
|
||||||
|
|
||||||
|
override def prefix: ZIO[Config, Nothing, RemoteKey] =
|
||||||
|
UIO(configRef.get).map(_.prefix)
|
||||||
|
|
||||||
|
override def isBatchMode: ZIO[Config, Nothing, Boolean] =
|
||||||
|
UIO(configRef.get).map(_.batchMode)
|
||||||
|
|
||||||
|
override def filters: ZIO[Config, Nothing, List[Filter]] =
|
||||||
|
UIO(configRef.get).map(_.filters)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object Live extends Live
|
||||||
|
|
||||||
|
}
|
|
@ -1,24 +1,24 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Configuration._
|
||||||
import net.kemitix.thorp.domain
|
import net.kemitix.thorp.domain
|
||||||
import net.kemitix.thorp.domain.{Config, RemoteKey}
|
import net.kemitix.thorp.domain.RemoteKey
|
||||||
import net.kemitix.thorp.domain.Config._
|
|
||||||
|
|
||||||
sealed trait ConfigOption {
|
sealed trait ConfigOption {
|
||||||
def update(config: Config): Config
|
def update(config: Configuration): Configuration
|
||||||
}
|
}
|
||||||
|
|
||||||
object ConfigOption {
|
object ConfigOption {
|
||||||
|
|
||||||
case class Source(path: Path) extends ConfigOption {
|
case class Source(path: Path) extends ConfigOption {
|
||||||
override def update(config: Config): Config =
|
override def update(config: Configuration): Configuration =
|
||||||
sources.modify(_ ++ path)(config)
|
sources.modify(_ ++ path)(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
case class Bucket(name: String) extends ConfigOption {
|
case class Bucket(name: String) extends ConfigOption {
|
||||||
override def update(config: Config): Config =
|
override def update(config: Configuration): Configuration =
|
||||||
if (config.bucket.name.isEmpty)
|
if (config.bucket.name.isEmpty)
|
||||||
bucket.set(domain.Bucket(name))(config)
|
bucket.set(domain.Bucket(name))(config)
|
||||||
else
|
else
|
||||||
|
@ -26,7 +26,7 @@ object ConfigOption {
|
||||||
}
|
}
|
||||||
|
|
||||||
case class Prefix(path: String) extends ConfigOption {
|
case class Prefix(path: String) extends ConfigOption {
|
||||||
override def update(config: Config): Config =
|
override def update(config: Configuration): Configuration =
|
||||||
if (config.prefix.key.isEmpty)
|
if (config.prefix.key.isEmpty)
|
||||||
prefix.set(RemoteKey(path))(config)
|
prefix.set(RemoteKey(path))(config)
|
||||||
else
|
else
|
||||||
|
@ -34,35 +34,35 @@ object ConfigOption {
|
||||||
}
|
}
|
||||||
|
|
||||||
case class Include(pattern: String) extends ConfigOption {
|
case class Include(pattern: String) extends ConfigOption {
|
||||||
override def update(config: Config): Config =
|
override def update(config: Configuration): Configuration =
|
||||||
filters.modify(domain.Filter.Include(pattern) :: _)(config)
|
filters.modify(domain.Filter.Include(pattern) :: _)(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
case class Exclude(pattern: String) extends ConfigOption {
|
case class Exclude(pattern: String) extends ConfigOption {
|
||||||
override def update(config: Config): Config =
|
override def update(config: Configuration): Configuration =
|
||||||
filters.modify(domain.Filter.Exclude(pattern) :: _)(config)
|
filters.modify(domain.Filter.Exclude(pattern) :: _)(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
case class Debug() extends ConfigOption {
|
case class Debug() extends ConfigOption {
|
||||||
override def update(config: Config): Config =
|
override def update(config: Configuration): Configuration =
|
||||||
debug.set(true)(config)
|
debug.set(true)(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
case object Version extends ConfigOption {
|
case object Version extends ConfigOption {
|
||||||
override def update(config: Config): Config = config
|
override def update(config: Configuration): Configuration = config
|
||||||
}
|
}
|
||||||
|
|
||||||
case object BatchMode extends ConfigOption {
|
case object BatchMode extends ConfigOption {
|
||||||
override def update(config: Config): Config =
|
override def update(config: Configuration): Configuration =
|
||||||
batchMode.set(true)(config)
|
batchMode.set(true)(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
case object IgnoreUserOptions extends ConfigOption {
|
case object IgnoreUserOptions extends ConfigOption {
|
||||||
override def update(config: Config): Config = config
|
override def update(config: Configuration): Configuration = config
|
||||||
}
|
}
|
||||||
|
|
||||||
case object IgnoreGlobalOptions extends ConfigOption {
|
case object IgnoreGlobalOptions extends ConfigOption {
|
||||||
override def update(config: Config): Config = config
|
override def update(config: Configuration): Configuration = config
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import net.kemitix.thorp.domain.SimpleLens
|
import net.kemitix.thorp.domain.SimpleLens
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
final case class ConfigValidationException(
|
final case class ConfigValidationException(
|
||||||
errors: List[ConfigValidation]
|
errors: List[ConfigValidation]
|
|
@ -1,15 +1,15 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
import net.kemitix.thorp.domain.{Bucket, Config, Sources}
|
import net.kemitix.thorp.domain.{Bucket, Sources}
|
||||||
import zio.IO
|
import zio.IO
|
||||||
|
|
||||||
sealed trait ConfigValidator {
|
sealed trait ConfigValidator {
|
||||||
|
|
||||||
def validateConfig(
|
def validateConfig(
|
||||||
config: Config
|
config: Configuration
|
||||||
): IO[List[ConfigValidation], Config] = IO.fromEither {
|
): IO[List[ConfigValidation], Configuration] = IO.fromEither {
|
||||||
for {
|
for {
|
||||||
_ <- validateSources(config.sources)
|
_ <- validateSources(config.sources)
|
||||||
_ <- validateBucket(config.bucket)
|
_ <- validateBucket(config.bucket)
|
|
@ -0,0 +1,29 @@
|
||||||
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
|
import net.kemitix.thorp.domain.{Bucket, Filter, RemoteKey, SimpleLens, Sources}
|
||||||
|
|
||||||
|
private[config] final case class Configuration(
|
||||||
|
bucket: Bucket = Bucket(""),
|
||||||
|
prefix: RemoteKey = RemoteKey(""),
|
||||||
|
filters: List[Filter] = List(),
|
||||||
|
debug: Boolean = false,
|
||||||
|
batchMode: Boolean = false,
|
||||||
|
sources: Sources = Sources(List())
|
||||||
|
)
|
||||||
|
|
||||||
|
private[config] object Configuration {
|
||||||
|
val sources: SimpleLens[Configuration, Sources] =
|
||||||
|
SimpleLens[Configuration, Sources](_.sources, b => a => b.copy(sources = a))
|
||||||
|
val bucket: SimpleLens[Configuration, Bucket] =
|
||||||
|
SimpleLens[Configuration, Bucket](_.bucket, b => a => b.copy(bucket = a))
|
||||||
|
val prefix: SimpleLens[Configuration, RemoteKey] =
|
||||||
|
SimpleLens[Configuration, RemoteKey](_.prefix, b => a => b.copy(prefix = a))
|
||||||
|
val filters: SimpleLens[Configuration, List[Filter]] =
|
||||||
|
SimpleLens[Configuration, List[Filter]](_.filters,
|
||||||
|
b => a => b.copy(filters = a))
|
||||||
|
val debug: SimpleLens[Configuration, Boolean] =
|
||||||
|
SimpleLens[Configuration, Boolean](_.debug, b => a => b.copy(debug = a))
|
||||||
|
val batchMode: SimpleLens[Configuration, Boolean] =
|
||||||
|
SimpleLens[Configuration, Boolean](_.batchMode,
|
||||||
|
b => a => b.copy(batchMode = a))
|
||||||
|
}
|
|
@ -1,12 +1,8 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
|
|
||||||
import net.kemitix.thorp.core.ConfigOptions.options
|
import zio.{IO, TaskR}
|
||||||
import net.kemitix.thorp.core.ConfigValidator.validateConfig
|
|
||||||
import net.kemitix.thorp.core.ParseConfigFile.parseFile
|
|
||||||
import net.kemitix.thorp.domain.Config
|
|
||||||
import zio.IO
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Builds a configuration from settings in a file within the
|
* Builds a configuration from settings in a file within the
|
||||||
|
@ -18,12 +14,13 @@ trait ConfigurationBuilder {
|
||||||
private val globalConfig = Paths.get("/etc/thorp.conf")
|
private val globalConfig = Paths.get("/etc/thorp.conf")
|
||||||
private val userHome = Paths.get(System.getProperty("user.home"))
|
private val userHome = Paths.get(System.getProperty("user.home"))
|
||||||
|
|
||||||
def buildConfig(
|
def buildConfig(priorityOpts: ConfigOptions)
|
||||||
priorityOpts: ConfigOptions): IO[List[ConfigValidation], Config] =
|
: IO[ConfigValidationException, Configuration] =
|
||||||
for {
|
(for {
|
||||||
config <- getConfigOptions(priorityOpts).map(collateOptions)
|
config <- getConfigOptions(priorityOpts).map(collateOptions)
|
||||||
valid <- validateConfig(config)
|
valid <- ConfigValidator.validateConfig(config)
|
||||||
} yield valid
|
} yield valid)
|
||||||
|
.catchAll(errors => TaskR.fail(ConfigValidationException(errors)))
|
||||||
|
|
||||||
private def getConfigOptions(
|
private def getConfigOptions(
|
||||||
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
|
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
|
||||||
|
@ -39,17 +36,17 @@ trait ConfigurationBuilder {
|
||||||
private def userOptions(
|
private def userOptions(
|
||||||
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
|
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
|
||||||
if (ConfigQuery.ignoreUserOptions(priorityOpts)) emptyConfig
|
if (ConfigQuery.ignoreUserOptions(priorityOpts)) emptyConfig
|
||||||
else parseFile(userHome.resolve(userConfigFilename))
|
else ParseConfigFile.parseFile(userHome.resolve(userConfigFilename))
|
||||||
|
|
||||||
private def globalOptions(
|
private def globalOptions(
|
||||||
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
|
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
|
||||||
if (ConfigQuery.ignoreGlobalOptions(priorityOpts)) emptyConfig
|
if (ConfigQuery.ignoreGlobalOptions(priorityOpts)) emptyConfig
|
||||||
else parseFile(globalConfig)
|
else ParseConfigFile.parseFile(globalConfig)
|
||||||
|
|
||||||
private def collateOptions(configOptions: ConfigOptions): Config =
|
private def collateOptions(configOptions: ConfigOptions): Configuration =
|
||||||
options
|
ConfigOptions.options
|
||||||
.get(configOptions)
|
.get(configOptions)
|
||||||
.foldLeft(Config()) { (config, configOption) =>
|
.foldLeft(Configuration()) { (config, configOption) =>
|
||||||
configOption.update(config)
|
configOption.update(config)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.nio.file.{Files, Path}
|
import java.nio.file.{Files, Path}
|
||||||
|
|
|
@ -1,9 +1,16 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
import java.util.regex.Pattern
|
import java.util.regex.Pattern
|
||||||
|
|
||||||
import net.kemitix.thorp.core.ConfigOption._
|
import net.kemitix.thorp.config.ConfigOption.{
|
||||||
|
Bucket,
|
||||||
|
Debug,
|
||||||
|
Exclude,
|
||||||
|
Include,
|
||||||
|
Prefix,
|
||||||
|
Source
|
||||||
|
}
|
||||||
|
|
||||||
trait ParseConfigLines {
|
trait ParseConfigLines {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.io.{File, FileNotFoundException}
|
import java.io.{File, FileNotFoundException}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import net.kemitix.thorp.domain.Sources
|
import net.kemitix.thorp.domain.Sources
|
||||||
import zio.IO
|
import zio.IO
|
29
config/src/main/scala/net/kemitix/thorp/config/package.scala
Normal file
29
config/src/main/scala/net/kemitix/thorp/config/package.scala
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
package net.kemitix.thorp
|
||||||
|
|
||||||
|
import net.kemitix.thorp.domain.{Bucket, Filter, RemoteKey, Sources}
|
||||||
|
import zio.ZIO
|
||||||
|
|
||||||
|
package object config {
|
||||||
|
|
||||||
|
final val configService: ZIO[Config, Nothing, Config.Service] =
|
||||||
|
ZIO.access(_.config)
|
||||||
|
|
||||||
|
final def setConfiguration(
|
||||||
|
config: Configuration): ZIO[Config, Nothing, Unit] =
|
||||||
|
ZIO.accessM(_.config setConfiguration config)
|
||||||
|
|
||||||
|
final def isBatchMode: ZIO[Config, Nothing, Boolean] =
|
||||||
|
ZIO.accessM(_.config isBatchMode)
|
||||||
|
|
||||||
|
final def getBucket: ZIO[Config, Nothing, Bucket] =
|
||||||
|
ZIO.accessM(_.config bucket)
|
||||||
|
|
||||||
|
final def getPrefix: ZIO[Config, Nothing, RemoteKey] =
|
||||||
|
ZIO.accessM(_.config prefix)
|
||||||
|
|
||||||
|
final def getSources: ZIO[Config, Nothing, Sources] =
|
||||||
|
ZIO.accessM(_.config sources)
|
||||||
|
|
||||||
|
final def getFilters: ZIO[Config, Nothing, List[Filter]] =
|
||||||
|
ZIO.accessM(_.config filters)
|
||||||
|
}
|
|
@ -1,9 +1,8 @@
|
||||||
package net.kemitix.thorp.cli
|
package net.kemitix.thorp.config
|
||||||
|
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
|
|
||||||
import net.kemitix.thorp.core.ConfigOption.Debug
|
import net.kemitix.thorp.config.ConfigOption.Debug
|
||||||
import net.kemitix.thorp.core.{ConfigOptions, ConfigQuery, Resource}
|
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
import zio.DefaultRuntime
|
import zio.DefaultRuntime
|
||||||
|
|
|
@ -10,7 +10,7 @@ package object console {
|
||||||
final def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
|
final def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
|
||||||
ZIO.accessM(_.console putStrLn line)
|
ZIO.accessM(_.console putStrLn line)
|
||||||
|
|
||||||
final def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
|
final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
|
||||||
ZIO.accessM(_.console putStrLn line)
|
ZIO.accessM(_.console putStrLn line)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,38 +1,46 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config._
|
||||||
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
|
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
|
import zio.ZIO
|
||||||
|
|
||||||
object ActionGenerator {
|
object ActionGenerator {
|
||||||
|
|
||||||
def createActions(
|
def createActions(
|
||||||
s3MetaData: S3MetaData,
|
s3MetaData: S3MetaData,
|
||||||
previousActions: Stream[Action]
|
previousActions: Stream[Action]
|
||||||
)(implicit c: Config): Stream[Action] =
|
): ZIO[Config, Nothing, Stream[Action]] =
|
||||||
|
for {
|
||||||
|
bucket <- getBucket
|
||||||
|
} yield
|
||||||
s3MetaData match {
|
s3MetaData match {
|
||||||
// #1 local exists, remote exists, remote matches - do nothing
|
// #1 local exists, remote exists, remote matches - do nothing
|
||||||
case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _)))
|
case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _)))
|
||||||
if localFile.matches(hash) =>
|
if localFile.matches(hash) =>
|
||||||
doNothing(c.bucket, key)
|
doNothing(bucket, key)
|
||||||
// #2 local exists, remote is missing, other matches - copy
|
// #2 local exists, remote is missing, other matches - copy
|
||||||
case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty =>
|
case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty =>
|
||||||
copyFile(c.bucket, localFile, matchByHash)
|
copyFile(bucket, localFile, matchByHash)
|
||||||
// #3 local exists, remote is missing, other no matches - upload
|
// #3 local exists, remote is missing, other no matches - upload
|
||||||
case S3MetaData(localFile, matchByHash, None)
|
case S3MetaData(localFile, matchByHash, None)
|
||||||
if matchByHash.isEmpty &&
|
if matchByHash.isEmpty &&
|
||||||
isUploadAlreadyQueued(previousActions)(localFile) =>
|
isUploadAlreadyQueued(previousActions)(localFile) =>
|
||||||
uploadFile(c.bucket, localFile)
|
uploadFile(bucket, localFile)
|
||||||
// #4 local exists, remote exists, remote no match, other matches - copy
|
// #4 local exists, remote exists, remote no match, other matches - copy
|
||||||
case S3MetaData(localFile, matchByHash, Some(RemoteMetaData(_, hash, _)))
|
case S3MetaData(localFile,
|
||||||
|
matchByHash,
|
||||||
|
Some(RemoteMetaData(_, hash, _)))
|
||||||
if !localFile.matches(hash) &&
|
if !localFile.matches(hash) &&
|
||||||
matchByHash.nonEmpty =>
|
matchByHash.nonEmpty =>
|
||||||
copyFile(c.bucket, localFile, matchByHash)
|
copyFile(bucket, localFile, matchByHash)
|
||||||
// #5 local exists, remote exists, remote no match, other no matches - upload
|
// #5 local exists, remote exists, remote no match, other no matches - upload
|
||||||
case S3MetaData(localFile, matchByHash, Some(_)) if matchByHash.isEmpty =>
|
case S3MetaData(localFile, matchByHash, Some(_))
|
||||||
uploadFile(c.bucket, localFile)
|
if matchByHash.isEmpty =>
|
||||||
|
uploadFile(bucket, localFile)
|
||||||
// fallback
|
// fallback
|
||||||
case S3MetaData(localFile, _, _) =>
|
case S3MetaData(localFile, _, _) =>
|
||||||
doNothing(c.bucket, localFile.remoteKey)
|
doNothing(bucket, localFile.remoteKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def key = LocalFile.remoteKey ^|-> RemoteKey.key
|
private def key = LocalFile.remoteKey ^|-> RemoteKey.key
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Config
|
||||||
import net.kemitix.thorp.console.Console
|
import net.kemitix.thorp.console.Console
|
||||||
import net.kemitix.thorp.storage.api.Storage
|
import net.kemitix.thorp.storage.api.Storage
|
||||||
import zio.ZIO
|
import zio.ZIO
|
||||||
|
|
||||||
object CoreTypes {
|
object CoreTypes {
|
||||||
|
|
||||||
type CoreEnv = Storage with Console
|
type CoreEnv = Storage with Console with Config
|
||||||
type CoreProgram[A] = ZIO[CoreEnv, Throwable, A]
|
type CoreProgram[A] = ZIO[CoreEnv, Throwable, A]
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,42 +2,30 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config._
|
||||||
import net.kemitix.thorp.core.KeyGenerator.generateKey
|
import net.kemitix.thorp.core.KeyGenerator.generateKey
|
||||||
import net.kemitix.thorp.domain
|
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.storage.api.HashService
|
import net.kemitix.thorp.storage.api.HashService
|
||||||
import zio.Task
|
import zio.{Task, TaskR, ZIO}
|
||||||
|
|
||||||
object LocalFileStream {
|
object LocalFileStream {
|
||||||
|
|
||||||
def findFiles(
|
def findFiles(hashService: HashService)(
|
||||||
source: Path,
|
source: Path
|
||||||
hashService: HashService
|
): TaskR[Config, LocalFiles] = {
|
||||||
)(
|
|
||||||
implicit c: Config
|
|
||||||
): Task[LocalFiles] = {
|
|
||||||
|
|
||||||
val isIncluded: Path => Boolean = Filter.isIncluded(c.filters)
|
def recurseIntoSubDirectories(path: Path): TaskR[Config, LocalFiles] =
|
||||||
|
|
||||||
val pathToLocalFile: Path => Task[LocalFiles] = path =>
|
|
||||||
localFile(hashService, c)(path)
|
|
||||||
|
|
||||||
def loop(path: Path): Task[LocalFiles] = {
|
|
||||||
|
|
||||||
def dirPaths(path: Path): Task[Stream[Path]] =
|
|
||||||
listFiles(path)
|
|
||||||
.map(_.filter(isIncluded))
|
|
||||||
|
|
||||||
def recurseIntoSubDirectories(path: Path): Task[LocalFiles] =
|
|
||||||
path.toFile match {
|
path.toFile match {
|
||||||
case f if f.isDirectory => loop(path)
|
case f if f.isDirectory => loop(path)
|
||||||
case _ => pathToLocalFile(path)
|
case _ => pathToLocalFile(hashService)(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
def recurse(paths: Stream[Path]): Task[LocalFiles] =
|
def recurse(paths: Stream[Path]): TaskR[Config, LocalFiles] =
|
||||||
Task.foldLeft(paths)(LocalFiles())((acc, path) => {
|
for {
|
||||||
recurseIntoSubDirectories(path).map(localFiles => acc ++ localFiles)
|
recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path))
|
||||||
})
|
} yield LocalFiles.reduce(recursed.toStream)
|
||||||
|
|
||||||
|
def loop(path: Path): TaskR[Config, LocalFiles] = {
|
||||||
|
|
||||||
for {
|
for {
|
||||||
paths <- dirPaths(path)
|
paths <- dirPaths(path)
|
||||||
|
@ -48,30 +36,50 @@ object LocalFileStream {
|
||||||
loop(source)
|
loop(source)
|
||||||
}
|
}
|
||||||
|
|
||||||
def localFile(
|
private def dirPaths(path: Path) =
|
||||||
hashService: HashService,
|
|
||||||
c: Config
|
|
||||||
): Path => Task[LocalFiles] =
|
|
||||||
path => {
|
|
||||||
val file = path.toFile
|
|
||||||
val source = c.sources.forPath(path)
|
|
||||||
for {
|
for {
|
||||||
hash <- hashService.hashLocalObject(path)
|
paths <- listFiles(path)
|
||||||
|
filtered <- includedDirPaths(paths)
|
||||||
|
} yield filtered
|
||||||
|
|
||||||
|
private def includedDirPaths(paths: Stream[Path]) =
|
||||||
|
for {
|
||||||
|
flaggedPaths <- TaskR.foreach(paths)(path =>
|
||||||
|
isIncluded(path).map((path, _)))
|
||||||
} yield
|
} yield
|
||||||
LocalFiles(localFiles = Stream(
|
flaggedPaths.toStream
|
||||||
domain.LocalFile(file,
|
.filter({ case (_, included) => included })
|
||||||
source.toFile,
|
.map({ case (path, _) => path })
|
||||||
|
|
||||||
|
private def localFile(hashService: HashService)(path: Path) = {
|
||||||
|
val file = path.toFile
|
||||||
|
for {
|
||||||
|
sources <- getSources
|
||||||
|
prefix <- getPrefix
|
||||||
|
hash <- hashService.hashLocalObject(path)
|
||||||
|
localFile = LocalFile(file,
|
||||||
|
sources.forPath(path).toFile,
|
||||||
hash,
|
hash,
|
||||||
generateKey(c.sources, c.prefix)(path))),
|
generateKey(sources, prefix)(path))
|
||||||
|
} yield
|
||||||
|
LocalFiles(localFiles = Stream(localFile),
|
||||||
count = 1,
|
count = 1,
|
||||||
totalSizeBytes = file.length)
|
totalSizeBytes = file.length)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def listFiles(path: Path): Task[Stream[Path]] =
|
private def listFiles(path: Path) =
|
||||||
for {
|
for {
|
||||||
files <- Task(path.toFile.listFiles)
|
files <- Task(path.toFile.listFiles)
|
||||||
_ <- Task.when(files == null)(
|
_ <- Task.when(files == null)(
|
||||||
Task.fail(new IllegalArgumentException(s"Directory not found $path")))
|
Task.fail(new IllegalArgumentException(s"Directory not found $path")))
|
||||||
} yield Stream(files: _*).map(_.toPath)
|
} yield Stream(files: _*).map(_.toPath)
|
||||||
|
|
||||||
|
private def isIncluded(path: Path) =
|
||||||
|
for {
|
||||||
|
filters <- getFilters
|
||||||
|
} yield Filter.isIncluded(filters)(path)
|
||||||
|
|
||||||
|
private def pathToLocalFile(hashService: HashService)(path: Path) =
|
||||||
|
localFile(hashService)(path)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,3 +15,10 @@ case class LocalFiles(
|
||||||
)
|
)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
object LocalFiles {
|
||||||
|
|
||||||
|
def reduce: Stream[LocalFiles] => LocalFiles =
|
||||||
|
list => list.foldLeft(LocalFiles())((acc, lf) => acc ++ lf)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -1,121 +1,111 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config._
|
||||||
import net.kemitix.thorp.console._
|
import net.kemitix.thorp.console._
|
||||||
import net.kemitix.thorp.core.Action._
|
import net.kemitix.thorp.core.Action._
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.storage._
|
import net.kemitix.thorp.storage._
|
||||||
import net.kemitix.thorp.storage.api.{HashService, Storage}
|
import net.kemitix.thorp.storage.api.{HashService, Storage}
|
||||||
import zio.{Task, TaskR}
|
import zio.{TaskR, ZIO}
|
||||||
|
|
||||||
trait PlanBuilder {
|
trait PlanBuilder {
|
||||||
|
|
||||||
def createPlan(
|
def createPlan(hashService: HashService)
|
||||||
hashService: HashService,
|
: TaskR[Storage with Console with Config, SyncPlan] =
|
||||||
configOptions: ConfigOptions
|
|
||||||
): TaskR[Storage with Console, SyncPlan] =
|
|
||||||
ConfigurationBuilder
|
|
||||||
.buildConfig(configOptions)
|
|
||||||
.catchAll(errors => TaskR.fail(ConfigValidationException(errors)))
|
|
||||||
.flatMap(config => useValidConfig(hashService)(config))
|
|
||||||
|
|
||||||
private def useValidConfig(
|
|
||||||
hashService: HashService
|
|
||||||
)(implicit c: Config) = {
|
|
||||||
for {
|
for {
|
||||||
_ <- SyncLogging.logRunStart(c.bucket, c.prefix, c.sources)
|
_ <- SyncLogging.logRunStart
|
||||||
actions <- buildPlan(hashService)
|
actions <- buildPlan(hashService)
|
||||||
} yield actions
|
} yield actions
|
||||||
}
|
|
||||||
|
|
||||||
private def buildPlan(
|
private def buildPlan(hashService: HashService) =
|
||||||
hashService: HashService
|
|
||||||
)(implicit c: Config) =
|
|
||||||
for {
|
for {
|
||||||
metadata <- gatherMetadata(hashService)
|
metadata <- gatherMetadata(hashService)
|
||||||
} yield assemblePlan(c)(metadata)
|
plan <- assemblePlan(metadata)
|
||||||
|
} yield plan
|
||||||
|
|
||||||
private def assemblePlan(
|
private def assemblePlan(metadata: (S3ObjectsData, LocalFiles)) =
|
||||||
implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = {
|
metadata match {
|
||||||
case (remoteData, localData) =>
|
case (remoteData, localData) =>
|
||||||
SyncPlan(
|
createActions(remoteData, localData)
|
||||||
actions = createActions(c)(remoteData)(localData)
|
.map(_.filter(doesSomething).sortBy(SequencePlan.order))
|
||||||
.filter(doesSomething)
|
.map(
|
||||||
.sortBy(SequencePlan.order),
|
SyncPlan(_, SyncTotals(localData.count, localData.totalSizeBytes)))
|
||||||
syncTotals = SyncTotals(count = localData.count,
|
|
||||||
totalSizeBytes = localData.totalSizeBytes)
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def createActions
|
private def createActions(
|
||||||
: Config => S3ObjectsData => LocalFiles => Stream[Action] =
|
remoteData: S3ObjectsData,
|
||||||
c =>
|
localData: LocalFiles
|
||||||
remoteData =>
|
) =
|
||||||
localData =>
|
for {
|
||||||
actionsForLocalFiles(c)(remoteData)(localData) ++
|
fileActions <- actionsForLocalFiles(remoteData, localData)
|
||||||
actionsForRemoteKeys(c)(remoteData)
|
remoteActions <- actionsForRemoteKeys(remoteData)
|
||||||
|
} yield fileActions ++ remoteActions
|
||||||
|
|
||||||
private def doesSomething: Action => Boolean = {
|
private def doesSomething: Action => Boolean = {
|
||||||
case _: DoNothing => false
|
case _: DoNothing => false
|
||||||
case _ => true
|
case _ => true
|
||||||
}
|
}
|
||||||
|
|
||||||
private def actionsForLocalFiles
|
private def actionsForLocalFiles(
|
||||||
: Config => S3ObjectsData => LocalFiles => Stream[Action] =
|
remoteData: S3ObjectsData,
|
||||||
c =>
|
localData: LocalFiles
|
||||||
remoteData =>
|
) =
|
||||||
localData =>
|
ZIO.foldLeft(localData.localFiles)(Stream.empty[Action])(
|
||||||
localData.localFiles.foldLeft(Stream.empty[Action])((acc, lf) =>
|
(acc, localFile) =>
|
||||||
createActionFromLocalFile(c)(lf)(remoteData)(acc) ++ acc)
|
createActionFromLocalFile(remoteData, acc, localFile)
|
||||||
|
.map(actions => actions ++ acc))
|
||||||
|
|
||||||
private def createActionFromLocalFile
|
private def createActionFromLocalFile(
|
||||||
: Config => LocalFile => S3ObjectsData => Stream[Action] => Stream[Action] =
|
remoteData: S3ObjectsData,
|
||||||
c =>
|
previousActions: Stream[Action],
|
||||||
lf =>
|
localFile: LocalFile
|
||||||
remoteData =>
|
) =
|
||||||
previousActions =>
|
|
||||||
ActionGenerator.createActions(
|
ActionGenerator.createActions(
|
||||||
S3MetaDataEnricher.getMetadata(lf, remoteData)(c),
|
S3MetaDataEnricher.getMetadata(localFile, remoteData),
|
||||||
previousActions)(c)
|
previousActions)
|
||||||
|
|
||||||
private def actionsForRemoteKeys: Config => S3ObjectsData => Stream[Action] =
|
private def actionsForRemoteKeys(remoteData: S3ObjectsData) =
|
||||||
c =>
|
ZIO.foldLeft(remoteData.byKey.keys)(Stream.empty[Action]) {
|
||||||
remoteData =>
|
(acc, remoteKey) =>
|
||||||
remoteData.byKey.keys.foldLeft(Stream.empty[Action])((acc, rk) =>
|
createActionFromRemoteKey(remoteKey).map(action => action #:: acc)
|
||||||
createActionFromRemoteKey(c)(rk) #:: acc)
|
}
|
||||||
|
|
||||||
private def createActionFromRemoteKey: Config => RemoteKey => Action =
|
private def createActionFromRemoteKey(remoteKey: RemoteKey) =
|
||||||
c =>
|
for {
|
||||||
rk =>
|
bucket <- getBucket
|
||||||
if (rk.isMissingLocally(c.sources, c.prefix))
|
prefix <- getPrefix
|
||||||
Action.ToDelete(c.bucket, rk, 0L)
|
sources <- getSources
|
||||||
else DoNothing(c.bucket, rk, 0L)
|
needsDeleted = remoteKey.isMissingLocally(sources, prefix)
|
||||||
|
} yield
|
||||||
|
if (needsDeleted) ToDelete(bucket, remoteKey, 0L)
|
||||||
|
else DoNothing(bucket, remoteKey, 0L)
|
||||||
|
|
||||||
private def gatherMetadata(
|
private def gatherMetadata(hashService: HashService) =
|
||||||
hashService: HashService
|
|
||||||
)(implicit c: Config) =
|
|
||||||
for {
|
for {
|
||||||
remoteData <- fetchRemoteData
|
remoteData <- fetchRemoteData
|
||||||
localData <- findLocalFiles(hashService)
|
localData <- findLocalFiles(hashService)
|
||||||
} yield (remoteData, localData)
|
} yield (remoteData, localData)
|
||||||
|
|
||||||
private def fetchRemoteData(implicit c: Config) =
|
private def fetchRemoteData =
|
||||||
listObjects(c.bucket, c.prefix)
|
for {
|
||||||
|
bucket <- getBucket
|
||||||
|
prefix <- getPrefix
|
||||||
|
objects <- listObjects(bucket, prefix)
|
||||||
|
} yield objects
|
||||||
|
|
||||||
private def findLocalFiles(
|
private def findLocalFiles(hashService: HashService) =
|
||||||
hashService: HashService
|
|
||||||
)(implicit config: Config) =
|
|
||||||
for {
|
for {
|
||||||
_ <- SyncLogging.logFileScan
|
_ <- SyncLogging.logFileScan
|
||||||
localFiles <- findFiles(hashService)
|
localFiles <- findFiles(hashService)
|
||||||
} yield localFiles
|
} yield localFiles
|
||||||
|
|
||||||
private def findFiles(
|
private def findFiles(hashService: HashService) =
|
||||||
hashService: HashService
|
for {
|
||||||
)(implicit c: Config) = {
|
sources <- getSources
|
||||||
Task
|
paths = sources.paths
|
||||||
.foreach(c.sources.paths)(LocalFileStream.findFiles(_, hashService))
|
found <- ZIO.foreach(paths)(path =>
|
||||||
.map(_.foldLeft(LocalFiles())((acc, localFile) => acc ++ localFile))
|
LocalFileStream.findFiles(hashService)(path))
|
||||||
}
|
} yield LocalFiles.reduce(found.toStream)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,7 @@ object S3MetaDataEnricher {
|
||||||
def getMetadata(
|
def getMetadata(
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
s3ObjectsData: S3ObjectsData
|
s3ObjectsData: S3ObjectsData
|
||||||
)(implicit c: Config): S3MetaData = {
|
): S3MetaData = {
|
||||||
val (keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData)
|
val (keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData)
|
||||||
S3MetaData(
|
S3MetaData(
|
||||||
localFile,
|
localFile,
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config._
|
||||||
import net.kemitix.thorp.console._
|
import net.kemitix.thorp.console._
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent.{
|
import net.kemitix.thorp.domain.StorageQueueEvent.{
|
||||||
CopyQueueEvent,
|
CopyQueueEvent,
|
||||||
|
@ -13,17 +14,19 @@ import zio.ZIO
|
||||||
|
|
||||||
trait SyncLogging {
|
trait SyncLogging {
|
||||||
|
|
||||||
def logRunStart(
|
def logRunStart: ZIO[Console with Config, Nothing, Unit] =
|
||||||
bucket: Bucket,
|
|
||||||
prefix: RemoteKey,
|
|
||||||
sources: Sources
|
|
||||||
): ZIO[Console, Nothing, Unit] =
|
|
||||||
for {
|
for {
|
||||||
_ <- putStrLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
|
bucket <- getBucket
|
||||||
|
prefix <- getPrefix
|
||||||
|
sources <- getSources
|
||||||
|
_ <- putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
|
||||||
} yield ()
|
} yield ()
|
||||||
|
|
||||||
def logFileScan(implicit c: Config): ZIO[Console, Nothing, Unit] =
|
def logFileScan: ZIO[Config with Console, Nothing, Unit] =
|
||||||
putStrLn(s"Scanning local files: ${c.sources.paths.mkString(", ")}...")
|
for {
|
||||||
|
sources <- getSources
|
||||||
|
_ <- putStrLn(s"Scanning local files: ${sources.paths.mkString(", ")}...")
|
||||||
|
} yield ()
|
||||||
|
|
||||||
def logRunFinished(
|
def logRunFinished(
|
||||||
actions: Stream[StorageQueueEvent]
|
actions: Stream[StorageQueueEvent]
|
||||||
|
|
|
@ -2,29 +2,35 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config._
|
||||||
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
|
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
|
||||||
import net.kemitix.thorp.domain.HashType.MD5
|
import net.kemitix.thorp.domain.HashType.MD5
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
import zio.DefaultRuntime
|
||||||
|
|
||||||
class ActionGeneratorSuite extends FunSpec {
|
class ActionGeneratorSuite extends FunSpec {
|
||||||
val lastModified = LastModified(Instant.now())
|
val lastModified = LastModified(Instant.now())
|
||||||
private val source = Resource(this, "upload")
|
private val source = Resource(this, "upload")
|
||||||
private val sourcePath = source.toPath
|
private val sourcePath = source.toPath
|
||||||
|
private val sources = Sources(List(sourcePath))
|
||||||
private val prefix = RemoteKey("prefix")
|
private val prefix = RemoteKey("prefix")
|
||||||
private val bucket = Bucket("bucket")
|
private val bucket = Bucket("bucket")
|
||||||
implicit private val config: Config =
|
private val configOptions = ConfigOptions(
|
||||||
Config(bucket, prefix, sources = Sources(List(sourcePath)))
|
List(
|
||||||
|
ConfigOption.Bucket("bucket"),
|
||||||
|
ConfigOption.Prefix("prefix"),
|
||||||
|
ConfigOption.Source(sourcePath),
|
||||||
|
ConfigOption.IgnoreUserOptions,
|
||||||
|
ConfigOption.IgnoreGlobalOptions
|
||||||
|
))
|
||||||
private val fileToKey =
|
private val fileToKey =
|
||||||
KeyGenerator.generateKey(config.sources, config.prefix) _
|
KeyGenerator.generateKey(sources, prefix) _
|
||||||
|
|
||||||
describe("create actions") {
|
describe("create actions") {
|
||||||
|
|
||||||
val previousActions = Stream.empty[Action]
|
val previousActions = Stream.empty[Action]
|
||||||
|
|
||||||
def invoke(input: S3MetaData) =
|
|
||||||
ActionGenerator.createActions(input, previousActions).toList
|
|
||||||
|
|
||||||
describe("#1 local exists, remote exists, remote matches - do nothing") {
|
describe("#1 local exists, remote exists, remote matches - do nothing") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file",
|
val theFile = LocalFile.resolve("the-file",
|
||||||
|
@ -40,8 +46,9 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
)
|
)
|
||||||
it("do nothing") {
|
it("do nothing") {
|
||||||
val expected =
|
val expected =
|
||||||
List(DoNothing(bucket, theFile.remoteKey, theFile.file.length))
|
Right(
|
||||||
val result = invoke(input)
|
Stream(DoNothing(bucket, theFile.remoteKey, theFile.file.length)))
|
||||||
|
val result = invoke(input, previousActions)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -60,13 +67,14 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
matchByHash = Set(otherRemoteMetadata), // other matches
|
matchByHash = Set(otherRemoteMetadata), // other matches
|
||||||
matchByKey = None) // remote is missing
|
matchByKey = None) // remote is missing
|
||||||
it("copy from other key") {
|
it("copy from other key") {
|
||||||
val expected = List(
|
val expected = Right(
|
||||||
|
Stream(
|
||||||
ToCopy(bucket,
|
ToCopy(bucket,
|
||||||
otherRemoteKey,
|
otherRemoteKey,
|
||||||
theHash,
|
theHash,
|
||||||
theRemoteKey,
|
theRemoteKey,
|
||||||
theFile.file.length)) // copy
|
theFile.file.length))) // copy
|
||||||
val result = invoke(input)
|
val result = invoke(input, previousActions)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -80,8 +88,9 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
matchByHash = Set.empty, // other no matches
|
matchByHash = Set.empty, // other no matches
|
||||||
matchByKey = None) // remote is missing
|
matchByKey = None) // remote is missing
|
||||||
it("upload") {
|
it("upload") {
|
||||||
val expected = List(ToUpload(bucket, theFile, theFile.file.length)) // upload
|
val expected = Right(
|
||||||
val result = invoke(input)
|
Stream(ToUpload(bucket, theFile, theFile.file.length))) // upload
|
||||||
|
val result = invoke(input, previousActions)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -105,13 +114,14 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
matchByHash = Set(otherRemoteMetadata), // other matches
|
matchByHash = Set(otherRemoteMetadata), // other matches
|
||||||
matchByKey = Some(oldRemoteMetadata)) // remote exists
|
matchByKey = Some(oldRemoteMetadata)) // remote exists
|
||||||
it("copy from other key") {
|
it("copy from other key") {
|
||||||
val expected = List(
|
val expected = Right(
|
||||||
|
Stream(
|
||||||
ToCopy(bucket,
|
ToCopy(bucket,
|
||||||
otherRemoteKey,
|
otherRemoteKey,
|
||||||
theHash,
|
theHash,
|
||||||
theRemoteKey,
|
theRemoteKey,
|
||||||
theFile.file.length)) // copy
|
theFile.file.length))) // copy
|
||||||
val result = invoke(input)
|
val result = invoke(input, previousActions)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -132,8 +142,9 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
matchByKey = Some(theRemoteMetadata) // remote exists
|
matchByKey = Some(theRemoteMetadata) // remote exists
|
||||||
)
|
)
|
||||||
it("upload") {
|
it("upload") {
|
||||||
val expected = List(ToUpload(bucket, theFile, theFile.file.length)) // upload
|
val expected = Right(
|
||||||
val result = invoke(input)
|
Stream(ToUpload(bucket, theFile, theFile.file.length))) // upload
|
||||||
|
val result = invoke(input, previousActions)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -147,4 +158,23 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
private def md5HashMap(theHash: MD5Hash): Map[HashType, MD5Hash] = {
|
private def md5HashMap(theHash: MD5Hash): Map[HashType, MD5Hash] = {
|
||||||
Map(MD5 -> theHash)
|
Map(MD5 -> theHash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def invoke(
|
||||||
|
input: S3MetaData,
|
||||||
|
previousActions: Stream[Action]
|
||||||
|
) = {
|
||||||
|
type TestEnv = Config
|
||||||
|
val testEnv: TestEnv = new Config.Live {}
|
||||||
|
|
||||||
|
def testProgram =
|
||||||
|
for {
|
||||||
|
config <- ConfigurationBuilder.buildConfig(configOptions)
|
||||||
|
_ <- setConfiguration(config)
|
||||||
|
actions <- ActionGenerator.createActions(input, previousActions)
|
||||||
|
} yield actions
|
||||||
|
|
||||||
|
new DefaultRuntime {}.unsafeRunSync {
|
||||||
|
testProgram.provide(testEnv)
|
||||||
|
}.toEither
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,11 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.{
|
||||||
|
ConfigOption,
|
||||||
|
ConfigOptions,
|
||||||
|
ConfigQuery,
|
||||||
|
ConfigurationBuilder
|
||||||
|
}
|
||||||
import net.kemitix.thorp.domain.Sources
|
import net.kemitix.thorp.domain.Sources
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
import zio.DefaultRuntime
|
import zio.DefaultRuntime
|
||||||
|
|
|
@ -2,6 +2,7 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, ConfigQuery}
|
||||||
import net.kemitix.thorp.domain.Sources
|
import net.kemitix.thorp.domain.Sources
|
||||||
import org.scalatest.FreeSpec
|
import org.scalatest.FreeSpec
|
||||||
|
|
||||||
|
@ -10,8 +11,8 @@ class ConfigQueryTest extends FreeSpec {
|
||||||
"show version" - {
|
"show version" - {
|
||||||
"when is set" - {
|
"when is set" - {
|
||||||
"should be true" in {
|
"should be true" in {
|
||||||
val result = ConfigQuery.showVersion(ConfigOptions(List(
|
val result =
|
||||||
ConfigOption.Version)))
|
ConfigQuery.showVersion(ConfigOptions(List(ConfigOption.Version)))
|
||||||
assertResult(true)(result)
|
assertResult(true)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,8 +26,8 @@ class ConfigQueryTest extends FreeSpec {
|
||||||
"batch mode" - {
|
"batch mode" - {
|
||||||
"when is set" - {
|
"when is set" - {
|
||||||
"should be true" in {
|
"should be true" in {
|
||||||
val result = ConfigQuery.batchMode(ConfigOptions(List(
|
val result =
|
||||||
ConfigOption.BatchMode)))
|
ConfigQuery.batchMode(ConfigOptions(List(ConfigOption.BatchMode)))
|
||||||
assertResult(true)(result)
|
assertResult(true)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -40,8 +41,8 @@ class ConfigQueryTest extends FreeSpec {
|
||||||
"ignore user options" - {
|
"ignore user options" - {
|
||||||
"when is set" - {
|
"when is set" - {
|
||||||
"should be true" in {
|
"should be true" in {
|
||||||
val result = ConfigQuery.ignoreUserOptions(ConfigOptions(List(
|
val result = ConfigQuery.ignoreUserOptions(
|
||||||
ConfigOption.IgnoreUserOptions)))
|
ConfigOptions(List(ConfigOption.IgnoreUserOptions)))
|
||||||
assertResult(true)(result)
|
assertResult(true)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,8 +56,8 @@ class ConfigQueryTest extends FreeSpec {
|
||||||
"ignore global options" - {
|
"ignore global options" - {
|
||||||
"when is set" - {
|
"when is set" - {
|
||||||
"should be true" in {
|
"should be true" in {
|
||||||
val result = ConfigQuery.ignoreGlobalOptions(ConfigOptions(List(
|
val result = ConfigQuery.ignoreGlobalOptions(
|
||||||
ConfigOption.IgnoreGlobalOptions)))
|
ConfigOptions(List(ConfigOption.IgnoreGlobalOptions)))
|
||||||
assertResult(true)(result)
|
assertResult(true)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -81,17 +82,17 @@ class ConfigQueryTest extends FreeSpec {
|
||||||
"when is set once" - {
|
"when is set once" - {
|
||||||
"should have one source" in {
|
"should have one source" in {
|
||||||
val expected = Sources(List(pathA))
|
val expected = Sources(List(pathA))
|
||||||
val result = ConfigQuery.sources(ConfigOptions(List(
|
val result =
|
||||||
ConfigOption.Source(pathA))))
|
ConfigQuery.sources(ConfigOptions(List(ConfigOption.Source(pathA))))
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"when is set twice" - {
|
"when is set twice" - {
|
||||||
"should have two sources" in {
|
"should have two sources" in {
|
||||||
val expected = Sources(List(pathA, pathB))
|
val expected = Sources(List(pathA, pathB))
|
||||||
val result = ConfigQuery.sources(ConfigOptions(List(
|
val result = ConfigQuery.sources(
|
||||||
ConfigOption.Source(pathA),
|
ConfigOptions(
|
||||||
ConfigOption.Source(pathB))))
|
List(ConfigOption.Source(pathA), ConfigOption.Source(pathB))))
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,11 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.nio.file.{Path, Paths}
|
import java.nio.file.{Path, Paths}
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.{
|
||||||
|
ConfigOption,
|
||||||
|
ConfigOptions,
|
||||||
|
ConfigurationBuilder
|
||||||
|
}
|
||||||
import net.kemitix.thorp.domain.Filter.{Exclude, Include}
|
import net.kemitix.thorp.domain.Filter.{Exclude, Include}
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
|
|
@ -2,7 +2,8 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
import net.kemitix.thorp.domain.{Bucket, Config, RemoteKey, Sources}
|
import net.kemitix.thorp.config.Resource
|
||||||
|
import net.kemitix.thorp.domain.{RemoteKey, Sources}
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
|
||||||
class KeyGeneratorSuite extends FunSpec {
|
class KeyGeneratorSuite extends FunSpec {
|
||||||
|
@ -10,10 +11,9 @@ class KeyGeneratorSuite extends FunSpec {
|
||||||
private val source: File = Resource(this, "upload")
|
private val source: File = Resource(this, "upload")
|
||||||
private val sourcePath = source.toPath
|
private val sourcePath = source.toPath
|
||||||
private val prefix = RemoteKey("prefix")
|
private val prefix = RemoteKey("prefix")
|
||||||
implicit private val config: Config =
|
private val sources = Sources(List(sourcePath))
|
||||||
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
|
|
||||||
private val fileToKey =
|
private val fileToKey =
|
||||||
KeyGenerator.generateKey(config.sources, config.prefix) _
|
KeyGenerator.generateKey(sources, prefix) _
|
||||||
|
|
||||||
describe("key generator") {
|
describe("key generator") {
|
||||||
|
|
||||||
|
|
|
@ -2,11 +2,13 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config._
|
||||||
|
import net.kemitix.thorp.console._
|
||||||
import net.kemitix.thorp.domain.HashType.MD5
|
import net.kemitix.thorp.domain.HashType.MD5
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.storage.api.HashService
|
import net.kemitix.thorp.storage.api.{HashService, Storage}
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
import zio.DefaultRuntime
|
import zio.{DefaultRuntime, Task, UIO}
|
||||||
|
|
||||||
class LocalFileStreamSuite extends FunSpec {
|
class LocalFileStreamSuite extends FunSpec {
|
||||||
|
|
||||||
|
@ -21,8 +23,13 @@ class LocalFileStreamSuite extends FunSpec {
|
||||||
private def file(filename: String) =
|
private def file(filename: String) =
|
||||||
sourcePath.resolve(Paths.get(filename))
|
sourcePath.resolve(Paths.get(filename))
|
||||||
|
|
||||||
implicit private val config: Config = Config(
|
private val configOptions = ConfigOptions(
|
||||||
sources = Sources(List(sourcePath)))
|
List(
|
||||||
|
ConfigOption.IgnoreGlobalOptions,
|
||||||
|
ConfigOption.IgnoreUserOptions,
|
||||||
|
ConfigOption.Source(sourcePath),
|
||||||
|
ConfigOption.Bucket("aBucket")
|
||||||
|
))
|
||||||
|
|
||||||
describe("findFiles") {
|
describe("findFiles") {
|
||||||
it("should find all files") {
|
it("should find all files") {
|
||||||
|
@ -47,9 +54,29 @@ class LocalFileStreamSuite extends FunSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def invoke() = {
|
private def invoke() = {
|
||||||
val runtime = new DefaultRuntime {}
|
type TestEnv = Storage with Console with Config
|
||||||
runtime.unsafeRunSync {
|
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live {
|
||||||
LocalFileStream.findFiles(sourcePath, hashService)
|
override def listResult: Task[S3ObjectsData] =
|
||||||
|
Task.die(new NotImplementedError)
|
||||||
|
override def uploadResult: UIO[StorageQueueEvent] =
|
||||||
|
Task.die(new NotImplementedError)
|
||||||
|
override def copyResult: UIO[StorageQueueEvent] =
|
||||||
|
Task.die(new NotImplementedError)
|
||||||
|
override def deleteResult: UIO[StorageQueueEvent] =
|
||||||
|
Task.die(new NotImplementedError)
|
||||||
|
override def shutdownResult: UIO[StorageQueueEvent] =
|
||||||
|
Task.die(new NotImplementedError)
|
||||||
|
}
|
||||||
|
|
||||||
|
def testProgram =
|
||||||
|
for {
|
||||||
|
config <- ConfigurationBuilder.buildConfig(configOptions)
|
||||||
|
_ <- setConfiguration(config)
|
||||||
|
files <- LocalFileStream.findFiles(hashService)(sourcePath)
|
||||||
|
} yield files
|
||||||
|
|
||||||
|
new DefaultRuntime {}.unsafeRunSync {
|
||||||
|
testProgram.provide(testEnv)
|
||||||
}.toEither
|
}.toEither
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,8 +2,8 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Resource
|
||||||
import net.kemitix.thorp.domain.MD5HashData.{BigFile, Root}
|
import net.kemitix.thorp.domain.MD5HashData.{BigFile, Root}
|
||||||
import net.kemitix.thorp.domain._
|
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
import zio.DefaultRuntime
|
import zio.DefaultRuntime
|
||||||
|
|
||||||
|
@ -12,10 +12,6 @@ class MD5HashGeneratorTest extends FunSpec {
|
||||||
private val runtime = new DefaultRuntime {}
|
private val runtime = new DefaultRuntime {}
|
||||||
|
|
||||||
private val source = Resource(this, "upload")
|
private val source = Resource(this, "upload")
|
||||||
private val sourcePath = source.toPath
|
|
||||||
private val prefix = RemoteKey("prefix")
|
|
||||||
implicit private val config: Config =
|
|
||||||
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
|
|
||||||
|
|
||||||
describe("md5File()") {
|
describe("md5File()") {
|
||||||
describe("read a small file (smaller than buffer)") {
|
describe("read a small file (smaller than buffer)") {
|
||||||
|
|
|
@ -2,6 +2,12 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.nio.file.{Path, Paths}
|
import java.nio.file.{Path, Paths}
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.{
|
||||||
|
ConfigOption,
|
||||||
|
ConfigOptions,
|
||||||
|
ParseConfigFile,
|
||||||
|
Resource
|
||||||
|
}
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
import zio.DefaultRuntime
|
import zio.DefaultRuntime
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.nio.file.Paths
|
import java.nio.file.Paths
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, ParseConfigLines}
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
|
||||||
class ParseConfigLinesTest extends FunSpec {
|
class ParseConfigLinesTest extends FunSpec {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package net.kemitix.thorp.core
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config._
|
||||||
import net.kemitix.thorp.console._
|
import net.kemitix.thorp.console._
|
||||||
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
|
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
|
||||||
import net.kemitix.thorp.domain.HashType.MD5
|
import net.kemitix.thorp.domain.HashType.MD5
|
||||||
|
@ -24,7 +25,9 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
val options: Path => ConfigOptions =
|
val options: Path => ConfigOptions =
|
||||||
source =>
|
source =>
|
||||||
configOptions(ConfigOption.Source(source),
|
configOptions(ConfigOption.Source(source),
|
||||||
ConfigOption.Bucket("a-bucket"))
|
ConfigOption.Bucket("a-bucket"),
|
||||||
|
ConfigOption.IgnoreUserOptions,
|
||||||
|
ConfigOption.IgnoreGlobalOptions)
|
||||||
"a file" - {
|
"a file" - {
|
||||||
val filename = "aFile"
|
val filename = "aFile"
|
||||||
val remoteKey = RemoteKey(filename)
|
val remoteKey = RemoteKey(filename)
|
||||||
|
@ -325,9 +328,9 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
hashService: HashService,
|
hashService: HashService,
|
||||||
configOptions: ConfigOptions,
|
configOptions: ConfigOptions,
|
||||||
result: Task[S3ObjectsData]
|
result: Task[S3ObjectsData]
|
||||||
): Either[Any, List[(String, String, String, String, String)]] = {
|
) = {
|
||||||
type TestEnv = Storage.Test with Console.Test
|
type TestEnv = Storage with Console with Config
|
||||||
val testEnv: TestEnv = new Storage.Test with Console.Test {
|
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live {
|
||||||
override def listResult: Task[S3ObjectsData] = result
|
override def listResult: Task[S3ObjectsData] = result
|
||||||
override def uploadResult: UIO[StorageQueueEvent] =
|
override def uploadResult: UIO[StorageQueueEvent] =
|
||||||
Task.die(new NotImplementedError)
|
Task.die(new NotImplementedError)
|
||||||
|
@ -339,12 +342,15 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
Task.die(new NotImplementedError)
|
Task.die(new NotImplementedError)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def testProgram =
|
||||||
|
for {
|
||||||
|
config <- ConfigurationBuilder.buildConfig(configOptions)
|
||||||
|
_ <- setConfiguration(config)
|
||||||
|
plan <- PlanBuilder.createPlan(hashService)
|
||||||
|
} yield plan
|
||||||
|
|
||||||
new DefaultRuntime {}
|
new DefaultRuntime {}
|
||||||
.unsafeRunSync {
|
.unsafeRunSync(testProgram.provide(testEnv))
|
||||||
PlanBuilder
|
|
||||||
.createPlan(hashService, configOptions)
|
|
||||||
.provide(testEnv)
|
|
||||||
}
|
|
||||||
.toEither
|
.toEither
|
||||||
.map(convertResult)
|
.map(convertResult)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Resource
|
||||||
import net.kemitix.thorp.core.S3MetaDataEnricher.{getMetadata, getS3Status}
|
import net.kemitix.thorp.core.S3MetaDataEnricher.{getMetadata, getS3Status}
|
||||||
import net.kemitix.thorp.domain.HashType.MD5
|
import net.kemitix.thorp.domain.HashType.MD5
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
|
@ -11,11 +12,10 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
||||||
val lastModified = LastModified(Instant.now())
|
val lastModified = LastModified(Instant.now())
|
||||||
private val source = Resource(this, "upload")
|
private val source = Resource(this, "upload")
|
||||||
private val sourcePath = source.toPath
|
private val sourcePath = source.toPath
|
||||||
|
private val sources = Sources(List(sourcePath))
|
||||||
private val prefix = RemoteKey("prefix")
|
private val prefix = RemoteKey("prefix")
|
||||||
implicit private val config: Config =
|
|
||||||
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
|
|
||||||
private val fileToKey =
|
private val fileToKey =
|
||||||
KeyGenerator.generateKey(config.sources, config.prefix) _
|
KeyGenerator.generateKey(sources, prefix) _
|
||||||
|
|
||||||
def getMatchesByKey(
|
def getMatchesByKey(
|
||||||
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
|
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
|
||||||
|
|
|
@ -1,25 +0,0 @@
|
||||||
package net.kemitix.thorp.domain
|
|
||||||
|
|
||||||
final case class Config(
|
|
||||||
bucket: Bucket = Bucket(""),
|
|
||||||
prefix: RemoteKey = RemoteKey(""),
|
|
||||||
filters: List[Filter] = List(),
|
|
||||||
debug: Boolean = false,
|
|
||||||
batchMode: Boolean = false,
|
|
||||||
sources: Sources = Sources(List())
|
|
||||||
)
|
|
||||||
|
|
||||||
object Config {
|
|
||||||
val sources: SimpleLens[Config, Sources] =
|
|
||||||
SimpleLens[Config, Sources](_.sources, b => a => b.copy(sources = a))
|
|
||||||
val bucket: SimpleLens[Config, Bucket] =
|
|
||||||
SimpleLens[Config, Bucket](_.bucket, b => a => b.copy(bucket = a))
|
|
||||||
val prefix: SimpleLens[Config, RemoteKey] =
|
|
||||||
SimpleLens[Config, RemoteKey](_.prefix, b => a => b.copy(prefix = a))
|
|
||||||
val filters: SimpleLens[Config, List[Filter]] =
|
|
||||||
SimpleLens[Config, List[Filter]](_.filters, b => a => b.copy(filters = a))
|
|
||||||
val debug: SimpleLens[Config, Boolean] =
|
|
||||||
SimpleLens[Config, Boolean](_.debug, b => a => b.copy(debug = a))
|
|
||||||
val batchMode: SimpleLens[Config, Boolean] =
|
|
||||||
SimpleLens[Config, Boolean](_.batchMode, b => a => b.copy(batchMode = a))
|
|
||||||
}
|
|
|
@ -3,7 +3,7 @@ package net.kemitix.thorp.storage.aws
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
|
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
|
||||||
import net.kemitix.thorp.core.Resource
|
import net.kemitix.thorp.config.Resource
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
import zio.DefaultRuntime
|
import zio.DefaultRuntime
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,8 @@ package net.kemitix.thorp.storage.aws
|
||||||
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher}
|
import net.kemitix.thorp.config.Resource
|
||||||
|
import net.kemitix.thorp.core.{KeyGenerator, S3MetaDataEnricher}
|
||||||
import net.kemitix.thorp.domain.HashType.MD5
|
import net.kemitix.thorp.domain.HashType.MD5
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import org.scalamock.scalatest.MockFactory
|
import org.scalamock.scalatest.MockFactory
|
||||||
|
@ -12,12 +13,10 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
||||||
|
|
||||||
private val source = Resource(this, "upload")
|
private val source = Resource(this, "upload")
|
||||||
private val sourcePath = source.toPath
|
private val sourcePath = source.toPath
|
||||||
|
private val sources = Sources(List(sourcePath))
|
||||||
private val prefix = RemoteKey("prefix")
|
private val prefix = RemoteKey("prefix")
|
||||||
implicit private val config: Config =
|
|
||||||
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
|
|
||||||
private val fileToKey =
|
private val fileToKey =
|
||||||
KeyGenerator.generateKey(config.sources, config.prefix) _
|
KeyGenerator.generateKey(sources, prefix) _
|
||||||
|
|
||||||
describe("getS3Status") {
|
describe("getS3Status") {
|
||||||
|
|
||||||
|
|
|
@ -5,8 +5,8 @@ import java.io.File
|
||||||
import com.amazonaws.SdkClientException
|
import com.amazonaws.SdkClientException
|
||||||
import com.amazonaws.services.s3.model.AmazonS3Exception
|
import com.amazonaws.services.s3.model.AmazonS3Exception
|
||||||
import com.amazonaws.services.s3.transfer.model.UploadResult
|
import com.amazonaws.services.s3.transfer.model.UploadResult
|
||||||
|
import net.kemitix.thorp.config.Resource
|
||||||
import net.kemitix.thorp.console._
|
import net.kemitix.thorp.console._
|
||||||
import net.kemitix.thorp.core.Resource
|
|
||||||
import net.kemitix.thorp.domain.HashType.MD5
|
import net.kemitix.thorp.domain.HashType.MD5
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent.{
|
import net.kemitix.thorp.domain.StorageQueueEvent.{
|
||||||
Action,
|
Action,
|
||||||
|
|
Loading…
Reference in a new issue