Merge config package object with Config object (#138)

* [config] merge config package object with Config object

* [config] Only extract batchMode value where used

Don't pull it out early then pass a boolean around
This commit is contained in:
Paul Campbell 2019-07-30 10:33:06 +01:00 committed by GitHub
parent a93781007d
commit e5de73e705
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 139 additions and 138 deletions

View file

@ -58,8 +58,8 @@ val zioDependencies = Seq(
) )
// cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> domain // cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> domain
// core -> config -> domain // storage-api -> config -> domain
// config -> filesystem // config -> filesystem
lazy val thorp = (project in file(".")) lazy val thorp = (project in file("."))
.settings(commonSettings) .settings(commonSettings)
@ -102,13 +102,13 @@ lazy val core = (project in file("core"))
.settings(testDependencies) .settings(testDependencies)
.dependsOn(`storage-api`) .dependsOn(`storage-api`)
.dependsOn(domain % "compile->compile;test->test") .dependsOn(domain % "compile->compile;test->test")
.dependsOn(config)
lazy val `storage-api` = (project in file("storage-api")) lazy val `storage-api` = (project in file("storage-api"))
.settings(commonSettings) .settings(commonSettings)
.settings(zioDependencies) .settings(zioDependencies)
.settings(assemblyJarName in assembly := "storage-api.jar") .settings(assemblyJarName in assembly := "storage-api.jar")
.dependsOn(console) .dependsOn(console)
.dependsOn(config)
lazy val console = (project in file("console")) lazy val console = (project in file("console"))
.settings(commonSettings) .settings(commonSettings)

View file

