Convert Storage to full ZIO effect module (#133)

* [console] Rename MyConsole as Console

* [console] break infinite loop

* [console] fix typo

* [console] clean up helpers

* [cli] Main use ZIO#provide to run program

* [cli] Main define Program type alias

* [cli] Program handle cli args in Program

* [cli] Program doesn't extend PlanBuilder

* [cli] refactoring

* [cli] rename ParseArgs as CliArgs

* [cli] CliArgs#apply renamed a parse

* [storage-aws] S3StorageService renamed as S3Storage

* [storage-api] Rename StorageService as Storage.Service

* [storage-api] make Storage.copy effectTotal

* [storage-api] make Storage.delete effectTotal

* [storage-api] make Storage.shutdown effectTotal

* [storage-api] make Storage.upload effectTotal

* [storage-aws] Lister refactoring

* [storage-aws] make Lister into a trait

* [storage-aws] make Copier into a trait

* [storage-aws] make Deleter into a trait

* [storate-aws] make Uploader into a trait

* [storage-aws] AmazonS3 move error handling out of client wrapper

* [storage-aws] DeleterTest added

* [storage-aws] ListerTest added

* [storage-aws] Uploader refactoring

* [storage-aws] CopierTest test Copier directly

* [storage-aws] DeleterTest test Deleter directly

* [storate-aws] ListerTest test Lister directly

* [storage-aws] UploaderTest added

* [storage-aws] S3Storage.Live replaces S3StorageServiceBuilder

* Complete migration to Module for Storage

* [cli] Main define LiveThorpApp object

* [core] Add CoreTypes

* [cli] Program Refactoring

* [core] PlanBuilding Refactoring

* [changelog] updated

* [console] Console.Live Usage of get on optional type

* [storage-aws] AmazonS3ClientTestFixture Use wildcards when selecting more than 6 elements
This commit is contained in:
Paul Campbell 2019-07-28 20:11:03 +01:00 committed by GitHub
parent 985cc9f147
commit 96a83e6c3e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
39 changed files with 1121 additions and 1165 deletions

View file

@ -16,9 +16,10 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
** Changed ** Changed
- Replace cats-effect with zio (#117) - [internal] Replace cats-effect with zio (#117)
- Replace Monocle with local SimpleLens implementation (#121) - [internal] Replace Monocle with local SimpleLens implementation (#121)
- Don't use String as key in Map for hashes (#124) - [internal] Don't use String as key in Map for hashes (#124)
- [internal] Convert Storage to full ZIO effect module (#133)
** Dependencies ** Dependencies

View file

@ -6,9 +6,9 @@ import net.kemitix.thorp.core.{ConfigOption, ConfigOptions}
import scopt.OParser import scopt.OParser
import zio.Task import zio.Task
object ParseArgs { object CliArgs {
def apply(args: List[String]): Task[ConfigOptions] = Task { def parse(args: List[String]): Task[ConfigOptions] = Task {
OParser OParser
.parse(configParser, args, List()) .parse(configParser, args, List())
.map(ConfigOptions(_)) .map(ConfigOptions(_))

View file

@ -1,22 +1,17 @@
package net.kemitix.thorp.cli package net.kemitix.thorp.cli
import net.kemitix.thorp.console.MyConsole import net.kemitix.thorp.console.Console
import zio.internal.PlatformLive import net.kemitix.thorp.storage.aws.S3Storage
import zio.{App, Runtime, UIO, ZIO} import zio.{App, ZIO}
object Main extends App { object Main extends App {
private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) object LiveThorpApp extends S3Storage.Live with Console.Live
override def run(args: List[String]): ZIO[Environment, Nothing, Int] = override def run(args: List[String]): ZIO[Environment, Nothing, Int] =
runtime.unsafeRun { Program
appLogic(args).fold(_ => UIO(1), _ => UIO(0)) .run(args)
} .provide(LiveThorpApp)
.fold(_ => 1, _ => 0)
private def appLogic(args: List[String]): ZIO[MyConsole, Throwable, Unit] =
for {
cliOptions <- ParseArgs(args)
_ <- Program.run(cliOptions)
} yield ()
} }

View file

@ -1,79 +1,87 @@
package net.kemitix.thorp.cli package net.kemitix.thorp.cli
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.CoreTypes.CoreProgram
import net.kemitix.thorp.core._ import net.kemitix.thorp.core._
import net.kemitix.thorp.domain.{StorageQueueEvent, SyncTotals} import net.kemitix.thorp.domain.StorageQueueEvent
import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService
import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService import zio.{UIO, ZIO}
import zio.{Task, TaskR, ZIO}
trait Program extends PlanBuilder { trait Program {
lazy val version = s"Thorp v${thorp.BuildInfo.version}" lazy val version = s"Thorp v${thorp.BuildInfo.version}"
def run(cliOptions: ConfigOptions): ZIO[MyConsole, Nothing, Unit] = { def run(args: List[String]): CoreProgram[Unit] = {
val showVersion = ConfigQuery.showVersion(cliOptions)
for { for {
_ <- ZIO.when(showVersion)(putStrLn(version)) cli <- CliArgs.parse(args)
_ <- ZIO.when(!showVersion)(execute(cliOptions).catchAll(handleErrors)) _ <- ZIO.when(showVersion(cli))(putStrLn(version))
_ <- ZIO.when(!showVersion(cli))(execute(cli).catchAll(handleErrors))
} yield () } yield ()
} }
private def execute( private def showVersion: ConfigOptions => Boolean =
cliOptions: ConfigOptions): ZIO[MyConsole, Throwable, Unit] = { cli => ConfigQuery.showVersion(cli)
private def execute(cliOptions: ConfigOptions) = {
for { for {
plan <- createPlan(defaultStorageService, defaultHashService, cliOptions) plan <- PlanBuilder.createPlan(defaultHashService, cliOptions)
archive <- thorpArchive(cliOptions, plan.syncTotals) batchMode <- isBatchMode(cliOptions)
events <- handleActions(archive, plan) archive <- UnversionedMirrorArchive.default(batchMode, plan.syncTotals)
_ <- defaultStorageService.shutdown events <- applyPlan(archive, plan)
_ <- SyncLogging.logRunFinished(events) _ <- SyncLogging.logRunFinished(events)
} yield () } yield ()
} }
private def handleErrors( private def isBatchMode(cliOptions: ConfigOptions) =
throwable: Throwable): ZIO[MyConsole, Nothing, Unit] = UIO(ConfigQuery.batchMode(cliOptions))
private def handleErrors(throwable: Throwable) =
for { for {
_ <- putStrLn("There were errors:") _ <- putStrLn("There were errors:")
_ <- throwable match { _ <- throwable match {
case ConfigValidationException(errors) => case ConfigValidationException(errors) =>
ZIO.foreach(errors)(error => putStrLn(s"- $error")) ZIO.foreach(errors)(error => putStrLn(s"- $error"))
case x => throw x
} }
} yield () } yield ()
def thorpArchive( private def applyPlan(
cliOptions: ConfigOptions,
syncTotals: SyncTotals
): Task[ThorpArchive] = Task {
UnversionedMirrorArchive.default(
defaultStorageService,
ConfigQuery.batchMode(cliOptions),
syncTotals
)
}
private def handleActions(
archive: ThorpArchive, archive: ThorpArchive,
syncPlan: SyncPlan syncPlan: SyncPlan
): TaskR[MyConsole, Stream[StorageQueueEvent]] = { ) = {
type Accumulator = (Stream[StorageQueueEvent], Long) val zero: (Stream[StorageQueueEvent], Long) =
val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes) (Stream(), syncPlan.syncTotals.totalSizeBytes)
TaskR val actions = syncPlan.actions.zipWithIndex
.foldLeft(syncPlan.actions.zipWithIndex)(zero)((acc, indexedAction) => { ZIO
val (action, index) = indexedAction .foldLeft(actions)(zero)((acc, action) =>
val (stream, bytesToDo) = acc applyAction(archive, acc, action))
val remainingBytes = bytesToDo - action.size
(for {
event <- archive.update(index, action, remainingBytes)
events = stream ++ Stream(event)
} yield events)
.map((_, remainingBytes))
})
.map { .map {
case (events, _) => events case (events, _) => events
} }
} }
private def applyAction(
archive: ThorpArchive,
acc: (Stream[StorageQueueEvent], Long),
indexedAction: (Action, Int)
) = {
val (action, index) = indexedAction
val (stream, bytesToDo) = acc
val remainingBytes = bytesToDo - action.size
updateArchive(archive, action, index, stream, remainingBytes)
.map((_, remainingBytes))
}
private def updateArchive(
archive: ThorpArchive,
action: Action,
index: Int,
stream: Stream[StorageQueueEvent],
remainingBytes: Long
) =
for {
event <- archive.update(index, action, remainingBytes)
} yield stream ++ Stream(event)
} }
object Program extends Program object Program extends Program

View file

@ -9,7 +9,7 @@ import zio.DefaultRuntime
import scala.util.Try import scala.util.Try
class ParseArgsTest extends FunSpec { class CliArgsTest extends FunSpec {
private val runtime = new DefaultRuntime {} private val runtime = new DefaultRuntime {}
@ -77,7 +77,7 @@ class ParseArgsTest extends FunSpec {
private def invoke(args: List[String]) = private def invoke(args: List[String]) =
runtime runtime
.unsafeRunSync { .unsafeRunSync {
ParseArgs(args) CliArgs.parse(args)
} }
.toEither .toEither
.toOption .toOption

View file

@ -1,84 +0,0 @@
package net.kemitix.thorp.cli
import java.io.File
import java.nio.file.Path
import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.core._
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FunSpec
import zio.internal.PlatformLive
import zio.{Runtime, Task, TaskR}
class ProgramTest extends FunSpec {
private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
val source: File = Resource(this, ".")
val sourcePath: Path = source.toPath
val bucket: Bucket = Bucket("aBucket")
val hash: MD5Hash = MD5Hash("aHash")
val copyAction: Action =
ToCopy(bucket, RemoteKey("copy-me"), hash, RemoteKey("overwrite-me"), 17L)
val uploadAction: Action = ToUpload(
bucket,
LocalFile.resolve("aFile", Map(), sourcePath, _ => RemoteKey("upload-me")),
23L)
val deleteAction: Action = ToDelete(bucket, RemoteKey("delete-me"), 0L)
val configOptions: ConfigOptions = ConfigOptions(
options = List(
ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions
))
describe("upload, copy and delete actions in plan") {
val archive = TestProgram.thorpArchive
it("should be handled in correct order") {
val expected = List(copyAction, uploadAction, deleteAction)
invoke(configOptions)
val result = archive.actions.reverse
assertResult(expected)(result)
}
}
private def invoke(configOptions: ConfigOptions) =
runtime.unsafeRunSync {
TestProgram.run(configOptions)
}.toEither
trait TestPlanBuilder extends PlanBuilder {
override def createPlan(
storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions
): Task[SyncPlan] = {
Task(SyncPlan(Stream(copyAction, uploadAction, deleteAction)))
}
}
class ActionCaptureArchive extends ThorpArchive {
var actions: List[Action] = List[Action]()
override def update(
index: Int,
action: Action,
totalBytesSoFar: Long
): TaskR[MyConsole, StorageQueueEvent] = {
actions = action :: actions
TaskR(DoNothingQueueEvent(RemoteKey("")))
}
}
object TestProgram extends Program with TestPlanBuilder {
val thorpArchive: ActionCaptureArchive = new ActionCaptureArchive
override def thorpArchive(
cliOptions: ConfigOptions,
syncTotals: SyncTotals
): Task[ThorpArchive] =
Task(thorpArchive)
}
}

View file

@ -0,0 +1,54 @@
package net.kemitix.thorp.console
import java.io.PrintStream
import java.util.concurrent.atomic.AtomicReference
import zio.{UIO, ZIO}
import scala.{Console => SConsole}
trait Console {
val console: Console.Service
}
object Console {
trait Service {
def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit]
def putStrLn(line: String): ZIO[Console, Nothing, Unit]
}
trait Live extends Console {
val console: Service = new Service {
override def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
putStrLn(line.en)
override def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
putStrLn(SConsole.out)(line)
final def putStrLn(stream: PrintStream)(
line: String): ZIO[Console, Nothing, Unit] =
UIO(SConsole.withOut(stream)(SConsole.println(line)))
}
}
object Live extends Live
trait Test extends Console {
private val output = new AtomicReference(List.empty[String])
def getOutput: List[String] = output.get
val console: Service = new Service {
override def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
putStrLn(line.en)
override def putStrLn(line: String): ZIO[Console, Nothing, Unit] = {
output.accumulateAndGet(List(line), (a, b) => a ++ b)
ZIO.succeed(())
}
}
}
object Test extends Test
}

View file

@ -1,31 +0,0 @@
package net.kemitix.thorp.console
import java.io.PrintStream
import zio.{UIO, ZIO}
trait MyConsole {
val console: MyConsole.Service
}
object MyConsole {
trait Service {
def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit]
def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit]
}
trait Live extends MyConsole {
val console: Service = new Service {
override def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] =
putStrLn(line)
override def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] =
putStrLn(Console.out)(line)
final def putStrLn(stream: PrintStream)(
line: String): ZIO[MyConsole, Nothing, Unit] =
UIO(Console.withOut(stream)(Console.println(line)))
}
}
object Live extends Live
}

View file

@ -2,15 +2,15 @@ package net.kemitix.thorp
import zio.ZIO import zio.ZIO
package object console extends MyConsole.Service { package object console {
final val consoleService: ZIO[MyConsole, Nothing, MyConsole.Service] = final val consoleService: ZIO[Console, Nothing, Console.Service] =
ZIO.access(_.console) ZIO.access(_.console)
final def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] = final def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
ZIO.accessM(_.console putStrLn line) ZIO.accessM(_.console putStrLn line)
override def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] = final def putStrLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
putStrLn(line.en) ZIO.accessM(_.console putStrLn line)
} }

View file

@ -0,0 +1,12 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.storage.api.Storage
import zio.ZIO
object CoreTypes {
type CoreEnv = Storage with Console
type CoreProgram[A] = ZIO[CoreEnv, Throwable, A]
}

View file

@ -3,44 +3,42 @@ package net.kemitix.thorp.core
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action._ import net.kemitix.thorp.core.Action._
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage._
import net.kemitix.thorp.storage.api.{HashService, Storage}
import zio.{Task, TaskR} import zio.{Task, TaskR}
trait PlanBuilder { trait PlanBuilder {
def createPlan( def createPlan(
storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: ConfigOptions configOptions: ConfigOptions
): TaskR[MyConsole, SyncPlan] = ): TaskR[Storage with Console, SyncPlan] =
ConfigurationBuilder ConfigurationBuilder
.buildConfig(configOptions) .buildConfig(configOptions)
.catchAll(errors => TaskR.fail(ConfigValidationException(errors))) .catchAll(errors => TaskR.fail(ConfigValidationException(errors)))
.flatMap(config => useValidConfig(storageService, hashService)(config)) .flatMap(config => useValidConfig(hashService)(config))
def useValidConfig( private def useValidConfig(
storageService: StorageService,
hashService: HashService hashService: HashService
)(implicit c: Config): TaskR[MyConsole, SyncPlan] = { )(implicit c: Config) = {
for { for {
_ <- SyncLogging.logRunStart(c.bucket, c.prefix, c.sources) _ <- SyncLogging.logRunStart(c.bucket, c.prefix, c.sources)
actions <- buildPlan(storageService, hashService) actions <- buildPlan(hashService)
} yield actions } yield actions
} }
private def buildPlan( private def buildPlan(
storageService: StorageService,
hashService: HashService hashService: HashService
)(implicit c: Config): TaskR[MyConsole, SyncPlan] = )(implicit c: Config) =
for { for {
metadata <- gatherMetadata(storageService, hashService) metadata <- gatherMetadata(hashService)
} yield assemblePlan(c)(metadata) } yield assemblePlan(c)(metadata)
def assemblePlan( private def assemblePlan(
implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = { implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = {
case (remoteData, localData) => case (remoteData, localData) =>
SyncPlan( SyncPlan(
actions = createActions(remoteData, localData) actions = createActions(c)(remoteData)(localData)
.filter(doesSomething) .filter(doesSomething)
.sortBy(SequencePlan.order), .sortBy(SequencePlan.order),
syncTotals = SyncTotals(count = localData.count, syncTotals = SyncTotals(count = localData.count,
@ -48,63 +46,64 @@ trait PlanBuilder {
) )
} }
private def createActions( private def createActions
remoteData: S3ObjectsData, : Config => S3ObjectsData => LocalFiles => Stream[Action] =
localData: LocalFiles c =>
)(implicit c: Config): Stream[Action] = remoteData =>
actionsForLocalFiles(localData, remoteData) ++ localData =>
actionsForRemoteKeys(remoteData) actionsForLocalFiles(c)(remoteData)(localData) ++
actionsForRemoteKeys(c)(remoteData)
def doesSomething: Action => Boolean = { private def doesSomething: Action => Boolean = {
case _: DoNothing => false case _: DoNothing => false
case _ => true case _ => true
} }
private val emptyActionStream = Stream[Action]() private def actionsForLocalFiles
: Config => S3ObjectsData => LocalFiles => Stream[Action] =
c =>
remoteData =>
localData =>
localData.localFiles.foldLeft(Stream.empty[Action])((acc, lf) =>
createActionFromLocalFile(c)(lf)(remoteData)(acc) ++ acc)
private def actionsForLocalFiles( private def createActionFromLocalFile
localData: LocalFiles, : Config => LocalFile => S3ObjectsData => Stream[Action] => Stream[Action] =
remoteData: S3ObjectsData c =>
)(implicit c: Config) = lf =>
localData.localFiles.foldLeft(emptyActionStream)((acc, lf) => remoteData =>
createActionFromLocalFile(lf, remoteData, acc) ++ acc) previousActions =>
ActionGenerator.createActions(
S3MetaDataEnricher.getMetadata(lf, remoteData)(c),
previousActions)(c)
private def createActionFromLocalFile( private def actionsForRemoteKeys: Config => S3ObjectsData => Stream[Action] =
lf: LocalFile, c =>
remoteData: S3ObjectsData, remoteData =>
previousActions: Stream[Action] remoteData.byKey.keys.foldLeft(Stream.empty[Action])((acc, rk) =>
)(implicit c: Config) = createActionFromRemoteKey(c)(rk) #:: acc)
ActionGenerator.createActions(
S3MetaDataEnricher.getMetadata(lf, remoteData),
previousActions)
private def actionsForRemoteKeys(remoteData: S3ObjectsData)( private def createActionFromRemoteKey: Config => RemoteKey => Action =
implicit c: Config) = c =>
remoteData.byKey.keys.foldLeft(emptyActionStream)((acc, rk) => rk =>
createActionFromRemoteKey(rk) #:: acc) if (rk.isMissingLocally(c.sources, c.prefix))
Action.ToDelete(c.bucket, rk, 0L)
private def createActionFromRemoteKey(rk: RemoteKey)(implicit c: Config) = else DoNothing(c.bucket, rk, 0L)
if (rk.isMissingLocally(c.sources, c.prefix))
Action.ToDelete(c.bucket, rk, 0L)
else DoNothing(c.bucket, rk, 0L)
private def gatherMetadata( private def gatherMetadata(
storageService: StorageService,
hashService: HashService hashService: HashService
)(implicit c: Config): TaskR[MyConsole, (S3ObjectsData, LocalFiles)] = )(implicit c: Config) =
for { for {
remoteData <- fetchRemoteData(storageService) remoteData <- fetchRemoteData
localData <- findLocalFiles(hashService) localData <- findLocalFiles(hashService)
} yield (remoteData, localData) } yield (remoteData, localData)
private def fetchRemoteData( private def fetchRemoteData(implicit c: Config) =
storageService: StorageService listObjects(c.bucket, c.prefix)
)(implicit c: Config): TaskR[MyConsole, S3ObjectsData] =
storageService.listObjects(c.bucket, c.prefix)
private def findLocalFiles( private def findLocalFiles(
hashService: HashService hashService: HashService
)(implicit config: Config): TaskR[MyConsole, LocalFiles] = )(implicit config: Config) =
for { for {
_ <- SyncLogging.logFileScan _ <- SyncLogging.logFileScan
localFiles <- findFiles(hashService) localFiles <- findFiles(hashService)
@ -112,7 +111,7 @@ trait PlanBuilder {
private def findFiles( private def findFiles(
hashService: HashService hashService: HashService
)(implicit c: Config): Task[LocalFiles] = { )(implicit c: Config) = {
Task Task
.foreach(c.sources.paths)(LocalFileStream.findFiles(_, hashService)) .foreach(c.sources.paths)(LocalFileStream.findFiles(_, hashService))
.map(_.foldLeft(LocalFiles())((acc, localFile) => acc ++ localFile)) .map(_.foldLeft(LocalFiles())((acc, localFile) => acc ++ localFile))

View file

@ -17,17 +17,17 @@ trait SyncLogging {
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey, prefix: RemoteKey,
sources: Sources sources: Sources
): ZIO[MyConsole, Nothing, Unit] = ): ZIO[Console, Nothing, Unit] =
for { for {
_ <- putStrLn(ConsoleOut.ValidConfig(bucket, prefix, sources)) _ <- putStrLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
} yield () } yield ()
def logFileScan(implicit c: Config): ZIO[MyConsole, Nothing, Unit] = def logFileScan(implicit c: Config): ZIO[Console, Nothing, Unit] =
putStrLn(s"Scanning local files: ${c.sources.paths.mkString(", ")}...") putStrLn(s"Scanning local files: ${c.sources.paths.mkString(", ")}...")
def logRunFinished( def logRunFinished(
actions: Stream[StorageQueueEvent] actions: Stream[StorageQueueEvent]
): ZIO[MyConsole, Nothing, Unit] = { ): ZIO[Console, Nothing, Unit] = {
val counters = actions.foldLeft(Counters())(countActivities) val counters = actions.foldLeft(Counters())(countActivities)
for { for {
_ <- putStrLn(eraseToEndOfScreen) _ <- putStrLn(eraseToEndOfScreen)

View file

@ -11,6 +11,7 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{
UploadQueueEvent UploadQueueEvent
} }
import net.kemitix.thorp.domain.Terminal._ import net.kemitix.thorp.domain.Terminal._
import net.kemitix.thorp.storage.api.Storage
import zio.TaskR import zio.TaskR
import scala.io.AnsiColor._ import scala.io.AnsiColor._
@ -21,12 +22,12 @@ trait ThorpArchive {
index: Int, index: Int,
action: Action, action: Action,
totalBytesSoFar: Long totalBytesSoFar: Long
): TaskR[MyConsole, StorageQueueEvent] ): TaskR[Storage with Console, StorageQueueEvent]
def logEvent( def logEvent(
event: StorageQueueEvent, event: StorageQueueEvent,
batchMode: Boolean batchMode: Boolean
): TaskR[MyConsole, Unit] = ): TaskR[Console, Unit] =
event match { event match {
case UploadQueueEvent(remoteKey, _) => case UploadQueueEvent(remoteKey, _) =>
for { for {

View file

@ -4,11 +4,11 @@ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage._
import net.kemitix.thorp.storage.api.Storage
import zio.{Task, TaskR} import zio.{Task, TaskR}
case class UnversionedMirrorArchive( case class UnversionedMirrorArchive(
storageService: StorageService,
batchMode: Boolean, batchMode: Boolean,
syncTotals: SyncTotals syncTotals: SyncTotals
) extends ThorpArchive { ) extends ThorpArchive {
@ -17,7 +17,7 @@ case class UnversionedMirrorArchive(
index: Int, index: Int,
action: Action, action: Action,
totalBytesSoFar: Long totalBytesSoFar: Long
): TaskR[MyConsole, StorageQueueEvent] = ): TaskR[Storage with Console, StorageQueueEvent] =
action match { action match {
case ToUpload(bucket, localFile, _) => case ToUpload(bucket, localFile, _) =>
for { for {
@ -26,12 +26,12 @@ case class UnversionedMirrorArchive(
} yield event } yield event
case ToCopy(bucket, sourceKey, hash, targetKey, _) => case ToCopy(bucket, sourceKey, hash, targetKey, _) =>
for { for {
event <- storageService.copy(bucket, sourceKey, hash, targetKey) event <- copyObject(bucket, sourceKey, hash, targetKey)
_ <- logEvent(event, batchMode) _ <- logEvent(event, batchMode)
} yield event } yield event
case ToDelete(bucket, remoteKey, _) => case ToDelete(bucket, remoteKey, _) =>
for { for {
event <- storageService.delete(bucket, remoteKey) event <- deleteObject(bucket, remoteKey)
_ <- logEvent(event, batchMode) _ <- logEvent(event, batchMode)
} yield event } yield event
case DoNothing(_, remoteKey, _) => case DoNothing(_, remoteKey, _) =>
@ -44,19 +44,19 @@ case class UnversionedMirrorArchive(
bucket: Bucket, bucket: Bucket,
localFile: LocalFile localFile: LocalFile
) = ) =
storageService.upload( upload(localFile,
localFile, bucket,
bucket, batchMode,
batchMode, UploadEventListener(localFile, index, syncTotals, totalBytesSoFar),
UploadEventListener(localFile, index, syncTotals, totalBytesSoFar), 1)
1)
} }
object UnversionedMirrorArchive { object UnversionedMirrorArchive {
def default( def default(
storageService: StorageService,
batchMode: Boolean, batchMode: Boolean,
syncTotals: SyncTotals syncTotals: SyncTotals
): ThorpArchive = ): Task[ThorpArchive] =
new UnversionedMirrorArchive(storageService, batchMode, syncTotals) Task {
new UnversionedMirrorArchive(batchMode, syncTotals)
}
} }

View file

@ -4,39 +4,39 @@ import java.io.File
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.Storage
import zio.{Task, TaskR} import zio.{TaskR, UIO}
case class DummyStorageService(s3ObjectData: S3ObjectsData, case class DummyStorageService(s3ObjectData: S3ObjectsData,
uploadFiles: Map[File, (RemoteKey, MD5Hash)]) uploadFiles: Map[File, (RemoteKey, MD5Hash)])
extends StorageService { extends Storage.Service {
override def shutdown: Task[StorageQueueEvent] = override def shutdown: UIO[StorageQueueEvent] =
Task(StorageQueueEvent.ShutdownQueueEvent()) UIO(StorageQueueEvent.ShutdownQueueEvent())
override def listObjects( override def listObjects(
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey prefix: RemoteKey
): TaskR[MyConsole, S3ObjectsData] = ): TaskR[Console, S3ObjectsData] =
TaskR(s3ObjectData) TaskR(s3ObjectData)
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean, batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): Task[StorageQueueEvent] = { tryCount: Int): UIO[StorageQueueEvent] = {
val (remoteKey, md5Hash) = uploadFiles(localFile.file) val (remoteKey, md5Hash) = uploadFiles(localFile.file)
Task(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash)) UIO(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash))
} }
override def copy(bucket: Bucket, override def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey): Task[StorageQueueEvent] = targetKey: RemoteKey): UIO[StorageQueueEvent] =
Task(StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey)) UIO(StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey))
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey): Task[StorageQueueEvent] = remoteKey: RemoteKey): UIO[StorageQueueEvent] =
Task(StorageQueueEvent.DeleteQueueEvent(remoteKey)) UIO(StorageQueueEvent.DeleteQueueEvent(remoteKey))
} }

View file

@ -7,17 +7,13 @@ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, Storage}
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.Runtime import zio.{DefaultRuntime, Task, UIO}
import zio.internal.PlatformLive
class PlanBuilderTest extends FreeSpec with TemporaryFolder { class PlanBuilderTest extends FreeSpec with TemporaryFolder {
private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
private val lastModified: LastModified = LastModified() private val lastModified: LastModified = LastModified()
private val planBuilder = new PlanBuilder {}
private val emptyS3ObjectData = S3ObjectsData() private val emptyS3ObjectData = S3ObjectsData()
"create a plan" - { "create a plan" - {
@ -25,6 +21,10 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val hashService = SimpleHashService() val hashService = SimpleHashService()
"one source" - { "one source" - {
val options: Path => ConfigOptions =
source =>
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket"))
"a file" - { "a file" - {
val filename = "aFile" val filename = "aFile"
val remoteKey = RemoteKey(filename) val remoteKey = RemoteKey(filename)
@ -34,24 +34,12 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
withDirectory(source => { withDirectory(source => {
val file = createFile(source, filename, "file-content") val file = createFile(source, filename, "file-content")
val hash = md5Hash(file) val hash = md5Hash(file)
val expected =
val expected = Right( Right(List(toUpload(remoteKey, hash, source, file)))
List(
toUpload(remoteKey, hash, source, file)
))
val storageService =
DummyStorageService(emptyS3ObjectData,
Map(
file -> (remoteKey, hash)
))
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(source),
configOptions(ConfigOption.Source(source), UIO.succeed(emptyS3ObjectData))
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -64,32 +52,17 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val aFile = createFile(source, filename, content) val aFile = createFile(source, filename, content)
val anOtherFile = createFile(source, anOtherFilename, content) val anOtherFile = createFile(source, anOtherFilename, content)
val aHash = md5Hash(aFile) val aHash = md5Hash(aFile)
val anOtherKey = RemoteKey("other")
val anOtherKey = RemoteKey("other") val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey)))
val expected = Right(
List(
toCopy(anOtherKey, aHash, remoteKey)
))
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = byHash =
Map(aHash -> Set(KeyModified(anOtherKey, lastModified))), Map(aHash -> Set(KeyModified(anOtherKey, lastModified))),
byKey = Map(anOtherKey -> HashModified(aHash, lastModified)) byKey = Map(anOtherKey -> HashModified(aHash, lastModified))
) )
val storageService =
DummyStorageService(s3ObjectsData,
Map(
aFile -> (remoteKey, aHash)
))
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(source),
configOptions(ConfigOption.Source(source), UIO.succeed(s3ObjectsData))
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -101,28 +74,17 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
withDirectory(source => { withDirectory(source => {
val file = createFile(source, filename, "file-content") val file = createFile(source, filename, "file-content")
val hash = md5Hash(file) val hash = md5Hash(file)
// DoNothing actions should have been filtered out of the plan // DoNothing actions should have been filtered out of the plan
val expected = Right(List()) val expected = Right(List())
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = byHash =
Map(hash -> Set(KeyModified(remoteKey, lastModified))), Map(hash -> Set(KeyModified(remoteKey, lastModified))),
byKey = Map(remoteKey -> HashModified(hash, lastModified)) byKey = Map(remoteKey -> HashModified(hash, lastModified))
) )
val storageService =
DummyStorageService(s3ObjectsData,
Map(
file -> (remoteKey, hash)
))
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(source),
configOptions(ConfigOption.Source(source), UIO.succeed(s3ObjectsData))
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -134,31 +96,18 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val file = createFile(source, filename, "file-content") val file = createFile(source, filename, "file-content")
val currentHash = md5Hash(file) val currentHash = md5Hash(file)
val originalHash = MD5Hash("original-file-content") val originalHash = MD5Hash("original-file-content")
val expected =
val expected = Right( Right(List(toUpload(remoteKey, currentHash, source, file)))
List(
toUpload(remoteKey, currentHash, source, file)
))
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = Map(originalHash -> Set( byHash = Map(originalHash -> Set(
KeyModified(remoteKey, lastModified))), KeyModified(remoteKey, lastModified))),
byKey = byKey =
Map(remoteKey -> HashModified(originalHash, lastModified)) Map(remoteKey -> HashModified(originalHash, lastModified))
) )
val storageService =
DummyStorageService(s3ObjectsData,
Map(
file -> (remoteKey, currentHash)
))
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(source),
configOptions(ConfigOption.Source(source), UIO.succeed(s3ObjectsData))
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -169,30 +118,16 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val file = createFile(source, filename, "file-content") val file = createFile(source, filename, "file-content")
val hash = md5Hash(file) val hash = md5Hash(file)
val sourceKey = RemoteKey("other-key") val sourceKey = RemoteKey("other-key")
val expected = Right(List(toCopy(sourceKey, hash, remoteKey)))
val expected = Right(
List(
toCopy(sourceKey, hash, remoteKey)
))
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = byHash =
Map(hash -> Set(KeyModified(sourceKey, lastModified))), Map(hash -> Set(KeyModified(sourceKey, lastModified))),
byKey = Map() byKey = Map()
) )
val storageService =
DummyStorageService(s3ObjectsData,
Map(
file -> (remoteKey, hash)
))
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(source),
configOptions(ConfigOption.Source(source), UIO.succeed(s3ObjectsData))
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -208,54 +143,29 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
withDirectory(source => { withDirectory(source => {
val file = createFile(source, filename, "file-content") val file = createFile(source, filename, "file-content")
val hash = md5Hash(file) val hash = md5Hash(file)
// DoNothing actions should have been filtered out of the plan // DoNothing actions should have been filtered out of the plan
val expected = Right(List()) val expected = Right(List())
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))), byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
byKey = Map(remoteKey -> HashModified(hash, lastModified)) byKey = Map(remoteKey -> HashModified(hash, lastModified))
) )
val storageService =
DummyStorageService(s3ObjectsData,
Map(
file -> (remoteKey, hash)
))
val result = val result =
invoke(storageService, invoke(hashService, options(source), UIO.succeed(s3ObjectsData))
hashService,
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
} }
"with no matching local file" - { "with no matching local file" - {
"delete remote key" ignore { "delete remote key" in {
withDirectory(source => { withDirectory(source => {
val hash = MD5Hash("file-content") val hash = MD5Hash("file-content")
val expected = Right(List(toDelete(remoteKey)))
val expected = Right(
List(
toDelete(remoteKey)
))
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))), byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
byKey = Map(remoteKey -> HashModified(hash, lastModified)) byKey = Map(remoteKey -> HashModified(hash, lastModified))
) )
val storageService = DummyStorageService(s3ObjectsData, Map.empty)
val result = val result =
invoke(storageService, invoke(hashService, options(source), UIO.succeed(s3ObjectsData))
hashService,
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -268,36 +178,31 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val filename2 = "file-2" val filename2 = "file-2"
val remoteKey1 = RemoteKey(filename1) val remoteKey1 = RemoteKey(filename1)
val remoteKey2 = RemoteKey(filename2) val remoteKey2 = RemoteKey(filename2)
val options: Path => Path => ConfigOptions =
source1 =>
source2 =>
configOptions(ConfigOption.Source(source1),
ConfigOption.Source(source2),
ConfigOption.Bucket("a-bucket"))
"unique files in both" - { "unique files in both" - {
"upload all files" in { "upload all files" in {
withDirectory(firstSource => { withDirectory(firstSource => {
val fileInFirstSource = val fileInFirstSource =
createFile(firstSource, filename1, "file-1-content") createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource) val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => { withDirectory(secondSource => {
val fileInSecondSource = val fileInSecondSource =
createFile(secondSource, filename2, "file-2-content") createFile(secondSource, filename2, "file-2-content")
val hash2 = md5Hash(fileInSecondSource) val hash2 = md5Hash(fileInSecondSource)
val expected = Right( val expected = Right(
List( List(
toUpload(remoteKey2, hash2, secondSource, fileInSecondSource), toUpload(remoteKey2, hash2, secondSource, fileInSecondSource),
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource) toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)
)) ))
val storageService = DummyStorageService(
emptyS3ObjectData,
Map(fileInFirstSource -> (remoteKey1, hash1),
fileInSecondSource -> (remoteKey2, hash2)))
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(firstSource)(secondSource),
configOptions(ConfigOption.Source(firstSource), UIO.succeed(emptyS3ObjectData))
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -309,29 +214,16 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val fileInFirstSource: File = val fileInFirstSource: File =
createFile(firstSource, filename1, "file-1-content") createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource) val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => { withDirectory(secondSource => {
val fileInSecondSource: File = val fileInSecondSource: File =
createFile(secondSource, filename1, "file-2-content") createFile(secondSource, filename1, "file-2-content")
val hash2 = md5Hash(fileInSecondSource) val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List(
val expected = Right( toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)))
List(
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)
))
val storageService = DummyStorageService(
emptyS3ObjectData,
Map(fileInFirstSource -> (remoteKey1, hash1),
fileInSecondSource -> (remoteKey2, hash2)))
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(firstSource)(secondSource),
configOptions(ConfigOption.Source(firstSource), UIO.succeed(emptyS3ObjectData))
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -340,30 +232,19 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
"with a remote file only present in second source" - { "with a remote file only present in second source" - {
"do not delete it " in { "do not delete it " in {
withDirectory(firstSource => { withDirectory(firstSource => {
withDirectory(secondSource => { withDirectory(secondSource => {
val fileInSecondSource = val fileInSecondSource =
createFile(secondSource, filename2, "file-2-content") createFile(secondSource, filename2, "file-2-content")
val hash2 = md5Hash(fileInSecondSource) val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List()) val expected = Right(List())
val s3ObjectData = S3ObjectsData( val s3ObjectData = S3ObjectsData(
byHash = byHash =
Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))), Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))),
byKey = Map(remoteKey2 -> HashModified(hash2, lastModified))) byKey = Map(remoteKey2 -> HashModified(hash2, lastModified)))
val storageService = DummyStorageService(
s3ObjectData,
Map(fileInSecondSource -> (remoteKey2, hash2)))
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(firstSource)(secondSource),
configOptions(ConfigOption.Source(firstSource), UIO.succeed(s3ObjectData))
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -375,27 +256,16 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val fileInFirstSource: File = val fileInFirstSource: File =
createFile(firstSource, filename1, "file-1-content") createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource) val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => { withDirectory(secondSource => {
val expected = Right(List()) val expected = Right(List())
val s3ObjectData = S3ObjectsData( val s3ObjectData = S3ObjectsData(
byHash = byHash =
Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))), Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))),
byKey = Map(remoteKey1 -> HashModified(hash1, lastModified))) byKey = Map(remoteKey1 -> HashModified(hash1, lastModified)))
val storageService = DummyStorageService(
s3ObjectData,
Map(fileInFirstSource -> (remoteKey1, hash1)))
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(firstSource)(secondSource),
configOptions(ConfigOption.Source(firstSource), UIO.succeed(s3ObjectData))
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -404,26 +274,14 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
"with remote file not present in either source" - { "with remote file not present in either source" - {
"delete from remote" in { "delete from remote" in {
withDirectory(firstSource => { withDirectory(firstSource => {
withDirectory(secondSource => { withDirectory(secondSource => {
val expected = Right(List(toDelete(remoteKey1)))
val expected = Right(
List(
toDelete(remoteKey1)
))
val s3ObjectData = S3ObjectsData(byKey = val s3ObjectData = S3ObjectsData(byKey =
Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified))) Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified)))
val storageService = DummyStorageService(s3ObjectData, Map())
val result = val result =
invoke(storageService, invoke(hashService,
hashService, options(firstSource)(secondSource),
configOptions(ConfigOption.Source(firstSource), UIO.succeed(s3ObjectData))
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -431,8 +289,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
} }
} }
def md5Hash(file: File) = { def md5Hash(file: File): MD5Hash = {
runtime new DefaultRuntime {}
.unsafeRunSync { .unsafeRunSync {
hashService.hashLocalObject(file.toPath).map(_.get(MD5)) hashService.hashLocalObject(file.toPath).map(_.get(MD5))
} }
@ -464,30 +322,47 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
ConfigOptions(List(configOptions: _*)) ConfigOptions(List(configOptions: _*))
private def invoke( private def invoke(
storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: ConfigOptions configOptions: ConfigOptions,
result: Task[S3ObjectsData]
): Either[Any, List[(String, String, String, String, String)]] = { ): Either[Any, List[(String, String, String, String, String)]] = {
runtime type TestEnv = Storage.Test with Console.Test
val testEnv: TestEnv = new Storage.Test with Console.Test {
override def listResult: Task[S3ObjectsData] = result
override def uploadResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def copyResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def deleteResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def shutdownResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
}
new DefaultRuntime {}
.unsafeRunSync { .unsafeRunSync {
planBuilder PlanBuilder
.createPlan(storageService, hashService, configOptions) .createPlan(hashService, configOptions)
.provide(testEnv)
} }
.toEither .toEither
.map(_.actions.toList.map({ .map(convertResult)
case ToUpload(_, lf, _) =>
("upload",
lf.remoteKey.key,
lf.hashes(MD5).hash,
lf.source.toString,
lf.file.toString)
case ToDelete(_, remoteKey, _) => ("delete", remoteKey.key, "", "", "")
case ToCopy(_, sourceKey, hash, targetKey, _) =>
("copy", sourceKey.key, hash.hash, targetKey.key, "")
case DoNothing(_, remoteKey, _) =>
("do-nothing", remoteKey.key, "", "", "")
case x => ("other", x.toString, "", "", "")
}))
} }
private def convertResult
: SyncPlan => List[(String, String, String, String, String)] =
_.actions.toList.map({
case ToUpload(_, lf, _) =>
("upload",
lf.remoteKey.key,
lf.hashes(MD5).hash,
lf.source.toString,
lf.file.toString)
case ToDelete(_, remoteKey, _) => ("delete", remoteKey.key, "", "", "")
case ToCopy(_, sourceKey, hash, targetKey, _) =>
("copy", sourceKey.key, hash.hash, targetKey.key, "")
case DoNothing(_, remoteKey, _) =>
("do-nothing", remoteKey.key, "", "", "")
case x => ("other", x.toString, "", "", "")
})
} }

View file

@ -1,243 +0,0 @@
package net.kemitix.thorp.core
import java.io.File
import java.nio.file.Paths
import java.time.Instant
import net.kemitix.thorp.console
import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root}
import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent,
DeleteQueueEvent,
ShutdownQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FunSpec
import zio.internal.PlatformLive
import zio.{Runtime, Task, TaskR}
class SyncSuite extends FunSpec {
private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
private val testBucket = Bucket("bucket")
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
// source contains the files root-file and subdir/leaf-file
private val rootRemoteKey = RemoteKey("prefix/root-file")
private val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
private val rootFile: LocalFile =
LocalFile.resolve("root-file",
md5HashMap(Root.hash),
sourcePath,
_ => rootRemoteKey)
private val leafFile: LocalFile =
LocalFile.resolve("subdir/leaf-file",
md5HashMap(Leaf.hash),
sourcePath,
_ => leafRemoteKey)
private val hashService =
DummyHashService(
Map(
file("root-file") -> Map(MD5 -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash)
))
private val configOptions =
ConfigOptions(
List(
ConfigOption.Source(sourcePath),
ConfigOption.Bucket("bucket"),
ConfigOption.Prefix("prefix"),
ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions
))
private val lastModified = LastModified(Instant.now)
def putObjectRequest(bucket: Bucket,
remoteKey: RemoteKey,
localFile: LocalFile): (String, String, File) =
(bucket.name, remoteKey.key, localFile.file)
private def md5HashMap(md5Hash: MD5Hash): Map[HashType, MD5Hash] =
Map(MD5 -> md5Hash)
private def file(filename: String) =
sourcePath.resolve(Paths.get(filename))
describe("when all files should be uploaded") {
val storageService =
new RecordingStorageService(testBucket, S3ObjectsData())
it("uploads all files") {
val expected = Right(
Set(ToUpload(testBucket, rootFile, rootFile.file.length),
ToUpload(testBucket, leafFile, leafFile.file.length)))
val result =
invokeSubjectForActions(storageService, hashService, configOptions)
assertResult(expected)(result.map(_.toSet))
}
}
describe("when no files should be uploaded") {
val s3ObjectsData = S3ObjectsData(
byHash = Map(
Root.hash -> Set(
KeyModified(RemoteKey("prefix/root-file"), lastModified)),
Leaf.hash -> Set(
KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))
),
byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash,
lastModified))
)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("no actions") {
val expected = Stream()
val result =
invokeSubjectForActions(storageService, hashService, configOptions)
assert(result.isRight)
assertResult(expected)(result.right.get)
}
}
describe("when a file is renamed it is moved on S3 with no upload") {
val sourceKey = RemoteKey("prefix/root-file-old")
val targetKey = RemoteKey("prefix/root-file")
// 'root-file-old' should be renamed as 'root-file'
val s3ObjectsData = S3ObjectsData(
byHash =
Map(Root.hash -> Set(KeyModified(sourceKey, lastModified)),
Leaf.hash -> Set(
KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
byKey =
Map(sourceKey -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash,
lastModified))
)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("copies the file and deletes the original") {
val expected = Stream(
ToCopy(testBucket,
sourceKey,
Root.hash,
targetKey,
rootFile.file.length),
ToDelete(testBucket, sourceKey, 0L)
)
val result =
invokeSubjectForActions(storageService, hashService, configOptions)
assert(result.isRight)
assertResult(expected)(result.right.get)
}
}
describe("when a file is copied it is copied on S3 with no upload") {
it("TODO") {
pending
}
}
describe("when a file is deleted locally it is deleted from S3") {
val deletedHash = MD5Hash("deleted-hash")
val deletedKey = RemoteKey("prefix/deleted-file")
val s3ObjectsData = S3ObjectsData(
byHash = Map(
Root.hash -> Set(
KeyModified(RemoteKey("prefix/root-file"), lastModified)),
Leaf.hash -> Set(
KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)),
deletedHash -> Set(
KeyModified(RemoteKey("prefix/deleted-file"), lastModified))
),
byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash,
lastModified),
deletedKey -> HashModified(deletedHash, lastModified)
)
)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("deleted key") {
val expected = Stream(
ToDelete(testBucket, deletedKey, 0L)
)
val result =
invokeSubjectForActions(storageService, hashService, configOptions)
assert(result.isRight)
assertResult(expected)(result.right.get)
}
}
describe("when a file is excluded") {
val s3ObjectsData = S3ObjectsData(
byHash = Map(
Root.hash -> Set(
KeyModified(RemoteKey("prefix/root-file"), lastModified)),
Leaf.hash -> Set(
KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))
),
byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash,
lastModified))
)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("is not uploaded") {
val expected = Stream()
val result =
invokeSubjectForActions(storageService,
hashService,
ConfigOption.Exclude("leaf") :: configOptions)
assert(result.isRight)
assertResult(expected)(result.right.get)
}
}
class RecordingStorageService(testBucket: Bucket,
s3ObjectsData: S3ObjectsData)
extends StorageService {
override def listObjects(
bucket: Bucket,
prefix: RemoteKey): TaskR[console.MyConsole, S3ObjectsData] =
TaskR(s3ObjectsData)
override def upload(localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int): Task[UploadQueueEvent] =
Task(UploadQueueEvent(localFile.remoteKey, localFile.hashes(MD5)))
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hashes: MD5Hash,
targetKey: RemoteKey): Task[CopyQueueEvent] =
Task(CopyQueueEvent(sourceKey, targetKey))
override def delete(bucket: Bucket,
remoteKey: RemoteKey): Task[DeleteQueueEvent] =
Task(DeleteQueueEvent(remoteKey))
override def shutdown: Task[StorageQueueEvent] =
Task(ShutdownQueueEvent())
}
def invokeSubjectForActions(
storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[Any, Stream[Action]] = {
invoke(storageService, hashService, configOptions)
.map(_.actions)
}
def invoke(storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[Any, SyncPlan] = {
runtime.unsafeRunSync {
PlanBuilder
.createPlan(storageService, hashService, configOptions)
}.toEither
}
}

View file

@ -0,0 +1,95 @@
package net.kemitix.thorp.storage.api
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain._
import zio.{Task, TaskR, UIO, ZIO}
trait Storage {
val storage: Storage.Service
}
object Storage {
trait Service {
def listObjects(
bucket: Bucket,
prefix: RemoteKey
): TaskR[Storage with Console, S3ObjectsData]
def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int
): ZIO[Storage, Nothing, StorageQueueEvent]
def copy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent]
def delete(
bucket: Bucket,
remoteKey: RemoteKey
): UIO[StorageQueueEvent]
def shutdown: UIO[StorageQueueEvent]
}
trait Test extends Storage {
def listResult: Task[S3ObjectsData]
def uploadResult: UIO[StorageQueueEvent]
def copyResult: UIO[StorageQueueEvent]
def deleteResult: UIO[StorageQueueEvent]
def shutdownResult: UIO[StorageQueueEvent]
val storage: Service = new Service {
override def listObjects(
bucket: Bucket,
prefix: RemoteKey): TaskR[Storage with Console, S3ObjectsData] =
listResult
override def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int): ZIO[Storage, Nothing, StorageQueueEvent] =
uploadResult
override def copy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): ZIO[Storage, Nothing, StorageQueueEvent] =
copyResult
override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageQueueEvent] =
deleteResult
override def shutdown: UIO[StorageQueueEvent] =
shutdownResult
}
}
object Test extends Test {
override def listResult: Task[S3ObjectsData] =
Task.die(new NotImplementedError)
override def uploadResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def copyResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def deleteResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
override def shutdownResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
}
}

View file

@ -1,36 +0,0 @@
package net.kemitix.thorp.storage.api
import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.domain._
import zio.{Task, TaskR}
trait StorageService {
def shutdown: Task[StorageQueueEvent]
def listObjects(
bucket: Bucket,
prefix: RemoteKey
): TaskR[MyConsole, S3ObjectsData]
def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int
): Task[StorageQueueEvent]
def copy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
): Task[StorageQueueEvent]
def delete(
bucket: Bucket,
remoteKey: RemoteKey
): Task[StorageQueueEvent]
}

View file

@ -0,0 +1,42 @@
package net.kemitix.thorp
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage
import zio.{TaskR, ZIO}
package object storage {
final val storageService: ZIO[Storage, Nothing, Storage.Service] =
ZIO.access(_.storage)
final def listObjects(
bucket: Bucket,
prefix: RemoteKey): TaskR[Storage with Console, S3ObjectsData] =
ZIO.accessM(_.storage listObjects (bucket, prefix))
final def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(
_.storage upload (localFile, bucket, batchMode, uploadEventListener, tryCount))
final def copyObject(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey))
final def deleteObject(
bucket: Bucket,
remoteKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(_.storage delete (bucket, remoteKey))
}

View file

@ -1,40 +1,47 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.{AmazonS3 => AmazonS3Client} import com.amazonaws.services.s3.{AmazonS3 => AmazonS3Client}
import com.amazonaws.services.s3.model.{ import zio.{Task, UIO}
CopyObjectRequest,
CopyObjectResult,
DeleteObjectRequest,
ListObjectsV2Request,
ListObjectsV2Result
}
object AmazonS3 { object AmazonS3 {
trait Client { trait Client {
def shutdown(): Unit def shutdown(): UIO[Unit]
def deleteObject: DeleteObjectRequest => Unit def deleteObject: DeleteObjectRequest => Task[Unit]
def copyObject: CopyObjectRequest => Option[CopyObjectResult] def copyObject: CopyObjectRequest => Task[Option[CopyObjectResult]]
def listObjectsV2: ListObjectsV2Request => ListObjectsV2Result def listObjectsV2: ListObjectsV2Request => Task[ListObjectsV2Result]
} }
case class ClientImpl(amazonS3: AmazonS3Client) extends Client { case class ClientImpl(amazonS3: AmazonS3Client) extends Client {
def shutdown(): Unit = amazonS3.shutdown() def shutdown(): UIO[Unit] =
UIO {
amazonS3.shutdown()
}
def deleteObject: DeleteObjectRequest => Unit = def deleteObject: DeleteObjectRequest => Task[Unit] =
request => amazonS3.deleteObject(request) request =>
Task {
amazonS3.deleteObject(request)
}
def copyObject: CopyObjectRequest => Option[CopyObjectResult] = def copyObject: CopyObjectRequest => Task[Option[CopyObjectResult]] =
request => Option(amazonS3.copyObject(request)) request =>
Task {
amazonS3.copyObject(request)
}.map(Option(_))
def listObjectsV2: ListObjectsV2Request => ListObjectsV2Result = def listObjectsV2: ListObjectsV2Request => Task[ListObjectsV2Result] =
request => amazonS3.listObjectsV2(request) request =>
Task {
amazonS3.listObjectsV2(request)
}
} }

View file

@ -2,10 +2,19 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManager import com.amazonaws.services.s3.transfer.TransferManager
import net.kemitix.thorp.storage.aws.AmazonUpload.{
CompletableUpload,
InProgress
}
import zio.{Task, UIO}
case class AmazonTransferManager(transferManager: TransferManager) { case class AmazonTransferManager(transferManager: TransferManager) {
def shutdownNow(now: Boolean): Unit = transferManager.shutdownNow(now) def shutdownNow(now: Boolean): UIO[Unit] =
UIO(transferManager.shutdownNow(now))
def upload: PutObjectRequest => Task[InProgress] =
putObjectRequest =>
Task(transferManager.upload(putObjectRequest))
.map(CompletableUpload)
def upload(putObjectRequest: PutObjectRequest): AmazonUpload =
AmazonUpload(transferManager.upload(putObjectRequest))
} }

View file

@ -3,6 +3,15 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.transfer.Upload import com.amazonaws.services.s3.transfer.Upload
import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.model.UploadResult
case class AmazonUpload(upload: Upload) { object AmazonUpload {
def waitForUploadResult: UploadResult = upload.waitForUploadResult()
trait InProgress {
def waitForUploadResult: UploadResult
}
case class CompletableUpload(upload: Upload) extends InProgress {
override def waitForUploadResult: UploadResult =
upload.waitForUploadResult()
}
} }

View file

@ -1,64 +1,49 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.{ import com.amazonaws.SdkClientException
AmazonS3Exception, import com.amazonaws.services.s3.model.{CopyObjectRequest, CopyObjectResult}
CopyObjectRequest, import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyObjectResult Action,
CopyQueueEvent,
ErrorQueueEvent
} }
import net.kemitix.thorp.domain.StorageQueueEvent.{Action, CopyQueueEvent}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.aws.S3ClientException.{ import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError}
HashMatchError, import zio.{IO, Task, UIO}
S3Exception
}
import zio.Task
import scala.util.{Failure, Success, Try} trait Copier {
class Copier(amazonS3: AmazonS3.Client) { def copy(amazonS3: AmazonS3.Client)(
def copy(
bucket: Bucket, bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey targetKey: RemoteKey
): Task[StorageQueueEvent] = ): UIO[StorageQueueEvent] =
for { copyObject(amazonS3)(bucket, sourceKey, hash, targetKey)
copyResult <- copyObject(bucket, sourceKey, hash, targetKey) .fold(foldFailure(sourceKey, targetKey),
result <- mapCopyResult(copyResult, sourceKey, targetKey) foldSuccess(sourceKey, targetKey))
} yield result
private def mapCopyResult( private def copyObject(amazonS3: AmazonS3.Client)(
copyResult: Try[Option[CopyObjectResult]],
sourceKey: RemoteKey,
targetKey: RemoteKey
) =
copyResult match {
case Success(None) =>
Task.succeed(
StorageQueueEvent
.ErrorQueueEvent(
Action.Copy(s"${sourceKey.key} => ${targetKey.key}"),
targetKey,
HashMatchError))
case Success(Some(_)) =>
Task.succeed(CopyQueueEvent(sourceKey, targetKey))
case Failure(e: AmazonS3Exception) =>
Task.succeed(
StorageQueueEvent.ErrorQueueEvent(
Action.Copy(s"${sourceKey.key} => ${targetKey.key}"),
targetKey,
S3Exception(e.getMessage))
)
case Failure(e) => Task.fail(e)
}
private def copyObject(
bucket: Bucket, bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey targetKey: RemoteKey
) = { ): IO[S3ClientException, CopyObjectResult] = {
def handleResult
: Option[CopyObjectResult] => IO[S3ClientException, CopyObjectResult] =
maybeResult =>
IO.fromEither {
maybeResult
.toRight(HashError)
}
def handleError: Throwable => IO[S3ClientException, CopyObjectResult] =
error =>
Task.fail {
CopyError(error)
}
val request = val request =
new CopyObjectRequest( new CopyObjectRequest(
bucket.name, bucket.name,
@ -66,7 +51,39 @@ class Copier(amazonS3: AmazonS3.Client) {
bucket.name, bucket.name,
targetKey.key targetKey.key
).withMatchingETagConstraint(hash.hash) ).withMatchingETagConstraint(hash.hash)
Task(Try(amazonS3.copyObject(request))) amazonS3
.copyObject(request)
.fold(handleError, handleResult)
.flatten
} }
private def foldFailure(
sourceKey: RemoteKey,
targetKey: RemoteKey): S3ClientException => StorageQueueEvent = {
case error: SdkClientException =>
errorEvent(sourceKey, targetKey, error)
case error =>
errorEvent(sourceKey, targetKey, error)
}
private def foldSuccess(
sourceKey: RemoteKey,
targetKey: RemoteKey): CopyObjectResult => StorageQueueEvent =
result =>
Option(result) match {
case Some(_) => CopyQueueEvent(sourceKey, targetKey)
case None =>
errorEvent(sourceKey, targetKey, HashError)
}
private def errorEvent: (RemoteKey, RemoteKey, Throwable) => ErrorQueueEvent =
(sourceKey, targetKey, error) =>
ErrorQueueEvent(action(sourceKey, targetKey), targetKey, error)
private def action(sourceKey: RemoteKey, targetKey: RemoteKey): Action =
Action.Copy(s"${sourceKey.key} => ${targetKey.key}")
} }
object Copier extends Copier

View file

@ -1,26 +1,31 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.DeleteObjectRequest import com.amazonaws.services.s3.model.DeleteObjectRequest
import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.{
Action,
DeleteQueueEvent,
ErrorQueueEvent
}
import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent} import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent}
import zio.Task import zio.{Task, UIO}
class Deleter(amazonS3: AmazonS3.Client) { trait Deleter {
def delete( def delete(amazonS3: AmazonS3.Client)(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
): Task[StorageQueueEvent] = ): UIO[StorageQueueEvent] =
for { deleteObject(amazonS3)(bucket, remoteKey)
_ <- deleteObject(bucket, remoteKey) .map(_ => DeleteQueueEvent(remoteKey))
} yield DeleteQueueEvent(remoteKey) .catchAll(e =>
UIO(ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, e)))
private def deleteObject( private def deleteObject(amazonS3: AmazonS3.Client)(
bucket: Bucket, bucket: Bucket,
remoteKey: RemoteKey remoteKey: RemoteKey
) = { ): Task[Unit] =
val request = new DeleteObjectRequest(bucket.name, remoteKey.key) amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key))
Task(amazonS3.deleteObject(request))
}
} }
object Deleter extends Deleter

View file

@ -1,8 +1,11 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import com.amazonaws.services.s3.model.{
ListObjectsV2Request,
ListObjectsV2Result,
S3ObjectSummary
}
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain
import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData} import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData}
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
@ -10,64 +13,64 @@ import zio.{Task, TaskR}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
class Lister(amazonS3: AmazonS3.Client) { trait Lister {
private type Token = String private type Token = String
private type Batch = (Stream[S3ObjectSummary], Option[Token]) case class Batch(summaries: Stream[S3ObjectSummary], more: Option[Token])
def listObjects( def listObjects(amazonS3: AmazonS3.Client)(
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey prefix: RemoteKey
): TaskR[MyConsole, S3ObjectsData] = { ): TaskR[Console, S3ObjectsData] = {
val requestMore = (token: Token) => def request =
new ListObjectsV2Request() new ListObjectsV2Request()
.withBucketName(bucket.name) .withBucketName(bucket.name)
.withPrefix(prefix.key) .withPrefix(prefix.key)
.withContinuationToken(token)
def fetchBatch: ListObjectsV2Request => TaskR[MyConsole, Batch] = def requestMore: Token => ListObjectsV2Request =
token => request.withContinuationToken(token)
def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] =
request => request =>
for { for {
_ <- ListerLogger.logFetchBatch _ <- ListerLogger.logFetchBatch
batch <- tryFetchBatch(request) batch <- tryFetchBatch(amazonS3)(request)
} yield batch } yield batch
def fetchMore( def fetchMore: Option[Token] => TaskR[Console, Stream[S3ObjectSummary]] = {
more: Option[Token] case None => TaskR.succeed(Stream.empty)
): TaskR[MyConsole, Stream[S3ObjectSummary]] = { case Some(token) => fetch(requestMore(token))
more match {
case None => TaskR.succeed(Stream.empty)
case Some(token) => fetch(requestMore(token))
}
} }
def fetch def fetch: ListObjectsV2Request => TaskR[Console, Stream[S3ObjectSummary]] =
: ListObjectsV2Request => TaskR[MyConsole, Stream[S3ObjectSummary]] = request =>
request => {
for { for {
batch <- fetchBatch(request) batch <- fetchBatch(request)
(summaries, more) = batch more <- fetchMore(batch.more)
rest <- fetchMore(more) } yield batch.summaries ++ more
} yield summaries ++ rest
}
for { fetch(request)
summaries <- fetch( .map(summaries => {
new ListObjectsV2Request() S3ObjectsData(byHash(summaries), byKey(summaries))
.withBucketName(bucket.name) })
.withPrefix(prefix.key))
} yield domain.S3ObjectsData(byHash(summaries), byKey(summaries))
} }
private def tryFetchBatch( private def tryFetchBatch(
request: ListObjectsV2Request amazonS3: AmazonS3.Client): ListObjectsV2Request => Task[Batch] =
): Task[(Stream[S3ObjectSummary], Option[Token])] = request =>
Task(amazonS3.listObjectsV2(request)) amazonS3
.map { result => .listObjectsV2(request)
val more: Option[Token] = .map(result => Batch(objectSummaries(result), moreToken(result)))
if (result.isTruncated) Some(result.getNextContinuationToken)
else None private def objectSummaries(
(result.getObjectSummaries.asScala.toStream, more) result: ListObjectsV2Result): Stream[S3ObjectSummary] =
} result.getObjectSummaries.asScala.toStream
private def moreToken(result: ListObjectsV2Result): Option[String] =
if (result.isTruncated) Some(result.getNextContinuationToken)
else None
} }
object Lister extends Lister

View file

@ -4,7 +4,7 @@ import net.kemitix.thorp.console._
import zio.TaskR import zio.TaskR
trait ListerLogger { trait ListerLogger {
def logFetchBatch: TaskR[MyConsole, Unit] = def logFetchBatch: TaskR[Console, Unit] =
putStrLn("Fetching remote summaries...") putStrLn("Fetching remote summaries...")
} }
object ListerLogger extends ListerLogger object ListerLogger extends ListerLogger

View file

@ -1,9 +1,13 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
sealed trait S3ClientException extends Exception sealed trait S3ClientException extends Throwable
object S3ClientException { object S3ClientException {
case object HashMatchError extends S3ClientException { case object HashError extends S3ClientException {
override def getMessage: String =
"The hash of the object to be overwritten did not match the the expected value"
}
case class CopyError(error: Throwable) extends S3ClientException {
override def getMessage: String = override def getMessage: String =
"The hash of the object to be overwritten did not match the the expected value" "The hash of the object to be overwritten did not match the the expected value"
} }

View file

@ -0,0 +1,54 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.storage.api.Storage.Service
import zio.{TaskR, UIO}
object S3Storage {
trait Live extends Storage {
val storage: Service = new Service {
private val client: AmazonS3.Client =
AmazonS3.ClientImpl(AmazonS3ClientBuilder.defaultClient)
private val transferManager: AmazonTransferManager =
AmazonTransferManager(TransferManagerBuilder.defaultTransferManager)
override def listObjects(
bucket: Bucket,
prefix: RemoteKey): TaskR[Console, S3ObjectsData] =
Lister.listObjects(client)(bucket, prefix)
override def upload(localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int): UIO[StorageQueueEvent] =
Uploader.upload(transferManager)(localFile,
bucket,
batchMode,
uploadEventListener,
1)
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): UIO[StorageQueueEvent] =
Copier.copy(client)(bucket, sourceKey, hash, targetKey)
override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageQueueEvent] =
Deleter.delete(client)(bucket, remoteKey)
override def shutdown: UIO[StorageQueueEvent] = {
transferManager.shutdownNow(true)
client.shutdown().map(_ => ShutdownQueueEvent())
}
}
}
object Live extends Live
}

View file

@ -1,55 +0,0 @@
package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService
import zio.{Task, TaskR}
class S3StorageService(
amazonS3Client: => AmazonS3.Client,
amazonTransferManager: => AmazonTransferManager
) extends StorageService {
lazy val objectLister = new Lister(amazonS3Client)
lazy val copier = new Copier(amazonS3Client)
lazy val uploader = new Uploader(amazonTransferManager)
lazy val deleter = new Deleter(amazonS3Client)
override def listObjects(
bucket: Bucket,
prefix: RemoteKey
): TaskR[MyConsole, S3ObjectsData] =
objectLister.listObjects(bucket, prefix)
override def copy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
): Task[StorageQueueEvent] =
copier.copy(bucket, sourceKey, hash, targetKey)
override def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int
): Task[StorageQueueEvent] =
uploader.upload(localFile, bucket, batchMode, uploadEventListener, 1)
override def delete(
bucket: Bucket,
remoteKey: RemoteKey
): Task[StorageQueueEvent] =
deleter.delete(bucket, remoteKey)
override def shutdown: Task[StorageQueueEvent] =
Task {
amazonTransferManager.shutdownNow(true)
amazonS3Client.shutdown()
ShutdownQueueEvent()
}
}

View file

@ -1,24 +0,0 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import net.kemitix.thorp.storage.api.StorageService
object S3StorageServiceBuilder {
def createService(
amazonS3Client: AmazonS3.Client,
amazonTransferManager: AmazonTransferManager
): StorageService =
new S3StorageService(
amazonS3Client,
amazonTransferManager
)
lazy val defaultStorageService: StorageService =
createService(
AmazonS3.ClientImpl(AmazonS3ClientBuilder.defaultClient),
AmazonTransferManager(TransferManagerBuilder.defaultTransferManager)
)
}

View file

@ -13,39 +13,41 @@ import net.kemitix.thorp.domain.UploadEvent.{
TransferEvent TransferEvent
} }
import net.kemitix.thorp.domain.{StorageQueueEvent, _} import net.kemitix.thorp.domain.{StorageQueueEvent, _}
import zio.Task import zio.{Task, UIO}
class Uploader(transferManager: => AmazonTransferManager) { trait Uploader {
def upload( def upload(transferManager: => AmazonTransferManager)(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean, batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int tryCount: Int
): Task[StorageQueueEvent] = ): UIO[StorageQueueEvent] =
for { transfer(transferManager)(localFile, bucket, batchMode, uploadEventListener)
upload <- transfer(localFile, bucket, batchMode, uploadEventListener) .catchAll(handleError(localFile.remoteKey))
} yield upload
private def transfer( private def handleError(
remoteKey: RemoteKey): Throwable => UIO[ErrorQueueEvent] = { e =>
UIO.succeed(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e))
}
private def transfer(transferManager: => AmazonTransferManager)(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean, batchMode: Boolean,
uploadEventListener: UploadEventListener uploadEventListener: UploadEventListener
): Task[StorageQueueEvent] = { ): Task[StorageQueueEvent] = {
val listener: ProgressListener = progressListener(uploadEventListener)
val putObjectRequest = request(localFile, bucket, batchMode, listener) val listener = progressListener(uploadEventListener)
Task(transferManager.upload(putObjectRequest)) val putObjectRequest = request(localFile, bucket, batchMode, listener)
transferManager
.upload(putObjectRequest)
.map(_.waitForUploadResult) .map(_.waitForUploadResult)
.map(upload => .map(uploadResult =>
UploadQueueEvent(RemoteKey(upload.getKey), MD5Hash(upload.getETag))) UploadQueueEvent(RemoteKey(uploadResult.getKey),
.catchAll( MD5Hash(uploadResult.getETag)))
e =>
Task.succeed(
ErrorQueueEvent(Action.Upload(localFile.remoteKey.key),
localFile.remoteKey,
e)))
} }
private def request( private def request(
@ -54,36 +56,45 @@ class Uploader(transferManager: => AmazonTransferManager) {
batchMode: Boolean, batchMode: Boolean,
listener: ProgressListener listener: ProgressListener
): PutObjectRequest = { ): PutObjectRequest = {
val metadata = new ObjectMetadata()
localFile.md5base64.foreach(metadata.setContentMD5)
val request = val request =
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withMetadata(metadata) .withMetadata(metadata(localFile))
if (batchMode) request if (batchMode) request
else request.withGeneralProgressListener(listener) else request.withGeneralProgressListener(listener)
} }
private def progressListener(uploadEventListener: UploadEventListener) = private def metadata: LocalFile => ObjectMetadata = localFile => {
new ProgressListener { val metadata = new ObjectMetadata()
override def progressChanged(progressEvent: ProgressEvent): Unit = localFile.md5base64.foreach(metadata.setContentMD5)
uploadEventListener.listener(eventHandler(progressEvent)) metadata
}
private def eventHandler(progressEvent: ProgressEvent) = { private def progressListener: UploadEventListener => ProgressListener =
progressEvent match { uploadEventListener =>
case e: ProgressEvent if isTransfer(e) => new ProgressListener {
TransferEvent(e.getEventType.name) override def progressChanged(progressEvent: ProgressEvent): Unit =
case e: ProgressEvent if isByteTransfer(e) => uploadEventListener.listener(eventHandler(progressEvent))
ByteTransferEvent(e.getEventType.name)
case e: ProgressEvent => private def eventHandler: ProgressEvent => UploadEvent =
RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) progressEvent => {
} def isTransfer: ProgressEvent => Boolean =
} _.getEventType.isTransferEvent
def isByteTransfer: ProgressEvent => Boolean =
_.getEventType.equals(
ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT)
progressEvent match {
case e: ProgressEvent if isTransfer(e) =>
TransferEvent(e.getEventType.name)
case e: ProgressEvent if isByteTransfer(e) =>
ByteTransferEvent(e.getEventType.name)
case e: ProgressEvent =>
RequestEvent(e.getEventType.name,
e.getBytes,
e.getBytesTransferred)
}
}
} }
private def isTransfer(e: ProgressEvent) =
e.getEventType.isTransferEvent
private def isByteTransfer(e: ProgressEvent) =
e.getEventType equals ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT
} }
object Uploader extends Uploader

View file

@ -1,6 +1,11 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import zio.{TaskR, UIO}
trait AmazonS3ClientTestFixture extends MockFactory { trait AmazonS3ClientTestFixture extends MockFactory {
@ -11,8 +16,43 @@ trait AmazonS3ClientTestFixture extends MockFactory {
amazonS3Client: AmazonS3.Client, amazonS3Client: AmazonS3.Client,
amazonS3TransferManager: AmazonTransferManager, amazonS3TransferManager: AmazonTransferManager,
) { ) {
lazy val storageService: S3StorageService = lazy val storageService: Storage.Service =
new S3StorageService(amazonS3Client, amazonS3TransferManager) new Storage.Service {
private val client = amazonS3Client
private val transferManager = amazonS3TransferManager
override def listObjects(
bucket: Bucket,
prefix: RemoteKey): TaskR[Console, S3ObjectsData] =
Lister.listObjects(client)(bucket, prefix)
override def upload(localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int): UIO[StorageQueueEvent] =
Uploader.upload(transferManager)(localFile,
bucket,
batchMode,
uploadEventListener,
1)
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): UIO[StorageQueueEvent] =
Copier.copy(client)(bucket, sourceKey, hash, targetKey)
override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageQueueEvent] =
Deleter.delete(client)(bucket, remoteKey)
override def shutdown: UIO[StorageQueueEvent] = {
transferManager.shutdownNow(true)
client.shutdown().map(_ => ShutdownQueueEvent())
}
}
} }
} }

View file

@ -1,20 +1,17 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.{AmazonS3Exception, CopyObjectResult} import com.amazonaws.services.s3.model.{AmazonS3Exception, CopyObjectResult}
import net.kemitix.thorp.console.MyConsole import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.StorageQueueEvent.{Action, ErrorQueueEvent} import net.kemitix.thorp.domain.StorageQueueEvent.{Action, ErrorQueueEvent}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.aws.S3ClientException.{ import net.kemitix.thorp.storage.aws.S3ClientException.{CopyError, HashError}
HashMatchError,
S3Exception
}
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.Runtime
import zio.internal.PlatformLive import zio.internal.PlatformLive
import zio.{Runtime, Task}
class CopierTest extends FreeSpec { class CopierTest extends FreeSpec {
private val runtime = Runtime(MyConsole.Live, PlatformLive.Default) private val runtime = Runtime(Console.Live, PlatformLive.Default)
"copier" - { "copier" - {
val bucket = Bucket("aBucket") val bucket = Bucket("aBucket")
@ -29,9 +26,9 @@ class CopierTest extends FreeSpec {
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.copyObject _) (fixture.amazonS3Client.copyObject _)
.when() .when()
.returns(_ => Some(new CopyObjectResult)) .returns(_ => Task.succeed(Some(new CopyObjectResult)))
private val result = private val result =
invoke(bucket, sourceKey, hash, targetKey, fixture.storageService) invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client)
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
@ -41,17 +38,17 @@ class CopierTest extends FreeSpec {
new AmazonS3ClientTestFixture { new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.copyObject _) (fixture.amazonS3Client.copyObject _)
.when() .when()
.returns(_ => None) .returns(_ => Task.succeed(None))
private val result = private val result =
invoke(bucket, sourceKey, hash, targetKey, fixture.storageService) invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client)
result match { result match {
case Right( case Right(
ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), ErrorQueueEvent(Action.Copy("sourceKey => targetKey"),
RemoteKey("targetKey"), RemoteKey("targetKey"),
e)) => e)) =>
e match { e match {
case HashMatchError => assert(true) case HashError => assert(true)
case _ => fail("Not a HashMatchError") case _ => fail("Not a HashError: " + e)
} }
case e => fail("Not an ErrorQueueEvent: " + e) case e => fail("Not an ErrorQueueEvent: " + e)
} }
@ -64,18 +61,18 @@ class CopierTest extends FreeSpec {
private val expectedMessage = "The specified key does not exist" private val expectedMessage = "The specified key does not exist"
(fixture.amazonS3Client.copyObject _) (fixture.amazonS3Client.copyObject _)
.when() .when()
.throws(new AmazonS3Exception(expectedMessage)) .returns(_ => Task.fail(new AmazonS3Exception(expectedMessage)))
private val result = private val result =
invoke(bucket, sourceKey, hash, targetKey, fixture.storageService) invoke(bucket, sourceKey, hash, targetKey, fixture.amazonS3Client)
result match { result match {
case Right( case Right(
ErrorQueueEvent(Action.Copy("sourceKey => targetKey"), ErrorQueueEvent(Action.Copy("sourceKey => targetKey"),
RemoteKey("targetKey"), RemoteKey("targetKey"),
e)) => e)) =>
e match { e match {
case S3Exception(message) => case CopyError(cause) =>
assert(message.startsWith(expectedMessage)) assert(cause.getMessage.startsWith(expectedMessage))
case _ => fail("Not an S3Exception") case _ => fail("Not a CopyError: " + e)
} }
case e => fail("Not an ErrorQueueEvent: " + e) case e => fail("Not an ErrorQueueEvent: " + e)
} }
@ -88,10 +85,10 @@ class CopierTest extends FreeSpec {
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey, targetKey: RemoteKey,
storageService: S3StorageService amazonS3Client: AmazonS3.Client
) = ) =
runtime.unsafeRunSync { runtime.unsafeRunSync {
storageService.copy(bucket, sourceKey, hash, targetKey) Copier.copy(amazonS3Client)(bucket, sourceKey, hash, targetKey)
}.toEither }.toEither
} }

View file

@ -0,0 +1,66 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.SdkClientException
import com.amazonaws.services.s3.model.AmazonS3Exception
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageQueueEvent.{
Action,
DeleteQueueEvent,
ErrorQueueEvent
}
import net.kemitix.thorp.domain.{Bucket, RemoteKey}
import org.scalatest.FreeSpec
import zio.internal.PlatformLive
import zio.{Runtime, Task, UIO}
class DeleterTest extends FreeSpec {
private val runtime = Runtime(Console.Live, PlatformLive.Default)
"delete" - {
val bucket = Bucket("aBucket")
val remoteKey = RemoteKey("aRemoteKey")
"when no errors" in {
val expected = Right(DeleteQueueEvent(remoteKey))
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.deleteObject _)
.when()
.returns(_ => UIO.succeed(()))
private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey)
assertResult(expected)(result)
}
}
"when Amazon Service Exception" in {
val exception = new AmazonS3Exception("message")
val expected =
Right(
ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception))
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.deleteObject _)
.when()
.returns(_ => Task.fail(exception))
private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey)
assertResult(expected)(result)
}
}
"when Amazon SDK Client Exception" in {
val exception = new SdkClientException("message")
val expected =
Right(
ErrorQueueEvent(Action.Delete(remoteKey.key), remoteKey, exception))
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.deleteObject _)
.when()
.returns(_ => Task.fail(exception))
private val result = invoke(fixture.amazonS3Client)(bucket, remoteKey)
assertResult(expected)(result)
}
}
def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket,
remoteKey: RemoteKey) =
runtime.unsafeRunSync {
Deleter.delete(amazonS3Client)(bucket, remoteKey)
}.toEither
}
}

