Replace cats-effect with zio (#117)

* [sbt] Add ZIO dependency to storage-api

* Convert to use ZIO

* [sbt] remove cats-effect dependency

* [changelog] updated

* [cli] Program restore actions to correct order (copy, upload, delete)

* [cli] Program You should not name methods after their defining object

* [core] ConfigValidationException Redundant braces after class definition

* [core] LocalFileStreamSuite Usage of get on optional type.

* [core] PlanBuilderTest Usage of get on optional type.
This commit is contained in:
Paul Campbell 2019-07-21 21:02:04 +01:00 committed by GitHub
parent 32ef58ff11
commit 8cca46340c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
48 changed files with 603 additions and 567 deletions

View file

@ -6,6 +6,12 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
[[https://semver.org/spec/v2.0.0.html][Semantic Versioning]]. [[https://semver.org/spec/v2.0.0.html][Semantic Versioning]].
* [0.8.0] - 2019-??-??
** Changed
- Replace cats-effect with zio (#117)
* [0.7.2] - 2019-07-19 * [0.7.2] - 2019-07-19
** Changed ** Changed

View file

@ -41,7 +41,7 @@ val testDependencies = Seq(
val domainDependencies = Seq( val domainDependencies = Seq(
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
"com.github.julien-truffaut" %% "monocle-core" % "1.6.0", "com.github.julien-truffaut" %% "monocle-core" % "1.6.0",
"com.github.julien-truffaut" %% "monocle-macro" % "1.6.0", "com.github.julien-truffaut" %% "monocle-macro" % "1.6.0"
) )
) )
val commandLineParsing = Seq( val commandLineParsing = Seq(
@ -57,9 +57,9 @@ val awsSdkDependencies = Seq(
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9" "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9"
) )
) )
val catsEffectsSettings = Seq( val zioDependencies = Seq(
libraryDependencies ++= Seq( libraryDependencies ++= Seq (
"org.typelevel" %% "cats-effect" % "1.3.1" "dev.zio" %% "zio" % "1.0.0-RC10-1"
) )
) )
@ -110,6 +110,7 @@ lazy val core = (project in file("core"))
lazy val `storage-api` = (project in file("storage-api")) lazy val `storage-api` = (project in file("storage-api"))
.settings(commonSettings) .settings(commonSettings)
.settings(zioDependencies)
.settings(assemblyJarName in assembly := "storage-api.jar") .settings(assemblyJarName in assembly := "storage-api.jar")
.dependsOn(domain) .dependsOn(domain)
@ -117,5 +118,4 @@ lazy val domain = (project in file("domain"))
.settings(commonSettings) .settings(commonSettings)
.settings(domainDependencies) .settings(domainDependencies)
.settings(assemblyJarName in assembly := "domain.jar") .settings(assemblyJarName in assembly := "domain.jar")
.settings(catsEffectsSettings)
.settings(testDependencies) .settings(testDependencies)

View file

@ -1,20 +1,14 @@
package net.kemitix.thorp.cli package net.kemitix.thorp.cli
import cats.effect.ExitCase.{Canceled, Completed, Error} import zio.{App, ZIO}
import cats.effect.{ExitCode, IO, IOApp}
object Main extends IOApp { object Main extends App {
override def run(args: List[String]): IO[ExitCode] = { override def run(args: List[String]): ZIO[Environment, Nothing, Int] = {
val exitCaseLogger = new PrintLogger(false) for {
ParseArgs(args) cliOptions <- ParseArgs(args)
.map(Program.run) _ <- Program.run(cliOptions)
.getOrElse(IO(ExitCode.Error)) } yield ()
.guaranteeCase { }.fold(failure => 1, success => 0)
case Canceled => exitCaseLogger.warn("Interrupted")
case Error(e) => exitCaseLogger.error(e.getMessage)
case Completed => IO.unit
}
}
} }

View file

@ -4,9 +4,17 @@ import java.nio.file.Paths
import net.kemitix.thorp.core.{ConfigOption, ConfigOptions} import net.kemitix.thorp.core.{ConfigOption, ConfigOptions}
import scopt.OParser import scopt.OParser
import zio.Task
object ParseArgs { object ParseArgs {
def apply(args: List[String]): Task[ConfigOptions] = Task {
OParser
.parse(configParser, args, List())
.map(ConfigOptions(_))
.getOrElse(ConfigOptions())
}
val configParser: OParser[Unit, List[ConfigOption]] = { val configParser: OParser[Unit, List[ConfigOption]] = {
val parserBuilder = OParser.builder[List[ConfigOption]] val parserBuilder = OParser.builder[List[ConfigOption]]
import parserBuilder._ import parserBuilder._
@ -49,9 +57,4 @@ object ParseArgs {
) )
} }
def apply(args: List[String]): Option[ConfigOptions] =
OParser
.parse(configParser, args, List())
.map(ConfigOptions(_))
} }

View file

@ -1,25 +0,0 @@
package net.kemitix.thorp.cli
import cats.effect.IO
import net.kemitix.thorp.domain.Logger
class PrintLogger(isDebug: Boolean = false) extends Logger {
override def debug(message: => String): IO[Unit] =
if (isDebug) IO(println(s"[ DEBUG] $message"))
else IO.unit
override def info(message: => String): IO[Unit] =
IO(println(s"[ INFO] $message"))
override def warn(message: String): IO[Unit] =
IO(println(s"[ WARN] $message"))
override def error(message: String): IO[Unit] =
IO(println(s"[ ERROR] $message"))
override def withDebug(debug: Boolean): Logger =
if (isDebug == debug) this
else new PrintLogger(debug)
}

View file

@ -1,74 +1,79 @@
package net.kemitix.thorp.cli package net.kemitix.thorp.cli
import cats.effect.{ExitCode, IO}
import cats.implicits._
import net.kemitix.thorp.core._ import net.kemitix.thorp.core._
import net.kemitix.thorp.domain.{Logger, StorageQueueEvent} import net.kemitix.thorp.domain.{StorageQueueEvent, SyncTotals}
import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService
import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService
import zio.console._
import zio.{Task, TaskR, ZIO}
trait Program extends PlanBuilder { trait Program extends PlanBuilder {
def run(cliOptions: ConfigOptions): IO[ExitCode] = { lazy val version = s"Thorp v${thorp.BuildInfo.version}"
implicit val logger: Logger = new PrintLogger()
if (ConfigQuery.showVersion(cliOptions)) def run(cliOptions: ConfigOptions): ZIO[Console, Nothing, Unit] = {
val showVersion = ConfigQuery.showVersion(cliOptions)
for { for {
_ <- logger.info(s"Thorp v${thorp.BuildInfo.version}") _ <- ZIO.when(showVersion)(putStrLn(version))
} yield ExitCode.Success _ <- ZIO.when(!showVersion)(execute(cliOptions).catchAll(handleErrors))
else } yield ()
}
private def execute(
cliOptions: ConfigOptions): ZIO[Console, Throwable, Unit] = {
for { for {
syncPlan <- createPlan( plan <- createPlan(defaultStorageService, defaultHashService, cliOptions)
defaultStorageService, archive <- thorpArchive(cliOptions, plan.syncTotals)
defaultHashService, events <- handleActions(archive, plan)
cliOptions
).valueOrF(handleErrors)
archive <- thorpArchive(cliOptions, syncPlan)
events <- handleActions(archive, syncPlan)
_ <- defaultStorageService.shutdown _ <- defaultStorageService.shutdown
_ <- SyncLogging.logRunFinished(events) _ <- SyncLogging.logRunFinished(events)
} yield ExitCode.Success } yield ()
} }
private def handleErrors(throwable: Throwable): ZIO[Console, Nothing, Unit] =
for {
_ <- putStrLn("There were errors:")
_ <- throwable match {
case ConfigValidationException(errors) =>
ZIO.foreach(errors)(error => putStrLn(s"- $error"))
case x => throw x
}
} yield ()
def thorpArchive( def thorpArchive(
cliOptions: ConfigOptions, cliOptions: ConfigOptions,
syncPlan: SyncPlan syncTotals: SyncTotals
): IO[ThorpArchive] = ): Task[ThorpArchive] = Task {
IO.pure(
UnversionedMirrorArchive.default( UnversionedMirrorArchive.default(
defaultStorageService, defaultStorageService,
ConfigQuery.batchMode(cliOptions), ConfigQuery.batchMode(cliOptions),
syncPlan.syncTotals syncTotals
)) )
private def handleErrors(
implicit logger: Logger
): List[String] => IO[SyncPlan] = errors => {
for {
_ <- logger.error("There were errors:")
_ <- errors.map(error => logger.error(s" - $error")).sequence
} yield SyncPlan()
} }
private def handleActions( private def handleActions(
archive: ThorpArchive, archive: ThorpArchive,
syncPlan: SyncPlan syncPlan: SyncPlan
)(implicit l: Logger): IO[Stream[StorageQueueEvent]] = { ): TaskR[Console, Stream[StorageQueueEvent]] = {
type Accumulator = (Stream[IO[StorageQueueEvent]], Long) type Accumulator = (Stream[StorageQueueEvent], Long)
val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes) val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes)
val (actions, _) = syncPlan.actions.zipWithIndex.reverse TaskR
.foldLeft(zero) { (acc: Accumulator, indexedAction) => .foldLeft(syncPlan.actions.reverse.zipWithIndex)(zero)(
{ (acc, indexedAction) => {
val (stream, bytesToDo) = acc
val (action, index) = indexedAction val (action, index) = indexedAction
val (stream, bytesToDo) = acc
val remainingBytes = bytesToDo - action.size val remainingBytes = bytesToDo - action.size
( (for {
archive.update(index, action, remainingBytes) ++ stream, event <- archive.update(index, action, remainingBytes)
remainingBytes events = stream ++ Stream(event)
) } yield events)
.map((_, remainingBytes))
})
.map {
case (events, _) => events
} }
} }
actions.sequence
}
} }
object Program extends Program object Program extends Program

View file

