Support multiple parallel uploads (#188)

* [filesystem] listFiles no longer returns errors

* [cli,config] Take parallel parameter

* [config] Config add .parallel

* [lib] Perform copy and upload in parallel

* [uishell] Extract UIRequestCycle

* [uishell] Display all pending uploads progress

* [app] Always display version

* [app] Highlight version

* [uishell] UIRequestCycle refactoring

* [uishell] UIShell Don’t hide chosen actions in batch mode

* [uishell] UIShell fix typo

* [console] ConsoleOut fix typo
This commit is contained in:
Paul Campbell 2019-09-27 16:08:16 +01:00 committed by GitHub
parent f51501b13d
commit 06dd4f8fed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 152 additions and 43 deletions

View file

@ -18,10 +18,11 @@ import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.{UIEvent, UIShell}
import zio.clock.Clock
import zio.{RIO, UIO, ZIO}
import scala.io.AnsiColor.{WHITE, RESET}
trait Program {
lazy val version = s"Thorp v${thorp.BuildInfo.version}"
lazy val version = s"${WHITE}Thorp v${thorp.BuildInfo.version}$RESET"
def run(args: List[String]): ZIO[
Storage with Console with Config with Clock with FileSystem with Hasher with FileScanner,
@ -31,7 +32,7 @@ trait Program {
cli <- CliArgs.parse(args)
config <- ConfigurationBuilder.buildConfig(cli)
_ <- Config.set(config)
_ <- ZIO.when(showVersion(cli))(Console.putStrLn(version))
_ <- Console.putStrLn(version)
_ <- ZIO.when(!showVersion(cli))(executeWithUI.catchAll(handleErrors))
} yield ()
}

View file

@ -37,6 +37,9 @@ object CliArgs {
opt[String]('p', "prefix")
.action((str, cos) => ConfigOption.Prefix(str) :: cos)
.text("Prefix within the S3 Bucket"),
opt[Int]('P', "parallel")
.action((int, cos) => ConfigOption.Parallel(int) :: cos)
.text("Maximum Parallel uploads"),
opt[String]('i', "include")
.unbounded()
.action((str, cos) => ConfigOption.Include(str) :: cos)

View file

@ -72,6 +72,29 @@ class CliArgsTest extends FunSpec {
}
}
describe("parse - parallel") {
def invokeWithArguments(args: List[String]): ConfigOptions = {
val strings = List("--source", pathTo("."), "--bucket", "bucket")
.concat(args)
.filter(_ != "")
val maybeOptions = invoke(strings)
maybeOptions.getOrElse(ConfigOptions.empty)
}
describe("when no parallel parameter") {
val configOptions = invokeWithArguments(List.empty[String])
it("should have parallel of 1") {
assertResult(1)(ConfigOptions.parallel(configOptions))
}
}
describe("when parallel parameter given") {
val configOptions = invokeWithArguments(List("--parallel", "5"))
it("should have parallel of 5") {
assertResult(5)(ConfigOptions.parallel(configOptions))
}
}
}
private def pathTo(value: String): String =
Try(Resource(this, value))
.map(_.getCanonicalPath)

View file

@ -18,6 +18,7 @@ object Config {
def prefix: ZIO[Config, Nothing, RemoteKey]
def sources: ZIO[Config, Nothing, Sources]
def filters: ZIO[Config, Nothing, List[Filter]]
def parallel: UIO[Int]
}
trait Live extends Config {
@ -42,6 +43,8 @@ object Config {
override def filters: ZIO[Config, Nothing, List[Filter]] =
UIO(configRef.get).map(_.filters)
override def parallel: UIO[Int] = UIO(configRef.get).map(_.parallel)
}
}
@ -64,4 +67,7 @@ object Config {
final def filters: ZIO[Config, Nothing, List[Filter]] =
ZIO.accessM(_.config filters)
final def parallel: ZIO[Config, Nothing, Int] =
ZIO.accessM(_.config parallel)
}

View file

@ -65,4 +65,9 @@ object ConfigOption {
override def update(config: Configuration): Configuration = config
}
case class Parallel(factor: Int) extends ConfigOption {
override def update(config: Configuration): Configuration =
parallel.set(factor)(config)
}
}

View file

@ -13,6 +13,15 @@ final case class ConfigOptions(options: List[ConfigOption]) {
}
object ConfigOptions {
val defaultParallel = 1
def parallel(configOptions: ConfigOptions): Int = {
configOptions.options
.collectFirst {
case ConfigOption.Parallel(factor) => factor
}
.getOrElse(defaultParallel)
}
val empty: ConfigOptions = ConfigOptions(List.empty)
val options: SimpleLens[ConfigOptions, List[ConfigOption]] =
SimpleLens[ConfigOptions, List[ConfigOption]](_.options,

View file

@ -8,6 +8,7 @@ private[config] final case class Configuration(
filters: List[Filter],
debug: Boolean,
batchMode: Boolean,
parallel: Int,
sources: Sources
)
@ -18,6 +19,7 @@ private[config] object Configuration {
filters = List.empty,
debug = false,
batchMode = false,
parallel = 1,
sources = Sources(List.empty)
)
val sources: SimpleLens[Configuration, Sources] =
@ -34,4 +36,6 @@ private[config] object Configuration {
val batchMode: SimpleLens[Configuration, Boolean] =
SimpleLens[Configuration, Boolean](_.batchMode,
b => a => b.copy(batchMode = a))
val parallel: SimpleLens[Configuration, Int] =
SimpleLens[Configuration, Int](_.parallel, b => a => b.copy(parallel = a))
}

View file

@ -56,7 +56,7 @@ object ConsoleOut {
override def en: String =
s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen"
override def enBatch: String =
s"Deleted: $remoteKey"
s"Deleted: ${remoteKey.key}"
}
final case class ErrorQueueEventOccurred(action: ActionSummary, e: Throwable)

View file

@ -20,7 +20,7 @@ object FileSystem {
: RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]]
def fileLines(file: File): RIO[FileSystem, Seq[String]]
def isDirectory(file: File): RIO[FileSystem, Boolean]
def listFiles(path: Path): RIO[FileSystem, Iterable[File]]
def listFiles(path: Path): UIO[List[File]]
def length(file: File): ZIO[FileSystem, Nothing, Long]
def hasLocalFile(sources: Sources,
prefix: RemoteKey,
@ -58,8 +58,9 @@ object FileSystem {
override def isDirectory(file: File): RIO[FileSystem, Boolean] =
Task(file.isDirectory)
override def listFiles(path: Path): RIO[FileSystem, Iterable[File]] =
Task(path.toFile.listFiles())
override def listFiles(path: Path): UIO[List[File]] =
Task(List.from(path.toFile.listFiles()))
.catchAll(_ => UIO.succeed(List.empty[File]))
override def length(file: File): ZIO[FileSystem, Nothing, Long] =
UIO(file.length)
@ -84,7 +85,7 @@ object FileSystem {
val fileExistsResultMap: UIO[Map[Path, File]]
val fileLinesResult: Task[List[String]]
val isDirResult: Task[Boolean]
val listFilesResult: RIO[FileSystem, Iterable[File]]
val listFilesResult: UIO[List[File]]
val lengthResult: UIO[Long]
val managedFileInputStream: Task[ZManaged[Any, Throwable, FileInputStream]]
val hasLocalFileResult: UIO[Boolean]
@ -104,7 +105,7 @@ object FileSystem {
override def isDirectory(file: File): RIO[FileSystem, Boolean] =
isDirResult
override def listFiles(path: Path): RIO[FileSystem, Iterable[File]] =
override def listFiles(path: Path): UIO[List[File]] =
listFilesResult
override def length(file: File): UIO[Long] =
@ -135,7 +136,7 @@ object FileSystem {
final def isDirectory(file: File): RIO[FileSystem, Boolean] =
ZIO.accessM(_.filesystem.isDirectory(file))
final def listFiles(path: Path): RIO[FileSystem, Iterable[File]] =
final def listFiles(path: Path): ZIO[FileSystem, Nothing, List[File]] =
ZIO.accessM(_.filesystem.listFiles(path))
final def length(file: File): ZIO[FileSystem, Nothing, Long] =

View file

@ -56,7 +56,10 @@ object LocalFileSystem extends LocalFileSystem {
actionCounter,
bytesCounter,
eventsRef)
_ <- MessageChannel.pointToPoint(fileSender)(fileReceiver).runDrain
parallel <- Config.parallel
_ <- MessageChannel
.pointToPointPar(parallel)(fileSender)(fileReceiver)
.runDrain
events <- eventsRef.get
} yield events

View file

@ -0,0 +1,72 @@
package net.kemitix.thorp.uishell
import java.util.concurrent.atomic.AtomicReference
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish
import net.kemitix.thorp.domain.Terminal.{eraseLineForward, progressBar}
import net.kemitix.thorp.domain.{LocalFile, RemoteKey, Terminal}
import zio.{UIO, ZIO}
import scala.io.AnsiColor.{GREEN, RESET}
object UIRequestCycle {
private case class UploadState(transferred: Long, fileLength: Long)
private val uploads: AtomicReference[Map[RemoteKey, UploadState]] =
new AtomicReference[Map[RemoteKey, UploadState]](Map.empty)
private val statusHeight = 3
def handle(localFile: LocalFile,
bytesTransferred: Long,
index: Int,
totalBytesSoFar: Long): ZIO[Console with Config, Nothing, Unit] =
for {
_ <- ZIO.when(bytesTransferred < localFile.file.length())(
stillUploading(localFile.remoteKey,
localFile.file.length(),
bytesTransferred))
_ <- ZIO.when(bytesTransferred >= localFile.file.length()) {
finishedUploading(localFile.remoteKey)
}
} yield ()
private def stillUploading(
remoteKey: RemoteKey,
fileLength: Long,
bytesTransferred: Long
): ZIO[Console, Nothing, Unit] = {
val current: Map[RemoteKey, UploadState] =
uploads.updateAndGet((m: Map[RemoteKey, UploadState]) =>
m.updated(remoteKey, UploadState(bytesTransferred, fileLength)))
val resetCursor = s"${Terminal.cursorPrevLine(statusHeight) * current.size}"
ZIO.foreach(current) { entry =>
{
val (remoteKey, state) = entry
val percent = f"${(state.transferred * 100) / state.fileLength}%2d"
val transferred = sizeInEnglish(state.transferred)
val fileLength = sizeInEnglish(state.fileLength)
val line1 =
s"${GREEN}Uploading:$RESET ${remoteKey.key}$eraseLineForward"
val line2 = s"$GREEN File:$RESET ($percent%) $transferred of $fileLength" + s"$eraseLineForward"
val line3 =
progressBar(state.transferred, state.fileLength, Terminal.width)
Console.putStrLn(line1) *>
Console.putStrLn(line2) *>
Console.putStrLn(line3)
}
} *> Console.putStr(resetCursor)
}
private def finishedUploading(
remoteKey: RemoteKey
): ZIO[Any, Nothing, Unit] = {
UIO(uploads.updateAndGet((m: Map[RemoteKey, UploadState]) =>
m.removed(remoteKey))) *> UIO.unit
}
}

View file

@ -10,17 +10,10 @@ import net.kemitix.thorp.console.ConsoleOut.{
}
import net.kemitix.thorp.console.{Console, ConsoleOut}
import net.kemitix.thorp.domain.Action.ToUpload
import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish
import net.kemitix.thorp.domain.Terminal.{
eraseLineForward,
eraseToEndOfScreen,
progressBar
}
import net.kemitix.thorp.domain.Terminal.{eraseLineForward, eraseToEndOfScreen}
import net.kemitix.thorp.domain._
import zio.{UIO, ZIO}
import scala.io.AnsiColor.{GREEN, RESET}
object UIShell {
def receiver: UIO[MessageChannel.UReceiver[Console with Config, UIEvent]] =
@ -42,7 +35,10 @@ object UIShell {
bytesTransferred,
index,
totalBytesSoFar) =>
requestCycle(localFile, bytesTransferred, index, totalBytesSoFar)
UIRequestCycle.handle(localFile,
bytesTransferred,
index,
totalBytesSoFar)
}
}
@ -98,34 +94,17 @@ object UIShell {
_ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
} yield ()
private def requestCycle(
localFile: LocalFile,
bytesTransferred: Long,
index: Int,
totalBytesSoFar: Long): ZIO[Console with Config, Nothing, Unit] =
ZIO.when(bytesTransferred < localFile.file.length()) {
val fileLength = localFile.file.length
val remoteKey = localFile.remoteKey.key
val statusHeight = 3
val percent = f"${(bytesTransferred * 100) / fileLength}%2d"
Console.putStrLn(
s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" +
s"$GREEN File:$RESET ($percent%) ${sizeInEnglish(bytesTransferred)} of ${sizeInEnglish(fileLength)}" +
s"$eraseLineForward\n" +
progressBar(bytesTransferred, fileLength, Terminal.width) +
s"${Terminal.cursorPrevLine(statusHeight)}")
}
private def actionAsString(action: Action): String = action match {
case Action.DoNothing(bucket, remoteKey, size) =>
s"Do nothing: ${remoteKey.key}"
case ToUpload(bucket, localFile, size) => s"Upload: ${localFile.remoteKey}"
case ToUpload(bucket, localFile, size) =>
s"Upload: ${localFile.remoteKey.key}"
case Action.ToCopy(bucket, sourceKey, hash, targetKey, size) =>
s"Copy: ${sourceKey.key} => ${targetKey.key}"
case Action.ToDelete(bucket, remoteKey, size) => s"Delete: ${remoteKey.key}"
}
def trimHeadTerminal(str: String): String = {
def trimHead(str: String): String = {
val width = Terminal.width
str.length match {
case l if l > width => str.substring(l - width)
@ -133,9 +112,12 @@ object UIShell {
}
}
def actionChosen(action: Action): ZIO[Console with Config, Nothing, Unit] = {
Console.putStr(
trimHeadTerminal(actionAsString(action)) + eraseLineForward + "\r")
}
def actionChosen(action: Action): ZIO[Console with Config, Nothing, Unit] =
for {
batch <- Config.batchMode
message = trimHead(actionAsString(action)) + eraseLineForward
_ <- ZIO.when(!batch) { Console.putStr(message + "\r") }
_ <- ZIO.when(batch) { Console.putStrLn(message) }
} yield ()
}