Display total size and progress for entire run (#94)

* [changelog] updated

* [core] Wrap Stream[LocalFile] as LocalFiles

* [core] LocalFiles counts files

* [core] LocalFiles sums file lengths

* [core] Restore logFileScan

* [storage-aws] Lister logs when fetching object summaries

* [storage-aws] Extract ListerLogger

* [core] Synchronise use leftMap

* [core] Synchronise extract assemblePlan

* [core] Wrap Stream[Action] in SyncPlan

* [core] Copy the file count and totalSizeBytes across to SyncPlan

* [cli] Program rename actions as syncPlan

* [cli] Program extract thorpArchive def

* [cli] Program extract createPlan def

* [cli] Program refactoring

* [cli] Program remove println showing version

* [cli] Program rename actions parameter as syncPlan

* [core] ThorpArchive add an index to each action

* [cli] Program make SyncPlan available to ThorpArchive

* [core] Pass SyncTotals to Archive

* [domain] Move SyncTotals into module

* [domain] Pass index and SyncTotals to UploadEventListener

* [domain] UploadEventLogger add file count and size progress bars

* [domain] UploadEventLogger better display stability and add file index

* [cli] Index files in correct order

* [cli] Program extends Synchronise

* [core] Rename Synchronise as PlanBuilder

* [cli] Program add test to check actions don't get reordered from plan

* [core] collect file size totals

* [domain] UploadEventLogger include percentage

* [cli] ProgramTest Use wildcards when selecting more than 6 elements
Paul Campbell, 2019-07-04 18:58:31 +01:00, committed by GitHub
parent 26b4f6f794
commit 00c04187e8
23 changed files with 317 additions and 114 deletions

CHANGELOG.org

@@ -11,6 +11,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
 - Add a version command-line option (#99)
 - Add a batch mode (#85)
+- Display total size and progress for entire run (#94)
 * [0.6.1] - 2019-07-03

cli/src/main/scala/net/kemitix/thorp/cli/Program.scala

@@ -7,38 +7,63 @@ import net.kemitix.thorp.domain.{Logger, StorageQueueEvent}
 import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService
 import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService
 
-trait Program {
+trait Program extends PlanBuilder {
 
   def run(cliOptions: ConfigOptions): IO[ExitCode] = {
     implicit val logger: Logger = new PrintLogger()
-    if (ConfigQuery.showVersion(cliOptions)) IO {
-      println(s"Thorp v${thorp.BuildInfo.version}")
-      ExitCode.Success
-    } else {
+    if (ConfigQuery.showVersion(cliOptions))
       for {
-        actions <- Synchronise.createPlan(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors)
-        events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService, ConfigQuery.batchMode(cliOptions)), actions)
+        _ <- logger.info(s"Thorp v${thorp.BuildInfo.version}")
+      } yield ExitCode.Success
+    else
+      for {
+        syncPlan <- createPlan(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors)
+        archive <- thorpArchive(cliOptions, syncPlan)
+        events <- handleActions(archive, syncPlan)
         _ <- defaultStorageService.shutdown
         _ <- SyncLogging.logRunFinished(events)
       } yield ExitCode.Success
-    }
   }
 
-  private def handleErrors(implicit logger: Logger): List[String] => IO[Stream[Action]] = {
+  def thorpArchive(cliOptions: ConfigOptions,
+                   syncPlan: SyncPlan): IO[ThorpArchive] =
+    IO.pure(
+      UnversionedMirrorArchive.default(
+        defaultStorageService,
+        ConfigQuery.batchMode(cliOptions),
+        syncPlan.syncTotals
+      ))
+
+  private def handleErrors(implicit logger: Logger): List[String] => IO[SyncPlan] = {
     errors => {
       for {
         _ <- logger.error("There were errors:")
         _ <- errors.map(error => logger.error(s" - $error")).sequence
-      } yield Stream()
+      } yield SyncPlan()
    }
  }
 
   private def handleActions(archive: ThorpArchive,
-                            actions: Stream[Action])
-                           (implicit l: Logger): IO[Stream[StorageQueueEvent]] =
-    actions.foldLeft(Stream[IO[StorageQueueEvent]]()) {
-      (stream, action) => archive.update(action) ++ stream
-    }.sequence
+                            syncPlan: SyncPlan)
+                           (implicit l: Logger): IO[Stream[StorageQueueEvent]] = {
+    type Accumulator = (Stream[IO[StorageQueueEvent]], Long)
+    val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes)
+    val (actions, _) = syncPlan.actions
+      .zipWithIndex
+      .reverse
+      .foldLeft(zero) {
+        (acc: Accumulator, indexedAction) => {
+          val (stream, bytesToDo) = acc
+          val (action, index) = indexedAction
+          val remainingBytes = bytesToDo - action.size
+          (
+            archive.update(index, action, remainingBytes) ++ stream,
+            remainingBytes
+          )
+        }
+      }
+    actions.sequence
+  }
 }
 
 object Program extends Program
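The fold in handleActions deserves a note: the plan is indexed, reversed, and folded so that prepending with ++ restores the original order while the accumulator counts down from the total. The value handed to archive.update for each action therefore equals the combined size of all actions before it. A standalone sketch of the same pattern, using plain sizes in place of actions (the object and value names are illustrative, not from the codebase):

object IndexedFoldSketch extends App {
  // stand-ins for actions: just their sizes in bytes
  val actionSizes = Stream(17L, 23L, 0L)
  val totalSizeBytes = actionSizes.sum // 40

  // fold the reversed, indexed stream, prepending as we go: the result
  // keeps plan order, and each entry records the bytes before it
  val (labels, _) = actionSizes.zipWithIndex.reverse
    .foldLeft((Stream.empty[String], totalSizeBytes)) {
      case ((stream, bytesToDo), (size, index)) =>
        val remainingBytes = bytesToDo - size
        (s"#$index size=$size bytesBefore=$remainingBytes" #:: stream, remainingBytes)
    }

  labels.foreach(println)
  // #0 size=17 bytesBefore=0
  // #1 size=23 bytesBefore=17
  // #2 size=0 bytesBefore=40
}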

cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala

@@ -0,0 +1,64 @@
+package net.kemitix.thorp.cli
+
+import java.io.File
+
+import cats.data.EitherT
+import cats.effect.IO
+import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
+import net.kemitix.thorp.core._
+import net.kemitix.thorp.domain.{Bucket, LocalFile, Logger, MD5Hash, RemoteKey, StorageQueueEvent}
+import net.kemitix.thorp.storage.api.{HashService, StorageService}
+import org.scalatest.FunSpec
+
+class ProgramTest extends FunSpec {
+
+  val source: File = Resource(this, ".")
+  val bucket: Bucket = Bucket("aBucket")
+  val hash: MD5Hash = MD5Hash("aHash")
+  val copyAction: Action = ToCopy(bucket, RemoteKey("copy-me"), hash, RemoteKey("overwrite-me"), 17L)
+  val uploadAction: Action = ToUpload(bucket, LocalFile.resolve("aFile", Map(), source, _ => RemoteKey("upload-me")), 23L)
+  val deleteAction: Action = ToDelete(bucket, RemoteKey("delete-me"), 0L)
+
+  val configOptions: ConfigOptions = ConfigOptions(options = List(
+    ConfigOption.IgnoreGlobalOptions,
+    ConfigOption.IgnoreUserOptions
+  ))
+
+  describe("upload, copy and delete actions in plan") {
+    val archive = TestProgram.thorpArchive
+    it("should be handled in correct order") {
+      val expected = List(copyAction, uploadAction, deleteAction)
+      TestProgram.run(configOptions).unsafeRunSync
+      val result = archive.actions
+      assertResult(expected)(result)
+    }
+  }
+
+  object TestProgram extends Program with TestPlanBuilder {
+    val thorpArchive: ActionCaptureArchive = new ActionCaptureArchive
+    override def thorpArchive(cliOptions: ConfigOptions, syncPlan: SyncPlan): IO[ThorpArchive] =
+      IO.pure(thorpArchive)
+  }
+
+  trait TestPlanBuilder extends PlanBuilder {
+    override def createPlan(storageService: StorageService,
+                            hashService: HashService,
+                            configOptions: ConfigOptions)
+                           (implicit l: Logger): EitherT[IO, List[String], SyncPlan] = {
+      EitherT.right(IO(SyncPlan(Stream(copyAction, uploadAction, deleteAction))))
+    }
+  }
+
+  class ActionCaptureArchive extends ThorpArchive {
+    var actions: List[Action] = List[Action]()
+    override def update(index: Int,
+                        action: Action,
+                        totalBytesSoFar: Long)
+                       (implicit l: Logger): Stream[IO[StorageQueueEvent]] = {
+      actions = action :: actions
+      Stream()
+    }
+  }
+}

core/src/main/scala/net/kemitix/thorp/core/Action.scala

@@ -4,21 +4,26 @@ import net.kemitix.thorp.domain.{Bucket, LocalFile, MD5Hash, RemoteKey}
 
 sealed trait Action {
   def bucket: Bucket
+  def size: Long
 }
 
 object Action {
 
   final case class DoNothing(bucket: Bucket,
-                             remoteKey: RemoteKey) extends Action
+                             remoteKey: RemoteKey,
+                             size: Long) extends Action
 
   final case class ToUpload(bucket: Bucket,
-                            localFile: LocalFile) extends Action
+                            localFile: LocalFile,
+                            size: Long) extends Action
 
   final case class ToCopy(bucket: Bucket,
                           sourceKey: RemoteKey,
                           hash: MD5Hash,
-                          targetKey: RemoteKey) extends Action
+                          targetKey: RemoteKey,
+                          size: Long) extends Action
 
   final case class ToDelete(bucket: Bucket,
-                            remoteKey: RemoteKey) extends Action
+                            remoteKey: RemoteKey,
+                            size: Long) extends Action
 }

core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala

@@ -40,12 +40,12 @@ object ActionGenerator {
   private def doNothing(bucket: Bucket,
                         remoteKey: RemoteKey) =
     Stream(
-      DoNothing(bucket, remoteKey))
+      DoNothing(bucket, remoteKey, 0L))
 
   private def uploadFile(bucket: Bucket,
                          localFile: LocalFile) =
     Stream(
-      ToUpload(bucket, localFile))
+      ToUpload(bucket, localFile, localFile.file.length))
 
   private def copyFile(bucket: Bucket,
                        localFile: LocalFile,
@@ -54,7 +54,7 @@
     headOption.toStream.map { remoteMetaData =>
       val sourceKey = remoteMetaData.remoteKey
       val hash = remoteMetaData.hash
-      ToCopy(bucket, sourceKey, hash, localFile.remoteKey)
+      ToCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.file.length)
     }
   }

core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala

@@ -14,11 +14,11 @@ object LocalFileStream {
   def findFiles(file: File,
                 hashService: HashService)
                (implicit c: Config,
-                logger: Logger): IO[Stream[LocalFile]] = {
+                logger: Logger): IO[LocalFiles] = {
 
     val filters: Path => Boolean = Filter.isIncluded(c.filters)
 
-    def loop(file: File): IO[Stream[LocalFile]] = {
+    def loop(file: File): IO[LocalFiles] = {
 
       def dirPaths(file: File): IO[Stream[File]] =
         IO(listFiles(file))
@@ -26,16 +26,16 @@
           Stream(fs: _*)
             .filter(f => filters(f.toPath)))
 
-      def recurseIntoSubDirectories(file: File): IO[Stream[LocalFile]] =
+      def recurseIntoSubDirectories(file: File): IO[LocalFiles] =
         file match {
           case f if f.isDirectory => loop(file)
           case _ => localFile(hashService, file)
         }
 
-      def recurse(fs: Stream[File]): IO[Stream[LocalFile]] =
-        fs.foldLeft(IO.pure(Stream.empty[LocalFile]))((acc, f) =>
+      def recurse(fs: Stream[File]): IO[LocalFiles] =
+        fs.foldLeft(IO.pure(LocalFiles()))((acc, f) =>
           recurseIntoSubDirectories(f)
-            .flatMap(lfs => acc.map(s => s ++ lfs)))
+            .flatMap(localFiles => acc.map(accLocalFiles => accLocalFiles ++ localFiles)))
 
       for {
         _ <- logger.debug(s"- Entering: $file")
@@ -48,10 +48,16 @@
     loop(file)
   }
 
-  private def localFile(hashService: HashService, file: File)(implicit l: Logger, c: Config) = {
+  private def localFile(hashService: HashService,
+                        file: File)
+                       (implicit l: Logger, c: Config) = {
     for {
       hash <- hashService.hashLocalObject(file)
-    } yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file)))
+    } yield
+      LocalFiles(
+        localFiles = Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file))),
+        count = 1,
+        totalSizeBytes = file.length)
   }
 
   //TODO: Change this to return an Either[IllegalArgumentException, Array[File]]

core/src/main/scala/net/kemitix/thorp/core/LocalFiles.scala

@@ -0,0 +1,14 @@
+package net.kemitix.thorp.core
+
+import net.kemitix.thorp.domain.LocalFile
+
+case class LocalFiles(localFiles: Stream[LocalFile] = Stream(),
+                      count: Long = 0,
+                      totalSizeBytes: Long = 0) {
+
+  def ++(append: LocalFiles): LocalFiles =
+    copy(localFiles = localFiles ++ append.localFiles,
+         count = count + append.count,
+         totalSizeBytes = totalSizeBytes + append.totalSizeBytes)
+}
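LocalFiles behaves like a monoid over scan results: ++ concatenates the file streams while adding the counts and byte totals, and the no-argument constructor is the identity that seeds the fold in LocalFileStream. A small usage sketch (the object name is ours, and the 55/58 split is an arbitrary way to reach the 113-byte fixture used in LocalFileStreamSuite below):

import net.kemitix.thorp.core.LocalFiles

object LocalFilesSketch extends App {
  // combining two single-file scan results, as the directory recursion does
  val a = LocalFiles(count = 1, totalSizeBytes = 55)
  val b = LocalFiles(count = 1, totalSizeBytes = 58)
  val combined = a ++ b
  assert(combined.count == 2)
  assert(combined.totalSizeBytes == 113)
  // LocalFiles() is the identity, matching the fold's IO.pure(LocalFiles()) seed
  assert((LocalFiles() ++ a) == a)
}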

core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala (renamed from Synchronise.scala)

@@ -7,12 +7,12 @@ import net.kemitix.thorp.core.Action.DoNothing
 import net.kemitix.thorp.domain._
 import net.kemitix.thorp.storage.api.{HashService, StorageService}
 
-trait Synchronise {
+trait PlanBuilder {
 
   def createPlan(storageService: StorageService,
                  hashService: HashService,
                  configOptions: ConfigOptions)
-                (implicit l: Logger): EitherT[IO, List[String], Stream[Action]] =
+                (implicit l: Logger): EitherT[IO, List[String], SyncPlan] =
     EitherT(ConfigurationBuilder.buildConfig(configOptions))
       .leftMap(errorMessages)
       .flatMap(config => useValidConfig(storageService, hashService)(config, l))
@@ -25,44 +25,58 @@ trait Synchronise {
     case _ => true
   }
 
+  def assemblePlan(implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = {
+    case (remoteData, localData) => {
+      val actions =
+        (actionsForLocalFiles(localData, remoteData) ++
+          actionsForRemoteKeys(remoteData))
+          .filter(removeDoNothing)
+      SyncPlan(
+        actions = actions,
+        syncTotals = SyncTotals(
+          count = localData.count,
+          totalSizeBytes = localData.totalSizeBytes))
+    }
+  }
+
   def useValidConfig(storageService: StorageService,
                      hashService: HashService)
-                    (implicit c: Config, l: Logger): EitherT[IO, List[String], Stream[Action]] = {
+                    (implicit c: Config, l: Logger): EitherT[IO, List[String], SyncPlan] = {
     for {
       _ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.source))
       actions <- gatherMetadata(storageService, hashService)
-        .swap.map(error => List(error)).swap
-        .map {
-          case (remoteData, localData) =>
-            (actionsForLocalFiles(localData, remoteData) ++
-              actionsForRemoteKeys(remoteData))
-              .filter(removeDoNothing)
-        }
+        .leftMap(error => List(error))
+        .map(assemblePlan)
     } yield actions
   }
 
   private def gatherMetadata(storageService: StorageService,
                              hashService: HashService)
                             (implicit l: Logger,
-                             c: Config): EitherT[IO, String, (S3ObjectsData, Stream[LocalFile])] =
+                             c: Config): EitherT[IO, String, (S3ObjectsData, LocalFiles)] =
     for {
       remoteData <- fetchRemoteData(storageService)
       localData <- EitherT.liftF(findLocalFiles(hashService))
     } yield (remoteData, localData)
 
-  private def actionsForLocalFiles(localData: Stream[LocalFile], remoteData: S3ObjectsData)
+  private def actionsForLocalFiles(localData: LocalFiles, remoteData: S3ObjectsData)
                                   (implicit c: Config) =
-    localData.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData) ++ acc)
+    localData.localFiles.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData) ++ acc)
 
   private def actionsForRemoteKeys(remoteData: S3ObjectsData)
                                   (implicit c: Config) =
     remoteData.byKey.keys.foldLeft(Stream[Action]())((acc, rk) => createActionFromRemoteKey(rk) #:: acc)
 
-  private def fetchRemoteData(storageService: StorageService)(implicit c: Config) =
+  private def fetchRemoteData(storageService: StorageService)
+                             (implicit c: Config, l: Logger) =
     storageService.listObjects(c.bucket, c.prefix)
 
-  private def findLocalFiles(hashService: HashService)(implicit config: Config, l: Logger) =
-    LocalFileStream.findFiles(config.source, hashService)
+  private def findLocalFiles(hashService: HashService)
+                            (implicit config: Config, l: Logger) =
+    for {
+      _ <- SyncLogging.logFileScan
+      localFiles <- LocalFileStream.findFiles(config.source, hashService)
+    } yield localFiles
 
   private def createActionFromLocalFile(lf: LocalFile, remoteData: S3ObjectsData)
                                        (implicit c: Config) =
@@ -70,9 +84,9 @@ trait Synchronise {
 
   private def createActionFromRemoteKey(rk: RemoteKey)
                                        (implicit c: Config) =
-    if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk)
-    else DoNothing(c.bucket, rk)
+    if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk, 0L)
+    else DoNothing(c.bucket, rk, 0L)
 }
 
-object Synchronise extends Synchronise
+object PlanBuilder extends PlanBuilder
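A small but telling cleanup in useValidConfig: the old code flipped the EitherT with swap, mapped, and flipped back just to wrap the error in a list; leftMap transforms the error channel directly. A minimal sketch of the equivalence (the object name and error text are illustrative):

import cats.data.EitherT
import cats.effect.IO

object LeftMapSketch extends App {
  // a failed plan-building step carrying a single error message
  val failed: EitherT[IO, String, Int] = EitherT.leftT[IO, Int]("no sources found")

  // the old Synchronise approach: swap to the error side, map, swap back
  val viaSwap: EitherT[IO, List[String], Int] =
    failed.swap.map(error => List(error)).swap

  // leftMap says the same thing directly
  val viaLeftMap: EitherT[IO, List[String], Int] =
    failed.leftMap(error => List(error))

  println(viaSwap.value.unsafeRunSync())    // Left(List(no sources found))
  println(viaLeftMap.value.unsafeRunSync()) // Left(List(no sources found))
}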

core/src/main/scala/net/kemitix/thorp/core/SyncPlan.scala

@@ -0,0 +1,6 @@
+package net.kemitix.thorp.core
+
+import net.kemitix.thorp.domain.SyncTotals
+
+case class SyncPlan(actions: Stream[Action] = Stream(),
+                    syncTotals: SyncTotals = SyncTotals())

core/src/main/scala/net/kemitix/thorp/core/ThorpArchive.scala

@@ -5,7 +5,10 @@ import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent}
 
 trait ThorpArchive {
 
-  def update(action: Action)(implicit l: Logger): Stream[IO[StorageQueueEvent]]
+  def update(index: Int,
+             action: Action,
+             totalBytesSoFar: Long)
+            (implicit l: Logger): Stream[IO[StorageQueueEvent]]
 
   def fileUploaded(localFile: LocalFile,
                    batchMode: Boolean)

core/src/main/scala/net/kemitix/thorp/core/UnversionedMirrorArchive.scala

@@ -3,29 +3,33 @@ package net.kemitix.thorp.core
 
 import cats.effect.IO
 import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
 import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
-import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent, UploadEventListener}
+import net.kemitix.thorp.domain.{Logger, StorageQueueEvent, SyncTotals, UploadEventListener}
 import net.kemitix.thorp.storage.api.StorageService
 
 case class UnversionedMirrorArchive(storageService: StorageService,
-                                    batchMode: Boolean) extends ThorpArchive {
-  override def update(action: Action)
+                                    batchMode: Boolean,
+                                    syncTotals: SyncTotals) extends ThorpArchive {
+  override def update(index: Int,
+                      action: Action,
+                      totalBytesSoFar: Long)
                      (implicit l: Logger): Stream[IO[StorageQueueEvent]] =
     Stream(
       action match {
-        case ToUpload(bucket, localFile) =>
+        case ToUpload(bucket, localFile, size) =>
           for {
-            event <- storageService.upload(localFile, bucket, batchMode, new UploadEventListener(localFile), 1)
+            event <- storageService.upload(localFile, bucket, batchMode,
+              new UploadEventListener(localFile, index, syncTotals, totalBytesSoFar), 1)
             _ <- fileUploaded(localFile, batchMode)
           } yield event
-        case ToCopy(bucket, sourceKey, hash, targetKey) =>
+        case ToCopy(bucket, sourceKey, hash, targetKey, size) =>
           for {
             event <- storageService.copy(bucket, sourceKey, hash, targetKey)
           } yield event
-        case ToDelete(bucket, remoteKey) =>
+        case ToDelete(bucket, remoteKey, size) =>
           for {
             event <- storageService.delete(bucket, remoteKey)
           } yield event
-        case DoNothing(_, remoteKey) =>
+        case DoNothing(_, remoteKey, size) =>
           IO.pure(DoNothingQueueEvent(remoteKey))
       })
@@ -33,6 +37,7 @@ case class UnversionedMirrorArchive(storageService: StorageService,
 
 object UnversionedMirrorArchive {
   def default(storageService: StorageService,
-              batchMode: Boolean): ThorpArchive =
-    new UnversionedMirrorArchive(storageService, batchMode)
+              batchMode: Boolean,
+              syncTotals: SyncTotals): ThorpArchive =
+    new UnversionedMirrorArchive(storageService, batchMode, syncTotals)
 }

core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala

@@ -29,7 +29,7 @@ class ActionGeneratorSuite
         matchByKey = Some(theRemoteMetadata) // remote exists
       )
       it("do nothing") {
-        val expected = List(DoNothing(bucket, theFile.remoteKey))
+        val expected = List(DoNothing(bucket, theFile.remoteKey, theFile.file.length))
         val result = invoke(input)
         assertResult(expected)(result)
       }
@@ -44,7 +44,7 @@ class ActionGeneratorSuite
         matchByHash = Set(otherRemoteMetadata), // other matches
         matchByKey = None) // remote is missing
       it("copy from other key") {
-        val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey)) // copy
+        val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey, theFile.file.length)) // copy
         val result = invoke(input)
         assertResult(expected)(result)
       }
@@ -56,7 +56,7 @@ class ActionGeneratorSuite
         matchByHash = Set.empty, // other no matches
         matchByKey = None) // remote is missing
       it("upload") {
-        val expected = List(ToUpload(bucket, theFile)) // upload
+        val expected = List(ToUpload(bucket, theFile, theFile.file.length)) // upload
         val result = invoke(input)
         assertResult(expected)(result)
       }
@@ -75,7 +75,7 @@ class ActionGeneratorSuite
         matchByHash = Set(otherRemoteMetadata), // other matches
         matchByKey = Some(oldRemoteMetadata)) // remote exists
       it("copy from other key") {
-        val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey)) // copy
+        val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey, theFile.file.length)) // copy
         val result = invoke(input)
         assertResult(expected)(result)
       }
@@ -91,7 +91,7 @@ class ActionGeneratorSuite
         matchByKey = Some(theRemoteMetadata) // remote exists
       )
       it("upload") {
-        val expected = List(ToUpload(bucket, theFile)) // upload
+        val expected = List(ToUpload(bucket, theFile, theFile.file.length)) // upload
         val result = invoke(input)
         assertResult(expected)(result)
       }

core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala

@@ -23,9 +23,21 @@ class LocalFileStreamSuite extends FunSpec {
 
   describe("findFiles") {
     it("should find all files") {
       val result: Set[String] =
-        LocalFileStream.findFiles(uploadResource, hashService).unsafeRunSync.toSet
+        invoke.localFiles.toSet
           .map { x: LocalFile => x.relative.toString }
       assertResult(Set("subdir/leaf-file", "root-file"))(result)
     }
+    it("should count all files") {
+      val result = invoke.count
+      assertResult(2)(result)
+    }
+    it("should sum the size of all files") {
+      val result = invoke.totalSizeBytes
+      assertResult(113)(result)
+    }
+  }
+
+  private def invoke = {
+    LocalFileStream.findFiles(uploadResource, hashService).unsafeRunSync
   }
 }

core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala

@@ -49,9 +49,16 @@ class SyncSuite
     ))
 
   def invokeSubject(storageService: StorageService,
                     hashService: HashService,
-                    configOptions: ConfigOptions): Either[List[String], Stream[Action]] = {
-    Synchronise.createPlan(storageService, hashService, configOptions).value.unsafeRunSync
+                    configOptions: ConfigOptions): Either[List[String], SyncPlan] = {
+    PlanBuilder.createPlan(storageService, hashService, configOptions).value.unsafeRunSync
+  }
+
+  def invokeSubjectForActions(storageService: StorageService,
+                              hashService: HashService,
+                              configOptions: ConfigOptions): Either[List[String], Stream[Action]] = {
+    invokeSubject(storageService, hashService, configOptions)
+      .map(_.actions)
   }
 
   describe("when all files should be uploaded") {
@@ -60,9 +67,9 @@ class SyncSuite
       byKey = Map()))
     it("uploads all files") {
       val expected = Right(Set(
-        ToUpload(testBucket, rootFile),
-        ToUpload(testBucket, leafFile)))
-      val result = invokeSubject(storageService, hashService, configOptions)
+        ToUpload(testBucket, rootFile, rootFile.file.length),
+        ToUpload(testBucket, leafFile, leafFile.file.length)))
+      val result = invokeSubjectForActions(storageService, hashService, configOptions)
       assertResult(expected)(result.map(_.toSet))
     }
   }
@@ -81,7 +88,7 @@ class SyncSuite
     val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
     it("no actions") {
       val expected = Stream()
-      val result = invokeSubject(storageService, hashService, configOptions)
+      val result = invokeSubjectForActions(storageService, hashService, configOptions)
       assert(result.isRight)
       assertResult(expected)(result.right.get)
     }
@@ -100,10 +107,10 @@ class SyncSuite
     val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
     it("copies the file and deletes the original") {
       val expected = Stream(
-        ToCopy(testBucket, sourceKey, Root.hash, targetKey),
-        ToDelete(testBucket, sourceKey)
+        ToCopy(testBucket, sourceKey, Root.hash, targetKey, rootFile.file.length),
+        ToDelete(testBucket, sourceKey, 0L)
       )
-      val result = invokeSubject(storageService, hashService, configOptions)
+      val result = invokeSubjectForActions(storageService, hashService, configOptions)
       assert(result.isRight)
       assertResult(expected)(result.right.get)
     }
@@ -128,9 +135,9 @@ class SyncSuite
     val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
     it("deleted key") {
       val expected = Stream(
-        ToDelete(testBucket, deletedKey)
+        ToDelete(testBucket, deletedKey, 0L)
      )
-      val result = invokeSubject(storageService, hashService, configOptions)
+      val result = invokeSubjectForActions(storageService, hashService, configOptions)
       assert(result.isRight)
       assertResult(expected)(result.right.get)
     }
@@ -146,7 +153,7 @@ class SyncSuite
     val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
     it("is not uploaded") {
       val expected = Stream()
-      val result = invokeSubject(storageService, hashService, ConfigOption.Exclude("leaf") :: configOptions)
+      val result = invokeSubjectForActions(storageService, hashService, ConfigOption.Exclude("leaf") :: configOptions)
       assert(result.isRight)
       assertResult(expected)(result.right.get)
     }
@@ -157,7 +164,8 @@ class SyncSuite
     extends StorageService {
 
     override def listObjects(bucket: Bucket,
-                             prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] =
+                             prefix: RemoteKey)
+                            (implicit l: Logger): EitherT[IO, String, S3ObjectsData] =
       EitherT.liftF(IO.pure(s3ObjectsData))
 
     override def upload(localFile: LocalFile,

domain/src/main/scala/net/kemitix/thorp/domain/SyncTotals.scala

@@ -0,0 +1,5 @@
+package net.kemitix.thorp.domain
+
+case class SyncTotals(count: Long = 0L,
+                      totalSizeBytes: Long = 0L,
+                      sizeUploadedBytes: Long = 0L)

domain/src/main/scala/net/kemitix/thorp/domain/UploadEventListener.scala

@@ -3,15 +3,17 @@ package net.kemitix.thorp.domain
 import net.kemitix.thorp.domain.UploadEvent.RequestEvent
 import net.kemitix.thorp.domain.UploadEventLogger.logRequestCycle
 
-class UploadEventListener(localFile: LocalFile) {
+class UploadEventListener(localFile: LocalFile,
+                          index: Int,
+                          syncTotals: SyncTotals,
+                          totalBytesSoFar: Long) {
 
   var bytesTransferred = 0L
 
-  def listener: UploadEvent => Unit =
-    {
+  def listener: UploadEvent => Unit = {
     case e: RequestEvent =>
       bytesTransferred += e.transferred
-      logRequestCycle(localFile, e, bytesTransferred)
+      logRequestCycle(localFile, e, bytesTransferred, index, syncTotals, totalBytesSoFar)
     case _ => ()
   }
 }

domain/src/main/scala/net/kemitix/thorp/domain/UploadEventLogger.scala

@@ -10,19 +10,35 @@ trait UploadEventLogger {
 
   def logRequestCycle(localFile: LocalFile,
                       event: RequestEvent,
-                      bytesTransferred: Long): Unit = {
+                      bytesTransferred: Long,
+                      index: Int,
+                      syncTotals: SyncTotals,
+                      totalBytesSoFar: Long): Unit = {
     val remoteKey = localFile.remoteKey.key
     val fileLength = localFile.file.length
+    val statusHeight = 7
     if (bytesTransferred < fileLength) {
-      val bar = progressBar(bytesTransferred, fileLength.toDouble, Terminal.width)
-      val transferred = sizeInEnglish(bytesTransferred)
-      val fileSize = sizeInEnglish(fileLength)
-      val message = s"${GREEN}Uploaded $transferred of $fileSize $RESET: $remoteKey$eraseLineForward"
-      println(s"$message\n$bar${Terminal.cursorPrevLine() * 2}")
+      println(
+        s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" +
+          statusWithBar(" File", sizeInEnglish, bytesTransferred, fileLength) +
+          statusWithBar("Files", l => l.toString, index, syncTotals.count) +
+          statusWithBar(" Size", sizeInEnglish, bytesTransferred + totalBytesSoFar, syncTotals.totalSizeBytes) +
+          s"${Terminal.cursorPrevLine(statusHeight)}")
     } else
-      println(s"${GREEN}Uploaded:$RESET $remoteKey$eraseLineForward")
+      println(s"${GREEN}Uploaded:$RESET $remoteKey$eraseToEndOfScreen")
   }
 
+  private def statusWithBar(label: String,
+                            format: Long => String,
+                            current: Long,
+                            max: Long,
+                            pre: Long = 0): String = {
+    val percent = f"${(current * 100) / max}%2d"
+    s"$GREEN$label:$RESET ($percent%) ${format(current)} of ${format(max)}" +
+      (if (pre > 0) s" (pre-synced ${format(pre)})"
+       else "") + s"$eraseLineForward\n" +
+      progressBar(current, max, Terminal.width)
+  }
 }
 
 object UploadEventLogger extends UploadEventLogger
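statusWithBar derives its percentage with integer arithmetic: (current * 100) / max truncates toward zero, and would throw on a run whose total is zero, though in practice the logger only fires once at least one upload is in flight. A guarded sketch of the same calculation (the object and function names are ours):

object PercentSketch extends App {
  // the integer percentage used by statusWithBar, with a zero-total guard added
  def percentOf(current: Long, max: Long): String =
    if (max == 0) " 0" else f"${(current * 100) / max}%2d"

  assert(percentOf(56, 113).trim == "49") // truncates, never rounds up
  assert(percentOf(0, 0).trim == "0")     // the guard case
}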

storage-api/src/main/scala/net/kemitix/thorp/storage/api/StorageService.scala

@@ -9,7 +9,8 @@ trait StorageService {
 
   def shutdown: IO[StorageQueueEvent]
 
   def listObjects(bucket: Bucket,
-                  prefix: RemoteKey): EitherT[IO, String, S3ObjectsData]
+                  prefix: RemoteKey)
+                 (implicit l: Logger): EitherT[IO, String, S3ObjectsData]
 
   def upload(localFile: LocalFile,
              bucket: Bucket,

storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Lister.scala

@@ -5,7 +5,7 @@ import cats.effect.IO
 import com.amazonaws.services.s3.AmazonS3
 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
 import net.kemitix.thorp.domain
-import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData}
+import net.kemitix.thorp.domain.{Bucket, Logger, RemoteKey, S3ObjectsData}
 import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
 import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
@@ -14,30 +14,25 @@ import scala.util.Try
 
 class Lister(amazonS3: AmazonS3) {
 
+  private type Token = String
+  private type Batch = (Stream[S3ObjectSummary], Option[Token])
+
   def listObjects(bucket: Bucket,
-                  prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] = {
-
-    type Token = String
-    type Batch = (Stream[S3ObjectSummary], Option[Token])
-
-    val requestMore = (token:Token) => new ListObjectsV2Request()
+                  prefix: RemoteKey)
+                 (implicit l: Logger): EitherT[IO, String, S3ObjectsData] = {
+
+    val requestMore = (token: Token) => new ListObjectsV2Request()
       .withBucketName(bucket.name)
       .withPrefix(prefix.key)
      .withContinuationToken(token)
 
     def fetchBatch: ListObjectsV2Request => EitherT[IO, String, Batch] =
-      request =>
-        EitherT {
-          IO.pure {
-            Try(amazonS3.listObjectsV2(request))
-              .map { result =>
-                val more: Option[Token] =
-                  if (result.isTruncated) Some(result.getNextContinuationToken)
-                  else None
-                (result.getObjectSummaries.asScala.toStream, more)
-              }.toEither.swap.map(e => e.getMessage).swap
-          }
-        }
+      request => EitherT {
+        for {
+          _ <- ListerLogger.logFetchBatch
+          batch <- tryFetchBatch(request)
+        } yield batch
+      }
 
     def fetchMore(more: Option[Token]): EitherT[IO, String, Stream[S3ObjectSummary]] = {
       more match {
@@ -60,4 +55,15 @@ class Lister(amazonS3: AmazonS3) {
     } yield domain.S3ObjectsData(byHash(summaries), byKey(summaries))
   }
 
+  private def tryFetchBatch(request: ListObjectsV2Request): IO[Either[String, (Stream[S3ObjectSummary], Option[Token])]] = {
+    IO {
+      Try(amazonS3.listObjectsV2(request))
+        .map { result =>
+          val more: Option[Token] =
+            if (result.isTruncated) Some(result.getNextContinuationToken)
+            else None
+          (result.getObjectSummaries.asScala.toStream, more)
+        }.toEither.swap.map(e => e.getMessage).swap
+    }
+  }
 }
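After the refactor, the paging logic reads as: request a batch, log, and keep following the continuation token that tryFetchBatch surfaces alongside each page of summaries. A toy sketch of that loop, with an in-memory map standing in for amazonS3.listObjectsV2 (all names here are illustrative):

object PaginationSketch extends App {
  type Token = String

  // fake pages: current token => (summaries, next token)
  val pages: Map[Option[Token], (List[String], Option[Token])] = Map(
    None       -> (List("key-a", "key-b"), Some("t1")),
    Some("t1") -> (List("key-c"), None)
  )

  // fetch a batch, then recurse while a continuation token is present
  def fetchAll(token: Option[Token] = None): List[String] = {
    val (batch, more) = pages(token)
    batch ++ more.map(t => fetchAll(Some(t))).getOrElse(List.empty)
  }

  println(fetchAll()) // List(key-a, key-b, key-c)
}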

storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ListerLogger.scala

@@ -0,0 +1,9 @@
+package net.kemitix.thorp.storage.aws
+
+import cats.effect.IO
+import net.kemitix.thorp.domain.Logger
+
+trait ListerLogger {
+  def logFetchBatch(implicit l: Logger): IO[Unit] = l.info("Fetching remote summaries...")
+}
+object ListerLogger extends ListerLogger

storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3StorageService.scala

@@ -18,7 +18,8 @@ class S3StorageService(amazonS3Client: => AmazonS3,
 
   lazy val deleter = new Deleter(amazonS3Client)
 
   override def listObjects(bucket: Bucket,
-                           prefix: RemoteKey): EitherT[IO, String, S3ObjectsData] =
+                           prefix: RemoteKey)
+                          (implicit l: Logger): EitherT[IO, String, S3ObjectsData] =
     objectLister.listObjects(bucket, prefix)
 
   override def copy(bucket: Bucket,

storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala

@@ -116,7 +116,7 @@ class StorageServiceSuite
       LocalFile.resolve("root-file", md5HashMap(Root.hash), source, KeyGenerator.generateKey(source, prefix))
     val bucket = Bucket("a-bucket")
     val remoteKey = RemoteKey("prefix/root-file")
-    val uploadEventListener = new UploadEventListener(localFile)
+    val uploadEventListener = new UploadEventListener(localFile, 1, SyncTotals(), 0L)
 
     val upload = stub[Upload]
     (amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload)

storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala

@@ -39,7 +39,7 @@ class UploaderSuite
   val returnedKey = RemoteKey("returned-key")
   val returnedHash = MD5Hash("returned-hash")
   val bigFile = LocalFile.resolve("small-file", md5HashMap(MD5Hash("the-hash")), source, fileToKey)
-  val uploadEventListener = new UploadEventListener(bigFile)
+  val uploadEventListener = new UploadEventListener(bigFile, 1, SyncTotals(), 0L)
   val amazonS3 = mock[AmazonS3]
   val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
   val uploader = new Uploader(amazonS3TransferManager)