From 899e0724c9d63cca271da20faf96e80bdbc8efea Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Thu, 1 Aug 2019 08:34:58 +0100 Subject: [PATCH] Refactor Copier.copyObject(AmazonS3.Client,Bucket,RemoteKey,MD5Hash,RemoteKey) (#137) * [core] Convert HashService into an effect Hasher * [core] LocalFileStreamSuite fix prep of test env * [core] LocalFileStreamSuite Refactor * [core] Add Hasher.hashObjectChunk * [core] make MD5HashGenerator private Moved MD5HashGenerator into package hasher and made it private to that package, together with the Hasher effect which is the only permitted user of it. Added hex and digest methods to Hasher. Similar for the ETageGenerator in [storage-aws], moved into a hasher package and made private. It can only be accessed using the S3Hasher effect. * [domain] Add Monoid * [domain] Sources use Monoid * [core] ActionGenerator Refactor * [core] Use >>= and *> operators * [config] Refactoring * [storage-aws] Refactoring * [domain] Refactoring * [core] Refactoring * [config] refactoring * Refactoring * [core] PlanBuilder Refactoring * [core] Make PlanBuilder into an object * [storate-aws] Copier Reduce the number of parameters --- build.sbt | 2 +- .../scala/net/kemitix/thorp/cli/Main.scala | 2 + .../scala/net/kemitix/thorp/cli/Program.scala | 5 +- .../kemitix/thorp/config/ConfigOption.scala | 2 +- .../config/ConfigValidationException.scala | 2 +- .../thorp/config/ConfigValidator.scala | 18 ++-- .../thorp/config/ConfigurationBuilder.scala | 6 +- .../thorp/config/ParseConfigFile.scala | 25 ++--- .../thorp/config/ParseConfigLines.scala | 16 +--- .../thorp/config/SourceConfigLoader.scala | 26 ++--- .../thorp/config}/ConfigOptionTest.scala | 10 +- .../thorp/config}/ConfigQueryTest.scala | 3 +- .../config}/ConfigurationBuilderTest.scala | 7 +- .../thorp/config}/ParseConfigFileTest.scala | 8 +- .../thorp/config/ParseConfigLinesTest.scala | 88 +++++++++++++++++ .../kemitix/thorp/core/ActionGenerator.scala | 60 ++++++------ .../net/kemitix/thorp/core/CoreTypes.scala | 3 +- .../net/kemitix/thorp/core/HashService.scala | 16 ---- .../kemitix/thorp/core/LocalFileStream.scala | 46 ++++----- .../net/kemitix/thorp/core/PlanBuilder.scala | 80 ++++++---------- .../thorp/core/SimpleHashService.scala | 19 ---- .../net/kemitix/thorp/core/SyncLogging.scala | 12 +-- .../net/kemitix/thorp/core/ThorpArchive.scala | 14 +-- .../thorp/core/UnversionedMirrorArchive.scala | 15 +-- .../kemitix/thorp/core/hasher/Hasher.scala | 96 +++++++++++++++++++ .../core/{ => hasher}/MD5HashGenerator.scala | 14 ++- .../kemitix/thorp/core/DummyHashService.scala | 14 --- .../thorp/core/LocalFileStreamSuite.scala | 39 ++++---- .../thorp/core/ParseConfigLinesTest.scala | 83 ---------------- .../kemitix/thorp/core/PlanBuilderTest.scala | 49 ++++------ .../{ => hasher}/MD5HashGeneratorTest.scala | 8 +- .../net/kemitix/thorp/domain/Filter.scala | 2 +- .../net/kemitix/thorp/domain/Monoid.scala | 6 ++ .../net/kemitix/thorp/domain/Sources.scala | 23 +++-- .../kemitix/thorp/domain/FiltersSuite.scala | 2 +- .../thorp/domain}/TemporaryFolder.scala | 2 +- .../kemitix/thorp/filesystem/FileSystem.scala | 14 +-- .../kemitix/thorp/storage/aws/Copier.scala | 56 ++++------- .../kemitix/thorp/storage/aws/Lister.scala | 6 +- .../thorp/storage/aws/S3HashService.scala | 35 ------- .../kemitix/thorp/storage/aws/S3Storage.scala | 2 +- .../kemitix/thorp/storage/aws/Uploader.scala | 15 +-- .../aws/{ => hasher}/ETagGenerator.scala | 34 ++++--- .../thorp/storage/aws/hasher/S3Hasher.scala | 45 +++++++++ .../aws/AmazonS3ClientTestFixture.scala | 3 +- .../thorp/storage/aws/CopierTest.scala | 3 +- .../aws/{ => hasher}/ETagGeneratorTest.scala | 23 +++-- 47 files changed, 519 insertions(+), 540 deletions(-) rename {core/src/test/scala/net/kemitix/thorp/core => config/src/test/scala/net/kemitix/thorp/config}/ConfigOptionTest.scala (85%) rename {core/src/test/scala/net/kemitix/thorp/core => config/src/test/scala/net/kemitix/thorp/config}/ConfigQueryTest.scala (96%) rename {core/src/test/scala/net/kemitix/thorp/core => config/src/test/scala/net/kemitix/thorp/config}/ConfigurationBuilderTest.scala (98%) rename {core/src/test/scala/net/kemitix/thorp/core => config/src/test/scala/net/kemitix/thorp/config}/ParseConfigFileTest.scala (91%) create mode 100644 config/src/test/scala/net/kemitix/thorp/config/ParseConfigLinesTest.scala delete mode 100644 core/src/main/scala/net/kemitix/thorp/core/HashService.scala delete mode 100644 core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala create mode 100644 core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala rename core/src/main/scala/net/kemitix/thorp/core/{ => hasher}/MD5HashGenerator.scala (88%) delete mode 100644 core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala delete mode 100644 core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala rename core/src/test/scala/net/kemitix/thorp/core/{ => hasher}/MD5HashGeneratorTest.scala (90%) create mode 100644 domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala rename {core/src/test/scala/net/kemitix/thorp/core => domain/src/test/scala/net/kemitix/thorp/domain}/TemporaryFolder.scala (97%) delete mode 100644 storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala rename storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/{ => hasher}/ETagGenerator.scala (73%) create mode 100644 storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala rename storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/{ => hasher}/ETagGeneratorTest.scala (77%) diff --git a/build.sbt b/build.sbt index 9483fe8..abf65be 100644 --- a/build.sbt +++ b/build.sbt @@ -122,7 +122,7 @@ lazy val config = (project in file("config")) .settings(testDependencies) .settings(commandLineParsing) .settings(assemblyJarName in assembly := "config.jar") - .dependsOn(domain) + .dependsOn(domain % "compile->compile;test->test") .dependsOn(filesystem) lazy val filesystem = (project in file("filesystem")) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala index 6c7863a..fdfa986 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala @@ -4,6 +4,7 @@ import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console import net.kemitix.thorp.filesystem.FileSystem import net.kemitix.thorp.storage.aws.S3Storage +import net.kemitix.thorp.storage.aws.hasher.S3Hasher import zio.{App, ZIO} object Main extends App { @@ -13,6 +14,7 @@ object Main extends App { with Console.Live with Config.Live with FileSystem.Live + with S3Hasher.Live override def run(args: List[String]): ZIO[Environment, Nothing, Int] = Program diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 4ac6f47..6acd473 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -5,7 +5,6 @@ import net.kemitix.thorp.console._ import net.kemitix.thorp.core.CoreTypes.CoreProgram import net.kemitix.thorp.core._ import net.kemitix.thorp.domain.StorageQueueEvent -import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService import zio.ZIO trait Program { @@ -27,7 +26,7 @@ trait Program { private def execute = { for { - plan <- PlanBuilder.createPlan(defaultHashService) + plan <- PlanBuilder.createPlan archive <- UnversionedMirrorArchive.default(plan.syncTotals) events <- applyPlan(archive, plan) _ <- SyncLogging.logRunFinished(events) @@ -39,7 +38,7 @@ trait Program { _ <- Console.putStrLn("There were errors:") _ <- throwable match { case ConfigValidationException(errors) => - ZIO.foreach(errors)(error => Console.putStrLn(s"- $error")) + ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error")) } } yield () diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala index 04e9b34..f8ba081 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala @@ -14,7 +14,7 @@ object ConfigOption { case class Source(path: Path) extends ConfigOption { override def update(config: Configuration): Configuration = - sources.modify(_ ++ path)(config) + sources.modify(_ + path)(config) } case class Bucket(name: String) extends ConfigOption { diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigValidationException.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigValidationException.scala index b78e35f..927ded2 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigValidationException.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigValidationException.scala @@ -1,5 +1,5 @@ package net.kemitix.thorp.config final case class ConfigValidationException( - errors: List[ConfigValidation] + errors: Seq[ConfigValidation] ) extends Exception diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigValidator.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigValidator.scala index 605c82e..0b5e5bb 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigValidator.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigValidator.scala @@ -22,17 +22,15 @@ sealed trait ConfigValidator { def validateSources( sources: Sources): Either[List[ConfigValidation], Sources] = - (for { - x <- sources.paths.foldLeft(List[ConfigValidation]()) { - (acc: List[ConfigValidation], path) => - { - validateSource(path) match { - case Left(errors) => acc ++ errors - case Right(_) => acc - } + sources.paths.foldLeft(List[ConfigValidation]()) { + (acc: List[ConfigValidation], path) => + { + validateSource(path) match { + case Left(errors) => acc ++ errors + case Right(_) => acc } - } - } yield x) match { + } + } match { case Nil => Right(sources) case errors => Left(errors) } diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigurationBuilder.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigurationBuilder.scala index f226f1f..3b4fd07 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigurationBuilder.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigurationBuilder.scala @@ -17,10 +17,8 @@ trait ConfigurationBuilder { def buildConfig(priorityOpts: ConfigOptions) : ZIO[FileSystem, ConfigValidationException, Configuration] = - (for { - config <- getConfigOptions(priorityOpts).map(collateOptions) - valid <- ConfigValidator.validateConfig(config) - } yield valid) + (getConfigOptions(priorityOpts).map(collateOptions) >>= + ConfigValidator.validateConfig) .catchAll(errors => ZIO.fail(ConfigValidationException(errors))) private def getConfigOptions(priorityOpts: ConfigOptions) = diff --git a/config/src/main/scala/net/kemitix/thorp/config/ParseConfigFile.scala b/config/src/main/scala/net/kemitix/thorp/config/ParseConfigFile.scala index f5bf968..142ee52 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ParseConfigFile.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ParseConfigFile.scala @@ -3,24 +3,25 @@ package net.kemitix.thorp.config import java.nio.file.Path import net.kemitix.thorp.filesystem.FileSystem -import zio.{IO, ZIO} +import zio.{IO, TaskR, ZIO} trait ParseConfigFile { def parseFile( - filename: Path): ZIO[FileSystem, List[ConfigValidation], ConfigOptions] = - readFile(filename) - .map(ParseConfigLines.parseLines) - .catchAll(h => - IO.fail( - List(ConfigValidation.ErrorReadingFile(filename, h.getMessage)))) + filename: Path): ZIO[FileSystem, Seq[ConfigValidation], ConfigOptions] = + (readFile(filename) >>= ParseConfigLines.parseLines) + .catchAll( + h => + IO.fail( + List(ConfigValidation.ErrorReadingFile(filename, h.getMessage)))) private def readFile(filename: Path) = - FileSystem - .exists(filename.toFile) - .flatMap( - if (_) FileSystem.lines(filename.toFile) - else ZIO.succeed(List.empty)) + FileSystem.exists(filename.toFile) >>= readLines(filename) + + private def readLines(filename: Path)( + exists: Boolean): TaskR[FileSystem, Seq[String]] = + if (exists) FileSystem.lines(filename.toFile) + else ZIO.succeed(Seq.empty) } diff --git a/config/src/main/scala/net/kemitix/thorp/config/ParseConfigLines.scala b/config/src/main/scala/net/kemitix/thorp/config/ParseConfigLines.scala index c7a1c84..5d35902 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ParseConfigLines.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ParseConfigLines.scala @@ -3,22 +3,16 @@ package net.kemitix.thorp.config import java.nio.file.Paths import java.util.regex.Pattern -import net.kemitix.thorp.config.ConfigOption.{ - Bucket, - Debug, - Exclude, - Include, - Prefix, - Source -} +import net.kemitix.thorp.config.ConfigOption._ +import zio.UIO trait ParseConfigLines { private val pattern = "^\\s*(?\\S*)\\s*=\\s*(?\\S*)\\s*$" private val format = Pattern.compile(pattern) - def parseLines(lines: List[String]): ConfigOptions = - ConfigOptions(lines.flatMap(parseLine)) + def parseLines(lines: Seq[String]): UIO[ConfigOptions] = + UIO(ConfigOptions(lines.flatMap(parseLine).toList)) private def parseLine(str: String) = format.matcher(str) match { @@ -40,7 +34,7 @@ trait ParseConfigLines { case _ => None } - def truthy(value: String): Boolean = + private def truthy(value: String): Boolean = value.toLowerCase match { case "true" => true case "yes" => true diff --git a/config/src/main/scala/net/kemitix/thorp/config/SourceConfigLoader.scala b/config/src/main/scala/net/kemitix/thorp/config/SourceConfigLoader.scala index 4be190a..ae941b1 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/SourceConfigLoader.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/SourceConfigLoader.scala @@ -8,24 +8,16 @@ trait SourceConfigLoader { val thorpConfigFileName = ".thorp.conf" - def loadSourceConfigs - : Sources => ZIO[FileSystem, List[ConfigValidation], ConfigOptions] = - sources => { - - val sourceConfigOptions = - ConfigOptions(sources.paths.map(ConfigOption.Source)) - - val reduce: List[ConfigOptions] => ConfigOptions = - _.foldLeft(sourceConfigOptions) { (acc, co) => + def loadSourceConfigs( + sources: Sources): ZIO[FileSystem, Seq[ConfigValidation], ConfigOptions] = + ZIO + .foreach(sources.paths) { path => + ParseConfigFile.parseFile(path.resolve(thorpConfigFileName)) + } + .map(_.foldLeft(ConfigOptions(sources.paths.map(ConfigOption.Source))) { + (acc, co) => acc ++ co - } - - ZIO - .foreach(sources.paths) { path => - ParseConfigFile.parseFile(path.resolve(thorpConfigFileName)) - } - .map(reduce) - } + }) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ConfigOptionTest.scala similarity index 85% rename from core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala rename to config/src/test/scala/net/kemitix/thorp/config/ConfigOptionTest.scala index cae4b0d..d19eb73 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ConfigOptionTest.scala @@ -1,12 +1,6 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.config -import net.kemitix.thorp.config.{ - ConfigOption, - ConfigOptions, - ConfigQuery, - ConfigurationBuilder -} -import net.kemitix.thorp.domain.Sources +import net.kemitix.thorp.domain.{Sources, TemporaryFolder} import net.kemitix.thorp.filesystem.FileSystem import org.scalatest.FunSpec import zio.DefaultRuntime diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigQueryTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala similarity index 96% rename from core/src/test/scala/net/kemitix/thorp/core/ConfigQueryTest.scala rename to config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala index 8050300..1a83d6d 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ConfigQueryTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala @@ -1,8 +1,7 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.config import java.nio.file.Paths -import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, ConfigQuery} import net.kemitix.thorp.domain.Sources import org.scalatest.FreeSpec diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala similarity index 98% rename from core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala rename to config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala index 70bc753..56100ed 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala @@ -1,12 +1,7 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.config import java.nio.file.{Path, Paths} -import net.kemitix.thorp.config.{ - ConfigOption, - ConfigOptions, - ConfigurationBuilder -} import net.kemitix.thorp.domain.Filter.{Exclude, Include} import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem diff --git a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ParseConfigFileTest.scala similarity index 91% rename from core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala rename to config/src/test/scala/net/kemitix/thorp/config/ParseConfigFileTest.scala index 9b6524c..c3b62a6 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ParseConfigFileTest.scala @@ -1,13 +1,7 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.config import java.nio.file.{Path, Paths} -import net.kemitix.thorp.config.{ - ConfigOption, - ConfigOptions, - ParseConfigFile, - Resource -} import net.kemitix.thorp.filesystem.FileSystem import org.scalatest.FunSpec import zio.DefaultRuntime diff --git a/config/src/test/scala/net/kemitix/thorp/config/ParseConfigLinesTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ParseConfigLinesTest.scala new file mode 100644 index 0000000..8255a54 --- /dev/null +++ b/config/src/test/scala/net/kemitix/thorp/config/ParseConfigLinesTest.scala @@ -0,0 +1,88 @@ +package net.kemitix.thorp.config + +import java.nio.file.Paths + +import org.scalatest.FunSpec +import zio.DefaultRuntime + +class ParseConfigLinesTest extends FunSpec { + + describe("parse single lines") { + describe("source") { + it("should parse") { + val expected = + Right( + ConfigOptions( + List(ConfigOption.Source(Paths.get("/path/to/source"))))) + val result = invoke(List("source = /path/to/source")) + assertResult(expected)(result) + } + } + describe("bucket") { + it("should parse") { + val expected = + Right(ConfigOptions(List(ConfigOption.Bucket("bucket-name")))) + val result = invoke(List("bucket = bucket-name")) + assertResult(expected)(result) + } + } + describe("prefix") { + it("should parse") { + val expected = + Right(ConfigOptions(List(ConfigOption.Prefix("prefix/to/files")))) + val result = invoke(List("prefix = prefix/to/files")) + assertResult(expected)(result) + } + } + describe("include") { + it("should parse") { + val expected = + Right(ConfigOptions(List(ConfigOption.Include("path/to/include")))) + val result = invoke(List("include = path/to/include")) + assertResult(expected)(result) + } + } + describe("exclude") { + it("should parse") { + val expected = + Right(ConfigOptions(List(ConfigOption.Exclude("path/to/exclude")))) + val result = invoke(List("exclude = path/to/exclude")) + assertResult(expected)(result) + } + } + describe("debug - true") { + it("should parse") { + val expected = Right(ConfigOptions(List(ConfigOption.Debug()))) + val result = invoke(List("debug = true")) + assertResult(expected)(result) + } + } + describe("debug - false") { + it("should parse") { + val expected = Right(ConfigOptions()) + val result = invoke(List("debug = false")) + assertResult(expected)(result) + } + } + describe("comment line") { + it("should be ignored") { + val expected = Right(ConfigOptions()) + val result = invoke(List("# ignore me")) + assertResult(expected)(result) + } + } + describe("unrecognised option") { + it("should be ignored") { + val expected = Right(ConfigOptions()) + val result = invoke(List("unsupported = option")) + assertResult(expected)(result) + } + } + + def invoke(lines: List[String]) = { + new DefaultRuntime {}.unsafeRunSync { + ParseConfigLines.parseLines(lines) + }.toEither + } + } +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala index b3db88c..05a0108 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala @@ -13,35 +13,37 @@ object ActionGenerator { ): ZIO[Config, Nothing, Stream[Action]] = for { bucket <- Config.bucket - } yield - s3MetaData match { - // #1 local exists, remote exists, remote matches - do nothing - case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _))) - if localFile.matches(hash) => - doNothing(bucket, key) - // #2 local exists, remote is missing, other matches - copy - case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty => - copyFile(bucket, localFile, matchByHash) - // #3 local exists, remote is missing, other no matches - upload - case S3MetaData(localFile, matchByHash, None) - if matchByHash.isEmpty && - isUploadAlreadyQueued(previousActions)(localFile) => - uploadFile(bucket, localFile) - // #4 local exists, remote exists, remote no match, other matches - copy - case S3MetaData(localFile, - matchByHash, - Some(RemoteMetaData(_, hash, _))) - if !localFile.matches(hash) && - matchByHash.nonEmpty => - copyFile(bucket, localFile, matchByHash) - // #5 local exists, remote exists, remote no match, other no matches - upload - case S3MetaData(localFile, matchByHash, Some(_)) - if matchByHash.isEmpty => - uploadFile(bucket, localFile) - // fallback - case S3MetaData(localFile, _, _) => - doNothing(bucket, localFile.remoteKey) - } + } yield genAction(s3MetaData, previousActions, bucket) + + private def genAction(s3MetaData: S3MetaData, + previousActions: Stream[Action], + bucket: Bucket): Stream[Action] = { + s3MetaData match { + // #1 local exists, remote exists, remote matches - do nothing + case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _))) + if localFile.matches(hash) => + doNothing(bucket, key) + // #2 local exists, remote is missing, other matches - copy + case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty => + copyFile(bucket, localFile, matchByHash) + // #3 local exists, remote is missing, other no matches - upload + case S3MetaData(localFile, matchByHash, None) + if matchByHash.isEmpty && + isUploadAlreadyQueued(previousActions)(localFile) => + uploadFile(bucket, localFile) + // #4 local exists, remote exists, remote no match, other matches - copy + case S3MetaData(localFile, matchByHash, Some(RemoteMetaData(_, hash, _))) + if !localFile.matches(hash) && + matchByHash.nonEmpty => + copyFile(bucket, localFile, matchByHash) + // #5 local exists, remote exists, remote no match, other no matches - upload + case S3MetaData(localFile, matchByHash, Some(_)) if matchByHash.isEmpty => + uploadFile(bucket, localFile) + // fallback + case S3MetaData(localFile, _, _) => + doNothing(bucket, localFile.remoteKey) + } + } private def key = LocalFile.remoteKey ^|-> RemoteKey.key diff --git a/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala b/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala index ad59bfe..7da3ff9 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala @@ -2,13 +2,14 @@ package net.kemitix.thorp.core import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.filesystem.FileSystem import net.kemitix.thorp.storage.api.Storage import zio.ZIO object CoreTypes { - type CoreEnv = Storage with Console with Config with FileSystem + type CoreEnv = Storage with Console with Config with FileSystem with Hasher type CoreProgram[A] = ZIO[CoreEnv, Throwable, A] } diff --git a/core/src/main/scala/net/kemitix/thorp/core/HashService.scala b/core/src/main/scala/net/kemitix/thorp/core/HashService.scala deleted file mode 100644 index 2e89a68..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/HashService.scala +++ /dev/null @@ -1,16 +0,0 @@ -package net.kemitix.thorp.core - -import java.nio.file.Path - -import net.kemitix.thorp.domain.{HashType, MD5Hash} -import net.kemitix.thorp.filesystem.FileSystem -import zio.TaskR - -/** - * Creates one, or more, hashes for local objects. - */ -trait HashService { - - def hashLocalObject(path: Path): TaskR[FileSystem, Map[HashType, MD5Hash]] - -} diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala index 900745d..c1972bf 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala @@ -1,48 +1,43 @@ package net.kemitix.thorp.core +import java.io.File import java.nio.file.Path import net.kemitix.thorp.config.Config import net.kemitix.thorp.core.KeyGenerator.generateKey +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem import zio.{Task, TaskR, ZIO} object LocalFileStream { - def findFiles(hashService: HashService)( + def findFiles( source: Path - ): TaskR[Config with FileSystem, LocalFiles] = { + ): TaskR[Config with FileSystem with Hasher, LocalFiles] = { def recurseIntoSubDirectories( - path: Path): TaskR[Config with FileSystem, LocalFiles] = + path: Path): TaskR[Config with FileSystem with Hasher, LocalFiles] = path.toFile match { case f if f.isDirectory => loop(path) - case _ => pathToLocalFile(hashService)(path) + case _ => localFile(path) } - def recurse( - paths: Stream[Path]): TaskR[Config with FileSystem, LocalFiles] = + def recurse(paths: Stream[Path]) + : TaskR[Config with FileSystem with Hasher, LocalFiles] = for { recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path)) } yield LocalFiles.reduce(recursed.toStream) - def loop(path: Path): TaskR[Config with FileSystem, LocalFiles] = { - - for { - paths <- dirPaths(path) - localFiles <- recurse(paths) - } yield localFiles - } + def loop( + path: Path): TaskR[Config with FileSystem with Hasher, LocalFiles] = + dirPaths(path) >>= recurse loop(source) } private def dirPaths(path: Path) = - for { - paths <- listFiles(path) - filtered <- includedDirPaths(paths) - } yield filtered + listFiles(path) >>= includedDirPaths private def includedDirPaths(paths: Stream[Path]) = for { @@ -53,12 +48,12 @@ object LocalFileStream { .filter({ case (_, included) => included }) .map({ case (path, _) => path }) - private def localFile(hashService: HashService)(path: Path) = { + private def localFile(path: Path) = { val file = path.toFile for { sources <- Config.sources prefix <- Config.prefix - hash <- hashService.hashLocalObject(path) + hash <- Hasher.hashObject(path) localFile = LocalFile(file, sources.forPath(path).toFile, hash, @@ -72,16 +67,17 @@ object LocalFileStream { private def listFiles(path: Path) = for { files <- Task(path.toFile.listFiles) - _ <- Task.when(files == null)( - Task.fail(new IllegalArgumentException(s"Directory not found $path"))) + _ <- filesMustExist(path, files) } yield Stream(files: _*).map(_.toPath) + private def filesMustExist(path: Path, files: Array[File]) = { + Task.when(files == null)( + Task.fail(new IllegalArgumentException(s"Directory not found $path"))) + } + private def isIncluded(path: Path) = for { filters <- Config.filters - } yield Filter.isIncluded(filters)(path) - - private def pathToLocalFile(hashService: HashService)(path: Path) = - localFile(hashService)(path) + } yield Filter.isIncluded(path)(filters) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala index d613fa1..e04d54c 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala @@ -3,30 +3,36 @@ package net.kemitix.thorp.core import net.kemitix.thorp.config.Config import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action._ +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem import net.kemitix.thorp.storage.api.Storage import zio.{TaskR, ZIO} -trait PlanBuilder { +object PlanBuilder { - def createPlan(hashService: HashService) - : TaskR[Storage with Console with Config with FileSystem, SyncPlan] = - for { - _ <- SyncLogging.logRunStart - actions <- buildPlan(hashService) - } yield actions + def createPlan + : TaskR[Storage with Console with Config with FileSystem with Hasher, + SyncPlan] = + SyncLogging.logRunStart *> buildPlan - private def buildPlan(hashService: HashService) = + private def buildPlan = + gatherMetadata >>= assemblePlan + + private def gatherMetadata = + fetchRemoteData &&& findLocalFiles + + private def fetchRemoteData = for { - metadata <- gatherMetadata(hashService) - plan <- assemblePlan(metadata) - } yield plan + bucket <- Config.bucket + prefix <- Config.prefix + objects <- Storage.list(bucket, prefix) + } yield objects private def assemblePlan(metadata: (S3ObjectsData, LocalFiles)) = metadata match { case (remoteData, localData) => - createActions(remoteData, localData) + createActions(remoteData, localData.localFiles) .map(_.filter(doesSomething).sortBy(SequencePlan.order)) .map( SyncPlan(_, SyncTotals(localData.count, localData.totalSizeBytes))) @@ -34,11 +40,11 @@ trait PlanBuilder { private def createActions( remoteData: S3ObjectsData, - localData: LocalFiles + localFiles: Stream[LocalFile] ) = for { - fileActions <- actionsForLocalFiles(remoteData, localData) - remoteActions <- actionsForRemoteKeys(remoteData) + fileActions <- actionsForLocalFiles(remoteData, localFiles) + remoteActions <- actionsForRemoteKeys(remoteData.byKey.keys) } yield fileActions ++ remoteActions private def doesSomething: Action => Boolean = { @@ -48,12 +54,10 @@ trait PlanBuilder { private def actionsForLocalFiles( remoteData: S3ObjectsData, - localData: LocalFiles + localFiles: Stream[LocalFile] ) = - ZIO.foldLeft(localData.localFiles)(Stream.empty[Action])( - (acc, localFile) => - createActionFromLocalFile(remoteData, acc, localFile) - .map(actions => actions ++ acc)) + ZIO.foldLeft(localFiles)(Stream.empty[Action])((acc, localFile) => + createActionFromLocalFile(remoteData, acc, localFile).map(_ ++ acc)) private def createActionFromLocalFile( remoteData: S3ObjectsData, @@ -64,11 +68,9 @@ trait PlanBuilder { S3MetaDataEnricher.getMetadata(localFile, remoteData), previousActions) - private def actionsForRemoteKeys(remoteData: S3ObjectsData) = - ZIO.foldLeft(remoteData.byKey.keys)(Stream.empty[Action]) { - (acc, remoteKey) => - createActionFromRemoteKey(remoteKey).map(action => action #:: acc) - } + private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) = + ZIO.foldLeft(remoteKeys)(Stream.empty[Action])((acc, remoteKey) => + createActionFromRemoteKey(remoteKey).map(_ #:: acc)) private def createActionFromRemoteKey(remoteKey: RemoteKey) = for { @@ -80,33 +82,13 @@ trait PlanBuilder { if (needsDeleted) ToDelete(bucket, remoteKey, 0L) else DoNothing(bucket, remoteKey, 0L) - private def gatherMetadata(hashService: HashService) = - for { - remoteData <- fetchRemoteData - localData <- findLocalFiles(hashService) - } yield (remoteData, localData) + private def findLocalFiles = + SyncLogging.logFileScan *> findFiles - private def fetchRemoteData = - for { - bucket <- Config.bucket - prefix <- Config.prefix - objects <- Storage.list(bucket, prefix) - } yield objects - - private def findLocalFiles(hashService: HashService) = - for { - _ <- SyncLogging.logFileScan - localFiles <- findFiles(hashService) - } yield localFiles - - private def findFiles(hashService: HashService) = + private def findFiles = for { sources <- Config.sources - paths = sources.paths - found <- ZIO.foreach(paths)(path => - LocalFileStream.findFiles(hashService)(path)) + found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles) } yield LocalFiles.reduce(found.toStream) } - -object PlanBuilder extends PlanBuilder diff --git a/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala b/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala deleted file mode 100644 index 81de4c5..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala +++ /dev/null @@ -1,19 +0,0 @@ -package net.kemitix.thorp.core - -import java.nio.file.Path - -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain.{HashType, MD5Hash} -import net.kemitix.thorp.filesystem.FileSystem -import zio.TaskR - -case class SimpleHashService() extends HashService { - - override def hashLocalObject( - path: Path - ): TaskR[FileSystem, Map[HashType, MD5Hash]] = - for { - md5 <- MD5HashGenerator.md5File(path) - } yield Map(MD5 -> md5) - -} diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index bad5433..11c7631 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -33,13 +33,11 @@ trait SyncLogging { actions: Stream[StorageQueueEvent] ): ZIO[Console, Nothing, Unit] = { val counters = actions.foldLeft(Counters())(countActivities) - for { - _ <- Console.putStrLn(eraseToEndOfScreen) - _ <- Console.putStrLn(s"Uploaded ${counters.uploaded} files") - _ <- Console.putStrLn(s"Copied ${counters.copied} files") - _ <- Console.putStrLn(s"Deleted ${counters.deleted} files") - _ <- Console.putStrLn(s"Errors ${counters.errors}") - } yield () + Console.putStrLn(eraseToEndOfScreen) *> + Console.putStrLn(s"Uploaded ${counters.uploaded} files") *> + Console.putStrLn(s"Copied ${counters.copied} files") *> + Console.putStrLn(s"Deleted ${counters.deleted} files") *> + Console.putStrLn(s"Errors ${counters.errors}") } private def countActivities: (Counters, StorageQueueEvent) => Counters = diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala index faaaecf..f2adf35 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -27,7 +27,7 @@ trait ThorpArchive { def logEvent( event: StorageQueueEvent - ): TaskR[Console with Config, Unit] = + ): TaskR[Console with Config, StorageQueueEvent] = event match { case UploadQueueEvent(remoteKey, _) => for { @@ -37,7 +37,7 @@ trait ThorpArchive { _ <- TaskR.when(!batchMode)( Console.putStrLn( s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen")) - } yield () + } yield event case CopyQueueEvent(sourceKey, targetKey) => for { batchMode <- Config.batchMode @@ -47,7 +47,7 @@ trait ThorpArchive { Console.putStrLn( s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen") ) - } yield () + } yield event case DeleteQueueEvent(remoteKey) => for { batchMode <- Config.batchMode @@ -55,7 +55,7 @@ trait ThorpArchive { _ <- TaskR.when(!batchMode)( Console.putStrLn( s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen")) - } yield () + } yield event case ErrorQueueEvent(action, _, e) => for { batchMode <- Config.batchMode @@ -64,9 +64,9 @@ trait ThorpArchive { s"${action.name} failed: ${action.keys}: ${e.getMessage}")) _ <- TaskR.when(!batchMode)(Console.putStrLn( s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen")) - } yield () - case DoNothingQueueEvent(_) => TaskR(()) - case ShutdownQueueEvent() => TaskR(()) + } yield event + case DoNothingQueueEvent(_) => TaskR(event) + case ShutdownQueueEvent() => TaskR(event) } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index ab9f7ae..9fc065b 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -18,20 +18,11 @@ case class UnversionedMirrorArchive(syncTotals: SyncTotals) ): TaskR[Storage with Console with Config, StorageQueueEvent] = action match { case ToUpload(bucket, localFile, _) => - for { - event <- doUpload(index, totalBytesSoFar, bucket, localFile) - _ <- logEvent(event) - } yield event + doUpload(index, totalBytesSoFar, bucket, localFile) >>= logEvent case ToCopy(bucket, sourceKey, hash, targetKey, _) => - for { - event <- Storage.copy(bucket, sourceKey, hash, targetKey) - _ <- logEvent(event) - } yield event + Storage.copy(bucket, sourceKey, hash, targetKey) >>= logEvent case ToDelete(bucket, remoteKey, _) => - for { - event <- Storage.delete(bucket, remoteKey) - _ <- logEvent(event) - } yield event + Storage.delete(bucket, remoteKey) >>= logEvent case DoNothing(_, remoteKey, _) => Task(DoNothingQueueEvent(remoteKey)) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala b/core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala new file mode 100644 index 0000000..476b328 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala @@ -0,0 +1,96 @@ +package net.kemitix.thorp.core.hasher + +import java.nio.file.Path +import java.util.concurrent.atomic.AtomicReference + +import net.kemitix.thorp.domain.HashType.MD5 +import net.kemitix.thorp.domain.{HashType, MD5Hash} +import net.kemitix.thorp.filesystem.FileSystem +import zio.{TaskR, ZIO} + +/** + * Creates one, or more, hashes for local objects. + */ +trait Hasher { + val hasher: Hasher.Service +} +object Hasher { + trait Service { + def hashObject( + path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] + def hashObjectChunk( + path: Path, + chunkNumber: Long, + chunkSize: Long): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] + def hex(in: Array[Byte]): TaskR[Hasher, String] + def digest(in: String): TaskR[Hasher, Array[Byte]] + } + trait Live extends Hasher { + val hasher: Service = new Service { + override def hashObject( + path: Path): TaskR[FileSystem, Map[HashType, MD5Hash]] = + for { + md5 <- MD5HashGenerator.md5File(path) + } yield Map(MD5 -> md5) + + override def hashObjectChunk(path: Path, + chunkNumber: Long, + chunkSize: Long) + : TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + for { + md5 <- MD5HashGenerator.md5FileChunk(path, + chunkNumber * chunkSize, + chunkSize) + } yield Map(MD5 -> md5) + + override def hex(in: Array[Byte]): TaskR[Hasher, String] = + ZIO(MD5HashGenerator.hex(in)) + + override def digest(in: String): TaskR[Hasher, Array[Byte]] = + ZIO(MD5HashGenerator.digest(in)) + } + } + object Live extends Live + + trait Test extends Hasher { + val hashes: AtomicReference[Map[Path, Map[HashType, MD5Hash]]] = + new AtomicReference(Map.empty) + val hashChunks + : AtomicReference[Map[Path, Map[Long, Map[HashType, MD5Hash]]]] = + new AtomicReference(Map.empty) + val hasher: Service = new Service { + override def hashObject( + path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + ZIO(hashes.get()(path)) + + override def hashObjectChunk(path: Path, + chunkNumber: Long, + chunkSize: Long) + : TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + ZIO(hashChunks.get()(path)(chunkNumber)) + + override def hex(in: Array[Byte]): TaskR[Hasher, String] = + ZIO(MD5HashGenerator.hex(in)) + + override def digest(in: String): TaskR[Hasher, Array[Byte]] = + ZIO(MD5HashGenerator.digest(in)) + } + } + object Test extends Test + + final def hashObject( + path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + ZIO.accessM(_.hasher hashObject path) + + final def hashObjectChunk( + path: Path, + chunkNumber: Long, + chunkSize: Long): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + ZIO.accessM(_.hasher hashObjectChunk (path, chunkNumber, chunkSize)) + + final def hex(in: Array[Byte]): TaskR[Hasher, String] = + ZIO.accessM(_.hasher hex in) + + final def digest(in: String): TaskR[Hasher, Array[Byte]] = + ZIO.accessM(_.hasher digest in) +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/hasher/MD5HashGenerator.scala similarity index 88% rename from core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala rename to core/src/main/scala/net/kemitix/thorp/core/hasher/MD5HashGenerator.scala index 822d10c..140168a 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/hasher/MD5HashGenerator.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.core.hasher import java.io.{File, FileInputStream} import java.nio.file.Path @@ -10,7 +10,7 @@ import zio.{Task, TaskR} import scala.collection.immutable.NumericRange -object MD5HashGenerator { +private object MD5HashGenerator { val maxBufferSize = 8048 val defaultBuffer = new Array[Byte](maxBufferSize) @@ -48,13 +48,11 @@ object MD5HashGenerator { offset: Long, endOffset: Long ) = - FileSystem - .open(file, offset) - .flatMap { managedFileInputStream => - managedFileInputStream.use { fileInputStream => - digestFile(fileInputStream, offset, endOffset) - } + FileSystem.open(file, offset) >>= { managedFileInputStream => + managedFileInputStream.use { fileInputStream => + digestFile(fileInputStream, offset, endOffset) } + } private def digestFile( fis: FileInputStream, diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala deleted file mode 100644 index 8a1a9c1..0000000 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala +++ /dev/null @@ -1,14 +0,0 @@ -package net.kemitix.thorp.core - -import java.nio.file.Path - -import net.kemitix.thorp.domain.{HashType, MD5Hash} -import zio.Task - -case class DummyHashService(hashes: Map[Path, Map[HashType, MD5Hash]]) - extends HashService { - - override def hashLocalObject(path: Path): Task[Map[HashType, MD5Hash]] = - Task(hashes(path)) - -} diff --git a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala index a92d419..17b5755 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala @@ -10,6 +10,7 @@ import net.kemitix.thorp.config.{ Resource } import net.kemitix.thorp.console._ +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem @@ -21,30 +22,17 @@ class LocalFileStreamSuite extends FunSpec { private val source = Resource(this, "upload") private val sourcePath = source.toPath - private val hashService: HashService = DummyHashService( - Map( - file("root-file") -> Map(MD5 -> MD5HashData.Root.hash), - file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash) - )) private def file(filename: String) = sourcePath.resolve(Paths.get(filename)) - private val configOptions = ConfigOptions( - List( - ConfigOption.IgnoreGlobalOptions, - ConfigOption.IgnoreUserOptions, - ConfigOption.Source(sourcePath), - ConfigOption.Bucket("aBucket") - )) - describe("findFiles") { it("should find all files") { val expected = Right(Set("subdir/leaf-file", "root-file")) val result = invoke() .map(_.localFiles) - .map(localFiles => localFiles.map(_.relative.toString)) + .map(_.map(_.relative.toString)) .map(_.toSet) assertResult(expected)(result) } @@ -61,9 +49,13 @@ class LocalFileStreamSuite extends FunSpec { } private def invoke() = { - type TestEnv = Storage with Console with Config with FileSystem + type TestEnv = Storage + with Console + with Config + with FileSystem + with Hasher.Test val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live - with FileSystem.Live { + with FileSystem.Live with Hasher.Test { override def listResult: Task[S3ObjectsData] = Task.die(new NotImplementedError) override def uploadResult: UIO[StorageQueueEvent] = @@ -75,12 +67,23 @@ class LocalFileStreamSuite extends FunSpec { override def shutdownResult: UIO[StorageQueueEvent] = Task.die(new NotImplementedError) } - + testEnv.hashes.set( + Map( + file("root-file") -> Map(MD5 -> MD5HashData.Root.hash), + file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash) + )) + val configOptions = ConfigOptions( + List( + ConfigOption.IgnoreGlobalOptions, + ConfigOption.IgnoreUserOptions, + ConfigOption.Source(sourcePath), + ConfigOption.Bucket("aBucket") + )) def testProgram = for { config <- ConfigurationBuilder.buildConfig(configOptions) _ <- Config.set(config) - files <- LocalFileStream.findFiles(hashService)(sourcePath) + files <- LocalFileStream.findFiles(sourcePath) } yield files new DefaultRuntime {}.unsafeRunSync { diff --git a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala deleted file mode 100644 index 9f0853f..0000000 --- a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala +++ /dev/null @@ -1,83 +0,0 @@ -package net.kemitix.thorp.core - -import java.nio.file.Paths - -import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, ParseConfigLines} -import org.scalatest.FunSpec - -class ParseConfigLinesTest extends FunSpec { - - describe("parse single lines") { - describe("source") { - it("should parse") { - val expected = - ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source")))) - val result = - ParseConfigLines.parseLines(List("source = /path/to/source")) - assertResult(expected)(result) - } - } - describe("bucket") { - it("should parse") { - val expected = ConfigOptions(List(ConfigOption.Bucket("bucket-name"))) - val result = ParseConfigLines.parseLines(List("bucket = bucket-name")) - assertResult(expected)(result) - } - } - describe("prefix") { - it("should parse") { - val expected = - ConfigOptions(List(ConfigOption.Prefix("prefix/to/files"))) - val result = - ParseConfigLines.parseLines(List("prefix = prefix/to/files")) - assertResult(expected)(result) - } - } - describe("include") { - it("should parse") { - val expected = - ConfigOptions(List(ConfigOption.Include("path/to/include"))) - val result = - ParseConfigLines.parseLines(List("include = path/to/include")) - assertResult(expected)(result) - } - } - describe("exclude") { - it("should parse") { - val expected = - ConfigOptions(List(ConfigOption.Exclude("path/to/exclude"))) - val result = - ParseConfigLines.parseLines(List("exclude = path/to/exclude")) - assertResult(expected)(result) - } - } - describe("debug - true") { - it("should parse") { - val expected = ConfigOptions(List(ConfigOption.Debug())) - val result = ParseConfigLines.parseLines(List("debug = true")) - assertResult(expected)(result) - } - } - describe("debug - false") { - it("should parse") { - val expected = ConfigOptions() - val result = ParseConfigLines.parseLines(List("debug = false")) - assertResult(expected)(result) - } - } - describe("comment line") { - it("should be ignored") { - val expected = ConfigOptions() - val result = ParseConfigLines.parseLines(List("# ignore me")) - assertResult(expected)(result) - } - } - describe("unrecognised option") { - it("should be ignored") { - val expected = ConfigOptions() - val result = ParseConfigLines.parseLines(List("unsupported = option")) - assertResult(expected)(result) - } - } - } -} diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala index 650e855..c5fe30c 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala @@ -11,6 +11,7 @@ import net.kemitix.thorp.config.{ } import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem._ @@ -25,8 +26,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { "create a plan" - { - val hashService = SimpleHashService() - "one source" - { val options: Path => ConfigOptions = source => @@ -46,8 +45,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val expected = Right(List(toUpload(remoteKey, hash, source, file))) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(emptyS3ObjectData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -70,8 +68,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map(anOtherKey -> HashModified(aHash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(aFile.toPath -> aFile, anOtherFile.toPath -> anOtherFile))) @@ -94,8 +91,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map(remoteKey -> HashModified(hash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -118,8 +114,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { Map(remoteKey -> HashModified(originalHash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -139,8 +134,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map() ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -165,8 +159,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map(remoteKey -> HashModified(hash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -183,8 +176,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map(remoteKey -> HashModified(hash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map.empty)) assertResult(expected)(result) @@ -222,7 +214,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { )) val result = invoke( - hashService, options(firstSource)(secondSource), UIO.succeed(emptyS3ObjectData), UIO.succeed( @@ -248,7 +239,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { toUpload(remoteKey1, hash1, firstSource, fileInFirstSource))) val result = invoke( - hashService, options(firstSource)(secondSource), UIO.succeed(emptyS3ObjectData), UIO.succeed( @@ -273,8 +263,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))), byKey = Map(remoteKey2 -> HashModified(hash2, lastModified))) val result = - invoke(hashService, - options(firstSource)(secondSource), + invoke(options(firstSource)(secondSource), UIO.succeed(s3ObjectData), UIO.succeed( Map(fileInSecondSource.toPath -> fileInSecondSource))) @@ -296,8 +285,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))), byKey = Map(remoteKey1 -> HashModified(hash1, lastModified))) val result = - invoke(hashService, - options(firstSource)(secondSource), + invoke(options(firstSource)(secondSource), UIO.succeed(s3ObjectData), UIO.succeed( Map(fileInFirstSource.toPath -> fileInFirstSource))) @@ -314,8 +302,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val s3ObjectData = S3ObjectsData(byKey = Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified))) val result = - invoke(hashService, - options(firstSource)(secondSource), + invoke(options(firstSource)(secondSource), UIO.succeed(s3ObjectData), UIO.succeed(Map.empty)) assertResult(expected)(result) @@ -326,12 +313,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { } def md5Hash(file: File): MD5Hash = { + object TestEnv extends Hasher.Live with FileSystem.Live new DefaultRuntime {} .unsafeRunSync { - hashService - .hashLocalObject(file.toPath) + Hasher + .hashObject(file.toPath) .map(_.get(MD5)) - .provide(FileSystem.Live) + .provide(TestEnv) } .toEither .toOption @@ -361,14 +349,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { ConfigOptions(List(configOptions: _*)) private def invoke( - hashService: HashService, configOptions: ConfigOptions, result: Task[S3ObjectsData], files: Task[Map[Path, File]] ) = { - type TestEnv = Storage with Console with Config with FileSystem + type TestEnv = Storage with Console with Config with FileSystem with Hasher val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live - with FileSystem.Live { + with FileSystem.Live with Hasher.Live { override def listResult: Task[S3ObjectsData] = result override def uploadResult: UIO[StorageQueueEvent] = Task.die(new NotImplementedError) @@ -384,7 +371,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { for { config <- ConfigurationBuilder.buildConfig(configOptions) _ <- Config.set(config) - plan <- PlanBuilder.createPlan(hashService) + plan <- PlanBuilder.createPlan } yield plan new DefaultRuntime {} diff --git a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/thorp/core/hasher/MD5HashGeneratorTest.scala similarity index 90% rename from core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala rename to core/src/test/scala/net/kemitix/thorp/core/hasher/MD5HashGeneratorTest.scala index 050a616..7237e4f 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/hasher/MD5HashGeneratorTest.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.core.hasher import java.nio.file.Path @@ -12,7 +12,7 @@ class MD5HashGeneratorTest extends FunSpec { describe("md5File()") { describe("read a small file (smaller than buffer)") { - val path = Resource(this, "upload/root-file").toPath + val path = Resource(this, "../upload/root-file").toPath it("should generate the correct hash") { val expected = Right(Root.hash) val result = invoke(path) @@ -21,7 +21,7 @@ class MD5HashGeneratorTest extends FunSpec { } describe("read a large file (bigger than buffer)") { - val path = Resource(this, "big-file").toPath + val path = Resource(this, "../big-file").toPath it("should generate the correct hash") { val expected = Right(BigFile.hash) val result = invoke(path) @@ -39,7 +39,7 @@ class MD5HashGeneratorTest extends FunSpec { describe("md5FileChunk") { describe("read chunks of file") { - val path = Resource(this, "big-file").toPath + val path = Resource(this, "../big-file").toPath it("should generate the correct hash for first chunk of the file") { val part1 = BigFile.Part1 val expected = Right(part1.hash.hash) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Filter.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Filter.scala index d8b8a8e..3a730b8 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Filter.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Filter.scala @@ -7,7 +7,7 @@ sealed trait Filter object Filter { - def isIncluded(filters: List[Filter])(p: Path): Boolean = { + def isIncluded(p: Path)(filters: List[Filter]): Boolean = { sealed trait State case class Unknown() extends State case class Accepted() extends State diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala new file mode 100644 index 0000000..e43f060 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala @@ -0,0 +1,6 @@ +package net.kemitix.thorp.domain + +trait Monoid[T] { + def zero: T + def op(t1: T, t2: T): T +} diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala index fbdb4e9..3bb13ef 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala @@ -15,12 +15,9 @@ import java.nio.file.Path case class Sources( paths: List[Path] ) { - def ++(path: Path): Sources = this ++ List(path) - def ++(otherPaths: List[Path]): Sources = - Sources(otherPaths.foldLeft(paths) { (acc, path) => - if (acc.contains(path)) acc - else acc ++ List(path) - }) + def +(path: Path)(implicit m: Monoid[Sources]): Sources = this ++ List(path) + def ++(otherPaths: List[Path])(implicit m: Monoid[Sources]): Sources = + m.op(this, Sources(otherPaths)) /** * Returns the source path for the given path. @@ -28,3 +25,17 @@ case class Sources( def forPath(path: Path): Path = paths.find(source => path.startsWith(source)).get } + +object Sources { + + final val emptySources = Sources(List.empty) + + implicit def sourcesAppendMonoid: Monoid[Sources] = new Monoid[Sources] { + override def zero: Sources = emptySources + override def op(t1: Sources, t2: Sources): Sources = + Sources(t2.paths.foldLeft(t1.paths) { (acc, path) => + if (acc.contains(path)) acc + else acc ++ List(path) + }) + } +} diff --git a/domain/src/test/scala/net/kemitix/thorp/domain/FiltersSuite.scala b/domain/src/test/scala/net/kemitix/thorp/domain/FiltersSuite.scala index 36988a2..3523c2f 100644 --- a/domain/src/test/scala/net/kemitix/thorp/domain/FiltersSuite.scala +++ b/domain/src/test/scala/net/kemitix/thorp/domain/FiltersSuite.scala @@ -86,7 +86,7 @@ class FiltersSuite extends FunSpec { } describe("isIncluded") { def invoke(filters: List[Filter]) = { - paths.filter(path => Filter.isIncluded(filters)(path)) + paths.filter(path => Filter.isIncluded(path)(filters)) } describe("when there are no filters") { diff --git a/core/src/test/scala/net/kemitix/thorp/core/TemporaryFolder.scala b/domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala similarity index 97% rename from core/src/test/scala/net/kemitix/thorp/core/TemporaryFolder.scala rename to domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala index c8c2cbc..4b481df 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/TemporaryFolder.scala +++ b/domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.domain import java.io.{File, IOException, PrintWriter} import java.nio.file.attribute.BasicFileAttributes diff --git a/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala index 552e677..e43bb4b 100644 --- a/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala +++ b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala @@ -18,7 +18,7 @@ object FileSystem { def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean] def openManagedFileInputStream(file: File, offset: Long = 0L) : TaskR[FileSystem, ZManaged[Any, Throwable, FileInputStream]] - def fileLines(file: File): TaskR[FileSystem, List[String]] + def fileLines(file: File): TaskR[FileSystem, Seq[String]] } trait Live extends FileSystem { override val filesystem: Service = new Service { @@ -42,17 +42,11 @@ object FileSystem { ZIO(ZManaged.make(acquire)(release)) } - override def fileLines(file: File): TaskR[FileSystem, List[String]] = { - + override def fileLines(file: File): TaskR[FileSystem, Seq[String]] = { def acquire = ZIO(Files.lines(file.toPath)) - - def release(lines: stream.Stream[String]) = - ZIO.effectTotal(lines.close()) - def use(lines: stream.Stream[String]) = ZIO.effectTotal(lines.iterator.asScala.toList) - - ZIO.bracket(acquire)(release)(use) + acquire.bracketAuto(use) } } } @@ -84,6 +78,6 @@ object FileSystem { : TaskR[FileSystem, ZManaged[FileSystem, Throwable, FileInputStream]] = ZIO.accessM(_.filesystem openManagedFileInputStream (file, offset)) - final def lines(file: File): TaskR[FileSystem, List[String]] = + final def lines(file: File): TaskR[FileSystem, Seq[String]] = ZIO.accessM(_.filesystem fileLines (file)) } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala index b95bb2b..ca0e315 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala @@ -14,48 +14,34 @@ import zio.{IO, Task, UIO} trait Copier { def copy(amazonS3: AmazonS3.Client)( + request: Request): UIO[StorageQueueEvent] = + copyObject(amazonS3)(request) + .fold(foldFailure(request.sourceKey, request.targetKey), + foldSuccess(request.sourceKey, request.targetKey)) + + case class Request( bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ): UIO[StorageQueueEvent] = - copyObject(amazonS3)(bucket, sourceKey, hash, targetKey) - .fold(foldFailure(sourceKey, targetKey), - foldSuccess(sourceKey, targetKey)) + ) - private def copyObject(amazonS3: AmazonS3.Client)( - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey - ): IO[S3ClientException, CopyObjectResult] = { - - def handleResult - : Option[CopyObjectResult] => IO[S3ClientException, CopyObjectResult] = - maybeResult => - IO.fromEither { - maybeResult - .toRight(HashError) - } - - def handleError: Throwable => IO[S3ClientException, CopyObjectResult] = - error => - Task.fail { - CopyError(error) - } - - val request = - new CopyObjectRequest( - bucket.name, - sourceKey.key, - bucket.name, - targetKey.key - ).withMatchingETagConstraint(hash.hash) + private def copyObject(amazonS3: AmazonS3.Client)(request: Request) = amazonS3 - .copyObject(request) - .fold(handleError, handleResult) + .copyObject(copyObjectRequest(request)) + .fold( + error => Task.fail(CopyError(error)), + result => IO.fromEither(result.toRight(HashError)) + ) .flatten - } + + private def copyObjectRequest(copyRequest: Request) = + new CopyObjectRequest( + copyRequest.bucket.name, + copyRequest.sourceKey.key, + copyRequest.bucket.name, + copyRequest.targetKey.key + ).withMatchingETagConstraint(copyRequest.hash.hash) private def foldFailure( sourceKey: RemoteKey, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala index 969c860..6bca9df 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala @@ -32,11 +32,7 @@ trait Lister { token => request.withContinuationToken(token) def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] = - request => - for { - _ <- ListerLogger.logFetchBatch - batch <- tryFetchBatch(amazonS3)(request) - } yield batch + request => ListerLogger.logFetchBatch *> tryFetchBatch(amazonS3)(request) def fetchMore: Option[Token] => TaskR[Console, Stream[S3ObjectSummary]] = { case None => TaskR.succeed(Stream.empty) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala deleted file mode 100644 index 1a4184b..0000000 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala +++ /dev/null @@ -1,35 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import java.nio.file.Path - -import net.kemitix.thorp.core.{HashService, MD5HashGenerator} -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain.{HashType, MD5Hash} -import net.kemitix.thorp.filesystem.FileSystem -import zio.TaskR - -trait S3HashService extends HashService { - - /** - * Generates an MD5 Hash and an multi-part ETag - * - * @param path the local path to scan - * @return a set of hash values - */ - override def hashLocalObject( - path: Path - ): TaskR[FileSystem, Map[HashType, MD5Hash]] = - for { - md5 <- MD5HashGenerator.md5File(path) - etag <- ETagGenerator.eTag(path).map(MD5Hash(_)) - } yield - Map( - MD5 -> md5, - ETag -> etag - ) - -} - -object S3HashService extends S3HashService { - lazy val defaultHashService: HashService = S3HashService -} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala index b89b4ef..acedb1f 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala @@ -38,7 +38,7 @@ object S3Storage { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey): UIO[StorageQueueEvent] = - Copier.copy(client)(bucket, sourceKey, hash, targetKey) + Copier.copy(client)(Copier.Request(bucket, sourceKey, hash, targetKey)) override def delete(bucket: Bucket, remoteKey: RemoteKey): UIO[StorageQueueEvent] = diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index 6bd1589..e3ad3db 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -28,22 +28,17 @@ trait Uploader { .catchAll(handleError(localFile.remoteKey)) private def handleError(remoteKey: RemoteKey)(e: Throwable) = - UIO.succeed(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e)) + UIO(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e)) private def transfer(transferManager: => AmazonTransferManager)( localFile: LocalFile, bucket: Bucket, uploadEventListener: UploadEventListener - ) = { - val listener = progressListener(uploadEventListener) - for { - putObjectRequest <- request(localFile, bucket, listener) - event <- dispatch(transferManager, putObjectRequest) - } yield event - } + ) = + request(localFile, bucket, progressListener(uploadEventListener)) >>= + dispatch(transferManager) - private def dispatch( - transferManager: AmazonTransferManager, + private def dispatch(transferManager: AmazonTransferManager)( putObjectRequest: PutObjectRequest ) = { transferManager diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala similarity index 73% rename from storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala index ff111b4..ada93e4 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala @@ -1,29 +1,31 @@ -package net.kemitix.thorp.storage.aws +package net.kemitix.thorp.storage.aws.hasher import java.nio.file.Path import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils -import net.kemitix.thorp.core.MD5HashGenerator -import net.kemitix.thorp.domain.MD5Hash +import net.kemitix.thorp.core.hasher.Hasher +import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.filesystem.FileSystem import zio.{TaskR, ZIO} -trait ETagGenerator { +private trait ETagGenerator { def eTag( path: Path - ): TaskR[FileSystem, String] = { + ): TaskR[Hasher with FileSystem, String] = { val partSize = calculatePartSize(path) val parts = numParts(path.toFile.length, partSize) - ZIO - .foreach(partsIndex(parts))(digestChunk(path, partSize)) - .map(concatenateDigests) - .map(MD5HashGenerator.hex) + eTagHex(path, partSize, parts) .map(hash => s"$hash-$parts") } + private def eTagHex(path: Path, partSize: Long, parts: Long) = + ZIO + .foreach(partsIndex(parts))(digestChunk(path, partSize)) + .map(concatenateDigests) >>= Hasher.hex + private def partsIndex(parts: Long) = Range.Long(0, parts, 1).toList @@ -51,14 +53,10 @@ trait ETagGenerator { path: Path, chunkSize: Long )(chunkNumber: Long) = - hashChunk(path, chunkNumber, chunkSize).map(_.digest) - - def hashChunk( - path: Path, - chunkNumber: Long, - chunkSize: Long - ): TaskR[FileSystem, MD5Hash] = - MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize) + Hasher + .hashObjectChunk(path, chunkNumber, chunkSize) + .map(_(MD5)) + .map(_.digest) def offsets( totalFileSizeBytes: Long, @@ -67,4 +65,4 @@ trait ETagGenerator { Range.Long(0, totalFileSizeBytes, optimalPartSize).toList } -object ETagGenerator extends ETagGenerator +private object ETagGenerator extends ETagGenerator diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala new file mode 100644 index 0000000..7cca3fe --- /dev/null +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala @@ -0,0 +1,45 @@ +package net.kemitix.thorp.storage.aws.hasher + +import java.nio.file.Path + +import net.kemitix.thorp.core.hasher.Hasher +import net.kemitix.thorp.core.hasher.Hasher.Live.{hasher => CoreHasher} +import net.kemitix.thorp.core.hasher.Hasher.Service +import net.kemitix.thorp.domain.{HashType, MD5Hash} +import net.kemitix.thorp.filesystem.FileSystem +import net.kemitix.thorp.storage.aws.ETag +import zio.TaskR + +object S3Hasher { + + trait Live extends Hasher { + val hasher: Service = new Service { + + /** + * Generates an MD5 Hash and an multi-part ETag + * + * @param path the local path to scan + * @return a set of hash values + */ + override def hashObject( + path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + for { + base <- CoreHasher.hashObject(path) + etag <- ETagGenerator.eTag(path).map(MD5Hash(_)) + } yield base + (ETag -> etag) + + override def hashObjectChunk(path: Path, + chunkNumber: Long, + chunkSize: Long) + : TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + CoreHasher.hashObjectChunk(path, chunkNumber, chunkSize) + + override def hex(in: Array[Byte]): TaskR[Hasher, String] = + CoreHasher.hex(in) + + override def digest(in: String): TaskR[Hasher, Array[Byte]] = + CoreHasher.digest(in) + } + + } +} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala index 1fca764..2be78df 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala @@ -42,7 +42,8 @@ trait AmazonS3ClientTestFixture extends MockFactory { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey): UIO[StorageQueueEvent] = - Copier.copy(client)(bucket, sourceKey, hash, targetKey) + Copier.copy(client)( + Copier.Request(bucket, sourceKey, hash, targetKey)) override def delete(bucket: Bucket, remoteKey: RemoteKey): UIO[StorageQueueEvent] = diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala index 463c2b8..b428b74 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala @@ -88,7 +88,8 @@ class CopierTest extends FreeSpec { amazonS3Client: AmazonS3.Client ) = runtime.unsafeRunSync { - Copier.copy(amazonS3Client)(bucket, sourceKey, hash, targetKey) + Copier.copy(amazonS3Client)( + Copier.Request(bucket, sourceKey, hash, targetKey)) }.toEither } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala similarity index 77% rename from storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala rename to storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala index 9f26e6f..931943a 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala @@ -1,18 +1,20 @@ -package net.kemitix.thorp.storage.aws +package net.kemitix.thorp.storage.aws.hasher import java.nio.file.Path import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import net.kemitix.thorp.config.Resource +import net.kemitix.thorp.core.hasher.Hasher +import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.filesystem.FileSystem import org.scalatest.FunSpec import zio.DefaultRuntime class ETagGeneratorTest extends FunSpec { - private val runtime = new DefaultRuntime {} + object TestEnv extends Hasher.Live with FileSystem.Live - private val bigFile = Resource(this, "big-file") + private val bigFile = Resource(this, "../big-file") private val bigFilePath = bigFile.toPath private val configuration = new TransferManagerConfiguration private val chunkSize = 1200000 @@ -41,14 +43,16 @@ class ETagGeneratorTest extends FunSpec { md5Hashes.foreach { case (hash, index) => assertResult(Right(hash))( - invoke(bigFilePath, index, chunkSize).map(_.hash)) + invoke(bigFilePath, index, chunkSize) + .map(_(MD5)) + .map(_.hash)) } } def invoke(path: Path, index: Long, size: Long) = new DefaultRuntime {}.unsafeRunSync { - ETagGenerator - .hashChunk(path, index, size) - .provide(FileSystem.Live) + Hasher + .hashObjectChunk(path, index, size) + .provide(TestEnv) }.toEither } @@ -58,12 +62,13 @@ class ETagGeneratorTest extends FunSpec { val result = invoke(bigFilePath) assertResult(Right(expected))(result) } - def invoke(path: Path) = + def invoke(path: Path) = { new DefaultRuntime {}.unsafeRunSync { ETagGenerator .eTag(path) - .provide(FileSystem.Live) + .provide(TestEnv) }.toEither + } } }