diff --git a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Client.scala b/aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Client.scala index c38e43c..497c93e 100644 --- a/aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Client.scala +++ b/aws-api/src/main/scala/net/kemitix/thorp/aws/api/S3Client.scala @@ -1,13 +1,14 @@ package net.kemitix.thorp.aws.api +import cats.effect.IO import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action} import net.kemitix.thorp.domain._ -trait S3Client[M[_]] { +trait S3Client { def listObjects(bucket: Bucket, prefix: RemoteKey - )(implicit logger: Logger[M]): M[S3ObjectsData] + )(implicit logger: Logger): IO[S3ObjectsData] def upload(localFile: LocalFile, bucket: Bucket, @@ -15,16 +16,16 @@ trait S3Client[M[_]] { multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit logger: Logger[M]): M[S3Action] + (implicit logger: Logger): IO[S3Action] def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit logger: Logger[M]): M[CopyS3Action] + )(implicit logger: Logger): IO[CopyS3Action] def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit logger: Logger[M]): M[DeleteS3Action] + )(implicit logger: Logger): IO[DeleteS3Action] } diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientBuilder.scala b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientBuilder.scala index ca75ef8..077776a 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientBuilder.scala +++ b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientBuilder.scala @@ -1,17 +1,16 @@ package net.kemitix.thorp.aws.lib -import cats.Monad import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder} import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} import net.kemitix.thorp.aws.api.S3Client object S3ClientBuilder { - def createClient[M[_]: Monad](amazonS3Client: AmazonS3, - amazonS3TransferManager: TransferManager): S3Client[M] = + def createClient(amazonS3Client: AmazonS3, + amazonS3TransferManager: TransferManager): S3Client = new ThorpS3Client(amazonS3Client, amazonS3TransferManager) - def defaultClient[M[_]: Monad]: S3Client[M] = + def defaultClient: S3Client = createClient(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager) } diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientCopier.scala b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientCopier.scala index b3662be..df8ab38 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientCopier.scala +++ b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientCopier.scala @@ -1,24 +1,23 @@ package net.kemitix.thorp.aws.lib -import cats.Monad -import cats.implicits._ +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.CopyObjectRequest import net.kemitix.thorp.aws.api.S3Action.CopyS3Action import net.kemitix.thorp.aws.lib.S3ClientLogging.{logCopyFinish, logCopyStart} import net.kemitix.thorp.domain.{Bucket, Logger, MD5Hash, RemoteKey} -class S3ClientCopier[M[_]: Monad](amazonS3: AmazonS3) { +class S3ClientCopier(amazonS3: AmazonS3) { def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit logger: Logger[M]): M[CopyS3Action] = + (implicit logger: Logger): IO[CopyS3Action] = for { - _ <- logCopyStart[M](bucket, sourceKey, targetKey) + _ <- logCopyStart(bucket, sourceKey, targetKey) _ <- copyObject(bucket, sourceKey, hash, targetKey) - _ <- logCopyFinish[M](bucket, sourceKey,targetKey) + _ <- logCopyFinish(bucket, sourceKey,targetKey) } yield CopyS3Action(targetKey) private def copyObject(bucket: Bucket, @@ -28,7 +27,7 @@ class S3ClientCopier[M[_]: Monad](amazonS3: AmazonS3) { val request = new CopyObjectRequest(bucket.name, sourceKey.key, bucket.name, targetKey.key) .withMatchingETagConstraint(hash.hash) - Monad[M].pure(amazonS3.copyObject(request)) + IO(amazonS3.copyObject(request)) } } diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientDeleter.scala b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientDeleter.scala index 59c7877..68ed98b 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientDeleter.scala +++ b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientDeleter.scala @@ -1,25 +1,24 @@ package net.kemitix.thorp.aws.lib -import cats.Monad -import cats.implicits._ +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.DeleteObjectRequest import net.kemitix.thorp.aws.api.S3Action.DeleteS3Action import net.kemitix.thorp.aws.lib.S3ClientLogging.{logDeleteFinish, logDeleteStart} import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey} -class S3ClientDeleter[M[_]: Monad](amazonS3: AmazonS3) { +class S3ClientDeleter(amazonS3: AmazonS3) { def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit logger: Logger[M]): M[DeleteS3Action] = + (implicit logger: Logger): IO[DeleteS3Action] = for { - _ <- logDeleteStart[M](bucket, remoteKey) + _ <- logDeleteStart(bucket, remoteKey) _ <- deleteObject(bucket, remoteKey) - _ <- logDeleteFinish[M](bucket, remoteKey) + _ <- logDeleteFinish(bucket, remoteKey) } yield DeleteS3Action(remoteKey) - private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = Monad[M].pure { - amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)) - } + private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = + IO(amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key))) + } diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientLogging.scala b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientLogging.scala index a7a9c63..7ada90e 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientLogging.scala @@ -1,40 +1,40 @@ package net.kemitix.thorp.aws.lib -import cats.Monad +import cats.effect.IO import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey} object S3ClientLogging { - def logListObjectsStart[M[_]: Monad](bucket: Bucket, + def logListObjectsStart(bucket: Bucket, prefix: RemoteKey) - (implicit logger: Logger[M]): M[Unit] = + (implicit logger: Logger): IO[Unit] = logger.info(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") - def logListObjectsFinish[M[_]: Monad](bucket: Bucket, + def logListObjectsFinish(bucket: Bucket, prefix: RemoteKey) - (implicit logger: Logger[M]): M[Unit] = + (implicit logger: Logger): IO[Unit] = logger.info(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") - def logCopyStart[M[_]: Monad](bucket: Bucket, - sourceKey: RemoteKey, - targetKey: RemoteKey) - (implicit logger: Logger[M]): M[Unit] = + def logCopyStart(bucket: Bucket, + sourceKey: RemoteKey, + targetKey: RemoteKey) + (implicit logger: Logger): IO[Unit] = logger.info(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - def logCopyFinish[M[_]: Monad](bucket: Bucket, + def logCopyFinish(bucket: Bucket, sourceKey: RemoteKey, targetKey: RemoteKey) - (implicit logger: Logger[M]): M[Unit] = + (implicit logger: Logger): IO[Unit] = logger.info(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - def logDeleteStart[M[_]: Monad](bucket: Bucket, + def logDeleteStart(bucket: Bucket, remoteKey: RemoteKey) - (implicit logger: Logger[M]): M[Unit] = + (implicit logger: Logger): IO[Unit] = logger.info(s"Delete: ${bucket.name}:${remoteKey.key}") - def logDeleteFinish[M[_]: Monad](bucket: Bucket, + def logDeleteFinish(bucket: Bucket, remoteKey: RemoteKey) - (implicit logger: Logger[M]): M[Unit] = + (implicit logger: Logger): IO[Unit] = logger.info(s"Deleted: ${bucket.name}:${remoteKey.key}") } diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientObjectLister.scala b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientObjectLister.scala index c58a0cb..cc54677 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientObjectLister.scala +++ b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/S3ClientObjectLister.scala @@ -1,7 +1,6 @@ package net.kemitix.thorp.aws.lib -import cats.Monad -import cats.implicits._ +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart} @@ -12,11 +11,11 @@ import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData} import scala.collection.JavaConverters._ -class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) { +class S3ClientObjectLister(amazonS3: AmazonS3) { def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit logger: Logger[M]): M[S3ObjectsData] = { + (implicit logger: Logger): IO[S3ObjectsData] = { type Token = String type Batch = (Stream[S3ObjectSummary], Option[Token]) @@ -26,8 +25,8 @@ class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) { .withPrefix(prefix.key) .withContinuationToken(token) - def fetchBatch: ListObjectsV2Request => M[Batch] = - request => Monad[M].pure { + def fetchBatch: ListObjectsV2Request => IO[Batch] = + request => IO.pure { val result = amazonS3.listObjectsV2(request) val more: Option[Token] = if (result.isTruncated) Some(result.getNextContinuationToken) @@ -35,14 +34,14 @@ class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) { (result.getObjectSummaries.asScala.toStream, more) } - def fetchMore(more: Option[Token]): M[Stream[S3ObjectSummary]] = { + def fetchMore(more: Option[Token]): IO[Stream[S3ObjectSummary]] = { more match { - case None => Monad[M].pure(Stream.empty) + case None => IO.pure(Stream.empty) case Some(token) => fetch(requestMore(token)) } } - def fetch: ListObjectsV2Request => M[Stream[S3ObjectSummary]] = + def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = request => for { batch <- fetchBatch(request) @@ -51,10 +50,10 @@ class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) { } yield summaries ++ rest for { - _ <- logListObjectsStart[M](bucket, prefix) + _ <- logListObjectsStart(bucket, prefix) r = new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key) summaries <- fetch(r) - _ <- logListObjectsFinish[M](bucket, prefix) + _ <- logListObjectsFinish(bucket, prefix) } yield domain.S3ObjectsData(byHash(summaries), byKey(summaries)) } diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/ThorpS3Client.scala b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/ThorpS3Client.scala index 7ceb671..7b721db 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/ThorpS3Client.scala +++ b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/ThorpS3Client.scala @@ -1,31 +1,31 @@ package net.kemitix.thorp.aws.lib -import cats.Monad +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer.TransferManager import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action} import net.kemitix.thorp.aws.api.{S3Action, S3Client, UploadProgressListener} import net.kemitix.thorp.domain._ -class ThorpS3Client[M[_]: Monad](amazonS3Client: => AmazonS3, +class ThorpS3Client(amazonS3Client: => AmazonS3, amazonS3TransferManager: => TransferManager) - extends S3Client[M] { + extends S3Client { - lazy val objectLister = new S3ClientObjectLister[M](amazonS3Client) - lazy val copier = new S3ClientCopier[M](amazonS3Client) - lazy val uploader = new Uploader[M](amazonS3TransferManager) - lazy val deleter = new S3ClientDeleter[M](amazonS3Client) + lazy val objectLister = new S3ClientObjectLister(amazonS3Client) + lazy val copier = new S3ClientCopier(amazonS3Client) + lazy val uploader = new Uploader(amazonS3TransferManager) + lazy val deleter = new S3ClientDeleter(amazonS3Client) override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit logger: Logger[M]): M[S3ObjectsData] = + (implicit logger: Logger): IO[S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit logger: Logger[M]): M[CopyS3Action] = + (implicit logger: Logger): IO[CopyS3Action] = copier.copy(bucket, sourceKey,hash, targetKey) override def upload(localFile: LocalFile, @@ -34,12 +34,12 @@ class ThorpS3Client[M[_]: Monad](amazonS3Client: => AmazonS3, multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit logger: Logger[M]): M[S3Action] = + (implicit logger: Logger): IO[S3Action] = uploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries) override def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit logger: Logger[M]): M[DeleteS3Action] = + (implicit logger: Logger): IO[DeleteS3Action] = deleter.delete(bucket, remoteKey) } diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/Uploader.scala b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/Uploader.scala index 704ef8c..7699884 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/Uploader.scala +++ b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/Uploader.scala @@ -1,7 +1,6 @@ package net.kemitix.thorp.aws.lib -import cats.Monad -import cats.implicits._ +import cats.effect.IO import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult @@ -14,7 +13,7 @@ import net.kemitix.thorp.domain._ import scala.util.Try -class Uploader[M[_]: Monad](transferManager: => AmazonTransferManager) { +class Uploader(transferManager: => AmazonTransferManager) { def accepts(localFile: LocalFile) (implicit multiPartThreshold: Long): Boolean = @@ -26,11 +25,11 @@ class Uploader[M[_]: Monad](transferManager: => AmazonTransferManager) { multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit logger: Logger[M]): M[S3Action] = + (implicit logger: Logger): IO[S3Action] = for { - _ <- logMultiPartUploadStart[M](localFile, tryCount) + _ <- logMultiPartUploadStart(localFile, tryCount) upload <- transfer(localFile, bucket, uploadProgressListener) - _ <- logMultiPartUploadFinished[M](localFile) + _ <- logMultiPartUploadFinished(localFile) } yield upload match { case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag)) case Left(e) => ErroredS3Action(localFile.remoteKey, e) @@ -39,10 +38,10 @@ class Uploader[M[_]: Monad](transferManager: => AmazonTransferManager) { private def transfer(localFile: LocalFile, bucket: Bucket, uploadProgressListener: UploadProgressListener, - ): M[Either[Throwable, UploadResult]] = { + ): IO[Either[Throwable, UploadResult]] = { val listener: ProgressListener = progressListener(uploadProgressListener) val putObjectRequest = request(localFile, bucket, listener) - Monad[M].pure { + IO { Try(transferManager.upload(putObjectRequest)) .map(_.waitForUploadResult) .toEither diff --git a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/UploaderLogging.scala b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/UploaderLogging.scala index 12c68e9..0324782 100644 --- a/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/UploaderLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/thorp/aws/lib/UploaderLogging.scala @@ -1,22 +1,22 @@ package net.kemitix.thorp.aws.lib -import cats.Monad +import cats.effect.IO import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish import net.kemitix.thorp.domain.Terminal.clearLine import net.kemitix.thorp.domain.{LocalFile, Logger} object UploaderLogging { - def logMultiPartUploadStart[M[_]: Monad](localFile: LocalFile, + def logMultiPartUploadStart(localFile: LocalFile, tryCount: Int) - (implicit logger: Logger[M]): M[Unit] = { + (implicit logger: Logger): IO[Unit] = { val tryMessage = if (tryCount == 1) "" else s"try $tryCount" val size = sizeInEnglish(localFile.file.length) logger.info(s"${clearLine}upload:$tryMessage:$size:${localFile.remoteKey.key}") } - def logMultiPartUploadFinished[M[_]: Monad](localFile: LocalFile) - (implicit logger: Logger[M]): M[Unit] = + def logMultiPartUploadFinished(localFile: LocalFile) + (implicit logger: Logger): IO[Unit] = logger.debug(s"upload:finished: ${localFile.remoteKey.key}") } diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/DummyLogger.scala b/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/DummyLogger.scala index 42b7507..236e704 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/DummyLogger.scala +++ b/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/DummyLogger.scala @@ -1,16 +1,18 @@ package net.kemitix.thorp.aws.lib -import cats.Monad +import cats.effect.IO import net.kemitix.thorp.domain.Logger -class DummyLogger[M[_]: Monad] extends Logger[M] { +class DummyLogger extends Logger { - override def debug(message: => String): M[Unit] = Monad[M].unit + override def debug(message: => String): IO[Unit] = IO.unit - override def info(message: =>String): M[Unit] = Monad[M].unit + override def info(message: =>String): IO[Unit] = IO.unit - override def warn(message: String): M[Unit] = Monad[M].unit + override def warn(message: String): IO[Unit] = IO.unit - override def error(message: String): M[Unit] = Monad[M].unit + override def error(message: String): IO[Unit] = IO.unit + + override def withDebug(debug: Boolean): Logger = this } diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ClientSuite.scala index 1e7d757..79ece93 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/S3ClientSuite.scala @@ -2,7 +2,6 @@ package net.kemitix.thorp.aws.lib import java.time.Instant -import cats.Id import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult @@ -23,7 +22,7 @@ class S3ClientSuite private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val implLogger: Logger[Id] = new DummyLogger[Id] + implicit private val implLogger: Logger = new DummyLogger private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ describe("getS3Status") { @@ -43,7 +42,7 @@ class S3ClientSuite keyotherkey.remoteKey -> HashModified(hash, lastModified), keydiffhash.remoteKey -> HashModified(diffhash, lastModified))) - def invoke(self: S3Client[Id], localFile: LocalFile) = { + def invoke(self: S3Client, localFile: LocalFile) = { S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData) } diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/ThorpS3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/ThorpS3ClientSuite.scala index 8cb3507..b62f172 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/ThorpS3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/ThorpS3ClientSuite.scala @@ -4,7 +4,6 @@ import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Date -import cats.Id import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary} import com.amazonaws.services.s3.transfer.TransferManager @@ -21,7 +20,7 @@ class ThorpS3ClientSuite val source = Resource(this, "upload") val prefix = RemoteKey("prefix") implicit val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit val implLogger: Logger[Id] = new DummyLogger[Id] + implicit val implLogger: Logger = new DummyLogger val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) @@ -48,7 +47,7 @@ class ThorpS3ClientSuite val amazonS3 = stub[AmazonS3] val amazonS3TransferManager = stub[TransferManager] - val s3Client = new ThorpS3Client[Id](amazonS3, amazonS3TransferManager) + val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager) val myFakeResponse = new ListObjectsV2Result() val summaries = myFakeResponse.getObjectSummaries @@ -66,7 +65,7 @@ class ThorpS3ClientSuite k1a -> HashModified(h1, lm), k1b -> HashModified(h1, lm), k2 -> HashModified(h2, lm))) - val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")) + val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync assertResult(expected.byHash.keys)(result.byHash.keys) assertResult(expected.byKey.keys)(result.byKey.keys) assertResult(expected)(result) diff --git a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/UploaderSuite.scala b/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/UploaderSuite.scala index c9f04fe..1d501d1 100644 --- a/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/UploaderSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/thorp/aws/lib/UploaderSuite.scala @@ -2,7 +2,6 @@ package net.kemitix.thorp.aws.lib import java.time.Instant -import cats.Id import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer._ import net.kemitix.thorp.aws.api.S3Action.UploadS3Action @@ -20,7 +19,7 @@ class UploaderSuite private val source = Resource(this, ".") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val implLogger: Logger[Id] = new DummyLogger[Id] + implicit private val implLogger: Logger = new DummyLogger private val fileToKey = generateKey(config.source, config.prefix) _ val lastModified = LastModified(Instant.now()) diff --git a/build.sbt b/build.sbt index 94ef26e..86102ff 100644 --- a/build.sbt +++ b/build.sbt @@ -27,19 +27,6 @@ val awsSdkDependencies = Seq( "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9" ) ) -val catsSettings = Seq ( - libraryDependencies ++= Seq( - "org.typelevel" %% "cats-core" % "1.6.1" - ), - // recommended for cats-effects - scalacOptions ++= Seq( - "-feature", - "-deprecation", - "-unchecked", - "-language:postfixOps", - "-language:higherKinds", - "-Ypartial-unification") -) val catsEffectsSettings = Seq( libraryDependencies ++= Seq( "org.typelevel" %% "cats-effect" % "1.3.1" @@ -56,11 +43,13 @@ val catsEffectsSettings = Seq( // cli -> thorp-lib -> aws-lib -> core -> aws-api -> domain +lazy val root = (project in file(".")) + .settings(commonSettings) + lazy val cli = (project in file("cli")) .settings(commonSettings) .settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main")) .settings(applicationSettings) - .settings(catsEffectsSettings) .aggregate(`thorp-lib`, `aws-lib`, core, `aws-api`, domain) .settings(commandLineParsing) .settings(testDependencies) @@ -87,10 +76,10 @@ lazy val core = (project in file("core")) lazy val `aws-api` = (project in file("aws-api")) .settings(commonSettings) .settings(assemblyJarName in assembly := "aws-api.jar") - .settings(catsSettings) .dependsOn(domain) lazy val domain = (project in file("domain")) .settings(commonSettings) .settings(assemblyJarName in assembly := "domain.jar") + .settings(catsEffectsSettings) .settings(testDependencies) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala index e9913ea..890512a 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Main.scala @@ -1,20 +1,14 @@ package net.kemitix.thorp.cli -import java.nio.file.Paths - import cats.effect.ExitCase.{Canceled, Completed, Error} import cats.effect.{ExitCode, IO, IOApp} -import net.kemitix.thorp.domain.Config object Main extends IOApp { - val defaultConfig: Config = - Config(source = Paths.get(".").toFile) - override def run(args: List[String]): IO[ExitCode] = { - val exitCaseLogger = new PrintLogger[IO](false) - ParseArgs(args, defaultConfig) - .map(Program[IO]) + val exitCaseLogger = new PrintLogger(false) + ParseArgs(args) + .map(Program(_)) .getOrElse(IO(ExitCode.Error)) .guaranteeCase { case Canceled => exitCaseLogger.warn("Interrupted") diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala b/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala index 3c6133d..2b4aecf 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala @@ -1,47 +1,42 @@ package net.kemitix.thorp.cli -import java.io.File import java.nio.file.Paths -import net.kemitix.thorp.domain.Filter.{Exclude, Include} -import net.kemitix.thorp.domain.{Bucket, Config, RemoteKey} +import net.kemitix.thorp.core.ConfigOption import scopt.OParser object ParseArgs { - val configParser: OParser[Unit, Config] = { - val parserBuilder = OParser.builder[Config] + val configParser: OParser[Unit, List[ConfigOption]] = { + val parserBuilder = OParser.builder[List[ConfigOption]] import parserBuilder._ OParser.sequence( programName("thorp"), head("thorp"), opt[String]('s', "source") - .action((str, c) => c.copy(source = Paths.get(str).toFile)) - .validate(s => if (new File(s).isDirectory) Right(()) else Left("Source is not a directory")) - .required() - .text("Source directory to sync to S3"), + .action((str, cos) => ConfigOption.Source(Paths.get(str)) :: cos) + .text("Source directory to sync to destination"), opt[String]('b', "bucket") - .action((str, c) => c.copy(bucket = Bucket(str))) - .required() + .action((str, cos) => ConfigOption.Bucket(str) :: cos) .text("S3 bucket name"), opt[String]('p', "prefix") - .action((str, c) => c.copy(prefix = RemoteKey(str))) + .action((str, cos) => ConfigOption.Prefix(str) :: cos) .text("Prefix within the S3 Bucket"), - opt[Seq[String]]('i', "include") + opt[String]('i', "include") .unbounded() - .action((str, c) => c.copy(filters = c.filters ++ str.map(Include))) + .action((str, cos) => ConfigOption.Include(str) :: cos) .text("Include only matching paths"), - opt[Seq[String]]('x', "exclude") + opt[String]('x', "exclude") .unbounded() - .action((str,c) => c.copy(filters = c.filters ++ str.map(Exclude))) + .action((str,cos) => ConfigOption.Exclude(str) :: cos) .text("Exclude matching paths"), opt[Unit]('d', "debug") - .action((_, c) => c.copy(debug = true)) + .action((_, cos) => ConfigOption.Debug() :: cos) .text("Enable debug logging") ) } - def apply(args: List[String], defaultConfig: Config): Option[Config] = - OParser.parse(configParser, args, defaultConfig) + def apply(args: List[String]): Option[List[ConfigOption]] = + OParser.parse(configParser, args, List()) } diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/PrintLogger.scala b/cli/src/main/scala/net/kemitix/thorp/cli/PrintLogger.scala index e37c15a..8b54d70 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/PrintLogger.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/PrintLogger.scala @@ -1,18 +1,22 @@ package net.kemitix.thorp.cli -import cats.Monad +import cats.effect.IO import net.kemitix.thorp.domain.Logger -class PrintLogger[M[_]: Monad](isDebug: Boolean) extends Logger[M] { +class PrintLogger(isDebug: Boolean = false) extends Logger { - override def debug(message: => String): M[Unit] = - if (isDebug) Monad[M].pure(println(s"[ DEBUG] $message")) - else Monad[M].unit + override def debug(message: => String): IO[Unit] = + if (isDebug) IO(println(s"[ DEBUG] $message")) + else IO.unit - override def info(message: => String): M[Unit] = Monad[M].pure(println(s"[ INFO] $message")) + override def info(message: => String): IO[Unit] = IO(println(s"[ INFO] $message")) - override def warn(message: String): M[Unit] = Monad[M].pure(println(s"[ WARN] $message")) + override def warn(message: String): IO[Unit] = IO(println(s"[ WARN] $message")) - override def error(message: String): M[Unit] = Monad[M].pure(println(s"[ ERROR] $message")) + override def error(message: String): IO[Unit] = IO(println(s"[ ERROR] $message")) + + override def withDebug(debug: Boolean): Logger = + if (isDebug == debug) this + else new PrintLogger(debug) } diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 2395f13..22d6843 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -1,20 +1,24 @@ package net.kemitix.thorp.cli -import cats.Monad -import cats.effect.ExitCode -import cats.implicits._ +import cats.effect.{ExitCode, IO} import net.kemitix.thorp.aws.lib.S3ClientBuilder -import net.kemitix.thorp.core.Sync -import net.kemitix.thorp.domain.{Config, Logger} +import net.kemitix.thorp.core.{ConfigOption, Sync} +import net.kemitix.thorp.domain.Logger -object Program { +trait Program { - def apply[M[_]: Monad](config: Config): M[ExitCode] = { - implicit val logger: Logger[M] = new PrintLogger[M](config.debug) - for { - _ <- logger.info("Thorp - hashed sync for cloud storage") - _ <- Sync.run[M](config, S3ClientBuilder.defaultClient) - } yield ExitCode.Success + def apply(configOptions: Seq[ConfigOption]): IO[ExitCode] = { + implicit val logger: Logger = new PrintLogger() + Sync(S3ClientBuilder.defaultClient)(configOptions) flatMap { + case Left(errors) => + for { + _ <- logger.error(s"There were errors:") + _ <- IO.pure(errors.map(error => logger.error(s" - $error"))) + } yield ExitCode.Error + case Right(_) => IO.pure(ExitCode.Success) + } } } + +object Program extends Program \ No newline at end of file diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala index 2efbec1..74efca9 100644 --- a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala @@ -1,7 +1,7 @@ package net.kemitix.thorp.cli -import net.kemitix.thorp.core.Resource -import net.kemitix.thorp.domain.Config +import net.kemitix.thorp.core.ConfigOption.Debug +import net.kemitix.thorp.core.{ConfigOption, Resource} import org.scalatest.FunSpec import scala.util.Try @@ -9,11 +9,10 @@ import scala.util.Try class ParseArgsTest extends FunSpec { val source = Resource(this, "") - val defaultConfig: Config = Config(source = source) describe("parse - source") { def invokeWithSource(path: String) = - ParseArgs(List("--source", path, "--bucket", "bucket"), defaultConfig) + ParseArgs(List("--source", path, "--bucket", "bucket")) describe("when source is a directory") { val result = invokeWithSource(pathTo(".")) @@ -21,52 +20,35 @@ class ParseArgsTest extends FunSpec { assert(result.isDefined) } } - describe("when source is a file") { - val result = invokeWithSource(pathTo("ParseArgs.class")) - it("should fail") { - assert(result.isEmpty) - } - } - describe("when source is not found") { - val result = invokeWithSource(pathTo("not-found")) - it("should fail") { - assert(result.isEmpty) - } - } describe("when source is a relative path to a directory") { it("should succeed") {pending} } - describe("when source is a relative path to a file") { - it("should fail") {pending} - } - describe("when source is a relative path to a missing path") { - it("should fail") {pending} - } } describe("parse - debug") { - def invokeWithDebug(debug: String) = { - val strings = List("--source", pathTo("."), "--bucket", "bucket", debug) + def invokeWithArgument(arg: String): List[ConfigOption] = { + val strings = List("--source", pathTo("."), "--bucket", "bucket", arg) .filter(_ != "") - ParseArgs(strings, defaultConfig).map(_.debug) + val maybeOptions = ParseArgs(strings) + maybeOptions.getOrElse(List()) } describe("when no debug flag") { - val debugFlag = invokeWithDebug("") + val configOptions = invokeWithArgument("") it("debug should be false") { - assert(debugFlag.contains(false)) + assertResult(false)(configOptions.contains(Debug())) } } describe("when long debug flag") { - val debugFlag = invokeWithDebug("--debug") + val configOptions = invokeWithArgument("--debug") it("debug should be true") { - assert(debugFlag.contains(true)) + assert(configOptions.contains(Debug())) } } describe("when short debug flag") { - val debugFlag = invokeWithDebug("-d") + val configOptions = invokeWithArgument("-d") it("debug should be true") { - assert(debugFlag.contains(true)) + assert(configOptions.contains(Debug())) } } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala b/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala index 2ba91dd..c743f1c 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ActionSubmitter.scala @@ -1,7 +1,6 @@ package net.kemitix.thorp.core -import cats.Monad -import cats.implicits._ +import cats.effect.IO import net.kemitix.thorp.aws.api.S3Action.DoNothingS3Action import net.kemitix.thorp.aws.api.{S3Action, S3Client, UploadProgressListener} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} @@ -9,9 +8,10 @@ import net.kemitix.thorp.domain.{Config, Logger} object ActionSubmitter { - def submitAction[M[_]: Monad](s3Client: S3Client[M], action: Action) - (implicit c: Config, - logger: Logger[M]): Stream[M[S3Action]] = { + def submitAction(s3Client: S3Client, + action: Action) + (implicit c: Config, + logger: Logger): Stream[IO[S3Action]] = { Stream( action match { case ToUpload(bucket, localFile) => @@ -31,7 +31,7 @@ object ActionSubmitter { action <- s3Client.delete(bucket, remoteKey) } yield action case DoNothing(bucket, remoteKey) => - Monad[M].pure(DoNothingS3Action(remoteKey)) + IO.pure(DoNothingS3Action(remoteKey)) }) } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala new file mode 100644 index 0000000..85590d9 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala @@ -0,0 +1,31 @@ +package net.kemitix.thorp.core + +import java.nio.file.Path + +import net.kemitix.thorp.domain +import net.kemitix.thorp.domain.{Config, RemoteKey} + +sealed trait ConfigOption { + def update(config: Config): Config +} + +object ConfigOption { + case class Source(path: Path) extends ConfigOption { + override def update(config: Config): Config = config.copy(source = path.toFile) + } + case class Bucket(name: String) extends ConfigOption { + override def update(config: Config): Config = config.copy(bucket = domain.Bucket(name)) + } + case class Prefix(path: String) extends ConfigOption { + override def update(config: Config): Config = config.copy(prefix = RemoteKey(path)) + } + case class Include(pattern: String) extends ConfigOption { + override def update(config: Config): Config = config.copy(filters = domain.Filter.Include(pattern) :: config.filters) + } + case class Exclude(pattern: String) extends ConfigOption { + override def update(config: Config): Config = config.copy(filters = domain.Filter.Exclude(pattern) :: config.filters) + } + case class Debug() extends ConfigOption { + override def update(config: Config): Config = config.copy(debug = true) + } +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigValidation.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidation.scala new file mode 100644 index 0000000..de9ad81 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidation.scala @@ -0,0 +1,22 @@ +package net.kemitix.thorp.core + +sealed trait ConfigValidation { + + def errorMessage: String +} + +object ConfigValidation { + + case object SourceIsNotADirectory extends ConfigValidation { + override def errorMessage: String = "Source must be a directory" + } + + case object SourceIsNotReadable extends ConfigValidation { + override def errorMessage: String = "Source must be readable" + } + + case object BucketNameIsMissing extends ConfigValidation { + override def errorMessage: String = "Bucket name is missing" + } + +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala new file mode 100644 index 0000000..5a6a48f --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala @@ -0,0 +1,35 @@ +package net.kemitix.thorp.core + +import java.io.File + +import cats.data.{NonEmptyChain, Validated, ValidatedNec} +import cats.implicits._ +import net.kemitix.thorp.domain.{Bucket, Config} + +sealed trait ConfigValidator { + + type ValidationResult[A] = ValidatedNec[ConfigValidation, A] + + def validateSourceIsDirectory(source: File): ValidationResult[File] = + if(source.isDirectory) source.validNec + else ConfigValidation.SourceIsNotADirectory.invalidNec + + def validateSourceIsReadable(source: File): ValidationResult[File] = + if(source.canRead) source.validNec + else ConfigValidation.SourceIsNotReadable.invalidNec + + def validateSource(source: File): ValidationResult[File] = + validateSourceIsDirectory(source).andThen(s => validateSourceIsReadable(s)) + + def validateBucket(bucket: Bucket): ValidationResult[Bucket] = + if (bucket.name.isEmpty) ConfigValidation.BucketNameIsMissing.invalidNec + else bucket.validNec + + def validateConfig(config: Config): Validated[NonEmptyChain[ConfigValidation], Config] = + ( + validateSource(config.source), + validateBucket(config.bucket) + ).mapN((_, _) => config) +} + +object ConfigValidator extends ConfigValidator diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala new file mode 100644 index 0000000..4b48bd6 --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala @@ -0,0 +1,47 @@ +package net.kemitix.thorp.core + +import java.io.File +import java.nio.file.Paths + +import cats.data.NonEmptyChain +import cats.effect.IO +import net.kemitix.thorp.core.ConfigValidator.validateConfig +import net.kemitix.thorp.domain.Config +import net.kemitix.thorp.core.ParseConfigFile.parseFile + +/** + * Builds a configuration from settings in a file within the + * `source` directory and from supplied configuration options. + */ +trait ConfigurationBuilder { + + private val pwdFile: File = Paths.get(System.getenv("PWD")).toFile + + private val defaultConfig: Config = Config(source = pwdFile) + + def buildConfig(priorityOptions: Seq[ConfigOption]): IO[Either[NonEmptyChain[ConfigValidation], Config]] = { + val source = findSource(priorityOptions) + for { + options <- sourceOptions(source) + collected = priorityOptions ++ options + config = collateOptions(collected) + } yield validateConfig(config).toEither + } + + private def findSource(priorityOptions: Seq[ConfigOption]): File = + priorityOptions.foldRight(pwdFile)((co, f) => co match { + case ConfigOption.Source(source) => source.toFile + case _ => f + }) + + private def sourceOptions(source: File): IO[Seq[ConfigOption]] = + readFile(source, ".thorp.conf") + + private def readFile(source: File, filename: String): IO[Seq[ConfigOption]] = + parseFile(source.toPath.resolve(filename)) + + private def collateOptions(configOptions: Seq[ConfigOption]): Config = + configOptions.foldRight(defaultConfig)((co, c) => co.update(c)) +} + +object ConfigurationBuilder extends ConfigurationBuilder diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala index 52ea218..5648dd9 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala @@ -3,25 +3,24 @@ package net.kemitix.thorp.core import java.io.File import java.nio.file.Path -import cats.Monad -import cats.implicits._ +import cats.effect.IO import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.domain import net.kemitix.thorp.domain._ object LocalFileStream { - def findFiles[M[_]: Monad](file: File, - md5HashGenerator: File => M[MD5Hash]) - (implicit c: Config, - logger: Logger[M]): M[Stream[LocalFile]] = { + def findFiles(file: File, + md5HashGenerator: File => IO[MD5Hash]) + (implicit c: Config, + logger: Logger): IO[Stream[LocalFile]] = { val filters: Path => Boolean = Filter.isIncluded(c.filters) - def loop(file: File): M[Stream[LocalFile]] = { + def loop(file: File): IO[Stream[LocalFile]] = { - def dirPaths(file: File): M[Stream[File]] = - Monad[M].pure { + def dirPaths(file: File): IO[Stream[File]] = + IO.pure { Option(file.listFiles) .getOrElse(throw new IllegalArgumentException(s"Directory not found $file")) } @@ -29,15 +28,15 @@ object LocalFileStream { Stream(fs: _*) .filter(f => filters(f.toPath))) - def recurseIntoSubDirectories(file: File)(implicit c: Config): M[Stream[LocalFile]] = + def recurseIntoSubDirectories(file: File)(implicit c: Config): IO[Stream[LocalFile]] = file match { case f if f.isDirectory => loop(file) case _ => for(hash <- md5HashGenerator(file)) yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix))) } - def recurse(fs: Stream[File]): M[Stream[LocalFile]] = - fs.foldLeft(Monad[M].pure(Stream.empty[LocalFile]))((acc, f) => + def recurse(fs: Stream[File]): IO[Stream[LocalFile]] = + fs.foldLeft(IO.pure(Stream.empty[LocalFile]))((acc, f) => recurseIntoSubDirectories(f) .flatMap(lfs => acc.map(s => s ++ lfs))) diff --git a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala index 321ec46..7cdc7b7 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala @@ -3,21 +3,20 @@ package net.kemitix.thorp.core import java.io.{File, FileInputStream} import java.security.MessageDigest -import cats.Monad -import cats.implicits._ +import cats.effect.IO import net.kemitix.thorp.domain.{Logger, MD5Hash} import scala.collection.immutable.NumericRange object MD5HashGenerator { - def md5File[M[_]: Monad](file: File) - (implicit logger: Logger[M]): M[MD5Hash] = { + def md5File(file: File) + (implicit logger: Logger): IO[MD5Hash] = { val maxBufferSize = 8048 val defaultBuffer = new Array[Byte](maxBufferSize) - def openFile = Monad[M].pure(new FileInputStream(file)) - def closeFile = {fis: FileInputStream => Monad[M].pure(fis.close())} + def openFile = IO.pure(new FileInputStream(file)) + def closeFile = {fis: FileInputStream => IO(fis.close())} def nextChunkSize(currentOffset: Long) = { // a value between 1 and maxBufferSize @@ -36,7 +35,7 @@ object MD5HashGenerator { } def digestFile(fis: FileInputStream) = - Monad[M].pure { + IO { val md5 = MessageDigest getInstance "MD5" NumericRange(0, file.length, maxBufferSize) .foreach { currentOffset => { @@ -46,7 +45,7 @@ object MD5HashGenerator { (md5.digest map ("%02x" format _)).mkString } - def readFile: M[String] = + def readFile: IO[String] = for { fis <- openFile md5 <- digestFile(fis) diff --git a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala new file mode 100644 index 0000000..935065a --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigFile.scala @@ -0,0 +1,28 @@ +package net.kemitix.thorp.core + +import java.nio.file.{Files, Path} + +import cats.effect.IO + +import scala.collection.JavaConverters._ + +trait ParseConfigFile { + + def parseFile(filename: Path): IO[Seq[ConfigOption]] = + readFile(filename).map(ParseConfigLines.parseLines) + + private def readFile(filename: Path) = { + if (Files.exists(filename)) readFileThatExists(filename) + else IO.pure(List()) + } + + private def readFileThatExists(filename: Path) = + for { + lines <- IO(Files.lines(filename)) + list = lines.iterator.asScala.toList + _ <- IO(lines.close()) + } yield list + +} + +object ParseConfigFile extends ParseConfigFile \ No newline at end of file diff --git a/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala new file mode 100644 index 0000000..7e273bb --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/ParseConfigLines.scala @@ -0,0 +1,43 @@ +package net.kemitix.thorp.core + +import java.nio.file.Paths +import java.util.regex.Pattern + +import net.kemitix.thorp.core.ConfigOption.{Bucket, Debug, Exclude, Include, Prefix, Source} + +trait ParseConfigLines { + + def parseLines(lines: List[String]): List[ConfigOption] = + lines.flatMap(parseLine) + + private val pattern = "^\\s*(?\\S*)\\s*=\\s*(?\\S*)\\s*$" + private val format = Pattern.compile(pattern) + + private def parseLine(str: String) = + format.matcher(str) match { + case m if m.matches => parseKeyValue(m.group("key"), m.group("value")) + case _ =>None + } + + def truthy(value: String): Boolean = + value.toLowerCase match { + case "true" => true + case "yes" => true + case "enabled" => true + case _ => false + } + + private def parseKeyValue(key: String, value: String): Option[ConfigOption] = + key.toLowerCase match { + case "source" => Some(Source(Paths.get(value))) + case "bucket" => Some(Bucket(value)) + case "prefix" => Some(Prefix(value)) + case "include" => Some(Include(value)) + case "exclude" => Some(Exclude(value)) + case "debug" => if (truthy(value)) Some(Debug()) else None + case _ => None + } + +} + +object ParseConfigLines extends ParseConfigLines diff --git a/core/src/main/scala/net/kemitix/thorp/core/Sync.scala b/core/src/main/scala/net/kemitix/thorp/core/Sync.scala index 41cd855..bfc3bf7 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Sync.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/Sync.scala @@ -1,36 +1,55 @@ package net.kemitix.thorp.core -import cats.Monad +import cats.effect.IO import cats.implicits._ import net.kemitix.thorp.aws.api.{S3Action, S3Client} import net.kemitix.thorp.core.Action.ToDelete import net.kemitix.thorp.core.ActionGenerator.createActions import net.kemitix.thorp.core.ActionSubmitter.submitAction +import net.kemitix.thorp.core.ConfigurationBuilder.buildConfig import net.kemitix.thorp.core.LocalFileStream.findFiles import net.kemitix.thorp.core.S3MetaDataEnricher.getMetadata import net.kemitix.thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart} import net.kemitix.thorp.domain._ -object Sync { +trait Sync { - def run[M[_]: Monad](config: Config, - s3Client: S3Client[M]) - (implicit logger: Logger[M]): M[Unit] = { + def errorMessages(errors: List[ConfigValidation]): List[String] = { + for { + errorMessages <- errors.map(cv => cv.errorMessage) + } yield errorMessages + } - implicit val c: Config = config + def apply(s3Client: S3Client) + (configOptions: Seq[ConfigOption]) + (implicit defaultLogger: Logger): IO[Either[List[String], Unit]] = + buildConfig(configOptions).flatMap { + case Left(errors) => IO.pure(Left(errorMessages(errors.toList))) + case Right(config) => + for { + _ <- run(config, s3Client, defaultLogger.withDebug(config.debug)) + } yield Right(()) + } + + private def run(cliConfig: Config, + s3Client: S3Client, + logger: Logger): IO[Unit] = { + + implicit val c: Config = cliConfig + implicit val l: Logger = logger def metaData(s3Data: S3ObjectsData, sFiles: Stream[LocalFile]) = - Monad[M].pure(sFiles.map(file => getMetadata(file, s3Data))) + IO.pure(sFiles.map(file => getMetadata(file, s3Data))) def actions(sData: Stream[S3MetaData]) = - Monad[M].pure(sData.flatMap(s3MetaData => createActions(s3MetaData))) + IO.pure(sData.flatMap(s3MetaData => createActions(s3MetaData))) def submit(sActions: Stream[Action]) = - Monad[M].pure(sActions.flatMap(action => submitAction[M](s3Client, action))) + IO(sActions.flatMap(action => submitAction(s3Client, action))) - def copyUploadActions(s3Data: S3ObjectsData): M[Stream[S3Action]] = + def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] = (for { - files <- findFiles(c.source, MD5HashGenerator.md5File[M](_)) + files <- findFiles(c.source, MD5HashGenerator.md5File(_)) metaData <- metaData(s3Data, files) actions <- actions(metaData) s3Actions <- submit(actions) @@ -38,23 +57,24 @@ object Sync { .flatten .map(streamS3Actions => streamS3Actions.sorted) - def deleteActions(s3ObjectsData: S3ObjectsData): M[Stream[S3Action]] = + def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[S3Action]] = (for { key <- s3ObjectsData.byKey.keys if key.isMissingLocally(c.source, c.prefix) - ioDelAction <- submitAction[M](s3Client, ToDelete(c.bucket, key)) + ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key)) } yield ioDelAction) .toStream .sequence for { - _ <- logRunStart[M] + _ <- logRunStart s3data <- s3Client.listObjects(c.bucket, c.prefix) - _ <- logFileScan[M] + _ <- logFileScan copyUploadActions <- copyUploadActions(s3data) deleteActions <- deleteActions(s3data) - _ <- logRunFinished[M](copyUploadActions ++ deleteActions) + _ <- logRunFinished(copyUploadActions ++ deleteActions) } yield () } - } + +object Sync extends Sync diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index 3c077bb..80e940b 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -1,7 +1,6 @@ package net.kemitix.thorp.core -import cats.Monad -import cats.implicits._ +import cats.effect.IO import net.kemitix.thorp.aws.api.S3Action import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action} import net.kemitix.thorp.domain.{Config, Logger} @@ -9,17 +8,17 @@ import net.kemitix.thorp.domain.{Config, Logger} // Logging for the Sync class object SyncLogging { - def logRunStart[M[_]: Monad](implicit c: Config, - logger: Logger[M]): M[Unit] = + def logRunStart(implicit c: Config, + logger: Logger): IO[Unit] = logger.info(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ") - def logFileScan[M[_]: Monad](implicit c: Config, - logger: Logger[M]): M[Unit] = + def logFileScan(implicit c: Config, + logger: Logger): IO[Unit] = logger.info(s"Scanning local files: ${c.source}...") - def logRunFinished[M[_]: Monad](actions: Stream[S3Action]) - (implicit c: Config, - logger: Logger[M]): M[Unit] = { + def logRunFinished(actions: Stream[S3Action]) + (implicit c: Config, + logger: Logger): IO[Unit] = { val counters = actions.foldLeft(Counters())(countActivities) for { _ <- logger.info(s"Uploaded ${counters.uploaded} files") diff --git a/core/src/test/resources/net/kemitix/thorp/core/empty-file b/core/src/test/resources/net/kemitix/thorp/core/empty-file new file mode 100644 index 0000000..e69de29 diff --git a/core/src/test/resources/net/kemitix/thorp/core/invalid-config b/core/src/test/resources/net/kemitix/thorp/core/invalid-config new file mode 100644 index 0000000..7d5b540 --- /dev/null +++ b/core/src/test/resources/net/kemitix/thorp/core/invalid-config @@ -0,0 +1 @@ +no valid = config items diff --git a/core/src/test/resources/net/kemitix/thorp/core/simple-config b/core/src/test/resources/net/kemitix/thorp/core/simple-config new file mode 100644 index 0000000..fcac950 --- /dev/null +++ b/core/src/test/resources/net/kemitix/thorp/core/simple-config @@ -0,0 +1,2 @@ +source = /path/to/source +bucket = bucket-name diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyLogger.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyLogger.scala index de2449e..4825a31 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyLogger.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/DummyLogger.scala @@ -1,16 +1,18 @@ package net.kemitix.thorp.core -import cats.Monad +import cats.effect.IO import net.kemitix.thorp.domain.Logger -class DummyLogger[M[_]: Monad] extends Logger[M] { +class DummyLogger extends Logger { - override def debug(message: => String): M[Unit] = Monad[M].unit + override def debug(message: => String): IO[Unit] = IO.unit - override def info(message: =>String): M[Unit] = Monad[M].unit + override def info(message: =>String): IO[Unit] = IO.unit - override def warn(message: String): M[Unit] = Monad[M].unit + override def warn(message: String): IO[Unit] = IO.unit - override def error(message: String): M[Unit] = Monad[M].unit + override def error(message: String): IO[Unit] = IO.unit + + override def withDebug(debug: Boolean): Logger = this } diff --git a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala index beddece..f5485b7 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala @@ -2,7 +2,7 @@ package net.kemitix.thorp.core import java.io.File -import cats.Id +import cats.effect.IO import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5Hash} import org.scalatest.FunSpec @@ -10,13 +10,13 @@ class LocalFileStreamSuite extends FunSpec { val uploadResource = Resource(this, "upload") implicit val config: Config = Config(source = uploadResource) - implicit private val logger: Logger[Id] = new DummyLogger[Id] - val md5HashGenerator: File => Id[MD5Hash] = file => MD5HashGenerator.md5File[Id](file) + implicit private val logger: Logger = new DummyLogger + val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file) describe("findFiles") { it("should find all files") { val result: Set[String] = - LocalFileStream.findFiles[Id](uploadResource, md5HashGenerator).toSet + LocalFileStream.findFiles(uploadResource, md5HashGenerator).unsafeRunSync.toSet .map { x: LocalFile => x.relative.toString } assertResult(Set("subdir/leaf-file", "root-file"))(result) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala index 4bbc8ec..607a3d1 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala @@ -1,6 +1,5 @@ package net.kemitix.thorp.core -import cats.Id import net.kemitix.thorp.core.MD5HashData.rootHash import net.kemitix.thorp.domain._ import org.scalatest.FunSpec @@ -10,12 +9,12 @@ class MD5HashGeneratorTest extends FunSpec { private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logger: Logger[Id] = new DummyLogger[Id] + implicit private val logger: Logger = new DummyLogger describe("read a small file (smaller than buffer)") { val file = Resource(this, "upload/root-file") it("should generate the correct hash") { - val result = MD5HashGenerator.md5File[Id](file) + val result = MD5HashGenerator.md5File(file).unsafeRunSync assertResult(rootHash)(result) } } @@ -23,7 +22,7 @@ class MD5HashGeneratorTest extends FunSpec { val file = Resource(this, "big-file") it("should generate the correct hash") { val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8") - val result = MD5HashGenerator.md5File[Id](file) + val result = MD5HashGenerator.md5File(file).unsafeRunSync assertResult(expected)(result) } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala new file mode 100644 index 0000000..7a11de4 --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigFileTest.scala @@ -0,0 +1,37 @@ +package net.kemitix.thorp.core + +import java.nio.file.{Path, Paths} + +import org.scalatest.FunSpec + +class ParseConfigFileTest extends FunSpec { + + private def invoke(filename: Path) = ParseConfigFile.parseFile(filename).unsafeRunSync + private val empty = List() + + describe("parse a missing file") { + val filename = Paths.get("/path/to/missing/file") + it("should return no options") { + assertResult(empty)(invoke(filename)) + } + } + describe("parse an empty file") { + val filename = Resource(this, "empty-file").toPath + it("should return no options") { + assertResult(empty)(invoke(filename)) + } + } + describe("parse a file with no valid entries") { + val filename = Resource(this, "invalid-config").toPath + it("should return no options") { + assertResult(empty)(invoke(filename)) + } + } + describe("parse a file with properties") { + val filename = Resource(this, "simple-config").toPath + val expected = List(ConfigOption.Source(Paths.get("/path/to/source")), ConfigOption.Bucket("bucket-name")) + it("should return some options") { + assertResult(expected)(invoke(filename)) + } + } +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala new file mode 100644 index 0000000..525a858 --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/ParseConfigLinesTest.scala @@ -0,0 +1,74 @@ +package net.kemitix.thorp.core + +import java.nio.file.Paths + +import org.scalatest.FunSpec + +class ParseConfigLinesTest extends FunSpec { + + describe("parse single lines") { + describe("source") { + it("should parse") { + val expected = List(ConfigOption.Source(Paths.get("/path/to/source"))) + val result = ParseConfigLines.parseLines(List("source = /path/to/source")) + assertResult(expected)(result) + } + } + describe("bucket") { + it("should parse") { + val expected = List(ConfigOption.Bucket("bucket-name")) + val result = ParseConfigLines.parseLines(List("bucket = bucket-name")) + assertResult(expected)(result) + } + } + describe("prefix") { + it("should parse") { + val expected = List(ConfigOption.Prefix("prefix/to/files")) + val result = ParseConfigLines.parseLines(List("prefix = prefix/to/files")) + assertResult(expected)(result) + } + } + describe("include") { + it("should parse") { + val expected = List(ConfigOption.Include("path/to/include")) + val result = ParseConfigLines.parseLines(List("include = path/to/include")) + assertResult(expected)(result) + } + } + describe("exclude") { + it("should parse") { + val expected = List(ConfigOption.Exclude("path/to/exclude")) + val result = ParseConfigLines.parseLines(List("exclude = path/to/exclude")) + assertResult(expected)(result) + } + } + describe("debug - true") { + it("should parse") { + val expected = List(ConfigOption.Debug()) + val result = ParseConfigLines.parseLines(List("debug = true")) + assertResult(expected)(result) + } + } + describe("debug - false") { + it("should parse") { + val expected = List() + val result = ParseConfigLines.parseLines(List("debug = false")) + assertResult(expected)(result) + } + } + describe("comment line") { + it("should be ignored") { + val expected = List() + val result = ParseConfigLines.parseLines(List("# ignore me")) + assertResult(expected)(result) + } + } + describe("unrecognised option") { + it("should be ignored") { + val expected = List() + val result = ParseConfigLines.parseLines(List("unsupported = option")) + assertResult(expected)(result) + } + } + } +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index bc28da7..dcf789e 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -3,7 +3,7 @@ package net.kemitix.thorp.core import java.io.File import java.time.Instant -import cats.Id +import cats.effect.IO import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action} import net.kemitix.thorp.aws.api.{S3Client, UploadProgressListener} import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash} @@ -16,23 +16,32 @@ class SyncSuite private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") - val config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logger: Logger[Id] = new DummyLogger[Id] + private val configOptions = List( + ConfigOption.Source(source.toPath), + ConfigOption.Bucket("bucket"), + ConfigOption.Prefix("prefix") + ) + implicit private val logger: Logger = new DummyLogger private val lastModified = LastModified(Instant.now) def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile): (String, String, File) = (bucket.name, remoteKey.key, localFile.file) - describe("run") { + describe("Sync.apply") { val testBucket = Bucket("bucket") // source contains the files root-file and subdir/leaf-file val rootRemoteKey = RemoteKey("prefix/root-file") val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") + + def invokeSubject(s3Client: RecordingClient, configOptions: List[ConfigOption]) = { + Sync(s3Client)(configOptions).unsafeRunSync + } + describe("when all files should be uploaded") { val s3Client = new RecordingClient(testBucket, S3ObjectsData( byHash = Map(), byKey = Map())) - Sync.run(config, s3Client) + invokeSubject(s3Client, configOptions) it("uploads all files") { val expectedUploads = Map( "subdir/leaf-file" -> leafRemoteKey, @@ -58,7 +67,7 @@ class SyncSuite RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(config, s3Client) + invokeSubject(s3Client, configOptions) it("uploads nothing") { val expectedUploads = Map() assertResult(expectedUploads)(s3Client.uploadsRecord) @@ -82,7 +91,7 @@ class SyncSuite RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified), RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(config, s3Client) + invokeSubject(s3Client, configOptions) it("uploads nothing") { val expectedUploads = Map() assertResult(expectedUploads)(s3Client.uploadsRecord) @@ -110,17 +119,16 @@ class SyncSuite byKey = Map( deletedKey -> HashModified(deletedHash, lastModified))) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(config, s3Client) + invokeSubject(s3Client, configOptions) it("deleted key") { val expectedDeletions = Set(deletedKey) assertResult(expectedDeletions)(s3Client.deletionsRecord) } } describe("when a file is excluded") { - val config: Config = Config(Bucket("bucket"), prefix, source = source, filters = List(Exclude("leaf"))) val s3ObjectsData = S3ObjectsData(Map(), Map()) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(config, s3Client) + invokeSubject(s3Client, ConfigOption.Exclude("leaf") :: configOptions) it("is not uploaded") { val expectedUploads = Map( "root-file" -> rootRemoteKey @@ -132,7 +140,7 @@ class SyncSuite class RecordingClient(testBucket: Bucket, s3ObjectsData: S3ObjectsData) - extends S3Client[Id] { + extends S3Client { var uploadsRecord: Map[String, RemoteKey] = Map() var copiesRecord: Map[RemoteKey, RemoteKey] = Map() @@ -140,8 +148,8 @@ class SyncSuite override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit logger: Logger[Id]): S3ObjectsData = - s3ObjectsData + (implicit logger: Logger): IO[S3ObjectsData] = + IO.pure(s3ObjectsData) override def upload(localFile: LocalFile, bucket: Bucket, @@ -149,28 +157,28 @@ class SyncSuite multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit logger: Logger[Id]): UploadS3Action = { + (implicit logger: Logger): IO[UploadS3Action] = { if (bucket == testBucket) uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) - UploadS3Action(localFile.remoteKey, MD5Hash("some hash value")) + IO.pure(UploadS3Action(localFile.remoteKey, MD5Hash("some hash value"))) } override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit logger: Logger[Id]): CopyS3Action = { + )(implicit logger: Logger): IO[CopyS3Action] = { if (bucket == testBucket) copiesRecord += (sourceKey -> targetKey) - CopyS3Action(targetKey) + IO.pure(CopyS3Action(targetKey)) } override def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit logger: Logger[Id]): DeleteS3Action = { + )(implicit logger: Logger): IO[DeleteS3Action] = { if (bucket == testBucket) deletionsRecord += remoteKey - DeleteS3Action(remoteKey) + IO.pure(DeleteS3Action(remoteKey)) } } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala index 072ae70..282c7b3 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala @@ -11,6 +11,5 @@ final case class Config( debug: Boolean = false, source: File ) { - require(source.isDirectory, s"Source must be a directory: $source") require(multiPartThreshold >= 1024 * 1024 * 5, s"Threshold for multi-part upload is 5Mb: '$multiPartThreshold'") } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Logger.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Logger.scala index d3196c1..6e13b78 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Logger.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Logger.scala @@ -1,10 +1,17 @@ package net.kemitix.thorp.domain -trait Logger[M[_]] { +import cats.effect.IO - def debug(message: => String): M[Unit] - def info(message: => String): M[Unit] - def warn(message: String): M[Unit] - def error(message: String): M[Unit] +trait Logger { + + // returns an instance of Logger with debug set as indicated + // where the current Logger already matches this state, then + // it returns itself, unmodified + def withDebug(debug: Boolean): Logger + + def debug(message: => String): IO[Unit] + def info(message: => String): IO[Unit] + def warn(message: String): IO[Unit] + def error(message: String): IO[Unit] } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala b/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala index 599b6fe..f75cb8c 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala @@ -4,10 +4,21 @@ import java.io.File import java.nio.file.Paths final case class RemoteKey(key: String) { + def asFile(source: File, prefix: RemoteKey): File = - source.toPath.resolve(Paths.get(prefix.key).relativize(Paths.get(key))).toFile + source.toPath.resolve(relativeTo(prefix)).toFile + + private def relativeTo(prefix: RemoteKey) = { + prefix match { + case RemoteKey("") => Paths.get(prefix.key) + case _ => Paths.get(prefix.key).relativize(Paths.get(key)) + } + } + def isMissingLocally(source: File, prefix: RemoteKey): Boolean = ! asFile(source, prefix).exists + def resolve(path: String): RemoteKey = RemoteKey(key + "/" + path) + }