Split into subprojects (#36)

* [sbt] define existing single module project as legacyRoot

* [sbt] add empty cli module depending on legacyRoot

* [cli] move Main to cli module

* [cli] move ParseArgs to cli module

* [sbt] limit scope of scopt dependency to cli module

* [cli] moved logging config to cli module

* [cli] rename module directory

* [aws-api] added empty module

* [sbt] aggregate builds from cli

* [aws-lib] add empty module

* [core] add empty module

* [sbt] add comment graphing module dependencies

* [sbt] adjust module dependencies to reflect plan

Include legacyRoot at the base until it can be redistributed

* [legacy] make some awssdk classes non-private

during this transition, these classes being private would cause problems

* [aws-lib] create S3ClientBuilder

This is copied from the legacy S3Client companion object

* [domain] add empty module

* [domain] move Bucket into module

* [legacy] RemoteKey no longer has dependency on Config

* [domain] move RemoteKey into module

* [domain] move MD5Hash into module

* [legacy] LocalFile no longer had dependency on MD5HashGenerator

* [domain] move LocalFile into module

* [domain] mode LastModified into module

* [domain] move RemoteMetaData into module

* [domain] move S3MetaData into module

* [domain] move Exclude into module

* [domain] move Filter into module

* [domain] move KeyModified into module

* [domain] move HashModified into module

* [domain] RemoteKey.resolve added

* [domain] add dependency on scalatest

* [domain] LocalFile.resolve added

* [legacy] Remove UnitTest

* [legacy] optimise imports

* [domain] move S3ObjectsData moved into module

* [legacy] wrapper for using GeneralProgressListener

* [domain] move Config into module

* [sbt] move aws-api below legacyRoot in dependencies

This will allow use to move S3Client into the aws-api module

* [legacy] rename S3Client companion as S3ClientBuilder

Preparation to move this into its own file.

* Inject Logger via CLI (#34)

* [S3Client] refactor defaultClient()

* [S3Client] transfermanager explicitly uses the same s3client

* [S3ClientPutObjectUploader] refactor putObjectRequest creation

* [cli] copy in Logging trait as Logger class

* [cli] Main uses Logger

* [cli] simplify Logger and pass to Sync.run

* [legacy] SyncLogging converted to companion

* [cli] Logger info can more easily use levels again

* [legacy] LocalFileStream uses injected info

* [legacy] S3MetaDataEnricher remove unused Logging

* [legacy] ActionGenerator remove unused Logging

* [legacy] convert ActionGenerator to an object

* [legacy] import log methods from SyncLogging

* [legacy] move getS3Status from S3Client to S3MetaDataEnricher

* [legact] convert ActionsSubmitter to an object

* [legacy] convert LocalFileStream to an object

* [legacy] move Action case classes inside companion

* [legacy] move UploadEvent case classes inside companion and rename

* [legacy] move S3Action case classes into companion

* [legacy] convert Sync to an object

* [cli] Logger takes verbosity level at construction

No longer needs to be passed the whole Config implicitly for each info
call.

* [legacy] stop passing implicit Config for logging purposes

Pass a more specific implicit info: Int => String => Unit instead

* [legacy] remove DummyS3Client

* [legacy] remove Logging

* [legacy] convert MD5HashGenerator to an object

* [aws-api] move S3Client into module

* [legacy] convert KeyGenerator to an object

* [legacy] don't use IO.unsafeRunSync directly

* [legacy] refactor/rewrite Sync.run

* [legacy] Rewrite sort using a for-comprehension

* [legacy] Sync inline sorting

* [legacy] SyncLogging rename method

* [legacy] repair tests

* [sbt] move core module to a dependency of legacyRoot

* [sbt] add test dependencies to core module

* [core] move classes into module

* [aws-lib] move classes into module

* [sbt] remove legacy root
This commit is contained in:
Paul Campbell 2019-06-06 19:24:15 +01:00 committed by GitHub
parent b7e79c0b36
commit f54c50aaf3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
95 changed files with 1177 additions and 985 deletions

View file

@ -0,0 +1,40 @@
package net.kemitix.s3thorp.aws.api
import net.kemitix.s3thorp.domain.{MD5Hash, RemoteKey}
sealed trait S3Action {
// the remote key that was uploaded, deleted or otherwise updated by the action
def remoteKey: RemoteKey
val order: Int
}
object S3Action {
final case class DoNothingS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 0
}
final case class CopyS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 1
}
final case class UploadS3Action(
remoteKey: RemoteKey,
md5Hash: MD5Hash) extends S3Action {
override val order: Int = 2
}
final case class DeleteS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 3
}
final case class ErroredS3Action(remoteKey: RemoteKey, e: Throwable) extends S3Action {
override val order: Int = 10
}
implicit def ord[A <: S3Action]: Ordering[A] = Ordering.by(_.order)
}

View file

@ -0,0 +1,32 @@
package net.kemitix.s3thorp.aws.api
import cats.effect.IO
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action}
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey, S3ObjectsData}
trait S3Client {
def listObjects(bucket: Bucket,
prefix: RemoteKey
)(implicit info: Int => String => Unit): IO[S3ObjectsData]
def upload(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[S3Action]
def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit info: Int => String => Unit): IO[CopyS3Action]
def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit info: Int => String => Unit): IO[DeleteS3Action]
}

View file

@ -0,0 +1,17 @@
package net.kemitix.s3thorp.aws.api
sealed trait UploadEvent {
def name: String
}
object UploadEvent {
final case class TransferEvent(name: String) extends UploadEvent
final case class RequestEvent(name: String,
bytes: Long,
transferred: Long) extends UploadEvent
final case class ByteTransferEvent(name: String) extends UploadEvent
}

View file

@ -0,0 +1,16 @@
package net.kemitix.s3thorp.aws.api
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.s3thorp.domain.LocalFile
class UploadProgressListener(localFile: LocalFile)
(implicit info: Int => String => Unit)
extends UploadProgressLogging {
def listener: UploadEvent => Unit =
{
case e: TransferEvent => logTransfer(localFile, e)
case e: RequestEvent => logRequestCycle(localFile, e)
case e: ByteTransferEvent => logByteTransfer(e)
}
}

View file

@ -0,0 +1,22 @@
package net.kemitix.s3thorp.aws.api
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.s3thorp.domain.LocalFile
trait UploadProgressLogging {
def logTransfer(localFile: LocalFile,
event: TransferEvent)
(implicit info: Int => String => Unit): Unit =
info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}")
def logRequestCycle(localFile: LocalFile,
event: RequestEvent)
(implicit info: Int => String => Unit): Unit =
info(3)(s"Uploading:${event.name}:${event.transferred}/${event.bytes}:${localFile.remoteKey.key}")
def logByteTransfer(event: ByteTransferEvent)
(implicit info: Int => String => Unit): Unit =
info(3)(".")
}

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
final case class CancellableMultiPartUpload(
e: Throwable,

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
trait QuoteStripper {

View file

@ -0,0 +1,22 @@
package net.kemitix.s3thorp.aws.lib
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.aws.api.S3Client
object S3ClientBuilder {
def createClient(s3AsyncClient: S3AsyncClient,
amazonS3Client: AmazonS3,
amazonS3TransferManager: TransferManager): S3Client = {
new ThorpS3Client(S3CatsIOClient(s3AsyncClient), amazonS3Client, amazonS3TransferManager)
}
val defaultClient: S3Client =
createClient(new JavaClientWrapper {}.underlying,
AmazonS3ClientBuilder.defaultClient,
TransferManagerBuilder.defaultTransferManager)
}

View file

@ -1,18 +1,19 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.{Bucket, Config, CopyS3Action, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.aws.api.S3Action.CopyS3Action
import net.kemitix.s3thorp.domain.{Bucket, MD5Hash, RemoteKey}
import software.amazon.awssdk.services.s3.model.CopyObjectRequest
private class S3ClientCopier(s3Client: S3CatsIOClient)
class S3ClientCopier(s3Client: S3CatsIOClient)
extends S3ClientLogging {
def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey)
(implicit c: Config): IO[CopyS3Action] = {
(implicit info: Int => String => Unit): IO[CopyS3Action] = {
val request = CopyObjectRequest.builder
.bucket(bucket.name)
.copySource(s"${bucket.name}/${sourceKey.key}")

View file

@ -1,16 +1,17 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.{Bucket, Config, DeleteS3Action, RemoteKey}
import net.kemitix.s3thorp.aws.api.S3Action.DeleteS3Action
import net.kemitix.s3thorp.domain.{Bucket, RemoteKey}
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest
private class S3ClientDeleter(s3Client: S3CatsIOClient)
class S3ClientDeleter(s3Client: S3CatsIOClient)
extends S3ClientLogging {
def delete(bucket: Bucket,
remoteKey: RemoteKey)
(implicit c: Config): IO[DeleteS3Action] = {
(implicit info: Int => String => Unit): IO[DeleteS3Action] = {
val request = DeleteObjectRequest.builder
.bucket(bucket.name)
.key(remoteKey.key).build

View file

@ -0,0 +1,80 @@
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.services.s3.model.PutObjectResult
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey}
import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, DeleteObjectResponse, ListObjectsV2Response}
trait S3ClientLogging {
def logListObjectsStart(bucket: Bucket,
prefix: RemoteKey)
(implicit info: Int => String => Unit): ListObjectsV2Response => IO[ListObjectsV2Response] = {
in => IO {
info(3)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}")
in
}
}
def logListObjectsFinish(bucket: Bucket,
prefix: RemoteKey)
(implicit info: Int => String => Unit): ListObjectsV2Response => IO[Unit] = {
in => IO {
info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}")
}
}
def logUploadStart(localFile: LocalFile,
bucket: Bucket)
(implicit info: Int => String => Unit): PutObjectResult => IO[PutObjectResult] = {
in => IO {
info(4)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}")
in
}
}
def logUploadFinish(localFile: LocalFile,
bucket: Bucket)
(implicit info: Int => String => Unit): PutObjectResult => IO[Unit] = {
in =>IO {
info(1)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}")
}
}
def logCopyStart(bucket: Bucket,
sourceKey: RemoteKey,
targetKey: RemoteKey)
(implicit info: Int => String => Unit): CopyObjectResponse => IO[CopyObjectResponse] = {
in => IO {
info(4)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
in
}
}
def logCopyFinish(bucket: Bucket,
sourceKey: RemoteKey,
targetKey: RemoteKey)
(implicit info: Int => String => Unit): CopyObjectResponse => IO[Unit] = {
in => IO {
info(3)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
}
}
def logDeleteStart(bucket: Bucket,
remoteKey: RemoteKey)
(implicit info: Int => String => Unit): DeleteObjectResponse => IO[DeleteObjectResponse] = {
in => IO {
info(4)(s"Delete: ${bucket.name}:${remoteKey.key}")
in
}
}
def logDeleteFinish(bucket: Bucket,
remoteKey: RemoteKey)
(implicit info: Int => String => Unit): DeleteObjectResponse => IO[Unit] = {
in => IO {
info(3)(s"Deleted: ${bucket.name}:${remoteKey.key}")
}
}
}

View file

@ -1,27 +1,32 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManager
import net.kemitix.s3thorp._
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener}
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey}
class S3ClientMultiPartTransferManager(transferManager: => TransferManager)
extends S3ClientUploader
with S3ClientMultiPartUploaderLogging {
def accepts(localFile: LocalFile)
(implicit c: Config): Boolean =
localFile.file.length >= c.multiPartThreshold
(implicit multiPartThreshold: Long): Boolean =
localFile.file.length >= multiPartThreshold
override
def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int)
(implicit c: Config): IO[S3Action] = {
uploadProgressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[S3Action] = {
val putObjectRequest: PutObjectRequest =
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withGeneralProgressListener(progressListener.listener)
.withGeneralProgressListener(progressListener(uploadProgressListener))
IO {
logMultiPartUploadStart(localFile, tryCount)
val result = transferManager.upload(putObjectRequest)

View file

@ -1,26 +1,28 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import scala.collection.JavaConverters._
import cats.effect.IO
import cats.implicits._
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{AbortMultipartUploadRequest, AmazonS3Exception, CompleteMultipartUploadRequest, CompleteMultipartUploadResult, InitiateMultipartUploadRequest, InitiateMultipartUploadResult, PartETag, UploadPartRequest, UploadPartResult}
import net.kemitix.s3thorp._
import com.amazonaws.services.s3.model._
import net.kemitix.s3thorp.aws.api.S3Action.{ErroredS3Action, UploadS3Action}
import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener}
import net.kemitix.s3thorp.core.MD5HashGenerator.md5FilePart
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
private class S3ClientMultiPartUploader(s3Client: AmazonS3)
extends S3ClientUploader
with S3ClientMultiPartUploaderLogging
with MD5HashGenerator
with QuoteStripper {
def accepts(localFile: LocalFile)
(implicit c: Config): Boolean =
localFile.file.length >= c.multiPartThreshold
(implicit multiPartThreshold: Long): Boolean =
localFile.file.length >= multiPartThreshold
def createUpload(bucket: Bucket, localFile: LocalFile)
(implicit c: Config): IO[InitiateMultipartUploadResult] = {
(implicit info: Int => String => Unit): IO[InitiateMultipartUploadResult] = {
logMultiPartUploadInitiate(localFile)
IO(s3Client initiateMultipartUpload createUploadRequest(bucket, localFile))
}
@ -30,12 +32,13 @@ private class S3ClientMultiPartUploader(s3Client: AmazonS3)
bucket.name,
localFile.remoteKey.key)
def parts(localFile: LocalFile,
response: InitiateMultipartUploadResult)
(implicit c: Config): IO[Stream[UploadPartRequest]] = {
def parts(bucket: Bucket,
localFile: LocalFile,
response: InitiateMultipartUploadResult,
threshold: Long)
(implicit info: Int => String => Unit): IO[Stream[UploadPartRequest]] = {
val fileSize = localFile.file.length
val maxParts = 1024 // arbitrary, supports upto 10,000 (I, think)
val threshold = c.multiPartThreshold
val nParts = Math.min((fileSize / threshold) + 1, maxParts).toInt
val partSize = fileSize / nParts
val maxUpload = nParts * partSize
@ -50,19 +53,19 @@ private class S3ClientMultiPartUploader(s3Client: AmazonS3)
chunkSize = Math.min(fileSize - offSet, partSize)
partHash = md5FilePart(localFile.file, offSet, chunkSize)
_ = logMultiPartUploadPartDetails(localFile, partNumber, partHash)
uploadPartRequest = createUploadPartRequest(localFile, response, partNumber, chunkSize, partHash)
uploadPartRequest = createUploadPartRequest(bucket, localFile, response, partNumber, chunkSize, partHash)
} yield uploadPartRequest
}
}
private def createUploadPartRequest(localFile: LocalFile,
private def createUploadPartRequest(bucket: Bucket,
localFile: LocalFile,
response: InitiateMultipartUploadResult,
partNumber: Int,
chunkSize: Long,
partHash: MD5Hash)
(implicit c: Config) = {
partHash: MD5Hash) = {
new UploadPartRequest()
.withBucketName(c.bucket.name)
.withBucketName(bucket.name)
.withKey(localFile.remoteKey.key)
.withUploadId(response.getUploadId)
.withPartNumber(partNumber)
@ -73,7 +76,8 @@ private class S3ClientMultiPartUploader(s3Client: AmazonS3)
}
def uploadPart(localFile: LocalFile)
(implicit c: Config): UploadPartRequest => IO[UploadPartResult] =
(implicit info: Int => String => Unit,
warn: String => Unit): UploadPartRequest => IO[UploadPartResult] =
partRequest => {
logMultiPartUploadPart(localFile, partRequest)
IO(s3Client.uploadPart(partRequest))
@ -86,13 +90,14 @@ private class S3ClientMultiPartUploader(s3Client: AmazonS3)
def uploadParts(localFile: LocalFile,
parts: Stream[UploadPartRequest])
(implicit c: Config): IO[Stream[UploadPartResult]] =
(implicit info: Int => String => Unit,
warn: String => Unit): IO[Stream[UploadPartResult]] =
(parts map uploadPart(localFile)).sequence
def completeUpload(createUploadResponse: InitiateMultipartUploadResult,
uploadPartResponses: Stream[UploadPartResult],
localFile: LocalFile)
(implicit c: Config): IO[CompleteMultipartUploadResult] = {
(implicit info: Int => String => Unit): IO[CompleteMultipartUploadResult] = {
logMultiPartUploadCompleted(createUploadResponse, uploadPartResponses, localFile)
IO(s3Client completeMultipartUpload createCompleteRequest(createUploadResponse, uploadPartResponses.toList))
}
@ -106,27 +111,33 @@ private class S3ClientMultiPartUploader(s3Client: AmazonS3)
.withPartETags(uploadPartResult.asJava)
}
def cancel(uploadId: String, localFile: LocalFile)
(implicit c: Config): IO[Unit] = {
def cancel(uploadId: String,
bucket: Bucket,
localFile: LocalFile)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[Unit] = {
logMultiPartUploadCancelling(localFile)
IO(s3Client abortMultipartUpload createAbortRequest(uploadId, localFile))
IO(s3Client abortMultipartUpload createAbortRequest(uploadId, bucket, localFile))
}
def createAbortRequest(uploadId: String,
localFile: LocalFile)
(implicit c: Config): AbortMultipartUploadRequest =
new AbortMultipartUploadRequest(c.bucket.name, localFile.remoteKey.key, uploadId)
bucket: Bucket,
localFile: LocalFile): AbortMultipartUploadRequest =
new AbortMultipartUploadRequest(bucket.name, localFile.remoteKey.key, uploadId)
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int)
(implicit c: Config): IO[S3Action] = {
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[S3Action] = {
logMultiPartUploadStart(localFile, tryCount)
(for {
createUploadResponse <- createUpload(bucket, localFile)
parts <- parts(localFile, createUploadResponse)
parts <- parts(bucket, localFile, createUploadResponse, multiPartThreshold)
uploadPartResponses <- uploadParts(localFile, parts)
completedUploadResponse <- completeUpload(createUploadResponse, uploadPartResponses, localFile)
} yield completedUploadResponse)
@ -136,11 +147,11 @@ private class S3ClientMultiPartUploader(s3Client: AmazonS3)
.map(UploadS3Action(localFile.remoteKey, _))
.handleErrorWith {
case CancellableMultiPartUpload(e, uploadId) =>
if (tryCount >= c.maxRetries) IO(logErrorCancelling(e, localFile)) *> cancel(uploadId, localFile) *> IO.pure(ErroredS3Action(localFile.remoteKey, e))
else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, progressListener, tryCount + 1)
if (tryCount >= maxRetries) IO(logErrorCancelling(e, localFile)) *> cancel(uploadId, bucket, localFile) *> IO.pure(ErroredS3Action(localFile.remoteKey, e))
else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, progressListener, multiPartThreshold, tryCount + 1, maxRetries)
case NonFatal(e) =>
if (tryCount >= c.maxRetries) IO(logErrorUnknown(e, localFile)) *> IO.pure(ErroredS3Action(localFile.remoteKey, e))
else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, progressListener, tryCount + 1)
if (tryCount >= maxRetries) IO(logErrorUnknown(e, localFile)) *> IO.pure(ErroredS3Action(localFile.remoteKey, e))
else IO(logErrorRetrying(e, localFile, tryCount)) *> upload(localFile, bucket, progressListener, multiPartThreshold, tryCount + 1, maxRetries)
}
}
}

