diff --git a/.run/Main.run.xml b/.run/Main.run.xml new file mode 100644 index 0000000..5815443 --- /dev/null +++ b/.run/Main.run.xml @@ -0,0 +1,16 @@ + + + + \ No newline at end of file diff --git a/.run/Test whole project.run.xml b/.run/Test whole project.run.xml new file mode 100644 index 0000000..6ff84cd --- /dev/null +++ b/.run/Test whole project.run.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/app/src/main/scala/net/kemitix/thorp/Main.scala b/app/src/main/scala/net/kemitix/thorp/Main.scala index efab8d5..4c97997 100644 --- a/app/src/main/scala/net/kemitix/thorp/Main.scala +++ b/app/src/main/scala/net/kemitix/thorp/Main.scala @@ -2,7 +2,9 @@ package net.kemitix.thorp object Main { - def main(args: Array[String]): Unit = + def main(args: Array[String]): Unit = { Program.run(args.toList) + System.exit(0) + } } diff --git a/app/src/main/scala/net/kemitix/thorp/Program.scala b/app/src/main/scala/net/kemitix/thorp/Program.scala index 23eb4c5..7cd2320 100644 --- a/app/src/main/scala/net/kemitix/thorp/Program.scala +++ b/app/src/main/scala/net/kemitix/thorp/Program.scala @@ -9,7 +9,8 @@ import net.kemitix.thorp.domain.StorageEvent.{ ErrorEvent, UploadEvent } -import net.kemitix.thorp.domain.{Channel, Counters, StorageEvent} +import net.kemitix.thorp.domain.channel.{Channel, Sink} +import net.kemitix.thorp.domain.{Counters, StorageEvent} import net.kemitix.thorp.lib.{LocalFileSystem, UnversionedMirrorArchive} import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.uishell.{UIEvent, UIShell} @@ -19,7 +20,7 @@ import scala.jdk.CollectionConverters._ trait Program { - val version = "0.11.0" + val version = "1.1.0-SNAPSHOT" lazy val versionLabel = s"${WHITE}Thorp v$version$RESET" def run(args: List[String]): Unit = { @@ -35,33 +36,39 @@ trait Program { cli => ConfigQuery.showVersion(cli) private def executeWithUI(configuration: Configuration): Unit = { - val uiChannel: Channel[UIEvent] = Channel.create("thorp-ui") - uiChannel.addListener(UIShell.listener(configuration)) - uiChannel.run(sink => execute(configuration, sink), "thorp-main") - uiChannel.start() - uiChannel.waitForShutdown() + Channel + .create("ui") + .addListener(UIShell.listener(configuration)) + .run(execute(configuration)(_)) + .start() + .waitForShutdown() } - private def execute(configuration: Configuration, - uiSink: Channel.Sink[UIEvent]) = { - showValidConfig(uiSink) - val remoteObjects = - fetchRemoteData(configuration, uiSink) - val archive = UnversionedMirrorArchive - val storageEvents = LocalFileSystem - .scanCopyUpload(configuration, uiSink, remoteObjects, archive) - val deleteEvents = LocalFileSystem - .scanDelete(configuration, uiSink, remoteObjects, archive) - Storage.getInstance().shutdown(); - showSummary(uiSink)(storageEvents ++ deleteEvents) - uiSink.shutdown(); + private def execute( + configuration: Configuration + )(uiSink: Sink[UIEvent]): Unit = { + try { + showValidConfig(uiSink) + val remoteObjects = + fetchRemoteData(configuration, uiSink) + val archive = UnversionedMirrorArchive.create + val storageEvents = LocalFileSystem + .scanCopyUpload(configuration, uiSink, remoteObjects, archive) + val deleteEvents = LocalFileSystem + .scanDelete(configuration, uiSink, remoteObjects, archive) + showSummary(uiSink)( + (storageEvents.asScala ++ deleteEvents.asScala).toList + ) + } finally { + Storage.getInstance().shutdown() + } } - private def showValidConfig(uiSink: Channel.Sink[UIEvent]): Unit = + private def showValidConfig(uiSink: Sink[UIEvent]): Unit = uiSink.accept(UIEvent.showValidConfig) private def fetchRemoteData(configuration: Configuration, - uiSink: Channel.Sink[UIEvent]) = { + uiSink: Sink[UIEvent]) = { val bucket = configuration.bucket val prefix = configuration.prefix val objects = Storage.getInstance().list(bucket, prefix) @@ -78,7 +85,7 @@ trait Program { } private def showSummary( - uiSink: Channel.Sink[UIEvent] + uiSink: Sink[UIEvent] )(events: Seq[StorageEvent]): Unit = { val counters = events.foldLeft(Counters.empty)(countActivities) uiSink.accept(UIEvent.showSummary(counters)) diff --git a/config/src/main/java/net/kemitix/thorp/config/Configuration.java b/config/src/main/java/net/kemitix/thorp/config/Configuration.java index 1fc138c..5d8675f 100644 --- a/config/src/main/java/net/kemitix/thorp/config/Configuration.java +++ b/config/src/main/java/net/kemitix/thorp/config/Configuration.java @@ -21,7 +21,7 @@ public class Configuration { public final boolean batchMode; public final int parallel; public final Sources sources; - static Configuration create() { + public static Configuration create() { return new Configuration( Bucket.named(""), RemoteKey.create(""), diff --git a/docs/images/reactor-graph.png b/docs/images/reactor-graph.png index d9e9193..9e9e45b 100644 Binary files a/docs/images/reactor-graph.png and b/docs/images/reactor-graph.png differ diff --git a/domain/src/main/java/net/kemitix/thorp/domain/Channel.java b/domain/src/main/java/net/kemitix/thorp/domain/Channel.java deleted file mode 100644 index 59ad8d6..0000000 --- a/domain/src/main/java/net/kemitix/thorp/domain/Channel.java +++ /dev/null @@ -1,188 +0,0 @@ -package net.kemitix.thorp.domain; - -import lombok.RequiredArgsConstructor; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; - -public interface Channel { - static Channel create(String name) { - return new ChannelImpl(name); - } - - void start(); - Channel add(T item); - Channel addAll(Collection items); - Channel addListener(Listener listener); - Channel removeListener(Listener listener); - Channel run(Consumer> program, String name); - void shutdown(); - void shutdownNow() throws InterruptedException; - void waitForShutdown() throws InterruptedException; - - class ChannelImpl implements Channel { - private final BlockingQueue queue = new LinkedTransferQueue<>(); - private final Runner runner; - private final Thread thread; - - public ChannelImpl(String name) { - runner = new Runner(name, queue); - thread = new Thread(runner, name); - } - - @Override - public void start() { - thread.start(); - } - - @Override - public Channel add(T item) { - queue.add(item); - return this; - } - - @Override - public Channel addAll(Collection items) { - queue.addAll(items); - return this; - } - - @Override - public Channel addListener(Listener listener) { - runner.addListener(listener); - return this; - } - - @Override - public Channel removeListener(Listener listener) { - runner.removeListener(listener); - return this; - } - - @Override - public Channel run(Consumer> program, String name) { - return spawn(() -> program.accept(runner), name); - } - - private Channel spawn(Runnable runnable, String name) { - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - runnable.run(); - } catch (Exception e) { - shutdown(); - } - } - }, name); - thread.start(); - return this; - } - - @Override - public void shutdown() { - runner.shutdown(); - } - - @Override - public void shutdownNow() throws InterruptedException { - runner.shutdownNow(); - } - - @Override - public void waitForShutdown() throws InterruptedException { - runner.waitForShutdown(); - } - - } - - @RequiredArgsConstructor - class Runner implements Runnable, Sink { - private final String name; - private final BlockingQueue queue; - private final AtomicBoolean shutdown = new AtomicBoolean(false); - private final List> listeners = new ArrayList<>(); - private final CountDownLatch shutdownLatch = new CountDownLatch(1); - private Thread runnerThread; - - @Override - public void run() { - runnerThread = Thread.currentThread(); - while(isRunning()) { - takeItem() - .ifPresent(item -> { - listeners.forEach(listener -> { - listener.accept(item); - }); - }); - } - shutdownLatch.countDown(); - } - - public void addListener(Listener listener) { - listeners.add(listener); - } - - public void removeListener(Listener listener) { - listeners.remove(listener); - } - - public Optional takeItem() { - try { - return Optional.of(queue.take()); - } catch (InterruptedException e) { - shutdown(); - } - return Optional.empty(); - } - - private boolean isRunning() { - return !isShutdown(); - } - - private boolean isShutdown() { - return shutdown.get() && queue.isEmpty(); - } - - @Override - public void accept(T item) { - queue.add(item); - } - - @Override - public void shutdown() { - if (isRunning()) { - shutdown.set(true); - } - if (queue.isEmpty() && runnerThread != null) { - runnerThread.interrupt(); - } - } - - public void shutdownNow() throws InterruptedException { - shutdown(); - waitForShutdown(); - } - - public void waitForShutdown() throws InterruptedException { - if (isRunning()) - shutdownLatch.await(); - } - } - - interface Listener { - void accept(T item); - } - - interface Sink { - void accept(T item); - void shutdown(); - } -} diff --git a/domain/src/main/java/net/kemitix/thorp/domain/Filter.java b/domain/src/main/java/net/kemitix/thorp/domain/Filter.java index 0ed7aa7..15a3329 100644 --- a/domain/src/main/java/net/kemitix/thorp/domain/Filter.java +++ b/domain/src/main/java/net/kemitix/thorp/domain/Filter.java @@ -1,7 +1,5 @@ package net.kemitix.thorp.domain; -import lombok.AccessLevel; -import lombok.RequiredArgsConstructor; import net.kemitix.mon.TypeAlias; import java.util.function.Predicate; diff --git a/domain/src/main/java/net/kemitix/thorp/domain/HashGenerator.java b/domain/src/main/java/net/kemitix/thorp/domain/HashGenerator.java index afc1779..5d2bf6a 100644 --- a/domain/src/main/java/net/kemitix/thorp/domain/HashGenerator.java +++ b/domain/src/main/java/net/kemitix/thorp/domain/HashGenerator.java @@ -21,6 +21,10 @@ public interface HashGenerator { for(HashGenerator hashGenerator: hashGenerators) { list.add(hashGenerator); } + if (list.isEmpty()) { + throw new UnsupportedOperationException( + "No HashGenerator implementations available"); + } return list; } static HashGenerator generatorFor(String label) { diff --git a/domain/src/main/java/net/kemitix/thorp/domain/LocalFile.java b/domain/src/main/java/net/kemitix/thorp/domain/LocalFile.java index 47cdc19..4aceb53 100644 --- a/domain/src/main/java/net/kemitix/thorp/domain/LocalFile.java +++ b/domain/src/main/java/net/kemitix/thorp/domain/LocalFile.java @@ -1,24 +1,26 @@ package net.kemitix.thorp.domain; import lombok.AccessLevel; +import lombok.EqualsAndHashCode; import lombok.RequiredArgsConstructor; import java.io.File; import java.util.Optional; +@EqualsAndHashCode @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public class LocalFile { public final File file; public final File source; public final Hashes hashes; public final RemoteKey remoteKey; - public final Long length; + public final long length; public static LocalFile create( File file, File source, Hashes hashes, RemoteKey remoteKey, - Long length + long length ) { return new LocalFile(file, source, hashes, remoteKey, length); } diff --git a/domain/src/main/java/net/kemitix/thorp/domain/channel/Channel.java b/domain/src/main/java/net/kemitix/thorp/domain/channel/Channel.java new file mode 100644 index 0000000..06d7e7b --- /dev/null +++ b/domain/src/main/java/net/kemitix/thorp/domain/channel/Channel.java @@ -0,0 +1,26 @@ +package net.kemitix.thorp.domain.channel; + +import java.util.Collection; +import java.util.function.Consumer; + +public interface Channel extends Sink { + + static Channel create(String name) { + return new ChannelImpl<>(name, false); + } + static Channel createWithTracing(String name) { + return new ChannelImpl<>(name, true); + } + + Channel start(); + Channel add(T item); + Channel addAll(Collection items); + Channel addListener(Listener listener); + Channel removeListener(Listener listener); + Channel run(Consumer> program); + + void shutdown(); + void shutdownNow() throws InterruptedException; + void waitForShutdown() throws InterruptedException; + +} diff --git a/domain/src/main/java/net/kemitix/thorp/domain/channel/ChannelImpl.java b/domain/src/main/java/net/kemitix/thorp/domain/channel/ChannelImpl.java new file mode 100644 index 0000000..2f618b5 --- /dev/null +++ b/domain/src/main/java/net/kemitix/thorp/domain/channel/ChannelImpl.java @@ -0,0 +1,97 @@ +package net.kemitix.thorp.domain.channel; + +import java.util.Collection; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.function.Consumer; + +class ChannelImpl implements Channel { + private final boolean tracing; + private final BlockingQueue queue = new LinkedTransferQueue<>(); + private final Runner runner; + private final Thread thread; + private final String name; + + public ChannelImpl(String name, boolean tracing) { + this.name = name; + this.tracing = tracing; + runner = new Runner<>(queue, tracing); + thread = new Thread(runner, String.format("---->-lnr-%s", name)); + } + + @Override + public Channel start() { + trace("launching"); + thread.start(); + return this; + } + + public void trace(String message) { + if (tracing) + System.out.printf("[channel:%s] %s%n", Thread.currentThread().getName(), message); + } + + @Override + public void accept(T item) { + queue.add(item); + } + + @Override + public Channel add(T item) { + queue.add(item); + return this; + } + + @Override + public Channel addAll(Collection items) { + queue.addAll(items); + return this; + } + + @Override + public Channel addListener(Listener listener) { + runner.addListener(listener); + return this; + } + + @Override + public Channel removeListener(Listener listener) { + runner.removeListener(listener); + return this; + } + + @Override + public Channel run(Consumer> program) { + return spawn(() -> program.accept(runner)); + } + + private Channel spawn(Runnable runnable) { + Thread thread = new Thread(() -> { + trace("starting"); + try { + runnable.run(); + trace("finishing normally"); + } finally { + shutdown(); + trace("stopping"); + } + }, String.format("channel-src->-----%s", name)); + thread.start(); + return this; + } + + @Override + public void shutdown() { + runner.shutdown(); + } + + @Override + public void shutdownNow() throws InterruptedException { + runner.shutdownNow(); + } + + @Override + public void waitForShutdown() throws InterruptedException { + runner.waitForShutdown(); + } +} diff --git a/domain/src/main/java/net/kemitix/thorp/domain/channel/Listener.java b/domain/src/main/java/net/kemitix/thorp/domain/channel/Listener.java new file mode 100644 index 0000000..c5d1af6 --- /dev/null +++ b/domain/src/main/java/net/kemitix/thorp/domain/channel/Listener.java @@ -0,0 +1,5 @@ +package net.kemitix.thorp.domain.channel; + +public interface Listener { + void accept(T item); +} diff --git a/domain/src/main/java/net/kemitix/thorp/domain/channel/Runner.java b/domain/src/main/java/net/kemitix/thorp/domain/channel/Runner.java new file mode 100644 index 0000000..fbffabc --- /dev/null +++ b/domain/src/main/java/net/kemitix/thorp/domain/channel/Runner.java @@ -0,0 +1,107 @@ +package net.kemitix.thorp.domain.channel; + +import lombok.RequiredArgsConstructor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +@RequiredArgsConstructor +class Runner implements Runnable, Sink { + + private final BlockingQueue queue; + private final boolean tracing; + + private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final AtomicBoolean isWaiting = new AtomicBoolean(false); + private final List> listeners = new ArrayList<>(); + private final CountDownLatch shutdownLatch = new CountDownLatch(1); + private final Object takeLock = new Object(); + + private Thread runnerThread; + + public void trace(String message) { + if (tracing) + System.out.printf("[runner :%s] %s%n", Thread.currentThread().getName(), message); + } + + @Override + public void run() { + runnerThread = Thread.currentThread(); + trace("starting"); + try { + while (isRunning()) { + takeItem() + .ifPresent(item -> { + listeners.forEach(listener -> + listener.accept(item)); + }); + } + trace("finishing"); + } catch (InterruptedException e) { + // would have been waiting to take from an empty queue when it was shutdown + trace(String.format("interrupted (%d items in queue)", queue.size())); + } finally { + trace("complete"); + shutdownLatch.countDown(); + } + } + + public void addListener(Listener listener) { + listeners.add(listener); + } + + public void removeListener(Listener listener) { + listeners.remove(listener); + } + + public Optional takeItem() throws InterruptedException { + synchronized (takeLock) { + isWaiting.set(true); + T take = queue.take(); + isWaiting.set(false); + return Optional.of(take); + } + } + + private boolean isRunning() { + return !isShutdown(); + } + + private boolean isShutdown() { + return shutdown.get() && queue.isEmpty(); + } + + @Override + public void accept(T item) { + queue.add(item); + } + + @Override + public void shutdown() { + String threadName = Thread.currentThread().getName(); + if (isRunning()) { + trace("running - marking as shutdown"); + shutdown.set(true); + } + if (queue.isEmpty() && isWaiting.get() && runnerThread != null) { + trace("interrupting waiting runner"); + runnerThread.interrupt(); + } else { + trace(String.format("NOT interrupting runner (%d items, waiting: %s)", queue.size(), isWaiting.get())); + } + } + + public void shutdownNow() throws InterruptedException { + shutdown(); + waitForShutdown(); + } + + public void waitForShutdown() throws InterruptedException { + if (isRunning()) + shutdownLatch.await(); + } + } diff --git a/domain/src/main/java/net/kemitix/thorp/domain/channel/Sink.java b/domain/src/main/java/net/kemitix/thorp/domain/channel/Sink.java new file mode 100644 index 0000000..ee54a10 --- /dev/null +++ b/domain/src/main/java/net/kemitix/thorp/domain/channel/Sink.java @@ -0,0 +1,5 @@ +package net.kemitix.thorp.domain.channel; + +public interface Sink extends Listener { + void shutdown(); +} diff --git a/domain/src/test/java/net/kemitix/thorp/domain/HashesTest.java b/domain/src/test/java/net/kemitix/thorp/domain/HashesTest.java index 4153d40..078c8ee 100644 --- a/domain/src/test/java/net/kemitix/thorp/domain/HashesTest.java +++ b/domain/src/test/java/net/kemitix/thorp/domain/HashesTest.java @@ -10,24 +10,21 @@ import java.util.Arrays; public class HashesTest implements WithAssertions { - @Nested - @DisplayName("mergeAll") - public class MergeAll { - @Test - @DisplayName("") - public void mergeAll() { - //given - HashType key1 = HashType.MD5; - HashType key2 = HashType.DUMMY; - MD5Hash value1 = MD5Hash.create("1"); - MD5Hash value2 = MD5Hash.create("2"); - Hashes hashes1 = Hashes.create(key1, value1); - Hashes hashes2 = Hashes.create(key2, value2); - //when - Hashes result = Hashes.mergeAll(Arrays.asList(hashes1,hashes2)); - //then - assertThat(result.keys()).containsExactlyInAnyOrder(key1, key2); - assertThat(result.values()).containsExactlyInAnyOrder(value1, value2); - } + @Test + @DisplayName("mergeAll()") + public void mergeAll() { + //given + HashType key1 = HashType.MD5; + HashType key2 = HashType.DUMMY; + MD5Hash value1 = MD5Hash.create("1"); + MD5Hash value2 = MD5Hash.create("2"); + Hashes hashes1 = Hashes.create(key1, value1); + Hashes hashes2 = Hashes.create(key2, value2); + //when + Hashes result = Hashes.mergeAll(Arrays.asList(hashes1, hashes2)); + //then + assertThat(result.keys()).containsExactlyInAnyOrder(key1, key2); + assertThat(result.values()).containsExactlyInAnyOrder(value1, value2); } + } \ No newline at end of file diff --git a/lib/pom.xml b/lib/pom.xml index 20a129f..52c9bb9 100644 --- a/lib/pom.xml +++ b/lib/pom.xml @@ -41,27 +41,16 @@ thorp-storage - + - org.scala-lang - scala-library + org.junit.jupiter + junit-jupiter + test - - - org.scalatest - scalatest_2.13 + org.assertj + assertj-core test - - - - - net.alchim31.maven - scala-maven-plugin - - - - \ No newline at end of file diff --git a/lib/src/main/java/net/kemitix/thorp/lib/Archive.java b/lib/src/main/java/net/kemitix/thorp/lib/Archive.java new file mode 100644 index 0000000..b100fdf --- /dev/null +++ b/lib/src/main/java/net/kemitix/thorp/lib/Archive.java @@ -0,0 +1,49 @@ +package net.kemitix.thorp.lib; + +import net.kemitix.thorp.config.Configuration; +import net.kemitix.thorp.console.Console; +import net.kemitix.thorp.console.ConsoleOut; +import net.kemitix.thorp.domain.RemoteKey; +import net.kemitix.thorp.domain.StorageEvent; +import net.kemitix.thorp.domain.channel.Sink; +import net.kemitix.thorp.uishell.UIEvent; + +public interface Archive { + + StorageEvent update( + Configuration configuration, + Sink uiSink, + SequencedAction sequencedAction, + long totalBytesSoFar); + + default StorageEvent logEvent( + Configuration configuration, + StorageEvent event + ) { + boolean batchMode = configuration.batchMode; + + if (event instanceof StorageEvent.UploadEvent) { + RemoteKey remoteKey = ((StorageEvent.UploadEvent) event).remoteKey; + Console.putMessageLnB( + ConsoleOut.uploadComplete(remoteKey), batchMode); + } else if (event instanceof StorageEvent.CopyEvent) { + StorageEvent.CopyEvent copyEvent = (StorageEvent.CopyEvent) event; + RemoteKey sourceKey = copyEvent.sourceKey; + RemoteKey targetKey = copyEvent.targetKey; + Console.putMessageLnB( + ConsoleOut.copyComplete(sourceKey, targetKey), batchMode); + } else if (event instanceof StorageEvent.DeleteEvent) { + RemoteKey remoteKey = ((StorageEvent.DeleteEvent) event).remoteKey; + Console.putMessageLnB( + ConsoleOut.deleteComplete(remoteKey), batchMode); + } else if (event instanceof StorageEvent.ErrorEvent) { + StorageEvent.ErrorEvent errorEvent = (StorageEvent.ErrorEvent) event; + StorageEvent.ActionSummary action = errorEvent.action; + Throwable e = errorEvent.e; + Console.putMessageLnB( + ConsoleOut.errorQueueEventOccurred(action, e), batchMode); + } + + return event; + } +} diff --git a/lib/src/main/java/net/kemitix/thorp/lib/FileScanner.java b/lib/src/main/java/net/kemitix/thorp/lib/FileScanner.java index a65e38c..8a5db43 100644 --- a/lib/src/main/java/net/kemitix/thorp/lib/FileScanner.java +++ b/lib/src/main/java/net/kemitix/thorp/lib/FileScanner.java @@ -2,6 +2,7 @@ package net.kemitix.thorp.lib; import net.kemitix.thorp.config.Configuration; import net.kemitix.thorp.domain.*; +import net.kemitix.thorp.domain.channel.Sink; import net.kemitix.thorp.filesystem.FileSystem; import net.kemitix.thorp.filesystem.PathCache; @@ -14,17 +15,16 @@ import java.util.List; public interface FileScanner { static void scanSources( Configuration configuration, - Channel.Sink fileSink + Sink fileSink ) { configuration.sources.paths() .forEach(path -> scanSource(configuration, fileSink, path)); - fileSink.shutdown(); } static void scanSource( Configuration configuration, - Channel.Sink fileSink, + Sink fileSink, Path sourcePath ) { scanPath(configuration, fileSink, sourcePath); @@ -32,7 +32,7 @@ public interface FileScanner { static void scanPath( Configuration configuration, - Channel.Sink fileSink, + Sink fileSink, Path path ) { // dirs @@ -45,7 +45,7 @@ public interface FileScanner { static void handleFile( Configuration configuration, - Channel.Sink fileSink, + Sink fileSink, File file ) { boolean isIncluded = Filters.isIncluded(configuration, file); diff --git a/lib/src/main/java/net/kemitix/thorp/lib/Filters.java b/lib/src/main/java/net/kemitix/thorp/lib/Filters.java new file mode 100644 index 0000000..a40a36a --- /dev/null +++ b/lib/src/main/java/net/kemitix/thorp/lib/Filters.java @@ -0,0 +1,56 @@ +package net.kemitix.thorp.lib; + +import net.kemitix.thorp.config.Configuration; +import net.kemitix.thorp.domain.Filter; + +import java.io.File; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +public interface Filters { + + static boolean isIncluded( + Configuration configuration, + File file + ) { + return isIncluded(file.toPath(), configuration.filters); + } + + static boolean isIncluded( + Path path, + List filters + ) { + AtomicBoolean included = new AtomicBoolean(isIncludedByDefault(filters)); + filters.forEach( + filter -> { + if (included.get()) { + if (filter instanceof Filter.Exclude) { + boolean excludedByFilter = isExcludedByFilter(path, filter); + included.set(!excludedByFilter); + } + } else { + if (filter instanceof Filter.Include) { + boolean includedByFilter = isIncludedByFilter(path, filter); + included.set(includedByFilter); + } + } + } + ); + return included.get(); + } + + static boolean isIncludedByDefault(List filters) { + return filters.isEmpty() || + filters.stream() + .allMatch(filter -> + filter instanceof Filter.Exclude); + } + + static boolean isIncludedByFilter(Path path, Filter filter) { + return filter.predicate().test(path.toFile().getPath()); + } + static boolean isExcludedByFilter(Path path, Filter filter) { + return filter.predicate().test(path.toFile().getPath()); + } +} diff --git a/lib/src/main/java/net/kemitix/thorp/lib/LocalFileSystem.java b/lib/src/main/java/net/kemitix/thorp/lib/LocalFileSystem.java new file mode 100644 index 0000000..97ae8e2 --- /dev/null +++ b/lib/src/main/java/net/kemitix/thorp/lib/LocalFileSystem.java @@ -0,0 +1,183 @@ +package net.kemitix.thorp.lib; + +import lombok.val; +import net.kemitix.thorp.config.Configuration; +import net.kemitix.thorp.domain.*; +import net.kemitix.thorp.domain.channel.Channel; +import net.kemitix.thorp.domain.channel.Listener; +import net.kemitix.thorp.domain.channel.Sink; +import net.kemitix.thorp.filesystem.FileSystem; +import net.kemitix.thorp.uishell.UIEvent; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +public interface LocalFileSystem { + static List scanCopyUpload( + Configuration configuration, + Sink uiSink, + RemoteObjects remoteObjects, + Archive archive + ) throws InterruptedException { + AtomicInteger actionCounter = new AtomicInteger(); + AtomicLong bytesCounter = new AtomicLong(); + Map uploads = new HashMap<>(); + Deque events = new LinkedList<>(); + + Channel.create("file-scanner") + .addListener( + listener(configuration, uiSink, remoteObjects, archive, + uploads, actionCounter, bytesCounter, events)) + .run(sink -> FileScanner.scanSources(configuration, sink)) + .start() + .waitForShutdown(); + + return new ArrayList<>(events); + } + + static Listener listener( + Configuration configuration, + Sink uiSink, + RemoteObjects remoteObjects, + Archive archive, + Map uploads, + AtomicInteger actionCounter, + AtomicLong bytesCounter, + Deque events + ) { + val chooseAction = chooseAction(configuration, remoteObjects, uploads, uiSink); + val uiActionChosen = uiActionChosen(uiSink); + val uiActionFinished = uiActionFinished(uiSink, actionCounter, bytesCounter); + return localFile -> { + uiSink.accept(UIEvent.fileFound(localFile)); + Action action = chooseAction.apply(localFile); + actionCounter.incrementAndGet(); + bytesCounter.addAndGet(action.size); + uiActionChosen.accept(action); + SequencedAction sequencedAction = + SequencedAction.create(action, actionCounter.get()); + StorageEvent event = archive.update( + configuration, uiSink, sequencedAction, bytesCounter.get()); + events.addFirst(event); + uiActionFinished.accept(action, event); + }; + } + + static BiConsumer uiActionFinished( + Sink uiSink, + AtomicInteger actionCounter, + AtomicLong bytesCounter + ) { + return (action, event) -> uiSink.accept( + UIEvent.actionFinished(action, + actionCounter.get(), bytesCounter.get(), event)); + } + + static Consumer uiActionChosen(Sink uiSink) { + return action -> uiSink.accept(UIEvent.actionChosen(action)); + } + + static Function chooseAction( + Configuration configuration, + RemoteObjects remoteObjects, + Map uploads, + Sink uiSink + ) { + return localFile -> { + boolean remoteExists = remoteObjects.remoteKeyExists(localFile.remoteKey); + boolean remoteMatches = remoteObjects.remoteMatchesLocalFile(localFile); + Optional> remoteForHash = + remoteObjects.remoteHasHash(localFile.hashes); + Bucket bucket = configuration.bucket; + if (remoteExists && remoteMatches) { + return Action.doNothing(bucket, localFile.remoteKey, localFile.length); + } + if (remoteForHash.isPresent()) { + RemoteKey sourceKey = remoteForHash.get().a; + MD5Hash hash = remoteForHash.get().b; + return Action + .toCopy(bucket, sourceKey, hash, + localFile.remoteKey, + localFile.length); + } else if (localFile.hashes.values().stream() + .anyMatch(uploads::containsKey)) { + return doCopyWithPreviousUpload(localFile, bucket, uploads, uiSink); + } + return Action.toUpload(bucket, localFile, localFile.length); + }; + } + + static Action doCopyWithPreviousUpload( + LocalFile localFile, + Bucket bucket, + Map uploads, + Sink uiSink + ) { + return localFile.hashes + .values() + .stream() + .filter(uploads::containsKey) + .findFirst() + .map(hash -> { + uiSink.accept(UIEvent.awaitingAnotherUpload(localFile.remoteKey, hash)); + Action action = Action.toCopy(bucket, uploads.get(hash), hash, localFile.remoteKey, localFile.length); + //FIXME - there is no waiting going on here!! + uiSink.accept(UIEvent.anotherUploadWaitComplete(action)); + return action; + }).orElseGet(() -> + Action.toUpload(bucket, localFile, localFile.length)); + } + + static List scanDelete( + Configuration configuration, + Sink uiSink, + RemoteObjects remoteData, + Archive archive + ) throws InterruptedException { + AtomicInteger actionCounter = new AtomicInteger(); + AtomicLong bytesCounter = new AtomicLong(); + Deque events = new LinkedList<>(); + Channel.create("delete-scan") + .addListener(deleteListener( + configuration, uiSink, archive, + actionCounter, bytesCounter, events)) + .run(sink -> remoteData.byKey.keys().forEach(sink::accept)) + .start() + .waitForShutdown(); + return new ArrayList<>(events); + } + + static Listener deleteListener( + Configuration configuration, + Sink uiSink, + Archive archive, + AtomicInteger actionCounter, + AtomicLong bytesCounter, + Deque events + ) { + return remoteKey -> { + uiSink.accept(UIEvent.keyFound(remoteKey)); + val sources = configuration.sources; + val prefix = configuration.prefix; + val exists = FileSystem.hasLocalFile(sources, prefix, remoteKey); + if (!exists) { + actionCounter.incrementAndGet(); + val bucket = configuration.bucket; + val action = Action.toDelete(bucket, remoteKey, 0L); + uiActionChosen(uiSink).accept(action); + bytesCounter.addAndGet(action.size); + val sequencedAction = + SequencedAction.create(action, actionCounter.get()); + val event = archive.update(configuration, uiSink, + sequencedAction, 0L); + events.addFirst(event); + uiActionFinished(uiSink, actionCounter, bytesCounter) + .accept(action, event); + } + }; + } +} \ No newline at end of file diff --git a/lib/src/main/java/net/kemitix/thorp/lib/SequencedAction.java b/lib/src/main/java/net/kemitix/thorp/lib/SequencedAction.java new file mode 100644 index 0000000..924a957 --- /dev/null +++ b/lib/src/main/java/net/kemitix/thorp/lib/SequencedAction.java @@ -0,0 +1,20 @@ +package net.kemitix.thorp.lib; + +import net.kemitix.mon.TypeAlias; +import net.kemitix.thorp.domain.Action; +import net.kemitix.thorp.domain.Tuple; + +public class SequencedAction extends TypeAlias> { + private SequencedAction(Tuple value) { + super(value); + } + public static SequencedAction create(Action action, Integer index) { + return new SequencedAction(Tuple.create(action, index)); + } + public Action action() { + return getValue().a; + } + public int index() { + return getValue().b; + } +} diff --git a/lib/src/main/java/net/kemitix/thorp/lib/UnversionedMirrorArchive.java b/lib/src/main/java/net/kemitix/thorp/lib/UnversionedMirrorArchive.java new file mode 100644 index 0000000..47dfde3 --- /dev/null +++ b/lib/src/main/java/net/kemitix/thorp/lib/UnversionedMirrorArchive.java @@ -0,0 +1,55 @@ +package net.kemitix.thorp.lib; + +import net.kemitix.thorp.config.Configuration; +import net.kemitix.thorp.domain.*; +import net.kemitix.thorp.domain.channel.Sink; +import net.kemitix.thorp.storage.Storage; +import net.kemitix.thorp.uishell.UIEvent; +import net.kemitix.thorp.uishell.UploadEventListener; + +public class UnversionedMirrorArchive implements Archive { + + public static Archive create() { + return new UnversionedMirrorArchive(); + } + + @Override + public StorageEvent update( + Configuration configuration, + Sink uiSink, + SequencedAction sequencedAction, + long totalBytesSoFar + ) { + Action action = sequencedAction.action(); + int index = sequencedAction.index(); + Bucket bucket = action.bucket; + if (action instanceof Action.ToUpload) { + LocalFile localFile = ((Action.ToUpload) action).localFile; + return Storage.getInstance() + .upload(localFile, bucket, + UploadEventListener.settings( + uiSink, localFile, index, totalBytesSoFar, + configuration.batchMode)); + } else if(action instanceof Action.ToCopy) { + Action.ToCopy toCopy = (Action.ToCopy) action; + RemoteKey sourceKey = toCopy.sourceKey; + MD5Hash hash = toCopy.hash; + RemoteKey targetKey = toCopy.targetKey; + return Storage.getInstance() + .copy(bucket, sourceKey, hash, targetKey); + } else if(action instanceof Action.ToDelete) { + RemoteKey remoteKey = action.remoteKey; + try { + return Storage.getInstance().delete(bucket, remoteKey); + } catch (Throwable e) { + return StorageEvent.errorEvent( + StorageEvent.ActionSummary.delete(remoteKey.key()), + remoteKey, e); + } + } else { + RemoteKey remoteKey = action.remoteKey; + return StorageEvent.doNothingEvent(remoteKey); + } + } + +} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/Filters.scala b/lib/src/main/scala/net/kemitix/thorp/lib/Filters.scala deleted file mode 100644 index 54664da..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/Filters.scala +++ /dev/null @@ -1,50 +0,0 @@ -package net.kemitix.thorp.lib - -import java.io.File -import java.nio.file.Path - -import net.kemitix.thorp.config.Configuration -import net.kemitix.thorp.domain.Filter -import net.kemitix.thorp.domain.Filter.{Exclude, Include} - -import scala.jdk.CollectionConverters._ - -object Filters { - - def isIncluded(configuration: Configuration, file: File): Boolean = - isIncluded(file.toPath)(configuration.filters.asScala.toList) - - def isIncluded(p: Path)(filters: List[Filter]): Boolean = { - sealed trait State - final case class Unknown() extends State - final case class Accepted() extends State - final case class Discarded() extends State - val excluded = isExcludedByFilter(p)(_) - val included = isIncludedByFilter(p)(_) - filters.foldRight(Unknown(): State)( - (filter, state) => - (filter, state) match { - case (_, Accepted()) => Accepted() - case (_, Discarded()) => Discarded() - case (x: Exclude, _) if excluded(x) => Discarded() - case (i: Include, _) if included(i) => Accepted() - case _ => Unknown() - } - ) match { - case Accepted() => true - case Discarded() => false - case Unknown() => - filters.forall { - case _: Include => false - case _ => true - } - } - } - - def isIncludedByFilter(path: Path)(filter: Filter): Boolean = - filter.predicate.test(path.toFile.getPath) - - def isExcludedByFilter(path: Path)(filter: Filter): Boolean = - filter.predicate.test(path.toFile.getPath) - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala b/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala deleted file mode 100644 index 98c4b94..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/LocalFileSystem.scala +++ /dev/null @@ -1,262 +0,0 @@ -package net.kemitix.thorp.lib - -import java.util -import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} - -import net.kemitix.thorp.config.Configuration -import net.kemitix.thorp.domain.Channel.{Listener, Sink} -import net.kemitix.thorp.domain.{RemoteObjects, _} -import net.kemitix.thorp.filesystem.FileSystem -import net.kemitix.thorp.uishell.UIEvent - -import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters._ - -trait LocalFileSystem { - - def scanCopyUpload(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - remoteObjects: RemoteObjects, - archive: ThorpArchive): Seq[StorageEvent] - - def scanDelete(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - remoteData: RemoteObjects, - archive: ThorpArchive): Seq[StorageEvent] - -} -object LocalFileSystem extends LocalFileSystem { - - override def scanCopyUpload(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - remoteObjects: RemoteObjects, - archive: ThorpArchive): Seq[StorageEvent] = { - - val fileChannel: Channel[LocalFile] = Channel.create("files") - - // state for the file receiver - val actionCounter = new AtomicInteger() - val bytesCounter = new AtomicLong() - val uploads = Map.empty[MD5Hash, RemoteKey] - val events = new util.LinkedList[StorageEvent] - - fileChannel.addListener( - fileReceiver( - configuration, - uiSink, - remoteObjects, - archive, - uploads, - actionCounter, - bytesCounter, - events - ) - ) - - fileChannel.run( - sink => FileScanner.scanSources(configuration, sink), - "scan-sources" - ) - - fileChannel.start() - fileChannel.waitForShutdown() - events.asScala.toList - } - - override def scanDelete(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - remoteData: RemoteObjects, - archive: ThorpArchive): Seq[StorageEvent] = { - val deletionsChannel: Channel[RemoteKey] = Channel.create("deletions") - - // state for the file receiver - val actionCounter = new AtomicInteger() - val bytesCounter = new AtomicLong() - val events = new util.LinkedList[StorageEvent] - - deletionsChannel.addListener( - keyReceiver( - configuration, - uiSink, - archive, - actionCounter, - bytesCounter, - events - ) - ) - - deletionsChannel.run(sink => { - remoteData.byKey.keys().forEach(key => sink.accept(key)) - sink.shutdown() - }, "delete-source") - - deletionsChannel.start() - deletionsChannel.waitForShutdown() - events.asScala.toList - } - - private def fileReceiver( - configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - remoteObjects: RemoteObjects, - archive: ThorpArchive, - uploads: Map[MD5Hash, RemoteKey], - actionCounter: AtomicInteger, - bytesCounter: AtomicLong, - events: util.Deque[StorageEvent] - ): Listener[LocalFile] = { (localFile: LocalFile) => - { - uiFileFound(uiSink)(localFile) - val action = - chooseAction(configuration, remoteObjects, uploads, uiSink)(localFile) - actionCounter.incrementAndGet() - bytesCounter.addAndGet(action.size) - uiActionChosen(uiSink)(action) - val sequencedAction = SequencedAction(action, actionCounter.get()) - val event = archive - .update(configuration, uiSink, sequencedAction, bytesCounter.get) - events.addFirst(event) - uiActionFinished(uiSink)( - action, - actionCounter.get, - bytesCounter.get, - event - ) - } - } - - private def uiActionChosen( - uiSink: Channel.Sink[UIEvent] - )(action: Action): Unit = - uiSink.accept(UIEvent.actionChosen(action)) - - private def uiActionFinished(uiSink: Channel.Sink[UIEvent])( - action: Action, - actionCounter: Int, - bytesCounter: Long, - event: StorageEvent - ): Unit = - uiSink.accept( - UIEvent.actionFinished(action, actionCounter, bytesCounter, event) - ) - - private def uiFileFound( - uiSink: Channel.Sink[UIEvent] - )(localFile: LocalFile): Unit = - uiSink.accept(UIEvent.fileFound(localFile)) - - private def chooseAction(configuration: Configuration, - remoteObjects: RemoteObjects, - uploads: Map[MD5Hash, RemoteKey], - uiSink: Channel.Sink[UIEvent], - )(localFile: LocalFile): Action = { - val remoteExists = remoteObjects.remoteKeyExists(localFile.remoteKey) - val remoteMatches = remoteObjects.remoteMatchesLocalFile(localFile) - val remoteForHash = remoteObjects.remoteHasHash(localFile.hashes).toScala - val previous = uploads - val bucket = configuration.bucket - val action = if (remoteExists && remoteMatches) { - doNothing(localFile, bucket) - } else { - remoteForHash match { - case pair: Some[Tuple[RemoteKey, MD5Hash]] => - val sourceKey = pair.value.a - val hash = pair.value.b - doCopy(localFile, bucket, sourceKey, hash) - case _ if matchesPreviousUpload(previous, localFile.hashes) => - doCopyWithPreviousUpload(localFile, bucket, previous, uiSink) - case _ => - doUpload(localFile, bucket) - } - } - action - } - - private def matchesPreviousUpload(previous: Map[MD5Hash, RemoteKey], - hashes: Hashes) = - hashes - .values() - .stream() - .anyMatch({ hash => - previous.contains(hash) - }) - - private def doNothing(localFile: LocalFile, bucket: Bucket) = - Action.doNothing(bucket, localFile.remoteKey, localFile.length) - - private def doCopy(localFile: LocalFile, - bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash) = - Action - .toCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.length) - - private def doCopyWithPreviousUpload(localFile: LocalFile, - bucket: Bucket, - previous: Map[MD5Hash, RemoteKey], - uiSink: Channel.Sink[UIEvent], - ) = { - localFile.hashes - .values() - .stream() - .filter({ hash => - previous.contains(hash) - }) - .findFirst() - .toScala - .map({ hash => - { - uiSink - .accept(UIEvent.awaitingAnotherUpload(localFile.remoteKey, hash)) - val action = Action.toCopy( - bucket, - previous(hash), - hash, - localFile.remoteKey, - localFile.length - ) - uiSink.accept(UIEvent.anotherUploadWaitComplete(action)) - action - } - }) - .getOrElse(doUpload(localFile, bucket)) - } - - private def doUpload(localFile: LocalFile, bucket: Bucket) = - Action.toUpload(bucket, localFile, localFile.length) - - def keyReceiver(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - archive: ThorpArchive, - actionCounter: AtomicInteger, - bytesCounter: AtomicLong, - events: util.Deque[StorageEvent]): Listener[RemoteKey] = { - (remoteKey: RemoteKey) => - { - uiKeyFound(uiSink)(remoteKey) - val sources = configuration.sources - val prefix = configuration.prefix - val exists = FileSystem.hasLocalFile(sources, prefix, remoteKey) - if (!exists) { - actionCounter.incrementAndGet() - val bucket = configuration.bucket - val action = Action.toDelete(bucket, remoteKey, 0L) - uiActionChosen(uiSink)(action) - bytesCounter.addAndGet(action.size) - val sequencedAction = SequencedAction(action, actionCounter.get()) - val event = archive.update(configuration, uiSink, sequencedAction, 0L) - events.addFirst(event) - uiActionFinished(uiSink)( - action, - actionCounter.get(), - bytesCounter.get(), - event - ) - } - } - } - - private def uiKeyFound(uiSink: Sink[UIEvent])(remoteKey: RemoteKey): Unit = - uiSink.accept(UIEvent.keyFound(remoteKey)) - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/SequencedAction.scala b/lib/src/main/scala/net/kemitix/thorp/lib/SequencedAction.scala deleted file mode 100644 index c81c05a..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/SequencedAction.scala +++ /dev/null @@ -1,8 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.domain.Action - -final case class SequencedAction( - action: Action, - index: Int -) diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala deleted file mode 100644 index 8950d35..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/ThorpArchive.scala +++ /dev/null @@ -1,44 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.config.Configuration -import net.kemitix.thorp.console._ -import net.kemitix.thorp.domain.StorageEvent._ -import net.kemitix.thorp.domain.{Channel, StorageEvent} -import net.kemitix.thorp.uishell.UIEvent - -trait ThorpArchive { - - def update(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - sequencedAction: SequencedAction, - totalBytesSoFar: Long): StorageEvent - - def logEvent(configuration: Configuration, - event: StorageEvent): StorageEvent = { - val batchMode = configuration.batchMode - event match { - case uploadEvent: UploadEvent => - val remoteKey = uploadEvent.remoteKey - Console.putMessageLnB(ConsoleOut.uploadComplete(remoteKey), batchMode) - case copyEvent: CopyEvent => - val sourceKey = copyEvent.sourceKey - val targetKey = copyEvent.targetKey - Console.putMessageLnB( - ConsoleOut.copyComplete(sourceKey, targetKey), - batchMode - ) - case deleteEvent: DeleteEvent => - val remoteKey = deleteEvent.remoteKey - Console.putMessageLnB(ConsoleOut.deleteComplete(remoteKey), batchMode) - case errorEvent: ErrorEvent => - val action = errorEvent.action - val e = errorEvent.e - Console.putMessageLnB( - ConsoleOut.errorQueueEventOccurred(action, e), - batchMode - ) - } - event - } - -} diff --git a/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala b/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala deleted file mode 100644 index 121511b..0000000 --- a/lib/src/main/scala/net/kemitix/thorp/lib/UnversionedMirrorArchive.scala +++ /dev/null @@ -1,92 +0,0 @@ -package net.kemitix.thorp.lib - -import net.kemitix.thorp.config.Configuration -import net.kemitix.thorp.domain.Action.{ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.domain.StorageEvent.ActionSummary -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.Storage -import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener} - -trait UnversionedMirrorArchive extends ThorpArchive { - - override def update(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - sequencedAction: SequencedAction, - totalBytesSoFar: Long): StorageEvent = { - val action = sequencedAction.action - val index = sequencedAction.index - val bucket = action.bucket - action match { - case upload: ToUpload => - val localFile = upload.localFile - doUpload( - configuration, - uiSink, - index, - totalBytesSoFar, - bucket, - localFile - ) - case toCopy: ToCopy => - val sourceKey = toCopy.sourceKey - val hash = toCopy.hash - val targetKey = toCopy.targetKey - Storage - .getInstance() - .copy(bucket, sourceKey, hash, targetKey) - case toDelete: ToDelete => - val remoteKey = toDelete.remoteKey - try { - Storage.getInstance().delete(bucket, remoteKey) - } catch { - case e: Throwable => - StorageEvent.errorEvent( - ActionSummary.delete(remoteKey.key()), - remoteKey, - e - ); - } - case doNothing: Action.DoNothing => - val remoteKey = doNothing.remoteKey - StorageEvent.doNothingEvent(remoteKey) - } - } - - private def doUpload(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - index: Int, - totalBytesSoFar: Long, - bucket: Bucket, - localFile: LocalFile) = - Storage - .getInstance() - .upload( - localFile, - bucket, - listenerSettings( - configuration, - uiSink, - index, - totalBytesSoFar, - bucket, - localFile - ) - ) - - private def listenerSettings(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - index: Int, - totalBytesSoFar: Long, - bucket: Bucket, - localFile: LocalFile) = - UploadEventListener.settings( - uiSink, - localFile, - index, - totalBytesSoFar, - configuration.batchMode - ) - -} - -object UnversionedMirrorArchive extends UnversionedMirrorArchive diff --git a/lib/src/test/java/net/kemitix/thorp/lib/FileScannerTest.java b/lib/src/test/java/net/kemitix/thorp/lib/FileScannerTest.java new file mode 100644 index 0000000..4a386c8 --- /dev/null +++ b/lib/src/test/java/net/kemitix/thorp/lib/FileScannerTest.java @@ -0,0 +1,49 @@ +package net.kemitix.thorp.lib; + +import net.kemitix.thorp.config.Configuration; +import net.kemitix.thorp.domain.*; +import net.kemitix.thorp.domain.channel.Channel; +import net.kemitix.thorp.filesystem.Resource; +import org.assertj.core.api.WithAssertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class FileScannerTest + implements WithAssertions { + + @Test + @DisplayName("scan resources as source") + public void scanResources() throws InterruptedException { + //given + File source = Resource.select(this, "upload").toFile(); + Configuration configuration = Configuration.create() + .withSources(Sources.create( + Collections.singletonList( + source.toPath()))); + List localFiles = new ArrayList<>(); + //when + Channel.create("test-file-scan") + .addListener(localFiles::add) + .run(sink -> FileScanner.scanSources(configuration, sink)) + .start() + .waitForShutdown(); + //then + File rootFile = source.toPath().resolve("root-file").toFile(); + File leafFile = source.toPath().resolve("subdir/leaf-file").toFile(); + Hashes rootHashes = Hashes.create(HashType.MD5, MD5HashData.Root.hash); + Hashes leafHashes = Hashes.create(HashType.MD5, MD5HashData.Leaf.hash); + RemoteKey rootKey = RemoteKey.create("root-file"); + RemoteKey leafKey = RemoteKey.create("subdir/leaf-file"); + long rootLength = 55L; + long leafLength = 58L; + assertThat(localFiles) + .containsExactlyInAnyOrder( + LocalFile.create(rootFile, source, rootHashes, rootKey, rootLength), + LocalFile.create(leafFile, source, leafHashes, leafKey, leafLength)); + } +} diff --git a/lib/src/test/java/net/kemitix/thorp/lib/FiltersTest.java b/lib/src/test/java/net/kemitix/thorp/lib/FiltersTest.java new file mode 100644 index 0000000..626f378 --- /dev/null +++ b/lib/src/test/java/net/kemitix/thorp/lib/FiltersTest.java @@ -0,0 +1,190 @@ +package net.kemitix.thorp.lib; + +import net.kemitix.thorp.domain.Filter; +import org.assertj.core.api.WithAssertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class FiltersTest + implements WithAssertions { + + private final String path1 = "a-file"; + private final String path2 = "another-file.txt"; + private final String path3 = "/path/to/a/file.txt"; + private final String path4 = "/path/to/another/file"; + private final String path5 = "/home/pcampbell/repos/kemitix/s3thorp"; + private final String path6 = "/kemitix/s3thorp/upload/subdir"; + private final List paths = + Stream.of(path1, path2, path3, path4, path5, path6) + .map(Paths::get) + .collect(Collectors.toList()); + + @Nested + @DisplayName("include") + public class IncludeTests { + @Test + @DisplayName("default filter") + public void defaultFilter() { + //given + List filters = Collections.singletonList( + Filter.Include.all()); + //then + assertThat(paths) + .allMatch(path -> + Filters.isIncluded(path, filters)); + } + @Nested + @DisplayName("directory exact match") + public class DirectoryExactMatchTests { + List filters = Collections.singletonList( + Filter.include("/upload/subdir/")); + @Test + @DisplayName("include matching directory") + public void includeMatchingDirectory() { + //given + Path path = Paths.get("/upload/subdir/leaf-dir"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isTrue(); + } + @Test + @DisplayName("exclude non-matching file") + public void excludeNonMatchingFile() { + //given + Path path = Paths.get("/upload/other-file"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isFalse(); + } + } + @Nested + @DisplayName("file partial match 'root'") + public class FilePartialMatchRootTests { + List filters = Collections.singletonList( + Filter.include("root")); + @Test + @DisplayName("include matching file") + public void includeMatchingFile() { + //given + Path path = Paths.get("/upload/root-file"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isTrue(); + } + @Test + @DisplayName("exclude non-matching file (1)") + public void excludeNonMatchingFile1() { + //given + Path path = Paths.get("/test-file-for-hash.txt"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isFalse(); + } + @Test + @DisplayName("exclude non-matching file (2)") + public void excludeNonMatchingFile2() { + //given + Path path = Paths.get("/upload/subdir/lead-file"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isFalse(); + } + } + } + @Nested + @DisplayName("exclude") + public class ExcludeTests { + @Nested + @DisplayName("directory exact match exclude") + public class DirectoryMatchTests { + List filters = Collections.singletonList( + Filter.exclude("/upload/subdir/")); + @Test + @DisplayName("exclude matching directory") + public void excludeDirectory() { + //given + Path path = Paths.get("/upload/subdir/leaf-file"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isFalse(); + } + @Test + @DisplayName("include non-matching file") + public void includeFile() { + //given + Path path = Paths.get("/upload/other-file"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isTrue(); + } + } + @Nested + @DisplayName("file partial match") + public class PartialMatchTests { + List filters = Collections.singletonList( + Filter.exclude("root")); + @Test + @DisplayName("exclude matching file") + public void excludeFile() { + //given + Path path = Paths.get("/upload/root-file"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isFalse(); + } + @Test + @DisplayName("include non-matching file (1)") + public void includeFile1() { + //given + Path path = Paths.get("/test-file-for-hash.txt"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isTrue(); + } + @Test + @DisplayName("include non-matching file (2)") + public void includeFile2() { + //given + Path path = Paths.get("/upload/subdir/leaf-file"); + //when + boolean included = Filters.isIncluded(path, filters); + //then + assertThat(included).isTrue(); + } + } + } + @Nested + @DisplayName("isIncluded") + public class IsIncludedTests { + List invoke(List filters) { + return paths.stream() + .filter(path -> + Filters.isIncluded(path, filters)) + .collect(Collectors.toList()); + } + + @Test + @DisplayName("when no filters then accepts all paths") + public void whenNoFilters_thenAcceptAll() { + assertThat(invoke(Collections.emptyList())) + .containsExactlyElementsOf(paths); + } + } +} diff --git a/lib/src/test/java/net/kemitix/thorp/lib/LocalFileSystemTest.java b/lib/src/test/java/net/kemitix/thorp/lib/LocalFileSystemTest.java new file mode 100644 index 0000000..8723e6a --- /dev/null +++ b/lib/src/test/java/net/kemitix/thorp/lib/LocalFileSystemTest.java @@ -0,0 +1,426 @@ +package net.kemitix.thorp.lib; + +import net.kemitix.thorp.config.*; +import net.kemitix.thorp.domain.*; +import net.kemitix.thorp.domain.channel.Channel; +import net.kemitix.thorp.domain.channel.Listener; +import net.kemitix.thorp.domain.channel.Sink; +import net.kemitix.thorp.filesystem.Resource; +import net.kemitix.thorp.uishell.UIEvent; +import org.assertj.core.api.WithAssertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +public class LocalFileSystemTest + implements WithAssertions { + + private final Resource source = Resource.select(this, "upload"); + private final Path sourcePath = source.toPath(); + private final ConfigOption sourceOption = ConfigOption.source(sourcePath); + private final Bucket bucket = Bucket.named("bucket"); + private final ConfigOption bucketOption = ConfigOption.bucket(bucket.name()); + private final ConfigOptions configOptions = ConfigOptions.create( + Arrays.asList( + sourceOption, + bucketOption, + ConfigOption.ignoreGlobalOptions(), + ConfigOption.ignoreUserOptions() + )); + + private final AtomicReference> uiEvents = new AtomicReference<>(Collections.emptyList()); + private final AtomicReference> actions = new AtomicReference<>(Collections.emptyList()); + + private final Archive archive = + (configuration, uiSink, sequencedAction, totalBytesSoFar) -> { + actions.updateAndGet(l -> { + List list = new ArrayList<>(); + list.add(sequencedAction); + list.addAll(l); + return list; + }); + return StorageEvent.doNothingEvent( + sequencedAction.action().remoteKey); + }; + + @Nested + @DisplayName("scanCopyUpload") + public class ScanCopyUploadTests { + @Nested + @DisplayName("where remote is empty") + public class WhereRemoteEmptyTests { + RemoteObjects remoteObjects = RemoteObjects.empty; + Configuration configuration = ConfigurationBuilder.buildConfig(configOptions); + List uiEventList = new ArrayList<>(); + Channel uiSink = Channel.create("ui-test") + .addListener(uiEventList::add) + .start(); + List storageEvents; + + public WhereRemoteEmptyTests() throws IOException, ConfigValidationException { + } + + @BeforeEach + public void setUp() throws InterruptedException { + storageEvents = LocalFileSystem.scanCopyUpload( + configuration, uiSink, remoteObjects, archive); + uiSink.shutdownNow(); + } + + @Test + @DisplayName("update archive with all files") + public void uploadArchive() { + assertThat(storageEvents).hasSize(2); + assertThat(actions.get().stream() + .map(SequencedAction::action)) + .allSatisfy(action -> + assertThat(action) + .isInstanceOf(Action.ToUpload.class)); + assertThat(actions.get().stream() + .map(SequencedAction::action) + .map(action -> action.remoteKey)) + .containsExactlyInAnyOrder( + MD5HashData.Root.remoteKey, + MD5HashData.Leaf.remoteKey + ); + } + + @Test + @DisplayName("update ui with all files") + public void updateUI() { + assertThat(uiEventSummary(uiEventList)) + .hasSize(6) + .contains( + "file found : root-file", + "action chosen : root-file : ToUpload", + "action finished : root-file : ToUpload") + .contains( + "file found : subdir/leaf-file", + "action chosen : subdir/leaf-file : ToUpload", + "action finished : subdir/leaf-file : ToUpload"); + } + } + + @Nested + @DisplayName("where remote has all") + public class WhereRemoteHasAllTests { + Map hashToKey = new HashMap<>(); + Map keyToHash = new HashMap<>(); + RemoteObjects remoteObjects; + Configuration configuration = ConfigurationBuilder.buildConfig(configOptions); + List uiEventList = new ArrayList<>(); + Channel uiSink = Channel.create("ui-test") + .addListener(uiEventList::add) + .start(); + List storageEvents; + + public WhereRemoteHasAllTests() throws IOException, ConfigValidationException { + hashToKey.put(MD5HashData.Root.hash, MD5HashData.Root.remoteKey); + hashToKey.put(MD5HashData.Leaf.hash, MD5HashData.Leaf.remoteKey); + keyToHash.put(MD5HashData.Root.remoteKey, MD5HashData.Root.hash); + keyToHash.put(MD5HashData.Leaf.remoteKey, MD5HashData.Leaf.hash); + remoteObjects = RemoteObjects.create( + MapView.of(hashToKey).asMap(), + MapView.of(keyToHash).asMap()); + } + + @BeforeEach + public void setUp() throws InterruptedException { + storageEvents = LocalFileSystem.scanCopyUpload( + configuration, uiSink, remoteObjects, archive); + uiSink.shutdownNow(); + } + + @Test + @DisplayName("do nothing for all files") + public void doNothing() { + assertThat(storageEvents).hasSize(2); + assertThat(actions.get().stream() + .map(SequencedAction::action)) + .allSatisfy(action -> + assertThat(action) + .isInstanceOf(Action.DoNothing.class)); + } + + @Test + @DisplayName("update ui with do nothing") + public void updateUI() { + assertThat(uiEventSummary(uiEventList)) + .hasSize(6) + .contains( + "file found : root-file", + "action chosen : root-file : DoNothing", + "action finished : root-file : DoNothing") + .contains( + "file found : subdir/leaf-file", + "action chosen : subdir/leaf-file : DoNothing", + "action finished : subdir/leaf-file : DoNothing"); + } + } + + @Nested + @DisplayName("where remote has some") + public class WhereRemoteHasSomeTests { + Map hashToKey = new HashMap<>(); + Map keyToHash = new HashMap<>(); + RemoteObjects remoteObjects; + Configuration configuration = ConfigurationBuilder.buildConfig(configOptions); + List uiEventList = new ArrayList<>(); + Channel uiSink = Channel.create("ui-test") + .addListener(uiEventList::add) + .start(); + List storageEvents; + + public WhereRemoteHasSomeTests() throws IOException, ConfigValidationException { + hashToKey.put(MD5HashData.Root.hash, MD5HashData.Root.remoteKey); + keyToHash.put(MD5HashData.Root.remoteKey, MD5HashData.Root.hash); + remoteObjects = RemoteObjects.create( + MapView.of(hashToKey).asMap(), + MapView.of(keyToHash).asMap()); + } + + @BeforeEach + public void setUp() throws InterruptedException { + storageEvents = LocalFileSystem.scanCopyUpload( + configuration, uiSink, remoteObjects, archive); + uiSink.shutdownNow(); + } + + @Test + @DisplayName("do nothing for some, upload for others") + public void doNothingAnUpload() { + assertThat(storageEvents).hasSize(2); + assertThat(actions.get().stream() + .map(SequencedAction::action)) + .filteredOn(action -> action instanceof Action.DoNothing) + .hasSize(1); + assertThat(actions.get().stream() + .map(SequencedAction::action)) + .filteredOn(action -> action instanceof Action.ToUpload) + .hasSize(1); + assertThat(actions.get().stream() + .map(SequencedAction::action) + .map(action -> action.remoteKey)) + .containsExactlyInAnyOrder( + MD5HashData.Root.remoteKey, + MD5HashData.Leaf.remoteKey + ); + } + + @Test + @DisplayName("update ui with do nothing") + public void updateUI() { + assertThat(uiEventSummary(uiEventList)) + .hasSize(6) + .contains( + "file found : root-file", + "action chosen : root-file : DoNothing", + "action finished : root-file : DoNothing") + .contains( + "file found : subdir/leaf-file", + "action chosen : subdir/leaf-file : ToUpload", + "action finished : subdir/leaf-file : ToUpload"); + } + } + + @Nested + @DisplayName("where file has been renamed") + public class WhereFileRenamedTests { + Map hashToKey = new HashMap<>(); + Map keyToHash = new HashMap<>(); + RemoteObjects remoteObjects; + Configuration configuration = ConfigurationBuilder.buildConfig(configOptions); + List uiEventList = new ArrayList<>(); + Channel uiSink = Channel.create("ui-test") + .addListener(uiEventList::add) + .start(); + List storageEvents; + + public WhereFileRenamedTests() throws IOException, ConfigValidationException { + RemoteKey otherKey = RemoteKey.create("/old-filename"); + hashToKey.put(MD5HashData.Root.hash, otherKey); + hashToKey.put(MD5HashData.Leaf.hash, MD5HashData.Leaf.remoteKey); + keyToHash.put(otherKey, MD5HashData.Root.hash); + keyToHash.put(MD5HashData.Leaf.remoteKey, MD5HashData.Leaf.hash); + remoteObjects = RemoteObjects.create( + MapView.of(hashToKey).asMap(), + MapView.of(keyToHash).asMap()); + } + + @BeforeEach + public void setUp() throws InterruptedException { + storageEvents = LocalFileSystem.scanCopyUpload( + configuration, uiSink, remoteObjects, archive); + uiSink.shutdownNow(); + } + + @Test + @DisplayName("copy") + public void copy() { + assertThat(storageEvents).hasSize(2); + assertThat(actions.get().stream() + .map(SequencedAction::action)) + .filteredOn(action -> action instanceof Action.ToCopy) + .hasSize(1); + assertThat(actions.get().stream() + .map(SequencedAction::action)) + .filteredOn(action -> action instanceof Action.DoNothing) + .hasSize(1); + assertThat(actions.get().stream() + .map(SequencedAction::action) + .map(action -> action.remoteKey)) + .containsExactlyInAnyOrder( + MD5HashData.Root.remoteKey, + MD5HashData.Leaf.remoteKey + ); + } + + @Test + @DisplayName("update ui") + public void updateUI() { + assertThat(uiEventSummary(uiEventList)) + .hasSize(6) + .contains( + "file found : root-file", + "action chosen : root-file : ToCopy", + "action finished : root-file : ToCopy") + .contains( + "file found : subdir/leaf-file", + "action chosen : subdir/leaf-file : DoNothing", + "action finished : subdir/leaf-file : DoNothing"); + } + } + } + + @Nested + @DisplayName("scanDelete") + public class ScanDeleteTests { + @Nested + @DisplayName("where remote has no extra objects") + public class RemoteHasNoExtrasTests { + RemoteObjects remoteObjects = RemoteObjects.empty; + Configuration configuration = ConfigurationBuilder.buildConfig(configOptions); + List uiEventList = new ArrayList<>(); + Channel uiSink = Channel.create("ui-test") + .addListener(uiEventList::add) + .start(); + List storageEvents; + + public RemoteHasNoExtrasTests() throws IOException, ConfigValidationException { + } + + @BeforeEach + public void setUp() throws InterruptedException { + storageEvents = LocalFileSystem.scanDelete( + configuration, uiSink, remoteObjects, archive); + uiSink.shutdownNow(); + } + + @Test + @DisplayName("no archive actions") + public void noArchiveUpdates() { + assertThat(storageEvents).isEmpty(); + } + + @Test + @DisplayName("update ui") + public void updateUI() { + assertThat(uiEventList).isEmpty(); + } + } + @Nested + @DisplayName("where remote has extra objects") + public class RemoteHasExtrasTests { + Map hashToKey = new HashMap<>(); + Map keyToHash = new HashMap<>(); + RemoteObjects remoteObjects; + Configuration configuration = ConfigurationBuilder.buildConfig(configOptions); + List uiEventList = new ArrayList<>(); + Channel uiSink = Channel.create("ui-test") + .addListener(uiEventList::add) + .start(); + List storageEvents; + RemoteKey extraKey = RemoteKey.create("/extra"); + MD5Hash extraHash = MD5Hash.create("extra-hash"); + + public RemoteHasExtrasTests() throws IOException, ConfigValidationException { + hashToKey.put(extraHash, extraKey); + keyToHash.put(extraKey, extraHash); + remoteObjects = RemoteObjects.create( + MapView.of(hashToKey).asMap(), + MapView.of(keyToHash).asMap()); + } + + @BeforeEach + public void setUp() throws InterruptedException { + storageEvents = LocalFileSystem.scanDelete( + configuration, uiSink, remoteObjects, archive); + uiSink.shutdownNow(); + } + + @Test + @DisplayName("archive delete action") + public void archiveDeleteUpdates() { + assertThat(storageEvents).hasSize(1); + assertThat(actions.get().stream() + .map(SequencedAction::action)) + .filteredOn(action -> action instanceof Action.ToDelete) + .hasSize(1); + assertThat(actions.get().stream() + .map(SequencedAction::action) + .map(action -> action.remoteKey)) + .containsExactly(extraKey); + } + + @Test + @DisplayName("update ui") + public void updateUI() { + assertThat(uiEventSummary(uiEventList)) + .hasSize(3) + .contains( + "key found: /extra", + "action chosen : /extra : ToDelete", + "action finished : /extra : ToDelete"); + } + } + } + + private List uiEventSummary(List uiEvents) { + Deque summary = new ArrayDeque<>(); + uiEvents.stream() + .map(uiEvent -> { + if (uiEvent instanceof UIEvent.FileFound) { + return String.format("file found : %s", + ((UIEvent.FileFound) uiEvent).localFile + .remoteKey.key()); + } else if (uiEvent instanceof UIEvent.ActionChosen) { + Action action = ((UIEvent.ActionChosen) uiEvent).action; + return String.format( + "action chosen : %s : %s", + action.remoteKey.key(), + action.getClass().getSimpleName()); + } else if (uiEvent instanceof UIEvent.ActionFinished) { + Action action = ((UIEvent.ActionFinished) uiEvent).action; + return String.format( + "action finished : %s : %s", + action.remoteKey.key(), + action.getClass().getSimpleName()); + } else if (uiEvent instanceof UIEvent.KeyFound) { + return String.format("key found: %s", + ((UIEvent.KeyFound) uiEvent).remoteKey + .key()); + } + return String.format("unknown : %s", + uiEvent.getClass().getSimpleName()); + }) + .forEach(summary::addLast); + return new ArrayList<>(summary); + } +} diff --git a/lib/src/test/resources/META-INF/services/net.kemitix.thorp.domain.HashGenerator b/lib/src/test/resources/META-INF/services/net.kemitix.thorp.domain.HashGenerator new file mode 100644 index 0000000..91c71ca --- /dev/null +++ b/lib/src/test/resources/META-INF/services/net.kemitix.thorp.domain.HashGenerator @@ -0,0 +1 @@ +net.kemitix.thorp.filesystem.MD5HashGenerator diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/FileScannerTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/FileScannerTest.scala deleted file mode 100644 index 563c599..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/FileScannerTest.scala +++ /dev/null @@ -1,46 +0,0 @@ -package net.kemitix.thorp.lib - -import org.scalatest.FreeSpec - -class FileScannerTest extends FreeSpec { - -// "scanSources" - { -// "creates a FileSender for files in resources" in { -// def receiver(scanned: Ref[List[RemoteKey]]) -// : UIO[MessageChannel.UReceiver[Any, ScannedFile]] = UIO { message => -// for { -// _ <- scanned.update(l => message.body.remoteKey :: l) -// } yield () -// } -// val scannedFiles = -// new AtomicReference[List[RemoteKey]](List.empty) -// val sourcePath = Resource.select(this, "upload").toPath -// val configOptions: List[ConfigOption] = -// List[ConfigOption](ConfigOption.source(sourcePath), -// ConfigOption.bucket("bucket"), -// ConfigOption.ignoreGlobalOptions(), -// ConfigOption.ignoreUserOptions()) -// val program: ZIO[Clock with FileScanner, Throwable, Unit] = { -// val configuration = ConfigurationBuilder.buildConfig( -// ConfigOptions.create(configOptions.asJava)) -// for { -// scanner <- FileScanner.scanSources(configuration) -// scannedRef <- Ref.make[List[RemoteKey]](List.empty) -// receiver <- receiver(scannedRef) -// _ <- MessageChannel.pointToPoint(scanner)(receiver).runDrain -// scanned <- scannedRef.get -// _ <- UIO(scannedFiles.set(scanned)) -// } yield () -// } -// object TestEnv extends FileScanner.Live with Clock.Live -// val completed = -// new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither -// assert(completed.isRight) -// assertResult( -// Set(RemoteKey.create("root-file"), -// RemoteKey.create("subdir/leaf-file")))(scannedFiles.get.toSet) -// } -// -// } - -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/FiltersSuite.scala b/lib/src/test/scala/net/kemitix/thorp/lib/FiltersSuite.scala deleted file mode 100644 index 8903b70..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/FiltersSuite.scala +++ /dev/null @@ -1,139 +0,0 @@ -package net.kemitix.thorp.lib - -import java.nio.file.Paths - -import net.kemitix.thorp.domain.Filter -import net.kemitix.thorp.domain.Filter.{Exclude, Include} -import org.scalatest.FunSpec - -class FiltersSuite extends FunSpec { - - private val path1 = "a-file" - private val path2 = "another-file.txt" - private val path3 = "/path/to/a/file.txt" - private val path4 = "/path/to/another/file" - private val path5 = "/home/pcampbell/repos/kemitix/s3thorp" - private val path6 = "/kemitix/s3thorp/upload/subdir" - private val paths = - List(path1, path2, path3, path4, path5, path6).map(Paths.get(_)) - - describe("Include") { - - describe("default filter") { - val include = Include.all - it("should include files") { - paths.foreach(path => - assertResult(true)(Filters.isIncludedByFilter(path)(include))) - } - } - describe("directory exact match include '/upload/subdir/'") { - val include = Include.create("/upload/subdir/") - it("include matching directory") { - val matching = Paths.get("/upload/subdir/leaf-file") - assertResult(true)(Filters.isIncludedByFilter(matching)(include)) - } - it("exclude non-matching files") { - val nonMatching = Paths.get("/upload/other-file") - assertResult(false)(Filters.isIncludedByFilter(nonMatching)(include)) - } - } - describe("file partial match 'root'") { - val include = Include.create("root") - it("include matching file '/upload/root-file") { - val matching = Paths.get("/upload/root-file") - assertResult(true)(Filters.isIncludedByFilter(matching)(include)) - } - it("exclude non-matching files 'test-file-for-hash.txt'") { - val nonMatching1 = Paths.get("/test-file-for-hash.txt") - assertResult(false)(Filters.isIncludedByFilter(nonMatching1)(include)) - } - it("exclude non-matching files '/upload/subdir/leaf-file'") { - val nonMatching2 = Paths.get("/upload/subdir/leaf-file") - assertResult(false)(Filters.isIncludedByFilter(nonMatching2)(include)) - } - } - } - - describe("Exclude") { -// describe("default exclude") { -// val exclude = Exclude() -// it("should exclude all files") { -// paths.foreach(path => { -// assertResult(true)(exclude.isExcluded(path)) -// }) -// } -// } - describe("directory exact match exclude '/upload/subdir/'") { - val exclude = Exclude.create("/upload/subdir/") - it("exclude matching directory") { - val matching = Paths.get("/upload/subdir/leaf-file") - assertResult(true)(Filters.isExcludedByFilter(matching)(exclude)) - } - it("include non-matching files") { - val nonMatching = Paths.get("/upload/other-file") - assertResult(false)(Filters.isExcludedByFilter(nonMatching)(exclude)) - } - } - describe("file partial match 'root'") { - val exclude = Exclude.create("root") - it("exclude matching file '/upload/root-file") { - val matching = Paths.get("/upload/root-file") - assertResult(true)(Filters.isExcludedByFilter(matching)(exclude)) - } - it("include non-matching files 'test-file-for-hash.txt'") { - val nonMatching1 = Paths.get("/test-file-for-hash.txt") - assertResult(false)(Filters.isExcludedByFilter(nonMatching1)(exclude)) - } - it("include non-matching files '/upload/subdir/leaf-file'") { - val nonMatching2 = Paths.get("/upload/subdir/leaf-file") - assertResult(false)(Filters.isExcludedByFilter(nonMatching2)(exclude)) - } - } - } - describe("isIncluded") { - def invoke(filters: List[Filter]) = { - paths.filter(path => Filters.isIncluded(path)(filters)) - } - - describe("when there are no filters") { - val filters = List[Filter]() - it("should accept all files") { - val expected = paths - val result = invoke(filters) - assertResult(expected)(result) - } - } - describe("when a single include") { - val filters = List(Include.create(".txt")) - it("should only include two matching paths") { - val expected = List(path2, path3).map(Paths.get(_)) - val result = invoke(filters) - assertResult(expected)(result) - } - } - describe("when a single exclude") { - val filters = List(Exclude.create("path")) - it("should include only other paths") { - val expected = List(path1, path2, path5, path6).map(Paths.get(_)) - val result = invoke(filters) - assertResult(expected)(result) - } - } - describe("when include .txt files, but then exclude everything trumps all") { - val filters = List[Filter](Include.create(".txt"), Exclude.create(".*")) - it("should include nothing") { - val expected = List() - val result = invoke(filters) - assertResult(expected)(result) - } - } - describe("when exclude everything except .txt files") { - val filters = List[Filter](Exclude.create(".*"), Include.create(".txt")) - it("should include only the .txt files") { - val expected = List(path2, path3).map(Paths.get(_)) - val result = invoke(filters) - assertResult(expected)(result) - } - } - } -} diff --git a/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileSystemTest.scala b/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileSystemTest.scala deleted file mode 100644 index 863f609..0000000 --- a/lib/src/test/scala/net/kemitix/thorp/lib/LocalFileSystemTest.scala +++ /dev/null @@ -1,369 +0,0 @@ -package net.kemitix.thorp.lib - -import java.util.concurrent.atomic.AtomicReference - -import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, Configuration} -import net.kemitix.thorp.domain._ -import net.kemitix.thorp.filesystem.Resource -import net.kemitix.thorp.uishell.UIEvent -import net.kemitix.thorp.uishell.UIEvent.{ - ActionChosen, - ActionFinished, - FileFound, - KeyFound -} -import org.scalatest.FreeSpec - -import scala.jdk.CollectionConverters._ - -class LocalFileSystemTest extends FreeSpec { - - private val source = Resource.select(this, "upload") - private val sourcePath = source.toPath - private val sourceOption = ConfigOption.source(sourcePath) - private val bucket = Bucket.named("bucket") - private val bucketOption = ConfigOption.bucket(bucket.name) - private val configOptions = ConfigOptions.create( - List[ConfigOption]( - sourceOption, - bucketOption, - ConfigOption.ignoreGlobalOptions(), - ConfigOption.ignoreUserOptions() - ).asJava - ) - - private val uiEvents = new AtomicReference[List[UIEvent]](List.empty) - private val actions = new AtomicReference[List[SequencedAction]](List.empty) - - private def archive: ThorpArchive = new ThorpArchive { - override def update(configuration: Configuration, - uiSink: Channel.Sink[UIEvent], - sequencedAction: SequencedAction, - totalBytesSoFar: Long): StorageEvent = { - actions.updateAndGet(l => sequencedAction :: l) - StorageEvent.doNothingEvent(sequencedAction.action.remoteKey) - } - } - -// private object TestEnv extends Clock.Live with FileScanner.Live -// -// "scanCopyUpload" - { -// def sender( -// configuration: Configuration, -// objects: RemoteObjects -// ): UIO[MessageChannel.ESender[Clock with FileScanner, Throwable, UIEvent]] = -// UIO { uiChannel => -// (for { -// _ <- LocalFileSystem.scanCopyUpload( -// configuration, -// uiChannel, -// objects, -// archive -// ) -// } yield ()) <* MessageChannel.endChannel(uiChannel) -// } -// def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] = -// UIO { message => -// val uiEvent = message.body -// uiEvents.updateAndGet(l => uiEvent :: l) -// UIO(()) -// } -// def program(remoteObjects: RemoteObjects) = { -// val configuration = ConfigurationBuilder.buildConfig(configOptions) -// for { -// sender <- sender(configuration, remoteObjects) -// receiver <- receiver() -// _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain -// } yield () -// } -// "where remote has no objects" - { -// val remoteObjects = RemoteObjects.empty -// "upload all files" - { -// "update archive with upload actions" in { -// actions.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val actionList: Set[Action] = actions.get.map(_.action).toSet -// actionList.filter(_.isInstanceOf[ToUpload]) should have size 2 -// actionList.map(_.remoteKey) shouldEqual Set( -// MD5HashData.Root.remoteKey, -// MD5HashData.Leaf.remoteKey -// ) -// } -// "ui is updated" in { -// uiEvents.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val summary = uiEventsSummary -// summary should have size 6 -// summary should contain inOrderElementsOf List( -// "file found : root-file", -// "action chosen : root-file : ToUpload", -// "action finished : root-file : ToUpload" -// ) -// summary should contain inOrderElementsOf List( -// "file found : subdir/leaf-file", -// "action chosen : subdir/leaf-file : ToUpload", -// "action finished : subdir/leaf-file : ToUpload" -// ) -// } -// } -// } -// "where remote has all object" - { -// val remoteObjects = -// RemoteObjects.create( -// MapView( -// MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, -// MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey -// ).toMap.asJava, -// MapView( -// MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, -// MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash -// ).toMap.asJava -// ) -// "do nothing for all files" - { -// "all archive actions do nothing" in { -// actions.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val actionList: Set[Action] = actions.get.map(_.action).toSet -// actionList should have size 2 -// actionList.filter(_.isInstanceOf[DoNothing]) should have size 2 -// } -// "ui is updated" in { -// uiEvents.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val summary = uiEventsSummary -// summary should have size 6 -// summary should contain inOrderElementsOf List( -// "file found : root-file", -// "action chosen : root-file : DoNothing", -// "action finished : root-file : DoNothing" -// ) -// summary should contain inOrderElementsOf List( -// "file found : subdir/leaf-file", -// "action chosen : subdir/leaf-file : DoNothing", -// "action finished : subdir/leaf-file : DoNothing" -// ) -// } -// } -// } -// "where remote has some objects" - { -// val remoteObjects = -// RemoteObjects.create( -// MapView(MD5HashData.Root.hash -> MD5HashData.Root.remoteKey).toMap.asJava, -// MapView(MD5HashData.Root.remoteKey -> MD5HashData.Root.hash).toMap.asJava -// ) -// "upload leaf, do nothing for root" - { -// "archive actions upload leaf" in { -// actions.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val actionList: Set[Action] = actions.get.map(_.action).toSet -// actionList -// .filter(_.isInstanceOf[DoNothing]) -// .map(_.remoteKey) shouldEqual Set(MD5HashData.Root.remoteKey) -// actionList -// .filter(_.isInstanceOf[ToUpload]) -// .map(_.remoteKey) shouldEqual Set(MD5HashData.Leaf.remoteKey) -// } -// "ui is updated" in { -// uiEvents.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val summary = uiEventsSummary -// summary should contain inOrderElementsOf List( -// "file found : root-file", -// "action chosen : root-file : DoNothing", -// "action finished : root-file : DoNothing" -// ) -// summary should contain inOrderElementsOf List( -// "file found : subdir/leaf-file", -// "action chosen : subdir/leaf-file : ToUpload", -// "action finished : subdir/leaf-file : ToUpload" -// ) -// } -// } -// } -// "where remote objects are swapped" ignore { -// val remoteObjects = -// RemoteObjects.create( -// MapView( -// MD5HashData.Root.hash -> MD5HashData.Leaf.remoteKey, -// MD5HashData.Leaf.hash -> MD5HashData.Root.remoteKey -// ).toMap.asJava, -// MapView( -// MD5HashData.Root.remoteKey -> MD5HashData.Leaf.hash, -// MD5HashData.Leaf.remoteKey -> MD5HashData.Root.hash -// ).toMap.asJava -// ) -// "copy files" - { -// "archive swaps objects" ignore { -// // not supported -// } -// } -// } -// "where file has been renamed" - { -// // renamed from "other/root" to "root-file" -// val otherRootKey = RemoteKey.create("other/root") -// val remoteObjects = -// RemoteObjects.create( -// MapView( -// MD5HashData.Root.hash -> otherRootKey, -// MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey -// ).toMap.asJava, -// MapView( -// otherRootKey -> MD5HashData.Root.hash, -// MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash -// ).toMap.asJava -// ) -// "copy object and delete original" in { -// actions.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val actionList: Set[Action] = actions.get.map(_.action).toSet -// actionList should have size 2 -// actionList -// .filter(_.isInstanceOf[DoNothing]) -// .map(_.remoteKey) shouldEqual Set(MD5HashData.Leaf.remoteKey) -// actionList -// .filter(_.isInstanceOf[ToCopy]) -// .map(_.remoteKey) shouldEqual Set(MD5HashData.Root.remoteKey) -// } -// "ui is updated" in { -// uiEvents.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val summary = uiEventsSummary -// summary should contain inOrderElementsOf List( -// "file found : root-file", -// "action chosen : root-file : ToCopy", -// "action finished : root-file : ToCopy" -// ) -// summary should contain inOrderElementsOf List( -// "file found : subdir/leaf-file", -// "action chosen : subdir/leaf-file : DoNothing", -// "action finished : subdir/leaf-file : DoNothing" -// ) -// } -// } -// } -// -// "scanDelete" - { -// def sender( -// configuration: Configuration, -// objects: RemoteObjects -// ): UIO[MessageChannel.ESender[Clock, Throwable, UIEvent]] = -// UIO { uiChannel => -// (for { -// _ <- LocalFileSystem.scanDelete( -// configuration, -// uiChannel, -// objects, -// archive -// ) -// } yield ()) <* MessageChannel.endChannel(uiChannel) -// } -// def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] = -// UIO { message => -// val uiEvent = message.body -// uiEvents.updateAndGet(l => uiEvent :: l) -// UIO(()) -// } -// def program(remoteObjects: RemoteObjects) = { -// { -// val configuration = ConfigurationBuilder.buildConfig(configOptions) -// for { -// sender <- sender(configuration, remoteObjects) -// receiver <- receiver() -// _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain -// } yield () -// } -// } -// "where remote has no extra objects" - { -// val remoteObjects = RemoteObjects.create( -// MapView( -// MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, -// MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey -// ).toMap.asJava, -// MapView( -// MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, -// MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash -// ).toMap.asJava -// ) -// "do nothing for all files" - { -// "no archive actions" in { -// actions.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val actionList: Set[Action] = actions.get.map(_.action).toSet -// actionList should have size 0 -// } -// "ui is updated" in { -// uiEvents.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// uiEventsSummary shouldEqual List( -// "key found: root-file", -// "key found: subdir/leaf-file" -// ) -// } -// } -// } -// "where remote has extra objects" - { -// val extraHash = MD5Hash.create("extra") -// val extraObject = RemoteKey.create("extra") -// val remoteObjects = RemoteObjects.create( -// MapView( -// MD5HashData.Root.hash -> MD5HashData.Root.remoteKey, -// MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey, -// extraHash -> extraObject -// ).toMap.asJava, -// MapView( -// MD5HashData.Root.remoteKey -> MD5HashData.Root.hash, -// MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash, -// extraObject -> extraHash -// ).toMap.asJava -// ) -// "remove the extra object" - { -// "archive delete action" in { -// actions.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// val actionList: Set[Action] = actions.get.map(_.action).toSet -// actionList should have size 1 -// actionList -// .filter(_.isInstanceOf[ToDelete]) -// .map(_.remoteKey) shouldEqual Set(extraObject) -// } -// "ui is updated" in { -// uiEvents.set(List.empty) -// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv)) -// uiEventsSummary shouldEqual List( -// "key found: root-file", -// "key found: subdir/leaf-file", -// "key found: extra", -// "action chosen : extra : ToDelete", -// "action finished : extra : ToDelete" -// ) -// } -// } -// } -// } - - private def uiEventsSummary: List[String] = { - uiEvents - .get() - .reverse - .map { - case uie: FileFound => - String.format("file found : %s", uie.localFile.remoteKey.key) - case uie: ActionChosen => - String.format( - "action chosen : %s : %s", - uie.action.remoteKey.key, - uie.action.getClass.getSimpleName - ) - case uie: ActionFinished => - String.format( - "action finished : %s : %s", - uie.action.remoteKey.key, - uie.action.getClass.getSimpleName - ) - case uie: KeyFound => - String.format("key found: %s", uie.remoteKey.key) - case x => String.format("unknown : %s", x.getClass.getSimpleName) - } - } - -} diff --git a/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3ETagGenerator.java b/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3ETagGenerator.java index 9debcfa..0be3422 100644 --- a/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3ETagGenerator.java +++ b/storage-aws/src/main/java/net/kemitix/thorp/storage/aws/S3ETagGenerator.java @@ -1,8 +1,7 @@ package net.kemitix.thorp.storage.aws; -import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; -import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils; +import lombok.With; import net.kemitix.thorp.domain.HashGenerator; import net.kemitix.thorp.domain.HashType; import net.kemitix.thorp.domain.Hashes; @@ -19,14 +18,27 @@ import java.util.stream.LongStream; import static net.kemitix.thorp.filesystem.MD5HashGenerator.md5FileChunk; +@With public class S3ETagGenerator implements HashGenerator { + + private final long multipartUploadThreshold; + + public S3ETagGenerator() { + multipartUploadThreshold = + new TransferManagerConfiguration().getMultipartUploadThreshold(); + } + + public S3ETagGenerator(long multipartUploadThreshold) { + this.multipartUploadThreshold = multipartUploadThreshold; + } + @Deprecated // Use hashFile public String eTag(Path path) throws IOException, NoSuchAlgorithmException { return hashFile(path); } @Override public String hashFile(Path path) throws IOException, NoSuchAlgorithmException { - long partSize = calculatePartSize(path); + long partSize = calculatePartSize(); long parts = numParts(path.toFile().length(), partSize); String eTagHex = eTagHex(path, partSize, parts); return String.format("%s-%d", eTagHex, parts); @@ -51,10 +63,8 @@ public class S3ETagGenerator implements HashGenerator { .collect(Collectors.toList()); } - private long calculatePartSize(Path path) { - return TransferManagerUtils.calculateOptimalPartSize( - new PutObjectRequest("", "", path.toFile()), - new TransferManagerConfiguration()); + private long calculatePartSize() { + return multipartUploadThreshold; } private long numParts(long length, long partSize) { diff --git a/storage-aws/src/test/java/net/kemitix/thorp/storage/aws/ETagGeneratorTest.java b/storage-aws/src/test/java/net/kemitix/thorp/storage/aws/ETagGeneratorTest.java index eb87722..fafde25 100644 --- a/storage-aws/src/test/java/net/kemitix/thorp/storage/aws/ETagGeneratorTest.java +++ b/storage-aws/src/test/java/net/kemitix/thorp/storage/aws/ETagGeneratorTest.java @@ -48,7 +48,8 @@ public class ETagGeneratorTest @Test @DisplayName("create eTag for whole file") public void createTagForWholeFile() throws IOException, NoSuchAlgorithmException { - String result = generator.hashFile(bigFile.toPath()); + String result = generator.withMultipartUploadThreshold(5 * 1024 * 1024) + .hashFile(bigFile.toPath()); assertThat(result).isEqualTo(BIG_FILE_ETAG); } } diff --git a/uishell/src/main/java/net/kemitix/thorp/uishell/UIShell.java b/uishell/src/main/java/net/kemitix/thorp/uishell/UIShell.java index 54a17d3..3525440 100644 --- a/uishell/src/main/java/net/kemitix/thorp/uishell/UIShell.java +++ b/uishell/src/main/java/net/kemitix/thorp/uishell/UIShell.java @@ -4,13 +4,14 @@ import net.kemitix.thorp.config.Configuration; import net.kemitix.thorp.console.Console; import net.kemitix.thorp.console.ConsoleOut; import net.kemitix.thorp.domain.*; +import net.kemitix.thorp.domain.channel.Listener; import static net.kemitix.thorp.domain.Terminal.eraseLineForward; import static net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen; public interface UIShell { - static Channel.Listener listener(Configuration configuration) { - return new Channel.Listener() { + static Listener listener(Configuration configuration) { + return new Listener() { @Override public void accept(UIEvent uiEvent) { if (uiEvent instanceof UIEvent.RequestCycle) diff --git a/uishell/src/main/java/net/kemitix/thorp/uishell/UploadEventListener.java b/uishell/src/main/java/net/kemitix/thorp/uishell/UploadEventListener.java index 7b135b0..fd3b0f7 100644 --- a/uishell/src/main/java/net/kemitix/thorp/uishell/UploadEventListener.java +++ b/uishell/src/main/java/net/kemitix/thorp/uishell/UploadEventListener.java @@ -2,8 +2,8 @@ package net.kemitix.thorp.uishell; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; -import net.kemitix.thorp.domain.Channel; import net.kemitix.thorp.domain.LocalFile; +import net.kemitix.thorp.domain.channel.Sink; import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent; import java.util.concurrent.atomic.AtomicLong; @@ -29,7 +29,7 @@ public class UploadEventListener { } public static Settings settings( - Channel.Sink uiSink, + Sink uiSink, LocalFile localFile, int index, long totalBytesToFar, @@ -39,7 +39,7 @@ public class UploadEventListener { } @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public static class Settings { - public final Channel.Sink uiSink; + public final Sink uiSink; public final LocalFile localFile; public final int index; public final long totalBytesSoFar;