diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala index 2d87106..b54406f 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanExecutor.scala @@ -19,11 +19,27 @@ trait PlanExecutor { for { actionCounter <- Ref.make(0) bytesCounter <- Ref.make(0L) - events <- ZIO.foreach(syncPlan.actions) { - updateArchive(archive, actionCounter, bytesCounter) - } + events <- applyActions(archive, syncPlan, actionCounter, bytesCounter) } yield events + private def applyActions( + archive: ThorpArchive, + syncPlan: SyncPlan, + actionCounter: Ref[Int], + bytesCounter: Ref[Long] + ): ZIO[Storage with Console with Config, + Throwable, + Stream[StorageQueueEvent]] = { + ZIO.foldLeft(syncPlan.actions)(Stream.empty[StorageQueueEvent]) { + (stream: Stream[StorageQueueEvent], action) => + val result: ZIO[Storage with Console with Config, + Throwable, + StorageQueueEvent] = + updateArchive(archive, actionCounter, bytesCounter)(action) + result.map(event => event #:: stream) + } + } + private def updateArchive(archive: ThorpArchive, actionCounterRef: Ref[Int], bytesCounterRef: Ref[Long])(action: Action) = diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanExecutorTest.scala b/core/src/test/scala/net/kemitix/thorp/core/PlanExecutorTest.scala new file mode 100644 index 0000000..a3c1c03 --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/PlanExecutorTest.scala @@ -0,0 +1,55 @@ +package net.kemitix.thorp.core + +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.core.Action.DoNothing +import net.kemitix.thorp.domain.{ + Bucket, + RemoteKey, + StorageQueueEvent, + SyncTotals +} +import net.kemitix.thorp.storage.api.Storage +import org.scalatest.FreeSpec +import zio.{DefaultRuntime, ZIO} + +class PlanExecutorTest extends FreeSpec { + + private def subject(in: Stream[Int]): ZIO[Any, Throwable, Stream[Int]] = + ZIO.foldLeft(in)(Stream.empty[Int])((s, i) => ZIO(i #:: s)).map(_.reverse) + + "zio foreach on a stream can be a stream" in { + val input = (1 to 1000000).toStream + val program = subject(input) + val result = new DefaultRuntime {}.unsafeRunSync(program).toEither + assertResult(Right(input))(result) + } + + "build plan with 100,000 actions" in { + val nActions = 100000 + val bucket = Bucket("bucket") + val remoteKey = RemoteKey("remoteKey") + val input = (1 to nActions).toStream.map(DoNothing(bucket, remoteKey, _)) + + val syncTotals = SyncTotals.empty + val archiveTask = UnversionedMirrorArchive.default(syncTotals) + + val syncPlan = SyncPlan(input, syncTotals) + val program: ZIO[Storage with Config with Console, + Throwable, + Seq[StorageQueueEvent]] = + archiveTask.flatMap(archive => + PlanExecutor.executePlan(archive, syncPlan)) + + val result: Either[Throwable, Seq[StorageQueueEvent]] = + new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither + + val expected = Right( + (1 to nActions).toStream + .map(_ => StorageQueueEvent.DoNothingQueueEvent(remoteKey))) + assertResult(expected)(result) + } + + object TestEnv extends Storage.Test with Config.Live with Console.Test + +} diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala index c84c5af..f3148a6 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala @@ -40,11 +40,16 @@ object Storage { trait Test extends Storage { - def listResult: Task[RemoteObjects] - def uploadResult: UIO[StorageQueueEvent] - def copyResult: UIO[StorageQueueEvent] - def deleteResult: UIO[StorageQueueEvent] - def shutdownResult: UIO[StorageQueueEvent] + def listResult: Task[RemoteObjects] = + Task.die(new NotImplementedError) + def uploadResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + def copyResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + def deleteResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) + def shutdownResult: UIO[StorageQueueEvent] = + Task.die(new NotImplementedError) val storage: Service = new Service { @@ -77,18 +82,7 @@ object Storage { } } - object Test extends Test { - override def listResult: Task[RemoteObjects] = - Task.die(new NotImplementedError) - override def uploadResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - override def copyResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - override def deleteResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - override def shutdownResult: UIO[StorageQueueEvent] = - Task.die(new NotImplementedError) - } + object Test extends Test final def list(bucket: Bucket, prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =