diff --git a/app/src/main/resources/META-INF/services/net.kemitix.thorp.storage.Storage b/app/src/main/resources/META-INF/services/net.kemitix.thorp.storage.Storage new file mode 100644 index 0000000..ecfe4d9 --- /dev/null +++ b/app/src/main/resources/META-INF/services/net.kemitix.thorp.storage.Storage @@ -0,0 +1 @@ +net.kemitix.thorp.storage.aws.S3Storage diff --git a/app/src/main/scala/net/kemitix/thorp/Main.scala b/app/src/main/scala/net/kemitix/thorp/Main.scala index b0b57d1..2cdb5e8 100644 --- a/app/src/main/scala/net/kemitix/thorp/Main.scala +++ b/app/src/main/scala/net/kemitix/thorp/Main.scala @@ -1,16 +1,12 @@ package net.kemitix.thorp import net.kemitix.thorp.lib.FileScanner -import net.kemitix.thorp.storage.aws.S3Storage import zio.clock.Clock import zio.{App, ZEnv, ZIO} object Main extends App { - object LiveThorpApp - extends S3Storage.Live - with Clock.Live - with FileScanner.Live + object LiveThorpApp extends Clock.Live with FileScanner.Live override def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = Program diff --git a/app/src/main/scala/net/kemitix/thorp/Program.scala b/app/src/main/scala/net/kemitix/thorp/Program.scala index fec8adf..1cde799 100644 --- a/app/src/main/scala/net/kemitix/thorp/Program.scala +++ b/app/src/main/scala/net/kemitix/thorp/Program.scala @@ -26,9 +26,7 @@ trait Program { val version = "0.11.0" lazy val versionLabel = s"${WHITE}Thorp v$version$RESET" - def run( - args: List[String] - ): ZIO[Storage with Clock with FileScanner, Nothing, Unit] = { + def run(args: List[String]): ZIO[Clock with FileScanner, Nothing, Unit] = { (for { cli <- UIO(CliArgs.parse(args.toArray)) config <- IO(ConfigurationBuilder.buildConfig(cli)) @@ -50,7 +48,7 @@ trait Program { private def executeWithUI( configuration: Configuration - ): ZIO[Storage with Clock with FileScanner, Throwable, Unit] = + ): ZIO[Clock with FileScanner, Throwable, Unit] = for { uiEventSender <- execute(configuration) uiEventReceiver <- UIShell.receiver(configuration) @@ -61,33 +59,32 @@ trait Program { private def execute( configuration: Configuration - ): UIO[MessageChannel.ESender[Storage with Clock with FileScanner, - Throwable, - UIEvent]] = UIO { uiChannel => - (for { - _ <- showValidConfig(uiChannel) - remoteData <- fetchRemoteData(configuration, uiChannel) - archive <- UIO(UnversionedMirrorArchive) - copyUploadEvents <- LocalFileSystem - .scanCopyUpload(configuration, uiChannel, remoteData, archive) - deleteEvents <- LocalFileSystem - .scanDelete(configuration, uiChannel, remoteData, archive) - _ <- showSummary(uiChannel)(copyUploadEvents ++ deleteEvents) - } yield ()) <* MessageChannel.endChannel(uiChannel) - } + ): UIO[MessageChannel.ESender[Clock with FileScanner, Throwable, UIEvent]] = + UIO { uiChannel => + (for { + _ <- showValidConfig(uiChannel) + remoteData <- fetchRemoteData(configuration, uiChannel) + archive <- UIO(UnversionedMirrorArchive) + copyUploadEvents <- LocalFileSystem + .scanCopyUpload(configuration, uiChannel, remoteData, archive) + deleteEvents <- LocalFileSystem + .scanDelete(configuration, uiChannel, remoteData, archive) + _ <- showSummary(uiChannel)(copyUploadEvents ++ deleteEvents) + } yield ()) <* MessageChannel.endChannel(uiChannel) + } private def showValidConfig(uiChannel: UIChannel) = - Message.create(UIEvent.ShowValidConfig) >>= MessageChannel.send(uiChannel) + Message.create(UIEvent.showValidConfig) >>= MessageChannel.send(uiChannel) private def fetchRemoteData( configuration: Configuration, uiChannel: UIChannel - ): ZIO[Clock with Storage, Throwable, RemoteObjects] = { + ): ZIO[Clock, Throwable, RemoteObjects] = { val bucket = configuration.bucket val prefix = configuration.prefix + val objects = Storage.getInstance().list(bucket, prefix) for { - objects <- Storage.list(bucket, prefix) - _ <- Message.create(UIEvent.RemoteDataFetched(objects.byKey.size)) >>= MessageChannel + _ <- Message.create(UIEvent.remoteDataFetched(objects.byKey.size)) >>= MessageChannel .send(uiChannel) } yield objects } @@ -109,7 +106,7 @@ trait Program { uiChannel: UIChannel )(events: Seq[StorageEvent]): RIO[Clock, Unit] = { val counters = events.foldLeft(Counters.empty)(countActivities) - Message.create(UIEvent.ShowSummary(counters)) >>= + Message.create(UIEvent.showSummary(counters)) >>= MessageChannel.send(uiChannel) } diff --git a/console/src/main/java/net/kemitix/thorp/console/ConsoleOut.java b/console/src/main/java/net/kemitix/thorp/console/ConsoleOut.java index 8d76fd5..55dfa33 100644 --- a/console/src/main/java/net/kemitix/thorp/console/ConsoleOut.java +++ b/console/src/main/java/net/kemitix/thorp/console/ConsoleOut.java @@ -11,18 +11,6 @@ import java.util.stream.Collectors; public interface ConsoleOut { String en(); - default String eraseToEndOfScreen() { - return Terminal.eraseToEndOfScreen; - } - default String reset() { - return "\u001B[0m"; - } - default String red() { - return "\u001B[31m"; - } - default String green() { - return "\u001B[32m"; - } interface WithBatchMode extends ConsoleOut, Function { String enBatch(); default String selectLine(boolean batchMode) { @@ -69,9 +57,9 @@ public interface ConsoleOut { @Override public String en() { return String.format("%sUploaded:%s %s%s", - green(), reset(), + Terminal.green, Terminal.reset, remoteKey.key(), - eraseToEndOfScreen()); + Terminal.eraseToEndOfScreen); } @Override public String enBatch() { @@ -89,10 +77,10 @@ public interface ConsoleOut { @Override public String en() { return String.format("%sCopied:%s %s => %s%s", - green(), reset(), + Terminal.green, Terminal.reset, sourceKey.key(), targetKey.key(), - eraseToEndOfScreen()); + Terminal.eraseToEndOfScreen); } @Override public String enBatch() { @@ -113,9 +101,9 @@ public interface ConsoleOut { @Override public String enBatch() { return String.format("%sDeleted%s: %s%s", - green(), reset(), + Terminal.green, Terminal.reset, remoteKey, - eraseToEndOfScreen()); + Terminal.eraseToEndOfScreen); } } static ConsoleOut.WithBatchMode errorQueueEventOccurred( @@ -138,11 +126,11 @@ public interface ConsoleOut { @Override public String enBatch() { return String.format("%sERROR%s: %s %s: %s%s", - red(), reset(), + Terminal.red, Terminal.reset, actionSummary.name(), actionSummary.keys(), error.getMessage(), - eraseToEndOfScreen()); + Terminal.eraseToEndOfScreen); } } diff --git a/docs/images/reactor-graph.png b/docs/images/reactor-graph.png index 7d3db38..806e78a 100644 Binary files a/docs/images/reactor-graph.png and b/docs/images/reactor-graph.png differ diff --git a/domain/src/main/java/net/kemitix/thorp/domain/MessageChannel.java b/domain/src/main/java/net/kemitix/thorp/domain/MessageChannel.java new file mode 100644 index 0000000..09f853e --- /dev/null +++ b/domain/src/main/java/net/kemitix/thorp/domain/MessageChannel.java @@ -0,0 +1,71 @@ +package net.kemitix.thorp.domain; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public class MessageChannel { + + private final MessageSupplier messageSupplier; + private final List> messageConsumers; + private final Thread channelThread; + + static MessageChannel create(MessageSupplier supplier) { + List> consumers = new ArrayList<>(); + return new MessageChannel(supplier, consumers, + new Thread(new ChannelRunner(supplier, consumers))); + } + + public static BlockingQueue createMessageSupplier(Class messageClass) { + return new LinkedTransferQueue<>(); + } + + public void addMessageConsumer(MessageConsumer consumer) { + messageConsumers.add(consumer); + } + + public void startChannel() { + channelThread.start(); + } + + public void shutdownChannel() { + channelThread.interrupt(); + } + + public interface MessageSupplier { + T take() throws InterruptedException; + boolean isComplete(); + } + public interface MessageConsumer { + void accept(T message); + } + + @RequiredArgsConstructor + private static class ChannelRunner implements Runnable { + AtomicBoolean shutdownTrigger = new AtomicBoolean(false); + private final MessageSupplier supplier; + private final List> consumers; + @Override + public void run() { + while (!shutdownTrigger.get()) { + try { + T message = supplier.take(); + for (MessageConsumer consumer : consumers) { + consumer.accept(message); + } + if (supplier.isComplete()) { + shutdownTrigger.set(true); + } + } catch (InterruptedException e) { + shutdownTrigger.set(true); + } + } + } + } +} diff --git a/domain/src/main/java/net/kemitix/thorp/domain/StringUtil.java b/domain/src/main/java/net/kemitix/thorp/domain/StringUtil.java new file mode 100644 index 0000000..676bf69 --- /dev/null +++ b/domain/src/main/java/net/kemitix/thorp/domain/StringUtil.java @@ -0,0 +1,11 @@ +package net.kemitix.thorp.domain; + +import java.util.stream.IntStream; + +public class StringUtil { + public static String repeat(String s, int times) { + StringBuilder sb = new StringBuilder(); + IntStream.range(0, times).forEach(x -> sb.append(s)); + return sb.toString(); + } +} diff --git a/domain/src/main/java/net/kemitix/thorp/domain/Terminal.java b/domain/src/main/java/net/kemitix/thorp/domain/Terminal.java index 492f6cd..d701e61 100644 --- a/domain/src/main/java/net/kemitix/thorp/domain/Terminal.java +++ b/domain/src/main/java/net/kemitix/thorp/domain/Terminal.java @@ -3,7 +3,6 @@ package net.kemitix.thorp.domain; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.stream.IntStream; public class Terminal { @@ -67,6 +66,10 @@ public class Terminal { public static String enableAlternateBuffer = csi + "?1049h"; public static String disableAlternateBuffer = csi + "?1049l"; + public static String red = "\u001B[31m"; + public static String green = "\u001B[32m"; + public static String reset = "\u001B[0m"; + private static Map getSubBars() { Map subBars = new HashMap<>(); subBars.put(0, " "); @@ -186,15 +189,10 @@ public class Terminal { int fullHeadSize = pxDone / phases; int part = pxDone % phases; String partial = part != 0 ? subBars.getOrDefault(part, "") : ""; - String head = repeat("█", fullHeadSize) + partial; + String head = StringUtil.repeat("█", fullHeadSize) + partial; int tailSize = barWidth - head.length(); - String tail = repeat(" ", tailSize); + String tail = StringUtil.repeat(" ", tailSize); return "[" + head + tail + "]"; } - private static String repeat(String s, int times) { - StringBuilder sb = new StringBuilder(); - IntStream.range(0, times).forEach(x -> sb.append(s)); - return sb.toString(); - } } diff --git a/lib/pom.xml b/lib/pom.xml index 6a58df5..20864a8 100644 --- a/lib/pom.xml +++ b/lib/pom.xml @@ -12,6 +12,12 @@ lib + + + org.projectlombok + lombok + + net.kemitix.thorp diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala index 74e6571..a417cfc 100644 --- a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala @@ -16,42 +16,44 @@ import zio.clock.Clock trait LocalFileSystem { def scanCopyUpload( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - remoteObjects: RemoteObjects, - archive: ThorpArchive + configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + remoteObjects: RemoteObjects, + archive: ThorpArchive ): RIO[Clock with FileScanner with Storage, Seq[StorageEvent]] def scanDelete( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - remoteData: RemoteObjects, - archive: ThorpArchive + configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + remoteData: RemoteObjects, + archive: ThorpArchive ): RIO[Clock with Storage, Seq[StorageEvent]] } object LocalFileSystem extends LocalFileSystem { override def scanCopyUpload( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - remoteObjects: RemoteObjects, - archive: ThorpArchive - ): RIO[Clock with FileScanner with Storage, Seq[StorageEvent]] = + configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + remoteObjects: RemoteObjects, + archive: ThorpArchive + ): RIO[Clock with FileScanner, Seq[StorageEvent]] = for { actionCounter <- Ref.make(0) - bytesCounter <- Ref.make(0L) - uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]]) - eventsRef <- Ref.make(List.empty[StorageEvent]) - fileSender <- FileScanner.scanSources(configuration) - fileReceiver <- fileReceiver(configuration, - uiChannel, - remoteObjects, - archive, - uploads, - actionCounter, - bytesCounter, - eventsRef) + bytesCounter <- Ref.make(0L) + uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]]) + eventsRef <- Ref.make(List.empty[StorageEvent]) + fileSender <- FileScanner.scanSources(configuration) + fileReceiver <- fileReceiver( + configuration, + uiChannel, + remoteObjects, + archive, + uploads, + actionCounter, + bytesCounter, + eventsRef + ) parallel = configuration.parallel _ <- MessageChannel .pointToPointPar(parallel)(fileSender)(fileReceiver) @@ -60,22 +62,24 @@ object LocalFileSystem extends LocalFileSystem { } yield events override def scanDelete( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - remoteData: RemoteObjects, - archive: ThorpArchive - ): RIO[Clock with Storage, Seq[StorageEvent]] = + configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + remoteData: RemoteObjects, + archive: ThorpArchive + ): RIO[Clock, Seq[StorageEvent]] = for { actionCounter <- Ref.make(0) - bytesCounter <- Ref.make(0L) - eventsRef <- Ref.make(List.empty[StorageEvent]) - keySender <- keySender(remoteData.byKey.keys.asScala) - keyReceiver <- keyReceiver(configuration, - uiChannel, - archive, - actionCounter, - bytesCounter, - eventsRef) + bytesCounter <- Ref.make(0L) + eventsRef <- Ref.make(List.empty[StorageEvent]) + keySender <- keySender(remoteData.byKey.keys.asScala) + keyReceiver <- keyReceiver( + configuration, + uiChannel, + archive, + actionCounter, + bytesCounter, + eventsRef + ) parallel = configuration.parallel _ <- MessageChannel .pointToPointPar(parallel)(keySender)(keyReceiver) @@ -84,71 +88,80 @@ object LocalFileSystem extends LocalFileSystem { } yield events private def fileReceiver( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - remoteObjects: RemoteObjects, - archive: ThorpArchive, - uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], - actionCounterRef: Ref[Int], - bytesCounterRef: Ref[Long], - eventsRef: Ref[List[StorageEvent]] - ): UIO[ - MessageChannel.UReceiver[Clock with Storage, FileScanner.ScannedFile]] = + configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + remoteObjects: RemoteObjects, + archive: ThorpArchive, + uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], + actionCounterRef: Ref[Int], + bytesCounterRef: Ref[Long], + eventsRef: Ref[List[StorageEvent]] + ): UIO[MessageChannel.UReceiver[Clock, FileScanner.ScannedFile]] = UIO { message => val localFile = message.body for { _ <- uiFileFound(uiChannel)(localFile) - action <- chooseAction(configuration, - remoteObjects, - uploads, - uiChannel)(localFile) + action <- chooseAction( + configuration, + remoteObjects, + uploads, + uiChannel + )(localFile) actionCounter <- actionCounterRef.update(_ + 1) - bytesCounter <- bytesCounterRef.update(_ + action.size) - _ <- uiActionChosen(uiChannel)(action) + bytesCounter <- bytesCounterRef.update(_ + action.size) + _ <- uiActionChosen(uiChannel)(action) sequencedAction = SequencedAction(action, actionCounter) - event <- archive.update(configuration, - uiChannel, - sequencedAction, - bytesCounter) + event <- archive.update( + configuration, + uiChannel, + sequencedAction, + bytesCounter + ) _ <- eventsRef.update(list => event :: list) - _ <- uiActionFinished(uiChannel)(action, - actionCounter, - bytesCounter, - event) + _ <- uiActionFinished(uiChannel)( + action, + actionCounter, + bytesCounter, + event + ) } yield () } - private def uiActionChosen(uiChannel: MessageChannel.UChannel[Any, UIEvent])( - action: Action) = - Message.create(UIEvent.ActionChosen(action)) >>= + private def uiActionChosen( + uiChannel: MessageChannel.UChannel[Any, UIEvent] + )(action: Action) = + Message.create(UIEvent.actionChosen(action)) >>= MessageChannel.send(uiChannel) private def uiActionFinished(uiChannel: UChannel[Any, UIEvent])( - action: Action, - actionCounter: Int, - bytesCounter: Long, - event: StorageEvent + action: Action, + actionCounter: Int, + bytesCounter: Long, + event: StorageEvent ) = Message.create( - UIEvent.ActionFinished(action, actionCounter, bytesCounter, event)) >>= + UIEvent.actionFinished(action, actionCounter, bytesCounter, event) + ) >>= MessageChannel.send(uiChannel) - private def uiFileFound(uiChannel: UChannel[Any, UIEvent])( - localFile: LocalFile) = - Message.create(UIEvent.FileFound(localFile)) >>= + private def uiFileFound( + uiChannel: UChannel[Any, UIEvent] + )(localFile: LocalFile) = + Message.create(UIEvent.fileFound(localFile)) >>= MessageChannel.send(uiChannel) private def chooseAction( - configuration: Configuration, - remoteObjects: RemoteObjects, - uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], - uiChannel: UChannel[Any, UIEvent], + configuration: Configuration, + remoteObjects: RemoteObjects, + uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], + uiChannel: UChannel[Any, UIEvent], )(localFile: LocalFile): ZIO[Clock, Nothing, Action] = { for { - remoteExists <- UIO(remoteObjects.remoteKeyExists(localFile.remoteKey)) + remoteExists <- UIO(remoteObjects.remoteKeyExists(localFile.remoteKey)) remoteMatches <- UIO(remoteObjects.remoteMatchesLocalFile(localFile)) remoteForHash <- UIO( - remoteObjects.remoteHasHash(localFile.hashes).toScala) + remoteObjects.remoteHasHash(localFile.hashes).toScala + ) previous <- uploads.get bucket = configuration.bucket action <- if (remoteExists && remoteMatches) @@ -157,7 +170,7 @@ object LocalFileSystem extends LocalFileSystem { remoteForHash match { case pair: Some[Tuple[RemoteKey, MD5Hash]] => val sourceKey = pair.value.a - val hash = pair.value.b + val hash = pair.value.b doCopy(localFile, bucket, sourceKey, hash) case _ if matchesPreviousUpload(previous, localFile.hashes) => doCopyWithPreviousUpload(localFile, bucket, previous, uiChannel) @@ -169,8 +182,8 @@ object LocalFileSystem extends LocalFileSystem { } private def matchesPreviousUpload( - previous: Map[MD5Hash, Promise[Throwable, RemoteKey]], - hashes: Hashes + previous: Map[MD5Hash, Promise[Throwable, RemoteKey]], + hashes: Hashes ): Boolean = hashes .values() @@ -179,31 +192,24 @@ object LocalFileSystem extends LocalFileSystem { previous.contains(hash) }) - private def doNothing( - localFile: LocalFile, - bucket: Bucket - ): UIO[Action] = UIO { - Action.doNothing(bucket, localFile.remoteKey, localFile.length) - } + private def doNothing(localFile: LocalFile, bucket: Bucket): UIO[Action] = + UIO { + Action.doNothing(bucket, localFile.remoteKey, localFile.length) + } - private def doCopy( - localFile: LocalFile, - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash - ): UIO[Action] = UIO { - Action.toCopy(bucket, - sourceKey, - hash, - localFile.remoteKey, - localFile.length) + private def doCopy(localFile: LocalFile, + bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash): UIO[Action] = UIO { + Action + .toCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.length) } private def doCopyWithPreviousUpload( - localFile: LocalFile, - bucket: Bucket, - previous: Map[MD5Hash, Promise[Throwable, RemoteKey]], - uiChannel: UChannel[Any, UIEvent], + localFile: LocalFile, + bucket: Bucket, + previous: Map[MD5Hash, Promise[Throwable, RemoteKey]], + uiChannel: UChannel[Any, UIEvent], ): ZIO[Clock, Nothing, Action] = { localFile.hashes .values() @@ -216,17 +222,22 @@ object LocalFileSystem extends LocalFileSystem { .map({ hash => for { awaitingMessage <- Message.create( - UIEvent.AwaitingAnotherUpload(localFile.remoteKey, hash)) + UIEvent.awaitingAnotherUpload(localFile.remoteKey, hash) + ) _ <- MessageChannel.send(uiChannel)(awaitingMessage) action <- previous(hash).await.map( remoteKey => - Action.toCopy(bucket, - remoteKey, - hash, - localFile.remoteKey, - localFile.length)) + Action.toCopy( + bucket, + remoteKey, + hash, + localFile.remoteKey, + localFile.length + ) + ) waitFinishedMessage <- Message.create( - UIEvent.AnotherUploadWaitComplete(action)) + UIEvent.anotherUploadWaitComplete(action) + ) _ <- MessageChannel.send(uiChannel)(waitFinishedMessage) } yield action }) @@ -234,15 +245,13 @@ object LocalFileSystem extends LocalFileSystem { .refineToOrDie[Nothing] } - private def doUpload( - localFile: LocalFile, - bucket: Bucket - ): UIO[Action] = { + private def doUpload(localFile: LocalFile, bucket: Bucket): UIO[Action] = { UIO(Action.toUpload(bucket, localFile, localFile.length)) } def keySender( - keys: Iterable[RemoteKey]): UIO[MessageChannel.Sender[Clock, RemoteKey]] = + keys: Iterable[RemoteKey] + ): UIO[MessageChannel.Sender[Clock, RemoteKey]] = UIO { channel => ZIO.foreach(keys) { key => Message.create(key) >>= MessageChannel.send(channel) @@ -250,47 +259,52 @@ object LocalFileSystem extends LocalFileSystem { } def keyReceiver( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - archive: ThorpArchive, - actionCounterRef: Ref[Int], - bytesCounterRef: Ref[Long], - eventsRef: Ref[List[StorageEvent]] - ): UIO[MessageChannel.UReceiver[Clock with Storage, RemoteKey]] = + configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + archive: ThorpArchive, + actionCounterRef: Ref[Int], + bytesCounterRef: Ref[Long], + eventsRef: Ref[List[StorageEvent]] + ): UIO[MessageChannel.UReceiver[Clock, RemoteKey]] = UIO { message => { val remoteKey = message.body for { _ <- uiKeyFound(uiChannel)(remoteKey) sources = configuration.sources - prefix = configuration.prefix - exists = FileSystem.hasLocalFile(sources, prefix, remoteKey) + prefix = configuration.prefix + exists = FileSystem.hasLocalFile(sources, prefix, remoteKey) _ <- ZIO.when(!exists) { for { actionCounter <- actionCounterRef.update(_ + 1) bucket = configuration.bucket action = Action.toDelete(bucket, remoteKey, 0L) - _ <- uiActionChosen(uiChannel)(action) + _ <- uiActionChosen(uiChannel)(action) bytesCounter <- bytesCounterRef.update(_ + action.size) sequencedAction = SequencedAction(action, actionCounter) - event <- archive.update(configuration, - uiChannel, - sequencedAction, - 0L) + event <- archive.update( + configuration, + uiChannel, + sequencedAction, + 0L + ) _ <- eventsRef.update(list => event :: list) - _ <- uiActionFinished(uiChannel)(action, - actionCounter, - bytesCounter, - event) + _ <- uiActionFinished(uiChannel)( + action, + actionCounter, + bytesCounter, + event + ) } yield () } } yield () } } - private def uiKeyFound(uiChannel: UChannel[Any, UIEvent])( - remoteKey: RemoteKey) = - Message.create(UIEvent.KeyFound(remoteKey)) >>= + private def uiKeyFound( + uiChannel: UChannel[Any, UIEvent] + )(remoteKey: RemoteKey) = + Message.create(UIEvent.keyFound(remoteKey)) >>= MessageChannel.send(uiChannel) } diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala index 5550c6e..76602a3 100644 --- a/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala @@ -5,16 +5,15 @@ import net.kemitix.thorp.config.Configuration import net.kemitix.thorp.console._ import net.kemitix.thorp.domain.StorageEvent import net.kemitix.thorp.domain.StorageEvent._ -import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.uishell.UIEvent -import zio.{UIO, ZIO} +import zio.UIO trait ThorpArchive { def update(configuration: Configuration, uiChannel: UChannel[Any, UIEvent], sequencedAction: SequencedAction, - totalBytesSoFar: Long): ZIO[Storage, Nothing, StorageEvent] + totalBytesSoFar: Long): UIO[StorageEvent] def logEvent(configuration: Configuration, event: StorageEvent): UIO[StorageEvent] = { diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala index f259ec1..909d579 100644 --- a/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala @@ -6,72 +6,86 @@ import net.kemitix.thorp.domain.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener} -import zio.{UIO, ZIO} +import zio.UIO trait UnversionedMirrorArchive extends ThorpArchive { - override def update( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - sequencedAction: SequencedAction, - totalBytesSoFar: Long - ): ZIO[Storage, Nothing, StorageEvent] = { + override def update(configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + sequencedAction: SequencedAction, + totalBytesSoFar: Long): UIO[StorageEvent] = { val action = sequencedAction.action - val index = sequencedAction.index + val index = sequencedAction.index val bucket = action.bucket action match { case upload: ToUpload => val localFile = upload.localFile - doUpload(configuration, - uiChannel, - index, - totalBytesSoFar, - bucket, - localFile) + UIO { + doUpload( + configuration, + uiChannel, + index, + totalBytesSoFar, + bucket, + localFile + ) + } case toCopy: ToCopy => val sourceKey = toCopy.sourceKey - val hash = toCopy.hash + val hash = toCopy.hash val targetKey = toCopy.targetKey - Storage.copy(bucket, sourceKey, hash, targetKey) + UIO { + Storage + .getInstance() + .copy(bucket, sourceKey, hash, targetKey) + } case toDelete: ToDelete => val remoteKey = toDelete.remoteKey - Storage.delete(bucket, remoteKey) + UIO { + Storage.getInstance().delete(bucket, remoteKey) + } case doNothing: Action.DoNothing => val remoteKey = doNothing.remoteKey - UIO(StorageEvent.doNothingEvent(remoteKey)) + UIO { + StorageEvent.doNothingEvent(remoteKey) + } } } - private def doUpload( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - index: Int, - totalBytesSoFar: Long, - bucket: Bucket, - localFile: LocalFile - ) = - Storage.upload(localFile, - bucket, - listenerSettings(configuration, - uiChannel, - index, - totalBytesSoFar, - bucket, - localFile)) + private def doUpload(configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + index: Int, + totalBytesSoFar: Long, + bucket: Bucket, + localFile: LocalFile) = + Storage + .getInstance() + .upload( + localFile, + bucket, + listenerSettings( + configuration, + uiChannel, + index, + totalBytesSoFar, + bucket, + localFile + ) + ) - private def listenerSettings( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - index: Int, - totalBytesSoFar: Long, - bucket: Bucket, - localFile: LocalFile - ) = - UploadEventListener.Settings(uiChannel, - localFile, - index, - totalBytesSoFar, - configuration.batchMode) + private def listenerSettings(configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + index: Int, + totalBytesSoFar: Long, + bucket: Bucket, + localFile: LocalFile) = + UploadEventListener.Settings( + uiChannel, + localFile, + index, + totalBytesSoFar, + configuration.batchMode + ) } diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileSystemTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileSystemTest.scala index a57a3bb..d5eab6a 100644 --- a/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileSystemTest.scala +++ b/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileSystemTest.scala @@ -13,7 +13,6 @@ import net.kemitix.thorp.config.{ import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.Resource -import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.uishell.UIEvent import net.kemitix.thorp.uishell.UIEvent.{ ActionChosen, @@ -24,17 +23,17 @@ import net.kemitix.thorp.uishell.UIEvent.{ import org.scalatest.FreeSpec import org.scalatest.Matchers._ import zio.clock.Clock -import zio.{DefaultRuntime, UIO, ZIO} +import zio.{DefaultRuntime, UIO} import scala.collection.MapView import scala.jdk.CollectionConverters._ class LocalFileSystemTest extends FreeSpec { - private val source = Resource.select(this, "upload") - private val sourcePath = source.toPath + private val source = Resource.select(this, "upload") + private val sourcePath = source.toPath private val sourceOption = ConfigOption.source(sourcePath) - private val bucket = Bucket.named("bucket") + private val bucket = Bucket.named("bucket") private val bucketOption = ConfigOption.bucket(bucket.name) private val configOptions = ConfigOptions.create( List[ConfigOption]( @@ -42,17 +41,17 @@ class LocalFileSystemTest extends FreeSpec { bucketOption, ConfigOption.ignoreGlobalOptions(), ConfigOption.ignoreUserOptions() - ).asJava) + ).asJava + ) private val uiEvents = new AtomicReference[List[UIEvent]](List.empty) - private val actions = new AtomicReference[List[SequencedAction]](List.empty) + private val actions = new AtomicReference[List[SequencedAction]](List.empty) private def archive: ThorpArchive = new ThorpArchive { - override def update( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - sequencedAction: SequencedAction, - totalBytesSoFar: Long): ZIO[Storage, Nothing, StorageEvent] = UIO { + override def update(configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + sequencedAction: SequencedAction, + totalBytesSoFar: Long): UIO[StorageEvent] = UIO { actions.updateAndGet(l => sequencedAction :: l) StorageEvent.doNothingEvent(sequencedAction.action.remoteKey) } @@ -60,22 +59,21 @@ class LocalFileSystemTest extends FreeSpec { private val runtime = new DefaultRuntime {} - private object TestEnv - extends Clock.Live - with FileScanner.Live - with Storage.Test + private object TestEnv extends Clock.Live with FileScanner.Live "scanCopyUpload" - { - def sender(configuration: Configuration, objects: RemoteObjects) - : UIO[MessageChannel.ESender[Clock with FileScanner with Storage, - Throwable, - UIEvent]] = + def sender( + configuration: Configuration, + objects: RemoteObjects + ): UIO[MessageChannel.ESender[Clock with FileScanner, Throwable, UIEvent]] = UIO { uiChannel => (for { - _ <- LocalFileSystem.scanCopyUpload(configuration, - uiChannel, - objects, - archive) + _ <- LocalFileSystem.scanCopyUpload( + configuration, + uiChannel, + objects, + archive + ) } yield ()) <* MessageChannel.endChannel(uiChannel) } def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] = @@ -87,9 +85,9 @@ class LocalFileSystemTest extends FreeSpec { def program(remoteObjects: RemoteObjects) = { val configuration = ConfigurationBuilder.buildConfig(configOptions) for { - sender <- sender(configuration, remoteObjects) + sender <- sender(configuration, remoteObjects) receiver <- receiver() - _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain + _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain } yield () } "where remote has no objects" - { @@ -102,7 +100,8 @@ class LocalFileSystemTest extends FreeSpec { actionList.filter(_.isInstanceOf[ToUpload]) should have size 2 actionList.map(_.remoteKey) shouldEqual Set( MD5HashData.Root.remoteKey, - MD5HashData.Leaf.remoteKey) + MD5HashData.Leaf.remoteKey + ) } "ui is updated" in { uiEvents.set(List.empty) @@ -112,7 +111,8 @@ class LocalFileSystemTest extends FreeSpec { summary should contain inOrderElementsOf List( "file found : root-file", "action chosen : root-file : ToUpload", - "action finished : root-file : ToUpload") + "action finished : root-file : ToUpload" + ) summary should contain inOrderElementsOf List( "file found : subdir/leaf-file", "action chosen : subdir/leaf-file : ToUpload", @@ -126,10 +126,12 @@ class LocalFileSystemTest extends FreeSpec { RemoteObjects.create( MapView( MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, - MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey).toMap.asJava, + MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey + ).toMap.asJava, MapView( MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, - MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash).toMap.asJava + MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash + ).toMap.asJava ) "do nothing for all files" - { "all archive actions do nothing" in { @@ -147,7 +149,8 @@ class LocalFileSystemTest extends FreeSpec { summary should contain inOrderElementsOf List( "file found : root-file", "action chosen : root-file : DoNothing", - "action finished : root-file : DoNothing") + "action finished : root-file : DoNothing" + ) summary should contain inOrderElementsOf List( "file found : subdir/leaf-file", "action chosen : subdir/leaf-file : DoNothing", @@ -159,7 +162,7 @@ class LocalFileSystemTest extends FreeSpec { "where remote has some objects" - { val remoteObjects = RemoteObjects.create( - MapView(MD5HashData.Root.hash -> MD5HashData.Root.remoteKey).toMap.asJava, + MapView(MD5HashData.Root.hash -> MD5HashData.Root.remoteKey).toMap.asJava, MapView(MD5HashData.Root.remoteKey -> MD5HashData.Root.hash).toMap.asJava ) "upload leaf, do nothing for root" - { @@ -181,7 +184,8 @@ class LocalFileSystemTest extends FreeSpec { summary should contain inOrderElementsOf List( "file found : root-file", "action chosen : root-file : DoNothing", - "action finished : root-file : DoNothing") + "action finished : root-file : DoNothing" + ) summary should contain inOrderElementsOf List( "file found : subdir/leaf-file", "action chosen : subdir/leaf-file : ToUpload", @@ -195,10 +199,12 @@ class LocalFileSystemTest extends FreeSpec { RemoteObjects.create( MapView( MD5HashData.Root.hash -> MD5HashData.Leaf.remoteKey, - MD5HashData.Leaf.hash -> MD5HashData.Root.remoteKey).toMap.asJava, + MD5HashData.Leaf.hash -> MD5HashData.Root.remoteKey + ).toMap.asJava, MapView( MD5HashData.Root.remoteKey -> MD5HashData.Leaf.hash, - MD5HashData.Leaf.remoteKey -> MD5HashData.Root.hash).toMap.asJava + MD5HashData.Leaf.remoteKey -> MD5HashData.Root.hash + ).toMap.asJava ) "copy files" - { "archive swaps objects" ignore { @@ -213,10 +219,12 @@ class LocalFileSystemTest extends FreeSpec { RemoteObjects.create( MapView( MD5HashData.Root.hash -> otherRootKey, - MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey).toMap.asJava, + MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey + ).toMap.asJava, MapView( - otherRootKey -> MD5HashData.Root.hash, - MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash).toMap.asJava + otherRootKey -> MD5HashData.Root.hash, + MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash + ).toMap.asJava ) "copy object and delete original" in { actions.set(List.empty) @@ -237,7 +245,8 @@ class LocalFileSystemTest extends FreeSpec { summary should contain inOrderElementsOf List( "file found : root-file", "action chosen : root-file : ToCopy", - "action finished : root-file : ToCopy") + "action finished : root-file : ToCopy" + ) summary should contain inOrderElementsOf List( "file found : subdir/leaf-file", "action chosen : subdir/leaf-file : DoNothing", @@ -248,14 +257,18 @@ class LocalFileSystemTest extends FreeSpec { } "scanDelete" - { - def sender(configuration: Configuration, objects: RemoteObjects) - : UIO[MessageChannel.ESender[Clock with Storage, Throwable, UIEvent]] = + def sender( + configuration: Configuration, + objects: RemoteObjects + ): UIO[MessageChannel.ESender[Clock, Throwable, UIEvent]] = UIO { uiChannel => (for { - _ <- LocalFileSystem.scanDelete(configuration, - uiChannel, - objects, - archive) + _ <- LocalFileSystem.scanDelete( + configuration, + uiChannel, + objects, + archive + ) } yield ()) <* MessageChannel.endChannel(uiChannel) } def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] = @@ -268,9 +281,9 @@ class LocalFileSystemTest extends FreeSpec { { val configuration = ConfigurationBuilder.buildConfig(configOptions) for { - sender <- sender(configuration, remoteObjects) + sender <- sender(configuration, remoteObjects) receiver <- receiver() - _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain + _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain } yield () } } @@ -278,10 +291,12 @@ class LocalFileSystemTest extends FreeSpec { val remoteObjects = RemoteObjects.create( MapView( MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, - MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey).toMap.asJava, + MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey + ).toMap.asJava, MapView( MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, - MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash).toMap.asJava + MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash + ).toMap.asJava ) "do nothing for all files" - { "no archive actions" in { @@ -293,21 +308,27 @@ class LocalFileSystemTest extends FreeSpec { "ui is updated" in { uiEvents.set(List.empty) runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) - uiEventsSummary shouldEqual List("key found: root-file", - "key found: subdir/leaf-file") + uiEventsSummary shouldEqual List( + "key found: root-file", + "key found: subdir/leaf-file" + ) } } } "where remote has extra objects" - { - val extraHash = MD5Hash.create("extra") + val extraHash = MD5Hash.create("extra") val extraObject = RemoteKey.create("extra") val remoteObjects = RemoteObjects.create( - MapView(MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, - MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey, - extraHash -> extraObject).toMap.asJava, - MapView(MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, - MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash, - extraObject -> extraHash).toMap.asJava + MapView( + MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, + MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey, + extraHash -> extraObject + ).toMap.asJava, + MapView( + MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, + MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash, + extraObject -> extraHash + ).toMap.asJava ) "remove the extra object" - { "archive delete action" in { @@ -339,18 +360,22 @@ class LocalFileSystemTest extends FreeSpec { .get() .reverse .map { - case FileFound(localFile) => - String.format("file found : %s", localFile.remoteKey.key) - case ActionChosen(action) => - String.format("action chosen : %s : %s", - action.remoteKey.key, - action.getClass.getSimpleName) - case ActionFinished(action, actionCounter, bytesCounter, event) => - String.format("action finished : %s : %s", - action.remoteKey.key, - action.getClass.getSimpleName) - case KeyFound(remoteKey) => - String.format("key found: %s", remoteKey.key) + case uie: FileFound => + String.format("file found : %s", uie.localFile.remoteKey.key) + case uie: ActionChosen => + String.format( + "action chosen : %s : %s", + uie.action.remoteKey.key, + uie.action.getClass.getSimpleName + ) + case uie: ActionFinished => + String.format( + "action finished : %s : %s", + uie.action.remoteKey.key, + uie.action.getClass.getSimpleName + ) + case uie: KeyFound => + String.format("key found: %s", uie.remoteKey.key) case x => String.format("unknown : %s", x.getClass.getSimpleName) } } diff --git a/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3Storage.java b/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3Storage.java new file mode 100644 index 0000000..7610629 --- /dev/null +++ b/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3Storage.java @@ -0,0 +1,65 @@ +package net.kemitix.thorp.storage.aws; + +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.DeleteObjectRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.transfer.TransferManagerBuilder; +import net.kemitix.thorp.domain.*; +import net.kemitix.thorp.storage.Storage; +import net.kemitix.thorp.uishell.UploadEventListener; + +import java.util.function.Function; + +public class S3Storage implements Storage { + + private final AmazonS3Client client = + AmazonS3Client.create(AmazonS3ClientBuilder.standard().build()); + private final S3TransferManager transferManager = + S3TransferManager.create( + TransferManagerBuilder.defaultTransferManager()); + private final Function uploader = + S3Uploader.uploader(transferManager); + private final Function lister = + S3Lister.lister(client); + private final Function copier = + S3Copier.copier(client); + private final Function deleter = + S3Deleter.deleter(client); + + @Override + public RemoteObjects list( + Bucket bucket, + RemoteKey prefix + ) { + return lister.apply(S3Lister.request(bucket, prefix)); + } + + @Override + public StorageEvent upload( + LocalFile localFile, + Bucket bucket, + UploadEventListener.Settings listener + ) { + return uploader.apply(S3Uploader.request(localFile, bucket)); + } + + @Override + public StorageEvent copy( + Bucket bucket, + RemoteKey sourceKey, + MD5Hash hash, + RemoteKey targetKey + ) { + return copier.apply(S3Copier.request(bucket, sourceKey, hash, targetKey)); + } + + @Override + public StorageEvent delete( + Bucket bucket, + RemoteKey remoteKey + ) { + return deleter.apply(S3Deleter.request(bucket, remoteKey)); + } +} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala deleted file mode 100644 index 66d1223..0000000 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala +++ /dev/null @@ -1,59 +0,0 @@ -package net.kemitix.thorp.storage.aws - -import com.amazonaws.services.s3.AmazonS3ClientBuilder -import com.amazonaws.services.s3.transfer.TransferManagerBuilder -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.Storage -import net.kemitix.thorp.storage.Storage.Service -import net.kemitix.thorp.uishell.UploadEventListener -import zio.{RIO, UIO} - -object S3Storage { - trait Live extends Storage { - val storage: Service = new Service { - - private val client: AmazonS3Client = - AmazonS3Client.create(AmazonS3ClientBuilder.standard().build()) - private val transferManager: S3TransferManager = - S3TransferManager.create(TransferManagerBuilder.defaultTransferManager) - private val copier = S3Copier.copier(client) - private val uploader = S3Uploader.uploader(transferManager) - private val deleter = S3Deleter.deleter(client) - private val lister = S3Lister.lister(client) - - override def listObjects(bucket: Bucket, - prefix: RemoteKey): RIO[Storage, RemoteObjects] = - UIO { - lister(S3Lister.request(bucket, prefix)) - } - - override def upload(localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings, - ): UIO[StorageEvent] = - UIO { - uploader(S3Uploader.request(localFile, bucket)) - } - - override def copy(bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey): UIO[StorageEvent] = - UIO { - copier(S3Copier.request(bucket, sourceKey, hash, targetKey)) - } - - override def delete(bucket: Bucket, - remoteKey: RemoteKey): UIO[StorageEvent] = - UIO { - deleter(S3Deleter.request(bucket, remoteKey)) - } - - override def shutdown: UIO[StorageEvent] = { - UIO(transferManager.shutdownNow(true)) *> UIO(client.shutdown()) - .map(_ => StorageEvent.shutdownEvent()) - } - } - } - object Live extends Live -} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala index d273b5f..c96c5cc 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala @@ -1,10 +1,7 @@ package net.kemitix.thorp.storage.aws -import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.Storage -import net.kemitix.thorp.uishell.UploadEventListener import org.scalamock.scalatest.MockFactory -import zio.{RIO, UIO} trait AmazonS3ClientTestFixture extends MockFactory { @@ -17,47 +14,47 @@ trait AmazonS3ClientTestFixture extends MockFactory { case class Fixture(amazonS3Client: AmazonS3Client, amazonS3TransferManager: S3TransferManager, ) { - lazy val storageService: Storage.Service = - new Storage.Service { - - private val client = amazonS3Client - private val transferManager = amazonS3TransferManager - - override def listObjects( - bucket: Bucket, - prefix: RemoteKey - ): RIO[Storage, RemoteObjects] = - UIO { - S3Lister.lister(client)(S3Lister.request(bucket, prefix)) - } - - override def upload(localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings, - ): UIO[StorageEvent] = - UIO( - S3Uploader - .uploader(transferManager)(S3Uploader.request(localFile, bucket)) - ) - - override def copy(bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey): UIO[StorageEvent] = - UIO { - val request = S3Copier.request(bucket, sourceKey, hash, targetKey) - S3Copier.copier(client)(request) - } - - override def delete(bucket: Bucket, - remoteKey: RemoteKey): UIO[StorageEvent] = - UIO(S3Deleter.deleter(client)(S3Deleter.request(bucket, remoteKey))) - - override def shutdown: UIO[StorageEvent] = { - UIO(transferManager.shutdownNow(true)) *> UIO(client.shutdown()) - .map(_ => StorageEvent.shutdownEvent()) - } - } + lazy val storageService: Storage = Storage.getInstance() +// new Storage.Service { +// +// private val client = amazonS3Client +// private val transferManager = amazonS3TransferManager +// +// override def listObjects( +// bucket: Bucket, +// prefix: RemoteKey +// ): RIO[Storage, RemoteObjects] = +// UIO { +// S3Lister.lister(client)(S3Lister.request(bucket, prefix)) +// } +// +// override def upload(localFile: LocalFile, +// bucket: Bucket, +// listenerSettings: UploadEventListener.Settings, +// ): UIO[StorageEvent] = +// UIO( +// S3Uploader +// .uploader(transferManager)(S3Uploader.request(localFile, bucket)) +// ) +// +// override def copy(bucket: Bucket, +// sourceKey: RemoteKey, +// hash: MD5Hash, +// targetKey: RemoteKey): UIO[StorageEvent] = +// UIO { +// val request = S3Copier.request(bucket, sourceKey, hash, targetKey) +// S3Copier.copier(client)(request) +// } +// +// override def delete(bucket: Bucket, +// remoteKey: RemoteKey): UIO[StorageEvent] = +// UIO(S3Deleter.deleter(client)(S3Deleter.request(bucket, remoteKey))) +// +// override def shutdown: UIO[StorageEvent] = { +// UIO(transferManager.shutdownNow(true)) *> UIO(client.shutdown()) +// .map(_ => StorageEvent.shutdownEvent()) +// } +// } } } diff --git a/storage/pom.xml b/storage/pom.xml index 40a74f7..5cb25f5 100644 --- a/storage/pom.xml +++ b/storage/pom.xml @@ -21,22 +21,6 @@ net.kemitix.thorp thorp-domain - - - - org.scala-lang - scala-library - - - - - dev.zio - zio_2.13 - - - dev.zio - zio-streams_2.13 - diff --git a/storage/src/main/java/net/kemitix/thorp/storage/Storage.java b/storage/src/main/java/net/kemitix/thorp/storage/Storage.java new file mode 100644 index 0000000..cd1dfab --- /dev/null +++ b/storage/src/main/java/net/kemitix/thorp/storage/Storage.java @@ -0,0 +1,37 @@ +package net.kemitix.thorp.storage; + +import net.kemitix.thorp.domain.*; +import net.kemitix.thorp.uishell.UploadEventListener; + +import java.util.ServiceLoader; + +public interface Storage { + + RemoteObjects list( + Bucket bucket, + RemoteKey prefix + ); + + StorageEvent upload( + LocalFile localFile, + Bucket bucket, + UploadEventListener.Settings listener + ); + + StorageEvent copy( + Bucket bucket, + RemoteKey sourceKey, + MD5Hash hash, + RemoteKey targetKey + ); + + StorageEvent delete( + Bucket bucket, + RemoteKey remoteKey + ); + + static Storage getInstance() { + return ServiceLoader.load(Storage.class).iterator().next(); + } + +} diff --git a/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala b/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala deleted file mode 100644 index 4898de4..0000000 --- a/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala +++ /dev/null @@ -1,99 +0,0 @@ -package net.kemitix.thorp.storage - -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.uishell.UploadEventListener -import zio.{RIO, Task, UIO, ZIO} - -trait Storage { - val storage: Storage.Service -} - -object Storage { - trait Service { - - def listObjects(bucket: Bucket, - prefix: RemoteKey): RIO[Storage, RemoteObjects] - - def upload(localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings, - ): ZIO[Storage, Nothing, StorageEvent] - - def copy(bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] - - def delete(bucket: Bucket, remoteKey: RemoteKey): UIO[StorageEvent] - - def shutdown: UIO[StorageEvent] - } - - trait Test extends Storage { - - def listResult: Task[RemoteObjects] = - Task.die(new NotImplementedError) - def uploadResult: UIO[StorageEvent] = - Task.die(new NotImplementedError) - def copyResult: UIO[StorageEvent] = - Task.die(new NotImplementedError) - def deleteResult: UIO[StorageEvent] = - Task.die(new NotImplementedError) - def shutdownResult: UIO[StorageEvent] = - Task.die(new NotImplementedError) - - val storage: Service = new Service { - - override def listObjects(bucket: Bucket, - prefix: RemoteKey): RIO[Storage, RemoteObjects] = - listResult - - override def upload( - localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings - ): ZIO[Storage, Nothing, StorageEvent] = - uploadResult - - override def copy( - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey - ): ZIO[Storage, Nothing, StorageEvent] = - copyResult - - override def delete(bucket: Bucket, - remoteKey: RemoteKey): UIO[StorageEvent] = - deleteResult - - override def shutdown: UIO[StorageEvent] = - shutdownResult - - } - } - - object Test extends Test - - final def list(bucket: Bucket, - prefix: RemoteKey): RIO[Storage, RemoteObjects] = - ZIO.accessM(_.storage listObjects (bucket, prefix)) - - final def upload( - localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings - ): ZIO[Storage, Nothing, StorageEvent] = - ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings)) - - final def copy(bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] = - ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey)) - - final def delete(bucket: Bucket, - remoteKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] = - ZIO.accessM(_.storage delete (bucket, remoteKey)) - -} diff --git a/uishell/pom.xml b/uishell/pom.xml index a7d87b2..f8eab90 100644 --- a/uishell/pom.xml +++ b/uishell/pom.xml @@ -12,6 +12,13 @@ uishell + + + org.projectlombok + lombok + true + + net.kemitix.thorp diff --git a/uishell/src/main/java/net/kemitix/thorp/uishell/ProgressUI.java b/uishell/src/main/java/net/kemitix/thorp/uishell/ProgressUI.java new file mode 100644 index 0000000..4d95edb --- /dev/null +++ b/uishell/src/main/java/net/kemitix/thorp/uishell/ProgressUI.java @@ -0,0 +1,98 @@ +package net.kemitix.thorp.uishell; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import net.kemitix.thorp.config.Configuration; +import net.kemitix.thorp.console.Console; +import net.kemitix.thorp.domain.LocalFile; +import net.kemitix.thorp.domain.RemoteKey; +import net.kemitix.thorp.domain.StringUtil; +import net.kemitix.thorp.domain.Terminal; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish; +import static net.kemitix.thorp.domain.Terminal.progressBar; + +public class ProgressUI { + + static UploadState uploadState(long transferred, long fileLength) { + return new UploadState(transferred, fileLength); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + static class UploadState { + public final long transferred; + public final long fileLength; + } + + private static final AtomicReference> uploads = + new AtomicReference<>(Collections.emptyMap()); + + private static final int statusHeight = 2; + + public static void requestCycle( + Configuration configuration, + LocalFile localFile, + long bytesTransferred, + int index, + long totalBytesSoFar + ) { + if (bytesTransferred < localFile.length) { + stillUploading( + localFile.remoteKey, + localFile.length, + bytesTransferred + ); + } else { + finishedUploading(localFile.remoteKey); + } + } + + static void stillUploading(RemoteKey remoteKey, + long fileLength, + long bytesTransferred) { + Map current = + uploads.updateAndGet(map -> { + HashMap updated = new HashMap<>(map); + updated.put( + remoteKey, + uploadState(bytesTransferred, fileLength)); + return updated; + }); + String resetCursor = StringUtil.repeat( + Terminal.cursorPrevLine(statusHeight), current.size()); + current.forEach((key, state) -> { + String percent = String.format("%2d", (state.transferred * 100) / state.fileLength); + String transferred = sizeInEnglish(state.transferred); + String fileLength1 = sizeInEnglish(state.fileLength); + String line1 = String.format("%sUploading%s: %s:%s", + Terminal.green, Terminal.reset, + key.key(), + Terminal.eraseLineForward); + String line2body = String.format( + "%s%% %s or %s", + percent, transferred, fileLength1); + String bar = progressBar( + state.transferred, + state.fileLength, + Terminal.width() - line2body.length()); + String line2 = String.join("", + Terminal.green, line2body, Terminal.reset, + bar, Terminal.eraseLineForward); + Console.putStrLn(line1); + Console.putStrLn(line2); + }); + Console.putStr(resetCursor); + } + + static void finishedUploading(RemoteKey remoteKey) { + uploads.updateAndGet(map -> { + Map updated = new HashMap<>(map); + updated.remove(remoteKey); + return updated; + }); + } +} diff --git a/uishell/src/main/java/net/kemitix/thorp/uishell/UIEvent.java b/uishell/src/main/java/net/kemitix/thorp/uishell/UIEvent.java new file mode 100644 index 0000000..d102806 --- /dev/null +++ b/uishell/src/main/java/net/kemitix/thorp/uishell/UIEvent.java @@ -0,0 +1,105 @@ +package net.kemitix.thorp.uishell; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import net.kemitix.thorp.domain.*; + +public interface UIEvent { + static UIEvent showValidConfig() { + return new ShowValidConfig(); + } + class ShowValidConfig implements UIEvent { } + + static UIEvent remoteDataFetched(int size) { + return new RemoteDataFetched(size); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class RemoteDataFetched implements UIEvent { + public final int size; + } + + static UIEvent showSummary(Counters counters) { + return new ShowSummary(counters); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class ShowSummary implements UIEvent { + public final Counters counters; + } + + static UIEvent fileFound(LocalFile localFile) { + return new FileFound(localFile); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class FileFound implements UIEvent { + public final LocalFile localFile; + } + + static UIEvent actionChosen(Action action) { + return new ActionChosen(action); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class ActionChosen implements UIEvent { + public final Action action; + } + + /** + * The content of the file ({{hash}}) that will be placed + * at {{remoteKey}} is already being uploaded to another + * location. Once that upload has completed, its RemoteKey + * will become available and a Copy action can be made. + * @param remoteKey where this upload will copy the other to + * @param hash the hash of the file being uploaded + */ + static UIEvent awaitingAnotherUpload(RemoteKey remoteKey, MD5Hash hash) { + return new AwaitingAnotherUpload(remoteKey, hash); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class AwaitingAnotherUpload implements UIEvent { + public final RemoteKey remoteKey; + public final MD5Hash hash; + } + + static UIEvent anotherUploadWaitComplete(Action action) { + return new AnotherUploadWaitComplete(action); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class AnotherUploadWaitComplete implements UIEvent { + public final Action action; + } + + static UIEvent actionFinished(Action action, + int actionCounter, + long bytesCounter, + StorageEvent event) { + return new ActionFinished(action, actionCounter, bytesCounter, event); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class ActionFinished implements UIEvent { + public final Action action; + public final int actionCounter; + public final long bytesCounter; + public final StorageEvent event; + } + + static UIEvent keyFound(RemoteKey remoteKey) { + return new KeyFound(remoteKey); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class KeyFound implements UIEvent { + public final RemoteKey remoteKey; + } + + static UIEvent requestCycle(LocalFile localFile, + long bytesTransfered, + int index, + long totalBytesSoFar) { + return new RequestCycle(localFile, bytesTransfered, index, totalBytesSoFar); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class RequestCycle implements UIEvent { + public final LocalFile localFile; + public final long bytesTransferred; + public final int index; + public final long totalBytesSoFar; + } +} diff --git a/uishell/src/main/java/net/kemitix/thorp/uishell/UploadProgressEvent.java b/uishell/src/main/java/net/kemitix/thorp/uishell/UploadProgressEvent.java new file mode 100644 index 0000000..dd34024 --- /dev/null +++ b/uishell/src/main/java/net/kemitix/thorp/uishell/UploadProgressEvent.java @@ -0,0 +1,35 @@ +package net.kemitix.thorp.uishell; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; + +public interface UploadProgressEvent { + + static UploadProgressEvent transferEvent(String name) { + return new TransferEvent(name); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class TransferEvent implements UploadProgressEvent { + public final String name; + } + + static UploadProgressEvent requestEvent(String name, + long bytes, + long transferred) { + return new RequestEvent(name, bytes, transferred); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class RequestEvent implements UploadProgressEvent { + public final String name; + public final long bytes; + public final long transferred; + } + + static UploadProgressEvent bytesTransferEvent(String name) { + return new TransferEvent(name); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class ByteTransferEvent implements UploadProgressEvent { + public final String name; + } +} diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressEvent.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressEvent.scala deleted file mode 100644 index fa85387..0000000 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressEvent.scala +++ /dev/null @@ -1,19 +0,0 @@ -package net.kemitix.thorp.uishell - -import net.kemitix.eip.zio.MessageChannel -import net.kemitix.thorp.console.Console -import net.kemitix.thorp.filesystem.FileSystem -import zio.clock.Clock - -sealed trait ProgressEvent - -object ProgressEvent { - type Env = Console - type ProgressSender = - MessageChannel.ESender[Clock with FileSystem, Throwable, ProgressEvent] - type ProgressReceiver = - MessageChannel.Receiver[ProgressEvent.Env, ProgressEvent] - type ProgressChannel = MessageChannel.Channel[Console, ProgressEvent] - - final case class PingEvent() extends ProgressEvent -} diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressUI.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressUI.scala deleted file mode 100644 index af2c5be..0000000 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressUI.scala +++ /dev/null @@ -1,80 +0,0 @@ -package net.kemitix.thorp.uishell - -import java.util.concurrent.atomic.AtomicReference - -import net.kemitix.thorp.config.Configuration -import net.kemitix.thorp.console.Console -import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish -import net.kemitix.thorp.domain.Terminal.{eraseLineForward, progressBar} -import net.kemitix.thorp.domain.{LocalFile, RemoteKey, Terminal} -import zio.{UIO, ZIO} - -import scala.io.AnsiColor.{GREEN, RESET} - -object ProgressUI { - - private case class UploadState(transferred: Long, fileLength: Long) - - private val uploads: AtomicReference[Map[RemoteKey, UploadState]] = - new AtomicReference[Map[RemoteKey, UploadState]](Map.empty) - - private val statusHeight = 2 - - def requestCycle(configuration: Configuration, - localFile: LocalFile, - bytesTransferred: Long, - index: Int, - totalBytesSoFar: Long): UIO[Unit] = - for { - _ <- ZIO.when(bytesTransferred < localFile.file.length())( - stillUploading( - localFile.remoteKey, - localFile.file.length(), - bytesTransferred - ) - ) - _ <- ZIO.when(bytesTransferred >= localFile.file.length()) { - finishedUploading(localFile.remoteKey) - } - } yield () - - private def stillUploading(remoteKey: RemoteKey, - fileLength: Long, - bytesTransferred: Long): UIO[Unit] = { - val current: Map[RemoteKey, UploadState] = - uploads.updateAndGet( - (m: Map[RemoteKey, UploadState]) => - m.updated(remoteKey, UploadState(bytesTransferred, fileLength)) - ) - val resetCursor = s"${Terminal.cursorPrevLine(statusHeight) * current.size}" - ZIO.foreach(current) { entry => - { - val (remoteKey, state) = entry - - val percent = f"${(state.transferred * 100) / state.fileLength}%2d" - val transferred = sizeInEnglish(state.transferred) - val fileLength = sizeInEnglish(state.fileLength) - val line1 = - s"${GREEN}Uploading:$RESET ${remoteKey.key}$eraseLineForward" - val line2body = s"($percent%) $transferred of $fileLength " - val bar = - progressBar( - state.transferred.toDouble, - state.fileLength.toDouble, - Terminal.width - line2body.length - ) - val line2 = s"$GREEN$line2body$RESET$bar$eraseLineForward" - UIO(Console.putStrLn(line1)) *> - UIO(Console.putStrLn(line2)) - } - } *> UIO(Console.putStr(resetCursor)) - } - - def finishedUploading(remoteKey: RemoteKey): ZIO[Any, Nothing, Unit] = { - UIO( - uploads - .updateAndGet((m: Map[RemoteKey, UploadState]) => m.removed(remoteKey)) - ) *> UIO.unit - } - -} diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIEvent.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIEvent.scala deleted file mode 100644 index 81ae8ee..0000000 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIEvent.scala +++ /dev/null @@ -1,44 +0,0 @@ -package net.kemitix.thorp.uishell - -import net.kemitix.thorp.domain._ - -sealed trait UIEvent -object UIEvent { - case object ShowValidConfig extends UIEvent - - case class RemoteDataFetched(size: Int) extends UIEvent - - case class ShowSummary(counters: Counters) extends UIEvent - - case class FileFound(localFile: LocalFile) extends UIEvent - - case class ActionChosen(action: Action) extends UIEvent - - /** - * The content of the file ({{hash}}) that will be placed - * at {{remoteKey}} is already being uploaded to another - * location. Once that upload has completed, its RemoteKey - * will become available and a Copy action can be made. - * @param remoteKey where this upload will copy the other to - * @param hash the hash of the file being uploaded - */ - case class AwaitingAnotherUpload(remoteKey: RemoteKey, hash: MD5Hash) - extends UIEvent - - case class AnotherUploadWaitComplete(action: Action) extends UIEvent - - case class ActionFinished(action: Action, - actionCounter: Int, - bytesCounter: Long, - event: StorageEvent) - extends UIEvent - - case class KeyFound(remoteKey: RemoteKey) extends UIEvent - - case class RequestCycle(localFile: LocalFile, - bytesTransferred: Long, - index: Int, - totalBytesSoFar: Long) - extends UIEvent - -} diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala index 0e70a57..02b14fa 100644 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala +++ b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala @@ -14,31 +14,28 @@ object UIShell { ): UIO[MessageChannel.UReceiver[Any, UIEvent]] = UIO { uiEventMessage => uiEventMessage.body match { - case UIEvent.ShowValidConfig => showValidConfig(configuration) - case UIEvent.RemoteDataFetched(size) => remoteDataFetched(size) - case UIEvent.ShowSummary(counters) => showSummary(counters) - case UIEvent.FileFound(localFile) => fileFound(configuration, localFile) - case UIEvent.ActionChosen(action) => actionChosen(configuration, action) - case UIEvent.AwaitingAnotherUpload(remoteKey, hash) => - awaitingUpload(remoteKey, hash) - case UIEvent.AnotherUploadWaitComplete(action) => - uploadWaitComplete(action) - case UIEvent.ActionFinished(_, _, _, event) => - actionFinished(configuration, event) - case UIEvent.KeyFound(_) => UIO(()) - case UIEvent.RequestCycle( - localFile, - bytesTransferred, - index, - totalBytesSoFar - ) => + case _: UIEvent.ShowValidConfig => showValidConfig(configuration) + case uie: UIEvent.RemoteDataFetched => remoteDataFetched(uie.size) + case uie: UIEvent.ShowSummary => showSummary(uie.counters) + case uie: UIEvent.FileFound => fileFound(configuration, uie.localFile) + case uie: UIEvent.ActionChosen => + actionChosen(configuration, uie.action) + case uie: UIEvent.AwaitingAnotherUpload => + awaitingUpload(uie.remoteKey, uie.hash) + case uie: UIEvent.AnotherUploadWaitComplete => + uploadWaitComplete(uie.action) + case uie: UIEvent.ActionFinished => + actionFinished(configuration, uie.event) + case _: UIEvent.KeyFound => UIO.unit + case uie: UIEvent.RequestCycle => ProgressUI.requestCycle( configuration, - localFile, - bytesTransferred, - index, - totalBytesSoFar + uie.localFile, + uie.bytesTransferred, + uie.index, + uie.totalBytesSoFar ) + UIO.unit } } @@ -61,6 +58,7 @@ object UIShell { Console .putMessageLnB(ConsoleOut.uploadComplete(remoteKey), batchMode) ProgressUI.finishedUploading(remoteKey) + UIO.unit case deleteEvent: StorageEvent.DeleteEvent => val remoteKey = deleteEvent.remoteKey Console.putMessageLnB(ConsoleOut.deleteComplete(remoteKey), batchMode) @@ -69,13 +67,13 @@ object UIShell { val remoteKey = errorEvent.remoteKey val action = errorEvent.action val e = errorEvent.e - ProgressUI.finishedUploading(remoteKey) *> - UIO( - Console.putMessageLnB( - ConsoleOut.errorQueueEventOccurred(action, e), - batchMode - ) + ProgressUI.finishedUploading(remoteKey) + UIO( + Console.putMessageLnB( + ConsoleOut.errorQueueEventOccurred(action, e), + batchMode ) + ) case _: StorageEvent.ShutdownEvent => UIO.unit } } yield () diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadEventListener.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadEventListener.scala index 31addc6..92070d5 100644 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadEventListener.scala +++ b/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadEventListener.scala @@ -9,13 +9,11 @@ import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent object UploadEventListener { - final case class Settings( - uiChannel: UChannel[Any, UIEvent], - localFile: LocalFile, - index: Int, - totalBytesSoFar: Long, - batchMode: Boolean - ) + final case class Settings(uiChannel: UChannel[Any, UIEvent], + localFile: LocalFile, + index: Int, + totalBytesSoFar: Long, + batchMode: Boolean) def listener(settings: Settings): UploadProgressEvent => Unit = { val bytesTransferred = new AtomicLong(0L) @@ -25,10 +23,14 @@ object UploadEventListener { case e: RequestEvent => settings.uiChannel( Message.withBody( - UIEvent.RequestCycle(settings.localFile, - bytesTransferred.addAndGet(e.transferred), - settings.index, - settings.totalBytesSoFar))) + UIEvent.requestCycle( + settings.localFile, + bytesTransferred.addAndGet(e.transferred), + settings.index, + settings.totalBytesSoFar + ) + ) + ) case _ => () } } diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadProgressEvent.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadProgressEvent.scala deleted file mode 100644 index c012402..0000000 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/UploadProgressEvent.scala +++ /dev/null @@ -1,23 +0,0 @@ -package net.kemitix.thorp.uishell - -sealed trait UploadProgressEvent { - def name: String -} - -object UploadProgressEvent { - - final case class TransferEvent( - name: String - ) extends UploadProgressEvent - - final case class RequestEvent( - name: String, - bytes: Long, - transferred: Long - ) extends UploadProgressEvent - - final case class ByteTransferEvent( - name: String - ) extends UploadProgressEvent - -}