Convert lib module to Java (#487)
* lib.FileScannerTest: convert to Java * lib.Filters: convert to Java * lib.FiltersTest: convert to Java * lib.Archive: convert to Java * lib.LocalFileSystem: convert to Java * domain.Channel: better channel termination * domain.Channel: improve assurance that threads die * Ensure uisink and storage shutdown when exiting program * domain.HashesTest: clean up * domain.Channel.run() already shuts down channel when runner finishes * lib.FileScannerTest: add listener before run * domain.Channel: add lock around take and only interrupt if waiting * .run: add intellij test run configurations * app. update in-code version * app: chain ui channel construction * domain.Channel: add tracing option and simplify thread naming * storage-aws: use default multipartUploadThreshold * app: force exit once program is completed * domain.channel: split up and move Channel to its own package * lib.LocalFileSystemTest: convert to Java * lib.SequencedAction: convert to Java * domain.LocalFile: don’t box file length * lib.FileScannerTest: include hashes * lib.UnversionedMirrorArchive: convert to Java * lib: remove scala dependencies
This commit is contained in:
parent
f30021873e
commit
783402f745
39 changed files with 1402 additions and 1281 deletions
16
.run/Main.run.xml
Normal file
16
.run/Main.run.xml
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
<component name="ProjectRunConfigurationManager">
|
||||||
|
<configuration default="false" name="Main" type="Application" factoryName="Application">
|
||||||
|
<option name="MAIN_CLASS_NAME" value="net.kemitix.thorp.Main" />
|
||||||
|
<module name="thorp" />
|
||||||
|
<option name="PROGRAM_PARAMETERS" value="--source $PROJECT_DIR$/../" />
|
||||||
|
<extension name="coverage">
|
||||||
|
<pattern>
|
||||||
|
<option name="PATTERN" value="net.kemitix.thorp.*" />
|
||||||
|
<option name="ENABLED" value="true" />
|
||||||
|
</pattern>
|
||||||
|
</extension>
|
||||||
|
<method v="2">
|
||||||
|
<option name="Make" enabled="true" />
|
||||||
|
</method>
|
||||||
|
</configuration>
|
||||||
|
</component>
|
23
.run/Test whole project.run.xml
Normal file
23
.run/Test whole project.run.xml
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
<component name="ProjectRunConfigurationManager">
|
||||||
|
<configuration default="false" name="Test whole project" type="JUnit" factoryName="JUnit" repeat_count="3">
|
||||||
|
<useClassPathOnly />
|
||||||
|
<extension name="coverage">
|
||||||
|
<pattern>
|
||||||
|
<option name="PATTERN" value="net.kemitix.thorp.*" />
|
||||||
|
<option name="ENABLED" value="true" />
|
||||||
|
</pattern>
|
||||||
|
</extension>
|
||||||
|
<option name="PACKAGE_NAME" value="net.kemitix.thorp" />
|
||||||
|
<option name="MAIN_CLASS_NAME" value="" />
|
||||||
|
<option name="METHOD_NAME" value="" />
|
||||||
|
<option name="TEST_OBJECT" value="package" />
|
||||||
|
<option name="PARAMETERS" value="" />
|
||||||
|
<option name="TEST_SEARCH_SCOPE">
|
||||||
|
<value defaultName="wholeProject" />
|
||||||
|
</option>
|
||||||
|
<method v="2">
|
||||||
|
<option name="Make" enabled="true" />
|
||||||
|
<option name="BSP.BeforeRunTask" enabled="true" />
|
||||||
|
</method>
|
||||||
|
</configuration>
|
||||||
|
</component>
|
|
@ -2,7 +2,9 @@ package net.kemitix.thorp
|
||||||
|
|
||||||
object Main {
|
object Main {
|
||||||
|
|
||||||
def main(args: Array[String]): Unit =
|
def main(args: Array[String]): Unit = {
|
||||||
Program.run(args.toList)
|
Program.run(args.toList)
|
||||||
|
System.exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,8 @@ import net.kemitix.thorp.domain.StorageEvent.{
|
||||||
ErrorEvent,
|
ErrorEvent,
|
||||||
UploadEvent
|
UploadEvent
|
||||||
}
|
}
|
||||||
import net.kemitix.thorp.domain.{Channel, Counters, StorageEvent}
|
import net.kemitix.thorp.domain.channel.{Channel, Sink}
|
||||||
|
import net.kemitix.thorp.domain.{Counters, StorageEvent}
|
||||||
import net.kemitix.thorp.lib.{LocalFileSystem, UnversionedMirrorArchive}
|
import net.kemitix.thorp.lib.{LocalFileSystem, UnversionedMirrorArchive}
|
||||||
import net.kemitix.thorp.storage.Storage
|
import net.kemitix.thorp.storage.Storage
|
||||||
import net.kemitix.thorp.uishell.{UIEvent, UIShell}
|
import net.kemitix.thorp.uishell.{UIEvent, UIShell}
|
||||||
|
@ -19,7 +20,7 @@ import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
trait Program {
|
trait Program {
|
||||||
|
|
||||||
val version = "0.11.0"
|
val version = "1.1.0-SNAPSHOT"
|
||||||
lazy val versionLabel = s"${WHITE}Thorp v$version$RESET"
|
lazy val versionLabel = s"${WHITE}Thorp v$version$RESET"
|
||||||
|
|
||||||
def run(args: List[String]): Unit = {
|
def run(args: List[String]): Unit = {
|
||||||
|
@ -35,33 +36,39 @@ trait Program {
|
||||||
cli => ConfigQuery.showVersion(cli)
|
cli => ConfigQuery.showVersion(cli)
|
||||||
|
|
||||||
private def executeWithUI(configuration: Configuration): Unit = {
|
private def executeWithUI(configuration: Configuration): Unit = {
|
||||||
val uiChannel: Channel[UIEvent] = Channel.create("thorp-ui")
|
Channel
|
||||||
uiChannel.addListener(UIShell.listener(configuration))
|
.create("ui")
|
||||||
uiChannel.run(sink => execute(configuration, sink), "thorp-main")
|
.addListener(UIShell.listener(configuration))
|
||||||
uiChannel.start()
|
.run(execute(configuration)(_))
|
||||||
uiChannel.waitForShutdown()
|
.start()
|
||||||
|
.waitForShutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
private def execute(configuration: Configuration,
|
private def execute(
|
||||||
uiSink: Channel.Sink[UIEvent]) = {
|
configuration: Configuration
|
||||||
|
)(uiSink: Sink[UIEvent]): Unit = {
|
||||||
|
try {
|
||||||
showValidConfig(uiSink)
|
showValidConfig(uiSink)
|
||||||
val remoteObjects =
|
val remoteObjects =
|
||||||
fetchRemoteData(configuration, uiSink)
|
fetchRemoteData(configuration, uiSink)
|
||||||
val archive = UnversionedMirrorArchive
|
val archive = UnversionedMirrorArchive.create
|
||||||
val storageEvents = LocalFileSystem
|
val storageEvents = LocalFileSystem
|
||||||
.scanCopyUpload(configuration, uiSink, remoteObjects, archive)
|
.scanCopyUpload(configuration, uiSink, remoteObjects, archive)
|
||||||
val deleteEvents = LocalFileSystem
|
val deleteEvents = LocalFileSystem
|
||||||
.scanDelete(configuration, uiSink, remoteObjects, archive)
|
.scanDelete(configuration, uiSink, remoteObjects, archive)
|
||||||
Storage.getInstance().shutdown();
|
showSummary(uiSink)(
|
||||||
showSummary(uiSink)(storageEvents ++ deleteEvents)
|
(storageEvents.asScala ++ deleteEvents.asScala).toList
|
||||||
uiSink.shutdown();
|
)
|
||||||
|
} finally {
|
||||||
|
Storage.getInstance().shutdown()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def showValidConfig(uiSink: Channel.Sink[UIEvent]): Unit =
|
private def showValidConfig(uiSink: Sink[UIEvent]): Unit =
|
||||||
uiSink.accept(UIEvent.showValidConfig)
|
uiSink.accept(UIEvent.showValidConfig)
|
||||||
|
|
||||||
private def fetchRemoteData(configuration: Configuration,
|
private def fetchRemoteData(configuration: Configuration,
|
||||||
uiSink: Channel.Sink[UIEvent]) = {
|
uiSink: Sink[UIEvent]) = {
|
||||||
val bucket = configuration.bucket
|
val bucket = configuration.bucket
|
||||||
val prefix = configuration.prefix
|
val prefix = configuration.prefix
|
||||||
val objects = Storage.getInstance().list(bucket, prefix)
|
val objects = Storage.getInstance().list(bucket, prefix)
|
||||||
|
@ -78,7 +85,7 @@ trait Program {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def showSummary(
|
private def showSummary(
|
||||||
uiSink: Channel.Sink[UIEvent]
|
uiSink: Sink[UIEvent]
|
||||||
)(events: Seq[StorageEvent]): Unit = {
|
)(events: Seq[StorageEvent]): Unit = {
|
||||||
val counters = events.foldLeft(Counters.empty)(countActivities)
|
val counters = events.foldLeft(Counters.empty)(countActivities)
|
||||||
uiSink.accept(UIEvent.showSummary(counters))
|
uiSink.accept(UIEvent.showSummary(counters))
|
||||||
|
|
|
@ -21,7 +21,7 @@ public class Configuration {
|
||||||
public final boolean batchMode;
|
public final boolean batchMode;
|
||||||
public final int parallel;
|
public final int parallel;
|
||||||
public final Sources sources;
|
public final Sources sources;
|
||||||
static Configuration create() {
|
public static Configuration create() {
|
||||||
return new Configuration(
|
return new Configuration(
|
||||||
Bucket.named(""),
|
Bucket.named(""),
|
||||||
RemoteKey.create(""),
|
RemoteKey.create(""),
|
||||||
|
|
Binary file not shown.
Before Width: | Height: | Size: 193 KiB After Width: | Height: | Size: 177 KiB |
|
@ -1,188 +0,0 @@
|
||||||
package net.kemitix.thorp.domain;
|
|
||||||
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.LinkedTransferQueue;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
public interface Channel<T> {
|
|
||||||
static <T> Channel<T> create(String name) {
|
|
||||||
return new ChannelImpl<T>(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
void start();
|
|
||||||
Channel<T> add(T item);
|
|
||||||
Channel<T> addAll(Collection<T> items);
|
|
||||||
Channel<T> addListener(Listener<T> listener);
|
|
||||||
Channel<T> removeListener(Listener<T> listener);
|
|
||||||
Channel<T> run(Consumer<Sink<T>> program, String name);
|
|
||||||
void shutdown();
|
|
||||||
void shutdownNow() throws InterruptedException;
|
|
||||||
void waitForShutdown() throws InterruptedException;
|
|
||||||
|
|
||||||
class ChannelImpl<T> implements Channel<T> {
|
|
||||||
private final BlockingQueue<T> queue = new LinkedTransferQueue<>();
|
|
||||||
private final Runner<T> runner;
|
|
||||||
private final Thread thread;
|
|
||||||
|
|
||||||
public ChannelImpl(String name) {
|
|
||||||
runner = new Runner<T>(name, queue);
|
|
||||||
thread = new Thread(runner, name);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() {
|
|
||||||
thread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Channel<T> add(T item) {
|
|
||||||
queue.add(item);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Channel<T> addAll(Collection<T> items) {
|
|
||||||
queue.addAll(items);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Channel<T> addListener(Listener<T> listener) {
|
|
||||||
runner.addListener(listener);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Channel<T> removeListener(Listener<T> listener) {
|
|
||||||
runner.removeListener(listener);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Channel<T> run(Consumer<Sink<T>> program, String name) {
|
|
||||||
return spawn(() -> program.accept(runner), name);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Channel<T> spawn(Runnable runnable, String name) {
|
|
||||||
Thread thread = new Thread(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
runnable.run();
|
|
||||||
} catch (Exception e) {
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, name);
|
|
||||||
thread.start();
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void shutdown() {
|
|
||||||
runner.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void shutdownNow() throws InterruptedException {
|
|
||||||
runner.shutdownNow();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void waitForShutdown() throws InterruptedException {
|
|
||||||
runner.waitForShutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
class Runner<T> implements Runnable, Sink<T> {
|
|
||||||
private final String name;
|
|
||||||
private final BlockingQueue<T> queue;
|
|
||||||
private final AtomicBoolean shutdown = new AtomicBoolean(false);
|
|
||||||
private final List<Listener<T>> listeners = new ArrayList<>();
|
|
||||||
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
|
|
||||||
private Thread runnerThread;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
runnerThread = Thread.currentThread();
|
|
||||||
while(isRunning()) {
|
|
||||||
takeItem()
|
|
||||||
.ifPresent(item -> {
|
|
||||||
listeners.forEach(listener -> {
|
|
||||||
listener.accept(item);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
shutdownLatch.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addListener(Listener<T> listener) {
|
|
||||||
listeners.add(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void removeListener(Listener<T> listener) {
|
|
||||||
listeners.remove(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Optional<T> takeItem() {
|
|
||||||
try {
|
|
||||||
return Optional.of(queue.take());
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
shutdown();
|
|
||||||
}
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isRunning() {
|
|
||||||
return !isShutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isShutdown() {
|
|
||||||
return shutdown.get() && queue.isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void accept(T item) {
|
|
||||||
queue.add(item);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void shutdown() {
|
|
||||||
if (isRunning()) {
|
|
||||||
shutdown.set(true);
|
|
||||||
}
|
|
||||||
if (queue.isEmpty() && runnerThread != null) {
|
|
||||||
runnerThread.interrupt();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void shutdownNow() throws InterruptedException {
|
|
||||||
shutdown();
|
|
||||||
waitForShutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void waitForShutdown() throws InterruptedException {
|
|
||||||
if (isRunning())
|
|
||||||
shutdownLatch.await();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
interface Listener<T> {
|
|
||||||
void accept(T item);
|
|
||||||
}
|
|
||||||
|
|
||||||
interface Sink<T> {
|
|
||||||
void accept(T item);
|
|
||||||
void shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,7 +1,5 @@
|
||||||
package net.kemitix.thorp.domain;
|
package net.kemitix.thorp.domain;
|
||||||
|
|
||||||
import lombok.AccessLevel;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import net.kemitix.mon.TypeAlias;
|
import net.kemitix.mon.TypeAlias;
|
||||||
|
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
|
@ -21,6 +21,10 @@ public interface HashGenerator {
|
||||||
for(HashGenerator hashGenerator: hashGenerators) {
|
for(HashGenerator hashGenerator: hashGenerators) {
|
||||||
list.add(hashGenerator);
|
list.add(hashGenerator);
|
||||||
}
|
}
|
||||||
|
if (list.isEmpty()) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"No HashGenerator implementations available");
|
||||||
|
}
|
||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
static HashGenerator generatorFor(String label) {
|
static HashGenerator generatorFor(String label) {
|
||||||
|
|
|
@ -1,24 +1,26 @@
|
||||||
package net.kemitix.thorp.domain;
|
package net.kemitix.thorp.domain;
|
||||||
|
|
||||||
import lombok.AccessLevel;
|
import lombok.AccessLevel;
|
||||||
|
import lombok.EqualsAndHashCode;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
|
@EqualsAndHashCode
|
||||||
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
|
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
|
||||||
public class LocalFile {
|
public class LocalFile {
|
||||||
public final File file;
|
public final File file;
|
||||||
public final File source;
|
public final File source;
|
||||||
public final Hashes hashes;
|
public final Hashes hashes;
|
||||||
public final RemoteKey remoteKey;
|
public final RemoteKey remoteKey;
|
||||||
public final Long length;
|
public final long length;
|
||||||
public static LocalFile create(
|
public static LocalFile create(
|
||||||
File file,
|
File file,
|
||||||
File source,
|
File source,
|
||||||
Hashes hashes,
|
Hashes hashes,
|
||||||
RemoteKey remoteKey,
|
RemoteKey remoteKey,
|
||||||
Long length
|
long length
|
||||||
) {
|
) {
|
||||||
return new LocalFile(file, source, hashes, remoteKey, length);
|
return new LocalFile(file, source, hashes, remoteKey, length);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
package net.kemitix.thorp.domain.channel;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
public interface Channel<T> extends Sink<T> {
|
||||||
|
|
||||||
|
static <T> Channel<T> create(String name) {
|
||||||
|
return new ChannelImpl<>(name, false);
|
||||||
|
}
|
||||||
|
static <T> Channel<T> createWithTracing(String name) {
|
||||||
|
return new ChannelImpl<>(name, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
Channel<T> start();
|
||||||
|
Channel<T> add(T item);
|
||||||
|
Channel<T> addAll(Collection<T> items);
|
||||||
|
Channel<T> addListener(Listener<T> listener);
|
||||||
|
Channel<T> removeListener(Listener<T> listener);
|
||||||
|
Channel<T> run(Consumer<Sink<T>> program);
|
||||||
|
|
||||||
|
void shutdown();
|
||||||
|
void shutdownNow() throws InterruptedException;
|
||||||
|
void waitForShutdown() throws InterruptedException;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,97 @@
|
||||||
|
package net.kemitix.thorp.domain.channel;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.LinkedTransferQueue;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
class ChannelImpl<T> implements Channel<T> {
|
||||||
|
private final boolean tracing;
|
||||||
|
private final BlockingQueue<T> queue = new LinkedTransferQueue<>();
|
||||||
|
private final Runner<T> runner;
|
||||||
|
private final Thread thread;
|
||||||
|
private final String name;
|
||||||
|
|
||||||
|
public ChannelImpl(String name, boolean tracing) {
|
||||||
|
this.name = name;
|
||||||
|
this.tracing = tracing;
|
||||||
|
runner = new Runner<>(queue, tracing);
|
||||||
|
thread = new Thread(runner, String.format("---->-lnr-%s", name));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Channel<T> start() {
|
||||||
|
trace("launching");
|
||||||
|
thread.start();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void trace(String message) {
|
||||||
|
if (tracing)
|
||||||
|
System.out.printf("[channel:%s] %s%n", Thread.currentThread().getName(), message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(T item) {
|
||||||
|
queue.add(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Channel<T> add(T item) {
|
||||||
|
queue.add(item);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Channel<T> addAll(Collection<T> items) {
|
||||||
|
queue.addAll(items);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Channel<T> addListener(Listener<T> listener) {
|
||||||
|
runner.addListener(listener);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Channel<T> removeListener(Listener<T> listener) {
|
||||||
|
runner.removeListener(listener);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Channel<T> run(Consumer<Sink<T>> program) {
|
||||||
|
return spawn(() -> program.accept(runner));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Channel<T> spawn(Runnable runnable) {
|
||||||
|
Thread thread = new Thread(() -> {
|
||||||
|
trace("starting");
|
||||||
|
try {
|
||||||
|
runnable.run();
|
||||||
|
trace("finishing normally");
|
||||||
|
} finally {
|
||||||
|
shutdown();
|
||||||
|
trace("stopping");
|
||||||
|
}
|
||||||
|
}, String.format("channel-src->-----%s", name));
|
||||||
|
thread.start();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
runner.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdownNow() throws InterruptedException {
|
||||||
|
runner.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void waitForShutdown() throws InterruptedException {
|
||||||
|
runner.waitForShutdown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
package net.kemitix.thorp.domain.channel;
|
||||||
|
|
||||||
|
public interface Listener<T> {
|
||||||
|
void accept(T item);
|
||||||
|
}
|
|
@ -0,0 +1,107 @@
|
||||||
|
package net.kemitix.thorp.domain.channel;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
class Runner<T> implements Runnable, Sink<T> {
|
||||||
|
|
||||||
|
private final BlockingQueue<T> queue;
|
||||||
|
private final boolean tracing;
|
||||||
|
|
||||||
|
private final AtomicBoolean shutdown = new AtomicBoolean(false);
|
||||||
|
private final AtomicBoolean isWaiting = new AtomicBoolean(false);
|
||||||
|
private final List<Listener<T>> listeners = new ArrayList<>();
|
||||||
|
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
|
||||||
|
private final Object takeLock = new Object();
|
||||||
|
|
||||||
|
private Thread runnerThread;
|
||||||
|
|
||||||
|
public void trace(String message) {
|
||||||
|
if (tracing)
|
||||||
|
System.out.printf("[runner :%s] %s%n", Thread.currentThread().getName(), message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
runnerThread = Thread.currentThread();
|
||||||
|
trace("starting");
|
||||||
|
try {
|
||||||
|
while (isRunning()) {
|
||||||
|
takeItem()
|
||||||
|
.ifPresent(item -> {
|
||||||
|
listeners.forEach(listener ->
|
||||||
|
listener.accept(item));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
trace("finishing");
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// would have been waiting to take from an empty queue when it was shutdown
|
||||||
|
trace(String.format("interrupted (%d items in queue)", queue.size()));
|
||||||
|
} finally {
|
||||||
|
trace("complete");
|
||||||
|
shutdownLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addListener(Listener<T> listener) {
|
||||||
|
listeners.add(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeListener(Listener<T> listener) {
|
||||||
|
listeners.remove(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<T> takeItem() throws InterruptedException {
|
||||||
|
synchronized (takeLock) {
|
||||||
|
isWaiting.set(true);
|
||||||
|
T take = queue.take();
|
||||||
|
isWaiting.set(false);
|
||||||
|
return Optional.of(take);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isRunning() {
|
||||||
|
return !isShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isShutdown() {
|
||||||
|
return shutdown.get() && queue.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(T item) {
|
||||||
|
queue.add(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
String threadName = Thread.currentThread().getName();
|
||||||
|
if (isRunning()) {
|
||||||
|
trace("running - marking as shutdown");
|
||||||
|
shutdown.set(true);
|
||||||
|
}
|
||||||
|
if (queue.isEmpty() && isWaiting.get() && runnerThread != null) {
|
||||||
|
trace("interrupting waiting runner");
|
||||||
|
runnerThread.interrupt();
|
||||||
|
} else {
|
||||||
|
trace(String.format("NOT interrupting runner (%d items, waiting: %s)", queue.size(), isWaiting.get()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdownNow() throws InterruptedException {
|
||||||
|
shutdown();
|
||||||
|
waitForShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitForShutdown() throws InterruptedException {
|
||||||
|
if (isRunning())
|
||||||
|
shutdownLatch.await();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,5 @@
|
||||||
|
package net.kemitix.thorp.domain.channel;
|
||||||
|
|
||||||
|
public interface Sink<T> extends Listener<T> {
|
||||||
|
void shutdown();
|
||||||
|
}
|
|
@ -10,11 +10,8 @@ import java.util.Arrays;
|
||||||
public class HashesTest
|
public class HashesTest
|
||||||
implements WithAssertions {
|
implements WithAssertions {
|
||||||
|
|
||||||
@Nested
|
|
||||||
@DisplayName("mergeAll")
|
|
||||||
public class MergeAll {
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("")
|
@DisplayName("mergeAll()")
|
||||||
public void mergeAll() {
|
public void mergeAll() {
|
||||||
//given
|
//given
|
||||||
HashType key1 = HashType.MD5;
|
HashType key1 = HashType.MD5;
|
||||||
|
@ -29,5 +26,5 @@ public class HashesTest
|
||||||
assertThat(result.keys()).containsExactlyInAnyOrder(key1, key2);
|
assertThat(result.keys()).containsExactlyInAnyOrder(key1, key2);
|
||||||
assertThat(result.values()).containsExactlyInAnyOrder(value1, value2);
|
assertThat(result.values()).containsExactlyInAnyOrder(value1, value2);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
23
lib/pom.xml
23
lib/pom.xml
|
@ -41,27 +41,16 @@
|
||||||
<artifactId>thorp-storage</artifactId>
|
<artifactId>thorp-storage</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- scala -->
|
<!-- testing -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.scala-lang</groupId>
|
<groupId>org.junit.jupiter</groupId>
|
||||||
<artifactId>scala-library</artifactId>
|
<artifactId>junit-jupiter</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- scala - testing -->
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.scalatest</groupId>
|
<groupId>org.assertj</groupId>
|
||||||
<artifactId>scalatest_2.13</artifactId>
|
<artifactId>assertj-core</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
|
||||||
<plugins>
|
|
||||||
<plugin>
|
|
||||||
<groupId>net.alchim31.maven</groupId>
|
|
||||||
<artifactId>scala-maven-plugin</artifactId>
|
|
||||||
</plugin>
|
|
||||||
</plugins>
|
|
||||||
</build>
|
|
||||||
|
|
||||||
</project>
|
</project>
|
49
lib/src/main/java/net/kemitix/thorp/lib/Archive.java
Normal file
49
lib/src/main/java/net/kemitix/thorp/lib/Archive.java
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
package net.kemitix.thorp.lib;
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Configuration;
|
||||||
|
import net.kemitix.thorp.console.Console;
|
||||||
|
import net.kemitix.thorp.console.ConsoleOut;
|
||||||
|
import net.kemitix.thorp.domain.RemoteKey;
|
||||||
|
import net.kemitix.thorp.domain.StorageEvent;
|
||||||
|
import net.kemitix.thorp.domain.channel.Sink;
|
||||||
|
import net.kemitix.thorp.uishell.UIEvent;
|
||||||
|
|
||||||
|
public interface Archive {
|
||||||
|
|
||||||
|
StorageEvent update(
|
||||||
|
Configuration configuration,
|
||||||
|
Sink<UIEvent> uiSink,
|
||||||
|
SequencedAction sequencedAction,
|
||||||
|
long totalBytesSoFar);
|
||||||
|
|
||||||
|
default StorageEvent logEvent(
|
||||||
|
Configuration configuration,
|
||||||
|
StorageEvent event
|
||||||
|
) {
|
||||||
|
boolean batchMode = configuration.batchMode;
|
||||||
|
|
||||||
|
if (event instanceof StorageEvent.UploadEvent) {
|
||||||
|
RemoteKey remoteKey = ((StorageEvent.UploadEvent) event).remoteKey;
|
||||||
|
Console.putMessageLnB(
|
||||||
|
ConsoleOut.uploadComplete(remoteKey), batchMode);
|
||||||
|
} else if (event instanceof StorageEvent.CopyEvent) {
|
||||||
|
StorageEvent.CopyEvent copyEvent = (StorageEvent.CopyEvent) event;
|
||||||
|
RemoteKey sourceKey = copyEvent.sourceKey;
|
||||||
|
RemoteKey targetKey = copyEvent.targetKey;
|
||||||
|
Console.putMessageLnB(
|
||||||
|
ConsoleOut.copyComplete(sourceKey, targetKey), batchMode);
|
||||||
|
} else if (event instanceof StorageEvent.DeleteEvent) {
|
||||||
|
RemoteKey remoteKey = ((StorageEvent.DeleteEvent) event).remoteKey;
|
||||||
|
Console.putMessageLnB(
|
||||||
|
ConsoleOut.deleteComplete(remoteKey), batchMode);
|
||||||
|
} else if (event instanceof StorageEvent.ErrorEvent) {
|
||||||
|
StorageEvent.ErrorEvent errorEvent = (StorageEvent.ErrorEvent) event;
|
||||||
|
StorageEvent.ActionSummary action = errorEvent.action;
|
||||||
|
Throwable e = errorEvent.e;
|
||||||
|
Console.putMessageLnB(
|
||||||
|
ConsoleOut.errorQueueEventOccurred(action, e), batchMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
return event;
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,6 +2,7 @@ package net.kemitix.thorp.lib;
|
||||||
|
|
||||||
import net.kemitix.thorp.config.Configuration;
|
import net.kemitix.thorp.config.Configuration;
|
||||||
import net.kemitix.thorp.domain.*;
|
import net.kemitix.thorp.domain.*;
|
||||||
|
import net.kemitix.thorp.domain.channel.Sink;
|
||||||
import net.kemitix.thorp.filesystem.FileSystem;
|
import net.kemitix.thorp.filesystem.FileSystem;
|
||||||
import net.kemitix.thorp.filesystem.PathCache;
|
import net.kemitix.thorp.filesystem.PathCache;
|
||||||
|
|
||||||
|
@ -14,17 +15,16 @@ import java.util.List;
|
||||||
public interface FileScanner {
|
public interface FileScanner {
|
||||||
static void scanSources(
|
static void scanSources(
|
||||||
Configuration configuration,
|
Configuration configuration,
|
||||||
Channel.Sink<LocalFile> fileSink
|
Sink<LocalFile> fileSink
|
||||||
) {
|
) {
|
||||||
configuration.sources.paths()
|
configuration.sources.paths()
|
||||||
.forEach(path ->
|
.forEach(path ->
|
||||||
scanSource(configuration, fileSink, path));
|
scanSource(configuration, fileSink, path));
|
||||||
fileSink.shutdown();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void scanSource(
|
static void scanSource(
|
||||||
Configuration configuration,
|
Configuration configuration,
|
||||||
Channel.Sink<LocalFile> fileSink,
|
Sink<LocalFile> fileSink,
|
||||||
Path sourcePath
|
Path sourcePath
|
||||||
) {
|
) {
|
||||||
scanPath(configuration, fileSink, sourcePath);
|
scanPath(configuration, fileSink, sourcePath);
|
||||||
|
@ -32,7 +32,7 @@ public interface FileScanner {
|
||||||
|
|
||||||
static void scanPath(
|
static void scanPath(
|
||||||
Configuration configuration,
|
Configuration configuration,
|
||||||
Channel.Sink<LocalFile> fileSink,
|
Sink<LocalFile> fileSink,
|
||||||
Path path
|
Path path
|
||||||
) {
|
) {
|
||||||
// dirs
|
// dirs
|
||||||
|
@ -45,7 +45,7 @@ public interface FileScanner {
|
||||||
|
|
||||||
static void handleFile(
|
static void handleFile(
|
||||||
Configuration configuration,
|
Configuration configuration,
|
||||||
Channel.Sink<LocalFile> fileSink,
|
Sink<LocalFile> fileSink,
|
||||||
File file
|
File file
|
||||||
) {
|
) {
|
||||||
boolean isIncluded = Filters.isIncluded(configuration, file);
|
boolean isIncluded = Filters.isIncluded(configuration, file);
|
||||||
|
|
56
lib/src/main/java/net/kemitix/thorp/lib/Filters.java
Normal file
56
lib/src/main/java/net/kemitix/thorp/lib/Filters.java
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
package net.kemitix.thorp.lib;
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Configuration;
|
||||||
|
import net.kemitix.thorp.domain.Filter;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
public interface Filters {
|
||||||
|
|
||||||
|
static boolean isIncluded(
|
||||||
|
Configuration configuration,
|
||||||
|
File file
|
||||||
|
) {
|
||||||
|
return isIncluded(file.toPath(), configuration.filters);
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean isIncluded(
|
||||||
|
Path path,
|
||||||
|
List<Filter> filters
|
||||||
|
) {
|
||||||
|
AtomicBoolean included = new AtomicBoolean(isIncludedByDefault(filters));
|
||||||
|
filters.forEach(
|
||||||
|
filter -> {
|
||||||
|
if (included.get()) {
|
||||||
|
if (filter instanceof Filter.Exclude) {
|
||||||
|
boolean excludedByFilter = isExcludedByFilter(path, filter);
|
||||||
|
included.set(!excludedByFilter);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (filter instanceof Filter.Include) {
|
||||||
|
boolean includedByFilter = isIncludedByFilter(path, filter);
|
||||||
|
included.set(includedByFilter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
return included.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean isIncludedByDefault(List<Filter> filters) {
|
||||||
|
return filters.isEmpty() ||
|
||||||
|
filters.stream()
|
||||||
|
.allMatch(filter ->
|
||||||
|
filter instanceof Filter.Exclude);
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean isIncludedByFilter(Path path, Filter filter) {
|
||||||
|
return filter.predicate().test(path.toFile().getPath());
|
||||||
|
}
|
||||||
|
static boolean isExcludedByFilter(Path path, Filter filter) {
|
||||||
|
return filter.predicate().test(path.toFile().getPath());
|
||||||
|
}
|
||||||
|
}
|
183
lib/src/main/java/net/kemitix/thorp/lib/LocalFileSystem.java
Normal file
183
lib/src/main/java/net/kemitix/thorp/lib/LocalFileSystem.java
Normal file
|
@ -0,0 +1,183 @@
|
||||||
|
package net.kemitix.thorp.lib;
|
||||||
|
|
||||||
|
import lombok.val;
|
||||||
|
import net.kemitix.thorp.config.Configuration;
|
||||||
|
import net.kemitix.thorp.domain.*;
|
||||||
|
import net.kemitix.thorp.domain.channel.Channel;
|
||||||
|
import net.kemitix.thorp.domain.channel.Listener;
|
||||||
|
import net.kemitix.thorp.domain.channel.Sink;
|
||||||
|
import net.kemitix.thorp.filesystem.FileSystem;
|
||||||
|
import net.kemitix.thorp.uishell.UIEvent;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
public interface LocalFileSystem {
|
||||||
|
static List<StorageEvent> scanCopyUpload(
|
||||||
|
Configuration configuration,
|
||||||
|
Sink<UIEvent> uiSink,
|
||||||
|
RemoteObjects remoteObjects,
|
||||||
|
Archive archive
|
||||||
|
) throws InterruptedException {
|
||||||
|
AtomicInteger actionCounter = new AtomicInteger();
|
||||||
|
AtomicLong bytesCounter = new AtomicLong();
|
||||||
|
Map<MD5Hash, RemoteKey> uploads = new HashMap<>();
|
||||||
|
Deque<StorageEvent> events = new LinkedList<>();
|
||||||
|
|
||||||
|
Channel.<LocalFile>create("file-scanner")
|
||||||
|
.addListener(
|
||||||
|
listener(configuration, uiSink, remoteObjects, archive,
|
||||||
|
uploads, actionCounter, bytesCounter, events))
|
||||||
|
.run(sink -> FileScanner.scanSources(configuration, sink))
|
||||||
|
.start()
|
||||||
|
.waitForShutdown();
|
||||||
|
|
||||||
|
return new ArrayList<>(events);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Listener<LocalFile> listener(
|
||||||
|
Configuration configuration,
|
||||||
|
Sink<UIEvent> uiSink,
|
||||||
|
RemoteObjects remoteObjects,
|
||||||
|
Archive archive,
|
||||||
|
Map<MD5Hash, RemoteKey> uploads,
|
||||||
|
AtomicInteger actionCounter,
|
||||||
|
AtomicLong bytesCounter,
|
||||||
|
Deque<StorageEvent> events
|
||||||
|
) {
|
||||||
|
val chooseAction = chooseAction(configuration, remoteObjects, uploads, uiSink);
|
||||||
|
val uiActionChosen = uiActionChosen(uiSink);
|
||||||
|
val uiActionFinished = uiActionFinished(uiSink, actionCounter, bytesCounter);
|
||||||
|
return localFile -> {
|
||||||
|
uiSink.accept(UIEvent.fileFound(localFile));
|
||||||
|
Action action = chooseAction.apply(localFile);
|
||||||
|
actionCounter.incrementAndGet();
|
||||||
|
bytesCounter.addAndGet(action.size);
|
||||||
|
uiActionChosen.accept(action);
|
||||||
|
SequencedAction sequencedAction =
|
||||||
|
SequencedAction.create(action, actionCounter.get());
|
||||||
|
StorageEvent event = archive.update(
|
||||||
|
configuration, uiSink, sequencedAction, bytesCounter.get());
|
||||||
|
events.addFirst(event);
|
||||||
|
uiActionFinished.accept(action, event);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
static BiConsumer<Action, StorageEvent> uiActionFinished(
|
||||||
|
Sink<UIEvent> uiSink,
|
||||||
|
AtomicInteger actionCounter,
|
||||||
|
AtomicLong bytesCounter
|
||||||
|
) {
|
||||||
|
return (action, event) -> uiSink.accept(
|
||||||
|
UIEvent.actionFinished(action,
|
||||||
|
actionCounter.get(), bytesCounter.get(), event));
|
||||||
|
}
|
||||||
|
|
||||||
|
static Consumer<Action> uiActionChosen(Sink<UIEvent> uiSink) {
|
||||||
|
return action -> uiSink.accept(UIEvent.actionChosen(action));
|
||||||
|
}
|
||||||
|
|
||||||
|
static Function<LocalFile, Action> chooseAction(
|
||||||
|
Configuration configuration,
|
||||||
|
RemoteObjects remoteObjects,
|
||||||
|
Map<MD5Hash, RemoteKey> uploads,
|
||||||
|
Sink<UIEvent> uiSink
|
||||||
|
) {
|
||||||
|
return localFile -> {
|
||||||
|
boolean remoteExists = remoteObjects.remoteKeyExists(localFile.remoteKey);
|
||||||
|
boolean remoteMatches = remoteObjects.remoteMatchesLocalFile(localFile);
|
||||||
|
Optional<Tuple<RemoteKey, MD5Hash>> remoteForHash =
|
||||||
|
remoteObjects.remoteHasHash(localFile.hashes);
|
||||||
|
Bucket bucket = configuration.bucket;
|
||||||
|
if (remoteExists && remoteMatches) {
|
||||||
|
return Action.doNothing(bucket, localFile.remoteKey, localFile.length);
|
||||||
|
}
|
||||||
|
if (remoteForHash.isPresent()) {
|
||||||
|
RemoteKey sourceKey = remoteForHash.get().a;
|
||||||
|
MD5Hash hash = remoteForHash.get().b;
|
||||||
|
return Action
|
||||||
|
.toCopy(bucket, sourceKey, hash,
|
||||||
|
localFile.remoteKey,
|
||||||
|
localFile.length);
|
||||||
|
} else if (localFile.hashes.values().stream()
|
||||||
|
.anyMatch(uploads::containsKey)) {
|
||||||
|
return doCopyWithPreviousUpload(localFile, bucket, uploads, uiSink);
|
||||||
|
}
|
||||||
|
return Action.toUpload(bucket, localFile, localFile.length);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
static Action doCopyWithPreviousUpload(
|
||||||
|
LocalFile localFile,
|
||||||
|
Bucket bucket,
|
||||||
|
Map<MD5Hash, RemoteKey> uploads,
|
||||||
|
Sink<UIEvent> uiSink
|
||||||
|
) {
|
||||||
|
return localFile.hashes
|
||||||
|
.values()
|
||||||
|
.stream()
|
||||||
|
.filter(uploads::containsKey)
|
||||||
|
.findFirst()
|
||||||
|
.map(hash -> {
|
||||||
|
uiSink.accept(UIEvent.awaitingAnotherUpload(localFile.remoteKey, hash));
|
||||||
|
Action action = Action.toCopy(bucket, uploads.get(hash), hash, localFile.remoteKey, localFile.length);
|
||||||
|
//FIXME - there is no waiting going on here!!
|
||||||
|
uiSink.accept(UIEvent.anotherUploadWaitComplete(action));
|
||||||
|
return action;
|
||||||
|
}).orElseGet(() ->
|
||||||
|
Action.toUpload(bucket, localFile, localFile.length));
|
||||||
|
}
|
||||||
|
|
||||||
|
static List<StorageEvent> scanDelete(
|
||||||
|
Configuration configuration,
|
||||||
|
Sink<UIEvent> uiSink,
|
||||||
|
RemoteObjects remoteData,
|
||||||
|
Archive archive
|
||||||
|
) throws InterruptedException {
|
||||||
|
AtomicInteger actionCounter = new AtomicInteger();
|
||||||
|
AtomicLong bytesCounter = new AtomicLong();
|
||||||
|
Deque<StorageEvent> events = new LinkedList<>();
|
||||||
|
Channel.<RemoteKey>create("delete-scan")
|
||||||
|
.addListener(deleteListener(
|
||||||
|
configuration, uiSink, archive,
|
||||||
|
actionCounter, bytesCounter, events))
|
||||||
|
.run(sink -> remoteData.byKey.keys().forEach(sink::accept))
|
||||||
|
.start()
|
||||||
|
.waitForShutdown();
|
||||||
|
return new ArrayList<>(events);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Listener<RemoteKey> deleteListener(
|
||||||
|
Configuration configuration,
|
||||||
|
Sink<UIEvent> uiSink,
|
||||||
|
Archive archive,
|
||||||
|
AtomicInteger actionCounter,
|
||||||
|
AtomicLong bytesCounter,
|
||||||
|
Deque<StorageEvent> events
|
||||||
|
) {
|
||||||
|
return remoteKey -> {
|
||||||
|
uiSink.accept(UIEvent.keyFound(remoteKey));
|
||||||
|
val sources = configuration.sources;
|
||||||
|
val prefix = configuration.prefix;
|
||||||
|
val exists = FileSystem.hasLocalFile(sources, prefix, remoteKey);
|
||||||
|
if (!exists) {
|
||||||
|
actionCounter.incrementAndGet();
|
||||||
|
val bucket = configuration.bucket;
|
||||||
|
val action = Action.toDelete(bucket, remoteKey, 0L);
|
||||||
|
uiActionChosen(uiSink).accept(action);
|
||||||
|
bytesCounter.addAndGet(action.size);
|
||||||
|
val sequencedAction =
|
||||||
|
SequencedAction.create(action, actionCounter.get());
|
||||||
|
val event = archive.update(configuration, uiSink,
|
||||||
|
sequencedAction, 0L);
|
||||||
|
events.addFirst(event);
|
||||||
|
uiActionFinished(uiSink, actionCounter, bytesCounter)
|
||||||
|
.accept(action, event);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
20
lib/src/main/java/net/kemitix/thorp/lib/SequencedAction.java
Normal file
20
lib/src/main/java/net/kemitix/thorp/lib/SequencedAction.java
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package net.kemitix.thorp.lib;
|
||||||
|
|
||||||
|
import net.kemitix.mon.TypeAlias;
|
||||||
|
import net.kemitix.thorp.domain.Action;
|
||||||
|
import net.kemitix.thorp.domain.Tuple;
|
||||||
|
|
||||||
|
public class SequencedAction extends TypeAlias<Tuple<Action, Integer>> {
|
||||||
|
private SequencedAction(Tuple<Action, Integer> value) {
|
||||||
|
super(value);
|
||||||
|
}
|
||||||
|
public static SequencedAction create(Action action, Integer index) {
|
||||||
|
return new SequencedAction(Tuple.create(action, index));
|
||||||
|
}
|
||||||
|
public Action action() {
|
||||||
|
return getValue().a;
|
||||||
|
}
|
||||||
|
public int index() {
|
||||||
|
return getValue().b;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
package net.kemitix.thorp.lib;
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Configuration;
|
||||||
|
import net.kemitix.thorp.domain.*;
|
||||||
|
import net.kemitix.thorp.domain.channel.Sink;
|
||||||
|
import net.kemitix.thorp.storage.Storage;
|
||||||
|
import net.kemitix.thorp.uishell.UIEvent;
|
||||||
|
import net.kemitix.thorp.uishell.UploadEventListener;
|
||||||
|
|
||||||
|
public class UnversionedMirrorArchive implements Archive {
|
||||||
|
|
||||||
|
public static Archive create() {
|
||||||
|
return new UnversionedMirrorArchive();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StorageEvent update(
|
||||||
|
Configuration configuration,
|
||||||
|
Sink<UIEvent> uiSink,
|
||||||
|
SequencedAction sequencedAction,
|
||||||
|
long totalBytesSoFar
|
||||||
|
) {
|
||||||
|
Action action = sequencedAction.action();
|
||||||
|
int index = sequencedAction.index();
|
||||||
|
Bucket bucket = action.bucket;
|
||||||
|
if (action instanceof Action.ToUpload) {
|
||||||
|
LocalFile localFile = ((Action.ToUpload) action).localFile;
|
||||||
|
return Storage.getInstance()
|
||||||
|
.upload(localFile, bucket,
|
||||||
|
UploadEventListener.settings(
|
||||||
|
uiSink, localFile, index, totalBytesSoFar,
|
||||||
|
configuration.batchMode));
|
||||||
|
} else if(action instanceof Action.ToCopy) {
|
||||||
|
Action.ToCopy toCopy = (Action.ToCopy) action;
|
||||||
|
RemoteKey sourceKey = toCopy.sourceKey;
|
||||||
|
MD5Hash hash = toCopy.hash;
|
||||||
|
RemoteKey targetKey = toCopy.targetKey;
|
||||||
|
return Storage.getInstance()
|
||||||
|
.copy(bucket, sourceKey, hash, targetKey);
|
||||||
|
} else if(action instanceof Action.ToDelete) {
|
||||||
|
RemoteKey remoteKey = action.remoteKey;
|
||||||
|
try {
|
||||||
|
return Storage.getInstance().delete(bucket, remoteKey);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
return StorageEvent.errorEvent(
|
||||||
|
StorageEvent.ActionSummary.delete(remoteKey.key()),
|
||||||
|
remoteKey, e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
RemoteKey remoteKey = action.remoteKey;
|
||||||
|
return StorageEvent.doNothingEvent(remoteKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,50 +0,0 @@
|
||||||
package net.kemitix.thorp.lib
|
|
||||||
|
|
||||||
import java.io.File
|
|
||||||
import java.nio.file.Path
|
|
||||||
|
|
||||||
import net.kemitix.thorp.config.Configuration
|
|
||||||
import net.kemitix.thorp.domain.Filter
|
|
||||||
import net.kemitix.thorp.domain.Filter.{Exclude, Include}
|
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters._
|
|
||||||
|
|
||||||
object Filters {
|
|
||||||
|
|
||||||
def isIncluded(configuration: Configuration, file: File): Boolean =
|
|
||||||
isIncluded(file.toPath)(configuration.filters.asScala.toList)
|
|
||||||
|
|
||||||
def isIncluded(p: Path)(filters: List[Filter]): Boolean = {
|
|
||||||
sealed trait State
|
|
||||||
final case class Unknown() extends State
|
|
||||||
final case class Accepted() extends State
|
|
||||||
final case class Discarded() extends State
|
|
||||||
val excluded = isExcludedByFilter(p)(_)
|
|
||||||
val included = isIncludedByFilter(p)(_)
|
|
||||||
filters.foldRight(Unknown(): State)(
|
|
||||||
(filter, state) =>
|
|
||||||
(filter, state) match {
|
|
||||||
case (_, Accepted()) => Accepted()
|
|
||||||
case (_, Discarded()) => Discarded()
|
|
||||||
case (x: Exclude, _) if excluded(x) => Discarded()
|
|
||||||
case (i: Include, _) if included(i) => Accepted()
|
|
||||||
case _ => Unknown()
|
|
||||||
}
|
|
||||||
) match {
|
|
||||||
case Accepted() => true
|
|
||||||
case Discarded() => false
|
|
||||||
case Unknown() =>
|
|
||||||
filters.forall {
|
|
||||||
case _: Include => false
|
|
||||||
case _ => true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def isIncludedByFilter(path: Path)(filter: Filter): Boolean =
|
|
||||||
filter.predicate.test(path.toFile.getPath)
|
|
||||||
|
|
||||||
def isExcludedByFilter(path: Path)(filter: Filter): Boolean =
|
|
||||||
filter.predicate.test(path.toFile.getPath)
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,262 +0,0 @@
|
||||||
package net.kemitix.thorp.lib
|
|
||||||
|
|
||||||
import java.util
|
|
||||||
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
|
|
||||||
|
|
||||||
import net.kemitix.thorp.config.Configuration
|
|
||||||
import net.kemitix.thorp.domain.Channel.{Listener, Sink}
|
|
||||||
import net.kemitix.thorp.domain.{RemoteObjects, _}
|
|
||||||
import net.kemitix.thorp.filesystem.FileSystem
|
|
||||||
import net.kemitix.thorp.uishell.UIEvent
|
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters._
|
|
||||||
import scala.jdk.OptionConverters._
|
|
||||||
|
|
||||||
trait LocalFileSystem {
|
|
||||||
|
|
||||||
def scanCopyUpload(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
remoteObjects: RemoteObjects,
|
|
||||||
archive: ThorpArchive): Seq[StorageEvent]
|
|
||||||
|
|
||||||
def scanDelete(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
remoteData: RemoteObjects,
|
|
||||||
archive: ThorpArchive): Seq[StorageEvent]
|
|
||||||
|
|
||||||
}
|
|
||||||
object LocalFileSystem extends LocalFileSystem {
|
|
||||||
|
|
||||||
override def scanCopyUpload(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
remoteObjects: RemoteObjects,
|
|
||||||
archive: ThorpArchive): Seq[StorageEvent] = {
|
|
||||||
|
|
||||||
val fileChannel: Channel[LocalFile] = Channel.create("files")
|
|
||||||
|
|
||||||
// state for the file receiver
|
|
||||||
val actionCounter = new AtomicInteger()
|
|
||||||
val bytesCounter = new AtomicLong()
|
|
||||||
val uploads = Map.empty[MD5Hash, RemoteKey]
|
|
||||||
val events = new util.LinkedList[StorageEvent]
|
|
||||||
|
|
||||||
fileChannel.addListener(
|
|
||||||
fileReceiver(
|
|
||||||
configuration,
|
|
||||||
uiSink,
|
|
||||||
remoteObjects,
|
|
||||||
archive,
|
|
||||||
uploads,
|
|
||||||
actionCounter,
|
|
||||||
bytesCounter,
|
|
||||||
events
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
fileChannel.run(
|
|
||||||
sink => FileScanner.scanSources(configuration, sink),
|
|
||||||
"scan-sources"
|
|
||||||
)
|
|
||||||
|
|
||||||
fileChannel.start()
|
|
||||||
fileChannel.waitForShutdown()
|
|
||||||
events.asScala.toList
|
|
||||||
}
|
|
||||||
|
|
||||||
override def scanDelete(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
remoteData: RemoteObjects,
|
|
||||||
archive: ThorpArchive): Seq[StorageEvent] = {
|
|
||||||
val deletionsChannel: Channel[RemoteKey] = Channel.create("deletions")
|
|
||||||
|
|
||||||
// state for the file receiver
|
|
||||||
val actionCounter = new AtomicInteger()
|
|
||||||
val bytesCounter = new AtomicLong()
|
|
||||||
val events = new util.LinkedList[StorageEvent]
|
|
||||||
|
|
||||||
deletionsChannel.addListener(
|
|
||||||
keyReceiver(
|
|
||||||
configuration,
|
|
||||||
uiSink,
|
|
||||||
archive,
|
|
||||||
actionCounter,
|
|
||||||
bytesCounter,
|
|
||||||
events
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
deletionsChannel.run(sink => {
|
|
||||||
remoteData.byKey.keys().forEach(key => sink.accept(key))
|
|
||||||
sink.shutdown()
|
|
||||||
}, "delete-source")
|
|
||||||
|
|
||||||
deletionsChannel.start()
|
|
||||||
deletionsChannel.waitForShutdown()
|
|
||||||
events.asScala.toList
|
|
||||||
}
|
|
||||||
|
|
||||||
private def fileReceiver(
|
|
||||||
configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
remoteObjects: RemoteObjects,
|
|
||||||
archive: ThorpArchive,
|
|
||||||
uploads: Map[MD5Hash, RemoteKey],
|
|
||||||
actionCounter: AtomicInteger,
|
|
||||||
bytesCounter: AtomicLong,
|
|
||||||
events: util.Deque[StorageEvent]
|
|
||||||
): Listener[LocalFile] = { (localFile: LocalFile) =>
|
|
||||||
{
|
|
||||||
uiFileFound(uiSink)(localFile)
|
|
||||||
val action =
|
|
||||||
chooseAction(configuration, remoteObjects, uploads, uiSink)(localFile)
|
|
||||||
actionCounter.incrementAndGet()
|
|
||||||
bytesCounter.addAndGet(action.size)
|
|
||||||
uiActionChosen(uiSink)(action)
|
|
||||||
val sequencedAction = SequencedAction(action, actionCounter.get())
|
|
||||||
val event = archive
|
|
||||||
.update(configuration, uiSink, sequencedAction, bytesCounter.get)
|
|
||||||
events.addFirst(event)
|
|
||||||
uiActionFinished(uiSink)(
|
|
||||||
action,
|
|
||||||
actionCounter.get,
|
|
||||||
bytesCounter.get,
|
|
||||||
event
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def uiActionChosen(
|
|
||||||
uiSink: Channel.Sink[UIEvent]
|
|
||||||
)(action: Action): Unit =
|
|
||||||
uiSink.accept(UIEvent.actionChosen(action))
|
|
||||||
|
|
||||||
private def uiActionFinished(uiSink: Channel.Sink[UIEvent])(
|
|
||||||
action: Action,
|
|
||||||
actionCounter: Int,
|
|
||||||
bytesCounter: Long,
|
|
||||||
event: StorageEvent
|
|
||||||
): Unit =
|
|
||||||
uiSink.accept(
|
|
||||||
UIEvent.actionFinished(action, actionCounter, bytesCounter, event)
|
|
||||||
)
|
|
||||||
|
|
||||||
private def uiFileFound(
|
|
||||||
uiSink: Channel.Sink[UIEvent]
|
|
||||||
)(localFile: LocalFile): Unit =
|
|
||||||
uiSink.accept(UIEvent.fileFound(localFile))
|
|
||||||
|
|
||||||
private def chooseAction(configuration: Configuration,
|
|
||||||
remoteObjects: RemoteObjects,
|
|
||||||
uploads: Map[MD5Hash, RemoteKey],
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
)(localFile: LocalFile): Action = {
|
|
||||||
val remoteExists = remoteObjects.remoteKeyExists(localFile.remoteKey)
|
|
||||||
val remoteMatches = remoteObjects.remoteMatchesLocalFile(localFile)
|
|
||||||
val remoteForHash = remoteObjects.remoteHasHash(localFile.hashes).toScala
|
|
||||||
val previous = uploads
|
|
||||||
val bucket = configuration.bucket
|
|
||||||
val action = if (remoteExists && remoteMatches) {
|
|
||||||
doNothing(localFile, bucket)
|
|
||||||
} else {
|
|
||||||
remoteForHash match {
|
|
||||||
case pair: Some[Tuple[RemoteKey, MD5Hash]] =>
|
|
||||||
val sourceKey = pair.value.a
|
|
||||||
val hash = pair.value.b
|
|
||||||
doCopy(localFile, bucket, sourceKey, hash)
|
|
||||||
case _ if matchesPreviousUpload(previous, localFile.hashes) =>
|
|
||||||
doCopyWithPreviousUpload(localFile, bucket, previous, uiSink)
|
|
||||||
case _ =>
|
|
||||||
doUpload(localFile, bucket)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
action
|
|
||||||
}
|
|
||||||
|
|
||||||
private def matchesPreviousUpload(previous: Map[MD5Hash, RemoteKey],
|
|
||||||
hashes: Hashes) =
|
|
||||||
hashes
|
|
||||||
.values()
|
|
||||||
.stream()
|
|
||||||
.anyMatch({ hash =>
|
|
||||||
previous.contains(hash)
|
|
||||||
})
|
|
||||||
|
|
||||||
private def doNothing(localFile: LocalFile, bucket: Bucket) =
|
|
||||||
Action.doNothing(bucket, localFile.remoteKey, localFile.length)
|
|
||||||
|
|
||||||
private def doCopy(localFile: LocalFile,
|
|
||||||
bucket: Bucket,
|
|
||||||
sourceKey: RemoteKey,
|
|
||||||
hash: MD5Hash) =
|
|
||||||
Action
|
|
||||||
.toCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.length)
|
|
||||||
|
|
||||||
private def doCopyWithPreviousUpload(localFile: LocalFile,
|
|
||||||
bucket: Bucket,
|
|
||||||
previous: Map[MD5Hash, RemoteKey],
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
) = {
|
|
||||||
localFile.hashes
|
|
||||||
.values()
|
|
||||||
.stream()
|
|
||||||
.filter({ hash =>
|
|
||||||
previous.contains(hash)
|
|
||||||
})
|
|
||||||
.findFirst()
|
|
||||||
.toScala
|
|
||||||
.map({ hash =>
|
|
||||||
{
|
|
||||||
uiSink
|
|
||||||
.accept(UIEvent.awaitingAnotherUpload(localFile.remoteKey, hash))
|
|
||||||
val action = Action.toCopy(
|
|
||||||
bucket,
|
|
||||||
previous(hash),
|
|
||||||
hash,
|
|
||||||
localFile.remoteKey,
|
|
||||||
localFile.length
|
|
||||||
)
|
|
||||||
uiSink.accept(UIEvent.anotherUploadWaitComplete(action))
|
|
||||||
action
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.getOrElse(doUpload(localFile, bucket))
|
|
||||||
}
|
|
||||||
|
|
||||||
private def doUpload(localFile: LocalFile, bucket: Bucket) =
|
|
||||||
Action.toUpload(bucket, localFile, localFile.length)
|
|
||||||
|
|
||||||
def keyReceiver(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
archive: ThorpArchive,
|
|
||||||
actionCounter: AtomicInteger,
|
|
||||||
bytesCounter: AtomicLong,
|
|
||||||
events: util.Deque[StorageEvent]): Listener[RemoteKey] = {
|
|
||||||
(remoteKey: RemoteKey) =>
|
|
||||||
{
|
|
||||||
uiKeyFound(uiSink)(remoteKey)
|
|
||||||
val sources = configuration.sources
|
|
||||||
val prefix = configuration.prefix
|
|
||||||
val exists = FileSystem.hasLocalFile(sources, prefix, remoteKey)
|
|
||||||
if (!exists) {
|
|
||||||
actionCounter.incrementAndGet()
|
|
||||||
val bucket = configuration.bucket
|
|
||||||
val action = Action.toDelete(bucket, remoteKey, 0L)
|
|
||||||
uiActionChosen(uiSink)(action)
|
|
||||||
bytesCounter.addAndGet(action.size)
|
|
||||||
val sequencedAction = SequencedAction(action, actionCounter.get())
|
|
||||||
val event = archive.update(configuration, uiSink, sequencedAction, 0L)
|
|
||||||
events.addFirst(event)
|
|
||||||
uiActionFinished(uiSink)(
|
|
||||||
action,
|
|
||||||
actionCounter.get(),
|
|
||||||
bytesCounter.get(),
|
|
||||||
event
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def uiKeyFound(uiSink: Sink[UIEvent])(remoteKey: RemoteKey): Unit =
|
|
||||||
uiSink.accept(UIEvent.keyFound(remoteKey))
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,8 +0,0 @@
|
||||||
package net.kemitix.thorp.lib
|
|
||||||
|
|
||||||
import net.kemitix.thorp.domain.Action
|
|
||||||
|
|
||||||
final case class SequencedAction(
|
|
||||||
action: Action,
|
|
||||||
index: Int
|
|
||||||
)
|
|
|
@ -1,44 +0,0 @@
|
||||||
package net.kemitix.thorp.lib
|
|
||||||
|
|
||||||
import net.kemitix.thorp.config.Configuration
|
|
||||||
import net.kemitix.thorp.console._
|
|
||||||
import net.kemitix.thorp.domain.StorageEvent._
|
|
||||||
import net.kemitix.thorp.domain.{Channel, StorageEvent}
|
|
||||||
import net.kemitix.thorp.uishell.UIEvent
|
|
||||||
|
|
||||||
trait ThorpArchive {
|
|
||||||
|
|
||||||
def update(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
sequencedAction: SequencedAction,
|
|
||||||
totalBytesSoFar: Long): StorageEvent
|
|
||||||
|
|
||||||
def logEvent(configuration: Configuration,
|
|
||||||
event: StorageEvent): StorageEvent = {
|
|
||||||
val batchMode = configuration.batchMode
|
|
||||||
event match {
|
|
||||||
case uploadEvent: UploadEvent =>
|
|
||||||
val remoteKey = uploadEvent.remoteKey
|
|
||||||
Console.putMessageLnB(ConsoleOut.uploadComplete(remoteKey), batchMode)
|
|
||||||
case copyEvent: CopyEvent =>
|
|
||||||
val sourceKey = copyEvent.sourceKey
|
|
||||||
val targetKey = copyEvent.targetKey
|
|
||||||
Console.putMessageLnB(
|
|
||||||
ConsoleOut.copyComplete(sourceKey, targetKey),
|
|
||||||
batchMode
|
|
||||||
)
|
|
||||||
case deleteEvent: DeleteEvent =>
|
|
||||||
val remoteKey = deleteEvent.remoteKey
|
|
||||||
Console.putMessageLnB(ConsoleOut.deleteComplete(remoteKey), batchMode)
|
|
||||||
case errorEvent: ErrorEvent =>
|
|
||||||
val action = errorEvent.action
|
|
||||||
val e = errorEvent.e
|
|
||||||
Console.putMessageLnB(
|
|
||||||
ConsoleOut.errorQueueEventOccurred(action, e),
|
|
||||||
batchMode
|
|
||||||
)
|
|
||||||
}
|
|
||||||
event
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,92 +0,0 @@
|
||||||
package net.kemitix.thorp.lib
|
|
||||||
|
|
||||||
import net.kemitix.thorp.config.Configuration
|
|
||||||
import net.kemitix.thorp.domain.Action.{ToCopy, ToDelete, ToUpload}
|
|
||||||
import net.kemitix.thorp.domain.StorageEvent.ActionSummary
|
|
||||||
import net.kemitix.thorp.domain._
|
|
||||||
import net.kemitix.thorp.storage.Storage
|
|
||||||
import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener}
|
|
||||||
|
|
||||||
trait UnversionedMirrorArchive extends ThorpArchive {
|
|
||||||
|
|
||||||
override def update(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
sequencedAction: SequencedAction,
|
|
||||||
totalBytesSoFar: Long): StorageEvent = {
|
|
||||||
val action = sequencedAction.action
|
|
||||||
val index = sequencedAction.index
|
|
||||||
val bucket = action.bucket
|
|
||||||
action match {
|
|
||||||
case upload: ToUpload =>
|
|
||||||
val localFile = upload.localFile
|
|
||||||
doUpload(
|
|
||||||
configuration,
|
|
||||||
uiSink,
|
|
||||||
index,
|
|
||||||
totalBytesSoFar,
|
|
||||||
bucket,
|
|
||||||
localFile
|
|
||||||
)
|
|
||||||
case toCopy: ToCopy =>
|
|
||||||
val sourceKey = toCopy.sourceKey
|
|
||||||
val hash = toCopy.hash
|
|
||||||
val targetKey = toCopy.targetKey
|
|
||||||
Storage
|
|
||||||
.getInstance()
|
|
||||||
.copy(bucket, sourceKey, hash, targetKey)
|
|
||||||
case toDelete: ToDelete =>
|
|
||||||
val remoteKey = toDelete.remoteKey
|
|
||||||
try {
|
|
||||||
Storage.getInstance().delete(bucket, remoteKey)
|
|
||||||
} catch {
|
|
||||||
case e: Throwable =>
|
|
||||||
StorageEvent.errorEvent(
|
|
||||||
ActionSummary.delete(remoteKey.key()),
|
|
||||||
remoteKey,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
case doNothing: Action.DoNothing =>
|
|
||||||
val remoteKey = doNothing.remoteKey
|
|
||||||
StorageEvent.doNothingEvent(remoteKey)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def doUpload(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
index: Int,
|
|
||||||
totalBytesSoFar: Long,
|
|
||||||
bucket: Bucket,
|
|
||||||
localFile: LocalFile) =
|
|
||||||
Storage
|
|
||||||
.getInstance()
|
|
||||||
.upload(
|
|
||||||
localFile,
|
|
||||||
bucket,
|
|
||||||
listenerSettings(
|
|
||||||
configuration,
|
|
||||||
uiSink,
|
|
||||||
index,
|
|
||||||
totalBytesSoFar,
|
|
||||||
bucket,
|
|
||||||
localFile
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
private def listenerSettings(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
index: Int,
|
|
||||||
totalBytesSoFar: Long,
|
|
||||||
bucket: Bucket,
|
|
||||||
localFile: LocalFile) =
|
|
||||||
UploadEventListener.settings(
|
|
||||||
uiSink,
|
|
||||||
localFile,
|
|
||||||
index,
|
|
||||||
totalBytesSoFar,
|
|
||||||
configuration.batchMode
|
|
||||||
)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
object UnversionedMirrorArchive extends UnversionedMirrorArchive
|
|
49
lib/src/test/java/net/kemitix/thorp/lib/FileScannerTest.java
Normal file
49
lib/src/test/java/net/kemitix/thorp/lib/FileScannerTest.java
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
package net.kemitix.thorp.lib;
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.Configuration;
|
||||||
|
import net.kemitix.thorp.domain.*;
|
||||||
|
import net.kemitix.thorp.domain.channel.Channel;
|
||||||
|
import net.kemitix.thorp.filesystem.Resource;
|
||||||
|
import org.assertj.core.api.WithAssertions;
|
||||||
|
import org.junit.jupiter.api.DisplayName;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class FileScannerTest
|
||||||
|
implements WithAssertions {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("scan resources as source")
|
||||||
|
public void scanResources() throws InterruptedException {
|
||||||
|
//given
|
||||||
|
File source = Resource.select(this, "upload").toFile();
|
||||||
|
Configuration configuration = Configuration.create()
|
||||||
|
.withSources(Sources.create(
|
||||||
|
Collections.singletonList(
|
||||||
|
source.toPath())));
|
||||||
|
List<LocalFile> localFiles = new ArrayList<>();
|
||||||
|
//when
|
||||||
|
Channel.<LocalFile>create("test-file-scan")
|
||||||
|
.addListener(localFiles::add)
|
||||||
|
.run(sink -> FileScanner.scanSources(configuration, sink))
|
||||||
|
.start()
|
||||||
|
.waitForShutdown();
|
||||||
|
//then
|
||||||
|
File rootFile = source.toPath().resolve("root-file").toFile();
|
||||||
|
File leafFile = source.toPath().resolve("subdir/leaf-file").toFile();
|
||||||
|
Hashes rootHashes = Hashes.create(HashType.MD5, MD5HashData.Root.hash);
|
||||||
|
Hashes leafHashes = Hashes.create(HashType.MD5, MD5HashData.Leaf.hash);
|
||||||
|
RemoteKey rootKey = RemoteKey.create("root-file");
|
||||||
|
RemoteKey leafKey = RemoteKey.create("subdir/leaf-file");
|
||||||
|
long rootLength = 55L;
|
||||||
|
long leafLength = 58L;
|
||||||
|
assertThat(localFiles)
|
||||||
|
.containsExactlyInAnyOrder(
|
||||||
|
LocalFile.create(rootFile, source, rootHashes, rootKey, rootLength),
|
||||||
|
LocalFile.create(leafFile, source, leafHashes, leafKey, leafLength));
|
||||||
|
}
|
||||||
|
}
|
190
lib/src/test/java/net/kemitix/thorp/lib/FiltersTest.java
Normal file
190
lib/src/test/java/net/kemitix/thorp/lib/FiltersTest.java
Normal file
|
@ -0,0 +1,190 @@
|
||||||
|
package net.kemitix.thorp.lib;
|
||||||
|
|
||||||
|
import net.kemitix.thorp.domain.Filter;
|
||||||
|
import org.assertj.core.api.WithAssertions;
|
||||||
|
import org.junit.jupiter.api.DisplayName;
|
||||||
|
import org.junit.jupiter.api.Nested;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
public class FiltersTest
|
||||||
|
implements WithAssertions {
|
||||||
|
|
||||||
|
private final String path1 = "a-file";
|
||||||
|
private final String path2 = "another-file.txt";
|
||||||
|
private final String path3 = "/path/to/a/file.txt";
|
||||||
|
private final String path4 = "/path/to/another/file";
|
||||||
|
private final String path5 = "/home/pcampbell/repos/kemitix/s3thorp";
|
||||||
|
private final String path6 = "/kemitix/s3thorp/upload/subdir";
|
||||||
|
private final List<Path> paths =
|
||||||
|
Stream.of(path1, path2, path3, path4, path5, path6)
|
||||||
|
.map(Paths::get)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
@Nested
|
||||||
|
@DisplayName("include")
|
||||||
|
public class IncludeTests {
|
||||||
|
@Test
|
||||||
|
@DisplayName("default filter")
|
||||||
|
public void defaultFilter() {
|
||||||
|
//given
|
||||||
|
List<Filter> filters = Collections.singletonList(
|
||||||
|
Filter.Include.all());
|
||||||
|
//then
|
||||||
|
assertThat(paths)
|
||||||
|
.allMatch(path ->
|
||||||
|
Filters.isIncluded(path, filters));
|
||||||
|
}
|
||||||
|
@Nested
|
||||||
|
@DisplayName("directory exact match")
|
||||||
|
public class DirectoryExactMatchTests {
|
||||||
|
List<Filter> filters = Collections.singletonList(
|
||||||
|
Filter.include("/upload/subdir/"));
|
||||||
|
@Test
|
||||||
|
@DisplayName("include matching directory")
|
||||||
|
public void includeMatchingDirectory() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/upload/subdir/leaf-dir");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isTrue();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
@DisplayName("exclude non-matching file")
|
||||||
|
public void excludeNonMatchingFile() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/upload/other-file");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isFalse();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Nested
|
||||||
|
@DisplayName("file partial match 'root'")
|
||||||
|
public class FilePartialMatchRootTests {
|
||||||
|
List<Filter> filters = Collections.singletonList(
|
||||||
|
Filter.include("root"));
|
||||||
|
@Test
|
||||||
|
@DisplayName("include matching file")
|
||||||
|
public void includeMatchingFile() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/upload/root-file");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isTrue();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
@DisplayName("exclude non-matching file (1)")
|
||||||
|
public void excludeNonMatchingFile1() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/test-file-for-hash.txt");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isFalse();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
@DisplayName("exclude non-matching file (2)")
|
||||||
|
public void excludeNonMatchingFile2() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/upload/subdir/lead-file");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isFalse();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Nested
|
||||||
|
@DisplayName("exclude")
|
||||||
|
public class ExcludeTests {
|
||||||
|
@Nested
|
||||||
|
@DisplayName("directory exact match exclude")
|
||||||
|
public class DirectoryMatchTests {
|
||||||
|
List<Filter> filters = Collections.singletonList(
|
||||||
|
Filter.exclude("/upload/subdir/"));
|
||||||
|
@Test
|
||||||
|
@DisplayName("exclude matching directory")
|
||||||
|
public void excludeDirectory() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/upload/subdir/leaf-file");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isFalse();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
@DisplayName("include non-matching file")
|
||||||
|
public void includeFile() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/upload/other-file");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isTrue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Nested
|
||||||
|
@DisplayName("file partial match")
|
||||||
|
public class PartialMatchTests {
|
||||||
|
List<Filter> filters = Collections.singletonList(
|
||||||
|
Filter.exclude("root"));
|
||||||
|
@Test
|
||||||
|
@DisplayName("exclude matching file")
|
||||||
|
public void excludeFile() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/upload/root-file");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isFalse();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
@DisplayName("include non-matching file (1)")
|
||||||
|
public void includeFile1() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/test-file-for-hash.txt");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isTrue();
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
@DisplayName("include non-matching file (2)")
|
||||||
|
public void includeFile2() {
|
||||||
|
//given
|
||||||
|
Path path = Paths.get("/upload/subdir/leaf-file");
|
||||||
|
//when
|
||||||
|
boolean included = Filters.isIncluded(path, filters);
|
||||||
|
//then
|
||||||
|
assertThat(included).isTrue();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Nested
|
||||||
|
@DisplayName("isIncluded")
|
||||||
|
public class IsIncludedTests {
|
||||||
|
List<Path> invoke(List<Filter> filters) {
|
||||||
|
return paths.stream()
|
||||||
|
.filter(path ->
|
||||||
|
Filters.isIncluded(path, filters))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("when no filters then accepts all paths")
|
||||||
|
public void whenNoFilters_thenAcceptAll() {
|
||||||
|
assertThat(invoke(Collections.emptyList()))
|
||||||
|
.containsExactlyElementsOf(paths);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
426
lib/src/test/java/net/kemitix/thorp/lib/LocalFileSystemTest.java
Normal file
426
lib/src/test/java/net/kemitix/thorp/lib/LocalFileSystemTest.java
Normal file
|
@ -0,0 +1,426 @@
|
||||||
|
package net.kemitix.thorp.lib;
|
||||||
|
|
||||||
|
import net.kemitix.thorp.config.*;
|
||||||
|
import net.kemitix.thorp.domain.*;
|
||||||
|
import net.kemitix.thorp.domain.channel.Channel;
|
||||||
|
import net.kemitix.thorp.domain.channel.Listener;
|
||||||
|
import net.kemitix.thorp.domain.channel.Sink;
|
||||||
|
import net.kemitix.thorp.filesystem.Resource;
|
||||||
|
import net.kemitix.thorp.uishell.UIEvent;
|
||||||
|
import org.assertj.core.api.WithAssertions;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.DisplayName;
|
||||||
|
import org.junit.jupiter.api.Nested;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class LocalFileSystemTest
|
||||||
|
implements WithAssertions {
|
||||||
|
|
||||||
|
private final Resource source = Resource.select(this, "upload");
|
||||||
|
private final Path sourcePath = source.toPath();
|
||||||
|
private final ConfigOption sourceOption = ConfigOption.source(sourcePath);
|
||||||
|
private final Bucket bucket = Bucket.named("bucket");
|
||||||
|
private final ConfigOption bucketOption = ConfigOption.bucket(bucket.name());
|
||||||
|
private final ConfigOptions configOptions = ConfigOptions.create(
|
||||||
|
Arrays.asList(
|
||||||
|
sourceOption,
|
||||||
|
bucketOption,
|
||||||
|
ConfigOption.ignoreGlobalOptions(),
|
||||||
|
ConfigOption.ignoreUserOptions()
|
||||||
|
));
|
||||||
|
|
||||||
|
private final AtomicReference<List<UIEvent>> uiEvents = new AtomicReference<>(Collections.emptyList());
|
||||||
|
private final AtomicReference<List<SequencedAction>> actions = new AtomicReference<>(Collections.emptyList());
|
||||||
|
|
||||||
|
private final Archive archive =
|
||||||
|
(configuration, uiSink, sequencedAction, totalBytesSoFar) -> {
|
||||||
|
actions.updateAndGet(l -> {
|
||||||
|
List<SequencedAction> list = new ArrayList<>();
|
||||||
|
list.add(sequencedAction);
|
||||||
|
list.addAll(l);
|
||||||
|
return list;
|
||||||
|
});
|
||||||
|
return StorageEvent.doNothingEvent(
|
||||||
|
sequencedAction.action().remoteKey);
|
||||||
|
};
|
||||||
|
|
||||||
|
@Nested
|
||||||
|
@DisplayName("scanCopyUpload")
|
||||||
|
public class ScanCopyUploadTests {
|
||||||
|
@Nested
|
||||||
|
@DisplayName("where remote is empty")
|
||||||
|
public class WhereRemoteEmptyTests {
|
||||||
|
RemoteObjects remoteObjects = RemoteObjects.empty;
|
||||||
|
Configuration configuration = ConfigurationBuilder.buildConfig(configOptions);
|
||||||
|
List<UIEvent> uiEventList = new ArrayList<>();
|
||||||
|
Channel<UIEvent> uiSink = Channel.<UIEvent>create("ui-test")
|
||||||
|
.addListener(uiEventList::add)
|
||||||
|
.start();
|
||||||
|
List<StorageEvent> storageEvents;
|
||||||
|
|
||||||
|
public WhereRemoteEmptyTests() throws IOException, ConfigValidationException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws InterruptedException {
|
||||||
|
storageEvents = LocalFileSystem.scanCopyUpload(
|
||||||
|
configuration, uiSink, remoteObjects, archive);
|
||||||
|
uiSink.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("update archive with all files")
|
||||||
|
public void uploadArchive() {
|
||||||
|
assertThat(storageEvents).hasSize(2);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action))
|
||||||
|
.allSatisfy(action ->
|
||||||
|
assertThat(action)
|
||||||
|
.isInstanceOf(Action.ToUpload.class));
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action)
|
||||||
|
.map(action -> action.remoteKey))
|
||||||
|
.containsExactlyInAnyOrder(
|
||||||
|
MD5HashData.Root.remoteKey,
|
||||||
|
MD5HashData.Leaf.remoteKey
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("update ui with all files")
|
||||||
|
public void updateUI() {
|
||||||
|
assertThat(uiEventSummary(uiEventList))
|
||||||
|
.hasSize(6)
|
||||||
|
.contains(
|
||||||
|
"file found : root-file",
|
||||||
|
"action chosen : root-file : ToUpload",
|
||||||
|
"action finished : root-file : ToUpload")
|
||||||
|
.contains(
|
||||||
|
"file found : subdir/leaf-file",
|
||||||
|
"action chosen : subdir/leaf-file : ToUpload",
|
||||||
|
"action finished : subdir/leaf-file : ToUpload");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nested
|
||||||
|
@DisplayName("where remote has all")
|
||||||
|
public class WhereRemoteHasAllTests {
|
||||||
|
Map<MD5Hash, RemoteKey> hashToKey = new HashMap<>();
|
||||||
|
Map<RemoteKey, MD5Hash> keyToHash = new HashMap<>();
|
||||||
|
RemoteObjects remoteObjects;
|
||||||
|
Configuration configuration = ConfigurationBuilder.buildConfig(configOptions);
|
||||||
|
List<UIEvent> uiEventList = new ArrayList<>();
|
||||||
|
Channel<UIEvent> uiSink = Channel.<UIEvent>create("ui-test")
|
||||||
|
.addListener(uiEventList::add)
|
||||||
|
.start();
|
||||||
|
List<StorageEvent> storageEvents;
|
||||||
|
|
||||||
|
public WhereRemoteHasAllTests() throws IOException, ConfigValidationException {
|
||||||
|
hashToKey.put(MD5HashData.Root.hash, MD5HashData.Root.remoteKey);
|
||||||
|
hashToKey.put(MD5HashData.Leaf.hash, MD5HashData.Leaf.remoteKey);
|
||||||
|
keyToHash.put(MD5HashData.Root.remoteKey, MD5HashData.Root.hash);
|
||||||
|
keyToHash.put(MD5HashData.Leaf.remoteKey, MD5HashData.Leaf.hash);
|
||||||
|
remoteObjects = RemoteObjects.create(
|
||||||
|
MapView.of(hashToKey).asMap(),
|
||||||
|
MapView.of(keyToHash).asMap());
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws InterruptedException {
|
||||||
|
storageEvents = LocalFileSystem.scanCopyUpload(
|
||||||
|
configuration, uiSink, remoteObjects, archive);
|
||||||
|
uiSink.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("do nothing for all files")
|
||||||
|
public void doNothing() {
|
||||||
|
assertThat(storageEvents).hasSize(2);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action))
|
||||||
|
.allSatisfy(action ->
|
||||||
|
assertThat(action)
|
||||||
|
.isInstanceOf(Action.DoNothing.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("update ui with do nothing")
|
||||||
|
public void updateUI() {
|
||||||
|
assertThat(uiEventSummary(uiEventList))
|
||||||
|
.hasSize(6)
|
||||||
|
.contains(
|
||||||
|
"file found : root-file",
|
||||||
|
"action chosen : root-file : DoNothing",
|
||||||
|
"action finished : root-file : DoNothing")
|
||||||
|
.contains(
|
||||||
|
"file found : subdir/leaf-file",
|
||||||
|
"action chosen : subdir/leaf-file : DoNothing",
|
||||||
|
"action finished : subdir/leaf-file : DoNothing");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nested
|
||||||
|
@DisplayName("where remote has some")
|
||||||
|
public class WhereRemoteHasSomeTests {
|
||||||
|
Map<MD5Hash, RemoteKey> hashToKey = new HashMap<>();
|
||||||
|
Map<RemoteKey, MD5Hash> keyToHash = new HashMap<>();
|
||||||
|
RemoteObjects remoteObjects;
|
||||||
|
Configuration configuration = ConfigurationBuilder.buildConfig(configOptions);
|
||||||
|
List<UIEvent> uiEventList = new ArrayList<>();
|
||||||
|
Channel<UIEvent> uiSink = Channel.<UIEvent>create("ui-test")
|
||||||
|
.addListener(uiEventList::add)
|
||||||
|
.start();
|
||||||
|
List<StorageEvent> storageEvents;
|
||||||
|
|
||||||
|
public WhereRemoteHasSomeTests() throws IOException, ConfigValidationException {
|
||||||
|
hashToKey.put(MD5HashData.Root.hash, MD5HashData.Root.remoteKey);
|
||||||
|
keyToHash.put(MD5HashData.Root.remoteKey, MD5HashData.Root.hash);
|
||||||
|
remoteObjects = RemoteObjects.create(
|
||||||
|
MapView.of(hashToKey).asMap(),
|
||||||
|
MapView.of(keyToHash).asMap());
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws InterruptedException {
|
||||||
|
storageEvents = LocalFileSystem.scanCopyUpload(
|
||||||
|
configuration, uiSink, remoteObjects, archive);
|
||||||
|
uiSink.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("do nothing for some, upload for others")
|
||||||
|
public void doNothingAnUpload() {
|
||||||
|
assertThat(storageEvents).hasSize(2);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action))
|
||||||
|
.filteredOn(action -> action instanceof Action.DoNothing)
|
||||||
|
.hasSize(1);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action))
|
||||||
|
.filteredOn(action -> action instanceof Action.ToUpload)
|
||||||
|
.hasSize(1);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action)
|
||||||
|
.map(action -> action.remoteKey))
|
||||||
|
.containsExactlyInAnyOrder(
|
||||||
|
MD5HashData.Root.remoteKey,
|
||||||
|
MD5HashData.Leaf.remoteKey
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("update ui with do nothing")
|
||||||
|
public void updateUI() {
|
||||||
|
assertThat(uiEventSummary(uiEventList))
|
||||||
|
.hasSize(6)
|
||||||
|
.contains(
|
||||||
|
"file found : root-file",
|
||||||
|
"action chosen : root-file : DoNothing",
|
||||||
|
"action finished : root-file : DoNothing")
|
||||||
|
.contains(
|
||||||
|
"file found : subdir/leaf-file",
|
||||||
|
"action chosen : subdir/leaf-file : ToUpload",
|
||||||
|
"action finished : subdir/leaf-file : ToUpload");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nested
|
||||||
|
@DisplayName("where file has been renamed")
|
||||||
|
public class WhereFileRenamedTests {
|
||||||
|
Map<MD5Hash, RemoteKey> hashToKey = new HashMap<>();
|
||||||
|
Map<RemoteKey, MD5Hash> keyToHash = new HashMap<>();
|
||||||
|
RemoteObjects remoteObjects;
|
||||||
|
Configuration configuration = ConfigurationBuilder.buildConfig(configOptions);
|
||||||
|
List<UIEvent> uiEventList = new ArrayList<>();
|
||||||
|
Channel<UIEvent> uiSink = Channel.<UIEvent>create("ui-test")
|
||||||
|
.addListener(uiEventList::add)
|
||||||
|
.start();
|
||||||
|
List<StorageEvent> storageEvents;
|
||||||
|
|
||||||
|
public WhereFileRenamedTests() throws IOException, ConfigValidationException {
|
||||||
|
RemoteKey otherKey = RemoteKey.create("/old-filename");
|
||||||
|
hashToKey.put(MD5HashData.Root.hash, otherKey);
|
||||||
|
hashToKey.put(MD5HashData.Leaf.hash, MD5HashData.Leaf.remoteKey);
|
||||||
|
keyToHash.put(otherKey, MD5HashData.Root.hash);
|
||||||
|
keyToHash.put(MD5HashData.Leaf.remoteKey, MD5HashData.Leaf.hash);
|
||||||
|
remoteObjects = RemoteObjects.create(
|
||||||
|
MapView.of(hashToKey).asMap(),
|
||||||
|
MapView.of(keyToHash).asMap());
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws InterruptedException {
|
||||||
|
storageEvents = LocalFileSystem.scanCopyUpload(
|
||||||
|
configuration, uiSink, remoteObjects, archive);
|
||||||
|
uiSink.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("copy")
|
||||||
|
public void copy() {
|
||||||
|
assertThat(storageEvents).hasSize(2);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action))
|
||||||
|
.filteredOn(action -> action instanceof Action.ToCopy)
|
||||||
|
.hasSize(1);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action))
|
||||||
|
.filteredOn(action -> action instanceof Action.DoNothing)
|
||||||
|
.hasSize(1);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action)
|
||||||
|
.map(action -> action.remoteKey))
|
||||||
|
.containsExactlyInAnyOrder(
|
||||||
|
MD5HashData.Root.remoteKey,
|
||||||
|
MD5HashData.Leaf.remoteKey
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("update ui")
|
||||||
|
public void updateUI() {
|
||||||
|
assertThat(uiEventSummary(uiEventList))
|
||||||
|
.hasSize(6)
|
||||||
|
.contains(
|
||||||
|
"file found : root-file",
|
||||||
|
"action chosen : root-file : ToCopy",
|
||||||
|
"action finished : root-file : ToCopy")
|
||||||
|
.contains(
|
||||||
|
"file found : subdir/leaf-file",
|
||||||
|
"action chosen : subdir/leaf-file : DoNothing",
|
||||||
|
"action finished : subdir/leaf-file : DoNothing");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nested
|
||||||
|
@DisplayName("scanDelete")
|
||||||
|
public class ScanDeleteTests {
|
||||||
|
@Nested
|
||||||
|
@DisplayName("where remote has no extra objects")
|
||||||
|
public class RemoteHasNoExtrasTests {
|
||||||
|
RemoteObjects remoteObjects = RemoteObjects.empty;
|
||||||
|
Configuration configuration = ConfigurationBuilder.buildConfig(configOptions);
|
||||||
|
List<UIEvent> uiEventList = new ArrayList<>();
|
||||||
|
Channel<UIEvent> uiSink = Channel.<UIEvent>create("ui-test")
|
||||||
|
.addListener(uiEventList::add)
|
||||||
|
.start();
|
||||||
|
List<StorageEvent> storageEvents;
|
||||||
|
|
||||||
|
public RemoteHasNoExtrasTests() throws IOException, ConfigValidationException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws InterruptedException {
|
||||||
|
storageEvents = LocalFileSystem.scanDelete(
|
||||||
|
configuration, uiSink, remoteObjects, archive);
|
||||||
|
uiSink.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("no archive actions")
|
||||||
|
public void noArchiveUpdates() {
|
||||||
|
assertThat(storageEvents).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("update ui")
|
||||||
|
public void updateUI() {
|
||||||
|
assertThat(uiEventList).isEmpty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Nested
|
||||||
|
@DisplayName("where remote has extra objects")
|
||||||
|
public class RemoteHasExtrasTests {
|
||||||
|
Map<MD5Hash, RemoteKey> hashToKey = new HashMap<>();
|
||||||
|
Map<RemoteKey, MD5Hash> keyToHash = new HashMap<>();
|
||||||
|
RemoteObjects remoteObjects;
|
||||||
|
Configuration configuration = ConfigurationBuilder.buildConfig(configOptions);
|
||||||
|
List<UIEvent> uiEventList = new ArrayList<>();
|
||||||
|
Channel<UIEvent> uiSink = Channel.<UIEvent>create("ui-test")
|
||||||
|
.addListener(uiEventList::add)
|
||||||
|
.start();
|
||||||
|
List<StorageEvent> storageEvents;
|
||||||
|
RemoteKey extraKey = RemoteKey.create("/extra");
|
||||||
|
MD5Hash extraHash = MD5Hash.create("extra-hash");
|
||||||
|
|
||||||
|
public RemoteHasExtrasTests() throws IOException, ConfigValidationException {
|
||||||
|
hashToKey.put(extraHash, extraKey);
|
||||||
|
keyToHash.put(extraKey, extraHash);
|
||||||
|
remoteObjects = RemoteObjects.create(
|
||||||
|
MapView.of(hashToKey).asMap(),
|
||||||
|
MapView.of(keyToHash).asMap());
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws InterruptedException {
|
||||||
|
storageEvents = LocalFileSystem.scanDelete(
|
||||||
|
configuration, uiSink, remoteObjects, archive);
|
||||||
|
uiSink.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("archive delete action")
|
||||||
|
public void archiveDeleteUpdates() {
|
||||||
|
assertThat(storageEvents).hasSize(1);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action))
|
||||||
|
.filteredOn(action -> action instanceof Action.ToDelete)
|
||||||
|
.hasSize(1);
|
||||||
|
assertThat(actions.get().stream()
|
||||||
|
.map(SequencedAction::action)
|
||||||
|
.map(action -> action.remoteKey))
|
||||||
|
.containsExactly(extraKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("update ui")
|
||||||
|
public void updateUI() {
|
||||||
|
assertThat(uiEventSummary(uiEventList))
|
||||||
|
.hasSize(3)
|
||||||
|
.contains(
|
||||||
|
"key found: /extra",
|
||||||
|
"action chosen : /extra : ToDelete",
|
||||||
|
"action finished : /extra : ToDelete");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> uiEventSummary(List<UIEvent> uiEvents) {
|
||||||
|
Deque<String> summary = new ArrayDeque<>();
|
||||||
|
uiEvents.stream()
|
||||||
|
.map(uiEvent -> {
|
||||||
|
if (uiEvent instanceof UIEvent.FileFound) {
|
||||||
|
return String.format("file found : %s",
|
||||||
|
((UIEvent.FileFound) uiEvent).localFile
|
||||||
|
.remoteKey.key());
|
||||||
|
} else if (uiEvent instanceof UIEvent.ActionChosen) {
|
||||||
|
Action action = ((UIEvent.ActionChosen) uiEvent).action;
|
||||||
|
return String.format(
|
||||||
|
"action chosen : %s : %s",
|
||||||
|
action.remoteKey.key(),
|
||||||
|
action.getClass().getSimpleName());
|
||||||
|
} else if (uiEvent instanceof UIEvent.ActionFinished) {
|
||||||
|
Action action = ((UIEvent.ActionFinished) uiEvent).action;
|
||||||
|
return String.format(
|
||||||
|
"action finished : %s : %s",
|
||||||
|
action.remoteKey.key(),
|
||||||
|
action.getClass().getSimpleName());
|
||||||
|
} else if (uiEvent instanceof UIEvent.KeyFound) {
|
||||||
|
return String.format("key found: %s",
|
||||||
|
((UIEvent.KeyFound) uiEvent).remoteKey
|
||||||
|
.key());
|
||||||
|
}
|
||||||
|
return String.format("unknown : %s",
|
||||||
|
uiEvent.getClass().getSimpleName());
|
||||||
|
})
|
||||||
|
.forEach(summary::addLast);
|
||||||
|
return new ArrayList<>(summary);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
net.kemitix.thorp.filesystem.MD5HashGenerator
|
|
@ -1,46 +0,0 @@
|
||||||
package net.kemitix.thorp.lib
|
|
||||||
|
|
||||||
import org.scalatest.FreeSpec
|
|
||||||
|
|
||||||
class FileScannerTest extends FreeSpec {
|
|
||||||
|
|
||||||
// "scanSources" - {
|
|
||||||
// "creates a FileSender for files in resources" in {
|
|
||||||
// def receiver(scanned: Ref[List[RemoteKey]])
|
|
||||||
// : UIO[MessageChannel.UReceiver[Any, ScannedFile]] = UIO { message =>
|
|
||||||
// for {
|
|
||||||
// _ <- scanned.update(l => message.body.remoteKey :: l)
|
|
||||||
// } yield ()
|
|
||||||
// }
|
|
||||||
// val scannedFiles =
|
|
||||||
// new AtomicReference[List[RemoteKey]](List.empty)
|
|
||||||
// val sourcePath = Resource.select(this, "upload").toPath
|
|
||||||
// val configOptions: List[ConfigOption] =
|
|
||||||
// List[ConfigOption](ConfigOption.source(sourcePath),
|
|
||||||
// ConfigOption.bucket("bucket"),
|
|
||||||
// ConfigOption.ignoreGlobalOptions(),
|
|
||||||
// ConfigOption.ignoreUserOptions())
|
|
||||||
// val program: ZIO[Clock with FileScanner, Throwable, Unit] = {
|
|
||||||
// val configuration = ConfigurationBuilder.buildConfig(
|
|
||||||
// ConfigOptions.create(configOptions.asJava))
|
|
||||||
// for {
|
|
||||||
// scanner <- FileScanner.scanSources(configuration)
|
|
||||||
// scannedRef <- Ref.make[List[RemoteKey]](List.empty)
|
|
||||||
// receiver <- receiver(scannedRef)
|
|
||||||
// _ <- MessageChannel.pointToPoint(scanner)(receiver).runDrain
|
|
||||||
// scanned <- scannedRef.get
|
|
||||||
// _ <- UIO(scannedFiles.set(scanned))
|
|
||||||
// } yield ()
|
|
||||||
// }
|
|
||||||
// object TestEnv extends FileScanner.Live with Clock.Live
|
|
||||||
// val completed =
|
|
||||||
// new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither
|
|
||||||
// assert(completed.isRight)
|
|
||||||
// assertResult(
|
|
||||||
// Set(RemoteKey.create("root-file"),
|
|
||||||
// RemoteKey.create("subdir/leaf-file")))(scannedFiles.get.toSet)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// }
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,139 +0,0 @@
|
||||||
package net.kemitix.thorp.lib
|
|
||||||
|
|
||||||
import java.nio.file.Paths
|
|
||||||
|
|
||||||
import net.kemitix.thorp.domain.Filter
|
|
||||||
import net.kemitix.thorp.domain.Filter.{Exclude, Include}
|
|
||||||
import org.scalatest.FunSpec
|
|
||||||
|
|
||||||
class FiltersSuite extends FunSpec {
|
|
||||||
|
|
||||||
private val path1 = "a-file"
|
|
||||||
private val path2 = "another-file.txt"
|
|
||||||
private val path3 = "/path/to/a/file.txt"
|
|
||||||
private val path4 = "/path/to/another/file"
|
|
||||||
private val path5 = "/home/pcampbell/repos/kemitix/s3thorp"
|
|
||||||
private val path6 = "/kemitix/s3thorp/upload/subdir"
|
|
||||||
private val paths =
|
|
||||||
List(path1, path2, path3, path4, path5, path6).map(Paths.get(_))
|
|
||||||
|
|
||||||
describe("Include") {
|
|
||||||
|
|
||||||
describe("default filter") {
|
|
||||||
val include = Include.all
|
|
||||||
it("should include files") {
|
|
||||||
paths.foreach(path =>
|
|
||||||
assertResult(true)(Filters.isIncludedByFilter(path)(include)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
describe("directory exact match include '/upload/subdir/'") {
|
|
||||||
val include = Include.create("/upload/subdir/")
|
|
||||||
it("include matching directory") {
|
|
||||||
val matching = Paths.get("/upload/subdir/leaf-file")
|
|
||||||
assertResult(true)(Filters.isIncludedByFilter(matching)(include))
|
|
||||||
}
|
|
||||||
it("exclude non-matching files") {
|
|
||||||
val nonMatching = Paths.get("/upload/other-file")
|
|
||||||
assertResult(false)(Filters.isIncludedByFilter(nonMatching)(include))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
describe("file partial match 'root'") {
|
|
||||||
val include = Include.create("root")
|
|
||||||
it("include matching file '/upload/root-file") {
|
|
||||||
val matching = Paths.get("/upload/root-file")
|
|
||||||
assertResult(true)(Filters.isIncludedByFilter(matching)(include))
|
|
||||||
}
|
|
||||||
it("exclude non-matching files 'test-file-for-hash.txt'") {
|
|
||||||
val nonMatching1 = Paths.get("/test-file-for-hash.txt")
|
|
||||||
assertResult(false)(Filters.isIncludedByFilter(nonMatching1)(include))
|
|
||||||
}
|
|
||||||
it("exclude non-matching files '/upload/subdir/leaf-file'") {
|
|
||||||
val nonMatching2 = Paths.get("/upload/subdir/leaf-file")
|
|
||||||
assertResult(false)(Filters.isIncludedByFilter(nonMatching2)(include))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
describe("Exclude") {
|
|
||||||
// describe("default exclude") {
|
|
||||||
// val exclude = Exclude()
|
|
||||||
// it("should exclude all files") {
|
|
||||||
// paths.foreach(path => {
|
|
||||||
// assertResult(true)(exclude.isExcluded(path))
|
|
||||||
// })
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
describe("directory exact match exclude '/upload/subdir/'") {
|
|
||||||
val exclude = Exclude.create("/upload/subdir/")
|
|
||||||
it("exclude matching directory") {
|
|
||||||
val matching = Paths.get("/upload/subdir/leaf-file")
|
|
||||||
assertResult(true)(Filters.isExcludedByFilter(matching)(exclude))
|
|
||||||
}
|
|
||||||
it("include non-matching files") {
|
|
||||||
val nonMatching = Paths.get("/upload/other-file")
|
|
||||||
assertResult(false)(Filters.isExcludedByFilter(nonMatching)(exclude))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
describe("file partial match 'root'") {
|
|
||||||
val exclude = Exclude.create("root")
|
|
||||||
it("exclude matching file '/upload/root-file") {
|
|
||||||
val matching = Paths.get("/upload/root-file")
|
|
||||||
assertResult(true)(Filters.isExcludedByFilter(matching)(exclude))
|
|
||||||
}
|
|
||||||
it("include non-matching files 'test-file-for-hash.txt'") {
|
|
||||||
val nonMatching1 = Paths.get("/test-file-for-hash.txt")
|
|
||||||
assertResult(false)(Filters.isExcludedByFilter(nonMatching1)(exclude))
|
|
||||||
}
|
|
||||||
it("include non-matching files '/upload/subdir/leaf-file'") {
|
|
||||||
val nonMatching2 = Paths.get("/upload/subdir/leaf-file")
|
|
||||||
assertResult(false)(Filters.isExcludedByFilter(nonMatching2)(exclude))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
describe("isIncluded") {
|
|
||||||
def invoke(filters: List[Filter]) = {
|
|
||||||
paths.filter(path => Filters.isIncluded(path)(filters))
|
|
||||||
}
|
|
||||||
|
|
||||||
describe("when there are no filters") {
|
|
||||||
val filters = List[Filter]()
|
|
||||||
it("should accept all files") {
|
|
||||||
val expected = paths
|
|
||||||
val result = invoke(filters)
|
|
||||||
assertResult(expected)(result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
describe("when a single include") {
|
|
||||||
val filters = List(Include.create(".txt"))
|
|
||||||
it("should only include two matching paths") {
|
|
||||||
val expected = List(path2, path3).map(Paths.get(_))
|
|
||||||
val result = invoke(filters)
|
|
||||||
assertResult(expected)(result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
describe("when a single exclude") {
|
|
||||||
val filters = List(Exclude.create("path"))
|
|
||||||
it("should include only other paths") {
|
|
||||||
val expected = List(path1, path2, path5, path6).map(Paths.get(_))
|
|
||||||
val result = invoke(filters)
|
|
||||||
assertResult(expected)(result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
describe("when include .txt files, but then exclude everything trumps all") {
|
|
||||||
val filters = List[Filter](Include.create(".txt"), Exclude.create(".*"))
|
|
||||||
it("should include nothing") {
|
|
||||||
val expected = List()
|
|
||||||
val result = invoke(filters)
|
|
||||||
assertResult(expected)(result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
describe("when exclude everything except .txt files") {
|
|
||||||
val filters = List[Filter](Exclude.create(".*"), Include.create(".txt"))
|
|
||||||
it("should include only the .txt files") {
|
|
||||||
val expected = List(path2, path3).map(Paths.get(_))
|
|
||||||
val result = invoke(filters)
|
|
||||||
assertResult(expected)(result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,369 +0,0 @@
|
||||||
package net.kemitix.thorp.lib
|
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
|
||||||
|
|
||||||
import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, Configuration}
|
|
||||||
import net.kemitix.thorp.domain._
|
|
||||||
import net.kemitix.thorp.filesystem.Resource
|
|
||||||
import net.kemitix.thorp.uishell.UIEvent
|
|
||||||
import net.kemitix.thorp.uishell.UIEvent.{
|
|
||||||
ActionChosen,
|
|
||||||
ActionFinished,
|
|
||||||
FileFound,
|
|
||||||
KeyFound
|
|
||||||
}
|
|
||||||
import org.scalatest.FreeSpec
|
|
||||||
|
|
||||||
import scala.jdk.CollectionConverters._
|
|
||||||
|
|
||||||
class LocalFileSystemTest extends FreeSpec {
|
|
||||||
|
|
||||||
private val source = Resource.select(this, "upload")
|
|
||||||
private val sourcePath = source.toPath
|
|
||||||
private val sourceOption = ConfigOption.source(sourcePath)
|
|
||||||
private val bucket = Bucket.named("bucket")
|
|
||||||
private val bucketOption = ConfigOption.bucket(bucket.name)
|
|
||||||
private val configOptions = ConfigOptions.create(
|
|
||||||
List[ConfigOption](
|
|
||||||
sourceOption,
|
|
||||||
bucketOption,
|
|
||||||
ConfigOption.ignoreGlobalOptions(),
|
|
||||||
ConfigOption.ignoreUserOptions()
|
|
||||||
).asJava
|
|
||||||
)
|
|
||||||
|
|
||||||
private val uiEvents = new AtomicReference[List[UIEvent]](List.empty)
|
|
||||||
private val actions = new AtomicReference[List[SequencedAction]](List.empty)
|
|
||||||
|
|
||||||
private def archive: ThorpArchive = new ThorpArchive {
|
|
||||||
override def update(configuration: Configuration,
|
|
||||||
uiSink: Channel.Sink[UIEvent],
|
|
||||||
sequencedAction: SequencedAction,
|
|
||||||
totalBytesSoFar: Long): StorageEvent = {
|
|
||||||
actions.updateAndGet(l => sequencedAction :: l)
|
|
||||||
StorageEvent.doNothingEvent(sequencedAction.action.remoteKey)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// private object TestEnv extends Clock.Live with FileScanner.Live
|
|
||||||
//
|
|
||||||
// "scanCopyUpload" - {
|
|
||||||
// def sender(
|
|
||||||
// configuration: Configuration,
|
|
||||||
// objects: RemoteObjects
|
|
||||||
// ): UIO[MessageChannel.ESender[Clock with FileScanner, Throwable, UIEvent]] =
|
|
||||||
// UIO { uiChannel =>
|
|
||||||
// (for {
|
|
||||||
// _ <- LocalFileSystem.scanCopyUpload(
|
|
||||||
// configuration,
|
|
||||||
// uiChannel,
|
|
||||||
// objects,
|
|
||||||
// archive
|
|
||||||
// )
|
|
||||||
// } yield ()) <* MessageChannel.endChannel(uiChannel)
|
|
||||||
// }
|
|
||||||
// def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] =
|
|
||||||
// UIO { message =>
|
|
||||||
// val uiEvent = message.body
|
|
||||||
// uiEvents.updateAndGet(l => uiEvent :: l)
|
|
||||||
// UIO(())
|
|
||||||
// }
|
|
||||||
// def program(remoteObjects: RemoteObjects) = {
|
|
||||||
// val configuration = ConfigurationBuilder.buildConfig(configOptions)
|
|
||||||
// for {
|
|
||||||
// sender <- sender(configuration, remoteObjects)
|
|
||||||
// receiver <- receiver()
|
|
||||||
// _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain
|
|
||||||
// } yield ()
|
|
||||||
// }
|
|
||||||
// "where remote has no objects" - {
|
|
||||||
// val remoteObjects = RemoteObjects.empty
|
|
||||||
// "upload all files" - {
|
|
||||||
// "update archive with upload actions" in {
|
|
||||||
// actions.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val actionList: Set[Action] = actions.get.map(_.action).toSet
|
|
||||||
// actionList.filter(_.isInstanceOf[ToUpload]) should have size 2
|
|
||||||
// actionList.map(_.remoteKey) shouldEqual Set(
|
|
||||||
// MD5HashData.Root.remoteKey,
|
|
||||||
// MD5HashData.Leaf.remoteKey
|
|
||||||
// )
|
|
||||||
// }
|
|
||||||
// "ui is updated" in {
|
|
||||||
// uiEvents.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val summary = uiEventsSummary
|
|
||||||
// summary should have size 6
|
|
||||||
// summary should contain inOrderElementsOf List(
|
|
||||||
// "file found : root-file",
|
|
||||||
// "action chosen : root-file : ToUpload",
|
|
||||||
// "action finished : root-file : ToUpload"
|
|
||||||
// )
|
|
||||||
// summary should contain inOrderElementsOf List(
|
|
||||||
// "file found : subdir/leaf-file",
|
|
||||||
// "action chosen : subdir/leaf-file : ToUpload",
|
|
||||||
// "action finished : subdir/leaf-file : ToUpload"
|
|
||||||
// )
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// "where remote has all object" - {
|
|
||||||
// val remoteObjects =
|
|
||||||
// RemoteObjects.create(
|
|
||||||
// MapView(
|
|
||||||
// MD5HashData.Root.hash -> MD5HashData.Root.remoteKey,
|
|
||||||
// MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey
|
|
||||||
// ).toMap.asJava,
|
|
||||||
// MapView(
|
|
||||||
// MD5HashData.Root.remoteKey -> MD5HashData.Root.hash,
|
|
||||||
// MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash
|
|
||||||
// ).toMap.asJava
|
|
||||||
// )
|
|
||||||
// "do nothing for all files" - {
|
|
||||||
// "all archive actions do nothing" in {
|
|
||||||
// actions.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val actionList: Set[Action] = actions.get.map(_.action).toSet
|
|
||||||
// actionList should have size 2
|
|
||||||
// actionList.filter(_.isInstanceOf[DoNothing]) should have size 2
|
|
||||||
// }
|
|
||||||
// "ui is updated" in {
|
|
||||||
// uiEvents.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val summary = uiEventsSummary
|
|
||||||
// summary should have size 6
|
|
||||||
// summary should contain inOrderElementsOf List(
|
|
||||||
// "file found : root-file",
|
|
||||||
// "action chosen : root-file : DoNothing",
|
|
||||||
// "action finished : root-file : DoNothing"
|
|
||||||
// )
|
|
||||||
// summary should contain inOrderElementsOf List(
|
|
||||||
// "file found : subdir/leaf-file",
|
|
||||||
// "action chosen : subdir/leaf-file : DoNothing",
|
|
||||||
// "action finished : subdir/leaf-file : DoNothing"
|
|
||||||
// )
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// "where remote has some objects" - {
|
|
||||||
// val remoteObjects =
|
|
||||||
// RemoteObjects.create(
|
|
||||||
// MapView(MD5HashData.Root.hash -> MD5HashData.Root.remoteKey).toMap.asJava,
|
|
||||||
// MapView(MD5HashData.Root.remoteKey -> MD5HashData.Root.hash).toMap.asJava
|
|
||||||
// )
|
|
||||||
// "upload leaf, do nothing for root" - {
|
|
||||||
// "archive actions upload leaf" in {
|
|
||||||
// actions.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val actionList: Set[Action] = actions.get.map(_.action).toSet
|
|
||||||
// actionList
|
|
||||||
// .filter(_.isInstanceOf[DoNothing])
|
|
||||||
// .map(_.remoteKey) shouldEqual Set(MD5HashData.Root.remoteKey)
|
|
||||||
// actionList
|
|
||||||
// .filter(_.isInstanceOf[ToUpload])
|
|
||||||
// .map(_.remoteKey) shouldEqual Set(MD5HashData.Leaf.remoteKey)
|
|
||||||
// }
|
|
||||||
// "ui is updated" in {
|
|
||||||
// uiEvents.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val summary = uiEventsSummary
|
|
||||||
// summary should contain inOrderElementsOf List(
|
|
||||||
// "file found : root-file",
|
|
||||||
// "action chosen : root-file : DoNothing",
|
|
||||||
// "action finished : root-file : DoNothing"
|
|
||||||
// )
|
|
||||||
// summary should contain inOrderElementsOf List(
|
|
||||||
// "file found : subdir/leaf-file",
|
|
||||||
// "action chosen : subdir/leaf-file : ToUpload",
|
|
||||||
// "action finished : subdir/leaf-file : ToUpload"
|
|
||||||
// )
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// "where remote objects are swapped" ignore {
|
|
||||||
// val remoteObjects =
|
|
||||||
// RemoteObjects.create(
|
|
||||||
// MapView(
|
|
||||||
// MD5HashData.Root.hash -> MD5HashData.Leaf.remoteKey,
|
|
||||||
// MD5HashData.Leaf.hash -> MD5HashData.Root.remoteKey
|
|
||||||
// ).toMap.asJava,
|
|
||||||
// MapView(
|
|
||||||
// MD5HashData.Root.remoteKey -> MD5HashData.Leaf.hash,
|
|
||||||
// MD5HashData.Leaf.remoteKey -> MD5HashData.Root.hash
|
|
||||||
// ).toMap.asJava
|
|
||||||
// )
|
|
||||||
// "copy files" - {
|
|
||||||
// "archive swaps objects" ignore {
|
|
||||||
// // not supported
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// "where file has been renamed" - {
|
|
||||||
// // renamed from "other/root" to "root-file"
|
|
||||||
// val otherRootKey = RemoteKey.create("other/root")
|
|
||||||
// val remoteObjects =
|
|
||||||
// RemoteObjects.create(
|
|
||||||
// MapView(
|
|
||||||
// MD5HashData.Root.hash -> otherRootKey,
|
|
||||||
// MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey
|
|
||||||
// ).toMap.asJava,
|
|
||||||
// MapView(
|
|
||||||
// otherRootKey -> MD5HashData.Root.hash,
|
|
||||||
// MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash
|
|
||||||
// ).toMap.asJava
|
|
||||||
// )
|
|
||||||
// "copy object and delete original" in {
|
|
||||||
// actions.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val actionList: Set[Action] = actions.get.map(_.action).toSet
|
|
||||||
// actionList should have size 2
|
|
||||||
// actionList
|
|
||||||
// .filter(_.isInstanceOf[DoNothing])
|
|
||||||
// .map(_.remoteKey) shouldEqual Set(MD5HashData.Leaf.remoteKey)
|
|
||||||
// actionList
|
|
||||||
// .filter(_.isInstanceOf[ToCopy])
|
|
||||||
// .map(_.remoteKey) shouldEqual Set(MD5HashData.Root.remoteKey)
|
|
||||||
// }
|
|
||||||
// "ui is updated" in {
|
|
||||||
// uiEvents.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val summary = uiEventsSummary
|
|
||||||
// summary should contain inOrderElementsOf List(
|
|
||||||
// "file found : root-file",
|
|
||||||
// "action chosen : root-file : ToCopy",
|
|
||||||
// "action finished : root-file : ToCopy"
|
|
||||||
// )
|
|
||||||
// summary should contain inOrderElementsOf List(
|
|
||||||
// "file found : subdir/leaf-file",
|
|
||||||
// "action chosen : subdir/leaf-file : DoNothing",
|
|
||||||
// "action finished : subdir/leaf-file : DoNothing"
|
|
||||||
// )
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// "scanDelete" - {
|
|
||||||
// def sender(
|
|
||||||
// configuration: Configuration,
|
|
||||||
// objects: RemoteObjects
|
|
||||||
// ): UIO[MessageChannel.ESender[Clock, Throwable, UIEvent]] =
|
|
||||||
// UIO { uiChannel =>
|
|
||||||
// (for {
|
|
||||||
// _ <- LocalFileSystem.scanDelete(
|
|
||||||
// configuration,
|
|
||||||
// uiChannel,
|
|
||||||
// objects,
|
|
||||||
// archive
|
|
||||||
// )
|
|
||||||
// } yield ()) <* MessageChannel.endChannel(uiChannel)
|
|
||||||
// }
|
|
||||||
// def receiver(): UIO[MessageChannel.UReceiver[Any, UIEvent]] =
|
|
||||||
// UIO { message =>
|
|
||||||
// val uiEvent = message.body
|
|
||||||
// uiEvents.updateAndGet(l => uiEvent :: l)
|
|
||||||
// UIO(())
|
|
||||||
// }
|
|
||||||
// def program(remoteObjects: RemoteObjects) = {
|
|
||||||
// {
|
|
||||||
// val configuration = ConfigurationBuilder.buildConfig(configOptions)
|
|
||||||
// for {
|
|
||||||
// sender <- sender(configuration, remoteObjects)
|
|
||||||
// receiver <- receiver()
|
|
||||||
// _ <- MessageChannel.pointToPoint(sender)(receiver).runDrain
|
|
||||||
// } yield ()
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// "where remote has no extra objects" - {
|
|
||||||
// val remoteObjects = RemoteObjects.create(
|
|
||||||
// MapView(
|
|
||||||
// MD5HashData.Root.hash -> MD5HashData.Root.remoteKey,
|
|
||||||
// MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey
|
|
||||||
// ).toMap.asJava,
|
|
||||||
// MapView(
|
|
||||||
// MD5HashData.Root.remoteKey -> MD5HashData.Root.hash,
|
|
||||||
// MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash
|
|
||||||
// ).toMap.asJava
|
|
||||||
// )
|
|
||||||
// "do nothing for all files" - {
|
|
||||||
// "no archive actions" in {
|
|
||||||
// actions.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val actionList: Set[Action] = actions.get.map(_.action).toSet
|
|
||||||
// actionList should have size 0
|
|
||||||
// }
|
|
||||||
// "ui is updated" in {
|
|
||||||
// uiEvents.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// uiEventsSummary shouldEqual List(
|
|
||||||
// "key found: root-file",
|
|
||||||
// "key found: subdir/leaf-file"
|
|
||||||
// )
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// "where remote has extra objects" - {
|
|
||||||
// val extraHash = MD5Hash.create("extra")
|
|
||||||
// val extraObject = RemoteKey.create("extra")
|
|
||||||
// val remoteObjects = RemoteObjects.create(
|
|
||||||
// MapView(
|
|
||||||
// MD5HashData.Root.hash -> MD5HashData.Root.remoteKey,
|
|
||||||
// MD5HashData.Leaf.hash -> MD5HashData.Leaf.remoteKey,
|
|
||||||
// extraHash -> extraObject
|
|
||||||
// ).toMap.asJava,
|
|
||||||
// MapView(
|
|
||||||
// MD5HashData.Root.remoteKey -> MD5HashData.Root.hash,
|
|
||||||
// MD5HashData.Leaf.remoteKey -> MD5HashData.Leaf.hash,
|
|
||||||
// extraObject -> extraHash
|
|
||||||
// ).toMap.asJava
|
|
||||||
// )
|
|
||||||
// "remove the extra object" - {
|
|
||||||
// "archive delete action" in {
|
|
||||||
// actions.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// val actionList: Set[Action] = actions.get.map(_.action).toSet
|
|
||||||
// actionList should have size 1
|
|
||||||
// actionList
|
|
||||||
// .filter(_.isInstanceOf[ToDelete])
|
|
||||||
// .map(_.remoteKey) shouldEqual Set(extraObject)
|
|
||||||
// }
|
|
||||||
// "ui is updated" in {
|
|
||||||
// uiEvents.set(List.empty)
|
|
||||||
// runtime.unsafeRunSync(program(remoteObjects).provide(TestEnv))
|
|
||||||
// uiEventsSummary shouldEqual List(
|
|
||||||
// "key found: root-file",
|
|
||||||
// "key found: subdir/leaf-file",
|
|
||||||
// "key found: extra",
|
|
||||||
// "action chosen : extra : ToDelete",
|
|
||||||
// "action finished : extra : ToDelete"
|
|
||||||
// )
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
private def uiEventsSummary: List[String] = {
|
|
||||||
uiEvents
|
|
||||||
.get()
|
|
||||||
.reverse
|
|
||||||
.map {
|
|
||||||
case uie: FileFound =>
|
|
||||||
String.format("file found : %s", uie.localFile.remoteKey.key)
|
|
||||||
case uie: ActionChosen =>
|
|
||||||
String.format(
|
|
||||||
"action chosen : %s : %s",
|
|
||||||
uie.action.remoteKey.key,
|
|
||||||
uie.action.getClass.getSimpleName
|
|
||||||
)
|
|
||||||
case uie: ActionFinished =>
|
|
||||||
String.format(
|
|
||||||
"action finished : %s : %s",
|
|
||||||
uie.action.remoteKey.key,
|
|
||||||
uie.action.getClass.getSimpleName
|
|
||||||
)
|
|
||||||
case uie: KeyFound =>
|
|
||||||
String.format("key found: %s", uie.remoteKey.key)
|
|
||||||
case x => String.format("unknown : %s", x.getClass.getSimpleName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,8 +1,7 @@
|
||||||
package net.kemitix.thorp.storage.aws;
|
package net.kemitix.thorp.storage.aws;
|
||||||
|
|
||||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
|
||||||
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
|
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
|
||||||
import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils;
|
import lombok.With;
|
||||||
import net.kemitix.thorp.domain.HashGenerator;
|
import net.kemitix.thorp.domain.HashGenerator;
|
||||||
import net.kemitix.thorp.domain.HashType;
|
import net.kemitix.thorp.domain.HashType;
|
||||||
import net.kemitix.thorp.domain.Hashes;
|
import net.kemitix.thorp.domain.Hashes;
|
||||||
|
@ -19,14 +18,27 @@ import java.util.stream.LongStream;
|
||||||
|
|
||||||
import static net.kemitix.thorp.filesystem.MD5HashGenerator.md5FileChunk;
|
import static net.kemitix.thorp.filesystem.MD5HashGenerator.md5FileChunk;
|
||||||
|
|
||||||
|
@With
|
||||||
public class S3ETagGenerator implements HashGenerator {
|
public class S3ETagGenerator implements HashGenerator {
|
||||||
|
|
||||||
|
private final long multipartUploadThreshold;
|
||||||
|
|
||||||
|
public S3ETagGenerator() {
|
||||||
|
multipartUploadThreshold =
|
||||||
|
new TransferManagerConfiguration().getMultipartUploadThreshold();
|
||||||
|
}
|
||||||
|
|
||||||
|
public S3ETagGenerator(long multipartUploadThreshold) {
|
||||||
|
this.multipartUploadThreshold = multipartUploadThreshold;
|
||||||
|
}
|
||||||
|
|
||||||
@Deprecated // Use hashFile
|
@Deprecated // Use hashFile
|
||||||
public String eTag(Path path) throws IOException, NoSuchAlgorithmException {
|
public String eTag(Path path) throws IOException, NoSuchAlgorithmException {
|
||||||
return hashFile(path);
|
return hashFile(path);
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public String hashFile(Path path) throws IOException, NoSuchAlgorithmException {
|
public String hashFile(Path path) throws IOException, NoSuchAlgorithmException {
|
||||||
long partSize = calculatePartSize(path);
|
long partSize = calculatePartSize();
|
||||||
long parts = numParts(path.toFile().length(), partSize);
|
long parts = numParts(path.toFile().length(), partSize);
|
||||||
String eTagHex = eTagHex(path, partSize, parts);
|
String eTagHex = eTagHex(path, partSize, parts);
|
||||||
return String.format("%s-%d", eTagHex, parts);
|
return String.format("%s-%d", eTagHex, parts);
|
||||||
|
@ -51,10 +63,8 @@ public class S3ETagGenerator implements HashGenerator {
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
private long calculatePartSize(Path path) {
|
private long calculatePartSize() {
|
||||||
return TransferManagerUtils.calculateOptimalPartSize(
|
return multipartUploadThreshold;
|
||||||
new PutObjectRequest("", "", path.toFile()),
|
|
||||||
new TransferManagerConfiguration());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private long numParts(long length, long partSize) {
|
private long numParts(long length, long partSize) {
|
||||||
|
|
|
@ -48,7 +48,8 @@ public class ETagGeneratorTest
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("create eTag for whole file")
|
@DisplayName("create eTag for whole file")
|
||||||
public void createTagForWholeFile() throws IOException, NoSuchAlgorithmException {
|
public void createTagForWholeFile() throws IOException, NoSuchAlgorithmException {
|
||||||
String result = generator.hashFile(bigFile.toPath());
|
String result = generator.withMultipartUploadThreshold(5 * 1024 * 1024)
|
||||||
|
.hashFile(bigFile.toPath());
|
||||||
assertThat(result).isEqualTo(BIG_FILE_ETAG);
|
assertThat(result).isEqualTo(BIG_FILE_ETAG);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,13 +4,14 @@ import net.kemitix.thorp.config.Configuration;
|
||||||
import net.kemitix.thorp.console.Console;
|
import net.kemitix.thorp.console.Console;
|
||||||
import net.kemitix.thorp.console.ConsoleOut;
|
import net.kemitix.thorp.console.ConsoleOut;
|
||||||
import net.kemitix.thorp.domain.*;
|
import net.kemitix.thorp.domain.*;
|
||||||
|
import net.kemitix.thorp.domain.channel.Listener;
|
||||||
|
|
||||||
import static net.kemitix.thorp.domain.Terminal.eraseLineForward;
|
import static net.kemitix.thorp.domain.Terminal.eraseLineForward;
|
||||||
import static net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen;
|
import static net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen;
|
||||||
|
|
||||||
public interface UIShell {
|
public interface UIShell {
|
||||||
static Channel.Listener<UIEvent> listener(Configuration configuration) {
|
static Listener<UIEvent> listener(Configuration configuration) {
|
||||||
return new Channel.Listener<UIEvent>() {
|
return new Listener<UIEvent>() {
|
||||||
@Override
|
@Override
|
||||||
public void accept(UIEvent uiEvent) {
|
public void accept(UIEvent uiEvent) {
|
||||||
if (uiEvent instanceof UIEvent.RequestCycle)
|
if (uiEvent instanceof UIEvent.RequestCycle)
|
||||||
|
|
|
@ -2,8 +2,8 @@ package net.kemitix.thorp.uishell;
|
||||||
|
|
||||||
import lombok.AccessLevel;
|
import lombok.AccessLevel;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import net.kemitix.thorp.domain.Channel;
|
|
||||||
import net.kemitix.thorp.domain.LocalFile;
|
import net.kemitix.thorp.domain.LocalFile;
|
||||||
|
import net.kemitix.thorp.domain.channel.Sink;
|
||||||
import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent;
|
import net.kemitix.thorp.uishell.UploadProgressEvent.RequestEvent;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -29,7 +29,7 @@ public class UploadEventListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Settings settings(
|
public static Settings settings(
|
||||||
Channel.Sink<UIEvent> uiSink,
|
Sink<UIEvent> uiSink,
|
||||||
LocalFile localFile,
|
LocalFile localFile,
|
||||||
int index,
|
int index,
|
||||||
long totalBytesToFar,
|
long totalBytesToFar,
|
||||||
|
@ -39,7 +39,7 @@ public class UploadEventListener {
|
||||||
}
|
}
|
||||||
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
|
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
|
||||||
public static class Settings {
|
public static class Settings {
|
||||||
public final Channel.Sink<UIEvent> uiSink;
|
public final Sink<UIEvent> uiSink;
|
||||||
public final LocalFile localFile;
|
public final LocalFile localFile;
|
||||||
public final int index;
|
public final int index;
|
||||||
public final long totalBytesSoFar;
|
public final long totalBytesSoFar;
|
||||||
|
|
Loading…
Reference in a new issue