Convert Storage environment to Java (#476)

* uishell.UploadProgressEvent: convert to Java

* uishell.UIEvent: convert to Java

* uishell.ProgressUI: convert to Java

* uishell.ProgressEvent: remove unused

* lib.MessageChannel: added as replacement for eip-zio version

* domain: move MessageChannel to module

* storage: convert to Java
This commit is contained in:
Paul Campbell 2020-06-23 10:23:28 +01:00 committed by GitHub
parent 97d0e0d190
commit 0ae523a1e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 860 additions and 736 deletions

View file

@ -0,0 +1 @@
net.kemitix.thorp.storage.aws.S3Storage

View file

@ -1,16 +1,12 @@
package net.kemitix.thorp package net.kemitix.thorp
import net.kemitix.thorp.lib.FileScanner import net.kemitix.thorp.lib.FileScanner
import net.kemitix.thorp.storage.aws.S3Storage
import zio.clock.Clock import zio.clock.Clock
import zio.{App, ZEnv, ZIO} import zio.{App, ZEnv, ZIO}
object Main extends App { object Main extends App {
object LiveThorpApp object LiveThorpApp extends Clock.Live with FileScanner.Live
extends S3Storage.Live
with Clock.Live
with FileScanner.Live
override def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = override def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
Program Program

View file

@ -26,9 +26,7 @@ trait Program {
val version = "0.11.0" val version = "0.11.0"
lazy val versionLabel = s"${WHITE}Thorp v$version$RESET" lazy val versionLabel = s"${WHITE}Thorp v$version$RESET"
def run( def run(args: List[String]): ZIO[Clock with FileScanner, Nothing, Unit] = {
args: List[String]
): ZIO[Storage with Clock with FileScanner, Nothing, Unit] = {
(for { (for {
cli <- UIO(CliArgs.parse(args.toArray)) cli <- UIO(CliArgs.parse(args.toArray))
config <- IO(ConfigurationBuilder.buildConfig(cli)) config <- IO(ConfigurationBuilder.buildConfig(cli))
@ -50,7 +48,7 @@ trait Program {
private def executeWithUI( private def executeWithUI(
configuration: Configuration configuration: Configuration
): ZIO[Storage with Clock with FileScanner, Throwable, Unit] = ): ZIO[Clock with FileScanner, Throwable, Unit] =
for { for {
uiEventSender <- execute(configuration) uiEventSender <- execute(configuration)
uiEventReceiver <- UIShell.receiver(configuration) uiEventReceiver <- UIShell.receiver(configuration)
@ -61,9 +59,8 @@ trait Program {
private def execute( private def execute(
configuration: Configuration configuration: Configuration
): UIO[MessageChannel.ESender[Storage with Clock with FileScanner, ): UIO[MessageChannel.ESender[Clock with FileScanner, Throwable, UIEvent]] =
Throwable, UIO { uiChannel =>
UIEvent]] = UIO { uiChannel =>
(for { (for {
_ <- showValidConfig(uiChannel) _ <- showValidConfig(uiChannel)
remoteData <- fetchRemoteData(configuration, uiChannel) remoteData <- fetchRemoteData(configuration, uiChannel)
@ -77,17 +74,17 @@ trait Program {
} }
private def showValidConfig(uiChannel: UIChannel) = private def showValidConfig(uiChannel: UIChannel) =
Message.create(UIEvent.ShowValidConfig) >>= MessageChannel.send(uiChannel) Message.create(UIEvent.showValidConfig) >>= MessageChannel.send(uiChannel)
private def fetchRemoteData( private def fetchRemoteData(
configuration: Configuration, configuration: Configuration,
uiChannel: UIChannel uiChannel: UIChannel
): ZIO[Clock with Storage, Throwable, RemoteObjects] = { ): ZIO[Clock, Throwable, RemoteObjects] = {
val bucket = configuration.bucket val bucket = configuration.bucket
val prefix = configuration.prefix val prefix = configuration.prefix
val objects = Storage.getInstance().list(bucket, prefix)
for { for {
objects <- Storage.list(bucket, prefix) _ <- Message.create(UIEvent.remoteDataFetched(objects.byKey.size)) >>= MessageChannel
_ <- Message.create(UIEvent.RemoteDataFetched(objects.byKey.size)) >>= MessageChannel
.send(uiChannel) .send(uiChannel)
} yield objects } yield objects
} }
@ -109,7 +106,7 @@ trait Program {
uiChannel: UIChannel uiChannel: UIChannel
)(events: Seq[StorageEvent]): RIO[Clock, Unit] = { )(events: Seq[StorageEvent]): RIO[Clock, Unit] = {
val counters = events.foldLeft(Counters.empty)(countActivities) val counters = events.foldLeft(Counters.empty)(countActivities)
Message.create(UIEvent.ShowSummary(counters)) >>= Message.create(UIEvent.showSummary(counters)) >>=
MessageChannel.send(uiChannel) MessageChannel.send(uiChannel)
} }

View file

@ -11,18 +11,6 @@ import java.util.stream.Collectors;
public interface ConsoleOut { public interface ConsoleOut {
String en(); String en();
default String eraseToEndOfScreen() {
return Terminal.eraseToEndOfScreen;
}
default String reset() {
return "\u001B[0m";
}
default String red() {
return "\u001B[31m";
}
default String green() {
return "\u001B[32m";
}
interface WithBatchMode extends ConsoleOut, Function<Boolean, String> { interface WithBatchMode extends ConsoleOut, Function<Boolean, String> {
String enBatch(); String enBatch();
default String selectLine(boolean batchMode) { default String selectLine(boolean batchMode) {
@ -69,9 +57,9 @@ public interface ConsoleOut {
@Override @Override
public String en() { public String en() {
return String.format("%sUploaded:%s %s%s", return String.format("%sUploaded:%s %s%s",
green(), reset(), Terminal.green, Terminal.reset,
remoteKey.key(), remoteKey.key(),
eraseToEndOfScreen()); Terminal.eraseToEndOfScreen);
} }
@Override @Override
public String enBatch() { public String enBatch() {
@ -89,10 +77,10 @@ public interface ConsoleOut {
@Override @Override
public String en() { public String en() {
return String.format("%sCopied:%s %s => %s%s", return String.format("%sCopied:%s %s => %s%s",
green(), reset(), Terminal.green, Terminal.reset,
sourceKey.key(), sourceKey.key(),
targetKey.key(), targetKey.key(),
eraseToEndOfScreen()); Terminal.eraseToEndOfScreen);
} }
@Override @Override
public String enBatch() { public String enBatch() {
@ -113,9 +101,9 @@ public interface ConsoleOut {
@Override @Override
public String enBatch() { public String enBatch() {
return String.format("%sDeleted%s: %s%s", return String.format("%sDeleted%s: %s%s",
green(), reset(), Terminal.green, Terminal.reset,
remoteKey, remoteKey,
eraseToEndOfScreen()); Terminal.eraseToEndOfScreen);
} }
} }
static ConsoleOut.WithBatchMode errorQueueEventOccurred( static ConsoleOut.WithBatchMode errorQueueEventOccurred(
@ -138,11 +126,11 @@ public interface ConsoleOut {
@Override @Override
public String enBatch() { public String enBatch() {
return String.format("%sERROR%s: %s %s: %s%s", return String.format("%sERROR%s: %s %s: %s%s",
red(), reset(), Terminal.red, Terminal.reset,
actionSummary.name(), actionSummary.name(),
actionSummary.keys(), actionSummary.keys(),
error.getMessage(), error.getMessage(),
eraseToEndOfScreen()); Terminal.eraseToEndOfScreen);
} }
} }

Binary file not shown.

Before

Width:  |  Height:  |  Size: 236 KiB

After

Width:  |  Height:  |  Size: 233 KiB

View file

@ -0,0 +1,71 @@
package net.kemitix.thorp.domain;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class MessageChannel<T> {
private final MessageSupplier<T> messageSupplier;
private final List<MessageConsumer<T>> messageConsumers;
private final Thread channelThread;
static <T> MessageChannel<T> create(MessageSupplier<T> supplier) {
List<MessageConsumer<T>> consumers = new ArrayList<>();
return new MessageChannel<T>(supplier, consumers,
new Thread(new ChannelRunner<T>(supplier, consumers)));
}
public static <T> BlockingQueue<T> createMessageSupplier(Class<T> messageClass) {
return new LinkedTransferQueue<>();
}
public void addMessageConsumer(MessageConsumer<T> consumer) {
messageConsumers.add(consumer);
}
public void startChannel() {
channelThread.start();
}
public void shutdownChannel() {
channelThread.interrupt();
}
public interface MessageSupplier<T> {
T take() throws InterruptedException;
boolean isComplete();
}
public interface MessageConsumer<T> {
void accept(T message);
}
@RequiredArgsConstructor
private static class ChannelRunner<T> implements Runnable {
AtomicBoolean shutdownTrigger = new AtomicBoolean(false);
private final MessageSupplier<T> supplier;
private final List<MessageConsumer<T>> consumers;
@Override
public void run() {
while (!shutdownTrigger.get()) {
try {
T message = supplier.take();
for (MessageConsumer<T> consumer : consumers) {
consumer.accept(message);
}
if (supplier.isComplete()) {
shutdownTrigger.set(true);
}
} catch (InterruptedException e) {
shutdownTrigger.set(true);
}
}
}
}
}

View file

@ -0,0 +1,11 @@
package net.kemitix.thorp.domain;
import java.util.stream.IntStream;
public class StringUtil {
public static String repeat(String s, int times) {
StringBuilder sb = new StringBuilder();
IntStream.range(0, times).forEach(x -> sb.append(s));
return sb.toString();
}
}

View file

@ -3,7 +3,6 @@ package net.kemitix.thorp.domain;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.IntStream;
public class Terminal { public class Terminal {
@ -67,6 +66,10 @@ public class Terminal {
public static String enableAlternateBuffer = csi + "?1049h"; public static String enableAlternateBuffer = csi + "?1049h";
public static String disableAlternateBuffer = csi + "?1049l"; public static String disableAlternateBuffer = csi + "?1049l";
public static String red = "\u001B[31m";
public static String green = "\u001B[32m";
public static String reset = "\u001B[0m";
private static Map<Integer, String> getSubBars() { private static Map<Integer, String> getSubBars() {
Map<Integer, String> subBars = new HashMap<>(); Map<Integer, String> subBars = new HashMap<>();
subBars.put(0, " "); subBars.put(0, " ");
@ -186,15 +189,10 @@ public class Terminal {
int fullHeadSize = pxDone / phases; int fullHeadSize = pxDone / phases;
int part = pxDone % phases; int part = pxDone % phases;
String partial = part != 0 ? subBars.getOrDefault(part, "") : ""; String partial = part != 0 ? subBars.getOrDefault(part, "") : "";
String head = repeat("", fullHeadSize) + partial; String head = StringUtil.repeat("", fullHeadSize) + partial;
int tailSize = barWidth - head.length(); int tailSize = barWidth - head.length();
String tail = repeat(" ", tailSize); String tail = StringUtil.repeat(" ", tailSize);
return "[" + head + tail + "]"; return "[" + head + tail + "]";
} }
private static String repeat(String s, int times) {
StringBuilder sb = new StringBuilder();
IntStream.range(0, times).forEach(x -> sb.append(s));
return sb.toString();
}
} }

View file

@ -12,6 +12,12 @@
<name>lib</name> <name>lib</name>
<dependencies> <dependencies>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- thorp --> <!-- thorp -->
<dependency> <dependency>
<groupId>net.kemitix.thorp</groupId> <groupId>net.kemitix.thorp</groupId>

View file

@ -37,21 +37,23 @@ object LocalFileSystem extends LocalFileSystem {
uiChannel: UChannel[Any, UIEvent], uiChannel: UChannel[Any, UIEvent],
remoteObjects: RemoteObjects, remoteObjects: RemoteObjects,
archive: ThorpArchive archive: ThorpArchive
): RIO[Clock with FileScanner with Storage, Seq[StorageEvent]] = ): RIO[Clock with FileScanner, Seq[StorageEvent]] =
for { for {
actionCounter <- Ref.make(0) actionCounter <- Ref.make(0)
bytesCounter <- Ref.make(0L) bytesCounter <- Ref.make(0L)
uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]]) uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]])
eventsRef <- Ref.make(List.empty[StorageEvent]) eventsRef <- Ref.make(List.empty[StorageEvent])
fileSender <- FileScanner.scanSources(configuration) fileSender <- FileScanner.scanSources(configuration)
fileReceiver <- fileReceiver(configuration, fileReceiver <- fileReceiver(
configuration,
uiChannel, uiChannel,
remoteObjects, remoteObjects,
archive, archive,
uploads, uploads,
actionCounter, actionCounter,
bytesCounter, bytesCounter,
eventsRef) eventsRef
)
parallel = configuration.parallel parallel = configuration.parallel
_ <- MessageChannel _ <- MessageChannel
.pointToPointPar(parallel)(fileSender)(fileReceiver) .pointToPointPar(parallel)(fileSender)(fileReceiver)
@ -64,18 +66,20 @@ object LocalFileSystem extends LocalFileSystem {
uiChannel: UChannel[Any, UIEvent], uiChannel: UChannel[Any, UIEvent],
remoteData: RemoteObjects, remoteData: RemoteObjects,
archive: ThorpArchive archive: ThorpArchive
): RIO[Clock with Storage, Seq[StorageEvent]] = ): RIO[Clock, Seq[StorageEvent]] =
for { for {
actionCounter <- Ref.make(0) actionCounter <- Ref.make(0)
bytesCounter <- Ref.make(0L) bytesCounter <- Ref.make(0L)
eventsRef <- Ref.make(List.empty[StorageEvent]) eventsRef <- Ref.make(List.empty[StorageEvent])
keySender <- keySender(remoteData.byKey.keys.asScala) keySender <- keySender(remoteData.byKey.keys.asScala)
keyReceiver <- keyReceiver(configuration, keyReceiver <- keyReceiver(
configuration,
uiChannel, uiChannel,
archive, archive,
actionCounter, actionCounter,
bytesCounter, bytesCounter,
eventsRef) eventsRef
)
parallel = configuration.parallel parallel = configuration.parallel
_ <- MessageChannel _ <- MessageChannel
.pointToPointPar(parallel)(keySender)(keyReceiver) .pointToPointPar(parallel)(keySender)(keyReceiver)
@ -92,35 +96,41 @@ object LocalFileSystem extends LocalFileSystem {
actionCounterRef: Ref[Int], actionCounterRef: Ref[Int],
bytesCounterRef: Ref[Long], bytesCounterRef: Ref[Long],
eventsRef: Ref[List[StorageEvent]] eventsRef: Ref[List[StorageEvent]]
): UIO[ ): UIO[MessageChannel.UReceiver[Clock, FileScanner.ScannedFile]] =
MessageChannel.UReceiver[Clock with Storage, FileScanner.ScannedFile]] =
UIO { message => UIO { message =>
val localFile = message.body val localFile = message.body
for { for {
_ <- uiFileFound(uiChannel)(localFile) _ <- uiFileFound(uiChannel)(localFile)
action <- chooseAction(configuration, action <- chooseAction(
configuration,
remoteObjects, remoteObjects,
uploads, uploads,
uiChannel)(localFile) uiChannel
)(localFile)
actionCounter <- actionCounterRef.update(_ + 1) actionCounter <- actionCounterRef.update(_ + 1)
bytesCounter <- bytesCounterRef.update(_ + action.size) bytesCounter <- bytesCounterRef.update(_ + action.size)
_ <- uiActionChosen(uiChannel)(action) _ <- uiActionChosen(uiChannel)(action)
sequencedAction = SequencedAction(action, actionCounter) sequencedAction = SequencedAction(action, actionCounter)
event <- archive.update(configuration, event <- archive.update(
configuration,
uiChannel, uiChannel,
sequencedAction, sequencedAction,
bytesCounter) bytesCounter
)
_ <- eventsRef.update(list => event :: list) _ <- eventsRef.update(list => event :: list)
_ <- uiActionFinished(uiChannel)(action, _ <- uiActionFinished(uiChannel)(
action,
actionCounter, actionCounter,
bytesCounter, bytesCounter,
event) event
)
} yield () } yield ()
} }
private def uiActionChosen(uiChannel: MessageChannel.UChannel[Any, UIEvent])( private def uiActionChosen(
action: Action) = uiChannel: MessageChannel.UChannel[Any, UIEvent]
Message.create(UIEvent.ActionChosen(action)) >>= )(action: Action) =
Message.create(UIEvent.actionChosen(action)) >>=
MessageChannel.send(uiChannel) MessageChannel.send(uiChannel)
private def uiActionFinished(uiChannel: UChannel[Any, UIEvent])( private def uiActionFinished(uiChannel: UChannel[Any, UIEvent])(
@ -130,12 +140,14 @@ object LocalFileSystem extends LocalFileSystem {
event: StorageEvent event: StorageEvent
) = ) =
Message.create( Message.create(
UIEvent.ActionFinished(action, actionCounter, bytesCounter, event)) >>= UIEvent.actionFinished(action, actionCounter, bytesCounter, event)
) >>=
MessageChannel.send(uiChannel) MessageChannel.send(uiChannel)
private def uiFileFound(uiChannel: UChannel[Any, UIEvent])( private def uiFileFound(
localFile: LocalFile) = uiChannel: UChannel[Any, UIEvent]
Message.create(UIEvent.FileFound(localFile)) >>= )(localFile: LocalFile) =
Message.create(UIEvent.fileFound(localFile)) >>=
MessageChannel.send(uiChannel) MessageChannel.send(uiChannel)
private def chooseAction( private def chooseAction(
@ -148,7 +160,8 @@ object LocalFileSystem extends LocalFileSystem {
remoteExists <- UIO(remoteObjects.remoteKeyExists(localFile.remoteKey)) remoteExists <- UIO(remoteObjects.remoteKeyExists(localFile.remoteKey))
remoteMatches <- UIO(remoteObjects.remoteMatchesLocalFile(localFile)) remoteMatches <- UIO(remoteObjects.remoteMatchesLocalFile(localFile))
remoteForHash <- UIO( remoteForHash <- UIO(
remoteObjects.remoteHasHash(localFile.hashes).toScala) remoteObjects.remoteHasHash(localFile.hashes).toScala
)
previous <- uploads.get previous <- uploads.get
bucket = configuration.bucket bucket = configuration.bucket
action <- if (remoteExists && remoteMatches) action <- if (remoteExists && remoteMatches)
@ -179,24 +192,17 @@ object LocalFileSystem extends LocalFileSystem {
previous.contains(hash) previous.contains(hash)
}) })
private def doNothing( private def doNothing(localFile: LocalFile, bucket: Bucket): UIO[Action] =
localFile: LocalFile, UIO {
bucket: Bucket
): UIO[Action] = UIO {
Action.doNothing(bucket, localFile.remoteKey, localFile.length) Action.doNothing(bucket, localFile.remoteKey, localFile.length)
} }
private def doCopy( private def doCopy(localFile: LocalFile,
localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash hash: MD5Hash): UIO[Action] = UIO {
): UIO[Action] = UIO { Action
Action.toCopy(bucket, .toCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.length)
sourceKey,
hash,
localFile.remoteKey,
localFile.length)
} }
private def doCopyWithPreviousUpload( private def doCopyWithPreviousUpload(
@ -216,17 +222,22 @@ object LocalFileSystem extends LocalFileSystem {
.map({ hash => .map({ hash =>
for { for {
awaitingMessage <- Message.create( awaitingMessage <- Message.create(
UIEvent.AwaitingAnotherUpload(localFile.remoteKey, hash)) UIEvent.awaitingAnotherUpload(localFile.remoteKey, hash)
)
_ <- MessageChannel.send(uiChannel)(awaitingMessage) _ <- MessageChannel.send(uiChannel)(awaitingMessage)
action <- previous(hash).await.map( action <- previous(hash).await.map(
remoteKey => remoteKey =>
Action.toCopy(bucket, Action.toCopy(
bucket,
remoteKey, remoteKey,
hash, hash,
localFile.remoteKey, localFile.remoteKey,
localFile.length)) localFile.length
)
)
waitFinishedMessage <- Message.create( waitFinishedMessage <- Message.create(
UIEvent.AnotherUploadWaitComplete(action)) UIEvent.anotherUploadWaitComplete(action)
)
_ <- MessageChannel.send(uiChannel)(waitFinishedMessage) _ <- MessageChannel.send(uiChannel)(waitFinishedMessage)
} yield action } yield action
}) })
@ -234,15 +245,13 @@ object LocalFileSystem extends LocalFileSystem {
.refineToOrDie[Nothing] .refineToOrDie[Nothing]
} }
private def doUpload( private def doUpload(localFile: LocalFile, bucket: Bucket): UIO[Action] = {
localFile: LocalFile,
bucket: Bucket
): UIO[Action] = {
UIO(Action.toUpload(bucket, localFile, localFile.length)) UIO(Action.toUpload(bucket, localFile, localFile.length))
} }
def keySender( def keySender(
keys: Iterable[RemoteKey]): UIO[MessageChannel.Sender[Clock, RemoteKey]] = keys: Iterable[RemoteKey]
): UIO[MessageChannel.Sender[Clock, RemoteKey]] =
UIO { channel => UIO { channel =>
ZIO.foreach(keys) { key => ZIO.foreach(keys) { key =>
Message.create(key) >>= MessageChannel.send(channel) Message.create(key) >>= MessageChannel.send(channel)
@ -256,7 +265,7 @@ object LocalFileSystem extends LocalFileSystem {
actionCounterRef: Ref[Int], actionCounterRef: Ref[Int],
bytesCounterRef: Ref[Long], bytesCounterRef: Ref[Long],
eventsRef: Ref[List[StorageEvent]] eventsRef: Ref[List[StorageEvent]]
): UIO[MessageChannel.UReceiver[Clock with Storage, RemoteKey]] = ): UIO[MessageChannel.UReceiver[Clock, RemoteKey]] =
UIO { message => UIO { message =>
{ {
val remoteKey = message.body val remoteKey = message.body
@ -273,24 +282,29 @@ object LocalFileSystem extends LocalFileSystem {
_ <- uiActionChosen(uiChannel)(action) _ <- uiActionChosen(uiChannel)(action)
bytesCounter <- bytesCounterRef.update(_ + action.size) bytesCounter <- bytesCounterRef.update(_ + action.size)
sequencedAction = SequencedAction(action, actionCounter) sequencedAction = SequencedAction(action, actionCounter)
event <- archive.update(configuration, event <- archive.update(
configuration,
uiChannel, uiChannel,
sequencedAction, sequencedAction,
0L) 0L
)
_ <- eventsRef.update(list => event :: list) _ <- eventsRef.update(list => event :: list)
_ <- uiActionFinished(uiChannel)(action, _ <- uiActionFinished(uiChannel)(
action,
actionCounter, actionCounter,
bytesCounter, bytesCounter,
event) event
)
} yield () } yield ()
} }
} yield () } yield ()
} }
} }
private def uiKeyFound(uiChannel: UChannel[Any, UIEvent])( private def uiKeyFound(
remoteKey: RemoteKey) = uiChannel: UChannel[Any, UIEvent]
Message.create(UIEvent.KeyFound(remoteKey)) >>= )(remoteKey: RemoteKey) =
Message.create(UIEvent.keyFound(remoteKey)) >>=
MessageChannel.send(uiChannel) MessageChannel.send(uiChannel)
} }

View file

@ -5,16 +5,15 @@ import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageEvent import net.kemitix.thorp.domain.StorageEvent
import net.kemitix.thorp.domain.StorageEvent._ import net.kemitix.thorp.domain.StorageEvent._
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.UIEvent import net.kemitix.thorp.uishell.UIEvent
import zio.{UIO, ZIO} import zio.UIO
trait ThorpArchive { trait ThorpArchive {
def update(configuration: Configuration, def update(configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiChannel: UChannel[Any, UIEvent],
sequencedAction: SequencedAction, sequencedAction: SequencedAction,
totalBytesSoFar: Long): ZIO[Storage, Nothing, StorageEvent] totalBytesSoFar: Long): UIO[StorageEvent]
def logEvent(configuration: Configuration, def logEvent(configuration: Configuration,
event: StorageEvent): UIO[StorageEvent] = { event: StorageEvent): UIO[StorageEvent] = {

View file

@ -6,72 +6,86 @@ import net.kemitix.thorp.domain.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener} import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener}
import zio.{UIO, ZIO} import zio.UIO
trait UnversionedMirrorArchive extends ThorpArchive { trait UnversionedMirrorArchive extends ThorpArchive {
override def update( override def update(configuration: Configuration,
configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiChannel: UChannel[Any, UIEvent],
sequencedAction: SequencedAction, sequencedAction: SequencedAction,
totalBytesSoFar: Long totalBytesSoFar: Long): UIO[StorageEvent] = {
): ZIO[Storage, Nothing, StorageEvent] = {
val action = sequencedAction.action val action = sequencedAction.action
val index = sequencedAction.index val index = sequencedAction.index
val bucket = action.bucket val bucket = action.bucket
action match { action match {
case upload: ToUpload => case upload: ToUpload =>
val localFile = upload.localFile val localFile = upload.localFile
doUpload(configuration, UIO {
doUpload(
configuration,
uiChannel, uiChannel,
index, index,
totalBytesSoFar, totalBytesSoFar,
bucket, bucket,
localFile) localFile
)
}
case toCopy: ToCopy => case toCopy: ToCopy =>
val sourceKey = toCopy.sourceKey val sourceKey = toCopy.sourceKey
val hash = toCopy.hash val hash = toCopy.hash
val targetKey = toCopy.targetKey val targetKey = toCopy.targetKey
Storage.copy(bucket, sourceKey, hash, targetKey) UIO {
Storage
.getInstance()
.copy(bucket, sourceKey, hash, targetKey)
}
case toDelete: ToDelete => case toDelete: ToDelete =>
val remoteKey = toDelete.remoteKey val remoteKey = toDelete.remoteKey
Storage.delete(bucket, remoteKey) UIO {
Storage.getInstance().delete(bucket, remoteKey)
}
case doNothing: Action.DoNothing => case doNothing: Action.DoNothing =>
val remoteKey = doNothing.remoteKey val remoteKey = doNothing.remoteKey
UIO(StorageEvent.doNothingEvent(remoteKey)) UIO {
StorageEvent.doNothingEvent(remoteKey)
}
} }
} }
private def doUpload( private def doUpload(configuration: Configuration,
configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiChannel: UChannel[Any, UIEvent],
index: Int, index: Int,
totalBytesSoFar: Long, totalBytesSoFar: Long,
bucket: Bucket, bucket: Bucket,
localFile: LocalFile localFile: LocalFile) =
) = Storage
Storage.upload(localFile, .getInstance()
.upload(
localFile,
bucket, bucket,
listenerSettings(configuration, listenerSettings(
configuration,
uiChannel, uiChannel,
index, index,
totalBytesSoFar, totalBytesSoFar,
bucket, bucket,
localFile)) localFile
)
)
private def listenerSettings( private def listenerSettings(configuration: Configuration,
configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiChannel: UChannel[Any, UIEvent],
index: Int, index: Int,
totalBytesSoFar: Long, totalBytesSoFar: Long,
bucket: Bucket, bucket: Bucket,
localFile: LocalFile localFile: LocalFile) =
) = UploadEventListener.Settings(
UploadEventListener.Settings(uiChannel, uiChannel,
localFile, localFile,
index, index,
totalBytesSoFar, totalBytesSoFar,
configuration.batchMode) configuration.batchMode
)
} }

View file

@ -13,7 +13,6 @@ import net.kemitix.thorp.config.{
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.Resource import net.kemitix.thorp.filesystem.Resource
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.UIEvent import net.kemitix.thorp.uishell.UIEvent
import net.kemitix.thorp.uishell.UIEvent.{ import net.kemitix.thorp.uishell.UIEvent.{
ActionChosen, ActionChosen,
@ -24,7 +23,7 @@ import net.kemitix.thorp.uishell.UIEvent.{
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import org.scalatest.Matchers._ import org.scalatest.Matchers._
import zio.clock.Clock import zio.clock.Clock
import zio.{DefaultRuntime, UIO, ZIO} import zio.{DefaultRuntime, UIO}
import scala.collection.MapView import scala.collection.MapView
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -42,17 +41,17 @@ class LocalFileSystemTest extends FreeSpec {
bucketOption, bucketOption,
ConfigOption.ignoreGlobalOptions(), ConfigOption.ignoreGlobalOptions(),
ConfigOption.ignoreUserOptions() ConfigOption.ignoreUserOptions()
).asJava) ).asJava
)
private val uiEvents = new AtomicReference[List[UIEvent]](List.empty) private val uiEvents = new AtomicReference[List[UIEvent]](List.empty)
private val actions = new AtomicReference[List[SequencedAction]](List.empty) private val actions = new AtomicReference[List[SequencedAction]](List.empty)
private def archive: ThorpArchive = new ThorpArchive { private def archive: ThorpArchive = new ThorpArchive {
override def update( override def update(configuration: Configuration,
configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiChannel: UChannel[Any, UIEvent],
sequencedAction: SequencedAction, sequencedAction: SequencedAction,
totalBytesSoFar: Long): ZIO[Storage, Nothing, StorageEvent] = UIO { totalBytesSoFar: Long): UIO[StorageEvent] = UIO {
actions.updateAndGet(l => sequencedAction :: l) actions.updateAndGet(l => sequencedAction :: l)
StorageEvent.doNothingEvent(sequencedAction.action.remoteKey) StorageEvent.doNothingEvent(sequencedAction.action.remoteKey)
} }
@ -60,22 +59,21 @@ class LocalFileSystemTest extends FreeSpec {
private val runtime = new DefaultRuntime {} private val runtime = new DefaultRuntime {}
private object TestEnv private object TestEnv extends Clock.Live with FileScanner.Live
extends Clock.Live
with FileScanner.Live
with Storage.Test
"scanCopyUpload" - { "scanCopyUpload" - {
def sender(configuration: Configuration, objects: RemoteObjects) def sender(
: UIO[MessageChannel.ESender[Clock with FileScanner with Storage, configuration: Configuration,
Throwable, objects: RemoteObjects
UIEvent]] = ): UIO[MessageChannel.ESender[Clock with FileScanner, Throwable, UIEvent]] =
UIO { uiChannel => UIO { uiChannel =>
(for { (for {
_ <- LocalFileSystem.scanCopyUpload(configuration, _ <- LocalFileSystem.scanCopyUpload(
configuration,
uiChannel, uiChannel,
objects, objects,
archive) archive
)
} yield ()) <* MessageChannel.endChannel(uiChannel) } yield ()) <* MessageChannel.endChannel(uiChannel)
} }
def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] = def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] =
@ -102,7 +100,8 @@ class LocalFileSystemTest extends FreeSpec {
actionList.filter(_.isInstanceOf[ToUpload]) should have size 2 actionList.filter(_.isInstanceOf[ToUpload]) should have size 2
actionList.map(_.remoteKey) shouldEqual Set( actionList.map(_.remoteKey) shouldEqual Set(
MD5HashData.Root.remoteKey, MD5HashData.Root.remoteKey,
MD5HashData.Leaf.remoteKey) MD5HashData.Leaf.remoteKey
)
} }
"ui is updated" in { "ui is updated" in {
uiEvents.set(List.empty) uiEvents.set(List.empty)
@ -112,7 +111,8 @@ class LocalFileSystemTest extends FreeSpec {
summary should contain inOrderElementsOf List( summary should contain inOrderElementsOf List(
"file found : root-file", "file found : root-file",
"action chosen : root-file : ToUpload", "action chosen : root-file : ToUpload",
"action finished : root-file : ToUpload") "action finished : root-file : ToUpload"
)
summary should contain inOrderElementsOf List( summary should contain inOrderElementsOf List(
"file found : subdir/leaf-file", "file found : subdir/leaf-file",
"action chosen : subdir/leaf-file : ToUpload", "action chosen : subdir/leaf-file : ToUpload",
@ -126,10 +126,12 @@ class LocalFileSystemTest extends FreeSpec {
RemoteObjects.create( RemoteObjects.create(
MapView( MapView(
MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, MD5HashData.Root.hash -> MD5HashData.Root.remoteKey,
MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey).toMap.asJava, MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey
).toMap.asJava,
MapView( MapView(
MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, MD5HashData.Root.remoteKey -> MD5HashData.Root.hash,
MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash).toMap.asJava MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash
).toMap.asJava
) )
"do nothing for all files" - { "do nothing for all files" - {
"all archive actions do nothing" in { "all archive actions do nothing" in {
@ -147,7 +149,8 @@ class LocalFileSystemTest extends FreeSpec {
summary should contain inOrderElementsOf List( summary should contain inOrderElementsOf List(
"file found : root-file", "file found : root-file",
"action chosen : root-file : DoNothing", "action chosen : root-file : DoNothing",
"action finished : root-file : DoNothing") "action finished : root-file : DoNothing"
)
summary should contain inOrderElementsOf List( summary should contain inOrderElementsOf List(
"file found : subdir/leaf-file", "file found : subdir/leaf-file",
"action chosen : subdir/leaf-file : DoNothing", "action chosen : subdir/leaf-file : DoNothing",
@ -181,7 +184,8 @@ class LocalFileSystemTest extends FreeSpec {
summary should contain inOrderElementsOf List( summary should contain inOrderElementsOf List(
"file found : root-file", "file found : root-file",
"action chosen : root-file : DoNothing", "action chosen : root-file : DoNothing",
"action finished : root-file : DoNothing") "action finished : root-file : DoNothing"
)
summary should contain inOrderElementsOf List( summary should contain inOrderElementsOf List(
"file found : subdir/leaf-file", "file found : subdir/leaf-file",
"action chosen : subdir/leaf-file : ToUpload", "action chosen : subdir/leaf-file : ToUpload",
@ -195,10 +199,12 @@ class LocalFileSystemTest extends FreeSpec {
RemoteObjects.create( RemoteObjects.create(
MapView( MapView(
MD5HashData.Root.hash -> MD5HashData.Leaf.remoteKey, MD5HashData.Root.hash -> MD5HashData.Leaf.remoteKey,
MD5HashData.Leaf.hash -> MD5HashData.Root.remoteKey).toMap.asJava, MD5HashData.Leaf.hash -> MD5HashData.Root.remoteKey
).toMap.asJava,
MapView( MapView(
MD5HashData.Root.remoteKey -> MD5HashData.Leaf.hash, MD5HashData.Root.remoteKey -> MD5HashData.Leaf.hash,
MD5HashData.Leaf.remoteKey -> MD5HashData.Root.hash).toMap.asJava MD5HashData.Leaf.remoteKey -> MD5HashData.Root.hash
).toMap.asJava
) )
"copy files" - { "copy files" - {
"archive swaps objects" ignore { "archive swaps objects" ignore {
@ -213,10 +219,12 @@ class LocalFileSystemTest extends FreeSpec {
RemoteObjects.create( RemoteObjects.create(
MapView( MapView(
MD5HashData.Root.hash -> otherRootKey, MD5HashData.Root.hash -> otherRootKey,
MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey).toMap.asJava, MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey
).toMap.asJava,
MapView( MapView(
otherRootKey -> MD5HashData.Root.hash, otherRootKey -> MD5HashData.Root.hash,
MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash).toMap.asJava MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash
).toMap.asJava
) )
"copy object and delete original" in { "copy object and delete original" in {
actions.set(List.empty) actions.set(List.empty)
@ -237,7 +245,8 @@ class LocalFileSystemTest extends FreeSpec {
summary should contain inOrderElementsOf List( summary should contain inOrderElementsOf List(
"file found : root-file", "file found : root-file",
"action chosen : root-file : ToCopy", "action chosen : root-file : ToCopy",
"action finished : root-file : ToCopy") "action finished : root-file : ToCopy"
)
summary should contain inOrderElementsOf List( summary should contain inOrderElementsOf List(
"file found : subdir/leaf-file", "file found : subdir/leaf-file",
"action chosen : subdir/leaf-file : DoNothing", "action chosen : subdir/leaf-file : DoNothing",
@ -248,14 +257,18 @@ class LocalFileSystemTest extends FreeSpec {
} }
"scanDelete" - { "scanDelete" - {
def sender(configuration: Configuration, objects: RemoteObjects) def sender(
: UIO[MessageChannel.ESender[Clock with Storage, Throwable, UIEvent]] = configuration: Configuration,
objects: RemoteObjects
): UIO[MessageChannel.ESender[Clock, Throwable, UIEvent]] =
UIO { uiChannel => UIO { uiChannel =>
(for { (for {
_ <- LocalFileSystem.scanDelete(configuration, _ <- LocalFileSystem.scanDelete(
configuration,
uiChannel, uiChannel,
objects, objects,
archive) archive
)
} yield ()) <* MessageChannel.endChannel(uiChannel) } yield ()) <* MessageChannel.endChannel(uiChannel)
} }
def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] = def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] =
@ -278,10 +291,12 @@ class LocalFileSystemTest extends FreeSpec {
val remoteObjects = RemoteObjects.create( val remoteObjects = RemoteObjects.create(
MapView( MapView(
MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, MD5HashData.Root.hash -> MD5HashData.Root.remoteKey,
MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey).toMap.asJava, MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey
).toMap.asJava,
MapView( MapView(
MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, MD5HashData.Root.remoteKey -> MD5HashData.Root.hash,
MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash).toMap.asJava MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash
).toMap.asJava
) )
"do nothing for all files" - { "do nothing for all files" - {
"no archive actions" in { "no archive actions" in {
@ -293,8 +308,10 @@ class LocalFileSystemTest extends FreeSpec {
"ui is updated" in { "ui is updated" in {
uiEvents.set(List.empty) uiEvents.set(List.empty)
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
uiEventsSummary shouldEqual List("key found: root-file", uiEventsSummary shouldEqual List(
"key found: subdir/leaf-file") "key found: root-file",
"key found: subdir/leaf-file"
)
} }
} }
} }
@ -302,12 +319,16 @@ class LocalFileSystemTest extends FreeSpec {
val extraHash = MD5Hash.create("extra") val extraHash = MD5Hash.create("extra")
val extraObject = RemoteKey.create("extra") val extraObject = RemoteKey.create("extra")
val remoteObjects = RemoteObjects.create( val remoteObjects = RemoteObjects.create(
MapView(MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, MapView(
MD5HashData.Root.hash -> MD5HashData.Root.remoteKey,
MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey, MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey,
extraHash -> extraObject).toMap.asJava, extraHash -> extraObject
MapView(MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, ).toMap.asJava,
MapView(
MD5HashData.Root.remoteKey -> MD5HashData.Root.hash,
MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash, MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash,
extraObject -> extraHash).toMap.asJava extraObject -> extraHash
).toMap.asJava
) )
"remove the extra object" - { "remove the extra object" - {
"archive delete action" in { "archive delete action" in {
@ -339,18 +360,22 @@ class LocalFileSystemTest extends FreeSpec {
.get() .get()
.reverse .reverse
.map { .map {
case FileFound(localFile) => case uie: FileFound =>
String.format("file found : %s", localFile.remoteKey.key) String.format("file found : %s", uie.localFile.remoteKey.key)
case ActionChosen(action) => case uie: ActionChosen =>
String.format("action chosen : %s : %s", String.format(
action.remoteKey.key, "action chosen : %s : %s",
action.getClass.getSimpleName) uie.action.remoteKey.key,
case ActionFinished(action, actionCounter, bytesCounter, event) => uie.action.getClass.getSimpleName
String.format("action finished : %s : %s", )
action.remoteKey.key, case uie: ActionFinished =>
action.getClass.getSimpleName) String.format(
case KeyFound(remoteKey) => "action finished : %s : %s",
String.format("key found: %s", remoteKey.key) uie.action.remoteKey.key,
uie.action.getClass.getSimpleName
)
case uie: KeyFound =>
String.format("key found: %s", uie.remoteKey.key)
case x => String.format("unknown : %s", x.getClass.getSimpleName) case x => String.format("unknown : %s", x.getClass.getSimpleName)
} }
} }

View file

@ -0,0 +1,65 @@
package net.kemitix.thorp.storage.aws;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import net.kemitix.thorp.domain.*;
import net.kemitix.thorp.storage.Storage;
import net.kemitix.thorp.uishell.UploadEventListener;
import java.util.function.Function;
public class S3Storage implements Storage {
private final AmazonS3Client client =
AmazonS3Client.create(AmazonS3ClientBuilder.standard().build());
private final S3TransferManager transferManager =
S3TransferManager.create(
TransferManagerBuilder.defaultTransferManager());
private final Function<PutObjectRequest, StorageEvent> uploader =
S3Uploader.uploader(transferManager);
private final Function<ListObjectsV2Request, RemoteObjects> lister =
S3Lister.lister(client);
private final Function<CopyObjectRequest, StorageEvent> copier =
S3Copier.copier(client);
private final Function<DeleteObjectRequest, StorageEvent> deleter =
S3Deleter.deleter(client);
@Override
public RemoteObjects list(
Bucket bucket,
RemoteKey prefix
) {
return lister.apply(S3Lister.request(bucket, prefix));
}
@Override
public StorageEvent upload(
LocalFile localFile,
Bucket bucket,
UploadEventListener.Settings listener
) {
return uploader.apply(S3Uploader.request(localFile, bucket));
}
@Override
public StorageEvent copy(
Bucket bucket,
RemoteKey sourceKey,
MD5Hash hash,
RemoteKey targetKey
) {
return copier.apply(S3Copier.request(bucket, sourceKey, hash, targetKey));
}
@Override
public StorageEvent delete(
Bucket bucket,
RemoteKey remoteKey
) {
return deleter.apply(S3Deleter.request(bucket, remoteKey));
}
}

View file

@ -1,59 +0,0 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.storage.Storage.Service
import net.kemitix.thorp.uishell.UploadEventListener
import zio.{RIO, UIO}
object S3Storage {
trait Live extends Storage {
val storage: Service = new Service {
private val client: AmazonS3Client =
AmazonS3Client.create(AmazonS3ClientBuilder.standard().build())
private val transferManager: S3TransferManager =
S3TransferManager.create(TransferManagerBuilder.defaultTransferManager)
private val copier = S3Copier.copier(client)
private val uploader = S3Uploader.uploader(transferManager)
private val deleter = S3Deleter.deleter(client)
private val lister = S3Lister.lister(client)
override def listObjects(bucket: Bucket,
prefix: RemoteKey): RIO[Storage, RemoteObjects] =
UIO {
lister(S3Lister.request(bucket, prefix))
}
override def upload(localFile: LocalFile,
bucket: Bucket,
listenerSettings: UploadEventListener.Settings,
): UIO[StorageEvent] =
UIO {
uploader(S3Uploader.request(localFile, bucket))
}
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): UIO[StorageEvent] =
UIO {
copier(S3Copier.request(bucket, sourceKey, hash, targetKey))
}
override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageEvent] =
UIO {
deleter(S3Deleter.request(bucket, remoteKey))
}
override def shutdown: UIO[StorageEvent] = {
UIO(transferManager.shutdownNow(true)) *> UIO(client.shutdown())
.map(_ => StorageEvent.shutdownEvent())
}
}
}
object Live extends Live
}

View file

@ -1,10 +1,7 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.UploadEventListener
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import zio.{RIO, UIO}
trait AmazonS3ClientTestFixture extends MockFactory { trait AmazonS3ClientTestFixture extends MockFactory {
@ -17,47 +14,47 @@ trait AmazonS3ClientTestFixture extends MockFactory {
case class Fixture(amazonS3Client: AmazonS3Client, case class Fixture(amazonS3Client: AmazonS3Client,
amazonS3TransferManager: S3TransferManager, amazonS3TransferManager: S3TransferManager,
) { ) {
lazy val storageService: Storage.Service = lazy val storageService: Storage = Storage.getInstance()
new Storage.Service { // new Storage.Service {
//
private val client = amazonS3Client // private val client = amazonS3Client
private val transferManager = amazonS3TransferManager // private val transferManager = amazonS3TransferManager
//
override def listObjects( // override def listObjects(
bucket: Bucket, // bucket: Bucket,
prefix: RemoteKey // prefix: RemoteKey
): RIO[Storage, RemoteObjects] = // ): RIO[Storage, RemoteObjects] =
UIO { // UIO {
S3Lister.lister(client)(S3Lister.request(bucket, prefix)) // S3Lister.lister(client)(S3Lister.request(bucket, prefix))
} // }
//
override def upload(localFile: LocalFile, // override def upload(localFile: LocalFile,
bucket: Bucket, // bucket: Bucket,
listenerSettings: UploadEventListener.Settings, // listenerSettings: UploadEventListener.Settings,
): UIO[StorageEvent] = // ): UIO[StorageEvent] =
UIO( // UIO(
S3Uploader // S3Uploader
.uploader(transferManager)(S3Uploader.request(localFile, bucket)) // .uploader(transferManager)(S3Uploader.request(localFile, bucket))
) // )
//
override def copy(bucket: Bucket, // override def copy(bucket: Bucket,
sourceKey: RemoteKey, // sourceKey: RemoteKey,
hash: MD5Hash, // hash: MD5Hash,
targetKey: RemoteKey): UIO[StorageEvent] = // targetKey: RemoteKey): UIO[StorageEvent] =
UIO { // UIO {
val request = S3Copier.request(bucket, sourceKey, hash, targetKey) // val request = S3Copier.request(bucket, sourceKey, hash, targetKey)
S3Copier.copier(client)(request) // S3Copier.copier(client)(request)
} // }
//
override def delete(bucket: Bucket, // override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageEvent] = // remoteKey: RemoteKey): UIO[StorageEvent] =
UIO(S3Deleter.deleter(client)(S3Deleter.request(bucket, remoteKey))) // UIO(S3Deleter.deleter(client)(S3Deleter.request(bucket, remoteKey)))
//
override def shutdown: UIO[StorageEvent] = { // override def shutdown: UIO[StorageEvent] = {
UIO(transferManager.shutdownNow(true)) *> UIO(client.shutdown()) // UIO(transferManager.shutdownNow(true)) *> UIO(client.shutdown())
.map(_ => StorageEvent.shutdownEvent()) // .map(_ => StorageEvent.shutdownEvent())
} // }
} // }
} }
} }

View file

@ -21,22 +21,6 @@
<groupId>net.kemitix.thorp</groupId> <groupId>net.kemitix.thorp</groupId>
<artifactId>thorp-domain</artifactId> <artifactId>thorp-domain</artifactId>
</dependency> </dependency>
<!-- scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- zio -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio_2.13</artifactId>
</dependency>
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_2.13</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View file

@ -0,0 +1,37 @@
package net.kemitix.thorp.storage;
import net.kemitix.thorp.domain.*;
import net.kemitix.thorp.uishell.UploadEventListener;
import java.util.ServiceLoader;
public interface Storage {
RemoteObjects list(
Bucket bucket,
RemoteKey prefix
);
StorageEvent upload(
LocalFile localFile,
Bucket bucket,
UploadEventListener.Settings listener
);
StorageEvent copy(
Bucket bucket,
RemoteKey sourceKey,
MD5Hash hash,
RemoteKey targetKey
);
StorageEvent delete(
Bucket bucket,
RemoteKey remoteKey
);
static Storage getInstance() {
return ServiceLoader.load(Storage.class).iterator().next();
}
}

View file

@ -1,99 +0,0 @@
package net.kemitix.thorp.storage
import net.kemitix.thorp.domain._
import net.kemitix.thorp.uishell.UploadEventListener
import zio.{RIO, Task, UIO, ZIO}
trait Storage {
val storage: Storage.Service
}
object Storage {
trait Service {
def listObjects(bucket: Bucket,
prefix: RemoteKey): RIO[Storage, RemoteObjects]
def upload(localFile: LocalFile,
bucket: Bucket,
listenerSettings: UploadEventListener.Settings,
): ZIO[Storage, Nothing, StorageEvent]
def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent]
def delete(bucket: Bucket, remoteKey: RemoteKey): UIO[StorageEvent]
def shutdown: UIO[StorageEvent]
}
trait Test extends Storage {
def listResult: Task[RemoteObjects] =
Task.die(new NotImplementedError)
def uploadResult: UIO[StorageEvent] =
Task.die(new NotImplementedError)
def copyResult: UIO[StorageEvent] =
Task.die(new NotImplementedError)
def deleteResult: UIO[StorageEvent] =
Task.die(new NotImplementedError)
def shutdownResult: UIO[StorageEvent] =
Task.die(new NotImplementedError)
val storage: Service = new Service {
override def listObjects(bucket: Bucket,
prefix: RemoteKey): RIO[Storage, RemoteObjects] =
listResult
override def upload(
localFile: LocalFile,
bucket: Bucket,
listenerSettings: UploadEventListener.Settings
): ZIO[Storage, Nothing, StorageEvent] =
uploadResult
override def copy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageEvent] =
copyResult
override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageEvent] =
deleteResult
override def shutdown: UIO[StorageEvent] =
shutdownResult
}
}
object Test extends Test
final def list(bucket: Bucket,
prefix: RemoteKey): RIO[Storage, RemoteObjects] =
ZIO.accessM(_.storage listObjects (bucket, prefix))
final def upload(
localFile: LocalFile,
bucket: Bucket,
listenerSettings: UploadEventListener.Settings
): ZIO[Storage, Nothing, StorageEvent] =
ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings))
final def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] =
ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey))
final def delete(bucket: Bucket,
remoteKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] =
ZIO.accessM(_.storage delete (bucket, remoteKey))
}

View file

@ -12,6 +12,13 @@
<name>uishell</name> <name>uishell</name>
<dependencies> <dependencies>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- thorp --> <!-- thorp -->
<dependency> <dependency>
<groupId>net.kemitix.thorp</groupId> <groupId>net.kemitix.thorp</groupId>

View file

@ -0,0 +1,98 @@
package net.kemitix.thorp.uishell;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import net.kemitix.thorp.config.Configuration;
import net.kemitix.thorp.console.Console;
import net.kemitix.thorp.domain.LocalFile;
import net.kemitix.thorp.domain.RemoteKey;
import net.kemitix.thorp.domain.StringUtil;
import net.kemitix.thorp.domain.Terminal;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish;
import static net.kemitix.thorp.domain.Terminal.progressBar;
public class ProgressUI {
static UploadState uploadState(long transferred, long fileLength) {
return new UploadState(transferred, fileLength);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
static class UploadState {
public final long transferred;
public final long fileLength;
}
private static final AtomicReference<Map<RemoteKey, UploadState>> uploads =
new AtomicReference<>(Collections.emptyMap());
private static final int statusHeight = 2;
public static void requestCycle(
Configuration configuration,
LocalFile localFile,
long bytesTransferred,
int index,
long totalBytesSoFar
) {
if (bytesTransferred < localFile.length) {
stillUploading(
localFile.remoteKey,
localFile.length,
bytesTransferred
);
} else {
finishedUploading(localFile.remoteKey);
}
}
static void stillUploading(RemoteKey remoteKey,
long fileLength,
long bytesTransferred) {
Map<RemoteKey, UploadState> current =
uploads.updateAndGet(map -> {
HashMap<RemoteKey, UploadState> updated = new HashMap<>(map);
updated.put(
remoteKey,
uploadState(bytesTransferred, fileLength));
return updated;
});
String resetCursor = StringUtil.repeat(
Terminal.cursorPrevLine(statusHeight), current.size());
current.forEach((key, state) -> {
String percent = String.format("%2d", (state.transferred * 100) / state.fileLength);
String transferred = sizeInEnglish(state.transferred);
String fileLength1 = sizeInEnglish(state.fileLength);
String line1 = String.format("%sUploading%s: %s:%s",
Terminal.green, Terminal.reset,
key.key(),
Terminal.eraseLineForward);
String line2body = String.format(
"%s%% %s or %s",
percent, transferred, fileLength1);
String bar = progressBar(
state.transferred,
state.fileLength,
Terminal.width() - line2body.length());
String line2 = String.join("",
Terminal.green, line2body, Terminal.reset,
bar, Terminal.eraseLineForward);
Console.putStrLn(line1);
Console.putStrLn(line2);
});
Console.putStr(resetCursor);
}
static void finishedUploading(RemoteKey remoteKey) {
uploads.updateAndGet(map -> {
Map<RemoteKey, UploadState> updated = new HashMap<>(map);
updated.remove(remoteKey);
return updated;
});
}
}

View file

@ -0,0 +1,105 @@
package net.kemitix.thorp.uishell;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import net.kemitix.thorp.domain.*;
public interface UIEvent {
static UIEvent showValidConfig() {
return new ShowValidConfig();
}
class ShowValidConfig implements UIEvent { }
static UIEvent remoteDataFetched(int size) {
return new RemoteDataFetched(size);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class RemoteDataFetched implements UIEvent {
public final int size;
}
static UIEvent showSummary(Counters counters) {
return new ShowSummary(counters);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class ShowSummary implements UIEvent {
public final Counters counters;
}
static UIEvent fileFound(LocalFile localFile) {
return new FileFound(localFile);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class FileFound implements UIEvent {
public final LocalFile localFile;
}
static UIEvent actionChosen(Action action) {
return new ActionChosen(action);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class ActionChosen implements UIEvent {
public final Action action;
}
/**
* The content of the file ({{hash}}) that will be placed
* at {{remoteKey}} is already being uploaded to another
* location. Once that upload has completed, its RemoteKey
* will become available and a Copy action can be made.
* @param remoteKey where this upload will copy the other to
* @param hash the hash of the file being uploaded
*/
static UIEvent awaitingAnotherUpload(RemoteKey remoteKey, MD5Hash hash) {
return new AwaitingAnotherUpload(remoteKey, hash);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class AwaitingAnotherUpload implements UIEvent {
public final RemoteKey remoteKey;
public final MD5Hash hash;
}
static UIEvent anotherUploadWaitComplete(Action action) {
return new AnotherUploadWaitComplete(action);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class AnotherUploadWaitComplete implements UIEvent {
public final Action action;
}
static UIEvent actionFinished(Action action,
int actionCounter,
long bytesCounter,
StorageEvent event) {
return new ActionFinished(action, actionCounter, bytesCounter, event);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class ActionFinished implements UIEvent {
public final Action action;
public final int actionCounter;
public final long bytesCounter;
public final StorageEvent event;
}
static UIEvent keyFound(RemoteKey remoteKey) {
return new KeyFound(remoteKey);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class KeyFound implements UIEvent {
public final RemoteKey remoteKey;
}
static UIEvent requestCycle(LocalFile localFile,
long bytesTransfered,
int index,
long totalBytesSoFar) {
return new RequestCycle(localFile, bytesTransfered, index, totalBytesSoFar);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class RequestCycle implements UIEvent {
public final LocalFile localFile;
public final long bytesTransferred;
public final int index;
public final long totalBytesSoFar;
}
}

View file

@ -0,0 +1,35 @@
package net.kemitix.thorp.uishell;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
public interface UploadProgressEvent {
static UploadProgressEvent transferEvent(String name) {
return new TransferEvent(name);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class TransferEvent implements UploadProgressEvent {
public final String name;
}
static UploadProgressEvent requestEvent(String name,
long bytes,
long transferred) {
return new RequestEvent(name, bytes, transferred);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class RequestEvent implements UploadProgressEvent {
public final String name;
public final long bytes;
public final long transferred;
}
static UploadProgressEvent bytesTransferEvent(String name) {
return new TransferEvent(name);
}
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class ByteTransferEvent implements UploadProgressEvent {
public final String name;
}
}

View file

@ -1,19 +0,0 @@
package net.kemitix.thorp.uishell
import net.kemitix.eip.zio.MessageChannel
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.filesystem.FileSystem
import zio.clock.Clock
sealed trait ProgressEvent
object ProgressEvent {
type Env = Console
type ProgressSender =
MessageChannel.ESender[Clock with FileSystem, Throwable, ProgressEvent]
type ProgressReceiver =
MessageChannel.Receiver[ProgressEvent.Env, ProgressEvent]
type ProgressChannel = MessageChannel.Channel[Console, ProgressEvent]
final case class PingEvent() extends ProgressEvent
}

View file

@ -1,80 +0,0 @@
package net.kemitix.thorp.uishell
import java.util.concurrent.atomic.AtomicReference
import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish
import net.kemitix.thorp.domain.Terminal.{eraseLineForward, progressBar}
import net.kemitix.thorp.domain.{LocalFile, RemoteKey, Terminal}
import zio.{UIO, ZIO}
import scala.io.AnsiColor.{GREEN, RESET}
object ProgressUI {
private case class UploadState(transferred: Long, fileLength: Long)
private val uploads: AtomicReference[Map[RemoteKey, UploadState]] =
new AtomicReference[Map[RemoteKey, UploadState]](Map.empty)
private val statusHeight = 2
def requestCycle(configuration: Configuration,
localFile: LocalFile,
bytesTransferred: Long,
index: Int,
totalBytesSoFar: Long): UIO[Unit] =
for {
_ <- ZIO.when(bytesTransferred < localFile.file.length())(
stillUploading(
localFile.remoteKey,
localFile.file.length(),
bytesTransferred
)
)
_ <- ZIO.when(bytesTransferred >= localFile.file.length()) {
finishedUploading(localFile.remoteKey)
}
} yield ()
private def stillUploading(remoteKey: RemoteKey,
fileLength: Long,
bytesTransferred: Long): UIO[Unit] = {
val current: Map[RemoteKey, UploadState] =
uploads.updateAndGet(
(m: Map[RemoteKey, UploadState]) =>
m.updated(remoteKey, UploadState(bytesTransferred, fileLength))
)
val resetCursor = s"${Terminal.cursorPrevLine(statusHeight) * current.size}"
ZIO.foreach(current) { entry =>
{
val (remoteKey, state) = entry
val percent = f"${(state.transferred * 100) / state.fileLength}%2d"
val transferred = sizeInEnglish(state.transferred)
val fileLength = sizeInEnglish(state.fileLength)
val line1 =
s"${GREEN}Uploading:$RESET ${remoteKey.key}$eraseLineForward"
val line2body = s"($percent%) $transferred of $fileLength "
val bar =
progressBar(
state.transferred.toDouble,
state.fileLength.toDouble,
Terminal.width - line2body.length
)
val line2 = s"$GREEN$line2body$RESET$bar$eraseLineForward"
UIO(Console.putStrLn(line1)) *>
UIO(Console.putStrLn(line2))
}
} *> UIO(Console.putStr(resetCursor))
}
def finishedUploading(remoteKey: RemoteKey): ZIO[Any, Nothing, Unit] = {
UIO(
uploads
.updateAndGet((m: Map[RemoteKey, UploadState]) => m.removed(remoteKey))
) *> UIO.unit
}
}

View file

@ -1,44 +0,0 @@
package net.kemitix.thorp.uishell
import net.kemitix.thorp.domain._
sealed trait UIEvent
object UIEvent {
case object ShowValidConfig extends UIEvent
case class RemoteDataFetched(size: Int) extends UIEvent
case class ShowSummary(counters: Counters) extends UIEvent
case class FileFound(localFile: LocalFile) extends UIEvent
case class ActionChosen(action: Action) extends UIEvent
/**
* The content of the file ({{hash}}) that will be placed
* at {{remoteKey}} is already being uploaded to another
* location. Once that upload has completed, its RemoteKey
* will become available and a Copy action can be made.
* @param remoteKey where this upload will copy the other to
* @param hash the hash of the file being uploaded
*/
case class AwaitingAnotherUpload(remoteKey: RemoteKey, hash: MD5Hash)
extends UIEvent
case class AnotherUploadWaitComplete(action: Action) extends UIEvent
case class ActionFinished(action: Action,
actionCounter: Int,
bytesCounter: Long,
event: StorageEvent)
extends UIEvent
case class KeyFound(remoteKey: RemoteKey) extends UIEvent
case class RequestCycle(localFile: LocalFile,
bytesTransferred: Long,
index: Int,
totalBytesSoFar: Long)
extends UIEvent
}

View file

@ -14,31 +14,28 @@ object UIShell {
): UIO[MessageChannel.UReceiver[Any, UIEvent]] = ): UIO[MessageChannel.UReceiver[Any, UIEvent]] =
UIO { uiEventMessage => UIO { uiEventMessage =>
uiEventMessage.body match { uiEventMessage.body match {
case UIEvent.ShowValidConfig => showValidConfig(configuration) case _: UIEvent.ShowValidConfig => showValidConfig(configuration)
case UIEvent.RemoteDataFetched(size) => remoteDataFetched(size) case uie: UIEvent.RemoteDataFetched => remoteDataFetched(uie.size)
case UIEvent.ShowSummary(counters) => showSummary(counters) case uie: UIEvent.ShowSummary => showSummary(uie.counters)
case UIEvent.FileFound(localFile) => fileFound(configuration, localFile) case uie: UIEvent.FileFound => fileFound(configuration, uie.localFile)
case UIEvent.ActionChosen(action) => actionChosen(configuration, action) case uie: UIEvent.ActionChosen =>
case UIEvent.AwaitingAnotherUpload(remoteKey, hash) => actionChosen(configuration, uie.action)
awaitingUpload(remoteKey, hash) case uie: UIEvent.AwaitingAnotherUpload =>
case UIEvent.AnotherUploadWaitComplete(action) => awaitingUpload(uie.remoteKey, uie.hash)
uploadWaitComplete(action) case uie: UIEvent.AnotherUploadWaitComplete =>
case UIEvent.ActionFinished(_, _, _, event) => uploadWaitComplete(uie.action)
actionFinished(configuration, event) case uie: UIEvent.ActionFinished =>
case UIEvent.KeyFound(_) => UIO(()) actionFinished(configuration, uie.event)
case UIEvent.RequestCycle( case _: UIEvent.KeyFound => UIO.unit
localFile, case uie: UIEvent.RequestCycle =>
bytesTransferred,
index,
totalBytesSoFar
) =>
ProgressUI.requestCycle( ProgressUI.requestCycle(
configuration, configuration,
localFile, uie.localFile,
bytesTransferred, uie.bytesTransferred,
index, uie.index,
totalBytesSoFar uie.totalBytesSoFar
) )
UIO.unit
} }
} }
@ -61,6 +58,7 @@ object UIShell {
Console Console
.putMessageLnB(ConsoleOut.uploadComplete(remoteKey), batchMode) .putMessageLnB(ConsoleOut.uploadComplete(remoteKey), batchMode)
ProgressUI.finishedUploading(remoteKey) ProgressUI.finishedUploading(remoteKey)
UIO.unit
case deleteEvent: StorageEvent.DeleteEvent => case deleteEvent: StorageEvent.DeleteEvent =>
val remoteKey = deleteEvent.remoteKey val remoteKey = deleteEvent.remoteKey
Console.putMessageLnB(ConsoleOut.deleteComplete(remoteKey), batchMode) Console.putMessageLnB(ConsoleOut.deleteComplete(remoteKey), batchMode)
@ -69,7 +67,7 @@ object UIShell {
val remoteKey = errorEvent.remoteKey val remoteKey = errorEvent.remoteKey
val action = errorEvent.action val action = errorEvent.action
val e = errorEvent.e val e = errorEvent.e
ProgressUI.finishedUploading(remoteKey) *> ProgressUI.finishedUploading(remoteKey)
UIO( UIO(
Console.putMessageLnB( Console.putMessageLnB(
ConsoleOut.errorQueueEventOccurred(action, e), ConsoleOut.errorQueueEventOccurred(action, e),

View file

@ -9,13 +9,11 @@ import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent
object UploadEventListener { object UploadEventListener {
final case class Settings( final case class Settings(uiChannel: UChannel[Any, UIEvent],
uiChannel: UChannel[Any, UIEvent],
localFile: LocalFile, localFile: LocalFile,
index: Int, index: Int,
totalBytesSoFar: Long, totalBytesSoFar: Long,
batchMode: Boolean batchMode: Boolean)
)
def listener(settings: Settings): UploadProgressEvent => Unit = { def listener(settings: Settings): UploadProgressEvent => Unit = {
val bytesTransferred = new AtomicLong(0L) val bytesTransferred = new AtomicLong(0L)
@ -25,10 +23,14 @@ object UploadEventListener {
case e: RequestEvent => case e: RequestEvent =>
settings.uiChannel( settings.uiChannel(
Message.withBody( Message.withBody(
UIEvent.RequestCycle(settings.localFile, UIEvent.requestCycle(
settings.localFile,
bytesTransferred.addAndGet(e.transferred), bytesTransferred.addAndGet(e.transferred),
settings.index, settings.index,
settings.totalBytesSoFar))) settings.totalBytesSoFar
)
)
)
case _ => () case _ => ()
} }
} }

View file

@ -1,23 +0,0 @@
package net.kemitix.thorp.uishell
sealed trait UploadProgressEvent {
def name: String
}
object UploadProgressEvent {
final case class TransferEvent(
name: String
) extends UploadProgressEvent
final case class RequestEvent(
name: String,
bytes: Long,
transferred: Long
) extends UploadProgressEvent
final case class ByteTransferEvent(
name: String
) extends UploadProgressEvent
}