Remove ZIO (#479)

* [BROKEN]Remove ZIO and EIP-ZIO

* [BROKEN]uishell.UIShell.receiver: implement

* [BROKEN]domain.Channel: new implementation replacing MessageChannel

* [BROKEN] use domain.Channel

* Shutdown filescanner channel when finished

* start the uiChannel

* domain.Channel: channel completes once shutdown and queue empty

* uishell: down use eraseLineForward in batch mode

* lib: set file runner before adding listener

* uishell: don’t log do nothing events when not in batch mode

* domain.Channel: if exception in child thread then shutdown channel

* uishell: use correct line endings for showing chosen actions

* domain.Channel: don’t wait for shutdown if not running

* domain: remove legacy MessageChannel

* domain.Channel: don’t hold thread array

* lib.LocalFileSystem: restore delete scanner

* lib.LocalFileSystem: shutdown deletetion channel

* domain.Channel: improved shutdown logic

* clean up
This commit is contained in:
Paul Campbell 2020-06-24 22:39:16 +01:00 committed by GitHub
parent ec4ed099a5
commit a4bd24ebce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 981 additions and 1153 deletions

View file

@ -51,12 +51,6 @@
<groupId>org.scala-lang</groupId> <groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId> <artifactId>scala-library</artifactId>
</dependency> </dependency>
<!-- eip-zio -->
<dependency>
<groupId>net.kemitix</groupId>
<artifactId>eip-zio_2.13</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

View file

@ -1,17 +1,8 @@
package net.kemitix.thorp package net.kemitix.thorp
import net.kemitix.thorp.lib.FileScanner object Main {
import zio.clock.Clock
import zio.{App, ZEnv, ZIO}
object Main extends App { def main(args: Array[String]): Unit =
Program.run(args.toList)
object LiveThorpApp extends Clock.Live with FileScanner.Live
override def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
Program
.run(args)
.provide(LiveThorpApp)
.fold(_ => 1, _ => 0)
} }

View file

@ -1,7 +1,5 @@
package net.kemitix.thorp package net.kemitix.thorp
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.eip.zio.{Message, MessageChannel}
import net.kemitix.thorp.cli.CliArgs import net.kemitix.thorp.cli.CliArgs
import net.kemitix.thorp.config._ import net.kemitix.thorp.config._
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
@ -11,12 +9,10 @@ import net.kemitix.thorp.domain.StorageEvent.{
ErrorEvent, ErrorEvent,
UploadEvent UploadEvent
} }
import net.kemitix.thorp.domain.{Counters, RemoteObjects, StorageEvent} import net.kemitix.thorp.domain.{Channel, Counters, StorageEvent}
import net.kemitix.thorp.lib._ import net.kemitix.thorp.lib.{LocalFileSystem, UnversionedMirrorArchive}
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.{UIEvent, UIShell} import net.kemitix.thorp.uishell.{UIEvent, UIShell}
import zio.clock.Clock
import zio.{IO, RIO, UIO, ZIO}
import scala.io.AnsiColor.{RESET, WHITE} import scala.io.AnsiColor.{RESET, WHITE}
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -26,91 +22,68 @@ trait Program {
val version = "0.11.0" val version = "0.11.0"
lazy val versionLabel = s"${WHITE}Thorp v$version$RESET" lazy val versionLabel = s"${WHITE}Thorp v$version$RESET"
def run(args: List[String]): ZIO[Clock with FileScanner, Nothing, Unit] = { def run(args: List[String]): Unit = {
(for { val cli = CliArgs.parse(args.toArray)
cli <- UIO(CliArgs.parse(args.toArray)) val config = ConfigurationBuilder.buildConfig(cli)
config <- IO(ConfigurationBuilder.buildConfig(cli)) Console.putStrLn(versionLabel)
_ <- UIO(Console.putStrLn(versionLabel)) if (!showVersion(cli)) {
_ <- ZIO.when(!showVersion(cli))( executeWithUI(config)
executeWithUI(config).catchAll(handleErrors) }
)
} yield ())
.catchAll(e => {
Console.putStrLn("An ERROR occurred:")
Console.putStrLn(e.getMessage)
UIO.unit
})
} }
private def showVersion: ConfigOptions => Boolean = private def showVersion: ConfigOptions => Boolean =
cli => ConfigQuery.showVersion(cli) cli => ConfigQuery.showVersion(cli)
private def executeWithUI( private def executeWithUI(configuration: Configuration): Unit = {
configuration: Configuration val uiChannel: Channel[UIEvent] = Channel.create("thorp-ui")
): ZIO[Clock with FileScanner, Throwable, Unit] = uiChannel.addListener(UIShell.receiver(configuration))
for { uiChannel.run(sink => execute(configuration, sink), "thorp-main")
uiEventSender <- execute(configuration) uiChannel.start()
uiEventReceiver <- UIShell.receiver(configuration) uiChannel.waitForShutdown()
_ <- MessageChannel.pointToPoint(uiEventSender)(uiEventReceiver).runDrain }
} yield ()
type UIChannel = UChannel[Any, UIEvent] private def execute(configuration: Configuration,
uiSink: Channel.Sink[UIEvent]) = {
showValidConfig(uiSink)
val remoteObjects =
fetchRemoteData(configuration, uiSink)
val archive = UnversionedMirrorArchive
val storageEvents = LocalFileSystem
.scanCopyUpload(configuration, uiSink, remoteObjects, archive)
val deleteEvents = LocalFileSystem
.scanDelete(configuration, uiSink, remoteObjects, archive)
showSummary(uiSink)(storageEvents ++ deleteEvents)
uiSink.shutdown();
}
private def execute( private def showValidConfig(uiSink: Channel.Sink[UIEvent]): Unit =
configuration: Configuration uiSink.accept(UIEvent.showValidConfig)
): UIO[MessageChannel.ESender[Clock with FileScanner, Throwable, UIEvent]] =
UIO { uiChannel =>
(for {
_ <- showValidConfig(uiChannel)
remoteData <- fetchRemoteData(configuration, uiChannel)
archive <- UIO(UnversionedMirrorArchive)
copyUploadEvents <- LocalFileSystem
.scanCopyUpload(configuration, uiChannel, remoteData, archive)
deleteEvents <- LocalFileSystem
.scanDelete(configuration, uiChannel, remoteData, archive)
_ <- showSummary(uiChannel)(copyUploadEvents ++ deleteEvents)
} yield ()) <* MessageChannel.endChannel(uiChannel)
}
private def showValidConfig(uiChannel: UIChannel) = private def fetchRemoteData(configuration: Configuration,
Message.create(UIEvent.showValidConfig) >>= MessageChannel.send(uiChannel) uiSink: Channel.Sink[UIEvent]) = {
private def fetchRemoteData(
configuration: Configuration,
uiChannel: UIChannel
): ZIO[Clock, Throwable, RemoteObjects] = {
val bucket = configuration.bucket val bucket = configuration.bucket
val prefix = configuration.prefix val prefix = configuration.prefix
val objects = Storage.getInstance().list(bucket, prefix) val objects = Storage.getInstance().list(bucket, prefix)
for { uiSink.accept(UIEvent.remoteDataFetched(objects.byKey.size))
_ <- Message.create(UIEvent.remoteDataFetched(objects.byKey.size)) >>= MessageChannel objects
.send(uiChannel)
} yield objects
} }
private def handleErrors(throwable: Throwable) = //TODO not called
UIO(Console.putStrLn("There were errors:")) *> logValidationErrors(
throwable
)
private def logValidationErrors(throwable: Throwable) = private def logValidationErrors(throwable: Throwable) =
throwable match { throwable match {
case validateError: ConfigValidationException => case validateError: ConfigValidationException =>
ZIO.foreach_(validateError.getErrors.asScala)( validateError.getErrors.asScala
error => UIO(Console.putStrLn(s"- $error")) .map(error => Console.putStrLn(s"- $error"))
)
} }
private def showSummary( private def showSummary(
uiChannel: UIChannel uiSink: Channel.Sink[UIEvent]
)(events: Seq[StorageEvent]): RIO[Clock, Unit] = { )(events: Seq[StorageEvent]): Unit = {
val counters = events.foldLeft(Counters.empty)(countActivities) val counters = events.foldLeft(Counters.empty)(countActivities)
Message.create(UIEvent.showSummary(counters)) >>= uiSink.accept(UIEvent.showSummary(counters))
MessageChannel.send(uiChannel)
} }
private def countActivities: (Counters, StorageEvent) => Counters = private def countActivities =
(counters: Counters, s3Action: StorageEvent) => { (counters: Counters, s3Action: StorageEvent) => {
s3Action match { s3Action match {
case _: UploadEvent => counters.incrementUploaded() case _: UploadEvent => counters.incrementUploaded()
@ -120,7 +93,6 @@ trait Program {
case _ => counters case _ => counters
} }
} }
} }
object Program extends Program object Program extends Program

View file

@ -3,7 +3,6 @@ package net.kemitix.thorp.config;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.List;
public interface ParseConfigFile { public interface ParseConfigFile {
static ConfigOptions parseFile(File file) throws IOException { static ConfigOptions parseFile(File file) throws IOException {

View file

@ -15,9 +15,8 @@ public interface SourceConfigLoader {
ConfigOptions.create( ConfigOptions.create(
sources.paths() sources.paths()
.stream() .stream()
.peek(path -> { .peek(path ->
System.out.println("Using source: " + path); System.out.println("Using source: " + path))
})
.map(ConfigOption::source) .map(ConfigOption::source)
.collect(Collectors.toList())); .collect(Collectors.toList()));
// add settings from each source as options // add settings from each source as options

Binary file not shown.

Before

Width:  |  Height:  |  Size: 233 KiB

After

Width:  |  Height:  |  Size: 219 KiB

View file

@ -0,0 +1,188 @@
package net.kemitix.thorp.domain;
import lombok.RequiredArgsConstructor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
public interface Channel<T> {
static <T> Channel<T> create(String name) {
return new ChannelImpl<T>(name);
}
void start();
Channel<T> add(T item);
Channel<T> addAll(Collection<T> items);
Channel<T> addListener(Listener<T> listener);
Channel<T> removeListener(Listener<T> listener);
Channel<T> run(Consumer<Sink<T>> program, String name);
void shutdown();
void shutdownNow() throws InterruptedException;
void waitForShutdown() throws InterruptedException;
class ChannelImpl<T> implements Channel<T> {
private final BlockingQueue<T> queue = new LinkedTransferQueue<>();
private final Runner<T> runner;
private final Thread thread;
public ChannelImpl(String name) {
runner = new Runner<T>(name, queue);
thread = new Thread(runner, name);
}
@Override
public void start() {
thread.start();
}
@Override
public Channel<T> add(T item) {
queue.add(item);
return this;
}
@Override
public Channel<T> addAll(Collection<T> items) {
queue.addAll(items);
return this;
}
@Override
public Channel<T> addListener(Listener<T> listener) {
runner.addListener(listener);
return this;
}
@Override
public Channel<T> removeListener(Listener<T> listener) {
runner.removeListener(listener);
return this;
}
@Override
public Channel<T> run(Consumer<Sink<T>> program, String name) {
return spawn(() -> program.accept(runner), name);
}
private Channel<T> spawn(Runnable runnable, String name) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
shutdown();
}
}
}, name);
thread.start();
return this;
}
@Override
public void shutdown() {
runner.shutdown();
}
@Override
public void shutdownNow() throws InterruptedException {
runner.shutdownNow();
}
@Override
public void waitForShutdown() throws InterruptedException {
runner.waitForShutdown();
}
}
@RequiredArgsConstructor
class Runner<T> implements Runnable, Sink<T> {
private final String name;
private final BlockingQueue<T> queue;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final List<Listener<T>> listeners = new ArrayList<>();
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private Thread runnerThread;
@Override
public void run() {
runnerThread = Thread.currentThread();
while(isRunning()) {
takeItem()
.ifPresent(item -> {
listeners.forEach(listener -> {
listener.accept(item);
});
});
}
shutdownLatch.countDown();
}
public void addListener(Listener<T> listener) {
listeners.add(listener);
}
public void removeListener(Listener<T> listener) {
listeners.remove(listener);
}
public Optional<T> takeItem() {
try {
return Optional.of(queue.take());
} catch (InterruptedException e) {
shutdown();
}
return Optional.empty();
}
private boolean isRunning() {
return !isShutdown();
}
private boolean isShutdown() {
return shutdown.get() && queue.isEmpty();
}
@Override
public void accept(T item) {
queue.add(item);
}
@Override
public void shutdown() {
if (isRunning()) {
shutdown.set(true);
}
if (queue.isEmpty() && runnerThread != null) {
runnerThread.interrupt();
}
}
public void shutdownNow() throws InterruptedException {
shutdown();
waitForShutdown();
}
public void waitForShutdown() throws InterruptedException {
if (isRunning())
shutdownLatch.await();
}
}
interface Listener<T> {
void accept(T item);
}
interface Sink<T> {
void accept(T item);
void shutdown();
}
}

View file

@ -1,71 +0,0 @@
package net.kemitix.thorp.domain;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class MessageChannel<T> {
private final MessageSupplier<T> messageSupplier;
private final List<MessageConsumer<T>> messageConsumers;
private final Thread channelThread;
static <T> MessageChannel<T> create(MessageSupplier<T> supplier) {
List<MessageConsumer<T>> consumers = new ArrayList<>();
return new MessageChannel<T>(supplier, consumers,
new Thread(new ChannelRunner<T>(supplier, consumers)));
}
public static <T> BlockingQueue<T> createMessageSupplier(Class<T> messageClass) {
return new LinkedTransferQueue<>();
}
public void addMessageConsumer(MessageConsumer<T> consumer) {
messageConsumers.add(consumer);
}
public void startChannel() {
channelThread.start();
}
public void shutdownChannel() {
channelThread.interrupt();
}
public interface MessageSupplier<T> {
T take() throws InterruptedException;
boolean isComplete();
}
public interface MessageConsumer<T> {
void accept(T message);
}
@RequiredArgsConstructor
private static class ChannelRunner<T> implements Runnable {
AtomicBoolean shutdownTrigger = new AtomicBoolean(false);
private final MessageSupplier<T> supplier;
private final List<MessageConsumer<T>> consumers;
@Override
public void run() {
while (!shutdownTrigger.get()) {
try {
T message = supplier.take();
for (MessageConsumer<T> consumer : consumers) {
consumer.accept(message);
}
if (supplier.isComplete()) {
shutdownTrigger.set(true);
}
} catch (InterruptedException e) {
shutdownTrigger.set(true);
}
}
}
}
}

View file

@ -16,6 +16,7 @@
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<optional>true</optional>
</dependency> </dependency>
<!-- thorp --> <!-- thorp -->

View file

@ -0,0 +1,80 @@
package net.kemitix.thorp.lib;
import net.kemitix.thorp.config.Configuration;
import net.kemitix.thorp.domain.*;
import net.kemitix.thorp.filesystem.FileSystem;
import net.kemitix.thorp.filesystem.PathCache;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.util.List;
public interface FileScanner {
static void scanSources(
Configuration configuration,
Channel.Sink<LocalFile> fileSink
) {
configuration.sources.paths()
.forEach(path ->
scanSource(configuration, fileSink, path));
fileSink.shutdown();
}
static void scanSource(
Configuration configuration,
Channel.Sink<LocalFile> fileSink,
Path sourcePath
) {
scanPath(configuration, fileSink, sourcePath);
}
static void scanPath(
Configuration configuration,
Channel.Sink<LocalFile> fileSink,
Path path
) {
// dirs
FileSystem.listDirs(path).forEach(dir ->
scanPath(configuration, fileSink, dir));
// files
List<File> files = FileSystem.listFiles(path);
files.forEach(file -> handleFile(configuration, fileSink, file));
}
static void handleFile(
Configuration configuration,
Channel.Sink<LocalFile> fileSink,
File file
) {
boolean isIncluded = Filters.isIncluded(configuration, file);
if (isIncluded) {
File source = configuration.sources.forPath(file.toPath()).toFile();
Hashes hashes = hashObject(file);
RemoteKey remoteKey =
RemoteKey.from(source.toPath(), configuration.prefix, file);
LocalFile localFile =
LocalFile.create(
file, source, hashes, remoteKey, file.length());
fileSink.accept(localFile);
}
}
static Hashes hashObject(File file) {
try {
return HashGenerator.hashObject(file.toPath());
} catch (IOException | NoSuchAlgorithmException e) {
throw new RuntimeException("Error hashing object: " + file, e);
}
}
static PathCache findCache(Path sourcePath) {
try {
return FileSystem.findCache(sourcePath);
} catch (IOException e) {
throw new RuntimeException(
"Error finding source cache for source: " + sourcePath, e);
}
}
}

View file

@ -1,160 +0,0 @@
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
import scala.jdk.CollectionConverters._
import net.kemitix.eip.zio.MessageChannel.{EChannel, ESender}
import net.kemitix.eip.zio.{Message, MessageChannel}
import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem._
import zio.clock.Clock
import zio.{RIO, UIO, ZIO}
trait FileScanner {
val fileScanner: FileScanner.Service
}
object FileScanner {
type RemoteHashes = Map[MD5Hash, RemoteKey]
type ScannedFile = LocalFile
type FileSender =
ESender[Clock with FileScanner, Throwable, ScannedFile]
type ScannerChannel = EChannel[Any, Throwable, ScannedFile]
type CacheData = (Path, FileData)
type CacheChannel = EChannel[Any, Throwable, CacheData]
type CacheSender =
ESender[Clock with FileScanner, Throwable, CacheData]
final def scanSources(
configuration: Configuration): RIO[FileScanner, FileSender] =
ZIO.accessM(_.fileScanner.scanSources(configuration))
trait Service {
def scanSources(configuration: Configuration): RIO[FileScanner, FileSender]
}
trait Live extends FileScanner {
val fileScanner: Service = new Service {
override def scanSources(
configuration: Configuration): RIO[FileScanner, FileSender] =
RIO {
fileChannel: EChannel[Clock with FileScanner,
Throwable,
ScannedFile] =>
{
val sources = configuration.sources
(for {
_ <- ZIO.foreach(sources.paths.asScala) { sourcePath =>
for {
cacheSender <- scanSource(configuration, fileChannel)(
sourcePath)
cacheReceiver <- cacheReceiver(sourcePath)
_ <- MessageChannel
.pointToPoint(cacheSender)(cacheReceiver)
.runDrain
_ = FileSystem.moveFile(
sourcePath.resolve(PathCache.tempFileName),
sourcePath.resolve(PathCache.fileName))
} yield ()
}
} yield ()) <* MessageChannel.endChannel(fileChannel)
}
}
private def scanSource(configuration: Configuration,
fileChannel: ScannerChannel)(
sourcePath: Path): RIO[FileScanner, CacheSender] =
RIO { cacheChannel =>
(for {
cache <- UIO(FileSystem.findCache(sourcePath))
_ <- scanPath(configuration, fileChannel, cacheChannel)(sourcePath,
cache)
} yield ()) <* MessageChannel.endChannel(cacheChannel)
}
private def scanPath(configuration: Configuration,
fileChannel: ScannerChannel,
cacheChannel: CacheChannel)(
path: Path,
cache: PathCache): ZIO[Clock with FileScanner, Throwable, Unit] =
for {
dirs <- UIO(FileSystem.listDirs(path))
_ <- ZIO.foreach(dirs.asScala)(
scanPath(configuration, fileChannel, cacheChannel)(_, cache))
files = FileSystem.listFiles(path).asScala.toList
_ <- handleFiles(configuration,
fileChannel,
cacheChannel,
cache,
files)
} yield ()
private def handleFiles(
configuration: Configuration,
fileChannel: ScannerChannel,
cacheChannel: CacheChannel,
pathCache: PathCache,
files: List[File]
): ZIO[Clock, Throwable, List[Unit]] =
ZIO.foreach(files) {
handleFile(configuration, fileChannel, cacheChannel, pathCache)
}
private def handleFile(
configuration: Configuration,
fileChannel: ScannerChannel,
cacheChannel: CacheChannel,
cache: PathCache
)(file: File): ZIO[Clock, Throwable, Unit] =
for {
isIncluded <- Filters.isIncluded(configuration, file)
_ <- ZIO.when(isIncluded) {
sendHashedFile(configuration, fileChannel, cacheChannel)(file,
cache)
}
} yield ()
private def sendHashedFile(
configuration: Configuration,
fileChannel: ScannerChannel,
cacheChannel: CacheChannel
)(file: File, pathCache: PathCache) = {
val sources = configuration.sources
val source = sources.forPath(file.toPath)
val prefix = configuration.prefix
val path = source.relativize(file.toPath)
val hashes = HashGenerator.hashObject(file.toPath)
val remoteKey = RemoteKey.from(source, prefix, file)
val size = file.length()
for {
fileMsg <- Message.create(
LocalFile.create(file, source.toFile, hashes, remoteKey, size))
_ <- MessageChannel.send(fileChannel)(fileMsg)
modified <- UIO(FileSystem.lastModified(file))
cacheMsg <- Message.create(
path -> FileData.create(hashes, LastModified.at(modified)))
_ <- MessageChannel.send(cacheChannel)(cacheMsg)
} yield ()
}
def cacheReceiver(
sourcePath: Path): UIO[MessageChannel.UReceiver[Any, CacheData]] = {
val tempFile = sourcePath.resolve(PathCache.tempFileName).toFile
UIO { message =>
val (path, fileData) = message.body
for {
line <- UIO(PathCache.export(path, fileData).asScala)
_ <- UIO(FileSystem.appendLines(line.toList.asJava, tempFile))
} yield ()
}
}
}
}
object Live extends Live
}