View file

@ -1,7 +1,7 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import com.amazonaws.services.s3.model.{AmazonS3Exception, InitiateMultipartUploadResult, UploadPartRequest, UploadPartResult}
import net.kemitix.s3thorp.{Config, LocalFile, MD5Hash}
import net.kemitix.s3thorp.domain.{LocalFile, MD5Hash}
trait S3ClientMultiPartUploaderLogging
extends S3ClientLogging {
@ -10,44 +10,44 @@ trait S3ClientMultiPartUploaderLogging
def logMultiPartUploadStart(localFile: LocalFile,
tryCount: Int)
(implicit c: Config): Unit =
log1(s"$prefix:upload:try $tryCount: ${localFile.remoteKey.key}")
(implicit info: Int => String => Unit): Unit =
info(1)(s"$prefix:upload:try $tryCount: ${localFile.remoteKey.key}")
def logMultiPartUploadFinished(localFile: LocalFile)
(implicit c: Config): Unit =
log4(s"$prefix:upload:finished: ${localFile.remoteKey.key}")
(implicit info: Int => String => Unit): Unit =
info(4)(s"$prefix:upload:finished: ${localFile.remoteKey.key}")
def logMultiPartUploadInitiate(localFile: LocalFile)
(implicit c: Config): Unit =
log5(s"$prefix:initiating: ${localFile.remoteKey.key}")
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:initiating: ${localFile.remoteKey.key}")
def logMultiPartUploadPartsDetails(localFile: LocalFile,
nParts: Int,
partSize: Long)
(implicit c: Config): Unit =
log5(s"$prefix:parts $nParts:each $partSize: ${localFile.remoteKey.key}")
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:parts $nParts:each $partSize: ${localFile.remoteKey.key}")
def logMultiPartUploadPartDetails(localFile: LocalFile,
partNumber: Int,
partHash: MD5Hash)
(implicit c: Config): Unit =
log5(s"$prefix:part $partNumber:hash ${partHash.hash}: ${localFile.remoteKey.key}")
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:part $partNumber:hash ${partHash.hash}: ${localFile.remoteKey.key}")
def logMultiPartUploadPart(localFile: LocalFile,
partRequest: UploadPartRequest)
(implicit c: Config): Unit =
log5(s"$prefix:sending:part ${partRequest.getPartNumber}: ${partRequest.getMd5Digest}: ${localFile.remoteKey.key}")
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:sending:part ${partRequest.getPartNumber}: ${partRequest.getMd5Digest}: ${localFile.remoteKey.key}")
def logMultiPartUploadPartDone(localFile: LocalFile,
partRequest: UploadPartRequest,
result: UploadPartResult)
(implicit c: Config): Unit =
log5(s"$prefix:sent:part ${partRequest.getPartNumber}: ${result.getPartETag}: ${localFile.remoteKey.key}")
(implicit info: Int => String => Unit): Unit =
info(5)(s"$prefix:sent:part ${partRequest.getPartNumber}: ${result.getPartETag}: ${localFile.remoteKey.key}")
def logMultiPartUploadPartError(localFile: LocalFile,
partRequest: UploadPartRequest,
error: AmazonS3Exception)
(implicit c: Config): Unit = {
(implicit warn: String => Unit): Unit = {
val returnedMD5Hash = error.getAdditionalDetails.get("Content-MD5")
warn(s"$prefix:error:part ${partRequest.getPartNumber}:ret-hash $returnedMD5Hash: ${localFile.remoteKey.key}")
}
@ -55,23 +55,23 @@ trait S3ClientMultiPartUploaderLogging
def logMultiPartUploadCompleted(createUploadResponse: InitiateMultipartUploadResult,
uploadPartResponses: Stream[UploadPartResult],
localFile: LocalFile)
(implicit c: Config): Unit =
log1(s"$prefix:completed:parts ${uploadPartResponses.size}: ${localFile.remoteKey.key}")
(implicit info: Int => String => Unit): Unit =
info(1)(s"$prefix:completed:parts ${uploadPartResponses.size}: ${localFile.remoteKey.key}")
def logMultiPartUploadCancelling(localFile: LocalFile)
(implicit c: Config): Unit =
(implicit warn: String => Unit): Unit =
warn(s"$prefix:cancelling: ${localFile.remoteKey.key}")
def logErrorRetrying(e: Throwable, localFile: LocalFile, tryCount: Int)
(implicit c: Config): Unit =
(implicit warn: String => Unit): Unit =
warn(s"$prefix:retry:error ${e.getMessage}: ${localFile.remoteKey.key}")
def logErrorCancelling(e: Throwable, localFile: LocalFile)
(implicit c: Config) : Unit =
(implicit error: String => Unit) : Unit =
error(s"$prefix:cancelling:error ${e.getMessage}: ${localFile.remoteKey.key}")
def logErrorUnknown(e: Throwable, localFile: LocalFile)
(implicit c: Config): Unit =
(implicit error: String => Unit): Unit =
error(s"$prefix:unknown:error $e: ${localFile.remoteKey.key}")
}

View file

@ -1,19 +1,20 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import net.kemitix.s3thorp.{Bucket, Config, HashModified, LastModified, MD5Hash, RemoteKey}
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, S3Object}
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.domain._
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, S3Object}
import scala.collection.JavaConverters._
private class S3ClientObjectLister(s3Client: S3CatsIOClient)
class S3ClientObjectLister(s3Client: S3CatsIOClient)
extends S3ClientLogging
with S3ObjectsByHash
with QuoteStripper {
def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit c: Config): IO[S3ObjectsData] = {
(implicit info: Int => String => Unit): IO[S3ObjectsData] = {
val request = ListObjectsV2Request.builder
.bucket(bucket.name)
.prefix(prefix.key).build

View file

@ -0,0 +1,43 @@
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.PutObjectRequest
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
import net.kemitix.s3thorp.aws.api.UploadProgressListener
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash}
class S3ClientPutObjectUploader(amazonS3: => AmazonS3)
extends S3ClientUploader
with S3ClientLogging
with QuoteStripper {
override def accepts(localFile: LocalFile)(implicit multiPartThreshold: Long): Boolean = true
override def upload(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[UploadS3Action] = {
val request = putObjectRequest(localFile, bucket, uploadProgressListener)
IO(amazonS3.putObject(request))
.bracket(
logUploadStart(localFile, bucket))(
logUploadFinish(localFile, bucket))
.map(_.getETag)
.map(_ filter stripQuotes)
.map(MD5Hash)
.map(UploadS3Action(localFile.remoteKey, _))
}
private def putObjectRequest(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener
): PutObjectRequest = {
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withGeneralProgressListener(progressListener(uploadProgressListener))
}
}

View file

@ -0,0 +1,36 @@
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.s3thorp.aws.api.{S3Action, UploadProgressListener}
import net.kemitix.s3thorp.domain.{Bucket, LocalFile}
trait S3ClientUploader {
def accepts(localFile: LocalFile)
(implicit multiPartThreshold: Long): Boolean
def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[S3Action]
def progressListener(uploadProgressListener: UploadProgressListener): ProgressListener = {
new ProgressListener {
override def progressChanged(event: ProgressEvent): Unit = {
if (event.getEventType.isTransferEvent)
TransferEvent(event.getEventType.name)
else if (event.getEventType equals ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT)
ByteTransferEvent(event.getEventType.name)
else
RequestEvent(event.getEventType.name, event.getBytes, event.getBytesTransferred)
}
}
}
}

View file

@ -1,6 +1,6 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import net.kemitix.s3thorp.{KeyModified, LastModified, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.domain.{KeyModified, LastModified, MD5Hash, RemoteKey}
import software.amazon.awssdk.services.s3.model.S3Object
trait S3ObjectsByHash {

View file

@ -1,13 +1,15 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.transfer.TransferManager
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp._
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action}
import net.kemitix.s3thorp.aws.api.{S3Action, S3Client, UploadProgressListener}
import net.kemitix.s3thorp.domain._
import software.amazon.awssdk.services.s3.model.{Bucket => _}
private class ThorpS3Client(ioS3Client: S3CatsIOClient,
class ThorpS3Client(ioS3Client: S3CatsIOClient,
amazonS3Client: => AmazonS3,
amazonS3TransferManager: => TransferManager)
extends S3Client
@ -24,7 +26,7 @@ private class ThorpS3Client(ioS3Client: S3CatsIOClient,
override def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit c: Config): IO[S3ObjectsData] =
(implicit info: Int => String => Unit): IO[S3ObjectsData] =
objectLister.listObjects(bucket, prefix)
@ -32,22 +34,26 @@ private class ThorpS3Client(ioS3Client: S3CatsIOClient,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey)
(implicit c: Config): IO[CopyS3Action] =
(implicit info: Int => String => Unit): IO[CopyS3Action] =
copier.copy(bucket, sourceKey,hash, targetKey)
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int)
(implicit c: Config): IO[S3Action] =
if (multiPartUploader.accepts(localFile)) multiPartUploader.upload(localFile, bucket, progressListener, 1)
else uploader.upload(localFile, bucket, progressListener, tryCount)
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit): IO[S3Action] =
if (multiPartUploader.accepts(localFile)(multiPartThreshold))
multiPartUploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries)
else
uploader.upload(localFile, bucket, progressListener, multiPartThreshold, tryCount, maxRetries)
override def delete(bucket: Bucket,
remoteKey: RemoteKey)
(implicit c: Config): IO[DeleteS3Action] =
(implicit info: Int => String => Unit): IO[DeleteS3Action] =
deleter.delete(bucket, remoteKey)
}

View file

@ -1,18 +1,18 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import java.io.{File, InputStream}
import java.net.URL
import java.util
import java.util.Date
import com.amazonaws.{AmazonWebServiceRequest, HttpMethod}
import com.amazonaws.regions.Region
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.model.analytics.AnalyticsConfiguration
import com.amazonaws.services.s3.model.inventory.InventoryConfiguration
import com.amazonaws.services.s3.{AmazonS3, S3ClientOptions, S3ResponseMetadata, model}
import com.amazonaws.services.s3.model.metrics.MetricsConfiguration
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.waiters.AmazonS3Waiters
import com.amazonaws.services.s3.{AmazonS3, S3ClientOptions, S3ResponseMetadata, model}
import com.amazonaws.{AmazonWebServiceRequest, HttpMethod}
abstract class MyAmazonS3 extends AmazonS3 {
override def setEndpoint(endpoint: String): Unit = ???

View file

@ -1,18 +1,18 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import java.io.{File, InputStream}
import java.net.URL
import java.util
import java.util.Date
import com.amazonaws.{AmazonWebServiceRequest, HttpMethod}
import com.amazonaws.regions.Region
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.model.analytics.AnalyticsConfiguration
import com.amazonaws.services.s3.model.inventory.InventoryConfiguration
import com.amazonaws.services.s3.{AmazonS3, S3ClientOptions, S3ResponseMetadata, model}
import com.amazonaws.services.s3.model.metrics.MetricsConfiguration
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.waiters.AmazonS3Waiters
import com.amazonaws.services.s3.{AmazonS3, S3ClientOptions, S3ResponseMetadata, model}
import com.amazonaws.{AmazonWebServiceRequest, HttpMethod}
class MyAmazonS3Client extends AmazonS3 {
override def setEndpoint(endpoint: String): Unit = ???

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import java.io.File
import java.time.Instant
@ -6,17 +6,25 @@ import java.time.Instant
import com.amazonaws.AmazonClientException
import com.amazonaws.services.s3.model
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{PauseResult, PersistableUpload, Transfer, TransferManager, TransferManagerBuilder, TransferProgress, Upload}
import net.kemitix.s3thorp.{Bucket, Config, KeyGenerator, LastModified, MD5Hash, MD5HashGenerator, RemoteKey, Resource, UnitTest, UploadS3Action}
import com.amazonaws.services.s3.transfer._
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
import net.kemitix.s3thorp.aws.api.UploadProgressListener
import net.kemitix.s3thorp.aws.lib.S3ClientMultiPartTransferManager
import net.kemitix.s3thorp.core.KeyGenerator.generateKey
import net.kemitix.s3thorp.core.{MD5HashGenerator, Resource}
import net.kemitix.s3thorp.domain._
import org.scalatest.FunSpec
class S3ClientMultiPartTransferManagerSuite
extends UnitTest
with KeyGenerator {
extends FunSpec {
private val source = Resource(this, "..")
private val source = Resource(this, ".")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit private val logInfo: Int => String => Unit = l => m => ()
implicit private val logWarn: String => Unit = w => ()
private val fileToKey = generateKey(config.source, config.prefix) _
private val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
val lastModified = LastModified(Instant.now())
describe("S3ClientMultiPartTransferManagerSuite") {
@ -24,21 +32,21 @@ class S3ClientMultiPartTransferManagerSuite
val transferManager = new MyTransferManager(("", "", new File("")), RemoteKey(""), MD5Hash(""))
val uploader = new S3ClientMultiPartTransferManager(transferManager)
describe("small-file") {
val smallFile = aLocalFile("small-file", MD5Hash("the-hash"), source, fileToKey)
val smallFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey, fileToHash)
it("should be a small-file") {
assert(smallFile.file.length < 5 * 1024 * 1024)
}
it("should not accept small-file") {
assertResult(false)(uploader.accepts(smallFile))
assertResult(false)(uploader.accepts(smallFile)(config.multiPartThreshold))
}
}
describe("big-file") {
val bigFile = aLocalFile("big-file", MD5Hash("the-hash"), source, fileToKey)
val bigFile = LocalFile.resolve("big-file", MD5Hash("the-hash"), source, fileToKey, fileToHash)
it("should be a big-file") {
assert(bigFile.file.length > 5 * 1024 * 1024)
}
it("should accept big-file") {
assertResult(true)(uploader.accepts(bigFile))
assertResult(true)(uploader.accepts(bigFile)(config.multiPartThreshold))
}
}
}
@ -50,7 +58,7 @@ class S3ClientMultiPartTransferManagerSuite
// dies when putObject is called
val returnedKey = RemoteKey("returned-key")
val returnedHash = MD5Hash("returned-hash")
val bigFile = aLocalFile("small-file", MD5Hash("the-hash"), source, fileToKey)
val bigFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey, fileToHash)
val progressListener = new UploadProgressListener(bigFile)
val amazonS3 = new MyAmazonS3 {}
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
@ -61,7 +69,7 @@ class S3ClientMultiPartTransferManagerSuite
val uploader = new S3ClientMultiPartTransferManager(amazonS3TransferManager)
it("should upload") {
val expected = UploadS3Action(returnedKey, returnedHash)
val result = uploader.upload(bigFile, config.bucket, progressListener, 1).unsafeRunSync
val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync
assertResult(expected)(result)
}
}

View file

@ -1,19 +1,27 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import scala.collection.JavaConverters._
import java.io.File
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import com.amazonaws.services.s3.model.{Bucket => _, _}
import net.kemitix.s3thorp._
import net.kemitix.s3thorp.aws.api.UploadProgressListener
import net.kemitix.s3thorp.core.{KeyGenerator, MD5HashGenerator, Resource}
import net.kemitix.s3thorp.domain._
import org.scalatest.FunSpec
import scala.collection.JavaConverters._
class S3ClientMultiPartUploaderSuite
extends UnitTest
with KeyGenerator {
extends FunSpec {
private val source = Resource(this, "..")
private val source = Resource(this, ".")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val fileToKey = generateKey(config.source, config.prefix) _
private val bucket = Bucket("bucket")
implicit private val config: Config = Config(bucket, prefix, source = source)
implicit private val logInfo: Int => String => Unit = l => m => ()
implicit private val logWarn: String => Unit = w => ()
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
private val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
describe("multi-part uploader accepts") {
val uploader = new S3ClientMultiPartUploader(new MyAmazonS3Client {})
@ -22,20 +30,20 @@ class S3ClientMultiPartUploaderSuite
// small-file: dd if=/dev/urandom of=src/test/resources/net/kemitix/s3thorp/small-file bs=1047552 count=5
// 1047552 = 1024 * 1023
// file size 5kb under 5Mb threshold
val smallFile = aLocalFile("small-file", MD5Hash(""), source, fileToKey)
val smallFile = LocalFile.resolve("small-file", MD5Hash(""), source, fileToKey, fileToHash)
assert(smallFile.file.exists, "sample small file is missing")
assert(smallFile.file.length == 5 * 1024 * 1023, "sample small file is wrong size")
val result = uploader.accepts(smallFile)
val result = uploader.accepts(smallFile)(config.multiPartThreshold)
assertResult(false)(result)
}
it("should accept big file") {
// big-file: dd if=/dev/urandom of=src/test/resources/net/kemitix/s3thorp/big-file bs=1049600 count=5
// 1049600 = 1024 * 1025
// file size 5kb over 5Mb threshold
val bigFile = aLocalFile("big-file", MD5Hash(""), source, fileToKey)
val bigFile = LocalFile.resolve("big-file", MD5Hash(""), source, fileToKey, fileToHash)
assert(bigFile.file.exists, "sample big file is missing")
assert(bigFile.file.length == 5 * 1024 * 1025, "sample big file is wrong size")
val result = uploader.accepts(bigFile)
val result = uploader.accepts(bigFile)(config.multiPartThreshold)
assertResult(true)(result)
}
}
@ -53,7 +61,7 @@ class S3ClientMultiPartUploaderSuite
}
describe("mulit-part uploader upload") {
val theFile = aLocalFile("big-file", MD5Hash(""), source, fileToKey)
val theFile = LocalFile.resolve("big-file", MD5Hash(""), source, fileToKey, fileToHash)
val progressListener = new UploadProgressListener(theFile)
val uploadId = "upload-id"
val createUploadResponse = new InitiateMultipartUploadResult()
@ -94,7 +102,7 @@ class S3ClientMultiPartUploaderSuite
// split -d -b $((5 * 1024 * 1025 / 2)) big-file
// creates x00 and x01
// md5sum x0[01]
val result = uploader.parts(theFile, createUploadResponse).unsafeRunSync.toList
val result = uploader.parts(bucket, theFile, createUploadResponse, config.multiPartThreshold).unsafeRunSync.toList
it("should create two parts") {
assertResult(2)(result.size)
}
@ -110,7 +118,7 @@ class S3ClientMultiPartUploaderSuite
describe("upload part") {
it("should uploadPart") {
val expected = uploadPartResponse3
val result = uploader.uploadPart(theFile)(config)(uploadPartRequest3).unsafeRunSync
val result = uploader.uploadPart(theFile)(logInfo, logWarn)(uploadPartRequest3).unsafeRunSync
assertResult(expected)(result)
}
}
@ -147,7 +155,7 @@ class S3ClientMultiPartUploaderSuite
}
}
describe("create abort request") {
val abortRequest = uploader.createAbortRequest(uploadId, theFile)
val abortRequest = uploader.createAbortRequest(uploadId, bucket, theFile)
it("should have the upload id") {
assertResult(uploadId)(abortRequest.getUploadId)
}
@ -168,7 +176,7 @@ class S3ClientMultiPartUploaderSuite
describe("upload") {
describe("when all okay") {
val uploader = new RecordingMultiPartUploader()
uploader.upload(theFile, config.bucket, progressListener, 1).unsafeRunSync
invoke(uploader, theFile, progressListener)
it("should initiate the upload") {
assert(uploader.initiated.get)
}
@ -181,7 +189,7 @@ class S3ClientMultiPartUploaderSuite
}
describe("when initiate upload fails") {
val uploader = new RecordingMultiPartUploader(initOkay = false)
uploader.upload(theFile, config.bucket, progressListener, 1).unsafeRunSync
invoke(uploader, theFile, progressListener)
it("should not upload any parts") {
assertResult(Set())(uploader.partsUploaded.get)
}
@ -191,7 +199,7 @@ class S3ClientMultiPartUploaderSuite
}
describe("when uploading a part fails once") {
val uploader = new RecordingMultiPartUploader(partTriesRequired = 2)
uploader.upload(theFile, config.bucket, progressListener, 1).unsafeRunSync
invoke(uploader, theFile, progressListener)
it("should initiate the upload") {
assert(uploader.initiated.get)
}
@ -204,7 +212,7 @@ class S3ClientMultiPartUploaderSuite
}
describe("when uploading a part fails too many times") {
val uploader = new RecordingMultiPartUploader(partTriesRequired = 4)
uploader.upload(theFile, config.bucket, progressListener, 1).unsafeRunSync
invoke(uploader, theFile, progressListener)
it("should initiate the upload") {
assert(uploader.initiated.get)
}
@ -277,4 +285,8 @@ class S3ClientMultiPartUploaderSuite
}
}) {}
}
private def invoke(uploader: S3ClientMultiPartUploader, theFile: LocalFile, progressListener: UploadProgressListener) = {
uploader.upload(theFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync
}
}

