diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 6acd473..4198d6a 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -4,7 +4,6 @@ import net.kemitix.thorp.config._ import net.kemitix.thorp.console._ import net.kemitix.thorp.core.CoreTypes.CoreProgram import net.kemitix.thorp.core._ -import net.kemitix.thorp.domain.StorageQueueEvent import zio.ZIO trait Program { @@ -24,62 +23,44 @@ trait Program { private def showVersion: ConfigOptions => Boolean = cli => ConfigQuery.showVersion(cli) - private def execute = { + private def execute = for { plan <- PlanBuilder.createPlan archive <- UnversionedMirrorArchive.default(plan.syncTotals) events <- applyPlan(archive, plan) _ <- SyncLogging.logRunFinished(events) } yield () - } private def handleErrors(throwable: Throwable) = - for { - _ <- Console.putStrLn("There were errors:") - _ <- throwable match { - case ConfigValidationException(errors) => - ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error")) - } - } yield () + Console.putStrLn("There were errors:") *> logValidationErrors(throwable) - private def applyPlan( - archive: ThorpArchive, - syncPlan: SyncPlan - ) = { - val zero: (Stream[StorageQueueEvent], Long) = - (Stream(), syncPlan.syncTotals.totalSizeBytes) - val actions = syncPlan.actions.zipWithIndex + private def logValidationErrors(throwable: Throwable) = + throwable match { + case ConfigValidationException(errors) => + ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error")) + } + + private def applyPlan(archive: ThorpArchive, syncPlan: SyncPlan) = ZIO - .foldLeft(actions)(zero)((acc, action) => - applyAction(archive, acc, action)) - .map { - case (events, _) => events - } - } + .foldLeft(sequenceActions(syncPlan.actions))( + EventQueue(Stream.empty, syncPlan.syncTotals.totalSizeBytes))( + applyAction(archive)(_, _)) + .map(_.events) - private def applyAction( - archive: ThorpArchive, - acc: (Stream[StorageQueueEvent], Long), - indexedAction: (Action, Int) + private def sequenceActions(actions: Stream[Action]) = + actions.zipWithIndex + .map({ case (a, i) => SequencedAction(a, i) }) + + private def applyAction(archive: ThorpArchive)( + queue: EventQueue, + action: SequencedAction ) = { - val (action, index) = indexedAction - val (stream, bytesToDo) = acc - val remainingBytes = bytesToDo - action.size - updateArchive(archive, action, index, stream, remainingBytes) - .map((_, remainingBytes)) + val remainingBytes = queue.bytesInQueue - action.action.size + archive + .update(action, remainingBytes) + .map(events => EventQueue(queue.events ++ Stream(events), remainingBytes)) } - private def updateArchive( - archive: ThorpArchive, - action: Action, - index: Int, - stream: Stream[StorageQueueEvent], - remainingBytes: Long - ) = - for { - event <- archive.update(index, action, remainingBytes) - } yield stream ++ Stream(event) - } object Program extends Program diff --git a/core/src/main/scala/net/kemitix/thorp/core/EventQueue.scala b/core/src/main/scala/net/kemitix/thorp/core/EventQueue.scala new file mode 100644 index 0000000..850f664 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/EventQueue.scala @@ -0,0 +1,8 @@ +package net.kemitix.thorp.core + +import net.kemitix.thorp.domain.StorageQueueEvent + +case class EventQueue( + events: Stream[StorageQueueEvent], + bytesInQueue: Long +) diff --git a/core/src/main/scala/net/kemitix/thorp/core/SequencedAction.scala b/core/src/main/scala/net/kemitix/thorp/core/SequencedAction.scala new file mode 100644 index 0000000..d9af258 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/SequencedAction.scala @@ -0,0 +1,6 @@ +package net.kemitix.thorp.core + +case class SequencedAction( + action: Action, + index: Int +) diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala index f415cb7..1244f38 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -16,8 +16,7 @@ import zio.{TaskR, ZIO} trait ThorpArchive { def update( - index: Int, - action: Action, + sequencedAction: SequencedAction, totalBytesSoFar: Long ): TaskR[Storage with Console with Config, StorageQueueEvent] diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index 5096846..d6969ff 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -12,18 +12,17 @@ case class UnversionedMirrorArchive(syncTotals: SyncTotals) extends ThorpArchive { override def update( - index: Int, - action: Action, + sequencedAction: SequencedAction, totalBytesSoFar: Long ): TaskR[Storage with Console with Config, StorageQueueEvent] = - action match { - case ToUpload(bucket, localFile, _) => + sequencedAction match { + case SequencedAction(ToUpload(bucket, localFile, _), index) => doUpload(index, totalBytesSoFar, bucket, localFile) >>= logEvent - case ToCopy(bucket, sourceKey, hash, targetKey, _) => + case SequencedAction(ToCopy(bucket, sourceKey, hash, targetKey, _), _) => Storage.copy(bucket, sourceKey, hash, targetKey) >>= logEvent - case ToDelete(bucket, remoteKey, _) => + case SequencedAction(ToDelete(bucket, remoteKey, _), _) => Storage.delete(bucket, remoteKey) >>= logEvent - case DoNothing(_, remoteKey, _) => + case SequencedAction(DoNothing(_, remoteKey, _), _) => Task(DoNothingQueueEvent(remoteKey)) } @@ -33,12 +32,16 @@ case class UnversionedMirrorArchive(syncTotals: SyncTotals) bucket: Bucket, localFile: LocalFile ) = - Storage.upload(localFile, - bucket, - UploadEventListener.Settings(localFile, - index, - syncTotals, - totalBytesSoFar)) + Storage.upload( + localFile, + bucket, + UploadEventListener.Settings( + localFile, + index, + syncTotals, + totalBytesSoFar + ) + ) } object UnversionedMirrorArchive {