Use multi-part upload for large files (i.e. files > 5Mb) (#22)

* [ThorpS3Client] Extract QuoteStripper and S3ClientObjectLister

* [ThorpS3Client] Extract S3ClientUploader

* [ThorpS3Client] Extract S3ClientCopier

* [ThorpS3Client] Extract S3ClientDeleter

* [ThorpS3Client] Can select upload strategy based on file size

Currently switches to an alternate that is a clone of the original
method.

* [MD5HashGenerator] Add md5FilePart

Reimplement md5File using md5FilePart

* [MyS3CatsIOClient] extracted

* [S3ClientMultiPartUploader] add tests for accept def

* [S3ClientMultiPartUploader] initiate multi-part upload

* [Md5HashGenerator] add tests reading part of a file = failing test

* [Md5HashGenerator] fix when reading part of a file

* [S3ClientMultiPartUploader] create UploadPartRequests

* [S3ClientMultiPartUploader] uploadPart delegates to an S3Client

* [S3ClientMultiPartUploader] uploadParts uploads each part

* [S3ClientMultiPartUploader] complete upload should completeUpload

* [S3ClientMultiPartUploader] upload file tests when all okay

* [S3ClientMultiPartUploader] Use Recording client in component tests

* [S3ClientMultiPartUploader] remove unused variable

* [S3ClientMultiPartUploader] failing test for init upload error

* [S3ClientMultiPartUploader] Handle errors during multi-part upload

* [S3ClientMultiPartUploader] Retry uploads

* [S3Action] ErroredS3Action now holds the error

* [S3ClientMultiPartUploader] Add logging

* [S3ClientMultiPartUploader] Display warning messages

* [S3ClientMultiPartUploader] test creation of CreateMultipartUploadRequest

* [S3ClientMultiPartUploader] specify bucket in UploadPartRequest

* [S3ClientMultiPartUploader] verify complete request has upload id

* [S3ClientMultiPartUploader] verify abort request contains upload id

* [S3ClientMultiPartUploader] add logging around retry errors

* [S3ClientMultiPartUploader] verify upload part request had remote key

* [S3ClientMultiPartUploaderLogging] refactoring/rewriting strings

* [S3ClientMultiPartUploader] add bucket to abort request

* [S3ClientMultiPartUploader] part numbers must start at 1

* [S3ClientMultiPartUploader] fix capitalisation in comment

* [Config] define maxRetries

* [S3ClientMultiPartUploader] abort request should have the remote key

* [S3ClientMultiPartUploader] display remote key properly

* [S3ClientMultiPartUploader] rename method for plural parts

* [S3ClientMultiPartUploader] log hash and part number

* [MD5HashGenerator] support creating hash from a byte array

* [sbt] add aws-java-sdk-s3 (v1) for multi-part uploads

The reactive-aws-s3-* library is based on the V2 of the Java library,
which doesn't support multi-part uploads.

* [S3ClientMultiPartUploader] use Amazon S3 Client (from v1 sdk)

* [S3ClientMultiPartUploader] include file and offset in upload part request

* [S3ClientMultiPartUploader] Add part etags to complete request

* [S3ClientMultiPartUploader] Use withers to create requests

* [S3ClientMultiPartUploader] don't bounce responses to tags when client accepts them as is

* [MD5HashGenerator] use MD5Hash

* [S3ClientMultiPartUploader] include hash in sending log message

* [S3ClientMultiPartUploader] tests throw correct exception

* [S3ClientMultiPartUploader] Include returned hash in error and log when send is finished

* [S3ClientUploader] Extract as trait, renaming implementations

* [S3Client] upload def now requires tryCount

* [S3ClientUploader] add accepts to trait

* [S3ClientMultiPartUploaderSuite] remove ambiguity over class import

* [S3ClientMultiPartTransferManager] implement and use
This commit is contained in:
Paul Campbell 2019-05-27 20:37:59 +01:00 committed by GitHub
parent 2ff5d68b4f
commit 082babb94d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 1318 additions and 93 deletions

View file

@ -8,9 +8,13 @@ scalaVersion := "2.12.8"
libraryDependencies += "com.github.scopt" %% "scopt" % "4.0.0-RC2" libraryDependencies += "com.github.scopt" %% "scopt" % "4.0.0-RC2"
// AWS SDK // AWS SDK
/// wraps the in-preview Java SDK V2 which is incomplete and doesn't support multi-part uploads
libraryDependencies += "com.github.j5ik2o" %% "reactive-aws-s3-core" % "1.1.3" libraryDependencies += "com.github.j5ik2o" %% "reactive-aws-s3-core" % "1.1.3"
libraryDependencies += "com.github.j5ik2o" %% "reactive-aws-s3-cats" % "1.1.3" libraryDependencies += "com.github.j5ik2o" %% "reactive-aws-s3-cats" % "1.1.3"
// AWS SDK - multi-part upload
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.560"
// Logging // Logging
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2" libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2"
libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.26" libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.26"

View file

@ -13,7 +13,7 @@ trait ActionSubmitter
action match { action match {
case ToUpload(file) => case ToUpload(file) =>
log4(s" Upload: ${file.relative}") log4(s" Upload: ${file.relative}")
upload(file, c.bucket) upload(file, c.bucket, 1)
case ToCopy(sourceKey, hash, targetKey) => case ToCopy(sourceKey, hash, targetKey) =>
log4(s" Copy: $sourceKey => $targetKey") log4(s" Copy: $sourceKey => $targetKey")
copy(c.bucket, sourceKey, hash, targetKey) copy(c.bucket, sourceKey, hash, targetKey)

View file

@ -6,7 +6,10 @@ case class Config(bucket: Bucket = Bucket(""),
prefix: RemoteKey = RemoteKey(""), prefix: RemoteKey = RemoteKey(""),
verbose: Int = 1, verbose: Int = 1,
filters: Seq[Filter] = List(), filters: Seq[Filter] = List(),
multiPartThreshold: Long = 1024 * 1024 * 5,
maxRetries: Int = 3,
source: File source: File
) { ) {
require(source.isDirectory, s"Source must be a directory: $source") require(source.isDirectory, s"Source must be a directory: $source")
require(multiPartThreshold >= 1024 * 1024 * 5, s"Threshold for multi-part upload is 5Mb: '$multiPartThreshold'")
} }

View file

@ -6,15 +6,30 @@ import java.security.{DigestInputStream, MessageDigest}
trait MD5HashGenerator trait MD5HashGenerator
extends Logging { extends Logging {
def md5File(file: File)(implicit c: Config): MD5Hash = { def md5File(file: File)
log5(s"md5file:reading:${file.length}:$file") (implicit c: Config): MD5Hash = {
val buffer = new Array[Byte](8192) val hash = md5FilePart(file, 0, file.length)
val md5 = MessageDigest.getInstance("MD5") hash
val dis = new DigestInputStream(new FileInputStream(file), md5) }
try { while (dis.read(buffer) != -1) { } } finally { dis.close() }
val hash = md5.digest.map("%02x".format(_)).mkString def md5FilePart(file: File,
log5(s"md5file:generated:$hash:$file") offset: Long,
MD5Hash(hash) size: Long)
(implicit c: Config): MD5Hash = {
log5(s"md5:reading:offset $offset:size $size:$file")
val fis = new FileInputStream(file)
fis skip offset
val buffer = new Array[Byte](size.toInt)
fis read buffer
val hash = md5PartBody(buffer)
log5(s"md5:generated:${hash.hash}")
hash
}
def md5PartBody(partBody: Array[Byte]): MD5Hash = {
val md5 = MessageDigest getInstance "MD5"
md5 update partBody
MD5Hash((md5.digest map ("%02x" format _)).mkString)
} }
} }

View file

@ -24,6 +24,10 @@ case class DeleteS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 3 override val order: Int = 3
} }
case class ErroredS3Action(remoteKey: RemoteKey, e: Throwable) extends S3Action {
override val order: Int = 10
}
object S3Action { object S3Action {
implicit def ord[A <: S3Action]: Ordering[A] = Ordering.by(_.order) implicit def ord[A <: S3Action]: Ordering[A] = Ordering.by(_.order)
} }

View file

@ -38,8 +38,10 @@ class Sync(s3Client: S3Client)
ioActions.flatMap { actions => IO { actions.sorted } } ioActions.flatMap { actions => IO { actions.sorted } }
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket)(implicit c: Config): IO[UploadS3Action] = bucket: Bucket,
s3Client.upload(localFile, bucket) tryCount: Int)
(implicit c: Config): IO[S3Action] =
s3Client.upload(localFile, bucket, tryCount)
override def copy(bucket: Bucket, override def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,

View file

@ -0,0 +1,4 @@
package net.kemitix.s3thorp.awssdk
case class CancellableMultiPartUpload(e: Throwable,
uploadId: String) extends Exception(e)

View file

@ -0,0 +1,7 @@
package net.kemitix.s3thorp.awssdk
trait QuoteStripper {
def stripQuotes: Char => Boolean = _ != '"'
}

View file

@ -19,8 +19,9 @@ trait S3Client {
)(implicit c: Config): IO[S3ObjectsData] )(implicit c: Config): IO[S3ObjectsData]
def upload(localFile: LocalFile, def upload(localFile: LocalFile,
bucket: Bucket bucket: Bucket,
)(implicit c: Config): IO[UploadS3Action] tryCount: Int
)(implicit c: Config): IO[S3Action]
def copy(bucket: Bucket, def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,

View file

@ -0,0 +1,28 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.{Bucket, Config, CopyS3Action, MD5Hash, RemoteKey}
import software.amazon.awssdk.services.s3.model.CopyObjectRequest
private class S3ClientCopier(s3Client: S3CatsIOClient)
extends S3ClientLogging {
def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey)
(implicit c: Config): IO[CopyS3Action] = {
val request = CopyObjectRequest.builder
.bucket(bucket.name)
.copySource(s"${bucket.name}/${sourceKey.key}")
.copySourceIfMatch(hash.hash)
.key(targetKey.key).build
s3Client.copyObject(request)
.bracket(
logCopyStart(bucket, sourceKey, targetKey))(
logCopyFinish(bucket, sourceKey,targetKey))
.map(_ => CopyS3Action(targetKey))
}
}

View file

@ -0,0 +1,24 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.{Bucket, Config, DeleteS3Action, RemoteKey}
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest
private class S3ClientDeleter(s3Client: S3CatsIOClient)
extends S3ClientLogging {
def delete(bucket: Bucket,
remoteKey: RemoteKey)
(implicit c: Config): IO[DeleteS3Action] = {
val request = DeleteObjectRequest.builder
.bucket(bucket.name)
.key(remoteKey.key).build
s3Client.deleteObject(request)
.bracket(
logDeleteStart(bucket, remoteKey))(
logDeleteFinish(bucket, remoteKey))
.map(_ => DeleteS3Action(remoteKey))
}
}

View file

@ -0,0 +1,27 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import com.amazonaws.services.s3.transfer.TransferManager
import net.kemitix.s3thorp._
class S3ClientMultiPartTransferManager(transferManager: TransferManager)
extends S3ClientUploader
with S3ClientMultiPartUploaderLogging {
def accepts(localFile: LocalFile)
(implicit c: Config): Boolean =
localFile.file.length >= c.multiPartThreshold
override
def upload(localFile: LocalFile,
bucket: Bucket,
tryCount: Int)
(implicit c: Config): IO[S3Action] = {
IO {
logMultiPartUploadStart(localFile, tryCount)
val result = transferManager.upload(bucket.name, localFile.remoteKey.key, localFile.file)
.waitForUploadResult()
logMultiPartUploadFinished(localFile)
UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag))
}
}
}

View file

@ -0,0 +1,145 @@
package net.kemitix.s3thorp.awssdk
import scala.collection.JavaConverters._
import cats.effect.IO
import cats.implicits._
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{AbortMultipartUploadRequest, AmazonS3Exception, CompleteMultipartUploadRequest, CompleteMultipartUploadResult, InitiateMultipartUploadRequest, InitiateMultipartUploadResult, PartETag, UploadPartRequest, UploadPartResult}
import net.kemitix.s3thorp._
import scala.util.control.NonFatal
private class S3ClientMultiPartUploader(s3Client: AmazonS3)
extends S3ClientUploader
with S3ClientMultiPartUploaderLogging
with MD5HashGenerator
with QuoteStripper {
def accepts(localFile: LocalFile)
(implicit c: Config): Boolean =
localFile.file.length >= c.multiPartThreshold
def createUpload(bucket: Bucket, localFile: LocalFile)
(implicit c: Config): IO[InitiateMultipartUploadResult] = {
logMultiPartUploadInitiate(localFile)
IO(s3Client initiateMultipartUpload createUploadRequest(bucket, localFile))
}
def createUploadRequest(bucket: Bucket, localFile: LocalFile) =
new InitiateMultipartUploadRequest(
bucket.name,
localFile.remoteKey.key)
def parts(localFile: LocalFile,
response: InitiateMultipartUploadResult)
(implicit c: Config): IO[Stream[UploadPartRequest]] = {
val fileSize = localFile.file.length
val maxParts = 1024 // arbitrary, supports upto 10,000 (I, think)
val threshold = c.multiPartThreshold
val nParts = Math.min((fileSize / threshold) + 1, maxParts).toInt
val partSize = fileSize / nParts
val maxUpload = nParts * partSize
IO {
require(fileSize <= maxUpload,
s"File (${localFile.file.getPath}) size ($fileSize) exceeds upload limit: $maxUpload")
logMultiPartUploadPartsDetails(localFile, nParts, partSize)
for {
partNumber <- (1 to nParts).toStream
offSet = (partNumber - 1) * partSize
chunkSize = Math.min(fileSize - offSet, partSize)
partHash = md5FilePart(localFile.file, offSet, chunkSize)
_ = logMultiPartUploadPartDetails(localFile, partNumber, partHash)
uploadPartRequest = createUploadPartRequest(localFile, response, partNumber, chunkSize, partHash)
} yield uploadPartRequest
}
}
private def createUploadPartRequest(localFile: LocalFile,
response: InitiateMultipartUploadResult,
partNumber: Int,
chunkSize: Long,
partHash: MD5Hash)
(implicit c: Config) = {
new UploadPartRequest()
.withBucketName(c.bucket.name)
.withKey(localFile.remoteKey.key)
.withUploadId(response.getUploadId)
.withPartNumber(partNumber)
.withPartSize(chunkSize)
.withMD5Digest(partHash.hash)
.withFile(localFile.file)
.withFileOffset((partNumber - 1) * chunkSize)
}
def uploadPart(localFile: LocalFile)
(implicit c: Config): UploadPartRequest => IO[UploadPartResult] =
partRequest => {
logMultiPartUploadPart(localFile, partRequest)
IO(s3Client.uploadPart(partRequest))
.handleErrorWith{
case error: AmazonS3Exception => {
logMultiPartUploadPartError(localFile, partRequest, error)
IO.raiseError(CancellableMultiPartUpload(error, partRequest.getUploadId))
}}
}
def uploadParts(localFile: LocalFile,
parts: Stream[UploadPartRequest])
(implicit c: Config): IO[Stream[UploadPartResult]] =
(parts map uploadPart(localFile)).sequence
def completeUpload(createUploadResponse: InitiateMultipartUploadResult,
uploadPartResponses: Stream[UploadPartResult],
localFile: LocalFile)
(implicit c: Config): IO[CompleteMultipartUploadResult] = {
logMultiPartUploadCompleted(createUploadResponse, uploadPartResponses, localFile)
IO(s3Client completeMultipartUpload createCompleteRequest(createUploadResponse, uploadPartResponses.toList))
}
def createCompleteRequest(createUploadResponse: InitiateMultipartUploadResult,
uploadPartResult: List[UploadPartResult]) = {
new CompleteMultipartUploadRequest()
.withBucketName(createUploadResponse.getBucketName)
.withKey(createUploadResponse.getKey)
.withUploadId(createUploadResponse.getUploadId)
.withPartETags(uploadPartResult.asJava)
}
def cancel(uploadId: String, localFile: LocalFile)
(implicit c: Config): IO[Unit] = {
logMultiPartUploadCancelling(localFile)
IO(s3Client abortMultipartUpload createAbortRequest(uploadId, localFile))
}
def createAbortRequest(uploadId: String,
localFile: LocalFile)
(implicit c: Config): AbortMultipartUploadRequest =
new AbortMultipartUploadRequest(c.bucket.name, localFile.remoteKey.key, uploadId)
override def upload(localFile: LocalFile,
bucket: Bucket,
tryCount: Int)
(implicit c: Config): IO[S3Action] = {
logMultiPartUploadStart(localFile, tryCount)
(for {
createUploadResponse <- createUpload(bucket, localFile)
parts <- parts(localFile, createUploadResponse)
uploadPartResponses <- uploadParts(localFile, parts)
completedUploadResponse <- completeUpload(createUploadResponse, uploadPartResponses, localFile)
} yield completedUploadResponse)
.map(_.getETag)
.map(_ filter stripQuotes)
.map(MD5Hash)
.map(UploadS3Action(localFile.remoteKey, _))
.handleErrorWith {
case CancellableMultiPartUpload(e, uploadId) =>
if (tryCount >= c.maxRetries) IO(logErrorCancelling(e, localFile)) *> cancel(uploadId, localFile) *> IO.pure(ErroredS3Action(localFile.remoteKey, e))
else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, tryCount + 1)
case NonFatal(e) =>
if (tryCount >= c.maxRetries) IO(logErrorUnknown(e, localFile)) *> IO.pure(ErroredS3Action(localFile.remoteKey, e))
else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, tryCount + 1)
}
}
}

View file

