From f277b5e789d4b26c975ce24a7e2448ba90377204 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Tue, 23 Jul 2019 23:13:09 +0100 Subject: [PATCH] Improve logging consistency (#119) * [console] create new effect MyConsole * [core] SyncLogging use ConsoleOut.ValidConfig * [storage-aws] Extract AmazonS3 trait to help testing * [console] MyConsole use UIO * [storage-aws] S3StorageServiceSuite convert to FreeSpec * [storage-aws] S3StorageServiceSuite extract S3ClientTest trait * [storage-aws] remove incomplete test UploaderSuite * [storage-aws] extract and rename AmazonS3ClientTestFixture * [storage-aws] Copier handle and log errors * [core] ThropArchive log completed uploads here * [core] ThorpArchive log copies and deletes * Improve consistency of logging * [storage-aws] CopierTest extract from S3StorageServiceSuite * [storage-aws] Copier handle hash match errors properly * [core] ThropArchive display erros in red * [core] SequencePlan extracted * [core] Clear line after deletion log * [changelog] updated * [cli] Program tidy up imports * [storage-aws] Copier replace null with Option * [storage-aws] AmazonS3 copyObject returns Some[CopyObjectResult] --- CHANGELOG.org | 5 + build.sbt | 8 +- .../scala/net/kemitix/thorp/cli/Main.scala | 14 ++- .../scala/net/kemitix/thorp/cli/Program.scala | 32 +++--- .../net/kemitix/thorp/cli/ProgramTest.scala | 11 ++- .../kemitix/thorp/console/ConsoleOut.scala | 21 ++++ .../net/kemitix/thorp/console/MyConsole.scala | 31 ++++++ .../net/kemitix/thorp/console/package.scala | 16 +++ .../net/kemitix/thorp/core/PlanBuilder.scala | 20 ++-- .../net/kemitix/thorp/core/SequencePlan.scala | 16 +++ .../net/kemitix/thorp/core/SyncLogging.scala | 32 ++---- .../net/kemitix/thorp/core/ThorpArchive.scala | 61 ++++++++++-- .../thorp/core/UnversionedMirrorArchive.scala | 8 +- .../thorp/core/DummyStorageService.scala | 10 +- .../kemitix/thorp/core/PlanBuilderTest.scala | 9 +- .../kemitix/thorp/core/SequencePlanTest.scala | 41 ++++++++ .../thorp/core/StorageQueueEventSuite.scala | 27 ----- .../net/kemitix/thorp/core/SyncSuite.scala | 15 +-- .../thorp/domain/StorageQueueEvent.scala | 50 +++++----- .../thorp/domain/UploadEventLogger.scala | 4 +- .../thorp/storage/api/StorageService.scala | 4 +- .../kemitix/thorp/storage/aws/AmazonS3.scala | 41 ++++++++ .../kemitix/thorp/storage/aws/Copier.scala | 49 ++++++++-- .../kemitix/thorp/storage/aws/Deleter.scala | 3 +- .../kemitix/thorp/storage/aws/Lister.scala | 20 ++-- .../thorp/storage/aws/ListerLogger.scala | 4 +- .../thorp/storage/aws/S3ClientException.scala | 13 +++ .../thorp/storage/aws/S3StorageService.scala | 7 +- .../storage/aws/S3StorageServiceBuilder.scala | 6 +- .../kemitix/thorp/storage/aws/Uploader.scala | 8 +- .../aws/AmazonS3ClientTestFixture.scala | 18 ++++ .../thorp/storage/aws/CopierTest.scala | 98 +++++++++++++++++++ .../storage/aws/S3StorageServiceSuite.scala | 85 +++++++--------- .../storage/aws/StorageServiceSuite.scala | 5 +- .../thorp/storage/aws/UploaderSuite.scala | 55 ----------- 35 files changed, 569 insertions(+), 278 deletions(-) create mode 100644 console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala create mode 100644 console/src/main/scala/net/kemitix/thorp/console/MyConsole.scala create mode 100644 console/src/main/scala/net/kemitix/thorp/console/package.scala create mode 100644 core/src/main/scala/net/kemitix/thorp/core/SequencePlan.scala create mode 100644 core/src/test/scala/net/kemitix/thorp/core/SequencePlanTest.scala delete mode 100644 core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala create mode 100644 storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonS3.scala create mode 100644 storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientException.scala create mode 100644 storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala create mode 100644 storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala delete mode 100644 storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala diff --git a/CHANGELOG.org b/CHANGELOG.org index 31c8187..7325cfb 100644 --- a/CHANGELOG.org +++ b/CHANGELOG.org @@ -8,6 +8,11 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog] * [0.8.0] - 2019-??-?? +** Added + + - Log copy and delete operations (#119) + - Log errors with red label (#119) + ** Changed - Replace cats-effect with zio (#117) diff --git a/build.sbt b/build.sbt index 4c16680..6e2965b 100644 --- a/build.sbt +++ b/build.sbt @@ -63,7 +63,7 @@ val zioDependencies = Seq( ) ) -// cli -> thorp-lib -> storage-aws -> core -> storage-api -> domain +// cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> domain lazy val thorp = (project in file(".")) .settings(commonSettings) @@ -112,6 +112,12 @@ lazy val `storage-api` = (project in file("storage-api")) .settings(commonSettings) .settings(zioDependencies) .settings(assemblyJarName in assembly := "storage-api.jar") + .dependsOn(console) + +lazy val console = (project in file("console")) + .settings(commonSettings) + .settings(zioDependencies) + .settings(assemblyJarName in assembly := "console.jar") .dependsOn(domain) lazy val domain = (project in file("domain")) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala index f36cc5c..13ee928 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala @@ -1,14 +1,22 @@ package net.kemitix.thorp.cli -import zio.{App, ZIO} +import net.kemitix.thorp.console.MyConsole +import zio.internal.PlatformLive +import zio.{App, Runtime, UIO, ZIO} object Main extends App { - override def run(args: List[String]): ZIO[Environment, Nothing, Int] = { + private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) + + override def run(args: List[String]): ZIO[Environment, Nothing, Int] = + runtime.unsafeRun { + appLogic(args).fold(_ => UIO(1), _ => UIO(0)) + } + + private def appLogic(args: List[String]): ZIO[MyConsole, Throwable, Unit] = for { cliOptions <- ParseArgs(args) _ <- Program.run(cliOptions) } yield () - }.fold(failure => 1, success => 0) } diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 35d07e9..6fadc2e 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -1,17 +1,17 @@ package net.kemitix.thorp.cli +import net.kemitix.thorp.console._ import net.kemitix.thorp.core._ import net.kemitix.thorp.domain.{StorageQueueEvent, SyncTotals} import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService -import zio.console._ import zio.{Task, TaskR, ZIO} trait Program extends PlanBuilder { lazy val version = s"Thorp v${thorp.BuildInfo.version}" - def run(cliOptions: ConfigOptions): ZIO[Console, Nothing, Unit] = { + def run(cliOptions: ConfigOptions): ZIO[MyConsole, Nothing, Unit] = { val showVersion = ConfigQuery.showVersion(cliOptions) for { _ <- ZIO.when(showVersion)(putStrLn(version)) @@ -20,7 +20,7 @@ trait Program extends PlanBuilder { } private def execute( - cliOptions: ConfigOptions): ZIO[Console, Throwable, Unit] = { + cliOptions: ConfigOptions): ZIO[MyConsole, Throwable, Unit] = { for { plan <- createPlan(defaultStorageService, defaultHashService, cliOptions) archive <- thorpArchive(cliOptions, plan.syncTotals) @@ -30,7 +30,8 @@ trait Program extends PlanBuilder { } yield () } - private def handleErrors(throwable: Throwable): ZIO[Console, Nothing, Unit] = + private def handleErrors( + throwable: Throwable): ZIO[MyConsole, Nothing, Unit] = for { _ <- putStrLn("There were errors:") _ <- throwable match { @@ -54,21 +55,20 @@ trait Program extends PlanBuilder { private def handleActions( archive: ThorpArchive, syncPlan: SyncPlan - ): TaskR[Console, Stream[StorageQueueEvent]] = { + ): TaskR[MyConsole, Stream[StorageQueueEvent]] = { type Accumulator = (Stream[StorageQueueEvent], Long) val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes) TaskR - .foldLeft(syncPlan.actions.reverse.zipWithIndex)(zero)( - (acc, indexedAction) => { - val (action, index) = indexedAction - val (stream, bytesToDo) = acc - val remainingBytes = bytesToDo - action.size - (for { - event <- archive.update(index, action, remainingBytes) - events = stream ++ Stream(event) - } yield events) - .map((_, remainingBytes)) - }) + .foldLeft(syncPlan.actions.zipWithIndex)(zero)((acc, indexedAction) => { + val (action, index) = indexedAction + val (stream, bytesToDo) = acc + val remainingBytes = bytesToDo - action.size + (for { + event <- archive.update(index, action, remainingBytes) + events = stream ++ Stream(event) + } yield events) + .map((_, remainingBytes)) + }) .map { case (events, _) => events } diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala index 921dd30..eb4f221 100644 --- a/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala @@ -3,18 +3,19 @@ package net.kemitix.thorp.cli import java.io.File import java.nio.file.Path +import net.kemitix.thorp.console.MyConsole import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core._ import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} import org.scalatest.FunSpec -import zio.console.Console -import zio.{DefaultRuntime, Task, TaskR} +import zio.internal.PlatformLive +import zio.{Runtime, Task, TaskR} class ProgramTest extends FunSpec { - private val runtime = new DefaultRuntime {} + private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) val source: File = Resource(this, ".") val sourcePath: Path = source.toPath @@ -39,7 +40,7 @@ class ProgramTest extends FunSpec { it("should be handled in correct order") { val expected = List(copyAction, uploadAction, deleteAction) invoke(configOptions) - val result = archive.actions + val result = archive.actions.reverse assertResult(expected)(result) } } @@ -65,7 +66,7 @@ class ProgramTest extends FunSpec { index: Int, action: Action, totalBytesSoFar: Long - ): TaskR[Console, StorageQueueEvent] = { + ): TaskR[MyConsole, StorageQueueEvent] = { actions = action :: actions TaskR(DoNothingQueueEvent(RemoteKey(""))) } diff --git a/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala b/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala new file mode 100644 index 0000000..81f9689 --- /dev/null +++ b/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala @@ -0,0 +1,21 @@ +package net.kemitix.thorp.console + +import net.kemitix.thorp.domain.{Bucket, RemoteKey, Sources} + +sealed trait ConsoleOut { + def en: String +} +object ConsoleOut { + case class ValidConfig( + bucket: Bucket, + prefix: RemoteKey, + sources: Sources + ) extends ConsoleOut { + private val sourcesList = sources.paths.mkString(", ") + override def en: String = + List(s"Bucket: ${bucket.name}", + s"Prefix: ${prefix.key}", + s"Source: $sourcesList") + .mkString(", ") + } +} diff --git a/console/src/main/scala/net/kemitix/thorp/console/MyConsole.scala b/console/src/main/scala/net/kemitix/thorp/console/MyConsole.scala new file mode 100644 index 0000000..3b379b0 --- /dev/null +++ b/console/src/main/scala/net/kemitix/thorp/console/MyConsole.scala @@ -0,0 +1,31 @@ +package net.kemitix.thorp.console + +import java.io.PrintStream + +import zio.{UIO, ZIO} + +trait MyConsole { + val console: MyConsole.Service +} + +object MyConsole { + + trait Service { + def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] + def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] + } + + trait Live extends MyConsole { + val console: Service = new Service { + override def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] = + putStrLn(line) + override def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] = + putStrLn(Console.out)(line) + final def putStrLn(stream: PrintStream)( + line: String): ZIO[MyConsole, Nothing, Unit] = + UIO(Console.withOut(stream)(Console.println(line))) + } + } + + object Live extends Live +} diff --git a/console/src/main/scala/net/kemitix/thorp/console/package.scala b/console/src/main/scala/net/kemitix/thorp/console/package.scala new file mode 100644 index 0000000..49e5eb2 --- /dev/null +++ b/console/src/main/scala/net/kemitix/thorp/console/package.scala @@ -0,0 +1,16 @@ +package net.kemitix.thorp + +import zio.ZIO + +package object console extends MyConsole.Service { + + final val consoleService: ZIO[MyConsole, Nothing, MyConsole.Service] = + ZIO.access(_.console) + + final def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] = + ZIO.accessM(_.console putStrLn line) + + override def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] = + putStrLn(line.en) + +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala index 7f7796c..b1e2347 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala @@ -1,9 +1,9 @@ package net.kemitix.thorp.core -import net.kemitix.thorp.core.Action.DoNothing +import net.kemitix.thorp.console._ +import net.kemitix.thorp.core.Action._ import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} -import zio.console._ import zio.{Task, TaskR} trait PlanBuilder { @@ -12,7 +12,7 @@ trait PlanBuilder { storageService: StorageService, hashService: HashService, configOptions: ConfigOptions - ): TaskR[Console, SyncPlan] = + ): TaskR[MyConsole, SyncPlan] = ConfigurationBuilder .buildConfig(configOptions) .catchAll(errors => TaskR.fail(ConfigValidationException(errors))) @@ -21,7 +21,7 @@ trait PlanBuilder { def useValidConfig( storageService: StorageService, hashService: HashService - )(implicit c: Config): TaskR[Console, SyncPlan] = { + )(implicit c: Config): TaskR[MyConsole, SyncPlan] = { for { _ <- SyncLogging.logRunStart(c.bucket, c.prefix, c.sources) actions <- buildPlan(storageService, hashService) @@ -31,7 +31,7 @@ trait PlanBuilder { private def buildPlan( storageService: StorageService, hashService: HashService - )(implicit c: Config): TaskR[Console, SyncPlan] = + )(implicit c: Config): TaskR[MyConsole, SyncPlan] = for { metadata <- gatherMetadata(storageService, hashService) } yield assemblePlan(c)(metadata) @@ -40,7 +40,9 @@ trait PlanBuilder { implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = { case (remoteData, localData) => SyncPlan( - actions = createActions(remoteData, localData).filter(doesSomething), + actions = createActions(remoteData, localData) + .filter(doesSomething) + .sortBy(SequencePlan.order), syncTotals = SyncTotals(count = localData.count, totalSizeBytes = localData.totalSizeBytes) ) @@ -89,7 +91,7 @@ trait PlanBuilder { private def gatherMetadata( storageService: StorageService, hashService: HashService - )(implicit c: Config): TaskR[Console, (S3ObjectsData, LocalFiles)] = + )(implicit c: Config): TaskR[MyConsole, (S3ObjectsData, LocalFiles)] = for { remoteData <- fetchRemoteData(storageService) localData <- findLocalFiles(hashService) @@ -97,12 +99,12 @@ trait PlanBuilder { private def fetchRemoteData( storageService: StorageService - )(implicit c: Config): TaskR[Console, S3ObjectsData] = + )(implicit c: Config): TaskR[MyConsole, S3ObjectsData] = storageService.listObjects(c.bucket, c.prefix) private def findLocalFiles( hashService: HashService - )(implicit config: Config): TaskR[Console, LocalFiles] = + )(implicit config: Config): TaskR[MyConsole, LocalFiles] = for { _ <- SyncLogging.logFileScan localFiles <- findFiles(hashService) diff --git a/core/src/main/scala/net/kemitix/thorp/core/SequencePlan.scala b/core/src/main/scala/net/kemitix/thorp/core/SequencePlan.scala new file mode 100644 index 0000000..a7d5c85 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/SequencePlan.scala @@ -0,0 +1,16 @@ +package net.kemitix.thorp.core + +import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} + +trait SequencePlan { + + def order: Action => Int = { + case _: DoNothing => 0 + case _: ToCopy => 1 + case _: ToUpload => 2 + case _: ToDelete => 3 + } + +} + +object SequencePlan extends SequencePlan diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index a4d6c32..250d54c 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -1,5 +1,7 @@ package net.kemitix.thorp.core +import net.kemitix.thorp.console._ +//import net.kemitix.thorp.console.MyConsole._ import net.kemitix.thorp.domain.StorageQueueEvent.{ CopyQueueEvent, DeleteQueueEvent, @@ -7,8 +9,8 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{ UploadQueueEvent } import net.kemitix.thorp.domain._ +import net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen import zio.ZIO -import zio.console._ trait SyncLogging { @@ -16,45 +18,27 @@ trait SyncLogging { bucket: Bucket, prefix: RemoteKey, sources: Sources - ): ZIO[Console, Nothing, Unit] = { - val sourcesList = sources.paths.mkString(", ") + ): ZIO[MyConsole, Nothing, Unit] = for { - _ <- putStrLn( - List(s"Bucket: ${bucket.name}", - s"Prefix: ${prefix.key}", - s"Source: $sourcesList") - .mkString(", ")) + _ <- putStrLn(ConsoleOut.ValidConfig(bucket, prefix, sources)) } yield () - } - def logFileScan(implicit c: Config): ZIO[Console, Nothing, Unit] = + def logFileScan(implicit c: Config): ZIO[MyConsole, Nothing, Unit] = putStrLn(s"Scanning local files: ${c.sources.paths.mkString(", ")}...") def logRunFinished( actions: Stream[StorageQueueEvent] - ): ZIO[Console, Nothing, Unit] = { + ): ZIO[MyConsole, Nothing, Unit] = { val counters = actions.foldLeft(Counters())(countActivities) for { + _ <- putStrLn(eraseToEndOfScreen) _ <- putStrLn(s"Uploaded ${counters.uploaded} files") _ <- putStrLn(s"Copied ${counters.copied} files") _ <- putStrLn(s"Deleted ${counters.deleted} files") _ <- putStrLn(s"Errors ${counters.errors}") - _ <- logErrors(actions) } yield () } - def logErrors( - actions: Stream[StorageQueueEvent] - ): ZIO[Console, Nothing, Unit] = { - ZIO.foldLeft(actions)(()) { (_, action) => - action match { - case ErrorQueueEvent(k, e) => - putStrLn(s"${k.key}: ${e.getMessage}") - case _ => ZIO.unit - } - } - } - private def countActivities: (Counters, StorageQueueEvent) => Counters = (counters: Counters, s3Action: StorageQueueEvent) => { import Counters._ diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala index 297dce3..1292136 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -1,8 +1,19 @@ package net.kemitix.thorp.core -import net.kemitix.thorp.domain.{LocalFile, StorageQueueEvent} +import net.kemitix.thorp.console._ +import net.kemitix.thorp.domain.StorageQueueEvent +import net.kemitix.thorp.domain.StorageQueueEvent.{ + CopyQueueEvent, + DeleteQueueEvent, + DoNothingQueueEvent, + ErrorQueueEvent, + ShutdownQueueEvent, + UploadQueueEvent +} +import net.kemitix.thorp.domain.Terminal._ import zio.TaskR -import zio.console._ + +import scala.io.AnsiColor._ trait ThorpArchive { @@ -10,15 +21,45 @@ trait ThorpArchive { index: Int, action: Action, totalBytesSoFar: Long - ): TaskR[Console, StorageQueueEvent] + ): TaskR[MyConsole, StorageQueueEvent] - def logFileUploaded( - localFile: LocalFile, + def logEvent( + event: StorageQueueEvent, batchMode: Boolean - ): TaskR[Console, Unit] = - for { - _ <- TaskR.when(batchMode)( - putStrLn(s"Uploaded: ${localFile.remoteKey.key}")) - } yield () + ): TaskR[MyConsole, Unit] = + event match { + case UploadQueueEvent(remoteKey, _) => + for { + _ <- TaskR.when(batchMode)(putStrLn(s"Uploaded: ${remoteKey.key}")) + _ <- TaskR.when(!batchMode)( + putStrLn( + s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen")) + } yield () + case CopyQueueEvent(sourceKey, targetKey) => + for { + _ <- TaskR.when(batchMode)( + putStrLn(s"Copied: ${sourceKey.key} => ${targetKey.key}")) + _ <- TaskR.when(!batchMode)( + putStrLn( + s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen") + ) + } yield () + case DeleteQueueEvent(remoteKey) => + for { + _ <- TaskR.when(batchMode)(putStrLn(s"Deleted: $remoteKey")) + _ <- TaskR.when(!batchMode)( + putStrLn( + s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen")) + } yield () + case ErrorQueueEvent(action, _, e) => + for { + _ <- TaskR.when(batchMode)( + putStrLn(s"${action.name} failed: ${action.keys}: ${e.getMessage}")) + _ <- TaskR.when(!batchMode)(putStrLn( + s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen")) + } yield () + case DoNothingQueueEvent(_) => TaskR(()) + case ShutdownQueueEvent() => TaskR(()) + } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index 0a1dd60..82fd076 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -1,10 +1,10 @@ package net.kemitix.thorp.core +import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.StorageService -import zio.console.Console import zio.{Task, TaskR} case class UnversionedMirrorArchive( @@ -17,20 +17,22 @@ case class UnversionedMirrorArchive( index: Int, action: Action, totalBytesSoFar: Long - ): TaskR[Console, StorageQueueEvent] = + ): TaskR[MyConsole, StorageQueueEvent] = action match { case ToUpload(bucket, localFile, _) => for { event <- doUpload(index, totalBytesSoFar, bucket, localFile) - _ <- logFileUploaded(localFile, batchMode) + _ <- logEvent(event, batchMode) } yield event case ToCopy(bucket, sourceKey, hash, targetKey, _) => for { event <- storageService.copy(bucket, sourceKey, hash, targetKey) + _ <- logEvent(event, batchMode) } yield event case ToDelete(bucket, remoteKey, _) => for { event <- storageService.delete(bucket, remoteKey) + _ <- logEvent(event, batchMode) } yield event case DoNothing(_, remoteKey, _) => Task(DoNothingQueueEvent(remoteKey)) diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala index 79b313b..8298c0e 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala @@ -2,9 +2,9 @@ package net.kemitix.thorp.core import java.io.File +import net.kemitix.thorp.console._ import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.StorageService -import zio.console.Console import zio.{Task, TaskR} case class DummyStorageService(s3ObjectData: S3ObjectsData, @@ -14,8 +14,10 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData, override def shutdown: Task[StorageQueueEvent] = Task(StorageQueueEvent.ShutdownQueueEvent()) - override def listObjects(bucket: Bucket, - prefix: RemoteKey): TaskR[Console, S3ObjectsData] = + override def listObjects( + bucket: Bucket, + prefix: RemoteKey + ): TaskR[MyConsole, S3ObjectsData] = TaskR(s3ObjectData) override def upload(localFile: LocalFile, @@ -31,7 +33,7 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey): Task[StorageQueueEvent] = - Task(StorageQueueEvent.CopyQueueEvent(targetKey)) + Task(StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey)) override def delete(bucket: Bucket, remoteKey: RemoteKey): Task[StorageQueueEvent] = diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala index 05fa48c..e55d9e6 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala @@ -3,15 +3,17 @@ package net.kemitix.thorp.core import java.io.File import java.nio.file.Path +import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} import org.scalatest.FreeSpec -import zio.DefaultRuntime +import zio.Runtime +import zio.internal.PlatformLive class PlanBuilderTest extends FreeSpec with TemporaryFolder { - private val runtime = new DefaultRuntime {} + private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) private val lastModified: LastModified = LastModified() private val planBuilder = new PlanBuilder {} @@ -464,7 +466,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { storageService: StorageService, hashService: HashService, configOptions: ConfigOptions - ): Either[Any, List[(String, String, String, String, String)]] = + ): Either[Any, List[(String, String, String, String, String)]] = { runtime .unsafeRunSync { planBuilder @@ -485,5 +487,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { ("do-nothing", remoteKey.key, "", "", "") case x => ("other", x.toString, "", "", "") })) + } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/SequencePlanTest.scala b/core/src/test/scala/net/kemitix/thorp/core/SequencePlanTest.scala new file mode 100644 index 0000000..0f8df6f --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/SequencePlanTest.scala @@ -0,0 +1,41 @@ +package net.kemitix.thorp.core + +import java.io.File + +import net.kemitix.thorp.core.Action._ +import net.kemitix.thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} +import org.scalatest.FreeSpec + +class SequencePlanTest extends FreeSpec { + + "sort" - { + "a list of assorted actions" - { + val bucket = Bucket("aBucket") + val remoteKey1 = RemoteKey("remoteKey1") + val remoteKey2 = RemoteKey("targetHash") + val hash = MD5Hash("aHash") + val hashes: Map[String, MD5Hash] = Map() + val size = 1024 + val file1 = new File("aFile") + val file2 = new File("aFile") + val source = new File("source") + val localFile1 = + LocalFile(file1, source, hashes, remoteKey1) + val localFile2 = + LocalFile(file2, source, hashes, remoteKey2) + val copy1 = ToCopy(bucket, remoteKey1, hash, remoteKey2, size) + val copy2 = ToCopy(bucket, remoteKey2, hash, remoteKey1, size) + val upload1 = ToUpload(bucket, localFile1, size) + val upload2 = ToUpload(bucket, localFile1, size) + val delete1 = ToDelete(bucket, remoteKey1, size) + val delete2 = ToDelete(bucket, remoteKey2, size) + "should be in correct order" in { + val actions = List(copy1, delete1, upload1, delete2, upload2, copy2) + val expected = List(copy1, copy2, upload1, upload2, delete1, delete2) + val result = actions.sortBy(SequencePlan.order) + assertResult(expected)(result) + } + } + } + +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala deleted file mode 100644 index e745675..0000000 --- a/core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala +++ /dev/null @@ -1,27 +0,0 @@ -package net.kemitix.thorp.core - -import net.kemitix.thorp.domain.StorageQueueEvent.{ - CopyQueueEvent, - DeleteQueueEvent, - UploadQueueEvent -} -import net.kemitix.thorp.domain.{MD5Hash, RemoteKey} -import org.scalatest.FunSpec - -class StorageQueueEventSuite extends FunSpec { - - describe("Ordering of types") { - val remoteKey = RemoteKey("remote-key") - val md5Hash = MD5Hash("md5hash") - val copy = CopyQueueEvent(remoteKey) - val upload = UploadQueueEvent(remoteKey, md5Hash) - val delete = DeleteQueueEvent(remoteKey) - val unsorted = List(delete, copy, upload) - it("should sort as copy < upload < delete ") { - val result = unsorted.sorted - val expected = List(copy, upload, delete) - assertResult(expected)(result) - } - } - -} diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index dce6d89..f3d00e1 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -4,6 +4,8 @@ import java.io.File import java.nio.file.Paths import java.time.Instant +import net.kemitix.thorp.console +import net.kemitix.thorp.console.MyConsole import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root} import net.kemitix.thorp.domain.StorageQueueEvent.{ @@ -15,12 +17,12 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} import org.scalatest.FunSpec -import zio.console.Console -import zio.{DefaultRuntime, Task, TaskR} +import zio.internal.PlatformLive +import zio.{Runtime, Task, TaskR} class SyncSuite extends FunSpec { - private val runtime = new DefaultRuntime {} + private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) private val testBucket = Bucket("bucket") private val source = Resource(this, "upload") @@ -195,8 +197,9 @@ class SyncSuite extends FunSpec { s3ObjectsData: S3ObjectsData) extends StorageService { - override def listObjects(bucket: Bucket, - prefix: RemoteKey): TaskR[Console, S3ObjectsData] = + override def listObjects( + bucket: Bucket, + prefix: RemoteKey): TaskR[console.MyConsole, S3ObjectsData] = TaskR(s3ObjectsData) override def upload(localFile: LocalFile, @@ -210,7 +213,7 @@ class SyncSuite extends FunSpec { sourceKey: RemoteKey, hashes: MD5Hash, targetKey: RemoteKey): Task[CopyQueueEvent] = - Task(CopyQueueEvent(targetKey)) + Task(CopyQueueEvent(sourceKey, targetKey)) override def delete(bucket: Bucket, remoteKey: RemoteKey): Task[DeleteQueueEvent] = diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala b/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala index 31e7e8d..3594690 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala @@ -1,49 +1,49 @@ package net.kemitix.thorp.domain -sealed trait StorageQueueEvent { - - val order: Int - -} +sealed trait StorageQueueEvent object StorageQueueEvent { final case class DoNothingQueueEvent( remoteKey: RemoteKey - ) extends StorageQueueEvent { - override val order: Int = 0 - } + ) extends StorageQueueEvent final case class CopyQueueEvent( - remoteKey: RemoteKey - ) extends StorageQueueEvent { - override val order: Int = 1 - } + sourceKey: RemoteKey, + targetKey: RemoteKey + ) extends StorageQueueEvent final case class UploadQueueEvent( remoteKey: RemoteKey, md5Hash: MD5Hash - ) extends StorageQueueEvent { - override val order: Int = 2 - } + ) extends StorageQueueEvent final case class DeleteQueueEvent( remoteKey: RemoteKey - ) extends StorageQueueEvent { - override val order: Int = 3 - } + ) extends StorageQueueEvent final case class ErrorQueueEvent( + action: Action, remoteKey: RemoteKey, e: Throwable - ) extends StorageQueueEvent { - override val order: Int = 10 - } + ) extends StorageQueueEvent - final case class ShutdownQueueEvent() extends StorageQueueEvent { - override val order: Int = 99 - } + final case class ShutdownQueueEvent() extends StorageQueueEvent - implicit def ord[A <: StorageQueueEvent]: Ordering[A] = Ordering.by(_.order) + sealed trait Action { + val name: String + val keys: String + } + object Action { + case class Copy(keys: String) extends Action { + override val name: String = "Copy" + } + case class Upload(keys: String) extends Action { + override val name: String = "Upload" + } + case class Delete(keys: String) extends Action { + override val name: String = "Delete" + } + } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala index afc9063..692a271 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala @@ -19,7 +19,7 @@ trait UploadEventLogger { val remoteKey = localFile.remoteKey.key val fileLength = localFile.file.length val statusHeight = 7 - if (bytesTransferred < fileLength) { + if (bytesTransferred < fileLength) println( s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" + statusWithBar(" File", sizeInEnglish, bytesTransferred, fileLength) + @@ -29,8 +29,6 @@ trait UploadEventLogger { bytesTransferred + totalBytesSoFar, syncTotals.totalSizeBytes) + s"${Terminal.cursorPrevLine(statusHeight)}") - } else - println(s"${GREEN}Uploaded:$RESET $remoteKey$eraseToEndOfScreen") } private def statusWithBar( diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala index 0e38d45..fbc84a2 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala @@ -1,7 +1,7 @@ package net.kemitix.thorp.storage.api +import net.kemitix.thorp.console.MyConsole import net.kemitix.thorp.domain._ -import zio.console.Console import zio.{Task, TaskR} trait StorageService { @@ -11,7 +11,7 @@ trait StorageService { def listObjects( bucket: Bucket, prefix: RemoteKey - ): TaskR[Console, S3ObjectsData] + ): TaskR[MyConsole, S3ObjectsData] def upload( localFile: LocalFile, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonS3.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonS3.scala new file mode 100644 index 0000000..b4cf0f5 --- /dev/null +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonS3.scala @@ -0,0 +1,41 @@ +package net.kemitix.thorp.storage.aws + +import com.amazonaws.services.s3.{AmazonS3 => AmazonS3Client} +import com.amazonaws.services.s3.model.{ + CopyObjectRequest, + CopyObjectResult, + DeleteObjectRequest, + ListObjectsV2Request, + ListObjectsV2Result +} + +object AmazonS3 { + + trait Client { + + def shutdown(): Unit + + def deleteObject: DeleteObjectRequest => Unit + + def copyObject: CopyObjectRequest => Option[CopyObjectResult] + + def listObjectsV2: ListObjectsV2Request => ListObjectsV2Result + + } + + case class ClientImpl(amazonS3: AmazonS3Client) extends Client { + + def shutdown(): Unit = amazonS3.shutdown() + + def deleteObject: DeleteObjectRequest => Unit = + request => amazonS3.deleteObject(request) + + def copyObject: CopyObjectRequest => Option[CopyObjectResult] = + request => Option(amazonS3.copyObject(request)) + + def listObjectsV2: ListObjectsV2Request => ListObjectsV2Result = + request => amazonS3.listObjectsV2(request) + + } + +} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala index 61f8b51..bbacad3 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala @@ -1,12 +1,21 @@ package net.kemitix.thorp.storage.aws -import com.amazonaws.services.s3.AmazonS3 -import com.amazonaws.services.s3.model.CopyObjectRequest -import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent +import com.amazonaws.services.s3.model.{ + AmazonS3Exception, + CopyObjectRequest, + CopyObjectResult +} +import net.kemitix.thorp.domain.StorageQueueEvent.{Action, CopyQueueEvent} import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.aws.S3ClientException.{ + HashMatchError, + S3Exception +} import zio.Task -class Copier(amazonS3: AmazonS3) { +import scala.util.{Failure, Success, Try} + +class Copier(amazonS3: AmazonS3.Client) { def copy( bucket: Bucket, @@ -15,8 +24,34 @@ class Copier(amazonS3: AmazonS3) { targetKey: RemoteKey ): Task[StorageQueueEvent] = for { - _ <- copyObject(bucket, sourceKey, hash, targetKey) - } yield CopyQueueEvent(targetKey) + copyResult <- copyObject(bucket, sourceKey, hash, targetKey) + result <- mapCopyResult(copyResult, sourceKey, targetKey) + } yield result + + private def mapCopyResult( + copyResult: Try[Option[CopyObjectResult]], + sourceKey: RemoteKey, + targetKey: RemoteKey + ) = + copyResult match { + case Success(None) => + Task.succeed( + StorageQueueEvent + .ErrorQueueEvent( + Action.Copy(s"${sourceKey.key} => ${targetKey.key}"), + targetKey, + HashMatchError)) + case Success(Some(_)) => + Task.succeed(CopyQueueEvent(sourceKey, targetKey)) + case Failure(e: AmazonS3Exception) => + Task.succeed( + StorageQueueEvent.ErrorQueueEvent( + Action.Copy(s"${sourceKey.key} => ${targetKey.key}"), + targetKey, + S3Exception(e.getMessage)) + ) + case Failure(e) => Task.fail(e) + } private def copyObject( bucket: Bucket, @@ -31,7 +66,7 @@ class Copier(amazonS3: AmazonS3) { bucket.name, targetKey.key ).withMatchingETagConstraint(hash.hash) - Task(amazonS3.copyObject(request)) + Task(Try(amazonS3.copyObject(request))) } } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala index 8a1f84e..68a8e61 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala @@ -1,12 +1,11 @@ package net.kemitix.thorp.storage.aws -import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.DeleteObjectRequest import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent} import zio.Task -class Deleter(amazonS3: AmazonS3) { +class Deleter(amazonS3: AmazonS3.Client) { def delete( bucket: Bucket, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala index 772a5c2..540af77 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala @@ -1,17 +1,16 @@ package net.kemitix.thorp.storage.aws -import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} +import net.kemitix.thorp.console._ import net.kemitix.thorp.domain import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData} import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey -import zio.console.Console -import zio.{IO, Task, TaskR, ZIO} +import zio.{Task, TaskR} import scala.collection.JavaConverters._ -class Lister(amazonS3: AmazonS3) { +class Lister(amazonS3: AmazonS3.Client) { private type Token = String private type Batch = (Stream[S3ObjectSummary], Option[Token]) @@ -19,7 +18,7 @@ class Lister(amazonS3: AmazonS3) { def listObjects( bucket: Bucket, prefix: RemoteKey - ): TaskR[Console, S3ObjectsData] = { + ): TaskR[MyConsole, S3ObjectsData] = { val requestMore = (token: Token) => new ListObjectsV2Request() @@ -27,7 +26,7 @@ class Lister(amazonS3: AmazonS3) { .withPrefix(prefix.key) .withContinuationToken(token) - def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] = + def fetchBatch: ListObjectsV2Request => TaskR[MyConsole, Batch] = request => for { _ <- ListerLogger.logFetchBatch @@ -36,14 +35,15 @@ class Lister(amazonS3: AmazonS3) { def fetchMore( more: Option[Token] - ): TaskR[Console, Stream[S3ObjectSummary]] = { + ): TaskR[MyConsole, Stream[S3ObjectSummary]] = { more match { - case None => ZIO.succeed(Stream.empty) + case None => TaskR.succeed(Stream.empty) case Some(token) => fetch(requestMore(token)) } } - def fetch: ListObjectsV2Request => TaskR[Console, Stream[S3ObjectSummary]] = + def fetch + : ListObjectsV2Request => TaskR[MyConsole, Stream[S3ObjectSummary]] = request => { for { batch <- fetchBatch(request) @@ -63,7 +63,7 @@ class Lister(amazonS3: AmazonS3) { private def tryFetchBatch( request: ListObjectsV2Request ): Task[(Stream[S3ObjectSummary], Option[Token])] = - IO(amazonS3.listObjectsV2(request)) + Task(amazonS3.listObjectsV2(request)) .map { result => val more: Option[Token] = if (result.isTruncated) Some(result.getNextContinuationToken) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala index 1bd49d5..94997f0 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala @@ -1,10 +1,10 @@ package net.kemitix.thorp.storage.aws +import net.kemitix.thorp.console._ import zio.TaskR -import zio.console._ trait ListerLogger { - def logFetchBatch: TaskR[Console, Unit] = + def logFetchBatch: TaskR[MyConsole, Unit] = putStrLn("Fetching remote summaries...") } object ListerLogger extends ListerLogger diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientException.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientException.scala new file mode 100644 index 0000000..1fae071 --- /dev/null +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientException.scala @@ -0,0 +1,13 @@ +package net.kemitix.thorp.storage.aws + +sealed trait S3ClientException extends Exception + +object S3ClientException { + case object HashMatchError extends S3ClientException { + override def getMessage: String = + "The hash of the object to be overwritten did not match the the expected value" + } + case class S3Exception(message: String) extends S3ClientException { + override def getMessage: String = message + } +} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala index bdfa15d..0719a9d 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala @@ -1,14 +1,13 @@ package net.kemitix.thorp.storage.aws -import com.amazonaws.services.s3.AmazonS3 +import net.kemitix.thorp.console.MyConsole import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.StorageService -import zio.console.Console import zio.{Task, TaskR} class S3StorageService( - amazonS3Client: => AmazonS3, + amazonS3Client: => AmazonS3.Client, amazonTransferManager: => AmazonTransferManager ) extends StorageService { @@ -20,7 +19,7 @@ class S3StorageService( override def listObjects( bucket: Bucket, prefix: RemoteKey - ): TaskR[Console, S3ObjectsData] = + ): TaskR[MyConsole, S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy( diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala index 67ddc87..578911a 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala @@ -1,13 +1,13 @@ package net.kemitix.thorp.storage.aws +import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.transfer.TransferManagerBuilder -import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} import net.kemitix.thorp.storage.api.StorageService object S3StorageServiceBuilder { def createService( - amazonS3Client: AmazonS3, + amazonS3Client: AmazonS3.Client, amazonTransferManager: AmazonTransferManager ): StorageService = new S3StorageService( @@ -17,7 +17,7 @@ object S3StorageServiceBuilder { lazy val defaultStorageService: StorageService = createService( - AmazonS3ClientBuilder.defaultClient, + AmazonS3.ClientImpl(AmazonS3ClientBuilder.defaultClient), AmazonTransferManager(TransferManagerBuilder.defaultTransferManager) ) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index 3d6e84e..8755000 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -3,6 +3,7 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} import net.kemitix.thorp.domain.StorageQueueEvent.{ + Action, ErrorQueueEvent, UploadQueueEvent } @@ -39,7 +40,12 @@ class Uploader(transferManager: => AmazonTransferManager) { .map(_.waitForUploadResult) .map(upload => UploadQueueEvent(RemoteKey(upload.getKey), MD5Hash(upload.getETag))) - .catchAll(e => Task.succeed(ErrorQueueEvent(localFile.remoteKey, e))) + .catchAll( + e => + Task.succeed( + ErrorQueueEvent(Action.Upload(localFile.remoteKey.key), + localFile.remoteKey, + e))) } private def request( diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala new file mode 100644 index 0000000..7a1f0f6 --- /dev/null +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala @@ -0,0 +1,18 @@ +package net.kemitix.thorp.storage.aws + +import org.scalamock.scalatest.MockFactory + +trait AmazonS3ClientTestFixture extends MockFactory { + + val fixture: Fixture = + Fixture(stub[AmazonS3.Client], stub[AmazonTransferManager]) + + case class Fixture( + amazonS3Client: AmazonS3.Client, + amazonS3TransferManager: AmazonTransferManager, + ) { + lazy val storageService: S3StorageService = + new S3StorageService(amazonS3Client, amazonS3TransferManager) + } + +} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala new file mode 100644 index 0000000..99e5c39 --- /dev/null +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala @@ -0,0 +1,98 @@ +package net.kemitix.thorp.storage.aws + +import com.amazonaws.services.s3.model.{AmazonS3Exception, CopyObjectResult} +import net.kemitix.thorp.console.MyConsole +import net.kemitix.thorp.domain.StorageQueueEvent.{Action, ErrorQueueEvent} +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.aws.S3ClientException.{ + HashMatchError, + S3Exception +} +import org.scalatest.FreeSpec +import zio.Runtime +import zio.internal.PlatformLive + +class CopierTest extends FreeSpec { + + private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) + + "copier" - { + val bucket = Bucket("aBucket") + val sourceKey = RemoteKey("sourceKey") + val hash = MD5Hash("aHash") + val targetKey = RemoteKey("targetKey") + "when source exists" - { + "when source hash matches" - { + "copies from source to target" in { + val event = StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey) + val expected = Right(event) + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.copyObject _) + .when() + .returns(_ => Some(new CopyObjectResult)) + private val result = + invoke(bucket, sourceKey, hash, targetKey, fixture.storageService) + assertResult(expected)(result) + } + } + } + "when source hash does not match" - { + "skip the file with an error" in { + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.copyObject _) + .when() + .returns(_ => None) + private val result = + invoke(bucket, sourceKey, hash, targetKey, fixture.storageService) + result match { + case Right( + ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), + RemoteKey("targetKey"), + e)) => + e match { + case HashMatchError => assert(true) + case _ => fail("Not a HashMatchError") + } + case e => fail("Not an ErrorQueueEvent: " + e) + } + } + } + } + "when client throws an exception" - { + "skip the file with an error" in { + new AmazonS3ClientTestFixture { + private val expectedMessage = "The specified key does not exist" + (fixture.amazonS3Client.copyObject _) + .when() + .throws(new AmazonS3Exception(expectedMessage)) + private val result = + invoke(bucket, sourceKey, hash, targetKey, fixture.storageService) + result match { + case Right( + ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), + RemoteKey("targetKey"), + e)) => + e match { + case S3Exception(message) => + assert(message.startsWith(expectedMessage)) + case _ => fail("Not an S3Exception") + } + case e => fail("Not an ErrorQueueEvent: " + e) + } + } + } + } + } + def invoke( + bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey, + storageService: S3StorageService + ) = + runtime.unsafeRunSync { + storageService.copy(bucket, sourceKey, hash, targetKey) + }.toEither + } + +} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala index f6ff943..86722ec 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala @@ -4,35 +4,20 @@ import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Date -import com.amazonaws.services.s3.AmazonS3 -import com.amazonaws.services.s3.model.{ - ListObjectsV2Request, - ListObjectsV2Result, - S3ObjectSummary -} +import com.amazonaws.services.s3.model.{ListObjectsV2Result, S3ObjectSummary} +import net.kemitix.thorp.console.MyConsole import net.kemitix.thorp.core.Resource import net.kemitix.thorp.domain._ import org.scalamock.scalatest.MockFactory -import org.scalatest.FunSpec -import zio.DefaultRuntime +import org.scalatest.FreeSpec +import zio.Runtime +import zio.internal.PlatformLive -class S3StorageServiceSuite extends FunSpec with MockFactory { +class S3StorageServiceSuite extends FreeSpec with MockFactory { - private val runtime = new DefaultRuntime {} - - describe("listObjectsInPrefix") { - val source = Resource(this, "upload") - val sourcePath = source.toPath - val prefix = RemoteKey("prefix") - implicit val config: Config = - Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) - - val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) - - val h1 = MD5Hash("hash1") - - val k1a = RemoteKey("key1a") + private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) + "listObjects" - { def objectSummary(hash: MD5Hash, remoteKey: RemoteKey, lastModified: LastModified) = { @@ -42,31 +27,27 @@ class S3StorageServiceSuite extends FunSpec with MockFactory { summary.setLastModified(Date.from(lastModified.when)) summary } - - val o1a = objectSummary(h1, k1a, lm) - - val k1b = RemoteKey("key1b") - val o1b = objectSummary(h1, k1b, lm) - - val h2 = MD5Hash("hash2") - val k2 = RemoteKey("key2") - val o2 = objectSummary(h2, k2, lm) - - val amazonS3 = stub[AmazonS3] - val amazonS3TransferManager = stub[AmazonTransferManager] - val storageService = new S3StorageService(amazonS3, amazonS3TransferManager) - + val source = Resource(this, "upload") + val sourcePath = source.toPath + val prefix = RemoteKey("prefix") + implicit val config: Config = + Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) + val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) + val h1 = MD5Hash("hash1") + val k1a = RemoteKey("key1a") + val o1a = objectSummary(h1, k1a, lm) + val k1b = RemoteKey("key1b") + val o1b = objectSummary(h1, k1b, lm) + val h2 = MD5Hash("hash2") + val k2 = RemoteKey("key2") + val o2 = objectSummary(h2, k2, lm) val myFakeResponse = new ListObjectsV2Result() val summaries = myFakeResponse.getObjectSummaries summaries.add(o1a) summaries.add(o1b) summaries.add(o2) - (amazonS3 listObjectsV2 (_: ListObjectsV2Request)) - .when(*) - .returns(myFakeResponse) - it( - "should build list of hash lookups, with duplicate objects grouped by hash") { + "should build list of hash lookups, with duplicate objects grouped by hash" in { val expected = Right( S3ObjectsData( byHash = Map(h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)), @@ -75,15 +56,19 @@ class S3StorageServiceSuite extends FunSpec with MockFactory { k1b -> HashModified(h1, lm), k2 -> HashModified(h2, lm)) )) - val result = invoke(storageService) - assertResult(expected)(result) + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.listObjectsV2 _) + .when() + .returns(_ => myFakeResponse) + private val result = invoke(fixture.storageService) + assertResult(expected)(result) + } } + def invoke(storageService: S3StorageService) = + runtime.unsafeRunSync { + storageService + .listObjects(Bucket("bucket"), RemoteKey("prefix")) + }.toEither } - private def invoke(storageService: S3StorageService) = - runtime.unsafeRunSync { - storageService - .listObjects(Bucket("bucket"), RemoteKey("prefix")) - }.toEither - } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index 71764c7..170ec8c 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -2,7 +2,6 @@ package net.kemitix.thorp.storage.aws import java.time.Instant -import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher} @@ -122,10 +121,10 @@ class StorageServiceSuite extends FunSpec with MockFactory { describe("upload") { describe("when uploading a file") { - val amazonS3 = stub[AmazonS3] + val amazonS3Client = stub[AmazonS3.Client] val amazonTransferManager = stub[AmazonTransferManager] val storageService = - new S3StorageService(amazonS3, amazonTransferManager) + new S3StorageService(amazonS3Client, amazonTransferManager) val prefix = RemoteKey("prefix") val localFile = diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala deleted file mode 100644 index 0f6909c..0000000 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala +++ /dev/null @@ -1,55 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import com.amazonaws.services.s3.AmazonS3 -import com.amazonaws.services.s3.transfer._ -import net.kemitix.thorp.core.KeyGenerator.generateKey -import net.kemitix.thorp.core.Resource -import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent -import net.kemitix.thorp.domain.{UploadEventListener, _} -import org.scalamock.scalatest.MockFactory -import org.scalatest.FunSpec - -class UploaderSuite extends FunSpec with MockFactory { - - private val batchMode: Boolean = true - private val source = Resource(this, ".") - private val sourcePath = source.toPath - private val prefix = RemoteKey("prefix") - implicit private val config: Config = - Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) - private val fileToKey = generateKey(config.sources, config.prefix) _ - - def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] = Map("md5" -> hash) - - describe("S3ClientMultiPartTransferManagerSuite") { - describe("upload") { - pending - // how much of this test is testing the amazonTransferManager - // Should we just test that the correct parameters are passed to initiate, or will this test - // just collapse and die if the amazonS3 doesn't respond properly to TransferManager input - // dies when putObject is called - val returnedKey = RemoteKey("returned-key") - val returnedHash = MD5Hash("returned-hash") - val bigFile = LocalFile.resolve("small-file", - md5HashMap(MD5Hash("the-hash")), - sourcePath, - fileToKey) - val uploadEventListener = - UploadEventListener(bigFile, 1, SyncTotals(), 0L) - val amazonS3 = mock[AmazonS3] - val amazonTransferManager = - AmazonTransferManager( - TransferManagerBuilder.standard().withS3Client(amazonS3).build) - val uploader = new Uploader(amazonTransferManager) - it("should upload") { - val expected = UploadQueueEvent(returnedKey, returnedHash) - val result = uploader.upload(bigFile, - config.bucket, - batchMode, - uploadEventListener, - 1) - assertResult(expected)(result) - } - } - } -}