diff --git a/CHANGELOG.org b/CHANGELOG.org index bb87882..31c8187 100644 --- a/CHANGELOG.org +++ b/CHANGELOG.org @@ -6,6 +6,12 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog] [[https://semver.org/spec/v2.0.0.html][Semantic Versioning]]. +* [0.8.0] - 2019-??-?? + +** Changed + + - Replace cats-effect with zio (#117) + * [0.7.2] - 2019-07-19 ** Changed diff --git a/build.sbt b/build.sbt index 49898a7..4c16680 100644 --- a/build.sbt +++ b/build.sbt @@ -41,7 +41,7 @@ val testDependencies = Seq( val domainDependencies = Seq( libraryDependencies ++= Seq( "com.github.julien-truffaut" %% "monocle-core" % "1.6.0", - "com.github.julien-truffaut" %% "monocle-macro" % "1.6.0", + "com.github.julien-truffaut" %% "monocle-macro" % "1.6.0" ) ) val commandLineParsing = Seq( @@ -57,9 +57,9 @@ val awsSdkDependencies = Seq( "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9" ) ) -val catsEffectsSettings = Seq( - libraryDependencies ++= Seq( - "org.typelevel" %% "cats-effect" % "1.3.1" +val zioDependencies = Seq( + libraryDependencies ++= Seq ( + "dev.zio" %% "zio" % "1.0.0-RC10-1" ) ) @@ -110,6 +110,7 @@ lazy val core = (project in file("core")) lazy val `storage-api` = (project in file("storage-api")) .settings(commonSettings) + .settings(zioDependencies) .settings(assemblyJarName in assembly := "storage-api.jar") .dependsOn(domain) @@ -117,5 +118,4 @@ lazy val domain = (project in file("domain")) .settings(commonSettings) .settings(domainDependencies) .settings(assemblyJarName in assembly := "domain.jar") - .settings(catsEffectsSettings) .settings(testDependencies) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala index 38814d5..f36cc5c 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala @@ -1,20 +1,14 @@ package net.kemitix.thorp.cli -import cats.effect.ExitCase.{Canceled, Completed, Error} -import cats.effect.{ExitCode, IO, IOApp} +import zio.{App, ZIO} -object Main extends IOApp { +object Main extends App { - override def run(args: List[String]): IO[ExitCode] = { - val exitCaseLogger = new PrintLogger(false) - ParseArgs(args) - .map(Program.run) - .getOrElse(IO(ExitCode.Error)) - .guaranteeCase { - case Canceled => exitCaseLogger.warn("Interrupted") - case Error(e) => exitCaseLogger.error(e.getMessage) - case Completed => IO.unit - } - } + override def run(args: List[String]): ZIO[Environment, Nothing, Int] = { + for { + cliOptions <- ParseArgs(args) + _ <- Program.run(cliOptions) + } yield () + }.fold(failure => 1, success => 0) } diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala b/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala index 2527fd7..3042ff1 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala @@ -4,9 +4,17 @@ import java.nio.file.Paths import net.kemitix.thorp.core.{ConfigOption, ConfigOptions} import scopt.OParser +import zio.Task object ParseArgs { + def apply(args: List[String]): Task[ConfigOptions] = Task { + OParser + .parse(configParser, args, List()) + .map(ConfigOptions(_)) + .getOrElse(ConfigOptions()) + } + val configParser: OParser[Unit, List[ConfigOption]] = { val parserBuilder = OParser.builder[List[ConfigOption]] import parserBuilder._ @@ -49,9 +57,4 @@ object ParseArgs { ) } - def apply(args: List[String]): Option[ConfigOptions] = - OParser - .parse(configParser, args, List()) - .map(ConfigOptions(_)) - } diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/PrintLogger.scala b/cli/src/main/scala/net/kemitix/thorp/cli/PrintLogger.scala deleted file mode 100644 index af68c58..0000000 --- a/cli/src/main/scala/net/kemitix/thorp/cli/PrintLogger.scala +++ /dev/null @@ -1,25 +0,0 @@ -package net.kemitix.thorp.cli - -import cats.effect.IO -import net.kemitix.thorp.domain.Logger - -class PrintLogger(isDebug: Boolean = false) extends Logger { - - override def debug(message: => String): IO[Unit] = - if (isDebug) IO(println(s"[ DEBUG] $message")) - else IO.unit - - override def info(message: => String): IO[Unit] = - IO(println(s"[ INFO] $message")) - - override def warn(message: String): IO[Unit] = - IO(println(s"[ WARN] $message")) - - override def error(message: String): IO[Unit] = - IO(println(s"[ ERROR] $message")) - - override def withDebug(debug: Boolean): Logger = - if (isDebug == debug) this - else new PrintLogger(debug) - -} diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index a558b87..35d07e9 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -1,74 +1,79 @@ package net.kemitix.thorp.cli -import cats.effect.{ExitCode, IO} -import cats.implicits._ import net.kemitix.thorp.core._ -import net.kemitix.thorp.domain.{Logger, StorageQueueEvent} +import net.kemitix.thorp.domain.{StorageQueueEvent, SyncTotals} import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService +import zio.console._ +import zio.{Task, TaskR, ZIO} trait Program extends PlanBuilder { - def run(cliOptions: ConfigOptions): IO[ExitCode] = { - implicit val logger: Logger = new PrintLogger() - if (ConfigQuery.showVersion(cliOptions)) - for { - _ <- logger.info(s"Thorp v${thorp.BuildInfo.version}") - } yield ExitCode.Success - else - for { - syncPlan <- createPlan( - defaultStorageService, - defaultHashService, - cliOptions - ).valueOrF(handleErrors) - archive <- thorpArchive(cliOptions, syncPlan) - events <- handleActions(archive, syncPlan) - _ <- defaultStorageService.shutdown - _ <- SyncLogging.logRunFinished(events) - } yield ExitCode.Success + lazy val version = s"Thorp v${thorp.BuildInfo.version}" + + def run(cliOptions: ConfigOptions): ZIO[Console, Nothing, Unit] = { + val showVersion = ConfigQuery.showVersion(cliOptions) + for { + _ <- ZIO.when(showVersion)(putStrLn(version)) + _ <- ZIO.when(!showVersion)(execute(cliOptions).catchAll(handleErrors)) + } yield () } + private def execute( + cliOptions: ConfigOptions): ZIO[Console, Throwable, Unit] = { + for { + plan <- createPlan(defaultStorageService, defaultHashService, cliOptions) + archive <- thorpArchive(cliOptions, plan.syncTotals) + events <- handleActions(archive, plan) + _ <- defaultStorageService.shutdown + _ <- SyncLogging.logRunFinished(events) + } yield () + } + + private def handleErrors(throwable: Throwable): ZIO[Console, Nothing, Unit] = + for { + _ <- putStrLn("There were errors:") + _ <- throwable match { + case ConfigValidationException(errors) => + ZIO.foreach(errors)(error => putStrLn(s"- $error")) + case x => throw x + } + } yield () + def thorpArchive( cliOptions: ConfigOptions, - syncPlan: SyncPlan - ): IO[ThorpArchive] = - IO.pure( - UnversionedMirrorArchive.default( - defaultStorageService, - ConfigQuery.batchMode(cliOptions), - syncPlan.syncTotals - )) - - private def handleErrors( - implicit logger: Logger - ): List[String] => IO[SyncPlan] = errors => { - for { - _ <- logger.error("There were errors:") - _ <- errors.map(error => logger.error(s" - $error")).sequence - } yield SyncPlan() + syncTotals: SyncTotals + ): Task[ThorpArchive] = Task { + UnversionedMirrorArchive.default( + defaultStorageService, + ConfigQuery.batchMode(cliOptions), + syncTotals + ) } private def handleActions( archive: ThorpArchive, syncPlan: SyncPlan - )(implicit l: Logger): IO[Stream[StorageQueueEvent]] = { - type Accumulator = (Stream[IO[StorageQueueEvent]], Long) + ): TaskR[Console, Stream[StorageQueueEvent]] = { + type Accumulator = (Stream[StorageQueueEvent], Long) val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes) - val (actions, _) = syncPlan.actions.zipWithIndex.reverse - .foldLeft(zero) { (acc: Accumulator, indexedAction) => - { - val (stream, bytesToDo) = acc + TaskR + .foldLeft(syncPlan.actions.reverse.zipWithIndex)(zero)( + (acc, indexedAction) => { val (action, index) = indexedAction + val (stream, bytesToDo) = acc val remainingBytes = bytesToDo - action.size - ( - archive.update(index, action, remainingBytes) ++ stream, - remainingBytes - ) - } + (for { + event <- archive.update(index, action, remainingBytes) + events = stream ++ Stream(event) + } yield events) + .map((_, remainingBytes)) + }) + .map { + case (events, _) => events } - actions.sequence } + } object Program extends Program diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala index 253e905..bc94662 100644 --- a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala @@ -5,16 +5,19 @@ import java.nio.file.Paths import net.kemitix.thorp.core.ConfigOption.Debug import net.kemitix.thorp.core.{ConfigOptions, ConfigQuery, Resource} import org.scalatest.FunSpec +import zio.DefaultRuntime import scala.util.Try class ParseArgsTest extends FunSpec { + private val runtime = new DefaultRuntime {} + val source = Resource(this, "") describe("parse - source") { def invokeWithSource(path: String) = - ParseArgs(List("--source", path, "--bucket", "bucket")) + invoke(List("--source", path, "--bucket", "bucket")) describe("when source is a directory") { it("should succeed") { @@ -31,7 +34,7 @@ class ParseArgsTest extends FunSpec { List("--source", "path1", "--source", "path2", "--bucket", "bucket") it("should get multiple sources") { val expected = Some(Set("path1", "path2").map(Paths.get(_))) - val configOptions = ParseArgs(args) + val configOptions = invoke(args) val result = configOptions.map(ConfigQuery.sources(_).paths.toSet) assertResult(expected)(result) } @@ -42,7 +45,7 @@ class ParseArgsTest extends FunSpec { def invokeWithArgument(arg: String): ConfigOptions = { val strings = List("--source", pathTo("."), "--bucket", "bucket", arg) .filter(_ != "") - val maybeOptions = ParseArgs(strings) + val maybeOptions = invoke(strings) maybeOptions.getOrElse(ConfigOptions()) } @@ -71,4 +74,12 @@ class ParseArgsTest extends FunSpec { .map(_.getCanonicalPath) .getOrElse("[not-found]") + private def invoke(args: List[String]) = + runtime + .unsafeRunSync { + ParseArgs(args) + } + .toEither + .toOption + } diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala index 7e2834b..921dd30 100644 --- a/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala @@ -3,16 +3,19 @@ package net.kemitix.thorp.cli import java.io.File import java.nio.file.Path -import cats.data.EitherT -import cats.effect.IO import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core._ +import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} import org.scalatest.FunSpec +import zio.console.Console +import zio.{DefaultRuntime, Task, TaskR} class ProgramTest extends FunSpec { + private val runtime = new DefaultRuntime {} + val source: File = Resource(this, ".") val sourcePath: Path = source.toPath val bucket: Bucket = Bucket("aBucket") @@ -35,36 +38,46 @@ class ProgramTest extends FunSpec { val archive = TestProgram.thorpArchive it("should be handled in correct order") { val expected = List(copyAction, uploadAction, deleteAction) - TestProgram.run(configOptions).unsafeRunSync + invoke(configOptions) val result = archive.actions assertResult(expected)(result) } } + private def invoke(configOptions: ConfigOptions) = + runtime.unsafeRunSync { + TestProgram.run(configOptions) + }.toEither + trait TestPlanBuilder extends PlanBuilder { - override def createPlan(storageService: StorageService, - hashService: HashService, - configOptions: ConfigOptions)( - implicit l: Logger): EitherT[IO, List[String], SyncPlan] = { - EitherT.right( - IO(SyncPlan(Stream(copyAction, uploadAction, deleteAction)))) + override def createPlan( + storageService: StorageService, + hashService: HashService, + configOptions: ConfigOptions + ): Task[SyncPlan] = { + Task(SyncPlan(Stream(copyAction, uploadAction, deleteAction))) } } class ActionCaptureArchive extends ThorpArchive { var actions: List[Action] = List[Action]() - override def update(index: Int, action: Action, totalBytesSoFar: Long)( - implicit l: Logger): Stream[IO[StorageQueueEvent]] = { + override def update( + index: Int, + action: Action, + totalBytesSoFar: Long + ): TaskR[Console, StorageQueueEvent] = { actions = action :: actions - Stream() + TaskR(DoNothingQueueEvent(RemoteKey(""))) } } object TestProgram extends Program with TestPlanBuilder { val thorpArchive: ActionCaptureArchive = new ActionCaptureArchive - override def thorpArchive(cliOptions: ConfigOptions, - syncPlan: SyncPlan): IO[ThorpArchive] = - IO.pure(thorpArchive) + override def thorpArchive( + cliOptions: ConfigOptions, + syncTotals: SyncTotals + ): Task[ThorpArchive] = + Task(thorpArchive) } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigOptions.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigOptions.scala index 3b7a523..0fdd95f 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigOptions.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigOptions.scala @@ -1,14 +1,13 @@ package net.kemitix.thorp.core -import cats.Semigroup import monocle.Lens import monocle.macros.GenLens case class ConfigOptions( options: List[ConfigOption] = List() -) extends Semigroup[ConfigOptions] { +) { - override def combine( + def combine( x: ConfigOptions, y: ConfigOptions ): ConfigOptions = diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigValidation.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidation.scala index de9ad81..10a3c73 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigValidation.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidation.scala @@ -1,5 +1,7 @@ package net.kemitix.thorp.core +import java.nio.file.Path + sealed trait ConfigValidation { def errorMessage: String @@ -19,4 +21,11 @@ object ConfigValidation { override def errorMessage: String = "Bucket name is missing" } + case class ErrorReadingFile( + path: Path, + message: String + ) extends ConfigValidation { + override def errorMessage: String = s"Error reading file '$path': $message" + } + } diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigValidationException.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidationException.scala new file mode 100644 index 0000000..37e4e20 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidationException.scala @@ -0,0 +1,5 @@ +package net.kemitix.thorp.core + +final case class ConfigValidationException( + errors: List[ConfigValidation] +) extends Exception diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala index a3222ed..0fdb2ce 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala @@ -2,42 +2,57 @@ package net.kemitix.thorp.core import java.nio.file.Path -import cats.data.{NonEmptyChain, Validated, ValidatedNec} -import cats.implicits._ import net.kemitix.thorp.domain.{Bucket, Config, Sources} +import zio.IO sealed trait ConfigValidator { - type ValidationResult[A] = ValidatedNec[ConfigValidation, A] - def validateConfig( - config: Config): Validated[NonEmptyChain[ConfigValidation], Config] = - ( - validateSources(config.sources), - validateBucket(config.bucket) - ).mapN((_, _) => config) + config: Config + ): IO[List[ConfigValidation], Config] = IO.fromEither { + for { + _ <- validateSources(config.sources) + _ <- validateBucket(config.bucket) + } yield config + } - def validateBucket(bucket: Bucket): ValidationResult[Bucket] = - if (bucket.name.isEmpty) ConfigValidation.BucketNameIsMissing.invalidNec - else bucket.validNec + def validateBucket(bucket: Bucket): Either[List[ConfigValidation], Bucket] = + if (bucket.name.isEmpty) Left(List(ConfigValidation.BucketNameIsMissing)) + else Right(bucket) - def validateSources(sources: Sources): ValidationResult[Sources] = - sources.paths - .map(validateSource) - .sequence - .map(_ => sources) + def validateSources( + sources: Sources): Either[List[ConfigValidation], Sources] = + (for { + x <- sources.paths.foldLeft(List[ConfigValidation]()) { + (acc: List[ConfigValidation], path) => + { + validateSource(path) match { + case Left(errors) => acc ++ errors + case Right(_) => acc + } + } + } + } yield x) match { + case Nil => Right(sources) + case errors => Left(errors) + } - def validateSource(source: Path): ValidationResult[Path] = - validateSourceIsDirectory(source) - .andThen(s => validateSourceIsReadable(s)) + def validateSource(source: Path): Either[List[ConfigValidation], Path] = + for { + _ <- validateSourceIsDirectory(source) + _ <- validateSourceIsReadable(source) + } yield source - def validateSourceIsDirectory(source: Path): ValidationResult[Path] = - if (source.toFile.isDirectory) source.validNec - else ConfigValidation.SourceIsNotADirectory.invalidNec + def validateSourceIsDirectory( + source: Path): Either[List[ConfigValidation], Path] = + if (source.toFile.isDirectory) Right(source) + else Left(List(ConfigValidation.SourceIsNotADirectory)) + + def validateSourceIsReadable( + source: Path): Either[List[ConfigValidation], Path] = + if (source.toFile.canRead) Right(source) + else Left(List(ConfigValidation.SourceIsNotReadable)) - def validateSourceIsReadable(source: Path): ValidationResult[Path] = - if (source.toFile.canRead) source.validNec - else ConfigValidation.SourceIsNotReadable.invalidNec } object ConfigValidator extends ConfigValidator diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala index 9d19da3..df50081 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala @@ -2,12 +2,11 @@ package net.kemitix.thorp.core import java.nio.file.Paths -import cats.data.NonEmptyChain -import cats.effect.IO +import net.kemitix.thorp.core.ConfigOptions.options import net.kemitix.thorp.core.ConfigValidator.validateConfig import net.kemitix.thorp.core.ParseConfigFile.parseFile -import net.kemitix.thorp.core.ConfigOptions.options import net.kemitix.thorp.domain.Config +import zio.IO /** * Builds a configuration from settings in a file within the @@ -19,25 +18,31 @@ trait ConfigurationBuilder { private val globalConfig = Paths.get("/etc/thorp.conf") private val userHome = Paths.get(System.getProperty("user.home")) - def buildConfig(priorityOpts: ConfigOptions) - : IO[Either[NonEmptyChain[ConfigValidation], Config]] = { - val sources = ConfigQuery.sources(priorityOpts) + def buildConfig( + priorityOpts: ConfigOptions): IO[List[ConfigValidation], Config] = for { - sourceOpts <- SourceConfigLoader.loadSourceConfigs(sources) + config <- getConfigOptions(priorityOpts).map(collateOptions) + valid <- validateConfig(config) + } yield valid + + private def getConfigOptions( + priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] = + for { + sourceOpts <- SourceConfigLoader.loadSourceConfigs( + ConfigQuery.sources(priorityOpts)) userOpts <- userOptions(priorityOpts ++ sourceOpts) globalOpts <- globalOptions(priorityOpts ++ sourceOpts ++ userOpts) - collected = priorityOpts ++ sourceOpts ++ userOpts ++ globalOpts - config = collateOptions(collected) - } yield validateConfig(config).toEither - } + } yield priorityOpts ++ sourceOpts ++ userOpts ++ globalOpts - private val emptyConfig = IO(ConfigOptions()) + private val emptyConfig = IO.succeed(ConfigOptions()) - private def userOptions(priorityOpts: ConfigOptions) = + private def userOptions( + priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] = if (ConfigQuery.ignoreUserOptions(priorityOpts)) emptyConfig else parseFile(userHome.resolve(userConfigFilename)) - private def globalOptions(priorityOpts: ConfigOptions) = + private def globalOptions( + priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] = if (ConfigQuery.ignoreGlobalOptions(priorityOpts)) emptyConfig else parseFile(globalConfig) diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala index e7facd8..42ceca3 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala @@ -2,53 +2,46 @@ package net.kemitix.thorp.core import java.nio.file.Path -import cats.effect.IO import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.domain import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.HashService +import zio.Task object LocalFileStream { - private val emptyIOLocalFiles = IO.pure(LocalFiles()) - def findFiles( source: Path, hashService: HashService )( - implicit c: Config, - logger: Logger - ): IO[LocalFiles] = { + implicit c: Config + ): Task[LocalFiles] = { val isIncluded: Path => Boolean = Filter.isIncluded(c.filters) - val pathToLocalFile: Path => IO[LocalFiles] = path => - localFile(hashService, logger, c)(path) + val pathToLocalFile: Path => Task[LocalFiles] = path => + localFile(hashService, c)(path) - def loop(path: Path): IO[LocalFiles] = { + def loop(path: Path): Task[LocalFiles] = { - def dirPaths(path: Path) = + def dirPaths(path: Path): Task[Stream[Path]] = listFiles(path) .map(_.filter(isIncluded)) - def recurseIntoSubDirectories(path: Path) = + def recurseIntoSubDirectories(path: Path): Task[LocalFiles] = path.toFile match { case f if f.isDirectory => loop(path) case _ => pathToLocalFile(path) } - def recurse(paths: Stream[Path]) = - paths.foldLeft(emptyIOLocalFiles)( - (acc, path) => - recurseIntoSubDirectories(path) - .flatMap(localFiles => - acc.map(accLocalFiles => accLocalFiles ++ localFiles))) + def recurse(paths: Stream[Path]): Task[LocalFiles] = + Task.foldLeft(paths)(LocalFiles())((acc, path) => { + recurseIntoSubDirectories(path).map(localFiles => acc ++ localFiles) + }) for { - _ <- logger.debug(s"- Entering: $path") paths <- dirPaths(path) localFiles <- recurse(paths) - _ <- logger.debug(s"- Leaving : $path") } yield localFiles } @@ -57,14 +50,13 @@ object LocalFileStream { def localFile( hashService: HashService, - l: Logger, c: Config - ): Path => IO[LocalFiles] = + ): Path => Task[LocalFiles] = path => { val file = path.toFile val source = c.sources.forPath(path) for { - hash <- hashService.hashLocalObject(path)(l) + hash <- hashService.hashLocalObject(path) } yield LocalFiles(localFiles = Stream( domain.LocalFile(file, @@ -75,15 +67,11 @@ object LocalFileStream { totalSizeBytes = file.length) } - //TODO: Change this to return an Either[IllegalArgumentException, Stream[Path]] - private def listFiles(path: Path) = { - IO( - Option(path.toFile.listFiles) - .map { fs => - Stream(fs: _*) - .map(_.toPath) - } - .getOrElse( - throw new IllegalArgumentException(s"Directory not found $path"))) - } + private def listFiles(path: Path): Task[Stream[Path]] = + for { + files <- Task(path.toFile.listFiles) + _ <- Task.when(files == null)( + Task.fail(new IllegalArgumentException(s"Directory not found $path"))) + } yield Stream(files: _*).map(_.toPath) + } diff --git a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala index 76fe67f..3972846 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala @@ -4,8 +4,8 @@ import java.io.{File, FileInputStream} import java.nio.file.Path import java.security.MessageDigest -import cats.effect.IO -import net.kemitix.thorp.domain.{Logger, MD5Hash} +import net.kemitix.thorp.domain.MD5Hash +import zio.Task import scala.collection.immutable.NumericRange @@ -26,21 +26,19 @@ object MD5HashGenerator { md5.digest } - def md5File(path: Path)(implicit logger: Logger): IO[MD5Hash] = + def md5File(path: Path): Task[MD5Hash] = md5FileChunk(path, 0, path.toFile.length) def md5FileChunk( path: Path, offset: Long, size: Long - )(implicit logger: Logger): IO[MD5Hash] = { + ): Task[MD5Hash] = { val file = path.toFile val endOffset = Math.min(offset + size, file.length) for { - _ <- logger.debug(s"md5:reading:size ${file.length}:$path") digest <- readFile(file, offset, endOffset) hash = MD5Hash.fromDigest(digest) - _ <- logger.debug(s"md5:generated:${hash.hash}:$path") } yield hash } @@ -58,20 +56,20 @@ object MD5HashGenerator { private def openFile( file: File, offset: Long - ) = IO { + ) = Task { val stream = new FileInputStream(file) stream skip offset stream } - private def closeFile(fis: FileInputStream) = IO(fis.close()) + private def closeFile(fis: FileInputStream) = Task(fis.close()) private def digestFile( fis: FileInputStream, offset: Long, endOffset: Long ) = - IO { + Task { val md5 = MessageDigest getInstance "MD5" NumericRange(offset, endOffset, maxBufferSize) .foreach(currentOffset => diff --git a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala index fce315e..2d4bd07 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala @@ -2,25 +2,29 @@ package net.kemitix.thorp.core import java.nio.file.{Files, Path} -import cats.effect.IO +import zio.{IO, Task} import scala.collection.JavaConverters._ trait ParseConfigFile { - def parseFile(filename: Path): IO[ConfigOptions] = + def parseFile(filename: Path): IO[List[ConfigValidation], ConfigOptions] = readFile(filename) .map(ParseConfigLines.parseLines) + .catchAll(h => + IO.fail( + List(ConfigValidation.ErrorReadingFile(filename, h.getMessage)))) - private def readFile(filename: Path) = { + private def readFile(filename: Path): Task[List[String]] = { if (Files.exists(filename)) readFileThatExists(filename) - else IO.pure(List()) + else IO(List()) } - private def readFileThatExists(filename: Path) = + private def readFileThatExists(filename: Path): Task[List[String]] = for { lines <- IO(Files.lines(filename)) list = lines.iterator.asScala.toList + //FIXME: use a bracket to close the file _ <- IO(lines.close()) } yield list diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala index 2e8c4aa..7f7796c 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala @@ -1,11 +1,10 @@ package net.kemitix.thorp.core -import cats.data.{EitherT, NonEmptyChain} -import cats.effect.IO -import cats.implicits._ import net.kemitix.thorp.core.Action.DoNothing import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} +import zio.console._ +import zio.{Task, TaskR} trait PlanBuilder { @@ -13,30 +12,29 @@ trait PlanBuilder { storageService: StorageService, hashService: HashService, configOptions: ConfigOptions - )(implicit l: Logger): EitherT[IO, List[String], SyncPlan] = - EitherT(ConfigurationBuilder.buildConfig(configOptions)) - .leftMap(errorMessages) - .flatMap(config => useValidConfig(storageService, hashService)(config, l)) - - def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] = - errors.map(cv => cv.errorMessage).toList + ): TaskR[Console, SyncPlan] = + ConfigurationBuilder + .buildConfig(configOptions) + .catchAll(errors => TaskR.fail(ConfigValidationException(errors))) + .flatMap(config => useValidConfig(storageService, hashService)(config)) def useValidConfig( storageService: StorageService, hashService: HashService - )(implicit c: Config, l: Logger): EitherT[IO, List[String], SyncPlan] = + )(implicit c: Config): TaskR[Console, SyncPlan] = { for { - _ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.sources)) + _ <- SyncLogging.logRunStart(c.bucket, c.prefix, c.sources) actions <- buildPlan(storageService, hashService) } yield actions + } private def buildPlan( storageService: StorageService, hashService: HashService - )(implicit c: Config, l: Logger) = - gatherMetadata(storageService, hashService) - .leftMap(List(_)) - .map(assemblePlan) + )(implicit c: Config): TaskR[Console, SyncPlan] = + for { + metadata <- gatherMetadata(storageService, hashService) + } yield assemblePlan(c)(metadata) def assemblePlan( implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = { @@ -91,21 +89,20 @@ trait PlanBuilder { private def gatherMetadata( storageService: StorageService, hashService: HashService - )(implicit l: Logger, - c: Config): EitherT[IO, String, (S3ObjectsData, LocalFiles)] = + )(implicit c: Config): TaskR[Console, (S3ObjectsData, LocalFiles)] = for { remoteData <- fetchRemoteData(storageService) - localData <- EitherT.liftF(findLocalFiles(hashService)) + localData <- findLocalFiles(hashService) } yield (remoteData, localData) private def fetchRemoteData( storageService: StorageService - )(implicit c: Config, l: Logger) = + )(implicit c: Config): TaskR[Console, S3ObjectsData] = storageService.listObjects(c.bucket, c.prefix) private def findLocalFiles( hashService: HashService - )(implicit config: Config, l: Logger) = + )(implicit config: Config): TaskR[Console, LocalFiles] = for { _ <- SyncLogging.logFileScan localFiles <- findFiles(hashService) @@ -113,19 +110,10 @@ trait PlanBuilder { private def findFiles( hashService: HashService - )(implicit c: Config, l: Logger) = { - val ioListLocalFiles = (for { - source <- c.sources.paths - } yield LocalFileStream.findFiles(source, hashService)).sequence - for { - listLocalFiles <- ioListLocalFiles - localFiles = listLocalFiles.foldRight(LocalFiles()) { - (acc, moreLocalFiles) => - { - acc ++ moreLocalFiles - } - } - } yield localFiles + )(implicit c: Config): Task[LocalFiles] = { + Task + .foreach(c.sources.paths)(LocalFileStream.findFiles(_, hashService)) + .map(_.foldLeft(LocalFiles())((acc, localFile) => acc ++ localFile)) } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala b/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala index bc54aa6..6991af6 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala @@ -2,15 +2,15 @@ package net.kemitix.thorp.core import java.nio.file.Path -import cats.effect.IO -import net.kemitix.thorp.domain.{Logger, MD5Hash} +import net.kemitix.thorp.domain.MD5Hash import net.kemitix.thorp.storage.api.HashService +import zio.Task case class SimpleHashService() extends HashService { override def hashLocalObject( path: Path - )(implicit l: Logger): IO[Map[String, MD5Hash]] = + ): Task[Map[String, MD5Hash]] = for { md5 <- MD5HashGenerator.md5File(path) } yield Map("md5" -> md5) diff --git a/core/src/main/scala/net/kemitix/thorp/core/SourceConfigLoader.scala b/core/src/main/scala/net/kemitix/thorp/core/SourceConfigLoader.scala index 2b9daa6..2669802 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SourceConfigLoader.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SourceConfigLoader.scala @@ -1,25 +1,26 @@ package net.kemitix.thorp.core -import cats.effect.IO -import cats.implicits._ import net.kemitix.thorp.domain.Sources +import zio.IO trait SourceConfigLoader { val thorpConfigFileName = ".thorp.conf" - def loadSourceConfigs: Sources => IO[ConfigOptions] = + def loadSourceConfigs: Sources => IO[List[ConfigValidation], ConfigOptions] = sources => { val sourceConfigOptions = - ConfigOptions(sources.paths.map(ConfigOption.Source(_))) + ConfigOptions(sources.paths.map(ConfigOption.Source)) val reduce: List[ConfigOptions] => ConfigOptions = - _.foldLeft(sourceConfigOptions) { (acc, co) => acc ++ co } + _.foldLeft(sourceConfigOptions) { (acc, co) => + acc ++ co + } - sources.paths - .map(_.resolve(thorpConfigFileName)) - .map(ParseConfigFile.parseFile).sequence + IO.foreach(sources.paths) { path => + ParseConfigFile.parseFile(path.resolve(thorpConfigFileName)) + } .map(reduce) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index d86c651..a4d6c32 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -1,7 +1,5 @@ package net.kemitix.thorp.core -import cats.effect.IO -import cats.implicits._ import net.kemitix.thorp.domain.StorageQueueEvent.{ CopyQueueEvent, DeleteQueueEvent, @@ -9,6 +7,8 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{ UploadQueueEvent } import net.kemitix.thorp.domain._ +import zio.ZIO +import zio.console._ trait SyncLogging { @@ -16,40 +16,44 @@ trait SyncLogging { bucket: Bucket, prefix: RemoteKey, sources: Sources - )(implicit logger: Logger): IO[Unit] = { + ): ZIO[Console, Nothing, Unit] = { val sourcesList = sources.paths.mkString(", ") - logger.info( - List(s"Bucket: ${bucket.name}", - s"Prefix: ${prefix.key}", - s"Source: $sourcesList") - .mkString(", ")) + for { + _ <- putStrLn( + List(s"Bucket: ${bucket.name}", + s"Prefix: ${prefix.key}", + s"Source: $sourcesList") + .mkString(", ")) + } yield () } - def logFileScan(implicit c: Config, logger: Logger): IO[Unit] = - logger.info(s"Scanning local files: ${c.sources.paths.mkString(", ")}...") + def logFileScan(implicit c: Config): ZIO[Console, Nothing, Unit] = + putStrLn(s"Scanning local files: ${c.sources.paths.mkString(", ")}...") def logRunFinished( actions: Stream[StorageQueueEvent] - )(implicit logger: Logger): IO[Unit] = { + ): ZIO[Console, Nothing, Unit] = { val counters = actions.foldLeft(Counters())(countActivities) for { - _ <- logger.info(s"Uploaded ${counters.uploaded} files") - _ <- logger.info(s"Copied ${counters.copied} files") - _ <- logger.info(s"Deleted ${counters.deleted} files") - _ <- logger.info(s"Errors ${counters.errors}") + _ <- putStrLn(s"Uploaded ${counters.uploaded} files") + _ <- putStrLn(s"Copied ${counters.copied} files") + _ <- putStrLn(s"Deleted ${counters.deleted} files") + _ <- putStrLn(s"Errors ${counters.errors}") _ <- logErrors(actions) } yield () } def logErrors( actions: Stream[StorageQueueEvent] - )(implicit logger: Logger): IO[Unit] = - for { - _ <- actions.map { - case ErrorQueueEvent(k, e) => logger.warn(s"${k.key}: ${e.getMessage}") - case _ => IO.unit - }.sequence - } yield () + ): ZIO[Console, Nothing, Unit] = { + ZIO.foldLeft(actions)(()) { (_, action) => + action match { + case ErrorQueueEvent(k, e) => + putStrLn(s"${k.key}: ${e.getMessage}") + case _ => ZIO.unit + } + } + } private def countActivities: (Counters, StorageQueueEvent) => Counters = (counters: Counters, s3Action: StorageQueueEvent) => { diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala index 0382aff..297dce3 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -1,7 +1,8 @@ package net.kemitix.thorp.core -import cats.effect.IO -import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent} +import net.kemitix.thorp.domain.{LocalFile, StorageQueueEvent} +import zio.TaskR +import zio.console._ trait ThorpArchive { @@ -9,13 +10,15 @@ trait ThorpArchive { index: Int, action: Action, totalBytesSoFar: Long - )(implicit l: Logger): Stream[IO[StorageQueueEvent]] + ): TaskR[Console, StorageQueueEvent] def logFileUploaded( localFile: LocalFile, batchMode: Boolean - )(implicit l: Logger): IO[Unit] = - if (batchMode) l.info(s"Uploaded: ${localFile.remoteKey.key}") - else IO.unit + ): TaskR[Console, Unit] = + for { + _ <- TaskR.when(batchMode)( + putStrLn(s"Uploaded: ${localFile.remoteKey.key}")) + } yield () } diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index d353aa6..0a1dd60 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -1,10 +1,11 @@ package net.kemitix.thorp.core -import cats.effect.IO import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.StorageService +import zio.console.Console +import zio.{Task, TaskR} case class UnversionedMirrorArchive( storageService: StorageService, @@ -16,8 +17,8 @@ case class UnversionedMirrorArchive( index: Int, action: Action, totalBytesSoFar: Long - )(implicit l: Logger): Stream[IO[StorageQueueEvent]] = - Stream(action match { + ): TaskR[Console, StorageQueueEvent] = + action match { case ToUpload(bucket, localFile, _) => for { event <- doUpload(index, totalBytesSoFar, bucket, localFile) @@ -32,8 +33,8 @@ case class UnversionedMirrorArchive( event <- storageService.delete(bucket, remoteKey) } yield event case DoNothing(_, remoteKey, _) => - IO.pure(DoNothingQueueEvent(remoteKey)) - }) + Task(DoNothingQueueEvent(remoteKey)) + } private def doUpload( index: Int, diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala index 997c845..73f5476 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala @@ -2,6 +2,7 @@ package net.kemitix.thorp.core import net.kemitix.thorp.domain.Sources import org.scalatest.FunSpec +import zio.DefaultRuntime class ConfigOptionTest extends FunSpec with TemporaryFolder { @@ -27,6 +28,9 @@ class ConfigOptionTest extends FunSpec with TemporaryFolder { } private def invoke(configOptions: ConfigOptions) = { - ConfigurationBuilder.buildConfig(configOptions).unsafeRunSync + val runtime = new DefaultRuntime {} + runtime.unsafeRunSync { + ConfigurationBuilder.buildConfig(configOptions) + }.toEither } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala index fb38050..df75c5f 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala @@ -5,6 +5,7 @@ import java.nio.file.{Path, Paths} import net.kemitix.thorp.domain.Filter.{Exclude, Include} import net.kemitix.thorp.domain._ import org.scalatest.FunSpec +import zio.DefaultRuntime class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { @@ -23,8 +24,8 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { describe("when no source") { it("should use the current (PWD) directory") { val expected = Right(Sources(List(pwd))) - val options = configOptions(coBucket) - val result = invoke(options).map(_.sources) + val options = configOptions(coBucket) + val result = invoke(options).map(_.sources) assertResult(expected)(result) } } @@ -32,11 +33,12 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { describe("with .thorp.conf") { describe("with settings") { withDirectory(source => { - val configFileName = createFile(source, thorpConfigFileName, - "bucket = a-bucket", - "prefix = a-prefix", - "include = an-inclusion", - "exclude = an-exclusion") + val configFileName = createFile(source, + thorpConfigFileName, + "bucket = a-bucket", + "prefix = a-prefix", + "include = an-inclusion", + "exclude = an-exclusion") val result = invoke(configOptions(ConfigOption.Source(source))) it("should have bucket") { val expected = Right(Bucket("a-bucket")) @@ -47,7 +49,8 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { assertResult(expected)(result.map(_.prefix)) } it("should have filters") { - val expected = Right(List(Exclude("an-exclusion"), Include("an-inclusion"))) + val expected = + Right(List(Exclude("an-exclusion"), Include("an-inclusion"))) assertResult(expected)(result.map(_.filters)) } }) @@ -120,9 +123,9 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { // should have prefix from current only val expectedPrefixes = Right(RemoteKey("current-prefix")) // should have filters from both sources - val expectedFilters = Right(List( - Filter.Exclude("current-exclude"), - Filter.Include("current-include"))) + val expectedFilters = Right( + List(Filter.Exclude("current-exclude"), + Filter.Include("current-include"))) val options = configOptions(ConfigOption.Source(currentSource)) val result = invoke(options) assertResult(expectedSources)(result.map(_.sources)) @@ -135,7 +138,8 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { } } - describe("when source has thorp.config source to another source that does the same") { + describe( + "when source has thorp.config source to another source that does the same") { it("should only include first two sources") { withDirectory(currentSource => { withDirectory(parentSource => { @@ -143,11 +147,12 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { thorpConfigFileName, s"source = $parentSource") withDirectory(grandParentSource => { - writeFile(parentSource, thorpConfigFileName, s"source = $grandParentSource") + writeFile(parentSource, + thorpConfigFileName, + s"source = $grandParentSource") val expected = Right(List(currentSource, parentSource)) - val options = configOptions( - ConfigOption.Source(currentSource), - coBucket) + val options = + configOptions(ConfigOption.Source(currentSource), coBucket) val result = invoke(options).map(_.sources.paths) assertResult(expected)(result) }) @@ -156,7 +161,11 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { } } - private def invoke(configOptions: ConfigOptions) = - ConfigurationBuilder.buildConfig(configOptions).unsafeRunSync + private def invoke(configOptions: ConfigOptions) = { + val runtime = new DefaultRuntime {} + runtime.unsafeRunSync { + ConfigurationBuilder.buildConfig(configOptions) + }.toEither + } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala index 2663b89..31b7941 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala @@ -2,15 +2,14 @@ package net.kemitix.thorp.core import java.nio.file.Path -import cats.effect.IO -import net.kemitix.thorp.domain.{Logger, MD5Hash} +import net.kemitix.thorp.domain.MD5Hash import net.kemitix.thorp.storage.api.HashService +import zio.Task case class DummyHashService(hashes: Map[Path, Map[String, MD5Hash]]) extends HashService { - override def hashLocalObject(path: Path)( - implicit l: Logger): IO[Map[String, MD5Hash]] = - IO.pure(hashes(path)) + override def hashLocalObject(path: Path): Task[Map[String, MD5Hash]] = + Task(hashes(path)) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyLogger.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyLogger.scala deleted file mode 100644 index 4b2329c..0000000 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyLogger.scala +++ /dev/null @@ -1,18 +0,0 @@ -package net.kemitix.thorp.core - -import cats.effect.IO -import net.kemitix.thorp.domain.Logger - -class DummyLogger extends Logger { - - override def debug(message: => String): IO[Unit] = IO.unit - - override def info(message: => String): IO[Unit] = IO.unit - - override def warn(message: String): IO[Unit] = IO.unit - - override def error(message: String): IO[Unit] = IO.unit - - override def withDebug(debug: Boolean): Logger = this - -} diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala index b44a49c..79b313b 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala @@ -2,39 +2,39 @@ package net.kemitix.thorp.core import java.io.File -import cats.data.EitherT -import cats.effect.IO import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.StorageService +import zio.console.Console +import zio.{Task, TaskR} case class DummyStorageService(s3ObjectData: S3ObjectsData, uploadFiles: Map[File, (RemoteKey, MD5Hash)]) extends StorageService { - override def shutdown: IO[StorageQueueEvent] = - IO.pure(StorageQueueEvent.ShutdownQueueEvent()) + override def shutdown: Task[StorageQueueEvent] = + Task(StorageQueueEvent.ShutdownQueueEvent()) - override def listObjects(bucket: Bucket, prefix: RemoteKey)( - implicit l: Logger): EitherT[IO, String, S3ObjectsData] = - EitherT.liftF(IO.pure(s3ObjectData)) + override def listObjects(bucket: Bucket, + prefix: RemoteKey): TaskR[Console, S3ObjectsData] = + TaskR(s3ObjectData) override def upload(localFile: LocalFile, bucket: Bucket, batchMode: Boolean, uploadEventListener: UploadEventListener, - tryCount: Int): IO[StorageQueueEvent] = { + tryCount: Int): Task[StorageQueueEvent] = { val (remoteKey, md5Hash) = uploadFiles(localFile.file) - IO.pure(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash)) + Task(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash)) } override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, - targetKey: RemoteKey): IO[StorageQueueEvent] = - IO.pure(StorageQueueEvent.CopyQueueEvent(targetKey)) + targetKey: RemoteKey): Task[StorageQueueEvent] = + Task(StorageQueueEvent.CopyQueueEvent(targetKey)) override def delete(bucket: Bucket, - remoteKey: RemoteKey): IO[StorageQueueEvent] = - IO.pure(StorageQueueEvent.DeleteQueueEvent(remoteKey)) + remoteKey: RemoteKey): Task[StorageQueueEvent] = + Task(StorageQueueEvent.DeleteQueueEvent(remoteKey)) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala index b8f0a34..9388a01 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala @@ -5,6 +5,7 @@ import java.nio.file.Paths import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.HashService import org.scalatest.FunSpec +import zio.DefaultRuntime class LocalFileStreamSuite extends FunSpec { @@ -21,28 +22,34 @@ class LocalFileStreamSuite extends FunSpec { implicit private val config: Config = Config( sources = Sources(List(sourcePath))) - implicit private val logger: Logger = new DummyLogger describe("findFiles") { it("should find all files") { - val result: Set[String] = - invoke.localFiles.toSet - .map { x: LocalFile => - x.relative.toString - } - assertResult(Set("subdir/leaf-file", "root-file"))(result) + val expected = Right(Set("subdir/leaf-file", "root-file")) + val result = + invoke() + .map(_.localFiles) + .map(localFiles => localFiles.map(_.relative.toString)) + .map(_.toSet) + assertResult(expected)(result) } it("should count all files") { - val result = invoke.count - assertResult(2)(result) + val expected = Right(2) + val result = invoke().map(_.count) + assertResult(expected)(result) } it("should sum the size of all files") { - val result = invoke.totalSizeBytes - assertResult(113)(result) + val expected = Right(113) + val result = invoke().map(_.totalSizeBytes) + assertResult(expected)(result) } } - private def invoke = - LocalFileStream.findFiles(sourcePath, hashService).unsafeRunSync + private def invoke() = { + val runtime = new DefaultRuntime {} + runtime.unsafeRunSync { + LocalFileStream.findFiles(sourcePath, hashService) + }.toEither + } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala index 1d43637..e6a091e 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala @@ -1,50 +1,69 @@ package net.kemitix.thorp.core -import net.kemitix.thorp.domain.MD5HashData.Root +import java.nio.file.Path + +import net.kemitix.thorp.domain.MD5HashData.{BigFile, Root} import net.kemitix.thorp.domain._ import org.scalatest.FunSpec +import zio.DefaultRuntime class MD5HashGeneratorTest extends FunSpec { + private val runtime = new DefaultRuntime {} + private val source = Resource(this, "upload") private val sourcePath = source.toPath private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) - implicit private val logger: Logger = new DummyLogger - describe("read a small file (smaller than buffer)") { - val path = Resource(this, "upload/root-file").toPath - it("should generate the correct hash") { - val result = MD5HashGenerator.md5File(path).unsafeRunSync - assertResult(Root.hash)(result) + describe("md5File()") { + describe("read a small file (smaller than buffer)") { + val path = Resource(this, "upload/root-file").toPath + it("should generate the correct hash") { + val expected = Right(Root.hash) + val result = invoke(path) + assertResult(expected)(result) + } + } + + describe("read a large file (bigger than buffer)") { + val path = Resource(this, "big-file").toPath + it("should generate the correct hash") { + val expected = Right(BigFile.hash) + val result = invoke(path) + assertResult(expected)(result) + } + } + + def invoke(path: Path) = { + runtime.unsafeRunSync { + MD5HashGenerator.md5File(path) + }.toEither } } - describe("read a large file (bigger than buffer)") { - val path = Resource(this, "big-file").toPath - it("should generate the correct hash") { - val expected = MD5HashData.BigFile.hash - val result = MD5HashGenerator.md5File(path).unsafeRunSync - assertResult(expected)(result) + + describe("md5FileChunk") { + describe("read chunks of file") { + val path = Resource(this, "big-file").toPath + it("should generate the correct hash for first chunk of the file") { + val part1 = BigFile.Part1 + val expected = Right(part1.hash.hash) + val result = invoke(path, part1.offset, part1.size).map(_.hash) + assertResult(expected)(result) + } + it("should generate the correct hash for second chunk of the file") { + val part2 = BigFile.Part2 + val expected = Right(part2.hash.hash) + val result = invoke(path, part2.offset, part2.size).map(_.hash) + assertResult(expected)(result) + } } - } - describe("read chunks of file") { - val path = Resource(this, "big-file").toPath - it("should generate the correct hash for first chunk of the file") { - val part1 = MD5HashData.BigFile.Part1 - val expected = part1.hash - val result = MD5HashGenerator - .md5FileChunk(path, part1.offset, part1.size) - .unsafeRunSync - assertResult(expected)(result) - } - it("should generate the correcy hash for second chunk of the file") { - val part2 = MD5HashData.BigFile.Part2 - val expected = part2.hash - val result = MD5HashGenerator - .md5FileChunk(path, part2.offset, part2.size) - .unsafeRunSync - assertResult(expected)(result) + + def invoke(path: Path, offset: Long, size: Long) = { + runtime.unsafeRunSync { + MD5HashGenerator.md5FileChunk(path, offset, size) + }.toEither } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala index e78d10b..e3cb0df 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala @@ -3,13 +3,11 @@ package net.kemitix.thorp.core import java.nio.file.{Path, Paths} import org.scalatest.FunSpec +import zio.DefaultRuntime class ParseConfigFileTest extends FunSpec { - private val empty = ConfigOptions() - - private def invoke(filename: Path) = - ParseConfigFile.parseFile(filename).unsafeRunSync + private val empty = Right(ConfigOptions()) describe("parse a missing file") { val filename = Paths.get("/path/to/missing/file") @@ -31,11 +29,18 @@ class ParseConfigFileTest extends FunSpec { } describe("parse a file with properties") { val filename = Resource(this, "simple-config").toPath - val expected = ConfigOptions( - List(ConfigOption.Source(Paths.get("/path/to/source")), - ConfigOption.Bucket("bucket-name"))) + val expected = Right( + ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source")), + ConfigOption.Bucket("bucket-name")))) it("should return some options") { assertResult(expected)(invoke(filename)) } } + + private def invoke(filename: Path) = { + val runtime = new DefaultRuntime {} + runtime.unsafeRunSync { + ParseConfigFile.parseFile(filename) + }.toEither + } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala index 724deba..05fa48c 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala @@ -7,12 +7,15 @@ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} import org.scalatest.FreeSpec +import zio.DefaultRuntime class PlanBuilderTest extends FreeSpec with TemporaryFolder { - val lastModified: LastModified = LastModified() - private val planBuilder = new PlanBuilder {} - private implicit val logger: Logger = new DummyLogger - private val emptyS3ObjectData = S3ObjectsData() + + private val runtime = new DefaultRuntime {} + + private val lastModified: LastModified = LastModified() + private val planBuilder = new PlanBuilder {} + private val emptyS3ObjectData = S3ObjectsData() "create a plan" - { @@ -426,7 +429,14 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { } def md5Hash(file: File) = { - hashService.hashLocalObject(file.toPath).unsafeRunSync()("md5") + runtime + .unsafeRunSync { + hashService.hashLocalObject(file.toPath).map(_.get("md5")) + } + .toEither + .toOption + .flatten + .getOrElse(MD5Hash("invalid md5 hash in test")) } } @@ -450,14 +460,17 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { private def configOptions(configOptions: ConfigOption*): ConfigOptions = ConfigOptions(List(configOptions: _*)) - private def invoke(storageService: StorageService, - hashService: HashService, - configOptions: ConfigOptions) - : Either[List[String], List[(String, String, String, String, String)]] = - planBuilder - .createPlan(storageService, hashService, configOptions) - .value - .unsafeRunSync() + private def invoke( + storageService: StorageService, + hashService: HashService, + configOptions: ConfigOptions + ): Either[Any, List[(String, String, String, String, String)]] = + runtime + .unsafeRunSync { + planBuilder + .createPlan(storageService, hashService, configOptions) + } + .toEither .map(_.actions.toList.map({ case ToUpload(_, lf, _) => ("upload", diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index c1c0ffc..dce6d89 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -4,8 +4,6 @@ import java.io.File import java.nio.file.Paths import java.time.Instant -import cats.data.EitherT -import cats.effect.IO import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root} import net.kemitix.thorp.domain.StorageQueueEvent.{ @@ -17,8 +15,13 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} import org.scalatest.FunSpec +import zio.console.Console +import zio.{DefaultRuntime, Task, TaskR} class SyncSuite extends FunSpec { + + private val runtime = new DefaultRuntime {} + private val testBucket = Bucket("bucket") private val source = Resource(this, "upload") private val sourcePath = source.toPath @@ -30,7 +33,6 @@ class SyncSuite extends FunSpec { md5HashMap(Root.hash), sourcePath, _ => rootRemoteKey) - implicit private val logger: Logger = new DummyLogger private val leafFile: LocalFile = LocalFile.resolve("subdir/leaf-file", md5HashMap(Leaf.hash), @@ -58,24 +60,6 @@ class SyncSuite extends FunSpec { localFile: LocalFile): (String, String, File) = (bucket.name, remoteKey.key, localFile.file) - def invokeSubjectForActions( - storageService: StorageService, - hashService: HashService, - configOptions: ConfigOptions): Either[List[String], Stream[Action]] = { - invokeSubject(storageService, hashService, configOptions) - .map(_.actions) - } - - def invokeSubject( - storageService: StorageService, - hashService: HashService, - configOptions: ConfigOptions): Either[List[String], SyncPlan] = { - PlanBuilder - .createPlan(storageService, hashService, configOptions) - .value - .unsafeRunSync - } - private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] = Map("md5" -> md5Hash) @@ -211,28 +195,45 @@ class SyncSuite extends FunSpec { s3ObjectsData: S3ObjectsData) extends StorageService { - override def listObjects(bucket: Bucket, prefix: RemoteKey)( - implicit l: Logger): EitherT[IO, String, S3ObjectsData] = - EitherT.liftF(IO.pure(s3ObjectsData)) + override def listObjects(bucket: Bucket, + prefix: RemoteKey): TaskR[Console, S3ObjectsData] = + TaskR(s3ObjectsData) override def upload(localFile: LocalFile, bucket: Bucket, batchMode: Boolean, uploadEventListener: UploadEventListener, - tryCount: Int): IO[UploadQueueEvent] = - IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hashes("md5"))) + tryCount: Int): Task[UploadQueueEvent] = + Task(UploadQueueEvent(localFile.remoteKey, localFile.hashes("md5"))) override def copy(bucket: Bucket, sourceKey: RemoteKey, hashes: MD5Hash, - targetKey: RemoteKey): IO[CopyQueueEvent] = - IO.pure(CopyQueueEvent(targetKey)) + targetKey: RemoteKey): Task[CopyQueueEvent] = + Task(CopyQueueEvent(targetKey)) override def delete(bucket: Bucket, - remoteKey: RemoteKey): IO[DeleteQueueEvent] = - IO.pure(DeleteQueueEvent(remoteKey)) + remoteKey: RemoteKey): Task[DeleteQueueEvent] = + Task(DeleteQueueEvent(remoteKey)) - override def shutdown: IO[StorageQueueEvent] = - IO.pure(ShutdownQueueEvent()) + override def shutdown: Task[StorageQueueEvent] = + Task(ShutdownQueueEvent()) + } + + def invokeSubjectForActions( + storageService: StorageService, + hashService: HashService, + configOptions: ConfigOptions): Either[Any, Stream[Action]] = { + invoke(storageService, hashService, configOptions) + .map(_.actions) + } + + def invoke(storageService: StorageService, + hashService: HashService, + configOptions: ConfigOptions): Either[Any, SyncPlan] = { + runtime.unsafeRunSync { + PlanBuilder + .createPlan(storageService, hashService, configOptions) + }.toEither } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Logger.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Logger.scala index 6e13b78..8862d74 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Logger.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Logger.scala @@ -1,7 +1,5 @@ package net.kemitix.thorp.domain -import cats.effect.IO - trait Logger { // returns an instance of Logger with debug set as indicated @@ -9,9 +7,9 @@ trait Logger { // it returns itself, unmodified def withDebug(debug: Boolean): Logger - def debug(message: => String): IO[Unit] - def info(message: => String): IO[Unit] - def warn(message: String): IO[Unit] - def error(message: String): IO[Unit] + def debug(message: => String): Unit + def info(message: => String): Unit + def warn(message: String): Unit + def error(message: String): Unit } diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala index a7325b0..6b2c022 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala @@ -2,14 +2,14 @@ package net.kemitix.thorp.storage.api import java.nio.file.Path -import cats.effect.IO -import net.kemitix.thorp.domain.{Logger, MD5Hash} +import net.kemitix.thorp.domain.MD5Hash +import zio.Task /** * Creates one, or more, hashes for local objects. */ trait HashService { - def hashLocalObject(path: Path)(implicit l: Logger): IO[Map[String, MD5Hash]] + def hashLocalObject(path: Path): Task[Map[String, MD5Hash]] } diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala index 92d5b2e..0e38d45 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala @@ -1,17 +1,17 @@ package net.kemitix.thorp.storage.api -import cats.data.EitherT -import cats.effect.IO import net.kemitix.thorp.domain._ +import zio.console.Console +import zio.{Task, TaskR} trait StorageService { - def shutdown: IO[StorageQueueEvent] + def shutdown: Task[StorageQueueEvent] def listObjects( bucket: Bucket, prefix: RemoteKey - )(implicit l: Logger): EitherT[IO, String, S3ObjectsData] + ): TaskR[Console, S3ObjectsData] def upload( localFile: LocalFile, @@ -19,18 +19,18 @@ trait StorageService { batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int - ): IO[StorageQueueEvent] + ): Task[StorageQueueEvent] def copy( bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ): IO[StorageQueueEvent] + ): Task[StorageQueueEvent] def delete( bucket: Bucket, remoteKey: RemoteKey - ): IO[StorageQueueEvent] + ): Task[StorageQueueEvent] } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala index 55bef98..61f8b51 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala @@ -1,10 +1,10 @@ package net.kemitix.thorp.storage.aws -import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.CopyObjectRequest import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent import net.kemitix.thorp.domain._ +import zio.Task class Copier(amazonS3: AmazonS3) { @@ -13,7 +13,7 @@ class Copier(amazonS3: AmazonS3) { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ): IO[StorageQueueEvent] = + ): Task[StorageQueueEvent] = for { _ <- copyObject(bucket, sourceKey, hash, targetKey) } yield CopyQueueEvent(targetKey) @@ -31,7 +31,7 @@ class Copier(amazonS3: AmazonS3) { bucket.name, targetKey.key ).withMatchingETagConstraint(hash.hash) - IO(amazonS3.copyObject(request)) + Task(amazonS3.copyObject(request)) } } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala index 05ff619..8a1f84e 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala @@ -1,17 +1,17 @@ package net.kemitix.thorp.storage.aws -import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.DeleteObjectRequest import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent} +import zio.Task class Deleter(amazonS3: AmazonS3) { def delete( bucket: Bucket, remoteKey: RemoteKey - ): IO[StorageQueueEvent] = + ): Task[StorageQueueEvent] = for { _ <- deleteObject(bucket, remoteKey) } yield DeleteQueueEvent(remoteKey) @@ -21,7 +21,7 @@ class Deleter(amazonS3: AmazonS3) { remoteKey: RemoteKey ) = { val request = new DeleteObjectRequest(bucket.name, remoteKey.key) - IO(amazonS3.deleteObject(request)) + Task(amazonS3.deleteObject(request)) } } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala index e0a68a3..69f9fe9 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala @@ -2,25 +2,25 @@ package net.kemitix.thorp.storage.aws import java.nio.file.Path -import cats.effect.IO -import cats.implicits._ import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils import net.kemitix.thorp.core.MD5HashGenerator -import net.kemitix.thorp.domain.{Logger, MD5Hash} +import net.kemitix.thorp.domain.MD5Hash +import zio.Task trait ETagGenerator { def eTag( path: Path - )(implicit l: Logger): IO[String] = { + ): Task[String] = { val partSize = calculatePartSize(path) val parts = numParts(path.toFile.length, partSize) - partsIndex(parts) - .map(digestChunk(path, partSize)) - .sequence - .map(concatenateDigests) + Task + .foreach(partsIndex(parts)) { chunkNumber => + digestChunk(path, partSize)(chunkNumber) + } + .map(parts => concatenateDigests(parts)) .map(MD5HashGenerator.hex) .map(hash => s"$hash-$parts") } @@ -53,14 +53,14 @@ trait ETagGenerator { chunkSize: Long )( chunkNumber: Long - )(implicit l: Logger): IO[Array[Byte]] = + ): Task[Array[Byte]] = hashChunk(path, chunkNumber, chunkSize).map(_.digest) def hashChunk( path: Path, chunkNumber: Long, chunkSize: Long - )(implicit l: Logger): IO[MD5Hash] = + ): Task[MD5Hash] = MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize) def offsets( diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala index f508d7e..772a5c2 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala @@ -1,17 +1,15 @@ package net.kemitix.thorp.storage.aws -import cats.data.EitherT -import cats.effect.IO -import cats.implicits._ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.thorp.domain -import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData} +import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData} import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey +import zio.console.Console +import zio.{IO, Task, TaskR, ZIO} import scala.collection.JavaConverters._ -import scala.util.Try class Lister(amazonS3: AmazonS3) { @@ -21,7 +19,7 @@ class Lister(amazonS3: AmazonS3) { def listObjects( bucket: Bucket, prefix: RemoteKey - )(implicit l: Logger): EitherT[IO, String, S3ObjectsData] = { + ): TaskR[Console, S3ObjectsData] = { val requestMore = (token: Token) => new ListObjectsV2Request() @@ -29,26 +27,23 @@ class Lister(amazonS3: AmazonS3) { .withPrefix(prefix.key) .withContinuationToken(token) - def fetchBatch: ListObjectsV2Request => EitherT[IO, String, Batch] = + def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] = request => - EitherT { - for { - _ <- ListerLogger.logFetchBatch - batch <- tryFetchBatch(request) - } yield batch - } + for { + _ <- ListerLogger.logFetchBatch + batch <- tryFetchBatch(request) + } yield batch def fetchMore( more: Option[Token] - ): EitherT[IO, String, Stream[S3ObjectSummary]] = { + ): TaskR[Console, Stream[S3ObjectSummary]] = { more match { - case None => EitherT.right(IO.pure(Stream.empty)) + case None => ZIO.succeed(Stream.empty) case Some(token) => fetch(requestMore(token)) } } - def fetch - : ListObjectsV2Request => EitherT[IO, String, Stream[S3ObjectSummary]] = + def fetch: ListObjectsV2Request => TaskR[Console, Stream[S3ObjectSummary]] = request => { for { batch <- fetchBatch(request) @@ -67,17 +62,12 @@ class Lister(amazonS3: AmazonS3) { private def tryFetchBatch( request: ListObjectsV2Request - ): IO[Either[String, (Stream[S3ObjectSummary], Option[Token])]] = { - IO { - Try(amazonS3.listObjectsV2(request)) - .map { result => - val more: Option[Token] = - if (result.isTruncated) Some(result.getNextContinuationToken) - else None - (result.getObjectSummaries.asScala.toStream, more) - } - .toEither - .leftMap(e => e.getMessage) - } - } + ): Task[(Stream[S3ObjectSummary], Option[Token])] = + IO(amazonS3.listObjectsV2(request)) + .map { result => + val more: Option[Token] = + if (result.isTruncated) Some(result.getNextContinuationToken) + else None + (result.getObjectSummaries.asScala.toStream, more) + } } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala index c07548e..1bd49d5 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala @@ -1,10 +1,10 @@ package net.kemitix.thorp.storage.aws -import cats.effect.IO -import net.kemitix.thorp.domain.Logger +import zio.TaskR +import zio.console._ trait ListerLogger { - def logFetchBatch(implicit l: Logger): IO[Unit] = - l.info("Fetching remote summaries...") + def logFetchBatch: TaskR[Console, Unit] = + putStrLn("Fetching remote summaries...") } object ListerLogger extends ListerLogger diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala index 495fb7b..644ca33 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala @@ -2,10 +2,10 @@ package net.kemitix.thorp.storage.aws import java.nio.file.Path -import cats.effect.IO import net.kemitix.thorp.core.MD5HashGenerator -import net.kemitix.thorp.domain.{Logger, MD5Hash} +import net.kemitix.thorp.domain.MD5Hash import net.kemitix.thorp.storage.api.HashService +import zio.Task trait S3HashService extends HashService { @@ -17,7 +17,7 @@ trait S3HashService extends HashService { */ override def hashLocalObject( path: Path - )(implicit l: Logger): IO[Map[String, MD5Hash]] = + ): Task[Map[String, MD5Hash]] = for { md5 <- MD5HashGenerator.md5File(path) etag <- ETagGenerator.eTag(path).map(MD5Hash(_)) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala index d6e57d7..bdfa15d 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala @@ -1,11 +1,11 @@ package net.kemitix.thorp.storage.aws -import cats.data.EitherT -import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.StorageService +import zio.console.Console +import zio.{Task, TaskR} class S3StorageService( amazonS3Client: => AmazonS3, @@ -20,7 +20,7 @@ class S3StorageService( override def listObjects( bucket: Bucket, prefix: RemoteKey - )(implicit l: Logger): EitherT[IO, String, S3ObjectsData] = + ): TaskR[Console, S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy( @@ -28,7 +28,7 @@ class S3StorageService( sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ): IO[StorageQueueEvent] = + ): Task[StorageQueueEvent] = copier.copy(bucket, sourceKey, hash, targetKey) override def upload( @@ -37,17 +37,17 @@ class S3StorageService( batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int - ): IO[StorageQueueEvent] = + ): Task[StorageQueueEvent] = uploader.upload(localFile, bucket, batchMode, uploadEventListener, 1) override def delete( bucket: Bucket, remoteKey: RemoteKey - ): IO[StorageQueueEvent] = + ): Task[StorageQueueEvent] = deleter.delete(bucket, remoteKey) - override def shutdown: IO[StorageQueueEvent] = - IO { + override def shutdown: Task[StorageQueueEvent] = + Task { amazonTransferManager.shutdownNow(true) amazonS3Client.shutdown() ShutdownQueueEvent() diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index b4f46cb..3d6e84e 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -1,9 +1,7 @@ package net.kemitix.thorp.storage.aws -import cats.effect.IO import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} -import com.amazonaws.services.s3.transfer.model.UploadResult import net.kemitix.thorp.domain.StorageQueueEvent.{ ErrorQueueEvent, UploadQueueEvent @@ -14,8 +12,7 @@ import net.kemitix.thorp.domain.UploadEvent.{ TransferEvent } import net.kemitix.thorp.domain.{StorageQueueEvent, _} - -import scala.util.Try +import zio.Task class Uploader(transferManager: => AmazonTransferManager) { @@ -25,29 +22,24 @@ class Uploader(transferManager: => AmazonTransferManager) { batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int - ): IO[StorageQueueEvent] = + ): Task[StorageQueueEvent] = for { upload <- transfer(localFile, bucket, batchMode, uploadEventListener) - action = upload match { - case Right(r) => - UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag)) - case Left(e) => ErrorQueueEvent(localFile.remoteKey, e) - } - } yield action + } yield upload private def transfer( localFile: LocalFile, bucket: Bucket, batchMode: Boolean, uploadEventListener: UploadEventListener - ): IO[Either[Throwable, UploadResult]] = { + ): Task[StorageQueueEvent] = { val listener: ProgressListener = progressListener(uploadEventListener) val putObjectRequest = request(localFile, bucket, batchMode, listener) - IO { - Try(transferManager.upload(putObjectRequest)) - .map(_.waitForUploadResult) - .toEither - } + Task(transferManager.upload(putObjectRequest)) + .map(_.waitForUploadResult) + .map(upload => + UploadQueueEvent(RemoteKey(upload.getKey), MD5Hash(upload.getETag))) + .catchAll(e => Task.succeed(ErrorQueueEvent(localFile.remoteKey, e))) } private def request( diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DummyLogger.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DummyLogger.scala deleted file mode 100644 index 9600a3f..0000000 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DummyLogger.scala +++ /dev/null @@ -1,18 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import cats.effect.IO -import net.kemitix.thorp.domain.Logger - -class DummyLogger extends Logger { - - override def debug(message: => String): IO[Unit] = IO.unit - - override def info(message: => String): IO[Unit] = IO.unit - - override def warn(message: String): IO[Unit] = IO.unit - - override def error(message: String): IO[Unit] = IO.unit - - override def withDebug(debug: Boolean): Logger = this - -} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala index 96625f0..eb901bc 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala @@ -1,18 +1,21 @@ package net.kemitix.thorp.storage.aws +import java.nio.file.Path + import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import net.kemitix.thorp.core.Resource -import net.kemitix.thorp.domain.MD5Hash import org.scalatest.FunSpec +import zio.DefaultRuntime class ETagGeneratorTest extends FunSpec { + private val runtime = new DefaultRuntime {} + private val bigFile = Resource(this, "big-file") private val bigFilePath = bigFile.toPath private val configuration = new TransferManagerConfiguration private val chunkSize = 1200000 configuration.setMinimumUploadPartSize(chunkSize) - private val logger = new DummyLogger describe("Create offsets") { it("should create offsets") { @@ -25,10 +28,6 @@ class ETagGeneratorTest extends FunSpec { } } - def test(expected: String, result: MD5Hash): Unit = { - assertResult(expected)(result.hash) - } - describe("create md5 hash for each chunk") { it("should create expected hash for chunks") { val md5Hashes = List( @@ -40,20 +39,26 @@ class ETagGeneratorTest extends FunSpec { ).zipWithIndex md5Hashes.foreach { case (hash, index) => - test(hash, - ETagGenerator - .hashChunk(bigFilePath, index, chunkSize)(logger) - .unsafeRunSync) + assertResult(Right(hash))( + invoke(bigFilePath, index, chunkSize).map(_.hash)) } } + def invoke(path: Path, index: Long, size: Long) = + runtime.unsafeRunSync { + ETagGenerator.hashChunk(path, index, size) + }.toEither } describe("create etag for whole file") { val expected = "f14327c90ad105244c446c498bfe9a7d-2" it("should match aws etag for the file") { - val result = ETagGenerator.eTag(bigFilePath)(logger).unsafeRunSync - assertResult(expected)(result) + val result = invoke(bigFilePath) + assertResult(Right(expected))(result) } + def invoke(path: Path) = + runtime.unsafeRunSync { + ETagGenerator.eTag(path) + }.toEither } } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala index 5caa112..f6ff943 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala @@ -14,16 +14,18 @@ import net.kemitix.thorp.core.Resource import net.kemitix.thorp.domain._ import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec +import zio.DefaultRuntime class S3StorageServiceSuite extends FunSpec with MockFactory { + private val runtime = new DefaultRuntime {} + describe("listObjectsInPrefix") { val source = Resource(this, "upload") val sourcePath = source.toPath val prefix = RemoteKey("prefix") implicit val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) - implicit val implLogger: Logger = new DummyLogger val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) @@ -73,12 +75,15 @@ class S3StorageServiceSuite extends FunSpec with MockFactory { k1b -> HashModified(h1, lm), k2 -> HashModified(h2, lm)) )) - val result = storageService - .listObjects(Bucket("bucket"), RemoteKey("prefix")) - .value - .unsafeRunSync + val result = invoke(storageService) assertResult(expected)(result) } } + private def invoke(storageService: S3StorageService) = + runtime.unsafeRunSync { + storageService + .listObjects(Bucket("bucket"), RemoteKey("prefix")) + }.toEither + } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index 0f77280..71764c7 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -20,7 +20,6 @@ class StorageServiceSuite extends FunSpec with MockFactory { private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) - implicit private val implLogger: Logger = new DummyLogger private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _ diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala index ad2f4ad..0f6909c 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala @@ -17,8 +17,7 @@ class UploaderSuite extends FunSpec with MockFactory { private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) - private val fileToKey = generateKey(config.sources, config.prefix) _ - implicit private val implLogger: Logger = new DummyLogger + private val fileToKey = generateKey(config.sources, config.prefix) _ def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] = Map("md5" -> hash)