Add a batch mode that provides a simple log output (#85)

* [changelog] Updated

* [readme] Updated

* [domain] Config Add batch-mode flag

* [core] ConfigOption Add BatchMode option

* [core] ConfigQuery Add batchMode query

Also replaced verbose exists case clauses with a simple contains.

* [core] ConfigOptions added to replace Seq[ConfigOption]

* [core] Syncronise rename method to createPlan

* [cli] Program rename apply as run

* [storage-aws] S3StorageServiceBuilder stop using IO to create object

* [storage-aws] S3StorageServiceBuilder make default service lazy

* [storage-aws] Rename S3ClientCopier => Copier

* [storage-aws] Rename S3ClientDeleter => Deleter

* [storage-aws] Rename S3ClientObjectLister => Lister

* [storage-aws] Only attach upload listener when in batch mode

Only detects batch mode when selected as a command line option

* [core] Synchronise use leftMap rather than swap.map.swap

* [cli] ParseArgs add `-B` and `--batch` options to enable batch mode

* [core] ThorpArchive logs file uploaded when in batch mode
This commit is contained in:
Paul Campbell 2019-07-02 08:43:52 +01:00 committed by GitHub
parent 1440990d79
commit 1267b6e313
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 143 additions and 91 deletions

View file

@ -10,6 +10,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
** Added ** Added
- Add a version command-line option (#99) - Add a version command-line option (#99)
- Add a batch mode (#85)
* [0.6.0] - 2019-06-30 * [0.6.0] - 2019-06-30

View file

@ -16,6 +16,8 @@ hash of the file contents.
thorp thorp
Usage: thorp [options] Usage: thorp [options]
-V, --version Display the version and quit
-B, --batch Enabled batch-mode
-s, --source <value> Source directory to sync to S3 -s, --source <value> Source directory to sync to S3
-b, --bucket <value> S3 bucket name -b, --bucket <value> S3 bucket name
-p, --prefix <value> Prefix within the S3 Bucket -p, --prefix <value> Prefix within the S3 Bucket
@ -30,6 +32,11 @@ If you don't provide a ~source~ the current diretory will be used.
The ~--include~ and ~--exclude~ parameters can be used more than once. The ~--include~ and ~--exclude~ parameters can be used more than once.
** Batch mode
Batch mode disable the ANSI console display and logs simple messages
that can be written to a file.
* Configuration * Configuration
Configuration will be read from these files: Configuration will be read from these files:

View file

@ -8,7 +8,7 @@ object Main extends IOApp {
override def run(args: List[String]): IO[ExitCode] = { override def run(args: List[String]): IO[ExitCode] = {
val exitCaseLogger = new PrintLogger(false) val exitCaseLogger = new PrintLogger(false)
ParseArgs(args) ParseArgs(args)
.map(Program(_)) .map(Program.run)
.getOrElse(IO(ExitCode.Error)) .getOrElse(IO(ExitCode.Error))
.guaranteeCase { .guaranteeCase {
case Canceled => exitCaseLogger.warn("Interrupted") case Canceled => exitCaseLogger.warn("Interrupted")

View file

@ -2,7 +2,7 @@ package net.kemitix.thorp.cli
import java.nio.file.Paths import java.nio.file.Paths
import net.kemitix.thorp.core.ConfigOption import net.kemitix.thorp.core.{ConfigOption, ConfigOptions}
import scopt.OParser import scopt.OParser
object ParseArgs { object ParseArgs {
@ -16,6 +16,9 @@ object ParseArgs {
opt[Unit]('V', "version") opt[Unit]('V', "version")
.action((_, cos) => ConfigOption.Version :: cos) .action((_, cos) => ConfigOption.Version :: cos)
.text("Show version"), .text("Show version"),
opt[Unit]('B', "batch")
.action((_, cos) => ConfigOption.BatchMode :: cos)
.text("Enable batch-mode"),
opt[String]('s', "source") opt[String]('s', "source")
.action((str, cos) => ConfigOption.Source(Paths.get(str)) :: cos) .action((str, cos) => ConfigOption.Source(Paths.get(str)) :: cos)
.text("Source directory to sync to destination"), .text("Source directory to sync to destination"),
@ -45,7 +48,8 @@ object ParseArgs {
) )
} }
def apply(args: List[String]): Option[List[ConfigOption]] = def apply(args: List[String]): Option[ConfigOptions] =
OParser.parse(configParser, args, List()) OParser.parse(configParser, args, List())
.map(ConfigOptions)
} }

View file

@ -9,20 +9,20 @@ import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageServi
trait Program { trait Program {
def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = { def run(cliOptions: ConfigOptions): IO[ExitCode] = {
implicit val logger: Logger = new PrintLogger() implicit val logger: Logger = new PrintLogger()
if (ConfigQuery.showVersion(cliOptions)) IO { if (ConfigQuery.showVersion(cliOptions)) IO {
println(s"Thorp v${thorp.BuildInfo.version}") println(s"Thorp v${thorp.BuildInfo.version}")
ExitCode.Success ExitCode.Success
} else } else {
for { for {
storageService <- defaultStorageService actions <- Synchronise.createPlan(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors)
actions <- Synchronise(storageService, defaultHashService, cliOptions).valueOrF(handleErrors) events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService, ConfigQuery.batchMode(cliOptions)), actions)
events <- handleActions(UnversionedMirrorArchive.default(storageService), actions) _ <- defaultStorageService.shutdown
_ <- storageService.shutdown
_ <- SyncLogging.logRunFinished(events) _ <- SyncLogging.logRunFinished(events)
} yield ExitCode.Success } yield ExitCode.Success
} }
}
private def handleErrors(implicit logger: Logger): List[String] => IO[Stream[Action]] = { private def handleErrors(implicit logger: Logger): List[String] => IO[Stream[Action]] = {
errors => { errors => {
@ -34,7 +34,8 @@ trait Program {
} }
private def handleActions(archive: ThorpArchive, private def handleActions(archive: ThorpArchive,
actions: Stream[Action]): IO[Stream[StorageQueueEvent]] = actions: Stream[Action])
(implicit l: Logger): IO[Stream[StorageQueueEvent]] =
actions.foldLeft(Stream[IO[StorageQueueEvent]]()) { actions.foldLeft(Stream[IO[StorageQueueEvent]]()) {
(stream, action) => archive.update(action) ++ stream (stream, action) => archive.update(action) ++ stream
}.sequence }.sequence

View file

@ -1,7 +1,7 @@
package net.kemitix.thorp.cli package net.kemitix.thorp.cli
import net.kemitix.thorp.core.ConfigOption.Debug import net.kemitix.thorp.core.ConfigOption.Debug
import net.kemitix.thorp.core.{ConfigOption, Resource} import net.kemitix.thorp.core.{ConfigOptions, Resource}
import org.scalatest.FunSpec import org.scalatest.FunSpec
import scala.util.Try import scala.util.Try
@ -26,11 +26,11 @@ class ParseArgsTest extends FunSpec {
} }
describe("parse - debug") { describe("parse - debug") {
def invokeWithArgument(arg: String): List[ConfigOption] = { def invokeWithArgument(arg: String): ConfigOptions = {
val strings = List("--source", pathTo("."), "--bucket", "bucket", arg) val strings = List("--source", pathTo("."), "--bucket", "bucket", arg)
.filter(_ != "") .filter(_ != "")
val maybeOptions = ParseArgs(strings) val maybeOptions = ParseArgs(strings)
maybeOptions.getOrElse(List()) maybeOptions.getOrElse(ConfigOptions())
} }
describe("when no debug flag") { describe("when no debug flag") {

View file

@ -13,6 +13,9 @@ object ConfigOption {
case object Version extends ConfigOption { case object Version extends ConfigOption {
override def update(config: Config): Config = config override def update(config: Config): Config = config
} }
case object BatchMode extends ConfigOption {
override def update(config: Config): Config = config.copy(batchMode = true)
}
case class Source(path: Path) extends ConfigOption { case class Source(path: Path) extends ConfigOption {
override def update(config: Config): Config = config.copy(source = path.toFile) override def update(config: Config): Config = config.copy(source = path.toFile)
} }

View file

@ -0,0 +1,20 @@
package net.kemitix.thorp.core
import cats.Semigroup
case class ConfigOptions(options: List[ConfigOption] = List())
extends Semigroup[ConfigOptions] {
override def combine(x: ConfigOptions, y: ConfigOptions): ConfigOptions =
x ++ y
def ++(other: ConfigOptions): ConfigOptions =
ConfigOptions(options ++ other.options)
def ::(head: ConfigOption): ConfigOptions =
ConfigOptions(head :: options)
def contains[A1 >: ConfigOption](elem: A1): Boolean =
options contains elem
}

View file

@ -2,23 +2,17 @@ package net.kemitix.thorp.core
trait ConfigQuery { trait ConfigQuery {
def showVersion(configOptions: Seq[ConfigOption]): Boolean = def showVersion(configOptions: ConfigOptions): Boolean =
configOptions.exists { configOptions contains ConfigOption.Version
case ConfigOption.Version => true
case _ => false
}
def ignoreUserOptions(configOptions: Seq[ConfigOption]): Boolean = def batchMode(configOptions: ConfigOptions): Boolean =
configOptions.exists { configOptions contains ConfigOption.BatchMode
case ConfigOption.IgnoreUserOptions => true
case _ => false
}
def ignoreGlobalOptions(configOptions: Seq[ConfigOption]): Boolean = def ignoreUserOptions(configOptions: ConfigOptions): Boolean =
configOptions.exists { configOptions contains ConfigOption.IgnoreUserOptions
case ConfigOption.IgnoreGlobalOptions => true
case _ => false def ignoreGlobalOptions(configOptions: ConfigOptions): Boolean =
} configOptions contains ConfigOption.IgnoreGlobalOptions
} }

View file

@ -19,7 +19,7 @@ trait ConfigurationBuilder {
private val defaultConfig: Config = Config(source = pwdFile) private val defaultConfig: Config = Config(source = pwdFile)
def buildConfig(priorityOptions: Seq[ConfigOption]): IO[Either[NonEmptyChain[ConfigValidation], Config]] = { def buildConfig(priorityOptions: ConfigOptions): IO[Either[NonEmptyChain[ConfigValidation], Config]] = {
val source = findSource(priorityOptions) val source = findSource(priorityOptions)
for { for {
sourceOptions <- sourceOptions(source) sourceOptions <- sourceOptions(source)
@ -30,30 +30,30 @@ trait ConfigurationBuilder {
} yield validateConfig(config).toEither } yield validateConfig(config).toEither
} }
private def findSource(priorityOptions: Seq[ConfigOption]): File = private def findSource(priorityOptions: ConfigOptions): File =
priorityOptions.foldRight(pwdFile)((co, f) => co match { priorityOptions.options.foldRight(pwdFile)((co, f) => co match {
case ConfigOption.Source(source) => source.toFile case ConfigOption.Source(source) => source.toFile
case _ => f case _ => f
}) })
private def sourceOptions(source: File): IO[Seq[ConfigOption]] = private def sourceOptions(source: File): IO[ConfigOptions] =
readFile(source, ".thorp.conf") readFile(source, ".thorp.conf")
private def userOptions(higherPriorityOptions: Seq[ConfigOption]): IO[Seq[ConfigOption]] = private def userOptions(higherPriorityOptions: ConfigOptions): IO[ConfigOptions] =
if (ConfigQuery.ignoreUserOptions(higherPriorityOptions)) IO(List()) if (ConfigQuery.ignoreUserOptions(higherPriorityOptions)) IO(ConfigOptions())
else readFile(userHome, ".config/thorp.conf") else readFile(userHome, ".config/thorp.conf")
private def globalOptions(higherPriorityOptions: Seq[ConfigOption]): IO[Seq[ConfigOption]] = private def globalOptions(higherPriorityOptions: ConfigOptions): IO[ConfigOptions] =
if (ConfigQuery.ignoreGlobalOptions(higherPriorityOptions)) IO(List()) if (ConfigQuery.ignoreGlobalOptions(higherPriorityOptions)) IO(ConfigOptions())
else parseFile(Paths.get("/etc/thorp.conf")) else parseFile(Paths.get("/etc/thorp.conf"))
private def userHome = new File(System.getProperty("user.home")) private def userHome = new File(System.getProperty("user.home"))
private def readFile(source: File, filename: String): IO[Seq[ConfigOption]] = private def readFile(source: File, filename: String): IO[ConfigOptions] =
parseFile(source.toPath.resolve(filename)) parseFile(source.toPath.resolve(filename))
private def collateOptions(configOptions: Seq[ConfigOption]): Config = private def collateOptions(configOptions: ConfigOptions): Config =
configOptions.foldRight(defaultConfig)((co, c) => co.update(c)) configOptions.options.foldRight(defaultConfig)((co, c) => co.update(c))
} }
object ConfigurationBuilder extends ConfigurationBuilder object ConfigurationBuilder extends ConfigurationBuilder

View file

@ -8,7 +8,7 @@ import scala.collection.JavaConverters._
trait ParseConfigFile { trait ParseConfigFile {
def parseFile(filename: Path): IO[Seq[ConfigOption]] = def parseFile(filename: Path): IO[ConfigOptions] =
readFile(filename).map(ParseConfigLines.parseLines) readFile(filename).map(ParseConfigLines.parseLines)
private def readFile(filename: Path) = { private def readFile(filename: Path) = {

View file

@ -7,8 +7,8 @@ import net.kemitix.thorp.core.ConfigOption._
trait ParseConfigLines { trait ParseConfigLines {
def parseLines(lines: List[String]): List[ConfigOption] = def parseLines(lines: List[String]): ConfigOptions =
lines.flatMap(parseLine) ConfigOptions(lines.flatMap(parseLine))
private val pattern = "^\\s*(?<key>\\S*)\\s*=\\s*(?<value>\\S*)\\s*$" private val pattern = "^\\s*(?<key>\\S*)\\s*=\\s*(?<value>\\S*)\\s*$"
private val format = Pattern.compile(pattern) private val format = Pattern.compile(pattern)

View file

@ -9,12 +9,12 @@ import net.kemitix.thorp.storage.api.{HashService, StorageService}
trait Synchronise { trait Synchronise {
def apply(storageService: StorageService, def createPlan(storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: Seq[ConfigOption]) configOptions: ConfigOptions)
(implicit l: Logger): EitherT[IO, List[String], Stream[Action]] = (implicit l: Logger): EitherT[IO, List[String], Stream[Action]] =
EitherT(ConfigurationBuilder.buildConfig(configOptions)) EitherT(ConfigurationBuilder.buildConfig(configOptions))
.swap.map(errorMessages).swap .leftMap(errorMessages)
.flatMap(config => useValidConfig(storageService, hashService)(config, l)) .flatMap(config => useValidConfig(storageService, hashService)(config, l))
def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] = def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] =

View file

@ -1,10 +1,15 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import cats.effect.IO import cats.effect.IO
import net.kemitix.thorp.domain.StorageQueueEvent import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent}
trait ThorpArchive { trait ThorpArchive {
def update(action: Action): Stream[IO[StorageQueueEvent]] def update(action: Action)(implicit l: Logger): Stream[IO[StorageQueueEvent]]
def fileUploaded(localFile: LocalFile,
batchMode: Boolean)
(implicit l: Logger): IO[Unit] =
if (batchMode) l.info(s"Uploaded: ${localFile.remoteKey.key}") else IO.unit
} }

View file

@ -3,16 +3,19 @@ package net.kemitix.thorp.core
import cats.effect.IO import cats.effect.IO
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain.{StorageQueueEvent, UploadEventListener} import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent, UploadEventListener}
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.StorageService
case class UnversionedMirrorArchive(storageService: StorageService) extends ThorpArchive { case class UnversionedMirrorArchive(storageService: StorageService,
override def update(action: Action): Stream[IO[StorageQueueEvent]] = batchMode: Boolean) extends ThorpArchive {
override def update(action: Action)
(implicit l: Logger): Stream[IO[StorageQueueEvent]] =
Stream( Stream(
action match { action match {
case ToUpload(bucket, localFile) => case ToUpload(bucket, localFile) =>
for { for {
event <- storageService.upload(localFile, bucket, new UploadEventListener(localFile), 1) event <- storageService.upload(localFile, bucket, batchMode, new UploadEventListener(localFile), 1)
_ <- fileUploaded(localFile, batchMode)
} yield event } yield event
case ToCopy(bucket, sourceKey, hash, targetKey) => case ToCopy(bucket, sourceKey, hash, targetKey) =>
for { for {
@ -29,6 +32,7 @@ case class UnversionedMirrorArchive(storageService: StorageService) extends Thor
} }
object UnversionedMirrorArchive { object UnversionedMirrorArchive {
def default(storageService: StorageService): ThorpArchive = def default(storageService: StorageService,
new UnversionedMirrorArchive(storageService) batchMode: Boolean): ThorpArchive =
new UnversionedMirrorArchive(storageService, batchMode)
} }

View file

@ -7,7 +7,7 @@ import org.scalatest.FunSpec
class ParseConfigFileTest extends FunSpec { class ParseConfigFileTest extends FunSpec {
private def invoke(filename: Path) = ParseConfigFile.parseFile(filename).unsafeRunSync private def invoke(filename: Path) = ParseConfigFile.parseFile(filename).unsafeRunSync
private val empty = List() private val empty = ConfigOptions()
describe("parse a missing file") { describe("parse a missing file") {
val filename = Paths.get("/path/to/missing/file") val filename = Paths.get("/path/to/missing/file")
@ -29,7 +29,9 @@ class ParseConfigFileTest extends FunSpec {
} }
describe("parse a file with properties") { describe("parse a file with properties") {
val filename = Resource(this, "simple-config").toPath val filename = Resource(this, "simple-config").toPath
val expected = List(ConfigOption.Source(Paths.get("/path/to/source")), ConfigOption.Bucket("bucket-name")) val expected = ConfigOptions(List(
ConfigOption.Source(Paths.get("/path/to/source")),
ConfigOption.Bucket("bucket-name")))
it("should return some options") { it("should return some options") {
assertResult(expected)(invoke(filename)) assertResult(expected)(invoke(filename))
} }

View file

@ -9,63 +9,63 @@ class ParseConfigLinesTest extends FunSpec {
describe("parse single lines") { describe("parse single lines") {
describe("source") { describe("source") {
it("should parse") { it("should parse") {
val expected = List(ConfigOption.Source(Paths.get("/path/to/source"))) val expected = ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source"))))
val result = ParseConfigLines.parseLines(List("source = /path/to/source")) val result = ParseConfigLines.parseLines(List("source = /path/to/source"))
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
describe("bucket") { describe("bucket") {
it("should parse") { it("should parse") {
val expected = List(ConfigOption.Bucket("bucket-name")) val expected = ConfigOptions(List(ConfigOption.Bucket("bucket-name")))
val result = ParseConfigLines.parseLines(List("bucket = bucket-name")) val result = ParseConfigLines.parseLines(List("bucket = bucket-name"))
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
describe("prefix") { describe("prefix") {
it("should parse") { it("should parse") {
val expected = List(ConfigOption.Prefix("prefix/to/files")) val expected = ConfigOptions(List(ConfigOption.Prefix("prefix/to/files")))
val result = ParseConfigLines.parseLines(List("prefix = prefix/to/files")) val result = ParseConfigLines.parseLines(List("prefix = prefix/to/files"))
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
describe("include") { describe("include") {
it("should parse") { it("should parse") {
val expected = List(ConfigOption.Include("path/to/include")) val expected = ConfigOptions(List(ConfigOption.Include("path/to/include")))
val result = ParseConfigLines.parseLines(List("include = path/to/include")) val result = ParseConfigLines.parseLines(List("include = path/to/include"))
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
describe("exclude") { describe("exclude") {
it("should parse") { it("should parse") {
val expected = List(ConfigOption.Exclude("path/to/exclude")) val expected = ConfigOptions(List(ConfigOption.Exclude("path/to/exclude")))
val result = ParseConfigLines.parseLines(List("exclude = path/to/exclude")) val result = ParseConfigLines.parseLines(List("exclude = path/to/exclude"))
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
describe("debug - true") { describe("debug - true") {
it("should parse") { it("should parse") {
val expected = List(ConfigOption.Debug()) val expected = ConfigOptions(List(ConfigOption.Debug()))
val result = ParseConfigLines.parseLines(List("debug = true")) val result = ParseConfigLines.parseLines(List("debug = true"))
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
describe("debug - false") { describe("debug - false") {
it("should parse") { it("should parse") {
val expected = List() val expected = ConfigOptions()
val result = ParseConfigLines.parseLines(List("debug = false")) val result = ParseConfigLines.parseLines(List("debug = false"))
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
describe("comment line") { describe("comment line") {
it("should be ignored") { it("should be ignored") {
val expected = List() val expected = ConfigOptions()
val result = ParseConfigLines.parseLines(List("# ignore me")) val result = ParseConfigLines.parseLines(List("# ignore me"))
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
describe("unrecognised option") { describe("unrecognised option") {
it("should be ignored") { it("should be ignored") {
val expected = List() val expected = ConfigOptions()
val result = ParseConfigLines.parseLines(List("unsupported = option")) val result = ParseConfigLines.parseLines(List("unsupported = option"))
assertResult(expected)(result) assertResult(expected)(result)
} }

View file

@ -18,13 +18,13 @@ class SyncSuite
private val source = Resource(this, "upload") private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
private val configOptions = List( private val configOptions = ConfigOptions(List(
ConfigOption.Source(source.toPath), ConfigOption.Source(source.toPath),
ConfigOption.Bucket("bucket"), ConfigOption.Bucket("bucket"),
ConfigOption.Prefix("prefix"), ConfigOption.Prefix("prefix"),
ConfigOption.IgnoreGlobalOptions, ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions ConfigOption.IgnoreUserOptions
) ))
implicit private val logger: Logger = new DummyLogger implicit private val logger: Logger = new DummyLogger
private val lastModified = LastModified(Instant.now) private val lastModified = LastModified(Instant.now)
@ -50,8 +50,8 @@ class SyncSuite
def invokeSubject(storageService: StorageService, def invokeSubject(storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: List[ConfigOption]): Either[List[String], Stream[Action]] = { configOptions: ConfigOptions): Either[List[String], Stream[Action]] = {
Synchronise(storageService, hashService, configOptions).value.unsafeRunSync Synchronise.createPlan(storageService, hashService, configOptions).value.unsafeRunSync
} }
describe("when all files should be uploaded") { describe("when all files should be uploaded") {
@ -162,6 +162,7 @@ class SyncSuite
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): IO[UploadQueueEvent] = tryCount: Int): IO[UploadQueueEvent] =
IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hashes("md5"))) IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hashes("md5")))

View file

@ -6,4 +6,5 @@ final case class Config(bucket: Bucket = Bucket(""),
prefix: RemoteKey = RemoteKey(""), prefix: RemoteKey = RemoteKey(""),
filters: List[Filter] = List(), filters: List[Filter] = List(),
debug: Boolean = false, debug: Boolean = false,
batchMode: Boolean = false,
source: File) source: File)

View file

@ -13,6 +13,7 @@ trait StorageService {
def upload(localFile: LocalFile, def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): IO[StorageQueueEvent] tryCount: Int): IO[StorageQueueEvent]

View file

@ -6,7 +6,7 @@ import com.amazonaws.services.s3.model.CopyObjectRequest
import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
class S3ClientCopier(amazonS3: AmazonS3) { class Copier(amazonS3: AmazonS3) {
def copy(bucket: Bucket, def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,

View file

@ -6,7 +6,7 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest
import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent
import net.kemitix.thorp.domain.{Bucket, RemoteKey} import net.kemitix.thorp.domain.{Bucket, RemoteKey}
class S3ClientDeleter(amazonS3: AmazonS3) { class Deleter(amazonS3: AmazonS3) {
def delete(bucket: Bucket, def delete(bucket: Bucket,
remoteKey: RemoteKey): IO[DeleteQueueEvent] = remoteKey: RemoteKey): IO[DeleteQueueEvent] =

View file

@ -12,7 +12,7 @@ import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.Try import scala.util.Try
class S3ClientObjectLister(amazonS3: AmazonS3) { class Lister(amazonS3: AmazonS3) {
def listObjects(bucket: Bucket, def listObjects(bucket: Bucket,
prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = { prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = {

View file

@ -12,10 +12,10 @@ class S3StorageService(amazonS3Client: => AmazonS3,
amazonS3TransferManager: => TransferManager) amazonS3TransferManager: => TransferManager)
extends StorageService { extends StorageService {
lazy val objectLister = new S3ClientObjectLister(amazonS3Client) lazy val objectLister = new Lister(amazonS3Client)
lazy val copier = new S3ClientCopier(amazonS3Client) lazy val copier = new Copier(amazonS3Client)
lazy val uploader = new Uploader(amazonS3TransferManager) lazy val uploader = new Uploader(amazonS3TransferManager)
lazy val deleter = new S3ClientDeleter(amazonS3Client) lazy val deleter = new Deleter(amazonS3Client)
override def listObjects(bucket: Bucket, override def listObjects(bucket: Bucket,
prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] =
@ -29,9 +29,10 @@ class S3StorageService(amazonS3Client: => AmazonS3,
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): IO[StorageQueueEvent] = tryCount: Int): IO[StorageQueueEvent] =
uploader.upload(localFile, bucket, uploadEventListener, 1) uploader.upload(localFile, bucket, batchMode, uploadEventListener, 1)
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey): IO[StorageQueueEvent] = remoteKey: RemoteKey): IO[StorageQueueEvent] =

View file

@ -1,6 +1,5 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import cats.effect.IO
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder} import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.StorageService
@ -11,11 +10,9 @@ object S3StorageServiceBuilder {
amazonS3TransferManager: TransferManager): StorageService = amazonS3TransferManager: TransferManager): StorageService =
new S3StorageService(amazonS3Client, amazonS3TransferManager) new S3StorageService(amazonS3Client, amazonS3TransferManager)
def defaultStorageService: IO[StorageService] = lazy val defaultStorageService: StorageService =
IO {
createService( createService(
AmazonS3ClientBuilder.defaultClient, AmazonS3ClientBuilder.defaultClient,
TransferManagerBuilder.defaultTransferManager) TransferManagerBuilder.defaultTransferManager)
}
} }

View file

@ -15,10 +15,11 @@ class Uploader(transferManager: => AmazonTransferManager) {
def upload(localFile: LocalFile, def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): IO[StorageQueueEvent] = tryCount: Int): IO[StorageQueueEvent] =
for { for {
upload <- transfer(localFile, bucket, uploadEventListener) upload <- transfer(localFile, bucket, batchMode, uploadEventListener)
action = upload match { action = upload match {
case Right(r) => UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag)) case Right(r) => UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag))
case Left(e) => ErrorQueueEvent(localFile.remoteKey, e) case Left(e) => ErrorQueueEvent(localFile.remoteKey, e)
@ -27,10 +28,11 @@ class Uploader(transferManager: => AmazonTransferManager) {
private def transfer(localFile: LocalFile, private def transfer(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
): IO[Either[Throwable, UploadResult]] = { ): IO[Either[Throwable, UploadResult]] = {
val listener: ProgressListener = progressListener(uploadEventListener) val listener: ProgressListener = progressListener(uploadEventListener)
val putObjectRequest = request(localFile, bucket, listener) val putObjectRequest = request(localFile, bucket, batchMode, listener)
IO { IO {
Try(transferManager.upload(putObjectRequest)) Try(transferManager.upload(putObjectRequest))
.map(_.waitForUploadResult) .map(_.waitForUploadResult)
@ -38,12 +40,16 @@ class Uploader(transferManager: => AmazonTransferManager) {
} }
} }
private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = { private def request(localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
listener: ProgressListener): PutObjectRequest = {
val metadata = new ObjectMetadata() val metadata = new ObjectMetadata()
localFile.md5base64.foreach(metadata.setContentMD5) localFile.md5base64.foreach(metadata.setContentMD5)
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) val request = new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withMetadata(metadata) .withMetadata(metadata)
.withGeneralProgressListener(listener) if (batchMode) request
else request.withGeneralProgressListener(listener)
} }
private def progressListener(uploadEventListener: UploadEventListener) = private def progressListener(uploadEventListener: UploadEventListener) =

View file

@ -102,6 +102,8 @@ class StorageServiceSuite
Map("md5" -> hash) Map("md5" -> hash)
} }
val batchMode: Boolean = true
describe("upload") { describe("upload") {
describe("when uploading a file") { describe("when uploading a file") {
@ -127,7 +129,7 @@ class StorageServiceSuite
pending pending
//FIXME: works okay on its own, but fails when run with others //FIXME: works okay on its own, but fails when run with others
val expected = UploadQueueEvent(remoteKey, Root.hash) val expected = UploadQueueEvent(remoteKey, Root.hash)
val result = storageService.upload(localFile, bucket, uploadEventListener, 1) val result = storageService.upload(localFile, bucket, batchMode, uploadEventListener, 1)
assertResult(expected)(result) assertResult(expected)(result)
} }
} }

View file

@ -27,6 +27,8 @@ class UploaderSuite
"md5" -> hash "md5" -> hash
) )
val batchMode: Boolean = true
describe("S3ClientMultiPartTransferManagerSuite") { describe("S3ClientMultiPartTransferManagerSuite") {
describe("upload") { describe("upload") {
pending pending
@ -43,7 +45,7 @@ class UploaderSuite
val uploader = new Uploader(amazonS3TransferManager) val uploader = new Uploader(amazonS3TransferManager)
it("should upload") { it("should upload") {
val expected = UploadQueueEvent(returnedKey, returnedHash) val expected = UploadQueueEvent(returnedKey, returnedHash)
val result = uploader.upload(bigFile, config.bucket, uploadEventListener, 1) val result = uploader.upload(bigFile, config.bucket, batchMode, uploadEventListener, 1)
assertResult(expected)(result) assertResult(expected)(result)
} }
} }