Remove dead code (#190)

* [lib] Remove PlanBuilder and PlanExecutor

* [lib] Remove ActionGenerator and LocalFileStream

* [lib] Remove S3MetaDataEnricher

* [lib] Remove Remote

* [filesystem] fix paths to test resources

* [lib] Remove LocalFileValidator

* [lib] Remove SyncPlan

* [lib] Remove SequencePlan

* [lib] Remove KeyGenerator

* [lib] Remove DummyStorageService

* [lib] Remove EventQueue

* [lib] Remove SyncLogging

* [lib] Remove LocalFiles

* [lib] inline CoreTypes into Program

* [lib] Remote EIPTest

* [lib] LocalFileSystem remove unneccary parens

* [domain] Remove Monoid

* [domain] Remove MatchedMetedata

* [domain] Remove NonUnit

* [domain] Remove RemoteMetaData

* [domain] Rename StorageQueueEvent as StorageEvent

* [domain] Remove SyncTotals

* [domain] Rename UploadEvent as UploadProgressEvent

* [sbt] fix assembly merge strategy to work with zio and zio-streams
This commit is contained in:
Paul Campbell 2019-09-08 07:29:23 +01:00 committed by GitHub
parent 777bc970d7
commit becd297858
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
53 changed files with 264 additions and 2288 deletions

View file

@ -5,15 +5,14 @@ import net.kemitix.eip.zio.{Message, MessageChannel}
import net.kemitix.thorp.cli.CliArgs import net.kemitix.thorp.cli.CliArgs
import net.kemitix.thorp.config._ import net.kemitix.thorp.config._
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.{Counters, StorageQueueEvent} import net.kemitix.thorp.domain.{Counters, StorageEvent}
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageEvent.{
CopyQueueEvent, CopyEvent,
DeleteQueueEvent, DeleteEvent,
ErrorQueueEvent, ErrorEvent,
UploadQueueEvent UploadEvent
} }
import net.kemitix.thorp.filesystem.{FileSystem, Hasher} import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.lib.CoreTypes.CoreProgram
import net.kemitix.thorp.lib._ import net.kemitix.thorp.lib._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.throp.uishell.{UIEvent, UIShell} import net.kemitix.throp.uishell.{UIEvent, UIShell}
@ -24,7 +23,10 @@ trait Program {
lazy val version = s"Thorp v${thorp.BuildInfo.version}" lazy val version = s"Thorp v${thorp.BuildInfo.version}"
def run(args: List[String]): CoreProgram[Unit] = { def run(args: List[String]): ZIO[
Storage with Console with Config with Clock with FileSystem with Hasher with FileScanner,
Throwable,
Unit] = {
for { for {
cli <- CliArgs.parse(args) cli <- CliArgs.parse(args)
config <- ConfigurationBuilder.buildConfig(cli) config <- ConfigurationBuilder.buildConfig(cli)
@ -87,21 +89,21 @@ trait Program {
} }
private def showSummary(uiChannel: UIChannel)( private def showSummary(uiChannel: UIChannel)(
events: Seq[StorageQueueEvent]): RIO[Clock, Unit] = { events: Seq[StorageEvent]): RIO[Clock, Unit] = {
val counters = events.foldLeft(Counters.empty)(countActivities) val counters = events.foldLeft(Counters.empty)(countActivities)
Message.create(UIEvent.ShowSummary(counters)) >>= Message.create(UIEvent.ShowSummary(counters)) >>=
MessageChannel.send(uiChannel) MessageChannel.send(uiChannel)
} }
private def countActivities: (Counters, StorageQueueEvent) => Counters = private def countActivities: (Counters, StorageEvent) => Counters =
(counters: Counters, s3Action: StorageQueueEvent) => { (counters: Counters, s3Action: StorageEvent) => {
val increment: Int => Int = _ + 1 val increment: Int => Int = _ + 1
s3Action match { s3Action match {
case _: UploadQueueEvent => case _: UploadEvent =>
Counters.uploaded.modify(increment)(counters) Counters.uploaded.modify(increment)(counters)
case _: CopyQueueEvent => Counters.copied.modify(increment)(counters) case _: CopyEvent => Counters.copied.modify(increment)(counters)
case _: DeleteQueueEvent => Counters.deleted.modify(increment)(counters) case _: DeleteEvent => Counters.deleted.modify(increment)(counters)
case _: ErrorQueueEvent => Counters.errors.modify(increment)(counters) case _: ErrorEvent => Counters.errors.modify(increment)(counters)
case _ => counters case _ => counters
} }
} }

View file

@ -32,7 +32,11 @@ val commonSettings = Seq(
Wart.NonUnitStatements, Wart.NonUnitStatements,
Wart.StringPlusAny Wart.StringPlusAny
).contains(wart)), ).contains(wart)),
test in assembly := {} test in assembly := {},
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
) )
val applicationSettings = Seq( val applicationSettings = Seq(

View file

@ -2,7 +2,6 @@ package net.kemitix.thorp.config
import java.nio.file.Paths import java.nio.file.Paths
import net.kemitix.thorp.domain.NonUnit.~*
import net.kemitix.thorp.domain.Sources import net.kemitix.thorp.domain.Sources
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
@ -76,7 +75,7 @@ class ConfigQueryTest extends FreeSpec {
val pwd = Paths.get(System.getenv("PWD")) val pwd = Paths.get(System.getenv("PWD"))
val expected = Sources(List(pwd)) val expected = Sources(List(pwd))
val result = ConfigQuery.sources(ConfigOptions(List())) val result = ConfigQuery.sources(ConfigOptions(List()))
~*(assertResult(expected)(result)) assertResult(expected)(result)
} }
} }
"when is set once" - { "when is set once" - {

View file

@ -3,7 +3,6 @@ package net.kemitix.thorp.config
import java.nio.file.{Path, Paths} import java.nio.file.{Path, Paths}
import net.kemitix.thorp.domain.Filter.{Exclude, Include} import net.kemitix.thorp.domain.Filter.{Exclude, Include}
import net.kemitix.thorp.domain.NonUnit.~*
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec import org.scalatest.FunSpec
@ -131,10 +130,10 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
Filter.Include("current-include"))) Filter.Include("current-include")))
val options = configOptions(ConfigOption.Source(currentSource)) val options = configOptions(ConfigOption.Source(currentSource))
val result = invoke(options) val result = invoke(options)
~*(assertResult(expectedSources)(result.map(_.sources))) assertResult(expectedSources)(result.map(_.sources))
~*(assertResult(expectedBuckets)(result.map(_.bucket))) assertResult(expectedBuckets)(result.map(_.bucket))
~*(assertResult(expectedPrefixes)(result.map(_.prefix))) assertResult(expectedPrefixes)(result.map(_.prefix))
~*(assertResult(expectedFilters)(result.map(_.filters))) assertResult(expectedFilters)(result.map(_.filters))
}) })
}) })
} }

View file

@ -1,6 +1,6 @@
package net.kemitix.thorp.console package net.kemitix.thorp.console
import net.kemitix.thorp.domain.StorageQueueEvent.Action import net.kemitix.thorp.domain.StorageEvent.ActionSummary
import net.kemitix.thorp.domain.Terminal._ import net.kemitix.thorp.domain.Terminal._
import net.kemitix.thorp.domain.{Bucket, RemoteKey, Sources} import net.kemitix.thorp.domain.{Bucket, RemoteKey, Sources}
import zio.UIO import zio.UIO
@ -59,7 +59,7 @@ object ConsoleOut {
s"Deleted: $remoteKey" s"Deleted: $remoteKey"
} }
final case class ErrorQueueEventOccurred(action: Action, e: Throwable) final case class ErrorQueueEventOccurred(action: ActionSummary, e: Throwable)
extends ConsoleOut.WithBatchMode { extends ConsoleOut.WithBatchMode {
override def en: String = override def en: String =
s"${action.name} failed: ${action.keys}: ${e.getMessage}" s"${action.name} failed: ${action.keys}: ${e.getMessage}"

View file

@ -1,8 +0,0 @@
package net.kemitix.thorp.domain
// For the LocalFile, the set of matching S3 objects with the same MD5Hash, and any S3 object with the same remote key
final case class MatchedMetadata(
localFile: LocalFile,
matchByHash: Option[RemoteMetaData],
matchByKey: Option[RemoteMetaData]
)

View file

@ -1,6 +0,0 @@
package net.kemitix.thorp.domain
trait Monoid[T] {
def zero: T
def op(t1: T, t2: T): T
}

View file

@ -1,8 +0,0 @@
package net.kemitix.thorp.domain
object NonUnit {
@specialized def ~*[A](evaluateForSideEffectOnly: A): Unit = {
val _ = evaluateForSideEffectOnly
() //Return unit to prevent warning due to discarding value
}
}

View file

@ -1,6 +0,0 @@
package net.kemitix.thorp.domain
final case class RemoteMetaData(
remoteKey: RemoteKey,
hash: MD5Hash
)

View file

@ -0,0 +1,49 @@
package net.kemitix.thorp.domain
sealed trait StorageEvent
object StorageEvent {
final case class DoNothingEvent(
remoteKey: RemoteKey
) extends StorageEvent
final case class CopyEvent(
sourceKey: RemoteKey,
targetKey: RemoteKey
) extends StorageEvent
final case class UploadEvent(
remoteKey: RemoteKey,
md5Hash: MD5Hash
) extends StorageEvent
final case class DeleteEvent(
remoteKey: RemoteKey
) extends StorageEvent
final case class ErrorEvent(
action: ActionSummary,
remoteKey: RemoteKey,
e: Throwable
) extends StorageEvent
final case class ShutdownEvent() extends StorageEvent
sealed trait ActionSummary {
val name: String
val keys: String
}
object ActionSummary {
final case class Copy(keys: String) extends ActionSummary {
override val name: String = "Copy"
}
final case class Upload(keys: String) extends ActionSummary {
override val name: String = "Upload"
}
final case class Delete(keys: String) extends ActionSummary {
override val name: String = "Delete"
}
}
}

View file

@ -1,49 +0,0 @@
package net.kemitix.thorp.domain
sealed trait StorageQueueEvent
object StorageQueueEvent {
final case class DoNothingQueueEvent(
remoteKey: RemoteKey
) extends StorageQueueEvent
final case class CopyQueueEvent(
sourceKey: RemoteKey,
targetKey: RemoteKey
) extends StorageQueueEvent
final case class UploadQueueEvent(
remoteKey: RemoteKey,
md5Hash: MD5Hash
) extends StorageQueueEvent
final case class DeleteQueueEvent(
remoteKey: RemoteKey
) extends StorageQueueEvent
final case class ErrorQueueEvent(
action: Action,
remoteKey: RemoteKey,
e: Throwable
) extends StorageQueueEvent
final case class ShutdownQueueEvent() extends StorageQueueEvent
sealed trait Action {
val name: String
val keys: String
}
object Action {
final case class Copy(keys: String) extends Action {
override val name: String = "Copy"
}
final case class Upload(keys: String) extends Action {
override val name: String = "Upload"
}
final case class Delete(keys: String) extends Action {
override val name: String = "Delete"
}
}
}

View file

@ -1,15 +0,0 @@
package net.kemitix.thorp.domain
final case class SyncTotals private (
count: Long,
totalSizeBytes: Long,
sizeUploadedBytes: Long
)
object SyncTotals {
val empty: SyncTotals = SyncTotals(0L, 0L, 0L)
def create(count: Long,
totalSizeBytes: Long,
sizeUploadedBytes: Long): SyncTotals =
SyncTotals(count, totalSizeBytes, sizeUploadedBytes)
}

View file

@ -2,7 +2,7 @@ package net.kemitix.thorp.domain
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import net.kemitix.thorp.domain.UploadEvent.RequestEvent import net.kemitix.thorp.domain.UploadProgressEvent.RequestEvent
import net.kemitix.thorp.domain.UploadEventLogger.RequestCycle import net.kemitix.thorp.domain.UploadEventLogger.RequestCycle
object UploadEventListener { object UploadEventListener {
@ -14,7 +14,7 @@ object UploadEventListener {
batchMode: Boolean batchMode: Boolean
) )
def listener(settings: Settings): UploadEvent => Unit = { def listener(settings: Settings): UploadProgressEvent => Unit = {
val bytesTransferred = new AtomicLong(0L) val bytesTransferred = new AtomicLong(0L)
event => event =>
{ {

View file

@ -1,23 +1,23 @@
package net.kemitix.thorp.domain package net.kemitix.thorp.domain
sealed trait UploadEvent { sealed trait UploadProgressEvent {
def name: String def name: String
} }
object UploadEvent { object UploadProgressEvent {
final case class TransferEvent( final case class TransferEvent(
name: String name: String
) extends UploadEvent ) extends UploadProgressEvent
final case class RequestEvent( final case class RequestEvent(
name: String, name: String,
bytes: Long, bytes: Long,
transferred: Long transferred: Long
) extends UploadEvent ) extends UploadProgressEvent
final case class ByteTransferEvent( final case class ByteTransferEvent(
name: String name: String
) extends UploadEvent ) extends UploadProgressEvent
} }

View file

@ -4,8 +4,6 @@ import java.io.{File, IOException, PrintWriter}
import java.nio.file.attribute.BasicFileAttributes import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor} import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import net.kemitix.thorp.domain.NonUnit.~*
import scala.util.Try import scala.util.Try
trait TemporaryFolder { trait TemporaryFolder {
@ -15,16 +13,15 @@ trait TemporaryFolder {
val dir: Path = Files.createTempDirectory("thorp-temp") val dir: Path = Files.createTempDirectory("thorp-temp")
val t = Try(testCode(dir)) val t = Try(testCode(dir))
remove(dir) remove(dir)
~*(t.get) t.get
()
} }
def remove(root: Path): Unit = { def remove(root: Path): Unit = {
~*(
Files.walkFileTree( Files.walkFileTree(
root, root,
new SimpleFileVisitor[Path] { new SimpleFileVisitor[Path] {
override def visitFile( override def visitFile(file: Path,
file: Path,
attrs: BasicFileAttributes): FileVisitResult = { attrs: BasicFileAttributes): FileVisitResult = {
Files.delete(file) Files.delete(file)
FileVisitResult.CONTINUE FileVisitResult.CONTINUE
@ -35,7 +32,7 @@ trait TemporaryFolder {
FileVisitResult.CONTINUE FileVisitResult.CONTINUE
} }
} }
)) )
} }
def createFile(directory: Path, name: String, contents: String*): File = { def createFile(directory: Path, name: String, contents: String*): File = {
@ -48,6 +45,6 @@ trait TemporaryFolder {
} }
def writeFile(directory: Path, name: String, contents: String*): Unit = def writeFile(directory: Path, name: String, contents: String*): Unit =
~*(createFile(directory, name, contents: _*)) createFile(directory, name, contents: _*)
} }

View file

@ -11,7 +11,7 @@ class MD5HashGeneratorTest extends FunSpec {
describe("md5File()") { describe("md5File()") {
describe("read a small file (smaller than buffer)") { describe("read a small file (smaller than buffer)") {
val path = Resource(this, "../upload/root-file").toPath val path = Resource(this, "upload/root-file").toPath
it("should generate the correct hash") { it("should generate the correct hash") {
val expected = Right(Root.hash) val expected = Right(Root.hash)
val result = invoke(path) val result = invoke(path)
@ -20,7 +20,7 @@ class MD5HashGeneratorTest extends FunSpec {
} }
describe("read a large file (bigger than buffer)") { describe("read a large file (bigger than buffer)") {
val path = Resource(this, "../big-file").toPath val path = Resource(this, "big-file").toPath
it("should generate the correct hash") { it("should generate the correct hash") {
val expected = Right(BigFile.hash) val expected = Right(BigFile.hash)
val result = invoke(path) val result = invoke(path)
@ -38,7 +38,7 @@ class MD5HashGeneratorTest extends FunSpec {
describe("md5FileChunk") { describe("md5FileChunk") {
describe("read chunks of file") { describe("read chunks of file") {
val path = Resource(this, "../big-file").toPath val path = Resource(this, "big-file").toPath
it("should generate the correct hash for first chunk of the file") { it("should generate the correct hash for first chunk of the file") {
val part1 = BigFile.Part1 val part1 = BigFile.Part1
val expected = Right(MD5Hash.hash(part1.hash)) val expected = Right(MD5Hash.hash(part1.hash))

View file

@ -1,95 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain.Implicits._
import net.kemitix.thorp.domain._
import zio.RIO
object ActionGenerator {
def createActions(
matchedMetadata: MatchedMetadata,
previousActions: LazyList[Action]
): RIO[Config, LazyList[Action]] =
for {
bucket <- Config.bucket
} yield
genAction(formattedMetadata(matchedMetadata, previousActions), bucket)
private def formattedMetadata(
matchedMetadata: MatchedMetadata,
previousActions: LazyList[Action]): TaggedMetadata = {
val remoteExists = matchedMetadata.matchByKey.nonEmpty
val remoteMatches = remoteExists && matchedMetadata.matchByKey.exists(m =>
LocalFile.matchesHash(matchedMetadata.localFile)(m.hash))
val anyMatches = matchedMetadata.matchByHash.nonEmpty
TaggedMetadata(matchedMetadata,
previousActions,
remoteExists,
remoteMatches,
anyMatches)
}
final case class TaggedMetadata(
matchedMetadata: MatchedMetadata,
previousActions: LazyList[Action],
remoteExists: Boolean,
remoteMatches: Boolean,
anyMatches: Boolean
)
private def genAction(taggedMetadata: TaggedMetadata,
bucket: Bucket): LazyList[Action] = {
taggedMetadata match {
case TaggedMetadata(md, _, remoteExists, remoteMatches, _)
if remoteExists && remoteMatches =>
doNothing(bucket, md.localFile.remoteKey)
case TaggedMetadata(md, _, _, _, anyMatches) if anyMatches =>
copyFile(bucket, md.localFile, md.matchByHash)
case TaggedMetadata(md, previous, _, _, _)
if isNotUploadAlreadyQueued(previous)(md.localFile) =>
uploadFile(bucket, md.localFile)
case TaggedMetadata(md, _, _, _, _) =>
doNothing(bucket, md.localFile.remoteKey)
}
}
private def key = LocalFile.remoteKey ^|-> RemoteKey.key
def isNotUploadAlreadyQueued(
previousActions: LazyList[Action]
)(
localFile: LocalFile
): Boolean = !previousActions.exists {
case ToUpload(_, lf, _) => key.get(lf) === key.get(localFile)
case _ => false
}
private def doNothing(
bucket: Bucket,
remoteKey: RemoteKey
) = LazyList(DoNothing(bucket, remoteKey, 0L))
private def uploadFile(
bucket: Bucket,
localFile: LocalFile
) = LazyList(ToUpload(bucket, localFile, localFile.file.length))
private def copyFile(
bucket: Bucket,
localFile: LocalFile,
remoteMetaData: Option[RemoteMetaData]
) =
LazyList
.from(remoteMetaData)
.take(1)
.map(
other =>
ToCopy(bucket,
other.remoteKey,
other.hash,
localFile.remoteKey,
localFile.file.length))
}

View file

@ -1,21 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.storage.Storage
import zio.ZIO
import zio.clock.Clock
object CoreTypes {
type CoreEnv = Storage
with Console
with Config
with Clock
with FileSystem
with Hasher
with FileScanner
type CoreProgram[A] = ZIO[CoreEnv, Throwable, A]
}

View file

@ -1,8 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain.StorageQueueEvent
final case class EventQueue(
events: LazyList[StorageQueueEvent],
bytesInQueue: Long
)

View file

@ -1,20 +0,0 @@
package net.kemitix.thorp.lib
import java.nio.file.Path
import net.kemitix.thorp.domain.{RemoteKey, Sources}
import zio.Task
object KeyGenerator {
def generateKey(
sources: Sources,
prefix: RemoteKey
)(path: Path): Task[RemoteKey] =
Sources
.forPath(path)(sources)
.map(_.relativize(path.toAbsolutePath))
.map(_.toFile.getPath)
.map(RemoteKey.resolve(_)(prefix))
}

View file

@ -1,80 +0,0 @@
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.Sources
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import zio.{RIO, Task, ZIO}
object LocalFileStream {
def findFiles(
source: Path
): RIO[Config with FileSystem with Hasher, LocalFiles] = {
def recurseIntoSubDirectories(
path: Path): RIO[Config with FileSystem with Hasher, LocalFiles] =
path.toFile match {
case f if f.isDirectory => loop(path)
case _ => localFile(path)
}
def recurse(paths: LazyList[Path])
: RIO[Config with FileSystem with Hasher, LocalFiles] =
for {
recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path))
} yield LocalFiles.reduce(LazyList.from(recursed))
def loop(path: Path): RIO[Config with FileSystem with Hasher, LocalFiles] =
dirPaths(path) >>= recurse
loop(source)
}
private def dirPaths(path: Path) =
listFiles(path) >>= includedDirPaths
private def includedDirPaths(paths: LazyList[Path]) =
for {
flaggedPaths <- RIO.foreach(paths)(path =>
isIncluded(path).map((path, _)))
} yield
LazyList
.from(flaggedPaths)
.filter({ case (_, included) => included })
.map({ case (path, _) => path })
private def localFile(path: Path) =
for {
sources <- Config.sources
prefix <- Config.prefix
source <- Sources.forPath(path)(sources)
hash <- Hasher.hashObject(path)
localFile <- LocalFileValidator.validate(path,
source.toFile,
hash,
sources,
prefix)
} yield LocalFiles.one(localFile)
private def listFiles(path: Path) =
for {
files <- Task(path.toFile.listFiles)
_ <- filesMustExist(path, files)
} yield LazyList.from(files.toIndexedSeq).map(_.toPath)
private def filesMustExist(path: Path, files: Array[File]) =
Task {
Option(files)
.map(_ => ())
.getOrElse(new IllegalArgumentException(s"Directory not found $path"))
}
private def isIncluded(path: Path) =
for {
filters <- Config.filters
} yield Filters.isIncluded(path)(filters)
}

View file

@ -25,13 +25,13 @@ trait LocalFileSystem {
archive: ThorpArchive archive: ThorpArchive
): RIO[ ): RIO[
Clock with Config with Hasher with FileSystem with FileScanner with Storage, Clock with Config with Hasher with FileSystem with FileScanner with Storage,
Seq[StorageQueueEvent]] Seq[StorageEvent]]
def scanDelete( def scanDelete(
uiChannel: UChannel[Any, UIEvent], uiChannel: UChannel[Any, UIEvent],
remoteData: RemoteObjects, remoteData: RemoteObjects,
archive: UnversionedMirrorArchive.type archive: UnversionedMirrorArchive.type
): RIO[Clock with Config with FileSystem with Storage, Seq[StorageQueueEvent]] ): RIO[Clock with Config with FileSystem with Storage, Seq[StorageEvent]]
} }
object LocalFileSystem extends LocalFileSystem { object LocalFileSystem extends LocalFileSystem {
@ -42,12 +42,12 @@ object LocalFileSystem extends LocalFileSystem {
archive: ThorpArchive archive: ThorpArchive
): RIO[ ): RIO[
Clock with Hasher with FileSystem with Config with FileScanner with Storage, Clock with Hasher with FileSystem with Config with FileScanner with Storage,
Seq[StorageQueueEvent]] = Seq[StorageEvent]] =
for { for {
actionCounter <- Ref.make(0) actionCounter <- Ref.make(0)
bytesCounter <- Ref.make(0L) bytesCounter <- Ref.make(0L)
uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]]) uploads <- Ref.make(Map.empty[MD5Hash, Promise[Throwable, RemoteKey]])
eventsRef <- Ref.make(List.empty[StorageQueueEvent]) eventsRef <- Ref.make(List.empty[StorageEvent])
fileSender <- FileScanner.scanSources fileSender <- FileScanner.scanSources
fileReceiver <- fileReceiver(uiChannel, fileReceiver <- fileReceiver(uiChannel,
remoteObjects, remoteObjects,
@ -64,12 +64,11 @@ object LocalFileSystem extends LocalFileSystem {
uiChannel: UChannel[Any, UIEvent], uiChannel: UChannel[Any, UIEvent],
remoteData: RemoteObjects, remoteData: RemoteObjects,
archive: UnversionedMirrorArchive.type archive: UnversionedMirrorArchive.type
): RIO[Clock with Config with FileSystem with Storage, ): RIO[Clock with Config with FileSystem with Storage, Seq[StorageEvent]] =
Seq[StorageQueueEvent]] =
for { for {
actionCounter <- Ref.make(0) actionCounter <- Ref.make(0)
bytesCounter <- Ref.make(0L) bytesCounter <- Ref.make(0L)
eventsRef <- Ref.make(List.empty[StorageQueueEvent]) eventsRef <- Ref.make(List.empty[StorageEvent])
keySender <- keySender(remoteData.byKey.keys) keySender <- keySender(remoteData.byKey.keys)
keyReceiver <- keyReceiver(uiChannel, keyReceiver <- keyReceiver(uiChannel,
archive, archive,
@ -87,7 +86,7 @@ object LocalFileSystem extends LocalFileSystem {
uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]], uploads: Ref[Map[MD5Hash, Promise[Throwable, RemoteKey]]],
actionCounterRef: Ref[Int], actionCounterRef: Ref[Int],
bytesCounterRef: Ref[Long], bytesCounterRef: Ref[Long],
eventsRef: Ref[List[StorageQueueEvent]] eventsRef: Ref[List[StorageEvent]]
): UIO[MessageChannel.UReceiver[Clock with Config with Storage, ): UIO[MessageChannel.UReceiver[Clock with Config with Storage,
FileScanner.ScannedFile]] = FileScanner.ScannedFile]] =
UIO { message => UIO { message =>
@ -135,7 +134,7 @@ object LocalFileSystem extends LocalFileSystem {
remoteForHash match { remoteForHash match {
case Some((sourceKey, hash)) => case Some((sourceKey, hash)) =>
doCopy(localFile, bucket, sourceKey, hash) doCopy(localFile, bucket, sourceKey, hash)
case _ if (matchesPreviousUpload(previous, localFile.hashes)) => case _ if matchesPreviousUpload(previous, localFile.hashes) =>
doCopyWithPreviousUpload(localFile, bucket, previous, uiChannel) doCopyWithPreviousUpload(localFile, bucket, previous, uiChannel)
case _ => case _ =>
doUpload(localFile, bucket) doUpload(localFile, bucket)
@ -218,7 +217,7 @@ object LocalFileSystem extends LocalFileSystem {
archive: ThorpArchive, archive: ThorpArchive,
actionCounterRef: Ref[Int], actionCounterRef: Ref[Int],
bytesCounterRef: Ref[Long], bytesCounterRef: Ref[Long],
eventsRef: Ref[List[StorageQueueEvent]] eventsRef: Ref[List[StorageEvent]]
): UIO[ ): UIO[
MessageChannel.UReceiver[Clock with Config with FileSystem with Storage, MessageChannel.UReceiver[Clock with Config with FileSystem with Storage,
RemoteKey]] = RemoteKey]] =

View file

@ -1,67 +0,0 @@
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
import net.kemitix.thorp.domain.{
HashType,
LocalFile,
MD5Hash,
RemoteKey,
Sources
}
import zio.{IO, ZIO}
object LocalFileValidator {
def validate(
path: Path,
source: File,
hash: Map[HashType, MD5Hash],
sources: Sources,
prefix: RemoteKey
): IO[Violation, LocalFile] =
for {
file <- validateFile(path.toFile)
remoteKey <- validateRemoteKey(sources, prefix, path)
} yield LocalFile(file, source, hash, remoteKey, file.length)
private def validateFile(file: File): IO[Violation, File] =
if (file.isDirectory)
ZIO.fail(Violation.IsNotAFile(file))
else
ZIO.succeed(file)
private def validateRemoteKey(sources: Sources,
prefix: RemoteKey,
path: Path): IO[Violation, RemoteKey] =
KeyGenerator
.generateKey(sources, prefix)(path)
.mapError(e => Violation.InvalidRemoteKey(path, e))
sealed trait Violation extends Throwable {
def getMessage: String
}
object Violation {
final case class IsNotAFile(file: File) extends Violation {
override def getMessage: String = s"Local File must be a file: ${file}"
}
final case class InvalidRemoteKey(path: Path, e: Throwable)
extends Violation {
override def getMessage: String =
s"Remote Key for '${path}' is invalid: ${e.getMessage}"
}
}
def resolve(
path: String,
md5Hashes: Map[HashType, MD5Hash],
source: Path,
sources: Sources,
prefix: RemoteKey
): IO[Violation, LocalFile] = {
val resolvedPath = source.resolve(path)
validate(resolvedPath, source.toFile, md5Hashes, sources, prefix)
}
}

View file

@ -1,24 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain.LocalFile
final case class LocalFiles(
localFiles: LazyList[LocalFile],
count: Long,
totalSizeBytes: Long
) {
def ++(append: LocalFiles): LocalFiles =
copy(
localFiles = localFiles ++ append.localFiles,
count = count + append.count,
totalSizeBytes = totalSizeBytes + append.totalSizeBytes
)
}
object LocalFiles {
val empty: LocalFiles = LocalFiles(LazyList.empty, 0L, 0L)
def reduce: LazyList[LocalFiles] => LocalFiles =
list => list.foldLeft(LocalFiles.empty)((acc, lf) => acc ++ lf)
def one(localFile: LocalFile): LocalFiles =
LocalFiles(LazyList(localFile), 1, localFile.file.length)
}

View file

@ -1,84 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.Action._
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.storage.Storage
import zio.{RIO, ZIO}
object PlanBuilder {
def createPlan(remoteObjects: RemoteObjects)
: RIO[Storage with Console with Config with FileSystem with Hasher,
SyncPlan] = findLocalFiles >>= assemblePlan(remoteObjects)
private def assemblePlan(remoteObjects: RemoteObjects)(
localData: LocalFiles) =
createActions(remoteObjects, localData.localFiles)
.map(_.filter(doesSomething).sortBy(SequencePlan.order))
.map(syncPlan(localData))
private def syncPlan(localData: LocalFiles): LazyList[Action] => SyncPlan =
SyncPlan.create(_, syncTotal(localData))
private def syncTotal(localData: LocalFiles): SyncTotals =
SyncTotals.create(localData.count, localData.totalSizeBytes, 0L)
private def createActions(remoteObjects: RemoteObjects,
localFiles: LazyList[LocalFile]) =
for {
fileActions <- actionsForLocalFiles(remoteObjects, localFiles)
remoteActions <- actionsForRemoteKeys(remoteObjects.byKey.keys)
} yield fileActions ++ remoteActions
private def doesSomething: Action => Boolean = {
case _: DoNothing => false
case _ => true
}
private def actionsForLocalFiles(remoteObjects: RemoteObjects,
localFiles: LazyList[LocalFile]) =
ZIO.foldLeft(localFiles)(LazyList.empty[Action])(
(acc, localFile) =>
createActionsFromLocalFile(remoteObjects, acc, localFile)
.map(_ #::: acc)
)
private def createActionsFromLocalFile(remoteObjects: RemoteObjects,
previousActions: LazyList[Action],
localFile: LocalFile) =
ActionGenerator.createActions(
S3MetaDataEnricher.getMetadata(localFile, remoteObjects),
previousActions
)
private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) =
ZIO.foldLeft(remoteKeys)(LazyList.empty[Action])(
(acc, remoteKey) => createActionFromRemoteKey(remoteKey).map(_ #:: acc)
)
private def createActionFromRemoteKey(
remoteKey: RemoteKey
): ZIO[FileSystem with Config, Throwable, Action] =
for {
bucket <- Config.bucket
prefix <- Config.prefix
sources <- Config.sources
needsDeleted <- Remote.isMissingLocally(sources, prefix, remoteKey)
} yield
if (needsDeleted) ToDelete(bucket, remoteKey, 0L)
else DoNothing(bucket, remoteKey, 0L)
private def findLocalFiles =
SyncLogging.logFileScan *> findFiles
private def findFiles =
for {
sources <- Config.sources
found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles)
_ <- Console.putStrLn(s"Found ${found.flatMap(_.localFiles).size} files")
} yield LocalFiles.reduce(LazyList.from(found))
}

View file

@ -1,55 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.{Action, StorageQueueEvent}
import net.kemitix.thorp.storage.Storage
import zio.{Ref, ZIO}
trait PlanExecutor {
def executePlan(
archive: ThorpArchive,
syncPlan: SyncPlan
): ZIO[Storage with Config with Console,
Throwable,
Seq[
StorageQueueEvent
]] =
for {
actionCounter <- Ref.make(0)
bytesCounter <- Ref.make(0L)
events <- applyActions(archive, syncPlan, actionCounter, bytesCounter)
} yield events
private def applyActions(
archive: ThorpArchive,
syncPlan: SyncPlan,
actionCounter: Ref[Int],
bytesCounter: Ref[Long]
): ZIO[Storage with Console with Config,
Throwable,
LazyList[StorageQueueEvent]] = {
ZIO.foldLeft(syncPlan.actions)(LazyList.empty[StorageQueueEvent]) {
(stream: LazyList[StorageQueueEvent], action) =>
val result: ZIO[Storage with Console with Config,
Throwable,
StorageQueueEvent] =
updateArchive(archive, actionCounter, bytesCounter)(action)
result.map(event => event #:: stream)
}
}
private def updateArchive(archive: ThorpArchive,
actionCounterRef: Ref[Int],
bytesCounterRef: Ref[Long])(action: Action) =
for {
actionCounter <- actionCounterRef.update(_ + 1)
bytesCounter <- bytesCounterRef.update(_ + action.size)
event <- archive.update(SequencedAction(action, actionCounter),
bytesCounter)
} yield event
}
object PlanExecutor extends PlanExecutor

View file

@ -1,30 +0,0 @@
package net.kemitix.thorp.lib
import java.nio.file.Path
import net.kemitix.thorp.domain.{RemoteKey, Sources}
import net.kemitix.thorp.filesystem.FileSystem
import zio.{RIO, ZIO}
object Remote {
def isMissingLocally(sources: Sources,
prefix: RemoteKey,
remoteKey: RemoteKey): RIO[FileSystem, Boolean] =
existsLocally(sources, prefix)(remoteKey)
.map(exists => !exists)
def existsLocally(sources: Sources, prefix: RemoteKey)(
remoteKey: RemoteKey
): RIO[FileSystem, Boolean] = {
def existsInSource(source: Path) =
RemoteKey.asFile(source, prefix)(remoteKey) match {
case Some(file) => FileSystem.exists(file)
case None => ZIO.succeed(false)
}
ZIO
.foreach(sources.paths)(existsInSource)
.map(lb => lb.exists(l => l))
}
}

View file

@ -1,50 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain._
object S3MetaDataEnricher {
def getMetadata(
localFile: LocalFile,
remoteObjects: RemoteObjects
): MatchedMetadata = {
val (keyMatches, hashMatches) = getS3Status(localFile, remoteObjects)
val maybeByKey: Option[RemoteMetaData] = keyMatches.map { hash =>
RemoteMetaData(localFile.remoteKey, hash)
}
val maybeByHash: Option[RemoteMetaData] = hashMatches.map {
case (md5Hash, remoteKey) =>
RemoteMetaData(remoteKey, md5Hash)
}.headOption
MatchedMetadata(
localFile,
matchByKey = maybeByKey,
matchByHash = maybeByHash
)
}
def getS3Status(
localFile: LocalFile,
remoteObjects: RemoteObjects
): (Option[MD5Hash], Map[MD5Hash, RemoteKey]) = {
val byKey: Option[MD5Hash] =
remoteObjects.byKey.get(localFile.remoteKey)
val hashes: Map[HashType, MD5Hash] = localFile.hashes
val byHash: Map[MD5Hash, RemoteKey] =
hashes
.map {
case (hashType, md5Hash) =>
(md5Hash, remoteObjects.byHash.get(md5Hash))
}
.flatMap {
case (md5Hash, Some(maybeyKey)) => Some((md5Hash, maybeyKey))
case (_, None) => None
}
(byKey, byHash)
}
}

View file

@ -1,17 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain.Action
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
trait SequencePlan {
def order: Action => Int = {
case _: DoNothing => 0
case _: ToCopy => 1
case _: ToUpload => 2
case _: ToDelete => 3
}
}
object SequencePlan extends SequencePlan

View file

@ -1,18 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._
import zio.ZIO
trait SyncLogging {
def logFileScan: ZIO[Config with Console, Nothing, Unit] =
for {
sources <- Config.sources
_ <- Console.putStrLn(
s"Scanning local files: ${sources.paths.mkString(", ")}...")
} yield ()
}
object SyncLogging extends SyncLogging

View file

@ -1,14 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.domain.{Action, SyncTotals}
final case class SyncPlan private (
actions: LazyList[Action],
syncTotals: SyncTotals
)
object SyncPlan {
val empty: SyncPlan = SyncPlan(LazyList.empty, SyncTotals.empty)
def create(actions: LazyList[Action], syncTotals: SyncTotals): SyncPlan =
SyncPlan(actions, syncTotals)
}

View file

@ -8,8 +8,8 @@ import net.kemitix.thorp.console.ConsoleOut.{
UploadComplete UploadComplete
} }
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageQueueEvent import net.kemitix.thorp.domain.StorageEvent
import net.kemitix.thorp.domain.StorageQueueEvent._ import net.kemitix.thorp.domain.StorageEvent._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import zio.{RIO, ZIO} import zio.{RIO, ZIO}
@ -18,29 +18,28 @@ trait ThorpArchive {
def update( def update(
sequencedAction: SequencedAction, sequencedAction: SequencedAction,
totalBytesSoFar: Long totalBytesSoFar: Long
): ZIO[Storage with Config, Nothing, StorageQueueEvent] ): ZIO[Storage with Config, Nothing, StorageEvent]
def logEvent( def logEvent(event: StorageEvent): RIO[Console with Config, StorageEvent] =
event: StorageQueueEvent): RIO[Console with Config, StorageQueueEvent] =
for { for {
batchMode <- Config.batchMode batchMode <- Config.batchMode
sqe <- event match { sqe <- event match {
case UploadQueueEvent(remoteKey, _) => case UploadEvent(remoteKey, _) =>
ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey), ZIO(event) <* Console.putMessageLnB(UploadComplete(remoteKey),
batchMode) batchMode)
case CopyQueueEvent(sourceKey, targetKey) => case CopyEvent(sourceKey, targetKey) =>
ZIO(event) <* Console.putMessageLnB( ZIO(event) <* Console.putMessageLnB(
CopyComplete(sourceKey, targetKey), CopyComplete(sourceKey, targetKey),
batchMode) batchMode)
case DeleteQueueEvent(remoteKey) => case DeleteEvent(remoteKey) =>
ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey), ZIO(event) <* Console.putMessageLnB(DeleteComplete(remoteKey),
batchMode) batchMode)
case ErrorQueueEvent(action, _, e) => case ErrorEvent(action, _, e) =>
ZIO(event) <* Console.putMessageLnB( ZIO(event) <* Console.putMessageLnB(
ErrorQueueEventOccurred(action, e), ErrorQueueEventOccurred(action, e),
batchMode) batchMode)
case DoNothingQueueEvent(_) => ZIO(event) case DoNothingEvent(_) => ZIO(event)
case ShutdownQueueEvent() => ZIO(event) case ShutdownEvent() => ZIO(event)
} }
} yield sqe } yield sqe

View file

@ -2,7 +2,7 @@ package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain.StorageEvent.DoNothingEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import zio.{UIO, ZIO} import zio.{UIO, ZIO}
@ -12,7 +12,7 @@ trait UnversionedMirrorArchive extends ThorpArchive {
override def update( override def update(
sequencedAction: SequencedAction, sequencedAction: SequencedAction,
totalBytesSoFar: Long totalBytesSoFar: Long
): ZIO[Storage with Config, Nothing, StorageQueueEvent] = ): ZIO[Storage with Config, Nothing, StorageEvent] =
sequencedAction match { sequencedAction match {
case SequencedAction(ToUpload(bucket, localFile, _), index) => case SequencedAction(ToUpload(bucket, localFile, _), index) =>
doUpload(index, totalBytesSoFar, bucket, localFile) doUpload(index, totalBytesSoFar, bucket, localFile)
@ -21,7 +21,7 @@ trait UnversionedMirrorArchive extends ThorpArchive {
case SequencedAction(ToDelete(bucket, remoteKey, _), _) => case SequencedAction(ToDelete(bucket, remoteKey, _), _) =>
Storage.delete(bucket, remoteKey) Storage.delete(bucket, remoteKey)
case SequencedAction(DoNothing(_, remoteKey, _), _) => case SequencedAction(DoNothing(_, remoteKey, _), _) =>
UIO(DoNothingQueueEvent(remoteKey)) UIO(DoNothingEvent(remoteKey))
} }
private def doUpload( private def doUpload(

View file

@ -1,207 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config._
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.{FileSystem, Resource}
import org.scalatest.FunSpec
import zio.DefaultRuntime
class ActionGeneratorSuite extends FunSpec {
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val sources = Sources(List(sourcePath))
private val prefix = RemoteKey("prefix")
private val bucket = Bucket("bucket")
private val configOptions = ConfigOptions(
List[ConfigOption](
ConfigOption.Bucket("bucket"),
ConfigOption.Prefix("prefix"),
ConfigOption.Source(sourcePath),
ConfigOption.IgnoreUserOptions,
ConfigOption.IgnoreGlobalOptions
))
describe("create actions") {
val previousActions = LazyList.empty[Action]
describe("#1 local exists, remote exists, remote matches - do nothing") {
val theHash = MD5Hash("the-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash)
input = MatchedMetadata(
theFile, // local exists
matchByHash = Some(theRemoteMetadata), // remote matches
matchByKey = Some(theRemoteMetadata) // remote exists
)
} yield (theFile, input)
it("do nothing") {
env.map({
case (theFile, input) => {
val expected =
Right(LazyList(
DoNothing(bucket, theFile.remoteKey, theFile.file.length + 1)))
val result = invoke(input, previousActions)
assertResult(expected)(result)
}
})
}
}
describe("#2 local exists, remote is missing, other matches - copy") {
val theHash = MD5Hash("the-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
theRemoteKey = theFile.remoteKey
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
input = MatchedMetadata(
theFile, // local exists
matchByHash = Some(otherRemoteMetadata), // other matches
matchByKey = None) // remote is missing
} yield (theFile, theRemoteKey, input, otherRemoteKey)
it("copy from other key") {
env.map({
case (theFile, theRemoteKey, input, otherRemoteKey) => {
val expected = Right(
LazyList(
ToCopy(bucket,
otherRemoteKey,
theHash,
theRemoteKey,
theFile.file.length))) // copy
val result = invoke(input, previousActions)
assertResult(expected)(result)
}
})
}
describe("#3 local exists, remote is missing, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
input = MatchedMetadata(theFile, // local exists
matchByHash = None, // other no matches
matchByKey = None) // remote is missing
} yield (theFile, input)
it("upload") {
env.map({
case (theFile, input) => {
val expected = Right(LazyList(
ToUpload(bucket, theFile, theFile.file.length))) // upload
val result = invoke(input, previousActions)
assertResult(expected)(result)
}
})
}
}
}
describe(
"#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
theRemoteKey = theFile.remoteKey
oldHash = MD5Hash("old-hash")
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
oldRemoteMetadata = RemoteMetaData(theRemoteKey,
hash = oldHash // remote no match
)
input = MatchedMetadata(
theFile, // local exists
matchByHash = Some(otherRemoteMetadata), // other matches
matchByKey = Some(oldRemoteMetadata)) // remote exists
} yield (theFile, theRemoteKey, input, otherRemoteKey)
it("copy from other key") {
env.map({
case (theFile, theRemoteKey, input, otherRemoteKey) => {
val expected = Right(
LazyList(
ToCopy(bucket,
otherRemoteKey,
theHash,
theRemoteKey,
theFile.file.length))) // copy
val result = invoke(input, previousActions)
assertResult(expected)(result)
}
})
}
}
describe(
"#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
theRemoteKey = theFile.remoteKey
oldHash = MD5Hash("old-hash")
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
input = MatchedMetadata(
theFile, // local exists
matchByHash = None, // remote no match, other no match
matchByKey = Some(theRemoteMetadata) // remote exists
)
} yield (theFile, input)
it("upload") {
env.map({
case (theFile, input) => {
val expected = Right(LazyList(
ToUpload(bucket, theFile, theFile.file.length))) // upload
val result = invoke(input, previousActions)
assertResult(expected)(result)
}
})
}
}
describe("#6 local missing, remote exists - delete") {
it("TODO") {
pending
}
}
}
private def md5HashMap(theHash: MD5Hash): Map[HashType, MD5Hash] = {
Map(MD5 -> theHash)
}
private def invoke(
input: MatchedMetadata,
previousActions: LazyList[Action]
) = {
type TestEnv = Config with FileSystem
val testEnv: TestEnv = new Config.Live with FileSystem.Live {}
def testProgram =
for {
config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- Config.set(config)
actions <- ActionGenerator.createActions(input, previousActions)
} yield actions
new DefaultRuntime {}.unsafeRunSync {
testProgram.provide(testEnv)
}.toEither
}
}

View file

@ -1,42 +0,0 @@
package net.kemitix.thorp.lib
import java.io.File
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage
import zio.{RIO, UIO}
final case class DummyStorageService(
remoteObjects: RemoteObjects,
uploadFiles: Map[File, (RemoteKey, MD5Hash)])
extends Storage.Service {
override def shutdown: UIO[StorageQueueEvent] =
UIO(StorageQueueEvent.ShutdownQueueEvent())
override def listObjects(
bucket: Bucket,
prefix: RemoteKey
): RIO[Storage, RemoteObjects] =
RIO(remoteObjects)
override def upload(
localFile: LocalFile,
bucket: Bucket,
uploadEventListener: UploadEventListener.Settings,
): UIO[StorageQueueEvent] = {
val (remoteKey, md5Hash) = uploadFiles(localFile.file)
UIO(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash))
}
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): UIO[StorageQueueEvent] =
UIO(StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey))
override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageQueueEvent] =
UIO(StorageQueueEvent.DeleteQueueEvent(remoteKey))
}

View file

@ -1,50 +0,0 @@
package net.kemitix.thorp.lib
import org.scalatest.FreeSpec
import zio.stream._
import zio.{DefaultRuntime, IO, UIO, ZIO}
// Experiment on how to trigger events asynchronously
// i.e. Have one thread add to a queue and another take from the queue, neither waiting for the other thread to finish
class EIPTest extends FreeSpec {
"queue" in {
type Callback[A] = IO[Option[Throwable], A] => Unit
def offerInt(cb: Callback[Int], i: Int) = ZIO(cb(IO.succeed(i)))
def closeStream(cb: Callback[Int]) = ZIO(cb(IO.fail(None)))
def publish: Callback[Int] => IO[Throwable, _] =
cb =>
ZIO.foreach(1 to 3) { i =>
ZIO {
println(s"put $i")
Thread.sleep(100)
} *> offerInt(cb, i)
} *> closeStream(cb)
val program = Stream
.effectAsyncM(publish)
.mapM(i => ZIO(println(s"get $i")))
new DefaultRuntime {}.unsafeRunSync(program.runDrain)
}
"EIP: Message Channel" in {
type Message = Int
type Callback = IO[Option[Throwable], Message] => Unit
def producer: Callback => UIO[Unit] =
cb =>
ZIO.foreach(1 to 3)(message =>
UIO {
println(s"put $message")
cb(ZIO.succeed(message))
Thread.sleep(100)
}) *> UIO(cb(ZIO.fail(None)))
def consumer: Message => ZIO[Any, Throwable, Unit] =
message => ZIO(println(s"got $message"))
val program = zio.stream.Stream
.effectAsyncM(producer)
.buffer(1)
.mapM(consumer)
new DefaultRuntime {}.unsafeRunSync(program.runDrain)
}
}

View file

@ -1,45 +0,0 @@
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
import net.kemitix.thorp.domain.{RemoteKey, Sources}
import net.kemitix.thorp.filesystem.Resource
import org.scalatest.FunSpec
import zio.DefaultRuntime
class KeyGeneratorSuite extends FunSpec {
private val source: File = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix")
private val sources = Sources(List(sourcePath))
describe("key generator") {
describe("when file is within source") {
it("has a valid key") {
val subdir = "subdir"
val expected = Right(RemoteKey(s"${prefix.key}/$subdir"))
val result = invoke(sourcePath.resolve(subdir))
assertResult(expected)(result)
}
}
describe("when file is deeper within source") {
it("has a valid key") {
val subdir = "subdir/deeper/still"
val expected = Right(RemoteKey(s"${prefix.key}/$subdir"))
val result = invoke(sourcePath.resolve(subdir))
assertResult(expected)(result)
}
}
def invoke(path: Path) = {
new DefaultRuntime {}.unsafeRunSync {
KeyGenerator.generateKey(sources, prefix)(path)
}.toEither
}
}
}

View file

@ -1,92 +0,0 @@
package net.kemitix.thorp.lib
import java.nio.file.Paths
import net.kemitix.thorp.config.{
Config,
ConfigOption,
ConfigOptions,
ConfigurationBuilder
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.{FileSystem, Hasher, Resource}
import net.kemitix.thorp.storage.Storage
import org.scalatest.FunSpec
import zio.{DefaultRuntime, Task, UIO}
class LocalFileStreamSuite extends FunSpec {
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private def file(filename: String) =
sourcePath.resolve(Paths.get(filename))
describe("findFiles") {
it("should find all files") {
val expected = Right(Set("subdir/leaf-file", "root-file"))
val result =
invoke()
.map(_.localFiles)
.map(_.map(LocalFile.relativeToSource(_).toFile.getPath))
.map(_.toSet)
assertResult(expected)(result)
}
it("should count all files") {
val expected = Right(2)
val result = invoke().map(_.count)
assertResult(expected)(result)
}
it("should sum the size of all files") {
val expected = Right(113)
val result = invoke().map(_.totalSizeBytes)
assertResult(expected)(result)
}
}
private def invoke() = {
type TestEnv = Storage
with Console
with Config
with FileSystem
with Hasher.Test
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live
with FileSystem.Live with Hasher.Test {
override def listResult: Task[RemoteObjects] =
Task.die(new NotImplementedError)
override def uploadResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def copyResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def deleteResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def shutdownResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
}
testEnv.hashes.set(
Map(
file("root-file") -> Map(MD5 -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash)
))
val configOptions = ConfigOptions(
List[ConfigOption](
ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions,
ConfigOption.Source(sourcePath),
ConfigOption.Bucket("aBucket")
))
def testProgram =
for {
config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- Config.set(config)
files <- LocalFileStream.findFiles(sourcePath)
} yield files
new DefaultRuntime {}.unsafeRunSync {
testProgram.provide(testEnv)
}.toEither
}
}

View file

@ -1,310 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.lib.S3MetaDataEnricher.{getMetadata, getS3Status}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.Resource
import org.scalatest.FunSpec
import scala.collection.MapView
class MatchedMetadataEnricherSuite extends FunSpec {
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val sources = Sources(List(sourcePath))
private val prefix = RemoteKey("prefix")
def getMatchesByKey(
status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])): Option[MD5Hash] = {
val (byKey, _) = status
byKey
}
describe("enrich with metadata") {
describe(
"#1a local exists, remote exists, remote matches, other matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
theRemoteKey = theFile.remoteKey
remoteObjects = RemoteObjects(
byHash = MapView(theHash -> theRemoteKey),
byKey = MapView(theRemoteKey -> theHash)
)
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
} yield (theFile, theRemoteMetadata, remoteObjects)
it("generates valid metadata") {
env.map({
case (theFile, theRemoteMetadata, remoteObjects) => {
val expected =
MatchedMetadata(theFile,
matchByHash = Some(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
}
})
}
}
describe(
"#1b local exists, remote exists, remote matches, other no matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
theRemoteKey: RemoteKey = RemoteKey.resolve("the-file")(prefix)
remoteObjects = RemoteObjects(
byHash = MapView(theHash -> theRemoteKey),
byKey = MapView(theRemoteKey -> theHash)
)
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
} yield (theFile, theRemoteMetadata, remoteObjects)
it("generates valid metadata") {
env.map({
case (theFile, theRemoteMetadata, remoteObjects) => {
val expected =
MatchedMetadata(theFile,
matchByHash = Some(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
}
})
}
}
describe(
"#2 local exists, remote is missing, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
otherRemoteKey = RemoteKey("other-key")
remoteObjects = RemoteObjects(
byHash = MapView(theHash -> otherRemoteKey),
byKey = MapView(otherRemoteKey -> theHash)
)
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
} yield (theFile, otherRemoteMetadata, remoteObjects)
it("generates valid metadata") {
env.map({
case (theFile, otherRemoteMetadata, remoteObjects) => {
val expected = MatchedMetadata(theFile,
matchByHash =
Some(otherRemoteMetadata),
matchByKey = None)
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
}
})
}
}
describe(
"#3 local exists, remote is missing, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
remoteObjects = RemoteObjects.empty
} yield (theFile, remoteObjects)
it("generates valid metadata") {
env.map({
case (theFile, remoteObjects) => {
val expected =
MatchedMetadata(theFile, matchByHash = None, matchByKey = None)
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
}
})
}
}
describe(
"#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
theRemoteKey = theFile.remoteKey
oldHash = MD5Hash("old-hash")
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
remoteObjects = RemoteObjects(
byHash = MapView(oldHash -> theRemoteKey, theHash -> otherRemoteKey),
byKey = MapView(
theRemoteKey -> oldHash,
otherRemoteKey -> theHash
)
)
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
} yield (theFile, theRemoteMetadata, otherRemoteMetadata, remoteObjects)
it("generates valid metadata") {
env.map({
case (theFile,
theRemoteMetadata,
otherRemoteMetadata,
remoteObjects) => {
val expected = MatchedMetadata(theFile,
matchByHash =
Some(otherRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
}
})
}
}
describe(
"#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val env = for {
theFile <- LocalFileValidator.resolve("the-file",
md5HashMap(theHash),
sourcePath,
sources,
prefix)
theRemoteKey = theFile.remoteKey
oldHash = MD5Hash("old-hash")
remoteObjects = RemoteObjects(
byHash = MapView(oldHash -> theRemoteKey),
byKey = MapView(theRemoteKey -> oldHash)
)
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
} yield (theFile, theRemoteMetadata, remoteObjects)
it("generates valid metadata") {
env.map({
case (theFile, theRemoteMetadata, remoteObjects) => {
val expected = MatchedMetadata(theFile,
matchByHash = None,
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile, remoteObjects)
assertResult(expected)(result)
}
})
}
}
}
private def md5HashMap(theHash: MD5Hash): Map[HashType, MD5Hash] = {
Map(MD5 -> theHash)
}
describe("getS3Status") {
val hash = MD5Hash("hash")
val env = for {
localFile <- LocalFileValidator.resolve("the-file",
md5HashMap(hash),
sourcePath,
sources,
prefix)
key = localFile.remoteKey
keyOtherKey <- LocalFileValidator.resolve("other-key-same-hash",
md5HashMap(hash),
sourcePath,
sources,
prefix)
diffHash = MD5Hash("diff")
keyDiffHash <- LocalFileValidator.resolve("other-key-diff-hash",
md5HashMap(diffHash),
sourcePath,
sources,
prefix)
remoteObjects = RemoteObjects(
byHash = MapView(
hash -> key,
diffHash -> keyDiffHash.remoteKey
),
byKey = MapView(
key -> hash,
keyOtherKey.remoteKey -> hash,
keyDiffHash.remoteKey -> diffHash
)
)
} yield (remoteObjects, localFile, keyDiffHash, diffHash)
def invoke(localFile: LocalFile, s3ObjectsData: RemoteObjects) = {
getS3Status(localFile, s3ObjectsData)
}
describe("when remote key exists") {
it("should return a result for matching key") {
env.map({
case (remoteObjects, localFile: LocalFile, _, _) =>
val result = getMatchesByKey(invoke(localFile, remoteObjects))
assert(result.contains(hash))
})
}
}
describe("when remote key does not exist and no others matches hash") {
val env2 = for {
localFile <- LocalFileValidator.resolve("missing-remote",
md5HashMap(MD5Hash("unique")),
sourcePath,
sources,
prefix)
} yield (localFile)
it("should return no matches by key") {
env.map({
case (remoteObjects, _, _, _) => {
env2.map({
case (localFile) => {
val result = getMatchesByKey(invoke(localFile, remoteObjects))
assert(result.isEmpty)
}
})
}
})
}
it("should return no matches by hash") {
env.map({
case (remoteObjects, _, _, _) => {
env2.map({
case (localFile) => {
val result = {
val (_, byHash) = invoke(localFile, remoteObjects)
byHash
}
assert(result.isEmpty)
}
})
}
})
}
}
describe("when remote key exists and no others match hash") {
val _ = env.map({
case (remoteObjects, _, keyDiffHash, diffHash) => {
it("should return match by key") {
val result = getMatchesByKey(invoke(keyDiffHash, remoteObjects))
assert(result.contains(diffHash))
}
it("should return only itself in match by hash") {
val result = {
val (_, byHash) = invoke(keyDiffHash, remoteObjects)
byHash
}
assert(result === Set((keyDiffHash.remoteKey, diffHash)))
}
}
})
}
}
}

View file

@ -1,397 +0,0 @@
package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path
import net.kemitix.thorp.config.{
Config,
ConfigOption,
ConfigOptions,
ConfigurationBuilder
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.{Hasher, _}
import net.kemitix.thorp.storage.Storage
import org.scalatest.FreeSpec
import zio.{DefaultRuntime, Task, UIO}
import scala.collection.MapView
class PlanBuilderTest extends FreeSpec with TemporaryFolder {
private val emptyRemoteObjects = RemoteObjects.empty
"create a plan" - {
"one source" - {
val options: Path => ConfigOptions =
source =>
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket"),
ConfigOption.IgnoreUserOptions,
ConfigOption.IgnoreGlobalOptions)
"a file" - {
val filename = "aFile"
val remoteKey = RemoteKey(filename)
"with no matching remote key" - {
"with no other remote key with matching hash" - {
"upload file" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val hash = md5Hash(file)
val expected =
Right(List(toUpload(remoteKey, hash, source, file)))
val result =
invoke(options(source),
UIO.succeed(emptyRemoteObjects),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
})
}
}
"with another remote key with matching hash" - {
"copy file" in {
withDirectory(source => {
val anOtherFilename = "other"
val content = "file-content"
val aFile = createFile(source, filename, content)
val anOtherFile = createFile(source, anOtherFilename, content)
val aHash = md5Hash(aFile)
val anOtherKey = RemoteKey("other")
val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey)))
val remoteObjects = RemoteObjects(
byHash = MapView(aHash -> anOtherKey),
byKey = MapView(anOtherKey -> aHash)
)
val result =
invoke(options(source),
UIO.succeed(remoteObjects),
UIO.succeed(Map(aFile.toPath -> aFile,
anOtherFile.toPath -> anOtherFile)))
assertResult(expected)(result)
})
}
}
}
"with matching remote key" - {
"with matching hash" - {
"do nothing" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val hash = md5Hash(file)
// DoNothing actions should have been filtered out of the plan
val expected = Right(List())
val remoteObjects = RemoteObjects(
byHash = MapView(hash -> remoteKey),
byKey = MapView(remoteKey -> hash)
)
val result =
invoke(options(source),
UIO.succeed(remoteObjects),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
})
}
}
"with different hash" - {
"with no matching remote hash" - {
"upload file" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val currentHash = md5Hash(file)
val originalHash = MD5Hash("original-file-content")
val expected =
Right(List(toUpload(remoteKey, currentHash, source, file)))
val remoteObjects = RemoteObjects(
byHash = MapView(originalHash -> remoteKey),
byKey = MapView(remoteKey -> originalHash)
)
val result =
invoke(options(source),
UIO.succeed(remoteObjects),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
})
}
}
"with matching remote hash" - {
"copy file" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val hash = md5Hash(file)
val sourceKey = RemoteKey("other-key")
val expected = Right(List(toCopy(sourceKey, hash, remoteKey)))
val remoteObjects = RemoteObjects(
byHash = MapView(hash -> sourceKey),
byKey = MapView.empty
)
val result =
invoke(options(source),
UIO.succeed(remoteObjects),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
})
}
}
}
}
}
"a remote key" - {
val filename = "aFile"
val remoteKey = RemoteKey(filename)
"with a matching local file" - {
"do nothing" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val hash = md5Hash(file)
// DoNothing actions should have been filtered out of the plan
val expected = Right(List())
val remoteObjects = RemoteObjects(
byHash = MapView(hash -> remoteKey),
byKey = MapView(remoteKey -> hash)
)
val result =
invoke(options(source),
UIO.succeed(remoteObjects),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
})
}
}
"with no matching local file" - {
"delete remote key" in {
withDirectory(source => {
val hash = MD5Hash("file-content")
val expected = Right(List(toDelete(remoteKey)))
val remoteObjects = RemoteObjects(
byHash = MapView(hash -> remoteKey),
byKey = MapView(remoteKey -> hash)
)
val result =
invoke(options(source),
UIO.succeed(remoteObjects),
UIO.succeed(Map.empty))
assertResult(expected)(result)
})
}
}
}
}
"two sources" - {
val filename1 = "file-1"
val filename2 = "file-2"
val remoteKey1 = RemoteKey(filename1)
val remoteKey2 = RemoteKey(filename2)
val options: Path => Path => ConfigOptions =
source1 =>
source2 =>
configOptions(ConfigOption.Source(source1),
ConfigOption.Source(source2),
ConfigOption.Bucket("a-bucket"))
"unique files in both" - {
"upload all files" in {
withDirectory(firstSource => {
val fileInFirstSource =
createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => {
val fileInSecondSource =
createFile(secondSource, filename2, "file-2-content")
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(
Set(
toUpload(remoteKey2, hash2, secondSource, fileInSecondSource),
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)
))
val result =
invoke(
options(firstSource)(secondSource),
UIO.succeed(emptyRemoteObjects),
UIO.succeed(
Map(fileInFirstSource.toPath -> fileInFirstSource,
fileInSecondSource.toPath -> fileInSecondSource))
).map(_.toSet)
assertResult(expected)(result)
})
})
}
}
"same filename in both" - {
"only upload file in first source" in {
withDirectory(firstSource => {
val fileInFirstSource =
createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => {
val fileInSecondSource =
createFile(secondSource, filename1, "file-2-content")
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List(
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)))
val result =
invoke(
options(firstSource)(secondSource),
UIO.succeed(emptyRemoteObjects),
UIO.succeed(
Map(fileInFirstSource.toPath -> fileInFirstSource,
fileInSecondSource.toPath -> fileInSecondSource))
)
assertResult(expected)(result)
})
})
}
}
"with a remote file only present in second source" - {
"do not delete it " in {
withDirectory(firstSource => {
withDirectory(secondSource => {
val fileInSecondSource =
createFile(secondSource, filename2, "file-2-content")
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List())
val remoteObjects =
RemoteObjects(byHash = MapView(hash2 -> remoteKey2),
byKey = MapView(remoteKey2 -> hash2))
val result =
invoke(options(firstSource)(secondSource),
UIO.succeed(remoteObjects),
UIO.succeed(
Map(fileInSecondSource.toPath -> fileInSecondSource)))
assertResult(expected)(result)
})
})
}
}
"with remote file only present in first source" - {
"do not delete it" in {
withDirectory(firstSource => {
val fileInFirstSource: File =
createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => {
val expected = Right(List())
val remoteObjects =
RemoteObjects(byHash = MapView(hash1 -> remoteKey1),
byKey = MapView(remoteKey1 -> hash1))
val result =
invoke(options(firstSource)(secondSource),
UIO.succeed(remoteObjects),
UIO.succeed(
Map(fileInFirstSource.toPath -> fileInFirstSource)))
assertResult(expected)(result)
})
})
}
}
"with remote file not present in either source" - {
"delete from remote" in {
withDirectory(firstSource => {
withDirectory(secondSource => {
val expected = Right(List(toDelete(remoteKey1)))
val remoteObjects =
RemoteObjects(byHash = MapView.empty,
byKey = MapView(remoteKey1 -> MD5Hash("")))
val result =
invoke(options(firstSource)(secondSource),
UIO.succeed(remoteObjects),
UIO.succeed(Map.empty))
assertResult(expected)(result)
})
})
}
}
}
def md5Hash(file: File): MD5Hash = {
object TestEnv extends Hasher.Live with FileSystem.Live
new DefaultRuntime {}
.unsafeRunSync {
Hasher
.hashObject(file.toPath)
.map(_.get(MD5))
.provide(TestEnv)
}
.toEither
.toOption
.flatten
.getOrElse(MD5Hash("invalid md5 hash in test"))
}
}
private def toUpload(remoteKey: RemoteKey,
md5Hash: MD5Hash,
source: Path,
file: File): (String, String, String, String, String) =
("upload",
remoteKey.key,
MD5Hash.hash(md5Hash),
source.toFile.getPath,
file.toString)
private def toCopy(
sourceKey: RemoteKey,
md5Hash: MD5Hash,
targetKey: RemoteKey): (String, String, String, String, String) =
("copy", sourceKey.key, MD5Hash.hash(md5Hash), targetKey.key, "")
private def toDelete(
remoteKey: RemoteKey): (String, String, String, String, String) =
("delete", remoteKey.key, "", "", "")
private def configOptions(configOptions: ConfigOption*): ConfigOptions =
ConfigOptions(List(configOptions: _*))
private def invoke(
configOptions: ConfigOptions,
result: Task[RemoteObjects],
files: Task[Map[Path, File]]
) = {
type TestEnv = Storage with Console with Config with FileSystem with Hasher
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live
with FileSystem.Live with Hasher.Live {
override def listResult: Task[RemoteObjects] = result
override def uploadResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def copyResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def deleteResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def shutdownResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
}
def testProgram =
for {
config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- Config.set(config)
bucket <- Config.bucket
prefix <- Config.prefix
remoteObjects <- Storage.list(bucket, prefix)
plan <- PlanBuilder.createPlan(remoteObjects)
} yield plan
new DefaultRuntime {}
.unsafeRunSync(testProgram.provide(testEnv))
.toEither
.map(convertResult)
}
private def convertResult(plan: SyncPlan) =
plan.actions.map({
case ToUpload(_, lf, _) =>
("upload",
lf.remoteKey.key,
MD5Hash.hash(lf.hashes(MD5)),
lf.source.toString,
lf.file.toString)
case ToDelete(_, remoteKey, _) => ("delete", remoteKey.key, "", "", "")
case ToCopy(_, sourceKey, hash, targetKey, _) =>
("copy", sourceKey.key, MD5Hash.hash(hash), targetKey.key, "")
case DoNothing(_, remoteKey, _) =>
("do-nothing", remoteKey.key, "", "", "")
case _ => ("other", "", "", "", "")
})
}

View file

@ -1,57 +0,0 @@
package net.kemitix.thorp.lib
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.Action.DoNothing
import net.kemitix.thorp.domain.{
Bucket,
RemoteKey,
StorageQueueEvent,
SyncTotals
}
import net.kemitix.thorp.storage.Storage
import org.scalatest.FreeSpec
import zio.{DefaultRuntime, UIO, ZIO}
class PlanExecutorTest extends FreeSpec {
private def subject(in: LazyList[Int]): ZIO[Any, Throwable, LazyList[Int]] =
ZIO.foldLeft(in)(LazyList.empty[Int])((s, i) => ZIO(i #:: s)).map(_.reverse)
"zio foreach on a stream can be a stream" in {
val input = LazyList.from(1 to 1000000)
val program = subject(input)
val result = new DefaultRuntime {}.unsafeRunSync(program).toEither
assertResult(Right(input))(result)
}
"build plan with 100,000 actions" in {
val nActions = 100000
val bucket = Bucket("bucket")
val remoteKey = RemoteKey("remoteKey")
val input =
LazyList.from(1 to nActions).map(DoNothing(bucket, remoteKey, _))
val syncTotals = SyncTotals.empty
val archiveTask = UIO(UnversionedMirrorArchive)
val syncPlan = SyncPlan(input, syncTotals)
val program: ZIO[Storage with Config with Console,
Throwable,
Seq[StorageQueueEvent]] =
archiveTask.flatMap(archive =>
PlanExecutor.executePlan(archive, syncPlan))
val result: Either[Throwable, Seq[StorageQueueEvent]] =
new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither
val expected = Right(
LazyList
.from(1 to nActions)
.map(_ => StorageQueueEvent.DoNothingQueueEvent(remoteKey)))
assertResult(expected)(result)
}
object TestEnv extends Storage.Test with Config.Live with Console.Test
}

View file

@ -1,43 +0,0 @@
package net.kemitix.thorp.lib
import java.io.File
import net.kemitix.thorp.domain.Action._
import net.kemitix.thorp.domain._
import org.scalatest.FreeSpec
class SequencePlanTest extends FreeSpec {
"sort" - {
"a list of assorted actions" - {
val bucket = Bucket("aBucket")
val remoteKey1 = RemoteKey("remoteKey1")
val remoteKey2 = RemoteKey("targetHash")
val hash = MD5Hash("aHash")
val hashes = Map[HashType, MD5Hash]()
val size = 1024
val file1 = new File("aFile")
val file2 = new File("aFile")
val source = new File("source")
val localFile1 =
LocalFile(file1, source, hashes, remoteKey1, file1.length)
val _ =
LocalFile(file2, source, hashes, remoteKey2, file2.length)
val copy1 = ToCopy(bucket, remoteKey1, hash, remoteKey2, size)
val copy2 = ToCopy(bucket, remoteKey2, hash, remoteKey1, size)
val upload1 = ToUpload(bucket, localFile1, size)
val upload2 = ToUpload(bucket, localFile1, size)
val delete1 = ToDelete(bucket, remoteKey1, size)
val delete2 = ToDelete(bucket, remoteKey2, size)
"should be in correct order" in {
val actions =
List[Action](copy1, delete1, upload1, delete2, upload2, copy2)
val expected =
List[Action](copy1, copy2, upload1, upload2, delete1, delete2)
val result = actions.sortBy(SequencePlan.order)
assertResult(expected)(result)
}
}
}
}

View file

@ -2,10 +2,10 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.SdkClientException import com.amazonaws.SdkClientException
import com.amazonaws.services.s3.model.{CopyObjectRequest, CopyObjectResult} import com.amazonaws.services.s3.model.{CopyObjectRequest, CopyObjectResult}
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageEvent.{
Action, ActionSummary,
CopyQueueEvent, CopyEvent,
ErrorQueueEvent ErrorEvent
} }
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError} import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError}
@ -13,8 +13,7 @@ import zio.{IO, Task, UIO}
trait Copier { trait Copier {
def copy(amazonS3: AmazonS3.Client)( def copy(amazonS3: AmazonS3.Client)(request: Request): UIO[StorageEvent] =
request: Request): UIO[StorageQueueEvent] =
copyObject(amazonS3)(request) copyObject(amazonS3)(request)
.fold(foldFailure(request.sourceKey, request.targetKey), .fold(foldFailure(request.sourceKey, request.targetKey),
foldSuccess(request.sourceKey, request.targetKey)) foldSuccess(request.sourceKey, request.targetKey))
@ -43,9 +42,8 @@ trait Copier {
copyRequest.targetKey.key copyRequest.targetKey.key
).withMatchingETagConstraint(MD5Hash.hash(copyRequest.hash)) ).withMatchingETagConstraint(MD5Hash.hash(copyRequest.hash))
private def foldFailure( private def foldFailure(sourceKey: RemoteKey,
sourceKey: RemoteKey, targetKey: RemoteKey): Throwable => StorageEvent = {
targetKey: RemoteKey): Throwable => StorageQueueEvent = {
case error: SdkClientException => case error: SdkClientException =>
errorEvent(sourceKey, targetKey, error) errorEvent(sourceKey, targetKey, error)
case error => case error =>
@ -55,20 +53,21 @@ trait Copier {
private def foldSuccess( private def foldSuccess(
sourceKey: RemoteKey, sourceKey: RemoteKey,
targetKey: RemoteKey): CopyObjectResult => StorageQueueEvent = targetKey: RemoteKey): CopyObjectResult => StorageEvent =
result => result =>
Option(result) match { Option(result) match {
case Some(_) => CopyQueueEvent(sourceKey, targetKey) case Some(_) => CopyEvent(sourceKey, targetKey)
case None => case None =>
errorEvent(sourceKey, targetKey, HashError) errorEvent(sourceKey, targetKey, HashError)
} }
private def errorEvent: (RemoteKey, RemoteKey, Throwable) => ErrorQueueEvent = private def errorEvent: (RemoteKey, RemoteKey, Throwable) => ErrorEvent =
(sourceKey, targetKey, error) => (sourceKey, targetKey, error) =>
ErrorQueueEvent(action(sourceKey, targetKey), targetKey, error) ErrorEvent(action(sourceKey, targetKey), targetKey, error)
private def action(sourceKey: RemoteKey, targetKey: RemoteKey): Action = private def action(sourceKey: RemoteKey,
Action.Copy(s"${sourceKey.key} => ${targetKey.key}") targetKey: RemoteKey): ActionSummary =
ActionSummary.Copy(s"${sourceKey.key} => ${targetKey.key}")
} }

View file

@ -1,12 +1,12 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.DeleteObjectRequest import com.amazonaws.services.s3.model.DeleteObjectRequest
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageEvent.{
Action, ActionSummary,
DeleteQueueEvent, DeleteEvent,
ErrorQueueEvent ErrorEvent
} }
import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent} import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageEvent}
import zio.{Task, UIO, ZIO} import zio.{Task, UIO, ZIO}
trait Deleter { trait Deleter {
@ -14,17 +14,17 @@ trait Deleter {
def delete(amazonS3: AmazonS3.Client)( def delete(amazonS3: AmazonS3.Client)(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
): UIO[StorageQueueEvent] = ): UIO[StorageEvent] =
deleteObject(amazonS3)(bucket, remoteKey) deleteObject(amazonS3)(bucket, remoteKey)
.catchAll(e => .catchAll(e =>
UIO(ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, e))) UIO(ErrorEvent(ActionSummary.Delete(remoteKey.key), remoteKey, e)))
private def deleteObject(amazonS3: AmazonS3.Client)( private def deleteObject(amazonS3: AmazonS3.Client)(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
): Task[StorageQueueEvent] = ): Task[StorageEvent] =
(amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)) (amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key))
*> ZIO(DeleteQueueEvent(remoteKey))) *> ZIO(DeleteEvent(remoteKey)))
} }
object Deleter extends Deleter object Deleter extends Deleter

