Display upload progress (#29)

* [S3ClientMultiPartTransferManager] use request object

* [ActionSubmitter] unwrap RemoteKey in log messages

* [ActionSubmitter] rename variable

* [Logging] include log level in info messages

* [LocalFileStream] log when entering directory at level 2

* [UploadProgress{Listener,Logging}: add initial implementations

* [S3Client] def upload not requires an UploadProgressListener as a parameter

* [UploadProgressListener] rename method

* [S3ClientPutObjectUploader] Log upload progress for file <5Mb

Switched to using the AWS SDK V1 for PutObject as the V2 doesn't
support progress callbacks.

* Fix up tests

* Adjust logging levels
This commit is contained in:
Paul Campbell 2019-05-30 16:59:37 +01:00 committed by GitHub
parent 602c5ef150
commit 574d4c5885
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 587 additions and 93 deletions

View file

@ -1,7 +1,7 @@
package net.kemitix.s3thorp package net.kemitix.s3thorp
import cats.effect.IO import cats.effect.IO
import net.kemitix.s3thorp.awssdk.S3Client import net.kemitix.s3thorp.awssdk.{S3Client, UploadProgressListener}
trait ActionSubmitter trait ActionSubmitter
extends S3Client extends S3Client
@ -11,14 +11,15 @@ trait ActionSubmitter
(implicit c: Config): Stream[IO[S3Action]] = { (implicit c: Config): Stream[IO[S3Action]] = {
Stream( Stream(
action match { action match {
case ToUpload(file) => case ToUpload(localFile) =>
log4(s" Upload: ${file.relative}") log4(s" Upload: ${localFile.relative}")
upload(file, c.bucket, 1) val progressListener = new UploadProgressListener(localFile)
upload(localFile, c.bucket, progressListener, 1)
case ToCopy(sourceKey, hash, targetKey) => case ToCopy(sourceKey, hash, targetKey) =>
log4(s" Copy: $sourceKey => $targetKey") log4(s" Copy: ${sourceKey.key} => ${targetKey.key}")
copy(c.bucket, sourceKey, hash, targetKey) copy(c.bucket, sourceKey, hash, targetKey)
case ToDelete(remoteKey) => case ToDelete(remoteKey) =>
log4(s" Delete: $remoteKey") log4(s" Delete: ${remoteKey.key}")
delete(c.bucket, remoteKey) delete(c.bucket, remoteKey)
case DoNothing(remoteKey) => IO { case DoNothing(remoteKey) => IO {
DoNothingS3Action(remoteKey)} DoNothingS3Action(remoteKey)}

View file

@ -8,7 +8,7 @@ trait LocalFileStream
def findFiles(file: File) def findFiles(file: File)
(implicit c: Config): Stream[LocalFile] = { (implicit c: Config): Stream[LocalFile] = {
log5(s"- Entering: $file") log2(s"- Entering: $file")
val files = for { val files = for {
f <- dirPaths(file) f <- dirPaths(file)
.filter { f => f.isDirectory || c.filters.forall { filter => filter isIncluded f.toPath } } .filter { f => f.isDirectory || c.filters.forall { filter => filter isIncluded f.toPath } }

View file

@ -4,15 +4,15 @@ import com.typesafe.scalalogging.LazyLogging
trait Logging extends LazyLogging { trait Logging extends LazyLogging {
def log1(message: String)(implicit config: Config): Unit = if (config.verbose >= 1) logger.info(message) def log1(message: String)(implicit config: Config): Unit = if (config.verbose >= 1) logger.info(s"1:$message")
def log2(message: String)(implicit config: Config): Unit = if (config.verbose >= 2) logger.info(message) def log2(message: String)(implicit config: Config): Unit = if (config.verbose >= 2) logger.info(s"2:$message")
def log3(message: String)(implicit config: Config): Unit = if (config.verbose >= 3) logger.info(message) def log3(message: String)(implicit config: Config): Unit = if (config.verbose >= 3) logger.info(s"3:$message")
def log4(message: String)(implicit config: Config): Unit = if (config.verbose >= 4) logger.info(message) def log4(message: String)(implicit config: Config): Unit = if (config.verbose >= 4) logger.info(s"4:$message")
def log5(message: String)(implicit config: Config): Unit = if (config.verbose >= 5) logger.info(message) def log5(message: String)(implicit config: Config): Unit = if (config.verbose >= 5) logger.info(s"5:$message")
def warn(message: String): Unit = logger.warn(message) def warn(message: String): Unit = logger.warn(message)

View file

@ -2,7 +2,7 @@ package net.kemitix.s3thorp
import cats.effect.IO import cats.effect.IO
import cats.implicits._ import cats.implicits._
import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData} import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData, UploadProgressListener}
class Sync(s3Client: S3Client) class Sync(s3Client: S3Client)
extends LocalFileStream extends LocalFileStream
@ -39,9 +39,10 @@ class Sync(s3Client: S3Client)
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int) tryCount: Int)
(implicit c: Config): IO[S3Action] = (implicit c: Config): IO[S3Action] =
s3Client.upload(localFile, bucket, tryCount) s3Client.upload(localFile, bucket, progressListener, tryCount)
override def copy(bucket: Bucket, override def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,

View file

@ -1,6 +1,8 @@
package net.kemitix.s3thorp.awssdk package net.kemitix.s3thorp.awssdk
import cats.effect.IO import cats.effect.IO
import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client, AmazonS3ClientBuilder}
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp._ import net.kemitix.s3thorp._
@ -20,6 +22,7 @@ trait S3Client {
def upload(localFile: LocalFile, def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int tryCount: Int
)(implicit c: Config): IO[S3Action] )(implicit c: Config): IO[S3Action]
@ -37,11 +40,15 @@ trait S3Client {
object S3Client { object S3Client {
def createClient(s3AsyncClient: S3AsyncClient): S3Client = { def createClient(s3AsyncClient: S3AsyncClient,
new ThorpS3Client(S3CatsIOClient(s3AsyncClient)) amazonS3Client: AmazonS3,
amazonS3TransferManager: TransferManager): S3Client = {
new ThorpS3Client(S3CatsIOClient(s3AsyncClient), amazonS3Client, amazonS3TransferManager)
} }
val defaultClient: S3Client = val defaultClient: S3Client =
createClient(new JavaClientWrapper {}.underlying) createClient(new JavaClientWrapper {}.underlying,
AmazonS3ClientBuilder.defaultClient,
TransferManagerBuilder.defaultTransferManager)
} }

View file

@ -1,8 +1,9 @@
package net.kemitix.s3thorp.awssdk package net.kemitix.s3thorp.awssdk
import cats.effect.IO import cats.effect.IO
import com.amazonaws.services.s3.model.PutObjectResult
import net.kemitix.s3thorp.{Bucket, Config, LocalFile, Logging, RemoteKey} import net.kemitix.s3thorp.{Bucket, Config, LocalFile, Logging, RemoteKey}
import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, DeleteObjectResponse, ListObjectsV2Response, PutObjectResponse} import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, DeleteObjectResponse, ListObjectsV2Response}
trait S3ClientLogging trait S3ClientLogging
extends Logging { extends Logging {
@ -26,7 +27,7 @@ trait S3ClientLogging
def logUploadStart(localFile: LocalFile, def logUploadStart(localFile: LocalFile,
bucket: Bucket) bucket: Bucket)
(implicit c: Config): PutObjectResponse => IO[PutObjectResponse] = { (implicit c: Config): PutObjectResult => IO[PutObjectResult] = {
in => IO { in => IO {
log4(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}") log4(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}")
in in
@ -35,9 +36,9 @@ trait S3ClientLogging
def logUploadFinish(localFile: LocalFile, def logUploadFinish(localFile: LocalFile,
bucket: Bucket) bucket: Bucket)
(implicit c: Config): PutObjectResponse => IO[Unit] = { (implicit c: Config): PutObjectResult => IO[Unit] = {
in =>IO { in =>IO {
log3(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}") log1(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}")
} }
} }

View file

@ -1,9 +1,11 @@
package net.kemitix.s3thorp.awssdk package net.kemitix.s3thorp.awssdk
import cats.effect.IO import cats.effect.IO
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManager import com.amazonaws.services.s3.transfer.TransferManager
import net.kemitix.s3thorp._ import net.kemitix.s3thorp._
class S3ClientMultiPartTransferManager(transferManager: TransferManager) class S3ClientMultiPartTransferManager(transferManager: => TransferManager)
extends S3ClientUploader extends S3ClientUploader
with S3ClientMultiPartUploaderLogging { with S3ClientMultiPartUploaderLogging {
@ -14,11 +16,15 @@ class S3ClientMultiPartTransferManager(transferManager: TransferManager)
override override
def upload(localFile: LocalFile, def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int) tryCount: Int)
(implicit c: Config): IO[S3Action] = { (implicit c: Config): IO[S3Action] = {
val putObjectRequest: PutObjectRequest =
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withGeneralProgressListener(progressListener.listener)
IO { IO {
logMultiPartUploadStart(localFile, tryCount) logMultiPartUploadStart(localFile, tryCount)
val result = transferManager.upload(bucket.name, localFile.remoteKey.key, localFile.file) val result = transferManager.upload(putObjectRequest)
.waitForUploadResult() .waitForUploadResult()
logMultiPartUploadFinished(localFile) logMultiPartUploadFinished(localFile)
UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag)) UploadS3Action(RemoteKey(result.getKey), MD5Hash(result.getETag))

View file

@ -119,6 +119,7 @@ private class S3ClientMultiPartUploader(s3Client: AmazonS3)
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int) tryCount: Int)
(implicit c: Config): IO[S3Action] = { (implicit c: Config): IO[S3Action] = {
logMultiPartUploadStart(localFile, tryCount) logMultiPartUploadStart(localFile, tryCount)
@ -136,10 +137,10 @@ private class S3ClientMultiPartUploader(s3Client: AmazonS3)
.handleErrorWith { .handleErrorWith {
case CancellableMultiPartUpload(e, uploadId) => case CancellableMultiPartUpload(e, uploadId) =>
if (tryCount >= c.maxRetries) IO(logErrorCancelling(e, localFile)) *> cancel(uploadId, localFile) *> IO.pure(ErroredS3Action(localFile.remoteKey, e)) if (tryCount >= c.maxRetries) IO(logErrorCancelling(e, localFile)) *> cancel(uploadId, localFile) *> IO.pure(ErroredS3Action(localFile.remoteKey, e))
else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, tryCount + 1) else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, progressListener, tryCount + 1)
case NonFatal(e) => case NonFatal(e) =>
if (tryCount >= c.maxRetries) IO(logErrorUnknown(e, localFile)) *> IO.pure(ErroredS3Action(localFile.remoteKey, e)) if (tryCount >= c.maxRetries) IO(logErrorUnknown(e, localFile)) *> IO.pure(ErroredS3Action(localFile.remoteKey, e))
else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, tryCount + 1) else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, progressListener, tryCount + 1)
} }
} }
} }

View file

@ -11,7 +11,7 @@ trait S3ClientMultiPartUploaderLogging
def logMultiPartUploadStart(localFile: LocalFile, def logMultiPartUploadStart(localFile: LocalFile,
tryCount: Int) tryCount: Int)
(implicit c: Config): Unit = (implicit c: Config): Unit =
log4(s"$prefix:upload:try $tryCount: ${localFile.remoteKey.key}") log1(s"$prefix:upload:try $tryCount: ${localFile.remoteKey.key}")
def logMultiPartUploadFinished(localFile: LocalFile) def logMultiPartUploadFinished(localFile: LocalFile)
(implicit c: Config): Unit = (implicit c: Config): Unit =
@ -56,7 +56,7 @@ trait S3ClientMultiPartUploaderLogging
uploadPartResponses: Stream[UploadPartResult], uploadPartResponses: Stream[UploadPartResult],
localFile: LocalFile) localFile: LocalFile)
(implicit c: Config): Unit = (implicit c: Config): Unit =
log4(s"$prefix:completed:parts ${uploadPartResponses.size}: ${localFile.remoteKey.key}") log1(s"$prefix:completed:parts ${uploadPartResponses.size}: ${localFile.remoteKey.key}")
def logMultiPartUploadCancelling(localFile: LocalFile) def logMultiPartUploadCancelling(localFile: LocalFile)
(implicit c: Config): Unit = (implicit c: Config): Unit =

View file

@ -1,12 +1,11 @@
package net.kemitix.s3thorp.awssdk package net.kemitix.s3thorp.awssdk
import cats.effect.IO import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient import com.amazonaws.services.s3.AmazonS3
import net.kemitix.s3thorp.{Bucket, Config, LocalFile, MD5Hash, UploadS3Action} import com.amazonaws.services.s3.model.PutObjectRequest
import software.amazon.awssdk.core.async.AsyncRequestBody import net.kemitix.s3thorp._
import software.amazon.awssdk.services.s3.model.PutObjectRequest
private class S3ClientPutObjectUploader(s3Client: S3CatsIOClient) private class S3ClientPutObjectUploader(s3Client: => AmazonS3)
extends S3ClientUploader extends S3ClientUploader
with S3ClientLogging with S3ClientLogging
with QuoteStripper { with QuoteStripper {
@ -16,17 +15,17 @@ private class S3ClientPutObjectUploader(s3Client: S3CatsIOClient)
override override
def upload(localFile: LocalFile, def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int) tryCount: Int)
(implicit c: Config): IO[UploadS3Action] = { (implicit c: Config): IO[UploadS3Action] = {
val request = PutObjectRequest.builder val request: PutObjectRequest =
.bucket(bucket.name) new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.key(localFile.remoteKey.key).build .withGeneralProgressListener(progressListener.listener)
val body = AsyncRequestBody.fromFile(localFile.file) IO(s3Client.putObject(request))
s3Client.putObject(request, body)
.bracket( .bracket(
logUploadStart(localFile, bucket))( logUploadStart(localFile, bucket))(
logUploadFinish(localFile, bucket)) logUploadFinish(localFile, bucket))
.map(_.eTag) .map(_.getETag)
.map(_ filter stripQuotes) .map(_ filter stripQuotes)
.map(MD5Hash) .map(MD5Hash)
.map(UploadS3Action(localFile.remoteKey, _)) .map(UploadS3Action(localFile.remoteKey, _))

View file

@ -10,6 +10,7 @@ trait S3ClientUploader {
def upload(localFile: LocalFile, def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int) tryCount: Int)
(implicit c: Config): IO[S3Action] (implicit c: Config): IO[S3Action]

View file

@ -1,24 +1,26 @@
package net.kemitix.s3thorp.awssdk package net.kemitix.s3thorp.awssdk
import cats.effect.IO import cats.effect.IO
import com.amazonaws.services.s3.transfer.TransferManagerBuilder import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3ClientBuilder} import com.amazonaws.services.s3.transfer.TransferManager
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp._ import net.kemitix.s3thorp._
import software.amazon.awssdk.services.s3.model.{Bucket => _, _} import software.amazon.awssdk.services.s3.model.{Bucket => _}
private class ThorpS3Client(s3Client: S3CatsIOClient) private class ThorpS3Client(ioS3Client: S3CatsIOClient,
amazonS3Client: => AmazonS3,
amazonS3TransferManager: => TransferManager)
extends S3Client extends S3Client
with S3ClientLogging with S3ClientLogging
with QuoteStripper { with QuoteStripper {
lazy val amazonS3Client = AmazonS3ClientBuilder.defaultClient // lazy val amazonS3Client = AmazonS3ClientBuilder.defaultClient
lazy val amazonS3TransferManager = TransferManagerBuilder.defaultTransferManager // lazy val amazonS3TransferManager = TransferManagerBuilder.defaultTransferManager
lazy val objectLister = new S3ClientObjectLister(s3Client) lazy val objectLister = new S3ClientObjectLister(ioS3Client)
lazy val copier = new S3ClientCopier(s3Client) lazy val copier = new S3ClientCopier(ioS3Client)
lazy val uploader = new S3ClientPutObjectUploader(s3Client) lazy val uploader = new S3ClientPutObjectUploader(amazonS3Client)
lazy val multiPartUploader = new S3ClientMultiPartTransferManager(amazonS3TransferManager) lazy val multiPartUploader = new S3ClientMultiPartTransferManager(amazonS3TransferManager)
lazy val deleter = new S3ClientDeleter(s3Client) lazy val deleter = new S3ClientDeleter(ioS3Client)
override def listObjects(bucket: Bucket, override def listObjects(bucket: Bucket,
prefix: RemoteKey) prefix: RemoteKey)
@ -36,10 +38,12 @@ private class ThorpS3Client(s3Client: S3CatsIOClient)
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int) tryCount: Int)
(implicit c: Config): IO[S3Action] = (implicit c: Config): IO[S3Action] =
if (multiPartUploader.accepts(localFile)) multiPartUploader.upload(localFile, bucket, 1)
else uploader.upload(localFile, bucket, tryCount) if (multiPartUploader.accepts(localFile)) multiPartUploader.upload(localFile, bucket, progressListener, 1)
else uploader.upload(localFile, bucket, progressListener, tryCount)
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey) remoteKey: RemoteKey)

View file

@ -0,0 +1,17 @@
package net.kemitix.s3thorp.awssdk
import com.amazonaws.event.{ProgressEvent, ProgressListener}
import net.kemitix.s3thorp.{Config, LocalFile}
class UploadProgressListener(localFile: LocalFile)
(implicit c: Config)
extends UploadProgressLogging {
def listener: ProgressListener = new ProgressListener {
override def progressChanged(progressEvent: ProgressEvent): Unit = {
val eventType = progressEvent.getEventType
if (eventType.isTransferEvent) logTransfer(localFile, eventType)
else logRequestCycle(localFile, eventType, progressEvent.getBytes, progressEvent.getBytesTransferred)
}
}
}

View file

@ -0,0 +1,22 @@
package net.kemitix.s3thorp.awssdk
import com.amazonaws.event.ProgressEventType
import net.kemitix.s3thorp.{Logging, LocalFile, Config}
trait UploadProgressLogging
extends Logging {
def logTransfer(localFile: LocalFile,
eventType: ProgressEventType)
(implicit c: Config): Unit =
log2(s"Transfer:${eventType.name}: ${localFile.remoteKey.key}")
def logRequestCycle(localFile: LocalFile,
eventType: ProgressEventType,
bytes: Long,
transferred: Long)
(implicit c: Config): Unit =
if (eventType equals ProgressEventType.REQUEST_BYTE_TRANSFER_EVENT) print('.')
else log3(s"Uploading:${eventType.name}:$transferred/$bytes:${localFile.remoteKey.key}")
}

View file

@ -1,12 +1,13 @@
package net.kemitix.s3thorp package net.kemitix.s3thorp
import cats.effect.IO import cats.effect.IO
import net.kemitix.s3thorp.awssdk.{S3ObjectsData, S3Client} import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData, UploadProgressListener}
trait DummyS3Client extends S3Client { trait DummyS3Client extends S3Client {
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int tryCount: Int
)(implicit c: Config): IO[UploadS3Action] = ??? )(implicit c: Config): IO[UploadS3Action] = ???

View file

@ -1,16 +1,27 @@
package net.kemitix.s3thorp package net.kemitix.s3thorp
import java.io.File import java.io.{File, InputStream}
import java.net.URL
import java.time.Instant import java.time.Instant
import java.util
import java.util.Date
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import cats.effect.IO import cats.effect.IO
import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData} import com.amazonaws.regions.Region
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.model.analytics.AnalyticsConfiguration
import com.amazonaws.services.s3.model.inventory.InventoryConfiguration
import com.amazonaws.services.s3.model.metrics.MetricsConfiguration
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import com.amazonaws.services.s3.waiters.AmazonS3Waiters
import com.amazonaws.services.s3.{AmazonS3, S3ClientOptions, S3ResponseMetadata, model}
import com.amazonaws.{AmazonWebServiceRequest, HttpMethod}
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import software.amazon.awssdk.core.async.AsyncRequestBody import net.kemitix.s3thorp.awssdk.{MyAmazonS3, S3Client, S3ObjectsData, UploadProgressListener}
import software.amazon.awssdk.services.s3.{S3AsyncClient => JavaS3AsyncClient}
import software.amazon.awssdk.services.s3 import software.amazon.awssdk.services.s3
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response, PutObjectRequest, PutObjectResponse} import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response}
import software.amazon.awssdk.services.s3.{S3AsyncClient => JavaS3AsyncClient}
class SyncSuite class SyncSuite
extends UnitTest extends UnitTest
@ -20,6 +31,11 @@ class SyncSuite
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val lastModified = LastModified(Instant.now) private val lastModified = LastModified(Instant.now)
val fileToKey: File => RemoteKey = generateKey(source, prefix)
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
val rootFile = aLocalFile("root-file", rootHash, source, fileToKey)
val leafFile = aLocalFile("subdir/leaf-file", leafHash, source, fileToKey)
describe("s3client thunk") { describe("s3client thunk") {
val testBucket = Bucket("bucket") val testBucket = Bucket("bucket")
@ -28,9 +44,11 @@ class SyncSuite
describe("upload") { describe("upload") {
val md5Hash = MD5Hash("the-hash") val md5Hash = MD5Hash("the-hash")
val testLocalFile = aLocalFile("file", md5Hash, source, generateKey(source, prefix)) val testLocalFile = aLocalFile("file", md5Hash, source, generateKey(source, prefix))
val progressListener = new UploadProgressListener(testLocalFile)
val sync = new Sync(new S3Client with DummyS3Client { val sync = new Sync(new S3Client with DummyS3Client {
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int) tryCount: Int)
(implicit c: Config) = IO { (implicit c: Config) = IO {
assert(bucket == testBucket) assert(bucket == testBucket)
@ -39,11 +57,16 @@ class SyncSuite
}) })
it("delegates unmodified to the S3Client") { it("delegates unmodified to the S3Client") {
assertResult(UploadS3Action(RemoteKey(prefix.key + "/file"), md5Hash))( assertResult(UploadS3Action(RemoteKey(prefix.key + "/file"), md5Hash))(
sync.upload(testLocalFile, testBucket, 1). sync.upload(testLocalFile, testBucket, progressListener, 1).
unsafeRunSync()) unsafeRunSync())
} }
} }
} }
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile) = {
(bucket.name, remoteKey.key, localFile.file)
}
describe("run") { describe("run") {
val testBucket = Bucket("bucket") val testBucket = Bucket("bucket")
val source = Resource(this, "upload") val source = Resource(this, "upload")
@ -51,8 +74,6 @@ class SyncSuite
val config = Config(Bucket("bucket"), RemoteKey("prefix"), source = source) val config = Config(Bucket("bucket"), RemoteKey("prefix"), source = source)
val rootRemoteKey = RemoteKey("prefix/root-file") val rootRemoteKey = RemoteKey("prefix/root-file")
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
describe("when all files should be uploaded") { describe("when all files should be uploaded") {
val sync = new RecordingSync(testBucket, new DummyS3Client {}, S3ObjectsData( val sync = new RecordingSync(testBucket, new DummyS3Client {}, S3ObjectsData(
byHash = Map(), byHash = Map(),
@ -144,16 +165,19 @@ class SyncSuite
} }
} }
describe("io actions execute") { describe("io actions execute") {
val recordingS3ClientLegacy = new RecordingS3ClientLegacy
val recordingS3Client = new RecordingS3Client val recordingS3Client = new RecordingS3Client
val client = S3Client.createClient(recordingS3Client) val transferManager = TransferManagerBuilder.standard
.withS3Client(recordingS3Client).build
val client = S3Client.createClient(recordingS3ClientLegacy, recordingS3Client, transferManager)
val sync = new Sync(client) val sync = new Sync(client)
sync.run(config).unsafeRunSync sync.run(config).unsafeRunSync
it("invokes the underlying Java s3client") { it("invokes the underlying Java s3client") {
val expected = Set( val expected = Set(
PutObjectRequest.builder().bucket(testBucket.name).key(rootRemoteKey.key).build(), putObjectRequest(testBucket, rootRemoteKey, rootFile),
PutObjectRequest.builder().bucket(testBucket.name).key(leafRemoteKey.key).build() putObjectRequest(testBucket, leafRemoteKey, leafFile)
) )
val result = recordingS3Client.puts val result = recordingS3Client.puts map {r => (r.getBucketName, r.getKey, r.getFile)}
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
@ -181,6 +205,7 @@ class SyncSuite
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int tryCount: Int
)(implicit c: Config) = IO { )(implicit c: Config) = IO {
if (bucket == testBucket) if (bucket == testBucket)
@ -207,7 +232,7 @@ class SyncSuite
} }
} }
class RecordingS3Client extends S3AsyncClient { class RecordingS3ClientLegacy extends S3AsyncClient {
var lists: Set[ListObjectsV2Request] = Set() var lists: Set[ListObjectsV2Request] = Set()
var puts: Set[PutObjectRequest] = Set() var puts: Set[PutObjectRequest] = Set()
override val underlying: s3.S3AsyncClient = new JavaS3AsyncClient { override val underlying: s3.S3AsyncClient = new JavaS3AsyncClient {
@ -220,12 +245,19 @@ class SyncSuite
CompletableFuture.completedFuture(ListObjectsV2Response.builder().build()) CompletableFuture.completedFuture(ListObjectsV2Response.builder().build())
} }
override def putObject(putObjectRequest: PutObjectRequest, }
requestBody: AsyncRequestBody): CompletableFuture[PutObjectResponse] = { }
class RecordingS3Client extends MyAmazonS3 {
var lists: Set[ListObjectsV2Request] = Set()
var puts: Set[PutObjectRequest] = Set()
override def putObject(putObjectRequest: PutObjectRequest): PutObjectResult = {
puts += putObjectRequest puts += putObjectRequest
CompletableFuture.completedFuture(PutObjectResponse.builder().eTag("not-null").build()) val result = new PutObjectResult
result.setETag("not-null")
result
} }
} }
}
} }

View file

@ -0,0 +1,375 @@
package net.kemitix.s3thorp.awssdk
import java.io.{File, InputStream}
import java.net.URL
import java.util
import java.util.Date
import com.amazonaws.{AmazonWebServiceRequest, HttpMethod}
import com.amazonaws.regions.Region
import com.amazonaws.services.s3.model.analytics.AnalyticsConfiguration
import com.amazonaws.services.s3.model.inventory.InventoryConfiguration
import com.amazonaws.services.s3.{AmazonS3, S3ClientOptions, S3ResponseMetadata, model}
import com.amazonaws.services.s3.model.metrics.MetricsConfiguration
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.waiters.AmazonS3Waiters
abstract class MyAmazonS3 extends AmazonS3 {
override def setEndpoint(endpoint: String): Unit = ???
override def setRegion(region: Region): Unit = ???
override def setS3ClientOptions(clientOptions: S3ClientOptions): Unit = ???
override def changeObjectStorageClass(bucketName: String, key: String, newStorageClass: StorageClass): Unit = ???
override def setObjectRedirectLocation(bucketName: String, key: String, newRedirectLocation: String): Unit = ???
override def listObjects(bucketName: String): ObjectListing = ???
override def listObjects(bucketName: String, prefix: String): ObjectListing = ???
override def listObjects(listObjectsRequest: ListObjectsRequest): ObjectListing = ???
override def listObjectsV2(bucketName: String): ListObjectsV2Result = ???
override def listObjectsV2(bucketName: String, prefix: String): ListObjectsV2Result = ???
override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request): ListObjectsV2Result = ???
override def listNextBatchOfObjects(previousObjectListing: ObjectListing): ObjectListing = ???
override def listNextBatchOfObjects(listNextBatchOfObjectsRequest: ListNextBatchOfObjectsRequest): ObjectListing = ???
override def listVersions(bucketName: String, prefix: String): VersionListing = ???
override def listNextBatchOfVersions(previousVersionListing: VersionListing): VersionListing = ???
override def listNextBatchOfVersions(listNextBatchOfVersionsRequest: ListNextBatchOfVersionsRequest): VersionListing = ???
override def listVersions(bucketName: String, prefix: String, keyMarker: String, versionIdMarker: String, delimiter: String, maxResults: Integer): VersionListing = ???
override def listVersions(listVersionsRequest: ListVersionsRequest): VersionListing = ???
override def getS3AccountOwner: Owner = ???
override def getS3AccountOwner(getS3AccountOwnerRequest: GetS3AccountOwnerRequest): Owner = ???
override def doesBucketExist(bucketName: String): Boolean = ???
override def doesBucketExistV2(bucketName: String): Boolean = ???
override def headBucket(headBucketRequest: HeadBucketRequest): HeadBucketResult = ???
override def listBuckets(): util.List[Bucket] = ???
override def listBuckets(listBucketsRequest: ListBucketsRequest): util.List[Bucket] = ???
override def getBucketLocation(bucketName: String): String = ???
override def getBucketLocation(getBucketLocationRequest: GetBucketLocationRequest): String = ???
override def createBucket(createBucketRequest: CreateBucketRequest): Bucket = ???
override def createBucket(bucketName: String): Bucket = ???
override def createBucket(bucketName: String, region: model.Region): Bucket = ???
override def createBucket(bucketName: String, region: String): Bucket = ???
override def getObjectAcl(bucketName: String, key: String): AccessControlList = ???
override def getObjectAcl(bucketName: String, key: String, versionId: String): AccessControlList = ???
override def getObjectAcl(getObjectAclRequest: GetObjectAclRequest): AccessControlList = ???
override def setObjectAcl(bucketName: String, key: String, acl: AccessControlList): Unit = ???
override def setObjectAcl(bucketName: String, key: String, acl: CannedAccessControlList): Unit = ???
override def setObjectAcl(bucketName: String, key: String, versionId: String, acl: AccessControlList): Unit = ???
override def setObjectAcl(bucketName: String, key: String, versionId: String, acl: CannedAccessControlList): Unit = ???
override def setObjectAcl(setObjectAclRequest: SetObjectAclRequest): Unit = ???
override def getBucketAcl(bucketName: String): AccessControlList = ???
override def setBucketAcl(setBucketAclRequest: SetBucketAclRequest): Unit = ???
override def getBucketAcl(getBucketAclRequest: GetBucketAclRequest): AccessControlList = ???
override def setBucketAcl(bucketName: String, acl: AccessControlList): Unit = ???
override def setBucketAcl(bucketName: String, acl: CannedAccessControlList): Unit = ???
override def getObjectMetadata(bucketName: String, key: String): ObjectMetadata = ???
override def getObjectMetadata(getObjectMetadataRequest: GetObjectMetadataRequest): ObjectMetadata = ???
override def getObject(bucketName: String, key: String): S3Object = ???
override def getObject(getObjectRequest: GetObjectRequest): S3Object = ???
override def getObject(getObjectRequest: GetObjectRequest, destinationFile: File): ObjectMetadata = ???
override def getObjectAsString(bucketName: String, key: String): String = ???
override def getObjectTagging(getObjectTaggingRequest: GetObjectTaggingRequest): GetObjectTaggingResult = ???
override def setObjectTagging(setObjectTaggingRequest: SetObjectTaggingRequest): SetObjectTaggingResult = ???
override def deleteObjectTagging(deleteObjectTaggingRequest: DeleteObjectTaggingRequest): DeleteObjectTaggingResult = ???
override def deleteBucket(deleteBucketRequest: DeleteBucketRequest): Unit = ???
override def deleteBucket(bucketName: String): Unit = ???
override def putObject(putObjectRequest: PutObjectRequest): PutObjectResult = ???
override def putObject(bucketName: String, key: String, file: File): PutObjectResult = ???
override def putObject(bucketName: String, key: String, input: InputStream, metadata: ObjectMetadata): PutObjectResult = ???
override def putObject(bucketName: String, key: String, content: String): PutObjectResult = ???
override def copyObject(sourceBucketName: String, sourceKey: String, destinationBucketName: String, destinationKey: String): CopyObjectResult = ???
override def copyObject(copyObjectRequest: CopyObjectRequest): CopyObjectResult = ???
override def copyPart(copyPartRequest: CopyPartRequest): CopyPartResult = ???
override def deleteObject(bucketName: String, key: String): Unit = ???
override def deleteObject(deleteObjectRequest: DeleteObjectRequest): Unit = ???
override def deleteObjects(deleteObjectsRequest: DeleteObjectsRequest): DeleteObjectsResult = ???
override def deleteVersion(bucketName: String, key: String, versionId: String): Unit = ???
override def deleteVersion(deleteVersionRequest: DeleteVersionRequest): Unit = ???
override def getBucketLoggingConfiguration(bucketName: String): BucketLoggingConfiguration = ???
override def getBucketLoggingConfiguration(getBucketLoggingConfigurationRequest: GetBucketLoggingConfigurationRequest): BucketLoggingConfiguration = ???
override def setBucketLoggingConfiguration(setBucketLoggingConfigurationRequest: SetBucketLoggingConfigurationRequest): Unit = ???
override def getBucketVersioningConfiguration(bucketName: String): BucketVersioningConfiguration = ???
override def getBucketVersioningConfiguration(getBucketVersioningConfigurationRequest: GetBucketVersioningConfigurationRequest): BucketVersioningConfiguration = ???
override def setBucketVersioningConfiguration(setBucketVersioningConfigurationRequest: SetBucketVersioningConfigurationRequest): Unit = ???
override def getBucketLifecycleConfiguration(bucketName: String): BucketLifecycleConfiguration = ???
override def getBucketLifecycleConfiguration(getBucketLifecycleConfigurationRequest: GetBucketLifecycleConfigurationRequest): BucketLifecycleConfiguration = ???
override def setBucketLifecycleConfiguration(bucketName: String, bucketLifecycleConfiguration: BucketLifecycleConfiguration): Unit = ???
override def setBucketLifecycleConfiguration(setBucketLifecycleConfigurationRequest: SetBucketLifecycleConfigurationRequest): Unit = ???
override def deleteBucketLifecycleConfiguration(bucketName: String): Unit = ???
override def deleteBucketLifecycleConfiguration(deleteBucketLifecycleConfigurationRequest: DeleteBucketLifecycleConfigurationRequest): Unit = ???
override def getBucketCrossOriginConfiguration(bucketName: String): BucketCrossOriginConfiguration = ???
override def getBucketCrossOriginConfiguration(getBucketCrossOriginConfigurationRequest: GetBucketCrossOriginConfigurationRequest): BucketCrossOriginConfiguration = ???
override def setBucketCrossOriginConfiguration(bucketName: String, bucketCrossOriginConfiguration: BucketCrossOriginConfiguration): Unit = ???
override def setBucketCrossOriginConfiguration(setBucketCrossOriginConfigurationRequest: SetBucketCrossOriginConfigurationRequest): Unit = ???
override def deleteBucketCrossOriginConfiguration(bucketName: String): Unit = ???
override def deleteBucketCrossOriginConfiguration(deleteBucketCrossOriginConfigurationRequest: DeleteBucketCrossOriginConfigurationRequest): Unit = ???
override def getBucketTaggingConfiguration(bucketName: String): BucketTaggingConfiguration = ???
override def getBucketTaggingConfiguration(getBucketTaggingConfigurationRequest: GetBucketTaggingConfigurationRequest): BucketTaggingConfiguration = ???
override def setBucketTaggingConfiguration(bucketName: String, bucketTaggingConfiguration: BucketTaggingConfiguration): Unit = ???
override def setBucketTaggingConfiguration(setBucketTaggingConfigurationRequest: SetBucketTaggingConfigurationRequest): Unit = ???
override def deleteBucketTaggingConfiguration(bucketName: String): Unit = ???
override def deleteBucketTaggingConfiguration(deleteBucketTaggingConfigurationRequest: DeleteBucketTaggingConfigurationRequest): Unit = ???
override def getBucketNotificationConfiguration(bucketName: String): BucketNotificationConfiguration = ???
override def getBucketNotificationConfiguration(getBucketNotificationConfigurationRequest: GetBucketNotificationConfigurationRequest): BucketNotificationConfiguration = ???
override def setBucketNotificationConfiguration(setBucketNotificationConfigurationRequest: SetBucketNotificationConfigurationRequest): Unit = ???
override def setBucketNotificationConfiguration(bucketName: String, bucketNotificationConfiguration: BucketNotificationConfiguration): Unit = ???
override def getBucketWebsiteConfiguration(bucketName: String): BucketWebsiteConfiguration = ???
override def getBucketWebsiteConfiguration(getBucketWebsiteConfigurationRequest: GetBucketWebsiteConfigurationRequest): BucketWebsiteConfiguration = ???
override def setBucketWebsiteConfiguration(bucketName: String, configuration: BucketWebsiteConfiguration): Unit = ???
override def setBucketWebsiteConfiguration(setBucketWebsiteConfigurationRequest: SetBucketWebsiteConfigurationRequest): Unit = ???
override def deleteBucketWebsiteConfiguration(bucketName: String): Unit = ???
override def deleteBucketWebsiteConfiguration(deleteBucketWebsiteConfigurationRequest: DeleteBucketWebsiteConfigurationRequest): Unit = ???
override def getBucketPolicy(bucketName: String): BucketPolicy = ???
override def getBucketPolicy(getBucketPolicyRequest: GetBucketPolicyRequest): BucketPolicy = ???
override def setBucketPolicy(bucketName: String, policyText: String): Unit = ???
override def setBucketPolicy(setBucketPolicyRequest: SetBucketPolicyRequest): Unit = ???
override def deleteBucketPolicy(bucketName: String): Unit = ???
override def deleteBucketPolicy(deleteBucketPolicyRequest: DeleteBucketPolicyRequest): Unit = ???
override def generatePresignedUrl(bucketName: String, key: String, expiration: Date): URL = ???
override def generatePresignedUrl(bucketName: String, key: String, expiration: Date, method: HttpMethod): URL = ???
override def generatePresignedUrl(generatePresignedUrlRequest: GeneratePresignedUrlRequest): URL = ???
override def initiateMultipartUpload(request: InitiateMultipartUploadRequest): InitiateMultipartUploadResult = ???
override def uploadPart(request: UploadPartRequest): UploadPartResult = ???
override def listParts(request: ListPartsRequest): PartListing = ???
override def abortMultipartUpload(request: AbortMultipartUploadRequest): Unit = ???
override def completeMultipartUpload(request: CompleteMultipartUploadRequest): CompleteMultipartUploadResult = ???
override def listMultipartUploads(request: ListMultipartUploadsRequest): MultipartUploadListing = ???
override def getCachedResponseMetadata(request: AmazonWebServiceRequest): S3ResponseMetadata = ???
override def restoreObject(request: RestoreObjectRequest): Unit = ???
override def restoreObjectV2(request: RestoreObjectRequest): RestoreObjectResult = ???
override def restoreObject(bucketName: String, key: String, expirationInDays: Int): Unit = ???
override def enableRequesterPays(bucketName: String): Unit = ???
override def disableRequesterPays(bucketName: String): Unit = ???
override def isRequesterPaysEnabled(bucketName: String): Boolean = ???
override def setBucketReplicationConfiguration(bucketName: String, configuration: BucketReplicationConfiguration): Unit = ???
override def setBucketReplicationConfiguration(setBucketReplicationConfigurationRequest: SetBucketReplicationConfigurationRequest): Unit = ???
override def getBucketReplicationConfiguration(bucketName: String): BucketReplicationConfiguration = ???
override def getBucketReplicationConfiguration(getBucketReplicationConfigurationRequest: GetBucketReplicationConfigurationRequest): BucketReplicationConfiguration = ???
override def deleteBucketReplicationConfiguration(bucketName: String): Unit = ???
override def deleteBucketReplicationConfiguration(request: DeleteBucketReplicationConfigurationRequest): Unit = ???
override def doesObjectExist(bucketName: String, objectName: String): Boolean = ???
override def getBucketAccelerateConfiguration(bucketName: String): BucketAccelerateConfiguration = ???
override def getBucketAccelerateConfiguration(getBucketAccelerateConfigurationRequest: GetBucketAccelerateConfigurationRequest): BucketAccelerateConfiguration = ???
override def setBucketAccelerateConfiguration(bucketName: String, accelerateConfiguration: BucketAccelerateConfiguration): Unit = ???
override def setBucketAccelerateConfiguration(setBucketAccelerateConfigurationRequest: SetBucketAccelerateConfigurationRequest): Unit = ???
override def deleteBucketMetricsConfiguration(bucketName: String, id: String): DeleteBucketMetricsConfigurationResult = ???
override def deleteBucketMetricsConfiguration(deleteBucketMetricsConfigurationRequest: DeleteBucketMetricsConfigurationRequest): DeleteBucketMetricsConfigurationResult = ???
override def getBucketMetricsConfiguration(bucketName: String, id: String): GetBucketMetricsConfigurationResult = ???
override def getBucketMetricsConfiguration(getBucketMetricsConfigurationRequest: GetBucketMetricsConfigurationRequest): GetBucketMetricsConfigurationResult = ???
override def setBucketMetricsConfiguration(bucketName: String, metricsConfiguration: MetricsConfiguration): SetBucketMetricsConfigurationResult = ???
override def setBucketMetricsConfiguration(setBucketMetricsConfigurationRequest: SetBucketMetricsConfigurationRequest): SetBucketMetricsConfigurationResult = ???
override def listBucketMetricsConfigurations(listBucketMetricsConfigurationsRequest: ListBucketMetricsConfigurationsRequest): ListBucketMetricsConfigurationsResult = ???
override def deleteBucketAnalyticsConfiguration(bucketName: String, id: String): DeleteBucketAnalyticsConfigurationResult = ???
override def deleteBucketAnalyticsConfiguration(deleteBucketAnalyticsConfigurationRequest: DeleteBucketAnalyticsConfigurationRequest): DeleteBucketAnalyticsConfigurationResult = ???
override def getBucketAnalyticsConfiguration(bucketName: String, id: String): GetBucketAnalyticsConfigurationResult = ???
override def getBucketAnalyticsConfiguration(getBucketAnalyticsConfigurationRequest: GetBucketAnalyticsConfigurationRequest): GetBucketAnalyticsConfigurationResult = ???
override def setBucketAnalyticsConfiguration(bucketName: String, analyticsConfiguration: AnalyticsConfiguration): SetBucketAnalyticsConfigurationResult = ???
override def setBucketAnalyticsConfiguration(setBucketAnalyticsConfigurationRequest: SetBucketAnalyticsConfigurationRequest): SetBucketAnalyticsConfigurationResult = ???
override def listBucketAnalyticsConfigurations(listBucketAnalyticsConfigurationsRequest: ListBucketAnalyticsConfigurationsRequest): ListBucketAnalyticsConfigurationsResult = ???
override def deleteBucketInventoryConfiguration(bucketName: String, id: String): DeleteBucketInventoryConfigurationResult = ???
override def deleteBucketInventoryConfiguration(deleteBucketInventoryConfigurationRequest: DeleteBucketInventoryConfigurationRequest): DeleteBucketInventoryConfigurationResult = ???
override def getBucketInventoryConfiguration(bucketName: String, id: String): GetBucketInventoryConfigurationResult = ???
override def getBucketInventoryConfiguration(getBucketInventoryConfigurationRequest: GetBucketInventoryConfigurationRequest): GetBucketInventoryConfigurationResult = ???
override def setBucketInventoryConfiguration(bucketName: String, inventoryConfiguration: InventoryConfiguration): SetBucketInventoryConfigurationResult = ???
override def setBucketInventoryConfiguration(setBucketInventoryConfigurationRequest: SetBucketInventoryConfigurationRequest): SetBucketInventoryConfigurationResult = ???
override def listBucketInventoryConfigurations(listBucketInventoryConfigurationsRequest: ListBucketInventoryConfigurationsRequest): ListBucketInventoryConfigurationsResult = ???
override def deleteBucketEncryption(bucketName: String): DeleteBucketEncryptionResult = ???
override def deleteBucketEncryption(request: DeleteBucketEncryptionRequest): DeleteBucketEncryptionResult = ???
override def getBucketEncryption(bucketName: String): GetBucketEncryptionResult = ???
override def getBucketEncryption(request: GetBucketEncryptionRequest): GetBucketEncryptionResult = ???
override def setBucketEncryption(setBucketEncryptionRequest: SetBucketEncryptionRequest): SetBucketEncryptionResult = ???
override def setPublicAccessBlock(request: SetPublicAccessBlockRequest): SetPublicAccessBlockResult = ???
override def getPublicAccessBlock(request: GetPublicAccessBlockRequest): GetPublicAccessBlockResult = ???
override def deletePublicAccessBlock(request: DeletePublicAccessBlockRequest): DeletePublicAccessBlockResult = ???
override def getBucketPolicyStatus(request: GetBucketPolicyStatusRequest): GetBucketPolicyStatusResult = ???
override def selectObjectContent(selectRequest: SelectObjectContentRequest): SelectObjectContentResult = ???
override def setObjectLegalHold(setObjectLegalHoldRequest: SetObjectLegalHoldRequest): SetObjectLegalHoldResult = ???
override def getObjectLegalHold(getObjectLegalHoldRequest: GetObjectLegalHoldRequest): GetObjectLegalHoldResult = ???
override def setObjectLockConfiguration(setObjectLockConfigurationRequest: SetObjectLockConfigurationRequest): SetObjectLockConfigurationResult = ???
override def getObjectLockConfiguration(getObjectLockConfigurationRequest: GetObjectLockConfigurationRequest): GetObjectLockConfigurationResult = ???
override def setObjectRetention(setObjectRetentionRequest: SetObjectRetentionRequest): SetObjectRetentionResult = ???
override def getObjectRetention(getObjectRetentionRequest: GetObjectRetentionRequest): GetObjectRetentionResult = ???
override def download(presignedUrlDownloadRequest: PresignedUrlDownloadRequest): PresignedUrlDownloadResult = ???
override def download(presignedUrlDownloadRequest: PresignedUrlDownloadRequest, destinationFile: File): Unit = ???
override def upload(presignedUrlUploadRequest: PresignedUrlUploadRequest): PresignedUrlUploadResult = ???
override def shutdown(): Unit = ???
override def getRegion: model.Region = ???
override def getRegionName: String = ???
override def getUrl(bucketName: String, key: String): URL = ???
override def waiters(): AmazonS3Waiters = ???
}

