Sync more than one source directory into a single bucket/prefix (#25)

* [changelog] updated

* [readme] updated

* [core] ConfigQuery added sources()

* [cli] ParseArgs allow specifying multiple sources

* [domain,core,cli] Source datatype changed to Path

* [domain] Sources added to hold multiple paths in order

* [domain] Config sources change datatype to Sources

* [core] Scan sources for .thorp.config and include any sources listed

This allows the inclusion of a `.thorp.config` file in a source with a
single line `source = ....` that causes that other source to also be
synched into the same remote prefix as the current source.

* [core] ConfigurationBuilderTest add more pending tests

* [[core] ConfigurationBuilderTest rewrite using loan-pattern for fixtures

* [core] ConfigOptionTest use TemporaryFolder

* [core] ConfigOptionTest remove unused fields

* [cli] ParseArgsTest don't use get on an Option

* [core] ConfigurationBuilderTest don't use get on Either

* [core] TemporaryFolder Move import to top of file

* [core] TemporaryFolder use Try over try-finally

* [core] ConfigurationBuilderTest don't use get on Either

* [core] TemporaryFolders.withDirectory propogate errors

* [core] TemporaryFolders add writeFile and createFile

* [core] PlanBuilderTest create a plan with two sources with unique files in both

* [core] ActionGenerator only upload file by name in first source

create a plan
  two sources
    same filename in both
    - only upload file in first source

* [domain] LastModified with no params is now()

* [core] PlanBuilderTest 2 sources w/remote only in 2nd src do nothing

* [core] PlanBuilderTest 2 sources w/remote only in 2nd src do nothing

* [domain] RemoteKey map to a file when prefix is empty

* [domain] S3ObjectData defaults to empty

* [core] KeyGenerator Avoid delimiter when empty prefix key

* [core] PlanBuilderTest when remote not in sources delete from remote

* [core] PlanBuilderTest extract helper md5Hash()

* [core] PlanBuilderTest one source a file no matching remote key

* [core] PlanBuildingTest file with matching key and hash do nothing

* [core] PlanBuilderTest file w/matching remote key and different hash

* [core] PlanBuilderTest a remote key with and without local file

* [core] DummyStorageService Use wildcards when selecting more than 6 elements
This commit is contained in:
Paul Campbell 2019-07-12 07:42:42 +01:00 committed by GitHub
parent a6c767f16f
commit b15350d959
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
42 changed files with 1130 additions and 227 deletions

View file

@ -12,6 +12,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
- Add a version command-line option (#99) - Add a version command-line option (#99)
- Add a batch mode (#85) - Add a batch mode (#85)
- Display total size and progress for entire run (#94) - Display total size and progress for entire run (#94)
- Sync more than one source directory into a single bucket/prefile (#25)
* [0.6.1] - 2019-07-03 * [0.6.1] - 2019-07-03

View file

@ -33,6 +33,10 @@ If you don't provide a ~source~ the current diretory will be used.
The ~--include~ and ~--exclude~ parameters can be used more than once. The ~--include~ and ~--exclude~ parameters can be used more than once.
The ~--source~ parameter can be used more than once, in which case,
all files in all sources will be consolidated into the same
bucket/prefix.
** Batch mode ** Batch mode
Batch mode disable the ANSI console display and logs simple messages Batch mode disable the ANSI console display and logs simple messages
@ -46,15 +50,18 @@ that can be written to a file.
- User: ~ ~/.config/thorp.conf~ - User: ~ ~/.config/thorp.conf~
- Source: ~${source}/.thorp.conf~ - Source: ~${source}/.thorp.conf~
Command line arguments override those in Source, which override those Command line arguments override those in Source, which override
in User, which override those Global, which override any built-in those in User, which override those Global, which override any
config. built-in config.
Built-in config consists of using the current working directory as the When there is more than one source, only the first ".thorp.conf"
~source~. file found will be used.
Note, that ~include~ and ~exclude~ are cumulative across all Built-in config consists of using the current working directory as
configuration files. the ~source~.
Note, that ~include~ and ~exclude~ are cumulative across all
configuration files.
* Behaviour * Behaviour

View file

@ -20,6 +20,7 @@ object ParseArgs {
.action((_, cos) => ConfigOption.BatchMode :: cos) .action((_, cos) => ConfigOption.BatchMode :: cos)
.text("Enable batch-mode"), .text("Enable batch-mode"),
opt[String]('s', "source") opt[String]('s', "source")
.unbounded()
.action((str, cos) => ConfigOption.Source(Paths.get(str)) :: cos) .action((str, cos) => ConfigOption.Source(Paths.get(str)) :: cos)
.text("Source directory to sync to destination"), .text("Source directory to sync to destination"),
opt[String]('b', "bucket") opt[String]('b', "bucket")

View file

@ -1,7 +1,9 @@
package net.kemitix.thorp.cli package net.kemitix.thorp.cli
import java.nio.file.Paths
import net.kemitix.thorp.core.ConfigOption.Debug import net.kemitix.thorp.core.ConfigOption.Debug
import net.kemitix.thorp.core.{ConfigOptions, Resource} import net.kemitix.thorp.core.{ConfigOptions, ConfigQuery, Resource}
import org.scalatest.FunSpec import org.scalatest.FunSpec
import scala.util.Try import scala.util.Try
@ -15,14 +17,27 @@ class ParseArgsTest extends FunSpec {
ParseArgs(List("--source", path, "--bucket", "bucket")) ParseArgs(List("--source", path, "--bucket", "bucket"))
describe("when source is a directory") { describe("when source is a directory") {
val result = invokeWithSource(pathTo("."))
it("should succeed") { it("should succeed") {
val result = invokeWithSource(pathTo("."))
assert(result.isDefined) assert(result.isDefined)
} }
} }
describe("when source is a relative path to a directory") { describe("when source is a relative path to a directory") {
val result = invokeWithSource(pathTo("."))
it("should succeed") {pending} it("should succeed") {pending}
} }
describe("when there are multiple sources") {
val args = List(
"--source", "path1",
"--source", "path2",
"--bucket", "bucket")
it("should get multiple sources") {
val expected = Some(Set("path1", "path2").map(Paths.get(_)))
val configOptions = ParseArgs(args)
val result = configOptions.map(ConfigQuery.sources(_).paths.toSet)
assertResult(expected)(result)
}
}
} }
describe("parse - debug") { describe("parse - debug") {

View file

@ -1,22 +1,24 @@
package net.kemitix.thorp.cli package net.kemitix.thorp.cli
import java.io.File import java.io.File
import java.nio.file.Path
import cats.data.EitherT import cats.data.EitherT
import cats.effect.IO import cats.effect.IO
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.core._ import net.kemitix.thorp.core._
import net.kemitix.thorp.domain.{Bucket, LocalFile, Logger, MD5Hash, RemoteKey, StorageQueueEvent} import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService} import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FunSpec import org.scalatest.FunSpec
class ProgramTest extends FunSpec { class ProgramTest extends FunSpec {
val source: File = Resource(this, ".") val source: File = Resource(this, ".")
val sourcePath: Path = source.toPath
val bucket: Bucket = Bucket("aBucket") val bucket: Bucket = Bucket("aBucket")
val hash: MD5Hash = MD5Hash("aHash") val hash: MD5Hash = MD5Hash("aHash")
val copyAction: Action = ToCopy(bucket, RemoteKey("copy-me"), hash, RemoteKey("overwrite-me"), 17L) val copyAction: Action = ToCopy(bucket, RemoteKey("copy-me"), hash, RemoteKey("overwrite-me"), 17L)
val uploadAction: Action = ToUpload(bucket, LocalFile.resolve("aFile", Map(), source, _ => RemoteKey("upload-me")), 23L) val uploadAction: Action = ToUpload(bucket, LocalFile.resolve("aFile", Map(), sourcePath, _ => RemoteKey("upload-me")), 23L)
val deleteAction: Action = ToDelete(bucket, RemoteKey("delete-me"), 0L) val deleteAction: Action = ToDelete(bucket, RemoteKey("delete-me"), 0L)
val configOptions: ConfigOptions = ConfigOptions(options = List( val configOptions: ConfigOptions = ConfigOptions(options = List(

View file

@ -5,7 +5,17 @@ import net.kemitix.thorp.domain._
object ActionGenerator { object ActionGenerator {
def createActions(s3MetaData: S3MetaData) def remoteNameNotAlreadyQueued(localFile: LocalFile,
previousActions: Stream[Action]): Boolean = {
val key = localFile.remoteKey.key
!previousActions.exists {
case ToUpload(_, lf, _) => lf.remoteKey.key equals key
case _ => false
}
}
def createActions(s3MetaData: S3MetaData,
previousActions: Stream[Action])
(implicit c: Config): Stream[Action] = (implicit c: Config): Stream[Action] =
s3MetaData match { s3MetaData match {
@ -21,7 +31,8 @@ object ActionGenerator {
// #3 local exists, remote is missing, other no matches - upload // #3 local exists, remote is missing, other no matches - upload
case S3MetaData(localFile, otherMatches, None) case S3MetaData(localFile, otherMatches, None)
if otherMatches.isEmpty if otherMatches.isEmpty &&
remoteNameNotAlreadyQueued(localFile, previousActions)
=> uploadFile(c.bucket, localFile) => uploadFile(c.bucket, localFile)
// #4 local exists, remote exists, remote no match, other matches - copy // #4 local exists, remote exists, remote no match, other matches - copy
@ -35,6 +46,8 @@ object ActionGenerator {
if hashMatches.isEmpty if hashMatches.isEmpty
=> uploadFile(c.bucket, localFile) => uploadFile(c.bucket, localFile)
case S3MetaData(localFile, _, _) =>
doNothing(c.bucket, localFile.remoteKey)
} }
private def doNothing(bucket: Bucket, private def doNothing(bucket: Bucket,

View file

@ -17,13 +17,21 @@ object ConfigOption {
override def update(config: Config): Config = config.copy(batchMode = true) override def update(config: Config): Config = config.copy(batchMode = true)
} }
case class Source(path: Path) extends ConfigOption { case class Source(path: Path) extends ConfigOption {
override def update(config: Config): Config = config.copy(source = path.toFile) override def update(config: Config): Config = config.copy(sources = config.sources ++ path)
} }
case class Bucket(name: String) extends ConfigOption { case class Bucket(name: String) extends ConfigOption {
override def update(config: Config): Config = config.copy(bucket = domain.Bucket(name)) override def update(config: Config): Config =
if (config.bucket.name.isEmpty)
config.copy(bucket = domain.Bucket(name))
else
config
} }
case class Prefix(path: String) extends ConfigOption { case class Prefix(path: String) extends ConfigOption {
override def update(config: Config): Config = config.copy(prefix = RemoteKey(path)) override def update(config: Config): Config =
if (config.prefix.key.isEmpty)
config.copy(prefix = RemoteKey(path))
else
config
} }
case class Include(pattern: String) extends ConfigOption { case class Include(pattern: String) extends ConfigOption {
override def update(config: Config): Config = config.copy(filters = domain.Filter.Include(pattern) :: config.filters) override def update(config: Config): Config = config.copy(filters = domain.Filter.Include(pattern) :: config.filters)

View file

@ -1,5 +1,7 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.domain.Sources
trait ConfigQuery { trait ConfigQuery {
def showVersion(configOptions: ConfigOptions): Boolean = def showVersion(configOptions: ConfigOptions): Boolean =
@ -14,6 +16,13 @@ trait ConfigQuery {
def ignoreGlobalOptions(configOptions: ConfigOptions): Boolean = def ignoreGlobalOptions(configOptions: ConfigOptions): Boolean =
configOptions contains ConfigOption.IgnoreGlobalOptions configOptions contains ConfigOption.IgnoreGlobalOptions
def sources(configOptions: ConfigOptions): Sources = {
val paths = configOptions.options.flatMap( {
case ConfigOption.Source(sourcePath) => Some(sourcePath)
case _ => None
})
Sources(paths)
}
} }
object ConfigQuery extends ConfigQuery object ConfigQuery extends ConfigQuery

View file

@ -1,33 +1,40 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import java.io.File import java.nio.file.Path
import cats.data.{NonEmptyChain, Validated, ValidatedNec} import cats.data.{NonEmptyChain, Validated, ValidatedNec}
import cats.implicits._ import cats.implicits._
import net.kemitix.thorp.domain.{Bucket, Config} import net.kemitix.thorp.domain.{Bucket, Config, Sources}
sealed trait ConfigValidator { sealed trait ConfigValidator {
type ValidationResult[A] = ValidatedNec[ConfigValidation, A] type ValidationResult[A] = ValidatedNec[ConfigValidation, A]
def validateSourceIsDirectory(source: File): ValidationResult[File] = def validateSourceIsDirectory(source: Path): ValidationResult[Path] =
if(source.isDirectory) source.validNec if(source.toFile.isDirectory) source.validNec
else ConfigValidation.SourceIsNotADirectory.invalidNec else ConfigValidation.SourceIsNotADirectory.invalidNec
def validateSourceIsReadable(source: File): ValidationResult[File] = def validateSourceIsReadable(source: Path): ValidationResult[Path] =
if(source.canRead) source.validNec if(source.toFile.canRead) source.validNec
else ConfigValidation.SourceIsNotReadable.invalidNec else ConfigValidation.SourceIsNotReadable.invalidNec
def validateSource(source: File): ValidationResult[File] = def validateSource(source: Path): ValidationResult[Path] =
validateSourceIsDirectory(source).andThen(s => validateSourceIsReadable(s)) validateSourceIsDirectory(source)
.andThen(s =>
validateSourceIsReadable(s))
def validateBucket(bucket: Bucket): ValidationResult[Bucket] = def validateBucket(bucket: Bucket): ValidationResult[Bucket] =
if (bucket.name.isEmpty) ConfigValidation.BucketNameIsMissing.invalidNec if (bucket.name.isEmpty) ConfigValidation.BucketNameIsMissing.invalidNec
else bucket.validNec else bucket.validNec
def validateSources(sources: Sources): ValidationResult[Sources] =
sources.paths
.map(validateSource).sequence
.map(_ => sources)
def validateConfig(config: Config): Validated[NonEmptyChain[ConfigValidation], Config] = def validateConfig(config: Config): Validated[NonEmptyChain[ConfigValidation], Config] =
( (
validateSource(config.source), validateSources(config.sources),
validateBucket(config.bucket) validateBucket(config.bucket)
).mapN((_, _) => config) ).mapN((_, _) => config)
} }

View file

@ -1,13 +1,13 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import java.io.File import java.nio.file.{Files, Path, Paths}
import java.nio.file.Paths
import cats.data.NonEmptyChain import cats.data.NonEmptyChain
import cats.effect.IO import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.core.ConfigValidator.validateConfig import net.kemitix.thorp.core.ConfigValidator.validateConfig
import net.kemitix.thorp.core.ParseConfigFile.parseFile import net.kemitix.thorp.core.ParseConfigFile.parseFile
import net.kemitix.thorp.domain.Config import net.kemitix.thorp.domain.{Config, Sources}
/** /**
* Builds a configuration from settings in a file within the * Builds a configuration from settings in a file within the
@ -15,14 +15,11 @@ import net.kemitix.thorp.domain.Config
*/ */
trait ConfigurationBuilder { trait ConfigurationBuilder {
private val pwdFile: File = Paths.get(System.getenv("PWD")).toFile
private val defaultConfig: Config = Config(source = pwdFile)
def buildConfig(priorityOptions: ConfigOptions): IO[Either[NonEmptyChain[ConfigValidation], Config]] = { def buildConfig(priorityOptions: ConfigOptions): IO[Either[NonEmptyChain[ConfigValidation], Config]] = {
val source = findSource(priorityOptions) val sources = ConfigQuery.sources(priorityOptions)
for { for {
sourceOptions <- sourceOptions(source) sourceOptions <- sourceOptions(sources)
userOptions <- userOptions(priorityOptions ++ sourceOptions) userOptions <- userOptions(priorityOptions ++ sourceOptions)
globalOptions <- globalOptions(priorityOptions ++ sourceOptions ++ userOptions) globalOptions <- globalOptions(priorityOptions ++ sourceOptions ++ userOptions)
collected = priorityOptions ++ sourceOptions ++ userOptions ++ globalOptions collected = priorityOptions ++ sourceOptions ++ userOptions ++ globalOptions
@ -30,14 +27,39 @@ trait ConfigurationBuilder {
} yield validateConfig(config).toEither } yield validateConfig(config).toEither
} }
private def findSource(priorityOptions: ConfigOptions): File = private def sourceOptions(sources: Sources): IO[ConfigOptions] = {
priorityOptions.options.foldRight(pwdFile)((co, f) => co match { def existingThorpConfigFiles(sources: Sources) =
case ConfigOption.Source(source) => source.toFile sources.paths
case _ => f .map(_.resolve(".thorp.config"))
}) .filter(Files.exists(_))
private def sourceOptions(source: File): IO[ConfigOptions] = def filterForSources: IO[ConfigOptions] => IO[(Sources, ConfigOptions)] =
readFile(source, ".thorp.conf") for {configOptions <- _} yield (ConfigQuery.sources(configOptions), configOptions)
def recurseIntoSources: IO[(Sources, ConfigOptions)] => IO[ConfigOptions] =
ioSourcesConfigOptions =>
for {
sourcesConfigOptions <- ioSourcesConfigOptions
(sources, configOptions) = sourcesConfigOptions
moreSourcesConfigOptions <- filterForSources(sourceOptions(sources))
(_, moreConfigOptions) = moreSourcesConfigOptions
} yield configOptions ++ moreConfigOptions
def emptyConfig: IO[ConfigOptions] = IO.pure(ConfigOptions())
def collectConfigOptions: (IO[ConfigOptions], IO[ConfigOptions]) => IO[ConfigOptions] =
(ioConfigOptions, ioAcc) =>
for {
configOptions <- ioConfigOptions
acc <- ioAcc
} yield configOptions ++ acc
existingThorpConfigFiles(sources)
.map(ParseConfigFile.parseFile)
.map(filterForSources)
.map(recurseIntoSources)
.foldRight(emptyConfig)(collectConfigOptions)
}
private def userOptions(higherPriorityOptions: ConfigOptions): IO[ConfigOptions] = private def userOptions(higherPriorityOptions: ConfigOptions): IO[ConfigOptions] =
if (ConfigQuery.ignoreUserOptions(higherPriorityOptions)) IO(ConfigOptions()) if (ConfigQuery.ignoreUserOptions(higherPriorityOptions)) IO(ConfigOptions())
@ -47,13 +69,22 @@ trait ConfigurationBuilder {
if (ConfigQuery.ignoreGlobalOptions(higherPriorityOptions)) IO(ConfigOptions()) if (ConfigQuery.ignoreGlobalOptions(higherPriorityOptions)) IO(ConfigOptions())
else parseFile(Paths.get("/etc/thorp.conf")) else parseFile(Paths.get("/etc/thorp.conf"))
private def userHome = new File(System.getProperty("user.home")) private def userHome = Paths.get(System.getProperty("user.home"))
private def readFile(source: File, filename: String): IO[ConfigOptions] = private def readFile(source: Path, filename: String): IO[ConfigOptions] =
parseFile(source.toPath.resolve(filename)) parseFile(source.resolve(filename))
private def collateOptions(configOptions: ConfigOptions): Config = private def collateOptions(configOptions: ConfigOptions): Config = {
configOptions.options.foldRight(defaultConfig)((co, c) => co.update(c)) val pwd = Paths.get(System.getenv("PWD"))
val initialSource =
if (noSourcesProvided(configOptions)) List(pwd) else List()
val initialConfig = Config(sources = Sources(initialSource))
configOptions.options.foldLeft(initialConfig)((c, co) => co.update(c))
}
private def noSourcesProvided(configOptions: ConfigOptions) = {
ConfigQuery.sources(configOptions).paths.isEmpty
}
} }
object ConfigurationBuilder extends ConfigurationBuilder object ConfigurationBuilder extends ConfigurationBuilder

View file

@ -1,17 +1,19 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import java.io.File import java.nio.file.Path
import net.kemitix.thorp.domain.RemoteKey import net.kemitix.thorp.domain.{RemoteKey, Sources}
object KeyGenerator { object KeyGenerator {
def generateKey(source: File, prefix: RemoteKey) def generateKey(sources: Sources,
(file: File): RemoteKey = { prefix: RemoteKey)
val otherPath = file.toPath.toAbsolutePath (path: Path): RemoteKey = {
val sourcePath = source.toPath val source = sources.forPath(path)
val relativePath = sourcePath.relativize(otherPath) val relativePath = source.relativize(path.toAbsolutePath)
RemoteKey(s"${prefix.key}/$relativePath") RemoteKey(List(prefix.key, relativePath.toString)
.filterNot(_.isEmpty)
.mkString("/"))
} }
} }

View file

@ -11,58 +11,66 @@ import net.kemitix.thorp.storage.api.HashService
object LocalFileStream { object LocalFileStream {
def findFiles(file: File, def findFiles(source: Path,
hashService: HashService) hashService: HashService)
(implicit c: Config, (implicit c: Config,
logger: Logger): IO[LocalFiles] = { logger: Logger): IO[LocalFiles] = {
val filters: Path => Boolean = Filter.isIncluded(c.filters) val isIncluded: Path => Boolean = Filter.isIncluded(c.filters)
def loop(file: File): IO[LocalFiles] = { def loop(path: Path): IO[LocalFiles] = {
def dirPaths(file: File): IO[Stream[File]] = def dirPaths(path: Path): IO[Stream[Path]] =
IO(listFiles(file)) IO(listFiles(path))
.map(fs => .map(fs =>
Stream(fs: _*) Stream(fs: _*)
.filter(f => filters(f.toPath))) .map(_.toPath)
.filter(isIncluded))
def recurseIntoSubDirectories(file: File): IO[LocalFiles] = def recurseIntoSubDirectories(path: Path): IO[LocalFiles] =
file match { path.toFile match {
case f if f.isDirectory => loop(file) case f if f.isDirectory => loop(path)
case _ => localFile(hashService, file) case _ => localFile(hashService, path)
} }
def recurse(fs: Stream[File]): IO[LocalFiles] = def recurse(paths: Stream[Path]): IO[LocalFiles] =
fs.foldLeft(IO.pure(LocalFiles()))((acc, f) => paths.foldLeft(IO.pure(LocalFiles()))((acc, path) =>
recurseIntoSubDirectories(f) recurseIntoSubDirectories(path)
.flatMap(localFiles => acc.map(accLocalFiles => accLocalFiles ++ localFiles))) .flatMap(localFiles => acc.map(accLocalFiles => accLocalFiles ++ localFiles)))
for { for {
_ <- logger.debug(s"- Entering: $file") _ <- logger.debug(s"- Entering: $path")
fs <- dirPaths(file) fs <- dirPaths(path)
lfs <- recurse(fs) lfs <- recurse(fs)
_ <- logger.debug(s"- Leaving : $file") _ <- logger.debug(s"- Leaving : $path")
} yield lfs } yield lfs
} }
loop(file) loop(source)
} }
private def localFile(hashService: HashService, private def localFile(hashService: HashService,
file: File) path: Path)
(implicit l: Logger, c: Config) = { (implicit l: Logger, c: Config) = {
val file = path.toFile
val source = c.sources.forPath(path)
for { for {
hash <- hashService.hashLocalObject(file) hash <- hashService.hashLocalObject(path)
} yield } yield
LocalFiles( LocalFiles(
localFiles = Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file))), localFiles = Stream(
domain.LocalFile(
file,
source.toFile,
hash,
generateKey(c.sources, c.prefix)(path))),
count = 1, count = 1,
totalSizeBytes = file.length) totalSizeBytes = file.length)
} }
//TODO: Change this to return an Either[IllegalArgumentException, Array[File]] //TODO: Change this to return an Either[IllegalArgumentException, Array[File]]
private def listFiles(file: File) = { private def listFiles(path: Path): Array[File] = {
Option(file.listFiles) Option(path.toFile.listFiles)
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file")) .getOrElse(throw new IllegalArgumentException(s"Directory not found $path"))
} }
} }

View file

@ -1,6 +1,7 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import java.io.{File, FileInputStream} import java.io.{File, FileInputStream}
import java.nio.file.Path
import java.security.MessageDigest import java.security.MessageDigest
import cats.effect.IO import cats.effect.IO
@ -25,8 +26,8 @@ object MD5HashGenerator {
md5.digest md5.digest
} }
def md5File(file: File)(implicit logger: Logger): IO[MD5Hash] = def md5File(path: Path)(implicit logger: Logger): IO[MD5Hash] =
md5FileChunk(file, 0, file.length) md5FileChunk(path, 0, path.toFile.length)
private def openFile(file: File, offset: Long) = IO { private def openFile(file: File, offset: Long) = IO {
val stream = new FileInputStream(file) val stream = new FileInputStream(file)
@ -68,16 +69,17 @@ object MD5HashGenerator {
result.toInt result.toInt
} }
def md5FileChunk(file: File, def md5FileChunk(path: Path,
offset: Long, offset: Long,
size: Long) size: Long)
(implicit logger: Logger): IO[MD5Hash] = { (implicit logger: Logger): IO[MD5Hash] = {
val file = path.toFile
val endOffset = Math.min(offset + size, file.length) val endOffset = Math.min(offset + size, file.length)
for { for {
_ <- logger.debug(s"md5:reading:size ${file.length}:$file") _ <- logger.debug(s"md5:reading:size ${file.length}:$path")
digest <- readFile(file, offset, endOffset) digest <- readFile(file, offset, endOffset)
hash = MD5Hash.fromDigest(digest) hash = MD5Hash.fromDigest(digest)
_ <- logger.debug(s"md5:generated:${hash.hash}:$file") _ <- logger.debug(s"md5:generated:${hash.hash}:$path")
} yield hash } yield hash
} }

View file

@ -43,7 +43,7 @@ trait PlanBuilder {
hashService: HashService) hashService: HashService)
(implicit c: Config, l: Logger): EitherT[IO, List[String], SyncPlan] = { (implicit c: Config, l: Logger): EitherT[IO, List[String], SyncPlan] = {
for { for {
_ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.source)) _ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.sources))
actions <- gatherMetadata(storageService, hashService) actions <- gatherMetadata(storageService, hashService)
.leftMap(error => List(error)) .leftMap(error => List(error))
.map(assemblePlan) .map(assemblePlan)
@ -61,7 +61,7 @@ trait PlanBuilder {
private def actionsForLocalFiles(localData: LocalFiles, remoteData: S3ObjectsData) private def actionsForLocalFiles(localData: LocalFiles, remoteData: S3ObjectsData)
(implicit c: Config) = (implicit c: Config) =
localData.localFiles.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData) ++ acc) localData.localFiles.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData, acc) ++ acc)
private def actionsForRemoteKeys(remoteData: S3ObjectsData) private def actionsForRemoteKeys(remoteData: S3ObjectsData)
(implicit c: Config) = (implicit c: Config) =
@ -75,16 +75,33 @@ trait PlanBuilder {
(implicit config: Config, l: Logger) = (implicit config: Config, l: Logger) =
for { for {
_ <- SyncLogging.logFileScan _ <- SyncLogging.logFileScan
localFiles <- LocalFileStream.findFiles(config.source, hashService) localFiles <- findFiles(hashService)
} yield localFiles } yield localFiles
private def createActionFromLocalFile(lf: LocalFile, remoteData: S3ObjectsData) private def findFiles(hashService: HashService)
(implicit c: Config, l: Logger): IO[LocalFiles] = {
val ioListLocalFiles = (for {
source <- c.sources.paths
} yield LocalFileStream.findFiles(source, hashService)).sequence
for {
listLocalFiles <- ioListLocalFiles
localFiles = listLocalFiles.foldRight(LocalFiles()){
(acc, moreLocalFiles) => {
acc ++ moreLocalFiles
}
}
} yield localFiles
}
private def createActionFromLocalFile(lf: LocalFile,
remoteData: S3ObjectsData,
previousActions: Stream[Action])
(implicit c: Config) = (implicit c: Config) =
ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData)) ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData), previousActions)
private def createActionFromRemoteKey(rk: RemoteKey) private def createActionFromRemoteKey(rk: RemoteKey)
(implicit c: Config) = (implicit c: Config) =
if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk, 0L) if (rk.isMissingLocally(c.sources, c.prefix)) Action.ToDelete(c.bucket, rk, 0L)
else DoNothing(c.bucket, rk, 0L) else DoNothing(c.bucket, rk, 0L)
} }

View file

@ -0,0 +1,19 @@
package net.kemitix.thorp.core
import java.nio.file.Path
import cats.effect.IO
import net.kemitix.thorp.domain.{Logger, MD5Hash}
import net.kemitix.thorp.storage.api.HashService
case class SimpleHashService() extends HashService {
override def hashLocalObject(path: Path)
(implicit l: Logger): IO[Map[String, MD5Hash]] =
for {
md5 <- MD5HashGenerator.md5File(path)
} yield Map(
"md5" -> md5
)
}

View file

@ -1,7 +1,5 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import java.io.File
import cats.effect.IO import cats.effect.IO
import cats.implicits._ import cats.implicits._
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent} import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent}
@ -11,13 +9,15 @@ trait SyncLogging {
def logRunStart(bucket: Bucket, def logRunStart(bucket: Bucket,
prefix: RemoteKey, prefix: RemoteKey,
source: File) sources: Sources)
(implicit logger: Logger): IO[Unit] = (implicit logger: Logger): IO[Unit] = {
logger.info(s"Bucket: ${bucket.name}, Prefix: ${prefix.key}, Source: $source, ") val sourcesList = sources.paths.mkString(", ")
logger.info(s"Bucket: ${bucket.name}, Prefix: ${prefix.key}, Source: $sourcesList, ")
}
def logFileScan(implicit c: Config, def logFileScan(implicit c: Config,
logger: Logger): IO[Unit] = logger: Logger): IO[Unit] =
logger.info(s"Scanning local files: ${c.source}...") logger.info(s"Scanning local files: ${c.sources}...")
def logErrors(actions: Stream[StorageQueueEvent]) def logErrors(actions: Stream[StorageQueueEvent])
(implicit logger: Logger): IO[Unit] = (implicit logger: Logger): IO[Unit] =

View file

@ -10,19 +10,22 @@ class ActionGeneratorSuite
extends FunSpec { extends FunSpec {
private val source = Resource(this, "upload") private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
private val bucket = Bucket("bucket") private val bucket = Bucket("bucket")
implicit private val config: Config = Config(bucket, prefix, source = source) implicit private val config: Config = Config(bucket, prefix, sources = Sources(List(sourcePath)))
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _
val lastModified = LastModified(Instant.now()) val lastModified = LastModified(Instant.now())
describe("create actions") { describe("create actions") {
def invoke(input: S3MetaData) = ActionGenerator.createActions(input).toList val previousActions = Stream.empty[Action]
def invoke(input: S3MetaData) = ActionGenerator.createActions(input, previousActions).toList
describe("#1 local exists, remote exists, remote matches - do nothing") { describe("#1 local exists, remote exists, remote matches - do nothing") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified) val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified)
val input = S3MetaData(theFile, // local exists val input = S3MetaData(theFile, // local exists
matchByHash = Set(theRemoteMetadata), // remote matches matchByHash = Set(theRemoteMetadata), // remote matches
@ -36,7 +39,7 @@ class ActionGeneratorSuite
} }
describe("#2 local exists, remote is missing, other matches - copy") { describe("#2 local exists, remote is missing, other matches - copy") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val otherRemoteKey = prefix.resolve("other-key") val otherRemoteKey = prefix.resolve("other-key")
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified) val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
@ -51,7 +54,7 @@ class ActionGeneratorSuite
} }
describe("#3 local exists, remote is missing, other no matches - upload") { describe("#3 local exists, remote is missing, other no matches - upload") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val input = S3MetaData(theFile, // local exists val input = S3MetaData(theFile, // local exists
matchByHash = Set.empty, // other no matches matchByHash = Set.empty, // other no matches
matchByKey = None) // remote is missing matchByKey = None) // remote is missing
@ -63,7 +66,7 @@ class ActionGeneratorSuite
} }
describe("#4 local exists, remote exists, remote no match, other matches - copy") { describe("#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash") val oldHash = MD5Hash("old-hash")
val otherRemoteKey = prefix.resolve("other-key") val otherRemoteKey = prefix.resolve("other-key")
@ -82,7 +85,7 @@ class ActionGeneratorSuite
} }
describe("#5 local exists, remote exists, remote no match, other no matches - upload") { describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash") val oldHash = MD5Hash("old-hash")
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified) val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)

View file

@ -0,0 +1,31 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.domain.Sources
import org.scalatest.FunSpec
class ConfigOptionTest extends FunSpec with TemporaryFolder {
describe("when more than one source") {
it("should preserve their order") {
withDirectory(path1 => {
withDirectory(path2 => {
val configOptions = ConfigOptions(List(
ConfigOption.Source(path1),
ConfigOption.Source(path2),
ConfigOption.Bucket("bucket"),
ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions
))
val expected = Sources(List(path1, path2))
val result = invoke(configOptions)
assert(result.isRight, result)
assertResult(expected)(ConfigQuery.sources(configOptions))
})
})
}
}
private def invoke(configOptions: ConfigOptions) = {
ConfigurationBuilder.buildConfig(configOptions).unsafeRunSync
}
}

View file

@ -0,0 +1,129 @@
package net.kemitix.thorp.core
import java.nio.file.{Path, Paths}
import net.kemitix.thorp.domain._
import org.scalatest.FunSpec
class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
private val pwd: Path = Paths.get(System.getenv("PWD"))
private val aBucket = Bucket("aBucket")
private val coBucket: ConfigOption.Bucket = ConfigOption.Bucket(aBucket.name)
private val thorpConfigFileName = ".thorp.config"
private def configOptions(options: ConfigOption*): ConfigOptions =
ConfigOptions(List(
ConfigOption.IgnoreUserOptions,
ConfigOption.IgnoreGlobalOptions
) ++ options)
describe("when no source") {
it("should use the current (PWD) directory") {
val expected = Right(Config(aBucket, sources = Sources(List(pwd))))
val options = configOptions(coBucket)
val result = invoke(options)
assertResult(expected)(result)
}
}
describe("when has a single source with no .thorp.config") {
it("should only include the source once") {
withDirectory(aSource => {
val expected = Right(Sources(List(aSource)))
val options = configOptions(ConfigOption.Source(aSource), coBucket)
val result = invoke(options).map(_.sources)
assertResult(expected)(result)
})
}
}
describe("when has two sources") {
it("should include both sources in order") {
withDirectory(currentSource => {
withDirectory(previousSource => {
val expected = Right(List(currentSource, previousSource))
val options = configOptions(
ConfigOption.Source(currentSource),
ConfigOption.Source(previousSource),
coBucket)
val result = invoke(options).map(_.sources.paths)
assertResult(expected)(result)
})
})
}
}
describe("when current source has .thorp.config with source to another") {
it("should include both sources in order") {
withDirectory(currentSource => {
withDirectory(previousSource => {
writeFile(currentSource, thorpConfigFileName,
s"source = $previousSource")
val expected = Right(List(currentSource, previousSource))
val options = configOptions(
ConfigOption.Source(currentSource),
coBucket)
val result = invoke(options).map(_.sources.paths)
assertResult(expected)(result)
})
})
}
describe("when settings are in current and previous") {
it("should include some settings from both sources and some from only current") {
withDirectory(previousSource => {
withDirectory(currentSource => {
writeFile(currentSource, thorpConfigFileName,
s"source = $previousSource",
"bucket = current-bucket",
"prefix = current-prefix",
"include = current-include",
"exclude = current-exclude")
writeFile(previousSource, thorpConfigFileName,
"bucket = previous-bucket",
"prefix = previous-prefix",
"include = previous-include",
"exclude = previous-exclude")
// should have both sources in order
val expectedSources = Right(Sources(List(currentSource, previousSource)))
// should have bucket from current only
val expectedBuckets = Right(Bucket("current-bucket"))
// should have prefix from current only
val expectedPrefixes = Right(RemoteKey("current-prefix"))
// should have filters from both sources
val expectedFilters = Right(List(
Filter.Exclude("previous-exclude"),
Filter.Include("previous-include"),
Filter.Exclude("current-exclude"),
Filter.Include("current-include")))
val options = configOptions(ConfigOption.Source(currentSource))
val result = invoke(options)
assertResult(expectedSources)(result.map(_.sources))
assertResult(expectedBuckets)(result.map(_.bucket))
assertResult(expectedPrefixes)(result.map(_.prefix))
assertResult(expectedFilters)(result.map(_.filters))
})
})
}
}
}
describe("when source has thorp.config source to another source that does the same") {
it("should include all three sources") {
withDirectory(currentSource => {
withDirectory(parentSource => {
writeFile(currentSource, thorpConfigFileName, s"source = $parentSource")
withDirectory(grandParentSource => {
writeFile(parentSource, thorpConfigFileName, s"source = $grandParentSource")
val expected = Right(List(currentSource, parentSource, grandParentSource))
val options = configOptions(
ConfigOption.Source(currentSource),
coBucket)
val result = invoke(options).map(_.sources.paths)
assertResult(expected)(result)
})
})
})
}
}
private def invoke(configOptions: ConfigOptions) =
ConfigurationBuilder.buildConfig(configOptions).unsafeRunSync
}

View file

@ -1,11 +1,16 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import java.io.File import java.nio.file.Path
import cats.effect.IO import cats.effect.IO
import net.kemitix.thorp.domain.{Logger, MD5Hash} import net.kemitix.thorp.domain.{Logger, MD5Hash}
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.storage.api.HashService
case class DummyHashService(hashes: Map[File, Map[String, MD5Hash]]) extends HashService { case class DummyHashService(hashes: Map[Path, Map[String, MD5Hash]])
override def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] = IO.pure(hashes(file)) extends HashService {
override def hashLocalObject(path: Path)
(implicit l: Logger): IO[Map[String, MD5Hash]] =
IO.pure(hashes(path))
} }

View file

@ -0,0 +1,41 @@
package net.kemitix.thorp.core
import java.io.File
import cats.data.EitherT
import cats.effect.IO
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService
case class DummyStorageService(s3ObjectData: S3ObjectsData,
uploadFiles: Map[File, (RemoteKey, MD5Hash)])
extends StorageService {
override def shutdown: IO[StorageQueueEvent] =
IO.pure(StorageQueueEvent.ShutdownQueueEvent())
override def listObjects(bucket: Bucket,
prefix: RemoteKey)
(implicit l: Logger): EitherT[IO, String, S3ObjectsData] =
EitherT.liftF(IO.pure(s3ObjectData))
override def upload(localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int): IO[StorageQueueEvent] = {
val (remoteKey, md5Hash) = uploadFiles(localFile.file)
IO.pure(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash))
}
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): IO[StorageQueueEvent] =
IO.pure(StorageQueueEvent.CopyQueueEvent(targetKey))
override def delete(bucket: Bucket,
remoteKey: RemoteKey): IO[StorageQueueEvent] =
IO.pure(StorageQueueEvent.DeleteQueueEvent(remoteKey))
}

View file

@ -2,32 +2,30 @@ package net.kemitix.thorp.core
import java.io.File import java.io.File
import net.kemitix.thorp.domain.{Bucket, Config, RemoteKey} import net.kemitix.thorp.domain.{Bucket, Config, RemoteKey, Sources}
import org.scalatest.FunSpec import org.scalatest.FunSpec
class KeyGeneratorSuite extends FunSpec { class KeyGeneratorSuite extends FunSpec {
private val source: File = Resource(this, "upload") private val source: File = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _
describe("key generator") { describe("key generator") {
def resolve(subdir: String): File = {
source.toPath.resolve(subdir).toFile
}
describe("when file is within source") { describe("when file is within source") {
it("has a valid key") { it("has a valid key") {
val subdir = "subdir" val subdir = "subdir"
assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(resolve(subdir))) assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(sourcePath.resolve(subdir)))
} }
} }
describe("when file is deeper within source") { describe("when file is deeper within source") {
it("has a valid key") { it("has a valid key") {
val subdir = "subdir/deeper/still" val subdir = "subdir/deeper/still"
assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(resolve(subdir))) assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(sourcePath.resolve(subdir)))
} }
} }
} }

View file

@ -2,22 +2,23 @@ package net.kemitix.thorp.core
import java.nio.file.Paths import java.nio.file.Paths
import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5HashData} import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5HashData, Sources}
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.storage.api.HashService
import org.scalatest.FunSpec import org.scalatest.FunSpec
class LocalFileStreamSuite extends FunSpec { class LocalFileStreamSuite extends FunSpec {
private val uploadResource = Resource(this, "upload") private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val hashService: HashService = DummyHashService(Map( private val hashService: HashService = DummyHashService(Map(
file("root-file") -> Map("md5" -> MD5HashData.Root.hash), file("root-file") -> Map("md5" -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash) file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash)
)) ))
private def file(filename: String) = private def file(filename: String) =
uploadResource.toPath.resolve(Paths.get(filename)).toFile sourcePath.resolve(Paths.get(filename))
implicit private val config: Config = Config(source = uploadResource) implicit private val config: Config = Config(sources = Sources(List(sourcePath)))
implicit private val logger: Logger = new DummyLogger implicit private val logger: Logger = new DummyLogger
describe("findFiles") { describe("findFiles") {
@ -37,7 +38,7 @@ class LocalFileStreamSuite extends FunSpec {
} }
} }
private def invoke = { private def invoke =
LocalFileStream.findFiles(uploadResource, hashService).unsafeRunSync LocalFileStream.findFiles(sourcePath, hashService).unsafeRunSync
}
} }

