[core] Extract PlanExecutor

This commit is contained in:
Paul Campbell 2019-08-08 20:37:25 +01:00
parent 242804294d
commit 656f3fec30
5 changed files with 77 additions and 64 deletions

View file

@ -25,9 +25,10 @@ trait Program {
private def execute = private def execute =
for { for {
plan <- PlanBuilder.createPlan _ <- SyncLogging.logRunStart
archive <- UnversionedMirrorArchive.default(plan.syncTotals) syncPlan <- PlanBuilder.createPlan
events <- applyPlan(archive, plan) archive <- UnversionedMirrorArchive.default(syncPlan.syncTotals)
events <- PlanExecutor.executePlan(archive, syncPlan)
_ <- SyncLogging.logRunFinished(events) _ <- SyncLogging.logRunFinished(events)
} yield () } yield ()
@ -40,27 +41,6 @@ trait Program {
ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error")) ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error"))
} }
private def applyPlan(archive: ThorpArchive, syncPlan: SyncPlan) =
ZIO
.foldLeft(sequenceActions(syncPlan.actions))(
EventQueue(Stream.empty, syncPlan.syncTotals.totalSizeBytes))(
applyAction(archive)(_, _))
.map(_.events)
private def sequenceActions(actions: Stream[Action]) =
actions.zipWithIndex
.map({ case (a, i) => SequencedAction(a, i) })
private def applyAction(archive: ThorpArchive)(
queue: EventQueue,
action: SequencedAction
) = {
val remainingBytes = queue.bytesInQueue - action.action.size
archive
.update(action, remainingBytes)
.map(events => EventQueue(queue.events ++ Stream(events), remainingBytes))
}
} }
object Program extends Program object Program extends Program

View file

@ -13,20 +13,14 @@ object PlanBuilder {
def createPlan def createPlan
: RIO[Storage with Console with Config with FileSystem with Hasher, : RIO[Storage with Console with Config with FileSystem with Hasher,
SyncPlan] = SyncPlan] = (fetchRemoteData <&> findLocalFiles) >>= assemblePlan
SyncLogging.logRunStart *> buildPlan
private def buildPlan =
gatherMetadata >>= assemblePlan
private def gatherMetadata =
fetchRemoteData &&& findLocalFiles
private def fetchRemoteData = private def fetchRemoteData =
for { for {
bucket <- Config.bucket bucket <- Config.bucket
prefix <- Config.prefix prefix <- Config.prefix
objects <- Storage.list(bucket, prefix) objects <- Storage.list(bucket, prefix)
_ <- Console.putStrLn(s"Found ${objects.byKey.size} remote objects")
} yield objects } yield objects
private def assemblePlan(metadata: (RemoteObjects, LocalFiles)) = private def assemblePlan(metadata: (RemoteObjects, LocalFiles)) =
@ -34,17 +28,17 @@ object PlanBuilder {
case (remoteObjects, localData) => case (remoteObjects, localData) =>
createActions(remoteObjects, localData.localFiles) createActions(remoteObjects, localData.localFiles)
.map(_.filter(doesSomething).sortBy(SequencePlan.order)) .map(_.filter(doesSomething).sortBy(SequencePlan.order))
.map( .map(syncPlan(localData))
SyncPlan
.create(_,
SyncTotals
.create(localData.count, localData.totalSizeBytes, 0L)))
} }
private def createActions( private def syncPlan(localData: LocalFiles): Stream[Action] => SyncPlan =
remoteObjects: RemoteObjects, SyncPlan.create(_, syncTotal(localData))
localFiles: Stream[LocalFile]
) = private def syncTotal(localData: LocalFiles): SyncTotals =
SyncTotals.create(localData.count, localData.totalSizeBytes, 0L)
private def createActions(remoteObjects: RemoteObjects,
localFiles: Stream[LocalFile]) =
for { for {
fileActions <- actionsForLocalFiles(remoteObjects, localFiles) fileActions <- actionsForLocalFiles(remoteObjects, localFiles)
remoteActions <- actionsForRemoteKeys(remoteObjects.byKey.keys) remoteActions <- actionsForRemoteKeys(remoteObjects.byKey.keys)
@ -55,28 +49,30 @@ object PlanBuilder {
case _ => true case _ => true
} }
private def actionsForLocalFiles( private def actionsForLocalFiles(remoteObjects: RemoteObjects,
remoteObjects: RemoteObjects, localFiles: Stream[LocalFile]) =
localFiles: Stream[LocalFile] ZIO.foldLeft(localFiles)(Stream.empty[Action])(
) = (acc, localFile) =>
ZIO.foldLeft(localFiles)(Stream.empty[Action])((acc, localFile) => createActionsFromLocalFile(remoteObjects, acc, localFile)
createActionsFromLocalFile(remoteObjects, acc, localFile).map(_ #::: acc)) .map(_ #::: acc)
)
private def createActionsFromLocalFile( private def createActionsFromLocalFile(remoteObjects: RemoteObjects,
remoteObjects: RemoteObjects,
previousActions: Stream[Action], previousActions: Stream[Action],
localFile: LocalFile localFile: LocalFile) =
) =
ActionGenerator.createActions( ActionGenerator.createActions(
S3MetaDataEnricher.getMetadata(localFile, remoteObjects), S3MetaDataEnricher.getMetadata(localFile, remoteObjects),
previousActions) previousActions
)
private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) = private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) =
ZIO.foldLeft(remoteKeys)(Stream.empty[Action])((acc, remoteKey) => ZIO.foldLeft(remoteKeys)(Stream.empty[Action])(
createActionFromRemoteKey(remoteKey).map(_ #:: acc)) (acc, remoteKey) => createActionFromRemoteKey(remoteKey).map(_ #:: acc)
)
private def createActionFromRemoteKey( private def createActionFromRemoteKey(
remoteKey: RemoteKey): ZIO[FileSystem with Config, Throwable, Action] = remoteKey: RemoteKey
): ZIO[FileSystem with Config, Throwable, Action] =
for { for {
bucket <- Config.bucket bucket <- Config.bucket
prefix <- Config.prefix prefix <- Config.prefix
@ -93,6 +89,7 @@ object PlanBuilder {
for { for {
sources <- Config.sources sources <- Config.sources
found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles) found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles)
_ <- Console.putStrLn(s"Found ${found.flatMap(_.localFiles).size} files")
} yield LocalFiles.reduce(found.toStream) } yield LocalFiles.reduce(found.toStream)
} }

View file

@ -0,0 +1,39 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.StorageQueueEvent
import net.kemitix.thorp.storage.api.Storage
import zio.{Ref, ZIO}
trait PlanExecutor {
def executePlan(
archive: ThorpArchive,
syncPlan: SyncPlan
): ZIO[Storage with Config with Console,
Throwable,
Seq[
StorageQueueEvent
]] =
for {
actionCounter <- Ref.make(0)
bytesCounter <- Ref.make(0L)
events <- ZIO.foreach(syncPlan.actions) {
updateArchive(archive, actionCounter, bytesCounter)
}
} yield events
private def updateArchive(archive: ThorpArchive,
actionCounterRef: Ref[Int],
bytesCounterRef: Ref[Long])(action: Action) =
for {
actionCounter <- actionCounterRef.update(_ + 1)
bytesCounter <- bytesCounterRef.update(_ + action.size)
event <- archive.update(SequencedAction(action, actionCounter),
bytesCounter)
} yield event
}
object PlanExecutor extends PlanExecutor

View file

@ -30,7 +30,7 @@ trait SyncLogging {
} yield () } yield ()
def logRunFinished( def logRunFinished(
actions: Stream[StorageQueueEvent] actions: Seq[StorageQueueEvent]
): ZIO[Console, Nothing, Unit] = { ): ZIO[Console, Nothing, Unit] = {
val counters = actions.foldLeft(Counters.empty)(countActivities) val counters = actions.foldLeft(Counters.empty)(countActivities)
Console.putStrLn(eraseToEndOfScreen) *> Console.putStrLn(eraseToEndOfScreen) *>

View file

@ -363,23 +363,20 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
override def shutdownResult: UIO[StorageQueueEvent] = override def shutdownResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError) Task.die(new NotImplementedError)
} }
def testProgram = def testProgram =
for { for {
config <- ConfigurationBuilder.buildConfig(configOptions) config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- Config.set(config) _ <- Config.set(config)
plan <- PlanBuilder.createPlan plan <- PlanBuilder.createPlan
} yield plan } yield plan
new DefaultRuntime {} new DefaultRuntime {}
.unsafeRunSync(testProgram.provide(testEnv)) .unsafeRunSync(testProgram.provide(testEnv))
.toEither .toEither
.map(convertResult) .map(convertResult)
} }
private def convertResult private def convertResult(plan: SyncPlan) =
: SyncPlan => List[(String, String, String, String, String)] = plan.actions.map({
_.actions.toList.map({
case ToUpload(_, lf, _) => case ToUpload(_, lf, _) =>
("upload", ("upload",
lf.remoteKey.key, lf.remoteKey.key,
@ -391,6 +388,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
("copy", sourceKey.key, MD5Hash.hash(hash), targetKey.key, "") ("copy", sourceKey.key, MD5Hash.hash(hash), targetKey.key, "")
case DoNothing(_, remoteKey, _) => case DoNothing(_, remoteKey, _) =>
("do-nothing", remoteKey.key, "", "", "") ("do-nothing", remoteKey.key, "", "", "")
case x => ("other", "", "", "", "") case _ => ("other", "", "", "", "")
}) })
} }