diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala index eeba3a2..c2ba261 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/Uploader.scala @@ -3,13 +3,16 @@ package net.kemitix.s3thorp.aws.lib import cats.effect.IO import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener} import com.amazonaws.services.s3.model.PutObjectRequest +import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager} -import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action +import net.kemitix.s3thorp.aws.api.S3Action.{ErroredS3Action, UploadS3Action} import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent} import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener} import net.kemitix.s3thorp.aws.lib.UploaderLogging.{logMultiPartUploadFinished, logMultiPartUploadStart} import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey} +import scala.util.Try + class Uploader(transferManager: => AmazonTransferManager) { def accepts(localFile: LocalFile) @@ -23,21 +26,32 @@ class Uploader(transferManager: => AmazonTransferManager) { tryCount: Int, maxRetries: Int) (implicit info: Int => String => IO[Unit], - warn: String => IO[Unit]): IO[S3Action] = { + warn: String => IO[Unit]): IO[S3Action] = for { _ <- logMultiPartUploadStart(localFile, tryCount) - listener = progressListener(uploadProgressListener) - putObjectRequest = request(localFile, bucket, listener) - upload = transferManager.upload(putObjectRequest) - result <- IO{upload.waitForUploadResult} + upload <- upload(localFile, bucket, uploadProgressListener) _ <- logMultiPartUploadFinished(localFile) - } yield UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag)) + } yield upload match { + case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag)) + case Left(e) => ErroredS3Action(localFile.remoteKey, e) + } + + private def upload(localFile: LocalFile, + bucket: Bucket, + uploadProgressListener: UploadProgressListener, + ): IO[Either[Throwable, UploadResult]] = { + val listener = progressListener(uploadProgressListener) + val putObjectRequest = request(localFile, bucket, listener) + IO { + Try(transferManager.upload(putObjectRequest)) + .map(_.waitForUploadResult) + .toEither + } } - private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = { + private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) .withGeneralProgressListener(listener) - } private def progressListener(uploadProgressListener: UploadProgressListener) = new ProgressListener { diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/Counters.scala b/core/src/main/scala/net.kemitix.s3thorp.core/Counters.scala index 35aac99..b8c89a2 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/Counters.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/Counters.scala @@ -2,4 +2,5 @@ package net.kemitix.s3thorp.core final case class Counters(uploaded: Int = 0, deleted: Int = 0, - copied: Int = 0) + copied: Int = 0, + errors: Int = 0) diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala index 2b926e9..81c6ad8 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/SyncLogging.scala @@ -2,13 +2,13 @@ package net.kemitix.s3thorp.core import cats.effect.IO import net.kemitix.s3thorp.aws.api.S3Action -import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action} +import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action} import net.kemitix.s3thorp.domain.Config // Logging for the Sync class object SyncLogging { - def logRunStart[F[_]](info: Int => String => IO[Unit]) + def logRunStart(info: Int => String => IO[Unit]) (implicit c: Config): IO[Unit] = info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ") @@ -25,6 +25,7 @@ object SyncLogging { _ <- info(1)(s"Uploaded ${counters.uploaded} files") _ <- info(1)(s"Copied ${counters.copied} files") _ <- info(1)(s"Deleted ${counters.deleted} files") + _ <- info(1)(s"Errors ${counters.errors}") } yield () private def countActivities(implicit c: Config): (Counters, S3Action) => Counters = @@ -36,6 +37,8 @@ object SyncLogging { counters.copy(copied = counters.copied + 1) case _: DeleteS3Action => counters.copy(deleted = counters.deleted + 1) + case ErroredS3Action(k, e) => + counters.copy(errors = counters.errors + 1) case _ => counters } }