View file

@ -2,7 +2,7 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.transfer.TransferManagerBuilder import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.storage.Storage.Service import net.kemitix.thorp.storage.Storage.Service
@ -26,23 +26,23 @@ object S3Storage {
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
listenerSettings: UploadEventListener.Settings, listenerSettings: UploadEventListener.Settings,
): UIO[StorageQueueEvent] = ): UIO[StorageEvent] =
Uploader.upload(transferManager)( Uploader.upload(transferManager)(
Uploader.Request(localFile, bucket, listenerSettings)) Uploader.Request(localFile, bucket, listenerSettings))
override def copy(bucket: Bucket, override def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey): UIO[StorageQueueEvent] = targetKey: RemoteKey): UIO[StorageEvent] =
Copier.copy(client)(Copier.Request(bucket, sourceKey, hash, targetKey)) Copier.copy(client)(Copier.Request(bucket, sourceKey, hash, targetKey))
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageQueueEvent] = remoteKey: RemoteKey): UIO[StorageEvent] =
Deleter.delete(client)(bucket, remoteKey) Deleter.delete(client)(bucket, remoteKey)
override def shutdown: UIO[StorageQueueEvent] = { override def shutdown: UIO[StorageEvent] = {
transferManager.shutdownNow(true) *> transferManager.shutdownNow(true) *>
client.shutdown().map(_ => ShutdownQueueEvent()) client.shutdown().map(_ => ShutdownEvent())
} }
} }
} }