View file

@ -0,0 +1,129 @@
package net.kemitix.thorp.storage.aws
import java.util.Date
import com.amazonaws.SdkClientException
import com.amazonaws.services.s3.model.{
AmazonS3Exception,
ListObjectsV2Result,
S3ObjectSummary
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain._
import org.scalatest.FreeSpec
import zio.internal.PlatformLive
import zio.{Runtime, Task, UIO}
class ListerTest extends FreeSpec {
private val runtime = Runtime(Console.Live, PlatformLive.Default)
"list" - {
val bucket = Bucket("aBucket")
val prefix = RemoteKey("aRemoteKey")
"when no errors" - {
"when single fetch required" in {
val nowDate = new Date
val nowInstant = nowDate.toInstant
val key = "key"
val etag = "etag"
val expectedHashMap = Map(
MD5Hash(etag) -> Set(
KeyModified(RemoteKey(key), LastModified(nowInstant))))
val expectedKeyMap = Map(
RemoteKey(key) -> HashModified(MD5Hash(etag),
LastModified(nowInstant))
)
val expected = Right(S3ObjectsData(expectedHashMap, expectedKeyMap))
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.listObjectsV2 _)
.when()
.returns(_ => {
UIO.succeed(objectResults(nowDate, key, etag, false))
})
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
assertResult(expected)(result)
}
}
"when second fetch required" in {
val nowDate = new Date
val nowInstant = nowDate.toInstant
val key1 = "key1"
val etag1 = "etag1"
val key2 = "key2"
val etag2 = "etag2"
val expectedHashMap = Map(
MD5Hash(etag1) -> Set(
KeyModified(RemoteKey(key1), LastModified(nowInstant))),
MD5Hash(etag2) -> Set(
KeyModified(RemoteKey(key2), LastModified(nowInstant)))
)
val expectedKeyMap = Map(
RemoteKey(key1) -> HashModified(MD5Hash(etag1),
LastModified(nowInstant)),
RemoteKey(key2) -> HashModified(MD5Hash(etag2),
LastModified(nowInstant))
)
val expected = Right(S3ObjectsData(expectedHashMap, expectedKeyMap))
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.listObjectsV2 _)
.when()
.returns(_ => UIO(objectResults(nowDate, key1, etag1, true)))
.noMoreThanOnce()
(fixture.amazonS3Client.listObjectsV2 _)
.when()
.returns(_ => UIO(objectResults(nowDate, key2, etag2, false)))
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
assertResult(expected)(result)
}
}
def objectSummary(key: String, etag: String, lastModified: Date) = {
val objectSummary = new S3ObjectSummary
objectSummary.setKey(key)
objectSummary.setETag(etag)
objectSummary.setLastModified(lastModified)
objectSummary
}
def objectResults(nowDate: Date,
key: String,
etag: String,
truncated: Boolean) = {
val result = new ListObjectsV2Result
result.getObjectSummaries.add(objectSummary(key, etag, nowDate))
result.setTruncated(truncated)
result
}
}
"when Amazon Service Exception" in {
val exception = new AmazonS3Exception("message")
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.listObjectsV2 _)
.when()
.returns(_ => Task.fail(exception))
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
assert(result.isLeft)
}
}
"when Amazon SDK Client Exception" in {
val exception = new SdkClientException("message")
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.listObjectsV2 _)
.when()
.returns(_ => Task.fail(exception))
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
assert(result.isLeft)
}
}
def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket,
prefix: RemoteKey) =
runtime.unsafeRunSync {
Lister.listObjects(amazonS3Client)(bucket, prefix)
}.toEither
}
}

