diff --git a/CHANGELOG.org b/CHANGELOG.org index aeeeb76..f79e484 100644 --- a/CHANGELOG.org +++ b/CHANGELOG.org @@ -10,6 +10,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog] ** Added - Abstraction layer encapsulating S3 as Storage (#76) + - Multiple copies of file are only uploaded once (#74) * [0.5.0] - 2019-06-21 diff --git a/build.sbt b/build.sbt index 4505875..06eaf65 100644 --- a/build.sbt +++ b/build.sbt @@ -45,12 +45,12 @@ val catsEffectsSettings = Seq( lazy val thorp = (project in file(".")) .settings(commonSettings) + .aggregate(cli, `thorp-lib`, `storage-aws`, core, `storage-api`, domain) lazy val cli = (project in file("cli")) .settings(commonSettings) .settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main")) .settings(applicationSettings) - .aggregate(`thorp-lib`, `storage-aws`, core, `storage-api`, domain) .settings(commandLineParsing) .settings(testDependencies) .dependsOn(`thorp-lib`) @@ -65,7 +65,7 @@ lazy val `storage-aws` = (project in file("storage-aws")) .settings(assemblyJarName in assembly := "storage-aws.jar") .settings(awsSdkDependencies) .settings(testDependencies) - .dependsOn(core) + .dependsOn(core % "compile->compile;test->test") lazy val core = (project in file("core")) .settings(commonSettings) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index c286c24..5a2fe81 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -1,24 +1,34 @@ package net.kemitix.thorp.cli import cats.effect.{ExitCode, IO} -import net.kemitix.thorp.core.{ConfigOption, Sync} -import net.kemitix.thorp.domain.Logger +import cats.implicits._ +import net.kemitix.thorp.core._ +import net.kemitix.thorp.domain.{Logger, StorageQueueEvent} import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService trait Program { - def apply(configOptions: Seq[ConfigOption]): IO[ExitCode] = { + def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = { implicit val logger: Logger = new PrintLogger() - Sync(defaultStorageService)(configOptions) flatMap { + Synchronise(defaultStorageService, cliOptions).flatMap { case Left(errors) => for { _ <- logger.error(s"There were errors:") - _ <- IO.pure(errors.map(error => logger.error(s" - $error"))) + _ <- errors.map(error => logger.error(s" - $error")).sequence } yield ExitCode.Error - case Right(_) => IO.pure(ExitCode.Success) + case Right(actions) => + for { + events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions) + _ <- SyncLogging.logRunFinished(events) + } yield ExitCode.Success } } + private def handleActions(archive: ThorpArchive, + actions: Stream[Action]): IO[Stream[StorageQueueEvent]] = + actions.foldRight(Stream[IO[StorageQueueEvent]]()) { + (action, stream) => archive.update(action) ++ stream + }.sequence } object Program extends Program \ No newline at end of file diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala index 0844bbc..220ac97 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala @@ -29,7 +29,7 @@ object LocalFileStream { file match { case f if f.isDirectory => loop(file) case _ => for(hash <- md5HashGenerator(file)) - yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix))) + yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file))) } def recurse(fs: Stream[File]): IO[Stream[LocalFile]] = diff --git a/core/src/main/scala/net/kemitix/thorp/core/Sync.scala b/core/src/main/scala/net/kemitix/thorp/core/Sync.scala deleted file mode 100644 index c4edfa7..0000000 --- a/core/src/main/scala/net/kemitix/thorp/core/Sync.scala +++ /dev/null @@ -1,85 +0,0 @@ -package net.kemitix.thorp.core - -import cats.effect.IO -import cats.implicits._ -import net.kemitix.thorp.core.Action.ToDelete -import net.kemitix.thorp.core.ActionGenerator.createActions -import net.kemitix.thorp.core.ActionSubmitter.submitAction -import net.kemitix.thorp.core.ConfigurationBuilder.buildConfig -import net.kemitix.thorp.core.LocalFileStream.findFiles -import net.kemitix.thorp.core.S3MetaDataEnricher.getMetadata -import net.kemitix.thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart} -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.StorageService - -trait Sync { - - def errorMessages(errors: List[ConfigValidation]): List[String] = { - for { - errorMessages <- errors.map(cv => cv.errorMessage) - } yield errorMessages - } - - def apply(storageService: StorageService) - (configOptions: Seq[ConfigOption]) - (implicit defaultLogger: Logger): IO[Either[List[String], Unit]] = - buildConfig(configOptions).flatMap { - case Right(config) => runWithValidConfig(storageService, defaultLogger, config) - case Left(errors) => IO.pure(Left(errorMessages(errors.toList))) - } - - private def runWithValidConfig(storageService: StorageService, - defaultLogger: Logger, - config: Config) = { - for { - _ <- run(config, storageService, defaultLogger.withDebug(config.debug)) - } yield Right(()) - } - - private def run(cliConfig: Config, - storageService: StorageService, - logger: Logger): IO[Unit] = { - - implicit val c: Config = cliConfig - implicit val l: Logger = logger - - def metaData(s3Data: S3ObjectsData, sFiles: Stream[LocalFile]) = - IO.pure(sFiles.map(file => getMetadata(file, s3Data))) - - def actions(sData: Stream[S3MetaData]) = - IO.pure(sData.flatMap(s3MetaData => createActions(s3MetaData))) - - def submit(sActions: Stream[Action]) = - IO(sActions.flatMap(action => submitAction(storageService, action))) - - def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[StorageQueueEvent]] = - (for { - files <- findFiles(c.source, MD5HashGenerator.md5File(_)) - metaData <- metaData(s3Data, files) - actions <- actions(metaData) - s3Actions <- submit(actions) - } yield s3Actions.sequence) - .flatten - .map(streamS3Actions => streamS3Actions.sorted) - - def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[StorageQueueEvent]] = - (for { - key <- s3ObjectsData.byKey.keys - if key.isMissingLocally(c.source, c.prefix) - ioDelAction <- submitAction(storageService, ToDelete(c.bucket, key)) - } yield ioDelAction) - .toStream - .sequence - - for { - _ <- logRunStart - s3data <- storageService.listObjects(c.bucket, c.prefix) - _ <- logFileScan - copyUploadActions <- copyUploadActions(s3data) - deleteActions <- deleteActions(s3data) - _ <- logRunFinished(copyUploadActions ++ deleteActions) - } yield () - } -} - -object Sync extends Sync diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index 2096cbe..5197359 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -1,16 +1,19 @@ package net.kemitix.thorp.core +import java.io.File + import cats.effect.IO import cats.implicits._ -import net.kemitix.thorp.domain.{Config, Logger, StorageQueueEvent} +import net.kemitix.thorp.domain.{Bucket, Config, Logger, RemoteKey, StorageQueueEvent} import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent} -// Logging for the Sync class trait SyncLogging { - def logRunStart(implicit c: Config, - logger: Logger): IO[Unit] = - logger.info(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ") + def logRunStart(bucket: Bucket, + prefix: RemoteKey, + source: File) + (implicit logger: Logger): IO[Unit] = + logger.info(s"Bucket: ${bucket.name}, Prefix: ${prefix.key}, Source: $source, ") def logFileScan(implicit c: Config, logger: Logger): IO[Unit] = @@ -26,8 +29,7 @@ trait SyncLogging { } yield () def logRunFinished(actions: Stream[StorageQueueEvent]) - (implicit c: Config, - logger: Logger): IO[Unit] = { + (implicit logger: Logger): IO[Unit] = { val counters = actions.foldLeft(Counters())(countActivities) for { _ <- logger.info(s"Uploaded ${counters.uploaded} files") @@ -38,8 +40,7 @@ trait SyncLogging { } yield () } - private def countActivities(implicit c: Config, - logger: Logger): (Counters, StorageQueueEvent) => Counters = + private def countActivities: (Counters, StorageQueueEvent) => Counters = (counters: Counters, s3Action: StorageQueueEvent) => { s3Action match { case _: UploadQueueEvent => diff --git a/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala b/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala new file mode 100644 index 0000000..4c915f1 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala @@ -0,0 +1,73 @@ +package net.kemitix.thorp.core + +import cats.data.NonEmptyChain +import cats.effect.IO +import cats.implicits._ +import net.kemitix.thorp.core.Action.DoNothing +import net.kemitix.thorp.domain.{Config, LocalFile, Logger, RemoteKey, S3ObjectsData} +import net.kemitix.thorp.storage.api.StorageService + +trait Synchronise { + + def apply(storageService: StorageService, + configOptions: Seq[ConfigOption]) + (implicit logger: Logger): IO[Either[List[String], Stream[Action]]] = + ConfigurationBuilder.buildConfig(configOptions) + .flatMap { + case Left(errors) => IO.pure(Left(errorMessages(errors))) + case Right(config) => useValidConfig(storageService, config) + } + + def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] = + errors.map(cv => cv.errorMessage).toList + + def removeDoNothing: Action => Boolean = { + case _: DoNothing => false + case _ => true + } + + def useValidConfig(storageService: StorageService, + config: Config) + (implicit logger: Logger): IO[Either[List[String], Stream[Action]]] = { + for { + _ <- SyncLogging.logRunStart(config.bucket, config.prefix, config.source) + actions <- gatherMetadata(storageService, logger, config) + .map { md => + val (rd, ld) = md + val actions1 = actionsForLocalFiles(config, ld, rd) + val actions2 = actionsForRemoteKeys(config, rd) + Right((actions1 ++ actions2).filter(removeDoNothing)) + } + } yield actions + } + + private def gatherMetadata(storageService: StorageService, + logger: Logger, + config: Config) = + for { + remoteData <- fetchRemoteData(storageService, config) + localData <- findLocalFiles(config, logger) + } yield (remoteData, localData) + + private def actionsForLocalFiles(config: Config, localData: Stream[LocalFile], remoteData: S3ObjectsData) = + localData.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(config, lf, remoteData) ++ acc) + + private def actionsForRemoteKeys(config: Config, remoteData: S3ObjectsData) = + remoteData.byKey.keys.foldLeft(Stream[Action]())((acc, rk) => createActionFromRemoteKey(config, rk) #:: acc) + + private def fetchRemoteData(storageService: StorageService, config: Config) = + storageService.listObjects(config.bucket, config.prefix) + + private def findLocalFiles(implicit config: Config, l: Logger) = + LocalFileStream.findFiles(config.source, MD5HashGenerator.md5File(_)) + + private def createActionFromLocalFile(c: Config, lf: LocalFile, remoteData: S3ObjectsData) = + ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData)(c))(c) + + private def createActionFromRemoteKey(c: Config, rk: RemoteKey) = + if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk) + else DoNothing(c.bucket, rk) + +} + +object Synchronise extends Synchronise diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala new file mode 100644 index 0000000..6889bcc --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -0,0 +1,10 @@ +package net.kemitix.thorp.core + +import cats.effect.IO +import net.kemitix.thorp.domain.StorageQueueEvent + +trait ThorpArchive { + + def update(action: Action): Stream[IO[StorageQueueEvent]] + +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala similarity index 52% rename from core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala rename to core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index 9f56200..cc7c8c9 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -2,38 +2,33 @@ package net.kemitix.thorp.core import cats.effect.IO import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.domain.{Config, Logger, StorageQueueEvent, UploadEventListener} import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent +import net.kemitix.thorp.domain.{StorageQueueEvent, UploadEventListener} import net.kemitix.thorp.storage.api.StorageService -trait ActionSubmitter { - - def submitAction(storageService: StorageService, - action: Action) - (implicit c: Config, - logger: Logger): Stream[IO[StorageQueueEvent]] = { +case class UnversionedMirrorArchive(storageService: StorageService) extends ThorpArchive { + override def update(action: Action): Stream[IO[StorageQueueEvent]] = Stream( action match { case ToUpload(bucket, localFile) => for { - _ <- logger.info(s" Upload: ${localFile.relative}") - uploadEventListener = new UploadEventListener(localFile) - event <- storageService.upload(localFile, bucket, uploadEventListener, 1) + event <- storageService.upload(localFile, bucket, new UploadEventListener(localFile), 1) } yield event case ToCopy(bucket, sourceKey, hash, targetKey) => for { - _ <- logger.info(s" Copy: ${sourceKey.key} => ${targetKey.key}") event <- storageService.copy(bucket, sourceKey, hash, targetKey) } yield event case ToDelete(bucket, remoteKey) => for { - _ <- logger.info(s" Delete: ${remoteKey.key}") event <- storageService.delete(bucket, remoteKey) } yield event case DoNothing(_, remoteKey) => IO.pure(DoNothingQueueEvent(remoteKey)) }) - } + } -object ActionSubmitter extends ActionSubmitter +object UnversionedMirrorArchive { + def default(storageService: StorageService): ThorpArchive = + new UnversionedMirrorArchive(storageService) +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/MD5HashData.scala b/core/src/test/scala/net/kemitix/thorp/core/MD5HashData.scala index 24e7703..1316fe0 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/MD5HashData.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/MD5HashData.scala @@ -6,6 +6,6 @@ object MD5HashData { val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e", Some("o6asEaDrV3uBs7tclcyKbg==")) - val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542") + val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542", Some("IIOGplC97GHPzXvY3La1Qg==")) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index 6062242..7a0b6bc 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -4,6 +4,7 @@ import java.io.File import java.time.Instant import cats.effect.IO +import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash} import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent} @@ -28,120 +29,110 @@ class SyncSuite def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile): (String, String, File) = (bucket.name, remoteKey.key, localFile.file) - describe("Sync.apply") { - val testBucket = Bucket("bucket") - // source contains the files root-file and subdir/leaf-file - val rootRemoteKey = RemoteKey("prefix/root-file") - val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") + val testBucket = Bucket("bucket") + // source contains the files root-file and subdir/leaf-file + val rootRemoteKey = RemoteKey("prefix/root-file") + val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") + val rootFile: LocalFile = LocalFile.resolve("root-file", rootHash, source, _ => rootRemoteKey) + val leafFile: LocalFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, _ => leafRemoteKey) - def invokeSubject(storageService: StorageService, - configOptions: List[ConfigOption]) = { - Sync(storageService)(configOptions).unsafeRunSync - } + def invokeSubject(storageService: StorageService, + configOptions: List[ConfigOption]): Either[List[String], Stream[Action]] = { + Synchronise(storageService, configOptions).unsafeRunSync + } - describe("when all files should be uploaded") { - val storageService = new RecordingStorageService(testBucket, S3ObjectsData( - byHash = Map(), - byKey = Map())) - it("uploads all files") { - val expectedUploads = Map( - "subdir/leaf-file" -> leafRemoteKey, - "root-file" -> rootRemoteKey) - invokeSubject(storageService, configOptions) - assertResult(expectedUploads)(storageService.uploadsRecord) - } - it("copies nothing") { - val expectedCopies = Map() - invokeSubject(storageService, configOptions) - assertResult(expectedCopies)(storageService.copiesRecord) - } - it("deletes nothing") { - val expectedDeletions = Set() - invokeSubject(storageService, configOptions) - assertResult(expectedDeletions)(storageService.deletionsRecord) - } + describe("when all files should be uploaded") { + val storageService = new RecordingStorageService(testBucket, S3ObjectsData( + byHash = Map(), + byKey = Map())) + it("uploads all files") { + val expected = Stream( + ToUpload(testBucket, rootFile), + ToUpload(testBucket, leafFile)) + val result = invokeSubject(storageService, configOptions) + assert(result.isRight) + assertResult(expected)(result.right.get) } - describe("when no files should be uploaded") { - val s3ObjectsData = S3ObjectsData( - byHash = Map( - rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), - leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), - byKey = Map( - RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) - val storageService = new RecordingStorageService(testBucket, s3ObjectsData) - it("uploads nothing") { - val expectedUploads = Map() - invokeSubject(storageService, configOptions) - assertResult(expectedUploads)(storageService.uploadsRecord) - } - it("copies nothing") { - val expectedCopies = Map() - invokeSubject(storageService, configOptions) - assertResult(expectedCopies)(storageService.copiesRecord) - } - it("deletes nothing") { - val expectedDeletions = Set() - invokeSubject(storageService, configOptions) - assertResult(expectedDeletions)(storageService.deletionsRecord) - } + } + describe("when no files should be uploaded") { + val s3ObjectsData = S3ObjectsData( + byHash = Map( + rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), + leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), + byKey = Map( + RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) + val storageService = new RecordingStorageService(testBucket, s3ObjectsData) + it("no actions") { + val expected = Stream() + val result = invokeSubject(storageService, configOptions) + assert(result.isRight) + assertResult(expected)(result.right.get) } - describe("when a file is renamed it is moved on S3 with no upload") { - // 'root-file-old' should be renamed as 'root-file' - val s3ObjectsData = S3ObjectsData( - byHash = Map( - rootHash -> Set(KeyModified(RemoteKey("prefix/root-file-old"), lastModified)), - leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), - byKey = Map( - RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) - val storageService = new RecordingStorageService(testBucket, s3ObjectsData) - it("uploads nothing") { - invokeSubject(storageService, configOptions) - val expectedUploads = Map() - assertResult(expectedUploads)(storageService.uploadsRecord) - } - it("copies the file") { - val expectedCopies = Map(RemoteKey("prefix/root-file-old") -> RemoteKey("prefix/root-file")) - invokeSubject(storageService, configOptions) - assertResult(expectedCopies)(storageService.copiesRecord) - } - it("deletes the original") { - val expectedDeletions = Set(RemoteKey("prefix/root-file-old")) - invokeSubject(storageService, configOptions) - assertResult(expectedDeletions)(storageService.deletionsRecord) - } + } + describe("when a file is renamed it is moved on S3 with no upload") { + val sourceKey = RemoteKey("prefix/root-file-old") + val targetKey = RemoteKey("prefix/root-file") + // 'root-file-old' should be renamed as 'root-file' + val s3ObjectsData = S3ObjectsData( + byHash = Map( + rootHash -> Set(KeyModified(sourceKey, lastModified)), + leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), + byKey = Map( + sourceKey -> HashModified(rootHash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) + val storageService = new RecordingStorageService(testBucket, s3ObjectsData) + it("copies the file and deletes the original") { + val expected = Stream( + ToCopy(testBucket, sourceKey, rootHash, targetKey), + ToDelete(testBucket, sourceKey) + ) + val result = invokeSubject(storageService, configOptions) + assert(result.isRight) + assertResult(expected)(result.right.get) } - describe("when a file is copied it is copied on S3 with no upload") { - it("TODO") { - pending - } + } + describe("when a file is copied it is copied on S3 with no upload") { + it("TODO") { + pending } - describe("when a file is deleted locally it is deleted from S3") { - val deletedHash = MD5Hash("deleted-hash") - val deletedKey = RemoteKey("prefix/deleted-file") - val s3ObjectsData = S3ObjectsData( - byHash = Map( - deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))), - byKey = Map( - deletedKey -> HashModified(deletedHash, lastModified))) - val storageService = new RecordingStorageService(testBucket, s3ObjectsData) - it("deleted key") { - val expectedDeletions = Set(deletedKey) - invokeSubject(storageService, configOptions) - assertResult(expectedDeletions)(storageService.deletionsRecord) - } + } + describe("when a file is deleted locally it is deleted from S3") { + val deletedHash = MD5Hash("deleted-hash") + val deletedKey = RemoteKey("prefix/deleted-file") + val s3ObjectsData = S3ObjectsData( + byHash = Map( + rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), + leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)), + deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))), + byKey = Map( + RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified), + deletedKey -> HashModified(deletedHash, lastModified))) + val storageService = new RecordingStorageService(testBucket, s3ObjectsData) + it("deleted key") { + val expected = Stream( + ToDelete(testBucket, deletedKey) + ) + val result = invokeSubject(storageService, configOptions) + assert(result.isRight) + assertResult(expected)(result.right.get) } - describe("when a file is excluded") { - val s3ObjectsData = S3ObjectsData(Map(), Map()) - val storageService = new RecordingStorageService(testBucket, s3ObjectsData) - it("is not uploaded") { - val expectedUploads = Map( - "root-file" -> rootRemoteKey - ) - invokeSubject(storageService, ConfigOption.Exclude("leaf") :: configOptions) - assertResult(expectedUploads)(storageService.uploadsRecord) - } + } + describe("when a file is excluded") { + val s3ObjectsData = S3ObjectsData( + byHash = Map( + rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), + leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), + byKey = Map( + RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) + val storageService = new RecordingStorageService(testBucket, s3ObjectsData) + it("is not uploaded") { + val expected = Stream() + val result = invokeSubject(storageService, ConfigOption.Exclude("leaf") :: configOptions) + assert(result.isRight) + assertResult(expected)(result.right.get) } } @@ -149,40 +140,26 @@ class SyncSuite s3ObjectsData: S3ObjectsData) extends StorageService { - var uploadsRecord: Map[String, RemoteKey] = Map() - var copiesRecord: Map[RemoteKey, RemoteKey] = Map() - var deletionsRecord: Set[RemoteKey] = Set() - override def listObjects(bucket: Bucket, - prefix: RemoteKey) - (implicit logger: Logger): IO[S3ObjectsData] = + prefix: RemoteKey): IO[S3ObjectsData] = IO.pure(s3ObjectsData) override def upload(localFile: LocalFile, bucket: Bucket, uploadEventListener: UploadEventListener, - tryCount: Int) - (implicit logger: Logger): IO[UploadQueueEvent] = { - if (bucket == testBucket) - uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) + tryCount: Int): IO[UploadQueueEvent] = { IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hash)) } override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, - targetKey: RemoteKey - )(implicit logger: Logger): IO[CopyQueueEvent] = { - if (bucket == testBucket) - copiesRecord += (sourceKey -> targetKey) + targetKey: RemoteKey): IO[CopyQueueEvent] = { IO.pure(CopyQueueEvent(targetKey)) } override def delete(bucket: Bucket, - remoteKey: RemoteKey - )(implicit logger: Logger): IO[DeleteQueueEvent] = { - if (bucket == testBucket) - deletionsRecord += remoteKey + remoteKey: RemoteKey): IO[DeleteQueueEvent] = { IO.pure(DeleteQueueEvent(remoteKey)) } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala b/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala index c77633a..eae8bb7 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala @@ -3,13 +3,10 @@ package net.kemitix.thorp.domain import java.io.File import java.nio.file.Path -final case class LocalFile(file: File, source: File, hash: MD5Hash, keyGenerator: File => RemoteKey) { +final case class LocalFile(file: File, source: File, hash: MD5Hash, remoteKey: RemoteKey) { require(!file.isDirectory, s"LocalFile must not be a directory: $file") - // the equivalent location of the file on S3 - def remoteKey: RemoteKey = keyGenerator(file) - def isDirectory: Boolean = file.isDirectory // the path of the file within the source @@ -25,6 +22,6 @@ object LocalFile { source: File, fileToKey: File => RemoteKey): LocalFile = { val file = source.toPath.resolve(path).toFile - LocalFile(file, source, md5Hash, fileToKey) + LocalFile(file, source, md5Hash, fileToKey(file)) } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Terminal.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Terminal.scala index eb0d241..333709d 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Terminal.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Terminal.scala @@ -10,116 +10,116 @@ object Terminal { * * Stops at the edge of the screen. */ - def cursorUp(lines: Int = 1) = csi + lines + "A" + def cursorUp(lines: Int = 1): String = csi + lines + "A" /** * Move the cursor down, default 1 line. * * Stops at the edge of the screen. */ - def cursorDown(lines: Int = 1) = csi + lines + "B" + def cursorDown(lines: Int = 1): String = csi + lines + "B" /** * Move the cursor forward, default 1 column. * * Stops at the edge of the screen. */ - def cursorForward(cols: Int = 1) = csi + cols + "C" + def cursorForward(cols: Int = 1): String = csi + cols + "C" /** * Move the cursor back, default 1 column, * * Stops at the edge of the screen. */ - def cursorBack(cols: Int = 1) = csi + cols + "D" + def cursorBack(cols: Int = 1): String = csi + cols + "D" /** * Move the cursor to the beginning of the line, default 1, down. */ - def cursorNextLine(lines: Int = 1) = csi + lines + "E" + def cursorNextLine(lines: Int = 1): String = csi + lines + "E" /** * Move the cursor to the beginning of the line, default 1, up. */ - def cursorPrevLine(lines: Int = 1) = csi + lines + "F" + def cursorPrevLine(lines: Int = 1): String = csi + lines + "F" /** * Move the cursor to the column on the current line. */ - def cursorHorizAbs(col: Int) = csi + col + "G" + def cursorHorizAbs(col: Int): String = csi + col + "G" /** * Move the cursor to the position on screen (1,1 is the top-left). */ - def cursorPosition(row: Int, col: Int) = csi + row + ";" + col + "H" + def cursorPosition(row: Int, col: Int): String = csi + row + ";" + col + "H" /** * Clear from cursor to end of screen. */ - val eraseToEndOfScreen = csi + "0J" + val eraseToEndOfScreen: String = csi + "0J" /** * Clear from cursor to beginning of screen. */ - val eraseToStartOfScreen = csi + "1J" + val eraseToStartOfScreen: String = csi + "1J" /** * Clear screen and move cursor to top-left. * * On DOS the "2J" command also moves to 1,1, so we force that behaviour for all. */ - val eraseScreen = csi + "2J" + cursorPosition(1, 1) + val eraseScreen: String = csi + "2J" + cursorPosition(1, 1) /** * Clear screen and scrollback buffer then move cursor to top-left. * * Anticipate that same DOS behaviour here, and to maintain consistency with {@link #eraseScreen}. */ - val eraseScreenAndBuffer = csi + "3J" + val eraseScreenAndBuffer: String = csi + "3J" /** * Clears the terminal line to the right of the cursor. * * Does not move the cursor. */ - val eraseLineForward = csi + "0K" + val eraseLineForward: String = csi + "0K" /** * Clears the terminal line to the left of the cursor. * * Does not move the cursor. */ - val eraseLineBack= csi + "1K" + val eraseLineBack: String = csi + "1K" /** * Clears the whole terminal line. * * Does not move the cursor. */ - val eraseLine = csi + "2K" + val eraseLine: String = csi + "2K" /** * Scroll page up, default 1, lines. */ - def scrollUp(lines: Int = 1) = csi + lines + "S" + def scrollUp(lines: Int = 1): String = csi + lines + "S" /** * Scroll page down, default 1, lines. */ - def scrollDown(lines: Int = 1) = csi + lines + "T" + def scrollDown(lines: Int = 1): String = csi + lines + "T" /** * Saves the cursor position/state. */ - val saveCursorPosition = csi + "s" + val saveCursorPosition: String = csi + "s" /** * Restores the cursor position/state. */ - val restoreCursorPosition = csi + "u" + val restoreCursorPosition: String = csi + "u" - val enableAlternateBuffer = csi + "?1049h" + val enableAlternateBuffer: String = csi + "?1049h" - val disableAlternateBuffer = csi + "?1049l" + val disableAlternateBuffer: String = csi + "?1049l" /** * The Width of the terminal, as reported by the COLUMNS environment variable. @@ -135,7 +135,7 @@ object Terminal { .getOrElse(80) } - val subBars = Map( + private val subBars = Map( 0 -> " ", 1 -> "▏", 2 -> "▎", diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala index 26a8d24..873020e 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala @@ -4,6 +4,8 @@ import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish import net.kemitix.thorp.domain.Terminal._ import net.kemitix.thorp.domain.UploadEvent.RequestEvent +import scala.io.AnsiColor._ + trait UploadEventLogger { def logRequestCycle(localFile: LocalFile, @@ -15,9 +17,10 @@ trait UploadEventLogger { val bar = progressBar(bytesTransferred, fileLength.toDouble, Terminal.width) val transferred = sizeInEnglish(bytesTransferred) val fileSize = sizeInEnglish(fileLength) - print(s"${eraseLine}Uploading $transferred of $fileSize : $remoteKey\n$bar${cursorUp()}\r") + val message = s"${GREEN}Uploaded $transferred of $fileSize $RESET: $remoteKey$eraseLineForward" + println(s"$message\n$bar${Terminal.cursorPrevLine() * 2}") } else - print(eraseLine) + println(s"${GREEN}Uploaded:$RESET $remoteKey$eraseLineForward") } } diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala index dd2f826..0b5868b 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala @@ -6,23 +6,19 @@ import net.kemitix.thorp.domain._ trait StorageService { def listObjects(bucket: Bucket, - prefix: RemoteKey - )(implicit logger: Logger): IO[S3ObjectsData] + prefix: RemoteKey): IO[S3ObjectsData] def upload(localFile: LocalFile, bucket: Bucket, uploadEventListener: UploadEventListener, - tryCount: Int) - (implicit logger: Logger): IO[StorageQueueEvent] + tryCount: Int): IO[StorageQueueEvent] def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, - targetKey: RemoteKey - )(implicit logger: Logger): IO[StorageQueueEvent] + targetKey: RemoteKey): IO[StorageQueueEvent] def delete(bucket: Bucket, - remoteKey: RemoteKey - )(implicit logger: Logger): IO[StorageQueueEvent] + remoteKey: RemoteKey): IO[StorageQueueEvent] } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala index cf73674..008b9aa 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala @@ -5,19 +5,15 @@ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.CopyObjectRequest import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.aws.S3ClientLogging.{logCopyStart, logCopyFinish} class S3ClientCopier(amazonS3: AmazonS3) { def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, - targetKey: RemoteKey) - (implicit logger: Logger): IO[StorageQueueEvent] = + targetKey: RemoteKey): IO[StorageQueueEvent] = for { - _ <- logCopyStart(bucket, sourceKey, targetKey) _ <- copyObject(bucket, sourceKey, hash, targetKey) - _ <- logCopyFinish(bucket, sourceKey,targetKey) } yield CopyQueueEvent(targetKey) private def copyObject(bucket: Bucket, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala index c3e48cc..fd26f43 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala @@ -4,18 +4,14 @@ import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.DeleteObjectRequest import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent -import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey} -import net.kemitix.thorp.storage.aws.S3ClientLogging.{logDeleteStart, logDeleteFinish} +import net.kemitix.thorp.domain.{Bucket, RemoteKey} class S3ClientDeleter(amazonS3: AmazonS3) { def delete(bucket: Bucket, - remoteKey: RemoteKey) - (implicit logger: Logger): IO[DeleteQueueEvent] = + remoteKey: RemoteKey): IO[DeleteQueueEvent] = for { - _ <- logDeleteStart(bucket, remoteKey) _ <- deleteObject(bucket, remoteKey) - _ <- logDeleteFinish(bucket, remoteKey) } yield DeleteQueueEvent(remoteKey) private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientLogging.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientLogging.scala deleted file mode 100644 index c82348d..0000000 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientLogging.scala +++ /dev/null @@ -1,40 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import cats.effect.IO -import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey} - -object S3ClientLogging { - - def logListObjectsStart(bucket: Bucket, - prefix: RemoteKey) - (implicit logger: Logger): IO[Unit] = - logger.info(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") - - def logListObjectsFinish(bucket: Bucket, - prefix: RemoteKey) - (implicit logger: Logger): IO[Unit] = - logger.info(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") - - def logCopyStart(bucket: Bucket, - sourceKey: RemoteKey, - targetKey: RemoteKey) - (implicit logger: Logger): IO[Unit] = - logger.info(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - - def logCopyFinish(bucket: Bucket, - sourceKey: RemoteKey, - targetKey: RemoteKey) - (implicit logger: Logger): IO[Unit] = - logger.info(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - - def logDeleteStart(bucket: Bucket, - remoteKey: RemoteKey) - (implicit logger: Logger): IO[Unit] = - logger.info(s"Delete: ${bucket.name}:${remoteKey.key}") - - def logDeleteFinish(bucket: Bucket, - remoteKey: RemoteKey) - (implicit logger: Logger): IO[Unit] = - logger.info(s"Deleted: ${bucket.name}:${remoteKey.key}") - -} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala index 6772c1c..dfcf15c 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala @@ -4,8 +4,7 @@ import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.thorp.domain -import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData} -import net.kemitix.thorp.storage.aws.S3ClientLogging.{logListObjectsStart, logListObjectsFinish} +import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData} import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey @@ -14,8 +13,7 @@ import scala.collection.JavaConverters._ class S3ClientObjectLister(amazonS3: AmazonS3) { def listObjects(bucket: Bucket, - prefix: RemoteKey) - (implicit logger: Logger): IO[S3ObjectsData] = { + prefix: RemoteKey): IO[S3ObjectsData] = { type Token = String type Batch = (Stream[S3ObjectSummary], Option[Token]) @@ -50,10 +48,7 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { } yield summaries ++ rest for { - _ <- logListObjectsStart(bucket, prefix) - r = new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key) - summaries <- fetch(r) - _ <- logListObjectsFinish(bucket, prefix) + summaries <- fetch(new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key)) } yield domain.S3ObjectsData(byHash(summaries), byKey(summaries)) } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala index 1816659..fef37e2 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala @@ -16,27 +16,23 @@ class S3StorageService(amazonS3Client: => AmazonS3, lazy val deleter = new S3ClientDeleter(amazonS3Client) override def listObjects(bucket: Bucket, - prefix: RemoteKey) - (implicit logger: Logger): IO[S3ObjectsData] = + prefix: RemoteKey): IO[S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, - targetKey: RemoteKey) - (implicit logger: Logger): IO[StorageQueueEvent] = + targetKey: RemoteKey): IO[StorageQueueEvent] = copier.copy(bucket, sourceKey,hash, targetKey) override def upload(localFile: LocalFile, bucket: Bucket, uploadEventListener: UploadEventListener, - tryCount: Int) - (implicit logger: Logger): IO[StorageQueueEvent] = + tryCount: Int): IO[StorageQueueEvent] = uploader.upload(localFile, bucket, uploadEventListener, 1) override def delete(bucket: Bucket, - remoteKey: RemoteKey) - (implicit logger: Logger): IO[StorageQueueEvent] = + remoteKey: RemoteKey): IO[StorageQueueEvent] = deleter.delete(bucket, remoteKey) } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index ab7d295..3d63735 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -8,7 +8,6 @@ import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferMana import net.kemitix.thorp.domain.StorageQueueEvent.{ErrorQueueEvent, UploadQueueEvent} import net.kemitix.thorp.domain.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} import net.kemitix.thorp.domain.{StorageQueueEvent, _} -import net.kemitix.thorp.storage.aws.UploaderLogging.{logMultiPartUploadStart, logMultiPartUploadFinished} import scala.util.Try @@ -17,16 +16,13 @@ class Uploader(transferManager: => AmazonTransferManager) { def upload(localFile: LocalFile, bucket: Bucket, uploadEventListener: UploadEventListener, - tryCount: Int) - (implicit logger: Logger): IO[StorageQueueEvent] = + tryCount: Int): IO[StorageQueueEvent] = for { - _ <- logMultiPartUploadStart(localFile, tryCount) upload <- transfer(localFile, bucket, uploadEventListener) action = upload match { case Right(r) => UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag)) case Left(e) => ErrorQueueEvent(localFile.remoteKey, e) } - _ <- logMultiPartUploadFinished(localFile) } yield action private def transfer(localFile: LocalFile, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/UploaderLogging.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/UploaderLogging.scala deleted file mode 100644 index fccec4c..0000000 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/UploaderLogging.scala +++ /dev/null @@ -1,22 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import cats.effect.IO -import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish -import net.kemitix.thorp.domain.Terminal._ -import net.kemitix.thorp.domain.{LocalFile, Logger} - -object UploaderLogging { - - def logMultiPartUploadStart(localFile: LocalFile, - tryCount: Int) - (implicit logger: Logger): IO[Unit] = { - val tryMessage = if (tryCount == 1) "" else s"try $tryCount" - val size = sizeInEnglish(localFile.file.length) - logger.info(s"${eraseLine}upload:$tryMessage:$size:${localFile.remoteKey.key}") - } - - def logMultiPartUploadFinished(localFile: LocalFile) - (implicit logger: Logger): IO[Unit] = - logger.debug(s"upload:finished: ${localFile.remoteKey.key}") - -}