@ -0,0 +1,77 @@
package net.kemitix.s3thorp.awssdk
import com.amazonaws.services.s3.model.{AmazonS3Exception, InitiateMultipartUploadResult, UploadPartRequest, UploadPartResult}
import net.kemitix.s3thorp.{Config, LocalFile, MD5Hash}
trait S3ClientMultiPartUploaderLogging
extends S3ClientLogging {
private val prefix = "multi-part upload"
def logMultiPartUploadStart(localFile: LocalFile,
tryCount: Int)
(implicit c: Config): Unit =
log4(s"$prefix:upload:try $tryCount: ${localFile.remoteKey.key}")
def logMultiPartUploadFinished(localFile: LocalFile)
(implicit c: Config): Unit =
log4(s"$prefix:upload:finished: ${localFile.remoteKey.key}")
def logMultiPartUploadInitiate(localFile: LocalFile)
(implicit c: Config): Unit =
log5(s"$prefix:initiating: ${localFile.remoteKey.key}")
def logMultiPartUploadPartsDetails(localFile: LocalFile,
nParts: Int,
partSize: Long)
(implicit c: Config): Unit =
log5(s"$prefix:parts $nParts:each $partSize: ${localFile.remoteKey.key}")
def logMultiPartUploadPartDetails(localFile: LocalFile,
partNumber: Int,
partHash: MD5Hash)
(implicit c: Config): Unit =
log5(s"$prefix:part $partNumber:hash ${partHash.hash}: ${localFile.remoteKey.key}")
def logMultiPartUploadPart(localFile: LocalFile,
partRequest: UploadPartRequest)
(implicit c: Config): Unit =
log5(s"$prefix:sending:part ${partRequest.getPartNumber}: ${partRequest.getMd5Digest}: ${localFile.remoteKey.key}")
def logMultiPartUploadPartDone(localFile: LocalFile,
partRequest: UploadPartRequest,
result: UploadPartResult)
(implicit c: Config): Unit =
log5(s"$prefix:sent:part ${partRequest.getPartNumber}: ${result.getPartETag}: ${localFile.remoteKey.key}")
def logMultiPartUploadPartError(localFile: LocalFile,
partRequest: UploadPartRequest,
error: AmazonS3Exception)
(implicit c: Config): Unit = {
val returnedMD5Hash = error.getAdditionalDetails.get("Content-MD5")
warn(s"$prefix:error:part ${partRequest.getPartNumber}:ret-hash $returnedMD5Hash: ${localFile.remoteKey.key}")
}
def logMultiPartUploadCompleted(createUploadResponse: InitiateMultipartUploadResult,
uploadPartResponses: Stream[UploadPartResult],
localFile: LocalFile)
(implicit c: Config): Unit =
log4(s"$prefix:completed:parts ${uploadPartResponses.size}: ${localFile.remoteKey.key}")
def logMultiPartUploadCancelling(localFile: LocalFile)
(implicit c: Config): Unit =
warn(s"$prefix:cancelling: ${localFile.remoteKey.key}")
def logErrorRetrying(e: Throwable, localFile: LocalFile, tryCount: Int)
(implicit c: Config): Unit =
warn(s"$prefix:retry:error ${e.getMessage}: ${localFile.remoteKey.key}")
def logErrorCancelling(e: Throwable, localFile: LocalFile)
(implicit c: Config) : Unit =
error(s"$prefix:cancelling:error ${e.getMessage}: ${localFile.remoteKey.key}")
def logErrorUnknown(e: Throwable, localFile: LocalFile)
(implicit c: Config): Unit =
error(s"$prefix:unknown:error $e: ${localFile.remoteKey.key}")
}

View file

@ -0,0 +1,38 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import net.kemitix.s3thorp.{Bucket, Config, HashModified, LastModified, MD5Hash, RemoteKey}
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, S3Object}
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import scala.collection.JavaConverters._
private class S3ClientObjectLister(s3Client: S3CatsIOClient)
extends S3ClientLogging
with S3ObjectsByHash
with QuoteStripper {
def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit c: Config): IO[S3ObjectsData] = {
val request = ListObjectsV2Request.builder
.bucket(bucket.name)
.prefix(prefix.key).build
s3Client.listObjectsV2(request)
.bracket(
logListObjectsStart(bucket, prefix))(
logListObjectsFinish(bucket,prefix))
.map(_.contents)
.map(_.asScala)
.map(_.toStream)
.map(os => S3ObjectsData(byHash(os), byKey(os)))
}
private def byKey(os: Stream[S3Object]) =
os.map { o => {
val remoteKey = RemoteKey(o.key)
val hash = MD5Hash(o.eTag() filter stripQuotes)
val lastModified = LastModified(o.lastModified())
(remoteKey, HashModified(hash, lastModified))
}}.toMap
}

View file

@ -0,0 +1,34 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.{Bucket, Config, LocalFile, MD5Hash, UploadS3Action}
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.model.PutObjectRequest
private class S3ClientPutObjectUploader(s3Client: S3CatsIOClient)
extends S3ClientUploader
with S3ClientLogging
with QuoteStripper {
override def accepts(localFile: LocalFile)(implicit c: Config): Boolean = true
override
def upload(localFile: LocalFile,
bucket: Bucket,
tryCount: Int)
(implicit c: Config): IO[UploadS3Action] = {
val request = PutObjectRequest.builder
.bucket(bucket.name)
.key(localFile.remoteKey.key).build
val body = AsyncRequestBody.fromFile(localFile.file)
s3Client.putObject(request, body)
.bracket(
logUploadStart(localFile, bucket))(
logUploadFinish(localFile, bucket))
.map(_.eTag)
.map(_ filter stripQuotes)
.map(MD5Hash)
.map(UploadS3Action(localFile.remoteKey, _))
}
}

View file

@ -0,0 +1,16 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import net.kemitix.s3thorp.{Bucket, Config, LocalFile, S3Action}
trait S3ClientUploader {
def accepts(localFile: LocalFile)
(implicit c: Config): Boolean
def upload(localFile: LocalFile,
bucket: Bucket,
tryCount: Int)
(implicit c: Config): IO[S3Action]
}

View file

@ -1,89 +1,49 @@
package net.kemitix.s3thorp.awssdk package net.kemitix.s3thorp.awssdk
import cats.effect.IO import cats.effect.IO
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3ClientBuilder}
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp._ import net.kemitix.s3thorp._
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.model.{Bucket => _, _} import software.amazon.awssdk.services.s3.model.{Bucket => _, _}
import scala.collection.JavaConverters._
private class ThorpS3Client(s3Client: S3CatsIOClient) private class ThorpS3Client(s3Client: S3CatsIOClient)
extends S3Client extends S3Client
with S3ObjectsByHash with S3ClientLogging
with S3ClientLogging { with QuoteStripper {
lazy val amazonS3Client = AmazonS3ClientBuilder.defaultClient
lazy val amazonS3TransferManager = TransferManagerBuilder.defaultTransferManager
lazy val objectLister = new S3ClientObjectLister(s3Client)
lazy val copier = new S3ClientCopier(s3Client)
lazy val uploader = new S3ClientPutObjectUploader(s3Client)
lazy val multiPartUploader = new S3ClientMultiPartTransferManager(amazonS3TransferManager)
lazy val deleter = new S3ClientDeleter(s3Client)
override def listObjects(bucket: Bucket, override def listObjects(bucket: Bucket,
prefix: RemoteKey) prefix: RemoteKey)
(implicit c: Config): IO[S3ObjectsData] = { (implicit c: Config): IO[S3ObjectsData] =
val request = ListObjectsV2Request.builder objectLister.listObjects(bucket, prefix)
.bucket(bucket.name)
.prefix(prefix.key).build
s3Client.listObjectsV2(request)
.bracket(
logListObjectsStart(bucket, prefix))(
logListObjectsFinish(bucket,prefix))
.map(_.contents)
.map(_.asScala)
.map(_.toStream)
.map(os => S3ObjectsData(byHash(os), byKey(os)))
}
private def byKey(os: Stream[S3Object]) =
os.map { o => {
val remoteKey = RemoteKey(o.key)
val hash = MD5Hash(o.eTag() filter stripQuotes)
val lastModified = LastModified(o.lastModified())
(remoteKey, HashModified(hash, lastModified))
}}.toMap
override def upload(localFile: LocalFile,
bucket: Bucket)
(implicit c: Config): IO[UploadS3Action] = {
val request = PutObjectRequest.builder
.bucket(bucket.name)
.key(localFile.remoteKey.key).build
val body = AsyncRequestBody.fromFile(localFile.file)
s3Client.putObject(request, body)
.bracket(
logUploadStart(localFile, bucket))(
logUploadFinish(localFile, bucket))
.map(_.eTag)
.map(_ filter stripQuotes)
.map(MD5Hash)
.map(UploadS3Action(localFile.remoteKey, _))
}
private def stripQuotes: Char => Boolean = _ != '"'
override def copy(bucket: Bucket, override def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,
targetKey: RemoteKey) targetKey: RemoteKey)
(implicit c: Config): IO[CopyS3Action] = { (implicit c: Config): IO[CopyS3Action] =
val request = CopyObjectRequest.builder copier.copy(bucket, sourceKey,hash, targetKey)
.bucket(bucket.name)
.copySource(s"${bucket.name}/${sourceKey.key}")
.copySourceIfMatch(hash.hash) override def upload(localFile: LocalFile,
.key(targetKey.key).build bucket: Bucket,
s3Client.copyObject(request) tryCount: Int)
.bracket( (implicit c: Config): IO[S3Action] =
logCopyStart(bucket, sourceKey, targetKey))( if (multiPartUploader.accepts(localFile)) multiPartUploader.upload(localFile, bucket, 1)
logCopyFinish(bucket, sourceKey,targetKey)) else uploader.upload(localFile, bucket, tryCount)
.map(_ => CopyS3Action(targetKey))
}
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey) remoteKey: RemoteKey)
(implicit c: Config): IO[DeleteS3Action] = { (implicit c: Config): IO[DeleteS3Action] =
val request = DeleteObjectRequest.builder deleter.delete(bucket, remoteKey)
.bucket(bucket.name)
.key(remoteKey.key).build
s3Client.deleteObject(request)
.bracket(
logDeleteStart(bucket, remoteKey))(
logDeleteFinish(bucket, remoteKey))
.map(_ => DeleteS3Action(remoteKey))
}
} }

