Add support for per directory configuration files (#71)
* [core] rename the config supplied from CLI as such This distinguishes it as config supplied from the command line. * [core] add ConfigOption * [core] ConfigOption can update a Config * [core] new validator for config * [domain] Config doesn't validate source any more * [cli] PrintLogger default to not print debug messages * [cli] Use ConfigOptions and new ConfigValidator * [sbt] Use common settings for project root * [domain] RemoteKey can handle when prefix is empty * [cli] remove banner * [domain] Logger can create version of itself with debug flipped * [core] Build and validate Config within core module This means that the `thorp-lib` module can validate its input from a list of `ConfigOption`s. * [core] refactor ConfigurationBuilder * [core] refactor ConfigurationBuilder * [sbt] starting back from tagless-final by using IO where needed * [core] Add ParseConfigFile * [sbt] Make cats-effect available from the domain * Roll back from tagless-final to just use cat-effect's IO * [core] extract ParseConfigLines * [core] ConfigurationBuilder rename apply as buildConfig * [core] ParseConfig[Files,Lines] rename apply methods * [core] refactor ParseConfigFile and add tests * [core] Sync fix call to run * [core] SyncSuite update tests to use ConfigOptions
This commit is contained in:
parent
afd28e7100
commit
910688ee32
42 changed files with 602 additions and 276 deletions
|
@ -1,13 +1,14 @@
|
|||
package net.kemitix.thorp.aws.api
|
||||
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action}
|
||||
import net.kemitix.thorp.domain._
|
||||
|
||||
trait S3Client[M[_]] {
|
||||
trait S3Client {
|
||||
|
||||
def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey
|
||||
)(implicit logger: Logger[M]): M[S3ObjectsData]
|
||||
)(implicit logger: Logger): IO[S3ObjectsData]
|
||||
|
||||
def upload(localFile: LocalFile,
|
||||
bucket: Bucket,
|
||||
|
@ -15,16 +16,16 @@ trait S3Client[M[_]] {
|
|||
multiPartThreshold: Long,
|
||||
tryCount: Int,
|
||||
maxRetries: Int)
|
||||
(implicit logger: Logger[M]): M[S3Action]
|
||||
(implicit logger: Logger): IO[S3Action]
|
||||
|
||||
def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey
|
||||
)(implicit logger: Logger[M]): M[CopyS3Action]
|
||||
)(implicit logger: Logger): IO[CopyS3Action]
|
||||
|
||||
def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey
|
||||
)(implicit logger: Logger[M]): M[DeleteS3Action]
|
||||
)(implicit logger: Logger): IO[DeleteS3Action]
|
||||
|
||||
}
|
||||
|
|
|
@ -1,17 +1,16 @@
|
|||
package net.kemitix.thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
|
||||
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
|
||||
import net.kemitix.thorp.aws.api.S3Client
|
||||
|
||||
object S3ClientBuilder {
|
||||
|
||||
def createClient[M[_]: Monad](amazonS3Client: AmazonS3,
|
||||
amazonS3TransferManager: TransferManager): S3Client[M] =
|
||||
def createClient(amazonS3Client: AmazonS3,
|
||||
amazonS3TransferManager: TransferManager): S3Client =
|
||||
new ThorpS3Client(amazonS3Client, amazonS3TransferManager)
|
||||
|
||||
def defaultClient[M[_]: Monad]: S3Client[M] =
|
||||
def defaultClient: S3Client =
|
||||
createClient(AmazonS3ClientBuilder.defaultClient, TransferManagerBuilder.defaultTransferManager)
|
||||
|
||||
}
|
||||
|
|
|
@ -1,24 +1,23 @@
|
|||
package net.kemitix.thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import cats.effect.IO
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest
|
||||
import net.kemitix.thorp.aws.api.S3Action.CopyS3Action
|
||||
import net.kemitix.thorp.aws.lib.S3ClientLogging.{logCopyFinish, logCopyStart}
|
||||
import net.kemitix.thorp.domain.{Bucket, Logger, MD5Hash, RemoteKey}
|
||||
|
||||
class S3ClientCopier[M[_]: Monad](amazonS3: AmazonS3) {
|
||||
class S3ClientCopier(amazonS3: AmazonS3) {
|
||||
|
||||
def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[CopyS3Action] =
|
||||
(implicit logger: Logger): IO[CopyS3Action] =
|
||||
for {
|
||||
_ <- logCopyStart[M](bucket, sourceKey, targetKey)
|
||||
_ <- logCopyStart(bucket, sourceKey, targetKey)
|
||||
_ <- copyObject(bucket, sourceKey, hash, targetKey)
|
||||
_ <- logCopyFinish[M](bucket, sourceKey,targetKey)
|
||||
_ <- logCopyFinish(bucket, sourceKey,targetKey)
|
||||
} yield CopyS3Action(targetKey)
|
||||
|
||||
private def copyObject(bucket: Bucket,
|
||||
|
@ -28,7 +27,7 @@ class S3ClientCopier[M[_]: Monad](amazonS3: AmazonS3) {
|
|||
val request =
|
||||
new CopyObjectRequest(bucket.name, sourceKey.key, bucket.name, targetKey.key)
|
||||
.withMatchingETagConstraint(hash.hash)
|
||||
Monad[M].pure(amazonS3.copyObject(request))
|
||||
IO(amazonS3.copyObject(request))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,25 +1,24 @@
|
|||
package net.kemitix.thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import cats.effect.IO
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.DeleteObjectRequest
|
||||
import net.kemitix.thorp.aws.api.S3Action.DeleteS3Action
|
||||
import net.kemitix.thorp.aws.lib.S3ClientLogging.{logDeleteFinish, logDeleteStart}
|
||||
import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey}
|
||||
|
||||
class S3ClientDeleter[M[_]: Monad](amazonS3: AmazonS3) {
|
||||
class S3ClientDeleter(amazonS3: AmazonS3) {
|
||||
|
||||
def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[DeleteS3Action] =
|
||||
(implicit logger: Logger): IO[DeleteS3Action] =
|
||||
for {
|
||||
_ <- logDeleteStart[M](bucket, remoteKey)
|
||||
_ <- logDeleteStart(bucket, remoteKey)
|
||||
_ <- deleteObject(bucket, remoteKey)
|
||||
_ <- logDeleteFinish[M](bucket, remoteKey)
|
||||
_ <- logDeleteFinish(bucket, remoteKey)
|
||||
} yield DeleteS3Action(remoteKey)
|
||||
|
||||
private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) = Monad[M].pure {
|
||||
amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key))
|
||||
}
|
||||
private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) =
|
||||
IO(amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)))
|
||||
|
||||
}
|
||||
|
|
|
@ -1,40 +1,40 @@
|
|||
package net.kemitix.thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey}
|
||||
|
||||
object S3ClientLogging {
|
||||
|
||||
def logListObjectsStart[M[_]: Monad](bucket: Bucket,
|
||||
def logListObjectsStart(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[Unit] =
|
||||
(implicit logger: Logger): IO[Unit] =
|
||||
logger.info(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}")
|
||||
|
||||
def logListObjectsFinish[M[_]: Monad](bucket: Bucket,
|
||||
def logListObjectsFinish(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[Unit] =
|
||||
(implicit logger: Logger): IO[Unit] =
|
||||
logger.info(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}")
|
||||
|
||||
def logCopyStart[M[_]: Monad](bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
targetKey: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[Unit] =
|
||||
def logCopyStart(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
targetKey: RemoteKey)
|
||||
(implicit logger: Logger): IO[Unit] =
|
||||
logger.info(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
|
||||
|
||||
def logCopyFinish[M[_]: Monad](bucket: Bucket,
|
||||
def logCopyFinish(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
targetKey: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[Unit] =
|
||||
(implicit logger: Logger): IO[Unit] =
|
||||
logger.info(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
|
||||
|
||||
def logDeleteStart[M[_]: Monad](bucket: Bucket,
|
||||
def logDeleteStart(bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[Unit] =
|
||||
(implicit logger: Logger): IO[Unit] =
|
||||
logger.info(s"Delete: ${bucket.name}:${remoteKey.key}")
|
||||
|
||||
def logDeleteFinish[M[_]: Monad](bucket: Bucket,
|
||||
def logDeleteFinish(bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[Unit] =
|
||||
(implicit logger: Logger): IO[Unit] =
|
||||
logger.info(s"Deleted: ${bucket.name}:${remoteKey.key}")
|
||||
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package net.kemitix.thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import cats.effect.IO
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
|
||||
import net.kemitix.thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart}
|
||||
|
@ -12,11 +11,11 @@ import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData}
|
|||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) {
|
||||
class S3ClientObjectLister(amazonS3: AmazonS3) {
|
||||
|
||||
def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[S3ObjectsData] = {
|
||||
(implicit logger: Logger): IO[S3ObjectsData] = {
|
||||
|
||||
type Token = String
|
||||
type Batch = (Stream[S3ObjectSummary], Option[Token])
|
||||
|
@ -26,8 +25,8 @@ class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) {
|
|||
.withPrefix(prefix.key)
|
||||
.withContinuationToken(token)
|
||||
|
||||
def fetchBatch: ListObjectsV2Request => M[Batch] =
|
||||
request => Monad[M].pure {
|
||||
def fetchBatch: ListObjectsV2Request => IO[Batch] =
|
||||
request => IO.pure {
|
||||
val result = amazonS3.listObjectsV2(request)
|
||||
val more: Option[Token] =
|
||||
if (result.isTruncated) Some(result.getNextContinuationToken)
|
||||
|
@ -35,14 +34,14 @@ class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) {
|
|||
(result.getObjectSummaries.asScala.toStream, more)
|
||||
}
|
||||
|
||||
def fetchMore(more: Option[Token]): M[Stream[S3ObjectSummary]] = {
|
||||
def fetchMore(more: Option[Token]): IO[Stream[S3ObjectSummary]] = {
|
||||
more match {
|
||||
case None => Monad[M].pure(Stream.empty)
|
||||
case None => IO.pure(Stream.empty)
|
||||
case Some(token) => fetch(requestMore(token))
|
||||
}
|
||||
}
|
||||
|
||||
def fetch: ListObjectsV2Request => M[Stream[S3ObjectSummary]] =
|
||||
def fetch: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] =
|
||||
request =>
|
||||
for {
|
||||
batch <- fetchBatch(request)
|
||||
|
@ -51,10 +50,10 @@ class S3ClientObjectLister[M[_]: Monad](amazonS3: AmazonS3) {
|
|||
} yield summaries ++ rest
|
||||
|
||||
for {
|
||||
_ <- logListObjectsStart[M](bucket, prefix)
|
||||
_ <- logListObjectsStart(bucket, prefix)
|
||||
r = new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key)
|
||||
summaries <- fetch(r)
|
||||
_ <- logListObjectsFinish[M](bucket, prefix)
|
||||
_ <- logListObjectsFinish(bucket, prefix)
|
||||
} yield domain.S3ObjectsData(byHash(summaries), byKey(summaries))
|
||||
}
|
||||
|
||||
|
|
|
@ -1,31 +1,31 @@
|
|||
package net.kemitix.thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import cats.effect.IO
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.transfer.TransferManager
|
||||
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action}
|
||||
import net.kemitix.thorp.aws.api.{S3Action, S3Client, UploadProgressListener}
|
||||
import net.kemitix.thorp.domain._
|
||||
|
||||
class ThorpS3Client[M[_]: Monad](amazonS3Client: => AmazonS3,
|
||||
class ThorpS3Client(amazonS3Client: => AmazonS3,
|
||||
amazonS3TransferManager: => TransferManager)
|
||||
extends S3Client[M] {
|
||||
extends S3Client {
|
||||
|
||||
lazy val objectLister = new S3ClientObjectLister[M](amazonS3Client)
|
||||
lazy val copier = new S3ClientCopier[M](amazonS3Client)
|
||||
lazy val uploader = new Uploader[M](amazonS3TransferManager)
|
||||
lazy val deleter = new S3ClientDeleter[M](amazonS3Client)
|
||||
lazy val objectLister = new S3ClientObjectLister(amazonS3Client)
|
||||
lazy val copier = new S3ClientCopier(amazonS3Client)
|
||||
lazy val uploader = new Uploader(amazonS3TransferManager)
|
||||
lazy val deleter = new S3ClientDeleter(amazonS3Client)
|
||||
|
||||
override def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[S3ObjectsData] =
|
||||
(implicit logger: Logger): IO[S3ObjectsData] =
|
||||
objectLister.listObjects(bucket, prefix)
|
||||
|
||||
override def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[CopyS3Action] =
|
||||
(implicit logger: Logger): IO[CopyS3Action] =
|
||||
copier.copy(bucket, sourceKey,hash, targetKey)
|
||||
|
||||
override def upload(localFile: LocalFile,
|
||||
|
@ -34,12 +34,12 @@ class ThorpS3Client[M[_]: Monad](amazonS3Client: => AmazonS3,
|
|||
multiPartThreshold: Long,
|
||||
tryCount: Int,
|
||||
maxRetries: Int)
|
||||
(implicit logger: Logger[M]): M[S3Action] =
|
||||
(implicit logger: Logger): IO[S3Action] =
|
||||
uploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries)
|
||||
|
||||
override def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit logger: Logger[M]): M[DeleteS3Action] =
|
||||
(implicit logger: Logger): IO[DeleteS3Action] =
|
||||
deleter.delete(bucket, remoteKey)
|
||||
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package net.kemitix.thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import cats.effect.IO
|
||||
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest
|
||||
import com.amazonaws.services.s3.transfer.model.UploadResult
|
||||
|
@ -14,7 +13,7 @@ import net.kemitix.thorp.domain._
|
|||
|
||||
import scala.util.Try
|
||||
|
||||
class Uploader[M[_]: Monad](transferManager: => AmazonTransferManager) {
|
||||
class Uploader(transferManager: => AmazonTransferManager) {
|
||||
|
||||
def accepts(localFile: LocalFile)
|
||||
(implicit multiPartThreshold: Long): Boolean =
|
||||
|
@ -26,11 +25,11 @@ class Uploader[M[_]: Monad](transferManager: => AmazonTransferManager) {
|
|||
multiPartThreshold: Long,
|
||||
tryCount: Int,
|
||||
maxRetries: Int)
|
||||
(implicit logger: Logger[M]): M[S3Action] =
|
||||
(implicit logger: Logger): IO[S3Action] =
|
||||
for {
|
||||
_ <- logMultiPartUploadStart[M](localFile, tryCount)
|
||||
_ <- logMultiPartUploadStart(localFile, tryCount)
|
||||
upload <- transfer(localFile, bucket, uploadProgressListener)
|
||||
_ <- logMultiPartUploadFinished[M](localFile)
|
||||
_ <- logMultiPartUploadFinished(localFile)
|
||||
} yield upload match {
|
||||
case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag))
|
||||
case Left(e) => ErroredS3Action(localFile.remoteKey, e)
|
||||
|
@ -39,10 +38,10 @@ class Uploader[M[_]: Monad](transferManager: => AmazonTransferManager) {
|
|||
private def transfer(localFile: LocalFile,
|
||||
bucket: Bucket,
|
||||
uploadProgressListener: UploadProgressListener,
|
||||
): M[Either[Throwable, UploadResult]] = {
|
||||
): IO[Either[Throwable, UploadResult]] = {
|
||||
val listener: ProgressListener = progressListener(uploadProgressListener)
|
||||
val putObjectRequest = request(localFile, bucket, listener)
|
||||
Monad[M].pure {
|
||||
IO {
|
||||
Try(transferManager.upload(putObjectRequest))
|
||||
.map(_.waitForUploadResult)
|
||||
.toEither
|
||||
|
|
|
@ -1,22 +1,22 @@
|
|||
package net.kemitix.thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish
|
||||
import net.kemitix.thorp.domain.Terminal.clearLine
|
||||
import net.kemitix.thorp.domain.{LocalFile, Logger}
|
||||
|
||||
object UploaderLogging {
|
||||
|
||||
def logMultiPartUploadStart[M[_]: Monad](localFile: LocalFile,
|
||||
def logMultiPartUploadStart(localFile: LocalFile,
|
||||
tryCount: Int)
|
||||
(implicit logger: Logger[M]): M[Unit] = {
|
||||
(implicit logger: Logger): IO[Unit] = {
|
||||
val tryMessage = if (tryCount == 1) "" else s"try $tryCount"
|
||||
val size = sizeInEnglish(localFile.file.length)
|
||||
logger.info(s"${clearLine}upload:$tryMessage:$size:${localFile.remoteKey.key}")
|
||||
}
|
||||
|
||||
def logMultiPartUploadFinished[M[_]: Monad](localFile: LocalFile)
|
||||
(implicit logger: Logger[M]): M[Unit] =
|
||||
def logMultiPartUploadFinished(localFile: LocalFile)
|
||||
(implicit logger: Logger): IO[Unit] =
|
||||
logger.debug(s"upload:finished: ${localFile.remoteKey.key}")
|
||||
|
||||
}
|
||||
|
|
|
@ -1,16 +1,18 @@
|
|||
package net.kemitix.thorp.aws.lib
|
||||
|
||||
import cats.Monad
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.domain.Logger
|
||||
|
||||
class DummyLogger[M[_]: Monad] extends Logger[M] {
|
||||
class DummyLogger extends Logger {
|
||||
|
||||
override def debug(message: => String): M[Unit] = Monad[M].unit
|
||||
override def debug(message: => String): IO[Unit] = IO.unit
|
||||
|
||||
override def info(message: =>String): M[Unit] = Monad[M].unit
|
||||
override def info(message: =>String): IO[Unit] = IO.unit
|
||||
|
||||
override def warn(message: String): M[Unit] = Monad[M].unit
|
||||
override def warn(message: String): IO[Unit] = IO.unit
|
||||
|
||||
override def error(message: String): M[Unit] = Monad[M].unit
|
||||
override def error(message: String): IO[Unit] = IO.unit
|
||||
|
||||
override def withDebug(debug: Boolean): Logger = this
|
||||
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package net.kemitix.thorp.aws.lib
|
|||
|
||||
import java.time.Instant
|
||||
|
||||
import cats.Id
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest
|
||||
import com.amazonaws.services.s3.transfer.model.UploadResult
|
||||
|
@ -23,7 +22,7 @@ class S3ClientSuite
|
|||
|
||||
private val prefix = RemoteKey("prefix")
|
||||
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit private val implLogger: Logger[Id] = new DummyLogger[Id]
|
||||
implicit private val implLogger: Logger = new DummyLogger
|
||||
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
|
||||
|
||||
describe("getS3Status") {
|
||||
|
@ -43,7 +42,7 @@ class S3ClientSuite
|
|||
keyotherkey.remoteKey -> HashModified(hash, lastModified),
|
||||
keydiffhash.remoteKey -> HashModified(diffhash, lastModified)))
|
||||
|
||||
def invoke(self: S3Client[Id], localFile: LocalFile) = {
|
||||
def invoke(self: S3Client, localFile: LocalFile) = {
|
||||
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,6 @@ import java.time.Instant
|
|||
import java.time.temporal.ChronoUnit
|
||||
import java.util.Date
|
||||
|
||||
import cats.Id
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary}
|
||||
import com.amazonaws.services.s3.transfer.TransferManager
|
||||
|
@ -21,7 +20,7 @@ class ThorpS3ClientSuite
|
|||
val source = Resource(this, "upload")
|
||||
val prefix = RemoteKey("prefix")
|
||||
implicit val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit val implLogger: Logger[Id] = new DummyLogger[Id]
|
||||
implicit val implLogger: Logger = new DummyLogger
|
||||
|
||||
val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))
|
||||
|
||||
|
@ -48,7 +47,7 @@ class ThorpS3ClientSuite
|
|||
|
||||
val amazonS3 = stub[AmazonS3]
|
||||
val amazonS3TransferManager = stub[TransferManager]
|
||||
val s3Client = new ThorpS3Client[Id](amazonS3, amazonS3TransferManager)
|
||||
val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager)
|
||||
|
||||
val myFakeResponse = new ListObjectsV2Result()
|
||||
val summaries = myFakeResponse.getObjectSummaries
|
||||
|
@ -66,7 +65,7 @@ class ThorpS3ClientSuite
|
|||
k1a -> HashModified(h1, lm),
|
||||
k1b -> HashModified(h1, lm),
|
||||
k2 -> HashModified(h2, lm)))
|
||||
val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix"))
|
||||
val result = s3Client.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync
|
||||
assertResult(expected.byHash.keys)(result.byHash.keys)
|
||||
assertResult(expected.byKey.keys)(result.byKey.keys)
|
||||
assertResult(expected)(result)
|
||||
|
|
|
@ -2,7 +2,6 @@ package net.kemitix.thorp.aws.lib
|
|||
|
||||
import java.time.Instant
|
||||
|
||||
import cats.Id
|
||||
import com.amazonaws.services.s3.AmazonS3
|
||||
import com.amazonaws.services.s3.transfer._
|
||||
import net.kemitix.thorp.aws.api.S3Action.UploadS3Action
|
||||
|
@ -20,7 +19,7 @@ class UploaderSuite
|
|||
private val source = Resource(this, ".")
|
||||
private val prefix = RemoteKey("prefix")
|
||||
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit private val implLogger: Logger[Id] = new DummyLogger[Id]
|
||||
implicit private val implLogger: Logger = new DummyLogger
|
||||
private val fileToKey = generateKey(config.source, config.prefix) _
|
||||
val lastModified = LastModified(Instant.now())
|
||||
|
||||
|
|
19
build.sbt
19
build.sbt
|
@ -27,19 +27,6 @@ val awsSdkDependencies = Seq(
|
|||
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.9.9"
|
||||
)
|
||||
)
|
||||
val catsSettings = Seq (
|
||||
libraryDependencies ++= Seq(
|
||||
"org.typelevel" %% "cats-core" % "1.6.1"
|
||||
),
|
||||
// recommended for cats-effects
|
||||
scalacOptions ++= Seq(
|
||||
"-feature",
|
||||
"-deprecation",
|
||||
"-unchecked",
|
||||
"-language:postfixOps",
|
||||
"-language:higherKinds",
|
||||
"-Ypartial-unification")
|
||||
)
|
||||
val catsEffectsSettings = Seq(
|
||||
libraryDependencies ++= Seq(
|
||||
"org.typelevel" %% "cats-effect" % "1.3.1"
|
||||
|
@ -56,11 +43,13 @@ val catsEffectsSettings = Seq(
|
|||
|
||||
// cli -> thorp-lib -> aws-lib -> core -> aws-api -> domain
|
||||
|
||||
lazy val root = (project in file("."))
|
||||
.settings(commonSettings)
|
||||
|
||||
lazy val cli = (project in file("cli"))
|
||||
.settings(commonSettings)
|
||||
.settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main"))
|
||||
.settings(applicationSettings)
|
||||
.settings(catsEffectsSettings)
|
||||
.aggregate(`thorp-lib`, `aws-lib`, core, `aws-api`, domain)
|
||||
.settings(commandLineParsing)
|
||||
.settings(testDependencies)
|
||||
|
@ -87,10 +76,10 @@ lazy val core = (project in file("core"))
|
|||
lazy val `aws-api` = (project in file("aws-api"))
|
||||
.settings(commonSettings)
|
||||
.settings(assemblyJarName in assembly := "aws-api.jar")
|
||||
.settings(catsSettings)
|
||||
.dependsOn(domain)
|
||||
|
||||
lazy val domain = (project in file("domain"))
|
||||
.settings(commonSettings)
|
||||
.settings(assemblyJarName in assembly := "domain.jar")
|
||||
.settings(catsEffectsSettings)
|
||||
.settings(testDependencies)
|
||||
|
|
|
@ -1,20 +1,14 @@
|
|||
package net.kemitix.thorp.cli
|
||||
|
||||
import java.nio.file.Paths
|
||||
|
||||
import cats.effect.ExitCase.{Canceled, Completed, Error}
|
||||
import cats.effect.{ExitCode, IO, IOApp}
|
||||
import net.kemitix.thorp.domain.Config
|
||||
|
||||
object Main extends IOApp {
|
||||
|
||||
val defaultConfig: Config =
|
||||
Config(source = Paths.get(".").toFile)
|
||||
|
||||
override def run(args: List[String]): IO[ExitCode] = {
|
||||
val exitCaseLogger = new PrintLogger[IO](false)
|
||||
ParseArgs(args, defaultConfig)
|
||||
.map(Program[IO])
|
||||
val exitCaseLogger = new PrintLogger(false)
|
||||
ParseArgs(args)
|
||||
.map(Program(_))
|
||||
.getOrElse(IO(ExitCode.Error))
|
||||
.guaranteeCase {
|
||||
case Canceled => exitCaseLogger.warn("Interrupted")
|
||||
|
|
|
@ -1,47 +1,42 @@
|
|||
package net.kemitix.thorp.cli
|
||||
|
||||
import java.io.File
|
||||
import java.nio.file.Paths
|
||||
|
||||
import net.kemitix.thorp.domain.Filter.{Exclude, Include}
|
||||
import net.kemitix.thorp.domain.{Bucket, Config, RemoteKey}
|
||||
import net.kemitix.thorp.core.ConfigOption
|
||||
import scopt.OParser
|
||||
|
||||
object ParseArgs {
|
||||
|
||||
val configParser: OParser[Unit, Config] = {
|
||||
val parserBuilder = OParser.builder[Config]
|
||||
val configParser: OParser[Unit, List[ConfigOption]] = {
|
||||
val parserBuilder = OParser.builder[List[ConfigOption]]
|
||||
import parserBuilder._
|
||||
OParser.sequence(
|
||||
programName("thorp"),
|
||||
head("thorp"),
|
||||
opt[String]('s', "source")
|
||||
.action((str, c) => c.copy(source = Paths.get(str).toFile))
|
||||
.validate(s => if (new File(s).isDirectory) Right(()) else Left("Source is not a directory"))
|
||||
.required()
|
||||
.text("Source directory to sync to S3"),
|
||||
.action((str, cos) => ConfigOption.Source(Paths.get(str)) :: cos)
|
||||
.text("Source directory to sync to destination"),
|
||||
opt[String]('b', "bucket")
|
||||
.action((str, c) => c.copy(bucket = Bucket(str)))
|
||||
.required()
|
||||
.action((str, cos) => ConfigOption.Bucket(str) :: cos)
|
||||
.text("S3 bucket name"),
|
||||
opt[String]('p', "prefix")
|
||||
.action((str, c) => c.copy(prefix = RemoteKey(str)))
|
||||
.action((str, cos) => ConfigOption.Prefix(str) :: cos)
|
||||
.text("Prefix within the S3 Bucket"),
|
||||
opt[Seq[String]]('i', "include")
|
||||
opt[String]('i', "include")
|
||||
.unbounded()
|
||||
.action((str, c) => c.copy(filters = c.filters ++ str.map(Include)))
|
||||
.action((str, cos) => ConfigOption.Include(str) :: cos)
|
||||
.text("Include only matching paths"),
|
||||
opt[Seq[String]]('x', "exclude")
|
||||
opt[String]('x', "exclude")
|
||||
.unbounded()
|
||||
.action((str,c) => c.copy(filters = c.filters ++ str.map(Exclude)))
|
||||
.action((str,cos) => ConfigOption.Exclude(str) :: cos)
|
||||
.text("Exclude matching paths"),
|
||||
opt[Unit]('d', "debug")
|
||||
.action((_, c) => c.copy(debug = true))
|
||||
.action((_, cos) => ConfigOption.Debug() :: cos)
|
||||
.text("Enable debug logging")
|
||||
)
|
||||
}
|
||||
|
||||
def apply(args: List[String], defaultConfig: Config): Option[Config] =
|
||||
OParser.parse(configParser, args, defaultConfig)
|
||||
def apply(args: List[String]): Option[List[ConfigOption]] =
|
||||
OParser.parse(configParser, args, List())
|
||||
|
||||
}
|
||||
|
|
|
@ -1,18 +1,22 @@
|
|||
package net.kemitix.thorp.cli
|
||||
|
||||
import cats.Monad
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.domain.Logger
|
||||
|
||||
class PrintLogger[M[_]: Monad](isDebug: Boolean) extends Logger[M] {
|
||||
class PrintLogger(isDebug: Boolean = false) extends Logger {
|
||||
|
||||
override def debug(message: => String): M[Unit] =
|
||||
if (isDebug) Monad[M].pure(println(s"[ DEBUG] $message"))
|
||||
else Monad[M].unit
|
||||
override def debug(message: => String): IO[Unit] =
|
||||
if (isDebug) IO(println(s"[ DEBUG] $message"))
|
||||
else IO.unit
|
||||
|
||||
override def info(message: => String): M[Unit] = Monad[M].pure(println(s"[ INFO] $message"))
|
||||
override def info(message: => String): IO[Unit] = IO(println(s"[ INFO] $message"))
|
||||
|
||||
override def warn(message: String): M[Unit] = Monad[M].pure(println(s"[ WARN] $message"))
|
||||
override def warn(message: String): IO[Unit] = IO(println(s"[ WARN] $message"))
|
||||
|
||||
override def error(message: String): M[Unit] = Monad[M].pure(println(s"[ ERROR] $message"))
|
||||
override def error(message: String): IO[Unit] = IO(println(s"[ ERROR] $message"))
|
||||
|
||||
override def withDebug(debug: Boolean): Logger =
|
||||
if (isDebug == debug) this
|
||||
else new PrintLogger(debug)
|
||||
|
||||
}
|
||||
|
|
|
@ -1,20 +1,24 @@
|
|||
package net.kemitix.thorp.cli
|
||||
|
||||
import cats.Monad
|
||||
import cats.effect.ExitCode
|
||||
import cats.implicits._
|
||||
import cats.effect.{ExitCode, IO}
|
||||
import net.kemitix.thorp.aws.lib.S3ClientBuilder
|
||||
import net.kemitix.thorp.core.Sync
|
||||
import net.kemitix.thorp.domain.{Config, Logger}
|
||||
import net.kemitix.thorp.core.{ConfigOption, Sync}
|
||||
import net.kemitix.thorp.domain.Logger
|
||||
|
||||
object Program {
|
||||
trait Program {
|
||||
|
||||
def apply[M[_]: Monad](config: Config): M[ExitCode] = {
|
||||
implicit val logger: Logger[M] = new PrintLogger[M](config.debug)
|
||||
for {
|
||||
_ <- logger.info("Thorp - hashed sync for cloud storage")
|
||||
_ <- Sync.run[M](config, S3ClientBuilder.defaultClient)
|
||||
} yield ExitCode.Success
|
||||
def apply(configOptions: Seq[ConfigOption]): IO[ExitCode] = {
|
||||
implicit val logger: Logger = new PrintLogger()
|
||||
Sync(S3ClientBuilder.defaultClient)(configOptions) flatMap {
|
||||
case Left(errors) =>
|
||||
for {
|
||||
_ <- logger.error(s"There were errors:")
|
||||
_ <- IO.pure(errors.map(error => logger.error(s" - $error")))
|
||||
} yield ExitCode.Error
|
||||
case Right(_) => IO.pure(ExitCode.Success)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object Program extends Program
|
|
@ -1,7 +1,7 @@
|
|||
package net.kemitix.thorp.cli
|
||||
|
||||
import net.kemitix.thorp.core.Resource
|
||||
import net.kemitix.thorp.domain.Config
|
||||
import net.kemitix.thorp.core.ConfigOption.Debug
|
||||
import net.kemitix.thorp.core.{ConfigOption, Resource}
|
||||
import org.scalatest.FunSpec
|
||||
|
||||
import scala.util.Try
|
||||
|
@ -9,11 +9,10 @@ import scala.util.Try
|
|||
class ParseArgsTest extends FunSpec {
|
||||
|
||||
val source = Resource(this, "")
|
||||
val defaultConfig: Config = Config(source = source)
|
||||
|
||||
describe("parse - source") {
|
||||
def invokeWithSource(path: String) =
|
||||
ParseArgs(List("--source", path, "--bucket", "bucket"), defaultConfig)
|
||||
ParseArgs(List("--source", path, "--bucket", "bucket"))
|
||||
|
||||
describe("when source is a directory") {
|
||||
val result = invokeWithSource(pathTo("."))
|
||||
|
@ -21,52 +20,35 @@ class ParseArgsTest extends FunSpec {
|
|||
assert(result.isDefined)
|
||||
}
|
||||
}
|
||||
describe("when source is a file") {
|
||||
val result = invokeWithSource(pathTo("ParseArgs.class"))
|
||||
it("should fail") {
|
||||
assert(result.isEmpty)
|
||||
}
|
||||
}
|
||||
describe("when source is not found") {
|
||||
val result = invokeWithSource(pathTo("not-found"))
|
||||
it("should fail") {
|
||||
assert(result.isEmpty)
|
||||
}
|
||||
}
|
||||
describe("when source is a relative path to a directory") {
|
||||
it("should succeed") {pending}
|
||||
}
|
||||
describe("when source is a relative path to a file") {
|
||||
it("should fail") {pending}
|
||||
}
|
||||
describe("when source is a relative path to a missing path") {
|
||||
it("should fail") {pending}
|
||||
}
|
||||
}
|
||||
|
||||
describe("parse - debug") {
|
||||
def invokeWithDebug(debug: String) = {
|
||||
val strings = List("--source", pathTo("."), "--bucket", "bucket", debug)
|
||||
def invokeWithArgument(arg: String): List[ConfigOption] = {
|
||||
val strings = List("--source", pathTo("."), "--bucket", "bucket", arg)
|
||||
.filter(_ != "")
|
||||
ParseArgs(strings, defaultConfig).map(_.debug)
|
||||
val maybeOptions = ParseArgs(strings)
|
||||
maybeOptions.getOrElse(List())
|
||||
}
|
||||
|
||||
describe("when no debug flag") {
|
||||
val debugFlag = invokeWithDebug("")
|
||||
val configOptions = invokeWithArgument("")
|
||||
it("debug should be false") {
|
||||
assert(debugFlag.contains(false))
|
||||
assertResult(false)(configOptions.contains(Debug()))
|
||||
}
|
||||
}
|
||||
describe("when long debug flag") {
|
||||
val debugFlag = invokeWithDebug("--debug")
|
||||
val configOptions = invokeWithArgument("--debug")
|
||||
it("debug should be true") {
|
||||
assert(debugFlag.contains(true))
|
||||
assert(configOptions.contains(Debug()))
|
||||
}
|
||||
}
|
||||
describe("when short debug flag") {
|
||||
val debugFlag = invokeWithDebug("-d")
|
||||
val configOptions = invokeWithArgument("-d")
|
||||
it("debug should be true") {
|
||||
assert(debugFlag.contains(true))
|
||||
assert(configOptions.contains(Debug()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.aws.api.S3Action.DoNothingS3Action
|
||||
import net.kemitix.thorp.aws.api.{S3Action, S3Client, UploadProgressListener}
|
||||
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
|
||||
|
@ -9,9 +8,10 @@ import net.kemitix.thorp.domain.{Config, Logger}
|
|||
|
||||
object ActionSubmitter {
|
||||
|
||||
def submitAction[M[_]: Monad](s3Client: S3Client[M], action: Action)
|
||||
(implicit c: Config,
|
||||
logger: Logger[M]): Stream[M[S3Action]] = {
|
||||
def submitAction(s3Client: S3Client,
|
||||
action: Action)
|
||||
(implicit c: Config,
|
||||
logger: Logger): Stream[IO[S3Action]] = {
|
||||
Stream(
|
||||
action match {
|
||||
case ToUpload(bucket, localFile) =>
|
||||
|
@ -31,7 +31,7 @@ object ActionSubmitter {
|
|||
action <- s3Client.delete(bucket, remoteKey)
|
||||
} yield action
|
||||
case DoNothing(bucket, remoteKey) =>
|
||||
Monad[M].pure(DoNothingS3Action(remoteKey))
|
||||
IO.pure(DoNothingS3Action(remoteKey))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import java.nio.file.Path
|
||||
|
||||
import net.kemitix.thorp.domain
|
||||
import net.kemitix.thorp.domain.{Config, RemoteKey}
|
||||
|
||||
sealed trait ConfigOption {
|
||||
def update(config: Config): Config
|
||||
}
|
||||
|
||||
object ConfigOption {
|
||||
case class Source(path: Path) extends ConfigOption {
|
||||
override def update(config: Config): Config = config.copy(source = path.toFile)
|
||||
}
|
||||
case class Bucket(name: String) extends ConfigOption {
|
||||
override def update(config: Config): Config = config.copy(bucket = domain.Bucket(name))
|
||||
}
|
||||
case class Prefix(path: String) extends ConfigOption {
|
||||
override def update(config: Config): Config = config.copy(prefix = RemoteKey(path))
|
||||
}
|
||||
case class Include(pattern: String) extends ConfigOption {
|
||||
override def update(config: Config): Config = config.copy(filters = domain.Filter.Include(pattern) :: config.filters)
|
||||
}
|
||||
case class Exclude(pattern: String) extends ConfigOption {
|
||||
override def update(config: Config): Config = config.copy(filters = domain.Filter.Exclude(pattern) :: config.filters)
|
||||
}
|
||||
case class Debug() extends ConfigOption {
|
||||
override def update(config: Config): Config = config.copy(debug = true)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
sealed trait ConfigValidation {
|
||||
|
||||
def errorMessage: String
|
||||
}
|
||||
|
||||
object ConfigValidation {
|
||||
|
||||
case object SourceIsNotADirectory extends ConfigValidation {
|
||||
override def errorMessage: String = "Source must be a directory"
|
||||
}
|
||||
|
||||
case object SourceIsNotReadable extends ConfigValidation {
|
||||
override def errorMessage: String = "Source must be readable"
|
||||
}
|
||||
|
||||
case object BucketNameIsMissing extends ConfigValidation {
|
||||
override def errorMessage: String = "Bucket name is missing"
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import java.io.File
|
||||
|
||||
import cats.data.{NonEmptyChain, Validated, ValidatedNec}
|
||||
import cats.implicits._
|
||||
import net.kemitix.thorp.domain.{Bucket, Config}
|
||||
|
||||
sealed trait ConfigValidator {
|
||||
|
||||
type ValidationResult[A] = ValidatedNec[ConfigValidation, A]
|
||||
|
||||
def validateSourceIsDirectory(source: File): ValidationResult[File] =
|
||||
if(source.isDirectory) source.validNec
|
||||
else ConfigValidation.SourceIsNotADirectory.invalidNec
|
||||
|
||||
def validateSourceIsReadable(source: File): ValidationResult[File] =
|
||||
if(source.canRead) source.validNec
|
||||
else ConfigValidation.SourceIsNotReadable.invalidNec
|
||||
|
||||
def validateSource(source: File): ValidationResult[File] =
|
||||
validateSourceIsDirectory(source).andThen(s => validateSourceIsReadable(s))
|
||||
|
||||
def validateBucket(bucket: Bucket): ValidationResult[Bucket] =
|
||||
if (bucket.name.isEmpty) ConfigValidation.BucketNameIsMissing.invalidNec
|
||||
else bucket.validNec
|
||||
|
||||
def validateConfig(config: Config): Validated[NonEmptyChain[ConfigValidation], Config] =
|
||||
(
|
||||
validateSource(config.source),
|
||||
validateBucket(config.bucket)
|
||||
).mapN((_, _) => config)
|
||||
}
|
||||
|
||||
object ConfigValidator extends ConfigValidator
|
|
@ -0,0 +1,47 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import java.io.File
|
||||
import java.nio.file.Paths
|
||||
|
||||
import cats.data.NonEmptyChain
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.core.ConfigValidator.validateConfig
|
||||
import net.kemitix.thorp.domain.Config
|
||||
import net.kemitix.thorp.core.ParseConfigFile.parseFile
|
||||
|
||||
/**
|
||||
* Builds a configuration from settings in a file within the
|
||||
* `source` directory and from supplied configuration options.
|
||||
*/
|
||||
trait ConfigurationBuilder {
|
||||
|
||||
private val pwdFile: File = Paths.get(System.getenv("PWD")).toFile
|
||||
|
||||
private val defaultConfig: Config = Config(source = pwdFile)
|
||||
|
||||
def buildConfig(priorityOptions: Seq[ConfigOption]): IO[Either[NonEmptyChain[ConfigValidation], Config]] = {
|
||||
val source = findSource(priorityOptions)
|
||||
for {
|
||||
options <- sourceOptions(source)
|
||||
collected = priorityOptions ++ options
|
||||
config = collateOptions(collected)
|
||||
} yield validateConfig(config).toEither
|
||||
}
|
||||
|
||||
private def findSource(priorityOptions: Seq[ConfigOption]): File =
|
||||
priorityOptions.foldRight(pwdFile)((co, f) => co match {
|
||||
case ConfigOption.Source(source) => source.toFile
|
||||
case _ => f
|
||||
})
|
||||
|
||||
private def sourceOptions(source: File): IO[Seq[ConfigOption]] =
|
||||
readFile(source, ".thorp.conf")
|
||||
|
||||
private def readFile(source: File, filename: String): IO[Seq[ConfigOption]] =
|
||||
parseFile(source.toPath.resolve(filename))
|
||||
|
||||
private def collateOptions(configOptions: Seq[ConfigOption]): Config =
|
||||
configOptions.foldRight(defaultConfig)((co, c) => co.update(c))
|
||||
}
|
||||
|
||||
object ConfigurationBuilder extends ConfigurationBuilder
|
|
@ -3,25 +3,24 @@ package net.kemitix.thorp.core
|
|||
import java.io.File
|
||||
import java.nio.file.Path
|
||||
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.core.KeyGenerator.generateKey
|
||||
import net.kemitix.thorp.domain
|
||||
import net.kemitix.thorp.domain._
|
||||
|
||||
object LocalFileStream {
|
||||
|
||||
def findFiles[M[_]: Monad](file: File,
|
||||
md5HashGenerator: File => M[MD5Hash])
|
||||
(implicit c: Config,
|
||||
logger: Logger[M]): M[Stream[LocalFile]] = {
|
||||
def findFiles(file: File,
|
||||
md5HashGenerator: File => IO[MD5Hash])
|
||||
(implicit c: Config,
|
||||
logger: Logger): IO[Stream[LocalFile]] = {
|
||||
|
||||
val filters: Path => Boolean = Filter.isIncluded(c.filters)
|
||||
|
||||
def loop(file: File): M[Stream[LocalFile]] = {
|
||||
def loop(file: File): IO[Stream[LocalFile]] = {
|
||||
|
||||
def dirPaths(file: File): M[Stream[File]] =
|
||||
Monad[M].pure {
|
||||
def dirPaths(file: File): IO[Stream[File]] =
|
||||
IO.pure {
|
||||
Option(file.listFiles)
|
||||
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file"))
|
||||
}
|
||||
|
@ -29,15 +28,15 @@ object LocalFileStream {
|
|||
Stream(fs: _*)
|
||||
.filter(f => filters(f.toPath)))
|
||||
|
||||
def recurseIntoSubDirectories(file: File)(implicit c: Config): M[Stream[LocalFile]] =
|
||||
def recurseIntoSubDirectories(file: File)(implicit c: Config): IO[Stream[LocalFile]] =
|
||||
file match {
|
||||
case f if f.isDirectory => loop(file)
|
||||
case _ => for(hash <- md5HashGenerator(file))
|
||||
yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)))
|
||||
}
|
||||
|
||||
def recurse(fs: Stream[File]): M[Stream[LocalFile]] =
|
||||
fs.foldLeft(Monad[M].pure(Stream.empty[LocalFile]))((acc, f) =>
|
||||
def recurse(fs: Stream[File]): IO[Stream[LocalFile]] =
|
||||
fs.foldLeft(IO.pure(Stream.empty[LocalFile]))((acc, f) =>
|
||||
recurseIntoSubDirectories(f)
|
||||
.flatMap(lfs => acc.map(s => s ++ lfs)))
|
||||
|
||||
|
|
|
@ -3,21 +3,20 @@ package net.kemitix.thorp.core
|
|||
import java.io.{File, FileInputStream}
|
||||
import java.security.MessageDigest
|
||||
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.domain.{Logger, MD5Hash}
|
||||
|
||||
import scala.collection.immutable.NumericRange
|
||||
|
||||
object MD5HashGenerator {
|
||||
|
||||
def md5File[M[_]: Monad](file: File)
|
||||
(implicit logger: Logger[M]): M[MD5Hash] = {
|
||||
def md5File(file: File)
|
||||
(implicit logger: Logger): IO[MD5Hash] = {
|
||||
|
||||
val maxBufferSize = 8048
|
||||
val defaultBuffer = new Array[Byte](maxBufferSize)
|
||||
def openFile = Monad[M].pure(new FileInputStream(file))
|
||||
def closeFile = {fis: FileInputStream => Monad[M].pure(fis.close())}
|
||||
def openFile = IO.pure(new FileInputStream(file))
|
||||
def closeFile = {fis: FileInputStream => IO(fis.close())}
|
||||
|
||||
def nextChunkSize(currentOffset: Long) = {
|
||||
// a value between 1 and maxBufferSize
|
||||
|
@ -36,7 +35,7 @@ object MD5HashGenerator {
|
|||
}
|
||||
|
||||
def digestFile(fis: FileInputStream) =
|
||||
Monad[M].pure {
|
||||
IO {
|
||||
val md5 = MessageDigest getInstance "MD5"
|
||||
NumericRange(0, file.length, maxBufferSize)
|
||||
.foreach { currentOffset => {
|
||||
|
@ -46,7 +45,7 @@ object MD5HashGenerator {
|
|||
(md5.digest map ("%02x" format _)).mkString
|
||||
}
|
||||
|
||||
def readFile: M[String] =
|
||||
def readFile: IO[String] =
|
||||
for {
|
||||
fis <- openFile
|
||||
md5 <- digestFile(fis)
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import java.nio.file.{Files, Path}
|
||||
|
||||
import cats.effect.IO
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
trait ParseConfigFile {
|
||||
|
||||
def parseFile(filename: Path): IO[Seq[ConfigOption]] =
|
||||
readFile(filename).map(ParseConfigLines.parseLines)
|
||||
|
||||
private def readFile(filename: Path) = {
|
||||
if (Files.exists(filename)) readFileThatExists(filename)
|
||||
else IO.pure(List())
|
||||
}
|
||||
|
||||
private def readFileThatExists(filename: Path) =
|
||||
for {
|
||||
lines <- IO(Files.lines(filename))
|
||||
list = lines.iterator.asScala.toList
|
||||
_ <- IO(lines.close())
|
||||
} yield list
|
||||
|
||||
}
|
||||
|
||||
object ParseConfigFile extends ParseConfigFile
|
|
@ -0,0 +1,43 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import java.nio.file.Paths
|
||||
import java.util.regex.Pattern
|
||||
|
||||
import net.kemitix.thorp.core.ConfigOption.{Bucket, Debug, Exclude, Include, Prefix, Source}
|
||||
|
||||
trait ParseConfigLines {
|
||||
|
||||
def parseLines(lines: List[String]): List[ConfigOption] =
|
||||
lines.flatMap(parseLine)
|
||||
|
||||
private val pattern = "^\\s*(?<key>\\S*)\\s*=\\s*(?<value>\\S*)\\s*$"
|
||||
private val format = Pattern.compile(pattern)
|
||||
|
||||
private def parseLine(str: String) =
|
||||
format.matcher(str) match {
|
||||
case m if m.matches => parseKeyValue(m.group("key"), m.group("value"))
|
||||
case _ =>None
|
||||
}
|
||||
|
||||
def truthy(value: String): Boolean =
|
||||
value.toLowerCase match {
|
||||
case "true" => true
|
||||
case "yes" => true
|
||||
case "enabled" => true
|
||||
case _ => false
|
||||
}
|
||||
|
||||
private def parseKeyValue(key: String, value: String): Option[ConfigOption] =
|
||||
key.toLowerCase match {
|
||||
case "source" => Some(Source(Paths.get(value)))
|
||||
case "bucket" => Some(Bucket(value))
|
||||
case "prefix" => Some(Prefix(value))
|
||||
case "include" => Some(Include(value))
|
||||
case "exclude" => Some(Exclude(value))
|
||||
case "debug" => if (truthy(value)) Some(Debug()) else None
|
||||
case _ => None
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object ParseConfigLines extends ParseConfigLines
|
|
@ -1,36 +1,55 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import cats.Monad
|
||||
import cats.effect.IO
|
||||
import cats.implicits._
|
||||
import net.kemitix.thorp.aws.api.{S3Action, S3Client}
|
||||
import net.kemitix.thorp.core.Action.ToDelete
|
||||
import net.kemitix.thorp.core.ActionGenerator.createActions
|
||||
import net.kemitix.thorp.core.ActionSubmitter.submitAction
|
||||
import net.kemitix.thorp.core.ConfigurationBuilder.buildConfig
|
||||
import net.kemitix.thorp.core.LocalFileStream.findFiles
|
||||
import net.kemitix.thorp.core.S3MetaDataEnricher.getMetadata
|
||||
import net.kemitix.thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart}
|
||||
import net.kemitix.thorp.domain._
|
||||
|
||||
object Sync {
|
||||
trait Sync {
|
||||
|
||||
def run[M[_]: Monad](config: Config,
|
||||
s3Client: S3Client[M])
|
||||
(implicit logger: Logger[M]): M[Unit] = {
|
||||
def errorMessages(errors: List[ConfigValidation]): List[String] = {
|
||||
for {
|
||||
errorMessages <- errors.map(cv => cv.errorMessage)
|
||||
} yield errorMessages
|
||||
}
|
||||
|
||||
implicit val c: Config = config
|
||||
def apply(s3Client: S3Client)
|
||||
(configOptions: Seq[ConfigOption])
|
||||
(implicit defaultLogger: Logger): IO[Either[List[String], Unit]] =
|
||||
buildConfig(configOptions).flatMap {
|
||||
case Left(errors) => IO.pure(Left(errorMessages(errors.toList)))
|
||||
case Right(config) =>
|
||||
for {
|
||||
_ <- run(config, s3Client, defaultLogger.withDebug(config.debug))
|
||||
} yield Right(())
|
||||
}
|
||||
|
||||
private def run(cliConfig: Config,
|
||||
s3Client: S3Client,
|
||||
logger: Logger): IO[Unit] = {
|
||||
|
||||
implicit val c: Config = cliConfig
|
||||
implicit val l: Logger = logger
|
||||
|
||||
def metaData(s3Data: S3ObjectsData, sFiles: Stream[LocalFile]) =
|
||||
Monad[M].pure(sFiles.map(file => getMetadata(file, s3Data)))
|
||||
IO.pure(sFiles.map(file => getMetadata(file, s3Data)))
|
||||
|
||||
def actions(sData: Stream[S3MetaData]) =
|
||||
Monad[M].pure(sData.flatMap(s3MetaData => createActions(s3MetaData)))
|
||||
IO.pure(sData.flatMap(s3MetaData => createActions(s3MetaData)))
|
||||
|
||||
def submit(sActions: Stream[Action]) =
|
||||
Monad[M].pure(sActions.flatMap(action => submitAction[M](s3Client, action)))
|
||||
IO(sActions.flatMap(action => submitAction(s3Client, action)))
|
||||
|
||||
def copyUploadActions(s3Data: S3ObjectsData): M[Stream[S3Action]] =
|
||||
def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] =
|
||||
(for {
|
||||
files <- findFiles(c.source, MD5HashGenerator.md5File[M](_))
|
||||
files <- findFiles(c.source, MD5HashGenerator.md5File(_))
|
||||
metaData <- metaData(s3Data, files)
|
||||
actions <- actions(metaData)
|
||||
s3Actions <- submit(actions)
|
||||
|
@ -38,23 +57,24 @@ object Sync {
|
|||
.flatten
|
||||
.map(streamS3Actions => streamS3Actions.sorted)
|
||||
|
||||
def deleteActions(s3ObjectsData: S3ObjectsData): M[Stream[S3Action]] =
|
||||
def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[S3Action]] =
|
||||
(for {
|
||||
key <- s3ObjectsData.byKey.keys
|
||||
if key.isMissingLocally(c.source, c.prefix)
|
||||
ioDelAction <- submitAction[M](s3Client, ToDelete(c.bucket, key))
|
||||
ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key))
|
||||
} yield ioDelAction)
|
||||
.toStream
|
||||
.sequence
|
||||
|
||||
for {
|
||||
_ <- logRunStart[M]
|
||||
_ <- logRunStart
|
||||
s3data <- s3Client.listObjects(c.bucket, c.prefix)
|
||||
_ <- logFileScan[M]
|
||||
_ <- logFileScan
|
||||
copyUploadActions <- copyUploadActions(s3data)
|
||||
deleteActions <- deleteActions(s3data)
|
||||
_ <- logRunFinished[M](copyUploadActions ++ deleteActions)
|
||||
_ <- logRunFinished(copyUploadActions ++ deleteActions)
|
||||
} yield ()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object Sync extends Sync
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import cats.Monad
|
||||
import cats.implicits._
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.aws.api.S3Action
|
||||
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action}
|
||||
import net.kemitix.thorp.domain.{Config, Logger}
|
||||
|
@ -9,17 +8,17 @@ import net.kemitix.thorp.domain.{Config, Logger}
|
|||
// Logging for the Sync class
|
||||
object SyncLogging {
|
||||
|
||||
def logRunStart[M[_]: Monad](implicit c: Config,
|
||||
logger: Logger[M]): M[Unit] =
|
||||
def logRunStart(implicit c: Config,
|
||||
logger: Logger): IO[Unit] =
|
||||
logger.info(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ")
|
||||
|
||||
def logFileScan[M[_]: Monad](implicit c: Config,
|
||||
logger: Logger[M]): M[Unit] =
|
||||
def logFileScan(implicit c: Config,
|
||||
logger: Logger): IO[Unit] =
|
||||
logger.info(s"Scanning local files: ${c.source}...")
|
||||
|
||||
def logRunFinished[M[_]: Monad](actions: Stream[S3Action])
|
||||
(implicit c: Config,
|
||||
logger: Logger[M]): M[Unit] = {
|
||||
def logRunFinished(actions: Stream[S3Action])
|
||||
(implicit c: Config,
|
||||
logger: Logger): IO[Unit] = {
|
||||
val counters = actions.foldLeft(Counters())(countActivities)
|
||||
for {
|
||||
_ <- logger.info(s"Uploaded ${counters.uploaded} files")
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
no valid = config items
|
|
@ -0,0 +1,2 @@
|
|||
source = /path/to/source
|
||||
bucket = bucket-name
|
|
@ -1,16 +1,18 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import cats.Monad
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.domain.Logger
|
||||
|
||||
class DummyLogger[M[_]: Monad] extends Logger[M] {
|
||||
class DummyLogger extends Logger {
|
||||
|
||||
override def debug(message: => String): M[Unit] = Monad[M].unit
|
||||
override def debug(message: => String): IO[Unit] = IO.unit
|
||||
|
||||
override def info(message: =>String): M[Unit] = Monad[M].unit
|
||||
override def info(message: =>String): IO[Unit] = IO.unit
|
||||
|
||||
override def warn(message: String): M[Unit] = Monad[M].unit
|
||||
override def warn(message: String): IO[Unit] = IO.unit
|
||||
|
||||
override def error(message: String): M[Unit] = Monad[M].unit
|
||||
override def error(message: String): IO[Unit] = IO.unit
|
||||
|
||||
override def withDebug(debug: Boolean): Logger = this
|
||||
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ package net.kemitix.thorp.core
|
|||
|
||||
import java.io.File
|
||||
|
||||
import cats.Id
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5Hash}
|
||||
import org.scalatest.FunSpec
|
||||
|
||||
|
@ -10,13 +10,13 @@ class LocalFileStreamSuite extends FunSpec {
|
|||
|
||||
val uploadResource = Resource(this, "upload")
|
||||
implicit val config: Config = Config(source = uploadResource)
|
||||
implicit private val logger: Logger[Id] = new DummyLogger[Id]
|
||||
val md5HashGenerator: File => Id[MD5Hash] = file => MD5HashGenerator.md5File[Id](file)
|
||||
implicit private val logger: Logger = new DummyLogger
|
||||
val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file)
|
||||
|
||||
describe("findFiles") {
|
||||
it("should find all files") {
|
||||
val result: Set[String] =
|
||||
LocalFileStream.findFiles[Id](uploadResource, md5HashGenerator).toSet
|
||||
LocalFileStream.findFiles(uploadResource, md5HashGenerator).unsafeRunSync.toSet
|
||||
.map { x: LocalFile => x.relative.toString }
|
||||
assertResult(Set("subdir/leaf-file", "root-file"))(result)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import cats.Id
|
||||
import net.kemitix.thorp.core.MD5HashData.rootHash
|
||||
import net.kemitix.thorp.domain._
|
||||
import org.scalatest.FunSpec
|
||||
|
@ -10,12 +9,12 @@ class MD5HashGeneratorTest extends FunSpec {
|
|||
private val source = Resource(this, "upload")
|
||||
private val prefix = RemoteKey("prefix")
|
||||
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit private val logger: Logger[Id] = new DummyLogger[Id]
|
||||
implicit private val logger: Logger = new DummyLogger
|
||||
|
||||
describe("read a small file (smaller than buffer)") {
|
||||
val file = Resource(this, "upload/root-file")
|
||||
it("should generate the correct hash") {
|
||||
val result = MD5HashGenerator.md5File[Id](file)
|
||||
val result = MD5HashGenerator.md5File(file).unsafeRunSync
|
||||
assertResult(rootHash)(result)
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +22,7 @@ class MD5HashGeneratorTest extends FunSpec {
|
|||
val file = Resource(this, "big-file")
|
||||
it("should generate the correct hash") {
|
||||
val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
|
||||
val result = MD5HashGenerator.md5File[Id](file)
|
||||
val result = MD5HashGenerator.md5File(file).unsafeRunSync
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import java.nio.file.{Path, Paths}
|
||||
|
||||
import org.scalatest.FunSpec
|
||||
|
||||
class ParseConfigFileTest extends FunSpec {
|
||||
|
||||
private def invoke(filename: Path) = ParseConfigFile.parseFile(filename).unsafeRunSync
|
||||
private val empty = List()
|
||||
|
||||
describe("parse a missing file") {
|
||||
val filename = Paths.get("/path/to/missing/file")
|
||||
it("should return no options") {
|
||||
assertResult(empty)(invoke(filename))
|
||||
}
|
||||
}
|
||||
describe("parse an empty file") {
|
||||
val filename = Resource(this, "empty-file").toPath
|
||||
it("should return no options") {
|
||||
assertResult(empty)(invoke(filename))
|
||||
}
|
||||
}
|
||||
describe("parse a file with no valid entries") {
|
||||
val filename = Resource(this, "invalid-config").toPath
|
||||
it("should return no options") {
|
||||
assertResult(empty)(invoke(filename))
|
||||
}
|
||||
}
|
||||
describe("parse a file with properties") {
|
||||
val filename = Resource(this, "simple-config").toPath
|
||||
val expected = List(ConfigOption.Source(Paths.get("/path/to/source")), ConfigOption.Bucket("bucket-name"))
|
||||
it("should return some options") {
|
||||
assertResult(expected)(invoke(filename))
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
package net.kemitix.thorp.core
|
||||
|
||||
import java.nio.file.Paths
|
||||
|
||||
import org.scalatest.FunSpec
|
||||
|
||||
class ParseConfigLinesTest extends FunSpec {
|
||||
|
||||
describe("parse single lines") {
|
||||
describe("source") {
|
||||
it("should parse") {
|
||||
val expected = List(ConfigOption.Source(Paths.get("/path/to/source")))
|
||||
val result = ParseConfigLines.parseLines(List("source = /path/to/source"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("bucket") {
|
||||
it("should parse") {
|
||||
val expected = List(ConfigOption.Bucket("bucket-name"))
|
||||
val result = ParseConfigLines.parseLines(List("bucket = bucket-name"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("prefix") {
|
||||
it("should parse") {
|
||||
val expected = List(ConfigOption.Prefix("prefix/to/files"))
|
||||
val result = ParseConfigLines.parseLines(List("prefix = prefix/to/files"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("include") {
|
||||
it("should parse") {
|
||||
val expected = List(ConfigOption.Include("path/to/include"))
|
||||
val result = ParseConfigLines.parseLines(List("include = path/to/include"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("exclude") {
|
||||
it("should parse") {
|
||||
val expected = List(ConfigOption.Exclude("path/to/exclude"))
|
||||
val result = ParseConfigLines.parseLines(List("exclude = path/to/exclude"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("debug - true") {
|
||||
it("should parse") {
|
||||
val expected = List(ConfigOption.Debug())
|
||||
val result = ParseConfigLines.parseLines(List("debug = true"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("debug - false") {
|
||||
it("should parse") {
|
||||
val expected = List()
|
||||
val result = ParseConfigLines.parseLines(List("debug = false"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("comment line") {
|
||||
it("should be ignored") {
|
||||
val expected = List()
|
||||
val result = ParseConfigLines.parseLines(List("# ignore me"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
describe("unrecognised option") {
|
||||
it("should be ignored") {
|
||||
val expected = List()
|
||||
val result = ParseConfigLines.parseLines(List("unsupported = option"))
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,7 +3,7 @@ package net.kemitix.thorp.core
|
|||
import java.io.File
|
||||
import java.time.Instant
|
||||
|
||||
import cats.Id
|
||||
import cats.effect.IO
|
||||
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action}
|
||||
import net.kemitix.thorp.aws.api.{S3Client, UploadProgressListener}
|
||||
import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash}
|
||||
|
@ -16,23 +16,32 @@ class SyncSuite
|
|||
|
||||
private val source = Resource(this, "upload")
|
||||
private val prefix = RemoteKey("prefix")
|
||||
val config = Config(Bucket("bucket"), prefix, source = source)
|
||||
implicit private val logger: Logger[Id] = new DummyLogger[Id]
|
||||
private val configOptions = List(
|
||||
ConfigOption.Source(source.toPath),
|
||||
ConfigOption.Bucket("bucket"),
|
||||
ConfigOption.Prefix("prefix")
|
||||
)
|
||||
implicit private val logger: Logger = new DummyLogger
|
||||
private val lastModified = LastModified(Instant.now)
|
||||
|
||||
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile): (String, String, File) =
|
||||
(bucket.name, remoteKey.key, localFile.file)
|
||||
|
||||
describe("run") {
|
||||
describe("Sync.apply") {
|
||||
val testBucket = Bucket("bucket")
|
||||
// source contains the files root-file and subdir/leaf-file
|
||||
val rootRemoteKey = RemoteKey("prefix/root-file")
|
||||
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
|
||||
|
||||
def invokeSubject(s3Client: RecordingClient, configOptions: List[ConfigOption]) = {
|
||||
Sync(s3Client)(configOptions).unsafeRunSync
|
||||
}
|
||||
|
||||
describe("when all files should be uploaded") {
|
||||
val s3Client = new RecordingClient(testBucket, S3ObjectsData(
|
||||
byHash = Map(),
|
||||
byKey = Map()))
|
||||
Sync.run(config, s3Client)
|
||||
invokeSubject(s3Client, configOptions)
|
||||
it("uploads all files") {
|
||||
val expectedUploads = Map(
|
||||
"subdir/leaf-file" -> leafRemoteKey,
|
||||
|
@ -58,7 +67,7 @@ class SyncSuite
|
|||
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
|
||||
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
|
||||
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
||||
Sync.run(config, s3Client)
|
||||
invokeSubject(s3Client, configOptions)
|
||||
it("uploads nothing") {
|
||||
val expectedUploads = Map()
|
||||
assertResult(expectedUploads)(s3Client.uploadsRecord)
|
||||
|
@ -82,7 +91,7 @@ class SyncSuite
|
|||
RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified),
|
||||
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
|
||||
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
||||
Sync.run(config, s3Client)
|
||||
invokeSubject(s3Client, configOptions)
|
||||
it("uploads nothing") {
|
||||
val expectedUploads = Map()
|
||||
assertResult(expectedUploads)(s3Client.uploadsRecord)
|
||||
|
@ -110,17 +119,16 @@ class SyncSuite
|
|||
byKey = Map(
|
||||
deletedKey -> HashModified(deletedHash, lastModified)))
|
||||
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
||||
Sync.run(config, s3Client)
|
||||
invokeSubject(s3Client, configOptions)
|
||||
it("deleted key") {
|
||||
val expectedDeletions = Set(deletedKey)
|
||||
assertResult(expectedDeletions)(s3Client.deletionsRecord)
|
||||
}
|
||||
}
|
||||
describe("when a file is excluded") {
|
||||
val config: Config = Config(Bucket("bucket"), prefix, source = source, filters = List(Exclude("leaf")))
|
||||
val s3ObjectsData = S3ObjectsData(Map(), Map())
|
||||
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
||||
Sync.run(config, s3Client)
|
||||
invokeSubject(s3Client, ConfigOption.Exclude("leaf") :: configOptions)
|
||||
it("is not uploaded") {
|
||||
val expectedUploads = Map(
|
||||
"root-file" -> rootRemoteKey
|
||||
|
@ -132,7 +140,7 @@ class SyncSuite
|
|||
|
||||
class RecordingClient(testBucket: Bucket,
|
||||
s3ObjectsData: S3ObjectsData)
|
||||
extends S3Client[Id] {
|
||||
extends S3Client {
|
||||
|
||||
var uploadsRecord: Map[String, RemoteKey] = Map()
|
||||
var copiesRecord: Map[RemoteKey, RemoteKey] = Map()
|
||||
|
@ -140,8 +148,8 @@ class SyncSuite
|
|||
|
||||
override def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit logger: Logger[Id]): S3ObjectsData =
|
||||
s3ObjectsData
|
||||
(implicit logger: Logger): IO[S3ObjectsData] =
|
||||
IO.pure(s3ObjectsData)
|
||||
|
||||
override def upload(localFile: LocalFile,
|
||||
bucket: Bucket,
|
||||
|
@ -149,28 +157,28 @@ class SyncSuite
|
|||
multiPartThreshold: Long,
|
||||
tryCount: Int,
|
||||
maxRetries: Int)
|
||||
(implicit logger: Logger[Id]): UploadS3Action = {
|
||||
(implicit logger: Logger): IO[UploadS3Action] = {
|
||||
if (bucket == testBucket)
|
||||
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)
|
||||
UploadS3Action(localFile.remoteKey, MD5Hash("some hash value"))
|
||||
IO.pure(UploadS3Action(localFile.remoteKey, MD5Hash("some hash value")))
|
||||
}
|
||||
|
||||
override def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey
|
||||
)(implicit logger: Logger[Id]): CopyS3Action = {
|
||||
)(implicit logger: Logger): IO[CopyS3Action] = {
|
||||
if (bucket == testBucket)
|
||||
copiesRecord += (sourceKey -> targetKey)
|
||||
CopyS3Action(targetKey)
|
||||
IO.pure(CopyS3Action(targetKey))
|
||||
}
|
||||
|
||||
override def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey
|
||||
)(implicit logger: Logger[Id]): DeleteS3Action = {
|
||||
)(implicit logger: Logger): IO[DeleteS3Action] = {
|
||||
if (bucket == testBucket)
|
||||
deletionsRecord += remoteKey
|
||||
DeleteS3Action(remoteKey)
|
||||
IO.pure(DeleteS3Action(remoteKey))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,5 @@ final case class Config(
|
|||
debug: Boolean = false,
|
||||
source: File
|
||||
) {
|
||||
require(source.isDirectory, s"Source must be a directory: $source")
|
||||
require(multiPartThreshold >= 1024 * 1024 * 5, s"Threshold for multi-part upload is 5Mb: '$multiPartThreshold'")
|
||||
}
|
||||
|
|
|
@ -1,10 +1,17 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
trait Logger[M[_]] {
|
||||
import cats.effect.IO
|
||||
|
||||
def debug(message: => String): M[Unit]
|
||||
def info(message: => String): M[Unit]
|
||||
def warn(message: String): M[Unit]
|
||||
def error(message: String): M[Unit]
|
||||
trait Logger {
|
||||
|
||||
// returns an instance of Logger with debug set as indicated
|
||||
// where the current Logger already matches this state, then
|
||||
// it returns itself, unmodified
|
||||
def withDebug(debug: Boolean): Logger
|
||||
|
||||
def debug(message: => String): IO[Unit]
|
||||
def info(message: => String): IO[Unit]
|
||||
def warn(message: String): IO[Unit]
|
||||
def error(message: String): IO[Unit]
|
||||
|
||||
}
|
||||
|
|
|
@ -4,10 +4,21 @@ import java.io.File
|
|||
import java.nio.file.Paths
|
||||
|
||||
final case class RemoteKey(key: String) {
|
||||
|
||||
def asFile(source: File, prefix: RemoteKey): File =
|
||||
source.toPath.resolve(Paths.get(prefix.key).relativize(Paths.get(key))).toFile
|
||||
source.toPath.resolve(relativeTo(prefix)).toFile
|
||||
|
||||
private def relativeTo(prefix: RemoteKey) = {
|
||||
prefix match {
|
||||
case RemoteKey("") => Paths.get(prefix.key)
|
||||
case _ => Paths.get(prefix.key).relativize(Paths.get(key))
|
||||
}
|
||||
}
|
||||
|
||||
def isMissingLocally(source: File, prefix: RemoteKey): Boolean =
|
||||
! asFile(source, prefix).exists
|
||||
|
||||
def resolve(path: String): RemoteKey =
|
||||
RemoteKey(key + "/" + path)
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue