From 97efed76b4899ed52e9257f75496651d23549046 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Mon, 10 Jun 2019 19:45:36 +0100 Subject: [PATCH] Improve upload logging (#44) * [aws-lib] fold S3ClientUploader trait into it's only implementation This trait was only implemented by S3ClientTransferManager. * [core] SyncLogging: more robust matching No longer cares about parameters to case classes, just their types. * [cli] Logger uses IO for log methods * [aws-lib] remove 'transfer-manager'prefix and only show tryCount > 1 * [sbt,cli] remove log4j and scala-logging dependencies * [domain] move QuoteStripper to Domain Use it directly in MD5Hash to strip quotes from any input. * [core] SyncLogging call info in proper context If the IO.unit returned by the info calls isn't part of the chain that is returned from the function, then the delayed IO action is never called. * [aws-lib] Display size in bytes of file being uploaded * [core] call info in correct context * [cli] call info in correct context * [aws-lib] raise summary fetch message to info 1 * [cli] include correct level in info messages * [aws-lib] S3ClientLogging adjust logging levels * [aws-lib] display file sizes in english * [aws-lib] ObjectLister use IO.bracket properly * [aws-lib] Copier use IO.bracket properly * [aws-lib] Deleter refactor * [aws-lib] TransferManagerLogging remove unused methods * [aws-lib] TransferManager refactor * [aws-lib] TransferManager refactor * [aws-lib] TransferManager displays log messages Use the UploadProgressListener that was being ignored, and use unsafeRunSync to execute the suspended effect within the IO[Unit]. Using unsafeRunSync is required to render the effects as the listener returns Unit, meaning the suspended effects would be discarded. * [domain] Extract SizeTranslation into module * [aws-api] report bytes transferred in progress * [core] fix calls to info info now returns an IO already, so don't need to wrap it in one. * [aws-lib] remove unused class * [aws-lib] UploadProgress displays progress bar while uploading * [aws-api] UploadProgressLogging optimise imports * [aws-api] UploadProgressLogging rename variables * [domain] add Terminal object * [aws-api] UploadProgressLogging use console width and two lines - Improved clearing of lines after progress bar - Use console width for progress bar size * [aws-lib] S3ClientLogging optimise imports * [aws-lib] TransferManager clear line before logging * [aws-lib] rename class as TransferManager * [aws-lib] rename TransferManger as Uploader to not clash We are using an AWS SDK class with the same name. --- .../kemitix/s3thorp/aws/api/S3Client.scala | 10 +-- .../aws/api/UploadProgressListener.scala | 13 +++- .../aws/api/UploadProgressLogging.scala | 35 +++++++-- .../aws/lib/CancellableMultiPartUpload.scala | 5 -- .../s3thorp/aws/lib/S3ClientCopier.scala | 16 ++-- .../s3thorp/aws/lib/S3ClientDeleter.scala | 8 +- .../s3thorp/aws/lib/S3ClientLogging.scala | 55 +++++-------- .../aws/lib/S3ClientObjectLister.scala | 27 ++++--- .../aws/lib/S3ClientTransferManager.scala | 37 --------- .../lib/S3ClientTransferManagerLogging.scala | 77 ------------------- .../s3thorp/aws/lib/S3ClientUploader.scala | 35 --------- .../s3thorp/aws/lib/S3ObjectsByKey.scala | 3 +- .../s3thorp/aws/lib/ThorpS3Client.scala | 12 +-- .../kemitix/s3thorp/aws/lib/Uploader.scala | 64 +++++++++++++++ .../s3thorp/aws/lib/UploaderLogging.scala | 22 ++++++ .../s3thorp/aws/lib/S3ClientSuite.scala | 5 +- .../s3thorp/aws/lib/ThorpS3ClientSuite.scala | 3 +- ...ManagerSuite.scala => UploaderSuite.scala} | 11 +-- build.sbt | 7 -- cli/src/main/resources/log4j.xml | 21 ----- .../net/kemitix/s3thorp/cli/Logger.scala | 13 ++-- .../scala/net/kemitix/s3thorp/cli/Main.scala | 8 +- .../ActionSubmitter.scala | 28 ++++--- .../LocalFileStream.scala | 2 +- .../MD5HashGenerator.scala | 8 +- .../scala/net.kemitix.s3thorp.core/Sync.scala | 6 +- .../SyncLogging.scala | 32 ++++---- .../s3thorp/core/LocalFileStreamSuite.scala | 2 +- .../s3thorp/core/MD5HashGeneratorTest.scala | 3 +- .../net/kemitix/s3thorp/core/SyncSuite.scala | 20 +++-- .../net/kemitix/s3thorp/domain/MD5Hash.scala | 6 +- .../s3thorp/domain}/QuoteStripper.scala | 2 +- .../s3thorp/domain/SizeTranslation.scala | 14 ++++ .../net/kemitix/s3thorp/domain/Terminal.scala | 28 +++++++ 34 files changed, 308 insertions(+), 330 deletions(-) delete mode 100644 aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/CancellableMultiPartUpload.scala delete mode 100644 aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManager.scala delete mode 100644 aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerLogging.scala delete mode 100644 aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientUploader.scala create mode 100644 aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala create mode 100644 aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala rename aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/{S3ClientTransferManagerSuite.scala => UploaderSuite.scala} (90%) delete mode 100644 cli/src/main/resources/log4j.xml rename {core/src/main/scala/net.kemitix.s3thorp.core => domain/src/main/scala/net/kemitix/s3thorp/domain}/QuoteStripper.scala (67%) create mode 100644 domain/src/main/scala/net/kemitix/s3thorp/domain/SizeTranslation.scala create mode 100644 domain/src/main/scala/net/kemitix/s3thorp/domain/Terminal.scala diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala index 99fef29..0b0b9ff 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala @@ -8,7 +8,7 @@ trait S3Client { def listObjects(bucket: Bucket, prefix: RemoteKey - )(implicit info: Int => String => Unit): IO[S3ObjectsData] + )(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] def upload(localFile: LocalFile, bucket: Bucket, @@ -16,17 +16,17 @@ trait S3Client { multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit): IO[S3Action] + (implicit info: Int => String => IO[Unit], + warn: String => IO[Unit]): IO[S3Action] def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit info: Int => String => Unit): IO[CopyS3Action] + )(implicit info: Int => String => IO[Unit]): IO[CopyS3Action] def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit info: Int => String => Unit): IO[DeleteS3Action] + )(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] } diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala index 8bd94b2..879e015 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala @@ -1,16 +1,23 @@ package net.kemitix.s3thorp.aws.api +import cats.effect.IO import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} import net.kemitix.s3thorp.domain.LocalFile class UploadProgressListener(localFile: LocalFile) - (implicit info: Int => String => Unit) + (implicit info: Int => String => IO[Unit]) extends UploadProgressLogging { - def listener: UploadEvent => Unit = + var bytesTransferred = 0L + + def listener: UploadEvent => IO[Unit] = { case e: TransferEvent => logTransfer(localFile, e) - case e: RequestEvent => logRequestCycle(localFile, e) + case e: RequestEvent => { + val transferred = e.transferred + bytesTransferred += transferred + logRequestCycle(localFile, e, bytesTransferred) + } case e: ByteTransferEvent => logByteTransfer(e) } } diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala index 46a4213..3ad231b 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala @@ -1,22 +1,43 @@ package net.kemitix.s3thorp.aws.api +import cats.effect.IO import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} -import net.kemitix.s3thorp.domain.LocalFile +import net.kemitix.s3thorp.domain.Terminal.{clearLine, returnToPreviousLine} +import net.kemitix.s3thorp.domain.{Terminal, LocalFile} +import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish + +import scala.io.AnsiColor._ trait UploadProgressLogging { def logTransfer(localFile: LocalFile, event: TransferEvent) - (implicit info: Int => String => Unit): Unit = - info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}") + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}") + + private val oneHundredPercent = 100 def logRequestCycle(localFile: LocalFile, - event: RequestEvent) - (implicit info: Int => String => Unit): Unit = - info(3)(s"Uploading:${event.name}:${event.transferred}/${event.bytes}:${localFile.remoteKey.key}") + event: RequestEvent, + bytesTransferred: Long) + (implicit info: Int => String => IO[Unit]): IO[Unit] = { + val remoteKey = localFile.remoteKey.key + val fileLength = localFile.file.length + val consoleWidth = Terminal.width - 2 + val done = ((bytesTransferred.toDouble / fileLength.toDouble) * consoleWidth).toInt + if (done < oneHundredPercent) { + val head = s"$GREEN_B$GREEN#$RESET" * done + val tail = " " * (consoleWidth - done) + val bar = s"[$head$tail]" + val transferred = sizeInEnglish(bytesTransferred) + val fileSize = sizeInEnglish(fileLength) + IO(print(s"${clearLine}Uploading $transferred of $fileSize : $remoteKey\n$bar$returnToPreviousLine")) + } else + IO(print(clearLine)) + } def logByteTransfer(event: ByteTransferEvent) - (implicit info: Int => String => Unit): Unit = + (implicit info: Int => String => IO[Unit]): IO[Unit] = info(3)(".") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/CancellableMultiPartUpload.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/CancellableMultiPartUpload.scala deleted file mode 100644 index f0c0de7..0000000 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/CancellableMultiPartUpload.scala +++ /dev/null @@ -1,5 +0,0 @@ -package net.kemitix.s3thorp.aws.lib - -final case class CancellableMultiPartUpload( - e: Throwable, - uploadId: String) extends Exception(e) diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala index 8e1ec2a..902d95c 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala @@ -13,17 +13,19 @@ class S3ClientCopier(amazonS3: AmazonS3) { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit info: Int => String => Unit): IO[CopyS3Action] = { - val request = + (implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = { + IO { new CopyObjectRequest( bucket.name, sourceKey.key, bucket.name, targetKey.key) .withMatchingETagConstraint(hash.hash) - IO { - amazonS3.copyObject(request) - }.bracket( - logCopyStart(bucket, sourceKey, targetKey))( - logCopyFinish(bucket, sourceKey,targetKey)) + }.bracket { + request => + for { + _ <- logCopyStart(bucket, sourceKey, targetKey) + result <- IO(amazonS3.copyObject(request)) + } yield result + }(_ => logCopyFinish(bucket, sourceKey,targetKey)) .map(_ => CopyS3Action(targetKey)) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala index 9acbead..158d3dd 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala @@ -11,12 +11,14 @@ class S3ClientDeleter(amazonS3: AmazonS3) { def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => Unit): IO[DeleteS3Action] = + (implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] = for { _ <- logDeleteStart(bucket, remoteKey) - request = new DeleteObjectRequest(bucket.name, remoteKey.key) - _ <- IO{amazonS3.deleteObject(request)} + _ <- deleteObject(bucket, remoteKey) _ <- logDeleteFinish(bucket, remoteKey) } yield DeleteS3Action(remoteKey) + private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = IO { + amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)) + } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala index b14dcfc..4392bba 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala @@ -1,70 +1,53 @@ package net.kemitix.s3thorp.aws.lib import cats.effect.IO -import com.amazonaws.services.s3.model.{CopyObjectResult, DeleteObjectsResult, ListObjectsV2Result, PutObjectResult, S3ObjectSummary} +import com.amazonaws.services.s3.model.PutObjectResult import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey} object S3ClientLogging { def logListObjectsStart(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Stream[S3ObjectSummary]] = - in => IO { - info(3)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") - in - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(1)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") def logListObjectsFinish(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Unit] = - _ => IO { - info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") def logUploadStart(localFile: LocalFile, bucket: Bucket) - (implicit info: Int => String => Unit): PutObjectResult => IO[PutObjectResult] = - in => IO { - info(4)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}") - in - } + (implicit info: Int => String => IO[Unit]): PutObjectResult => IO[PutObjectResult] = + in => for { + _ <- info(1)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}") + } yield in def logUploadFinish(localFile: LocalFile, bucket: Bucket) - (implicit info: Int => String => Unit): PutObjectResult => IO[Unit] = - _ => IO { - info(1)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}") - } + (implicit info: Int => String => IO[Unit]): PutObjectResult => IO[Unit] = + _ => info(2)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}") def logCopyStart(bucket: Bucket, sourceKey: RemoteKey, targetKey: RemoteKey) - (implicit info: Int => String => Unit): CopyObjectResult => IO[CopyObjectResult] = - in => IO { - info(4)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - in - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(1)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") def logCopyFinish(bucket: Bucket, sourceKey: RemoteKey, targetKey: RemoteKey) - (implicit info: Int => String => Unit): CopyObjectResult => IO[Unit] = - _ => IO { - info(3)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(2)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") def logDeleteStart(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => Unit): IO[Unit] = - IO { - info(4)(s"Delete: ${bucket.name}:${remoteKey.key}") - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(1)(s"Delete: ${bucket.name}:${remoteKey.key}") def logDeleteFinish(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => Unit): IO[Unit] = - IO { - info(3)(s"Deleted: ${bucket.name}:${remoteKey.key}") - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(2)(s"Deleted: ${bucket.name}:${remoteKey.key}") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala index f901425..2cc70d8 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala @@ -14,22 +14,18 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): IO[S3ObjectsData] = { + (implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = { type Token = String type Batch = (Stream[S3ObjectSummary], Option[Token]) - val requestInitial = new ListObjectsV2Request() - .withBucketName(bucket.name) - .withPrefix(prefix.key) - val requestMore = (token:Token) => new ListObjectsV2Request() .withBucketName(bucket.name) .withPrefix(prefix.key) .withContinuationToken(token) def fetchBatch: ListObjectsV2Request => IO[Batch] = - request => IO{ + request => IO { val result = amazonS3.listObjectsV2(request) val more: Option[Token] = if (result.isTruncated) Some(result.getNextContinuationToken) @@ -37,21 +33,28 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { (result.getObjectSummaries.asScala.toStream, more) } - def fetchAll: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = + def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = request => for { batch <- fetchBatch(request) (summaries, more) = batch rest <- more match { case None => IO{Stream()} - case Some(token) => fetchAll(requestMore(token)) + case Some(token) => fetch(requestMore(token)) } } yield summaries ++ rest - fetchAll(requestInitial) - .bracket( - logListObjectsStart(bucket, prefix))( - logListObjectsFinish(bucket,prefix)) + IO { + new ListObjectsV2Request() + .withBucketName(bucket.name) + .withPrefix(prefix.key) + }.bracket { + request => + for { + _ <- logListObjectsStart(bucket, prefix) + summaries <- fetch(request) + } yield summaries + }(_ => logListObjectsFinish(bucket,prefix)) .map(os => S3ObjectsData(byHash(os), byKey(os))) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManager.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManager.scala deleted file mode 100644 index 1fafc12..0000000 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManager.scala +++ /dev/null @@ -1,37 +0,0 @@ -package net.kemitix.s3thorp.aws.lib - -import cats.effect.IO -import com.amazonaws.services.s3.model.PutObjectRequest -import com.amazonaws.services.s3.transfer.TransferManager -import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action -import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener} -import net.kemitix.s3thorp.aws.lib.S3ClientTransferManagerLogging.{logMultiPartUploadFinished, logMultiPartUploadStart} -import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} - -class S3ClientTransferManager(transferManager: => TransferManager) - extends S3ClientUploader { - - def accepts(localFile: LocalFile) - (implicit multiPartThreshold: Long): Boolean = - localFile.file.length >= multiPartThreshold - - override - def upload(localFile: LocalFile, - bucket: Bucket, - uploadProgressListener: UploadProgressListener, - multiPartThreshold: Long, - tryCount: Int, - maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit): IO[S3Action] = { - val putObjectRequest: PutObjectRequest = - new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) - .withGeneralProgressListener(progressListener(uploadProgressListener)) - for { - _ <- logMultiPartUploadStart(localFile, tryCount) - upload = transferManager.upload(putObjectRequest) - result <- IO{upload.waitForUploadResult} - _ <- logMultiPartUploadFinished(localFile) - } yield UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag)) - } -} diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerLogging.scala deleted file mode 100644 index 2f7a823..0000000 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerLogging.scala +++ /dev/null @@ -1,77 +0,0 @@ -package net.kemitix.s3thorp.aws.lib - -import cats.effect.IO -import com.amazonaws.services.s3.model.{AmazonS3Exception, InitiateMultipartUploadResult, UploadPartRequest, UploadPartResult} -import net.kemitix.s3thorp.domain.{LocalFile, MD5Hash} - -object S3ClientTransferManagerLogging { - - private val prefix = "transfer-manager" - - def logMultiPartUploadStart(localFile: LocalFile, - tryCount: Int) - (implicit info: Int => String => Unit): IO[Unit] = - IO{info(1)(s"$prefix:upload:try $tryCount: ${localFile.remoteKey.key}")} - - def logMultiPartUploadFinished(localFile: LocalFile) - (implicit info: Int => String => Unit): IO[Unit] = - IO{info(4)(s"$prefix:upload:finished: ${localFile.remoteKey.key}")} - - def logMultiPartUploadInitiate(localFile: LocalFile) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:initiating: ${localFile.remoteKey.key}") - - def logMultiPartUploadPartsDetails(localFile: LocalFile, - nParts: Int, - partSize: Long) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:parts $nParts:each $partSize: ${localFile.remoteKey.key}") - - def logMultiPartUploadPartDetails(localFile: LocalFile, - partNumber: Int, - partHash: MD5Hash) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:part $partNumber:hash ${partHash.hash}: ${localFile.remoteKey.key}") - - def logMultiPartUploadPart(localFile: LocalFile, - partRequest: UploadPartRequest) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:sending:part ${partRequest.getPartNumber}: ${partRequest.getMd5Digest}: ${localFile.remoteKey.key}") - - def logMultiPartUploadPartDone(localFile: LocalFile, - partRequest: UploadPartRequest, - result: UploadPartResult) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:sent:part ${partRequest.getPartNumber}: ${result.getPartETag}: ${localFile.remoteKey.key}") - - def logMultiPartUploadPartError(localFile: LocalFile, - partRequest: UploadPartRequest, - error: AmazonS3Exception) - (implicit warn: String => Unit): Unit = { - val returnedMD5Hash = error.getAdditionalDetails.get("Content-MD5") - warn(s"$prefix:error:part ${partRequest.getPartNumber}:ret-hash $returnedMD5Hash: ${localFile.remoteKey.key}") - } - - def logMultiPartUploadCompleted(createUploadResponse: InitiateMultipartUploadResult, - uploadPartResponses: Stream[UploadPartResult], - localFile: LocalFile) - (implicit info: Int => String => Unit): Unit = - info(1)(s"$prefix:completed:parts ${uploadPartResponses.size}: ${localFile.remoteKey.key}") - - def logMultiPartUploadCancelling(localFile: LocalFile) - (implicit warn: String => Unit): Unit = - warn(s"$prefix:cancelling: ${localFile.remoteKey.key}") - - def logErrorRetrying(e: Throwable, localFile: LocalFile, tryCount: Int) - (implicit warn: String => Unit): Unit = - warn(s"$prefix:retry:error ${e.getMessage}: ${localFile.remoteKey.key}") - - def logErrorCancelling(e: Throwable, localFile: LocalFile) - (implicit error: String => Unit) : Unit = - error(s"$prefix:cancelling:error ${e.getMessage}: ${localFile.remoteKey.key}") - - def logErrorUnknown(e: Throwable, localFile: LocalFile) - (implicit error: String => Unit): Unit = - error(s"$prefix:unknown:error $e: ${localFile.remoteKey.key}") - -} diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientUploader.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientUploader.scala deleted file mode 100644 index 0dde250..0000000 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientUploader.scala +++ /dev/null @@ -1,35 +0,0 @@ -package net.kemitix.s3thorp.aws.lib - -import cats.effect.IO -import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} -import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} -import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener} -import net.kemitix.s3thorp.domain.{Bucket, LocalFile} - -trait S3ClientUploader { - - def accepts(localFile: LocalFile) - (implicit multiPartThreshold: Long): Boolean - - def upload(localFile: LocalFile, - bucket: Bucket, - progressListener: UploadProgressListener, - multiPartThreshold: Long, - tryCount: Int, - maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit): IO[S3Action] - - def progressListener(uploadProgressListener: UploadProgressListener): ProgressListener = { - new ProgressListener { - override def progressChanged(event: ProgressEvent): Unit = { - event match { - case e if e.getEventType.isTransferEvent => TransferEvent(e.getEventType.name) - case e if e.getEventType equals ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT => ByteTransferEvent(e.getEventType.name) - case e => RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) - } - } - } - } - -} diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByKey.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByKey.scala index a26aeb2..0ed6a7c 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByKey.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByKey.scala @@ -2,14 +2,13 @@ package net.kemitix.s3thorp.aws.lib import com.amazonaws.services.s3.model.S3ObjectSummary import net.kemitix.s3thorp.domain.{HashModified, LastModified, MD5Hash, RemoteKey} -import net.kemitix.s3thorp.core.QuoteStripper.stripQuotes object S3ObjectsByKey { def byKey(os: Stream[S3ObjectSummary]) = os.map { o => { val remoteKey = RemoteKey(o.getKey) - val hash = MD5Hash(o.getETag filter stripQuotes) + val hash = MD5Hash(o.getETag) val lastModified = LastModified(o.getLastModified.toInstant) (remoteKey, HashModified(hash, lastModified)) }}.toMap diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala index 3c569d2..fed8353 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala @@ -13,19 +13,19 @@ class ThorpS3Client(amazonS3Client: => AmazonS3, lazy val objectLister = new S3ClientObjectLister(amazonS3Client) lazy val copier = new S3ClientCopier(amazonS3Client) - lazy val uploader = new S3ClientTransferManager(amazonS3TransferManager) + lazy val uploader = new Uploader(amazonS3TransferManager) lazy val deleter = new S3ClientDeleter(amazonS3Client) override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): IO[S3ObjectsData] = + (implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit info: Int => String => Unit): IO[CopyS3Action] = + (implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = copier.copy(bucket, sourceKey,hash, targetKey) override def upload(localFile: LocalFile, @@ -34,13 +34,13 @@ class ThorpS3Client(amazonS3Client: => AmazonS3, multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit): IO[S3Action] = + (implicit info: Int => String => IO[Unit], + warn: String => IO[Unit]): IO[S3Action] = uploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries) override def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => Unit): IO[DeleteS3Action] = + (implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] = deleter.delete(bucket, remoteKey) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala new file mode 100644 index 0000000..eeba3a2 --- /dev/null +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala @@ -0,0 +1,64 @@ +package net.kemitix.s3thorp.aws.lib + +import cats.effect.IO +import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} +import com.amazonaws.services.s3.model.PutObjectRequest +import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager} +import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action +import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} +import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener} +import net.kemitix.s3thorp.aws.lib.UploaderLogging.{logMultiPartUploadFinished, logMultiPartUploadStart} +import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} + +class Uploader(transferManager: => AmazonTransferManager) { + + def accepts(localFile: LocalFile) + (implicit multiPartThreshold: Long): Boolean = + localFile.file.length >= multiPartThreshold + + def upload(localFile: LocalFile, + bucket: Bucket, + uploadProgressListener: UploadProgressListener, + multiPartThreshold: Long, + tryCount: Int, + maxRetries: Int) + (implicit info: Int => String => IO[Unit], + warn: String => IO[Unit]): IO[S3Action] = { + for { + _ <- logMultiPartUploadStart(localFile, tryCount) + listener = progressListener(uploadProgressListener) + putObjectRequest = request(localFile, bucket, listener) + upload = transferManager.upload(putObjectRequest) + result <- IO{upload.waitForUploadResult} + _ <- logMultiPartUploadFinished(localFile) + } yield UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag)) + } + + private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = { + new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) + .withGeneralProgressListener(listener) + } + + private def progressListener(uploadProgressListener: UploadProgressListener) = + new ProgressListener { + override def progressChanged(progressEvent: ProgressEvent): Unit = { + uploadProgressListener.listener( + progressEvent match { + case e: ProgressEvent if isTransfer(e) => + TransferEvent(e.getEventType.name) + case e: ProgressEvent if isByteTransfer(e) => + ByteTransferEvent(e.getEventType.name) + case e: ProgressEvent => + RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) + }) + .unsafeRunSync // the listener doesn't execute otherwise as it is never returned + } + } + + private def isTransfer(e: ProgressEvent) = + e.getEventType.isTransferEvent + + private def isByteTransfer(e: ProgressEvent) = + e.getEventType equals ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT + +} diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala new file mode 100644 index 0000000..645d138 --- /dev/null +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala @@ -0,0 +1,22 @@ +package net.kemitix.s3thorp.aws.lib + +import cats.effect.IO +import net.kemitix.s3thorp.domain.Terminal.clearLine +import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish +import net.kemitix.s3thorp.domain.LocalFile + +object UploaderLogging { + + def logMultiPartUploadStart(localFile: LocalFile, + tryCount: Int) + (implicit info: Int => String => IO[Unit]): IO[Unit] = { + val tryMessage = if (tryCount == 1) "" else s"try $tryCount" + val size = sizeInEnglish(localFile.file.length) + info(1)(s"${clearLine}upload:$tryMessage:$size:${localFile.remoteKey.key}") + } + + def logMultiPartUploadFinished(localFile: LocalFile) + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(4)(s"upload:finished: ${localFile.remoteKey.key}") + +} diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala index 568abca..d6bba63 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala @@ -2,6 +2,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult @@ -22,8 +23,8 @@ class S3ClientSuite private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => Unit = l => m => () - implicit private val logWarn: String => Unit = w => () + implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit + implicit private val logWarn: String => IO[Unit] = _ => IO.unit private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ describe("getS3Status") { diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala index e6a2cb0..9214bdc 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala @@ -3,6 +3,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant import java.util.Date +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary} import com.amazonaws.services.s3.transfer.TransferManager @@ -19,7 +20,7 @@ class ThorpS3ClientSuite val source = Resource(this, "upload") val prefix = RemoteKey("prefix") implicit val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit val logInfo: Int => String => Unit = l => m => () + implicit val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit val lm = LastModified(Instant.now) diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala similarity index 90% rename from aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerSuite.scala rename to aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala index 9b5fc1f..7add7ca 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala @@ -2,6 +2,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer._ import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action @@ -12,22 +13,22 @@ import net.kemitix.s3thorp.domain._ import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec -class S3ClientTransferManagerSuite +class UploaderSuite extends FunSpec with MockFactory { private val source = Resource(this, ".") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => Unit = l => m => () - implicit private val logWarn: String => Unit = w => () + implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit + implicit private val logWarn: String => IO[Unit] = _ => IO.unit private val fileToKey = generateKey(config.source, config.prefix) _ val lastModified = LastModified(Instant.now()) describe("S3ClientMultiPartTransferManagerSuite") { describe("accepts") { val transferManager = stub[TransferManager] - val uploader = new S3ClientTransferManager(transferManager) + val uploader = new Uploader(transferManager) describe("small-file") { val smallFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey) it("should be a small-file") { @@ -59,7 +60,7 @@ class S3ClientTransferManagerSuite val progressListener = new UploadProgressListener(bigFile) val amazonS3 = mock[AmazonS3] val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build - val uploader = new S3ClientTransferManager(amazonS3TransferManager) + val uploader = new Uploader(amazonS3TransferManager) it("should upload") { val expected = UploadS3Action(returnedKey, returnedHash) val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync diff --git a/build.sbt b/build.sbt index 59111b2..b8fb0c7 100644 --- a/build.sbt +++ b/build.sbt @@ -22,12 +22,6 @@ val awsSdkDependencies = Seq( "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9" ) ) -val loggingSettings = Seq( - libraryDependencies ++= Seq( - "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2", - "org.slf4j" % "slf4j-log4j12" % "1.7.26", - ) -) val catsEffectsSettings = Seq( libraryDependencies ++= Seq( "org.typelevel" %% "cats-effect" % "1.3.1" @@ -47,7 +41,6 @@ val catsEffectsSettings = Seq( lazy val cli = (project in file("cli")) .settings(applicationSettings) .aggregate(`aws-lib`, core, `aws-api`, domain) - .settings(loggingSettings) .settings(commandLineParsing) .dependsOn(`aws-lib`) diff --git a/cli/src/main/resources/log4j.xml b/cli/src/main/resources/log4j.xml deleted file mode 100644 index d0564b2..0000000 --- a/cli/src/main/resources/log4j.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala b/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala index fae0a48..0ea2a5e 100644 --- a/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala +++ b/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala @@ -1,14 +1,15 @@ package net.kemitix.s3thorp.cli -import com.typesafe.scalalogging.LazyLogging -import net.kemitix.s3thorp.domain.Config +import cats.effect.IO -class Logger(verbosity: Int) extends LazyLogging { +class Logger(verbosity: Int) { - def info(level: Int)(message: String): Unit = if (verbosity >= level) logger.info(s"1:$message") + def info(level: Int)(message: String): IO[Unit] = + if (verbosity >= level) IO(println(s"[INFO:$level] $message")) + else IO.unit - def warn(message: String): Unit = logger.warn(message) + def warn(message: String): IO[Unit] = IO(println(s"[ WARN] $message")) - def error(message: String): Unit = logger.error(message) + def error(message: String): IO[Unit] = IO(println(s"[ ERROR] $message")) } diff --git a/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala index 1fe0650..4b93814 100644 --- a/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala @@ -21,7 +21,7 @@ object Main extends IOApp { logger = new Logger(config.verbose) info = (l: Int) => (m: String) => logger.info(l)(m) md5HashGenerator = (file: File) => md5File(file)(info) - _ <- IO(logger.info(1)("S3Thorp - hashed sync for s3")) + _ <- logger.info(1)("S3Thorp - hashed sync for s3") _ <- Sync.run( S3ClientBuilder.defaultClient, md5HashGenerator, @@ -34,9 +34,9 @@ object Main extends IOApp { val logger = new Logger(1) program(args) .guaranteeCase { - case Canceled => IO(logger.warn("Interrupted")) - case Error(e) => IO(logger.error(e.getMessage)) - case Completed => IO(logger.info(1)("Done")) + case Canceled => logger.warn("Interrupted") + case Error(e) => logger.error(e.getMessage) + case Completed => logger.info(1)("Done") } } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala b/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala index 08a9fbd..f588f90 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala @@ -10,22 +10,28 @@ object ActionSubmitter { def submitAction(s3Client: S3Client, action: Action) (implicit c: Config, - info: Int => String => Unit, - warn: String => Unit): Stream[IO[S3Action]] = { + info: Int => String => IO[Unit], + warn: String => IO[Unit]): Stream[IO[S3Action]] = { Stream( action match { case ToUpload(bucket, localFile) => - info(4)(s" Upload: ${localFile.relative}") - val progressListener = new UploadProgressListener(localFile) - s3Client.upload(localFile, bucket, progressListener, c.multiPartThreshold, 1, c.maxRetries) + for { + _ <- info(4) (s" Upload: ${localFile.relative}") + progressListener = new UploadProgressListener(localFile) + action <- s3Client.upload(localFile, bucket, progressListener, c.multiPartThreshold, 1, c.maxRetries) + } yield action case ToCopy(bucket, sourceKey, hash, targetKey) => - info(4)(s" Copy: ${sourceKey.key} => ${targetKey.key}") - s3Client.copy(bucket, sourceKey, hash, targetKey) + for { + _ <- info(4)(s" Copy: ${sourceKey.key} => ${targetKey.key}") + action <- s3Client.copy(bucket, sourceKey, hash, targetKey) + } yield action case ToDelete(bucket, remoteKey) => - info(4)(s" Delete: ${remoteKey.key}") - s3Client.delete(bucket, remoteKey) - case DoNothing(bucket, remoteKey) => IO { - DoNothingS3Action(remoteKey)} + for { + _ <- info(4)(s" Delete: ${remoteKey.key}") + action <- s3Client.delete(bucket, remoteKey) + } yield action + case DoNothing(bucket, remoteKey) => + IO.pure(DoNothingS3Action(remoteKey)) }) } } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala b/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala index e931ac7..ece56c4 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala @@ -11,7 +11,7 @@ object LocalFileStream { def findFiles(file: File, md5HashGenerator: File => IO[MD5Hash], - info: Int => String => Unit) + info: Int => String => IO[Unit]) (implicit c: Config): IO[Stream[LocalFile]] = { val filters: Path => Boolean = Filter.isIncluded(c.filters) diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala index 3c18b19..8095cbe 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala @@ -9,13 +9,13 @@ import net.kemitix.s3thorp.domain.MD5Hash object MD5HashGenerator { def md5File(file: File) - (implicit info: Int => String => Unit): IO[MD5Hash] = + (implicit info: Int => String => IO[Unit]): IO[MD5Hash] = md5FilePart(file, 0, file.length) def md5FilePart(file: File, offset: Long, size: Long) - (implicit info: Int => String => Unit): IO[MD5Hash] = { + (implicit info: Int => String => IO[Unit]): IO[MD5Hash] = { val buffer = new Array[Byte](size.toInt) def readIntoBuffer = { @@ -34,10 +34,10 @@ object MD5HashGenerator { def readFile = openFile.bracket(readIntoBuffer)(closeFile) for { - _ <- IO(info(5)(s"md5:reading:offset $offset:size $size:$file")) + _ <- info(5)(s"md5:reading:offset $offset:size $size:$file") _ <- readFile hash = md5PartBody(buffer) - _ <- IO (info(5)(s"md5:generated:${hash.hash}")) + _ <- info(4)(s"md5:generated:${hash.hash}:$file") } yield hash } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala b/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala index 1d8a665..6dea7f2 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala @@ -17,9 +17,9 @@ object Sync { def run(s3Client: S3Client, md5HashGenerator: File => IO[MD5Hash], - info: Int => String => Unit, - warn: String => Unit, - error: String => Unit) + info: Int => String => IO[Unit], + warn: String => IO[Unit], + error: String => IO[Unit]) (implicit c: Config): IO[Unit] = { def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] = diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala index 1b1137b..2b926e9 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala @@ -8,29 +8,33 @@ import net.kemitix.s3thorp.domain.Config // Logging for the Sync class object SyncLogging { - def logRunStart[F[_]](info: Int => String => Unit)(implicit c: Config): IO[Unit] = IO { - info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ")} + def logRunStart[F[_]](info: Int => String => IO[Unit]) + (implicit c: Config): IO[Unit] = + info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ") - def logFileScan(info: Int => String => Unit)(implicit c: Config): IO[Unit] = IO{ - info(1)(s"Scanning local files: ${c.source}...")} + def logFileScan(info: Int => String => IO[Unit]) + (implicit c: Config): IO[Unit] = + info(1)(s"Scanning local files: ${c.source}...") def logRunFinished(actions: Stream[S3Action], - info: Int => String => Unit) - (implicit c: Config): IO[Unit] = IO { - val counters = actions.foldLeft(Counters())(countActivities) - info(1)(s"Uploaded ${counters.uploaded} files") - info(1)(s"Copied ${counters.copied} files") - info(1)(s"Deleted ${counters.deleted} files") - } + info: Int => String => IO[Unit]) + (implicit c: Config): IO[Unit] = + for { + _ <- IO.unit + counters = actions.foldLeft(Counters())(countActivities) + _ <- info(1)(s"Uploaded ${counters.uploaded} files") + _ <- info(1)(s"Copied ${counters.copied} files") + _ <- info(1)(s"Deleted ${counters.deleted} files") + } yield () private def countActivities(implicit c: Config): (Counters, S3Action) => Counters = (counters: Counters, s3Action: S3Action) => { s3Action match { - case UploadS3Action(remoteKey, _) => + case _: UploadS3Action => counters.copy(uploaded = counters.uploaded + 1) - case CopyS3Action(remoteKey) => + case _: CopyS3Action => counters.copy(copied = counters.copied + 1) - case DeleteS3Action(remoteKey) => + case _: DeleteS3Action => counters.copy(deleted = counters.deleted + 1) case _ => counters } diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala index fe16c52..76a2ddc 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala @@ -10,7 +10,7 @@ class LocalFileStreamSuite extends FunSpec { val uploadResource = Resource(this, "upload") val config: Config = Config(source = uploadResource) - implicit private val logInfo: Int => String => Unit = l => i => () + implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file) describe("findFiles") { diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala index 3e071b8..439fdf0 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala @@ -2,6 +2,7 @@ package net.kemitix.s3thorp.core import java.nio.file.Files +import cats.effect.IO import net.kemitix.s3thorp.core.MD5HashData.rootHash import net.kemitix.s3thorp.domain.{Bucket, Config, MD5Hash, RemoteKey} import org.scalatest.FunSpec @@ -11,7 +12,7 @@ class MD5HashGeneratorTest extends FunSpec { private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => Unit = l => i => () + implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit describe("read a small file (smaller than buffer)") { val file = Resource(this, "upload/root-file") diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala index 34cb296..732f8ee 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala @@ -17,9 +17,9 @@ class SyncSuite private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => Unit = l => i => () - implicit private val logWarn: String => Unit = w => () - private def logError: String => Unit = e => () + implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit + implicit private val logWarn: String => IO[Unit] = _ => IO.unit + private def logError: String => IO[Unit] = _ => IO.unit private val lastModified = LastModified(Instant.now) private val fileToKey: File => RemoteKey = KeyGenerator.generateKey(source, prefix) private val rootFile = LocalFile.resolve("root-file", rootHash, source, fileToKey) @@ -149,10 +149,8 @@ class SyncSuite override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit) = - IO { - s3ObjectsData - } + (implicit info: Int => String => IO[Unit]) = + IO.pure(s3ObjectsData) override def upload(localFile: LocalFile, bucket: Bucket, @@ -160,8 +158,8 @@ class SyncSuite multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit) = + (implicit info: Int => String => IO[Unit], + warn: String => IO[Unit]) = IO { if (bucket == testBucket) uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) @@ -172,7 +170,7 @@ class SyncSuite sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit info: Int => String => Unit) = + )(implicit info: Int => String => IO[Unit]) = IO { if (bucket == testBucket) copiesRecord += (sourceKey -> targetKey) @@ -181,7 +179,7 @@ class SyncSuite override def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit info: Int => String => Unit) = + )(implicit info: Int => String => IO[Unit]) = IO { if (bucket == testBucket) deletionsRecord += remoteKey diff --git a/domain/src/main/scala/net/kemitix/s3thorp/domain/MD5Hash.scala b/domain/src/main/scala/net/kemitix/s3thorp/domain/MD5Hash.scala index 842a542..f37e201 100644 --- a/domain/src/main/scala/net/kemitix/s3thorp/domain/MD5Hash.scala +++ b/domain/src/main/scala/net/kemitix/s3thorp/domain/MD5Hash.scala @@ -1,7 +1,9 @@ package net.kemitix.s3thorp.domain -final case class MD5Hash(hash: String) { +import net.kemitix.s3thorp.domain.QuoteStripper.stripQuotes - require(!hash.contains("\"")) +final case class MD5Hash(in: String) { + + lazy val hash: String = in filter stripQuotes } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/QuoteStripper.scala b/domain/src/main/scala/net/kemitix/s3thorp/domain/QuoteStripper.scala similarity index 67% rename from core/src/main/scala/net.kemitix.s3thorp.core/QuoteStripper.scala rename to domain/src/main/scala/net/kemitix/s3thorp/domain/QuoteStripper.scala index 0f6d8f3..c432fba 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/QuoteStripper.scala +++ b/domain/src/main/scala/net/kemitix/s3thorp/domain/QuoteStripper.scala @@ -1,4 +1,4 @@ -package net.kemitix.s3thorp.core +package net.kemitix.s3thorp.domain object QuoteStripper { diff --git a/domain/src/main/scala/net/kemitix/s3thorp/domain/SizeTranslation.scala b/domain/src/main/scala/net/kemitix/s3thorp/domain/SizeTranslation.scala new file mode 100644 index 0000000..ee21975 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/s3thorp/domain/SizeTranslation.scala @@ -0,0 +1,14 @@ +package net.kemitix.s3thorp.domain + +object SizeTranslation { + + def sizeInEnglish(length: Long): String = + length match { + case bytes if bytes > 1024 * 1024 * 1024 => s"${bytes / 1024 / 1024 /1024}Gb" + case bytes if bytes > 1024 * 1024 => s"${bytes / 1024 / 1024}Mb" + case bytes if bytes > 1024 => s"${bytes / 1024}Kb" + case bytes => s"${length}b" + } + + +} diff --git a/domain/src/main/scala/net/kemitix/s3thorp/domain/Terminal.scala b/domain/src/main/scala/net/kemitix/s3thorp/domain/Terminal.scala new file mode 100644 index 0000000..b4b8984 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/s3thorp/domain/Terminal.scala @@ -0,0 +1,28 @@ +package net.kemitix.s3thorp.domain + +object Terminal { + + /** + * Clears the whole terminal line. + */ + val clearLine = "\u001B[2K\r" + /** + * Moves the cursor up one line and back to the start of the line. + */ + val returnToPreviousLine = "\u001B[1A\r" + + /** + * The Width of the terminal, as reported by the COLUMNS environment variable. + * + * N.B. Not all environment will update this value when the terminal is resized. + * + * @return the number of columns in the terminal + */ + def width: Int = { + Option(System.getenv("COLUMNS")) + .map(_.toInt) + .map(Math.max(_, 10)) + .getOrElse(80) + } + +}