@ -5,16 +5,19 @@ import java.nio.file.Paths
import net.kemitix.thorp.core.ConfigOption.Debug import net.kemitix.thorp.core.ConfigOption.Debug
import net.kemitix.thorp.core.{ConfigOptions, ConfigQuery, Resource} import net.kemitix.thorp.core.{ConfigOptions, ConfigQuery, Resource}
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime
import scala.util.Try import scala.util.Try
class ParseArgsTest extends FunSpec { class ParseArgsTest extends FunSpec {
private val runtime = new DefaultRuntime {}
val source = Resource(this, "") val source = Resource(this, "")
describe("parse - source") { describe("parse - source") {
def invokeWithSource(path: String) = def invokeWithSource(path: String) =
ParseArgs(List("--source", path, "--bucket", "bucket")) invoke(List("--source", path, "--bucket", "bucket"))
describe("when source is a directory") { describe("when source is a directory") {
it("should succeed") { it("should succeed") {
@ -31,7 +34,7 @@ class ParseArgsTest extends FunSpec {
List("--source", "path1", "--source", "path2", "--bucket", "bucket") List("--source", "path1", "--source", "path2", "--bucket", "bucket")
it("should get multiple sources") { it("should get multiple sources") {
val expected = Some(Set("path1", "path2").map(Paths.get(_))) val expected = Some(Set("path1", "path2").map(Paths.get(_)))
val configOptions = ParseArgs(args) val configOptions = invoke(args)
val result = configOptions.map(ConfigQuery.sources(_).paths.toSet) val result = configOptions.map(ConfigQuery.sources(_).paths.toSet)
assertResult(expected)(result) assertResult(expected)(result)
} }
@ -42,7 +45,7 @@ class ParseArgsTest extends FunSpec {
def invokeWithArgument(arg: String): ConfigOptions = { def invokeWithArgument(arg: String): ConfigOptions = {
val strings = List("--source", pathTo("."), "--bucket", "bucket", arg) val strings = List("--source", pathTo("."), "--bucket", "bucket", arg)
.filter(_ != "") .filter(_ != "")
val maybeOptions = ParseArgs(strings) val maybeOptions = invoke(strings)
maybeOptions.getOrElse(ConfigOptions()) maybeOptions.getOrElse(ConfigOptions())
} }
@ -71,4 +74,12 @@ class ParseArgsTest extends FunSpec {
.map(_.getCanonicalPath) .map(_.getCanonicalPath)
.getOrElse("[not-found]") .getOrElse("[not-found]")
private def invoke(args: List[String]) =
runtime
.unsafeRunSync {
ParseArgs(args)
}
.toEither
.toOption
} }

View file

@ -3,16 +3,19 @@ package net.kemitix.thorp.cli
import java.io.File import java.io.File
import java.nio.file.Path import java.nio.file.Path
import cats.data.EitherT
import cats.effect.IO
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.core._ import net.kemitix.thorp.core._
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.console.Console
import zio.{DefaultRuntime, Task, TaskR}
class ProgramTest extends FunSpec { class ProgramTest extends FunSpec {
private val runtime = new DefaultRuntime {}
val source: File = Resource(this, ".") val source: File = Resource(this, ".")
val sourcePath: Path = source.toPath val sourcePath: Path = source.toPath
val bucket: Bucket = Bucket("aBucket") val bucket: Bucket = Bucket("aBucket")
@ -35,36 +38,46 @@ class ProgramTest extends FunSpec {
val archive = TestProgram.thorpArchive val archive = TestProgram.thorpArchive
it("should be handled in correct order") { it("should be handled in correct order") {
val expected = List(copyAction, uploadAction, deleteAction) val expected = List(copyAction, uploadAction, deleteAction)
TestProgram.run(configOptions).unsafeRunSync invoke(configOptions)
val result = archive.actions val result = archive.actions
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
private def invoke(configOptions: ConfigOptions) =
runtime.unsafeRunSync {
TestProgram.run(configOptions)
}.toEither
trait TestPlanBuilder extends PlanBuilder { trait TestPlanBuilder extends PlanBuilder {
override def createPlan(storageService: StorageService, override def createPlan(
storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: ConfigOptions)( configOptions: ConfigOptions
implicit l: Logger): EitherT[IO, List[String], SyncPlan] = { ): Task[SyncPlan] = {
EitherT.right( Task(SyncPlan(Stream(copyAction, uploadAction, deleteAction)))
IO(SyncPlan(Stream(copyAction, uploadAction, deleteAction))))
} }
} }
class ActionCaptureArchive extends ThorpArchive { class ActionCaptureArchive extends ThorpArchive {
var actions: List[Action] = List[Action]() var actions: List[Action] = List[Action]()
override def update(index: Int, action: Action, totalBytesSoFar: Long)( override def update(
implicit l: Logger): Stream[IO[StorageQueueEvent]] = { index: Int,
action: Action,
totalBytesSoFar: Long
): TaskR[Console, StorageQueueEvent] = {
actions = action :: actions actions = action :: actions
Stream() TaskR(DoNothingQueueEvent(RemoteKey("")))
} }
} }
object TestProgram extends Program with TestPlanBuilder { object TestProgram extends Program with TestPlanBuilder {
val thorpArchive: ActionCaptureArchive = new ActionCaptureArchive val thorpArchive: ActionCaptureArchive = new ActionCaptureArchive
override def thorpArchive(cliOptions: ConfigOptions, override def thorpArchive(
syncPlan: SyncPlan): IO[ThorpArchive] = cliOptions: ConfigOptions,
IO.pure(thorpArchive) syncTotals: SyncTotals
): Task[ThorpArchive] =
Task(thorpArchive)
} }
} }

View file

@ -1,14 +1,13 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import cats.Semigroup
import monocle.Lens import monocle.Lens
import monocle.macros.GenLens import monocle.macros.GenLens
case class ConfigOptions( case class ConfigOptions(
options: List[ConfigOption] = List() options: List[ConfigOption] = List()
) extends Semigroup[ConfigOptions] { ) {
override def combine( def combine(
x: ConfigOptions, x: ConfigOptions,
y: ConfigOptions y: ConfigOptions
): ConfigOptions = ): ConfigOptions =

View file

@ -1,5 +1,7 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import java.nio.file.Path
sealed trait ConfigValidation { sealed trait ConfigValidation {
def errorMessage: String def errorMessage: String
@ -19,4 +21,11 @@ object ConfigValidation {
override def errorMessage: String = "Bucket name is missing" override def errorMessage: String = "Bucket name is missing"
} }
case class ErrorReadingFile(
path: Path,
message: String
) extends ConfigValidation {
override def errorMessage: String = s"Error reading file '$path': $message"
}
} }

View file

@ -0,0 +1,5 @@
package net.kemitix.thorp.core
final case class ConfigValidationException(
errors: List[ConfigValidation]
) extends Exception

View file

@ -2,42 +2,57 @@ package net.kemitix.thorp.core
import java.nio.file.Path import java.nio.file.Path
import cats.data.{NonEmptyChain, Validated, ValidatedNec}
import cats.implicits._
import net.kemitix.thorp.domain.{Bucket, Config, Sources} import net.kemitix.thorp.domain.{Bucket, Config, Sources}
import zio.IO
sealed trait ConfigValidator { sealed trait ConfigValidator {
type ValidationResult[A] = ValidatedNec[ConfigValidation, A]
def validateConfig( def validateConfig(
config: Config): Validated[NonEmptyChain[ConfigValidation], Config] = config: Config
( ): IO[List[ConfigValidation], Config] = IO.fromEither {
validateSources(config.sources), for {
validateBucket(config.bucket) _ <- validateSources(config.sources)
).mapN((_, _) => config) _ <- validateBucket(config.bucket)
} yield config
}
def validateBucket(bucket: Bucket): ValidationResult[Bucket] = def validateBucket(bucket: Bucket): Either[List[ConfigValidation], Bucket] =
if (bucket.name.isEmpty) ConfigValidation.BucketNameIsMissing.invalidNec if (bucket.name.isEmpty) Left(List(ConfigValidation.BucketNameIsMissing))
else bucket.validNec else Right(bucket)
def validateSources(sources: Sources): ValidationResult[Sources] = def validateSources(
sources.paths sources: Sources): Either[List[ConfigValidation], Sources] =
.map(validateSource) (for {
.sequence x <- sources.paths.foldLeft(List[ConfigValidation]()) {
.map(_ => sources) (acc: List[ConfigValidation], path) =>
{
validateSource(path) match {
case Left(errors) => acc ++ errors
case Right(_) => acc
}
}
}
} yield x) match {
case Nil => Right(sources)
case errors => Left(errors)
}
def validateSource(source: Path): ValidationResult[Path] = def validateSource(source: Path): Either[List[ConfigValidation], Path] =
validateSourceIsDirectory(source) for {
.andThen(s => validateSourceIsReadable(s)) _ <- validateSourceIsDirectory(source)
_ <- validateSourceIsReadable(source)
} yield source
def validateSourceIsDirectory(source: Path): ValidationResult[Path] = def validateSourceIsDirectory(
if (source.toFile.isDirectory) source.validNec source: Path): Either[List[ConfigValidation], Path] =
else ConfigValidation.SourceIsNotADirectory.invalidNec if (source.toFile.isDirectory) Right(source)
else Left(List(ConfigValidation.SourceIsNotADirectory))
def validateSourceIsReadable(
source: Path): Either[List[ConfigValidation], Path] =
if (source.toFile.canRead) Right(source)
else Left(List(ConfigValidation.SourceIsNotReadable))
def validateSourceIsReadable(source: Path): ValidationResult[Path] =
if (source.toFile.canRead) source.validNec
else ConfigValidation.SourceIsNotReadable.invalidNec
} }
object ConfigValidator extends ConfigValidator object ConfigValidator extends ConfigValidator

View file

@ -2,12 +2,11 @@ package net.kemitix.thorp.core
import java.nio.file.Paths import java.nio.file.Paths
import cats.data.NonEmptyChain import net.kemitix.thorp.core.ConfigOptions.options
import cats.effect.IO
import net.kemitix.thorp.core.ConfigValidator.validateConfig import net.kemitix.thorp.core.ConfigValidator.validateConfig
import net.kemitix.thorp.core.ParseConfigFile.parseFile import net.kemitix.thorp.core.ParseConfigFile.parseFile
import net.kemitix.thorp.core.ConfigOptions.options
import net.kemitix.thorp.domain.Config import net.kemitix.thorp.domain.Config
import zio.IO
/** /**
* Builds a configuration from settings in a file within the * Builds a configuration from settings in a file within the
@ -19,25 +18,31 @@ trait ConfigurationBuilder {
private val globalConfig = Paths.get("/etc/thorp.conf") private val globalConfig = Paths.get("/etc/thorp.conf")
private val userHome = Paths.get(System.getProperty("user.home")) private val userHome = Paths.get(System.getProperty("user.home"))
def buildConfig(priorityOpts: ConfigOptions) def buildConfig(
: IO[Either[NonEmptyChain[ConfigValidation], Config]] = { priorityOpts: ConfigOptions): IO[List[ConfigValidation], Config] =
val sources = ConfigQuery.sources(priorityOpts)
for { for {
sourceOpts <- SourceConfigLoader.loadSourceConfigs(sources) config <- getConfigOptions(priorityOpts).map(collateOptions)
valid <- validateConfig(config)
} yield valid
private def getConfigOptions(
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
for {
sourceOpts <- SourceConfigLoader.loadSourceConfigs(
ConfigQuery.sources(priorityOpts))
userOpts <- userOptions(priorityOpts ++ sourceOpts) userOpts <- userOptions(priorityOpts ++ sourceOpts)
globalOpts <- globalOptions(priorityOpts ++ sourceOpts ++ userOpts) globalOpts <- globalOptions(priorityOpts ++ sourceOpts ++ userOpts)
collected = priorityOpts ++ sourceOpts ++ userOpts ++ globalOpts } yield priorityOpts ++ sourceOpts ++ userOpts ++ globalOpts
config = collateOptions(collected)
} yield validateConfig(config).toEither
}
private val emptyConfig = IO(ConfigOptions()) private val emptyConfig = IO.succeed(ConfigOptions())
private def userOptions(priorityOpts: ConfigOptions) = private def userOptions(
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
if (ConfigQuery.ignoreUserOptions(priorityOpts)) emptyConfig if (ConfigQuery.ignoreUserOptions(priorityOpts)) emptyConfig
else parseFile(userHome.resolve(userConfigFilename)) else parseFile(userHome.resolve(userConfigFilename))
private def globalOptions(priorityOpts: ConfigOptions) = private def globalOptions(
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
if (ConfigQuery.ignoreGlobalOptions(priorityOpts)) emptyConfig if (ConfigQuery.ignoreGlobalOptions(priorityOpts)) emptyConfig
else parseFile(globalConfig) else parseFile(globalConfig)

View file

@ -2,53 +2,46 @@ package net.kemitix.thorp.core
import java.nio.file.Path import java.nio.file.Path
import cats.effect.IO
import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.core.KeyGenerator.generateKey
import net.kemitix.thorp.domain import net.kemitix.thorp.domain
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.storage.api.HashService
import zio.Task
object LocalFileStream { object LocalFileStream {
private val emptyIOLocalFiles = IO.pure(LocalFiles())
def findFiles( def findFiles(
source: Path, source: Path,
hashService: HashService hashService: HashService
)( )(
implicit c: Config, implicit c: Config
logger: Logger ): Task[LocalFiles] = {
): IO[LocalFiles] = {
val isIncluded: Path => Boolean = Filter.isIncluded(c.filters) val isIncluded: Path => Boolean = Filter.isIncluded(c.filters)
val pathToLocalFile: Path => IO[LocalFiles] = path => val pathToLocalFile: Path => Task[LocalFiles] = path =>
localFile(hashService, logger, c)(path) localFile(hashService, c)(path)
def loop(path: Path): IO[LocalFiles] = { def loop(path: Path): Task[LocalFiles] = {
def dirPaths(path: Path) = def dirPaths(path: Path): Task[Stream[Path]] =
listFiles(path) listFiles(path)
.map(_.filter(isIncluded)) .map(_.filter(isIncluded))
def recurseIntoSubDirectories(path: Path) = def recurseIntoSubDirectories(path: Path): Task[LocalFiles] =
path.toFile match { path.toFile match {
case f if f.isDirectory => loop(path) case f if f.isDirectory => loop(path)
case _ => pathToLocalFile(path) case _ => pathToLocalFile(path)
} }
def recurse(paths: Stream[Path]) = def recurse(paths: Stream[Path]): Task[LocalFiles] =
paths.foldLeft(emptyIOLocalFiles)( Task.foldLeft(paths)(LocalFiles())((acc, path) => {
(acc, path) => recurseIntoSubDirectories(path).map(localFiles => acc ++ localFiles)
recurseIntoSubDirectories(path) })
.flatMap(localFiles =>
acc.map(accLocalFiles => accLocalFiles ++ localFiles)))
for { for {
_ <- logger.debug(s"- Entering: $path")
paths <- dirPaths(path) paths <- dirPaths(path)
localFiles <- recurse(paths) localFiles <- recurse(paths)
_ <- logger.debug(s"- Leaving : $path")
} yield localFiles } yield localFiles
} }
@ -57,14 +50,13 @@ object LocalFileStream {
def localFile( def localFile(
hashService: HashService, hashService: HashService,
l: Logger,
c: Config c: Config
): Path => IO[LocalFiles] = ): Path => Task[LocalFiles] =
path => { path => {
val file = path.toFile val file = path.toFile
val source = c.sources.forPath(path) val source = c.sources.forPath(path)
for { for {
hash <- hashService.hashLocalObject(path)(l) hash <- hashService.hashLocalObject(path)
} yield } yield
LocalFiles(localFiles = Stream( LocalFiles(localFiles = Stream(
domain.LocalFile(file, domain.LocalFile(file,
@ -75,15 +67,11 @@ object LocalFileStream {
totalSizeBytes = file.length) totalSizeBytes = file.length)
} }
//TODO: Change this to return an Either[IllegalArgumentException, Stream[Path]] private def listFiles(path: Path): Task[Stream[Path]] =
private def listFiles(path: Path) = { for {
IO( files <- Task(path.toFile.listFiles)
Option(path.toFile.listFiles) _ <- Task.when(files == null)(
.map { fs => Task.fail(new IllegalArgumentException(s"Directory not found $path")))
Stream(fs: _*) } yield Stream(files: _*).map(_.toPath)
.map(_.toPath)
}
.getOrElse(
throw new IllegalArgumentException(s"Directory not found $path")))
}
} }

View file

@ -4,8 +4,8 @@ import java.io.{File, FileInputStream}
import java.nio.file.Path import java.nio.file.Path
import java.security.MessageDigest import java.security.MessageDigest
import cats.effect.IO import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.domain.{Logger, MD5Hash} import zio.Task
import scala.collection.immutable.NumericRange import scala.collection.immutable.NumericRange
@ -26,21 +26,19 @@ object MD5HashGenerator {
md5.digest md5.digest
} }
def md5File(path: Path)(implicit logger: Logger): IO[MD5Hash] = def md5File(path: Path): Task[MD5Hash] =
md5FileChunk(path, 0, path.toFile.length) md5FileChunk(path, 0, path.toFile.length)
def md5FileChunk( def md5FileChunk(
path: Path, path: Path,
offset: Long, offset: Long,
size: Long size: Long
)(implicit logger: Logger): IO[MD5Hash] = { ): Task[MD5Hash] = {
val file = path.toFile val file = path.toFile
val endOffset = Math.min(offset + size, file.length) val endOffset = Math.min(offset + size, file.length)
for { for {
_ <- logger.debug(s"md5:reading:size ${file.length}:$path")
digest <- readFile(file, offset, endOffset) digest <- readFile(file, offset, endOffset)
hash = MD5Hash.fromDigest(digest) hash = MD5Hash.fromDigest(digest)
_ <- logger.debug(s"md5:generated:${hash.hash}:$path")
} yield hash } yield hash
} }
@ -58,20 +56,20 @@ object MD5HashGenerator {
private def openFile( private def openFile(
file: File, file: File,
offset: Long offset: Long
) = IO { ) = Task {
val stream = new FileInputStream(file) val stream = new FileInputStream(file)
stream skip offset stream skip offset
stream stream
} }
private def closeFile(fis: FileInputStream) = IO(fis.close()) private def closeFile(fis: FileInputStream) = Task(fis.close())
private def digestFile( private def digestFile(
fis: FileInputStream, fis: FileInputStream,
offset: Long, offset: Long,
endOffset: Long endOffset: Long
) = ) =
IO { Task {
val md5 = MessageDigest getInstance "MD5" val md5 = MessageDigest getInstance "MD5"
NumericRange(offset, endOffset, maxBufferSize) NumericRange(offset, endOffset, maxBufferSize)
.foreach(currentOffset => .foreach(currentOffset =>

View file

@ -2,25 +2,29 @@ package net.kemitix.thorp.core
import java.nio.file.{Files, Path} import java.nio.file.{Files, Path}
import cats.effect.IO import zio.{IO, Task}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
trait ParseConfigFile { trait ParseConfigFile {
def parseFile(filename: Path): IO[ConfigOptions] = def parseFile(filename: Path): IO[List[ConfigValidation], ConfigOptions] =
readFile(filename) readFile(filename)
.map(ParseConfigLines.parseLines) .map(ParseConfigLines.parseLines)
.catchAll(h =>
IO.fail(
List(ConfigValidation.ErrorReadingFile(filename, h.getMessage))))
private def readFile(filename: Path) = { private def readFile(filename: Path): Task[List[String]] = {
if (Files.exists(filename)) readFileThatExists(filename) if (Files.exists(filename)) readFileThatExists(filename)
else IO.pure(List()) else IO(List())
} }
private def readFileThatExists(filename: Path) = private def readFileThatExists(filename: Path): Task[List[String]] =
for { for {
lines <- IO(Files.lines(filename)) lines <- IO(Files.lines(filename))
list = lines.iterator.asScala.toList list = lines.iterator.asScala.toList
//FIXME: use a bracket to close the file
_ <- IO(lines.close()) _ <- IO(lines.close())
} yield list } yield list

View file

@ -1,11 +1,10 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import cats.data.{EitherT, NonEmptyChain}
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.core.Action.DoNothing import net.kemitix.thorp.core.Action.DoNothing
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, StorageService}
import zio.console._
import zio.{Task, TaskR}
trait PlanBuilder { trait PlanBuilder {
@ -13,30 +12,29 @@ trait PlanBuilder {
storageService: StorageService, storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: ConfigOptions configOptions: ConfigOptions
)(implicit l: Logger): EitherT[IO, List[String], SyncPlan] = ): TaskR[Console, SyncPlan] =
EitherT(ConfigurationBuilder.buildConfig(configOptions)) ConfigurationBuilder
.leftMap(errorMessages) .buildConfig(configOptions)
.flatMap(config => useValidConfig(storageService, hashService)(config, l)) .catchAll(errors => TaskR.fail(ConfigValidationException(errors)))
.flatMap(config => useValidConfig(storageService, hashService)(config))
def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] =
errors.map(cv => cv.errorMessage).toList
def useValidConfig( def useValidConfig(
storageService: StorageService, storageService: StorageService,
hashService: HashService hashService: HashService
)(implicit c: Config, l: Logger): EitherT[IO, List[String], SyncPlan] = )(implicit c: Config): TaskR[Console, SyncPlan] = {
for { for {
_ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.sources)) _ <- SyncLogging.logRunStart(c.bucket, c.prefix, c.sources)
actions <- buildPlan(storageService, hashService) actions <- buildPlan(storageService, hashService)
} yield actions } yield actions
}
private def buildPlan( private def buildPlan(
storageService: StorageService, storageService: StorageService,
hashService: HashService hashService: HashService
)(implicit c: Config, l: Logger) = )(implicit c: Config): TaskR[Console, SyncPlan] =
gatherMetadata(storageService, hashService) for {
.leftMap(List(_)) metadata <- gatherMetadata(storageService, hashService)
.map(assemblePlan) } yield assemblePlan(c)(metadata)
def assemblePlan( def assemblePlan(
implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = { implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = {
@ -91,21 +89,20 @@ trait PlanBuilder {
private def gatherMetadata( private def gatherMetadata(
storageService: StorageService, storageService: StorageService,
hashService: HashService hashService: HashService
)(implicit l: Logger, )(implicit c: Config): TaskR[Console, (S3ObjectsData, LocalFiles)] =
c: Config): EitherT[IO, String, (S3ObjectsData, LocalFiles)] =
for { for {
remoteData <- fetchRemoteData(storageService) remoteData <- fetchRemoteData(storageService)
localData <- EitherT.liftF(findLocalFiles(hashService)) localData <- findLocalFiles(hashService)
} yield (remoteData, localData) } yield (remoteData, localData)
private def fetchRemoteData( private def fetchRemoteData(
storageService: StorageService storageService: StorageService
)(implicit c: Config, l: Logger) = )(implicit c: Config): TaskR[Console, S3ObjectsData] =
storageService.listObjects(c.bucket, c.prefix) storageService.listObjects(c.bucket, c.prefix)
private def findLocalFiles( private def findLocalFiles(
hashService: HashService hashService: HashService
)(implicit config: Config, l: Logger) = )(implicit config: Config): TaskR[Console, LocalFiles] =
for { for {
_ <- SyncLogging.logFileScan _ <- SyncLogging.logFileScan
localFiles <- findFiles(hashService) localFiles <- findFiles(hashService)
@ -113,19 +110,10 @@ trait PlanBuilder {
private def findFiles( private def findFiles(
hashService: HashService hashService: HashService
)(implicit c: Config, l: Logger) = { )(implicit c: Config): Task[LocalFiles] = {
val ioListLocalFiles = (for { Task
source <- c.sources.paths .foreach(c.sources.paths)(LocalFileStream.findFiles(_, hashService))
} yield LocalFileStream.findFiles(source, hashService)).sequence .map(_.foldLeft(LocalFiles())((acc, localFile) => acc ++ localFile))
for {
listLocalFiles <- ioListLocalFiles
localFiles = listLocalFiles.foldRight(LocalFiles()) {
(acc, moreLocalFiles) =>
{
acc ++ moreLocalFiles
}
}
} yield localFiles
} }
} }

View file

@ -2,15 +2,15 @@ package net.kemitix.thorp.core
import java.nio.file.Path import java.nio.file.Path
import cats.effect.IO import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.domain.{Logger, MD5Hash}
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.storage.api.HashService
import zio.Task
case class SimpleHashService() extends HashService { case class SimpleHashService() extends HashService {
override def hashLocalObject( override def hashLocalObject(
path: Path path: Path
)(implicit l: Logger): IO[Map[String, MD5Hash]] = ): Task[Map[String, MD5Hash]] =
for { for {
md5 <- MD5HashGenerator.md5File(path) md5 <- MD5HashGenerator.md5File(path)
} yield Map("md5" -> md5) } yield Map("md5" -> md5)

View file

@ -1,25 +1,26 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.domain.Sources import net.kemitix.thorp.domain.Sources
import zio.IO
trait SourceConfigLoader { trait SourceConfigLoader {
val thorpConfigFileName = ".thorp.conf" val thorpConfigFileName = ".thorp.conf"
def loadSourceConfigs: Sources => IO[ConfigOptions] = def loadSourceConfigs: Sources => IO[List[ConfigValidation], ConfigOptions] =
sources => { sources => {
val sourceConfigOptions = val sourceConfigOptions =
ConfigOptions(sources.paths.map(ConfigOption.Source(_))) ConfigOptions(sources.paths.map(ConfigOption.Source))
val reduce: List[ConfigOptions] => ConfigOptions = val reduce: List[ConfigOptions] => ConfigOptions =
_.foldLeft(sourceConfigOptions) { (acc, co) => acc ++ co } _.foldLeft(sourceConfigOptions) { (acc, co) =>
acc ++ co
}
sources.paths IO.foreach(sources.paths) { path =>
.map(_.resolve(thorpConfigFileName)) ParseConfigFile.parseFile(path.resolve(thorpConfigFileName))
.map(ParseConfigFile.parseFile).sequence }
.map(reduce) .map(reduce)
} }

View file

@ -1,7 +1,5 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent, CopyQueueEvent,
DeleteQueueEvent, DeleteQueueEvent,
@ -9,6 +7,8 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{
UploadQueueEvent UploadQueueEvent
} }
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import zio.ZIO
import zio.console._
trait SyncLogging { trait SyncLogging {
@ -16,40 +16,44 @@ trait SyncLogging {
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey, prefix: RemoteKey,
sources: Sources sources: Sources
)(implicit logger: Logger): IO[Unit] = { ): ZIO[Console, Nothing, Unit] = {
val sourcesList = sources.paths.mkString(", ") val sourcesList = sources.paths.mkString(", ")
logger.info( for {
_ <- putStrLn(
List(s"Bucket: ${bucket.name}", List(s"Bucket: ${bucket.name}",
s"Prefix: ${prefix.key}", s"Prefix: ${prefix.key}",
s"Source: $sourcesList") s"Source: $sourcesList")
.mkString(", ")) .mkString(", "))
} yield ()
} }
def logFileScan(implicit c: Config, logger: Logger): IO[Unit] = def logFileScan(implicit c: Config): ZIO[Console, Nothing, Unit] =
logger.info(s"Scanning local files: ${c.sources.paths.mkString(", ")}...") putStrLn(s"Scanning local files: ${c.sources.paths.mkString(", ")}...")
def logRunFinished( def logRunFinished(
actions: Stream[StorageQueueEvent] actions: Stream[StorageQueueEvent]
)(implicit logger: Logger): IO[Unit] = { ): ZIO[Console, Nothing, Unit] = {
val counters = actions.foldLeft(Counters())(countActivities) val counters = actions.foldLeft(Counters())(countActivities)
for { for {
_ <- logger.info(s"Uploaded ${counters.uploaded} files") _ <- putStrLn(s"Uploaded ${counters.uploaded} files")
_ <- logger.info(s"Copied ${counters.copied} files") _ <- putStrLn(s"Copied ${counters.copied} files")
_ <- logger.info(s"Deleted ${counters.deleted} files") _ <- putStrLn(s"Deleted ${counters.deleted} files")
_ <- logger.info(s"Errors ${counters.errors}") _ <- putStrLn(s"Errors ${counters.errors}")
_ <- logErrors(actions) _ <- logErrors(actions)
} yield () } yield ()
} }
def logErrors( def logErrors(
actions: Stream[StorageQueueEvent] actions: Stream[StorageQueueEvent]
)(implicit logger: Logger): IO[Unit] = ): ZIO[Console, Nothing, Unit] = {
for { ZIO.foldLeft(actions)(()) { (_, action) =>
_ <- actions.map { action match {
case ErrorQueueEvent(k, e) => logger.warn(s"${k.key}: ${e.getMessage}") case ErrorQueueEvent(k, e) =>
case _ => IO.unit putStrLn(s"${k.key}: ${e.getMessage}")
}.sequence case _ => ZIO.unit
} yield () }
}
}
private def countActivities: (Counters, StorageQueueEvent) => Counters = private def countActivities: (Counters, StorageQueueEvent) => Counters =
(counters: Counters, s3Action: StorageQueueEvent) => { (counters: Counters, s3Action: StorageQueueEvent) => {

View file

@ -1,7 +1,8 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import cats.effect.IO import net.kemitix.thorp.domain.{LocalFile, StorageQueueEvent}
import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent} import zio.TaskR
import zio.console._
trait ThorpArchive { trait ThorpArchive {
@ -9,13 +10,15 @@ trait ThorpArchive {
index: Int, index: Int,
action: Action, action: Action,
totalBytesSoFar: Long totalBytesSoFar: Long
)(implicit l: Logger): Stream[IO[StorageQueueEvent]] ): TaskR[Console, StorageQueueEvent]
def logFileUploaded( def logFileUploaded(
localFile: LocalFile, localFile: LocalFile,
batchMode: Boolean batchMode: Boolean
)(implicit l: Logger): IO[Unit] = ): TaskR[Console, Unit] =
if (batchMode) l.info(s"Uploaded: ${localFile.remoteKey.key}") for {
else IO.unit _ <- TaskR.when(batchMode)(
putStrLn(s"Uploaded: ${localFile.remoteKey.key}"))
} yield ()
} }

View file

@ -1,10 +1,11 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import cats.effect.IO
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.StorageService
import zio.console.Console
import zio.{Task, TaskR}
case class UnversionedMirrorArchive( case class UnversionedMirrorArchive(
storageService: StorageService, storageService: StorageService,
@ -16,8 +17,8 @@ case class UnversionedMirrorArchive(
index: Int, index: Int,
action: Action, action: Action,
totalBytesSoFar: Long totalBytesSoFar: Long
)(implicit l: Logger): Stream[IO[StorageQueueEvent]] = ): TaskR[Console, StorageQueueEvent] =
Stream(action match { action match {
case ToUpload(bucket, localFile, _) => case ToUpload(bucket, localFile, _) =>
for { for {
event <- doUpload(index, totalBytesSoFar, bucket, localFile) event <- doUpload(index, totalBytesSoFar, bucket, localFile)
@ -32,8 +33,8 @@ case class UnversionedMirrorArchive(
event <- storageService.delete(bucket, remoteKey) event <- storageService.delete(bucket, remoteKey)
} yield event } yield event
case DoNothing(_, remoteKey, _) => case DoNothing(_, remoteKey, _) =>
IO.pure(DoNothingQueueEvent(remoteKey)) Task(DoNothingQueueEvent(remoteKey))
}) }
private def doUpload( private def doUpload(
index: Int, index: Int,

View file

@ -2,6 +2,7 @@ package net.kemitix.thorp.core
import net.kemitix.thorp.domain.Sources import net.kemitix.thorp.domain.Sources
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime
class ConfigOptionTest extends FunSpec with TemporaryFolder { class ConfigOptionTest extends FunSpec with TemporaryFolder {
@ -27,6 +28,9 @@ class ConfigOptionTest extends FunSpec with TemporaryFolder {
} }
private def invoke(configOptions: ConfigOptions) = { private def invoke(configOptions: ConfigOptions) = {
ConfigurationBuilder.buildConfig(configOptions).unsafeRunSync val runtime = new DefaultRuntime {}
runtime.unsafeRunSync {
ConfigurationBuilder.buildConfig(configOptions)
}.toEither
} }
} }

View file

@ -5,6 +5,7 @@ import java.nio.file.{Path, Paths}
import net.kemitix.thorp.domain.Filter.{Exclude, Include} import net.kemitix.thorp.domain.Filter.{Exclude, Include}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime
class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
@ -32,7 +33,8 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
describe("with .thorp.conf") { describe("with .thorp.conf") {
describe("with settings") { describe("with settings") {
withDirectory(source => { withDirectory(source => {
val configFileName = createFile(source, thorpConfigFileName, val configFileName = createFile(source,
thorpConfigFileName,
"bucket = a-bucket", "bucket = a-bucket",
"prefix = a-prefix", "prefix = a-prefix",
"include = an-inclusion", "include = an-inclusion",
@ -47,7 +49,8 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
assertResult(expected)(result.map(_.prefix)) assertResult(expected)(result.map(_.prefix))
} }
it("should have filters") { it("should have filters") {
val expected = Right(List(Exclude("an-exclusion"), Include("an-inclusion"))) val expected =
Right(List(Exclude("an-exclusion"), Include("an-inclusion")))
assertResult(expected)(result.map(_.filters)) assertResult(expected)(result.map(_.filters))
} }
}) })
@ -120,8 +123,8 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
// should have prefix from current only // should have prefix from current only
val expectedPrefixes = Right(RemoteKey("current-prefix")) val expectedPrefixes = Right(RemoteKey("current-prefix"))
// should have filters from both sources // should have filters from both sources
val expectedFilters = Right(List( val expectedFilters = Right(
Filter.Exclude("current-exclude"), List(Filter.Exclude("current-exclude"),
Filter.Include("current-include"))) Filter.Include("current-include")))
val options = configOptions(ConfigOption.Source(currentSource)) val options = configOptions(ConfigOption.Source(currentSource))
val result = invoke(options) val result = invoke(options)
@ -135,7 +138,8 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
} }
} }
describe("when source has thorp.config source to another source that does the same") { describe(
"when source has thorp.config source to another source that does the same") {
it("should only include first two sources") { it("should only include first two sources") {
withDirectory(currentSource => { withDirectory(currentSource => {
withDirectory(parentSource => { withDirectory(parentSource => {
@ -143,11 +147,12 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
thorpConfigFileName, thorpConfigFileName,
s"source = $parentSource") s"source = $parentSource")
withDirectory(grandParentSource => { withDirectory(grandParentSource => {
writeFile(parentSource, thorpConfigFileName, s"source = $grandParentSource") writeFile(parentSource,
thorpConfigFileName,
s"source = $grandParentSource")
val expected = Right(List(currentSource, parentSource)) val expected = Right(List(currentSource, parentSource))
val options = configOptions( val options =
ConfigOption.Source(currentSource), configOptions(ConfigOption.Source(currentSource), coBucket)
coBucket)
val result = invoke(options).map(_.sources.paths) val result = invoke(options).map(_.sources.paths)
assertResult(expected)(result) assertResult(expected)(result)
}) })
@ -156,7 +161,11 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
} }
} }
private def invoke(configOptions: ConfigOptions) = private def invoke(configOptions: ConfigOptions) = {
ConfigurationBuilder.buildConfig(configOptions).unsafeRunSync val runtime = new DefaultRuntime {}
runtime.unsafeRunSync {
ConfigurationBuilder.buildConfig(configOptions)
}.toEither
}
} }

View file

@ -2,15 +2,14 @@ package net.kemitix.thorp.core
import java.nio.file.Path import java.nio.file.Path
import cats.effect.IO import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.domain.{Logger, MD5Hash}
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.storage.api.HashService
import zio.Task
case class DummyHashService(hashes: Map[Path, Map[String, MD5Hash]]) case class DummyHashService(hashes: Map[Path, Map[String, MD5Hash]])
extends HashService { extends HashService {
override def hashLocalObject(path: Path)( override def hashLocalObject(path: Path): Task[Map[String, MD5Hash]] =
implicit l: Logger): IO[Map[String, MD5Hash]] = Task(hashes(path))
IO.pure(hashes(path))
} }

View file

@ -1,18 +0,0 @@
package net.kemitix.thorp.core
import cats.effect.IO
import net.kemitix.thorp.domain.Logger
class DummyLogger extends Logger {
override def debug(message: => String): IO[Unit] = IO.unit
override def info(message: => String): IO[Unit] = IO.unit
override def warn(message: String): IO[Unit] = IO.unit
override def error(message: String): IO[Unit] = IO.unit
override def withDebug(debug: Boolean): Logger = this
}

View file

@ -2,39 +2,39 @@ package net.kemitix.thorp.core
import java.io.File import java.io.File
import cats.data.EitherT
import cats.effect.IO
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.StorageService
import zio.console.Console
import zio.{Task, TaskR}
case class DummyStorageService(s3ObjectData: S3ObjectsData, case class DummyStorageService(s3ObjectData: S3ObjectsData,
uploadFiles: Map[File, (RemoteKey, MD5Hash)]) uploadFiles: Map[File, (RemoteKey, MD5Hash)])
extends StorageService { extends StorageService {
override def shutdown: IO[StorageQueueEvent] = override def shutdown: Task[StorageQueueEvent] =
IO.pure(StorageQueueEvent.ShutdownQueueEvent()) Task(StorageQueueEvent.ShutdownQueueEvent())
override def listObjects(bucket: Bucket, prefix: RemoteKey)( override def listObjects(bucket: Bucket,
implicit l: Logger): EitherT[IO, String, S3ObjectsData] = prefix: RemoteKey): TaskR[Console, S3ObjectsData] =
EitherT.liftF(IO.pure(s3ObjectData)) TaskR(s3ObjectData)
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean, batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): IO[StorageQueueEvent] = { tryCount: Int): Task[StorageQueueEvent] = {
val (remoteKey, md5Hash) = uploadFiles(localFile.file) val (remoteKey, md5Hash) = uploadFiles(localFile.file)
IO.pure(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash)) Task(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash))
} }
override def copy(bucket: Bucket, override def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey): IO[StorageQueueEvent] = targetKey: RemoteKey): Task[StorageQueueEvent] =
IO.pure(StorageQueueEvent.CopyQueueEvent(targetKey)) Task(StorageQueueEvent.CopyQueueEvent(targetKey))
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey): IO[StorageQueueEvent] = remoteKey: RemoteKey): Task[StorageQueueEvent] =
IO.pure(StorageQueueEvent.DeleteQueueEvent(remoteKey)) Task(StorageQueueEvent.DeleteQueueEvent(remoteKey))
} }

View file

@ -5,6 +5,7 @@ import java.nio.file.Paths
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.storage.api.HashService
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime
class LocalFileStreamSuite extends FunSpec { class LocalFileStreamSuite extends FunSpec {
@ -21,28 +22,34 @@ class LocalFileStreamSuite extends FunSpec {
implicit private val config: Config = Config( implicit private val config: Config = Config(
sources = Sources(List(sourcePath))) sources = Sources(List(sourcePath)))
implicit private val logger: Logger = new DummyLogger
describe("findFiles") { describe("findFiles") {
it("should find all files") { it("should find all files") {
val result: Set[String] = val expected = Right(Set("subdir/leaf-file", "root-file"))
invoke.localFiles.toSet val result =
.map { x: LocalFile => invoke()
x.relative.toString .map(_.localFiles)
} .map(localFiles => localFiles.map(_.relative.toString))
assertResult(Set("subdir/leaf-file", "root-file"))(result) .map(_.toSet)
assertResult(expected)(result)
} }
it("should count all files") { it("should count all files") {
val result = invoke.count val expected = Right(2)
assertResult(2)(result) val result = invoke().map(_.count)
assertResult(expected)(result)
} }
it("should sum the size of all files") { it("should sum the size of all files") {
val result = invoke.totalSizeBytes val expected = Right(113)
assertResult(113)(result) val result = invoke().map(_.totalSizeBytes)
assertResult(expected)(result)
} }
} }
private def invoke = private def invoke() = {
LocalFileStream.findFiles(sourcePath, hashService).unsafeRunSync val runtime = new DefaultRuntime {}
runtime.unsafeRunSync {
LocalFileStream.findFiles(sourcePath, hashService)
}.toEither
}
} }

View file

@ -1,51 +1,70 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.domain.MD5HashData.Root import java.nio.file.Path
import net.kemitix.thorp.domain.MD5HashData.{BigFile, Root}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime
class MD5HashGeneratorTest extends FunSpec { class MD5HashGeneratorTest extends FunSpec {
private val runtime = new DefaultRuntime {}
private val source = Resource(this, "upload") private val source = Resource(this, "upload")
private val sourcePath = source.toPath private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
implicit private val config: Config = implicit private val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val logger: Logger = new DummyLogger
describe("md5File()") {
describe("read a small file (smaller than buffer)") { describe("read a small file (smaller than buffer)") {
val path = Resource(this, "upload/root-file").toPath val path = Resource(this, "upload/root-file").toPath
it("should generate the correct hash") { it("should generate the correct hash") {
val result = MD5HashGenerator.md5File(path).unsafeRunSync val expected = Right(Root.hash)
assertResult(Root.hash)(result) val result = invoke(path)
assertResult(expected)(result)
} }
} }
describe("read a large file (bigger than buffer)") { describe("read a large file (bigger than buffer)") {
val path = Resource(this, "big-file").toPath val path = Resource(this, "big-file").toPath
it("should generate the correct hash") { it("should generate the correct hash") {
val expected = MD5HashData.BigFile.hash val expected = Right(BigFile.hash)
val result = MD5HashGenerator.md5File(path).unsafeRunSync val result = invoke(path)
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
def invoke(path: Path) = {
runtime.unsafeRunSync {
MD5HashGenerator.md5File(path)
}.toEither
}
}
describe("md5FileChunk") {
describe("read chunks of file") { describe("read chunks of file") {
val path = Resource(this, "big-file").toPath val path = Resource(this, "big-file").toPath
it("should generate the correct hash for first chunk of the file") { it("should generate the correct hash for first chunk of the file") {
val part1 = MD5HashData.BigFile.Part1 val part1 = BigFile.Part1
val expected = part1.hash val expected = Right(part1.hash.hash)
val result = MD5HashGenerator val result = invoke(path, part1.offset, part1.size).map(_.hash)
.md5FileChunk(path, part1.offset, part1.size)
.unsafeRunSync
assertResult(expected)(result) assertResult(expected)(result)
} }
it("should generate the correcy hash for second chunk of the file") { it("should generate the correct hash for second chunk of the file") {
val part2 = MD5HashData.BigFile.Part2 val part2 = BigFile.Part2
val expected = part2.hash val expected = Right(part2.hash.hash)
val result = MD5HashGenerator val result = invoke(path, part2.offset, part2.size).map(_.hash)
.md5FileChunk(path, part2.offset, part2.size)
.unsafeRunSync
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
def invoke(path: Path, offset: Long, size: Long) = {
runtime.unsafeRunSync {
MD5HashGenerator.md5FileChunk(path, offset, size)
}.toEither
}
}
} }

View file

@ -3,13 +3,11 @@ package net.kemitix.thorp.core
import java.nio.file.{Path, Paths} import java.nio.file.{Path, Paths}
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime
class ParseConfigFileTest extends FunSpec { class ParseConfigFileTest extends FunSpec {
private val empty = ConfigOptions() private val empty = Right(ConfigOptions())
private def invoke(filename: Path) =
ParseConfigFile.parseFile(filename).unsafeRunSync
describe("parse a missing file") { describe("parse a missing file") {
val filename = Paths.get("/path/to/missing/file") val filename = Paths.get("/path/to/missing/file")
@ -31,11 +29,18 @@ class ParseConfigFileTest extends FunSpec {
} }
describe("parse a file with properties") { describe("parse a file with properties") {
val filename = Resource(this, "simple-config").toPath val filename = Resource(this, "simple-config").toPath
val expected = ConfigOptions( val expected = Right(
List(ConfigOption.Source(Paths.get("/path/to/source")), ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source")),
ConfigOption.Bucket("bucket-name"))) ConfigOption.Bucket("bucket-name"))))
it("should return some options") { it("should return some options") {
assertResult(expected)(invoke(filename)) assertResult(expected)(invoke(filename))
} }
} }
private def invoke(filename: Path) = {
val runtime = new DefaultRuntime {}
runtime.unsafeRunSync {
ParseConfigFile.parseFile(filename)
}.toEither
}
} }

View file

@ -7,11 +7,14 @@ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.DefaultRuntime
class PlanBuilderTest extends FreeSpec with TemporaryFolder { class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val lastModified: LastModified = LastModified()
private val runtime = new DefaultRuntime {}
private val lastModified: LastModified = LastModified()
private val planBuilder = new PlanBuilder {} private val planBuilder = new PlanBuilder {}
private implicit val logger: Logger = new DummyLogger
private val emptyS3ObjectData = S3ObjectsData() private val emptyS3ObjectData = S3ObjectsData()
"create a plan" - { "create a plan" - {
@ -426,7 +429,14 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
} }
def md5Hash(file: File) = { def md5Hash(file: File) = {
hashService.hashLocalObject(file.toPath).unsafeRunSync()("md5") runtime
.unsafeRunSync {
hashService.hashLocalObject(file.toPath).map(_.get("md5"))
}
.toEither
.toOption
.flatten
.getOrElse(MD5Hash("invalid md5 hash in test"))
} }
} }
@ -450,14 +460,17 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
private def configOptions(configOptions: ConfigOption*): ConfigOptions = private def configOptions(configOptions: ConfigOption*): ConfigOptions =
ConfigOptions(List(configOptions: _*)) ConfigOptions(List(configOptions: _*))
private def invoke(storageService: StorageService, private def invoke(
storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: ConfigOptions) configOptions: ConfigOptions
: Either[List[String], List[(String, String, String, String, String)]] = ): Either[Any, List[(String, String, String, String, String)]] =
runtime
.unsafeRunSync {
planBuilder planBuilder
.createPlan(storageService, hashService, configOptions) .createPlan(storageService, hashService, configOptions)
.value }
.unsafeRunSync() .toEither
.map(_.actions.toList.map({ .map(_.actions.toList.map({
case ToUpload(_, lf, _) => case ToUpload(_, lf, _) =>
("upload", ("upload",

View file

@ -4,8 +4,6 @@ import java.io.File
import java.nio.file.Paths import java.nio.file.Paths
import java.time.Instant import java.time.Instant
import cats.data.EitherT
import cats.effect.IO
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root} import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root}
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
@ -17,8 +15,13 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.console.Console
import zio.{DefaultRuntime, Task, TaskR}
class SyncSuite extends FunSpec { class SyncSuite extends FunSpec {
private val runtime = new DefaultRuntime {}
private val testBucket = Bucket("bucket") private val testBucket = Bucket("bucket")
private val source = Resource(this, "upload") private val source = Resource(this, "upload")
private val sourcePath = source.toPath private val sourcePath = source.toPath
@ -30,7 +33,6 @@ class SyncSuite extends FunSpec {
md5HashMap(Root.hash), md5HashMap(Root.hash),
sourcePath, sourcePath,
_ => rootRemoteKey) _ => rootRemoteKey)
implicit private val logger: Logger = new DummyLogger
private val leafFile: LocalFile = private val leafFile: LocalFile =
LocalFile.resolve("subdir/leaf-file", LocalFile.resolve("subdir/leaf-file",
md5HashMap(Leaf.hash), md5HashMap(Leaf.hash),
@ -58,24 +60,6 @@ class SyncSuite extends FunSpec {
localFile: LocalFile): (String, String, File) = localFile: LocalFile): (String, String, File) =
(bucket.name, remoteKey.key, localFile.file) (bucket.name, remoteKey.key, localFile.file)
def invokeSubjectForActions(
storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[List[String], Stream[Action]] = {
invokeSubject(storageService, hashService, configOptions)
.map(_.actions)
}
def invokeSubject(
storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[List[String], SyncPlan] = {
PlanBuilder
.createPlan(storageService, hashService, configOptions)
.value
.unsafeRunSync
}
private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] = private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] =
Map("md5" -> md5Hash) Map("md5" -> md5Hash)
@ -211,28 +195,45 @@ class SyncSuite extends FunSpec {
s3ObjectsData: S3ObjectsData) s3ObjectsData: S3ObjectsData)
extends StorageService { extends StorageService {
override def listObjects(bucket: Bucket, prefix: RemoteKey)( override def listObjects(bucket: Bucket,
implicit l: Logger): EitherT[IO, String, S3ObjectsData] = prefix: RemoteKey): TaskR[Console, S3ObjectsData] =
EitherT.liftF(IO.pure(s3ObjectsData)) TaskR(s3ObjectsData)
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean, batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): IO[UploadQueueEvent] = tryCount: Int): Task[UploadQueueEvent] =
IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hashes("md5"))) Task(UploadQueueEvent(localFile.remoteKey, localFile.hashes("md5")))
override def copy(bucket: Bucket, override def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hashes: MD5Hash, hashes: MD5Hash,
targetKey: RemoteKey): IO[CopyQueueEvent] = targetKey: RemoteKey): Task[CopyQueueEvent] =
IO.pure(CopyQueueEvent(targetKey)) Task(CopyQueueEvent(targetKey))
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey): IO[DeleteQueueEvent] = remoteKey: RemoteKey): Task[DeleteQueueEvent] =
IO.pure(DeleteQueueEvent(remoteKey)) Task(DeleteQueueEvent(remoteKey))
override def shutdown: IO[StorageQueueEvent] = override def shutdown: Task[StorageQueueEvent] =
IO.pure(ShutdownQueueEvent()) Task(ShutdownQueueEvent())
}
def invokeSubjectForActions(
storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[Any, Stream[Action]] = {
invoke(storageService, hashService, configOptions)
.map(_.actions)
}
def invoke(storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[Any, SyncPlan] = {
runtime.unsafeRunSync {
PlanBuilder
.createPlan(storageService, hashService, configOptions)
}.toEither
} }
} }

View file

@ -1,7 +1,5 @@
package net.kemitix.thorp.domain package net.kemitix.thorp.domain
import cats.effect.IO
trait Logger { trait Logger {
// returns an instance of Logger with debug set as indicated // returns an instance of Logger with debug set as indicated
@ -9,9 +7,9 @@ trait Logger {
// it returns itself, unmodified // it returns itself, unmodified
def withDebug(debug: Boolean): Logger def withDebug(debug: Boolean): Logger
def debug(message: => String): IO[Unit] def debug(message: => String): Unit
def info(message: => String): IO[Unit] def info(message: => String): Unit
def warn(message: String): IO[Unit] def warn(message: String): Unit
def error(message: String): IO[Unit] def error(message: String): Unit
} }

View file

@ -2,14 +2,14 @@ package net.kemitix.thorp.storage.api
import java.nio.file.Path import java.nio.file.Path
import cats.effect.IO import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.domain.{Logger, MD5Hash} import zio.Task
/** /**
* Creates one, or more, hashes for local objects. * Creates one, or more, hashes for local objects.
*/ */
trait HashService { trait HashService {
def hashLocalObject(path: Path)(implicit l: Logger): IO[Map[String, MD5Hash]] def hashLocalObject(path: Path): Task[Map[String, MD5Hash]]
} }

View file

@ -1,17 +1,17 @@
package net.kemitix.thorp.storage.api package net.kemitix.thorp.storage.api
import cats.data.EitherT
import cats.effect.IO
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import zio.console.Console
import zio.{Task, TaskR}
trait StorageService { trait StorageService {
def shutdown: IO[StorageQueueEvent] def shutdown: Task[StorageQueueEvent]
def listObjects( def listObjects(
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey prefix: RemoteKey
)(implicit l: Logger): EitherT[IO, String, S3ObjectsData] ): TaskR[Console, S3ObjectsData]
def upload( def upload(
localFile: LocalFile, localFile: LocalFile,
@ -19,18 +19,18 @@ trait StorageService {
batchMode: Boolean, batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int tryCount: Int
): IO[StorageQueueEvent] ): Task[StorageQueueEvent]
def copy( def copy(
bucket: Bucket, bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey targetKey: RemoteKey
): IO[StorageQueueEvent] ): Task[StorageQueueEvent]
def delete( def delete(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
): IO[StorageQueueEvent] ): Task[StorageQueueEvent]
} }

View file

@ -1,10 +1,10 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.CopyObjectRequest import com.amazonaws.services.s3.model.CopyObjectRequest
import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import zio.Task
class Copier(amazonS3: AmazonS3) { class Copier(amazonS3: AmazonS3) {
@ -13,7 +13,7 @@ class Copier(amazonS3: AmazonS3) {
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey targetKey: RemoteKey
): IO[StorageQueueEvent] = ): Task[StorageQueueEvent] =
for { for {
_ <- copyObject(bucket, sourceKey, hash, targetKey) _ <- copyObject(bucket, sourceKey, hash, targetKey)
} yield CopyQueueEvent(targetKey) } yield CopyQueueEvent(targetKey)
@ -31,7 +31,7 @@ class Copier(amazonS3: AmazonS3) {
bucket.name, bucket.name,
targetKey.key targetKey.key
).withMatchingETagConstraint(hash.hash) ).withMatchingETagConstraint(hash.hash)
IO(amazonS3.copyObject(request)) Task(amazonS3.copyObject(request))
} }
} }

View file

@ -1,17 +1,17 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectRequest import com.amazonaws.services.s3.model.DeleteObjectRequest
import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent
import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent} import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent}
import zio.Task
class Deleter(amazonS3: AmazonS3) { class Deleter(amazonS3: AmazonS3) {
def delete( def delete(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
): IO[StorageQueueEvent] = ): Task[StorageQueueEvent] =
for { for {
_ <- deleteObject(bucket, remoteKey) _ <- deleteObject(bucket, remoteKey)
} yield DeleteQueueEvent(remoteKey) } yield DeleteQueueEvent(remoteKey)
@ -21,7 +21,7 @@ class Deleter(amazonS3: AmazonS3) {
remoteKey: RemoteKey remoteKey: RemoteKey
) = { ) = {
val request = new DeleteObjectRequest(bucket.name, remoteKey.key) val request = new DeleteObjectRequest(bucket.name, remoteKey.key)
IO(amazonS3.deleteObject(request)) Task(amazonS3.deleteObject(request))
} }
} }

View file

@ -2,25 +2,25 @@ package net.kemitix.thorp.storage.aws
import java.nio.file.Path import java.nio.file.Path
import cats.effect.IO
import cats.implicits._
import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils
import net.kemitix.thorp.core.MD5HashGenerator import net.kemitix.thorp.core.MD5HashGenerator
import net.kemitix.thorp.domain.{Logger, MD5Hash} import net.kemitix.thorp.domain.MD5Hash
import zio.Task
trait ETagGenerator { trait ETagGenerator {
def eTag( def eTag(
path: Path path: Path
)(implicit l: Logger): IO[String] = { ): Task[String] = {
val partSize = calculatePartSize(path) val partSize = calculatePartSize(path)
val parts = numParts(path.toFile.length, partSize) val parts = numParts(path.toFile.length, partSize)
partsIndex(parts) Task
.map(digestChunk(path, partSize)) .foreach(partsIndex(parts)) { chunkNumber =>
.sequence digestChunk(path, partSize)(chunkNumber)
.map(concatenateDigests) }
.map(parts => concatenateDigests(parts))
.map(MD5HashGenerator.hex) .map(MD5HashGenerator.hex)
.map(hash => s"$hash-$parts") .map(hash => s"$hash-$parts")
} }
@ -53,14 +53,14 @@ trait ETagGenerator {
chunkSize: Long chunkSize: Long
)( )(
chunkNumber: Long chunkNumber: Long
)(implicit l: Logger): IO[Array[Byte]] = ): Task[Array[Byte]] =
hashChunk(path, chunkNumber, chunkSize).map(_.digest) hashChunk(path, chunkNumber, chunkSize).map(_.digest)
def hashChunk( def hashChunk(
path: Path, path: Path,
chunkNumber: Long, chunkNumber: Long,
chunkSize: Long chunkSize: Long
)(implicit l: Logger): IO[MD5Hash] = ): Task[MD5Hash] =
MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize) MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize)
def offsets( def offsets(

View file

@ -1,17 +1,15 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import cats.data.EitherT
import cats.effect.IO
import cats.implicits._
import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
import net.kemitix.thorp.domain import net.kemitix.thorp.domain
import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData} import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData}
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
import zio.console.Console
import zio.{IO, Task, TaskR, ZIO}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.Try
class Lister(amazonS3: AmazonS3) { class Lister(amazonS3: AmazonS3) {
@ -21,7 +19,7 @@ class Lister(amazonS3: AmazonS3) {
def listObjects( def listObjects(
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey prefix: RemoteKey
)(implicit l: Logger): EitherT[IO, String, S3ObjectsData] = { ): TaskR[Console, S3ObjectsData] = {
val requestMore = (token: Token) => val requestMore = (token: Token) =>
new ListObjectsV2Request() new ListObjectsV2Request()
@ -29,26 +27,23 @@ class Lister(amazonS3: AmazonS3) {
.withPrefix(prefix.key) .withPrefix(prefix.key)
.withContinuationToken(token) .withContinuationToken(token)
def fetchBatch: ListObjectsV2Request => EitherT[IO, String, Batch] = def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] =
request => request =>
EitherT {
for { for {
_ <- ListerLogger.logFetchBatch _ <- ListerLogger.logFetchBatch
batch <- tryFetchBatch(request) batch <- tryFetchBatch(request)
} yield batch } yield batch
}
def fetchMore( def fetchMore(
more: Option[Token] more: Option[Token]
): EitherT[IO, String, Stream[S3ObjectSummary]] = { ): TaskR[Console, Stream[S3ObjectSummary]] = {
more match { more match {
case None => EitherT.right(IO.pure(Stream.empty)) case None => ZIO.succeed(Stream.empty)
case Some(token) => fetch(requestMore(token)) case Some(token) => fetch(requestMore(token))
} }
} }
def fetch def fetch: ListObjectsV2Request => TaskR[Console, Stream[S3ObjectSummary]] =
: ListObjectsV2Request => EitherT[IO, String, Stream[S3ObjectSummary]] =
request => { request => {
for { for {
batch <- fetchBatch(request) batch <- fetchBatch(request)
@ -67,17 +62,12 @@ class Lister(amazonS3: AmazonS3) {
private def tryFetchBatch( private def tryFetchBatch(
request: ListObjectsV2Request request: ListObjectsV2Request
): IO[Either[String, (Stream[S3ObjectSummary], Option[Token])]] = { ): Task[(Stream[S3ObjectSummary], Option[Token])] =
IO { IO(amazonS3.listObjectsV2(request))
Try(amazonS3.listObjectsV2(request))
.map { result => .map { result =>
val more: Option[Token] = val more: Option[Token] =
if (result.isTruncated) Some(result.getNextContinuationToken) if (result.isTruncated) Some(result.getNextContinuationToken)
else None else None
(result.getObjectSummaries.asScala.toStream, more) (result.getObjectSummaries.asScala.toStream, more)
} }
.toEither
.leftMap(e => e.getMessage)
}
}
} }

View file

@ -1,10 +1,10 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import cats.effect.IO import zio.TaskR
import net.kemitix.thorp.domain.Logger import zio.console._
trait ListerLogger { trait ListerLogger {
def logFetchBatch(implicit l: Logger): IO[Unit] = def logFetchBatch: TaskR[Console, Unit] =
l.info("Fetching remote summaries...") putStrLn("Fetching remote summaries...")
} }
object ListerLogger extends ListerLogger object ListerLogger extends ListerLogger

View file

@ -2,10 +2,10 @@ package net.kemitix.thorp.storage.aws
import java.nio.file.Path import java.nio.file.Path
import cats.effect.IO
import net.kemitix.thorp.core.MD5HashGenerator import net.kemitix.thorp.core.MD5HashGenerator
import net.kemitix.thorp.domain.{Logger, MD5Hash} import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.storage.api.HashService
import zio.Task
trait S3HashService extends HashService { trait S3HashService extends HashService {
@ -17,7 +17,7 @@ trait S3HashService extends HashService {
*/ */
override def hashLocalObject( override def hashLocalObject(
path: Path path: Path
)(implicit l: Logger): IO[Map[String, MD5Hash]] = ): Task[Map[String, MD5Hash]] =
for { for {
md5 <- MD5HashGenerator.md5File(path) md5 <- MD5HashGenerator.md5File(path)
etag <- ETagGenerator.eTag(path).map(MD5Hash(_)) etag <- ETagGenerator.eTag(path).map(MD5Hash(_))

View file

@ -1,11 +1,11 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import cats.data.EitherT
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.AmazonS3
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.StorageService
import zio.console.Console
import zio.{Task, TaskR}
class S3StorageService( class S3StorageService(
amazonS3Client: => AmazonS3, amazonS3Client: => AmazonS3,
@ -20,7 +20,7 @@ class S3StorageService(
override def listObjects( override def listObjects(
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey prefix: RemoteKey
)(implicit l: Logger): EitherT[IO, String, S3ObjectsData] = ): TaskR[Console, S3ObjectsData] =
objectLister.listObjects(bucket, prefix) objectLister.listObjects(bucket, prefix)
override def copy( override def copy(
@ -28,7 +28,7 @@ class S3StorageService(
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey targetKey: RemoteKey
): IO[StorageQueueEvent] = ): Task[StorageQueueEvent] =
copier.copy(bucket, sourceKey, hash, targetKey) copier.copy(bucket, sourceKey, hash, targetKey)
override def upload( override def upload(
@ -37,17 +37,17 @@ class S3StorageService(
batchMode: Boolean, batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int tryCount: Int
): IO[StorageQueueEvent] = ): Task[StorageQueueEvent] =
uploader.upload(localFile, bucket, batchMode, uploadEventListener, 1) uploader.upload(localFile, bucket, batchMode, uploadEventListener, 1)
override def delete( override def delete(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
): IO[StorageQueueEvent] = ): Task[StorageQueueEvent] =
deleter.delete(bucket, remoteKey) deleter.delete(bucket, remoteKey)
override def shutdown: IO[StorageQueueEvent] = override def shutdown: Task[StorageQueueEvent] =
IO { Task {
amazonTransferManager.shutdownNow(true) amazonTransferManager.shutdownNow(true)
amazonS3Client.shutdown() amazonS3Client.shutdown()
ShutdownQueueEvent() ShutdownQueueEvent()

View file

@ -1,9 +1,7 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import cats.effect.IO
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
import com.amazonaws.services.s3.transfer.model.UploadResult
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
ErrorQueueEvent, ErrorQueueEvent,
UploadQueueEvent UploadQueueEvent
@ -14,8 +12,7 @@ import net.kemitix.thorp.domain.UploadEvent.{
TransferEvent TransferEvent
} }
import net.kemitix.thorp.domain.{StorageQueueEvent, _} import net.kemitix.thorp.domain.{StorageQueueEvent, _}
import zio.Task
import scala.util.Try
class Uploader(transferManager: => AmazonTransferManager) { class Uploader(transferManager: => AmazonTransferManager) {
@ -25,29 +22,24 @@ class Uploader(transferManager: => AmazonTransferManager) {
batchMode: Boolean, batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int tryCount: Int
): IO[StorageQueueEvent] = ): Task[StorageQueueEvent] =
for { for {
upload <- transfer(localFile, bucket, batchMode, uploadEventListener) upload <- transfer(localFile, bucket, batchMode, uploadEventListener)
action = upload match { } yield upload
case Right(r) =>
UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag))
case Left(e) => ErrorQueueEvent(localFile.remoteKey, e)
}
} yield action
private def transfer( private def transfer(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean, batchMode: Boolean,
uploadEventListener: UploadEventListener uploadEventListener: UploadEventListener
): IO[Either[Throwable, UploadResult]] = { ): Task[StorageQueueEvent] = {
val listener: ProgressListener = progressListener(uploadEventListener) val listener: ProgressListener = progressListener(uploadEventListener)
val putObjectRequest = request(localFile, bucket, batchMode, listener) val putObjectRequest = request(localFile, bucket, batchMode, listener)
IO { Task(transferManager.upload(putObjectRequest))
Try(transferManager.upload(putObjectRequest))
.map(_.waitForUploadResult) .map(_.waitForUploadResult)
.toEither .map(upload =>
} UploadQueueEvent(RemoteKey(upload.getKey), MD5Hash(upload.getETag)))
.catchAll(e => Task.succeed(ErrorQueueEvent(localFile.remoteKey, e)))
} }
private def request( private def request(

View file

@ -1,18 +0,0 @@
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import net.kemitix.thorp.domain.Logger
class DummyLogger extends Logger {
override def debug(message: => String): IO[Unit] = IO.unit
override def info(message: => String): IO[Unit] = IO.unit
override def warn(message: String): IO[Unit] = IO.unit
override def error(message: String): IO[Unit] = IO.unit
override def withDebug(debug: Boolean): Logger = this
}

View file

@ -1,18 +1,21 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import java.nio.file.Path
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import net.kemitix.thorp.core.Resource import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain.MD5Hash
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime
class ETagGeneratorTest extends FunSpec { class ETagGeneratorTest extends FunSpec {
private val runtime = new DefaultRuntime {}
private val bigFile = Resource(this, "big-file") private val bigFile = Resource(this, "big-file")
private val bigFilePath = bigFile.toPath private val bigFilePath = bigFile.toPath
private val configuration = new TransferManagerConfiguration private val configuration = new TransferManagerConfiguration
private val chunkSize = 1200000 private val chunkSize = 1200000
configuration.setMinimumUploadPartSize(chunkSize) configuration.setMinimumUploadPartSize(chunkSize)
private val logger = new DummyLogger
describe("Create offsets") { describe("Create offsets") {
it("should create offsets") { it("should create offsets") {
@ -25,10 +28,6 @@ class ETagGeneratorTest extends FunSpec {
} }
} }
def test(expected: String, result: MD5Hash): Unit = {
assertResult(expected)(result.hash)
}
describe("create md5 hash for each chunk") { describe("create md5 hash for each chunk") {
it("should create expected hash for chunks") { it("should create expected hash for chunks") {
val md5Hashes = List( val md5Hashes = List(
@ -40,20 +39,26 @@ class ETagGeneratorTest extends FunSpec {
).zipWithIndex ).zipWithIndex
md5Hashes.foreach { md5Hashes.foreach {
case (hash, index) => case (hash, index) =>
test(hash, assertResult(Right(hash))(
ETagGenerator invoke(bigFilePath, index, chunkSize).map(_.hash))
.hashChunk(bigFilePath, index, chunkSize)(logger)
.unsafeRunSync)
} }
} }
def invoke(path: Path, index: Long, size: Long) =
runtime.unsafeRunSync {
ETagGenerator.hashChunk(path, index, size)
}.toEither
} }
describe("create etag for whole file") { describe("create etag for whole file") {
val expected = "f14327c90ad105244c446c498bfe9a7d-2" val expected = "f14327c90ad105244c446c498bfe9a7d-2"
it("should match aws etag for the file") { it("should match aws etag for the file") {
val result = ETagGenerator.eTag(bigFilePath)(logger).unsafeRunSync val result = invoke(bigFilePath)
assertResult(expected)(result) assertResult(Right(expected))(result)
} }
def invoke(path: Path) =
runtime.unsafeRunSync {
ETagGenerator.eTag(path)
}.toEither
} }
} }

View file

@ -14,16 +14,18 @@ import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime
class S3StorageServiceSuite extends FunSpec with MockFactory { class S3StorageServiceSuite extends FunSpec with MockFactory {
private val runtime = new DefaultRuntime {}
describe("listObjectsInPrefix") { describe("listObjectsInPrefix") {
val source = Resource(this, "upload") val source = Resource(this, "upload")
val sourcePath = source.toPath val sourcePath = source.toPath
val prefix = RemoteKey("prefix") val prefix = RemoteKey("prefix")
implicit val config: Config = implicit val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit val implLogger: Logger = new DummyLogger
val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))
@ -73,12 +75,15 @@ class S3StorageServiceSuite extends FunSpec with MockFactory {
k1b -> HashModified(h1, lm), k1b -> HashModified(h1, lm),
k2 -> HashModified(h2, lm)) k2 -> HashModified(h2, lm))
)) ))
val result = storageService val result = invoke(storageService)
.listObjects(Bucket("bucket"), RemoteKey("prefix"))
.value
.unsafeRunSync
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
private def invoke(storageService: S3StorageService) =
runtime.unsafeRunSync {
storageService
.listObjects(Bucket("bucket"), RemoteKey("prefix"))
}.toEither
} }

View file

@ -20,7 +20,6 @@ class StorageServiceSuite extends FunSpec with MockFactory {
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
implicit private val config: Config = implicit private val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val implLogger: Logger = new DummyLogger
private val fileToKey = private val fileToKey =
KeyGenerator.generateKey(config.sources, config.prefix) _ KeyGenerator.generateKey(config.sources, config.prefix) _

View file

@ -18,7 +18,6 @@ class UploaderSuite extends FunSpec with MockFactory {
implicit private val config: Config = implicit private val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
private val fileToKey = generateKey(config.sources, config.prefix) _ private val fileToKey = generateKey(config.sources, config.prefix) _
implicit private val implLogger: Logger = new DummyLogger
def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] = Map("md5" -> hash) def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] = Map("md5" -> hash)