Binary file not shown.

Binary file not shown.

View file

@ -6,7 +6,8 @@ import net.kemitix.s3thorp.awssdk.{S3ObjectsData, S3Client}
trait DummyS3Client extends S3Client { trait DummyS3Client extends S3Client {
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket bucket: Bucket,
tryCount: Int
)(implicit c: Config): IO[UploadS3Action] = ??? )(implicit c: Config): IO[UploadS3Action] = ???
override def copy(bucket: Bucket, override def copy(bucket: Bucket,

View file

@ -0,0 +1,58 @@
package net.kemitix.s3thorp
import java.nio.file.Files
class MD5HashGeneratorTest extends UnitTest {
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
new MD5HashGenerator {
describe("read a small file (smaller than buffer)") {
val file = Resource(this, "upload/root-file")
it("should generate the correct hash") {
val expected = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val result = md5File(file)
assertResult(expected)(result)
}
}
describe("read a buffer") {
val file = Resource(this, "upload/root-file")
val buffer: Array[Byte] = Files.readAllBytes(file.toPath)
it("should generate the correct hash") {
val expected = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val result = md5PartBody(buffer)
assertResult(expected)(result)
}
}
describe("read a large file (bigger than buffer)") {
val file = Resource(this, "big-file")
it("should generate the correct hash") {
val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
val result = md5File(file)
assertResult(expected)(result)
}
}
describe("read part of a file") {
val file = Resource(this, "big-file")
val halfFileLength = file.length / 2
assertResult(file.length)(halfFileLength * 2)
describe("when starting at the beginning of the file") {
it("should generate the correct hash") {
val expected = MD5Hash("aadf0d266cefe0fcdb241a51798d74b3")
val result = md5FilePart(file, 0, halfFileLength)
assertResult(expected)(result)
}
}
describe("when starting in the middle of the file") {
it("should generate the correct hash") {
val expected = MD5Hash("16e08d53ca36e729d808fd5e4f7e35dc")
val result = md5FilePart(file, halfFileLength, halfFileLength)
assertResult(expected)(result)
}
}
}
}
}

View file

@ -29,14 +29,17 @@ class SyncSuite
val md5Hash = MD5Hash("the-hash") val md5Hash = MD5Hash("the-hash")
val testLocalFile = aLocalFile("file", md5Hash, source, generateKey(source, prefix)) val testLocalFile = aLocalFile("file", md5Hash, source, generateKey(source, prefix))
val sync = new Sync(new S3Client with DummyS3Client { val sync = new Sync(new S3Client with DummyS3Client {
override def upload(localFile: LocalFile, bucket: Bucket)(implicit c: Config) = IO { override def upload(localFile: LocalFile,
bucket: Bucket,
tryCount: Int)
(implicit c: Config) = IO {
assert(bucket == testBucket) assert(bucket == testBucket)
UploadS3Action(localFile.remoteKey, md5Hash) UploadS3Action(localFile.remoteKey, md5Hash)
} }
}) })
it("delegates unmodified to the S3Client") { it("delegates unmodified to the S3Client") {
assertResult(UploadS3Action(RemoteKey(prefix.key + "/file"), md5Hash))( assertResult(UploadS3Action(RemoteKey(prefix.key + "/file"), md5Hash))(
sync.upload(testLocalFile, testBucket). sync.upload(testLocalFile, testBucket, 1).
unsafeRunSync()) unsafeRunSync())
} }
} }
@ -177,7 +180,8 @@ class SyncSuite
override def listObjects(bucket: Bucket, prefix: RemoteKey)(implicit c: Config) = IO {s3ObjectsData} override def listObjects(bucket: Bucket, prefix: RemoteKey)(implicit c: Config) = IO {s3ObjectsData}
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket bucket: Bucket,
tryCount: Int
)(implicit c: Config) = IO { )(implicit c: Config) = IO {
if (bucket == testBucket) if (bucket == testBucket)
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)

View file

@ -0,0 +1,375 @@
package net.kemitix.s3thorp.awssdk
import java.io.{File, InputStream}
import java.net.URL
import java.util
import java.util.Date
import com.amazonaws.{AmazonWebServiceRequest, HttpMethod}
import com.amazonaws.regions.Region
import com.amazonaws.services.s3.model.analytics.AnalyticsConfiguration
import com.amazonaws.services.s3.model.inventory.InventoryConfiguration
import com.amazonaws.services.s3.{AmazonS3, S3ClientOptions, S3ResponseMetadata, model}
import com.amazonaws.services.s3.model.metrics.MetricsConfiguration
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.waiters.AmazonS3Waiters
/**
  * A stub implementation of the AWS SDK v1 [[AmazonS3]] interface for use in tests.
  *
  * Every operation throws [[NotImplementedError]] (via `???`); test fixtures
  * extend this class anonymously and override only the operations they expect
  * to be invoked, so any unexpected SDK call fails the test loudly instead of
  * silently succeeding.
  */
class MyAmazonS3Client extends AmazonS3 {
// client configuration
override def setEndpoint(endpoint: String): Unit = ???
override def setRegion(region: Region): Unit = ???
override def setS3ClientOptions(clientOptions: S3ClientOptions): Unit = ???
override def changeObjectStorageClass(bucketName: String, key: String, newStorageClass: StorageClass): Unit = ???
override def setObjectRedirectLocation(bucketName: String, key: String, newRedirectLocation: String): Unit = ???
// object and version listing
override def listObjects(bucketName: String): ObjectListing = ???
override def listObjects(bucketName: String, prefix: String): ObjectListing = ???
override def listObjects(listObjectsRequest: ListObjectsRequest): ObjectListing = ???
override def listObjectsV2(bucketName: String): ListObjectsV2Result = ???
override def listObjectsV2(bucketName: String, prefix: String): ListObjectsV2Result = ???
override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request): ListObjectsV2Result = ???
override def listNextBatchOfObjects(previousObjectListing: ObjectListing): ObjectListing = ???
override def listNextBatchOfObjects(listNextBatchOfObjectsRequest: ListNextBatchOfObjectsRequest): ObjectListing = ???
override def listVersions(bucketName: String, prefix: String): VersionListing = ???
override def listNextBatchOfVersions(previousVersionListing: VersionListing): VersionListing = ???
override def listNextBatchOfVersions(listNextBatchOfVersionsRequest: ListNextBatchOfVersionsRequest): VersionListing = ???
override def listVersions(bucketName: String, prefix: String, keyMarker: String, versionIdMarker: String, delimiter: String, maxResults: Integer): VersionListing = ???
override def listVersions(listVersionsRequest: ListVersionsRequest): VersionListing = ???
// account and bucket management
override def getS3AccountOwner: Owner = ???
override def getS3AccountOwner(getS3AccountOwnerRequest: GetS3AccountOwnerRequest): Owner = ???
override def doesBucketExist(bucketName: String): Boolean = ???
override def doesBucketExistV2(bucketName: String): Boolean = ???
override def headBucket(headBucketRequest: HeadBucketRequest): HeadBucketResult = ???
override def listBuckets(): util.List[Bucket] = ???
override def listBuckets(listBucketsRequest: ListBucketsRequest): util.List[Bucket] = ???
override def getBucketLocation(bucketName: String): String = ???
override def getBucketLocation(getBucketLocationRequest: GetBucketLocationRequest): String = ???
override def createBucket(createBucketRequest: CreateBucketRequest): Bucket = ???
override def createBucket(bucketName: String): Bucket = ???
override def createBucket(bucketName: String, region: model.Region): Bucket = ???
override def createBucket(bucketName: String, region: String): Bucket = ???
// access control lists
override def getObjectAcl(bucketName: String, key: String): AccessControlList = ???
override def getObjectAcl(bucketName: String, key: String, versionId: String): AccessControlList = ???
override def getObjectAcl(getObjectAclRequest: GetObjectAclRequest): AccessControlList = ???
override def setObjectAcl(bucketName: String, key: String, acl: AccessControlList): Unit = ???
override def setObjectAcl(bucketName: String, key: String, acl: CannedAccessControlList): Unit = ???
override def setObjectAcl(bucketName: String, key: String, versionId: String, acl: AccessControlList): Unit = ???
override def setObjectAcl(bucketName: String, key: String, versionId: String, acl: CannedAccessControlList): Unit = ???
override def setObjectAcl(setObjectAclRequest: SetObjectAclRequest): Unit = ???
override def getBucketAcl(bucketName: String): AccessControlList = ???
override def setBucketAcl(setBucketAclRequest: SetBucketAclRequest): Unit = ???
override def getBucketAcl(getBucketAclRequest: GetBucketAclRequest): AccessControlList = ???
override def setBucketAcl(bucketName: String, acl: AccessControlList): Unit = ???
override def setBucketAcl(bucketName: String, acl: CannedAccessControlList): Unit = ???
// object read/write/copy/delete
override def getObjectMetadata(bucketName: String, key: String): ObjectMetadata = ???
override def getObjectMetadata(getObjectMetadataRequest: GetObjectMetadataRequest): ObjectMetadata = ???
override def getObject(bucketName: String, key: String): S3Object = ???
override def getObject(getObjectRequest: GetObjectRequest): S3Object = ???
override def getObject(getObjectRequest: GetObjectRequest, destinationFile: File): ObjectMetadata = ???
override def getObjectAsString(bucketName: String, key: String): String = ???
override def getObjectTagging(getObjectTaggingRequest: GetObjectTaggingRequest): GetObjectTaggingResult = ???
override def setObjectTagging(setObjectTaggingRequest: SetObjectTaggingRequest): SetObjectTaggingResult = ???
override def deleteObjectTagging(deleteObjectTaggingRequest: DeleteObjectTaggingRequest): DeleteObjectTaggingResult = ???
override def deleteBucket(deleteBucketRequest: DeleteBucketRequest): Unit = ???
override def deleteBucket(bucketName: String): Unit = ???
override def putObject(putObjectRequest: PutObjectRequest): PutObjectResult = ???
override def putObject(bucketName: String, key: String, file: File): PutObjectResult = ???
override def putObject(bucketName: String, key: String, input: InputStream, metadata: ObjectMetadata): PutObjectResult = ???
override def putObject(bucketName: String, key: String, content: String): PutObjectResult = ???
override def copyObject(sourceBucketName: String, sourceKey: String, destinationBucketName: String, destinationKey: String): CopyObjectResult = ???
override def copyObject(copyObjectRequest: CopyObjectRequest): CopyObjectResult = ???
override def copyPart(copyPartRequest: CopyPartRequest): CopyPartResult = ???
override def deleteObject(bucketName: String, key: String): Unit = ???
override def deleteObject(deleteObjectRequest: DeleteObjectRequest): Unit = ???
override def deleteObjects(deleteObjectsRequest: DeleteObjectsRequest): DeleteObjectsResult = ???
override def deleteVersion(bucketName: String, key: String, versionId: String): Unit = ???
override def deleteVersion(deleteVersionRequest: DeleteVersionRequest): Unit = ???
// bucket configuration (logging, versioning, lifecycle, CORS, tagging, notification, website, policy)
override def getBucketLoggingConfiguration(bucketName: String): BucketLoggingConfiguration = ???
override def getBucketLoggingConfiguration(getBucketLoggingConfigurationRequest: GetBucketLoggingConfigurationRequest): BucketLoggingConfiguration = ???
override def setBucketLoggingConfiguration(setBucketLoggingConfigurationRequest: SetBucketLoggingConfigurationRequest): Unit = ???
override def getBucketVersioningConfiguration(bucketName: String): BucketVersioningConfiguration = ???
override def getBucketVersioningConfiguration(getBucketVersioningConfigurationRequest: GetBucketVersioningConfigurationRequest): BucketVersioningConfiguration = ???
override def setBucketVersioningConfiguration(setBucketVersioningConfigurationRequest: SetBucketVersioningConfigurationRequest): Unit = ???
override def getBucketLifecycleConfiguration(bucketName: String): BucketLifecycleConfiguration = ???
override def getBucketLifecycleConfiguration(getBucketLifecycleConfigurationRequest: GetBucketLifecycleConfigurationRequest): BucketLifecycleConfiguration = ???
override def setBucketLifecycleConfiguration(bucketName: String, bucketLifecycleConfiguration: BucketLifecycleConfiguration): Unit = ???
override def setBucketLifecycleConfiguration(setBucketLifecycleConfigurationRequest: SetBucketLifecycleConfigurationRequest): Unit = ???
override def deleteBucketLifecycleConfiguration(bucketName: String): Unit = ???
override def deleteBucketLifecycleConfiguration(deleteBucketLifecycleConfigurationRequest: DeleteBucketLifecycleConfigurationRequest): Unit = ???
override def getBucketCrossOriginConfiguration(bucketName: String): BucketCrossOriginConfiguration = ???
override def getBucketCrossOriginConfiguration(getBucketCrossOriginConfigurationRequest: GetBucketCrossOriginConfigurationRequest): BucketCrossOriginConfiguration = ???
override def setBucketCrossOriginConfiguration(bucketName: String, bucketCrossOriginConfiguration: BucketCrossOriginConfiguration): Unit = ???
override def setBucketCrossOriginConfiguration(setBucketCrossOriginConfigurationRequest: SetBucketCrossOriginConfigurationRequest): Unit = ???
override def deleteBucketCrossOriginConfiguration(bucketName: String): Unit = ???
override def deleteBucketCrossOriginConfiguration(deleteBucketCrossOriginConfigurationRequest: DeleteBucketCrossOriginConfigurationRequest): Unit = ???
override def getBucketTaggingConfiguration(bucketName: String): BucketTaggingConfiguration = ???
override def getBucketTaggingConfiguration(getBucketTaggingConfigurationRequest: GetBucketTaggingConfigurationRequest): BucketTaggingConfiguration = ???
override def setBucketTaggingConfiguration(bucketName: String, bucketTaggingConfiguration: BucketTaggingConfiguration): Unit = ???
override def setBucketTaggingConfiguration(setBucketTaggingConfigurationRequest: SetBucketTaggingConfigurationRequest): Unit = ???
override def deleteBucketTaggingConfiguration(bucketName: String): Unit = ???
override def deleteBucketTaggingConfiguration(deleteBucketTaggingConfigurationRequest: DeleteBucketTaggingConfigurationRequest): Unit = ???
override def getBucketNotificationConfiguration(bucketName: String): BucketNotificationConfiguration = ???
override def getBucketNotificationConfiguration(getBucketNotificationConfigurationRequest: GetBucketNotificationConfigurationRequest): BucketNotificationConfiguration = ???
override def setBucketNotificationConfiguration(setBucketNotificationConfigurationRequest: SetBucketNotificationConfigurationRequest): Unit = ???
override def setBucketNotificationConfiguration(bucketName: String, bucketNotificationConfiguration: BucketNotificationConfiguration): Unit = ???
override def getBucketWebsiteConfiguration(bucketName: String): BucketWebsiteConfiguration = ???
override def getBucketWebsiteConfiguration(getBucketWebsiteConfigurationRequest: GetBucketWebsiteConfigurationRequest): BucketWebsiteConfiguration = ???
override def setBucketWebsiteConfiguration(bucketName: String, configuration: BucketWebsiteConfiguration): Unit = ???
override def setBucketWebsiteConfiguration(setBucketWebsiteConfigurationRequest: SetBucketWebsiteConfigurationRequest): Unit = ???
override def deleteBucketWebsiteConfiguration(bucketName: String): Unit = ???
override def deleteBucketWebsiteConfiguration(deleteBucketWebsiteConfigurationRequest: DeleteBucketWebsiteConfigurationRequest): Unit = ???
override def getBucketPolicy(bucketName: String): BucketPolicy = ???
override def getBucketPolicy(getBucketPolicyRequest: GetBucketPolicyRequest): BucketPolicy = ???
override def setBucketPolicy(bucketName: String, policyText: String): Unit = ???
override def setBucketPolicy(setBucketPolicyRequest: SetBucketPolicyRequest): Unit = ???
override def deleteBucketPolicy(bucketName: String): Unit = ???
override def deleteBucketPolicy(deleteBucketPolicyRequest: DeleteBucketPolicyRequest): Unit = ???
// pre-signed URLs
override def generatePresignedUrl(bucketName: String, key: String, expiration: Date): URL = ???
override def generatePresignedUrl(bucketName: String, key: String, expiration: Date, method: HttpMethod): URL = ???
override def generatePresignedUrl(generatePresignedUrlRequest: GeneratePresignedUrlRequest): URL = ???
// multi-part upload - the operations exercised by S3ClientMultiPartUploader tests
override def initiateMultipartUpload(request: InitiateMultipartUploadRequest): InitiateMultipartUploadResult = ???
override def uploadPart(request: UploadPartRequest): UploadPartResult = ???
override def listParts(request: ListPartsRequest): PartListing = ???
override def abortMultipartUpload(request: AbortMultipartUploadRequest): Unit = ???
override def completeMultipartUpload(request: CompleteMultipartUploadRequest): CompleteMultipartUploadResult = ???
override def listMultipartUploads(request: ListMultipartUploadsRequest): MultipartUploadListing = ???
// miscellaneous operations
override def getCachedResponseMetadata(request: AmazonWebServiceRequest): S3ResponseMetadata = ???
override def restoreObject(request: RestoreObjectRequest): Unit = ???
override def restoreObjectV2(request: RestoreObjectRequest): RestoreObjectResult = ???
override def restoreObject(bucketName: String, key: String, expirationInDays: Int): Unit = ???
override def enableRequesterPays(bucketName: String): Unit = ???
override def disableRequesterPays(bucketName: String): Unit = ???
override def isRequesterPaysEnabled(bucketName: String): Boolean = ???
override def setBucketReplicationConfiguration(bucketName: String, configuration: BucketReplicationConfiguration): Unit = ???
override def setBucketReplicationConfiguration(setBucketReplicationConfigurationRequest: SetBucketReplicationConfigurationRequest): Unit = ???
override def getBucketReplicationConfiguration(bucketName: String): BucketReplicationConfiguration = ???
override def getBucketReplicationConfiguration(getBucketReplicationConfigurationRequest: GetBucketReplicationConfigurationRequest): BucketReplicationConfiguration = ???
override def deleteBucketReplicationConfiguration(bucketName: String): Unit = ???
override def deleteBucketReplicationConfiguration(request: DeleteBucketReplicationConfigurationRequest): Unit = ???
override def doesObjectExist(bucketName: String, objectName: String): Boolean = ???
// bucket analytics, metrics, inventory, encryption configuration
override def getBucketAccelerateConfiguration(bucketName: String): BucketAccelerateConfiguration = ???
override def getBucketAccelerateConfiguration(getBucketAccelerateConfigurationRequest: GetBucketAccelerateConfigurationRequest): BucketAccelerateConfiguration = ???
override def setBucketAccelerateConfiguration(bucketName: String, accelerateConfiguration: BucketAccelerateConfiguration): Unit = ???
override def setBucketAccelerateConfiguration(setBucketAccelerateConfigurationRequest: SetBucketAccelerateConfigurationRequest): Unit = ???
override def deleteBucketMetricsConfiguration(bucketName: String, id: String): DeleteBucketMetricsConfigurationResult = ???
override def deleteBucketMetricsConfiguration(deleteBucketMetricsConfigurationRequest: DeleteBucketMetricsConfigurationRequest): DeleteBucketMetricsConfigurationResult = ???
override def getBucketMetricsConfiguration(bucketName: String, id: String): GetBucketMetricsConfigurationResult = ???
override def getBucketMetricsConfiguration(getBucketMetricsConfigurationRequest: GetBucketMetricsConfigurationRequest): GetBucketMetricsConfigurationResult = ???
override def setBucketMetricsConfiguration(bucketName: String, metricsConfiguration: MetricsConfiguration): SetBucketMetricsConfigurationResult = ???
override def setBucketMetricsConfiguration(setBucketMetricsConfigurationRequest: SetBucketMetricsConfigurationRequest): SetBucketMetricsConfigurationResult = ???
override def listBucketMetricsConfigurations(listBucketMetricsConfigurationsRequest: ListBucketMetricsConfigurationsRequest): ListBucketMetricsConfigurationsResult = ???
override def deleteBucketAnalyticsConfiguration(bucketName: String, id: String): DeleteBucketAnalyticsConfigurationResult = ???
override def deleteBucketAnalyticsConfiguration(deleteBucketAnalyticsConfigurationRequest: DeleteBucketAnalyticsConfigurationRequest): DeleteBucketAnalyticsConfigurationResult = ???
override def getBucketAnalyticsConfiguration(bucketName: String, id: String): GetBucketAnalyticsConfigurationResult = ???
override def getBucketAnalyticsConfiguration(getBucketAnalyticsConfigurationRequest: GetBucketAnalyticsConfigurationRequest): GetBucketAnalyticsConfigurationResult = ???
override def setBucketAnalyticsConfiguration(bucketName: String, analyticsConfiguration: AnalyticsConfiguration): SetBucketAnalyticsConfigurationResult = ???
override def setBucketAnalyticsConfiguration(setBucketAnalyticsConfigurationRequest: SetBucketAnalyticsConfigurationRequest): SetBucketAnalyticsConfigurationResult = ???
override def listBucketAnalyticsConfigurations(listBucketAnalyticsConfigurationsRequest: ListBucketAnalyticsConfigurationsRequest): ListBucketAnalyticsConfigurationsResult = ???
override def deleteBucketInventoryConfiguration(bucketName: String, id: String): DeleteBucketInventoryConfigurationResult = ???
override def deleteBucketInventoryConfiguration(deleteBucketInventoryConfigurationRequest: DeleteBucketInventoryConfigurationRequest): DeleteBucketInventoryConfigurationResult = ???
override def getBucketInventoryConfiguration(bucketName: String, id: String): GetBucketInventoryConfigurationResult = ???
override def getBucketInventoryConfiguration(getBucketInventoryConfigurationRequest: GetBucketInventoryConfigurationRequest): GetBucketInventoryConfigurationResult = ???
override def setBucketInventoryConfiguration(bucketName: String, inventoryConfiguration: InventoryConfiguration): SetBucketInventoryConfigurationResult = ???
override def setBucketInventoryConfiguration(setBucketInventoryConfigurationRequest: SetBucketInventoryConfigurationRequest): SetBucketInventoryConfigurationResult = ???
override def listBucketInventoryConfigurations(listBucketInventoryConfigurationsRequest: ListBucketInventoryConfigurationsRequest): ListBucketInventoryConfigurationsResult = ???
override def deleteBucketEncryption(bucketName: String): DeleteBucketEncryptionResult = ???
override def deleteBucketEncryption(request: DeleteBucketEncryptionRequest): DeleteBucketEncryptionResult = ???
override def getBucketEncryption(bucketName: String): GetBucketEncryptionResult = ???
override def getBucketEncryption(request: GetBucketEncryptionRequest): GetBucketEncryptionResult = ???
override def setBucketEncryption(setBucketEncryptionRequest: SetBucketEncryptionRequest): SetBucketEncryptionResult = ???
// public access block, object lock, retention, select
override def setPublicAccessBlock(request: SetPublicAccessBlockRequest): SetPublicAccessBlockResult = ???
override def getPublicAccessBlock(request: GetPublicAccessBlockRequest): GetPublicAccessBlockResult = ???
override def deletePublicAccessBlock(request: DeletePublicAccessBlockRequest): DeletePublicAccessBlockResult = ???
override def getBucketPolicyStatus(request: GetBucketPolicyStatusRequest): GetBucketPolicyStatusResult = ???
override def selectObjectContent(selectRequest: SelectObjectContentRequest): SelectObjectContentResult = ???
override def setObjectLegalHold(setObjectLegalHoldRequest: SetObjectLegalHoldRequest): SetObjectLegalHoldResult = ???
override def getObjectLegalHold(getObjectLegalHoldRequest: GetObjectLegalHoldRequest): GetObjectLegalHoldResult = ???
override def setObjectLockConfiguration(setObjectLockConfigurationRequest: SetObjectLockConfigurationRequest): SetObjectLockConfigurationResult = ???
override def getObjectLockConfiguration(getObjectLockConfigurationRequest: GetObjectLockConfigurationRequest): GetObjectLockConfigurationResult = ???
override def setObjectRetention(setObjectRetentionRequest: SetObjectRetentionRequest): SetObjectRetentionResult = ???
override def getObjectRetention(getObjectRetentionRequest: GetObjectRetentionRequest): GetObjectRetentionResult = ???
// pre-signed URL upload/download
override def download(presignedUrlDownloadRequest: PresignedUrlDownloadRequest): PresignedUrlDownloadResult = ???
override def download(presignedUrlDownloadRequest: PresignedUrlDownloadRequest, destinationFile: File): Unit = ???
override def upload(presignedUrlUploadRequest: PresignedUrlUploadRequest): PresignedUrlUploadResult = ???
// lifecycle and region
override def shutdown(): Unit = ???
override def getRegion: model.Region = ???
override def getRegionName: String = ???
override def getUrl(bucketName: String, key: String): URL = ???
override def waiters(): AmazonS3Waiters = ???
}

