From 656f3fec305211d4ddfb8fdded0c5a30e628561d Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Thu, 8 Aug 2019 20:37:25 +0100 Subject: [PATCH] [core] Extract PlanExecutor --- .../scala/net/kemitix/thorp/cli/Program.scala | 30 ++------- .../net/kemitix/thorp/core/PlanBuilder.scala | 61 +++++++++---------- .../net/kemitix/thorp/core/PlanExecutor.scala | 39 ++++++++++++ .../net/kemitix/thorp/core/SyncLogging.scala | 2 +- .../kemitix/thorp/core/PlanBuilderTest.scala | 9 +-- 5 files changed, 77 insertions(+), 64 deletions(-) create mode 100644 core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 4198d6a..9aedde4 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -25,10 +25,11 @@ trait Program { private def execute = for { - plan <- PlanBuilder.createPlan - archive <- UnversionedMirrorArchive.default(plan.syncTotals) - events <- applyPlan(archive, plan) - _ <- SyncLogging.logRunFinished(events) + _ <- SyncLogging.logRunStart + syncPlan <- PlanBuilder.createPlan + archive <- UnversionedMirrorArchive.default(syncPlan.syncTotals) + events <- PlanExecutor.executePlan(archive, syncPlan) + _ <- SyncLogging.logRunFinished(events) } yield () private def handleErrors(throwable: Throwable) = @@ -40,27 +41,6 @@ trait Program { ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error")) } - private def applyPlan(archive: ThorpArchive, syncPlan: SyncPlan) = - ZIO - .foldLeft(sequenceActions(syncPlan.actions))( - EventQueue(Stream.empty, syncPlan.syncTotals.totalSizeBytes))( - applyAction(archive)(_, _)) - .map(_.events) - - private def sequenceActions(actions: Stream[Action]) = - actions.zipWithIndex - .map({ case (a, i) => SequencedAction(a, i) }) - - private def applyAction(archive: ThorpArchive)( - queue: EventQueue, - action: SequencedAction - ) = { - val remainingBytes = queue.bytesInQueue - action.action.size - archive - .update(action, remainingBytes) - .map(events => EventQueue(queue.events ++ Stream(events), remainingBytes)) - } - } object Program extends Program diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala index f2b9c56..fe20b9c 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala @@ -13,20 +13,14 @@ object PlanBuilder { def createPlan : RIO[Storage with Console with Config with FileSystem with Hasher, - SyncPlan] = - SyncLogging.logRunStart *> buildPlan - - private def buildPlan = - gatherMetadata >>= assemblePlan - - private def gatherMetadata = - fetchRemoteData &&& findLocalFiles + SyncPlan] = (fetchRemoteData <&> findLocalFiles) >>= assemblePlan private def fetchRemoteData = for { bucket <- Config.bucket prefix <- Config.prefix objects <- Storage.list(bucket, prefix) + _ <- Console.putStrLn(s"Found ${objects.byKey.size} remote objects") } yield objects private def assemblePlan(metadata: (RemoteObjects, LocalFiles)) = @@ -34,17 +28,17 @@ object PlanBuilder { case (remoteObjects, localData) => createActions(remoteObjects, localData.localFiles) .map(_.filter(doesSomething).sortBy(SequencePlan.order)) - .map( - SyncPlan - .create(_, - SyncTotals - .create(localData.count, localData.totalSizeBytes, 0L))) + .map(syncPlan(localData)) } - private def createActions( - remoteObjects: RemoteObjects, - localFiles: Stream[LocalFile] - ) = + private def syncPlan(localData: LocalFiles): Stream[Action] => SyncPlan = + SyncPlan.create(_, syncTotal(localData)) + + private def syncTotal(localData: LocalFiles): SyncTotals = + SyncTotals.create(localData.count, localData.totalSizeBytes, 0L) + + private def createActions(remoteObjects: RemoteObjects, + localFiles: Stream[LocalFile]) = for { fileActions <- actionsForLocalFiles(remoteObjects, localFiles) remoteActions <- actionsForRemoteKeys(remoteObjects.byKey.keys) @@ -55,28 +49,30 @@ object PlanBuilder { case _ => true } - private def actionsForLocalFiles( - remoteObjects: RemoteObjects, - localFiles: Stream[LocalFile] - ) = - ZIO.foldLeft(localFiles)(Stream.empty[Action])((acc, localFile) => - createActionsFromLocalFile(remoteObjects, acc, localFile).map(_ #::: acc)) + private def actionsForLocalFiles(remoteObjects: RemoteObjects, + localFiles: Stream[LocalFile]) = + ZIO.foldLeft(localFiles)(Stream.empty[Action])( + (acc, localFile) => + createActionsFromLocalFile(remoteObjects, acc, localFile) + .map(_ #::: acc) + ) - private def createActionsFromLocalFile( - remoteObjects: RemoteObjects, - previousActions: Stream[Action], - localFile: LocalFile - ) = + private def createActionsFromLocalFile(remoteObjects: RemoteObjects, + previousActions: Stream[Action], + localFile: LocalFile) = ActionGenerator.createActions( S3MetaDataEnricher.getMetadata(localFile, remoteObjects), - previousActions) + previousActions + ) private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) = - ZIO.foldLeft(remoteKeys)(Stream.empty[Action])((acc, remoteKey) => - createActionFromRemoteKey(remoteKey).map(_ #:: acc)) + ZIO.foldLeft(remoteKeys)(Stream.empty[Action])( + (acc, remoteKey) => createActionFromRemoteKey(remoteKey).map(_ #:: acc) + ) private def createActionFromRemoteKey( - remoteKey: RemoteKey): ZIO[FileSystem with Config, Throwable, Action] = + remoteKey: RemoteKey + ): ZIO[FileSystem with Config, Throwable, Action] = for { bucket <- Config.bucket prefix <- Config.prefix @@ -93,6 +89,7 @@ object PlanBuilder { for { sources <- Config.sources found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles) + _ <- Console.putStrLn(s"Found ${found.flatMap(_.localFiles).size} files") } yield LocalFiles.reduce(found.toStream) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala new file mode 100644 index 0000000..2d87106 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala @@ -0,0 +1,39 @@ +package net.kemitix.thorp.core + +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.domain.StorageQueueEvent +import net.kemitix.thorp.storage.api.Storage +import zio.{Ref, ZIO} + +trait PlanExecutor { + + def executePlan( + archive: ThorpArchive, + syncPlan: SyncPlan + ): ZIO[Storage with Config with Console, + Throwable, + Seq[ + StorageQueueEvent + ]] = + for { + actionCounter <- Ref.make(0) + bytesCounter <- Ref.make(0L) + events <- ZIO.foreach(syncPlan.actions) { + updateArchive(archive, actionCounter, bytesCounter) + } + } yield events + + private def updateArchive(archive: ThorpArchive, + actionCounterRef: Ref[Int], + bytesCounterRef: Ref[Long])(action: Action) = + for { + actionCounter <- actionCounterRef.update(_ + 1) + bytesCounter <- bytesCounterRef.update(_ + action.size) + event <- archive.update(SequencedAction(action, actionCounter), + bytesCounter) + } yield event + +} + +object PlanExecutor extends PlanExecutor diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index 3c31b84..2c5071e 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -30,7 +30,7 @@ trait SyncLogging { } yield () def logRunFinished( - actions: Stream[StorageQueueEvent] + actions: Seq[StorageQueueEvent] ): ZIO[Console, Nothing, Unit] = { val counters = actions.foldLeft(Counters.empty)(countActivities) Console.putStrLn(eraseToEndOfScreen) *> diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala index 43f9d47..70587ce 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala @@ -363,23 +363,20 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { override def shutdownResult: UIO[StorageQueueEvent] = Task.die(new NotImplementedError) } - def testProgram = for { config <- ConfigurationBuilder.buildConfig(configOptions) _ <- Config.set(config) plan <- PlanBuilder.createPlan } yield plan - new DefaultRuntime {} .unsafeRunSync(testProgram.provide(testEnv)) .toEither .map(convertResult) } - private def convertResult - : SyncPlan => List[(String, String, String, String, String)] = - _.actions.toList.map({ + private def convertResult(plan: SyncPlan) = + plan.actions.map({ case ToUpload(_, lf, _) => ("upload", lf.remoteKey.key, @@ -391,6 +388,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { ("copy", sourceKey.key, MD5Hash.hash(hash), targetKey.key, "") case DoNothing(_, remoteKey, _) => ("do-nothing", remoteKey.key, "", "", "") - case x => ("other", "", "", "", "") + case _ => ("other", "", "", "", "") }) }