Apply scalafmt (#108)

* [scalafmt] Add .scalafmt

* [sbt] Add unused import warning and make them errors

* [core] ConfigurationBuilder remove unused import

* [core] ConfigurationBuilder apply scalafmt

* [domain] reformat

* [storage-api] reformat

* [core] reformat and some refactoring

* [storage-aws] reformat

* [cli] reformat

* [core] post rebase fix up

* [storage-aws] UploaderSuite tidy up

* [domain] MD5Hash tweak

Without doing this we have a file that we don't create a valid md5
hash for. See #103

* [storage-aws] UploaderSuite remove unused import

* [storage-aws] StorageServiceSuite reformatted

* [sbt] consistent settings for all modules

Can't enable as stubbing TransferManager attempts to use the default
constructor for TransferManager.

* [storage-aws] Add AmazonTransferManager

This gives us an interface we can safely stub in unit tests without
the compiler failing the build because TransferManager's default
constructor is deprecated.

* [storage-aws] Add AmazonUpload

Prevents deprecation errors due to deprecated parameters on Transfer.

* [sbt] mode import to top of file
This commit is contained in:
Paul Campbell 2019-07-16 07:56:54 +01:00 committed by GitHub
parent dfb885b76d
commit afc55354e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
92 changed files with 2095 additions and 1454 deletions

2
.scalafmt.conf Normal file
View file

@ -0,0 +1,2 @@
maxColumn = 80
align = more

View file

@ -1,3 +1,5 @@
import sbtassembly.AssemblyPlugin.defaultShellScript
inThisBuild(List(
organization := "net.kemitix.thorp",
homepage := Some(url("https://github.com/kemitix/thorp")),
@ -15,6 +17,15 @@ inThisBuild(List(
val commonSettings = Seq(
sonatypeProfileName := "net.kemitix",
scalaVersion := "2.12.8",
scalacOptions ++= Seq(
"-Ywarn-unused-import",
"-Xfatal-warnings",
"-feature",
"-deprecation",
"-unchecked",
"-language:postfixOps",
"-language:higherKinds",
"-Ypartial-unification"),
test in assembly := {}
)
@ -43,15 +54,7 @@ val awsSdkDependencies = Seq(
val catsEffectsSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % "1.3.1"
),
// recommended for cats-effects
scalacOptions ++= Seq(
"-feature",
"-deprecation",
"-unchecked",
"-language:postfixOps",
"-language:higherKinds",
"-Ypartial-unification")
)
)
// cli -> thorp-lib -> storage-aws -> core -> storage-api -> domain
@ -60,7 +63,6 @@ lazy val thorp = (project in file("."))
.settings(commonSettings)
.aggregate(cli, `thorp-lib`, `storage-aws`, core, `storage-api`, domain)
import sbtassembly.AssemblyPlugin.defaultShellScript
lazy val cli = (project in file("cli"))
.settings(commonSettings)
.settings(mainClass in assembly := Some("net.kemitix.thorp.cli.Main"))

View file

@ -35,7 +35,7 @@ object ParseArgs {
.text("Include only matching paths"),
opt[String]('x', "exclude")
.unbounded()
.action((str,cos) => ConfigOption.Exclude(str) :: cos)
.action((str, cos) => ConfigOption.Exclude(str) :: cos)
.text("Exclude matching paths"),
opt[Unit]('d', "debug")
.action((_, cos) => ConfigOption.Debug() :: cos)
@ -50,7 +50,8 @@ object ParseArgs {
}
def apply(args: List[String]): Option[ConfigOptions] =
OParser.parse(configParser, args, List())
OParser
.parse(configParser, args, List())
.map(ConfigOptions)
}

View file

@ -9,11 +9,14 @@ class PrintLogger(isDebug: Boolean = false) extends Logger {
if (isDebug) IO(println(s"[ DEBUG] $message"))
else IO.unit
override def info(message: => String): IO[Unit] = IO(println(s"[ INFO] $message"))
override def info(message: => String): IO[Unit] =
IO(println(s"[ INFO] $message"))
override def warn(message: String): IO[Unit] = IO(println(s"[ WARN] $message"))
override def warn(message: String): IO[Unit] =
IO(println(s"[ WARN] $message"))
override def error(message: String): IO[Unit] = IO(println(s"[ ERROR] $message"))
override def error(message: String): IO[Unit] =
IO(println(s"[ ERROR] $message"))
override def withDebug(debug: Boolean): Logger =
if (isDebug == debug) this

View file

@ -17,7 +17,11 @@ trait Program extends PlanBuilder {
} yield ExitCode.Success
else
for {
syncPlan <- createPlan(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors)
syncPlan <- createPlan(
defaultStorageService,
defaultHashService,
cliOptions
).valueOrF(handleErrors)
archive <- thorpArchive(cliOptions, syncPlan)
events <- handleActions(archive, syncPlan)
_ <- defaultStorageService.shutdown
@ -25,8 +29,10 @@ trait Program extends PlanBuilder {
} yield ExitCode.Success
}
def thorpArchive(cliOptions: ConfigOptions,
syncPlan: SyncPlan): IO[ThorpArchive] =
def thorpArchive(
cliOptions: ConfigOptions,
syncPlan: SyncPlan
): IO[ThorpArchive] =
IO.pure(
UnversionedMirrorArchive.default(
defaultStorageService,
@ -34,25 +40,24 @@ trait Program extends PlanBuilder {
syncPlan.syncTotals
))
private def handleErrors(implicit logger: Logger): List[String] => IO[SyncPlan] = {
errors => {
private def handleErrors(
implicit logger: Logger
): List[String] => IO[SyncPlan] = errors => {
for {
_ <- logger.error("There were errors:")
_ <- errors.map(error => logger.error(s" - $error")).sequence
} yield SyncPlan()
}
}
private def handleActions(archive: ThorpArchive,
syncPlan: SyncPlan)
(implicit l: Logger): IO[Stream[StorageQueueEvent]] = {
private def handleActions(
archive: ThorpArchive,
syncPlan: SyncPlan
)(implicit l: Logger): IO[Stream[StorageQueueEvent]] = {
type Accumulator = (Stream[IO[StorageQueueEvent]], Long)
val zero: Accumulator = (Stream(), syncPlan.syncTotals.totalSizeBytes)
val (actions, _) = syncPlan.actions
.zipWithIndex
.reverse
.foldLeft(zero) {
(acc: Accumulator, indexedAction) => {
val (actions, _) = syncPlan.actions.zipWithIndex.reverse
.foldLeft(zero) { (acc: Accumulator, indexedAction) =>
{
val (stream, bytesToDo) = acc
val (action, index) = indexedAction
val remainingBytes = bytesToDo - action.size

View file

@ -24,13 +24,11 @@ class ParseArgsTest extends FunSpec {
}
describe("when source is a relative path to a directory") {
val result = invokeWithSource(pathTo("."))
it("should succeed") {pending}
it("should succeed") { pending }
}
describe("when there are multiple sources") {
val args = List(
"--source", "path1",
"--source", "path2",
"--bucket", "bucket")
val args =
List("--source", "path1", "--source", "path2", "--bucket", "bucket")
it("should get multiple sources") {
val expected = Some(Set("path1", "path2").map(Paths.get(_)))
val configOptions = ParseArgs(args)

View file

@ -17,11 +17,16 @@ class ProgramTest extends FunSpec {
val sourcePath: Path = source.toPath
val bucket: Bucket = Bucket("aBucket")
val hash: MD5Hash = MD5Hash("aHash")
val copyAction: Action = ToCopy(bucket, RemoteKey("copy-me"), hash, RemoteKey("overwrite-me"), 17L)
val uploadAction: Action = ToUpload(bucket, LocalFile.resolve("aFile", Map(), sourcePath, _ => RemoteKey("upload-me")), 23L)
val copyAction: Action =
ToCopy(bucket, RemoteKey("copy-me"), hash, RemoteKey("overwrite-me"), 17L)
val uploadAction: Action = ToUpload(
bucket,
LocalFile.resolve("aFile", Map(), sourcePath, _ => RemoteKey("upload-me")),
23L)
val deleteAction: Action = ToDelete(bucket, RemoteKey("delete-me"), 0L)
val configOptions: ConfigOptions = ConfigOptions(options = List(
val configOptions: ConfigOptions = ConfigOptions(
options = List(
ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions
))
@ -36,31 +41,30 @@ class ProgramTest extends FunSpec {
}
}
object TestProgram extends Program with TestPlanBuilder {
val thorpArchive: ActionCaptureArchive = new ActionCaptureArchive
override def thorpArchive(cliOptions: ConfigOptions, syncPlan: SyncPlan): IO[ThorpArchive] =
IO.pure(thorpArchive)
}
trait TestPlanBuilder extends PlanBuilder {
override def createPlan(storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions)
(implicit l: Logger): EitherT[IO, List[String], SyncPlan] = {
EitherT.right(IO(SyncPlan(Stream(copyAction, uploadAction, deleteAction))))
configOptions: ConfigOptions)(
implicit l: Logger): EitherT[IO, List[String], SyncPlan] = {
EitherT.right(
IO(SyncPlan(Stream(copyAction, uploadAction, deleteAction))))
}
}
class ActionCaptureArchive extends ThorpArchive {
var actions: List[Action] = List[Action]()
override def update(index: Int,
action: Action,
totalBytesSoFar: Long)
(implicit l: Logger): Stream[IO[StorageQueueEvent]] = {
override def update(index: Int, action: Action, totalBytesSoFar: Long)(
implicit l: Logger): Stream[IO[StorageQueueEvent]] = {
actions = action :: actions
Stream()
}
}
}
object TestProgram extends Program with TestPlanBuilder {
val thorpArchive: ActionCaptureArchive = new ActionCaptureArchive
override def thorpArchive(cliOptions: ConfigOptions,
syncPlan: SyncPlan): IO[ThorpArchive] =
IO.pure(thorpArchive)
}
}

View file

@ -8,22 +8,30 @@ sealed trait Action {
}
object Action {
final case class DoNothing(bucket: Bucket,
final case class DoNothing(
bucket: Bucket,
remoteKey: RemoteKey,
size: Long) extends Action
size: Long
) extends Action
final case class ToUpload(bucket: Bucket,
final case class ToUpload(
bucket: Bucket,
localFile: LocalFile,
size: Long) extends Action
size: Long
) extends Action
final case class ToCopy(bucket: Bucket,
final case class ToCopy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey,
size: Long) extends Action
size: Long
) extends Action
final case class ToDelete(bucket: Bucket,
final case class ToDelete(
bucket: Bucket,
remoteKey: RemoteKey,
size: Long) extends Action
size: Long
) extends Action
}

View file

@ -5,70 +5,73 @@ import net.kemitix.thorp.domain._
object ActionGenerator {
def remoteNameNotAlreadyQueued(localFile: LocalFile,
previousActions: Stream[Action]): Boolean = {
val key = localFile.remoteKey.key
!previousActions.exists {
case ToUpload(_, lf, _) => lf.remoteKey.key equals key
case _ => false
}
}
def createActions(s3MetaData: S3MetaData,
previousActions: Stream[Action])
(implicit c: Config): Stream[Action] =
def createActions(
s3MetaData: S3MetaData,
previousActions: Stream[Action]
)(implicit c: Config): Stream[Action] =
s3MetaData match {
// #1 local exists, remote exists, remote matches - do nothing
case S3MetaData(localFile, _, Some(RemoteMetaData(remoteKey, remoteHash, _)))
if localFile.matches(remoteHash)
=> doNothing(c.bucket, remoteKey)
case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _)))
if localFile.matches(hash) =>
doNothing(c.bucket, key)
// #2 local exists, remote is missing, other matches - copy
case S3MetaData(localFile, otherMatches, None)
if otherMatches.nonEmpty
=> copyFile(c.bucket, localFile, otherMatches)
case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty =>
copyFile(c.bucket, localFile, matchByHash)
// #3 local exists, remote is missing, other no matches - upload
case S3MetaData(localFile, otherMatches, None)
if otherMatches.isEmpty &&
remoteNameNotAlreadyQueued(localFile, previousActions)
=> uploadFile(c.bucket, localFile)
case S3MetaData(localFile, matchByHash, None)
if matchByHash.isEmpty &&
isUploadAlreadyQueued(previousActions)(localFile) =>
uploadFile(c.bucket, localFile)
// #4 local exists, remote exists, remote no match, other matches - copy
case S3MetaData(localFile, otherMatches, Some(RemoteMetaData(_, remoteHash, _)))
if !localFile.matches(remoteHash) &&
otherMatches.nonEmpty
=> copyFile(c.bucket, localFile, otherMatches)
case S3MetaData(localFile, matchByHash, Some(RemoteMetaData(_, hash, _)))
if !localFile.matches(hash) &&
matchByHash.nonEmpty =>
copyFile(c.bucket, localFile, matchByHash)
// #5 local exists, remote exists, remote no match, other no matches - upload
case S3MetaData(localFile, hashMatches, Some(_))
if hashMatches.isEmpty
=> uploadFile(c.bucket, localFile)
case S3MetaData(localFile, matchByHash, Some(_)) if matchByHash.isEmpty =>
uploadFile(c.bucket, localFile)
// fallback
case S3MetaData(localFile, _, _) =>
doNothing(c.bucket, localFile.remoteKey)
}
private def doNothing(bucket: Bucket,
remoteKey: RemoteKey) =
Stream(
DoNothing(bucket, remoteKey, 0L))
def isUploadAlreadyQueued(
previousActions: Stream[Action]
)(
localFile: LocalFile
): Boolean = {
!previousActions.exists {
case ToUpload(_, lf, _) => lf.remoteKey.key equals localFile.remoteKey.key
case _ => false
}
}
private def uploadFile(bucket: Bucket,
localFile: LocalFile) =
Stream(
ToUpload(bucket, localFile, localFile.file.length))
private def doNothing(
bucket: Bucket,
remoteKey: RemoteKey
) =
Stream(DoNothing(bucket, remoteKey, 0L))
private def copyFile(bucket: Bucket,
private def uploadFile(
bucket: Bucket,
localFile: LocalFile
) =
Stream(ToUpload(bucket, localFile, localFile.file.length))
private def copyFile(
bucket: Bucket,
localFile: LocalFile,
matchByHash: Set[RemoteMetaData]): Stream[Action] = {
val headOption = matchByHash.headOption
headOption.toStream.map { remoteMetaData =>
val sourceKey = remoteMetaData.remoteKey
val hash = remoteMetaData.hash
ToCopy(bucket, sourceKey, hash, localFile.remoteKey, localFile.file.length)
}
matchByHash: Set[RemoteMetaData]
): Stream[Action] =
matchByHash
.map { remoteMetaData =>
ToCopy(bucket,
remoteMetaData.remoteKey,
remoteMetaData.hash,
localFile.remoteKey,
localFile.file.length)
}
.toStream
.take(1)
}

View file

@ -10,15 +10,11 @@ sealed trait ConfigOption {
}
object ConfigOption {
case object Version extends ConfigOption {
override def update(config: Config): Config = config
}
case object BatchMode extends ConfigOption {
override def update(config: Config): Config = config.copy(batchMode = true)
}
case class Source(path: Path) extends ConfigOption {
override def update(config: Config): Config = config.copy(sources = config.sources ++ path)
override def update(config: Config): Config =
config.copy(sources = config.sources ++ path)
}
case class Bucket(name: String) extends ConfigOption {
override def update(config: Config): Config =
if (config.bucket.name.isEmpty)
@ -26,6 +22,7 @@ object ConfigOption {
else
config
}
case class Prefix(path: String) extends ConfigOption {
override def update(config: Config): Config =
if (config.prefix.key.isEmpty)
@ -33,15 +30,29 @@ object ConfigOption {
else
config
}
case class Include(pattern: String) extends ConfigOption {
override def update(config: Config): Config = config.copy(filters = domain.Filter.Include(pattern) :: config.filters)
override def update(config: Config): Config =
config.copy(filters = domain.Filter.Include(pattern) :: config.filters)
}
case class Exclude(pattern: String) extends ConfigOption {
override def update(config: Config): Config = config.copy(filters = domain.Filter.Exclude(pattern) :: config.filters)
override def update(config: Config): Config =
config.copy(filters = domain.Filter.Exclude(pattern) :: config.filters)
}
case class Debug() extends ConfigOption {
override def update(config: Config): Config = config.copy(debug = true)
}
case object Version extends ConfigOption {
override def update(config: Config): Config = config
}
case object BatchMode extends ConfigOption {
override def update(config: Config): Config = config.copy(batchMode = true)
}
case object IgnoreUserOptions extends ConfigOption {
override def update(config: Config): Config = config
}

View file

@ -2,10 +2,14 @@ package net.kemitix.thorp.core
import cats.Semigroup
case class ConfigOptions(options: List[ConfigOption] = List())
extends Semigroup[ConfigOptions] {
case class ConfigOptions(
options: List[ConfigOption] = List()
) extends Semigroup[ConfigOptions] {
override def combine(x: ConfigOptions, y: ConfigOptions): ConfigOptions =
override def combine(
x: ConfigOptions,
y: ConfigOptions
): ConfigOptions =
x ++ y
def ++(other: ConfigOptions): ConfigOptions =

View file

@ -19,15 +19,16 @@ trait ConfigQuery {
configOptions contains ConfigOption.IgnoreGlobalOptions
def sources(configOptions: ConfigOptions): Sources = {
val paths = configOptions.options.flatMap( {
val paths = configOptions.options.flatMap {
case ConfigOption.Source(sourcePath) => Some(sourcePath)
case _ => None
})
}
Sources(paths match {
case List() => List(Paths.get(System.getenv("PWD")))
case _ => paths
})
}
}
object ConfigQuery extends ConfigQuery

View file

@ -10,18 +10,12 @@ sealed trait ConfigValidator {
type ValidationResult[A] = ValidatedNec[ConfigValidation, A]
def validateSourceIsDirectory(source: Path): ValidationResult[Path] =
if(source.toFile.isDirectory) source.validNec
else ConfigValidation.SourceIsNotADirectory.invalidNec
def validateSourceIsReadable(source: Path): ValidationResult[Path] =
if(source.toFile.canRead) source.validNec
else ConfigValidation.SourceIsNotReadable.invalidNec
def validateSource(source: Path): ValidationResult[Path] =
validateSourceIsDirectory(source)
.andThen(s =>
validateSourceIsReadable(s))
def validateConfig(
config: Config): Validated[NonEmptyChain[ConfigValidation], Config] =
(
validateSources(config.sources),
validateBucket(config.bucket)
).mapN((_, _) => config)
def validateBucket(bucket: Bucket): ValidationResult[Bucket] =
if (bucket.name.isEmpty) ConfigValidation.BucketNameIsMissing.invalidNec
@ -29,14 +23,21 @@ sealed trait ConfigValidator {
def validateSources(sources: Sources): ValidationResult[Sources] =
sources.paths
.map(validateSource).sequence
.map(validateSource)
.sequence
.map(_ => sources)
def validateConfig(config: Config): Validated[NonEmptyChain[ConfigValidation], Config] =
(
validateSources(config.sources),
validateBucket(config.bucket)
).mapN((_, _) => config)
def validateSource(source: Path): ValidationResult[Path] =
validateSourceIsDirectory(source)
.andThen(s => validateSourceIsReadable(s))
def validateSourceIsDirectory(source: Path): ValidationResult[Path] =
if (source.toFile.isDirectory) source.validNec
else ConfigValidation.SourceIsNotADirectory.invalidNec
def validateSourceIsReadable(source: Path): ValidationResult[Path] =
if (source.toFile.canRead) source.validNec
else ConfigValidation.SourceIsNotReadable.invalidNec
}
object ConfigValidator extends ConfigValidator

View file

@ -1,13 +1,12 @@
package net.kemitix.thorp.core
import java.nio.file.{Files, Path, Paths}
import java.nio.file.{Path, Paths}
import cats.data.NonEmptyChain
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.core.ConfigValidator.validateConfig
import net.kemitix.thorp.core.ParseConfigFile.parseFile
import net.kemitix.thorp.domain.{Config, Sources}
import net.kemitix.thorp.domain.Config
/**
* Builds a configuration from settings in a file within the
@ -15,33 +14,42 @@ import net.kemitix.thorp.domain.{Config, Sources}
*/
trait ConfigurationBuilder {
def buildConfig(priorityOptions: ConfigOptions): IO[Either[NonEmptyChain[ConfigValidation], Config]] = {
val sources = ConfigQuery.sources(priorityOptions)
private val sourceConfigFilename = ".thorp.config"
private val userConfigFilename = ".config/thorp.conf"
private val globalConfig = Paths.get("/etc/thorp.conf")
private val userHome = Paths.get(System.getProperty("user.home"))
private val pwd = Paths.get(System.getenv("PWD"))
def buildConfig(priorityOpts: ConfigOptions)
: IO[Either[NonEmptyChain[ConfigValidation], Config]] = {
val sources = ConfigQuery.sources(priorityOpts)
for {
sourceOptions <- SourceConfigLoader.loadSourceConfigs(sources)
userOptions <- userOptions(priorityOptions ++ sourceOptions)
globalOptions <- globalOptions(priorityOptions ++ sourceOptions ++ userOptions)
collected = priorityOptions ++ sourceOptions ++ userOptions ++ globalOptions
userOptions <- userOptions(priorityOpts ++ sourceOptions)
globalOptions <- globalOptions(priorityOpts ++ sourceOptions ++ userOptions)
collected = priorityOpts ++ sourceOptions ++ userOptions ++ globalOptions
config = collateOptions(collected)
} yield validateConfig(config).toEither
}
private def userOptions(higherPriorityOptions: ConfigOptions): IO[ConfigOptions] =
if (ConfigQuery.ignoreUserOptions(higherPriorityOptions)) IO(ConfigOptions())
else readFile(userHome, ".config/thorp.conf")
private val emptyConfig = IO(ConfigOptions())
private def globalOptions(higherPriorityOptions: ConfigOptions): IO[ConfigOptions] =
if (ConfigQuery.ignoreGlobalOptions(higherPriorityOptions)) IO(ConfigOptions())
else parseFile(Paths.get("/etc/thorp.conf"))
private def userOptions(priorityOpts: ConfigOptions) =
if (ConfigQuery.ignoreUserOptions(priorityOpts)) emptyConfig
else readFile(userHome, userConfigFilename)
private def userHome = Paths.get(System.getProperty("user.home"))
private def globalOptions(priorityOpts: ConfigOptions) =
if (ConfigQuery.ignoreGlobalOptions(priorityOpts)) emptyConfig
else parseFile(globalConfig)
private def readFile(source: Path, filename: String): IO[ConfigOptions] =
private def readFile(
source: Path,
filename: String
) =
parseFile(source.resolve(filename))
private def collateOptions(configOptions: ConfigOptions): Config = {
private def collateOptions(configOptions: ConfigOptions): Config =
configOptions.options.foldLeft(Config())((c, co) => co.update(c))
}
}

View file

@ -1,6 +1,8 @@
package net.kemitix.thorp.core
final case class Counters(uploaded: Int = 0,
final case class Counters(
uploaded: Int = 0,
deleted: Int = 0,
copied: Int = 0,
errors: Int = 0)
errors: Int = 0
)

View file

@ -6,13 +6,17 @@ import net.kemitix.thorp.domain.{RemoteKey, Sources}
object KeyGenerator {
def generateKey(sources: Sources,
prefix: RemoteKey)
(path: Path): RemoteKey = {
def generateKey(
sources: Sources,
prefix: RemoteKey
)(path: Path): RemoteKey = {
val source = sources.forPath(path)
val relativePath = source.relativize(path.toAbsolutePath)
RemoteKey(List(prefix.key, relativePath.toString)
.filterNot(_.isEmpty)
val relativePath = source.relativize(path.toAbsolutePath).toString
RemoteKey(
List(
prefix.key,
relativePath
).filter(_.nonEmpty)
.mkString("/"))
}

View file

@ -1,6 +1,5 @@
package net.kemitix.thorp.core
import java.io.File
import java.nio.file.Path
import cats.effect.IO
@ -11,56 +10,64 @@ import net.kemitix.thorp.storage.api.HashService
object LocalFileStream {
def findFiles(source: Path,
hashService: HashService)
(implicit c: Config,
logger: Logger): IO[LocalFiles] = {
private val emptyIOLocalFiles = IO.pure(LocalFiles())
def findFiles(
source: Path,
hashService: HashService
)(
implicit c: Config,
logger: Logger
): IO[LocalFiles] = {
val isIncluded: Path => Boolean = Filter.isIncluded(c.filters)
val pathToLocalFile: Path => IO[LocalFiles] = path =>
localFile(hashService, logger, c)(path)
def loop(path: Path): IO[LocalFiles] = {
def dirPaths(path: Path): IO[Stream[Path]] =
IO(listFiles(path))
.map(fs =>
Stream(fs: _*)
.map(_.toPath)
.filter(isIncluded))
def dirPaths(path: Path) =
listFiles(path)
.map(_.filter(isIncluded))
def recurseIntoSubDirectories(path: Path): IO[LocalFiles] =
def recurseIntoSubDirectories(path: Path) =
path.toFile match {
case f if f.isDirectory => loop(path)
case _ => localFile(hashService, path)
case _ => pathToLocalFile(path)
}
def recurse(paths: Stream[Path]): IO[LocalFiles] =
paths.foldLeft(IO.pure(LocalFiles()))((acc, path) =>
def recurse(paths: Stream[Path]) =
paths.foldLeft(emptyIOLocalFiles)(
(acc, path) =>
recurseIntoSubDirectories(path)
.flatMap(localFiles => acc.map(accLocalFiles => accLocalFiles ++ localFiles)))
.flatMap(localFiles =>
acc.map(accLocalFiles => accLocalFiles ++ localFiles)))
for {
_ <- logger.debug(s"- Entering: $path")
fs <- dirPaths(path)
lfs <- recurse(fs)
paths <- dirPaths(path)
localFiles <- recurse(paths)
_ <- logger.debug(s"- Leaving : $path")
} yield lfs
} yield localFiles
}
loop(source)
}
private def localFile(hashService: HashService,
path: Path)
(implicit l: Logger, c: Config) = {
def localFile(
hashService: HashService,
l: Logger,
c: Config
): Path => IO[LocalFiles] =
path => {
val file = path.toFile
val source = c.sources.forPath(path)
for {
hash <- hashService.hashLocalObject(path)
hash <- hashService.hashLocalObject(path)(l)
} yield
LocalFiles(
localFiles = Stream(
domain.LocalFile(
file,
LocalFiles(localFiles = Stream(
domain.LocalFile(file,
source.toFile,
hash,
generateKey(c.sources, c.prefix)(path))),
@ -68,9 +75,15 @@ object LocalFileStream {
totalSizeBytes = file.length)
}
//TODO: Change this to return an Either[IllegalArgumentException, Array[File]]
private def listFiles(path: Path): Array[File] = {
//TODO: Change this to return an Either[IllegalArgumentException, Stream[Path]]
private def listFiles(path: Path) = {
IO(
Option(path.toFile.listFiles)
.getOrElse(throw new IllegalArgumentException(s"Directory not found $path"))
.map { fs =>
Stream(fs: _*)
.map(_.toPath)
}
.getOrElse(
throw new IllegalArgumentException(s"Directory not found $path")))
}
}

View file

@ -2,13 +2,16 @@ package net.kemitix.thorp.core
import net.kemitix.thorp.domain.LocalFile
case class LocalFiles(localFiles: Stream[LocalFile] = Stream(),
case class LocalFiles(
localFiles: Stream[LocalFile] = Stream(),
count: Long = 0,
totalSizeBytes: Long = 0) {
totalSizeBytes: Long = 0
) {
def ++(append: LocalFiles): LocalFiles =
copy(localFiles = localFiles ++ append.localFiles,
copy(
localFiles = localFiles ++ append.localFiles,
count = count + append.count,
totalSizeBytes = totalSizeBytes + append.totalSizeBytes)
totalSizeBytes = totalSizeBytes + append.totalSizeBytes
)
}

View file

@ -29,50 +29,11 @@ object MD5HashGenerator {
def md5File(path: Path)(implicit logger: Logger): IO[MD5Hash] =
md5FileChunk(path, 0, path.toFile.length)
private def openFile(file: File, offset: Long) = IO {
val stream = new FileInputStream(file)
stream skip offset
stream
}
private def closeFile(fis: FileInputStream) = IO(fis.close())
private def readFile(file: File, offset: Long, endOffset: Long) =
for {
fis <- openFile(file, offset)
digest <- digestFile(fis, offset, endOffset)
_ <- closeFile(fis)
} yield digest
private def digestFile(fis: FileInputStream, offset: Long, endOffset: Long) =
IO {
val md5 = MessageDigest getInstance "MD5"
NumericRange(offset, endOffset, maxBufferSize)
.foreach(currentOffset => md5 update readToBuffer(fis, currentOffset, endOffset))
md5.digest
}
private def readToBuffer(fis: FileInputStream,
currentOffset: Long,
endOffset: Long) = {
val buffer =
if (nextBufferSize(currentOffset, endOffset) < maxBufferSize)
new Array[Byte](nextBufferSize(currentOffset, endOffset))
else defaultBuffer
fis read buffer
buffer
}
private def nextBufferSize(currentOffset: Long, endOffset: Long) = {
val toRead = endOffset - currentOffset
val result = Math.min(maxBufferSize, toRead)
result.toInt
}
def md5FileChunk(path: Path,
def md5FileChunk(
path: Path,
offset: Long,
size: Long)
(implicit logger: Logger): IO[MD5Hash] = {
size: Long
)(implicit logger: Logger): IO[MD5Hash] = {
val file = path.toFile
val endOffset = Math.min(offset + size, file.length)
for {
@ -83,4 +44,60 @@ object MD5HashGenerator {
} yield hash
}
private def readFile(
file: File,
offset: Long,
endOffset: Long
) =
for {
fis <- openFile(file, offset)
digest <- digestFile(fis, offset, endOffset)
_ <- closeFile(fis)
} yield digest
private def openFile(
file: File,
offset: Long
) = IO {
val stream = new FileInputStream(file)
stream skip offset
stream
}
private def closeFile(fis: FileInputStream) = IO(fis.close())
private def digestFile(
fis: FileInputStream,
offset: Long,
endOffset: Long
) =
IO {
val md5 = MessageDigest getInstance "MD5"
NumericRange(offset, endOffset, maxBufferSize)
.foreach(currentOffset =>
md5 update readToBuffer(fis, currentOffset, endOffset))
md5.digest
}
private def readToBuffer(
fis: FileInputStream,
currentOffset: Long,
endOffset: Long
) = {
val buffer =
if (nextBufferSize(currentOffset, endOffset) < maxBufferSize)
new Array[Byte](nextBufferSize(currentOffset, endOffset))
else defaultBuffer
fis read buffer
buffer
}
private def nextBufferSize(
currentOffset: Long,
endOffset: Long
) = {
val toRead = endOffset - currentOffset
Math.min(maxBufferSize, toRead).toInt
}
}

View file

@ -9,7 +9,8 @@ import scala.collection.JavaConverters._
trait ParseConfigFile {
def parseFile(filename: Path): IO[ConfigOptions] =
readFile(filename).map(ParseConfigLines.parseLines)
readFile(filename)
.map(ParseConfigLines.parseLines)
private def readFile(filename: Path) = {
if (Files.exists(filename)) readFileThatExists(filename)

View file

@ -7,27 +7,22 @@ import net.kemitix.thorp.core.ConfigOption._
trait ParseConfigLines {
def parseLines(lines: List[String]): ConfigOptions =
ConfigOptions(lines.flatMap(parseLine))
private val pattern = "^\\s*(?<key>\\S*)\\s*=\\s*(?<value>\\S*)\\s*$"
private val format = Pattern.compile(pattern)
def parseLines(lines: List[String]): ConfigOptions =
ConfigOptions(lines.flatMap(parseLine))
private def parseLine(str: String) =
format.matcher(str) match {
case m if m.matches => parseKeyValue(m.group("key"), m.group("value"))
case _ =>None
case _ => None
}
def truthy(value: String): Boolean =
value.toLowerCase match {
case "true" => true
case "yes" => true
case "enabled" => true
case _ => false
}
private def parseKeyValue(key: String, value: String): Option[ConfigOption] =
private def parseKeyValue(
key: String,
value: String
): Option[ConfigOption] =
key.toLowerCase match {
case "source" => Some(Source(Paths.get(value)))
case "bucket" => Some(Bucket(value))
@ -38,6 +33,14 @@ trait ParseConfigLines {
case _ => None
}
def truthy(value: String): Boolean =
value.toLowerCase match {
case "true" => true
case "yes" => true
case "enabled" => true
case _ => false
}
}
object ParseConfigLines extends ParseConfigLines

View file

@ -9,10 +9,11 @@ import net.kemitix.thorp.storage.api.{HashService, StorageService}
trait PlanBuilder {
def createPlan(storageService: StorageService,
def createPlan(
storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions)
(implicit l: Logger): EitherT[IO, List[String], SyncPlan] =
configOptions: ConfigOptions
)(implicit l: Logger): EitherT[IO, List[String], SyncPlan] =
EitherT(ConfigurationBuilder.buildConfig(configOptions))
.leftMap(errorMessages)
.flatMap(config => useValidConfig(storageService, hashService)(config, l))
@ -20,90 +21,113 @@ trait PlanBuilder {
def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] =
errors.map(cv => cv.errorMessage).toList
def removeDoNothing: Action => Boolean = {
def useValidConfig(
storageService: StorageService,
hashService: HashService
)(implicit c: Config, l: Logger): EitherT[IO, List[String], SyncPlan] =
for {
_ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.sources))
actions <- buildPlan(storageService, hashService)
} yield actions
private def buildPlan(
storageService: StorageService,
hashService: HashService
)(implicit c: Config, l: Logger) =
gatherMetadata(storageService, hashService)
.leftMap(List(_))
.map(assemblePlan)
def assemblePlan(
implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = {
case (remoteData, localData) =>
SyncPlan(
actions = createActions(remoteData, localData).filter(doesSomething),
syncTotals = SyncTotals(count = localData.count,
totalSizeBytes = localData.totalSizeBytes)
)
}
private def createActions(
remoteData: S3ObjectsData,
localData: LocalFiles
)(implicit c: Config): Stream[Action] =
actionsForLocalFiles(localData, remoteData) ++
actionsForRemoteKeys(remoteData)
def doesSomething: Action => Boolean = {
case _: DoNothing => false
case _ => true
}
def assemblePlan(implicit c: Config): ((S3ObjectsData, LocalFiles)) => SyncPlan = {
case (remoteData, localData) => {
val actions =
(actionsForLocalFiles(localData, remoteData) ++
actionsForRemoteKeys(remoteData))
.filter(removeDoNothing)
SyncPlan(
actions = actions,
syncTotals = SyncTotals(
count = localData.count,
totalSizeBytes = localData.totalSizeBytes))
}
}
private val emptyActionStream = Stream[Action]()
def useValidConfig(storageService: StorageService,
hashService: HashService)
(implicit c: Config, l: Logger): EitherT[IO, List[String], SyncPlan] = {
for {
_ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.sources))
actions <- gatherMetadata(storageService, hashService)
.leftMap(error => List(error))
.map(assemblePlan)
} yield actions
}
private def actionsForLocalFiles(
localData: LocalFiles,
remoteData: S3ObjectsData
)(implicit c: Config) =
localData.localFiles.foldLeft(emptyActionStream)((acc, lf) =>
createActionFromLocalFile(lf, remoteData, acc) ++ acc)
private def gatherMetadata(storageService: StorageService,
hashService: HashService)
(implicit l: Logger,
private def createActionFromLocalFile(
lf: LocalFile,
remoteData: S3ObjectsData,
previousActions: Stream[Action]
)(implicit c: Config) =
ActionGenerator.createActions(
S3MetaDataEnricher.getMetadata(lf, remoteData),
previousActions)
private def actionsForRemoteKeys(remoteData: S3ObjectsData)(
implicit c: Config) =
remoteData.byKey.keys.foldLeft(emptyActionStream)((acc, rk) =>
createActionFromRemoteKey(rk) #:: acc)
private def createActionFromRemoteKey(rk: RemoteKey)(implicit c: Config) =
if (rk.isMissingLocally(c.sources, c.prefix))
Action.ToDelete(c.bucket, rk, 0L)
else DoNothing(c.bucket, rk, 0L)
private def gatherMetadata(
storageService: StorageService,
hashService: HashService
)(implicit l: Logger,
c: Config): EitherT[IO, String, (S3ObjectsData, LocalFiles)] =
for {
remoteData <- fetchRemoteData(storageService)
localData <- EitherT.liftF(findLocalFiles(hashService))
} yield (remoteData, localData)
private def actionsForLocalFiles(localData: LocalFiles, remoteData: S3ObjectsData)
(implicit c: Config) =
localData.localFiles.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData, acc) ++ acc)
private def actionsForRemoteKeys(remoteData: S3ObjectsData)
(implicit c: Config) =
remoteData.byKey.keys.foldLeft(Stream[Action]())((acc, rk) => createActionFromRemoteKey(rk) #:: acc)
private def fetchRemoteData(storageService: StorageService)
(implicit c: Config, l: Logger) =
private def fetchRemoteData(
storageService: StorageService
)(implicit c: Config, l: Logger) =
storageService.listObjects(c.bucket, c.prefix)
private def findLocalFiles(hashService: HashService)
(implicit config: Config, l: Logger) =
private def findLocalFiles(
hashService: HashService
)(implicit config: Config, l: Logger) =
for {
_ <- SyncLogging.logFileScan
localFiles <- findFiles(hashService)
} yield localFiles
private def findFiles(hashService: HashService)
(implicit c: Config, l: Logger): IO[LocalFiles] = {
private def findFiles(
hashService: HashService
)(implicit c: Config, l: Logger) = {
val ioListLocalFiles = (for {
source <- c.sources.paths
} yield LocalFileStream.findFiles(source, hashService)).sequence
for {
listLocalFiles <- ioListLocalFiles
localFiles = listLocalFiles.foldRight(LocalFiles()){
(acc, moreLocalFiles) => {
localFiles = listLocalFiles.foldRight(LocalFiles()) {
(acc, moreLocalFiles) =>
{
acc ++ moreLocalFiles
}
}
} yield localFiles
}
private def createActionFromLocalFile(lf: LocalFile,
remoteData: S3ObjectsData,
previousActions: Stream[Action])
(implicit c: Config) =
ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData), previousActions)
private def createActionFromRemoteKey(rk: RemoteKey)
(implicit c: Config) =
if (rk.isMissingLocally(c.sources, c.prefix)) Action.ToDelete(c.bucket, rk, 0L)
else DoNothing(c.bucket, rk, 0L)
}
object PlanBuilder extends PlanBuilder

View file

@ -6,10 +6,11 @@ import scala.util.Try
object Resource {
def apply(base: AnyRef,
name: String): File = {
Try{
def apply(
base: AnyRef,
name: String
): File =
Try {
new File(base.getClass.getResource(name).getPath)
}.getOrElse(throw new FileNotFoundException(name))
}
}

View file

@ -4,23 +4,36 @@ import net.kemitix.thorp.domain._
object S3MetaDataEnricher {
def getMetadata(localFile: LocalFile,
s3ObjectsData: S3ObjectsData)
(implicit c: Config): S3MetaData = {
def getMetadata(
localFile: LocalFile,
s3ObjectsData: S3ObjectsData
)(implicit c: Config): S3MetaData = {
val (keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData)
S3MetaData(localFile,
matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) },
matchByHash = hashMatches map { case (hash, km) => RemoteMetaData(km.key, hash, km.modified) })
S3MetaData(
localFile,
matchByKey = keyMatches.map { hm =>
RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified)
},
matchByHash = hashMatches.map {
case (hash, km) => RemoteMetaData(km.key, hash, km.modified)
}
)
}
def getS3Status(localFile: LocalFile,
s3ObjectsData: S3ObjectsData): (Option[HashModified], Set[(MD5Hash, KeyModified)]) = {
def getS3Status(
localFile: LocalFile,
s3ObjectsData: S3ObjectsData
): (Option[HashModified], Set[(MD5Hash, KeyModified)]) = {
val matchingByKey = s3ObjectsData.byKey.get(localFile.remoteKey)
val matchingByHash = localFile.hashes
.map { case(_, md5Hash) =>
s3ObjectsData.byHash.getOrElse(md5Hash, Set())
.map {
case (_, md5Hash) =>
s3ObjectsData.byHash
.getOrElse(md5Hash, Set())
.map(km => (md5Hash, km))
}.flatten.toSet
}
.flatten
.toSet
(matchingByKey, matchingByHash)
}

View file

@ -8,12 +8,11 @@ import net.kemitix.thorp.storage.api.HashService
case class SimpleHashService() extends HashService {
override def hashLocalObject(path: Path)
(implicit l: Logger): IO[Map[String, MD5Hash]] =
override def hashLocalObject(
path: Path
)(implicit l: Logger): IO[Map[String, MD5Hash]] =
for {
md5 <- MD5HashGenerator.md5File(path)
} yield Map(
"md5" -> md5
)
} yield Map("md5" -> md5)
}

View file

@ -1,7 +1,5 @@
package net.kemitix.thorp.core
import java.nio.file.{Files, Path}
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.domain.Sources

View file

@ -2,15 +2,21 @@ package net.kemitix.thorp.core
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent}
import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent,
DeleteQueueEvent,
ErrorQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.domain._
trait SyncLogging {
def logRunStart(bucket: Bucket,
def logRunStart(
bucket: Bucket,
prefix: RemoteKey,
sources: Sources)
(implicit logger: Logger): IO[Unit] = {
sources: Sources
)(implicit logger: Logger): IO[Unit] = {
val sourcesList = sources.paths.mkString(", ")
logger.info(s"Bucket: ${bucket.name}, Prefix: ${prefix.key}, Source: $sourcesList")
}
@ -19,17 +25,9 @@ trait SyncLogging {
logger: Logger): IO[Unit] =
logger.info(s"Scanning local files: ${c.sources.paths.mkString(", ")}...")
def logErrors(actions: Stream[StorageQueueEvent])
(implicit logger: Logger): IO[Unit] =
for {
_ <- actions.map {
case ErrorQueueEvent(k, e) => logger.warn(s"${k.key}: ${e.getMessage}")
case _ => IO.unit
}.sequence
} yield ()
def logRunFinished(actions: Stream[StorageQueueEvent])
(implicit logger: Logger): IO[Unit] = {
def logRunFinished(
actions: Stream[StorageQueueEvent]
)(implicit logger: Logger): IO[Unit] = {
val counters = actions.foldLeft(Counters())(countActivities)
for {
_ <- logger.info(s"Uploaded ${counters.uploaded} files")
@ -40,6 +38,16 @@ trait SyncLogging {
} yield ()
}
def logErrors(
actions: Stream[StorageQueueEvent]
)(implicit logger: Logger): IO[Unit] =
for {
_ <- actions.map {
case ErrorQueueEvent(k, e) => logger.warn(s"${k.key}: ${e.getMessage}")
case _ => IO.unit
}.sequence
} yield ()
private def countActivities: (Counters, StorageQueueEvent) => Counters =
(counters: Counters, s3Action: StorageQueueEvent) => {
s3Action match {

View file

@ -2,5 +2,7 @@ package net.kemitix.thorp.core
import net.kemitix.thorp.domain.SyncTotals
case class SyncPlan(actions: Stream[Action] = Stream(),
syncTotals: SyncTotals = SyncTotals())
case class SyncPlan(
actions: Stream[Action] = Stream(),
syncTotals: SyncTotals = SyncTotals()
)

View file

@ -5,14 +5,17 @@ import net.kemitix.thorp.domain.{LocalFile, Logger, StorageQueueEvent}
trait ThorpArchive {
def update(index: Int,
def update(
index: Int,
action: Action,
totalBytesSoFar: Long)
(implicit l: Logger): Stream[IO[StorageQueueEvent]]
totalBytesSoFar: Long
)(implicit l: Logger): Stream[IO[StorageQueueEvent]]
def fileUploaded(localFile: LocalFile,
batchMode: Boolean)
(implicit l: Logger): IO[Unit] =
if (batchMode) l.info(s"Uploaded: ${localFile.remoteKey.key}") else IO.unit
def logFileUploaded(
localFile: LocalFile,
batchMode: Boolean
)(implicit l: Logger): IO[Unit] =
if (batchMode) l.info(s"Uploaded: ${localFile.remoteKey.key}")
else IO.unit
}

View file

@ -3,41 +3,64 @@ package net.kemitix.thorp.core
import cats.effect.IO
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain.{Logger, StorageQueueEvent, SyncTotals, UploadEventListener}
import net.kemitix.thorp.domain.{
Bucket,
LocalFile,
Logger,
StorageQueueEvent,
SyncTotals,
UploadEventListener
}
import net.kemitix.thorp.storage.api.StorageService
case class UnversionedMirrorArchive(storageService: StorageService,
case class UnversionedMirrorArchive(
storageService: StorageService,
batchMode: Boolean,
syncTotals: SyncTotals) extends ThorpArchive {
override def update(index: Int,
syncTotals: SyncTotals
) extends ThorpArchive {
override def update(
index: Int,
action: Action,
totalBytesSoFar: Long)
(implicit l: Logger): Stream[IO[StorageQueueEvent]] =
Stream(
action match {
case ToUpload(bucket, localFile, size) =>
totalBytesSoFar: Long
)(implicit l: Logger): Stream[IO[StorageQueueEvent]] =
Stream(action match {
case ToUpload(bucket, localFile, _) =>
for {
event <- storageService.upload(localFile, bucket, batchMode,
new UploadEventListener(localFile, index, syncTotals, totalBytesSoFar), 1)
_ <- fileUploaded(localFile, batchMode)
event <- doUpload(index, totalBytesSoFar, bucket, localFile)
_ <- logFileUploaded(localFile, batchMode)
} yield event
case ToCopy(bucket, sourceKey, hash, targetKey, size) =>
case ToCopy(bucket, sourceKey, hash, targetKey, _) =>
for {
event <- storageService.copy(bucket, sourceKey, hash, targetKey)
} yield event
case ToDelete(bucket, remoteKey, size) =>
case ToDelete(bucket, remoteKey, _) =>
for {
event <- storageService.delete(bucket, remoteKey)
} yield event
case DoNothing(_, remoteKey, size) =>
case DoNothing(_, remoteKey, _) =>
IO.pure(DoNothingQueueEvent(remoteKey))
})
private def doUpload(
index: Int,
totalBytesSoFar: Long,
bucket: Bucket,
localFile: LocalFile
) =
storageService.upload(
localFile,
bucket,
batchMode,
new UploadEventListener(localFile, index, syncTotals, totalBytesSoFar),
1)
}
object UnversionedMirrorArchive {
def default(storageService: StorageService,
def default(
storageService: StorageService,
batchMode: Boolean,
syncTotals: SyncTotals): ThorpArchive =
syncTotals: SyncTotals
): ThorpArchive =
new UnversionedMirrorArchive(storageService, batchMode, syncTotals)
}

View file

@ -6,55 +6,75 @@ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain._
import org.scalatest.FunSpec
class ActionGeneratorSuite
extends FunSpec {
class ActionGeneratorSuite extends FunSpec {
val lastModified = LastModified(Instant.now())
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix")
private val bucket = Bucket("bucket")
implicit private val config: Config = Config(bucket, prefix, sources = Sources(List(sourcePath)))
private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _
val lastModified = LastModified(Instant.now())
implicit private val config: Config =
Config(bucket, prefix, sources = Sources(List(sourcePath)))
private val fileToKey =
KeyGenerator.generateKey(config.sources, config.prefix) _
describe("create actions") {
val previousActions = Stream.empty[Action]
def invoke(input: S3MetaData) = ActionGenerator.createActions(input, previousActions).toList
def invoke(input: S3MetaData) =
ActionGenerator.createActions(input, previousActions).toList
describe("#1 local exists, remote exists, remote matches - do nothing") {
val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified)
val input = S3MetaData(theFile, // local exists
val theFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val theRemoteMetadata =
RemoteMetaData(theFile.remoteKey, theHash, lastModified)
val input =
S3MetaData(theFile, // local exists
matchByHash = Set(theRemoteMetadata), // remote matches
matchByKey = Some(theRemoteMetadata) // remote exists
)
it("do nothing") {
val expected = List(DoNothing(bucket, theFile.remoteKey, theFile.file.length))
val expected =
List(DoNothing(bucket, theFile.remoteKey, theFile.file.length))
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#2 local exists, remote is missing, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val theRemoteKey = theFile.remoteKey
val otherRemoteKey = prefix.resolve("other-key")
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
val input = S3MetaData(theFile, // local exists
val otherRemoteMetadata =
RemoteMetaData(otherRemoteKey, theHash, lastModified)
val input =
S3MetaData(theFile, // local exists
matchByHash = Set(otherRemoteMetadata), // other matches
matchByKey = None) // remote is missing
it("copy from other key") {
val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey, theFile.file.length)) // copy
val expected = List(
ToCopy(bucket,
otherRemoteKey,
theHash,
theRemoteKey,
theFile.file.length)) // copy
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#3 local exists, remote is missing, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val input = S3MetaData(theFile, // local exists
matchByHash = Set.empty, // other no matches
matchByKey = None) // remote is missing
@ -64,32 +84,49 @@ class ActionGeneratorSuite
assertResult(expected)(result)
}
}
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
describe(
"#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val otherRemoteKey = prefix.resolve("other-key")
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
val otherRemoteMetadata =
RemoteMetaData(otherRemoteKey, theHash, lastModified)
val oldRemoteMetadata = RemoteMetaData(theRemoteKey,
hash = oldHash, // remote no match
lastModified = lastModified)
val input = S3MetaData(theFile, // local exists
val input =
S3MetaData(theFile, // local exists
matchByHash = Set(otherRemoteMetadata), // other matches
matchByKey = Some(oldRemoteMetadata)) // remote exists
it("copy from other key") {
val expected = List(ToCopy(bucket, otherRemoteKey, theHash, theRemoteKey, theFile.file.length)) // copy
val expected = List(
ToCopy(bucket,
otherRemoteKey,
theHash,
theRemoteKey,
theFile.file.length)) // copy
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
describe(
"#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
val input = S3MetaData(theFile, // local exists
val theRemoteMetadata =
RemoteMetaData(theRemoteKey, oldHash, lastModified)
val input =
S3MetaData(theFile, // local exists
matchByHash = Set.empty, // remote no match, other no match
matchByKey = Some(theRemoteMetadata) // remote exists
)

View file

@ -9,7 +9,8 @@ class ConfigOptionTest extends FunSpec with TemporaryFolder {
it("should preserve their order") {
withDirectory(path1 => {
withDirectory(path2 => {
val configOptions = ConfigOptions(List(
val configOptions = ConfigOptions(
List(
ConfigOption.Source(path1),
ConfigOption.Source(path2),
ConfigOption.Bucket("bucket"),

View file

@ -6,8 +6,6 @@ import net.kemitix.thorp.domain.Filter.{Exclude, Include}
import net.kemitix.thorp.domain._
import org.scalatest.FunSpec
import scala.language.postfixOps
class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
private val pwd: Path = Paths.get(System.getenv("PWD"))
@ -16,7 +14,8 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
private val thorpConfigFileName = ".thorp.conf"
private def configOptions(options: ConfigOption*): ConfigOptions =
ConfigOptions(List(
ConfigOptions(
List(
ConfigOption.IgnoreUserOptions,
ConfigOption.IgnoreGlobalOptions
) ++ options)
@ -70,8 +69,7 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
withDirectory(currentSource => {
withDirectory(previousSource => {
val expected = Right(List(currentSource, previousSource))
val options = configOptions(
ConfigOption.Source(currentSource),
val options = configOptions(ConfigOption.Source(currentSource),
ConfigOption.Source(previousSource),
coBucket)
val result = invoke(options).map(_.sources.paths)
@ -84,12 +82,12 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
it("should include both sources in order") {
withDirectory(currentSource => {
withDirectory(previousSource => {
writeFile(currentSource, thorpConfigFileName,
writeFile(currentSource,
thorpConfigFileName,
s"source = $previousSource")
val expected = Right(List(currentSource, previousSource))
val options = configOptions(
ConfigOption.Source(currentSource),
coBucket)
val options =
configOptions(ConfigOption.Source(currentSource), coBucket)
val result = invoke(options).map(_.sources.paths)
assertResult(expected)(result)
})
@ -99,19 +97,24 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
it("should include settings from only current") {
withDirectory(previousSource => {
withDirectory(currentSource => {
writeFile(currentSource, thorpConfigFileName,
writeFile(
currentSource,
thorpConfigFileName,
s"source = $previousSource",
"bucket = current-bucket",
"prefix = current-prefix",
"include = current-include",
"exclude = current-exclude")
writeFile(previousSource, thorpConfigFileName,
"exclude = current-exclude"
)
writeFile(previousSource,
thorpConfigFileName,
"bucket = previous-bucket",
"prefix = previous-prefix",
"include = previous-include",
"exclude = previous-exclude")
// should have both sources in order
val expectedSources = Right(Sources(List(currentSource, previousSource)))
val expectedSources =
Right(Sources(List(currentSource, previousSource)))
// should have bucket from current only
val expectedBuckets = Right(Bucket("current-bucket"))
// should have prefix from current only
@ -136,7 +139,9 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
it("should only include first two sources") {
withDirectory(currentSource => {
withDirectory(parentSource => {
writeFile(currentSource, thorpConfigFileName, s"source = $parentSource")
writeFile(currentSource,
thorpConfigFileName,
s"source = $parentSource")
withDirectory(grandParentSource => {
writeFile(parentSource, thorpConfigFileName, s"source = $grandParentSource")
val expected = Right(List(currentSource, parentSource))

View file

@ -9,8 +9,8 @@ import net.kemitix.thorp.storage.api.HashService
case class DummyHashService(hashes: Map[Path, Map[String, MD5Hash]])
extends HashService {
override def hashLocalObject(path: Path)
(implicit l: Logger): IO[Map[String, MD5Hash]] =
override def hashLocalObject(path: Path)(
implicit l: Logger): IO[Map[String, MD5Hash]] =
IO.pure(hashes(path))
}

View file

@ -7,7 +7,7 @@ class DummyLogger extends Logger {
override def debug(message: => String): IO[Unit] = IO.unit
override def info(message: =>String): IO[Unit] = IO.unit
override def info(message: => String): IO[Unit] = IO.unit
override def warn(message: String): IO[Unit] = IO.unit

View file

@ -14,9 +14,8 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData,
override def shutdown: IO[StorageQueueEvent] =
IO.pure(StorageQueueEvent.ShutdownQueueEvent())
override def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit l: Logger): EitherT[IO, String, S3ObjectsData] =
override def listObjects(bucket: Bucket, prefix: RemoteKey)(
implicit l: Logger): EitherT[IO, String, S3ObjectsData] =
EitherT.liftF(IO.pure(s3ObjectData))
override def upload(localFile: LocalFile,

View file

@ -10,22 +10,26 @@ class KeyGeneratorSuite extends FunSpec {
private val source: File = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _
implicit private val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
private val fileToKey =
KeyGenerator.generateKey(config.sources, config.prefix) _
describe("key generator") {
describe("when file is within source") {
it("has a valid key") {
val subdir = "subdir"
assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(sourcePath.resolve(subdir)))
assertResult(RemoteKey(s"${prefix.key}/$subdir"))(
fileToKey(sourcePath.resolve(subdir)))
}
}
describe("when file is deeper within source") {
it("has a valid key") {
val subdir = "subdir/deeper/still"
assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(sourcePath.resolve(subdir)))
assertResult(RemoteKey(s"${prefix.key}/$subdir"))(
fileToKey(sourcePath.resolve(subdir)))
}
}
}

View file

@ -2,7 +2,7 @@ package net.kemitix.thorp.core
import java.nio.file.Paths
import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5HashData, Sources}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.HashService
import org.scalatest.FunSpec
@ -10,7 +10,8 @@ class LocalFileStreamSuite extends FunSpec {
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val hashService: HashService = DummyHashService(Map(
private val hashService: HashService = DummyHashService(
Map(
file("root-file") -> Map("md5" -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash)
))
@ -18,14 +19,17 @@ class LocalFileStreamSuite extends FunSpec {
private def file(filename: String) =
sourcePath.resolve(Paths.get(filename))
implicit private val config: Config = Config(sources = Sources(List(sourcePath)))
implicit private val config: Config = Config(
sources = Sources(List(sourcePath)))
implicit private val logger: Logger = new DummyLogger
describe("findFiles") {
it("should find all files") {
val result: Set[String] =
invoke.localFiles.toSet
.map { x: LocalFile => x.relative.toString }
.map { x: LocalFile =>
x.relative.toString
}
assertResult(Set("subdir/leaf-file", "root-file"))(result)
}
it("should count all files") {

View file

@ -9,7 +9,8 @@ class MD5HashGeneratorTest extends FunSpec {
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val logger: Logger = new DummyLogger
describe("read a small file (smaller than buffer)") {
@ -32,13 +33,17 @@ class MD5HashGeneratorTest extends FunSpec {
it("should generate the correct hash for first chunk of the file") {
val part1 = MD5HashData.BigFile.Part1
val expected = part1.hash
val result = MD5HashGenerator.md5FileChunk(path, part1.offset, part1.size).unsafeRunSync
val result = MD5HashGenerator
.md5FileChunk(path, part1.offset, part1.size)
.unsafeRunSync
assertResult(expected)(result)
}
it("should generate the correcy hash for second chunk of the file") {
val part2 = MD5HashData.BigFile.Part2
val expected = part2.hash
val result = MD5HashGenerator.md5FileChunk(path, part2.offset, part2.size).unsafeRunSync
val result = MD5HashGenerator
.md5FileChunk(path, part2.offset, part2.size)
.unsafeRunSync
assertResult(expected)(result)
}
}

View file

@ -6,9 +6,11 @@ import org.scalatest.FunSpec
class ParseConfigFileTest extends FunSpec {
private def invoke(filename: Path) = ParseConfigFile.parseFile(filename).unsafeRunSync
private val empty = ConfigOptions()
private def invoke(filename: Path) =
ParseConfigFile.parseFile(filename).unsafeRunSync
describe("parse a missing file") {
val filename = Paths.get("/path/to/missing/file")
it("should return no options") {
@ -29,8 +31,8 @@ class ParseConfigFileTest extends FunSpec {
}
describe("parse a file with properties") {
val filename = Resource(this, "simple-config").toPath
val expected = ConfigOptions(List(
ConfigOption.Source(Paths.get("/path/to/source")),
val expected = ConfigOptions(
List(ConfigOption.Source(Paths.get("/path/to/source")),
ConfigOption.Bucket("bucket-name")))
it("should return some options") {
assertResult(expected)(invoke(filename))

View file

@ -9,8 +9,10 @@ class ParseConfigLinesTest extends FunSpec {
describe("parse single lines") {
describe("source") {
it("should parse") {
val expected = ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source"))))
val result = ParseConfigLines.parseLines(List("source = /path/to/source"))
val expected =
ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source"))))
val result =
ParseConfigLines.parseLines(List("source = /path/to/source"))
assertResult(expected)(result)
}
}
@ -23,22 +25,28 @@ class ParseConfigLinesTest extends FunSpec {
}
describe("prefix") {
it("should parse") {
val expected = ConfigOptions(List(ConfigOption.Prefix("prefix/to/files")))
val result = ParseConfigLines.parseLines(List("prefix = prefix/to/files"))
val expected =
ConfigOptions(List(ConfigOption.Prefix("prefix/to/files")))
val result =
ParseConfigLines.parseLines(List("prefix = prefix/to/files"))
assertResult(expected)(result)
}
}
describe("include") {
it("should parse") {
val expected = ConfigOptions(List(ConfigOption.Include("path/to/include")))
val result = ParseConfigLines.parseLines(List("include = path/to/include"))
val expected =
ConfigOptions(List(ConfigOption.Include("path/to/include")))
val result =
ParseConfigLines.parseLines(List("include = path/to/include"))
assertResult(expected)(result)
}
}
describe("exclude") {
it("should parse") {
val expected = ConfigOptions(List(ConfigOption.Exclude("path/to/exclude")))
val result = ParseConfigLines.parseLines(List("exclude = path/to/exclude"))
val expected =
ConfigOptions(List(ConfigOption.Exclude("path/to/exclude")))
val result =
ParseConfigLines.parseLines(List("exclude = path/to/exclude"))
assertResult(expected)(result)
}
}

View file

@ -9,12 +9,10 @@ import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FreeSpec
class PlanBuilderTest extends FreeSpec with TemporaryFolder {
private val planBuilder = new PlanBuilder {}
private val emptyS3ObjectData = S3ObjectsData()
private implicit val logger: Logger = new DummyLogger
val lastModified: LastModified = LastModified()
private val planBuilder = new PlanBuilder {}
private implicit val logger: Logger = new DummyLogger
private val emptyS3ObjectData = S3ObjectsData()
"create a plan" - {
@ -31,16 +29,21 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val file = createFile(source, filename, "file-content")
val hash = md5Hash(file)
val expected = Right(List(
val expected = Right(
List(
toUpload(remoteKey, hash, source, file)
))
val storageService = DummyStorageService(emptyS3ObjectData, Map(
val storageService =
DummyStorageService(emptyS3ObjectData,
Map(
file -> (remoteKey, hash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
@ -58,21 +61,27 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val anOtherKey = RemoteKey("other")
val expected = Right(List(
val expected = Right(
List(
toCopy(anOtherKey, aHash, remoteKey)
))
val s3ObjectsData = S3ObjectsData(
byHash = Map(aHash -> Set(KeyModified(anOtherKey, lastModified))),
byHash =
Map(aHash -> Set(KeyModified(anOtherKey, lastModified))),
byKey = Map(anOtherKey -> HashModified(aHash, lastModified))
)
val storageService = DummyStorageService(s3ObjectsData, Map(
val storageService =
DummyStorageService(s3ObjectsData,
Map(
aFile -> (remoteKey, aHash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
@ -91,16 +100,21 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val expected = Right(List())
val s3ObjectsData = S3ObjectsData(
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
byHash =
Map(hash -> Set(KeyModified(remoteKey, lastModified))),
byKey = Map(remoteKey -> HashModified(hash, lastModified))
)
val storageService = DummyStorageService(s3ObjectsData, Map(
val storageService =
DummyStorageService(s3ObjectsData,
Map(
file -> (remoteKey, hash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
@ -115,21 +129,28 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val currentHash = md5Hash(file)
val originalHash = MD5Hash("original-file-content")
val expected = Right(List(
val expected = Right(
List(
toUpload(remoteKey, currentHash, source, file)
))
val s3ObjectsData = S3ObjectsData(
byHash = Map(originalHash -> Set(KeyModified(remoteKey, lastModified))),
byKey = Map(remoteKey -> HashModified(originalHash, lastModified))
byHash = Map(originalHash -> Set(
KeyModified(remoteKey, lastModified))),
byKey =
Map(remoteKey -> HashModified(originalHash, lastModified))
)
val storageService = DummyStorageService(s3ObjectsData, Map(
val storageService =
DummyStorageService(s3ObjectsData,
Map(
file -> (remoteKey, currentHash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
@ -143,21 +164,27 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val hash = md5Hash(file)
val sourceKey = RemoteKey("other-key")
val expected = Right(List(
val expected = Right(
List(
toCopy(sourceKey, hash, remoteKey)
))
val s3ObjectsData = S3ObjectsData(
byHash = Map(hash -> Set(KeyModified(sourceKey, lastModified))),
byHash =
Map(hash -> Set(KeyModified(sourceKey, lastModified))),
byKey = Map()
)
val storageService = DummyStorageService(s3ObjectsData, Map(
val storageService =
DummyStorageService(s3ObjectsData,
Map(
file -> (remoteKey, hash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
@ -184,12 +211,16 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
byKey = Map(remoteKey -> HashModified(hash, lastModified))
)
val storageService = DummyStorageService(s3ObjectsData, Map(
val storageService =
DummyStorageService(s3ObjectsData,
Map(
file -> (remoteKey, hash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
@ -201,7 +232,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
withDirectory(source => {
val hash = MD5Hash("file-content")
val expected = Right(List(
val expected = Right(
List(
toDelete(remoteKey)
))
@ -212,8 +244,10 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val storageService = DummyStorageService(s3ObjectsData, Map.empty)
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
@ -231,24 +265,30 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
"unique files in both" - {
"upload all files" in {
withDirectory(firstSource => {
val fileInFirstSource = createFile(firstSource, filename1, "file-1-content")
val fileInFirstSource =
createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => {
val fileInSecondSource = createFile(secondSource, filename2, "file-2-content")
val fileInSecondSource =
createFile(secondSource, filename2, "file-2-content")
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List(
val expected = Right(
List(
toUpload(remoteKey2, hash2, secondSource, fileInSecondSource),
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)
))
val storageService = DummyStorageService(emptyS3ObjectData, Map(
fileInFirstSource -> (remoteKey1, hash1),
val storageService = DummyStorageService(
emptyS3ObjectData,
Map(fileInFirstSource -> (remoteKey1, hash1),
fileInSecondSource -> (remoteKey2, hash2)))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
@ -260,23 +300,29 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
"same filename in both" - {
"only upload file in first source" in {
withDirectory(firstSource => {
val fileInFirstSource: File = createFile(firstSource, filename1, "file-1-content")
val fileInFirstSource: File =
createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => {
val fileInSecondSource: File = createFile(secondSource, filename1, "file-2-content")
val fileInSecondSource: File =
createFile(secondSource, filename1, "file-2-content")
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List(
val expected = Right(
List(
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)
))
val storageService = DummyStorageService(emptyS3ObjectData, Map(
fileInFirstSource -> (remoteKey1, hash1),
val storageService = DummyStorageService(
emptyS3ObjectData,
Map(fileInFirstSource -> (remoteKey1, hash1),
fileInSecondSource -> (remoteKey2, hash2)))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
@ -290,20 +336,25 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
withDirectory(firstSource => {
withDirectory(secondSource => {
val fileInSecondSource = createFile(secondSource, filename2, "file-2-content")
val fileInSecondSource =
createFile(secondSource, filename2, "file-2-content")
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List())
val s3ObjectData = S3ObjectsData(
byHash = Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))),
byHash =
Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))),
byKey = Map(remoteKey2 -> HashModified(hash2, lastModified)))
val storageService = DummyStorageService(s3ObjectData, Map(
fileInSecondSource -> (remoteKey2, hash2)))
val storageService = DummyStorageService(
s3ObjectData,
Map(fileInSecondSource -> (remoteKey2, hash2)))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
@ -315,7 +366,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
"with remote file only present in first source" - {
"do not delete it" in {
withDirectory(firstSource => {
val fileInFirstSource: File = createFile(firstSource, filename1, "file-1-content")
val fileInFirstSource: File =
createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => {
@ -323,14 +375,18 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val expected = Right(List())
val s3ObjectData = S3ObjectsData(
byHash = Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))),
byHash =
Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))),
byKey = Map(remoteKey1 -> HashModified(hash1, lastModified)))
val storageService = DummyStorageService(s3ObjectData, Map(
fileInFirstSource -> (remoteKey1, hash1)))
val storageService = DummyStorageService(
s3ObjectData,
Map(fileInFirstSource -> (remoteKey1, hash1)))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
@ -345,17 +401,20 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
withDirectory(secondSource => {
val expected = Right(List(
val expected = Right(
List(
toDelete(remoteKey1)
))
val s3ObjectData = S3ObjectsData(
byKey = Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified)))
val s3ObjectData = S3ObjectsData(byKey =
Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified)))
val storageService = DummyStorageService(s3ObjectData, Map())
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
val result =
invoke(storageService,
hashService,
configOptions(ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
@ -378,26 +437,39 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
file: File): (String, String, String, String, String) =
("upload", remoteKey.key, md5Hash.hash, source.toString, file.toString)
private def toCopy(sourceKey: RemoteKey,
private def toCopy(
sourceKey: RemoteKey,
md5Hash: MD5Hash,
targetKey: RemoteKey): (String, String, String, String, String) =
("copy", sourceKey.key, md5Hash.hash, targetKey.key, "")
private def toDelete(remoteKey: RemoteKey): (String, String, String, String, String) =
private def toDelete(
remoteKey: RemoteKey): (String, String, String, String, String) =
("delete", remoteKey.key, "", "", "")
private def configOptions(configOptions: ConfigOption*): ConfigOptions =
ConfigOptions(List(configOptions:_*))
ConfigOptions(List(configOptions: _*))
private def invoke(storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[List[String], List[(String, String, String, String, String)]] =
planBuilder.createPlan(storageService, hashService, configOptions)
.value.unsafeRunSync().map(_.actions.toList.map({
case ToUpload(_, lf, _) => ("upload", lf.remoteKey.key, lf.hashes("md5").hash, lf.source.toString, lf.file.toString)
configOptions: ConfigOptions)
: Either[List[String], List[(String, String, String, String, String)]] =
planBuilder
.createPlan(storageService, hashService, configOptions)
.value
.unsafeRunSync()
.map(_.actions.toList.map({
case ToUpload(_, lf, _) =>
("upload",
lf.remoteKey.key,
lf.hashes("md5").hash,
lf.source.toString,
lf.file.toString)
case ToDelete(_, remoteKey, _) => ("delete", remoteKey.key, "", "", "")
case ToCopy(_, sourceKey, hash, targetKey, _) => ("copy", sourceKey.key, hash.hash, targetKey.key, "")
case DoNothing(_, remoteKey, _) => ("do-nothing", remoteKey.key, "", "", "")
case ToCopy(_, sourceKey, hash, targetKey, _) =>
("copy", sourceKey.key, hash.hash, targetKey.key, "")
case DoNothing(_, remoteKey, _) =>
("do-nothing", remoteKey.key, "", "", "")
case x => ("other", x.toString, "", "", "")
}))

View file

@ -6,37 +6,46 @@ import net.kemitix.thorp.core.S3MetaDataEnricher.{getMetadata, getS3Status}
import net.kemitix.thorp.domain._
import org.scalatest.FunSpec
class S3MetaDataEnricherSuite
extends FunSpec {
class S3MetaDataEnricherSuite extends FunSpec {
val lastModified = LastModified(Instant.now())
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _
val lastModified = LastModified(Instant.now())
implicit private val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
private val fileToKey =
KeyGenerator.generateKey(config.sources, config.prefix) _
def getMatchesByKey(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Option[HashModified] = {
def getMatchesByKey(
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
: Option[HashModified] = {
val (byKey, _) = status
byKey
}
def getMatchesByHash(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Set[(MD5Hash, KeyModified)] = {
def getMatchesByHash(
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
: Set[(MD5Hash, KeyModified)] = {
val (_, byHash) = status
byHash
}
describe("enrich with metadata") {
describe("#1a local exists, remote exists, remote matches, other matches - do nothing") {
describe(
"#1a local exists, remote exists, remote matches, other matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile: LocalFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val theRemoteKey: RemoteKey = theFile.remoteKey
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
)
val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
val theRemoteMetadata =
RemoteMetaData(theRemoteKey, theHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set(theRemoteMetadata),
@ -45,15 +54,20 @@ class S3MetaDataEnricherSuite
assertResult(expected)(result)
}
}
describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") {
describe(
"#1b local exists, remote exists, remote matches, other no matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile: LocalFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val theRemoteKey: RemoteKey = prefix.resolve("the-file")
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
)
val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
val theRemoteMetadata =
RemoteMetaData(theRemoteKey, theHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set(theRemoteMetadata),
@ -62,15 +76,20 @@ class S3MetaDataEnricherSuite
assertResult(expected)(result)
}
}
describe("#2 local exists, remote is missing, remote no match, other matches - copy") {
describe(
"#2 local exists, remote is missing, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val otherRemoteKey = RemoteKey("other-key")
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
byKey = Map(otherRemoteKey -> HashModified(theHash, lastModified))
)
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
val otherRemoteMetadata =
RemoteMetaData(otherRemoteKey, theHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set(otherRemoteMetadata),
@ -79,35 +98,43 @@ class S3MetaDataEnricherSuite
assertResult(expected)(result)
}
}
describe("#3 local exists, remote is missing, remote no match, other no matches - upload") {
describe(
"#3 local exists, remote is missing, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val s3: S3ObjectsData = S3ObjectsData()
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set.empty,
matchByKey = None)
val expected =
S3MetaData(theFile, matchByHash = Set.empty, matchByKey = None)
val result = getMetadata(theFile, s3)
assertResult(expected)(result)
}
}
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
describe(
"#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val otherRemoteKey = prefix.resolve("other-key")
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(
oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
byHash = Map(oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
byKey = Map(
theRemoteKey -> HashModified(oldHash, lastModified),
otherRemoteKey -> HashModified(theHash, lastModified)
)
)
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
val theRemoteMetadata =
RemoteMetaData(theRemoteKey, oldHash, lastModified)
val otherRemoteMetadata =
RemoteMetaData(otherRemoteKey, theHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set(otherRemoteMetadata),
@ -116,20 +143,24 @@ class S3MetaDataEnricherSuite
assertResult(expected)(result)
}
}
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
describe(
"#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theFile = LocalFile.resolve("the-file",
md5HashMap(theHash),
sourcePath,
fileToKey)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(
oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
byHash = Map(oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
theHash -> Set.empty),
byKey = Map(
theRemoteKey -> HashModified(oldHash, lastModified)
)
)
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
val theRemoteMetadata =
RemoteMetaData(theRemoteKey, oldHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set.empty,
@ -146,20 +177,31 @@ class S3MetaDataEnricherSuite
describe("getS3Status") {
val hash = MD5Hash("hash")
val localFile = LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey)
val localFile =
LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey)
val key = localFile.remoteKey
val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), sourcePath, fileToKey)
val keyOtherKey = LocalFile.resolve("other-key-same-hash",
md5HashMap(hash),
sourcePath,
fileToKey)
val diffHash = MD5Hash("diff")
val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), sourcePath, fileToKey)
val keyDiffHash = LocalFile.resolve("other-key-diff-hash",
md5HashMap(diffHash),
sourcePath,
fileToKey)
val lastModified = LastModified(Instant.now)
val s3ObjectsData: S3ObjectsData = S3ObjectsData(
byHash = Map(
hash -> Set(KeyModified(key, lastModified), KeyModified(keyOtherKey.remoteKey, lastModified)),
diffHash -> Set(KeyModified(keyDiffHash.remoteKey, lastModified))),
hash -> Set(KeyModified(key, lastModified),
KeyModified(keyOtherKey.remoteKey, lastModified)),
diffHash -> Set(KeyModified(keyDiffHash.remoteKey, lastModified))
),
byKey = Map(
key -> HashModified(hash, lastModified),
keyOtherKey.remoteKey -> HashModified(hash, lastModified),
keyDiffHash.remoteKey -> HashModified(diffHash, lastModified)))
keyDiffHash.remoteKey -> HashModified(diffHash, lastModified)
)
)
def invoke(localFile: LocalFile) = {
getS3Status(localFile, s3ObjectsData)
@ -173,7 +215,10 @@ class S3MetaDataEnricherSuite
}
describe("when remote key does not exist and no others matches hash") {
val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), sourcePath, fileToKey)
val localFile = LocalFile.resolve("missing-file",
md5HashMap(MD5Hash("unique")),
sourcePath,
fileToKey)
it("should return no matches by key") {
val result = getMatchesByKey(invoke(localFile))
assert(result.isEmpty)
@ -191,7 +236,9 @@ class S3MetaDataEnricherSuite
}
it("should return only itself in match by hash") {
val result = getMatchesByHash(invoke(keyDiffHash))
assert(result.equals(Set((diffHash, KeyModified(keyDiffHash.remoteKey,lastModified)))))
assert(
result.equals(
Set((diffHash, KeyModified(keyDiffHash.remoteKey, lastModified)))))
}
}

View file

@ -1,6 +1,10 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent}
import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent,
DeleteQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
import org.scalatest.FunSpec

View file

@ -8,72 +8,89 @@ import cats.data.EitherT
import cats.effect.IO
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root}
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ShutdownQueueEvent, UploadQueueEvent}
import net.kemitix.thorp.domain.StorageQueueEvent.{
CopyQueueEvent,
DeleteQueueEvent,
ShutdownQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FunSpec
class SyncSuite
extends FunSpec {
class SyncSuite extends FunSpec {
private val testBucket = Bucket("bucket")
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix")
// source contains the files root-file and subdir/leaf-file
private val rootRemoteKey = RemoteKey("prefix/root-file")
private val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
private val rootFile: LocalFile =
LocalFile.resolve("root-file",
md5HashMap(Root.hash),
sourcePath,
_ => rootRemoteKey)
implicit private val logger: Logger = new DummyLogger
private val leafFile: LocalFile =
LocalFile.resolve("subdir/leaf-file",
md5HashMap(Leaf.hash),
sourcePath,
_ => leafRemoteKey)
private val hashService =
DummyHashService(
Map(
file("root-file") -> Map("md5" -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash)
))
private val configOptions =
ConfigOptions(List(
ConfigOptions(
List(
ConfigOption.Source(sourcePath),
ConfigOption.Bucket("bucket"),
ConfigOption.Prefix("prefix"),
ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions
))
implicit private val logger: Logger = new DummyLogger
private val lastModified = LastModified(Instant.now)
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile): (String, String, File) =
def putObjectRequest(bucket: Bucket,
remoteKey: RemoteKey,
localFile: LocalFile): (String, String, File) =
(bucket.name, remoteKey.key, localFile.file)
val testBucket = Bucket("bucket")
// source contains the files root-file and subdir/leaf-file
val rootRemoteKey = RemoteKey("prefix/root-file")
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
val rootFile: LocalFile =
LocalFile.resolve("root-file", md5HashMap(Root.hash), sourcePath, _ => rootRemoteKey)
val leafFile: LocalFile =
LocalFile.resolve("subdir/leaf-file", md5HashMap(Leaf.hash), sourcePath, _ => leafRemoteKey)
private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] =
Map("md5" -> md5Hash)
val hashService =
DummyHashService(Map(
file("root-file") -> Map("md5" -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash)
))
private def file(filename: String) =
sourcePath.resolve(Paths.get(filename))
def invokeSubject(storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[List[String], SyncPlan] = {
PlanBuilder.createPlan(storageService, hashService, configOptions).value.unsafeRunSync
}
def invokeSubjectForActions(storageService: StorageService,
def invokeSubjectForActions(
storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[List[String], Stream[Action]] = {
invokeSubject(storageService, hashService, configOptions)
.map(_.actions)
}
def invokeSubject(
storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[List[String], SyncPlan] = {
PlanBuilder
.createPlan(storageService, hashService, configOptions)
.value
.unsafeRunSync
}
private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] =
Map("md5" -> md5Hash)
private def file(filename: String) =
sourcePath.resolve(Paths.get(filename))
describe("when all files should be uploaded") {
val storageService = new RecordingStorageService(testBucket, S3ObjectsData())
val storageService =
new RecordingStorageService(testBucket, S3ObjectsData())
it("uploads all files") {
val expected = Right(Set(
ToUpload(testBucket, rootFile, rootFile.file.length),
val expected = Right(
Set(ToUpload(testBucket, rootFile, rootFile.file.length),
ToUpload(testBucket, leafFile, leafFile.file.length)))
val result = invokeSubjectForActions(storageService, hashService, configOptions)
val result =
invokeSubjectForActions(storageService, hashService, configOptions)
assertResult(expected)(result.map(_.toSet))
}
}
@ -81,15 +98,21 @@ class SyncSuite
describe("when no files should be uploaded") {
val s3ObjectsData = S3ObjectsData(
byHash = Map(
Root.hash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)),
Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
Root.hash -> Set(
KeyModified(RemoteKey("prefix/root-file"), lastModified)),
Leaf.hash -> Set(
KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))
),
byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified)))
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash,
lastModified))
)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("no actions") {
val expected = Stream()
val result = invokeSubjectForActions(storageService, hashService, configOptions)
val result =
invokeSubjectForActions(storageService, hashService, configOptions)
assert(result.isRight)
assertResult(expected)(result.right.get)
}
@ -99,19 +122,27 @@ class SyncSuite
val targetKey = RemoteKey("prefix/root-file")
// 'root-file-old' should be renamed as 'root-file'
val s3ObjectsData = S3ObjectsData(
byHash = Map(
Root.hash -> Set(KeyModified(sourceKey, lastModified)),
Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
byKey = Map(
sourceKey -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified)))
byHash =
Map(Root.hash -> Set(KeyModified(sourceKey, lastModified)),
Leaf.hash -> Set(
KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
byKey =
Map(sourceKey -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash,
lastModified))
)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("copies the file and deletes the original") {
val expected = Stream(
ToCopy(testBucket, sourceKey, Root.hash, targetKey, rootFile.file.length),
ToCopy(testBucket,
sourceKey,
Root.hash,
targetKey,
rootFile.file.length),
ToDelete(testBucket, sourceKey, 0L)
)
val result = invokeSubjectForActions(storageService, hashService, configOptions)
val result =
invokeSubjectForActions(storageService, hashService, configOptions)
assert(result.isRight)
assertResult(expected)(result.right.get)
}
@ -126,19 +157,27 @@ class SyncSuite
val deletedKey = RemoteKey("prefix/deleted-file")
val s3ObjectsData = S3ObjectsData(
byHash = Map(
Root.hash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)),
Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)),
deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))),
Root.hash -> Set(
KeyModified(RemoteKey("prefix/root-file"), lastModified)),
Leaf.hash -> Set(
KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)),
deletedHash -> Set(
KeyModified(RemoteKey("prefix/deleted-file"), lastModified))
),
byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified),
deletedKey -> HashModified(deletedHash, lastModified)))
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash,
lastModified),
deletedKey -> HashModified(deletedHash, lastModified)
)
)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("deleted key") {
val expected = Stream(
ToDelete(testBucket, deletedKey, 0L)
)
val result = invokeSubjectForActions(storageService,hashService, configOptions)
val result =
invokeSubjectForActions(storageService, hashService, configOptions)
assert(result.isRight)
assertResult(expected)(result.right.get)
}
@ -146,15 +185,23 @@ class SyncSuite
describe("when a file is excluded") {
val s3ObjectsData = S3ObjectsData(
byHash = Map(
Root.hash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)),
Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
Root.hash -> Set(
KeyModified(RemoteKey("prefix/root-file"), lastModified)),
Leaf.hash -> Set(
KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))
),
byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified)))
RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash,
lastModified))
)
val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("is not uploaded") {
val expected = Stream()
val result = invokeSubjectForActions(storageService, hashService, ConfigOption.Exclude("leaf") :: configOptions)
val result =
invokeSubjectForActions(storageService,
hashService,
ConfigOption.Exclude("leaf") :: configOptions)
assert(result.isRight)
assertResult(expected)(result.right.get)
}
@ -164,9 +211,8 @@ class SyncSuite
s3ObjectsData: S3ObjectsData)
extends StorageService {
override def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit l: Logger): EitherT[IO, String, S3ObjectsData] =
override def listObjects(bucket: Bucket, prefix: RemoteKey)(
implicit l: Logger): EitherT[IO, String, S3ObjectsData] =
EitherT.liftF(IO.pure(s3ObjectsData))
override def upload(localFile: LocalFile,

View file

@ -16,16 +16,26 @@ trait TemporaryFolder {
}
def remove(root: Path): Unit = {
Files.walkFileTree(root, new SimpleFileVisitor[Path] {
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
Files.walkFileTree(
root,
new SimpleFileVisitor[Path] {
override def visitFile(file: Path,
attrs: BasicFileAttributes): FileVisitResult = {
Files.delete(file)
FileVisitResult.CONTINUE
}
override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
override def postVisitDirectory(dir: Path,
exc: IOException): FileVisitResult = {
Files.delete(dir)
FileVisitResult.CONTINUE
}
})
}
)
}
def createFile(path: Path, name: String, content: String*): File = {
writeFile(path, name, content: _*)
path.resolve(name).toFile
}
def writeFile(directory: Path, name: String, contents: String*): Unit = {
@ -35,9 +45,4 @@ trait TemporaryFolder {
pw.close()
}
def createFile(path: Path, name: String, content: String*): File = {
writeFile(path, name, content:_*)
path.resolve(name).toFile
}
}

View file

@ -1,3 +1,5 @@
package net.kemitix.thorp.domain
final case class Bucket(name: String)
final case class Bucket(
name: String
)

View file

@ -1,8 +1,10 @@
package net.kemitix.thorp.domain
final case class Config(bucket: Bucket = Bucket(""),
final case class Config(
bucket: Bucket = Bucket(""),
prefix: RemoteKey = RemoteKey(""),
filters: List[Filter] = List(),
debug: Boolean = false,
batchMode: Boolean = false,
sources: Sources = Sources(List()))
sources: Sources = Sources(List())
)

View file

@ -7,22 +7,6 @@ sealed trait Filter
object Filter {
case class Include(include: String = ".*") extends Filter {
private lazy val predicate = Pattern.compile(include).asPredicate
def isIncluded(path: Path): Boolean = predicate.test(path.toString)
}
case class Exclude(exclude: String) extends Filter {
private lazy val predicate = Pattern.compile(exclude).asPredicate()
def isExcluded(path: Path): Boolean = predicate.test(path.toString)
}
def isIncluded(filters: List[Filter])(p: Path): Boolean = {
sealed trait State
case class Unknown() extends State
@ -38,11 +22,32 @@ object Filter {
}) match {
case Accepted() => true
case Discarded() => false
case Unknown() => filters.forall {
case Unknown() =>
filters.forall {
case _: Include => false
case _ => true
}
}
}
case class Include(
include: String = ".*"
) extends Filter {
private lazy val predicate = Pattern.compile(include).asPredicate
def isIncluded(path: Path): Boolean = predicate.test(path.toString)
}
case class Exclude(
exclude: String
) extends Filter {
private lazy val predicate = Pattern.compile(exclude).asPredicate()
def isExcluded(path: Path): Boolean = predicate.test(path.toString)
}
}

View file

@ -1,4 +1,6 @@
package net.kemitix.thorp.domain
final case class HashModified(hash: MD5Hash,
modified: LastModified)
final case class HashModified(
hash: MD5Hash,
modified: LastModified
)

View file

@ -1,4 +1,6 @@
package net.kemitix.thorp.domain
final case class KeyModified(key: RemoteKey,
modified: LastModified)
final case class KeyModified(
key: RemoteKey,
modified: LastModified
)

View file

@ -2,4 +2,6 @@ package net.kemitix.thorp.domain
import java.time.Instant
final case class LastModified(when: Instant = Instant.now)
final case class LastModified(
when: Instant = Instant.now
)

View file

@ -3,7 +3,12 @@ package net.kemitix.thorp.domain
import java.io.File
import java.nio.file.Path
final case class LocalFile(file: File, source: File, hashes: Map[String, MD5Hash], remoteKey: RemoteKey) {
final case class LocalFile(
file: File,
source: File,
hashes: Map[String, MD5Hash],
remoteKey: RemoteKey
) {
require(!file.isDirectory, s"LocalFile must not be a directory: $file")
@ -19,11 +24,18 @@ final case class LocalFile(file: File, source: File, hashes: Map[String, MD5Hash
}
object LocalFile {
def resolve(path: String,
def resolve(
path: String,
md5Hashes: Map[String, MD5Hash],
source: Path,
pathToKey: Path => RemoteKey): LocalFile = {
pathToKey: Path => RemoteKey
): LocalFile = {
val resolvedPath = source.resolve(path)
LocalFile(resolvedPath.toFile, source.toFile, md5Hashes, pathToKey(resolvedPath))
LocalFile(resolvedPath.toFile,
source.toFile,
md5Hashes,
pathToKey(resolvedPath))
}
}

View file

@ -4,7 +4,9 @@ import java.util.Base64
import net.kemitix.thorp.domain.QuoteStripper.stripQuotes
final case class MD5Hash(in: String) {
final case class MD5Hash(
in: String
) {
lazy val hash: String = in filter stripQuotes

View file

@ -3,9 +3,24 @@ package net.kemitix.thorp.domain
import java.io.File
import java.nio.file.{Path, Paths}
final case class RemoteKey(key: String) {
final case class RemoteKey(
key: String
) {
def asFile(source: Path, prefix: RemoteKey): Option[File] =
def isMissingLocally(
sources: Sources,
prefix: RemoteKey
): Boolean =
!sources.paths.exists(source =>
asFile(source, prefix) match {
case Some(file) => file.exists
case None => false
})
def asFile(
source: Path,
prefix: RemoteKey
): Option[File] =
if (key.length == 0) None
else Some(source.resolve(relativeTo(prefix)).toFile)
@ -16,12 +31,6 @@ final case class RemoteKey(key: String) {
}
}
def isMissingLocally(sources: Sources, prefix: RemoteKey): Boolean =
!sources.paths.exists(source => asFile(source, prefix) match {
case Some(file) => file.exists
case None => false
})
def resolve(path: String): RemoteKey =
RemoteKey(List(key, path).filterNot(_.isEmpty).mkString("/"))

View file

@ -1,5 +1,7 @@
package net.kemitix.thorp.domain
final case class RemoteMetaData(remoteKey: RemoteKey,
final case class RemoteMetaData(
remoteKey: RemoteKey,
hash: MD5Hash,
lastModified: LastModified)
lastModified: LastModified
)

View file

@ -4,4 +4,5 @@ package net.kemitix.thorp.domain
final case class S3MetaData(
localFile: LocalFile,
matchByHash: Set[RemoteMetaData],
matchByKey: Option[RemoteMetaData])
matchByKey: Option[RemoteMetaData]
)

View file

@ -3,5 +3,7 @@ package net.kemitix.thorp.domain
/**
* A list of objects and their MD5 hash values.
*/
final case class S3ObjectsData(byHash: Map[MD5Hash, Set[KeyModified]] = Map.empty,
byKey: Map[RemoteKey, HashModified] = Map.empty)
final case class S3ObjectsData(
byHash: Map[MD5Hash, Set[KeyModified]] = Map.empty,
byKey: Map[RemoteKey, HashModified] = Map.empty
)

View file

@ -8,11 +8,10 @@ object SizeTranslation {
def sizeInEnglish(length: Long): String =
length.toDouble match {
case bytes if bytes > gbLimit => f"${bytes / 1024 / 1024 /1024}%.3fGb"
case bytes if bytes > gbLimit => f"${bytes / 1024 / 1024 / 1024}%.3fGb"
case bytes if bytes > mbLimit => f"${bytes / 1024 / 1024}%.2fMb"
case bytes if bytes > kbLimit => f"${bytes / 1024}%.0fKb"
case bytes => s"${length}b"
}
}

View file

@ -12,11 +12,15 @@ import java.nio.file.Path
*
* A path should only occur once in paths.
*/
case class Sources(paths: List[Path]) {
case class Sources(
paths: List[Path]
) {
def ++(path: Path): Sources = this ++ List(path)
def ++(otherPaths: List[Path]): Sources = Sources(
otherPaths.foldLeft(paths)((acc, path) => if (acc.contains(path)) acc else acc ++ List(path))
)
otherPaths.foldLeft(paths) { (acc, path) =>
if (acc.contains(path)) acc
else acc ++ List(path)
})
/**
* Returns the source path for the given path.

View file

@ -8,24 +8,35 @@ sealed trait StorageQueueEvent {
object StorageQueueEvent {
final case class DoNothingQueueEvent(remoteKey: RemoteKey) extends StorageQueueEvent {
final case class DoNothingQueueEvent(
remoteKey: RemoteKey
) extends StorageQueueEvent {
override val order: Int = 0
}
final case class CopyQueueEvent(remoteKey: RemoteKey) extends StorageQueueEvent {
final case class CopyQueueEvent(
remoteKey: RemoteKey
) extends StorageQueueEvent {
override val order: Int = 1
}
final case class UploadQueueEvent(remoteKey: RemoteKey,
md5Hash: MD5Hash) extends StorageQueueEvent {
final case class UploadQueueEvent(
remoteKey: RemoteKey,
md5Hash: MD5Hash
) extends StorageQueueEvent {
override val order: Int = 2
}
final case class DeleteQueueEvent(remoteKey: RemoteKey) extends StorageQueueEvent {
final case class DeleteQueueEvent(
remoteKey: RemoteKey
) extends StorageQueueEvent {
override val order: Int = 3
}
final case class ErrorQueueEvent(remoteKey: RemoteKey, e: Throwable) extends StorageQueueEvent {
final case class ErrorQueueEvent(
remoteKey: RemoteKey,
e: Throwable
) extends StorageQueueEvent {
override val order: Int = 10
}

View file

@ -1,5 +1,7 @@
package net.kemitix.thorp.domain
case class SyncTotals(count: Long = 0L,
case class SyncTotals(
count: Long = 0L,
totalSizeBytes: Long = 0L,
sizeUploadedBytes: Long = 0L)
sizeUploadedBytes: Long = 0L
)

View file

@ -2,55 +2,8 @@ package net.kemitix.thorp.domain
object Terminal {
private val esc = "\u001B"
private val csi = esc + "["
/**
* Move the cursor up, default 1 line.
*
* Stops at the edge of the screen.
*/
def cursorUp(lines: Int = 1): String = csi + lines + "A"
/**
* Move the cursor down, default 1 line.
*
* Stops at the edge of the screen.
*/
def cursorDown(lines: Int = 1): String = csi + lines + "B"
/**
* Move the cursor forward, default 1 column.
*
* Stops at the edge of the screen.
*/
def cursorForward(cols: Int = 1): String = csi + cols + "C"
/**
* Move the cursor back, default 1 column,
*
* Stops at the edge of the screen.
*/
def cursorBack(cols: Int = 1): String = csi + cols + "D"
/**
* Move the cursor to the beginning of the line, default 1, down.
*/
def cursorNextLine(lines: Int = 1): String = csi + lines + "E"
/**
* Move the cursor to the beginning of the line, default 1, up.
*/
def cursorPrevLine(lines: Int = 1): String = csi + lines + "F"
/**
* Move the cursor to the column on the current line.
*/
def cursorHorizAbs(col: Int): String = csi + col + "G"
/**
* Move the cursor to the position on screen (1,1 is the top-left).
*/
def cursorPosition(row: Int, col: Int): String = csi + row + ";" + col + "H"
val esc: String = "\u001B"
val csi: String = esc + "["
/**
* Clear from cursor to end of screen.
@ -97,16 +50,6 @@ object Terminal {
*/
val eraseLine: String = csi + "2K"
/**
* Scroll page up, default 1, lines.
*/
def scrollUp(lines: Int = 1): String = csi + lines + "S"
/**
* Scroll page down, default 1, lines.
*/
def scrollDown(lines: Int = 1): String = csi + lines + "T"
/**
* Saves the cursor position/state.
*/
@ -116,10 +59,74 @@ object Terminal {
* Restores the cursor position/state.
*/
val restoreCursorPosition: String = csi + "u"
val enableAlternateBuffer: String = csi + "?1049h"
val disableAlternateBuffer: String = csi + "?1049l"
private val subBars = Map(0 -> " ",
1 -> "▏",
2 -> "▎",
3 -> "▍",
4 -> "▌",
5 -> "▋",
6 -> "▊",
7 -> "▉")
/**
* Move the cursor up, default 1 line.
*
* Stops at the edge of the screen.
*/
def cursorUp(lines: Int = 1): String = csi + lines + "A"
/**
* Move the cursor down, default 1 line.
*
* Stops at the edge of the screen.
*/
def cursorDown(lines: Int = 1): String = csi + lines + "B"
/**
* Move the cursor forward, default 1 column.
*
* Stops at the edge of the screen.
*/
def cursorForward(cols: Int = 1): String = csi + cols + "C"
/**
* Move the cursor back, default 1 column,
*
* Stops at the edge of the screen.
*/
def cursorBack(cols: Int = 1): String = csi + cols + "D"
/**
* Move the cursor to the beginning of the line, default 1, down.
*/
def cursorNextLine(lines: Int = 1): String = csi + lines + "E"
/**
* Move the cursor to the beginning of the line, default 1, up.
*/
def cursorPrevLine(lines: Int = 1): String = csi + lines + "F"
/**
* Move the cursor to the column on the current line.
*/
def cursorHorizAbs(col: Int): String = csi + col + "G"
/**
* Move the cursor to the position on screen (1,1 is the top-left).
*/
def cursorPosition(row: Int, col: Int): String = csi + row + ";" + col + "H"
/**
* Scroll page up, default 1, lines.
*/
def scrollUp(lines: Int = 1): String = csi + lines + "S"
/**
* Scroll page down, default 1, lines.
*/
def scrollDown(lines: Int = 1): String = csi + lines + "T"
/**
* The Width of the terminal, as reported by the COLUMNS environment variable.
@ -135,17 +142,11 @@ object Terminal {
.getOrElse(80)
}
private val subBars = Map(
0 -> " ",
1 -> "▏",
2 -> "▎",
3 -> "▍",
4 -> "▌",
5 -> "▋",
6 -> "▊",
7 -> "▉")
def progressBar(pos: Double, max: Double, width: Int): String = {
def progressBar(
pos: Double,
max: Double,
width: Int
): String = {
val barWidth = width - 2
val phases = subBars.values.size
val pxWidth = barWidth * phases

View file

@ -6,12 +6,18 @@ sealed trait UploadEvent {
object UploadEvent {
final case class TransferEvent(name: String) extends UploadEvent
final case class TransferEvent(
name: String
) extends UploadEvent
final case class RequestEvent(name: String,
final case class RequestEvent(
name: String,
bytes: Long,
transferred: Long) extends UploadEvent
transferred: Long
) extends UploadEvent
final case class ByteTransferEvent(name: String) extends UploadEvent
final case class ByteTransferEvent(
name: String
) extends UploadEvent
}

View file

@ -3,17 +3,24 @@ package net.kemitix.thorp.domain
import net.kemitix.thorp.domain.UploadEvent.RequestEvent
import net.kemitix.thorp.domain.UploadEventLogger.logRequestCycle
class UploadEventListener(localFile: LocalFile,
class UploadEventListener(
localFile: LocalFile,
index: Int,
syncTotals: SyncTotals,
totalBytesSoFar: Long) {
totalBytesSoFar: Long
) {
var bytesTransferred = 0L
def listener: UploadEvent => Unit = {
case e: RequestEvent =>
bytesTransferred += e.transferred
logRequestCycle(localFile, e, bytesTransferred, index, syncTotals, totalBytesSoFar)
logRequestCycle(localFile,
e,
bytesTransferred,
index,
syncTotals,
totalBytesSoFar)
case _ => ()
}
}

View file

@ -8,12 +8,14 @@ import scala.io.AnsiColor._
trait UploadEventLogger {
def logRequestCycle(localFile: LocalFile,
def logRequestCycle(
localFile: LocalFile,
event: RequestEvent,
bytesTransferred: Long,
index: Int,
syncTotals: SyncTotals,
totalBytesSoFar: Long): Unit = {
totalBytesSoFar: Long
): Unit = {
val remoteKey = localFile.remoteKey.key
val fileLength = localFile.file.length
val statusHeight = 7
@ -22,17 +24,22 @@ trait UploadEventLogger {
s"${GREEN}Uploading:$RESET $remoteKey$eraseToEndOfScreen\n" +
statusWithBar(" File", sizeInEnglish, bytesTransferred, fileLength) +
statusWithBar("Files", l => l.toString, index, syncTotals.count) +
statusWithBar(" Size", sizeInEnglish, bytesTransferred + totalBytesSoFar, syncTotals.totalSizeBytes) +
statusWithBar(" Size",
sizeInEnglish,
bytesTransferred + totalBytesSoFar,
syncTotals.totalSizeBytes) +
s"${Terminal.cursorPrevLine(statusHeight)}")
} else
println(s"${GREEN}Uploaded:$RESET $remoteKey$eraseToEndOfScreen")
}
private def statusWithBar(label: String,
private def statusWithBar(
label: String,
format: Long => String,
current: Long,
max: Long,
pre: Long = 0): String = {
pre: Long = 0
): String = {
val percent = f"${(current * 100) / max}%2d"
s"$GREEN$label:$RESET ($percent%) ${format(current)} of ${format(max)}" +
(if (pre > 0) s" (pre-synced ${format(pre)}"

View file

@ -13,7 +13,8 @@ class FiltersSuite extends FunSpec {
private val path4 = "/path/to/another/file"
private val path5 = "/home/pcampbell/repos/kemitix/s3thorp"
private val path6 = "/kemitix/s3thorp/upload/subdir"
private val paths = List(path1, path2, path3, path4, path5, path6).map(Paths.get(_))
private val paths =
List(path1, path2, path3, path4, path5, path6).map(Paths.get(_))
describe("Include") {

View file

@ -6,7 +6,6 @@ object MD5HashData {
val hash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val base64 = "o6asEaDrV3uBs7tclcyKbg=="
}
object Leaf {
val hash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
val base64 = "IIOGplC97GHPzXvY3La1Qg=="

View file

@ -27,7 +27,8 @@ class SizeTranslationTest extends FunSpec {
}
describe("when size is over 10Gb") {
it("should be in Gb with three decimal place") {
assertResult("5468.168Gb")(SizeTranslation.sizeInEnglish(5871400857278L))
assertResult("5468.168Gb")(
SizeTranslation.sizeInEnglish(5871400857278L))
}
}
}

View file

@ -10,7 +10,6 @@ import net.kemitix.thorp.domain.{Logger, MD5Hash}
*/
trait HashService {
def hashLocalObject(path: Path)
(implicit l: Logger): IO[Map[String, MD5Hash]]
def hashLocalObject(path: Path)(implicit l: Logger): IO[Map[String, MD5Hash]]
}

View file

@ -8,22 +8,29 @@ trait StorageService {
def shutdown: IO[StorageQueueEvent]
def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit l: Logger): EitherT[IO, String, S3ObjectsData]
def listObjects(
bucket: Bucket,
prefix: RemoteKey
)(implicit l: Logger): EitherT[IO, String, S3ObjectsData]
def upload(localFile: LocalFile,
def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int): IO[StorageQueueEvent]
tryCount: Int
): IO[StorageQueueEvent]
def copy(bucket: Bucket,
def copy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): IO[StorageQueueEvent]
targetKey: RemoteKey
): IO[StorageQueueEvent]
def delete(bucket: Bucket,
remoteKey: RemoteKey): IO[StorageQueueEvent]
def delete(
bucket: Bucket,
remoteKey: RemoteKey
): IO[StorageQueueEvent]
}

View file

@ -0,0 +1,11 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManager
case class AmazonTransferManager(transferManager: TransferManager) {
def shutdownNow(now: Boolean): Unit = transferManager.shutdownNow(now)
def upload(putObjectRequest: PutObjectRequest): AmazonUpload =
AmazonUpload(transferManager.upload(putObjectRequest))
}

View file

@ -0,0 +1,8 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.transfer.Upload
import com.amazonaws.services.s3.transfer.model.UploadResult
case class AmazonUpload(upload: Upload) {
def waitForUploadResult: UploadResult = upload.waitForUploadResult()
}

View file

@ -8,21 +8,29 @@ import net.kemitix.thorp.domain._
class Copier(amazonS3: AmazonS3) {
def copy(bucket: Bucket,
def copy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): IO[StorageQueueEvent] =
targetKey: RemoteKey
): IO[StorageQueueEvent] =
for {
_ <- copyObject(bucket, sourceKey, hash, targetKey)
} yield CopyQueueEvent(targetKey)
private def copyObject(bucket: Bucket,
private def copyObject(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey) = {
targetKey: RemoteKey
) = {
val request =
new CopyObjectRequest(bucket.name, sourceKey.key, bucket.name, targetKey.key)
.withMatchingETagConstraint(hash.hash)
new CopyObjectRequest(
bucket.name,
sourceKey.key,
bucket.name,
targetKey.key
).withMatchingETagConstraint(hash.hash)
IO(amazonS3.copyObject(request))
}

View file

@ -4,17 +4,24 @@ import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectRequest
import net.kemitix.thorp.domain.StorageQueueEvent.DeleteQueueEvent
import net.kemitix.thorp.domain.{Bucket, RemoteKey}
import net.kemitix.thorp.domain.{Bucket, RemoteKey, StorageQueueEvent}
class Deleter(amazonS3: AmazonS3) {
def delete(bucket: Bucket,
remoteKey: RemoteKey): IO[DeleteQueueEvent] =
def delete(
bucket: Bucket,
remoteKey: RemoteKey
): IO[StorageQueueEvent] =
for {
_ <- deleteObject(bucket, remoteKey)
} yield DeleteQueueEvent(remoteKey)
private def deleteObject(bucket: Bucket, remoteKey: RemoteKey) =
IO(amazonS3.deleteObject(new DeleteObjectRequest(bucket.name, remoteKey.key)))
private def deleteObject(
bucket: Bucket,
remoteKey: RemoteKey
) = {
val request = new DeleteObjectRequest(bucket.name, remoteKey.key)
IO(amazonS3.deleteObject(request))
}
}

View file

@ -12,11 +12,14 @@ import net.kemitix.thorp.domain.{Logger, MD5Hash}
trait ETagGenerator {
def eTag(path: Path)(implicit l: Logger): IO[String]= {
def eTag(
path: Path
)(implicit l: Logger): IO[String] = {
val partSize = calculatePartSize(path)
val parts = numParts(path.toFile.length, partSize)
partsIndex(parts)
.map(digestChunk(path, partSize)).sequence
.map(digestChunk(path, partSize))
.sequence
.map(concatenateDigests)
.map(MD5HashGenerator.hex)
.map(hash => s"$hash-$parts")
@ -34,20 +37,37 @@ trait ETagGenerator {
TransferManagerUtils.calculateOptimalPartSize(request, configuration)
}
private def numParts(fileLength: Long, optimumPartSize: Long) = {
private def numParts(
fileLength: Long,
optimumPartSize: Long
) = {
val fullParts = Math.floorDiv(fileLength, optimumPartSize)
val incompletePart = if (Math.floorMod(fileLength, optimumPartSize) > 0) 1 else 0
val incompletePart =
if (Math.floorMod(fileLength, optimumPartSize) > 0) 1
else 0
fullParts + incompletePart
}
def offsets(totalFileSizeBytes: Long, optimalPartSize: Long): List[Long] =
Range.Long(0, totalFileSizeBytes, optimalPartSize).toList
def digestChunk(path: Path, chunkSize: Long)(chunkNumber: Long)(implicit l: Logger): IO[Array[Byte]] =
def digestChunk(
path: Path,
chunkSize: Long
)(
chunkNumber: Long
)(implicit l: Logger): IO[Array[Byte]] =
hashChunk(path, chunkNumber, chunkSize).map(_.digest)
def hashChunk(path: Path, chunkNumber: Long, chunkSize: Long)(implicit l: Logger): IO[MD5Hash] =
def hashChunk(
path: Path,
chunkNumber: Long,
chunkSize: Long
)(implicit l: Logger): IO[MD5Hash] =
MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize)
def offsets(
totalFileSizeBytes: Long,
optimalPartSize: Long
): List[Long] =
Range.Long(0, totalFileSizeBytes, optimalPartSize).toList
}
object ETagGenerator extends ETagGenerator

View file

@ -2,6 +2,7 @@ package net.kemitix.thorp.storage.aws
import cats.data.EitherT
import cats.effect.IO
import cats.implicits._
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
import net.kemitix.thorp.domain
@ -17,31 +18,37 @@ class Lister(amazonS3: AmazonS3) {
private type Token = String
private type Batch = (Stream[S3ObjectSummary], Option[Token])
def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit l: Logger): EitherT[IO, String, S3ObjectsData] = {
def listObjects(
bucket: Bucket,
prefix: RemoteKey
)(implicit l: Logger): EitherT[IO, String, S3ObjectsData] = {
val requestMore = (token: Token) => new ListObjectsV2Request()
val requestMore = (token: Token) =>
new ListObjectsV2Request()
.withBucketName(bucket.name)
.withPrefix(prefix.key)
.withContinuationToken(token)
def fetchBatch: ListObjectsV2Request => EitherT[IO, String, Batch] =
request => EitherT {
request =>
EitherT {
for {
_ <- ListerLogger.logFetchBatch
batch <- tryFetchBatch(request)
} yield batch
}
def fetchMore(more: Option[Token]): EitherT[IO, String, Stream[S3ObjectSummary]] = {
def fetchMore(
more: Option[Token]
): EitherT[IO, String, Stream[S3ObjectSummary]] = {
more match {
case None => EitherT.right(IO.pure(Stream.empty))
case Some(token) => fetch(requestMore(token))
}
}
def fetch: ListObjectsV2Request => EitherT[IO, String, Stream[S3ObjectSummary]] =
def fetch
: ListObjectsV2Request => EitherT[IO, String, Stream[S3ObjectSummary]] =
request => {
for {
batch <- fetchBatch(request)
@ -51,11 +58,16 @@ class Lister(amazonS3: AmazonS3) {
}
for {
summaries <- fetch(new ListObjectsV2Request().withBucketName(bucket.name).withPrefix(prefix.key))
summaries <- fetch(
new ListObjectsV2Request()
.withBucketName(bucket.name)
.withPrefix(prefix.key))
} yield domain.S3ObjectsData(byHash(summaries), byKey(summaries))
}
private def tryFetchBatch(request: ListObjectsV2Request): IO[Either[String, (Stream[S3ObjectSummary], Option[Token])]] = {
private def tryFetchBatch(
request: ListObjectsV2Request
): IO[Either[String, (Stream[S3ObjectSummary], Option[Token])]] = {
IO {
Try(amazonS3.listObjectsV2(request))
.map { result =>
@ -63,7 +75,9 @@ class Lister(amazonS3: AmazonS3) {
if (result.isTruncated) Some(result.getNextContinuationToken)
else None
(result.getObjectSummaries.asScala.toStream, more)
}.toEither.swap.map(e => e.getMessage).swap
}
.toEither
.leftMap(e => e.getMessage)
}
}
}

View file

@ -4,6 +4,7 @@ import cats.effect.IO
import net.kemitix.thorp.domain.Logger
trait ListerLogger {
def logFetchBatch(implicit l: Logger): IO[Unit] = l.info("Fetching remote summaries...")
def logFetchBatch(implicit l: Logger): IO[Unit] =
l.info("Fetching remote summaries...")
}
object ListerLogger extends ListerLogger

View file

@ -15,12 +15,14 @@ trait S3HashService extends HashService {
* @param path the local path to scan
* @return a set of hash values
*/
override def hashLocalObject(path: Path)
(implicit l: Logger): IO[Map[String, MD5Hash]] =
override def hashLocalObject(
path: Path
)(implicit l: Logger): IO[Map[String, MD5Hash]] =
for {
md5 <- MD5HashGenerator.md5File(path)
etag <- ETagGenerator.eTag(path).map(MD5Hash(_))
} yield Map(
} yield
Map(
"md5" -> md5,
"etag" -> etag
)

View file

@ -5,14 +5,19 @@ import net.kemitix.thorp.domain.{KeyModified, LastModified, MD5Hash, RemoteKey}
object S3ObjectsByHash {
def byHash(os: Stream[S3ObjectSummary]): Map[MD5Hash, Set[KeyModified]] = {
def byHash(
os: Stream[S3ObjectSummary]
): Map[MD5Hash, Set[KeyModified]] = {
val mD5HashToS3Objects: Map[MD5Hash, Stream[S3ObjectSummary]] =
os.groupBy(o => MD5Hash(o.getETag.filter{c => c != '"'}))
val hashToModifieds: Map[MD5Hash, Set[KeyModified]] =
os.groupBy(o => MD5Hash(o.getETag.filter(_ != '"')))
mD5HashToS3Objects.mapValues { os =>
os.map { o =>
KeyModified(RemoteKey(o.getKey), LastModified(o.getLastModified.toInstant))}.toSet }
hashToModifieds
KeyModified(
RemoteKey(o.getKey),
LastModified(o.getLastModified.toInstant)
)
}.toSet
}
}
}

View file

@ -5,12 +5,14 @@ import net.kemitix.thorp.domain.{HashModified, LastModified, MD5Hash, RemoteKey}
object S3ObjectsByKey {
def byKey(os: Stream[S3ObjectSummary]) =
os.map { o => {
def byKey(os: Stream[S3ObjectSummary]): Map[RemoteKey, HashModified] =
os.map { o =>
{
val remoteKey = RemoteKey(o.getKey)
val hash = MD5Hash(o.getETag)
val lastModified = LastModified(o.getLastModified.toInstant)
(remoteKey, HashModified(hash, lastModified))
}}.toMap
}
}.toMap
}

View file

@ -3,45 +3,52 @@ package net.kemitix.thorp.storage.aws
import cats.data.EitherT
import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.transfer.TransferManager
import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService
class S3StorageService(amazonS3Client: => AmazonS3,
amazonS3TransferManager: => TransferManager)
extends StorageService {
class S3StorageService(
amazonS3Client: => AmazonS3,
amazonTransferManager: => AmazonTransferManager
) extends StorageService {
lazy val objectLister = new Lister(amazonS3Client)
lazy val copier = new Copier(amazonS3Client)
lazy val uploader = new Uploader(amazonS3TransferManager)
lazy val uploader = new Uploader(amazonTransferManager)
lazy val deleter = new Deleter(amazonS3Client)
override def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit l: Logger): EitherT[IO, String, S3ObjectsData] =
override def listObjects(
bucket: Bucket,
prefix: RemoteKey
)(implicit l: Logger): EitherT[IO, String, S3ObjectsData] =
objectLister.listObjects(bucket, prefix)
override def copy(bucket: Bucket,
override def copy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): IO[StorageQueueEvent] =
copier.copy(bucket, sourceKey,hash, targetKey)
targetKey: RemoteKey
): IO[StorageQueueEvent] =
copier.copy(bucket, sourceKey, hash, targetKey)
override def upload(localFile: LocalFile,
override def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int): IO[StorageQueueEvent] =
tryCount: Int
): IO[StorageQueueEvent] =
uploader.upload(localFile, bucket, batchMode, uploadEventListener, 1)
override def delete(bucket: Bucket,
remoteKey: RemoteKey): IO[StorageQueueEvent] =
override def delete(
bucket: Bucket,
remoteKey: RemoteKey
): IO[StorageQueueEvent] =
deleter.delete(bucket, remoteKey)
override def shutdown: IO[StorageQueueEvent] =
IO {
amazonS3TransferManager.shutdownNow(true)
amazonTransferManager.shutdownNow(true)
amazonS3Client.shutdown()
ShutdownQueueEvent()
}

View file

@ -1,18 +1,24 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import net.kemitix.thorp.storage.api.StorageService
object S3StorageServiceBuilder {
def createService(amazonS3Client: AmazonS3,
amazonS3TransferManager: TransferManager): StorageService =
new S3StorageService(amazonS3Client, amazonS3TransferManager)
def createService(
amazonS3Client: AmazonS3,
amazonTransferManager: AmazonTransferManager
): StorageService =
new S3StorageService(
amazonS3Client,
amazonTransferManager
)
lazy val defaultStorageService: StorageService =
createService(
AmazonS3ClientBuilder.defaultClient,
TransferManagerBuilder.defaultTransferManager)
AmazonTransferManager(TransferManagerBuilder.defaultTransferManager)
)
}

View file

@ -4,32 +4,42 @@ import cats.effect.IO
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager}
import net.kemitix.thorp.domain.StorageQueueEvent.{ErrorQueueEvent, UploadQueueEvent}
import net.kemitix.thorp.domain.UploadEvent.{ByteTransferEvent, RequestEvent, TransferEvent}
import net.kemitix.thorp.domain.StorageQueueEvent.{
ErrorQueueEvent,
UploadQueueEvent
}
import net.kemitix.thorp.domain.UploadEvent.{
ByteTransferEvent,
RequestEvent,
TransferEvent
}
import net.kemitix.thorp.domain.{StorageQueueEvent, _}
import scala.util.Try
class Uploader(transferManager: => AmazonTransferManager) {
def upload(localFile: LocalFile,
def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int): IO[StorageQueueEvent] =
tryCount: Int
): IO[StorageQueueEvent] =
for {
upload <- transfer(localFile, bucket, batchMode, uploadEventListener)
action = upload match {
case Right(r) => UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag))
case Right(r) =>
UploadQueueEvent(RemoteKey(r.getKey), MD5Hash(r.getETag))
case Left(e) => ErrorQueueEvent(localFile.remoteKey, e)
}
} yield action
private def transfer(localFile: LocalFile,
private def transfer(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
uploadEventListener: UploadEventListener
): IO[Either[Throwable, UploadResult]] = {
val listener: ProgressListener = progressListener(uploadEventListener)
val putObjectRequest = request(localFile, bucket, batchMode, listener)
@ -40,13 +50,16 @@ class Uploader(transferManager: => AmazonTransferManager) {
}
}
private def request(localFile: LocalFile,
private def request(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
listener: ProgressListener): PutObjectRequest = {
listener: ProgressListener
): PutObjectRequest = {
val metadata = new ObjectMetadata()
localFile.md5base64.foreach(metadata.setContentMD5)
val request = new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
val request =
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withMetadata(metadata)
if (batchMode) request
else request.withGeneralProgressListener(listener)
@ -54,9 +67,8 @@ class Uploader(transferManager: => AmazonTransferManager) {
private def progressListener(uploadEventListener: UploadEventListener) =
new ProgressListener {
override def progressChanged(progressEvent: ProgressEvent): Unit = {
override def progressChanged(progressEvent: ProgressEvent): Unit =
uploadEventListener.listener(eventHandler(progressEvent))
}
private def eventHandler(progressEvent: ProgressEvent) = {
progressEvent match {

View file

@ -7,7 +7,7 @@ class DummyLogger extends Logger {
override def debug(message: => String): IO[Unit] = IO.unit
override def info(message: =>String): IO[Unit] = IO.unit
override def info(message: => String): IO[Unit] = IO.unit
override def warn(message: String): IO[Unit] = IO.unit

View file

@ -16,9 +16,12 @@ class ETagGeneratorTest extends FunSpec {
describe("Create offsets") {
it("should create offsets") {
val offsets = ETagGenerator.offsets(bigFile.length, chunkSize)
val offsets = ETagGenerator
.offsets(bigFile.length, chunkSize)
.foldRight(List[Long]())((l: Long, a: List[Long]) => l :: a)
assertResult(List(0, chunkSize, chunkSize * 2, chunkSize * 3, chunkSize * 4))(offsets)
assertResult(
List(0, chunkSize, chunkSize * 2, chunkSize * 3, chunkSize * 4))(
offsets)
}
}
@ -35,8 +38,12 @@ class ETagGeneratorTest extends FunSpec {
"5bd6e10a99fef100fe7bf5eaa0a42384",
"8a0c1d0778ac8fcf4ca2010eba4711eb"
).zipWithIndex
md5Hashes.foreach { case (hash, index) =>
test(hash, ETagGenerator.hashChunk(bigFilePath, index, chunkSize)(logger).unsafeRunSync)
md5Hashes.foreach {
case (hash, index) =>
test(hash,
ETagGenerator
.hashChunk(bigFilePath, index, chunkSize)(logger)
.unsafeRunSync)
}
}
}

View file

@ -20,7 +20,8 @@ class S3ObjectsByHashSuite extends FunSpec {
val os = Stream(o1, o2)
it("should group by the hash value") {
val expected: Map[MD5Hash, Set[KeyModified]] = Map(
hash -> Set(KeyModified(key1, lastModified), KeyModified(key2, lastModified))
hash -> Set(KeyModified(key1, lastModified),
KeyModified(key2, lastModified))
)
val result = S3ObjectsByHash.byHash(os)
assertResult(expected)(result)

View file

@ -5,22 +5,24 @@ import java.time.temporal.ChronoUnit
import java.util.Date
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary}
import com.amazonaws.services.s3.transfer.TransferManager
import com.amazonaws.services.s3.model.{
ListObjectsV2Request,
ListObjectsV2Result,
S3ObjectSummary
}
import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
class S3StorageServiceSuite
extends FunSpec
with MockFactory {
class S3StorageServiceSuite extends FunSpec with MockFactory {
describe("listObjectsInPrefix") {
val source = Resource(this, "upload")
val sourcePath = source.toPath
val prefix = RemoteKey("prefix")
implicit val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit val implLogger: Logger = new DummyLogger
val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))
@ -29,7 +31,9 @@ class S3StorageServiceSuite
val k1a = RemoteKey("key1a")
def objectSummary(hash: MD5Hash, remoteKey: RemoteKey, lastModified: LastModified) = {
def objectSummary(hash: MD5Hash,
remoteKey: RemoteKey,
lastModified: LastModified) = {
val summary = new S3ObjectSummary()
summary.setETag(hash.hash)
summary.setKey(remoteKey.key)
@ -47,7 +51,7 @@ class S3StorageServiceSuite
val o2 = objectSummary(h2, k2, lm)
val amazonS3 = stub[AmazonS3]
val amazonS3TransferManager = stub[TransferManager]
val amazonS3TransferManager = stub[AmazonTransferManager]
val storageService = new S3StorageService(amazonS3, amazonS3TransferManager)
val myFakeResponse = new ListObjectsV2Result()
@ -55,18 +59,24 @@ class S3StorageServiceSuite
summaries.add(o1a)
summaries.add(o1b)
summaries.add(o2)
(amazonS3 listObjectsV2 (_: ListObjectsV2Request)).when(*).returns(myFakeResponse)
(amazonS3 listObjectsV2 (_: ListObjectsV2Request))
.when(*)
.returns(myFakeResponse)
it("should build list of hash lookups, with duplicate objects grouped by hash") {
val expected = Right(S3ObjectsData(
byHash = Map(
h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)),
it(
"should build list of hash lookups, with duplicate objects grouped by hash") {
val expected = Right(
S3ObjectsData(
byHash = Map(h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)),
h2 -> Set(KeyModified(k2, lm))),
byKey = Map(
k1a -> HashModified(h1, lm),
byKey = Map(k1a -> HashModified(h1, lm),
k1b -> HashModified(h1, lm),
k2 -> HashModified(h2, lm))))
val result = storageService.listObjects(Bucket("bucket"), RemoteKey("prefix")).value.unsafeRunSync
k2 -> HashModified(h2, lm))
))
val result = storageService
.listObjects(Bucket("bucket"), RemoteKey("prefix"))
.value
.unsafeRunSync
assertResult(expected)(result)
}
}

View file

@ -5,7 +5,6 @@ import java.time.Instant
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{TransferManager, Upload}
import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher}
import net.kemitix.thorp.domain.MD5HashData.Root
import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent
@ -13,49 +12,65 @@ import net.kemitix.thorp.domain._
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
class StorageServiceSuite
extends FunSpec
with MockFactory {
class StorageServiceSuite extends FunSpec with MockFactory {
val source = Resource(this, "upload")
val sourcePath = source.toPath
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val implLogger: Logger = new DummyLogger
private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _
private val fileToKey =
KeyGenerator.generateKey(config.sources, config.prefix) _
describe("getS3Status") {
val hash = MD5Hash("hash")
val localFile = LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey)
val localFile =
LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey)
val key = localFile.remoteKey
val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), sourcePath, fileToKey)
val keyOtherKey = LocalFile.resolve("other-key-same-hash",
md5HashMap(hash),
sourcePath,
fileToKey)
val diffHash = MD5Hash("diff")
val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), sourcePath, fileToKey)
val keyDiffHash = LocalFile.resolve("other-key-diff-hash",
md5HashMap(diffHash),
sourcePath,
fileToKey)
val lastModified = LastModified(Instant.now)
val s3ObjectsData: S3ObjectsData = S3ObjectsData(
byHash = Map(
hash -> Set(KeyModified(key, lastModified), KeyModified(keyOtherKey.remoteKey, lastModified)),
diffHash -> Set(KeyModified(keyDiffHash.remoteKey, lastModified))),
hash -> Set(KeyModified(key, lastModified),
KeyModified(keyOtherKey.remoteKey, lastModified)),
diffHash -> Set(KeyModified(keyDiffHash.remoteKey, lastModified))
),
byKey = Map(
key -> HashModified(hash, lastModified),
keyOtherKey.remoteKey -> HashModified(hash, lastModified),
keyDiffHash.remoteKey -> HashModified(diffHash, lastModified)))
keyDiffHash.remoteKey -> HashModified(diffHash, lastModified)
)
)
def invoke(localFile: LocalFile) =
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
def getMatchesByKey(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Option[HashModified] = {
def getMatchesByKey(
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
: Option[HashModified] = {
val (byKey, _) = status
byKey
}
def getMatchesByHash(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Set[(MD5Hash, KeyModified)] = {
def getMatchesByHash(
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
: Set[(MD5Hash, KeyModified)] = {
val (_, byHash) = status
byHash
}
describe("when remote key exists, unmodified and other key matches the hash") {
describe(
"when remote key exists, unmodified and other key matches the hash") {
it("should return the match by key") {
val result = getMatchesByKey(invoke(localFile))
assert(result.contains(HashModified(hash, lastModified)))
@ -63,15 +78,17 @@ class StorageServiceSuite
it("should return both matches for the hash") {
val result = getMatchesByHash(invoke(localFile))
assertResult(
Set(
(hash, KeyModified(key, lastModified)),
Set((hash, KeyModified(key, lastModified)),
(hash, KeyModified(keyOtherKey.remoteKey, lastModified)))
)(result)
}
}
describe("when remote key does not exist and no others matches hash") {
val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), sourcePath, fileToKey)
val localFile = LocalFile.resolve("missing-file",
md5HashMap(MD5Hash("unique")),
sourcePath,
fileToKey)
it("should return no matches by key") {
val result = getMatchesByKey(invoke(localFile))
assert(result.isEmpty)
@ -91,17 +108,15 @@ class StorageServiceSuite
it("should return one match by hash") {
val result = getMatchesByHash(invoke(localFile))
assertResult(
Set(
(diffHash, KeyModified(keyDiffHash.remoteKey, lastModified)))
Set((diffHash, KeyModified(keyDiffHash.remoteKey, lastModified)))
)(result)
}
}
}
private def md5HashMap(hash: MD5Hash) = {
private def md5HashMap(hash: MD5Hash) =
Map("md5" -> hash)
}
val batchMode: Boolean = true
@ -109,18 +124,25 @@ class StorageServiceSuite
describe("when uploading a file") {
val amazonS3 = stub[AmazonS3]
val amazonS3TransferManager = stub[TransferManager]
val storageService = new S3StorageService(amazonS3, amazonS3TransferManager)
val amazonTransferManager = stub[AmazonTransferManager]
val storageService =
new S3StorageService(amazonS3, amazonTransferManager)
val prefix = RemoteKey("prefix")
val localFile =
LocalFile.resolve("root-file", md5HashMap(Root.hash), sourcePath, KeyGenerator.generateKey(config.sources, prefix))
LocalFile.resolve("root-file",
md5HashMap(Root.hash),
sourcePath,
KeyGenerator.generateKey(config.sources, prefix))
val bucket = Bucket("a-bucket")
val remoteKey = RemoteKey("prefix/root-file")
val uploadEventListener = new UploadEventListener(localFile, 1, SyncTotals(), 0L)
val uploadEventListener =
new UploadEventListener(localFile, 1, SyncTotals(), 0L)
val upload = stub[Upload]
(amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload)
val upload = stub[AmazonUpload]
(amazonTransferManager upload (_: PutObjectRequest))
.when(*)
.returns(upload)
val uploadResult = stub[UploadResult]
(upload.waitForUploadResult _).when().returns(uploadResult)
(uploadResult.getETag _).when().returns(Root.hash.hash)
@ -130,7 +152,12 @@ class StorageServiceSuite
pending
//FIXME: works okay on its own, but fails when run with others
val expected = UploadQueueEvent(remoteKey, Root.hash)
val result = storageService.upload(localFile, bucket, batchMode, uploadEventListener, 1)
val result =
storageService.upload(localFile,
bucket,
batchMode,
uploadEventListener,
1)
assertResult(expected)(result)
}
}

View file

@ -1,7 +1,5 @@
package net.kemitix.thorp.storage.aws
import java.time.Instant
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.transfer._
import net.kemitix.thorp.core.KeyGenerator.generateKey
@ -11,24 +9,18 @@ import net.kemitix.thorp.domain.{UploadEventListener, _}
import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec
class UploaderSuite
extends FunSpec
with MockFactory {
class UploaderSuite extends FunSpec with MockFactory {
private val batchMode: Boolean = true
private val source = Resource(this, ".")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val implLogger: Logger = new DummyLogger
implicit private val config: Config =
Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
private val fileToKey = generateKey(config.sources, config.prefix) _
val lastModified = LastModified(Instant.now())
implicit private val implLogger: Logger = new DummyLogger
def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] =
Map(
"md5" -> hash
)
val batchMode: Boolean = true
def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] = Map("md5" -> hash)
describe("S3ClientMultiPartTransferManagerSuite") {
describe("upload") {
@ -39,14 +31,24 @@ class UploaderSuite
// dies when putObject is called
val returnedKey = RemoteKey("returned-key")
val returnedHash = MD5Hash("returned-hash")
val bigFile = LocalFile.resolve("small-file", md5HashMap(MD5Hash("the-hash")), sourcePath, fileToKey)
val uploadEventListener = new UploadEventListener(bigFile, 1, SyncTotals(), 0L)
val bigFile = LocalFile.resolve("small-file",
md5HashMap(MD5Hash("the-hash")),
sourcePath,
fileToKey)
val uploadEventListener =
new UploadEventListener(bigFile, 1, SyncTotals(), 0L)
val amazonS3 = mock[AmazonS3]
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
val uploader = new Uploader(amazonS3TransferManager)
val amazonTransferManager =
AmazonTransferManager(
TransferManagerBuilder.standard().withS3Client(amazonS3).build)
val uploader = new Uploader(amazonTransferManager)
it("should upload") {
val expected = UploadQueueEvent(returnedKey, returnedHash)
val result = uploader.upload(bigFile, config.bucket, batchMode, uploadEventListener, 1)
val result = uploader.upload(bigFile,
config.bucket,
batchMode,
uploadEventListener,
1)
assertResult(expected)(result)
}
}