Restructure using EIP-ZIO channels (#183)

* [sbt] Rename storage-api as storage

* [storage] remove dependency upon console

* [storage] remove dependency upon config

* [console] remove dependency upon config

* [sbt] Add app module

Make the cli module actually CLI-only by moving the CliArgs parser into it,
and Main and Program into app.

* add app that depends on cli and thorp-lib
* move non-cli specific to app
* make cli depend on config
* make cli not depend on thorp-lib

* [sbt] make module dependencies more explicit

* make app depend on storage-aws
* make cli depend on filesystem's tests
* make thorp-lib depend on core
* make thorp-lib not depend on storage-aws
* make storage-aws not depend on core's tests
* make storage-aws depend on storage
* make storage-aws depend on filesystem and its tests
* make storage-aws depend on console
* make storage-aws depend on core
* make core depend on filesystem and its tests
* make filesystem depend on domain and its tests

* [sbt] merge thorp-lib with core as lib

* [sbt] add zio streams

* [lib] Add EIPTest

* [sbt] Allow NonUnitStatements

* [lib] EIPTest Message Channel rewritten using ZIO Stream

* [sbt] Add eip-zio 0.2.0 as dependency in lib

* Remove file counter and total upload size progress

Simplifying UnversionedMirrorArchive so we can create it before we
know what actions are needed.
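
For illustration, the before/after as it appears in the Program diffs further down:

    // before: the archive could only be built once the plan (and its totals) was known
    archive <- UnversionedMirrorArchive.default(syncPlan.syncTotals)

    // after: a stateless object, available before any actions are chosen
    archive <- UIO(UnversionedMirrorArchive)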

* Fetch Remote Data before preparing any plans

* [domain] RemoteObjects only holds a single RemoteKey per Hash

Having multiple keys for a hash is redundant. They are only used to
create copy commands, and only one source remote key is needed for
that.
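
The simplified shape, as in the RemoteObjects diff below:

    final case class RemoteObjects private (
      byHash: MapView[MD5Hash, RemoteKey], // was MapView[MD5Hash, Set[RemoteKey]]
      byKey: MapView[RemoteKey, MD5Hash]
    )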

* [lib] Add a State trait

* [lib] Add FileScanner

* Add FileSystem.length(File)

* Add Clock to the Environment

* [domain] Sources update format

* [domain] Asking for a path that isn't in any Sources is fatal

There should never be any situation where a path not within a Source
is supplied. If there is, then something is badly wrong.
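
Concretely, Sources.forPath no longer returns a failed Task for an unknown path; it
dies, as in the Sources diff below:

    def forPath(path: Path)(sources: Sources): UIO[Path] =
      ZIO
        .fromOption(sources.paths.find(s => path.startsWith(s)))
        .orDieWith(_ => new RuntimeException("Path is not within any known source"))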

* [lib] Add test on use of zio.Ref

* [uishell] Add stub module

* [sbt] Upgrade eip-zio from 0.2.0 to 0.3.0

* [uishell] Add UIEvent stub

* [uishell] Add UIShell stub

* [sbt] Add eip-zio dependencies to app module

* [app] Wrap existing execution in simple point to point channel
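
The wiring, condensed from the new Program further down: the existing execution becomes
the message sender, UIShell provides the receiver, and the two are joined point-to-point
and drained.

    for {
      uiEventSender   <- execute          // an ESender[..., Throwable, UIEvent]
      uiEventReceiver <- UIShell.receiver
      _ <- MessageChannel.pointToPoint(uiEventSender)(uiEventReceiver).runDrain
    } yield ()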

* [uishell] Add UIEvent.ShowValidConfig

* [app] Remember to end the channel to allow prog to exit

* [app] purify environment for showValidConfig

* [app] Create type alias for pure effect free channel ref

* [app] Program refactoring

* [uishell] Add UIEvent.RemoteDataFetched

* [domain] Move Counters from lib

* [uishell] Add UIEvent.ShowSummary

* [lib] Add stub for PushLocalChanges

* [lib] Clean up FileScanner Environment types

* [lib] End channel after scanning files

* [lib] PushLocalChanges uses FileScanner

Scans files and sends them to a dummy receiver.
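
A minimal sketch of what such a dummy receiver could look like (hypothetical; the real
receivers are in the LocalFileSystem diff below), assuming eip-zio receivers are plain
functions over Message wrapped in UIO:

    val dummyReceiver: UIO[MessageChannel.UReceiver[Console, FileScanner.ScannedFile]] =
      UIO { message =>
        Console.putStrLn(s"scanned: ${message.body.remoteKey}")
      }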

* [uishell] Add UIEvent.FileFound

* [lib] rename PushLocalChanges.apply as LocalFileSystem.scanCopyUpload

* [lib] FileScanner return LocalFile

* [domain] add length to LocalFile

* [domain] Add interrogation queries to RemoteObjects

* [domain] Remove RemoteObject.keyForHashes

* [domain] RemoteObjects.remoteHasHash return the key and the hash

* [lib] LocalFileSystem.scanCopyUpload create Actions

* [domain] Move Action from lib

* [uishell] Log actions

* [lib] FileScanner respects Filters

* [lib] Create remoteKey for files correctly

* [lib] LocalFileSystem refactoring

* [lib] ThorpArchive.update doesn't need Console

* [uishell] Don't log chosen Action

* [uishell] Add UIEvent.ActionFinished

* [lib] LocalFileSystem refactoring

* [lib] Switch to using LocalFileSystem to do Copy/Upload

Todo or Broken:

- [ ] Delete actions don't happen
- [ ] Counters in summary are all zeros

* [lib] LocalFileStream display summary counters correctly

* [app] Restore ability to delete remote files

* [lib] LocalFileSystem deletes remote when local does NOT exist

* [filesystem] move hasLocalFile to FileSystem

* [filesystem] fix detection of local files from a RemoteKey

The configured Prefix wasn't being taken into account, meaning that
the expected local file for a RemoteKey was wrong.
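
The corrected check, from the RemoteKey diff below, returns None for keys outside the
configured prefix:

    def asFile(source: Path, prefix: RemoteKey)(remoteKey: RemoteKey): Option[File] =
      if (remoteKey.key.length === 0 || !remoteKey.key.startsWith(prefix.key)) None
      else Some(source.resolve(RemoteKey.relativeTo(prefix)(remoteKey)).toFile)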

* [filesystem] fix broken FileSystem test

* [domain] fix RemoteKey test

* [sbt] Upgrade eip-zio to 0.3.1 for zio-stream 1.0.0-RC12-1 compatibility

* [app] Program refactoring

* [lib] Remove unused class

* [lib] Remove test

* [uishell] Refactor large method

Paul Campbell 2019-09-07 07:52:13 +01:00 committed by GitHub
parent 3d4dc956d0
commit c5d7d4933c
85 changed files with 1357 additions and 554 deletions


@ -1,10 +1,12 @@
package net.kemitix.thorp.cli
package net.kemitix.thorp
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.lib.FileScanner
import net.kemitix.thorp.storage.aws.S3Storage
import net.kemitix.thorp.storage.aws.hasher.S3Hasher
import zio.clock.Clock
import zio.{App, ZIO}
object Main extends App {
@ -12,9 +14,11 @@ object Main extends App {
object LiveThorpApp
extends S3Storage.Live
with Console.Live
with Clock.Live
with Config.Live
with FileSystem.Live
with S3Hasher.Live
with FileScanner.Live
override def run(args: List[String]): ZIO[Environment, Nothing, Int] =
Program


@ -0,0 +1,111 @@
package net.kemitix.thorp
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.eip.zio.{Message, MessageChannel}
import net.kemitix.thorp.cli.CliArgs
import net.kemitix.thorp.config._
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.{Counters, StorageQueueEvent}
import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent,
DeleteQueueEvent,
ErrorQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.lib.CoreTypes.CoreProgram
import net.kemitix.thorp.lib._
import net.kemitix.thorp.storage.Storage
import net.kemitix.throp.uishell.{UIEvent, UIShell}
import zio.clock.Clock
import zio.{RIO, UIO, ZIO}
trait Program {
lazy val version = s"Thorp v${thorp.BuildInfo.version}"
def run(args: List[String]): CoreProgram[Unit] = {
for {
cli <- CliArgs.parse(args)
config <- ConfigurationBuilder.buildConfig(cli)
_ <- Config.set(config)
_ <- ZIO.when(showVersion(cli))(Console.putStrLn(version))
_ <- ZIO.when(!showVersion(cli))(executeWithUI.catchAll(handleErrors))
} yield ()
}
private def showVersion: ConfigOptions => Boolean =
cli => ConfigQuery.showVersion(cli)
private def executeWithUI =
for {
uiEventSender <- execute
uiEventReceiver <- UIShell.receiver
_ <- MessageChannel.pointToPoint(uiEventSender)(uiEventReceiver).runDrain
} yield ()
type UIChannel = UChannel[Any, UIEvent]
private def execute
: ZIO[Any,
Nothing,
MessageChannel.ESender[
Storage with Config with FileSystem with Hasher with Clock with FileScanner,
Throwable,
UIEvent]] = UIO { uiChannel =>
(for {
_ <- showValidConfig(uiChannel)
remoteData <- fetchRemoteData(uiChannel)
archive <- UIO(UnversionedMirrorArchive)
copyUploadEvents <- LocalFileSystem.scanCopyUpload(uiChannel,
remoteData,
archive)
deleteEvents <- LocalFileSystem.scanDelete(uiChannel, remoteData, archive)
_ <- showSummary(uiChannel)(copyUploadEvents ++ deleteEvents)
} yield ()) <* MessageChannel.endChannel(uiChannel)
}
private def showValidConfig(uiChannel: UIChannel) =
Message.create(UIEvent.ShowValidConfig) >>= MessageChannel.send(uiChannel)
private def fetchRemoteData(uiChannel: UIChannel) =
for {
bucket <- Config.bucket
prefix <- Config.prefix
objects <- Storage.list(bucket, prefix)
_ <- Message.create(UIEvent.RemoteDataFetched(objects.byKey.size)) >>= MessageChannel
.send(uiChannel)
} yield objects
private def handleErrors(throwable: Throwable) =
Console.putStrLn("There were errors:") *> logValidationErrors(throwable)
private def logValidationErrors(throwable: Throwable) =
throwable match {
case ConfigValidationException(errors) =>
ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error"))
}
private def showSummary(uiChannel: UIChannel)(
events: Seq[StorageQueueEvent]): RIO[Clock, Unit] = {
val counters = events.foldLeft(Counters.empty)(countActivities)
Message.create(UIEvent.ShowSummary(counters)) >>=
MessageChannel.send(uiChannel)
}
private def countActivities: (Counters, StorageQueueEvent) => Counters =
(counters: Counters, s3Action: StorageQueueEvent) => {
val increment: Int => Int = _ + 1
s3Action match {
case _: UploadQueueEvent =>
Counters.uploaded.modify(increment)(counters)
case _: CopyQueueEvent => Counters.copied.modify(increment)(counters)
case _: DeleteQueueEvent => Counters.deleted.modify(increment)(counters)
case _: ErrorQueueEvent => Counters.errors.modify(increment)(counters)
case _ => counters
}
}
}
object Program extends Program


@ -24,7 +24,6 @@ val commonSettings = Seq(
"-deprecation",
"-unchecked",
"-language:postfixOps",
"-language:higherKinds",
"-language:higherKinds"),
wartremoverErrors ++= Warts.unsafe.filterNot(wart => List(
Wart.Any,
@ -61,27 +60,26 @@ val awsSdkDependencies = Seq(
)
val zioDependencies = Seq(
libraryDependencies ++= Seq (
"dev.zio" %% "zio" % "1.0.0-RC12-1"
"dev.zio" %% "zio" % "1.0.0-RC12-1",
"dev.zio" %% "zio-streams" % "1.0.0-RC12-1"
)
)
// cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> config -> domain
// storage-api -> config -> filesystem
val eipDependencies = Seq(
libraryDependencies ++= Seq(
"net.kemitix" %% "eip-zio" % "0.3.1"
)
)
lazy val thorp = (project in file("."))
.settings(commonSettings)
.aggregate(cli, `thorp-lib`, `storage-aws`, core, `storage-api`, domain)
.aggregate(app, cli, `storage-aws`, lib, `storage`, domain)
lazy val cli = (project in file("cli"))
lazy val app = (project in file("app"))
.settings(commonSettings)
.settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main"))
.settings(mainClass in assembly := Some("net.kemitix.thorp.Main"))
.settings(applicationSettings)
.settings(testDependencies)
.enablePlugins(BuildInfoPlugin)
.settings(
buildInfoKeys := Seq[BuildInfoKey](name, version),
buildInfoPackage := "thorp"
)
.settings(eipDependencies)
.settings(Seq(
assemblyOption in assembly := (
assemblyOption in assembly).value
@ -89,39 +87,62 @@ lazy val cli = (project in file("cli"))
Some(defaultShellScript)),
assemblyJarName in assembly := "thorp"
))
.dependsOn(`thorp-lib`)
lazy val `thorp-lib` = (project in file("thorp-lib"))
.settings(commonSettings)
.settings(assemblyJarName in assembly := "thorp-lib.jar")
.dependsOn(cli)
.dependsOn(lib)
.dependsOn(`storage-aws`)
lazy val cli = (project in file("cli"))
.settings(commonSettings)
.settings(testDependencies)
.dependsOn(config)
.dependsOn(filesystem % "test->test")
lazy val `storage-aws` = (project in file("storage-aws"))
.settings(commonSettings)
.settings(assemblyJarName in assembly := "storage-aws.jar")
.settings(awsSdkDependencies)
.settings(testDependencies)
.dependsOn(core % "compile->compile;test->test")
.dependsOn(storage)
.dependsOn(filesystem % "compile->compile;test->test")
.dependsOn(console)
.dependsOn(lib)
lazy val core = (project in file("core"))
lazy val lib = (project in file("lib"))
.settings(commonSettings)
.settings(assemblyJarName in assembly := "core.jar")
.settings(assemblyJarName in assembly := "lib.jar")
.settings(testDependencies)
.dependsOn(`storage-api`)
.dependsOn(domain % "compile->compile;test->test")
lazy val `storage-api` = (project in file("storage-api"))
.settings(commonSettings)
.settings(zioDependencies)
.settings(assemblyJarName in assembly := "storage-api.jar")
.enablePlugins(BuildInfoPlugin)
.settings(
buildInfoKeys := Seq[BuildInfoKey](name, version),
buildInfoPackage := "thorp"
)
.dependsOn(storage)
.dependsOn(console)
.dependsOn(config)
.dependsOn(domain % "compile->compile;test->test")
.dependsOn(filesystem % "compile->compile;test->test")
lazy val storage = (project in file("storage"))
.settings(commonSettings)
.settings(zioDependencies)
.settings(assemblyJarName in assembly := "storage.jar")
.dependsOn(uishell)
.dependsOn(domain)
lazy val uishell = (project in file("uishell"))
.settings(commonSettings)
.settings(zioDependencies)
.settings(eipDependencies)
.settings(assemblyJarName in assembly := "uishell.jar")
.dependsOn(config)
.dependsOn(console)
.dependsOn(filesystem)
lazy val console = (project in file("console"))
.settings(commonSettings)
.settings(zioDependencies)
.settings(assemblyJarName in assembly := "console.jar")
.dependsOn(config)
.dependsOn(domain)
lazy val config = (project in file("config"))
.settings(commonSettings)
@ -137,6 +158,7 @@ lazy val filesystem = (project in file("filesystem"))
.settings(zioDependencies)
.settings(testDependencies)
.settings(assemblyJarName in assembly := "filesystem.jar")
.dependsOn(domain % "compile->compile;test->test")
lazy val domain = (project in file("domain"))
.settings(commonSettings)


@ -1,7 +1,8 @@
package net.kemitix.thorp.config
package net.kemitix.thorp.cli
import java.nio.file.Paths
import net.kemitix.thorp.config.{ConfigOption, ConfigOptions}
import scopt.OParser
import zio.Task


@ -1,46 +0,0 @@
package net.kemitix.thorp.cli
import net.kemitix.thorp.config._
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.CoreTypes.CoreProgram
import net.kemitix.thorp.core._
import zio.ZIO
trait Program {
lazy val version = s"Thorp v${thorp.BuildInfo.version}"
def run(args: List[String]): CoreProgram[Unit] = {
for {
cli <- CliArgs.parse(args)
config <- ConfigurationBuilder.buildConfig(cli)
_ <- Config.set(config)
_ <- ZIO.when(showVersion(cli))(Console.putStrLn(version))
_ <- ZIO.when(!showVersion(cli))(execute.catchAll(handleErrors))
} yield ()
}
private def showVersion: ConfigOptions => Boolean =
cli => ConfigQuery.showVersion(cli)
private def execute =
for {
_ <- SyncLogging.logRunStart
syncPlan <- PlanBuilder.createPlan
archive <- UnversionedMirrorArchive.default(syncPlan.syncTotals)
events <- PlanExecutor.executePlan(archive, syncPlan)
_ <- SyncLogging.logRunFinished(events)
} yield ()
private def handleErrors(throwable: Throwable) =
Console.putStrLn("There were errors:") *> logValidationErrors(throwable)
private def logValidationErrors(throwable: Throwable) =
throwable match {
case ConfigValidationException(errors) =>
ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error"))
}
}
object Program extends Program


@ -1,8 +1,10 @@
package net.kemitix.thorp.config
package net.kemitix.thorp.cli
import java.nio.file.Paths
import net.kemitix.thorp.config.ConfigOption.Debug
import net.kemitix.thorp.config.{ConfigOptions, ConfigQuery}
import net.kemitix.thorp.filesystem.Resource
import org.scalatest.FunSpec
import zio.DefaultRuntime


@ -3,7 +3,6 @@ package net.kemitix.thorp.console
import java.io.PrintStream
import java.util.concurrent.atomic.AtomicReference
import net.kemitix.thorp.config.Config
import zio.{UIO, ZIO}
import scala.{Console => SConsole}
@ -61,8 +60,8 @@ object Console {
final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
ZIO.accessM(_.console putMessageLn line)
final def putMessageLnB(
line: ConsoleOut.WithBatchMode): ZIO[Console with Config, Nothing, Unit] =
ZIO.accessM(line() >>= _.console.putStrLn)
final def putMessageLnB(line: ConsoleOut.WithBatchMode,
batchMode: Boolean): ZIO[Console, Nothing, Unit] =
ZIO.accessM(line(batchMode) >>= _.console.putStrLn)
}


@ -1,10 +1,9 @@
package net.kemitix.thorp.console
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.StorageQueueEvent.Action
import net.kemitix.thorp.domain.Terminal._
import net.kemitix.thorp.domain.{Bucket, RemoteKey, Sources}
import zio.{UIO, ZIO}
import zio.UIO
import scala.io.AnsiColor._
@ -17,8 +16,8 @@ object ConsoleOut {
sealed trait WithBatchMode {
def en: String
def enBatch: String
def apply(): ZIO[Config, Nothing, String] =
Config.batchMode >>= selectLine
def apply(batchMode: Boolean): UIO[String] =
selectLine(batchMode)
private def selectLine(batchMode: Boolean) =
if (batchMode) UIO(enBatch) else UIO(en)
}


@ -1,15 +0,0 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.api.Storage
import zio.ZIO
object CoreTypes {
type CoreEnv = Storage with Console with Config with FileSystem with Hasher
type CoreProgram[A] = ZIO[CoreEnv, Throwable, A]
}


@ -1,40 +0,0 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.domain._
object S3MetaDataEnricher {
def getMetadata(
localFile: LocalFile,
remoteObjects: RemoteObjects
): MatchedMetadata = {
val (keyMatches, hashMatches) = getS3Status(localFile, remoteObjects)
MatchedMetadata(
localFile,
matchByKey = keyMatches.map { hash =>
RemoteMetaData(localFile.remoteKey, hash)
},
matchByHash = hashMatches.map {
case (key, hash) => RemoteMetaData(key, hash)
}
)
}
def getS3Status(
localFile: LocalFile,
remoteObjects: RemoteObjects
): (Option[MD5Hash], Set[(RemoteKey, MD5Hash)]) = {
val matchingByKey = remoteObjects.byKey.get(localFile.remoteKey)
val matchingByHash = localFile.hashes
.map {
case (_, md5Hash) =>
remoteObjects.byHash
.getOrElse(md5Hash, Set())
.map(key => (key, md5Hash))
}
.flatten
.toSet
(matchingByKey, matchingByHash)
}
}


@ -1,58 +0,0 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent,
DeleteQueueEvent,
ErrorQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen
import zio.ZIO
trait SyncLogging {
def logRunStart: ZIO[Console with Config, Nothing, Unit] =
for {
bucket <- Config.bucket
prefix <- Config.prefix
sources <- Config.sources
_ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
} yield ()
def logFileScan: ZIO[Config with Console, Nothing, Unit] =
for {
sources <- Config.sources
_ <- Console.putStrLn(
s"Scanning local files: ${sources.paths.mkString(", ")}...")
} yield ()
def logRunFinished(
actions: Seq[StorageQueueEvent]
): ZIO[Console, Nothing, Unit] = {
val counters = actions.foldLeft(Counters.empty)(countActivities)
Console.putStrLn(eraseToEndOfScreen) *>
Console.putStrLn(s"Uploaded ${counters.uploaded} files") *>
Console.putStrLn(s"Copied ${counters.copied} files") *>
Console.putStrLn(s"Deleted ${counters.deleted} files") *>
Console.putStrLn(s"Errors ${counters.errors}")
}
private def countActivities: (Counters, StorageQueueEvent) => Counters =
(counters: Counters, s3Action: StorageQueueEvent) => {
import Counters._
val increment: Int => Int = _ + 1
s3Action match {
case _: UploadQueueEvent => uploaded.modify(increment)(counters)
case _: CopyQueueEvent => copied.modify(increment)(counters)
case _: DeleteQueueEvent => deleted.modify(increment)(counters)
case _: ErrorQueueEvent => errors.modify(increment)(counters)
case _ => counters
}
}
}
object SyncLogging extends SyncLogging


@ -1,38 +0,0 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.ConsoleOut.{
CopyComplete,
DeleteComplete,
ErrorQueueEventOccurred,
UploadComplete
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageQueueEvent
import net.kemitix.thorp.domain.StorageQueueEvent._
import net.kemitix.thorp.storage.api.Storage
import zio.{RIO, ZIO}
trait ThorpArchive {
def update(
sequencedAction: SequencedAction,
totalBytesSoFar: Long
): RIO[Storage with Console with Config, StorageQueueEvent]
def logEvent(
event: StorageQueueEvent): RIO[Console with Config, StorageQueueEvent] =
event match {
case UploadQueueEvent(remoteKey, _) =>
ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey))
case CopyQueueEvent(sourceKey, targetKey) =>
ZIO(event) <* Console.putMessageLnB(CopyComplete(sourceKey, targetKey))
case DeleteQueueEvent(remoteKey) =>
ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey))
case ErrorQueueEvent(action, _, e) =>
ZIO(event) <* Console.putMessageLnB(ErrorQueueEventOccurred(action, e))
case DoNothingQueueEvent(_) => ZIO(event)
case ShutdownQueueEvent() => ZIO(event)
}
}


@ -1,52 +0,0 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage
import zio.{Task, RIO}
final case class UnversionedMirrorArchive(syncTotals: SyncTotals)
extends ThorpArchive {
override def update(
sequencedAction: SequencedAction,
totalBytesSoFar: Long
): RIO[Storage with Console with Config, StorageQueueEvent] =
sequencedAction match {
case SequencedAction(ToUpload(bucket, localFile, _), index) =>
doUpload(index, totalBytesSoFar, bucket, localFile) >>= logEvent
case SequencedAction(ToCopy(bucket, sourceKey, hash, targetKey, _), _) =>
Storage.copy(bucket, sourceKey, hash, targetKey) >>= logEvent
case SequencedAction(ToDelete(bucket, remoteKey, _), _) =>
Storage.delete(bucket, remoteKey) >>= logEvent
case SequencedAction(DoNothing(_, remoteKey, _), _) =>
Task(DoNothingQueueEvent(remoteKey))
}
private def doUpload(
index: Int,
totalBytesSoFar: Long,
bucket: Bucket,
localFile: LocalFile
) =
Storage.upload(
localFile,
bucket,
UploadEventListener.Settings(
localFile,
index,
syncTotals,
totalBytesSoFar
)
)
}
object UnversionedMirrorArchive {
def default(syncTotals: SyncTotals): Task[ThorpArchive] =
Task {
new UnversionedMirrorArchive(syncTotals)
}
}


@ -1,6 +1,4 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey}
package net.kemitix.thorp.domain
sealed trait Action {
def bucket: Bucket


@ -1,6 +1,4 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.domain.SimpleLens
package net.kemitix.thorp.domain
final case class Counters(
uploaded: Int,


@ -10,7 +10,8 @@ final case class LocalFile private (
file: File,
source: File,
hashes: Map[HashType, MD5Hash],
remoteKey: RemoteKey
remoteKey: RemoteKey,
length: Long
)
object LocalFile {


@ -3,6 +3,6 @@ package net.kemitix.thorp.domain
// For the LocalFile, the set of matching S3 objects with the same MD5Hash, and any S3 object with the same remote key
final case class MatchedMetadata(
localFile: LocalFile,
matchByHash: Set[RemoteMetaData], //TODO Can this be an Option?
matchByHash: Option[RemoteMetaData],
matchByKey: Option[RemoteMetaData]
)


@ -2,23 +2,34 @@ package net.kemitix.thorp.domain
import java.io.File
import java.nio.file.{Path, Paths}
import Implicits._
import zio.UIO
final case class RemoteKey(key: String)
object RemoteKey {
val key: SimpleLens[RemoteKey, String] =
SimpleLens[RemoteKey, String](_.key, b => a => b.copy(key = a))
def asFile(source: Path, prefix: RemoteKey)(
remoteKey: RemoteKey): Option[File] =
if (remoteKey.key.length === 0) None
if (remoteKey.key.length === 0 || !remoteKey.key.startsWith(prefix.key))
None
else Some(source.resolve(RemoteKey.relativeTo(prefix)(remoteKey)).toFile)
def relativeTo(prefix: RemoteKey)(remoteKey: RemoteKey): Path = {
prefix match {
case RemoteKey("") => Paths.get(remoteKey.key)
case _ => Paths.get(prefix.key).relativize(Paths.get(remoteKey.key))
}
def relativeTo(prefix: RemoteKey)(remoteKey: RemoteKey): Path = prefix match {
case RemoteKey("") => Paths.get(remoteKey.key)
case _ => Paths.get(prefix.key).relativize(Paths.get(remoteKey.key))
}
def resolve(path: String)(remoteKey: RemoteKey): RemoteKey =
RemoteKey(List(remoteKey.key, path).filterNot(_.isEmpty).mkString("/"))
def fromSourcePath(source: Path, path: Path): RemoteKey =
RemoteKey(source.relativize(path).toString)
def from(source: Path, prefix: RemoteKey, file: File): UIO[RemoteKey] =
UIO(RemoteKey.resolve(source.relativize(file.toPath).toString)(prefix))
}


@ -1,18 +1,45 @@
package net.kemitix.thorp.domain
import zio.UIO
import scala.collection.MapView
/**
* A list of objects and their MD5 hash values.
*/
final case class RemoteObjects private (
byHash: MapView[MD5Hash, Set[RemoteKey]],
byHash: MapView[MD5Hash, RemoteKey],
byKey: MapView[RemoteKey, MD5Hash]
)
object RemoteObjects {
val empty: RemoteObjects = RemoteObjects(MapView.empty, MapView.empty)
def create(byHash: MapView[MD5Hash, Set[RemoteKey]],
def create(byHash: MapView[MD5Hash, RemoteKey],
byKey: MapView[RemoteKey, MD5Hash]): RemoteObjects =
RemoteObjects(byHash, byKey)
def remoteKeyExists(
remoteObjects: RemoteObjects,
remoteKey: RemoteKey
): UIO[Boolean] = UIO(remoteObjects.byKey.contains(remoteKey))
def remoteMatchesLocalFile(
remoteObjects: RemoteObjects,
localFile: LocalFile
): UIO[Boolean] =
UIO(
remoteObjects.byKey
.get(localFile.remoteKey)
.exists(LocalFile.matchesHash(localFile)))
def remoteHasHash(
remoteObjects: RemoteObjects,
hashes: Map[HashType, MD5Hash]
): UIO[Option[(RemoteKey, MD5Hash)]] =
UIO(remoteObjects.byHash.collectFirst {
case (hash, key) if (hashes.values.exists(h => h == hash)) => (key, hash)
})
}


@ -2,7 +2,7 @@ package net.kemitix.thorp.domain
import java.nio.file.Path
import zio.{Task, ZIO}
import zio.{UIO, ZIO}
/**
* The paths to synchronise with target.
@ -14,13 +14,16 @@ import zio.{Task, ZIO}
*
* A path should only occur once in paths.
*/
final case class Sources(
paths: List[Path]
) {
final case class Sources(paths: List[Path]) {
def +(path: Path): Sources = this ++ List(path)
def ++(otherPaths: List[Path]): Sources =
Sources(otherPaths.foldLeft(paths)((acc, path) =>
if (acc contains path) acc else acc ++ List(path)))
Sources(
otherPaths.foldLeft(paths)(
(acc, path) =>
if (acc contains path) acc
else acc ++ List(path)
)
)
}
object Sources {
@ -29,8 +32,10 @@ object Sources {
/**
* Returns the source path for the given path.
*/
def forPath(path: Path)(sources: Sources): Task[Path] =
def forPath(path: Path)(sources: Sources): UIO[Path] =
ZIO
.fromOption(sources.paths.find(s => path.startsWith(s)))
.mapError(_ => new Exception("Path is not within any known source"))
.orDieWith { _ =>
new RuntimeException("Path is not within any known source")
}
}


@ -10,8 +10,8 @@ object UploadEventListener {
final case class Settings(
localFile: LocalFile,
index: Int,
syncTotals: SyncTotals,
totalBytesSoFar: Long
totalBytesSoFar: Long,
batchMode: Boolean
)
def listener(settings: Settings): UploadEvent => Unit = {
@ -24,7 +24,6 @@ object UploadEventListener {
RequestCycle(settings.localFile,
bytesTransferred.addAndGet(e.transferred),
settings.index,
settings.syncTotals,
settings.totalBytesSoFar))
case _ => ()
}


@ -10,14 +10,13 @@ object UploadEventLogger {
localFile: LocalFile,
bytesTransferred: Long,
index: Int,
syncTotals: SyncTotals,
totalBytesSoFar: Long
)
def apply(requestCycle: RequestCycle): Unit = {
val remoteKey = requestCycle.localFile.remoteKey.key
val fileLength = requestCycle.localFile.file.length
val statusHeight = 7
val statusHeight = 3
if (requestCycle.bytesTransferred < fileLength)
println(
s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" +
@ -25,15 +24,6 @@ object UploadEventLogger {
SizeTranslation.sizeInEnglish,
requestCycle.bytesTransferred,
fileLength) +
statusWithBar("Files",
l => l.toString,
requestCycle.index,
requestCycle.syncTotals.count) +
statusWithBar(
" Size",
SizeTranslation.sizeInEnglish,
requestCycle.bytesTransferred + requestCycle.totalBytesSoFar,
requestCycle.syncTotals.totalSizeBytes) +
s"${Terminal.cursorPrevLine(statusHeight)}")
}


@ -4,6 +4,7 @@ import java.io.File
import java.nio.file.Paths
import org.scalatest.FreeSpec
import zio.DefaultRuntime
class RemoteKeyTest extends FreeSpec {
@ -67,6 +68,63 @@ class RemoteKeyTest extends FreeSpec {
assertResult(expected)(result)
}
}
"fromSourcePath" - {
"when path in source" in {
val source = Paths.get("/source")
val path = source.resolve("/source/child")
val expected = RemoteKey("child")
val result = RemoteKey.fromSourcePath(source, path)
assertResult(expected)(result)
}
}
"from source, prefix, file" - {
"when file in source" in {
val source = Paths.get("/source")
val prefix = RemoteKey("prefix")
val file = new File("/source/dir/filename")
val expected = RemoteKey("prefix/dir/filename")
val program = RemoteKey.from(source, prefix, file)
val result = new DefaultRuntime {}.unsafeRunSync(program).toEither
assertResult(Right(expected))(result)
}
}
}
"asFile" - {
"remoteKey is empty" in {
val source = Paths.get("/source")
val prefix = RemoteKey("prefix")
val remoteKey = RemoteKey("")
val expected = None
val result = RemoteKey.asFile(source, prefix)(remoteKey)
assertResult(expected)(result)
}
"remoteKey is not empty" - {
"remoteKey is within prefix" in {
val source = Paths.get("/source")
val prefix = RemoteKey("prefix")
val remoteKey = RemoteKey("prefix/key")
val expected = Some(Paths.get("/source/key").toFile)
val result = RemoteKey.asFile(source, prefix)(remoteKey)
assertResult(expected)(result)
}
"remoteKey is outwith prefix" in {
val source = Paths.get("/source")
val prefix = RemoteKey("prefix")
val remoteKey = RemoteKey("elsewhere/key")
val expected = None
val result = RemoteKey.asFile(source, prefix)(remoteKey)
assertResult(expected)(result)
}
}
}
}


@ -4,7 +4,8 @@ import java.io.{File, FileInputStream}
import java.nio.file.{Files, Path}
import java.util.stream
import zio.{Task, RIO, UIO, ZIO, ZManaged}
import net.kemitix.thorp.domain.{RemoteKey, Sources}
import zio._
import scala.jdk.CollectionConverters._
@ -13,18 +14,23 @@ trait FileSystem {
}
object FileSystem {
trait Service {
def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean]
def fileExists(file: File): ZIO[FileSystem, Nothing, Boolean]
def openManagedFileInputStream(file: File, offset: Long)
: RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]]
def fileLines(file: File): RIO[FileSystem, Seq[String]]
def isDirectory(file: File): RIO[FileSystem, Boolean]
def listFiles(path: Path): RIO[FileSystem, Iterable[File]]
def length(file: File): ZIO[FileSystem, Nothing, Long]
def hasLocalFile(sources: Sources,
prefix: RemoteKey,
remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean]
}
trait Live extends FileSystem {
override val filesystem: Service = new Service {
override def fileExists(
file: File
): RIO[FileSystem, Boolean] = ZIO(file.exists)
): ZIO[FileSystem, Nothing, Boolean] = UIO(file.exists)
override def openManagedFileInputStream(file: File, offset: Long)
: RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]] = {
@ -48,18 +54,44 @@ object FileSystem {
ZIO.effectTotal(lines.iterator.asScala.toList)
acquire.bracketAuto(use)
}
override def isDirectory(file: File): RIO[FileSystem, Boolean] =
Task(file.isDirectory)
override def listFiles(path: Path): RIO[FileSystem, Iterable[File]] =
Task(path.toFile.listFiles())
override def length(file: File): ZIO[FileSystem, Nothing, Long] =
UIO(file.length)
override def hasLocalFile(
sources: Sources,
prefix: RemoteKey,
remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] = {
ZIO.foldLeft(sources.paths)(false) { (accExists, source) =>
RemoteKey
.asFile(source, prefix)(remoteKey)
.map(FileSystem.exists)
.getOrElse(UIO(false))
.map(_ || accExists)
}
}
}
}
object Live extends Live
trait Test extends FileSystem {
val fileExistsResultMap: Task[Map[Path, File]]
val fileExistsResultMap: UIO[Map[Path, File]]
val fileLinesResult: Task[List[String]]
val isDirResult: Task[Boolean]
val listFilesResult: RIO[FileSystem, Iterable[File]]
val lengthResult: UIO[Long]
val managedFileInputStream: Task[ZManaged[Any, Throwable, FileInputStream]]
val hasLocalFileResult: UIO[Boolean]
override val filesystem: Service = new Service {
override def fileExists(file: File): RIO[FileSystem, Boolean] =
override def fileExists(file: File): ZIO[FileSystem, Nothing, Boolean] =
fileExistsResultMap.map(m => m.keys.exists(_ equals file.toPath))
override def openManagedFileInputStream(file: File, offset: Long)
@ -68,10 +100,25 @@ object FileSystem {
override def fileLines(file: File): RIO[FileSystem, List[String]] =
fileLinesResult
override def isDirectory(file: File): RIO[FileSystem, Boolean] =
isDirResult
override def listFiles(path: Path): RIO[FileSystem, Iterable[File]] =
listFilesResult
override def length(file: File): UIO[Long] =
lengthResult
override def hasLocalFile(
sources: Sources,
prefix: RemoteKey,
remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] =
hasLocalFileResult
}
}
final def exists(file: File): RIO[FileSystem, Boolean] =
final def exists(file: File): ZIO[FileSystem, Nothing, Boolean] =
ZIO.accessM(_.filesystem fileExists file)
final def openAtOffset(file: File, offset: Long)
@ -84,4 +131,19 @@ object FileSystem {
final def lines(file: File): RIO[FileSystem, Seq[String]] =
ZIO.accessM(_.filesystem fileLines (file))
final def isDirectory(file: File): RIO[FileSystem, Boolean] =
ZIO.accessM(_.filesystem.isDirectory(file))
final def listFiles(path: Path): RIO[FileSystem, Iterable[File]] =
ZIO.accessM(_.filesystem.listFiles(path))
final def length(file: File): ZIO[FileSystem, Nothing, Long] =
ZIO.accessM(_.filesystem.length(file))
final def hasLocalFile(
sources: Sources,
prefix: RemoteKey,
remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] =
ZIO.accessM(_.filesystem.hasLocalFile(sources, prefix, remoteKey))
}


@ -1,11 +1,10 @@
package net.kemitix.thorp.core.hasher
package net.kemitix.thorp.filesystem
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicReference
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.filesystem.FileSystem
import zio.{RIO, ZIO}
/**


@ -1,12 +1,11 @@
package net.kemitix.thorp.core.hasher
package net.kemitix.thorp.filesystem
import java.io.{File, FileInputStream}
import java.nio.file.Path
import java.security.MessageDigest
import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.filesystem.FileSystem
import zio.{Task, RIO}
import zio.{RIO, Task}
import scala.collection.immutable.NumericRange


@ -0,0 +1,42 @@
package net.kemitix.thorp.filesystem
import net.kemitix.thorp.domain.{RemoteKey, Sources, TemporaryFolder}
import org.scalatest.FreeSpec
import zio.DefaultRuntime
class FileSystemTest extends FreeSpec with TemporaryFolder {
"Live" - {
"hasLocalFile" - {
"file exists" in {
withDirectory(dir => {
val filename = "filename"
createFile(dir, filename, contents = "")
val remoteKey = RemoteKey(filename)
val sources = Sources(List(dir))
val prefix = RemoteKey("")
val program = FileSystem.hasLocalFile(sources, prefix, remoteKey)
val result = new DefaultRuntime {}
.unsafeRunSync(program.provide(FileSystem.Live))
.toEither
val expected = true
assertResult(Right(expected))(result)
})
}
"file does not exist" in {
withDirectory(dir => {
val filename = "filename"
val remoteKey = RemoteKey(filename)
val sources = Sources(List(dir))
val prefix = RemoteKey("")
val program = FileSystem.hasLocalFile(sources, prefix, remoteKey)
val result = new DefaultRuntime {}
.unsafeRunSync(program.provide(FileSystem.Live))
.toEither
val expected = false
assertResult(Right(expected))(result)
})
}
}
}
}


@ -1,11 +1,9 @@
package net.kemitix.thorp.core.hasher
package net.kemitix.thorp.filesystem
import java.nio.file.Path
import net.kemitix.thorp.config.Resource
import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.domain.MD5HashData.{BigFile, Root}
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec
import zio.DefaultRuntime


@ -1,4 +1,4 @@
package net.kemitix.thorp.config
package net.kemitix.thorp.filesystem
import java.io.File


@ -1,7 +1,7 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain.Implicits._
import net.kemitix.thorp.domain._
import zio.RIO
@ -79,7 +79,7 @@ object ActionGenerator {
private def copyFile(
bucket: Bucket,
localFile: LocalFile,
remoteMetaData: Set[RemoteMetaData]
remoteMetaData: Option[RemoteMetaData]
) =
LazyList
.from(remoteMetaData)


@ -0,0 +1,21 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.storage.Storage
import zio.ZIO
import zio.clock.Clock
object CoreTypes {
type CoreEnv = Storage
with Console
with Config
with Clock
with FileSystem
with Hasher
with FileScanner
type CoreProgram[A] = ZIO[CoreEnv, Throwable, A]
}


@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain.StorageQueueEvent


@ -0,0 +1,88 @@
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
import net.kemitix.eip.zio.MessageChannel.{EChannel, ESender}
import net.kemitix.eip.zio.{Message, MessageChannel}
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.{
Filter,
HashType,
LocalFile,
MD5Hash,
RemoteKey,
Sources
}
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import zio.clock.Clock
import zio.{RIO, UIO, ZIO}
trait FileScanner {
val fileScanner: FileScanner.Service
}
object FileScanner {
type RemoteHashes = Map[MD5Hash, RemoteKey]
type Hashes = Map[HashType, MD5Hash]
type ScannedFile = LocalFile
type FileSender = ESender[Clock with Hasher with FileSystem with Config,
Throwable,
ScannedFile]
type ScannerChannel = EChannel[Any, Throwable, ScannedFile]
trait Service {
def scanSources: RIO[FileScanner, FileSender]
}
trait Live extends FileScanner {
val fileScanner: Service = new Service {
override def scanSources: RIO[FileScanner, FileSender] =
RIO { channel =>
(for {
sources <- Config.sources
_ <- ZIO.foreach(sources.paths)(scanPath(channel)(_))
} yield ()) <* MessageChannel.endChannel(channel)
}
private def scanPath(channel: ScannerChannel)(path: Path)
: ZIO[Clock with Config with Hasher with FileSystem, Throwable, Unit] =
for {
filters <- Config.filters
files <- FileSystem.listFiles(path)
_ <- ZIO.foreach(files)(handleFile(channel, filters))
} yield ()
private def handleFile(
channel: ScannerChannel,
filters: List[Filter]
)(file: File) =
for {
isDir <- FileSystem.isDirectory(file)
isIncluded <- UIO(Filters.isIncluded(file.toPath)(filters))
_ <- ZIO.when(isIncluded && isDir)(scanPath(channel)(file.toPath))
_ <- ZIO.when(isIncluded && !isDir)(sendHashedFile(channel)(file))
} yield ()
private def sendHashedFile(channel: ScannerChannel)(file: File) =
for {
sources <- Config.sources
source <- Sources.forPath(file.toPath)(sources)
prefix <- Config.prefix
hashes <- Hasher.hashObject(file.toPath)
remoteKey <- RemoteKey.from(source, prefix, file)
size <- FileSystem.length(file)
localFile <- ZIO(
LocalFile(file, source.toFile, hashes, remoteKey, size))
hashedFile <- Message.create(localFile)
_ <- MessageChannel.send(channel)(hashedFile)
} yield ()
}
}
object Live extends Live
final def scanSources: RIO[FileScanner, FileSender] =
ZIO.accessM(_.fileScanner.scanSources)
}


@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.nio.file.Path


@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.nio.file.Path


@ -1,13 +1,12 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.Sources
import net.kemitix.thorp.filesystem.FileSystem
import zio.{Task, RIO, ZIO}
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import zio.{RIO, Task, ZIO}
object LocalFileStream {


@ -0,0 +1,256 @@
package net.kemitix.thorp.lib
import net.kemitix.eip.zio.MessageChannel.UChannel
import net.kemitix.eip.zio.{Message, MessageChannel}
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.RemoteObjects.{
remoteHasHash,
remoteKeyExists,
remoteMatchesLocalFile
}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.lib.FileScanner.Hashes
import net.kemitix.thorp.storage.Storage
import net.kemitix.throp.uishell.UIEvent
import zio._
import zio.clock.Clock
trait LocalFileSystem {
def scanCopyUpload(
uiChannel: UChannel[Any, UIEvent],
remoteObjects: RemoteObjects,
archive: ThorpArchive
): RIO[
Clock with Config with Hasher with FileSystem with FileScanner with Storage,
Seq[StorageQueueEvent]]
def scanDelete(
uiChannel: UChannel[Any, UIEvent],
remoteData: RemoteObjects,
archive: UnversionedMirrorArchive.type
): RIO[Clock with Config with FileSystem with Storage, Seq[StorageQueueEvent]]
}
object LocalFileSystem extends LocalFileSystem {
override def scanCopyUpload(
uiChannel: UChannel[Any, UIEvent],
remoteObjects: RemoteObjects,
archive: ThorpArchive
): RIO[
Clock with Hasher with FileSystem with Config with FileScanner with Storage,
Seq[StorageQueueEvent]] =
for {
actionCounter <- Ref.make(0)
bytesCounter <- Ref.make(0L)
uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]])
eventsRef <- Ref.make(List.empty[StorageQueueEvent])
fileSender <- FileScanner.scanSources
fileReceiver <- fileReceiver(uiChannel,
remoteObjects,
archive,
uploads,
actionCounter,
bytesCounter,
eventsRef)
_ <- MessageChannel.pointToPoint(fileSender)(fileReceiver).runDrain
events <- eventsRef.get
} yield events
override def scanDelete(
uiChannel: UChannel[Any, UIEvent],
remoteData: RemoteObjects,
archive: UnversionedMirrorArchive.type
): RIO[Clock with Config with FileSystem with Storage,
Seq[StorageQueueEvent]] =
for {
actionCounter <- Ref.make(0)
bytesCounter <- Ref.make(0L)
eventsRef <- Ref.make(List.empty[StorageQueueEvent])
keySender <- keySender(remoteData.byKey.keys)
keyReceiver <- keyReceiver(uiChannel,
archive,
actionCounter,
bytesCounter,
eventsRef)
_ <- MessageChannel.pointToPoint(keySender)(keyReceiver).runDrain
events <- eventsRef.get
} yield events
private def fileReceiver(
uiChannel: UChannel[Any, UIEvent],
remoteObjects: RemoteObjects,
archive: ThorpArchive,
uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]],
actionCounterRef: Ref[Int],
bytesCounterRef: Ref[Long],
eventsRef: Ref[List[StorageQueueEvent]]
): UIO[MessageChannel.UReceiver[Clock with Config with Storage,
FileScanner.ScannedFile]] =
UIO { message =>
val localFile = message.body
for {
_ <- uiFileFound(uiChannel)(localFile)
action <- chooseAction(remoteObjects, uploads, uiChannel)(localFile)
actionCounter <- actionCounterRef.update(_ + 1)
bytesCounter <- bytesCounterRef.update(_ + action.size)
actionChosenMessage <- Message.create(UIEvent.ActionChosen(action))
_ <- MessageChannel.send(uiChannel)(actionChosenMessage)
sequencedAction = SequencedAction(action, actionCounter)
event <- archive.update(sequencedAction, bytesCounter)
_ <- eventsRef.update(list => event :: list)
_ <- uiActionFinished(uiChannel)(action, actionCounter, bytesCounter)
} yield ()
}
private def uiActionFinished(uiChannel: UChannel[Any, UIEvent])(
action: Action,
actionCounter: Int,
bytesCounter: Long
) =
Message.create(UIEvent.ActionFinished(action, actionCounter, bytesCounter)) >>=
MessageChannel.send(uiChannel)
private def uiFileFound(uiChannel: UChannel[Any, UIEvent])(
localFile: LocalFile) =
Message.create(UIEvent.FileFound(localFile)) >>=
MessageChannel.send(uiChannel)
private def chooseAction(
remoteObjects: RemoteObjects,
uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]],
uiChannel: UChannel[Any, UIEvent],
)(localFile: LocalFile): ZIO[Config with Clock, Nothing, Action] = {
for {
remoteExists <- remoteKeyExists(remoteObjects, localFile.remoteKey)
remoteMatches <- remoteMatchesLocalFile(remoteObjects, localFile)
remoteForHash <- remoteHasHash(remoteObjects, localFile.hashes)
previous <- uploads.get
bucket <- Config.bucket
action <- if (remoteExists && remoteMatches)
doNothing(localFile, bucket)
else {
remoteForHash match {
case Some((sourceKey, hash)) =>
doCopy(localFile, bucket, sourceKey, hash)
case _ if (matchesPreviousUpload(previous, localFile.hashes)) =>
doCopyWithPreviousUpload(localFile, bucket, previous, uiChannel)
case _ =>
doUpload(localFile, bucket)
}
}
} yield action
}
private def matchesPreviousUpload(
previous: Map[MD5Hash, Promise[Throwable, RemoteKey]],
hashes: Hashes
): Boolean =
hashes.exists({
case (_, hash) => previous.contains(hash)
})
private def doNothing(
localFile: LocalFile,
bucket: Bucket
): UIO[Action] = UIO {
DoNothing(bucket, localFile.remoteKey, localFile.length)
}
private def doCopy(
localFile: LocalFile,
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash
): UIO[Action] = UIO {
ToCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.length)
}
private def doCopyWithPreviousUpload(
localFile: LocalFile,
bucket: Bucket,
previous: Map[MD5Hash, Promise[Throwable, RemoteKey]],
uiChannel: UChannel[Any, UIEvent],
): ZIO[Clock, Nothing, Action] = {
localFile.hashes
.find({ case (_, hash) => previous.contains(hash) })
.map({
case (_, hash) =>
for {
awaitingMessage <- Message.create(
UIEvent.AwaitingAnotherUpload(localFile.remoteKey, hash))
_ <- MessageChannel.send(uiChannel)(awaitingMessage)
action <- previous(hash).await.map(
remoteKey =>
ToCopy(bucket,
remoteKey,
hash,
localFile.remoteKey,
localFile.length))
waitFinishedMessage <- Message.create(
UIEvent.AnotherUploadWaitComplete(action))
_ <- MessageChannel.send(uiChannel)(waitFinishedMessage)
} yield action
})
.getOrElse(doUpload(localFile, bucket))
.refineToOrDie[Nothing]
}
private def doUpload(
localFile: LocalFile,
bucket: Bucket
): UIO[Action] = {
UIO(ToUpload(bucket, localFile, localFile.length))
}
def keySender(
keys: Iterable[RemoteKey]): UIO[MessageChannel.Sender[Clock, RemoteKey]] =
UIO { channel =>
ZIO.foreach(keys) { key =>
Message.create(key) >>= MessageChannel.send(channel)
} *> MessageChannel.endChannel(channel)
}
def keyReceiver(
uiChannel: UChannel[Any, UIEvent],
archive: ThorpArchive,
actionCounterRef: Ref[Int],
bytesCounterRef: Ref[Long],
eventsRef: Ref[List[StorageQueueEvent]]
): UIO[
MessageChannel.UReceiver[Clock with Config with FileSystem with Storage,
RemoteKey]] =
UIO { message =>
{
val remoteKey = message.body
for {
_ <- uiKeyFound(uiChannel)(remoteKey)
sources <- Config.sources
prefix <- Config.prefix
exists <- FileSystem.hasLocalFile(sources, prefix, remoteKey)
_ <- ZIO.when(!exists) {
for {
actionCounter <- actionCounterRef.update(_ + 1)
bucket <- Config.bucket
action = ToDelete(bucket, remoteKey, 0L)
bytesCounter <- bytesCounterRef.update(_ + action.size)
sequencedAction = SequencedAction(action, actionCounter)
event <- archive.update(sequencedAction, 0L)
_ <- eventsRef.update(list => event :: list)
_ <- uiActionFinished(uiChannel)(action,
actionCounter,
bytesCounter)
} yield ()
}
} yield ()
}
}
private def uiKeyFound(uiChannel: UChannel[Any, UIEvent])(
remoteKey: RemoteKey) =
Message.create(UIEvent.KeyFound(remoteKey)) >>=
MessageChannel.send(uiChannel)
}


@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
@ -24,7 +24,7 @@ object LocalFileValidator {
for {
file <- validateFile(path.toFile)
remoteKey <- validateRemoteKey(sources, prefix, path)
} yield LocalFile(file, source, hash, remoteKey)
} yield LocalFile(file, source, hash, remoteKey, file.length)
private def validateFile(file: File): IO[Violation, File] =
if (file.isDirectory)


@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain.LocalFile


@ -1,35 +1,24 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action._
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.Action._
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.storage.Storage
import zio.{RIO, ZIO}
object PlanBuilder {
def createPlan
def createPlan(remoteObjects: RemoteObjects)
: RIO[Storage with Console with Config with FileSystem with Hasher,
SyncPlan] = (fetchRemoteData <&> findLocalFiles) >>= assemblePlan
SyncPlan] = findLocalFiles >>= assemblePlan(remoteObjects)
private def fetchRemoteData =
for {
bucket <- Config.bucket
prefix <- Config.prefix
objects <- Storage.list(bucket, prefix)
_ <- Console.putStrLn(s"Found ${objects.byKey.size} remote objects")
} yield objects
private def assemblePlan(metadata: (RemoteObjects, LocalFiles)) =
metadata match {
case (remoteObjects, localData) =>
createActions(remoteObjects, localData.localFiles)
.map(_.filter(doesSomething).sortBy(SequencePlan.order))
.map(syncPlan(localData))
}
private def assemblePlan(remoteObjects: RemoteObjects)(
localData: LocalFiles) =
createActions(remoteObjects, localData.localFiles)
.map(_.filter(doesSomething).sortBy(SequencePlan.order))
.map(syncPlan(localData))
private def syncPlan(localData: LocalFiles): LazyList[Action] => SyncPlan =
SyncPlan.create(_, syncTotal(localData))


@ -1,9 +1,9 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.StorageQueueEvent
import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.domain.{Action, StorageQueueEvent}
import net.kemitix.thorp.storage.Storage
import zio.{Ref, ZIO}
trait PlanExecutor {


@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.nio.file.Path


@ -0,0 +1,50 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain._
object S3MetaDataEnricher {
def getMetadata(
localFile: LocalFile,
remoteObjects: RemoteObjects
): MatchedMetadata = {
val (keyMatches, hashMatches) = getS3Status(localFile, remoteObjects)
val maybeByKey: Option[RemoteMetaData] = keyMatches.map { hash =>
RemoteMetaData(localFile.remoteKey, hash)
}
val maybeByHash: Option[RemoteMetaData] = hashMatches.map {
case (md5Hash, remoteKey) =>
RemoteMetaData(remoteKey, md5Hash)
}.headOption
MatchedMetadata(
localFile,
matchByKey = maybeByKey,
matchByHash = maybeByHash
)
}
def getS3Status(
localFile: LocalFile,
remoteObjects: RemoteObjects
): (Option[MD5Hash], Map[MD5Hash, RemoteKey]) = {
val byKey: Option[MD5Hash] =
remoteObjects.byKey.get(localFile.remoteKey)
val hashes: Map[HashType, MD5Hash] = localFile.hashes
val byHash: Map[MD5Hash, RemoteKey] =
hashes
.map {
case (hashType, md5Hash) =>
(md5Hash, remoteObjects.byHash.get(md5Hash))
}
.flatMap {
case (md5Hash, Some(maybeyKey)) => Some((md5Hash, maybeyKey))
case (_, None) => None
}
(byKey, byHash)
}
}


@ -1,6 +1,7 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.Action
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
trait SequencePlan {


@ -1,4 +1,6 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain.Action
final case class SequencedAction(
action: Action,


@ -0,0 +1,18 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._
import zio.ZIO
trait SyncLogging {
def logFileScan: ZIO[Config with Console, Nothing, Unit] =
for {
sources <- Config.sources
_ <- Console.putStrLn(
s"Scanning local files: ${sources.paths.mkString(", ")}...")
} yield ()
}
object SyncLogging extends SyncLogging


@ -1,6 +1,6 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain.SyncTotals
import net.kemitix.thorp.domain.{Action, SyncTotals}
final case class SyncPlan private (
actions: LazyList[Action],


@ -0,0 +1,47 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.ConsoleOut.{
CopyComplete,
DeleteComplete,
ErrorQueueEventOccurred,
UploadComplete
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageQueueEvent
import net.kemitix.thorp.domain.StorageQueueEvent._
import net.kemitix.thorp.storage.Storage
import zio.{RIO, ZIO}
trait ThorpArchive {
def update(
sequencedAction: SequencedAction,
totalBytesSoFar: Long
): ZIO[Storage with Config, Nothing, StorageQueueEvent]
def logEvent(
event: StorageQueueEvent): RIO[Console with Config, StorageQueueEvent] =
for {
batchMode <- Config.batchMode
sqe <- event match {
case UploadQueueEvent(remoteKey, _) =>
ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey),
batchMode)
case CopyQueueEvent(sourceKey, targetKey) =>
ZIO(event) <* Console.putMessageLnB(
CopyComplete(sourceKey, targetKey),
batchMode)
case DeleteQueueEvent(remoteKey) =>
ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey),
batchMode)
case ErrorQueueEvent(action, _, e) =>
ZIO(event) <* Console.putMessageLnB(
ErrorQueueEventOccurred(action, e),
batchMode)
case DoNothingQueueEvent(_) => ZIO(event)
case ShutdownQueueEvent() => ZIO(event)
}
} yield sqe
}


@ -0,0 +1,48 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage
import zio.{UIO, ZIO}
trait UnversionedMirrorArchive extends ThorpArchive {
override def update(
sequencedAction: SequencedAction,
totalBytesSoFar: Long
): ZIO[Storage with Config, Nothing, StorageQueueEvent] =
sequencedAction match {
case SequencedAction(ToUpload(bucket, localFile, _), index) =>
doUpload(index, totalBytesSoFar, bucket, localFile)
case SequencedAction(ToCopy(bucket, sourceKey, hash, targetKey, _), _) =>
Storage.copy(bucket, sourceKey, hash, targetKey)
case SequencedAction(ToDelete(bucket, remoteKey, _), _) =>
Storage.delete(bucket, remoteKey)
case SequencedAction(DoNothing(_, remoteKey, _), _) =>
UIO(DoNothingQueueEvent(remoteKey))
}
private def doUpload(
index: Int,
totalBytesSoFar: Long,
bucket: Bucket,
localFile: LocalFile
) =
for {
batchMode <- Config.batchMode
upload <- Storage.upload(
localFile,
bucket,
UploadEventListener.Settings(
localFile,
index,
totalBytesSoFar,
batchMode
)
)
} yield upload
}
object UnversionedMirrorArchive extends UnversionedMirrorArchive


@ -0,0 +1 @@
This file is in the root directory of the upload tree.


@ -1,10 +1,10 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.config._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.filesystem.{FileSystem, Resource}
import org.scalatest.FunSpec
import zio.DefaultRuntime
@ -38,7 +38,7 @@ class ActionGeneratorSuite extends FunSpec {
theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash)
input = MatchedMetadata(
theFile, // local exists
matchByHash = Set(theRemoteMetadata), // remote matches
matchByHash = Some(theRemoteMetadata), // remote matches
matchByKey = Some(theRemoteMetadata) // remote exists
)
} yield (theFile, input)
@ -67,7 +67,7 @@ class ActionGeneratorSuite extends FunSpec {
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
input = MatchedMetadata(
theFile, // local exists
matchByHash = Set(otherRemoteMetadata), // other matches
matchByHash = Some(otherRemoteMetadata), // other matches
matchByKey = None) // remote is missing
} yield (theFile, theRemoteKey, input, otherRemoteKey)
it("copy from other key") {
@ -94,7 +94,7 @@ class ActionGeneratorSuite extends FunSpec {
sources,
prefix)
input = MatchedMetadata(theFile, // local exists
matchByHash = Set.empty, // other no matches
matchByHash = None, // other no matches
matchByKey = None) // remote is missing
} yield (theFile, input)
it("upload") {
@ -127,7 +127,7 @@ class ActionGeneratorSuite extends FunSpec {
)
input = MatchedMetadata(
theFile, // local exists
matchByHash = Set(otherRemoteMetadata), // other matches
matchByHash = Some(otherRemoteMetadata), // other matches
matchByKey = Some(oldRemoteMetadata)) // remote exists
} yield (theFile, theRemoteKey, input, otherRemoteKey)
it("copy from other key") {
@ -160,7 +160,7 @@ class ActionGeneratorSuite extends FunSpec {
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
input = MatchedMetadata(
theFile, // local exists
matchByHash = Set.empty, // remote no match, other no match
matchByHash = None, // remote no match, other no match
matchByKey = Some(theRemoteMetadata) // remote exists
)
} yield (theFile, input)

View file

@ -1,10 +1,9 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.io.File
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.storage.Storage
import zio.{RIO, UIO}
final case class DummyStorageService(
@ -18,7 +17,7 @@ final case class DummyStorageService(
override def listObjects(
bucket: Bucket,
prefix: RemoteKey
): RIO[Console, RemoteObjects] =
): RIO[Storage, RemoteObjects] =
RIO(remoteObjects)
override def upload(

View file

@ -0,0 +1,50 @@
package net.kemitix.thorp.lib
import org.scalatest.FreeSpec
import zio.stream._
import zio.{DefaultRuntime, IO, UIO, ZIO}
// Experiment in triggering events asynchronously,
// i.e. have one thread add to a queue while another takes from it, neither waiting for the other to finish
class EIPTest extends FreeSpec {
"queue" in {
type Callback[A] = IO[Option[Throwable], A] => Unit
def offerInt(cb: Callback[Int], i: Int) = ZIO(cb(IO.succeed(i)))
def closeStream(cb: Callback[Int]) = ZIO(cb(IO.fail(None)))
def publish: Callback[Int] => IO[Throwable, _] =
cb =>
ZIO.foreach(1 to 3) { i =>
ZIO {
println(s"put $i")
Thread.sleep(100)
} *> offerInt(cb, i)
} *> closeStream(cb)
val program = Stream
.effectAsyncM(publish)
.mapM(i => ZIO(println(s"get $i")))
new DefaultRuntime {}.unsafeRunSync(program.runDrain)
}
"EIP: Message Channel" in {
type Message = Int
type Callback = IO[Option[Throwable], Message] => Unit
def producer: Callback => UIO[Unit] =
cb =>
ZIO.foreach(1 to 3)(message =>
UIO {
println(s"put $message")
cb(ZIO.succeed(message))
Thread.sleep(100)
}) *> UIO(cb(ZIO.fail(None)))
def consumer: Message => ZIO[Any, Throwable, Unit] =
message => ZIO(println(s"got $message"))
val program = zio.stream.Stream
.effectAsyncM(producer)
.buffer(1)
.mapM(consumer)
new DefaultRuntime {}.unsafeRunSync(program.runDrain)
}
}

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.nio.file.Paths

View file

@ -1,10 +1,10 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
import net.kemitix.thorp.config.Resource
import net.kemitix.thorp.domain.{RemoteKey, Sources}
import net.kemitix.thorp.filesystem.Resource
import org.scalatest.FunSpec
import zio.DefaultRuntime

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.nio.file.Paths
@ -6,15 +6,13 @@ import net.kemitix.thorp.config.{
Config,
ConfigOption,
ConfigOptions,
ConfigurationBuilder,
Resource
ConfigurationBuilder
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.filesystem.{FileSystem, Hasher, Resource}
import net.kemitix.thorp.storage.Storage
import org.scalatest.FunSpec
import zio.{DefaultRuntime, Task, UIO}

View file

@ -1,9 +1,9 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Resource
import net.kemitix.thorp.core.S3MetaDataEnricher.{getMetadata, getS3Status}
import net.kemitix.thorp.lib.S3MetaDataEnricher.{getMetadata, getS3Status}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.Resource
import org.scalatest.FunSpec
import scala.collection.MapView
@ -15,17 +15,11 @@ class MatchedMetadataEnricherSuite extends FunSpec {
private val prefix = RemoteKey("prefix")
def getMatchesByKey(
status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)])): Option[MD5Hash] = {
status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])): Option[MD5Hash] = {
val (byKey, _) = status
byKey
}
def getMatchesByHash(status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)]))
: Set[(RemoteKey, MD5Hash)] = {
val (_, byHash) = status
byHash
}
describe("enrich with metadata") {
describe(
@ -39,7 +33,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
prefix)
theRemoteKey = theFile.remoteKey
remoteObjects = RemoteObjects(
byHash = MapView(theHash -> Set(theRemoteKey)),
byHash = MapView(theHash -> theRemoteKey),
byKey = MapView(theRemoteKey -> theHash)
)
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
@ -47,9 +41,10 @@ class MatchedMetadataEnricherSuite extends FunSpec {
it("generates valid metadata") {
env.map({
case (theFile, theRemoteMetadata, remoteObjects) => {
val expected = MatchedMetadata(theFile,
matchByHash = Set(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val expected =
MatchedMetadata(theFile,
matchByHash = Some(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
}
@ -67,7 +62,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
prefix)
theRemoteKey: RemoteKey = RemoteKey.resolve("the-file")(prefix)
remoteObjects = RemoteObjects(
byHash = MapView(theHash -> Set(theRemoteKey)),
byHash = MapView(theHash -> theRemoteKey),
byKey = MapView(theRemoteKey -> theHash)
)
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
@ -75,9 +70,10 @@ class MatchedMetadataEnricherSuite extends FunSpec {
it("generates valid metadata") {
env.map({
case (theFile, theRemoteMetadata, remoteObjects) => {
val expected = MatchedMetadata(theFile,
matchByHash = Set(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val expected =
MatchedMetadata(theFile,
matchByHash = Some(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
}
@ -95,7 +91,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
prefix)
otherRemoteKey = RemoteKey("other-key")
remoteObjects = RemoteObjects(
byHash = MapView(theHash -> Set(otherRemoteKey)),
byHash = MapView(theHash -> otherRemoteKey),
byKey = MapView(otherRemoteKey -> theHash)
)
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
@ -105,7 +101,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
case (theFile, otherRemoteMetadata, remoteObjects) => {
val expected = MatchedMetadata(theFile,
matchByHash =
Set(otherRemoteMetadata),
Some(otherRemoteMetadata),
matchByKey = None)
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
@ -128,9 +124,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
env.map({
case (theFile, remoteObjects) => {
val expected =
MatchedMetadata(theFile,
matchByHash = Set.empty,
matchByKey = None)
MatchedMetadata(theFile, matchByHash = None, matchByKey = None)
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
}
@ -150,8 +144,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
oldHash = MD5Hash("old-hash")
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
remoteObjects = RemoteObjects(
byHash = MapView(oldHash -> Set(theRemoteKey),
theHash -> Set(otherRemoteKey)),
byHash = MapView(oldHash -> theRemoteKey, theHash -> otherRemoteKey),
byKey = MapView(
theRemoteKey -> oldHash,
otherRemoteKey -> theHash
@ -168,7 +161,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
remoteObjects) => {
val expected = MatchedMetadata(theFile,
matchByHash =
Set(otherRemoteMetadata),
Some(otherRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
@ -188,7 +181,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
theRemoteKey = theFile.remoteKey
oldHash = MD5Hash("old-hash")
remoteObjects = RemoteObjects(
byHash = MapView(oldHash -> Set(theRemoteKey), theHash -> Set.empty),
byHash = MapView(oldHash -> theRemoteKey),
byKey = MapView(theRemoteKey -> oldHash)
)
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
@ -197,7 +190,7 @@ class MatchedMetadataEnricherSuite extends FunSpec {
env.map({
case (theFile, theRemoteMetadata, remoteObjects) => {
val expected = MatchedMetadata(theFile,
matchByHash = Set.empty,
matchByHash = None,
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
@ -233,8 +226,8 @@ class MatchedMetadataEnricherSuite extends FunSpec {
prefix)
remoteObjects = RemoteObjects(
byHash = MapView(
hash -> Set(key, keyOtherKey.remoteKey),
diffHash -> Set(keyDiffHash.remoteKey)
hash -> key,
diffHash -> keyDiffHash.remoteKey
),
byKey = MapView(
key -> hash,
@ -283,7 +276,10 @@ class MatchedMetadataEnricherSuite extends FunSpec {
case (remoteObjects, _, _, _) => {
env2.map({
case (localFile) => {
val result = getMatchesByHash(invoke(localFile, remoteObjects))
val result = {
val (_, byHash) = invoke(localFile, remoteObjects)
byHash
}
assert(result.isEmpty)
}
})
@ -300,7 +296,10 @@ class MatchedMetadataEnricherSuite extends FunSpec {
assert(result.contains(diffHash))
}
it("should return only itself in match by hash") {
val result = getMatchesByHash(invoke(keyDiffHash, remoteObjects))
val result = {
val (_, byHash) = invoke(keyDiffHash, remoteObjects)
byHash
}
assert(result === Set((keyDiffHash.remoteKey, diffHash)))
}
}

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
@ -10,12 +10,11 @@ import net.kemitix.thorp.config.{
ConfigurationBuilder
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem._
import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.filesystem.{Hasher, _}
import net.kemitix.thorp.storage.Storage
import org.scalatest.FreeSpec
import zio.{DefaultRuntime, Task, UIO}
@ -64,7 +63,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val anOtherKey = RemoteKey("other")
val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey)))
val remoteObjects = RemoteObjects(
byHash = MapView(aHash -> Set(anOtherKey)),
byHash = MapView(aHash -> anOtherKey),
byKey = MapView(anOtherKey -> aHash)
)
val result =
@ -86,7 +85,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
// DoNothing actions should have been filtered out of the plan
val expected = Right(List())
val remoteObjects = RemoteObjects(
byHash = MapView(hash -> Set(remoteKey)),
byHash = MapView(hash -> remoteKey),
byKey = MapView(remoteKey -> hash)
)
val result =
@ -107,7 +106,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val expected =
Right(List(toUpload(remoteKey, currentHash, source, file)))
val remoteObjects = RemoteObjects(
byHash = MapView(originalHash -> Set(remoteKey)),
byHash = MapView(originalHash -> remoteKey),
byKey = MapView(remoteKey -> originalHash)
)
val result =
@ -126,7 +125,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val sourceKey = RemoteKey("other-key")
val expected = Right(List(toCopy(sourceKey, hash, remoteKey)))
val remoteObjects = RemoteObjects(
byHash = MapView(hash -> Set(sourceKey)),
byHash = MapView(hash -> sourceKey),
byKey = MapView.empty
)
val result =
@ -151,7 +150,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
// DoNothing actions should have been filtered out of the plan
val expected = Right(List())
val remoteObjects = RemoteObjects(
byHash = MapView(hash -> Set(remoteKey)),
byHash = MapView(hash -> remoteKey),
byKey = MapView(remoteKey -> hash)
)
val result =
@ -168,7 +167,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val hash = MD5Hash("file-content")
val expected = Right(List(toDelete(remoteKey)))
val remoteObjects = RemoteObjects(
byHash = MapView(hash -> Set(remoteKey)),
byHash = MapView(hash -> remoteKey),
byKey = MapView(remoteKey -> hash)
)
val result =
@ -255,7 +254,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List())
val remoteObjects =
RemoteObjects(byHash = MapView(hash2 -> Set(remoteKey2)),
RemoteObjects(byHash = MapView(hash2 -> remoteKey2),
byKey = MapView(remoteKey2 -> hash2))
val result =
invoke(options(firstSource)(secondSource),
@ -276,7 +275,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
withDirectory(secondSource => {
val expected = Right(List())
val remoteObjects =
RemoteObjects(byHash = MapView(hash1 -> Set(remoteKey1)),
RemoteObjects(byHash = MapView(hash1 -> remoteKey1),
byKey = MapView(remoteKey1 -> hash1))
val result =
invoke(options(firstSource)(secondSource),
@ -367,9 +366,12 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
}
def testProgram =
for {
config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- Config.set(config)
plan <- PlanBuilder.createPlan
config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- Config.set(config)
bucket <- Config.bucket
prefix <- Config.prefix
remoteObjects <- Storage.list(bucket, prefix)
plan <- PlanBuilder.createPlan(remoteObjects)
} yield plan
new DefaultRuntime {}
.unsafeRunSync(testProgram.provide(testEnv))

View file

@ -1,17 +1,17 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.core.Action.DoNothing
import net.kemitix.thorp.domain.Action.DoNothing
import net.kemitix.thorp.domain.{
Bucket,
RemoteKey,
StorageQueueEvent,
SyncTotals
}
import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.storage.Storage
import org.scalatest.FreeSpec
import zio.{DefaultRuntime, ZIO}
import zio.{DefaultRuntime, UIO, ZIO}
class PlanExecutorTest extends FreeSpec {
@ -33,7 +33,7 @@ class PlanExecutorTest extends FreeSpec {
LazyList.from(1 to nActions).map(DoNothing(bucket, remoteKey, _))
val syncTotals = SyncTotals.empty
val archiveTask = UnversionedMirrorArchive.default(syncTotals)
val archiveTask = UIO(UnversionedMirrorArchive)
val syncPlan = SyncPlan(input, syncTotals)
val program: ZIO[Storage with Config with Console,

View file

@ -1,15 +1,9 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.lib
import java.io.File
import net.kemitix.thorp.core.Action._
import net.kemitix.thorp.domain.{
Bucket,
HashType,
LocalFile,
MD5Hash,
RemoteKey
}
import net.kemitix.thorp.domain.Action._
import net.kemitix.thorp.domain._
import org.scalatest.FreeSpec
class SequencePlanTest extends FreeSpec {
@ -26,9 +20,9 @@ class SequencePlanTest extends FreeSpec {
val file2 = new File("aFile")
val source = new File("source")
val localFile1 =
LocalFile(file1, source, hashes, remoteKey1)
LocalFile(file1, source, hashes, remoteKey1, file1.length)
val _ =
LocalFile(file2, source, hashes, remoteKey2)
LocalFile(file2, source, hashes, remoteKey2, file2.length)
val copy1 = ToCopy(bucket, remoteKey1, hash, remoteKey2, size)
val copy2 = ToCopy(bucket, remoteKey2, hash, remoteKey1, size)
val upload1 = ToUpload(bucket, localFile1, size)

modules.dot (new file)
View file

@ -0,0 +1,26 @@
digraph deps {
app -> cli
app -> lib
app -> "storage-aws"
cli -> config
lib -> storage
lib -> console
lib -> config
lib -> filesystem
lib -> domain
"storage-aws" -> storage
config -> filesystem
config -> domain
storage -> domain
console -> domain
filesystem -> domain
}
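The dependency graph above is plain Graphviz DOT; assuming Graphviz is installed locally, it can be rendered with, for example, dot -Tpng modules.dot -o modules.png.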

View file

@ -5,11 +5,11 @@ import com.amazonaws.services.s3.model.{
ListObjectsV2Result,
S3ObjectSummary
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.{Bucket, RemoteKey, RemoteObjects}
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
import zio.{Task, RIO}
import zio.{RIO, Task}
import scala.jdk.CollectionConverters._
@ -21,7 +21,7 @@ trait Lister {
def listObjects(amazonS3: AmazonS3.Client)(
bucket: Bucket,
prefix: RemoteKey
): RIO[Console, RemoteObjects] = {
): RIO[Storage, RemoteObjects] = {
def request =
new ListObjectsV2Request()
@ -31,15 +31,16 @@ trait Lister {
def requestMore: Token => ListObjectsV2Request =
token => request.withContinuationToken(token)
def fetchBatch: ListObjectsV2Request => RIO[Console, Batch] =
request => ListerLogger.logFetchBatch *> tryFetchBatch(amazonS3)(request)
def fetchBatch: ListObjectsV2Request => Task[Batch] =
request => tryFetchBatch(amazonS3)(request)
def fetchMore: Option[Token] => RIO[Console, LazyList[S3ObjectSummary]] = {
def fetchMore: Option[Token] => Task[LazyList[S3ObjectSummary]] = {
case None => RIO.succeed(LazyList.empty)
case Some(token) => fetch(requestMore(token))
}
def fetch: ListObjectsV2Request => RIO[Console, LazyList[S3ObjectSummary]] =
def fetch: ListObjectsV2Request => Task[LazyList[S3ObjectSummary]] =
request =>
for {
batch <- fetchBatch(request)

View file

@ -9,12 +9,11 @@ object S3ObjectsByHash {
def byHash(
os: LazyList[S3ObjectSummary]
): MapView[MD5Hash, Set[RemoteKey]] = {
val mD5HashToS3Objects: Map[MD5Hash, LazyList[S3ObjectSummary]] =
os.groupBy(o => MD5Hash(o.getETag.filter(_ != '"')))
mD5HashToS3Objects.view.mapValues { os =>
os.map(_.getKey).map(RemoteKey(_)).toSet
}
}
): MapView[MD5Hash, RemoteKey] =
os.map { o =>
(MD5Hash(o.getETag) -> RemoteKey(o.getKey))
}
.toMap
.view
}
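As an aside, the rewritten byHash above keeps a single RemoteKey per hash because building a Map from duplicate keys retains only the last binding, which is what the S3ObjectsByHashSuite change further down (expecting hash -> key2) relies on. A minimal sketch of that collection behaviour, using plain Scala values rather than thorp's types:

// Two pairs share the same key: toMap keeps the last binding,
// so only one value survives per key.
val pairs = LazyList("etag" -> "key1", "etag" -> "key2")
assert(pairs.toMap == Map("etag" -> "key2"))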

View file

@ -2,13 +2,11 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.storage.api.Storage.Service
import zio.{RIO, UIO, ZIO}
import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.storage.Storage.Service
import zio.{RIO, UIO}
object S3Storage {
trait Live extends Storage {
@ -21,14 +19,14 @@ object S3Storage {
TransferManagerBuilder.defaultTransferManager)
override def listObjects(bucket: Bucket,
prefix: RemoteKey): RIO[Console, RemoteObjects] =
prefix: RemoteKey): RIO[Storage, RemoteObjects] =
Lister.listObjects(client)(bucket, prefix)
override def upload(
localFile: LocalFile,
bucket: Bucket,
listenerSettings: UploadEventListener.Settings,
): ZIO[Config, Nothing, StorageQueueEvent] =
): UIO[StorageQueueEvent] =
Uploader.upload(transferManager)(
Uploader.Request(localFile, bucket, listenerSettings))

View file

@ -5,7 +5,6 @@ import java.util.concurrent.locks.StampedLock
import com.amazonaws.event.ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT
import com.amazonaws.event.{ProgressEvent, ProgressListener}
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.Implicits._
import net.kemitix.thorp.domain.StorageQueueEvent.{
Action,
@ -19,12 +18,12 @@ import net.kemitix.thorp.domain.UploadEvent.{
}
import net.kemitix.thorp.domain.{StorageQueueEvent, _}
import net.kemitix.thorp.storage.aws.Uploader.Request
import zio.{UIO, ZIO}
import zio.UIO
trait Uploader {
def upload(transferManager: => AmazonTransferManager)(
request: Request): ZIO[Config, Nothing, StorageQueueEvent] =
request: Request): UIO[StorageQueueEvent] =
transfer(transferManager)(request)
.catchAll(handleError(request.localFile.remoteKey))
@ -35,8 +34,7 @@ trait Uploader {
private def transfer(transferManager: => AmazonTransferManager)(
request: Request
) =
putObjectRequest(request) >>=
dispatch(transferManager)
dispatch(transferManager)(putObjectRequest(request))
private def dispatch(transferManager: AmazonTransferManager)(
putObjectRequest: PutObjectRequest
@ -57,13 +55,10 @@ trait Uploader {
request.localFile.remoteKey.key,
request.localFile.file)
.withMetadata(metadata(request.localFile))
for {
batchMode <- Config.batchMode
r = if (batchMode) putRequest
else
putRequest.withGeneralProgressListener(
progressListener(request.uploadEventListener))
} yield r
if (request.uploadEventListener.batchMode) putRequest
else
putRequest.withGeneralProgressListener(
progressListener(request.uploadEventListener))
}
private def metadata: LocalFile => ObjectMetadata = localFile => {

View file

@ -5,10 +5,9 @@ import java.nio.file.Path
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import zio.{RIO, ZIO}
private trait ETagGenerator {

View file

@ -2,11 +2,10 @@ package net.kemitix.thorp.storage.aws.hasher
import java.nio.file.Path
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.core.hasher.Hasher.Live.{hasher => CoreHasher}
import net.kemitix.thorp.core.hasher.Hasher.Service
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.filesystem.Hasher.Live.{hasher => CoreHasher}
import net.kemitix.thorp.filesystem.Hasher.Service
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.storage.aws.ETag
import zio.RIO

View file

@ -1,12 +1,10 @@
package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.storage.Storage
import org.scalamock.scalatest.MockFactory
import zio.{RIO, UIO, ZIO}
import zio.{RIO, UIO}
trait AmazonS3ClientTestFixture extends MockFactory {
@ -29,14 +27,14 @@ trait AmazonS3ClientTestFixture extends MockFactory {
override def listObjects(
bucket: Bucket,
prefix: RemoteKey
): RIO[Console, RemoteObjects] =
): RIO[Storage, RemoteObjects] =
Lister.listObjects(client)(bucket, prefix)
override def upload(
localFile: LocalFile,
bucket: Bucket,
listenerSettings: UploadEventListener.Settings,
): ZIO[Config, Nothing, StorageQueueEvent] =
): UIO[StorageQueueEvent] =
Uploader.upload(transferManager)(
Uploader.Request(localFile, bucket, listenerSettings))

View file

@ -8,18 +8,15 @@ import com.amazonaws.services.s3.model.{
ListObjectsV2Result,
S3ObjectSummary
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.NonUnit.~*
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage
import org.scalatest.FreeSpec
import org.scalatest.Matchers._
import zio.internal.PlatformLive
import zio.{Runtime, Task, UIO}
import zio.{DefaultRuntime, Task, UIO}
class ListerTest extends FreeSpec {
private val runtime = Runtime(Console.Live, PlatformLive.Default)
"list" - {
val bucket = Bucket("aBucket")
val prefix = RemoteKey("aRemoteKey")
@ -28,7 +25,7 @@ class ListerTest extends FreeSpec {
val nowDate = new Date
val key = "key"
val etag = "etag"
val expectedHashMap = Map(MD5Hash(etag) -> Set(RemoteKey(key)))
val expectedHashMap = Map(MD5Hash(etag) -> RemoteKey(key))
val expectedKeyMap = Map(RemoteKey(key) -> MD5Hash(etag))
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.listObjectsV2 _)
@ -51,8 +48,8 @@ class ListerTest extends FreeSpec {
val key2 = "key2"
val etag2 = "etag2"
val expectedHashMap = Map(
MD5Hash(etag1) -> Set(RemoteKey(key1)),
MD5Hash(etag2) -> Set(RemoteKey(key2))
MD5Hash(etag1) -> RemoteKey(key1),
MD5Hash(etag2) -> RemoteKey(key2)
)
val expectedKeyMap = Map(
RemoteKey(key1) -> MD5Hash(etag1),
@ -121,8 +118,10 @@ class ListerTest extends FreeSpec {
}
def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket,
prefix: RemoteKey) =
runtime.unsafeRunSync {
Lister.listObjects(amazonS3Client)(bucket, prefix)
new DefaultRuntime {}.unsafeRunSync {
Lister
.listObjects(amazonS3Client)(bucket, prefix)
.provide(Storage.Test)
}.toEither
}

View file

@ -14,8 +14,8 @@ class S3ObjectsByHashSuite extends FunSpec {
val o2 = s3object(hash, key2)
val os = LazyList(o1, o2)
it("should group by the hash value") {
val expected: Map[MD5Hash, Set[RemoteKey]] = Map(
hash -> Set(key1, key2)
val expected: Map[MD5Hash, RemoteKey] = Map(
hash -> key2
)
val result = Map.from(S3ObjectsByHash.byHash(os))
assertResult(expected)(result)

View file

@ -1,9 +1,9 @@
package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.config.Resource
import net.kemitix.thorp.core.{LocalFileValidator, S3MetaDataEnricher}
import net.kemitix.thorp.lib.{LocalFileValidator, S3MetaDataEnricher}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.Resource
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
@ -39,8 +39,8 @@ class StorageServiceSuite extends FunSpec with MockFactory {
prefix)
s3ObjectsData = RemoteObjects(
byHash = MapView(
hash -> Set(key, keyOtherKey.remoteKey),
diffHash -> Set(keyDiffHash.remoteKey)
hash -> key,
diffHash -> keyDiffHash.remoteKey
),
byKey = MapView(
key -> hash,
@ -59,14 +59,14 @@ class StorageServiceSuite extends FunSpec with MockFactory {
def invoke(localFile: LocalFile, s3ObjectsData: RemoteObjects) =
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
def getMatchesByKey(status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)]))
: Option[MD5Hash] = {
def getMatchesByKey(
status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])): Option[MD5Hash] = {
val (byKey, _) = status
byKey
}
def getMatchesByHash(status: (Option[MD5Hash], Set[(RemoteKey, MD5Hash)]))
: Set[(RemoteKey, MD5Hash)] = {
def getMatchesByHash(status: (Option[MD5Hash], Map[MD5Hash, RemoteKey]))
: Map[MD5Hash, RemoteKey] = {
val (_, byHash) = status
byHash
}

View file

@ -5,7 +5,7 @@ import java.io.File
import com.amazonaws.SdkClientException
import com.amazonaws.services.s3.model.AmazonS3Exception
import com.amazonaws.services.s3.transfer.model.UploadResult
import net.kemitix.thorp.config.{Config, Resource}
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.StorageQueueEvent.{
Action,
@ -17,6 +17,7 @@ import org.scalamock.scalatest.MockFactory
import org.scalatest.FreeSpec
import zio.{DefaultRuntime, Task}
import net.kemitix.thorp.domain.NonUnit.~*
import net.kemitix.thorp.filesystem.Resource
class UploaderTest extends FreeSpec with MockFactory {
@ -26,7 +27,7 @@ class UploaderTest extends FreeSpec with MockFactory {
val aHash = MD5Hash("aHash")
val hashes = Map[HashType, MD5Hash](MD5 -> aHash)
val remoteKey = RemoteKey("aRemoteKey")
val localFile = LocalFile(aFile, aSource, hashes, remoteKey)
val localFile = LocalFile(aFile, aSource, hashes, remoteKey, aFile.length)
val bucket = Bucket("aBucket")
val uploadResult = new UploadResult
uploadResult.setKey(remoteKey.key)
@ -35,7 +36,7 @@ class UploaderTest extends FreeSpec with MockFactory {
override def waitForUploadResult: UploadResult = uploadResult
}
val listenerSettings =
UploadEventListener.Settings(localFile, 0, SyncTotals(1, 0, 0), 0)
UploadEventListener.Settings(localFile, 0, 0, batchMode = true)
"when no error" in {
val expected =
Right(UploadQueueEvent(remoteKey, aHash))

View file

@ -3,11 +3,9 @@ package net.kemitix.thorp.storage.aws.hasher
import java.nio.file.Path
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import net.kemitix.thorp.config.Resource
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.filesystem.{FileSystem, Hasher, Resource}
import org.scalatest.FunSpec
import zio.DefaultRuntime

View file

@ -1,9 +1,7 @@
package net.kemitix.thorp.storage.api
package net.kemitix.thorp.storage
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain._
import zio.{Task, RIO, UIO, ZIO}
import zio.{RIO, Task, UIO, ZIO}
trait Storage {
val storage: Storage.Service
@ -15,13 +13,13 @@ object Storage {
def listObjects(
bucket: Bucket,
prefix: RemoteKey
): RIO[Storage with Console, RemoteObjects]
): RIO[Storage, RemoteObjects]
def upload(
localFile: LocalFile,
bucket: Bucket,
listenerSettings: UploadEventListener.Settings,
): ZIO[Storage with Config, Nothing, StorageQueueEvent]
): ZIO[Storage, Nothing, StorageQueueEvent]
def copy(
bucket: Bucket,
@ -53,9 +51,8 @@ object Storage {
val storage: Service = new Service {
override def listObjects(
bucket: Bucket,
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =
override def listObjects(bucket: Bucket,
prefix: RemoteKey): RIO[Storage, RemoteObjects] =
listResult
override def upload(
@ -85,14 +82,14 @@ object Storage {
object Test extends Test
final def list(bucket: Bucket,
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =
prefix: RemoteKey): RIO[Storage, RemoteObjects] =
ZIO.accessM(_.storage listObjects (bucket, prefix))
final def upload(
localFile: LocalFile,
bucket: Bucket,
listenerSettings: UploadEventListener.Settings
): ZIO[Storage with Config, Nothing, StorageQueueEvent] =
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings))
final def copy(

View file

@ -0,0 +1,22 @@
package net.kemitix.thorp.uishell
import net.kemitix.eip.zio.MessageChannel
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import zio.clock.Clock
sealed trait ProgressEvent
object ProgressEvent {
type Env = Console
type ProgressSender =
MessageChannel.ESender[Config with Clock with Hasher with FileSystem,
Throwable,
ProgressEvent]
type ProgressReceiver =
MessageChannel.Receiver[ProgressEvent.Env, ProgressEvent]
type ProgressChannel = MessageChannel.Channel[Console, ProgressEvent]
final case class PingEvent() extends ProgressEvent
}

View file

@ -0,0 +1,37 @@
package net.kemitix.thorp.uishell
import net.kemitix.thorp.domain._
sealed trait UIEvent
object UIEvent {
case object ShowValidConfig extends UIEvent
case class RemoteDataFetched(size: Int) extends UIEvent
case class ShowSummary(counters: Counters) extends UIEvent
case class FileFound(localFile: LocalFile) extends UIEvent
case class ActionChosen(action: Action) extends UIEvent
/**
* The content of the file ({{hash}}) that will be placed
* at {{remoteKey}} is already being uploaded to another
* location. Once that upload has completed, its RemoteKey
* will become available and a Copy action can be made.
* @param remoteKey the key the file will be copied to once the other upload completes
* @param hash the hash of the file being uploaded
*/
case class AwaitingAnotherUpload(remoteKey: RemoteKey, hash: MD5Hash)
extends UIEvent
case class AnotherUploadWaitComplete(action: Action) extends UIEvent
case class ActionFinished(action: Action,
actionCounter: Int,
bytesCounter: Long)
extends UIEvent
case class KeyFound(remoteKey: RemoteKey) extends UIEvent
}

View file

@ -0,0 +1,96 @@
package net.kemitix.thorp.uishell
import net.kemitix.eip.zio.MessageChannel
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.ConsoleOut.{
CopyComplete,
DeleteComplete,
UploadComplete
}
import net.kemitix.thorp.console.{Console, ConsoleOut}
import net.kemitix.thorp.domain.{
Action,
Counters,
LocalFile,
MD5Hash,
RemoteKey
}
import net.kemitix.thorp.domain.Action.ToUpload
import net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen
import zio.{UIO, ZIO}
object UIShell {
def receiver: UIO[MessageChannel.UReceiver[Console with Config, UIEvent]] =
UIO { uiEventMessage =>
uiEventMessage.body match {
case UIEvent.ShowValidConfig => showValidConfig
case UIEvent.RemoteDataFetched(size) => remoteDataFetched(size)
case UIEvent.ShowSummary(counters) => showSummary(counters)
case UIEvent.FileFound(localFile) => fileFound(localFile)
case UIEvent.ActionChosen(action) => UIO(())
case UIEvent.AwaitingAnotherUpload(remoteKey, hash) =>
awaitingUpload(remoteKey, hash)
case UIEvent.AnotherUploadWaitComplete(action) =>
uploadWaitComplete(action)
case UIEvent.ActionFinished(action, _, _) => actionFinished(action)
case UIEvent.KeyFound(_) => UIO(())
}
}
private def actionFinished(
action: Action): ZIO[Console with Config, Nothing, Unit] = {
for {
batchMode <- Config.batchMode
_ <- action match {
case _: Action.DoNothing => UIO(())
case ToUpload(_, localFile, _) =>
Console.putMessageLnB(UploadComplete(localFile.remoteKey), batchMode)
case Action.ToCopy(_, sourceKey, _, targetKey, _) =>
Console.putMessageLnB(CopyComplete(sourceKey, targetKey), batchMode)
case Action.ToDelete(_, remoteKey, _) =>
Console.putMessageLnB(DeleteComplete(remoteKey), batchMode)
}
} yield ()
}
private def uploadWaitComplete(
action: Action): ZIO[Console, Nothing, Unit] = {
Console.putStrLn(s"Finished waiting to other upload - now $action")
}
private def awaitingUpload(remoteKey: RemoteKey,
hash: MD5Hash): ZIO[Console, Nothing, Unit] = {
Console.putStrLn(
s"Awaiting another upload of $hash before copying it to $remoteKey")
}
private def fileFound(
localFile: LocalFile): ZIO[Console with Config, Nothing, Unit] = {
for {
batchMode <- Config.batchMode
_ <- ZIO.when(batchMode)(Console.putStrLn(s"Found: ${localFile.file}"))
} yield ()
}
private def showSummary(
counters: Counters): ZIO[Console with Config, Nothing, Unit] = {
Console.putStrLn(eraseToEndOfScreen) *>
Console.putStrLn(s"Uploaded ${counters.uploaded} files") *>
Console.putStrLn(s"Copied ${counters.copied} files") *>
Console.putStrLn(s"Deleted ${counters.deleted} files") *>
Console.putStrLn(s"Errors ${counters.errors}")
}
private def remoteDataFetched(size: Int): ZIO[Console, Nothing, Unit] = {
Console.putStrLn(s"Found $size remote objects")
}
private def showValidConfig: ZIO[Console with Config, Nothing, Unit] = {
for {
bucket <- Config.bucket
prefix <- Config.prefix
sources <- Config.sources
_ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
} yield ()
}
}