From c23376a037b364dab25a49c2282f5d973f2f339f Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Sat, 29 Jun 2019 20:04:36 +0100 Subject: [PATCH] Shutdown storage service once completed (#88) --- cli/src/main/scala/net/kemitix/thorp/cli/Program.scala | 6 ++++-- .../main/scala/net/kemitix/thorp/core/SyncLogging.scala | 4 ++-- .../test/scala/net/kemitix/thorp/core/SyncSuite.scala | 5 ++++- .../net/kemitix/thorp/domain/StorageQueueEvent.scala | 7 ++++--- .../net/kemitix/thorp/storage/api/StorageService.scala | 2 ++ .../net/kemitix/thorp/storage/aws/S3StorageService.scala | 8 ++++++++ .../thorp/storage/aws/S3StorageServiceBuilder.scala | 9 +++++++-- 7 files changed, 31 insertions(+), 10 deletions(-) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 686c90f..0c0a8e3 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -12,8 +12,10 @@ trait Program { def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = { implicit val logger: Logger = new PrintLogger() for { - actions <- Synchronise(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors) - events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions) + storageService <- defaultStorageService + actions <- Synchronise(storageService, defaultHashService, cliOptions).valueOrF(handleErrors) + events <- handleActions(UnversionedMirrorArchive.default(storageService), actions) + _ <- storageService.shutdown _ <- SyncLogging.logRunFinished(events) } yield ExitCode.Success } diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index 3a83056..7784dc3 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -49,8 +49,8 @@ trait SyncLogging { counters.copy(copied = counters.copied + 1) case _: DeleteQueueEvent => counters.copy(deleted = counters.deleted + 1) - case ErrorQueueEvent(k, e) => - counters.copy(errors = counters.errors + 1) + case ErrorQueueEvent(_, _) => + counters.copy(errors = counters.errors + 1) case _ => counters } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index b542cbb..bf2ec98 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -8,7 +8,7 @@ import cats.data.EitherT import cats.effect.IO import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root} -import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent} +import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ShutdownQueueEvent, UploadQueueEvent} import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} import org.scalatest.FunSpec @@ -175,5 +175,8 @@ class SyncSuite override def delete(bucket: Bucket, remoteKey: RemoteKey): IO[DeleteQueueEvent] = IO.pure(DeleteQueueEvent(remoteKey)) + + override def shutdown: IO[StorageQueueEvent] = + IO.pure(ShutdownQueueEvent()) } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala b/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala index 391d081..b9a824a 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/StorageQueueEvent.scala @@ -2,9 +2,6 @@ package net.kemitix.thorp.domain sealed trait StorageQueueEvent { - // the remote key that was uploaded, deleted or otherwise updated by the action - def remoteKey: RemoteKey - val order: Int } @@ -32,6 +29,10 @@ object StorageQueueEvent { override val order: Int = 10 } + final case class ShutdownQueueEvent() extends StorageQueueEvent { + override val order: Int = 99 + } + implicit def ord[A <: StorageQueueEvent]: Ordering[A] = Ordering.by(_.order) } diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala index 4c67890..a21d830 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala @@ -6,6 +6,8 @@ import net.kemitix.thorp.domain._ trait StorageService { + def shutdown: IO[StorageQueueEvent] + def listObjects(bucket: Bucket, prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala index 35938c6..187e79e 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala @@ -4,6 +4,7 @@ import cats.data.EitherT import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer.TransferManager +import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.StorageService @@ -36,4 +37,11 @@ class S3StorageService(amazonS3Client: => AmazonS3, remoteKey: RemoteKey): IO[StorageQueueEvent] = deleter.delete(bucket, remoteKey) + override def shutdown: IO[StorageQueueEvent] = + IO { + amazonS3TransferManager.shutdownNow(true) + amazonS3Client.shutdown() + ShutdownQueueEvent() + } + } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala index bcdfcaa..064c942 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageServiceBuilder.scala @@ -1,5 +1,6 @@ package net.kemitix.thorp.storage.aws +import cats.effect.IO import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder} import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} import net.kemitix.thorp.storage.api.StorageService @@ -10,7 +11,11 @@ object S3StorageServiceBuilder { amazonS3TransferManager: TransferManager): StorageService = new S3StorageService(amazonS3Client, amazonS3TransferManager) - def defaultStorageService: StorageService = - createService(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager) + def defaultStorageService: IO[StorageService] = + IO { + createService( + AmazonS3ClientBuilder.defaultClient, + TransferManagerBuilder.defaultTransferManager) + } }