diff --git a/CHANGELOG.org b/CHANGELOG.org index b4f6a6a..eb519c0 100644 --- a/CHANGELOG.org +++ b/CHANGELOG.org @@ -12,6 +12,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog] - Add a version command-line option (#99) - Add a batch mode (#85) - Display total size and progress for entire run (#94) + - Sync more than one source directory into a single bucket/prefile (#25) * [0.6.1] - 2019-07-03 diff --git a/README.org b/README.org index 4ca6272..269bd2b 100644 --- a/README.org +++ b/README.org @@ -33,6 +33,10 @@ If you don't provide a ~source~ the current diretory will be used. The ~--include~ and ~--exclude~ parameters can be used more than once. +The ~--source~ parameter can be used more than once, in which case, +all files in all sources will be consolidated into the same +bucket/prefix. + ** Batch mode Batch mode disable the ANSI console display and logs simple messages @@ -46,15 +50,18 @@ that can be written to a file. - User: ~ ~/.config/thorp.conf~ - Source: ~${source}/.thorp.conf~ -Command line arguments override those in Source, which override those -in User, which override those Global, which override any built-in -config. + Command line arguments override those in Source, which override + those in User, which override those Global, which override any + built-in config. -Built-in config consists of using the current working directory as the -~source~. + When there is more than one source, only the first ".thorp.conf" + file found will be used. -Note, that ~include~ and ~exclude~ are cumulative across all -configuration files. + Built-in config consists of using the current working directory as + the ~source~. + + Note, that ~include~ and ~exclude~ are cumulative across all + configuration files. * Behaviour diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala b/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala index 6de4dba..e16f56f 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/ParseArgs.scala @@ -20,6 +20,7 @@ object ParseArgs { .action((_, cos) => ConfigOption.BatchMode :: cos) .text("Enable batch-mode"), opt[String]('s', "source") + .unbounded() .action((str, cos) => ConfigOption.Source(Paths.get(str)) :: cos) .text("Source directory to sync to destination"), opt[String]('b', "bucket") diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala index 94f308e..f7dbd25 100644 --- a/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/ParseArgsTest.scala @@ -1,7 +1,9 @@ package net.kemitix.thorp.cli +import java.nio.file.Paths + import net.kemitix.thorp.core.ConfigOption.Debug -import net.kemitix.thorp.core.{ConfigOptions, Resource} +import net.kemitix.thorp.core.{ConfigOptions, ConfigQuery, Resource} import org.scalatest.FunSpec import scala.util.Try @@ -15,14 +17,27 @@ class ParseArgsTest extends FunSpec { ParseArgs(List("--source", path, "--bucket", "bucket")) describe("when source is a directory") { - val result = invokeWithSource(pathTo(".")) it("should succeed") { + val result = invokeWithSource(pathTo(".")) assert(result.isDefined) } } describe("when source is a relative path to a directory") { + val result = invokeWithSource(pathTo(".")) it("should succeed") {pending} } + describe("when there are multiple sources") { + val args = List( + "--source", "path1", + "--source", "path2", + "--bucket", "bucket") + it("should get multiple sources") { + val expected = Some(Set("path1", "path2").map(Paths.get(_))) + val configOptions = ParseArgs(args) + val result = configOptions.map(ConfigQuery.sources(_).paths.toSet) + assertResult(expected)(result) + } + } } describe("parse - debug") { diff --git a/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala b/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala index 3aa77f3..aecb21a 100644 --- a/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala +++ b/cli/src/test/scala/net/kemitix/thorp/cli/ProgramTest.scala @@ -1,22 +1,24 @@ package net.kemitix.thorp.cli import java.io.File +import java.nio.file.Path import cats.data.EitherT import cats.effect.IO import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core._ -import net.kemitix.thorp.domain.{Bucket, LocalFile, Logger, MD5Hash, RemoteKey, StorageQueueEvent} +import net.kemitix.thorp.domain._ import net.kemitix.thorp.storage.api.{HashService, StorageService} import org.scalatest.FunSpec class ProgramTest extends FunSpec { val source: File = Resource(this, ".") + val sourcePath: Path = source.toPath val bucket: Bucket = Bucket("aBucket") val hash: MD5Hash = MD5Hash("aHash") val copyAction: Action = ToCopy(bucket, RemoteKey("copy-me"), hash, RemoteKey("overwrite-me"), 17L) - val uploadAction: Action = ToUpload(bucket, LocalFile.resolve("aFile", Map(), source, _ => RemoteKey("upload-me")), 23L) + val uploadAction: Action = ToUpload(bucket, LocalFile.resolve("aFile", Map(), sourcePath, _ => RemoteKey("upload-me")), 23L) val deleteAction: Action = ToDelete(bucket, RemoteKey("delete-me"), 0L) val configOptions: ConfigOptions = ConfigOptions(options = List( diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala index 3aadc2a..1a1ab13 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala @@ -5,7 +5,17 @@ import net.kemitix.thorp.domain._ object ActionGenerator { - def createActions(s3MetaData: S3MetaData) + def remoteNameNotAlreadyQueued(localFile: LocalFile, + previousActions: Stream[Action]): Boolean = { + val key = localFile.remoteKey.key + !previousActions.exists { + case ToUpload(_, lf, _) => lf.remoteKey.key equals key + case _ => false + } + } + + def createActions(s3MetaData: S3MetaData, + previousActions: Stream[Action]) (implicit c: Config): Stream[Action] = s3MetaData match { @@ -21,7 +31,8 @@ object ActionGenerator { // #3 local exists, remote is missing, other no matches - upload case S3MetaData(localFile, otherMatches, None) - if otherMatches.isEmpty + if otherMatches.isEmpty && + remoteNameNotAlreadyQueued(localFile, previousActions) => uploadFile(c.bucket, localFile) // #4 local exists, remote exists, remote no match, other matches - copy @@ -35,6 +46,8 @@ object ActionGenerator { if hashMatches.isEmpty => uploadFile(c.bucket, localFile) + case S3MetaData(localFile, _, _) => + doNothing(c.bucket, localFile.remoteKey) } private def doNothing(bucket: Bucket, diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala index ee2a99c..ca97d96 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigOption.scala @@ -17,13 +17,21 @@ object ConfigOption { override def update(config: Config): Config = config.copy(batchMode = true) } case class Source(path: Path) extends ConfigOption { - override def update(config: Config): Config = config.copy(source = path.toFile) + override def update(config: Config): Config = config.copy(sources = config.sources ++ path) } case class Bucket(name: String) extends ConfigOption { - override def update(config: Config): Config = config.copy(bucket = domain.Bucket(name)) + override def update(config: Config): Config = + if (config.bucket.name.isEmpty) + config.copy(bucket = domain.Bucket(name)) + else + config } case class Prefix(path: String) extends ConfigOption { - override def update(config: Config): Config = config.copy(prefix = RemoteKey(path)) + override def update(config: Config): Config = + if (config.prefix.key.isEmpty) + config.copy(prefix = RemoteKey(path)) + else + config } case class Include(pattern: String) extends ConfigOption { override def update(config: Config): Config = config.copy(filters = domain.Filter.Include(pattern) :: config.filters) diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigQuery.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigQuery.scala index 1ace770..e586035 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigQuery.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigQuery.scala @@ -1,5 +1,7 @@ package net.kemitix.thorp.core +import net.kemitix.thorp.domain.Sources + trait ConfigQuery { def showVersion(configOptions: ConfigOptions): Boolean = @@ -14,6 +16,13 @@ trait ConfigQuery { def ignoreGlobalOptions(configOptions: ConfigOptions): Boolean = configOptions contains ConfigOption.IgnoreGlobalOptions + def sources(configOptions: ConfigOptions): Sources = { + val paths = configOptions.options.flatMap( { + case ConfigOption.Source(sourcePath) => Some(sourcePath) + case _ => None + }) + Sources(paths) + } } object ConfigQuery extends ConfigQuery diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala index 5a6a48f..5444c42 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigValidator.scala @@ -1,33 +1,40 @@ package net.kemitix.thorp.core -import java.io.File +import java.nio.file.Path import cats.data.{NonEmptyChain, Validated, ValidatedNec} import cats.implicits._ -import net.kemitix.thorp.domain.{Bucket, Config} +import net.kemitix.thorp.domain.{Bucket, Config, Sources} sealed trait ConfigValidator { type ValidationResult[A] = ValidatedNec[ConfigValidation, A] - def validateSourceIsDirectory(source: File): ValidationResult[File] = - if(source.isDirectory) source.validNec + def validateSourceIsDirectory(source: Path): ValidationResult[Path] = + if(source.toFile.isDirectory) source.validNec else ConfigValidation.SourceIsNotADirectory.invalidNec - def validateSourceIsReadable(source: File): ValidationResult[File] = - if(source.canRead) source.validNec + def validateSourceIsReadable(source: Path): ValidationResult[Path] = + if(source.toFile.canRead) source.validNec else ConfigValidation.SourceIsNotReadable.invalidNec - def validateSource(source: File): ValidationResult[File] = - validateSourceIsDirectory(source).andThen(s => validateSourceIsReadable(s)) + def validateSource(source: Path): ValidationResult[Path] = + validateSourceIsDirectory(source) + .andThen(s => + validateSourceIsReadable(s)) def validateBucket(bucket: Bucket): ValidationResult[Bucket] = if (bucket.name.isEmpty) ConfigValidation.BucketNameIsMissing.invalidNec else bucket.validNec + def validateSources(sources: Sources): ValidationResult[Sources] = + sources.paths + .map(validateSource).sequence + .map(_ => sources) + def validateConfig(config: Config): Validated[NonEmptyChain[ConfigValidation], Config] = ( - validateSource(config.source), + validateSources(config.sources), validateBucket(config.bucket) ).mapN((_, _) => config) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala index 615c9ba..05851cd 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ConfigurationBuilder.scala @@ -1,13 +1,13 @@ package net.kemitix.thorp.core -import java.io.File -import java.nio.file.Paths +import java.nio.file.{Files, Path, Paths} import cats.data.NonEmptyChain import cats.effect.IO +import cats.implicits._ import net.kemitix.thorp.core.ConfigValidator.validateConfig import net.kemitix.thorp.core.ParseConfigFile.parseFile -import net.kemitix.thorp.domain.Config +import net.kemitix.thorp.domain.{Config, Sources} /** * Builds a configuration from settings in a file within the @@ -15,14 +15,11 @@ import net.kemitix.thorp.domain.Config */ trait ConfigurationBuilder { - private val pwdFile: File = Paths.get(System.getenv("PWD")).toFile - - private val defaultConfig: Config = Config(source = pwdFile) def buildConfig(priorityOptions: ConfigOptions): IO[Either[NonEmptyChain[ConfigValidation], Config]] = { - val source = findSource(priorityOptions) + val sources = ConfigQuery.sources(priorityOptions) for { - sourceOptions <- sourceOptions(source) + sourceOptions <- sourceOptions(sources) userOptions <- userOptions(priorityOptions ++ sourceOptions) globalOptions <- globalOptions(priorityOptions ++ sourceOptions ++ userOptions) collected = priorityOptions ++ sourceOptions ++ userOptions ++ globalOptions @@ -30,14 +27,39 @@ trait ConfigurationBuilder { } yield validateConfig(config).toEither } - private def findSource(priorityOptions: ConfigOptions): File = - priorityOptions.options.foldRight(pwdFile)((co, f) => co match { - case ConfigOption.Source(source) => source.toFile - case _ => f - }) + private def sourceOptions(sources: Sources): IO[ConfigOptions] = { + def existingThorpConfigFiles(sources: Sources) = + sources.paths + .map(_.resolve(".thorp.config")) + .filter(Files.exists(_)) - private def sourceOptions(source: File): IO[ConfigOptions] = - readFile(source, ".thorp.conf") + def filterForSources: IO[ConfigOptions] => IO[(Sources, ConfigOptions)] = + for {configOptions <- _} yield (ConfigQuery.sources(configOptions), configOptions) + + def recurseIntoSources: IO[(Sources, ConfigOptions)] => IO[ConfigOptions] = + ioSourcesConfigOptions => + for { + sourcesConfigOptions <- ioSourcesConfigOptions + (sources, configOptions) = sourcesConfigOptions + moreSourcesConfigOptions <- filterForSources(sourceOptions(sources)) + (_, moreConfigOptions) = moreSourcesConfigOptions + } yield configOptions ++ moreConfigOptions + + def emptyConfig: IO[ConfigOptions] = IO.pure(ConfigOptions()) + + def collectConfigOptions: (IO[ConfigOptions], IO[ConfigOptions]) => IO[ConfigOptions] = + (ioConfigOptions, ioAcc) => + for { + configOptions <- ioConfigOptions + acc <- ioAcc + } yield configOptions ++ acc + + existingThorpConfigFiles(sources) + .map(ParseConfigFile.parseFile) + .map(filterForSources) + .map(recurseIntoSources) + .foldRight(emptyConfig)(collectConfigOptions) + } private def userOptions(higherPriorityOptions: ConfigOptions): IO[ConfigOptions] = if (ConfigQuery.ignoreUserOptions(higherPriorityOptions)) IO(ConfigOptions()) @@ -47,13 +69,22 @@ trait ConfigurationBuilder { if (ConfigQuery.ignoreGlobalOptions(higherPriorityOptions)) IO(ConfigOptions()) else parseFile(Paths.get("/etc/thorp.conf")) - private def userHome = new File(System.getProperty("user.home")) + private def userHome = Paths.get(System.getProperty("user.home")) - private def readFile(source: File, filename: String): IO[ConfigOptions] = - parseFile(source.toPath.resolve(filename)) + private def readFile(source: Path, filename: String): IO[ConfigOptions] = + parseFile(source.resolve(filename)) - private def collateOptions(configOptions: ConfigOptions): Config = - configOptions.options.foldRight(defaultConfig)((co, c) => co.update(c)) + private def collateOptions(configOptions: ConfigOptions): Config = { + val pwd = Paths.get(System.getenv("PWD")) + val initialSource = + if (noSourcesProvided(configOptions)) List(pwd) else List() + val initialConfig = Config(sources = Sources(initialSource)) + configOptions.options.foldLeft(initialConfig)((c, co) => co.update(c)) + } + + private def noSourcesProvided(configOptions: ConfigOptions) = { + ConfigQuery.sources(configOptions).paths.isEmpty + } } object ConfigurationBuilder extends ConfigurationBuilder diff --git a/core/src/main/scala/net/kemitix/thorp/core/KeyGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/KeyGenerator.scala index 25ceb1a..0b9ff3f 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/KeyGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/KeyGenerator.scala @@ -1,17 +1,19 @@ package net.kemitix.thorp.core -import java.io.File +import java.nio.file.Path -import net.kemitix.thorp.domain.RemoteKey +import net.kemitix.thorp.domain.{RemoteKey, Sources} object KeyGenerator { - def generateKey(source: File, prefix: RemoteKey) - (file: File): RemoteKey = { - val otherPath = file.toPath.toAbsolutePath - val sourcePath = source.toPath - val relativePath = sourcePath.relativize(otherPath) - RemoteKey(s"${prefix.key}/$relativePath") + def generateKey(sources: Sources, + prefix: RemoteKey) + (path: Path): RemoteKey = { + val source = sources.forPath(path) + val relativePath = source.relativize(path.toAbsolutePath) + RemoteKey(List(prefix.key, relativePath.toString) + .filterNot(_.isEmpty) + .mkString("/")) } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala index 4876978..1647080 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala @@ -11,58 +11,66 @@ import net.kemitix.thorp.storage.api.HashService object LocalFileStream { - def findFiles(file: File, + def findFiles(source: Path, hashService: HashService) (implicit c: Config, logger: Logger): IO[LocalFiles] = { - val filters: Path => Boolean = Filter.isIncluded(c.filters) + val isIncluded: Path => Boolean = Filter.isIncluded(c.filters) - def loop(file: File): IO[LocalFiles] = { + def loop(path: Path): IO[LocalFiles] = { - def dirPaths(file: File): IO[Stream[File]] = - IO(listFiles(file)) + def dirPaths(path: Path): IO[Stream[Path]] = + IO(listFiles(path)) .map(fs => Stream(fs: _*) - .filter(f => filters(f.toPath))) + .map(_.toPath) + .filter(isIncluded)) - def recurseIntoSubDirectories(file: File): IO[LocalFiles] = - file match { - case f if f.isDirectory => loop(file) - case _ => localFile(hashService, file) + def recurseIntoSubDirectories(path: Path): IO[LocalFiles] = + path.toFile match { + case f if f.isDirectory => loop(path) + case _ => localFile(hashService, path) } - def recurse(fs: Stream[File]): IO[LocalFiles] = - fs.foldLeft(IO.pure(LocalFiles()))((acc, f) => - recurseIntoSubDirectories(f) + def recurse(paths: Stream[Path]): IO[LocalFiles] = + paths.foldLeft(IO.pure(LocalFiles()))((acc, path) => + recurseIntoSubDirectories(path) .flatMap(localFiles => acc.map(accLocalFiles => accLocalFiles ++ localFiles))) for { - _ <- logger.debug(s"- Entering: $file") - fs <- dirPaths(file) + _ <- logger.debug(s"- Entering: $path") + fs <- dirPaths(path) lfs <- recurse(fs) - _ <- logger.debug(s"- Leaving : $file") + _ <- logger.debug(s"- Leaving : $path") } yield lfs } - loop(file) + loop(source) } private def localFile(hashService: HashService, - file: File) + path: Path) (implicit l: Logger, c: Config) = { + val file = path.toFile + val source = c.sources.forPath(path) for { - hash <- hashService.hashLocalObject(file) + hash <- hashService.hashLocalObject(path) } yield LocalFiles( - localFiles = Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file))), + localFiles = Stream( + domain.LocalFile( + file, + source.toFile, + hash, + generateKey(c.sources, c.prefix)(path))), count = 1, totalSizeBytes = file.length) } //TODO: Change this to return an Either[IllegalArgumentException, Array[File]] - private def listFiles(file: File) = { - Option(file.listFiles) - .getOrElse(throw new IllegalArgumentException(s"Directory not found $file")) + private def listFiles(path: Path): Array[File] = { + Option(path.toFile.listFiles) + .getOrElse(throw new IllegalArgumentException(s"Directory not found $path")) } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala index 980dbfb..78b29a9 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala @@ -1,6 +1,7 @@ package net.kemitix.thorp.core import java.io.{File, FileInputStream} +import java.nio.file.Path import java.security.MessageDigest import cats.effect.IO @@ -25,8 +26,8 @@ object MD5HashGenerator { md5.digest } - def md5File(file: File)(implicit logger: Logger): IO[MD5Hash] = - md5FileChunk(file, 0, file.length) + def md5File(path: Path)(implicit logger: Logger): IO[MD5Hash] = + md5FileChunk(path, 0, path.toFile.length) private def openFile(file: File, offset: Long) = IO { val stream = new FileInputStream(file) @@ -68,16 +69,17 @@ object MD5HashGenerator { result.toInt } - def md5FileChunk(file: File, + def md5FileChunk(path: Path, offset: Long, size: Long) (implicit logger: Logger): IO[MD5Hash] = { + val file = path.toFile val endOffset = Math.min(offset + size, file.length) for { - _ <- logger.debug(s"md5:reading:size ${file.length}:$file") + _ <- logger.debug(s"md5:reading:size ${file.length}:$path") digest <- readFile(file, offset, endOffset) hash = MD5Hash.fromDigest(digest) - _ <- logger.debug(s"md5:generated:${hash.hash}:$file") + _ <- logger.debug(s"md5:generated:${hash.hash}:$path") } yield hash } diff --git a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala index c02d870..b6b3e37 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/PlanBuilder.scala @@ -43,7 +43,7 @@ trait PlanBuilder { hashService: HashService) (implicit c: Config, l: Logger): EitherT[IO, List[String], SyncPlan] = { for { - _ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.source)) + _ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.sources)) actions <- gatherMetadata(storageService, hashService) .leftMap(error => List(error)) .map(assemblePlan) @@ -61,7 +61,7 @@ trait PlanBuilder { private def actionsForLocalFiles(localData: LocalFiles, remoteData: S3ObjectsData) (implicit c: Config) = - localData.localFiles.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData) ++ acc) + localData.localFiles.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData, acc) ++ acc) private def actionsForRemoteKeys(remoteData: S3ObjectsData) (implicit c: Config) = @@ -75,16 +75,33 @@ trait PlanBuilder { (implicit config: Config, l: Logger) = for { _ <- SyncLogging.logFileScan - localFiles <- LocalFileStream.findFiles(config.source, hashService) + localFiles <- findFiles(hashService) } yield localFiles - private def createActionFromLocalFile(lf: LocalFile, remoteData: S3ObjectsData) + private def findFiles(hashService: HashService) + (implicit c: Config, l: Logger): IO[LocalFiles] = { + val ioListLocalFiles = (for { + source <- c.sources.paths + } yield LocalFileStream.findFiles(source, hashService)).sequence + for { + listLocalFiles <- ioListLocalFiles + localFiles = listLocalFiles.foldRight(LocalFiles()){ + (acc, moreLocalFiles) => { + acc ++ moreLocalFiles + } + } + } yield localFiles + } + + private def createActionFromLocalFile(lf: LocalFile, + remoteData: S3ObjectsData, + previousActions: Stream[Action]) (implicit c: Config) = - ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData)) + ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData), previousActions) private def createActionFromRemoteKey(rk: RemoteKey) (implicit c: Config) = - if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk, 0L) + if (rk.isMissingLocally(c.sources, c.prefix)) Action.ToDelete(c.bucket, rk, 0L) else DoNothing(c.bucket, rk, 0L) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala b/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala new file mode 100644 index 0000000..197852e --- /dev/null +++ b/core/src/main/scala/net/kemitix/thorp/core/SimpleHashService.scala @@ -0,0 +1,19 @@ +package net.kemitix.thorp.core + +import java.nio.file.Path + +import cats.effect.IO +import net.kemitix.thorp.domain.{Logger, MD5Hash} +import net.kemitix.thorp.storage.api.HashService + +case class SimpleHashService() extends HashService { + + override def hashLocalObject(path: Path) + (implicit l: Logger): IO[Map[String, MD5Hash]] = + for { + md5 <- MD5HashGenerator.md5File(path) + } yield Map( + "md5" -> md5 + ) + +} diff --git a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala index 7784dc3..2ca7604 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/SyncLogging.scala @@ -1,7 +1,5 @@ package net.kemitix.thorp.core -import java.io.File - import cats.effect.IO import cats.implicits._ import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, ErrorQueueEvent, UploadQueueEvent} @@ -11,13 +9,15 @@ trait SyncLogging { def logRunStart(bucket: Bucket, prefix: RemoteKey, - source: File) - (implicit logger: Logger): IO[Unit] = - logger.info(s"Bucket: ${bucket.name}, Prefix: ${prefix.key}, Source: $source, ") + sources: Sources) + (implicit logger: Logger): IO[Unit] = { + val sourcesList = sources.paths.mkString(", ") + logger.info(s"Bucket: ${bucket.name}, Prefix: ${prefix.key}, Source: $sourcesList, ") + } def logFileScan(implicit c: Config, logger: Logger): IO[Unit] = - logger.info(s"Scanning local files: ${c.source}...") + logger.info(s"Scanning local files: ${c.sources}...") def logErrors(actions: Stream[StorageQueueEvent]) (implicit logger: Logger): IO[Unit] = diff --git a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala index dc39d59..62850fe 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala @@ -10,19 +10,22 @@ class ActionGeneratorSuite extends FunSpec { private val source = Resource(this, "upload") + private val sourcePath = source.toPath private val prefix = RemoteKey("prefix") private val bucket = Bucket("bucket") - implicit private val config: Config = Config(bucket, prefix, source = source) - private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ + implicit private val config: Config = Config(bucket, prefix, sources = Sources(List(sourcePath))) + private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _ val lastModified = LastModified(Instant.now()) describe("create actions") { - def invoke(input: S3MetaData) = ActionGenerator.createActions(input).toList + val previousActions = Stream.empty[Action] + + def invoke(input: S3MetaData) = ActionGenerator.createActions(input, previousActions).toList describe("#1 local exists, remote exists, remote matches - do nothing") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified) val input = S3MetaData(theFile, // local exists matchByHash = Set(theRemoteMetadata), // remote matches @@ -36,7 +39,7 @@ class ActionGeneratorSuite } describe("#2 local exists, remote is missing, other matches - copy") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val theRemoteKey = theFile.remoteKey val otherRemoteKey = prefix.resolve("other-key") val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified) @@ -51,7 +54,7 @@ class ActionGeneratorSuite } describe("#3 local exists, remote is missing, other no matches - upload") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val input = S3MetaData(theFile, // local exists matchByHash = Set.empty, // other no matches matchByKey = None) // remote is missing @@ -63,7 +66,7 @@ class ActionGeneratorSuite } describe("#4 local exists, remote exists, remote no match, other matches - copy") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val theRemoteKey = theFile.remoteKey val oldHash = MD5Hash("old-hash") val otherRemoteKey = prefix.resolve("other-key") @@ -82,7 +85,7 @@ class ActionGeneratorSuite } describe("#5 local exists, remote exists, remote no match, other no matches - upload") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val theRemoteKey = theFile.remoteKey val oldHash = MD5Hash("old-hash") val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified) diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala new file mode 100644 index 0000000..166cc1e --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/ConfigOptionTest.scala @@ -0,0 +1,31 @@ +package net.kemitix.thorp.core + +import net.kemitix.thorp.domain.Sources +import org.scalatest.FunSpec + +class ConfigOptionTest extends FunSpec with TemporaryFolder { + + describe("when more than one source") { + it("should preserve their order") { + withDirectory(path1 => { + withDirectory(path2 => { + val configOptions = ConfigOptions(List( + ConfigOption.Source(path1), + ConfigOption.Source(path2), + ConfigOption.Bucket("bucket"), + ConfigOption.IgnoreGlobalOptions, + ConfigOption.IgnoreUserOptions + )) + val expected = Sources(List(path1, path2)) + val result = invoke(configOptions) + assert(result.isRight, result) + assertResult(expected)(ConfigQuery.sources(configOptions)) + }) + }) + } + } + + private def invoke(configOptions: ConfigOptions) = { + ConfigurationBuilder.buildConfig(configOptions).unsafeRunSync + } +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala new file mode 100644 index 0000000..2f36551 --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/ConfigurationBuilderTest.scala @@ -0,0 +1,129 @@ +package net.kemitix.thorp.core + +import java.nio.file.{Path, Paths} + +import net.kemitix.thorp.domain._ +import org.scalatest.FunSpec + +class ConfigurationBuilderTest extends FunSpec with TemporaryFolder { + + private val pwd: Path = Paths.get(System.getenv("PWD")) + private val aBucket = Bucket("aBucket") + private val coBucket: ConfigOption.Bucket = ConfigOption.Bucket(aBucket.name) + private val thorpConfigFileName = ".thorp.config" + + private def configOptions(options: ConfigOption*): ConfigOptions = + ConfigOptions(List( + ConfigOption.IgnoreUserOptions, + ConfigOption.IgnoreGlobalOptions + ) ++ options) + + describe("when no source") { + it("should use the current (PWD) directory") { + val expected = Right(Config(aBucket, sources = Sources(List(pwd)))) + val options = configOptions(coBucket) + val result = invoke(options) + assertResult(expected)(result) + } + } + describe("when has a single source with no .thorp.config") { + it("should only include the source once") { + withDirectory(aSource => { + val expected = Right(Sources(List(aSource))) + val options = configOptions(ConfigOption.Source(aSource), coBucket) + val result = invoke(options).map(_.sources) + assertResult(expected)(result) + }) + } + } + describe("when has two sources") { + it("should include both sources in order") { + withDirectory(currentSource => { + withDirectory(previousSource => { + val expected = Right(List(currentSource, previousSource)) + val options = configOptions( + ConfigOption.Source(currentSource), + ConfigOption.Source(previousSource), + coBucket) + val result = invoke(options).map(_.sources.paths) + assertResult(expected)(result) + }) + }) + } + } + describe("when current source has .thorp.config with source to another") { + it("should include both sources in order") { + withDirectory(currentSource => { + withDirectory(previousSource => { + writeFile(currentSource, thorpConfigFileName, + s"source = $previousSource") + val expected = Right(List(currentSource, previousSource)) + val options = configOptions( + ConfigOption.Source(currentSource), + coBucket) + val result = invoke(options).map(_.sources.paths) + assertResult(expected)(result) + }) + }) + } + describe("when settings are in current and previous") { + it("should include some settings from both sources and some from only current") { + withDirectory(previousSource => { + withDirectory(currentSource => { + writeFile(currentSource, thorpConfigFileName, + s"source = $previousSource", + "bucket = current-bucket", + "prefix = current-prefix", + "include = current-include", + "exclude = current-exclude") + writeFile(previousSource, thorpConfigFileName, + "bucket = previous-bucket", + "prefix = previous-prefix", + "include = previous-include", + "exclude = previous-exclude") + // should have both sources in order + val expectedSources = Right(Sources(List(currentSource, previousSource))) + // should have bucket from current only + val expectedBuckets = Right(Bucket("current-bucket")) + // should have prefix from current only + val expectedPrefixes = Right(RemoteKey("current-prefix")) + // should have filters from both sources + val expectedFilters = Right(List( + Filter.Exclude("previous-exclude"), + Filter.Include("previous-include"), + Filter.Exclude("current-exclude"), + Filter.Include("current-include"))) + val options = configOptions(ConfigOption.Source(currentSource)) + val result = invoke(options) + assertResult(expectedSources)(result.map(_.sources)) + assertResult(expectedBuckets)(result.map(_.bucket)) + assertResult(expectedPrefixes)(result.map(_.prefix)) + assertResult(expectedFilters)(result.map(_.filters)) + }) + }) + } + } + } + + describe("when source has thorp.config source to another source that does the same") { + it("should include all three sources") { + withDirectory(currentSource => { + withDirectory(parentSource => { + writeFile(currentSource, thorpConfigFileName, s"source = $parentSource") + withDirectory(grandParentSource => { + writeFile(parentSource, thorpConfigFileName, s"source = $grandParentSource") + val expected = Right(List(currentSource, parentSource, grandParentSource)) + val options = configOptions( + ConfigOption.Source(currentSource), + coBucket) + val result = invoke(options).map(_.sources.paths) + assertResult(expected)(result) + }) + }) + }) + } + } + + private def invoke(configOptions: ConfigOptions) = + ConfigurationBuilder.buildConfig(configOptions).unsafeRunSync +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala index facf6be..549d386 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala @@ -1,11 +1,16 @@ package net.kemitix.thorp.core -import java.io.File +import java.nio.file.Path import cats.effect.IO import net.kemitix.thorp.domain.{Logger, MD5Hash} import net.kemitix.thorp.storage.api.HashService -case class DummyHashService(hashes: Map[File, Map[String, MD5Hash]]) extends HashService { - override def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] = IO.pure(hashes(file)) +case class DummyHashService(hashes: Map[Path, Map[String, MD5Hash]]) + extends HashService { + + override def hashLocalObject(path: Path) + (implicit l: Logger): IO[Map[String, MD5Hash]] = + IO.pure(hashes(path)) + } diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala new file mode 100644 index 0000000..6f2d011 --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/DummyStorageService.scala @@ -0,0 +1,41 @@ +package net.kemitix.thorp.core + +import java.io.File + +import cats.data.EitherT +import cats.effect.IO +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.StorageService + +case class DummyStorageService(s3ObjectData: S3ObjectsData, + uploadFiles: Map[File, (RemoteKey, MD5Hash)]) + extends StorageService { + + override def shutdown: IO[StorageQueueEvent] = + IO.pure(StorageQueueEvent.ShutdownQueueEvent()) + + override def listObjects(bucket: Bucket, + prefix: RemoteKey) + (implicit l: Logger): EitherT[IO, String, S3ObjectsData] = + EitherT.liftF(IO.pure(s3ObjectData)) + + override def upload(localFile: LocalFile, + bucket: Bucket, + batchMode: Boolean, + uploadEventListener: UploadEventListener, + tryCount: Int): IO[StorageQueueEvent] = { + val (remoteKey, md5Hash) = uploadFiles(localFile.file) + IO.pure(StorageQueueEvent.UploadQueueEvent(remoteKey, md5Hash)) + } + + override def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey): IO[StorageQueueEvent] = + IO.pure(StorageQueueEvent.CopyQueueEvent(targetKey)) + + override def delete(bucket: Bucket, + remoteKey: RemoteKey): IO[StorageQueueEvent] = + IO.pure(StorageQueueEvent.DeleteQueueEvent(remoteKey)) + +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/KeyGeneratorSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/KeyGeneratorSuite.scala index 73119cd..c53e412 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/KeyGeneratorSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/KeyGeneratorSuite.scala @@ -2,34 +2,32 @@ package net.kemitix.thorp.core import java.io.File -import net.kemitix.thorp.domain.{Bucket, Config, RemoteKey} +import net.kemitix.thorp.domain.{Bucket, Config, RemoteKey, Sources} import org.scalatest.FunSpec class KeyGeneratorSuite extends FunSpec { - private val source: File = Resource(this, "upload") - private val prefix = RemoteKey("prefix") - implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ + private val source: File = Resource(this, "upload") + private val sourcePath = source.toPath + private val prefix = RemoteKey("prefix") + implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) + private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _ - describe("key generator") { - def resolve(subdir: String): File = { - source.toPath.resolve(subdir).toFile - } + describe("key generator") { - describe("when file is within source") { - it("has a valid key") { - val subdir = "subdir" - assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(resolve(subdir))) - } - } - - describe("when file is deeper within source") { - it("has a valid key") { - val subdir = "subdir/deeper/still" - assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(resolve(subdir))) - } + describe("when file is within source") { + it("has a valid key") { + val subdir = "subdir" + assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(sourcePath.resolve(subdir))) } } + describe("when file is deeper within source") { + it("has a valid key") { + val subdir = "subdir/deeper/still" + assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(sourcePath.resolve(subdir))) + } + } + } + } \ No newline at end of file diff --git a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala index c1557b0..de68a25 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala @@ -2,22 +2,23 @@ package net.kemitix.thorp.core import java.nio.file.Paths -import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5HashData} +import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5HashData, Sources} import net.kemitix.thorp.storage.api.HashService import org.scalatest.FunSpec class LocalFileStreamSuite extends FunSpec { - private val uploadResource = Resource(this, "upload") + private val source = Resource(this, "upload") + private val sourcePath = source.toPath private val hashService: HashService = DummyHashService(Map( file("root-file") -> Map("md5" -> MD5HashData.Root.hash), file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash) )) private def file(filename: String) = - uploadResource.toPath.resolve(Paths.get(filename)).toFile + sourcePath.resolve(Paths.get(filename)) - implicit private val config: Config = Config(source = uploadResource) + implicit private val config: Config = Config(sources = Sources(List(sourcePath))) implicit private val logger: Logger = new DummyLogger describe("findFiles") { @@ -37,7 +38,7 @@ class LocalFileStreamSuite extends FunSpec { } } - private def invoke = { - LocalFileStream.findFiles(uploadResource, hashService).unsafeRunSync - } + private def invoke = + LocalFileStream.findFiles(sourcePath, hashService).unsafeRunSync + } diff --git a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala index 2d845c6..191dc67 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala @@ -7,37 +7,38 @@ import org.scalatest.FunSpec class MD5HashGeneratorTest extends FunSpec { private val source = Resource(this, "upload") + private val sourcePath = source.toPath private val prefix = RemoteKey("prefix") - implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) implicit private val logger: Logger = new DummyLogger describe("read a small file (smaller than buffer)") { - val file = Resource(this, "upload/root-file") + val path = Resource(this, "upload/root-file").toPath it("should generate the correct hash") { - val result = MD5HashGenerator.md5File(file).unsafeRunSync + val result = MD5HashGenerator.md5File(path).unsafeRunSync assertResult(Root.hash)(result) } } describe("read a large file (bigger than buffer)") { - val file = Resource(this, "big-file") + val path = Resource(this, "big-file").toPath it("should generate the correct hash") { val expected = MD5HashData.BigFile.hash - val result = MD5HashGenerator.md5File(file).unsafeRunSync + val result = MD5HashGenerator.md5File(path).unsafeRunSync assertResult(expected)(result) } } describe("read chunks of file") { - val file = Resource(this, "big-file") + val path = Resource(this, "big-file").toPath it("should generate the correct hash for first chunk of the file") { val part1 = MD5HashData.BigFile.Part1 val expected = part1.hash - val result = MD5HashGenerator.md5FileChunk(file, part1.offset, part1.size).unsafeRunSync + val result = MD5HashGenerator.md5FileChunk(path, part1.offset, part1.size).unsafeRunSync assertResult(expected)(result) } it("should generate the correcy hash for second chunk of the file") { val part2 = MD5HashData.BigFile.Part2 val expected = part2.hash - val result = MD5HashGenerator.md5FileChunk(file, part2.offset, part2.size).unsafeRunSync + val result = MD5HashGenerator.md5FileChunk(path, part2.offset, part2.size).unsafeRunSync assertResult(expected)(result) } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala new file mode 100644 index 0000000..147385c --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/PlanBuilderTest.scala @@ -0,0 +1,404 @@ +package net.kemitix.thorp.core + +import java.io.File +import java.nio.file.Path + +import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.{HashService, StorageService} +import org.scalatest.FreeSpec + +class PlanBuilderTest extends FreeSpec with TemporaryFolder { + + private val planBuilder = new PlanBuilder {} + private val emptyS3ObjectData = S3ObjectsData() + private implicit val logger: Logger = new DummyLogger + + val lastModified: LastModified = LastModified() + + "create a plan" - { + + val hashService = SimpleHashService() + + "one source" - { + "a file" - { + val filename = "aFile" + val remoteKey = RemoteKey(filename) + "with no matching remote key" - { + "with no other remote key with matching hash" - { + "upload file" in { + withDirectory(source => { + val file = createFile(source, filename, "file-content") + val hash = md5Hash(file) + + val expected = Right(List( + toUpload(remoteKey, hash, source, file) + )) + + val storageService = DummyStorageService(emptyS3ObjectData, Map( + file -> (remoteKey, hash) + )) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(source), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + } + } + "with another remote key with matching hash" - { + "copy file" in { + withDirectory(source => { + val anOtherFilename = "other" + val content = "file-content" + val aFile = createFile(source, filename, content) + val anOtherFile = createFile(source, anOtherFilename, content) + val aHash = md5Hash(aFile) + + val anOtherKey = RemoteKey("other") + + val expected = Right(List( + toCopy(anOtherKey, aHash, remoteKey) + )) + + val s3ObjectsData = S3ObjectsData( + byHash = Map(aHash -> Set(KeyModified(anOtherKey, lastModified))), + byKey = Map(anOtherKey -> HashModified(aHash, lastModified)) + ) + + val storageService = DummyStorageService(s3ObjectsData, Map( + aFile -> (remoteKey, aHash) + )) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(source), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + } + } + } + "with matching remote key" - { + "with matching hash" - { + "do nothing" in { + withDirectory(source => { + val file = createFile(source, filename, "file-content") + val hash = md5Hash(file) + + // DoNothing actions should have been filtered out of the plan + val expected = Right(List()) + + val s3ObjectsData = S3ObjectsData( + byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))), + byKey = Map(remoteKey -> HashModified(hash, lastModified)) + ) + + val storageService = DummyStorageService(s3ObjectsData, Map( + file -> (remoteKey, hash) + )) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(source), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + } + } + "with different hash" - { + "with no matching remote hash" - { + "upload file" in { + withDirectory(source => { + val file = createFile(source, filename, "file-content") + val currentHash = md5Hash(file) + val originalHash = MD5Hash("original-file-content") + + val expected = Right(List( + toUpload(remoteKey, currentHash, source, file) + )) + + val s3ObjectsData = S3ObjectsData( + byHash = Map(originalHash -> Set(KeyModified(remoteKey, lastModified))), + byKey = Map(remoteKey -> HashModified(originalHash, lastModified)) + ) + + val storageService = DummyStorageService(s3ObjectsData, Map( + file -> (remoteKey, currentHash) + )) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(source), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + } + } + "with matching remote hash" - { + "copy file" in { + withDirectory(source => { + val file = createFile(source, filename, "file-content") + val hash = md5Hash(file) + val sourceKey = RemoteKey("other-key") + + val expected = Right(List( + toCopy(sourceKey, hash, remoteKey) + )) + + val s3ObjectsData = S3ObjectsData( + byHash = Map(hash -> Set(KeyModified(sourceKey, lastModified))), + byKey = Map() + ) + + val storageService = DummyStorageService(s3ObjectsData, Map( + file -> (remoteKey, hash) + )) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(source), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + } + } + } + } + } + "a remote key" - { + val filename = "aFile" + val remoteKey = RemoteKey(filename) + "with a matching local file" - { + "do nothing" in { + withDirectory(source => { + val file = createFile(source, filename, "file-content") + val hash = md5Hash(file) + + // DoNothing actions should have been filtered out of the plan + val expected = Right(List()) + + val s3ObjectsData = S3ObjectsData( + byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))), + byKey = Map(remoteKey -> HashModified(hash, lastModified)) + ) + + val storageService = DummyStorageService(s3ObjectsData, Map( + file -> (remoteKey, hash) + )) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(source), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + } + } + "with no matching local file" - { + "delete remote key" ignore { + withDirectory(source => { + val hash = MD5Hash("file-content") + + val expected = Right(List( + toDelete(remoteKey) + )) + + val s3ObjectsData = S3ObjectsData( + byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))), + byKey = Map(remoteKey -> HashModified(hash, lastModified)) + ) + + val storageService = DummyStorageService(s3ObjectsData, Map.empty) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(source), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + } + } + } + } + + "two sources" - { + val filename1 = "file-1" + val filename2 = "file-2" + val remoteKey1 = RemoteKey(filename1) + val remoteKey2 = RemoteKey(filename2) + "unique files in both" - { + "upload all files" in { + withDirectory(firstSource => { + val fileInFirstSource = createFile(firstSource, filename1, "file-1-content") + val hash1 = md5Hash(fileInFirstSource) + + withDirectory(secondSource => { + val fileInSecondSource = createFile(secondSource, filename2, "file-2-content") + val hash2 = md5Hash(fileInSecondSource) + + val expected = Right(List( + toUpload(remoteKey2, hash2, secondSource, fileInSecondSource), + toUpload(remoteKey1, hash1, firstSource, fileInFirstSource) + )) + + val storageService = DummyStorageService(emptyS3ObjectData, Map( + fileInFirstSource -> (remoteKey1, hash1), + fileInSecondSource -> (remoteKey2, hash2))) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(firstSource), + ConfigOption.Source(secondSource), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + }) + } + } + "same filename in both" - { + "only upload file in first source" in { + withDirectory(firstSource => { + val fileInFirstSource: File = createFile(firstSource, filename1, "file-1-content") + val hash1 = md5Hash(fileInFirstSource) + + withDirectory(secondSource => { + val fileInSecondSource: File = createFile(secondSource, filename1, "file-2-content") + val hash2 = md5Hash(fileInSecondSource) + + val expected = Right(List( + toUpload(remoteKey1, hash1, firstSource, fileInFirstSource) + )) + + val storageService = DummyStorageService(emptyS3ObjectData, Map( + fileInFirstSource -> (remoteKey1, hash1), + fileInSecondSource -> (remoteKey2, hash2))) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(firstSource), + ConfigOption.Source(secondSource), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + }) + } + } + "with a remote file only present in second source" - { + "do not delete it " in { + withDirectory(firstSource => { + + withDirectory(secondSource => { + val fileInSecondSource = createFile(secondSource, filename2, "file-2-content") + val hash2 = md5Hash(fileInSecondSource) + + val expected = Right(List()) + + val s3ObjectData = S3ObjectsData( + byHash = Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))), + byKey = Map(remoteKey2 -> HashModified(hash2, lastModified))) + + val storageService = DummyStorageService(s3ObjectData, Map( + fileInSecondSource -> (remoteKey2, hash2))) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(firstSource), + ConfigOption.Source(secondSource), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + }) + } + } + "with remote file only present in first source" - { + "do not delete it" in { + withDirectory(firstSource => { + val fileInFirstSource: File = createFile(firstSource, filename1, "file-1-content") + val hash1 = md5Hash(fileInFirstSource) + + withDirectory(secondSource => { + + val expected = Right(List()) + + val s3ObjectData = S3ObjectsData( + byHash = Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))), + byKey = Map(remoteKey1 -> HashModified(hash1, lastModified))) + + val storageService = DummyStorageService(s3ObjectData, Map( + fileInFirstSource -> (remoteKey1, hash1))) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(firstSource), + ConfigOption.Source(secondSource), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + }) + } + } + "with remote file not present in either source" - { + "delete from remote" in { + withDirectory(firstSource => { + + withDirectory(secondSource => { + + val expected = Right(List( + toDelete(remoteKey1) + )) + + val s3ObjectData = S3ObjectsData( + byKey = Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified))) + + val storageService = DummyStorageService(s3ObjectData, Map()) + + val result = invoke(storageService, hashService, configOptions( + ConfigOption.Source(firstSource), + ConfigOption.Source(secondSource), + ConfigOption.Bucket("a-bucket"))) + + assertResult(expected)(result) + }) + }) + } + } + } + + def md5Hash(file: File) = { + hashService.hashLocalObject(file.toPath).unsafeRunSync()("md5") + } + + } + + private def toUpload(remoteKey: RemoteKey, + md5Hash: MD5Hash, + source: Path, + file: File): (String, String, String, String, String) = + ("upload", remoteKey.key, md5Hash.hash, source.toString, file.toString) + + private def toCopy(sourceKey: RemoteKey, + md5Hash: MD5Hash, + targetKey: RemoteKey): (String, String, String, String, String) = + ("copy", sourceKey.key, md5Hash.hash, targetKey.key, "") + + private def toDelete(remoteKey: RemoteKey): (String, String, String, String, String) = + ("delete", remoteKey.key, "", "", "") + + private def configOptions(configOptions: ConfigOption*): ConfigOptions = + ConfigOptions(List(configOptions:_*)) + + private def invoke(storageService: StorageService, + hashService: HashService, + configOptions: ConfigOptions): Either[List[String], List[(String, String, String, String, String)]] = + planBuilder.createPlan(storageService, hashService, configOptions) + .value.unsafeRunSync().map(_.actions.toList.map({ + case ToUpload(_, lf, _) => ("upload", lf.remoteKey.key, lf.hashes("md5").hash, lf.source.toString, lf.file.toString) + case ToDelete(_, remoteKey, _) => ("delete", remoteKey.key, "", "", "") + case ToCopy(_, sourceKey, hash, targetKey, _) => ("copy", sourceKey.key, hash.hash, targetKey.key, "") + case DoNothing(_, remoteKey, _) => ("do-nothing", remoteKey.key, "", "", "") + case x => ("other", x.toString, "", "", "") + })) + +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/S3MetaDataEnricherSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/S3MetaDataEnricherSuite.scala index 8cdef87..802f924 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/S3MetaDataEnricherSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/S3MetaDataEnricherSuite.scala @@ -10,9 +10,10 @@ class S3MetaDataEnricherSuite extends FunSpec { private val source = Resource(this, "upload") + private val sourcePath = source.toPath private val prefix = RemoteKey("prefix") - implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) - private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ + implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) + private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _ val lastModified = LastModified(Instant.now()) def getMatchesByKey(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Option[HashModified] = { @@ -29,7 +30,7 @@ class S3MetaDataEnricherSuite describe("#1a local exists, remote exists, remote matches, other matches - do nothing") { val theHash: MD5Hash = MD5Hash("the-file-hash") - val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val theRemoteKey: RemoteKey = theFile.remoteKey val s3: S3ObjectsData = S3ObjectsData( byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), @@ -46,7 +47,7 @@ class S3MetaDataEnricherSuite } describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") { val theHash: MD5Hash = MD5Hash("the-file-hash") - val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val theRemoteKey: RemoteKey = prefix.resolve("the-file") val s3: S3ObjectsData = S3ObjectsData( byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), @@ -63,7 +64,7 @@ class S3MetaDataEnricherSuite } describe("#2 local exists, remote is missing, remote no match, other matches - copy") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val otherRemoteKey = RemoteKey("other-key") val s3: S3ObjectsData = S3ObjectsData( byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))), @@ -80,11 +81,8 @@ class S3MetaDataEnricherSuite } describe("#3 local exists, remote is missing, remote no match, other no matches - upload") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) - val s3: S3ObjectsData = S3ObjectsData( - byHash = Map(), - byKey = Map() - ) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) + val s3: S3ObjectsData = S3ObjectsData() it("generates valid metadata") { val expected = S3MetaData(theFile, matchByHash = Set.empty, @@ -95,7 +93,7 @@ class S3MetaDataEnricherSuite } describe("#4 local exists, remote exists, remote no match, other matches - copy") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val theRemoteKey = theFile.remoteKey val oldHash = MD5Hash("old-hash") val otherRemoteKey = prefix.resolve("other-key") @@ -120,7 +118,7 @@ class S3MetaDataEnricherSuite } describe("#5 local exists, remote exists, remote no match, other no matches - upload") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), sourcePath, fileToKey) val theRemoteKey = theFile.remoteKey val oldHash = MD5Hash("old-hash") val s3: S3ObjectsData = S3ObjectsData( @@ -148,11 +146,11 @@ class S3MetaDataEnricherSuite describe("getS3Status") { val hash = MD5Hash("hash") - val localFile = LocalFile.resolve("the-file", md5HashMap(hash), source, fileToKey) + val localFile = LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey) val key = localFile.remoteKey - val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), source, fileToKey) + val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), sourcePath, fileToKey) val diffHash = MD5Hash("diff") - val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), source, fileToKey) + val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), sourcePath, fileToKey) val lastModified = LastModified(Instant.now) val s3ObjectsData: S3ObjectsData = S3ObjectsData( byHash = Map( @@ -175,7 +173,7 @@ class S3MetaDataEnricherSuite } describe("when remote key does not exist and no others matches hash") { - val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), source, fileToKey) + val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), sourcePath, fileToKey) it("should return no matches by key") { val result = getMatchesByKey(invoke(localFile)) assert(result.isEmpty) diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index e0cb20b..b2b1861 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -17,14 +17,16 @@ class SyncSuite extends FunSpec { private val source = Resource(this, "upload") + private val sourcePath = source.toPath private val prefix = RemoteKey("prefix") - private val configOptions = ConfigOptions(List( - ConfigOption.Source(source.toPath), - ConfigOption.Bucket("bucket"), - ConfigOption.Prefix("prefix"), - ConfigOption.IgnoreGlobalOptions, - ConfigOption.IgnoreUserOptions - )) + private val configOptions = + ConfigOptions(List( + ConfigOption.Source(sourcePath), + ConfigOption.Bucket("bucket"), + ConfigOption.Prefix("prefix"), + ConfigOption.IgnoreGlobalOptions, + ConfigOption.IgnoreUserOptions + )) implicit private val logger: Logger = new DummyLogger private val lastModified = LastModified(Instant.now) @@ -35,22 +37,26 @@ class SyncSuite // source contains the files root-file and subdir/leaf-file val rootRemoteKey = RemoteKey("prefix/root-file") val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") - val rootFile: LocalFile = LocalFile.resolve("root-file", md5HashMap(Root.hash), source, _ => rootRemoteKey) + val rootFile: LocalFile = + LocalFile.resolve("root-file", md5HashMap(Root.hash), sourcePath, _ => rootRemoteKey) + val leafFile: LocalFile = + LocalFile.resolve("subdir/leaf-file", md5HashMap(Leaf.hash), sourcePath, _ => leafRemoteKey) - private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] = { + private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] = Map("md5" -> md5Hash) - } - val leafFile: LocalFile = LocalFile.resolve("subdir/leaf-file", md5HashMap(Leaf.hash), source, _ => leafRemoteKey) + val hashService = + DummyHashService(Map( + file("root-file") -> Map("md5" -> MD5HashData.Root.hash), + file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash) + )) - val hashService = DummyHashService(Map( - file("root-file") -> Map("md5" -> MD5HashData.Root.hash), - file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash) - )) + private def file(filename: String) = + sourcePath.resolve(Paths.get(filename)) def invokeSubject(storageService: StorageService, - hashService: HashService, - configOptions: ConfigOptions): Either[List[String], SyncPlan] = { + hashService: HashService, + configOptions: ConfigOptions): Either[List[String], SyncPlan] = { PlanBuilder.createPlan(storageService, hashService, configOptions).value.unsafeRunSync } @@ -62,9 +68,7 @@ class SyncSuite } describe("when all files should be uploaded") { - val storageService = new RecordingStorageService(testBucket, S3ObjectsData( - byHash = Map(), - byKey = Map())) + val storageService = new RecordingStorageService(testBucket, S3ObjectsData()) it("uploads all files") { val expected = Right(Set( ToUpload(testBucket, rootFile, rootFile.file.length), @@ -74,9 +78,6 @@ class SyncSuite } } - private def file(filename: String) = - source.toPath.resolve(Paths.get(filename)).toFile - describe("when no files should be uploaded") { val s3ObjectsData = S3ObjectsData( byHash = Map( diff --git a/core/src/test/scala/net/kemitix/thorp/core/TemporaryFolder.scala b/core/src/test/scala/net/kemitix/thorp/core/TemporaryFolder.scala new file mode 100644 index 0000000..6d80f81 --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/TemporaryFolder.scala @@ -0,0 +1,43 @@ +package net.kemitix.thorp.core + +import java.io.{File, IOException, PrintWriter} +import java.nio.file.attribute.BasicFileAttributes +import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor} + +import scala.util.Try + +trait TemporaryFolder { + + def withDirectory(testCode: Path => Any): Any = { + val dir: Path = Files.createTempDirectory("thorp-temp") + val t = Try(testCode(dir)) + remove(dir) + t.get + } + + def remove(root: Path): Unit = { + Files.walkFileTree(root, new SimpleFileVisitor[Path] { + override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = { + Files.delete(file) + FileVisitResult.CONTINUE + } + override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = { + Files.delete(dir) + FileVisitResult.CONTINUE + } + }) + } + + def writeFile(directory: Path, name: String, contents: String*): Unit = { + directory.toFile.mkdirs + val pw = new PrintWriter(directory.resolve(name).toFile, "UTF-8") + contents.foreach(pw.println) + pw.close() + } + + def createFile(path: Path, name: String, content: String*): File = { + writeFile(path, name, content:_*) + path.resolve(name).toFile + } + +} \ No newline at end of file diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala index f326c90..2ec1867 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Config.scala @@ -1,10 +1,8 @@ package net.kemitix.thorp.domain -import java.io.File - final case class Config(bucket: Bucket = Bucket(""), prefix: RemoteKey = RemoteKey(""), filters: List[Filter] = List(), debug: Boolean = false, batchMode: Boolean = false, - source: File) + sources: Sources) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/LastModified.scala b/domain/src/main/scala/net/kemitix/thorp/domain/LastModified.scala index 7c954c3..b8c3942 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/LastModified.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/LastModified.scala @@ -2,4 +2,4 @@ package net.kemitix.thorp.domain import java.time.Instant -final case class LastModified(when: Instant) +final case class LastModified(when: Instant = Instant.now) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala b/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala index 1b5f012..ee8f689 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala @@ -21,9 +21,9 @@ final case class LocalFile(file: File, source: File, hashes: Map[String, MD5Hash object LocalFile { def resolve(path: String, md5Hashes: Map[String, MD5Hash], - source: File, - fileToKey: File => RemoteKey): LocalFile = { - val file = source.toPath.resolve(path).toFile - LocalFile(file, source, md5Hashes, fileToKey(file)) + source: Path, + pathToKey: Path => RemoteKey): LocalFile = { + val resolvedPath = source.resolve(path) + LocalFile(resolvedPath.toFile, source.toFile, md5Hashes, pathToKey(resolvedPath)) } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala b/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala index f75cb8c..3322f55 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/RemoteKey.scala @@ -1,24 +1,28 @@ package net.kemitix.thorp.domain import java.io.File -import java.nio.file.Paths +import java.nio.file.{Path, Paths} final case class RemoteKey(key: String) { - def asFile(source: File, prefix: RemoteKey): File = - source.toPath.resolve(relativeTo(prefix)).toFile + def asFile(source: Path, prefix: RemoteKey): Option[File] = + if (key.length == 0) None + else Some(source.resolve(relativeTo(prefix)).toFile) private def relativeTo(prefix: RemoteKey) = { prefix match { - case RemoteKey("") => Paths.get(prefix.key) + case RemoteKey("") => Paths.get(key) case _ => Paths.get(prefix.key).relativize(Paths.get(key)) } } - def isMissingLocally(source: File, prefix: RemoteKey): Boolean = - ! asFile(source, prefix).exists + def isMissingLocally(sources: Sources, prefix: RemoteKey): Boolean = + !sources.paths.exists(source => asFile(source, prefix) match { + case Some(file) => file.exists + case None => false + }) def resolve(path: String): RemoteKey = - RemoteKey(key + "/" + path) + RemoteKey(List(key, path).filterNot(_.isEmpty).mkString("/")) } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/S3ObjectsData.scala b/domain/src/main/scala/net/kemitix/thorp/domain/S3ObjectsData.scala index 88f5693..9417053 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/S3ObjectsData.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/S3ObjectsData.scala @@ -3,5 +3,5 @@ package net.kemitix.thorp.domain /** * A list of objects and their MD5 hash values. */ -final case class S3ObjectsData(byHash: Map[MD5Hash, Set[KeyModified]], - byKey: Map[RemoteKey, HashModified]) +final case class S3ObjectsData(byHash: Map[MD5Hash, Set[KeyModified]] = Map.empty, + byKey: Map[RemoteKey, HashModified] = Map.empty) diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala b/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala new file mode 100644 index 0000000..f42e332 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/thorp/domain/Sources.scala @@ -0,0 +1,26 @@ +package net.kemitix.thorp.domain + +import java.nio.file.Path + +/** + * The paths to synchronise with target. + * + * The first source path takes priority over those later in the list, + * etc. Where there is any file with the same relative path within + * more than one source, the file in the first listed path is + * uploaded, and the others are ignored. + */ +case class Sources(paths: List[Path]) { + def ++(path: Path): Sources = this ++ List(path) + def ++(otherPaths: List[Path]): Sources = Sources(paths ++ otherPaths) + + /** + * Returns the source path for the given path. + * + * @param path the path to find the matching source + * @return the source for the path + * @throws NoSuchElementException if no source matches the path + */ + def forPath(path: Path): Path = + paths.find(source => path.startsWith(source)).get +} diff --git a/domain/src/test/scala/net/kemitix/thorp/domain/RemoteKeyTest.scala b/domain/src/test/scala/net/kemitix/thorp/domain/RemoteKeyTest.scala new file mode 100644 index 0000000..4bdb099 --- /dev/null +++ b/domain/src/test/scala/net/kemitix/thorp/domain/RemoteKeyTest.scala @@ -0,0 +1,72 @@ +package net.kemitix.thorp.domain + +import java.io.File +import java.nio.file.Paths + +import org.scalatest.FreeSpec + +class RemoteKeyTest extends FreeSpec { + + private val emptyKey = RemoteKey("") + + "create a RemoteKey" - { + "can resolve a path" - { + "when key is empty" in { + val key = emptyKey + val path = "path" + val expected = RemoteKey("path") + val result = key.resolve(path) + assertResult(expected)(result) + } + "when path is empty" in { + val key = RemoteKey("key") + val path = "" + val expected = RemoteKey("key") + val result = key.resolve(path) + assertResult(expected)(result) + } + "when key and path are empty" in { + val key = emptyKey + val path = "" + val expected = emptyKey + val result = key.resolve(path) + assertResult(expected)(result) + } + } + "asFile" - { + "when key and prefix are non-empty" in { + val key = RemoteKey("prefix/key") + val source = Paths.get("source") + val prefix = RemoteKey("prefix") + val expected = Some(new File("source/key")) + val result = key.asFile(source, prefix) + assertResult(expected)(result) + } + "when prefix is empty" in { + val key = RemoteKey("key") + val source = Paths.get("source") + val prefix = emptyKey + val expected = Some(new File("source/key")) + val result = key.asFile(source, prefix) + assertResult(expected)(result) + } + "when key is empty" in { + val key = emptyKey + val source = Paths.get("source") + val prefix = RemoteKey("prefix") + val expected = None + val result = key.asFile(source, prefix) + assertResult(expected)(result) + } + "when key and prefix are empty" in { + val key = emptyKey + val source = Paths.get("source") + val prefix = emptyKey + val expected = None + val result = key.asFile(source, prefix) + assertResult(expected)(result) + } + } + } + +} diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala index e9f9595..0d1efb1 100644 --- a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala +++ b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala @@ -1,6 +1,6 @@ package net.kemitix.thorp.storage.api -import java.io.File +import java.nio.file.Path import cats.effect.IO import net.kemitix.thorp.domain.{Logger, MD5Hash} @@ -10,6 +10,7 @@ import net.kemitix.thorp.domain.{Logger, MD5Hash} */ trait HashService { - def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] + def hashLocalObject(path: Path) + (implicit l: Logger): IO[Map[String, MD5Hash]] } diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala index 77fabdc..154d12f 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala @@ -1,6 +1,6 @@ package net.kemitix.thorp.storage.aws -import java.io.File +import java.nio.file.Path import cats.effect.IO import cats.implicits._ @@ -12,11 +12,11 @@ import net.kemitix.thorp.domain.{Logger, MD5Hash} trait ETagGenerator { - def eTag(file: File)(implicit l: Logger): IO[String]= { - val partSize = calculatePartSize(file) - val parts = numParts(file.length, partSize) + def eTag(path: Path)(implicit l: Logger): IO[String]= { + val partSize = calculatePartSize(path) + val parts = numParts(path.toFile.length, partSize) partsIndex(parts) - .map(digestChunk(file, partSize)).sequence + .map(digestChunk(path, partSize)).sequence .map(concatenateDigests) .map(MD5HashGenerator.hex) .map(hash => s"$hash-$parts") @@ -28,8 +28,8 @@ trait ETagGenerator { private def concatenateDigests: List[Array[Byte]] => Array[Byte] = lab => lab.foldLeft(Array[Byte]())((acc, ab) => acc ++ ab) - private def calculatePartSize(file: File) = { - val request = new PutObjectRequest("", "", file) + private def calculatePartSize(path: Path) = { + val request = new PutObjectRequest("", "", path.toFile) val configuration = new TransferManagerConfiguration TransferManagerUtils.calculateOptimalPartSize(request, configuration) } @@ -43,11 +43,11 @@ trait ETagGenerator { def offsets(totalFileSizeBytes: Long, optimalPartSize: Long): List[Long] = Range.Long(0, totalFileSizeBytes, optimalPartSize).toList - def digestChunk(file: File, chunkSize: Long)(chunkNumber: Long)(implicit l: Logger): IO[Array[Byte]] = - hashChunk(file, chunkNumber, chunkSize).map(_.digest) + def digestChunk(path: Path, chunkSize: Long)(chunkNumber: Long)(implicit l: Logger): IO[Array[Byte]] = + hashChunk(path, chunkNumber, chunkSize).map(_.digest) - def hashChunk(file: File, chunkNumber: Long, chunkSize: Long)(implicit l: Logger): IO[MD5Hash] = - MD5HashGenerator.md5FileChunk(file, chunkNumber * chunkSize, chunkSize) + def hashChunk(path: Path, chunkNumber: Long, chunkSize: Long)(implicit l: Logger): IO[MD5Hash] = + MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize) } object ETagGenerator extends ETagGenerator diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala index 7f0dfc8..d5573f1 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala @@ -1,6 +1,6 @@ package net.kemitix.thorp.storage.aws -import java.io.File +import java.nio.file.Path import cats.effect.IO import net.kemitix.thorp.core.MD5HashGenerator @@ -12,13 +12,14 @@ trait S3HashService extends HashService { /** * Generates an MD5 Hash and an multi-part ETag * - * @param file the local file to scan + * @param path the local path to scan * @return a set of hash values */ - override def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] = + override def hashLocalObject(path: Path) + (implicit l: Logger): IO[Map[String, MD5Hash]] = for { - md5 <- MD5HashGenerator.md5File(file) - etag <- ETagGenerator.eTag(file).map(MD5Hash(_)) + md5 <- MD5HashGenerator.md5File(path) + etag <- ETagGenerator.eTag(path).map(MD5Hash(_)) } yield Map( "md5" -> md5, "etag" -> etag diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala index 4cb403e..3c72c47 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala @@ -8,6 +8,7 @@ import org.scalatest.FunSpec class ETagGeneratorTest extends FunSpec { private val bigFile = Resource(this, "big-file") + private val bigFilePath = bigFile.toPath private val configuration = new TransferManagerConfiguration private val chunkSize = 1200000 configuration.setMinimumUploadPartSize(chunkSize) @@ -35,7 +36,7 @@ class ETagGeneratorTest extends FunSpec { "8a0c1d0778ac8fcf4ca2010eba4711eb" ).zipWithIndex md5Hashes.foreach { case (hash, index) => - test(hash, ETagGenerator.hashChunk(bigFile, index, chunkSize)(logger).unsafeRunSync) + test(hash, ETagGenerator.hashChunk(bigFilePath, index, chunkSize)(logger).unsafeRunSync) } } } @@ -43,7 +44,7 @@ class ETagGeneratorTest extends FunSpec { describe("create etag for whole file") { val expected = "f14327c90ad105244c446c498bfe9a7d-2" it("should match aws etag for the file") { - val result = ETagGenerator.eTag(bigFile)(logger).unsafeRunSync + val result = ETagGenerator.eTag(bigFilePath)(logger).unsafeRunSync assertResult(expected)(result) } } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala index b5934f0..40c357f 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/S3StorageServiceSuite.scala @@ -18,8 +18,9 @@ class S3StorageServiceSuite describe("listObjectsInPrefix") { val source = Resource(this, "upload") + val sourcePath = source.toPath val prefix = RemoteKey("prefix") - implicit val config: Config = Config(Bucket("bucket"), prefix, source = source) + implicit val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) implicit val implLogger: Logger = new DummyLogger val lm = LastModified(Instant.now.truncatedTo(ChronoUnit.MILLIS)) diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index a12f3ee..c9d8b7f 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -18,19 +18,20 @@ class StorageServiceSuite with MockFactory { val source = Resource(this, "upload") + val sourcePath = source.toPath private val prefix = RemoteKey("prefix") - implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) implicit private val implLogger: Logger = new DummyLogger - private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ + private val fileToKey = KeyGenerator.generateKey(config.sources, config.prefix) _ describe("getS3Status") { val hash = MD5Hash("hash") - val localFile = LocalFile.resolve("the-file", md5HashMap(hash), source, fileToKey) + val localFile = LocalFile.resolve("the-file", md5HashMap(hash), sourcePath, fileToKey) val key = localFile.remoteKey - val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), source, fileToKey) + val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), sourcePath, fileToKey) val diffHash = MD5Hash("diff") - val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), source, fileToKey) + val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), sourcePath, fileToKey) val lastModified = LastModified(Instant.now) val s3ObjectsData: S3ObjectsData = S3ObjectsData( byHash = Map( @@ -70,7 +71,7 @@ class StorageServiceSuite } describe("when remote key does not exist and no others matches hash") { - val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), source, fileToKey) + val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), sourcePath, fileToKey) it("should return no matches by key") { val result = getMatchesByKey(invoke(localFile)) assert(result.isEmpty) @@ -113,7 +114,7 @@ class StorageServiceSuite val prefix = RemoteKey("prefix") val localFile = - LocalFile.resolve("root-file", md5HashMap(Root.hash), source, KeyGenerator.generateKey(source, prefix)) + LocalFile.resolve("root-file", md5HashMap(Root.hash), sourcePath, KeyGenerator.generateKey(config.sources, prefix)) val bucket = Bucket("a-bucket") val remoteKey = RemoteKey("prefix/root-file") val uploadEventListener = new UploadEventListener(localFile, 1, SyncTotals(), 0L) diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala index 0ebba03..2fecef9 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala @@ -16,10 +16,11 @@ class UploaderSuite with MockFactory { private val source = Resource(this, ".") + private val sourcePath = source.toPath private val prefix = RemoteKey("prefix") - implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + implicit private val config: Config = Config(Bucket("bucket"), prefix, sources = Sources(List(sourcePath))) implicit private val implLogger: Logger = new DummyLogger - private val fileToKey = generateKey(config.source, config.prefix) _ + private val fileToKey = generateKey(config.sources, config.prefix) _ val lastModified = LastModified(Instant.now()) def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] = @@ -38,7 +39,7 @@ class UploaderSuite // dies when putObject is called val returnedKey = RemoteKey("returned-key") val returnedHash = MD5Hash("returned-hash") - val bigFile = LocalFile.resolve("small-file", md5HashMap(MD5Hash("the-hash")), source, fileToKey) + val bigFile = LocalFile.resolve("small-file", md5HashMap(MD5Hash("the-hash")), sourcePath, fileToKey) val uploadEventListener = new UploadEventListener(bigFile, 1, SyncTotals(), 0L) val amazonS3 = mock[AmazonS3] val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build