diff --git a/app/src/main/scala/net/kemitix/thorp/Program.scala b/app/src/main/scala/net/kemitix/thorp/Program.scala index 0ec9f8a..23eb4c5 100644 --- a/app/src/main/scala/net/kemitix/thorp/Program.scala +++ b/app/src/main/scala/net/kemitix/thorp/Program.scala @@ -36,7 +36,7 @@ trait Program { private def executeWithUI(configuration: Configuration): Unit = { val uiChannel: Channel[UIEvent] = Channel.create("thorp-ui") - uiChannel.addListener(UIShell.receiver(configuration)) + uiChannel.addListener(UIShell.listener(configuration)) uiChannel.run(sink => execute(configuration, sink), "thorp-main") uiChannel.start() uiChannel.waitForShutdown() @@ -52,6 +52,7 @@ trait Program { .scanCopyUpload(configuration, uiSink, remoteObjects, archive) val deleteEvents = LocalFileSystem .scanDelete(configuration, uiSink, remoteObjects, archive) + Storage.getInstance().shutdown(); showSummary(uiSink)(storageEvents ++ deleteEvents) uiSink.shutdown(); } diff --git a/docs/images/reactor-graph.png b/docs/images/reactor-graph.png index a5af530..b0e2940 100644 Binary files a/docs/images/reactor-graph.png and b/docs/images/reactor-graph.png differ diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala index cf02639..9edab94 100644 --- a/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala @@ -69,7 +69,7 @@ trait UnversionedMirrorArchive extends ThorpArchive { totalBytesSoFar: Long, bucket: Bucket, localFile: LocalFile) = - UploadEventListener.Settings( + UploadEventListener.settings( uiSink, localFile, index, diff --git a/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3Storage.java b/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3Storage.java index 7610629..f3a9be5 100644 --- a/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3Storage.java +++ b/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3Storage.java @@ -62,4 +62,9 @@ public class S3Storage implements Storage { ) { return deleter.apply(S3Deleter.request(bucket, remoteKey)); } + + @Override + public void shutdown() { + transferManager.shutdownNow(true); + } } diff --git a/storage/src/main/java/net/kemitix/thorp/storage/Storage.java b/storage/src/main/java/net/kemitix/thorp/storage/Storage.java index cd1dfab..4185d37 100644 --- a/storage/src/main/java/net/kemitix/thorp/storage/Storage.java +++ b/storage/src/main/java/net/kemitix/thorp/storage/Storage.java @@ -30,6 +30,8 @@ public interface Storage { RemoteKey remoteKey ); + void shutdown(); + static Storage getInstance() { return ServiceLoader.load(Storage.class).iterator().next(); } diff --git a/uishell/pom.xml b/uishell/pom.xml index 21d1b51..df24d72 100644 --- a/uishell/pom.xml +++ b/uishell/pom.xml @@ -32,28 +32,5 @@ net.kemitix.thorp thorp-filesystem - - - - org.scala-lang - scala-library - - - - - org.scalatest - scalatest_2.13 - test - - - - - - net.alchim31.maven - scala-maven-plugin - - - - \ No newline at end of file diff --git a/uishell/src/main/java/net/kemitix/thorp/uishell/UIShell.java b/uishell/src/main/java/net/kemitix/thorp/uishell/UIShell.java new file mode 100644 index 0000000..54a17d3 --- /dev/null +++ b/uishell/src/main/java/net/kemitix/thorp/uishell/UIShell.java @@ -0,0 +1,198 @@ +package net.kemitix.thorp.uishell; + +import net.kemitix.thorp.config.Configuration; +import net.kemitix.thorp.console.Console; +import net.kemitix.thorp.console.ConsoleOut; +import net.kemitix.thorp.domain.*; + +import static net.kemitix.thorp.domain.Terminal.eraseLineForward; +import static net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen; + +public interface UIShell { + static Channel.Listener listener(Configuration configuration) { + return new Channel.Listener() { + @Override + public void accept(UIEvent uiEvent) { + if (uiEvent instanceof UIEvent.RequestCycle) + requestCycle((UIEvent.RequestCycle) uiEvent, configuration); + else if (uiEvent instanceof UIEvent.FileFound) + fileFound((UIEvent.FileFound) uiEvent, configuration); + else if (uiEvent instanceof UIEvent.ActionChosen) + actionChosen((UIEvent.ActionChosen) uiEvent, configuration); + else if (uiEvent instanceof UIEvent.AwaitingAnotherUpload) + awaitingAnotherUpload((UIEvent.AwaitingAnotherUpload) uiEvent); + else if (uiEvent instanceof UIEvent.AnotherUploadWaitComplete) + anotherUploadWaitComplete((UIEvent.AnotherUploadWaitComplete) uiEvent); + else if (uiEvent instanceof UIEvent.ActionFinished) + actionFinished((UIEvent.ActionFinished) uiEvent, configuration); + else if (uiEvent instanceof UIEvent.RemoteDataFetched) + remoteDataFetched((UIEvent.RemoteDataFetched) uiEvent); + else if (uiEvent instanceof UIEvent.ShowSummary) + showSummary((UIEvent.ShowSummary) uiEvent); + else if (uiEvent instanceof UIEvent.ShowValidConfig) + showValidConfig(configuration); + } + + private void requestCycle( + UIEvent.RequestCycle uiEvent, + Configuration configuration + ) { + ProgressUI.requestCycle( + configuration, + uiEvent.localFile, + uiEvent.bytesTransferred, + uiEvent.index, + uiEvent.totalBytesSoFar + ); + } + + private void actionFinished( + UIEvent.ActionFinished uiEvent, + Configuration configuration + ) { + StorageEvent storageEvent = uiEvent.event; + if (storageEvent instanceof StorageEvent.CopyEvent) + copyActionFinished( + (StorageEvent.CopyEvent) storageEvent, + configuration); + else if (storageEvent instanceof StorageEvent.UploadEvent) + uploadActionFinished( + (StorageEvent.UploadEvent) storageEvent, + configuration); + else if (storageEvent instanceof StorageEvent.DeleteEvent) + deleteActionFinished( + (StorageEvent.DeleteEvent) storageEvent, + configuration); + else if (storageEvent instanceof StorageEvent.ErrorEvent) + errorActionFinished( + (StorageEvent.ErrorEvent) storageEvent, + configuration); + } + + private void errorActionFinished( + StorageEvent.ErrorEvent errorEvent, + Configuration configuration + ) { + RemoteKey remoteKey = errorEvent.remoteKey; + StorageEvent.ActionSummary action = errorEvent.action; + Throwable e = errorEvent.e; + ProgressUI.finishedUploading(remoteKey); + Console.putMessageLnB( + ConsoleOut.errorQueueEventOccurred(action, e), + configuration.batchMode); + } + + private void deleteActionFinished( + StorageEvent.DeleteEvent deleteEvent, + Configuration configuration + ) { + RemoteKey remoteKey = deleteEvent.remoteKey; + Console.putMessageLnB( + ConsoleOut.deleteComplete(remoteKey), + configuration.batchMode); + } + + private void uploadActionFinished( + StorageEvent.UploadEvent uploadEvent, + Configuration configuration + ) { + RemoteKey remoteKey = uploadEvent.remoteKey; + Console.putMessageLnB( + ConsoleOut.uploadComplete(remoteKey), + configuration.batchMode); + ProgressUI.finishedUploading(remoteKey); + } + + private void copyActionFinished( + StorageEvent.CopyEvent copyEvent, + Configuration configuration + ) { + RemoteKey sourceKey = copyEvent.sourceKey; + RemoteKey targetKey = copyEvent.targetKey; + Console.putMessageLnB( + ConsoleOut.copyComplete(sourceKey, targetKey), + configuration.batchMode); + } + + private void anotherUploadWaitComplete( + UIEvent.AnotherUploadWaitComplete uiEvent + ) { + Console.putStrLn(String.format( + "Finished waiting to other upload - now %s", + uiEvent.action)); + } + + private void awaitingAnotherUpload( + UIEvent.AwaitingAnotherUpload uiEvent + ) { + Console.putStrLn(String.format( + "Awaiting another upload of %s before copying it to %s", + uiEvent.hash, uiEvent.remoteKey)); + } + + private void actionChosen( + UIEvent.ActionChosen uiEvent, + Configuration configuration + ) { + Action action = uiEvent.action; + if (configuration.batchMode) + Console.putStrLn(action.asString()); + else if (!(action instanceof Action.DoNothing)) { + Console.putStr(String.format("%s%s\r", + action.asString(), + eraseLineForward)); + } + } + + private void fileFound( + UIEvent.FileFound uiEvent, + Configuration configuration + ) { + if (configuration.batchMode) { + Console.putStrLn(String.format( + "Found: %s", uiEvent.localFile.file)); + } + } + + private void showSummary( + UIEvent.ShowSummary uiEvent + ) { + Console.putStrLn(eraseToEndOfScreen); + Counters counters = uiEvent.counters; + Console.putStrLn(String.format("Uploaded %d files", + counters.uploaded)); + Console.putStrLn(String.format("Copied %d files", + counters.copied)); + Console.putStrLn(String.format("Deleted %d files", + counters.deleted)); + Console.putStrLn(String.format("Errors %d", + counters.errors)); + } + + private void remoteDataFetched( + UIEvent.RemoteDataFetched uiEvent + ) { + Console.putStrLn(String.format("Found %d remote objects", + uiEvent.size)); + } + + private void showValidConfig( + Configuration configuration + ) { + Console.putMessageLn( + ConsoleOut.validConfig( + configuration.bucket, + configuration.prefix, + configuration.sources)); + } + }; + + // def trimHead(str: String): String = { + // val width = Terminal.width + // str.length match { + // case l if l > width => str.substring(l - width) + // case _ => str + // } + // } + } +} diff --git a/uishell/src/main/java/net/kemitix/thorp/uishell/UploadEventListener.java b/uishell/src/main/java/net/kemitix/thorp/uishell/UploadEventListener.java new file mode 100644 index 0000000..7b135b0 --- /dev/null +++ b/uishell/src/main/java/net/kemitix/thorp/uishell/UploadEventListener.java @@ -0,0 +1,47 @@ +package net.kemitix.thorp.uishell; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import net.kemitix.thorp.domain.Channel; +import net.kemitix.thorp.domain.LocalFile; +import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +public class UploadEventListener { + + Consumer listener(Settings settings) { + AtomicLong bytesTransferred = new AtomicLong(0L); + return event -> { + if (event instanceof RequestEvent) { + RequestEvent requestEvent = (RequestEvent) event; + settings.uiSink.accept( + UIEvent.requestCycle( + settings.localFile, + bytesTransferred.addAndGet(requestEvent.transferred), + settings.index, + settings.totalBytesSoFar + ) + ); + } + }; + } + + public static Settings settings( + Channel.Sink uiSink, + LocalFile localFile, + int index, + long totalBytesToFar, + boolean batchMode + ) { + return new Settings(uiSink, localFile, index, totalBytesToFar); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + public static class Settings { + public final Channel.Sink uiSink; + public final LocalFile localFile; + public final int index; + public final long totalBytesSoFar; + } +} diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala deleted file mode 100644 index 363e19a..0000000 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala +++ /dev/null @@ -1,125 +0,0 @@ -package net.kemitix.thorp.uishell - -import net.kemitix.thorp.config.Configuration -import net.kemitix.thorp.console.{Console, ConsoleOut} -import net.kemitix.thorp.domain.Action.DoNothing -import net.kemitix.thorp.domain.Channel.Listener -import net.kemitix.thorp.domain.Terminal.{eraseLineForward, eraseToEndOfScreen} -import net.kemitix.thorp.domain._ - -object UIShell { - - def receiver(configuration: Configuration): Listener[UIEvent] = { - case _: UIEvent.ShowValidConfig => showValidConfig(configuration) - case uie: UIEvent.RemoteDataFetched => remoteDataFetched(uie.size) - case uie: UIEvent.ShowSummary => showSummary(uie.counters) - case uie: UIEvent.FileFound => - fileFound(configuration, uie.localFile) - case uie: UIEvent.ActionChosen => - actionChosen(configuration, uie.action) - case uie: UIEvent.AwaitingAnotherUpload => - awaitingUpload(uie.remoteKey, uie.hash) - case uie: UIEvent.AnotherUploadWaitComplete => - uploadWaitComplete(uie.action) - case uie: UIEvent.ActionFinished => - actionFinished(configuration, uie.event) - case _: UIEvent.KeyFound => () - case uie: UIEvent.RequestCycle => - ProgressUI.requestCycle( - configuration, - uie.localFile, - uie.bytesTransferred, - uie.index, - uie.totalBytesSoFar - ) - } - - private def actionFinished(configuration: Configuration, - event: StorageEvent): Unit = - event match { - case _: StorageEvent.DoNothingEvent => () - case copyEvent: StorageEvent.CopyEvent => - val sourceKey = copyEvent.sourceKey - val targetKey = copyEvent.targetKey - Console.putMessageLnB( - ConsoleOut.copyComplete(sourceKey, targetKey), - configuration.batchMode - ) - case uploadEvent: StorageEvent.UploadEvent => - val remoteKey = uploadEvent.remoteKey - Console - .putMessageLnB( - ConsoleOut.uploadComplete(remoteKey), - configuration.batchMode - ) - ProgressUI.finishedUploading(remoteKey) - case deleteEvent: StorageEvent.DeleteEvent => - val remoteKey = deleteEvent.remoteKey - Console.putMessageLnB( - ConsoleOut.deleteComplete(remoteKey), - configuration.batchMode - ) - case errorEvent: StorageEvent.ErrorEvent => - val remoteKey = errorEvent.remoteKey - val action = errorEvent.action - val e = errorEvent.e - ProgressUI.finishedUploading(remoteKey) - Console.putMessageLnB( - ConsoleOut.errorQueueEventOccurred(action, e), - configuration.batchMode - ) - case _: StorageEvent.ShutdownEvent => () - } - - private def uploadWaitComplete(action: Action): Unit = - Console.putStrLn(s"Finished waiting to other upload - now $action") - - private def awaitingUpload(remoteKey: RemoteKey, hash: MD5Hash): Unit = - Console.putStrLn( - s"Awaiting another upload of $hash before copying it to $remoteKey" - ) - - private def fileFound(configuration: Configuration, - localFile: LocalFile): Unit = - if (configuration.batchMode) { - Console.putStrLn(s"Found: ${localFile.file}") - } - - private def showSummary(counters: Counters): Unit = { - Console.putStrLn(eraseToEndOfScreen) - Console.putStrLn(s"Uploaded ${counters.uploaded} files") - Console.putStrLn(s"Copied ${counters.copied} files") - Console.putStrLn(s"Deleted ${counters.deleted} files") - Console.putStrLn(s"Errors ${counters.errors}") - } - - private def remoteDataFetched(size: Int): Unit = - Console.putStrLn(s"Found $size remote objects") - - private def showValidConfig(configuration: Configuration): Unit = - Console.putMessageLn( - ConsoleOut.validConfig( - configuration.bucket, - configuration.prefix, - configuration.sources - ) - ) - - def trimHead(str: String): String = { - val width = Terminal.width - str.length match { - case l if l > width => str.substring(l - width) - case _ => str - } - } - - def actionChosen(configuration: Configuration, action: Action): Unit = - if (configuration.batchMode) - Console.putStrLn(action.asString()) - else - action match { - case _: DoNothing => () - case _ => Console.putStr(action.asString() + eraseLineForward + "\r") - } - -} diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadEventListener.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadEventListener.scala deleted file mode 100644 index 957f6de..0000000 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadEventListener.scala +++ /dev/null @@ -1,35 +0,0 @@ -package net.kemitix.thorp.uishell - -import java.util.concurrent.atomic.AtomicLong - -import net.kemitix.thorp.domain.{Channel, LocalFile} -import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent - -object UploadEventListener { - - final case class Settings(uiSink: Channel.Sink[UIEvent], - localFile: LocalFile, - index: Int, - totalBytesSoFar: Long, - batchMode: Boolean) - - def listener(settings: Settings): UploadProgressEvent => Unit = { - val bytesTransferred = new AtomicLong(0L) - event => - { - event match { - case e: RequestEvent => - settings.uiSink.accept( - UIEvent.requestCycle( - settings.localFile, - bytesTransferred.addAndGet(e.transferred), - settings.index, - settings.totalBytesSoFar - ) - ) - case _ => () - } - } - } - -}