Refactor Uploader.upload(LocalFile,Bucket,Boolean,UploadEventListener,Int) (#129)
* [storate-api] Remove unused tryCount * [storage-aws] Uploader Reduce the number of parameters
This commit is contained in:
parent
899e0724c9
commit
4a54038933
7 changed files with 54 additions and 63 deletions
|
@ -36,8 +36,7 @@ case class UnversionedMirrorArchive(syncTotals: SyncTotals)
|
||||||
Storage.upload(
|
Storage.upload(
|
||||||
localFile,
|
localFile,
|
||||||
bucket,
|
bucket,
|
||||||
UploadEventListener(localFile, index, syncTotals, totalBytesSoFar),
|
UploadEventListener(localFile, index, syncTotals, totalBytesSoFar))
|
||||||
1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object UnversionedMirrorArchive {
|
object UnversionedMirrorArchive {
|
||||||
|
|
|
@ -20,10 +20,11 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData,
|
||||||
): TaskR[Console, S3ObjectsData] =
|
): TaskR[Console, S3ObjectsData] =
|
||||||
TaskR(s3ObjectData)
|
TaskR(s3ObjectData)
|
||||||
|
|
||||||
override def upload(localFile: LocalFile,
|
override def upload(
|
||||||
|
localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener,
|
||||||
tryCount: Int): UIO[StorageQueueEvent] = {
|
): UIO[StorageQueueEvent] = {
|
||||||
val (remoteKey, md5Hash) = uploadFiles(localFile.file)
|
val (remoteKey, md5Hash) = uploadFiles(localFile.file)
|
||||||
UIO(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash))
|
UIO(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash))
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ object Storage {
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener,
|
||||||
tryCount: Int
|
|
||||||
): ZIO[Storage with Config, Nothing, StorageQueueEvent]
|
): ZIO[Storage with Config, Nothing, StorageQueueEvent]
|
||||||
|
|
||||||
def copy(
|
def copy(
|
||||||
|
@ -57,8 +56,8 @@ object Storage {
|
||||||
override def upload(
|
override def upload(
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener
|
||||||
tryCount: Int): ZIO[Storage, Nothing, StorageQueueEvent] =
|
): ZIO[Storage, Nothing, StorageQueueEvent] =
|
||||||
uploadResult
|
uploadResult
|
||||||
|
|
||||||
override def copy(
|
override def copy(
|
||||||
|
@ -99,11 +98,9 @@ object Storage {
|
||||||
final def upload(
|
final def upload(
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener
|
||||||
tryCount: Int
|
|
||||||
): ZIO[Storage with Config, Nothing, StorageQueueEvent] =
|
): ZIO[Storage with Config, Nothing, StorageQueueEvent] =
|
||||||
ZIO.accessM(
|
ZIO.accessM(_.storage upload (localFile, bucket, uploadEventListener))
|
||||||
_.storage upload (localFile, bucket, uploadEventListener, tryCount))
|
|
||||||
|
|
||||||
final def copy(
|
final def copy(
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
|
|
|
@ -28,11 +28,9 @@ object S3Storage {
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener,
|
||||||
tryCount: Int): ZIO[Config, Nothing, StorageQueueEvent] =
|
): ZIO[Config, Nothing, StorageQueueEvent] =
|
||||||
Uploader.upload(transferManager)(localFile,
|
Uploader.upload(transferManager)(
|
||||||
bucket,
|
Uploader.Request(localFile, bucket, uploadEventListener))
|
||||||
uploadEventListener,
|
|
||||||
1)
|
|
||||||
|
|
||||||
override def copy(bucket: Bucket,
|
override def copy(bucket: Bucket,
|
||||||
sourceKey: RemoteKey,
|
sourceKey: RemoteKey,
|
||||||
|
|
|
@ -18,24 +18,24 @@ import zio.{UIO, ZIO}
|
||||||
|
|
||||||
trait Uploader {
|
trait Uploader {
|
||||||
|
|
||||||
def upload(transferManager: => AmazonTransferManager)(
|
case class Request(
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener
|
||||||
tryCount: Int
|
)
|
||||||
): ZIO[Config, Nothing, StorageQueueEvent] =
|
|
||||||
transfer(transferManager)(localFile, bucket, uploadEventListener)
|
def upload(transferManager: => AmazonTransferManager)(
|
||||||
.catchAll(handleError(localFile.remoteKey))
|
request: Request): ZIO[Config, Nothing, StorageQueueEvent] =
|
||||||
|
transfer(transferManager)(request)
|
||||||
|
.catchAll(handleError(request.localFile.remoteKey))
|
||||||
|
|
||||||
private def handleError(remoteKey: RemoteKey)(e: Throwable) =
|
private def handleError(remoteKey: RemoteKey)(e: Throwable) =
|
||||||
UIO(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e))
|
UIO(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e))
|
||||||
|
|
||||||
private def transfer(transferManager: => AmazonTransferManager)(
|
private def transfer(transferManager: => AmazonTransferManager)(
|
||||||
localFile: LocalFile,
|
request: Request
|
||||||
bucket: Bucket,
|
|
||||||
uploadEventListener: UploadEventListener
|
|
||||||
) =
|
) =
|
||||||
request(localFile, bucket, progressListener(uploadEventListener)) >>=
|
putObjectRequest(request) >>=
|
||||||
dispatch(transferManager)
|
dispatch(transferManager)
|
||||||
|
|
||||||
private def dispatch(transferManager: AmazonTransferManager)(
|
private def dispatch(transferManager: AmazonTransferManager)(
|
||||||
|
@ -49,18 +49,20 @@ trait Uploader {
|
||||||
MD5Hash(uploadResult.getETag)))
|
MD5Hash(uploadResult.getETag)))
|
||||||
}
|
}
|
||||||
|
|
||||||
private def request(
|
private def putObjectRequest(
|
||||||
localFile: LocalFile,
|
request: Request
|
||||||
bucket: Bucket,
|
|
||||||
listener: ProgressListener
|
|
||||||
) = {
|
) = {
|
||||||
val request =
|
val putRequest =
|
||||||
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
|
new PutObjectRequest(request.bucket.name,
|
||||||
.withMetadata(metadata(localFile))
|
request.localFile.remoteKey.key,
|
||||||
|
request.localFile.file)
|
||||||
|
.withMetadata(metadata(request.localFile))
|
||||||
for {
|
for {
|
||||||
batchMode <- Config.batchMode
|
batchMode <- Config.batchMode
|
||||||
r = if (batchMode) request
|
r = if (batchMode) putRequest
|
||||||
else request.withGeneralProgressListener(listener)
|
else
|
||||||
|
putRequest.withGeneralProgressListener(
|
||||||
|
progressListener(request.uploadEventListener))
|
||||||
} yield r
|
} yield r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,28 +25,31 @@ trait AmazonS3ClientTestFixture extends MockFactory {
|
||||||
|
|
||||||
override def listObjects(
|
override def listObjects(
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
prefix: RemoteKey): TaskR[Console, S3ObjectsData] =
|
prefix: RemoteKey
|
||||||
|
): TaskR[Console, S3ObjectsData] =
|
||||||
Lister.listObjects(client)(bucket, prefix)
|
Lister.listObjects(client)(bucket, prefix)
|
||||||
|
|
||||||
override def upload(
|
override def upload(
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener,
|
||||||
tryCount: Int): ZIO[Config, Nothing, StorageQueueEvent] =
|
): ZIO[Config, Nothing, StorageQueueEvent] =
|
||||||
Uploader.upload(transferManager)(localFile,
|
Uploader.upload(transferManager)(
|
||||||
bucket,
|
Uploader.Request(localFile, bucket, uploadEventListener))
|
||||||
uploadEventListener,
|
|
||||||
1)
|
|
||||||
|
|
||||||
override def copy(bucket: Bucket,
|
override def copy(
|
||||||
|
bucket: Bucket,
|
||||||
sourceKey: RemoteKey,
|
sourceKey: RemoteKey,
|
||||||
hash: MD5Hash,
|
hash: MD5Hash,
|
||||||
targetKey: RemoteKey): UIO[StorageQueueEvent] =
|
targetKey: RemoteKey
|
||||||
|
): UIO[StorageQueueEvent] =
|
||||||
Copier.copy(client)(
|
Copier.copy(client)(
|
||||||
Copier.Request(bucket, sourceKey, hash, targetKey))
|
Copier.Request(bucket, sourceKey, hash, targetKey))
|
||||||
|
|
||||||
override def delete(bucket: Bucket,
|
override def delete(
|
||||||
remoteKey: RemoteKey): UIO[StorageQueueEvent] =
|
bucket: Bucket,
|
||||||
|
remoteKey: RemoteKey
|
||||||
|
): UIO[StorageQueueEvent] =
|
||||||
Deleter.delete(client)(bucket, remoteKey)
|
Deleter.delete(client)(bucket, remoteKey)
|
||||||
|
|
||||||
override def shutdown: UIO[StorageQueueEvent] = {
|
override def shutdown: UIO[StorageQueueEvent] = {
|
||||||
|
|
|
@ -27,7 +27,6 @@ class UploaderTest extends FreeSpec with MockFactory {
|
||||||
val remoteKey = RemoteKey("aRemoteKey")
|
val remoteKey = RemoteKey("aRemoteKey")
|
||||||
val localFile = LocalFile(aFile, aSource, hashes, remoteKey)
|
val localFile = LocalFile(aFile, aSource, hashes, remoteKey)
|
||||||
val bucket = Bucket("aBucket")
|
val bucket = Bucket("aBucket")
|
||||||
val tryCount = 1
|
|
||||||
val uploadResult = new UploadResult
|
val uploadResult = new UploadResult
|
||||||
uploadResult.setKey(remoteKey.key)
|
uploadResult.setKey(remoteKey.key)
|
||||||
uploadResult.setETag(aHash.hash)
|
uploadResult.setETag(aHash.hash)
|
||||||
|
@ -47,8 +46,7 @@ class UploaderTest extends FreeSpec with MockFactory {
|
||||||
invoke(fixture.amazonS3TransferManager)(
|
invoke(fixture.amazonS3TransferManager)(
|
||||||
localFile,
|
localFile,
|
||||||
bucket,
|
bucket,
|
||||||
uploadEventListener,
|
uploadEventListener
|
||||||
tryCount
|
|
||||||
)
|
)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
|
@ -66,8 +64,7 @@ class UploaderTest extends FreeSpec with MockFactory {
|
||||||
invoke(fixture.amazonS3TransferManager)(
|
invoke(fixture.amazonS3TransferManager)(
|
||||||
localFile,
|
localFile,
|
||||||
bucket,
|
bucket,
|
||||||
uploadEventListener,
|
uploadEventListener
|
||||||
tryCount
|
|
||||||
)
|
)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
|
@ -85,8 +82,7 @@ class UploaderTest extends FreeSpec with MockFactory {
|
||||||
invoke(fixture.amazonS3TransferManager)(
|
invoke(fixture.amazonS3TransferManager)(
|
||||||
localFile,
|
localFile,
|
||||||
bucket,
|
bucket,
|
||||||
uploadEventListener,
|
uploadEventListener
|
||||||
tryCount
|
|
||||||
)
|
)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
|
@ -94,19 +90,14 @@ class UploaderTest extends FreeSpec with MockFactory {
|
||||||
def invoke(transferManager: AmazonTransferManager)(
|
def invoke(transferManager: AmazonTransferManager)(
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener
|
||||||
tryCount: Int
|
|
||||||
) = {
|
) = {
|
||||||
type TestEnv = Config
|
type TestEnv = Config
|
||||||
val testEnv: TestEnv = Config.Live
|
val testEnv: TestEnv = Config.Live
|
||||||
new DefaultRuntime {}.unsafeRunSync {
|
new DefaultRuntime {}.unsafeRunSync {
|
||||||
Uploader
|
Uploader
|
||||||
.upload(transferManager)(
|
.upload(transferManager)(
|
||||||
localFile,
|
Uploader.Request(localFile, bucket, uploadEventListener))
|
||||||
bucket,
|
|
||||||
uploadEventListener,
|
|
||||||
tryCount
|
|
||||||
)
|
|
||||||
.provide(testEnv)
|
.provide(testEnv)
|
||||||
}.toEither
|
}.toEither
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue