From 8d0c3e23c9f155b1f76fd52054513a490e84bb54 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Fri, 14 Jun 2019 08:51:05 +0100 Subject: [PATCH] Improve purity by moving all IO out to the edge (#52) * [aws-api] TTFI UploadProgressLogging#logTransfer * [aws-api] TTFI UploadProgressLogging#logRequestCycle * [aws-api] TTFI UploadProgressLogging#logByteTransfer * [aws-api] TTFI UploadProgressListener * [aws-lib] TTFI UploaderLogging * [aws-api] UploadProgressListener refactoring * [aws-api] UploadProgressListener remove IO/Monad This class is a wrapper for a callback method used by the AWS SDK. Unfortunately you can't get the IO() created when that listener is called by the SDK, so unless we manually run unsafeRunSync, as we have done previously, it would never be executed. This removes the IO monad entirely and simply runs the code when the callback is triggered. * [aws-lib] S3ClientLogging remove unused method * [aws-lib] TTFI S3ClientLogging * [aws-lib] TTFI S3ClientCopier * [aws-lib] TTFI S3ClientObjectLister * [aws-lib] TTFI Uploader * [aws-lib] TTFI S3ClientDeleter * [aws-api] TTFI S3Client * [aws-lib] TTFI S3ClientBuilder and ThorpS3Client * [core] TTFI ActionSubmitter * [cli] TTFI Logger * [core] TTFI MD5HashGenerator * [core] TTFI LocalFileStream * [core] Sync refactoring * [core] TTFI Sync * [aws-lib] S3ObjectsByHashSuite truncate lastmodified to match Date * [aws-lib] ThorpS3ClientSuite truncate lastmodified to match Date * [core] MD5HashGeneratorTest switch to Id from IO * [sbt] restrict cats-effect to cli module, cats-core elsewhere * [core] MD5HashGenerator collapse lines --- .../kemitix/s3thorp/aws/api/S3Client.scala | 13 ++-- .../aws/api/UploadProgressListener.scala | 10 +-- .../aws/api/UploadProgressLogging.scala | 22 +++--- .../s3thorp/aws/lib/S3ClientBuilder.scala | 7 +- .../s3thorp/aws/lib/S3ClientCopier.scala | 32 +++++---- .../s3thorp/aws/lib/S3ClientDeleter.scala | 13 ++-- .../s3thorp/aws/lib/S3ClientLogging.scala | 41 +++++------ .../aws/lib/S3ClientObjectLister.scala | 43 ++++++------ .../s3thorp/aws/lib/ThorpS3Client.scala | 24 +++---- .../kemitix/s3thorp/aws/lib/Uploader.scala | 49 ++++++------- .../s3thorp/aws/lib/UploaderLogging.scala | 10 +-- .../s3thorp/aws/lib/S3ClientSuite.scala | 10 +-- .../aws/lib/S3ObjectsByHashSuite.scala | 3 +- .../s3thorp/aws/lib/ThorpS3ClientSuite.scala | 11 +-- .../s3thorp/aws/lib/UploaderSuite.scala | 8 +-- build.sbt | 16 ++++- .../net/kemitix/s3thorp/cli/Logger.scala | 15 ++-- .../scala/net/kemitix/s3thorp/cli/Main.scala | 20 +++--- .../ActionSubmitter.scala | 13 ++-- .../LocalFileStream.scala | 27 ++++---- .../MD5HashGenerator.scala | 42 ++++++------ .../scala/net.kemitix.s3thorp.core/Sync.scala | 48 ++++++++----- .../SyncLogging.scala | 21 +++--- .../s3thorp/core/LocalFileStreamSuite.scala | 10 +-- .../s3thorp/core/MD5HashGeneratorTest.scala | 10 ++- .../net/kemitix/s3thorp/core/SyncSuite.scala | 68 +++++++++---------- 26 files changed, 304 insertions(+), 282 deletions(-) diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala index 0b0b9ff..ebe2fb1 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala @@ -1,14 +1,13 @@ package net.kemitix.s3thorp.aws.api -import cats.effect.IO import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action} import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey, S3ObjectsData} -trait S3Client { +trait S3Client[M[_]] { def listObjects(bucket: Bucket, prefix: RemoteKey - )(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] + )(implicit info: Int => String => M[Unit]): M[S3ObjectsData] def upload(localFile: LocalFile, bucket: Bucket, @@ -16,17 +15,17 @@ trait S3Client { multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => IO[Unit], - warn: String => IO[Unit]): IO[S3Action] + (implicit info: Int => String => M[Unit], + warn: String => M[Unit]): M[S3Action] def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit info: Int => String => IO[Unit]): IO[CopyS3Action] + )(implicit info: Int => String => M[Unit]): M[CopyS3Action] def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] + )(implicit info: Int => String => M[Unit]): M[DeleteS3Action] } diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala index 879e015..ad32f74 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala @@ -1,23 +1,19 @@ package net.kemitix.s3thorp.aws.api -import cats.effect.IO import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} import net.kemitix.s3thorp.domain.LocalFile class UploadProgressListener(localFile: LocalFile) - (implicit info: Int => String => IO[Unit]) extends UploadProgressLogging { var bytesTransferred = 0L - def listener: UploadEvent => IO[Unit] = + def listener: UploadEvent => Unit = { case e: TransferEvent => logTransfer(localFile, e) - case e: RequestEvent => { - val transferred = e.transferred - bytesTransferred += transferred + case e: RequestEvent => + bytesTransferred += e.transferred logRequestCycle(localFile, e, bytesTransferred) - } case e: ByteTransferEvent => logByteTransfer(e) } } diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala index 3ad231b..63c8125 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala @@ -1,9 +1,8 @@ package net.kemitix.s3thorp.aws.api -import cats.effect.IO import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} import net.kemitix.s3thorp.domain.Terminal.{clearLine, returnToPreviousLine} -import net.kemitix.s3thorp.domain.{Terminal, LocalFile} +import net.kemitix.s3thorp.domain.{LocalFile, Terminal} import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish import scala.io.AnsiColor._ @@ -11,16 +10,14 @@ import scala.io.AnsiColor._ trait UploadProgressLogging { def logTransfer(localFile: LocalFile, - event: TransferEvent) - (implicit info: Int => String => IO[Unit]): IO[Unit] = - info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}") + event: TransferEvent): Unit = + println(s"Transfer:${event.name}: ${localFile.remoteKey.key}") private val oneHundredPercent = 100 def logRequestCycle(localFile: LocalFile, - event: RequestEvent, - bytesTransferred: Long) - (implicit info: Int => String => IO[Unit]): IO[Unit] = { + event: RequestEvent, + bytesTransferred: Long): Unit = { val remoteKey = localFile.remoteKey.key val fileLength = localFile.file.length val consoleWidth = Terminal.width - 2 @@ -31,13 +28,12 @@ trait UploadProgressLogging { val bar = s"[$head$tail]" val transferred = sizeInEnglish(bytesTransferred) val fileSize = sizeInEnglish(fileLength) - IO(print(s"${clearLine}Uploading $transferred of $fileSize : $remoteKey\n$bar$returnToPreviousLine")) + print(s"${clearLine}Uploading $transferred of $fileSize : $remoteKey\n$bar$returnToPreviousLine") } else - IO(print(clearLine)) + print(clearLine) } - def logByteTransfer(event: ByteTransferEvent) - (implicit info: Int => String => IO[Unit]): IO[Unit] = - info(3)(".") + def logByteTransfer(event: ByteTransferEvent): Unit = + print(".") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientBuilder.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientBuilder.scala index 1c2d607..dfcfe74 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientBuilder.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientBuilder.scala @@ -1,16 +1,17 @@ package net.kemitix.s3thorp.aws.lib +import cats.Monad import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder} import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} import net.kemitix.s3thorp.aws.api.S3Client object S3ClientBuilder { - def createClient(amazonS3Client: AmazonS3, - amazonS3TransferManager: TransferManager): S3Client = + def createClient[M[_]: Monad](amazonS3Client: AmazonS3, + amazonS3TransferManager: TransferManager): S3Client[M] = new ThorpS3Client(amazonS3Client, amazonS3TransferManager) - val defaultClient: S3Client = + def defaultClient[M[_]: Monad]: S3Client[M] = createClient(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala index 902d95c..1ee9c32 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala @@ -1,32 +1,34 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad +import cats.implicits._ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.CopyObjectRequest import net.kemitix.s3thorp.aws.api.S3Action.CopyS3Action import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logCopyFinish, logCopyStart} import net.kemitix.s3thorp.domain.{Bucket, MD5Hash, RemoteKey} -class S3ClientCopier(amazonS3: AmazonS3) { +class S3ClientCopier[M[_]: Monad](amazonS3: AmazonS3) { def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = { - IO { - new CopyObjectRequest( - bucket.name, sourceKey.key, - bucket.name, targetKey.key) + (implicit info: Int => String => M[Unit]): M[CopyS3Action] = + for { + _ <- logCopyStart[M](bucket, sourceKey, targetKey) + _ <- copyObject(bucket, sourceKey, hash, targetKey) + _ <- logCopyFinish[M](bucket, sourceKey,targetKey) + } yield CopyS3Action(targetKey) + + private def copyObject(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey) = { + val request = + new CopyObjectRequest(bucket.name, sourceKey.key, bucket.name, targetKey.key) .withMatchingETagConstraint(hash.hash) - }.bracket { - request => - for { - _ <- logCopyStart(bucket, sourceKey, targetKey) - result <- IO(amazonS3.copyObject(request)) - } yield result - }(_ => logCopyFinish(bucket, sourceKey,targetKey)) - .map(_ => CopyS3Action(targetKey)) + Monad[M].pure(amazonS3.copyObject(request)) } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala index 158d3dd..170a049 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala @@ -1,24 +1,25 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad +import cats.implicits._ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.DeleteObjectRequest import net.kemitix.s3thorp.aws.api.S3Action.DeleteS3Action import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logDeleteFinish, logDeleteStart} import net.kemitix.s3thorp.domain.{Bucket, RemoteKey} -class S3ClientDeleter(amazonS3: AmazonS3) { +class S3ClientDeleter[M[_]: Monad](amazonS3: AmazonS3) { def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] = + (implicit info: Int => String => M[Unit]): M[DeleteS3Action] = for { - _ <- logDeleteStart(bucket, remoteKey) + _ <- logDeleteStart[M](bucket, remoteKey) _ <- deleteObject(bucket, remoteKey) - _ <- logDeleteFinish(bucket, remoteKey) + _ <- logDeleteFinish[M](bucket, remoteKey) } yield DeleteS3Action(remoteKey) - private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = IO { + private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = Monad[M].pure { amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)) } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala index 4392bba..4e9669d 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala @@ -1,53 +1,46 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad import com.amazonaws.services.s3.model.PutObjectResult import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey} object S3ClientLogging { - def logListObjectsStart(bucket: Bucket, + def logListObjectsStart[M[_]: Monad](bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(1)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") - def logListObjectsFinish(bucket: Bucket, + def logListObjectsFinish[M[_]: Monad](bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") - def logUploadStart(localFile: LocalFile, - bucket: Bucket) - (implicit info: Int => String => IO[Unit]): PutObjectResult => IO[PutObjectResult] = - in => for { - _ <- info(1)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}") - } yield in - - def logUploadFinish(localFile: LocalFile, + def logUploadFinish[M[_]: Monad](localFile: LocalFile, bucket: Bucket) - (implicit info: Int => String => IO[Unit]): PutObjectResult => IO[Unit] = + (implicit info: Int => String => M[Unit]): PutObjectResult => M[Unit] = _ => info(2)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}") - def logCopyStart(bucket: Bucket, - sourceKey: RemoteKey, - targetKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + def logCopyStart[M[_]: Monad](bucket: Bucket, + sourceKey: RemoteKey, + targetKey: RemoteKey) + (implicit info: Int => String => M[Unit]): M[Unit] = info(1)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - def logCopyFinish(bucket: Bucket, + def logCopyFinish[M[_]: Monad](bucket: Bucket, sourceKey: RemoteKey, targetKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(2)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - def logDeleteStart(bucket: Bucket, + def logDeleteStart[M[_]: Monad](bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(1)(s"Delete: ${bucket.name}:${remoteKey.key}") - def logDeleteFinish(bucket: Bucket, + def logDeleteFinish[M[_]: Monad](bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(2)(s"Deleted: ${bucket.name}:${remoteKey.key}") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala index 2cc70d8..dd5898c 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad +import cats.implicits._ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart} @@ -10,11 +11,11 @@ import net.kemitix.s3thorp.domain._ import scala.collection.JavaConverters._ -class S3ClientObjectLister(amazonS3: AmazonS3) { +class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) { def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = { + (implicit info: Int => String => M[Unit]): M[S3ObjectsData] = { type Token = String type Batch = (Stream[S3ObjectSummary], Option[Token]) @@ -24,8 +25,8 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { .withPrefix(prefix.key) .withContinuationToken(token) - def fetchBatch: ListObjectsV2Request => IO[Batch] = - request => IO { + def fetchBatch: ListObjectsV2Request => M[Batch] = + request => Monad[M].pure { val result = amazonS3.listObjectsV2(request) val more: Option[Token] = if (result.isTruncated) Some(result.getNextContinuationToken) @@ -33,29 +34,27 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { (result.getObjectSummaries.asScala.toStream, more) } - def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = + def fetchMore(more: Option[Token]): M[Stream[S3ObjectSummary]] = { + more match { + case None => Monad[M].pure(Stream.empty) + case Some(token) => fetch(requestMore(token)) + } + } + + def fetch: ListObjectsV2Request => M[Stream[S3ObjectSummary]] = request => for { batch <- fetchBatch(request) (summaries, more) = batch - rest <- more match { - case None => IO{Stream()} - case Some(token) => fetch(requestMore(token)) - } + rest <- fetchMore(more) } yield summaries ++ rest - IO { - new ListObjectsV2Request() - .withBucketName(bucket.name) - .withPrefix(prefix.key) - }.bracket { - request => - for { - _ <- logListObjectsStart(bucket, prefix) - summaries <- fetch(request) - } yield summaries - }(_ => logListObjectsFinish(bucket,prefix)) - .map(os => S3ObjectsData(byHash(os), byKey(os))) + for { + _ <- logListObjectsStart[M](bucket, prefix) + r = new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key) + summaries <- fetch(r) + _ <- logListObjectsFinish[M](bucket, prefix) + } yield S3ObjectsData(byHash(summaries), byKey(summaries)) } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala index fed8353..b821c34 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala @@ -1,31 +1,31 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer.TransferManager import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action} import net.kemitix.s3thorp.aws.api.{S3Action, S3Client, UploadProgressListener} import net.kemitix.s3thorp.domain._ -class ThorpS3Client(amazonS3Client: => AmazonS3, +class ThorpS3Client[M[_]: Monad](amazonS3Client: => AmazonS3, amazonS3TransferManager: => TransferManager) - extends S3Client { + extends S3Client[M] { - lazy val objectLister = new S3ClientObjectLister(amazonS3Client) - lazy val copier = new S3ClientCopier(amazonS3Client) - lazy val uploader = new Uploader(amazonS3TransferManager) - lazy val deleter = new S3ClientDeleter(amazonS3Client) + lazy val objectLister = new S3ClientObjectLister[M](amazonS3Client) + lazy val copier = new S3ClientCopier[M](amazonS3Client) + lazy val uploader = new Uploader[M](amazonS3TransferManager) + lazy val deleter = new S3ClientDeleter[M](amazonS3Client) override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = + (implicit info: Int => String => M[Unit]): M[S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = + (implicit info: Int => String => M[Unit]): M[CopyS3Action] = copier.copy(bucket, sourceKey,hash, targetKey) override def upload(localFile: LocalFile, @@ -34,13 +34,13 @@ class ThorpS3Client(amazonS3Client: => AmazonS3, multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => IO[Unit], - warn: String => IO[Unit]): IO[S3Action] = + (implicit info: Int => String => M[Unit], + warn: String => M[Unit]): M[S3Action] = uploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries) override def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] = + (implicit info: Int => String => M[Unit]): M[DeleteS3Action] = deleter.delete(bucket, remoteKey) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala index c2ba261..3576bfa 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad +import cats.implicits._ import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult @@ -13,7 +14,7 @@ import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} import scala.util.Try -class Uploader(transferManager: => AmazonTransferManager) { +class Uploader[M[_]: Monad](transferManager: => AmazonTransferManager) { def accepts(localFile: LocalFile) (implicit multiPartThreshold: Long): Boolean = @@ -25,24 +26,24 @@ class Uploader(transferManager: => AmazonTransferManager) { multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => IO[Unit], - warn: String => IO[Unit]): IO[S3Action] = + (implicit info: Int => String => M[Unit], + warn: String => M[Unit]): M[S3Action] = for { - _ <- logMultiPartUploadStart(localFile, tryCount) - upload <- upload(localFile, bucket, uploadProgressListener) - _ <- logMultiPartUploadFinished(localFile) + _ <- logMultiPartUploadStart[M](localFile, tryCount) + upload <- transfer(localFile, bucket, uploadProgressListener) + _ <- logMultiPartUploadFinished[M](localFile) } yield upload match { case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag)) case Left(e) => ErroredS3Action(localFile.remoteKey, e) } - private def upload(localFile: LocalFile, - bucket: Bucket, - uploadProgressListener: UploadProgressListener, - ): IO[Either[Throwable, UploadResult]] = { - val listener = progressListener(uploadProgressListener) + private def transfer(localFile: LocalFile, + bucket: Bucket, + uploadProgressListener: UploadProgressListener, + ): M[Either[Throwable, UploadResult]] = { + val listener: ProgressListener = progressListener(uploadProgressListener) val putObjectRequest = request(localFile, bucket, listener) - IO { + Monad[M].pure { Try(transferManager.upload(putObjectRequest)) .map(_.waitForUploadResult) .toEither @@ -56,16 +57,18 @@ class Uploader(transferManager: => AmazonTransferManager) { private def progressListener(uploadProgressListener: UploadProgressListener) = new ProgressListener { override def progressChanged(progressEvent: ProgressEvent): Unit = { - uploadProgressListener.listener( - progressEvent match { - case e: ProgressEvent if isTransfer(e) => - TransferEvent(e.getEventType.name) - case e: ProgressEvent if isByteTransfer(e) => - ByteTransferEvent(e.getEventType.name) - case e: ProgressEvent => - RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) - }) - .unsafeRunSync // the listener doesn't execute otherwise as it is never returned + uploadProgressListener.listener(eventHandler(progressEvent)) + } + + private def eventHandler(progressEvent: ProgressEvent) = { + progressEvent match { + case e: ProgressEvent if isTransfer(e) => + TransferEvent(e.getEventType.name) + case e: ProgressEvent if isByteTransfer(e) => + ByteTransferEvent(e.getEventType.name) + case e: ProgressEvent => + RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) + } } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala index 645d138..207c111 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala @@ -1,22 +1,22 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad import net.kemitix.s3thorp.domain.Terminal.clearLine import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish import net.kemitix.s3thorp.domain.LocalFile object UploaderLogging { - def logMultiPartUploadStart(localFile: LocalFile, + def logMultiPartUploadStart[M[_]: Monad](localFile: LocalFile, tryCount: Int) - (implicit info: Int => String => IO[Unit]): IO[Unit] = { + (implicit info: Int => String => M[Unit]): M[Unit] = { val tryMessage = if (tryCount == 1) "" else s"try $tryCount" val size = sizeInEnglish(localFile.file.length) info(1)(s"${clearLine}upload:$tryMessage:$size:${localFile.remoteKey.key}") } - def logMultiPartUploadFinished(localFile: LocalFile) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + def logMultiPartUploadFinished[M[_]: Monad](localFile: LocalFile) + (implicit info: Int => String => M[Unit]): M[Unit] = info(4)(s"upload:finished: ${localFile.remoteKey.key}") } diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala index d6bba63..c4a0690 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala @@ -2,7 +2,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant -import cats.effect.IO +import cats.Id import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult @@ -23,8 +23,8 @@ class S3ClientSuite private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit - implicit private val logWarn: String => IO[Unit] = _ => IO.unit + implicit private val logInfo: Int => String => Id[Unit] = _ => _ => () + implicit private val logWarn: String => Id[Unit] = _ => () private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ describe("getS3Status") { @@ -44,7 +44,7 @@ class S3ClientSuite keyotherkey.remoteKey -> HashModified(hash, lastModified), keydiffhash.remoteKey -> HashModified(diffhash, lastModified))) - def invoke(self: S3Client, localFile: LocalFile) = { + def invoke(self: S3Client[Id], localFile: LocalFile) = { S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData) } @@ -108,7 +108,7 @@ class S3ClientSuite pending //FIXME: works okay on its own, but fails when run with others val expected = UploadS3Action(remoteKey, rootHash) - val result = s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync + val result = s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries) assertResult(expected)(result) } } diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByHashSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByHashSuite.scala index 6ed6f8b..518de05 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByHashSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByHashSuite.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.Date import com.amazonaws.services.s3.model.S3ObjectSummary @@ -13,7 +14,7 @@ class S3ObjectsByHashSuite extends FunSpec { val hash = MD5Hash("hash") val key1 = RemoteKey("key-1") val key2 = RemoteKey("key-2") - val lastModified = LastModified(Instant.now) + val lastModified = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) val o1 = s3object(hash, key1, lastModified) val o2 = s3object(hash, key2, lastModified) val os = Stream(o1, o2) diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala index 9214bdc..86bc114 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala @@ -1,9 +1,10 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.Date -import cats.effect.IO +import cats.Id import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary} import com.amazonaws.services.s3.transfer.TransferManager @@ -20,9 +21,9 @@ class ThorpS3ClientSuite val source = Resource(this, "upload") val prefix = RemoteKey("prefix") implicit val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit + implicit val logInfo: Int => String => Id[Unit] = _ => _ => () - val lm = LastModified(Instant.now) + val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) val h1 = MD5Hash("hash1") @@ -47,7 +48,7 @@ class ThorpS3ClientSuite val amazonS3 = stub[AmazonS3] val amazonS3TransferManager = stub[TransferManager] - val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager) + val s3Client = new ThorpS3Client[Id](amazonS3, amazonS3TransferManager) val myFakeResponse = new ListObjectsV2Result() val summaries = myFakeResponse.getObjectSummaries @@ -65,7 +66,7 @@ class ThorpS3ClientSuite k1a -> HashModified(h1, lm), k1b -> HashModified(h1, lm), k2 -> HashModified(h2, lm))) - val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync + val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")) assertResult(expected.byHash.keys)(result.byHash.keys) assertResult(expected.byKey.keys)(result.byKey.keys) assertResult(expected)(result) diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala index 7add7ca..0e681cb 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala @@ -2,7 +2,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant -import cats.effect.IO +import cats.Id import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer._ import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action @@ -20,8 +20,8 @@ class UploaderSuite private val source = Resource(this, ".") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit - implicit private val logWarn: String => IO[Unit] = _ => IO.unit + implicit private val logInfo: Int => String => Id[Unit] = _ => _ => () + implicit private val logWarn: String => Id[Unit] = _ => () private val fileToKey = generateKey(config.source, config.prefix) _ val lastModified = LastModified(Instant.now()) @@ -63,7 +63,7 @@ class UploaderSuite val uploader = new Uploader(amazonS3TransferManager) it("should upload") { val expected = UploadS3Action(returnedKey, returnedHash) - val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync + val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries) assertResult(expected)(result) } } diff --git a/build.sbt b/build.sbt index 6b918b4..eba0043 100644 --- a/build.sbt +++ b/build.sbt @@ -27,6 +27,19 @@ val awsSdkDependencies = Seq( "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9" ) ) +val catsSettings = Seq ( + libraryDependencies ++= Seq( + "org.typelevel" %% "cats-core" % "1.6.1" + ), + // recommended for cats-effects + scalacOptions ++= Seq( + "-feature", + "-deprecation", + "-unchecked", + "-language:postfixOps", + "-language:higherKinds", + "-Ypartial-unification") +) val catsEffectsSettings = Seq( libraryDependencies ++= Seq( "org.typelevel" %% "cats-effect" % "1.3.1" @@ -47,6 +60,7 @@ lazy val cli = (project in file("cli")) .settings(commonSettings) .settings(mainClass in assembly := Some("net.kemitix.s3thorp.cli.Main")) .settings(applicationSettings) + .settings(catsEffectsSettings) .aggregate(`aws-lib`, core, `aws-api`, domain) .settings(commandLineParsing) .dependsOn(`aws-lib`) @@ -67,7 +81,7 @@ lazy val core = (project in file("core")) lazy val `aws-api` = (project in file("aws-api")) .settings(commonSettings) .settings(assemblyJarName in assembly := "aws-api.jar") - .settings(catsEffectsSettings) + .settings(catsSettings) .dependsOn(domain) lazy val domain = (project in file("domain")) diff --git a/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala b/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala index 0ea2a5e..6f8fb89 100644 --- a/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala +++ b/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala @@ -1,15 +1,16 @@ package net.kemitix.s3thorp.cli -import cats.effect.IO +import cats.Monad -class Logger(verbosity: Int) { - def info(level: Int)(message: String): IO[Unit] = - if (verbosity >= level) IO(println(s"[INFO:$level] $message")) - else IO.unit +class Logger[M[_]: Monad](verbosity: Int) { - def warn(message: String): IO[Unit] = IO(println(s"[ WARN] $message")) + def info(level: Int)(message: String): M[Unit] = + if (verbosity >= level) Monad[M].pure(println(s"[INFO:$level] $message")) + else Monad[M].unit - def error(message: String): IO[Unit] = IO(println(s"[ ERROR] $message")) + def warn(message: String): M[Unit] = Monad[M].pure(println(s"[ WARN] $message")) + + def error(message: String): M[Unit] = Monad[M].pure(println(s"[ ERROR] $message")) } diff --git a/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala index 4b93814..39fe0ec 100644 --- a/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala @@ -18,20 +18,24 @@ object Main extends IOApp { def program(args: List[String]): IO[ExitCode] = for { config <- ParseArgs(args, defaultConfig) - logger = new Logger(config.verbose) + logger = new Logger[IO](config.verbose) info = (l: Int) => (m: String) => logger.info(l)(m) - md5HashGenerator = (file: File) => md5File(file)(info) _ <- logger.info(1)("S3Thorp - hashed sync for s3") - _ <- Sync.run( + _ <- Sync.run[IO]( + config, S3ClientBuilder.defaultClient, - md5HashGenerator, - l => i => logger.info(l)(i), - w => logger.warn(w), - e => logger.error(e))(config) + hashGenerator(info), + info, + w => logger.warn(w)) } yield ExitCode.Success + private def hashGenerator(info: Int => String => IO[Unit]) = { + implicit val logInfo: Int => String => IO[Unit] = info + file: File => md5File[IO](file) + } + override def run(args: List[String]): IO[ExitCode] = { - val logger = new Logger(1) + val logger = new Logger[IO](1) program(args) .guaranteeCase { case Canceled => logger.warn("Interrupted") diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala b/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala index f588f90..b15ee6b 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.core -import cats.effect.IO +import cats.Monad +import cats.implicits._ import net.kemitix.s3thorp.aws.api.S3Action.DoNothingS3Action import net.kemitix.s3thorp.aws.api.{S3Action, S3Client, UploadProgressListener} import net.kemitix.s3thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} @@ -8,10 +9,10 @@ import net.kemitix.s3thorp.domain.Config object ActionSubmitter { - def submitAction(s3Client: S3Client, action: Action) - (implicit c: Config, - info: Int => String => IO[Unit], - warn: String => IO[Unit]): Stream[IO[S3Action]] = { + def submitAction[M[_]: Monad](s3Client: S3Client[M], action: Action) + (implicit c: Config, + info: Int => String => M[Unit], + warn: String => M[Unit]): Stream[M[S3Action]] = { Stream( action match { case ToUpload(bucket, localFile) => @@ -31,7 +32,7 @@ object ActionSubmitter { action <- s3Client.delete(bucket, remoteKey) } yield action case DoNothing(bucket, remoteKey) => - IO.pure(DoNothingS3Action(remoteKey)) + Monad[M].pure(DoNothingS3Action(remoteKey)) }) } } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala b/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala index ece56c4..10f9f5f 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala @@ -3,23 +3,24 @@ package net.kemitix.s3thorp.core import java.io.File import java.nio.file.Path -import cats.effect.IO +import cats.Monad +import cats.implicits._ import net.kemitix.s3thorp.core.KeyGenerator.generateKey import net.kemitix.s3thorp.domain.{Config, Filter, LocalFile, MD5Hash} object LocalFileStream { - def findFiles(file: File, - md5HashGenerator: File => IO[MD5Hash], - info: Int => String => IO[Unit]) - (implicit c: Config): IO[Stream[LocalFile]] = { + def findFiles[M[_]: Monad](file: File, + md5HashGenerator: File => M[MD5Hash], + info: Int => String => M[Unit]) + (implicit c: Config): M[Stream[LocalFile]] = { val filters: Path => Boolean = Filter.isIncluded(c.filters) - def loop(file: File): IO[Stream[LocalFile]] = { + def loop(file: File): M[Stream[LocalFile]] = { - def dirPaths(file: File): IO[Stream[File]] = - IO { + def dirPaths(file: File): M[Stream[File]] = + Monad[M].pure { Option(file.listFiles) .getOrElse(throw new IllegalArgumentException(s"Directory not found $file")) } @@ -27,23 +28,23 @@ object LocalFileStream { Stream(fs: _*) .filter(f => filters(f.toPath))) - def recurseIntoSubDirectories(file: File)(implicit c: Config): IO[Stream[LocalFile]] = + def recurseIntoSubDirectories(file: File)(implicit c: Config): M[Stream[LocalFile]] = file match { case f if f.isDirectory => loop(file) case _ => for(hash <- md5HashGenerator(file)) yield Stream(LocalFile(file, c.source, hash, generateKey(c.source, c.prefix))) } - def recurse(fs: Stream[File]): IO[Stream[LocalFile]] = - fs.foldLeft(IO.pure(Stream.empty[LocalFile]))((acc, f) => + def recurse(fs: Stream[File]): M[Stream[LocalFile]] = + fs.foldLeft(Monad[M].pure(Stream.empty[LocalFile]))((acc, f) => recurseIntoSubDirectories(f) .flatMap(lfs => acc.map(s => s ++ lfs))) for { - _ <- IO(info(2)(s"- Entering: $file")) + _ <- info(2)(s"- Entering: $file") fs <- dirPaths(file) lfs <- recurse(fs) - _ <- IO(info(5)(s"- Leaving : $file")) + _ <- info(5)(s"- Leaving : $file") } yield lfs } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala index 9f9a247..85f8ee3 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala @@ -3,23 +3,21 @@ package net.kemitix.s3thorp.core import java.io.{File, FileInputStream} import java.security.MessageDigest -import cats.effect.IO +import cats.Monad +import cats.implicits._ import net.kemitix.s3thorp.domain.MD5Hash import scala.collection.immutable.NumericRange object MD5HashGenerator { - def md5File(file: File) - (implicit info: Int => String => IO[Unit]): IO[MD5Hash] = { + def md5File[M[_]: Monad](file: File) + (implicit info: Int => String => M[Unit]): M[MD5Hash] = { val maxBufferSize = 8048 - val defaultBuffer = new Array[Byte](maxBufferSize) - - def openFile = IO(new FileInputStream(file)) - - def closeFile = {fis: FileInputStream => IO(fis.close())} + def openFile = Monad[M].pure(new FileInputStream(file)) + def closeFile = {fis: FileInputStream => Monad[M].pure(fis.close())} def nextChunkSize(currentOffset: Long) = { // a value between 1 and maxBufferSize @@ -31,25 +29,29 @@ object MD5HashGenerator { def readToBuffer(fis: FileInputStream, currentOffset: Long) = { val buffer = - if (nextChunkSize(currentOffset) < maxBufferSize) - new Array[Byte](nextChunkSize(currentOffset)) - else - defaultBuffer + if (nextChunkSize(currentOffset) < maxBufferSize) new Array[Byte](nextChunkSize(currentOffset)) + else defaultBuffer fis read buffer buffer } - def readFile: IO[String] = openFile - .bracket(fis => IO { + def digestFile(fis: FileInputStream) = + Monad[M].pure { val md5 = MessageDigest getInstance "MD5" NumericRange(0, file.length, maxBufferSize) - .foreach{currentOffset => { - val buffer = readToBuffer(fis, currentOffset) - md5 update buffer - } - } + .foreach { currentOffset => { + val buffer = readToBuffer(fis, currentOffset) + md5 update buffer + }} (md5.digest map ("%02x" format _)).mkString - })(closeFile) + } + + def readFile: M[String] = + for { + fis <- openFile + md5 <- digestFile(fis) + _ <- closeFile(fis) + } yield md5 for { _ <- info(5)(s"md5:reading:size ${file.length}:$file") diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala b/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala index 6dea7f2..0269b47 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala @@ -2,7 +2,7 @@ package net.kemitix.s3thorp.core import java.io.File -import cats.effect.IO +import cats.Monad import cats.implicits._ import net.kemitix.s3thorp.aws.api.{S3Action, S3Client} import net.kemitix.s3thorp.core.Action.ToDelete @@ -11,33 +11,47 @@ import net.kemitix.s3thorp.core.ActionSubmitter.submitAction import net.kemitix.s3thorp.core.LocalFileStream.findFiles import net.kemitix.s3thorp.core.S3MetaDataEnricher.getMetadata import net.kemitix.s3thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart} -import net.kemitix.s3thorp.domain.{Config, MD5Hash, S3ObjectsData} +import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash, S3MetaData, S3ObjectsData} object Sync { - def run(s3Client: S3Client, - md5HashGenerator: File => IO[MD5Hash], - info: Int => String => IO[Unit], - warn: String => IO[Unit], - error: String => IO[Unit]) - (implicit c: Config): IO[Unit] = { + def run[M[_]: Monad](config: Config, + s3Client: S3Client[M], + md5HashGenerator: File => M[MD5Hash], + info: Int => String => M[Unit], + warn: String => M[Unit]): M[Unit] = { - def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] = + implicit val c: Config = config + implicit val logInfo: Int => String => M[Unit] = info + implicit val logWarn: String => M[Unit] = warn + + def metaData(s3Data: S3ObjectsData, sFiles: Stream[LocalFile]) = + Monad[M].pure(sFiles.map(file => getMetadata(file, s3Data))) + + def actions(sData: Stream[S3MetaData]) = + Monad[M].pure(sData.flatMap(s3MetaData => createActions(s3MetaData))) + + def submit(sActions: Stream[Action]) = + Monad[M].pure(sActions.flatMap(action => submitAction[M](s3Client, action))) + + def copyUploadActions(s3Data: S3ObjectsData): M[Stream[S3Action]] = (for { - sFiles <- findFiles(c.source, md5HashGenerator, info) - sData <- IO(sFiles.map(file => getMetadata(file, s3Data))) - sActions <- IO(sData.flatMap(s3MetaData => createActions(s3MetaData))) - sS3Actions <- IO(sActions.flatMap(action => submitAction(s3Client, action)(c, info, warn))) - } yield sS3Actions.sequence) + files <- findFiles(c.source, md5HashGenerator, info) + metaData <- metaData(s3Data, files) + actions <- actions(metaData) + s3Actions <- submit(actions) + } yield s3Actions.sequence) .flatten .map(streamS3Actions => streamS3Actions.sorted) - def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[S3Action]] = + def deleteActions(s3ObjectsData: S3ObjectsData): M[Stream[S3Action]] = (for { key <- s3ObjectsData.byKey.keys if key.isMissingLocally(c.source, c.prefix) - ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key))(c, info, warn) - } yield ioDelAction).toStream.sequence + ioDelAction <- submitAction[M](s3Client, ToDelete(c.bucket, key)) + } yield ioDelAction) + .toStream + .sequence for { _ <- logRunStart(info) diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala index 81c6ad8..806ac16 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.core -import cats.effect.IO +import cats.Monad +import cats.implicits._ import net.kemitix.s3thorp.aws.api.S3Action import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action} import net.kemitix.s3thorp.domain.Config @@ -8,25 +9,25 @@ import net.kemitix.s3thorp.domain.Config // Logging for the Sync class object SyncLogging { - def logRunStart(info: Int => String => IO[Unit]) - (implicit c: Config): IO[Unit] = + def logRunStart[M[_]: Monad](info: Int => String => M[Unit]) + (implicit c: Config): M[Unit] = info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ") - def logFileScan(info: Int => String => IO[Unit]) - (implicit c: Config): IO[Unit] = + def logFileScan[M[_]: Monad](info: Int => String => M[Unit]) + (implicit c: Config): M[Unit] = info(1)(s"Scanning local files: ${c.source}...") - def logRunFinished(actions: Stream[S3Action], - info: Int => String => IO[Unit]) - (implicit c: Config): IO[Unit] = + def logRunFinished[M[_]: Monad](actions: Stream[S3Action], + info: Int => String => M[Unit]) + (implicit c: Config): M[Unit] = { + val counters = actions.foldLeft(Counters())(countActivities) for { - _ <- IO.unit - counters = actions.foldLeft(Counters())(countActivities) _ <- info(1)(s"Uploaded ${counters.uploaded} files") _ <- info(1)(s"Copied ${counters.copied} files") _ <- info(1)(s"Deleted ${counters.deleted} files") _ <- info(1)(s"Errors ${counters.errors}") } yield () + } private def countActivities(implicit c: Config): (Counters, S3Action) => Counters = (counters: Counters, s3Action: S3Action) => { diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala index 76a2ddc..95ec9be 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala @@ -2,21 +2,21 @@ package net.kemitix.s3thorp.core import java.io.File -import cats.effect.IO +import cats.Id import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash} import org.scalatest.FunSpec class LocalFileStreamSuite extends FunSpec { val uploadResource = Resource(this, "upload") - val config: Config = Config(source = uploadResource) - implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit - val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file) + implicit val config: Config = Config(source = uploadResource) + implicit private val logInfo: Int => String => Id[Unit] = l => i => () + val md5HashGenerator: File => Id[MD5Hash] = file => MD5HashGenerator.md5File[Id](file) describe("findFiles") { it("should find all files") { val result: Set[String] = - LocalFileStream.findFiles(uploadResource, md5HashGenerator, logInfo)(config).unsafeRunSync.toSet + LocalFileStream.findFiles[Id](uploadResource, md5HashGenerator, logInfo).toSet .map { x: LocalFile => x.relative.toString } assertResult(Set("subdir/leaf-file", "root-file"))(result) } diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala index c71000c..674fa22 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala @@ -1,8 +1,6 @@ package net.kemitix.s3thorp.core -import java.nio.file.Files - -import cats.effect.IO +import cats.Id import net.kemitix.s3thorp.core.MD5HashData.rootHash import net.kemitix.s3thorp.domain.{Bucket, Config, MD5Hash, RemoteKey} import org.scalatest.FunSpec @@ -12,12 +10,12 @@ class MD5HashGeneratorTest extends FunSpec { private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit + implicit private val logInfo: Int => String => Id[Unit] = l => i => () describe("read a small file (smaller than buffer)") { val file = Resource(this, "upload/root-file") it("should generate the correct hash") { - val result = MD5HashGenerator.md5File(file).unsafeRunSync + val result = MD5HashGenerator.md5File[Id](file) assertResult(rootHash)(result) } } @@ -25,7 +23,7 @@ class MD5HashGeneratorTest extends FunSpec { val file = Resource(this, "big-file") it("should generate the correct hash") { val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8") - val result = MD5HashGenerator.md5File(file).unsafeRunSync + val result = MD5HashGenerator.md5File[Id](file) assertResult(expected)(result) } } diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala index 732f8ee..d310393 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala @@ -3,7 +3,7 @@ package net.kemitix.s3thorp.core import java.io.File import java.time.Instant -import cats.effect.IO +import cats.Id import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action} import net.kemitix.s3thorp.aws.api.{S3Client, UploadProgressListener} import net.kemitix.s3thorp.core.MD5HashData.{leafHash, rootHash} @@ -16,32 +16,29 @@ class SyncSuite private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") - implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit - implicit private val logWarn: String => IO[Unit] = _ => IO.unit - private def logError: String => IO[Unit] = _ => IO.unit + val config = Config(Bucket("bucket"), prefix, source = source) + implicit private val logInfo: Int => String => Id[Unit] = _ => _ => () + implicit private val logWarn: String => Id[Unit] = _ => () private val lastModified = LastModified(Instant.now) private val fileToKey: File => RemoteKey = KeyGenerator.generateKey(source, prefix) private val rootFile = LocalFile.resolve("root-file", rootHash, source, fileToKey) private val leafFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, fileToKey) - private val md5HashGenerator = MD5HashGenerator.md5File(_) + private val md5HashGenerator = MD5HashGenerator.md5File[Id](_) - def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile) = { + def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile): (String, String, File) = (bucket.name, remoteKey.key, localFile.file) - } describe("run") { val testBucket = Bucket("bucket") // source contains the files root-file and subdir/leaf-file - val config = Config(Bucket("bucket"), RemoteKey("prefix"), source = source) val rootRemoteKey = RemoteKey("prefix/root-file") val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") describe("when all files should be uploaded") { val s3Client = new RecordingClient(testBucket, S3ObjectsData( byHash = Map(), byKey = Map())) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("uploads all files") { val expectedUploads = Map( "subdir/leaf-file" -> leafRemoteKey, @@ -67,7 +64,7 @@ class SyncSuite RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("uploads nothing") { val expectedUploads = Map() assertResult(expectedUploads)(s3Client.uploadsRecord) @@ -91,7 +88,7 @@ class SyncSuite RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified), RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("uploads nothing") { val expectedUploads = Map() assertResult(expectedUploads)(s3Client.uploadsRecord) @@ -119,17 +116,17 @@ class SyncSuite byKey = Map( deletedKey -> HashModified(deletedHash, lastModified))) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("deleted key") { val expectedDeletions = Set(deletedKey) assertResult(expectedDeletions)(s3Client.deletionsRecord) } } describe("when a file is excluded") { - val configWithExclusion = config.copy(filters = List(Exclude("leaf"))) + val config: Config = Config(Bucket("bucket"), prefix, source = source, filters = List(Exclude("leaf"))) val s3ObjectsData = S3ObjectsData(Map(), Map()) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(configWithExclusion).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("is not uploaded") { val expectedUploads = Map( "root-file" -> rootRemoteKey @@ -141,7 +138,7 @@ class SyncSuite class RecordingClient(testBucket: Bucket, s3ObjectsData: S3ObjectsData) - extends S3Client { + extends S3Client[Id] { var uploadsRecord: Map[String, RemoteKey] = Map() var copiesRecord: Map[RemoteKey, RemoteKey] = Map() @@ -149,8 +146,8 @@ class SyncSuite override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]) = - IO.pure(s3ObjectsData) + (implicit info: Int => String => Id[Unit]): S3ObjectsData = + s3ObjectsData override def upload(localFile: LocalFile, bucket: Bucket, @@ -158,32 +155,29 @@ class SyncSuite multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => IO[Unit], - warn: String => IO[Unit]) = - IO { - if (bucket == testBucket) - uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) - UploadS3Action(localFile.remoteKey, MD5Hash("some hash value")) - } + (implicit info: Int => String => Id[Unit], + warn: String => Id[Unit]): UploadS3Action = { + if (bucket == testBucket) + uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) + UploadS3Action(localFile.remoteKey, MD5Hash("some hash value")) + } override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit info: Int => String => IO[Unit]) = - IO { - if (bucket == testBucket) - copiesRecord += (sourceKey -> targetKey) - CopyS3Action(targetKey) - } + )(implicit info: Int => String => Id[Unit]): CopyS3Action = { + if (bucket == testBucket) + copiesRecord += (sourceKey -> targetKey) + CopyS3Action(targetKey) + } override def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit info: Int => String => IO[Unit]) = - IO { - if (bucket == testBucket) - deletionsRecord += remoteKey - DeleteS3Action(remoteKey) - } + )(implicit info: Int => String => Id[Unit]): DeleteS3Action = { + if (bucket == testBucket) + deletionsRecord += remoteKey + DeleteS3Action(remoteKey) + } } }