diff --git a/CHANGELOG.org b/CHANGELOG.org index 996eab3..c3556ae 100644 --- a/CHANGELOG.org +++ b/CHANGELOG.org @@ -16,9 +16,10 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog] ** Changed - - Replace cats-effect with zio (#117) - - Replace Monocle with local SimpleLens implementation (#121) - - Don't use String as key in Map for hashes (#124) + - [internal] Replace cats-effect with zio (#117) + - [internal] Replace Monocle with local SimpleLens implementation (#121) + - [internal] Don't use String as key in Map for hashes (#124) + - [internal] Convert Storage to full ZIO effect module (#133) ** Dependencies diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala b/cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala similarity index 95% rename from cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala rename to cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala index 3042ff1..26901db 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala @@ -6,9 +6,9 @@ import net.kemitix.thorp.core.{ConfigOption, ConfigOptions} import scopt.OParser import zio.Task -object ParseArgs { +object CliArgs { - def apply(args: List[String]): Task[ConfigOptions] = Task { + def parse(args: List[String]): Task[ConfigOptions] = Task { OParser .parse(configParser, args, List()) .map(ConfigOptions(_)) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala index 13ee928..1a36fbd 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala @@ -1,22 +1,17 @@ package net.kemitix.thorp.cli -import net.kemitix.thorp.console.MyConsole -import zio.internal.PlatformLive -import zio.{App, Runtime, UIO, ZIO} +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.storage.aws.S3Storage +import zio.{App, ZIO} object Main extends App { - private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) + object LiveThorpApp extends S3Storage.Live with Console.Live override def run(args: List[String]): ZIO[Environment, Nothing, Int] = - runtime.unsafeRun { - appLogic(args).fold(_ => UIO(1), _ => UIO(0)) - } - - private def appLogic(args: List[String]): ZIO[MyConsole, Throwable, Unit] = - for { - cliOptions <- ParseArgs(args) - _ <- Program.run(cliOptions) - } yield () + Program + .run(args) + .provide(LiveThorpApp) + .fold(_ => 1, _ => 0) } diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 6fadc2e..cb05e74 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -1,79 +1,87 @@ package net.kemitix.thorp.cli import net.kemitix.thorp.console._ +import net.kemitix.thorp.core.CoreTypes.CoreProgram import net.kemitix.thorp.core._ -import net.kemitix.thorp.domain.{StorageQueueEvent, SyncTotals} +import net.kemitix.thorp.domain.StorageQueueEvent import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService -import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService -import zio.{Task, TaskR, ZIO} +import zio.{UIO, ZIO} -trait Program extends PlanBuilder { +trait Program { lazy val version = s"Thorp v${thorp.BuildInfo.version}" - def run(cliOptions: ConfigOptions): ZIO[MyConsole, Nothing, Unit] = { - val showVersion = ConfigQuery.showVersion(cliOptions) + def run(args: List[String]): CoreProgram[Unit] = { for { - _ <- ZIO.when(showVersion)(putStrLn(version)) - _ <- ZIO.when(!showVersion)(execute(cliOptions).catchAll(handleErrors)) + cli <- CliArgs.parse(args) + _ <- ZIO.when(showVersion(cli))(putStrLn(version)) + _ <- ZIO.when(!showVersion(cli))(execute(cli).catchAll(handleErrors)) } yield () } - private def execute( - cliOptions: ConfigOptions): ZIO[MyConsole, Throwable, Unit] = { + private def showVersion: ConfigOptions => Boolean = + cli => ConfigQuery.showVersion(cli) + + private def execute(cliOptions: ConfigOptions) = { for { - plan <- createPlan(defaultStorageService, defaultHashService, cliOptions) - archive <- thorpArchive(cliOptions, plan.syncTotals) - events <- handleActions(archive, plan) - _ <- defaultStorageService.shutdown - _ <- SyncLogging.logRunFinished(events) + plan <- PlanBuilder.createPlan(defaultHashService, cliOptions) + batchMode <- isBatchMode(cliOptions) + archive <- UnversionedMirrorArchive.default(batchMode, plan.syncTotals) + events <- applyPlan(archive, plan) + _ <- SyncLogging.logRunFinished(events) } yield () } - private def handleErrors( - throwable: Throwable): ZIO[MyConsole, Nothing, Unit] = + private def isBatchMode(cliOptions: ConfigOptions) = + UIO(ConfigQuery.batchMode(cliOptions)) + + private def handleErrors(throwable: Throwable) = for { _ <- putStrLn("There were errors:") _ <- throwable match { case ConfigValidationException(errors) => ZIO.foreach(errors)(error => putStrLn(s"- $error")) - case x => throw x } } yield () - def thorpArchive( - cliOptions: ConfigOptions, - syncTotals: SyncTotals - ): Task[ThorpArchive] = Task { - UnversionedMirrorArchive.default( - defaultStorageService, - ConfigQuery.batchMode(cliOptions), - syncTotals - ) - } - - private def handleActions( + private def applyPlan( archive: ThorpArchive, syncPlan: SyncPlan - ): TaskR[MyConsole, Stream[StorageQueueEvent]] = { - type Accumulator = (Stream[StorageQueueEvent], Long) - val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes) - TaskR - .foldLeft(syncPlan.actions.zipWithIndex)(zero)((acc, indexedAction) => { - val (action, index) = indexedAction - val (stream, bytesToDo) = acc - val remainingBytes = bytesToDo - action.size - (for { - event <- archive.update(index, action, remainingBytes) - events = stream ++ Stream(event) - } yield events) - .map((_, remainingBytes)) - }) + ) = { + val zero: (Stream[StorageQueueEvent], Long) = + (Stream(), syncPlan.syncTotals.totalSizeBytes) + val actions = syncPlan.actions.zipWithIndex + ZIO + .foldLeft(actions)(zero)((acc, action) => + applyAction(archive, acc, action)) .map { case (events, _) => events } } + private def applyAction( + archive: ThorpArchive, + acc: (Stream[StorageQueueEvent], Long), + indexedAction: (Action, Int) + ) = { + val (action, index) = indexedAction + val (stream, bytesToDo) = acc + val remainingBytes = bytesToDo - action.size + updateArchive(archive, action, index, stream, remainingBytes) + .map((_, remainingBytes)) + } + + private def updateArchive( + archive: ThorpArchive, + action: Action, + index: Int, + stream: Stream[StorageQueueEvent], + remainingBytes: Long + ) = + for { + event <- archive.update(index, action, remainingBytes) + } yield stream ++ Stream(event) + } object Program extends Program diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala similarity index 97% rename from cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala rename to cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala index bc94662..1596b6c 100644 --- a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala @@ -9,7 +9,7 @@ import zio.DefaultRuntime import scala.util.Try -class ParseArgsTest extends FunSpec { +class CliArgsTest extends FunSpec { private val runtime = new DefaultRuntime {} @@ -77,7 +77,7 @@ class ParseArgsTest extends FunSpec { private def invoke(args: List[String]) = runtime .unsafeRunSync { - ParseArgs(args) + CliArgs.parse(args) } .toEither .toOption diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala deleted file mode 100644 index eb4f221..0000000 --- a/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala +++ /dev/null @@ -1,84 +0,0 @@ -package net.kemitix.thorp.cli - -import java.io.File -import java.nio.file.Path - -import net.kemitix.thorp.console.MyConsole -import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.core._ -import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.{HashService, StorageService} -import org.scalatest.FunSpec -import zio.internal.PlatformLive -import zio.{Runtime, Task, TaskR} - -class ProgramTest extends FunSpec { - - private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) - - val source: File = Resource(this, ".") - val sourcePath: Path = source.toPath - val bucket: Bucket = Bucket("aBucket") - val hash: MD5Hash = MD5Hash("aHash") - val copyAction: Action = - ToCopy(bucket, RemoteKey("copy-me"), hash, RemoteKey("overwrite-me"), 17L) - val uploadAction: Action = ToUpload( - bucket, - LocalFile.resolve("aFile", Map(), sourcePath, _ => RemoteKey("upload-me")), - 23L) - val deleteAction: Action = ToDelete(bucket, RemoteKey("delete-me"), 0L) - - val configOptions: ConfigOptions = ConfigOptions( - options = List( - ConfigOption.IgnoreGlobalOptions, - ConfigOption.IgnoreUserOptions - )) - - describe("upload, copy and delete actions in plan") { - val archive = TestProgram.thorpArchive - it("should be handled in correct order") { - val expected = List(copyAction, uploadAction, deleteAction) - invoke(configOptions) - val result = archive.actions.reverse - assertResult(expected)(result) - } - } - - private def invoke(configOptions: ConfigOptions) = - runtime.unsafeRunSync { - TestProgram.run(configOptions) - }.toEither - - trait TestPlanBuilder extends PlanBuilder { - override def createPlan( - storageService: StorageService, - hashService: HashService, - configOptions: ConfigOptions - ): Task[SyncPlan] = { - Task(SyncPlan(Stream(copyAction, uploadAction, deleteAction))) - } - } - - class ActionCaptureArchive extends ThorpArchive { - var actions: List[Action] = List[Action]() - override def update( - index: Int, - action: Action, - totalBytesSoFar: Long - ): TaskR[MyConsole, StorageQueueEvent] = { - actions = action :: actions - TaskR(DoNothingQueueEvent(RemoteKey(""))) - } - } - - object TestProgram extends Program with TestPlanBuilder { - val thorpArchive: ActionCaptureArchive = new ActionCaptureArchive - override def thorpArchive( - cliOptions: ConfigOptions, - syncTotals: SyncTotals - ): Task[ThorpArchive] = - Task(thorpArchive) - } - -} diff --git a/console/src/main/scala/net/kemitix/thorp/console/Console.scala b/console/src/main/scala/net/kemitix/thorp/console/Console.scala new file mode 100644 index 0000000..8aa3d40 --- /dev/null +++ b/console/src/main/scala/net/kemitix/thorp/console/Console.scala @@ -0,0 +1,54 @@ +package net.kemitix.thorp.console + +import java.io.PrintStream +import java.util.concurrent.atomic.AtomicReference + +import zio.{UIO, ZIO} + +import scala.{Console => SConsole} + +trait Console { + val console: Console.Service +} + +object Console { + + trait Service { + def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] + def putStrLn(line: String): ZIO[Console, Nothing, Unit] + } + + trait Live extends Console { + val console: Service = new Service { + override def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] = + putStrLn(line.en) + override def putStrLn(line: String): ZIO[Console, Nothing, Unit] = + putStrLn(SConsole.out)(line) + final def putStrLn(stream: PrintStream)( + line: String): ZIO[Console, Nothing, Unit] = + UIO(SConsole.withOut(stream)(SConsole.println(line))) + } + } + + object Live extends Live + + trait Test extends Console { + + private val output = new AtomicReference(List.empty[String]) + def getOutput: List[String] = output.get + + val console: Service = new Service { + override def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] = + putStrLn(line.en) + + override def putStrLn(line: String): ZIO[Console, Nothing, Unit] = { + output.accumulateAndGet(List(line), (a, b) => a ++ b) + ZIO.succeed(()) + } + + } + } + + object Test extends Test + +} diff --git a/console/src/main/scala/net/kemitix/thorp/console/MyConsole.scala b/console/src/main/scala/net/kemitix/thorp/console/MyConsole.scala deleted file mode 100644 index 3b379b0..0000000 --- a/console/src/main/scala/net/kemitix/thorp/console/MyConsole.scala +++ /dev/null @@ -1,31 +0,0 @@ -package net.kemitix.thorp.console - -import java.io.PrintStream - -import zio.{UIO, ZIO} - -trait MyConsole { - val console: MyConsole.Service -} - -object MyConsole { - - trait Service { - def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] - def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] - } - - trait Live extends MyConsole { - val console: Service = new Service { - override def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] = - putStrLn(line) - override def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] = - putStrLn(Console.out)(line) - final def putStrLn(stream: PrintStream)( - line: String): ZIO[MyConsole, Nothing, Unit] = - UIO(Console.withOut(stream)(Console.println(line))) - } - } - - object Live extends Live -} diff --git a/console/src/main/scala/net/kemitix/thorp/console/package.scala b/console/src/main/scala/net/kemitix/thorp/console/package.scala index 49e5eb2..b96d052 100644 --- a/console/src/main/scala/net/kemitix/thorp/console/package.scala +++ b/console/src/main/scala/net/kemitix/thorp/console/package.scala @@ -2,15 +2,15 @@ package net.kemitix.thorp import zio.ZIO -package object console extends MyConsole.Service { +package object console { - final val consoleService: ZIO[MyConsole, Nothing, MyConsole.Service] = + final val consoleService: ZIO[Console, Nothing, Console.Service] = ZIO.access(_.console) - final def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] = + final def putStrLn(line: String): ZIO[Console, Nothing, Unit] = ZIO.accessM(_.console putStrLn line) - override def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] = - putStrLn(line.en) + final def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] = + ZIO.accessM(_.console putStrLn line) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala b/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala new file mode 100644 index 0000000..b60da04 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala @@ -0,0 +1,12 @@ +package net.kemitix.thorp.core + +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.storage.api.Storage +import zio.ZIO + +object CoreTypes { + + type CoreEnv = Storage with Console + type CoreProgram[A] = ZIO[CoreEnv, Throwable, A] + +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala index b1e2347..91d8271 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala @@ -3,44 +3,42 @@ package net.kemitix.thorp.core import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action._ import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.{HashService, StorageService} +import net.kemitix.thorp.storage._ +import net.kemitix.thorp.storage.api.{HashService, Storage} import zio.{Task, TaskR} trait PlanBuilder { def createPlan( - storageService: StorageService, hashService: HashService, configOptions: ConfigOptions - ): TaskR[MyConsole, SyncPlan] = + ): TaskR[Storage with Console, SyncPlan] = ConfigurationBuilder .buildConfig(configOptions) .catchAll(errors => TaskR.fail(ConfigValidationException(errors))) - .flatMap(config => useValidConfig(storageService, hashService)(config)) + .flatMap(config => useValidConfig(hashService)(config)) - def useValidConfig( - storageService: StorageService, + private def useValidConfig( hashService: HashService - )(implicit c: Config): TaskR[MyConsole, SyncPlan] = { + )(implicit c: Config) = { for { _ <- SyncLogging.logRunStart(c.bucket, c.prefix, c.sources) - actions <- buildPlan(storageService, hashService) + actions <- buildPlan(hashService) } yield actions } private def buildPlan( - storageService: StorageService, hashService: HashService - )(implicit c: Config): TaskR[MyConsole, SyncPlan] = + )(implicit c: Config) = for { - metadata <- gatherMetadata(storageService, hashService) + metadata <- gatherMetadata(hashService) } yield assemblePlan(c)(metadata) - def assemblePlan( + private def assemblePlan( implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = { case (remoteData, localData) => SyncPlan( - actions = createActions(remoteData, localData) + actions = createActions(c)(remoteData)(localData) .filter(doesSomething) .sortBy(SequencePlan.order), syncTotals = SyncTotals(count = localData.count, @@ -48,63 +46,64 @@ trait PlanBuilder { ) } - private def createActions( - remoteData: S3ObjectsData, - localData: LocalFiles - )(implicit c: Config): Stream[Action] = - actionsForLocalFiles(localData, remoteData) ++ - actionsForRemoteKeys(remoteData) + private def createActions + : Config => S3ObjectsData => LocalFiles => Stream[Action] = + c => + remoteData => + localData => + actionsForLocalFiles(c)(remoteData)(localData) ++ + actionsForRemoteKeys(c)(remoteData) - def doesSomething: Action => Boolean = { + private def doesSomething: Action => Boolean = { case _: DoNothing => false case _ => true } - private val emptyActionStream = Stream[Action]() + private def actionsForLocalFiles + : Config => S3ObjectsData => LocalFiles => Stream[Action] = + c => + remoteData => + localData => + localData.localFiles.foldLeft(Stream.empty[Action])((acc, lf) => + createActionFromLocalFile(c)(lf)(remoteData)(acc) ++ acc) - private def actionsForLocalFiles( - localData: LocalFiles, - remoteData: S3ObjectsData - )(implicit c: Config) = - localData.localFiles.foldLeft(emptyActionStream)((acc, lf) => - createActionFromLocalFile(lf, remoteData, acc) ++ acc) + private def createActionFromLocalFile + : Config => LocalFile => S3ObjectsData => Stream[Action] => Stream[Action] = + c => + lf => + remoteData => + previousActions => + ActionGenerator.createActions( + S3MetaDataEnricher.getMetadata(lf, remoteData)(c), + previousActions)(c) - private def createActionFromLocalFile( - lf: LocalFile, - remoteData: S3ObjectsData, - previousActions: Stream[Action] - )(implicit c: Config) = - ActionGenerator.createActions( - S3MetaDataEnricher.getMetadata(lf, remoteData), - previousActions) + private def actionsForRemoteKeys: Config => S3ObjectsData => Stream[Action] = + c => + remoteData => + remoteData.byKey.keys.foldLeft(Stream.empty[Action])((acc, rk) => + createActionFromRemoteKey(c)(rk) #:: acc) - private def actionsForRemoteKeys(remoteData: S3ObjectsData)( - implicit c: Config) = - remoteData.byKey.keys.foldLeft(emptyActionStream)((acc, rk) => - createActionFromRemoteKey(rk) #:: acc) - - private def createActionFromRemoteKey(rk: RemoteKey)(implicit c: Config) = - if (rk.isMissingLocally(c.sources, c.prefix)) - Action.ToDelete(c.bucket, rk, 0L) - else DoNothing(c.bucket, rk, 0L) + private def createActionFromRemoteKey: Config => RemoteKey => Action = + c => + rk => + if (rk.isMissingLocally(c.sources, c.prefix)) + Action.ToDelete(c.bucket, rk, 0L) + else DoNothing(c.bucket, rk, 0L) private def gatherMetadata( - storageService: StorageService, hashService: HashService - )(implicit c: Config): TaskR[MyConsole, (S3ObjectsData, LocalFiles)] = + )(implicit c: Config) = for { - remoteData <- fetchRemoteData(storageService) + remoteData <- fetchRemoteData localData <- findLocalFiles(hashService) } yield (remoteData, localData) - private def fetchRemoteData( - storageService: StorageService - )(implicit c: Config): TaskR[MyConsole, S3ObjectsData] = - storageService.listObjects(c.bucket, c.prefix) + private def fetchRemoteData(implicit c: Config) = + listObjects(c.bucket, c.prefix) private def findLocalFiles( hashService: HashService - )(implicit config: Config): TaskR[MyConsole, LocalFiles] = + )(implicit config: Config) = for { _ <- SyncLogging.logFileScan localFiles <- findFiles(hashService) @@ -112,7 +111,7 @@ trait PlanBuilder { private def findFiles( hashService: HashService - )(implicit c: Config): Task[LocalFiles] = { + )(implicit c: Config) = { Task .foreach(c.sources.paths)(LocalFileStream.findFiles(_, hashService)) .map(_.foldLeft(LocalFiles())((acc, localFile) => acc ++ localFile)) diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index c0ac772..bbe0034 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -17,17 +17,17 @@ trait SyncLogging { bucket: Bucket, prefix: RemoteKey, sources: Sources - ): ZIO[MyConsole, Nothing, Unit] = + ): ZIO[Console, Nothing, Unit] = for { _ <- putStrLn(ConsoleOut.ValidConfig(bucket, prefix, sources)) } yield () - def logFileScan(implicit c: Config): ZIO[MyConsole, Nothing, Unit] = + def logFileScan(implicit c: Config): ZIO[Console, Nothing, Unit] = putStrLn(s"Scanning local files: ${c.sources.paths.mkString(", ")}...") def logRunFinished( actions: Stream[StorageQueueEvent] - ): ZIO[MyConsole, Nothing, Unit] = { + ): ZIO[Console, Nothing, Unit] = { val counters = actions.foldLeft(Counters())(countActivities) for { _ <- putStrLn(eraseToEndOfScreen) diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala index 1292136..e3d2641 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -11,6 +11,7 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{ UploadQueueEvent } import net.kemitix.thorp.domain.Terminal._ +import net.kemitix.thorp.storage.api.Storage import zio.TaskR import scala.io.AnsiColor._ @@ -21,12 +22,12 @@ trait ThorpArchive { index: Int, action: Action, totalBytesSoFar: Long - ): TaskR[MyConsole, StorageQueueEvent] + ): TaskR[Storage with Console, StorageQueueEvent] def logEvent( event: StorageQueueEvent, batchMode: Boolean - ): TaskR[MyConsole, Unit] = + ): TaskR[Console, Unit] = event match { case UploadQueueEvent(remoteKey, _) => for { diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index 82fd076..a47ab81 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -4,11 +4,11 @@ import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.StorageService +import net.kemitix.thorp.storage._ +import net.kemitix.thorp.storage.api.Storage import zio.{Task, TaskR} case class UnversionedMirrorArchive( - storageService: StorageService, batchMode: Boolean, syncTotals: SyncTotals ) extends ThorpArchive { @@ -17,7 +17,7 @@ case class UnversionedMirrorArchive( index: Int, action: Action, totalBytesSoFar: Long - ): TaskR[MyConsole, StorageQueueEvent] = + ): TaskR[Storage with Console, StorageQueueEvent] = action match { case ToUpload(bucket, localFile, _) => for { @@ -26,12 +26,12 @@ case class UnversionedMirrorArchive( } yield event case ToCopy(bucket, sourceKey, hash, targetKey, _) => for { - event <- storageService.copy(bucket, sourceKey, hash, targetKey) + event <- copyObject(bucket, sourceKey, hash, targetKey) _ <- logEvent(event, batchMode) } yield event case ToDelete(bucket, remoteKey, _) => for { - event <- storageService.delete(bucket, remoteKey) + event <- deleteObject(bucket, remoteKey) _ <- logEvent(event, batchMode) } yield event case DoNothing(_, remoteKey, _) => @@ -44,19 +44,19 @@ case class UnversionedMirrorArchive( bucket: Bucket, localFile: LocalFile ) = - storageService.upload( - localFile, - bucket, - batchMode, - UploadEventListener(localFile, index, syncTotals, totalBytesSoFar), - 1) + upload(localFile, + bucket, + batchMode, + UploadEventListener(localFile, index, syncTotals, totalBytesSoFar), + 1) } object UnversionedMirrorArchive { def default( - storageService: StorageService, batchMode: Boolean, syncTotals: SyncTotals - ): ThorpArchive = - new UnversionedMirrorArchive(storageService, batchMode, syncTotals) + ): Task[ThorpArchive] = + Task { + new UnversionedMirrorArchive(batchMode, syncTotals) + } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala index 8298c0e..b59e62e 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala @@ -4,39 +4,39 @@ import java.io.File import net.kemitix.thorp.console._ import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.StorageService -import zio.{Task, TaskR} +import net.kemitix.thorp.storage.api.Storage +import zio.{TaskR, UIO} case class DummyStorageService(s3ObjectData: S3ObjectsData, uploadFiles: Map[File, (RemoteKey, MD5Hash)]) - extends StorageService { + extends Storage.Service { - override def shutdown: Task[StorageQueueEvent] = - Task(StorageQueueEvent.ShutdownQueueEvent()) + override def shutdown: UIO[StorageQueueEvent] = + UIO(StorageQueueEvent.ShutdownQueueEvent()) override def listObjects( bucket: Bucket, prefix: RemoteKey - ): TaskR[MyConsole, S3ObjectsData] = + ): TaskR[Console, S3ObjectsData] = TaskR(s3ObjectData) override def upload(localFile: LocalFile, bucket: Bucket, batchMode: Boolean, uploadEventListener: UploadEventListener, - tryCount: Int): Task[StorageQueueEvent] = { + tryCount: Int): UIO[StorageQueueEvent] = { val (remoteKey, md5Hash) = uploadFiles(localFile.file) - Task(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash)) + UIO(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash)) } override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, - targetKey: RemoteKey): Task[StorageQueueEvent] = - Task(StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey)) + targetKey: RemoteKey): UIO[StorageQueueEvent] = + UIO(StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey)) override def delete(bucket: Bucket, - remoteKey: RemoteKey): Task[StorageQueueEvent] = - Task(StorageQueueEvent.DeleteQueueEvent(remoteKey)) + remoteKey: RemoteKey): UIO[StorageQueueEvent] = + UIO(StorageQueueEvent.DeleteQueueEvent(remoteKey)) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala index 9fd0d75..8e19aee 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala @@ -7,17 +7,13 @@ import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.{HashService, StorageService} +import net.kemitix.thorp.storage.api.{HashService, Storage} import org.scalatest.FreeSpec -import zio.Runtime -import zio.internal.PlatformLive +import zio.{DefaultRuntime, Task, UIO} class PlanBuilderTest extends FreeSpec with TemporaryFolder { - private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) - private val lastModified: LastModified = LastModified() - private val planBuilder = new PlanBuilder {} private val emptyS3ObjectData = S3ObjectsData() "create a plan" - { @@ -25,6 +21,10 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val hashService = SimpleHashService() "one source" - { + val options: Path => ConfigOptions = + source => + configOptions(ConfigOption.Source(source), + ConfigOption.Bucket("a-bucket")) "a file" - { val filename = "aFile" val remoteKey = RemoteKey(filename) @@ -34,24 +34,12 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { withDirectory(source => { val file = createFile(source, filename, "file-content") val hash = md5Hash(file) - - val expected = Right( - List( - toUpload(remoteKey, hash, source, file) - )) - - val storageService = - DummyStorageService(emptyS3ObjectData, - Map( - file -> (remoteKey, hash) - )) - + val expected = + Right(List(toUpload(remoteKey, hash, source, file))) val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(source), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(source), + UIO.succeed(emptyS3ObjectData)) assertResult(expected)(result) }) } @@ -64,32 +52,17 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val aFile = createFile(source, filename, content) val anOtherFile = createFile(source, anOtherFilename, content) val aHash = md5Hash(aFile) - - val anOtherKey = RemoteKey("other") - - val expected = Right( - List( - toCopy(anOtherKey, aHash, remoteKey) - )) - + val anOtherKey = RemoteKey("other") + val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey))) val s3ObjectsData = S3ObjectsData( byHash = Map(aHash -> Set(KeyModified(anOtherKey, lastModified))), byKey = Map(anOtherKey -> HashModified(aHash, lastModified)) ) - - val storageService = - DummyStorageService(s3ObjectsData, - Map( - aFile -> (remoteKey, aHash) - )) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(source), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(source), + UIO.succeed(s3ObjectsData)) assertResult(expected)(result) }) } @@ -101,28 +74,17 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { withDirectory(source => { val file = createFile(source, filename, "file-content") val hash = md5Hash(file) - // DoNothing actions should have been filtered out of the plan val expected = Right(List()) - val s3ObjectsData = S3ObjectsData( byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))), byKey = Map(remoteKey -> HashModified(hash, lastModified)) ) - - val storageService = - DummyStorageService(s3ObjectsData, - Map( - file -> (remoteKey, hash) - )) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(source), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(source), + UIO.succeed(s3ObjectsData)) assertResult(expected)(result) }) } @@ -134,31 +96,18 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val file = createFile(source, filename, "file-content") val currentHash = md5Hash(file) val originalHash = MD5Hash("original-file-content") - - val expected = Right( - List( - toUpload(remoteKey, currentHash, source, file) - )) - + val expected = + Right(List(toUpload(remoteKey, currentHash, source, file))) val s3ObjectsData = S3ObjectsData( byHash = Map(originalHash -> Set( KeyModified(remoteKey, lastModified))), byKey = Map(remoteKey -> HashModified(originalHash, lastModified)) ) - - val storageService = - DummyStorageService(s3ObjectsData, - Map( - file -> (remoteKey, currentHash) - )) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(source), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(source), + UIO.succeed(s3ObjectsData)) assertResult(expected)(result) }) } @@ -169,30 +118,16 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val file = createFile(source, filename, "file-content") val hash = md5Hash(file) val sourceKey = RemoteKey("other-key") - - val expected = Right( - List( - toCopy(sourceKey, hash, remoteKey) - )) - + val expected = Right(List(toCopy(sourceKey, hash, remoteKey))) val s3ObjectsData = S3ObjectsData( byHash = Map(hash -> Set(KeyModified(sourceKey, lastModified))), byKey = Map() ) - - val storageService = - DummyStorageService(s3ObjectsData, - Map( - file -> (remoteKey, hash) - )) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(source), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(source), + UIO.succeed(s3ObjectsData)) assertResult(expected)(result) }) } @@ -208,54 +143,29 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { withDirectory(source => { val file = createFile(source, filename, "file-content") val hash = md5Hash(file) - // DoNothing actions should have been filtered out of the plan val expected = Right(List()) - val s3ObjectsData = S3ObjectsData( byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))), byKey = Map(remoteKey -> HashModified(hash, lastModified)) ) - - val storageService = - DummyStorageService(s3ObjectsData, - Map( - file -> (remoteKey, hash) - )) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(source), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, options(source), UIO.succeed(s3ObjectsData)) assertResult(expected)(result) }) } } "with no matching local file" - { - "delete remote key" ignore { + "delete remote key" in { withDirectory(source => { - val hash = MD5Hash("file-content") - - val expected = Right( - List( - toDelete(remoteKey) - )) - + val hash = MD5Hash("file-content") + val expected = Right(List(toDelete(remoteKey))) val s3ObjectsData = S3ObjectsData( byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))), byKey = Map(remoteKey -> HashModified(hash, lastModified)) ) - - val storageService = DummyStorageService(s3ObjectsData, Map.empty) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(source), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, options(source), UIO.succeed(s3ObjectsData)) assertResult(expected)(result) }) } @@ -268,36 +178,31 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val filename2 = "file-2" val remoteKey1 = RemoteKey(filename1) val remoteKey2 = RemoteKey(filename2) + val options: Path => Path => ConfigOptions = + source1 => + source2 => + configOptions(ConfigOption.Source(source1), + ConfigOption.Source(source2), + ConfigOption.Bucket("a-bucket")) "unique files in both" - { "upload all files" in { withDirectory(firstSource => { val fileInFirstSource = createFile(firstSource, filename1, "file-1-content") val hash1 = md5Hash(fileInFirstSource) - withDirectory(secondSource => { val fileInSecondSource = createFile(secondSource, filename2, "file-2-content") val hash2 = md5Hash(fileInSecondSource) - val expected = Right( List( toUpload(remoteKey2, hash2, secondSource, fileInSecondSource), toUpload(remoteKey1, hash1, firstSource, fileInFirstSource) )) - - val storageService = DummyStorageService( - emptyS3ObjectData, - Map(fileInFirstSource -> (remoteKey1, hash1), - fileInSecondSource -> (remoteKey2, hash2))) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(firstSource), - ConfigOption.Source(secondSource), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(firstSource)(secondSource), + UIO.succeed(emptyS3ObjectData)) assertResult(expected)(result) }) }) @@ -309,29 +214,16 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val fileInFirstSource: File = createFile(firstSource, filename1, "file-1-content") val hash1 = md5Hash(fileInFirstSource) - withDirectory(secondSource => { val fileInSecondSource: File = createFile(secondSource, filename1, "file-2-content") val hash2 = md5Hash(fileInSecondSource) - - val expected = Right( - List( - toUpload(remoteKey1, hash1, firstSource, fileInFirstSource) - )) - - val storageService = DummyStorageService( - emptyS3ObjectData, - Map(fileInFirstSource -> (remoteKey1, hash1), - fileInSecondSource -> (remoteKey2, hash2))) - + val expected = Right(List( + toUpload(remoteKey1, hash1, firstSource, fileInFirstSource))) val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(firstSource), - ConfigOption.Source(secondSource), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(firstSource)(secondSource), + UIO.succeed(emptyS3ObjectData)) assertResult(expected)(result) }) }) @@ -340,30 +232,19 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { "with a remote file only present in second source" - { "do not delete it " in { withDirectory(firstSource => { - withDirectory(secondSource => { val fileInSecondSource = createFile(secondSource, filename2, "file-2-content") - val hash2 = md5Hash(fileInSecondSource) - + val hash2 = md5Hash(fileInSecondSource) val expected = Right(List()) - val s3ObjectData = S3ObjectsData( byHash = Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))), byKey = Map(remoteKey2 -> HashModified(hash2, lastModified))) - - val storageService = DummyStorageService( - s3ObjectData, - Map(fileInSecondSource -> (remoteKey2, hash2))) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(firstSource), - ConfigOption.Source(secondSource), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(firstSource)(secondSource), + UIO.succeed(s3ObjectData)) assertResult(expected)(result) }) }) @@ -375,27 +256,16 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val fileInFirstSource: File = createFile(firstSource, filename1, "file-1-content") val hash1 = md5Hash(fileInFirstSource) - withDirectory(secondSource => { - val expected = Right(List()) - val s3ObjectData = S3ObjectsData( byHash = Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))), byKey = Map(remoteKey1 -> HashModified(hash1, lastModified))) - - val storageService = DummyStorageService( - s3ObjectData, - Map(fileInFirstSource -> (remoteKey1, hash1))) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(firstSource), - ConfigOption.Source(secondSource), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(firstSource)(secondSource), + UIO.succeed(s3ObjectData)) assertResult(expected)(result) }) }) @@ -404,26 +274,14 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { "with remote file not present in either source" - { "delete from remote" in { withDirectory(firstSource => { - withDirectory(secondSource => { - - val expected = Right( - List( - toDelete(remoteKey1) - )) - + val expected = Right(List(toDelete(remoteKey1))) val s3ObjectData = S3ObjectsData(byKey = Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified))) - - val storageService = DummyStorageService(s3ObjectData, Map()) - val result = - invoke(storageService, - hashService, - configOptions(ConfigOption.Source(firstSource), - ConfigOption.Source(secondSource), - ConfigOption.Bucket("a-bucket"))) - + invoke(hashService, + options(firstSource)(secondSource), + UIO.succeed(s3ObjectData)) assertResult(expected)(result) }) }) @@ -431,8 +289,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { } } - def md5Hash(file: File) = { - runtime + def md5Hash(file: File): MD5Hash = { + new DefaultRuntime {} .unsafeRunSync { hashService.hashLocalObject(file.toPath).map(_.get(MD5)) } @@ -464,30 +322,47 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { ConfigOptions(List(configOptions: _*)) private def invoke( - storageService: StorageService, hashService: HashService, - configOptions: ConfigOptions + configOptions: ConfigOptions, + result: Task[S3ObjectsData] ): Either[Any, List[(String, String, String, String, String)]] = { - runtime + type TestEnv = Storage.Test with Console.Test + val testEnv: TestEnv = new Storage.Test with Console.Test { + override def listResult: Task[S3ObjectsData] = result + override def uploadResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + override def copyResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + override def deleteResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + override def shutdownResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + } + + new DefaultRuntime {} .unsafeRunSync { - planBuilder - .createPlan(storageService, hashService, configOptions) + PlanBuilder + .createPlan(hashService, configOptions) + .provide(testEnv) } .toEither - .map(_.actions.toList.map({ - case ToUpload(_, lf, _) => - ("upload", - lf.remoteKey.key, - lf.hashes(MD5).hash, - lf.source.toString, - lf.file.toString) - case ToDelete(_, remoteKey, _) => ("delete", remoteKey.key, "", "", "") - case ToCopy(_, sourceKey, hash, targetKey, _) => - ("copy", sourceKey.key, hash.hash, targetKey.key, "") - case DoNothing(_, remoteKey, _) => - ("do-nothing", remoteKey.key, "", "", "") - case x => ("other", x.toString, "", "", "") - })) + .map(convertResult) } + private def convertResult + : SyncPlan => List[(String, String, String, String, String)] = + _.actions.toList.map({ + case ToUpload(_, lf, _) => + ("upload", + lf.remoteKey.key, + lf.hashes(MD5).hash, + lf.source.toString, + lf.file.toString) + case ToDelete(_, remoteKey, _) => ("delete", remoteKey.key, "", "", "") + case ToCopy(_, sourceKey, hash, targetKey, _) => + ("copy", sourceKey.key, hash.hash, targetKey.key, "") + case DoNothing(_, remoteKey, _) => + ("do-nothing", remoteKey.key, "", "", "") + case x => ("other", x.toString, "", "", "") + }) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala deleted file mode 100644 index 0b3f1ea..0000000 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ /dev/null @@ -1,243 +0,0 @@ -package net.kemitix.thorp.core - -import java.io.File -import java.nio.file.Paths -import java.time.Instant - -import net.kemitix.thorp.console -import net.kemitix.thorp.console.MyConsole -import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root} -import net.kemitix.thorp.domain.StorageQueueEvent.{ - CopyQueueEvent, - DeleteQueueEvent, - ShutdownQueueEvent, - UploadQueueEvent -} -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.{HashService, StorageService} -import org.scalatest.FunSpec -import zio.internal.PlatformLive -import zio.{Runtime, Task, TaskR} - -class SyncSuite extends FunSpec { - - private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) - - private val testBucket = Bucket("bucket") - private val source = Resource(this, "upload") - private val sourcePath = source.toPath - // source contains the files root-file and subdir/leaf-file - private val rootRemoteKey = RemoteKey("prefix/root-file") - private val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") - private val rootFile: LocalFile = - LocalFile.resolve("root-file", - md5HashMap(Root.hash), - sourcePath, - _ => rootRemoteKey) - private val leafFile: LocalFile = - LocalFile.resolve("subdir/leaf-file", - md5HashMap(Leaf.hash), - sourcePath, - _ => leafRemoteKey) - private val hashService = - DummyHashService( - Map( - file("root-file") -> Map(MD5 -> MD5HashData.Root.hash), - file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash) - )) - private val configOptions = - ConfigOptions( - List( - ConfigOption.Source(sourcePath), - ConfigOption.Bucket("bucket"), - ConfigOption.Prefix("prefix"), - ConfigOption.IgnoreGlobalOptions, - ConfigOption.IgnoreUserOptions - )) - private val lastModified = LastModified(Instant.now) - - def putObjectRequest(bucket: Bucket, - remoteKey: RemoteKey, - localFile: LocalFile): (String, String, File) = - (bucket.name, remoteKey.key, localFile.file) - - private def md5HashMap(md5Hash: MD5Hash): Map[HashType, MD5Hash] = - Map(MD5 -> md5Hash) - - private def file(filename: String) = - sourcePath.resolve(Paths.get(filename)) - - describe("when all files should be uploaded") { - val storageService = - new RecordingStorageService(testBucket, S3ObjectsData()) - it("uploads all files") { - val expected = Right( - Set(ToUpload(testBucket, rootFile, rootFile.file.length), - ToUpload(testBucket, leafFile, leafFile.file.length))) - val result = - invokeSubjectForActions(storageService, hashService, configOptions) - assertResult(expected)(result.map(_.toSet)) - } - } - - describe("when no files should be uploaded") { - val s3ObjectsData = S3ObjectsData( - byHash = Map( - Root.hash -> Set( - KeyModified(RemoteKey("prefix/root-file"), lastModified)), - Leaf.hash -> Set( - KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)) - ), - byKey = Map( - RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, - lastModified)) - ) - val storageService = new RecordingStorageService(testBucket, s3ObjectsData) - it("no actions") { - val expected = Stream() - val result = - invokeSubjectForActions(storageService, hashService, configOptions) - assert(result.isRight) - assertResult(expected)(result.right.get) - } - } - describe("when a file is renamed it is moved on S3 with no upload") { - val sourceKey = RemoteKey("prefix/root-file-old") - val targetKey = RemoteKey("prefix/root-file") - // 'root-file-old' should be renamed as 'root-file' - val s3ObjectsData = S3ObjectsData( - byHash = - Map(Root.hash -> Set(KeyModified(sourceKey, lastModified)), - Leaf.hash -> Set( - KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), - byKey = - Map(sourceKey -> HashModified(Root.hash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, - lastModified)) - ) - val storageService = new RecordingStorageService(testBucket, s3ObjectsData) - it("copies the file and deletes the original") { - val expected = Stream( - ToCopy(testBucket, - sourceKey, - Root.hash, - targetKey, - rootFile.file.length), - ToDelete(testBucket, sourceKey, 0L) - ) - val result = - invokeSubjectForActions(storageService, hashService, configOptions) - assert(result.isRight) - assertResult(expected)(result.right.get) - } - } - describe("when a file is copied it is copied on S3 with no upload") { - it("TODO") { - pending - } - } - describe("when a file is deleted locally it is deleted from S3") { - val deletedHash = MD5Hash("deleted-hash") - val deletedKey = RemoteKey("prefix/deleted-file") - val s3ObjectsData = S3ObjectsData( - byHash = Map( - Root.hash -> Set( - KeyModified(RemoteKey("prefix/root-file"), lastModified)), - Leaf.hash -> Set( - KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)), - deletedHash -> Set( - KeyModified(RemoteKey("prefix/deleted-file"), lastModified)) - ), - byKey = Map( - RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, - lastModified), - deletedKey -> HashModified(deletedHash, lastModified) - ) - ) - val storageService = new RecordingStorageService(testBucket, s3ObjectsData) - it("deleted key") { - val expected = Stream( - ToDelete(testBucket, deletedKey, 0L) - ) - val result = - invokeSubjectForActions(storageService, hashService, configOptions) - assert(result.isRight) - assertResult(expected)(result.right.get) - } - } - describe("when a file is excluded") { - val s3ObjectsData = S3ObjectsData( - byHash = Map( - Root.hash -> Set( - KeyModified(RemoteKey("prefix/root-file"), lastModified)), - Leaf.hash -> Set( - KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)) - ), - byKey = Map( - RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, - lastModified)) - ) - val storageService = new RecordingStorageService(testBucket, s3ObjectsData) - it("is not uploaded") { - val expected = Stream() - val result = - invokeSubjectForActions(storageService, - hashService, - ConfigOption.Exclude("leaf") :: configOptions) - assert(result.isRight) - assertResult(expected)(result.right.get) - } - } - - class RecordingStorageService(testBucket: Bucket, - s3ObjectsData: S3ObjectsData) - extends StorageService { - - override def listObjects( - bucket: Bucket, - prefix: RemoteKey): TaskR[console.MyConsole, S3ObjectsData] = - TaskR(s3ObjectsData) - - override def upload(localFile: LocalFile, - bucket: Bucket, - batchMode: Boolean, - uploadEventListener: UploadEventListener, - tryCount: Int): Task[UploadQueueEvent] = - Task(UploadQueueEvent(localFile.remoteKey, localFile.hashes(MD5))) - - override def copy(bucket: Bucket, - sourceKey: RemoteKey, - hashes: MD5Hash, - targetKey: RemoteKey): Task[CopyQueueEvent] = - Task(CopyQueueEvent(sourceKey, targetKey)) - - override def delete(bucket: Bucket, - remoteKey: RemoteKey): Task[DeleteQueueEvent] = - Task(DeleteQueueEvent(remoteKey)) - - override def shutdown: Task[StorageQueueEvent] = - Task(ShutdownQueueEvent()) - } - - def invokeSubjectForActions( - storageService: StorageService, - hashService: HashService, - configOptions: ConfigOptions): Either[Any, Stream[Action]] = { - invoke(storageService, hashService, configOptions) - .map(_.actions) - } - - def invoke(storageService: StorageService, - hashService: HashService, - configOptions: ConfigOptions): Either[Any, SyncPlan] = { - runtime.unsafeRunSync { - PlanBuilder - .createPlan(storageService, hashService, configOptions) - }.toEither - } -} diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala new file mode 100644 index 0000000..478e969 --- /dev/null +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala @@ -0,0 +1,95 @@ +package net.kemitix.thorp.storage.api + +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.domain._ +import zio.{Task, TaskR, UIO, ZIO} + +trait Storage { + val storage: Storage.Service +} + +object Storage { + trait Service { + + def listObjects( + bucket: Bucket, + prefix: RemoteKey + ): TaskR[Storage with Console, S3ObjectsData] + + def upload( + localFile: LocalFile, + bucket: Bucket, + batchMode: Boolean, + uploadEventListener: UploadEventListener, + tryCount: Int + ): ZIO[Storage, Nothing, StorageQueueEvent] + + def copy( + bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey + ): ZIO[Storage, Nothing, StorageQueueEvent] + + def delete( + bucket: Bucket, + remoteKey: RemoteKey + ): UIO[StorageQueueEvent] + + def shutdown: UIO[StorageQueueEvent] + } + + trait Test extends Storage { + + def listResult: Task[S3ObjectsData] + def uploadResult: UIO[StorageQueueEvent] + def copyResult: UIO[StorageQueueEvent] + def deleteResult: UIO[StorageQueueEvent] + def shutdownResult: UIO[StorageQueueEvent] + + val storage: Service = new Service { + + override def listObjects( + bucket: Bucket, + prefix: RemoteKey): TaskR[Storage with Console, S3ObjectsData] = + listResult + + override def upload( + localFile: LocalFile, + bucket: Bucket, + batchMode: Boolean, + uploadEventListener: UploadEventListener, + tryCount: Int): ZIO[Storage, Nothing, StorageQueueEvent] = + uploadResult + + override def copy( + bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey): ZIO[Storage, Nothing, StorageQueueEvent] = + copyResult + + override def delete(bucket: Bucket, + remoteKey: RemoteKey): UIO[StorageQueueEvent] = + deleteResult + + override def shutdown: UIO[StorageQueueEvent] = + shutdownResult + + } + } + + object Test extends Test { + override def listResult: Task[S3ObjectsData] = + Task.die(new NotImplementedError) + override def uploadResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + override def copyResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + override def deleteResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + override def shutdownResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + } + +} diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala deleted file mode 100644 index fbc84a2..0000000 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala +++ /dev/null @@ -1,36 +0,0 @@ -package net.kemitix.thorp.storage.api - -import net.kemitix.thorp.console.MyConsole -import net.kemitix.thorp.domain._ -import zio.{Task, TaskR} - -trait StorageService { - - def shutdown: Task[StorageQueueEvent] - - def listObjects( - bucket: Bucket, - prefix: RemoteKey - ): TaskR[MyConsole, S3ObjectsData] - - def upload( - localFile: LocalFile, - bucket: Bucket, - batchMode: Boolean, - uploadEventListener: UploadEventListener, - tryCount: Int - ): Task[StorageQueueEvent] - - def copy( - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey - ): Task[StorageQueueEvent] - - def delete( - bucket: Bucket, - remoteKey: RemoteKey - ): Task[StorageQueueEvent] - -} diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/storage.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/storage.scala new file mode 100644 index 0000000..de1dc73 --- /dev/null +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/storage.scala @@ -0,0 +1,42 @@ +package net.kemitix.thorp + +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.Storage +import zio.{TaskR, ZIO} + +package object storage { + + final val storageService: ZIO[Storage, Nothing, Storage.Service] = + ZIO.access(_.storage) + + final def listObjects( + bucket: Bucket, + prefix: RemoteKey): TaskR[Storage with Console, S3ObjectsData] = + ZIO.accessM(_.storage listObjects (bucket, prefix)) + + final def upload( + localFile: LocalFile, + bucket: Bucket, + batchMode: Boolean, + uploadEventListener: UploadEventListener, + tryCount: Int + ): ZIO[Storage, Nothing, StorageQueueEvent] = + ZIO.accessM( + _.storage upload (localFile, bucket, batchMode, uploadEventListener, tryCount)) + + final def copyObject( + bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey + ): ZIO[Storage, Nothing, StorageQueueEvent] = + ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey)) + + final def deleteObject( + bucket: Bucket, + remoteKey: RemoteKey + ): ZIO[Storage, Nothing, StorageQueueEvent] = + ZIO.accessM(_.storage delete (bucket, remoteKey)) + +} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonS3.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonS3.scala index b4cf0f5..7f31ad8 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonS3.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonS3.scala @@ -1,40 +1,47 @@ package net.kemitix.thorp.storage.aws +import com.amazonaws.services.s3.model._ import com.amazonaws.services.s3.{AmazonS3 => AmazonS3Client} -import com.amazonaws.services.s3.model.{ - CopyObjectRequest, - CopyObjectResult, - DeleteObjectRequest, - ListObjectsV2Request, - ListObjectsV2Result -} +import zio.{Task, UIO} object AmazonS3 { trait Client { - def shutdown(): Unit + def shutdown(): UIO[Unit] - def deleteObject: DeleteObjectRequest => Unit + def deleteObject: DeleteObjectRequest => Task[Unit] - def copyObject: CopyObjectRequest => Option[CopyObjectResult] + def copyObject: CopyObjectRequest => Task[Option[CopyObjectResult]] - def listObjectsV2: ListObjectsV2Request => ListObjectsV2Result + def listObjectsV2: ListObjectsV2Request => Task[ListObjectsV2Result] } case class ClientImpl(amazonS3: AmazonS3Client) extends Client { - def shutdown(): Unit = amazonS3.shutdown() + def shutdown(): UIO[Unit] = + UIO { + amazonS3.shutdown() + } - def deleteObject: DeleteObjectRequest => Unit = - request => amazonS3.deleteObject(request) + def deleteObject: DeleteObjectRequest => Task[Unit] = + request => + Task { + amazonS3.deleteObject(request) + } - def copyObject: CopyObjectRequest => Option[CopyObjectResult] = - request => Option(amazonS3.copyObject(request)) + def copyObject: CopyObjectRequest => Task[Option[CopyObjectResult]] = + request => + Task { + amazonS3.copyObject(request) + }.map(Option(_)) - def listObjectsV2: ListObjectsV2Request => ListObjectsV2Result = - request => amazonS3.listObjectsV2(request) + def listObjectsV2: ListObjectsV2Request => Task[ListObjectsV2Result] = + request => + Task { + amazonS3.listObjectsV2(request) + } } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonTransferManager.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonTransferManager.scala index 25dcc45..dbe1c24 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonTransferManager.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonTransferManager.scala @@ -2,10 +2,19 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.TransferManager +import net.kemitix.thorp.storage.aws.AmazonUpload.{ + CompletableUpload, + InProgress +} +import zio.{Task, UIO} case class AmazonTransferManager(transferManager: TransferManager) { - def shutdownNow(now: Boolean): Unit = transferManager.shutdownNow(now) + def shutdownNow(now: Boolean): UIO[Unit] = + UIO(transferManager.shutdownNow(now)) + + def upload: PutObjectRequest => Task[InProgress] = + putObjectRequest => + Task(transferManager.upload(putObjectRequest)) + .map(CompletableUpload) - def upload(putObjectRequest: PutObjectRequest): AmazonUpload = - AmazonUpload(transferManager.upload(putObjectRequest)) } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonUpload.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonUpload.scala index 4b404e6..47c6567 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonUpload.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/AmazonUpload.scala @@ -3,6 +3,15 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.transfer.Upload import com.amazonaws.services.s3.transfer.model.UploadResult -case class AmazonUpload(upload: Upload) { - def waitForUploadResult: UploadResult = upload.waitForUploadResult() +object AmazonUpload { + + trait InProgress { + def waitForUploadResult: UploadResult + } + + case class CompletableUpload(upload: Upload) extends InProgress { + override def waitForUploadResult: UploadResult = + upload.waitForUploadResult() + } + } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala index bbacad3..b95bb2b 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala @@ -1,64 +1,49 @@ package net.kemitix.thorp.storage.aws -import com.amazonaws.services.s3.model.{ - AmazonS3Exception, - CopyObjectRequest, - CopyObjectResult +import com.amazonaws.SdkClientException +import com.amazonaws.services.s3.model.{CopyObjectRequest, CopyObjectResult} +import net.kemitix.thorp.domain.StorageQueueEvent.{ + Action, + CopyQueueEvent, + ErrorQueueEvent } -import net.kemitix.thorp.domain.StorageQueueEvent.{Action, CopyQueueEvent} import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.aws.S3ClientException.{ - HashMatchError, - S3Exception -} -import zio.Task +import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError} +import zio.{IO, Task, UIO} -import scala.util.{Failure, Success, Try} +trait Copier { -class Copier(amazonS3: AmazonS3.Client) { - - def copy( + def copy(amazonS3: AmazonS3.Client)( bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ): Task[StorageQueueEvent] = - for { - copyResult <- copyObject(bucket, sourceKey, hash, targetKey) - result <- mapCopyResult(copyResult, sourceKey, targetKey) - } yield result + ): UIO[StorageQueueEvent] = + copyObject(amazonS3)(bucket, sourceKey, hash, targetKey) + .fold(foldFailure(sourceKey, targetKey), + foldSuccess(sourceKey, targetKey)) - private def mapCopyResult( - copyResult: Try[Option[CopyObjectResult]], - sourceKey: RemoteKey, - targetKey: RemoteKey - ) = - copyResult match { - case Success(None) => - Task.succeed( - StorageQueueEvent - .ErrorQueueEvent( - Action.Copy(s"${sourceKey.key} => ${targetKey.key}"), - targetKey, - HashMatchError)) - case Success(Some(_)) => - Task.succeed(CopyQueueEvent(sourceKey, targetKey)) - case Failure(e: AmazonS3Exception) => - Task.succeed( - StorageQueueEvent.ErrorQueueEvent( - Action.Copy(s"${sourceKey.key} => ${targetKey.key}"), - targetKey, - S3Exception(e.getMessage)) - ) - case Failure(e) => Task.fail(e) - } - - private def copyObject( + private def copyObject(amazonS3: AmazonS3.Client)( bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ) = { + ): IO[S3ClientException, CopyObjectResult] = { + + def handleResult + : Option[CopyObjectResult] => IO[S3ClientException, CopyObjectResult] = + maybeResult => + IO.fromEither { + maybeResult + .toRight(HashError) + } + + def handleError: Throwable => IO[S3ClientException, CopyObjectResult] = + error => + Task.fail { + CopyError(error) + } + val request = new CopyObjectRequest( bucket.name, @@ -66,7 +51,39 @@ class Copier(amazonS3: AmazonS3.Client) { bucket.name, targetKey.key ).withMatchingETagConstraint(hash.hash) - Task(Try(amazonS3.copyObject(request))) + amazonS3 + .copyObject(request) + .fold(handleError, handleResult) + .flatten } + private def foldFailure( + sourceKey: RemoteKey, + targetKey: RemoteKey): S3ClientException => StorageQueueEvent = { + case error: SdkClientException => + errorEvent(sourceKey, targetKey, error) + case error => + errorEvent(sourceKey, targetKey, error) + + } + + private def foldSuccess( + sourceKey: RemoteKey, + targetKey: RemoteKey): CopyObjectResult => StorageQueueEvent = + result => + Option(result) match { + case Some(_) => CopyQueueEvent(sourceKey, targetKey) + case None => + errorEvent(sourceKey, targetKey, HashError) + } + + private def errorEvent: (RemoteKey, RemoteKey, Throwable) => ErrorQueueEvent = + (sourceKey, targetKey, error) => + ErrorQueueEvent(action(sourceKey, targetKey), targetKey, error) + + private def action(sourceKey: RemoteKey, targetKey: RemoteKey): Action = + Action.Copy(s"${sourceKey.key} => ${targetKey.key}") + } + +object Copier extends Copier diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala index 68a8e61..2efcc94 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala @@ -1,26 +1,31 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.model.DeleteObjectRequest -import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent +import net.kemitix.thorp.domain.StorageQueueEvent.{ + Action, + DeleteQueueEvent, + ErrorQueueEvent +} import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent} -import zio.Task +import zio.{Task, UIO} -class Deleter(amazonS3: AmazonS3.Client) { +trait Deleter { - def delete( + def delete(amazonS3: AmazonS3.Client)( bucket: Bucket, remoteKey: RemoteKey - ): Task[StorageQueueEvent] = - for { - _ <- deleteObject(bucket, remoteKey) - } yield DeleteQueueEvent(remoteKey) + ): UIO[StorageQueueEvent] = + deleteObject(amazonS3)(bucket, remoteKey) + .map(_ => DeleteQueueEvent(remoteKey)) + .catchAll(e => + UIO(ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, e))) - private def deleteObject( + private def deleteObject(amazonS3: AmazonS3.Client)( bucket: Bucket, remoteKey: RemoteKey - ) = { - val request = new DeleteObjectRequest(bucket.name, remoteKey.key) - Task(amazonS3.deleteObject(request)) - } + ): Task[Unit] = + amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)) } + +object Deleter extends Deleter diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala index 540af77..969c860 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala @@ -1,8 +1,11 @@ package net.kemitix.thorp.storage.aws -import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} +import com.amazonaws.services.s3.model.{ + ListObjectsV2Request, + ListObjectsV2Result, + S3ObjectSummary +} import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData} import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey @@ -10,64 +13,64 @@ import zio.{Task, TaskR} import scala.collection.JavaConverters._ -class Lister(amazonS3: AmazonS3.Client) { +trait Lister { private type Token = String - private type Batch = (Stream[S3ObjectSummary], Option[Token]) + case class Batch(summaries: Stream[S3ObjectSummary], more: Option[Token]) - def listObjects( + def listObjects(amazonS3: AmazonS3.Client)( bucket: Bucket, prefix: RemoteKey - ): TaskR[MyConsole, S3ObjectsData] = { + ): TaskR[Console, S3ObjectsData] = { - val requestMore = (token: Token) => + def request = new ListObjectsV2Request() .withBucketName(bucket.name) .withPrefix(prefix.key) - .withContinuationToken(token) - def fetchBatch: ListObjectsV2Request => TaskR[MyConsole, Batch] = + def requestMore: Token => ListObjectsV2Request = + token => request.withContinuationToken(token) + + def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] = request => for { _ <- ListerLogger.logFetchBatch - batch <- tryFetchBatch(request) + batch <- tryFetchBatch(amazonS3)(request) } yield batch - def fetchMore( - more: Option[Token] - ): TaskR[MyConsole, Stream[S3ObjectSummary]] = { - more match { - case None => TaskR.succeed(Stream.empty) - case Some(token) => fetch(requestMore(token)) - } + def fetchMore: Option[Token] => TaskR[Console, Stream[S3ObjectSummary]] = { + case None => TaskR.succeed(Stream.empty) + case Some(token) => fetch(requestMore(token)) } - def fetch - : ListObjectsV2Request => TaskR[MyConsole, Stream[S3ObjectSummary]] = - request => { + def fetch: ListObjectsV2Request => TaskR[Console, Stream[S3ObjectSummary]] = + request => for { batch <- fetchBatch(request) - (summaries, more) = batch - rest <- fetchMore(more) - } yield summaries ++ rest - } + more <- fetchMore(batch.more) + } yield batch.summaries ++ more - for { - summaries <- fetch( - new ListObjectsV2Request() - .withBucketName(bucket.name) - .withPrefix(prefix.key)) - } yield domain.S3ObjectsData(byHash(summaries), byKey(summaries)) + fetch(request) + .map(summaries => { + S3ObjectsData(byHash(summaries), byKey(summaries)) + }) } private def tryFetchBatch( - request: ListObjectsV2Request - ): Task[(Stream[S3ObjectSummary], Option[Token])] = - Task(amazonS3.listObjectsV2(request)) - .map { result => - val more: Option[Token] = - if (result.isTruncated) Some(result.getNextContinuationToken) - else None - (result.getObjectSummaries.asScala.toStream, more) - } + amazonS3: AmazonS3.Client): ListObjectsV2Request => Task[Batch] = + request => + amazonS3 + .listObjectsV2(request) + .map(result => Batch(objectSummaries(result), moreToken(result))) + + private def objectSummaries( + result: ListObjectsV2Result): Stream[S3ObjectSummary] = + result.getObjectSummaries.asScala.toStream + + private def moreToken(result: ListObjectsV2Result): Option[String] = + if (result.isTruncated) Some(result.getNextContinuationToken) + else None + } + +object Lister extends Lister diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala index 94997f0..4f2fbb2 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala @@ -4,7 +4,7 @@ import net.kemitix.thorp.console._ import zio.TaskR trait ListerLogger { - def logFetchBatch: TaskR[MyConsole, Unit] = + def logFetchBatch: TaskR[Console, Unit] = putStrLn("Fetching remote summaries...") } object ListerLogger extends ListerLogger diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientException.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientException.scala index 1fae071..70ffb23 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientException.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientException.scala @@ -1,9 +1,13 @@ package net.kemitix.thorp.storage.aws -sealed trait S3ClientException extends Exception +sealed trait S3ClientException extends Throwable object S3ClientException { - case object HashMatchError extends S3ClientException { + case object HashError extends S3ClientException { + override def getMessage: String = + "The hash of the object to be overwritten did not match the the expected value" + } + case class CopyError(error: Throwable) extends S3ClientException { override def getMessage: String = "The hash of the object to be overwritten did not match the the expected value" } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala new file mode 100644 index 0000000..0f245fd --- /dev/null +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala @@ -0,0 +1,54 @@ +package net.kemitix.thorp.storage.aws + +import com.amazonaws.services.s3.AmazonS3ClientBuilder +import com.amazonaws.services.s3.transfer.TransferManagerBuilder +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.Storage +import net.kemitix.thorp.storage.api.Storage.Service +import zio.{TaskR, UIO} + +object S3Storage { + trait Live extends Storage { + val storage: Service = new Service { + + private val client: AmazonS3.Client = + AmazonS3.ClientImpl(AmazonS3ClientBuilder.defaultClient) + private val transferManager: AmazonTransferManager = + AmazonTransferManager(TransferManagerBuilder.defaultTransferManager) + + override def listObjects( + bucket: Bucket, + prefix: RemoteKey): TaskR[Console, S3ObjectsData] = + Lister.listObjects(client)(bucket, prefix) + + override def upload(localFile: LocalFile, + bucket: Bucket, + batchMode: Boolean, + uploadEventListener: UploadEventListener, + tryCount: Int): UIO[StorageQueueEvent] = + Uploader.upload(transferManager)(localFile, + bucket, + batchMode, + uploadEventListener, + 1) + + override def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey): UIO[StorageQueueEvent] = + Copier.copy(client)(bucket, sourceKey, hash, targetKey) + + override def delete(bucket: Bucket, + remoteKey: RemoteKey): UIO[StorageQueueEvent] = + Deleter.delete(client)(bucket, remoteKey) + + override def shutdown: UIO[StorageQueueEvent] = { + transferManager.shutdownNow(true) + client.shutdown().map(_ => ShutdownQueueEvent()) + } + } + } + object Live extends Live +} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala deleted file mode 100644 index 0719a9d..0000000 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala +++ /dev/null @@ -1,55 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import net.kemitix.thorp.console.MyConsole -import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.StorageService -import zio.{Task, TaskR} - -class S3StorageService( - amazonS3Client: => AmazonS3.Client, - amazonTransferManager: => AmazonTransferManager -) extends StorageService { - - lazy val objectLister = new Lister(amazonS3Client) - lazy val copier = new Copier(amazonS3Client) - lazy val uploader = new Uploader(amazonTransferManager) - lazy val deleter = new Deleter(amazonS3Client) - - override def listObjects( - bucket: Bucket, - prefix: RemoteKey - ): TaskR[MyConsole, S3ObjectsData] = - objectLister.listObjects(bucket, prefix) - - override def copy( - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey - ): Task[StorageQueueEvent] = - copier.copy(bucket, sourceKey, hash, targetKey) - - override def upload( - localFile: LocalFile, - bucket: Bucket, - batchMode: Boolean, - uploadEventListener: UploadEventListener, - tryCount: Int - ): Task[StorageQueueEvent] = - uploader.upload(localFile, bucket, batchMode, uploadEventListener, 1) - - override def delete( - bucket: Bucket, - remoteKey: RemoteKey - ): Task[StorageQueueEvent] = - deleter.delete(bucket, remoteKey) - - override def shutdown: Task[StorageQueueEvent] = - Task { - amazonTransferManager.shutdownNow(true) - amazonS3Client.shutdown() - ShutdownQueueEvent() - } - -} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala deleted file mode 100644 index 578911a..0000000 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala +++ /dev/null @@ -1,24 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import com.amazonaws.services.s3.AmazonS3ClientBuilder -import com.amazonaws.services.s3.transfer.TransferManagerBuilder -import net.kemitix.thorp.storage.api.StorageService - -object S3StorageServiceBuilder { - - def createService( - amazonS3Client: AmazonS3.Client, - amazonTransferManager: AmazonTransferManager - ): StorageService = - new S3StorageService( - amazonS3Client, - amazonTransferManager - ) - - lazy val defaultStorageService: StorageService = - createService( - AmazonS3.ClientImpl(AmazonS3ClientBuilder.defaultClient), - AmazonTransferManager(TransferManagerBuilder.defaultTransferManager) - ) - -} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index 8755000..b1f5a1a 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -13,39 +13,41 @@ import net.kemitix.thorp.domain.UploadEvent.{ TransferEvent } import net.kemitix.thorp.domain.{StorageQueueEvent, _} -import zio.Task +import zio.{Task, UIO} -class Uploader(transferManager: => AmazonTransferManager) { +trait Uploader { - def upload( + def upload(transferManager: => AmazonTransferManager)( localFile: LocalFile, bucket: Bucket, batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int - ): Task[StorageQueueEvent] = - for { - upload <- transfer(localFile, bucket, batchMode, uploadEventListener) - } yield upload + ): UIO[StorageQueueEvent] = + transfer(transferManager)(localFile, bucket, batchMode, uploadEventListener) + .catchAll(handleError(localFile.remoteKey)) - private def transfer( + private def handleError( + remoteKey: RemoteKey): Throwable => UIO[ErrorQueueEvent] = { e => + UIO.succeed(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e)) + } + + private def transfer(transferManager: => AmazonTransferManager)( localFile: LocalFile, bucket: Bucket, batchMode: Boolean, uploadEventListener: UploadEventListener ): Task[StorageQueueEvent] = { - val listener: ProgressListener = progressListener(uploadEventListener) - val putObjectRequest = request(localFile, bucket, batchMode, listener) - Task(transferManager.upload(putObjectRequest)) + + val listener = progressListener(uploadEventListener) + val putObjectRequest = request(localFile, bucket, batchMode, listener) + + transferManager + .upload(putObjectRequest) .map(_.waitForUploadResult) - .map(upload => - UploadQueueEvent(RemoteKey(upload.getKey), MD5Hash(upload.getETag))) - .catchAll( - e => - Task.succeed( - ErrorQueueEvent(Action.Upload(localFile.remoteKey.key), - localFile.remoteKey, - e))) + .map(uploadResult => + UploadQueueEvent(RemoteKey(uploadResult.getKey), + MD5Hash(uploadResult.getETag))) } private def request( @@ -54,36 +56,45 @@ class Uploader(transferManager: => AmazonTransferManager) { batchMode: Boolean, listener: ProgressListener ): PutObjectRequest = { - val metadata = new ObjectMetadata() - localFile.md5base64.foreach(metadata.setContentMD5) val request = new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) - .withMetadata(metadata) + .withMetadata(metadata(localFile)) if (batchMode) request else request.withGeneralProgressListener(listener) } - private def progressListener(uploadEventListener: UploadEventListener) = - new ProgressListener { - override def progressChanged(progressEvent: ProgressEvent): Unit = - uploadEventListener.listener(eventHandler(progressEvent)) + private def metadata: LocalFile => ObjectMetadata = localFile => { + val metadata = new ObjectMetadata() + localFile.md5base64.foreach(metadata.setContentMD5) + metadata + } - private def eventHandler(progressEvent: ProgressEvent) = { - progressEvent match { - case e: ProgressEvent if isTransfer(e) => - TransferEvent(e.getEventType.name) - case e: ProgressEvent if isByteTransfer(e) => - ByteTransferEvent(e.getEventType.name) - case e: ProgressEvent => - RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) - } - } + private def progressListener: UploadEventListener => ProgressListener = + uploadEventListener => + new ProgressListener { + override def progressChanged(progressEvent: ProgressEvent): Unit = + uploadEventListener.listener(eventHandler(progressEvent)) + + private def eventHandler: ProgressEvent => UploadEvent = + progressEvent => { + def isTransfer: ProgressEvent => Boolean = + _.getEventType.isTransferEvent + def isByteTransfer: ProgressEvent => Boolean = + _.getEventType.equals( + ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT) + progressEvent match { + case e: ProgressEvent if isTransfer(e) => + TransferEvent(e.getEventType.name) + case e: ProgressEvent if isByteTransfer(e) => + ByteTransferEvent(e.getEventType.name) + case e: ProgressEvent => + RequestEvent(e.getEventType.name, + e.getBytes, + e.getBytesTransferred) + } + } } - private def isTransfer(e: ProgressEvent) = - e.getEventType.isTransferEvent - - private def isByteTransfer(e: ProgressEvent) = - e.getEventType equals ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT - } + +object Uploader extends Uploader diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala index 7a1f0f6..dafea77 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala @@ -1,6 +1,11 @@ package net.kemitix.thorp.storage.aws +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.Storage import org.scalamock.scalatest.MockFactory +import zio.{TaskR, UIO} trait AmazonS3ClientTestFixture extends MockFactory { @@ -11,8 +16,43 @@ trait AmazonS3ClientTestFixture extends MockFactory { amazonS3Client: AmazonS3.Client, amazonS3TransferManager: AmazonTransferManager, ) { - lazy val storageService: S3StorageService = - new S3StorageService(amazonS3Client, amazonS3TransferManager) + lazy val storageService: Storage.Service = + new Storage.Service { + + private val client = amazonS3Client + private val transferManager = amazonS3TransferManager + + override def listObjects( + bucket: Bucket, + prefix: RemoteKey): TaskR[Console, S3ObjectsData] = + Lister.listObjects(client)(bucket, prefix) + + override def upload(localFile: LocalFile, + bucket: Bucket, + batchMode: Boolean, + uploadEventListener: UploadEventListener, + tryCount: Int): UIO[StorageQueueEvent] = + Uploader.upload(transferManager)(localFile, + bucket, + batchMode, + uploadEventListener, + 1) + + override def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey): UIO[StorageQueueEvent] = + Copier.copy(client)(bucket, sourceKey, hash, targetKey) + + override def delete(bucket: Bucket, + remoteKey: RemoteKey): UIO[StorageQueueEvent] = + Deleter.delete(client)(bucket, remoteKey) + + override def shutdown: UIO[StorageQueueEvent] = { + transferManager.shutdownNow(true) + client.shutdown().map(_ => ShutdownQueueEvent()) + } + } } } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala index 99e5c39..463c2b8 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala @@ -1,20 +1,17 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.model.{AmazonS3Exception, CopyObjectResult} -import net.kemitix.thorp.console.MyConsole +import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain.StorageQueueEvent.{Action, ErrorQueueEvent} import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.aws.S3ClientException.{ - HashMatchError, - S3Exception -} +import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError} import org.scalatest.FreeSpec -import zio.Runtime import zio.internal.PlatformLive +import zio.{Runtime, Task} class CopierTest extends FreeSpec { - private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) + private val runtime = Runtime(Console.Live, PlatformLive.Default) "copier" - { val bucket = Bucket("aBucket") @@ -29,9 +26,9 @@ class CopierTest extends FreeSpec { new AmazonS3ClientTestFixture { (fixture.amazonS3Client.copyObject _) .when() - .returns(_ => Some(new CopyObjectResult)) + .returns(_ => Task.succeed(Some(new CopyObjectResult))) private val result = - invoke(bucket, sourceKey, hash, targetKey, fixture.storageService) + invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client) assertResult(expected)(result) } } @@ -41,17 +38,17 @@ class CopierTest extends FreeSpec { new AmazonS3ClientTestFixture { (fixture.amazonS3Client.copyObject _) .when() - .returns(_ => None) + .returns(_ => Task.succeed(None)) private val result = - invoke(bucket, sourceKey, hash, targetKey, fixture.storageService) + invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client) result match { case Right( ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), RemoteKey("targetKey"), e)) => e match { - case HashMatchError => assert(true) - case _ => fail("Not a HashMatchError") + case HashError => assert(true) + case _ => fail("Not a HashError: " + e) } case e => fail("Not an ErrorQueueEvent: " + e) } @@ -64,18 +61,18 @@ class CopierTest extends FreeSpec { private val expectedMessage = "The specified key does not exist" (fixture.amazonS3Client.copyObject _) .when() - .throws(new AmazonS3Exception(expectedMessage)) + .returns(_ => Task.fail(new AmazonS3Exception(expectedMessage))) private val result = - invoke(bucket, sourceKey, hash, targetKey, fixture.storageService) + invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client) result match { case Right( ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), RemoteKey("targetKey"), e)) => e match { - case S3Exception(message) => - assert(message.startsWith(expectedMessage)) - case _ => fail("Not an S3Exception") + case CopyError(cause) => + assert(cause.getMessage.startsWith(expectedMessage)) + case _ => fail("Not a CopyError: " + e) } case e => fail("Not an ErrorQueueEvent: " + e) } @@ -88,10 +85,10 @@ class CopierTest extends FreeSpec { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey, - storageService: S3StorageService + amazonS3Client: AmazonS3.Client ) = runtime.unsafeRunSync { - storageService.copy(bucket, sourceKey, hash, targetKey) + Copier.copy(amazonS3Client)(bucket, sourceKey, hash, targetKey) }.toEither } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DeleterTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DeleterTest.scala new file mode 100644 index 0000000..caaac71 --- /dev/null +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DeleterTest.scala @@ -0,0 +1,66 @@ +package net.kemitix.thorp.storage.aws + +import com.amazonaws.SdkClientException +import com.amazonaws.services.s3.model.AmazonS3Exception +import net.kemitix.thorp.console._ +import net.kemitix.thorp.domain.StorageQueueEvent.{ + Action, + DeleteQueueEvent, + ErrorQueueEvent +} +import net.kemitix.thorp.domain.{Bucket, RemoteKey} +import org.scalatest.FreeSpec +import zio.internal.PlatformLive +import zio.{Runtime, Task, UIO} + +class DeleterTest extends FreeSpec { + + private val runtime = Runtime(Console.Live, PlatformLive.Default) + + "delete" - { + val bucket = Bucket("aBucket") + val remoteKey = RemoteKey("aRemoteKey") + "when no errors" in { + val expected = Right(DeleteQueueEvent(remoteKey)) + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.deleteObject _) + .when() + .returns(_ => UIO.succeed(())) + private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey) + assertResult(expected)(result) + } + } + "when Amazon Service Exception" in { + val exception = new AmazonS3Exception("message") + val expected = + Right( + ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception)) + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.deleteObject _) + .when() + .returns(_ => Task.fail(exception)) + private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey) + assertResult(expected)(result) + } + } + "when Amazon SDK Client Exception" in { + val exception = new SdkClientException("message") + val expected = + Right( + ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception)) + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.deleteObject _) + .when() + .returns(_ => Task.fail(exception)) + private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey) + assertResult(expected)(result) + } + } + def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket, + remoteKey: RemoteKey) = + runtime.unsafeRunSync { + Deleter.delete(amazonS3Client)(bucket, remoteKey) + }.toEither + + } +} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala new file mode 100644 index 0000000..b3f829c --- /dev/null +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala @@ -0,0 +1,129 @@ +package net.kemitix.thorp.storage.aws + +import java.util.Date + +import com.amazonaws.SdkClientException +import com.amazonaws.services.s3.model.{ + AmazonS3Exception, + ListObjectsV2Result, + S3ObjectSummary +} +import net.kemitix.thorp.console._ +import net.kemitix.thorp.domain._ +import org.scalatest.FreeSpec +import zio.internal.PlatformLive +import zio.{Runtime, Task, UIO} + +class ListerTest extends FreeSpec { + + private val runtime = Runtime(Console.Live, PlatformLive.Default) + + "list" - { + val bucket = Bucket("aBucket") + val prefix = RemoteKey("aRemoteKey") + "when no errors" - { + "when single fetch required" in { + val nowDate = new Date + val nowInstant = nowDate.toInstant + val key = "key" + val etag = "etag" + val expectedHashMap = Map( + MD5Hash(etag) -> Set( + KeyModified(RemoteKey(key), LastModified(nowInstant)))) + val expectedKeyMap = Map( + RemoteKey(key) -> HashModified(MD5Hash(etag), + LastModified(nowInstant)) + ) + val expected = Right(S3ObjectsData(expectedHashMap, expectedKeyMap)) + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.listObjectsV2 _) + .when() + .returns(_ => { + UIO.succeed(objectResults(nowDate, key, etag, false)) + }) + private val result = invoke(fixture.amazonS3Client)(bucket, prefix) + assertResult(expected)(result) + } + } + + "when second fetch required" in { + val nowDate = new Date + val nowInstant = nowDate.toInstant + val key1 = "key1" + val etag1 = "etag1" + val key2 = "key2" + val etag2 = "etag2" + val expectedHashMap = Map( + MD5Hash(etag1) -> Set( + KeyModified(RemoteKey(key1), LastModified(nowInstant))), + MD5Hash(etag2) -> Set( + KeyModified(RemoteKey(key2), LastModified(nowInstant))) + ) + val expectedKeyMap = Map( + RemoteKey(key1) -> HashModified(MD5Hash(etag1), + LastModified(nowInstant)), + RemoteKey(key2) -> HashModified(MD5Hash(etag2), + LastModified(nowInstant)) + ) + val expected = Right(S3ObjectsData(expectedHashMap, expectedKeyMap)) + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.listObjectsV2 _) + .when() + .returns(_ => UIO(objectResults(nowDate, key1, etag1, true))) + .noMoreThanOnce() + (fixture.amazonS3Client.listObjectsV2 _) + .when() + .returns(_ => UIO(objectResults(nowDate, key2, etag2, false))) + private val result = invoke(fixture.amazonS3Client)(bucket, prefix) + assertResult(expected)(result) + } + } + + def objectSummary(key: String, etag: String, lastModified: Date) = { + val objectSummary = new S3ObjectSummary + objectSummary.setKey(key) + objectSummary.setETag(etag) + objectSummary.setLastModified(lastModified) + objectSummary + } + + def objectResults(nowDate: Date, + key: String, + etag: String, + truncated: Boolean) = { + val result = new ListObjectsV2Result + result.getObjectSummaries.add(objectSummary(key, etag, nowDate)) + result.setTruncated(truncated) + result + } + + } + "when Amazon Service Exception" in { + val exception = new AmazonS3Exception("message") + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.listObjectsV2 _) + .when() + .returns(_ => Task.fail(exception)) + private val result = invoke(fixture.amazonS3Client)(bucket, prefix) + assert(result.isLeft) + } + } + "when Amazon SDK Client Exception" in { + val exception = new SdkClientException("message") + new AmazonS3ClientTestFixture { + (fixture.amazonS3Client.listObjectsV2 _) + .when() + .returns(_ => Task.fail(exception)) + private val result = invoke(fixture.amazonS3Client)(bucket, prefix) + assert(result.isLeft) + } + } + def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket, + prefix: RemoteKey) = + runtime.unsafeRunSync { + Lister.listObjects(amazonS3Client)(bucket, prefix) + }.toEither + + } + +} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala deleted file mode 100644 index 86722ec..0000000 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala +++ /dev/null @@ -1,74 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Date - -import com.amazonaws.services.s3.model.{ListObjectsV2Result, S3ObjectSummary} -import net.kemitix.thorp.console.MyConsole -import net.kemitix.thorp.core.Resource -import net.kemitix.thorp.domain._ -import org.scalamock.scalatest.MockFactory -import org.scalatest.FreeSpec -import zio.Runtime -import zio.internal.PlatformLive - -class S3StorageServiceSuite extends FreeSpec with MockFactory { - - private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) - - "listObjects" - { - def objectSummary(hash: MD5Hash, - remoteKey: RemoteKey, - lastModified: LastModified) = { - val summary = new S3ObjectSummary() - summary.setETag(hash.hash) - summary.setKey(remoteKey.key) - summary.setLastModified(Date.from(lastModified.when)) - summary - } - val source = Resource(this, "upload") - val sourcePath = source.toPath - val prefix = RemoteKey("prefix") - implicit val config: Config = - Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) - val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) - val h1 = MD5Hash("hash1") - val k1a = RemoteKey("key1a") - val o1a = objectSummary(h1, k1a, lm) - val k1b = RemoteKey("key1b") - val o1b = objectSummary(h1, k1b, lm) - val h2 = MD5Hash("hash2") - val k2 = RemoteKey("key2") - val o2 = objectSummary(h2, k2, lm) - val myFakeResponse = new ListObjectsV2Result() - val summaries = myFakeResponse.getObjectSummaries - summaries.add(o1a) - summaries.add(o1b) - summaries.add(o2) - - "should build list of hash lookups, with duplicate objects grouped by hash" in { - val expected = Right( - S3ObjectsData( - byHash = Map(h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)), - h2 -> Set(KeyModified(k2, lm))), - byKey = Map(k1a -> HashModified(h1, lm), - k1b -> HashModified(h1, lm), - k2 -> HashModified(h2, lm)) - )) - new AmazonS3ClientTestFixture { - (fixture.amazonS3Client.listObjectsV2 _) - .when() - .returns(_ => myFakeResponse) - private val result = invoke(fixture.storageService) - assertResult(expected)(result) - } - } - def invoke(storageService: S3StorageService) = - runtime.unsafeRunSync { - storageService - .listObjects(Bucket("bucket"), RemoteKey("prefix")) - }.toEither - } - -} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index 9722d30..00b2576 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -2,12 +2,8 @@ package net.kemitix.thorp.storage.aws import java.time.Instant -import com.amazonaws.services.s3.model.PutObjectRequest -import com.amazonaws.services.s3.transfer.model.UploadResult import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher} import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain.MD5HashData.Root -import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent import net.kemitix.thorp.domain._ import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec @@ -24,17 +20,18 @@ class StorageServiceSuite extends FunSpec with MockFactory { KeyGenerator.generateKey(config.sources, config.prefix) _ describe("getS3Status") { + val hash = MD5Hash("hash") val localFile = - LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey) + LocalFile.resolve("the-file", Map(MD5 -> hash), sourcePath, fileToKey) val key = localFile.remoteKey val keyOtherKey = LocalFile.resolve("other-key-same-hash", - md5HashMap(hash), + Map(MD5 -> hash), sourcePath, fileToKey) val diffHash = MD5Hash("diff") val keyDiffHash = LocalFile.resolve("other-key-diff-hash", - md5HashMap(diffHash), + Map(MD5 -> diffHash), sourcePath, fileToKey) val lastModified = LastModified(Instant.now) @@ -85,7 +82,7 @@ class StorageServiceSuite extends FunSpec with MockFactory { describe("when remote key does not exist and no others matches hash") { val localFile = LocalFile.resolve("missing-file", - md5HashMap(MD5Hash("unique")), + Map(MD5 -> MD5Hash("unique")), sourcePath, fileToKey) it("should return no matches by key") { @@ -114,51 +111,4 @@ class StorageServiceSuite extends FunSpec with MockFactory { } - private def md5HashMap(hash: MD5Hash): Map[HashType, MD5Hash] = - Map(MD5 -> hash) - - val batchMode: Boolean = true - - describe("upload") { - - describe("when uploading a file") { - val amazonS3Client = stub[AmazonS3.Client] - val amazonTransferManager = stub[AmazonTransferManager] - val storageService = - new S3StorageService(amazonS3Client, amazonTransferManager) - - val prefix = RemoteKey("prefix") - val localFile = - LocalFile.resolve("root-file", - md5HashMap(Root.hash), - sourcePath, - KeyGenerator.generateKey(config.sources, prefix)) - val bucket = Bucket("a-bucket") - val remoteKey = RemoteKey("prefix/root-file") - val uploadEventListener = - UploadEventListener(localFile, 1, SyncTotals(), 0L) - - val upload = stub[AmazonUpload] - (amazonTransferManager upload (_: PutObjectRequest)) - .when(*) - .returns(upload) - val uploadResult = stub[UploadResult] - (upload.waitForUploadResult _).when().returns(uploadResult) - (uploadResult.getETag _).when().returns(Root.hash.hash) - (uploadResult.getKey _).when().returns(remoteKey.key) - - it("should return hash of uploaded file") { - pending - //FIXME: works okay on its own, but fails when run with others - val expected = UploadQueueEvent(remoteKey, Root.hash) - val result = - storageService.upload(localFile, - bucket, - batchMode, - uploadEventListener, - 1) - assertResult(expected)(result) - } - } - } } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala new file mode 100644 index 0000000..ccc5e0f --- /dev/null +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala @@ -0,0 +1,120 @@ +package net.kemitix.thorp.storage.aws + +import java.io.File + +import com.amazonaws.SdkClientException +import com.amazonaws.services.s3.model.AmazonS3Exception +import com.amazonaws.services.s3.transfer.model.UploadResult +import net.kemitix.thorp.console._ +import net.kemitix.thorp.core.Resource +import net.kemitix.thorp.domain.HashType.MD5 +import net.kemitix.thorp.domain.StorageQueueEvent.{ + Action, + ErrorQueueEvent, + UploadQueueEvent +} +import net.kemitix.thorp.domain._ +import org.scalamock.scalatest.MockFactory +import org.scalatest.FreeSpec +import zio.internal.PlatformLive +import zio.{Runtime, Task} + +class UploaderTest extends FreeSpec with MockFactory { + + private val runtime = Runtime(Console.Live, PlatformLive.Default) + + "upload" - { + val aSource: File = Resource(this, "") + val aFile: File = Resource(this, "small-file") + val aHash = MD5Hash("aHash") + val hashes = Map[HashType, MD5Hash](MD5 -> aHash) + val remoteKey = RemoteKey("aRemoteKey") + val localFile = LocalFile(aFile, aSource, hashes, remoteKey) + val bucket = Bucket("aBucket") + val batchMode = false + val tryCount = 1 + val uploadResult = new UploadResult + uploadResult.setKey(remoteKey.key) + uploadResult.setETag(aHash.hash) + val inProgress = new AmazonUpload.InProgress { + override def waitForUploadResult: UploadResult = uploadResult + } + val uploadEventListener = + UploadEventListener(localFile, 0, SyncTotals(1, 0, 0), 0) + "when no error" in { + val expected = + Right(UploadQueueEvent(remoteKey, aHash)) + new AmazonS3ClientTestFixture { + (fixture.amazonS3TransferManager.upload _) + .when() + .returns(_ => Task.succeed(inProgress)) + private val result = + invoke(fixture.amazonS3TransferManager)( + localFile, + bucket, + batchMode, + uploadEventListener, + tryCount + ) + assertResult(expected)(result) + } + } + "when Amazon Service Exception" in { + val exception = new AmazonS3Exception("message") + val expected = + Right( + ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception)) + new AmazonS3ClientTestFixture { + (fixture.amazonS3TransferManager.upload _) + .when() + .returns(_ => Task.fail(exception)) + private val result = + invoke(fixture.amazonS3TransferManager)( + localFile, + bucket, + batchMode, + uploadEventListener, + tryCount + ) + assertResult(expected)(result) + } + } + "when Amazon SDK Client Exception" in { + val exception = new SdkClientException("message") + val expected = + Right( + ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception)) + new AmazonS3ClientTestFixture { + (fixture.amazonS3TransferManager.upload _) + .when() + .returns(_ => Task.fail(exception)) + private val result = + invoke(fixture.amazonS3TransferManager)( + localFile, + bucket, + batchMode, + uploadEventListener, + tryCount + ) + assertResult(expected)(result) + } + } + def invoke(transferManager: AmazonTransferManager)( + localFile: LocalFile, + bucket: Bucket, + batchMode: Boolean, + uploadEventListener: UploadEventListener, + tryCount: Int + ) = + runtime.unsafeRunSync { + Uploader.upload(transferManager)( + localFile, + bucket, + batchMode, + uploadEventListener, + tryCount + ) + }.toEither + } + +}