View file

@ -0,0 +1,15 @@
package net.kemitix.s3thorp.awssdk
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import software.amazon.awssdk.services.s3
/**
  * A stub [[S3CatsIOClient]] whose underlying async clients do nothing.
  *
  * Provides the minimal wiring needed so that tests can construct a
  * client without touching the real AWS SDK: the innermost fake client
  * reports a fixed service name and closes as a no-op.
  */
trait MyS3CatsIOClient extends S3CatsIOClient {
  override val underlying: S3AsyncClient = new S3AsyncClient {
    override val underlying: s3.S3AsyncClient = new s3.S3AsyncClient {
      // closing the fake client releases nothing
      override def close(): Unit = ()
      override def serviceName(): String = "fake-s3-client"
    }
  }
}

View file

@ -0,0 +1,112 @@
package net.kemitix.s3thorp.awssdk
import java.io.File
import java.time.Instant
import com.amazonaws.AmazonClientException
import com.amazonaws.services.s3.model
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{PauseResult, PersistableUpload, Transfer, TransferManager, TransferProgress, Upload}
import net.kemitix.s3thorp.{Bucket, Config, KeyGenerator, LastModified, MD5Hash, MD5HashGenerator, RemoteKey, Resource, UnitTest, UploadS3Action}
/**
  * Tests for S3ClientMultiPartTransferManager: the size-based `accepts`
  * predicate and that `upload` delegates to the underlying TransferManager,
  * converting its result into an UploadS3Action.
  */
