Install WartRemover pluging (#150)
* [sbt] Install WartRemover * remove warts * remote warts * fix tests * [domain] UploadEventListener fix progress bar (again) * [domain] Remove LastModified - wasn't being used for anything
This commit is contained in:
parent
9a6208025c
commit
af7733952c
85 changed files with 645 additions and 631 deletions
|
@ -26,6 +26,7 @@ val commonSettings = Seq(
|
|||
"-language:postfixOps",
|
||||
"-language:higherKinds",
|
||||
"-Ypartial-unification"),
|
||||
wartremoverErrors ++= Warts.unsafe.filterNot(wart => List(Wart.Any, Wart.Nothing, Wart.Serializable).contains(wart)),
|
||||
test in assembly := {}
|
||||
)
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ object CliArgs {
|
|||
OParser
|
||||
.parse(configParser, args, List())
|
||||
.map(ConfigOptions(_))
|
||||
.getOrElse(ConfigOptions())
|
||||
.getOrElse(ConfigOptions.empty)
|
||||
}
|
||||
|
||||
val configParser: OParser[Unit, List[ConfigOption]] = {
|
||||
|
|
|
@ -23,7 +23,7 @@ object Config {
|
|||
trait Live extends Config {
|
||||
|
||||
val config: Service = new Service {
|
||||
private val configRef = new AtomicReference(Configuration())
|
||||
private val configRef = new AtomicReference(Configuration.empty)
|
||||
override def setConfiguration(
|
||||
config: Configuration): ZIO[Config, Nothing, Unit] =
|
||||
UIO(configRef.set(config))
|
||||
|
|
|
@ -12,12 +12,12 @@ sealed trait ConfigOption {
|
|||
|
||||
object ConfigOption {
|
||||
|
||||
case class Source(path: Path) extends ConfigOption {
|
||||
final case class Source(path: Path) extends ConfigOption {
|
||||
override def update(config: Configuration): Configuration =
|
||||
sources.modify(_ + path)(config)
|
||||
}
|
||||
|
||||
case class Bucket(name: String) extends ConfigOption {
|
||||
final case class Bucket(name: String) extends ConfigOption {
|
||||
override def update(config: Configuration): Configuration =
|
||||
if (config.bucket.name.isEmpty)
|
||||
bucket.set(domain.Bucket(name))(config)
|
||||
|
@ -25,7 +25,7 @@ object ConfigOption {
|
|||
config
|
||||
}
|
||||
|
||||
case class Prefix(path: String) extends ConfigOption {
|
||||
final case class Prefix(path: String) extends ConfigOption {
|
||||
override def update(config: Configuration): Configuration =
|
||||
if (config.prefix.key.isEmpty)
|
||||
prefix.set(RemoteKey(path))(config)
|
||||
|
@ -33,17 +33,17 @@ object ConfigOption {
|
|||
config
|
||||
}
|
||||
|
||||
case class Include(pattern: String) extends ConfigOption {
|
||||
final case class Include(pattern: String) extends ConfigOption {
|
||||
override def update(config: Configuration): Configuration =
|
||||
filters.modify(domain.Filter.Include(pattern) :: _)(config)
|
||||
}
|
||||
|
||||
case class Exclude(pattern: String) extends ConfigOption {
|
||||
final case class Exclude(pattern: String) extends ConfigOption {
|
||||
override def update(config: Configuration): Configuration =
|
||||
filters.modify(domain.Filter.Exclude(pattern) :: _)(config)
|
||||
}
|
||||
|
||||
case class Debug() extends ConfigOption {
|
||||
final case class Debug() extends ConfigOption {
|
||||
override def update(config: Configuration): Configuration =
|
||||
debug.set(true)(config)
|
||||
}
|
||||
|
|
|
@ -2,29 +2,27 @@ package net.kemitix.thorp.config
|
|||
|
||||
import net.kemitix.thorp.domain.SimpleLens
|
||||
|
||||
case class ConfigOptions(
|
||||
options: List[ConfigOption] = List()
|
||||
) {
|
||||
|
||||
def combine(
|
||||
x: ConfigOptions,
|
||||
y: ConfigOptions
|
||||
): ConfigOptions =
|
||||
x ++ y
|
||||
final case class ConfigOptions(options: List[ConfigOption]) {
|
||||
|
||||
def ++(other: ConfigOptions): ConfigOptions =
|
||||
ConfigOptions(options ++ other.options)
|
||||
ConfigOptions.combine(this, other)
|
||||
|
||||
def ::(head: ConfigOption): ConfigOptions =
|
||||
ConfigOptions(head :: options)
|
||||
|
||||
def contains[A1 >: ConfigOption](elem: A1): Boolean =
|
||||
options contains elem
|
||||
|
||||
}
|
||||
|
||||
object ConfigOptions {
|
||||
val empty: ConfigOptions = ConfigOptions(List.empty)
|
||||
val options: SimpleLens[ConfigOptions, List[ConfigOption]] =
|
||||
SimpleLens[ConfigOptions, List[ConfigOption]](_.options,
|
||||
c => a => c.copy(options = a))
|
||||
def combine(
|
||||
x: ConfigOptions,
|
||||
y: ConfigOptions
|
||||
): ConfigOptions = ConfigOptions(x.options ++ y.options)
|
||||
|
||||
def contains[A1 >: ConfigOption](elem: A1)(
|
||||
configOptions: ConfigOptions): Boolean =
|
||||
configOptions.options.contains(elem)
|
||||
}
|
||||
|
|
|
@ -7,26 +7,27 @@ import net.kemitix.thorp.domain.Sources
|
|||
trait ConfigQuery {
|
||||
|
||||
def showVersion(configOptions: ConfigOptions): Boolean =
|
||||
configOptions contains ConfigOption.Version
|
||||
ConfigOptions.contains(ConfigOption.Version)(configOptions)
|
||||
|
||||
def batchMode(configOptions: ConfigOptions): Boolean =
|
||||
configOptions contains ConfigOption.BatchMode
|
||||
ConfigOptions.contains(ConfigOption.BatchMode)(configOptions)
|
||||
|
||||
def ignoreUserOptions(configOptions: ConfigOptions): Boolean =
|
||||
configOptions contains ConfigOption.IgnoreUserOptions
|
||||
ConfigOptions.contains(ConfigOption.IgnoreUserOptions)(configOptions)
|
||||
|
||||
def ignoreGlobalOptions(configOptions: ConfigOptions): Boolean =
|
||||
configOptions contains ConfigOption.IgnoreGlobalOptions
|
||||
ConfigOptions.contains(ConfigOption.IgnoreGlobalOptions)(configOptions)
|
||||
|
||||
def sources(configOptions: ConfigOptions): Sources = {
|
||||
val paths = configOptions.options.flatMap {
|
||||
case ConfigOption.Source(sourcePath) => Some(sourcePath)
|
||||
case _ => None
|
||||
val explicitPaths = configOptions.options.flatMap {
|
||||
case ConfigOption.Source(sourcePath) => List(sourcePath)
|
||||
case _ => List.empty
|
||||
}
|
||||
Sources(paths match {
|
||||
val paths = explicitPaths match {
|
||||
case List() => List(Paths.get(System.getenv("PWD")))
|
||||
case _ => paths
|
||||
})
|
||||
case _ => explicitPaths
|
||||
}
|
||||
Sources(paths)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ object ConfigValidation {
|
|||
override def errorMessage: String = "Bucket name is missing"
|
||||
}
|
||||
|
||||
case class ErrorReadingFile(
|
||||
final case class ErrorReadingFile(
|
||||
file: File,
|
||||
message: String
|
||||
) extends ConfigValidation {
|
||||
|
|
|
@ -3,15 +3,23 @@ package net.kemitix.thorp.config
|
|||
import net.kemitix.thorp.domain.{Bucket, Filter, RemoteKey, SimpleLens, Sources}
|
||||
|
||||
private[config] final case class Configuration(
|
||||
bucket: Bucket = Bucket(""),
|
||||
prefix: RemoteKey = RemoteKey(""),
|
||||
filters: List[Filter] = List(),
|
||||
debug: Boolean = false,
|
||||
batchMode: Boolean = false,
|
||||
sources: Sources = Sources(List())
|
||||
bucket: Bucket,
|
||||
prefix: RemoteKey,
|
||||
filters: List[Filter],
|
||||
debug: Boolean,
|
||||
batchMode: Boolean,
|
||||
sources: Sources
|
||||
)
|
||||
|
||||
private[config] object Configuration {
|
||||
val empty: Configuration = Configuration(
|
||||
bucket = Bucket(""),
|
||||
prefix = RemoteKey(""),
|
||||
filters = List.empty,
|
||||
debug = false,
|
||||
batchMode = false,
|
||||
sources = Sources(List.empty)
|
||||
)
|
||||
val sources: SimpleLens[Configuration, Sources] =
|
||||
SimpleLens[Configuration, Sources](_.sources, b => a => b.copy(sources = a))
|
||||
val bucket: SimpleLens[Configuration, Bucket] =
|
||||
|
|
|
@ -29,7 +29,7 @@ trait ConfigurationBuilder {
|
|||
globalOpts <- globalOptions(priorityOpts ++ sourceOpts ++ userOpts)
|
||||
} yield priorityOpts ++ sourceOpts ++ userOpts ++ globalOpts
|
||||
|
||||
private val emptyConfig = ZIO.succeed(ConfigOptions())
|
||||
private val emptyConfig = ZIO.succeed(ConfigOptions.empty)
|
||||
|
||||
private def userOptions(priorityOpts: ConfigOptions) =
|
||||
if (ConfigQuery.ignoreUserOptions(priorityOpts)) emptyConfig
|
||||
|
@ -42,7 +42,7 @@ trait ConfigurationBuilder {
|
|||
private def collateOptions(configOptions: ConfigOptions): Configuration =
|
||||
ConfigOptions.options
|
||||
.get(configOptions)
|
||||
.foldLeft(Configuration()) { (config, configOption) =>
|
||||
.foldLeft(Configuration.empty) { (config, configOption) =>
|
||||
configOption.update(config)
|
||||
}
|
||||
|
||||
|
|
|
@ -17,21 +17,21 @@ trait ParseConfigLines {
|
|||
private def parseLine(str: String) =
|
||||
format.matcher(str) match {
|
||||
case m if m.matches => parseKeyValue(m.group("key"), m.group("value"))
|
||||
case _ => None
|
||||
case _ => List.empty
|
||||
}
|
||||
|
||||
private def parseKeyValue(
|
||||
key: String,
|
||||
value: String
|
||||
): Option[ConfigOption] =
|
||||
): List[ConfigOption] =
|
||||
key.toLowerCase match {
|
||||
case "source" => Some(Source(Paths.get(value)))
|
||||
case "bucket" => Some(Bucket(value))
|
||||
case "prefix" => Some(Prefix(value))
|
||||
case "include" => Some(Include(value))
|
||||
case "exclude" => Some(Exclude(value))
|
||||
case "debug" => if (truthy(value)) Some(Debug()) else None
|
||||
case _ => None
|
||||
case "source" => List(Source(Paths.get(value)))
|
||||
case "bucket" => List(Bucket(value))
|
||||
case "prefix" => List(Prefix(value))
|
||||
case "include" => List(Include(value))
|
||||
case "exclude" => List(Exclude(value))
|
||||
case "debug" => if (truthy(value)) List(Debug()) else List.empty
|
||||
case _ => List.empty
|
||||
}
|
||||
|
||||
private def truthy(value: String): Boolean =
|
||||
|
|
|
@ -1,16 +1,11 @@
|
|||
package net.kemitix.thorp.config
|
||||
|
||||
import java.io.{File, FileNotFoundException}
|
||||
|
||||
import scala.util.Try
|
||||
import java.io.File
|
||||
|
||||
object Resource {
|
||||
|
||||
def apply(
|
||||
base: AnyRef,
|
||||
name: String
|
||||
): File =
|
||||
Try {
|
||||
new File(base.getClass.getResource(name).getPath)
|
||||
}.getOrElse(throw new FileNotFoundException(name))
|
||||
): File = new File(base.getClass.getResource(name).getPath)
|
||||
}
|
||||
|
|
|
@ -45,25 +45,27 @@ class CliArgsTest extends FunSpec {
|
|||
val strings = List("--source", pathTo("."), "--bucket", "bucket", arg)
|
||||
.filter(_ != "")
|
||||
val maybeOptions = invoke(strings)
|
||||
maybeOptions.getOrElse(ConfigOptions())
|
||||
maybeOptions.getOrElse(ConfigOptions.empty)
|
||||
}
|
||||
|
||||
val containsDebug = ConfigOptions.contains(Debug())(_)
|
||||
|
||||
describe("when no debug flag") {
|
||||
val configOptions = invokeWithArgument("")
|
||||
it("debug should be false") {
|
||||
assertResult(false)(configOptions.contains(Debug()))
|
||||
assertResult(false)(containsDebug(configOptions))
|
||||
}
|
||||
}
|
||||
describe("when long debug flag") {
|
||||
val configOptions = invokeWithArgument("--debug")
|
||||
it("debug should be true") {
|
||||
assert(configOptions.contains(Debug()))
|
||||
assert(containsDebug(configOptions))
|
||||
}
|
||||
}
|
||||
describe("when short debug flag") {
|
||||
val configOptions = invokeWithArgument("-d")
|
||||
it("debug should be true") {
|
||||
assert(configOptions.contains(Debug()))
|
||||
assert(containsDebug(configOptions))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ class ConfigOptionTest extends FunSpec with TemporaryFolder {
|
|||
withDirectory(path1 => {
|
||||
withDirectory(path2 => {
|
||||
val configOptions = ConfigOptions(
|
||||
List(
|
||||
List[ConfigOption](
|
||||
ConfigOption.Source(path1),
|
||||
ConfigOption.Source(path2),
|
||||
ConfigOption.Bucket("bucket"),
|
||||
|
|
|
@ -2,6 +2,7 @@ package net.kemitix.thorp.config
|
|||
|
||||
import java.nio.file.Paths
|
||||
|
||||
import net.kemitix.thorp.domain.NonUnit.~*
|
||||
import net.kemitix.thorp.domain.Sources
|
||||
import org.scalatest.FreeSpec
|
||||
|
||||
|
@ -75,7 +76,7 @@ class ConfigQueryTest extends FreeSpec {
|
|||
val pwd = Paths.get(System.getenv("PWD"))
|
||||
val expected = Sources(List(pwd))
|
||||
val result = ConfigQuery.sources(ConfigOptions(List()))
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
"when is set once" - {
|
||||
|
|
|
@ -3,6 +3,7 @@ package net.kemitix.thorp.config
|
|||
import java.nio.file.{Path, Paths}
|
||||
|
||||
import net.kemitix.thorp.domain.Filter.{Exclude, Include}
|
||||
import net.kemitix.thorp.domain.NonUnit.~*
|
||||
import net.kemitix.thorp.domain._
|
||||
import net.kemitix.thorp.filesystem.FileSystem
|
||||
import org.scalatest.FunSpec
|
||||
|
@ -17,7 +18,7 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
|
|||
|
||||
private def configOptions(options: ConfigOption*): ConfigOptions =
|
||||
ConfigOptions(
|
||||
List(
|
||||
List[ConfigOption](
|
||||
ConfigOption.IgnoreUserOptions,
|
||||
ConfigOption.IgnoreGlobalOptions
|
||||
) ++ options)
|
||||
|
@ -34,7 +35,7 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
|
|||
describe("with .thorp.conf") {
|
||||
describe("with settings") {
|
||||
withDirectory(source => {
|
||||
val configFileName = createFile(source,
|
||||
writeFile(source,
|
||||
thorpConfigFileName,
|
||||
"bucket = a-bucket",
|
||||
"prefix = a-prefix",
|
||||
|
@ -51,7 +52,8 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
|
|||
}
|
||||
it("should have filters") {
|
||||
val expected =
|
||||
Right(List(Exclude("an-exclusion"), Include("an-inclusion")))
|
||||
Right(
|
||||
List[Filter](Exclude("an-exclusion"), Include("an-inclusion")))
|
||||
assertResult(expected)(result.map(_.filters))
|
||||
}
|
||||
})
|
||||
|
@ -125,14 +127,14 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
|
|||
val expectedPrefixes = Right(RemoteKey("current-prefix"))
|
||||
// should have filters from both sources
|
||||
val expectedFilters = Right(
|
||||
List(Filter.Exclude("current-exclude"),
|
||||
List[Filter](Filter.Exclude("current-exclude"),
|
||||
Filter.Include("current-include")))
|
||||
val options = configOptions(ConfigOption.Source(currentSource))
|
||||
val result = invoke(options)
|
||||
assertResult(expectedSources)(result.map(_.sources))
|
||||
assertResult(expectedBuckets)(result.map(_.bucket))
|
||||
assertResult(expectedPrefixes)(result.map(_.prefix))
|
||||
assertResult(expectedFilters)(result.map(_.filters))
|
||||
~*(assertResult(expectedSources)(result.map(_.sources)))
|
||||
~*(assertResult(expectedBuckets)(result.map(_.bucket)))
|
||||
~*(assertResult(expectedPrefixes)(result.map(_.prefix)))
|
||||
~*(assertResult(expectedFilters)(result.map(_.filters)))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ import zio.DefaultRuntime
|
|||
|
||||
class ParseConfigFileTest extends FunSpec with TemporaryFolder {
|
||||
|
||||
private val empty = Right(ConfigOptions())
|
||||
private val empty = Right(ConfigOptions.empty)
|
||||
|
||||
describe("parse a missing file") {
|
||||
val file = new File("/path/to/missing/file")
|
||||
|
@ -21,7 +21,7 @@ class ParseConfigFileTest extends FunSpec with TemporaryFolder {
|
|||
describe("parse an empty file") {
|
||||
it("should return no options") {
|
||||
withDirectory(dir => {
|
||||
val file = writeFile(dir, "empty-file")
|
||||
val file = createFile(dir, "empty-file")
|
||||
assertResult(empty)(invoke(file))
|
||||
})
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ class ParseConfigFileTest extends FunSpec with TemporaryFolder {
|
|||
describe("parse a file with no valid entries") {
|
||||
it("should return no options") {
|
||||
withDirectory(dir => {
|
||||
val file = writeFile(dir, "invalid-config", "no valid = config items")
|
||||
val file = createFile(dir, "invalid-config", "no valid = config items")
|
||||
assertResult(empty)(invoke(file))
|
||||
})
|
||||
}
|
||||
|
@ -37,10 +37,11 @@ class ParseConfigFileTest extends FunSpec with TemporaryFolder {
|
|||
describe("parse a file with properties") {
|
||||
it("should return some options") {
|
||||
val expected = Right(
|
||||
ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source")),
|
||||
ConfigOptions(
|
||||
List[ConfigOption](ConfigOption.Source(Paths.get("/path/to/source")),
|
||||
ConfigOption.Bucket("bucket-name"))))
|
||||
withDirectory(dir => {
|
||||
val file = writeFile(dir,
|
||||
val file = createFile(dir,
|
||||
"simple-config",
|
||||
"source = /path/to/source",
|
||||
"bucket = bucket-name")
|
||||
|
|
|
@ -59,21 +59,21 @@ class ParseConfigLinesTest extends FunSpec {
|
|||
}
|
||||
describe("debug - false") {
|
||||
it("should parse") {
|
||||
val expected = Right(ConfigOptions())
|
||||
val expected = Right(ConfigOptions.empty)
|
||||
val result = invoke(List("debug = false"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("comment line") {
|
||||
it("should be ignored") {
|
||||
val expected = Right(ConfigOptions())
|
||||
val expected = Right(ConfigOptions.empty)
|
||||
val result = invoke(List("# ignore me"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("unrecognised option") {
|
||||
it("should be ignored") {
|
||||
val expected = Right(ConfigOptions())
|
||||
val expected = Right(ConfigOptions.empty)
|
||||
val result = invoke(List("unsupported = option"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
|
|
|
@ -15,17 +15,17 @@ trait Console {
|
|||
object Console {
|
||||
|
||||
trait Service {
|
||||
def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit]
|
||||
def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit]
|
||||
def putStrLn(line: String): ZIO[Console, Nothing, Unit]
|
||||
}
|
||||
|
||||
trait Live extends Console {
|
||||
val console: Service = new Service {
|
||||
override def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
|
||||
override def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
|
||||
putStrLn(line.en)
|
||||
override def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
|
||||
putStrLn(SConsole.out)(line)
|
||||
final def putStrLn(stream: PrintStream)(
|
||||
putStrLnPrintStream(SConsole.out)(line)
|
||||
final def putStrLnPrintStream(stream: PrintStream)(
|
||||
line: String): ZIO[Console, Nothing, Unit] =
|
||||
UIO(SConsole.withOut(stream)(SConsole.println(line)))
|
||||
}
|
||||
|
@ -39,11 +39,11 @@ object Console {
|
|||
def getOutput: List[String] = output.get
|
||||
|
||||
val console: Service = new Service {
|
||||
override def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
|
||||
override def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
|
||||
putStrLn(line.en)
|
||||
|
||||
override def putStrLn(line: String): ZIO[Console, Nothing, Unit] = {
|
||||
output.accumulateAndGet(List(line), (a, b) => a ++ b)
|
||||
val _ = output.accumulateAndGet(List(line), (a, b) => a ++ b)
|
||||
ZIO.succeed(())
|
||||
}
|
||||
|
||||
|
@ -59,9 +59,9 @@ object Console {
|
|||
ZIO.accessM(_.console putStrLn line)
|
||||
|
||||
final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
|
||||
ZIO.accessM(_.console putStrLn line)
|
||||
ZIO.accessM(_.console putMessageLn line)
|
||||
|
||||
final def putMessageLn(
|
||||
final def putMessageLnB(
|
||||
line: ConsoleOut.WithBatchMode): ZIO[Console with Config, Nothing, Unit] =
|
||||
ZIO.accessM(line() >>= _.console.putStrLn)
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ object ConsoleOut {
|
|||
if (batchMode) UIO(enBatch) else UIO(en)
|
||||
}
|
||||
|
||||
case class ValidConfig(
|
||||
final case class ValidConfig(
|
||||
bucket: Bucket,
|
||||
prefix: RemoteKey,
|
||||
sources: Sources
|
||||
|
@ -36,7 +36,7 @@ object ConsoleOut {
|
|||
.mkString(", ")
|
||||
}
|
||||
|
||||
case class UploadComplete(remoteKey: RemoteKey)
|
||||
final case class UploadComplete(remoteKey: RemoteKey)
|
||||
extends ConsoleOut.WithBatchMode {
|
||||
override def en: String =
|
||||
s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen"
|
||||
|
@ -44,7 +44,7 @@ object ConsoleOut {
|
|||
s"Uploaded: ${remoteKey.key}"
|
||||
}
|
||||
|
||||
case class CopyComplete(sourceKey: RemoteKey, targetKey: RemoteKey)
|
||||
final case class CopyComplete(sourceKey: RemoteKey, targetKey: RemoteKey)
|
||||
extends ConsoleOut.WithBatchMode {
|
||||
override def en: String =
|
||||
s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen"
|
||||
|
@ -52,7 +52,7 @@ object ConsoleOut {
|
|||
s"Copied: ${sourceKey.key} => ${targetKey.key}"
|
||||
}
|
||||
|
||||
case class DeleteComplete(remoteKey: RemoteKey)
|
||||
final case class DeleteComplete(remoteKey: RemoteKey)
|
||||
extends ConsoleOut.WithBatchMode {
|
||||
override def en: String =
|
||||
s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen"
|
||||
|
@ -60,7 +60,7 @@ object ConsoleOut {
|
|||
s"Deleted: $remoteKey"
|
||||
}
|
||||
|
||||
case class ErrorQueueEventOccurred(action: Action, e: Throwable)
|
||||
final case class ErrorQueueEventOccurred(action: Action, e: Throwable)
|
||||
extends ConsoleOut.WithBatchMode {
|
||||
override def en: String =
|
||||
s"${action.name} failed: ${action.keys}: ${e.getMessage}"
|
||||
|
|
|
@ -2,15 +2,16 @@ package net.kemitix.thorp.core
|
|||
|
||||
import net.kemitix.thorp.config.Config
|
||||
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
|
||||
import net.kemitix.thorp.domain.Implicits._
|
||||
import net.kemitix.thorp.domain._
|
||||
import zio.RIO
|
||||
|
||||
object ActionGenerator {
|
||||
|
||||
def createAction(
|
||||
def createActions(
|
||||
matchedMetadata: MatchedMetadata,
|
||||
previousActions: Stream[Action]
|
||||
): RIO[Config, Action] =
|
||||
): RIO[Config, Stream[Action]] =
|
||||
for {
|
||||
bucket <- Config.bucket
|
||||
} yield
|
||||
|
@ -30,7 +31,7 @@ object ActionGenerator {
|
|||
anyMatches)
|
||||
}
|
||||
|
||||
case class TaggedMetadata(
|
||||
final case class TaggedMetadata(
|
||||
matchedMetadata: MatchedMetadata,
|
||||
previousActions: Stream[Action],
|
||||
remoteExists: Boolean,
|
||||
|
@ -39,14 +40,15 @@ object ActionGenerator {
|
|||
)
|
||||
|
||||
private def genAction(taggedMetadata: TaggedMetadata,
|
||||
bucket: Bucket): Action = {
|
||||
bucket: Bucket): Stream[Action] = {
|
||||
taggedMetadata match {
|
||||
case TaggedMetadata(md, _, exists, matches, _) if exists && matches =>
|
||||
case TaggedMetadata(md, _, remoteExists, remoteMatches, _)
|
||||
if remoteExists && remoteMatches =>
|
||||
doNothing(bucket, md.localFile.remoteKey)
|
||||
case TaggedMetadata(md, _, _, _, any) if any =>
|
||||
copyFile(bucket, md.localFile, md.matchByHash.head)
|
||||
case TaggedMetadata(md, _, _, _, anyMatches) if anyMatches =>
|
||||
copyFile(bucket, md.localFile, md.matchByHash)
|
||||
case TaggedMetadata(md, previous, _, _, _)
|
||||
if isUploadAlreadyQueued(previous)(md.localFile) =>
|
||||
if isNotUploadAlreadyQueued(previous)(md.localFile) =>
|
||||
uploadFile(bucket, md.localFile)
|
||||
case TaggedMetadata(md, _, _, _, _) =>
|
||||
doNothing(bucket, md.localFile.remoteKey)
|
||||
|
@ -55,34 +57,39 @@ object ActionGenerator {
|
|||
|
||||
private def key = LocalFile.remoteKey ^|-> RemoteKey.key
|
||||
|
||||
def isUploadAlreadyQueued(
|
||||
def isNotUploadAlreadyQueued(
|
||||
previousActions: Stream[Action]
|
||||
)(
|
||||
localFile: LocalFile
|
||||
): Boolean = !previousActions.exists {
|
||||
case ToUpload(_, lf, _) => key.get(lf) equals key.get(localFile)
|
||||
case ToUpload(_, lf, _) => key.get(lf) === key.get(localFile)
|
||||
case _ => false
|
||||
}
|
||||
|
||||
private def doNothing(
|
||||
bucket: Bucket,
|
||||
remoteKey: RemoteKey
|
||||
) = DoNothing(bucket, remoteKey, 0L)
|
||||
) = Stream(DoNothing(bucket, remoteKey, 0L))
|
||||
|
||||
private def uploadFile(
|
||||
bucket: Bucket,
|
||||
localFile: LocalFile
|
||||
) = ToUpload(bucket, localFile, localFile.file.length)
|
||||
) = Stream(ToUpload(bucket, localFile, localFile.file.length))
|
||||
|
||||
private def copyFile(
|
||||
bucket: Bucket,
|
||||
localFile: LocalFile,
|
||||
remoteMetaData: RemoteMetaData
|
||||
): Action =
|
||||
remoteMetaData: Set[RemoteMetaData]
|
||||
) =
|
||||
remoteMetaData
|
||||
.take(1)
|
||||
.toStream
|
||||
.map(
|
||||
other =>
|
||||
ToCopy(bucket,
|
||||
remoteMetaData.remoteKey,
|
||||
remoteMetaData.hash,
|
||||
other.remoteKey,
|
||||
other.hash,
|
||||
localFile.remoteKey,
|
||||
localFile.file.length)
|
||||
localFile.file.length))
|
||||
|
||||
}
|
||||
|
|
|
@ -3,13 +3,14 @@ package net.kemitix.thorp.core
|
|||
import net.kemitix.thorp.domain.SimpleLens
|
||||
|
||||
final case class Counters(
|
||||
uploaded: Int = 0,
|
||||
deleted: Int = 0,
|
||||
copied: Int = 0,
|
||||
errors: Int = 0
|
||||
uploaded: Int,
|
||||
deleted: Int,
|
||||
copied: Int,
|
||||
errors: Int
|
||||
)
|
||||
|
||||
object Counters {
|
||||
val empty: Counters = Counters(0, 0, 0, 0)
|
||||
val uploaded: SimpleLens[Counters, Int] =
|
||||
SimpleLens[Counters, Int](_.uploaded, b => a => b.copy(uploaded = a))
|
||||
val deleted: SimpleLens[Counters, Int] =
|
||||
|
|
|
@ -2,7 +2,7 @@ package net.kemitix.thorp.core
|
|||
|
||||
import net.kemitix.thorp.domain.StorageQueueEvent
|
||||
|
||||
case class EventQueue(
|
||||
final case class EventQueue(
|
||||
events: Stream[StorageQueueEvent],
|
||||
bytesInQueue: Long
|
||||
)
|
||||
|
|
|
@ -9,9 +9,9 @@ object Filters {
|
|||
|
||||
def isIncluded(p: Path)(filters: List[Filter]): Boolean = {
|
||||
sealed trait State
|
||||
case class Unknown() extends State
|
||||
case class Accepted() extends State
|
||||
case class Discarded() extends State
|
||||
final case class Unknown() extends State
|
||||
final case class Accepted() extends State
|
||||
final case class Discarded() extends State
|
||||
val excluded = isExcludedByFilter(p)(_)
|
||||
val included = isIncludedByFilter(p)(_)
|
||||
filters.foldRight(Unknown(): State)((filter, state) =>
|
||||
|
@ -33,9 +33,9 @@ object Filters {
|
|||
}
|
||||
|
||||
def isIncludedByFilter(path: Path)(filter: Filter): Boolean =
|
||||
filter.predicate.test(path.toString)
|
||||
filter.predicate.test(path.toFile.getPath)
|
||||
|
||||
def isExcludedByFilter(path: Path)(filter: Filter): Boolean =
|
||||
filter.predicate.test(path.toString)
|
||||
filter.predicate.test(path.toFile.getPath)
|
||||
|
||||
}
|
||||
|
|
|
@ -13,7 +13,8 @@ object KeyGenerator {
|
|||
)(path: Path): Task[RemoteKey] =
|
||||
Sources
|
||||
.forPath(path)(sources)
|
||||
.map(p => p.relativize(path.toAbsolutePath).toString)
|
||||
.map(_.relativize(path.toAbsolutePath))
|
||||
.map(_.toFile.getPath)
|
||||
.map(RemoteKey.resolve(_)(prefix))
|
||||
|
||||
}
|
||||
|
|
|
@ -65,9 +65,11 @@ object LocalFileStream {
|
|||
_ <- filesMustExist(path, files)
|
||||
} yield Stream(files: _*).map(_.toPath)
|
||||
|
||||
private def filesMustExist(path: Path, files: Array[File]) = {
|
||||
Task.when(files == null)(
|
||||
Task.fail(new IllegalArgumentException(s"Directory not found $path")))
|
||||
private def filesMustExist(path: Path, files: Array[File]) =
|
||||
Task {
|
||||
Option(files)
|
||||
.map(_ => ())
|
||||
.getOrElse(new IllegalArgumentException(s"Directory not found $path"))
|
||||
}
|
||||
|
||||
private def isIncluded(path: Path) =
|
||||
|
|
|
@ -22,11 +22,11 @@ object LocalFileValidator {
|
|||
prefix: RemoteKey
|
||||
): IO[Violation, LocalFile] =
|
||||
for {
|
||||
vFile <- validateFile(path.toFile)
|
||||
file <- validateFile(path.toFile)
|
||||
remoteKey <- validateRemoteKey(sources, prefix, path)
|
||||
} yield LocalFile(vFile, source, hash, remoteKey)
|
||||
} yield LocalFile(file, source, hash, remoteKey)
|
||||
|
||||
private def validateFile(file: File) =
|
||||
private def validateFile(file: File): IO[Violation, File] =
|
||||
if (file.isDirectory)
|
||||
ZIO.fail(Violation.IsNotAFile(file))
|
||||
else
|
||||
|
@ -34,7 +34,7 @@ object LocalFileValidator {
|
|||
|
||||
private def validateRemoteKey(sources: Sources,
|
||||
prefix: RemoteKey,
|
||||
path: Path) =
|
||||
path: Path): IO[Violation, RemoteKey] =
|
||||
KeyGenerator
|
||||
.generateKey(sources, prefix)(path)
|
||||
.mapError(e => Violation.InvalidRemoteKey(path, e))
|
||||
|
@ -43,10 +43,11 @@ object LocalFileValidator {
|
|||
def getMessage: String
|
||||
}
|
||||
object Violation {
|
||||
case class IsNotAFile(file: File) extends Violation {
|
||||
final case class IsNotAFile(file: File) extends Violation {
|
||||
override def getMessage: String = s"Local File must be a file: ${file}"
|
||||
}
|
||||
case class InvalidRemoteKey(path: Path, e: Throwable) extends Violation {
|
||||
final case class InvalidRemoteKey(path: Path, e: Throwable)
|
||||
extends Violation {
|
||||
override def getMessage: String =
|
||||
s"Remote Key for '${path}' is invalid: ${e.getMessage}"
|
||||
}
|
||||
|
|
|
@ -2,10 +2,10 @@ package net.kemitix.thorp.core
|
|||
|
||||
import net.kemitix.thorp.domain.LocalFile
|
||||
|
||||
case class LocalFiles(
|
||||
localFiles: Stream[LocalFile] = Stream(),
|
||||
count: Long = 0,
|
||||
totalSizeBytes: Long = 0
|
||||
final case class LocalFiles(
|
||||
localFiles: Stream[LocalFile],
|
||||
count: Long,
|
||||
totalSizeBytes: Long
|
||||
) {
|
||||
def ++(append: LocalFiles): LocalFiles =
|
||||
copy(
|
||||
|
@ -16,8 +16,9 @@ case class LocalFiles(
|
|||
}
|
||||
|
||||
object LocalFiles {
|
||||
val empty: LocalFiles = LocalFiles(Stream.empty, 0L, 0L)
|
||||
def reduce: Stream[LocalFiles] => LocalFiles =
|
||||
list => list.foldLeft(LocalFiles())((acc, lf) => acc ++ lf)
|
||||
list => list.foldLeft(LocalFiles.empty)((acc, lf) => acc ++ lf)
|
||||
def one(localFile: LocalFile): LocalFiles =
|
||||
LocalFiles(Stream(localFile), 1, localFile.file.length)
|
||||
}
|
||||
|
|
|
@ -35,7 +35,10 @@ object PlanBuilder {
|
|||
createActions(remoteObjects, localData.localFiles)
|
||||
.map(_.filter(doesSomething).sortBy(SequencePlan.order))
|
||||
.map(
|
||||
SyncPlan(_, SyncTotals(localData.count, localData.totalSizeBytes)))
|
||||
SyncPlan
|
||||
.create(_,
|
||||
SyncTotals
|
||||
.create(localData.count, localData.totalSizeBytes, 0L)))
|
||||
}
|
||||
|
||||
private def createActions(
|
||||
|
@ -57,14 +60,14 @@ object PlanBuilder {
|
|||
localFiles: Stream[LocalFile]
|
||||
) =
|
||||
ZIO.foldLeft(localFiles)(Stream.empty[Action])((acc, localFile) =>
|
||||
createActionFromLocalFile(remoteObjects, acc, localFile).map(_ #:: acc))
|
||||
createActionsFromLocalFile(remoteObjects, acc, localFile).map(_ #::: acc))
|
||||
|
||||
private def createActionFromLocalFile(
|
||||
private def createActionsFromLocalFile(
|
||||
remoteObjects: RemoteObjects,
|
||||
previousActions: Stream[Action],
|
||||
localFile: LocalFile
|
||||
) =
|
||||
ActionGenerator.createAction(
|
||||
ActionGenerator.createActions(
|
||||
S3MetaDataEnricher.getMetadata(localFile, remoteObjects),
|
||||
previousActions)
|
||||
|
||||
|
@ -72,12 +75,13 @@ object PlanBuilder {
|
|||
ZIO.foldLeft(remoteKeys)(Stream.empty[Action])((acc, remoteKey) =>
|
||||
createActionFromRemoteKey(remoteKey).map(_ #:: acc))
|
||||
|
||||
private def createActionFromRemoteKey(remoteKey: RemoteKey) =
|
||||
private def createActionFromRemoteKey(
|
||||
remoteKey: RemoteKey): ZIO[FileSystem with Config, Throwable, Action] =
|
||||
for {
|
||||
bucket <- Config.bucket
|
||||
prefix <- Config.prefix
|
||||
sources <- Config.sources
|
||||
needsDeleted <- Remote.isMissingLocally(sources, prefix)(remoteKey)
|
||||
needsDeleted <- Remote.isMissingLocally(sources, prefix, remoteKey)
|
||||
} yield
|
||||
if (needsDeleted) ToDelete(bucket, remoteKey, 0L)
|
||||
else DoNothing(bucket, remoteKey, 0L)
|
||||
|
|
|
@ -8,9 +8,9 @@ import zio.{RIO, ZIO}
|
|||
|
||||
object Remote {
|
||||
|
||||
def isMissingLocally(sources: Sources, prefix: RemoteKey)(
|
||||
remoteKey: RemoteKey
|
||||
): RIO[FileSystem, Boolean] =
|
||||
def isMissingLocally(sources: Sources,
|
||||
prefix: RemoteKey,
|
||||
remoteKey: RemoteKey): RIO[FileSystem, Boolean] =
|
||||
existsLocally(sources, prefix)(remoteKey)
|
||||
.map(exists => !exists)
|
||||
|
||||
|
|
|
@ -11,11 +11,11 @@ object S3MetaDataEnricher {
|
|||
val (keyMatches, hashMatches) = getS3Status(localFile, remoteObjects)
|
||||
MatchedMetadata(
|
||||
localFile,
|
||||
matchByKey = keyMatches.map { hm =>
|
||||
RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified)
|
||||
matchByKey = keyMatches.map { hash =>
|
||||
RemoteMetaData(localFile.remoteKey, hash)
|
||||
},
|
||||
matchByHash = hashMatches.map {
|
||||
case (hash, km) => RemoteMetaData(km.key, hash, km.modified)
|
||||
case (key, hash) => RemoteMetaData(key, hash)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
@ -23,14 +23,14 @@ object S3MetaDataEnricher {
|
|||
def getS3Status(
|
||||
localFile: LocalFile,
|
||||
remoteObjects: RemoteObjects
|
||||
): (Option[HashModified], Set[(MD5Hash, KeyModified)]) = {
|
||||
): (Option[MD5Hash], Set[(RemoteKey, MD5Hash)]) = {
|
||||
val matchingByKey = remoteObjects.byKey.get(localFile.remoteKey)
|
||||
val matchingByHash = localFile.hashes
|
||||
.map {
|
||||
case (_, md5Hash) =>
|
||||
remoteObjects.byHash
|
||||
.getOrElse(md5Hash, Set())
|
||||
.map(km => (md5Hash, km))
|
||||
.map(key => (key, md5Hash))
|
||||
}
|
||||
.flatten
|
||||
.toSet
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
case class SequencedAction(
|
||||
final case class SequencedAction(
|
||||
action: Action,
|
||||
index: Int
|
||||
)
|
||||
|
|
|
@ -32,7 +32,7 @@ trait SyncLogging {
|
|||
def logRunFinished(
|
||||
actions: Stream[StorageQueueEvent]
|
||||
): ZIO[Console, Nothing, Unit] = {
|
||||
val counters = actions.foldLeft(Counters())(countActivities)
|
||||
val counters = actions.foldLeft(Counters.empty)(countActivities)
|
||||
Console.putStrLn(eraseToEndOfScreen) *>
|
||||
Console.putStrLn(s"Uploaded ${counters.uploaded} files") *>
|
||||
Console.putStrLn(s"Copied ${counters.copied} files") *>
|
||||
|
|
|
@ -2,7 +2,13 @@ package net.kemitix.thorp.core
|
|||
|
||||
import net.kemitix.thorp.domain.SyncTotals
|
||||
|
||||
case class SyncPlan(
|
||||
actions: Stream[Action] = Stream(),
|
||||
syncTotals: SyncTotals = SyncTotals()
|
||||
final case class SyncPlan private (
|
||||
actions: Stream[Action],
|
||||
syncTotals: SyncTotals
|
||||
)
|
||||
|
||||
object SyncPlan {
|
||||
val empty: SyncPlan = SyncPlan(Stream.empty, SyncTotals.empty)
|
||||
def create(actions: Stream[Action], syncTotals: SyncTotals): SyncPlan =
|
||||
SyncPlan(actions, syncTotals)
|
||||
}
|
||||
|
|
|
@ -24,13 +24,13 @@ trait ThorpArchive {
|
|||
event: StorageQueueEvent): RIO[Console with Config, StorageQueueEvent] =
|
||||
event match {
|
||||
case UploadQueueEvent(remoteKey, _) =>
|
||||
ZIO(event) <* Console.putMessageLn(UploadComplete(remoteKey))
|
||||
ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey))
|
||||
case CopyQueueEvent(sourceKey, targetKey) =>
|
||||
ZIO(event) <* Console.putMessageLn(CopyComplete(sourceKey, targetKey))
|
||||
ZIO(event) <* Console.putMessageLnB(CopyComplete(sourceKey, targetKey))
|
||||
case DeleteQueueEvent(remoteKey) =>
|
||||
ZIO(event) <* Console.putMessageLn(DeleteComplete(remoteKey))
|
||||
ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey))
|
||||
case ErrorQueueEvent(action, _, e) =>
|
||||
ZIO(event) <* Console.putMessageLn(ErrorQueueEventOccurred(action, e))
|
||||
ZIO(event) <* Console.putMessageLnB(ErrorQueueEventOccurred(action, e))
|
||||
case DoNothingQueueEvent(_) => ZIO(event)
|
||||
case ShutdownQueueEvent() => ZIO(event)
|
||||
}
|
||||
|
|
|
@ -8,7 +8,7 @@ import net.kemitix.thorp.domain._
|
|||
import net.kemitix.thorp.storage.api.Storage
|
||||
import zio.{Task, RIO}
|
||||
|
||||
case class UnversionedMirrorArchive(syncTotals: SyncTotals)
|
||||
final case class UnversionedMirrorArchive(syncTotals: SyncTotals)
|
||||
extends ThorpArchive {
|
||||
|
||||
override def update(
|
||||
|
|
|
@ -48,7 +48,7 @@ private object MD5HashGenerator {
|
|||
offset: Long,
|
||||
endOffset: Long
|
||||
) =
|
||||
FileSystem.open(file, offset) >>= { managedFileInputStream =>
|
||||
FileSystem.openAtOffset(file, offset) >>= { managedFileInputStream =>
|
||||
managedFileInputStream.use { fileInputStream =>
|
||||
digestFile(fileInputStream, offset, endOffset)
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ private object MD5HashGenerator {
|
|||
if (nextBufferSize(currentOffset, endOffset) < maxBufferSize)
|
||||
new Array[Byte](nextBufferSize(currentOffset, endOffset))
|
||||
else defaultBuffer
|
||||
fis read buffer
|
||||
val _ = fis read buffer
|
||||
buffer
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
import net.kemitix.thorp.config._
|
||||
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
|
||||
import net.kemitix.thorp.domain.HashType.MD5
|
||||
|
@ -11,14 +9,13 @@ import org.scalatest.FunSpec
|
|||
import zio.DefaultRuntime
|
||||
|
||||
class ActionGeneratorSuite extends FunSpec {
|
||||
val lastModified = LastModified(Instant.now())
|
||||
private val source = Resource(this, "upload")
|
||||
private val sourcePath = source.toPath
|
||||
private val sources = Sources(List(sourcePath))
|
||||
private val prefix = RemoteKey("prefix")
|
||||
private val bucket = Bucket("bucket")
|
||||
private val configOptions = ConfigOptions(
|
||||
List(
|
||||
List[ConfigOption](
|
||||
ConfigOption.Bucket("bucket"),
|
||||
ConfigOption.Prefix("prefix"),
|
||||
ConfigOption.Source(sourcePath),
|
||||
|
@ -38,9 +35,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
sourcePath,
|
||||
sources,
|
||||
prefix)
|
||||
theRemoteMetadata = RemoteMetaData(theFile.remoteKey,
|
||||
theHash,
|
||||
lastModified)
|
||||
theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash)
|
||||
input = MatchedMetadata(
|
||||
theFile, // local exists
|
||||
matchByHash = Set(theRemoteMetadata), // remote matches
|
||||
|
@ -69,9 +64,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
prefix)
|
||||
theRemoteKey = theFile.remoteKey
|
||||
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
|
||||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey,
|
||||
theHash,
|
||||
lastModified)
|
||||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
|
||||
input = MatchedMetadata(
|
||||
theFile, // local exists
|
||||
matchByHash = Set(otherRemoteMetadata), // other matches
|
||||
|
@ -128,12 +121,10 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
theRemoteKey = theFile.remoteKey
|
||||
oldHash = MD5Hash("old-hash")
|
||||
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
|
||||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey,
|
||||
theHash,
|
||||
lastModified)
|
||||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
|
||||
oldRemoteMetadata = RemoteMetaData(theRemoteKey,
|
||||
hash = oldHash, // remote no match
|
||||
lastModified = lastModified)
|
||||
hash = oldHash // remote no match
|
||||
)
|
||||
input = MatchedMetadata(
|
||||
theFile, // local exists
|
||||
matchByHash = Set(otherRemoteMetadata), // other matches
|
||||
|
@ -166,7 +157,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
prefix)
|
||||
theRemoteKey = theFile.remoteKey
|
||||
oldHash = MD5Hash("old-hash")
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
|
||||
input = MatchedMetadata(
|
||||
theFile, // local exists
|
||||
matchByHash = Set.empty, // remote no match, other no match
|
||||
|
@ -206,7 +197,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
for {
|
||||
config <- ConfigurationBuilder.buildConfig(configOptions)
|
||||
_ <- Config.set(config)
|
||||
actions <- ActionGenerator.createAction(input, previousActions)
|
||||
actions <- ActionGenerator.createActions(input, previousActions)
|
||||
} yield actions
|
||||
|
||||
new DefaultRuntime {}.unsafeRunSync {
|
||||
|
|
|
@ -7,7 +7,8 @@ import net.kemitix.thorp.domain._
|
|||
import net.kemitix.thorp.storage.api.Storage
|
||||
import zio.{RIO, UIO}
|
||||
|
||||
case class DummyStorageService(remoteObjects: RemoteObjects,
|
||||
final case class DummyStorageService(
|
||||
remoteObjects: RemoteObjects,
|
||||
uploadFiles: Map[File, (RemoteKey, MD5Hash)])
|
||||
extends Storage.Service {
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ class FiltersSuite extends FunSpec {
|
|||
describe("Include") {
|
||||
|
||||
describe("default filter") {
|
||||
val include = Include()
|
||||
val include = Include.all
|
||||
it("should include files") {
|
||||
paths.foreach(path =>
|
||||
assertResult(true)(Filters.isIncludedByFilter(path)(include)))
|
||||
|
@ -43,10 +43,12 @@ class FiltersSuite extends FunSpec {
|
|||
val matching = Paths.get("/upload/root-file")
|
||||
assertResult(true)(Filters.isIncludedByFilter(matching)(include))
|
||||
}
|
||||
it("exclude non-matching files 'test-file-for-hash.txt' & '/upload/subdir/leaf-file'") {
|
||||
it("exclude non-matching files 'test-file-for-hash.txt'") {
|
||||
val nonMatching1 = Paths.get("/test-file-for-hash.txt")
|
||||
val nonMatching2 = Paths.get("/upload/subdir/leaf-file")
|
||||
assertResult(false)(Filters.isIncludedByFilter(nonMatching1)(include))
|
||||
}
|
||||
it("exclude non-matching files '/upload/subdir/leaf-file'") {
|
||||
val nonMatching2 = Paths.get("/upload/subdir/leaf-file")
|
||||
assertResult(false)(Filters.isIncludedByFilter(nonMatching2)(include))
|
||||
}
|
||||
}
|
||||
|
@ -78,10 +80,12 @@ class FiltersSuite extends FunSpec {
|
|||
val matching = Paths.get("/upload/root-file")
|
||||
assertResult(true)(Filters.isExcludedByFilter(matching)(exclude))
|
||||
}
|
||||
it("include non-matching files 'test-file-for-hash.txt' & '/upload/subdir/leaf-file'") {
|
||||
it("include non-matching files 'test-file-for-hash.txt'") {
|
||||
val nonMatching1 = Paths.get("/test-file-for-hash.txt")
|
||||
val nonMatching2 = Paths.get("/upload/subdir/leaf-file")
|
||||
assertResult(false)(Filters.isExcludedByFilter(nonMatching1)(exclude))
|
||||
}
|
||||
it("include non-matching files '/upload/subdir/leaf-file'") {
|
||||
val nonMatching2 = Paths.get("/upload/subdir/leaf-file")
|
||||
assertResult(false)(Filters.isExcludedByFilter(nonMatching2)(exclude))
|
||||
}
|
||||
}
|
||||
|
@ -116,7 +120,7 @@ class FiltersSuite extends FunSpec {
|
|||
}
|
||||
}
|
||||
describe("when include .txt files, but then exclude everything trumps all") {
|
||||
val filters = List(Include(".txt"), Exclude(".*"))
|
||||
val filters = List[Filter](Include(".txt"), Exclude(".*"))
|
||||
it("should include nothing") {
|
||||
val expected = List()
|
||||
val result = invoke(filters)
|
||||
|
@ -124,7 +128,7 @@ class FiltersSuite extends FunSpec {
|
|||
}
|
||||
}
|
||||
describe("when exclude everything except .txt files") {
|
||||
val filters = List(Exclude(".*"), Include(".txt"))
|
||||
val filters = List[Filter](Exclude(".*"), Include(".txt"))
|
||||
it("should include only the .txt files") {
|
||||
val expected = List(path2, path3).map(Paths.get(_))
|
||||
val result = invoke(filters)
|
||||
|
|
|
@ -32,7 +32,7 @@ class LocalFileStreamSuite extends FunSpec {
|
|||
val result =
|
||||
invoke()
|
||||
.map(_.localFiles)
|
||||
.map(_.map(LocalFile.relativeToSource(_).toString))
|
||||
.map(_.map(LocalFile.relativeToSource(_).toFile.getPath))
|
||||
.map(_.toSet)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ class LocalFileStreamSuite extends FunSpec {
|
|||
file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash)
|
||||
))
|
||||
val configOptions = ConfigOptions(
|
||||
List(
|
||||
List[ConfigOption](
|
||||
ConfigOption.IgnoreGlobalOptions,
|
||||
ConfigOption.IgnoreUserOptions,
|
||||
ConfigOption.Source(sourcePath),
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
import net.kemitix.thorp.config.Resource
|
||||
import net.kemitix.thorp.core.S3MetaDataEnricher.{getMetadata, getS3Status}
|
||||
import net.kemitix.thorp.domain.HashType.MD5
|
||||
|
@ -9,22 +7,19 @@ import net.kemitix.thorp.domain._
|
|||
import org.scalatest.FunSpec
|
||||
|
||||
class MatchedMetadataEnricherSuite extends FunSpec {
|
||||
private val lastModified = LastModified(Instant.now())
|
||||
private val source = Resource(this, "upload")
|
||||
private val sourcePath = source.toPath
|
||||
private val sources = Sources(List(sourcePath))
|
||||
private val prefix = RemoteKey("prefix")
|
||||
|
||||
def getMatchesByKey(
|
||||
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
|
||||
: Option[HashModified] = {
|
||||
status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)])): Option[MD5Hash] = {
|
||||
val (byKey, _) = status
|
||||
byKey
|
||||
}
|
||||
|
||||
def getMatchesByHash(
|
||||
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
|
||||
: Set[(MD5Hash, KeyModified)] = {
|
||||
def getMatchesByHash(status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)]))
|
||||
: Set[(RemoteKey, MD5Hash)] = {
|
||||
val (_, byHash) = status
|
||||
byHash
|
||||
}
|
||||
|
@ -42,10 +37,10 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
|||
prefix)
|
||||
theRemoteKey = theFile.remoteKey
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
|
||||
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
|
||||
byHash = Map(theHash -> Set(theRemoteKey)),
|
||||
byKey = Map(theRemoteKey -> theHash)
|
||||
)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
|
||||
} yield (theFile, theRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
|
@ -70,10 +65,10 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
|||
prefix)
|
||||
theRemoteKey: RemoteKey = RemoteKey.resolve("the-file")(prefix)
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
|
||||
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
|
||||
byHash = Map(theHash -> Set(theRemoteKey)),
|
||||
byKey = Map(theRemoteKey -> theHash)
|
||||
)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
|
||||
} yield (theFile, theRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
|
@ -98,13 +93,10 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
|||
prefix)
|
||||
otherRemoteKey = RemoteKey("other-key")
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
|
||||
byKey = Map(otherRemoteKey -> HashModified(theHash, lastModified))
|
||||
byHash = Map(theHash -> Set(otherRemoteKey)),
|
||||
byKey = Map(otherRemoteKey -> theHash)
|
||||
)
|
||||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey,
|
||||
theHash,
|
||||
lastModified)
|
||||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
|
||||
} yield (theFile, otherRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
|
@ -128,7 +120,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
|||
sourcePath,
|
||||
sources,
|
||||
prefix)
|
||||
remoteObjects = RemoteObjects()
|
||||
remoteObjects = RemoteObjects.empty
|
||||
} yield (theFile, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
|
@ -157,17 +149,14 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
|||
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
|
||||
theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
|
||||
Map(oldHash -> Set(theRemoteKey), theHash -> Set(otherRemoteKey)),
|
||||
byKey = Map(
|
||||
theRemoteKey -> HashModified(oldHash, lastModified),
|
||||
otherRemoteKey -> HashModified(theHash, lastModified)
|
||||
theRemoteKey -> oldHash,
|
||||
otherRemoteKey -> theHash
|
||||
)
|
||||
)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
||||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey,
|
||||
theHash,
|
||||
lastModified)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
|
||||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
|
||||
} yield (theFile, theRemoteMetadata, otherRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
|
@ -197,13 +186,10 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
|||
theRemoteKey = theFile.remoteKey
|
||||
oldHash = MD5Hash("old-hash")
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash = Map(oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
|
||||
theHash -> Set.empty),
|
||||
byKey = Map(
|
||||
theRemoteKey -> HashModified(oldHash, lastModified)
|
||||
byHash = Map(oldHash -> Set(theRemoteKey), theHash -> Set.empty),
|
||||
byKey = Map(theRemoteKey -> oldHash)
|
||||
)
|
||||
)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
|
||||
} yield (theFile, theRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
|
@ -243,17 +229,15 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
|||
sourcePath,
|
||||
sources,
|
||||
prefix)
|
||||
lastModified = LastModified(Instant.now)
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash = Map(
|
||||
hash -> Set(KeyModified(key, lastModified),
|
||||
KeyModified(keyOtherKey.remoteKey, lastModified)),
|
||||
diffHash -> Set(KeyModified(keyDiffHash.remoteKey, lastModified))
|
||||
hash -> Set(key, keyOtherKey.remoteKey),
|
||||
diffHash -> Set(keyDiffHash.remoteKey)
|
||||
),
|
||||
byKey = Map(
|
||||
key -> HashModified(hash, lastModified),
|
||||
keyOtherKey.remoteKey -> HashModified(hash, lastModified),
|
||||
keyDiffHash.remoteKey -> HashModified(diffHash, lastModified)
|
||||
key -> hash,
|
||||
keyOtherKey.remoteKey -> hash,
|
||||
keyDiffHash.remoteKey -> diffHash
|
||||
)
|
||||
)
|
||||
} yield (remoteObjects, localFile, keyDiffHash, diffHash)
|
||||
|
@ -267,7 +251,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
|||
env.map({
|
||||
case (remoteObjects, localFile: LocalFile, _, _) =>
|
||||
val result = getMatchesByKey(invoke(localFile, remoteObjects))
|
||||
assert(result.contains(HashModified(hash, lastModified)))
|
||||
assert(result.contains(hash))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -307,17 +291,15 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
|||
}
|
||||
|
||||
describe("when remote key exists and no others match hash") {
|
||||
env.map({
|
||||
val _ = env.map({
|
||||
case (remoteObjects, _, keyDiffHash, diffHash) => {
|
||||
it("should return match by key") {
|
||||
val result = getMatchesByKey(invoke(keyDiffHash, remoteObjects))
|
||||
assert(result.contains(HashModified(diffHash, lastModified)))
|
||||
assert(result.contains(diffHash))
|
||||
}
|
||||
it("should return only itself in match by hash") {
|
||||
val result = getMatchesByHash(invoke(keyDiffHash, remoteObjects))
|
||||
assert(
|
||||
result.equals(Set(
|
||||
(diffHash, KeyModified(keyDiffHash.remoteKey, lastModified)))))
|
||||
assert(result === Set((keyDiffHash.remoteKey, diffHash)))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
|
@ -21,8 +21,7 @@ import zio.{DefaultRuntime, Task, UIO}
|
|||
|
||||
class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||
|
||||
private val lastModified: LastModified = LastModified()
|
||||
private val emptyRemoteObjects = RemoteObjects()
|
||||
private val emptyRemoteObjects = RemoteObjects.empty
|
||||
|
||||
"create a plan" - {
|
||||
|
||||
|
@ -63,9 +62,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val anOtherKey = RemoteKey("other")
|
||||
val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey)))
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(aHash -> Set(KeyModified(anOtherKey, lastModified))),
|
||||
byKey = Map(anOtherKey -> HashModified(aHash, lastModified))
|
||||
byHash = Map(aHash -> Set(anOtherKey)),
|
||||
byKey = Map(anOtherKey -> aHash)
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
|
@ -86,9 +84,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
// DoNothing actions should have been filtered out of the plan
|
||||
val expected = Right(List())
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(hash -> Set(KeyModified(remoteKey, lastModified))),
|
||||
byKey = Map(remoteKey -> HashModified(hash, lastModified))
|
||||
byHash = Map(hash -> Set(remoteKey)),
|
||||
byKey = Map(remoteKey -> hash)
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
|
@ -108,10 +105,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val expected =
|
||||
Right(List(toUpload(remoteKey, currentHash, source, file)))
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash = Map(originalHash -> Set(
|
||||
KeyModified(remoteKey, lastModified))),
|
||||
byKey =
|
||||
Map(remoteKey -> HashModified(originalHash, lastModified))
|
||||
byHash = Map(originalHash -> Set(remoteKey)),
|
||||
byKey = Map(remoteKey -> originalHash)
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
|
@ -129,9 +124,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val sourceKey = RemoteKey("other-key")
|
||||
val expected = Right(List(toCopy(sourceKey, hash, remoteKey)))
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(hash -> Set(KeyModified(sourceKey, lastModified))),
|
||||
byKey = Map()
|
||||
byHash = Map(hash -> Set(sourceKey)),
|
||||
byKey = Map.empty
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
|
@ -155,8 +149,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
// DoNothing actions should have been filtered out of the plan
|
||||
val expected = Right(List())
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
|
||||
byKey = Map(remoteKey -> HashModified(hash, lastModified))
|
||||
byHash = Map(hash -> Set(remoteKey)),
|
||||
byKey = Map(remoteKey -> hash)
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
|
@ -172,8 +166,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val hash = MD5Hash("file-content")
|
||||
val expected = Right(List(toDelete(remoteKey)))
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
|
||||
byKey = Map(remoteKey -> HashModified(hash, lastModified))
|
||||
byHash = Map(hash -> Set(remoteKey)),
|
||||
byKey = Map(remoteKey -> hash)
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
|
@ -208,7 +202,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
createFile(secondSource, filename2, "file-2-content")
|
||||
val hash2 = md5Hash(fileInSecondSource)
|
||||
val expected = Right(
|
||||
List(
|
||||
Set(
|
||||
toUpload(remoteKey2, hash2, secondSource, fileInSecondSource),
|
||||
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)
|
||||
))
|
||||
|
@ -219,7 +213,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
UIO.succeed(
|
||||
Map(fileInFirstSource.toPath -> fileInFirstSource,
|
||||
fileInSecondSource.toPath -> fileInSecondSource))
|
||||
)
|
||||
).map(_.toSet)
|
||||
assertResult(expected)(result)
|
||||
})
|
||||
})
|
||||
|
@ -228,11 +222,11 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
"same filename in both" - {
|
||||
"only upload file in first source" in {
|
||||
withDirectory(firstSource => {
|
||||
val fileInFirstSource: File =
|
||||
val fileInFirstSource =
|
||||
createFile(firstSource, filename1, "file-1-content")
|
||||
val hash1 = md5Hash(fileInFirstSource)
|
||||
withDirectory(secondSource => {
|
||||
val fileInSecondSource: File =
|
||||
val fileInSecondSource =
|
||||
createFile(secondSource, filename1, "file-2-content")
|
||||
val hash2 = md5Hash(fileInSecondSource)
|
||||
val expected = Right(List(
|
||||
|
@ -258,10 +252,9 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
createFile(secondSource, filename2, "file-2-content")
|
||||
val hash2 = md5Hash(fileInSecondSource)
|
||||
val expected = Right(List())
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))),
|
||||
byKey = Map(remoteKey2 -> HashModified(hash2, lastModified)))
|
||||
val remoteObjects =
|
||||
RemoteObjects(byHash = Map(hash2 -> Set(remoteKey2)),
|
||||
byKey = Map(remoteKey2 -> hash2))
|
||||
val result =
|
||||
invoke(options(firstSource)(secondSource),
|
||||
UIO.succeed(remoteObjects),
|
||||
|
@ -280,10 +273,9 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val hash1 = md5Hash(fileInFirstSource)
|
||||
withDirectory(secondSource => {
|
||||
val expected = Right(List())
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))),
|
||||
byKey = Map(remoteKey1 -> HashModified(hash1, lastModified)))
|
||||
val remoteObjects =
|
||||
RemoteObjects(byHash = Map(hash1 -> Set(remoteKey1)),
|
||||
byKey = Map(remoteKey1 -> hash1))
|
||||
val result =
|
||||
invoke(options(firstSource)(secondSource),
|
||||
UIO.succeed(remoteObjects),
|
||||
|
@ -299,8 +291,9 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
withDirectory(firstSource => {
|
||||
withDirectory(secondSource => {
|
||||
val expected = Right(List(toDelete(remoteKey1)))
|
||||
val remoteObjects = RemoteObjects(byKey =
|
||||
Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified)))
|
||||
val remoteObjects =
|
||||
RemoteObjects(byHash = Map.empty,
|
||||
byKey = Map(remoteKey1 -> MD5Hash("")))
|
||||
val result =
|
||||
invoke(options(firstSource)(secondSource),
|
||||
UIO.succeed(remoteObjects),
|
||||
|
@ -336,7 +329,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
("upload",
|
||||
remoteKey.key,
|
||||
MD5Hash.hash(md5Hash),
|
||||
source.toString,
|
||||
source.toFile.getPath,
|
||||
file.toString)
|
||||
|
||||
private def toCopy(
|
||||
|
@ -398,6 +391,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
("copy", sourceKey.key, MD5Hash.hash(hash), targetKey.key, "")
|
||||
case DoNothing(_, remoteKey, _) =>
|
||||
("do-nothing", remoteKey.key, "", "", "")
|
||||
case x => ("other", x.toString, "", "", "")
|
||||
case x => ("other", "", "", "", "")
|
||||
})
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ class SequencePlanTest extends FreeSpec {
|
|||
val source = new File("source")
|
||||
val localFile1 =
|
||||
LocalFile(file1, source, hashes, remoteKey1)
|
||||
val localFile2 =
|
||||
val _ =
|
||||
LocalFile(file2, source, hashes, remoteKey2)
|
||||
val copy1 = ToCopy(bucket, remoteKey1, hash, remoteKey2, size)
|
||||
val copy2 = ToCopy(bucket, remoteKey2, hash, remoteKey1, size)
|
||||
|
@ -36,8 +36,10 @@ class SequencePlanTest extends FreeSpec {
|
|||
val delete1 = ToDelete(bucket, remoteKey1, size)
|
||||
val delete2 = ToDelete(bucket, remoteKey2, size)
|
||||
"should be in correct order" in {
|
||||
val actions = List(copy1, delete1, upload1, delete2, upload2, copy2)
|
||||
val expected = List(copy1, copy2, upload1, upload2, delete1, delete2)
|
||||
val actions =
|
||||
List[Action](copy1, delete1, upload1, delete2, upload2, copy2)
|
||||
val expected =
|
||||
List[Action](copy1, copy2, upload1, upload2, delete1, delete2)
|
||||
val result = actions.sortBy(SequencePlan.order)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
|
|
|
@ -8,10 +8,13 @@ sealed trait Filter {
|
|||
}
|
||||
|
||||
object Filter {
|
||||
case class Include(include: String = ".*") extends Filter {
|
||||
final case class Include(include: String) extends Filter {
|
||||
lazy val predicate: Predicate[String] = Pattern.compile(include).asPredicate
|
||||
}
|
||||
case class Exclude(exclude: String) extends Filter {
|
||||
object Include {
|
||||
def all: Include = Include(".*")
|
||||
}
|
||||
final case class Exclude(exclude: String) extends Filter {
|
||||
lazy val predicate: Predicate[String] =
|
||||
Pattern.compile(exclude).asPredicate()
|
||||
}
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
final case class HashModified(
|
||||
hash: MD5Hash,
|
||||
modified: LastModified
|
||||
)
|
|
@ -6,7 +6,7 @@ trait HexEncoder {
|
|||
|
||||
def encode(bytes: Array[Byte]): String =
|
||||
String
|
||||
.format("%0" + (bytes.length << 1) + "x", new BigInteger(1, bytes))
|
||||
.format(s"%0${bytes.length << 1}x", new BigInteger(1, bytes))
|
||||
.toUpperCase
|
||||
|
||||
def decode(hexString: String): Array[Byte] =
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
object Implicits {
|
||||
|
||||
@SuppressWarnings(Array("org.wartremover.warts.Equals"))
|
||||
implicit final class AnyOps[A](self: A) {
|
||||
def ===(other: A): Boolean = self == other
|
||||
def =/=(other: A): Boolean = self != other
|
||||
}
|
||||
|
||||
}
|
|
@ -1,6 +0,0 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
final case class KeyModified(
|
||||
key: RemoteKey,
|
||||
modified: LastModified
|
||||
)
|
|
@ -1,7 +0,0 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
final case class LastModified(
|
||||
when: Instant = Instant.now
|
||||
)
|
|
@ -4,6 +4,7 @@ import java.io.File
|
|||
import java.nio.file.Path
|
||||
|
||||
import net.kemitix.thorp.domain.HashType.MD5
|
||||
import Implicits._
|
||||
|
||||
final case class LocalFile private (
|
||||
file: File,
|
||||
|
@ -20,7 +21,7 @@ object LocalFile {
|
|||
def relativeToSource(localFile: LocalFile): Path =
|
||||
localFile.source.toPath.relativize(localFile.file.toPath)
|
||||
def matchesHash(localFile: LocalFile)(other: MD5Hash): Boolean =
|
||||
localFile.hashes.values.exists(other equals _)
|
||||
localFile.hashes.values.exists(other === _)
|
||||
def md5base64(localFile: LocalFile): Option[String] =
|
||||
localFile.hashes.get(MD5).map(MD5Hash.hash64)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,6 @@ package net.kemitix.thorp.domain
|
|||
// For the LocalFile, the set of matching S3 objects with the same MD5Hash, and any S3 object with the same remote key
|
||||
final case class MatchedMetadata(
|
||||
localFile: LocalFile,
|
||||
matchByHash: Set[RemoteMetaData],
|
||||
matchByHash: Set[RemoteMetaData], //TODO Can this be an Option?
|
||||
matchByKey: Option[RemoteMetaData]
|
||||
)
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
object NonUnit {
|
||||
@specialized def ~*[A](evaluateForSideEffectOnly: A): Unit = {
|
||||
val _: A = evaluateForSideEffectOnly
|
||||
() //Return unit to prevent warning due to discarding value
|
||||
}
|
||||
}
|
|
@ -1,7 +1,9 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
import Implicits._
|
||||
|
||||
object QuoteStripper {
|
||||
|
||||
def stripQuotes: Char => Boolean = _ != '"'
|
||||
def stripQuotes: Char => Boolean = _ =/= '"'
|
||||
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package net.kemitix.thorp.domain
|
|||
|
||||
import java.io.File
|
||||
import java.nio.file.{Path, Paths}
|
||||
import Implicits._
|
||||
|
||||
final case class RemoteKey(key: String)
|
||||
|
||||
|
@ -10,7 +11,7 @@ object RemoteKey {
|
|||
SimpleLens[RemoteKey, String](_.key, b => a => b.copy(key = a))
|
||||
def asFile(source: Path, prefix: RemoteKey)(
|
||||
remoteKey: RemoteKey): Option[File] =
|
||||
if (remoteKey.key.length == 0) None
|
||||
if (remoteKey.key.length === 0) None
|
||||
else Some(source.resolve(RemoteKey.relativeTo(prefix)(remoteKey)).toFile)
|
||||
def relativeTo(prefix: RemoteKey)(remoteKey: RemoteKey): Path = {
|
||||
prefix match {
|
||||
|
|
|
@ -2,6 +2,5 @@ package net.kemitix.thorp.domain
|
|||
|
||||
final case class RemoteMetaData(
|
||||
remoteKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
lastModified: LastModified
|
||||
hash: MD5Hash
|
||||
)
|
||||
|
|
|
@ -3,7 +3,14 @@ package net.kemitix.thorp.domain
|
|||
/**
|
||||
* A list of objects and their MD5 hash values.
|
||||
*/
|
||||
final case class RemoteObjects(
|
||||
byHash: Map[MD5Hash, Set[KeyModified]] = Map.empty,
|
||||
byKey: Map[RemoteKey, HashModified] = Map.empty
|
||||
final case class RemoteObjects private (
|
||||
byHash: Map[MD5Hash, Set[RemoteKey]],
|
||||
byKey: Map[RemoteKey, MD5Hash]
|
||||
)
|
||||
|
||||
object RemoteObjects {
|
||||
val empty: RemoteObjects = RemoteObjects(Map.empty, Map.empty)
|
||||
def create(byHash: Map[MD5Hash, Set[RemoteKey]],
|
||||
byKey: Map[RemoteKey, MD5Hash]): RemoteObjects =
|
||||
RemoteObjects(byHash, byKey)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
case class SimpleLens[A, B](field: A => B, update: A => B => A) {
|
||||
final case class SimpleLens[A, B](field: A => B, update: A => B => A) {
|
||||
|
||||
def composeLens[C](other: SimpleLens[B, C]): SimpleLens[A, C] =
|
||||
SimpleLens[A, C](
|
||||
|
|
|
@ -2,9 +2,9 @@ package net.kemitix.thorp.domain
|
|||
|
||||
object SizeTranslation {
|
||||
|
||||
val kbLimit = 10240L
|
||||
val mbLimit = kbLimit * 1024
|
||||
val gbLimit = mbLimit * 1024
|
||||
val kbLimit: Long = 10240L
|
||||
val mbLimit: Long = kbLimit * 1024
|
||||
val gbLimit: Long = mbLimit * 1024
|
||||
|
||||
def sizeInEnglish(length: Long): String =
|
||||
length.toDouble match {
|
||||
|
|
|
@ -14,24 +14,17 @@ import zio.{Task, ZIO}
|
|||
*
|
||||
* A path should only occur once in paths.
|
||||
*/
|
||||
case class Sources(
|
||||
final case class Sources(
|
||||
paths: List[Path]
|
||||
) {
|
||||
def +(path: Path)(implicit m: Monoid[Sources]): Sources = this ++ List(path)
|
||||
def ++(otherPaths: List[Path])(implicit m: Monoid[Sources]): Sources =
|
||||
m.op(this, Sources(otherPaths))
|
||||
def +(path: Path): Sources = this ++ List(path)
|
||||
def ++(otherPaths: List[Path]): Sources =
|
||||
Sources(otherPaths.foldLeft(paths)((acc, path) =>
|
||||
if (acc contains path) acc else acc ++ List(path)))
|
||||
}
|
||||
|
||||
object Sources {
|
||||
final val emptySources = Sources(List.empty)
|
||||
implicit def sourcesAppendMonoid: Monoid[Sources] = new Monoid[Sources] {
|
||||
override def zero: Sources = emptySources
|
||||
override def op(t1: Sources, t2: Sources): Sources =
|
||||
Sources(t2.paths.foldLeft(t1.paths) { (acc, path) =>
|
||||
if (acc.contains(path)) acc
|
||||
else acc ++ List(path)
|
||||
})
|
||||
}
|
||||
val emptySources: Sources = Sources(List.empty)
|
||||
|
||||
/**
|
||||
* Returns the source path for the given path.
|
||||
|
|
|
@ -35,13 +35,13 @@ object StorageQueueEvent {
|
|||
val keys: String
|
||||
}
|
||||
object Action {
|
||||
case class Copy(keys: String) extends Action {
|
||||
final case class Copy(keys: String) extends Action {
|
||||
override val name: String = "Copy"
|
||||
}
|
||||
case class Upload(keys: String) extends Action {
|
||||
final case class Upload(keys: String) extends Action {
|
||||
override val name: String = "Upload"
|
||||
}
|
||||
case class Delete(keys: String) extends Action {
|
||||
final case class Delete(keys: String) extends Action {
|
||||
override val name: String = "Delete"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,15 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
case class SyncTotals(
|
||||
count: Long = 0L,
|
||||
totalSizeBytes: Long = 0L,
|
||||
sizeUploadedBytes: Long = 0L
|
||||
final case class SyncTotals private (
|
||||
count: Long,
|
||||
totalSizeBytes: Long,
|
||||
sizeUploadedBytes: Long
|
||||
)
|
||||
|
||||
object SyncTotals {
|
||||
val empty: SyncTotals = SyncTotals(0L, 0L, 0L)
|
||||
def create(count: Long,
|
||||
totalSizeBytes: Long,
|
||||
sizeUploadedBytes: Long): SyncTotals =
|
||||
SyncTotals(count, totalSizeBytes, sizeUploadedBytes)
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
import Implicits._
|
||||
|
||||
object Terminal {
|
||||
|
||||
val esc: String = "\u001B"
|
||||
|
@ -75,58 +77,58 @@ object Terminal {
|
|||
*
|
||||
* Stops at the edge of the screen.
|
||||
*/
|
||||
def cursorUp(lines: Int = 1): String = csi + lines + "A"
|
||||
def cursorUp(lines: Int): String = s"${csi}${lines}A"
|
||||
|
||||
/**
|
||||
* Move the cursor down, default 1 line.
|
||||
*
|
||||
* Stops at the edge of the screen.
|
||||
*/
|
||||
def cursorDown(lines: Int = 1): String = csi + lines + "B"
|
||||
def cursorDown(lines: Int): String = s"${csi}${lines}B"
|
||||
|
||||
/**
|
||||
* Move the cursor forward, default 1 column.
|
||||
*
|
||||
* Stops at the edge of the screen.
|
||||
*/
|
||||
def cursorForward(cols: Int = 1): String = csi + cols + "C"
|
||||
def cursorForward(cols: Int): String = s"${csi}${cols}C"
|
||||
|
||||
/**
|
||||
* Move the cursor back, default 1 column,
|
||||
*
|
||||
* Stops at the edge of the screen.
|
||||
*/
|
||||
def cursorBack(cols: Int = 1): String = csi + cols + "D"
|
||||
def cursorBack(cols: Int): String = s"${csi}${cols}D"
|
||||
|
||||
/**
|
||||
* Move the cursor to the beginning of the line, default 1, down.
|
||||
*/
|
||||
def cursorNextLine(lines: Int = 1): String = csi + lines + "E"
|
||||
def cursorNextLine(lines: Int): String = s"${csi}${lines}E"
|
||||
|
||||
/**
|
||||
* Move the cursor to the beginning of the line, default 1, up.
|
||||
*/
|
||||
def cursorPrevLine(lines: Int = 1): String = csi + lines + "F"
|
||||
def cursorPrevLine(lines: Int): String = s"${csi}${lines}F"
|
||||
|
||||
/**
|
||||
* Move the cursor to the column on the current line.
|
||||
*/
|
||||
def cursorHorizAbs(col: Int): String = csi + col + "G"
|
||||
def cursorHorizAbs(col: Int): String = s"${csi}${col}G"
|
||||
|
||||
/**
|
||||
* Move the cursor to the position on screen (1,1 is the top-left).
|
||||
*/
|
||||
def cursorPosition(row: Int, col: Int): String = csi + row + ";" + col + "H"
|
||||
def cursorPosition(row: Int, col: Int): String = s"${csi}${row};${col}H"
|
||||
|
||||
/**
|
||||
* Scroll page up, default 1, lines.
|
||||
*/
|
||||
def scrollUp(lines: Int = 1): String = csi + lines + "S"
|
||||
def scrollUp(lines: Int): String = s"${csi}${lines}S"
|
||||
|
||||
/**
|
||||
* Scroll page down, default 1, lines.
|
||||
*/
|
||||
def scrollDown(lines: Int = 1): String = csi + lines + "T"
|
||||
def scrollDown(lines: Int): String = s"${csi}${lines}T"
|
||||
|
||||
/**
|
||||
* The Width of the terminal, as reported by the COLUMNS environment variable.
|
||||
|
@ -154,7 +156,7 @@ object Terminal {
|
|||
val pxDone = pxWidth * ratio
|
||||
val fullHeadSize: Int = (pxDone / phases).toInt
|
||||
val part = (pxDone % phases).toInt
|
||||
val partial = if (part != 0) subBars.getOrElse(part, "") else ""
|
||||
val partial = if (part =/= 0) subBars.getOrElse(part, "") else ""
|
||||
val head = ("█" * fullHeadSize) + partial
|
||||
val tailSize = barWidth - head.length
|
||||
val tail = " " * tailSize
|
||||
|
|
|
@ -1,31 +1,34 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import net.kemitix.thorp.domain.UploadEvent.RequestEvent
|
||||
import net.kemitix.thorp.domain.UploadEventLogger.RequestCycle
|
||||
|
||||
object UploadEventListener {
|
||||
|
||||
case class Settings(
|
||||
final case class Settings(
|
||||
localFile: LocalFile,
|
||||
index: Int,
|
||||
syncTotals: SyncTotals,
|
||||
totalBytesSoFar: Long
|
||||
)
|
||||
|
||||
def apply(settings: Settings): UploadEvent => Unit =
|
||||
uploadEvent => {
|
||||
var bytesTransferred = 0L
|
||||
uploadEvent match {
|
||||
def listener(settings: Settings): UploadEvent => Unit = {
|
||||
val bytesTransferred = new AtomicLong(0L)
|
||||
event =>
|
||||
{
|
||||
event match {
|
||||
case e: RequestEvent =>
|
||||
bytesTransferred += e.transferred
|
||||
UploadEventLogger(
|
||||
RequestCycle(settings.localFile,
|
||||
bytesTransferred,
|
||||
bytesTransferred.addAndGet(e.transferred),
|
||||
settings.index,
|
||||
settings.syncTotals,
|
||||
settings.totalBytesSoFar))
|
||||
case _ => ()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import scala.io.AnsiColor._
|
|||
|
||||
object UploadEventLogger {
|
||||
|
||||
case class RequestCycle(
|
||||
final case class RequestCycle(
|
||||
localFile: LocalFile,
|
||||
bytesTransferred: Long,
|
||||
index: Int,
|
||||
|
|
|
@ -3,24 +3,24 @@ package net.kemitix.thorp.domain
|
|||
object MD5HashData {
|
||||
|
||||
object Root {
|
||||
val hash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
|
||||
val base64 = "o6asEaDrV3uBs7tclcyKbg=="
|
||||
val hash: MD5Hash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
|
||||
val base64: String = "o6asEaDrV3uBs7tclcyKbg=="
|
||||
}
|
||||
object Leaf {
|
||||
val hash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
|
||||
val base64 = "IIOGplC97GHPzXvY3La1Qg=="
|
||||
val hash: MD5Hash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
|
||||
val base64: String = "IIOGplC97GHPzXvY3La1Qg=="
|
||||
}
|
||||
object BigFile {
|
||||
val hash = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
|
||||
val hash: MD5Hash = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
|
||||
object Part1 {
|
||||
val offset = 0
|
||||
val size = 1048576
|
||||
val hash = MD5Hash("39d4a9c78b9cfddf6d241a201a4ab726")
|
||||
val offset: Int = 0
|
||||
val size: Int = 1048576
|
||||
val hash: MD5Hash = MD5Hash("39d4a9c78b9cfddf6d241a201a4ab726")
|
||||
}
|
||||
object Part2 {
|
||||
val offset = 1048576
|
||||
val size = 1048576
|
||||
val hash = MD5Hash("af5876f3a3bc6e66f4ae96bb93d8dae0")
|
||||
val offset: Int = 1048576
|
||||
val size: Int = 1048576
|
||||
val hash: MD5Hash = MD5Hash("af5876f3a3bc6e66f4ae96bb93d8dae0")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,22 +4,27 @@ import java.io.{File, IOException, PrintWriter}
|
|||
import java.nio.file.attribute.BasicFileAttributes
|
||||
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
|
||||
|
||||
import net.kemitix.thorp.domain.NonUnit.~*
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
trait TemporaryFolder {
|
||||
|
||||
def withDirectory(testCode: Path => Any): Any = {
|
||||
@SuppressWarnings(Array("org.wartremover.warts.TryPartial"))
|
||||
def withDirectory(testCode: Path => Any): Unit = {
|
||||
val dir: Path = Files.createTempDirectory("thorp-temp")
|
||||
val t = Try(testCode(dir))
|
||||
remove(dir)
|
||||
t.get
|
||||
~*(t.get)
|
||||
}
|
||||
|
||||
def remove(root: Path): Unit = {
|
||||
~*(
|
||||
Files.walkFileTree(
|
||||
root,
|
||||
new SimpleFileVisitor[Path] {
|
||||
override def visitFile(file: Path,
|
||||
override def visitFile(
|
||||
file: Path,
|
||||
attrs: BasicFileAttributes): FileVisitResult = {
|
||||
Files.delete(file)
|
||||
FileVisitResult.CONTINUE
|
||||
|
@ -30,16 +35,11 @@ trait TemporaryFolder {
|
|||
FileVisitResult.CONTINUE
|
||||
}
|
||||
}
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
def createFile(path: Path, name: String, content: String*): File = {
|
||||
writeFile(path, name, content: _*)
|
||||
path.resolve(name).toFile
|
||||
}
|
||||
|
||||
def writeFile(directory: Path, name: String, contents: String*): File = {
|
||||
directory.toFile.mkdirs
|
||||
def createFile(directory: Path, name: String, contents: String*): File = {
|
||||
val _ = directory.toFile.mkdirs
|
||||
val file = directory.resolve(name).toFile
|
||||
val writer = new PrintWriter(file, "UTF-8")
|
||||
contents.foreach(writer.println)
|
||||
|
@ -47,4 +47,7 @@ trait TemporaryFolder {
|
|||
file
|
||||
}
|
||||
|
||||
def writeFile(directory: Path, name: String, contents: String*): Unit =
|
||||
~*(createFile(directory, name, contents: _*))
|
||||
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ object FileSystem {
|
|||
|
||||
trait Service {
|
||||
def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean]
|
||||
def openManagedFileInputStream(file: File, offset: Long = 0L)
|
||||
def openManagedFileInputStream(file: File, offset: Long)
|
||||
: RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]]
|
||||
def fileLines(file: File): RIO[FileSystem, Seq[String]]
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ object FileSystem {
|
|||
override val filesystem: Service = new Service {
|
||||
override def fileExists(
|
||||
file: File
|
||||
): ZIO[FileSystem, Throwable, Boolean] = ZIO(file.exists)
|
||||
): RIO[FileSystem, Boolean] = ZIO(file.exists)
|
||||
|
||||
override def openManagedFileInputStream(file: File, offset: Long)
|
||||
: RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]] = {
|
||||
|
@ -32,7 +32,7 @@ object FileSystem {
|
|||
def acquire =
|
||||
Task {
|
||||
val stream = new FileInputStream(file)
|
||||
stream skip offset
|
||||
val _ = stream.skip(offset)
|
||||
stream
|
||||
}
|
||||
|
||||
|
@ -59,7 +59,7 @@ object FileSystem {
|
|||
|
||||
override val filesystem: Service = new Service {
|
||||
|
||||
override def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean] =
|
||||
override def fileExists(file: File): RIO[FileSystem, Boolean] =
|
||||
fileExistsResultMap.map(m => m.keys.exists(_ equals file.toPath))
|
||||
|
||||
override def openManagedFileInputStream(file: File, offset: Long)
|
||||
|
@ -71,13 +71,17 @@ object FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
final def exists(file: File): ZIO[FileSystem, Throwable, Boolean] =
|
||||
final def exists(file: File): RIO[FileSystem, Boolean] =
|
||||
ZIO.accessM(_.filesystem fileExists file)
|
||||
|
||||
final def open(file: File, offset: Long = 0)
|
||||
final def openAtOffset(file: File, offset: Long)
|
||||
: RIO[FileSystem, ZManaged[FileSystem, Throwable, FileInputStream]] =
|
||||
ZIO.accessM(_.filesystem openManagedFileInputStream (file, offset))
|
||||
|
||||
final def open(file: File)
|
||||
: RIO[FileSystem, ZManaged[FileSystem, Throwable, FileInputStream]] =
|
||||
ZIO.accessM(_.filesystem openManagedFileInputStream (file, 0L))
|
||||
|
||||
final def lines(file: File): RIO[FileSystem, Seq[String]] =
|
||||
ZIO.accessM(_.filesystem fileLines (file))
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.3.2")
|
||||
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.2.6")
|
||||
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0")
|
||||
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.2")
|
||||
|
|
|
@ -18,7 +18,7 @@ object AmazonS3 {
|
|||
|
||||
}
|
||||
|
||||
case class ClientImpl(amazonS3: AmazonS3Client) extends Client {
|
||||
final case class ClientImpl(amazonS3: AmazonS3Client) extends Client {
|
||||
|
||||
def shutdown(): UIO[Unit] =
|
||||
UIO {
|
||||
|
|
|
@ -8,7 +8,15 @@ import net.kemitix.thorp.storage.aws.AmazonUpload.{
|
|||
}
|
||||
import zio.{Task, UIO}
|
||||
|
||||
case class AmazonTransferManager(transferManager: TransferManager) {
|
||||
trait AmazonTransferManager {
|
||||
def shutdownNow(now: Boolean): UIO[Unit]
|
||||
def upload: PutObjectRequest => Task[InProgress]
|
||||
}
|
||||
|
||||
object AmazonTransferManager {
|
||||
|
||||
final case class Wrapper(transferManager: TransferManager)
|
||||
extends AmazonTransferManager {
|
||||
def shutdownNow(now: Boolean): UIO[Unit] =
|
||||
UIO(transferManager.shutdownNow(now))
|
||||
|
||||
|
@ -17,4 +25,6 @@ case class AmazonTransferManager(transferManager: TransferManager) {
|
|||
Task(transferManager.upload(putObjectRequest))
|
||||
.map(CompletableUpload)
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ object AmazonUpload {
|
|||
def waitForUploadResult: UploadResult
|
||||
}
|
||||
|
||||
case class CompletableUpload(upload: Upload) extends InProgress {
|
||||
final case class CompletableUpload(upload: Upload) extends InProgress {
|
||||
override def waitForUploadResult: UploadResult =
|
||||
upload.waitForUploadResult()
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{
|
|||
ErrorQueueEvent
|
||||
}
|
||||
import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent}
|
||||
import zio.{Task, UIO}
|
||||
import zio.{Task, UIO, ZIO}
|
||||
|
||||
trait Deleter {
|
||||
|
||||
|
@ -16,16 +16,15 @@ trait Deleter {
|
|||
remoteKey: RemoteKey
|
||||
): UIO[StorageQueueEvent] =
|
||||
deleteObject(amazonS3)(bucket, remoteKey)
|
||||
.map(_ => DeleteQueueEvent(remoteKey))
|
||||
.catchAll(e =>
|
||||
UIO(ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, e)))
|
||||
|
||||
private def deleteObject(amazonS3: AmazonS3.Client)(
|
||||
bucket: Bucket,
|
||||
remoteKey: RemoteKey
|
||||
): Task[Unit] =
|
||||
amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key))
|
||||
|
||||
): Task[StorageQueueEvent] =
|
||||
(amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key))
|
||||
*> ZIO(DeleteQueueEvent(remoteKey)))
|
||||
}
|
||||
|
||||
object Deleter extends Deleter
|
||||
|
|
|
@ -48,7 +48,7 @@ trait Lister {
|
|||
|
||||
fetch(request)
|
||||
.map(summaries => {
|
||||
RemoteObjects(byHash(summaries), byKey(summaries))
|
||||
RemoteObjects.create(byHash(summaries), byKey(summaries))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -7,11 +7,11 @@ object S3ClientException {
|
|||
override def getMessage: String =
|
||||
"The hash of the object to be overwritten did not match the the expected value"
|
||||
}
|
||||
case class CopyError(error: Throwable) extends S3ClientException {
|
||||
final case class CopyError(error: Throwable) extends S3ClientException {
|
||||
override def getMessage: String =
|
||||
"The hash of the object to be overwritten did not match the the expected value"
|
||||
}
|
||||
case class S3Exception(message: String) extends S3ClientException {
|
||||
final case class S3Exception(message: String) extends S3ClientException {
|
||||
override def getMessage: String = message
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,22 +1,17 @@
|
|||
package net.kemitix.thorp.storage.aws
|
||||
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary
|
||||
import net.kemitix.thorp.domain.{KeyModified, LastModified, MD5Hash, RemoteKey}
|
||||
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
|
||||
|
||||
object S3ObjectsByHash {
|
||||
|
||||
def byHash(
|
||||
os: Stream[S3ObjectSummary]
|
||||
): Map[MD5Hash, Set[KeyModified]] = {
|
||||
): Map[MD5Hash, Set[RemoteKey]] = {
|
||||
val mD5HashToS3Objects: Map[MD5Hash, Stream[S3ObjectSummary]] =
|
||||
os.groupBy(o => MD5Hash(o.getETag.filter(_ != '"')))
|
||||
mD5HashToS3Objects.mapValues { os =>
|
||||
os.map { o =>
|
||||
KeyModified(
|
||||
RemoteKey(o.getKey),
|
||||
LastModified(o.getLastModified.toInstant)
|
||||
)
|
||||
}.toSet
|
||||
os.map(_.getKey).map(RemoteKey(_)).toSet
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,17 +1,16 @@
|
|||
package net.kemitix.thorp.storage.aws
|
||||
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary
|
||||
import net.kemitix.thorp.domain.{HashModified, LastModified, MD5Hash, RemoteKey}
|
||||
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
|
||||
|
||||
object S3ObjectsByKey {
|
||||
|
||||
def byKey(os: Stream[S3ObjectSummary]): Map[RemoteKey, HashModified] =
|
||||
def byKey(os: Stream[S3ObjectSummary]): Map[RemoteKey, MD5Hash] =
|
||||
os.map { o =>
|
||||
{
|
||||
val remoteKey = RemoteKey(o.getKey)
|
||||
val hash = MD5Hash(o.getETag)
|
||||
val lastModified = LastModified(o.getLastModified.toInstant)
|
||||
(remoteKey, HashModified(hash, lastModified))
|
||||
(remoteKey, hash)
|
||||
}
|
||||
}.toMap
|
||||
|
||||
|
|
|
@ -17,7 +17,8 @@ object S3Storage {
|
|||
private val client: AmazonS3.Client =
|
||||
AmazonS3.ClientImpl(AmazonS3ClientBuilder.defaultClient)
|
||||
private val transferManager: AmazonTransferManager =
|
||||
AmazonTransferManager(TransferManagerBuilder.defaultTransferManager)
|
||||
AmazonTransferManager.Wrapper(
|
||||
TransferManagerBuilder.defaultTransferManager)
|
||||
|
||||
override def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey): RIO[Console, RemoteObjects] =
|
||||
|
@ -42,7 +43,7 @@ object S3Storage {
|
|||
Deleter.delete(client)(bucket, remoteKey)
|
||||
|
||||
override def shutdown: UIO[StorageQueueEvent] = {
|
||||
transferManager.shutdownNow(true)
|
||||
transferManager.shutdownNow(true) *>
|
||||
client.shutdown().map(_ => ShutdownQueueEvent())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package net.kemitix.thorp.storage.aws
|
||||
|
||||
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
|
||||
import com.amazonaws.event.ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT
|
||||
import com.amazonaws.event.{ProgressEvent, ProgressListener}
|
||||
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
|
||||
import net.kemitix.thorp.config.Config
|
||||
import net.kemitix.thorp.domain.Implicits._
|
||||
import net.kemitix.thorp.domain.StorageQueueEvent.{
|
||||
Action,
|
||||
ErrorQueueEvent,
|
||||
|
@ -14,22 +16,18 @@ import net.kemitix.thorp.domain.UploadEvent.{
|
|||
TransferEvent
|
||||
}
|
||||
import net.kemitix.thorp.domain.{StorageQueueEvent, _}
|
||||
import net.kemitix.thorp.storage.aws.Uploader.Request
|
||||
import zio.{UIO, ZIO}
|
||||
|
||||
trait Uploader {
|
||||
|
||||
case class Request(
|
||||
localFile: LocalFile,
|
||||
bucket: Bucket,
|
||||
uploadEventListener: UploadEventListener.Settings
|
||||
)
|
||||
|
||||
def upload(transferManager: => AmazonTransferManager)(
|
||||
request: Request): ZIO[Config, Nothing, StorageQueueEvent] =
|
||||
transfer(transferManager)(request)
|
||||
.catchAll(handleError(request.localFile.remoteKey))
|
||||
|
||||
private def handleError(remoteKey: RemoteKey)(e: Throwable) =
|
||||
private def handleError(remoteKey: RemoteKey)(
|
||||
e: Throwable): UIO[StorageQueueEvent] =
|
||||
UIO(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e))
|
||||
|
||||
private def transfer(transferManager: => AmazonTransferManager)(
|
||||
|
@ -77,15 +75,15 @@ trait Uploader {
|
|||
listenerSettings =>
|
||||
new ProgressListener {
|
||||
override def progressChanged(progressEvent: ProgressEvent): Unit =
|
||||
UploadEventListener(listenerSettings)(eventHandler(progressEvent))
|
||||
UploadEventListener.listener(listenerSettings)(
|
||||
eventHandler(progressEvent))
|
||||
|
||||
private def eventHandler: ProgressEvent => UploadEvent =
|
||||
progressEvent => {
|
||||
def isTransfer: ProgressEvent => Boolean =
|
||||
_.getEventType.isTransferEvent
|
||||
def isByteTransfer: ProgressEvent => Boolean =
|
||||
_.getEventType.equals(
|
||||
ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT)
|
||||
(_.getEventType === RESPONSE_BYTE_TRANSFER_EVENT)
|
||||
progressEvent match {
|
||||
case e: ProgressEvent if isTransfer(e) =>
|
||||
TransferEvent(e.getEventType.name)
|
||||
|
@ -101,4 +99,10 @@ trait Uploader {
|
|||
|
||||
}
|
||||
|
||||
object Uploader extends Uploader
|
||||
object Uploader extends Uploader {
|
||||
final case class Request(
|
||||
localFile: LocalFile,
|
||||
bucket: Bucket,
|
||||
uploadEventListener: UploadEventListener.Settings
|
||||
)
|
||||
}
|
||||
|
|
|
@ -10,8 +10,11 @@ import zio.{RIO, UIO, ZIO}
|
|||
|
||||
trait AmazonS3ClientTestFixture extends MockFactory {
|
||||
|
||||
val fixture: Fixture =
|
||||
Fixture(stub[AmazonS3.Client], stub[AmazonTransferManager])
|
||||
@SuppressWarnings(Array("org.wartremover.warts.PublicInference"))
|
||||
private val manager = stub[AmazonTransferManager]
|
||||
@SuppressWarnings(Array("org.wartremover.warts.PublicInference"))
|
||||
private val client = stub[AmazonS3.Client]
|
||||
val fixture: Fixture = Fixture(client, manager)
|
||||
|
||||
case class Fixture(
|
||||
amazonS3Client: AmazonS3.Client,
|
||||
|
@ -53,7 +56,7 @@ trait AmazonS3ClientTestFixture extends MockFactory {
|
|||
Deleter.delete(client)(bucket, remoteKey)
|
||||
|
||||
override def shutdown: UIO[StorageQueueEvent] = {
|
||||
transferManager.shutdownNow(true)
|
||||
transferManager.shutdownNow(true) *>
|
||||
client.shutdown().map(_ => ShutdownQueueEvent())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package net.kemitix.thorp.storage.aws
|
|||
|
||||
import com.amazonaws.services.s3.model.{AmazonS3Exception, CopyObjectResult}
|
||||
import net.kemitix.thorp.console.Console
|
||||
import net.kemitix.thorp.domain.NonUnit.~*
|
||||
import net.kemitix.thorp.domain.StorageQueueEvent.{Action, ErrorQueueEvent}
|
||||
import net.kemitix.thorp.domain._
|
||||
import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError}
|
||||
|
@ -24,34 +25,36 @@ class CopierTest extends FreeSpec {
|
|||
val event = StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey)
|
||||
val expected = Right(event)
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3Client.copyObject _)
|
||||
.when()
|
||||
.returns(_ => Task.succeed(Some(new CopyObjectResult)))
|
||||
.returns(_ => Task.succeed(Some(new CopyObjectResult))))
|
||||
private val result =
|
||||
invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client)
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
}
|
||||
"when source hash does not match" - {
|
||||
"skip the file with an error" in {
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3Client.copyObject _)
|
||||
.when()
|
||||
.returns(_ => Task.succeed(None))
|
||||
.returns(_ => Task.succeed(None)))
|
||||
private val result =
|
||||
invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client)
|
||||
result match {
|
||||
~*(result match {
|
||||
case Right(
|
||||
ErrorQueueEvent(Action.Copy("sourceKey => targetKey"),
|
||||
RemoteKey("targetKey"),
|
||||
e)) =>
|
||||
e match {
|
||||
case HashError => assert(true)
|
||||
case _ => fail("Not a HashError: " + e)
|
||||
}
|
||||
case e => fail("Not an ErrorQueueEvent: " + e)
|
||||
case _ => fail(s"Not a HashError: ${e.getMessage}")
|
||||
}
|
||||
case e => fail(s"Not an ErrorQueueEvent: $e")
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -59,12 +62,12 @@ class CopierTest extends FreeSpec {
|
|||
"skip the file with an error" in {
|
||||
new AmazonS3ClientTestFixture {
|
||||
private val expectedMessage = "The specified key does not exist"
|
||||
(fixture.amazonS3Client.copyObject _)
|
||||
~*((fixture.amazonS3Client.copyObject _)
|
||||
.when()
|
||||
.returns(_ => Task.fail(new AmazonS3Exception(expectedMessage)))
|
||||
.returns(_ => Task.fail(new AmazonS3Exception(expectedMessage))))
|
||||
private val result =
|
||||
invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client)
|
||||
result match {
|
||||
~*(result match {
|
||||
case Right(
|
||||
ErrorQueueEvent(Action.Copy("sourceKey => targetKey"),
|
||||
RemoteKey("targetKey"),
|
||||
|
@ -72,10 +75,10 @@ class CopierTest extends FreeSpec {
|
|||
e match {
|
||||
case CopyError(cause) =>
|
||||
assert(cause.getMessage.startsWith(expectedMessage))
|
||||
case _ => fail("Not a CopyError: " + e)
|
||||
}
|
||||
case e => fail("Not an ErrorQueueEvent: " + e)
|
||||
case _ => fail(s"Not a CopyError: ${e.getMessage}")
|
||||
}
|
||||
case e => fail(s"Not an ErrorQueueEvent: ${e}")
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import net.kemitix.thorp.domain.{Bucket, RemoteKey}
|
|||
import org.scalatest.FreeSpec
|
||||
import zio.internal.PlatformLive
|
||||
import zio.{Runtime, Task, UIO}
|
||||
import net.kemitix.thorp.domain.NonUnit.~*
|
||||
|
||||
class DeleterTest extends FreeSpec {
|
||||
|
||||
|
@ -23,11 +24,12 @@ class DeleterTest extends FreeSpec {
|
|||
"when no errors" in {
|
||||
val expected = Right(DeleteQueueEvent(remoteKey))
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3Client.deleteObject _)
|
||||
.when()
|
||||
.returns(_ => UIO.succeed(()))
|
||||
.returns(_ => UIO.succeed(())))
|
||||
private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey)
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
"when Amazon Service Exception" in {
|
||||
|
@ -36,11 +38,12 @@ class DeleterTest extends FreeSpec {
|
|||
Right(
|
||||
ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception))
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3Client.deleteObject _)
|
||||
.when()
|
||||
.returns(_ => Task.fail(exception))
|
||||
.returns(_ => Task.fail(exception)))
|
||||
private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey)
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
"when Amazon SDK Client Exception" in {
|
||||
|
@ -49,11 +52,12 @@ class DeleterTest extends FreeSpec {
|
|||
Right(
|
||||
ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception))
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3Client.deleteObject _)
|
||||
.when()
|
||||
.returns(_ => Task.fail(exception))
|
||||
.returns(_ => Task.fail(exception)))
|
||||
private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey)
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket,
|
||||
|
|
|
@ -9,6 +9,7 @@ import com.amazonaws.services.s3.model.{
|
|||
S3ObjectSummary
|
||||
}
|
||||
import net.kemitix.thorp.console._
|
||||
import net.kemitix.thorp.domain.NonUnit.~*
|
||||
import net.kemitix.thorp.domain._
|
||||
import org.scalatest.FreeSpec
|
||||
import zio.internal.PlatformLive
|
||||
|
@ -24,58 +25,50 @@ class ListerTest extends FreeSpec {
|
|||
"when no errors" - {
|
||||
"when single fetch required" in {
|
||||
val nowDate = new Date
|
||||
val nowInstant = nowDate.toInstant
|
||||
val key = "key"
|
||||
val etag = "etag"
|
||||
val expectedHashMap = Map(
|
||||
MD5Hash(etag) -> Set(
|
||||
KeyModified(RemoteKey(key), LastModified(nowInstant))))
|
||||
val expectedKeyMap = Map(
|
||||
RemoteKey(key) -> HashModified(MD5Hash(etag),
|
||||
LastModified(nowInstant))
|
||||
)
|
||||
val expectedHashMap = Map(MD5Hash(etag) -> Set(RemoteKey(key)))
|
||||
val expectedKeyMap = Map(RemoteKey(key) -> MD5Hash(etag))
|
||||
val expected = Right(RemoteObjects(expectedHashMap, expectedKeyMap))
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3Client.listObjectsV2 _)
|
||||
.when()
|
||||
.returns(_ => {
|
||||
UIO.succeed(objectResults(nowDate, key, etag, false))
|
||||
})
|
||||
}))
|
||||
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
|
||||
"when second fetch required" in {
|
||||
val nowDate = new Date
|
||||
val nowInstant = nowDate.toInstant
|
||||
val key1 = "key1"
|
||||
val etag1 = "etag1"
|
||||
val key2 = "key2"
|
||||
val etag2 = "etag2"
|
||||
val expectedHashMap = Map(
|
||||
MD5Hash(etag1) -> Set(
|
||||
KeyModified(RemoteKey(key1), LastModified(nowInstant))),
|
||||
MD5Hash(etag2) -> Set(
|
||||
KeyModified(RemoteKey(key2), LastModified(nowInstant)))
|
||||
MD5Hash(etag1) -> Set(RemoteKey(key1)),
|
||||
MD5Hash(etag2) -> Set(RemoteKey(key2))
|
||||
)
|
||||
val expectedKeyMap = Map(
|
||||
RemoteKey(key1) -> HashModified(MD5Hash(etag1),
|
||||
LastModified(nowInstant)),
|
||||
RemoteKey(key2) -> HashModified(MD5Hash(etag2),
|
||||
LastModified(nowInstant))
|
||||
RemoteKey(key1) -> MD5Hash(etag1),
|
||||
RemoteKey(key2) -> MD5Hash(etag2)
|
||||
)
|
||||
val expected = Right(RemoteObjects(expectedHashMap, expectedKeyMap))
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3Client.listObjectsV2 _)
|
||||
.when()
|
||||
.returns(_ => UIO(objectResults(nowDate, key1, etag1, true)))
|
||||
.noMoreThanOnce()
|
||||
.noMoreThanOnce())
|
||||
~*(
|
||||
(fixture.amazonS3Client.listObjectsV2 _)
|
||||
.when()
|
||||
.returns(_ => UIO(objectResults(nowDate, key2, etag2, false)))
|
||||
.returns(_ => UIO(objectResults(nowDate, key2, etag2, false))))
|
||||
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -92,7 +85,7 @@ class ListerTest extends FreeSpec {
|
|||
etag: String,
|
||||
truncated: Boolean) = {
|
||||
val result = new ListObjectsV2Result
|
||||
result.getObjectSummaries.add(objectSummary(key, etag, nowDate))
|
||||
~*(result.getObjectSummaries.add(objectSummary(key, etag, nowDate)))
|
||||
result.setTruncated(truncated)
|
||||
result
|
||||
}
|
||||
|
@ -101,9 +94,10 @@ class ListerTest extends FreeSpec {
|
|||
"when Amazon Service Exception" in {
|
||||
val exception = new AmazonS3Exception("message")
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3Client.listObjectsV2 _)
|
||||
.when()
|
||||
.returns(_ => Task.fail(exception))
|
||||
.returns(_ => Task.fail(exception)))
|
||||
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
|
||||
assert(result.isLeft)
|
||||
}
|
||||
|
@ -111,9 +105,10 @@ class ListerTest extends FreeSpec {
|
|||
"when Amazon SDK Client Exception" in {
|
||||
val exception = new SdkClientException("message")
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3Client.listObjectsV2 _)
|
||||
.when()
|
||||
.returns(_ => Task.fail(exception))
|
||||
.returns(_ => Task.fail(exception)))
|
||||
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
|
||||
assert(result.isLeft)
|
||||
}
|
||||
|
|
|
@ -1,11 +1,7 @@
|
|||
package net.kemitix.thorp.storage.aws
|
||||
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.Date
|
||||
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary
|
||||
import net.kemitix.thorp.domain.{KeyModified, LastModified, MD5Hash, RemoteKey}
|
||||
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
|
||||
import org.scalatest.FunSpec
|
||||
|
||||
class S3ObjectsByHashSuite extends FunSpec {
|
||||
|
@ -14,14 +10,12 @@ class S3ObjectsByHashSuite extends FunSpec {
|
|||
val hash = MD5Hash("hash")
|
||||
val key1 = RemoteKey("key-1")
|
||||
val key2 = RemoteKey("key-2")
|
||||
val lastModified = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))
|
||||
val o1 = s3object(hash, key1, lastModified)
|
||||
val o2 = s3object(hash, key2, lastModified)
|
||||
val o1 = s3object(hash, key1)
|
||||
val o2 = s3object(hash, key2)
|
||||
val os = Stream(o1, o2)
|
||||
it("should group by the hash value") {
|
||||
val expected: Map[MD5Hash, Set[KeyModified]] = Map(
|
||||
hash -> Set(KeyModified(key1, lastModified),
|
||||
KeyModified(key2, lastModified))
|
||||
val expected: Map[MD5Hash, Set[RemoteKey]] = Map(
|
||||
hash -> Set(key1, key2)
|
||||
)
|
||||
val result = S3ObjectsByHash.byHash(os)
|
||||
assertResult(expected)(result)
|
||||
|
@ -29,12 +23,10 @@ class S3ObjectsByHashSuite extends FunSpec {
|
|||
}
|
||||
|
||||
private def s3object(md5Hash: MD5Hash,
|
||||
remoteKey: RemoteKey,
|
||||
lastModified: LastModified): S3ObjectSummary = {
|
||||
remoteKey: RemoteKey): S3ObjectSummary = {
|
||||
val summary = new S3ObjectSummary()
|
||||
summary.setETag(MD5Hash.hash(md5Hash))
|
||||
summary.setKey(remoteKey.key)
|
||||
summary.setLastModified(Date.from(lastModified.when))
|
||||
summary
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
package net.kemitix.thorp.storage.aws
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
import net.kemitix.thorp.config.Resource
|
||||
import net.kemitix.thorp.core.{LocalFileValidator, S3MetaDataEnricher}
|
||||
import net.kemitix.thorp.domain.HashType.MD5
|
||||
|
@ -37,23 +35,20 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
|||
sourcePath,
|
||||
sources,
|
||||
prefix)
|
||||
lastModified = LastModified(Instant.now)
|
||||
s3ObjectsData = RemoteObjects(
|
||||
byHash = Map(
|
||||
hash -> Set(KeyModified(key, lastModified),
|
||||
KeyModified(keyOtherKey.remoteKey, lastModified)),
|
||||
diffHash -> Set(KeyModified(keyDiffHash.remoteKey, lastModified))
|
||||
hash -> Set(key, keyOtherKey.remoteKey),
|
||||
diffHash -> Set(keyDiffHash.remoteKey)
|
||||
),
|
||||
byKey = Map(
|
||||
key -> HashModified(hash, lastModified),
|
||||
keyOtherKey.remoteKey -> HashModified(hash, lastModified),
|
||||
keyDiffHash.remoteKey -> HashModified(diffHash, lastModified)
|
||||
key -> hash,
|
||||
keyOtherKey.remoteKey -> hash,
|
||||
keyDiffHash.remoteKey -> diffHash
|
||||
)
|
||||
)
|
||||
} yield
|
||||
(s3ObjectsData,
|
||||
localFile: LocalFile,
|
||||
lastModified,
|
||||
keyOtherKey,
|
||||
keyDiffHash,
|
||||
diffHash,
|
||||
|
@ -62,16 +57,14 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
|||
def invoke(localFile: LocalFile, s3ObjectsData: RemoteObjects) =
|
||||
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
|
||||
|
||||
def getMatchesByKey(
|
||||
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
|
||||
: Option[HashModified] = {
|
||||
def getMatchesByKey(status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)]))
|
||||
: Option[MD5Hash] = {
|
||||
val (byKey, _) = status
|
||||
byKey
|
||||
}
|
||||
|
||||
def getMatchesByHash(
|
||||
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
|
||||
: Set[(MD5Hash, KeyModified)] = {
|
||||
def getMatchesByHash(status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)]))
|
||||
: Set[(RemoteKey, MD5Hash)] = {
|
||||
val (_, byHash) = status
|
||||
byHash
|
||||
}
|
||||
|
@ -80,25 +73,18 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
|||
"when remote key exists, unmodified and other key matches the hash") {
|
||||
it("should return the match by key") {
|
||||
env.map({
|
||||
case (s3ObjectsData, localFile, lastModified, _, _, _, _) => {
|
||||
case (s3ObjectsData, localFile, _, _, _, _) => {
|
||||
val result = getMatchesByKey(invoke(localFile, s3ObjectsData))
|
||||
assert(result.contains(HashModified(hash, lastModified)))
|
||||
assert(result.contains(hash))
|
||||
}
|
||||
})
|
||||
}
|
||||
it("should return both matches for the hash") {
|
||||
env.map({
|
||||
case (s3ObjectsData,
|
||||
localFile,
|
||||
lastModified,
|
||||
keyOtherKey,
|
||||
_,
|
||||
_,
|
||||
key) => {
|
||||
case (s3ObjectsData, localFile, keyOtherKey, _, _, key) => {
|
||||
val result = getMatchesByHash(invoke(localFile, s3ObjectsData))
|
||||
assertResult(
|
||||
Set((hash, KeyModified(key, lastModified)),
|
||||
(hash, KeyModified(keyOtherKey.remoteKey, lastModified)))
|
||||
Set((hash, key), (hash, keyOtherKey.remoteKey))
|
||||
)(result)
|
||||
}
|
||||
})
|
||||
|
@ -115,7 +101,7 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
|||
it("should return no matches by key") {
|
||||
env2.map(localFile => {
|
||||
env.map({
|
||||
case (s3ObjectsData, _, _, _, _, _, _) => {
|
||||
case (s3ObjectsData, _, _, _, _, _) => {
|
||||
val result = getMatchesByKey(invoke(localFile, s3ObjectsData))
|
||||
assert(result.isEmpty)
|
||||
}
|
||||
|
@ -125,7 +111,7 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
|||
it("should return no matches by hash") {
|
||||
env2.map(localFile => {
|
||||
env.map({
|
||||
case (s3ObjectsData, _, _, _, _, _, _) => {
|
||||
case (s3ObjectsData, _, _, _, _, _) => {
|
||||
val result = getMatchesByHash(invoke(localFile, s3ObjectsData))
|
||||
assert(result.isEmpty)
|
||||
}
|
||||
|
@ -137,31 +123,18 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
|||
describe("when remote key exists and no others match hash") {
|
||||
it("should return the match by key") {
|
||||
env.map({
|
||||
case (s3ObjectsData,
|
||||
_,
|
||||
lastModified,
|
||||
_,
|
||||
keyDiffHash,
|
||||
diffHash,
|
||||
_) => {
|
||||
case (s3ObjectsData, _, _, keyDiffHash, diffHash, _) => {
|
||||
val result = getMatchesByKey(invoke(keyDiffHash, s3ObjectsData))
|
||||
assert(result.contains(HashModified(diffHash, lastModified)))
|
||||
assert(result.contains(diffHash))
|
||||
}
|
||||
})
|
||||
}
|
||||
it("should return one match by hash") {
|
||||
env.map({
|
||||
case (s3ObjectsData,
|
||||
_,
|
||||
lastModified,
|
||||
_,
|
||||
keyDiffHash,
|
||||
diffHash,
|
||||
_) => {
|
||||
case (s3ObjectsData, _, _, keyDiffHash, diffHash, _) => {
|
||||
val result = getMatchesByHash(invoke(keyDiffHash, s3ObjectsData))
|
||||
assertResult(
|
||||
Set(
|
||||
(diffHash, KeyModified(keyDiffHash.remoteKey, lastModified)))
|
||||
Set((diffHash, keyDiffHash.remoteKey))
|
||||
)(result)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -16,6 +16,7 @@ import net.kemitix.thorp.domain._
|
|||
import org.scalamock.scalatest.MockFactory
|
||||
import org.scalatest.FreeSpec
|
||||
import zio.{DefaultRuntime, Task}
|
||||
import net.kemitix.thorp.domain.NonUnit.~*
|
||||
|
||||
class UploaderTest extends FreeSpec with MockFactory {
|
||||
|
||||
|
@ -39,16 +40,17 @@ class UploaderTest extends FreeSpec with MockFactory {
|
|||
val expected =
|
||||
Right(UploadQueueEvent(remoteKey, aHash))
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3TransferManager.upload _)
|
||||
.when()
|
||||
.returns(_ => Task.succeed(inProgress))
|
||||
.returns(_ => Task.succeed(inProgress)))
|
||||
private val result =
|
||||
invoke(fixture.amazonS3TransferManager)(
|
||||
localFile,
|
||||
bucket,
|
||||
listenerSettings
|
||||
)
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
"when Amazon Service Exception" in {
|
||||
|
@ -57,16 +59,17 @@ class UploaderTest extends FreeSpec with MockFactory {
|
|||
Right(
|
||||
ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception))
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3TransferManager.upload _)
|
||||
.when()
|
||||
.returns(_ => Task.fail(exception))
|
||||
.returns(_ => Task.fail(exception)))
|
||||
private val result =
|
||||
invoke(fixture.amazonS3TransferManager)(
|
||||
localFile,
|
||||
bucket,
|
||||
listenerSettings
|
||||
)
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
"when Amazon SDK Client Exception" in {
|
||||
|
@ -75,16 +78,17 @@ class UploaderTest extends FreeSpec with MockFactory {
|
|||
Right(
|
||||
ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception))
|
||||
new AmazonS3ClientTestFixture {
|
||||
~*(
|
||||
(fixture.amazonS3TransferManager.upload _)
|
||||
.when()
|
||||
.returns(_ => Task.fail(exception))
|
||||
.returns(_ => Task.fail(exception)))
|
||||
private val result =
|
||||
invoke(fixture.amazonS3TransferManager)(
|
||||
localFile,
|
||||
bucket,
|
||||
listenerSettings
|
||||
)
|
||||
assertResult(expected)(result)
|
||||
~*(assertResult(expected)(result))
|
||||
}
|
||||
}
|
||||
def invoke(transferManager: AmazonTransferManager)(
|
||||
|
|
Loading…
Reference in a new issue