From c5d7d4933c971172c356d09b24775f8e0c9b348d Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Sat, 7 Sep 2019 07:52:13 +0100 Subject: [PATCH] Restructure using EIP-ZIO channels (#183) * [sbt] Rename storage-api as storage * [storage] remove dependency upon console * [storage] remove dependency upon config * [console] remove dependency upon config * [sbt] Add app module Make cli module actually cli, by moving CliArgs parser into it and Main and Program into app. * add app that depends on cli and thorp-lib * move non-cli specific to app * make cli depend on config * make cli not depend on thorp-lib * [sbt] make module dependencies more explicit * make app depend on storage-aws * make cli depend on filesystem's tests * make thorp-lib depend on core * make thorp-lib not depend on storage-aws * make storage-aws not depend on core's tests * make storage-aws depend on storage * make storage-aws depend on filesystem and its tests * make storage-aws depend on console * make storage-aws depend on core * make core depend on filesystem and its tests * make filesystem depend on domain and its tests * [sbt] merge thorp-lib with core as lib * [sbt] add zio streams * [lib] Add EIPTest * [sbt] Allow NonUnitStatements * [lib] EIPTest Message Channel rewritten using ZIO Stream * [sbt] Add eip-zip 0.2.0 as dependency in lib * Remove file counter and total upload size progress Simplifying UnversionedMirrorArchive so we can create it before we know what actions are needed. * Fetch Remote Data before preparing any plans * [domain] RemoteObjects only holds a single RemoteKey per Hash Having multiple keys for a hash is redundant. They are only used to create copy commands, and only one source remote key is needed for that. * [lib] Add a State trait * [lib] Add FileScanner * Add FileSystem.length(File) * Add Clock to the Environment * [domain] Sources update format * [domain] Asking for a path that isn't in any Sources is fatal There should never be any situation where are path not within a Source is supplied. If there is, then something is badly wrong. * [lib] Add test on use of zio.Ref * [uishell] Add stub module * [sbt] Upgrade eip-zio from 0.2.0 to 0.3.0 * [uishell] Add UIEvent stub * [uishell] Add UIShell stub * [sbt] Add eip-zio dependencies to app module * [app] Wrap existing execution in simple point to point channel * [uishell] Add UIEvent.ShowValidConfig * [app] Remember to end the channel to allow prog to exit * [app] purify environment for showValidConfig * [app] Create type alias for pure effect free channel ref * [app] Program refactoring * [uishell] Add UIEvent.RemoteDataFetched * [domain] Move Counters from lib * [uishell] Add UIEvent.ShowSummary * [lib] Add stub for PushLocalChanges * [lib] Clean up FileScanner Environment types * [lib] End channel after scanning files * [lib] PushLocalChanges uses FileScanner Scans files and sends them to a dummy receiver. * [uishell] Add UIEvent.FileFound * [lib] rename PushLocalChanges.apply as LocalFielSystem.scanCopyUpload * [lib] FileScanner return LocalFile * [domain] add length to LocalFile * [domain] Add interogation queries to RemoteObjects * [domain] Remove RemoteObject.keyForHashes * [domain] RemoteObjects.remoteHasHash return the key and the hash * [lib] LocalFileSystem.scanCopyUpload create Actions * [domain] Move Action from lib * [uishell] Log actions * [lib] FileScanner respects Filters * [lib] Create remoteKey for files correctly * [lib] LocalFileSystem refactoring * [lib] ThorpArchive.update doesn't need Console * [uishell] Don't log choosen Action * [uishell] Add UIEvent.ActionFinished * [lib] LocalFileSystem refactoring * [lib] Switch to using LocalFileSystem to do Copy/Upload Todo or Broken: - [ ] Delete actions don't happen - [ ] Counters in summary are all zeros * [lib] LocalFileStream display summary counters correctly * [app] Restore ability to delete remote files * [lib] LocalFileSystem deletes remote when local does NOT exist * [filesystem] move hasLocalFile to FileSystem * [filesystem] fix detection of local files from a RemoteKey The configured Prefix wasn't being taken into account, meaning that the expected local file for a RemoteKey was wrong. * [filesystem] fix broken FileSystem test * [domain] fix RemoteKey test * [sbt] Upgrade eip-zio to 0.3.1 for zio-stream 1.0.0-RC12-1 compatibility * [app] Program refactorting * [lib] Remove unused class * [lib] Remove test * [uishell] Refactor large method --- .../main/scala/net/kemitix/thorp}/Main.scala | 6 +- .../scala/net/kemitix/thorp/Program.scala | 111 ++++++++ build.sbt | 80 ++++-- .../net/kemitix/thorp/cli}/CliArgs.scala | 3 +- .../scala/net/kemitix/thorp/cli/Program.scala | 46 ---- .../net/kemitix/thorp/cli}/CliArgsTest.scala | 4 +- .../net/kemitix/thorp/console/Console.scala | 7 +- .../kemitix/thorp/console/ConsoleOut.scala | 7 +- .../net/kemitix/thorp/core/CoreTypes.scala | 15 - .../thorp/core/S3MetaDataEnricher.scala | 40 --- .../net/kemitix/thorp/core/SyncLogging.scala | 58 ---- .../net/kemitix/thorp/core/ThorpArchive.scala | 38 --- .../thorp/core/UnversionedMirrorArchive.scala | 52 ---- .../net/kemitix/thorp/domain}/Action.scala | 4 +- .../net/kemitix/thorp/domain}/Counters.scala | 4 +- .../net/kemitix/thorp/domain/LocalFile.scala | 3 +- .../thorp/domain/MatchedMetadata.scala | 2 +- .../net/kemitix/thorp/domain/RemoteKey.scala | 23 +- .../kemitix/thorp/domain/RemoteObjects.scala | 31 ++- .../net/kemitix/thorp/domain/Sources.scala | 21 +- .../thorp/domain/UploadEventListener.scala | 5 +- .../thorp/domain/UploadEventLogger.scala | 12 +- .../kemitix/thorp/domain/RemoteKeyTest.scala | 58 ++++ .../kemitix/thorp/filesystem/FileSystem.scala | 76 +++++- .../kemitix/thorp/filesystem}/Hasher.scala | 3 +- .../thorp/filesystem}/MD5HashGenerator.scala | 5 +- .../net/kemitix/thorp/filesystem}/big-file | Bin .../thorp/filesystem}/upload/root-file | 0 .../thorp/filesystem/FileSystemTest.scala | 42 +++ .../filesystem}/MD5HashGeneratorTest.scala | 4 +- .../kemitix/thorp/filesystem}/Resource.scala | 2 +- .../kemitix/thorp/lib}/ActionGenerator.scala | 6 +- .../net/kemitix/thorp/lib/CoreTypes.scala | 21 ++ .../net/kemitix/thorp/lib}/EventQueue.scala | 2 +- .../net/kemitix/thorp/lib/FileScanner.scala | 88 ++++++ .../net/kemitix/thorp/lib}/Filters.scala | 2 +- .../net/kemitix/thorp/lib}/KeyGenerator.scala | 2 +- .../kemitix/thorp/lib}/LocalFileStream.scala | 7 +- .../kemitix/thorp/lib/LocalFileSystem.scala | 256 ++++++++++++++++++ .../thorp/lib}/LocalFileValidator.scala | 4 +- .../net/kemitix/thorp/lib}/LocalFiles.scala | 2 +- .../net/kemitix/thorp/lib}/PlanBuilder.scala | 33 +-- .../net/kemitix/thorp/lib}/PlanExecutor.scala | 6 +- .../scala/net/kemitix/thorp/lib}/Remote.scala | 2 +- .../thorp/lib/S3MetaDataEnricher.scala | 50 ++++ .../net/kemitix/thorp/lib}/SequencePlan.scala | 5 +- .../kemitix/thorp/lib}/SequencedAction.scala | 4 +- .../net/kemitix/thorp/lib/SyncLogging.scala | 18 ++ .../net/kemitix/thorp/lib}/SyncPlan.scala | 4 +- .../net/kemitix/thorp/lib/ThorpArchive.scala | 47 ++++ .../thorp/lib/UnversionedMirrorArchive.scala | 48 ++++ .../net/kemitix/thorp/lib}/File-6964 | Bin .../net/kemitix/thorp/lib}/empty-file | 0 .../net/kemitix/thorp/lib}/invalid-config | 0 .../net/kemitix/thorp/lib}/simple-config | 0 .../net/kemitix/thorp/lib}/small-file | Bin .../net/kemitix/thorp/lib/upload/root-file | 1 + .../thorp/lib}/upload/subdir/leaf-file | 0 .../thorp/lib}/ActionGeneratorSuite.scala | 16 +- .../thorp/lib}/DummyStorageService.scala | 7 +- .../scala/net/kemitix/thorp/lib/EIPTest.scala | 50 ++++ .../net/kemitix/thorp/lib}/FiltersSuite.scala | 2 +- .../thorp/lib}/KeyGeneratorSuite.scala | 4 +- .../thorp/lib}/LocalFileStreamSuite.scala | 10 +- .../lib}/MatchedMetadataEnricherSuite.scala | 63 +++-- .../kemitix/thorp/lib}/PlanBuilderTest.scala | 34 +-- .../kemitix/thorp/lib}/PlanExecutorTest.scala | 10 +- .../kemitix/thorp/lib}/SequencePlanTest.scala | 16 +- modules.dot | 26 ++ .../kemitix/thorp/storage/aws/Lister.scala | 15 +- .../thorp/storage/aws/S3ObjectsByHash.scala | 13 +- .../kemitix/thorp/storage/aws/S3Storage.scala | 12 +- .../kemitix/thorp/storage/aws/Uploader.scala | 19 +- .../storage/aws/hasher/ETagGenerator.scala | 3 +- .../thorp/storage/aws/hasher/S3Hasher.scala | 7 +- .../aws/AmazonS3ClientTestFixture.scala | 10 +- .../thorp/storage/aws/ListerTest.scala | 19 +- .../storage/aws/S3ObjectsByHashSuite.scala | 4 +- .../storage/aws/StorageServiceSuite.scala | 16 +- .../thorp/storage/aws/UploaderTest.scala | 7 +- .../aws/hasher/ETagGeneratorTest.scala | 4 +- .../net/kemitix/thorp/storage}/Storage.scala | 19 +- .../kemitix/throp/uishell/ProgressEvent.scala | 22 ++ .../net/kemitix/throp/uishell/UIEvent.scala | 37 +++ .../net/kemitix/throp/uishell/UIShell.scala | 96 +++++++ 85 files changed, 1357 insertions(+), 554 deletions(-) rename {cli/src/main/scala/net/kemitix/thorp/cli => app/src/main/scala/net/kemitix/thorp}/Main.scala (80%) create mode 100644 app/src/main/scala/net/kemitix/thorp/Program.scala rename {config/src/main/scala/net/kemitix/thorp/config => cli/src/main/scala/net/kemitix/thorp/cli}/CliArgs.scala (95%) delete mode 100644 cli/src/main/scala/net/kemitix/thorp/cli/Program.scala rename {config/src/test/scala/net/kemitix/thorp/config => cli/src/test/scala/net/kemitix/thorp/cli}/CliArgsTest.scala (94%) delete mode 100644 core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala delete mode 100644 core/src/main/scala/net/kemitix/thorp/core/S3MetaDataEnricher.scala delete mode 100644 core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala delete mode 100644 core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala delete mode 100644 core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala rename {core/src/main/scala/net/kemitix/thorp/core => domain/src/main/scala/net/kemitix/thorp/domain}/Action.scala (85%) rename {core/src/main/scala/net/kemitix/thorp/core => domain/src/main/scala/net/kemitix/thorp/domain}/Counters.scala (89%) rename {core/src/main/scala/net/kemitix/thorp/core/hasher => filesystem/src/main/scala/net/kemitix/thorp/filesystem}/Hasher.scala (97%) rename {core/src/main/scala/net/kemitix/thorp/core/hasher => filesystem/src/main/scala/net/kemitix/thorp/filesystem}/MD5HashGenerator.scala (95%) rename {core/src/test/resources/net/kemitix/thorp/core => filesystem/src/test/resources/net/kemitix/thorp/filesystem}/big-file (100%) rename {core/src/test/resources/net/kemitix/thorp/core => filesystem/src/test/resources/net/kemitix/thorp/filesystem}/upload/root-file (100%) create mode 100644 filesystem/src/test/scala/net/kemitix/thorp/filesystem/FileSystemTest.scala rename {core/src/test/scala/net/kemitix/thorp/core/hasher => filesystem/src/test/scala/net/kemitix/thorp/filesystem}/MD5HashGeneratorTest.scala (94%) rename {config/src/main/scala/net/kemitix/thorp/config => filesystem/src/test/scala/net/kemitix/thorp/filesystem}/Resource.scala (80%) rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/ActionGenerator.scala (95%) create mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/CoreTypes.scala rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/EventQueue.scala (82%) create mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/FileScanner.scala rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/Filters.scala (97%) rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/KeyGenerator.scala (92%) rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/LocalFileStream.scala (93%) create mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/LocalFileValidator.scala (94%) rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/LocalFiles.scala (95%) rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/PlanBuilder.scala (75%) rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/PlanExecutor.scala (92%) rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/Remote.scala (96%) create mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/S3MetaDataEnricher.scala rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/SequencePlan.scala (58%) rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/SequencedAction.scala (50%) create mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/SyncLogging.scala rename {core/src/main/scala/net/kemitix/thorp/core => lib/src/main/scala/net/kemitix/thorp/lib}/SyncPlan.scala (78%) create mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala create mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala rename {core/src/test/resources/net/kemitix/thorp/core => lib/src/test/resources/net/kemitix/thorp/lib}/File-6964 (100%) rename {core/src/test/resources/net/kemitix/thorp/core => lib/src/test/resources/net/kemitix/thorp/lib}/empty-file (100%) rename {core/src/test/resources/net/kemitix/thorp/core => lib/src/test/resources/net/kemitix/thorp/lib}/invalid-config (100%) rename {core/src/test/resources/net/kemitix/thorp/core => lib/src/test/resources/net/kemitix/thorp/lib}/simple-config (100%) rename {core/src/test/resources/net/kemitix/thorp/core => lib/src/test/resources/net/kemitix/thorp/lib}/small-file (100%) create mode 100644 lib/src/test/resources/net/kemitix/thorp/lib/upload/root-file rename {core/src/test/resources/net/kemitix/thorp/core => lib/src/test/resources/net/kemitix/thorp/lib}/upload/subdir/leaf-file (100%) rename {core/src/test/scala/net/kemitix/thorp/core => lib/src/test/scala/net/kemitix/thorp/lib}/ActionGeneratorSuite.scala (93%) rename {core/src/test/scala/net/kemitix/thorp/core => lib/src/test/scala/net/kemitix/thorp/lib}/DummyStorageService.scala (88%) create mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/EIPTest.scala rename {core/src/test/scala/net/kemitix/thorp/core => lib/src/test/scala/net/kemitix/thorp/lib}/FiltersSuite.scala (99%) rename {core/src/test/scala/net/kemitix/thorp/core => lib/src/test/scala/net/kemitix/thorp/lib}/KeyGeneratorSuite.scala (94%) rename {core/src/test/scala/net/kemitix/thorp/core => lib/src/test/scala/net/kemitix/thorp/lib}/LocalFileStreamSuite.scala (92%) rename {core/src/test/scala/net/kemitix/thorp/core => lib/src/test/scala/net/kemitix/thorp/lib}/MatchedMetadataEnricherSuite.scala (85%) rename {core/src/test/scala/net/kemitix/thorp/core => lib/src/test/scala/net/kemitix/thorp/lib}/PlanBuilderTest.scala (93%) rename {core/src/test/scala/net/kemitix/thorp/core => lib/src/test/scala/net/kemitix/thorp/lib}/PlanExecutorTest.scala (87%) rename {core/src/test/scala/net/kemitix/thorp/core => lib/src/test/scala/net/kemitix/thorp/lib}/SequencePlanTest.scala (82%) create mode 100644 modules.dot rename {storage-api/src/main/scala/net/kemitix/thorp/storage/api => storage/src/main/scala/net/kemitix/thorp/storage}/Storage.scala (82%) create mode 100644 uishell/src/main/scala/net/kemitix/throp/uishell/ProgressEvent.scala create mode 100644 uishell/src/main/scala/net/kemitix/throp/uishell/UIEvent.scala create mode 100644 uishell/src/main/scala/net/kemitix/throp/uishell/UIShell.scala diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala b/app/src/main/scala/net/kemitix/thorp/Main.scala similarity index 80% rename from cli/src/main/scala/net/kemitix/thorp/cli/Main.scala rename to app/src/main/scala/net/kemitix/thorp/Main.scala index fdfa986..6831294 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala +++ b/app/src/main/scala/net/kemitix/thorp/Main.scala @@ -1,10 +1,12 @@ -package net.kemitix.thorp.cli +package net.kemitix.thorp import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console import net.kemitix.thorp.filesystem.FileSystem +import net.kemitix.thorp.lib.FileScanner import net.kemitix.thorp.storage.aws.S3Storage import net.kemitix.thorp.storage.aws.hasher.S3Hasher +import zio.clock.Clock import zio.{App, ZIO} object Main extends App { @@ -12,9 +14,11 @@ object Main extends App { object LiveThorpApp extends S3Storage.Live with Console.Live + with Clock.Live with Config.Live with FileSystem.Live with S3Hasher.Live + with FileScanner.Live override def run(args: List[String]): ZIO[Environment, Nothing, Int] = Program diff --git a/app/src/main/scala/net/kemitix/thorp/Program.scala b/app/src/main/scala/net/kemitix/thorp/Program.scala new file mode 100644 index 0000000..b9e6d60 --- /dev/null +++ b/app/src/main/scala/net/kemitix/thorp/Program.scala @@ -0,0 +1,111 @@ +package net.kemitix.thorp + +import net.kemitix.eip.zio.MessageChannel.UChannel +import net.kemitix.eip.zio.{Message, MessageChannel} +import net.kemitix.thorp.cli.CliArgs +import net.kemitix.thorp.config._ +import net.kemitix.thorp.console._ +import net.kemitix.thorp.domain.{Counters, StorageQueueEvent} +import net.kemitix.thorp.domain.StorageQueueEvent.{ + CopyQueueEvent, + DeleteQueueEvent, + ErrorQueueEvent, + UploadQueueEvent +} +import net.kemitix.thorp.filesystem.{FileSystem, Hasher} +import net.kemitix.thorp.lib.CoreTypes.CoreProgram +import net.kemitix.thorp.lib._ +import net.kemitix.thorp.storage.Storage +import net.kemitix.throp.uishell.{UIEvent, UIShell} +import zio.clock.Clock +import zio.{RIO, UIO, ZIO} + +trait Program { + + lazy val version = s"Thorp v${thorp.BuildInfo.version}" + + def run(args: List[String]): CoreProgram[Unit] = { + for { + cli <- CliArgs.parse(args) + config <- ConfigurationBuilder.buildConfig(cli) + _ <- Config.set(config) + _ <- ZIO.when(showVersion(cli))(Console.putStrLn(version)) + _ <- ZIO.when(!showVersion(cli))(executeWithUI.catchAll(handleErrors)) + } yield () + } + + private def showVersion: ConfigOptions => Boolean = + cli => ConfigQuery.showVersion(cli) + + private def executeWithUI = + for { + uiEventSender <- execute + uiEventReceiver <- UIShell.receiver + _ <- MessageChannel.pointToPoint(uiEventSender)(uiEventReceiver).runDrain + } yield () + + type UIChannel = UChannel[Any, UIEvent] + + private def execute + : ZIO[Any, + Nothing, + MessageChannel.ESender[ + Storage with Config with FileSystem with Hasher with Clock with FileScanner, + Throwable, + UIEvent]] = UIO { uiChannel => + (for { + _ <- showValidConfig(uiChannel) + remoteData <- fetchRemoteData(uiChannel) + archive <- UIO(UnversionedMirrorArchive) + copyUploadEvents <- LocalFileSystem.scanCopyUpload(uiChannel, + remoteData, + archive) + deleteEvents <- LocalFileSystem.scanDelete(uiChannel, remoteData, archive) + _ <- showSummary(uiChannel)(copyUploadEvents ++ deleteEvents) + } yield ()) <* MessageChannel.endChannel(uiChannel) + } + + private def showValidConfig(uiChannel: UIChannel) = + Message.create(UIEvent.ShowValidConfig) >>= MessageChannel.send(uiChannel) + + private def fetchRemoteData(uiChannel: UIChannel) = + for { + bucket <- Config.bucket + prefix <- Config.prefix + objects <- Storage.list(bucket, prefix) + _ <- Message.create(UIEvent.RemoteDataFetched(objects.byKey.size)) >>= MessageChannel + .send(uiChannel) + } yield objects + + private def handleErrors(throwable: Throwable) = + Console.putStrLn("There were errors:") *> logValidationErrors(throwable) + + private def logValidationErrors(throwable: Throwable) = + throwable match { + case ConfigValidationException(errors) => + ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error")) + } + + private def showSummary(uiChannel: UIChannel)( + events: Seq[StorageQueueEvent]): RIO[Clock, Unit] = { + val counters = events.foldLeft(Counters.empty)(countActivities) + Message.create(UIEvent.ShowSummary(counters)) >>= + MessageChannel.send(uiChannel) + } + + private def countActivities: (Counters, StorageQueueEvent) => Counters = + (counters: Counters, s3Action: StorageQueueEvent) => { + val increment: Int => Int = _ + 1 + s3Action match { + case _: UploadQueueEvent => + Counters.uploaded.modify(increment)(counters) + case _: CopyQueueEvent => Counters.copied.modify(increment)(counters) + case _: DeleteQueueEvent => Counters.deleted.modify(increment)(counters) + case _: ErrorQueueEvent => Counters.errors.modify(increment)(counters) + case _ => counters + } + } + +} + +object Program extends Program diff --git a/build.sbt b/build.sbt index 17f3da4..2a41cac 100644 --- a/build.sbt +++ b/build.sbt @@ -24,7 +24,6 @@ val commonSettings = Seq( "-deprecation", "-unchecked", "-language:postfixOps", - "-language:higherKinds", "-language:higherKinds"), wartremoverErrors ++= Warts.unsafe.filterNot(wart => List( Wart.Any, @@ -61,27 +60,26 @@ val awsSdkDependencies = Seq( ) val zioDependencies = Seq( libraryDependencies ++= Seq ( - "dev.zio" %% "zio" % "1.0.0-RC12-1" + "dev.zio" %% "zio" % "1.0.0-RC12-1", + "dev.zio" %% "zio-streams" % "1.0.0-RC12-1" ) ) -// cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> config -> domain -// storage-api -> config -> filesystem +val eipDependencies = Seq( + libraryDependencies ++= Seq( + "net.kemitix" %% "eip-zio" % "0.3.1" + ) +) lazy val thorp = (project in file(".")) .settings(commonSettings) - .aggregate(cli, `thorp-lib`, `storage-aws`, core, `storage-api`, domain) + .aggregate(app, cli, `storage-aws`, lib, `storage`, domain) -lazy val cli = (project in file("cli")) +lazy val app = (project in file("app")) .settings(commonSettings) - .settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main")) + .settings(mainClass in assembly := Some("net.kemitix.thorp.Main")) .settings(applicationSettings) - .settings(testDependencies) - .enablePlugins(BuildInfoPlugin) - .settings( - buildInfoKeys := Seq[BuildInfoKey](name, version), - buildInfoPackage := "thorp" - ) + .settings(eipDependencies) .settings(Seq( assemblyOption in assembly := ( assemblyOption in assembly).value @@ -89,39 +87,62 @@ lazy val cli = (project in file("cli")) Some(defaultShellScript)), assemblyJarName in assembly := "thorp" )) - .dependsOn(`thorp-lib`) - -lazy val `thorp-lib` = (project in file("thorp-lib")) - .settings(commonSettings) - .settings(assemblyJarName in assembly := "thorp-lib.jar") + .dependsOn(cli) + .dependsOn(lib) .dependsOn(`storage-aws`) +lazy val cli = (project in file("cli")) + .settings(commonSettings) + .settings(testDependencies) + .dependsOn(config) + .dependsOn(filesystem % "test->test") + lazy val `storage-aws` = (project in file("storage-aws")) .settings(commonSettings) .settings(assemblyJarName in assembly := "storage-aws.jar") .settings(awsSdkDependencies) .settings(testDependencies) - .dependsOn(core % "compile->compile;test->test") + .dependsOn(storage) + .dependsOn(filesystem % "compile->compile;test->test") + .dependsOn(console) + .dependsOn(lib) -lazy val core = (project in file("core")) +lazy val lib = (project in file("lib")) .settings(commonSettings) - .settings(assemblyJarName in assembly := "core.jar") + .settings(assemblyJarName in assembly := "lib.jar") .settings(testDependencies) - .dependsOn(`storage-api`) - .dependsOn(domain % "compile->compile;test->test") - -lazy val `storage-api` = (project in file("storage-api")) - .settings(commonSettings) - .settings(zioDependencies) - .settings(assemblyJarName in assembly := "storage-api.jar") + .enablePlugins(BuildInfoPlugin) + .settings( + buildInfoKeys := Seq[BuildInfoKey](name, version), + buildInfoPackage := "thorp" + ) + .dependsOn(storage) .dependsOn(console) .dependsOn(config) + .dependsOn(domain % "compile->compile;test->test") + .dependsOn(filesystem % "compile->compile;test->test") + +lazy val storage = (project in file("storage")) + .settings(commonSettings) + .settings(zioDependencies) + .settings(assemblyJarName in assembly := "storage.jar") + .dependsOn(uishell) + .dependsOn(domain) + +lazy val uishell = (project in file("uishell")) + .settings(commonSettings) + .settings(zioDependencies) + .settings(eipDependencies) + .settings(assemblyJarName in assembly := "uishell.jar") + .dependsOn(config) + .dependsOn(console) + .dependsOn(filesystem) lazy val console = (project in file("console")) .settings(commonSettings) .settings(zioDependencies) .settings(assemblyJarName in assembly := "console.jar") - .dependsOn(config) + .dependsOn(domain) lazy val config = (project in file("config")) .settings(commonSettings) @@ -137,6 +158,7 @@ lazy val filesystem = (project in file("filesystem")) .settings(zioDependencies) .settings(testDependencies) .settings(assemblyJarName in assembly := "filesystem.jar") + .dependsOn(domain % "compile->compile;test->test") lazy val domain = (project in file("domain")) .settings(commonSettings) diff --git a/config/src/main/scala/net/kemitix/thorp/config/CliArgs.scala b/cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala similarity index 95% rename from config/src/main/scala/net/kemitix/thorp/config/CliArgs.scala rename to cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala index 973e8eb..cdb35f4 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/CliArgs.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala @@ -1,7 +1,8 @@ -package net.kemitix.thorp.config +package net.kemitix.thorp.cli import java.nio.file.Paths +import net.kemitix.thorp.config.{ConfigOption, ConfigOptions} import scopt.OParser import zio.Task diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala deleted file mode 100644 index 9aedde4..0000000 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ /dev/null @@ -1,46 +0,0 @@ -package net.kemitix.thorp.cli - -import net.kemitix.thorp.config._ -import net.kemitix.thorp.console._ -import net.kemitix.thorp.core.CoreTypes.CoreProgram -import net.kemitix.thorp.core._ -import zio.ZIO - -trait Program { - - lazy val version = s"Thorp v${thorp.BuildInfo.version}" - - def run(args: List[String]): CoreProgram[Unit] = { - for { - cli <- CliArgs.parse(args) - config <- ConfigurationBuilder.buildConfig(cli) - _ <- Config.set(config) - _ <- ZIO.when(showVersion(cli))(Console.putStrLn(version)) - _ <- ZIO.when(!showVersion(cli))(execute.catchAll(handleErrors)) - } yield () - } - - private def showVersion: ConfigOptions => Boolean = - cli => ConfigQuery.showVersion(cli) - - private def execute = - for { - _ <- SyncLogging.logRunStart - syncPlan <- PlanBuilder.createPlan - archive <- UnversionedMirrorArchive.default(syncPlan.syncTotals) - events <- PlanExecutor.executePlan(archive, syncPlan) - _ <- SyncLogging.logRunFinished(events) - } yield () - - private def handleErrors(throwable: Throwable) = - Console.putStrLn("There were errors:") *> logValidationErrors(throwable) - - private def logValidationErrors(throwable: Throwable) = - throwable match { - case ConfigValidationException(errors) => - ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error")) - } - -} - -object Program extends Program diff --git a/config/src/test/scala/net/kemitix/thorp/config/CliArgsTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala similarity index 94% rename from config/src/test/scala/net/kemitix/thorp/config/CliArgsTest.scala rename to cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala index 9eb26ad..e271615 100644 --- a/config/src/test/scala/net/kemitix/thorp/config/CliArgsTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala @@ -1,8 +1,10 @@ -package net.kemitix.thorp.config +package net.kemitix.thorp.cli import java.nio.file.Paths import net.kemitix.thorp.config.ConfigOption.Debug +import net.kemitix.thorp.config.{ConfigOptions, ConfigQuery} +import net.kemitix.thorp.filesystem.Resource import org.scalatest.FunSpec import zio.DefaultRuntime diff --git a/console/src/main/scala/net/kemitix/thorp/console/Console.scala b/console/src/main/scala/net/kemitix/thorp/console/Console.scala index 3d588bd..6dcede4 100644 --- a/console/src/main/scala/net/kemitix/thorp/console/Console.scala +++ b/console/src/main/scala/net/kemitix/thorp/console/Console.scala @@ -3,7 +3,6 @@ package net.kemitix.thorp.console import java.io.PrintStream import java.util.concurrent.atomic.AtomicReference -import net.kemitix.thorp.config.Config import zio.{UIO, ZIO} import scala.{Console => SConsole} @@ -61,8 +60,8 @@ object Console { final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] = ZIO.accessM(_.console putMessageLn line) - final def putMessageLnB( - line: ConsoleOut.WithBatchMode): ZIO[Console with Config, Nothing, Unit] = - ZIO.accessM(line() >>= _.console.putStrLn) + final def putMessageLnB(line: ConsoleOut.WithBatchMode, + batchMode: Boolean): ZIO[Console, Nothing, Unit] = + ZIO.accessM(line(batchMode) >>= _.console.putStrLn) } diff --git a/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala b/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala index 4a9c9d3..0e9b431 100644 --- a/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala +++ b/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala @@ -1,10 +1,9 @@ package net.kemitix.thorp.console -import net.kemitix.thorp.config.Config import net.kemitix.thorp.domain.StorageQueueEvent.Action import net.kemitix.thorp.domain.Terminal._ import net.kemitix.thorp.domain.{Bucket, RemoteKey, Sources} -import zio.{UIO, ZIO} +import zio.UIO import scala.io.AnsiColor._ @@ -17,8 +16,8 @@ object ConsoleOut { sealed trait WithBatchMode { def en: String def enBatch: String - def apply(): ZIO[Config, Nothing, String] = - Config.batchMode >>= selectLine + def apply(batchMode: Boolean): UIO[String] = + selectLine(batchMode) private def selectLine(batchMode: Boolean) = if (batchMode) UIO(enBatch) else UIO(en) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala b/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala deleted file mode 100644 index 7da3ff9..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/CoreTypes.scala +++ /dev/null @@ -1,15 +0,0 @@ -package net.kemitix.thorp.core - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console.Console -import net.kemitix.thorp.core.hasher.Hasher -import net.kemitix.thorp.filesystem.FileSystem -import net.kemitix.thorp.storage.api.Storage -import zio.ZIO - -object CoreTypes { - - type CoreEnv = Storage with Console with Config with FileSystem with Hasher - type CoreProgram[A] = ZIO[CoreEnv, Throwable, A] - -} diff --git a/core/src/main/scala/net/kemitix/thorp/core/S3MetaDataEnricher.scala b/core/src/main/scala/net/kemitix/thorp/core/S3MetaDataEnricher.scala deleted file mode 100644 index c89bf57..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/S3MetaDataEnricher.scala +++ /dev/null @@ -1,40 +0,0 @@ -package net.kemitix.thorp.core - -import net.kemitix.thorp.domain._ - -object S3MetaDataEnricher { - - def getMetadata( - localFile: LocalFile, - remoteObjects: RemoteObjects - ): MatchedMetadata = { - val (keyMatches, hashMatches) = getS3Status(localFile, remoteObjects) - MatchedMetadata( - localFile, - matchByKey = keyMatches.map { hash => - RemoteMetaData(localFile.remoteKey, hash) - }, - matchByHash = hashMatches.map { - case (key, hash) => RemoteMetaData(key, hash) - } - ) - } - - def getS3Status( - localFile: LocalFile, - remoteObjects: RemoteObjects - ): (Option[MD5Hash], Set[(RemoteKey, MD5Hash)]) = { - val matchingByKey = remoteObjects.byKey.get(localFile.remoteKey) - val matchingByHash = localFile.hashes - .map { - case (_, md5Hash) => - remoteObjects.byHash - .getOrElse(md5Hash, Set()) - .map(key => (key, md5Hash)) - } - .flatten - .toSet - (matchingByKey, matchingByHash) - } - -} diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala deleted file mode 100644 index 2c5071e..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ /dev/null @@ -1,58 +0,0 @@ -package net.kemitix.thorp.core - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain.StorageQueueEvent.{ - CopyQueueEvent, - DeleteQueueEvent, - ErrorQueueEvent, - UploadQueueEvent -} -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen -import zio.ZIO - -trait SyncLogging { - - def logRunStart: ZIO[Console with Config, Nothing, Unit] = - for { - bucket <- Config.bucket - prefix <- Config.prefix - sources <- Config.sources - _ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources)) - } yield () - - def logFileScan: ZIO[Config with Console, Nothing, Unit] = - for { - sources <- Config.sources - _ <- Console.putStrLn( - s"Scanning local files: ${sources.paths.mkString(", ")}...") - } yield () - - def logRunFinished( - actions: Seq[StorageQueueEvent] - ): ZIO[Console, Nothing, Unit] = { - val counters = actions.foldLeft(Counters.empty)(countActivities) - Console.putStrLn(eraseToEndOfScreen) *> - Console.putStrLn(s"Uploaded ${counters.uploaded} files") *> - Console.putStrLn(s"Copied ${counters.copied} files") *> - Console.putStrLn(s"Deleted ${counters.deleted} files") *> - Console.putStrLn(s"Errors ${counters.errors}") - } - - private def countActivities: (Counters, StorageQueueEvent) => Counters = - (counters: Counters, s3Action: StorageQueueEvent) => { - import Counters._ - val increment: Int => Int = _ + 1 - s3Action match { - case _: UploadQueueEvent => uploaded.modify(increment)(counters) - case _: CopyQueueEvent => copied.modify(increment)(counters) - case _: DeleteQueueEvent => deleted.modify(increment)(counters) - case _: ErrorQueueEvent => errors.modify(increment)(counters) - case _ => counters - } - } - -} - -object SyncLogging extends SyncLogging diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala deleted file mode 100644 index 39fea9d..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ /dev/null @@ -1,38 +0,0 @@ -package net.kemitix.thorp.core - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console.ConsoleOut.{ - CopyComplete, - DeleteComplete, - ErrorQueueEventOccurred, - UploadComplete -} -import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain.StorageQueueEvent -import net.kemitix.thorp.domain.StorageQueueEvent._ -import net.kemitix.thorp.storage.api.Storage -import zio.{RIO, ZIO} - -trait ThorpArchive { - - def update( - sequencedAction: SequencedAction, - totalBytesSoFar: Long - ): RIO[Storage with Console with Config, StorageQueueEvent] - - def logEvent( - event: StorageQueueEvent): RIO[Console with Config, StorageQueueEvent] = - event match { - case UploadQueueEvent(remoteKey, _) => - ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey)) - case CopyQueueEvent(sourceKey, targetKey) => - ZIO(event) <* Console.putMessageLnB(CopyComplete(sourceKey, targetKey)) - case DeleteQueueEvent(remoteKey) => - ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey)) - case ErrorQueueEvent(action, _, e) => - ZIO(event) <* Console.putMessageLnB(ErrorQueueEventOccurred(action, e)) - case DoNothingQueueEvent(_) => ZIO(event) - case ShutdownQueueEvent() => ZIO(event) - } - -} diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala deleted file mode 100644 index 60010f9..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ /dev/null @@ -1,52 +0,0 @@ -package net.kemitix.thorp.core - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console._ -import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.Storage -import zio.{Task, RIO} - -final case class UnversionedMirrorArchive(syncTotals: SyncTotals) - extends ThorpArchive { - - override def update( - sequencedAction: SequencedAction, - totalBytesSoFar: Long - ): RIO[Storage with Console with Config, StorageQueueEvent] = - sequencedAction match { - case SequencedAction(ToUpload(bucket, localFile, _), index) => - doUpload(index, totalBytesSoFar, bucket, localFile) >>= logEvent - case SequencedAction(ToCopy(bucket, sourceKey, hash, targetKey, _), _) => - Storage.copy(bucket, sourceKey, hash, targetKey) >>= logEvent - case SequencedAction(ToDelete(bucket, remoteKey, _), _) => - Storage.delete(bucket, remoteKey) >>= logEvent - case SequencedAction(DoNothing(_, remoteKey, _), _) => - Task(DoNothingQueueEvent(remoteKey)) - } - - private def doUpload( - index: Int, - totalBytesSoFar: Long, - bucket: Bucket, - localFile: LocalFile - ) = - Storage.upload( - localFile, - bucket, - UploadEventListener.Settings( - localFile, - index, - syncTotals, - totalBytesSoFar - ) - ) -} - -object UnversionedMirrorArchive { - def default(syncTotals: SyncTotals): Task[ThorpArchive] = - Task { - new UnversionedMirrorArchive(syncTotals) - } -} diff --git a/core/src/main/scala/net/kemitix/thorp/core/Action.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Action.scala similarity index 85% rename from core/src/main/scala/net/kemitix/thorp/core/Action.scala rename to domain/src/main/scala/net/kemitix/thorp/domain/Action.scala index fa75c66..aa30550 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Action.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Action.scala @@ -1,6 +1,4 @@ -package net.kemitix.thorp.core - -import net.kemitix.thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} +package net.kemitix.thorp.domain sealed trait Action { def bucket: Bucket diff --git a/core/src/main/scala/net/kemitix/thorp/core/Counters.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Counters.scala similarity index 89% rename from core/src/main/scala/net/kemitix/thorp/core/Counters.scala rename to domain/src/main/scala/net/kemitix/thorp/domain/Counters.scala index d1f5654..a90b8bc 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Counters.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Counters.scala @@ -1,6 +1,4 @@ -package net.kemitix.thorp.core - -import net.kemitix.thorp.domain.SimpleLens +package net.kemitix.thorp.domain final case class Counters( uploaded: Int, diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala b/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala index 783f845..dad6864 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala @@ -10,7 +10,8 @@ final case class LocalFile private ( file: File, source: File, hashes: Map[HashType, MD5Hash], - remoteKey: RemoteKey + remoteKey: RemoteKey, + length: Long ) object LocalFile { diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/MatchedMetadata.scala b/domain/src/main/scala/net/kemitix/thorp/domain/MatchedMetadata.scala index da5845f..bcbfd46 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/MatchedMetadata.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/MatchedMetadata.scala @@ -3,6 +3,6 @@ package net.kemitix.thorp.domain // For the LocalFile, the set of matching S3 objects with the same MD5Hash, and any S3 object with the same remote key final case class MatchedMetadata( localFile: LocalFile, - matchByHash: Set[RemoteMetaData], //TODO Can this be an Option? + matchByHash: Option[RemoteMetaData], matchByKey: Option[RemoteMetaData] ) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala b/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala index 0d99bb6..291c29c 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala @@ -2,23 +2,34 @@ package net.kemitix.thorp.domain import java.io.File import java.nio.file.{Path, Paths} + import Implicits._ +import zio.UIO final case class RemoteKey(key: String) object RemoteKey { + val key: SimpleLens[RemoteKey, String] = SimpleLens[RemoteKey, String](_.key, b => a => b.copy(key = a)) + def asFile(source: Path, prefix: RemoteKey)( remoteKey: RemoteKey): Option[File] = - if (remoteKey.key.length === 0) None + if (remoteKey.key.length === 0 || !remoteKey.key.startsWith(prefix.key)) + None else Some(source.resolve(RemoteKey.relativeTo(prefix)(remoteKey)).toFile) - def relativeTo(prefix: RemoteKey)(remoteKey: RemoteKey): Path = { - prefix match { - case RemoteKey("") => Paths.get(remoteKey.key) - case _ => Paths.get(prefix.key).relativize(Paths.get(remoteKey.key)) - } + + def relativeTo(prefix: RemoteKey)(remoteKey: RemoteKey): Path = prefix match { + case RemoteKey("") => Paths.get(remoteKey.key) + case _ => Paths.get(prefix.key).relativize(Paths.get(remoteKey.key)) } + def resolve(path: String)(remoteKey: RemoteKey): RemoteKey = RemoteKey(List(remoteKey.key, path).filterNot(_.isEmpty).mkString("/")) + + def fromSourcePath(source: Path, path: Path): RemoteKey = + RemoteKey(source.relativize(path).toString) + + def from(source: Path, prefix: RemoteKey, file: File): UIO[RemoteKey] = + UIO(RemoteKey.resolve(source.relativize(file.toPath).toString)(prefix)) } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteObjects.scala b/domain/src/main/scala/net/kemitix/thorp/domain/RemoteObjects.scala index abe4dc0..b8b590c 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteObjects.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/RemoteObjects.scala @@ -1,18 +1,45 @@ package net.kemitix.thorp.domain +import zio.UIO + import scala.collection.MapView /** * A list of objects and their MD5 hash values. */ final case class RemoteObjects private ( - byHash: MapView[MD5Hash, Set[RemoteKey]], + byHash: MapView[MD5Hash, RemoteKey], byKey: MapView[RemoteKey, MD5Hash] ) object RemoteObjects { + val empty: RemoteObjects = RemoteObjects(MapView.empty, MapView.empty) - def create(byHash: MapView[MD5Hash, Set[RemoteKey]], + + def create(byHash: MapView[MD5Hash, RemoteKey], byKey: MapView[RemoteKey, MD5Hash]): RemoteObjects = RemoteObjects(byHash, byKey) + + def remoteKeyExists( + remoteObjects: RemoteObjects, + remoteKey: RemoteKey + ): UIO[Boolean] = UIO(remoteObjects.byKey.contains(remoteKey)) + + def remoteMatchesLocalFile( + remoteObjects: RemoteObjects, + localFile: LocalFile + ): UIO[Boolean] = + UIO( + remoteObjects.byKey + .get(localFile.remoteKey) + .exists(LocalFile.matchesHash(localFile))) + + def remoteHasHash( + remoteObjects: RemoteObjects, + hashes: Map[HashType, MD5Hash] + ): UIO[Option[(RemoteKey, MD5Hash)]] = + UIO(remoteObjects.byHash.collectFirst { + case (hash, key) if (hashes.values.exists(h => h == hash)) => (key, hash) + }) + } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala index d810792..668eca2 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala @@ -2,7 +2,7 @@ package net.kemitix.thorp.domain import java.nio.file.Path -import zio.{Task, ZIO} +import zio.{UIO, ZIO} /** * The paths to synchronise with target. @@ -14,13 +14,16 @@ import zio.{Task, ZIO} * * A path should only occur once in paths. */ -final case class Sources( - paths: List[Path] -) { +final case class Sources(paths: List[Path]) { def +(path: Path): Sources = this ++ List(path) def ++(otherPaths: List[Path]): Sources = - Sources(otherPaths.foldLeft(paths)((acc, path) => - if (acc contains path) acc else acc ++ List(path))) + Sources( + otherPaths.foldLeft(paths)( + (acc, path) => + if (acc contains path) acc + else acc ++ List(path) + ) + ) } object Sources { @@ -29,8 +32,10 @@ object Sources { /** * Returns the source path for the given path. */ - def forPath(path: Path)(sources: Sources): Task[Path] = + def forPath(path: Path)(sources: Sources): UIO[Path] = ZIO .fromOption(sources.paths.find(s => path.startsWith(s))) - .mapError(_ => new Exception("Path is not within any known source")) + .orDieWith { _ => + new RuntimeException("Path is not within any known source") + } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala index e532dda..7a9981f 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala @@ -10,8 +10,8 @@ object UploadEventListener { final case class Settings( localFile: LocalFile, index: Int, - syncTotals: SyncTotals, - totalBytesSoFar: Long + totalBytesSoFar: Long, + batchMode: Boolean ) def listener(settings: Settings): UploadEvent => Unit = { @@ -24,7 +24,6 @@ object UploadEventListener { RequestCycle(settings.localFile, bytesTransferred.addAndGet(e.transferred), settings.index, - settings.syncTotals, settings.totalBytesSoFar)) case _ => () } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala index 959fa5a..6cefc33 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala @@ -10,14 +10,13 @@ object UploadEventLogger { localFile: LocalFile, bytesTransferred: Long, index: Int, - syncTotals: SyncTotals, totalBytesSoFar: Long ) def apply(requestCycle: RequestCycle): Unit = { val remoteKey = requestCycle.localFile.remoteKey.key val fileLength = requestCycle.localFile.file.length - val statusHeight = 7 + val statusHeight = 3 if (requestCycle.bytesTransferred < fileLength) println( s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" + @@ -25,15 +24,6 @@ object UploadEventLogger { SizeTranslation.sizeInEnglish, requestCycle.bytesTransferred, fileLength) + - statusWithBar("Files", - l => l.toString, - requestCycle.index, - requestCycle.syncTotals.count) + - statusWithBar( - " Size", - SizeTranslation.sizeInEnglish, - requestCycle.bytesTransferred + requestCycle.totalBytesSoFar, - requestCycle.syncTotals.totalSizeBytes) + s"${Terminal.cursorPrevLine(statusHeight)}") } diff --git a/domain/src/test/scala/net/kemitix/thorp/domain/RemoteKeyTest.scala b/domain/src/test/scala/net/kemitix/thorp/domain/RemoteKeyTest.scala index 6bbc0f3..86342ee 100644 --- a/domain/src/test/scala/net/kemitix/thorp/domain/RemoteKeyTest.scala +++ b/domain/src/test/scala/net/kemitix/thorp/domain/RemoteKeyTest.scala @@ -4,6 +4,7 @@ import java.io.File import java.nio.file.Paths import org.scalatest.FreeSpec +import zio.DefaultRuntime class RemoteKeyTest extends FreeSpec { @@ -67,6 +68,63 @@ class RemoteKeyTest extends FreeSpec { assertResult(expected)(result) } } + "fromSourcePath" - { + "when path in source" in { + val source = Paths.get("/source") + val path = source.resolve("/source/child") + val expected = RemoteKey("child") + val result = RemoteKey.fromSourcePath(source, path) + assertResult(expected)(result) + } + } + "from source, prefix, file" - { + "when file in source" in { + val source = Paths.get("/source") + val prefix = RemoteKey("prefix") + val file = new File("/source/dir/filename") + val expected = RemoteKey("prefix/dir/filename") + val program = RemoteKey.from(source, prefix, file) + val result = new DefaultRuntime {}.unsafeRunSync(program).toEither + assertResult(Right(expected))(result) + } + } + } + "asFile" - { + "remoteKey is empty" in { + val source = Paths.get("/source") + val prefix = RemoteKey("prefix") + val remoteKey = RemoteKey("") + + val expected = None + + val result = RemoteKey.asFile(source, prefix)(remoteKey) + + assertResult(expected)(result) + } + "remoteKey is not empty" - { + "remoteKey is within prefix" in { + val source = Paths.get("/source") + val prefix = RemoteKey("prefix") + val remoteKey = RemoteKey("prefix/key") + + val expected = Some(Paths.get("/source/key").toFile) + + val result = RemoteKey.asFile(source, prefix)(remoteKey) + + assertResult(expected)(result) + } + "remoteKey is outwith prefix" in { + val source = Paths.get("/source") + val prefix = RemoteKey("prefix") + val remoteKey = RemoteKey("elsewhere/key") + + val expected = None + + val result = RemoteKey.asFile(source, prefix)(remoteKey) + + assertResult(expected)(result) + } + } } } diff --git a/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala index b09c535..7ddc96a 100644 --- a/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala +++ b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala @@ -4,7 +4,8 @@ import java.io.{File, FileInputStream} import java.nio.file.{Files, Path} import java.util.stream -import zio.{Task, RIO, UIO, ZIO, ZManaged} +import net.kemitix.thorp.domain.{RemoteKey, Sources} +import zio._ import scala.jdk.CollectionConverters._ @@ -13,18 +14,23 @@ trait FileSystem { } object FileSystem { - trait Service { - def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean] + def fileExists(file: File): ZIO[FileSystem, Nothing, Boolean] def openManagedFileInputStream(file: File, offset: Long) : RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]] def fileLines(file: File): RIO[FileSystem, Seq[String]] + def isDirectory(file: File): RIO[FileSystem, Boolean] + def listFiles(path: Path): RIO[FileSystem, Iterable[File]] + def length(file: File): ZIO[FileSystem, Nothing, Long] + def hasLocalFile(sources: Sources, + prefix: RemoteKey, + remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] } trait Live extends FileSystem { override val filesystem: Service = new Service { override def fileExists( file: File - ): RIO[FileSystem, Boolean] = ZIO(file.exists) + ): ZIO[FileSystem, Nothing, Boolean] = UIO(file.exists) override def openManagedFileInputStream(file: File, offset: Long) : RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]] = { @@ -48,18 +54,44 @@ object FileSystem { ZIO.effectTotal(lines.iterator.asScala.toList) acquire.bracketAuto(use) } + + override def isDirectory(file: File): RIO[FileSystem, Boolean] = + Task(file.isDirectory) + + override def listFiles(path: Path): RIO[FileSystem, Iterable[File]] = + Task(path.toFile.listFiles()) + + override def length(file: File): ZIO[FileSystem, Nothing, Long] = + UIO(file.length) + + override def hasLocalFile( + sources: Sources, + prefix: RemoteKey, + remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] = { + ZIO.foldLeft(sources.paths)(false) { (accExists, source) => + RemoteKey + .asFile(source, prefix)(remoteKey) + .map(FileSystem.exists) + .getOrElse(UIO(false)) + .map(_ || accExists) + } + } } } object Live extends Live trait Test extends FileSystem { - val fileExistsResultMap: Task[Map[Path, File]] + val fileExistsResultMap: UIO[Map[Path, File]] val fileLinesResult: Task[List[String]] + val isDirResult: Task[Boolean] + val listFilesResult: RIO[FileSystem, Iterable[File]] + val lengthResult: UIO[Long] val managedFileInputStream: Task[ZManaged[Any, Throwable, FileInputStream]] + val hasLocalFileResult: UIO[Boolean] override val filesystem: Service = new Service { - override def fileExists(file: File): RIO[FileSystem, Boolean] = + override def fileExists(file: File): ZIO[FileSystem, Nothing, Boolean] = fileExistsResultMap.map(m => m.keys.exists(_ equals file.toPath)) override def openManagedFileInputStream(file: File, offset: Long) @@ -68,10 +100,25 @@ object FileSystem { override def fileLines(file: File): RIO[FileSystem, List[String]] = fileLinesResult + + override def isDirectory(file: File): RIO[FileSystem, Boolean] = + isDirResult + + override def listFiles(path: Path): RIO[FileSystem, Iterable[File]] = + listFilesResult + + override def length(file: File): UIO[Long] = + lengthResult + + override def hasLocalFile( + sources: Sources, + prefix: RemoteKey, + remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] = + hasLocalFileResult } } - final def exists(file: File): RIO[FileSystem, Boolean] = + final def exists(file: File): ZIO[FileSystem, Nothing, Boolean] = ZIO.accessM(_.filesystem fileExists file) final def openAtOffset(file: File, offset: Long) @@ -84,4 +131,19 @@ object FileSystem { final def lines(file: File): RIO[FileSystem, Seq[String]] = ZIO.accessM(_.filesystem fileLines (file)) + + final def isDirectory(file: File): RIO[FileSystem, Boolean] = + ZIO.accessM(_.filesystem.isDirectory(file)) + + final def listFiles(path: Path): RIO[FileSystem, Iterable[File]] = + ZIO.accessM(_.filesystem.listFiles(path)) + + final def length(file: File): ZIO[FileSystem, Nothing, Long] = + ZIO.accessM(_.filesystem.length(file)) + + final def hasLocalFile( + sources: Sources, + prefix: RemoteKey, + remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] = + ZIO.accessM(_.filesystem.hasLocalFile(sources, prefix, remoteKey)) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/Hasher.scala similarity index 97% rename from core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala rename to filesystem/src/main/scala/net/kemitix/thorp/filesystem/Hasher.scala index 8972a2f..cdb0d3e 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/hasher/Hasher.scala +++ b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/Hasher.scala @@ -1,11 +1,10 @@ -package net.kemitix.thorp.core.hasher +package net.kemitix.thorp.filesystem import java.nio.file.Path import java.util.concurrent.atomic.AtomicReference import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.{HashType, MD5Hash} -import net.kemitix.thorp.filesystem.FileSystem import zio.{RIO, ZIO} /** diff --git a/core/src/main/scala/net/kemitix/thorp/core/hasher/MD5HashGenerator.scala b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/MD5HashGenerator.scala similarity index 95% rename from core/src/main/scala/net/kemitix/thorp/core/hasher/MD5HashGenerator.scala rename to filesystem/src/main/scala/net/kemitix/thorp/filesystem/MD5HashGenerator.scala index 6cc50a3..b620744 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/hasher/MD5HashGenerator.scala +++ b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/MD5HashGenerator.scala @@ -1,12 +1,11 @@ -package net.kemitix.thorp.core.hasher +package net.kemitix.thorp.filesystem import java.io.{File, FileInputStream} import java.nio.file.Path import java.security.MessageDigest import net.kemitix.thorp.domain.MD5Hash -import net.kemitix.thorp.filesystem.FileSystem -import zio.{Task, RIO} +import zio.{RIO, Task} import scala.collection.immutable.NumericRange diff --git a/core/src/test/resources/net/kemitix/thorp/core/big-file b/filesystem/src/test/resources/net/kemitix/thorp/filesystem/big-file similarity index 100% rename from core/src/test/resources/net/kemitix/thorp/core/big-file rename to filesystem/src/test/resources/net/kemitix/thorp/filesystem/big-file diff --git a/core/src/test/resources/net/kemitix/thorp/core/upload/root-file b/filesystem/src/test/resources/net/kemitix/thorp/filesystem/upload/root-file similarity index 100% rename from core/src/test/resources/net/kemitix/thorp/core/upload/root-file rename to filesystem/src/test/resources/net/kemitix/thorp/filesystem/upload/root-file diff --git a/filesystem/src/test/scala/net/kemitix/thorp/filesystem/FileSystemTest.scala b/filesystem/src/test/scala/net/kemitix/thorp/filesystem/FileSystemTest.scala new file mode 100644 index 0000000..3da7c51 --- /dev/null +++ b/filesystem/src/test/scala/net/kemitix/thorp/filesystem/FileSystemTest.scala @@ -0,0 +1,42 @@ +package net.kemitix.thorp.filesystem + +import net.kemitix.thorp.domain.{RemoteKey, Sources, TemporaryFolder} +import org.scalatest.FreeSpec +import zio.DefaultRuntime + +class FileSystemTest extends FreeSpec with TemporaryFolder { + + "Live" - { + "hasLocalFile" - { + "file exists" in { + withDirectory(dir => { + val filename = "filename" + createFile(dir, filename, contents = "") + val remoteKey = RemoteKey(filename) + val sources = Sources(List(dir)) + val prefix = RemoteKey("") + val program = FileSystem.hasLocalFile(sources, prefix, remoteKey) + val result = new DefaultRuntime {} + .unsafeRunSync(program.provide(FileSystem.Live)) + .toEither + val expected = true + assertResult(Right(expected))(result) + }) + } + "file does not exist" in { + withDirectory(dir => { + val filename = "filename" + val remoteKey = RemoteKey(filename) + val sources = Sources(List(dir)) + val prefix = RemoteKey("") + val program = FileSystem.hasLocalFile(sources, prefix, remoteKey) + val result = new DefaultRuntime {} + .unsafeRunSync(program.provide(FileSystem.Live)) + .toEither + val expected = false + assertResult(Right(expected))(result) + }) + } + } + } +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/hasher/MD5HashGeneratorTest.scala b/filesystem/src/test/scala/net/kemitix/thorp/filesystem/MD5HashGeneratorTest.scala similarity index 94% rename from core/src/test/scala/net/kemitix/thorp/core/hasher/MD5HashGeneratorTest.scala rename to filesystem/src/test/scala/net/kemitix/thorp/filesystem/MD5HashGeneratorTest.scala index 7ccc624..2e7524c 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/hasher/MD5HashGeneratorTest.scala +++ b/filesystem/src/test/scala/net/kemitix/thorp/filesystem/MD5HashGeneratorTest.scala @@ -1,11 +1,9 @@ -package net.kemitix.thorp.core.hasher +package net.kemitix.thorp.filesystem import java.nio.file.Path -import net.kemitix.thorp.config.Resource import net.kemitix.thorp.domain.MD5Hash import net.kemitix.thorp.domain.MD5HashData.{BigFile, Root} -import net.kemitix.thorp.filesystem.FileSystem import org.scalatest.FunSpec import zio.DefaultRuntime diff --git a/config/src/main/scala/net/kemitix/thorp/config/Resource.scala b/filesystem/src/test/scala/net/kemitix/thorp/filesystem/Resource.scala similarity index 80% rename from config/src/main/scala/net/kemitix/thorp/config/Resource.scala rename to filesystem/src/test/scala/net/kemitix/thorp/filesystem/Resource.scala index 06554a0..fdd768e 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/Resource.scala +++ b/filesystem/src/test/scala/net/kemitix/thorp/filesystem/Resource.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.config +package net.kemitix.thorp.filesystem import java.io.File diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala b/lib/src/main/scala/net/kemitix/thorp/lib/ActionGenerator.scala similarity index 95% rename from core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/ActionGenerator.scala index 7faa7da..41757d9 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/ActionGenerator.scala @@ -1,7 +1,7 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import net.kemitix.thorp.config.Config -import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload} +import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToUpload} import net.kemitix.thorp.domain.Implicits._ import net.kemitix.thorp.domain._ import zio.RIO @@ -79,7 +79,7 @@ object ActionGenerator { private def copyFile( bucket: Bucket, localFile: LocalFile, - remoteMetaData: Set[RemoteMetaData] + remoteMetaData: Option[RemoteMetaData] ) = LazyList .from(remoteMetaData) diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/CoreTypes.scala b/lib/src/main/scala/net/kemitix/thorp/lib/CoreTypes.scala new file mode 100644 index 0000000..62510ab --- /dev/null +++ b/lib/src/main/scala/net/kemitix/thorp/lib/CoreTypes.scala @@ -0,0 +1,21 @@ +package net.kemitix.thorp.lib + +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.filesystem.{FileSystem, Hasher} +import net.kemitix.thorp.storage.Storage +import zio.ZIO +import zio.clock.Clock + +object CoreTypes { + + type CoreEnv = Storage + with Console + with Config + with Clock + with FileSystem + with Hasher + with FileScanner + type CoreProgram[A] = ZIO[CoreEnv, Throwable, A] + +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/EventQueue.scala b/lib/src/main/scala/net/kemitix/thorp/lib/EventQueue.scala similarity index 82% rename from core/src/main/scala/net/kemitix/thorp/core/EventQueue.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/EventQueue.scala index 22adae1..fe90533 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/EventQueue.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/EventQueue.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import net.kemitix.thorp.domain.StorageQueueEvent diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/FileScanner.scala b/lib/src/main/scala/net/kemitix/thorp/lib/FileScanner.scala new file mode 100644 index 0000000..fb4d5e3 --- /dev/null +++ b/lib/src/main/scala/net/kemitix/thorp/lib/FileScanner.scala @@ -0,0 +1,88 @@ +package net.kemitix.thorp.lib + +import java.io.File +import java.nio.file.Path + +import net.kemitix.eip.zio.MessageChannel.{EChannel, ESender} +import net.kemitix.eip.zio.{Message, MessageChannel} +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.domain.{ + Filter, + HashType, + LocalFile, + MD5Hash, + RemoteKey, + Sources +} +import net.kemitix.thorp.filesystem.{FileSystem, Hasher} +import zio.clock.Clock +import zio.{RIO, UIO, ZIO} + +trait FileScanner { + val fileScanner: FileScanner.Service +} + +object FileScanner { + + type RemoteHashes = Map[MD5Hash, RemoteKey] + type Hashes = Map[HashType, MD5Hash] + type ScannedFile = LocalFile + type FileSender = ESender[Clock with Hasher with FileSystem with Config, + Throwable, + ScannedFile] + type ScannerChannel = EChannel[Any, Throwable, ScannedFile] + + trait Service { + def scanSources: RIO[FileScanner, FileSender] + } + + trait Live extends FileScanner { + val fileScanner: Service = new Service { + + override def scanSources: RIO[FileScanner, FileSender] = + RIO { channel => + (for { + sources <- Config.sources + _ <- ZIO.foreach(sources.paths)(scanPath(channel)(_)) + } yield ()) <* MessageChannel.endChannel(channel) + } + + private def scanPath(channel: ScannerChannel)(path: Path) + : ZIO[Clock with Config with Hasher with FileSystem, Throwable, Unit] = + for { + filters <- Config.filters + files <- FileSystem.listFiles(path) + _ <- ZIO.foreach(files)(handleFile(channel, filters)) + } yield () + + private def handleFile( + channel: ScannerChannel, + filters: List[Filter] + )(file: File) = + for { + isDir <- FileSystem.isDirectory(file) + isIncluded <- UIO(Filters.isIncluded(file.toPath)(filters)) + _ <- ZIO.when(isIncluded && isDir)(scanPath(channel)(file.toPath)) + _ <- ZIO.when(isIncluded && !isDir)(sendHashedFile(channel)(file)) + } yield () + + private def sendHashedFile(channel: ScannerChannel)(file: File) = + for { + sources <- Config.sources + source <- Sources.forPath(file.toPath)(sources) + prefix <- Config.prefix + hashes <- Hasher.hashObject(file.toPath) + remoteKey <- RemoteKey.from(source, prefix, file) + size <- FileSystem.length(file) + localFile <- ZIO( + LocalFile(file, source.toFile, hashes, remoteKey, size)) + hashedFile <- Message.create(localFile) + _ <- MessageChannel.send(channel)(hashedFile) + } yield () + } + + } + object Live extends Live + final def scanSources: RIO[FileScanner, FileSender] = + ZIO.accessM(_.fileScanner.scanSources) +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/Filters.scala b/lib/src/main/scala/net/kemitix/thorp/lib/Filters.scala similarity index 97% rename from core/src/main/scala/net/kemitix/thorp/core/Filters.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/Filters.scala index 329a6f2..1a7ab64 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Filters.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/Filters.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.nio.file.Path diff --git a/core/src/main/scala/net/kemitix/thorp/core/KeyGenerator.scala b/lib/src/main/scala/net/kemitix/thorp/lib/KeyGenerator.scala similarity index 92% rename from core/src/main/scala/net/kemitix/thorp/core/KeyGenerator.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/KeyGenerator.scala index 1e4f90d..4cf2c4f 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/KeyGenerator.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/KeyGenerator.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.nio.file.Path diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileStream.scala similarity index 93% rename from core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/LocalFileStream.scala index e2b86e7..ebe7db6 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileStream.scala @@ -1,13 +1,12 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.io.File import java.nio.file.Path import net.kemitix.thorp.config.Config -import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain.Sources -import net.kemitix.thorp.filesystem.FileSystem -import zio.{Task, RIO, ZIO} +import net.kemitix.thorp.filesystem.{FileSystem, Hasher} +import zio.{RIO, Task, ZIO} object LocalFileStream { diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala new file mode 100644 index 0000000..88e42c6 --- /dev/null +++ b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala @@ -0,0 +1,256 @@ +package net.kemitix.thorp.lib + +import net.kemitix.eip.zio.MessageChannel.UChannel +import net.kemitix.eip.zio.{Message, MessageChannel} +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} +import net.kemitix.thorp.domain.RemoteObjects.{ + remoteHasHash, + remoteKeyExists, + remoteMatchesLocalFile +} +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.filesystem.{FileSystem, Hasher} +import net.kemitix.thorp.lib.FileScanner.Hashes +import net.kemitix.thorp.storage.Storage +import net.kemitix.throp.uishell.UIEvent +import zio._ +import zio.clock.Clock + +trait LocalFileSystem { + + def scanCopyUpload( + uiChannel: UChannel[Any, UIEvent], + remoteObjects: RemoteObjects, + archive: ThorpArchive + ): RIO[ + Clock with Config with Hasher with FileSystem with FileScanner with Storage, + Seq[StorageQueueEvent]] + + def scanDelete( + uiChannel: UChannel[Any, UIEvent], + remoteData: RemoteObjects, + archive: UnversionedMirrorArchive.type + ): RIO[Clock with Config with FileSystem with Storage, Seq[StorageQueueEvent]] + +} +object LocalFileSystem extends LocalFileSystem { + + override def scanCopyUpload( + uiChannel: UChannel[Any, UIEvent], + remoteObjects: RemoteObjects, + archive: ThorpArchive + ): RIO[ + Clock with Hasher with FileSystem with Config with FileScanner with Storage, + Seq[StorageQueueEvent]] = + for { + actionCounter <- Ref.make(0) + bytesCounter <- Ref.make(0L) + uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]]) + eventsRef <- Ref.make(List.empty[StorageQueueEvent]) + fileSender <- FileScanner.scanSources + fileReceiver <- fileReceiver(uiChannel, + remoteObjects, + archive, + uploads, + actionCounter, + bytesCounter, + eventsRef) + _ <- MessageChannel.pointToPoint(fileSender)(fileReceiver).runDrain + events <- eventsRef.get + } yield events + + override def scanDelete( + uiChannel: UChannel[Any, UIEvent], + remoteData: RemoteObjects, + archive: UnversionedMirrorArchive.type + ): RIO[Clock with Config with FileSystem with Storage, + Seq[StorageQueueEvent]] = + for { + actionCounter <- Ref.make(0) + bytesCounter <- Ref.make(0L) + eventsRef <- Ref.make(List.empty[StorageQueueEvent]) + keySender <- keySender(remoteData.byKey.keys) + keyReceiver <- keyReceiver(uiChannel, + archive, + actionCounter, + bytesCounter, + eventsRef) + _ <- MessageChannel.pointToPoint(keySender)(keyReceiver).runDrain + events <- eventsRef.get + } yield events + + private def fileReceiver( + uiChannel: UChannel[Any, UIEvent], + remoteObjects: RemoteObjects, + archive: ThorpArchive, + uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], + actionCounterRef: Ref[Int], + bytesCounterRef: Ref[Long], + eventsRef: Ref[List[StorageQueueEvent]] + ): UIO[MessageChannel.UReceiver[Clock with Config with Storage, + FileScanner.ScannedFile]] = + UIO { message => + val localFile = message.body + for { + _ <- uiFileFound(uiChannel)(localFile) + action <- chooseAction(remoteObjects, uploads, uiChannel)(localFile) + actionCounter <- actionCounterRef.update(_ + 1) + bytesCounter <- bytesCounterRef.update(_ + action.size) + actionChosenMessage <- Message.create(UIEvent.ActionChosen(action)) + _ <- MessageChannel.send(uiChannel)(actionChosenMessage) + sequencedAction = SequencedAction(action, actionCounter) + event <- archive.update(sequencedAction, bytesCounter) + _ <- eventsRef.update(list => event :: list) + _ <- uiActionFinished(uiChannel)(action, actionCounter, bytesCounter) + } yield () + } + private def uiActionFinished(uiChannel: UChannel[Any, UIEvent])( + action: Action, + actionCounter: Int, + bytesCounter: Long + ) = + Message.create(UIEvent.ActionFinished(action, actionCounter, bytesCounter)) >>= + MessageChannel.send(uiChannel) + + private def uiFileFound(uiChannel: UChannel[Any, UIEvent])( + localFile: LocalFile) = + Message.create(UIEvent.FileFound(localFile)) >>= + MessageChannel.send(uiChannel) + + private def chooseAction( + remoteObjects: RemoteObjects, + uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], + uiChannel: UChannel[Any, UIEvent], + )(localFile: LocalFile): ZIO[Config with Clock, Nothing, Action] = { + for { + remoteExists <- remoteKeyExists(remoteObjects, localFile.remoteKey) + remoteMatches <- remoteMatchesLocalFile(remoteObjects, localFile) + remoteForHash <- remoteHasHash(remoteObjects, localFile.hashes) + previous <- uploads.get + bucket <- Config.bucket + action <- if (remoteExists && remoteMatches) + doNothing(localFile, bucket) + else { + remoteForHash match { + case Some((sourceKey, hash)) => + doCopy(localFile, bucket, sourceKey, hash) + case _ if (matchesPreviousUpload(previous, localFile.hashes)) => + doCopyWithPreviousUpload(localFile, bucket, previous, uiChannel) + case _ => + doUpload(localFile, bucket) + } + } + } yield action + } + + private def matchesPreviousUpload( + previous: Map[MD5Hash, Promise[Throwable, RemoteKey]], + hashes: Hashes + ): Boolean = + hashes.exists({ + case (_, hash) => previous.contains(hash) + }) + + private def doNothing( + localFile: LocalFile, + bucket: Bucket + ): UIO[Action] = UIO { + DoNothing(bucket, localFile.remoteKey, localFile.length) + } + + private def doCopy( + localFile: LocalFile, + bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash + ): UIO[Action] = UIO { + ToCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.length) + } + + private def doCopyWithPreviousUpload( + localFile: LocalFile, + bucket: Bucket, + previous: Map[MD5Hash, Promise[Throwable, RemoteKey]], + uiChannel: UChannel[Any, UIEvent], + ): ZIO[Clock, Nothing, Action] = { + localFile.hashes + .find({ case (_, hash) => previous.contains(hash) }) + .map({ + case (_, hash) => + for { + awaitingMessage <- Message.create( + UIEvent.AwaitingAnotherUpload(localFile.remoteKey, hash)) + _ <- MessageChannel.send(uiChannel)(awaitingMessage) + action <- previous(hash).await.map( + remoteKey => + ToCopy(bucket, + remoteKey, + hash, + localFile.remoteKey, + localFile.length)) + waitFinishedMessage <- Message.create( + UIEvent.AnotherUploadWaitComplete(action)) + _ <- MessageChannel.send(uiChannel)(waitFinishedMessage) + } yield action + }) + .getOrElse(doUpload(localFile, bucket)) + .refineToOrDie[Nothing] + } + + private def doUpload( + localFile: LocalFile, + bucket: Bucket + ): UIO[Action] = { + UIO(ToUpload(bucket, localFile, localFile.length)) + } + + def keySender( + keys: Iterable[RemoteKey]): UIO[MessageChannel.Sender[Clock, RemoteKey]] = + UIO { channel => + ZIO.foreach(keys) { key => + Message.create(key) >>= MessageChannel.send(channel) + } *> MessageChannel.endChannel(channel) + } + + def keyReceiver( + uiChannel: UChannel[Any, UIEvent], + archive: ThorpArchive, + actionCounterRef: Ref[Int], + bytesCounterRef: Ref[Long], + eventsRef: Ref[List[StorageQueueEvent]] + ): UIO[ + MessageChannel.UReceiver[Clock with Config with FileSystem with Storage, + RemoteKey]] = + UIO { message => + { + val remoteKey = message.body + for { + _ <- uiKeyFound(uiChannel)(remoteKey) + sources <- Config.sources + prefix <- Config.prefix + exists <- FileSystem.hasLocalFile(sources, prefix, remoteKey) + _ <- ZIO.when(!exists) { + for { + actionCounter <- actionCounterRef.update(_ + 1) + bucket <- Config.bucket + action = ToDelete(bucket, remoteKey, 0L) + bytesCounter <- bytesCounterRef.update(_ + action.size) + sequencedAction = SequencedAction(action, actionCounter) + event <- archive.update(sequencedAction, 0L) + _ <- eventsRef.update(list => event :: list) + _ <- uiActionFinished(uiChannel)(action, + actionCounter, + bytesCounter) + } yield () + } + } yield () + } + } + + private def uiKeyFound(uiChannel: UChannel[Any, UIEvent])( + remoteKey: RemoteKey) = + Message.create(UIEvent.KeyFound(remoteKey)) >>= + MessageChannel.send(uiChannel) + +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileValidator.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileValidator.scala similarity index 94% rename from core/src/main/scala/net/kemitix/thorp/core/LocalFileValidator.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/LocalFileValidator.scala index 9023c2b..8f76837 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileValidator.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileValidator.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.io.File import java.nio.file.Path @@ -24,7 +24,7 @@ object LocalFileValidator { for { file <- validateFile(path.toFile) remoteKey <- validateRemoteKey(sources, prefix, path) - } yield LocalFile(file, source, hash, remoteKey) + } yield LocalFile(file, source, hash, remoteKey, file.length) private def validateFile(file: File): IO[Violation, File] = if (file.isDirectory) diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFiles.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFiles.scala similarity index 95% rename from core/src/main/scala/net/kemitix/thorp/core/LocalFiles.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/LocalFiles.scala index 6b9b0af..a524275 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFiles.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFiles.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import net.kemitix.thorp.domain.LocalFile diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala b/lib/src/main/scala/net/kemitix/thorp/lib/PlanBuilder.scala similarity index 75% rename from core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/PlanBuilder.scala index 638a61e..543e40d 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/PlanBuilder.scala @@ -1,35 +1,24 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import net.kemitix.thorp.config.Config import net.kemitix.thorp.console._ -import net.kemitix.thorp.core.Action._ -import net.kemitix.thorp.core.hasher.Hasher +import net.kemitix.thorp.domain.Action._ import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.FileSystem -import net.kemitix.thorp.storage.api.Storage +import net.kemitix.thorp.filesystem.{FileSystem, Hasher} +import net.kemitix.thorp.storage.Storage import zio.{RIO, ZIO} object PlanBuilder { - def createPlan + def createPlan(remoteObjects: RemoteObjects) : RIO[Storage with Console with Config with FileSystem with Hasher, - SyncPlan] = (fetchRemoteData <&> findLocalFiles) >>= assemblePlan + SyncPlan] = findLocalFiles >>= assemblePlan(remoteObjects) - private def fetchRemoteData = - for { - bucket <- Config.bucket - prefix <- Config.prefix - objects <- Storage.list(bucket, prefix) - _ <- Console.putStrLn(s"Found ${objects.byKey.size} remote objects") - } yield objects - - private def assemblePlan(metadata: (RemoteObjects, LocalFiles)) = - metadata match { - case (remoteObjects, localData) => - createActions(remoteObjects, localData.localFiles) - .map(_.filter(doesSomething).sortBy(SequencePlan.order)) - .map(syncPlan(localData)) - } + private def assemblePlan(remoteObjects: RemoteObjects)( + localData: LocalFiles) = + createActions(remoteObjects, localData.localFiles) + .map(_.filter(doesSomething).sortBy(SequencePlan.order)) + .map(syncPlan(localData)) private def syncPlan(localData: LocalFiles): LazyList[Action] => SyncPlan = SyncPlan.create(_, syncTotal(localData)) diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala b/lib/src/main/scala/net/kemitix/thorp/lib/PlanExecutor.scala similarity index 92% rename from core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/PlanExecutor.scala index 7562d73..1213330 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/PlanExecutor.scala @@ -1,9 +1,9 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console -import net.kemitix.thorp.domain.StorageQueueEvent -import net.kemitix.thorp.storage.api.Storage +import net.kemitix.thorp.domain.{Action, StorageQueueEvent} +import net.kemitix.thorp.storage.Storage import zio.{Ref, ZIO} trait PlanExecutor { diff --git a/core/src/main/scala/net/kemitix/thorp/core/Remote.scala b/lib/src/main/scala/net/kemitix/thorp/lib/Remote.scala similarity index 96% rename from core/src/main/scala/net/kemitix/thorp/core/Remote.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/Remote.scala index 956c598..582b406 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Remote.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/Remote.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.nio.file.Path diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/S3MetaDataEnricher.scala b/lib/src/main/scala/net/kemitix/thorp/lib/S3MetaDataEnricher.scala new file mode 100644 index 0000000..b0d8c15 --- /dev/null +++ b/lib/src/main/scala/net/kemitix/thorp/lib/S3MetaDataEnricher.scala @@ -0,0 +1,50 @@ +package net.kemitix.thorp.lib + +import net.kemitix.thorp.domain._ + +object S3MetaDataEnricher { + + def getMetadata( + localFile: LocalFile, + remoteObjects: RemoteObjects + ): MatchedMetadata = { + val (keyMatches, hashMatches) = getS3Status(localFile, remoteObjects) + + val maybeByKey: Option[RemoteMetaData] = keyMatches.map { hash => + RemoteMetaData(localFile.remoteKey, hash) + } + + val maybeByHash: Option[RemoteMetaData] = hashMatches.map { + case (md5Hash, remoteKey) => + RemoteMetaData(remoteKey, md5Hash) + }.headOption + + MatchedMetadata( + localFile, + matchByKey = maybeByKey, + matchByHash = maybeByHash + ) + } + + def getS3Status( + localFile: LocalFile, + remoteObjects: RemoteObjects + ): (Option[MD5Hash], Map[MD5Hash, RemoteKey]) = { + val byKey: Option[MD5Hash] = + remoteObjects.byKey.get(localFile.remoteKey) + val hashes: Map[HashType, MD5Hash] = localFile.hashes + val byHash: Map[MD5Hash, RemoteKey] = + hashes + .map { + case (hashType, md5Hash) => + (md5Hash, remoteObjects.byHash.get(md5Hash)) + } + .flatMap { + case (md5Hash, Some(maybeyKey)) => Some((md5Hash, maybeyKey)) + case (_, None) => None + } + + (byKey, byHash) + } + +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/SequencePlan.scala b/lib/src/main/scala/net/kemitix/thorp/lib/SequencePlan.scala similarity index 58% rename from core/src/main/scala/net/kemitix/thorp/core/SequencePlan.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/SequencePlan.scala index a7d5c85..e6e4904 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SequencePlan.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/SequencePlan.scala @@ -1,6 +1,7 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib -import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} +import net.kemitix.thorp.domain.Action +import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} trait SequencePlan { diff --git a/core/src/main/scala/net/kemitix/thorp/core/SequencedAction.scala b/lib/src/main/scala/net/kemitix/thorp/lib/SequencedAction.scala similarity index 50% rename from core/src/main/scala/net/kemitix/thorp/core/SequencedAction.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/SequencedAction.scala index f7a1dc4..c81c05a 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SequencedAction.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/SequencedAction.scala @@ -1,4 +1,6 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib + +import net.kemitix.thorp.domain.Action final case class SequencedAction( action: Action, diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/SyncLogging.scala b/lib/src/main/scala/net/kemitix/thorp/lib/SyncLogging.scala new file mode 100644 index 0000000..112a665 --- /dev/null +++ b/lib/src/main/scala/net/kemitix/thorp/lib/SyncLogging.scala @@ -0,0 +1,18 @@ +package net.kemitix.thorp.lib + +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.console._ +import zio.ZIO + +trait SyncLogging { + + def logFileScan: ZIO[Config with Console, Nothing, Unit] = + for { + sources <- Config.sources + _ <- Console.putStrLn( + s"Scanning local files: ${sources.paths.mkString(", ")}...") + } yield () + +} + +object SyncLogging extends SyncLogging diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncPlan.scala b/lib/src/main/scala/net/kemitix/thorp/lib/SyncPlan.scala similarity index 78% rename from core/src/main/scala/net/kemitix/thorp/core/SyncPlan.scala rename to lib/src/main/scala/net/kemitix/thorp/lib/SyncPlan.scala index 1815f59..8d6d028 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncPlan.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/SyncPlan.scala @@ -1,6 +1,6 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib -import net.kemitix.thorp.domain.SyncTotals +import net.kemitix.thorp.domain.{Action, SyncTotals} final case class SyncPlan private ( actions: LazyList[Action], diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala new file mode 100644 index 0000000..a9e168d --- /dev/null +++ b/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala @@ -0,0 +1,47 @@ +package net.kemitix.thorp.lib + +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.console.ConsoleOut.{ + CopyComplete, + DeleteComplete, + ErrorQueueEventOccurred, + UploadComplete +} +import net.kemitix.thorp.console._ +import net.kemitix.thorp.domain.StorageQueueEvent +import net.kemitix.thorp.domain.StorageQueueEvent._ +import net.kemitix.thorp.storage.Storage +import zio.{RIO, ZIO} + +trait ThorpArchive { + + def update( + sequencedAction: SequencedAction, + totalBytesSoFar: Long + ): ZIO[Storage with Config, Nothing, StorageQueueEvent] + + def logEvent( + event: StorageQueueEvent): RIO[Console with Config, StorageQueueEvent] = + for { + batchMode <- Config.batchMode + sqe <- event match { + case UploadQueueEvent(remoteKey, _) => + ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey), + batchMode) + case CopyQueueEvent(sourceKey, targetKey) => + ZIO(event) <* Console.putMessageLnB( + CopyComplete(sourceKey, targetKey), + batchMode) + case DeleteQueueEvent(remoteKey) => + ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey), + batchMode) + case ErrorQueueEvent(action, _, e) => + ZIO(event) <* Console.putMessageLnB( + ErrorQueueEventOccurred(action, e), + batchMode) + case DoNothingQueueEvent(_) => ZIO(event) + case ShutdownQueueEvent() => ZIO(event) + } + } yield sqe + +} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala new file mode 100644 index 0000000..154e923 --- /dev/null +++ b/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala @@ -0,0 +1,48 @@ +package net.kemitix.thorp.lib + +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} +import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.Storage +import zio.{UIO, ZIO} + +trait UnversionedMirrorArchive extends ThorpArchive { + + override def update( + sequencedAction: SequencedAction, + totalBytesSoFar: Long + ): ZIO[Storage with Config, Nothing, StorageQueueEvent] = + sequencedAction match { + case SequencedAction(ToUpload(bucket, localFile, _), index) => + doUpload(index, totalBytesSoFar, bucket, localFile) + case SequencedAction(ToCopy(bucket, sourceKey, hash, targetKey, _), _) => + Storage.copy(bucket, sourceKey, hash, targetKey) + case SequencedAction(ToDelete(bucket, remoteKey, _), _) => + Storage.delete(bucket, remoteKey) + case SequencedAction(DoNothing(_, remoteKey, _), _) => + UIO(DoNothingQueueEvent(remoteKey)) + } + + private def doUpload( + index: Int, + totalBytesSoFar: Long, + bucket: Bucket, + localFile: LocalFile + ) = + for { + batchMode <- Config.batchMode + upload <- Storage.upload( + localFile, + bucket, + UploadEventListener.Settings( + localFile, + index, + totalBytesSoFar, + batchMode + ) + ) + } yield upload +} + +object UnversionedMirrorArchive extends UnversionedMirrorArchive diff --git a/core/src/test/resources/net/kemitix/thorp/core/File-6964 b/lib/src/test/resources/net/kemitix/thorp/lib/File-6964 similarity index 100% rename from core/src/test/resources/net/kemitix/thorp/core/File-6964 rename to lib/src/test/resources/net/kemitix/thorp/lib/File-6964 diff --git a/core/src/test/resources/net/kemitix/thorp/core/empty-file b/lib/src/test/resources/net/kemitix/thorp/lib/empty-file similarity index 100% rename from core/src/test/resources/net/kemitix/thorp/core/empty-file rename to lib/src/test/resources/net/kemitix/thorp/lib/empty-file diff --git a/core/src/test/resources/net/kemitix/thorp/core/invalid-config b/lib/src/test/resources/net/kemitix/thorp/lib/invalid-config similarity index 100% rename from core/src/test/resources/net/kemitix/thorp/core/invalid-config rename to lib/src/test/resources/net/kemitix/thorp/lib/invalid-config diff --git a/core/src/test/resources/net/kemitix/thorp/core/simple-config b/lib/src/test/resources/net/kemitix/thorp/lib/simple-config similarity index 100% rename from core/src/test/resources/net/kemitix/thorp/core/simple-config rename to lib/src/test/resources/net/kemitix/thorp/lib/simple-config diff --git a/core/src/test/resources/net/kemitix/thorp/core/small-file b/lib/src/test/resources/net/kemitix/thorp/lib/small-file similarity index 100% rename from core/src/test/resources/net/kemitix/thorp/core/small-file rename to lib/src/test/resources/net/kemitix/thorp/lib/small-file diff --git a/lib/src/test/resources/net/kemitix/thorp/lib/upload/root-file b/lib/src/test/resources/net/kemitix/thorp/lib/upload/root-file new file mode 100644 index 0000000..996f41a --- /dev/null +++ b/lib/src/test/resources/net/kemitix/thorp/lib/upload/root-file @@ -0,0 +1 @@ +This file is in the root directory of the upload tree. diff --git a/core/src/test/resources/net/kemitix/thorp/core/upload/subdir/leaf-file b/lib/src/test/resources/net/kemitix/thorp/lib/upload/subdir/leaf-file similarity index 100% rename from core/src/test/resources/net/kemitix/thorp/core/upload/subdir/leaf-file rename to lib/src/test/resources/net/kemitix/thorp/lib/upload/subdir/leaf-file diff --git a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/ActionGeneratorSuite.scala similarity index 93% rename from core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala rename to lib/src/test/scala/net/kemitix/thorp/lib/ActionGeneratorSuite.scala index a466392..bf845d5 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/ActionGeneratorSuite.scala @@ -1,10 +1,10 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import net.kemitix.thorp.config._ -import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload} +import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToUpload} import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.FileSystem +import net.kemitix.thorp.filesystem.{FileSystem, Resource} import org.scalatest.FunSpec import zio.DefaultRuntime @@ -38,7 +38,7 @@ class ActionGeneratorSuite extends FunSpec { theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash) input = MatchedMetadata( theFile, // local exists - matchByHash = Set(theRemoteMetadata), // remote matches + matchByHash = Some(theRemoteMetadata), // remote matches matchByKey = Some(theRemoteMetadata) // remote exists ) } yield (theFile, input) @@ -67,7 +67,7 @@ class ActionGeneratorSuite extends FunSpec { otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash) input = MatchedMetadata( theFile, // local exists - matchByHash = Set(otherRemoteMetadata), // other matches + matchByHash = Some(otherRemoteMetadata), // other matches matchByKey = None) // remote is missing } yield (theFile, theRemoteKey, input, otherRemoteKey) it("copy from other key") { @@ -94,7 +94,7 @@ class ActionGeneratorSuite extends FunSpec { sources, prefix) input = MatchedMetadata(theFile, // local exists - matchByHash = Set.empty, // other no matches + matchByHash = None, // other no matches matchByKey = None) // remote is missing } yield (theFile, input) it("upload") { @@ -127,7 +127,7 @@ class ActionGeneratorSuite extends FunSpec { ) input = MatchedMetadata( theFile, // local exists - matchByHash = Set(otherRemoteMetadata), // other matches + matchByHash = Some(otherRemoteMetadata), // other matches matchByKey = Some(oldRemoteMetadata)) // remote exists } yield (theFile, theRemoteKey, input, otherRemoteKey) it("copy from other key") { @@ -160,7 +160,7 @@ class ActionGeneratorSuite extends FunSpec { theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash) input = MatchedMetadata( theFile, // local exists - matchByHash = Set.empty, // remote no match, other no match + matchByHash = None, // remote no match, other no match matchByKey = Some(theRemoteMetadata) // remote exists ) } yield (theFile, input) diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala b/lib/src/test/scala/net/kemitix/thorp/lib/DummyStorageService.scala similarity index 88% rename from core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala rename to lib/src/test/scala/net/kemitix/thorp/lib/DummyStorageService.scala index 378e4cb..95a5655 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/DummyStorageService.scala @@ -1,10 +1,9 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.io.File -import net.kemitix.thorp.console._ import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.Storage +import net.kemitix.thorp.storage.Storage import zio.{RIO, UIO} final case class DummyStorageService( @@ -18,7 +17,7 @@ final case class DummyStorageService( override def listObjects( bucket: Bucket, prefix: RemoteKey - ): RIO[Console, RemoteObjects] = + ): RIO[Storage, RemoteObjects] = RIO(remoteObjects) override def upload( diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/EIPTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/EIPTest.scala new file mode 100644 index 0000000..6a14e90 --- /dev/null +++ b/lib/src/test/scala/net/kemitix/thorp/lib/EIPTest.scala @@ -0,0 +1,50 @@ +package net.kemitix.thorp.lib + +import org.scalatest.FreeSpec +import zio.stream._ +import zio.{DefaultRuntime, IO, UIO, ZIO} + +// Experiment on how to trigger events asynchronously +// i.e. Have one thread add to a queue and another take from the queue, neither waiting for the other thread to finish +class EIPTest extends FreeSpec { + + "queue" in { + type Callback[A] = IO[Option[Throwable], A] => Unit + def offerInt(cb: Callback[Int], i: Int) = ZIO(cb(IO.succeed(i))) + def closeStream(cb: Callback[Int]) = ZIO(cb(IO.fail(None))) + def publish: Callback[Int] => IO[Throwable, _] = + cb => + ZIO.foreach(1 to 3) { i => + ZIO { + println(s"put $i") + Thread.sleep(100) + } *> offerInt(cb, i) + } *> closeStream(cb) + val program = Stream + .effectAsyncM(publish) + .mapM(i => ZIO(println(s"get $i"))) + + new DefaultRuntime {}.unsafeRunSync(program.runDrain) + } + + "EIP: Message Channel" in { + type Message = Int + type Callback = IO[Option[Throwable], Message] => Unit + def producer: Callback => UIO[Unit] = + cb => + ZIO.foreach(1 to 3)(message => + UIO { + println(s"put $message") + cb(ZIO.succeed(message)) + Thread.sleep(100) + }) *> UIO(cb(ZIO.fail(None))) + def consumer: Message => ZIO[Any, Throwable, Unit] = + message => ZIO(println(s"got $message")) + val program = zio.stream.Stream + .effectAsyncM(producer) + .buffer(1) + .mapM(consumer) + new DefaultRuntime {}.unsafeRunSync(program.runDrain) + } + +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/FiltersSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/FiltersSuite.scala similarity index 99% rename from core/src/test/scala/net/kemitix/thorp/core/FiltersSuite.scala rename to lib/src/test/scala/net/kemitix/thorp/lib/FiltersSuite.scala index ffdf3f4..4f01487 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/FiltersSuite.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/FiltersSuite.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.nio.file.Paths diff --git a/core/src/test/scala/net/kemitix/thorp/core/KeyGeneratorSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/KeyGeneratorSuite.scala similarity index 94% rename from core/src/test/scala/net/kemitix/thorp/core/KeyGeneratorSuite.scala rename to lib/src/test/scala/net/kemitix/thorp/lib/KeyGeneratorSuite.scala index 61f3cfa..0de7a11 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/KeyGeneratorSuite.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/KeyGeneratorSuite.scala @@ -1,10 +1,10 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.io.File import java.nio.file.Path -import net.kemitix.thorp.config.Resource import net.kemitix.thorp.domain.{RemoteKey, Sources} +import net.kemitix.thorp.filesystem.Resource import org.scalatest.FunSpec import zio.DefaultRuntime diff --git a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileStreamSuite.scala similarity index 92% rename from core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala rename to lib/src/test/scala/net/kemitix/thorp/lib/LocalFileStreamSuite.scala index 6cf3848..5b10ec1 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileStreamSuite.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.nio.file.Paths @@ -6,15 +6,13 @@ import net.kemitix.thorp.config.{ Config, ConfigOption, ConfigOptions, - ConfigurationBuilder, - Resource + ConfigurationBuilder } import net.kemitix.thorp.console._ -import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.FileSystem -import net.kemitix.thorp.storage.api.Storage +import net.kemitix.thorp.filesystem.{FileSystem, Hasher, Resource} +import net.kemitix.thorp.storage.Storage import org.scalatest.FunSpec import zio.{DefaultRuntime, Task, UIO} diff --git a/core/src/test/scala/net/kemitix/thorp/core/MatchedMetadataEnricherSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/MatchedMetadataEnricherSuite.scala similarity index 85% rename from core/src/test/scala/net/kemitix/thorp/core/MatchedMetadataEnricherSuite.scala rename to lib/src/test/scala/net/kemitix/thorp/lib/MatchedMetadataEnricherSuite.scala index 53f8cfc..6245662 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/MatchedMetadataEnricherSuite.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/MatchedMetadataEnricherSuite.scala @@ -1,9 +1,9 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib -import net.kemitix.thorp.config.Resource -import net.kemitix.thorp.core.S3MetaDataEnricher.{getMetadata, getS3Status} +import net.kemitix.thorp.lib.S3MetaDataEnricher.{getMetadata, getS3Status} import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ +import net.kemitix.thorp.filesystem.Resource import org.scalatest.FunSpec import scala.collection.MapView @@ -15,17 +15,11 @@ class MatchedMetadataEnricherSuite extends FunSpec { private val prefix = RemoteKey("prefix") def getMatchesByKey( - status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)])): Option[MD5Hash] = { + status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])): Option[MD5Hash] = { val (byKey, _) = status byKey } - def getMatchesByHash(status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)])) - : Set[(RemoteKey, MD5Hash)] = { - val (_, byHash) = status - byHash - } - describe("enrich with metadata") { describe( @@ -39,7 +33,7 @@ class MatchedMetadataEnricherSuite extends FunSpec { prefix) theRemoteKey = theFile.remoteKey remoteObjects = RemoteObjects( - byHash = MapView(theHash -> Set(theRemoteKey)), + byHash = MapView(theHash -> theRemoteKey), byKey = MapView(theRemoteKey -> theHash) ) theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash) @@ -47,9 +41,10 @@ class MatchedMetadataEnricherSuite extends FunSpec { it("generates valid metadata") { env.map({ case (theFile, theRemoteMetadata, remoteObjects) => { - val expected = MatchedMetadata(theFile, - matchByHash = Set(theRemoteMetadata), - matchByKey = Some(theRemoteMetadata)) + val expected = + MatchedMetadata(theFile, + matchByHash = Some(theRemoteMetadata), + matchByKey = Some(theRemoteMetadata)) val result = getMetadata(theFile, remoteObjects) assertResult(expected)(result) } @@ -67,7 +62,7 @@ class MatchedMetadataEnricherSuite extends FunSpec { prefix) theRemoteKey: RemoteKey = RemoteKey.resolve("the-file")(prefix) remoteObjects = RemoteObjects( - byHash = MapView(theHash -> Set(theRemoteKey)), + byHash = MapView(theHash -> theRemoteKey), byKey = MapView(theRemoteKey -> theHash) ) theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash) @@ -75,9 +70,10 @@ class MatchedMetadataEnricherSuite extends FunSpec { it("generates valid metadata") { env.map({ case (theFile, theRemoteMetadata, remoteObjects) => { - val expected = MatchedMetadata(theFile, - matchByHash = Set(theRemoteMetadata), - matchByKey = Some(theRemoteMetadata)) + val expected = + MatchedMetadata(theFile, + matchByHash = Some(theRemoteMetadata), + matchByKey = Some(theRemoteMetadata)) val result = getMetadata(theFile, remoteObjects) assertResult(expected)(result) } @@ -95,7 +91,7 @@ class MatchedMetadataEnricherSuite extends FunSpec { prefix) otherRemoteKey = RemoteKey("other-key") remoteObjects = RemoteObjects( - byHash = MapView(theHash -> Set(otherRemoteKey)), + byHash = MapView(theHash -> otherRemoteKey), byKey = MapView(otherRemoteKey -> theHash) ) otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash) @@ -105,7 +101,7 @@ class MatchedMetadataEnricherSuite extends FunSpec { case (theFile, otherRemoteMetadata, remoteObjects) => { val expected = MatchedMetadata(theFile, matchByHash = - Set(otherRemoteMetadata), + Some(otherRemoteMetadata), matchByKey = None) val result = getMetadata(theFile, remoteObjects) assertResult(expected)(result) @@ -128,9 +124,7 @@ class MatchedMetadataEnricherSuite extends FunSpec { env.map({ case (theFile, remoteObjects) => { val expected = - MatchedMetadata(theFile, - matchByHash = Set.empty, - matchByKey = None) + MatchedMetadata(theFile, matchByHash = None, matchByKey = None) val result = getMetadata(theFile, remoteObjects) assertResult(expected)(result) } @@ -150,8 +144,7 @@ class MatchedMetadataEnricherSuite extends FunSpec { oldHash = MD5Hash("old-hash") otherRemoteKey = RemoteKey.resolve("other-key")(prefix) remoteObjects = RemoteObjects( - byHash = MapView(oldHash -> Set(theRemoteKey), - theHash -> Set(otherRemoteKey)), + byHash = MapView(oldHash -> theRemoteKey, theHash -> otherRemoteKey), byKey = MapView( theRemoteKey -> oldHash, otherRemoteKey -> theHash @@ -168,7 +161,7 @@ class MatchedMetadataEnricherSuite extends FunSpec { remoteObjects) => { val expected = MatchedMetadata(theFile, matchByHash = - Set(otherRemoteMetadata), + Some(otherRemoteMetadata), matchByKey = Some(theRemoteMetadata)) val result = getMetadata(theFile, remoteObjects) assertResult(expected)(result) @@ -188,7 +181,7 @@ class MatchedMetadataEnricherSuite extends FunSpec { theRemoteKey = theFile.remoteKey oldHash = MD5Hash("old-hash") remoteObjects = RemoteObjects( - byHash = MapView(oldHash -> Set(theRemoteKey), theHash -> Set.empty), + byHash = MapView(oldHash -> theRemoteKey), byKey = MapView(theRemoteKey -> oldHash) ) theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash) @@ -197,7 +190,7 @@ class MatchedMetadataEnricherSuite extends FunSpec { env.map({ case (theFile, theRemoteMetadata, remoteObjects) => { val expected = MatchedMetadata(theFile, - matchByHash = Set.empty, + matchByHash = None, matchByKey = Some(theRemoteMetadata)) val result = getMetadata(theFile, remoteObjects) assertResult(expected)(result) @@ -233,8 +226,8 @@ class MatchedMetadataEnricherSuite extends FunSpec { prefix) remoteObjects = RemoteObjects( byHash = MapView( - hash -> Set(key, keyOtherKey.remoteKey), - diffHash -> Set(keyDiffHash.remoteKey) + hash -> key, + diffHash -> keyDiffHash.remoteKey ), byKey = MapView( key -> hash, @@ -283,7 +276,10 @@ class MatchedMetadataEnricherSuite extends FunSpec { case (remoteObjects, _, _, _) => { env2.map({ case (localFile) => { - val result = getMatchesByHash(invoke(localFile, remoteObjects)) + val result = { + val (_, byHash) = invoke(localFile, remoteObjects) + byHash + } assert(result.isEmpty) } }) @@ -300,7 +296,10 @@ class MatchedMetadataEnricherSuite extends FunSpec { assert(result.contains(diffHash)) } it("should return only itself in match by hash") { - val result = getMatchesByHash(invoke(keyDiffHash, remoteObjects)) + val result = { + val (_, byHash) = invoke(keyDiffHash, remoteObjects) + byHash + } assert(result === Set((keyDiffHash.remoteKey, diffHash))) } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/PlanBuilderTest.scala similarity index 93% rename from core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala rename to lib/src/test/scala/net/kemitix/thorp/lib/PlanBuilderTest.scala index 1fcef9c..b1b1b11 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/PlanBuilderTest.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.io.File import java.nio.file.Path @@ -10,12 +10,11 @@ import net.kemitix.thorp.config.{ ConfigurationBuilder } import net.kemitix.thorp.console._ -import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.core.hasher.Hasher +import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem._ -import net.kemitix.thorp.storage.api.Storage +import net.kemitix.thorp.filesystem.{Hasher, _} +import net.kemitix.thorp.storage.Storage import org.scalatest.FreeSpec import zio.{DefaultRuntime, Task, UIO} @@ -64,7 +63,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val anOtherKey = RemoteKey("other") val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey))) val remoteObjects = RemoteObjects( - byHash = MapView(aHash -> Set(anOtherKey)), + byHash = MapView(aHash -> anOtherKey), byKey = MapView(anOtherKey -> aHash) ) val result = @@ -86,7 +85,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { // DoNothing actions should have been filtered out of the plan val expected = Right(List()) val remoteObjects = RemoteObjects( - byHash = MapView(hash -> Set(remoteKey)), + byHash = MapView(hash -> remoteKey), byKey = MapView(remoteKey -> hash) ) val result = @@ -107,7 +106,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val expected = Right(List(toUpload(remoteKey, currentHash, source, file))) val remoteObjects = RemoteObjects( - byHash = MapView(originalHash -> Set(remoteKey)), + byHash = MapView(originalHash -> remoteKey), byKey = MapView(remoteKey -> originalHash) ) val result = @@ -126,7 +125,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val sourceKey = RemoteKey("other-key") val expected = Right(List(toCopy(sourceKey, hash, remoteKey))) val remoteObjects = RemoteObjects( - byHash = MapView(hash -> Set(sourceKey)), + byHash = MapView(hash -> sourceKey), byKey = MapView.empty ) val result = @@ -151,7 +150,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { // DoNothing actions should have been filtered out of the plan val expected = Right(List()) val remoteObjects = RemoteObjects( - byHash = MapView(hash -> Set(remoteKey)), + byHash = MapView(hash -> remoteKey), byKey = MapView(remoteKey -> hash) ) val result = @@ -168,7 +167,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val hash = MD5Hash("file-content") val expected = Right(List(toDelete(remoteKey))) val remoteObjects = RemoteObjects( - byHash = MapView(hash -> Set(remoteKey)), + byHash = MapView(hash -> remoteKey), byKey = MapView(remoteKey -> hash) ) val result = @@ -255,7 +254,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { val hash2 = md5Hash(fileInSecondSource) val expected = Right(List()) val remoteObjects = - RemoteObjects(byHash = MapView(hash2 -> Set(remoteKey2)), + RemoteObjects(byHash = MapView(hash2 -> remoteKey2), byKey = MapView(remoteKey2 -> hash2)) val result = invoke(options(firstSource)(secondSource), @@ -276,7 +275,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { withDirectory(secondSource => { val expected = Right(List()) val remoteObjects = - RemoteObjects(byHash = MapView(hash1 -> Set(remoteKey1)), + RemoteObjects(byHash = MapView(hash1 -> remoteKey1), byKey = MapView(remoteKey1 -> hash1)) val result = invoke(options(firstSource)(secondSource), @@ -367,9 +366,12 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { } def testProgram = for { - config <- ConfigurationBuilder.buildConfig(configOptions) - _ <- Config.set(config) - plan <- PlanBuilder.createPlan + config <- ConfigurationBuilder.buildConfig(configOptions) + _ <- Config.set(config) + bucket <- Config.bucket + prefix <- Config.prefix + remoteObjects <- Storage.list(bucket, prefix) + plan <- PlanBuilder.createPlan(remoteObjects) } yield plan new DefaultRuntime {} .unsafeRunSync(testProgram.provide(testEnv)) diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanExecutorTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/PlanExecutorTest.scala similarity index 87% rename from core/src/test/scala/net/kemitix/thorp/core/PlanExecutorTest.scala rename to lib/src/test/scala/net/kemitix/thorp/lib/PlanExecutorTest.scala index 18858bc..dfa7ca6 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/PlanExecutorTest.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/PlanExecutorTest.scala @@ -1,17 +1,17 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console -import net.kemitix.thorp.core.Action.DoNothing +import net.kemitix.thorp.domain.Action.DoNothing import net.kemitix.thorp.domain.{ Bucket, RemoteKey, StorageQueueEvent, SyncTotals } -import net.kemitix.thorp.storage.api.Storage +import net.kemitix.thorp.storage.Storage import org.scalatest.FreeSpec -import zio.{DefaultRuntime, ZIO} +import zio.{DefaultRuntime, UIO, ZIO} class PlanExecutorTest extends FreeSpec { @@ -33,7 +33,7 @@ class PlanExecutorTest extends FreeSpec { LazyList.from(1 to nActions).map(DoNothing(bucket, remoteKey, _)) val syncTotals = SyncTotals.empty - val archiveTask = UnversionedMirrorArchive.default(syncTotals) + val archiveTask = UIO(UnversionedMirrorArchive) val syncPlan = SyncPlan(input, syncTotals) val program: ZIO[Storage with Config with Console, diff --git a/core/src/test/scala/net/kemitix/thorp/core/SequencePlanTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/SequencePlanTest.scala similarity index 82% rename from core/src/test/scala/net/kemitix/thorp/core/SequencePlanTest.scala rename to lib/src/test/scala/net/kemitix/thorp/lib/SequencePlanTest.scala index 32e08ba..597fd13 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SequencePlanTest.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/SequencePlanTest.scala @@ -1,15 +1,9 @@ -package net.kemitix.thorp.core +package net.kemitix.thorp.lib import java.io.File -import net.kemitix.thorp.core.Action._ -import net.kemitix.thorp.domain.{ - Bucket, - HashType, - LocalFile, - MD5Hash, - RemoteKey -} +import net.kemitix.thorp.domain.Action._ +import net.kemitix.thorp.domain._ import org.scalatest.FreeSpec class SequencePlanTest extends FreeSpec { @@ -26,9 +20,9 @@ class SequencePlanTest extends FreeSpec { val file2 = new File("aFile") val source = new File("source") val localFile1 = - LocalFile(file1, source, hashes, remoteKey1) + LocalFile(file1, source, hashes, remoteKey1, file1.length) val _ = - LocalFile(file2, source, hashes, remoteKey2) + LocalFile(file2, source, hashes, remoteKey2, file2.length) val copy1 = ToCopy(bucket, remoteKey1, hash, remoteKey2, size) val copy2 = ToCopy(bucket, remoteKey2, hash, remoteKey1, size) val upload1 = ToUpload(bucket, localFile1, size) diff --git a/modules.dot b/modules.dot new file mode 100644 index 0000000..8a51da8 --- /dev/null +++ b/modules.dot @@ -0,0 +1,26 @@ +digraph deps { + +app -> cli +app -> lib +app -> "storage-aws" + +cli -> config + +lib -> storage +lib -> console +lib -> config +lib -> filesystem +lib -> domain + +"storage-aws" -> storage + +config -> filesystem +config -> domain + +storage -> domain + +console -> domain + +filesystem -> domain + +} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala index 906de76..0d37a3f 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala @@ -5,11 +5,11 @@ import com.amazonaws.services.s3.model.{ ListObjectsV2Result, S3ObjectSummary } -import net.kemitix.thorp.console._ import net.kemitix.thorp.domain.{Bucket, RemoteKey, RemoteObjects} +import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey -import zio.{Task, RIO} +import zio.{RIO, Task} import scala.jdk.CollectionConverters._ @@ -21,7 +21,7 @@ trait Lister { def listObjects(amazonS3: AmazonS3.Client)( bucket: Bucket, prefix: RemoteKey - ): RIO[Console, RemoteObjects] = { + ): RIO[Storage, RemoteObjects] = { def request = new ListObjectsV2Request() @@ -31,15 +31,16 @@ trait Lister { def requestMore: Token => ListObjectsV2Request = token => request.withContinuationToken(token) - def fetchBatch: ListObjectsV2Request => RIO[Console, Batch] = - request => ListerLogger.logFetchBatch *> tryFetchBatch(amazonS3)(request) + def fetchBatch: ListObjectsV2Request => Task[Batch] = + request => tryFetchBatch(amazonS3)(request) - def fetchMore: Option[Token] => RIO[Console, LazyList[S3ObjectSummary]] = { + def fetchMore: Option[Token] => Task[LazyList[S3ObjectSummary]] = { case None => RIO.succeed(LazyList.empty) case Some(token) => fetch(requestMore(token)) } - def fetch: ListObjectsV2Request => RIO[Console, LazyList[S3ObjectSummary]] = + def fetch: ListObjectsV2Request => Task[LazyList[S3ObjectSummary]] = + request => for { batch <- fetchBatch(request) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHash.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHash.scala index 0eee832..fab8cf6 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHash.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHash.scala @@ -9,12 +9,11 @@ object S3ObjectsByHash { def byHash( os: LazyList[S3ObjectSummary] - ): MapView[MD5Hash, Set[RemoteKey]] = { - val mD5HashToS3Objects: Map[MD5Hash, LazyList[S3ObjectSummary]] = - os.groupBy(o => MD5Hash(o.getETag.filter(_ != '"'))) - mD5HashToS3Objects.view.mapValues { os => - os.map(_.getKey).map(RemoteKey(_)).toSet - } - } + ): MapView[MD5Hash, RemoteKey] = + os.map { o => + (MD5Hash(o.getETag) -> RemoteKey(o.getKey)) + } + .toMap + .view } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala index f564917..2fdb5f1 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala @@ -2,13 +2,11 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.transfer.TransferManagerBuilder -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.Storage -import net.kemitix.thorp.storage.api.Storage.Service -import zio.{RIO, UIO, ZIO} +import net.kemitix.thorp.storage.Storage +import net.kemitix.thorp.storage.Storage.Service +import zio.{RIO, UIO} object S3Storage { trait Live extends Storage { @@ -21,14 +19,14 @@ object S3Storage { TransferManagerBuilder.defaultTransferManager) override def listObjects(bucket: Bucket, - prefix: RemoteKey): RIO[Console, RemoteObjects] = + prefix: RemoteKey): RIO[Storage, RemoteObjects] = Lister.listObjects(client)(bucket, prefix) override def upload( localFile: LocalFile, bucket: Bucket, listenerSettings: UploadEventListener.Settings, - ): ZIO[Config, Nothing, StorageQueueEvent] = + ): UIO[StorageQueueEvent] = Uploader.upload(transferManager)( Uploader.Request(localFile, bucket, listenerSettings)) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index b6dead7..e0d6ed7 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -5,7 +5,6 @@ import java.util.concurrent.locks.StampedLock import com.amazonaws.event.ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT import com.amazonaws.event.{ProgressEvent, ProgressListener} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} -import net.kemitix.thorp.config.Config import net.kemitix.thorp.domain.Implicits._ import net.kemitix.thorp.domain.StorageQueueEvent.{ Action, @@ -19,12 +18,12 @@ import net.kemitix.thorp.domain.UploadEvent.{ } import net.kemitix.thorp.domain.{StorageQueueEvent, _} import net.kemitix.thorp.storage.aws.Uploader.Request -import zio.{UIO, ZIO} +import zio.UIO trait Uploader { def upload(transferManager: => AmazonTransferManager)( - request: Request): ZIO[Config, Nothing, StorageQueueEvent] = + request: Request): UIO[StorageQueueEvent] = transfer(transferManager)(request) .catchAll(handleError(request.localFile.remoteKey)) @@ -35,8 +34,7 @@ trait Uploader { private def transfer(transferManager: => AmazonTransferManager)( request: Request ) = - putObjectRequest(request) >>= - dispatch(transferManager) + dispatch(transferManager)(putObjectRequest(request)) private def dispatch(transferManager: AmazonTransferManager)( putObjectRequest: PutObjectRequest @@ -57,13 +55,10 @@ trait Uploader { request.localFile.remoteKey.key, request.localFile.file) .withMetadata(metadata(request.localFile)) - for { - batchMode <- Config.batchMode - r = if (batchMode) putRequest - else - putRequest.withGeneralProgressListener( - progressListener(request.uploadEventListener)) - } yield r + if (request.uploadEventListener.batchMode) putRequest + else + putRequest.withGeneralProgressListener( + progressListener(request.uploadEventListener)) } private def metadata: LocalFile => ObjectMetadata = localFile => { diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala index a7d218d..fb9301e 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/ETagGenerator.scala @@ -5,10 +5,9 @@ import java.nio.file.Path import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils -import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.MD5Hash -import net.kemitix.thorp.filesystem.FileSystem +import net.kemitix.thorp.filesystem.{FileSystem, Hasher} import zio.{RIO, ZIO} private trait ETagGenerator { diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala index 9add325..bfbe8e0 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/hasher/S3Hasher.scala @@ -2,11 +2,10 @@ package net.kemitix.thorp.storage.aws.hasher import java.nio.file.Path -import net.kemitix.thorp.core.hasher.Hasher -import net.kemitix.thorp.core.hasher.Hasher.Live.{hasher => CoreHasher} -import net.kemitix.thorp.core.hasher.Hasher.Service import net.kemitix.thorp.domain.{HashType, MD5Hash} -import net.kemitix.thorp.filesystem.FileSystem +import net.kemitix.thorp.filesystem.Hasher.Live.{hasher => CoreHasher} +import net.kemitix.thorp.filesystem.Hasher.Service +import net.kemitix.thorp.filesystem.{FileSystem, Hasher} import net.kemitix.thorp.storage.aws.ETag import zio.RIO diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala index c23a854..f923f2e 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala @@ -1,12 +1,10 @@ package net.kemitix.thorp.storage.aws -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.Storage +import net.kemitix.thorp.storage.Storage import org.scalamock.scalatest.MockFactory -import zio.{RIO, UIO, ZIO} +import zio.{RIO, UIO} trait AmazonS3ClientTestFixture extends MockFactory { @@ -29,14 +27,14 @@ trait AmazonS3ClientTestFixture extends MockFactory { override def listObjects( bucket: Bucket, prefix: RemoteKey - ): RIO[Console, RemoteObjects] = + ): RIO[Storage, RemoteObjects] = Lister.listObjects(client)(bucket, prefix) override def upload( localFile: LocalFile, bucket: Bucket, listenerSettings: UploadEventListener.Settings, - ): ZIO[Config, Nothing, StorageQueueEvent] = + ): UIO[StorageQueueEvent] = Uploader.upload(transferManager)( Uploader.Request(localFile, bucket, listenerSettings)) diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala index c2078c2..ec5ee49 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala @@ -8,18 +8,15 @@ import com.amazonaws.services.s3.model.{ ListObjectsV2Result, S3ObjectSummary } -import net.kemitix.thorp.console._ import net.kemitix.thorp.domain.NonUnit.~* import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.Storage import org.scalatest.FreeSpec import org.scalatest.Matchers._ -import zio.internal.PlatformLive -import zio.{Runtime, Task, UIO} +import zio.{DefaultRuntime, Task, UIO} class ListerTest extends FreeSpec { - private val runtime = Runtime(Console.Live, PlatformLive.Default) - "list" - { val bucket = Bucket("aBucket") val prefix = RemoteKey("aRemoteKey") @@ -28,7 +25,7 @@ class ListerTest extends FreeSpec { val nowDate = new Date val key = "key" val etag = "etag" - val expectedHashMap = Map(MD5Hash(etag) -> Set(RemoteKey(key))) + val expectedHashMap = Map(MD5Hash(etag) -> RemoteKey(key)) val expectedKeyMap = Map(RemoteKey(key) -> MD5Hash(etag)) new AmazonS3ClientTestFixture { (fixture.amazonS3Client.listObjectsV2 _) @@ -51,8 +48,8 @@ class ListerTest extends FreeSpec { val key2 = "key2" val etag2 = "etag2" val expectedHashMap = Map( - MD5Hash(etag1) -> Set(RemoteKey(key1)), - MD5Hash(etag2) -> Set(RemoteKey(key2)) + MD5Hash(etag1) -> RemoteKey(key1), + MD5Hash(etag2) -> RemoteKey(key2) ) val expectedKeyMap = Map( RemoteKey(key1) -> MD5Hash(etag1), @@ -121,8 +118,10 @@ class ListerTest extends FreeSpec { } def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket, prefix: RemoteKey) = - runtime.unsafeRunSync { - Lister.listObjects(amazonS3Client)(bucket, prefix) + new DefaultRuntime {}.unsafeRunSync { + Lister + .listObjects(amazonS3Client)(bucket, prefix) + .provide(Storage.Test) }.toEither } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHashSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHashSuite.scala index 14ca5c2..8ec70bf 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHashSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHashSuite.scala @@ -14,8 +14,8 @@ class S3ObjectsByHashSuite extends FunSpec { val o2 = s3object(hash, key2) val os = LazyList(o1, o2) it("should group by the hash value") { - val expected: Map[MD5Hash, Set[RemoteKey]] = Map( - hash -> Set(key1, key2) + val expected: Map[MD5Hash, RemoteKey] = Map( + hash -> key2 ) val result = Map.from(S3ObjectsByHash.byHash(os)) assertResult(expected)(result) diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index c049ba3..15540fb 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -1,9 +1,9 @@ package net.kemitix.thorp.storage.aws -import net.kemitix.thorp.config.Resource -import net.kemitix.thorp.core.{LocalFileValidator, S3MetaDataEnricher} +import net.kemitix.thorp.lib.{LocalFileValidator, S3MetaDataEnricher} import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ +import net.kemitix.thorp.filesystem.Resource import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec @@ -39,8 +39,8 @@ class StorageServiceSuite extends FunSpec with MockFactory { prefix) s3ObjectsData = RemoteObjects( byHash = MapView( - hash -> Set(key, keyOtherKey.remoteKey), - diffHash -> Set(keyDiffHash.remoteKey) + hash -> key, + diffHash -> keyDiffHash.remoteKey ), byKey = MapView( key -> hash, @@ -59,14 +59,14 @@ class StorageServiceSuite extends FunSpec with MockFactory { def invoke(localFile: LocalFile, s3ObjectsData: RemoteObjects) = S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData) - def getMatchesByKey(status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)])) - : Option[MD5Hash] = { + def getMatchesByKey( + status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])): Option[MD5Hash] = { val (byKey, _) = status byKey } - def getMatchesByHash(status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)])) - : Set[(RemoteKey, MD5Hash)] = { + def getMatchesByHash(status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])) + : Map[MD5Hash, RemoteKey] = { val (_, byHash) = status byHash } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala index 0a559c4..18e55e5 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala @@ -5,7 +5,7 @@ import java.io.File import com.amazonaws.SdkClientException import com.amazonaws.services.s3.model.AmazonS3Exception import com.amazonaws.services.s3.transfer.model.UploadResult -import net.kemitix.thorp.config.{Config, Resource} +import net.kemitix.thorp.config.Config import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.StorageQueueEvent.{ Action, @@ -17,6 +17,7 @@ import org.scalamock.scalatest.MockFactory import org.scalatest.FreeSpec import zio.{DefaultRuntime, Task} import net.kemitix.thorp.domain.NonUnit.~* +import net.kemitix.thorp.filesystem.Resource class UploaderTest extends FreeSpec with MockFactory { @@ -26,7 +27,7 @@ class UploaderTest extends FreeSpec with MockFactory { val aHash = MD5Hash("aHash") val hashes = Map[HashType, MD5Hash](MD5 -> aHash) val remoteKey = RemoteKey("aRemoteKey") - val localFile = LocalFile(aFile, aSource, hashes, remoteKey) + val localFile = LocalFile(aFile, aSource, hashes, remoteKey, aFile.length) val bucket = Bucket("aBucket") val uploadResult = new UploadResult uploadResult.setKey(remoteKey.key) @@ -35,7 +36,7 @@ class UploaderTest extends FreeSpec with MockFactory { override def waitForUploadResult: UploadResult = uploadResult } val listenerSettings = - UploadEventListener.Settings(localFile, 0, SyncTotals(1, 0, 0), 0) + UploadEventListener.Settings(localFile, 0, 0, batchMode = true) "when no error" in { val expected = Right(UploadQueueEvent(remoteKey, aHash)) diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala index feb90a5..876e5b2 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/hasher/ETagGeneratorTest.scala @@ -3,11 +3,9 @@ package net.kemitix.thorp.storage.aws.hasher import java.nio.file.Path import com.amazonaws.services.s3.transfer.TransferManagerConfiguration -import net.kemitix.thorp.config.Resource -import net.kemitix.thorp.core.hasher.Hasher import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.MD5Hash -import net.kemitix.thorp.filesystem.FileSystem +import net.kemitix.thorp.filesystem.{FileSystem, Hasher, Resource} import org.scalatest.FunSpec import zio.DefaultRuntime diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala b/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala similarity index 82% rename from storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala rename to storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala index f3148a6..e36f60d 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala +++ b/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala @@ -1,9 +1,7 @@ -package net.kemitix.thorp.storage.api +package net.kemitix.thorp.storage -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain._ -import zio.{Task, RIO, UIO, ZIO} +import zio.{RIO, Task, UIO, ZIO} trait Storage { val storage: Storage.Service @@ -15,13 +13,13 @@ object Storage { def listObjects( bucket: Bucket, prefix: RemoteKey - ): RIO[Storage with Console, RemoteObjects] + ): RIO[Storage, RemoteObjects] def upload( localFile: LocalFile, bucket: Bucket, listenerSettings: UploadEventListener.Settings, - ): ZIO[Storage with Config, Nothing, StorageQueueEvent] + ): ZIO[Storage, Nothing, StorageQueueEvent] def copy( bucket: Bucket, @@ -53,9 +51,8 @@ object Storage { val storage: Service = new Service { - override def listObjects( - bucket: Bucket, - prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] = + override def listObjects(bucket: Bucket, + prefix: RemoteKey): RIO[Storage, RemoteObjects] = listResult override def upload( @@ -85,14 +82,14 @@ object Storage { object Test extends Test final def list(bucket: Bucket, - prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] = + prefix: RemoteKey): RIO[Storage, RemoteObjects] = ZIO.accessM(_.storage listObjects (bucket, prefix)) final def upload( localFile: LocalFile, bucket: Bucket, listenerSettings: UploadEventListener.Settings - ): ZIO[Storage with Config, Nothing, StorageQueueEvent] = + ): ZIO[Storage, Nothing, StorageQueueEvent] = ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings)) final def copy( diff --git a/uishell/src/main/scala/net/kemitix/throp/uishell/ProgressEvent.scala b/uishell/src/main/scala/net/kemitix/throp/uishell/ProgressEvent.scala new file mode 100644 index 0000000..c98919a --- /dev/null +++ b/uishell/src/main/scala/net/kemitix/throp/uishell/ProgressEvent.scala @@ -0,0 +1,22 @@ +package net.kemitix.throp.uishell + +import net.kemitix.eip.zio.MessageChannel +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.filesystem.{FileSystem, Hasher} +import zio.clock.Clock + +sealed trait ProgressEvent + +object ProgressEvent { + type Env = Console + type ProgressSender = + MessageChannel.ESender[Config with Clock with Hasher with FileSystem, + Throwable, + ProgressEvent] + type ProgressReceiver = + MessageChannel.Receiver[ProgressEvent.Env, ProgressEvent] + type ProgressChannel = MessageChannel.Channel[Console, ProgressEvent] + + final case class PingEvent() extends ProgressEvent +} diff --git a/uishell/src/main/scala/net/kemitix/throp/uishell/UIEvent.scala b/uishell/src/main/scala/net/kemitix/throp/uishell/UIEvent.scala new file mode 100644 index 0000000..63e0468 --- /dev/null +++ b/uishell/src/main/scala/net/kemitix/throp/uishell/UIEvent.scala @@ -0,0 +1,37 @@ +package net.kemitix.throp.uishell + +import net.kemitix.thorp.domain._ + +sealed trait UIEvent +object UIEvent { + case object ShowValidConfig extends UIEvent + + case class RemoteDataFetched(size: Int) extends UIEvent + + case class ShowSummary(counters: Counters) extends UIEvent + + case class FileFound(localFile: LocalFile) extends UIEvent + + case class ActionChosen(action: Action) extends UIEvent + + /** + * The content of the file ({{hash}}) that will be placed + * at {{remoteKey}} is already being uploaded to another + * location. Once that upload has completed, its RemoteKey + * will become available and a Copy action can be made. + * @param remoteKey where this upload will copy the other to + * @param hash the hash of the file being uploaded + */ + case class AwaitingAnotherUpload(remoteKey: RemoteKey, hash: MD5Hash) + extends UIEvent + + case class AnotherUploadWaitComplete(action: Action) extends UIEvent + + case class ActionFinished(action: Action, + actionCounter: Int, + bytesCounter: Long) + extends UIEvent + + case class KeyFound(remoteKey: RemoteKey) extends UIEvent + +} diff --git a/uishell/src/main/scala/net/kemitix/throp/uishell/UIShell.scala b/uishell/src/main/scala/net/kemitix/throp/uishell/UIShell.scala new file mode 100644 index 0000000..5299b55 --- /dev/null +++ b/uishell/src/main/scala/net/kemitix/throp/uishell/UIShell.scala @@ -0,0 +1,96 @@ +package net.kemitix.throp.uishell + +import net.kemitix.eip.zio.MessageChannel +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.console.ConsoleOut.{ + CopyComplete, + DeleteComplete, + UploadComplete +} +import net.kemitix.thorp.console.{Console, ConsoleOut} +import net.kemitix.thorp.domain.{ + Action, + Counters, + LocalFile, + MD5Hash, + RemoteKey +} +import net.kemitix.thorp.domain.Action.ToUpload +import net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen +import zio.{UIO, ZIO} + +object UIShell { + def receiver: UIO[MessageChannel.UReceiver[Console with Config, UIEvent]] = + UIO { uiEventMessage => + uiEventMessage.body match { + case UIEvent.ShowValidConfig => showValidConfig + case UIEvent.RemoteDataFetched(size) => remoteDataFetched(size) + case UIEvent.ShowSummary(counters) => showSummary(counters) + case UIEvent.FileFound(localFile) => fileFound(localFile) + case UIEvent.ActionChosen(action) => UIO(()) + case UIEvent.AwaitingAnotherUpload(remoteKey, hash) => + awaitingUpload(remoteKey, hash) + case UIEvent.AnotherUploadWaitComplete(action) => + uploadWaitComplete(action) + case UIEvent.ActionFinished(action, _, _) => actionFinished(action) + case UIEvent.KeyFound(_) => UIO(()) + } + } + + private def actionFinished( + action: Action): ZIO[Console with Config, Nothing, Unit] = { + for { + batchMode <- Config.batchMode + _ <- action match { + case _: Action.DoNothing => UIO(()) + case ToUpload(_, localFile, _) => + Console.putMessageLnB(UploadComplete(localFile.remoteKey), batchMode) + case Action.ToCopy(_, sourceKey, _, targetKey, _) => + Console.putMessageLnB(CopyComplete(sourceKey, targetKey), batchMode) + case Action.ToDelete(_, remoteKey, _) => + Console.putMessageLnB(DeleteComplete(remoteKey), batchMode) + } + } yield () + } + + private def uploadWaitComplete( + action: Action): ZIO[Console, Nothing, Unit] = { + Console.putStrLn(s"Finished waiting to other upload - now $action") + } + + private def awaitingUpload(remoteKey: RemoteKey, + hash: MD5Hash): ZIO[Console, Nothing, Unit] = { + Console.putStrLn( + s"Awaiting another upload of $hash before copying it to $remoteKey") + } + + private def fileFound( + localFile: LocalFile): ZIO[Console with Config, Nothing, Unit] = { + for { + batchMode <- Config.batchMode + _ <- ZIO.when(batchMode)(Console.putStrLn(s"Found: ${localFile.file}")) + } yield () + } + + private def showSummary( + counters: Counters): ZIO[Console with Config, Nothing, Unit] = { + Console.putStrLn(eraseToEndOfScreen) *> + Console.putStrLn(s"Uploaded ${counters.uploaded} files") *> + Console.putStrLn(s"Copied ${counters.copied} files") *> + Console.putStrLn(s"Deleted ${counters.deleted} files") *> + Console.putStrLn(s"Errors ${counters.errors}") + } + + private def remoteDataFetched(size: Int): ZIO[Console, Nothing, Unit] = { + Console.putStrLn(s"Found $size remote objects") + } + + private def showValidConfig: ZIO[Console with Config, Nothing, Unit] = { + for { + bucket <- Config.bucket + prefix <- Config.prefix + sources <- Config.sources + _ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources)) + } yield () + } +}