View file

@ -6,30 +6,31 @@ import java.nio.file.Path
import net.kemitix.thorp.config.Configuration import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.domain.Filter import net.kemitix.thorp.domain.Filter
import net.kemitix.thorp.domain.Filter.{Exclude, Include} import net.kemitix.thorp.domain.Filter.{Exclude, Include}
import zio.UIO
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
object Filters { object Filters {
def isIncluded(configuration: Configuration, file: File): UIO[Boolean] = def isIncluded(configuration: Configuration, file: File): Boolean =
UIO(isIncluded(file.toPath)(configuration.filters.asScala.toList)) isIncluded(file.toPath)(configuration.filters.asScala.toList)
def isIncluded(p: Path)(filters: List[Filter]): Boolean = { def isIncluded(p: Path)(filters: List[Filter]): Boolean = {
sealed trait State sealed trait State
final case class Unknown() extends State final case class Unknown() extends State
final case class Accepted() extends State final case class Accepted() extends State
final case class Discarded() extends State final case class Discarded() extends State
val excluded = isExcludedByFilter(p)(_) val excluded = isExcludedByFilter(p)(_)
val included = isIncludedByFilter(p)(_) val included = isIncludedByFilter(p)(_)
filters.foldRight(Unknown(): State)((filter, state) => filters.foldRight(Unknown(): State)(
(filter, state) match { (filter, state) =>
case (_, Accepted()) => Accepted() (filter, state) match {
case (_, Discarded()) => Discarded() case (_, Accepted()) => Accepted()
case (x: Exclude, _) if excluded(x) => Discarded() case (_, Discarded()) => Discarded()
case (i: Include, _) if included(i) => Accepted() case (x: Exclude, _) if excluded(x) => Discarded()
case _ => Unknown() case (i: Include, _) if included(i) => Accepted()
}) match { case _ => Unknown()
}
) match {
case Accepted() => true case Accepted() => true
case Discarded() => false case Discarded() => false
case Unknown() => case Unknown() =>

View file

@ -1,190 +1,179 @@
package net.kemitix.thorp.lib package net.kemitix.thorp.lib
import scala.jdk.OptionConverters._ import java.util
import scala.jdk.CollectionConverters._ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.eip.zio.{Message, MessageChannel}
import net.kemitix.thorp.config.Configuration import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.domain.RemoteObjects import net.kemitix.thorp.domain.Channel.{Listener, Sink}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain.{RemoteObjects, _}
import net.kemitix.thorp.filesystem.FileSystem import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.UIEvent import net.kemitix.thorp.uishell.UIEvent
import zio._
import zio.clock.Clock import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
trait LocalFileSystem { trait LocalFileSystem {
def scanCopyUpload( def scanCopyUpload(configuration: Configuration,
configuration: Configuration, uiSink: Channel.Sink[UIEvent],
uiChannel: UChannel[Any, UIEvent], remoteObjects: RemoteObjects,
remoteObjects: RemoteObjects, archive: ThorpArchive): Seq[StorageEvent]
archive: ThorpArchive
): RIO[Clock with FileScanner with Storage, Seq[StorageEvent]]
def scanDelete( def scanDelete(configuration: Configuration,
configuration: Configuration, uiSink: Channel.Sink[UIEvent],
uiChannel: UChannel[Any, UIEvent], remoteData: RemoteObjects,
remoteData: RemoteObjects, archive: ThorpArchive): Seq[StorageEvent]
archive: ThorpArchive
): RIO[Clock with Storage, Seq[StorageEvent]]
} }
object LocalFileSystem extends LocalFileSystem { object LocalFileSystem extends LocalFileSystem {
override def scanCopyUpload( override def scanCopyUpload(configuration: Configuration,
configuration: Configuration, uiSink: Channel.Sink[UIEvent],
uiChannel: UChannel[Any, UIEvent], remoteObjects: RemoteObjects,
remoteObjects: RemoteObjects, archive: ThorpArchive): Seq[StorageEvent] = {
archive: ThorpArchive
): RIO[Clock with FileScanner, Seq[StorageEvent]] = val fileChannel: Channel[LocalFile] = Channel.create("files")
for {
actionCounter <- Ref.make(0) // state for the file receiver
bytesCounter <- Ref.make(0L) val actionCounter = new AtomicInteger()
uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]]) val bytesCounter = new AtomicLong()
eventsRef <- Ref.make(List.empty[StorageEvent]) val uploads = Map.empty[MD5Hash, RemoteKey]
fileSender <- FileScanner.scanSources(configuration) val events = new util.LinkedList[StorageEvent]
fileReceiver <- fileReceiver(
fileChannel.addListener(
fileReceiver(
configuration, configuration,
uiChannel, uiSink,
remoteObjects, remoteObjects,
archive, archive,
uploads, uploads,
actionCounter, actionCounter,
bytesCounter, bytesCounter,
eventsRef events
) )
parallel = configuration.parallel )
_ <- MessageChannel
.pointToPointPar(parallel)(fileSender)(fileReceiver)
.runDrain
events <- eventsRef.get
} yield events
override def scanDelete( fileChannel.run(
configuration: Configuration, sink => FileScanner.scanSources(configuration, sink),
uiChannel: UChannel[Any, UIEvent], "scan-sources"
remoteData: RemoteObjects, )
archive: ThorpArchive
): RIO[Clock, Seq[StorageEvent]] = fileChannel.start()
for { fileChannel.waitForShutdown()
actionCounter <- Ref.make(0) events.asScala.toList
bytesCounter <- Ref.make(0L) }
eventsRef <- Ref.make(List.empty[StorageEvent])
keySender <- keySender(remoteData.byKey.keys.asScala) override def scanDelete(configuration: Configuration,
keyReceiver <- keyReceiver( uiSink: Channel.Sink[UIEvent],
remoteData: RemoteObjects,
archive: ThorpArchive): Seq[StorageEvent] = {
val deletionsChannel: Channel[RemoteKey] = Channel.create("deletions")
// state for the file receiver
val actionCounter = new AtomicInteger()
val bytesCounter = new AtomicLong()
val events = new util.LinkedList[StorageEvent]
deletionsChannel.addListener(
keyReceiver(
configuration, configuration,
uiChannel, uiSink,
archive, archive,
actionCounter, actionCounter,
bytesCounter, bytesCounter,
eventsRef events
) )
parallel = configuration.parallel )
_ <- MessageChannel
.pointToPointPar(parallel)(keySender)(keyReceiver) deletionsChannel.run(sink => {
.runDrain remoteData.byKey.keys().forEach(key => sink.accept(key))
events <- eventsRef.get sink.shutdown()
} yield events }, "delete-source")
deletionsChannel.start()
deletionsChannel.waitForShutdown()
events.asScala.toList
}
private def fileReceiver( private def fileReceiver(
configuration: Configuration, configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiSink: Channel.Sink[UIEvent],
remoteObjects: RemoteObjects, remoteObjects: RemoteObjects,
archive: ThorpArchive, archive: ThorpArchive,
uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], uploads: Map[MD5Hash, RemoteKey],
actionCounterRef: Ref[Int], actionCounter: AtomicInteger,
bytesCounterRef: Ref[Long], bytesCounter: AtomicLong,
eventsRef: Ref[List[StorageEvent]] events: util.Deque[StorageEvent]
): UIO[MessageChannel.UReceiver[Clock, FileScanner.ScannedFile]] = ): Listener[LocalFile] = { (localFile: LocalFile) =>
UIO { message => {
val localFile = message.body uiFileFound(uiSink)(localFile)
for { val action =
_ <- uiFileFound(uiChannel)(localFile) chooseAction(configuration, remoteObjects, uploads, uiSink)(localFile)
action <- chooseAction( actionCounter.incrementAndGet()
configuration, bytesCounter.addAndGet(action.size)
remoteObjects, uiActionChosen(uiSink)(action)
uploads, val sequencedAction = SequencedAction(action, actionCounter.get())
uiChannel val event = archive
)(localFile) .update(configuration, uiSink, sequencedAction, bytesCounter.get)
actionCounter <- actionCounterRef.update(_ + 1) events.addFirst(event)
bytesCounter <- bytesCounterRef.update(_ + action.size) uiActionFinished(uiSink)(
_ <- uiActionChosen(uiChannel)(action) action,
sequencedAction = SequencedAction(action, actionCounter) actionCounter.get,
event <- archive.update( bytesCounter.get,
configuration, event
uiChannel, )
sequencedAction,
bytesCounter
)
_ <- eventsRef.update(list => event :: list)
_ <- uiActionFinished(uiChannel)(
action,
actionCounter,
bytesCounter,
event
)
} yield ()
} }
}
private def uiActionChosen( private def uiActionChosen(
uiChannel: MessageChannel.UChannel[Any, UIEvent] uiSink: Channel.Sink[UIEvent]
)(action: Action) = )(action: Action): Unit =
Message.create(UIEvent.actionChosen(action)) >>= uiSink.accept(UIEvent.actionChosen(action))
MessageChannel.send(uiChannel)
private def uiActionFinished(uiChannel: UChannel[Any, UIEvent])( private def uiActionFinished(uiSink: Channel.Sink[UIEvent])(
action: Action, action: Action,
actionCounter: Int, actionCounter: Int,
bytesCounter: Long, bytesCounter: Long,
event: StorageEvent event: StorageEvent
) = ): Unit =
Message.create( uiSink.accept(
UIEvent.actionFinished(action, actionCounter, bytesCounter, event) UIEvent.actionFinished(action, actionCounter, bytesCounter, event)
) >>= )
MessageChannel.send(uiChannel)
private def uiFileFound( private def uiFileFound(
uiChannel: UChannel[Any, UIEvent] uiSink: Channel.Sink[UIEvent]
)(localFile: LocalFile) = )(localFile: LocalFile): Unit =
Message.create(UIEvent.fileFound(localFile)) >>= uiSink.accept(UIEvent.fileFound(localFile))
MessageChannel.send(uiChannel)
private def chooseAction( private def chooseAction(configuration: Configuration,
configuration: Configuration, remoteObjects: RemoteObjects,
remoteObjects: RemoteObjects, uploads: Map[MD5Hash, RemoteKey],
uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], uiSink: Channel.Sink[UIEvent],
uiChannel: UChannel[Any, UIEvent], )(localFile: LocalFile): Action = {
)(localFile: LocalFile): ZIO[Clock, Nothing, Action] = { val remoteExists = remoteObjects.remoteKeyExists(localFile.remoteKey)
for { val remoteMatches = remoteObjects.remoteMatchesLocalFile(localFile)
remoteExists <- UIO(remoteObjects.remoteKeyExists(localFile.remoteKey)) val remoteForHash = remoteObjects.remoteHasHash(localFile.hashes).toScala
remoteMatches <- UIO(remoteObjects.remoteMatchesLocalFile(localFile)) val previous = uploads
remoteForHash <- UIO( val bucket = configuration.bucket
remoteObjects.remoteHasHash(localFile.hashes).toScala val action = if (remoteExists && remoteMatches) {
) doNothing(localFile, bucket)
previous <- uploads.get } else {
bucket = configuration.bucket remoteForHash match {
action <- if (remoteExists && remoteMatches) case pair: Some[Tuple[RemoteKey, MD5Hash]] =>
doNothing(localFile, bucket) val sourceKey = pair.value.a
else { val hash = pair.value.b
remoteForHash match { doCopy(localFile, bucket, sourceKey, hash)
case pair: Some[Tuple[RemoteKey, MD5Hash]] => case _ if matchesPreviousUpload(previous, localFile.hashes) =>
val sourceKey = pair.value.a doCopyWithPreviousUpload(localFile, bucket, previous, uiSink)
val hash = pair.value.b case _ =>
doCopy(localFile, bucket, sourceKey, hash) doUpload(localFile, bucket)
case _ if matchesPreviousUpload(previous, localFile.hashes) =>
doCopyWithPreviousUpload(localFile, bucket, previous, uiChannel)
case _ =>
doUpload(localFile, bucket)
}
} }
} yield action }
action
} }
private def matchesPreviousUpload( private def matchesPreviousUpload(previous: Map[MD5Hash, RemoteKey],
previous: Map[MD5Hash, Promise[Throwable, RemoteKey]], hashes: Hashes) =
hashes: Hashes
): Boolean =
hashes hashes
.values() .values()
.stream() .stream()
@ -192,25 +181,21 @@ object LocalFileSystem extends LocalFileSystem {
previous.contains(hash) previous.contains(hash)
}) })
private def doNothing(localFile: LocalFile, bucket: Bucket): UIO[Action] = private def doNothing(localFile: LocalFile, bucket: Bucket) =
UIO { Action.doNothing(bucket, localFile.remoteKey, localFile.length)
Action.doNothing(bucket, localFile.remoteKey, localFile.length)
}
private def doCopy(localFile: LocalFile, private def doCopy(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash): UIO[Action] = UIO { hash: MD5Hash) =
Action Action
.toCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.length) .toCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.length)
}
private def doCopyWithPreviousUpload( private def doCopyWithPreviousUpload(localFile: LocalFile,
localFile: LocalFile, bucket: Bucket,
bucket: Bucket, previous: Map[MD5Hash, RemoteKey],
previous: Map[MD5Hash, Promise[Throwable, RemoteKey]], uiSink: Channel.Sink[UIEvent],
uiChannel: UChannel[Any, UIEvent], ) = {
): ZIO[Clock, Nothing, Action] = {
localFile.hashes localFile.hashes
.values() .values()
.stream() .stream()
@ -220,91 +205,58 @@ object LocalFileSystem extends LocalFileSystem {
.findFirst() .findFirst()
.toScala .toScala
.map({ hash => .map({ hash =>
for { {
awaitingMessage <- Message.create( uiSink
UIEvent.awaitingAnotherUpload(localFile.remoteKey, hash) .accept(UIEvent.awaitingAnotherUpload(localFile.remoteKey, hash))
val action = Action.toCopy(
bucket,
previous(hash),
hash,
localFile.remoteKey,
localFile.length
) )
_ <- MessageChannel.send(uiChannel)(awaitingMessage) uiSink.accept(UIEvent.anotherUploadWaitComplete(action))
action <- previous(hash).await.map( action
remoteKey => }
Action.toCopy(
bucket,
remoteKey,
hash,
localFile.remoteKey,
localFile.length
)
)
waitFinishedMessage <- Message.create(
UIEvent.anotherUploadWaitComplete(action)
)
_ <- MessageChannel.send(uiChannel)(waitFinishedMessage)
} yield action
}) })
.getOrElse(doUpload(localFile, bucket)) .getOrElse(doUpload(localFile, bucket))
.refineToOrDie[Nothing]
} }
private def doUpload(localFile: LocalFile, bucket: Bucket): UIO[Action] = { private def doUpload(localFile: LocalFile, bucket: Bucket) =
UIO(Action.toUpload(bucket, localFile, localFile.length)) Action.toUpload(bucket, localFile, localFile.length)
}
def keySender( def keyReceiver(configuration: Configuration,
keys: Iterable[RemoteKey] uiSink: Channel.Sink[UIEvent],
): UIO[MessageChannel.Sender[Clock, RemoteKey]] = archive: ThorpArchive,
UIO { channel => actionCounter: AtomicInteger,
ZIO.foreach(keys) { key => bytesCounter: AtomicLong,
Message.create(key) >>= MessageChannel.send(channel) events: util.Deque[StorageEvent]): Listener[RemoteKey] = {
} *> MessageChannel.endChannel(channel) (remoteKey: RemoteKey) =>
}
def keyReceiver(
configuration: Configuration,
uiChannel: UChannel[Any, UIEvent],
archive: ThorpArchive,
actionCounterRef: Ref[Int],
bytesCounterRef: Ref[Long],
eventsRef: Ref[List[StorageEvent]]
): UIO[MessageChannel.UReceiver[Clock, RemoteKey]] =
UIO { message =>
{ {
val remoteKey = message.body uiKeyFound(uiSink)(remoteKey)
for { val sources = configuration.sources
_ <- uiKeyFound(uiChannel)(remoteKey) val prefix = configuration.prefix
sources = configuration.sources val exists = FileSystem.hasLocalFile(sources, prefix, remoteKey)
prefix = configuration.prefix if (!exists) {
exists = FileSystem.hasLocalFile(sources, prefix, remoteKey) actionCounter.incrementAndGet()
_ <- ZIO.when(!exists) { val bucket = configuration.bucket
for { val action = Action.toDelete(bucket, remoteKey, 0L)
actionCounter <- actionCounterRef.update(_ + 1) uiActionChosen(uiSink)(action)
bucket = configuration.bucket bytesCounter.addAndGet(action.size)
action = Action.toDelete(bucket, remoteKey, 0L) val sequencedAction = SequencedAction(action, actionCounter.get())
_ <- uiActionChosen(uiChannel)(action) val event = archive.update(configuration, uiSink, sequencedAction, 0L)
bytesCounter <- bytesCounterRef.update(_ + action.size) events.addFirst(event)
sequencedAction = SequencedAction(action, actionCounter) uiActionFinished(uiSink)(
event <- archive.update( action,
configuration, actionCounter.get(),
uiChannel, bytesCounter.get(),
sequencedAction, event
0L )
) }
_ <- eventsRef.update(list => event :: list)
_ <- uiActionFinished(uiChannel)(
action,
actionCounter,
bytesCounter,
event
)
} yield ()
}
} yield ()
} }
} }
private def uiKeyFound( private def uiKeyFound(uiSink: Sink[UIEvent])(remoteKey: RemoteKey): Unit =
uiChannel: UChannel[Any, UIEvent] uiSink.accept(UIEvent.keyFound(remoteKey))
)(remoteKey: RemoteKey) =
Message.create(UIEvent.keyFound(remoteKey)) >>=
MessageChannel.send(uiChannel)
} }

View file

@ -1,66 +1,44 @@
package net.kemitix.thorp.lib package net.kemitix.thorp.lib
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.config.Configuration import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageEvent
import net.kemitix.thorp.domain.StorageEvent._ import net.kemitix.thorp.domain.StorageEvent._
import net.kemitix.thorp.domain.{Channel, StorageEvent}
import net.kemitix.thorp.uishell.UIEvent import net.kemitix.thorp.uishell.UIEvent
import zio.UIO
trait ThorpArchive { trait ThorpArchive {
def update(configuration: Configuration, def update(configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiSink: Channel.Sink[UIEvent],
sequencedAction: SequencedAction, sequencedAction: SequencedAction,
totalBytesSoFar: Long): UIO[StorageEvent] totalBytesSoFar: Long): StorageEvent
def logEvent(configuration: Configuration, def logEvent(configuration: Configuration,
event: StorageEvent): UIO[StorageEvent] = { event: StorageEvent): StorageEvent = {
val batchMode = configuration.batchMode val batchMode = configuration.batchMode
for { event match {
sqe <- event match { case uploadEvent: UploadEvent =>
case uploadEvent: UploadEvent => val remoteKey = uploadEvent.remoteKey
val remoteKey = uploadEvent.remoteKey Console.putMessageLnB(ConsoleOut.uploadComplete(remoteKey), batchMode)
UIO(event) <* { case copyEvent: CopyEvent =>
Console.putMessageLnB( val sourceKey = copyEvent.sourceKey
ConsoleOut.uploadComplete(remoteKey), val targetKey = copyEvent.targetKey
batchMode Console.putMessageLnB(
) ConsoleOut.copyComplete(sourceKey, targetKey),
UIO.unit batchMode
} )
case copyEvent: CopyEvent => case deleteEvent: DeleteEvent =>
val sourceKey = copyEvent.sourceKey val remoteKey = deleteEvent.remoteKey
val targetKey = copyEvent.targetKey Console.putMessageLnB(ConsoleOut.deleteComplete(remoteKey), batchMode)
UIO(event) <* { case errorEvent: ErrorEvent =>
Console.putMessageLnB( val action = errorEvent.action
ConsoleOut.copyComplete(sourceKey, targetKey), val e = errorEvent.e
batchMode Console.putMessageLnB(
) ConsoleOut.errorQueueEventOccurred(action, e),
UIO.unit batchMode
} )
case deleteEvent: DeleteEvent => }
val remoteKey = deleteEvent.remoteKey event
UIO(event) <* {
Console.putMessageLnB(
ConsoleOut.deleteComplete(remoteKey),
batchMode
)
UIO.unit
}
case errorEvent: ErrorEvent =>
val action = errorEvent.action
val e = errorEvent.e
UIO(event) <* {
Console.putMessageLnB(
ConsoleOut.errorQueueEventOccurred(action, e),
batchMode
)
UIO.unit
}
case _ => UIO(event)
}
} yield sqe
} }
} }