@ -16,7 +16,7 @@ trait Program {
for { for {
cli <- CliArgs.parse(args) cli <- CliArgs.parse(args)
config <- ConfigurationBuilder.buildConfig(cli) config <- ConfigurationBuilder.buildConfig(cli)
_ <- setConfiguration(config) _ <- Config.set(config)
_ <- ZIO.when(showVersion(cli))(Console.putStrLn(version)) _ <- ZIO.when(showVersion(cli))(Console.putStrLn(version))
_ <- ZIO.when(!showVersion(cli))(execute.catchAll(handleErrors)) _ <- ZIO.when(!showVersion(cli))(execute.catchAll(handleErrors))
} yield () } yield ()
@ -27,11 +27,10 @@ trait Program {
private def execute = { private def execute = {
for { for {
plan <- PlanBuilder.createPlan(defaultHashService) plan <- PlanBuilder.createPlan(defaultHashService)
batchMode <- isBatchMode archive <- UnversionedMirrorArchive.default(plan.syncTotals)
archive <- UnversionedMirrorArchive.default(batchMode, plan.syncTotals) events <- applyPlan(archive, plan)
events <- applyPlan(archive, plan) _ <- SyncLogging.logRunFinished(events)
_ <- SyncLogging.logRunFinished(events)
} yield () } yield ()
} }

View file

@ -47,4 +47,21 @@ object Config {
object Live extends Live object Live extends Live
final def set(config: Configuration): ZIO[Config, Nothing, Unit] =
ZIO.accessM(_.config setConfiguration config)
final def batchMode: ZIO[Config, Nothing, Boolean] =
ZIO.accessM(_.config isBatchMode)
final def bucket: ZIO[Config, Nothing, Bucket] =
ZIO.accessM(_.config bucket)
final def prefix: ZIO[Config, Nothing, RemoteKey] =
ZIO.accessM(_.config prefix)
final def sources: ZIO[Config, Nothing, Sources] =
ZIO.accessM(_.config sources)
final def filters: ZIO[Config, Nothing, List[Filter]] =
ZIO.accessM(_.config filters)
} }

View file

@ -1,29 +0,0 @@
package net.kemitix.thorp
import net.kemitix.thorp.domain.{Bucket, Filter, RemoteKey, Sources}
import zio.ZIO
package object config {
final val configService: ZIO[Config, Nothing, Config.Service] =
ZIO.access(_.config)
final def setConfiguration(
config: Configuration): ZIO[Config, Nothing, Unit] =
ZIO.accessM(_.config setConfiguration config)
final def isBatchMode: ZIO[Config, Nothing, Boolean] =
ZIO.accessM(_.config isBatchMode)
final def getBucket: ZIO[Config, Nothing, Bucket] =
ZIO.accessM(_.config bucket)
final def getPrefix: ZIO[Config, Nothing, RemoteKey] =
ZIO.accessM(_.config prefix)
final def getSources: ZIO[Config, Nothing, Sources] =
ZIO.accessM(_.config sources)
final def getFilters: ZIO[Config, Nothing, List[Filter]] =
ZIO.accessM(_.config filters)
}

View file

@ -1,6 +1,6 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.config._ import net.kemitix.thorp.config.Config
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import zio.ZIO import zio.ZIO
@ -12,7 +12,7 @@ object ActionGenerator {
previousActions: Stream[Action] previousActions: Stream[Action]
): ZIO[Config, Nothing, Stream[Action]] = ): ZIO[Config, Nothing, Stream[Action]] =
for { for {
bucket <- getBucket bucket <- Config.bucket
} yield } yield
s3MetaData match { s3MetaData match {
// #1 local exists, remote exists, remote matches - do nothing // #1 local exists, remote exists, remote matches - do nothing

View file

@ -2,7 +2,7 @@ package net.kemitix.thorp.core
import java.nio.file.Path import java.nio.file.Path
import net.kemitix.thorp.config._ import net.kemitix.thorp.config.Config
import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.core.KeyGenerator.generateKey
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem import net.kemitix.thorp.filesystem.FileSystem
@ -56,8 +56,8 @@ object LocalFileStream {
private def localFile(hashService: HashService)(path: Path) = { private def localFile(hashService: HashService)(path: Path) = {
val file = path.toFile val file = path.toFile
for { for {
sources <- getSources sources <- Config.sources
prefix <- getPrefix prefix <- Config.prefix
hash <- hashService.hashLocalObject(path) hash <- hashService.hashLocalObject(path)
localFile = LocalFile(file, localFile = LocalFile(file,
sources.forPath(path).toFile, sources.forPath(path).toFile,
@ -78,7 +78,7 @@ object LocalFileStream {
private def isIncluded(path: Path) = private def isIncluded(path: Path) =
for { for {
filters <- getFilters filters <- Config.filters
} yield Filter.isIncluded(filters)(path) } yield Filter.isIncluded(filters)(path)
private def pathToLocalFile(hashService: HashService)(path: Path) = private def pathToLocalFile(hashService: HashService)(path: Path) =

View file

@ -1,6 +1,6 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.config._ import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action._ import net.kemitix.thorp.core.Action._
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
@ -72,9 +72,9 @@ trait PlanBuilder {
private def createActionFromRemoteKey(remoteKey: RemoteKey) = private def createActionFromRemoteKey(remoteKey: RemoteKey) =
for { for {
bucket <- getBucket bucket <- Config.bucket
prefix <- getPrefix prefix <- Config.prefix
sources <- getSources sources <- Config.sources
needsDeleted <- Remote.isMissingLocally(sources, prefix)(remoteKey) needsDeleted <- Remote.isMissingLocally(sources, prefix)(remoteKey)
} yield } yield
if (needsDeleted) ToDelete(bucket, remoteKey, 0L) if (needsDeleted) ToDelete(bucket, remoteKey, 0L)
@ -88,8 +88,8 @@ trait PlanBuilder {
private def fetchRemoteData = private def fetchRemoteData =
for { for {
bucket <- getBucket bucket <- Config.bucket
prefix <- getPrefix prefix <- Config.prefix
objects <- Storage.list(bucket, prefix) objects <- Storage.list(bucket, prefix)
} yield objects } yield objects
@ -101,7 +101,7 @@ trait PlanBuilder {
private def findFiles(hashService: HashService) = private def findFiles(hashService: HashService) =
for { for {
sources <- getSources sources <- Config.sources
paths = sources.paths paths = sources.paths
found <- ZIO.foreach(paths)(path => found <- ZIO.foreach(paths)(path =>
LocalFileStream.findFiles(hashService)(path)) LocalFileStream.findFiles(hashService)(path))

View file

@ -1,6 +1,6 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.config._ import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent, CopyQueueEvent,
@ -16,15 +16,15 @@ trait SyncLogging {
def logRunStart: ZIO[Console with Config, Nothing, Unit] = def logRunStart: ZIO[Console with Config, Nothing, Unit] =
for { for {
bucket <- getBucket bucket <- Config.bucket
prefix <- getPrefix prefix <- Config.prefix
sources <- getSources sources <- Config.sources
_ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources)) _ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
} yield () } yield ()
def logFileScan: ZIO[Config with Console, Nothing, Unit] = def logFileScan: ZIO[Config with Console, Nothing, Unit] =
for { for {
sources <- getSources sources <- Config.sources
_ <- Console.putStrLn( _ <- Console.putStrLn(
s"Scanning local files: ${sources.paths.mkString(", ")}...") s"Scanning local files: ${sources.paths.mkString(", ")}...")
} yield () } yield ()

View file

@ -1,5 +1,6 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.StorageQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
@ -22,15 +23,15 @@ trait ThorpArchive {
index: Int, index: Int,
action: Action, action: Action,
totalBytesSoFar: Long totalBytesSoFar: Long
): TaskR[Storage with Console, StorageQueueEvent] ): TaskR[Storage with Console with Config, StorageQueueEvent]
def logEvent( def logEvent(
event: StorageQueueEvent, event: StorageQueueEvent
batchMode: Boolean ): TaskR[Console with Config, Unit] =
): TaskR[Console, Unit] =
event match { event match {
case UploadQueueEvent(remoteKey, _) => case UploadQueueEvent(remoteKey, _) =>
for { for {
batchMode <- Config.batchMode
_ <- TaskR.when(batchMode)( _ <- TaskR.when(batchMode)(
Console.putStrLn(s"Uploaded: ${remoteKey.key}")) Console.putStrLn(s"Uploaded: ${remoteKey.key}"))
_ <- TaskR.when(!batchMode)( _ <- TaskR.when(!batchMode)(
@ -39,6 +40,7 @@ trait ThorpArchive {
} yield () } yield ()
case CopyQueueEvent(sourceKey, targetKey) => case CopyQueueEvent(sourceKey, targetKey) =>
for { for {
batchMode <- Config.batchMode
_ <- TaskR.when(batchMode)( _ <- TaskR.when(batchMode)(
Console.putStrLn(s"Copied: ${sourceKey.key} => ${targetKey.key}")) Console.putStrLn(s"Copied: ${sourceKey.key} => ${targetKey.key}"))
_ <- TaskR.when(!batchMode)( _ <- TaskR.when(!batchMode)(
@ -48,13 +50,15 @@ trait ThorpArchive {
} yield () } yield ()
case DeleteQueueEvent(remoteKey) => case DeleteQueueEvent(remoteKey) =>
for { for {
_ <- TaskR.when(batchMode)(Console.putStrLn(s"Deleted: $remoteKey")) batchMode <- Config.batchMode
_ <- TaskR.when(batchMode)(Console.putStrLn(s"Deleted: $remoteKey"))
_ <- TaskR.when(!batchMode)( _ <- TaskR.when(!batchMode)(
Console.putStrLn( Console.putStrLn(
s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen")) s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen"))
} yield () } yield ()
case ErrorQueueEvent(action, _, e) => case ErrorQueueEvent(action, _, e) =>
for { for {
batchMode <- Config.batchMode
_ <- TaskR.when(batchMode)( _ <- TaskR.when(batchMode)(
Console.putStrLn( Console.putStrLn(
s"${action.name} failed: ${action.keys}: ${e.getMessage}")) s"${action.name} failed: ${action.keys}: ${e.getMessage}"))

View file

@ -1,5 +1,6 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
@ -7,31 +8,29 @@ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage import net.kemitix.thorp.storage.api.Storage
import zio.{Task, TaskR} import zio.{Task, TaskR}
case class UnversionedMirrorArchive( case class UnversionedMirrorArchive(syncTotals: SyncTotals)
batchMode: Boolean, extends ThorpArchive {
syncTotals: SyncTotals
) extends ThorpArchive {
override def update( override def update(
index: Int, index: Int,
action: Action, action: Action,
totalBytesSoFar: Long totalBytesSoFar: Long
): TaskR[Storage with Console, StorageQueueEvent] = ): TaskR[Storage with Console with Config, StorageQueueEvent] =
action match { action match {
case ToUpload(bucket, localFile, _) => case ToUpload(bucket, localFile, _) =>
for { for {
event <- doUpload(index, totalBytesSoFar, bucket, localFile) event <- doUpload(index, totalBytesSoFar, bucket, localFile)
_ <- logEvent(event, batchMode) _ <- logEvent(event)
} yield event } yield event
case ToCopy(bucket, sourceKey, hash, targetKey, _) => case ToCopy(bucket, sourceKey, hash, targetKey, _) =>
for { for {
event <- Storage.copy(bucket, sourceKey, hash, targetKey) event <- Storage.copy(bucket, sourceKey, hash, targetKey)
_ <- logEvent(event, batchMode) _ <- logEvent(event)
} yield event } yield event
case ToDelete(bucket, remoteKey, _) => case ToDelete(bucket, remoteKey, _) =>
for { for {
event <- Storage.delete(bucket, remoteKey) event <- Storage.delete(bucket, remoteKey)
_ <- logEvent(event, batchMode) _ <- logEvent(event)
} yield event } yield event
case DoNothing(_, remoteKey, _) => case DoNothing(_, remoteKey, _) =>
Task(DoNothingQueueEvent(remoteKey)) Task(DoNothingQueueEvent(remoteKey))
@ -46,17 +45,13 @@ case class UnversionedMirrorArchive(
Storage.upload( Storage.upload(
localFile, localFile,
bucket, bucket,
batchMode,
UploadEventListener(localFile, index, syncTotals, totalBytesSoFar), UploadEventListener(localFile, index, syncTotals, totalBytesSoFar),
1) 1)
} }
object UnversionedMirrorArchive { object UnversionedMirrorArchive {
def default( def default(syncTotals: SyncTotals): Task[ThorpArchive] =
batchMode: Boolean,
syncTotals: SyncTotals
): Task[ThorpArchive] =
Task { Task {
new UnversionedMirrorArchive(batchMode, syncTotals) new UnversionedMirrorArchive(syncTotals)
} }
} }

View file

@ -2,7 +2,13 @@ package net.kemitix.thorp.core
import java.time.Instant import java.time.Instant
import net.kemitix.thorp.config._ import net.kemitix.thorp.config.{
Config,
ConfigOption,
ConfigOptions,
ConfigurationBuilder,
Resource
}
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
@ -170,7 +176,7 @@ class ActionGeneratorSuite extends FunSpec {
def testProgram = def testProgram =
for { for {
config <- ConfigurationBuilder.buildConfig(configOptions) config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- setConfiguration(config) _ <- Config.set(config)
actions <- ActionGenerator.createActions(input, previousActions) actions <- ActionGenerator.createActions(input, previousActions)
} yield actions } yield actions

View file

@ -22,7 +22,6 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData,
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): UIO[StorageQueueEvent] = { tryCount: Int): UIO[StorageQueueEvent] = {
val (remoteKey, md5Hash) = uploadFiles(localFile.file) val (remoteKey, md5Hash) = uploadFiles(localFile.file)

View file

@ -2,7 +2,13 @@ package net.kemitix.thorp.core
import java.nio.file.Paths import java.nio.file.Paths
import net.kemitix.thorp.config._ import net.kemitix.thorp.config.{
Config,
ConfigOption,
ConfigOptions,
ConfigurationBuilder,
Resource
}
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
@ -73,7 +79,7 @@ class LocalFileStreamSuite extends FunSpec {
def testProgram = def testProgram =
for { for {
config <- ConfigurationBuilder.buildConfig(configOptions) config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- setConfiguration(config) _ <- Config.set(config)
files <- LocalFileStream.findFiles(hashService)(sourcePath) files <- LocalFileStream.findFiles(hashService)(sourcePath)
} yield files } yield files

View file

@ -3,7 +3,12 @@ package net.kemitix.thorp.core
import java.io.File import java.io.File
import java.nio.file.Path import java.nio.file.Path
import net.kemitix.thorp.config._ import net.kemitix.thorp.config.{
Config,
ConfigOption,
ConfigOptions,
ConfigurationBuilder
}
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
@ -378,7 +383,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
def testProgram = def testProgram =
for { for {
config <- ConfigurationBuilder.buildConfig(configOptions) config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- setConfiguration(config) _ <- Config.set(config)
plan <- PlanBuilder.createPlan(hashService) plan <- PlanBuilder.createPlan(hashService)
} yield plan } yield plan

View file

@ -1,5 +1,6 @@
package net.kemitix.thorp.storage.api package net.kemitix.thorp.storage.api
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import zio.{Task, TaskR, UIO, ZIO} import zio.{Task, TaskR, UIO, ZIO}
@ -19,10 +20,9 @@ object Storage {
def upload( def upload(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int tryCount: Int
): ZIO[Storage, Nothing, StorageQueueEvent] ): ZIO[Storage with Config, Nothing, StorageQueueEvent]
def copy( def copy(
bucket: Bucket, bucket: Bucket,
@ -57,7 +57,6 @@ object Storage {
override def upload( override def upload(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): ZIO[Storage, Nothing, StorageQueueEvent] = tryCount: Int): ZIO[Storage, Nothing, StorageQueueEvent] =
uploadResult uploadResult
@ -100,12 +99,11 @@ object Storage {
final def upload( final def upload(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int tryCount: Int
): ZIO[Storage, Nothing, StorageQueueEvent] = ): ZIO[Storage with Config, Nothing, StorageQueueEvent] =
ZIO.accessM( ZIO.accessM(
_.storage upload (localFile, bucket, batchMode, uploadEventListener, tryCount)) _.storage upload (localFile, bucket, uploadEventListener, tryCount))
final def copy( final def copy(
bucket: Bucket, bucket: Bucket,

View file

@ -2,12 +2,13 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.transfer.TransferManagerBuilder import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage import net.kemitix.thorp.storage.api.Storage
import net.kemitix.thorp.storage.api.Storage.Service import net.kemitix.thorp.storage.api.Storage.Service
import zio.{TaskR, UIO} import zio.{TaskR, UIO, ZIO}
object S3Storage { object S3Storage {
trait Live extends Storage { trait Live extends Storage {
@ -23,14 +24,13 @@ object S3Storage {
prefix: RemoteKey): TaskR[Console, S3ObjectsData] = prefix: RemoteKey): TaskR[Console, S3ObjectsData] =
Lister.listObjects(client)(bucket, prefix) Lister.listObjects(client)(bucket, prefix)
override def upload(localFile: LocalFile, override def upload(
bucket: Bucket, localFile: LocalFile,
batchMode: Boolean, bucket: Bucket,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): UIO[StorageQueueEvent] = tryCount: Int): ZIO[Config, Nothing, StorageQueueEvent] =
Uploader.upload(transferManager)(localFile, Uploader.upload(transferManager)(localFile,
bucket, bucket,
batchMode,
uploadEventListener, uploadEventListener,
1) 1)

View file

@ -2,6 +2,7 @@ package net.kemitix.thorp.storage.aws
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
Action, Action,
ErrorQueueEvent, ErrorQueueEvent,
@ -13,35 +14,38 @@ import net.kemitix.thorp.domain.UploadEvent.{
TransferEvent TransferEvent
} }
import net.kemitix.thorp.domain.{StorageQueueEvent, _} import net.kemitix.thorp.domain.{StorageQueueEvent, _}
import zio.{Task, UIO} import zio.{UIO, ZIO}
trait Uploader { trait Uploader {
def upload(transferManager: => AmazonTransferManager)( def upload(transferManager: => AmazonTransferManager)(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int tryCount: Int
): UIO[StorageQueueEvent] = ): ZIO[Config, Nothing, StorageQueueEvent] =
transfer(transferManager)(localFile, bucket, batchMode, uploadEventListener) transfer(transferManager)(localFile, bucket, uploadEventListener)
.catchAll(handleError(localFile.remoteKey)) .catchAll(handleError(localFile.remoteKey))
private def handleError( private def handleError(remoteKey: RemoteKey)(e: Throwable) =
remoteKey: RemoteKey): Throwable => UIO[ErrorQueueEvent] = { e =>
UIO.succeed(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e)) UIO.succeed(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e))
}
private def transfer(transferManager: => AmazonTransferManager)( private def transfer(transferManager: => AmazonTransferManager)(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener uploadEventListener: UploadEventListener
): Task[StorageQueueEvent] = { ) = {
val listener = progressListener(uploadEventListener)
val listener = progressListener(uploadEventListener) for {
val putObjectRequest = request(localFile, bucket, batchMode, listener) putObjectRequest <- request(localFile, bucket, listener)
event <- dispatch(transferManager, putObjectRequest)
} yield event
}
private def dispatch(
transferManager: AmazonTransferManager,
putObjectRequest: PutObjectRequest
) = {
transferManager transferManager
.upload(putObjectRequest) .upload(putObjectRequest)
.map(_.waitForUploadResult) .map(_.waitForUploadResult)
@ -53,14 +57,16 @@ trait Uploader {
private def request( private def request(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
listener: ProgressListener listener: ProgressListener
): PutObjectRequest = { ) = {
val request = val request =
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withMetadata(metadata(localFile)) .withMetadata(metadata(localFile))
if (batchMode) request for {
else request.withGeneralProgressListener(listener) batchMode <- Config.batchMode
r = if (batchMode) request
else request.withGeneralProgressListener(listener)
} yield r
} }
private def metadata: LocalFile => ObjectMetadata = localFile => { private def metadata: LocalFile => ObjectMetadata = localFile => {

View file

@ -1,11 +1,12 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage import net.kemitix.thorp.storage.api.Storage
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import zio.{TaskR, UIO} import zio.{TaskR, UIO, ZIO}
trait AmazonS3ClientTestFixture extends MockFactory { trait AmazonS3ClientTestFixture extends MockFactory {
@ -27,14 +28,13 @@ trait AmazonS3ClientTestFixture extends MockFactory {
prefix: RemoteKey): TaskR[Console, S3ObjectsData] = prefix: RemoteKey): TaskR[Console, S3ObjectsData] =
Lister.listObjects(client)(bucket, prefix) Lister.listObjects(client)(bucket, prefix)
override def upload(localFile: LocalFile, override def upload(
bucket: Bucket, localFile: LocalFile,
batchMode: Boolean, bucket: Bucket,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): UIO[StorageQueueEvent] = tryCount: Int): ZIO[Config, Nothing, StorageQueueEvent] =
Uploader.upload(transferManager)(localFile, Uploader.upload(transferManager)(localFile,
bucket, bucket,
batchMode,
uploadEventListener, uploadEventListener,
1) 1)

View file

@ -5,8 +5,7 @@ import java.io.File
import com.amazonaws.SdkClientException import com.amazonaws.SdkClientException
import com.amazonaws.services.s3.model.AmazonS3Exception import com.amazonaws.services.s3.model.AmazonS3Exception
import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.model.UploadResult
import net.kemitix.thorp.config.Resource import net.kemitix.thorp.config.{Config, Resource}
import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain.StorageQueueEvent.{
Action, Action,
@ -16,13 +15,10 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.internal.PlatformLive import zio.{DefaultRuntime, Task}
import zio.{Runtime, Task}
class UploaderTest extends FreeSpec with MockFactory { class UploaderTest extends FreeSpec with MockFactory {
private val runtime = Runtime(Console.Live, PlatformLive.Default)
"upload" - { "upload" - {
val aSource: File = Resource(this, "") val aSource: File = Resource(this, "")
val aFile: File = Resource(this, "small-file") val aFile: File = Resource(this, "small-file")
@ -31,7 +27,6 @@ class UploaderTest extends FreeSpec with MockFactory {
val remoteKey = RemoteKey("aRemoteKey") val remoteKey = RemoteKey("aRemoteKey")
val localFile = LocalFile(aFile, aSource, hashes, remoteKey) val localFile = LocalFile(aFile, aSource, hashes, remoteKey)
val bucket = Bucket("aBucket") val bucket = Bucket("aBucket")
val batchMode = false
val tryCount = 1 val tryCount = 1
val uploadResult = new UploadResult val uploadResult = new UploadResult
uploadResult.setKey(remoteKey.key) uploadResult.setKey(remoteKey.key)
@ -52,7 +47,6 @@ class UploaderTest extends FreeSpec with MockFactory {
invoke(fixture.amazonS3TransferManager)( invoke(fixture.amazonS3TransferManager)(
localFile, localFile,
bucket, bucket,
batchMode,
uploadEventListener, uploadEventListener,
tryCount tryCount
) )
@ -72,7 +66,6 @@ class UploaderTest extends FreeSpec with MockFactory {
invoke(fixture.amazonS3TransferManager)( invoke(fixture.amazonS3TransferManager)(
localFile, localFile,
bucket, bucket,
batchMode,
uploadEventListener, uploadEventListener,
tryCount tryCount
) )
@ -92,7 +85,6 @@ class UploaderTest extends FreeSpec with MockFactory {
invoke(fixture.amazonS3TransferManager)( invoke(fixture.amazonS3TransferManager)(
localFile, localFile,
bucket, bucket,
batchMode,
uploadEventListener, uploadEventListener,
tryCount tryCount
) )
@ -102,19 +94,22 @@ class UploaderTest extends FreeSpec with MockFactory {
def invoke(transferManager: AmazonTransferManager)( def invoke(transferManager: AmazonTransferManager)(
localFile: LocalFile, localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int tryCount: Int
) = ) = {
runtime.unsafeRunSync { type TestEnv = Config
Uploader.upload(transferManager)( val testEnv: TestEnv = Config.Live
localFile, new DefaultRuntime {}.unsafeRunSync {
bucket, Uploader
batchMode, .upload(transferManager)(
uploadEventListener, localFile,
tryCount bucket,
) uploadEventListener,
tryCount
)
.provide(testEnv)
}.toEither }.toEither
}
} }
} }