diff --git a/build.sbt b/build.sbt index d7e9b96..9483fe8 100644 --- a/build.sbt +++ b/build.sbt @@ -58,8 +58,8 @@ val zioDependencies = Seq( ) // cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> domain -// core -> config -> domain -// config -> filesystem +// storage-api -> config -> domain +// config -> filesystem lazy val thorp = (project in file(".")) .settings(commonSettings) @@ -102,13 +102,13 @@ lazy val core = (project in file("core")) .settings(testDependencies) .dependsOn(`storage-api`) .dependsOn(domain % "compile->compile;test->test") - .dependsOn(config) lazy val `storage-api` = (project in file("storage-api")) .settings(commonSettings) .settings(zioDependencies) .settings(assemblyJarName in assembly := "storage-api.jar") .dependsOn(console) + .dependsOn(config) lazy val console = (project in file("console")) .settings(commonSettings) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index d1e9e83..4ac6f47 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -16,7 +16,7 @@ trait Program { for { cli <- CliArgs.parse(args) config <- ConfigurationBuilder.buildConfig(cli) - _ <- setConfiguration(config) + _ <- Config.set(config) _ <- ZIO.when(showVersion(cli))(Console.putStrLn(version)) _ <- ZIO.when(!showVersion(cli))(execute.catchAll(handleErrors)) } yield () @@ -27,11 +27,10 @@ trait Program { private def execute = { for { - plan <- PlanBuilder.createPlan(defaultHashService) - batchMode <- isBatchMode - archive <- UnversionedMirrorArchive.default(batchMode, plan.syncTotals) - events <- applyPlan(archive, plan) - _ <- SyncLogging.logRunFinished(events) + plan <- PlanBuilder.createPlan(defaultHashService) + archive <- UnversionedMirrorArchive.default(plan.syncTotals) + events <- applyPlan(archive, plan) + _ <- SyncLogging.logRunFinished(events) } yield () } diff --git a/config/src/main/scala/net/kemitix/thorp/config/Config.scala b/config/src/main/scala/net/kemitix/thorp/config/Config.scala index 19f7722..e998924 100644 --- a/config/src/main/scala/net/kemitix/thorp/config/Config.scala +++ b/config/src/main/scala/net/kemitix/thorp/config/Config.scala @@ -47,4 +47,21 @@ object Config { object Live extends Live + final def set(config: Configuration): ZIO[Config, Nothing, Unit] = + ZIO.accessM(_.config setConfiguration config) + + final def batchMode: ZIO[Config, Nothing, Boolean] = + ZIO.accessM(_.config isBatchMode) + + final def bucket: ZIO[Config, Nothing, Bucket] = + ZIO.accessM(_.config bucket) + + final def prefix: ZIO[Config, Nothing, RemoteKey] = + ZIO.accessM(_.config prefix) + + final def sources: ZIO[Config, Nothing, Sources] = + ZIO.accessM(_.config sources) + + final def filters: ZIO[Config, Nothing, List[Filter]] = + ZIO.accessM(_.config filters) } diff --git a/config/src/main/scala/net/kemitix/thorp/config/package.scala b/config/src/main/scala/net/kemitix/thorp/config/package.scala deleted file mode 100644 index ca38a9c..0000000 --- a/config/src/main/scala/net/kemitix/thorp/config/package.scala +++ /dev/null @@ -1,29 +0,0 @@ -package net.kemitix.thorp - -import net.kemitix.thorp.domain.{Bucket, Filter, RemoteKey, Sources} -import zio.ZIO - -package object config { - - final val configService: ZIO[Config, Nothing, Config.Service] = - ZIO.access(_.config) - - final def setConfiguration( - config: Configuration): ZIO[Config, Nothing, Unit] = - ZIO.accessM(_.config setConfiguration config) - - final def isBatchMode: ZIO[Config, Nothing, Boolean] = - ZIO.accessM(_.config isBatchMode) - - final def getBucket: ZIO[Config, Nothing, Bucket] = - ZIO.accessM(_.config bucket) - - final def getPrefix: ZIO[Config, Nothing, RemoteKey] = - ZIO.accessM(_.config prefix) - - final def getSources: ZIO[Config, Nothing, Sources] = - ZIO.accessM(_.config sources) - - final def getFilters: ZIO[Config, Nothing, List[Filter]] = - ZIO.accessM(_.config filters) -} diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala index c885139..b3db88c 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala @@ -1,6 +1,6 @@ package net.kemitix.thorp.core -import net.kemitix.thorp.config._ +import net.kemitix.thorp.config.Config import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload} import net.kemitix.thorp.domain._ import zio.ZIO @@ -12,7 +12,7 @@ object ActionGenerator { previousActions: Stream[Action] ): ZIO[Config, Nothing, Stream[Action]] = for { - bucket <- getBucket + bucket <- Config.bucket } yield s3MetaData match { // #1 local exists, remote exists, remote matches - do nothing diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala index e19904a..900745d 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala @@ -2,7 +2,7 @@ package net.kemitix.thorp.core import java.nio.file.Path -import net.kemitix.thorp.config._ +import net.kemitix.thorp.config.Config import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.domain._ import net.kemitix.thorp.filesystem.FileSystem @@ -56,8 +56,8 @@ object LocalFileStream { private def localFile(hashService: HashService)(path: Path) = { val file = path.toFile for { - sources <- getSources - prefix <- getPrefix + sources <- Config.sources + prefix <- Config.prefix hash <- hashService.hashLocalObject(path) localFile = LocalFile(file, sources.forPath(path).toFile, @@ -78,7 +78,7 @@ object LocalFileStream { private def isIncluded(path: Path) = for { - filters <- getFilters + filters <- Config.filters } yield Filter.isIncluded(filters)(path) private def pathToLocalFile(hashService: HashService)(path: Path) = diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala index d3ee826..d613fa1 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala @@ -1,6 +1,6 @@ package net.kemitix.thorp.core -import net.kemitix.thorp.config._ +import net.kemitix.thorp.config.Config import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action._ import net.kemitix.thorp.domain._ @@ -72,9 +72,9 @@ trait PlanBuilder { private def createActionFromRemoteKey(remoteKey: RemoteKey) = for { - bucket <- getBucket - prefix <- getPrefix - sources <- getSources + bucket <- Config.bucket + prefix <- Config.prefix + sources <- Config.sources needsDeleted <- Remote.isMissingLocally(sources, prefix)(remoteKey) } yield if (needsDeleted) ToDelete(bucket, remoteKey, 0L) @@ -88,8 +88,8 @@ trait PlanBuilder { private def fetchRemoteData = for { - bucket <- getBucket - prefix <- getPrefix + bucket <- Config.bucket + prefix <- Config.prefix objects <- Storage.list(bucket, prefix) } yield objects @@ -101,7 +101,7 @@ trait PlanBuilder { private def findFiles(hashService: HashService) = for { - sources <- getSources + sources <- Config.sources paths = sources.paths found <- ZIO.foreach(paths)(path => LocalFileStream.findFiles(hashService)(path)) diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index 44f0300..bad5433 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -1,6 +1,6 @@ package net.kemitix.thorp.core -import net.kemitix.thorp.config._ +import net.kemitix.thorp.config.Config import net.kemitix.thorp.console._ import net.kemitix.thorp.domain.StorageQueueEvent.{ CopyQueueEvent, @@ -16,15 +16,15 @@ trait SyncLogging { def logRunStart: ZIO[Console with Config, Nothing, Unit] = for { - bucket <- getBucket - prefix <- getPrefix - sources <- getSources + bucket <- Config.bucket + prefix <- Config.prefix + sources <- Config.sources _ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources)) } yield () def logFileScan: ZIO[Config with Console, Nothing, Unit] = for { - sources <- getSources + sources <- Config.sources _ <- Console.putStrLn( s"Scanning local files: ${sources.paths.mkString(", ")}...") } yield () diff --git a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala index 03d0119..faaaecf 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala @@ -1,5 +1,6 @@ package net.kemitix.thorp.core +import net.kemitix.thorp.config.Config import net.kemitix.thorp.console._ import net.kemitix.thorp.domain.StorageQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.{ @@ -22,15 +23,15 @@ trait ThorpArchive { index: Int, action: Action, totalBytesSoFar: Long - ): TaskR[Storage with Console, StorageQueueEvent] + ): TaskR[Storage with Console with Config, StorageQueueEvent] def logEvent( - event: StorageQueueEvent, - batchMode: Boolean - ): TaskR[Console, Unit] = + event: StorageQueueEvent + ): TaskR[Console with Config, Unit] = event match { case UploadQueueEvent(remoteKey, _) => for { + batchMode <- Config.batchMode _ <- TaskR.when(batchMode)( Console.putStrLn(s"Uploaded: ${remoteKey.key}")) _ <- TaskR.when(!batchMode)( @@ -39,6 +40,7 @@ trait ThorpArchive { } yield () case CopyQueueEvent(sourceKey, targetKey) => for { + batchMode <- Config.batchMode _ <- TaskR.when(batchMode)( Console.putStrLn(s"Copied: ${sourceKey.key} => ${targetKey.key}")) _ <- TaskR.when(!batchMode)( @@ -48,13 +50,15 @@ trait ThorpArchive { } yield () case DeleteQueueEvent(remoteKey) => for { - _ <- TaskR.when(batchMode)(Console.putStrLn(s"Deleted: $remoteKey")) + batchMode <- Config.batchMode + _ <- TaskR.when(batchMode)(Console.putStrLn(s"Deleted: $remoteKey")) _ <- TaskR.when(!batchMode)( Console.putStrLn( s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen")) } yield () case ErrorQueueEvent(action, _, e) => for { + batchMode <- Config.batchMode _ <- TaskR.when(batchMode)( Console.putStrLn( s"${action.name} failed: ${action.keys}: ${e.getMessage}")) diff --git a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala index 5845fb6..ab9f7ae 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala @@ -1,5 +1,6 @@ package net.kemitix.thorp.core +import net.kemitix.thorp.config.Config import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent @@ -7,31 +8,29 @@ import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.Storage import zio.{Task, TaskR} -case class UnversionedMirrorArchive( - batchMode: Boolean, - syncTotals: SyncTotals -) extends ThorpArchive { +case class UnversionedMirrorArchive(syncTotals: SyncTotals) + extends ThorpArchive { override def update( index: Int, action: Action, totalBytesSoFar: Long - ): TaskR[Storage with Console, StorageQueueEvent] = + ): TaskR[Storage with Console with Config, StorageQueueEvent] = action match { case ToUpload(bucket, localFile, _) => for { event <- doUpload(index, totalBytesSoFar, bucket, localFile) - _ <- logEvent(event, batchMode) + _ <- logEvent(event) } yield event case ToCopy(bucket, sourceKey, hash, targetKey, _) => for { event <- Storage.copy(bucket, sourceKey, hash, targetKey) - _ <- logEvent(event, batchMode) + _ <- logEvent(event) } yield event case ToDelete(bucket, remoteKey, _) => for { event <- Storage.delete(bucket, remoteKey) - _ <- logEvent(event, batchMode) + _ <- logEvent(event) } yield event case DoNothing(_, remoteKey, _) => Task(DoNothingQueueEvent(remoteKey)) @@ -46,17 +45,13 @@ case class UnversionedMirrorArchive( Storage.upload( localFile, bucket, - batchMode, UploadEventListener(localFile, index, syncTotals, totalBytesSoFar), 1) } object UnversionedMirrorArchive { - def default( - batchMode: Boolean, - syncTotals: SyncTotals - ): Task[ThorpArchive] = + def default(syncTotals: SyncTotals): Task[ThorpArchive] = Task { - new UnversionedMirrorArchive(batchMode, syncTotals) + new UnversionedMirrorArchive(syncTotals) } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala index 813560a..4e97e2d 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala @@ -2,7 +2,13 @@ package net.kemitix.thorp.core import java.time.Instant -import net.kemitix.thorp.config._ +import net.kemitix.thorp.config.{ + Config, + ConfigOption, + ConfigOptions, + ConfigurationBuilder, + Resource +} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload} import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ @@ -170,7 +176,7 @@ class ActionGeneratorSuite extends FunSpec { def testProgram = for { config <- ConfigurationBuilder.buildConfig(configOptions) - _ <- setConfiguration(config) + _ <- Config.set(config) actions <- ActionGenerator.createActions(input, previousActions) } yield actions diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala index b59e62e..b2aeed6 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala @@ -22,7 +22,6 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData, override def upload(localFile: LocalFile, bucket: Bucket, - batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int): UIO[StorageQueueEvent] = { val (remoteKey, md5Hash) = uploadFiles(localFile.file) diff --git a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala index a9fcd31..a92d419 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala @@ -2,7 +2,13 @@ package net.kemitix.thorp.core import java.nio.file.Paths -import net.kemitix.thorp.config._ +import net.kemitix.thorp.config.{ + Config, + ConfigOption, + ConfigOptions, + ConfigurationBuilder, + Resource +} import net.kemitix.thorp.console._ import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain._ @@ -73,7 +79,7 @@ class LocalFileStreamSuite extends FunSpec { def testProgram = for { config <- ConfigurationBuilder.buildConfig(configOptions) - _ <- setConfiguration(config) + _ <- Config.set(config) files <- LocalFileStream.findFiles(hashService)(sourcePath) } yield files diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala index 8b9182e..650e855 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala @@ -3,7 +3,12 @@ package net.kemitix.thorp.core import java.io.File import java.nio.file.Path -import net.kemitix.thorp.config._ +import net.kemitix.thorp.config.{ + Config, + ConfigOption, + ConfigOptions, + ConfigurationBuilder +} import net.kemitix.thorp.console._ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.domain.HashType.MD5 @@ -378,7 +383,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder { def testProgram = for { config <- ConfigurationBuilder.buildConfig(configOptions) - _ <- setConfiguration(config) + _ <- Config.set(config) plan <- PlanBuilder.createPlan(hashService) } yield plan diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala index feffa12..ec388f7 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/Storage.scala @@ -1,5 +1,6 @@ package net.kemitix.thorp.storage.api +import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain._ import zio.{Task, TaskR, UIO, ZIO} @@ -19,10 +20,9 @@ object Storage { def upload( localFile: LocalFile, bucket: Bucket, - batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int - ): ZIO[Storage, Nothing, StorageQueueEvent] + ): ZIO[Storage with Config, Nothing, StorageQueueEvent] def copy( bucket: Bucket, @@ -57,7 +57,6 @@ object Storage { override def upload( localFile: LocalFile, bucket: Bucket, - batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int): ZIO[Storage, Nothing, StorageQueueEvent] = uploadResult @@ -100,12 +99,11 @@ object Storage { final def upload( localFile: LocalFile, bucket: Bucket, - batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int - ): ZIO[Storage, Nothing, StorageQueueEvent] = + ): ZIO[Storage with Config, Nothing, StorageQueueEvent] = ZIO.accessM( - _.storage upload (localFile, bucket, batchMode, uploadEventListener, tryCount)) + _.storage upload (localFile, bucket, uploadEventListener, tryCount)) final def copy( bucket: Bucket, diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala index 0f245fd..b89b4ef 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3Storage.scala @@ -2,12 +2,13 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.services.s3.AmazonS3ClientBuilder import com.amazonaws.services.s3.transfer.TransferManagerBuilder +import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.Storage import net.kemitix.thorp.storage.api.Storage.Service -import zio.{TaskR, UIO} +import zio.{TaskR, UIO, ZIO} object S3Storage { trait Live extends Storage { @@ -23,14 +24,13 @@ object S3Storage { prefix: RemoteKey): TaskR[Console, S3ObjectsData] = Lister.listObjects(client)(bucket, prefix) - override def upload(localFile: LocalFile, - bucket: Bucket, - batchMode: Boolean, - uploadEventListener: UploadEventListener, - tryCount: Int): UIO[StorageQueueEvent] = + override def upload( + localFile: LocalFile, + bucket: Bucket, + uploadEventListener: UploadEventListener, + tryCount: Int): ZIO[Config, Nothing, StorageQueueEvent] = Uploader.upload(transferManager)(localFile, bucket, - batchMode, uploadEventListener, 1) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index b1f5a1a..6bd1589 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -2,6 +2,7 @@ package net.kemitix.thorp.storage.aws import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} +import net.kemitix.thorp.config.Config import net.kemitix.thorp.domain.StorageQueueEvent.{ Action, ErrorQueueEvent, @@ -13,35 +14,38 @@ import net.kemitix.thorp.domain.UploadEvent.{ TransferEvent } import net.kemitix.thorp.domain.{StorageQueueEvent, _} -import zio.{Task, UIO} +import zio.{UIO, ZIO} trait Uploader { def upload(transferManager: => AmazonTransferManager)( localFile: LocalFile, bucket: Bucket, - batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int - ): UIO[StorageQueueEvent] = - transfer(transferManager)(localFile, bucket, batchMode, uploadEventListener) + ): ZIO[Config, Nothing, StorageQueueEvent] = + transfer(transferManager)(localFile, bucket, uploadEventListener) .catchAll(handleError(localFile.remoteKey)) - private def handleError( - remoteKey: RemoteKey): Throwable => UIO[ErrorQueueEvent] = { e => + private def handleError(remoteKey: RemoteKey)(e: Throwable) = UIO.succeed(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e)) - } private def transfer(transferManager: => AmazonTransferManager)( localFile: LocalFile, bucket: Bucket, - batchMode: Boolean, uploadEventListener: UploadEventListener - ): Task[StorageQueueEvent] = { - - val listener = progressListener(uploadEventListener) - val putObjectRequest = request(localFile, bucket, batchMode, listener) + ) = { + val listener = progressListener(uploadEventListener) + for { + putObjectRequest <- request(localFile, bucket, listener) + event <- dispatch(transferManager, putObjectRequest) + } yield event + } + private def dispatch( + transferManager: AmazonTransferManager, + putObjectRequest: PutObjectRequest + ) = { transferManager .upload(putObjectRequest) .map(_.waitForUploadResult) @@ -53,14 +57,16 @@ trait Uploader { private def request( localFile: LocalFile, bucket: Bucket, - batchMode: Boolean, listener: ProgressListener - ): PutObjectRequest = { + ) = { val request = new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) .withMetadata(metadata(localFile)) - if (batchMode) request - else request.withGeneralProgressListener(listener) + for { + batchMode <- Config.batchMode + r = if (batchMode) request + else request.withGeneralProgressListener(listener) + } yield r } private def metadata: LocalFile => ObjectMetadata = localFile => { diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala index dafea77..1fca764 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/AmazonS3ClientTestFixture.scala @@ -1,11 +1,12 @@ package net.kemitix.thorp.storage.aws +import net.kemitix.thorp.config.Config import net.kemitix.thorp.console.Console import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.Storage import org.scalamock.scalatest.MockFactory -import zio.{TaskR, UIO} +import zio.{TaskR, UIO, ZIO} trait AmazonS3ClientTestFixture extends MockFactory { @@ -27,14 +28,13 @@ trait AmazonS3ClientTestFixture extends MockFactory { prefix: RemoteKey): TaskR[Console, S3ObjectsData] = Lister.listObjects(client)(bucket, prefix) - override def upload(localFile: LocalFile, - bucket: Bucket, - batchMode: Boolean, - uploadEventListener: UploadEventListener, - tryCount: Int): UIO[StorageQueueEvent] = + override def upload( + localFile: LocalFile, + bucket: Bucket, + uploadEventListener: UploadEventListener, + tryCount: Int): ZIO[Config, Nothing, StorageQueueEvent] = Uploader.upload(transferManager)(localFile, bucket, - batchMode, uploadEventListener, 1) diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala index aa68d03..fa039f1 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderTest.scala @@ -5,8 +5,7 @@ import java.io.File import com.amazonaws.SdkClientException import com.amazonaws.services.s3.model.AmazonS3Exception import com.amazonaws.services.s3.transfer.model.UploadResult -import net.kemitix.thorp.config.Resource -import net.kemitix.thorp.console._ +import net.kemitix.thorp.config.{Config, Resource} import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.StorageQueueEvent.{ Action, @@ -16,13 +15,10 @@ import net.kemitix.thorp.domain.StorageQueueEvent.{ import net.kemitix.thorp.domain._ import org.scalamock.scalatest.MockFactory import org.scalatest.FreeSpec -import zio.internal.PlatformLive -import zio.{Runtime, Task} +import zio.{DefaultRuntime, Task} class UploaderTest extends FreeSpec with MockFactory { - private val runtime = Runtime(Console.Live, PlatformLive.Default) - "upload" - { val aSource: File = Resource(this, "") val aFile: File = Resource(this, "small-file") @@ -31,7 +27,6 @@ class UploaderTest extends FreeSpec with MockFactory { val remoteKey = RemoteKey("aRemoteKey") val localFile = LocalFile(aFile, aSource, hashes, remoteKey) val bucket = Bucket("aBucket") - val batchMode = false val tryCount = 1 val uploadResult = new UploadResult uploadResult.setKey(remoteKey.key) @@ -52,7 +47,6 @@ class UploaderTest extends FreeSpec with MockFactory { invoke(fixture.amazonS3TransferManager)( localFile, bucket, - batchMode, uploadEventListener, tryCount ) @@ -72,7 +66,6 @@ class UploaderTest extends FreeSpec with MockFactory { invoke(fixture.amazonS3TransferManager)( localFile, bucket, - batchMode, uploadEventListener, tryCount ) @@ -92,7 +85,6 @@ class UploaderTest extends FreeSpec with MockFactory { invoke(fixture.amazonS3TransferManager)( localFile, bucket, - batchMode, uploadEventListener, tryCount ) @@ -102,19 +94,22 @@ class UploaderTest extends FreeSpec with MockFactory { def invoke(transferManager: AmazonTransferManager)( localFile: LocalFile, bucket: Bucket, - batchMode: Boolean, uploadEventListener: UploadEventListener, tryCount: Int - ) = - runtime.unsafeRunSync { - Uploader.upload(transferManager)( - localFile, - bucket, - batchMode, - uploadEventListener, - tryCount - ) + ) = { + type TestEnv = Config + val testEnv: TestEnv = Config.Live + new DefaultRuntime {}.unsafeRunSync { + Uploader + .upload(transferManager)( + localFile, + bucket, + uploadEventListener, + tryCount + ) + .provide(testEnv) }.toEither + } } }