diff --git a/app/src/main/scala/net/kemitix/thorp/Main.scala b/app/src/main/scala/net/kemitix/thorp/Main.scala index 7ee5a7a..b0b57d1 100644 --- a/app/src/main/scala/net/kemitix/thorp/Main.scala +++ b/app/src/main/scala/net/kemitix/thorp/Main.scala @@ -1,6 +1,5 @@ package net.kemitix.thorp -import net.kemitix.thorp.console.Console import net.kemitix.thorp.lib.FileScanner import net.kemitix.thorp.storage.aws.S3Storage import zio.clock.Clock @@ -10,7 +9,6 @@ object Main extends App { object LiveThorpApp extends S3Storage.Live - with Console.Live with Clock.Live with FileScanner.Live diff --git a/app/src/main/scala/net/kemitix/thorp/Program.scala b/app/src/main/scala/net/kemitix/thorp/Program.scala index 91b3582..c23edd2 100644 --- a/app/src/main/scala/net/kemitix/thorp/Program.scala +++ b/app/src/main/scala/net/kemitix/thorp/Program.scala @@ -23,21 +23,24 @@ import scala.jdk.CollectionConverters._ trait Program { - val version = "0.11.0" + val version = "0.11.0" lazy val versionLabel = s"${WHITE}Thorp v$version$RESET" - def run(args: List[String]) - : ZIO[Storage with Console with Clock with FileScanner, Nothing, Unit] = { + def run( + args: List[String] + ): ZIO[Storage with Clock with FileScanner, Nothing, Unit] = { (for { - cli <- CliArgs.parse(args) + cli <- CliArgs.parse(args) config <- IO(ConfigurationBuilder.buildConfig(cli)) - _ <- Console.putStrLn(versionLabel) + _ <- UIO(Console.putStrLn(versionLabel)) _ <- ZIO.when(!showVersion(cli))( - executeWithUI(config).catchAll(handleErrors)) + executeWithUI(config).catchAll(handleErrors) + ) } yield ()) .catchAll(e => { Console.putStrLn("An ERROR occurred:") Console.putStrLn(e.getMessage) + UIO.unit }) } @@ -45,33 +48,30 @@ trait Program { private def showVersion: ConfigOptions => Boolean = cli => ConfigQuery.showVersion(cli) - private def executeWithUI(configuration: Configuration) = + private def executeWithUI( + configuration: Configuration + ): ZIO[Storage with Clock with FileScanner, Throwable, Unit] = for { - uiEventSender <- execute(configuration) + uiEventSender <- execute(configuration) uiEventReceiver <- UIShell.receiver(configuration) - _ <- MessageChannel.pointToPoint(uiEventSender)(uiEventReceiver).runDrain + _ <- MessageChannel.pointToPoint(uiEventSender)(uiEventReceiver).runDrain } yield () type UIChannel = UChannel[Any, UIEvent] - private def execute(configuration: Configuration): ZIO[ - Any, - Nothing, - MessageChannel.ESender[Storage with Clock with FileScanner with Console, - Throwable, - UIEvent]] = UIO { uiChannel => + private def execute( + configuration: Configuration + ): UIO[MessageChannel.ESender[Storage with Clock with FileScanner, + Throwable, + UIEvent]] = UIO { uiChannel => (for { - _ <- showValidConfig(uiChannel) + _ <- showValidConfig(uiChannel) remoteData <- fetchRemoteData(configuration, uiChannel) - archive <- UIO(UnversionedMirrorArchive) - copyUploadEvents <- LocalFileSystem.scanCopyUpload(configuration, - uiChannel, - remoteData, - archive) - deleteEvents <- LocalFileSystem.scanDelete(configuration, - uiChannel, - remoteData, - archive) + archive <- UIO(UnversionedMirrorArchive) + copyUploadEvents <- LocalFileSystem + .scanCopyUpload(configuration, uiChannel, remoteData, archive) + deleteEvents <- LocalFileSystem + .scanDelete(configuration, uiChannel, remoteData, archive) _ <- showSummary(uiChannel)(copyUploadEvents ++ deleteEvents) } yield ()) <* MessageChannel.endChannel(uiChannel) } @@ -79,9 +79,10 @@ trait Program { private def showValidConfig(uiChannel: UIChannel) = Message.create(UIEvent.ShowValidConfig) >>= MessageChannel.send(uiChannel) - private def fetchRemoteData(configuration: Configuration, - uiChannel: UIChannel) - : ZIO[Clock with Storage with Console, Throwable, RemoteObjects] = { + private def fetchRemoteData( + configuration: Configuration, + uiChannel: UIChannel + ): ZIO[Clock with Storage, Throwable, RemoteObjects] = { val bucket = configuration.bucket val prefix = configuration.prefix for { @@ -92,17 +93,21 @@ trait Program { } private def handleErrors(throwable: Throwable) = - Console.putStrLn("There were errors:") *> logValidationErrors(throwable) + UIO(Console.putStrLn("There were errors:")) *> logValidationErrors( + throwable + ) private def logValidationErrors(throwable: Throwable) = throwable match { case validateError: ConfigValidationException => - ZIO.foreach_(validateError.getErrors.asScala)(error => - Console.putStrLn(s"- $error")) + ZIO.foreach_(validateError.getErrors.asScala)( + error => UIO(Console.putStrLn(s"- $error")) + ) } - private def showSummary(uiChannel: UIChannel)( - events: Seq[StorageEvent]): RIO[Clock, Unit] = { + private def showSummary( + uiChannel: UIChannel + )(events: Seq[StorageEvent]): RIO[Clock, Unit] = { val counters = events.foldLeft(Counters.empty)(countActivities) Message.create(UIEvent.ShowSummary(counters)) >>= MessageChannel.send(uiChannel) diff --git a/console/pom.xml b/console/pom.xml index 025719e..0ed6bbd 100644 --- a/console/pom.xml +++ b/console/pom.xml @@ -12,36 +12,17 @@ console + + + org.projectlombok + lombok + true + + net.kemitix.thorp thorp-domain - - - - org.scala-lang - scala-library - - - - - dev.zio - zio_2.13 - - - dev.zio - zio-streams_2.13 - - - - - - net.alchim31.maven - scala-maven-plugin - - - - \ No newline at end of file diff --git a/console/src/main/java/net/kemitix/thorp/console/Console.java b/console/src/main/java/net/kemitix/thorp/console/Console.java new file mode 100644 index 0000000..85c17cc --- /dev/null +++ b/console/src/main/java/net/kemitix/thorp/console/Console.java @@ -0,0 +1,25 @@ +package net.kemitix.thorp.console; + +public interface Console { + static void putStrLn(String line) { + System.out.println(line); + } + static void putStr(String line) { + System.out.print(line); + } + + static void putMessageLnB( + ConsoleOut.WithBatchMode message, + boolean batchMode + ) { + putStrLn( + batchMode + ? message.enBatch() + : message.en()); + } + static void putMessageLn( + ConsoleOut message + ) { + putStrLn(message.en()); + } +} diff --git a/console/src/main/java/net/kemitix/thorp/console/ConsoleOut.java b/console/src/main/java/net/kemitix/thorp/console/ConsoleOut.java new file mode 100644 index 0000000..8d76fd5 --- /dev/null +++ b/console/src/main/java/net/kemitix/thorp/console/ConsoleOut.java @@ -0,0 +1,149 @@ +package net.kemitix.thorp.console; + +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import net.kemitix.thorp.domain.*; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.function.Function; +import java.util.stream.Collectors; + +public interface ConsoleOut { + String en(); + default String eraseToEndOfScreen() { + return Terminal.eraseToEndOfScreen; + } + default String reset() { + return "\u001B[0m"; + } + default String red() { + return "\u001B[31m"; + } + default String green() { + return "\u001B[32m"; + } + interface WithBatchMode extends ConsoleOut, Function { + String enBatch(); + default String selectLine(boolean batchMode) { + return batchMode + ? enBatch() + : en(); + } + @Override + default String apply(Boolean batchMode) { + return selectLine(batchMode); + } + } + + static ConsoleOut validConfig( + Bucket bucket, + RemoteKey prefix, + Sources sources + ) { + return new ValidConfig(bucket, prefix, sources); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class ValidConfig implements ConsoleOut { + private final Bucket bucket; + private final RemoteKey prefix; + private final Sources sources; + + @Override + public String en() { + return String.join(", ", Arrays.asList( + "Bucket: " + bucket.name(), + "Prefix: " + prefix.key(), + "Source: " + sources.paths().stream() + .map(Path::toString) + .collect(Collectors.joining(", ")))); + } + } + + static ConsoleOut.WithBatchMode uploadComplete(RemoteKey remoteKey) { + return new UploadComplete(remoteKey); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class UploadComplete implements ConsoleOut.WithBatchMode { + private final RemoteKey remoteKey; + @Override + public String en() { + return String.format("%sUploaded:%s %s%s", + green(), reset(), + remoteKey.key(), + eraseToEndOfScreen()); + } + @Override + public String enBatch() { + return String.format("Uploaded: %s", remoteKey.key()); + } + } + + static ConsoleOut.WithBatchMode copyComplete(RemoteKey sourceKey, RemoteKey targetKey) { + return new CopyComplete(sourceKey, targetKey); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class CopyComplete implements ConsoleOut.WithBatchMode { + private final RemoteKey sourceKey; + private final RemoteKey targetKey; + @Override + public String en() { + return String.format("%sCopied:%s %s => %s%s", + green(), reset(), + sourceKey.key(), + targetKey.key(), + eraseToEndOfScreen()); + } + @Override + public String enBatch() { + return String.format("Copied: %s => %s", + sourceKey.key(), targetKey.key()); + } + } + static ConsoleOut.WithBatchMode deleteComplete(RemoteKey remoteKey) { + return new DeleteComplete(remoteKey); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class DeleteComplete implements WithBatchMode { + private final RemoteKey remoteKey; + @Override + public String en() { + return String.format("Deleted: %s", remoteKey); + } + @Override + public String enBatch() { + return String.format("%sDeleted%s: %s%s", + green(), reset(), + remoteKey, + eraseToEndOfScreen()); + } + } + static ConsoleOut.WithBatchMode errorQueueEventOccurred( + StorageEvent.ActionSummary actionSummary, + Throwable error + ) { + return new ErrorQueueEventOccurred(actionSummary, error); + } + @RequiredArgsConstructor(access = AccessLevel.PRIVATE) + class ErrorQueueEventOccurred implements WithBatchMode { + private final StorageEvent.ActionSummary actionSummary; + private final Throwable error; + @Override + public String en() { + return String.format("%s failed: %s: %s", + actionSummary.name(), + actionSummary.keys(), + error.getMessage()); + } + @Override + public String enBatch() { + return String.format("%sERROR%s: %s %s: %s%s", + red(), reset(), + actionSummary.name(), + actionSummary.keys(), + error.getMessage(), + eraseToEndOfScreen()); + } + } + +} diff --git a/console/src/main/scala/net/kemitix/thorp/console/Console.scala b/console/src/main/scala/net/kemitix/thorp/console/Console.scala deleted file mode 100644 index 1cf0372..0000000 --- a/console/src/main/scala/net/kemitix/thorp/console/Console.scala +++ /dev/null @@ -1,81 +0,0 @@ -package net.kemitix.thorp.console - -import java.io.PrintStream -import java.util.concurrent.atomic.AtomicReference - -import zio.{UIO, ZIO} - -import scala.{Console => SConsole} - -trait Console { - val console: Console.Service -} - -object Console { - - trait Service { - def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] - def putStrLn(line: String): ZIO[Console, Nothing, Unit] - def putStr(line: String): ZIO[Console, Nothing, Unit] - } - - trait Live extends Console { - val console: Service = new Service { - override def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] = - putStrLn(line.en) - override def putStrLn(line: String): ZIO[Console, Nothing, Unit] = - putStrLnPrintStream(SConsole.out)(line) - override def putStr(line: String): ZIO[Console, Nothing, Unit] = - putStrPrintStream(SConsole.out)(line) - final def putStrLnPrintStream(stream: PrintStream)( - line: String): ZIO[Console, Nothing, Unit] = - UIO(SConsole.withOut(stream)(SConsole.println(line))) - final def putStrPrintStream(stream: PrintStream)( - line: String): ZIO[Console, Nothing, Unit] = - UIO(SConsole.withOut(stream)(SConsole.print(line))) - - } - } - - object Live extends Live - - trait Test extends Console { - - private val output = new AtomicReference(List.empty[String]) - def getOutput: List[String] = output.get - - val console: Service = new Service { - override def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] = - putStrLn(line.en) - - override def putStrLn(line: String): ZIO[Console, Nothing, Unit] = { - val _ = output.accumulateAndGet(List(line), (a, b) => a ++ b) - ZIO.succeed(()) - } - - override def putStr(line: String): ZIO[Console, Nothing, Unit] = { - val _ = output.accumulateAndGet(List(line), (a, b) => a ++ b) - ZIO.succeed(()) - } - } - } - - object Test extends Test - - final val consoleService: ZIO[Console, Nothing, Console.Service] = - ZIO.access(_.console) - - final def putStrLn(line: String): ZIO[Console, Nothing, Unit] = - ZIO.accessM(_.console putStrLn line) - - final def putStr(line: String): ZIO[Console, Nothing, Unit] = - ZIO.accessM(_.console.putStr(line)) - - final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] = - ZIO.accessM(_.console putMessageLn line) - - final def putMessageLnB(line: ConsoleOut.WithBatchMode, - batchMode: Boolean): ZIO[Console, Nothing, Unit] = - ZIO.accessM(line(batchMode) >>= _.console.putStrLn) - -} diff --git a/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala b/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala deleted file mode 100644 index 1fa3d92..0000000 --- a/console/src/main/scala/net/kemitix/thorp/console/ConsoleOut.scala +++ /dev/null @@ -1,70 +0,0 @@ -package net.kemitix.thorp.console - -import scala.jdk.CollectionConverters._ -import net.kemitix.thorp.domain.StorageEvent.ActionSummary -import net.kemitix.thorp.domain.Terminal._ -import net.kemitix.thorp.domain.{Bucket, RemoteKey, Sources} -import zio.UIO - -import scala.io.AnsiColor._ - -sealed trait ConsoleOut { - def en: String -} - -object ConsoleOut { - - sealed trait WithBatchMode { - def en: String - def enBatch: String - def apply(batchMode: Boolean): UIO[String] = - selectLine(batchMode) - private def selectLine(batchMode: Boolean) = - if (batchMode) UIO(enBatch) else UIO(en) - } - - final case class ValidConfig( - bucket: Bucket, - prefix: RemoteKey, - sources: Sources - ) extends ConsoleOut { - private val sourcesList = sources.paths.asScala.mkString(", ") - override def en: String = - List(s"Bucket: ${bucket.name}", - s"Prefix: ${prefix.key}", - s"Source: $sourcesList") - .mkString(", ") - } - - final case class UploadComplete(remoteKey: RemoteKey) - extends ConsoleOut.WithBatchMode { - override def en: String = - s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen" - override def enBatch: String = - s"Uploaded: ${remoteKey.key}" - } - - final case class CopyComplete(sourceKey: RemoteKey, targetKey: RemoteKey) - extends ConsoleOut.WithBatchMode { - override def en: String = - s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen" - override def enBatch: String = - s"Copied: ${sourceKey.key} => ${targetKey.key}" - } - - final case class DeleteComplete(remoteKey: RemoteKey) - extends ConsoleOut.WithBatchMode { - override def en: String = - s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen" - override def enBatch: String = - s"Deleted: ${remoteKey.key}" - } - - final case class ErrorQueueEventOccurred(action: ActionSummary, e: Throwable) - extends ConsoleOut.WithBatchMode { - override def en: String = - s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen" - override def enBatch: String = - s"${action.name} failed: ${action.keys}: ${e.getMessage}" - } -} diff --git a/docs/images/reactor-graph.png b/docs/images/reactor-graph.png index e8642e8..922c497 100644 Binary files a/docs/images/reactor-graph.png and b/docs/images/reactor-graph.png differ diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala index 65d448c..5550c6e 100644 --- a/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala +++ b/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala @@ -2,54 +2,64 @@ package net.kemitix.thorp.lib import net.kemitix.eip.zio.MessageChannel.UChannel import net.kemitix.thorp.config.Configuration -import net.kemitix.thorp.console.ConsoleOut.{ - CopyComplete, - DeleteComplete, - ErrorQueueEventOccurred, - UploadComplete -} import net.kemitix.thorp.console._ import net.kemitix.thorp.domain.StorageEvent import net.kemitix.thorp.domain.StorageEvent._ import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.uishell.UIEvent -import zio.{RIO, ZIO} +import zio.{UIO, ZIO} trait ThorpArchive { - def update( - configuration: Configuration, - uiChannel: UChannel[Any, UIEvent], - sequencedAction: SequencedAction, - totalBytesSoFar: Long - ): ZIO[Storage, Nothing, StorageEvent] + def update(configuration: Configuration, + uiChannel: UChannel[Any, UIEvent], + sequencedAction: SequencedAction, + totalBytesSoFar: Long): ZIO[Storage, Nothing, StorageEvent] def logEvent(configuration: Configuration, - event: StorageEvent): RIO[Console, StorageEvent] = { + event: StorageEvent): UIO[StorageEvent] = { val batchMode = configuration.batchMode for { sqe <- event match { case uploadEvent: UploadEvent => val remoteKey = uploadEvent.remoteKey - ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey), - batchMode) + UIO(event) <* { + Console.putMessageLnB( + ConsoleOut.uploadComplete(remoteKey), + batchMode + ) + UIO.unit + } case copyEvent: CopyEvent => val sourceKey = copyEvent.sourceKey val targetKey = copyEvent.targetKey - ZIO(event) <* Console.putMessageLnB( - CopyComplete(sourceKey, targetKey), - batchMode) + UIO(event) <* { + Console.putMessageLnB( + ConsoleOut.copyComplete(sourceKey, targetKey), + batchMode + ) + UIO.unit + } case deleteEvent: DeleteEvent => val remoteKey = deleteEvent.remoteKey - ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey), - batchMode) + UIO(event) <* { + Console.putMessageLnB( + ConsoleOut.deleteComplete(remoteKey), + batchMode + ) + UIO.unit + } case errorEvent: ErrorEvent => val action = errorEvent.action - val e = errorEvent.e - ZIO(event) <* Console.putMessageLnB( - ErrorQueueEventOccurred(action, e), - batchMode) - case _ => ZIO(event) + val e = errorEvent.e + UIO(event) <* { + Console.putMessageLnB( + ConsoleOut.errorQueueEventOccurred(action, e), + batchMode + ) + UIO.unit + } + case _ => UIO(event) } } yield sqe } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala index 1a6e6b1..66d1223 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala @@ -2,7 +2,6 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.transfer.TransferManagerBuilder -import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage.Service @@ -17,22 +16,20 @@ object S3Storage { AmazonS3Client.create(AmazonS3ClientBuilder.standard().build()) private val transferManager: S3TransferManager = S3TransferManager.create(TransferManagerBuilder.defaultTransferManager) - private val copier = S3Copier.copier(client) + private val copier = S3Copier.copier(client) private val uploader = S3Uploader.uploader(transferManager) - private val deleter = S3Deleter.deleter(client) - private val lister = S3Lister.lister(client) + private val deleter = S3Deleter.deleter(client) + private val lister = S3Lister.lister(client) - override def listObjects( - bucket: Bucket, - prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] = + override def listObjects(bucket: Bucket, + prefix: RemoteKey): RIO[Storage, RemoteObjects] = UIO { lister(S3Lister.request(bucket, prefix)) } - override def upload( - localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings, + override def upload(localFile: LocalFile, + bucket: Bucket, + listenerSettings: UploadEventListener.Settings, ): UIO[StorageEvent] = UIO { uploader(S3Uploader.request(localFile, bucket)) diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala index 93dd1b6..d273b5f 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala @@ -1,6 +1,5 @@ package net.kemitix.thorp.storage.aws -import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.uishell.UploadEventListener @@ -12,51 +11,46 @@ trait AmazonS3ClientTestFixture extends MockFactory { @SuppressWarnings(Array("org.wartremover.warts.PublicInference")) private val manager = stub[S3TransferManager] @SuppressWarnings(Array("org.wartremover.warts.PublicInference")) - private val client = stub[AmazonS3Client] + private val client = stub[AmazonS3Client] val fixture: Fixture = Fixture(client, manager) - case class Fixture( - amazonS3Client: AmazonS3Client, - amazonS3TransferManager: S3TransferManager, + case class Fixture(amazonS3Client: AmazonS3Client, + amazonS3TransferManager: S3TransferManager, ) { lazy val storageService: Storage.Service = new Storage.Service { - private val client = amazonS3Client + private val client = amazonS3Client private val transferManager = amazonS3TransferManager override def listObjects( - bucket: Bucket, - prefix: RemoteKey - ): RIO[Storage with Console, RemoteObjects] = + bucket: Bucket, + prefix: RemoteKey + ): RIO[Storage, RemoteObjects] = UIO { S3Lister.lister(client)(S3Lister.request(bucket, prefix)) } - override def upload( - localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings, + override def upload(localFile: LocalFile, + bucket: Bucket, + listenerSettings: UploadEventListener.Settings, ): UIO[StorageEvent] = UIO( - S3Uploader.uploader(transferManager)( - S3Uploader.request(localFile, bucket))) + S3Uploader + .uploader(transferManager)(S3Uploader.request(localFile, bucket)) + ) - override def copy( - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey - ): UIO[StorageEvent] = + override def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey): UIO[StorageEvent] = UIO { val request = S3Copier.request(bucket, sourceKey, hash, targetKey) S3Copier.copier(client)(request) } - override def delete( - bucket: Bucket, - remoteKey: RemoteKey - ): UIO[StorageEvent] = + override def delete(bucket: Bucket, + remoteKey: RemoteKey): UIO[StorageEvent] = UIO(S3Deleter.deleter(client)(S3Deleter.request(bucket, remoteKey))) override def shutdown: UIO[StorageEvent] = { diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala index 2583b5b..f279b1e 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ListerTest.scala @@ -107,8 +107,8 @@ class ListerTest extends FreeSpec { // } // def invoke(amazonS3Client: AmazonS3Client.Client)(bucket: Bucket, // prefix: RemoteKey) = { -// object TestEnv extends Storage.Test with Console.Test -// val program: RIO[Storage with Console, RemoteObjects] = Lister +// object TestEnv extends Storage.Test +// val program: RIO[Storage, RemoteObjects] = Lister // .listObjects(amazonS3Client)(bucket, prefix) // val runtime = new DefaultRuntime {} // runtime.unsafeRunSync(program.provide(TestEnv)).toEither diff --git a/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala b/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala index fd1ac36..4898de4 100644 --- a/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala +++ b/storage/src/main/scala/net/kemitix/thorp/storage/Storage.scala @@ -1,6 +1,5 @@ package net.kemitix.thorp.storage -import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain._ import net.kemitix.thorp.uishell.UploadEventListener import zio.{RIO, Task, UIO, ZIO} @@ -12,28 +11,20 @@ trait Storage { object Storage { trait Service { - def listObjects( - bucket: Bucket, - prefix: RemoteKey - ): RIO[Storage with Console, RemoteObjects] + def listObjects(bucket: Bucket, + prefix: RemoteKey): RIO[Storage, RemoteObjects] - def upload( - localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings, + def upload(localFile: LocalFile, + bucket: Bucket, + listenerSettings: UploadEventListener.Settings, ): ZIO[Storage, Nothing, StorageEvent] - def copy( - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey - ): ZIO[Storage, Nothing, StorageEvent] + def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] - def delete( - bucket: Bucket, - remoteKey: RemoteKey - ): UIO[StorageEvent] + def delete(bucket: Bucket, remoteKey: RemoteKey): UIO[StorageEvent] def shutdown: UIO[StorageEvent] } @@ -58,17 +49,18 @@ object Storage { listResult override def upload( - localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings + localFile: LocalFile, + bucket: Bucket, + listenerSettings: UploadEventListener.Settings ): ZIO[Storage, Nothing, StorageEvent] = uploadResult override def copy( - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] = + bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey + ): ZIO[Storage, Nothing, StorageEvent] = copyResult override def delete(bucket: Bucket, @@ -84,28 +76,24 @@ object Storage { object Test extends Test final def list(bucket: Bucket, - prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] = + prefix: RemoteKey): RIO[Storage, RemoteObjects] = ZIO.accessM(_.storage listObjects (bucket, prefix)) final def upload( - localFile: LocalFile, - bucket: Bucket, - listenerSettings: UploadEventListener.Settings + localFile: LocalFile, + bucket: Bucket, + listenerSettings: UploadEventListener.Settings ): ZIO[Storage, Nothing, StorageEvent] = ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings)) - final def copy( - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey - ): ZIO[Storage, Nothing, StorageEvent] = + final def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] = ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey)) - final def delete( - bucket: Bucket, - remoteKey: RemoteKey - ): ZIO[Storage, Nothing, StorageEvent] = + final def delete(bucket: Bucket, + remoteKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] = ZIO.accessM(_.storage delete (bucket, remoteKey)) } diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressUI.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressUI.scala index a8c0282..af2c5be 100644 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressUI.scala +++ b/uishell/src/main/scala/net/kemitix/thorp/uishell/ProgressUI.scala @@ -24,52 +24,57 @@ object ProgressUI { localFile: LocalFile, bytesTransferred: Long, index: Int, - totalBytesSoFar: Long): ZIO[Console, Nothing, Unit] = + totalBytesSoFar: Long): UIO[Unit] = for { _ <- ZIO.when(bytesTransferred < localFile.file.length())( - stillUploading(localFile.remoteKey, - localFile.file.length(), - bytesTransferred)) + stillUploading( + localFile.remoteKey, + localFile.file.length(), + bytesTransferred + ) + ) _ <- ZIO.when(bytesTransferred >= localFile.file.length()) { finishedUploading(localFile.remoteKey) } } yield () - private def stillUploading( - remoteKey: RemoteKey, - fileLength: Long, - bytesTransferred: Long - ): ZIO[Console, Nothing, Unit] = { + private def stillUploading(remoteKey: RemoteKey, + fileLength: Long, + bytesTransferred: Long): UIO[Unit] = { val current: Map[RemoteKey, UploadState] = - uploads.updateAndGet((m: Map[RemoteKey, UploadState]) => - m.updated(remoteKey, UploadState(bytesTransferred, fileLength))) + uploads.updateAndGet( + (m: Map[RemoteKey, UploadState]) => + m.updated(remoteKey, UploadState(bytesTransferred, fileLength)) + ) val resetCursor = s"${Terminal.cursorPrevLine(statusHeight) * current.size}" ZIO.foreach(current) { entry => { val (remoteKey, state) = entry - val percent = f"${(state.transferred * 100) / state.fileLength}%2d" + val percent = f"${(state.transferred * 100) / state.fileLength}%2d" val transferred = sizeInEnglish(state.transferred) - val fileLength = sizeInEnglish(state.fileLength) + val fileLength = sizeInEnglish(state.fileLength) val line1 = s"${GREEN}Uploading:$RESET ${remoteKey.key}$eraseLineForward" val line2body = s"($percent%) $transferred of $fileLength " val bar = - progressBar(state.transferred.toDouble, - state.fileLength.toDouble, - Terminal.width - line2body.length) + progressBar( + state.transferred.toDouble, + state.fileLength.toDouble, + Terminal.width - line2body.length + ) val line2 = s"$GREEN$line2body$RESET$bar$eraseLineForward" - Console.putStrLn(line1) *> - Console.putStrLn(line2) + UIO(Console.putStrLn(line1)) *> + UIO(Console.putStrLn(line2)) } - } *> Console.putStr(resetCursor) + } *> UIO(Console.putStr(resetCursor)) } - def finishedUploading( - remoteKey: RemoteKey - ): ZIO[Any, Nothing, Unit] = { - UIO(uploads.updateAndGet((m: Map[RemoteKey, UploadState]) => - m.removed(remoteKey))) *> UIO.unit + def finishedUploading(remoteKey: RemoteKey): ZIO[Any, Nothing, Unit] = { + UIO( + uploads + .updateAndGet((m: Map[RemoteKey, UploadState]) => m.removed(remoteKey)) + ) *> UIO.unit } } diff --git a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala index b678d80..0e70a57 100644 --- a/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala +++ b/uishell/src/main/scala/net/kemitix/thorp/uishell/UIShell.scala @@ -2,12 +2,6 @@ package net.kemitix.thorp.uishell import net.kemitix.eip.zio.MessageChannel import net.kemitix.thorp.config.Configuration -import net.kemitix.thorp.console.ConsoleOut.{ - CopyComplete, - DeleteComplete, - ErrorQueueEventOccurred, - UploadComplete -} import net.kemitix.thorp.console.{Console, ConsoleOut} import net.kemitix.thorp.domain.Terminal.{eraseLineForward, eraseToEndOfScreen} import net.kemitix.thorp.domain._ @@ -15,8 +9,9 @@ import zio.{UIO, ZIO} object UIShell { - def receiver(configuration: Configuration) - : UIO[MessageChannel.UReceiver[Console, UIEvent]] = + def receiver( + configuration: Configuration + ): UIO[MessageChannel.UReceiver[Any, UIEvent]] = UIO { uiEventMessage => uiEventMessage.body match { case UIEvent.ShowValidConfig => showValidConfig(configuration) @@ -31,80 +26,103 @@ object UIShell { case UIEvent.ActionFinished(_, _, _, event) => actionFinished(configuration, event) case UIEvent.KeyFound(_) => UIO(()) - case UIEvent.RequestCycle(localFile, - bytesTransferred, - index, - totalBytesSoFar) => - ProgressUI.requestCycle(configuration, - localFile, - bytesTransferred, - index, - totalBytesSoFar) + case UIEvent.RequestCycle( + localFile, + bytesTransferred, + index, + totalBytesSoFar + ) => + ProgressUI.requestCycle( + configuration, + localFile, + bytesTransferred, + index, + totalBytesSoFar + ) } } - private def actionFinished( - configuration: Configuration, - event: StorageEvent): ZIO[Console, Nothing, Unit] = { + private def actionFinished(configuration: Configuration, + event: StorageEvent): UIO[Unit] = { val batchMode = configuration.batchMode for { _ <- event match { case _: StorageEvent.DoNothingEvent => UIO.unit - case copyEvent: StorageEvent.CopyEvent => { + case copyEvent: StorageEvent.CopyEvent => val sourceKey = copyEvent.sourceKey val targetKey = copyEvent.targetKey - Console.putMessageLnB(CopyComplete(sourceKey, targetKey), batchMode) - } - case uploadEvent: StorageEvent.UploadEvent => { + Console.putMessageLnB( + ConsoleOut.copyComplete(sourceKey, targetKey), + batchMode + ) + UIO.unit + case uploadEvent: StorageEvent.UploadEvent => val remoteKey = uploadEvent.remoteKey - ProgressUI.finishedUploading(remoteKey) *> - Console.putMessageLnB(UploadComplete(remoteKey), batchMode) - } - case deleteEvent: StorageEvent.DeleteEvent => { + Console + .putMessageLnB(ConsoleOut.uploadComplete(remoteKey), batchMode) + ProgressUI.finishedUploading(remoteKey) + case deleteEvent: StorageEvent.DeleteEvent => val remoteKey = deleteEvent.remoteKey - Console.putMessageLnB(DeleteComplete(remoteKey), batchMode) - } - case errorEvent: StorageEvent.ErrorEvent => { + Console.putMessageLnB(ConsoleOut.deleteComplete(remoteKey), batchMode) + UIO.unit + case errorEvent: StorageEvent.ErrorEvent => val remoteKey = errorEvent.remoteKey - val action = errorEvent.action - val e = errorEvent.e + val action = errorEvent.action + val e = errorEvent.e ProgressUI.finishedUploading(remoteKey) *> - Console.putMessageLnB(ErrorQueueEventOccurred(action, e), batchMode) - } + UIO( + Console.putMessageLnB( + ConsoleOut.errorQueueEventOccurred(action, e), + batchMode + ) + ) case _: StorageEvent.ShutdownEvent => UIO.unit } } yield () } - private def uploadWaitComplete(action: Action): ZIO[Console, Nothing, Unit] = + private def uploadWaitComplete(action: Action): UIO[Unit] = { Console.putStrLn(s"Finished waiting to other upload - now $action") + UIO.unit + } - private def awaitingUpload(remoteKey: RemoteKey, - hash: MD5Hash): ZIO[Console, Nothing, Unit] = + private def awaitingUpload(remoteKey: RemoteKey, hash: MD5Hash): UIO[Unit] = { Console.putStrLn( - s"Awaiting another upload of $hash before copying it to $remoteKey") - + s"Awaiting another upload of $hash before copying it to $remoteKey" + ) + UIO.unit + } private def fileFound(configuration: Configuration, - localFile: LocalFile): ZIO[Console, Nothing, Unit] = - ZIO.when(configuration.batchMode)( - Console.putStrLn(s"Found: ${localFile.file}")) + localFile: LocalFile): UIO[Unit] = + ZIO.when(configuration.batchMode) { + Console.putStrLn(s"Found: ${localFile.file}") + UIO.unit + } - private def showSummary(counters: Counters): ZIO[Console, Nothing, Unit] = - Console.putStrLn(eraseToEndOfScreen) *> - Console.putStrLn(s"Uploaded ${counters.uploaded} files") *> - Console.putStrLn(s"Copied ${counters.copied} files") *> - Console.putStrLn(s"Deleted ${counters.deleted} files") *> - Console.putStrLn(s"Errors ${counters.errors}") + private def showSummary(counters: Counters): UIO[Unit] = { + Console.putStrLn(eraseToEndOfScreen) + Console.putStrLn(s"Uploaded ${counters.uploaded} files") + Console.putStrLn(s"Copied ${counters.copied} files") + Console.putStrLn(s"Deleted ${counters.deleted} files") + Console.putStrLn(s"Errors ${counters.errors}") + UIO.unit + } - private def remoteDataFetched(size: Int): ZIO[Console, Nothing, Unit] = + private def remoteDataFetched(size: Int): UIO[Unit] = { Console.putStrLn(s"Found $size remote objects") + UIO.unit + } - private def showValidConfig( - configuration: Configuration): ZIO[Console, Nothing, Unit] = + private def showValidConfig(configuration: Configuration): UIO[Unit] = { Console.putMessageLn( - ConsoleOut.ValidConfig(configuration.bucket, - configuration.prefix, - configuration.sources)) + ConsoleOut.validConfig( + configuration.bucket, + configuration.prefix, + configuration.sources + ) + ) + UIO.unit + } def trimHead(str: String): String = { val width = Terminal.width @@ -114,14 +132,11 @@ object UIShell { } } - def actionChosen(configuration: Configuration, - action: Action): ZIO[Console, Nothing, Unit] = { + def actionChosen(configuration: Configuration, action: Action): UIO[Unit] = { val message = trimHead(action.asString()) + eraseLineForward - val batch = configuration.batchMode - for { - _ <- ZIO.when(!batch) { Console.putStr(message + "\r") } - _ <- ZIO.when(batch) { Console.putStrLn(message) } - } yield () + if (configuration.batchMode) Console.putStr(message + "\r") + else Console.putStrLn(message) + UIO.unit } }