diff --git a/app/src/main/scala/net/kemitix/thorp/Program.scala b/app/src/main/scala/net/kemitix/thorp/Program.scala index 31eaa9c..271943d 100644 --- a/app/src/main/scala/net/kemitix/thorp/Program.scala +++ b/app/src/main/scala/net/kemitix/thorp/Program.scala @@ -18,10 +18,11 @@ import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.uishell.{UIEvent, UIShell} import zio.clock.Clock import zio.{RIO, UIO, ZIO} +import scala.io.AnsiColor.{WHITE, RESET} trait Program { - lazy val version = s"Thorp v${thorp.BuildInfo.version}" + lazy val version = s"${WHITE}Thorp v${thorp.BuildInfo.version}$RESET" def run(args: List[String]): ZIO[ Storage with Console with Config with Clock with FileSystem with Hasher with FileScanner, @@ -31,7 +32,7 @@ trait Program { cli <- CliArgs.parse(args) config <- ConfigurationBuilder.buildConfig(cli) _ <- Config.set(config) - _ <- ZIO.when(showVersion(cli))(Console.putStrLn(version)) + _ <- Console.putStrLn(version) _ <- ZIO.when(!showVersion(cli))(executeWithUI.catchAll(handleErrors)) } yield () } diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala b/cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala index cdb35f4..07487e8 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/CliArgs.scala @@ -37,6 +37,9 @@ object CliArgs { opt[String]('p', "prefix") .action((str, cos) => ConfigOption.Prefix(str) :: cos) .text("Prefix within the S3 Bucket"), + opt[Int]('P', "parallel") + .action((int, cos) => ConfigOption.Parallel(int) :: cos) + .text("Maximum Parallel uploads"), opt[String]('i', "include") .unbounded() .action((str, cos) => ConfigOption.Include(str) :: cos) diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala index e271615..56babf1 100644 --- a/cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/CliArgsTest.scala @@ -72,6 +72,29 @@ class CliArgsTest extends FunSpec { } } + describe("parse - parallel") { + def invokeWithArguments(args: List[String]): ConfigOptions = { + val strings = List("--source", pathTo("."), "--bucket", "bucket") + .concat(args) + .filter(_ != "") + val maybeOptions = invoke(strings) + maybeOptions.getOrElse(ConfigOptions.empty) + } + + describe("when no parallel parameter") { + val configOptions = invokeWithArguments(List.empty[String]) + it("should have parallel of 1") { + assertResult(1)(ConfigOptions.parallel(configOptions)) + } + } + describe("when parallel parameter given") { + val configOptions = invokeWithArguments(List("--parallel", "5")) + it("should have parallel of 5") { + assertResult(5)(ConfigOptions.parallel(configOptions)) + } + } + } + private def pathTo(value: String): String = Try(Resource(this, value)) .map(_.getCanonicalPath) diff --git a/config/src/main/scala/net/kemitix/thorp/config/Config.scala b/config/src/main/scala/net/kemitix/thorp/config/Config.scala index 3fe2b71..5568f61 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/Config.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/Config.scala @@ -18,6 +18,7 @@ object Config { def prefix: ZIO[Config, Nothing, RemoteKey] def sources: ZIO[Config, Nothing, Sources] def filters: ZIO[Config, Nothing, List[Filter]] + def parallel: UIO[Int] } trait Live extends Config { @@ -42,6 +43,8 @@ object Config { override def filters: ZIO[Config, Nothing, List[Filter]] = UIO(configRef.get).map(_.filters) + + override def parallel: UIO[Int] = UIO(configRef.get).map(_.parallel) } } @@ -64,4 +67,7 @@ object Config { final def filters: ZIO[Config, Nothing, List[Filter]] = ZIO.accessM(_.config filters) + + final def parallel: ZIO[Config, Nothing, Int] = + ZIO.accessM(_.config parallel) } diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala index d4335b0..ac0002b 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigOption.scala @@ -65,4 +65,9 @@ object ConfigOption { override def update(config: Configuration): Configuration = config } + case class Parallel(factor: Int) extends ConfigOption { + override def update(config: Configuration): Configuration = + parallel.set(factor)(config) + } + } diff --git a/config/src/main/scala/net/kemitix/thorp/config/ConfigOptions.scala b/config/src/main/scala/net/kemitix/thorp/config/ConfigOptions.scala index ec95cbb..17f6f31 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/ConfigOptions.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/ConfigOptions.scala @@ -13,6 +13,15 @@ final case class ConfigOptions(options: List[ConfigOption]) { } object ConfigOptions { + val defaultParallel = 1 + def parallel(configOptions: ConfigOptions): Int = { + configOptions.options + .collectFirst { + case ConfigOption.Parallel(factor) => factor + } + .getOrElse(defaultParallel) + } + val empty: ConfigOptions = ConfigOptions(List.empty) val options: SimpleLens[ConfigOptions, List[ConfigOption]] = SimpleLens[ConfigOptions, List[ConfigOption]](_.options, diff --git a/config/src/main/scala/net/kemitix/thorp/config/Configuration.scala b/config/src/main/scala/net/kemitix/thorp/config/Configuration.scala index 335f8c0..98e4926 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/Configuration.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/Configuration.scala @@ -8,6 +8,7 @@ private[config] final case class Configuration( filters: List[Filter], debug: Boolean, batchMode: Boolean, + parallel: Int, sources: Sources ) @@ -18,6 +19,7 @@ private[config] object Configuration { filters = List.empty, debug = false, batchMode = false, + parallel = 1, sources = Sources(List.empty) ) val sources: SimpleLens[Configuration, Sources] = @@ -34,4 +36,6 @@ private[config] object Configuration { val batchMode: SimpleLens[Configuration, Boolean] = SimpleLens[Configuration, Boolean](_.batchMode, b => a => b.copy(batchMode = a)) + val parallel: SimpleLens[Configuration, Int] = + SimpleLens[Configuration, Int](_.parallel, b => a => b.copy(parallel = a)) } diff --git a/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala b/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala index fe55118..dd2137b 100644 --- a/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala +++ b/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala @@ -56,7 +56,7 @@ object ConsoleOut { override def en: String = s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen" override def enBatch: String = - s"Deleted: $remoteKey" + s"Deleted: ${remoteKey.key}" } final case class ErrorQueueEventOccurred(action: ActionSummary, e: Throwable) diff --git a/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala index 7ddc96a..b13209c 100644 --- a/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala +++ b/filesystem/src/main/scala/net/kemitix/thorp/filesystem/FileSystem.scala @@ -20,7 +20,7 @@ object FileSystem { : RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]] def fileLines(file: File): RIO[FileSystem, Seq[String]] def isDirectory(file: File): RIO[FileSystem, Boolean] - def listFiles(path: Path): RIO[FileSystem, Iterable[File]] + def listFiles(path: Path): UIO[List[File]] def length(file: File): ZIO[FileSystem, Nothing, Long] def hasLocalFile(sources: Sources, prefix: RemoteKey, @@ -58,8 +58,9 @@ object FileSystem { override def isDirectory(file: File): RIO[FileSystem, Boolean] = Task(file.isDirectory) - override def listFiles(path: Path): RIO[FileSystem, Iterable[File]] = - Task(path.toFile.listFiles()) + override def listFiles(path: Path): UIO[List[File]] = + Task(List.from(path.toFile.listFiles())) + .catchAll(_ => UIO.succeed(List.empty[File])) override def length(file: File): ZIO[FileSystem, Nothing, Long] = UIO(file.length) @@ -84,7 +85,7 @@ object FileSystem { val fileExistsResultMap: UIO[Map[Path, File]] val fileLinesResult: Task[List[String]] val isDirResult: Task[Boolean] - val listFilesResult: RIO[FileSystem, Iterable[File]] + val listFilesResult: UIO[List[File]] val lengthResult: UIO[Long] val managedFileInputStream: Task[ZManaged[Any, Throwable, FileInputStream]] val hasLocalFileResult: UIO[Boolean] @@ -104,7 +105,7 @@ object FileSystem { override def isDirectory(file: File): RIO[FileSystem, Boolean] = isDirResult - override def listFiles(path: Path): RIO[FileSystem, Iterable[File]] = + override def listFiles(path: Path): UIO[List[File]] = listFilesResult override def length(file: File): UIO[Long] = @@ -135,7 +136,7 @@ object FileSystem { final def isDirectory(file: File): RIO[FileSystem, Boolean] = ZIO.accessM(_.filesystem.isDirectory(file)) - final def listFiles(path: Path): RIO[FileSystem, Iterable[File]] = + final def listFiles(path: Path): ZIO[FileSystem, Nothing, List[File]] = ZIO.accessM(_.filesystem.listFiles(path)) final def length(file: File): ZIO[FileSystem, Nothing, Long] = diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala index b646b0c..774541b 100644 --- a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala @@ -56,7 +56,10 @@ object LocalFileSystem extends LocalFileSystem { actionCounter, bytesCounter, eventsRef) - _ <- MessageChannel.pointToPoint(fileSender)(fileReceiver).runDrain + parallel <- Config.parallel + _ <- MessageChannel + .pointToPointPar(parallel)(fileSender)(fileReceiver) + .runDrain events <- eventsRef.get } yield events diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIRequestCycle.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIRequestCycle.scala new file mode 100644 index 0000000..6f8b4aa --- /dev/null +++ b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIRequestCycle.scala @@ -0,0 +1,72 @@ +package net.kemitix.thorp.uishell + +import java.util.concurrent.atomic.AtomicReference + +import net.kemitix.thorp.config.Config +import net.kemitix.thorp.console.Console +import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish +import net.kemitix.thorp.domain.Terminal.{eraseLineForward, progressBar} +import net.kemitix.thorp.domain.{LocalFile, RemoteKey, Terminal} +import zio.{UIO, ZIO} + +import scala.io.AnsiColor.{GREEN, RESET} + +object UIRequestCycle { + + private case class UploadState(transferred: Long, fileLength: Long) + + private val uploads: AtomicReference[Map[RemoteKey, UploadState]] = + new AtomicReference[Map[RemoteKey, UploadState]](Map.empty) + + private val statusHeight = 3 + + def handle(localFile: LocalFile, + bytesTransferred: Long, + index: Int, + totalBytesSoFar: Long): ZIO[Console with Config, Nothing, Unit] = + for { + _ <- ZIO.when(bytesTransferred < localFile.file.length())( + stillUploading(localFile.remoteKey, + localFile.file.length(), + bytesTransferred)) + _ <- ZIO.when(bytesTransferred >= localFile.file.length()) { + finishedUploading(localFile.remoteKey) + } + } yield () + + private def stillUploading( + remoteKey: RemoteKey, + fileLength: Long, + bytesTransferred: Long + ): ZIO[Console, Nothing, Unit] = { + val current: Map[RemoteKey, UploadState] = + uploads.updateAndGet((m: Map[RemoteKey, UploadState]) => + m.updated(remoteKey, UploadState(bytesTransferred, fileLength))) + val resetCursor = s"${Terminal.cursorPrevLine(statusHeight) * current.size}" + ZIO.foreach(current) { entry => + { + val (remoteKey, state) = entry + + val percent = f"${(state.transferred * 100) / state.fileLength}%2d" + val transferred = sizeInEnglish(state.transferred) + val fileLength = sizeInEnglish(state.fileLength) + val line1 = + s"${GREEN}Uploading:$RESET ${remoteKey.key}$eraseLineForward" + val line2 = s"$GREEN File:$RESET ($percent%) $transferred of $fileLength" + s"$eraseLineForward" + val line3 = + progressBar(state.transferred, state.fileLength, Terminal.width) + Console.putStrLn(line1) *> + Console.putStrLn(line2) *> + Console.putStrLn(line3) + } + } *> Console.putStr(resetCursor) + } + + private def finishedUploading( + remoteKey: RemoteKey + ): ZIO[Any, Nothing, Unit] = { + UIO(uploads.updateAndGet((m: Map[RemoteKey, UploadState]) => + m.removed(remoteKey))) *> UIO.unit + } + +} diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala index 39c24fb..d7074e9 100644 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala +++ b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala @@ -10,17 +10,10 @@ import net.kemitix.thorp.console.ConsoleOut.{ } import net.kemitix.thorp.console.{Console, ConsoleOut} import net.kemitix.thorp.domain.Action.ToUpload -import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish -import net.kemitix.thorp.domain.Terminal.{ - eraseLineForward, - eraseToEndOfScreen, - progressBar -} +import net.kemitix.thorp.domain.Terminal.{eraseLineForward, eraseToEndOfScreen} import net.kemitix.thorp.domain._ import zio.{UIO, ZIO} -import scala.io.AnsiColor.{GREEN, RESET} - object UIShell { def receiver: UIO[MessageChannel.UReceiver[Console with Config, UIEvent]] = @@ -42,7 +35,10 @@ object UIShell { bytesTransferred, index, totalBytesSoFar) => - requestCycle(localFile, bytesTransferred, index, totalBytesSoFar) + UIRequestCycle.handle(localFile, + bytesTransferred, + index, + totalBytesSoFar) } } @@ -98,34 +94,17 @@ object UIShell { _ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources)) } yield () - private def requestCycle( - localFile: LocalFile, - bytesTransferred: Long, - index: Int, - totalBytesSoFar: Long): ZIO[Console with Config, Nothing, Unit] = - ZIO.when(bytesTransferred < localFile.file.length()) { - val fileLength = localFile.file.length - val remoteKey = localFile.remoteKey.key - val statusHeight = 3 - val percent = f"${(bytesTransferred * 100) / fileLength}%2d" - Console.putStrLn( - s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" + - s"$GREEN File:$RESET ($percent%) ${sizeInEnglish(bytesTransferred)} of ${sizeInEnglish(fileLength)}" + - s"$eraseLineForward\n" + - progressBar(bytesTransferred, fileLength, Terminal.width) + - s"${Terminal.cursorPrevLine(statusHeight)}") - } - private def actionAsString(action: Action): String = action match { case Action.DoNothing(bucket, remoteKey, size) => s"Do nothing: ${remoteKey.key}" - case ToUpload(bucket, localFile, size) => s"Upload: ${localFile.remoteKey}" + case ToUpload(bucket, localFile, size) => + s"Upload: ${localFile.remoteKey.key}" case Action.ToCopy(bucket, sourceKey, hash, targetKey, size) => s"Copy: ${sourceKey.key} => ${targetKey.key}" case Action.ToDelete(bucket, remoteKey, size) => s"Delete: ${remoteKey.key}" } - def trimHeadTerminal(str: String): String = { + def trimHead(str: String): String = { val width = Terminal.width str.length match { case l if l > width => str.substring(l - width) @@ -133,9 +112,12 @@ object UIShell { } } - def actionChosen(action: Action): ZIO[Console with Config, Nothing, Unit] = { - Console.putStr( - trimHeadTerminal(actionAsString(action)) + eraseLineForward + "\r") - } + def actionChosen(action: Action): ZIO[Console with Config, Nothing, Unit] = + for { + batch <- Config.batchMode + message = trimHead(actionAsString(action)) + eraseLineForward + _ <- ZIO.when(!batch) { Console.putStr(message + "\r") } + _ <- ZIO.when(batch) { Console.putStrLn(message) } + } yield () }