From 00c04187e8cee22e62300855cb8a754d41f722ee Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Thu, 4 Jul 2019 18:58:31 +0100 Subject: [PATCH] Display total size and progress for entire run (#94) * [changelog] updated * [core] Wrap Stream[LocalFile] as LocalFiles * [core] LocalFiles counts files * [core] LocalFiles sums file lengths * [core] Restore logFileScan * [storage-aws] Lister logs when fetching object summaries * [storage-aws] Extract ListerLogger * [core] Synchronise use leftMap * [core] Syncronise extract assemblePlan * [core] Wrap Stream[Action] in SyncPlan * [core] Copy the file count and totalSizeBytes across to SyncPlan * [cli] Program rename actions as syncPlan * [cli] Program extract thorpArchive def * [cli] Program extract createPlan def * [cli] Program refactoring * [cli] Program remove println showing version * [cli] Program rename actions parameter as syncPlan * [core] ThorpArchive add an index to each action * [cli] Program make SyncPlan available to ThorpArchive * [core] Pass SyncTotals to Archive * [domain] Move SyncTotals into module * [domain] Pass index and SyncTotals to UploadEventListener * [domain] UploadEventLogger add file count a size progress bars * [domain] UploadEventLogger better display stability and add file index * [cli] Index files in correct order * [cli] Program extends Synchronise * [core] Rename Synchronise as PlanBuilder * [cli] Program add test to check actions don't get reordered from plan * [core] collect file size totals * [domain] UploadEventLogger include percentage * [cli] ProgramTest Use wildcards when selecting more than 6 elements --- CHANGELOG.org | 1 + .../scala/net/kemitix/thorp/cli/Program.scala | 55 +++++++++++----- .../net/kemitix/thorp/cli/ProgramTest.scala | 64 +++++++++++++++++++ .../scala/net/kemitix/thorp/core/Action.scala | 13 ++-- .../kemitix/thorp/core/ActionGenerator.scala | 6 +- .../kemitix/thorp/core/LocalFileStream.scala | 22 ++++--- .../net/kemitix/thorp/core/LocalFiles.scala | 14 ++++ .../{Synchronise.scala => PlanBuilder.scala} | 52 +++++++++------ .../net/kemitix/thorp/core/SyncPlan.scala | 6 ++ .../net/kemitix/thorp/core/ThorpArchive.scala | 5 +- .../thorp/core/UnversionedMirrorArchive.scala | 25 +++++--- .../thorp/core/ActionGeneratorSuite.scala | 10 +-- .../thorp/core/LocalFileStreamSuite.scala | 14 +++- .../net/kemitix/thorp/core/SyncSuite.scala | 36 +++++++---- .../net/kemitix/thorp/domain/SyncTotals.scala | 5 ++ .../thorp/domain/UploadEventListener.scala | 12 ++-- .../thorp/domain/UploadEventLogger.scala | 30 +++++++-- .../thorp/storage/api/StorageService.scala | 3 +- .../kemitix/thorp/storage/aws/Lister.scala | 42 ++++++------ .../thorp/storage/aws/ListerLogger.scala | 9 +++ .../thorp/storage/aws/S3StorageService.scala | 3 +- .../storage/aws/StorageServiceSuite.scala | 2 +- .../thorp/storage/aws/UploaderSuite.scala | 2 +- 23 files changed, 317 insertions(+), 114 deletions(-) create mode 100644 cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala create mode 100644 core/src/main/scala/net/kemitix/thorp/core/LocalFiles.scala rename core/src/main/scala/net/kemitix/thorp/core/{Synchronise.scala => PlanBuilder.scala} (62%) create mode 100644 core/src/main/scala/net/kemitix/thorp/core/SyncPlan.scala create mode 100644 domain/src/main/scala/net/kemitix/thorp/domain/SyncTotals.scala create mode 100644 storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala diff --git a/CHANGELOG.org b/CHANGELOG.org index 88fdf54..b4f6a6a 100644 --- a/CHANGELOG.org +++ b/CHANGELOG.org @@ -11,6 +11,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog] - Add a version command-line option (#99) - Add a batch mode (#85) + - Display total size and progress for entire run (#94) * [0.6.1] - 2019-07-03 diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index f100a2d..463f5de 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -7,38 +7,63 @@ import net.kemitix.thorp.domain.{Logger, StorageQueueEvent} import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService -trait Program { +trait Program extends PlanBuilder { def run(cliOptions: ConfigOptions): IO[ExitCode] = { implicit val logger: Logger = new PrintLogger() - if (ConfigQuery.showVersion(cliOptions)) IO { - println(s"Thorp v${thorp.BuildInfo.version}") - ExitCode.Success - } else { + if (ConfigQuery.showVersion(cliOptions)) for { - actions <- Synchronise.createPlan(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors) - events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService, ConfigQuery.batchMode(cliOptions)), actions) + _ <- logger.info(s"Thorp v${thorp.BuildInfo.version}") + } yield ExitCode.Success + else + for { + syncPlan <- createPlan(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors) + archive <- thorpArchive(cliOptions, syncPlan) + events <- handleActions(archive, syncPlan) _ <- defaultStorageService.shutdown _ <- SyncLogging.logRunFinished(events) } yield ExitCode.Success - } } - private def handleErrors(implicit logger: Logger): List[String] => IO[Stream[Action]] = { + def thorpArchive(cliOptions: ConfigOptions, + syncPlan: SyncPlan): IO[ThorpArchive] = + IO.pure( + UnversionedMirrorArchive.default( + defaultStorageService, + ConfigQuery.batchMode(cliOptions), + syncPlan.syncTotals + )) + + private def handleErrors(implicit logger: Logger): List[String] => IO[SyncPlan] = { errors => { for { _ <- logger.error("There were errors:") _ <- errors.map(error => logger.error(s" - $error")).sequence - } yield Stream() + } yield SyncPlan() } } private def handleActions(archive: ThorpArchive, - actions: Stream[Action]) - (implicit l: Logger): IO[Stream[StorageQueueEvent]] = - actions.foldLeft(Stream[IO[StorageQueueEvent]]()) { - (stream, action) => archive.update(action) ++ stream - }.sequence + syncPlan: SyncPlan) + (implicit l: Logger): IO[Stream[StorageQueueEvent]] = { + type Accumulator = (Stream[IO[StorageQueueEvent]], Long) + val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes) + val (actions, _) = syncPlan.actions + .zipWithIndex + .reverse + .foldLeft(zero) { + (acc: Accumulator, indexedAction) => { + val (stream, bytesToDo) = acc + val (action, index) = indexedAction + val remainingBytes = bytesToDo - action.size + ( + archive.update(index, action, remainingBytes) ++ stream, + remainingBytes + ) + } + } + actions.sequence + } } object Program extends Program diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala new file mode 100644 index 0000000..3aa77f3 --- /dev/null +++ b/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala @@ -0,0 +1,64 @@ +package net.kemitix.thorp.cli + +import java.io.File + +import cats.data.EitherT +import cats.effect.IO +import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} +import net.kemitix.thorp.core._ +import net.kemitix.thorp.domain.{Bucket, LocalFile, Logger, MD5Hash, RemoteKey, StorageQueueEvent} +import net.kemitix.thorp.storage.api.{HashService, StorageService} +import org.scalatest.FunSpec + +class ProgramTest extends FunSpec { + + val source: File = Resource(this, ".") + val bucket: Bucket = Bucket("aBucket") + val hash: MD5Hash = MD5Hash("aHash") + val copyAction: Action = ToCopy(bucket, RemoteKey("copy-me"), hash, RemoteKey("overwrite-me"), 17L) + val uploadAction: Action = ToUpload(bucket, LocalFile.resolve("aFile", Map(), source, _ => RemoteKey("upload-me")), 23L) + val deleteAction: Action = ToDelete(bucket, RemoteKey("delete-me"), 0L) + + val configOptions: ConfigOptions = ConfigOptions(options = List( + ConfigOption.IgnoreGlobalOptions, + ConfigOption.IgnoreUserOptions + )) + + describe("upload, copy and delete actions in plan") { + val archive = TestProgram.thorpArchive + it("should be handled in correct order") { + val expected = List(copyAction, uploadAction, deleteAction) + TestProgram.run(configOptions).unsafeRunSync + val result = archive.actions + assertResult(expected)(result) + } + } + + object TestProgram extends Program with TestPlanBuilder { + val thorpArchive: ActionCaptureArchive = new ActionCaptureArchive + override def thorpArchive(cliOptions: ConfigOptions, syncPlan: SyncPlan): IO[ThorpArchive] = + IO.pure(thorpArchive) + } + + trait TestPlanBuilder extends PlanBuilder { + override def createPlan(storageService: StorageService, + hashService: HashService, + configOptions: ConfigOptions) + (implicit l: Logger): EitherT[IO, List[String], SyncPlan] = { + EitherT.right(IO(SyncPlan(Stream(copyAction, uploadAction, deleteAction)))) + } + } + + class ActionCaptureArchive extends ThorpArchive { + var actions: List[Action] = List[Action]() + override def update(index: Int, + action: Action, + totalBytesSoFar: Long) + (implicit l: Logger): Stream[IO[StorageQueueEvent]] = { + actions = action :: actions + Stream() + } + } + +} + diff --git a/core/src/main/scala/net/kemitix/thorp/core/Action.scala b/core/src/main/scala/net/kemitix/thorp/core/Action.scala index eb5bf25..3c9fb41 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Action.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/Action.scala @@ -4,21 +4,26 @@ import net.kemitix.thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} sealed trait Action { def bucket: Bucket + def size: Long } object Action { final case class DoNothing(bucket: Bucket, - remoteKey: RemoteKey) extends Action + remoteKey: RemoteKey, + size: Long) extends Action final case class ToUpload(bucket: Bucket, - localFile: LocalFile) extends Action + localFile: LocalFile, + size: Long) extends Action final case class ToCopy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, - targetKey: RemoteKey) extends Action + targetKey: RemoteKey, + size: Long) extends Action final case class ToDelete(bucket: Bucket, - remoteKey: RemoteKey) extends Action + remoteKey: RemoteKey, + size: Long) extends Action } diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala index 18b09ff..3aadc2a 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala @@ -40,12 +40,12 @@ object ActionGenerator { private def doNothing(bucket: Bucket, remoteKey: RemoteKey) = Stream( - DoNothing(bucket, remoteKey)) + DoNothing(bucket, remoteKey, 0L)) private def uploadFile(bucket: Bucket, localFile: LocalFile) = Stream( - ToUpload(bucket, localFile)) + ToUpload(bucket, localFile, localFile.file.length)) private def copyFile(bucket: Bucket, localFile: LocalFile, @@ -54,7 +54,7 @@ object ActionGenerator { headOption.toStream.map { remoteMetaData => val sourceKey = remoteMetaData.remoteKey val hash = remoteMetaData.hash - ToCopy(bucket, sourceKey, hash, localFile.remoteKey) + ToCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.file.length) } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala index 060fed9..4876978 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala @@ -14,11 +14,11 @@ object LocalFileStream { def findFiles(file: File, hashService: HashService) (implicit c: Config, - logger: Logger): IO[Stream[LocalFile]] = { + logger: Logger): IO[LocalFiles] = { val filters: Path => Boolean = Filter.isIncluded(c.filters) - def loop(file: File): IO[Stream[LocalFile]] = { + def loop(file: File): IO[LocalFiles] = { def dirPaths(file: File): IO[Stream[File]] = IO(listFiles(file)) @@ -26,16 +26,16 @@ object LocalFileStream { Stream(fs: _*) .filter(f => filters(f.toPath))) - def recurseIntoSubDirectories(file: File): IO[Stream[LocalFile]] = + def recurseIntoSubDirectories(file: File): IO[LocalFiles] = file match { case f if f.isDirectory => loop(file) case _ => localFile(hashService, file) } - def recurse(fs: Stream[File]): IO[Stream[LocalFile]] = - fs.foldLeft(IO.pure(Stream.empty[LocalFile]))((acc, f) => + def recurse(fs: Stream[File]): IO[LocalFiles] = + fs.foldLeft(IO.pure(LocalFiles()))((acc, f) => recurseIntoSubDirectories(f) - .flatMap(lfs => acc.map(s => s ++ lfs))) + .flatMap(localFiles => acc.map(accLocalFiles => accLocalFiles ++ localFiles))) for { _ <- logger.debug(s"- Entering: $file") @@ -48,10 +48,16 @@ object LocalFileStream { loop(file) } - private def localFile(hashService: HashService, file: File)(implicit l: Logger, c: Config) = { + private def localFile(hashService: HashService, + file: File) + (implicit l: Logger, c: Config) = { for { hash <- hashService.hashLocalObject(file) - } yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file))) + } yield + LocalFiles( + localFiles = Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file))), + count = 1, + totalSizeBytes = file.length) } //TODO: Change this to return an Either[IllegalArgumentException, Array[File]] diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFiles.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFiles.scala new file mode 100644 index 0000000..f85a2fb --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFiles.scala @@ -0,0 +1,14 @@ +package net.kemitix.thorp.core + +import net.kemitix.thorp.domain.LocalFile + +case class LocalFiles(localFiles: Stream[LocalFile] = Stream(), + count: Long = 0, + totalSizeBytes: Long = 0) { + def ++(append: LocalFiles): LocalFiles = + copy(localFiles = localFiles ++ append.localFiles, + count = count + append.count, + totalSizeBytes = totalSizeBytes + append.totalSizeBytes) + +} + diff --git a/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala similarity index 62% rename from core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala rename to core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala index ca4bf13..c02d870 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala @@ -7,12 +7,12 @@ import net.kemitix.thorp.core.Action.DoNothing import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} -trait Synchronise { +trait PlanBuilder { def createPlan(storageService: StorageService, hashService: HashService, configOptions: ConfigOptions) - (implicit l: Logger): EitherT[IO, List[String], Stream[Action]] = + (implicit l: Logger): EitherT[IO, List[String], SyncPlan] = EitherT(ConfigurationBuilder.buildConfig(configOptions)) .leftMap(errorMessages) .flatMap(config => useValidConfig(storageService, hashService)(config, l)) @@ -25,44 +25,58 @@ trait Synchronise { case _ => true } + def assemblePlan(implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = { + case (remoteData, localData) => { + val actions = + (actionsForLocalFiles(localData, remoteData) ++ + actionsForRemoteKeys(remoteData)) + .filter(removeDoNothing) + SyncPlan( + actions = actions, + syncTotals = SyncTotals( + count = localData.count, + totalSizeBytes = localData.totalSizeBytes)) + } + } + def useValidConfig(storageService: StorageService, hashService: HashService) - (implicit c: Config, l: Logger): EitherT[IO, List[String], Stream[Action]] = { + (implicit c: Config, l: Logger): EitherT[IO, List[String], SyncPlan] = { for { _ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.source)) actions <- gatherMetadata(storageService, hashService) - .swap.map(error => List(error)).swap - .map { - case (remoteData, localData) => - (actionsForLocalFiles(localData, remoteData) ++ - actionsForRemoteKeys(remoteData)) - .filter(removeDoNothing) - } + .leftMap(error => List(error)) + .map(assemblePlan) } yield actions } private def gatherMetadata(storageService: StorageService, hashService: HashService) (implicit l: Logger, - c: Config): EitherT[IO, String, (S3ObjectsData, Stream[LocalFile])] = + c: Config): EitherT[IO, String, (S3ObjectsData, LocalFiles)] = for { remoteData <- fetchRemoteData(storageService) localData <- EitherT.liftF(findLocalFiles(hashService)) } yield (remoteData, localData) - private def actionsForLocalFiles(localData: Stream[LocalFile], remoteData: S3ObjectsData) + private def actionsForLocalFiles(localData: LocalFiles, remoteData: S3ObjectsData) (implicit c: Config) = - localData.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData) ++ acc) + localData.localFiles.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData) ++ acc) private def actionsForRemoteKeys(remoteData: S3ObjectsData) (implicit c: Config) = remoteData.byKey.keys.foldLeft(Stream[Action]())((acc, rk) => createActionFromRemoteKey(rk) #:: acc) - private def fetchRemoteData(storageService: StorageService)(implicit c: Config) = + private def fetchRemoteData(storageService: StorageService) + (implicit c: Config, l: Logger) = storageService.listObjects(c.bucket, c.prefix) - private def findLocalFiles(hashService: HashService)(implicit config: Config, l: Logger) = - LocalFileStream.findFiles(config.source, hashService) + private def findLocalFiles(hashService: HashService) + (implicit config: Config, l: Logger) = + for { + _ <- SyncLogging.logFileScan + localFiles <- LocalFileStream.findFiles(config.source, hashService) + } yield localFiles private def createActionFromLocalFile(lf: LocalFile, remoteData: S3ObjectsData) (implicit c: Config) = @@ -70,9 +84,9 @@ trait Synchronise { private def createActionFromRemoteKey(rk: RemoteKey) (implicit c: Config) = - if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk) - else DoNothing(c.bucket, rk) + if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk, 0L) + else DoNothing(c.bucket, rk, 0L) } -object Synchronise extends Synchronise +object PlanBuilder extends PlanBuilder diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncPlan.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncPlan.scala new file mode 100644 index 0000000..c692e87 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncPlan.scala @@ -0,0 +1,6 @@ +package net.kemitix.thorp.core + +import net.kemitix.thorp.domain.SyncTotals + +case class SyncPlan(actions: Stream[Action] = Stream(), + syncTotals: SyncTotals = SyncTotals()) diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala index ed026bb..689883e 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -5,7 +5,10 @@ import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent} trait ThorpArchive { - def update(action: Action)(implicit l: Logger): Stream[IO[StorageQueueEvent]] + def update(index: Int, + action: Action, + totalBytesSoFar: Long) + (implicit l: Logger): Stream[IO[StorageQueueEvent]] def fileUploaded(localFile: LocalFile, batchMode: Boolean) diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index 3e3cbb7..511ff05 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -3,29 +3,33 @@ package net.kemitix.thorp.core import cats.effect.IO import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent -import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent, UploadEventListener} +import net.kemitix.thorp.domain.{Logger, StorageQueueEvent, SyncTotals, UploadEventListener} import net.kemitix.thorp.storage.api.StorageService case class UnversionedMirrorArchive(storageService: StorageService, - batchMode: Boolean) extends ThorpArchive { - override def update(action: Action) + batchMode: Boolean, + syncTotals: SyncTotals) extends ThorpArchive { + override def update(index: Int, + action: Action, + totalBytesSoFar: Long) (implicit l: Logger): Stream[IO[StorageQueueEvent]] = Stream( action match { - case ToUpload(bucket, localFile) => + case ToUpload(bucket, localFile, size) => for { - event <- storageService.upload(localFile, bucket, batchMode, new UploadEventListener(localFile), 1) + event <- storageService.upload(localFile, bucket, batchMode, + new UploadEventListener(localFile, index, syncTotals, totalBytesSoFar), 1) _ <- fileUploaded(localFile, batchMode) } yield event - case ToCopy(bucket, sourceKey, hash, targetKey) => + case ToCopy(bucket, sourceKey, hash, targetKey, size) => for { event <- storageService.copy(bucket, sourceKey, hash, targetKey) } yield event - case ToDelete(bucket, remoteKey) => + case ToDelete(bucket, remoteKey, size) => for { event <- storageService.delete(bucket, remoteKey) } yield event - case DoNothing(_, remoteKey) => + case DoNothing(_, remoteKey, size) => IO.pure(DoNothingQueueEvent(remoteKey)) }) @@ -33,6 +37,7 @@ case class UnversionedMirrorArchive(storageService: StorageService, object UnversionedMirrorArchive { def default(storageService: StorageService, - batchMode: Boolean): ThorpArchive = - new UnversionedMirrorArchive(storageService, batchMode) + batchMode: Boolean, + syncTotals: SyncTotals): ThorpArchive = + new UnversionedMirrorArchive(storageService, batchMode, syncTotals) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala index 76a3b62..dc39d59 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala @@ -29,7 +29,7 @@ class ActionGeneratorSuite matchByKey = Some(theRemoteMetadata) // remote exists ) it("do nothing") { - val expected = List(DoNothing(bucket, theFile.remoteKey)) + val expected = List(DoNothing(bucket, theFile.remoteKey, theFile.file.length)) val result = invoke(input) assertResult(expected)(result) } @@ -44,7 +44,7 @@ class ActionGeneratorSuite matchByHash = Set(otherRemoteMetadata), // other matches matchByKey = None) // remote is missing it("copy from other key") { - val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey)) // copy + val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey, theFile.file.length)) // copy val result = invoke(input) assertResult(expected)(result) } @@ -56,7 +56,7 @@ class ActionGeneratorSuite matchByHash = Set.empty, // other no matches matchByKey = None) // remote is missing it("upload") { - val expected = List(ToUpload(bucket, theFile)) // upload + val expected = List(ToUpload(bucket, theFile, theFile.file.length)) // upload val result = invoke(input) assertResult(expected)(result) } @@ -75,7 +75,7 @@ class ActionGeneratorSuite matchByHash = Set(otherRemoteMetadata), // other matches matchByKey = Some(oldRemoteMetadata)) // remote exists it("copy from other key") { - val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey)) // copy + val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey, theFile.file.length)) // copy val result = invoke(input) assertResult(expected)(result) } @@ -91,7 +91,7 @@ class ActionGeneratorSuite matchByKey = Some(theRemoteMetadata) // remote exists ) it("upload") { - val expected = List(ToUpload(bucket, theFile)) // upload + val expected = List(ToUpload(bucket, theFile, theFile.file.length)) // upload val result = invoke(input) assertResult(expected)(result) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala index f9fea81..c1557b0 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala @@ -23,9 +23,21 @@ class LocalFileStreamSuite extends FunSpec { describe("findFiles") { it("should find all files") { val result: Set[String] = - LocalFileStream.findFiles(uploadResource, hashService).unsafeRunSync.toSet + invoke.localFiles.toSet .map { x: LocalFile => x.relative.toString } assertResult(Set("subdir/leaf-file", "root-file"))(result) } + it("should count all files") { + val result = invoke.count + assertResult(2)(result) + } + it("should sum the size of all files") { + val result = invoke.totalSizeBytes + assertResult(113)(result) + } + } + + private def invoke = { + LocalFileStream.findFiles(uploadResource, hashService).unsafeRunSync } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index c56b0eb..e0cb20b 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -49,9 +49,16 @@ class SyncSuite )) def invokeSubject(storageService: StorageService, - hashService: HashService, - configOptions: ConfigOptions): Either[List[String], Stream[Action]] = { - Synchronise.createPlan(storageService, hashService, configOptions).value.unsafeRunSync + hashService: HashService, + configOptions: ConfigOptions): Either[List[String], SyncPlan] = { + PlanBuilder.createPlan(storageService, hashService, configOptions).value.unsafeRunSync + } + + def invokeSubjectForActions(storageService: StorageService, + hashService: HashService, + configOptions: ConfigOptions): Either[List[String], Stream[Action]] = { + invokeSubject(storageService, hashService, configOptions) + .map(_.actions) } describe("when all files should be uploaded") { @@ -60,9 +67,9 @@ class SyncSuite byKey = Map())) it("uploads all files") { val expected = Right(Set( - ToUpload(testBucket, rootFile), - ToUpload(testBucket, leafFile))) - val result = invokeSubject(storageService, hashService, configOptions) + ToUpload(testBucket, rootFile, rootFile.file.length), + ToUpload(testBucket, leafFile, leafFile.file.length))) + val result = invokeSubjectForActions(storageService, hashService, configOptions) assertResult(expected)(result.map(_.toSet)) } } @@ -81,7 +88,7 @@ class SyncSuite val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("no actions") { val expected = Stream() - val result = invokeSubject(storageService, hashService, configOptions) + val result = invokeSubjectForActions(storageService, hashService, configOptions) assert(result.isRight) assertResult(expected)(result.right.get) } @@ -100,10 +107,10 @@ class SyncSuite val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("copies the file and deletes the original") { val expected = Stream( - ToCopy(testBucket, sourceKey, Root.hash, targetKey), - ToDelete(testBucket, sourceKey) + ToCopy(testBucket, sourceKey, Root.hash, targetKey, rootFile.file.length), + ToDelete(testBucket, sourceKey, 0L) ) - val result = invokeSubject(storageService, hashService, configOptions) + val result = invokeSubjectForActions(storageService, hashService, configOptions) assert(result.isRight) assertResult(expected)(result.right.get) } @@ -128,9 +135,9 @@ class SyncSuite val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("deleted key") { val expected = Stream( - ToDelete(testBucket, deletedKey) + ToDelete(testBucket, deletedKey, 0L) ) - val result = invokeSubject(storageService,hashService, configOptions) + val result = invokeSubjectForActions(storageService,hashService, configOptions) assert(result.isRight) assertResult(expected)(result.right.get) } @@ -146,7 +153,7 @@ class SyncSuite val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("is not uploaded") { val expected = Stream() - val result = invokeSubject(storageService, hashService, ConfigOption.Exclude("leaf") :: configOptions) + val result = invokeSubjectForActions(storageService, hashService, ConfigOption.Exclude("leaf") :: configOptions) assert(result.isRight) assertResult(expected)(result.right.get) } @@ -157,7 +164,8 @@ class SyncSuite extends StorageService { override def listObjects(bucket: Bucket, - prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = + prefix: RemoteKey) + (implicit l: Logger): EitherT[IO, String, S3ObjectsData] = EitherT.liftF(IO.pure(s3ObjectsData)) override def upload(localFile: LocalFile, diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/SyncTotals.scala b/domain/src/main/scala/net/kemitix/thorp/domain/SyncTotals.scala new file mode 100644 index 0000000..c1e44f7 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/thorp/domain/SyncTotals.scala @@ -0,0 +1,5 @@ +package net.kemitix.thorp.domain + +case class SyncTotals(count: Long = 0L, + totalSizeBytes: Long = 0L, + sizeUploadedBytes: Long = 0L) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala index bac104a..b7c2427 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala @@ -3,15 +3,17 @@ package net.kemitix.thorp.domain import net.kemitix.thorp.domain.UploadEvent.RequestEvent import net.kemitix.thorp.domain.UploadEventLogger.logRequestCycle -class UploadEventListener(localFile: LocalFile) { +class UploadEventListener(localFile: LocalFile, + index: Int, + syncTotals: SyncTotals, + totalBytesSoFar: Long) { var bytesTransferred = 0L - def listener: UploadEvent => Unit = - { + def listener: UploadEvent => Unit = { case e: RequestEvent => bytesTransferred += e.transferred - logRequestCycle(localFile, e, bytesTransferred) - case _ => () + logRequestCycle(localFile, e, bytesTransferred, index, syncTotals, totalBytesSoFar) + case _ => () } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala index 873020e..7054925 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala @@ -10,19 +10,35 @@ trait UploadEventLogger { def logRequestCycle(localFile: LocalFile, event: RequestEvent, - bytesTransferred: Long): Unit = { + bytesTransferred: Long, + index: Int, + syncTotals: SyncTotals, + totalBytesSoFar: Long): Unit = { val remoteKey = localFile.remoteKey.key val fileLength = localFile.file.length + val statusHeight = 7 if (bytesTransferred < fileLength) { - val bar = progressBar(bytesTransferred, fileLength.toDouble, Terminal.width) - val transferred = sizeInEnglish(bytesTransferred) - val fileSize = sizeInEnglish(fileLength) - val message = s"${GREEN}Uploaded $transferred of $fileSize $RESET: $remoteKey$eraseLineForward" - println(s"$message\n$bar${Terminal.cursorPrevLine() * 2}") + println( + s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" + + statusWithBar(" File", sizeInEnglish, bytesTransferred, fileLength) + + statusWithBar("Files", l => l.toString, index, syncTotals.count) + + statusWithBar(" Size", sizeInEnglish, bytesTransferred + totalBytesSoFar, syncTotals.totalSizeBytes) + + s"${Terminal.cursorPrevLine(statusHeight)}") } else - println(s"${GREEN}Uploaded:$RESET $remoteKey$eraseLineForward") + println(s"${GREEN}Uploaded:$RESET $remoteKey$eraseToEndOfScreen") } + private def statusWithBar(label: String, + format: Long => String, + current: Long, + max: Long, + pre: Long = 0): String = { + val percent = f"${(current * 100) / max}%2d" + s"$GREEN$label:$RESET ($percent%) ${format(current)} of ${format(max)}" + + (if (pre > 0) s" (pre-synced ${format(pre)}" + else "") + s"$eraseLineForward\n" + + progressBar(current, max, Terminal.width) + } } object UploadEventLogger extends UploadEventLogger diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala index d4f9595..b526405 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala @@ -9,7 +9,8 @@ trait StorageService { def shutdown: IO[StorageQueueEvent] def listObjects(bucket: Bucket, - prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] + prefix: RemoteKey) + (implicit l: Logger): EitherT[IO, String, S3ObjectsData] def upload(localFile: LocalFile, bucket: Bucket, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala index 772e887..7440f98 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala @@ -5,7 +5,7 @@ import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.thorp.domain -import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData} +import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData} import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey @@ -14,30 +14,25 @@ import scala.util.Try class Lister(amazonS3: AmazonS3) { + private type Token = String + private type Batch = (Stream[S3ObjectSummary], Option[Token]) + def listObjects(bucket: Bucket, - prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = { + prefix: RemoteKey) + (implicit l: Logger): EitherT[IO, String, S3ObjectsData] = { - type Token = String - type Batch = (Stream[S3ObjectSummary], Option[Token]) - - val requestMore = (token:Token) => new ListObjectsV2Request() + val requestMore = (token: Token) => new ListObjectsV2Request() .withBucketName(bucket.name) .withPrefix(prefix.key) .withContinuationToken(token) def fetchBatch: ListObjectsV2Request => EitherT[IO, String, Batch] = - request => - EitherT { - IO.pure { - Try(amazonS3.listObjectsV2(request)) - .map { result => - val more: Option[Token] = - if (result.isTruncated) Some(result.getNextContinuationToken) - else None - (result.getObjectSummaries.asScala.toStream, more) - }.toEither.swap.map(e => e.getMessage).swap - } - } + request => EitherT { + for { + _ <- ListerLogger.logFetchBatch + batch <- tryFetchBatch(request) + } yield batch + } def fetchMore(more: Option[Token]): EitherT[IO, String, Stream[S3ObjectSummary]] = { more match { @@ -60,4 +55,15 @@ class Lister(amazonS3: AmazonS3) { } yield domain.S3ObjectsData(byHash(summaries), byKey(summaries)) } + private def tryFetchBatch(request: ListObjectsV2Request): IO[Either[String, (Stream[S3ObjectSummary], Option[Token])]] = { + IO { + Try(amazonS3.listObjectsV2(request)) + .map { result => + val more: Option[Token] = + if (result.isTruncated) Some(result.getNextContinuationToken) + else None + (result.getObjectSummaries.asScala.toStream, more) + }.toEither.swap.map(e => e.getMessage).swap + } + } } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala new file mode 100644 index 0000000..363bfca --- /dev/null +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala @@ -0,0 +1,9 @@ +package net.kemitix.thorp.storage.aws + +import cats.effect.IO +import net.kemitix.thorp.domain.Logger + +trait ListerLogger { + def logFetchBatch(implicit l: Logger): IO[Unit] = l.info("Fetching remote summaries...") +} +object ListerLogger extends ListerLogger diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala index 887ae90..6e63539 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala @@ -18,7 +18,8 @@ class S3StorageService(amazonS3Client: => AmazonS3, lazy val deleter = new Deleter(amazonS3Client) override def listObjects(bucket: Bucket, - prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = + prefix: RemoteKey) + (implicit l: Logger): EitherT[IO, String, S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy(bucket: Bucket, diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index 5b53f3a..a12f3ee 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -116,7 +116,7 @@ class StorageServiceSuite LocalFile.resolve("root-file", md5HashMap(Root.hash), source, KeyGenerator.generateKey(source, prefix)) val bucket = Bucket("a-bucket") val remoteKey = RemoteKey("prefix/root-file") - val uploadEventListener = new UploadEventListener(localFile) + val uploadEventListener = new UploadEventListener(localFile, 1, SyncTotals(), 0L) val upload = stub[Upload] (amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload) diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala index 5265187..0ebba03 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala @@ -39,7 +39,7 @@ class UploaderSuite val returnedKey = RemoteKey("returned-key") val returnedHash = MD5Hash("returned-hash") val bigFile = LocalFile.resolve("small-file", md5HashMap(MD5Hash("the-hash")), source, fileToKey) - val uploadEventListener = new UploadEventListener(bigFile) + val uploadEventListener = new UploadEventListener(bigFile, 1, SyncTotals(), 0L) val amazonS3 = mock[AmazonS3] val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build val uploader = new Uploader(amazonS3TransferManager)