class S3ClientMultiPartTransferManagerSuite
  extends UnitTest
    with KeyGenerator {

  private val source = Resource(this, "..")
  private val prefix = RemoteKey("prefix")
  implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
  private val fileToKey = generateKey(config.source, config.prefix) _
  // NOTE(review): not referenced anywhere in this suite - confirm before removing
  val lastModified = LastModified(Instant.now())

  describe("S3ClientMultiPartTransferManagerSuite") {
    describe("accepts") {
      val transferManager = new MyTransferManager(("", "", new File("")), RemoteKey(""), MD5Hash(""))
      val uploader = new S3ClientMultiPartTransferManager(transferManager)
      describe("small-file") {
        val smallFile = aLocalFile("small-file", MD5Hash("the-hash"), source, fileToKey)
        it("should be a small-file") {
          // below the 5Mb multi-part threshold
          assert(smallFile.file.length < 5 * 1024 * 1024)
        }
        it("should not accept small-file") {
          assertResult(false)(uploader.accepts(smallFile))
        }
      }
      describe("big-file") {
        val bigFile = aLocalFile("big-file", MD5Hash("the-hash"), source, fileToKey)
        it("should be a big-file") {
          // above the 5Mb multi-part threshold
          assert(bigFile.file.length > 5 * 1024 * 1024)
        }
        it("should accept big-file") {
          assertResult(true)(uploader.accepts(bigFile))
        }
      }
    }
    describe("upload") {
      val returnedKey = RemoteKey("returned-key")
      val returnedHash = MD5Hash("returned-hash")
      // renamed from the misleading `bigFile`: this is backed by the
      // small-file resource; size is irrelevant here because the
      // TransferManager below is faked
      val theFile = aLocalFile("small-file", MD5Hash("the-hash"), source, fileToKey)
      val transferManager = new MyTransferManager(
        (config.bucket.name, theFile.remoteKey.key, theFile.file),
        returnedKey, returnedHash)
      val uploader = new S3ClientMultiPartTransferManager(transferManager)
      it("should upload") {
        val expected = UploadS3Action(returnedKey, returnedHash)
        val result = uploader.upload(theFile, config.bucket, 1).unsafeRunSync
        assertResult(expected)(result)
      }
    }
  }

  /**
    * Fake TransferManager: returns a successful UploadResult only when
    * called with the expected (bucketName, key, file) signature; any
    * other call yields a stub whose operations are unimplemented.
    */
  class MyTransferManager(signature: (String, String, File),
                          returnedKey: RemoteKey,
                          returnedHash: MD5Hash) extends TransferManager {
    override def upload(bucketName: String, key: String, file: File): Upload = {
      if ((bucketName, key, file) == signature) {
        new MyUpload {
          override def waitForUploadResult(): UploadResult = {
            val result = new UploadResult()
            result.setBucketName(bucketName)
            result.setETag(returnedHash.hash)
            result.setKey(returnedKey.key)
            result.setVersionId("version-id")
            result
          }
        }
      } else new MyUpload
    }
  }

  // Upload stub where every operation is unimplemented; tests override
  // only waitForUploadResult when a success is expected
  class MyUpload extends Upload {
    override def waitForUploadResult(): UploadResult = ???
    override def pause(): PersistableUpload = ???
    override def tryPause(forceCancelTransfers: Boolean): PauseResult[PersistableUpload] = ???
    override def abort(): Unit = ???
    override def isDone: Boolean = ???
    override def waitForCompletion(): Unit = ???
    override def waitForException(): AmazonClientException = ???
    override def getDescription: String = ???
    override def getState: Transfer.TransferState = ???
    override def getProgress: TransferProgress = ???
    import com.amazonaws.event.ProgressListener
    override def addProgressListener(listener: ProgressListener): Unit = ???
    override def removeProgressListener(listener: ProgressListener): Unit = ???
    override def addProgressListener(listener: model.ProgressListener): Unit = ???
    override def removeProgressListener(listener: model.ProgressListener): Unit = ???
  }
}

View file

@ -0,0 +1,279 @@
package net.kemitix.s3thorp.awssdk
import scala.collection.JavaConverters._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import com.amazonaws.services.s3.model.{Bucket => _, _}
import net.kemitix.s3thorp._
/**
  * Tests for S3ClientMultiPartUploader: the size-based `accepts`
  * predicate, each component of the multi-part flow (create request,
  * initiate, part requests, part upload, complete, abort), and the
  * end-to-end upload including failure and retry behaviour.
  */
