diff --git a/build.sbt b/build.sbt index e90dba2..066fd4d 100644 --- a/build.sbt +++ b/build.sbt @@ -8,9 +8,13 @@ scalaVersion := "2.12.8" libraryDependencies += "com.github.scopt" %% "scopt" % "4.0.0-RC2" // AWS SDK +/// wraps the in-preview Java SDK V2 which is incomplete and doesn't support multi-part uploads libraryDependencies += "com.github.j5ik2o" %% "reactive-aws-s3-core" % "1.1.3" libraryDependencies += "com.github.j5ik2o" %% "reactive-aws-s3-cats" % "1.1.3" +// AWS SDK - multi-part upload +libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.560" + // Logging libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2" libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.26" diff --git a/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala b/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala index 044894f..24c22ab 100644 --- a/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala +++ b/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala @@ -13,7 +13,7 @@ trait ActionSubmitter action match { case ToUpload(file) => log4(s" Upload: ${file.relative}") - upload(file, c.bucket) + upload(file, c.bucket, 1) case ToCopy(sourceKey, hash, targetKey) => log4(s" Copy: $sourceKey => $targetKey") copy(c.bucket, sourceKey, hash, targetKey) diff --git a/src/main/scala/net/kemitix/s3thorp/Config.scala b/src/main/scala/net/kemitix/s3thorp/Config.scala index 9eb7350..b9a93a9 100644 --- a/src/main/scala/net/kemitix/s3thorp/Config.scala +++ b/src/main/scala/net/kemitix/s3thorp/Config.scala @@ -6,7 +6,10 @@ case class Config(bucket: Bucket = Bucket(""), prefix: RemoteKey = RemoteKey(""), verbose: Int = 1, filters: Seq[Filter] = List(), + multiPartThreshold: Long = 1024 * 1024 * 5, + maxRetries: Int = 3, source: File ) { require(source.isDirectory, s"Source must be a directory: $source") + require(multiPartThreshold >= 1024 * 1024 * 5, s"Threshold for multi-part upload is 5Mb: '$multiPartThreshold'") } diff --git a/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala b/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala index 56491cd..803dde2 100644 --- a/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala +++ b/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala @@ -6,15 +6,30 @@ import java.security.{DigestInputStream, MessageDigest} trait MD5HashGenerator extends Logging { - def md5File(file: File)(implicit c: Config): MD5Hash = { - log5(s"md5file:reading:${file.length}:$file") - val buffer = new Array[Byte](8192) - val md5 = MessageDigest.getInstance("MD5") - val dis = new DigestInputStream(new FileInputStream(file), md5) - try { while (dis.read(buffer) != -1) { } } finally { dis.close() } - val hash = md5.digest.map("%02x".format(_)).mkString - log5(s"md5file:generated:$hash:$file") - MD5Hash(hash) + def md5File(file: File) + (implicit c: Config): MD5Hash = { + val hash = md5FilePart(file, 0, file.length) + hash + } + + def md5FilePart(file: File, + offset: Long, + size: Long) + (implicit c: Config): MD5Hash = { + log5(s"md5:reading:offset $offset:size $size:$file") + val fis = new FileInputStream(file) + fis skip offset + val buffer = new Array[Byte](size.toInt) + fis read buffer + val hash = md5PartBody(buffer) + log5(s"md5:generated:${hash.hash}") + hash + } + + def md5PartBody(partBody: Array[Byte]): MD5Hash = { + val md5 = MessageDigest getInstance "MD5" + md5 update partBody + MD5Hash((md5.digest map ("%02x" format _)).mkString) } } diff --git a/src/main/scala/net/kemitix/s3thorp/S3Action.scala b/src/main/scala/net/kemitix/s3thorp/S3Action.scala index 903dc9e..114e5d8 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3Action.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3Action.scala @@ -24,6 +24,10 @@ case class DeleteS3Action(remoteKey: RemoteKey) extends S3Action { override val order: Int = 3 } +case class ErroredS3Action(remoteKey: RemoteKey, e: Throwable) extends S3Action { + override val order: Int = 10 +} + object S3Action { implicit def ord[A <: S3Action]: Ordering[A] = Ordering.by(_.order) } diff --git a/src/main/scala/net/kemitix/s3thorp/Sync.scala b/src/main/scala/net/kemitix/s3thorp/Sync.scala index 6c06adf..9e3b5cf 100644 --- a/src/main/scala/net/kemitix/s3thorp/Sync.scala +++ b/src/main/scala/net/kemitix/s3thorp/Sync.scala @@ -38,8 +38,10 @@ class Sync(s3Client: S3Client) ioActions.flatMap { actions => IO { actions.sorted } } override def upload(localFile: LocalFile, - bucket: Bucket)(implicit c: Config): IO[UploadS3Action] = - s3Client.upload(localFile, bucket) + bucket: Bucket, + tryCount: Int) + (implicit c: Config): IO[S3Action] = + s3Client.upload(localFile, bucket, tryCount) override def copy(bucket: Bucket, sourceKey: RemoteKey, diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/CancellableMultiPartUpload.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/CancellableMultiPartUpload.scala new file mode 100644 index 0000000..60eb2bd --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/CancellableMultiPartUpload.scala @@ -0,0 +1,4 @@ +package net.kemitix.s3thorp.awssdk + +case class CancellableMultiPartUpload(e: Throwable, + uploadId: String) extends Exception(e) diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/QuoteStripper.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/QuoteStripper.scala new file mode 100644 index 0000000..28a9d0a --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/QuoteStripper.scala @@ -0,0 +1,7 @@ +package net.kemitix.s3thorp.awssdk + +trait QuoteStripper { + + def stripQuotes: Char => Boolean = _ != '"' + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3Client.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3Client.scala index 9956a11..39b83f2 100644 --- a/src/main/scala/net/kemitix/s3thorp/awssdk/S3Client.scala +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3Client.scala @@ -19,8 +19,9 @@ trait S3Client { )(implicit c: Config): IO[S3ObjectsData] def upload(localFile: LocalFile, - bucket: Bucket - )(implicit c: Config): IO[UploadS3Action] + bucket: Bucket, + tryCount: Int + )(implicit c: Config): IO[S3Action] def copy(bucket: Bucket, sourceKey: RemoteKey, diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientCopier.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientCopier.scala new file mode 100644 index 0000000..20e7a7a --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientCopier.scala @@ -0,0 +1,28 @@ +package net.kemitix.s3thorp.awssdk + +import cats.effect.IO +import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient +import net.kemitix.s3thorp.{Bucket, Config, CopyS3Action, MD5Hash, RemoteKey} +import software.amazon.awssdk.services.s3.model.CopyObjectRequest + +private class S3ClientCopier(s3Client: S3CatsIOClient) + extends S3ClientLogging { + + def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey) + (implicit c: Config): IO[CopyS3Action] = { + val request = CopyObjectRequest.builder + .bucket(bucket.name) + .copySource(s"${bucket.name}/${sourceKey.key}") + .copySourceIfMatch(hash.hash) + .key(targetKey.key).build + s3Client.copyObject(request) + .bracket( + logCopyStart(bucket, sourceKey, targetKey))( + logCopyFinish(bucket, sourceKey,targetKey)) + .map(_ => CopyS3Action(targetKey)) + } + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientDeleter.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientDeleter.scala new file mode 100644 index 0000000..7c6d6d1 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientDeleter.scala @@ -0,0 +1,24 @@ +package net.kemitix.s3thorp.awssdk + +import cats.effect.IO +import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient +import net.kemitix.s3thorp.{Bucket, Config, DeleteS3Action, RemoteKey} +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest + +private class S3ClientDeleter(s3Client: S3CatsIOClient) + extends S3ClientLogging { + + def delete(bucket: Bucket, + remoteKey: RemoteKey) + (implicit c: Config): IO[DeleteS3Action] = { + val request = DeleteObjectRequest.builder + .bucket(bucket.name) + .key(remoteKey.key).build + s3Client.deleteObject(request) + .bracket( + logDeleteStart(bucket, remoteKey))( + logDeleteFinish(bucket, remoteKey)) + .map(_ => DeleteS3Action(remoteKey)) + } + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartTransferManager.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartTransferManager.scala new file mode 100644 index 0000000..8966e10 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartTransferManager.scala @@ -0,0 +1,27 @@ +package net.kemitix.s3thorp.awssdk +import cats.effect.IO +import com.amazonaws.services.s3.transfer.TransferManager +import net.kemitix.s3thorp._ + +class S3ClientMultiPartTransferManager(transferManager: TransferManager) + extends S3ClientUploader + with S3ClientMultiPartUploaderLogging { + + def accepts(localFile: LocalFile) + (implicit c: Config): Boolean = + localFile.file.length >= c.multiPartThreshold + + override + def upload(localFile: LocalFile, + bucket: Bucket, + tryCount: Int) + (implicit c: Config): IO[S3Action] = { + IO { + logMultiPartUploadStart(localFile, tryCount) + val result = transferManager.upload(bucket.name, localFile.remoteKey.key, localFile.file) + .waitForUploadResult() + logMultiPartUploadFinished(localFile) + UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag)) + } + } +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartUploader.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartUploader.scala new file mode 100644 index 0000000..b088c1d --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartUploader.scala @@ -0,0 +1,145 @@ +package net.kemitix.s3thorp.awssdk + +import scala.collection.JavaConverters._ +import cats.effect.IO +import cats.implicits._ +import com.amazonaws.services.s3.AmazonS3 +import com.amazonaws.services.s3.model.{AbortMultipartUploadRequest, AmazonS3Exception, CompleteMultipartUploadRequest, CompleteMultipartUploadResult, InitiateMultipartUploadRequest, InitiateMultipartUploadResult, PartETag, UploadPartRequest, UploadPartResult} +import net.kemitix.s3thorp._ + +import scala.util.control.NonFatal + +private class S3ClientMultiPartUploader(s3Client: AmazonS3) + extends S3ClientUploader + with S3ClientMultiPartUploaderLogging + with MD5HashGenerator + with QuoteStripper { + + def accepts(localFile: LocalFile) + (implicit c: Config): Boolean = + localFile.file.length >= c.multiPartThreshold + + def createUpload(bucket: Bucket, localFile: LocalFile) + (implicit c: Config): IO[InitiateMultipartUploadResult] = { + logMultiPartUploadInitiate(localFile) + IO(s3Client initiateMultipartUpload createUploadRequest(bucket, localFile)) + } + + def createUploadRequest(bucket: Bucket, localFile: LocalFile) = + new InitiateMultipartUploadRequest( + bucket.name, + localFile.remoteKey.key) + + def parts(localFile: LocalFile, + response: InitiateMultipartUploadResult) + (implicit c: Config): IO[Stream[UploadPartRequest]] = { + val fileSize = localFile.file.length + val maxParts = 1024 // arbitrary, supports upto 10,000 (I, think) + val threshold = c.multiPartThreshold + val nParts = Math.min((fileSize / threshold) + 1, maxParts).toInt + val partSize = fileSize / nParts + val maxUpload = nParts * partSize + + IO { + require(fileSize <= maxUpload, + s"File (${localFile.file.getPath}) size ($fileSize) exceeds upload limit: $maxUpload") + logMultiPartUploadPartsDetails(localFile, nParts, partSize) + for { + partNumber <- (1 to nParts).toStream + offSet = (partNumber - 1) * partSize + chunkSize = Math.min(fileSize - offSet, partSize) + partHash = md5FilePart(localFile.file, offSet, chunkSize) + _ = logMultiPartUploadPartDetails(localFile, partNumber, partHash) + uploadPartRequest = createUploadPartRequest(localFile, response, partNumber, chunkSize, partHash) + } yield uploadPartRequest + } + } + + private def createUploadPartRequest(localFile: LocalFile, + response: InitiateMultipartUploadResult, + partNumber: Int, + chunkSize: Long, + partHash: MD5Hash) + (implicit c: Config) = { + new UploadPartRequest() + .withBucketName(c.bucket.name) + .withKey(localFile.remoteKey.key) + .withUploadId(response.getUploadId) + .withPartNumber(partNumber) + .withPartSize(chunkSize) + .withMD5Digest(partHash.hash) + .withFile(localFile.file) + .withFileOffset((partNumber - 1) * chunkSize) + } + + def uploadPart(localFile: LocalFile) + (implicit c: Config): UploadPartRequest => IO[UploadPartResult] = + partRequest => { + logMultiPartUploadPart(localFile, partRequest) + IO(s3Client.uploadPart(partRequest)) + .handleErrorWith{ + case error: AmazonS3Exception => { + logMultiPartUploadPartError(localFile, partRequest, error) + IO.raiseError(CancellableMultiPartUpload(error, partRequest.getUploadId)) + }} + } + + def uploadParts(localFile: LocalFile, + parts: Stream[UploadPartRequest]) + (implicit c: Config): IO[Stream[UploadPartResult]] = + (parts map uploadPart(localFile)).sequence + + def completeUpload(createUploadResponse: InitiateMultipartUploadResult, + uploadPartResponses: Stream[UploadPartResult], + localFile: LocalFile) + (implicit c: Config): IO[CompleteMultipartUploadResult] = { + logMultiPartUploadCompleted(createUploadResponse, uploadPartResponses, localFile) + IO(s3Client completeMultipartUpload createCompleteRequest(createUploadResponse, uploadPartResponses.toList)) + } + + def createCompleteRequest(createUploadResponse: InitiateMultipartUploadResult, + uploadPartResult: List[UploadPartResult]) = { + new CompleteMultipartUploadRequest() + .withBucketName(createUploadResponse.getBucketName) + .withKey(createUploadResponse.getKey) + .withUploadId(createUploadResponse.getUploadId) + .withPartETags(uploadPartResult.asJava) + } + + def cancel(uploadId: String, localFile: LocalFile) + (implicit c: Config): IO[Unit] = { + logMultiPartUploadCancelling(localFile) + IO(s3Client abortMultipartUpload createAbortRequest(uploadId, localFile)) + } + + def createAbortRequest(uploadId: String, + localFile: LocalFile) + (implicit c: Config): AbortMultipartUploadRequest = + new AbortMultipartUploadRequest(c.bucket.name, localFile.remoteKey.key, uploadId) + + override def upload(localFile: LocalFile, + bucket: Bucket, + tryCount: Int) + (implicit c: Config): IO[S3Action] = { + logMultiPartUploadStart(localFile, tryCount) + + (for { + createUploadResponse <- createUpload(bucket, localFile) + parts <- parts(localFile, createUploadResponse) + uploadPartResponses <- uploadParts(localFile, parts) + completedUploadResponse <- completeUpload(createUploadResponse, uploadPartResponses, localFile) + } yield completedUploadResponse) + .map(_.getETag) + .map(_ filter stripQuotes) + .map(MD5Hash) + .map(UploadS3Action(localFile.remoteKey, _)) + .handleErrorWith { + case CancellableMultiPartUpload(e, uploadId) => + if (tryCount >= c.maxRetries) IO(logErrorCancelling(e, localFile)) *> cancel(uploadId, localFile) *> IO.pure(ErroredS3Action(localFile.remoteKey, e)) + else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, tryCount + 1) + case NonFatal(e) => + if (tryCount >= c.maxRetries) IO(logErrorUnknown(e, localFile)) *> IO.pure(ErroredS3Action(localFile.remoteKey, e)) + else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, tryCount + 1) + } + } +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartUploaderLogging.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartUploaderLogging.scala new file mode 100644 index 0000000..7031c86 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartUploaderLogging.scala @@ -0,0 +1,77 @@ +package net.kemitix.s3thorp.awssdk + +import com.amazonaws.services.s3.model.{AmazonS3Exception, InitiateMultipartUploadResult, UploadPartRequest, UploadPartResult} +import net.kemitix.s3thorp.{Config, LocalFile, MD5Hash} + +trait S3ClientMultiPartUploaderLogging + extends S3ClientLogging { + + private val prefix = "multi-part upload" + + def logMultiPartUploadStart(localFile: LocalFile, + tryCount: Int) + (implicit c: Config): Unit = + log4(s"$prefix:upload:try $tryCount: ${localFile.remoteKey.key}") + + def logMultiPartUploadFinished(localFile: LocalFile) + (implicit c: Config): Unit = + log4(s"$prefix:upload:finished: ${localFile.remoteKey.key}") + + def logMultiPartUploadInitiate(localFile: LocalFile) + (implicit c: Config): Unit = + log5(s"$prefix:initiating: ${localFile.remoteKey.key}") + + def logMultiPartUploadPartsDetails(localFile: LocalFile, + nParts: Int, + partSize: Long) + (implicit c: Config): Unit = + log5(s"$prefix:parts $nParts:each $partSize: ${localFile.remoteKey.key}") + + def logMultiPartUploadPartDetails(localFile: LocalFile, + partNumber: Int, + partHash: MD5Hash) + (implicit c: Config): Unit = + log5(s"$prefix:part $partNumber:hash ${partHash.hash}: ${localFile.remoteKey.key}") + + def logMultiPartUploadPart(localFile: LocalFile, + partRequest: UploadPartRequest) + (implicit c: Config): Unit = + log5(s"$prefix:sending:part ${partRequest.getPartNumber}: ${partRequest.getMd5Digest}: ${localFile.remoteKey.key}") + + def logMultiPartUploadPartDone(localFile: LocalFile, + partRequest: UploadPartRequest, + result: UploadPartResult) + (implicit c: Config): Unit = + log5(s"$prefix:sent:part ${partRequest.getPartNumber}: ${result.getPartETag}: ${localFile.remoteKey.key}") + + def logMultiPartUploadPartError(localFile: LocalFile, + partRequest: UploadPartRequest, + error: AmazonS3Exception) + (implicit c: Config): Unit = { + val returnedMD5Hash = error.getAdditionalDetails.get("Content-MD5") + warn(s"$prefix:error:part ${partRequest.getPartNumber}:ret-hash $returnedMD5Hash: ${localFile.remoteKey.key}") + } + + def logMultiPartUploadCompleted(createUploadResponse: InitiateMultipartUploadResult, + uploadPartResponses: Stream[UploadPartResult], + localFile: LocalFile) + (implicit c: Config): Unit = + log4(s"$prefix:completed:parts ${uploadPartResponses.size}: ${localFile.remoteKey.key}") + + def logMultiPartUploadCancelling(localFile: LocalFile) + (implicit c: Config): Unit = + warn(s"$prefix:cancelling: ${localFile.remoteKey.key}") + + def logErrorRetrying(e: Throwable, localFile: LocalFile, tryCount: Int) + (implicit c: Config): Unit = + warn(s"$prefix:retry:error ${e.getMessage}: ${localFile.remoteKey.key}") + + def logErrorCancelling(e: Throwable, localFile: LocalFile) + (implicit c: Config) : Unit = + error(s"$prefix:cancelling:error ${e.getMessage}: ${localFile.remoteKey.key}") + + def logErrorUnknown(e: Throwable, localFile: LocalFile) + (implicit c: Config): Unit = + error(s"$prefix:unknown:error $e: ${localFile.remoteKey.key}") + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientObjectLister.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientObjectLister.scala new file mode 100644 index 0000000..bab10b2 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientObjectLister.scala @@ -0,0 +1,38 @@ +package net.kemitix.s3thorp.awssdk + +import cats.effect.IO +import net.kemitix.s3thorp.{Bucket, Config, HashModified, LastModified, MD5Hash, RemoteKey} +import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, S3Object} +import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient +import scala.collection.JavaConverters._ + +private class S3ClientObjectLister(s3Client: S3CatsIOClient) + extends S3ClientLogging + with S3ObjectsByHash + with QuoteStripper { + + def listObjects(bucket: Bucket, + prefix: RemoteKey) + (implicit c: Config): IO[S3ObjectsData] = { + val request = ListObjectsV2Request.builder + .bucket(bucket.name) + .prefix(prefix.key).build + s3Client.listObjectsV2(request) + .bracket( + logListObjectsStart(bucket, prefix))( + logListObjectsFinish(bucket,prefix)) + .map(_.contents) + .map(_.asScala) + .map(_.toStream) + .map(os => S3ObjectsData(byHash(os), byKey(os))) + } + + private def byKey(os: Stream[S3Object]) = + os.map { o => { + val remoteKey = RemoteKey(o.key) + val hash = MD5Hash(o.eTag() filter stripQuotes) + val lastModified = LastModified(o.lastModified()) + (remoteKey, HashModified(hash, lastModified)) + }}.toMap + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientPutObjectUploader.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientPutObjectUploader.scala new file mode 100644 index 0000000..283922b --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientPutObjectUploader.scala @@ -0,0 +1,34 @@ +package net.kemitix.s3thorp.awssdk + +import cats.effect.IO +import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient +import net.kemitix.s3thorp.{Bucket, Config, LocalFile, MD5Hash, UploadS3Action} +import software.amazon.awssdk.core.async.AsyncRequestBody +import software.amazon.awssdk.services.s3.model.PutObjectRequest + +private class S3ClientPutObjectUploader(s3Client: S3CatsIOClient) + extends S3ClientUploader + with S3ClientLogging + with QuoteStripper { + + override def accepts(localFile: LocalFile)(implicit c: Config): Boolean = true + + override + def upload(localFile: LocalFile, + bucket: Bucket, + tryCount: Int) + (implicit c: Config): IO[UploadS3Action] = { + val request = PutObjectRequest.builder + .bucket(bucket.name) + .key(localFile.remoteKey.key).build + val body = AsyncRequestBody.fromFile(localFile.file) + s3Client.putObject(request, body) + .bracket( + logUploadStart(localFile, bucket))( + logUploadFinish(localFile, bucket)) + .map(_.eTag) + .map(_ filter stripQuotes) + .map(MD5Hash) + .map(UploadS3Action(localFile.remoteKey, _)) + } +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientUploader.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientUploader.scala new file mode 100644 index 0000000..c9e6964 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientUploader.scala @@ -0,0 +1,16 @@ +package net.kemitix.s3thorp.awssdk + +import cats.effect.IO +import net.kemitix.s3thorp.{Bucket, Config, LocalFile, S3Action} + +trait S3ClientUploader { + + def accepts(localFile: LocalFile) + (implicit c: Config): Boolean + + def upload(localFile: LocalFile, + bucket: Bucket, + tryCount: Int) + (implicit c: Config): IO[S3Action] + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala index dfa8a97..7df1b15 100644 --- a/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala @@ -1,89 +1,49 @@ package net.kemitix.s3thorp.awssdk import cats.effect.IO +import com.amazonaws.services.s3.transfer.TransferManagerBuilder +import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3ClientBuilder} import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient import net.kemitix.s3thorp._ -import software.amazon.awssdk.core.async.AsyncRequestBody import software.amazon.awssdk.services.s3.model.{Bucket => _, _} -import scala.collection.JavaConverters._ - private class ThorpS3Client(s3Client: S3CatsIOClient) extends S3Client - with S3ObjectsByHash - with S3ClientLogging { + with S3ClientLogging + with QuoteStripper { + + lazy val amazonS3Client = AmazonS3ClientBuilder.defaultClient + lazy val amazonS3TransferManager = TransferManagerBuilder.defaultTransferManager + lazy val objectLister = new S3ClientObjectLister(s3Client) + lazy val copier = new S3ClientCopier(s3Client) + lazy val uploader = new S3ClientPutObjectUploader(s3Client) + lazy val multiPartUploader = new S3ClientMultiPartTransferManager(amazonS3TransferManager) + lazy val deleter = new S3ClientDeleter(s3Client) override def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit c: Config): IO[S3ObjectsData] = { - val request = ListObjectsV2Request.builder - .bucket(bucket.name) - .prefix(prefix.key).build - s3Client.listObjectsV2(request) - .bracket( - logListObjectsStart(bucket, prefix))( - logListObjectsFinish(bucket,prefix)) - .map(_.contents) - .map(_.asScala) - .map(_.toStream) - .map(os => S3ObjectsData(byHash(os), byKey(os))) - } + (implicit c: Config): IO[S3ObjectsData] = + objectLister.listObjects(bucket, prefix) - private def byKey(os: Stream[S3Object]) = - os.map { o => { - val remoteKey = RemoteKey(o.key) - val hash = MD5Hash(o.eTag() filter stripQuotes) - val lastModified = LastModified(o.lastModified()) - (remoteKey, HashModified(hash, lastModified)) - }}.toMap - - override def upload(localFile: LocalFile, - bucket: Bucket) - (implicit c: Config): IO[UploadS3Action] = { - val request = PutObjectRequest.builder - .bucket(bucket.name) - .key(localFile.remoteKey.key).build - val body = AsyncRequestBody.fromFile(localFile.file) - s3Client.putObject(request, body) - .bracket( - logUploadStart(localFile, bucket))( - logUploadFinish(localFile, bucket)) - .map(_.eTag) - .map(_ filter stripQuotes) - .map(MD5Hash) - .map(UploadS3Action(localFile.remoteKey, _)) - } - - private def stripQuotes: Char => Boolean = _ != '"' override def copy(bucket: Bucket, sourceKey: RemoteKey, hash: MD5Hash, targetKey: RemoteKey) - (implicit c: Config): IO[CopyS3Action] = { - val request = CopyObjectRequest.builder - .bucket(bucket.name) - .copySource(s"${bucket.name}/${sourceKey.key}") - .copySourceIfMatch(hash.hash) - .key(targetKey.key).build - s3Client.copyObject(request) - .bracket( - logCopyStart(bucket, sourceKey, targetKey))( - logCopyFinish(bucket, sourceKey,targetKey)) - .map(_ => CopyS3Action(targetKey)) - } + (implicit c: Config): IO[CopyS3Action] = + copier.copy(bucket, sourceKey,hash, targetKey) + + + override def upload(localFile: LocalFile, + bucket: Bucket, + tryCount: Int) + (implicit c: Config): IO[S3Action] = + if (multiPartUploader.accepts(localFile)) multiPartUploader.upload(localFile, bucket, 1) + else uploader.upload(localFile, bucket, tryCount) override def delete(bucket: Bucket, remoteKey: RemoteKey) - (implicit c: Config): IO[DeleteS3Action] = { - val request = DeleteObjectRequest.builder - .bucket(bucket.name) - .key(remoteKey.key).build - s3Client.deleteObject(request) - .bracket( - logDeleteStart(bucket, remoteKey))( - logDeleteFinish(bucket, remoteKey)) - .map(_ => DeleteS3Action(remoteKey)) - } + (implicit c: Config): IO[DeleteS3Action] = + deleter.delete(bucket, remoteKey) } diff --git a/src/test/resources/net/kemitix/s3thorp/big-file b/src/test/resources/net/kemitix/s3thorp/big-file new file mode 100644 index 0000000..a8350a9 Binary files /dev/null and b/src/test/resources/net/kemitix/s3thorp/big-file differ diff --git a/src/test/resources/net/kemitix/s3thorp/small-file b/src/test/resources/net/kemitix/s3thorp/small-file new file mode 100644 index 0000000..b0abe58 Binary files /dev/null and b/src/test/resources/net/kemitix/s3thorp/small-file differ diff --git a/src/test/scala/net/kemitix/s3thorp/DummyS3Client.scala b/src/test/scala/net/kemitix/s3thorp/DummyS3Client.scala index 4bced54..768db94 100644 --- a/src/test/scala/net/kemitix/s3thorp/DummyS3Client.scala +++ b/src/test/scala/net/kemitix/s3thorp/DummyS3Client.scala @@ -6,7 +6,8 @@ import net.kemitix.s3thorp.awssdk.{S3ObjectsData, S3Client} trait DummyS3Client extends S3Client { override def upload(localFile: LocalFile, - bucket: Bucket + bucket: Bucket, + tryCount: Int )(implicit c: Config): IO[UploadS3Action] = ??? override def copy(bucket: Bucket, diff --git a/src/test/scala/net/kemitix/s3thorp/MD5HashGeneratorTest.scala b/src/test/scala/net/kemitix/s3thorp/MD5HashGeneratorTest.scala new file mode 100644 index 0000000..5636c37 --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/MD5HashGeneratorTest.scala @@ -0,0 +1,58 @@ +package net.kemitix.s3thorp + +import java.nio.file.Files + +class MD5HashGeneratorTest extends UnitTest { + + private val source = Resource(this, "upload") + private val prefix = RemoteKey("prefix") + implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + + new MD5HashGenerator { + describe("read a small file (smaller than buffer)") { + val file = Resource(this, "upload/root-file") + it("should generate the correct hash") { + val expected = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e") + val result = md5File(file) + assertResult(expected)(result) + } + } + describe("read a buffer") { + val file = Resource(this, "upload/root-file") + val buffer: Array[Byte] = Files.readAllBytes(file.toPath) + it("should generate the correct hash") { + val expected = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e") + val result = md5PartBody(buffer) + assertResult(expected)(result) + } + } + describe("read a large file (bigger than buffer)") { + val file = Resource(this, "big-file") + it("should generate the correct hash") { + val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8") + val result = md5File(file) + assertResult(expected)(result) + } + } + describe("read part of a file") { + val file = Resource(this, "big-file") + val halfFileLength = file.length / 2 + assertResult(file.length)(halfFileLength * 2) + describe("when starting at the beginning of the file") { + it("should generate the correct hash") { + val expected = MD5Hash("aadf0d266cefe0fcdb241a51798d74b3") + val result = md5FilePart(file, 0, halfFileLength) + assertResult(expected)(result) + } + } + describe("when starting in the middle of the file") { + it("should generate the correct hash") { + val expected = MD5Hash("16e08d53ca36e729d808fd5e4f7e35dc") + val result = md5FilePart(file, halfFileLength, halfFileLength) + assertResult(expected)(result) + } + } + } + } + +} diff --git a/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala b/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala index f0399d2..723d9e8 100644 --- a/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala @@ -29,14 +29,17 @@ class SyncSuite val md5Hash = MD5Hash("the-hash") val testLocalFile = aLocalFile("file", md5Hash, source, generateKey(source, prefix)) val sync = new Sync(new S3Client with DummyS3Client { - override def upload(localFile: LocalFile, bucket: Bucket)(implicit c: Config) = IO { + override def upload(localFile: LocalFile, + bucket: Bucket, + tryCount: Int) + (implicit c: Config) = IO { assert(bucket == testBucket) UploadS3Action(localFile.remoteKey, md5Hash) } }) it("delegates unmodified to the S3Client") { assertResult(UploadS3Action(RemoteKey(prefix.key + "/file"), md5Hash))( - sync.upload(testLocalFile, testBucket). + sync.upload(testLocalFile, testBucket, 1). unsafeRunSync()) } } @@ -177,7 +180,8 @@ class SyncSuite override def listObjects(bucket: Bucket, prefix: RemoteKey)(implicit c: Config) = IO {s3ObjectsData} override def upload(localFile: LocalFile, - bucket: Bucket + bucket: Bucket, + tryCount: Int )(implicit c: Config) = IO { if (bucket == testBucket) uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/MyAmazonS3Client.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/MyAmazonS3Client.scala new file mode 100644 index 0000000..061cb8b --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/MyAmazonS3Client.scala @@ -0,0 +1,375 @@ +package net.kemitix.s3thorp.awssdk + +import java.io.{File, InputStream} +import java.net.URL +import java.util +import java.util.Date + +import com.amazonaws.{AmazonWebServiceRequest, HttpMethod} +import com.amazonaws.regions.Region +import com.amazonaws.services.s3.model.analytics.AnalyticsConfiguration +import com.amazonaws.services.s3.model.inventory.InventoryConfiguration +import com.amazonaws.services.s3.{AmazonS3, S3ClientOptions, S3ResponseMetadata, model} +import com.amazonaws.services.s3.model.metrics.MetricsConfiguration +import com.amazonaws.services.s3.model._ +import com.amazonaws.services.s3.waiters.AmazonS3Waiters + +class MyAmazonS3Client extends AmazonS3 { + override def setEndpoint(endpoint: String): Unit = ??? + + override def setRegion(region: Region): Unit = ??? + + override def setS3ClientOptions(clientOptions: S3ClientOptions): Unit = ??? + + override def changeObjectStorageClass(bucketName: String, key: String, newStorageClass: StorageClass): Unit = ??? + + override def setObjectRedirectLocation(bucketName: String, key: String, newRedirectLocation: String): Unit = ??? + + override def listObjects(bucketName: String): ObjectListing = ??? + + override def listObjects(bucketName: String, prefix: String): ObjectListing = ??? + + override def listObjects(listObjectsRequest: ListObjectsRequest): ObjectListing = ??? + + override def listObjectsV2(bucketName: String): ListObjectsV2Result = ??? + + override def listObjectsV2(bucketName: String, prefix: String): ListObjectsV2Result = ??? + + override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request): ListObjectsV2Result = ??? + + override def listNextBatchOfObjects(previousObjectListing: ObjectListing): ObjectListing = ??? + + override def listNextBatchOfObjects(listNextBatchOfObjectsRequest: ListNextBatchOfObjectsRequest): ObjectListing = ??? + + override def listVersions(bucketName: String, prefix: String): VersionListing = ??? + + override def listNextBatchOfVersions(previousVersionListing: VersionListing): VersionListing = ??? + + override def listNextBatchOfVersions(listNextBatchOfVersionsRequest: ListNextBatchOfVersionsRequest): VersionListing = ??? + + override def listVersions(bucketName: String, prefix: String, keyMarker: String, versionIdMarker: String, delimiter: String, maxResults: Integer): VersionListing = ??? + + override def listVersions(listVersionsRequest: ListVersionsRequest): VersionListing = ??? + + override def getS3AccountOwner: Owner = ??? + + override def getS3AccountOwner(getS3AccountOwnerRequest: GetS3AccountOwnerRequest): Owner = ??? + + override def doesBucketExist(bucketName: String): Boolean = ??? + + override def doesBucketExistV2(bucketName: String): Boolean = ??? + + override def headBucket(headBucketRequest: HeadBucketRequest): HeadBucketResult = ??? + + override def listBuckets(): util.List[Bucket] = ??? + + override def listBuckets(listBucketsRequest: ListBucketsRequest): util.List[Bucket] = ??? + + override def getBucketLocation(bucketName: String): String = ??? + + override def getBucketLocation(getBucketLocationRequest: GetBucketLocationRequest): String = ??? + + override def createBucket(createBucketRequest: CreateBucketRequest): Bucket = ??? + + override def createBucket(bucketName: String): Bucket = ??? + + override def createBucket(bucketName: String, region: model.Region): Bucket = ??? + + override def createBucket(bucketName: String, region: String): Bucket = ??? + + override def getObjectAcl(bucketName: String, key: String): AccessControlList = ??? + + override def getObjectAcl(bucketName: String, key: String, versionId: String): AccessControlList = ??? + + override def getObjectAcl(getObjectAclRequest: GetObjectAclRequest): AccessControlList = ??? + + override def setObjectAcl(bucketName: String, key: String, acl: AccessControlList): Unit = ??? + + override def setObjectAcl(bucketName: String, key: String, acl: CannedAccessControlList): Unit = ??? + + override def setObjectAcl(bucketName: String, key: String, versionId: String, acl: AccessControlList): Unit = ??? + + override def setObjectAcl(bucketName: String, key: String, versionId: String, acl: CannedAccessControlList): Unit = ??? + + override def setObjectAcl(setObjectAclRequest: SetObjectAclRequest): Unit = ??? + + override def getBucketAcl(bucketName: String): AccessControlList = ??? + + override def setBucketAcl(setBucketAclRequest: SetBucketAclRequest): Unit = ??? + + override def getBucketAcl(getBucketAclRequest: GetBucketAclRequest): AccessControlList = ??? + + override def setBucketAcl(bucketName: String, acl: AccessControlList): Unit = ??? + + override def setBucketAcl(bucketName: String, acl: CannedAccessControlList): Unit = ??? + + override def getObjectMetadata(bucketName: String, key: String): ObjectMetadata = ??? + + override def getObjectMetadata(getObjectMetadataRequest: GetObjectMetadataRequest): ObjectMetadata = ??? + + override def getObject(bucketName: String, key: String): S3Object = ??? + + override def getObject(getObjectRequest: GetObjectRequest): S3Object = ??? + + override def getObject(getObjectRequest: GetObjectRequest, destinationFile: File): ObjectMetadata = ??? + + override def getObjectAsString(bucketName: String, key: String): String = ??? + + override def getObjectTagging(getObjectTaggingRequest: GetObjectTaggingRequest): GetObjectTaggingResult = ??? + + override def setObjectTagging(setObjectTaggingRequest: SetObjectTaggingRequest): SetObjectTaggingResult = ??? + + override def deleteObjectTagging(deleteObjectTaggingRequest: DeleteObjectTaggingRequest): DeleteObjectTaggingResult = ??? + + override def deleteBucket(deleteBucketRequest: DeleteBucketRequest): Unit = ??? + + override def deleteBucket(bucketName: String): Unit = ??? + + override def putObject(putObjectRequest: PutObjectRequest): PutObjectResult = ??? + + override def putObject(bucketName: String, key: String, file: File): PutObjectResult = ??? + + override def putObject(bucketName: String, key: String, input: InputStream, metadata: ObjectMetadata): PutObjectResult = ??? + + override def putObject(bucketName: String, key: String, content: String): PutObjectResult = ??? + + override def copyObject(sourceBucketName: String, sourceKey: String, destinationBucketName: String, destinationKey: String): CopyObjectResult = ??? + + override def copyObject(copyObjectRequest: CopyObjectRequest): CopyObjectResult = ??? + + override def copyPart(copyPartRequest: CopyPartRequest): CopyPartResult = ??? + + override def deleteObject(bucketName: String, key: String): Unit = ??? + + override def deleteObject(deleteObjectRequest: DeleteObjectRequest): Unit = ??? + + override def deleteObjects(deleteObjectsRequest: DeleteObjectsRequest): DeleteObjectsResult = ??? + + override def deleteVersion(bucketName: String, key: String, versionId: String): Unit = ??? + + override def deleteVersion(deleteVersionRequest: DeleteVersionRequest): Unit = ??? + + override def getBucketLoggingConfiguration(bucketName: String): BucketLoggingConfiguration = ??? + + override def getBucketLoggingConfiguration(getBucketLoggingConfigurationRequest: GetBucketLoggingConfigurationRequest): BucketLoggingConfiguration = ??? + + override def setBucketLoggingConfiguration(setBucketLoggingConfigurationRequest: SetBucketLoggingConfigurationRequest): Unit = ??? + + override def getBucketVersioningConfiguration(bucketName: String): BucketVersioningConfiguration = ??? + + override def getBucketVersioningConfiguration(getBucketVersioningConfigurationRequest: GetBucketVersioningConfigurationRequest): BucketVersioningConfiguration = ??? + + override def setBucketVersioningConfiguration(setBucketVersioningConfigurationRequest: SetBucketVersioningConfigurationRequest): Unit = ??? + + override def getBucketLifecycleConfiguration(bucketName: String): BucketLifecycleConfiguration = ??? + + override def getBucketLifecycleConfiguration(getBucketLifecycleConfigurationRequest: GetBucketLifecycleConfigurationRequest): BucketLifecycleConfiguration = ??? + + override def setBucketLifecycleConfiguration(bucketName: String, bucketLifecycleConfiguration: BucketLifecycleConfiguration): Unit = ??? + + override def setBucketLifecycleConfiguration(setBucketLifecycleConfigurationRequest: SetBucketLifecycleConfigurationRequest): Unit = ??? + + override def deleteBucketLifecycleConfiguration(bucketName: String): Unit = ??? + + override def deleteBucketLifecycleConfiguration(deleteBucketLifecycleConfigurationRequest: DeleteBucketLifecycleConfigurationRequest): Unit = ??? + + override def getBucketCrossOriginConfiguration(bucketName: String): BucketCrossOriginConfiguration = ??? + + override def getBucketCrossOriginConfiguration(getBucketCrossOriginConfigurationRequest: GetBucketCrossOriginConfigurationRequest): BucketCrossOriginConfiguration = ??? + + override def setBucketCrossOriginConfiguration(bucketName: String, bucketCrossOriginConfiguration: BucketCrossOriginConfiguration): Unit = ??? + + override def setBucketCrossOriginConfiguration(setBucketCrossOriginConfigurationRequest: SetBucketCrossOriginConfigurationRequest): Unit = ??? + + override def deleteBucketCrossOriginConfiguration(bucketName: String): Unit = ??? + + override def deleteBucketCrossOriginConfiguration(deleteBucketCrossOriginConfigurationRequest: DeleteBucketCrossOriginConfigurationRequest): Unit = ??? + + override def getBucketTaggingConfiguration(bucketName: String): BucketTaggingConfiguration = ??? + + override def getBucketTaggingConfiguration(getBucketTaggingConfigurationRequest: GetBucketTaggingConfigurationRequest): BucketTaggingConfiguration = ??? + + override def setBucketTaggingConfiguration(bucketName: String, bucketTaggingConfiguration: BucketTaggingConfiguration): Unit = ??? + + override def setBucketTaggingConfiguration(setBucketTaggingConfigurationRequest: SetBucketTaggingConfigurationRequest): Unit = ??? + + override def deleteBucketTaggingConfiguration(bucketName: String): Unit = ??? + + override def deleteBucketTaggingConfiguration(deleteBucketTaggingConfigurationRequest: DeleteBucketTaggingConfigurationRequest): Unit = ??? + + override def getBucketNotificationConfiguration(bucketName: String): BucketNotificationConfiguration = ??? + + override def getBucketNotificationConfiguration(getBucketNotificationConfigurationRequest: GetBucketNotificationConfigurationRequest): BucketNotificationConfiguration = ??? + + override def setBucketNotificationConfiguration(setBucketNotificationConfigurationRequest: SetBucketNotificationConfigurationRequest): Unit = ??? + + override def setBucketNotificationConfiguration(bucketName: String, bucketNotificationConfiguration: BucketNotificationConfiguration): Unit = ??? + + override def getBucketWebsiteConfiguration(bucketName: String): BucketWebsiteConfiguration = ??? + + override def getBucketWebsiteConfiguration(getBucketWebsiteConfigurationRequest: GetBucketWebsiteConfigurationRequest): BucketWebsiteConfiguration = ??? + + override def setBucketWebsiteConfiguration(bucketName: String, configuration: BucketWebsiteConfiguration): Unit = ??? + + override def setBucketWebsiteConfiguration(setBucketWebsiteConfigurationRequest: SetBucketWebsiteConfigurationRequest): Unit = ??? + + override def deleteBucketWebsiteConfiguration(bucketName: String): Unit = ??? + + override def deleteBucketWebsiteConfiguration(deleteBucketWebsiteConfigurationRequest: DeleteBucketWebsiteConfigurationRequest): Unit = ??? + + override def getBucketPolicy(bucketName: String): BucketPolicy = ??? + + override def getBucketPolicy(getBucketPolicyRequest: GetBucketPolicyRequest): BucketPolicy = ??? + + override def setBucketPolicy(bucketName: String, policyText: String): Unit = ??? + + override def setBucketPolicy(setBucketPolicyRequest: SetBucketPolicyRequest): Unit = ??? + + override def deleteBucketPolicy(bucketName: String): Unit = ??? + + override def deleteBucketPolicy(deleteBucketPolicyRequest: DeleteBucketPolicyRequest): Unit = ??? + + override def generatePresignedUrl(bucketName: String, key: String, expiration: Date): URL = ??? + + override def generatePresignedUrl(bucketName: String, key: String, expiration: Date, method: HttpMethod): URL = ??? + + override def generatePresignedUrl(generatePresignedUrlRequest: GeneratePresignedUrlRequest): URL = ??? + + override def initiateMultipartUpload(request: InitiateMultipartUploadRequest): InitiateMultipartUploadResult = ??? + + override def uploadPart(request: UploadPartRequest): UploadPartResult = ??? + + override def listParts(request: ListPartsRequest): PartListing = ??? + + override def abortMultipartUpload(request: AbortMultipartUploadRequest): Unit = ??? + + override def completeMultipartUpload(request: CompleteMultipartUploadRequest): CompleteMultipartUploadResult = ??? + + override def listMultipartUploads(request: ListMultipartUploadsRequest): MultipartUploadListing = ??? + + override def getCachedResponseMetadata(request: AmazonWebServiceRequest): S3ResponseMetadata = ??? + + override def restoreObject(request: RestoreObjectRequest): Unit = ??? + + override def restoreObjectV2(request: RestoreObjectRequest): RestoreObjectResult = ??? + + override def restoreObject(bucketName: String, key: String, expirationInDays: Int): Unit = ??? + + override def enableRequesterPays(bucketName: String): Unit = ??? + + override def disableRequesterPays(bucketName: String): Unit = ??? + + override def isRequesterPaysEnabled(bucketName: String): Boolean = ??? + + override def setBucketReplicationConfiguration(bucketName: String, configuration: BucketReplicationConfiguration): Unit = ??? + + override def setBucketReplicationConfiguration(setBucketReplicationConfigurationRequest: SetBucketReplicationConfigurationRequest): Unit = ??? + + override def getBucketReplicationConfiguration(bucketName: String): BucketReplicationConfiguration = ??? + + override def getBucketReplicationConfiguration(getBucketReplicationConfigurationRequest: GetBucketReplicationConfigurationRequest): BucketReplicationConfiguration = ??? + + override def deleteBucketReplicationConfiguration(bucketName: String): Unit = ??? + + override def deleteBucketReplicationConfiguration(request: DeleteBucketReplicationConfigurationRequest): Unit = ??? + + override def doesObjectExist(bucketName: String, objectName: String): Boolean = ??? + + override def getBucketAccelerateConfiguration(bucketName: String): BucketAccelerateConfiguration = ??? + + override def getBucketAccelerateConfiguration(getBucketAccelerateConfigurationRequest: GetBucketAccelerateConfigurationRequest): BucketAccelerateConfiguration = ??? + + override def setBucketAccelerateConfiguration(bucketName: String, accelerateConfiguration: BucketAccelerateConfiguration): Unit = ??? + + override def setBucketAccelerateConfiguration(setBucketAccelerateConfigurationRequest: SetBucketAccelerateConfigurationRequest): Unit = ??? + + override def deleteBucketMetricsConfiguration(bucketName: String, id: String): DeleteBucketMetricsConfigurationResult = ??? + + override def deleteBucketMetricsConfiguration(deleteBucketMetricsConfigurationRequest: DeleteBucketMetricsConfigurationRequest): DeleteBucketMetricsConfigurationResult = ??? + + override def getBucketMetricsConfiguration(bucketName: String, id: String): GetBucketMetricsConfigurationResult = ??? + + override def getBucketMetricsConfiguration(getBucketMetricsConfigurationRequest: GetBucketMetricsConfigurationRequest): GetBucketMetricsConfigurationResult = ??? + + override def setBucketMetricsConfiguration(bucketName: String, metricsConfiguration: MetricsConfiguration): SetBucketMetricsConfigurationResult = ??? + + override def setBucketMetricsConfiguration(setBucketMetricsConfigurationRequest: SetBucketMetricsConfigurationRequest): SetBucketMetricsConfigurationResult = ??? + + override def listBucketMetricsConfigurations(listBucketMetricsConfigurationsRequest: ListBucketMetricsConfigurationsRequest): ListBucketMetricsConfigurationsResult = ??? + + override def deleteBucketAnalyticsConfiguration(bucketName: String, id: String): DeleteBucketAnalyticsConfigurationResult = ??? + + override def deleteBucketAnalyticsConfiguration(deleteBucketAnalyticsConfigurationRequest: DeleteBucketAnalyticsConfigurationRequest): DeleteBucketAnalyticsConfigurationResult = ??? + + override def getBucketAnalyticsConfiguration(bucketName: String, id: String): GetBucketAnalyticsConfigurationResult = ??? + + override def getBucketAnalyticsConfiguration(getBucketAnalyticsConfigurationRequest: GetBucketAnalyticsConfigurationRequest): GetBucketAnalyticsConfigurationResult = ??? + + override def setBucketAnalyticsConfiguration(bucketName: String, analyticsConfiguration: AnalyticsConfiguration): SetBucketAnalyticsConfigurationResult = ??? + + override def setBucketAnalyticsConfiguration(setBucketAnalyticsConfigurationRequest: SetBucketAnalyticsConfigurationRequest): SetBucketAnalyticsConfigurationResult = ??? + + override def listBucketAnalyticsConfigurations(listBucketAnalyticsConfigurationsRequest: ListBucketAnalyticsConfigurationsRequest): ListBucketAnalyticsConfigurationsResult = ??? + + override def deleteBucketInventoryConfiguration(bucketName: String, id: String): DeleteBucketInventoryConfigurationResult = ??? + + override def deleteBucketInventoryConfiguration(deleteBucketInventoryConfigurationRequest: DeleteBucketInventoryConfigurationRequest): DeleteBucketInventoryConfigurationResult = ??? + + override def getBucketInventoryConfiguration(bucketName: String, id: String): GetBucketInventoryConfigurationResult = ??? + + override def getBucketInventoryConfiguration(getBucketInventoryConfigurationRequest: GetBucketInventoryConfigurationRequest): GetBucketInventoryConfigurationResult = ??? + + override def setBucketInventoryConfiguration(bucketName: String, inventoryConfiguration: InventoryConfiguration): SetBucketInventoryConfigurationResult = ??? + + override def setBucketInventoryConfiguration(setBucketInventoryConfigurationRequest: SetBucketInventoryConfigurationRequest): SetBucketInventoryConfigurationResult = ??? + + override def listBucketInventoryConfigurations(listBucketInventoryConfigurationsRequest: ListBucketInventoryConfigurationsRequest): ListBucketInventoryConfigurationsResult = ??? + + override def deleteBucketEncryption(bucketName: String): DeleteBucketEncryptionResult = ??? + + override def deleteBucketEncryption(request: DeleteBucketEncryptionRequest): DeleteBucketEncryptionResult = ??? + + override def getBucketEncryption(bucketName: String): GetBucketEncryptionResult = ??? + + override def getBucketEncryption(request: GetBucketEncryptionRequest): GetBucketEncryptionResult = ??? + + override def setBucketEncryption(setBucketEncryptionRequest: SetBucketEncryptionRequest): SetBucketEncryptionResult = ??? + + override def setPublicAccessBlock(request: SetPublicAccessBlockRequest): SetPublicAccessBlockResult = ??? + + override def getPublicAccessBlock(request: GetPublicAccessBlockRequest): GetPublicAccessBlockResult = ??? + + override def deletePublicAccessBlock(request: DeletePublicAccessBlockRequest): DeletePublicAccessBlockResult = ??? + + override def getBucketPolicyStatus(request: GetBucketPolicyStatusRequest): GetBucketPolicyStatusResult = ??? + + override def selectObjectContent(selectRequest: SelectObjectContentRequest): SelectObjectContentResult = ??? + + override def setObjectLegalHold(setObjectLegalHoldRequest: SetObjectLegalHoldRequest): SetObjectLegalHoldResult = ??? + + override def getObjectLegalHold(getObjectLegalHoldRequest: GetObjectLegalHoldRequest): GetObjectLegalHoldResult = ??? + + override def setObjectLockConfiguration(setObjectLockConfigurationRequest: SetObjectLockConfigurationRequest): SetObjectLockConfigurationResult = ??? + + override def getObjectLockConfiguration(getObjectLockConfigurationRequest: GetObjectLockConfigurationRequest): GetObjectLockConfigurationResult = ??? + + override def setObjectRetention(setObjectRetentionRequest: SetObjectRetentionRequest): SetObjectRetentionResult = ??? + + override def getObjectRetention(getObjectRetentionRequest: GetObjectRetentionRequest): GetObjectRetentionResult = ??? + + override def download(presignedUrlDownloadRequest: PresignedUrlDownloadRequest): PresignedUrlDownloadResult = ??? + + override def download(presignedUrlDownloadRequest: PresignedUrlDownloadRequest, destinationFile: File): Unit = ??? + + override def upload(presignedUrlUploadRequest: PresignedUrlUploadRequest): PresignedUrlUploadResult = ??? + + override def shutdown(): Unit = ??? + + override def getRegion: model.Region = ??? + + override def getRegionName: String = ??? + + override def getUrl(bucketName: String, key: String): URL = ??? + + override def waiters(): AmazonS3Waiters = ??? +} diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/MyS3CatsIOClient.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/MyS3CatsIOClient.scala new file mode 100644 index 0000000..d478fad --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/MyS3CatsIOClient.scala @@ -0,0 +1,15 @@ +package net.kemitix.s3thorp.awssdk + +import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient +import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient +import software.amazon.awssdk.services.s3 + +trait MyS3CatsIOClient extends S3CatsIOClient { + override val underlying: S3AsyncClient = new S3AsyncClient { + override val underlying: s3.S3AsyncClient = new s3.S3AsyncClient { + override def serviceName(): String = "fake-s3-client" + + override def close(): Unit = () + } + } +} diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartTransferManagerSuite.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartTransferManagerSuite.scala new file mode 100644 index 0000000..db904c8 --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartTransferManagerSuite.scala @@ -0,0 +1,112 @@ +package net.kemitix.s3thorp.awssdk + +import java.io.File +import java.time.Instant + +import com.amazonaws.AmazonClientException +import com.amazonaws.services.s3.model +import com.amazonaws.services.s3.transfer.model.UploadResult +import com.amazonaws.services.s3.transfer.{PauseResult, PersistableUpload, Transfer, TransferManager, TransferProgress, Upload} +import net.kemitix.s3thorp.{Bucket, Config, KeyGenerator, LastModified, MD5Hash, MD5HashGenerator, RemoteKey, Resource, UnitTest, UploadS3Action} + +class S3ClientMultiPartTransferManagerSuite + extends UnitTest + with KeyGenerator { + + private val source = Resource(this, "..") + private val prefix = RemoteKey("prefix") + implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + private val fileToKey = generateKey(config.source, config.prefix) _ + val lastModified = LastModified(Instant.now()) + + describe("S3ClientMultiPartTransferManagerSuite") { + describe("accepts") { + val transferManager = new MyTransferManager(("", "", new File("")), RemoteKey(""), MD5Hash("")) + val uploader = new S3ClientMultiPartTransferManager(transferManager) + describe("small-file") { + val smallFile = aLocalFile("small-file", MD5Hash("the-hash"), source, fileToKey) + it("should be a small-file") { + assert(smallFile.file.length < 5 * 1024 * 1024) + } + it("should not accept small-file") { + assertResult(false)(uploader.accepts(smallFile)) + } + } + describe("big-file") { + val bigFile = aLocalFile("big-file", MD5Hash("the-hash"), source, fileToKey) + it("should be a big-file") { + assert(bigFile.file.length > 5 * 1024 * 1024) + } + it("should accept big-file") { + assertResult(true)(uploader.accepts(bigFile)) + } + } + } + describe("upload") { + val returnedKey = RemoteKey("returned-key") + val returnedHash = MD5Hash("returned-hash") + val bigFile = aLocalFile("small-file", MD5Hash("the-hash"), source, fileToKey) + val transferManager = new MyTransferManager( + (config.bucket.name, bigFile.remoteKey.key, bigFile.file), + returnedKey, returnedHash) + val uploader = new S3ClientMultiPartTransferManager(transferManager) + it("should upload") { + val expected = UploadS3Action(returnedKey, returnedHash) + val result = uploader.upload(bigFile, config.bucket, 1).unsafeRunSync + assertResult(expected)(result) + } + } + } + + class MyTransferManager(signature: (String, String, File), + returnedKey: RemoteKey, + returnedHash: MD5Hash) extends TransferManager { + override def upload(bucketName: String, key: String, file: File): Upload = { + if ((bucketName, key, file) == signature) { + new MyUpload { + override def waitForUploadResult(): UploadResult = { + val result = new UploadResult() + result.setBucketName(bucketName) + result.setETag(returnedHash.hash) + result.setKey(returnedKey.key) + result.setVersionId("version-id") + result + } + } + } else new MyUpload + } + } + class MyUpload extends Upload { + + override def waitForUploadResult(): UploadResult = ??? + + override def pause(): PersistableUpload = ??? + + override def tryPause(forceCancelTransfers: Boolean): PauseResult[PersistableUpload] = ??? + + override def abort(): Unit = ??? + + override def isDone: Boolean = ??? + + override def waitForCompletion(): Unit = ??? + + override def waitForException(): AmazonClientException = ??? + + override def getDescription: String = ??? + + override def getState: Transfer.TransferState = ??? + + override def getProgress: TransferProgress = ??? + + import com.amazonaws.event.ProgressListener + + override def addProgressListener(listener: ProgressListener): Unit = ??? + + override def removeProgressListener(listener: ProgressListener): Unit = ??? + + override def addProgressListener(listener: model.ProgressListener): Unit = ??? + + override def removeProgressListener(listener: model.ProgressListener): Unit = ??? + } + +} diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartUploaderSuite.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartUploaderSuite.scala new file mode 100644 index 0000000..2f30b91 --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientMultiPartUploaderSuite.scala @@ -0,0 +1,279 @@ +package net.kemitix.s3thorp.awssdk + +import scala.collection.JavaConverters._ +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference} + +import com.amazonaws.services.s3.model.{Bucket => _, _} +import net.kemitix.s3thorp._ + +class S3ClientMultiPartUploaderSuite + extends UnitTest + with KeyGenerator { + + private val source = Resource(this, "..") + private val prefix = RemoteKey("prefix") + implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + private val fileToKey = generateKey(config.source, config.prefix) _ + + describe("multi-part uploader accepts") { + val uploader = new S3ClientMultiPartUploader(new MyAmazonS3Client {}) + + it("should reject small file") { + // small-file: dd if=/dev/urandom of=src/test/resources/net/kemitix/s3thorp/small-file bs=1047552 count=5 + // 1047552 = 1024 * 1023 + // file size 5kb under 5Mb threshold + val smallFile = aLocalFile("small-file", MD5Hash(""), source, fileToKey) + assert(smallFile.file.exists, "sample small file is missing") + assert(smallFile.file.length == 5 * 1024 * 1023, "sample small file is wrong size") + val result = uploader.accepts(smallFile) + assertResult(false)(result) + } + it("should accept big file") { + // big-file: dd if=/dev/urandom of=src/test/resources/net/kemitix/s3thorp/big-file bs=1049600 count=5 + // 1049600 = 1024 * 1025 + // file size 5kb over 5Mb threshold + val bigFile = aLocalFile("big-file", MD5Hash(""), source, fileToKey) + assert(bigFile.file.exists, "sample big file is missing") + assert(bigFile.file.length == 5 * 1024 * 1025, "sample big file is wrong size") + val result = uploader.accepts(bigFile) + assertResult(true)(result) + } + } + + def uploadPartRequest(partNumber: Int) = { + val request = new UploadPartRequest + request.setPartNumber(partNumber) + request + } + + def uploadPartResult(eTag: String) = { + val result = new UploadPartResult + result.setETag(eTag) + result + } + + describe("mulit-part uploader upload") { + val theFile = aLocalFile("big-file", MD5Hash(""), source, fileToKey) + val uploadId = "upload-id" + val createUploadResponse = new InitiateMultipartUploadResult() + createUploadResponse.setBucketName(config.bucket.name) + createUploadResponse.setKey(theFile.remoteKey.key) + createUploadResponse.setUploadId(uploadId) + val uploadPartRequest1 = uploadPartRequest(1) + val uploadPartRequest2 = uploadPartRequest(2) + val uploadPartRequest3 = uploadPartRequest(3) + val part1md5 = "aadf0d266cefe0fcdb241a51798d74b3" + val part2md5 = "16e08d53ca36e729d808fd5e4f7e35dc" + val uploadPartResponse1 = uploadPartResult(part1md5) + val uploadPartResponse2 = uploadPartResult(part2md5) + val uploadPartResponse3 = uploadPartResult("part-3") + val completeUploadResponse = new CompleteMultipartUploadResult() + completeUploadResponse.setETag("hash") + describe("multi-part uploader upload components") { + val uploader = new RecordingMultiPartUploader() + describe("create upload request") { + val request = uploader.createUploadRequest(config.bucket, theFile) + it("should have bucket") { + assertResult(config.bucket.name)(request.getBucketName) + } + it("should have key") { + assertResult(theFile.remoteKey.key)(request.getKey) + } + } + describe("initiate upload") { + it("should createMultipartUpload") { + val expected = createUploadResponse + val result = uploader.createUpload(config.bucket, theFile).unsafeRunSync + assertResult(expected)(result) + } + } + describe("create UploadPartRequests for file") { + val chunkSize = 5l * 1024 * 1025 / 2 + // to create expected md5 values for each chunk: + // split -d -b $((5 * 1024 * 1025 / 2)) big-file + // creates x00 and x01 + // md5sum x0[01] + val result = uploader.parts(theFile, createUploadResponse).unsafeRunSync.toList + it("should create two parts") { + assertResult(2)(result.size) + } + it("create part 1") { + val part1 = result(0) + assertResult((1, chunkSize, part1md5))((part1.getPartNumber, part1.getPartSize, part1.getMd5Digest)) + } + it("create part 2") { + val part2 = result(1) + assertResult((2, chunkSize, part2md5))((part2.getPartNumber, part2.getPartSize, part2.getMd5Digest)) + } + } + describe("upload part") { + it("should uploadPart") { + val expected = uploadPartResponse3 + val result = uploader.uploadPart(theFile)(config)(uploadPartRequest3).unsafeRunSync + assertResult(expected)(result) + } + } + describe("upload parts") { + val uploadPartRequests = Stream(uploadPartRequest1, uploadPartRequest2) + it("should uploadPart for each") { + val expected = List(uploadPartResponse1, uploadPartResponse2) + val result = uploader.uploadParts(theFile, uploadPartRequests).unsafeRunSync.toList + assertResult(expected)(result) + } + } + describe("create complete request") { + val request = uploader.createCompleteRequest(createUploadResponse, List(uploadPartResponse1, uploadPartResponse2)) + it("should have the bucket name") { + assertResult(config.bucket.name)(request.getBucketName) + } + it("should have the key") { + assertResult(theFile.remoteKey.key)(request.getKey) + } + it("should have the upload id") { + assertResult(uploadId)(request.getUploadId) + } + it("should have the etags") { + val expected = List(new PartETag(1, part1md5), new PartETag(2, part2md5)) + assertResult(expected.map(_.getETag))(request.getPartETags.asScala.map(_.getETag)) + } + } + describe("complete upload") { + val uploadPartResponses = Stream(uploadPartResponse1, uploadPartResponse2, uploadPartResponse3) + it("should completeUpload") { + val expected = completeUploadResponse + val result = uploader.completeUpload(createUploadResponse, uploadPartResponses, theFile).unsafeRunSync + assertResult(expected)(result) + } + } + describe("create abort request") { + val abortRequest = uploader.createAbortRequest(uploadId, theFile) + it("should have the upload id") { + assertResult(uploadId)(abortRequest.getUploadId) + } + it("should have the bucket") { + assertResult(config.bucket.name)(abortRequest.getBucketName) + } + it("should have the key") { + assertResult(theFile.remoteKey.key)(abortRequest.getKey) + } + } + describe("abort upload") { + it("should abortUpload") { + pending + } + } + } + describe("multi-part uploader upload complete") { + describe("upload") { + describe("when all okay") { + val uploader = new RecordingMultiPartUploader() + uploader.upload(theFile, config.bucket, 1).unsafeRunSync + it("should initiate the upload") { + assert(uploader.initiated.get) + } + it("should upload both parts") { + assertResult(Set(1, 2))(uploader.partsUploaded.get) + } + it("should complete the upload") { + assert(uploader.completed.get) + } + } + describe("when initiate upload fails") { + val uploader = new RecordingMultiPartUploader(initOkay = false) + uploader.upload(theFile, config.bucket, 1).unsafeRunSync + it("should not upload any parts") { + assertResult(Set())(uploader.partsUploaded.get) + } + it("should not complete the upload") { + assertResult(false)(uploader.completed.get) + } + } + describe("when uploading a part fails once") { + val uploader = new RecordingMultiPartUploader(partTriesRequired = 2) + uploader.upload(theFile, config.bucket, 1).unsafeRunSync + it("should initiate the upload") { + assert(uploader.initiated.get) + } + it("should upload all parts") { + assertResult(Set(1, 2))(uploader.partsUploaded.get) + } + it("should complete the upload") { + assert(uploader.completed.get) + } + } + describe("when uploading a part fails too many times") { + val uploader = new RecordingMultiPartUploader(partTriesRequired = 4) + uploader.upload(theFile, config.bucket, 1).unsafeRunSync + it("should initiate the upload") { + assert(uploader.initiated.get) + } + it("should not complete the upload") { + assertResult(Set())(uploader.partsUploaded.get) + } + it("should cancel the upload") { + assert(uploader.canceled.get) + } + } + } + } + class RecordingMultiPartUploader(initOkay: Boolean = true, + partTriesRequired: Int = 1, + val initiated: AtomicBoolean = new AtomicBoolean(false), + val partsUploaded: AtomicReference[Set[Int]] = new AtomicReference[Set[Int]](Set()), + val part0Tries: AtomicInteger = new AtomicInteger(0), + val part1Tries: AtomicInteger = new AtomicInteger(0), + val part2Tries: AtomicInteger = new AtomicInteger(0), + val completed: AtomicBoolean = new AtomicBoolean(false), + val canceled: AtomicBoolean = new AtomicBoolean(false)) + extends S3ClientMultiPartUploader( + new MyAmazonS3Client { + + def error[A]: A = { + val exception = new AmazonS3Exception("error") + exception.setAdditionalDetails(Map("Content-MD5" -> "(hash)").asJava) + throw exception + } + + override def initiateMultipartUpload(createMultipartUploadRequest: InitiateMultipartUploadRequest): InitiateMultipartUploadResult = + if (initOkay) { + initiated set true + createUploadResponse + } + else error + + override def uploadPart(uploadPartRequest: UploadPartRequest): UploadPartResult = + uploadPartRequest match { + case _ if uploadPartRequest.getPartNumber == 1 => { + if (part0Tries.incrementAndGet >= partTriesRequired) { + partsUploaded getAndUpdate (t => t + 1) + uploadPartResponse1 + } + else error + } + case _ if uploadPartRequest.getPartNumber == 2 => { + if (part1Tries.incrementAndGet >= partTriesRequired) { + partsUploaded getAndUpdate (t => t + 2) + uploadPartResponse2 + } + else error + } + case _ if uploadPartRequest.getPartNumber == 3 => { + if (part2Tries.incrementAndGet >= partTriesRequired) { + partsUploaded getAndUpdate (t => t + 3) + uploadPartResponse3 + } + else error + } + } + + override def completeMultipartUpload(completeMultipartUploadRequest: CompleteMultipartUploadRequest): CompleteMultipartUploadResult = { + completed set true + completeUploadResponse + } + + override def abortMultipartUpload(abortMultipartUploadRequest: AbortMultipartUploadRequest): Unit = { + canceled set true + } + }) {} + } +} \ No newline at end of file diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientSuite.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientSuite.scala index de38611..5dc87d0 100644 --- a/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientSuite.scala @@ -76,7 +76,7 @@ class S3ClientSuite describe("upload") { def invoke(s3Client: ThorpS3Client, localFile: LocalFile, bucket: Bucket) = - s3Client.upload(localFile, bucket).unsafeRunSync + s3Client.upload(localFile, bucket, 1).unsafeRunSync describe("when uploading a file") { val md5Hash = MD5Hash("the-md5hash") val s3Client = new ThorpS3Client( diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala index c2b3016..dd7db30 100644 --- a/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala @@ -57,13 +57,5 @@ class ThorpS3ClientSuite extends FunSpec { assertResult(expected)(result) } } - trait MyS3CatsIOClient extends S3CatsIOClient { - override val underlying: S3AsyncClient = new S3AsyncClient { - override val underlying: s3.S3AsyncClient = new s3.S3AsyncClient { - override def serviceName(): String = "fake-s3-client" - override def close(): Unit = () - } - } - } }