diff --git a/CHANGELOG.org b/CHANGELOG.org index 4659175..efb9d5d 100644 --- a/CHANGELOG.org +++ b/CHANGELOG.org @@ -10,6 +10,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog] ** Added - Add a version command-line option (#99) + - Add a batch mode (#85) * [0.6.0] - 2019-06-30 diff --git a/README.org b/README.org index 6234694..59a93db 100644 --- a/README.org +++ b/README.org @@ -16,6 +16,8 @@ hash of the file contents. thorp Usage: thorp [options] + -V, --version Display the version and quit + -B, --batch Enabled batch-mode -s, --source Source directory to sync to S3 -b, --bucket S3 bucket name -p, --prefix Prefix within the S3 Bucket @@ -30,6 +32,11 @@ If you don't provide a ~source~ the current diretory will be used. The ~--include~ and ~--exclude~ parameters can be used more than once. +** Batch mode + +Batch mode disable the ANSI console display and logs simple messages +that can be written to a file. + * Configuration Configuration will be read from these files: diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala index af4dba3..251d30f 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala @@ -8,7 +8,7 @@ object Main extends IOApp { override def run(args: List[String]): IO[ExitCode] = { val exitCaseLogger = new PrintLogger(false) ParseArgs(args) - .map(Program(_)) + .map(Program.run) .getOrElse(IO(ExitCode.Error)) .guaranteeCase { case Canceled => exitCaseLogger.warn("Interrupted") diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala b/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala index 843ca27..6de4dba 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala @@ -2,7 +2,7 @@ package net.kemitix.thorp.cli import java.nio.file.Paths -import net.kemitix.thorp.core.ConfigOption +import net.kemitix.thorp.core.{ConfigOption, ConfigOptions} import scopt.OParser object ParseArgs { @@ -14,8 +14,11 @@ object ParseArgs { programName("thorp"), head("thorp"), opt[Unit]('V', "version") - .action((_, cos) => ConfigOption.Version :: cos) - .text("Show version"), + .action((_, cos) => ConfigOption.Version :: cos) + .text("Show version"), + opt[Unit]('B', "batch") + .action((_, cos) => ConfigOption.BatchMode :: cos) + .text("Enable batch-mode"), opt[String]('s', "source") .action((str, cos) => ConfigOption.Source(Paths.get(str)) :: cos) .text("Source directory to sync to destination"), @@ -45,7 +48,8 @@ object ParseArgs { ) } - def apply(args: List[String]): Option[List[ConfigOption]] = + def apply(args: List[String]): Option[ConfigOptions] = OParser.parse(configParser, args, List()) + .map(ConfigOptions) } diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 529cfd4..f100a2d 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -9,19 +9,19 @@ import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageServi trait Program { - def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = { + def run(cliOptions: ConfigOptions): IO[ExitCode] = { implicit val logger: Logger = new PrintLogger() if (ConfigQuery.showVersion(cliOptions)) IO { println(s"Thorp v${thorp.BuildInfo.version}") ExitCode.Success - } else + } else { for { - storageService <- defaultStorageService - actions <- Synchronise(storageService, defaultHashService, cliOptions).valueOrF(handleErrors) - events <- handleActions(UnversionedMirrorArchive.default(storageService), actions) - _ <- storageService.shutdown + actions <- Synchronise.createPlan(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors) + events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService, ConfigQuery.batchMode(cliOptions)), actions) + _ <- defaultStorageService.shutdown _ <- SyncLogging.logRunFinished(events) } yield ExitCode.Success + } } private def handleErrors(implicit logger: Logger): List[String] => IO[Stream[Action]] = { @@ -34,7 +34,8 @@ trait Program { } private def handleActions(archive: ThorpArchive, - actions: Stream[Action]): IO[Stream[StorageQueueEvent]] = + actions: Stream[Action]) + (implicit l: Logger): IO[Stream[StorageQueueEvent]] = actions.foldLeft(Stream[IO[StorageQueueEvent]]()) { (stream, action) => archive.update(action) ++ stream }.sequence diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala index 74efca9..94f308e 100644 --- a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala @@ -1,7 +1,7 @@ package net.kemitix.thorp.cli import net.kemitix.thorp.core.ConfigOption.Debug -import net.kemitix.thorp.core.{ConfigOption, Resource} +import net.kemitix.thorp.core.{ConfigOptions, Resource} import org.scalatest.FunSpec import scala.util.Try @@ -26,11 +26,11 @@ class ParseArgsTest extends FunSpec { } describe("parse - debug") { - def invokeWithArgument(arg: String): List[ConfigOption] = { + def invokeWithArgument(arg: String): ConfigOptions = { val strings = List("--source", pathTo("."), "--bucket", "bucket", arg) .filter(_ != "") val maybeOptions = ParseArgs(strings) - maybeOptions.getOrElse(List()) + maybeOptions.getOrElse(ConfigOptions()) } describe("when no debug flag") { diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala index 87aea8f..ee2a99c 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala @@ -13,6 +13,9 @@ object ConfigOption { case object Version extends ConfigOption { override def update(config: Config): Config = config } + case object BatchMode extends ConfigOption { + override def update(config: Config): Config = config.copy(batchMode = true) + } case class Source(path: Path) extends ConfigOption { override def update(config: Config): Config = config.copy(source = path.toFile) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigOptions.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigOptions.scala new file mode 100644 index 0000000..a7ea8b6 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigOptions.scala @@ -0,0 +1,20 @@ +package net.kemitix.thorp.core + +import cats.Semigroup + +case class ConfigOptions(options: List[ConfigOption] = List()) + extends Semigroup[ConfigOptions] { + + override def combine(x: ConfigOptions, y: ConfigOptions): ConfigOptions = + x ++ y + + def ++(other: ConfigOptions): ConfigOptions = + ConfigOptions(options ++ other.options) + + def ::(head: ConfigOption): ConfigOptions = + ConfigOptions(head :: options) + + def contains[A1 >: ConfigOption](elem: A1): Boolean = + options contains elem + +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigQuery.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigQuery.scala index 685b05c..1ace770 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigQuery.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigQuery.scala @@ -2,23 +2,17 @@ package net.kemitix.thorp.core trait ConfigQuery { - def showVersion(configOptions: Seq[ConfigOption]): Boolean = - configOptions.exists { - case ConfigOption.Version => true - case _ => false - } + def showVersion(configOptions: ConfigOptions): Boolean = + configOptions contains ConfigOption.Version - def ignoreUserOptions(configOptions: Seq[ConfigOption]): Boolean = - configOptions.exists { - case ConfigOption.IgnoreUserOptions => true - case _ => false - } + def batchMode(configOptions: ConfigOptions): Boolean = + configOptions contains ConfigOption.BatchMode - def ignoreGlobalOptions(configOptions: Seq[ConfigOption]): Boolean = - configOptions.exists { - case ConfigOption.IgnoreGlobalOptions => true - case _ => false - } + def ignoreUserOptions(configOptions: ConfigOptions): Boolean = + configOptions contains ConfigOption.IgnoreUserOptions + + def ignoreGlobalOptions(configOptions: ConfigOptions): Boolean = + configOptions contains ConfigOption.IgnoreGlobalOptions } diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala index 11afe72..615c9ba 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala @@ -19,7 +19,7 @@ trait ConfigurationBuilder { private val defaultConfig: Config = Config(source = pwdFile) - def buildConfig(priorityOptions: Seq[ConfigOption]): IO[Either[NonEmptyChain[ConfigValidation], Config]] = { + def buildConfig(priorityOptions: ConfigOptions): IO[Either[NonEmptyChain[ConfigValidation], Config]] = { val source = findSource(priorityOptions) for { sourceOptions <- sourceOptions(source) @@ -30,30 +30,30 @@ trait ConfigurationBuilder { } yield validateConfig(config).toEither } - private def findSource(priorityOptions: Seq[ConfigOption]): File = - priorityOptions.foldRight(pwdFile)((co, f) => co match { + private def findSource(priorityOptions: ConfigOptions): File = + priorityOptions.options.foldRight(pwdFile)((co, f) => co match { case ConfigOption.Source(source) => source.toFile case _ => f }) - private def sourceOptions(source: File): IO[Seq[ConfigOption]] = + private def sourceOptions(source: File): IO[ConfigOptions] = readFile(source, ".thorp.conf") - private def userOptions(higherPriorityOptions: Seq[ConfigOption]): IO[Seq[ConfigOption]] = - if (ConfigQuery.ignoreUserOptions(higherPriorityOptions)) IO(List()) + private def userOptions(higherPriorityOptions: ConfigOptions): IO[ConfigOptions] = + if (ConfigQuery.ignoreUserOptions(higherPriorityOptions)) IO(ConfigOptions()) else readFile(userHome, ".config/thorp.conf") - private def globalOptions(higherPriorityOptions: Seq[ConfigOption]): IO[Seq[ConfigOption]] = - if (ConfigQuery.ignoreGlobalOptions(higherPriorityOptions)) IO(List()) + private def globalOptions(higherPriorityOptions: ConfigOptions): IO[ConfigOptions] = + if (ConfigQuery.ignoreGlobalOptions(higherPriorityOptions)) IO(ConfigOptions()) else parseFile(Paths.get("/etc/thorp.conf")) private def userHome = new File(System.getProperty("user.home")) - private def readFile(source: File, filename: String): IO[Seq[ConfigOption]] = + private def readFile(source: File, filename: String): IO[ConfigOptions] = parseFile(source.toPath.resolve(filename)) - private def collateOptions(configOptions: Seq[ConfigOption]): Config = - configOptions.foldRight(defaultConfig)((co, c) => co.update(c)) + private def collateOptions(configOptions: ConfigOptions): Config = + configOptions.options.foldRight(defaultConfig)((co, c) => co.update(c)) } object ConfigurationBuilder extends ConfigurationBuilder diff --git a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala index 935065a..7cc16f5 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala @@ -8,7 +8,7 @@ import scala.collection.JavaConverters._ trait ParseConfigFile { - def parseFile(filename: Path): IO[Seq[ConfigOption]] = + def parseFile(filename: Path): IO[ConfigOptions] = readFile(filename).map(ParseConfigLines.parseLines) private def readFile(filename: Path) = { diff --git a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala index 5c9cc36..fa91527 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala @@ -7,8 +7,8 @@ import net.kemitix.thorp.core.ConfigOption._ trait ParseConfigLines { - def parseLines(lines: List[String]): List[ConfigOption] = - lines.flatMap(parseLine) + def parseLines(lines: List[String]): ConfigOptions = + ConfigOptions(lines.flatMap(parseLine)) private val pattern = "^\\s*(?\\S*)\\s*=\\s*(?\\S*)\\s*$" private val format = Pattern.compile(pattern) diff --git a/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala b/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala index 8e8d494..ca4bf13 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala @@ -9,12 +9,12 @@ import net.kemitix.thorp.storage.api.{HashService, StorageService} trait Synchronise { - def apply(storageService: StorageService, + def createPlan(storageService: StorageService, hashService: HashService, - configOptions: Seq[ConfigOption]) + configOptions: ConfigOptions) (implicit l: Logger): EitherT[IO, List[String], Stream[Action]] = EitherT(ConfigurationBuilder.buildConfig(configOptions)) - .swap.map(errorMessages).swap + .leftMap(errorMessages) .flatMap(config => useValidConfig(storageService, hashService)(config, l)) def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] = diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala index 6889bcc..ed026bb 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -1,10 +1,15 @@ package net.kemitix.thorp.core import cats.effect.IO -import net.kemitix.thorp.domain.StorageQueueEvent +import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent} trait ThorpArchive { - def update(action: Action): Stream[IO[StorageQueueEvent]] + def update(action: Action)(implicit l: Logger): Stream[IO[StorageQueueEvent]] + + def fileUploaded(localFile: LocalFile, + batchMode: Boolean) + (implicit l: Logger): IO[Unit] = + if (batchMode) l.info(s"Uploaded: ${localFile.remoteKey.key}") else IO.unit } diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index cc7c8c9..3e3cbb7 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -3,16 +3,19 @@ package net.kemitix.thorp.core import cats.effect.IO import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent -import net.kemitix.thorp.domain.{StorageQueueEvent, UploadEventListener} +import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent, UploadEventListener} import net.kemitix.thorp.storage.api.StorageService -case class UnversionedMirrorArchive(storageService: StorageService) extends ThorpArchive { - override def update(action: Action): Stream[IO[StorageQueueEvent]] = +case class UnversionedMirrorArchive(storageService: StorageService, + batchMode: Boolean) extends ThorpArchive { + override def update(action: Action) + (implicit l: Logger): Stream[IO[StorageQueueEvent]] = Stream( action match { case ToUpload(bucket, localFile) => for { - event <- storageService.upload(localFile, bucket, new UploadEventListener(localFile), 1) + event <- storageService.upload(localFile, bucket, batchMode, new UploadEventListener(localFile), 1) + _ <- fileUploaded(localFile, batchMode) } yield event case ToCopy(bucket, sourceKey, hash, targetKey) => for { @@ -29,6 +32,7 @@ case class UnversionedMirrorArchive(storageService: StorageService) extends Thor } object UnversionedMirrorArchive { - def default(storageService: StorageService): ThorpArchive = - new UnversionedMirrorArchive(storageService) + def default(storageService: StorageService, + batchMode: Boolean): ThorpArchive = + new UnversionedMirrorArchive(storageService, batchMode) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala index 7a11de4..3a7f92e 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala @@ -7,7 +7,7 @@ import org.scalatest.FunSpec class ParseConfigFileTest extends FunSpec { private def invoke(filename: Path) = ParseConfigFile.parseFile(filename).unsafeRunSync - private val empty = List() + private val empty = ConfigOptions() describe("parse a missing file") { val filename = Paths.get("/path/to/missing/file") @@ -29,7 +29,9 @@ class ParseConfigFileTest extends FunSpec { } describe("parse a file with properties") { val filename = Resource(this, "simple-config").toPath - val expected = List(ConfigOption.Source(Paths.get("/path/to/source")), ConfigOption.Bucket("bucket-name")) + val expected = ConfigOptions(List( + ConfigOption.Source(Paths.get("/path/to/source")), + ConfigOption.Bucket("bucket-name"))) it("should return some options") { assertResult(expected)(invoke(filename)) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala index 525a858..1894e84 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala @@ -9,63 +9,63 @@ class ParseConfigLinesTest extends FunSpec { describe("parse single lines") { describe("source") { it("should parse") { - val expected = List(ConfigOption.Source(Paths.get("/path/to/source"))) + val expected = ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source")))) val result = ParseConfigLines.parseLines(List("source = /path/to/source")) assertResult(expected)(result) } } describe("bucket") { it("should parse") { - val expected = List(ConfigOption.Bucket("bucket-name")) + val expected = ConfigOptions(List(ConfigOption.Bucket("bucket-name"))) val result = ParseConfigLines.parseLines(List("bucket = bucket-name")) assertResult(expected)(result) } } describe("prefix") { it("should parse") { - val expected = List(ConfigOption.Prefix("prefix/to/files")) + val expected = ConfigOptions(List(ConfigOption.Prefix("prefix/to/files"))) val result = ParseConfigLines.parseLines(List("prefix = prefix/to/files")) assertResult(expected)(result) } } describe("include") { it("should parse") { - val expected = List(ConfigOption.Include("path/to/include")) + val expected = ConfigOptions(List(ConfigOption.Include("path/to/include"))) val result = ParseConfigLines.parseLines(List("include = path/to/include")) assertResult(expected)(result) } } describe("exclude") { it("should parse") { - val expected = List(ConfigOption.Exclude("path/to/exclude")) + val expected = ConfigOptions(List(ConfigOption.Exclude("path/to/exclude"))) val result = ParseConfigLines.parseLines(List("exclude = path/to/exclude")) assertResult(expected)(result) } } describe("debug - true") { it("should parse") { - val expected = List(ConfigOption.Debug()) + val expected = ConfigOptions(List(ConfigOption.Debug())) val result = ParseConfigLines.parseLines(List("debug = true")) assertResult(expected)(result) } } describe("debug - false") { it("should parse") { - val expected = List() + val expected = ConfigOptions() val result = ParseConfigLines.parseLines(List("debug = false")) assertResult(expected)(result) } } describe("comment line") { it("should be ignored") { - val expected = List() + val expected = ConfigOptions() val result = ParseConfigLines.parseLines(List("# ignore me")) assertResult(expected)(result) } } describe("unrecognised option") { it("should be ignored") { - val expected = List() + val expected = ConfigOptions() val result = ParseConfigLines.parseLines(List("unsupported = option")) assertResult(expected)(result) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index bf2ec98..c56b0eb 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -18,13 +18,13 @@ class SyncSuite private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") - private val configOptions = List( + private val configOptions = ConfigOptions(List( ConfigOption.Source(source.toPath), ConfigOption.Bucket("bucket"), ConfigOption.Prefix("prefix"), ConfigOption.IgnoreGlobalOptions, ConfigOption.IgnoreUserOptions - ) + )) implicit private val logger: Logger = new DummyLogger private val lastModified = LastModified(Instant.now) @@ -50,8 +50,8 @@ class SyncSuite def invokeSubject(storageService: StorageService, hashService: HashService, - configOptions: List[ConfigOption]): Either[List[String], Stream[Action]] = { - Synchronise(storageService, hashService, configOptions).value.unsafeRunSync + configOptions: ConfigOptions): Either[List[String], Stream[Action]] = { + Synchronise.createPlan(storageService, hashService, configOptions).value.unsafeRunSync } describe("when all files should be uploaded") { @@ -162,6 +162,7 @@ class SyncSuite override def upload(localFile: LocalFile, bucket: Bucket, + batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int): IO[UploadQueueEvent] = IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hashes("md5"))) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala index bad59ff..f326c90 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala @@ -6,4 +6,5 @@ final case class Config(bucket: Bucket = Bucket(""), prefix: RemoteKey = RemoteKey(""), filters: List[Filter] = List(), debug: Boolean = false, + batchMode: Boolean = false, source: File) diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala index a21d830..d4f9595 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala @@ -13,6 +13,7 @@ trait StorageService { def upload(localFile: LocalFile, bucket: Bucket, + batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int): IO[StorageQueueEvent] diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala similarity index 95% rename from storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala index 008b9aa..0d64b5d 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientCopier.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Copier.scala @@ -6,7 +6,7 @@ import com.amazonaws.services.s3.model.CopyObjectRequest import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent import net.kemitix.thorp.domain._ -class S3ClientCopier(amazonS3: AmazonS3) { +class Copier(amazonS3: AmazonS3) { def copy(bucket: Bucket, sourceKey: RemoteKey, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala similarity index 93% rename from storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala index fd26f43..7faeaeb 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientDeleter.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Deleter.scala @@ -6,7 +6,7 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent import net.kemitix.thorp.domain.{Bucket, RemoteKey} -class S3ClientDeleter(amazonS3: AmazonS3) { +class Deleter(amazonS3: AmazonS3) { def delete(bucket: Bucket, remoteKey: RemoteKey): IO[DeleteQueueEvent] = diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala similarity index 97% rename from storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala rename to storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala index 3556079..772e887 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala @@ -12,7 +12,7 @@ import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey import scala.collection.JavaConverters._ import scala.util.Try -class S3ClientObjectLister(amazonS3: AmazonS3) { +class Lister(amazonS3: AmazonS3) { def listObjects(bucket: Bucket, prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = { diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala index 187e79e..887ae90 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala @@ -12,10 +12,10 @@ class S3StorageService(amazonS3Client: => AmazonS3, amazonS3TransferManager: => TransferManager) extends StorageService { - lazy val objectLister = new S3ClientObjectLister(amazonS3Client) - lazy val copier = new S3ClientCopier(amazonS3Client) + lazy val objectLister = new Lister(amazonS3Client) + lazy val copier = new Copier(amazonS3Client) lazy val uploader = new Uploader(amazonS3TransferManager) - lazy val deleter = new S3ClientDeleter(amazonS3Client) + lazy val deleter = new Deleter(amazonS3Client) override def listObjects(bucket: Bucket, prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = @@ -29,9 +29,10 @@ class S3StorageService(amazonS3Client: => AmazonS3, override def upload(localFile: LocalFile, bucket: Bucket, + batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int): IO[StorageQueueEvent] = - uploader.upload(localFile, bucket, uploadEventListener, 1) + uploader.upload(localFile, bucket, batchMode, uploadEventListener, 1) override def delete(bucket: Bucket, remoteKey: RemoteKey): IO[StorageQueueEvent] = diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala index 064c942..3ea4dee 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala @@ -1,6 +1,5 @@ package net.kemitix.thorp.storage.aws -import cats.effect.IO import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder} import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} import net.kemitix.thorp.storage.api.StorageService @@ -11,11 +10,9 @@ object S3StorageServiceBuilder { amazonS3TransferManager: TransferManager): StorageService = new S3StorageService(amazonS3Client, amazonS3TransferManager) - def defaultStorageService: IO[StorageService] = - IO { - createService( - AmazonS3ClientBuilder.defaultClient, - TransferManagerBuilder.defaultTransferManager) - } + lazy val defaultStorageService: StorageService = + createService( + AmazonS3ClientBuilder.defaultClient, + TransferManagerBuilder.defaultTransferManager) } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index c81aaf2..cf88bd3 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -15,10 +15,11 @@ class Uploader(transferManager: => AmazonTransferManager) { def upload(localFile: LocalFile, bucket: Bucket, + batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int): IO[StorageQueueEvent] = for { - upload <- transfer(localFile, bucket, uploadEventListener) + upload <- transfer(localFile, bucket, batchMode, uploadEventListener) action = upload match { case Right(r) => UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag)) case Left(e) => ErrorQueueEvent(localFile.remoteKey, e) @@ -27,10 +28,11 @@ class Uploader(transferManager: => AmazonTransferManager) { private def transfer(localFile: LocalFile, bucket: Bucket, + batchMode: Boolean, uploadEventListener: UploadEventListener, ): IO[Either[Throwable, UploadResult]] = { val listener: ProgressListener = progressListener(uploadEventListener) - val putObjectRequest = request(localFile, bucket, listener) + val putObjectRequest = request(localFile, bucket, batchMode, listener) IO { Try(transferManager.upload(putObjectRequest)) .map(_.waitForUploadResult) @@ -38,12 +40,16 @@ class Uploader(transferManager: => AmazonTransferManager) { } } - private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = { + private def request(localFile: LocalFile, + bucket: Bucket, + batchMode: Boolean, + listener: ProgressListener): PutObjectRequest = { val metadata = new ObjectMetadata() localFile.md5base64.foreach(metadata.setContentMD5) - new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) + val request = new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) .withMetadata(metadata) - .withGeneralProgressListener(listener) + if (batchMode) request + else request.withGeneralProgressListener(listener) } private def progressListener(uploadEventListener: UploadEventListener) = diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index 04ecdff..5b53f3a 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -102,6 +102,8 @@ class StorageServiceSuite Map("md5" -> hash) } + val batchMode: Boolean = true + describe("upload") { describe("when uploading a file") { @@ -127,7 +129,7 @@ class StorageServiceSuite pending //FIXME: works okay on its own, but fails when run with others val expected = UploadQueueEvent(remoteKey, Root.hash) - val result = storageService.upload(localFile, bucket, uploadEventListener, 1) + val result = storageService.upload(localFile, bucket, batchMode, uploadEventListener, 1) assertResult(expected)(result) } } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala index 2d4c6fd..5265187 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala @@ -27,6 +27,8 @@ class UploaderSuite "md5" -> hash ) + val batchMode: Boolean = true + describe("S3ClientMultiPartTransferManagerSuite") { describe("upload") { pending @@ -43,7 +45,7 @@ class UploaderSuite val uploader = new Uploader(amazonS3TransferManager) it("should upload") { val expected = UploadQueueEvent(returnedKey, returnedHash) - val result = uploader.upload(bigFile, config.bucket, uploadEventListener, 1) + val result = uploader.upload(bigFile, config.bucket, batchMode, uploadEventListener, 1) assertResult(expected)(result) } }