View file

@ -1,33 +1,37 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import java.io.File
import java.time.Instant
import cats.effect.IO
import com.amazonaws.services.s3.model
import com.amazonaws.services.s3.model.PutObjectResult
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.amazonaws.services.s3.model.{PutObjectRequest, PutObjectResult}
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp._
import software.amazon.awssdk.services.s3.model.{PutObjectRequest, PutObjectResponse}
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
import net.kemitix.s3thorp.aws.api.{S3Client, UploadProgressListener}
import net.kemitix.s3thorp.aws.lib.{JavaClientWrapper, ThorpS3Client}
import net.kemitix.s3thorp.core.{KeyGenerator, MD5HashGenerator, Resource, S3MetaDataEnricher}
import net.kemitix.s3thorp.domain._
import org.scalatest.FunSpec
class S3ClientSuite
extends UnitTest
with KeyGenerator {
extends FunSpec {
val source = Resource(this, "../upload")
val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val fileToKey = generateKey(config.source, config.prefix) _
implicit private val logInfo: Int => String => Unit = l => m => ()
implicit private val logWarn: String => Unit = w => ()
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
private val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
describe("getS3Status") {
val hash = MD5Hash("hash")
val localFile = aLocalFile("the-file", hash, source, fileToKey)
val localFile = LocalFile.resolve("the-file", hash, source, fileToKey, fileToHash)
val key = localFile.remoteKey
val keyotherkey = aLocalFile("other-key-same-hash", hash, source, fileToKey)
val keyotherkey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey, fileToHash)
val diffhash = MD5Hash("diff")
val keydiffhash = aLocalFile("other-key-diff-hash", diffhash, source, fileToKey)
val keydiffhash = LocalFile.resolve("other-key-diff-hash", diffhash, source, fileToKey, fileToHash)
val lastModified = LastModified(Instant.now)
val s3ObjectsData: S3ObjectsData = S3ObjectsData(
byHash = Map(
@ -39,11 +43,11 @@ class S3ClientSuite
keydiffhash.remoteKey -> HashModified(diffhash, lastModified)))
def invoke(self: S3Client, localFile: LocalFile) = {
self.getS3Status(localFile)(s3ObjectsData)
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
}
describe("when remote key exists") {
val s3Client = S3Client.defaultClient
val s3Client = S3ClientBuilder.defaultClient
it("should return (Some, Set.nonEmpty)") {
assertResult(
(Some(HashModified(hash, lastModified)),
@ -55,9 +59,9 @@ class S3ClientSuite
}
describe("when remote key does not exist and no others matches hash") {
val s3Client = S3Client.defaultClient
val s3Client = S3ClientBuilder.defaultClient
it("should return (None, Set.empty)") {
val localFile = aLocalFile("missing-file", MD5Hash("unique"), source, fileToKey)
val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey, fileToHash)
assertResult(
(None,
Set.empty)
@ -66,7 +70,7 @@ class S3ClientSuite
}
describe("when remote key exists and no others match hash") {
val s3Client = S3Client.defaultClient
val s3Client = S3ClientBuilder.defaultClient
it("should return (None, Set.nonEmpty)") {
assertResult(
(Some(HashModified(diffhash, lastModified)),
@ -79,12 +83,12 @@ class S3ClientSuite
describe("upload") {
def invoke(s3Client: ThorpS3Client, localFile: LocalFile, bucket: Bucket, progressListener: UploadProgressListener) =
s3Client.upload(localFile, bucket, progressListener, 1).unsafeRunSync
s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync
describe("when uploading a file") {
val source = Resource(this, "../upload")
val md5Hash = new MD5HashGenerator {}.md5File(source.toPath.resolve("root-file").toFile)
val source = Resource(this, "upload")
val md5Hash = MD5HashGenerator.md5File(source.toPath.resolve("root-file").toFile)
val amazonS3 = new MyAmazonS3 {
override def putObject(putObjectRequest: model.PutObjectRequest): PutObjectResult = {
override def putObject(putObjectRequest: PutObjectRequest): PutObjectResult = {
val result = new PutObjectResult
result.setETag(md5Hash.hash)
result
@ -97,7 +101,7 @@ class S3ClientSuite
// IO(PutObjectResponse.builder().eTag(md5Hash.hash).build())
}, amazonS3, amazonS3TransferManager)
val prefix = RemoteKey("prefix")
val localFile: LocalFile = aLocalFile("root-file", md5Hash, source, generateKey(source, prefix))
val localFile: LocalFile = LocalFile.resolve("root-file", md5Hash, source, KeyGenerator.generateKey(source, prefix), fileToHash)
val bucket: Bucket = Bucket("a-bucket")
val remoteKey: RemoteKey = RemoteKey("prefix/root-file")
val progressListener = new UploadProgressListener(localFile)

View file

@ -1,11 +1,13 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import java.time.Instant
import net.kemitix.s3thorp.{KeyModified, LastModified, MD5Hash, RemoteKey, UnitTest}
import net.kemitix.s3thorp.aws.lib.S3ObjectsByHash
import net.kemitix.s3thorp.domain.{KeyModified, LastModified, MD5Hash, RemoteKey}
import org.scalatest.FunSpec
import software.amazon.awssdk.services.s3.model.S3Object
class S3ObjectsByHashSuite extends UnitTest {
class S3ObjectsByHashSuite extends FunSpec {
new S3ObjectsByHash {
describe("grouping s3 object together by their hash values") {

View file

@ -1,67 +1,40 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.aws.lib
import java.io.{File, InputStream}
import java.net.URL
import java.io.File
import java.time.Instant
import java.util
import java.util.Date
import java.util.concurrent.CompletableFuture
import cats.effect.IO
import com.amazonaws.regions.Region
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.model.analytics.AnalyticsConfiguration
import com.amazonaws.services.s3.model.inventory.InventoryConfiguration
import com.amazonaws.services.s3.model.metrics.MetricsConfiguration
import com.amazonaws.services.s3.model.{PutObjectRequest, PutObjectResult}
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import com.amazonaws.services.s3.waiters.AmazonS3Waiters
import com.amazonaws.services.s3.{AmazonS3, S3ClientOptions, S3ResponseMetadata, model}
import com.amazonaws.{AmazonWebServiceRequest, HttpMethod}
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import net.kemitix.s3thorp.awssdk.{MyAmazonS3, S3Client, S3ObjectsData, UploadProgressListener}
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action}
import net.kemitix.s3thorp.aws.api.{S3Client, UploadProgressListener}
import net.kemitix.s3thorp.core.{KeyGenerator, MD5HashGenerator, Resource, Sync}
import net.kemitix.s3thorp.domain._
import org.scalatest.FunSpec
import software.amazon.awssdk.services.s3
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response}
import software.amazon.awssdk.services.s3.{S3AsyncClient => JavaS3AsyncClient}
class SyncSuite
extends UnitTest
with KeyGenerator {
extends FunSpec {
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit private val logInfo: Int => String => Unit = l => i => ()
implicit private val logWarn: String => Unit = w => ()
def logError: String => Unit = e => ()
private val lastModified = LastModified(Instant.now)
val fileToKey: File => RemoteKey = generateKey(source, prefix)
val fileToKey: File => RemoteKey = KeyGenerator.generateKey(source, prefix)
val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
val rootFile = aLocalFile("root-file", rootHash, source, fileToKey)
val leafFile = aLocalFile("subdir/leaf-file", leafHash, source, fileToKey)
val rootFile = LocalFile.resolve("root-file", rootHash, source, fileToKey, fileToHash)
val leafFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, fileToKey, fileToHash)
describe("s3client thunk") {
val testBucket = Bucket("bucket")
val prefix = RemoteKey("prefix")
val source = new File("/")
describe("upload") {
val md5Hash = MD5Hash("the-hash")
val testLocalFile = aLocalFile("file", md5Hash, source, generateKey(source, prefix))
val progressListener = new UploadProgressListener(testLocalFile)
val sync = new Sync(new S3Client with DummyS3Client {
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int)
(implicit c: Config) = IO {
assert(bucket == testBucket)
UploadS3Action(localFile.remoteKey, md5Hash)
}
})
it("delegates unmodified to the S3Client") {
assertResult(UploadS3Action(RemoteKey(prefix.key + "/file"), md5Hash))(
sync.upload(testLocalFile, testBucket, progressListener, 1).
unsafeRunSync())
}
}
}
val md5HashGenerator: File => MD5Hash = file => MD5HashGenerator.md5File(file)
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile) = {
(bucket.name, remoteKey.key, localFile.file)
@ -69,30 +42,29 @@ class SyncSuite
describe("run") {
val testBucket = Bucket("bucket")
val source = Resource(this, "upload")
// source contains the files root-file and subdir/leaf-file
val config = Config(Bucket("bucket"), RemoteKey("prefix"), source = source)
val rootRemoteKey = RemoteKey("prefix/root-file")
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
describe("when all files should be uploaded") {
val sync = new RecordingSync(testBucket, new DummyS3Client {}, S3ObjectsData(
val s3Client = new RecordingClient(testBucket, S3ObjectsData(
byHash = Map(),
byKey = Map()))
sync.run(config).unsafeRunSync
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync
it("uploads all files") {
val expectedUploads = Map(
"subdir/leaf-file" -> leafRemoteKey,
"root-file" -> rootRemoteKey
)
assertResult(expectedUploads)(sync.uploadsRecord)
assertResult(expectedUploads)(s3Client.uploadsRecord)
}
it("copies nothing") {
val expectedCopies = Map()
assertResult(expectedCopies)(sync.copiesRecord)
assertResult(expectedCopies)(s3Client.copiesRecord)
}
it("deletes nothing") {
val expectedDeletions = Set()
assertResult(expectedDeletions)(sync.deletionsRecord)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
}
}
describe("when no files should be uploaded") {
@ -103,19 +75,19 @@ class SyncSuite
byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
val sync = new RecordingSync(testBucket, new DummyS3Client {}, s3ObjectsData)
sync.run(config).unsafeRunSync
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync
it("uploads nothing") {
val expectedUploads = Map()
assertResult(expectedUploads)(sync.uploadsRecord)
assertResult(expectedUploads)(s3Client.uploadsRecord)
}
it("copies nothing") {
val expectedCopies = Map()
assertResult(expectedCopies)(sync.copiesRecord)
assertResult(expectedCopies)(s3Client.copiesRecord)
}
it("deletes nothing") {
val expectedDeletions = Set()
assertResult(expectedDeletions)(sync.deletionsRecord)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
}
}
describe("when a file is renamed it is moved on S3 with no upload") {
@ -129,19 +101,19 @@ class SyncSuite
byKey = Map(
RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
val sync = new RecordingSync(testBucket, new DummyS3Client {}, s3ObjectsData)
sync.run(config).unsafeRunSync
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync
it("uploads nothing") {
val expectedUploads = Map()
assertResult(expectedUploads)(sync.uploadsRecord)
assertResult(expectedUploads)(s3Client.uploadsRecord)
}
it("copies the file") {
val expectedCopies = Map(RemoteKey("prefix/root-file-old") -> RemoteKey("prefix/root-file"))
assertResult(expectedCopies)(sync.copiesRecord)
assertResult(expectedCopies)(s3Client.copiesRecord)
}
it("deletes the original") {
val expectedDeletions = Set(RemoteKey("prefix/root-file-old"))
assertResult(expectedDeletions)(sync.deletionsRecord)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
}
}
describe("when a file is copied it is copied on S3 with no upload") {
@ -157,11 +129,11 @@ class SyncSuite
deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))),
byKey = Map(
deletedKey -> HashModified(deletedHash, lastModified)))
val sync = new RecordingSync(testBucket, new DummyS3Client {}, s3ObjectsData)
sync.run(config).unsafeRunSync
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync
it("deleted key") {
val expectedDeletions = Set(deletedKey)
assertResult(expectedDeletions)(sync.deletionsRecord)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
}
}
describe("io actions execute") {
@ -169,9 +141,8 @@ class SyncSuite
val recordingS3Client = new RecordingS3Client
val transferManager = TransferManagerBuilder.standard
.withS3Client(recordingS3Client).build
val client = S3Client.createClient(recordingS3ClientLegacy, recordingS3Client, transferManager)
val sync = new Sync(client)
sync.run(config).unsafeRunSync
val s3Client: S3Client = S3ClientBuilder.createClient(recordingS3ClientLegacy, recordingS3Client, transferManager)
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync
it("invokes the underlying Java s3client") {
val expected = Set(
putObjectRequest(testBucket, rootRemoteKey, rootFile),
@ -183,31 +154,35 @@ class SyncSuite
}
describe("when a file is file is excluded") {
val configWithExclusion = config.copy(excludes = List(Exclude("leaf")))
val sync = new RecordingSync(testBucket, new DummyS3Client {}, S3ObjectsData(Map(), Map()))
sync.run(configWithExclusion).unsafeRunSync
val s3ObjectsData = S3ObjectsData(Map(), Map())
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(configWithExclusion).unsafeRunSync
it("is not uploaded") {
val expectedUploads = Map(
"root-file" -> rootRemoteKey
)
assertResult(expectedUploads)(sync.uploadsRecord)
assertResult(expectedUploads)(s3Client.uploadsRecord)
}
}
}
class RecordingSync(testBucket: Bucket, s3Client: S3Client, s3ObjectsData: S3ObjectsData)
extends Sync(s3Client) {
class RecordingClient(testBucket: Bucket, s3ObjectsData: S3ObjectsData)
extends S3Client {
var uploadsRecord: Map[String, RemoteKey] = Map()
var copiesRecord: Map[RemoteKey, RemoteKey] = Map()
var deletionsRecord: Set[RemoteKey] = Set()
override def listObjects(bucket: Bucket, prefix: RemoteKey)(implicit c: Config) = IO {s3ObjectsData}
override def listObjects(bucket: Bucket, prefix: RemoteKey)(implicit info: Int => String => Unit) = IO {s3ObjectsData}
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int
)(implicit c: Config) = IO {
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
(implicit info: Int => String => Unit,
warn: String => Unit) = IO {
if (bucket == testBucket)
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)
UploadS3Action(localFile.remoteKey, MD5Hash("some hash value"))
@ -217,7 +192,7 @@ class SyncSuite
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit c: Config) = IO {
)(implicit info: Int => String => Unit) = IO {
if (bucket == testBucket)
copiesRecord += (sourceKey -> targetKey)
CopyS3Action(targetKey)
@ -225,7 +200,7 @@ class SyncSuite
override def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit c: Config) = IO {
)(implicit info: Int => String => Unit) = IO {
if (bucket == testBucket)
deletionsRecord += remoteKey
DeleteS3Action(remoteKey)

View file

@ -1,15 +1,13 @@
package net.kemitix.s3thorp.awssdk
package net.kemitix.s3thorp.aws.lib
import java.time.Instant
import cats.effect.IO
import com.amazonaws.services.s3.model.{PutObjectRequest, PutObjectResult}
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp._
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import net.kemitix.s3thorp.aws.lib.ThorpS3Client
import net.kemitix.s3thorp.core.Resource
import net.kemitix.s3thorp.domain._
import org.scalatest.FunSpec
import software.amazon.awssdk.services.s3
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response, S3Object}
import scala.collection.JavaConverters._
@ -17,9 +15,10 @@ import scala.collection.JavaConverters._
class ThorpS3ClientSuite extends FunSpec {
describe("listObjectsInPrefix") {
val source = Resource(Main, "upload")
val source = Resource(this, "upload")
val prefix = RemoteKey("prefix")
implicit val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit val logInfo: Int => String => Unit = l => m => ()
val lm = LastModified(Instant.now)

View file

@ -1,27 +1,37 @@
name := "s3thorp"
version := "0.1"
val applicationSettings = Seq(
name := "s3thorp",
version := "0.1",
scalaVersion := "2.12.8"
// command line arguments parser
libraryDependencies += "com.github.scopt" %% "scopt" % "4.0.0-RC2"
// AWS SDK
)
val testDependencies = Seq(
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.7" % "test"
)
)
val commandLineParsing = Seq(
libraryDependencies ++= Seq(
"com.github.scopt" %% "scopt" % "4.0.0-RC2"
)
)
val awsSdkDependencies = Seq(
libraryDependencies ++= Seq(
/// wraps the in-preview Java SDK V2 which is incomplete and doesn't support multi-part uploads
libraryDependencies += "com.github.j5ik2o" %% "reactive-aws-s3-core" % "1.1.3"
libraryDependencies += "com.github.j5ik2o" %% "reactive-aws-s3-cats" % "1.1.3"
"com.github.j5ik2o" %% "reactive-aws-s3-core" % "1.1.3",
"com.github.j5ik2o" %% "reactive-aws-s3-cats" % "1.1.3",
// AWS SDK - multi-part upload
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.564"
// Logging
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2"
libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.26"
// testing
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.7" % "test"
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.564",
)
)
val loggingSettings = Seq(
libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
"org.slf4j" % "slf4j-log4j12" % "1.7.26",
)
)
val catsEffectsSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % "1.2.0"
),
// recommended for cats-effects
scalacOptions ++= Seq(
"-feature",
@ -30,4 +40,29 @@ scalacOptions ++= Seq(
"-language:postfixOps",
"-language:higherKinds",
"-Ypartial-unification")
)
// cli -> aws-lib -> core -> aws-api -> domain
lazy val cli = (project in file("cli"))
.settings(applicationSettings)
.aggregate(`aws-lib`, core, `aws-api`, domain)
.settings(loggingSettings)
.settings(commandLineParsing)
.dependsOn(`aws-lib`)
lazy val `aws-lib` = (project in file("aws-lib"))
.settings(awsSdkDependencies)
.settings(testDependencies)
.dependsOn(core)
lazy val core = (project in file("core"))
.settings(testDependencies)
.dependsOn(`aws-api`)
lazy val `aws-api` = (project in file("aws-api"))
.settings(catsEffectsSettings)
.dependsOn(domain)
lazy val domain = (project in file("domain"))
.settings(testDependencies)

View file

@ -0,0 +1,14 @@
package net.kemitix.s3thorp.cli
import com.typesafe.scalalogging.LazyLogging
import net.kemitix.s3thorp.domain.Config
class Logger(verbosity: Int) extends LazyLogging {
def info(level: Int)(message: String): Unit = if (verbosity >= level) logger.info(s"1:$message")
def warn(message: String): Unit = logger.warn(message)
def error(message: String): Unit = logger.error(message)
}

View file

@ -0,0 +1,43 @@
package net.kemitix.s3thorp.cli
import java.io.File
import java.nio.file.Paths
import cats.effect.ExitCase.{Canceled, Completed, Error}
import cats.effect.{ExitCode, IO, IOApp}
import net.kemitix.s3thorp.core.MD5HashGenerator.md5File
import net.kemitix.s3thorp.aws.lib.S3ClientBuilder
import net.kemitix.s3thorp.core.Sync
import net.kemitix.s3thorp.domain.Config
object Main extends IOApp {
val defaultConfig: Config =
Config(source = Paths.get(".").toFile)
def program(args: List[String]): IO[ExitCode] =
for {
config <- ParseArgs(args, defaultConfig)
logger = new Logger(config.verbose)
info = (l: Int) => (m: String) => logger.info(l)(m)
md5HashGenerator = (file: File) => md5File(file)(info)
_ <- IO(logger.info(1)("S3Thorp - hashed sync for s3"))
_ <- Sync.run(
S3ClientBuilder.defaultClient,
md5HashGenerator,
l => i => logger.info(l)(i),
w => logger.warn(w),
e => logger.error(e))(config)
} yield ExitCode.Success
override def run(args: List[String]): IO[ExitCode] = {
val logger = new Logger(1)
program(args)
.guaranteeCase {
case Canceled => IO(logger.warn("Interrupted"))
case Error(e) => IO(logger.error(e.getMessage))
case Completed => IO(logger.info(1)("Done"))
}
}
}

View file

@ -1,8 +1,10 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.cli
import java.nio.file.Paths
import cats.effect.IO
import net.kemitix.s3thorp._
import net.kemitix.s3thorp.domain.{Bucket, Config, Exclude, Filter, RemoteKey}
import scopt.OParser
import scopt.OParser.{builder, parse, sequence}

View file

@ -0,0 +1,24 @@
package net.kemitix.s3thorp.core
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey}
sealed trait Action {
def bucket: Bucket
}
object Action {
final case class DoNothing(bucket: Bucket,
remoteKey: RemoteKey) extends Action
final case class ToUpload(bucket: Bucket,
localFile: LocalFile) extends Action
final case class ToCopy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey) extends Action
final case class ToDelete(bucket: Bucket,
remoteKey: RemoteKey) extends Action
}

View file

@ -1,7 +1,9 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
trait ActionGenerator
extends Logging {
import net.kemitix.s3thorp.core.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.s3thorp.domain._
object ActionGenerator {
def createActions(s3MetaData: S3MetaData)
(implicit c: Config): Stream[Action] =
@ -10,43 +12,47 @@ trait ActionGenerator
// #1 local exists, remote exists, remote matches - do nothing
case S3MetaData(localFile, _, Some(RemoteMetaData(remoteKey, remoteHash, _)))
if localFile.hash == remoteHash
=> doNothing(remoteKey)
=> doNothing(c.bucket, remoteKey)
// #2 local exists, remote is missing, other matches - copy
case S3MetaData(localFile, otherMatches, None)
if otherMatches.nonEmpty
=> copyFile(localFile, otherMatches)
=> copyFile(c.bucket, localFile, otherMatches)
// #3 local exists, remote is missing, other no matches - upload
case S3MetaData(localFile, otherMatches, None)
if otherMatches.isEmpty
=> uploadFile(localFile)
=> uploadFile(c.bucket, localFile)
// #4 local exists, remote exists, remote no match, other matches - copy
case S3MetaData(localFile, otherMatches, Some(RemoteMetaData(_, remoteHash, _)))
if localFile.hash != remoteHash &&
otherMatches.nonEmpty
=> copyFile(localFile, otherMatches)
=> copyFile(c.bucket, localFile, otherMatches)
// #5 local exists, remote exists, remote no match, other no matches - upload
case S3MetaData(localFile, hashMatches, Some(_))
if hashMatches.isEmpty
=> uploadFile(localFile)
=> uploadFile(c.bucket, localFile)
}
private def doNothing(remoteKey: RemoteKey) =
private def doNothing(bucket: Bucket,
remoteKey: RemoteKey) =
Stream(
DoNothing(remoteKey))
DoNothing(bucket, remoteKey))
private def uploadFile(localFile: LocalFile) =
private def uploadFile(bucket: Bucket,
localFile: LocalFile) =
Stream(
ToUpload(localFile))
ToUpload(bucket, localFile))
private def copyFile(localFile: LocalFile,
private def copyFile(bucket: Bucket,
localFile: LocalFile,
matchByHash: Set[RemoteMetaData]): Stream[Action] =
Stream(
ToCopy(
bucket,
sourceKey = matchByHash.head.remoteKey,
hash = localFile.hash,
targetKey = localFile.remoteKey))

View file

@ -0,0 +1,31 @@
package net.kemitix.s3thorp.core
import cats.effect.IO
import net.kemitix.s3thorp.aws.api.S3Action.DoNothingS3Action
import net.kemitix.s3thorp.aws.api.{S3Action, S3Client, UploadProgressListener}
import net.kemitix.s3thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.s3thorp.domain.Config
object ActionSubmitter {
def submitAction(s3Client: S3Client, action: Action)
(implicit c: Config,
info: Int => String => Unit,
warn: String => Unit): Stream[IO[S3Action]] = {
Stream(
action match {
case ToUpload(bucket, localFile) =>
info(4)(s" Upload: ${localFile.relative}")
val progressListener = new UploadProgressListener(localFile)
s3Client.upload(localFile, bucket, progressListener, c.multiPartThreshold, 1, c.maxRetries)
case ToCopy(bucket, sourceKey, hash, targetKey) =>
info(4)(s" Copy: ${sourceKey.key} => ${targetKey.key}")
s3Client.copy(bucket, sourceKey, hash, targetKey)
case ToDelete(bucket, remoteKey) =>
info(4)(s" Delete: ${remoteKey.key}")
s3Client.delete(bucket, remoteKey)
case DoNothing(bucket, remoteKey) => IO {
DoNothingS3Action(remoteKey)}
})
}
}

View file

@ -1,5 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
final case class Counters(uploaded: Int = 0,
deleted: Int = 0,

View file

@ -1,8 +1,10 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
import java.io.File
trait KeyGenerator {
import net.kemitix.s3thorp.domain.RemoteKey
object KeyGenerator {
def generateKey(source: File, prefix: RemoteKey)
(file: File): RemoteKey = {

View file

@ -0,0 +1,36 @@
package net.kemitix.s3thorp.core
import java.io.File
import net.kemitix.s3thorp.core.KeyGenerator.generateKey
import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash}
object LocalFileStream {
def findFiles(file: File,
md5HashGenerator: File => MD5Hash,
info: Int => String => Unit)
(implicit c: Config): Stream[LocalFile] = {
def loop(file: File): Stream[LocalFile] = {
info(2)(s"- Entering: $file")
val files = for {
f <- dirPaths(file)
.filter { f => f.isDirectory || c.filters.forall { filter => filter isIncluded f.toPath } }
.filter { f => c.excludes.forall { exclude => exclude isIncluded f.toPath } }
fs <- recurseIntoSubDirectories(f)
} yield fs
info(5)(s"- Leaving: $file")
files
}
def dirPaths(file: File): Stream[File] =
Option(file.listFiles)
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file")).toStream
def recurseIntoSubDirectories(file: File)(implicit c: Config): Stream[LocalFile] =
if (file.isDirectory) loop(file)
else Stream(LocalFile(file, c.source, generateKey(c.source, c.prefix), md5HashGenerator))
loop(file)
}
}

View file

@ -1,13 +1,14 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
import java.io.{File, FileInputStream}
import java.security.{DigestInputStream, MessageDigest}
import java.security.MessageDigest
trait MD5HashGenerator
extends Logging {
import net.kemitix.s3thorp.domain.MD5Hash
object MD5HashGenerator {
def md5File(file: File)
(implicit c: Config): MD5Hash = {
(implicit info: Int => String => Unit): MD5Hash = {
val hash = md5FilePart(file, 0, file.length)
hash
}
@ -15,14 +16,14 @@ trait MD5HashGenerator
def md5FilePart(file: File,
offset: Long,
size: Long)
(implicit c: Config): MD5Hash = {
log5(s"md5:reading:offset $offset:size $size:$file")
(implicit info: Int => String => Unit): MD5Hash = {
info(5)(s"md5:reading:offset $offset:size $size:$file")
val fis = new FileInputStream(file)
fis skip offset
val buffer = new Array[Byte](size.toInt)
fis read buffer
val hash = md5PartBody(buffer)
log5(s"md5:generated:${hash.hash}")
info(5)(s"md5:generated:${hash.hash}")
hash
}

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
import java.io.{File, FileNotFoundException}
@ -8,7 +8,8 @@ object Resource {
def apply(base: AnyRef,
name: String): File = {
Try(new File(base.getClass.getResource(name).getPath))
.getOrElse(throw new FileNotFoundException(name))
Try{
new File(base.getClass.getResource(name).getPath)
}.getOrElse(throw new FileNotFoundException(name))
}
}

View file

@ -0,0 +1,24 @@
package net.kemitix.s3thorp.core
import net.kemitix.s3thorp.domain._
object S3MetaDataEnricher {
def getMetadata(localFile: LocalFile,
s3ObjectsData: S3ObjectsData)
(implicit c: Config): Stream[S3MetaData] = {
val (keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData)
Stream(
S3MetaData(localFile,
matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) },
matchByHash = hashMatches map { km => RemoteMetaData(km.key, localFile.hash, km.modified) }))
}
def getS3Status(localFile: LocalFile,
s3ObjectsData: S3ObjectsData): (Option[HashModified], Set[KeyModified]) = {
val matchingByKey = s3ObjectsData.byKey.get(localFile.remoteKey)
val matchingByHash = s3ObjectsData.byHash.getOrElse(localFile.hash, Set())
(matchingByKey, matchingByHash)
}
}

View file

@ -0,0 +1,54 @@
package net.kemitix.s3thorp.core
import java.io.File
import cats.effect.IO
import cats.implicits._
import net.kemitix.s3thorp.aws.api.S3Client
import net.kemitix.s3thorp.core.Action.ToDelete
import net.kemitix.s3thorp.core.ActionGenerator.createActions
import net.kemitix.s3thorp.core.ActionSubmitter.submitAction
import net.kemitix.s3thorp.core.LocalFileStream.findFiles
import net.kemitix.s3thorp.core.S3MetaDataEnricher.getMetadata
import net.kemitix.s3thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart}
import net.kemitix.s3thorp.domain.{Config, MD5Hash, S3ObjectsData}
object Sync {
def run(s3Client: S3Client,
md5HashGenerator: File => MD5Hash,
info: Int => String => Unit,
warn: String => Unit,
error: String => Unit)
(implicit c: Config): IO[Unit] = {
def copyUploadActions(s3Data: S3ObjectsData) = {
for {actions <- {
for {
file <- findFiles(c.source, md5HashGenerator, info)
data <- getMetadata(file, s3Data)
action <- createActions(data)
s3Action <- submitAction(s3Client, action)(c, info, warn)
} yield s3Action
}.sequence
} yield actions.sorted
}
def deleteActions(s3ObjectsData: S3ObjectsData) = {
(for {
key <- s3ObjectsData.byKey.keys
if key.isMissingLocally(c.source, c.prefix)
ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key))(c, info, warn)
} yield ioDelAction).toStream.sequence
}
for {
_ <- logRunStart(info)
s3data <- s3Client.listObjects(c.bucket, c.prefix)(info)
_ <- logFileScan(info)
copyUploadActions <- copyUploadActions(s3data)
deleteAction <- deleteActions(s3data)
_ <- logRunFinished(copyUploadActions ++ deleteAction, info)
} yield ()
}
}

View file

@ -0,0 +1,41 @@
package net.kemitix.s3thorp.core
import cats.effect.IO
import net.kemitix.s3thorp.aws.api.S3Action
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action}
import net.kemitix.s3thorp.domain.Config
// Logging for the Sync class
object SyncLogging {
def logRunStart[F[_]](info: Int => String => Unit)(implicit c: Config): IO[Unit] = IO {
info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, " +
s"Filter: ${c.filters.map{ f => f.filter}.mkString(""", """)} " +
s"Exclude: ${c.excludes.map{ f => f.exclude}.mkString(""", """)}")}
def logFileScan(info: Int => String => Unit)(implicit c: Config): IO[Unit] = IO{
info(1)(s"Scanning local files: ${c.source}...")}
def logRunFinished(actions: Stream[S3Action],
info: Int => String => Unit)
(implicit c: Config): IO[Unit] = IO {
val counters = actions.foldLeft(Counters())(countActivities)
info(1)(s"Uploaded ${counters.uploaded} files")
info(1)(s"Copied ${counters.copied} files")
info(1)(s"Deleted ${counters.deleted} files")
}
private def countActivities(implicit c: Config): (Counters, S3Action) => Counters =
(counters: Counters, s3Action: S3Action) => {
s3Action match {
case UploadS3Action(remoteKey, _) =>
counters.copy(uploaded = counters.uploaded + 1)
case CopyS3Action(remoteKey) =>
counters.copy(copied = counters.copied + 1)
case DeleteS3Action(remoteKey) =>
counters.copy(deleted = counters.deleted + 1)
case _ => counters
}
}
}

View file

@ -0,0 +1 @@
This file is in the root directory of the upload tree.

View file

@ -0,0 +1 @@
This file is in the subdir folder within the upload tree.

View file

@ -1,70 +1,75 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
import java.io.File
import java.time.Instant
import net.kemitix.s3thorp.core.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.s3thorp.domain._
import org.scalatest.FunSpec
class ActionGeneratorSuite
extends UnitTest
with KeyGenerator {
extends FunSpec {
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val fileToKey = generateKey(config.source, config.prefix) _
private val bucket = Bucket("bucket")
implicit private val config: Config = Config(bucket, prefix, source = source)
implicit private val logInfo: Int => String => Unit = l => i => ()
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
private val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
val lastModified = LastModified(Instant.now())
new ActionGenerator {
describe("create actions") {
def invoke(input: S3MetaData) = createActions(input).toList
def invoke(input: S3MetaData) = ActionGenerator.createActions(input).toList
describe("#1 local exists, remote exists, remote matches - do nothing") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified)
val input = S3MetaData(theFile, // local exists
matchByHash = Set(theRemoteMetadata), // remote matches
matchByKey = Some(theRemoteMetadata) // remote exists
)
it("do nothing") {
val expected = List(DoNothing(theFile.remoteKey))
val expected = List(DoNothing(bucket, theFile.remoteKey))
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#2 local exists, remote is missing, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val theRemoteKey = theFile.remoteKey
val otherRemoteKey = aRemoteKey(prefix, "other-key")
val otherRemoteKey = prefix.resolve("other-key")
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
val input = S3MetaData(theFile, // local exists
matchByHash = Set(otherRemoteMetadata), // other matches
matchByKey = None) // remote is missing
it("copy from other key") {
val expected = List(ToCopy(otherRemoteKey, theHash, theRemoteKey)) // copy
val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey)) // copy
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#3 local exists, remote is missing, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val input = S3MetaData(theFile, // local exists
matchByHash = Set.empty, // other no matches
matchByKey = None) // remote is missing
it("upload") {
val expected = List(ToUpload(theFile)) // upload
val expected = List(ToUpload(bucket, theFile)) // upload
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val otherRemoteKey = aRemoteKey(prefix, "other-key")
val otherRemoteKey = prefix.resolve("other-key")
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
val oldRemoteMetadata = RemoteMetaData(theRemoteKey,
hash = oldHash, // remote no match
@ -73,14 +78,14 @@ class ActionGeneratorSuite
matchByHash = Set(otherRemoteMetadata), // other matches
matchByKey = Some(oldRemoteMetadata)) // remote exists
it("copy from other key") {
val expected = List(ToCopy(otherRemoteKey, theHash, theRemoteKey)) // copy
val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey)) // copy
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
@ -89,7 +94,7 @@ class ActionGeneratorSuite
matchByKey = Some(theRemoteMetadata) // remote exists
)
it("upload") {
val expected = List(ToUpload(theFile)) // upload
val expected = List(ToUpload(bucket, theFile)) // upload
val result = invoke(input)
assertResult(expected)(result)
}
@ -101,4 +106,3 @@ class ActionGeneratorSuite
}
}
}
}

View file

@ -1,16 +1,16 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
import java.io.File
import net.kemitix.s3thorp.domain.{Bucket, Config, RemoteKey}
import org.scalatest.FunSpec
class KeyGeneratorSuite extends FunSpec {
new KeyGenerator {
private val source: File = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val fileToKey = generateKey(config.source, config.prefix) _
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
describe("key generator") {
def resolve(subdir: String): File = {
@ -31,5 +31,5 @@ class KeyGeneratorSuite extends FunSpec {
}
}
}
}
}

View file

@ -0,0 +1,23 @@
package net.kemitix.s3thorp.core
import java.io.File
import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash}
import org.scalatest.FunSpec
class LocalFileStreamSuite extends FunSpec {
val uploadResource = Resource(this, "upload")
val config: Config = Config(source = uploadResource)
implicit private val logInfo: Int => String => Unit = l => i => ()
val md5HashGenerator: File => MD5Hash = file => MD5HashGenerator.md5File(file)
describe("findFiles") {
it("should find all files") {
val result: Set[String] =
LocalFileStream.findFiles(uploadResource, md5HashGenerator, logInfo)(config).toSet
.map { x: LocalFile => x.relative.toString }
assertResult(Set("subdir/leaf-file", "root-file"))(result)
}
}
}

View file

@ -1,19 +1,22 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
import java.nio.file.Files
class MD5HashGeneratorTest extends UnitTest {
import net.kemitix.s3thorp.domain.{Bucket, Config, MD5Hash, RemoteKey}
import org.scalatest.FunSpec
class MD5HashGeneratorTest extends FunSpec {
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit private val logInfo: Int => String => Unit = l => i => ()
new MD5HashGenerator {
describe("read a small file (smaller than buffer)") {
val file = Resource(this, "upload/root-file")
it("should generate the correct hash") {
val expected = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val result = md5File(file)
val result = MD5HashGenerator.md5File(file)
assertResult(expected)(result)
}
}
@ -22,7 +25,7 @@ class MD5HashGeneratorTest extends UnitTest {
val buffer: Array[Byte] = Files.readAllBytes(file.toPath)
it("should generate the correct hash") {
val expected = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val result = md5PartBody(buffer)
val result = MD5HashGenerator.md5PartBody(buffer)
assertResult(expected)(result)
}
}
@ -30,7 +33,7 @@ class MD5HashGeneratorTest extends UnitTest {
val file = Resource(this, "big-file")
it("should generate the correct hash") {
val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
val result = md5File(file)
val result = MD5HashGenerator.md5File(file)
assertResult(expected)(result)
}
}
@ -41,18 +44,17 @@ class MD5HashGeneratorTest extends UnitTest {
describe("when starting at the beginning of the file") {
it("should generate the correct hash") {
val expected = MD5Hash("aadf0d266cefe0fcdb241a51798d74b3")
val result = md5FilePart(file, 0, halfFileLength)
val result = MD5HashGenerator.md5FilePart(file, 0, halfFileLength)
assertResult(expected)(result)
}
}
describe("when starting in the middle of the file") {
it("should generate the correct hash") {
val expected = MD5Hash("16e08d53ca36e729d808fd5e4f7e35dc")
val result = md5FilePart(file, halfFileLength, halfFileLength)
val result = MD5HashGenerator.md5FilePart(file, halfFileLength, halfFileLength)
assertResult(expected)(result)
}
}
}
}
}

View file

@ -1,6 +1,10 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
class S3ActionSuite extends UnitTest {
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action}
import net.kemitix.s3thorp.domain.{MD5Hash, RemoteKey}
import org.scalatest.FunSpec
class S3ActionSuite extends FunSpec {
describe("Ordering of types") {
val remoteKey = RemoteKey("remote-key")

View file

@ -1,27 +1,31 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.core
import java.io.File
import java.time.Instant
import net.kemitix.s3thorp.awssdk.S3ObjectsData
import net.kemitix.s3thorp.aws.api.S3Client
import net.kemitix.s3thorp.core.S3MetaDataEnricher.{getMetadata, getS3Status}
import net.kemitix.s3thorp.domain._
import org.scalatest.FunSpec
class S3MetaDataEnricherSuite
extends UnitTest
with KeyGenerator {
extends FunSpec {
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val fileToKey = generateKey(config.source, config.prefix) _
implicit private val logInfo: Int => String => Unit = l => i => ()
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
private val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
val lastModified = LastModified(Instant.now())
describe("enrich with metadata") {
new S3MetaDataEnricher with DummyS3Client {
describe("#1a local exists, remote exists, remote matches, other matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = aLocalFile("the-file", theHash, source, fileToKey)
val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val theRemoteKey: RemoteKey = theFile.remoteKey
implicit val s3: S3ObjectsData = S3ObjectsData(
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
)
@ -30,15 +34,15 @@ class S3MetaDataEnricherSuite
val expected = Stream(S3MetaData(theFile,
matchByHash = Set(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata)))
val result = getMetadata(theFile)
val result = getMetadata(theFile, s3)
assertResult(expected)(result)
}
}
describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = aLocalFile("the-file", theHash, source, fileToKey)
val theRemoteKey: RemoteKey = aRemoteKey(prefix, "the-file")
implicit val s3: S3ObjectsData = S3ObjectsData(
val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val theRemoteKey: RemoteKey = prefix.resolve("the-file")
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
)
@ -47,15 +51,15 @@ class S3MetaDataEnricherSuite
val expected = Stream(S3MetaData(theFile,
matchByHash = Set(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata)))
val result = getMetadata(theFile)
val result = getMetadata(theFile, s3)
assertResult(expected)(result)
}
}
describe("#2 local exists, remote is missing, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val otherRemoteKey = RemoteKey("other-key")
implicit val s3: S3ObjectsData = S3ObjectsData(
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
byKey = Map(otherRemoteKey -> HashModified(theHash, lastModified))
)
@ -64,14 +68,14 @@ class S3MetaDataEnricherSuite
val expected = Stream(S3MetaData(theFile,
matchByHash = Set(otherRemoteMetadata),
matchByKey = None))
val result = getMetadata(theFile)
val result = getMetadata(theFile, s3)
assertResult(expected)(result)
}
}
describe("#3 local exists, remote is missing, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
implicit val s3: S3ObjectsData = S3ObjectsData(
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(),
byKey = Map()
)
@ -79,17 +83,17 @@ class S3MetaDataEnricherSuite
val expected = Stream(S3MetaData(theFile,
matchByHash = Set.empty,
matchByKey = None))
val result = getMetadata(theFile)
val result = getMetadata(theFile, s3)
assertResult(expected)(result)
}
}
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val otherRemoteKey = aRemoteKey(prefix, "other-key")
implicit val s3: S3ObjectsData = S3ObjectsData(
val otherRemoteKey = prefix.resolve("other-key")
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(
oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
@ -104,16 +108,16 @@ class S3MetaDataEnricherSuite
val expected = Stream(S3MetaData(theFile,
matchByHash = Set(otherRemoteMetadata),
matchByKey = Some(theRemoteMetadata)))
val result = getMetadata(theFile)
val result = getMetadata(theFile, s3)
assertResult(expected)(result)
}
}
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
implicit val s3: S3ObjectsData = S3ObjectsData(
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(
oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
theHash -> Set.empty),
@ -126,10 +130,63 @@ class S3MetaDataEnricherSuite
val expected = Stream(S3MetaData(theFile,
matchByHash = Set.empty,
matchByKey = Some(theRemoteMetadata)))
val result = getMetadata(theFile)
val result = getMetadata(theFile, s3)
assertResult(expected)(result)
}
}
}
describe("getS3Status") {
val hash = MD5Hash("hash")
val localFile = LocalFile.resolve("the-file", hash, source, fileToKey, fileToHash)
val key = localFile.remoteKey
val keyotherkey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey, fileToHash)
val diffhash = MD5Hash("diff")
val keydiffhash = LocalFile.resolve("other-key-diff-hash", diffhash, source, fileToKey, fileToHash)
val lastModified = LastModified(Instant.now)
val s3ObjectsData: S3ObjectsData = S3ObjectsData(
byHash = Map(
hash -> Set(KeyModified(key, lastModified), KeyModified(keyotherkey.remoteKey, lastModified)),
diffhash -> Set(KeyModified(keydiffhash.remoteKey, lastModified))),
byKey = Map(
key -> HashModified(hash, lastModified),
keyotherkey.remoteKey -> HashModified(hash, lastModified),
keydiffhash.remoteKey -> HashModified(diffhash, lastModified)))
def invoke(localFile: LocalFile) = {
getS3Status(localFile, s3ObjectsData)
}
describe("when remote key exists") {
it("should return (Some, Set.nonEmpty)") {
assertResult(
(Some(HashModified(hash, lastModified)),
Set(
KeyModified(key, lastModified),
KeyModified(keyotherkey.remoteKey, lastModified)))
)(invoke(localFile))
}
}
describe("when remote key does not exist and no others matches hash") {
it("should return (None, Set.empty)") {
val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey, fileToHash)
assertResult(
(None,
Set.empty)
)(invoke(localFile))
}
}
describe("when remote key exists and no others match hash") {
it("should return (None, Set.nonEmpty)") {
assertResult(
(Some(HashModified(diffhash, lastModified)),
Set(KeyModified(keydiffhash.remoteKey, lastModified)))
)(invoke(keydiffhash))
}
}
}
}

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
final case class Bucket(name: String) {

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
import java.io.File

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
import java.nio.file.Path
import java.util.function.Predicate

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
import java.nio.file.Path
import java.util.function.Predicate

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
final case class HashModified(hash: MD5Hash,
modified: LastModified) {

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
final case class KeyModified(key: RemoteKey,
modified: LastModified) {

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
import java.time.Instant

View file

@ -0,0 +1,41 @@
package net.kemitix.s3thorp.domain
import java.io.File
import java.nio.file.Path
final case class LocalFile(
file: File,
source: File,
keyGenerator: File => RemoteKey,
md5HashGenerator: File => MD5Hash,
suppliedHash: Option[MD5Hash] = None) {
require(!file.isDirectory, s"LocalFile must not be a directory: $file")
private lazy val myhash = suppliedHash.getOrElse(md5HashGenerator(file))
def hash: MD5Hash = myhash
// the equivalent location of the file on S3
def remoteKey: RemoteKey = keyGenerator(file)
def isDirectory: Boolean = file.isDirectory
// the path of the file within the source
def relative: Path = source.toPath.relativize(file.toPath)
}
object LocalFile {
def resolve(path: String,
myHash: MD5Hash,
source: File,
fileToKey: File => RemoteKey,
fileToHash: File => MD5Hash): LocalFile =
LocalFile(
file = source.toPath.resolve(path).toFile,
source = source,
keyGenerator = fileToKey,
md5HashGenerator = fileToHash,
suppliedHash = Some(myHash))
}

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
final case class MD5Hash(hash: String) {

View file

@ -0,0 +1,13 @@
package net.kemitix.s3thorp.domain
import java.io.File
import java.nio.file.Paths
final case class RemoteKey(key: String) {
def asFile(source: File, prefix: RemoteKey): File =
source.toPath.resolve(Paths.get(prefix.key).relativize(Paths.get(key))).toFile
def isMissingLocally(source: File, prefix: RemoteKey): Boolean =
! asFile(source, prefix).exists
def resolve(path: String): RemoteKey =
RemoteKey(key + "/" + path)
}

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
final case class RemoteMetaData(remoteKey: RemoteKey,
hash: MD5Hash,

View file

@ -1,4 +1,4 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
// For the LocalFile, the set of matching S3 objects with the same MD5Hash, and any S3 object with the same remote key
final case class S3MetaData(

View file

@ -1,6 +1,4 @@
package net.kemitix.s3thorp.awssdk
import net.kemitix.s3thorp.{HashModified, KeyModified, MD5Hash, RemoteKey}
package net.kemitix.s3thorp.domain
/**
* A list of objects and their MD5 hash values.

View file

@ -1,8 +1,10 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
import java.nio.file.{Path, Paths}
class ExcludeSuite extends UnitTest {
import org.scalatest.FunSpec
class ExcludeSuite extends FunSpec {
describe("default exclude") {
val exclude = Exclude()

View file

@ -1,8 +1,10 @@
package net.kemitix.s3thorp
package net.kemitix.s3thorp.domain
import java.nio.file.{Path, Paths}
class FilterSuite extends UnitTest {
import org.scalatest.FunSpec
class FilterSuite extends FunSpec {
describe("default filter") {
val filter = Filter()

View file

@ -1,10 +0,0 @@
package net.kemitix.s3thorp
sealed trait Action
final case class DoNothing(remoteKey: RemoteKey) extends Action
final case class ToUpload(localFile: LocalFile) extends Action
final case class ToCopy(
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey) extends Action
final case class ToDelete(remoteKey: RemoteKey) extends Action

View file

@ -1,28 +0,0 @@
package net.kemitix.s3thorp
import cats.effect.IO
import net.kemitix.s3thorp.awssdk.{S3Client, UploadProgressListener}
trait ActionSubmitter
extends S3Client
with Logging {
def submitAction(action: Action)
(implicit c: Config): Stream[IO[S3Action]] = {
Stream(
action match {
case ToUpload(localFile) =>
log4(s" Upload: ${localFile.relative}")
val progressListener = new UploadProgressListener(localFile)
upload(localFile, c.bucket, progressListener, 1)
case ToCopy(sourceKey, hash, targetKey) =>
log4(s" Copy: ${sourceKey.key} => ${targetKey.key}")
copy(c.bucket, sourceKey, hash, targetKey)
case ToDelete(remoteKey) =>
log4(s" Delete: ${remoteKey.key}")
delete(c.bucket, remoteKey)
case DoNothing(remoteKey) => IO {
DoNothingS3Action(remoteKey)}
})
}
}

View file

@ -1,28 +0,0 @@
package net.kemitix.s3thorp
import java.io.File
import java.nio.file.Path
final case class LocalFile(
file: File,
source: File,
keyGenerator: File => RemoteKey,
suppliedHash: Option[MD5Hash] = None)
(implicit c: Config)
extends MD5HashGenerator {
require(!file.isDirectory, s"LocalFile must not be a directory: $file")
private lazy val myhash = suppliedHash.getOrElse(md5File(file))
def hash: MD5Hash = myhash
// the equivalent location of the file on S3
def remoteKey: RemoteKey = keyGenerator(file)
def isDirectory: Boolean = file.isDirectory
// the path of the file within the source
def relative: Path = source.toPath.relativize(file.toPath)
}

View file

@ -1,31 +0,0 @@
package net.kemitix.s3thorp
import java.io.File
trait LocalFileStream
extends KeyGenerator
with Logging {
def findFiles(file: File)
(implicit c: Config): Stream[LocalFile] = {
log2(s"- Entering: $file")
val files = for {
f <- dirPaths(file)
.filter { f => f.isDirectory || c.filters.forall { filter => filter isIncluded f.toPath } }
.filter { f => c.excludes.forall { exclude => exclude isIncluded f.toPath } }
fs <- recurseIntoSubDirectories(f)
} yield fs
log5(s"- Leaving: $file")
files
}
private def dirPaths(file: File): Stream[File] = {
Option(file.listFiles)
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file")).toStream
}
private def recurseIntoSubDirectories(file: File)(implicit c: Config): Stream[LocalFile] =
if (file.isDirectory) findFiles(file)(c)
else Stream(LocalFile(file, c.source, generateKey(c.source, c.prefix)))
}

View file

@ -1,21 +0,0 @@
package net.kemitix.s3thorp
import com.typesafe.scalalogging.LazyLogging
trait Logging extends LazyLogging {
def log1(message: String)(implicit config: Config): Unit = if (config.verbose >= 1) logger.info(s"1:$message")
def log2(message: String)(implicit config: Config): Unit = if (config.verbose >= 2) logger.info(s"2:$message")
def log3(message: String)(implicit config: Config): Unit = if (config.verbose >= 3) logger.info(s"3:$message")
def log4(message: String)(implicit config: Config): Unit = if (config.verbose >= 4) logger.info(s"4:$message")
def log5(message: String)(implicit config: Config): Unit = if (config.verbose >= 5) logger.info(s"5:$message")
def warn(message: String): Unit = logger.warn(message)
def error(message: String): Unit = logger.error(message)
}

View file

@ -1,31 +0,0 @@
package net.kemitix.s3thorp
import java.nio.file.Paths
import cats.effect.ExitCase.{Canceled, Completed, Error}
import cats.effect.{ExitCode, IO, IOApp}
import net.kemitix.s3thorp.awssdk.S3Client
object Main extends IOApp with Logging {
val defaultConfig: Config =
Config(source = Paths.get(".").toFile)
val sync = new Sync(S3Client.defaultClient)
def program(args: List[String]): IO[ExitCode] =
for {
a <- ParseArgs(args, defaultConfig)
_ <- IO(log1("S3Thorp - hashed sync for s3")(a))
_ <- sync.run(a)
} yield ExitCode.Success
override def run(args: List[String]): IO[ExitCode] =
program(args)
.guaranteeCase {
case Canceled => IO(logger.warn("Interrupted"))
case Error(e) => IO(logger.error(e.getMessage))
case Completed => IO(logger.info("Done"))
}
}

View file

@ -1,11 +0,0 @@
package net.kemitix.s3thorp
import java.io.File
import java.nio.file.Paths
final case class RemoteKey(key: String) {
def asFile(implicit c: Config): File =
c.source.toPath.resolve(Paths.get(c.prefix.key).relativize(Paths.get(key))).toFile
def isMissingLocally(implicit c: Config): Boolean =
! asFile.exists
}

View file

@ -1,36 +0,0 @@
package net.kemitix.s3thorp
sealed trait S3Action {
// the remote key that was uploaded, deleted or otherwise updated by the action
def remoteKey: RemoteKey
val order: Int
}
final case class DoNothingS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 0
}
final case class CopyS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 1
}
final case class UploadS3Action(
remoteKey: RemoteKey,
md5Hash: MD5Hash) extends S3Action {
override val order: Int = 2
}
final case class DeleteS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 3
}
final case class ErroredS3Action(remoteKey: RemoteKey, e: Throwable) extends S3Action {
override val order: Int = 10
}
object S3Action {
implicit def ord[A <: S3Action]: Ordering[A] = Ordering.by(_.order)
}

View file

@ -1,19 +0,0 @@
package net.kemitix.s3thorp
import net.kemitix.s3thorp.awssdk.{S3ObjectsData, S3Client}
trait S3MetaDataEnricher
extends S3Client
with Logging {
def getMetadata(localFile: LocalFile)
(implicit c: Config,
s3ObjectsData: S3ObjectsData): Stream[S3MetaData] = {
val (keyMatches, hashMatches) = getS3Status(localFile)
Stream(
S3MetaData(localFile,
matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) },
matchByHash = hashMatches map { km => RemoteMetaData(km.key, localFile.hash, km.modified) }))
}
}

View file

@ -1,61 +0,0 @@
package net.kemitix.s3thorp
import cats.effect.IO
import cats.implicits._
import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData, UploadProgressListener}
class Sync(s3Client: S3Client)
extends LocalFileStream
with S3MetaDataEnricher
with ActionGenerator
with ActionSubmitter
with SyncLogging {
def run(implicit c: Config): IO[Unit] = {
logRunStart
listObjects(c.bucket, c.prefix)
.map { implicit s3ObjectsData => {
logFileScan
val actions = for {
file <- findFiles(c.source)
data <- getMetadata(file)
action <- createActions(data)
s3Action <- submitAction(action)
} yield s3Action
val sorted = sort(actions.sequence)
val list = sorted.unsafeRunSync.toList
val delActions = (for {
key <- s3ObjectsData.byKey.keys
if key.isMissingLocally
ioDelAction <- submitAction(ToDelete(key))
} yield ioDelAction).toStream.sequence
val delList = delActions.unsafeRunSync.toList
logRunFinished(list ++ delList)
}}
}
private def sort(ioActions: IO[Stream[S3Action]]) =
ioActions.flatMap { actions => IO { actions.sorted } }
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int)
(implicit c: Config): IO[S3Action] =
s3Client.upload(localFile, bucket, progressListener, tryCount)
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey)(implicit c: Config): IO[CopyS3Action] =
s3Client.copy(bucket, sourceKey, hash, targetKey)
override def delete(bucket: Bucket,
remoteKey: RemoteKey)(implicit c: Config): IO[DeleteS3Action] =
s3Client.delete(bucket, remoteKey)
override def listObjects(bucket: Bucket,
prefix: RemoteKey
)(implicit c: Config): IO[S3ObjectsData] =
s3Client.listObjects(bucket, prefix)
}

View file

@ -1,38 +0,0 @@
package net.kemitix.s3thorp
import cats.effect.IO
// Logging for the Sync class
trait SyncLogging extends Logging {
def logRunStart(implicit c: Config): Unit =
log1(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, " +
s"Filter: ${c.filters.map{ f => f.filter}.mkString(""", """)} " +
s"Exclude: ${c.excludes.map{ f => f.exclude}.mkString(""", """)}")(c)
def logFileScan(implicit c: Config): Unit =
log1(s"Scanning local files: ${c.source}...")
def logRunFinished(actions: List[S3Action])
(implicit c: Config): Unit = {
val counters = actions.foldLeft(Counters())(logActivity)
log1(s"Uploaded ${counters.uploaded} files")
log1(s"Copied ${counters.copied} files")
log1(s"Deleted ${counters.deleted} files")
}
private def logActivity(implicit c: Config): (Counters, S3Action) => Counters =
(counters: Counters, s3Action: S3Action) => {
s3Action match {
case UploadS3Action(remoteKey, _) =>
counters.copy(uploaded = counters.uploaded + 1)
case CopyS3Action(remoteKey) =>
counters.copy(copied = counters.copied + 1)
case DeleteS3Action(remoteKey) =>
counters.copy(deleted = counters.deleted + 1)
case _ => counters
}
}
}

View file

@ -1,54 +0,0 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client, AmazonS3ClientBuilder}
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp._
trait S3Client {
final def getS3Status(localFile: LocalFile)
(implicit s3ObjectsData: S3ObjectsData): (Option[HashModified], Set[KeyModified]) = {
val matchingByKey = s3ObjectsData.byKey.get(localFile.remoteKey)
val matchingByHash = s3ObjectsData.byHash.getOrElse(localFile.hash, Set())
(matchingByKey, matchingByHash)
}
def listObjects(bucket: Bucket,
prefix: RemoteKey
)(implicit c: Config): IO[S3ObjectsData]
def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int
)(implicit c: Config): IO[S3Action]
def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit c: Config): IO[CopyS3Action]
def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit c: Config): IO[DeleteS3Action]
}
object S3Client {
def createClient(s3AsyncClient: S3AsyncClient,
amazonS3Client: AmazonS3,
amazonS3TransferManager: TransferManager): S3Client = {
new ThorpS3Client(S3CatsIOClient(s3AsyncClient), amazonS3Client, amazonS3TransferManager)
}
val defaultClient: S3Client =
createClient(new JavaClientWrapper {}.underlying,
AmazonS3ClientBuilder.defaultClient,
TransferManagerBuilder.defaultTransferManager)
}

View file

@ -1,81 +0,0 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import com.amazonaws.services.s3.model.PutObjectResult
import net.kemitix.s3thorp.{Bucket, Config, LocalFile, Logging, RemoteKey}
import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, DeleteObjectResponse, ListObjectsV2Response}
trait S3ClientLogging
extends Logging {
def logListObjectsStart(bucket: Bucket,
prefix: RemoteKey)
(implicit c: Config): ListObjectsV2Response => IO[ListObjectsV2Response] = {
in => IO {
log3(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}")
in
}
}
def logListObjectsFinish(bucket: Bucket,
prefix: RemoteKey)
(implicit c: Config): ListObjectsV2Response => IO[Unit] = {
in => IO {
log2(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}")
}
}
def logUploadStart(localFile: LocalFile,
bucket: Bucket)
(implicit c: Config): PutObjectResult => IO[PutObjectResult] = {
in => IO {
log4(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}")
in
}
}
def logUploadFinish(localFile: LocalFile,
bucket: Bucket)
(implicit c: Config): PutObjectResult => IO[Unit] = {
in =>IO {
log1(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}")
}
}
def logCopyStart(bucket: Bucket,
sourceKey: RemoteKey,
targetKey: RemoteKey)
(implicit c: Config): CopyObjectResponse => IO[CopyObjectResponse] = {
in => IO {
log4(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
in
}
}
def logCopyFinish(bucket: Bucket,
sourceKey: RemoteKey,
targetKey: RemoteKey)
(implicit c: Config): CopyObjectResponse => IO[Unit] = {
in => IO {
log3(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
}
}
def logDeleteStart(bucket: Bucket,
remoteKey: RemoteKey)
(implicit c: Config): DeleteObjectResponse => IO[DeleteObjectResponse] = {
in => IO {
log4(s"Delete: ${bucket.name}:${remoteKey.key}")
in
}
}
def logDeleteFinish(bucket: Bucket,
remoteKey: RemoteKey)
(implicit c: Config): DeleteObjectResponse => IO[Unit] = {
in => IO {
log3(s"Deleted: ${bucket.name}:${remoteKey.key}")
}
}
}

View file

@ -1,33 +0,0 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.PutObjectRequest
import net.kemitix.s3thorp._
private class S3ClientPutObjectUploader(s3Client: => AmazonS3)
extends S3ClientUploader
with S3ClientLogging
with QuoteStripper {
override def accepts(localFile: LocalFile)(implicit c: Config): Boolean = true
override
def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int)
(implicit c: Config): IO[UploadS3Action] = {
val request: PutObjectRequest =
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withGeneralProgressListener(progressListener.listener)
IO(s3Client.putObject(request))
.bracket(
logUploadStart(localFile, bucket))(
logUploadFinish(localFile, bucket))
.map(_.getETag)
.map(_ filter stripQuotes)
.map(MD5Hash)
.map(UploadS3Action(localFile.remoteKey, _))
}
}

View file

@ -1,17 +0,0 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import net.kemitix.s3thorp.{Bucket, Config, LocalFile, S3Action}
trait S3ClientUploader {
def accepts(localFile: LocalFile)
(implicit c: Config): Boolean
def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int)
(implicit c: Config): IO[S3Action]
}

View file

@ -1,17 +0,0 @@
package net.kemitix.s3thorp.awssdk
import com.amazonaws.event.{ProgressEvent, ProgressListener}
import net.kemitix.s3thorp.{Config, LocalFile}
class UploadProgressListener(localFile: LocalFile)
(implicit c: Config)
extends UploadProgressLogging {
def listener: ProgressListener = new ProgressListener {
override def progressChanged(progressEvent: ProgressEvent): Unit = {
val eventType = progressEvent.getEventType
if (eventType.isTransferEvent) logTransfer(localFile, eventType)
else logRequestCycle(localFile, eventType, progressEvent.getBytes, progressEvent.getBytesTransferred)
}
}
}

View file

@ -1,22 +0,0 @@
package net.kemitix.s3thorp.awssdk
import com.amazonaws.event.ProgressEventType
import net.kemitix.s3thorp.{Logging, LocalFile, Config}
trait UploadProgressLogging
extends Logging {
def logTransfer(localFile: LocalFile,
eventType: ProgressEventType)
(implicit c: Config): Unit =
log2(s"Transfer:${eventType.name}: ${localFile.remoteKey.key}")
def logRequestCycle(localFile: LocalFile,
eventType: ProgressEventType,
bytes: Long,
transferred: Long)
(implicit c: Config): Unit =
if (eventType equals ProgressEventType.REQUEST_BYTE_TRANSFER_EVENT) print('.')
else log3(s"Uploading:${eventType.name}:$transferred/$bytes:${localFile.remoteKey.key}")
}

View file

@ -1,28 +0,0 @@
package net.kemitix.s3thorp
import cats.effect.IO
import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData, UploadProgressListener}
trait DummyS3Client extends S3Client {
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
tryCount: Int
)(implicit c: Config): IO[UploadS3Action] = ???
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit c: Config): IO[CopyS3Action] = ???
override def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit c: Config): IO[DeleteS3Action] = ???
override def listObjects(bucket: Bucket,
prefix: RemoteKey
)(implicit c: Config): IO[S3ObjectsData] = ???
}

View file

@ -1,16 +0,0 @@
package net.kemitix.s3thorp
import org.scalatest.FunSpec
class LocalFileStreamSuite extends FunSpec with LocalFileStream {
describe("findFiles") {
val uploadResource = Resource(this, "upload")
val config: Config = Config(source = uploadResource)
it("should find all files") {
val result: Set[String] = findFiles(uploadResource)(config).toSet
.map { x: LocalFile => x.relative.toString }
assertResult(Set("subdir/leaf-file", "root-file"))(result)
}
}
}

View file

@ -1,20 +0,0 @@
package net.kemitix.s3thorp
import java.io.File
import org.scalatest.FunSpec
abstract class UnitTest extends FunSpec {
def aLocalFile(path: String, myHash: MD5Hash, source: File, fileToKey: File => RemoteKey)
(implicit c: Config): LocalFile =
LocalFile(
file = source.toPath.resolve(path).toFile,
source = source,
keyGenerator = fileToKey,
suppliedHash = Some(myHash))
def aRemoteKey(prefix: RemoteKey, path: String): RemoteKey =
RemoteKey(prefix.key + "/" + path)
}