View file

@ -6,30 +6,38 @@ import com.amazonaws.event.ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT
import com.amazonaws.event.{ProgressEvent, ProgressListener} import com.amazonaws.event.{ProgressEvent, ProgressListener}
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
import net.kemitix.thorp.domain.Implicits._ import net.kemitix.thorp.domain.Implicits._
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageEvent.{
Action, ActionSummary,
ErrorQueueEvent, ErrorEvent,
UploadQueueEvent UploadEvent
} }
import net.kemitix.thorp.domain.UploadEvent.{ import net.kemitix.thorp.domain.UploadProgressEvent.{
ByteTransferEvent, ByteTransferEvent,
RequestEvent, RequestEvent,
TransferEvent TransferEvent
} }
import net.kemitix.thorp.domain.{StorageQueueEvent, _} import net.kemitix.thorp.domain.{
Bucket,
LocalFile,
MD5Hash,
RemoteKey,
StorageEvent,
UploadEventListener,
UploadProgressEvent
}
import net.kemitix.thorp.storage.aws.Uploader.Request import net.kemitix.thorp.storage.aws.Uploader.Request
import zio.UIO import zio.UIO
trait Uploader { trait Uploader {
def upload(transferManager: => AmazonTransferManager)( def upload(transferManager: => AmazonTransferManager)(
request: Request): UIO[StorageQueueEvent] = request: Request): UIO[StorageEvent] =
transfer(transferManager)(request) transfer(transferManager)(request)
.catchAll(handleError(request.localFile.remoteKey)) .catchAll(handleError(request.localFile.remoteKey))
private def handleError(remoteKey: RemoteKey)( private def handleError(remoteKey: RemoteKey)(
e: Throwable): UIO[StorageQueueEvent] = e: Throwable): UIO[StorageEvent] =
UIO(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e)) UIO(ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, e))
private def transfer(transferManager: => AmazonTransferManager)( private def transfer(transferManager: => AmazonTransferManager)(
request: Request request: Request
@ -43,7 +51,7 @@ trait Uploader {
.upload(putObjectRequest) .upload(putObjectRequest)
.map(_.waitForUploadResult) .map(_.waitForUploadResult)
.map(uploadResult => .map(uploadResult =>
UploadQueueEvent(RemoteKey(uploadResult.getKey), UploadEvent(RemoteKey(uploadResult.getKey),
MD5Hash(uploadResult.getETag))) MD5Hash(uploadResult.getETag)))
} }
@ -79,7 +87,7 @@ trait Uploader {
lock.unlock(writeLock) lock.unlock(writeLock)
} }
private def eventHandler: ProgressEvent => UploadEvent = private def eventHandler: ProgressEvent => UploadProgressEvent =
progressEvent => { progressEvent => {
def isTransfer: ProgressEvent => Boolean = def isTransfer: ProgressEvent => Boolean =
_.getEventType.isTransferEvent _.getEventType.isTransferEvent

View file

@ -1,6 +1,6 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
@ -34,7 +34,7 @@ trait AmazonS3ClientTestFixture extends MockFactory {
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
listenerSettings: UploadEventListener.Settings, listenerSettings: UploadEventListener.Settings,
): UIO[StorageQueueEvent] = ): UIO[StorageEvent] =
Uploader.upload(transferManager)( Uploader.upload(transferManager)(
Uploader.Request(localFile, bucket, listenerSettings)) Uploader.Request(localFile, bucket, listenerSettings))
@ -43,19 +43,19 @@ trait AmazonS3ClientTestFixture extends MockFactory {
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey targetKey: RemoteKey
): UIO[StorageQueueEvent] = ): UIO[StorageEvent] =
Copier.copy(client)( Copier.copy(client)(
Copier.Request(bucket, sourceKey, hash, targetKey)) Copier.Request(bucket, sourceKey, hash, targetKey))
override def delete( override def delete(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
): UIO[StorageQueueEvent] = ): UIO[StorageEvent] =
Deleter.delete(client)(bucket, remoteKey) Deleter.delete(client)(bucket, remoteKey)
override def shutdown: UIO[StorageQueueEvent] = { override def shutdown: UIO[StorageEvent] = {
transferManager.shutdownNow(true) *> transferManager.shutdownNow(true) *>
client.shutdown().map(_ => ShutdownQueueEvent()) client.shutdown().map(_ => ShutdownEvent())
} }
} }
} }

