Display simple error message when bucket is invalid (#81)
* [core,storage-{api,aws}] Allow Lister errors to be reported to user * [cli,core,storage-*] Display simple error message when bucket is invalid * [core] optimise imports * [storate-aws] optimise imports * [core] SyncSuite don't use get on an optional type
This commit is contained in:
11 changed files with 74 additions and 63 deletions
@ -10,17 +10,19 @@ trait Program {
def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = {
implicit val logger: Logger = new PrintLogger()
Synchronise(defaultStorageService, cliOptions).flatMap {
case Left(errors) =>
for {
_ <- logger.error(s"There were errors:")
_ <- errors.map(error => logger.error(s" - $error")).sequence
} yield ExitCode.Error
case Right(actions) =>
for {
events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions)
_ <- SyncLogging.logRunFinished(events)
} yield ExitCode.Success
for {
actions <- Synchronise(defaultStorageService, cliOptions).valueOrF(handleErrors)
events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions)
_ <- SyncLogging.logRunFinished(events)
} yield ExitCode.Success
private def handleErrors(implicit logger: Logger): List[String] => IO[Stream[Action]] = {
errors => {
for {
_ <- logger.error("There were errors:")
_ <- errors.map(error => logger.error(s" - $error")).sequence
} yield Stream()
@ -31,4 +33,4 @@ trait Program {
object Program extends Program
object Program extends Program
@ -4,8 +4,8 @@ import java.io.File
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.domain.{Bucket, Config, Logger, RemoteKey, StorageQueueEvent}
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent}
import net.kemitix.thorp.domain._
trait SyncLogging {
@ -1,6 +1,7 @@
package net.kemitix.thorp.core
import cats.data.NonEmptyChain
import cats.data.EitherT
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.core.Action.DoNothing
@ -11,12 +12,10 @@ trait Synchronise {
def apply(storageService: StorageService,
configOptions: Seq[ConfigOption])
(implicit logger: Logger): IO[Either[List[String], Stream[Action]]] =
.flatMap {
case Left(errors) => IO.pure(Left(errorMessages(errors)))
case Right(config) => useValidConfig(storageService, config)
(implicit logger: Logger): EitherT[IO, List[String], Stream[Action]] =
.flatMap(config => useValidConfig(storageService, config))
def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] =
errors.map(cv => cv.errorMessage).toList
@ -28,25 +27,26 @@ trait Synchronise {
def useValidConfig(storageService: StorageService,
config: Config)
(implicit logger: Logger): IO[Either[List[String], Stream[Action]]] = {
(implicit logger: Logger): EitherT[IO, List[String], Stream[Action]] = {
for {
_ <- SyncLogging.logRunStart(config.bucket, config.prefix, config.source)
_ <- EitherT.liftF(SyncLogging.logRunStart(config.bucket, config.prefix, config.source))
actions <- gatherMetadata(storageService, logger, config)
.map { md =>
val (rd, ld) = md
val actions1 = actionsForLocalFiles(config, ld, rd)
val actions2 = actionsForRemoteKeys(config, rd)
Right((actions1 ++ actions2).filter(removeDoNothing))
.swap.map(error => List(error)).swap
.map {
case (remoteData, localData) =>
(actionsForLocalFiles(config, localData, remoteData) ++
actionsForRemoteKeys(config, remoteData))
} yield actions
private def gatherMetadata(storageService: StorageService,
logger: Logger,
config: Config) =
config: Config): EitherT[IO, String, (S3ObjectsData, Stream[LocalFile])] =
for {
remoteData <- fetchRemoteData(storageService, config)
localData <- findLocalFiles(config, logger)
localData <- EitherT.liftF(findLocalFiles(config, logger))
} yield (remoteData, localData)
private def actionsForLocalFiles(config: Config, localData: Stream[LocalFile], remoteData: S3ObjectsData) =
@ -1,7 +1,7 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent}
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
import org.scalatest.FunSpec
class StorageQueueEventSuite extends FunSpec {
@ -3,6 +3,7 @@ package net.kemitix.thorp.core
import java.io.File
import java.time.Instant
import cats.data.EitherT
import cats.effect.IO
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash}
@ -38,7 +39,7 @@ class SyncSuite
def invokeSubject(storageService: StorageService,
configOptions: List[ConfigOption]): Either[List[String], Stream[Action]] = {
Synchronise(storageService, configOptions).unsafeRunSync
Synchronise(storageService, configOptions).value.unsafeRunSync
describe("when all files should be uploaded") {
@ -46,12 +47,11 @@ class SyncSuite
byHash = Map(),
byKey = Map()))
it("uploads all files") {
val expected = Stream(
val expected = Right(Set(
ToUpload(testBucket, rootFile),
ToUpload(testBucket, leafFile))
ToUpload(testBucket, leafFile)))
val result = invokeSubject(storageService, configOptions)
describe("when no files should be uploaded") {
@ -141,8 +141,8 @@ class SyncSuite
extends StorageService {
override def listObjects(bucket: Bucket,
prefix: RemoteKey): IO[S3ObjectsData] =
prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] =
override def upload(localFile: LocalFile,
bucket: Bucket,
@ -1,12 +1,13 @@
package net.kemitix.thorp.storage.api
import cats.data.EitherT
import cats.effect.IO
import net.kemitix.thorp.domain._
trait StorageService {
def listObjects(bucket: Bucket,
prefix: RemoteKey): IO[S3ObjectsData]
prefix: RemoteKey): EitherT[IO, String, S3ObjectsData]
def upload(localFile: LocalFile,
bucket: Bucket,
@ -1,6 +1,8 @@
package net.kemitix.thorp.storage.aws
import cats.effect.IO
import cats.data.EitherT
import cats.implicits._
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
import net.kemitix.thorp.domain
@ -9,11 +11,12 @@ import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
import scala.collection.JavaConverters._
import scala.util.{Success, Try}
class S3ClientObjectLister(amazonS3: AmazonS3) {
def listObjects(bucket: Bucket,
prefix: RemoteKey): IO[S3ObjectsData] = {
prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = {
type Token = String
type Batch = (Stream[S3ObjectSummary], Option[Token])
@ -23,29 +26,35 @@ class S3ClientObjectLister(amazonS3: AmazonS3) {
def fetchBatch: ListObjectsV2Request => IO[Batch] =
request => IO.pure {
val result = amazonS3.listObjectsV2(request)
val more: Option[Token] =
if (result.isTruncated) Some(result.getNextContinuationToken)
else None
(result.getObjectSummaries.asScala.toStream, more)
def fetchBatch: ListObjectsV2Request => EitherT[IO, String, Batch] =
request =>
EitherT {
IO.pure {
.map { result =>
val more: Option[Token] =
if (result.isTruncated) Some(result.getNextContinuationToken)
else None
(result.getObjectSummaries.asScala.toStream, more)
}.toEither.swap.map(e => e.getMessage).swap
def fetchMore(more: Option[Token]): IO[Stream[S3ObjectSummary]] = {
def fetchMore(more: Option[Token]): EitherT[IO, String, Stream[S3ObjectSummary]] = {
more match {
case None => IO.pure(Stream.empty)
case None => EitherT.right(IO.pure(Stream.empty))
case Some(token) => fetch(requestMore(token))
def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] =
request =>
for {
batch <- fetchBatch(request)
(summaries, more) = batch
rest <- fetchMore(more)
} yield summaries ++ rest
def fetch: ListObjectsV2Request => EitherT[IO, String, Stream[S3ObjectSummary]] =
request => {
for {
batch <- fetchBatch(request)
(summaries, more) = batch
rest <- fetchMore(more)
} yield summaries ++ rest
for {
summaries <- fetch(new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key))
@ -1,5 +1,6 @@
package net.kemitix.thorp.storage.aws
import cats.data.EitherT
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.transfer.TransferManager
@ -16,7 +17,7 @@ class S3StorageService(amazonS3Client: => AmazonS3,
lazy val deleter = new S3ClientDeleter(amazonS3Client)
override def listObjects(bucket: Bucket,
prefix: RemoteKey): IO[S3ObjectsData] =
prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] =
objectLister.listObjects(bucket, prefix)
override def copy(bucket: Bucket,
@ -57,17 +57,15 @@ class S3StorageServiceSuite
(amazonS3 listObjectsV2 (_: ListObjectsV2Request)).when(*).returns(myFakeResponse)
it("should build list of hash lookups, with duplicate objects grouped by hash") {
val expected = S3ObjectsData(
val expected = Right(S3ObjectsData(
byHash = Map(
h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)),
h2 -> Set(KeyModified(k2, lm))),
byKey = Map(
k1a -> HashModified(h1, lm),
k1b -> HashModified(h1, lm),
k2 -> HashModified(h2, lm)))
val result = storageService.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync
k2 -> HashModified(h2, lm))))
val result = storageService.listObjects(Bucket("bucket"), RemoteKey("prefix")).value.unsafeRunSync
@ -6,8 +6,8 @@ import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{TransferManager, Upload}
import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher}
import net.kemitix.thorp.core.MD5HashData.rootHash
import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher}
import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService
@ -6,8 +6,8 @@ import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.transfer._
import net.kemitix.thorp.core.KeyGenerator.generateKey
import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain.{UploadEventListener, _}
import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent
import net.kemitix.thorp.domain.{UploadEventListener, _}
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
Add table
Reference in a new issue