From 9418744136f88efc3545c9494d8643bb62903459 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Thu, 27 Jun 2019 07:34:15 +0100 Subject: [PATCH] Display simple error message when bucket is invalid (#81) * [core,storage-{api,aws}] Allow Lister errors to be reported to user * [cli,core,storage-*] Display simple error message when bucket is invalid * [core] optimise imports * [storate-aws] optimise imports * [core] SyncSuite don't use get on an optional type --- .../scala/net/kemitix/thorp/cli/Program.scala | 26 ++++++----- .../net/kemitix/thorp/core/SyncLogging.scala | 2 +- .../net/kemitix/thorp/core/Synchronise.scala | 30 ++++++------- .../thorp/core/StorageQueueEventSuite.scala | 2 +- .../net/kemitix/thorp/core/SyncSuite.scala | 14 +++--- .../thorp/storage/api/StorageService.scala | 3 +- .../storage/aws/S3ClientObjectLister.scala | 45 +++++++++++-------- .../thorp/storage/aws/S3StorageService.scala | 3 +- .../storage/aws/S3StorageServiceSuite.scala | 8 ++-- .../storage/aws/StorageServiceSuite.scala | 2 +- .../thorp/storage/aws/UploaderSuite.scala | 2 +- 11 files changed, 74 insertions(+), 63 deletions(-) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 5a2fe81..8fe4699 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -10,17 +10,19 @@ trait Program { def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = { implicit val logger: Logger = new PrintLogger() - Synchronise(defaultStorageService, cliOptions).flatMap { - case Left(errors) => - for { - _ <- logger.error(s"There were errors:") - _ <- errors.map(error => logger.error(s" - $error")).sequence - } yield ExitCode.Error - case Right(actions) => - for { - events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions) - _ <- SyncLogging.logRunFinished(events) - } yield ExitCode.Success + for { + actions <- Synchronise(defaultStorageService, cliOptions).valueOrF(handleErrors) + events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions) + _ <- SyncLogging.logRunFinished(events) + } yield ExitCode.Success + } + + private def handleErrors(implicit logger: Logger): List[String] => IO[Stream[Action]] = { + errors => { + for { + _ <- logger.error("There were errors:") + _ <- errors.map(error => logger.error(s" - $error")).sequence + } yield Stream() } } @@ -31,4 +33,4 @@ trait Program { }.sequence } -object Program extends Program \ No newline at end of file +object Program extends Program diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index 5197359..3a83056 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -4,8 +4,8 @@ import java.io.File import cats.effect.IO import cats.implicits._ -import net.kemitix.thorp.domain.{Bucket, Config, Logger, RemoteKey, StorageQueueEvent} import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent} +import net.kemitix.thorp.domain._ trait SyncLogging { diff --git a/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala b/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala index 4c915f1..a4ff672 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala @@ -1,6 +1,7 @@ package net.kemitix.thorp.core import cats.data.NonEmptyChain +import cats.data.EitherT import cats.effect.IO import cats.implicits._ import net.kemitix.thorp.core.Action.DoNothing @@ -11,12 +12,10 @@ trait Synchronise { def apply(storageService: StorageService, configOptions: Seq[ConfigOption]) - (implicit logger: Logger): IO[Either[List[String], Stream[Action]]] = - ConfigurationBuilder.buildConfig(configOptions) - .flatMap { - case Left(errors) => IO.pure(Left(errorMessages(errors))) - case Right(config) => useValidConfig(storageService, config) - } + (implicit logger: Logger): EitherT[IO, List[String], Stream[Action]] = + EitherT(ConfigurationBuilder.buildConfig(configOptions)) + .swap.map(errorMessages).swap + .flatMap(config => useValidConfig(storageService, config)) def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] = errors.map(cv => cv.errorMessage).toList @@ -28,25 +27,26 @@ trait Synchronise { def useValidConfig(storageService: StorageService, config: Config) - (implicit logger: Logger): IO[Either[List[String], Stream[Action]]] = { + (implicit logger: Logger): EitherT[IO, List[String], Stream[Action]] = { for { - _ <- SyncLogging.logRunStart(config.bucket, config.prefix, config.source) + _ <- EitherT.liftF(SyncLogging.logRunStart(config.bucket, config.prefix, config.source)) actions <- gatherMetadata(storageService, logger, config) - .map { md => - val (rd, ld) = md - val actions1 = actionsForLocalFiles(config, ld, rd) - val actions2 = actionsForRemoteKeys(config, rd) - Right((actions1 ++ actions2).filter(removeDoNothing)) + .swap.map(error => List(error)).swap + .map { + case (remoteData, localData) => + (actionsForLocalFiles(config, localData, remoteData) ++ + actionsForRemoteKeys(config, remoteData)) + .filter(removeDoNothing) } } yield actions } private def gatherMetadata(storageService: StorageService, logger: Logger, - config: Config) = + config: Config): EitherT[IO, String, (S3ObjectsData, Stream[LocalFile])] = for { remoteData <- fetchRemoteData(storageService, config) - localData <- findLocalFiles(config, logger) + localData <- EitherT.liftF(findLocalFiles(config, logger)) } yield (remoteData, localData) private def actionsForLocalFiles(config: Config, localData: Stream[LocalFile], remoteData: S3ObjectsData) = diff --git a/core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala index b43d2c9..ae82651 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/StorageQueueEventSuite.scala @@ -1,7 +1,7 @@ package net.kemitix.thorp.core -import net.kemitix.thorp.domain.{MD5Hash, RemoteKey} import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent} +import net.kemitix.thorp.domain.{MD5Hash, RemoteKey} import org.scalatest.FunSpec class StorageQueueEventSuite extends FunSpec { diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index 7a0b6bc..073f81b 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -3,6 +3,7 @@ package net.kemitix.thorp.core import java.io.File import java.time.Instant +import cats.data.EitherT import cats.effect.IO import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash} @@ -38,7 +39,7 @@ class SyncSuite def invokeSubject(storageService: StorageService, configOptions: List[ConfigOption]): Either[List[String], Stream[Action]] = { - Synchronise(storageService, configOptions).unsafeRunSync + Synchronise(storageService, configOptions).value.unsafeRunSync } describe("when all files should be uploaded") { @@ -46,12 +47,11 @@ class SyncSuite byHash = Map(), byKey = Map())) it("uploads all files") { - val expected = Stream( + val expected = Right(Set( ToUpload(testBucket, rootFile), - ToUpload(testBucket, leafFile)) + ToUpload(testBucket, leafFile))) val result = invokeSubject(storageService, configOptions) - assert(result.isRight) - assertResult(expected)(result.right.get) + assertResult(expected)(result.map(_.toSet)) } } describe("when no files should be uploaded") { @@ -141,8 +141,8 @@ class SyncSuite extends StorageService { override def listObjects(bucket: Bucket, - prefix: RemoteKey): IO[S3ObjectsData] = - IO.pure(s3ObjectsData) + prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = + EitherT.liftF(IO.pure(s3ObjectsData)) override def upload(localFile: LocalFile, bucket: Bucket, diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala index 0b5868b..4c67890 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala @@ -1,12 +1,13 @@ package net.kemitix.thorp.storage.api +import cats.data.EitherT import cats.effect.IO import net.kemitix.thorp.domain._ trait StorageService { def listObjects(bucket: Bucket, - prefix: RemoteKey): IO[S3ObjectsData] + prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] def upload(localFile: LocalFile, bucket: Bucket, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala index dfcf15c..ec804cf 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala @@ -1,6 +1,8 @@ package net.kemitix.thorp.storage.aws import cats.effect.IO +import cats.data.EitherT +import cats.implicits._ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.thorp.domain @@ -9,11 +11,12 @@ import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey import scala.collection.JavaConverters._ +import scala.util.{Success, Try} class S3ClientObjectLister(amazonS3: AmazonS3) { def listObjects(bucket: Bucket, - prefix: RemoteKey): IO[S3ObjectsData] = { + prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = { type Token = String type Batch = (Stream[S3ObjectSummary], Option[Token]) @@ -23,29 +26,35 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { .withPrefix(prefix.key) .withContinuationToken(token) - def fetchBatch: ListObjectsV2Request => IO[Batch] = - request => IO.pure { - val result = amazonS3.listObjectsV2(request) - val more: Option[Token] = - if (result.isTruncated) Some(result.getNextContinuationToken) - else None - (result.getObjectSummaries.asScala.toStream, more) - } + def fetchBatch: ListObjectsV2Request => EitherT[IO, String, Batch] = + request => + EitherT { + IO.pure { + Try(amazonS3.listObjectsV2(request)) + .map { result => + val more: Option[Token] = + if (result.isTruncated) Some(result.getNextContinuationToken) + else None + (result.getObjectSummaries.asScala.toStream, more) + }.toEither.swap.map(e => e.getMessage).swap + } + } - def fetchMore(more: Option[Token]): IO[Stream[S3ObjectSummary]] = { + def fetchMore(more: Option[Token]): EitherT[IO, String, Stream[S3ObjectSummary]] = { more match { - case None => IO.pure(Stream.empty) + case None => EitherT.right(IO.pure(Stream.empty)) case Some(token) => fetch(requestMore(token)) } } - def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = - request => - for { - batch <- fetchBatch(request) - (summaries, more) = batch - rest <- fetchMore(more) - } yield summaries ++ rest + def fetch: ListObjectsV2Request => EitherT[IO, String, Stream[S3ObjectSummary]] = + request => { + for { + batch <- fetchBatch(request) + (summaries, more) = batch + rest <- fetchMore(more) + } yield summaries ++ rest + } for { summaries <- fetch(new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key)) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala index fef37e2..35938c6 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala @@ -1,5 +1,6 @@ package net.kemitix.thorp.storage.aws +import cats.data.EitherT import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer.TransferManager @@ -16,7 +17,7 @@ class S3StorageService(amazonS3Client: => AmazonS3, lazy val deleter = new S3ClientDeleter(amazonS3Client) override def listObjects(bucket: Bucket, - prefix: RemoteKey): IO[S3ObjectsData] = + prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy(bucket: Bucket, diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala index 4c52a93..b5934f0 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala @@ -57,17 +57,15 @@ class S3StorageServiceSuite (amazonS3 listObjectsV2 (_: ListObjectsV2Request)).when(*).returns(myFakeResponse) it("should build list of hash lookups, with duplicate objects grouped by hash") { - val expected = S3ObjectsData( + val expected = Right(S3ObjectsData( byHash = Map( h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)), h2 -> Set(KeyModified(k2, lm))), byKey = Map( k1a -> HashModified(h1, lm), k1b -> HashModified(h1, lm), - k2 -> HashModified(h2, lm))) - val result = storageService.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync - assertResult(expected.byHash.keys)(result.byHash.keys) - assertResult(expected.byKey.keys)(result.byKey.keys) + k2 -> HashModified(h2, lm)))) + val result = storageService.listObjects(Bucket("bucket"), RemoteKey("prefix")).value.unsafeRunSync assertResult(expected)(result) } } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index 96b9ddb..ac7187c 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -6,8 +6,8 @@ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.{TransferManager, Upload} -import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher} import net.kemitix.thorp.core.MD5HashData.rootHash +import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher} import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.StorageService diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala index 30b613d..49776f9 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala @@ -6,8 +6,8 @@ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer._ import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.core.Resource -import net.kemitix.thorp.domain.{UploadEventListener, _} import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent +import net.kemitix.thorp.domain.{UploadEventListener, _} import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec