Upload progress via uievent (#196)

* [uishell] Fix package name

* [sbt] Update eip-zio from 0.3.1 to 0.3.2

* [sbt] add dependency on eip-zio to domain

* [uishell] move Upload*Event* to uishell

* UploadEventListener
* UploadEventLogger
* UploadProgressEvent

* [uishell] Log upload progress using UIShell

* [uishell] inline UploadEventLogger into UIShell

* [uishell] Remove println and use Console
This commit is contained in:
Paul Campbell 2019-09-11 22:47:42 +01:00 committed by GitHub
parent 3d4c238030
commit a2c061d655
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 135 additions and 124 deletions

View file

@ -15,7 +15,7 @@ import net.kemitix.thorp.domain.StorageEvent.{
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.lib._
import net.kemitix.thorp.storage.Storage
import net.kemitix.throp.uishell.{UIEvent, UIShell}
import net.kemitix.thorp.uishell.{UIEvent, UIShell}
import zio.clock.Clock
import zio.{RIO, UIO, ZIO}

View file

@ -71,7 +71,7 @@ val zioDependencies = Seq(
val eipDependencies = Seq(
libraryDependencies ++= Seq(
"net.kemitix" %% "eip-zio" % "0.3.1"
"net.kemitix" %% "eip-zio" % "0.3.2"
)
)
@ -169,3 +169,4 @@ lazy val domain = (project in file("domain"))
.settings(assemblyJarName in assembly := "domain.jar")
.settings(testDependencies)
.settings(zioDependencies)
.settings(eipDependencies)

View file

@ -1,33 +0,0 @@
package net.kemitix.thorp.domain
import java.util.concurrent.atomic.AtomicLong
import net.kemitix.thorp.domain.UploadProgressEvent.RequestEvent
import net.kemitix.thorp.domain.UploadEventLogger.RequestCycle
object UploadEventListener {
final case class Settings(
localFile: LocalFile,
index: Int,
totalBytesSoFar: Long,
batchMode: Boolean
)
def listener(settings: Settings): UploadProgressEvent => Unit = {
val bytesTransferred = new AtomicLong(0L)
event =>
{
event match {
case e: RequestEvent =>
UploadEventLogger(
RequestCycle(settings.localFile,
bytesTransferred.addAndGet(e.transferred),
settings.index,
settings.totalBytesSoFar))
case _ => ()
}
}
}
}

View file

@ -1,41 +0,0 @@
package net.kemitix.thorp.domain
import net.kemitix.thorp.domain.Terminal._
import scala.io.AnsiColor._
object UploadEventLogger {
final case class RequestCycle(
localFile: LocalFile,
bytesTransferred: Long,
index: Int,
totalBytesSoFar: Long
)
def apply(requestCycle: RequestCycle): Unit = {
val remoteKey = requestCycle.localFile.remoteKey.key
val fileLength = requestCycle.localFile.file.length
val statusHeight = 3
if (requestCycle.bytesTransferred < fileLength)
println(
s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" +
statusWithBar(" File",
SizeTranslation.sizeInEnglish,
requestCycle.bytesTransferred,
fileLength) +
s"${Terminal.cursorPrevLine(statusHeight)}")
}
private def statusWithBar(
label: String,
format: Long => String,
current: Long,
max: Long
): String = {
val percent = f"${(current * 100) / max}%2d"
s"$GREEN$label:$RESET ($percent%) ${format(current)} of ${format(max)}" +
s"$eraseLineForward\n" +
progressBar(current, max, Terminal.width)
}
}

View file

@ -13,7 +13,7 @@ import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.lib.FileScanner.Hashes
import net.kemitix.thorp.storage.Storage
import net.kemitix.throp.uishell.UIEvent
import net.kemitix.thorp.uishell.UIEvent
import zio._
import zio.clock.Clock
@ -98,7 +98,7 @@ object LocalFileSystem extends LocalFileSystem {
bytesCounter <- bytesCounterRef.update(_ + action.size)
_ <- uiActionChosen(uiChannel)(action)
sequencedAction = SequencedAction(action, actionCounter)
event <- archive.update(sequencedAction, bytesCounter)
event <- archive.update(uiChannel, sequencedAction, bytesCounter)
_ <- eventsRef.update(list => event :: list)
_ <- uiActionFinished(uiChannel)(action, actionCounter, bytesCounter)
} yield ()
@ -242,7 +242,7 @@ object LocalFileSystem extends LocalFileSystem {
_ <- uiActionChosen(uiChannel)(action)
bytesCounter <- bytesCounterRef.update(_ + action.size)
sequencedAction = SequencedAction(action, actionCounter)
event <- archive.update(sequencedAction, 0L)
event <- archive.update(uiChannel, sequencedAction, 0L)
_ <- eventsRef.update(list => event :: list)
_ <- uiActionFinished(uiChannel)(action,
actionCounter,

View file

@ -1,5 +1,6 @@
package net.kemitix.thorp.lib
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.ConsoleOut.{
CopyComplete,
@ -11,11 +12,13 @@ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageEvent
import net.kemitix.thorp.domain.StorageEvent._
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.UIEvent
import zio.{RIO, ZIO}
trait ThorpArchive {
def update(
uiChannel: UChannel[Any, UIEvent],
sequencedAction: SequencedAction,
totalBytesSoFar: Long
): ZIO[Storage with Config, Nothing, StorageEvent]

View file

@ -1,21 +1,24 @@
package net.kemitix.thorp.lib
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageEvent.DoNothingEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener}
import zio.{UIO, ZIO}
trait UnversionedMirrorArchive extends ThorpArchive {
override def update(
uiChannel: UChannel[Any, UIEvent],
sequencedAction: SequencedAction,
totalBytesSoFar: Long
): ZIO[Storage with Config, Nothing, StorageEvent] =
sequencedAction match {
case SequencedAction(ToUpload(bucket, localFile, _), index) =>
doUpload(index, totalBytesSoFar, bucket, localFile)
doUpload(uiChannel, index, totalBytesSoFar, bucket, localFile)
case SequencedAction(ToCopy(bucket, sourceKey, hash, targetKey, _), _) =>
Storage.copy(bucket, sourceKey, hash, targetKey)
case SequencedAction(ToDelete(bucket, remoteKey, _), _) =>
@ -25,6 +28,7 @@ trait UnversionedMirrorArchive extends ThorpArchive {
}
private def doUpload(
uiChannel: UChannel[Any, UIEvent],
index: Int,
totalBytesSoFar: Long,
bucket: Bucket,
@ -36,6 +40,7 @@ trait UnversionedMirrorArchive extends ThorpArchive {
localFile,
bucket,
UploadEventListener.Settings(
uiChannel: UChannel[Any, UIEvent],
localFile,
index,
totalBytesSoFar,

View file

@ -3,6 +3,7 @@ package net.kemitix.thorp.lib
import java.util.concurrent.atomic.AtomicReference
import net.kemitix.eip.zio.MessageChannel
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.config.ConfigOption.{
IgnoreGlobalOptions,
IgnoreUserOptions
@ -17,12 +18,17 @@ import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.{FileSystem, Hasher, Resource}
import net.kemitix.thorp.storage.Storage
import net.kemitix.throp.uishell.UIEvent
import net.kemitix.throp.uishell.UIEvent._
import net.kemitix.thorp.uishell.UIEvent
import net.kemitix.thorp.uishell.UIEvent.{
ActionChosen,
ActionFinished,
FileFound,
KeyFound
}
import org.scalatest.FreeSpec
import org.scalatest.Matchers._
import zio.clock.Clock
import zio.{DefaultRuntime, UIO}
import zio.{DefaultRuntime, UIO, ZIO}
import scala.collection.MapView
@ -44,12 +50,15 @@ class LocalFileSystemTest extends FreeSpec {
private val uiEvents = new AtomicReference[List[UIEvent]](List.empty)
private val actions = new AtomicReference[List[SequencedAction]](List.empty)
private def archive: ThorpArchive =
(sequencedAction: SequencedAction, _) =>
UIO {
actions.updateAndGet(l => sequencedAction :: l)
StorageEvent.DoNothingEvent(sequencedAction.action.remoteKey)
private def archive: ThorpArchive = new ThorpArchive {
override def update(uiChannel: UChannel[Any, UIEvent],
sequencedAction: SequencedAction,
totalBytesSoFar: Long)
: ZIO[Storage with Config, Nothing, StorageEvent] = UIO {
actions.updateAndGet(l => sequencedAction :: l)
StorageEvent.DoNothingEvent(sequencedAction.action.remoteKey)
}
}
private val runtime = new DefaultRuntime {}

View file

@ -6,6 +6,7 @@ import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.storage.Storage.Service
import net.kemitix.thorp.uishell.UploadEventListener
import zio.{RIO, UIO}
object S3Storage {

View file

@ -11,21 +11,20 @@ import net.kemitix.thorp.domain.StorageEvent.{
ErrorEvent,
UploadEvent
}
import net.kemitix.thorp.domain.UploadProgressEvent.{
ByteTransferEvent,
RequestEvent,
TransferEvent
}
import net.kemitix.thorp.domain.{
Bucket,
LocalFile,
MD5Hash,
RemoteKey,
StorageEvent,
UploadEventListener,
UploadProgressEvent
StorageEvent
}
import net.kemitix.thorp.storage.aws.Uploader.Request
import net.kemitix.thorp.uishell.UploadProgressEvent.{
ByteTransferEvent,
RequestEvent,
TransferEvent
}
import net.kemitix.thorp.uishell.{UploadEventListener, UploadProgressEvent}
import zio.UIO
trait Uploader {

View file

@ -3,6 +3,7 @@ package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.UploadEventListener
import org.scalamock.scalatest.MockFactory
import zio.{RIO, UIO}

View file

@ -5,6 +5,7 @@ import java.io.File
import com.amazonaws.SdkClientException
import com.amazonaws.services.s3.model.AmazonS3Exception
import com.amazonaws.services.s3.transfer.model.UploadResult
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.StorageEvent.{
@ -17,9 +18,12 @@ import org.scalamock.scalatest.MockFactory
import org.scalatest.FreeSpec
import zio.{DefaultRuntime, Task}
import net.kemitix.thorp.filesystem.Resource
import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener}
class UploaderTest extends FreeSpec with MockFactory {
val uiChannel: UChannel[Any, UIEvent] = zioMessage => ()
"upload" - {
val aSource: File = Resource(this, "")
val aFile: File = Resource(this, "small-file")
@ -35,7 +39,7 @@ class UploaderTest extends FreeSpec with MockFactory {
override def waitForUploadResult: UploadResult = uploadResult
}
val listenerSettings =
UploadEventListener.Settings(localFile, 0, 0, batchMode = true)
UploadEventListener.Settings(uiChannel, localFile, 0, 0, batchMode = true)
"when no error" in {
val expected =
Right(UploadEvent(remoteKey, aHash))

View file

@ -1,6 +1,7 @@
package net.kemitix.thorp.storage
import net.kemitix.thorp.domain._
import net.kemitix.thorp.uishell.UploadEventListener
import zio.{RIO, Task, UIO, ZIO}
trait Storage {

View file

@ -1,4 +1,4 @@
package net.kemitix.throp.uishell
package net.kemitix.thorp.uishell
import net.kemitix.eip.zio.MessageChannel
import net.kemitix.thorp.config.Config

View file

@ -1,4 +1,4 @@
package net.kemitix.throp.uishell
package net.kemitix.thorp.uishell
import net.kemitix.thorp.domain._
@ -34,4 +34,10 @@ object UIEvent {
case class KeyFound(remoteKey: RemoteKey) extends UIEvent
case class RequestCycle(localFile: LocalFile,
bytesTransferred: Long,
index: Int,
totalBytesSoFar: Long)
extends UIEvent
}

View file

@ -1,4 +1,4 @@
package net.kemitix.throp.uishell
package net.kemitix.thorp.uishell
import net.kemitix.eip.zio.MessageChannel
import net.kemitix.thorp.config.Config
@ -8,18 +8,20 @@ import net.kemitix.thorp.console.ConsoleOut.{
UploadComplete
}
import net.kemitix.thorp.console.{Console, ConsoleOut}
import net.kemitix.thorp.domain.{
Action,
Counters,
LocalFile,
MD5Hash,
RemoteKey
}
import net.kemitix.thorp.domain.Action.ToUpload
import net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen
import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish
import net.kemitix.thorp.domain.Terminal.{
eraseLineForward,
eraseToEndOfScreen,
progressBar
}
import net.kemitix.thorp.domain._
import zio.{UIO, ZIO}
import scala.io.AnsiColor.{GREEN, RESET}
object UIShell {
def receiver: UIO[MessageChannel.UReceiver[Console with Config, UIEvent]] =
UIO { uiEventMessage =>
uiEventMessage.body match {
@ -34,11 +36,16 @@ object UIShell {
uploadWaitComplete(action)
case UIEvent.ActionFinished(action, _, _) => actionFinished(action)
case UIEvent.KeyFound(_) => UIO(())
case UIEvent.RequestCycle(localFile,
bytesTransferred,
index,
totalBytesSoFar) =>
requestCycle(localFile, bytesTransferred, index, totalBytesSoFar)
}
}
private def actionFinished(
action: Action): ZIO[Console with Config, Nothing, Unit] = {
action: Action): ZIO[Console with Config, Nothing, Unit] =
for {
batchMode <- Config.batchMode
_ <- action match {
@ -51,46 +58,57 @@ object UIShell {
Console.putMessageLnB(DeleteComplete(remoteKey), batchMode)
}
} yield ()
}
private def uploadWaitComplete(
action: Action): ZIO[Console, Nothing, Unit] = {
private def uploadWaitComplete(action: Action): ZIO[Console, Nothing, Unit] =
Console.putStrLn(s"Finished waiting to other upload - now $action")
}
private def awaitingUpload(remoteKey: RemoteKey,
hash: MD5Hash): ZIO[Console, Nothing, Unit] = {
hash: MD5Hash): ZIO[Console, Nothing, Unit] =
Console.putStrLn(
s"Awaiting another upload of $hash before copying it to $remoteKey")
}
private def fileFound(
localFile: LocalFile): ZIO[Console with Config, Nothing, Unit] = {
localFile: LocalFile): ZIO[Console with Config, Nothing, Unit] =
for {
batchMode <- Config.batchMode
_ <- ZIO.when(batchMode)(Console.putStrLn(s"Found: ${localFile.file}"))
} yield ()
}
private def showSummary(
counters: Counters): ZIO[Console with Config, Nothing, Unit] = {
counters: Counters): ZIO[Console with Config, Nothing, Unit] =
Console.putStrLn(eraseToEndOfScreen) *>
Console.putStrLn(s"Uploaded ${counters.uploaded} files") *>
Console.putStrLn(s"Copied ${counters.copied} files") *>
Console.putStrLn(s"Deleted ${counters.deleted} files") *>
Console.putStrLn(s"Errors ${counters.errors}")
}
private def remoteDataFetched(size: Int): ZIO[Console, Nothing, Unit] = {
private def remoteDataFetched(size: Int): ZIO[Console, Nothing, Unit] =
Console.putStrLn(s"Found $size remote objects")
}
private def showValidConfig: ZIO[Console with Config, Nothing, Unit] = {
private def showValidConfig: ZIO[Console with Config, Nothing, Unit] =
for {
bucket <- Config.bucket
prefix <- Config.prefix
sources <- Config.sources
_ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
} yield ()
}
private def requestCycle(
localFile: LocalFile,
bytesTransferred: Long,
index: Int,
totalBytesSoFar: Long): ZIO[Console with Config, Nothing, Unit] =
ZIO.when(bytesTransferred < localFile.file.length()) {
val fileLength = localFile.file.length
val remoteKey = localFile.remoteKey.key
val statusHeight = 3
val percent = f"${(bytesTransferred * 100) / fileLength}%2d"
Console.putStrLn(
s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" +
s"$GREEN File:$RESET ($percent%) ${sizeInEnglish(bytesTransferred)} of ${sizeInEnglish(fileLength)}" +
s"$eraseLineForward\n" +
progressBar(bytesTransferred, fileLength, Terminal.width) +
s"${Terminal.cursorPrevLine(statusHeight)}")
}
}

View file

@ -0,0 +1,37 @@
package net.kemitix.thorp.uishell
import java.util.concurrent.atomic.AtomicLong
import net.kemitix.eip.zio.Message
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.domain.LocalFile
import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent
object UploadEventListener {
final case class Settings(
uiChannel: UChannel[Any, UIEvent],
localFile: LocalFile,
index: Int,
totalBytesSoFar: Long,
batchMode: Boolean
)
def listener(settings: Settings): UploadProgressEvent => Unit = {
val bytesTransferred = new AtomicLong(0L)
event =>
{
event match {
case e: RequestEvent =>
settings.uiChannel(
Message.withBody(
UIEvent.RequestCycle(settings.localFile,
bytesTransferred.addAndGet(e.transferred),
settings.index,
settings.totalBytesSoFar)))
case _ => ()
}
}
}
}

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.domain
package net.kemitix.thorp.uishell
sealed trait UploadProgressEvent {
def name: String