diff --git a/build.sbt b/build.sbt index 9483fe8..abf65be 100644 --- a/build.sbt +++ b/build.sbt @@ -122,7 +122,7 @@ lazy val config = (project in file("config")) .settings(testDependencies) .settings(commandLineParsing) .settings(assemblyJarName in assembly := "config.jar") - .dependsOn(domain) + .dependsOn(domain % "compile->compile;test->test") .dependsOn(filesystem) lazy val filesystem = (project in file("filesystem")) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala index 6c7863a..fdfa986 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala @@ -4,6 +4,7 @@ import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console import net.kemitix.thorp.filesystem.FileSystem import net.kemitix.thorp.storage.aws.S3Storage +import net.kemitix.thorp.storage.aws.hasher.S3Hasher import zio.{App, ZIO} object Main extends App { @@ -13,6 +14,7 @@ object Main extends App { with Console.Live with Config.Live with FileSystem.Live + with S3Hasher.Live override def run(args: List[String]): ZIO[Environment, Nothing, Int] = Program diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 4ac6f47..6acd473 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -5,7 +5,6 @@ import net.kemitix.thorp.console._ import net.kemitix.thorp.core.CoreTypes.CoreProgram import net.kemitix.thorp.core._ import net.kemitix.thorp.domain.StorageQueueEvent -import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService import zio.ZIO trait Program { @@ -27,7 +26,7 @@ trait Program { private def execute = { for { - plan <- PlanBuilder.createPlan(defaultHashService) + plan <- PlanBuilder.createPlan archive <- UnversionedMirrorArchive.default(plan.syncTotals) events <- applyPlan(archive, plan) _ <- SyncLogging.logRunFinished(events) @@ -39,7 +38,7 @@ trait Program { _ <- Console.putStrLn("There were errors:") _ <- throwable match { case ConfigValidationException(errors) => - ZIO.foreach(errors)(error => Console.putStrLn(s"- $error")) + ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error")) } } yield () diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala index 04e9b34..f8ba081 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala @@ -14,7 +14,7 @@ object ConfigOption { case class Source(path: Path) extends ConfigOption { override def update(config: Configuration): Configuration = - sources.modify(_ ++ path)(config) + sources.modify(_ + path)(config) } case class Bucket(name: String) extends ConfigOption { diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigValidationException.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigValidationException.scala index b78e35f..927ded2 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigValidationException.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigValidationException.scala @@ -1,5 +1,5 @@ package net.kemitix.thorp.config final case class ConfigValidationException( - errors: List[ConfigValidation] + errors: Seq[ConfigValidation] ) extends Exception diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigValidator.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigValidator.scala index 605c82e..0b5e5bb 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigValidator.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigValidator.scala @@ -22,17 +22,15 @@ sealed trait ConfigValidator { def validateSources( sources: Sources): Either[List[ConfigValidation], Sources] = - (for { - x <- sources.paths.foldLeft(List[ConfigValidation]()) { - (acc: List[ConfigValidation], path) => - { - validateSource(path) match { - case Left(errors) => acc ++ errors - case Right(_) => acc - } + sources.paths.foldLeft(List[ConfigValidation]()) { + (acc: List[ConfigValidation], path) => + { + validateSource(path) match { + case Left(errors) => acc ++ errors + case Right(_) => acc } - } - } yield x) match { + } + } match { case Nil => Right(sources) case errors => Left(errors) } diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigurationBuilder.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigurationBuilder.scala index f226f1f..3b4fd07 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigurationBuilder.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigurationBuilder.scala @@ -17,10 +17,8 @@ trait ConfigurationBuilder { def buildConfig(priorityOpts: ConfigOptions) : ZIO[FileSystem, ConfigValidationException, Configuration] = - (for { - config <- getConfigOptions(priorityOpts).map(collateOptions) - valid <- ConfigValidator.validateConfig(config) - } yield valid) + (getConfigOptions(priorityOpts).map(collateOptions) >>= + ConfigValidator.validateConfig) .catchAll(errors => ZIO.fail(ConfigValidationException(errors))) private def getConfigOptions(priorityOpts: ConfigOptions) = diff --git a/config/src/main/scala/net/kemitix/thorp/config/ParseConfigFile.scala b/config/src/main/scala/net/kemitix/thorp/config/ParseConfigFile.scala index f5bf968..142ee52 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ParseConfigFile.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ParseConfigFile.scala @@ -3,24 +3,25 @@ package net.kemitix.thorp.config import java.nio.file.Path import net.kemitix.thorp.filesystem.FileSystem -import zio.{IO, ZIO} +import zio.{IO, TaskR, ZIO} trait ParseConfigFile { def parseFile( - filename: Path): ZIO[FileSystem, List[ConfigValidation], ConfigOptions] = - readFile(filename) - .map(ParseConfigLines.parseLines) - .catchAll(h => - IO.fail( - List(ConfigValidation.ErrorReadingFile(filename, h.getMessage)))) + filename: Path): ZIO[FileSystem, Seq[ConfigValidation], ConfigOptions] = + (readFile(filename) >>= ParseConfigLines.parseLines) + .catchAll( + h => + IO.fail( + List(ConfigValidation.ErrorReadingFile(filename, h.getMessage)))) private def readFile(filename: Path) = - FileSystem - .exists(filename.toFile) - .flatMap( - if (_) FileSystem.lines(filename.toFile) - else ZIO.succeed(List.empty)) + FileSystem.exists(filename.toFile) >>= readLines(filename) + + private def readLines(filename: Path)( + exists: Boolean): TaskR[FileSystem, Seq[String]] = + if (exists) FileSystem.lines(filename.toFile) + else ZIO.succeed(Seq.empty) } diff --git a/config/src/main/scala/net/kemitix/thorp/config/ParseConfigLines.scala b/config/src/main/scala/net/kemitix/thorp/config/ParseConfigLines.scala index c7a1c84..5d35902 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ParseConfigLines.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ParseConfigLines.scala @@ -3,22 +3,16 @@ package net.kemitix.thorp.config import java.nio.file.Paths import java.util.regex.Pattern -import net.kemitix.thorp.config.ConfigOption.{ - Bucket, - Debug, - Exclude, - Include, - Prefix, - Source -} +import net.kemitix.thorp.config.ConfigOption._ +import zio.UIO trait ParseConfigLines { private val pattern = "^\\s*(?\\S*)\\s*=\\s*(?\\S*)\\s*$" private val format = Pattern.compile(pattern) - def parseLines(lines: List[String]): ConfigOptions = - ConfigOptions(lines.flatMap(parseLine)) + def parseLines(lines: Seq[String]): UIO[ConfigOptions] = + UIO(ConfigOptions(lines.flatMap(parseLine).toList)) private def parseLine(str: String) = format.matcher(str) match { @@ -40,7 +34,7 @@ trait ParseConfigLines { case _ => None } - def truthy(value: String): Boolean = + private def truthy(value: String): Boolean = value.toLowerCase match { case "true" => true case "yes" => true diff --git a/config/src/main/scala/net/kemitix/thorp/config/SourceConfigLoader.scala b/config/src/main/scala/net/kemitix/thorp/config/SourceConfigLoader.scala index 4be190a..ae941b1 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/SourceConfigLoader.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/SourceConfigLoader.scala @@ -8,24 +8,16 @@ trait SourceConfigLoader { val thorpConfigFileName = ".thorp.conf" - def loadSourceConfigs - : Sources => ZIO[FileSystem, List[ConfigValidation], ConfigOptions] = - sources => { - - val sourceConfigOptions = - ConfigOptions(sources.paths.map(ConfigOption.Source)) - - val reduce: List[ConfigOptions] => ConfigOptions = - _.foldLeft(sourceConfigOptions) { (acc, co) => + def loadSourceConfigs( + sources: Sources): ZIO[FileSystem, Seq[ConfigValidation], ConfigOptions] = + ZIO + .foreach(sources.paths) { path => + ParseConfigFile.parseFile(path.resolve(thorpConfigFileName)) + } + .map(_.foldLeft(ConfigOptions(sources.paths.map(ConfigOption.Source))) { + (acc, co) => acc ++ co - } - - ZIO - .foreach(sources.paths) { path => - ParseConfigFile.parseFile(path.resolve(thorpConfigFileName)) - } - .map(reduce) - } + }) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ConfigOptionTest.scala similarity index 85% rename from core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala rename to config/src/test/scala/net/kemitix/thorp/config/ConfigOptionTest.scala index cae4b0d..d19eb73 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ConfigOptionTest.scala @@ -1,12 +1,6 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.config -import net.kemitix.thorp.config.{ - ConfigOption, - ConfigOptions, - ConfigQuery, - ConfigurationBuilder -} -import net.kemitix.thorp.domain.Sources +import net.kemitix.thorp.domain.{Sources, TemporaryFolder} import net.kemitix.thorp.filesystem.FileSystem import org.scalatest.FunSpec import zio.DefaultRuntime diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigQueryTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala similarity index 96% rename from core/src/test/scala/net/kemitix/thorp/core/ConfigQueryTest.scala rename to config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala index 8050300..1a83d6d 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ConfigQueryTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala @@ -1,8 +1,7 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.config import java.nio.file.Paths -import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, ConfigQuery} import net.kemitix.thorp.domain.Sources import org.scalatest.FreeSpec diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala similarity index 98% rename from core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala rename to config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala index 70bc753..56100ed 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala @@ -1,12 +1,7 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.config import java.nio.file.{Path, Paths} -import net.kemitix.thorp.config.{ - ConfigOption, - ConfigOptions, - ConfigurationBuilder -} import net.kemitix.thorp.domain.Filter.{Exclude, Include} import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem diff --git a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ParseConfigFileTest.scala similarity index 91% rename from core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala rename to config/src/test/scala/net/kemitix/thorp/config/ParseConfigFileTest.scala index 9b6524c..c3b62a6 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ParseConfigFileTest.scala @@ -1,13 +1,7 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.config import java.nio.file.{Path, Paths} -import net.kemitix.thorp.config.{ - ConfigOption, - ConfigOptions, - ParseConfigFile, - Resource -} import net.kemitix.thorp.filesystem.FileSystem import org.scalatest.FunSpec import zio.DefaultRuntime diff --git a/config/src/test/scala/net/kemitix/thorp/config/ParseConfigLinesTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ParseConfigLinesTest.scala new file mode 100644 index 0000000..8255a54 --- /dev/null +++ b/config/src/test/scala/net/kemitix/thorp/config/ParseConfigLinesTest.scala @@ -0,0 +1,88 @@ +package net.kemitix.thorp.config + +import java.nio.file.Paths + +import org.scalatest.FunSpec +import zio.DefaultRuntime + +class ParseConfigLinesTest extends FunSpec { + + describe("parse single lines") { + describe("source") { + it("should parse") { + val expected = + Right( + ConfigOptions( + List(ConfigOption.Source(Paths.get("/path/to/source"))))) + val result = invoke(List("source = /path/to/source")) + assertResult(expected)(result) + } + } + describe("bucket") { + it("should parse") { + val expected = + Right(ConfigOptions(List(ConfigOption.Bucket("bucket-name")))) + val result = invoke(List("bucket = bucket-name")) + assertResult(expected)(result) + } + } + describe("prefix") { + it("should parse") { + val expected = + Right(ConfigOptions(List(ConfigOption.Prefix("prefix/to/files")))) + val result = invoke(List("prefix = prefix/to/files")) + assertResult(expected)(result) + } + } + describe("include") { + it("should parse") { + val expected = + Right(ConfigOptions(List(ConfigOption.Include("path/to/include")))) + val result = invoke(List("include = path/to/include")) + assertResult(expected)(result) + } + } + describe("exclude") { + it("should parse") { + val expected = + Right(ConfigOptions(List(ConfigOption.Exclude("path/to/exclude")))) + val result = invoke(List("exclude = path/to/exclude")) + assertResult(expected)(result) + } + } + describe("debug - true") { + it("should parse") { + val expected = Right(ConfigOptions(List(ConfigOption.Debug()))) + val result = invoke(List("debug = true")) + assertResult(expected)(result) + } + } + describe("debug - false") { + it("should parse") { + val expected = Right(ConfigOptions()) + val result = invoke(List("debug = false")) + assertResult(expected)(result) + } + } + describe("comment line") { + it("should be ignored") { + val expected = Right(ConfigOptions()) + val result = invoke(List("# ignore me")) + assertResult(expected)(result) + } + } + describe("unrecognised option") { + it("should be ignored") { + val expected = Right(ConfigOptions()) + val result = invoke(List("unsupported = option")) + assertResult(expected)(result) + } + } + + def invoke(lines: List[String]) = { + new DefaultRuntime {}.unsafeRunSync { + ParseConfigLines.parseLines(lines) + }.toEither + } + } +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala index b3db88c..05a0108 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala @@ -13,35 +13,37 @@ object ActionGenerator { ): ZIO[Config, Nothing, Stream[Action]] = for { bucket <- Config.bucket - } yield - s3MetaData match { - // #1 local exists, remote exists, remote matches - do nothing - case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _))) - if localFile.matches(hash) => - doNothing(bucket, key) - // #2 local exists, remote is missing, other matches - copy - case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty => - copyFile(bucket, localFile, matchByHash) - // #3 local exists, remote is missing, other no matches - upload - case S3MetaData(localFile, matchByHash, None) - if matchByHash.isEmpty && - isUploadAlreadyQueued(previousActions)(localFile) => - uploadFile(bucket, localFile) - // #4 local exists, remote exists, remote no match, other matches - copy - case S3MetaData(localFile, - matchByHash, - Some(RemoteMetaData(_, hash, _))) - if !localFile.matches(hash) && - matchByHash.nonEmpty => - copyFile(bucket, localFile, matchByHash) - // #5 local exists, remote exists, remote no match, other no matches - upload - case S3MetaData(localFile, matchByHash, Some(_)) - if matchByHash.isEmpty => - uploadFile(bucket, localFile) - // fallback - case S3MetaData(localFile, _, _) => - doNothing(bucket, localFile.remoteKey) - } + } yield genAction(s3MetaData, previousActions, bucket) + + private def genAction(s3MetaData: S3MetaData, + previousActions: Stream[Action], + bucket: Bucket): Stream[Action] = { + s3MetaData match { + // #1 local exists, remote exists, remote matches - do nothing + case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _))) + if localFile.matches(hash) => + doNothing(bucket, key) + // #2 local exists, remote is missing, other matches - copy + case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty => + copyFile(bucket, localFile, matchByHash) + // #3 local exists, remote is missing, other no matches - upload + case S3MetaData(localFile, matchByHash, None) + if matchByHash.isEmpty && + isUploadAlreadyQueued(previousActions)(localFile) => + uploadFile(bucket, localFile) + // #4 local exists, remote exists, remote no match, other matches - copy + case S3MetaData(localFile, matchByHash, Some(RemoteMetaData(_, hash, _))) + if !localFile.matches(hash) && + matchByHash.nonEmpty => + copyFile(bucket, localFile, matchByHash) + // #5 local exists, remote exists, remote no match, other no matches - upload + case S3MetaData(localFile, matchByHash, Some(_)) if matchByHash.isEmpty => + uploadFile(bucket, localFile) + // fallback + case S3MetaData(localFile, _, _) => + doNothing(bucket, localFile.remoteKey) + } + } private def key = LocalFile.remoteKey ^|-> RemoteKey.key diff --git a/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala b/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala index ad59bfe..7da3ff9 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala @@ -2,13 +2,14 @@ package net.kemitix.thorp.core import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.filesystem.FileSystem import net.kemitix.thorp.storage.api.Storage import zio.ZIO object CoreTypes { - type CoreEnv = Storage with Console with Config with FileSystem + type CoreEnv = Storage with Console with Config with FileSystem with Hasher type CoreProgram[A] = ZIO[CoreEnv, Throwable, A] } diff --git a/core/src/main/scala/net/kemitix/thorp/core/HashService.scala b/core/src/main/scala/net/kemitix/thorp/core/HashService.scala deleted file mode 100644 index 2e89a68..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/HashService.scala +++ /dev/null @@ -1,16 +0,0 @@ -package net.kemitix.thorp.core - -import java.nio.file.Path - -import net.kemitix.thorp.domain.{HashType, MD5Hash} -import net.kemitix.thorp.filesystem.FileSystem -import zio.TaskR - -/** - * Creates one, or more, hashes for local objects. - */ -trait HashService { - - def hashLocalObject(path: Path): TaskR[FileSystem, Map[HashType, MD5Hash]] - -} diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala index 900745d..c1972bf 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala @@ -1,48 +1,43 @@ package net.kemitix.thorp.core +import java.io.File import java.nio.file.Path import net.kemitix.thorp.config.Config import net.kemitix.thorp.core.KeyGenerator.generateKey +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem import zio.{Task, TaskR, ZIO} object LocalFileStream { - def findFiles(hashService: HashService)( + def findFiles( source: Path - ): TaskR[Config with FileSystem, LocalFiles] = { + ): TaskR[Config with FileSystem with Hasher, LocalFiles] = { def recurseIntoSubDirectories( - path: Path): TaskR[Config with FileSystem, LocalFiles] = + path: Path): TaskR[Config with FileSystem with Hasher, LocalFiles] = path.toFile match { case f if f.isDirectory => loop(path) - case _ => pathToLocalFile(hashService)(path) + case _ => localFile(path) } - def recurse( - paths: Stream[Path]): TaskR[Config with FileSystem, LocalFiles] = + def recurse(paths: Stream[Path]) + : TaskR[Config with FileSystem with Hasher, LocalFiles] = for { recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path)) } yield LocalFiles.reduce(recursed.toStream) - def loop(path: Path): TaskR[Config with FileSystem, LocalFiles] = { - - for { - paths <- dirPaths(path) - localFiles <- recurse(paths) - } yield localFiles - } + def loop( + path: Path): TaskR[Config with FileSystem with Hasher, LocalFiles] = + dirPaths(path) >>= recurse loop(source) } private def dirPaths(path: Path) = - for { - paths <- listFiles(path) - filtered <- includedDirPaths(paths) - } yield filtered + listFiles(path) >>= includedDirPaths private def includedDirPaths(paths: Stream[Path]) = for { @@ -53,12 +48,12 @@ object LocalFileStream { .filter({ case (_, included) => included }) .map({ case (path, _) => path }) - private def localFile(hashService: HashService)(path: Path) = { + private def localFile(path: Path) = { val file = path.toFile for { sources <- Config.sources prefix <- Config.prefix - hash <- hashService.hashLocalObject(path) + hash <- Hasher.hashObject(path) localFile = LocalFile(file, sources.forPath(path).toFile, hash, @@ -72,16 +67,17 @@ object LocalFileStream { private def listFiles(path: Path) = for { files <- Task(path.toFile.listFiles) - _ <- Task.when(files == null)( - Task.fail(new IllegalArgumentException(s"Directory not found $path"))) + _ <- filesMustExist(path, files) } yield Stream(files: _*).map(_.toPath) + private def filesMustExist(path: Path, files: Array[File]) = { + Task.when(files == null)( + Task.fail(new IllegalArgumentException(s"Directory not found $path"))) + } + private def isIncluded(path: Path) = for { filters <- Config.filters - } yield Filter.isIncluded(filters)(path) - - private def pathToLocalFile(hashService: HashService)(path: Path) = - localFile(hashService)(path) + } yield Filter.isIncluded(path)(filters) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala index d613fa1..e04d54c 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala @@ -3,30 +3,36 @@ package net.kemitix.thorp.core import net.kemitix.thorp.config.Config import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action._ +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem import net.kemitix.thorp.storage.api.Storage import zio.{TaskR, ZIO} -trait PlanBuilder { +object PlanBuilder { - def createPlan(hashService: HashService) - : TaskR[Storage with Console with Config with FileSystem, SyncPlan] = - for { - _ <- SyncLogging.logRunStart - actions <- buildPlan(hashService) - } yield actions + def createPlan + : TaskR[Storage with Console with Config with FileSystem with Hasher, + SyncPlan] = + SyncLogging.logRunStart *> buildPlan - private def buildPlan(hashService: HashService) = + private def buildPlan = + gatherMetadata >>= assemblePlan + + private def gatherMetadata = + fetchRemoteData &&& findLocalFiles + + private def fetchRemoteData = for { - metadata <- gatherMetadata(hashService) - plan <- assemblePlan(metadata) - } yield plan + bucket <- Config.bucket + prefix <- Config.prefix + objects <- Storage.list(bucket, prefix) + } yield objects private def assemblePlan(metadata: (S3ObjectsData, LocalFiles)) = metadata match { case (remoteData, localData) => - createActions(remoteData, localData) + createActions(remoteData, localData.localFiles) .map(_.filter(doesSomething).sortBy(SequencePlan.order)) .map( SyncPlan(_, SyncTotals(localData.count, localData.totalSizeBytes))) @@ -34,11 +40,11 @@ trait PlanBuilder { private def createActions( remoteData: S3ObjectsData, - localData: LocalFiles + localFiles: Stream[LocalFile] ) = for { - fileActions <- actionsForLocalFiles(remoteData, localData) - remoteActions <- actionsForRemoteKeys(remoteData) + fileActions <- actionsForLocalFiles(remoteData, localFiles) + remoteActions <- actionsForRemoteKeys(remoteData.byKey.keys) } yield fileActions ++ remoteActions private def doesSomething: Action => Boolean = { @@ -48,12 +54,10 @@ trait PlanBuilder { private def actionsForLocalFiles( remoteData: S3ObjectsData, - localData: LocalFiles + localFiles: Stream[LocalFile] ) = - ZIO.foldLeft(localData.localFiles)(Stream.empty[Action])( - (acc, localFile) => - createActionFromLocalFile(remoteData, acc, localFile) - .map(actions => actions ++ acc)) + ZIO.foldLeft(localFiles)(Stream.empty[Action])((acc, localFile) => + createActionFromLocalFile(remoteData, acc, localFile).map(_ ++ acc)) private def createActionFromLocalFile( remoteData: S3ObjectsData, @@ -64,11 +68,9 @@ trait PlanBuilder { S3MetaDataEnricher.getMetadata(localFile, remoteData), previousActions) - private def actionsForRemoteKeys(remoteData: S3ObjectsData) = - ZIO.foldLeft(remoteData.byKey.keys)(Stream.empty[Action]) { - (acc, remoteKey) => - createActionFromRemoteKey(remoteKey).map(action => action #:: acc) - } + private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) = + ZIO.foldLeft(remoteKeys)(Stream.empty[Action])((acc, remoteKey) => + createActionFromRemoteKey(remoteKey).map(_ #:: acc)) private def createActionFromRemoteKey(remoteKey: RemoteKey) = for { @@ -80,33 +82,13 @@ trait PlanBuilder { if (needsDeleted) ToDelete(bucket, remoteKey, 0L) else DoNothing(bucket, remoteKey, 0L) - private def gatherMetadata(hashService: HashService) = - for { - remoteData <- fetchRemoteData - localData <- findLocalFiles(hashService) - } yield (remoteData, localData) + private def findLocalFiles = + SyncLogging.logFileScan *> findFiles - private def fetchRemoteData = - for { - bucket <- Config.bucket - prefix <- Config.prefix - objects <- Storage.list(bucket, prefix) - } yield objects - - private def findLocalFiles(hashService: HashService) = - for { - _ <- SyncLogging.logFileScan - localFiles <- findFiles(hashService) - } yield localFiles - - private def findFiles(hashService: HashService) = + private def findFiles = for { sources <- Config.sources - paths = sources.paths - found <- ZIO.foreach(paths)(path => - LocalFileStream.findFiles(hashService)(path)) + found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles) } yield LocalFiles.reduce(found.toStream) } - -object PlanBuilder extends PlanBuilder diff --git a/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala b/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala deleted file mode 100644 index 81de4c5..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala +++ /dev/null @@ -1,19 +0,0 @@ -package net.kemitix.thorp.core - -import java.nio.file.Path - -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain.{HashType, MD5Hash} -import net.kemitix.thorp.filesystem.FileSystem -import zio.TaskR - -case class SimpleHashService() extends HashService { - - override def hashLocalObject( - path: Path - ): TaskR[FileSystem, Map[HashType, MD5Hash]] = - for { - md5 <- MD5HashGenerator.md5File(path) - } yield Map(MD5 -> md5) - -} diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index bad5433..11c7631 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -33,13 +33,11 @@ trait SyncLogging { actions: Stream[StorageQueueEvent] ): ZIO[Console, Nothing, Unit] = { val counters = actions.foldLeft(Counters())(countActivities) - for { - _ <- Console.putStrLn(eraseToEndOfScreen) - _ <- Console.putStrLn(s"Uploaded ${counters.uploaded} files") - _ <- Console.putStrLn(s"Copied ${counters.copied} files") - _ <- Console.putStrLn(s"Deleted ${counters.deleted} files") - _ <- Console.putStrLn(s"Errors ${counters.errors}") - } yield () + Console.putStrLn(eraseToEndOfScreen) *> + Console.putStrLn(s"Uploaded ${counters.uploaded} files") *> + Console.putStrLn(s"Copied ${counters.copied} files") *> + Console.putStrLn(s"Deleted ${counters.deleted} files") *> + Console.putStrLn(s"Errors ${counters.errors}") } private def countActivities: (Counters, StorageQueueEvent) => Counters = diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala index faaaecf..f2adf35 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -27,7 +27,7 @@ trait ThorpArchive { def logEvent( event: StorageQueueEvent - ): TaskR[Console with Config, Unit] = + ): TaskR[Console with Config, StorageQueueEvent] = event match { case UploadQueueEvent(remoteKey, _) => for { @@ -37,7 +37,7 @@ trait ThorpArchive { _ <- TaskR.when(!batchMode)( Console.putStrLn( s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen")) - } yield () + } yield event case CopyQueueEvent(sourceKey, targetKey) => for { batchMode <- Config.batchMode @@ -47,7 +47,7 @@ trait ThorpArchive { Console.putStrLn( s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen") ) - } yield () + } yield event case DeleteQueueEvent(remoteKey) => for { batchMode <- Config.batchMode @@ -55,7 +55,7 @@ trait ThorpArchive { _ <- TaskR.when(!batchMode)( Console.putStrLn( s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen")) - } yield () + } yield event case ErrorQueueEvent(action, _, e) => for { batchMode <- Config.batchMode @@ -64,9 +64,9 @@ trait ThorpArchive { s"${action.name} failed: ${action.keys}: ${e.getMessage}")) _ <- TaskR.when(!batchMode)(Console.putStrLn( s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen")) - } yield () - case DoNothingQueueEvent(_) => TaskR(()) - case ShutdownQueueEvent() => TaskR(()) + } yield event + case DoNothingQueueEvent(_) => TaskR(event) + case ShutdownQueueEvent() => TaskR(event) } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index ab9f7ae..9fc065b 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -18,20 +18,11 @@ case class UnversionedMirrorArchive(syncTotals: SyncTotals) ): TaskR[Storage with Console with Config, StorageQueueEvent] = action match { case ToUpload(bucket, localFile, _) => - for { - event <- doUpload(index, totalBytesSoFar, bucket, localFile) - _ <- logEvent(event) - } yield event + doUpload(index, totalBytesSoFar, bucket, localFile) >>= logEvent case ToCopy(bucket, sourceKey, hash, targetKey, _) => - for { - event <- Storage.copy(bucket, sourceKey, hash, targetKey) - _ <- logEvent(event) - } yield event + Storage.copy(bucket, sourceKey, hash, targetKey) >>= logEvent case ToDelete(bucket, remoteKey, _) => - for { - event <- Storage.delete(bucket, remoteKey) - _ <- logEvent(event) - } yield event + Storage.delete(bucket, remoteKey) >>= logEvent case DoNothing(_, remoteKey, _) => Task(DoNothingQueueEvent(remoteKey)) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala b/core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala new file mode 100644 index 0000000..476b328 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala @@ -0,0 +1,96 @@ +package net.kemitix.thorp.core.hasher + +import java.nio.file.Path +import java.util.concurrent.atomic.AtomicReference + +import net.kemitix.thorp.domain.HashType.MD5 +import net.kemitix.thorp.domain.{HashType, MD5Hash} +import net.kemitix.thorp.filesystem.FileSystem +import zio.{TaskR, ZIO} + +/** + * Creates one, or more, hashes for local objects. + */ +trait Hasher { + val hasher: Hasher.Service +} +object Hasher { + trait Service { + def hashObject( + path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] + def hashObjectChunk( + path: Path, + chunkNumber: Long, + chunkSize: Long): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] + def hex(in: Array[Byte]): TaskR[Hasher, String] + def digest(in: String): TaskR[Hasher, Array[Byte]] + } + trait Live extends Hasher { + val hasher: Service = new Service { + override def hashObject( + path: Path): TaskR[FileSystem, Map[HashType, MD5Hash]] = + for { + md5 <- MD5HashGenerator.md5File(path) + } yield Map(MD5 -> md5) + + override def hashObjectChunk(path: Path, + chunkNumber: Long, + chunkSize: Long) + : TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + for { + md5 <- MD5HashGenerator.md5FileChunk(path, + chunkNumber * chunkSize, + chunkSize) + } yield Map(MD5 -> md5) + + override def hex(in: Array[Byte]): TaskR[Hasher, String] = + ZIO(MD5HashGenerator.hex(in)) + + override def digest(in: String): TaskR[Hasher, Array[Byte]] = + ZIO(MD5HashGenerator.digest(in)) + } + } + object Live extends Live + + trait Test extends Hasher { + val hashes: AtomicReference[Map[Path, Map[HashType, MD5Hash]]] = + new AtomicReference(Map.empty) + val hashChunks + : AtomicReference[Map[Path, Map[Long, Map[HashType, MD5Hash]]]] = + new AtomicReference(Map.empty) + val hasher: Service = new Service { + override def hashObject( + path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + ZIO(hashes.get()(path)) + + override def hashObjectChunk(path: Path, + chunkNumber: Long, + chunkSize: Long) + : TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + ZIO(hashChunks.get()(path)(chunkNumber)) + + override def hex(in: Array[Byte]): TaskR[Hasher, String] = + ZIO(MD5HashGenerator.hex(in)) + + override def digest(in: String): TaskR[Hasher, Array[Byte]] = + ZIO(MD5HashGenerator.digest(in)) + } + } + object Test extends Test + + final def hashObject( + path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + ZIO.accessM(_.hasher hashObject path) + + final def hashObjectChunk( + path: Path, + chunkNumber: Long, + chunkSize: Long): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + ZIO.accessM(_.hasher hashObjectChunk (path, chunkNumber, chunkSize)) + + final def hex(in: Array[Byte]): TaskR[Hasher, String] = + ZIO.accessM(_.hasher hex in) + + final def digest(in: String): TaskR[Hasher, Array[Byte]] = + ZIO.accessM(_.hasher digest in) +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/hasher/MD5HashGenerator.scala similarity index 88% rename from core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala rename to core/src/main/scala/net/kemitix/thorp/core/hasher/MD5HashGenerator.scala index 822d10c..140168a 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/hasher/MD5HashGenerator.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.core.hasher import java.io.{File, FileInputStream} import java.nio.file.Path @@ -10,7 +10,7 @@ import zio.{Task, TaskR} import scala.collection.immutable.NumericRange -object MD5HashGenerator { +private object MD5HashGenerator { val maxBufferSize = 8048 val defaultBuffer = new Array[Byte](maxBufferSize) @@ -48,13 +48,11 @@ object MD5HashGenerator { offset: Long, endOffset: Long ) = - FileSystem - .open(file, offset) - .flatMap { managedFileInputStream => - managedFileInputStream.use { fileInputStream => - digestFile(fileInputStream, offset, endOffset) - } + FileSystem.open(file, offset) >>= { managedFileInputStream => + managedFileInputStream.use { fileInputStream => + digestFile(fileInputStream, offset, endOffset) } + } private def digestFile( fis: FileInputStream, diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala deleted file mode 100644 index 8a1a9c1..0000000 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala +++ /dev/null @@ -1,14 +0,0 @@ -package net.kemitix.thorp.core - -import java.nio.file.Path - -import net.kemitix.thorp.domain.{HashType, MD5Hash} -import zio.Task - -case class DummyHashService(hashes: Map[Path, Map[HashType, MD5Hash]]) - extends HashService { - - override def hashLocalObject(path: Path): Task[Map[HashType, MD5Hash]] = - Task(hashes(path)) - -} diff --git a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala index a92d419..17b5755 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala @@ -10,6 +10,7 @@ import net.kemitix.thorp.config.{ Resource } import net.kemitix.thorp.console._ +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem @@ -21,30 +22,17 @@ class LocalFileStreamSuite extends FunSpec { private val source = Resource(this, "upload") private val sourcePath = source.toPath - private val hashService: HashService = DummyHashService( - Map( - file("root-file") -> Map(MD5 -> MD5HashData.Root.hash), - file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash) - )) private def file(filename: String) = sourcePath.resolve(Paths.get(filename)) - private val configOptions = ConfigOptions( - List( - ConfigOption.IgnoreGlobalOptions, - ConfigOption.IgnoreUserOptions, - ConfigOption.Source(sourcePath), - ConfigOption.Bucket("aBucket") - )) - describe("findFiles") { it("should find all files") { val expected = Right(Set("subdir/leaf-file", "root-file")) val result = invoke() .map(_.localFiles) - .map(localFiles => localFiles.map(_.relative.toString)) + .map(_.map(_.relative.toString)) .map(_.toSet) assertResult(expected)(result) } @@ -61,9 +49,13 @@ class LocalFileStreamSuite extends FunSpec { } private def invoke() = { - type TestEnv = Storage with Console with Config with FileSystem + type TestEnv = Storage + with Console + with Config + with FileSystem + with Hasher.Test val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live - with FileSystem.Live { + with FileSystem.Live with Hasher.Test { override def listResult: Task[S3ObjectsData] = Task.die(new NotImplementedError) override def uploadResult: UIO[StorageQueueEvent] = @@ -75,12 +67,23 @@ class LocalFileStreamSuite extends FunSpec { override def shutdownResult: UIO[StorageQueueEvent] = Task.die(new NotImplementedError) } - + testEnv.hashes.set( + Map( + file("root-file") -> Map(MD5 -> MD5HashData.Root.hash), + file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash) + )) + val configOptions = ConfigOptions( + List( + ConfigOption.IgnoreGlobalOptions, + ConfigOption.IgnoreUserOptions, + ConfigOption.Source(sourcePath), + ConfigOption.Bucket("aBucket") + )) def testProgram = for { config <- ConfigurationBuilder.buildConfig(configOptions) _ <- Config.set(config) - files <- LocalFileStream.findFiles(hashService)(sourcePath) + files <- LocalFileStream.findFiles(sourcePath) } yield files new DefaultRuntime {}.unsafeRunSync { diff --git a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala deleted file mode 100644 index 9f0853f..0000000 --- a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala +++ /dev/null @@ -1,83 +0,0 @@ -package net.kemitix.thorp.core - -import java.nio.file.Paths - -import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, ParseConfigLines} -import org.scalatest.FunSpec - -class ParseConfigLinesTest extends FunSpec { - - describe("parse single lines") { - describe("source") { - it("should parse") { - val expected = - ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source")))) - val result = - ParseConfigLines.parseLines(List("source = /path/to/source")) - assertResult(expected)(result) - } - } - describe("bucket") { - it("should parse") { - val expected = ConfigOptions(List(ConfigOption.Bucket("bucket-name"))) - val result = ParseConfigLines.parseLines(List("bucket = bucket-name")) - assertResult(expected)(result) - } - } - describe("prefix") { - it("should parse") { - val expected = - ConfigOptions(List(ConfigOption.Prefix("prefix/to/files"))) - val result = - ParseConfigLines.parseLines(List("prefix = prefix/to/files")) - assertResult(expected)(result) - } - } - describe("include") { - it("should parse") { - val expected = - ConfigOptions(List(ConfigOption.Include("path/to/include"))) - val result = - ParseConfigLines.parseLines(List("include = path/to/include")) - assertResult(expected)(result) - } - } - describe("exclude") { - it("should parse") { - val expected = - ConfigOptions(List(ConfigOption.Exclude("path/to/exclude"))) - val result = - ParseConfigLines.parseLines(List("exclude = path/to/exclude")) - assertResult(expected)(result) - } - } - describe("debug - true") { - it("should parse") { - val expected = ConfigOptions(List(ConfigOption.Debug())) - val result = ParseConfigLines.parseLines(List("debug = true")) - assertResult(expected)(result) - } - } - describe("debug - false") { - it("should parse") { - val expected = ConfigOptions() - val result = ParseConfigLines.parseLines(List("debug = false")) - assertResult(expected)(result) - } - } - describe("comment line") { - it("should be ignored") { - val expected = ConfigOptions() - val result = ParseConfigLines.parseLines(List("# ignore me")) - assertResult(expected)(result) - } - } - describe("unrecognised option") { - it("should be ignored") { - val expected = ConfigOptions() - val result = ParseConfigLines.parseLines(List("unsupported = option")) - assertResult(expected)(result) - } - } - } -} diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala index 650e855..c5fe30c 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala @@ -11,6 +11,7 @@ import net.kemitix.thorp.config.{ } import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} +import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem._ @@ -25,8 +26,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { "create a plan" - { - val hashService = SimpleHashService() - "one source" - { val options: Path => ConfigOptions = source => @@ -46,8 +45,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val expected = Right(List(toUpload(remoteKey, hash, source, file))) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(emptyS3ObjectData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -70,8 +68,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map(anOtherKey -> HashModified(aHash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(aFile.toPath -> aFile, anOtherFile.toPath -> anOtherFile))) @@ -94,8 +91,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map(remoteKey -> HashModified(hash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -118,8 +114,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { Map(remoteKey -> HashModified(originalHash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -139,8 +134,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map() ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -165,8 +159,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map(remoteKey -> HashModified(hash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map(file.toPath -> file))) assertResult(expected)(result) @@ -183,8 +176,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { byKey = Map(remoteKey -> HashModified(hash, lastModified)) ) val result = - invoke(hashService, - options(source), + invoke(options(source), UIO.succeed(s3ObjectsData), UIO.succeed(Map.empty)) assertResult(expected)(result) @@ -222,7 +214,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { )) val result = invoke( - hashService, options(firstSource)(secondSource), UIO.succeed(emptyS3ObjectData), UIO.succeed( @@ -248,7 +239,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { toUpload(remoteKey1, hash1, firstSource, fileInFirstSource))) val result = invoke( - hashService, options(firstSource)(secondSource), UIO.succeed(emptyS3ObjectData), UIO.succeed( @@ -273,8 +263,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))), byKey = Map(remoteKey2 -> HashModified(hash2, lastModified))) val result = - invoke(hashService, - options(firstSource)(secondSource), + invoke(options(firstSource)(secondSource), UIO.succeed(s3ObjectData), UIO.succeed( Map(fileInSecondSource.toPath -> fileInSecondSource))) @@ -296,8 +285,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))), byKey = Map(remoteKey1 -> HashModified(hash1, lastModified))) val result = - invoke(hashService, - options(firstSource)(secondSource), + invoke(options(firstSource)(secondSource), UIO.succeed(s3ObjectData), UIO.succeed( Map(fileInFirstSource.toPath -> fileInFirstSource))) @@ -314,8 +302,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val s3ObjectData = S3ObjectsData(byKey = Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified))) val result = - invoke(hashService, - options(firstSource)(secondSource), + invoke(options(firstSource)(secondSource), UIO.succeed(s3ObjectData), UIO.succeed(Map.empty)) assertResult(expected)(result) @@ -326,12 +313,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { } def md5Hash(file: File): MD5Hash = { + object TestEnv extends Hasher.Live with FileSystem.Live new DefaultRuntime {} .unsafeRunSync { - hashService - .hashLocalObject(file.toPath) + Hasher + .hashObject(file.toPath) .map(_.get(MD5)) - .provide(FileSystem.Live) + .provide(TestEnv) } .toEither .toOption @@ -361,14 +349,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { ConfigOptions(List(configOptions: _*)) private def invoke( - hashService: HashService, configOptions: ConfigOptions, result: Task[S3ObjectsData], files: Task[Map[Path, File]] ) = { - type TestEnv = Storage with Console with Config with FileSystem + type TestEnv = Storage with Console with Config with FileSystem with Hasher val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live - with FileSystem.Live { + with FileSystem.Live with Hasher.Live { override def listResult: Task[S3ObjectsData] = result override def uploadResult: UIO[StorageQueueEvent] = Task.die(new NotImplementedError) @@ -384,7 +371,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { for { config <- ConfigurationBuilder.buildConfig(configOptions) _ <- Config.set(config) - plan <- PlanBuilder.createPlan(hashService) + plan <- PlanBuilder.createPlan } yield plan new DefaultRuntime {} diff --git a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/thorp/core/hasher/MD5HashGeneratorTest.scala similarity index 90% rename from core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala rename to core/src/test/scala/net/kemitix/thorp/core/hasher/MD5HashGeneratorTest.scala index 050a616..7237e4f 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/hasher/MD5HashGeneratorTest.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.core.hasher import java.nio.file.Path @@ -12,7 +12,7 @@ class MD5HashGeneratorTest extends FunSpec { describe("md5File()") { describe("read a small file (smaller than buffer)") { - val path = Resource(this, "upload/root-file").toPath + val path = Resource(this, "../upload/root-file").toPath it("should generate the correct hash") { val expected = Right(Root.hash) val result = invoke(path) @@ -21,7 +21,7 @@ class MD5HashGeneratorTest extends FunSpec { } describe("read a large file (bigger than buffer)") { - val path = Resource(this, "big-file").toPath + val path = Resource(this, "../big-file").toPath it("should generate the correct hash") { val expected = Right(BigFile.hash) val result = invoke(path) @@ -39,7 +39,7 @@ class MD5HashGeneratorTest extends FunSpec { describe("md5FileChunk") { describe("read chunks of file") { - val path = Resource(this, "big-file").toPath + val path = Resource(this, "../big-file").toPath it("should generate the correct hash for first chunk of the file") { val part1 = BigFile.Part1 val expected = Right(part1.hash.hash) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Filter.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Filter.scala index d8b8a8e..3a730b8 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Filter.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Filter.scala @@ -7,7 +7,7 @@ sealed trait Filter object Filter { - def isIncluded(filters: List[Filter])(p: Path): Boolean = { + def isIncluded(p: Path)(filters: List[Filter]): Boolean = { sealed trait State case class Unknown() extends State case class Accepted() extends State diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala new file mode 100644 index 0000000..e43f060 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala @@ -0,0 +1,6 @@ +package net.kemitix.thorp.domain + +trait Monoid[T] { + def zero: T + def op(t1: T, t2: T): T +} diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala index fbdb4e9..3bb13ef 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala @@ -15,12 +15,9 @@ import java.nio.file.Path case class Sources( paths: List[Path] ) { - def ++(path: Path): Sources = this ++ List(path) - def ++(otherPaths: List[Path]): Sources = - Sources(otherPaths.foldLeft(paths) { (acc, path) => - if (acc.contains(path)) acc - else acc ++ List(path) - }) + def +(path: Path)(implicit m: Monoid[Sources]): Sources = this ++ List(path) + def ++(otherPaths: List[Path])(implicit m: Monoid[Sources]): Sources = + m.op(this, Sources(otherPaths)) /** * Returns the source path for the given path. @@ -28,3 +25,17 @@ case class Sources( def forPath(path: Path): Path = paths.find(source => path.startsWith(source)).get } + +object Sources { + + final val emptySources = Sources(List.empty) + + implicit def sourcesAppendMonoid: Monoid[Sources] = new Monoid[Sources] { + override def zero: Sources = emptySources + override def op(t1: Sources, t2: Sources): Sources = + Sources(t2.paths.foldLeft(t1.paths) { (acc, path) => + if (acc.contains(path)) acc + else acc ++ List(path) + }) + } +} diff --git a/domain/src/test/scala/net/kemitix/thorp/domain/FiltersSuite.scala b/domain/src/test/scala/net/kemitix/thorp/domain/FiltersSuite.scala index 36988a2..3523c2f 100644 --- a/domain/src/test/scala/net/kemitix/thorp/domain/FiltersSuite.scala +++ b/domain/src/test/scala/net/kemitix/thorp/domain/FiltersSuite.scala @@ -86,7 +86,7 @@ class FiltersSuite extends FunSpec { } describe("isIncluded") { def invoke(filters: List[Filter]) = { - paths.filter(path => Filter.isIncluded(filters)(path)) + paths.filter(path => Filter.isIncluded(path)(filters)) } describe("when there are no filters") { diff --git a/core/src/test/scala/net/kemitix/thorp/core/TemporaryFolder.scala b/domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala similarity index 97% rename from core/src/test/scala/net/kemitix/thorp/core/TemporaryFolder.scala rename to domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala index c8c2cbc..4b481df 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/TemporaryFolder.scala +++ b/domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.domain import java.io.{File, IOException, PrintWriter} import java.nio.file.attribute.BasicFileAttributes diff --git a/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala index 552e677..e43bb4b 100644 --- a/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala +++ b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala @@ -18,7 +18,7 @@ object FileSystem { def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean] def openManagedFileInputStream(file: File, offset: Long = 0L) : TaskR[FileSystem, ZManaged[Any, Throwable, FileInputStream]] - def fileLines(file: File): TaskR[FileSystem, List[String]] + def fileLines(file: File): TaskR[FileSystem, Seq[String]] } trait Live extends FileSystem { override val filesystem: Service = new Service { @@ -42,17 +42,11 @@ object FileSystem { ZIO(ZManaged.make(acquire)(release)) } - override def fileLines(file: File): TaskR[FileSystem, List[String]] = { - + override def fileLines(file: File): TaskR[FileSystem, Seq[String]] = { def acquire = ZIO(Files.lines(file.toPath)) - - def release(lines: stream.Stream[String]) = - ZIO.effectTotal(lines.close()) - def use(lines: stream.Stream[String]) = ZIO.effectTotal(lines.iterator.asScala.toList) - - ZIO.bracket(acquire)(release)(use) + acquire.bracketAuto(use) } } } @@ -84,6 +78,6 @@ object FileSystem { : TaskR[FileSystem, ZManaged[FileSystem, Throwable, FileInputStream]] = ZIO.accessM(_.filesystem openManagedFileInputStream (file, offset)) - final def lines(file: File): TaskR[FileSystem, List[String]] = + final def lines(file: File): TaskR[FileSystem, Seq[String]] = ZIO.accessM(_.filesystem fileLines (file)) } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala index b95bb2b..ca0e315 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala @@ -14,48 +14,34 @@ import zio.{IO, Task, UIO} trait Copier { def copy(amazonS3: AmazonS3.Client)( + request: Request): UIO[StorageQueueEvent] = + copyObject(amazonS3)(request) + .fold(foldFailure(request.sourceKey, request.targetKey), + foldSuccess(request.sourceKey, request.targetKey)) + + case class Request( bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ): UIO[StorageQueueEvent] = - copyObject(amazonS3)(bucket, sourceKey, hash, targetKey) - .fold(foldFailure(sourceKey, targetKey), - foldSuccess(sourceKey, targetKey)) + ) - private def copyObject(amazonS3: AmazonS3.Client)( - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey - ): IO[S3ClientException, CopyObjectResult] = { - - def handleResult - : Option[CopyObjectResult] => IO[S3ClientException, CopyObjectResult] = - maybeResult => - IO.fromEither { - maybeResult - .toRight(HashError) - } - - def handleError: Throwable => IO[S3ClientException, CopyObjectResult] = - error => - Task.fail { - CopyError(error) - } - - val request = - new CopyObjectRequest( - bucket.name, - sourceKey.key, - bucket.name, - targetKey.key - ).withMatchingETagConstraint(hash.hash) + private def copyObject(amazonS3: AmazonS3.Client)(request: Request) = amazonS3 - .copyObject(request) - .fold(handleError, handleResult) + .copyObject(copyObjectRequest(request)) + .fold( + error => Task.fail(CopyError(error)), + result => IO.fromEither(result.toRight(HashError)) + ) .flatten - } + + private def copyObjectRequest(copyRequest: Request) = + new CopyObjectRequest( + copyRequest.bucket.name, + copyRequest.sourceKey.key, + copyRequest.bucket.name, + copyRequest.targetKey.key + ).withMatchingETagConstraint(copyRequest.hash.hash) private def foldFailure( sourceKey: RemoteKey, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala index 969c860..6bca9df 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala @@ -32,11 +32,7 @@ trait Lister { token => request.withContinuationToken(token) def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] = - request => - for { - _ <- ListerLogger.logFetchBatch - batch <- tryFetchBatch(amazonS3)(request) - } yield batch + request => ListerLogger.logFetchBatch *> tryFetchBatch(amazonS3)(request) def fetchMore: Option[Token] => TaskR[Console, Stream[S3ObjectSummary]] = { case None => TaskR.succeed(Stream.empty) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala deleted file mode 100644 index 1a4184b..0000000 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala +++ /dev/null @@ -1,35 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import java.nio.file.Path - -import net.kemitix.thorp.core.{HashService, MD5HashGenerator} -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain.{HashType, MD5Hash} -import net.kemitix.thorp.filesystem.FileSystem -import zio.TaskR - -trait S3HashService extends HashService { - - /** - * Generates an MD5 Hash and an multi-part ETag - * - * @param path the local path to scan - * @return a set of hash values - */ - override def hashLocalObject( - path: Path - ): TaskR[FileSystem, Map[HashType, MD5Hash]] = - for { - md5 <- MD5HashGenerator.md5File(path) - etag <- ETagGenerator.eTag(path).map(MD5Hash(_)) - } yield - Map( - MD5 -> md5, - ETag -> etag - ) - -} - -object S3HashService extends S3HashService { - lazy val defaultHashService: HashService = S3HashService -} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala index b89b4ef..acedb1f 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala @@ -38,7 +38,7 @@ object S3Storage { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey): UIO[StorageQueueEvent] = - Copier.copy(client)(bucket, sourceKey, hash, targetKey) + Copier.copy(client)(Copier.Request(bucket, sourceKey, hash, targetKey)) override def delete(bucket: Bucket, remoteKey: RemoteKey): UIO[StorageQueueEvent] = diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index 6bd1589..e3ad3db 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -28,22 +28,17 @@ trait Uploader { .catchAll(handleError(localFile.remoteKey)) private def handleError(remoteKey: RemoteKey)(e: Throwable) = - UIO.succeed(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e)) + UIO(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e)) private def transfer(transferManager: => AmazonTransferManager)( localFile: LocalFile, bucket: Bucket, uploadEventListener: UploadEventListener - ) = { - val listener = progressListener(uploadEventListener) - for { - putObjectRequest <- request(localFile, bucket, listener) - event <- dispatch(transferManager, putObjectRequest) - } yield event - } + ) = + request(localFile, bucket, progressListener(uploadEventListener)) >>= + dispatch(transferManager) - private def dispatch( - transferManager: AmazonTransferManager, + private def dispatch(transferManager: AmazonTransferManager)( putObjectRequest: PutObjectRequest ) = { transferManager diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala similarity index 73% rename from storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala index ff111b4..ada93e4 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala @@ -1,29 +1,31 @@ -package net.kemitix.thorp.storage.aws +package net.kemitix.thorp.storage.aws.hasher import java.nio.file.Path import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils -import net.kemitix.thorp.core.MD5HashGenerator -import net.kemitix.thorp.domain.MD5Hash +import net.kemitix.thorp.core.hasher.Hasher +import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.filesystem.FileSystem import zio.{TaskR, ZIO} -trait ETagGenerator { +private trait ETagGenerator { def eTag( path: Path - ): TaskR[FileSystem, String] = { + ): TaskR[Hasher with FileSystem, String] = { val partSize = calculatePartSize(path) val parts = numParts(path.toFile.length, partSize) - ZIO - .foreach(partsIndex(parts))(digestChunk(path, partSize)) - .map(concatenateDigests) - .map(MD5HashGenerator.hex) + eTagHex(path, partSize, parts) .map(hash => s"$hash-$parts") } + private def eTagHex(path: Path, partSize: Long, parts: Long) = + ZIO + .foreach(partsIndex(parts))(digestChunk(path, partSize)) + .map(concatenateDigests) >>= Hasher.hex + private def partsIndex(parts: Long) = Range.Long(0, parts, 1).toList @@ -51,14 +53,10 @@ trait ETagGenerator { path: Path, chunkSize: Long )(chunkNumber: Long) = - hashChunk(path, chunkNumber, chunkSize).map(_.digest) - - def hashChunk( - path: Path, - chunkNumber: Long, - chunkSize: Long - ): TaskR[FileSystem, MD5Hash] = - MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize) + Hasher + .hashObjectChunk(path, chunkNumber, chunkSize) + .map(_(MD5)) + .map(_.digest) def offsets( totalFileSizeBytes: Long, @@ -67,4 +65,4 @@ trait ETagGenerator { Range.Long(0, totalFileSizeBytes, optimalPartSize).toList } -object ETagGenerator extends ETagGenerator +private object ETagGenerator extends ETagGenerator diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala new file mode 100644 index 0000000..7cca3fe --- /dev/null +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala @@ -0,0 +1,45 @@ +package net.kemitix.thorp.storage.aws.hasher + +import java.nio.file.Path + +import net.kemitix.thorp.core.hasher.Hasher +import net.kemitix.thorp.core.hasher.Hasher.Live.{hasher => CoreHasher} +import net.kemitix.thorp.core.hasher.Hasher.Service +import net.kemitix.thorp.domain.{HashType, MD5Hash} +import net.kemitix.thorp.filesystem.FileSystem +import net.kemitix.thorp.storage.aws.ETag +import zio.TaskR + +object S3Hasher { + + trait Live extends Hasher { + val hasher: Service = new Service { + + /** + * Generates an MD5 Hash and an multi-part ETag + * + * @param path the local path to scan + * @return a set of hash values + */ + override def hashObject( + path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + for { + base <- CoreHasher.hashObject(path) + etag <- ETagGenerator.eTag(path).map(MD5Hash(_)) + } yield base + (ETag -> etag) + + override def hashObjectChunk(path: Path, + chunkNumber: Long, + chunkSize: Long) + : TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] = + CoreHasher.hashObjectChunk(path, chunkNumber, chunkSize) + + override def hex(in: Array[Byte]): TaskR[Hasher, String] = + CoreHasher.hex(in) + + override def digest(in: String): TaskR[Hasher, Array[Byte]] = + CoreHasher.digest(in) + } + + } +} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala index 1fca764..2be78df 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala @@ -42,7 +42,8 @@ trait AmazonS3ClientTestFixture extends MockFactory { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey): UIO[StorageQueueEvent] = - Copier.copy(client)(bucket, sourceKey, hash, targetKey) + Copier.copy(client)( + Copier.Request(bucket, sourceKey, hash, targetKey)) override def delete(bucket: Bucket, remoteKey: RemoteKey): UIO[StorageQueueEvent] = diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala index 463c2b8..b428b74 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala @@ -88,7 +88,8 @@ class CopierTest extends FreeSpec { amazonS3Client: AmazonS3.Client ) = runtime.unsafeRunSync { - Copier.copy(amazonS3Client)(bucket, sourceKey, hash, targetKey) + Copier.copy(amazonS3Client)( + Copier.Request(bucket, sourceKey, hash, targetKey)) }.toEither } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala similarity index 77% rename from storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala rename to storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala index 9f26e6f..931943a 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala @@ -1,18 +1,20 @@ -package net.kemitix.thorp.storage.aws +package net.kemitix.thorp.storage.aws.hasher import java.nio.file.Path import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import net.kemitix.thorp.config.Resource +import net.kemitix.thorp.core.hasher.Hasher +import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.filesystem.FileSystem import org.scalatest.FunSpec import zio.DefaultRuntime class ETagGeneratorTest extends FunSpec { - private val runtime = new DefaultRuntime {} + object TestEnv extends Hasher.Live with FileSystem.Live - private val bigFile = Resource(this, "big-file") + private val bigFile = Resource(this, "../big-file") private val bigFilePath = bigFile.toPath private val configuration = new TransferManagerConfiguration private val chunkSize = 1200000 @@ -41,14 +43,16 @@ class ETagGeneratorTest extends FunSpec { md5Hashes.foreach { case (hash, index) => assertResult(Right(hash))( - invoke(bigFilePath, index, chunkSize).map(_.hash)) + invoke(bigFilePath, index, chunkSize) + .map(_(MD5)) + .map(_.hash)) } } def invoke(path: Path, index: Long, size: Long) = new DefaultRuntime {}.unsafeRunSync { - ETagGenerator - .hashChunk(path, index, size) - .provide(FileSystem.Live) + Hasher + .hashObjectChunk(path, index, size) + .provide(TestEnv) }.toEither } @@ -58,12 +62,13 @@ class ETagGeneratorTest extends FunSpec { val result = invoke(bigFilePath) assertResult(Right(expected))(result) } - def invoke(path: Path) = + def invoke(path: Path) = { new DefaultRuntime {}.unsafeRunSync { ETagGenerator .eTag(path) - .provide(FileSystem.Live) + .provide(TestEnv) }.toEither + } } }