Handle when a file is not found after initial scan (#49)

* [aws-lib] Uploader refactoring

* [aws-lib] Uploader remove redundant braces

* [aws-lib] Uploader start upload inside IO

The upload method starts the upload to S3, so should be within the IO.

* [core] log count of errors

* [aws-lib] Uploader handle errors and count them
This commit is contained in:
Paul Campbell 2019-06-10 21:22:46 +01:00 committed by GitHub
parent 97efed76b4
commit fe4bec0f12
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 12 deletions

View file

@ -3,13 +3,16 @@ package net.kemitix.s3thorp.aws.lib
import cats.effect.IO import cats.effect.IO
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager} import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager}
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action import net.kemitix.s3thorp.aws.api.S3Action.{ErroredS3Action, UploadS3Action}
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener} import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener}
import net.kemitix.s3thorp.aws.lib.UploaderLogging.{logMultiPartUploadFinished, logMultiPartUploadStart} import net.kemitix.s3thorp.aws.lib.UploaderLogging.{logMultiPartUploadFinished, logMultiPartUploadStart}
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey}
import scala.util.Try
class Uploader(transferManager: => AmazonTransferManager) { class Uploader(transferManager: => AmazonTransferManager) {
def accepts(localFile: LocalFile) def accepts(localFile: LocalFile)
@ -23,21 +26,32 @@ class Uploader(transferManager: => AmazonTransferManager) {
tryCount: Int, tryCount: Int,
maxRetries: Int) maxRetries: Int)
(implicit info: Int => String => IO[Unit], (implicit info: Int => String => IO[Unit],
warn: String => IO[Unit]): IO[S3Action] = { warn: String => IO[Unit]): IO[S3Action] =
for { for {
_ <- logMultiPartUploadStart(localFile, tryCount) _ <- logMultiPartUploadStart(localFile, tryCount)
listener = progressListener(uploadProgressListener) upload <- upload(localFile, bucket, uploadProgressListener)
putObjectRequest = request(localFile, bucket, listener)
upload = transferManager.upload(putObjectRequest)
result <- IO{upload.waitForUploadResult}
_ <- logMultiPartUploadFinished(localFile) _ <- logMultiPartUploadFinished(localFile)
} yield UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag)) } yield upload match {
case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag))
case Left(e) => ErroredS3Action(localFile.remoteKey, e)
}
private def upload(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
): IO[Either[Throwable, UploadResult]] = {
val listener = progressListener(uploadProgressListener)
val putObjectRequest = request(localFile, bucket, listener)
IO {
Try(transferManager.upload(putObjectRequest))
.map(_.waitForUploadResult)
.toEither
}
} }
private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = { private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest =
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withGeneralProgressListener(listener) .withGeneralProgressListener(listener)
}
private def progressListener(uploadProgressListener: UploadProgressListener) = private def progressListener(uploadProgressListener: UploadProgressListener) =
new ProgressListener { new ProgressListener {

View file

@ -2,4 +2,5 @@ package net.kemitix.s3thorp.core
final case class Counters(uploaded: Int = 0, final case class Counters(uploaded: Int = 0,
deleted: Int = 0, deleted: Int = 0,
copied: Int = 0) copied: Int = 0,
errors: Int = 0)

View file

@ -2,13 +2,13 @@ package net.kemitix.s3thorp.core
import cats.effect.IO import cats.effect.IO
import net.kemitix.s3thorp.aws.api.S3Action import net.kemitix.s3thorp.aws.api.S3Action
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action} import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action}
import net.kemitix.s3thorp.domain.Config import net.kemitix.s3thorp.domain.Config
// Logging for the Sync class // Logging for the Sync class
object SyncLogging { object SyncLogging {
def logRunStart[F[_]](info: Int => String => IO[Unit]) def logRunStart(info: Int => String => IO[Unit])
(implicit c: Config): IO[Unit] = (implicit c: Config): IO[Unit] =
info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ") info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ")
@ -25,6 +25,7 @@ object SyncLogging {
_ <- info(1)(s"Uploaded ${counters.uploaded} files") _ <- info(1)(s"Uploaded ${counters.uploaded} files")
_ <- info(1)(s"Copied ${counters.copied} files") _ <- info(1)(s"Copied ${counters.copied} files")
_ <- info(1)(s"Deleted ${counters.deleted} files") _ <- info(1)(s"Deleted ${counters.deleted} files")
_ <- info(1)(s"Errors ${counters.errors}")
} yield () } yield ()
private def countActivities(implicit c: Config): (Counters, S3Action) => Counters = private def countActivities(implicit c: Config): (Counters, S3Action) => Counters =
@ -36,6 +37,8 @@ object SyncLogging {
counters.copy(copied = counters.copied + 1) counters.copy(copied = counters.copied + 1)
case _: DeleteS3Action => case _: DeleteS3Action =>
counters.copy(deleted = counters.deleted + 1) counters.copy(deleted = counters.deleted + 1)
case ErroredS3Action(k, e) =>
counters.copy(errors = counters.errors + 1)
case _ => counters case _ => counters
} }
} }