View file

@ -6,7 +6,7 @@ import java.time.Instant
import com.amazonaws.AmazonClientException import com.amazonaws.AmazonClientException
import com.amazonaws.services.s3.model import com.amazonaws.services.s3.model
import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{PauseResult, PersistableUpload, Transfer, TransferManager, TransferProgress, Upload} import com.amazonaws.services.s3.transfer.{PauseResult, PersistableUpload, Transfer, TransferManager, TransferManagerBuilder, TransferProgress, Upload}
import net.kemitix.s3thorp.{Bucket, Config, KeyGenerator, LastModified, MD5Hash, MD5HashGenerator, RemoteKey, Resource, UnitTest, UploadS3Action} import net.kemitix.s3thorp.{Bucket, Config, KeyGenerator, LastModified, MD5Hash, MD5HashGenerator, RemoteKey, Resource, UnitTest, UploadS3Action}
class S3ClientMultiPartTransferManagerSuite class S3ClientMultiPartTransferManagerSuite
@ -43,16 +43,25 @@ class S3ClientMultiPartTransferManagerSuite
} }
} }
describe("upload") { describe("upload") {
pending
// how much of this test is testing the amazonTransferManager
// Should we just test that the correct parameters are passed to initiate, or will this test
// just collapse and die if the amazonS3 doesn't respond properly to TransferManager input
// dies when putObject is called
val returnedKey = RemoteKey("returned-key") val returnedKey = RemoteKey("returned-key")
val returnedHash = MD5Hash("returned-hash") val returnedHash = MD5Hash("returned-hash")
val bigFile = aLocalFile("small-file", MD5Hash("the-hash"), source, fileToKey) val bigFile = aLocalFile("small-file", MD5Hash("the-hash"), source, fileToKey)
val transferManager = new MyTransferManager( val progressListener = new UploadProgressListener(bigFile)
(config.bucket.name, bigFile.remoteKey.key, bigFile.file), val amazonS3 = new MyAmazonS3 {}
returnedKey, returnedHash) val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
val uploader = new S3ClientMultiPartTransferManager(transferManager) new MyTransferManager(
signature = (config.bucket.name, bigFile.remoteKey.key, bigFile.file),
returnedKey = returnedKey,
returnedHash = returnedHash)
val uploader = new S3ClientMultiPartTransferManager(amazonS3TransferManager)
it("should upload") { it("should upload") {
val expected = UploadS3Action(returnedKey, returnedHash) val expected = UploadS3Action(returnedKey, returnedHash)
val result = uploader.upload(bigFile, config.bucket, 1).unsafeRunSync val result = uploader.upload(bigFile, config.bucket, progressListener, 1).unsafeRunSync
assertResult(expected)(result) assertResult(expected)(result)
} }
} }

