Introduce backend abstraction to hide S3 (#76)

Add backend abstraction to hide S3
This commit is contained in:
Paul Campbell 2019-06-22 07:20:59 +01:00 committed by GitHub
parent 761c1c9784
commit 9d2271fdcf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
39 changed files with 252 additions and 254 deletions

View file

@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]], and this project adheres to
[[https://semver.org/spec/v2.0.0.html][Semantic Versioning]].
* [0.6.0] - 2019-??-??
** Added
- Abstraction layer encapsulating S3 as Storage (#76)
* [0.5.0] - 2019-06-21
** Added

View file

@ -1,40 +0,0 @@
package net.kemitix.thorp.aws.api
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
sealed trait S3Action {
// the remote key that was uploaded, deleted or otherwise updated by the action
def remoteKey: RemoteKey
val order: Int
}
object S3Action {
final case class DoNothingS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 0
}
final case class CopyS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 1
}
final case class UploadS3Action(
remoteKey: RemoteKey,
md5Hash: MD5Hash) extends S3Action {
override val order: Int = 2
}
final case class DeleteS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 3
}
final case class ErroredS3Action(remoteKey: RemoteKey, e: Throwable) extends S3Action {
override val order: Int = 10
}
implicit def ord[A <: S3Action]: Ordering[A] = Ordering.by(_.order)
}

View file

@ -1,16 +0,0 @@
package net.kemitix.thorp.aws.lib
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import net.kemitix.thorp.aws.api.S3Client
object S3ClientBuilder {
def createClient(amazonS3Client: AmazonS3,
amazonS3TransferManager: TransferManager): S3Client =
new ThorpS3Client(amazonS3Client, amazonS3TransferManager)
def defaultClient: S3Client =
createClient(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager)
}

View file

@ -41,16 +41,16 @@ val catsEffectsSettings = Seq(
"-Ypartial-unification")
)
// cli -> thorp-lib -> aws-lib -> core -> aws-api -> domain
// cli -> thorp-lib -> storage-aws -> core -> storage-api -> domain
lazy val root = (project in file("."))
lazy val thorp = (project in file("."))
.settings(commonSettings)
lazy val cli = (project in file("cli"))
.settings(commonSettings)
.settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main"))
.settings(applicationSettings)
.aggregate(`thorp-lib`, `aws-lib`, core, `aws-api`, domain)
.aggregate(`thorp-lib`, `storage-aws`, core, `storage-api`, domain)
.settings(commandLineParsing)
.settings(testDependencies)
.dependsOn(`thorp-lib`)
@ -58,11 +58,11 @@ lazy val cli = (project in file("cli"))
lazy val `thorp-lib` = (project in file("thorp-lib"))
.settings(commonSettings)
.settings(assemblyJarName in assembly := "thorp-lib.jar")
.dependsOn(`aws-lib`)
.dependsOn(`storage-aws`)
lazy val `aws-lib` = (project in file("aws-lib"))
lazy val `storage-aws` = (project in file("storage-aws"))
.settings(commonSettings)
.settings(assemblyJarName in assembly := "aws-lib.jar")
.settings(assemblyJarName in assembly := "storage-aws.jar")
.settings(awsSdkDependencies)
.settings(testDependencies)
.dependsOn(core)
@ -71,11 +71,11 @@ lazy val core = (project in file("core"))
.settings(commonSettings)
.settings(assemblyJarName in assembly := "core.jar")
.settings(testDependencies)
.dependsOn(`aws-api`)
.dependsOn(`storage-api`)
lazy val `aws-api` = (project in file("aws-api"))
lazy val `storage-api` = (project in file("storage-api"))
.settings(commonSettings)
.settings(assemblyJarName in assembly := "aws-api.jar")
.settings(assemblyJarName in assembly := "storage-api.jar")
.dependsOn(domain)
lazy val domain = (project in file("domain"))

View file

@ -1,15 +1,15 @@
package net.kemitix.thorp.cli
import cats.effect.{ExitCode, IO}
import net.kemitix.thorp.aws.lib.S3ClientBuilder
import net.kemitix.thorp.core.{ConfigOption, Sync}
import net.kemitix.thorp.domain.Logger
import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService
trait Program {
def apply(configOptions: Seq[ConfigOption]): IO[ExitCode] = {
implicit val logger: Logger = new PrintLogger()
Sync(S3ClientBuilder.defaultClient)(configOptions) flatMap {
Sync(defaultStorageService)(configOptions) flatMap {
case Left(errors) =>
for {
_ <- logger.error(s"There were errors:")

View file

@ -1,37 +1,39 @@
package net.kemitix.thorp.core
import cats.effect.IO
import net.kemitix.thorp.aws.api.S3Action.DoNothingS3Action
import net.kemitix.thorp.aws.api.{S3Action, S3Client, UploadProgressListener}
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.{Config, Logger}
import net.kemitix.thorp.domain.{Config, Logger, StorageQueueEvent, UploadEventListener}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.storage.api.StorageService
object ActionSubmitter {
trait ActionSubmitter {
def submitAction(s3Client: S3Client,
def submitAction(storageService: StorageService,
action: Action)
(implicit c: Config,
logger: Logger): Stream[IO[S3Action]] = {
logger: Logger): Stream[IO[StorageQueueEvent]] = {
Stream(
action match {
case ToUpload(bucket, localFile) =>
for {
_ <- logger.info(s" Upload: ${localFile.relative}")
progressListener = new UploadProgressListener(localFile)
action <- s3Client.upload(localFile, bucket, progressListener, 1)
} yield action
uploadEventListener = new UploadEventListener(localFile)
event <- storageService.upload(localFile, bucket, uploadEventListener, 1)
} yield event
case ToCopy(bucket, sourceKey, hash, targetKey) =>
for {
_ <- logger.info(s" Copy: ${sourceKey.key} => ${targetKey.key}")
action <- s3Client.copy(bucket, sourceKey, hash, targetKey)
} yield action
event <- storageService.copy(bucket, sourceKey, hash, targetKey)
} yield event
case ToDelete(bucket, remoteKey) =>
for {
_ <- logger.info(s" Delete: ${remoteKey.key}")
action <- s3Client.delete(bucket, remoteKey)
} yield action
case DoNothing(bucket, remoteKey) =>
IO.pure(DoNothingS3Action(remoteKey))
event <- storageService.delete(bucket, remoteKey)
} yield event
case DoNothing(_, remoteKey) =>
IO.pure(DoNothingQueueEvent(remoteKey))
})
}
}
object ActionSubmitter extends ActionSubmitter

View file

@ -3,7 +3,7 @@ package net.kemitix.thorp.core
import java.nio.file.Paths
import java.util.regex.Pattern
import net.kemitix.thorp.core.ConfigOption.{Bucket, Debug, Exclude, Include, Prefix, Source}
import net.kemitix.thorp.core.ConfigOption._
trait ParseConfigLines {

View file

@ -2,7 +2,6 @@ package net.kemitix.thorp.core
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.aws.api.{S3Action, S3Client}
import net.kemitix.thorp.core.Action.ToDelete
import net.kemitix.thorp.core.ActionGenerator.createActions
import net.kemitix.thorp.core.ActionSubmitter.submitAction
@ -11,6 +10,7 @@ import net.kemitix.thorp.core.LocalFileStream.findFiles
import net.kemitix.thorp.core.S3MetaDataEnricher.getMetadata
import net.kemitix.thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService
trait Sync {
@ -20,24 +20,24 @@ trait Sync {
} yield errorMessages
}
def apply(s3Client: S3Client)
def apply(storageService: StorageService)
(configOptions: Seq[ConfigOption])
(implicit defaultLogger: Logger): IO[Either[List[String], Unit]] =
buildConfig(configOptions).flatMap {
case Right(config) => runWithValidConfig(s3Client, defaultLogger, config)
case Right(config) => runWithValidConfig(storageService, defaultLogger, config)
case Left(errors) => IO.pure(Left(errorMessages(errors.toList)))
}
private def runWithValidConfig(s3Client: S3Client,
private def runWithValidConfig(storageService: StorageService,
defaultLogger: Logger,
config: Config) = {
for {
_ <- run(config, s3Client, defaultLogger.withDebug(config.debug))
_ <- run(config, storageService, defaultLogger.withDebug(config.debug))
} yield Right(())
}
private def run(cliConfig: Config,
s3Client: S3Client,
storageService: StorageService,
logger: Logger): IO[Unit] = {
implicit val c: Config = cliConfig
@ -50,9 +50,9 @@ trait Sync {
IO.pure(sData.flatMap(s3MetaData => createActions(s3MetaData)))
def submit(sActions: Stream[Action]) =
IO(sActions.flatMap(action => submitAction(s3Client, action)))
IO(sActions.flatMap(action => submitAction(storageService, action)))
def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] =
def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[StorageQueueEvent]] =
(for {
files <- findFiles(c.source, MD5HashGenerator.md5File(_))
metaData <- metaData(s3Data, files)
@ -62,18 +62,18 @@ trait Sync {
.flatten
.map(streamS3Actions => streamS3Actions.sorted)
def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[S3Action]] =
def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[StorageQueueEvent]] =
(for {
key <- s3ObjectsData.byKey.keys
if key.isMissingLocally(c.source, c.prefix)
ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key))
ioDelAction <- submitAction(storageService, ToDelete(c.bucket, key))
} yield ioDelAction)
.toStream
.sequence
for {
_ <- logRunStart
s3data <- s3Client.listObjects(c.bucket, c.prefix)
s3data <- storageService.listObjects(c.bucket, c.prefix)
_ <- logFileScan
copyUploadActions <- copyUploadActions(s3data)
deleteActions <- deleteActions(s3data)

View file

@ -2,12 +2,11 @@ package net.kemitix.thorp.core
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.aws.api.S3Action
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action}
import net.kemitix.thorp.domain.{Config, Logger}
import net.kemitix.thorp.domain.{Config, Logger, StorageQueueEvent}
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent}
// Logging for the Sync class
object SyncLogging {
trait SyncLogging {
def logRunStart(implicit c: Config,
logger: Logger): IO[Unit] =
@ -17,16 +16,16 @@ object SyncLogging {
logger: Logger): IO[Unit] =
logger.info(s"Scanning local files: ${c.source}...")
def logErrors(actions: Stream[S3Action])
def logErrors(actions: Stream[StorageQueueEvent])
(implicit logger: Logger): IO[Unit] =
for {
_ <- actions.map {
case ErroredS3Action(k, e) => logger.warn(s"${k.key}: ${e.getMessage}")
case ErrorQueueEvent(k, e) => logger.warn(s"${k.key}: ${e.getMessage}")
case _ => IO.unit
}.sequence
} yield ()
def logRunFinished(actions: Stream[S3Action])
def logRunFinished(actions: Stream[StorageQueueEvent])
(implicit c: Config,
logger: Logger): IO[Unit] = {
val counters = actions.foldLeft(Counters())(countActivities)
@ -40,19 +39,21 @@ object SyncLogging {
}
private def countActivities(implicit c: Config,
logger: Logger): (Counters, S3Action) => Counters =
(counters: Counters, s3Action: S3Action) => {
logger: Logger): (Counters, StorageQueueEvent) => Counters =
(counters: Counters, s3Action: StorageQueueEvent) => {
s3Action match {
case _: UploadS3Action =>
case _: UploadQueueEvent =>
counters.copy(uploaded = counters.uploaded + 1)
case _: CopyS3Action =>
case _: CopyQueueEvent =>
counters.copy(copied = counters.copied + 1)
case _: DeleteS3Action =>
case _: DeleteQueueEvent =>
counters.copy(deleted = counters.deleted + 1)
case ErroredS3Action(k, e) =>
case ErrorQueueEvent(k, e) =>
counters.copy(errors = counters.errors + 1)
case _ => counters
}
}
}
object SyncLogging extends SyncLogging

View file

@ -1,17 +1,17 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action}
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent}
import org.scalatest.FunSpec
class S3ActionSuite extends FunSpec {
class StorageQueueEventSuite extends FunSpec {
describe("Ordering of types") {
val remoteKey = RemoteKey("remote-key")
val md5Hash = MD5Hash("md5hash")
val copy = CopyS3Action(remoteKey)
val upload = UploadS3Action(remoteKey, md5Hash)
val delete = DeleteS3Action(remoteKey)
val copy = CopyQueueEvent(remoteKey)
val upload = UploadQueueEvent(remoteKey, md5Hash)
val delete = DeleteQueueEvent(remoteKey)
val unsorted = List(delete, copy, upload)
it("should sort as copy < upload < delete ") {
val result = unsorted.sorted

View file

@ -4,11 +4,10 @@ import java.io.File
import java.time.Instant
import cats.effect.IO
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action}
import net.kemitix.thorp.aws.api.{S3Client, UploadProgressListener}
import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash}
import net.kemitix.thorp.domain.Filter.Exclude
import net.kemitix.thorp.domain._
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent}
import net.kemitix.thorp.storage.api.StorageService
import org.scalatest.FunSpec
class SyncSuite
@ -35,30 +34,31 @@ class SyncSuite
val rootRemoteKey = RemoteKey("prefix/root-file")
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
def invokeSubject(s3Client: RecordingClient, configOptions: List[ConfigOption]) = {
Sync(s3Client)(configOptions).unsafeRunSync
def invokeSubject(storageService: StorageService,
configOptions: List[ConfigOption]) = {
Sync(storageService)(configOptions).unsafeRunSync
}
describe("when all files should be uploaded") {
val s3Client = new RecordingClient(testBucket, S3ObjectsData(
val storageService = new RecordingStorageService(testBucket, S3ObjectsData(
byHash = Map(),
byKey = Map()))
it("uploads all files") {
val expectedUploads = Map(
"subdir/leaf-file" -> leafRemoteKey,
"root-file" -> rootRemoteKey)
invokeSubject(s3Client, configOptions)
assertResult(expectedUploads)(s3Client.uploadsRecord)
invokeSubject(storageService, configOptions)
assertResult(expectedUploads)(storageService.uploadsRecord)
}
it("copies nothing") {
val expectedCopies = Map()
invokeSubject(s3Client, configOptions)
assertResult(expectedCopies)(s3Client.copiesRecord)
invokeSubject(storageService, configOptions)
assertResult(expectedCopies)(storageService.copiesRecord)
}
it("deletes nothing") {
val expectedDeletions = Set()
invokeSubject(s3Client, configOptions)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
invokeSubject(storageService, configOptions)
assertResult(expectedDeletions)(storageService.deletionsRecord)
}
}
describe("when no files should be uploaded") {
@ -69,21 +69,21 @@ class SyncSuite
byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("uploads nothing") {
val expectedUploads = Map()
invokeSubject(s3Client, configOptions)
assertResult(expectedUploads)(s3Client.uploadsRecord)
invokeSubject(storageService, configOptions)
assertResult(expectedUploads)(storageService.uploadsRecord)
}
it("copies nothing") {
val expectedCopies = Map()
invokeSubject(s3Client, configOptions)
assertResult(expectedCopies)(s3Client.copiesRecord)
invokeSubject(storageService, configOptions)
assertResult(expectedCopies)(storageService.copiesRecord)
}
it("deletes nothing") {
val expectedDeletions = Set()
invokeSubject(s3Client, configOptions)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
invokeSubject(storageService, configOptions)
assertResult(expectedDeletions)(storageService.deletionsRecord)
}
}
describe("when a file is renamed it is moved on S3 with no upload") {
@ -95,21 +95,21 @@ class SyncSuite
byKey = Map(
RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("uploads nothing") {
invokeSubject(s3Client, configOptions)
invokeSubject(storageService, configOptions)
val expectedUploads = Map()
assertResult(expectedUploads)(s3Client.uploadsRecord)
assertResult(expectedUploads)(storageService.uploadsRecord)
}
it("copies the file") {
val expectedCopies = Map(RemoteKey("prefix/root-file-old") -> RemoteKey("prefix/root-file"))
invokeSubject(s3Client, configOptions)
assertResult(expectedCopies)(s3Client.copiesRecord)
invokeSubject(storageService, configOptions)
assertResult(expectedCopies)(storageService.copiesRecord)
}
it("deletes the original") {
val expectedDeletions = Set(RemoteKey("prefix/root-file-old"))
invokeSubject(s3Client, configOptions)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
invokeSubject(storageService, configOptions)
assertResult(expectedDeletions)(storageService.deletionsRecord)
}
}
describe("when a file is copied it is copied on S3 with no upload") {
@ -125,29 +125,29 @@ class SyncSuite
deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))),
byKey = Map(
deletedKey -> HashModified(deletedHash, lastModified)))
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("deleted key") {
val expectedDeletions = Set(deletedKey)
invokeSubject(s3Client, configOptions)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
invokeSubject(storageService, configOptions)
assertResult(expectedDeletions)(storageService.deletionsRecord)
}
}
describe("when a file is excluded") {
val s3ObjectsData = S3ObjectsData(Map(), Map())
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("is not uploaded") {
val expectedUploads = Map(
"root-file" -> rootRemoteKey
)
invokeSubject(s3Client, ConfigOption.Exclude("leaf") :: configOptions)
assertResult(expectedUploads)(s3Client.uploadsRecord)
invokeSubject(storageService, ConfigOption.Exclude("leaf") :: configOptions)
assertResult(expectedUploads)(storageService.uploadsRecord)
}
}
}
class RecordingClient(testBucket: Bucket,
s3ObjectsData: S3ObjectsData)
extends S3Client {
class RecordingStorageService(testBucket: Bucket,
s3ObjectsData: S3ObjectsData)
extends StorageService {
var uploadsRecord: Map[String, RemoteKey] = Map()
var copiesRecord: Map[RemoteKey, RemoteKey] = Map()
@ -160,30 +160,30 @@ class SyncSuite
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
uploadEventListener: UploadEventListener,
tryCount: Int)
(implicit logger: Logger): IO[UploadS3Action] = {
(implicit logger: Logger): IO[UploadQueueEvent] = {
if (bucket == testBucket)
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)
IO.pure(UploadS3Action(localFile.remoteKey, localFile.hash))
IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hash))
}
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit logger: Logger): IO[CopyS3Action] = {
)(implicit logger: Logger): IO[CopyQueueEvent] = {
if (bucket == testBucket)
copiesRecord += (sourceKey -> targetKey)
IO.pure(CopyS3Action(targetKey))
IO.pure(CopyQueueEvent(targetKey))
}
override def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit logger: Logger): IO[DeleteS3Action] = {
)(implicit logger: Logger): IO[DeleteQueueEvent] = {
if (bucket == testBucket)
deletionsRecord += remoteKey
IO.pure(DeleteS3Action(remoteKey))
IO.pure(DeleteQueueEvent(remoteKey))
}
}
}

