diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala index 0b0b9ff..ebe2fb1 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala @@ -1,14 +1,13 @@ package net.kemitix.s3thorp.aws.api -import cats.effect.IO import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action} import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey, S3ObjectsData} -trait S3Client { +trait S3Client[M[_]] { def listObjects(bucket: Bucket, prefix: RemoteKey - )(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] + )(implicit info: Int => String => M[Unit]): M[S3ObjectsData] def upload(localFile: LocalFile, bucket: Bucket, @@ -16,17 +15,17 @@ trait S3Client { multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => IO[Unit], - warn: String => IO[Unit]): IO[S3Action] + (implicit info: Int => String => M[Unit], + warn: String => M[Unit]): M[S3Action] def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit info: Int => String => IO[Unit]): IO[CopyS3Action] + )(implicit info: Int => String => M[Unit]): M[CopyS3Action] def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] + )(implicit info: Int => String => M[Unit]): M[DeleteS3Action] } diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala index 879e015..ad32f74 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala @@ -1,23 +1,19 @@ package net.kemitix.s3thorp.aws.api -import cats.effect.IO import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} import net.kemitix.s3thorp.domain.LocalFile class UploadProgressListener(localFile: LocalFile) - (implicit info: Int => String => IO[Unit]) extends UploadProgressLogging { var bytesTransferred = 0L - def listener: UploadEvent => IO[Unit] = + def listener: UploadEvent => Unit = { case e: TransferEvent => logTransfer(localFile, e) - case e: RequestEvent => { - val transferred = e.transferred - bytesTransferred += transferred + case e: RequestEvent => + bytesTransferred += e.transferred logRequestCycle(localFile, e, bytesTransferred) - } case e: ByteTransferEvent => logByteTransfer(e) } } diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala index 3ad231b..63c8125 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala @@ -1,9 +1,8 @@ package net.kemitix.s3thorp.aws.api -import cats.effect.IO import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} import net.kemitix.s3thorp.domain.Terminal.{clearLine, returnToPreviousLine} -import net.kemitix.s3thorp.domain.{Terminal, LocalFile} +import net.kemitix.s3thorp.domain.{LocalFile, Terminal} import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish import scala.io.AnsiColor._ @@ -11,16 +10,14 @@ import scala.io.AnsiColor._ trait UploadProgressLogging { def logTransfer(localFile: LocalFile, - event: TransferEvent) - (implicit info: Int => String => IO[Unit]): IO[Unit] = - info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}") + event: TransferEvent): Unit = + println(s"Transfer:${event.name}: ${localFile.remoteKey.key}") private val oneHundredPercent = 100 def logRequestCycle(localFile: LocalFile, - event: RequestEvent, - bytesTransferred: Long) - (implicit info: Int => String => IO[Unit]): IO[Unit] = { + event: RequestEvent, + bytesTransferred: Long): Unit = { val remoteKey = localFile.remoteKey.key val fileLength = localFile.file.length val consoleWidth = Terminal.width - 2 @@ -31,13 +28,12 @@ trait UploadProgressLogging { val bar = s"[$head$tail]" val transferred = sizeInEnglish(bytesTransferred) val fileSize = sizeInEnglish(fileLength) - IO(print(s"${clearLine}Uploading $transferred of $fileSize : $remoteKey\n$bar$returnToPreviousLine")) + print(s"${clearLine}Uploading $transferred of $fileSize : $remoteKey\n$bar$returnToPreviousLine") } else - IO(print(clearLine)) + print(clearLine) } - def logByteTransfer(event: ByteTransferEvent) - (implicit info: Int => String => IO[Unit]): IO[Unit] = - info(3)(".") + def logByteTransfer(event: ByteTransferEvent): Unit = + print(".") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientBuilder.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientBuilder.scala index 1c2d607..dfcfe74 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientBuilder.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientBuilder.scala @@ -1,16 +1,17 @@ package net.kemitix.s3thorp.aws.lib +import cats.Monad import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder} import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} import net.kemitix.s3thorp.aws.api.S3Client object S3ClientBuilder { - def createClient(amazonS3Client: AmazonS3, - amazonS3TransferManager: TransferManager): S3Client = + def createClient[M[_]: Monad](amazonS3Client: AmazonS3, + amazonS3TransferManager: TransferManager): S3Client[M] = new ThorpS3Client(amazonS3Client, amazonS3TransferManager) - val defaultClient: S3Client = + def defaultClient[M[_]: Monad]: S3Client[M] = createClient(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala index 902d95c..1ee9c32 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala @@ -1,32 +1,34 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad +import cats.implicits._ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.CopyObjectRequest import net.kemitix.s3thorp.aws.api.S3Action.CopyS3Action import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logCopyFinish, logCopyStart} import net.kemitix.s3thorp.domain.{Bucket, MD5Hash, RemoteKey} -class S3ClientCopier(amazonS3: AmazonS3) { +class S3ClientCopier[M[_]: Monad](amazonS3: AmazonS3) { def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = { - IO { - new CopyObjectRequest( - bucket.name, sourceKey.key, - bucket.name, targetKey.key) + (implicit info: Int => String => M[Unit]): M[CopyS3Action] = + for { + _ <- logCopyStart[M](bucket, sourceKey, targetKey) + _ <- copyObject(bucket, sourceKey, hash, targetKey) + _ <- logCopyFinish[M](bucket, sourceKey,targetKey) + } yield CopyS3Action(targetKey) + + private def copyObject(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey) = { + val request = + new CopyObjectRequest(bucket.name, sourceKey.key, bucket.name, targetKey.key) .withMatchingETagConstraint(hash.hash) - }.bracket { - request => - for { - _ <- logCopyStart(bucket, sourceKey, targetKey) - result <- IO(amazonS3.copyObject(request)) - } yield result - }(_ => logCopyFinish(bucket, sourceKey,targetKey)) - .map(_ => CopyS3Action(targetKey)) + Monad[M].pure(amazonS3.copyObject(request)) } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala index 158d3dd..170a049 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala @@ -1,24 +1,25 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad +import cats.implicits._ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.DeleteObjectRequest import net.kemitix.s3thorp.aws.api.S3Action.DeleteS3Action import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logDeleteFinish, logDeleteStart} import net.kemitix.s3thorp.domain.{Bucket, RemoteKey} -class S3ClientDeleter(amazonS3: AmazonS3) { +class S3ClientDeleter[M[_]: Monad](amazonS3: AmazonS3) { def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] = + (implicit info: Int => String => M[Unit]): M[DeleteS3Action] = for { - _ <- logDeleteStart(bucket, remoteKey) + _ <- logDeleteStart[M](bucket, remoteKey) _ <- deleteObject(bucket, remoteKey) - _ <- logDeleteFinish(bucket, remoteKey) + _ <- logDeleteFinish[M](bucket, remoteKey) } yield DeleteS3Action(remoteKey) - private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = IO { + private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = Monad[M].pure { amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)) } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala index 4392bba..4e9669d 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala @@ -1,53 +1,46 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad import com.amazonaws.services.s3.model.PutObjectResult import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey} object S3ClientLogging { - def logListObjectsStart(bucket: Bucket, + def logListObjectsStart[M[_]: Monad](bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(1)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") - def logListObjectsFinish(bucket: Bucket, + def logListObjectsFinish[M[_]: Monad](bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") - def logUploadStart(localFile: LocalFile, - bucket: Bucket) - (implicit info: Int => String => IO[Unit]): PutObjectResult => IO[PutObjectResult] = - in => for { - _ <- info(1)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}") - } yield in - - def logUploadFinish(localFile: LocalFile, + def logUploadFinish[M[_]: Monad](localFile: LocalFile, bucket: Bucket) - (implicit info: Int => String => IO[Unit]): PutObjectResult => IO[Unit] = + (implicit info: Int => String => M[Unit]): PutObjectResult => M[Unit] = _ => info(2)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}") - def logCopyStart(bucket: Bucket, - sourceKey: RemoteKey, - targetKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + def logCopyStart[M[_]: Monad](bucket: Bucket, + sourceKey: RemoteKey, + targetKey: RemoteKey) + (implicit info: Int => String => M[Unit]): M[Unit] = info(1)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - def logCopyFinish(bucket: Bucket, + def logCopyFinish[M[_]: Monad](bucket: Bucket, sourceKey: RemoteKey, targetKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(2)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - def logDeleteStart(bucket: Bucket, + def logDeleteStart[M[_]: Monad](bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(1)(s"Delete: ${bucket.name}:${remoteKey.key}") - def logDeleteFinish(bucket: Bucket, + def logDeleteFinish[M[_]: Monad](bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + (implicit info: Int => String => M[Unit]): M[Unit] = info(2)(s"Deleted: ${bucket.name}:${remoteKey.key}") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala index 2cc70d8..dd5898c 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad +import cats.implicits._ import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart} @@ -10,11 +11,11 @@ import net.kemitix.s3thorp.domain._ import scala.collection.JavaConverters._ -class S3ClientObjectLister(amazonS3: AmazonS3) { +class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) { def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = { + (implicit info: Int => String => M[Unit]): M[S3ObjectsData] = { type Token = String type Batch = (Stream[S3ObjectSummary], Option[Token]) @@ -24,8 +25,8 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { .withPrefix(prefix.key) .withContinuationToken(token) - def fetchBatch: ListObjectsV2Request => IO[Batch] = - request => IO { + def fetchBatch: ListObjectsV2Request => M[Batch] = + request => Monad[M].pure { val result = amazonS3.listObjectsV2(request) val more: Option[Token] = if (result.isTruncated) Some(result.getNextContinuationToken) @@ -33,29 +34,27 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { (result.getObjectSummaries.asScala.toStream, more) } - def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = + def fetchMore(more: Option[Token]): M[Stream[S3ObjectSummary]] = { + more match { + case None => Monad[M].pure(Stream.empty) + case Some(token) => fetch(requestMore(token)) + } + } + + def fetch: ListObjectsV2Request => M[Stream[S3ObjectSummary]] = request => for { batch <- fetchBatch(request) (summaries, more) = batch - rest <- more match { - case None => IO{Stream()} - case Some(token) => fetch(requestMore(token)) - } + rest <- fetchMore(more) } yield summaries ++ rest - IO { - new ListObjectsV2Request() - .withBucketName(bucket.name) - .withPrefix(prefix.key) - }.bracket { - request => - for { - _ <- logListObjectsStart(bucket, prefix) - summaries <- fetch(request) - } yield summaries - }(_ => logListObjectsFinish(bucket,prefix)) - .map(os => S3ObjectsData(byHash(os), byKey(os))) + for { + _ <- logListObjectsStart[M](bucket, prefix) + r = new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key) + summaries <- fetch(r) + _ <- logListObjectsFinish[M](bucket, prefix) + } yield S3ObjectsData(byHash(summaries), byKey(summaries)) } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala index fed8353..b821c34 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala @@ -1,31 +1,31 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer.TransferManager import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action} import net.kemitix.s3thorp.aws.api.{S3Action, S3Client, UploadProgressListener} import net.kemitix.s3thorp.domain._ -class ThorpS3Client(amazonS3Client: => AmazonS3, +class ThorpS3Client[M[_]: Monad](amazonS3Client: => AmazonS3, amazonS3TransferManager: => TransferManager) - extends S3Client { + extends S3Client[M] { - lazy val objectLister = new S3ClientObjectLister(amazonS3Client) - lazy val copier = new S3ClientCopier(amazonS3Client) - lazy val uploader = new Uploader(amazonS3TransferManager) - lazy val deleter = new S3ClientDeleter(amazonS3Client) + lazy val objectLister = new S3ClientObjectLister[M](amazonS3Client) + lazy val copier = new S3ClientCopier[M](amazonS3Client) + lazy val uploader = new Uploader[M](amazonS3TransferManager) + lazy val deleter = new S3ClientDeleter[M](amazonS3Client) override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = + (implicit info: Int => String => M[Unit]): M[S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = + (implicit info: Int => String => M[Unit]): M[CopyS3Action] = copier.copy(bucket, sourceKey,hash, targetKey) override def upload(localFile: LocalFile, @@ -34,13 +34,13 @@ class ThorpS3Client(amazonS3Client: => AmazonS3, multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => IO[Unit], - warn: String => IO[Unit]): IO[S3Action] = + (implicit info: Int => String => M[Unit], + warn: String => M[Unit]): M[S3Action] = uploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries) override def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] = + (implicit info: Int => String => M[Unit]): M[DeleteS3Action] = deleter.delete(bucket, remoteKey) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala index c2ba261..3576bfa 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad +import cats.implicits._ import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult @@ -13,7 +14,7 @@ import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} import scala.util.Try -class Uploader(transferManager: => AmazonTransferManager) { +class Uploader[M[_]: Monad](transferManager: => AmazonTransferManager) { def accepts(localFile: LocalFile) (implicit multiPartThreshold: Long): Boolean = @@ -25,24 +26,24 @@ class Uploader(transferManager: => AmazonTransferManager) { multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => IO[Unit], - warn: String => IO[Unit]): IO[S3Action] = + (implicit info: Int => String => M[Unit], + warn: String => M[Unit]): M[S3Action] = for { - _ <- logMultiPartUploadStart(localFile, tryCount) - upload <- upload(localFile, bucket, uploadProgressListener) - _ <- logMultiPartUploadFinished(localFile) + _ <- logMultiPartUploadStart[M](localFile, tryCount) + upload <- transfer(localFile, bucket, uploadProgressListener) + _ <- logMultiPartUploadFinished[M](localFile) } yield upload match { case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag)) case Left(e) => ErroredS3Action(localFile.remoteKey, e) } - private def upload(localFile: LocalFile, - bucket: Bucket, - uploadProgressListener: UploadProgressListener, - ): IO[Either[Throwable, UploadResult]] = { - val listener = progressListener(uploadProgressListener) + private def transfer(localFile: LocalFile, + bucket: Bucket, + uploadProgressListener: UploadProgressListener, + ): M[Either[Throwable, UploadResult]] = { + val listener: ProgressListener = progressListener(uploadProgressListener) val putObjectRequest = request(localFile, bucket, listener) - IO { + Monad[M].pure { Try(transferManager.upload(putObjectRequest)) .map(_.waitForUploadResult) .toEither @@ -56,16 +57,18 @@ class Uploader(transferManager: => AmazonTransferManager) { private def progressListener(uploadProgressListener: UploadProgressListener) = new ProgressListener { override def progressChanged(progressEvent: ProgressEvent): Unit = { - uploadProgressListener.listener( - progressEvent match { - case e: ProgressEvent if isTransfer(e) => - TransferEvent(e.getEventType.name) - case e: ProgressEvent if isByteTransfer(e) => - ByteTransferEvent(e.getEventType.name) - case e: ProgressEvent => - RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) - }) - .unsafeRunSync // the listener doesn't execute otherwise as it is never returned + uploadProgressListener.listener(eventHandler(progressEvent)) + } + + private def eventHandler(progressEvent: ProgressEvent) = { + progressEvent match { + case e: ProgressEvent if isTransfer(e) => + TransferEvent(e.getEventType.name) + case e: ProgressEvent if isByteTransfer(e) => + ByteTransferEvent(e.getEventType.name) + case e: ProgressEvent => + RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) + } } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala index 645d138..207c111 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala @@ -1,22 +1,22 @@ package net.kemitix.s3thorp.aws.lib -import cats.effect.IO +import cats.Monad import net.kemitix.s3thorp.domain.Terminal.clearLine import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish import net.kemitix.s3thorp.domain.LocalFile object UploaderLogging { - def logMultiPartUploadStart(localFile: LocalFile, + def logMultiPartUploadStart[M[_]: Monad](localFile: LocalFile, tryCount: Int) - (implicit info: Int => String => IO[Unit]): IO[Unit] = { + (implicit info: Int => String => M[Unit]): M[Unit] = { val tryMessage = if (tryCount == 1) "" else s"try $tryCount" val size = sizeInEnglish(localFile.file.length) info(1)(s"${clearLine}upload:$tryMessage:$size:${localFile.remoteKey.key}") } - def logMultiPartUploadFinished(localFile: LocalFile) - (implicit info: Int => String => IO[Unit]): IO[Unit] = + def logMultiPartUploadFinished[M[_]: Monad](localFile: LocalFile) + (implicit info: Int => String => M[Unit]): M[Unit] = info(4)(s"upload:finished: ${localFile.remoteKey.key}") } diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala index d6bba63..c4a0690 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala @@ -2,7 +2,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant -import cats.effect.IO +import cats.Id import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult @@ -23,8 +23,8 @@ class S3ClientSuite private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit - implicit private val logWarn: String => IO[Unit] = _ => IO.unit + implicit private val logInfo: Int => String => Id[Unit] = _ => _ => () + implicit private val logWarn: String => Id[Unit] = _ => () private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ describe("getS3Status") { @@ -44,7 +44,7 @@ class S3ClientSuite keyotherkey.remoteKey -> HashModified(hash, lastModified), keydiffhash.remoteKey -> HashModified(diffhash, lastModified))) - def invoke(self: S3Client, localFile: LocalFile) = { + def invoke(self: S3Client[Id], localFile: LocalFile) = { S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData) } @@ -108,7 +108,7 @@ class S3ClientSuite pending //FIXME: works okay on its own, but fails when run with others val expected = UploadS3Action(remoteKey, rootHash) - val result = s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync + val result = s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries) assertResult(expected)(result) } } diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByHashSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByHashSuite.scala index 6ed6f8b..518de05 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByHashSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByHashSuite.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.Date import com.amazonaws.services.s3.model.S3ObjectSummary @@ -13,7 +14,7 @@ class S3ObjectsByHashSuite extends FunSpec { val hash = MD5Hash("hash") val key1 = RemoteKey("key-1") val key2 = RemoteKey("key-2") - val lastModified = LastModified(Instant.now) + val lastModified = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) val o1 = s3object(hash, key1, lastModified) val o2 = s3object(hash, key2, lastModified) val os = Stream(o1, o2) diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala index 9214bdc..86bc114 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala @@ -1,9 +1,10 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.Date -import cats.effect.IO +import cats.Id import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary} import com.amazonaws.services.s3.transfer.TransferManager @@ -20,9 +21,9 @@ class ThorpS3ClientSuite val source = Resource(this, "upload") val prefix = RemoteKey("prefix") implicit val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit + implicit val logInfo: Int => String => Id[Unit] = _ => _ => () - val lm = LastModified(Instant.now) + val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) val h1 = MD5Hash("hash1") @@ -47,7 +48,7 @@ class ThorpS3ClientSuite val amazonS3 = stub[AmazonS3] val amazonS3TransferManager = stub[TransferManager] - val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager) + val s3Client = new ThorpS3Client[Id](amazonS3, amazonS3TransferManager) val myFakeResponse = new ListObjectsV2Result() val summaries = myFakeResponse.getObjectSummaries @@ -65,7 +66,7 @@ class ThorpS3ClientSuite k1a -> HashModified(h1, lm), k1b -> HashModified(h1, lm), k2 -> HashModified(h2, lm))) - val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync + val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")) assertResult(expected.byHash.keys)(result.byHash.keys) assertResult(expected.byKey.keys)(result.byKey.keys) assertResult(expected)(result) diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala index 7add7ca..0e681cb 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala @@ -2,7 +2,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant -import cats.effect.IO +import cats.Id import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer._ import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action @@ -20,8 +20,8 @@ class UploaderSuite private val source = Resource(this, ".") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit - implicit private val logWarn: String => IO[Unit] = _ => IO.unit + implicit private val logInfo: Int => String => Id[Unit] = _ => _ => () + implicit private val logWarn: String => Id[Unit] = _ => () private val fileToKey = generateKey(config.source, config.prefix) _ val lastModified = LastModified(Instant.now()) @@ -63,7 +63,7 @@ class UploaderSuite val uploader = new Uploader(amazonS3TransferManager) it("should upload") { val expected = UploadS3Action(returnedKey, returnedHash) - val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync + val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries) assertResult(expected)(result) } } diff --git a/build.sbt b/build.sbt index 6b918b4..eba0043 100644 --- a/build.sbt +++ b/build.sbt @@ -27,6 +27,19 @@ val awsSdkDependencies = Seq( "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9" ) ) +val catsSettings = Seq ( + libraryDependencies ++= Seq( + "org.typelevel" %% "cats-core" % "1.6.1" + ), + // recommended for cats-effects + scalacOptions ++= Seq( + "-feature", + "-deprecation", + "-unchecked", + "-language:postfixOps", + "-language:higherKinds", + "-Ypartial-unification") +) val catsEffectsSettings = Seq( libraryDependencies ++= Seq( "org.typelevel" %% "cats-effect" % "1.3.1" @@ -47,6 +60,7 @@ lazy val cli = (project in file("cli")) .settings(commonSettings) .settings(mainClass in assembly := Some("net.kemitix.s3thorp.cli.Main")) .settings(applicationSettings) + .settings(catsEffectsSettings) .aggregate(`aws-lib`, core, `aws-api`, domain) .settings(commandLineParsing) .dependsOn(`aws-lib`) @@ -67,7 +81,7 @@ lazy val core = (project in file("core")) lazy val `aws-api` = (project in file("aws-api")) .settings(commonSettings) .settings(assemblyJarName in assembly := "aws-api.jar") - .settings(catsEffectsSettings) + .settings(catsSettings) .dependsOn(domain) lazy val domain = (project in file("domain")) diff --git a/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala b/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala index 0ea2a5e..6f8fb89 100644 --- a/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala +++ b/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala @@ -1,15 +1,16 @@ package net.kemitix.s3thorp.cli -import cats.effect.IO +import cats.Monad -class Logger(verbosity: Int) { - def info(level: Int)(message: String): IO[Unit] = - if (verbosity >= level) IO(println(s"[INFO:$level] $message")) - else IO.unit +class Logger[M[_]: Monad](verbosity: Int) { - def warn(message: String): IO[Unit] = IO(println(s"[ WARN] $message")) + def info(level: Int)(message: String): M[Unit] = + if (verbosity >= level) Monad[M].pure(println(s"[INFO:$level] $message")) + else Monad[M].unit - def error(message: String): IO[Unit] = IO(println(s"[ ERROR] $message")) + def warn(message: String): M[Unit] = Monad[M].pure(println(s"[ WARN] $message")) + + def error(message: String): M[Unit] = Monad[M].pure(println(s"[ ERROR] $message")) } diff --git a/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala index 4b93814..39fe0ec 100644 --- a/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala @@ -18,20 +18,24 @@ object Main extends IOApp { def program(args: List[String]): IO[ExitCode] = for { config <- ParseArgs(args, defaultConfig) - logger = new Logger(config.verbose) + logger = new Logger[IO](config.verbose) info = (l: Int) => (m: String) => logger.info(l)(m) - md5HashGenerator = (file: File) => md5File(file)(info) _ <- logger.info(1)("S3Thorp - hashed sync for s3") - _ <- Sync.run( + _ <- Sync.run[IO]( + config, S3ClientBuilder.defaultClient, - md5HashGenerator, - l => i => logger.info(l)(i), - w => logger.warn(w), - e => logger.error(e))(config) + hashGenerator(info), + info, + w => logger.warn(w)) } yield ExitCode.Success + private def hashGenerator(info: Int => String => IO[Unit]) = { + implicit val logInfo: Int => String => IO[Unit] = info + file: File => md5File[IO](file) + } + override def run(args: List[String]): IO[ExitCode] = { - val logger = new Logger(1) + val logger = new Logger[IO](1) program(args) .guaranteeCase { case Canceled => logger.warn("Interrupted") diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala b/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala index f588f90..b15ee6b 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.core -import cats.effect.IO +import cats.Monad +import cats.implicits._ import net.kemitix.s3thorp.aws.api.S3Action.DoNothingS3Action import net.kemitix.s3thorp.aws.api.{S3Action, S3Client, UploadProgressListener} import net.kemitix.s3thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} @@ -8,10 +9,10 @@ import net.kemitix.s3thorp.domain.Config object ActionSubmitter { - def submitAction(s3Client: S3Client, action: Action) - (implicit c: Config, - info: Int => String => IO[Unit], - warn: String => IO[Unit]): Stream[IO[S3Action]] = { + def submitAction[M[_]: Monad](s3Client: S3Client[M], action: Action) + (implicit c: Config, + info: Int => String => M[Unit], + warn: String => M[Unit]): Stream[M[S3Action]] = { Stream( action match { case ToUpload(bucket, localFile) => @@ -31,7 +32,7 @@ object ActionSubmitter { action <- s3Client.delete(bucket, remoteKey) } yield action case DoNothing(bucket, remoteKey) => - IO.pure(DoNothingS3Action(remoteKey)) + Monad[M].pure(DoNothingS3Action(remoteKey)) }) } } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala b/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala index ece56c4..10f9f5f 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala @@ -3,23 +3,24 @@ package net.kemitix.s3thorp.core import java.io.File import java.nio.file.Path -import cats.effect.IO +import cats.Monad +import cats.implicits._ import net.kemitix.s3thorp.core.KeyGenerator.generateKey import net.kemitix.s3thorp.domain.{Config, Filter, LocalFile, MD5Hash} object LocalFileStream { - def findFiles(file: File, - md5HashGenerator: File => IO[MD5Hash], - info: Int => String => IO[Unit]) - (implicit c: Config): IO[Stream[LocalFile]] = { + def findFiles[M[_]: Monad](file: File, + md5HashGenerator: File => M[MD5Hash], + info: Int => String => M[Unit]) + (implicit c: Config): M[Stream[LocalFile]] = { val filters: Path => Boolean = Filter.isIncluded(c.filters) - def loop(file: File): IO[Stream[LocalFile]] = { + def loop(file: File): M[Stream[LocalFile]] = { - def dirPaths(file: File): IO[Stream[File]] = - IO { + def dirPaths(file: File): M[Stream[File]] = + Monad[M].pure { Option(file.listFiles) .getOrElse(throw new IllegalArgumentException(s"Directory not found $file")) } @@ -27,23 +28,23 @@ object LocalFileStream { Stream(fs: _*) .filter(f => filters(f.toPath))) - def recurseIntoSubDirectories(file: File)(implicit c: Config): IO[Stream[LocalFile]] = + def recurseIntoSubDirectories(file: File)(implicit c: Config): M[Stream[LocalFile]] = file match { case f if f.isDirectory => loop(file) case _ => for(hash <- md5HashGenerator(file)) yield Stream(LocalFile(file, c.source, hash, generateKey(c.source, c.prefix))) } - def recurse(fs: Stream[File]): IO[Stream[LocalFile]] = - fs.foldLeft(IO.pure(Stream.empty[LocalFile]))((acc, f) => + def recurse(fs: Stream[File]): M[Stream[LocalFile]] = + fs.foldLeft(Monad[M].pure(Stream.empty[LocalFile]))((acc, f) => recurseIntoSubDirectories(f) .flatMap(lfs => acc.map(s => s ++ lfs))) for { - _ <- IO(info(2)(s"- Entering: $file")) + _ <- info(2)(s"- Entering: $file") fs <- dirPaths(file) lfs <- recurse(fs) - _ <- IO(info(5)(s"- Leaving : $file")) + _ <- info(5)(s"- Leaving : $file") } yield lfs } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala index 9f9a247..85f8ee3 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala @@ -3,23 +3,21 @@ package net.kemitix.s3thorp.core import java.io.{File, FileInputStream} import java.security.MessageDigest -import cats.effect.IO +import cats.Monad +import cats.implicits._ import net.kemitix.s3thorp.domain.MD5Hash import scala.collection.immutable.NumericRange object MD5HashGenerator { - def md5File(file: File) - (implicit info: Int => String => IO[Unit]): IO[MD5Hash] = { + def md5File[M[_]: Monad](file: File) + (implicit info: Int => String => M[Unit]): M[MD5Hash] = { val maxBufferSize = 8048 - val defaultBuffer = new Array[Byte](maxBufferSize) - - def openFile = IO(new FileInputStream(file)) - - def closeFile = {fis: FileInputStream => IO(fis.close())} + def openFile = Monad[M].pure(new FileInputStream(file)) + def closeFile = {fis: FileInputStream => Monad[M].pure(fis.close())} def nextChunkSize(currentOffset: Long) = { // a value between 1 and maxBufferSize @@ -31,25 +29,29 @@ object MD5HashGenerator { def readToBuffer(fis: FileInputStream, currentOffset: Long) = { val buffer = - if (nextChunkSize(currentOffset) < maxBufferSize) - new Array[Byte](nextChunkSize(currentOffset)) - else - defaultBuffer + if (nextChunkSize(currentOffset) < maxBufferSize) new Array[Byte](nextChunkSize(currentOffset)) + else defaultBuffer fis read buffer buffer } - def readFile: IO[String] = openFile - .bracket(fis => IO { + def digestFile(fis: FileInputStream) = + Monad[M].pure { val md5 = MessageDigest getInstance "MD5" NumericRange(0, file.length, maxBufferSize) - .foreach{currentOffset => { - val buffer = readToBuffer(fis, currentOffset) - md5 update buffer - } - } + .foreach { currentOffset => { + val buffer = readToBuffer(fis, currentOffset) + md5 update buffer + }} (md5.digest map ("%02x" format _)).mkString - })(closeFile) + } + + def readFile: M[String] = + for { + fis <- openFile + md5 <- digestFile(fis) + _ <- closeFile(fis) + } yield md5 for { _ <- info(5)(s"md5:reading:size ${file.length}:$file") diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala b/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala index 6dea7f2..0269b47 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala @@ -2,7 +2,7 @@ package net.kemitix.s3thorp.core import java.io.File -import cats.effect.IO +import cats.Monad import cats.implicits._ import net.kemitix.s3thorp.aws.api.{S3Action, S3Client} import net.kemitix.s3thorp.core.Action.ToDelete @@ -11,33 +11,47 @@ import net.kemitix.s3thorp.core.ActionSubmitter.submitAction import net.kemitix.s3thorp.core.LocalFileStream.findFiles import net.kemitix.s3thorp.core.S3MetaDataEnricher.getMetadata import net.kemitix.s3thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart} -import net.kemitix.s3thorp.domain.{Config, MD5Hash, S3ObjectsData} +import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash, S3MetaData, S3ObjectsData} object Sync { - def run(s3Client: S3Client, - md5HashGenerator: File => IO[MD5Hash], - info: Int => String => IO[Unit], - warn: String => IO[Unit], - error: String => IO[Unit]) - (implicit c: Config): IO[Unit] = { + def run[M[_]: Monad](config: Config, + s3Client: S3Client[M], + md5HashGenerator: File => M[MD5Hash], + info: Int => String => M[Unit], + warn: String => M[Unit]): M[Unit] = { - def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] = + implicit val c: Config = config + implicit val logInfo: Int => String => M[Unit] = info + implicit val logWarn: String => M[Unit] = warn + + def metaData(s3Data: S3ObjectsData, sFiles: Stream[LocalFile]) = + Monad[M].pure(sFiles.map(file => getMetadata(file, s3Data))) + + def actions(sData: Stream[S3MetaData]) = + Monad[M].pure(sData.flatMap(s3MetaData => createActions(s3MetaData))) + + def submit(sActions: Stream[Action]) = + Monad[M].pure(sActions.flatMap(action => submitAction[M](s3Client, action))) + + def copyUploadActions(s3Data: S3ObjectsData): M[Stream[S3Action]] = (for { - sFiles <- findFiles(c.source, md5HashGenerator, info) - sData <- IO(sFiles.map(file => getMetadata(file, s3Data))) - sActions <- IO(sData.flatMap(s3MetaData => createActions(s3MetaData))) - sS3Actions <- IO(sActions.flatMap(action => submitAction(s3Client, action)(c, info, warn))) - } yield sS3Actions.sequence) + files <- findFiles(c.source, md5HashGenerator, info) + metaData <- metaData(s3Data, files) + actions <- actions(metaData) + s3Actions <- submit(actions) + } yield s3Actions.sequence) .flatten .map(streamS3Actions => streamS3Actions.sorted) - def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[S3Action]] = + def deleteActions(s3ObjectsData: S3ObjectsData): M[Stream[S3Action]] = (for { key <- s3ObjectsData.byKey.keys if key.isMissingLocally(c.source, c.prefix) - ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key))(c, info, warn) - } yield ioDelAction).toStream.sequence + ioDelAction <- submitAction[M](s3Client, ToDelete(c.bucket, key)) + } yield ioDelAction) + .toStream + .sequence for { _ <- logRunStart(info) diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala index 81c6ad8..806ac16 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp.core -import cats.effect.IO +import cats.Monad +import cats.implicits._ import net.kemitix.s3thorp.aws.api.S3Action import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action} import net.kemitix.s3thorp.domain.Config @@ -8,25 +9,25 @@ import net.kemitix.s3thorp.domain.Config // Logging for the Sync class object SyncLogging { - def logRunStart(info: Int => String => IO[Unit]) - (implicit c: Config): IO[Unit] = + def logRunStart[M[_]: Monad](info: Int => String => M[Unit]) + (implicit c: Config): M[Unit] = info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ") - def logFileScan(info: Int => String => IO[Unit]) - (implicit c: Config): IO[Unit] = + def logFileScan[M[_]: Monad](info: Int => String => M[Unit]) + (implicit c: Config): M[Unit] = info(1)(s"Scanning local files: ${c.source}...") - def logRunFinished(actions: Stream[S3Action], - info: Int => String => IO[Unit]) - (implicit c: Config): IO[Unit] = + def logRunFinished[M[_]: Monad](actions: Stream[S3Action], + info: Int => String => M[Unit]) + (implicit c: Config): M[Unit] = { + val counters = actions.foldLeft(Counters())(countActivities) for { - _ <- IO.unit - counters = actions.foldLeft(Counters())(countActivities) _ <- info(1)(s"Uploaded ${counters.uploaded} files") _ <- info(1)(s"Copied ${counters.copied} files") _ <- info(1)(s"Deleted ${counters.deleted} files") _ <- info(1)(s"Errors ${counters.errors}") } yield () + } private def countActivities(implicit c: Config): (Counters, S3Action) => Counters = (counters: Counters, s3Action: S3Action) => { diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala index 76a2ddc..95ec9be 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala @@ -2,21 +2,21 @@ package net.kemitix.s3thorp.core import java.io.File -import cats.effect.IO +import cats.Id import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash} import org.scalatest.FunSpec class LocalFileStreamSuite extends FunSpec { val uploadResource = Resource(this, "upload") - val config: Config = Config(source = uploadResource) - implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit - val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file) + implicit val config: Config = Config(source = uploadResource) + implicit private val logInfo: Int => String => Id[Unit] = l => i => () + val md5HashGenerator: File => Id[MD5Hash] = file => MD5HashGenerator.md5File[Id](file) describe("findFiles") { it("should find all files") { val result: Set[String] = - LocalFileStream.findFiles(uploadResource, md5HashGenerator, logInfo)(config).unsafeRunSync.toSet + LocalFileStream.findFiles[Id](uploadResource, md5HashGenerator, logInfo).toSet .map { x: LocalFile => x.relative.toString } assertResult(Set("subdir/leaf-file", "root-file"))(result) } diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala index c71000c..674fa22 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala @@ -1,8 +1,6 @@ package net.kemitix.s3thorp.core -import java.nio.file.Files - -import cats.effect.IO +import cats.Id import net.kemitix.s3thorp.core.MD5HashData.rootHash import net.kemitix.s3thorp.domain.{Bucket, Config, MD5Hash, RemoteKey} import org.scalatest.FunSpec @@ -12,12 +10,12 @@ class MD5HashGeneratorTest extends FunSpec { private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit + implicit private val logInfo: Int => String => Id[Unit] = l => i => () describe("read a small file (smaller than buffer)") { val file = Resource(this, "upload/root-file") it("should generate the correct hash") { - val result = MD5HashGenerator.md5File(file).unsafeRunSync + val result = MD5HashGenerator.md5File[Id](file) assertResult(rootHash)(result) } } @@ -25,7 +23,7 @@ class MD5HashGeneratorTest extends FunSpec { val file = Resource(this, "big-file") it("should generate the correct hash") { val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8") - val result = MD5HashGenerator.md5File(file).unsafeRunSync + val result = MD5HashGenerator.md5File[Id](file) assertResult(expected)(result) } } diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala index 732f8ee..d310393 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala @@ -3,7 +3,7 @@ package net.kemitix.s3thorp.core import java.io.File import java.time.Instant -import cats.effect.IO +import cats.Id import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action} import net.kemitix.s3thorp.aws.api.{S3Client, UploadProgressListener} import net.kemitix.s3thorp.core.MD5HashData.{leafHash, rootHash} @@ -16,32 +16,29 @@ class SyncSuite private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") - implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit - implicit private val logWarn: String => IO[Unit] = _ => IO.unit - private def logError: String => IO[Unit] = _ => IO.unit + val config = Config(Bucket("bucket"), prefix, source = source) + implicit private val logInfo: Int => String => Id[Unit] = _ => _ => () + implicit private val logWarn: String => Id[Unit] = _ => () private val lastModified = LastModified(Instant.now) private val fileToKey: File => RemoteKey = KeyGenerator.generateKey(source, prefix) private val rootFile = LocalFile.resolve("root-file", rootHash, source, fileToKey) private val leafFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, fileToKey) - private val md5HashGenerator = MD5HashGenerator.md5File(_) + private val md5HashGenerator = MD5HashGenerator.md5File[Id](_) - def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile) = { + def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile): (String, String, File) = (bucket.name, remoteKey.key, localFile.file) - } describe("run") { val testBucket = Bucket("bucket") // source contains the files root-file and subdir/leaf-file - val config = Config(Bucket("bucket"), RemoteKey("prefix"), source = source) val rootRemoteKey = RemoteKey("prefix/root-file") val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") describe("when all files should be uploaded") { val s3Client = new RecordingClient(testBucket, S3ObjectsData( byHash = Map(), byKey = Map())) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("uploads all files") { val expectedUploads = Map( "subdir/leaf-file" -> leafRemoteKey, @@ -67,7 +64,7 @@ class SyncSuite RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("uploads nothing") { val expectedUploads = Map() assertResult(expectedUploads)(s3Client.uploadsRecord) @@ -91,7 +88,7 @@ class SyncSuite RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified), RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("uploads nothing") { val expectedUploads = Map() assertResult(expectedUploads)(s3Client.uploadsRecord) @@ -119,17 +116,17 @@ class SyncSuite byKey = Map( deletedKey -> HashModified(deletedHash, lastModified))) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("deleted key") { val expectedDeletions = Set(deletedKey) assertResult(expectedDeletions)(s3Client.deletionsRecord) } } describe("when a file is excluded") { - val configWithExclusion = config.copy(filters = List(Exclude("leaf"))) + val config: Config = Config(Bucket("bucket"), prefix, source = source, filters = List(Exclude("leaf"))) val s3ObjectsData = S3ObjectsData(Map(), Map()) val s3Client = new RecordingClient(testBucket, s3ObjectsData) - Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(configWithExclusion).unsafeRunSync + Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn) it("is not uploaded") { val expectedUploads = Map( "root-file" -> rootRemoteKey @@ -141,7 +138,7 @@ class SyncSuite class RecordingClient(testBucket: Bucket, s3ObjectsData: S3ObjectsData) - extends S3Client { + extends S3Client[Id] { var uploadsRecord: Map[String, RemoteKey] = Map() var copiesRecord: Map[RemoteKey, RemoteKey] = Map() @@ -149,8 +146,8 @@ class SyncSuite override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => IO[Unit]) = - IO.pure(s3ObjectsData) + (implicit info: Int => String => Id[Unit]): S3ObjectsData = + s3ObjectsData override def upload(localFile: LocalFile, bucket: Bucket, @@ -158,32 +155,29 @@ class SyncSuite multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => IO[Unit], - warn: String => IO[Unit]) = - IO { - if (bucket == testBucket) - uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) - UploadS3Action(localFile.remoteKey, MD5Hash("some hash value")) - } + (implicit info: Int => String => Id[Unit], + warn: String => Id[Unit]): UploadS3Action = { + if (bucket == testBucket) + uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) + UploadS3Action(localFile.remoteKey, MD5Hash("some hash value")) + } override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit info: Int => String => IO[Unit]) = - IO { - if (bucket == testBucket) - copiesRecord += (sourceKey -> targetKey) - CopyS3Action(targetKey) - } + )(implicit info: Int => String => Id[Unit]): CopyS3Action = { + if (bucket == testBucket) + copiesRecord += (sourceKey -> targetKey) + CopyS3Action(targetKey) + } override def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit info: Int => String => IO[Unit]) = - IO { - if (bucket == testBucket) - deletionsRecord += remoteKey - DeleteS3Action(remoteKey) - } + )(implicit info: Int => String => Id[Unit]): DeleteS3Action = { + if (bucket == testBucket) + deletionsRecord += remoteKey + DeleteS3Action(remoteKey) + } } }