From becd297858d9758866359aa38c68c95220e68d07 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Sun, 8 Sep 2019 07:29:23 +0100 Subject: [PATCH] Remove dead code (#190) * [lib] Remove PlanBuilder and PlanExecutor * [lib] Remove ActionGenerator and LocalFileStream * [lib] Remove S3MetaDataEnricher * [lib] Remove Remote * [filesystem] fix paths to test resources * [lib] Remove LocalFileValidator * [lib] Remove SyncPlan * [lib] Remove SequencePlan * [lib] Remove KeyGenerator * [lib] Remove DummyStorageService * [lib] Remove EventQueue * [lib] Remove SyncLogging * [lib] Remove LocalFiles * [lib] inline CoreTypes into Program * [lib] Remote EIPTest * [lib] LocalFileSystem remove unneccary parens * [domain] Remove Monoid * [domain] Remove MatchedMetedata * [domain] Remove NonUnit * [domain] Remove RemoteMetaData * [domain] Rename StorageQueueEvent as StorageEvent * [domain] Remove SyncTotals * [domain] Rename UploadEvent as UploadProgressEvent * [sbt] fix assembly merge strategy to work with zio and zio-streams --- .../scala/net/kemitix/thorp/Program.scala | 34 +- build.sbt | 6 +- .../thorp/config/ConfigQueryTest.scala | 3 +- .../config/ConfigurationBuilderTest.scala | 9 +- .../kemitix/thorp/console/ConsoleOut.scala | 4 +- .../thorp/domain/MatchedMetadata.scala | 8 - .../net/kemitix/thorp/domain/Monoid.scala | 6 - .../net/kemitix/thorp/domain/NonUnit.scala | 8 - .../kemitix/thorp/domain/RemoteMetaData.scala | 6 - .../kemitix/thorp/domain/StorageEvent.scala | 49 +++ .../thorp/domain/StorageQueueEvent.scala | 49 --- .../net/kemitix/thorp/domain/SyncTotals.scala | 15 - .../thorp/domain/UploadEventListener.scala | 4 +- ...dEvent.scala => UploadProgressEvent.scala} | 10 +- .../thorp/domain/TemporaryFolder.scala | 37 +- .../filesystem/MD5HashGeneratorTest.scala | 6 +- .../kemitix/thorp/lib/ActionGenerator.scala | 95 ----- .../net/kemitix/thorp/lib/CoreTypes.scala | 21 - .../net/kemitix/thorp/lib/EventQueue.scala | 8 - .../net/kemitix/thorp/lib/KeyGenerator.scala | 20 - .../kemitix/thorp/lib/LocalFileStream.scala | 80 ---- .../kemitix/thorp/lib/LocalFileSystem.scala | 19 +- .../thorp/lib/LocalFileValidator.scala | 67 --- .../net/kemitix/thorp/lib/LocalFiles.scala | 24 -- .../net/kemitix/thorp/lib/PlanBuilder.scala | 84 ---- .../net/kemitix/thorp/lib/PlanExecutor.scala | 55 --- .../scala/net/kemitix/thorp/lib/Remote.scala | 30 -- .../thorp/lib/S3MetaDataEnricher.scala | 50 --- .../net/kemitix/thorp/lib/SequencePlan.scala | 17 - .../net/kemitix/thorp/lib/SyncLogging.scala | 18 - .../net/kemitix/thorp/lib/SyncPlan.scala | 14 - .../net/kemitix/thorp/lib/ThorpArchive.scala | 21 +- .../thorp/lib/UnversionedMirrorArchive.scala | 6 +- .../thorp/lib/ActionGeneratorSuite.scala | 207 --------- .../thorp/lib/DummyStorageService.scala | 42 -- .../scala/net/kemitix/thorp/lib/EIPTest.scala | 50 --- .../kemitix/thorp/lib/KeyGeneratorSuite.scala | 45 -- .../thorp/lib/LocalFileStreamSuite.scala | 92 ---- .../lib/MatchedMetadataEnricherSuite.scala | 310 -------------- .../kemitix/thorp/lib/PlanBuilderTest.scala | 397 ------------------ .../kemitix/thorp/lib/PlanExecutorTest.scala | 57 --- .../kemitix/thorp/lib/SequencePlanTest.scala | 43 -- .../kemitix/thorp/storage/aws/Copier.scala | 29 +- .../kemitix/thorp/storage/aws/Deleter.scala | 18 +- .../kemitix/thorp/storage/aws/S3Storage.scala | 12 +- .../kemitix/thorp/storage/aws/Uploader.scala | 32 +- .../aws/AmazonS3ClientTestFixture.scala | 12 +- .../thorp/storage/aws/CopierTest.scala | 45 +- .../thorp/storage/aws/DeleterTest.scala | 42 +- .../thorp/storage/aws/ListerTest.scala | 17 +- .../storage/aws/StorageServiceSuite.scala | 147 ------- .../thorp/storage/aws/UploaderTest.scala | 42 +- .../net/kemitix/thorp/storage/Storage.scala | 30 +- 53 files changed, 264 insertions(+), 2288 deletions(-) delete mode 100644 domain/src/main/scala/net/kemitix/thorp/domain/MatchedMetadata.scala delete mode 100644 domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala delete mode 100644 domain/src/main/scala/net/kemitix/thorp/domain/NonUnit.scala delete mode 100644 domain/src/main/scala/net/kemitix/thorp/domain/RemoteMetaData.scala create mode 100644 domain/src/main/scala/net/kemitix/thorp/domain/StorageEvent.scala delete mode 100644 domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala delete mode 100644 domain/src/main/scala/net/kemitix/thorp/domain/SyncTotals.scala rename domain/src/main/scala/net/kemitix/thorp/domain/{UploadEvent.scala => UploadProgressEvent.scala} (62%) delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/ActionGenerator.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/CoreTypes.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/EventQueue.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/KeyGenerator.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/LocalFileStream.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/LocalFileValidator.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/LocalFiles.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/PlanBuilder.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/PlanExecutor.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/Remote.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/S3MetaDataEnricher.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/SequencePlan.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/SyncLogging.scala delete mode 100644 lib/src/main/scala/net/kemitix/thorp/lib/SyncPlan.scala delete mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/ActionGeneratorSuite.scala delete mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/DummyStorageService.scala delete mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/EIPTest.scala delete mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/KeyGeneratorSuite.scala delete mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/LocalFileStreamSuite.scala delete mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/MatchedMetadataEnricherSuite.scala delete mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/PlanBuilderTest.scala delete mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/PlanExecutorTest.scala delete mode 100644 lib/src/test/scala/net/kemitix/thorp/lib/SequencePlanTest.scala delete mode 100644 storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala diff --git a/app/src/main/scala/net/kemitix/thorp/Program.scala b/app/src/main/scala/net/kemitix/thorp/Program.scala index b9e6d60..87f9c06 100644 --- a/app/src/main/scala/net/kemitix/thorp/Program.scala +++ b/app/src/main/scala/net/kemitix/thorp/Program.scala @@ -5,15 +5,14 @@ import net.kemitix.eip.zio.{Message, MessageChannel} import net.kemitix.thorp.cli.CliArgs import net.kemitix.thorp.config._ import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain.{Counters, StorageQueueEvent} -import net.kemitix.thorp.domain.StorageQueueEvent.{ - CopyQueueEvent, - DeleteQueueEvent, - ErrorQueueEvent, - UploadQueueEvent +import net.kemitix.thorp.domain.{Counters, StorageEvent} +import net.kemitix.thorp.domain.StorageEvent.{ + CopyEvent, + DeleteEvent, + ErrorEvent, + UploadEvent } import net.kemitix.thorp.filesystem.{FileSystem, Hasher} -import net.kemitix.thorp.lib.CoreTypes.CoreProgram import net.kemitix.thorp.lib._ import net.kemitix.thorp.storage.Storage import net.kemitix.throp.uishell.{UIEvent, UIShell} @@ -24,7 +23,10 @@ trait Program { lazy val version = s"Thorp v${thorp.BuildInfo.version}" - def run(args: List[String]): CoreProgram[Unit] = { + def run(args: List[String]): ZIO[ + Storage with Console with Config with Clock with FileSystem with Hasher with FileScanner, + Throwable, + Unit] = { for { cli <- CliArgs.parse(args) config <- ConfigurationBuilder.buildConfig(cli) @@ -87,22 +89,22 @@ trait Program { } private def showSummary(uiChannel: UIChannel)( - events: Seq[StorageQueueEvent]): RIO[Clock, Unit] = { + events: Seq[StorageEvent]): RIO[Clock, Unit] = { val counters = events.foldLeft(Counters.empty)(countActivities) Message.create(UIEvent.ShowSummary(counters)) >>= MessageChannel.send(uiChannel) } - private def countActivities: (Counters, StorageQueueEvent) => Counters = - (counters: Counters, s3Action: StorageQueueEvent) => { + private def countActivities: (Counters, StorageEvent) => Counters = + (counters: Counters, s3Action: StorageEvent) => { val increment: Int => Int = _ + 1 s3Action match { - case _: UploadQueueEvent => + case _: UploadEvent => Counters.uploaded.modify(increment)(counters) - case _: CopyQueueEvent => Counters.copied.modify(increment)(counters) - case _: DeleteQueueEvent => Counters.deleted.modify(increment)(counters) - case _: ErrorQueueEvent => Counters.errors.modify(increment)(counters) - case _ => counters + case _: CopyEvent => Counters.copied.modify(increment)(counters) + case _: DeleteEvent => Counters.deleted.modify(increment)(counters) + case _: ErrorEvent => Counters.errors.modify(increment)(counters) + case _ => counters } } diff --git a/build.sbt b/build.sbt index 2a41cac..d30cfda 100644 --- a/build.sbt +++ b/build.sbt @@ -32,7 +32,11 @@ val commonSettings = Seq( Wart.NonUnitStatements, Wart.StringPlusAny ).contains(wart)), - test in assembly := {} + test in assembly := {}, + assemblyMergeStrategy in assembly := { + case PathList("META-INF", xs @ _*) => MergeStrategy.discard + case x => MergeStrategy.first + } ) val applicationSettings = Seq( diff --git a/config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala index cf03c2a..1a83d6d 100644 --- a/config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ConfigQueryTest.scala @@ -2,7 +2,6 @@ package net.kemitix.thorp.config import java.nio.file.Paths -import net.kemitix.thorp.domain.NonUnit.~* import net.kemitix.thorp.domain.Sources import org.scalatest.FreeSpec @@ -76,7 +75,7 @@ class ConfigQueryTest extends FreeSpec { val pwd = Paths.get(System.getenv("PWD")) val expected = Sources(List(pwd)) val result = ConfigQuery.sources(ConfigOptions(List())) - ~*(assertResult(expected)(result)) + assertResult(expected)(result) } } "when is set once" - { diff --git a/config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala b/config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala index 040e2f1..3f53c04 100644 --- a/config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala +++ b/config/src/test/scala/net/kemitix/thorp/config/ConfigurationBuilderTest.scala @@ -3,7 +3,6 @@ package net.kemitix.thorp.config import java.nio.file.{Path, Paths} import net.kemitix.thorp.domain.Filter.{Exclude, Include} -import net.kemitix.thorp.domain.NonUnit.~* import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem import org.scalatest.FunSpec @@ -131,10 +130,10 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { Filter.Include("current-include"))) val options = configOptions(ConfigOption.Source(currentSource)) val result = invoke(options) - ~*(assertResult(expectedSources)(result.map(_.sources))) - ~*(assertResult(expectedBuckets)(result.map(_.bucket))) - ~*(assertResult(expectedPrefixes)(result.map(_.prefix))) - ~*(assertResult(expectedFilters)(result.map(_.filters))) + assertResult(expectedSources)(result.map(_.sources)) + assertResult(expectedBuckets)(result.map(_.bucket)) + assertResult(expectedPrefixes)(result.map(_.prefix)) + assertResult(expectedFilters)(result.map(_.filters)) }) }) } diff --git a/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala b/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala index 0e9b431..f968c0c 100644 --- a/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala +++ b/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala @@ -1,6 +1,6 @@ package net.kemitix.thorp.console -import net.kemitix.thorp.domain.StorageQueueEvent.Action +import net.kemitix.thorp.domain.StorageEvent.ActionSummary import net.kemitix.thorp.domain.Terminal._ import net.kemitix.thorp.domain.{Bucket, RemoteKey, Sources} import zio.UIO @@ -59,7 +59,7 @@ object ConsoleOut { s"Deleted: $remoteKey" } - final case class ErrorQueueEventOccurred(action: Action, e: Throwable) + final case class ErrorQueueEventOccurred(action: ActionSummary, e: Throwable) extends ConsoleOut.WithBatchMode { override def en: String = s"${action.name} failed: ${action.keys}: ${e.getMessage}" diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/MatchedMetadata.scala b/domain/src/main/scala/net/kemitix/thorp/domain/MatchedMetadata.scala deleted file mode 100644 index bcbfd46..0000000 --- a/domain/src/main/scala/net/kemitix/thorp/domain/MatchedMetadata.scala +++ /dev/null @@ -1,8 +0,0 @@ -package net.kemitix.thorp.domain - -// For the LocalFile, the set of matching S3 objects with the same MD5Hash, and any S3 object with the same remote key -final case class MatchedMetadata( - localFile: LocalFile, - matchByHash: Option[RemoteMetaData], - matchByKey: Option[RemoteMetaData] -) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala deleted file mode 100644 index e43f060..0000000 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Monoid.scala +++ /dev/null @@ -1,6 +0,0 @@ -package net.kemitix.thorp.domain - -trait Monoid[T] { - def zero: T - def op(t1: T, t2: T): T -} diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/NonUnit.scala b/domain/src/main/scala/net/kemitix/thorp/domain/NonUnit.scala deleted file mode 100644 index f532de1..0000000 --- a/domain/src/main/scala/net/kemitix/thorp/domain/NonUnit.scala +++ /dev/null @@ -1,8 +0,0 @@ -package net.kemitix.thorp.domain - -object NonUnit { - @specialized def ~*[A](evaluateForSideEffectOnly: A): Unit = { - val _ = evaluateForSideEffectOnly - () //Return unit to prevent warning due to discarding value - } -} diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteMetaData.scala b/domain/src/main/scala/net/kemitix/thorp/domain/RemoteMetaData.scala deleted file mode 100644 index 5841a9d..0000000 --- a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteMetaData.scala +++ /dev/null @@ -1,6 +0,0 @@ -package net.kemitix.thorp.domain - -final case class RemoteMetaData( - remoteKey: RemoteKey, - hash: MD5Hash -) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/StorageEvent.scala b/domain/src/main/scala/net/kemitix/thorp/domain/StorageEvent.scala new file mode 100644 index 0000000..c396a2e --- /dev/null +++ b/domain/src/main/scala/net/kemitix/thorp/domain/StorageEvent.scala @@ -0,0 +1,49 @@ +package net.kemitix.thorp.domain + +sealed trait StorageEvent + +object StorageEvent { + + final case class DoNothingEvent( + remoteKey: RemoteKey + ) extends StorageEvent + + final case class CopyEvent( + sourceKey: RemoteKey, + targetKey: RemoteKey + ) extends StorageEvent + + final case class UploadEvent( + remoteKey: RemoteKey, + md5Hash: MD5Hash + ) extends StorageEvent + + final case class DeleteEvent( + remoteKey: RemoteKey + ) extends StorageEvent + + final case class ErrorEvent( + action: ActionSummary, + remoteKey: RemoteKey, + e: Throwable + ) extends StorageEvent + + final case class ShutdownEvent() extends StorageEvent + + sealed trait ActionSummary { + val name: String + val keys: String + } + object ActionSummary { + final case class Copy(keys: String) extends ActionSummary { + override val name: String = "Copy" + } + final case class Upload(keys: String) extends ActionSummary { + override val name: String = "Upload" + } + final case class Delete(keys: String) extends ActionSummary { + override val name: String = "Delete" + } + } + +} diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala b/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala deleted file mode 100644 index 52b946a..0000000 --- a/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala +++ /dev/null @@ -1,49 +0,0 @@ -package net.kemitix.thorp.domain - -sealed trait StorageQueueEvent - -object StorageQueueEvent { - - final case class DoNothingQueueEvent( - remoteKey: RemoteKey - ) extends StorageQueueEvent - - final case class CopyQueueEvent( - sourceKey: RemoteKey, - targetKey: RemoteKey - ) extends StorageQueueEvent - - final case class UploadQueueEvent( - remoteKey: RemoteKey, - md5Hash: MD5Hash - ) extends StorageQueueEvent - - final case class DeleteQueueEvent( - remoteKey: RemoteKey - ) extends StorageQueueEvent - - final case class ErrorQueueEvent( - action: Action, - remoteKey: RemoteKey, - e: Throwable - ) extends StorageQueueEvent - - final case class ShutdownQueueEvent() extends StorageQueueEvent - - sealed trait Action { - val name: String - val keys: String - } - object Action { - final case class Copy(keys: String) extends Action { - override val name: String = "Copy" - } - final case class Upload(keys: String) extends Action { - override val name: String = "Upload" - } - final case class Delete(keys: String) extends Action { - override val name: String = "Delete" - } - } - -} diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/SyncTotals.scala b/domain/src/main/scala/net/kemitix/thorp/domain/SyncTotals.scala deleted file mode 100644 index f5657b4..0000000 --- a/domain/src/main/scala/net/kemitix/thorp/domain/SyncTotals.scala +++ /dev/null @@ -1,15 +0,0 @@ -package net.kemitix.thorp.domain - -final case class SyncTotals private ( - count: Long, - totalSizeBytes: Long, - sizeUploadedBytes: Long -) - -object SyncTotals { - val empty: SyncTotals = SyncTotals(0L, 0L, 0L) - def create(count: Long, - totalSizeBytes: Long, - sizeUploadedBytes: Long): SyncTotals = - SyncTotals(count, totalSizeBytes, sizeUploadedBytes) -} diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala index 7a9981f..da76833 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala @@ -2,7 +2,7 @@ package net.kemitix.thorp.domain import java.util.concurrent.atomic.AtomicLong -import net.kemitix.thorp.domain.UploadEvent.RequestEvent +import net.kemitix.thorp.domain.UploadProgressEvent.RequestEvent import net.kemitix.thorp.domain.UploadEventLogger.RequestCycle object UploadEventListener { @@ -14,7 +14,7 @@ object UploadEventListener { batchMode: Boolean ) - def listener(settings: Settings): UploadEvent => Unit = { + def listener(settings: Settings): UploadProgressEvent => Unit = { val bytesTransferred = new AtomicLong(0L) event => { diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEvent.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadProgressEvent.scala similarity index 62% rename from domain/src/main/scala/net/kemitix/thorp/domain/UploadEvent.scala rename to domain/src/main/scala/net/kemitix/thorp/domain/UploadProgressEvent.scala index d416904..3431dd7 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEvent.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadProgressEvent.scala @@ -1,23 +1,23 @@ package net.kemitix.thorp.domain -sealed trait UploadEvent { +sealed trait UploadProgressEvent { def name: String } -object UploadEvent { +object UploadProgressEvent { final case class TransferEvent( name: String - ) extends UploadEvent + ) extends UploadProgressEvent final case class RequestEvent( name: String, bytes: Long, transferred: Long - ) extends UploadEvent + ) extends UploadProgressEvent final case class ByteTransferEvent( name: String - ) extends UploadEvent + ) extends UploadProgressEvent } diff --git a/domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala b/domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala index 446302b..be0adaf 100644 --- a/domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala +++ b/domain/src/test/scala/net/kemitix/thorp/domain/TemporaryFolder.scala @@ -4,8 +4,6 @@ import java.io.{File, IOException, PrintWriter} import java.nio.file.attribute.BasicFileAttributes import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor} -import net.kemitix.thorp.domain.NonUnit.~* - import scala.util.Try trait TemporaryFolder { @@ -15,27 +13,26 @@ trait TemporaryFolder { val dir: Path = Files.createTempDirectory("thorp-temp") val t = Try(testCode(dir)) remove(dir) - ~*(t.get) + t.get + () } def remove(root: Path): Unit = { - ~*( - Files.walkFileTree( - root, - new SimpleFileVisitor[Path] { - override def visitFile( - file: Path, - attrs: BasicFileAttributes): FileVisitResult = { - Files.delete(file) - FileVisitResult.CONTINUE - } - override def postVisitDirectory(dir: Path, - exc: IOException): FileVisitResult = { - Files.delete(dir) - FileVisitResult.CONTINUE - } + Files.walkFileTree( + root, + new SimpleFileVisitor[Path] { + override def visitFile(file: Path, + attrs: BasicFileAttributes): FileVisitResult = { + Files.delete(file) + FileVisitResult.CONTINUE } - )) + override def postVisitDirectory(dir: Path, + exc: IOException): FileVisitResult = { + Files.delete(dir) + FileVisitResult.CONTINUE + } + } + ) } def createFile(directory: Path, name: String, contents: String*): File = { @@ -48,6 +45,6 @@ trait TemporaryFolder { } def writeFile(directory: Path, name: String, contents: String*): Unit = - ~*(createFile(directory, name, contents: _*)) + createFile(directory, name, contents: _*) } diff --git a/filesystem/src/test/scala/net/kemitix/thorp/filesystem/MD5HashGeneratorTest.scala b/filesystem/src/test/scala/net/kemitix/thorp/filesystem/MD5HashGeneratorTest.scala index 2e7524c..493c3d8 100644 --- a/filesystem/src/test/scala/net/kemitix/thorp/filesystem/MD5HashGeneratorTest.scala +++ b/filesystem/src/test/scala/net/kemitix/thorp/filesystem/MD5HashGeneratorTest.scala @@ -11,7 +11,7 @@ class MD5HashGeneratorTest extends FunSpec { describe("md5File()") { describe("read a small file (smaller than buffer)") { - val path = Resource(this, "../upload/root-file").toPath + val path = Resource(this, "upload/root-file").toPath it("should generate the correct hash") { val expected = Right(Root.hash) val result = invoke(path) @@ -20,7 +20,7 @@ class MD5HashGeneratorTest extends FunSpec { } describe("read a large file (bigger than buffer)") { - val path = Resource(this, "../big-file").toPath + val path = Resource(this, "big-file").toPath it("should generate the correct hash") { val expected = Right(BigFile.hash) val result = invoke(path) @@ -38,7 +38,7 @@ class MD5HashGeneratorTest extends FunSpec { describe("md5FileChunk") { describe("read chunks of file") { - val path = Resource(this, "../big-file").toPath + val path = Resource(this, "big-file").toPath it("should generate the correct hash for first chunk of the file") { val part1 = BigFile.Part1 val expected = Right(MD5Hash.hash(part1.hash)) diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/ActionGenerator.scala b/lib/src/main/scala/net/kemitix/thorp/lib/ActionGenerator.scala deleted file mode 100644 index 41757d9..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/ActionGenerator.scala +++ /dev/null @@ -1,95 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToUpload} -import net.kemitix.thorp.domain.Implicits._ -import net.kemitix.thorp.domain._ -import zio.RIO - -object ActionGenerator { - - def createActions( - matchedMetadata: MatchedMetadata, - previousActions: LazyList[Action] - ): RIO[Config, LazyList[Action]] = - for { - bucket <- Config.bucket - } yield - genAction(formattedMetadata(matchedMetadata, previousActions), bucket) - - private def formattedMetadata( - matchedMetadata: MatchedMetadata, - previousActions: LazyList[Action]): TaggedMetadata = { - val remoteExists = matchedMetadata.matchByKey.nonEmpty - val remoteMatches = remoteExists && matchedMetadata.matchByKey.exists(m => - LocalFile.matchesHash(matchedMetadata.localFile)(m.hash)) - val anyMatches = matchedMetadata.matchByHash.nonEmpty - TaggedMetadata(matchedMetadata, - previousActions, - remoteExists, - remoteMatches, - anyMatches) - } - - final case class TaggedMetadata( - matchedMetadata: MatchedMetadata, - previousActions: LazyList[Action], - remoteExists: Boolean, - remoteMatches: Boolean, - anyMatches: Boolean - ) - - private def genAction(taggedMetadata: TaggedMetadata, - bucket: Bucket): LazyList[Action] = { - taggedMetadata match { - case TaggedMetadata(md, _, remoteExists, remoteMatches, _) - if remoteExists && remoteMatches => - doNothing(bucket, md.localFile.remoteKey) - case TaggedMetadata(md, _, _, _, anyMatches) if anyMatches => - copyFile(bucket, md.localFile, md.matchByHash) - case TaggedMetadata(md, previous, _, _, _) - if isNotUploadAlreadyQueued(previous)(md.localFile) => - uploadFile(bucket, md.localFile) - case TaggedMetadata(md, _, _, _, _) => - doNothing(bucket, md.localFile.remoteKey) - } - } - - private def key = LocalFile.remoteKey ^|-> RemoteKey.key - - def isNotUploadAlreadyQueued( - previousActions: LazyList[Action] - )( - localFile: LocalFile - ): Boolean = !previousActions.exists { - case ToUpload(_, lf, _) => key.get(lf) === key.get(localFile) - case _ => false - } - - private def doNothing( - bucket: Bucket, - remoteKey: RemoteKey - ) = LazyList(DoNothing(bucket, remoteKey, 0L)) - - private def uploadFile( - bucket: Bucket, - localFile: LocalFile - ) = LazyList(ToUpload(bucket, localFile, localFile.file.length)) - - private def copyFile( - bucket: Bucket, - localFile: LocalFile, - remoteMetaData: Option[RemoteMetaData] - ) = - LazyList - .from(remoteMetaData) - .take(1) - .map( - other => - ToCopy(bucket, - other.remoteKey, - other.hash, - localFile.remoteKey, - localFile.file.length)) - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/CoreTypes.scala b/lib/src/main/scala/net/kemitix/thorp/lib/CoreTypes.scala deleted file mode 100644 index 62510ab..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/CoreTypes.scala +++ /dev/null @@ -1,21 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console.Console -import net.kemitix.thorp.filesystem.{FileSystem, Hasher} -import net.kemitix.thorp.storage.Storage -import zio.ZIO -import zio.clock.Clock - -object CoreTypes { - - type CoreEnv = Storage - with Console - with Config - with Clock - with FileSystem - with Hasher - with FileScanner - type CoreProgram[A] = ZIO[CoreEnv, Throwable, A] - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/EventQueue.scala b/lib/src/main/scala/net/kemitix/thorp/lib/EventQueue.scala deleted file mode 100644 index fe90533..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/EventQueue.scala +++ /dev/null @@ -1,8 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.domain.StorageQueueEvent - -final case class EventQueue( - events: LazyList[StorageQueueEvent], - bytesInQueue: Long -) diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/KeyGenerator.scala b/lib/src/main/scala/net/kemitix/thorp/lib/KeyGenerator.scala deleted file mode 100644 index 4cf2c4f..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/KeyGenerator.scala +++ /dev/null @@ -1,20 +0,0 @@ -package net.kemitix.thorp.lib - -import java.nio.file.Path - -import net.kemitix.thorp.domain.{RemoteKey, Sources} -import zio.Task - -object KeyGenerator { - - def generateKey( - sources: Sources, - prefix: RemoteKey - )(path: Path): Task[RemoteKey] = - Sources - .forPath(path)(sources) - .map(_.relativize(path.toAbsolutePath)) - .map(_.toFile.getPath) - .map(RemoteKey.resolve(_)(prefix)) - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileStream.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileStream.scala deleted file mode 100644 index ebe7db6..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileStream.scala +++ /dev/null @@ -1,80 +0,0 @@ -package net.kemitix.thorp.lib - -import java.io.File -import java.nio.file.Path - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.domain.Sources -import net.kemitix.thorp.filesystem.{FileSystem, Hasher} -import zio.{RIO, Task, ZIO} - -object LocalFileStream { - - def findFiles( - source: Path - ): RIO[Config with FileSystem with Hasher, LocalFiles] = { - - def recurseIntoSubDirectories( - path: Path): RIO[Config with FileSystem with Hasher, LocalFiles] = - path.toFile match { - case f if f.isDirectory => loop(path) - case _ => localFile(path) - } - - def recurse(paths: LazyList[Path]) - : RIO[Config with FileSystem with Hasher, LocalFiles] = - for { - recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path)) - } yield LocalFiles.reduce(LazyList.from(recursed)) - - def loop(path: Path): RIO[Config with FileSystem with Hasher, LocalFiles] = - dirPaths(path) >>= recurse - - loop(source) - } - - private def dirPaths(path: Path) = - listFiles(path) >>= includedDirPaths - - private def includedDirPaths(paths: LazyList[Path]) = - for { - flaggedPaths <- RIO.foreach(paths)(path => - isIncluded(path).map((path, _))) - } yield - LazyList - .from(flaggedPaths) - .filter({ case (_, included) => included }) - .map({ case (path, _) => path }) - - private def localFile(path: Path) = - for { - sources <- Config.sources - prefix <- Config.prefix - source <- Sources.forPath(path)(sources) - hash <- Hasher.hashObject(path) - localFile <- LocalFileValidator.validate(path, - source.toFile, - hash, - sources, - prefix) - } yield LocalFiles.one(localFile) - - private def listFiles(path: Path) = - for { - files <- Task(path.toFile.listFiles) - _ <- filesMustExist(path, files) - } yield LazyList.from(files.toIndexedSeq).map(_.toPath) - - private def filesMustExist(path: Path, files: Array[File]) = - Task { - Option(files) - .map(_ => ()) - .getOrElse(new IllegalArgumentException(s"Directory not found $path")) - } - - private def isIncluded(path: Path) = - for { - filters <- Config.filters - } yield Filters.isIncluded(path)(filters) - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala index 88e42c6..7a68be0 100644 --- a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala @@ -25,13 +25,13 @@ trait LocalFileSystem { archive: ThorpArchive ): RIO[ Clock with Config with Hasher with FileSystem with FileScanner with Storage, - Seq[StorageQueueEvent]] + Seq[StorageEvent]] def scanDelete( uiChannel: UChannel[Any, UIEvent], remoteData: RemoteObjects, archive: UnversionedMirrorArchive.type - ): RIO[Clock with Config with FileSystem with Storage, Seq[StorageQueueEvent]] + ): RIO[Clock with Config with FileSystem with Storage, Seq[StorageEvent]] } object LocalFileSystem extends LocalFileSystem { @@ -42,12 +42,12 @@ object LocalFileSystem extends LocalFileSystem { archive: ThorpArchive ): RIO[ Clock with Hasher with FileSystem with Config with FileScanner with Storage, - Seq[StorageQueueEvent]] = + Seq[StorageEvent]] = for { actionCounter <- Ref.make(0) bytesCounter <- Ref.make(0L) uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]]) - eventsRef <- Ref.make(List.empty[StorageQueueEvent]) + eventsRef <- Ref.make(List.empty[StorageEvent]) fileSender <- FileScanner.scanSources fileReceiver <- fileReceiver(uiChannel, remoteObjects, @@ -64,12 +64,11 @@ object LocalFileSystem extends LocalFileSystem { uiChannel: UChannel[Any, UIEvent], remoteData: RemoteObjects, archive: UnversionedMirrorArchive.type - ): RIO[Clock with Config with FileSystem with Storage, - Seq[StorageQueueEvent]] = + ): RIO[Clock with Config with FileSystem with Storage, Seq[StorageEvent]] = for { actionCounter <- Ref.make(0) bytesCounter <- Ref.make(0L) - eventsRef <- Ref.make(List.empty[StorageQueueEvent]) + eventsRef <- Ref.make(List.empty[StorageEvent]) keySender <- keySender(remoteData.byKey.keys) keyReceiver <- keyReceiver(uiChannel, archive, @@ -87,7 +86,7 @@ object LocalFileSystem extends LocalFileSystem { uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], actionCounterRef: Ref[Int], bytesCounterRef: Ref[Long], - eventsRef: Ref[List[StorageQueueEvent]] + eventsRef: Ref[List[StorageEvent]] ): UIO[MessageChannel.UReceiver[Clock with Config with Storage, FileScanner.ScannedFile]] = UIO { message => @@ -135,7 +134,7 @@ object LocalFileSystem extends LocalFileSystem { remoteForHash match { case Some((sourceKey, hash)) => doCopy(localFile, bucket, sourceKey, hash) - case _ if (matchesPreviousUpload(previous, localFile.hashes)) => + case _ if matchesPreviousUpload(previous, localFile.hashes) => doCopyWithPreviousUpload(localFile, bucket, previous, uiChannel) case _ => doUpload(localFile, bucket) @@ -218,7 +217,7 @@ object LocalFileSystem extends LocalFileSystem { archive: ThorpArchive, actionCounterRef: Ref[Int], bytesCounterRef: Ref[Long], - eventsRef: Ref[List[StorageQueueEvent]] + eventsRef: Ref[List[StorageEvent]] ): UIO[ MessageChannel.UReceiver[Clock with Config with FileSystem with Storage, RemoteKey]] = diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileValidator.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileValidator.scala deleted file mode 100644 index 8f76837..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileValidator.scala +++ /dev/null @@ -1,67 +0,0 @@ -package net.kemitix.thorp.lib - -import java.io.File -import java.nio.file.Path - -import net.kemitix.thorp.domain.{ - HashType, - LocalFile, - MD5Hash, - RemoteKey, - Sources -} -import zio.{IO, ZIO} - -object LocalFileValidator { - - def validate( - path: Path, - source: File, - hash: Map[HashType, MD5Hash], - sources: Sources, - prefix: RemoteKey - ): IO[Violation, LocalFile] = - for { - file <- validateFile(path.toFile) - remoteKey <- validateRemoteKey(sources, prefix, path) - } yield LocalFile(file, source, hash, remoteKey, file.length) - - private def validateFile(file: File): IO[Violation, File] = - if (file.isDirectory) - ZIO.fail(Violation.IsNotAFile(file)) - else - ZIO.succeed(file) - - private def validateRemoteKey(sources: Sources, - prefix: RemoteKey, - path: Path): IO[Violation, RemoteKey] = - KeyGenerator - .generateKey(sources, prefix)(path) - .mapError(e => Violation.InvalidRemoteKey(path, e)) - - sealed trait Violation extends Throwable { - def getMessage: String - } - object Violation { - final case class IsNotAFile(file: File) extends Violation { - override def getMessage: String = s"Local File must be a file: ${file}" - } - final case class InvalidRemoteKey(path: Path, e: Throwable) - extends Violation { - override def getMessage: String = - s"Remote Key for '${path}' is invalid: ${e.getMessage}" - } - } - - def resolve( - path: String, - md5Hashes: Map[HashType, MD5Hash], - source: Path, - sources: Sources, - prefix: RemoteKey - ): IO[Violation, LocalFile] = { - val resolvedPath = source.resolve(path) - validate(resolvedPath, source.toFile, md5Hashes, sources, prefix) - } - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFiles.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFiles.scala deleted file mode 100644 index a524275..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFiles.scala +++ /dev/null @@ -1,24 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.domain.LocalFile - -final case class LocalFiles( - localFiles: LazyList[LocalFile], - count: Long, - totalSizeBytes: Long -) { - def ++(append: LocalFiles): LocalFiles = - copy( - localFiles = localFiles ++ append.localFiles, - count = count + append.count, - totalSizeBytes = totalSizeBytes + append.totalSizeBytes - ) -} - -object LocalFiles { - val empty: LocalFiles = LocalFiles(LazyList.empty, 0L, 0L) - def reduce: LazyList[LocalFiles] => LocalFiles = - list => list.foldLeft(LocalFiles.empty)((acc, lf) => acc ++ lf) - def one(localFile: LocalFile): LocalFiles = - LocalFiles(LazyList(localFile), 1, localFile.file.length) -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/PlanBuilder.scala b/lib/src/main/scala/net/kemitix/thorp/lib/PlanBuilder.scala deleted file mode 100644 index 543e40d..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/PlanBuilder.scala +++ /dev/null @@ -1,84 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain.Action._ -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.{FileSystem, Hasher} -import net.kemitix.thorp.storage.Storage -import zio.{RIO, ZIO} - -object PlanBuilder { - - def createPlan(remoteObjects: RemoteObjects) - : RIO[Storage with Console with Config with FileSystem with Hasher, - SyncPlan] = findLocalFiles >>= assemblePlan(remoteObjects) - - private def assemblePlan(remoteObjects: RemoteObjects)( - localData: LocalFiles) = - createActions(remoteObjects, localData.localFiles) - .map(_.filter(doesSomething).sortBy(SequencePlan.order)) - .map(syncPlan(localData)) - - private def syncPlan(localData: LocalFiles): LazyList[Action] => SyncPlan = - SyncPlan.create(_, syncTotal(localData)) - - private def syncTotal(localData: LocalFiles): SyncTotals = - SyncTotals.create(localData.count, localData.totalSizeBytes, 0L) - - private def createActions(remoteObjects: RemoteObjects, - localFiles: LazyList[LocalFile]) = - for { - fileActions <- actionsForLocalFiles(remoteObjects, localFiles) - remoteActions <- actionsForRemoteKeys(remoteObjects.byKey.keys) - } yield fileActions ++ remoteActions - - private def doesSomething: Action => Boolean = { - case _: DoNothing => false - case _ => true - } - - private def actionsForLocalFiles(remoteObjects: RemoteObjects, - localFiles: LazyList[LocalFile]) = - ZIO.foldLeft(localFiles)(LazyList.empty[Action])( - (acc, localFile) => - createActionsFromLocalFile(remoteObjects, acc, localFile) - .map(_ #::: acc) - ) - - private def createActionsFromLocalFile(remoteObjects: RemoteObjects, - previousActions: LazyList[Action], - localFile: LocalFile) = - ActionGenerator.createActions( - S3MetaDataEnricher.getMetadata(localFile, remoteObjects), - previousActions - ) - - private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) = - ZIO.foldLeft(remoteKeys)(LazyList.empty[Action])( - (acc, remoteKey) => createActionFromRemoteKey(remoteKey).map(_ #:: acc) - ) - - private def createActionFromRemoteKey( - remoteKey: RemoteKey - ): ZIO[FileSystem with Config, Throwable, Action] = - for { - bucket <- Config.bucket - prefix <- Config.prefix - sources <- Config.sources - needsDeleted <- Remote.isMissingLocally(sources, prefix, remoteKey) - } yield - if (needsDeleted) ToDelete(bucket, remoteKey, 0L) - else DoNothing(bucket, remoteKey, 0L) - - private def findLocalFiles = - SyncLogging.logFileScan *> findFiles - - private def findFiles = - for { - sources <- Config.sources - found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles) - _ <- Console.putStrLn(s"Found ${found.flatMap(_.localFiles).size} files") - } yield LocalFiles.reduce(LazyList.from(found)) - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/PlanExecutor.scala b/lib/src/main/scala/net/kemitix/thorp/lib/PlanExecutor.scala deleted file mode 100644 index 1213330..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/PlanExecutor.scala +++ /dev/null @@ -1,55 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console.Console -import net.kemitix.thorp.domain.{Action, StorageQueueEvent} -import net.kemitix.thorp.storage.Storage -import zio.{Ref, ZIO} - -trait PlanExecutor { - - def executePlan( - archive: ThorpArchive, - syncPlan: SyncPlan - ): ZIO[Storage with Config with Console, - Throwable, - Seq[ - StorageQueueEvent - ]] = - for { - actionCounter <- Ref.make(0) - bytesCounter <- Ref.make(0L) - events <- applyActions(archive, syncPlan, actionCounter, bytesCounter) - } yield events - - private def applyActions( - archive: ThorpArchive, - syncPlan: SyncPlan, - actionCounter: Ref[Int], - bytesCounter: Ref[Long] - ): ZIO[Storage with Console with Config, - Throwable, - LazyList[StorageQueueEvent]] = { - ZIO.foldLeft(syncPlan.actions)(LazyList.empty[StorageQueueEvent]) { - (stream: LazyList[StorageQueueEvent], action) => - val result: ZIO[Storage with Console with Config, - Throwable, - StorageQueueEvent] = - updateArchive(archive, actionCounter, bytesCounter)(action) - result.map(event => event #:: stream) - } - } - - private def updateArchive(archive: ThorpArchive, - actionCounterRef: Ref[Int], - bytesCounterRef: Ref[Long])(action: Action) = - for { - actionCounter <- actionCounterRef.update(_ + 1) - bytesCounter <- bytesCounterRef.update(_ + action.size) - event <- archive.update(SequencedAction(action, actionCounter), - bytesCounter) - } yield event - -} - -object PlanExecutor extends PlanExecutor diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/Remote.scala b/lib/src/main/scala/net/kemitix/thorp/lib/Remote.scala deleted file mode 100644 index 582b406..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/Remote.scala +++ /dev/null @@ -1,30 +0,0 @@ -package net.kemitix.thorp.lib - -import java.nio.file.Path - -import net.kemitix.thorp.domain.{RemoteKey, Sources} -import net.kemitix.thorp.filesystem.FileSystem -import zio.{RIO, ZIO} - -object Remote { - - def isMissingLocally(sources: Sources, - prefix: RemoteKey, - remoteKey: RemoteKey): RIO[FileSystem, Boolean] = - existsLocally(sources, prefix)(remoteKey) - .map(exists => !exists) - - def existsLocally(sources: Sources, prefix: RemoteKey)( - remoteKey: RemoteKey - ): RIO[FileSystem, Boolean] = { - def existsInSource(source: Path) = - RemoteKey.asFile(source, prefix)(remoteKey) match { - case Some(file) => FileSystem.exists(file) - case None => ZIO.succeed(false) - } - ZIO - .foreach(sources.paths)(existsInSource) - .map(lb => lb.exists(l => l)) - } - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/S3MetaDataEnricher.scala b/lib/src/main/scala/net/kemitix/thorp/lib/S3MetaDataEnricher.scala deleted file mode 100644 index b0d8c15..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/S3MetaDataEnricher.scala +++ /dev/null @@ -1,50 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.domain._ - -object S3MetaDataEnricher { - - def getMetadata( - localFile: LocalFile, - remoteObjects: RemoteObjects - ): MatchedMetadata = { - val (keyMatches, hashMatches) = getS3Status(localFile, remoteObjects) - - val maybeByKey: Option[RemoteMetaData] = keyMatches.map { hash => - RemoteMetaData(localFile.remoteKey, hash) - } - - val maybeByHash: Option[RemoteMetaData] = hashMatches.map { - case (md5Hash, remoteKey) => - RemoteMetaData(remoteKey, md5Hash) - }.headOption - - MatchedMetadata( - localFile, - matchByKey = maybeByKey, - matchByHash = maybeByHash - ) - } - - def getS3Status( - localFile: LocalFile, - remoteObjects: RemoteObjects - ): (Option[MD5Hash], Map[MD5Hash, RemoteKey]) = { - val byKey: Option[MD5Hash] = - remoteObjects.byKey.get(localFile.remoteKey) - val hashes: Map[HashType, MD5Hash] = localFile.hashes - val byHash: Map[MD5Hash, RemoteKey] = - hashes - .map { - case (hashType, md5Hash) => - (md5Hash, remoteObjects.byHash.get(md5Hash)) - } - .flatMap { - case (md5Hash, Some(maybeyKey)) => Some((md5Hash, maybeyKey)) - case (_, None) => None - } - - (byKey, byHash) - } - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/SequencePlan.scala b/lib/src/main/scala/net/kemitix/thorp/lib/SequencePlan.scala deleted file mode 100644 index e6e4904..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/SequencePlan.scala +++ /dev/null @@ -1,17 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.domain.Action -import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} - -trait SequencePlan { - - def order: Action => Int = { - case _: DoNothing => 0 - case _: ToCopy => 1 - case _: ToUpload => 2 - case _: ToDelete => 3 - } - -} - -object SequencePlan extends SequencePlan diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/SyncLogging.scala b/lib/src/main/scala/net/kemitix/thorp/lib/SyncLogging.scala deleted file mode 100644 index 112a665..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/SyncLogging.scala +++ /dev/null @@ -1,18 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console._ -import zio.ZIO - -trait SyncLogging { - - def logFileScan: ZIO[Config with Console, Nothing, Unit] = - for { - sources <- Config.sources - _ <- Console.putStrLn( - s"Scanning local files: ${sources.paths.mkString(", ")}...") - } yield () - -} - -object SyncLogging extends SyncLogging diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/SyncPlan.scala b/lib/src/main/scala/net/kemitix/thorp/lib/SyncPlan.scala deleted file mode 100644 index 8d6d028..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/SyncPlan.scala +++ /dev/null @@ -1,14 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.domain.{Action, SyncTotals} - -final case class SyncPlan private ( - actions: LazyList[Action], - syncTotals: SyncTotals -) - -object SyncPlan { - val empty: SyncPlan = SyncPlan(LazyList.empty, SyncTotals.empty) - def create(actions: LazyList[Action], syncTotals: SyncTotals): SyncPlan = - SyncPlan(actions, syncTotals) -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala index a9e168d..ff8c015 100644 --- a/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala @@ -8,8 +8,8 @@ import net.kemitix.thorp.console.ConsoleOut.{ UploadComplete } import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain.StorageQueueEvent -import net.kemitix.thorp.domain.StorageQueueEvent._ +import net.kemitix.thorp.domain.StorageEvent +import net.kemitix.thorp.domain.StorageEvent._ import net.kemitix.thorp.storage.Storage import zio.{RIO, ZIO} @@ -18,29 +18,28 @@ trait ThorpArchive { def update( sequencedAction: SequencedAction, totalBytesSoFar: Long - ): ZIO[Storage with Config, Nothing, StorageQueueEvent] + ): ZIO[Storage with Config, Nothing, StorageEvent] - def logEvent( - event: StorageQueueEvent): RIO[Console with Config, StorageQueueEvent] = + def logEvent(event: StorageEvent): RIO[Console with Config, StorageEvent] = for { batchMode <- Config.batchMode sqe <- event match { - case UploadQueueEvent(remoteKey, _) => + case UploadEvent(remoteKey, _) => ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey), batchMode) - case CopyQueueEvent(sourceKey, targetKey) => + case CopyEvent(sourceKey, targetKey) => ZIO(event) <* Console.putMessageLnB( CopyComplete(sourceKey, targetKey), batchMode) - case DeleteQueueEvent(remoteKey) => + case DeleteEvent(remoteKey) => ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey), batchMode) - case ErrorQueueEvent(action, _, e) => + case ErrorEvent(action, _, e) => ZIO(event) <* Console.putMessageLnB( ErrorQueueEventOccurred(action, e), batchMode) - case DoNothingQueueEvent(_) => ZIO(event) - case ShutdownQueueEvent() => ZIO(event) + case DoNothingEvent(_) => ZIO(event) + case ShutdownEvent() => ZIO(event) } } yield sqe diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala index 154e923..e87c39b 100644 --- a/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala @@ -2,7 +2,7 @@ package net.kemitix.thorp.lib import net.kemitix.thorp.config.Config import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent +import net.kemitix.thorp.domain.StorageEvent.DoNothingEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.Storage import zio.{UIO, ZIO} @@ -12,7 +12,7 @@ trait UnversionedMirrorArchive extends ThorpArchive { override def update( sequencedAction: SequencedAction, totalBytesSoFar: Long - ): ZIO[Storage with Config, Nothing, StorageQueueEvent] = + ): ZIO[Storage with Config, Nothing, StorageEvent] = sequencedAction match { case SequencedAction(ToUpload(bucket, localFile, _), index) => doUpload(index, totalBytesSoFar, bucket, localFile) @@ -21,7 +21,7 @@ trait UnversionedMirrorArchive extends ThorpArchive { case SequencedAction(ToDelete(bucket, remoteKey, _), _) => Storage.delete(bucket, remoteKey) case SequencedAction(DoNothing(_, remoteKey, _), _) => - UIO(DoNothingQueueEvent(remoteKey)) + UIO(DoNothingEvent(remoteKey)) } private def doUpload( diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/ActionGeneratorSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/ActionGeneratorSuite.scala deleted file mode 100644 index bf845d5..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/ActionGeneratorSuite.scala +++ /dev/null @@ -1,207 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.config._ -import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToUpload} -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.{FileSystem, Resource} -import org.scalatest.FunSpec -import zio.DefaultRuntime - -class ActionGeneratorSuite extends FunSpec { - private val source = Resource(this, "upload") - private val sourcePath = source.toPath - private val sources = Sources(List(sourcePath)) - private val prefix = RemoteKey("prefix") - private val bucket = Bucket("bucket") - private val configOptions = ConfigOptions( - List[ConfigOption]( - ConfigOption.Bucket("bucket"), - ConfigOption.Prefix("prefix"), - ConfigOption.Source(sourcePath), - ConfigOption.IgnoreUserOptions, - ConfigOption.IgnoreGlobalOptions - )) - - describe("create actions") { - - val previousActions = LazyList.empty[Action] - - describe("#1 local exists, remote exists, remote matches - do nothing") { - val theHash = MD5Hash("the-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash) - input = MatchedMetadata( - theFile, // local exists - matchByHash = Some(theRemoteMetadata), // remote matches - matchByKey = Some(theRemoteMetadata) // remote exists - ) - } yield (theFile, input) - it("do nothing") { - env.map({ - case (theFile, input) => { - val expected = - Right(LazyList( - DoNothing(bucket, theFile.remoteKey, theFile.file.length + 1))) - val result = invoke(input, previousActions) - assertResult(expected)(result) - } - }) - } - } - describe("#2 local exists, remote is missing, other matches - copy") { - val theHash = MD5Hash("the-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - theRemoteKey = theFile.remoteKey - otherRemoteKey = RemoteKey.resolve("other-key")(prefix) - otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash) - input = MatchedMetadata( - theFile, // local exists - matchByHash = Some(otherRemoteMetadata), // other matches - matchByKey = None) // remote is missing - } yield (theFile, theRemoteKey, input, otherRemoteKey) - it("copy from other key") { - env.map({ - case (theFile, theRemoteKey, input, otherRemoteKey) => { - val expected = Right( - LazyList( - ToCopy(bucket, - otherRemoteKey, - theHash, - theRemoteKey, - theFile.file.length))) // copy - val result = invoke(input, previousActions) - assertResult(expected)(result) - } - }) - } - describe("#3 local exists, remote is missing, other no matches - upload") { - val theHash = MD5Hash("the-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - input = MatchedMetadata(theFile, // local exists - matchByHash = None, // other no matches - matchByKey = None) // remote is missing - } yield (theFile, input) - it("upload") { - env.map({ - case (theFile, input) => { - val expected = Right(LazyList( - ToUpload(bucket, theFile, theFile.file.length))) // upload - val result = invoke(input, previousActions) - assertResult(expected)(result) - } - }) - } - } - } - describe( - "#4 local exists, remote exists, remote no match, other matches - copy") { - val theHash = MD5Hash("the-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - theRemoteKey = theFile.remoteKey - oldHash = MD5Hash("old-hash") - otherRemoteKey = RemoteKey.resolve("other-key")(prefix) - otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash) - oldRemoteMetadata = RemoteMetaData(theRemoteKey, - hash = oldHash // remote no match - ) - input = MatchedMetadata( - theFile, // local exists - matchByHash = Some(otherRemoteMetadata), // other matches - matchByKey = Some(oldRemoteMetadata)) // remote exists - } yield (theFile, theRemoteKey, input, otherRemoteKey) - it("copy from other key") { - env.map({ - case (theFile, theRemoteKey, input, otherRemoteKey) => { - val expected = Right( - LazyList( - ToCopy(bucket, - otherRemoteKey, - theHash, - theRemoteKey, - theFile.file.length))) // copy - val result = invoke(input, previousActions) - assertResult(expected)(result) - } - }) - } - } - describe( - "#5 local exists, remote exists, remote no match, other no matches - upload") { - val theHash = MD5Hash("the-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - theRemoteKey = theFile.remoteKey - oldHash = MD5Hash("old-hash") - theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash) - input = MatchedMetadata( - theFile, // local exists - matchByHash = None, // remote no match, other no match - matchByKey = Some(theRemoteMetadata) // remote exists - ) - } yield (theFile, input) - it("upload") { - env.map({ - case (theFile, input) => { - val expected = Right(LazyList( - ToUpload(bucket, theFile, theFile.file.length))) // upload - val result = invoke(input, previousActions) - assertResult(expected)(result) - } - }) - } - } - describe("#6 local missing, remote exists - delete") { - it("TODO") { - pending - } - } - } - - private def md5HashMap(theHash: MD5Hash): Map[HashType, MD5Hash] = { - Map(MD5 -> theHash) - } - - private def invoke( - input: MatchedMetadata, - previousActions: LazyList[Action] - ) = { - type TestEnv = Config with FileSystem - val testEnv: TestEnv = new Config.Live with FileSystem.Live {} - - def testProgram = - for { - config <- ConfigurationBuilder.buildConfig(configOptions) - _ <- Config.set(config) - actions <- ActionGenerator.createActions(input, previousActions) - } yield actions - - new DefaultRuntime {}.unsafeRunSync { - testProgram.provide(testEnv) - }.toEither - } -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/DummyStorageService.scala b/lib/src/test/scala/net/kemitix/thorp/lib/DummyStorageService.scala deleted file mode 100644 index 95a5655..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/DummyStorageService.scala +++ /dev/null @@ -1,42 +0,0 @@ -package net.kemitix.thorp.lib - -import java.io.File - -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.Storage -import zio.{RIO, UIO} - -final case class DummyStorageService( - remoteObjects: RemoteObjects, - uploadFiles: Map[File, (RemoteKey, MD5Hash)]) - extends Storage.Service { - - override def shutdown: UIO[StorageQueueEvent] = - UIO(StorageQueueEvent.ShutdownQueueEvent()) - - override def listObjects( - bucket: Bucket, - prefix: RemoteKey - ): RIO[Storage, RemoteObjects] = - RIO(remoteObjects) - - override def upload( - localFile: LocalFile, - bucket: Bucket, - uploadEventListener: UploadEventListener.Settings, - ): UIO[StorageQueueEvent] = { - val (remoteKey, md5Hash) = uploadFiles(localFile.file) - UIO(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash)) - } - - override def copy(bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey): UIO[StorageQueueEvent] = - UIO(StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey)) - - override def delete(bucket: Bucket, - remoteKey: RemoteKey): UIO[StorageQueueEvent] = - UIO(StorageQueueEvent.DeleteQueueEvent(remoteKey)) - -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/EIPTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/EIPTest.scala deleted file mode 100644 index 6a14e90..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/EIPTest.scala +++ /dev/null @@ -1,50 +0,0 @@ -package net.kemitix.thorp.lib - -import org.scalatest.FreeSpec -import zio.stream._ -import zio.{DefaultRuntime, IO, UIO, ZIO} - -// Experiment on how to trigger events asynchronously -// i.e. Have one thread add to a queue and another take from the queue, neither waiting for the other thread to finish -class EIPTest extends FreeSpec { - - "queue" in { - type Callback[A] = IO[Option[Throwable], A] => Unit - def offerInt(cb: Callback[Int], i: Int) = ZIO(cb(IO.succeed(i))) - def closeStream(cb: Callback[Int]) = ZIO(cb(IO.fail(None))) - def publish: Callback[Int] => IO[Throwable, _] = - cb => - ZIO.foreach(1 to 3) { i => - ZIO { - println(s"put $i") - Thread.sleep(100) - } *> offerInt(cb, i) - } *> closeStream(cb) - val program = Stream - .effectAsyncM(publish) - .mapM(i => ZIO(println(s"get $i"))) - - new DefaultRuntime {}.unsafeRunSync(program.runDrain) - } - - "EIP: Message Channel" in { - type Message = Int - type Callback = IO[Option[Throwable], Message] => Unit - def producer: Callback => UIO[Unit] = - cb => - ZIO.foreach(1 to 3)(message => - UIO { - println(s"put $message") - cb(ZIO.succeed(message)) - Thread.sleep(100) - }) *> UIO(cb(ZIO.fail(None))) - def consumer: Message => ZIO[Any, Throwable, Unit] = - message => ZIO(println(s"got $message")) - val program = zio.stream.Stream - .effectAsyncM(producer) - .buffer(1) - .mapM(consumer) - new DefaultRuntime {}.unsafeRunSync(program.runDrain) - } - -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/KeyGeneratorSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/KeyGeneratorSuite.scala deleted file mode 100644 index 0de7a11..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/KeyGeneratorSuite.scala +++ /dev/null @@ -1,45 +0,0 @@ -package net.kemitix.thorp.lib - -import java.io.File -import java.nio.file.Path - -import net.kemitix.thorp.domain.{RemoteKey, Sources} -import net.kemitix.thorp.filesystem.Resource -import org.scalatest.FunSpec -import zio.DefaultRuntime - -class KeyGeneratorSuite extends FunSpec { - - private val source: File = Resource(this, "upload") - private val sourcePath = source.toPath - private val prefix = RemoteKey("prefix") - private val sources = Sources(List(sourcePath)) - - describe("key generator") { - - describe("when file is within source") { - it("has a valid key") { - val subdir = "subdir" - val expected = Right(RemoteKey(s"${prefix.key}/$subdir")) - val result = invoke(sourcePath.resolve(subdir)) - assertResult(expected)(result) - } - } - - describe("when file is deeper within source") { - it("has a valid key") { - val subdir = "subdir/deeper/still" - val expected = Right(RemoteKey(s"${prefix.key}/$subdir")) - val result = invoke(sourcePath.resolve(subdir)) - assertResult(expected)(result) - } - } - - def invoke(path: Path) = { - new DefaultRuntime {}.unsafeRunSync { - KeyGenerator.generateKey(sources, prefix)(path) - }.toEither - } - } - -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileStreamSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileStreamSuite.scala deleted file mode 100644 index 5b10ec1..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileStreamSuite.scala +++ /dev/null @@ -1,92 +0,0 @@ -package net.kemitix.thorp.lib - -import java.nio.file.Paths - -import net.kemitix.thorp.config.{ - Config, - ConfigOption, - ConfigOptions, - ConfigurationBuilder -} -import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.{FileSystem, Hasher, Resource} -import net.kemitix.thorp.storage.Storage -import org.scalatest.FunSpec -import zio.{DefaultRuntime, Task, UIO} - -class LocalFileStreamSuite extends FunSpec { - - private val source = Resource(this, "upload") - private val sourcePath = source.toPath - - private def file(filename: String) = - sourcePath.resolve(Paths.get(filename)) - - describe("findFiles") { - it("should find all files") { - val expected = Right(Set("subdir/leaf-file", "root-file")) - val result = - invoke() - .map(_.localFiles) - .map(_.map(LocalFile.relativeToSource(_).toFile.getPath)) - .map(_.toSet) - assertResult(expected)(result) - } - it("should count all files") { - val expected = Right(2) - val result = invoke().map(_.count) - assertResult(expected)(result) - } - it("should sum the size of all files") { - val expected = Right(113) - val result = invoke().map(_.totalSizeBytes) - assertResult(expected)(result) - } - } - - private def invoke() = { - type TestEnv = Storage - with Console - with Config - with FileSystem - with Hasher.Test - val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live - with FileSystem.Live with Hasher.Test { - override def listResult: Task[RemoteObjects] = - Task.die(new NotImplementedError) - override def uploadResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - override def copyResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - override def deleteResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - override def shutdownResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - } - testEnv.hashes.set( - Map( - file("root-file") -> Map(MD5 -> MD5HashData.Root.hash), - file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash) - )) - val configOptions = ConfigOptions( - List[ConfigOption]( - ConfigOption.IgnoreGlobalOptions, - ConfigOption.IgnoreUserOptions, - ConfigOption.Source(sourcePath), - ConfigOption.Bucket("aBucket") - )) - def testProgram = - for { - config <- ConfigurationBuilder.buildConfig(configOptions) - _ <- Config.set(config) - files <- LocalFileStream.findFiles(sourcePath) - } yield files - - new DefaultRuntime {}.unsafeRunSync { - testProgram.provide(testEnv) - }.toEither - } - -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/MatchedMetadataEnricherSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/MatchedMetadataEnricherSuite.scala deleted file mode 100644 index 6245662..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/MatchedMetadataEnricherSuite.scala +++ /dev/null @@ -1,310 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.lib.S3MetaDataEnricher.{getMetadata, getS3Status} -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.Resource -import org.scalatest.FunSpec - -import scala.collection.MapView - -class MatchedMetadataEnricherSuite extends FunSpec { - private val source = Resource(this, "upload") - private val sourcePath = source.toPath - private val sources = Sources(List(sourcePath)) - private val prefix = RemoteKey("prefix") - - def getMatchesByKey( - status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])): Option[MD5Hash] = { - val (byKey, _) = status - byKey - } - - describe("enrich with metadata") { - - describe( - "#1a local exists, remote exists, remote matches, other matches - do nothing") { - val theHash: MD5Hash = MD5Hash("the-file-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - theRemoteKey = theFile.remoteKey - remoteObjects = RemoteObjects( - byHash = MapView(theHash -> theRemoteKey), - byKey = MapView(theRemoteKey -> theHash) - ) - theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash) - } yield (theFile, theRemoteMetadata, remoteObjects) - it("generates valid metadata") { - env.map({ - case (theFile, theRemoteMetadata, remoteObjects) => { - val expected = - MatchedMetadata(theFile, - matchByHash = Some(theRemoteMetadata), - matchByKey = Some(theRemoteMetadata)) - val result = getMetadata(theFile, remoteObjects) - assertResult(expected)(result) - } - }) - } - } - describe( - "#1b local exists, remote exists, remote matches, other no matches - do nothing") { - val theHash: MD5Hash = MD5Hash("the-file-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - theRemoteKey: RemoteKey = RemoteKey.resolve("the-file")(prefix) - remoteObjects = RemoteObjects( - byHash = MapView(theHash -> theRemoteKey), - byKey = MapView(theRemoteKey -> theHash) - ) - theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash) - } yield (theFile, theRemoteMetadata, remoteObjects) - it("generates valid metadata") { - env.map({ - case (theFile, theRemoteMetadata, remoteObjects) => { - val expected = - MatchedMetadata(theFile, - matchByHash = Some(theRemoteMetadata), - matchByKey = Some(theRemoteMetadata)) - val result = getMetadata(theFile, remoteObjects) - assertResult(expected)(result) - } - }) - } - } - describe( - "#2 local exists, remote is missing, remote no match, other matches - copy") { - val theHash = MD5Hash("the-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - otherRemoteKey = RemoteKey("other-key") - remoteObjects = RemoteObjects( - byHash = MapView(theHash -> otherRemoteKey), - byKey = MapView(otherRemoteKey -> theHash) - ) - otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash) - } yield (theFile, otherRemoteMetadata, remoteObjects) - it("generates valid metadata") { - env.map({ - case (theFile, otherRemoteMetadata, remoteObjects) => { - val expected = MatchedMetadata(theFile, - matchByHash = - Some(otherRemoteMetadata), - matchByKey = None) - val result = getMetadata(theFile, remoteObjects) - assertResult(expected)(result) - } - }) - } - } - describe( - "#3 local exists, remote is missing, remote no match, other no matches - upload") { - val theHash = MD5Hash("the-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - remoteObjects = RemoteObjects.empty - } yield (theFile, remoteObjects) - it("generates valid metadata") { - env.map({ - case (theFile, remoteObjects) => { - val expected = - MatchedMetadata(theFile, matchByHash = None, matchByKey = None) - val result = getMetadata(theFile, remoteObjects) - assertResult(expected)(result) - } - }) - } - } - describe( - "#4 local exists, remote exists, remote no match, other matches - copy") { - val theHash = MD5Hash("the-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - theRemoteKey = theFile.remoteKey - oldHash = MD5Hash("old-hash") - otherRemoteKey = RemoteKey.resolve("other-key")(prefix) - remoteObjects = RemoteObjects( - byHash = MapView(oldHash -> theRemoteKey, theHash -> otherRemoteKey), - byKey = MapView( - theRemoteKey -> oldHash, - otherRemoteKey -> theHash - ) - ) - theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash) - otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash) - } yield (theFile, theRemoteMetadata, otherRemoteMetadata, remoteObjects) - it("generates valid metadata") { - env.map({ - case (theFile, - theRemoteMetadata, - otherRemoteMetadata, - remoteObjects) => { - val expected = MatchedMetadata(theFile, - matchByHash = - Some(otherRemoteMetadata), - matchByKey = Some(theRemoteMetadata)) - val result = getMetadata(theFile, remoteObjects) - assertResult(expected)(result) - } - }) - } - } - describe( - "#5 local exists, remote exists, remote no match, other no matches - upload") { - val theHash = MD5Hash("the-hash") - val env = for { - theFile <- LocalFileValidator.resolve("the-file", - md5HashMap(theHash), - sourcePath, - sources, - prefix) - theRemoteKey = theFile.remoteKey - oldHash = MD5Hash("old-hash") - remoteObjects = RemoteObjects( - byHash = MapView(oldHash -> theRemoteKey), - byKey = MapView(theRemoteKey -> oldHash) - ) - theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash) - } yield (theFile, theRemoteMetadata, remoteObjects) - it("generates valid metadata") { - env.map({ - case (theFile, theRemoteMetadata, remoteObjects) => { - val expected = MatchedMetadata(theFile, - matchByHash = None, - matchByKey = Some(theRemoteMetadata)) - val result = getMetadata(theFile, remoteObjects) - assertResult(expected)(result) - } - }) - } - } - } - - private def md5HashMap(theHash: MD5Hash): Map[HashType, MD5Hash] = { - Map(MD5 -> theHash) - } - - describe("getS3Status") { - val hash = MD5Hash("hash") - val env = for { - localFile <- LocalFileValidator.resolve("the-file", - md5HashMap(hash), - sourcePath, - sources, - prefix) - key = localFile.remoteKey - keyOtherKey <- LocalFileValidator.resolve("other-key-same-hash", - md5HashMap(hash), - sourcePath, - sources, - prefix) - diffHash = MD5Hash("diff") - keyDiffHash <- LocalFileValidator.resolve("other-key-diff-hash", - md5HashMap(diffHash), - sourcePath, - sources, - prefix) - remoteObjects = RemoteObjects( - byHash = MapView( - hash -> key, - diffHash -> keyDiffHash.remoteKey - ), - byKey = MapView( - key -> hash, - keyOtherKey.remoteKey -> hash, - keyDiffHash.remoteKey -> diffHash - ) - ) - } yield (remoteObjects, localFile, keyDiffHash, diffHash) - - def invoke(localFile: LocalFile, s3ObjectsData: RemoteObjects) = { - getS3Status(localFile, s3ObjectsData) - } - - describe("when remote key exists") { - it("should return a result for matching key") { - env.map({ - case (remoteObjects, localFile: LocalFile, _, _) => - val result = getMatchesByKey(invoke(localFile, remoteObjects)) - assert(result.contains(hash)) - }) - } - } - - describe("when remote key does not exist and no others matches hash") { - val env2 = for { - localFile <- LocalFileValidator.resolve("missing-remote", - md5HashMap(MD5Hash("unique")), - sourcePath, - sources, - prefix) - } yield (localFile) - it("should return no matches by key") { - env.map({ - case (remoteObjects, _, _, _) => { - env2.map({ - case (localFile) => { - val result = getMatchesByKey(invoke(localFile, remoteObjects)) - assert(result.isEmpty) - } - }) - } - }) - } - it("should return no matches by hash") { - env.map({ - case (remoteObjects, _, _, _) => { - env2.map({ - case (localFile) => { - val result = { - val (_, byHash) = invoke(localFile, remoteObjects) - byHash - } - assert(result.isEmpty) - } - }) - } - }) - } - } - - describe("when remote key exists and no others match hash") { - val _ = env.map({ - case (remoteObjects, _, keyDiffHash, diffHash) => { - it("should return match by key") { - val result = getMatchesByKey(invoke(keyDiffHash, remoteObjects)) - assert(result.contains(diffHash)) - } - it("should return only itself in match by hash") { - val result = { - val (_, byHash) = invoke(keyDiffHash, remoteObjects) - byHash - } - assert(result === Set((keyDiffHash.remoteKey, diffHash))) - } - } - }) - } - } - -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/PlanBuilderTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/PlanBuilderTest.scala deleted file mode 100644 index b1b1b11..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/PlanBuilderTest.scala +++ /dev/null @@ -1,397 +0,0 @@ -package net.kemitix.thorp.lib - -import java.io.File -import java.nio.file.Path - -import net.kemitix.thorp.config.{ - Config, - ConfigOption, - ConfigOptions, - ConfigurationBuilder -} -import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.{Hasher, _} -import net.kemitix.thorp.storage.Storage -import org.scalatest.FreeSpec -import zio.{DefaultRuntime, Task, UIO} - -import scala.collection.MapView - -class PlanBuilderTest extends FreeSpec with TemporaryFolder { - - private val emptyRemoteObjects = RemoteObjects.empty - - "create a plan" - { - - "one source" - { - val options: Path => ConfigOptions = - source => - configOptions(ConfigOption.Source(source), - ConfigOption.Bucket("a-bucket"), - ConfigOption.IgnoreUserOptions, - ConfigOption.IgnoreGlobalOptions) - "a file" - { - val filename = "aFile" - val remoteKey = RemoteKey(filename) - "with no matching remote key" - { - "with no other remote key with matching hash" - { - "upload file" in { - withDirectory(source => { - val file = createFile(source, filename, "file-content") - val hash = md5Hash(file) - val expected = - Right(List(toUpload(remoteKey, hash, source, file))) - val result = - invoke(options(source), - UIO.succeed(emptyRemoteObjects), - UIO.succeed(Map(file.toPath -> file))) - assertResult(expected)(result) - }) - } - } - "with another remote key with matching hash" - { - "copy file" in { - withDirectory(source => { - val anOtherFilename = "other" - val content = "file-content" - val aFile = createFile(source, filename, content) - val anOtherFile = createFile(source, anOtherFilename, content) - val aHash = md5Hash(aFile) - val anOtherKey = RemoteKey("other") - val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey))) - val remoteObjects = RemoteObjects( - byHash = MapView(aHash -> anOtherKey), - byKey = MapView(anOtherKey -> aHash) - ) - val result = - invoke(options(source), - UIO.succeed(remoteObjects), - UIO.succeed(Map(aFile.toPath -> aFile, - anOtherFile.toPath -> anOtherFile))) - assertResult(expected)(result) - }) - } - } - } - "with matching remote key" - { - "with matching hash" - { - "do nothing" in { - withDirectory(source => { - val file = createFile(source, filename, "file-content") - val hash = md5Hash(file) - // DoNothing actions should have been filtered out of the plan - val expected = Right(List()) - val remoteObjects = RemoteObjects( - byHash = MapView(hash -> remoteKey), - byKey = MapView(remoteKey -> hash) - ) - val result = - invoke(options(source), - UIO.succeed(remoteObjects), - UIO.succeed(Map(file.toPath -> file))) - assertResult(expected)(result) - }) - } - } - "with different hash" - { - "with no matching remote hash" - { - "upload file" in { - withDirectory(source => { - val file = createFile(source, filename, "file-content") - val currentHash = md5Hash(file) - val originalHash = MD5Hash("original-file-content") - val expected = - Right(List(toUpload(remoteKey, currentHash, source, file))) - val remoteObjects = RemoteObjects( - byHash = MapView(originalHash -> remoteKey), - byKey = MapView(remoteKey -> originalHash) - ) - val result = - invoke(options(source), - UIO.succeed(remoteObjects), - UIO.succeed(Map(file.toPath -> file))) - assertResult(expected)(result) - }) - } - } - "with matching remote hash" - { - "copy file" in { - withDirectory(source => { - val file = createFile(source, filename, "file-content") - val hash = md5Hash(file) - val sourceKey = RemoteKey("other-key") - val expected = Right(List(toCopy(sourceKey, hash, remoteKey))) - val remoteObjects = RemoteObjects( - byHash = MapView(hash -> sourceKey), - byKey = MapView.empty - ) - val result = - invoke(options(source), - UIO.succeed(remoteObjects), - UIO.succeed(Map(file.toPath -> file))) - assertResult(expected)(result) - }) - } - } - } - } - } - "a remote key" - { - val filename = "aFile" - val remoteKey = RemoteKey(filename) - "with a matching local file" - { - "do nothing" in { - withDirectory(source => { - val file = createFile(source, filename, "file-content") - val hash = md5Hash(file) - // DoNothing actions should have been filtered out of the plan - val expected = Right(List()) - val remoteObjects = RemoteObjects( - byHash = MapView(hash -> remoteKey), - byKey = MapView(remoteKey -> hash) - ) - val result = - invoke(options(source), - UIO.succeed(remoteObjects), - UIO.succeed(Map(file.toPath -> file))) - assertResult(expected)(result) - }) - } - } - "with no matching local file" - { - "delete remote key" in { - withDirectory(source => { - val hash = MD5Hash("file-content") - val expected = Right(List(toDelete(remoteKey))) - val remoteObjects = RemoteObjects( - byHash = MapView(hash -> remoteKey), - byKey = MapView(remoteKey -> hash) - ) - val result = - invoke(options(source), - UIO.succeed(remoteObjects), - UIO.succeed(Map.empty)) - assertResult(expected)(result) - }) - } - } - } - } - - "two sources" - { - val filename1 = "file-1" - val filename2 = "file-2" - val remoteKey1 = RemoteKey(filename1) - val remoteKey2 = RemoteKey(filename2) - val options: Path => Path => ConfigOptions = - source1 => - source2 => - configOptions(ConfigOption.Source(source1), - ConfigOption.Source(source2), - ConfigOption.Bucket("a-bucket")) - "unique files in both" - { - "upload all files" in { - withDirectory(firstSource => { - val fileInFirstSource = - createFile(firstSource, filename1, "file-1-content") - val hash1 = md5Hash(fileInFirstSource) - withDirectory(secondSource => { - val fileInSecondSource = - createFile(secondSource, filename2, "file-2-content") - val hash2 = md5Hash(fileInSecondSource) - val expected = Right( - Set( - toUpload(remoteKey2, hash2, secondSource, fileInSecondSource), - toUpload(remoteKey1, hash1, firstSource, fileInFirstSource) - )) - val result = - invoke( - options(firstSource)(secondSource), - UIO.succeed(emptyRemoteObjects), - UIO.succeed( - Map(fileInFirstSource.toPath -> fileInFirstSource, - fileInSecondSource.toPath -> fileInSecondSource)) - ).map(_.toSet) - assertResult(expected)(result) - }) - }) - } - } - "same filename in both" - { - "only upload file in first source" in { - withDirectory(firstSource => { - val fileInFirstSource = - createFile(firstSource, filename1, "file-1-content") - val hash1 = md5Hash(fileInFirstSource) - withDirectory(secondSource => { - val fileInSecondSource = - createFile(secondSource, filename1, "file-2-content") - val hash2 = md5Hash(fileInSecondSource) - val expected = Right(List( - toUpload(remoteKey1, hash1, firstSource, fileInFirstSource))) - val result = - invoke( - options(firstSource)(secondSource), - UIO.succeed(emptyRemoteObjects), - UIO.succeed( - Map(fileInFirstSource.toPath -> fileInFirstSource, - fileInSecondSource.toPath -> fileInSecondSource)) - ) - assertResult(expected)(result) - }) - }) - } - } - "with a remote file only present in second source" - { - "do not delete it " in { - withDirectory(firstSource => { - withDirectory(secondSource => { - val fileInSecondSource = - createFile(secondSource, filename2, "file-2-content") - val hash2 = md5Hash(fileInSecondSource) - val expected = Right(List()) - val remoteObjects = - RemoteObjects(byHash = MapView(hash2 -> remoteKey2), - byKey = MapView(remoteKey2 -> hash2)) - val result = - invoke(options(firstSource)(secondSource), - UIO.succeed(remoteObjects), - UIO.succeed( - Map(fileInSecondSource.toPath -> fileInSecondSource))) - assertResult(expected)(result) - }) - }) - } - } - "with remote file only present in first source" - { - "do not delete it" in { - withDirectory(firstSource => { - val fileInFirstSource: File = - createFile(firstSource, filename1, "file-1-content") - val hash1 = md5Hash(fileInFirstSource) - withDirectory(secondSource => { - val expected = Right(List()) - val remoteObjects = - RemoteObjects(byHash = MapView(hash1 -> remoteKey1), - byKey = MapView(remoteKey1 -> hash1)) - val result = - invoke(options(firstSource)(secondSource), - UIO.succeed(remoteObjects), - UIO.succeed( - Map(fileInFirstSource.toPath -> fileInFirstSource))) - assertResult(expected)(result) - }) - }) - } - } - "with remote file not present in either source" - { - "delete from remote" in { - withDirectory(firstSource => { - withDirectory(secondSource => { - val expected = Right(List(toDelete(remoteKey1))) - val remoteObjects = - RemoteObjects(byHash = MapView.empty, - byKey = MapView(remoteKey1 -> MD5Hash(""))) - val result = - invoke(options(firstSource)(secondSource), - UIO.succeed(remoteObjects), - UIO.succeed(Map.empty)) - assertResult(expected)(result) - }) - }) - } - } - } - - def md5Hash(file: File): MD5Hash = { - object TestEnv extends Hasher.Live with FileSystem.Live - new DefaultRuntime {} - .unsafeRunSync { - Hasher - .hashObject(file.toPath) - .map(_.get(MD5)) - .provide(TestEnv) - } - .toEither - .toOption - .flatten - .getOrElse(MD5Hash("invalid md5 hash in test")) - } - - } - - private def toUpload(remoteKey: RemoteKey, - md5Hash: MD5Hash, - source: Path, - file: File): (String, String, String, String, String) = - ("upload", - remoteKey.key, - MD5Hash.hash(md5Hash), - source.toFile.getPath, - file.toString) - - private def toCopy( - sourceKey: RemoteKey, - md5Hash: MD5Hash, - targetKey: RemoteKey): (String, String, String, String, String) = - ("copy", sourceKey.key, MD5Hash.hash(md5Hash), targetKey.key, "") - - private def toDelete( - remoteKey: RemoteKey): (String, String, String, String, String) = - ("delete", remoteKey.key, "", "", "") - - private def configOptions(configOptions: ConfigOption*): ConfigOptions = - ConfigOptions(List(configOptions: _*)) - - private def invoke( - configOptions: ConfigOptions, - result: Task[RemoteObjects], - files: Task[Map[Path, File]] - ) = { - type TestEnv = Storage with Console with Config with FileSystem with Hasher - val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live - with FileSystem.Live with Hasher.Live { - override def listResult: Task[RemoteObjects] = result - override def uploadResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - override def copyResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - override def deleteResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - override def shutdownResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - } - def testProgram = - for { - config <- ConfigurationBuilder.buildConfig(configOptions) - _ <- Config.set(config) - bucket <- Config.bucket - prefix <- Config.prefix - remoteObjects <- Storage.list(bucket, prefix) - plan <- PlanBuilder.createPlan(remoteObjects) - } yield plan - new DefaultRuntime {} - .unsafeRunSync(testProgram.provide(testEnv)) - .toEither - .map(convertResult) - } - - private def convertResult(plan: SyncPlan) = - plan.actions.map({ - case ToUpload(_, lf, _) => - ("upload", - lf.remoteKey.key, - MD5Hash.hash(lf.hashes(MD5)), - lf.source.toString, - lf.file.toString) - case ToDelete(_, remoteKey, _) => ("delete", remoteKey.key, "", "", "") - case ToCopy(_, sourceKey, hash, targetKey, _) => - ("copy", sourceKey.key, MD5Hash.hash(hash), targetKey.key, "") - case DoNothing(_, remoteKey, _) => - ("do-nothing", remoteKey.key, "", "", "") - case _ => ("other", "", "", "", "") - }) -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/PlanExecutorTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/PlanExecutorTest.scala deleted file mode 100644 index dfa7ca6..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/PlanExecutorTest.scala +++ /dev/null @@ -1,57 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.config.Config -import net.kemitix.thorp.console.Console -import net.kemitix.thorp.domain.Action.DoNothing -import net.kemitix.thorp.domain.{ - Bucket, - RemoteKey, - StorageQueueEvent, - SyncTotals -} -import net.kemitix.thorp.storage.Storage -import org.scalatest.FreeSpec -import zio.{DefaultRuntime, UIO, ZIO} - -class PlanExecutorTest extends FreeSpec { - - private def subject(in: LazyList[Int]): ZIO[Any, Throwable, LazyList[Int]] = - ZIO.foldLeft(in)(LazyList.empty[Int])((s, i) => ZIO(i #:: s)).map(_.reverse) - - "zio foreach on a stream can be a stream" in { - val input = LazyList.from(1 to 1000000) - val program = subject(input) - val result = new DefaultRuntime {}.unsafeRunSync(program).toEither - assertResult(Right(input))(result) - } - - "build plan with 100,000 actions" in { - val nActions = 100000 - val bucket = Bucket("bucket") - val remoteKey = RemoteKey("remoteKey") - val input = - LazyList.from(1 to nActions).map(DoNothing(bucket, remoteKey, _)) - - val syncTotals = SyncTotals.empty - val archiveTask = UIO(UnversionedMirrorArchive) - - val syncPlan = SyncPlan(input, syncTotals) - val program: ZIO[Storage with Config with Console, - Throwable, - Seq[StorageQueueEvent]] = - archiveTask.flatMap(archive => - PlanExecutor.executePlan(archive, syncPlan)) - - val result: Either[Throwable, Seq[StorageQueueEvent]] = - new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither - - val expected = Right( - LazyList - .from(1 to nActions) - .map(_ => StorageQueueEvent.DoNothingQueueEvent(remoteKey))) - assertResult(expected)(result) - } - - object TestEnv extends Storage.Test with Config.Live with Console.Test - -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/SequencePlanTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/SequencePlanTest.scala deleted file mode 100644 index 597fd13..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/SequencePlanTest.scala +++ /dev/null @@ -1,43 +0,0 @@ -package net.kemitix.thorp.lib - -import java.io.File - -import net.kemitix.thorp.domain.Action._ -import net.kemitix.thorp.domain._ -import org.scalatest.FreeSpec - -class SequencePlanTest extends FreeSpec { - - "sort" - { - "a list of assorted actions" - { - val bucket = Bucket("aBucket") - val remoteKey1 = RemoteKey("remoteKey1") - val remoteKey2 = RemoteKey("targetHash") - val hash = MD5Hash("aHash") - val hashes = Map[HashType, MD5Hash]() - val size = 1024 - val file1 = new File("aFile") - val file2 = new File("aFile") - val source = new File("source") - val localFile1 = - LocalFile(file1, source, hashes, remoteKey1, file1.length) - val _ = - LocalFile(file2, source, hashes, remoteKey2, file2.length) - val copy1 = ToCopy(bucket, remoteKey1, hash, remoteKey2, size) - val copy2 = ToCopy(bucket, remoteKey2, hash, remoteKey1, size) - val upload1 = ToUpload(bucket, localFile1, size) - val upload2 = ToUpload(bucket, localFile1, size) - val delete1 = ToDelete(bucket, remoteKey1, size) - val delete2 = ToDelete(bucket, remoteKey2, size) - "should be in correct order" in { - val actions = - List[Action](copy1, delete1, upload1, delete2, upload2, copy2) - val expected = - List[Action](copy1, copy2, upload1, upload2, delete1, delete2) - val result = actions.sortBy(SequencePlan.order) - assertResult(expected)(result) - } - } - } - -} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala index c8fa0c1..d061645 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala @@ -2,10 +2,10 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.SdkClientException import com.amazonaws.services.s3.model.{CopyObjectRequest, CopyObjectResult} -import net.kemitix.thorp.domain.StorageQueueEvent.{ - Action, - CopyQueueEvent, - ErrorQueueEvent +import net.kemitix.thorp.domain.StorageEvent.{ + ActionSummary, + CopyEvent, + ErrorEvent } import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError} @@ -13,8 +13,7 @@ import zio.{IO, Task, UIO} trait Copier { - def copy(amazonS3: AmazonS3.Client)( - request: Request): UIO[StorageQueueEvent] = + def copy(amazonS3: AmazonS3.Client)(request: Request): UIO[StorageEvent] = copyObject(amazonS3)(request) .fold(foldFailure(request.sourceKey, request.targetKey), foldSuccess(request.sourceKey, request.targetKey)) @@ -43,9 +42,8 @@ trait Copier { copyRequest.targetKey.key ).withMatchingETagConstraint(MD5Hash.hash(copyRequest.hash)) - private def foldFailure( - sourceKey: RemoteKey, - targetKey: RemoteKey): Throwable => StorageQueueEvent = { + private def foldFailure(sourceKey: RemoteKey, + targetKey: RemoteKey): Throwable => StorageEvent = { case error: SdkClientException => errorEvent(sourceKey, targetKey, error) case error => @@ -55,20 +53,21 @@ trait Copier { private def foldSuccess( sourceKey: RemoteKey, - targetKey: RemoteKey): CopyObjectResult => StorageQueueEvent = + targetKey: RemoteKey): CopyObjectResult => StorageEvent = result => Option(result) match { - case Some(_) => CopyQueueEvent(sourceKey, targetKey) + case Some(_) => CopyEvent(sourceKey, targetKey) case None => errorEvent(sourceKey, targetKey, HashError) } - private def errorEvent: (RemoteKey, RemoteKey, Throwable) => ErrorQueueEvent = + private def errorEvent: (RemoteKey, RemoteKey, Throwable) => ErrorEvent = (sourceKey, targetKey, error) => - ErrorQueueEvent(action(sourceKey, targetKey), targetKey, error) + ErrorEvent(action(sourceKey, targetKey), targetKey, error) - private def action(sourceKey: RemoteKey, targetKey: RemoteKey): Action = - Action.Copy(s"${sourceKey.key} => ${targetKey.key}") + private def action(sourceKey: RemoteKey, + targetKey: RemoteKey): ActionSummary = + ActionSummary.Copy(s"${sourceKey.key} => ${targetKey.key}") } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala index e8af30e..33c8cc7 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala @@ -1,12 +1,12 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.model.DeleteObjectRequest -import net.kemitix.thorp.domain.StorageQueueEvent.{ - Action, - DeleteQueueEvent, - ErrorQueueEvent +import net.kemitix.thorp.domain.StorageEvent.{ + ActionSummary, + DeleteEvent, + ErrorEvent } -import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent} +import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageEvent} import zio.{Task, UIO, ZIO} trait Deleter { @@ -14,17 +14,17 @@ trait Deleter { def delete(amazonS3: AmazonS3.Client)( bucket: Bucket, remoteKey: RemoteKey - ): UIO[StorageQueueEvent] = + ): UIO[StorageEvent] = deleteObject(amazonS3)(bucket, remoteKey) .catchAll(e => - UIO(ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, e))) + UIO(ErrorEvent(ActionSummary.Delete(remoteKey.key), remoteKey, e))) private def deleteObject(amazonS3: AmazonS3.Client)( bucket: Bucket, remoteKey: RemoteKey - ): Task[StorageQueueEvent] = + ): Task[StorageEvent] = (amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)) - *> ZIO(DeleteQueueEvent(remoteKey))) + *> ZIO(DeleteEvent(remoteKey))) } object Deleter extends Deleter diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala index 2fdb5f1..540fd81 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala @@ -2,7 +2,7 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.transfer.TransferManagerBuilder -import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent +import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage.Service @@ -26,23 +26,23 @@ object S3Storage { localFile: LocalFile, bucket: Bucket, listenerSettings: UploadEventListener.Settings, - ): UIO[StorageQueueEvent] = + ): UIO[StorageEvent] = Uploader.upload(transferManager)( Uploader.Request(localFile, bucket, listenerSettings)) override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, - targetKey: RemoteKey): UIO[StorageQueueEvent] = + targetKey: RemoteKey): UIO[StorageEvent] = Copier.copy(client)(Copier.Request(bucket, sourceKey, hash, targetKey)) override def delete(bucket: Bucket, - remoteKey: RemoteKey): UIO[StorageQueueEvent] = + remoteKey: RemoteKey): UIO[StorageEvent] = Deleter.delete(client)(bucket, remoteKey) - override def shutdown: UIO[StorageQueueEvent] = { + override def shutdown: UIO[StorageEvent] = { transferManager.shutdownNow(true) *> - client.shutdown().map(_ => ShutdownQueueEvent()) + client.shutdown().map(_ => ShutdownEvent()) } } } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index e0d6ed7..d45bc07 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -6,30 +6,38 @@ import com.amazonaws.event.ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT import com.amazonaws.event.{ProgressEvent, ProgressListener} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} import net.kemitix.thorp.domain.Implicits._ -import net.kemitix.thorp.domain.StorageQueueEvent.{ - Action, - ErrorQueueEvent, - UploadQueueEvent +import net.kemitix.thorp.domain.StorageEvent.{ + ActionSummary, + ErrorEvent, + UploadEvent } -import net.kemitix.thorp.domain.UploadEvent.{ +import net.kemitix.thorp.domain.UploadProgressEvent.{ ByteTransferEvent, RequestEvent, TransferEvent } -import net.kemitix.thorp.domain.{StorageQueueEvent, _} +import net.kemitix.thorp.domain.{ + Bucket, + LocalFile, + MD5Hash, + RemoteKey, + StorageEvent, + UploadEventListener, + UploadProgressEvent +} import net.kemitix.thorp.storage.aws.Uploader.Request import zio.UIO trait Uploader { def upload(transferManager: => AmazonTransferManager)( - request: Request): UIO[StorageQueueEvent] = + request: Request): UIO[StorageEvent] = transfer(transferManager)(request) .catchAll(handleError(request.localFile.remoteKey)) private def handleError(remoteKey: RemoteKey)( - e: Throwable): UIO[StorageQueueEvent] = - UIO(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e)) + e: Throwable): UIO[StorageEvent] = + UIO(ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, e)) private def transfer(transferManager: => AmazonTransferManager)( request: Request @@ -43,8 +51,8 @@ trait Uploader { .upload(putObjectRequest) .map(_.waitForUploadResult) .map(uploadResult => - UploadQueueEvent(RemoteKey(uploadResult.getKey), - MD5Hash(uploadResult.getETag))) + UploadEvent(RemoteKey(uploadResult.getKey), + MD5Hash(uploadResult.getETag))) } private def putObjectRequest( @@ -79,7 +87,7 @@ trait Uploader { lock.unlock(writeLock) } - private def eventHandler: ProgressEvent => UploadEvent = + private def eventHandler: ProgressEvent => UploadProgressEvent = progressEvent => { def isTransfer: ProgressEvent => Boolean = _.getEventType.isTransferEvent diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala index f923f2e..18343cf 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala @@ -1,6 +1,6 @@ package net.kemitix.thorp.storage.aws -import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent +import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.Storage import org.scalamock.scalatest.MockFactory @@ -34,7 +34,7 @@ trait AmazonS3ClientTestFixture extends MockFactory { localFile: LocalFile, bucket: Bucket, listenerSettings: UploadEventListener.Settings, - ): UIO[StorageQueueEvent] = + ): UIO[StorageEvent] = Uploader.upload(transferManager)( Uploader.Request(localFile, bucket, listenerSettings)) @@ -43,19 +43,19 @@ trait AmazonS3ClientTestFixture extends MockFactory { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ): UIO[StorageQueueEvent] = + ): UIO[StorageEvent] = Copier.copy(client)( Copier.Request(bucket, sourceKey, hash, targetKey)) override def delete( bucket: Bucket, remoteKey: RemoteKey - ): UIO[StorageQueueEvent] = + ): UIO[StorageEvent] = Deleter.delete(client)(bucket, remoteKey) - override def shutdown: UIO[StorageQueueEvent] = { + override def shutdown: UIO[StorageEvent] = { transferManager.shutdownNow(true) *> - client.shutdown().map(_ => ShutdownQueueEvent()) + client.shutdown().map(_ => ShutdownEvent()) } } } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala index a5f9cdf..5f6a628 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/CopierTest.scala @@ -2,8 +2,7 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.model.{AmazonS3Exception, CopyObjectResult} import net.kemitix.thorp.console.Console -import net.kemitix.thorp.domain.NonUnit.~* -import net.kemitix.thorp.domain.StorageQueueEvent.{Action, ErrorQueueEvent} +import net.kemitix.thorp.domain.StorageEvent.{ActionSummary, ErrorEvent} import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError} import org.scalatest.FreeSpec @@ -22,39 +21,37 @@ class CopierTest extends FreeSpec { "when source exists" - { "when source hash matches" - { "copies from source to target" in { - val event = StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey) + val event = StorageEvent.CopyEvent(sourceKey, targetKey) val expected = Right(event) new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3Client.copyObject _) - .when() - .returns(_ => Task.succeed(Some(new CopyObjectResult)))) + (fixture.amazonS3Client.copyObject _) + .when() + .returns(_ => Task.succeed(Some(new CopyObjectResult))) private val result = invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client) - ~*(assertResult(expected)(result)) + assertResult(expected)(result) } } } "when source hash does not match" - { "skip the file with an error" in { new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3Client.copyObject _) - .when() - .returns(_ => Task.succeed(None))) + (fixture.amazonS3Client.copyObject _) + .when() + .returns(_ => Task.succeed(None)) private val result = invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client) - ~*(result match { + result match { case Right( - ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), - RemoteKey("targetKey"), - e)) => + ErrorEvent(ActionSummary.Copy("sourceKey => targetKey"), + RemoteKey("targetKey"), + e)) => e match { case HashError => assert(true) case _ => fail(s"Not a HashError: ${e.getMessage}") } case e => fail(s"Not an ErrorQueueEvent: $e") - }) + } } } } @@ -62,23 +59,23 @@ class CopierTest extends FreeSpec { "skip the file with an error" in { new AmazonS3ClientTestFixture { private val expectedMessage = "The specified key does not exist" - ~*((fixture.amazonS3Client.copyObject _) + (fixture.amazonS3Client.copyObject _) .when() - .returns(_ => Task.fail(new AmazonS3Exception(expectedMessage)))) + .returns(_ => Task.fail(new AmazonS3Exception(expectedMessage))) private val result = invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client) - ~*(result match { + result match { case Right( - ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), - RemoteKey("targetKey"), - e)) => + ErrorEvent(ActionSummary.Copy("sourceKey => targetKey"), + RemoteKey("targetKey"), + e)) => e match { case CopyError(cause) => assert(cause.getMessage.startsWith(expectedMessage)) case _ => fail(s"Not a CopyError: ${e.getMessage}") } case e => fail(s"Not an ErrorQueueEvent: ${e}") - }) + } } } } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DeleterTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DeleterTest.scala index 300f84e..7b3d25c 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DeleterTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DeleterTest.scala @@ -3,16 +3,15 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.SdkClientException import com.amazonaws.services.s3.model.AmazonS3Exception import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain.StorageQueueEvent.{ - Action, - DeleteQueueEvent, - ErrorQueueEvent +import net.kemitix.thorp.domain.StorageEvent.{ + ActionSummary, + DeleteEvent, + ErrorEvent } import net.kemitix.thorp.domain.{Bucket, RemoteKey} import org.scalatest.FreeSpec import zio.internal.PlatformLive import zio.{Runtime, Task, UIO} -import net.kemitix.thorp.domain.NonUnit.~* class DeleterTest extends FreeSpec { @@ -22,42 +21,39 @@ class DeleterTest extends FreeSpec { val bucket = Bucket("aBucket") val remoteKey = RemoteKey("aRemoteKey") "when no errors" in { - val expected = Right(DeleteQueueEvent(remoteKey)) + val expected = Right(DeleteEvent(remoteKey)) new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3Client.deleteObject _) - .when() - .returns(_ => UIO.succeed(()))) + (fixture.amazonS3Client.deleteObject _) + .when() + .returns(_ => UIO.succeed(())) private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey) - ~*(assertResult(expected)(result)) + assertResult(expected)(result) } } "when Amazon Service Exception" in { val exception = new AmazonS3Exception("message") val expected = Right( - ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception)) + ErrorEvent(ActionSummary.Delete(remoteKey.key), remoteKey, exception)) new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3Client.deleteObject _) - .when() - .returns(_ => Task.fail(exception))) + (fixture.amazonS3Client.deleteObject _) + .when() + .returns(_ => Task.fail(exception)) private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey) - ~*(assertResult(expected)(result)) + assertResult(expected)(result) } } "when Amazon SDK Client Exception" in { val exception = new SdkClientException("message") val expected = Right( - ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception)) + ErrorEvent(ActionSummary.Delete(remoteKey.key), remoteKey, exception)) new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3Client.deleteObject _) - .when() - .returns(_ => Task.fail(exception))) + (fixture.amazonS3Client.deleteObject _) + .when() + .returns(_ => Task.fail(exception)) private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey) - ~*(assertResult(expected)(result)) + assertResult(expected)(result) } } def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket, diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala index ec5ee49..c231ed7 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala @@ -8,7 +8,6 @@ import com.amazonaws.services.s3.model.{ ListObjectsV2Result, S3ObjectSummary } -import net.kemitix.thorp.domain.NonUnit.~* import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.Storage import org.scalatest.FreeSpec @@ -88,7 +87,7 @@ class ListerTest extends FreeSpec { etag: String, truncated: Boolean) = { val result = new ListObjectsV2Result - ~*(result.getObjectSummaries.add(objectSummary(key, etag, nowDate))) + result.getObjectSummaries.add(objectSummary(key, etag, nowDate)) result.setTruncated(truncated) result } @@ -97,10 +96,9 @@ class ListerTest extends FreeSpec { "when Amazon Service Exception" in { val exception = new AmazonS3Exception("message") new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3Client.listObjectsV2 _) - .when() - .returns(_ => Task.fail(exception))) + (fixture.amazonS3Client.listObjectsV2 _) + .when() + .returns(_ => Task.fail(exception)) private val result = invoke(fixture.amazonS3Client)(bucket, prefix) assert(result.isLeft) } @@ -108,10 +106,9 @@ class ListerTest extends FreeSpec { "when Amazon SDK Client Exception" in { val exception = new SdkClientException("message") new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3Client.listObjectsV2 _) - .when() - .returns(_ => Task.fail(exception))) + (fixture.amazonS3Client.listObjectsV2 _) + .when() + .returns(_ => Task.fail(exception)) private val result = invoke(fixture.amazonS3Client)(bucket, prefix) assert(result.isLeft) } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala deleted file mode 100644 index 15540fb..0000000 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ /dev/null @@ -1,147 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import net.kemitix.thorp.lib.{LocalFileValidator, S3MetaDataEnricher} -import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.Resource -import org.scalamock.scalatest.MockFactory -import org.scalatest.FunSpec - -import scala.collection.MapView - -class StorageServiceSuite extends FunSpec with MockFactory { - - private val source = Resource(this, "upload") - private val sourcePath = source.toPath - private val sources = Sources(List(sourcePath)) - private val prefix = RemoteKey("prefix") - - describe("getS3Status") { - - val hash = MD5Hash("hash") - val env = for { - localFile <- LocalFileValidator.resolve("the-file", - Map(MD5 -> hash), - sourcePath, - sources, - prefix) - key = localFile.remoteKey - keyOtherKey <- LocalFileValidator.resolve("other-key-same-hash", - Map(MD5 -> hash), - sourcePath, - sources, - prefix) - diffHash = MD5Hash("diff") - keyDiffHash <- LocalFileValidator.resolve("other-key-diff-hash", - Map(MD5 -> diffHash), - sourcePath, - sources, - prefix) - s3ObjectsData = RemoteObjects( - byHash = MapView( - hash -> key, - diffHash -> keyDiffHash.remoteKey - ), - byKey = MapView( - key -> hash, - keyOtherKey.remoteKey -> hash, - keyDiffHash.remoteKey -> diffHash - ) - ) - } yield - (s3ObjectsData, - localFile: LocalFile, - keyOtherKey, - keyDiffHash, - diffHash, - key) - - def invoke(localFile: LocalFile, s3ObjectsData: RemoteObjects) = - S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData) - - def getMatchesByKey( - status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])): Option[MD5Hash] = { - val (byKey, _) = status - byKey - } - - def getMatchesByHash(status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])) - : Map[MD5Hash, RemoteKey] = { - val (_, byHash) = status - byHash - } - - describe( - "when remote key exists, unmodified and other key matches the hash") { - it("should return the match by key") { - env.map({ - case (s3ObjectsData, localFile, _, _, _, _) => { - val result = getMatchesByKey(invoke(localFile, s3ObjectsData)) - assert(result.contains(hash)) - } - }) - } - it("should return both matches for the hash") { - env.map({ - case (s3ObjectsData, localFile, keyOtherKey, _, _, key) => { - val result = getMatchesByHash(invoke(localFile, s3ObjectsData)) - assertResult( - Set((hash, key), (hash, keyOtherKey.remoteKey)) - )(result) - } - }) - } - } - - describe("when remote key does not exist and no others matches hash") { - val env2 = LocalFileValidator - .resolve("missing-file", - Map(MD5 -> MD5Hash("unique")), - sourcePath, - sources, - prefix) - it("should return no matches by key") { - env2.map(localFile => { - env.map({ - case (s3ObjectsData, _, _, _, _, _) => { - val result = getMatchesByKey(invoke(localFile, s3ObjectsData)) - assert(result.isEmpty) - } - }) - }) - } - it("should return no matches by hash") { - env2.map(localFile => { - env.map({ - case (s3ObjectsData, _, _, _, _, _) => { - val result = getMatchesByHash(invoke(localFile, s3ObjectsData)) - assert(result.isEmpty) - } - }) - }) - } - } - - describe("when remote key exists and no others match hash") { - it("should return the match by key") { - env.map({ - case (s3ObjectsData, _, _, keyDiffHash, diffHash, _) => { - val result = getMatchesByKey(invoke(keyDiffHash, s3ObjectsData)) - assert(result.contains(diffHash)) - } - }) - } - it("should return one match by hash") { - env.map({ - case (s3ObjectsData, _, _, keyDiffHash, diffHash, _) => { - val result = getMatchesByHash(invoke(keyDiffHash, s3ObjectsData)) - assertResult( - Set((diffHash, keyDiffHash.remoteKey)) - )(result) - } - }) - } - } - } - -} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala index 18e55e5..ca90505 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala @@ -7,16 +7,15 @@ import com.amazonaws.services.s3.model.AmazonS3Exception import com.amazonaws.services.s3.transfer.model.UploadResult import net.kemitix.thorp.config.Config import net.kemitix.thorp.domain.HashType.MD5 -import net.kemitix.thorp.domain.StorageQueueEvent.{ - Action, - ErrorQueueEvent, - UploadQueueEvent +import net.kemitix.thorp.domain.StorageEvent.{ + ActionSummary, + ErrorEvent, + UploadEvent } import net.kemitix.thorp.domain._ import org.scalamock.scalatest.MockFactory import org.scalatest.FreeSpec import zio.{DefaultRuntime, Task} -import net.kemitix.thorp.domain.NonUnit.~* import net.kemitix.thorp.filesystem.Resource class UploaderTest extends FreeSpec with MockFactory { @@ -39,57 +38,54 @@ class UploaderTest extends FreeSpec with MockFactory { UploadEventListener.Settings(localFile, 0, 0, batchMode = true) "when no error" in { val expected = - Right(UploadQueueEvent(remoteKey, aHash)) + Right(UploadEvent(remoteKey, aHash)) new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3TransferManager.upload _) - .when() - .returns(_ => Task.succeed(inProgress))) + (fixture.amazonS3TransferManager.upload _) + .when() + .returns(_ => Task.succeed(inProgress)) private val result = invoke(fixture.amazonS3TransferManager)( localFile, bucket, listenerSettings ) - ~*(assertResult(expected)(result)) + assertResult(expected)(result) } } "when Amazon Service Exception" in { val exception = new AmazonS3Exception("message") val expected = Right( - ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception)) + ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, exception)) new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3TransferManager.upload _) - .when() - .returns(_ => Task.fail(exception))) + (fixture.amazonS3TransferManager.upload _) + .when() + .returns(_ => Task.fail(exception)) private val result = invoke(fixture.amazonS3TransferManager)( localFile, bucket, listenerSettings ) - ~*(assertResult(expected)(result)) + assertResult(expected)(result) } } "when Amazon SDK Client Exception" in { val exception = new SdkClientException("message") val expected = Right( - ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception)) + ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, exception)) new AmazonS3ClientTestFixture { - ~*( - (fixture.amazonS3TransferManager.upload _) - .when() - .returns(_ => Task.fail(exception))) + (fixture.amazonS3TransferManager.upload _) + .when() + .returns(_ => Task.fail(exception)) private val result = invoke(fixture.amazonS3TransferManager)( localFile, bucket, listenerSettings ) - ~*(assertResult(expected)(result)) + assertResult(expected)(result) } } def invoke(transferManager: AmazonTransferManager)( diff --git a/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala b/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala index e36f60d..c9f0a03 100644 --- a/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala +++ b/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala @@ -19,34 +19,34 @@ object Storage { localFile: LocalFile, bucket: Bucket, listenerSettings: UploadEventListener.Settings, - ): ZIO[Storage, Nothing, StorageQueueEvent] + ): ZIO[Storage, Nothing, StorageEvent] def copy( bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ): ZIO[Storage, Nothing, StorageQueueEvent] + ): ZIO[Storage, Nothing, StorageEvent] def delete( bucket: Bucket, remoteKey: RemoteKey - ): UIO[StorageQueueEvent] + ): UIO[StorageEvent] - def shutdown: UIO[StorageQueueEvent] + def shutdown: UIO[StorageEvent] } trait Test extends Storage { def listResult: Task[RemoteObjects] = Task.die(new NotImplementedError) - def uploadResult: UIO[StorageQueueEvent] = + def uploadResult: UIO[StorageEvent] = Task.die(new NotImplementedError) - def copyResult: UIO[StorageQueueEvent] = + def copyResult: UIO[StorageEvent] = Task.die(new NotImplementedError) - def deleteResult: UIO[StorageQueueEvent] = + def deleteResult: UIO[StorageEvent] = Task.die(new NotImplementedError) - def shutdownResult: UIO[StorageQueueEvent] = + def shutdownResult: UIO[StorageEvent] = Task.die(new NotImplementedError) val storage: Service = new Service { @@ -59,21 +59,21 @@ object Storage { localFile: LocalFile, bucket: Bucket, listenerSettings: UploadEventListener.Settings - ): ZIO[Storage, Nothing, StorageQueueEvent] = + ): ZIO[Storage, Nothing, StorageEvent] = uploadResult override def copy( bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, - targetKey: RemoteKey): ZIO[Storage, Nothing, StorageQueueEvent] = + targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] = copyResult override def delete(bucket: Bucket, - remoteKey: RemoteKey): UIO[StorageQueueEvent] = + remoteKey: RemoteKey): UIO[StorageEvent] = deleteResult - override def shutdown: UIO[StorageQueueEvent] = + override def shutdown: UIO[StorageEvent] = shutdownResult } @@ -89,7 +89,7 @@ object Storage { localFile: LocalFile, bucket: Bucket, listenerSettings: UploadEventListener.Settings - ): ZIO[Storage, Nothing, StorageQueueEvent] = + ): ZIO[Storage, Nothing, StorageEvent] = ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings)) final def copy( @@ -97,13 +97,13 @@ object Storage { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - ): ZIO[Storage, Nothing, StorageQueueEvent] = + ): ZIO[Storage, Nothing, StorageEvent] = ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey)) final def delete( bucket: Bucket, remoteKey: RemoteKey - ): ZIO[Storage, Nothing, StorageQueueEvent] = + ): ZIO[Storage, Nothing, StorageEvent] = ZIO.accessM(_.storage delete (bucket, remoteKey)) }