Improve upload logging (#44)

* [aws-lib] fold S3ClientUploader trait into its only implementation

This trait was only implemented by S3ClientTransferManager.

* [core] SyncLogging: more robust matching

No longer cares about parameters to case classes, just their types.
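For illustration, the SyncLogging counters now match on the type alone (a sketch; compare the before/after in the SyncLogging diff below):

    // before: destructures parameters it never uses
    case UploadS3Action(remoteKey, _) =>
      counters.copy(uploaded = counters.uploaded + 1)
    // after: matches on the type alone
    case _: UploadS3Action =>
      counters.copy(uploaded = counters.uploaded + 1)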

* [cli] Logger uses IO for log methods

* [aws-lib] remove 'transfer-manager' prefix and only show the try count when tryCount > 1

* [sbt,cli] remove log4j and scala-logging dependencies

* [domain] move QuoteStripper to Domain

Use it directly in MD5Hash to strip quotes from any input.
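A sketch of the new behaviour, with an illustrative hash value:

    // the quotes AWS wraps around ETags are stripped lazily
    MD5Hash("\"a94a8fe5ccb19ba6\"").hash  // "a94a8fe5ccb19ba6"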

* [core] SyncLogging call info in proper context

If the IO[Unit] returned by the info calls isn't part of the chain that
is returned from the function, then the delayed IO action is never
executed.
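A minimal sketch of the problem, assuming info: Int => String => IO[Unit]:

    def good(implicit info: Int => String => IO[Unit]): IO[Unit] =
      for {
        _ <- info(1)("runs: the IO is part of the returned chain")
      } yield ()

    def bad(implicit info: Int => String => IO[Unit]): IO[Unit] = {
      info(1)("never runs: the IO[Unit] is discarded, not chained")
      IO.unit
    }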

* [aws-lib] Display size in bytes of file being uploaded

* [core] call info in correct context

* [cli] call info in correct context

* [aws-lib] raise summary fetch message to info 1

* [cli] include correct level in info messages

* [aws-lib] S3ClientLogging adjust logging levels

* [aws-lib] display file sizes in English

* [aws-lib] ObjectLister use IO.bracket properly

* [aws-lib] Copier use IO.bracket properly

* [aws-lib] Deleter refactor

* [aws-lib] TransferManagerLogging remove unused methods

* [aws-lib] TransferManager refactor

* [aws-lib] TransferManager refactor

* [aws-lib] TransferManager displays log messages

Use the UploadProgressListener that was previously being ignored, and
use unsafeRunSync to execute the suspended effect within the IO[Unit].
unsafeRunSync is required because the listener returns Unit, so the
suspended effects would otherwise be discarded without ever running.
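A minimal sketch of the pattern; toUploadEvent is a hypothetical helper standing in for the event matching shown in the Uploader diff below:

    new ProgressListener {
      override def progressChanged(event: ProgressEvent): Unit =
        // progressChanged returns Unit, so the IO[Unit] must be
        // forced here or its suspended effect is silently dropped
        uploadProgressListener.listener(toUploadEvent(event)).unsafeRunSync
    }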

* [domain] Extract SizeTranslation into module

* [aws-api] report bytes transferred in progress

* [core] fix calls to info

info now returns an IO already, so there's no need to wrap it in one.
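For example, in MD5HashGenerator:

    // before
    _ <- IO(info(5)(s"md5:generated:${hash.hash}"))
    // after
    _ <- info(4)(s"md5:generated:${hash.hash}:$file")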

* [aws-lib] remove unused class

* [aws-lib] UploadProgress displays progress bar while uploading

* [aws-api] UploadProgressLogging optimise imports

* [aws-api] UploadProgressLogging rename variables

* [domain] add Terminal object

* [aws-api] UploadProgressLogging use console width and two lines

- Improved clearing of lines after progress bar
- Use console width for progress bar size

* [aws-lib] S3ClientLogging optimise imports

* [aws-lib] TransferManager clear line before logging

* [aws-lib] rename class as TransferManager

* [aws-lib] rename TransferManager as Uploader to avoid a clash

We are using an AWS SDK class with the same name.
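The SDK class is imported under an alias where it is still needed, as the Uploader diff below shows:

    import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager}

    class Uploader(transferManager: => AmazonTransferManager)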
Authored by Paul Campbell on 2019-06-10 19:45:36 +01:00; committed by GitHub
parent 44c66c042c
commit 97efed76b4
34 changed files with 308 additions and 330 deletions

View file

@ -8,7 +8,7 @@ trait S3Client {
def listObjects(bucket: Bucket,
prefix: RemoteKey
)(implicit info: Int => String => Unit): IO[S3ObjectsData]
)(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData]
def upload(localFile: LocalFile,
bucket: Bucket,
@ -16,17 +16,17 @@ trait S3Client {
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[S3Action]
(implicit info: Int => String => IO[Unit],
warn: String => IO[Unit]): IO[S3Action]
def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit info: Int => String => Unit): IO[CopyS3Action]
)(implicit info: Int => String => IO[Unit]): IO[CopyS3Action]
def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit info: Int => String => Unit): IO[DeleteS3Action]
)(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action]
}

View file

@ -1,16 +1,23 @@
package net.kemitix.s3thorp.aws.api
import cats.effect.IO
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.s3thorp.domain.LocalFile
class UploadProgressListener(localFile: LocalFile)
(implicit info: Int => String => Unit)
(implicit info: Int => String => IO[Unit])
extends UploadProgressLogging {
def listener: UploadEvent => Unit =
var bytesTransferred = 0L
def listener: UploadEvent => IO[Unit] =
{
case e: TransferEvent => logTransfer(localFile, e)
case e: RequestEvent => logRequestCycle(localFile, e)
case e: RequestEvent => {
val transferred = e.transferred
bytesTransferred += transferred
logRequestCycle(localFile, e, bytesTransferred)
}
case e: ByteTransferEvent => logByteTransfer(e)
}
}

View file

@ -1,22 +1,43 @@
package net.kemitix.s3thorp.aws.api
import cats.effect.IO
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.s3thorp.domain.LocalFile
import net.kemitix.s3thorp.domain.Terminal.{clearLine, returnToPreviousLine}
import net.kemitix.s3thorp.domain.{Terminal, LocalFile}
import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish
import scala.io.AnsiColor._
trait UploadProgressLogging {
def logTransfer(localFile: LocalFile,
event: TransferEvent)
(implicit info: Int => String => Unit): Unit =
info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}")
(implicit info: Int => String => IO[Unit]): IO[Unit] =
info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}")
private val oneHundredPercent = 100
def logRequestCycle(localFile: LocalFile,
event: RequestEvent)
(implicit info: Int => String => Unit): Unit =
info(3)(s"Uploading:${event.name}:${event.transferred}/${event.bytes}:${localFile.remoteKey.key}")
event: RequestEvent,
bytesTransferred: Long)
(implicit info: Int => String => IO[Unit]): IO[Unit] = {
val remoteKey = localFile.remoteKey.key
val fileLength = localFile.file.length
val consoleWidth = Terminal.width - 2
val done = ((bytesTransferred.toDouble / fileLength.toDouble) * consoleWidth).toInt
if (done < oneHundredPercent) {
val head = s"$GREEN_B$GREEN#$RESET" * done
val tail = " " * (consoleWidth - done)
val bar = s"[$head$tail]"
val transferred = sizeInEnglish(bytesTransferred)
val fileSize = sizeInEnglish(fileLength)
IO(print(s"${clearLine}Uploading $transferred of $fileSize : $remoteKey\n$bar$returnToPreviousLine"))
} else
IO(print(clearLine))
}
def logByteTransfer(event: ByteTransferEvent)
(implicit info: Int => String => Unit): Unit =
(implicit info: Int => String => IO[Unit]): IO[Unit] =
info(3)(".")
}

View file

@ -1,5 +0,0 @@
package net.kemitix.s3thorp.aws.lib
final case class CancellableMultiPartUpload(
e: Throwable,
uploadId: String) extends Exception(e)

View file

@ -13,17 +13,19 @@ class S3ClientCopier(amazonS3: AmazonS3) {
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey)
(implicit info: Int => String => Unit): IO[CopyS3Action] = {
val request =
(implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = {
IO {
new CopyObjectRequest(
bucket.name, sourceKey.key,
bucket.name, targetKey.key)
.withMatchingETagConstraint(hash.hash)
IO {
amazonS3.copyObject(request)
}.bracket(
logCopyStart(bucket, sourceKey, targetKey))(
logCopyFinish(bucket, sourceKey,targetKey))
}.bracket {
request =>
for {
_ <- logCopyStart(bucket, sourceKey, targetKey)
result <- IO(amazonS3.copyObject(request))
} yield result
}(_ => logCopyFinish(bucket, sourceKey,targetKey))
.map(_ => CopyS3Action(targetKey))
}

View file

@ -11,12 +11,14 @@ class S3ClientDeleter(amazonS3: AmazonS3) {
def delete(bucket: Bucket,
remoteKey: RemoteKey)
(implicit info: Int => String => Unit): IO[DeleteS3Action] =
(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] =
for {
_ <- logDeleteStart(bucket, remoteKey)
request = new DeleteObjectRequest(bucket.name, remoteKey.key)
_ <- IO{amazonS3.deleteObject(request)}
_ <- deleteObject(bucket, remoteKey)
_ <- logDeleteFinish(bucket, remoteKey)
} yield DeleteS3Action(remoteKey)
private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = IO {
amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key))
}
}

View file

@ -1,70 +1,53 @@
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.services.s3.model.{CopyObjectResult, DeleteObjectsResult, ListObjectsV2Result, PutObjectResult, S3ObjectSummary}
import com.amazonaws.services.s3.model.PutObjectResult
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey}
object S3ClientLogging {
def logListObjectsStart(bucket: Bucket,
prefix: RemoteKey)
(implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Stream[S3ObjectSummary]] =
in => IO {
info(3)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}")
in
}
(implicit info: Int => String => IO[Unit]): IO[Unit] =
info(1)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}")
def logListObjectsFinish(bucket: Bucket,
prefix: RemoteKey)
(implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Unit] =
_ => IO {
info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}")
}
(implicit info: Int => String => IO[Unit]): IO[Unit] =
info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}")
def logUploadStart(localFile: LocalFile,
bucket: Bucket)
(implicit info: Int => String => Unit): PutObjectResult => IO[PutObjectResult] =
in => IO {
info(4)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}")
in
}
(implicit info: Int => String => IO[Unit]): PutObjectResult => IO[PutObjectResult] =
in => for {
_ <- info(1)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}")
} yield in
def logUploadFinish(localFile: LocalFile,
bucket: Bucket)
(implicit info: Int => String => Unit): PutObjectResult => IO[Unit] =
_ => IO {
info(1)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}")
}
(implicit info: Int => String => IO[Unit]): PutObjectResult => IO[Unit] =
_ => info(2)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}")
def logCopyStart(bucket: Bucket,
sourceKey: RemoteKey,
targetKey: RemoteKey)
(implicit info: Int => String => Unit): CopyObjectResult => IO[CopyObjectResult] =
in => IO {
info(4)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
in
}
(implicit info: Int => String => IO[Unit]): IO[Unit] =
info(1)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
def logCopyFinish(bucket: Bucket,
sourceKey: RemoteKey,
targetKey: RemoteKey)
(implicit info: Int => String => Unit): CopyObjectResult => IO[Unit] =
_ => IO {
info(3)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
}
(implicit info: Int => String => IO[Unit]): IO[Unit] =
info(2)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
def logDeleteStart(bucket: Bucket,
remoteKey: RemoteKey)
(implicit info: Int => String => Unit): IO[Unit] =
IO {
info(4)(s"Delete: ${bucket.name}:${remoteKey.key}")
}
(implicit info: Int => String => IO[Unit]): IO[Unit] =
info(1)(s"Delete: ${bucket.name}:${remoteKey.key}")
def logDeleteFinish(bucket: Bucket,
remoteKey: RemoteKey)
(implicit info: Int => String => Unit): IO[Unit] =
IO {
info(3)(s"Deleted: ${bucket.name}:${remoteKey.key}")
}
(implicit info: Int => String => IO[Unit]): IO[Unit] =
info(2)(s"Deleted: ${bucket.name}:${remoteKey.key}")
}

View file

@ -14,22 +14,18 @@ class S3ClientObjectLister(amazonS3: AmazonS3) {
def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit info: Int => String => Unit): IO[S3ObjectsData] = {
(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = {
type Token = String
type Batch = (Stream[S3ObjectSummary], Option[Token])
val requestInitial = new ListObjectsV2Request()
.withBucketName(bucket.name)
.withPrefix(prefix.key)
val requestMore = (token:Token) => new ListObjectsV2Request()
.withBucketName(bucket.name)
.withPrefix(prefix.key)
.withContinuationToken(token)
def fetchBatch: ListObjectsV2Request => IO[Batch] =
request => IO{
request => IO {
val result = amazonS3.listObjectsV2(request)
val more: Option[Token] =
if (result.isTruncated) Some(result.getNextContinuationToken)
@ -37,21 +33,28 @@ class S3ClientObjectLister(amazonS3: AmazonS3) {
(result.getObjectSummaries.asScala.toStream, more)
}
def fetchAll: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] =
def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] =
request =>
for {
batch <- fetchBatch(request)
(summaries, more) = batch
rest <- more match {
case None => IO{Stream()}
case Some(token) => fetchAll(requestMore(token))
case Some(token) => fetch(requestMore(token))
}
} yield summaries ++ rest
fetchAll(requestInitial)
.bracket(
logListObjectsStart(bucket, prefix))(
logListObjectsFinish(bucket,prefix))
IO {
new ListObjectsV2Request()
.withBucketName(bucket.name)
.withPrefix(prefix.key)
}.bracket {
request =>
for {
_ <- logListObjectsStart(bucket, prefix)
summaries <- fetch(request)
} yield summaries
}(_ => logListObjectsFinish(bucket,prefix))
.map(os => S3ObjectsData(byHash(os), byKey(os)))
}

View file

@ -1,37 +0,0 @@
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManager
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener}
import net.kemitix.s3thorp.aws.lib.S3ClientTransferManagerLogging.{logMultiPartUploadFinished, logMultiPartUploadStart}
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey}
class S3ClientTransferManager(transferManager: => TransferManager)
extends S3ClientUploader {
def accepts(localFile: LocalFile)
(implicit multiPartThreshold: Long): Boolean =
localFile.file.length >= multiPartThreshold
override
def upload(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[S3Action] = {
val putObjectRequest: PutObjectRequest =
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withGeneralProgressListener(progressListener(uploadProgressListener))
for {
_ <- logMultiPartUploadStart(localFile, tryCount)
upload = transferManager.upload(putObjectRequest)
result <- IO{upload.waitForUploadResult}
_ <- logMultiPartUploadFinished(localFile)
} yield UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag))
}
}

View file

@ -1,77 +0,0 @@
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.services.s3.model.{AmazonS3Exception, InitiateMultipartUploadResult, UploadPartRequest, UploadPartResult}
import net.kemitix.s3thorp.domain.{LocalFile, MD5Hash}
object S3ClientTransferManagerLogging {
private val prefix = "transfer-manager"
def logMultiPartUploadStart(localFile: LocalFile,
tryCount: Int)
(implicit info: Int => String => Unit): IO[Unit] =
IO{info(1)(s"$prefix:upload:try $tryCount: ${localFile.remoteKey.key}")}
def logMultiPartUploadFinished(localFile: LocalFile)
(implicit info: Int => String => Unit): IO[Unit] =
IO{info(4)(s"$prefix:upload:finished: ${localFile.remoteKey.key}")}
def logMultiPartUploadInitiate(localFile: LocalFile)
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:initiating: ${localFile.remoteKey.key}")
def logMultiPartUploadPartsDetails(localFile: LocalFile,
nParts: Int,
partSize: Long)
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:parts $nParts:each $partSize: ${localFile.remoteKey.key}")
def logMultiPartUploadPartDetails(localFile: LocalFile,
partNumber: Int,
partHash: MD5Hash)
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:part $partNumber:hash ${partHash.hash}: ${localFile.remoteKey.key}")
def logMultiPartUploadPart(localFile: LocalFile,
partRequest: UploadPartRequest)
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:sending:part ${partRequest.getPartNumber}: ${partRequest.getMd5Digest}: ${localFile.remoteKey.key}")
def logMultiPartUploadPartDone(localFile: LocalFile,
partRequest: UploadPartRequest,
result: UploadPartResult)
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:sent:part ${partRequest.getPartNumber}: ${result.getPartETag}: ${localFile.remoteKey.key}")
def logMultiPartUploadPartError(localFile: LocalFile,
partRequest: UploadPartRequest,
error: AmazonS3Exception)
(implicit warn: String => Unit): Unit = {
val returnedMD5Hash = error.getAdditionalDetails.get("Content-MD5")
warn(s"$prefix:error:part ${partRequest.getPartNumber}:ret-hash $returnedMD5Hash: ${localFile.remoteKey.key}")
}
def logMultiPartUploadCompleted(createUploadResponse: InitiateMultipartUploadResult,
uploadPartResponses: Stream[UploadPartResult],
localFile: LocalFile)
(implicit info: Int => String => Unit): Unit =
info(1)(s"$prefix:completed:parts ${uploadPartResponses.size}: ${localFile.remoteKey.key}")
def logMultiPartUploadCancelling(localFile: LocalFile)
(implicit warn: String => Unit): Unit =
warn(s"$prefix:cancelling: ${localFile.remoteKey.key}")
def logErrorRetrying(e: Throwable, localFile: LocalFile, tryCount: Int)
(implicit warn: String => Unit): Unit =
warn(s"$prefix:retry:error ${e.getMessage}: ${localFile.remoteKey.key}")
def logErrorCancelling(e: Throwable, localFile: LocalFile)
(implicit error: String => Unit) : Unit =
error(s"$prefix:cancelling:error ${e.getMessage}: ${localFile.remoteKey.key}")
def logErrorUnknown(e: Throwable, localFile: LocalFile)
(implicit error: String => Unit): Unit =
error(s"$prefix:unknown:error $e: ${localFile.remoteKey.key}")
}

View file

@ -1,35 +0,0 @@
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener}
import net.kemitix.s3thorp.domain.{Bucket, LocalFile}
trait S3ClientUploader {
def accepts(localFile: LocalFile)
(implicit multiPartThreshold: Long): Boolean
def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[S3Action]
def progressListener(uploadProgressListener: UploadProgressListener): ProgressListener = {
new ProgressListener {
override def progressChanged(event: ProgressEvent): Unit = {
event match {
case e if e.getEventType.isTransferEvent => TransferEvent(e.getEventType.name)
case e if e.getEventType equals ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT => ByteTransferEvent(e.getEventType.name)
case e => RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred)
}
}
}
}
}

View file

@ -2,14 +2,13 @@ package net.kemitix.s3thorp.aws.lib
import com.amazonaws.services.s3.model.S3ObjectSummary
import net.kemitix.s3thorp.domain.{HashModified, LastModified, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.core.QuoteStripper.stripQuotes
object S3ObjectsByKey {
def byKey(os: Stream[S3ObjectSummary]) =
os.map { o => {
val remoteKey = RemoteKey(o.getKey)
val hash = MD5Hash(o.getETag filter stripQuotes)
val hash = MD5Hash(o.getETag)
val lastModified = LastModified(o.getLastModified.toInstant)
(remoteKey, HashModified(hash, lastModified))
}}.toMap

View file

@ -13,19 +13,19 @@ class ThorpS3Client(amazonS3Client: => AmazonS3,
lazy val objectLister = new S3ClientObjectLister(amazonS3Client)
lazy val copier = new S3ClientCopier(amazonS3Client)
lazy val uploader = new S3ClientTransferManager(amazonS3TransferManager)
lazy val uploader = new Uploader(amazonS3TransferManager)
lazy val deleter = new S3ClientDeleter(amazonS3Client)
override def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit info: Int => String => Unit): IO[S3ObjectsData] =
(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] =
objectLister.listObjects(bucket, prefix)
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey)
(implicit info: Int => String => Unit): IO[CopyS3Action] =
(implicit info: Int => String => IO[Unit]): IO[CopyS3Action] =
copier.copy(bucket, sourceKey,hash, targetKey)
override def upload(localFile: LocalFile,
@ -34,13 +34,13 @@ class ThorpS3Client(amazonS3Client: => AmazonS3,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[S3Action] =
(implicit info: Int => String => IO[Unit],
warn: String => IO[Unit]): IO[S3Action] =
uploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries)
override def delete(bucket: Bucket,
remoteKey: RemoteKey)
(implicit info: Int => String => Unit): IO[DeleteS3Action] =
(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] =
deleter.delete(bucket, remoteKey)
}

View file

@ -0,0 +1,64 @@
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager}
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener}
import net.kemitix.s3thorp.aws.lib.UploaderLogging.{logMultiPartUploadFinished, logMultiPartUploadStart}
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey}
class Uploader(transferManager: => AmazonTransferManager) {
def accepts(localFile: LocalFile)
(implicit multiPartThreshold: Long): Boolean =
localFile.file.length >= multiPartThreshold
def upload(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => IO[Unit],
warn: String => IO[Unit]): IO[S3Action] = {
for {
_ <- logMultiPartUploadStart(localFile, tryCount)
listener = progressListener(uploadProgressListener)
putObjectRequest = request(localFile, bucket, listener)
upload = transferManager.upload(putObjectRequest)
result <- IO{upload.waitForUploadResult}
_ <- logMultiPartUploadFinished(localFile)
} yield UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag))
}
private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = {
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withGeneralProgressListener(listener)
}
private def progressListener(uploadProgressListener: UploadProgressListener) =
new ProgressListener {
override def progressChanged(progressEvent: ProgressEvent): Unit = {
uploadProgressListener.listener(
progressEvent match {
case e: ProgressEvent if isTransfer(e) =>
TransferEvent(e.getEventType.name)
case e: ProgressEvent if isByteTransfer(e) =>
ByteTransferEvent(e.getEventType.name)
case e: ProgressEvent =>
RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred)
})
.unsafeRunSync // the listener doesn't execute otherwise as it is never returned
}
}
private def isTransfer(e: ProgressEvent) =
e.getEventType.isTransferEvent
private def isByteTransfer(e: ProgressEvent) =
e.getEventType equals ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT
}

View file

@ -0,0 +1,22 @@
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import net.kemitix.s3thorp.domain.Terminal.clearLine
import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish
import net.kemitix.s3thorp.domain.LocalFile
object UploaderLogging {
def logMultiPartUploadStart(localFile: LocalFile,
tryCount: Int)
(implicit info: Int => String => IO[Unit]): IO[Unit] = {
val tryMessage = if (tryCount == 1) "" else s"try $tryCount"
val size = sizeInEnglish(localFile.file.length)
info(1)(s"${clearLine}upload:$tryMessage:$size:${localFile.remoteKey.key}")
}
def logMultiPartUploadFinished(localFile: LocalFile)
(implicit info: Int => String => IO[Unit]): IO[Unit] =
info(4)(s"upload:finished: ${localFile.remoteKey.key}")
}

View file

@ -2,6 +2,7 @@ package net.kemitix.s3thorp.aws.lib
import java.time.Instant
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.model.UploadResult
@ -22,8 +23,8 @@ class S3ClientSuite
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit private val logInfo: Int => String => Unit = l => m => ()
implicit private val logWarn: String => Unit = w => ()
implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit
implicit private val logWarn: String => IO[Unit] = _ => IO.unit
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
describe("getS3Status") {

View file

@ -3,6 +3,7 @@ package net.kemitix.s3thorp.aws.lib
import java.time.Instant
import java.util.Date
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary}
import com.amazonaws.services.s3.transfer.TransferManager
@ -19,7 +20,7 @@ class ThorpS3ClientSuite
val source = Resource(this, "upload")
val prefix = RemoteKey("prefix")
implicit val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit val logInfo: Int => String => Unit = l => m => ()
implicit val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit
val lm = LastModified(Instant.now)

View file

@ -2,6 +2,7 @@ package net.kemitix.s3thorp.aws.lib
import java.time.Instant
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.transfer._
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
@ -12,22 +13,22 @@ import net.kemitix.s3thorp.domain._
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
class S3ClientTransferManagerSuite
class UploaderSuite
extends FunSpec
with MockFactory {
private val source = Resource(this, ".")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit private val logInfo: Int => String => Unit = l => m => ()
implicit private val logWarn: String => Unit = w => ()
implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit
implicit private val logWarn: String => IO[Unit] = _ => IO.unit
private val fileToKey = generateKey(config.source, config.prefix) _
val lastModified = LastModified(Instant.now())
describe("S3ClientMultiPartTransferManagerSuite") {
describe("accepts") {
val transferManager = stub[TransferManager]
val uploader = new S3ClientTransferManager(transferManager)
val uploader = new Uploader(transferManager)
describe("small-file") {
val smallFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey)
it("should be a small-file") {
@ -59,7 +60,7 @@ class S3ClientTransferManagerSuite
val progressListener = new UploadProgressListener(bigFile)
val amazonS3 = mock[AmazonS3]
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
val uploader = new S3ClientTransferManager(amazonS3TransferManager)
val uploader = new Uploader(amazonS3TransferManager)
it("should upload") {
val expected = UploadS3Action(returnedKey, returnedHash)
val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync

View file

@ -22,12 +22,6 @@ val awsSdkDependencies = Seq(
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9"
)
)
val loggingSettings = Seq(
libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
"org.slf4j" % "slf4j-log4j12" % "1.7.26",
)
)
val catsEffectsSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % "1.3.1"
@ -47,7 +41,6 @@ val catsEffectsSettings = Seq(
lazy val cli = (project in file("cli"))
.settings(applicationSettings)
.aggregate(`aws-lib`, core, `aws-api`, domain)
.settings(loggingSettings)
.settings(commandLineParsing)
.dependsOn(`aws-lib`)

View file

@ -1,21 +0,0 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out"/>
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%-5p - %m%n"/>
</layout>
</appender>
<logger name="net.kemitix.s3thorp">
<level value="info"/>
</logger>
<root>
<priority value ="warn" />
<appender-ref ref="console" />
</root>
</log4j:configuration>

View file

@ -1,14 +1,15 @@
package net.kemitix.s3thorp.cli
import com.typesafe.scalalogging.LazyLogging
import net.kemitix.s3thorp.domain.Config
import cats.effect.IO
class Logger(verbosity: Int) extends LazyLogging {
class Logger(verbosity: Int) {
def info(level: Int)(message: String): Unit = if (verbosity >= level) logger.info(s"1:$message")
def info(level: Int)(message: String): IO[Unit] =
if (verbosity >= level) IO(println(s"[INFO:$level] $message"))
else IO.unit
def warn(message: String): Unit = logger.warn(message)
def warn(message: String): IO[Unit] = IO(println(s"[ WARN] $message"))
def error(message: String): Unit = logger.error(message)
def error(message: String): IO[Unit] = IO(println(s"[ ERROR] $message"))
}

View file

@ -21,7 +21,7 @@ object Main extends IOApp {
logger = new Logger(config.verbose)
info = (l: Int) => (m: String) => logger.info(l)(m)
md5HashGenerator = (file: File) => md5File(file)(info)
_ <- IO(logger.info(1)("S3Thorp - hashed sync for s3"))
_ <- logger.info(1)("S3Thorp - hashed sync for s3")
_ <- Sync.run(
S3ClientBuilder.defaultClient,
md5HashGenerator,
@ -34,9 +34,9 @@ object Main extends IOApp {
val logger = new Logger(1)
program(args)
.guaranteeCase {
case Canceled => IO(logger.warn("Interrupted"))
case Error(e) => IO(logger.error(e.getMessage))
case Completed => IO(logger.info(1)("Done"))
case Canceled => logger.warn("Interrupted")
case Error(e) => logger.error(e.getMessage)
case Completed => logger.info(1)("Done")
}
}

View file

@ -10,22 +10,28 @@ object ActionSubmitter {
def submitAction(s3Client: S3Client, action: Action)
(implicit c: Config,
info: Int => String => Unit,
warn: String => Unit): Stream[IO[S3Action]] = {
info: Int => String => IO[Unit],
warn: String => IO[Unit]): Stream[IO[S3Action]] = {
Stream(
action match {
case ToUpload(bucket, localFile) =>
info(4)(s" Upload: ${localFile.relative}")
val progressListener = new UploadProgressListener(localFile)
s3Client.upload(localFile, bucket, progressListener, c.multiPartThreshold, 1, c.maxRetries)
for {
_ <- info(4) (s" Upload: ${localFile.relative}")
progressListener = new UploadProgressListener(localFile)
action <- s3Client.upload(localFile, bucket, progressListener, c.multiPartThreshold, 1, c.maxRetries)
} yield action
case ToCopy(bucket, sourceKey, hash, targetKey) =>
info(4)(s" Copy: ${sourceKey.key} => ${targetKey.key}")
s3Client.copy(bucket, sourceKey, hash, targetKey)
for {
_ <- info(4)(s" Copy: ${sourceKey.key} => ${targetKey.key}")
action <- s3Client.copy(bucket, sourceKey, hash, targetKey)
} yield action
case ToDelete(bucket, remoteKey) =>
info(4)(s" Delete: ${remoteKey.key}")
s3Client.delete(bucket, remoteKey)
case DoNothing(bucket, remoteKey) => IO {
DoNothingS3Action(remoteKey)}
for {
_ <- info(4)(s" Delete: ${remoteKey.key}")
action <- s3Client.delete(bucket, remoteKey)
} yield action
case DoNothing(bucket, remoteKey) =>
IO.pure(DoNothingS3Action(remoteKey))
})
}
}

View file

@ -11,7 +11,7 @@ object LocalFileStream {
def findFiles(file: File,
md5HashGenerator: File => IO[MD5Hash],
info: Int => String => Unit)
info: Int => String => IO[Unit])
(implicit c: Config): IO[Stream[LocalFile]] = {
val filters: Path => Boolean = Filter.isIncluded(c.filters)

View file

@ -9,13 +9,13 @@ import net.kemitix.s3thorp.domain.MD5Hash
object MD5HashGenerator {
def md5File(file: File)
(implicit info: Int => String => Unit): IO[MD5Hash] =
(implicit info: Int => String => IO[Unit]): IO[MD5Hash] =
md5FilePart(file, 0, file.length)
def md5FilePart(file: File,
offset: Long,
size: Long)
(implicit info: Int => String => Unit): IO[MD5Hash] = {
(implicit info: Int => String => IO[Unit]): IO[MD5Hash] = {
val buffer = new Array[Byte](size.toInt)
def readIntoBuffer = {
@ -34,10 +34,10 @@ object MD5HashGenerator {
def readFile = openFile.bracket(readIntoBuffer)(closeFile)
for {
_ <- IO(info(5)(s"md5:reading:offset $offset:size $size:$file"))
_ <- info(5)(s"md5:reading:offset $offset:size $size:$file")
_ <- readFile
hash = md5PartBody(buffer)
_ <- IO (info(5)(s"md5:generated:${hash.hash}"))
_ <- info(4)(s"md5:generated:${hash.hash}:$file")
} yield hash
}

View file

@ -17,9 +17,9 @@ object Sync {
def run(s3Client: S3Client,
md5HashGenerator: File => IO[MD5Hash],
info: Int => String => Unit,
warn: String => Unit,
error: String => Unit)
info: Int => String => IO[Unit],
warn: String => IO[Unit],
error: String => IO[Unit])
(implicit c: Config): IO[Unit] = {
def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] =

View file

@ -8,29 +8,33 @@ import net.kemitix.s3thorp.domain.Config
// Logging for the Sync class
object SyncLogging {
def logRunStart[F[_]](info: Int => String => Unit)(implicit c: Config): IO[Unit] = IO {
info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ")}
def logRunStart[F[_]](info: Int => String => IO[Unit])
(implicit c: Config): IO[Unit] =
info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ")
def logFileScan(info: Int => String => Unit)(implicit c: Config): IO[Unit] = IO{
info(1)(s"Scanning local files: ${c.source}...")}
def logFileScan(info: Int => String => IO[Unit])
(implicit c: Config): IO[Unit] =
info(1)(s"Scanning local files: ${c.source}...")
def logRunFinished(actions: Stream[S3Action],
info: Int => String => Unit)
(implicit c: Config): IO[Unit] = IO {
val counters = actions.foldLeft(Counters())(countActivities)
info(1)(s"Uploaded ${counters.uploaded} files")
info(1)(s"Copied ${counters.copied} files")
info(1)(s"Deleted ${counters.deleted} files")
}
info: Int => String => IO[Unit])
(implicit c: Config): IO[Unit] =
for {
_ <- IO.unit
counters = actions.foldLeft(Counters())(countActivities)
_ <- info(1)(s"Uploaded ${counters.uploaded} files")
_ <- info(1)(s"Copied ${counters.copied} files")
_ <- info(1)(s"Deleted ${counters.deleted} files")
} yield ()
private def countActivities(implicit c: Config): (Counters, S3Action) => Counters =
(counters: Counters, s3Action: S3Action) => {
s3Action match {
case UploadS3Action(remoteKey, _) =>
case _: UploadS3Action =>
counters.copy(uploaded = counters.uploaded + 1)
case CopyS3Action(remoteKey) =>
case _: CopyS3Action =>
counters.copy(copied = counters.copied + 1)
case DeleteS3Action(remoteKey) =>
case _: DeleteS3Action =>
counters.copy(deleted = counters.deleted + 1)
case _ => counters
}

View file

@ -10,7 +10,7 @@ class LocalFileStreamSuite extends FunSpec {
val uploadResource = Resource(this, "upload")
val config: Config = Config(source = uploadResource)
implicit private val logInfo: Int => String => Unit = l => i => ()
implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit
val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file)
describe("findFiles") {

View file

@ -2,6 +2,7 @@ package net.kemitix.s3thorp.core
import java.nio.file.Files
import cats.effect.IO
import net.kemitix.s3thorp.core.MD5HashData.rootHash
import net.kemitix.s3thorp.domain.{Bucket, Config, MD5Hash, RemoteKey}
import org.scalatest.FunSpec
@ -11,7 +12,7 @@ class MD5HashGeneratorTest extends FunSpec {
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit private val logInfo: Int => String => Unit = l => i => ()
implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit
describe("read a small file (smaller than buffer)") {
val file = Resource(this, "upload/root-file")

View file

@ -17,9 +17,9 @@ class SyncSuite
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit private val logInfo: Int => String => Unit = l => i => ()
implicit private val logWarn: String => Unit = w => ()
private def logError: String => Unit = e => ()
implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit
implicit private val logWarn: String => IO[Unit] = _ => IO.unit
private def logError: String => IO[Unit] = _ => IO.unit
private val lastModified = LastModified(Instant.now)
private val fileToKey: File => RemoteKey = KeyGenerator.generateKey(source, prefix)
private val rootFile = LocalFile.resolve("root-file", rootHash, source, fileToKey)
@ -149,10 +149,8 @@ class SyncSuite
override def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit info: Int => String => Unit) =
IO {
s3ObjectsData
}
(implicit info: Int => String => IO[Unit]) =
IO.pure(s3ObjectsData)
override def upload(localFile: LocalFile,
bucket: Bucket,
@ -160,8 +158,8 @@ class SyncSuite
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit) =
(implicit info: Int => String => IO[Unit],
warn: String => IO[Unit]) =
IO {
if (bucket == testBucket)
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)
@ -172,7 +170,7 @@ class SyncSuite
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit info: Int => String => Unit) =
)(implicit info: Int => String => IO[Unit]) =
IO {
if (bucket == testBucket)
copiesRecord += (sourceKey -> targetKey)
@ -181,7 +179,7 @@ class SyncSuite
override def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit info: Int => String => Unit) =
)(implicit info: Int => String => IO[Unit]) =
IO {
if (bucket == testBucket)
deletionsRecord += remoteKey

View file

@ -1,7 +1,9 @@
package net.kemitix.s3thorp.domain
final case class MD5Hash(hash: String) {
import net.kemitix.s3thorp.domain.QuoteStripper.stripQuotes
require(!hash.contains("\""))
final case class MD5Hash(in: String) {
lazy val hash: String = in filter stripQuotes
}

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp.core
package net.kemitix.s3thorp.domain
object QuoteStripper {

View file

@ -0,0 +1,14 @@
package net.kemitix.s3thorp.domain
object SizeTranslation {
def sizeInEnglish(length: Long): String =
length match {
case bytes if bytes > 1024 * 1024 * 1024 => s"${bytes / 1024 / 1024 /1024}Gb"
case bytes if bytes > 1024 * 1024 => s"${bytes / 1024 / 1024}Mb"
case bytes if bytes > 1024 => s"${bytes / 1024}Kb"
case bytes => s"${length}b"
}
}
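For example, given the thresholds above:

    SizeTranslation.sizeInEnglish(500)             // "500b"
    SizeTranslation.sizeInEnglish(2048)            // "2Kb"
    SizeTranslation.sizeInEnglish(5 * 1024 * 1024) // "5Mb"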

View file

@ -0,0 +1,28 @@
package net.kemitix.s3thorp.domain
object Terminal {
/**
* Clears the whole terminal line.
*/
val clearLine = "\u001B[2K\r"
/**
* Moves the cursor up one line and back to the start of the line.
*/
val returnToPreviousLine = "\u001B[1A\r"
/**
* The width of the terminal, as reported by the COLUMNS environment variable.
*
* N.B. Not all environments update this value when the terminal is resized.
*
* @return the number of columns in the terminal
*/
def width: Int = {
Option(System.getenv("COLUMNS"))
.map(_.toInt)
.map(Math.max(_, 10))
.getOrElse(80)
}
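A sketch of how the two-line progress display combines these values (mirroring UploadProgressLogging above; bar contents illustrative):

    // print a status line and a bar, then park the cursor at the
    // start of the status line so the next update overwrites both
    val bar = "[" + "#" * 10 + " " * (Terminal.width - 12) + "]"
    print(s"${Terminal.clearLine}Uploading...\n$bar${Terminal.returnToPreviousLine}")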
}