View file

@ -0,0 +1,37 @@
package net.kemitix.thorp.domain
sealed trait StorageQueueEvent {
// the remote key that was uploaded, deleted or otherwise updated by the action
def remoteKey: RemoteKey
val order: Int
}
object StorageQueueEvent {
final case class DoNothingQueueEvent(remoteKey: RemoteKey) extends StorageQueueEvent {
override val order: Int = 0
}
final case class CopyQueueEvent(remoteKey: RemoteKey) extends StorageQueueEvent {
override val order: Int = 1
}
final case class UploadQueueEvent(remoteKey: RemoteKey,
md5Hash: MD5Hash) extends StorageQueueEvent {
override val order: Int = 2
}
final case class DeleteQueueEvent(remoteKey: RemoteKey) extends StorageQueueEvent {
override val order: Int = 3
}
final case class ErrorQueueEvent(remoteKey: RemoteKey, e: Throwable) extends StorageQueueEvent {
override val order: Int = 10
}
implicit def ord[A <: StorageQueueEvent]: Ordering[A] = Ordering.by(_.order)
}

View file

@ -1,7 +1,5 @@
package net.kemitix.thorp.domain
import scala.io.AnsiColor._
object Terminal {
private val esc = "\u001B"

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.api
package net.kemitix.thorp.domain
sealed trait UploadEvent {
def name: String

View file

@ -1,10 +1,9 @@
package net.kemitix.thorp.aws.api
package net.kemitix.thorp.domain
import net.kemitix.thorp.aws.api.UploadEvent.RequestEvent
import net.kemitix.thorp.domain.LocalFile
import net.kemitix.thorp.domain.UploadEvent.RequestEvent
import net.kemitix.thorp.domain.UploadEventLogger.logRequestCycle
class UploadProgressListener(localFile: LocalFile)
extends UploadProgressLogging {
class UploadEventListener(localFile: LocalFile) {
var bytesTransferred = 0L

View file

@ -1,11 +1,10 @@
package net.kemitix.thorp.aws.api
package net.kemitix.thorp.domain
import net.kemitix.thorp.aws.api.UploadEvent.RequestEvent
import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish
import net.kemitix.thorp.domain.Terminal._
import net.kemitix.thorp.domain.{LocalFile, Terminal}
import net.kemitix.thorp.domain.UploadEvent.RequestEvent
trait UploadProgressLogging {
trait UploadEventLogger {
def logRequestCycle(localFile: LocalFile,
event: RequestEvent,
@ -22,3 +21,5 @@ trait UploadProgressLogging {
}
}
object UploadEventLogger extends UploadEventLogger

View file

@ -1,7 +1,5 @@
package net.kemitix.thorp.domain
import scala.io.AnsiColor._
import org.scalatest.FunSpec
class TerminalTest extends FunSpec {

View file

@ -1,10 +1,9 @@
package net.kemitix.thorp.aws.api
package net.kemitix.thorp.storage.api
import cats.effect.IO
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action}
import net.kemitix.thorp.domain._
trait S3Client {
trait StorageService {
def listObjects(bucket: Bucket,
prefix: RemoteKey
@ -12,18 +11,18 @@ trait S3Client {
def upload(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
uploadEventListener: UploadEventListener,
tryCount: Int)
(implicit logger: Logger): IO[S3Action]
(implicit logger: Logger): IO[StorageQueueEvent]
def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit logger: Logger): IO[CopyS3Action]
)(implicit logger: Logger): IO[StorageQueueEvent]
def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit logger: Logger): IO[DeleteS3Action]
)(implicit logger: Logger): IO[StorageQueueEvent]
}

View file

@ -1,11 +1,11 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.CopyObjectRequest
import net.kemitix.thorp.aws.api.S3Action.CopyS3Action
import net.kemitix.thorp.aws.lib.S3ClientLogging.{logCopyFinish, logCopyStart}
import net.kemitix.thorp.domain.{Bucket, Logger, MD5Hash, RemoteKey}
import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.aws.S3ClientLogging.{logCopyStart, logCopyFinish}
class S3ClientCopier(amazonS3: AmazonS3) {
@ -13,12 +13,12 @@ class S3ClientCopier(amazonS3: AmazonS3) {
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey)
(implicit logger: Logger): IO[CopyS3Action] =
for {
_ <- logCopyStart(bucket, sourceKey, targetKey)
_ <- copyObject(bucket, sourceKey, hash, targetKey)
_ <- logCopyFinish(bucket, sourceKey,targetKey)
} yield CopyS3Action(targetKey)
(implicit logger: Logger): IO[StorageQueueEvent] =
for {
_ <- logCopyStart(bucket, sourceKey, targetKey)
_ <- copyObject(bucket, sourceKey, hash, targetKey)
_ <- logCopyFinish(bucket, sourceKey,targetKey)
} yield CopyQueueEvent(targetKey)
private def copyObject(bucket: Bucket,
sourceKey: RemoteKey,

View file

@ -1,22 +1,22 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectRequest
import net.kemitix.thorp.aws.api.S3Action.DeleteS3Action
import net.kemitix.thorp.aws.lib.S3ClientLogging.{logDeleteFinish, logDeleteStart}
import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent
import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey}
import net.kemitix.thorp.storage.aws.S3ClientLogging.{logDeleteStart, logDeleteFinish}
class S3ClientDeleter(amazonS3: AmazonS3) {
def delete(bucket: Bucket,
remoteKey: RemoteKey)
(implicit logger: Logger): IO[DeleteS3Action] =
(implicit logger: Logger): IO[DeleteQueueEvent] =
for {
_ <- logDeleteStart(bucket, remoteKey)
_ <- deleteObject(bucket, remoteKey)
_ <- logDeleteFinish(bucket, remoteKey)
} yield DeleteS3Action(remoteKey)
} yield DeleteQueueEvent(remoteKey)
private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) =
IO(amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)))

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey}

View file

@ -1,13 +1,13 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
import net.kemitix.thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart}
import net.kemitix.thorp.aws.lib.S3ObjectsByHash.byHash
import net.kemitix.thorp.aws.lib.S3ObjectsByKey.byKey
import net.kemitix.thorp.domain
import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData}
import net.kemitix.thorp.storage.aws.S3ClientLogging.{logListObjectsStart, logListObjectsFinish}
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
import scala.collection.JavaConverters._

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.S3ObjectSummary
import net.kemitix.thorp.domain.{KeyModified, LastModified, MD5Hash, RemoteKey}

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.S3ObjectSummary
import net.kemitix.thorp.domain.{HashModified, LastModified, MD5Hash, RemoteKey}

View file

@ -1,15 +1,14 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.transfer.TransferManager
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action}
import net.kemitix.thorp.aws.api.{S3Action, S3Client, UploadProgressListener}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService
class ThorpS3Client(amazonS3Client: => AmazonS3,
amazonS3TransferManager: => TransferManager)
extends S3Client {
class S3StorageService(amazonS3Client: => AmazonS3,
amazonS3TransferManager: => TransferManager)
extends StorageService {
lazy val objectLister = new S3ClientObjectLister(amazonS3Client)
lazy val copier = new S3ClientCopier(amazonS3Client)
@ -25,19 +24,19 @@ class ThorpS3Client(amazonS3Client: => AmazonS3,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey)
(implicit logger: Logger): IO[CopyS3Action] =
(implicit logger: Logger): IO[StorageQueueEvent] =
copier.copy(bucket, sourceKey,hash, targetKey)
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
uploadEventListener: UploadEventListener,
tryCount: Int)
(implicit logger: Logger): IO[S3Action] =
uploader.upload(localFile, bucket, progressListener, 1)
(implicit logger: Logger): IO[StorageQueueEvent] =
uploader.upload(localFile, bucket, uploadEventListener, 1)
override def delete(bucket: Bucket,
remoteKey: RemoteKey)
(implicit logger: Logger): IO[DeleteS3Action] =
(implicit logger: Logger): IO[StorageQueueEvent] =
deleter.delete(bucket, remoteKey)
}

View file

@ -0,0 +1,16 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import net.kemitix.thorp.storage.api.StorageService
object S3StorageServiceBuilder {
def createService(amazonS3Client: AmazonS3,
amazonS3TransferManager: TransferManager): StorageService =
new S3StorageService(amazonS3Client, amazonS3TransferManager)
def defaultStorageService: StorageService =
createService(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager)
}

View file

@ -1,15 +1,14 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager}
import net.kemitix.thorp.aws.api.S3Action.{ErroredS3Action, UploadS3Action}
import net.kemitix.thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.thorp.aws.api.{S3Action, UploadProgressListener}
import net.kemitix.thorp.aws.lib.UploaderLogging.{logMultiPartUploadFinished, logMultiPartUploadStart}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.domain.StorageQueueEvent.{ErrorQueueEvent, UploadQueueEvent}
import net.kemitix.thorp.domain.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.thorp.domain.{StorageQueueEvent, _}
import net.kemitix.thorp.storage.aws.UploaderLogging.{logMultiPartUploadStart, logMultiPartUploadFinished}
import scala.util.Try
@ -17,24 +16,24 @@ class Uploader(transferManager: => AmazonTransferManager) {
def upload(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
uploadEventListener: UploadEventListener,
tryCount: Int)
(implicit logger: Logger): IO[S3Action] =
(implicit logger: Logger): IO[StorageQueueEvent] =
for {
_ <- logMultiPartUploadStart(localFile, tryCount)
upload <- transfer(localFile, bucket, uploadProgressListener)
upload <- transfer(localFile, bucket, uploadEventListener)
action = upload match {
case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag))
case Left(e) => ErroredS3Action(localFile.remoteKey, e)
case Right(r) => UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag))
case Left(e) => ErrorQueueEvent(localFile.remoteKey, e)
}
_ <- logMultiPartUploadFinished(localFile)
} yield action
private def transfer(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
uploadEventListener: UploadEventListener,
): IO[Either[Throwable, UploadResult]] = {
val listener: ProgressListener = progressListener(uploadProgressListener)
val listener: ProgressListener = progressListener(uploadEventListener)
val putObjectRequest = request(localFile, bucket, listener)
IO {
Try(transferManager.upload(putObjectRequest))
@ -51,10 +50,10 @@ class Uploader(transferManager: => AmazonTransferManager) {
.withGeneralProgressListener(listener)
}
private def progressListener(uploadProgressListener: UploadProgressListener) =
private def progressListener(uploadEventListener: UploadEventListener) =
new ProgressListener {
override def progressChanged(progressEvent: ProgressEvent): Unit = {
uploadProgressListener.listener(eventHandler(progressEvent))
uploadEventListener.listener(eventHandler(progressEvent))
}
private def eventHandler(progressEvent: ProgressEvent) = {

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import net.kemitix.thorp.domain.Logger

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.domain.MD5Hash

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import java.time.Instant
import java.time.temporal.ChronoUnit

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import java.time.Instant
import java.time.temporal.ChronoUnit
@ -12,7 +12,7 @@ import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
class ThorpS3ClientSuite
class S3StorageServiceSuite
extends FunSpec
with MockFactory {
@ -47,7 +47,7 @@ class ThorpS3ClientSuite
val amazonS3 = stub[AmazonS3]
val amazonS3TransferManager = stub[TransferManager]
val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager)
val storageService = new S3StorageService(amazonS3, amazonS3TransferManager)
val myFakeResponse = new ListObjectsV2Result()
val summaries = myFakeResponse.getObjectSummaries
@ -65,7 +65,7 @@ class ThorpS3ClientSuite
k1a -> HashModified(h1, lm),
k1b -> HashModified(h1, lm),
k2 -> HashModified(h2, lm)))
val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync
val result = storageService.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync
assertResult(expected.byHash.keys)(result.byHash.keys)
assertResult(expected.byKey.keys)(result.byKey.keys)
assertResult(expected)(result)

View file

@ -1,4 +1,4 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import java.time.Instant
@ -6,15 +6,15 @@ import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{TransferManager, Upload}
import net.kemitix.thorp.aws.api.S3Action.UploadS3Action
import net.kemitix.thorp.aws.api.{S3Client, UploadProgressListener}
import net.kemitix.thorp.aws.lib.MD5HashData.rootHash
import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher}
import net.kemitix.thorp.core.MD5HashData.rootHash
import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
class S3ClientSuite
class StorageServiceSuite
extends FunSpec
with MockFactory {
@ -42,40 +42,40 @@ class S3ClientSuite
keyotherkey.remoteKey -> HashModified(hash, lastModified),
keydiffhash.remoteKey -> HashModified(diffhash, lastModified)))
def invoke(self: S3Client, localFile: LocalFile) = {
def invoke(self: StorageService, localFile: LocalFile) = {
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
}
describe("when remote key exists") {
val s3Client = S3ClientBuilder.defaultClient
val storageService = S3StorageServiceBuilder.defaultStorageService
it("should return (Some, Set.nonEmpty)") {
assertResult(
(Some(HashModified(hash, lastModified)),
Set(
KeyModified(key, lastModified),
KeyModified(keyotherkey.remoteKey, lastModified)))
)(invoke(s3Client, localFile))
)(invoke(storageService, localFile))
}
}
describe("when remote key does not exist and no others matches hash") {
val s3Client = S3ClientBuilder.defaultClient
val storageService = S3StorageServiceBuilder.defaultStorageService
it("should return (None, Set.empty)") {
val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey)
assertResult(
(None,
Set.empty)
)(invoke(s3Client, localFile))
)(invoke(storageService, localFile))
}
}
describe("when remote key exists and no others match hash") {
val s3Client = S3ClientBuilder.defaultClient
val storageService = S3StorageServiceBuilder.defaultStorageService
it("should return (None, Set.nonEmpty)") {
assertResult(
(Some(HashModified(diffhash, lastModified)),
Set(KeyModified(keydiffhash.remoteKey, lastModified)))
)(invoke(s3Client, keydiffhash))
)(invoke(storageService, keydiffhash))
}
}
@ -86,14 +86,14 @@ class S3ClientSuite
describe("when uploading a file") {
val amazonS3 = stub[AmazonS3]
val amazonS3TransferManager = stub[TransferManager]
val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager)
val storageService = new S3StorageService(amazonS3, amazonS3TransferManager)
val prefix = RemoteKey("prefix")
val localFile =
LocalFile.resolve("root-file", rootHash, source, KeyGenerator.generateKey(source, prefix))
val bucket = Bucket("a-bucket")
val remoteKey = RemoteKey("prefix/root-file")
val progressListener = new UploadProgressListener(localFile)
val uploadEventListener = new UploadEventListener(localFile)
val upload = stub[Upload]
(amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload)
@ -105,8 +105,8 @@ class S3ClientSuite
it("should return hash of uploaded file") {
pending
//FIXME: works okay on its own, but fails when run with others
val expected = UploadS3Action(remoteKey, rootHash)
val result = s3Client.upload(localFile, bucket, progressListener, 1)
val expected = UploadQueueEvent(remoteKey, rootHash)
val result = storageService.upload(localFile, bucket, uploadEventListener, 1)
assertResult(expected)(result)
}
}

View file

@ -1,14 +1,13 @@
package net.kemitix.thorp.aws.lib
package net.kemitix.thorp.storage.aws
import java.time.Instant
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.transfer._
import net.kemitix.thorp.aws.api.S3Action.UploadS3Action
import net.kemitix.thorp.aws.api.UploadProgressListener
import net.kemitix.thorp.core.KeyGenerator.generateKey
import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain._
import net.kemitix.thorp.domain.{UploadEventListener, _}
import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
@ -33,13 +32,13 @@ class UploaderSuite
val returnedKey = RemoteKey("returned-key")
val returnedHash = MD5Hash("returned-hash")
val bigFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey)
val progressListener = new UploadProgressListener(bigFile)
val uploadEventListener = new UploadEventListener(bigFile)
val amazonS3 = mock[AmazonS3]
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
val uploader = new Uploader(amazonS3TransferManager)
it("should upload") {
val expected = UploadS3Action(returnedKey, returnedHash)
val result = uploader.upload(bigFile, config.bucket, progressListener, 1)
val expected = UploadQueueEvent(returnedKey, returnedHash)
val result = uploader.upload(bigFile, config.bucket, uploadEventListener, 1)
assertResult(expected)(result)
}
}