View file

@ -1,59 +1,49 @@
package net.kemitix.thorp.lib package net.kemitix.thorp.lib
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.config.Configuration import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.domain.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener} import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener}
import zio.UIO
trait UnversionedMirrorArchive extends ThorpArchive { trait UnversionedMirrorArchive extends ThorpArchive {
override def update(configuration: Configuration, override def update(configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiSink: Channel.Sink[UIEvent],
sequencedAction: SequencedAction, sequencedAction: SequencedAction,
totalBytesSoFar: Long): UIO[StorageEvent] = { totalBytesSoFar: Long): StorageEvent = {
val action = sequencedAction.action val action = sequencedAction.action
val index = sequencedAction.index val index = sequencedAction.index
val bucket = action.bucket val bucket = action.bucket
action match { action match {
case upload: ToUpload => case upload: ToUpload =>
val localFile = upload.localFile val localFile = upload.localFile
UIO { doUpload(
doUpload( configuration,
configuration, uiSink,
uiChannel, index,
index, totalBytesSoFar,
totalBytesSoFar, bucket,
bucket, localFile
localFile )
)
}
case toCopy: ToCopy => case toCopy: ToCopy =>
val sourceKey = toCopy.sourceKey val sourceKey = toCopy.sourceKey
val hash = toCopy.hash val hash = toCopy.hash
val targetKey = toCopy.targetKey val targetKey = toCopy.targetKey
UIO { Storage
Storage .getInstance()
.getInstance() .copy(bucket, sourceKey, hash, targetKey)
.copy(bucket, sourceKey, hash, targetKey)
}
case toDelete: ToDelete => case toDelete: ToDelete =>
val remoteKey = toDelete.remoteKey val remoteKey = toDelete.remoteKey
UIO { Storage.getInstance().delete(bucket, remoteKey)
Storage.getInstance().delete(bucket, remoteKey)
}
case doNothing: Action.DoNothing => case doNothing: Action.DoNothing =>
val remoteKey = doNothing.remoteKey val remoteKey = doNothing.remoteKey
UIO { StorageEvent.doNothingEvent(remoteKey)
StorageEvent.doNothingEvent(remoteKey)
}
} }
} }
private def doUpload(configuration: Configuration, private def doUpload(configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiSink: Channel.Sink[UIEvent],
index: Int, index: Int,
totalBytesSoFar: Long, totalBytesSoFar: Long,
bucket: Bucket, bucket: Bucket,
@ -65,7 +55,7 @@ trait UnversionedMirrorArchive extends ThorpArchive {
bucket, bucket,
listenerSettings( listenerSettings(
configuration, configuration,
uiChannel, uiSink,
index, index,
totalBytesSoFar, totalBytesSoFar,
bucket, bucket,
@ -74,13 +64,13 @@ trait UnversionedMirrorArchive extends ThorpArchive {
) )
private def listenerSettings(configuration: Configuration, private def listenerSettings(configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiSink: Channel.Sink[UIEvent],
index: Int, index: Int,
totalBytesSoFar: Long, totalBytesSoFar: Long,
bucket: Bucket, bucket: Bucket,
localFile: LocalFile) = localFile: LocalFile) =
UploadEventListener.Settings( UploadEventListener.Settings(
uiChannel, uiSink,
localFile, localFile,
index, index,
totalBytesSoFar, totalBytesSoFar,

View file

@ -1,61 +1,46 @@
package net.kemitix.thorp.lib package net.kemitix.thorp.lib
import java.util.concurrent.atomic.AtomicReference
import scala.jdk.CollectionConverters._
import net.kemitix.eip.zio.MessageChannel
import net.kemitix.thorp.config.{
ConfigOption,
ConfigOptions,
ConfigurationBuilder
}
import net.kemitix.thorp.domain.RemoteKey
import net.kemitix.thorp.filesystem.Resource
import net.kemitix.thorp.lib.FileScanner.ScannedFile
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.clock.Clock
import zio.{DefaultRuntime, Ref, UIO, ZIO}
class FileScannerTest extends FreeSpec { class FileScannerTest extends FreeSpec {
"scanSources" - { // "scanSources" - {
"creates a FileSender for files in resources" in { // "creates a FileSender for files in resources" in {
def receiver(scanned: Ref[List[RemoteKey]]) // def receiver(scanned: Ref[List[RemoteKey]])
: UIO[MessageChannel.UReceiver[Any, ScannedFile]] = UIO { message => // : UIO[MessageChannel.UReceiver[Any, ScannedFile]] = UIO { message =>
for { // for {
_ <- scanned.update(l => message.body.remoteKey :: l) // _ <- scanned.update(l => message.body.remoteKey :: l)
} yield () // } yield ()
} // }
val scannedFiles = // val scannedFiles =
new AtomicReference[List[RemoteKey]](List.empty) // new AtomicReference[List[RemoteKey]](List.empty)
val sourcePath = Resource.select(this, "upload").toPath // val sourcePath = Resource.select(this, "upload").toPath
val configOptions: List[ConfigOption] = // val configOptions: List[ConfigOption] =
List[ConfigOption](ConfigOption.source(sourcePath), // List[ConfigOption](ConfigOption.source(sourcePath),
ConfigOption.bucket("bucket"), // ConfigOption.bucket("bucket"),
ConfigOption.ignoreGlobalOptions(), // ConfigOption.ignoreGlobalOptions(),
ConfigOption.ignoreUserOptions()) // ConfigOption.ignoreUserOptions())
val program: ZIO[Clock with FileScanner, Throwable, Unit] = { // val program: ZIO[Clock with FileScanner, Throwable, Unit] = {
val configuration = ConfigurationBuilder.buildConfig( // val configuration = ConfigurationBuilder.buildConfig(
ConfigOptions.create(configOptions.asJava)) // ConfigOptions.create(configOptions.asJava))
for { // for {
scanner <- FileScanner.scanSources(configuration) // scanner <- FileScanner.scanSources(configuration)
scannedRef <- Ref.make[List[RemoteKey]](List.empty) // scannedRef <- Ref.make[List[RemoteKey]](List.empty)
receiver <- receiver(scannedRef) // receiver <- receiver(scannedRef)
_ <- MessageChannel.pointToPoint(scanner)(receiver).runDrain // _ <- MessageChannel.pointToPoint(scanner)(receiver).runDrain
scanned <- scannedRef.get // scanned <- scannedRef.get
_ <- UIO(scannedFiles.set(scanned)) // _ <- UIO(scannedFiles.set(scanned))
} yield () // } yield ()
} // }
object TestEnv extends FileScanner.Live with Clock.Live // object TestEnv extends FileScanner.Live with Clock.Live
val completed = // val completed =
new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither // new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither
assert(completed.isRight) // assert(completed.isRight)
assertResult( // assertResult(
Set(RemoteKey.create("root-file"), // Set(RemoteKey.create("root-file"),
RemoteKey.create("subdir/leaf-file")))(scannedFiles.get.toSet) // RemoteKey.create("subdir/leaf-file")))(scannedFiles.get.toSet)
} // }
//
} // }
} }

View file

@ -2,15 +2,7 @@ package net.kemitix.thorp.lib
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import net.kemitix.eip.zio.MessageChannel import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, Configuration}
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.config.{
ConfigOption,
ConfigOptions,
Configuration,
ConfigurationBuilder
}
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.Resource import net.kemitix.thorp.filesystem.Resource
import net.kemitix.thorp.uishell.UIEvent import net.kemitix.thorp.uishell.UIEvent
@ -21,11 +13,7 @@ import net.kemitix.thorp.uishell.UIEvent.{
KeyFound KeyFound
} }
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import org.scalatest.Matchers._
import zio.clock.Clock
import zio.{DefaultRuntime, UIO}
import scala.collection.MapView
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
class LocalFileSystemTest extends FreeSpec { class LocalFileSystemTest extends FreeSpec {
@ -49,311 +37,309 @@ class LocalFileSystemTest extends FreeSpec {
private def archive: ThorpArchive = new ThorpArchive { private def archive: ThorpArchive = new ThorpArchive {
override def update(configuration: Configuration, override def update(configuration: Configuration,
uiChannel: UChannel[Any, UIEvent], uiSink: Channel.Sink[UIEvent],
sequencedAction: SequencedAction, sequencedAction: SequencedAction,
totalBytesSoFar: Long): UIO[StorageEvent] = UIO { totalBytesSoFar: Long): StorageEvent = {
actions.updateAndGet(l => sequencedAction :: l) actions.updateAndGet(l => sequencedAction :: l)
StorageEvent.doNothingEvent(sequencedAction.action.remoteKey) StorageEvent.doNothingEvent(sequencedAction.action.remoteKey)
} }
} }
private val runtime = new DefaultRuntime {} // private object TestEnv extends Clock.Live with FileScanner.Live
//
private object TestEnv extends Clock.Live with FileScanner.Live // "scanCopyUpload" - {
// def sender(
"scanCopyUpload" - { // configuration: Configuration,
def sender( // objects: RemoteObjects
configuration: Configuration, // ): UIO[MessageChannel.ESender[Clock with FileScanner, Throwable, UIEvent]] =
objects: RemoteObjects // UIO { uiChannel =>
): UIO[MessageChannel.ESender[Clock with FileScanner, Throwable, UIEvent]] = // (for {
UIO { uiChannel => // _ <- LocalFileSystem.scanCopyUpload(
(for { // configuration,
_ <- LocalFileSystem.scanCopyUpload( // uiChannel,
configuration, // objects,
uiChannel, // archive
objects, // )
archive // } yield ()) <* MessageChannel.endChannel(uiChannel)
) // }
} yield ()) <* MessageChannel.endChannel(uiChannel) // def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] =
} // UIO { message =>
def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] = // val uiEvent = message.body
UIO { message => // uiEvents.updateAndGet(l => uiEvent :: l)
val uiEvent = message.body // UIO(())
uiEvents.updateAndGet(l => uiEvent :: l) // }
UIO(()) // def program(remoteObjects: RemoteObjects) = {
} // val configuration = ConfigurationBuilder.buildConfig(configOptions)
def program(remoteObjects: RemoteObjects) = { // for {
val configuration = ConfigurationBuilder.buildConfig(configOptions) // sender <- sender(configuration, remoteObjects)
for { // receiver <- receiver()
sender <- sender(configuration, remoteObjects) // _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain
receiver <- receiver() // } yield ()
_ <- MessageChannel.pointToPoint(sender)(receiver).runDrain // }
} yield () // "where remote has no objects" - {
} // val remoteObjects = RemoteObjects.empty
"where remote has no objects" - { // "upload all files" - {
val remoteObjects = RemoteObjects.empty // "update archive with upload actions" in {
"upload all files" - { // actions.set(List.empty)
"update archive with upload actions" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
actions.set(List.empty) // val actionList: Set[Action] = actions.get.map(_.action).toSet
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // actionList.filter(_.isInstanceOf[ToUpload]) should have size 2
val actionList: Set[Action] = actions.get.map(_.action).toSet // actionList.map(_.remoteKey) shouldEqual Set(
actionList.filter(_.isInstanceOf[ToUpload]) should have size 2 // MD5HashData.Root.remoteKey,
actionList.map(_.remoteKey) shouldEqual Set( // MD5HashData.Leaf.remoteKey
MD5HashData.Root.remoteKey, // )
MD5HashData.Leaf.remoteKey // }
) // "ui is updated" in {
} // uiEvents.set(List.empty)
"ui is updated" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
uiEvents.set(List.empty) // val summary = uiEventsSummary
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // summary should have size 6
val summary = uiEventsSummary // summary should contain inOrderElementsOf List(
summary should have size 6 // "file found : root-file",
summary should contain inOrderElementsOf List( // "action chosen : root-file : ToUpload",
"file found : root-file", // "action finished : root-file : ToUpload"
"action chosen : root-file : ToUpload", // )
"action finished : root-file : ToUpload" // summary should contain inOrderElementsOf List(
) // "file found : subdir/leaf-file",
summary should contain inOrderElementsOf List( // "action chosen : subdir/leaf-file : ToUpload",
"file found : subdir/leaf-file", // "action finished : subdir/leaf-file : ToUpload"
"action chosen : subdir/leaf-file : ToUpload", // )
"action finished : subdir/leaf-file : ToUpload" // }
) // }
} // }
} // "where remote has all object" - {
} // val remoteObjects =
"where remote has all object" - { // RemoteObjects.create(
val remoteObjects = // MapView(
RemoteObjects.create( // MD5HashData.Root.hash -> MD5HashData.Root.remoteKey,
MapView( // MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey
MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, // ).toMap.asJava,
MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey // MapView(
).toMap.asJava, // MD5HashData.Root.remoteKey -> MD5HashData.Root.hash,
MapView( // MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash
MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, // ).toMap.asJava
MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash // )
).toMap.asJava // "do nothing for all files" - {
) // "all archive actions do nothing" in {
"do nothing for all files" - { // actions.set(List.empty)
"all archive actions do nothing" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
actions.set(List.empty) // val actionList: Set[Action] = actions.get.map(_.action).toSet
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // actionList should have size 2
val actionList: Set[Action] = actions.get.map(_.action).toSet // actionList.filter(_.isInstanceOf[DoNothing]) should have size 2
actionList should have size 2 // }
actionList.filter(_.isInstanceOf[DoNothing]) should have size 2 // "ui is updated" in {
} // uiEvents.set(List.empty)
"ui is updated" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
uiEvents.set(List.empty) // val summary = uiEventsSummary
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // summary should have size 6
val summary = uiEventsSummary // summary should contain inOrderElementsOf List(
summary should have size 6 // "file found : root-file",
summary should contain inOrderElementsOf List( // "action chosen : root-file : DoNothing",
"file found : root-file", // "action finished : root-file : DoNothing"
"action chosen : root-file : DoNothing", // )
"action finished : root-file : DoNothing" // summary should contain inOrderElementsOf List(
) // "file found : subdir/leaf-file",
summary should contain inOrderElementsOf List( // "action chosen : subdir/leaf-file : DoNothing",
"file found : subdir/leaf-file", // "action finished : subdir/leaf-file : DoNothing"
"action chosen : subdir/leaf-file : DoNothing", // )
"action finished : subdir/leaf-file : DoNothing" // }
) // }
} // }
} // "where remote has some objects" - {
} // val remoteObjects =
"where remote has some objects" - { // RemoteObjects.create(
val remoteObjects = // MapView(MD5HashData.Root.hash -> MD5HashData.Root.remoteKey).toMap.asJava,
RemoteObjects.create( // MapView(MD5HashData.Root.remoteKey -> MD5HashData.Root.hash).toMap.asJava
MapView(MD5HashData.Root.hash -> MD5HashData.Root.remoteKey).toMap.asJava, // )
MapView(MD5HashData.Root.remoteKey -> MD5HashData.Root.hash).toMap.asJava // "upload leaf, do nothing for root" - {
) // "archive actions upload leaf" in {
"upload leaf, do nothing for root" - { // actions.set(List.empty)
"archive actions upload leaf" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
actions.set(List.empty) // val actionList: Set[Action] = actions.get.map(_.action).toSet
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // actionList
val actionList: Set[Action] = actions.get.map(_.action).toSet // .filter(_.isInstanceOf[DoNothing])
actionList // .map(_.remoteKey) shouldEqual Set(MD5HashData.Root.remoteKey)
.filter(_.isInstanceOf[DoNothing]) // actionList
.map(_.remoteKey) shouldEqual Set(MD5HashData.Root.remoteKey) // .filter(_.isInstanceOf[ToUpload])
actionList // .map(_.remoteKey) shouldEqual Set(MD5HashData.Leaf.remoteKey)
.filter(_.isInstanceOf[ToUpload]) // }
.map(_.remoteKey) shouldEqual Set(MD5HashData.Leaf.remoteKey) // "ui is updated" in {
} // uiEvents.set(List.empty)
"ui is updated" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
uiEvents.set(List.empty) // val summary = uiEventsSummary
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // summary should contain inOrderElementsOf List(
val summary = uiEventsSummary // "file found : root-file",
summary should contain inOrderElementsOf List( // "action chosen : root-file : DoNothing",
"file found : root-file", // "action finished : root-file : DoNothing"
"action chosen : root-file : DoNothing", // )
"action finished : root-file : DoNothing" // summary should contain inOrderElementsOf List(
) // "file found : subdir/leaf-file",
summary should contain inOrderElementsOf List( // "action chosen : subdir/leaf-file : ToUpload",
"file found : subdir/leaf-file", // "action finished : subdir/leaf-file : ToUpload"
"action chosen : subdir/leaf-file : ToUpload", // )
"action finished : subdir/leaf-file : ToUpload" // }
) // }
} // }
} // "where remote objects are swapped" ignore {
} // val remoteObjects =
"where remote objects are swapped" ignore { // RemoteObjects.create(
val remoteObjects = // MapView(
RemoteObjects.create( // MD5HashData.Root.hash -> MD5HashData.Leaf.remoteKey,
MapView( // MD5HashData.Leaf.hash -> MD5HashData.Root.remoteKey
MD5HashData.Root.hash -> MD5HashData.Leaf.remoteKey, // ).toMap.asJava,
MD5HashData.Leaf.hash -> MD5HashData.Root.remoteKey // MapView(
).toMap.asJava, // MD5HashData.Root.remoteKey -> MD5HashData.Leaf.hash,
MapView( // MD5HashData.Leaf.remoteKey -> MD5HashData.Root.hash
MD5HashData.Root.remoteKey -> MD5HashData.Leaf.hash, // ).toMap.asJava
MD5HashData.Leaf.remoteKey -> MD5HashData.Root.hash // )
).toMap.asJava // "copy files" - {
) // "archive swaps objects" ignore {
"copy files" - { // // not supported
"archive swaps objects" ignore { // }
// not supported // }
} // }
} // "where file has been renamed" - {
} // // renamed from "other/root" to "root-file"
"where file has been renamed" - { // val otherRootKey = RemoteKey.create("other/root")
// renamed from "other/root" to "root-file" // val remoteObjects =
val otherRootKey = RemoteKey.create("other/root") // RemoteObjects.create(
val remoteObjects = // MapView(
RemoteObjects.create( // MD5HashData.Root.hash -> otherRootKey,
MapView( // MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey
MD5HashData.Root.hash -> otherRootKey, // ).toMap.asJava,
MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey // MapView(
).toMap.asJava, // otherRootKey -> MD5HashData.Root.hash,
MapView( // MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash
otherRootKey -> MD5HashData.Root.hash, // ).toMap.asJava
MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash // )
).toMap.asJava // "copy object and delete original" in {
) // actions.set(List.empty)
"copy object and delete original" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
actions.set(List.empty) // val actionList: Set[Action] = actions.get.map(_.action).toSet
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // actionList should have size 2
val actionList: Set[Action] = actions.get.map(_.action).toSet // actionList
actionList should have size 2 // .filter(_.isInstanceOf[DoNothing])
actionList // .map(_.remoteKey) shouldEqual Set(MD5HashData.Leaf.remoteKey)
.filter(_.isInstanceOf[DoNothing]) // actionList
.map(_.remoteKey) shouldEqual Set(MD5HashData.Leaf.remoteKey) // .filter(_.isInstanceOf[ToCopy])
actionList // .map(_.remoteKey) shouldEqual Set(MD5HashData.Root.remoteKey)
.filter(_.isInstanceOf[ToCopy]) // }
.map(_.remoteKey) shouldEqual Set(MD5HashData.Root.remoteKey) // "ui is updated" in {
} // uiEvents.set(List.empty)
"ui is updated" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
uiEvents.set(List.empty) // val summary = uiEventsSummary
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // summary should contain inOrderElementsOf List(
val summary = uiEventsSummary // "file found : root-file",
summary should contain inOrderElementsOf List( // "action chosen : root-file : ToCopy",
"file found : root-file", // "action finished : root-file : ToCopy"
"action chosen : root-file : ToCopy", // )
"action finished : root-file : ToCopy" // summary should contain inOrderElementsOf List(
) // "file found : subdir/leaf-file",
summary should contain inOrderElementsOf List( // "action chosen : subdir/leaf-file : DoNothing",
"file found : subdir/leaf-file", // "action finished : subdir/leaf-file : DoNothing"
"action chosen : subdir/leaf-file : DoNothing", // )
"action finished : subdir/leaf-file : DoNothing" // }
) // }
} // }
} //
} // "scanDelete" - {
// def sender(
"scanDelete" - { // configuration: Configuration,
def sender( // objects: RemoteObjects
configuration: Configuration, // ): UIO[MessageChannel.ESender[Clock, Throwable, UIEvent]] =
objects: RemoteObjects // UIO { uiChannel =>
): UIO[MessageChannel.ESender[Clock, Throwable, UIEvent]] = // (for {
UIO { uiChannel => // _ <- LocalFileSystem.scanDelete(
(for { // configuration,
_ <- LocalFileSystem.scanDelete( // uiChannel,
configuration, // objects,
uiChannel, // archive
objects, // )
archive // } yield ()) <* MessageChannel.endChannel(uiChannel)
) // }
} yield ()) <* MessageChannel.endChannel(uiChannel) // def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] =
} // UIO { message =>
def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] = // val uiEvent = message.body
UIO { message => // uiEvents.updateAndGet(l => uiEvent :: l)
val uiEvent = message.body // UIO(())
uiEvents.updateAndGet(l => uiEvent :: l) // }
UIO(()) // def program(remoteObjects: RemoteObjects) = {
} // {
def program(remoteObjects: RemoteObjects) = { // val configuration = ConfigurationBuilder.buildConfig(configOptions)
{ // for {
val configuration = ConfigurationBuilder.buildConfig(configOptions) // sender <- sender(configuration, remoteObjects)
for { // receiver <- receiver()
sender <- sender(configuration, remoteObjects) // _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain
receiver <- receiver() // } yield ()
_ <- MessageChannel.pointToPoint(sender)(receiver).runDrain // }
} yield () // }
} // "where remote has no extra objects" - {
} // val remoteObjects = RemoteObjects.create(
"where remote has no extra objects" - { // MapView(
val remoteObjects = RemoteObjects.create( // MD5HashData.Root.hash -> MD5HashData.Root.remoteKey,
MapView( // MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey
MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, // ).toMap.asJava,
MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey // MapView(
).toMap.asJava, // MD5HashData.Root.remoteKey -> MD5HashData.Root.hash,
MapView( // MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash
MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, // ).toMap.asJava
MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash // )
).toMap.asJava // "do nothing for all files" - {
) // "no archive actions" in {
"do nothing for all files" - { // actions.set(List.empty)
"no archive actions" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
actions.set(List.empty) // val actionList: Set[Action] = actions.get.map(_.action).toSet
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // actionList should have size 0
val actionList: Set[Action] = actions.get.map(_.action).toSet // }
actionList should have size 0 // "ui is updated" in {
} // uiEvents.set(List.empty)
"ui is updated" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
uiEvents.set(List.empty) // uiEventsSummary shouldEqual List(
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // "key found: root-file",
uiEventsSummary shouldEqual List( // "key found: subdir/leaf-file"
"key found: root-file", // )
"key found: subdir/leaf-file" // }
) // }
} // }
} // "where remote has extra objects" - {
} // val extraHash = MD5Hash.create("extra")
"where remote has extra objects" - { // val extraObject = RemoteKey.create("extra")
val extraHash = MD5Hash.create("extra") // val remoteObjects = RemoteObjects.create(
val extraObject = RemoteKey.create("extra") // MapView(
val remoteObjects = RemoteObjects.create( // MD5HashData.Root.hash -> MD5HashData.Root.remoteKey,
MapView( // MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey,
MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, // extraHash -> extraObject
MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey, // ).toMap.asJava,
extraHash -> extraObject // MapView(
).toMap.asJava, // MD5HashData.Root.remoteKey -> MD5HashData.Root.hash,
MapView( // MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash,
MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, // extraObject -> extraHash
MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash, // ).toMap.asJava
extraObject -> extraHash // )
).toMap.asJava // "remove the extra object" - {
) // "archive delete action" in {
"remove the extra object" - { // actions.set(List.empty)
"archive delete action" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
actions.set(List.empty) // val actionList: Set[Action] = actions.get.map(_.action).toSet
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // actionList should have size 1
val actionList: Set[Action] = actions.get.map(_.action).toSet // actionList
actionList should have size 1 // .filter(_.isInstanceOf[ToDelete])
actionList // .map(_.remoteKey) shouldEqual Set(extraObject)
.filter(_.isInstanceOf[ToDelete]) // }
.map(_.remoteKey) shouldEqual Set(extraObject) // "ui is updated" in {
} // uiEvents.set(List.empty)
"ui is updated" in { // runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
uiEvents.set(List.empty) // uiEventsSummary shouldEqual List(
runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) // "key found: root-file",
uiEventsSummary shouldEqual List( // "key found: subdir/leaf-file",
"key found: root-file", // "key found: extra",
"key found: subdir/leaf-file", // "action chosen : extra : ToDelete",
"key found: extra", // "action finished : extra : ToDelete"
"action chosen : extra : ToDelete", // )
"action finished : extra : ToDelete" // }
) // }
} // }
} // }
}
}
private def uiEventsSummary: List[String] = { private def uiEventsSummary: List[String] = {
uiEvents uiEvents

View file

@ -25,7 +25,6 @@
<junit.version>5.6.2</junit.version> <junit.version>5.6.2</junit.version>
<assertj.version>3.16.1</assertj.version> <assertj.version>3.16.1</assertj.version>
<mockito.version>3.3.3</mockito.version> <mockito.version>3.3.3</mockito.version>
<zio.version>1.0.0-RC16</zio.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
@ -119,23 +118,6 @@
<artifactId>scala-library</artifactId> <artifactId>scala-library</artifactId>
<version>${scala-library.version}</version> <version>${scala-library.version}</version>
</dependency> </dependency>
<!-- scala - zio -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio_2.13</artifactId>
<version>${zio.version}</version>
</dependency>
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_2.13</artifactId>
<version>${zio.version}</version>
</dependency>
<!-- scala - eip-zio -->
<dependency>
<groupId>net.kemitix</groupId>
<artifactId>eip-zio_2.13</artifactId>
<version>0.3.2</version>
</dependency>
<!-- scala - command line parsing --> <!-- scala - command line parsing -->
<dependency> <dependency>
<groupId>com.github.scopt</groupId> <groupId>com.github.scopt</groupId>

View file

@ -3,14 +3,13 @@ package net.kemitix.thorp.storage.aws.hasher
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import net.kemitix.thorp.filesystem.Resource import net.kemitix.thorp.filesystem.Resource
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.DefaultRuntime
class ETagGeneratorTest extends FreeSpec { class ETagGeneratorTest extends FreeSpec {
private val bigFile = Resource.select(this, "../big-file") private val bigFile = Resource.select(this, "../big-file")
private val bigFilePath = bigFile.toPath private val bigFilePath = bigFile.toPath
private val configuration = new TransferManagerConfiguration private val configuration = new TransferManagerConfiguration
private val chunkSize = 1200000 private val chunkSize = 1200000
configuration.setMinimumUploadPartSize(chunkSize) configuration.setMinimumUploadPartSize(chunkSize)
// "Create offsets" - { // "Create offsets" - {
@ -24,9 +23,6 @@ class ETagGeneratorTest extends FreeSpec {
// } // }
// } // }
private val runtime: DefaultRuntime = new DefaultRuntime {}
object TestEnv
// "create md5 hash for each chunk" - { // "create md5 hash for each chunk" - {
// "should create expected hash for chunks" in { // "should create expected hash for chunks" in {
// val md5Hashes = List( // val md5Hashes = List(

View file

@ -39,22 +39,6 @@
<artifactId>scala-library</artifactId> <artifactId>scala-library</artifactId>
</dependency> </dependency>
<!-- eip-zio -->
<dependency>
<groupId>net.kemitix</groupId>
<artifactId>eip-zio_2.13</artifactId>
</dependency>
<!-- zio -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio_2.13</artifactId>
</dependency>
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-streams_2.13</artifactId>
</dependency>
<!-- scala - testing --> <!-- scala - testing -->
<dependency> <dependency>
<groupId>org.scalatest</groupId> <groupId>org.scalatest</groupId>

View file

@ -1,117 +1,102 @@
package net.kemitix.thorp.uishell package net.kemitix.thorp.uishell
import net.kemitix.eip.zio.MessageChannel
import net.kemitix.thorp.config.Configuration import net.kemitix.thorp.config.Configuration
import net.kemitix.thorp.console.{Console, ConsoleOut} import net.kemitix.thorp.console.{Console, ConsoleOut}
import net.kemitix.thorp.domain.Action.DoNothing
import net.kemitix.thorp.domain.Channel.Listener
import net.kemitix.thorp.domain.Terminal.{eraseLineForward, eraseToEndOfScreen} import net.kemitix.thorp.domain.Terminal.{eraseLineForward, eraseToEndOfScreen}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import zio.{UIO, ZIO}
object UIShell { object UIShell {
def receiver( def receiver(configuration: Configuration): Listener[UIEvent] = {
configuration: Configuration case _: UIEvent.ShowValidConfig => showValidConfig(configuration)
): UIO[MessageChannel.UReceiver[Any, UIEvent]] = case uie: UIEvent.RemoteDataFetched => remoteDataFetched(uie.size)
UIO { uiEventMessage => case uie: UIEvent.ShowSummary => showSummary(uie.counters)
uiEventMessage.body match { case uie: UIEvent.FileFound =>
case _: UIEvent.ShowValidConfig => showValidConfig(configuration) fileFound(configuration, uie.localFile)
case uie: UIEvent.RemoteDataFetched => remoteDataFetched(uie.size) case uie: UIEvent.ActionChosen =>
case uie: UIEvent.ShowSummary => showSummary(uie.counters) actionChosen(configuration, uie.action)
case uie: UIEvent.FileFound => fileFound(configuration, uie.localFile) case uie: UIEvent.AwaitingAnotherUpload =>
case uie: UIEvent.ActionChosen => awaitingUpload(uie.remoteKey, uie.hash)
actionChosen(configuration, uie.action) case uie: UIEvent.AnotherUploadWaitComplete =>
case uie: UIEvent.AwaitingAnotherUpload => uploadWaitComplete(uie.action)
awaitingUpload(uie.remoteKey, uie.hash) case uie: UIEvent.ActionFinished =>
case uie: UIEvent.AnotherUploadWaitComplete => actionFinished(configuration, uie.event)
uploadWaitComplete(uie.action) case _: UIEvent.KeyFound => ()
case uie: UIEvent.ActionFinished => case uie: UIEvent.RequestCycle =>
actionFinished(configuration, uie.event) ProgressUI.requestCycle(
case _: UIEvent.KeyFound => UIO.unit configuration,
case uie: UIEvent.RequestCycle => uie.localFile,
ProgressUI.requestCycle( uie.bytesTransferred,
configuration, uie.index,
uie.localFile, uie.totalBytesSoFar
uie.bytesTransferred, )
uie.index, }
uie.totalBytesSoFar
)
UIO.unit
}
}
private def actionFinished(configuration: Configuration, private def actionFinished(configuration: Configuration,
event: StorageEvent): UIO[Unit] = { event: StorageEvent): Unit =
val batchMode = configuration.batchMode event match {
for { case _: StorageEvent.DoNothingEvent => ()
_ <- event match { case copyEvent: StorageEvent.CopyEvent =>
case _: StorageEvent.DoNothingEvent => UIO.unit val sourceKey = copyEvent.sourceKey
case copyEvent: StorageEvent.CopyEvent => val targetKey = copyEvent.targetKey
val sourceKey = copyEvent.sourceKey Console.putMessageLnB(
val targetKey = copyEvent.targetKey ConsoleOut.copyComplete(sourceKey, targetKey),
Console.putMessageLnB( configuration.batchMode
ConsoleOut.copyComplete(sourceKey, targetKey), )
batchMode case uploadEvent: StorageEvent.UploadEvent =>
val remoteKey = uploadEvent.remoteKey
Console
.putMessageLnB(
ConsoleOut.uploadComplete(remoteKey),
configuration.batchMode
) )
UIO.unit ProgressUI.finishedUploading(remoteKey)
case uploadEvent: StorageEvent.UploadEvent => case deleteEvent: StorageEvent.DeleteEvent =>
val remoteKey = uploadEvent.remoteKey val remoteKey = deleteEvent.remoteKey
Console Console.putMessageLnB(
.putMessageLnB(ConsoleOut.uploadComplete(remoteKey), batchMode) ConsoleOut.deleteComplete(remoteKey),
ProgressUI.finishedUploading(remoteKey) configuration.batchMode
UIO.unit )
case deleteEvent: StorageEvent.DeleteEvent => case errorEvent: StorageEvent.ErrorEvent =>
val remoteKey = deleteEvent.remoteKey val remoteKey = errorEvent.remoteKey
Console.putMessageLnB(ConsoleOut.deleteComplete(remoteKey), batchMode) val action = errorEvent.action
UIO.unit val e = errorEvent.e
case errorEvent: StorageEvent.ErrorEvent => ProgressUI.finishedUploading(remoteKey)
val remoteKey = errorEvent.remoteKey Console.putMessageLnB(
val action = errorEvent.action ConsoleOut.errorQueueEventOccurred(action, e),
val e = errorEvent.e configuration.batchMode
ProgressUI.finishedUploading(remoteKey) )
UIO( case _: StorageEvent.ShutdownEvent => ()
Console.putMessageLnB( }
ConsoleOut.errorQueueEventOccurred(action, e),
batchMode
)
)
case _: StorageEvent.ShutdownEvent => UIO.unit
}
} yield ()
}
private def uploadWaitComplete(action: Action): UIO[Unit] = { private def uploadWaitComplete(action: Action): Unit =
Console.putStrLn(s"Finished waiting to other upload - now $action") Console.putStrLn(s"Finished waiting to other upload - now $action")
UIO.unit
}
private def awaitingUpload(remoteKey: RemoteKey, hash: MD5Hash): UIO[Unit] = { private def awaitingUpload(remoteKey: RemoteKey, hash: MD5Hash): Unit =
Console.putStrLn( Console.putStrLn(
s"Awaiting another upload of $hash before copying it to $remoteKey" s"Awaiting another upload of $hash before copying it to $remoteKey"
) )
UIO.unit
}
private def fileFound(configuration: Configuration, private def fileFound(configuration: Configuration,
localFile: LocalFile): UIO[Unit] = localFile: LocalFile): Unit =
ZIO.when(configuration.batchMode) { if (configuration.batchMode) {
Console.putStrLn(s"Found: ${localFile.file}") Console.putStrLn(s"Found: ${localFile.file}")
UIO.unit
} }
private def showSummary(counters: Counters): UIO[Unit] = { private def showSummary(counters: Counters): Unit = {
Console.putStrLn(eraseToEndOfScreen) Console.putStrLn(eraseToEndOfScreen)
Console.putStrLn(s"Uploaded ${counters.uploaded} files") Console.putStrLn(s"Uploaded ${counters.uploaded} files")
Console.putStrLn(s"Copied ${counters.copied} files") Console.putStrLn(s"Copied ${counters.copied} files")
Console.putStrLn(s"Deleted ${counters.deleted} files") Console.putStrLn(s"Deleted ${counters.deleted} files")
Console.putStrLn(s"Errors ${counters.errors}") Console.putStrLn(s"Errors ${counters.errors}")
UIO.unit
} }
private def remoteDataFetched(size: Int): UIO[Unit] = { private def remoteDataFetched(size: Int): Unit =
Console.putStrLn(s"Found $size remote objects") Console.putStrLn(s"Found $size remote objects")
UIO.unit
}
private def showValidConfig(configuration: Configuration): UIO[Unit] = { private def showValidConfig(configuration: Configuration): Unit =
Console.putMessageLn( Console.putMessageLn(
ConsoleOut.validConfig( ConsoleOut.validConfig(
configuration.bucket, configuration.bucket,
@ -119,8 +104,6 @@ object UIShell {
configuration.sources configuration.sources
) )
) )
UIO.unit
}
def trimHead(str: String): String = { def trimHead(str: String): String = {
val width = Terminal.width val width = Terminal.width
@ -130,11 +113,13 @@ object UIShell {
} }
} }
def actionChosen(configuration: Configuration, action: Action): UIO[Unit] = { def actionChosen(configuration: Configuration, action: Action): Unit =
val message = trimHead(action.asString()) + eraseLineForward if (configuration.batchMode)
if (configuration.batchMode) Console.putStr(message + "\r") Console.putStrLn(action.asString())
else Console.putStrLn(message) else
UIO.unit action match {
} case _: DoNothing => ()
case _ => Console.putStr(action.asString() + eraseLineForward + "\r")
}
} }

View file

@ -2,14 +2,12 @@ package net.kemitix.thorp.uishell
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import net.kemitix.eip.zio.Message import net.kemitix.thorp.domain.{Channel, LocalFile}
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.thorp.domain.LocalFile
import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent
object UploadEventListener { object UploadEventListener {
final case class Settings(uiChannel: UChannel[Any, UIEvent], final case class Settings(uiSink: Channel.Sink[UIEvent],
localFile: LocalFile, localFile: LocalFile,
index: Int, index: Int,
totalBytesSoFar: Long, totalBytesSoFar: Long,
@ -21,14 +19,12 @@ object UploadEventListener {
{ {
event match { event match {
case e: RequestEvent => case e: RequestEvent =>
settings.uiChannel( settings.uiSink.accept(
Message.withBody( UIEvent.requestCycle(
UIEvent.requestCycle( settings.localFile,
settings.localFile, bytesTransferred.addAndGet(e.transferred),
bytesTransferred.addAndGet(e.transferred), settings.index,
settings.index, settings.totalBytesSoFar
settings.totalBytesSoFar
)
) )
) )
case _ => () case _ => ()