diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala index 99fef29..0b0b9ff 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/S3Client.scala @@ -8,7 +8,7 @@ trait S3Client { def listObjects(bucket: Bucket, prefix: RemoteKey - )(implicit info: Int => String => Unit): IO[S3ObjectsData] + )(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] def upload(localFile: LocalFile, bucket: Bucket, @@ -16,17 +16,17 @@ trait S3Client { multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit): IO[S3Action] + (implicit info: Int => String => IO[Unit], + warn: String => IO[Unit]): IO[S3Action] def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit info: Int => String => Unit): IO[CopyS3Action] + )(implicit info: Int => String => IO[Unit]): IO[CopyS3Action] def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit info: Int => String => Unit): IO[DeleteS3Action] + )(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] } diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala index 8bd94b2..879e015 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressListener.scala @@ -1,16 +1,23 @@ package net.kemitix.s3thorp.aws.api +import cats.effect.IO import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} import net.kemitix.s3thorp.domain.LocalFile class UploadProgressListener(localFile: LocalFile) - (implicit info: Int => String => Unit) + (implicit info: Int => String => IO[Unit]) extends UploadProgressLogging { - def listener: UploadEvent => Unit = + var bytesTransferred = 0L + + def listener: UploadEvent => IO[Unit] = { case e: TransferEvent => logTransfer(localFile, e) - case e: RequestEvent => logRequestCycle(localFile, e) + case e: RequestEvent => { + val transferred = e.transferred + bytesTransferred += transferred + logRequestCycle(localFile, e, bytesTransferred) + } case e: ByteTransferEvent => logByteTransfer(e) } } diff --git a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala index 46a4213..3ad231b 100644 --- a/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala +++ b/aws-api/src/main/scala/net/kemitix/s3thorp/aws/api/UploadProgressLogging.scala @@ -1,22 +1,43 @@ package net.kemitix.s3thorp.aws.api +import cats.effect.IO import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} -import net.kemitix.s3thorp.domain.LocalFile +import net.kemitix.s3thorp.domain.Terminal.{clearLine, returnToPreviousLine} +import net.kemitix.s3thorp.domain.{Terminal, LocalFile} +import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish + +import scala.io.AnsiColor._ trait UploadProgressLogging { def logTransfer(localFile: LocalFile, event: TransferEvent) - (implicit info: Int => String => Unit): Unit = - info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}") + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}") + + private val oneHundredPercent = 100 def logRequestCycle(localFile: LocalFile, - event: RequestEvent) - (implicit info: Int => String => Unit): Unit = - info(3)(s"Uploading:${event.name}:${event.transferred}/${event.bytes}:${localFile.remoteKey.key}") + event: RequestEvent, + bytesTransferred: Long) + (implicit info: Int => String => IO[Unit]): IO[Unit] = { + val remoteKey = localFile.remoteKey.key + val fileLength = localFile.file.length + val consoleWidth = Terminal.width - 2 + val done = ((bytesTransferred.toDouble / fileLength.toDouble) * consoleWidth).toInt + if (done < oneHundredPercent) { + val head = s"$GREEN_B$GREEN#$RESET" * done + val tail = " " * (consoleWidth - done) + val bar = s"[$head$tail]" + val transferred = sizeInEnglish(bytesTransferred) + val fileSize = sizeInEnglish(fileLength) + IO(print(s"${clearLine}Uploading $transferred of $fileSize : $remoteKey\n$bar$returnToPreviousLine")) + } else + IO(print(clearLine)) + } def logByteTransfer(event: ByteTransferEvent) - (implicit info: Int => String => Unit): Unit = + (implicit info: Int => String => IO[Unit]): IO[Unit] = info(3)(".") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/CancellableMultiPartUpload.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/CancellableMultiPartUpload.scala deleted file mode 100644 index f0c0de7..0000000 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/CancellableMultiPartUpload.scala +++ /dev/null @@ -1,5 +0,0 @@ -package net.kemitix.s3thorp.aws.lib - -final case class CancellableMultiPartUpload( - e: Throwable, - uploadId: String) extends Exception(e) diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala index 8e1ec2a..902d95c 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientCopier.scala @@ -13,17 +13,19 @@ class S3ClientCopier(amazonS3: AmazonS3) { sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit info: Int => String => Unit): IO[CopyS3Action] = { - val request = + (implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = { + IO { new CopyObjectRequest( bucket.name, sourceKey.key, bucket.name, targetKey.key) .withMatchingETagConstraint(hash.hash) - IO { - amazonS3.copyObject(request) - }.bracket( - logCopyStart(bucket, sourceKey, targetKey))( - logCopyFinish(bucket, sourceKey,targetKey)) + }.bracket { + request => + for { + _ <- logCopyStart(bucket, sourceKey, targetKey) + result <- IO(amazonS3.copyObject(request)) + } yield result + }(_ => logCopyFinish(bucket, sourceKey,targetKey)) .map(_ => CopyS3Action(targetKey)) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala index 9acbead..158d3dd 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientDeleter.scala @@ -11,12 +11,14 @@ class S3ClientDeleter(amazonS3: AmazonS3) { def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => Unit): IO[DeleteS3Action] = + (implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] = for { _ <- logDeleteStart(bucket, remoteKey) - request = new DeleteObjectRequest(bucket.name, remoteKey.key) - _ <- IO{amazonS3.deleteObject(request)} + _ <- deleteObject(bucket, remoteKey) _ <- logDeleteFinish(bucket, remoteKey) } yield DeleteS3Action(remoteKey) + private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = IO { + amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)) + } } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala index b14dcfc..4392bba 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala @@ -1,70 +1,53 @@ package net.kemitix.s3thorp.aws.lib import cats.effect.IO -import com.amazonaws.services.s3.model.{CopyObjectResult, DeleteObjectsResult, ListObjectsV2Result, PutObjectResult, S3ObjectSummary} +import com.amazonaws.services.s3.model.PutObjectResult import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey} object S3ClientLogging { def logListObjectsStart(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Stream[S3ObjectSummary]] = - in => IO { - info(3)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") - in - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(1)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") def logListObjectsFinish(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Unit] = - _ => IO { - info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") def logUploadStart(localFile: LocalFile, bucket: Bucket) - (implicit info: Int => String => Unit): PutObjectResult => IO[PutObjectResult] = - in => IO { - info(4)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}") - in - } + (implicit info: Int => String => IO[Unit]): PutObjectResult => IO[PutObjectResult] = + in => for { + _ <- info(1)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}") + } yield in def logUploadFinish(localFile: LocalFile, bucket: Bucket) - (implicit info: Int => String => Unit): PutObjectResult => IO[Unit] = - _ => IO { - info(1)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}") - } + (implicit info: Int => String => IO[Unit]): PutObjectResult => IO[Unit] = + _ => info(2)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}") def logCopyStart(bucket: Bucket, sourceKey: RemoteKey, targetKey: RemoteKey) - (implicit info: Int => String => Unit): CopyObjectResult => IO[CopyObjectResult] = - in => IO { - info(4)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - in - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(1)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") def logCopyFinish(bucket: Bucket, sourceKey: RemoteKey, targetKey: RemoteKey) - (implicit info: Int => String => Unit): CopyObjectResult => IO[Unit] = - _ => IO { - info(3)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(2)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") def logDeleteStart(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => Unit): IO[Unit] = - IO { - info(4)(s"Delete: ${bucket.name}:${remoteKey.key}") - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(1)(s"Delete: ${bucket.name}:${remoteKey.key}") def logDeleteFinish(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => Unit): IO[Unit] = - IO { - info(3)(s"Deleted: ${bucket.name}:${remoteKey.key}") - } + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(2)(s"Deleted: ${bucket.name}:${remoteKey.key}") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala index f901425..2cc70d8 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala @@ -14,22 +14,18 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): IO[S3ObjectsData] = { + (implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = { type Token = String type Batch = (Stream[S3ObjectSummary], Option[Token]) - val requestInitial = new ListObjectsV2Request() - .withBucketName(bucket.name) - .withPrefix(prefix.key) - val requestMore = (token:Token) => new ListObjectsV2Request() .withBucketName(bucket.name) .withPrefix(prefix.key) .withContinuationToken(token) def fetchBatch: ListObjectsV2Request => IO[Batch] = - request => IO{ + request => IO { val result = amazonS3.listObjectsV2(request) val more: Option[Token] = if (result.isTruncated) Some(result.getNextContinuationToken) @@ -37,21 +33,28 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { (result.getObjectSummaries.asScala.toStream, more) } - def fetchAll: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = + def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = request => for { batch <- fetchBatch(request) (summaries, more) = batch rest <- more match { case None => IO{Stream()} - case Some(token) => fetchAll(requestMore(token)) + case Some(token) => fetch(requestMore(token)) } } yield summaries ++ rest - fetchAll(requestInitial) - .bracket( - logListObjectsStart(bucket, prefix))( - logListObjectsFinish(bucket,prefix)) + IO { + new ListObjectsV2Request() + .withBucketName(bucket.name) + .withPrefix(prefix.key) + }.bracket { + request => + for { + _ <- logListObjectsStart(bucket, prefix) + summaries <- fetch(request) + } yield summaries + }(_ => logListObjectsFinish(bucket,prefix)) .map(os => S3ObjectsData(byHash(os), byKey(os))) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManager.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManager.scala deleted file mode 100644 index 1fafc12..0000000 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManager.scala +++ /dev/null @@ -1,37 +0,0 @@ -package net.kemitix.s3thorp.aws.lib - -import cats.effect.IO -import com.amazonaws.services.s3.model.PutObjectRequest -import com.amazonaws.services.s3.transfer.TransferManager -import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action -import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener} -import net.kemitix.s3thorp.aws.lib.S3ClientTransferManagerLogging.{logMultiPartUploadFinished, logMultiPartUploadStart} -import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} - -class S3ClientTransferManager(transferManager: => TransferManager) - extends S3ClientUploader { - - def accepts(localFile: LocalFile) - (implicit multiPartThreshold: Long): Boolean = - localFile.file.length >= multiPartThreshold - - override - def upload(localFile: LocalFile, - bucket: Bucket, - uploadProgressListener: UploadProgressListener, - multiPartThreshold: Long, - tryCount: Int, - maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit): IO[S3Action] = { - val putObjectRequest: PutObjectRequest = - new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) - .withGeneralProgressListener(progressListener(uploadProgressListener)) - for { - _ <- logMultiPartUploadStart(localFile, tryCount) - upload = transferManager.upload(putObjectRequest) - result <- IO{upload.waitForUploadResult} - _ <- logMultiPartUploadFinished(localFile) - } yield UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag)) - } -} diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerLogging.scala deleted file mode 100644 index 2f7a823..0000000 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerLogging.scala +++ /dev/null @@ -1,77 +0,0 @@ -package net.kemitix.s3thorp.aws.lib - -import cats.effect.IO -import com.amazonaws.services.s3.model.{AmazonS3Exception, InitiateMultipartUploadResult, UploadPartRequest, UploadPartResult} -import net.kemitix.s3thorp.domain.{LocalFile, MD5Hash} - -object S3ClientTransferManagerLogging { - - private val prefix = "transfer-manager" - - def logMultiPartUploadStart(localFile: LocalFile, - tryCount: Int) - (implicit info: Int => String => Unit): IO[Unit] = - IO{info(1)(s"$prefix:upload:try $tryCount: ${localFile.remoteKey.key}")} - - def logMultiPartUploadFinished(localFile: LocalFile) - (implicit info: Int => String => Unit): IO[Unit] = - IO{info(4)(s"$prefix:upload:finished: ${localFile.remoteKey.key}")} - - def logMultiPartUploadInitiate(localFile: LocalFile) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:initiating: ${localFile.remoteKey.key}") - - def logMultiPartUploadPartsDetails(localFile: LocalFile, - nParts: Int, - partSize: Long) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:parts $nParts:each $partSize: ${localFile.remoteKey.key}") - - def logMultiPartUploadPartDetails(localFile: LocalFile, - partNumber: Int, - partHash: MD5Hash) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:part $partNumber:hash ${partHash.hash}: ${localFile.remoteKey.key}") - - def logMultiPartUploadPart(localFile: LocalFile, - partRequest: UploadPartRequest) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:sending:part ${partRequest.getPartNumber}: ${partRequest.getMd5Digest}: ${localFile.remoteKey.key}") - - def logMultiPartUploadPartDone(localFile: LocalFile, - partRequest: UploadPartRequest, - result: UploadPartResult) - (implicit info: Int => String => Unit): Unit = - info(5)(s"$prefix:sent:part ${partRequest.getPartNumber}: ${result.getPartETag}: ${localFile.remoteKey.key}") - - def logMultiPartUploadPartError(localFile: LocalFile, - partRequest: UploadPartRequest, - error: AmazonS3Exception) - (implicit warn: String => Unit): Unit = { - val returnedMD5Hash = error.getAdditionalDetails.get("Content-MD5") - warn(s"$prefix:error:part ${partRequest.getPartNumber}:ret-hash $returnedMD5Hash: ${localFile.remoteKey.key}") - } - - def logMultiPartUploadCompleted(createUploadResponse: InitiateMultipartUploadResult, - uploadPartResponses: Stream[UploadPartResult], - localFile: LocalFile) - (implicit info: Int => String => Unit): Unit = - info(1)(s"$prefix:completed:parts ${uploadPartResponses.size}: ${localFile.remoteKey.key}") - - def logMultiPartUploadCancelling(localFile: LocalFile) - (implicit warn: String => Unit): Unit = - warn(s"$prefix:cancelling: ${localFile.remoteKey.key}") - - def logErrorRetrying(e: Throwable, localFile: LocalFile, tryCount: Int) - (implicit warn: String => Unit): Unit = - warn(s"$prefix:retry:error ${e.getMessage}: ${localFile.remoteKey.key}") - - def logErrorCancelling(e: Throwable, localFile: LocalFile) - (implicit error: String => Unit) : Unit = - error(s"$prefix:cancelling:error ${e.getMessage}: ${localFile.remoteKey.key}") - - def logErrorUnknown(e: Throwable, localFile: LocalFile) - (implicit error: String => Unit): Unit = - error(s"$prefix:unknown:error $e: ${localFile.remoteKey.key}") - -} diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientUploader.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientUploader.scala deleted file mode 100644 index 0dde250..0000000 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientUploader.scala +++ /dev/null @@ -1,35 +0,0 @@ -package net.kemitix.s3thorp.aws.lib - -import cats.effect.IO -import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} -import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} -import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener} -import net.kemitix.s3thorp.domain.{Bucket, LocalFile} - -trait S3ClientUploader { - - def accepts(localFile: LocalFile) - (implicit multiPartThreshold: Long): Boolean - - def upload(localFile: LocalFile, - bucket: Bucket, - progressListener: UploadProgressListener, - multiPartThreshold: Long, - tryCount: Int, - maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit): IO[S3Action] - - def progressListener(uploadProgressListener: UploadProgressListener): ProgressListener = { - new ProgressListener { - override def progressChanged(event: ProgressEvent): Unit = { - event match { - case e if e.getEventType.isTransferEvent => TransferEvent(e.getEventType.name) - case e if e.getEventType equals ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT => ByteTransferEvent(e.getEventType.name) - case e => RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) - } - } - } - } - -} diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByKey.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByKey.scala index a26aeb2..0ed6a7c 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByKey.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ObjectsByKey.scala @@ -2,14 +2,13 @@ package net.kemitix.s3thorp.aws.lib import com.amazonaws.services.s3.model.S3ObjectSummary import net.kemitix.s3thorp.domain.{HashModified, LastModified, MD5Hash, RemoteKey} -import net.kemitix.s3thorp.core.QuoteStripper.stripQuotes object S3ObjectsByKey { def byKey(os: Stream[S3ObjectSummary]) = os.map { o => { val remoteKey = RemoteKey(o.getKey) - val hash = MD5Hash(o.getETag filter stripQuotes) + val hash = MD5Hash(o.getETag) val lastModified = LastModified(o.getLastModified.toInstant) (remoteKey, HashModified(hash, lastModified)) }}.toMap diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala index 3c569d2..fed8353 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/ThorpS3Client.scala @@ -13,19 +13,19 @@ class ThorpS3Client(amazonS3Client: => AmazonS3, lazy val objectLister = new S3ClientObjectLister(amazonS3Client) lazy val copier = new S3ClientCopier(amazonS3Client) - lazy val uploader = new S3ClientTransferManager(amazonS3TransferManager) + lazy val uploader = new Uploader(amazonS3TransferManager) lazy val deleter = new S3ClientDeleter(amazonS3Client) override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): IO[S3ObjectsData] = + (implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = objectLister.listObjects(bucket, prefix) override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit info: Int => String => Unit): IO[CopyS3Action] = + (implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = copier.copy(bucket, sourceKey,hash, targetKey) override def upload(localFile: LocalFile, @@ -34,13 +34,13 @@ class ThorpS3Client(amazonS3Client: => AmazonS3, multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit): IO[S3Action] = + (implicit info: Int => String => IO[Unit], + warn: String => IO[Unit]): IO[S3Action] = uploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries) override def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit info: Int => String => Unit): IO[DeleteS3Action] = + (implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] = deleter.delete(bucket, remoteKey) } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala new file mode 100644 index 0000000..eeba3a2 --- /dev/null +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala @@ -0,0 +1,64 @@ +package net.kemitix.s3thorp.aws.lib + +import cats.effect.IO +import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} +import com.amazonaws.services.s3.model.PutObjectRequest +import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager} +import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action +import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} +import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener} +import net.kemitix.s3thorp.aws.lib.UploaderLogging.{logMultiPartUploadFinished, logMultiPartUploadStart} +import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} + +class Uploader(transferManager: => AmazonTransferManager) { + + def accepts(localFile: LocalFile) + (implicit multiPartThreshold: Long): Boolean = + localFile.file.length >= multiPartThreshold + + def upload(localFile: LocalFile, + bucket: Bucket, + uploadProgressListener: UploadProgressListener, + multiPartThreshold: Long, + tryCount: Int, + maxRetries: Int) + (implicit info: Int => String => IO[Unit], + warn: String => IO[Unit]): IO[S3Action] = { + for { + _ <- logMultiPartUploadStart(localFile, tryCount) + listener = progressListener(uploadProgressListener) + putObjectRequest = request(localFile, bucket, listener) + upload = transferManager.upload(putObjectRequest) + result <- IO{upload.waitForUploadResult} + _ <- logMultiPartUploadFinished(localFile) + } yield UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag)) + } + + private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = { + new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) + .withGeneralProgressListener(listener) + } + + private def progressListener(uploadProgressListener: UploadProgressListener) = + new ProgressListener { + override def progressChanged(progressEvent: ProgressEvent): Unit = { + uploadProgressListener.listener( + progressEvent match { + case e: ProgressEvent if isTransfer(e) => + TransferEvent(e.getEventType.name) + case e: ProgressEvent if isByteTransfer(e) => + ByteTransferEvent(e.getEventType.name) + case e: ProgressEvent => + RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred) + }) + .unsafeRunSync // the listener doesn't execute otherwise as it is never returned + } + } + + private def isTransfer(e: ProgressEvent) = + e.getEventType.isTransferEvent + + private def isByteTransfer(e: ProgressEvent) = + e.getEventType equals ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT + +} diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala new file mode 100644 index 0000000..645d138 --- /dev/null +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/UploaderLogging.scala @@ -0,0 +1,22 @@ +package net.kemitix.s3thorp.aws.lib + +import cats.effect.IO +import net.kemitix.s3thorp.domain.Terminal.clearLine +import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish +import net.kemitix.s3thorp.domain.LocalFile + +object UploaderLogging { + + def logMultiPartUploadStart(localFile: LocalFile, + tryCount: Int) + (implicit info: Int => String => IO[Unit]): IO[Unit] = { + val tryMessage = if (tryCount == 1) "" else s"try $tryCount" + val size = sizeInEnglish(localFile.file.length) + info(1)(s"${clearLine}upload:$tryMessage:$size:${localFile.remoteKey.key}") + } + + def logMultiPartUploadFinished(localFile: LocalFile) + (implicit info: Int => String => IO[Unit]): IO[Unit] = + info(4)(s"upload:finished: ${localFile.remoteKey.key}") + +} diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala index 568abca..d6bba63 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientSuite.scala @@ -2,6 +2,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult @@ -22,8 +23,8 @@ class S3ClientSuite private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => Unit = l => m => () - implicit private val logWarn: String => Unit = w => () + implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit + implicit private val logWarn: String => IO[Unit] = _ => IO.unit private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ describe("getS3Status") { diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala index e6a2cb0..9214bdc 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/ThorpS3ClientSuite.scala @@ -3,6 +3,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant import java.util.Date +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary} import com.amazonaws.services.s3.transfer.TransferManager @@ -19,7 +20,7 @@ class ThorpS3ClientSuite val source = Resource(this, "upload") val prefix = RemoteKey("prefix") implicit val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit val logInfo: Int => String => Unit = l => m => () + implicit val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit val lm = LastModified(Instant.now) diff --git a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerSuite.scala b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala similarity index 90% rename from aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerSuite.scala rename to aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala index 9b5fc1f..7add7ca 100644 --- a/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/S3ClientTransferManagerSuite.scala +++ b/aws-lib/src/test/scala/net/kemitix/s3thorp/aws/lib/UploaderSuite.scala @@ -2,6 +2,7 @@ package net.kemitix.s3thorp.aws.lib import java.time.Instant +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.transfer._ import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action @@ -12,22 +13,22 @@ import net.kemitix.s3thorp.domain._ import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec -class S3ClientTransferManagerSuite +class UploaderSuite extends FunSpec with MockFactory { private val source = Resource(this, ".") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => Unit = l => m => () - implicit private val logWarn: String => Unit = w => () + implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit + implicit private val logWarn: String => IO[Unit] = _ => IO.unit private val fileToKey = generateKey(config.source, config.prefix) _ val lastModified = LastModified(Instant.now()) describe("S3ClientMultiPartTransferManagerSuite") { describe("accepts") { val transferManager = stub[TransferManager] - val uploader = new S3ClientTransferManager(transferManager) + val uploader = new Uploader(transferManager) describe("small-file") { val smallFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey) it("should be a small-file") { @@ -59,7 +60,7 @@ class S3ClientTransferManagerSuite val progressListener = new UploadProgressListener(bigFile) val amazonS3 = mock[AmazonS3] val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build - val uploader = new S3ClientTransferManager(amazonS3TransferManager) + val uploader = new Uploader(amazonS3TransferManager) it("should upload") { val expected = UploadS3Action(returnedKey, returnedHash) val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync diff --git a/build.sbt b/build.sbt index 59111b2..b8fb0c7 100644 --- a/build.sbt +++ b/build.sbt @@ -22,12 +22,6 @@ val awsSdkDependencies = Seq( "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9" ) ) -val loggingSettings = Seq( - libraryDependencies ++= Seq( - "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2", - "org.slf4j" % "slf4j-log4j12" % "1.7.26", - ) -) val catsEffectsSettings = Seq( libraryDependencies ++= Seq( "org.typelevel" %% "cats-effect" % "1.3.1" @@ -47,7 +41,6 @@ val catsEffectsSettings = Seq( lazy val cli = (project in file("cli")) .settings(applicationSettings) .aggregate(`aws-lib`, core, `aws-api`, domain) - .settings(loggingSettings) .settings(commandLineParsing) .dependsOn(`aws-lib`) diff --git a/cli/src/main/resources/log4j.xml b/cli/src/main/resources/log4j.xml deleted file mode 100644 index d0564b2..0000000 --- a/cli/src/main/resources/log4j.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala b/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala index fae0a48..0ea2a5e 100644 --- a/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala +++ b/cli/src/main/scala/net/kemitix/s3thorp/cli/Logger.scala @@ -1,14 +1,15 @@ package net.kemitix.s3thorp.cli -import com.typesafe.scalalogging.LazyLogging -import net.kemitix.s3thorp.domain.Config +import cats.effect.IO -class Logger(verbosity: Int) extends LazyLogging { +class Logger(verbosity: Int) { - def info(level: Int)(message: String): Unit = if (verbosity >= level) logger.info(s"1:$message") + def info(level: Int)(message: String): IO[Unit] = + if (verbosity >= level) IO(println(s"[INFO:$level] $message")) + else IO.unit - def warn(message: String): Unit = logger.warn(message) + def warn(message: String): IO[Unit] = IO(println(s"[ WARN] $message")) - def error(message: String): Unit = logger.error(message) + def error(message: String): IO[Unit] = IO(println(s"[ ERROR] $message")) } diff --git a/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala b/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala index 1fe0650..4b93814 100644 --- a/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala +++ b/cli/src/main/scala/net/kemitix/s3thorp/cli/Main.scala @@ -21,7 +21,7 @@ object Main extends IOApp { logger = new Logger(config.verbose) info = (l: Int) => (m: String) => logger.info(l)(m) md5HashGenerator = (file: File) => md5File(file)(info) - _ <- IO(logger.info(1)("S3Thorp - hashed sync for s3")) + _ <- logger.info(1)("S3Thorp - hashed sync for s3") _ <- Sync.run( S3ClientBuilder.defaultClient, md5HashGenerator, @@ -34,9 +34,9 @@ object Main extends IOApp { val logger = new Logger(1) program(args) .guaranteeCase { - case Canceled => IO(logger.warn("Interrupted")) - case Error(e) => IO(logger.error(e.getMessage)) - case Completed => IO(logger.info(1)("Done")) + case Canceled => logger.warn("Interrupted") + case Error(e) => logger.error(e.getMessage) + case Completed => logger.info(1)("Done") } } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala b/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala index 08a9fbd..f588f90 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/ActionSubmitter.scala @@ -10,22 +10,28 @@ object ActionSubmitter { def submitAction(s3Client: S3Client, action: Action) (implicit c: Config, - info: Int => String => Unit, - warn: String => Unit): Stream[IO[S3Action]] = { + info: Int => String => IO[Unit], + warn: String => IO[Unit]): Stream[IO[S3Action]] = { Stream( action match { case ToUpload(bucket, localFile) => - info(4)(s" Upload: ${localFile.relative}") - val progressListener = new UploadProgressListener(localFile) - s3Client.upload(localFile, bucket, progressListener, c.multiPartThreshold, 1, c.maxRetries) + for { + _ <- info(4) (s" Upload: ${localFile.relative}") + progressListener = new UploadProgressListener(localFile) + action <- s3Client.upload(localFile, bucket, progressListener, c.multiPartThreshold, 1, c.maxRetries) + } yield action case ToCopy(bucket, sourceKey, hash, targetKey) => - info(4)(s" Copy: ${sourceKey.key} => ${targetKey.key}") - s3Client.copy(bucket, sourceKey, hash, targetKey) + for { + _ <- info(4)(s" Copy: ${sourceKey.key} => ${targetKey.key}") + action <- s3Client.copy(bucket, sourceKey, hash, targetKey) + } yield action case ToDelete(bucket, remoteKey) => - info(4)(s" Delete: ${remoteKey.key}") - s3Client.delete(bucket, remoteKey) - case DoNothing(bucket, remoteKey) => IO { - DoNothingS3Action(remoteKey)} + for { + _ <- info(4)(s" Delete: ${remoteKey.key}") + action <- s3Client.delete(bucket, remoteKey) + } yield action + case DoNothing(bucket, remoteKey) => + IO.pure(DoNothingS3Action(remoteKey)) }) } } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala b/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala index e931ac7..ece56c4 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/LocalFileStream.scala @@ -11,7 +11,7 @@ object LocalFileStream { def findFiles(file: File, md5HashGenerator: File => IO[MD5Hash], - info: Int => String => Unit) + info: Int => String => IO[Unit]) (implicit c: Config): IO[Stream[LocalFile]] = { val filters: Path => Boolean = Filter.isIncluded(c.filters) diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala index 3c18b19..8095cbe 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala @@ -9,13 +9,13 @@ import net.kemitix.s3thorp.domain.MD5Hash object MD5HashGenerator { def md5File(file: File) - (implicit info: Int => String => Unit): IO[MD5Hash] = + (implicit info: Int => String => IO[Unit]): IO[MD5Hash] = md5FilePart(file, 0, file.length) def md5FilePart(file: File, offset: Long, size: Long) - (implicit info: Int => String => Unit): IO[MD5Hash] = { + (implicit info: Int => String => IO[Unit]): IO[MD5Hash] = { val buffer = new Array[Byte](size.toInt) def readIntoBuffer = { @@ -34,10 +34,10 @@ object MD5HashGenerator { def readFile = openFile.bracket(readIntoBuffer)(closeFile) for { - _ <- IO(info(5)(s"md5:reading:offset $offset:size $size:$file")) + _ <- info(5)(s"md5:reading:offset $offset:size $size:$file") _ <- readFile hash = md5PartBody(buffer) - _ <- IO (info(5)(s"md5:generated:${hash.hash}")) + _ <- info(4)(s"md5:generated:${hash.hash}:$file") } yield hash } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala b/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala index 1d8a665..6dea7f2 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/Sync.scala @@ -17,9 +17,9 @@ object Sync { def run(s3Client: S3Client, md5HashGenerator: File => IO[MD5Hash], - info: Int => String => Unit, - warn: String => Unit, - error: String => Unit) + info: Int => String => IO[Unit], + warn: String => IO[Unit], + error: String => IO[Unit]) (implicit c: Config): IO[Unit] = { def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] = diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala index 1b1137b..2b926e9 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala @@ -8,29 +8,33 @@ import net.kemitix.s3thorp.domain.Config // Logging for the Sync class object SyncLogging { - def logRunStart[F[_]](info: Int => String => Unit)(implicit c: Config): IO[Unit] = IO { - info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ")} + def logRunStart[F[_]](info: Int => String => IO[Unit]) + (implicit c: Config): IO[Unit] = + info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ") - def logFileScan(info: Int => String => Unit)(implicit c: Config): IO[Unit] = IO{ - info(1)(s"Scanning local files: ${c.source}...")} + def logFileScan(info: Int => String => IO[Unit]) + (implicit c: Config): IO[Unit] = + info(1)(s"Scanning local files: ${c.source}...") def logRunFinished(actions: Stream[S3Action], - info: Int => String => Unit) - (implicit c: Config): IO[Unit] = IO { - val counters = actions.foldLeft(Counters())(countActivities) - info(1)(s"Uploaded ${counters.uploaded} files") - info(1)(s"Copied ${counters.copied} files") - info(1)(s"Deleted ${counters.deleted} files") - } + info: Int => String => IO[Unit]) + (implicit c: Config): IO[Unit] = + for { + _ <- IO.unit + counters = actions.foldLeft(Counters())(countActivities) + _ <- info(1)(s"Uploaded ${counters.uploaded} files") + _ <- info(1)(s"Copied ${counters.copied} files") + _ <- info(1)(s"Deleted ${counters.deleted} files") + } yield () private def countActivities(implicit c: Config): (Counters, S3Action) => Counters = (counters: Counters, s3Action: S3Action) => { s3Action match { - case UploadS3Action(remoteKey, _) => + case _: UploadS3Action => counters.copy(uploaded = counters.uploaded + 1) - case CopyS3Action(remoteKey) => + case _: CopyS3Action => counters.copy(copied = counters.copied + 1) - case DeleteS3Action(remoteKey) => + case _: DeleteS3Action => counters.copy(deleted = counters.deleted + 1) case _ => counters } diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala index fe16c52..76a2ddc 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/LocalFileStreamSuite.scala @@ -10,7 +10,7 @@ class LocalFileStreamSuite extends FunSpec { val uploadResource = Resource(this, "upload") val config: Config = Config(source = uploadResource) - implicit private val logInfo: Int => String => Unit = l => i => () + implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file) describe("findFiles") { diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala index 3e071b8..439fdf0 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/MD5HashGeneratorTest.scala @@ -2,6 +2,7 @@ package net.kemitix.s3thorp.core import java.nio.file.Files +import cats.effect.IO import net.kemitix.s3thorp.core.MD5HashData.rootHash import net.kemitix.s3thorp.domain.{Bucket, Config, MD5Hash, RemoteKey} import org.scalatest.FunSpec @@ -11,7 +12,7 @@ class MD5HashGeneratorTest extends FunSpec { private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => Unit = l => i => () + implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit describe("read a small file (smaller than buffer)") { val file = Resource(this, "upload/root-file") diff --git a/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala index 34cb296..732f8ee 100644 --- a/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/s3thorp/core/SyncSuite.scala @@ -17,9 +17,9 @@ class SyncSuite private val source = Resource(this, "upload") private val prefix = RemoteKey("prefix") implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - implicit private val logInfo: Int => String => Unit = l => i => () - implicit private val logWarn: String => Unit = w => () - private def logError: String => Unit = e => () + implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit + implicit private val logWarn: String => IO[Unit] = _ => IO.unit + private def logError: String => IO[Unit] = _ => IO.unit private val lastModified = LastModified(Instant.now) private val fileToKey: File => RemoteKey = KeyGenerator.generateKey(source, prefix) private val rootFile = LocalFile.resolve("root-file", rootHash, source, fileToKey) @@ -149,10 +149,8 @@ class SyncSuite override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit) = - IO { - s3ObjectsData - } + (implicit info: Int => String => IO[Unit]) = + IO.pure(s3ObjectsData) override def upload(localFile: LocalFile, bucket: Bucket, @@ -160,8 +158,8 @@ class SyncSuite multiPartThreshold: Long, tryCount: Int, maxRetries: Int) - (implicit info: Int => String => Unit, - warn: String => Unit) = + (implicit info: Int => String => IO[Unit], + warn: String => IO[Unit]) = IO { if (bucket == testBucket) uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) @@ -172,7 +170,7 @@ class SyncSuite sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey - )(implicit info: Int => String => Unit) = + )(implicit info: Int => String => IO[Unit]) = IO { if (bucket == testBucket) copiesRecord += (sourceKey -> targetKey) @@ -181,7 +179,7 @@ class SyncSuite override def delete(bucket: Bucket, remoteKey: RemoteKey - )(implicit info: Int => String => Unit) = + )(implicit info: Int => String => IO[Unit]) = IO { if (bucket == testBucket) deletionsRecord += remoteKey diff --git a/domain/src/main/scala/net/kemitix/s3thorp/domain/MD5Hash.scala b/domain/src/main/scala/net/kemitix/s3thorp/domain/MD5Hash.scala index 842a542..f37e201 100644 --- a/domain/src/main/scala/net/kemitix/s3thorp/domain/MD5Hash.scala +++ b/domain/src/main/scala/net/kemitix/s3thorp/domain/MD5Hash.scala @@ -1,7 +1,9 @@ package net.kemitix.s3thorp.domain -final case class MD5Hash(hash: String) { +import net.kemitix.s3thorp.domain.QuoteStripper.stripQuotes - require(!hash.contains("\"")) +final case class MD5Hash(in: String) { + + lazy val hash: String = in filter stripQuotes } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/QuoteStripper.scala b/domain/src/main/scala/net/kemitix/s3thorp/domain/QuoteStripper.scala similarity index 67% rename from core/src/main/scala/net.kemitix.s3thorp.core/QuoteStripper.scala rename to domain/src/main/scala/net/kemitix/s3thorp/domain/QuoteStripper.scala index 0f6d8f3..c432fba 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/QuoteStripper.scala +++ b/domain/src/main/scala/net/kemitix/s3thorp/domain/QuoteStripper.scala @@ -1,4 +1,4 @@ -package net.kemitix.s3thorp.core +package net.kemitix.s3thorp.domain object QuoteStripper { diff --git a/domain/src/main/scala/net/kemitix/s3thorp/domain/SizeTranslation.scala b/domain/src/main/scala/net/kemitix/s3thorp/domain/SizeTranslation.scala new file mode 100644 index 0000000..ee21975 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/s3thorp/domain/SizeTranslation.scala @@ -0,0 +1,14 @@ +package net.kemitix.s3thorp.domain + +object SizeTranslation { + + def sizeInEnglish(length: Long): String = + length match { + case bytes if bytes > 1024 * 1024 * 1024 => s"${bytes / 1024 / 1024 /1024}Gb" + case bytes if bytes > 1024 * 1024 => s"${bytes / 1024 / 1024}Mb" + case bytes if bytes > 1024 => s"${bytes / 1024}Kb" + case bytes => s"${length}b" + } + + +} diff --git a/domain/src/main/scala/net/kemitix/s3thorp/domain/Terminal.scala b/domain/src/main/scala/net/kemitix/s3thorp/domain/Terminal.scala new file mode 100644 index 0000000..b4b8984 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/s3thorp/domain/Terminal.scala @@ -0,0 +1,28 @@ +package net.kemitix.s3thorp.domain + +object Terminal { + + /** + * Clears the whole terminal line. + */ + val clearLine = "\u001B[2K\r" + /** + * Moves the cursor up one line and back to the start of the line. + */ + val returnToPreviousLine = "\u001B[1A\r" + + /** + * The Width of the terminal, as reported by the COLUMNS environment variable. + * + * N.B. Not all environment will update this value when the terminal is resized. + * + * @return the number of columns in the terminal + */ + def width: Int = { + Option(System.getenv("COLUMNS")) + .map(_.toInt) + .map(Math.max(_, 10)) + .getOrElse(80) + } + +}