Restructure sync to use a State with foldLeft around actions (#74)
* [changelog] updated * [cli] Program rename parameter * [core] Add AppState * [core] Synchronise rought draft replacement for Sync Uses the AppState * [core] Synchronise as sequential for-comprehensions * [core] Synchronise as nested for-comprehensions * [sbt] thorp(root) depends on cli moduke * [core] Synchronise extract methods * [core] Synchronise rewritten * [core] Synchronise generates actions * [core] Remove AppState * [core] ActionSubmitter remove unused implicit config parameter * [cli] Program rewritten to use Synchronise * [core] Synchronise useValidConfig accepts Logger implicitly * [core] Synchronise reorder methods * [core] Synchronise refactor errorMessages * [core] SyncLogging logRunStart accepts explicit parameters * [core] remove old Sync * [core] Synchronise restore logRunStart * [domain] Terminal add types to public methods and values * [domain] UploadEventLogger force flush to terminal Also make part of the progress message in green. Not flushing, by using println, cause odd behaviour. Works on normal terminal, but not great in an emacs terminal. Oh well. * [core] SyncLogging.logRunFinished remove unused parameters * [cli] Program restore final summary * [storage-aws] remove logging from module * [core] ThorpArchive replaces ActionSubmitter ActionSubmitter implementation becomes UnversionedMirrorArchive * [domain] cleaner upload progress messages * [cli] Program remove unused Logger * [cli] Program rename parameter * [core] SyncSuite use Synchronise * [sbt] Allow storage-aws to share core test classes * [domain] LocalFile stop storing a lambda The lambda breaks the equality test between LocalFile instances. * [core] MD4HashData add missing base64 digest for leafFile * [core] Synchronise drop DoNothing actions * [core] SyncSuite update tests * [sbt] aggregate modules from root module
This commit is contained in:
parent
9d2271fdcf
commit
0f8708e19f
22 changed files with 269 additions and 374 deletions
|
@ -10,6 +10,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
|
||||||
** Added
|
** Added
|
||||||
|
|
||||||
- Abstraction layer encapsulating S3 as Storage (#76)
|
- Abstraction layer encapsulating S3 as Storage (#76)
|
||||||
|
- Multiple copies of file are only uploaded once (#74)
|
||||||
|
|
||||||
* [0.5.0] - 2019-06-21
|
* [0.5.0] - 2019-06-21
|
||||||
|
|
||||||
|
|
|
@ -45,12 +45,12 @@ val catsEffectsSettings = Seq(
|
||||||
|
|
||||||
lazy val thorp = (project in file("."))
|
lazy val thorp = (project in file("."))
|
||||||
.settings(commonSettings)
|
.settings(commonSettings)
|
||||||
|
.aggregate(cli, `thorp-lib`, `storage-aws`, core, `storage-api`, domain)
|
||||||
|
|
||||||
lazy val cli = (project in file("cli"))
|
lazy val cli = (project in file("cli"))
|
||||||
.settings(commonSettings)
|
.settings(commonSettings)
|
||||||
.settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main"))
|
.settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main"))
|
||||||
.settings(applicationSettings)
|
.settings(applicationSettings)
|
||||||
.aggregate(`thorp-lib`, `storage-aws`, core, `storage-api`, domain)
|
|
||||||
.settings(commandLineParsing)
|
.settings(commandLineParsing)
|
||||||
.settings(testDependencies)
|
.settings(testDependencies)
|
||||||
.dependsOn(`thorp-lib`)
|
.dependsOn(`thorp-lib`)
|
||||||
|
@ -65,7 +65,7 @@ lazy val `storage-aws` = (project in file("storage-aws"))
|
||||||
.settings(assemblyJarName in assembly := "storage-aws.jar")
|
.settings(assemblyJarName in assembly := "storage-aws.jar")
|
||||||
.settings(awsSdkDependencies)
|
.settings(awsSdkDependencies)
|
||||||
.settings(testDependencies)
|
.settings(testDependencies)
|
||||||
.dependsOn(core)
|
.dependsOn(core % "compile->compile;test->test")
|
||||||
|
|
||||||
lazy val core = (project in file("core"))
|
lazy val core = (project in file("core"))
|
||||||
.settings(commonSettings)
|
.settings(commonSettings)
|
||||||
|
|
|
@ -1,24 +1,34 @@
|
||||||
package net.kemitix.thorp.cli
|
package net.kemitix.thorp.cli
|
||||||
|
|
||||||
import cats.effect.{ExitCode, IO}
|
import cats.effect.{ExitCode, IO}
|
||||||
import net.kemitix.thorp.core.{ConfigOption, Sync}
|
import cats.implicits._
|
||||||
import net.kemitix.thorp.domain.Logger
|
import net.kemitix.thorp.core._
|
||||||
|
import net.kemitix.thorp.domain.{Logger, StorageQueueEvent}
|
||||||
import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService
|
import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService
|
||||||
|
|
||||||
trait Program {
|
trait Program {
|
||||||
|
|
||||||
def apply(configOptions: Seq[ConfigOption]): IO[ExitCode] = {
|
def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = {
|
||||||
implicit val logger: Logger = new PrintLogger()
|
implicit val logger: Logger = new PrintLogger()
|
||||||
Sync(defaultStorageService)(configOptions) flatMap {
|
Synchronise(defaultStorageService, cliOptions).flatMap {
|
||||||
case Left(errors) =>
|
case Left(errors) =>
|
||||||
for {
|
for {
|
||||||
_ <- logger.error(s"There were errors:")
|
_ <- logger.error(s"There were errors:")
|
||||||
_ <- IO.pure(errors.map(error => logger.error(s" - $error")))
|
_ <- errors.map(error => logger.error(s" - $error")).sequence
|
||||||
} yield ExitCode.Error
|
} yield ExitCode.Error
|
||||||
case Right(_) => IO.pure(ExitCode.Success)
|
case Right(actions) =>
|
||||||
|
for {
|
||||||
|
events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions)
|
||||||
|
_ <- SyncLogging.logRunFinished(events)
|
||||||
|
} yield ExitCode.Success
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def handleActions(archive: ThorpArchive,
|
||||||
|
actions: Stream[Action]): IO[Stream[StorageQueueEvent]] =
|
||||||
|
actions.foldRight(Stream[IO[StorageQueueEvent]]()) {
|
||||||
|
(action, stream) => archive.update(action) ++ stream
|
||||||
|
}.sequence
|
||||||
}
|
}
|
||||||
|
|
||||||
object Program extends Program
|
object Program extends Program
|
|
@ -29,7 +29,7 @@ object LocalFileStream {
|
||||||
file match {
|
file match {
|
||||||
case f if f.isDirectory => loop(file)
|
case f if f.isDirectory => loop(file)
|
||||||
case _ => for(hash <- md5HashGenerator(file))
|
case _ => for(hash <- md5HashGenerator(file))
|
||||||
yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)))
|
yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file)))
|
||||||
}
|
}
|
||||||
|
|
||||||
def recurse(fs: Stream[File]): IO[Stream[LocalFile]] =
|
def recurse(fs: Stream[File]): IO[Stream[LocalFile]] =
|
||||||
|
|
|
@ -1,85 +0,0 @@
|
||||||
package net.kemitix.thorp.core
|
|
||||||
|
|
||||||
import cats.effect.IO
|
|
||||||
import cats.implicits._
|
|
||||||
import net.kemitix.thorp.core.Action.ToDelete
|
|
||||||
import net.kemitix.thorp.core.ActionGenerator.createActions
|
|
||||||
import net.kemitix.thorp.core.ActionSubmitter.submitAction
|
|
||||||
import net.kemitix.thorp.core.ConfigurationBuilder.buildConfig
|
|
||||||
import net.kemitix.thorp.core.LocalFileStream.findFiles
|
|
||||||
import net.kemitix.thorp.core.S3MetaDataEnricher.getMetadata
|
|
||||||
import net.kemitix.thorp.core.SyncLogging.{logFileScan, logRunFinished, logRunStart}
|
|
||||||
import net.kemitix.thorp.domain._
|
|
||||||
import net.kemitix.thorp.storage.api.StorageService
|
|
||||||
|
|
||||||
trait Sync {
|
|
||||||
|
|
||||||
def errorMessages(errors: List[ConfigValidation]): List[String] = {
|
|
||||||
for {
|
|
||||||
errorMessages <- errors.map(cv => cv.errorMessage)
|
|
||||||
} yield errorMessages
|
|
||||||
}
|
|
||||||
|
|
||||||
def apply(storageService: StorageService)
|
|
||||||
(configOptions: Seq[ConfigOption])
|
|
||||||
(implicit defaultLogger: Logger): IO[Either[List[String], Unit]] =
|
|
||||||
buildConfig(configOptions).flatMap {
|
|
||||||
case Right(config) => runWithValidConfig(storageService, defaultLogger, config)
|
|
||||||
case Left(errors) => IO.pure(Left(errorMessages(errors.toList)))
|
|
||||||
}
|
|
||||||
|
|
||||||
private def runWithValidConfig(storageService: StorageService,
|
|
||||||
defaultLogger: Logger,
|
|
||||||
config: Config) = {
|
|
||||||
for {
|
|
||||||
_ <- run(config, storageService, defaultLogger.withDebug(config.debug))
|
|
||||||
} yield Right(())
|
|
||||||
}
|
|
||||||
|
|
||||||
private def run(cliConfig: Config,
|
|
||||||
storageService: StorageService,
|
|
||||||
logger: Logger): IO[Unit] = {
|
|
||||||
|
|
||||||
implicit val c: Config = cliConfig
|
|
||||||
implicit val l: Logger = logger
|
|
||||||
|
|
||||||
def metaData(s3Data: S3ObjectsData, sFiles: Stream[LocalFile]) =
|
|
||||||
IO.pure(sFiles.map(file => getMetadata(file, s3Data)))
|
|
||||||
|
|
||||||
def actions(sData: Stream[S3MetaData]) =
|
|
||||||
IO.pure(sData.flatMap(s3MetaData => createActions(s3MetaData)))
|
|
||||||
|
|
||||||
def submit(sActions: Stream[Action]) =
|
|
||||||
IO(sActions.flatMap(action => submitAction(storageService, action)))
|
|
||||||
|
|
||||||
def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[StorageQueueEvent]] =
|
|
||||||
(for {
|
|
||||||
files <- findFiles(c.source, MD5HashGenerator.md5File(_))
|
|
||||||
metaData <- metaData(s3Data, files)
|
|
||||||
actions <- actions(metaData)
|
|
||||||
s3Actions <- submit(actions)
|
|
||||||
} yield s3Actions.sequence)
|
|
||||||
.flatten
|
|
||||||
.map(streamS3Actions => streamS3Actions.sorted)
|
|
||||||
|
|
||||||
def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[StorageQueueEvent]] =
|
|
||||||
(for {
|
|
||||||
key <- s3ObjectsData.byKey.keys
|
|
||||||
if key.isMissingLocally(c.source, c.prefix)
|
|
||||||
ioDelAction <- submitAction(storageService, ToDelete(c.bucket, key))
|
|
||||||
} yield ioDelAction)
|
|
||||||
.toStream
|
|
||||||
.sequence
|
|
||||||
|
|
||||||
for {
|
|
||||||
_ <- logRunStart
|
|
||||||
s3data <- storageService.listObjects(c.bucket, c.prefix)
|
|
||||||
_ <- logFileScan
|
|
||||||
copyUploadActions <- copyUploadActions(s3data)
|
|
||||||
deleteActions <- deleteActions(s3data)
|
|
||||||
_ <- logRunFinished(copyUploadActions ++ deleteActions)
|
|
||||||
} yield ()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
object Sync extends Sync
|
|
|
@ -1,16 +1,19 @@
|
||||||
package net.kemitix.thorp.core
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import java.io.File
|
||||||
|
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
import cats.implicits._
|
import cats.implicits._
|
||||||
import net.kemitix.thorp.domain.{Config, Logger, StorageQueueEvent}
|
import net.kemitix.thorp.domain.{Bucket, Config, Logger, RemoteKey, StorageQueueEvent}
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent}
|
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent}
|
||||||
|
|
||||||
// Logging for the Sync class
|
|
||||||
trait SyncLogging {
|
trait SyncLogging {
|
||||||
|
|
||||||
def logRunStart(implicit c: Config,
|
def logRunStart(bucket: Bucket,
|
||||||
logger: Logger): IO[Unit] =
|
prefix: RemoteKey,
|
||||||
logger.info(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}, ")
|
source: File)
|
||||||
|
(implicit logger: Logger): IO[Unit] =
|
||||||
|
logger.info(s"Bucket: ${bucket.name}, Prefix: ${prefix.key}, Source: $source, ")
|
||||||
|
|
||||||
def logFileScan(implicit c: Config,
|
def logFileScan(implicit c: Config,
|
||||||
logger: Logger): IO[Unit] =
|
logger: Logger): IO[Unit] =
|
||||||
|
@ -26,8 +29,7 @@ trait SyncLogging {
|
||||||
} yield ()
|
} yield ()
|
||||||
|
|
||||||
def logRunFinished(actions: Stream[StorageQueueEvent])
|
def logRunFinished(actions: Stream[StorageQueueEvent])
|
||||||
(implicit c: Config,
|
(implicit logger: Logger): IO[Unit] = {
|
||||||
logger: Logger): IO[Unit] = {
|
|
||||||
val counters = actions.foldLeft(Counters())(countActivities)
|
val counters = actions.foldLeft(Counters())(countActivities)
|
||||||
for {
|
for {
|
||||||
_ <- logger.info(s"Uploaded ${counters.uploaded} files")
|
_ <- logger.info(s"Uploaded ${counters.uploaded} files")
|
||||||
|
@ -38,8 +40,7 @@ trait SyncLogging {
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
}
|
||||||
|
|
||||||
private def countActivities(implicit c: Config,
|
private def countActivities: (Counters, StorageQueueEvent) => Counters =
|
||||||
logger: Logger): (Counters, StorageQueueEvent) => Counters =
|
|
||||||
(counters: Counters, s3Action: StorageQueueEvent) => {
|
(counters: Counters, s3Action: StorageQueueEvent) => {
|
||||||
s3Action match {
|
s3Action match {
|
||||||
case _: UploadQueueEvent =>
|
case _: UploadQueueEvent =>
|
||||||
|
|
73
core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala
Normal file
73
core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala
Normal file
|
@ -0,0 +1,73 @@
|
||||||
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import cats.data.NonEmptyChain
|
||||||
|
import cats.effect.IO
|
||||||
|
import cats.implicits._
|
||||||
|
import net.kemitix.thorp.core.Action.DoNothing
|
||||||
|
import net.kemitix.thorp.domain.{Config, LocalFile, Logger, RemoteKey, S3ObjectsData}
|
||||||
|
import net.kemitix.thorp.storage.api.StorageService
|
||||||
|
|
||||||
|
trait Synchronise {
|
||||||
|
|
||||||
|
def apply(storageService: StorageService,
|
||||||
|
configOptions: Seq[ConfigOption])
|
||||||
|
(implicit logger: Logger): IO[Either[List[String], Stream[Action]]] =
|
||||||
|
ConfigurationBuilder.buildConfig(configOptions)
|
||||||
|
.flatMap {
|
||||||
|
case Left(errors) => IO.pure(Left(errorMessages(errors)))
|
||||||
|
case Right(config) => useValidConfig(storageService, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] =
|
||||||
|
errors.map(cv => cv.errorMessage).toList
|
||||||
|
|
||||||
|
def removeDoNothing: Action => Boolean = {
|
||||||
|
case _: DoNothing => false
|
||||||
|
case _ => true
|
||||||
|
}
|
||||||
|
|
||||||
|
def useValidConfig(storageService: StorageService,
|
||||||
|
config: Config)
|
||||||
|
(implicit logger: Logger): IO[Either[List[String], Stream[Action]]] = {
|
||||||
|
for {
|
||||||
|
_ <- SyncLogging.logRunStart(config.bucket, config.prefix, config.source)
|
||||||
|
actions <- gatherMetadata(storageService, logger, config)
|
||||||
|
.map { md =>
|
||||||
|
val (rd, ld) = md
|
||||||
|
val actions1 = actionsForLocalFiles(config, ld, rd)
|
||||||
|
val actions2 = actionsForRemoteKeys(config, rd)
|
||||||
|
Right((actions1 ++ actions2).filter(removeDoNothing))
|
||||||
|
}
|
||||||
|
} yield actions
|
||||||
|
}
|
||||||
|
|
||||||
|
private def gatherMetadata(storageService: StorageService,
|
||||||
|
logger: Logger,
|
||||||
|
config: Config) =
|
||||||
|
for {
|
||||||
|
remoteData <- fetchRemoteData(storageService, config)
|
||||||
|
localData <- findLocalFiles(config, logger)
|
||||||
|
} yield (remoteData, localData)
|
||||||
|
|
||||||
|
private def actionsForLocalFiles(config: Config, localData: Stream[LocalFile], remoteData: S3ObjectsData) =
|
||||||
|
localData.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(config, lf, remoteData) ++ acc)
|
||||||
|
|
||||||
|
private def actionsForRemoteKeys(config: Config, remoteData: S3ObjectsData) =
|
||||||
|
remoteData.byKey.keys.foldLeft(Stream[Action]())((acc, rk) => createActionFromRemoteKey(config, rk) #:: acc)
|
||||||
|
|
||||||
|
private def fetchRemoteData(storageService: StorageService, config: Config) =
|
||||||
|
storageService.listObjects(config.bucket, config.prefix)
|
||||||
|
|
||||||
|
private def findLocalFiles(implicit config: Config, l: Logger) =
|
||||||
|
LocalFileStream.findFiles(config.source, MD5HashGenerator.md5File(_))
|
||||||
|
|
||||||
|
private def createActionFromLocalFile(c: Config, lf: LocalFile, remoteData: S3ObjectsData) =
|
||||||
|
ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData)(c))(c)
|
||||||
|
|
||||||
|
private def createActionFromRemoteKey(c: Config, rk: RemoteKey) =
|
||||||
|
if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk)
|
||||||
|
else DoNothing(c.bucket, rk)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
object Synchronise extends Synchronise
|
|
@ -0,0 +1,10 @@
|
||||||
|
package net.kemitix.thorp.core
|
||||||
|
|
||||||
|
import cats.effect.IO
|
||||||
|
import net.kemitix.thorp.domain.StorageQueueEvent
|
||||||
|
|
||||||
|
trait ThorpArchive {
|
||||||
|
|
||||||
|
def update(action: Action): Stream[IO[StorageQueueEvent]]
|
||||||
|
|
||||||
|
}
|
|
@ -2,38 +2,33 @@ package net.kemitix.thorp.core
|
||||||
|
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
|
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
|
||||||
import net.kemitix.thorp.domain.{Config, Logger, StorageQueueEvent, UploadEventListener}
|
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
|
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
|
||||||
|
import net.kemitix.thorp.domain.{StorageQueueEvent, UploadEventListener}
|
||||||
import net.kemitix.thorp.storage.api.StorageService
|
import net.kemitix.thorp.storage.api.StorageService
|
||||||
|
|
||||||
trait ActionSubmitter {
|
case class UnversionedMirrorArchive(storageService: StorageService) extends ThorpArchive {
|
||||||
|
override def update(action: Action): Stream[IO[StorageQueueEvent]] =
|
||||||
def submitAction(storageService: StorageService,
|
|
||||||
action: Action)
|
|
||||||
(implicit c: Config,
|
|
||||||
logger: Logger): Stream[IO[StorageQueueEvent]] = {
|
|
||||||
Stream(
|
Stream(
|
||||||
action match {
|
action match {
|
||||||
case ToUpload(bucket, localFile) =>
|
case ToUpload(bucket, localFile) =>
|
||||||
for {
|
for {
|
||||||
_ <- logger.info(s" Upload: ${localFile.relative}")
|
event <- storageService.upload(localFile, bucket, new UploadEventListener(localFile), 1)
|
||||||
uploadEventListener = new UploadEventListener(localFile)
|
|
||||||
event <- storageService.upload(localFile, bucket, uploadEventListener, 1)
|
|
||||||
} yield event
|
} yield event
|
||||||
case ToCopy(bucket, sourceKey, hash, targetKey) =>
|
case ToCopy(bucket, sourceKey, hash, targetKey) =>
|
||||||
for {
|
for {
|
||||||
_ <- logger.info(s" Copy: ${sourceKey.key} => ${targetKey.key}")
|
|
||||||
event <- storageService.copy(bucket, sourceKey, hash, targetKey)
|
event <- storageService.copy(bucket, sourceKey, hash, targetKey)
|
||||||
} yield event
|
} yield event
|
||||||
case ToDelete(bucket, remoteKey) =>
|
case ToDelete(bucket, remoteKey) =>
|
||||||
for {
|
for {
|
||||||
_ <- logger.info(s" Delete: ${remoteKey.key}")
|
|
||||||
event <- storageService.delete(bucket, remoteKey)
|
event <- storageService.delete(bucket, remoteKey)
|
||||||
} yield event
|
} yield event
|
||||||
case DoNothing(_, remoteKey) =>
|
case DoNothing(_, remoteKey) =>
|
||||||
IO.pure(DoNothingQueueEvent(remoteKey))
|
IO.pure(DoNothingQueueEvent(remoteKey))
|
||||||
})
|
})
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object ActionSubmitter extends ActionSubmitter
|
object UnversionedMirrorArchive {
|
||||||
|
def default(storageService: StorageService): ThorpArchive =
|
||||||
|
new UnversionedMirrorArchive(storageService)
|
||||||
|
}
|
|
@ -6,6 +6,6 @@ object MD5HashData {
|
||||||
|
|
||||||
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e", Some("o6asEaDrV3uBs7tclcyKbg=="))
|
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e", Some("o6asEaDrV3uBs7tclcyKbg=="))
|
||||||
|
|
||||||
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
|
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542", Some("IIOGplC97GHPzXvY3La1Qg=="))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import java.io.File
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
|
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
|
||||||
import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash}
|
import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash}
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent}
|
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent}
|
||||||
|
@ -28,15 +29,16 @@ class SyncSuite
|
||||||
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile): (String, String, File) =
|
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile): (String, String, File) =
|
||||||
(bucket.name, remoteKey.key, localFile.file)
|
(bucket.name, remoteKey.key, localFile.file)
|
||||||
|
|
||||||
describe("Sync.apply") {
|
|
||||||
val testBucket = Bucket("bucket")
|
val testBucket = Bucket("bucket")
|
||||||
// source contains the files root-file and subdir/leaf-file
|
// source contains the files root-file and subdir/leaf-file
|
||||||
val rootRemoteKey = RemoteKey("prefix/root-file")
|
val rootRemoteKey = RemoteKey("prefix/root-file")
|
||||||
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
|
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
|
||||||
|
val rootFile: LocalFile = LocalFile.resolve("root-file", rootHash, source, _ => rootRemoteKey)
|
||||||
|
val leafFile: LocalFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, _ => leafRemoteKey)
|
||||||
|
|
||||||
def invokeSubject(storageService: StorageService,
|
def invokeSubject(storageService: StorageService,
|
||||||
configOptions: List[ConfigOption]) = {
|
configOptions: List[ConfigOption]): Either[List[String], Stream[Action]] = {
|
||||||
Sync(storageService)(configOptions).unsafeRunSync
|
Synchronise(storageService, configOptions).unsafeRunSync
|
||||||
}
|
}
|
||||||
|
|
||||||
describe("when all files should be uploaded") {
|
describe("when all files should be uploaded") {
|
||||||
|
@ -44,21 +46,12 @@ class SyncSuite
|
||||||
byHash = Map(),
|
byHash = Map(),
|
||||||
byKey = Map()))
|
byKey = Map()))
|
||||||
it("uploads all files") {
|
it("uploads all files") {
|
||||||
val expectedUploads = Map(
|
val expected = Stream(
|
||||||
"subdir/leaf-file" -> leafRemoteKey,
|
ToUpload(testBucket, rootFile),
|
||||||
"root-file" -> rootRemoteKey)
|
ToUpload(testBucket, leafFile))
|
||||||
invokeSubject(storageService, configOptions)
|
val result = invokeSubject(storageService, configOptions)
|
||||||
assertResult(expectedUploads)(storageService.uploadsRecord)
|
assert(result.isRight)
|
||||||
}
|
assertResult(expected)(result.right.get)
|
||||||
it("copies nothing") {
|
|
||||||
val expectedCopies = Map()
|
|
||||||
invokeSubject(storageService, configOptions)
|
|
||||||
assertResult(expectedCopies)(storageService.copiesRecord)
|
|
||||||
}
|
|
||||||
it("deletes nothing") {
|
|
||||||
val expectedDeletions = Set()
|
|
||||||
invokeSubject(storageService, configOptions)
|
|
||||||
assertResult(expectedDeletions)(storageService.deletionsRecord)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("when no files should be uploaded") {
|
describe("when no files should be uploaded") {
|
||||||
|
@ -70,46 +63,33 @@ class SyncSuite
|
||||||
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
|
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
|
||||||
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
|
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
|
||||||
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
|
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
|
||||||
it("uploads nothing") {
|
it("no actions") {
|
||||||
val expectedUploads = Map()
|
val expected = Stream()
|
||||||
invokeSubject(storageService, configOptions)
|
val result = invokeSubject(storageService, configOptions)
|
||||||
assertResult(expectedUploads)(storageService.uploadsRecord)
|
assert(result.isRight)
|
||||||
}
|
assertResult(expected)(result.right.get)
|
||||||
it("copies nothing") {
|
|
||||||
val expectedCopies = Map()
|
|
||||||
invokeSubject(storageService, configOptions)
|
|
||||||
assertResult(expectedCopies)(storageService.copiesRecord)
|
|
||||||
}
|
|
||||||
it("deletes nothing") {
|
|
||||||
val expectedDeletions = Set()
|
|
||||||
invokeSubject(storageService, configOptions)
|
|
||||||
assertResult(expectedDeletions)(storageService.deletionsRecord)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("when a file is renamed it is moved on S3 with no upload") {
|
describe("when a file is renamed it is moved on S3 with no upload") {
|
||||||
|
val sourceKey = RemoteKey("prefix/root-file-old")
|
||||||
|
val targetKey = RemoteKey("prefix/root-file")
|
||||||
// 'root-file-old' should be renamed as 'root-file'
|
// 'root-file-old' should be renamed as 'root-file'
|
||||||
val s3ObjectsData = S3ObjectsData(
|
val s3ObjectsData = S3ObjectsData(
|
||||||
byHash = Map(
|
byHash = Map(
|
||||||
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file-old"), lastModified)),
|
rootHash -> Set(KeyModified(sourceKey, lastModified)),
|
||||||
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
|
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
|
||||||
byKey = Map(
|
byKey = Map(
|
||||||
RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified),
|
sourceKey -> HashModified(rootHash, lastModified),
|
||||||
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
|
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
|
||||||
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
|
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
|
||||||
it("uploads nothing") {
|
it("copies the file and deletes the original") {
|
||||||
invokeSubject(storageService, configOptions)
|
val expected = Stream(
|
||||||
val expectedUploads = Map()
|
ToCopy(testBucket, sourceKey, rootHash, targetKey),
|
||||||
assertResult(expectedUploads)(storageService.uploadsRecord)
|
ToDelete(testBucket, sourceKey)
|
||||||
}
|
)
|
||||||
it("copies the file") {
|
val result = invokeSubject(storageService, configOptions)
|
||||||
val expectedCopies = Map(RemoteKey("prefix/root-file-old") -> RemoteKey("prefix/root-file"))
|
assert(result.isRight)
|
||||||
invokeSubject(storageService, configOptions)
|
assertResult(expected)(result.right.get)
|
||||||
assertResult(expectedCopies)(storageService.copiesRecord)
|
|
||||||
}
|
|
||||||
it("deletes the original") {
|
|
||||||
val expectedDeletions = Set(RemoteKey("prefix/root-file-old"))
|
|
||||||
invokeSubject(storageService, configOptions)
|
|
||||||
assertResult(expectedDeletions)(storageService.deletionsRecord)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("when a file is copied it is copied on S3 with no upload") {
|
describe("when a file is copied it is copied on S3 with no upload") {
|
||||||
|
@ -122,26 +102,37 @@ class SyncSuite
|
||||||
val deletedKey = RemoteKey("prefix/deleted-file")
|
val deletedKey = RemoteKey("prefix/deleted-file")
|
||||||
val s3ObjectsData = S3ObjectsData(
|
val s3ObjectsData = S3ObjectsData(
|
||||||
byHash = Map(
|
byHash = Map(
|
||||||
|
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)),
|
||||||
|
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)),
|
||||||
deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))),
|
deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))),
|
||||||
byKey = Map(
|
byKey = Map(
|
||||||
|
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
|
||||||
|
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified),
|
||||||
deletedKey -> HashModified(deletedHash, lastModified)))
|
deletedKey -> HashModified(deletedHash, lastModified)))
|
||||||
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
|
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
|
||||||
it("deleted key") {
|
it("deleted key") {
|
||||||
val expectedDeletions = Set(deletedKey)
|
val expected = Stream(
|
||||||
invokeSubject(storageService, configOptions)
|
ToDelete(testBucket, deletedKey)
|
||||||
assertResult(expectedDeletions)(storageService.deletionsRecord)
|
)
|
||||||
|
val result = invokeSubject(storageService, configOptions)
|
||||||
|
assert(result.isRight)
|
||||||
|
assertResult(expected)(result.right.get)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("when a file is excluded") {
|
describe("when a file is excluded") {
|
||||||
val s3ObjectsData = S3ObjectsData(Map(), Map())
|
val s3ObjectsData = S3ObjectsData(
|
||||||
|
byHash = Map(
|
||||||
|
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)),
|
||||||
|
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
|
||||||
|
byKey = Map(
|
||||||
|
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
|
||||||
|
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
|
||||||
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
|
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
|
||||||
it("is not uploaded") {
|
it("is not uploaded") {
|
||||||
val expectedUploads = Map(
|
val expected = Stream()
|
||||||
"root-file" -> rootRemoteKey
|
val result = invokeSubject(storageService, ConfigOption.Exclude("leaf") :: configOptions)
|
||||||
)
|
assert(result.isRight)
|
||||||
invokeSubject(storageService, ConfigOption.Exclude("leaf") :: configOptions)
|
assertResult(expected)(result.right.get)
|
||||||
assertResult(expectedUploads)(storageService.uploadsRecord)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,40 +140,26 @@ class SyncSuite
|
||||||
s3ObjectsData: S3ObjectsData)
|
s3ObjectsData: S3ObjectsData)
|
||||||
extends StorageService {
|
extends StorageService {
|
||||||
|
|
||||||
var uploadsRecord: Map[String, RemoteKey] = Map()
|
|
||||||
var copiesRecord: Map[RemoteKey, RemoteKey] = Map()
|
|
||||||
var deletionsRecord: Set[RemoteKey] = Set()
|
|
||||||
|
|
||||||
override def listObjects(bucket: Bucket,
|
override def listObjects(bucket: Bucket,
|
||||||
prefix: RemoteKey)
|
prefix: RemoteKey): IO[S3ObjectsData] =
|
||||||
(implicit logger: Logger): IO[S3ObjectsData] =
|
|
||||||
IO.pure(s3ObjectsData)
|
IO.pure(s3ObjectsData)
|
||||||
|
|
||||||
override def upload(localFile: LocalFile,
|
override def upload(localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener,
|
||||||
tryCount: Int)
|
tryCount: Int): IO[UploadQueueEvent] = {
|
||||||
(implicit logger: Logger): IO[UploadQueueEvent] = {
|
|
||||||
if (bucket == testBucket)
|
|
||||||
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)
|
|
||||||
IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hash))
|
IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hash))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def copy(bucket: Bucket,
|
override def copy(bucket: Bucket,
|
||||||
sourceKey: RemoteKey,
|
sourceKey: RemoteKey,
|
||||||
hash: MD5Hash,
|
hash: MD5Hash,
|
||||||
targetKey: RemoteKey
|
targetKey: RemoteKey): IO[CopyQueueEvent] = {
|
||||||
)(implicit logger: Logger): IO[CopyQueueEvent] = {
|
|
||||||
if (bucket == testBucket)
|
|
||||||
copiesRecord += (sourceKey -> targetKey)
|
|
||||||
IO.pure(CopyQueueEvent(targetKey))
|
IO.pure(CopyQueueEvent(targetKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def delete(bucket: Bucket,
|
override def delete(bucket: Bucket,
|
||||||
remoteKey: RemoteKey
|
remoteKey: RemoteKey): IO[DeleteQueueEvent] = {
|
||||||
)(implicit logger: Logger): IO[DeleteQueueEvent] = {
|
|
||||||
if (bucket == testBucket)
|
|
||||||
deletionsRecord += remoteKey
|
|
||||||
IO.pure(DeleteQueueEvent(remoteKey))
|
IO.pure(DeleteQueueEvent(remoteKey))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,13 +3,10 @@ package net.kemitix.thorp.domain
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
final case class LocalFile(file: File, source: File, hash: MD5Hash, keyGenerator: File => RemoteKey) {
|
final case class LocalFile(file: File, source: File, hash: MD5Hash, remoteKey: RemoteKey) {
|
||||||
|
|
||||||
require(!file.isDirectory, s"LocalFile must not be a directory: $file")
|
require(!file.isDirectory, s"LocalFile must not be a directory: $file")
|
||||||
|
|
||||||
// the equivalent location of the file on S3
|
|
||||||
def remoteKey: RemoteKey = keyGenerator(file)
|
|
||||||
|
|
||||||
def isDirectory: Boolean = file.isDirectory
|
def isDirectory: Boolean = file.isDirectory
|
||||||
|
|
||||||
// the path of the file within the source
|
// the path of the file within the source
|
||||||
|
@ -25,6 +22,6 @@ object LocalFile {
|
||||||
source: File,
|
source: File,
|
||||||
fileToKey: File => RemoteKey): LocalFile = {
|
fileToKey: File => RemoteKey): LocalFile = {
|
||||||
val file = source.toPath.resolve(path).toFile
|
val file = source.toPath.resolve(path).toFile
|
||||||
LocalFile(file, source, md5Hash, fileToKey)
|
LocalFile(file, source, md5Hash, fileToKey(file))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,116 +10,116 @@ object Terminal {
|
||||||
*
|
*
|
||||||
* Stops at the edge of the screen.
|
* Stops at the edge of the screen.
|
||||||
*/
|
*/
|
||||||
def cursorUp(lines: Int = 1) = csi + lines + "A"
|
def cursorUp(lines: Int = 1): String = csi + lines + "A"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move the cursor down, default 1 line.
|
* Move the cursor down, default 1 line.
|
||||||
*
|
*
|
||||||
* Stops at the edge of the screen.
|
* Stops at the edge of the screen.
|
||||||
*/
|
*/
|
||||||
def cursorDown(lines: Int = 1) = csi + lines + "B"
|
def cursorDown(lines: Int = 1): String = csi + lines + "B"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move the cursor forward, default 1 column.
|
* Move the cursor forward, default 1 column.
|
||||||
*
|
*
|
||||||
* Stops at the edge of the screen.
|
* Stops at the edge of the screen.
|
||||||
*/
|
*/
|
||||||
def cursorForward(cols: Int = 1) = csi + cols + "C"
|
def cursorForward(cols: Int = 1): String = csi + cols + "C"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move the cursor back, default 1 column,
|
* Move the cursor back, default 1 column,
|
||||||
*
|
*
|
||||||
* Stops at the edge of the screen.
|
* Stops at the edge of the screen.
|
||||||
*/
|
*/
|
||||||
def cursorBack(cols: Int = 1) = csi + cols + "D"
|
def cursorBack(cols: Int = 1): String = csi + cols + "D"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move the cursor to the beginning of the line, default 1, down.
|
* Move the cursor to the beginning of the line, default 1, down.
|
||||||
*/
|
*/
|
||||||
def cursorNextLine(lines: Int = 1) = csi + lines + "E"
|
def cursorNextLine(lines: Int = 1): String = csi + lines + "E"
|
||||||
/**
|
/**
|
||||||
* Move the cursor to the beginning of the line, default 1, up.
|
* Move the cursor to the beginning of the line, default 1, up.
|
||||||
*/
|
*/
|
||||||
def cursorPrevLine(lines: Int = 1) = csi + lines + "F"
|
def cursorPrevLine(lines: Int = 1): String = csi + lines + "F"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move the cursor to the column on the current line.
|
* Move the cursor to the column on the current line.
|
||||||
*/
|
*/
|
||||||
def cursorHorizAbs(col: Int) = csi + col + "G"
|
def cursorHorizAbs(col: Int): String = csi + col + "G"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move the cursor to the position on screen (1,1 is the top-left).
|
* Move the cursor to the position on screen (1,1 is the top-left).
|
||||||
*/
|
*/
|
||||||
def cursorPosition(row: Int, col: Int) = csi + row + ";" + col + "H"
|
def cursorPosition(row: Int, col: Int): String = csi + row + ";" + col + "H"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clear from cursor to end of screen.
|
* Clear from cursor to end of screen.
|
||||||
*/
|
*/
|
||||||
val eraseToEndOfScreen = csi + "0J"
|
val eraseToEndOfScreen: String = csi + "0J"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clear from cursor to beginning of screen.
|
* Clear from cursor to beginning of screen.
|
||||||
*/
|
*/
|
||||||
val eraseToStartOfScreen = csi + "1J"
|
val eraseToStartOfScreen: String = csi + "1J"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clear screen and move cursor to top-left.
|
* Clear screen and move cursor to top-left.
|
||||||
*
|
*
|
||||||
* On DOS the "2J" command also moves to 1,1, so we force that behaviour for all.
|
* On DOS the "2J" command also moves to 1,1, so we force that behaviour for all.
|
||||||
*/
|
*/
|
||||||
val eraseScreen = csi + "2J" + cursorPosition(1, 1)
|
val eraseScreen: String = csi + "2J" + cursorPosition(1, 1)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clear screen and scrollback buffer then move cursor to top-left.
|
* Clear screen and scrollback buffer then move cursor to top-left.
|
||||||
*
|
*
|
||||||
* Anticipate that same DOS behaviour here, and to maintain consistency with {@link #eraseScreen}.
|
* Anticipate that same DOS behaviour here, and to maintain consistency with {@link #eraseScreen}.
|
||||||
*/
|
*/
|
||||||
val eraseScreenAndBuffer = csi + "3J"
|
val eraseScreenAndBuffer: String = csi + "3J"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clears the terminal line to the right of the cursor.
|
* Clears the terminal line to the right of the cursor.
|
||||||
*
|
*
|
||||||
* Does not move the cursor.
|
* Does not move the cursor.
|
||||||
*/
|
*/
|
||||||
val eraseLineForward = csi + "0K"
|
val eraseLineForward: String = csi + "0K"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clears the terminal line to the left of the cursor.
|
* Clears the terminal line to the left of the cursor.
|
||||||
*
|
*
|
||||||
* Does not move the cursor.
|
* Does not move the cursor.
|
||||||
*/
|
*/
|
||||||
val eraseLineBack= csi + "1K"
|
val eraseLineBack: String = csi + "1K"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clears the whole terminal line.
|
* Clears the whole terminal line.
|
||||||
*
|
*
|
||||||
* Does not move the cursor.
|
* Does not move the cursor.
|
||||||
*/
|
*/
|
||||||
val eraseLine = csi + "2K"
|
val eraseLine: String = csi + "2K"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scroll page up, default 1, lines.
|
* Scroll page up, default 1, lines.
|
||||||
*/
|
*/
|
||||||
def scrollUp(lines: Int = 1) = csi + lines + "S"
|
def scrollUp(lines: Int = 1): String = csi + lines + "S"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scroll page down, default 1, lines.
|
* Scroll page down, default 1, lines.
|
||||||
*/
|
*/
|
||||||
def scrollDown(lines: Int = 1) = csi + lines + "T"
|
def scrollDown(lines: Int = 1): String = csi + lines + "T"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Saves the cursor position/state.
|
* Saves the cursor position/state.
|
||||||
*/
|
*/
|
||||||
val saveCursorPosition = csi + "s"
|
val saveCursorPosition: String = csi + "s"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restores the cursor position/state.
|
* Restores the cursor position/state.
|
||||||
*/
|
*/
|
||||||
val restoreCursorPosition = csi + "u"
|
val restoreCursorPosition: String = csi + "u"
|
||||||
|
|
||||||
val enableAlternateBuffer = csi + "?1049h"
|
val enableAlternateBuffer: String = csi + "?1049h"
|
||||||
|
|
||||||
val disableAlternateBuffer = csi + "?1049l"
|
val disableAlternateBuffer: String = csi + "?1049l"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Width of the terminal, as reported by the COLUMNS environment variable.
|
* The Width of the terminal, as reported by the COLUMNS environment variable.
|
||||||
|
@ -135,7 +135,7 @@ object Terminal {
|
||||||
.getOrElse(80)
|
.getOrElse(80)
|
||||||
}
|
}
|
||||||
|
|
||||||
val subBars = Map(
|
private val subBars = Map(
|
||||||
0 -> " ",
|
0 -> " ",
|
||||||
1 -> "▏",
|
1 -> "▏",
|
||||||
2 -> "▎",
|
2 -> "▎",
|
||||||
|
|
|
@ -4,6 +4,8 @@ import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish
|
||||||
import net.kemitix.thorp.domain.Terminal._
|
import net.kemitix.thorp.domain.Terminal._
|
||||||
import net.kemitix.thorp.domain.UploadEvent.RequestEvent
|
import net.kemitix.thorp.domain.UploadEvent.RequestEvent
|
||||||
|
|
||||||
|
import scala.io.AnsiColor._
|
||||||
|
|
||||||
trait UploadEventLogger {
|
trait UploadEventLogger {
|
||||||
|
|
||||||
def logRequestCycle(localFile: LocalFile,
|
def logRequestCycle(localFile: LocalFile,
|
||||||
|
@ -15,9 +17,10 @@ trait UploadEventLogger {
|
||||||
val bar = progressBar(bytesTransferred, fileLength.toDouble, Terminal.width)
|
val bar = progressBar(bytesTransferred, fileLength.toDouble, Terminal.width)
|
||||||
val transferred = sizeInEnglish(bytesTransferred)
|
val transferred = sizeInEnglish(bytesTransferred)
|
||||||
val fileSize = sizeInEnglish(fileLength)
|
val fileSize = sizeInEnglish(fileLength)
|
||||||
print(s"${eraseLine}Uploading $transferred of $fileSize : $remoteKey\n$bar${cursorUp()}\r")
|
val message = s"${GREEN}Uploaded $transferred of $fileSize $RESET: $remoteKey$eraseLineForward"
|
||||||
|
println(s"$message\n$bar${Terminal.cursorPrevLine() * 2}")
|
||||||
} else
|
} else
|
||||||
print(eraseLine)
|
println(s"${GREEN}Uploaded:$RESET $remoteKey$eraseLineForward")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,23 +6,19 @@ import net.kemitix.thorp.domain._
|
||||||
trait StorageService {
|
trait StorageService {
|
||||||
|
|
||||||
def listObjects(bucket: Bucket,
|
def listObjects(bucket: Bucket,
|
||||||
prefix: RemoteKey
|
prefix: RemoteKey): IO[S3ObjectsData]
|
||||||
)(implicit logger: Logger): IO[S3ObjectsData]
|
|
||||||
|
|
||||||
def upload(localFile: LocalFile,
|
def upload(localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener,
|
||||||
tryCount: Int)
|
tryCount: Int): IO[StorageQueueEvent]
|
||||||
(implicit logger: Logger): IO[StorageQueueEvent]
|
|
||||||
|
|
||||||
def copy(bucket: Bucket,
|
def copy(bucket: Bucket,
|
||||||
sourceKey: RemoteKey,
|
sourceKey: RemoteKey,
|
||||||
hash: MD5Hash,
|
hash: MD5Hash,
|
||||||
targetKey: RemoteKey
|
targetKey: RemoteKey): IO[StorageQueueEvent]
|
||||||
)(implicit logger: Logger): IO[StorageQueueEvent]
|
|
||||||
|
|
||||||
def delete(bucket: Bucket,
|
def delete(bucket: Bucket,
|
||||||
remoteKey: RemoteKey
|
remoteKey: RemoteKey): IO[StorageQueueEvent]
|
||||||
)(implicit logger: Logger): IO[StorageQueueEvent]
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,19 +5,15 @@ import com.amazonaws.services.s3.AmazonS3
|
||||||
import com.amazonaws.services.s3.model.CopyObjectRequest
|
import com.amazonaws.services.s3.model.CopyObjectRequest
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent
|
import net.kemitix.thorp.domain.StorageQueueEvent.CopyQueueEvent
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.storage.aws.S3ClientLogging.{logCopyStart, logCopyFinish}
|
|
||||||
|
|
||||||
class S3ClientCopier(amazonS3: AmazonS3) {
|
class S3ClientCopier(amazonS3: AmazonS3) {
|
||||||
|
|
||||||
def copy(bucket: Bucket,
|
def copy(bucket: Bucket,
|
||||||
sourceKey: RemoteKey,
|
sourceKey: RemoteKey,
|
||||||
hash: MD5Hash,
|
hash: MD5Hash,
|
||||||
targetKey: RemoteKey)
|
targetKey: RemoteKey): IO[StorageQueueEvent] =
|
||||||
(implicit logger: Logger): IO[StorageQueueEvent] =
|
|
||||||
for {
|
for {
|
||||||
_ <- logCopyStart(bucket, sourceKey, targetKey)
|
|
||||||
_ <- copyObject(bucket, sourceKey, hash, targetKey)
|
_ <- copyObject(bucket, sourceKey, hash, targetKey)
|
||||||
_ <- logCopyFinish(bucket, sourceKey,targetKey)
|
|
||||||
} yield CopyQueueEvent(targetKey)
|
} yield CopyQueueEvent(targetKey)
|
||||||
|
|
||||||
private def copyObject(bucket: Bucket,
|
private def copyObject(bucket: Bucket,
|
||||||
|
|
|
@ -4,18 +4,14 @@ import cats.effect.IO
|
||||||
import com.amazonaws.services.s3.AmazonS3
|
import com.amazonaws.services.s3.AmazonS3
|
||||||
import com.amazonaws.services.s3.model.DeleteObjectRequest
|
import com.amazonaws.services.s3.model.DeleteObjectRequest
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent
|
import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent
|
||||||
import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey}
|
import net.kemitix.thorp.domain.{Bucket, RemoteKey}
|
||||||
import net.kemitix.thorp.storage.aws.S3ClientLogging.{logDeleteStart, logDeleteFinish}
|
|
||||||
|
|
||||||
class S3ClientDeleter(amazonS3: AmazonS3) {
|
class S3ClientDeleter(amazonS3: AmazonS3) {
|
||||||
|
|
||||||
def delete(bucket: Bucket,
|
def delete(bucket: Bucket,
|
||||||
remoteKey: RemoteKey)
|
remoteKey: RemoteKey): IO[DeleteQueueEvent] =
|
||||||
(implicit logger: Logger): IO[DeleteQueueEvent] =
|
|
||||||
for {
|
for {
|
||||||
_ <- logDeleteStart(bucket, remoteKey)
|
|
||||||
_ <- deleteObject(bucket, remoteKey)
|
_ <- deleteObject(bucket, remoteKey)
|
||||||
_ <- logDeleteFinish(bucket, remoteKey)
|
|
||||||
} yield DeleteQueueEvent(remoteKey)
|
} yield DeleteQueueEvent(remoteKey)
|
||||||
|
|
||||||
private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) =
|
private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) =
|
||||||
|
|
|
@ -1,40 +0,0 @@
|
||||||
package net.kemitix.thorp.storage.aws
|
|
||||||
|
|
||||||
import cats.effect.IO
|
|
||||||
import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey}
|
|
||||||
|
|
||||||
object S3ClientLogging {
|
|
||||||
|
|
||||||
def logListObjectsStart(bucket: Bucket,
|
|
||||||
prefix: RemoteKey)
|
|
||||||
(implicit logger: Logger): IO[Unit] =
|
|
||||||
logger.info(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}")
|
|
||||||
|
|
||||||
def logListObjectsFinish(bucket: Bucket,
|
|
||||||
prefix: RemoteKey)
|
|
||||||
(implicit logger: Logger): IO[Unit] =
|
|
||||||
logger.info(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}")
|
|
||||||
|
|
||||||
def logCopyStart(bucket: Bucket,
|
|
||||||
sourceKey: RemoteKey,
|
|
||||||
targetKey: RemoteKey)
|
|
||||||
(implicit logger: Logger): IO[Unit] =
|
|
||||||
logger.info(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
|
|
||||||
|
|
||||||
def logCopyFinish(bucket: Bucket,
|
|
||||||
sourceKey: RemoteKey,
|
|
||||||
targetKey: RemoteKey)
|
|
||||||
(implicit logger: Logger): IO[Unit] =
|
|
||||||
logger.info(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
|
|
||||||
|
|
||||||
def logDeleteStart(bucket: Bucket,
|
|
||||||
remoteKey: RemoteKey)
|
|
||||||
(implicit logger: Logger): IO[Unit] =
|
|
||||||
logger.info(s"Delete: ${bucket.name}:${remoteKey.key}")
|
|
||||||
|
|
||||||
def logDeleteFinish(bucket: Bucket,
|
|
||||||
remoteKey: RemoteKey)
|
|
||||||
(implicit logger: Logger): IO[Unit] =
|
|
||||||
logger.info(s"Deleted: ${bucket.name}:${remoteKey.key}")
|
|
||||||
|
|
||||||
}
|
|
|
@ -4,8 +4,7 @@ import cats.effect.IO
|
||||||
import com.amazonaws.services.s3.AmazonS3
|
import com.amazonaws.services.s3.AmazonS3
|
||||||
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
|
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
|
||||||
import net.kemitix.thorp.domain
|
import net.kemitix.thorp.domain
|
||||||
import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData}
|
import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData}
|
||||||
import net.kemitix.thorp.storage.aws.S3ClientLogging.{logListObjectsStart, logListObjectsFinish}
|
|
||||||
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
|
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
|
||||||
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
|
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
|
||||||
|
|
||||||
|
@ -14,8 +13,7 @@ import scala.collection.JavaConverters._
|
||||||
class S3ClientObjectLister(amazonS3: AmazonS3) {
|
class S3ClientObjectLister(amazonS3: AmazonS3) {
|
||||||
|
|
||||||
def listObjects(bucket: Bucket,
|
def listObjects(bucket: Bucket,
|
||||||
prefix: RemoteKey)
|
prefix: RemoteKey): IO[S3ObjectsData] = {
|
||||||
(implicit logger: Logger): IO[S3ObjectsData] = {
|
|
||||||
|
|
||||||
type Token = String
|
type Token = String
|
||||||
type Batch = (Stream[S3ObjectSummary], Option[Token])
|
type Batch = (Stream[S3ObjectSummary], Option[Token])
|
||||||
|
@ -50,10 +48,7 @@ class S3ClientObjectLister(amazonS3: AmazonS3) {
|
||||||
} yield summaries ++ rest
|
} yield summaries ++ rest
|
||||||
|
|
||||||
for {
|
for {
|
||||||
_ <- logListObjectsStart(bucket, prefix)
|
summaries <- fetch(new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key))
|
||||||
r = new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key)
|
|
||||||
summaries <- fetch(r)
|
|
||||||
_ <- logListObjectsFinish(bucket, prefix)
|
|
||||||
} yield domain.S3ObjectsData(byHash(summaries), byKey(summaries))
|
} yield domain.S3ObjectsData(byHash(summaries), byKey(summaries))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,27 +16,23 @@ class S3StorageService(amazonS3Client: => AmazonS3,
|
||||||
lazy val deleter = new S3ClientDeleter(amazonS3Client)
|
lazy val deleter = new S3ClientDeleter(amazonS3Client)
|
||||||
|
|
||||||
override def listObjects(bucket: Bucket,
|
override def listObjects(bucket: Bucket,
|
||||||
prefix: RemoteKey)
|
prefix: RemoteKey): IO[S3ObjectsData] =
|
||||||
(implicit logger: Logger): IO[S3ObjectsData] =
|
|
||||||
objectLister.listObjects(bucket, prefix)
|
objectLister.listObjects(bucket, prefix)
|
||||||
|
|
||||||
override def copy(bucket: Bucket,
|
override def copy(bucket: Bucket,
|
||||||
sourceKey: RemoteKey,
|
sourceKey: RemoteKey,
|
||||||
hash: MD5Hash,
|
hash: MD5Hash,
|
||||||
targetKey: RemoteKey)
|
targetKey: RemoteKey): IO[StorageQueueEvent] =
|
||||||
(implicit logger: Logger): IO[StorageQueueEvent] =
|
|
||||||
copier.copy(bucket, sourceKey,hash, targetKey)
|
copier.copy(bucket, sourceKey,hash, targetKey)
|
||||||
|
|
||||||
override def upload(localFile: LocalFile,
|
override def upload(localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener,
|
||||||
tryCount: Int)
|
tryCount: Int): IO[StorageQueueEvent] =
|
||||||
(implicit logger: Logger): IO[StorageQueueEvent] =
|
|
||||||
uploader.upload(localFile, bucket, uploadEventListener, 1)
|
uploader.upload(localFile, bucket, uploadEventListener, 1)
|
||||||
|
|
||||||
override def delete(bucket: Bucket,
|
override def delete(bucket: Bucket,
|
||||||
remoteKey: RemoteKey)
|
remoteKey: RemoteKey): IO[StorageQueueEvent] =
|
||||||
(implicit logger: Logger): IO[StorageQueueEvent] =
|
|
||||||
deleter.delete(bucket, remoteKey)
|
deleter.delete(bucket, remoteKey)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,6 @@ import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferMana
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent.{ErrorQueueEvent, UploadQueueEvent}
|
import net.kemitix.thorp.domain.StorageQueueEvent.{ErrorQueueEvent, UploadQueueEvent}
|
||||||
import net.kemitix.thorp.domain.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
|
import net.kemitix.thorp.domain.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
|
||||||
import net.kemitix.thorp.domain.{StorageQueueEvent, _}
|
import net.kemitix.thorp.domain.{StorageQueueEvent, _}
|
||||||
import net.kemitix.thorp.storage.aws.UploaderLogging.{logMultiPartUploadStart, logMultiPartUploadFinished}
|
|
||||||
|
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
|
|
||||||
|
@ -17,16 +16,13 @@ class Uploader(transferManager: => AmazonTransferManager) {
|
||||||
def upload(localFile: LocalFile,
|
def upload(localFile: LocalFile,
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener,
|
uploadEventListener: UploadEventListener,
|
||||||
tryCount: Int)
|
tryCount: Int): IO[StorageQueueEvent] =
|
||||||
(implicit logger: Logger): IO[StorageQueueEvent] =
|
|
||||||
for {
|
for {
|
||||||
_ <- logMultiPartUploadStart(localFile, tryCount)
|
|
||||||
upload <- transfer(localFile, bucket, uploadEventListener)
|
upload <- transfer(localFile, bucket, uploadEventListener)
|
||||||
action = upload match {
|
action = upload match {
|
||||||
case Right(r) => UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag))
|
case Right(r) => UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag))
|
||||||
case Left(e) => ErrorQueueEvent(localFile.remoteKey, e)
|
case Left(e) => ErrorQueueEvent(localFile.remoteKey, e)
|
||||||
}
|
}
|
||||||
_ <- logMultiPartUploadFinished(localFile)
|
|
||||||
} yield action
|
} yield action
|
||||||
|
|
||||||
private def transfer(localFile: LocalFile,
|
private def transfer(localFile: LocalFile,
|
||||||
|
|
|
@ -1,22 +0,0 @@
|
||||||
package net.kemitix.thorp.storage.aws
|
|
||||||
|
|
||||||
import cats.effect.IO
|
|
||||||
import net.kemitix.thorp.domain.SizeTranslation.sizeInEnglish
|
|
||||||
import net.kemitix.thorp.domain.Terminal._
|
|
||||||
import net.kemitix.thorp.domain.{LocalFile, Logger}
|
|
||||||
|
|
||||||
object UploaderLogging {
|
|
||||||
|
|
||||||
def logMultiPartUploadStart(localFile: LocalFile,
|
|
||||||
tryCount: Int)
|
|
||||||
(implicit logger: Logger): IO[Unit] = {
|
|
||||||
val tryMessage = if (tryCount == 1) "" else s"try $tryCount"
|
|
||||||
val size = sizeInEnglish(localFile.file.length)
|
|
||||||
logger.info(s"${eraseLine}upload:$tryMessage:$size:${localFile.remoteKey.key}")
|
|
||||||
}
|
|
||||||
|
|
||||||
def logMultiPartUploadFinished(localFile: LocalFile)
|
|
||||||
(implicit logger: Logger): IO[Unit] =
|
|
||||||
logger.debug(s"upload:finished: ${localFile.remoteKey.key}")
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in a new issue