View file

@ -2,8 +2,7 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.{AmazonS3Exception, CopyObjectResult} import com.amazonaws.services.s3.model.{AmazonS3Exception, CopyObjectResult}
import net.kemitix.thorp.console.Console import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.NonUnit.~* import net.kemitix.thorp.domain.StorageEvent.{ActionSummary, ErrorEvent}
import net.kemitix.thorp.domain.StorageQueueEvent.{Action, ErrorQueueEvent}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError} import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError}
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
@ -22,31 +21,29 @@ class CopierTest extends FreeSpec {
"when source exists" - { "when source exists" - {
"when source hash matches" - { "when source hash matches" - {
"copies from source to target" in { "copies from source to target" in {
val event = StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey) val event = StorageEvent.CopyEvent(sourceKey, targetKey)
val expected = Right(event) val expected = Right(event)
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3Client.copyObject _) (fixture.amazonS3Client.copyObject _)
.when() .when()
.returns(_ => Task.succeed(Some(new CopyObjectResult)))) .returns(_ => Task.succeed(Some(new CopyObjectResult)))
private val result = private val result =
invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client) invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client)
~*(assertResult(expected)(result)) assertResult(expected)(result)
} }
} }
} }
"when source hash does not match" - { "when source hash does not match" - {
"skip the file with an error" in { "skip the file with an error" in {
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3Client.copyObject _) (fixture.amazonS3Client.copyObject _)
.when() .when()
.returns(_ => Task.succeed(None))) .returns(_ => Task.succeed(None))
private val result = private val result =
invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client) invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client)
~*(result match { result match {
case Right( case Right(
ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), ErrorEvent(ActionSummary.Copy("sourceKey => targetKey"),
RemoteKey("targetKey"), RemoteKey("targetKey"),
e)) => e)) =>
e match { e match {
@ -54,7 +51,7 @@ class CopierTest extends FreeSpec {
case _ => fail(s"Not a HashError: ${e.getMessage}") case _ => fail(s"Not a HashError: ${e.getMessage}")
} }
case e => fail(s"Not an ErrorQueueEvent: $e") case e => fail(s"Not an ErrorQueueEvent: $e")
}) }
} }
} }
} }
@ -62,14 +59,14 @@ class CopierTest extends FreeSpec {
"skip the file with an error" in { "skip the file with an error" in {
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
private val expectedMessage = "The specified key does not exist" private val expectedMessage = "The specified key does not exist"
~*((fixture.amazonS3Client.copyObject _) (fixture.amazonS3Client.copyObject _)
.when() .when()
.returns(_ => Task.fail(new AmazonS3Exception(expectedMessage)))) .returns(_ => Task.fail(new AmazonS3Exception(expectedMessage)))
private val result = private val result =
invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client) invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client)
~*(result match { result match {
case Right( case Right(
ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), ErrorEvent(ActionSummary.Copy("sourceKey => targetKey"),
RemoteKey("targetKey"), RemoteKey("targetKey"),
e)) => e)) =>
e match { e match {
@ -78,7 +75,7 @@ class CopierTest extends FreeSpec {
case _ => fail(s"Not a CopyError: ${e.getMessage}") case _ => fail(s"Not a CopyError: ${e.getMessage}")
} }
case e => fail(s"Not an ErrorQueueEvent: ${e}") case e => fail(s"Not an ErrorQueueEvent: ${e}")
}) }
} }
} }
} }