View file

@ -54,6 +54,7 @@ class S3ClientMultiPartUploaderSuite
describe("mulit-part uploader upload") { describe("mulit-part uploader upload") {
val theFile = aLocalFile("big-file", MD5Hash(""), source, fileToKey) val theFile = aLocalFile("big-file", MD5Hash(""), source, fileToKey)
val progressListener = new UploadProgressListener(theFile)
val uploadId = "upload-id" val uploadId = "upload-id"
val createUploadResponse = new InitiateMultipartUploadResult() val createUploadResponse = new InitiateMultipartUploadResult()
createUploadResponse.setBucketName(config.bucket.name) createUploadResponse.setBucketName(config.bucket.name)
@ -167,7 +168,7 @@ class S3ClientMultiPartUploaderSuite
describe("upload") { describe("upload") {
describe("when all okay") { describe("when all okay") {
val uploader = new RecordingMultiPartUploader() val uploader = new RecordingMultiPartUploader()
uploader.upload(theFile, config.bucket, 1).unsafeRunSync uploader.upload(theFile, config.bucket, progressListener, 1).unsafeRunSync
it("should initiate the upload") { it("should initiate the upload") {
assert(uploader.initiated.get) assert(uploader.initiated.get)
} }
@ -180,7 +181,7 @@ class S3ClientMultiPartUploaderSuite
} }
describe("when initiate upload fails") { describe("when initiate upload fails") {
val uploader = new RecordingMultiPartUploader(initOkay = false) val uploader = new RecordingMultiPartUploader(initOkay = false)
uploader.upload(theFile, config.bucket, 1).unsafeRunSync uploader.upload(theFile, config.bucket, progressListener, 1).unsafeRunSync
it("should not upload any parts") { it("should not upload any parts") {
assertResult(Set())(uploader.partsUploaded.get) assertResult(Set())(uploader.partsUploaded.get)
} }
@ -190,7 +191,7 @@ class S3ClientMultiPartUploaderSuite
} }
describe("when uploading a part fails once") { describe("when uploading a part fails once") {
val uploader = new RecordingMultiPartUploader(partTriesRequired = 2) val uploader = new RecordingMultiPartUploader(partTriesRequired = 2)
uploader.upload(theFile, config.bucket, 1).unsafeRunSync uploader.upload(theFile, config.bucket, progressListener, 1).unsafeRunSync
it("should initiate the upload") { it("should initiate the upload") {
assert(uploader.initiated.get) assert(uploader.initiated.get)
} }
@ -203,7 +204,7 @@ class S3ClientMultiPartUploaderSuite
} }
describe("when uploading a part fails too many times") { describe("when uploading a part fails too many times") {
val uploader = new RecordingMultiPartUploader(partTriesRequired = 4) val uploader = new RecordingMultiPartUploader(partTriesRequired = 4)
uploader.upload(theFile, config.bucket, 1).unsafeRunSync uploader.upload(theFile, config.bucket, progressListener, 1).unsafeRunSync
it("should initiate the upload") { it("should initiate the upload") {
assert(uploader.initiated.get) assert(uploader.initiated.get)
} }

View file

@ -4,6 +4,9 @@ import java.io.File
import java.time.Instant import java.time.Instant
import cats.effect.IO import cats.effect.IO
import com.amazonaws.services.s3.model
import com.amazonaws.services.s3.model.PutObjectResult
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp._ import net.kemitix.s3thorp._
import software.amazon.awssdk.services.s3.model.{PutObjectRequest, PutObjectResponse} import software.amazon.awssdk.services.s3.model.{PutObjectRequest, PutObjectResponse}
@ -75,22 +78,31 @@ class S3ClientSuite
} }
describe("upload") { describe("upload") {
def invoke(s3Client: ThorpS3Client, localFile: LocalFile, bucket: Bucket) = def invoke(s3Client: ThorpS3Client, localFile: LocalFile, bucket: Bucket, progressListener: UploadProgressListener) =
s3Client.upload(localFile, bucket, 1).unsafeRunSync s3Client.upload(localFile, bucket, progressListener, 1).unsafeRunSync
describe("when uploading a file") { describe("when uploading a file") {
val md5Hash = MD5Hash("the-md5hash") val source = Resource(this, "../upload")
val md5Hash = new MD5HashGenerator {}.md5File(source.toPath.resolve("root-file").toFile)
val amazonS3 = new MyAmazonS3 {
override def putObject(putObjectRequest: model.PutObjectRequest): PutObjectResult = {
val result = new PutObjectResult
result.setETag(md5Hash.hash)
result
}
}
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
val s3Client = new ThorpS3Client( val s3Client = new ThorpS3Client(
new S3CatsIOClient with JavaClientWrapper { new S3CatsIOClient with JavaClientWrapper {
override def putObject(putObjectRequest: PutObjectRequest, requestBody: RB) = // override def putObject(putObjectRequest: PutObjectRequest, requestBody: RB) =
IO(PutObjectResponse.builder().eTag(md5Hash.hash).build()) // IO(PutObjectResponse.builder().eTag(md5Hash.hash).build())
}) }, amazonS3, amazonS3TransferManager)
val source = new File("/")
val prefix = RemoteKey("prefix") val prefix = RemoteKey("prefix")
val localFile: LocalFile = aLocalFile("/some/file", md5Hash, source, generateKey(source, prefix)) val localFile: LocalFile = aLocalFile("root-file", md5Hash, source, generateKey(source, prefix))
val bucket: Bucket = Bucket("a-bucket") val bucket: Bucket = Bucket("a-bucket")
val remoteKey: RemoteKey = RemoteKey("prefix/some/file") val remoteKey: RemoteKey = RemoteKey("prefix/root-file")
val progressListener = new UploadProgressListener(localFile)
it("should return hash of uploaded file") { it("should return hash of uploaded file") {
assertResult(UploadS3Action(remoteKey, md5Hash))(invoke(s3Client, localFile, bucket)) assertResult(UploadS3Action(remoteKey, md5Hash))(invoke(s3Client, localFile, bucket, progressListener))
} }
} }
} }

View file

@ -3,6 +3,8 @@ package net.kemitix.s3thorp.awssdk
import java.time.Instant import java.time.Instant
import cats.effect.IO import cats.effect.IO
import com.amazonaws.services.s3.model.{PutObjectRequest, PutObjectResult}
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp._ import net.kemitix.s3thorp._
@ -38,10 +40,12 @@ class ThorpS3ClientSuite extends FunSpec {
.contents(List(o1a, o1b, o2).asJava) .contents(List(o1a, o1b, o2).asJava)
.build() .build()
} }
val amazonS3 = new MyAmazonS3 {}
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
val s3client = new ThorpS3Client(new MyS3CatsIOClient { val s3client = new ThorpS3Client(new MyS3CatsIOClient {
override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request): IO[ListObjectsV2Response] = override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request): IO[ListObjectsV2Response] =
myFakeResponse myFakeResponse
}) }, amazonS3, amazonS3TransferManager)
it("should build list of hash lookups, with duplicate objects grouped by hash") { it("should build list of hash lookups, with duplicate objects grouped by hash") {
val expected = S3ObjectsData( val expected = S3ObjectsData(
byHash = Map( byHash = Map(