Convert console module to Java (#471)

* console.ConsoleOut: convert to Java

* console.Console: convert to Java

* console: remove scala dependencies and plugins

* console.ConsoleOut: remove lingering scala import
This commit is contained in:
Paul Campbell 2020-06-21 17:46:14 +01:00 committed by GitHub
parent c793d833ae
commit 53eaeeb75f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 418 additions and 402 deletions

View file

@ -1,6 +1,5 @@
package net.kemitix.thorp package net.kemitix.thorp
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.lib.FileScanner import net.kemitix.thorp.lib.FileScanner
import net.kemitix.thorp.storage.aws.S3Storage import net.kemitix.thorp.storage.aws.S3Storage
import zio.clock.Clock import zio.clock.Clock
@ -10,7 +9,6 @@ object Main extends App {
object LiveThorpApp object LiveThorpApp
extends S3Storage.Live extends S3Storage.Live
with Console.Live
with Clock.Live with Clock.Live
with FileScanner.Live with FileScanner.Live

View file

@ -23,21 +23,24 @@ import scala.jdk.CollectionConverters._
trait Program { trait Program {
val version = "0.11.0" val version = "0.11.0"
lazy val versionLabel = s"${WHITE}Thorp v$version$RESET" lazy val versionLabel = s"${WHITE}Thorp v$version$RESET"
def run(args: List[String]) def run(
: ZIO[Storage with Console with Clock with FileScanner, Nothing, Unit] = { args: List[String]
): ZIO[Storage with Clock with FileScanner, Nothing, Unit] = {
(for { (for {
cli <- CliArgs.parse(args) cli <- CliArgs.parse(args)
config <- IO(ConfigurationBuilder.buildConfig(cli)) config <- IO(ConfigurationBuilder.buildConfig(cli))
_ <- Console.putStrLn(versionLabel) _ <- UIO(Console.putStrLn(versionLabel))
_ <- ZIO.when(!showVersion(cli))( _ <- ZIO.when(!showVersion(cli))(
executeWithUI(config).catchAll(handleErrors)) executeWithUI(config).catchAll(handleErrors)
)
} yield ()) } yield ())
.catchAll(e => { .catchAll(e => {
Console.putStrLn("An ERROR occurred:") Console.putStrLn("An ERROR occurred:")
Console.putStrLn(e.getMessage) Console.putStrLn(e.getMessage)
UIO.unit
}) })
} }
@ -45,33 +48,30 @@ trait Program {
private def showVersion: ConfigOptions => Boolean = private def showVersion: ConfigOptions => Boolean =
cli => ConfigQuery.showVersion(cli) cli => ConfigQuery.showVersion(cli)
private def executeWithUI(configuration: Configuration) = private def executeWithUI(
configuration: Configuration
): ZIO[Storage with Clock with FileScanner, Throwable, Unit] =
for { for {
uiEventSender <- execute(configuration) uiEventSender <- execute(configuration)
uiEventReceiver <- UIShell.receiver(configuration) uiEventReceiver <- UIShell.receiver(configuration)
_ <- MessageChannel.pointToPoint(uiEventSender)(uiEventReceiver).runDrain _ <- MessageChannel.pointToPoint(uiEventSender)(uiEventReceiver).runDrain
} yield () } yield ()
type UIChannel = UChannel[Any, UIEvent] type UIChannel = UChannel[Any, UIEvent]
private def execute(configuration: Configuration): ZIO[ private def execute(
Any, configuration: Configuration
Nothing, ): UIO[MessageChannel.ESender[Storage with Clock with FileScanner,
MessageChannel.ESender[Storage with Clock with FileScanner with Console, Throwable,
Throwable, UIEvent]] = UIO { uiChannel =>
UIEvent]] = UIO { uiChannel =>
(for { (for {
_ <- showValidConfig(uiChannel) _ <- showValidConfig(uiChannel)
remoteData <- fetchRemoteData(configuration, uiChannel) remoteData <- fetchRemoteData(configuration, uiChannel)
archive <- UIO(UnversionedMirrorArchive) archive <- UIO(UnversionedMirrorArchive)
copyUploadEvents <- LocalFileSystem.scanCopyUpload(configuration, copyUploadEvents <- LocalFileSystem
uiChannel, .scanCopyUpload(configuration, uiChannel, remoteData, archive)
remoteData, deleteEvents <- LocalFileSystem
archive) .scanDelete(configuration, uiChannel, remoteData, archive)
deleteEvents <- LocalFileSystem.scanDelete(configuration,
uiChannel,
remoteData,
archive)
_ <- showSummary(uiChannel)(copyUploadEvents ++ deleteEvents) _ <- showSummary(uiChannel)(copyUploadEvents ++ deleteEvents)
} yield ()) <* MessageChannel.endChannel(uiChannel) } yield ()) <* MessageChannel.endChannel(uiChannel)
} }
@ -79,9 +79,10 @@ trait Program {
private def showValidConfig(uiChannel: UIChannel) = private def showValidConfig(uiChannel: UIChannel) =
Message.create(UIEvent.ShowValidConfig) >>= MessageChannel.send(uiChannel) Message.create(UIEvent.ShowValidConfig) >>= MessageChannel.send(uiChannel)
private def fetchRemoteData(configuration: Configuration, private def fetchRemoteData(
uiChannel: UIChannel) configuration: Configuration,
: ZIO[Clock with Storage with Console, Throwable, RemoteObjects] = { uiChannel: UIChannel
): ZIO[Clock with Storage, Throwable, RemoteObjects] = {
val bucket = configuration.bucket val bucket = configuration.bucket
val prefix = configuration.prefix val prefix = configuration.prefix
for { for {
@ -92,17 +93,21 @@ trait Program {
} }
private def handleErrors(throwable: Throwable) = private def handleErrors(throwable: Throwable) =
Console.putStrLn("There were errors:") *> logValidationErrors(throwable) UIO(Console.putStrLn("There were errors:")) *> logValidationErrors(
throwable
)
private def logValidationErrors(throwable: Throwable) = private def logValidationErrors(throwable: Throwable) =
throwable match { throwable match {
case validateError: ConfigValidationException => case validateError: ConfigValidationException =>
ZIO.foreach_(validateError.getErrors.asScala)(error => ZIO.foreach_(validateError.getErrors.asScala)(
Console.putStrLn(s"- $error")) error => UIO(Console.putStrLn(s"- $error"))
)
} }
private def showSummary(uiChannel: UIChannel)( private def showSummary(
events: Seq[StorageEvent]): RIO[Clock, Unit] = { uiChannel: UIChannel
)(events: Seq[StorageEvent]): RIO[Clock, Unit] = {
val counters = events.foldLeft(Counters.empty)(countActivities) val counters = events.foldLeft(Counters.empty)(countActivities)
Message.create(UIEvent.ShowSummary(counters)) >>= Message.create(UIEvent.ShowSummary(counters)) >>=
MessageChannel.send(uiChannel) MessageChannel.send(uiChannel)

View file

@ -12,36 +12,17 @@
<name>console</name> <name>console</name>
<dependencies> <dependencies>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- thorp --> <!-- thorp -->
<dependency> <dependency>
<groupId>net.kemitix.thorp</groupId> <groupId>net.kemitix.thorp</groupId>
<artifactId>thorp-domain</artifactId> <artifactId>thorp-domain</artifactId>
</dependency> </dependency>
<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- zio -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio_2.13</artifactId>
</dependency>
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_2.13</artifactId>
</dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project> </project>

View file

@ -0,0 +1,25 @@
package net.kemitix.thorp.console;
public interface Console {
static void putStrLn(String line) {
System.out.println(line);
}
static void putStr(String line) {
System.out.print(line);
}
static void putMessageLnB(
ConsoleOut.WithBatchMode message,
boolean batchMode
) {
putStrLn(
batchMode
? message.enBatch()
: message.en());
}
static void putMessageLn(
ConsoleOut message
) {
putStrLn(message.en());
}
}

View file

@ -0,0 +1,149 @@
package net.kemitix.thorp.console;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import net.kemitix.thorp.domain.*;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;
public interface ConsoleOut {
String en();
default String eraseToEndOfScreen() {
return Terminal.eraseToEndOfScreen;
}
default String reset() {
return "\u001B[0m";
}
default String red() {
return "\u001B[31m";
}
default String green() {
return "\u001B[32m";
}
interface WithBatchMode extends ConsoleOut, Function<Boolean, String> {
String enBatch();
default String selectLine(boolean batchMode) {
return batchMode
? enBatch()
: en();
}
@Override
default String apply(Boolean batchMode) {
return selectLine(batchMode);
}
}
static ConsoleOut validConfig(
Bucket bucket,
RemoteKey prefix,
Sources sources
) {
return new ValidConfig(bucket, prefix, sources);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class ValidConfig implements ConsoleOut {
private final Bucket bucket;
private final RemoteKey prefix;
private final Sources sources;
@Override
public String en() {
return String.join(", ", Arrays.asList(
"Bucket: " + bucket.name(),
"Prefix: " + prefix.key(),
"Source: " + sources.paths().stream()
.map(Path::toString)
.collect(Collectors.joining(", "))));
}
}
static ConsoleOut.WithBatchMode uploadComplete(RemoteKey remoteKey) {
return new UploadComplete(remoteKey);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class UploadComplete implements ConsoleOut.WithBatchMode {
private final RemoteKey remoteKey;
@Override
public String en() {
return String.format("%sUploaded:%s %s%s",
green(), reset(),
remoteKey.key(),
eraseToEndOfScreen());
}
@Override
public String enBatch() {
return String.format("Uploaded: %s", remoteKey.key());
}
}
static ConsoleOut.WithBatchMode copyComplete(RemoteKey sourceKey, RemoteKey targetKey) {
return new CopyComplete(sourceKey, targetKey);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class CopyComplete implements ConsoleOut.WithBatchMode {
private final RemoteKey sourceKey;
private final RemoteKey targetKey;
@Override
public String en() {
return String.format("%sCopied:%s %s => %s%s",
green(), reset(),
sourceKey.key(),
targetKey.key(),
eraseToEndOfScreen());
}
@Override
public String enBatch() {
return String.format("Copied: %s => %s",
sourceKey.key(), targetKey.key());
}
}
static ConsoleOut.WithBatchMode deleteComplete(RemoteKey remoteKey) {
return new DeleteComplete(remoteKey);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class DeleteComplete implements WithBatchMode {
private final RemoteKey remoteKey;
@Override
public String en() {
return String.format("Deleted: %s", remoteKey);
}
@Override
public String enBatch() {
return String.format("%sDeleted%s: %s%s",
green(), reset(),
remoteKey,
eraseToEndOfScreen());
}
}
static ConsoleOut.WithBatchMode errorQueueEventOccurred(
StorageEvent.ActionSummary actionSummary,
Throwable error
) {
return new ErrorQueueEventOccurred(actionSummary, error);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class ErrorQueueEventOccurred implements WithBatchMode {
private final StorageEvent.ActionSummary actionSummary;
private final Throwable error;
@Override
public String en() {
return String.format("%s failed: %s: %s",
actionSummary.name(),
actionSummary.keys(),
error.getMessage());
}
@Override
public String enBatch() {
return String.format("%sERROR%s: %s %s: %s%s",
red(), reset(),
actionSummary.name(),
actionSummary.keys(),
error.getMessage(),
eraseToEndOfScreen());
}
}
}

View file

@ -1,81 +0,0 @@
package net.kemitix.thorp.console
import java.io.PrintStream
import java.util.concurrent.atomic.AtomicReference
import zio.{UIO, ZIO}
import scala.{Console => SConsole}
trait Console {
val console: Console.Service
}
object Console {
trait Service {
def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit]
def putStrLn(line: String): ZIO[Console, Nothing, Unit]
def putStr(line: String): ZIO[Console, Nothing, Unit]
}
trait Live extends Console {
val console: Service = new Service {
override def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
putStrLn(line.en)
override def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
putStrLnPrintStream(SConsole.out)(line)
override def putStr(line: String): ZIO[Console, Nothing, Unit] =
putStrPrintStream(SConsole.out)(line)
final def putStrLnPrintStream(stream: PrintStream)(
line: String): ZIO[Console, Nothing, Unit] =
UIO(SConsole.withOut(stream)(SConsole.println(line)))
final def putStrPrintStream(stream: PrintStream)(
line: String): ZIO[Console, Nothing, Unit] =
UIO(SConsole.withOut(stream)(SConsole.print(line)))
}
}
object Live extends Live
trait Test extends Console {
private val output = new AtomicReference(List.empty[String])
def getOutput: List[String] = output.get
val console: Service = new Service {
override def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
putStrLn(line.en)
override def putStrLn(line: String): ZIO[Console, Nothing, Unit] = {
val _ = output.accumulateAndGet(List(line), (a, b) => a ++ b)
ZIO.succeed(())
}
override def putStr(line: String): ZIO[Console, Nothing, Unit] = {
val _ = output.accumulateAndGet(List(line), (a, b) => a ++ b)
ZIO.succeed(())
}
}
}
object Test extends Test
final val consoleService: ZIO[Console, Nothing, Console.Service] =
ZIO.access(_.console)
final def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
ZIO.accessM(_.console putStrLn line)
final def putStr(line: String): ZIO[Console, Nothing, Unit] =
ZIO.accessM(_.console.putStr(line))
final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
ZIO.accessM(_.console putMessageLn line)
final def putMessageLnB(line: ConsoleOut.WithBatchMode,
batchMode: Boolean): ZIO[Console, Nothing, Unit] =
ZIO.accessM(line(batchMode) >>= _.console.putStrLn)
}

View file

@ -1,70 +0,0 @@
package net.kemitix.thorp.console
import scala.jdk.CollectionConverters._
import net.kemitix.thorp.domain.StorageEvent.ActionSummary
import net.kemitix.thorp.domain.Terminal._
import net.kemitix.thorp.domain.{Bucket, RemoteKey, Sources}
import zio.UIO
import scala.io.AnsiColor._
sealed trait ConsoleOut {
def en: String
}
object ConsoleOut {
sealed trait WithBatchMode {
def en: String
def enBatch: String
def apply(batchMode: Boolean): UIO[String] =
selectLine(batchMode)
private def selectLine(batchMode: Boolean) =
if (batchMode) UIO(enBatch) else UIO(en)
}
final case class ValidConfig(
bucket: Bucket,
prefix: RemoteKey,
sources: Sources
) extends ConsoleOut {
private val sourcesList = sources.paths.asScala.mkString(", ")
override def en: String =
List(s"Bucket: ${bucket.name}",
s"Prefix: ${prefix.key}",
s"Source: $sourcesList")
.mkString(", ")
}
final case class UploadComplete(remoteKey: RemoteKey)
extends ConsoleOut.WithBatchMode {
override def en: String =
s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen"
override def enBatch: String =
s"Uploaded: ${remoteKey.key}"
}
final case class CopyComplete(sourceKey: RemoteKey, targetKey: RemoteKey)
extends ConsoleOut.WithBatchMode {
override def en: String =
s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen"
override def enBatch: String =
s"Copied: ${sourceKey.key} => ${targetKey.key}"
}
final case class DeleteComplete(remoteKey: RemoteKey)
extends ConsoleOut.WithBatchMode {
override def en: String =
s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen"
override def enBatch: String =
s"Deleted: ${remoteKey.key}"
}
final case class ErrorQueueEventOccurred(action: ActionSummary, e: Throwable)
extends ConsoleOut.WithBatchMode {
override def en: String =
s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen"
override def enBatch: String =
s"${action.name} failed: ${action.keys}: ${e.getMessage}"
}
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 287 KiB

After

Width:  |  Height:  |  Size: 279 KiB

View file

@ -2,54 +2,64 @@ package net.kemitix.thorp.lib
import net.kemitix.eip.zio.MessageChannel.UChannel import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.config.Configuration import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.console.ConsoleOut.{
CopyComplete,
DeleteComplete,
ErrorQueueEventOccurred,
UploadComplete
}
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageEvent import net.kemitix.thorp.domain.StorageEvent
import net.kemitix.thorp.domain.StorageEvent._ import net.kemitix.thorp.domain.StorageEvent._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.UIEvent import net.kemitix.thorp.uishell.UIEvent
import zio.{RIO, ZIO} import zio.{UIO, ZIO}
trait ThorpArchive { trait ThorpArchive {
def update( def update(configuration: Configuration,
configuration: Configuration, uiChannel: UChannel[Any, UIEvent],
uiChannel: UChannel[Any, UIEvent], sequencedAction: SequencedAction,
sequencedAction: SequencedAction, totalBytesSoFar: Long): ZIO[Storage, Nothing, StorageEvent]
totalBytesSoFar: Long
): ZIO[Storage, Nothing, StorageEvent]
def logEvent(configuration: Configuration, def logEvent(configuration: Configuration,
event: StorageEvent): RIO[Console, StorageEvent] = { event: StorageEvent): UIO[StorageEvent] = {
val batchMode = configuration.batchMode val batchMode = configuration.batchMode
for { for {
sqe <- event match { sqe <- event match {
case uploadEvent: UploadEvent => case uploadEvent: UploadEvent =>
val remoteKey = uploadEvent.remoteKey val remoteKey = uploadEvent.remoteKey
ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey), UIO(event) <* {
batchMode) Console.putMessageLnB(
ConsoleOut.uploadComplete(remoteKey),
batchMode
)
UIO.unit
}
case copyEvent: CopyEvent => case copyEvent: CopyEvent =>
val sourceKey = copyEvent.sourceKey val sourceKey = copyEvent.sourceKey
val targetKey = copyEvent.targetKey val targetKey = copyEvent.targetKey
ZIO(event) <* Console.putMessageLnB( UIO(event) <* {
CopyComplete(sourceKey, targetKey), Console.putMessageLnB(
batchMode) ConsoleOut.copyComplete(sourceKey, targetKey),
batchMode
)
UIO.unit
}
case deleteEvent: DeleteEvent => case deleteEvent: DeleteEvent =>
val remoteKey = deleteEvent.remoteKey val remoteKey = deleteEvent.remoteKey
ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey), UIO(event) <* {
batchMode) Console.putMessageLnB(
ConsoleOut.deleteComplete(remoteKey),
batchMode
)
UIO.unit
}
case errorEvent: ErrorEvent => case errorEvent: ErrorEvent =>
val action = errorEvent.action val action = errorEvent.action
val e = errorEvent.e val e = errorEvent.e
ZIO(event) <* Console.putMessageLnB( UIO(event) <* {
ErrorQueueEventOccurred(action, e), Console.putMessageLnB(
batchMode) ConsoleOut.errorQueueEventOccurred(action, e),
case _ => ZIO(event) batchMode
)
UIO.unit
}
case _ => UIO(event)
} }
} yield sqe } yield sqe
} }

View file

@ -2,7 +2,6 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.transfer.TransferManagerBuilder import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.storage.Storage.Service import net.kemitix.thorp.storage.Storage.Service
@ -17,22 +16,20 @@ object S3Storage {
AmazonS3Client.create(AmazonS3ClientBuilder.standard().build()) AmazonS3Client.create(AmazonS3ClientBuilder.standard().build())
private val transferManager: S3TransferManager = private val transferManager: S3TransferManager =
S3TransferManager.create(TransferManagerBuilder.defaultTransferManager) S3TransferManager.create(TransferManagerBuilder.defaultTransferManager)
private val copier = S3Copier.copier(client) private val copier = S3Copier.copier(client)
private val uploader = S3Uploader.uploader(transferManager) private val uploader = S3Uploader.uploader(transferManager)
private val deleter = S3Deleter.deleter(client) private val deleter = S3Deleter.deleter(client)
private val lister = S3Lister.lister(client) private val lister = S3Lister.lister(client)
override def listObjects( override def listObjects(bucket: Bucket,
bucket: Bucket, prefix: RemoteKey): RIO[Storage, RemoteObjects] =
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =
UIO { UIO {
lister(S3Lister.request(bucket, prefix)) lister(S3Lister.request(bucket, prefix))
} }
override def upload( override def upload(localFile: LocalFile,
localFile: LocalFile, bucket: Bucket,
bucket: Bucket, listenerSettings: UploadEventListener.Settings,
listenerSettings: UploadEventListener.Settings,
): UIO[StorageEvent] = ): UIO[StorageEvent] =
UIO { UIO {
uploader(S3Uploader.request(localFile, bucket)) uploader(S3Uploader.request(localFile, bucket))

View file

@ -1,6 +1,5 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.UploadEventListener import net.kemitix.thorp.uishell.UploadEventListener
@ -12,51 +11,46 @@ trait AmazonS3ClientTestFixture extends MockFactory {
@SuppressWarnings(Array("org.wartremover.warts.PublicInference")) @SuppressWarnings(Array("org.wartremover.warts.PublicInference"))
private val manager = stub[S3TransferManager] private val manager = stub[S3TransferManager]
@SuppressWarnings(Array("org.wartremover.warts.PublicInference")) @SuppressWarnings(Array("org.wartremover.warts.PublicInference"))
private val client = stub[AmazonS3Client] private val client = stub[AmazonS3Client]
val fixture: Fixture = Fixture(client, manager) val fixture: Fixture = Fixture(client, manager)
case class Fixture( case class Fixture(amazonS3Client: AmazonS3Client,
amazonS3Client: AmazonS3Client, amazonS3TransferManager: S3TransferManager,
amazonS3TransferManager: S3TransferManager,
) { ) {
lazy val storageService: Storage.Service = lazy val storageService: Storage.Service =
new Storage.Service { new Storage.Service {
private val client = amazonS3Client private val client = amazonS3Client
private val transferManager = amazonS3TransferManager private val transferManager = amazonS3TransferManager
override def listObjects( override def listObjects(
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey prefix: RemoteKey
): RIO[Storage with Console, RemoteObjects] = ): RIO[Storage, RemoteObjects] =
UIO { UIO {
S3Lister.lister(client)(S3Lister.request(bucket, prefix)) S3Lister.lister(client)(S3Lister.request(bucket, prefix))
} }
override def upload( override def upload(localFile: LocalFile,
localFile: LocalFile, bucket: Bucket,
bucket: Bucket, listenerSettings: UploadEventListener.Settings,
listenerSettings: UploadEventListener.Settings,
): UIO[StorageEvent] = ): UIO[StorageEvent] =
UIO( UIO(
S3Uploader.uploader(transferManager)( S3Uploader
S3Uploader.request(localFile, bucket))) .uploader(transferManager)(S3Uploader.request(localFile, bucket))
)
override def copy( override def copy(bucket: Bucket,
bucket: Bucket, sourceKey: RemoteKey,
sourceKey: RemoteKey, hash: MD5Hash,
hash: MD5Hash, targetKey: RemoteKey): UIO[StorageEvent] =
targetKey: RemoteKey
): UIO[StorageEvent] =
UIO { UIO {
val request = S3Copier.request(bucket, sourceKey, hash, targetKey) val request = S3Copier.request(bucket, sourceKey, hash, targetKey)
S3Copier.copier(client)(request) S3Copier.copier(client)(request)
} }
override def delete( override def delete(bucket: Bucket,
bucket: Bucket, remoteKey: RemoteKey): UIO[StorageEvent] =
remoteKey: RemoteKey
): UIO[StorageEvent] =
UIO(S3Deleter.deleter(client)(S3Deleter.request(bucket, remoteKey))) UIO(S3Deleter.deleter(client)(S3Deleter.request(bucket, remoteKey)))
override def shutdown: UIO[StorageEvent] = { override def shutdown: UIO[StorageEvent] = {

View file

@ -107,8 +107,8 @@ class ListerTest extends FreeSpec {
// } // }
// def invoke(amazonS3Client: AmazonS3Client.Client)(bucket: Bucket, // def invoke(amazonS3Client: AmazonS3Client.Client)(bucket: Bucket,
// prefix: RemoteKey) = { // prefix: RemoteKey) = {
// object TestEnv extends Storage.Test with Console.Test // object TestEnv extends Storage.Test
// val program: RIO[Storage with Console, RemoteObjects] = Lister // val program: RIO[Storage, RemoteObjects] = Lister
// .listObjects(amazonS3Client)(bucket, prefix) // .listObjects(amazonS3Client)(bucket, prefix)
// val runtime = new DefaultRuntime {} // val runtime = new DefaultRuntime {}
// runtime.unsafeRunSync(program.provide(TestEnv)).toEither // runtime.unsafeRunSync(program.provide(TestEnv)).toEither

View file

@ -1,6 +1,5 @@
package net.kemitix.thorp.storage package net.kemitix.thorp.storage
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.uishell.UploadEventListener import net.kemitix.thorp.uishell.UploadEventListener
import zio.{RIO, Task, UIO, ZIO} import zio.{RIO, Task, UIO, ZIO}
@ -12,28 +11,20 @@ trait Storage {
object Storage { object Storage {
trait Service { trait Service {
def listObjects( def listObjects(bucket: Bucket,
bucket: Bucket, prefix: RemoteKey): RIO[Storage, RemoteObjects]
prefix: RemoteKey
): RIO[Storage with Console, RemoteObjects]
def upload( def upload(localFile: LocalFile,
localFile: LocalFile, bucket: Bucket,
bucket: Bucket, listenerSettings: UploadEventListener.Settings,
listenerSettings: UploadEventListener.Settings,
): ZIO[Storage, Nothing, StorageEvent] ): ZIO[Storage, Nothing, StorageEvent]
def copy( def copy(bucket: Bucket,
bucket: Bucket, sourceKey: RemoteKey,
sourceKey: RemoteKey, hash: MD5Hash,
hash: MD5Hash, targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent]
targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageEvent]
def delete( def delete(bucket: Bucket, remoteKey: RemoteKey): UIO[StorageEvent]
bucket: Bucket,
remoteKey: RemoteKey
): UIO[StorageEvent]
def shutdown: UIO[StorageEvent] def shutdown: UIO[StorageEvent]
} }
@ -58,17 +49,18 @@ object Storage {
listResult listResult
override def upload( override def upload(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
listenerSettings: UploadEventListener.Settings listenerSettings: UploadEventListener.Settings
): ZIO[Storage, Nothing, StorageEvent] = ): ZIO[Storage, Nothing, StorageEvent] =
uploadResult uploadResult
override def copy( override def copy(
bucket: Bucket, bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] = targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageEvent] =
copyResult copyResult
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
@ -84,28 +76,24 @@ object Storage {
object Test extends Test object Test extends Test
final def list(bucket: Bucket, final def list(bucket: Bucket,
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] = prefix: RemoteKey): RIO[Storage, RemoteObjects] =
ZIO.accessM(_.storage listObjects (bucket, prefix)) ZIO.accessM(_.storage listObjects (bucket, prefix))
final def upload( final def upload(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
listenerSettings: UploadEventListener.Settings listenerSettings: UploadEventListener.Settings
): ZIO[Storage, Nothing, StorageEvent] = ): ZIO[Storage, Nothing, StorageEvent] =
ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings)) ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings))
final def copy( final def copy(bucket: Bucket,
bucket: Bucket, sourceKey: RemoteKey,
sourceKey: RemoteKey, hash: MD5Hash,
hash: MD5Hash, targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] =
targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageEvent] =
ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey)) ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey))
final def delete( final def delete(bucket: Bucket,
bucket: Bucket, remoteKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] =
remoteKey: RemoteKey
): ZIO[Storage, Nothing, StorageEvent] =
ZIO.accessM(_.storage delete (bucket, remoteKey)) ZIO.accessM(_.storage delete (bucket, remoteKey))
} }

View file

@ -24,52 +24,57 @@ object ProgressUI {
localFile: LocalFile, localFile: LocalFile,
bytesTransferred: Long, bytesTransferred: Long,
index: Int, index: Int,
totalBytesSoFar: Long): ZIO[Console, Nothing, Unit] = totalBytesSoFar: Long): UIO[Unit] =
for { for {
_ <- ZIO.when(bytesTransferred < localFile.file.length())( _ <- ZIO.when(bytesTransferred < localFile.file.length())(
stillUploading(localFile.remoteKey, stillUploading(
localFile.file.length(), localFile.remoteKey,
bytesTransferred)) localFile.file.length(),
bytesTransferred
)
)
_ <- ZIO.when(bytesTransferred >= localFile.file.length()) { _ <- ZIO.when(bytesTransferred >= localFile.file.length()) {
finishedUploading(localFile.remoteKey) finishedUploading(localFile.remoteKey)
} }
} yield () } yield ()
private def stillUploading( private def stillUploading(remoteKey: RemoteKey,
remoteKey: RemoteKey, fileLength: Long,
fileLength: Long, bytesTransferred: Long): UIO[Unit] = {
bytesTransferred: Long
): ZIO[Console, Nothing, Unit] = {
val current: Map[RemoteKey, UploadState] = val current: Map[RemoteKey, UploadState] =
uploads.updateAndGet((m: Map[RemoteKey, UploadState]) => uploads.updateAndGet(
m.updated(remoteKey, UploadState(bytesTransferred, fileLength))) (m: Map[RemoteKey, UploadState]) =>
m.updated(remoteKey, UploadState(bytesTransferred, fileLength))
)
val resetCursor = s"${Terminal.cursorPrevLine(statusHeight) * current.size}" val resetCursor = s"${Terminal.cursorPrevLine(statusHeight) * current.size}"
ZIO.foreach(current) { entry => ZIO.foreach(current) { entry =>
{ {
val (remoteKey, state) = entry val (remoteKey, state) = entry
val percent = f"${(state.transferred * 100) / state.fileLength}%2d" val percent = f"${(state.transferred * 100) / state.fileLength}%2d"
val transferred = sizeInEnglish(state.transferred) val transferred = sizeInEnglish(state.transferred)
val fileLength = sizeInEnglish(state.fileLength) val fileLength = sizeInEnglish(state.fileLength)
val line1 = val line1 =
s"${GREEN}Uploading:$RESET ${remoteKey.key}$eraseLineForward" s"${GREEN}Uploading:$RESET ${remoteKey.key}$eraseLineForward"
val line2body = s"($percent%) $transferred of $fileLength " val line2body = s"($percent%) $transferred of $fileLength "
val bar = val bar =
progressBar(state.transferred.toDouble, progressBar(
state.fileLength.toDouble, state.transferred.toDouble,
Terminal.width - line2body.length) state.fileLength.toDouble,
Terminal.width - line2body.length
)
val line2 = s"$GREEN$line2body$RESET$bar$eraseLineForward" val line2 = s"$GREEN$line2body$RESET$bar$eraseLineForward"
Console.putStrLn(line1) *> UIO(Console.putStrLn(line1)) *>
Console.putStrLn(line2) UIO(Console.putStrLn(line2))
} }
} *> Console.putStr(resetCursor) } *> UIO(Console.putStr(resetCursor))
} }
def finishedUploading( def finishedUploading(remoteKey: RemoteKey): ZIO[Any, Nothing, Unit] = {
remoteKey: RemoteKey UIO(
): ZIO[Any, Nothing, Unit] = { uploads
UIO(uploads.updateAndGet((m: Map[RemoteKey, UploadState]) => .updateAndGet((m: Map[RemoteKey, UploadState]) => m.removed(remoteKey))
m.removed(remoteKey))) *> UIO.unit ) *> UIO.unit
} }
} }

View file

@ -2,12 +2,6 @@ package net.kemitix.thorp.uishell
import net.kemitix.eip.zio.MessageChannel import net.kemitix.eip.zio.MessageChannel
import net.kemitix.thorp.config.Configuration import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.console.ConsoleOut.{
CopyComplete,
DeleteComplete,
ErrorQueueEventOccurred,
UploadComplete
}
import net.kemitix.thorp.console.{Console, ConsoleOut} import net.kemitix.thorp.console.{Console, ConsoleOut}
import net.kemitix.thorp.domain.Terminal.{eraseLineForward, eraseToEndOfScreen} import net.kemitix.thorp.domain.Terminal.{eraseLineForward, eraseToEndOfScreen}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
@ -15,8 +9,9 @@ import zio.{UIO, ZIO}
object UIShell { object UIShell {
def receiver(configuration: Configuration) def receiver(
: UIO[MessageChannel.UReceiver[Console, UIEvent]] = configuration: Configuration
): UIO[MessageChannel.UReceiver[Any, UIEvent]] =
UIO { uiEventMessage => UIO { uiEventMessage =>
uiEventMessage.body match { uiEventMessage.body match {
case UIEvent.ShowValidConfig => showValidConfig(configuration) case UIEvent.ShowValidConfig => showValidConfig(configuration)
@ -31,80 +26,103 @@ object UIShell {
case UIEvent.ActionFinished(_, _, _, event) => case UIEvent.ActionFinished(_, _, _, event) =>
actionFinished(configuration, event) actionFinished(configuration, event)
case UIEvent.KeyFound(_) => UIO(()) case UIEvent.KeyFound(_) => UIO(())
case UIEvent.RequestCycle(localFile, case UIEvent.RequestCycle(
bytesTransferred, localFile,
index, bytesTransferred,
totalBytesSoFar) => index,
ProgressUI.requestCycle(configuration, totalBytesSoFar
localFile, ) =>
bytesTransferred, ProgressUI.requestCycle(
index, configuration,
totalBytesSoFar) localFile,
bytesTransferred,
index,
totalBytesSoFar
)
} }
} }
private def actionFinished( private def actionFinished(configuration: Configuration,
configuration: Configuration, event: StorageEvent): UIO[Unit] = {
event: StorageEvent): ZIO[Console, Nothing, Unit] = {
val batchMode = configuration.batchMode val batchMode = configuration.batchMode
for { for {
_ <- event match { _ <- event match {
case _: StorageEvent.DoNothingEvent => UIO.unit case _: StorageEvent.DoNothingEvent => UIO.unit
case copyEvent: StorageEvent.CopyEvent => { case copyEvent: StorageEvent.CopyEvent =>
val sourceKey = copyEvent.sourceKey val sourceKey = copyEvent.sourceKey
val targetKey = copyEvent.targetKey val targetKey = copyEvent.targetKey
Console.putMessageLnB(CopyComplete(sourceKey, targetKey), batchMode) Console.putMessageLnB(
} ConsoleOut.copyComplete(sourceKey, targetKey),
case uploadEvent: StorageEvent.UploadEvent => { batchMode
)
UIO.unit
case uploadEvent: StorageEvent.UploadEvent =>
val remoteKey = uploadEvent.remoteKey val remoteKey = uploadEvent.remoteKey
ProgressUI.finishedUploading(remoteKey) *> Console
Console.putMessageLnB(UploadComplete(remoteKey), batchMode) .putMessageLnB(ConsoleOut.uploadComplete(remoteKey), batchMode)
} ProgressUI.finishedUploading(remoteKey)
case deleteEvent: StorageEvent.DeleteEvent => { case deleteEvent: StorageEvent.DeleteEvent =>
val remoteKey = deleteEvent.remoteKey val remoteKey = deleteEvent.remoteKey
Console.putMessageLnB(DeleteComplete(remoteKey), batchMode) Console.putMessageLnB(ConsoleOut.deleteComplete(remoteKey), batchMode)
} UIO.unit
case errorEvent: StorageEvent.ErrorEvent => { case errorEvent: StorageEvent.ErrorEvent =>
val remoteKey = errorEvent.remoteKey val remoteKey = errorEvent.remoteKey
val action = errorEvent.action val action = errorEvent.action
val e = errorEvent.e val e = errorEvent.e
ProgressUI.finishedUploading(remoteKey) *> ProgressUI.finishedUploading(remoteKey) *>
Console.putMessageLnB(ErrorQueueEventOccurred(action, e), batchMode) UIO(
} Console.putMessageLnB(
ConsoleOut.errorQueueEventOccurred(action, e),
batchMode
)
)
case _: StorageEvent.ShutdownEvent => UIO.unit case _: StorageEvent.ShutdownEvent => UIO.unit
} }
} yield () } yield ()
} }
private def uploadWaitComplete(action: Action): ZIO[Console, Nothing, Unit] = private def uploadWaitComplete(action: Action): UIO[Unit] = {
Console.putStrLn(s"Finished waiting to other upload - now $action") Console.putStrLn(s"Finished waiting to other upload - now $action")
UIO.unit
}
private def awaitingUpload(remoteKey: RemoteKey, private def awaitingUpload(remoteKey: RemoteKey, hash: MD5Hash): UIO[Unit] = {
hash: MD5Hash): ZIO[Console, Nothing, Unit] =
Console.putStrLn( Console.putStrLn(
s"Awaiting another upload of $hash before copying it to $remoteKey") s"Awaiting another upload of $hash before copying it to $remoteKey"
)
UIO.unit
}
private def fileFound(configuration: Configuration, private def fileFound(configuration: Configuration,
localFile: LocalFile): ZIO[Console, Nothing, Unit] = localFile: LocalFile): UIO[Unit] =
ZIO.when(configuration.batchMode)( ZIO.when(configuration.batchMode) {
Console.putStrLn(s"Found: ${localFile.file}")) Console.putStrLn(s"Found: ${localFile.file}")
UIO.unit
}
private def showSummary(counters: Counters): ZIO[Console, Nothing, Unit] = private def showSummary(counters: Counters): UIO[Unit] = {
Console.putStrLn(eraseToEndOfScreen) *> Console.putStrLn(eraseToEndOfScreen)
Console.putStrLn(s"Uploaded ${counters.uploaded} files") *> Console.putStrLn(s"Uploaded ${counters.uploaded} files")
Console.putStrLn(s"Copied ${counters.copied} files") *> Console.putStrLn(s"Copied ${counters.copied} files")
Console.putStrLn(s"Deleted ${counters.deleted} files") *> Console.putStrLn(s"Deleted ${counters.deleted} files")
Console.putStrLn(s"Errors ${counters.errors}") Console.putStrLn(s"Errors ${counters.errors}")
UIO.unit
}
private def remoteDataFetched(size: Int): ZIO[Console, Nothing, Unit] = private def remoteDataFetched(size: Int): UIO[Unit] = {
Console.putStrLn(s"Found $size remote objects") Console.putStrLn(s"Found $size remote objects")
UIO.unit
}
private def showValidConfig( private def showValidConfig(configuration: Configuration): UIO[Unit] = {
configuration: Configuration): ZIO[Console, Nothing, Unit] =
Console.putMessageLn( Console.putMessageLn(
ConsoleOut.ValidConfig(configuration.bucket, ConsoleOut.validConfig(
configuration.prefix, configuration.bucket,
configuration.sources)) configuration.prefix,
configuration.sources
)
)
UIO.unit
}
def trimHead(str: String): String = { def trimHead(str: String): String = {
val width = Terminal.width val width = Terminal.width
@ -114,14 +132,11 @@ object UIShell {
} }
} }
def actionChosen(configuration: Configuration, def actionChosen(configuration: Configuration, action: Action): UIO[Unit] = {
action: Action): ZIO[Console, Nothing, Unit] = {
val message = trimHead(action.asString()) + eraseLineForward val message = trimHead(action.asString()) + eraseLineForward
val batch = configuration.batchMode if (configuration.batchMode) Console.putStr(message + "\r")
for { else Console.putStrLn(message)
_ <- ZIO.when(!batch) { Console.putStr(message + "\r") } UIO.unit
_ <- ZIO.when(batch) { Console.putStrLn(message) }
} yield ()
} }
} }