View file

@ -7,37 +7,38 @@ import org.scalatest.FunSpec
class MD5HashGeneratorTest extends FunSpec { class MD5HashGeneratorTest extends FunSpec {
private val source = Resource(this, "upload") private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val logger: Logger = new DummyLogger implicit private val logger: Logger = new DummyLogger
describe("read a small file (smaller than buffer)") { describe("read a small file (smaller than buffer)") {
val file = Resource(this, "upload/root-file") val path = Resource(this, "upload/root-file").toPath
it("should generate the correct hash") { it("should generate the correct hash") {
val result = MD5HashGenerator.md5File(file).unsafeRunSync val result = MD5HashGenerator.md5File(path).unsafeRunSync
assertResult(Root.hash)(result) assertResult(Root.hash)(result)
} }
} }
describe("read a large file (bigger than buffer)") { describe("read a large file (bigger than buffer)") {
val file = Resource(this, "big-file") val path = Resource(this, "big-file").toPath
it("should generate the correct hash") { it("should generate the correct hash") {
val expected = MD5HashData.BigFile.hash val expected = MD5HashData.BigFile.hash
val result = MD5HashGenerator.md5File(file).unsafeRunSync val result = MD5HashGenerator.md5File(path).unsafeRunSync
assertResult(expected)(result) assertResult(expected)(result)
} }
} }
describe("read chunks of file") { describe("read chunks of file") {
val file = Resource(this, "big-file") val path = Resource(this, "big-file").toPath
it("should generate the correct hash for first chunk of the file") { it("should generate the correct hash for first chunk of the file") {
val part1 = MD5HashData.BigFile.Part1 val part1 = MD5HashData.BigFile.Part1
val expected = part1.hash val expected = part1.hash
val result = MD5HashGenerator.md5FileChunk(file, part1.offset, part1.size).unsafeRunSync val result = MD5HashGenerator.md5FileChunk(path, part1.offset, part1.size).unsafeRunSync
assertResult(expected)(result) assertResult(expected)(result)
} }
it("should generate the correcy hash for second chunk of the file") { it("should generate the correcy hash for second chunk of the file") {
val part2 = MD5HashData.BigFile.Part2 val part2 = MD5HashData.BigFile.Part2
val expected = part2.hash val expected = part2.hash
val result = MD5HashGenerator.md5FileChunk(file, part2.offset, part2.size).unsafeRunSync val result = MD5HashGenerator.md5FileChunk(path, part2.offset, part2.size).unsafeRunSync
assertResult(expected)(result) assertResult(expected)(result)
} }
} }

View file

@ -0,0 +1,404 @@
package net.kemitix.thorp.core
import java.io.File
import java.nio.file.Path
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FreeSpec
class PlanBuilderTest extends FreeSpec with TemporaryFolder {
private val planBuilder = new PlanBuilder {}
private val emptyS3ObjectData = S3ObjectsData()
private implicit val logger: Logger = new DummyLogger
val lastModified: LastModified = LastModified()
"create a plan" - {
val hashService = SimpleHashService()
"one source" - {
"a file" - {
val filename = "aFile"
val remoteKey = RemoteKey(filename)
"with no matching remote key" - {
"with no other remote key with matching hash" - {
"upload file" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val hash = md5Hash(file)
val expected = Right(List(
toUpload(remoteKey, hash, source, file)
))
val storageService = DummyStorageService(emptyS3ObjectData, Map(
file -> (remoteKey, hash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
}
}
"with another remote key with matching hash" - {
"copy file" in {
withDirectory(source => {
val anOtherFilename = "other"
val content = "file-content"
val aFile = createFile(source, filename, content)
val anOtherFile = createFile(source, anOtherFilename, content)
val aHash = md5Hash(aFile)
val anOtherKey = RemoteKey("other")
val expected = Right(List(
toCopy(anOtherKey, aHash, remoteKey)
))
val s3ObjectsData = S3ObjectsData(
byHash = Map(aHash -> Set(KeyModified(anOtherKey, lastModified))),
byKey = Map(anOtherKey -> HashModified(aHash, lastModified))
)
val storageService = DummyStorageService(s3ObjectsData, Map(
aFile -> (remoteKey, aHash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
}
}
}
"with matching remote key" - {
"with matching hash" - {
"do nothing" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val hash = md5Hash(file)
// DoNothing actions should have been filtered out of the plan
val expected = Right(List())
val s3ObjectsData = S3ObjectsData(
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
byKey = Map(remoteKey -> HashModified(hash, lastModified))
)
val storageService = DummyStorageService(s3ObjectsData, Map(
file -> (remoteKey, hash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
}
}
"with different hash" - {
"with no matching remote hash" - {
"upload file" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val currentHash = md5Hash(file)
val originalHash = MD5Hash("original-file-content")
val expected = Right(List(
toUpload(remoteKey, currentHash, source, file)
))
val s3ObjectsData = S3ObjectsData(
byHash = Map(originalHash -> Set(KeyModified(remoteKey, lastModified))),
byKey = Map(remoteKey -> HashModified(originalHash, lastModified))
)
val storageService = DummyStorageService(s3ObjectsData, Map(
file -> (remoteKey, currentHash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
}
}
"with matching remote hash" - {
"copy file" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val hash = md5Hash(file)
val sourceKey = RemoteKey("other-key")
val expected = Right(List(
toCopy(sourceKey, hash, remoteKey)
))
val s3ObjectsData = S3ObjectsData(
byHash = Map(hash -> Set(KeyModified(sourceKey, lastModified))),
byKey = Map()
)
val storageService = DummyStorageService(s3ObjectsData, Map(
file -> (remoteKey, hash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
}
}
}
}
}
"a remote key" - {
val filename = "aFile"
val remoteKey = RemoteKey(filename)
"with a matching local file" - {
"do nothing" in {
withDirectory(source => {
val file = createFile(source, filename, "file-content")
val hash = md5Hash(file)
// DoNothing actions should have been filtered out of the plan
val expected = Right(List())
val s3ObjectsData = S3ObjectsData(
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
byKey = Map(remoteKey -> HashModified(hash, lastModified))
)
val storageService = DummyStorageService(s3ObjectsData, Map(
file -> (remoteKey, hash)
))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
}
}
"with no matching local file" - {
"delete remote key" ignore {
withDirectory(source => {
val hash = MD5Hash("file-content")
val expected = Right(List(
toDelete(remoteKey)
))
val s3ObjectsData = S3ObjectsData(
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
byKey = Map(remoteKey -> HashModified(hash, lastModified))
)
val storageService = DummyStorageService(s3ObjectsData, Map.empty)
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(source),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
}
}
}
}
"two sources" - {
val filename1 = "file-1"
val filename2 = "file-2"
val remoteKey1 = RemoteKey(filename1)
val remoteKey2 = RemoteKey(filename2)
"unique files in both" - {
"upload all files" in {
withDirectory(firstSource => {
val fileInFirstSource = createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => {
val fileInSecondSource = createFile(secondSource, filename2, "file-2-content")
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List(
toUpload(remoteKey2, hash2, secondSource, fileInSecondSource),
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)
))
val storageService = DummyStorageService(emptyS3ObjectData, Map(
fileInFirstSource -> (remoteKey1, hash1),
fileInSecondSource -> (remoteKey2, hash2)))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
})
}
}
"same filename in both" - {
"only upload file in first source" in {
withDirectory(firstSource => {
val fileInFirstSource: File = createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => {
val fileInSecondSource: File = createFile(secondSource, filename1, "file-2-content")
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List(
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)
))
val storageService = DummyStorageService(emptyS3ObjectData, Map(
fileInFirstSource -> (remoteKey1, hash1),
fileInSecondSource -> (remoteKey2, hash2)))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
})
}
}
"with a remote file only present in second source" - {
"do not delete it " in {
withDirectory(firstSource => {
withDirectory(secondSource => {
val fileInSecondSource = createFile(secondSource, filename2, "file-2-content")
val hash2 = md5Hash(fileInSecondSource)
val expected = Right(List())
val s3ObjectData = S3ObjectsData(
byHash = Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))),
byKey = Map(remoteKey2 -> HashModified(hash2, lastModified)))
val storageService = DummyStorageService(s3ObjectData, Map(
fileInSecondSource -> (remoteKey2, hash2)))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
})
}
}
"with remote file only present in first source" - {
"do not delete it" in {
withDirectory(firstSource => {
val fileInFirstSource: File = createFile(firstSource, filename1, "file-1-content")
val hash1 = md5Hash(fileInFirstSource)
withDirectory(secondSource => {
val expected = Right(List())
val s3ObjectData = S3ObjectsData(
byHash = Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))),
byKey = Map(remoteKey1 -> HashModified(hash1, lastModified)))
val storageService = DummyStorageService(s3ObjectData, Map(
fileInFirstSource -> (remoteKey1, hash1)))
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
})
}
}
"with remote file not present in either source" - {
"delete from remote" in {
withDirectory(firstSource => {
withDirectory(secondSource => {
val expected = Right(List(
toDelete(remoteKey1)
))
val s3ObjectData = S3ObjectsData(
byKey = Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified)))
val storageService = DummyStorageService(s3ObjectData, Map())
val result = invoke(storageService, hashService, configOptions(
ConfigOption.Source(firstSource),
ConfigOption.Source(secondSource),
ConfigOption.Bucket("a-bucket")))
assertResult(expected)(result)
})
})
}
}
}
def md5Hash(file: File) = {
hashService.hashLocalObject(file.toPath).unsafeRunSync()("md5")
}
}
private def toUpload(remoteKey: RemoteKey,
md5Hash: MD5Hash,
source: Path,
file: File): (String, String, String, String, String) =
("upload", remoteKey.key, md5Hash.hash, source.toString, file.toString)
private def toCopy(sourceKey: RemoteKey,
md5Hash: MD5Hash,
targetKey: RemoteKey): (String, String, String, String, String) =
("copy", sourceKey.key, md5Hash.hash, targetKey.key, "")
private def toDelete(remoteKey: RemoteKey): (String, String, String, String, String) =
("delete", remoteKey.key, "", "", "")
private def configOptions(configOptions: ConfigOption*): ConfigOptions =
ConfigOptions(List(configOptions:_*))
private def invoke(storageService: StorageService,
hashService: HashService,
configOptions: ConfigOptions): Either[List[String], List[(String, String, String, String, String)]] =
planBuilder.createPlan(storageService, hashService, configOptions)
.value.unsafeRunSync().map(_.actions.toList.map({
case ToUpload(_, lf, _) => ("upload", lf.remoteKey.key, lf.hashes("md5").hash, lf.source.toString, lf.file.toString)
case ToDelete(_, remoteKey, _) => ("delete", remoteKey.key, "", "", "")
case ToCopy(_, sourceKey, hash, targetKey, _) => ("copy", sourceKey.key, hash.hash, targetKey.key, "")
case DoNothing(_, remoteKey, _) => ("do-nothing", remoteKey.key, "", "", "")
case x => ("other", x.toString, "", "", "")
}))
}

View file

@ -10,9 +10,10 @@ class S3MetaDataEnricherSuite
extends FunSpec { extends FunSpec {
private val source = Resource(this, "upload") private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _
val lastModified = LastModified(Instant.now()) val lastModified = LastModified(Instant.now())
def getMatchesByKey(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Option[HashModified] = { def getMatchesByKey(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Option[HashModified] = {
@ -29,7 +30,7 @@ class S3MetaDataEnricherSuite
describe("#1a local exists, remote exists, remote matches, other matches - do nothing") { describe("#1a local exists, remote exists, remote matches, other matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash") val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theRemoteKey: RemoteKey = theFile.remoteKey val theRemoteKey: RemoteKey = theFile.remoteKey
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
@ -46,7 +47,7 @@ class S3MetaDataEnricherSuite
} }
describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") { describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash") val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theRemoteKey: RemoteKey = prefix.resolve("the-file") val theRemoteKey: RemoteKey = prefix.resolve("the-file")
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
@ -63,7 +64,7 @@ class S3MetaDataEnricherSuite
} }
describe("#2 local exists, remote is missing, remote no match, other matches - copy") { describe("#2 local exists, remote is missing, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val otherRemoteKey = RemoteKey("other-key") val otherRemoteKey = RemoteKey("other-key")
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))), byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
@ -80,11 +81,8 @@ class S3MetaDataEnricherSuite
} }
describe("#3 local exists, remote is missing, remote no match, other no matches - upload") { describe("#3 local exists, remote is missing, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData()
byHash = Map(),
byKey = Map()
)
it("generates valid metadata") { it("generates valid metadata") {
val expected = S3MetaData(theFile, val expected = S3MetaData(theFile,
matchByHash = Set.empty, matchByHash = Set.empty,
@ -95,7 +93,7 @@ class S3MetaDataEnricherSuite
} }
describe("#4 local exists, remote exists, remote no match, other matches - copy") { describe("#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash") val oldHash = MD5Hash("old-hash")
val otherRemoteKey = prefix.resolve("other-key") val otherRemoteKey = prefix.resolve("other-key")
@ -120,7 +118,7 @@ class S3MetaDataEnricherSuite
} }
describe("#5 local exists, remote exists, remote no match, other no matches - upload") { describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash") val oldHash = MD5Hash("old-hash")
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData(
@ -148,11 +146,11 @@ class S3MetaDataEnricherSuite
describe("getS3Status") { describe("getS3Status") {
val hash = MD5Hash("hash") val hash = MD5Hash("hash")
val localFile = LocalFile.resolve("the-file", md5HashMap(hash), source, fileToKey) val localFile = LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey)
val key = localFile.remoteKey val key = localFile.remoteKey
val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), source, fileToKey) val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), sourcePath, fileToKey)
val diffHash = MD5Hash("diff") val diffHash = MD5Hash("diff")
val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), source, fileToKey) val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), sourcePath, fileToKey)
val lastModified = LastModified(Instant.now) val lastModified = LastModified(Instant.now)
val s3ObjectsData: S3ObjectsData = S3ObjectsData( val s3ObjectsData: S3ObjectsData = S3ObjectsData(
byHash = Map( byHash = Map(
@ -175,7 +173,7 @@ class S3MetaDataEnricherSuite
} }
describe("when remote key does not exist and no others matches hash") { describe("when remote key does not exist and no others matches hash") {
val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), source, fileToKey) val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), sourcePath, fileToKey)
it("should return no matches by key") { it("should return no matches by key") {
val result = getMatchesByKey(invoke(localFile)) val result = getMatchesByKey(invoke(localFile))
assert(result.isEmpty) assert(result.isEmpty)

View file

@ -17,9 +17,11 @@ class SyncSuite
extends FunSpec { extends FunSpec {
private val source = Resource(this, "upload") private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
private val configOptions = ConfigOptions(List( private val configOptions =
ConfigOption.Source(source.toPath), ConfigOptions(List(
ConfigOption.Source(sourcePath),
ConfigOption.Bucket("bucket"), ConfigOption.Bucket("bucket"),
ConfigOption.Prefix("prefix"), ConfigOption.Prefix("prefix"),
ConfigOption.IgnoreGlobalOptions, ConfigOption.IgnoreGlobalOptions,
@ -35,19 +37,23 @@ class SyncSuite
// source contains the files root-file and subdir/leaf-file // source contains the files root-file and subdir/leaf-file
val rootRemoteKey = RemoteKey("prefix/root-file") val rootRemoteKey = RemoteKey("prefix/root-file")
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
val rootFile: LocalFile = LocalFile.resolve("root-file", md5HashMap(Root.hash), source, _ => rootRemoteKey) val rootFile: LocalFile =
LocalFile.resolve("root-file", md5HashMap(Root.hash), sourcePath, _ => rootRemoteKey)
val leafFile: LocalFile =
LocalFile.resolve("subdir/leaf-file", md5HashMap(Leaf.hash), sourcePath, _ => leafRemoteKey)
private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] = { private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] =
Map("md5" -> md5Hash) Map("md5" -> md5Hash)
}
val leafFile: LocalFile = LocalFile.resolve("subdir/leaf-file", md5HashMap(Leaf.hash), source, _ => leafRemoteKey) val hashService =
DummyHashService(Map(
val hashService = DummyHashService(Map(
file("root-file") -> Map("md5" -> MD5HashData.Root.hash), file("root-file") -> Map("md5" -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash) file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash)
)) ))
private def file(filename: String) =
sourcePath.resolve(Paths.get(filename))
def invokeSubject(storageService: StorageService, def invokeSubject(storageService: StorageService,
hashService: HashService, hashService: HashService,
configOptions: ConfigOptions): Either[List[String], SyncPlan] = { configOptions: ConfigOptions): Either[List[String], SyncPlan] = {
@ -62,9 +68,7 @@ class SyncSuite
} }
describe("when all files should be uploaded") { describe("when all files should be uploaded") {
val storageService = new RecordingStorageService(testBucket, S3ObjectsData( val storageService = new RecordingStorageService(testBucket, S3ObjectsData())
byHash = Map(),
byKey = Map()))
it("uploads all files") { it("uploads all files") {
val expected = Right(Set( val expected = Right(Set(
ToUpload(testBucket, rootFile, rootFile.file.length), ToUpload(testBucket, rootFile, rootFile.file.length),
@ -74,9 +78,6 @@ class SyncSuite
} }
} }
private def file(filename: String) =
source.toPath.resolve(Paths.get(filename)).toFile
describe("when no files should be uploaded") { describe("when no files should be uploaded") {
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = Map( byHash = Map(

View file

@ -0,0 +1,43 @@
package net.kemitix.thorp.core
import java.io.{File, IOException, PrintWriter}
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import scala.util.Try
trait TemporaryFolder {
def withDirectory(testCode: Path => Any): Any = {
val dir: Path = Files.createTempDirectory("thorp-temp")
val t = Try(testCode(dir))
remove(dir)
t.get
}
def remove(root: Path): Unit = {
Files.walkFileTree(root, new SimpleFileVisitor[Path] {
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
Files.delete(file)
FileVisitResult.CONTINUE
}
override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
Files.delete(dir)
FileVisitResult.CONTINUE
}
})
}
def writeFile(directory: Path, name: String, contents: String*): Unit = {
directory.toFile.mkdirs
val pw = new PrintWriter(directory.resolve(name).toFile, "UTF-8")
contents.foreach(pw.println)
pw.close()
}
def createFile(path: Path, name: String, content: String*): File = {
writeFile(path, name, content:_*)
path.resolve(name).toFile
}
}

View file

@ -1,10 +1,8 @@
package net.kemitix.thorp.domain package net.kemitix.thorp.domain
import java.io.File
final case class Config(bucket: Bucket = Bucket(""), final case class Config(bucket: Bucket = Bucket(""),
prefix: RemoteKey = RemoteKey(""), prefix: RemoteKey = RemoteKey(""),
filters: List[Filter] = List(), filters: List[Filter] = List(),
debug: Boolean = false, debug: Boolean = false,
batchMode: Boolean = false, batchMode: Boolean = false,
source: File) sources: Sources)

View file

@ -2,4 +2,4 @@ package net.kemitix.thorp.domain
import java.time.Instant import java.time.Instant
final case class LastModified(when: Instant) final case class LastModified(when: Instant = Instant.now)

View file

@ -21,9 +21,9 @@ final case class LocalFile(file: File, source: File, hashes: Map[String, MD5Hash
object LocalFile { object LocalFile {
def resolve(path: String, def resolve(path: String,
md5Hashes: Map[String, MD5Hash], md5Hashes: Map[String, MD5Hash],
source: File, source: Path,
fileToKey: File => RemoteKey): LocalFile = { pathToKey: Path => RemoteKey): LocalFile = {
val file = source.toPath.resolve(path).toFile val resolvedPath = source.resolve(path)
LocalFile(file, source, md5Hashes, fileToKey(file)) LocalFile(resolvedPath.toFile, source.toFile, md5Hashes, pathToKey(resolvedPath))
} }
} }

View file

@ -1,24 +1,28 @@
package net.kemitix.thorp.domain package net.kemitix.thorp.domain
import java.io.File import java.io.File
import java.nio.file.Paths import java.nio.file.{Path, Paths}
final case class RemoteKey(key: String) { final case class RemoteKey(key: String) {
def asFile(source: File, prefix: RemoteKey): File = def asFile(source: Path, prefix: RemoteKey): Option[File] =
source.toPath.resolve(relativeTo(prefix)).toFile if (key.length == 0) None
else Some(source.resolve(relativeTo(prefix)).toFile)
private def relativeTo(prefix: RemoteKey) = { private def relativeTo(prefix: RemoteKey) = {
prefix match { prefix match {
case RemoteKey("") => Paths.get(prefix.key) case RemoteKey("") => Paths.get(key)
case _ => Paths.get(prefix.key).relativize(Paths.get(key)) case _ => Paths.get(prefix.key).relativize(Paths.get(key))
} }
} }
def isMissingLocally(source: File, prefix: RemoteKey): Boolean = def isMissingLocally(sources: Sources, prefix: RemoteKey): Boolean =
! asFile(source, prefix).exists !sources.paths.exists(source => asFile(source, prefix) match {
case Some(file) => file.exists
case None => false
})
def resolve(path: String): RemoteKey = def resolve(path: String): RemoteKey =
RemoteKey(key + "/" + path) RemoteKey(List(key, path).filterNot(_.isEmpty).mkString("/"))
} }

View file

@ -3,5 +3,5 @@ package net.kemitix.thorp.domain
/** /**
* A list of objects and their MD5 hash values. * A list of objects and their MD5 hash values.
*/ */
final case class S3ObjectsData(byHash: Map[MD5Hash, Set[KeyModified]], final case class S3ObjectsData(byHash: Map[MD5Hash, Set[KeyModified]] = Map.empty,
byKey: Map[RemoteKey, HashModified]) byKey: Map[RemoteKey, HashModified] = Map.empty)

View file

@ -0,0 +1,26 @@
package net.kemitix.thorp.domain
import java.nio.file.Path
/**
* The paths to synchronise with target.
*
* The first source path takes priority over those later in the list,
* etc. Where there is any file with the same relative path within
* more than one source, the file in the first listed path is
* uploaded, and the others are ignored.
*/
case class Sources(paths: List[Path]) {
def ++(path: Path): Sources = this ++ List(path)
def ++(otherPaths: List[Path]): Sources = Sources(paths ++ otherPaths)
/**
* Returns the source path for the given path.
*
* @param path the path to find the matching source
* @return the source for the path
* @throws NoSuchElementException if no source matches the path
*/
def forPath(path: Path): Path =
paths.find(source => path.startsWith(source)).get
}

View file

@ -0,0 +1,72 @@
package net.kemitix.thorp.domain
import java.io.File
import java.nio.file.Paths
import org.scalatest.FreeSpec
class RemoteKeyTest extends FreeSpec {
private val emptyKey = RemoteKey("")
"create a RemoteKey" - {
"can resolve a path" - {
"when key is empty" in {
val key = emptyKey
val path = "path"
val expected = RemoteKey("path")
val result = key.resolve(path)
assertResult(expected)(result)
}
"when path is empty" in {
val key = RemoteKey("key")
val path = ""
val expected = RemoteKey("key")
val result = key.resolve(path)
assertResult(expected)(result)
}
"when key and path are empty" in {
val key = emptyKey
val path = ""
val expected = emptyKey
val result = key.resolve(path)
assertResult(expected)(result)
}
}
"asFile" - {
"when key and prefix are non-empty" in {
val key = RemoteKey("prefix/key")
val source = Paths.get("source")
val prefix = RemoteKey("prefix")
val expected = Some(new File("source/key"))
val result = key.asFile(source, prefix)
assertResult(expected)(result)
}
"when prefix is empty" in {
val key = RemoteKey("key")
val source = Paths.get("source")
val prefix = emptyKey
val expected = Some(new File("source/key"))
val result = key.asFile(source, prefix)
assertResult(expected)(result)
}
"when key is empty" in {
val key = emptyKey
val source = Paths.get("source")
val prefix = RemoteKey("prefix")
val expected = None
val result = key.asFile(source, prefix)
assertResult(expected)(result)
}
"when key and prefix are empty" in {
val key = emptyKey
val source = Paths.get("source")
val prefix = emptyKey
val expected = None
val result = key.asFile(source, prefix)
assertResult(expected)(result)
}
}
}
}

View file

@ -1,6 +1,6 @@
package net.kemitix.thorp.storage.api package net.kemitix.thorp.storage.api
import java.io.File import java.nio.file.Path
import cats.effect.IO import cats.effect.IO
import net.kemitix.thorp.domain.{Logger, MD5Hash} import net.kemitix.thorp.domain.{Logger, MD5Hash}
@ -10,6 +10,7 @@ import net.kemitix.thorp.domain.{Logger, MD5Hash}
*/ */
trait HashService { trait HashService {
def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] def hashLocalObject(path: Path)
(implicit l: Logger): IO[Map[String, MD5Hash]]
} }

View file

@ -1,6 +1,6 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import java.io.File import java.nio.file.Path
import cats.effect.IO import cats.effect.IO
import cats.implicits._ import cats.implicits._
@ -12,11 +12,11 @@ import net.kemitix.thorp.domain.{Logger, MD5Hash}
trait ETagGenerator { trait ETagGenerator {
def eTag(file: File)(implicit l: Logger): IO[String]= { def eTag(path: Path)(implicit l: Logger): IO[String]= {
val partSize = calculatePartSize(file) val partSize = calculatePartSize(path)
val parts = numParts(file.length, partSize) val parts = numParts(path.toFile.length, partSize)
partsIndex(parts) partsIndex(parts)
.map(digestChunk(file, partSize)).sequence .map(digestChunk(path, partSize)).sequence
.map(concatenateDigests) .map(concatenateDigests)
.map(MD5HashGenerator.hex) .map(MD5HashGenerator.hex)
.map(hash => s"$hash-$parts") .map(hash => s"$hash-$parts")
@ -28,8 +28,8 @@ trait ETagGenerator {
private def concatenateDigests: List[Array[Byte]] => Array[Byte] = private def concatenateDigests: List[Array[Byte]] => Array[Byte] =
lab => lab.foldLeft(Array[Byte]())((acc, ab) => acc ++ ab) lab => lab.foldLeft(Array[Byte]())((acc, ab) => acc ++ ab)
private def calculatePartSize(file: File) = { private def calculatePartSize(path: Path) = {
val request = new PutObjectRequest("", "", file) val request = new PutObjectRequest("", "", path.toFile)
val configuration = new TransferManagerConfiguration val configuration = new TransferManagerConfiguration
TransferManagerUtils.calculateOptimalPartSize(request, configuration) TransferManagerUtils.calculateOptimalPartSize(request, configuration)
} }
@ -43,11 +43,11 @@ trait ETagGenerator {
def offsets(totalFileSizeBytes: Long, optimalPartSize: Long): List[Long] = def offsets(totalFileSizeBytes: Long, optimalPartSize: Long): List[Long] =
Range.Long(0, totalFileSizeBytes, optimalPartSize).toList Range.Long(0, totalFileSizeBytes, optimalPartSize).toList
def digestChunk(file: File, chunkSize: Long)(chunkNumber: Long)(implicit l: Logger): IO[Array[Byte]] = def digestChunk(path: Path, chunkSize: Long)(chunkNumber: Long)(implicit l: Logger): IO[Array[Byte]] =
hashChunk(file, chunkNumber, chunkSize).map(_.digest) hashChunk(path, chunkNumber, chunkSize).map(_.digest)
def hashChunk(file: File, chunkNumber: Long, chunkSize: Long)(implicit l: Logger): IO[MD5Hash] = def hashChunk(path: Path, chunkNumber: Long, chunkSize: Long)(implicit l: Logger): IO[MD5Hash] =
MD5HashGenerator.md5FileChunk(file, chunkNumber * chunkSize, chunkSize) MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize)
} }
object ETagGenerator extends ETagGenerator object ETagGenerator extends ETagGenerator

View file

@ -1,6 +1,6 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import java.io.File import java.nio.file.Path
import cats.effect.IO import cats.effect.IO
import net.kemitix.thorp.core.MD5HashGenerator import net.kemitix.thorp.core.MD5HashGenerator
@ -12,13 +12,14 @@ trait S3HashService extends HashService {
/** /**
* Generates an MD5 Hash and an multi-part ETag * Generates an MD5 Hash and an multi-part ETag
* *
* @param file the local file to scan * @param path the local path to scan
* @return a set of hash values * @return a set of hash values
*/ */
override def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] = override def hashLocalObject(path: Path)
(implicit l: Logger): IO[Map[String, MD5Hash]] =
for { for {
md5 <- MD5HashGenerator.md5File(file) md5 <- MD5HashGenerator.md5File(path)
etag <- ETagGenerator.eTag(file).map(MD5Hash(_)) etag <- ETagGenerator.eTag(path).map(MD5Hash(_))
} yield Map( } yield Map(
"md5" -> md5, "md5" -> md5,
"etag" -> etag "etag" -> etag

View file

@ -8,6 +8,7 @@ import org.scalatest.FunSpec
class ETagGeneratorTest extends FunSpec { class ETagGeneratorTest extends FunSpec {
private val bigFile = Resource(this, "big-file") private val bigFile = Resource(this, "big-file")
private val bigFilePath = bigFile.toPath
private val configuration = new TransferManagerConfiguration private val configuration = new TransferManagerConfiguration
private val chunkSize = 1200000 private val chunkSize = 1200000
configuration.setMinimumUploadPartSize(chunkSize) configuration.setMinimumUploadPartSize(chunkSize)
@ -35,7 +36,7 @@ class ETagGeneratorTest extends FunSpec {
"8a0c1d0778ac8fcf4ca2010eba4711eb" "8a0c1d0778ac8fcf4ca2010eba4711eb"
).zipWithIndex ).zipWithIndex
md5Hashes.foreach { case (hash, index) => md5Hashes.foreach { case (hash, index) =>
test(hash, ETagGenerator.hashChunk(bigFile, index, chunkSize)(logger).unsafeRunSync) test(hash, ETagGenerator.hashChunk(bigFilePath, index, chunkSize)(logger).unsafeRunSync)
} }
} }
} }
@ -43,7 +44,7 @@ class ETagGeneratorTest extends FunSpec {
describe("create etag for whole file") { describe("create etag for whole file") {
val expected = "f14327c90ad105244c446c498bfe9a7d-2" val expected = "f14327c90ad105244c446c498bfe9a7d-2"
it("should match aws etag for the file") { it("should match aws etag for the file") {
val result = ETagGenerator.eTag(bigFile)(logger).unsafeRunSync val result = ETagGenerator.eTag(bigFilePath)(logger).unsafeRunSync
assertResult(expected)(result) assertResult(expected)(result)
} }
} }

View file

@ -18,8 +18,9 @@ class S3StorageServiceSuite
describe("listObjectsInPrefix") { describe("listObjectsInPrefix") {
val source = Resource(this, "upload") val source = Resource(this, "upload")
val sourcePath = source.toPath
val prefix = RemoteKey("prefix") val prefix = RemoteKey("prefix")
implicit val config: Config = Config(Bucket("bucket"), prefix, source = source) implicit val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit val implLogger: Logger = new DummyLogger implicit val implLogger: Logger = new DummyLogger
val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS))

View file

@ -18,19 +18,20 @@ class StorageServiceSuite
with MockFactory { with MockFactory {
val source = Resource(this, "upload") val source = Resource(this, "upload")
val sourcePath = source.toPath
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val implLogger: Logger = new DummyLogger implicit private val implLogger: Logger = new DummyLogger
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _
describe("getS3Status") { describe("getS3Status") {
val hash = MD5Hash("hash") val hash = MD5Hash("hash")
val localFile = LocalFile.resolve("the-file", md5HashMap(hash), source, fileToKey) val localFile = LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey)
val key = localFile.remoteKey val key = localFile.remoteKey
val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), source, fileToKey) val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), sourcePath, fileToKey)
val diffHash = MD5Hash("diff") val diffHash = MD5Hash("diff")
val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), source, fileToKey) val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), sourcePath, fileToKey)
val lastModified = LastModified(Instant.now) val lastModified = LastModified(Instant.now)
val s3ObjectsData: S3ObjectsData = S3ObjectsData( val s3ObjectsData: S3ObjectsData = S3ObjectsData(
byHash = Map( byHash = Map(
@ -70,7 +71,7 @@ class StorageServiceSuite
} }
describe("when remote key does not exist and no others matches hash") { describe("when remote key does not exist and no others matches hash") {
val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), source, fileToKey) val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), sourcePath, fileToKey)
it("should return no matches by key") { it("should return no matches by key") {
val result = getMatchesByKey(invoke(localFile)) val result = getMatchesByKey(invoke(localFile))
assert(result.isEmpty) assert(result.isEmpty)
@ -113,7 +114,7 @@ class StorageServiceSuite
val prefix = RemoteKey("prefix") val prefix = RemoteKey("prefix")
val localFile = val localFile =
LocalFile.resolve("root-file", md5HashMap(Root.hash), source, KeyGenerator.generateKey(source, prefix)) LocalFile.resolve("root-file", md5HashMap(Root.hash), sourcePath, KeyGenerator.generateKey(config.sources, prefix))
val bucket = Bucket("a-bucket") val bucket = Bucket("a-bucket")
val remoteKey = RemoteKey("prefix/root-file") val remoteKey = RemoteKey("prefix/root-file")
val uploadEventListener = new UploadEventListener(localFile, 1, SyncTotals(), 0L) val uploadEventListener = new UploadEventListener(localFile, 1, SyncTotals(), 0L)

View file

@ -16,10 +16,11 @@ class UploaderSuite
with MockFactory { with MockFactory {
private val source = Resource(this, ".") private val source = Resource(this, ".")
private val sourcePath = source.toPath
private val prefix = RemoteKey("prefix") private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath)))
implicit private val implLogger: Logger = new DummyLogger implicit private val implLogger: Logger = new DummyLogger
private val fileToKey = generateKey(config.source, config.prefix) _ private val fileToKey = generateKey(config.sources, config.prefix) _
val lastModified = LastModified(Instant.now()) val lastModified = LastModified(Instant.now())
def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] = def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] =
@ -38,7 +39,7 @@ class UploaderSuite
// dies when putObject is called // dies when putObject is called
val returnedKey = RemoteKey("returned-key") val returnedKey = RemoteKey("returned-key")
val returnedHash = MD5Hash("returned-hash") val returnedHash = MD5Hash("returned-hash")
val bigFile = LocalFile.resolve("small-file", md5HashMap(MD5Hash("the-hash")), source, fileToKey) val bigFile = LocalFile.resolve("small-file", md5HashMap(MD5Hash("the-hash")), sourcePath, fileToKey)
val uploadEventListener = new UploadEventListener(bigFile, 1, SyncTotals(), 0L) val uploadEventListener = new UploadEventListener(bigFile, 1, SyncTotals(), 0L)
val amazonS3 = mock[AmazonS3] val amazonS3 = mock[AmazonS3]
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build