View file

@ -1,74 +0,0 @@
package net.kemitix.thorp.storage.aws
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Date
import com.amazonaws.services.s3.model.{ListObjectsV2Result, S3ObjectSummary}
import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory
import org.scalatest.FreeSpec
import zio.Runtime
import zio.internal.PlatformLive
class S3StorageServiceSuite extends FreeSpec with MockFactory {
private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
"listObjects" - {
def objectSummary(hash: MD5Hash,
remoteKey: RemoteKey,
lastModified: LastModified) = {
val summary = new S3ObjectSummary()
summary.setETag(hash.hash)
summary.setKey(remoteKey.key)
summary.setLastModified(Date.from(lastModified.when))
summary
}
val source = Resource(this, "upload")
val sourcePath = source.toPath
val prefix = RemoteKey("prefix")
implicit val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))
val h1 = MD5Hash("hash1")
val k1a = RemoteKey("key1a")
val o1a = objectSummary(h1, k1a, lm)
val k1b = RemoteKey("key1b")
val o1b = objectSummary(h1, k1b, lm)
val h2 = MD5Hash("hash2")
val k2 = RemoteKey("key2")
val o2 = objectSummary(h2, k2, lm)
val myFakeResponse = new ListObjectsV2Result()
val summaries = myFakeResponse.getObjectSummaries
summaries.add(o1a)
summaries.add(o1b)
summaries.add(o2)
"should build list of hash lookups, with duplicate objects grouped by hash" in {
val expected = Right(
S3ObjectsData(
byHash = Map(h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)),
h2 -> Set(KeyModified(k2, lm))),
byKey = Map(k1a -> HashModified(h1, lm),
k1b -> HashModified(h1, lm),
k2 -> HashModified(h2, lm))
))
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.listObjectsV2 _)
.when()
.returns(_ => myFakeResponse)
private val result = invoke(fixture.storageService)
assertResult(expected)(result)
}
}
def invoke(storageService: S3StorageService) =
runtime.unsafeRunSync {
storageService
.listObjects(Bucket("bucket"), RemoteKey("prefix"))
}.toEither
}
}

View file

@ -2,12 +2,8 @@ package net.kemitix.thorp.storage.aws
import java.time.Instant import java.time.Instant
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.model.UploadResult
import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher} import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher}
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.MD5HashData.Root
import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec import org.scalatest.FunSpec
@ -24,17 +20,18 @@ class StorageServiceSuite extends FunSpec with MockFactory {
KeyGenerator.generateKey(config.sources, config.prefix) _ KeyGenerator.generateKey(config.sources, config.prefix) _
describe("getS3Status") { describe("getS3Status") {
val hash = MD5Hash("hash") val hash = MD5Hash("hash")
val localFile = val localFile =
LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey) LocalFile.resolve("the-file", Map(MD5 -> hash), sourcePath, fileToKey)
val key = localFile.remoteKey val key = localFile.remoteKey
val keyOtherKey = LocalFile.resolve("other-key-same-hash", val keyOtherKey = LocalFile.resolve("other-key-same-hash",
md5HashMap(hash), Map(MD5 -> hash),
sourcePath, sourcePath,
fileToKey) fileToKey)
val diffHash = MD5Hash("diff") val diffHash = MD5Hash("diff")
val keyDiffHash = LocalFile.resolve("other-key-diff-hash", val keyDiffHash = LocalFile.resolve("other-key-diff-hash",
md5HashMap(diffHash), Map(MD5 -> diffHash),
sourcePath, sourcePath,
fileToKey) fileToKey)
val lastModified = LastModified(Instant.now) val lastModified = LastModified(Instant.now)
@ -85,7 +82,7 @@ class StorageServiceSuite extends FunSpec with MockFactory {
describe("when remote key does not exist and no others matches hash") { describe("when remote key does not exist and no others matches hash") {
val localFile = LocalFile.resolve("missing-file", val localFile = LocalFile.resolve("missing-file",
md5HashMap(MD5Hash("unique")), Map(MD5 -> MD5Hash("unique")),
sourcePath, sourcePath,
fileToKey) fileToKey)
it("should return no matches by key") { it("should return no matches by key") {
@ -114,51 +111,4 @@ class StorageServiceSuite extends FunSpec with MockFactory {
} }
private def md5HashMap(hash: MD5Hash): Map[HashType, MD5Hash] =
Map(MD5 -> hash)
val batchMode: Boolean = true
describe("upload") {
describe("when uploading a file") {
val amazonS3Client = stub[AmazonS3.Client]
val amazonTransferManager = stub[AmazonTransferManager]
val storageService =
new S3StorageService(amazonS3Client, amazonTransferManager)
val prefix = RemoteKey("prefix")
val localFile =
LocalFile.resolve("root-file",
md5HashMap(Root.hash),
sourcePath,
KeyGenerator.generateKey(config.sources, prefix))
val bucket = Bucket("a-bucket")
val remoteKey = RemoteKey("prefix/root-file")
val uploadEventListener =
UploadEventListener(localFile, 1, SyncTotals(), 0L)
val upload = stub[AmazonUpload]
(amazonTransferManager upload (_: PutObjectRequest))
.when(*)
.returns(upload)
val uploadResult = stub[UploadResult]
(upload.waitForUploadResult _).when().returns(uploadResult)
(uploadResult.getETag _).when().returns(Root.hash.hash)
(uploadResult.getKey _).when().returns(remoteKey.key)
it("should return hash of uploaded file") {
pending
//FIXME: works okay on its own, but fails when run with others
val expected = UploadQueueEvent(remoteKey, Root.hash)
val result =
storageService.upload(localFile,
bucket,
batchMode,
uploadEventListener,
1)
assertResult(expected)(result)
}
}
}
} }

View file

@ -0,0 +1,120 @@
package net.kemitix.thorp.storage.aws
import java.io.File
import com.amazonaws.SdkClientException
import com.amazonaws.services.s3.model.AmazonS3Exception
import com.amazonaws.services.s3.transfer.model.UploadResult
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.StorageQueueEvent.{
Action,
ErrorQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory
import org.scalatest.FreeSpec
import zio.internal.PlatformLive
import zio.{Runtime, Task}
class UploaderTest extends FreeSpec with MockFactory {
private val runtime = Runtime(Console.Live, PlatformLive.Default)
"upload" - {
val aSource: File = Resource(this, "")
val aFile: File = Resource(this, "small-file")
val aHash = MD5Hash("aHash")
val hashes = Map[HashType, MD5Hash](MD5 -> aHash)
val remoteKey = RemoteKey("aRemoteKey")
val localFile = LocalFile(aFile, aSource, hashes, remoteKey)
val bucket = Bucket("aBucket")
val batchMode = false
val tryCount = 1
val uploadResult = new UploadResult
uploadResult.setKey(remoteKey.key)
uploadResult.setETag(aHash.hash)
val inProgress = new AmazonUpload.InProgress {
override def waitForUploadResult: UploadResult = uploadResult
}
val uploadEventListener =
UploadEventListener(localFile, 0, SyncTotals(1, 0, 0), 0)
"when no error" in {
val expected =
Right(UploadQueueEvent(remoteKey, aHash))
new AmazonS3ClientTestFixture {
(fixture.amazonS3TransferManager.upload _)
.when()
.returns(_ => Task.succeed(inProgress))
private val result =
invoke(fixture.amazonS3TransferManager)(
localFile,
bucket,
batchMode,
uploadEventListener,
tryCount
)
assertResult(expected)(result)
}
}
"when Amazon Service Exception" in {
val exception = new AmazonS3Exception("message")
val expected =
Right(
ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception))
new AmazonS3ClientTestFixture {
(fixture.amazonS3TransferManager.upload _)
.when()
.returns(_ => Task.fail(exception))
private val result =
invoke(fixture.amazonS3TransferManager)(
localFile,
bucket,
batchMode,
uploadEventListener,
tryCount
)
assertResult(expected)(result)
}
}
"when Amazon SDK Client Exception" in {
val exception = new SdkClientException("message")
val expected =
Right(
ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, exception))
new AmazonS3ClientTestFixture {
(fixture.amazonS3TransferManager.upload _)
.when()
.returns(_ => Task.fail(exception))
private val result =
invoke(fixture.amazonS3TransferManager)(
localFile,
bucket,
batchMode,
uploadEventListener,
tryCount
)
assertResult(expected)(result)
}
}
def invoke(transferManager: AmazonTransferManager)(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int
) =
runtime.unsafeRunSync {
Uploader.upload(transferManager)(
localFile,
bucket,
batchMode,
uploadEventListener,
tryCount
)
}.toEither
}
}