View file

@ -3,16 +3,15 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.SdkClientException import com.amazonaws.SdkClientException
import com.amazonaws.services.s3.model.AmazonS3Exception import com.amazonaws.services.s3.model.AmazonS3Exception
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageEvent.{
Action, ActionSummary,
DeleteQueueEvent, DeleteEvent,
ErrorQueueEvent ErrorEvent
} }
import net.kemitix.thorp.domain.{Bucket, RemoteKey} import net.kemitix.thorp.domain.{Bucket, RemoteKey}
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.internal.PlatformLive import zio.internal.PlatformLive
import zio.{Runtime, Task, UIO} import zio.{Runtime, Task, UIO}
import net.kemitix.thorp.domain.NonUnit.~*
class DeleterTest extends FreeSpec { class DeleterTest extends FreeSpec {
@ -22,42 +21,39 @@ class DeleterTest extends FreeSpec {
val bucket = Bucket("aBucket") val bucket = Bucket("aBucket")
val remoteKey = RemoteKey("aRemoteKey") val remoteKey = RemoteKey("aRemoteKey")
"when no errors" in { "when no errors" in {
val expected = Right(DeleteQueueEvent(remoteKey)) val expected = Right(DeleteEvent(remoteKey))
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3Client.deleteObject _) (fixture.amazonS3Client.deleteObject _)
.when() .when()
.returns(_ => UIO.succeed(()))) .returns(_ => UIO.succeed(()))
private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey) private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey)
~*(assertResult(expected)(result)) assertResult(expected)(result)
} }
} }
"when Amazon Service Exception" in { "when Amazon Service Exception" in {
val exception = new AmazonS3Exception("message") val exception = new AmazonS3Exception("message")
val expected = val expected =
Right( Right(
ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception)) ErrorEvent(ActionSummary.Delete(remoteKey.key), remoteKey, exception))
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3Client.deleteObject _) (fixture.amazonS3Client.deleteObject _)
.when() .when()
.returns(_ => Task.fail(exception))) .returns(_ => Task.fail(exception))
private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey) private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey)
~*(assertResult(expected)(result)) assertResult(expected)(result)
} }
} }
"when Amazon SDK Client Exception" in { "when Amazon SDK Client Exception" in {
val exception = new SdkClientException("message") val exception = new SdkClientException("message")
val expected = val expected =
Right( Right(
ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception)) ErrorEvent(ActionSummary.Delete(remoteKey.key), remoteKey, exception))
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3Client.deleteObject _) (fixture.amazonS3Client.deleteObject _)
.when() .when()
.returns(_ => Task.fail(exception))) .returns(_ => Task.fail(exception))
private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey) private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey)
~*(assertResult(expected)(result)) assertResult(expected)(result)
} }
} }
def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket, def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket,

