Improve purity by moving all IO out to the edge (#52)
* [aws-api] TTFI UploadProgressLogging#logTransfer * [aws-api] TTFI UploadProgressLogging#logRequestCycle * [aws-api] TTFI UploadProgressLogging#logByteTransfer * [aws-api] TTFI UploadProgressListener * [aws-lib] TTFI UploaderLogging * [aws-api] UploadProgressListener refactoring * [aws-api] UploadProgressListener remove IO/Monad This class is a wrapper for a callback method used by the AWS SDK. Unfortunately you can't get the IO() created when that listener is called by the SDK, so unless we manually run unsafeRunSync, as we have done previously, it would never be executed. This removes the IO monad entirely and simply runs the code when the callback is triggered. * [aws-lib] S3ClientLogging remove unused method * [aws-lib] TTFI S3ClientLogging * [aws-lib] TTFI S3ClientCopier * [aws-lib] TTFI S3ClientObjectLister * [aws-lib] TTFI Uploader * [aws-lib] TTFI S3ClientDeleter * [aws-api] TTFI S3Client * [aws-lib] TTFI S3ClientBuilder and ThorpS3Client * [core] TTFI ActionSubmitter * [cli] TTFI Logger * [core] TTFI MD5HashGenerator * [core] TTFI LocalFileStream * [core] Sync refactoring * [core] TTFI Sync * [aws-lib] S3ObjectsByHashSuite truncate lastmodified to match Date * [aws-lib] ThorpS3ClientSuite truncate lastmodified to match Date * [core] MD5HashGeneratorTest switch to Id from IO * [sbt] restrict cats-effect to cli module, cats-core elsewhere * [core] MD5HashGenerator collapse lines
This commit is contained in:
parent
5996632c1e
commit
8d0c3e23c9
26 changed files with 304 additions and 282 deletions
|
@ -1,14 +1,13 @@
|
|||
package net.kemitix.s3thorp.aws.api
|
||||
|
||||
import cats.effect.IO
|
||||
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action}
|
||||
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey, S3ObjectsData}
|
||||
|
||||
trait S3Client {
|
||||
trait S3Client[M[_]] {
|
||||
|
||||
def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey
|
||||
)(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData]
|
||||
)(implicit info: Int => String => M[Unit]): M[S3ObjectsData]
|
||||
|
||||
def upload(localFile: LocalFile,
|
||||
bucket: Bucket,
|
||||
|
@ -16,17 +15,17 @@ trait S3Client {
|
|||
multiPartThreshold: Long,
|
||||
tryCount: Int,
|
||||
maxRetries: Int)
|
||||
(implicit info: Int => String => IO[Unit],
|
||||
warn: String => IO[Unit]): IO[S3Action]
|
||||
(implicit info: Int => String => M[Unit],
|
||||
warn: String => M[Unit]): M[S3Action]
|
||||
|
||||
def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey
|
||||
)(implicit info: Int => String => IO[Unit]): IO[CopyS3Action]
|
||||
)(implicit info: Int => String => M[Unit]): M[CopyS3Action]
|
||||
|
||||
def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey
|
||||
)(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action]
|
||||
)(implicit info: Int => String => M[Unit]): M[DeleteS3Action]
|
||||
|
||||
}
|
||||
|
|
|
@ -1,23 +1,19 @@
|
|||
package net.kemitix.s3thorp.aws.api
|
||||
|
||||
import cats.effect.IO
|
||||
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
|
||||
import net.kemitix.s3thorp.domain.LocalFile
|
||||
|
||||
class UploadProgressListener(localFile: LocalFile)
|
||||
(implicit info: Int => String => IO[Unit])
|
||||
extends UploadProgressLogging {
|
||||
|
||||
var bytesTransferred = 0L
|
||||
|
||||
def listener: UploadEvent => IO[Unit] =
|
||||
def listener: UploadEvent => Unit =
|
||||
{
|
||||
case e: TransferEvent => logTransfer(localFile, e)
|
||||
case e: RequestEvent => {
|
||||
val transferred = e.transferred
|
||||
bytesTransferred += transferred
|
||||
case e: RequestEvent =>
|
||||
bytesTransferred += e.transferred
|
||||
logRequestCycle(localFile, e, bytesTransferred)
|
||||
}
|
||||
case e: ByteTransferEvent => logByteTransfer(e)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
package net.kemitix.s3thorp.aws.api
|
||||
|
||||
import cats.effect.IO
|
||||
import net.kemitix.s3thorp.aws.api.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
|
||||
import net.kemitix.s3thorp.domain.Terminal.{clearLine, returnToPreviousLine}
|
||||
import net.kemitix.s3thorp.domain.{Terminal, LocalFile}
|
||||
import net.kemitix.s3thorp.domain.{LocalFile, Terminal}
|
||||
import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish
|
||||
|
||||
import scala.io.AnsiColor._
|
||||
|
@ -11,16 +10,14 @@ import scala.io.AnsiColor._
|
|||
trait UploadProgressLogging {
|
||||
|
||||
def logTransfer(localFile: LocalFile,
|
||||
event: TransferEvent)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] =
|
||||
info(2)(s"Transfer:${event.name}: ${localFile.remoteKey.key}")
|
||||
event: TransferEvent): Unit =
|
||||
println(s"Transfer:${event.name}: ${localFile.remoteKey.key}")
|
||||
|
||||
private val oneHundredPercent = 100
|
||||
|
||||
def logRequestCycle(localFile: LocalFile,
|
||||
event: RequestEvent,
|
||||
bytesTransferred: Long)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] = {
|
||||
event: RequestEvent,
|
||||
bytesTransferred: Long): Unit = {
|
||||
val remoteKey = localFile.remoteKey.key
|
||||
val fileLength = localFile.file.length
|
||||
val consoleWidth = Terminal.width - 2
|
||||
|
@ -31,13 +28,12 @@ trait UploadProgressLogging {
|
|||
val bar = s"[$head$tail]"
|
||||
val transferred = sizeInEnglish(bytesTransferred)
|
||||
val fileSize = sizeInEnglish(fileLength)
|
||||
IO(print(s"${clearLine}Uploading $transferred of $fileSize : $remoteKey\n$bar$returnToPreviousLine"))
|
||||
print(s"${clearLine}Uploading $transferred of $fileSize : $remoteKey\n$bar$returnToPreviousLine")
|
||||
} else
|
||||
IO(print(clearLine))
|
||||
print(clearLine)
|
||||
}
|
||||
|
||||
def logByteTransfer(event: ByteTransferEvent)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] =
|
||||
info(3)(".")
|
||||
def logByteTransfer(event: ByteTransferEvent): Unit =
|
||||
print(".")
|
||||
|
||||
}
|
||||
|
|
|
@ -1,16 +1,17 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
|
||||
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
|
||||
import net.kemitix.s3thorp.aws.api.S3Client
|
||||
|
||||
object S3ClientBuilder {
|
||||
|
||||
def createClient(amazonS3Client: AmazonS3,
|
||||
amazonS3TransferManager: TransferManager): S3Client =
|
||||
def createClient[M[_]: Monad](amazonS3Client: AmazonS3,
|
||||
amazonS3TransferManager: TransferManager): S3Client[M] =
|
||||
new ThorpS3Client(amazonS3Client, amazonS3TransferManager)
|
||||
|
||||
val defaultClient: S3Client =
|
||||
def defaultClient[M[_]: Monad]: S3Client[M] =
|
||||
createClient(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager)
|
||||
|
||||
}
|
||||
|
|
|
@ -1,32 +1,34 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest
|
||||
import net.kemitix.s3thorp.aws.api.S3Action.CopyS3Action
|
||||
import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logCopyFinish, logCopyStart}
|
||||
import net.kemitix.s3thorp.domain.{Bucket, MD5Hash, RemoteKey}
|
||||
|
||||
class S3ClientCopier(amazonS3: AmazonS3) {
|
||||
class S3ClientCopier[M[_]: Monad](amazonS3: AmazonS3) {
|
||||
|
||||
def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[CopyS3Action] = {
|
||||
IO {
|
||||
new CopyObjectRequest(
|
||||
bucket.name, sourceKey.key,
|
||||
bucket.name, targetKey.key)
|
||||
(implicit info: Int => String => M[Unit]): M[CopyS3Action] =
|
||||
for {
|
||||
_ <- logCopyStart[M](bucket, sourceKey, targetKey)
|
||||
_ <- copyObject(bucket, sourceKey, hash, targetKey)
|
||||
_ <- logCopyFinish[M](bucket, sourceKey,targetKey)
|
||||
} yield CopyS3Action(targetKey)
|
||||
|
||||
private def copyObject(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey) = {
|
||||
val request =
|
||||
new CopyObjectRequest(bucket.name, sourceKey.key, bucket.name, targetKey.key)
|
||||
.withMatchingETagConstraint(hash.hash)
|
||||
}.bracket {
|
||||
request =>
|
||||
for {
|
||||
_ <- logCopyStart(bucket, sourceKey, targetKey)
|
||||
result <- IO(amazonS3.copyObject(request))
|
||||
} yield result
|
||||
}(_ => logCopyFinish(bucket, sourceKey,targetKey))
|
||||
.map(_ => CopyS3Action(targetKey))
|
||||
Monad[M].pure(amazonS3.copyObject(request))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,24 +1,25 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.DeleteObjectRequest
|
||||
import net.kemitix.s3thorp.aws.api.S3Action.DeleteS3Action
|
||||
import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logDeleteFinish, logDeleteStart}
|
||||
import net.kemitix.s3thorp.domain.{Bucket, RemoteKey}
|
||||
|
||||
class S3ClientDeleter(amazonS3: AmazonS3) {
|
||||
class S3ClientDeleter[M[_]: Monad](amazonS3: AmazonS3) {
|
||||
|
||||
def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] =
|
||||
(implicit info: Int => String => M[Unit]): M[DeleteS3Action] =
|
||||
for {
|
||||
_ <- logDeleteStart(bucket, remoteKey)
|
||||
_ <- logDeleteStart[M](bucket, remoteKey)
|
||||
_ <- deleteObject(bucket, remoteKey)
|
||||
_ <- logDeleteFinish(bucket, remoteKey)
|
||||
_ <- logDeleteFinish[M](bucket, remoteKey)
|
||||
} yield DeleteS3Action(remoteKey)
|
||||
|
||||
private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = IO {
|
||||
private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = Monad[M].pure {
|
||||
amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,53 +1,46 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import com.amazonaws.services.s3.model.PutObjectResult
|
||||
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey}
|
||||
|
||||
object S3ClientLogging {
|
||||
|
||||
def logListObjectsStart(bucket: Bucket,
|
||||
def logListObjectsStart[M[_]: Monad](bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] =
|
||||
(implicit info: Int => String => M[Unit]): M[Unit] =
|
||||
info(1)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}")
|
||||
|
||||
def logListObjectsFinish(bucket: Bucket,
|
||||
def logListObjectsFinish[M[_]: Monad](bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] =
|
||||
(implicit info: Int => String => M[Unit]): M[Unit] =
|
||||
info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}")
|
||||
|
||||
def logUploadStart(localFile: LocalFile,
|
||||
bucket: Bucket)
|
||||
(implicit info: Int => String => IO[Unit]): PutObjectResult => IO[PutObjectResult] =
|
||||
in => for {
|
||||
_ <- info(1)(s"Uploading: ${bucket.name}:${localFile.remoteKey.key}")
|
||||
} yield in
|
||||
|
||||
def logUploadFinish(localFile: LocalFile,
|
||||
def logUploadFinish[M[_]: Monad](localFile: LocalFile,
|
||||
bucket: Bucket)
|
||||
(implicit info: Int => String => IO[Unit]): PutObjectResult => IO[Unit] =
|
||||
(implicit info: Int => String => M[Unit]): PutObjectResult => M[Unit] =
|
||||
_ => info(2)(s"Uploaded: ${bucket.name}:${localFile.remoteKey.key}")
|
||||
|
||||
def logCopyStart(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
targetKey: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] =
|
||||
def logCopyStart[M[_]: Monad](bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
targetKey: RemoteKey)
|
||||
(implicit info: Int => String => M[Unit]): M[Unit] =
|
||||
info(1)(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
|
||||
|
||||
def logCopyFinish(bucket: Bucket,
|
||||
def logCopyFinish[M[_]: Monad](bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
targetKey: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] =
|
||||
(implicit info: Int => String => M[Unit]): M[Unit] =
|
||||
info(2)(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
|
||||
|
||||
def logDeleteStart(bucket: Bucket,
|
||||
def logDeleteStart[M[_]: Monad](bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] =
|
||||
(implicit info: Int => String => M[Unit]): M[Unit] =
|
||||
info(1)(s"Delete: ${bucket.name}:${remoteKey.key}")
|
||||
|
||||
def logDeleteFinish(bucket: Bucket,
|
||||
def logDeleteFinish[M[_]: Monad](bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] =
|
||||
(implicit info: Int => String => M[Unit]): M[Unit] =
|
||||
info(2)(s"Deleted: ${bucket.name}:${remoteKey.key}")
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
|
||||
import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart}
|
||||
|
@ -10,11 +11,11 @@ import net.kemitix.s3thorp.domain._
|
|||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
class S3ClientObjectLister(amazonS3: AmazonS3) {
|
||||
class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) {
|
||||
|
||||
def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] = {
|
||||
(implicit info: Int => String => M[Unit]): M[S3ObjectsData] = {
|
||||
|
||||
type Token = String
|
||||
type Batch = (Stream[S3ObjectSummary], Option[Token])
|
||||
|
@ -24,8 +25,8 @@ class S3ClientObjectLister(amazonS3: AmazonS3) {
|
|||
.withPrefix(prefix.key)
|
||||
.withContinuationToken(token)
|
||||
|
||||
def fetchBatch: ListObjectsV2Request => IO[Batch] =
|
||||
request => IO {
|
||||
def fetchBatch: ListObjectsV2Request => M[Batch] =
|
||||
request => Monad[M].pure {
|
||||
val result = amazonS3.listObjectsV2(request)
|
||||
val more: Option[Token] =
|
||||
if (result.isTruncated) Some(result.getNextContinuationToken)
|
||||
|
@ -33,29 +34,27 @@ class S3ClientObjectLister(amazonS3: AmazonS3) {
|
|||
(result.getObjectSummaries.asScala.toStream, more)
|
||||
}
|
||||
|
||||
def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] =
|
||||
def fetchMore(more: Option[Token]): M[Stream[S3ObjectSummary]] = {
|
||||
more match {
|
||||
case None => Monad[M].pure(Stream.empty)
|
||||
case Some(token) => fetch(requestMore(token))
|
||||
}
|
||||
}
|
||||
|
||||
def fetch: ListObjectsV2Request => M[Stream[S3ObjectSummary]] =
|
||||
request =>
|
||||
for {
|
||||
batch <- fetchBatch(request)
|
||||
(summaries, more) = batch
|
||||
rest <- more match {
|
||||
case None => IO{Stream()}
|
||||
case Some(token) => fetch(requestMore(token))
|
||||
}
|
||||
rest <- fetchMore(more)
|
||||
} yield summaries ++ rest
|
||||
|
||||
IO {
|
||||
new ListObjectsV2Request()
|
||||
.withBucketName(bucket.name)
|
||||
.withPrefix(prefix.key)
|
||||
}.bracket {
|
||||
request =>
|
||||
for {
|
||||
_ <- logListObjectsStart(bucket, prefix)
|
||||
summaries <- fetch(request)
|
||||
} yield summaries
|
||||
}(_ => logListObjectsFinish(bucket,prefix))
|
||||
.map(os => S3ObjectsData(byHash(os), byKey(os)))
|
||||
for {
|
||||
_ <- logListObjectsStart[M](bucket, prefix)
|
||||
r = new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key)
|
||||
summaries <- fetch(r)
|
||||
_ <- logListObjectsFinish[M](bucket, prefix)
|
||||
} yield S3ObjectsData(byHash(summaries), byKey(summaries))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,31 +1,31 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.transfer.TransferManager
|
||||
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action}
|
||||
import net.kemitix.s3thorp.aws.api.{S3Action, S3Client, UploadProgressListener}
|
||||
import net.kemitix.s3thorp.domain._
|
||||
|
||||
class ThorpS3Client(amazonS3Client: => AmazonS3,
|
||||
class ThorpS3Client[M[_]: Monad](amazonS3Client: => AmazonS3,
|
||||
amazonS3TransferManager: => TransferManager)
|
||||
extends S3Client {
|
||||
extends S3Client[M] {
|
||||
|
||||
lazy val objectLister = new S3ClientObjectLister(amazonS3Client)
|
||||
lazy val copier = new S3ClientCopier(amazonS3Client)
|
||||
lazy val uploader = new Uploader(amazonS3TransferManager)
|
||||
lazy val deleter = new S3ClientDeleter(amazonS3Client)
|
||||
lazy val objectLister = new S3ClientObjectLister[M](amazonS3Client)
|
||||
lazy val copier = new S3ClientCopier[M](amazonS3Client)
|
||||
lazy val uploader = new Uploader[M](amazonS3TransferManager)
|
||||
lazy val deleter = new S3ClientDeleter[M](amazonS3Client)
|
||||
|
||||
override def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[S3ObjectsData] =
|
||||
(implicit info: Int => String => M[Unit]): M[S3ObjectsData] =
|
||||
objectLister.listObjects(bucket, prefix)
|
||||
|
||||
override def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[CopyS3Action] =
|
||||
(implicit info: Int => String => M[Unit]): M[CopyS3Action] =
|
||||
copier.copy(bucket, sourceKey,hash, targetKey)
|
||||
|
||||
override def upload(localFile: LocalFile,
|
||||
|
@ -34,13 +34,13 @@ class ThorpS3Client(amazonS3Client: => AmazonS3,
|
|||
multiPartThreshold: Long,
|
||||
tryCount: Int,
|
||||
maxRetries: Int)
|
||||
(implicit info: Int => String => IO[Unit],
|
||||
warn: String => IO[Unit]): IO[S3Action] =
|
||||
(implicit info: Int => String => M[Unit],
|
||||
warn: String => M[Unit]): M[S3Action] =
|
||||
uploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries)
|
||||
|
||||
override def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]): IO[DeleteS3Action] =
|
||||
(implicit info: Int => String => M[Unit]): M[DeleteS3Action] =
|
||||
deleter.delete(bucket, remoteKey)
|
||||
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest
|
||||
import com.amazonaws.services.s3.transfer.model.UploadResult
|
||||
|
@ -13,7 +14,7 @@ import net.kemitix.s3thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey}
|
|||
|
||||
import scala.util.Try
|
||||
|
||||
class Uploader(transferManager: => AmazonTransferManager) {
|
||||
class Uploader[M[_]: Monad](transferManager: => AmazonTransferManager) {
|
||||
|
||||
def accepts(localFile: LocalFile)
|
||||
(implicit multiPartThreshold: Long): Boolean =
|
||||
|
@ -25,24 +26,24 @@ class Uploader(transferManager: => AmazonTransferManager) {
|
|||
multiPartThreshold: Long,
|
||||
tryCount: Int,
|
||||
maxRetries: Int)
|
||||
(implicit info: Int => String => IO[Unit],
|
||||
warn: String => IO[Unit]): IO[S3Action] =
|
||||
(implicit info: Int => String => M[Unit],
|
||||
warn: String => M[Unit]): M[S3Action] =
|
||||
for {
|
||||
_ <- logMultiPartUploadStart(localFile, tryCount)
|
||||
upload <- upload(localFile, bucket, uploadProgressListener)
|
||||
_ <- logMultiPartUploadFinished(localFile)
|
||||
_ <- logMultiPartUploadStart[M](localFile, tryCount)
|
||||
upload <- transfer(localFile, bucket, uploadProgressListener)
|
||||
_ <- logMultiPartUploadFinished[M](localFile)
|
||||
} yield upload match {
|
||||
case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag))
|
||||
case Left(e) => ErroredS3Action(localFile.remoteKey, e)
|
||||
}
|
||||
|
||||
private def upload(localFile: LocalFile,
|
||||
bucket: Bucket,
|
||||
uploadProgressListener: UploadProgressListener,
|
||||
): IO[Either[Throwable, UploadResult]] = {
|
||||
val listener = progressListener(uploadProgressListener)
|
||||
private def transfer(localFile: LocalFile,
|
||||
bucket: Bucket,
|
||||
uploadProgressListener: UploadProgressListener,
|
||||
): M[Either[Throwable, UploadResult]] = {
|
||||
val listener: ProgressListener = progressListener(uploadProgressListener)
|
||||
val putObjectRequest = request(localFile, bucket, listener)
|
||||
IO {
|
||||
Monad[M].pure {
|
||||
Try(transferManager.upload(putObjectRequest))
|
||||
.map(_.waitForUploadResult)
|
||||
.toEither
|
||||
|
@ -56,16 +57,18 @@ class Uploader(transferManager: => AmazonTransferManager) {
|
|||
private def progressListener(uploadProgressListener: UploadProgressListener) =
|
||||
new ProgressListener {
|
||||
override def progressChanged(progressEvent: ProgressEvent): Unit = {
|
||||
uploadProgressListener.listener(
|
||||
progressEvent match {
|
||||
case e: ProgressEvent if isTransfer(e) =>
|
||||
TransferEvent(e.getEventType.name)
|
||||
case e: ProgressEvent if isByteTransfer(e) =>
|
||||
ByteTransferEvent(e.getEventType.name)
|
||||
case e: ProgressEvent =>
|
||||
RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred)
|
||||
})
|
||||
.unsafeRunSync // the listener doesn't execute otherwise as it is never returned
|
||||
uploadProgressListener.listener(eventHandler(progressEvent))
|
||||
}
|
||||
|
||||
private def eventHandler(progressEvent: ProgressEvent) = {
|
||||
progressEvent match {
|
||||
case e: ProgressEvent if isTransfer(e) =>
|
||||
TransferEvent(e.getEventType.name)
|
||||
case e: ProgressEvent if isByteTransfer(e) =>
|
||||
ByteTransferEvent(e.getEventType.name)
|
||||
case e: ProgressEvent =>
|
||||
RequestEvent(e.getEventType.name, e.getBytes, e.getBytesTransferred)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,22 +1,22 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import net.kemitix.s3thorp.domain.Terminal.clearLine
|
||||
import net.kemitix.s3thorp.domain.SizeTranslation.sizeInEnglish
|
||||
import net.kemitix.s3thorp.domain.LocalFile
|
||||
|
||||
object UploaderLogging {
|
||||
|
||||
def logMultiPartUploadStart(localFile: LocalFile,
|
||||
def logMultiPartUploadStart[M[_]: Monad](localFile: LocalFile,
|
||||
tryCount: Int)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] = {
|
||||
(implicit info: Int => String => M[Unit]): M[Unit] = {
|
||||
val tryMessage = if (tryCount == 1) "" else s"try $tryCount"
|
||||
val size = sizeInEnglish(localFile.file.length)
|
||||
info(1)(s"${clearLine}upload:$tryMessage:$size:${localFile.remoteKey.key}")
|
||||
}
|
||||
|
||||
def logMultiPartUploadFinished(localFile: LocalFile)
|
||||
(implicit info: Int => String => IO[Unit]): IO[Unit] =
|
||||
def logMultiPartUploadFinished[M[_]: Monad](localFile: LocalFile)
|
||||
(implicit info: Int => String => M[Unit]): M[Unit] =
|
||||
info(4)(s"upload:finished: ${localFile.remoteKey.key}")
|
||||
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ package net.kemitix.s3thorp.aws.lib
|
|||
|
||||
import java.time.Instant
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Id
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest
|
||||
import com.amazonaws.services.s3.transfer.model.UploadResult
|
||||
|
@ -23,8 +23,8 @@ class S3ClientSuite
|
|||
|
||||
private val prefix = RemoteKey("prefix")
|
||||
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit
|
||||
implicit private val logWarn: String => IO[Unit] = _ => IO.unit
|
||||
implicit private val logInfo: Int => String => Id[Unit] = _ => _ => ()
|
||||
implicit private val logWarn: String => Id[Unit] = _ => ()
|
||||
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
|
||||
|
||||
describe("getS3Status") {
|
||||
|
@ -44,7 +44,7 @@ class S3ClientSuite
|
|||
keyotherkey.remoteKey -> HashModified(hash, lastModified),
|
||||
keydiffhash.remoteKey -> HashModified(diffhash, lastModified)))
|
||||
|
||||
def invoke(self: S3Client, localFile: LocalFile) = {
|
||||
def invoke(self: S3Client[Id], localFile: LocalFile) = {
|
||||
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
|
||||
}
|
||||
|
||||
|
@ -108,7 +108,7 @@ class S3ClientSuite
|
|||
pending
|
||||
//FIXME: works okay on its own, but fails when run with others
|
||||
val expected = UploadS3Action(remoteKey, rootHash)
|
||||
val result = s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync
|
||||
val result = s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.Date
|
||||
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary
|
||||
|
@ -13,7 +14,7 @@ class S3ObjectsByHashSuite extends FunSpec {
|
|||
val hash = MD5Hash("hash")
|
||||
val key1 = RemoteKey("key-1")
|
||||
val key2 = RemoteKey("key-2")
|
||||
val lastModified = LastModified(Instant.now)
|
||||
val lastModified = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))
|
||||
val o1 = s3object(hash, key1, lastModified)
|
||||
val o2 = s3object(hash, key2, lastModified)
|
||||
val os = Stream(o1, o2)
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
package net.kemitix.s3thorp.aws.lib
|
||||
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.Date
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Id
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary}
|
||||
import com.amazonaws.services.s3.transfer.TransferManager
|
||||
|
@ -20,9 +21,9 @@ class ThorpS3ClientSuite
|
|||
val source = Resource(this, "upload")
|
||||
val prefix = RemoteKey("prefix")
|
||||
implicit val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit
|
||||
implicit val logInfo: Int => String => Id[Unit] = _ => _ => ()
|
||||
|
||||
val lm = LastModified(Instant.now)
|
||||
val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))
|
||||
|
||||
val h1 = MD5Hash("hash1")
|
||||
|
||||
|
@ -47,7 +48,7 @@ class ThorpS3ClientSuite
|
|||
|
||||
val amazonS3 = stub[AmazonS3]
|
||||
val amazonS3TransferManager = stub[TransferManager]
|
||||
val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager)
|
||||
val s3Client = new ThorpS3Client[Id](amazonS3, amazonS3TransferManager)
|
||||
|
||||
val myFakeResponse = new ListObjectsV2Result()
|
||||
val summaries = myFakeResponse.getObjectSummaries
|
||||
|
@ -65,7 +66,7 @@ class ThorpS3ClientSuite
|
|||
k1a -> HashModified(h1, lm),
|
||||
k1b -> HashModified(h1, lm),
|
||||
k2 -> HashModified(h2, lm)))
|
||||
val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync
|
||||
val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix"))
|
||||
assertResult(expected.byHash.keys)(result.byHash.keys)
|
||||
assertResult(expected.byKey.keys)(result.byKey.keys)
|
||||
assertResult(expected)(result)
|
||||
|
|
|
@ -2,7 +2,7 @@ package net.kemitix.s3thorp.aws.lib
|
|||
|
||||
import java.time.Instant
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Id
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.transfer._
|
||||
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
|
||||
|
@ -20,8 +20,8 @@ class UploaderSuite
|
|||
private val source = Resource(this, ".")
|
||||
private val prefix = RemoteKey("prefix")
|
||||
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit
|
||||
implicit private val logWarn: String => IO[Unit] = _ => IO.unit
|
||||
implicit private val logInfo: Int => String => Id[Unit] = _ => _ => ()
|
||||
implicit private val logWarn: String => Id[Unit] = _ => ()
|
||||
private val fileToKey = generateKey(config.source, config.prefix) _
|
||||
val lastModified = LastModified(Instant.now())
|
||||
|
||||
|
@ -63,7 +63,7 @@ class UploaderSuite
|
|||
val uploader = new Uploader(amazonS3TransferManager)
|
||||
it("should upload") {
|
||||
val expected = UploadS3Action(returnedKey, returnedHash)
|
||||
val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync
|
||||
val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
|
|
16
build.sbt
16
build.sbt
|
@ -27,6 +27,19 @@ val awsSdkDependencies = Seq(
|
|||
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9"
|
||||
)
|
||||
)
|
||||
val catsSettings = Seq (
|
||||
libraryDependencies ++= Seq(
|
||||
"org.typelevel" %% "cats-core" % "1.6.1"
|
||||
),
|
||||
// recommended for cats-effects
|
||||
scalacOptions ++= Seq(
|
||||
"-feature",
|
||||
"-deprecation",
|
||||
"-unchecked",
|
||||
"-language:postfixOps",
|
||||
"-language:higherKinds",
|
||||
"-Ypartial-unification")
|
||||
)
|
||||
val catsEffectsSettings = Seq(
|
||||
libraryDependencies ++= Seq(
|
||||
"org.typelevel" %% "cats-effect" % "1.3.1"
|
||||
|
@ -47,6 +60,7 @@ lazy val cli = (project in file("cli"))
|
|||
.settings(commonSettings)
|
||||
.settings(mainClass in assembly := Some("net.kemitix.s3thorp.cli.Main"))
|
||||
.settings(applicationSettings)
|
||||
.settings(catsEffectsSettings)
|
||||
.aggregate(`aws-lib`, core, `aws-api`, domain)
|
||||
.settings(commandLineParsing)
|
||||
.dependsOn(`aws-lib`)
|
||||
|
@ -67,7 +81,7 @@ lazy val core = (project in file("core"))
|
|||
lazy val `aws-api` = (project in file("aws-api"))
|
||||
.settings(commonSettings)
|
||||
.settings(assemblyJarName in assembly := "aws-api.jar")
|
||||
.settings(catsEffectsSettings)
|
||||
.settings(catsSettings)
|
||||
.dependsOn(domain)
|
||||
|
||||
lazy val domain = (project in file("domain"))
|
||||
|
|
|
@ -1,15 +1,16 @@
|
|||
package net.kemitix.s3thorp.cli
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
|
||||
class Logger(verbosity: Int) {
|
||||
|
||||
def info(level: Int)(message: String): IO[Unit] =
|
||||
if (verbosity >= level) IO(println(s"[INFO:$level] $message"))
|
||||
else IO.unit
|
||||
class Logger[M[_]: Monad](verbosity: Int) {
|
||||
|
||||
def warn(message: String): IO[Unit] = IO(println(s"[ WARN] $message"))
|
||||
def info(level: Int)(message: String): M[Unit] =
|
||||
if (verbosity >= level) Monad[M].pure(println(s"[INFO:$level] $message"))
|
||||
else Monad[M].unit
|
||||
|
||||
def error(message: String): IO[Unit] = IO(println(s"[ ERROR] $message"))
|
||||
def warn(message: String): M[Unit] = Monad[M].pure(println(s"[ WARN] $message"))
|
||||
|
||||
def error(message: String): M[Unit] = Monad[M].pure(println(s"[ ERROR] $message"))
|
||||
|
||||
}
|
||||
|
|
|
@ -18,20 +18,24 @@ object Main extends IOApp {
|
|||
def program(args: List[String]): IO[ExitCode] =
|
||||
for {
|
||||
config <- ParseArgs(args, defaultConfig)
|
||||
logger = new Logger(config.verbose)
|
||||
logger = new Logger[IO](config.verbose)
|
||||
info = (l: Int) => (m: String) => logger.info(l)(m)
|
||||
md5HashGenerator = (file: File) => md5File(file)(info)
|
||||
_ <- logger.info(1)("S3Thorp - hashed sync for s3")
|
||||
_ <- Sync.run(
|
||||
_ <- Sync.run[IO](
|
||||
config,
|
||||
S3ClientBuilder.defaultClient,
|
||||
md5HashGenerator,
|
||||
l => i => logger.info(l)(i),
|
||||
w => logger.warn(w),
|
||||
e => logger.error(e))(config)
|
||||
hashGenerator(info),
|
||||
info,
|
||||
w => logger.warn(w))
|
||||
} yield ExitCode.Success
|
||||
|
||||
private def hashGenerator(info: Int => String => IO[Unit]) = {
|
||||
implicit val logInfo: Int => String => IO[Unit] = info
|
||||
file: File => md5File[IO](file)
|
||||
}
|
||||
|
||||
override def run(args: List[String]): IO[ExitCode] = {
|
||||
val logger = new Logger(1)
|
||||
val logger = new Logger[IO](1)
|
||||
program(args)
|
||||
.guaranteeCase {
|
||||
case Canceled => logger.warn("Interrupted")
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package net.kemitix.s3thorp.core
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import net.kemitix.s3thorp.aws.api.S3Action.DoNothingS3Action
|
||||
import net.kemitix.s3thorp.aws.api.{S3Action, S3Client, UploadProgressListener}
|
||||
import net.kemitix.s3thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
|
||||
|
@ -8,10 +9,10 @@ import net.kemitix.s3thorp.domain.Config
|
|||
|
||||
object ActionSubmitter {
|
||||
|
||||
def submitAction(s3Client: S3Client, action: Action)
|
||||
(implicit c: Config,
|
||||
info: Int => String => IO[Unit],
|
||||
warn: String => IO[Unit]): Stream[IO[S3Action]] = {
|
||||
def submitAction[M[_]: Monad](s3Client: S3Client[M], action: Action)
|
||||
(implicit c: Config,
|
||||
info: Int => String => M[Unit],
|
||||
warn: String => M[Unit]): Stream[M[S3Action]] = {
|
||||
Stream(
|
||||
action match {
|
||||
case ToUpload(bucket, localFile) =>
|
||||
|
@ -31,7 +32,7 @@ object ActionSubmitter {
|
|||
action <- s3Client.delete(bucket, remoteKey)
|
||||
} yield action
|
||||
case DoNothing(bucket, remoteKey) =>
|
||||
IO.pure(DoNothingS3Action(remoteKey))
|
||||
Monad[M].pure(DoNothingS3Action(remoteKey))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,23 +3,24 @@ package net.kemitix.s3thorp.core
|
|||
import java.io.File
|
||||
import java.nio.file.Path
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import net.kemitix.s3thorp.core.KeyGenerator.generateKey
|
||||
import net.kemitix.s3thorp.domain.{Config, Filter, LocalFile, MD5Hash}
|
||||
|
||||
object LocalFileStream {
|
||||
|
||||
def findFiles(file: File,
|
||||
md5HashGenerator: File => IO[MD5Hash],
|
||||
info: Int => String => IO[Unit])
|
||||
(implicit c: Config): IO[Stream[LocalFile]] = {
|
||||
def findFiles[M[_]: Monad](file: File,
|
||||
md5HashGenerator: File => M[MD5Hash],
|
||||
info: Int => String => M[Unit])
|
||||
(implicit c: Config): M[Stream[LocalFile]] = {
|
||||
|
||||
val filters: Path => Boolean = Filter.isIncluded(c.filters)
|
||||
|
||||
def loop(file: File): IO[Stream[LocalFile]] = {
|
||||
def loop(file: File): M[Stream[LocalFile]] = {
|
||||
|
||||
def dirPaths(file: File): IO[Stream[File]] =
|
||||
IO {
|
||||
def dirPaths(file: File): M[Stream[File]] =
|
||||
Monad[M].pure {
|
||||
Option(file.listFiles)
|
||||
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file"))
|
||||
}
|
||||
|
@ -27,23 +28,23 @@ object LocalFileStream {
|
|||
Stream(fs: _*)
|
||||
.filter(f => filters(f.toPath)))
|
||||
|
||||
def recurseIntoSubDirectories(file: File)(implicit c: Config): IO[Stream[LocalFile]] =
|
||||
def recurseIntoSubDirectories(file: File)(implicit c: Config): M[Stream[LocalFile]] =
|
||||
file match {
|
||||
case f if f.isDirectory => loop(file)
|
||||
case _ => for(hash <- md5HashGenerator(file))
|
||||
yield Stream(LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)))
|
||||
}
|
||||
|
||||
def recurse(fs: Stream[File]): IO[Stream[LocalFile]] =
|
||||
fs.foldLeft(IO.pure(Stream.empty[LocalFile]))((acc, f) =>
|
||||
def recurse(fs: Stream[File]): M[Stream[LocalFile]] =
|
||||
fs.foldLeft(Monad[M].pure(Stream.empty[LocalFile]))((acc, f) =>
|
||||
recurseIntoSubDirectories(f)
|
||||
.flatMap(lfs => acc.map(s => s ++ lfs)))
|
||||
|
||||
for {
|
||||
_ <- IO(info(2)(s"- Entering: $file"))
|
||||
_ <- info(2)(s"- Entering: $file")
|
||||
fs <- dirPaths(file)
|
||||
lfs <- recurse(fs)
|
||||
_ <- IO(info(5)(s"- Leaving : $file"))
|
||||
_ <- info(5)(s"- Leaving : $file")
|
||||
} yield lfs
|
||||
}
|
||||
|
||||
|
|
|
@ -3,23 +3,21 @@ package net.kemitix.s3thorp.core
|
|||
import java.io.{File, FileInputStream}
|
||||
import java.security.MessageDigest
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import net.kemitix.s3thorp.domain.MD5Hash
|
||||
|
||||
import scala.collection.immutable.NumericRange
|
||||
|
||||
object MD5HashGenerator {
|
||||
|
||||
def md5File(file: File)
|
||||
(implicit info: Int => String => IO[Unit]): IO[MD5Hash] = {
|
||||
def md5File[M[_]: Monad](file: File)
|
||||
(implicit info: Int => String => M[Unit]): M[MD5Hash] = {
|
||||
|
||||
val maxBufferSize = 8048
|
||||
|
||||
val defaultBuffer = new Array[Byte](maxBufferSize)
|
||||
|
||||
def openFile = IO(new FileInputStream(file))
|
||||
|
||||
def closeFile = {fis: FileInputStream => IO(fis.close())}
|
||||
def openFile = Monad[M].pure(new FileInputStream(file))
|
||||
def closeFile = {fis: FileInputStream => Monad[M].pure(fis.close())}
|
||||
|
||||
def nextChunkSize(currentOffset: Long) = {
|
||||
// a value between 1 and maxBufferSize
|
||||
|
@ -31,25 +29,29 @@ object MD5HashGenerator {
|
|||
def readToBuffer(fis: FileInputStream,
|
||||
currentOffset: Long) = {
|
||||
val buffer =
|
||||
if (nextChunkSize(currentOffset) < maxBufferSize)
|
||||
new Array[Byte](nextChunkSize(currentOffset))
|
||||
else
|
||||
defaultBuffer
|
||||
if (nextChunkSize(currentOffset) < maxBufferSize) new Array[Byte](nextChunkSize(currentOffset))
|
||||
else defaultBuffer
|
||||
fis read buffer
|
||||
buffer
|
||||
}
|
||||
|
||||
def readFile: IO[String] = openFile
|
||||
.bracket(fis => IO {
|
||||
def digestFile(fis: FileInputStream) =
|
||||
Monad[M].pure {
|
||||
val md5 = MessageDigest getInstance "MD5"
|
||||
NumericRange(0, file.length, maxBufferSize)
|
||||
.foreach{currentOffset => {
|
||||
val buffer = readToBuffer(fis, currentOffset)
|
||||
md5 update buffer
|
||||
}
|
||||
}
|
||||
.foreach { currentOffset => {
|
||||
val buffer = readToBuffer(fis, currentOffset)
|
||||
md5 update buffer
|
||||
}}
|
||||
(md5.digest map ("%02x" format _)).mkString
|
||||
})(closeFile)
|
||||
}
|
||||
|
||||
def readFile: M[String] =
|
||||
for {
|
||||
fis <- openFile
|
||||
md5 <- digestFile(fis)
|
||||
_ <- closeFile(fis)
|
||||
} yield md5
|
||||
|
||||
for {
|
||||
_ <- info(5)(s"md5:reading:size ${file.length}:$file")
|
||||
|
|
|
@ -2,7 +2,7 @@ package net.kemitix.s3thorp.core
|
|||
|
||||
import java.io.File
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import net.kemitix.s3thorp.aws.api.{S3Action, S3Client}
|
||||
import net.kemitix.s3thorp.core.Action.ToDelete
|
||||
|
@ -11,33 +11,47 @@ import net.kemitix.s3thorp.core.ActionSubmitter.submitAction
|
|||
import net.kemitix.s3thorp.core.LocalFileStream.findFiles
|
||||
import net.kemitix.s3thorp.core.S3MetaDataEnricher.getMetadata
|
||||
import net.kemitix.s3thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart}
|
||||
import net.kemitix.s3thorp.domain.{Config, MD5Hash, S3ObjectsData}
|
||||
import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash, S3MetaData, S3ObjectsData}
|
||||
|
||||
object Sync {
|
||||
|
||||
def run(s3Client: S3Client,
|
||||
md5HashGenerator: File => IO[MD5Hash],
|
||||
info: Int => String => IO[Unit],
|
||||
warn: String => IO[Unit],
|
||||
error: String => IO[Unit])
|
||||
(implicit c: Config): IO[Unit] = {
|
||||
def run[M[_]: Monad](config: Config,
|
||||
s3Client: S3Client[M],
|
||||
md5HashGenerator: File => M[MD5Hash],
|
||||
info: Int => String => M[Unit],
|
||||
warn: String => M[Unit]): M[Unit] = {
|
||||
|
||||
def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] =
|
||||
implicit val c: Config = config
|
||||
implicit val logInfo: Int => String => M[Unit] = info
|
||||
implicit val logWarn: String => M[Unit] = warn
|
||||
|
||||
def metaData(s3Data: S3ObjectsData, sFiles: Stream[LocalFile]) =
|
||||
Monad[M].pure(sFiles.map(file => getMetadata(file, s3Data)))
|
||||
|
||||
def actions(sData: Stream[S3MetaData]) =
|
||||
Monad[M].pure(sData.flatMap(s3MetaData => createActions(s3MetaData)))
|
||||
|
||||
def submit(sActions: Stream[Action]) =
|
||||
Monad[M].pure(sActions.flatMap(action => submitAction[M](s3Client, action)))
|
||||
|
||||
def copyUploadActions(s3Data: S3ObjectsData): M[Stream[S3Action]] =
|
||||
(for {
|
||||
sFiles <- findFiles(c.source, md5HashGenerator, info)
|
||||
sData <- IO(sFiles.map(file => getMetadata(file, s3Data)))
|
||||
sActions <- IO(sData.flatMap(s3MetaData => createActions(s3MetaData)))
|
||||
sS3Actions <- IO(sActions.flatMap(action => submitAction(s3Client, action)(c, info, warn)))
|
||||
} yield sS3Actions.sequence)
|
||||
files <- findFiles(c.source, md5HashGenerator, info)
|
||||
metaData <- metaData(s3Data, files)
|
||||
actions <- actions(metaData)
|
||||
s3Actions <- submit(actions)
|
||||
} yield s3Actions.sequence)
|
||||
.flatten
|
||||
.map(streamS3Actions => streamS3Actions.sorted)
|
||||
|
||||
def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[S3Action]] =
|
||||
def deleteActions(s3ObjectsData: S3ObjectsData): M[Stream[S3Action]] =
|
||||
(for {
|
||||
key <- s3ObjectsData.byKey.keys
|
||||
if key.isMissingLocally(c.source, c.prefix)
|
||||
ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key))(c, info, warn)
|
||||
} yield ioDelAction).toStream.sequence
|
||||
ioDelAction <- submitAction[M](s3Client, ToDelete(c.bucket, key))
|
||||
} yield ioDelAction)
|
||||
.toStream
|
||||
.sequence
|
||||
|
||||
for {
|
||||
_ <- logRunStart(info)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package net.kemitix.s3thorp.core
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import net.kemitix.s3thorp.aws.api.S3Action
|
||||
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action}
|
||||
import net.kemitix.s3thorp.domain.Config
|
||||
|
@ -8,25 +9,25 @@ import net.kemitix.s3thorp.domain.Config
|
|||
// Logging for the Sync class
|
||||
object SyncLogging {
|
||||
|
||||
def logRunStart(info: Int => String => IO[Unit])
|
||||
(implicit c: Config): IO[Unit] =
|
||||
def logRunStart[M[_]: Monad](info: Int => String => M[Unit])
|
||||
(implicit c: Config): M[Unit] =
|
||||
info(1)(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ")
|
||||
|
||||
def logFileScan(info: Int => String => IO[Unit])
|
||||
(implicit c: Config): IO[Unit] =
|
||||
def logFileScan[M[_]: Monad](info: Int => String => M[Unit])
|
||||
(implicit c: Config): M[Unit] =
|
||||
info(1)(s"Scanning local files: ${c.source}...")
|
||||
|
||||
def logRunFinished(actions: Stream[S3Action],
|
||||
info: Int => String => IO[Unit])
|
||||
(implicit c: Config): IO[Unit] =
|
||||
def logRunFinished[M[_]: Monad](actions: Stream[S3Action],
|
||||
info: Int => String => M[Unit])
|
||||
(implicit c: Config): M[Unit] = {
|
||||
val counters = actions.foldLeft(Counters())(countActivities)
|
||||
for {
|
||||
_ <- IO.unit
|
||||
counters = actions.foldLeft(Counters())(countActivities)
|
||||
_ <- info(1)(s"Uploaded ${counters.uploaded} files")
|
||||
_ <- info(1)(s"Copied ${counters.copied} files")
|
||||
_ <- info(1)(s"Deleted ${counters.deleted} files")
|
||||
_ <- info(1)(s"Errors ${counters.errors}")
|
||||
} yield ()
|
||||
}
|
||||
|
||||
private def countActivities(implicit c: Config): (Counters, S3Action) => Counters =
|
||||
(counters: Counters, s3Action: S3Action) => {
|
||||
|
|
|
@ -2,21 +2,21 @@ package net.kemitix.s3thorp.core
|
|||
|
||||
import java.io.File
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Id
|
||||
import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash}
|
||||
import org.scalatest.FunSpec
|
||||
|
||||
class LocalFileStreamSuite extends FunSpec {
|
||||
|
||||
val uploadResource = Resource(this, "upload")
|
||||
val config: Config = Config(source = uploadResource)
|
||||
implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit
|
||||
val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file)
|
||||
implicit val config: Config = Config(source = uploadResource)
|
||||
implicit private val logInfo: Int => String => Id[Unit] = l => i => ()
|
||||
val md5HashGenerator: File => Id[MD5Hash] = file => MD5HashGenerator.md5File[Id](file)
|
||||
|
||||
describe("findFiles") {
|
||||
it("should find all files") {
|
||||
val result: Set[String] =
|
||||
LocalFileStream.findFiles(uploadResource, md5HashGenerator, logInfo)(config).unsafeRunSync.toSet
|
||||
LocalFileStream.findFiles[Id](uploadResource, md5HashGenerator, logInfo).toSet
|
||||
.map { x: LocalFile => x.relative.toString }
|
||||
assertResult(Set("subdir/leaf-file", "root-file"))(result)
|
||||
}
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
package net.kemitix.s3thorp.core
|
||||
|
||||
import java.nio.file.Files
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Id
|
||||
import net.kemitix.s3thorp.core.MD5HashData.rootHash
|
||||
import net.kemitix.s3thorp.domain.{Bucket, Config, MD5Hash, RemoteKey}
|
||||
import org.scalatest.FunSpec
|
||||
|
@ -12,12 +10,12 @@ class MD5HashGeneratorTest extends FunSpec {
|
|||
private val source = Resource(this, "upload")
|
||||
private val prefix = RemoteKey("prefix")
|
||||
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit private val logInfo: Int => String => IO[Unit] = l => i => IO.unit
|
||||
implicit private val logInfo: Int => String => Id[Unit] = l => i => ()
|
||||
|
||||
describe("read a small file (smaller than buffer)") {
|
||||
val file = Resource(this, "upload/root-file")
|
||||
it("should generate the correct hash") {
|
||||
val result = MD5HashGenerator.md5File(file).unsafeRunSync
|
||||
val result = MD5HashGenerator.md5File[Id](file)
|
||||
assertResult(rootHash)(result)
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +23,7 @@ class MD5HashGeneratorTest extends FunSpec {
|
|||
val file = Resource(this, "big-file")
|
||||
it("should generate the correct hash") {
|
||||
val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
|
||||
val result = MD5HashGenerator.md5File(file).unsafeRunSync
|
||||
val result = MD5HashGenerator.md5File[Id](file)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ package net.kemitix.s3thorp.core
|
|||
import java.io.File
|
||||
import java.time.Instant
|
||||
|
||||
import cats.effect.IO
|
||||
import cats.Id
|
||||
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action}
|
||||
import net.kemitix.s3thorp.aws.api.{S3Client, UploadProgressListener}
|
||||
import net.kemitix.s3thorp.core.MD5HashData.{leafHash, rootHash}
|
||||
|
@ -16,32 +16,29 @@ class SyncSuite
|
|||
|
||||
private val source = Resource(this, "upload")
|
||||
private val prefix = RemoteKey("prefix")
|
||||
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit private val logInfo: Int => String => IO[Unit] = _ => _ => IO.unit
|
||||
implicit private val logWarn: String => IO[Unit] = _ => IO.unit
|
||||
private def logError: String => IO[Unit] = _ => IO.unit
|
||||
val config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit private val logInfo: Int => String => Id[Unit] = _ => _ => ()
|
||||
implicit private val logWarn: String => Id[Unit] = _ => ()
|
||||
private val lastModified = LastModified(Instant.now)
|
||||
private val fileToKey: File => RemoteKey = KeyGenerator.generateKey(source, prefix)
|
||||
private val rootFile = LocalFile.resolve("root-file", rootHash, source, fileToKey)
|
||||
private val leafFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, fileToKey)
|
||||
|
||||
private val md5HashGenerator = MD5HashGenerator.md5File(_)
|
||||
private val md5HashGenerator = MD5HashGenerator.md5File[Id](_)
|
||||
|
||||
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile) = {
|
||||
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile): (String, String, File) =
|
||||
(bucket.name, remoteKey.key, localFile.file)
|
||||
}
|
||||
|
||||
describe("run") {
|
||||
val testBucket = Bucket("bucket")
|
||||
// source contains the files root-file and subdir/leaf-file
|
||||
val config = Config(Bucket("bucket"), RemoteKey("prefix"), source = source)
|
||||
val rootRemoteKey = RemoteKey("prefix/root-file")
|
||||
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
|
||||
describe("when all files should be uploaded") {
|
||||
val s3Client = new RecordingClient(testBucket, S3ObjectsData(
|
||||
byHash = Map(),
|
||||
byKey = Map()))
|
||||
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync
|
||||
Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn)
|
||||
it("uploads all files") {
|
||||
val expectedUploads = Map(
|
||||
"subdir/leaf-file" -> leafRemoteKey,
|
||||
|
@ -67,7 +64,7 @@ class SyncSuite
|
|||
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
|
||||
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
|
||||
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
||||
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync
|
||||
Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn)
|
||||
it("uploads nothing") {
|
||||
val expectedUploads = Map()
|
||||
assertResult(expectedUploads)(s3Client.uploadsRecord)
|
||||
|
@ -91,7 +88,7 @@ class SyncSuite
|
|||
RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified),
|
||||
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
|
||||
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
||||
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync
|
||||
Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn)
|
||||
it("uploads nothing") {
|
||||
val expectedUploads = Map()
|
||||
assertResult(expectedUploads)(s3Client.uploadsRecord)
|
||||
|
@ -119,17 +116,17 @@ class SyncSuite
|
|||
byKey = Map(
|
||||
deletedKey -> HashModified(deletedHash, lastModified)))
|
||||
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
||||
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(config).unsafeRunSync
|
||||
Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn)
|
||||
it("deleted key") {
|
||||
val expectedDeletions = Set(deletedKey)
|
||||
assertResult(expectedDeletions)(s3Client.deletionsRecord)
|
||||
}
|
||||
}
|
||||
describe("when a file is excluded") {
|
||||
val configWithExclusion = config.copy(filters = List(Exclude("leaf")))
|
||||
val config: Config = Config(Bucket("bucket"), prefix, source = source, filters = List(Exclude("leaf")))
|
||||
val s3ObjectsData = S3ObjectsData(Map(), Map())
|
||||
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
||||
Sync.run(s3Client, md5HashGenerator, logInfo, logWarn, logError)(configWithExclusion).unsafeRunSync
|
||||
Sync.run(config, s3Client, md5HashGenerator, logInfo, logWarn)
|
||||
it("is not uploaded") {
|
||||
val expectedUploads = Map(
|
||||
"root-file" -> rootRemoteKey
|
||||
|
@ -141,7 +138,7 @@ class SyncSuite
|
|||
|
||||
class RecordingClient(testBucket: Bucket,
|
||||
s3ObjectsData: S3ObjectsData)
|
||||
extends S3Client {
|
||||
extends S3Client[Id] {
|
||||
|
||||
var uploadsRecord: Map[String, RemoteKey] = Map()
|
||||
var copiesRecord: Map[RemoteKey, RemoteKey] = Map()
|
||||
|
@ -149,8 +146,8 @@ class SyncSuite
|
|||
|
||||
override def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit info: Int => String => IO[Unit]) =
|
||||
IO.pure(s3ObjectsData)
|
||||
(implicit info: Int => String => Id[Unit]): S3ObjectsData =
|
||||
s3ObjectsData
|
||||
|
||||
override def upload(localFile: LocalFile,
|
||||
bucket: Bucket,
|
||||
|
@ -158,32 +155,29 @@ class SyncSuite
|
|||
multiPartThreshold: Long,
|
||||
tryCount: Int,
|
||||
maxRetries: Int)
|
||||
(implicit info: Int => String => IO[Unit],
|
||||
warn: String => IO[Unit]) =
|
||||
IO {
|
||||
if (bucket == testBucket)
|
||||
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)
|
||||
UploadS3Action(localFile.remoteKey, MD5Hash("some hash value"))
|
||||
}
|
||||
(implicit info: Int => String => Id[Unit],
|
||||
warn: String => Id[Unit]): UploadS3Action = {
|
||||
if (bucket == testBucket)
|
||||
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)
|
||||
UploadS3Action(localFile.remoteKey, MD5Hash("some hash value"))
|
||||
}
|
||||
|
||||
override def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey
|
||||
)(implicit info: Int => String => IO[Unit]) =
|
||||
IO {
|
||||
if (bucket == testBucket)
|
||||
copiesRecord += (sourceKey -> targetKey)
|
||||
CopyS3Action(targetKey)
|
||||
}
|
||||
)(implicit info: Int => String => Id[Unit]): CopyS3Action = {
|
||||
if (bucket == testBucket)
|
||||
copiesRecord += (sourceKey -> targetKey)
|
||||
CopyS3Action(targetKey)
|
||||
}
|
||||
|
||||
override def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey
|
||||
)(implicit info: Int => String => IO[Unit]) =
|
||||
IO {
|
||||
if (bucket == testBucket)
|
||||
deletionsRecord += remoteKey
|
||||
DeleteS3Action(remoteKey)
|
||||
}
|
||||
)(implicit info: Int => String => Id[Unit]): DeleteS3Action = {
|
||||
if (bucket == testBucket)
|
||||
deletionsRecord += remoteKey
|
||||
DeleteS3Action(remoteKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue