diff --git a/CHANGELOG.org b/CHANGELOG.org index a6a5f3b..aeeeb76 100644 --- a/CHANGELOG.org +++ b/CHANGELOG.org @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]], and this project adheres to [[https://semver.org/spec/v2.0.0.html][Semantic Versioning]]. +* [0.6.0] - 2019-??-?? + +** Added + + - Abstraction layer encapsulating S3 as Storage (#76) + * [0.5.0] - 2019-06-21 ** Added diff --git a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Action.scala b/aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Action.scala deleted file mode 100644 index d502fb6..0000000 --- a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Action.scala +++ /dev/null @@ -1,40 +0,0 @@ -package net.kemitix.thorp.aws.api - -import net.kemitix.thorp.domain.{MD5Hash, RemoteKey} - -sealed trait S3Action { - - // the remote key that was uploaded, deleted or otherwise updated by the action - def remoteKey: RemoteKey - - val order: Int - -} - -object S3Action { - - final case class DoNothingS3Action(remoteKey: RemoteKey) extends S3Action { - override val order: Int = 0 - } - - final case class CopyS3Action(remoteKey: RemoteKey) extends S3Action { - override val order: Int = 1 - } - - final case class UploadS3Action( - remoteKey: RemoteKey, - md5Hash: MD5Hash) extends S3Action { - override val order: Int = 2 - } - - final case class DeleteS3Action(remoteKey: RemoteKey) extends S3Action { - override val order: Int = 3 - } - - final case class ErroredS3Action(remoteKey: RemoteKey, e: Throwable) extends S3Action { - override val order: Int = 10 - } - - implicit def ord[A <: S3Action]: Ordering[A] = Ordering.by(_.order) - -} diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientBuilder.scala b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientBuilder.scala deleted file mode 100644 index 077776a..0000000 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientBuilder.scala +++ /dev/null @@ -1,16 +0,0 @@ -package net.kemitix.thorp.aws.lib - -import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder} -import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} -import net.kemitix.thorp.aws.api.S3Client - -object S3ClientBuilder { - - def createClient(amazonS3Client: AmazonS3, - amazonS3TransferManager: TransferManager): S3Client = - new ThorpS3Client(amazonS3Client, amazonS3TransferManager) - - def defaultClient: S3Client = - createClient(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager) - -} diff --git a/build.sbt b/build.sbt index 86102ff..4505875 100644 --- a/build.sbt +++ b/build.sbt @@ -41,16 +41,16 @@ val catsEffectsSettings = Seq( "-Ypartial-unification") ) -// cli -> thorp-lib -> aws-lib -> core -> aws-api -> domain +// cli -> thorp-lib -> storage-aws -> core -> storage-api -> domain -lazy val root = (project in file(".")) +lazy val thorp = (project in file(".")) .settings(commonSettings) lazy val cli = (project in file("cli")) .settings(commonSettings) .settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main")) .settings(applicationSettings) - .aggregate(`thorp-lib`, `aws-lib`, core, `aws-api`, domain) + .aggregate(`thorp-lib`, `storage-aws`, core, `storage-api`, domain) .settings(commandLineParsing) .settings(testDependencies) .dependsOn(`thorp-lib`) @@ -58,11 +58,11 @@ lazy val cli = (project in file("cli")) lazy val `thorp-lib` = (project in file("thorp-lib")) .settings(commonSettings) .settings(assemblyJarName in assembly := "thorp-lib.jar") - .dependsOn(`aws-lib`) + .dependsOn(`storage-aws`) -lazy val `aws-lib` = (project in file("aws-lib")) +lazy val `storage-aws` = (project in file("storage-aws")) .settings(commonSettings) - .settings(assemblyJarName in assembly := "aws-lib.jar") + .settings(assemblyJarName in assembly := "storage-aws.jar") .settings(awsSdkDependencies) .settings(testDependencies) .dependsOn(core) @@ -71,11 +71,11 @@ lazy val core = (project in file("core")) .settings(commonSettings) .settings(assemblyJarName in assembly := "core.jar") .settings(testDependencies) - .dependsOn(`aws-api`) + .dependsOn(`storage-api`) -lazy val `aws-api` = (project in file("aws-api")) +lazy val `storage-api` = (project in file("storage-api")) .settings(commonSettings) - .settings(assemblyJarName in assembly := "aws-api.jar") + .settings(assemblyJarName in assembly := "storage-api.jar") .dependsOn(domain) lazy val domain = (project in file("domain")) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 22d6843..c286c24 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -1,15 +1,15 @@ package net.kemitix.thorp.cli import cats.effect.{ExitCode, IO} -import net.kemitix.thorp.aws.lib.S3ClientBuilder import net.kemitix.thorp.core.{ConfigOption, Sync} import net.kemitix.thorp.domain.Logger +import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService trait Program { def apply(configOptions: Seq[ConfigOption]): IO[ExitCode] = { implicit val logger: Logger = new PrintLogger() - Sync(S3ClientBuilder.defaultClient)(configOptions) flatMap { + Sync(defaultStorageService)(configOptions) flatMap { case Left(errors) => for { _ <- logger.error(s"There were errors:") diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala b/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala index f164784..9f56200 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala @@ -1,37 +1,39 @@ package net.kemitix.thorp.core import cats.effect.IO -import net.kemitix.thorp.aws.api.S3Action.DoNothingS3Action -import net.kemitix.thorp.aws.api.{S3Action, S3Client, UploadProgressListener} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.domain.{Config, Logger} +import net.kemitix.thorp.domain.{Config, Logger, StorageQueueEvent, UploadEventListener} +import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent +import net.kemitix.thorp.storage.api.StorageService -object ActionSubmitter { +trait ActionSubmitter { - def submitAction(s3Client: S3Client, + def submitAction(storageService: StorageService, action: Action) (implicit c: Config, - logger: Logger): Stream[IO[S3Action]] = { + logger: Logger): Stream[IO[StorageQueueEvent]] = { Stream( action match { case ToUpload(bucket, localFile) => for { _ <- logger.info(s" Upload: ${localFile.relative}") - progressListener = new UploadProgressListener(localFile) - action <- s3Client.upload(localFile, bucket, progressListener, 1) - } yield action + uploadEventListener = new UploadEventListener(localFile) + event <- storageService.upload(localFile, bucket, uploadEventListener, 1) + } yield event case ToCopy(bucket, sourceKey, hash, targetKey) => for { _ <- logger.info(s" Copy: ${sourceKey.key} => ${targetKey.key}") - action <- s3Client.copy(bucket, sourceKey, hash, targetKey) - } yield action + event <- storageService.copy(bucket, sourceKey, hash, targetKey) + } yield event case ToDelete(bucket, remoteKey) => for { _ <- logger.info(s" Delete: ${remoteKey.key}") - action <- s3Client.delete(bucket, remoteKey) - } yield action - case DoNothing(bucket, remoteKey) => - IO.pure(DoNothingS3Action(remoteKey)) + event <- storageService.delete(bucket, remoteKey) + } yield event + case DoNothing(_, remoteKey) => + IO.pure(DoNothingQueueEvent(remoteKey)) }) } } + +object ActionSubmitter extends ActionSubmitter diff --git a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala index 7e273bb..5c9cc36 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala @@ -3,7 +3,7 @@ package net.kemitix.thorp.core import java.nio.file.Paths import java.util.regex.Pattern -import net.kemitix.thorp.core.ConfigOption.{Bucket, Debug, Exclude, Include, Prefix, Source} +import net.kemitix.thorp.core.ConfigOption._ trait ParseConfigLines { diff --git a/core/src/main/scala/net/kemitix/thorp/core/Sync.scala b/core/src/main/scala/net/kemitix/thorp/core/Sync.scala index febef7d..c4edfa7 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Sync.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/Sync.scala @@ -2,7 +2,6 @@ package net.kemitix.thorp.core import cats.effect.IO import cats.implicits._ -import net.kemitix.thorp.aws.api.{S3Action, S3Client} import net.kemitix.thorp.core.Action.ToDelete import net.kemitix.thorp.core.ActionGenerator.createActions import net.kemitix.thorp.core.ActionSubmitter.submitAction @@ -11,6 +10,7 @@ import net.kemitix.thorp.core.LocalFileStream.findFiles import net.kemitix.thorp.core.S3MetaDataEnricher.getMetadata import net.kemitix.thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart} import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.StorageService trait Sync { @@ -20,24 +20,24 @@ trait Sync { } yield errorMessages } - def apply(s3Client: S3Client) + def apply(storageService: StorageService) (configOptions: Seq[ConfigOption]) (implicit defaultLogger: Logger): IO[Either[List[String], Unit]] = buildConfig(configOptions).flatMap { - case Right(config) => runWithValidConfig(s3Client, defaultLogger, config) + case Right(config) => runWithValidConfig(storageService, defaultLogger, config) case Left(errors) => IO.pure(Left(errorMessages(errors.toList))) } - private def runWithValidConfig(s3Client: S3Client, + private def runWithValidConfig(storageService: StorageService, defaultLogger: Logger, config: Config) = { for { - _ <- run(config, s3Client, defaultLogger.withDebug(config.debug)) + _ <- run(config, storageService, defaultLogger.withDebug(config.debug)) } yield Right(()) } private def run(cliConfig: Config, - s3Client: S3Client, + storageService: StorageService, logger: Logger): IO[Unit] = { implicit val c: Config = cliConfig @@ -50,9 +50,9 @@ trait Sync { IO.pure(sData.flatMap(s3MetaData => createActions(s3MetaData))) def submit(sActions: Stream[Action]) = - IO(sActions.flatMap(action => submitAction(s3Client, action))) + IO(sActions.flatMap(action => submitAction(storageService, action))) - def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] = + def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[StorageQueueEvent]] = (for { files <- findFiles(c.source, MD5HashGenerator.md5File(_)) metaData <- metaData(s3Data, files) @@ -62,18 +62,18 @@ trait Sync { .flatten .map(streamS3Actions => streamS3Actions.sorted) - def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[S3Action]] = + def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[StorageQueueEvent]] = (for { key <- s3ObjectsData.byKey.keys if key.isMissingLocally(c.source, c.prefix) - ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key)) + ioDelAction <- submitAction(storageService, ToDelete(c.bucket, key)) } yield ioDelAction) .toStream .sequence for { _ <- logRunStart - s3data <- s3Client.listObjects(c.bucket, c.prefix) + s3data <- storageService.listObjects(c.bucket, c.prefix) _ <- logFileScan copyUploadActions <- copyUploadActions(s3data) deleteActions <- deleteActions(s3data) diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index 94ee11c..2096cbe 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -2,12 +2,11 @@ package net.kemitix.thorp.core import cats.effect.IO import cats.implicits._ -import net.kemitix.thorp.aws.api.S3Action -import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action} -import net.kemitix.thorp.domain.{Config, Logger} +import net.kemitix.thorp.domain.{Config, Logger, StorageQueueEvent} +import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent} // Logging for the Sync class -object SyncLogging { +trait SyncLogging { def logRunStart(implicit c: Config, logger: Logger): IO[Unit] = @@ -17,16 +16,16 @@ object SyncLogging { logger: Logger): IO[Unit] = logger.info(s"Scanning local files: ${c.source}...") - def logErrors(actions: Stream[S3Action]) + def logErrors(actions: Stream[StorageQueueEvent]) (implicit logger: Logger): IO[Unit] = for { _ <- actions.map { - case ErroredS3Action(k, e) => logger.warn(s"${k.key}: ${e.getMessage}") + case ErrorQueueEvent(k, e) => logger.warn(s"${k.key}: ${e.getMessage}") case _ => IO.unit }.sequence } yield () - def logRunFinished(actions: Stream[S3Action]) + def logRunFinished(actions: Stream[StorageQueueEvent]) (implicit c: Config, logger: Logger): IO[Unit] = { val counters = actions.foldLeft(Counters())(countActivities) @@ -40,19 +39,21 @@ object SyncLogging { } private def countActivities(implicit c: Config, - logger: Logger): (Counters, S3Action) => Counters = - (counters: Counters, s3Action: S3Action) => { + logger: Logger): (Counters, StorageQueueEvent) => Counters = + (counters: Counters, s3Action: StorageQueueEvent) => { s3Action match { - case _: UploadS3Action => + case _: UploadQueueEvent => counters.copy(uploaded = counters.uploaded + 1) - case _: CopyS3Action => + case _: CopyQueueEvent => counters.copy(copied = counters.copied + 1) - case _: DeleteS3Action => + case _: DeleteQueueEvent => counters.copy(deleted = counters.deleted + 1) - case ErroredS3Action(k, e) => + case ErrorQueueEvent(k, e) => counters.copy(errors = counters.errors + 1) case _ => counters } } } + +object SyncLogging extends SyncLogging diff --git a/core/src/test/scala/net/kemitix/thorp/core/S3ActionSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala similarity index 61% rename from core/src/test/scala/net/kemitix/thorp/core/S3ActionSuite.scala rename to core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala index 36834a7..b43d2c9 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/S3ActionSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala @@ -1,17 +1,17 @@ package net.kemitix.thorp.core -import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action} import net.kemitix.thorp.domain.{MD5Hash, RemoteKey} +import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent} import org.scalatest.FunSpec -class S3ActionSuite extends FunSpec { +class StorageQueueEventSuite extends FunSpec { describe("Ordering of types") { val remoteKey = RemoteKey("remote-key") val md5Hash = MD5Hash("md5hash") - val copy = CopyS3Action(remoteKey) - val upload = UploadS3Action(remoteKey, md5Hash) - val delete = DeleteS3Action(remoteKey) + val copy = CopyQueueEvent(remoteKey) + val upload = UploadQueueEvent(remoteKey, md5Hash) + val delete = DeleteQueueEvent(remoteKey) val unsorted = List(delete, copy, upload) it("should sort as copy < upload < delete ") { val result = unsorted.sorted diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index 864a4dd..6062242 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -4,11 +4,10 @@ import java.io.File import java.time.Instant import cats.effect.IO -import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action} -import net.kemitix.thorp.aws.api.{S3Client, UploadProgressListener} import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash} -import net.kemitix.thorp.domain.Filter.Exclude import net.kemitix.thorp.domain._ +import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent} +import net.kemitix.thorp.storage.api.StorageService import org.scalatest.FunSpec class SyncSuite @@ -35,30 +34,31 @@ class SyncSuite val rootRemoteKey = RemoteKey("prefix/root-file") val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") - def invokeSubject(s3Client: RecordingClient, configOptions: List[ConfigOption]) = { - Sync(s3Client)(configOptions).unsafeRunSync + def invokeSubject(storageService: StorageService, + configOptions: List[ConfigOption]) = { + Sync(storageService)(configOptions).unsafeRunSync } describe("when all files should be uploaded") { - val s3Client = new RecordingClient(testBucket, S3ObjectsData( + val storageService = new RecordingStorageService(testBucket, S3ObjectsData( byHash = Map(), byKey = Map())) it("uploads all files") { val expectedUploads = Map( "subdir/leaf-file" -> leafRemoteKey, "root-file" -> rootRemoteKey) - invokeSubject(s3Client, configOptions) - assertResult(expectedUploads)(s3Client.uploadsRecord) + invokeSubject(storageService, configOptions) + assertResult(expectedUploads)(storageService.uploadsRecord) } it("copies nothing") { val expectedCopies = Map() - invokeSubject(s3Client, configOptions) - assertResult(expectedCopies)(s3Client.copiesRecord) + invokeSubject(storageService, configOptions) + assertResult(expectedCopies)(storageService.copiesRecord) } it("deletes nothing") { val expectedDeletions = Set() - invokeSubject(s3Client, configOptions) - assertResult(expectedDeletions)(s3Client.deletionsRecord) + invokeSubject(storageService, configOptions) + assertResult(expectedDeletions)(storageService.deletionsRecord) } } describe("when no files should be uploaded") { @@ -69,21 +69,21 @@ class SyncSuite byKey = Map( RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) - val s3Client = new RecordingClient(testBucket, s3ObjectsData) + val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("uploads nothing") { val expectedUploads = Map() - invokeSubject(s3Client, configOptions) - assertResult(expectedUploads)(s3Client.uploadsRecord) + invokeSubject(storageService, configOptions) + assertResult(expectedUploads)(storageService.uploadsRecord) } it("copies nothing") { val expectedCopies = Map() - invokeSubject(s3Client, configOptions) - assertResult(expectedCopies)(s3Client.copiesRecord) + invokeSubject(storageService, configOptions) + assertResult(expectedCopies)(storageService.copiesRecord) } it("deletes nothing") { val expectedDeletions = Set() - invokeSubject(s3Client, configOptions) - assertResult(expectedDeletions)(s3Client.deletionsRecord) + invokeSubject(storageService, configOptions) + assertResult(expectedDeletions)(storageService.deletionsRecord) } } describe("when a file is renamed it is moved on S3 with no upload") { @@ -95,21 +95,21 @@ class SyncSuite byKey = Map( RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified), RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) - val s3Client = new RecordingClient(testBucket, s3ObjectsData) + val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("uploads nothing") { - invokeSubject(s3Client, configOptions) + invokeSubject(storageService, configOptions) val expectedUploads = Map() - assertResult(expectedUploads)(s3Client.uploadsRecord) + assertResult(expectedUploads)(storageService.uploadsRecord) } it("copies the file") { val expectedCopies = Map(RemoteKey("prefix/root-file-old") -> RemoteKey("prefix/root-file")) - invokeSubject(s3Client, configOptions) - assertResult(expectedCopies)(s3Client.copiesRecord) + invokeSubject(storageService, configOptions) + assertResult(expectedCopies)(storageService.copiesRecord) } it("deletes the original") { val expectedDeletions = Set(RemoteKey("prefix/root-file-old")) - invokeSubject(s3Client, configOptions) - assertResult(expectedDeletions)(s3Client.deletionsRecord) + invokeSubject(storageService, configOptions) + assertResult(expectedDeletions)(storageService.deletionsRecord) } } describe("when a file is copied it is copied on S3 with no upload") { @@ -125,29 +125,29 @@ class SyncSuite deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))), byKey = Map( deletedKey -> HashModified(deletedHash, lastModified))) - val s3Client = new RecordingClient(testBucket, s3ObjectsData) + val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("deleted key") { val expectedDeletions = Set(deletedKey) - invokeSubject(s3Client, configOptions) - assertResult(expectedDeletions)(s3Client.deletionsRecord) + invokeSubject(storageService, configOptions) + assertResult(expectedDeletions)(storageService.deletionsRecord) } } describe("when a file is excluded") { val s3ObjectsData = S3ObjectsData(Map(), Map()) - val s3Client = new RecordingClient(testBucket, s3ObjectsData) + val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("is not uploaded") { val expectedUploads = Map( "root-file" -> rootRemoteKey ) - invokeSubject(s3Client, ConfigOption.Exclude("leaf") :: configOptions) - assertResult(expectedUploads)(s3Client.uploadsRecord) + invokeSubject(storageService, ConfigOption.Exclude("leaf") :: configOptions) + assertResult(expectedUploads)(storageService.uploadsRecord) } } } - class RecordingClient(testBucket: Bucket, - s3ObjectsData: S3ObjectsData) - extends S3Client { + class RecordingStorageService(testBucket: Bucket, + s3ObjectsData: S3ObjectsData) + extends StorageService { var uploadsRecord: Map[String, RemoteKey] = Map() var copiesRecord: Map[RemoteKey, RemoteKey] = Map() @@ -160,30 +160,30 @@ class SyncSuite override def upload(localFile: LocalFile, bucket: Bucket, - progressListener: UploadProgressListener, + uploadEventListener: UploadEventListener, tryCount: Int) - (implicit logger: Logger): IO[UploadS3Action] = { + (implicit logger: Logger): IO[UploadQueueEvent] = { if (bucket == testBucket) uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) - IO.pure(UploadS3Action(localFile.remoteKey, localFile.hash)) + IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hash)) } override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit logger: Logger): IO[CopyS3Action] = { + )(implicit logger: Logger): IO[CopyQueueEvent] = { if (bucket == testBucket) copiesRecord += (sourceKey -> targetKey) - IO.pure(CopyS3Action(targetKey)) + IO.pure(CopyQueueEvent(targetKey)) } override def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit logger: Logger): IO[DeleteS3Action] = { + )(implicit logger: Logger): IO[DeleteQueueEvent] = { if (bucket == testBucket) deletionsRecord += remoteKey - IO.pure(DeleteS3Action(remoteKey)) + IO.pure(DeleteQueueEvent(remoteKey)) } } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala b/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala new file mode 100644 index 0000000..391d081 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala @@ -0,0 +1,37 @@ +package net.kemitix.thorp.domain + +sealed trait StorageQueueEvent { + + // the remote key that was uploaded, deleted or otherwise updated by the action + def remoteKey: RemoteKey + + val order: Int + +} + +object StorageQueueEvent { + + final case class DoNothingQueueEvent(remoteKey: RemoteKey) extends StorageQueueEvent { + override val order: Int = 0 + } + + final case class CopyQueueEvent(remoteKey: RemoteKey) extends StorageQueueEvent { + override val order: Int = 1 + } + + final case class UploadQueueEvent(remoteKey: RemoteKey, + md5Hash: MD5Hash) extends StorageQueueEvent { + override val order: Int = 2 + } + + final case class DeleteQueueEvent(remoteKey: RemoteKey) extends StorageQueueEvent { + override val order: Int = 3 + } + + final case class ErrorQueueEvent(remoteKey: RemoteKey, e: Throwable) extends StorageQueueEvent { + override val order: Int = 10 + } + + implicit def ord[A <: StorageQueueEvent]: Ordering[A] = Ordering.by(_.order) + +} diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Terminal.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Terminal.scala index 8354611..eb0d241 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Terminal.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Terminal.scala @@ -1,7 +1,5 @@ package net.kemitix.thorp.domain -import scala.io.AnsiColor._ - object Terminal { private val esc = "\u001B" diff --git a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/UploadEvent.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEvent.scala similarity index 91% rename from aws-api/src/main/scala/net/kemitix/thorp/aws/api/UploadEvent.scala rename to domain/src/main/scala/net/kemitix/thorp/domain/UploadEvent.scala index 7914308..db890c6 100644 --- a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/UploadEvent.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEvent.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.api +package net.kemitix.thorp.domain sealed trait UploadEvent { def name: String diff --git a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/UploadProgressListener.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala similarity index 51% rename from aws-api/src/main/scala/net/kemitix/thorp/aws/api/UploadProgressListener.scala rename to domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala index 0d8fcae..bac104a 100644 --- a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/UploadProgressListener.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala @@ -1,10 +1,9 @@ -package net.kemitix.thorp.aws.api +package net.kemitix.thorp.domain -import net.kemitix.thorp.aws.api.UploadEvent.RequestEvent -import net.kemitix.thorp.domain.LocalFile +import net.kemitix.thorp.domain.UploadEvent.RequestEvent +import net.kemitix.thorp.domain.UploadEventLogger.logRequestCycle -class UploadProgressListener(localFile: LocalFile) - extends UploadProgressLogging { +class UploadEventListener(localFile: LocalFile) { var bytesTransferred = 0L diff --git a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/UploadProgressLogging.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala similarity index 80% rename from aws-api/src/main/scala/net/kemitix/thorp/aws/api/UploadProgressLogging.scala rename to domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala index 3f9244e..26a8d24 100644 --- a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/UploadProgressLogging.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala @@ -1,11 +1,10 @@ -package net.kemitix.thorp.aws.api +package net.kemitix.thorp.domain -import net.kemitix.thorp.aws.api.UploadEvent.RequestEvent import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish import net.kemitix.thorp.domain.Terminal._ -import net.kemitix.thorp.domain.{LocalFile, Terminal} +import net.kemitix.thorp.domain.UploadEvent.RequestEvent -trait UploadProgressLogging { +trait UploadEventLogger { def logRequestCycle(localFile: LocalFile, event: RequestEvent, @@ -22,3 +21,5 @@ trait UploadProgressLogging { } } + +object UploadEventLogger extends UploadEventLogger diff --git a/domain/src/test/scala/net/kemitix/thorp/domain/TerminalTest.scala b/domain/src/test/scala/net/kemitix/thorp/domain/TerminalTest.scala index d2ce137..9a31997 100644 --- a/domain/src/test/scala/net/kemitix/thorp/domain/TerminalTest.scala +++ b/domain/src/test/scala/net/kemitix/thorp/domain/TerminalTest.scala @@ -1,7 +1,5 @@ package net.kemitix.thorp.domain -import scala.io.AnsiColor._ - import org.scalatest.FunSpec class TerminalTest extends FunSpec { diff --git a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Client.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala similarity index 57% rename from aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Client.scala rename to storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala index a8a92b3..dd2f826 100644 --- a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Client.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala @@ -1,10 +1,9 @@ -package net.kemitix.thorp.aws.api +package net.kemitix.thorp.storage.api import cats.effect.IO -import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action} import net.kemitix.thorp.domain._ -trait S3Client { +trait StorageService { def listObjects(bucket: Bucket, prefix: RemoteKey @@ -12,18 +11,18 @@ trait S3Client { def upload(localFile: LocalFile, bucket: Bucket, - uploadProgressListener: UploadProgressListener, + uploadEventListener: UploadEventListener, tryCount: Int) - (implicit logger: Logger): IO[S3Action] + (implicit logger: Logger): IO[StorageQueueEvent] def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit logger: Logger): IO[CopyS3Action] + )(implicit logger: Logger): IO[StorageQueueEvent] def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit logger: Logger): IO[DeleteS3Action] + )(implicit logger: Logger): IO[StorageQueueEvent] } diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientCopier.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala similarity index 57% rename from aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientCopier.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala index df8ab38..cf73674 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientCopier.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala @@ -1,11 +1,11 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.CopyObjectRequest -import net.kemitix.thorp.aws.api.S3Action.CopyS3Action -import net.kemitix.thorp.aws.lib.S3ClientLogging.{logCopyFinish, logCopyStart} -import net.kemitix.thorp.domain.{Bucket, Logger, MD5Hash, RemoteKey} +import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.aws.S3ClientLogging.{logCopyStart, logCopyFinish} class S3ClientCopier(amazonS3: AmazonS3) { @@ -13,12 +13,12 @@ class S3ClientCopier(amazonS3: AmazonS3) { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit logger: Logger): IO[CopyS3Action] = - for { - _ <- logCopyStart(bucket, sourceKey, targetKey) - _ <- copyObject(bucket, sourceKey, hash, targetKey) - _ <- logCopyFinish(bucket, sourceKey,targetKey) - } yield CopyS3Action(targetKey) + (implicit logger: Logger): IO[StorageQueueEvent] = + for { + _ <- logCopyStart(bucket, sourceKey, targetKey) + _ <- copyObject(bucket, sourceKey, hash, targetKey) + _ <- logCopyFinish(bucket, sourceKey,targetKey) + } yield CopyQueueEvent(targetKey) private def copyObject(bucket: Bucket, sourceKey: RemoteKey, diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientDeleter.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala similarity index 66% rename from aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientDeleter.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala index 68ed98b..c3e48cc 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientDeleter.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala @@ -1,22 +1,22 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.DeleteObjectRequest -import net.kemitix.thorp.aws.api.S3Action.DeleteS3Action -import net.kemitix.thorp.aws.lib.S3ClientLogging.{logDeleteFinish, logDeleteStart} +import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey} +import net.kemitix.thorp.storage.aws.S3ClientLogging.{logDeleteStart, logDeleteFinish} class S3ClientDeleter(amazonS3: AmazonS3) { def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit logger: Logger): IO[DeleteS3Action] = + (implicit logger: Logger): IO[DeleteQueueEvent] = for { _ <- logDeleteStart(bucket, remoteKey) _ <- deleteObject(bucket, remoteKey) _ <- logDeleteFinish(bucket, remoteKey) - } yield DeleteS3Action(remoteKey) + } yield DeleteQueueEvent(remoteKey) private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = IO(amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key))) diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientLogging.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientLogging.scala similarity index 97% rename from aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientLogging.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientLogging.scala index 7ada90e..c82348d 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientLogging.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientLogging.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import cats.effect.IO import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey} diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientObjectLister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala similarity index 87% rename from aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientObjectLister.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala index cc54677..6772c1c 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientObjectLister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala @@ -1,13 +1,13 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} -import net.kemitix.thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart} -import net.kemitix.thorp.aws.lib.S3ObjectsByHash.byHash -import net.kemitix.thorp.aws.lib.S3ObjectsByKey.byKey import net.kemitix.thorp.domain import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData} +import net.kemitix.thorp.storage.aws.S3ClientLogging.{logListObjectsStart, logListObjectsFinish} +import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash +import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey import scala.collection.JavaConverters._ diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ObjectsByHash.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHash.scala similarity index 94% rename from aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ObjectsByHash.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHash.scala index 86d6ecf..e985864 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ObjectsByHash.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHash.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.model.S3ObjectSummary import net.kemitix.thorp.domain.{KeyModified, LastModified, MD5Hash, RemoteKey} diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ObjectsByKey.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByKey.scala similarity index 92% rename from aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ObjectsByKey.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByKey.scala index 0459f79..5bcfc67 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ObjectsByKey.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ObjectsByKey.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.model.S3ObjectSummary import net.kemitix.thorp.domain.{HashModified, LastModified, MD5Hash, RemoteKey} diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/ThorpS3Client.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala similarity index 62% rename from aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/ThorpS3Client.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala index 403700b..1816659 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/ThorpS3Client.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala @@ -1,15 +1,14 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer.TransferManager -import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action} -import net.kemitix.thorp.aws.api.{S3Action, S3Client, UploadProgressListener} import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.StorageService -class ThorpS3Client(amazonS3Client: => AmazonS3, - amazonS3TransferManager: => TransferManager) - extends S3Client { +class S3StorageService(amazonS3Client: => AmazonS3, + amazonS3TransferManager: => TransferManager) + extends StorageService { lazy val objectLister = new S3ClientObjectLister(amazonS3Client) lazy val copier = new S3ClientCopier(amazonS3Client) @@ -25,19 +24,19 @@ class ThorpS3Client(amazonS3Client: => AmazonS3, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit logger: Logger): IO[CopyS3Action] = + (implicit logger: Logger): IO[StorageQueueEvent] = copier.copy(bucket, sourceKey,hash, targetKey) override def upload(localFile: LocalFile, bucket: Bucket, - progressListener: UploadProgressListener, + uploadEventListener: UploadEventListener, tryCount: Int) - (implicit logger: Logger): IO[S3Action] = - uploader.upload(localFile, bucket, progressListener, 1) + (implicit logger: Logger): IO[StorageQueueEvent] = + uploader.upload(localFile, bucket, uploadEventListener, 1) override def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit logger: Logger): IO[DeleteS3Action] = + (implicit logger: Logger): IO[StorageQueueEvent] = deleter.delete(bucket, remoteKey) } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala new file mode 100644 index 0000000..bcdfcaa --- /dev/null +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala @@ -0,0 +1,16 @@ +package net.kemitix.thorp.storage.aws + +import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder} +import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} +import net.kemitix.thorp.storage.api.StorageService + +object S3StorageServiceBuilder { + + def createService(amazonS3Client: AmazonS3, + amazonS3TransferManager: TransferManager): StorageService = + new S3StorageService(amazonS3Client, amazonS3TransferManager) + + def defaultStorageService: StorageService = + createService(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager) + +} diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala similarity index 67% rename from aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/Uploader.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index de69eb0..ab7d295 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -1,15 +1,14 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import cats.effect.IO import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager} -import net.kemitix.thorp.aws.api.S3Action.{ErroredS3Action, UploadS3Action} -import net.kemitix.thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} -import net.kemitix.thorp.aws.api.{S3Action, UploadProgressListener} -import net.kemitix.thorp.aws.lib.UploaderLogging.{logMultiPartUploadFinished, logMultiPartUploadStart} -import net.kemitix.thorp.domain._ +import net.kemitix.thorp.domain.StorageQueueEvent.{ErrorQueueEvent, UploadQueueEvent} +import net.kemitix.thorp.domain.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} +import net.kemitix.thorp.domain.{StorageQueueEvent, _} +import net.kemitix.thorp.storage.aws.UploaderLogging.{logMultiPartUploadStart, logMultiPartUploadFinished} import scala.util.Try @@ -17,24 +16,24 @@ class Uploader(transferManager: => AmazonTransferManager) { def upload(localFile: LocalFile, bucket: Bucket, - uploadProgressListener: UploadProgressListener, + uploadEventListener: UploadEventListener, tryCount: Int) - (implicit logger: Logger): IO[S3Action] = + (implicit logger: Logger): IO[StorageQueueEvent] = for { _ <- logMultiPartUploadStart(localFile, tryCount) - upload <- transfer(localFile, bucket, uploadProgressListener) + upload <- transfer(localFile, bucket, uploadEventListener) action = upload match { - case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag)) - case Left(e) => ErroredS3Action(localFile.remoteKey, e) + case Right(r) => UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag)) + case Left(e) => ErrorQueueEvent(localFile.remoteKey, e) } _ <- logMultiPartUploadFinished(localFile) } yield action private def transfer(localFile: LocalFile, bucket: Bucket, - uploadProgressListener: UploadProgressListener, + uploadEventListener: UploadEventListener, ): IO[Either[Throwable, UploadResult]] = { - val listener: ProgressListener = progressListener(uploadProgressListener) + val listener: ProgressListener = progressListener(uploadEventListener) val putObjectRequest = request(localFile, bucket, listener) IO { Try(transferManager.upload(putObjectRequest)) @@ -51,10 +50,10 @@ class Uploader(transferManager: => AmazonTransferManager) { .withGeneralProgressListener(listener) } - private def progressListener(uploadProgressListener: UploadProgressListener) = + private def progressListener(uploadEventListener: UploadEventListener) = new ProgressListener { override def progressChanged(progressEvent: ProgressEvent): Unit = { - uploadProgressListener.listener(eventHandler(progressEvent)) + uploadEventListener.listener(eventHandler(progressEvent)) } private def eventHandler(progressEvent: ProgressEvent) = { diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/UploaderLogging.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/UploaderLogging.scala similarity index 95% rename from aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/UploaderLogging.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/UploaderLogging.scala index 944c468..fccec4c 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/UploaderLogging.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/UploaderLogging.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import cats.effect.IO import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish diff --git a/aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/big-file b/storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/big-file similarity index 100% rename from aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/big-file rename to storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/big-file diff --git a/aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/small-file b/storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/small-file similarity index 100% rename from aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/small-file rename to storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/small-file diff --git a/aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/test-file-for-hash.txt b/storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/test-file-for-hash.txt similarity index 100% rename from aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/test-file-for-hash.txt rename to storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/test-file-for-hash.txt diff --git a/aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/upload/root-file b/storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/upload/root-file similarity index 100% rename from aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/upload/root-file rename to storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/upload/root-file diff --git a/aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/upload/subdir/leaf-file b/storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/upload/subdir/leaf-file similarity index 100% rename from aws-lib/src/test/resources/net/kemitix/thorp/aws/lib/upload/subdir/leaf-file rename to storage-aws/src/test/resources/net/kemitix/thorp/storage/aws/upload/subdir/leaf-file diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/DummyLogger.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DummyLogger.scala similarity index 91% rename from aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/DummyLogger.scala rename to storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DummyLogger.scala index 236e704..2807872 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/DummyLogger.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/DummyLogger.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import cats.effect.IO import net.kemitix.thorp.domain.Logger diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/MD5HashData.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/MD5HashData.scala similarity index 83% rename from aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/MD5HashData.scala rename to storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/MD5HashData.scala index 7caef4c..9d38ff4 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/MD5HashData.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/MD5HashData.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import net.kemitix.thorp.domain.MD5Hash diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ObjectsByHashSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHashSuite.scala similarity index 97% rename from aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ObjectsByHashSuite.scala rename to storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHashSuite.scala index 74bc43f..c9a2452 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ObjectsByHashSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3ObjectsByHashSuite.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import java.time.Instant import java.time.temporal.ChronoUnit diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/ThorpS3ClientSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala similarity index 90% rename from aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/ThorpS3ClientSuite.scala rename to storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala index b62f172..4c52a93 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/ThorpS3ClientSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import java.time.Instant import java.time.temporal.ChronoUnit @@ -12,7 +12,7 @@ import net.kemitix.thorp.domain._ import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec -class ThorpS3ClientSuite +class S3StorageServiceSuite extends FunSpec with MockFactory { @@ -47,7 +47,7 @@ class ThorpS3ClientSuite val amazonS3 = stub[AmazonS3] val amazonS3TransferManager = stub[TransferManager] - val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager) + val storageService = new S3StorageService(amazonS3, amazonS3TransferManager) val myFakeResponse = new ListObjectsV2Result() val summaries = myFakeResponse.getObjectSummaries @@ -65,7 +65,7 @@ class ThorpS3ClientSuite k1a -> HashModified(h1, lm), k1b -> HashModified(h1, lm), k2 -> HashModified(h2, lm))) - val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync + val result = storageService.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync assertResult(expected.byHash.keys)(result.byHash.keys) assertResult(expected.byKey.keys)(result.byKey.keys) assertResult(expected)(result) diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ClientSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala similarity index 79% rename from aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ClientSuite.scala rename to storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index 2a6d53a..96b9ddb 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ClientSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -1,4 +1,4 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import java.time.Instant @@ -6,15 +6,15 @@ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.{TransferManager, Upload} -import net.kemitix.thorp.aws.api.S3Action.UploadS3Action -import net.kemitix.thorp.aws.api.{S3Client, UploadProgressListener} -import net.kemitix.thorp.aws.lib.MD5HashData.rootHash import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher} +import net.kemitix.thorp.core.MD5HashData.rootHash +import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.StorageService import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec -class S3ClientSuite +class StorageServiceSuite extends FunSpec with MockFactory { @@ -42,40 +42,40 @@ class S3ClientSuite keyotherkey.remoteKey -> HashModified(hash, lastModified), keydiffhash.remoteKey -> HashModified(diffhash, lastModified))) - def invoke(self: S3Client, localFile: LocalFile) = { + def invoke(self: StorageService, localFile: LocalFile) = { S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData) } describe("when remote key exists") { - val s3Client = S3ClientBuilder.defaultClient + val storageService = S3StorageServiceBuilder.defaultStorageService it("should return (Some, Set.nonEmpty)") { assertResult( (Some(HashModified(hash, lastModified)), Set( KeyModified(key, lastModified), KeyModified(keyotherkey.remoteKey, lastModified))) - )(invoke(s3Client, localFile)) + )(invoke(storageService, localFile)) } } describe("when remote key does not exist and no others matches hash") { - val s3Client = S3ClientBuilder.defaultClient + val storageService = S3StorageServiceBuilder.defaultStorageService it("should return (None, Set.empty)") { val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey) assertResult( (None, Set.empty) - )(invoke(s3Client, localFile)) + )(invoke(storageService, localFile)) } } describe("when remote key exists and no others match hash") { - val s3Client = S3ClientBuilder.defaultClient + val storageService = S3StorageServiceBuilder.defaultStorageService it("should return (None, Set.nonEmpty)") { assertResult( (Some(HashModified(diffhash, lastModified)), Set(KeyModified(keydiffhash.remoteKey, lastModified))) - )(invoke(s3Client, keydiffhash)) + )(invoke(storageService, keydiffhash)) } } @@ -86,14 +86,14 @@ class S3ClientSuite describe("when uploading a file") { val amazonS3 = stub[AmazonS3] val amazonS3TransferManager = stub[TransferManager] - val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager) + val storageService = new S3StorageService(amazonS3, amazonS3TransferManager) val prefix = RemoteKey("prefix") val localFile = LocalFile.resolve("root-file", rootHash, source, KeyGenerator.generateKey(source, prefix)) val bucket = Bucket("a-bucket") val remoteKey = RemoteKey("prefix/root-file") - val progressListener = new UploadProgressListener(localFile) + val uploadEventListener = new UploadEventListener(localFile) val upload = stub[Upload] (amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload) @@ -105,8 +105,8 @@ class S3ClientSuite it("should return hash of uploaded file") { pending //FIXME: works okay on its own, but fails when run with others - val expected = UploadS3Action(remoteKey, rootHash) - val result = s3Client.upload(localFile, bucket, progressListener, 1) + val expected = UploadQueueEvent(remoteKey, rootHash) + val result = storageService.upload(localFile, bucket, uploadEventListener, 1) assertResult(expected)(result) } } diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/UploaderSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala similarity index 80% rename from aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/UploaderSuite.scala rename to storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala index e8a5ff1..30b613d 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/UploaderSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala @@ -1,14 +1,13 @@ -package net.kemitix.thorp.aws.lib +package net.kemitix.thorp.storage.aws import java.time.Instant import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer._ -import net.kemitix.thorp.aws.api.S3Action.UploadS3Action -import net.kemitix.thorp.aws.api.UploadProgressListener import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.core.Resource -import net.kemitix.thorp.domain._ +import net.kemitix.thorp.domain.{UploadEventListener, _} +import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec @@ -33,13 +32,13 @@ class UploaderSuite val returnedKey = RemoteKey("returned-key") val returnedHash = MD5Hash("returned-hash") val bigFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey) - val progressListener = new UploadProgressListener(bigFile) + val uploadEventListener = new UploadEventListener(bigFile) val amazonS3 = mock[AmazonS3] val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build val uploader = new Uploader(amazonS3TransferManager) it("should upload") { - val expected = UploadS3Action(returnedKey, returnedHash) - val result = uploader.upload(bigFile, config.bucket, progressListener, 1) + val expected = UploadQueueEvent(returnedKey, returnedHash) + val result = uploader.upload(bigFile, config.bucket, uploadEventListener, 1) assertResult(expected)(result) } }