Refactor Program.updateArchive(ThorpArchive,Action,Int,Stream,Long) (#142)
* [cli] Program Refactor * [cli] Program Refactor
This commit is contained in:
parent
e3b0260b6d
commit
b643b7ea92
5 changed files with 55 additions and 58 deletions
|
@ -4,7 +4,6 @@ import net.kemitix.thorp.config._
|
||||||
import net.kemitix.thorp.console._
|
import net.kemitix.thorp.console._
|
||||||
import net.kemitix.thorp.core.CoreTypes.CoreProgram
|
import net.kemitix.thorp.core.CoreTypes.CoreProgram
|
||||||
import net.kemitix.thorp.core._
|
import net.kemitix.thorp.core._
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent
|
|
||||||
import zio.ZIO
|
import zio.ZIO
|
||||||
|
|
||||||
trait Program {
|
trait Program {
|
||||||
|
@ -24,62 +23,44 @@ trait Program {
|
||||||
private def showVersion: ConfigOptions => Boolean =
|
private def showVersion: ConfigOptions => Boolean =
|
||||||
cli => ConfigQuery.showVersion(cli)
|
cli => ConfigQuery.showVersion(cli)
|
||||||
|
|
||||||
private def execute = {
|
private def execute =
|
||||||
for {
|
for {
|
||||||
plan <- PlanBuilder.createPlan
|
plan <- PlanBuilder.createPlan
|
||||||
archive <- UnversionedMirrorArchive.default(plan.syncTotals)
|
archive <- UnversionedMirrorArchive.default(plan.syncTotals)
|
||||||
events <- applyPlan(archive, plan)
|
events <- applyPlan(archive, plan)
|
||||||
_ <- SyncLogging.logRunFinished(events)
|
_ <- SyncLogging.logRunFinished(events)
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
|
||||||
|
|
||||||
private def handleErrors(throwable: Throwable) =
|
private def handleErrors(throwable: Throwable) =
|
||||||
for {
|
Console.putStrLn("There were errors:") *> logValidationErrors(throwable)
|
||||||
_ <- Console.putStrLn("There were errors:")
|
|
||||||
_ <- throwable match {
|
|
||||||
case ConfigValidationException(errors) =>
|
|
||||||
ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error"))
|
|
||||||
}
|
|
||||||
} yield ()
|
|
||||||
|
|
||||||
private def applyPlan(
|
private def logValidationErrors(throwable: Throwable) =
|
||||||
archive: ThorpArchive,
|
throwable match {
|
||||||
syncPlan: SyncPlan
|
case ConfigValidationException(errors) =>
|
||||||
) = {
|
ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error"))
|
||||||
val zero: (Stream[StorageQueueEvent], Long) =
|
}
|
||||||
(Stream(), syncPlan.syncTotals.totalSizeBytes)
|
|
||||||
val actions = syncPlan.actions.zipWithIndex
|
private def applyPlan(archive: ThorpArchive, syncPlan: SyncPlan) =
|
||||||
ZIO
|
ZIO
|
||||||
.foldLeft(actions)(zero)((acc, action) =>
|
.foldLeft(sequenceActions(syncPlan.actions))(
|
||||||
applyAction(archive, acc, action))
|
EventQueue(Stream.empty, syncPlan.syncTotals.totalSizeBytes))(
|
||||||
.map {
|
applyAction(archive)(_, _))
|
||||||
case (events, _) => events
|
.map(_.events)
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def applyAction(
|
private def sequenceActions(actions: Stream[Action]) =
|
||||||
archive: ThorpArchive,
|
actions.zipWithIndex
|
||||||
acc: (Stream[StorageQueueEvent], Long),
|
.map({ case (a, i) => SequencedAction(a, i) })
|
||||||
indexedAction: (Action, Int)
|
|
||||||
|
private def applyAction(archive: ThorpArchive)(
|
||||||
|
queue: EventQueue,
|
||||||
|
action: SequencedAction
|
||||||
) = {
|
) = {
|
||||||
val (action, index) = indexedAction
|
val remainingBytes = queue.bytesInQueue - action.action.size
|
||||||
val (stream, bytesToDo) = acc
|
archive
|
||||||
val remainingBytes = bytesToDo - action.size
|
.update(action, remainingBytes)
|
||||||
updateArchive(archive, action, index, stream, remainingBytes)
|
.map(events => EventQueue(queue.events ++ Stream(events), remainingBytes))
|
||||||
.map((_, remainingBytes))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def updateArchive(
|
|
||||||
archive: ThorpArchive,
|
|
||||||
action: Action,
|
|
||||||
index: Int,
|
|
||||||
stream: Stream[StorageQueueEvent],
|
|
||||||
remainingBytes: Long
|
|
||||||
) =
|
|
||||||
for {
|
|
||||||
event <- archive.update(index, action, remainingBytes)
|
|
||||||
} yield stream ++ Stream(event)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object Program extends Program
|
object Program extends Program
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import net.kemitix.thorp.domain.StorageQueueEvent
|
||||||
|
|
||||||
|
case class EventQueue(
|
||||||
|
events: Stream[StorageQueueEvent],
|
||||||
|
bytesInQueue: Long
|
||||||
|
)
|
|
@ -0,0 +1,6 @@
|
||||||
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
case class SequencedAction(
|
||||||
|
action: Action,
|
||||||
|
index: Int
|
||||||
|
)
|
|
@ -16,8 +16,7 @@ import zio.{TaskR, ZIO}
|
||||||
trait ThorpArchive {
|
trait ThorpArchive {
|
||||||
|
|
||||||
def update(
|
def update(
|
||||||
index: Int,
|
sequencedAction: SequencedAction,
|
||||||
action: Action,
|
|
||||||
totalBytesSoFar: Long
|
totalBytesSoFar: Long
|
||||||
): TaskR[Storage with Console with Config, StorageQueueEvent]
|
): TaskR[Storage with Console with Config, StorageQueueEvent]
|
||||||
|
|
||||||
|
|
|
@ -12,18 +12,17 @@ case class UnversionedMirrorArchive(syncTotals: SyncTotals)
|
||||||
extends ThorpArchive {
|
extends ThorpArchive {
|
||||||
|
|
||||||
override def update(
|
override def update(
|
||||||
index: Int,
|
sequencedAction: SequencedAction,
|
||||||
action: Action,
|
|
||||||
totalBytesSoFar: Long
|
totalBytesSoFar: Long
|
||||||
): TaskR[Storage with Console with Config, StorageQueueEvent] =
|
): TaskR[Storage with Console with Config, StorageQueueEvent] =
|
||||||
action match {
|
sequencedAction match {
|
||||||
case ToUpload(bucket, localFile, _) =>
|
case SequencedAction(ToUpload(bucket, localFile, _), index) =>
|
||||||
doUpload(index, totalBytesSoFar, bucket, localFile) >>= logEvent
|
doUpload(index, totalBytesSoFar, bucket, localFile) >>= logEvent
|
||||||
case ToCopy(bucket, sourceKey, hash, targetKey, _) =>
|
case SequencedAction(ToCopy(bucket, sourceKey, hash, targetKey, _), _) =>
|
||||||
Storage.copy(bucket, sourceKey, hash, targetKey) >>= logEvent
|
Storage.copy(bucket, sourceKey, hash, targetKey) >>= logEvent
|
||||||
case ToDelete(bucket, remoteKey, _) =>
|
case SequencedAction(ToDelete(bucket, remoteKey, _), _) =>
|
||||||
Storage.delete(bucket, remoteKey) >>= logEvent
|
Storage.delete(bucket, remoteKey) >>= logEvent
|
||||||
case DoNothing(_, remoteKey, _) =>
|
case SequencedAction(DoNothing(_, remoteKey, _), _) =>
|
||||||
Task(DoNothingQueueEvent(remoteKey))
|
Task(DoNothingQueueEvent(remoteKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,12 +32,16 @@ case class UnversionedMirrorArchive(syncTotals: SyncTotals)
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
localFile: LocalFile
|
localFile: LocalFile
|
||||||
) =
|
) =
|
||||||
Storage.upload(localFile,
|
Storage.upload(
|
||||||
bucket,
|
localFile,
|
||||||
UploadEventListener.Settings(localFile,
|
bucket,
|
||||||
index,
|
UploadEventListener.Settings(
|
||||||
syncTotals,
|
localFile,
|
||||||
totalBytesSoFar))
|
index,
|
||||||
|
syncTotals,
|
||||||
|
totalBytesSoFar
|
||||||
|
)
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
object UnversionedMirrorArchive {
|
object UnversionedMirrorArchive {
|
||||||
|
|
Loading…
Reference in a new issue