View file

@ -8,7 +8,6 @@ import com.amazonaws.services.s3.model.{
ListObjectsV2Result, ListObjectsV2Result,
S3ObjectSummary S3ObjectSummary
} }
import net.kemitix.thorp.domain.NonUnit.~*
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
@ -88,7 +87,7 @@ class ListerTest extends FreeSpec {
etag: String, etag: String,
truncated: Boolean) = { truncated: Boolean) = {
val result = new ListObjectsV2Result val result = new ListObjectsV2Result
~*(result.getObjectSummaries.add(objectSummary(key, etag, nowDate))) result.getObjectSummaries.add(objectSummary(key, etag, nowDate))
result.setTruncated(truncated) result.setTruncated(truncated)
result result
} }
@ -97,10 +96,9 @@ class ListerTest extends FreeSpec {
"when Amazon Service Exception" in { "when Amazon Service Exception" in {
val exception = new AmazonS3Exception("message") val exception = new AmazonS3Exception("message")
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3Client.listObjectsV2 _) (fixture.amazonS3Client.listObjectsV2 _)
.when() .when()
.returns(_ => Task.fail(exception))) .returns(_ => Task.fail(exception))
private val result = invoke(fixture.amazonS3Client)(bucket, prefix) private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
assert(result.isLeft) assert(result.isLeft)
} }
@ -108,10 +106,9 @@ class ListerTest extends FreeSpec {
"when Amazon SDK Client Exception" in { "when Amazon SDK Client Exception" in {
val exception = new SdkClientException("message") val exception = new SdkClientException("message")
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3Client.listObjectsV2 _) (fixture.amazonS3Client.listObjectsV2 _)
.when() .when()
.returns(_ => Task.fail(exception))) .returns(_ => Task.fail(exception))
private val result = invoke(fixture.amazonS3Client)(bucket, prefix) private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
assert(result.isLeft) assert(result.isLeft)
} }

View file

@ -1,147 +0,0 @@
package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.lib.{LocalFileValidator, S3MetaDataEnricher}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.Resource
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
import scala.collection.MapView
class StorageServiceSuite extends FunSpec with MockFactory {
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val sources = Sources(List(sourcePath))
private val prefix = RemoteKey("prefix")
describe("getS3Status") {
val hash = MD5Hash("hash")
val env = for {
localFile <- LocalFileValidator.resolve("the-file",
Map(MD5 -> hash),
sourcePath,
sources,
prefix)
key = localFile.remoteKey
keyOtherKey <- LocalFileValidator.resolve("other-key-same-hash",
Map(MD5 -> hash),
sourcePath,
sources,
prefix)
diffHash = MD5Hash("diff")
keyDiffHash <- LocalFileValidator.resolve("other-key-diff-hash",
Map(MD5 -> diffHash),
sourcePath,
sources,
prefix)
s3ObjectsData = RemoteObjects(
byHash = MapView(
hash -> key,
diffHash -> keyDiffHash.remoteKey
),
byKey = MapView(
key -> hash,
keyOtherKey.remoteKey -> hash,
keyDiffHash.remoteKey -> diffHash
)
)
} yield
(s3ObjectsData,
localFile: LocalFile,
keyOtherKey,
keyDiffHash,
diffHash,
key)
def invoke(localFile: LocalFile, s3ObjectsData: RemoteObjects) =
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
def getMatchesByKey(
status: (Option[MD5Hash], Map[MD5Hash, RemoteKey])): Option[MD5Hash] = {
val (byKey, _) = status
byKey
}
def getMatchesByHash(status: (Option[MD5Hash], Map[MD5Hash, RemoteKey]))
: Map[MD5Hash, RemoteKey] = {
val (_, byHash) = status
byHash
}
describe(
"when remote key exists, unmodified and other key matches the hash") {
it("should return the match by key") {
env.map({
case (s3ObjectsData, localFile, _, _, _, _) => {
val result = getMatchesByKey(invoke(localFile, s3ObjectsData))
assert(result.contains(hash))
}
})
}
it("should return both matches for the hash") {
env.map({
case (s3ObjectsData, localFile, keyOtherKey, _, _, key) => {
val result = getMatchesByHash(invoke(localFile, s3ObjectsData))
assertResult(
Set((hash, key), (hash, keyOtherKey.remoteKey))
)(result)
}
})
}
}
describe("when remote key does not exist and no others matches hash") {
val env2 = LocalFileValidator
.resolve("missing-file",
Map(MD5 -> MD5Hash("unique")),
sourcePath,
sources,
prefix)
it("should return no matches by key") {
env2.map(localFile => {
env.map({
case (s3ObjectsData, _, _, _, _, _) => {
val result = getMatchesByKey(invoke(localFile, s3ObjectsData))
assert(result.isEmpty)
}
})
})
}
it("should return no matches by hash") {
env2.map(localFile => {
env.map({
case (s3ObjectsData, _, _, _, _, _) => {
val result = getMatchesByHash(invoke(localFile, s3ObjectsData))
assert(result.isEmpty)
}
})
})
}
}
describe("when remote key exists and no others match hash") {
it("should return the match by key") {
env.map({
case (s3ObjectsData, _, _, keyDiffHash, diffHash, _) => {
val result = getMatchesByKey(invoke(keyDiffHash, s3ObjectsData))
assert(result.contains(diffHash))
}
})
}
it("should return one match by hash") {
env.map({
case (s3ObjectsData, _, _, keyDiffHash, diffHash, _) => {
val result = getMatchesByHash(invoke(keyDiffHash, s3ObjectsData))
assertResult(
Set((diffHash, keyDiffHash.remoteKey))
)(result)
}
})
}
}
}
}

View file

@ -7,16 +7,15 @@ import com.amazonaws.services.s3.model.AmazonS3Exception
import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.model.UploadResult
import net.kemitix.thorp.config.Config import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageEvent.{
Action, ActionSummary,
ErrorQueueEvent, ErrorEvent,
UploadQueueEvent UploadEvent
} }
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.{DefaultRuntime, Task} import zio.{DefaultRuntime, Task}
import net.kemitix.thorp.domain.NonUnit.~*
import net.kemitix.thorp.filesystem.Resource import net.kemitix.thorp.filesystem.Resource
class UploaderTest extends FreeSpec with MockFactory { class UploaderTest extends FreeSpec with MockFactory {
@ -39,57 +38,54 @@ class UploaderTest extends FreeSpec with MockFactory {
UploadEventListener.Settings(localFile, 0, 0, batchMode = true) UploadEventListener.Settings(localFile, 0, 0, batchMode = true)
"when no error" in { "when no error" in {
val expected = val expected =
Right(UploadQueueEvent(remoteKey, aHash)) Right(UploadEvent(remoteKey, aHash))
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3TransferManager.upload _) (fixture.amazonS3TransferManager.upload _)
.when() .when()
.returns(_ => Task.succeed(inProgress))) .returns(_ => Task.succeed(inProgress))
private val result = private val result =
invoke(fixture.amazonS3TransferManager)( invoke(fixture.amazonS3TransferManager)(
localFile, localFile,
bucket, bucket,
listenerSettings listenerSettings
) )
~*(assertResult(expected)(result)) assertResult(expected)(result)
} }
} }
"when Amazon Service Exception" in { "when Amazon Service Exception" in {
val exception = new AmazonS3Exception("message") val exception = new AmazonS3Exception("message")
val expected = val expected =
Right( Right(
ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception)) ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, exception))
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3TransferManager.upload _) (fixture.amazonS3TransferManager.upload _)
.when() .when()
.returns(_ => Task.fail(exception))) .returns(_ => Task.fail(exception))
private val result = private val result =
invoke(fixture.amazonS3TransferManager)( invoke(fixture.amazonS3TransferManager)(
localFile, localFile,
bucket, bucket,
listenerSettings listenerSettings
) )
~*(assertResult(expected)(result)) assertResult(expected)(result)
} }
} }
"when Amazon SDK Client Exception" in { "when Amazon SDK Client Exception" in {
val exception = new SdkClientException("message") val exception = new SdkClientException("message")
val expected = val expected =
Right( Right(
ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception)) ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, exception))
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
~*(
(fixture.amazonS3TransferManager.upload _) (fixture.amazonS3TransferManager.upload _)
.when() .when()
.returns(_ => Task.fail(exception))) .returns(_ => Task.fail(exception))
private val result = private val result =
invoke(fixture.amazonS3TransferManager)( invoke(fixture.amazonS3TransferManager)(
localFile, localFile,
bucket, bucket,
listenerSettings listenerSettings
) )
~*(assertResult(expected)(result)) assertResult(expected)(result)
} }
} }
def invoke(transferManager: AmazonTransferManager)( def invoke(transferManager: AmazonTransferManager)(

View file

@ -19,34 +19,34 @@ object Storage {
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
listenerSettings: UploadEventListener.Settings, listenerSettings: UploadEventListener.Settings,
): ZIO[Storage, Nothing, StorageQueueEvent] ): ZIO[Storage, Nothing, StorageEvent]
def copy( def copy(
bucket: Bucket, bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent] ): ZIO[Storage, Nothing, StorageEvent]
def delete( def delete(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
): UIO[StorageQueueEvent] ): UIO[StorageEvent]
def shutdown: UIO[StorageQueueEvent] def shutdown: UIO[StorageEvent]
} }
trait Test extends Storage { trait Test extends Storage {
def listResult: Task[RemoteObjects] = def listResult: Task[RemoteObjects] =
Task.die(new NotImplementedError) Task.die(new NotImplementedError)
def uploadResult: UIO[StorageQueueEvent] = def uploadResult: UIO[StorageEvent] =
Task.die(new NotImplementedError) Task.die(new NotImplementedError)
def copyResult: UIO[StorageQueueEvent] = def copyResult: UIO[StorageEvent] =
Task.die(new NotImplementedError) Task.die(new NotImplementedError)
def deleteResult: UIO[StorageQueueEvent] = def deleteResult: UIO[StorageEvent] =
Task.die(new NotImplementedError) Task.die(new NotImplementedError)
def shutdownResult: UIO[StorageQueueEvent] = def shutdownResult: UIO[StorageEvent] =
Task.die(new NotImplementedError) Task.die(new NotImplementedError)
val storage: Service = new Service { val storage: Service = new Service {
@ -59,21 +59,21 @@ object Storage {
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
listenerSettings: UploadEventListener.Settings listenerSettings: UploadEventListener.Settings
): ZIO[Storage, Nothing, StorageQueueEvent] = ): ZIO[Storage, Nothing, StorageEvent] =
uploadResult uploadResult
override def copy( override def copy(
bucket: Bucket, bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey): ZIO[Storage, Nothing, StorageQueueEvent] = targetKey: RemoteKey): ZIO[Storage, Nothing, StorageEvent] =
copyResult copyResult
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageQueueEvent] = remoteKey: RemoteKey): UIO[StorageEvent] =
deleteResult deleteResult
override def shutdown: UIO[StorageQueueEvent] = override def shutdown: UIO[StorageEvent] =
shutdownResult shutdownResult
} }
@ -89,7 +89,7 @@ object Storage {
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
listenerSettings: UploadEventListener.Settings listenerSettings: UploadEventListener.Settings
): ZIO[Storage, Nothing, StorageQueueEvent] = ): ZIO[Storage, Nothing, StorageEvent] =
ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings)) ZIO.accessM(_.storage upload (localFile, bucket, listenerSettings))
final def copy( final def copy(
@ -97,13 +97,13 @@ object Storage {
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent] = ): ZIO[Storage, Nothing, StorageEvent] =
ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey)) ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey))
final def delete( final def delete(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent] = ): ZIO[Storage, Nothing, StorageEvent] =
ZIO.accessM(_.storage delete (bucket, remoteKey)) ZIO.accessM(_.storage delete (bucket, remoteKey))
} }