class S3ClientMultiPartUploaderSuite
  extends UnitTest
    with KeyGenerator {

  private val source = Resource(this, "..")
  private val prefix = RemoteKey("prefix")
  implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
  private val fileToKey = generateKey(config.source, config.prefix) _

  describe("multi-part uploader accepts") {
    val uploader = new S3ClientMultiPartUploader(new MyAmazonS3Client {})
    it("should reject small file") {
      // small-file: dd if=/dev/urandom of=src/test/resources/net/kemitix/s3thorp/small-file bs=1047552 count=5
      // 1047552 = 1024 * 1023
      // file size 5kb under 5Mb threshold
      val smallFile = aLocalFile("small-file", MD5Hash(""), source, fileToKey)
      assert(smallFile.file.exists, "sample small file is missing")
      assert(smallFile.file.length == 5 * 1024 * 1023, "sample small file is wrong size")
      val result = uploader.accepts(smallFile)
      assertResult(false)(result)
    }
    it("should accept big file") {
      // big-file: dd if=/dev/urandom of=src/test/resources/net/kemitix/s3thorp/big-file bs=1049600 count=5
      // 1049600 = 1024 * 1025
      // file size 5kb over 5Mb threshold
      val bigFile = aLocalFile("big-file", MD5Hash(""), source, fileToKey)
      assert(bigFile.file.exists, "sample big file is missing")
      assert(bigFile.file.length == 5 * 1024 * 1025, "sample big file is wrong size")
      val result = uploader.accepts(bigFile)
      assertResult(true)(result)
    }
  }

  // builds an UploadPartRequest for the given part number
  def uploadPartRequest(partNumber: Int) = {
    val request = new UploadPartRequest
    request.setPartNumber(partNumber)
    request
  }

  // builds an UploadPartResult carrying the given ETag
  def uploadPartResult(eTag: String) = {
    val result = new UploadPartResult
    result.setETag(eTag)
    result
  }

  // fixed typo in describe label: was "mulit-part uploader upload"
  describe("multi-part uploader upload") {
    val theFile = aLocalFile("big-file", MD5Hash(""), source, fileToKey)
    val uploadId = "upload-id"
    val createUploadResponse = new InitiateMultipartUploadResult()
    createUploadResponse.setBucketName(config.bucket.name)
    createUploadResponse.setKey(theFile.remoteKey.key)
    createUploadResponse.setUploadId(uploadId)
    val uploadPartRequest1 = uploadPartRequest(1)
    val uploadPartRequest2 = uploadPartRequest(2)
    val uploadPartRequest3 = uploadPartRequest(3)
    // expected md5 of each half of the big-file test resource (see below)
    val part1md5 = "aadf0d266cefe0fcdb241a51798d74b3"
    val part2md5 = "16e08d53ca36e729d808fd5e4f7e35dc"
    val uploadPartResponse1 = uploadPartResult(part1md5)
    val uploadPartResponse2 = uploadPartResult(part2md5)
    val uploadPartResponse3 = uploadPartResult("part-3")
    val completeUploadResponse = new CompleteMultipartUploadResult()
    completeUploadResponse.setETag("hash")

    describe("multi-part uploader upload components") {
      val uploader = new RecordingMultiPartUploader()
      describe("create upload request") {
        val request = uploader.createUploadRequest(config.bucket, theFile)
        it("should have bucket") {
          assertResult(config.bucket.name)(request.getBucketName)
        }
        it("should have key") {
          assertResult(theFile.remoteKey.key)(request.getKey)
        }
      }
      describe("initiate upload") {
        it("should createMultipartUpload") {
          val expected = createUploadResponse
          val result = uploader.createUpload(config.bucket, theFile).unsafeRunSync
          assertResult(expected)(result)
        }
      }
      describe("create UploadPartRequests for file") {
        // was `5l` - uppercase L avoids confusion with the digit 1
        val chunkSize = 5L * 1024 * 1025 / 2
        // to create expected md5 values for each chunk:
        // split -d -b $((5 * 1024 * 1025 / 2)) big-file
        // creates x00 and x01
        // md5sum x0[01]
        val result = uploader.parts(theFile, createUploadResponse).unsafeRunSync.toList
        it("should create two parts") {
          assertResult(2)(result.size)
        }
        it("create part 1") {
          val part1 = result(0)
          assertResult((1, chunkSize, part1md5))((part1.getPartNumber, part1.getPartSize, part1.getMd5Digest))
        }
        it("create part 2") {
          val part2 = result(1)
          assertResult((2, chunkSize, part2md5))((part2.getPartNumber, part2.getPartSize, part2.getMd5Digest))
        }
      }
      describe("upload part") {
        it("should uploadPart") {
          val expected = uploadPartResponse3
          val result = uploader.uploadPart(theFile)(config)(uploadPartRequest3).unsafeRunSync
          assertResult(expected)(result)
        }
      }
      describe("upload parts") {
        val uploadPartRequests = Stream(uploadPartRequest1, uploadPartRequest2)
        it("should uploadPart for each") {
          val expected = List(uploadPartResponse1, uploadPartResponse2)
          val result = uploader.uploadParts(theFile, uploadPartRequests).unsafeRunSync.toList
          assertResult(expected)(result)
        }
      }
      describe("create complete request") {
        val request = uploader.createCompleteRequest(createUploadResponse, List(uploadPartResponse1, uploadPartResponse2))
        it("should have the bucket name") {
          assertResult(config.bucket.name)(request.getBucketName)
        }
        it("should have the key") {
          assertResult(theFile.remoteKey.key)(request.getKey)
        }
        it("should have the upload id") {
          assertResult(uploadId)(request.getUploadId)
        }
        it("should have the etags") {
          val expected = List(new PartETag(1, part1md5), new PartETag(2, part2md5))
          assertResult(expected.map(_.getETag))(request.getPartETags.asScala.map(_.getETag))
        }
      }
      describe("complete upload") {
        val uploadPartResponses = Stream(uploadPartResponse1, uploadPartResponse2, uploadPartResponse3)
        it("should completeUpload") {
          val expected = completeUploadResponse
          val result = uploader.completeUpload(createUploadResponse, uploadPartResponses, theFile).unsafeRunSync
          assertResult(expected)(result)
        }
      }
      describe("create abort request") {
        val abortRequest = uploader.createAbortRequest(uploadId, theFile)
        it("should have the upload id") {
          assertResult(uploadId)(abortRequest.getUploadId)
        }
        it("should have the bucket") {
          assertResult(config.bucket.name)(abortRequest.getBucketName)
        }
        it("should have the key") {
          assertResult(theFile.remoteKey.key)(abortRequest.getKey)
        }
      }
      describe("abort upload") {
        it("should abortUpload") {
          pending
        }
      }
    }

    describe("multi-part uploader upload complete") {
      describe("upload") {
        describe("when all okay") {
          val uploader = new RecordingMultiPartUploader()
          uploader.upload(theFile, config.bucket, 1).unsafeRunSync
          it("should initiate the upload") {
            assert(uploader.initiated.get)
          }
          it("should upload both parts") {
            assertResult(Set(1, 2))(uploader.partsUploaded.get)
          }
          it("should complete the upload") {
            assert(uploader.completed.get)
          }
        }
        describe("when initiate upload fails") {
          val uploader = new RecordingMultiPartUploader(initOkay = false)
          uploader.upload(theFile, config.bucket, 1).unsafeRunSync
          it("should not upload any parts") {
            assertResult(Set())(uploader.partsUploaded.get)
          }
          it("should not complete the upload") {
            assertResult(false)(uploader.completed.get)
          }
        }
        describe("when uploading a part fails once") {
          // each part succeeds on its second attempt
          val uploader = new RecordingMultiPartUploader(partTriesRequired = 2)
          uploader.upload(theFile, config.bucket, 1).unsafeRunSync
          it("should initiate the upload") {
            assert(uploader.initiated.get)
          }
          it("should upload all parts") {
            assertResult(Set(1, 2))(uploader.partsUploaded.get)
          }
          it("should complete the upload") {
            assert(uploader.completed.get)
          }
        }
        describe("when uploading a part fails too many times") {
          // four attempts required exceeds the uploader's retry limit
          val uploader = new RecordingMultiPartUploader(partTriesRequired = 4)
          uploader.upload(theFile, config.bucket, 1).unsafeRunSync
          it("should initiate the upload") {
            assert(uploader.initiated.get)
          }
          it("should not complete the upload") {
            // NOTE(review): this asserts that no parts were uploaded rather
            // than checking `completed` - confirm the intended assertion
            assertResult(Set())(uploader.partsUploaded.get)
          }
          it("should cancel the upload") {
            assert(uploader.canceled.get)
          }
        }
      }
    }

    /**
      * An S3ClientMultiPartUploader backed by a fake AmazonS3 client that
      * records which lifecycle steps were reached.
      *
      * @param initOkay          when false, initiateMultipartUpload fails
      * @param partTriesRequired number of attempts a part upload needs
      *                          before it succeeds (1 = succeed first time)
      * @param initiated         set when the upload was initiated
      * @param partsUploaded     the part numbers successfully uploaded
      * @param completed         set when the upload was completed
      * @param canceled          set when the upload was aborted
      */
    class RecordingMultiPartUploader(initOkay: Boolean = true,
                                     partTriesRequired: Int = 1,
                                     val initiated: AtomicBoolean = new AtomicBoolean(false),
                                     val partsUploaded: AtomicReference[Set[Int]] = new AtomicReference[Set[Int]](Set()),
                                     // NOTE: counters are zero-indexed - part0Tries counts part 1, etc.
                                     val part0Tries: AtomicInteger = new AtomicInteger(0),
                                     val part1Tries: AtomicInteger = new AtomicInteger(0),
                                     val part2Tries: AtomicInteger = new AtomicInteger(0),
                                     val completed: AtomicBoolean = new AtomicBoolean(false),
                                     val canceled: AtomicBoolean = new AtomicBoolean(false))
      extends S3ClientMultiPartUploader(
        new MyAmazonS3Client {
          // simulates an upload failure carrying a Content-MD5 detail
          def error[A]: A = {
            val exception = new AmazonS3Exception("error")
            exception.setAdditionalDetails(Map("Content-MD5" -> "(hash)").asJava)
            throw exception
          }
          override def initiateMultipartUpload(createMultipartUploadRequest: InitiateMultipartUploadRequest): InitiateMultipartUploadResult =
            if (initOkay) {
              initiated set true
              createUploadResponse
            }
            else error
          override def uploadPart(uploadPartRequest: UploadPartRequest): UploadPartResult =
            uploadPartRequest match {
              case _ if uploadPartRequest.getPartNumber == 1 => {
                if (part0Tries.incrementAndGet >= partTriesRequired) {
                  partsUploaded getAndUpdate (t => t + 1)
                  uploadPartResponse1
                }
                else error
              }
              case _ if uploadPartRequest.getPartNumber == 2 => {
                if (part1Tries.incrementAndGet >= partTriesRequired) {
                  partsUploaded getAndUpdate (t => t + 2)
                  uploadPartResponse2
                }
                else error
              }
              case _ if uploadPartRequest.getPartNumber == 3 => {
                if (part2Tries.incrementAndGet >= partTriesRequired) {
                  partsUploaded getAndUpdate (t => t + 3)
                  uploadPartResponse3
                }
                else error
              }
            }
          override def completeMultipartUpload(completeMultipartUploadRequest: CompleteMultipartUploadRequest): CompleteMultipartUploadResult = {
            completed set true
            completeUploadResponse
          }
          override def abortMultipartUpload(abortMultipartUploadRequest: AbortMultipartUploadRequest): Unit = {
            canceled set true
          }
        }) {}
  }
}

View file

@@ -76,7 +76,7 @@ class S3ClientSuite
   describe("upload") {
     def invoke(s3Client: ThorpS3Client, localFile: LocalFile, bucket: Bucket) =
-      s3Client.upload(localFile, bucket).unsafeRunSync
+      s3Client.upload(localFile, bucket, 1).unsafeRunSync
   describe("when uploading a file") {
     val md5Hash = MD5Hash("the-md5hash")
     val s3Client = new ThorpS3Client(

View file

@@ -57,13 +57,5 @@ class ThorpS3ClientSuite extends FunSpec {
     assertResult(expected)(result)
   }
 }
-  trait MyS3CatsIOClient extends S3CatsIOClient {
-    override val underlying: S3AsyncClient = new S3AsyncClient {
-      override val underlying: s3.S3AsyncClient = new s3.S3AsyncClient {
-        override def serviceName(): String = "fake-s3-client"
-        override def close(): Unit = ()
-      }
-    }
-  }
 }