Improve logging consistency (#119)

* [console] create new effect MyConsole

* [core] SyncLogging use ConsoleOut.ValidConfig

* [storage-aws] Extract AmazonS3 trait to help testing

* [console] MyConsole use UIO

* [storage-aws] S3StorageServiceSuite convert to FreeSpec

* [storage-aws] S3StorageServiceSuite extract S3ClientTest trait

* [storage-aws] remove incomplete test UploaderSuite

* [storage-aws] extract and rename AmazonS3ClientTestFixture

* [storage-aws] Copier handle and log errors

* [core] ThropArchive log completed uploads here

* [core] ThorpArchive log copies and deletes

* Improve consistency of logging

* [storage-aws] CopierTest extract from S3StorageServiceSuite

* [storage-aws] Copier handle hash match errors properly

* [core] ThropArchive display erros in red

* [core] SequencePlan extracted

* [core] Clear line after deletion log

* [changelog] updated

* [cli] Program tidy up imports

* [storage-aws] Copier replace null with Option

* [storage-aws] AmazonS3 copyObject returns Some[CopyObjectResult]
This commit is contained in:
Paul Campbell 2019-07-23 23:13:09 +01:00 committed by GitHub
parent 8cca46340c
commit f277b5e789
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 569 additions and 278 deletions

View file

@ -8,6 +8,11 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
* [0.8.0] - 2019-??-?? * [0.8.0] - 2019-??-??
** Added
- Log copy and delete operations (#119)
- Log errors with red label (#119)
** Changed ** Changed
- Replace cats-effect with zio (#117) - Replace cats-effect with zio (#117)

View file

@ -63,7 +63,7 @@ val zioDependencies = Seq(
) )
) )
// cli -> thorp-lib -> storage-aws -> core -> storage-api -> domain // cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> domain
lazy val thorp = (project in file(".")) lazy val thorp = (project in file("."))
.settings(commonSettings) .settings(commonSettings)
@ -112,6 +112,12 @@ lazy val `storage-api` = (project in file("storage-api"))
.settings(commonSettings) .settings(commonSettings)
.settings(zioDependencies) .settings(zioDependencies)
.settings(assemblyJarName in assembly := "storage-api.jar") .settings(assemblyJarName in assembly := "storage-api.jar")
.dependsOn(console)
lazy val console = (project in file("console"))
.settings(commonSettings)
.settings(zioDependencies)
.settings(assemblyJarName in assembly := "console.jar")
.dependsOn(domain) .dependsOn(domain)
lazy val domain = (project in file("domain")) lazy val domain = (project in file("domain"))

View file

@ -1,14 +1,22 @@
package net.kemitix.thorp.cli package net.kemitix.thorp.cli
import zio.{App, ZIO} import net.kemitix.thorp.console.MyConsole
import zio.internal.PlatformLive
import zio.{App, Runtime, UIO, ZIO}
object Main extends App { object Main extends App {
override def run(args: List[String]): ZIO[Environment, Nothing, Int] = { private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
override def run(args: List[String]): ZIO[Environment, Nothing, Int] =
runtime.unsafeRun {
appLogic(args).fold(_ => UIO(1), _ => UIO(0))
}
private def appLogic(args: List[String]): ZIO[MyConsole, Throwable, Unit] =
for { for {
cliOptions <- ParseArgs(args) cliOptions <- ParseArgs(args)
_ <- Program.run(cliOptions) _ <- Program.run(cliOptions)
} yield () } yield ()
}.fold(failure => 1, success => 0)
} }

View file

@ -1,17 +1,17 @@
package net.kemitix.thorp.cli package net.kemitix.thorp.cli
import net.kemitix.thorp.console._
import net.kemitix.thorp.core._ import net.kemitix.thorp.core._
import net.kemitix.thorp.domain.{StorageQueueEvent, SyncTotals} import net.kemitix.thorp.domain.{StorageQueueEvent, SyncTotals}
import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService
import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService
import zio.console._
import zio.{Task, TaskR, ZIO} import zio.{Task, TaskR, ZIO}
trait Program extends PlanBuilder { trait Program extends PlanBuilder {
lazy val version = s"Thorp v${thorp.BuildInfo.version}" lazy val version = s"Thorp v${thorp.BuildInfo.version}"
def run(cliOptions: ConfigOptions): ZIO[Console, Nothing, Unit] = { def run(cliOptions: ConfigOptions): ZIO[MyConsole, Nothing, Unit] = {
val showVersion = ConfigQuery.showVersion(cliOptions) val showVersion = ConfigQuery.showVersion(cliOptions)
for { for {
_ <- ZIO.when(showVersion)(putStrLn(version)) _ <- ZIO.when(showVersion)(putStrLn(version))
@ -20,7 +20,7 @@ trait Program extends PlanBuilder {
} }
private def execute( private def execute(
cliOptions: ConfigOptions): ZIO[Console, Throwable, Unit] = { cliOptions: ConfigOptions): ZIO[MyConsole, Throwable, Unit] = {
for { for {
plan <- createPlan(defaultStorageService, defaultHashService, cliOptions) plan <- createPlan(defaultStorageService, defaultHashService, cliOptions)
archive <- thorpArchive(cliOptions, plan.syncTotals) archive <- thorpArchive(cliOptions, plan.syncTotals)
@ -30,7 +30,8 @@ trait Program extends PlanBuilder {
} yield () } yield ()
} }
private def handleErrors(throwable: Throwable): ZIO[Console, Nothing, Unit] = private def handleErrors(
throwable: Throwable): ZIO[MyConsole, Nothing, Unit] =
for { for {
_ <- putStrLn("There were errors:") _ <- putStrLn("There were errors:")
_ <- throwable match { _ <- throwable match {
@ -54,21 +55,20 @@ trait Program extends PlanBuilder {
private def handleActions( private def handleActions(
archive: ThorpArchive, archive: ThorpArchive,
syncPlan: SyncPlan syncPlan: SyncPlan
): TaskR[Console, Stream[StorageQueueEvent]] = { ): TaskR[MyConsole, Stream[StorageQueueEvent]] = {
type Accumulator = (Stream[StorageQueueEvent], Long) type Accumulator = (Stream[StorageQueueEvent], Long)
val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes) val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes)
TaskR TaskR
.foldLeft(syncPlan.actions.reverse.zipWithIndex)(zero)( .foldLeft(syncPlan.actions.zipWithIndex)(zero)((acc, indexedAction) => {
(acc, indexedAction) => { val (action, index) = indexedAction
val (action, index) = indexedAction val (stream, bytesToDo) = acc
val (stream, bytesToDo) = acc val remainingBytes = bytesToDo - action.size
val remainingBytes = bytesToDo - action.size (for {
(for { event <- archive.update(index, action, remainingBytes)
event <- archive.update(index, action, remainingBytes) events = stream ++ Stream(event)
events = stream ++ Stream(event) } yield events)
} yield events) .map((_, remainingBytes))
.map((_, remainingBytes)) })
})
.map { .map {
case (events, _) => events case (events, _) => events
} }

View file

@ -3,18 +3,19 @@ package net.kemitix.thorp.cli
import java.io.File import java.io.File
import java.nio.file.Path import java.nio.file.Path
import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.core._ import net.kemitix.thorp.core._
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.console.Console import zio.internal.PlatformLive
import zio.{DefaultRuntime, Task, TaskR} import zio.{Runtime, Task, TaskR}
class ProgramTest extends FunSpec { class ProgramTest extends FunSpec {
private val runtime = new DefaultRuntime {} private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
val source: File = Resource(this, ".") val source: File = Resource(this, ".")
val sourcePath: Path = source.toPath val sourcePath: Path = source.toPath
@ -39,7 +40,7 @@ class ProgramTest extends FunSpec {
it("should be handled in correct order") { it("should be handled in correct order") {
val expected = List(copyAction, uploadAction, deleteAction) val expected = List(copyAction, uploadAction, deleteAction)
invoke(configOptions) invoke(configOptions)
val result = archive.actions val result = archive.actions.reverse
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
@ -65,7 +66,7 @@ class ProgramTest extends FunSpec {
index: Int, index: Int,
action: Action, action: Action,
totalBytesSoFar: Long totalBytesSoFar: Long
): TaskR[Console, StorageQueueEvent] = { ): TaskR[MyConsole, StorageQueueEvent] = {
actions = action :: actions actions = action :: actions
TaskR(DoNothingQueueEvent(RemoteKey(""))) TaskR(DoNothingQueueEvent(RemoteKey("")))
} }

View file

@ -0,0 +1,21 @@
package net.kemitix.thorp.console
import net.kemitix.thorp.domain.{Bucket, RemoteKey, Sources}
sealed trait ConsoleOut {
def en: String
}
object ConsoleOut {
case class ValidConfig(
bucket: Bucket,
prefix: RemoteKey,
sources: Sources
) extends ConsoleOut {
private val sourcesList = sources.paths.mkString(", ")
override def en: String =
List(s"Bucket: ${bucket.name}",
s"Prefix: ${prefix.key}",
s"Source: $sourcesList")
.mkString(", ")
}
}

View file

@ -0,0 +1,31 @@
package net.kemitix.thorp.console
import java.io.PrintStream
import zio.{UIO, ZIO}
trait MyConsole {
val console: MyConsole.Service
}
object MyConsole {
trait Service {
def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit]
def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit]
}
trait Live extends MyConsole {
val console: Service = new Service {
override def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] =
putStrLn(line)
override def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] =
putStrLn(Console.out)(line)
final def putStrLn(stream: PrintStream)(
line: String): ZIO[MyConsole, Nothing, Unit] =
UIO(Console.withOut(stream)(Console.println(line)))
}
}
object Live extends Live
}

View file

@ -0,0 +1,16 @@
package net.kemitix.thorp
import zio.ZIO
package object console extends MyConsole.Service {
final val consoleService: ZIO[MyConsole, Nothing, MyConsole.Service] =
ZIO.access(_.console)
final def putStrLn(line: String): ZIO[MyConsole, Nothing, Unit] =
ZIO.accessM(_.console putStrLn line)
override def putStrLn(line: ConsoleOut): ZIO[MyConsole, Nothing, Unit] =
putStrLn(line.en)
}

View file

@ -1,9 +1,9 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.core.Action.DoNothing import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action._
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, StorageService}
import zio.console._
import zio.{Task, TaskR} import zio.{Task, TaskR}
trait PlanBuilder { trait PlanBuilder {
@ -12,7 +12,7 @@ trait PlanBuilder {
storageService: StorageService, storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: ConfigOptions configOptions: ConfigOptions
): TaskR[Console, SyncPlan] = ): TaskR[MyConsole, SyncPlan] =
ConfigurationBuilder ConfigurationBuilder
.buildConfig(configOptions) .buildConfig(configOptions)
.catchAll(errors => TaskR.fail(ConfigValidationException(errors))) .catchAll(errors => TaskR.fail(ConfigValidationException(errors)))
@ -21,7 +21,7 @@ trait PlanBuilder {
def useValidConfig( def useValidConfig(
storageService: StorageService, storageService: StorageService,
hashService: HashService hashService: HashService
)(implicit c: Config): TaskR[Console, SyncPlan] = { )(implicit c: Config): TaskR[MyConsole, SyncPlan] = {
for { for {
_ <- SyncLogging.logRunStart(c.bucket, c.prefix, c.sources) _ <- SyncLogging.logRunStart(c.bucket, c.prefix, c.sources)
actions <- buildPlan(storageService, hashService) actions <- buildPlan(storageService, hashService)
@ -31,7 +31,7 @@ trait PlanBuilder {
private def buildPlan( private def buildPlan(
storageService: StorageService, storageService: StorageService,
hashService: HashService hashService: HashService
)(implicit c: Config): TaskR[Console, SyncPlan] = )(implicit c: Config): TaskR[MyConsole, SyncPlan] =
for { for {
metadata <- gatherMetadata(storageService, hashService) metadata <- gatherMetadata(storageService, hashService)
} yield assemblePlan(c)(metadata) } yield assemblePlan(c)(metadata)
@ -40,7 +40,9 @@ trait PlanBuilder {
implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = { implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = {
case (remoteData, localData) => case (remoteData, localData) =>
SyncPlan( SyncPlan(
actions = createActions(remoteData, localData).filter(doesSomething), actions = createActions(remoteData, localData)
.filter(doesSomething)
.sortBy(SequencePlan.order),
syncTotals = SyncTotals(count = localData.count, syncTotals = SyncTotals(count = localData.count,
totalSizeBytes = localData.totalSizeBytes) totalSizeBytes = localData.totalSizeBytes)
) )
@ -89,7 +91,7 @@ trait PlanBuilder {
private def gatherMetadata( private def gatherMetadata(
storageService: StorageService, storageService: StorageService,
hashService: HashService hashService: HashService
)(implicit c: Config): TaskR[Console, (S3ObjectsData, LocalFiles)] = )(implicit c: Config): TaskR[MyConsole, (S3ObjectsData, LocalFiles)] =
for { for {
remoteData <- fetchRemoteData(storageService) remoteData <- fetchRemoteData(storageService)
localData <- findLocalFiles(hashService) localData <- findLocalFiles(hashService)
@ -97,12 +99,12 @@ trait PlanBuilder {
private def fetchRemoteData( private def fetchRemoteData(
storageService: StorageService storageService: StorageService
)(implicit c: Config): TaskR[Console, S3ObjectsData] = )(implicit c: Config): TaskR[MyConsole, S3ObjectsData] =
storageService.listObjects(c.bucket, c.prefix) storageService.listObjects(c.bucket, c.prefix)
private def findLocalFiles( private def findLocalFiles(
hashService: HashService hashService: HashService
)(implicit config: Config): TaskR[Console, LocalFiles] = )(implicit config: Config): TaskR[MyConsole, LocalFiles] =
for { for {
_ <- SyncLogging.logFileScan _ <- SyncLogging.logFileScan
localFiles <- findFiles(hashService) localFiles <- findFiles(hashService)

View file

@ -0,0 +1,16 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
trait SequencePlan {
def order: Action => Int = {
case _: DoNothing => 0
case _: ToCopy => 1
case _: ToUpload => 2
case _: ToDelete => 3
}
}
object SequencePlan extends SequencePlan

View file

@ -1,5 +1,7 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.console._
//import net.kemitix.thorp.console.MyConsole._
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent, CopyQueueEvent,
DeleteQueueEvent, DeleteQueueEvent,
@ -7,8 +9,8 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{
UploadQueueEvent UploadQueueEvent
} }
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.domain.Terminal.eraseToEndOfScreen
import zio.ZIO import zio.ZIO
import zio.console._
trait SyncLogging { trait SyncLogging {
@ -16,45 +18,27 @@ trait SyncLogging {
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey, prefix: RemoteKey,
sources: Sources sources: Sources
): ZIO[Console, Nothing, Unit] = { ): ZIO[MyConsole, Nothing, Unit] =
val sourcesList = sources.paths.mkString(", ")
for { for {
_ <- putStrLn( _ <- putStrLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
List(s"Bucket: ${bucket.name}",
s"Prefix: ${prefix.key}",
s"Source: $sourcesList")
.mkString(", "))
} yield () } yield ()
}
def logFileScan(implicit c: Config): ZIO[Console, Nothing, Unit] = def logFileScan(implicit c: Config): ZIO[MyConsole, Nothing, Unit] =
putStrLn(s"Scanning local files: ${c.sources.paths.mkString(", ")}...") putStrLn(s"Scanning local files: ${c.sources.paths.mkString(", ")}...")
def logRunFinished( def logRunFinished(
actions: Stream[StorageQueueEvent] actions: Stream[StorageQueueEvent]
): ZIO[Console, Nothing, Unit] = { ): ZIO[MyConsole, Nothing, Unit] = {
val counters = actions.foldLeft(Counters())(countActivities) val counters = actions.foldLeft(Counters())(countActivities)
for { for {
_ <- putStrLn(eraseToEndOfScreen)
_ <- putStrLn(s"Uploaded ${counters.uploaded} files") _ <- putStrLn(s"Uploaded ${counters.uploaded} files")
_ <- putStrLn(s"Copied ${counters.copied} files") _ <- putStrLn(s"Copied ${counters.copied} files")
_ <- putStrLn(s"Deleted ${counters.deleted} files") _ <- putStrLn(s"Deleted ${counters.deleted} files")
_ <- putStrLn(s"Errors ${counters.errors}") _ <- putStrLn(s"Errors ${counters.errors}")
_ <- logErrors(actions)
} yield () } yield ()
} }
def logErrors(
actions: Stream[StorageQueueEvent]
): ZIO[Console, Nothing, Unit] = {
ZIO.foldLeft(actions)(()) { (_, action) =>
action match {
case ErrorQueueEvent(k, e) =>
putStrLn(s"${k.key}: ${e.getMessage}")
case _ => ZIO.unit
}
}
}
private def countActivities: (Counters, StorageQueueEvent) => Counters = private def countActivities: (Counters, StorageQueueEvent) => Counters =
(counters: Counters, s3Action: StorageQueueEvent) => { (counters: Counters, s3Action: StorageQueueEvent) => {
import Counters._ import Counters._

View file

@ -1,8 +1,19 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.domain.{LocalFile, StorageQueueEvent} import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageQueueEvent
import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent,
DeleteQueueEvent,
DoNothingQueueEvent,
ErrorQueueEvent,
ShutdownQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.domain.Terminal._
import zio.TaskR import zio.TaskR
import zio.console._
import scala.io.AnsiColor._
trait ThorpArchive { trait ThorpArchive {
@ -10,15 +21,45 @@ trait ThorpArchive {
index: Int, index: Int,
action: Action, action: Action,
totalBytesSoFar: Long totalBytesSoFar: Long
): TaskR[Console, StorageQueueEvent] ): TaskR[MyConsole, StorageQueueEvent]
def logFileUploaded( def logEvent(
localFile: LocalFile, event: StorageQueueEvent,
batchMode: Boolean batchMode: Boolean
): TaskR[Console, Unit] = ): TaskR[MyConsole, Unit] =
for { event match {
_ <- TaskR.when(batchMode)( case UploadQueueEvent(remoteKey, _) =>
putStrLn(s"Uploaded: ${localFile.remoteKey.key}")) for {
} yield () _ <- TaskR.when(batchMode)(putStrLn(s"Uploaded: ${remoteKey.key}"))
_ <- TaskR.when(!batchMode)(
putStrLn(
s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen"))
} yield ()
case CopyQueueEvent(sourceKey, targetKey) =>
for {
_ <- TaskR.when(batchMode)(
putStrLn(s"Copied: ${sourceKey.key} => ${targetKey.key}"))
_ <- TaskR.when(!batchMode)(
putStrLn(
s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen")
)
} yield ()
case DeleteQueueEvent(remoteKey) =>
for {
_ <- TaskR.when(batchMode)(putStrLn(s"Deleted: $remoteKey"))
_ <- TaskR.when(!batchMode)(
putStrLn(
s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen"))
} yield ()
case ErrorQueueEvent(action, _, e) =>
for {
_ <- TaskR.when(batchMode)(
putStrLn(s"${action.name} failed: ${action.keys}: ${e.getMessage}"))
_ <- TaskR.when(!batchMode)(putStrLn(
s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen"))
} yield ()
case DoNothingQueueEvent(_) => TaskR(())
case ShutdownQueueEvent() => TaskR(())
}
} }

View file

@ -1,10 +1,10 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.StorageService
import zio.console.Console
import zio.{Task, TaskR} import zio.{Task, TaskR}
case class UnversionedMirrorArchive( case class UnversionedMirrorArchive(
@ -17,20 +17,22 @@ case class UnversionedMirrorArchive(
index: Int, index: Int,
action: Action, action: Action,
totalBytesSoFar: Long totalBytesSoFar: Long
): TaskR[Console, StorageQueueEvent] = ): TaskR[MyConsole, StorageQueueEvent] =
action match { action match {
case ToUpload(bucket, localFile, _) => case ToUpload(bucket, localFile, _) =>
for { for {
event <- doUpload(index, totalBytesSoFar, bucket, localFile) event <- doUpload(index, totalBytesSoFar, bucket, localFile)
_ <- logFileUploaded(localFile, batchMode) _ <- logEvent(event, batchMode)
} yield event } yield event
case ToCopy(bucket, sourceKey, hash, targetKey, _) => case ToCopy(bucket, sourceKey, hash, targetKey, _) =>
for { for {
event <- storageService.copy(bucket, sourceKey, hash, targetKey) event <- storageService.copy(bucket, sourceKey, hash, targetKey)
_ <- logEvent(event, batchMode)
} yield event } yield event
case ToDelete(bucket, remoteKey, _) => case ToDelete(bucket, remoteKey, _) =>
for { for {
event <- storageService.delete(bucket, remoteKey) event <- storageService.delete(bucket, remoteKey)
_ <- logEvent(event, batchMode)
} yield event } yield event
case DoNothing(_, remoteKey, _) => case DoNothing(_, remoteKey, _) =>
Task(DoNothingQueueEvent(remoteKey)) Task(DoNothingQueueEvent(remoteKey))

View file

@ -2,9 +2,9 @@ package net.kemitix.thorp.core
import java.io.File import java.io.File
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.StorageService
import zio.console.Console
import zio.{Task, TaskR} import zio.{Task, TaskR}
case class DummyStorageService(s3ObjectData: S3ObjectsData, case class DummyStorageService(s3ObjectData: S3ObjectsData,
@ -14,8 +14,10 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData,
override def shutdown: Task[StorageQueueEvent] = override def shutdown: Task[StorageQueueEvent] =
Task(StorageQueueEvent.ShutdownQueueEvent()) Task(StorageQueueEvent.ShutdownQueueEvent())
override def listObjects(bucket: Bucket, override def listObjects(
prefix: RemoteKey): TaskR[Console, S3ObjectsData] = bucket: Bucket,
prefix: RemoteKey
): TaskR[MyConsole, S3ObjectsData] =
TaskR(s3ObjectData) TaskR(s3ObjectData)
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
@ -31,7 +33,7 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey): Task[StorageQueueEvent] = targetKey: RemoteKey): Task[StorageQueueEvent] =
Task(StorageQueueEvent.CopyQueueEvent(targetKey)) Task(StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey))
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey): Task[StorageQueueEvent] = remoteKey: RemoteKey): Task[StorageQueueEvent] =

View file

@ -3,15 +3,17 @@ package net.kemitix.thorp.core
import java.io.File import java.io.File
import java.nio.file.Path import java.nio.file.Path
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.DefaultRuntime import zio.Runtime
import zio.internal.PlatformLive
class PlanBuilderTest extends FreeSpec with TemporaryFolder { class PlanBuilderTest extends FreeSpec with TemporaryFolder {
private val runtime = new DefaultRuntime {} private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
private val lastModified: LastModified = LastModified() private val lastModified: LastModified = LastModified()
private val planBuilder = new PlanBuilder {} private val planBuilder = new PlanBuilder {}
@ -464,7 +466,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
storageService: StorageService, storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: ConfigOptions configOptions: ConfigOptions
): Either[Any, List[(String, String, String, String, String)]] = ): Either[Any, List[(String, String, String, String, String)]] = {
runtime runtime
.unsafeRunSync { .unsafeRunSync {
planBuilder planBuilder
@ -485,5 +487,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
("do-nothing", remoteKey.key, "", "", "") ("do-nothing", remoteKey.key, "", "", "")
case x => ("other", x.toString, "", "", "") case x => ("other", x.toString, "", "", "")
})) }))
}
} }

View file

@ -0,0 +1,41 @@
package net.kemitix.thorp.core
import java.io.File
import net.kemitix.thorp.core.Action._
import net.kemitix.thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey}
import org.scalatest.FreeSpec
class SequencePlanTest extends FreeSpec {
"sort" - {
"a list of assorted actions" - {
val bucket = Bucket("aBucket")
val remoteKey1 = RemoteKey("remoteKey1")
val remoteKey2 = RemoteKey("targetHash")
val hash = MD5Hash("aHash")
val hashes: Map[String, MD5Hash] = Map()
val size = 1024
val file1 = new File("aFile")
val file2 = new File("aFile")
val source = new File("source")
val localFile1 =
LocalFile(file1, source, hashes, remoteKey1)
val localFile2 =
LocalFile(file2, source, hashes, remoteKey2)
val copy1 = ToCopy(bucket, remoteKey1, hash, remoteKey2, size)
val copy2 = ToCopy(bucket, remoteKey2, hash, remoteKey1, size)
val upload1 = ToUpload(bucket, localFile1, size)
val upload2 = ToUpload(bucket, localFile1, size)
val delete1 = ToDelete(bucket, remoteKey1, size)
val delete2 = ToDelete(bucket, remoteKey2, size)
"should be in correct order" in {
val actions = List(copy1, delete1, upload1, delete2, upload2, copy2)
val expected = List(copy1, copy2, upload1, upload2, delete1, delete2)
val result = actions.sortBy(SequencePlan.order)
assertResult(expected)(result)
}
}
}
}

View file

@ -1,27 +0,0 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent,
DeleteQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
import org.scalatest.FunSpec
class StorageQueueEventSuite extends FunSpec {
describe("Ordering of types") {
val remoteKey = RemoteKey("remote-key")
val md5Hash = MD5Hash("md5hash")
val copy = CopyQueueEvent(remoteKey)
val upload = UploadQueueEvent(remoteKey, md5Hash)
val delete = DeleteQueueEvent(remoteKey)
val unsorted = List(delete, copy, upload)
it("should sort as copy < upload < delete ") {
val result = unsorted.sorted
val expected = List(copy, upload, delete)
assertResult(expected)(result)
}
}
}

View file

@ -4,6 +4,8 @@ import java.io.File
import java.nio.file.Paths import java.nio.file.Paths
import java.time.Instant import java.time.Instant
import net.kemitix.thorp.console
import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root} import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root}
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
@ -15,12 +17,12 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.console.Console import zio.internal.PlatformLive
import zio.{DefaultRuntime, Task, TaskR} import zio.{Runtime, Task, TaskR}
class SyncSuite extends FunSpec { class SyncSuite extends FunSpec {
private val runtime = new DefaultRuntime {} private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
private val testBucket = Bucket("bucket") private val testBucket = Bucket("bucket")
private val source = Resource(this, "upload") private val source = Resource(this, "upload")
@ -195,8 +197,9 @@ class SyncSuite extends FunSpec {
s3ObjectsData: S3ObjectsData) s3ObjectsData: S3ObjectsData)
extends StorageService { extends StorageService {
override def listObjects(bucket: Bucket, override def listObjects(
prefix: RemoteKey): TaskR[Console, S3ObjectsData] = bucket: Bucket,
prefix: RemoteKey): TaskR[console.MyConsole, S3ObjectsData] =
TaskR(s3ObjectsData) TaskR(s3ObjectsData)
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
@ -210,7 +213,7 @@ class SyncSuite extends FunSpec {
sourceKey: RemoteKey, sourceKey: RemoteKey,
hashes: MD5Hash, hashes: MD5Hash,
targetKey: RemoteKey): Task[CopyQueueEvent] = targetKey: RemoteKey): Task[CopyQueueEvent] =
Task(CopyQueueEvent(targetKey)) Task(CopyQueueEvent(sourceKey, targetKey))
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey): Task[DeleteQueueEvent] = remoteKey: RemoteKey): Task[DeleteQueueEvent] =

View file

@ -1,49 +1,49 @@
package net.kemitix.thorp.domain package net.kemitix.thorp.domain
sealed trait StorageQueueEvent { sealed trait StorageQueueEvent
val order: Int
}
object StorageQueueEvent { object StorageQueueEvent {
final case class DoNothingQueueEvent( final case class DoNothingQueueEvent(
remoteKey: RemoteKey remoteKey: RemoteKey
) extends StorageQueueEvent { ) extends StorageQueueEvent
override val order: Int = 0
}
final case class CopyQueueEvent( final case class CopyQueueEvent(
remoteKey: RemoteKey sourceKey: RemoteKey,
) extends StorageQueueEvent { targetKey: RemoteKey
override val order: Int = 1 ) extends StorageQueueEvent
}
final case class UploadQueueEvent( final case class UploadQueueEvent(
remoteKey: RemoteKey, remoteKey: RemoteKey,
md5Hash: MD5Hash md5Hash: MD5Hash
) extends StorageQueueEvent { ) extends StorageQueueEvent
override val order: Int = 2
}
final case class DeleteQueueEvent( final case class DeleteQueueEvent(
remoteKey: RemoteKey remoteKey: RemoteKey
) extends StorageQueueEvent { ) extends StorageQueueEvent
override val order: Int = 3
}
final case class ErrorQueueEvent( final case class ErrorQueueEvent(
action: Action,
remoteKey: RemoteKey, remoteKey: RemoteKey,
e: Throwable e: Throwable
) extends StorageQueueEvent { ) extends StorageQueueEvent
override val order: Int = 10
}
final case class ShutdownQueueEvent() extends StorageQueueEvent { final case class ShutdownQueueEvent() extends StorageQueueEvent
override val order: Int = 99
}
implicit def ord[A <: StorageQueueEvent]: Ordering[A] = Ordering.by(_.order) sealed trait Action {
val name: String
val keys: String
}
object Action {
case class Copy(keys: String) extends Action {
override val name: String = "Copy"
}
case class Upload(keys: String) extends Action {
override val name: String = "Upload"
}
case class Delete(keys: String) extends Action {
override val name: String = "Delete"
}
}
} }

View file

@ -19,7 +19,7 @@ trait UploadEventLogger {
val remoteKey = localFile.remoteKey.key val remoteKey = localFile.remoteKey.key
val fileLength = localFile.file.length val fileLength = localFile.file.length
val statusHeight = 7 val statusHeight = 7
if (bytesTransferred < fileLength) { if (bytesTransferred < fileLength)
println( println(
s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" + s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" +
statusWithBar(" File", sizeInEnglish, bytesTransferred, fileLength) + statusWithBar(" File", sizeInEnglish, bytesTransferred, fileLength) +
@ -29,8 +29,6 @@ trait UploadEventLogger {
bytesTransferred + totalBytesSoFar, bytesTransferred + totalBytesSoFar,
syncTotals.totalSizeBytes) + syncTotals.totalSizeBytes) +
s"${Terminal.cursorPrevLine(statusHeight)}") s"${Terminal.cursorPrevLine(statusHeight)}")
} else
println(s"${GREEN}Uploaded:$RESET $remoteKey$eraseToEndOfScreen")
} }
private def statusWithBar( private def statusWithBar(

View file

@ -1,7 +1,7 @@
package net.kemitix.thorp.storage.api package net.kemitix.thorp.storage.api
import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import zio.console.Console
import zio.{Task, TaskR} import zio.{Task, TaskR}
trait StorageService { trait StorageService {
@ -11,7 +11,7 @@ trait StorageService {
def listObjects( def listObjects(
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey prefix: RemoteKey
): TaskR[Console, S3ObjectsData] ): TaskR[MyConsole, S3ObjectsData]
def upload( def upload(
localFile: LocalFile, localFile: LocalFile,

View file

@ -0,0 +1,41 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.{AmazonS3 => AmazonS3Client}
import com.amazonaws.services.s3.model.{
CopyObjectRequest,
CopyObjectResult,
DeleteObjectRequest,
ListObjectsV2Request,
ListObjectsV2Result
}
object AmazonS3 {
trait Client {
def shutdown(): Unit
def deleteObject: DeleteObjectRequest => Unit
def copyObject: CopyObjectRequest => Option[CopyObjectResult]
def listObjectsV2: ListObjectsV2Request => ListObjectsV2Result
}
case class ClientImpl(amazonS3: AmazonS3Client) extends Client {
def shutdown(): Unit = amazonS3.shutdown()
def deleteObject: DeleteObjectRequest => Unit =
request => amazonS3.deleteObject(request)
def copyObject: CopyObjectRequest => Option[CopyObjectResult] =
request => Option(amazonS3.copyObject(request))
def listObjectsV2: ListObjectsV2Request => ListObjectsV2Result =
request => amazonS3.listObjectsV2(request)
}
}

View file

@ -1,12 +1,21 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{
import com.amazonaws.services.s3.model.CopyObjectRequest AmazonS3Exception,
import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent CopyObjectRequest,
CopyObjectResult
}
import net.kemitix.thorp.domain.StorageQueueEvent.{Action, CopyQueueEvent}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.aws.S3ClientException.{
HashMatchError,
S3Exception
}
import zio.Task import zio.Task
class Copier(amazonS3: AmazonS3) { import scala.util.{Failure, Success, Try}
class Copier(amazonS3: AmazonS3.Client) {
def copy( def copy(
bucket: Bucket, bucket: Bucket,
@ -15,8 +24,34 @@ class Copier(amazonS3: AmazonS3) {
targetKey: RemoteKey targetKey: RemoteKey
): Task[StorageQueueEvent] = ): Task[StorageQueueEvent] =
for { for {
_ <- copyObject(bucket, sourceKey, hash, targetKey) copyResult <- copyObject(bucket, sourceKey, hash, targetKey)
} yield CopyQueueEvent(targetKey) result <- mapCopyResult(copyResult, sourceKey, targetKey)
} yield result
private def mapCopyResult(
copyResult: Try[Option[CopyObjectResult]],
sourceKey: RemoteKey,
targetKey: RemoteKey
) =
copyResult match {
case Success(None) =>
Task.succeed(
StorageQueueEvent
.ErrorQueueEvent(
Action.Copy(s"${sourceKey.key} => ${targetKey.key}"),
targetKey,
HashMatchError))
case Success(Some(_)) =>
Task.succeed(CopyQueueEvent(sourceKey, targetKey))
case Failure(e: AmazonS3Exception) =>
Task.succeed(
StorageQueueEvent.ErrorQueueEvent(
Action.Copy(s"${sourceKey.key} => ${targetKey.key}"),
targetKey,
S3Exception(e.getMessage))
)
case Failure(e) => Task.fail(e)
}
private def copyObject( private def copyObject(
bucket: Bucket, bucket: Bucket,
@ -31,7 +66,7 @@ class Copier(amazonS3: AmazonS3) {
bucket.name, bucket.name,
targetKey.key targetKey.key
).withMatchingETagConstraint(hash.hash) ).withMatchingETagConstraint(hash.hash)
Task(amazonS3.copyObject(request)) Task(Try(amazonS3.copyObject(request)))
} }
} }

View file

@ -1,12 +1,11 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectRequest import com.amazonaws.services.s3.model.DeleteObjectRequest
import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent
import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent} import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent}
import zio.Task import zio.Task
class Deleter(amazonS3: AmazonS3) { class Deleter(amazonS3: AmazonS3.Client) {
def delete( def delete(
bucket: Bucket, bucket: Bucket,

View file

@ -1,17 +1,16 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain import net.kemitix.thorp.domain
import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData} import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData}
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
import zio.console.Console import zio.{Task, TaskR}
import zio.{IO, Task, TaskR, ZIO}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
class Lister(amazonS3: AmazonS3) { class Lister(amazonS3: AmazonS3.Client) {
private type Token = String private type Token = String
private type Batch = (Stream[S3ObjectSummary], Option[Token]) private type Batch = (Stream[S3ObjectSummary], Option[Token])
@ -19,7 +18,7 @@ class Lister(amazonS3: AmazonS3) {
def listObjects( def listObjects(
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey prefix: RemoteKey
): TaskR[Console, S3ObjectsData] = { ): TaskR[MyConsole, S3ObjectsData] = {
val requestMore = (token: Token) => val requestMore = (token: Token) =>
new ListObjectsV2Request() new ListObjectsV2Request()
@ -27,7 +26,7 @@ class Lister(amazonS3: AmazonS3) {
.withPrefix(prefix.key) .withPrefix(prefix.key)
.withContinuationToken(token) .withContinuationToken(token)
def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] = def fetchBatch: ListObjectsV2Request => TaskR[MyConsole, Batch] =
request => request =>
for { for {
_ <- ListerLogger.logFetchBatch _ <- ListerLogger.logFetchBatch
@ -36,14 +35,15 @@ class Lister(amazonS3: AmazonS3) {
def fetchMore( def fetchMore(
more: Option[Token] more: Option[Token]
): TaskR[Console, Stream[S3ObjectSummary]] = { ): TaskR[MyConsole, Stream[S3ObjectSummary]] = {
more match { more match {
case None => ZIO.succeed(Stream.empty) case None => TaskR.succeed(Stream.empty)
case Some(token) => fetch(requestMore(token)) case Some(token) => fetch(requestMore(token))
} }
} }
def fetch: ListObjectsV2Request => TaskR[Console, Stream[S3ObjectSummary]] = def fetch
: ListObjectsV2Request => TaskR[MyConsole, Stream[S3ObjectSummary]] =
request => { request => {
for { for {
batch <- fetchBatch(request) batch <- fetchBatch(request)
@ -63,7 +63,7 @@ class Lister(amazonS3: AmazonS3) {
private def tryFetchBatch( private def tryFetchBatch(
request: ListObjectsV2Request request: ListObjectsV2Request
): Task[(Stream[S3ObjectSummary], Option[Token])] = ): Task[(Stream[S3ObjectSummary], Option[Token])] =
IO(amazonS3.listObjectsV2(request)) Task(amazonS3.listObjectsV2(request))
.map { result => .map { result =>
val more: Option[Token] = val more: Option[Token] =
if (result.isTruncated) Some(result.getNextContinuationToken) if (result.isTruncated) Some(result.getNextContinuationToken)

View file

@ -1,10 +1,10 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.console._
import zio.TaskR import zio.TaskR
import zio.console._
trait ListerLogger { trait ListerLogger {
def logFetchBatch: TaskR[Console, Unit] = def logFetchBatch: TaskR[MyConsole, Unit] =
putStrLn("Fetching remote summaries...") putStrLn("Fetching remote summaries...")
} }
object ListerLogger extends ListerLogger object ListerLogger extends ListerLogger

View file

@ -0,0 +1,13 @@
package net.kemitix.thorp.storage.aws
sealed trait S3ClientException extends Exception
object S3ClientException {
case object HashMatchError extends S3ClientException {
override def getMessage: String =
"The hash of the object to be overwritten did not match the the expected value"
}
case class S3Exception(message: String) extends S3ClientException {
override def getMessage: String = message
}
}

View file

@ -1,14 +1,13 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3 import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.StorageService
import zio.console.Console
import zio.{Task, TaskR} import zio.{Task, TaskR}
class S3StorageService( class S3StorageService(
amazonS3Client: => AmazonS3, amazonS3Client: => AmazonS3.Client,
amazonTransferManager: => AmazonTransferManager amazonTransferManager: => AmazonTransferManager
) extends StorageService { ) extends StorageService {
@ -20,7 +19,7 @@ class S3StorageService(
override def listObjects( override def listObjects(
bucket: Bucket, bucket: Bucket,
prefix: RemoteKey prefix: RemoteKey
): TaskR[Console, S3ObjectsData] = ): TaskR[MyConsole, S3ObjectsData] =
objectLister.listObjects(bucket, prefix) objectLister.listObjects(bucket, prefix)
override def copy( override def copy(

View file

@ -1,13 +1,13 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.transfer.TransferManagerBuilder import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.StorageService
object S3StorageServiceBuilder { object S3StorageServiceBuilder {
def createService( def createService(
amazonS3Client: AmazonS3, amazonS3Client: AmazonS3.Client,
amazonTransferManager: AmazonTransferManager amazonTransferManager: AmazonTransferManager
): StorageService = ): StorageService =
new S3StorageService( new S3StorageService(
@ -17,7 +17,7 @@ object S3StorageServiceBuilder {
lazy val defaultStorageService: StorageService = lazy val defaultStorageService: StorageService =
createService( createService(
AmazonS3ClientBuilder.defaultClient, AmazonS3.ClientImpl(AmazonS3ClientBuilder.defaultClient),
AmazonTransferManager(TransferManagerBuilder.defaultTransferManager) AmazonTransferManager(TransferManagerBuilder.defaultTransferManager)
) )

View file

@ -3,6 +3,7 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
Action,
ErrorQueueEvent, ErrorQueueEvent,
UploadQueueEvent UploadQueueEvent
} }
@ -39,7 +40,12 @@ class Uploader(transferManager: => AmazonTransferManager) {
.map(_.waitForUploadResult) .map(_.waitForUploadResult)
.map(upload => .map(upload =>
UploadQueueEvent(RemoteKey(upload.getKey), MD5Hash(upload.getETag))) UploadQueueEvent(RemoteKey(upload.getKey), MD5Hash(upload.getETag)))
.catchAll(e => Task.succeed(ErrorQueueEvent(localFile.remoteKey, e))) .catchAll(
e =>
Task.succeed(
ErrorQueueEvent(Action.Upload(localFile.remoteKey.key),
localFile.remoteKey,
e)))
} }
private def request( private def request(

View file

@ -0,0 +1,18 @@
package net.kemitix.thorp.storage.aws
import org.scalamock.scalatest.MockFactory
trait AmazonS3ClientTestFixture extends MockFactory {
val fixture: Fixture =
Fixture(stub[AmazonS3.Client], stub[AmazonTransferManager])
case class Fixture(
amazonS3Client: AmazonS3.Client,
amazonS3TransferManager: AmazonTransferManager,
) {
lazy val storageService: S3StorageService =
new S3StorageService(amazonS3Client, amazonS3TransferManager)
}
}

View file

@ -0,0 +1,98 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.{AmazonS3Exception, CopyObjectResult}
import net.kemitix.thorp.console.MyConsole
import net.kemitix.thorp.domain.StorageQueueEvent.{Action, ErrorQueueEvent}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.aws.S3ClientException.{
HashMatchError,
S3Exception
}
import org.scalatest.FreeSpec
import zio.Runtime
import zio.internal.PlatformLive
class CopierTest extends FreeSpec {
private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
"copier" - {
val bucket = Bucket("aBucket")
val sourceKey = RemoteKey("sourceKey")
val hash = MD5Hash("aHash")
val targetKey = RemoteKey("targetKey")
"when source exists" - {
"when source hash matches" - {
"copies from source to target" in {
val event = StorageQueueEvent.CopyQueueEvent(sourceKey, targetKey)
val expected = Right(event)
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.copyObject _)
.when()
.returns(_ => Some(new CopyObjectResult))
private val result =
invoke(bucket, sourceKey, hash, targetKey, fixture.storageService)
assertResult(expected)(result)
}
}
}
"when source hash does not match" - {
"skip the file with an error" in {
new AmazonS3ClientTestFixture {
(fixture.amazonS3Client.copyObject _)
.when()
.returns(_ => None)
private val result =
invoke(bucket, sourceKey, hash, targetKey, fixture.storageService)
result match {
case Right(
ErrorQueueEvent(Action.Copy("sourceKey => targetKey"),
RemoteKey("targetKey"),
e)) =>
e match {
case HashMatchError => assert(true)
case _ => fail("Not a HashMatchError")
}
case e => fail("Not an ErrorQueueEvent: " + e)
}
}
}
}
"when client throws an exception" - {
"skip the file with an error" in {
new AmazonS3ClientTestFixture {
private val expectedMessage = "The specified key does not exist"
(fixture.amazonS3Client.copyObject _)
.when()
.throws(new AmazonS3Exception(expectedMessage))
private val result =
invoke(bucket, sourceKey, hash, targetKey, fixture.storageService)
result match {
case Right(
ErrorQueueEvent(Action.Copy("sourceKey => targetKey"),
RemoteKey("targetKey"),
e)) =>
e match {
case S3Exception(message) =>
assert(message.startsWith(expectedMessage))
case _ => fail("Not an S3Exception")
}
case e => fail("Not an ErrorQueueEvent: " + e)
}
}
}
}
}
def invoke(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey,
storageService: S3StorageService
) =
runtime.unsafeRunSync {
storageService.copy(bucket, sourceKey, hash, targetKey)
}.toEither
}
}

View file

@ -4,35 +4,20 @@ import java.time.Instant
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
import java.util.Date import java.util.Date
import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Result, S3ObjectSummary}
import com.amazonaws.services.s3.model.{ import net.kemitix.thorp.console.MyConsole
ListObjectsV2Request,
ListObjectsV2Result,
S3ObjectSummary
}
import net.kemitix.thorp.core.Resource import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec import org.scalatest.FreeSpec
import zio.DefaultRuntime import zio.Runtime
import zio.internal.PlatformLive
class S3StorageServiceSuite extends FunSpec with MockFactory { class S3StorageServiceSuite extends FreeSpec with MockFactory {
private val runtime = new DefaultRuntime {} private val runtime = Runtime(MyConsole.Live, PlatformLive.Default)
describe("listObjectsInPrefix") {
val source = Resource(this, "upload")
val sourcePath = source.toPath
val prefix = RemoteKey("prefix")
implicit val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))
val h1 = MD5Hash("hash1")
val k1a = RemoteKey("key1a")
"listObjects" - {
def objectSummary(hash: MD5Hash, def objectSummary(hash: MD5Hash,
remoteKey: RemoteKey, remoteKey: RemoteKey,
lastModified: LastModified) = { lastModified: LastModified) = {
@ -42,31 +27,27 @@ class S3StorageServiceSuite extends FunSpec with MockFactory {
summary.setLastModified(Date.from(lastModified.when)) summary.setLastModified(Date.from(lastModified.when))
summary summary
} }
val source = Resource(this, "upload")
val o1a = objectSummary(h1, k1a, lm) val sourcePath = source.toPath
val prefix = RemoteKey("prefix")
val k1b = RemoteKey("key1b") implicit val config: Config =
val o1b = objectSummary(h1, k1b, lm) Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))
val h2 = MD5Hash("hash2") val h1 = MD5Hash("hash1")
val k2 = RemoteKey("key2") val k1a = RemoteKey("key1a")
val o2 = objectSummary(h2, k2, lm) val o1a = objectSummary(h1, k1a, lm)
val k1b = RemoteKey("key1b")
val amazonS3 = stub[AmazonS3] val o1b = objectSummary(h1, k1b, lm)
val amazonS3TransferManager = stub[AmazonTransferManager] val h2 = MD5Hash("hash2")
val storageService = new S3StorageService(amazonS3, amazonS3TransferManager) val k2 = RemoteKey("key2")
val o2 = objectSummary(h2, k2, lm)
val myFakeResponse = new ListObjectsV2Result() val myFakeResponse = new ListObjectsV2Result()
val summaries = myFakeResponse.getObjectSummaries val summaries = myFakeResponse.getObjectSummaries
summaries.add(o1a) summaries.add(o1a)
summaries.add(o1b) summaries.add(o1b)
summaries.add(o2) summaries.add(o2)
(amazonS3 listObjectsV2 (_: ListObjectsV2Request))
.when(*)
.returns(myFakeResponse)
it( "should build list of hash lookups, with duplicate objects grouped by hash" in {
"should build list of hash lookups, with duplicate objects grouped by hash") {
val expected = Right( val expected = Right(
S3ObjectsData( S3ObjectsData(
byHash = Map(h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)), byHash = Map(h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)),
@ -75,15 +56,19 @@ class S3StorageServiceSuite extends FunSpec with MockFactory {
k1b -> HashModified(h1, lm), k1b -> HashModified(h1, lm),
k2 -> HashModified(h2, lm)) k2 -> HashModified(h2, lm))
)) ))
val result = invoke(storageService) new AmazonS3ClientTestFixture {
assertResult(expected)(result) (fixture.amazonS3Client.listObjectsV2 _)
.when()
.returns(_ => myFakeResponse)
private val result = invoke(fixture.storageService)
assertResult(expected)(result)
}
} }
def invoke(storageService: S3StorageService) =
runtime.unsafeRunSync {
storageService
.listObjects(Bucket("bucket"), RemoteKey("prefix"))
}.toEither
} }
private def invoke(storageService: S3StorageService) =
runtime.unsafeRunSync {
storageService
.listObjects(Bucket("bucket"), RemoteKey("prefix"))
}.toEither
} }

View file

@ -2,7 +2,6 @@ package net.kemitix.thorp.storage.aws
import java.time.Instant import java.time.Instant
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.model.UploadResult
import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher} import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher}
@ -122,10 +121,10 @@ class StorageServiceSuite extends FunSpec with MockFactory {
describe("upload") { describe("upload") {
describe("when uploading a file") { describe("when uploading a file") {
val amazonS3 = stub[AmazonS3] val amazonS3Client = stub[AmazonS3.Client]
val amazonTransferManager = stub[AmazonTransferManager] val amazonTransferManager = stub[AmazonTransferManager]
val storageService = val storageService =
new S3StorageService(amazonS3, amazonTransferManager) new S3StorageService(amazonS3Client, amazonTransferManager)
val prefix = RemoteKey("prefix") val prefix = RemoteKey("prefix")
val localFile = val localFile =

View file

@ -1,55 +0,0 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.transfer._
import net.kemitix.thorp.core.KeyGenerator.generateKey
import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent
import net.kemitix.thorp.domain.{UploadEventListener, _}
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
class UploaderSuite extends FunSpec with MockFactory {
private val batchMode: Boolean = true
private val source = Resource(this, ".")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix")
implicit private val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
private val fileToKey = generateKey(config.sources, config.prefix) _
def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] = Map("md5" -> hash)
describe("S3ClientMultiPartTransferManagerSuite") {
describe("upload") {
pending
// how much of this test is testing the amazonTransferManager
// Should we just test that the correct parameters are passed to initiate, or will this test
// just collapse and die if the amazonS3 doesn't respond properly to TransferManager input
// dies when putObject is called
val returnedKey = RemoteKey("returned-key")
val returnedHash = MD5Hash("returned-hash")
val bigFile = LocalFile.resolve("small-file",
md5HashMap(MD5Hash("the-hash")),
sourcePath,
fileToKey)
val uploadEventListener =
UploadEventListener(bigFile, 1, SyncTotals(), 0L)
val amazonS3 = mock[AmazonS3]
val amazonTransferManager =
AmazonTransferManager(
TransferManagerBuilder.standard().withS3Client(amazonS3).build)
val uploader = new Uploader(amazonTransferManager)
it("should upload") {
val expected = UploadQueueEvent(returnedKey, returnedHash)
val result = uploader.upload(bigFile,
config.bucket,
batchMode,
uploadEventListener,
1)
assertResult(expected)(result)
}
}
}
}