[core] PlanExecutor handles building large action list better (#163)
The original code had been able to handle this, but the extraction the a separate module introduced the stackoverflow.
This commit is contained in:
parent
2be82eafa2
commit
551988d6b8
3 changed files with 85 additions and 20 deletions
|
@ -19,11 +19,27 @@ trait PlanExecutor {
|
||||||
for {
|
for {
|
||||||
actionCounter <- Ref.make(0)
|
actionCounter <- Ref.make(0)
|
||||||
bytesCounter <- Ref.make(0L)
|
bytesCounter <- Ref.make(0L)
|
||||||
events <- ZIO.foreach(syncPlan.actions) {
|
events <- applyActions(archive, syncPlan, actionCounter, bytesCounter)
|
||||||
updateArchive(archive, actionCounter, bytesCounter)
|
|
||||||
}
|
|
||||||
} yield events
|
} yield events
|
||||||
|
|
||||||
|
private def applyActions(
|
||||||
|
archive: ThorpArchive,
|
||||||
|
syncPlan: SyncPlan,
|
||||||
|
actionCounter: Ref[Int],
|
||||||
|
bytesCounter: Ref[Long]
|
||||||
|
): ZIO[Storage with Console with Config,
|
||||||
|
Throwable,
|
||||||
|
Stream[StorageQueueEvent]] = {
|
||||||
|
ZIO.foldLeft(syncPlan.actions)(Stream.empty[StorageQueueEvent]) {
|
||||||
|
(stream: Stream[StorageQueueEvent], action) =>
|
||||||
|
val result: ZIO[Storage with Console with Config,
|
||||||
|
Throwable,
|
||||||
|
StorageQueueEvent] =
|
||||||
|
updateArchive(archive, actionCounter, bytesCounter)(action)
|
||||||
|
result.map(event => event #:: stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private def updateArchive(archive: ThorpArchive,
|
private def updateArchive(archive: ThorpArchive,
|
||||||
actionCounterRef: Ref[Int],
|
actionCounterRef: Ref[Int],
|
||||||
bytesCounterRef: Ref[Long])(action: Action) =
|
bytesCounterRef: Ref[Long])(action: Action) =
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Config
|
||||||
|
import net.kemitix.thorp.console.Console
|
||||||
|
import net.kemitix.thorp.core.Action.DoNothing
|
||||||
|
import net.kemitix.thorp.domain.{
|
||||||
|
Bucket,
|
||||||
|
RemoteKey,
|
||||||
|
StorageQueueEvent,
|
||||||
|
SyncTotals
|
||||||
|
}
|
||||||
|
import net.kemitix.thorp.storage.api.Storage
|
||||||
|
import org.scalatest.FreeSpec
|
||||||
|
import zio.{DefaultRuntime, ZIO}
|
||||||
|
|
||||||
|
class PlanExecutorTest extends FreeSpec {
|
||||||
|
|
||||||
|
private def subject(in: Stream[Int]): ZIO[Any, Throwable, Stream[Int]] =
|
||||||
|
ZIO.foldLeft(in)(Stream.empty[Int])((s, i) => ZIO(i #:: s)).map(_.reverse)
|
||||||
|
|
||||||
|
"zio foreach on a stream can be a stream" in {
|
||||||
|
val input = (1 to 1000000).toStream
|
||||||
|
val program = subject(input)
|
||||||
|
val result = new DefaultRuntime {}.unsafeRunSync(program).toEither
|
||||||
|
assertResult(Right(input))(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
"build plan with 100,000 actions" in {
|
||||||
|
val nActions = 100000
|
||||||
|
val bucket = Bucket("bucket")
|
||||||
|
val remoteKey = RemoteKey("remoteKey")
|
||||||
|
val input = (1 to nActions).toStream.map(DoNothing(bucket, remoteKey, _))
|
||||||
|
|
||||||
|
val syncTotals = SyncTotals.empty
|
||||||
|
val archiveTask = UnversionedMirrorArchive.default(syncTotals)
|
||||||
|
|
||||||
|
val syncPlan = SyncPlan(input, syncTotals)
|
||||||
|
val program: ZIO[Storage with Config with Console,
|
||||||
|
Throwable,
|
||||||
|
Seq[StorageQueueEvent]] =
|
||||||
|
archiveTask.flatMap(archive =>
|
||||||
|
PlanExecutor.executePlan(archive, syncPlan))
|
||||||
|
|
||||||
|
val result: Either[Throwable, Seq[StorageQueueEvent]] =
|
||||||
|
new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither
|
||||||
|
|
||||||
|
val expected = Right(
|
||||||
|
(1 to nActions).toStream
|
||||||
|
.map(_ => StorageQueueEvent.DoNothingQueueEvent(remoteKey)))
|
||||||
|
assertResult(expected)(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
object TestEnv extends Storage.Test with Config.Live with Console.Test
|
||||||
|
|
||||||
|
}
|
|
@ -40,11 +40,16 @@ object Storage {
|
||||||
|
|
||||||
trait Test extends Storage {
|
trait Test extends Storage {
|
||||||
|
|
||||||
def listResult: Task[RemoteObjects]
|
def listResult: Task[RemoteObjects] =
|
||||||
def uploadResult: UIO[StorageQueueEvent]
|
Task.die(new NotImplementedError)
|
||||||
def copyResult: UIO[StorageQueueEvent]
|
def uploadResult: UIO[StorageQueueEvent] =
|
||||||
def deleteResult: UIO[StorageQueueEvent]
|
Task.die(new NotImplementedError)
|
||||||
def shutdownResult: UIO[StorageQueueEvent]
|
def copyResult: UIO[StorageQueueEvent] =
|
||||||
|
Task.die(new NotImplementedError)
|
||||||
|
def deleteResult: UIO[StorageQueueEvent] =
|
||||||
|
Task.die(new NotImplementedError)
|
||||||
|
def shutdownResult: UIO[StorageQueueEvent] =
|
||||||
|
Task.die(new NotImplementedError)
|
||||||
|
|
||||||
val storage: Service = new Service {
|
val storage: Service = new Service {
|
||||||
|
|
||||||
|
@ -77,18 +82,7 @@ object Storage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object Test extends Test {
|
object Test extends Test
|
||||||
override def listResult: Task[RemoteObjects] =
|
|
||||||
Task.die(new NotImplementedError)
|
|
||||||
override def uploadResult: UIO[StorageQueueEvent] =
|
|
||||||
Task.die(new NotImplementedError)
|
|
||||||
override def copyResult: UIO[StorageQueueEvent] =
|
|
||||||
Task.die(new NotImplementedError)
|
|
||||||
override def deleteResult: UIO[StorageQueueEvent] =
|
|
||||||
Task.die(new NotImplementedError)
|
|
||||||
override def shutdownResult: UIO[StorageQueueEvent] =
|
|
||||||
Task.die(new NotImplementedError)
|
|
||||||
}
|
|
||||||
|
|
||||||
final def list(bucket: Bucket,
|
final def list(bucket: Bucket,
|
||||||
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =
|
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =
|
||||||
|
|
Loading…
Reference in a new issue