Shutdown storage service once completed (#88)
This commit is contained in:
parent
ac9a52f93f
commit
c23376a037
7 changed files with 31 additions and 10 deletions
|
@ -12,8 +12,10 @@ trait Program {
|
||||||
def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = {
|
def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = {
|
||||||
implicit val logger: Logger = new PrintLogger()
|
implicit val logger: Logger = new PrintLogger()
|
||||||
for {
|
for {
|
||||||
actions <- Synchronise(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors)
|
storageService <- defaultStorageService
|
||||||
events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions)
|
actions <- Synchronise(storageService, defaultHashService, cliOptions).valueOrF(handleErrors)
|
||||||
|
events <- handleActions(UnversionedMirrorArchive.default(storageService), actions)
|
||||||
|
_ <- storageService.shutdown
|
||||||
_ <- SyncLogging.logRunFinished(events)
|
_ <- SyncLogging.logRunFinished(events)
|
||||||
} yield ExitCode.Success
|
} yield ExitCode.Success
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ trait SyncLogging {
|
||||||
counters.copy(copied = counters.copied + 1)
|
counters.copy(copied = counters.copied + 1)
|
||||||
case _: DeleteQueueEvent =>
|
case _: DeleteQueueEvent =>
|
||||||
counters.copy(deleted = counters.deleted + 1)
|
counters.copy(deleted = counters.deleted + 1)
|
||||||
case ErrorQueueEvent(k, e) =>
|
case ErrorQueueEvent(_, _) =>
|
||||||
counters.copy(errors = counters.errors + 1)
|
counters.copy(errors = counters.errors + 1)
|
||||||
case _ => counters
|
case _ => counters
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ import cats.data.EitherT
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
|
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
|
||||||
import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root}
|
import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root}
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent}
|
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ShutdownQueueEvent, UploadQueueEvent}
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.storage.api.{HashService, StorageService}
|
import net.kemitix.thorp.storage.api.{HashService, StorageService}
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
@ -175,5 +175,8 @@ class SyncSuite
|
||||||
override def delete(bucket: Bucket,
|
override def delete(bucket: Bucket,
|
||||||
remoteKey: RemoteKey): IO[DeleteQueueEvent] =
|
remoteKey: RemoteKey): IO[DeleteQueueEvent] =
|
||||||
IO.pure(DeleteQueueEvent(remoteKey))
|
IO.pure(DeleteQueueEvent(remoteKey))
|
||||||
|
|
||||||
|
override def shutdown: IO[StorageQueueEvent] =
|
||||||
|
IO.pure(ShutdownQueueEvent())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,9 +2,6 @@ package net.kemitix.thorp.domain
|
||||||
|
|
||||||
sealed trait StorageQueueEvent {
|
sealed trait StorageQueueEvent {
|
||||||
|
|
||||||
// the remote key that was uploaded, deleted or otherwise updated by the action
|
|
||||||
def remoteKey: RemoteKey
|
|
||||||
|
|
||||||
val order: Int
|
val order: Int
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -32,6 +29,10 @@ object StorageQueueEvent {
|
||||||
override val order: Int = 10
|
override val order: Int = 10
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final case class ShutdownQueueEvent() extends StorageQueueEvent {
|
||||||
|
override val order: Int = 99
|
||||||
|
}
|
||||||
|
|
||||||
implicit def ord[A <: StorageQueueEvent]: Ordering[A] = Ordering.by(_.order)
|
implicit def ord[A <: StorageQueueEvent]: Ordering[A] = Ordering.by(_.order)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,8 @@ import net.kemitix.thorp.domain._
|
||||||
|
|
||||||
trait StorageService {
|
trait StorageService {
|
||||||
|
|
||||||
|
def shutdown: IO[StorageQueueEvent]
|
||||||
|
|
||||||
def listObjects(bucket: Bucket,
|
def listObjects(bucket: Bucket,
|
||||||
prefix: RemoteKey): EitherT[IO, String, S3ObjectsData]
|
prefix: RemoteKey): EitherT[IO, String, S3ObjectsData]
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ import cats.data.EitherT
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
import com.amazonaws.services.s3.AmazonS3
|
import com.amazonaws.services.s3.AmazonS3
|
||||||
import com.amazonaws.services.s3.transfer.TransferManager
|
import com.amazonaws.services.s3.transfer.TransferManager
|
||||||
|
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.storage.api.StorageService
|
import net.kemitix.thorp.storage.api.StorageService
|
||||||
|
|
||||||
|
@ -36,4 +37,11 @@ class S3StorageService(amazonS3Client: => AmazonS3,
|
||||||
remoteKey: RemoteKey): IO[StorageQueueEvent] =
|
remoteKey: RemoteKey): IO[StorageQueueEvent] =
|
||||||
deleter.delete(bucket, remoteKey)
|
deleter.delete(bucket, remoteKey)
|
||||||
|
|
||||||
|
override def shutdown: IO[StorageQueueEvent] =
|
||||||
|
IO {
|
||||||
|
amazonS3TransferManager.shutdownNow(true)
|
||||||
|
amazonS3Client.shutdown()
|
||||||
|
ShutdownQueueEvent()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package net.kemitix.thorp.storage.aws
|
package net.kemitix.thorp.storage.aws
|
||||||
|
|
||||||
|
import cats.effect.IO
|
||||||
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
|
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
|
||||||
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
|
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
|
||||||
import net.kemitix.thorp.storage.api.StorageService
|
import net.kemitix.thorp.storage.api.StorageService
|
||||||
|
@ -10,7 +11,11 @@ object S3StorageServiceBuilder {
|
||||||
amazonS3TransferManager: TransferManager): StorageService =
|
amazonS3TransferManager: TransferManager): StorageService =
|
||||||
new S3StorageService(amazonS3Client, amazonS3TransferManager)
|
new S3StorageService(amazonS3Client, amazonS3TransferManager)
|
||||||
|
|
||||||
def defaultStorageService: StorageService =
|
def defaultStorageService: IO[StorageService] =
|
||||||
createService(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager)
|
IO {
|
||||||
|
createService(
|
||||||
|
AmazonS3ClientBuilder.defaultClient,
|
||||||
|
TransferManagerBuilder.defaultTransferManager)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue