diff --git a/build.sbt b/build.sbt index 6473912..3e288d3 100644 --- a/build.sbt +++ b/build.sbt @@ -72,6 +72,7 @@ lazy val core = (project in file("core")) .settings(assemblyJarName in assembly := "core.jar") .settings(testDependencies) .dependsOn(`storage-api`) + .dependsOn(domain % "compile->compile;test->test") lazy val `storage-api` = (project in file("storage-api")) .settings(commonSettings) diff --git a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala index 8fe4699..686c90f 100644 --- a/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala +++ b/cli/src/main/scala/net/kemitix/thorp/cli/Program.scala @@ -4,6 +4,7 @@ import cats.effect.{ExitCode, IO} import cats.implicits._ import net.kemitix.thorp.core._ import net.kemitix.thorp.domain.{Logger, StorageQueueEvent} +import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService trait Program { @@ -11,7 +12,7 @@ trait Program { def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = { implicit val logger: Logger = new PrintLogger() for { - actions <- Synchronise(defaultStorageService, cliOptions).valueOrF(handleErrors) + actions <- Synchronise(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors) events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions) _ <- SyncLogging.logRunFinished(events) } yield ExitCode.Success diff --git a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala index a9df59b..18b09ff 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/ActionGenerator.scala @@ -49,8 +49,13 @@ object ActionGenerator { private def copyFile(bucket: Bucket, localFile: LocalFile, - matchByHash: Set[RemoteMetaData]): Stream[Action] = - 
matchByHash.headOption.map(_.remoteKey).toStream.map {sourceKey => - ToCopy(bucket, sourceKey, localFile.hash, localFile.remoteKey)} + matchByHash: Set[RemoteMetaData]): Stream[Action] = { + val headOption = matchByHash.headOption + headOption.toStream.map { remoteMetaData => + val sourceKey = remoteMetaData.remoteKey + val hash = remoteMetaData.hash + ToCopy(bucket, sourceKey, hash, localFile.remoteKey) + } + } } diff --git a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala index 220ac97..060fed9 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/LocalFileStream.scala @@ -7,11 +7,12 @@ import cats.effect.IO import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.domain import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.HashService object LocalFileStream { def findFiles(file: File, - md5HashGenerator: File => IO[MD5Hash]) + hashService: HashService) (implicit c: Config, logger: Logger): IO[Stream[LocalFile]] = { @@ -25,11 +26,10 @@ object LocalFileStream { Stream(fs: _*) .filter(f => filters(f.toPath))) - def recurseIntoSubDirectories(file: File)(implicit c: Config): IO[Stream[LocalFile]] = + def recurseIntoSubDirectories(file: File): IO[Stream[LocalFile]] = file match { case f if f.isDirectory => loop(file) - case _ => for(hash <- md5HashGenerator(file)) - yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file))) + case _ => localFile(hashService, file) } def recurse(fs: Stream[File]): IO[Stream[LocalFile]] = @@ -48,6 +48,12 @@ object LocalFileStream { loop(file) } + private def localFile(hashService: HashService, file: File)(implicit l: Logger, c: Config) = { + for { + hash <- hashService.hashLocalObject(file) + } yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file))) + } + //TODO: Change this 
to return an Either[IllegalArgumentException, Array[File]] private def listFiles(file: File) = { Option(file.listFiles) diff --git a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala b/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala index 4816612..980dbfb 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/MD5HashGenerator.scala @@ -10,51 +10,72 @@ import scala.collection.immutable.NumericRange object MD5HashGenerator { - def md5File(file: File) - (implicit logger: Logger): IO[MD5Hash] = { + val maxBufferSize = 8048 + val defaultBuffer = new Array[Byte](maxBufferSize) - val maxBufferSize = 8048 - val defaultBuffer = new Array[Byte](maxBufferSize) - def openFile = IO.pure(new FileInputStream(file)) - def closeFile = {fis: FileInputStream => IO(fis.close())} + def hex(in: Array[Byte]): String = { + val md5 = MessageDigest getInstance "MD5" + md5 update in + (md5.digest map ("%02x" format _)).mkString + } - def nextChunkSize(currentOffset: Long) = { - // a value between 1 and maxBufferSize - val toRead = file.length - currentOffset - val result = Math.min(maxBufferSize, toRead) - result.toInt + def digest(in: String): Array[Byte] = { + val md5 = MessageDigest getInstance "MD5" + md5 update in.getBytes + md5.digest + } + + def md5File(file: File)(implicit logger: Logger): IO[MD5Hash] = + md5FileChunk(file, 0, file.length) + + private def openFile(file: File, offset: Long) = IO { + val stream = new FileInputStream(file) + stream skip offset + stream + } + + private def closeFile(fis: FileInputStream) = IO(fis.close()) + + private def readFile(file: File, offset: Long, endOffset: Long) = + for { + fis <- openFile(file, offset) + digest <- digestFile(fis, offset, endOffset) + _ <- closeFile(fis) + } yield digest + + private def digestFile(fis: FileInputStream, offset: Long, endOffset: Long) = + IO { + val md5 = MessageDigest getInstance "MD5" + 
NumericRange(offset, endOffset, maxBufferSize) + .foreach(currentOffset => md5 update readToBuffer(fis, currentOffset, endOffset)) + md5.digest } - def readToBuffer(fis: FileInputStream, - currentOffset: Long) = { - val buffer = - if (nextChunkSize(currentOffset) < maxBufferSize) new Array[Byte](nextChunkSize(currentOffset)) - else defaultBuffer - fis read buffer - buffer - } + private def readToBuffer(fis: FileInputStream, + currentOffset: Long, + endOffset: Long) = { + val buffer = + if (nextBufferSize(currentOffset, endOffset) < maxBufferSize) + new Array[Byte](nextBufferSize(currentOffset, endOffset)) + else defaultBuffer + fis read buffer + buffer + } - def digestFile(fis: FileInputStream) = - IO { - val md5 = MessageDigest getInstance "MD5" - NumericRange(0, file.length, maxBufferSize) - .foreach { currentOffset => { - val buffer = readToBuffer(fis, currentOffset) - md5 update buffer - }} - md5.digest - } - - def readFile = - for { - fis <- openFile - digest <- digestFile(fis) - _ <- closeFile(fis) - } yield digest + private def nextBufferSize(currentOffset: Long, endOffset: Long) = { + val toRead = endOffset - currentOffset + val result = Math.min(maxBufferSize, toRead) + result.toInt + } + def md5FileChunk(file: File, + offset: Long, + size: Long) + (implicit logger: Logger): IO[MD5Hash] = { + val endOffset = Math.min(offset + size, file.length) for { _ <- logger.debug(s"md5:reading:size ${file.length}:$file") - digest <- readFile + digest <- readFile(file, offset, endOffset) hash = MD5Hash.fromDigest(digest) _ <- logger.debug(s"md5:generated:${hash.hash}:$file") } yield hash diff --git a/core/src/main/scala/net/kemitix/thorp/core/S3MetaDataEnricher.scala b/core/src/main/scala/net/kemitix/thorp/core/S3MetaDataEnricher.scala index fc220e8..1d5fe89 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/S3MetaDataEnricher.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/S3MetaDataEnricher.scala @@ -10,13 +10,17 @@ object S3MetaDataEnricher { val 
(keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData) S3MetaData(localFile, matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) }, - matchByHash = hashMatches map { km => RemoteMetaData(km.key, localFile.hash, km.modified) }) + matchByHash = hashMatches map { case (hash, km) => RemoteMetaData(km.key, hash, km.modified) }) } def getS3Status(localFile: LocalFile, - s3ObjectsData: S3ObjectsData): (Option[HashModified], Set[KeyModified]) = { + s3ObjectsData: S3ObjectsData): (Option[HashModified], Set[(MD5Hash, KeyModified)]) = { val matchingByKey = s3ObjectsData.byKey.get(localFile.remoteKey) - val matchingByHash = s3ObjectsData.byHash.getOrElse(localFile.hash, Set()) + val matchingByHash = localFile.hashes + .map { case(_, md5Hash) => + s3ObjectsData.byHash.getOrElse(md5Hash, Set()) + .map(km => (md5Hash, km)) + }.flatten.toSet (matchingByKey, matchingByHash) } diff --git a/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala b/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala index a4ff672..8e8d494 100644 --- a/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala +++ b/core/src/main/scala/net/kemitix/thorp/core/Synchronise.scala @@ -1,21 +1,21 @@ package net.kemitix.thorp.core -import cats.data.NonEmptyChain -import cats.data.EitherT +import cats.data.{EitherT, NonEmptyChain} import cats.effect.IO import cats.implicits._ import net.kemitix.thorp.core.Action.DoNothing -import net.kemitix.thorp.domain.{Config, LocalFile, Logger, RemoteKey, S3ObjectsData} -import net.kemitix.thorp.storage.api.StorageService +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.{HashService, StorageService} trait Synchronise { def apply(storageService: StorageService, + hashService: HashService, configOptions: Seq[ConfigOption]) - (implicit logger: Logger): EitherT[IO, List[String], Stream[Action]] = + (implicit l: Logger): EitherT[IO, List[String], Stream[Action]] = 
EitherT(ConfigurationBuilder.buildConfig(configOptions)) .swap.map(errorMessages).swap - .flatMap(config => useValidConfig(storageService, config)) + .flatMap(config => useValidConfig(storageService, hashService)(config, l)) def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] = errors.map(cv => cv.errorMessage).toList @@ -26,45 +26,50 @@ trait Synchronise { } def useValidConfig(storageService: StorageService, - config: Config) - (implicit logger: Logger): EitherT[IO, List[String], Stream[Action]] = { + hashService: HashService) + (implicit c: Config, l: Logger): EitherT[IO, List[String], Stream[Action]] = { for { - _ <- EitherT.liftF(SyncLogging.logRunStart(config.bucket, config.prefix, config.source)) - actions <- gatherMetadata(storageService, logger, config) + _ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.source)) + actions <- gatherMetadata(storageService, hashService) .swap.map(error => List(error)).swap .map { case (remoteData, localData) => - (actionsForLocalFiles(config, localData, remoteData) ++ - actionsForRemoteKeys(config, remoteData)) + (actionsForLocalFiles(localData, remoteData) ++ + actionsForRemoteKeys(remoteData)) .filter(removeDoNothing) } } yield actions } private def gatherMetadata(storageService: StorageService, - logger: Logger, - config: Config): EitherT[IO, String, (S3ObjectsData, Stream[LocalFile])] = + hashService: HashService) + (implicit l: Logger, + c: Config): EitherT[IO, String, (S3ObjectsData, Stream[LocalFile])] = for { - remoteData <- fetchRemoteData(storageService, config) - localData <- EitherT.liftF(findLocalFiles(config, logger)) + remoteData <- fetchRemoteData(storageService) + localData <- EitherT.liftF(findLocalFiles(hashService)) } yield (remoteData, localData) - private def actionsForLocalFiles(config: Config, localData: Stream[LocalFile], remoteData: S3ObjectsData) = - localData.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(config, lf, remoteData) ++ acc) + 
private def actionsForLocalFiles(localData: Stream[LocalFile], remoteData: S3ObjectsData) + (implicit c: Config) = + localData.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData) ++ acc) - private def actionsForRemoteKeys(config: Config, remoteData: S3ObjectsData) = - remoteData.byKey.keys.foldLeft(Stream[Action]())((acc, rk) => createActionFromRemoteKey(config, rk) #:: acc) + private def actionsForRemoteKeys(remoteData: S3ObjectsData) + (implicit c: Config) = + remoteData.byKey.keys.foldLeft(Stream[Action]())((acc, rk) => createActionFromRemoteKey(rk) #:: acc) - private def fetchRemoteData(storageService: StorageService, config: Config) = - storageService.listObjects(config.bucket, config.prefix) + private def fetchRemoteData(storageService: StorageService)(implicit c: Config) = + storageService.listObjects(c.bucket, c.prefix) - private def findLocalFiles(implicit config: Config, l: Logger) = - LocalFileStream.findFiles(config.source, MD5HashGenerator.md5File(_)) + private def findLocalFiles(hashService: HashService)(implicit config: Config, l: Logger) = + LocalFileStream.findFiles(config.source, hashService) - private def createActionFromLocalFile(c: Config, lf: LocalFile, remoteData: S3ObjectsData) = - ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData)(c))(c) + private def createActionFromLocalFile(lf: LocalFile, remoteData: S3ObjectsData) + (implicit c: Config) = + ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData)) - private def createActionFromRemoteKey(c: Config, rk: RemoteKey) = + private def createActionFromRemoteKey(rk: RemoteKey) + (implicit c: Config) = if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk) else DoNothing(c.bucket, rk) diff --git a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala index 199ad99..76a3b62 100644 --- 
a/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/ActionGeneratorSuite.scala @@ -22,7 +22,7 @@ class ActionGeneratorSuite describe("#1 local exists, remote exists, remote matches - do nothing") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified) val input = S3MetaData(theFile, // local exists matchByHash = Set(theRemoteMetadata), // remote matches @@ -36,7 +36,7 @@ class ActionGeneratorSuite } describe("#2 local exists, remote is missing, other matches - copy") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theRemoteKey = theFile.remoteKey val otherRemoteKey = prefix.resolve("other-key") val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified) @@ -51,7 +51,7 @@ class ActionGeneratorSuite } describe("#3 local exists, remote is missing, other no matches - upload") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val input = S3MetaData(theFile, // local exists matchByHash = Set.empty, // other no matches matchByKey = None) // remote is missing @@ -63,7 +63,7 @@ class ActionGeneratorSuite } describe("#4 local exists, remote exists, remote no match, other matches - copy") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theRemoteKey = theFile.remoteKey val oldHash = MD5Hash("old-hash") val otherRemoteKey = 
prefix.resolve("other-key") @@ -82,7 +82,7 @@ class ActionGeneratorSuite } describe("#5 local exists, remote exists, remote no match, other no matches - upload") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theRemoteKey = theFile.remoteKey val oldHash = MD5Hash("old-hash") val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified) @@ -102,4 +102,8 @@ class ActionGeneratorSuite } } } + + private def md5HashMap(theHash: MD5Hash) = { + Map("md5" -> theHash) + } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala b/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala new file mode 100644 index 0000000..facf6be --- /dev/null +++ b/core/src/test/scala/net/kemitix/thorp/core/DummyHashService.scala @@ -0,0 +1,11 @@ +package net.kemitix.thorp.core + +import java.io.File + +import cats.effect.IO +import net.kemitix.thorp.domain.{Logger, MD5Hash} +import net.kemitix.thorp.storage.api.HashService + +case class DummyHashService(hashes: Map[File, Map[String, MD5Hash]]) extends HashService { + override def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] = IO.pure(hashes(file)) +} diff --git a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala index f5485b7..f9fea81 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/LocalFileStreamSuite.scala @@ -1,22 +1,29 @@ package net.kemitix.thorp.core -import java.io.File +import java.nio.file.Paths -import cats.effect.IO -import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5Hash} +import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5HashData} +import net.kemitix.thorp.storage.api.HashService import 
org.scalatest.FunSpec class LocalFileStreamSuite extends FunSpec { - val uploadResource = Resource(this, "upload") - implicit val config: Config = Config(source = uploadResource) + private val uploadResource = Resource(this, "upload") + private val hashService: HashService = DummyHashService(Map( + file("root-file") -> Map("md5" -> MD5HashData.Root.hash), + file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash) + )) + + private def file(filename: String) = + uploadResource.toPath.resolve(Paths.get(filename)).toFile + + implicit private val config: Config = Config(source = uploadResource) implicit private val logger: Logger = new DummyLogger - val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file) describe("findFiles") { it("should find all files") { val result: Set[String] = - LocalFileStream.findFiles(uploadResource, md5HashGenerator).unsafeRunSync.toSet + LocalFileStream.findFiles(uploadResource, hashService).unsafeRunSync.toSet .map { x: LocalFile => x.relative.toString } assertResult(Set("subdir/leaf-file", "root-file"))(result) } diff --git a/core/src/test/scala/net/kemitix/thorp/core/MD5HashData.scala b/core/src/test/scala/net/kemitix/thorp/core/MD5HashData.scala deleted file mode 100644 index 1316fe0..0000000 --- a/core/src/test/scala/net/kemitix/thorp/core/MD5HashData.scala +++ /dev/null @@ -1,11 +0,0 @@ -package net.kemitix.thorp.core - -import net.kemitix.thorp.domain.MD5Hash - -object MD5HashData { - - val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e", Some("o6asEaDrV3uBs7tclcyKbg==")) - - val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542", Some("IIOGplC97GHPzXvY3La1Qg==")) - -} diff --git a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala b/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala index 9ff4c35..2d845c6 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala +++ 
b/core/src/test/scala/net/kemitix/thorp/core/MD5HashGeneratorTest.scala @@ -1,6 +1,6 @@ package net.kemitix.thorp.core -import net.kemitix.thorp.core.MD5HashData.rootHash +import net.kemitix.thorp.domain.MD5HashData.Root import net.kemitix.thorp.domain._ import org.scalatest.FunSpec @@ -11,20 +11,35 @@ class MD5HashGeneratorTest extends FunSpec { implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) implicit private val logger: Logger = new DummyLogger - describe("read a small file (smaller than buffer)") { - val file = Resource(this, "upload/root-file") - it("should generate the correct hash") { - val result = MD5HashGenerator.md5File(file).unsafeRunSync - assertResult(rootHash)(result) - } + describe("read a small file (smaller than buffer)") { + val file = Resource(this, "upload/root-file") + it("should generate the correct hash") { + val result = MD5HashGenerator.md5File(file).unsafeRunSync + assertResult(Root.hash)(result) } - describe("read a large file (bigger than buffer)") { - val file = Resource(this, "big-file") - it("should generate the correct hash") { - val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8", Some("sasfdoATjm23MJIAWE412A==")) - val result = MD5HashGenerator.md5File(file).unsafeRunSync - assertResult(expected)(result) - } + } + describe("read a large file (bigger than buffer)") { + val file = Resource(this, "big-file") + it("should generate the correct hash") { + val expected = MD5HashData.BigFile.hash + val result = MD5HashGenerator.md5File(file).unsafeRunSync + assertResult(expected)(result) } + } + describe("read chunks of file") { + val file = Resource(this, "big-file") + it("should generate the correct hash for first chunk of the file") { + val part1 = MD5HashData.BigFile.Part1 + val expected = part1.hash + val result = MD5HashGenerator.md5FileChunk(file, part1.offset, part1.size).unsafeRunSync + assertResult(expected)(result) + } + it("should generate the correct hash for second chunk of the 
file") { + val part2 = MD5HashData.BigFile.Part2 + val expected = part2.hash + val result = MD5HashGenerator.md5FileChunk(file, part2.offset, part2.size).unsafeRunSync + assertResult(expected)(result) + } + } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/S3MetaDataEnricherSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/S3MetaDataEnricherSuite.scala index c07de3e..8cdef87 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/S3MetaDataEnricherSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/S3MetaDataEnricherSuite.scala @@ -15,11 +15,21 @@ class S3MetaDataEnricherSuite private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ val lastModified = LastModified(Instant.now()) + def getMatchesByKey(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Option[HashModified] = { + val (byKey, _) = status + byKey + } + + def getMatchesByHash(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Set[(MD5Hash, KeyModified)] = { + val (_, byHash) = status + byHash + } + describe("enrich with metadata") { describe("#1a local exists, remote exists, remote matches, other matches - do nothing") { val theHash: MD5Hash = MD5Hash("the-file-hash") - val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theRemoteKey: RemoteKey = theFile.remoteKey val s3: S3ObjectsData = S3ObjectsData( byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), @@ -36,7 +46,7 @@ class S3MetaDataEnricherSuite } describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") { val theHash: MD5Hash = MD5Hash("the-file-hash") - val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theRemoteKey: RemoteKey = prefix.resolve("the-file") val s3: 
S3ObjectsData = S3ObjectsData( byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), @@ -53,7 +63,7 @@ class S3MetaDataEnricherSuite } describe("#2 local exists, remote is missing, remote no match, other matches - copy") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val otherRemoteKey = RemoteKey("other-key") val s3: S3ObjectsData = S3ObjectsData( byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))), @@ -70,7 +80,7 @@ class S3MetaDataEnricherSuite } describe("#3 local exists, remote is missing, remote no match, other no matches - upload") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val s3: S3ObjectsData = S3ObjectsData( byHash = Map(), byKey = Map() @@ -85,7 +95,7 @@ class S3MetaDataEnricherSuite } describe("#4 local exists, remote exists, remote no match, other matches - copy") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theRemoteKey = theFile.remoteKey val oldHash = MD5Hash("old-hash") val otherRemoteKey = prefix.resolve("other-key") @@ -110,7 +120,7 @@ class S3MetaDataEnricherSuite } describe("#5 local exists, remote exists, remote no match, other no matches - upload") { val theHash = MD5Hash("the-hash") - val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) + val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey) val theRemoteKey = theFile.remoteKey val oldHash = MD5Hash("old-hash") val s3: S3ObjectsData = S3ObjectsData( @@ -132,13 +142,17 @@ class S3MetaDataEnricherSuite } } + private def md5HashMap(theHash: MD5Hash) = { + 
Map("md5" -> theHash) + } + describe("getS3Status") { val hash = MD5Hash("hash") - val localFile = LocalFile.resolve("the-file", hash, source, fileToKey) + val localFile = LocalFile.resolve("the-file", md5HashMap(hash), source, fileToKey) val key = localFile.remoteKey - val keyOtherKey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey) + val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), source, fileToKey) val diffHash = MD5Hash("diff") - val keyDiffHash = LocalFile.resolve("other-key-diff-hash", diffHash, source, fileToKey) + val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), source, fileToKey) val lastModified = LastModified(Instant.now) val s3ObjectsData: S3ObjectsData = S3ObjectsData( byHash = Map( @@ -154,32 +168,32 @@ class S3MetaDataEnricherSuite } describe("when remote key exists") { - it("should return (Some, Set.nonEmpty)") { - assertResult( - (Some(HashModified(hash, lastModified)), - Set( - KeyModified(key, lastModified), - KeyModified(keyOtherKey.remoteKey, lastModified))) - )(invoke(localFile)) + it("should return a result for matching key") { + val result = getMatchesByKey(invoke(localFile)) + assert(result.contains(HashModified(hash, lastModified))) } } describe("when remote key does not exist and no others matches hash") { - it("should return (None, Set.empty)") { - val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey) - assertResult( - (None, - Set.empty) - )(invoke(localFile)) + val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), source, fileToKey) + it("should return no matches by key") { + val result = getMatchesByKey(invoke(localFile)) + assert(result.isEmpty) + } + it("should return no matches by hash") { + val result = getMatchesByHash(invoke(localFile)) + assert(result.isEmpty) } } describe("when remote key exists and no others match hash") { - it("should return (None, Set.nonEmpty)") { - assertResult( - 
(Some(HashModified(diffHash, lastModified)), - Set(KeyModified(keyDiffHash.remoteKey, lastModified))) - )(invoke(keyDiffHash)) + it("should return match by key") { + val result = getMatchesByKey(invoke(keyDiffHash)) + assert(result.contains(HashModified(diffHash, lastModified))) + } + it("should return only itself in match by hash") { + val result = getMatchesByHash(invoke(keyDiffHash)) + assert(result.equals(Set((diffHash, KeyModified(keyDiffHash.remoteKey,lastModified))))) } } diff --git a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala index 073f81b..b542cbb 100644 --- a/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala +++ b/core/src/test/scala/net/kemitix/thorp/core/SyncSuite.scala @@ -1,15 +1,16 @@ package net.kemitix.thorp.core import java.io.File +import java.nio.file.Paths import java.time.Instant import cats.data.EitherT import cats.effect.IO import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} -import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash} -import net.kemitix.thorp.domain._ +import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root} import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent} -import net.kemitix.thorp.storage.api.StorageService +import net.kemitix.thorp.domain._ +import net.kemitix.thorp.storage.api.{HashService, StorageService} import org.scalatest.FunSpec class SyncSuite @@ -34,12 +35,23 @@ class SyncSuite // source contains the files root-file and subdir/leaf-file val rootRemoteKey = RemoteKey("prefix/root-file") val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") - val rootFile: LocalFile = LocalFile.resolve("root-file", rootHash, source, _ => rootRemoteKey) - val leafFile: LocalFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, _ => leafRemoteKey) + val rootFile: LocalFile = LocalFile.resolve("root-file", md5HashMap(Root.hash), source, _ => rootRemoteKey) + + private 
def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] = { + Map("md5" -> md5Hash) + } + + val leafFile: LocalFile = LocalFile.resolve("subdir/leaf-file", md5HashMap(Leaf.hash), source, _ => leafRemoteKey) + + val hashService = DummyHashService(Map( + file("root-file") -> Map("md5" -> MD5HashData.Root.hash), + file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash) + )) def invokeSubject(storageService: StorageService, + hashService: HashService, configOptions: List[ConfigOption]): Either[List[String], Stream[Action]] = { - Synchronise(storageService, configOptions).value.unsafeRunSync + Synchronise(storageService, hashService, configOptions).value.unsafeRunSync } describe("when all files should be uploaded") { @@ -50,22 +62,26 @@ class SyncSuite val expected = Right(Set( ToUpload(testBucket, rootFile), ToUpload(testBucket, leafFile))) - val result = invokeSubject(storageService, configOptions) + val result = invokeSubject(storageService, hashService, configOptions) assertResult(expected)(result.map(_.toSet)) } } + + private def file(filename: String) = + source.toPath.resolve(Paths.get(filename)).toFile + describe("when no files should be uploaded") { val s3ObjectsData = S3ObjectsData( byHash = Map( - rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), - leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), + Root.hash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), + Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), byKey = Map( - RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) + RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified))) val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("no actions") { val expected = Stream() - 
val result = invokeSubject(storageService, configOptions) + val result = invokeSubject(storageService, hashService, configOptions) assert(result.isRight) assertResult(expected)(result.right.get) } @@ -76,18 +92,18 @@ class SyncSuite // 'root-file-old' should be renamed as 'root-file' val s3ObjectsData = S3ObjectsData( byHash = Map( - rootHash -> Set(KeyModified(sourceKey, lastModified)), - leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), + Root.hash -> Set(KeyModified(sourceKey, lastModified)), + Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), byKey = Map( - sourceKey -> HashModified(rootHash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) + sourceKey -> HashModified(Root.hash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified))) val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("copies the file and deletes the original") { val expected = Stream( - ToCopy(testBucket, sourceKey, rootHash, targetKey), + ToCopy(testBucket, sourceKey, Root.hash, targetKey), ToDelete(testBucket, sourceKey) ) - val result = invokeSubject(storageService, configOptions) + val result = invokeSubject(storageService, hashService, configOptions) assert(result.isRight) assertResult(expected)(result.right.get) } @@ -102,19 +118,19 @@ class SyncSuite val deletedKey = RemoteKey("prefix/deleted-file") val s3ObjectsData = S3ObjectsData( byHash = Map( - rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), - leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)), + Root.hash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), + Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)), deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))), byKey = Map( - RemoteKey("prefix/root-file") -> 
HashModified(rootHash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified), + RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified), deletedKey -> HashModified(deletedHash, lastModified))) val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("deleted key") { val expected = Stream( ToDelete(testBucket, deletedKey) ) - val result = invokeSubject(storageService, configOptions) + val result = invokeSubject(storageService,hashService, configOptions) assert(result.isRight) assertResult(expected)(result.right.get) } @@ -122,15 +138,15 @@ class SyncSuite describe("when a file is excluded") { val s3ObjectsData = S3ObjectsData( byHash = Map( - rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), - leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), + Root.hash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), + Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), byKey = Map( - RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), - RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) + RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified))) val storageService = new RecordingStorageService(testBucket, s3ObjectsData) it("is not uploaded") { val expected = Stream() - val result = invokeSubject(storageService, ConfigOption.Exclude("leaf") :: configOptions) + val result = invokeSubject(storageService, hashService, ConfigOption.Exclude("leaf") :: configOptions) assert(result.isRight) assertResult(expected)(result.right.get) } @@ -147,20 +163,17 @@ class SyncSuite override def upload(localFile: LocalFile, bucket: Bucket, uploadEventListener: UploadEventListener, - tryCount: Int): 
IO[UploadQueueEvent] = { - IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hash)) - } + tryCount: Int): IO[UploadQueueEvent] = + IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hashes("md5"))) override def copy(bucket: Bucket, sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey): IO[CopyQueueEvent] = { + hashes: MD5Hash, + targetKey: RemoteKey): IO[CopyQueueEvent] = IO.pure(CopyQueueEvent(targetKey)) - } override def delete(bucket: Bucket, - remoteKey: RemoteKey): IO[DeleteQueueEvent] = { + remoteKey: RemoteKey): IO[DeleteQueueEvent] = IO.pure(DeleteQueueEvent(remoteKey)) - } } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/HexEncoder.scala b/domain/src/main/scala/net/kemitix/thorp/domain/HexEncoder.scala new file mode 100644 index 0000000..17a8495 --- /dev/null +++ b/domain/src/main/scala/net/kemitix/thorp/domain/HexEncoder.scala @@ -0,0 +1,23 @@ +package net.kemitix.thorp.domain + +import java.math.BigInteger + +trait HexEncoder { + + def encode(bytes: Array[Byte]): String = { + val bigInteger = new BigInteger(1, bytes) + String.format("%0" + (bytes.length << 1) + "x", bigInteger) + } + + def decode(hexString: String): Array[Byte] = { + val byteArray = new BigInteger(hexString, 16).toByteArray + if (byteArray(0) == 0) { + val output = new Array[Byte](byteArray.length - 1) + System.arraycopy(byteArray, 1, output, 0, output.length) + output + } else byteArray + } + +} + +object HexEncoder extends HexEncoder diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala b/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala index eae8bb7..1b5f012 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/LocalFile.scala @@ -3,7 +3,7 @@ package net.kemitix.thorp.domain import java.io.File import java.nio.file.Path -final case class LocalFile(file: File, source: File, hash: MD5Hash, remoteKey: RemoteKey) { +final case class 
LocalFile(file: File, source: File, hashes: Map[String, MD5Hash], remoteKey: RemoteKey) { require(!file.isDirectory, s"LocalFile must not be a directory: $file") @@ -12,16 +12,18 @@ final case class LocalFile(file: File, source: File, hash: MD5Hash, remoteKey: R // the path of the file within the source def relative: Path = source.toPath.relativize(file.toPath) - def matches(other: MD5Hash): Boolean = hash.hash == other.hash + def matches(other: MD5Hash): Boolean = hashes.values.exists(other equals _) + + def md5base64: Option[String] = hashes.get("md5").map(_.hash64) } object LocalFile { def resolve(path: String, - md5Hash: MD5Hash, + md5Hashes: Map[String, MD5Hash], source: File, fileToKey: File => RemoteKey): LocalFile = { val file = source.toPath.resolve(path).toFile - LocalFile(file, source, md5Hash, fileToKey(file)) + LocalFile(file, source, md5Hashes, fileToKey(file)) } } diff --git a/domain/src/main/scala/net/kemitix/thorp/domain/MD5Hash.scala b/domain/src/main/scala/net/kemitix/thorp/domain/MD5Hash.scala index 297decf..0cc6823 100644 --- a/domain/src/main/scala/net/kemitix/thorp/domain/MD5Hash.scala +++ b/domain/src/main/scala/net/kemitix/thorp/domain/MD5Hash.scala @@ -4,13 +4,18 @@ import java.util.Base64 import net.kemitix.thorp.domain.QuoteStripper.stripQuotes -final case class MD5Hash(in: String, hash64: Option[String] = None) { +final case class MD5Hash(in: String) { lazy val hash: String = in filter stripQuotes + lazy val digest: Array[Byte] = HexEncoder.decode(hash) + + lazy val hash64: String = Base64.getEncoder.encodeToString(digest) } object MD5Hash { - def fromDigest(digest: Array[Byte]): MD5Hash = - MD5Hash((digest map ("%02x" format _)).mkString, Some(Base64.getEncoder.encodeToString(digest))) + def fromDigest(digest: Array[Byte]): MD5Hash = { + val hexDigest = (digest map ("%02x" format _)).mkString + MD5Hash(hexDigest) + } } diff --git a/domain/src/test/scala/net/kemitix/thorp/domain/MD5HashData.scala 
b/domain/src/test/scala/net/kemitix/thorp/domain/MD5HashData.scala new file mode 100644 index 0000000..99a7b47 --- /dev/null +++ b/domain/src/test/scala/net/kemitix/thorp/domain/MD5HashData.scala @@ -0,0 +1,28 @@ +package net.kemitix.thorp.domain + +object MD5HashData { + + object Root { + val hash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e") + val base64 = "o6asEaDrV3uBs7tclcyKbg==" + } + + object Leaf { + val hash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542") + val base64 = "IIOGplC97GHPzXvY3La1Qg==" + } + object BigFile { + val hash = MD5Hash("b1ab1f7680138e6db7309200584e35d8") + object Part1 { + val offset = 0 + val size = 1048576 + val hash = MD5Hash("39d4a9c78b9cfddf6d241a201a4ab726") + } + object Part2 { + val offset = 1048576 + val size = 1048576 + val hash = MD5Hash("af5876f3a3bc6e66f4ae96bb93d8dae0") + } + } + +} diff --git a/domain/src/test/scala/net/kemitix/thorp/domain/MD5HashTest.scala b/domain/src/test/scala/net/kemitix/thorp/domain/MD5HashTest.scala new file mode 100644 index 0000000..25c1510 --- /dev/null +++ b/domain/src/test/scala/net/kemitix/thorp/domain/MD5HashTest.scala @@ -0,0 +1,17 @@ +package net.kemitix.thorp.domain + +import org.scalatest.FunSpec + +class MD5HashTest extends FunSpec { + + describe("recover base64 hash") { + it("should recover base 64 #1") { + val rootHash = MD5HashData.Root.hash + assertResult(MD5HashData.Root.base64)(rootHash.hash64) + } + it("should recover base 64 #2") { + val leafHash = MD5HashData.Leaf.hash + assertResult(MD5HashData.Leaf.base64)(leafHash.hash64) + } + } +} diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..c03f03a --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.3.2") diff --git a/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala new file mode 100644 index 0000000..e9f9595 --- /dev/null +++ 
b/storage-api/src/main/scala/net/kemitix/thorp/storage/api/HashService.scala @@ -0,0 +1,15 @@ +package net.kemitix.thorp.storage.api + +import java.io.File + +import cats.effect.IO +import net.kemitix.thorp.domain.{Logger, MD5Hash} + +/** + * Creates one, or more, hashes for local objects. + */ +trait HashService { + + def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] + +} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala new file mode 100644 index 0000000..77fabdc --- /dev/null +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/ETagGenerator.scala @@ -0,0 +1,53 @@ +package net.kemitix.thorp.storage.aws + +import java.io.File + +import cats.effect.IO +import cats.implicits._ +import com.amazonaws.services.s3.model.PutObjectRequest +import com.amazonaws.services.s3.transfer.TransferManagerConfiguration +import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils +import net.kemitix.thorp.core.MD5HashGenerator +import net.kemitix.thorp.domain.{Logger, MD5Hash} + +trait ETagGenerator { + + def eTag(file: File)(implicit l: Logger): IO[String]= { + val partSize = calculatePartSize(file) + val parts = numParts(file.length, partSize) + partsIndex(parts) + .map(digestChunk(file, partSize)).sequence + .map(concatenateDigests) + .map(MD5HashGenerator.hex) + .map(hash => s"$hash-$parts") + } + + private def partsIndex(parts: Long) = + Range.Long(0, parts, 1).toList + + private def concatenateDigests: List[Array[Byte]] => Array[Byte] = + lab => lab.foldLeft(Array[Byte]())((acc, ab) => acc ++ ab) + + private def calculatePartSize(file: File) = { + val request = new PutObjectRequest("", "", file) + val configuration = new TransferManagerConfiguration + TransferManagerUtils.calculateOptimalPartSize(request, configuration) + } + + private def numParts(fileLength: Long, optimumPartSize: Long) = { + val fullParts = 
Math.floorDiv(fileLength, optimumPartSize) + val incompletePart = if (Math.floorMod(fileLength, optimumPartSize) > 0) 1 else 0 + fullParts + incompletePart + } + + def offsets(totalFileSizeBytes: Long, optimalPartSize: Long): List[Long] = + Range.Long(0, totalFileSizeBytes, optimalPartSize).toList + + def digestChunk(file: File, chunkSize: Long)(chunkNumber: Long)(implicit l: Logger): IO[Array[Byte]] = + hashChunk(file, chunkNumber, chunkSize).map(_.digest) + + def hashChunk(file: File, chunkNumber: Long, chunkSize: Long)(implicit l: Logger): IO[MD5Hash] = + MD5HashGenerator.md5FileChunk(file, chunkNumber * chunkSize, chunkSize) +} + +object ETagGenerator extends ETagGenerator diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala index ec804cf..3556079 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3ClientObjectLister.scala @@ -1,8 +1,7 @@ package net.kemitix.thorp.storage.aws -import cats.effect.IO import cats.data.EitherT -import cats.implicits._ +import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.thorp.domain @@ -11,7 +10,7 @@ import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey import scala.collection.JavaConverters._ -import scala.util.{Success, Try} +import scala.util.Try class S3ClientObjectLister(amazonS3: AmazonS3) { diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala new file mode 100644 index 0000000..7f0dfc8 --- /dev/null +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/S3HashService.scala @@ -0,0 +1,31 @@ 
+package net.kemitix.thorp.storage.aws + +import java.io.File + +import cats.effect.IO +import net.kemitix.thorp.core.MD5HashGenerator +import net.kemitix.thorp.domain.{Logger, MD5Hash} +import net.kemitix.thorp.storage.api.HashService + +trait S3HashService extends HashService { + + /** + * Generates an MD5 Hash and an multi-part ETag + * + * @param file the local file to scan + * @return a set of hash values + */ + override def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] = + for { + md5 <- MD5HashGenerator.md5File(file) + etag <- ETagGenerator.eTag(file).map(MD5Hash(_)) + } yield Map( + "md5" -> md5, + "etag" -> etag + ) + +} + +object S3HashService extends S3HashService { + lazy val defaultHashService: HashService = S3HashService +} diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index 3d63735..c81aaf2 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -40,7 +40,7 @@ class Uploader(transferManager: => AmazonTransferManager) { private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = { val metadata = new ObjectMetadata() - localFile.hash.hash64.foreach(metadata.setContentMD5) + localFile.md5base64.foreach(metadata.setContentMD5) new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) .withMetadata(metadata) .withGeneralProgressListener(listener) diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala new file mode 100644 index 0000000..4cb403e --- /dev/null +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/ETagGeneratorTest.scala @@ -0,0 +1,51 @@ +package net.kemitix.thorp.storage.aws + +import 
com.amazonaws.services.s3.transfer.TransferManagerConfiguration +import net.kemitix.thorp.core.Resource +import net.kemitix.thorp.domain.MD5Hash +import org.scalatest.FunSpec + +class ETagGeneratorTest extends FunSpec { + + private val bigFile = Resource(this, "big-file") + private val configuration = new TransferManagerConfiguration + private val chunkSize = 1200000 + configuration.setMinimumUploadPartSize(chunkSize) + private val logger = new DummyLogger + + describe("Create offsets") { + it("should create offsets") { + val offsets = ETagGenerator.offsets(bigFile.length, chunkSize) + .foldRight(List[Long]())((l: Long, a: List[Long]) => l :: a) + assertResult(List(0, chunkSize, chunkSize * 2, chunkSize * 3, chunkSize * 4))(offsets) + } + } + + def test(expected: String, result: MD5Hash): Unit = { + assertResult(expected)(result.hash) + } + + describe("create md5 hash for each chunk") { + it("should create expected hash for chunks") { + val md5Hashes = List( + "68b7d37e6578297621e06f01800204f1", + "973475b14a7bda6ad8864a7f9913a947", + "b9adcfc5b103fe2dd5924a5e5e6817f0", + "5bd6e10a99fef100fe7bf5eaa0a42384", + "8a0c1d0778ac8fcf4ca2010eba4711eb" + ).zipWithIndex + md5Hashes.foreach { case (hash, index) => + test(hash, ETagGenerator.hashChunk(bigFile, index, chunkSize)(logger).unsafeRunSync) + } + } + } + + describe("create etag for whole file") { + val expected = "f14327c90ad105244c446c498bfe9a7d-2" + it("should match aws etag for the file") { + val result = ETagGenerator.eTag(bigFile)(logger).unsafeRunSync + assertResult(expected)(result) + } + } + +} diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala index ac7187c..04ecdff 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/StorageServiceSuite.scala @@ -6,11 +6,10 @@ import 
com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.{TransferManager, Upload} -import net.kemitix.thorp.core.MD5HashData.rootHash import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher} +import net.kemitix.thorp.domain.MD5HashData.Root import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent import net.kemitix.thorp.domain._ -import net.kemitix.thorp.storage.api.StorageService import org.scalamock.scalatest.MockFactory import org.scalatest.FunSpec @@ -27,60 +26,82 @@ class StorageServiceSuite describe("getS3Status") { val hash = MD5Hash("hash") - val localFile = LocalFile.resolve("the-file", hash, source, fileToKey) + val localFile = LocalFile.resolve("the-file", md5HashMap(hash), source, fileToKey) val key = localFile.remoteKey - val keyotherkey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey) - val diffhash = MD5Hash("diff") - val keydiffhash = LocalFile.resolve("other-key-diff-hash", diffhash, source, fileToKey) + val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), source, fileToKey) + val diffHash = MD5Hash("diff") + val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), source, fileToKey) val lastModified = LastModified(Instant.now) val s3ObjectsData: S3ObjectsData = S3ObjectsData( byHash = Map( - hash -> Set(KeyModified(key, lastModified), KeyModified(keyotherkey.remoteKey, lastModified)), - diffhash -> Set(KeyModified(keydiffhash.remoteKey, lastModified))), + hash -> Set(KeyModified(key, lastModified), KeyModified(keyOtherKey.remoteKey, lastModified)), + diffHash -> Set(KeyModified(keyDiffHash.remoteKey, lastModified))), byKey = Map( key -> HashModified(hash, lastModified), - keyotherkey.remoteKey -> HashModified(hash, lastModified), - keydiffhash.remoteKey -> HashModified(diffhash, lastModified))) + keyOtherKey.remoteKey 
-> HashModified(hash, lastModified), + keyDiffHash.remoteKey -> HashModified(diffHash, lastModified))) - def invoke(self: StorageService, localFile: LocalFile) = { + def invoke(localFile: LocalFile) = S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData) + + def getMatchesByKey(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Option[HashModified] = { + val (byKey, _) = status + byKey } - describe("when remote key exists") { - val storageService = S3StorageServiceBuilder.defaultStorageService - it("should return (Some, Set.nonEmpty)") { + def getMatchesByHash(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Set[(MD5Hash, KeyModified)] = { + val (_, byHash) = status + byHash + } + + describe("when remote key exists, unmodified and other key matches the hash") { + it("should return the match by key") { + val result = getMatchesByKey(invoke(localFile)) + assert(result.contains(HashModified(hash, lastModified))) + } + it("should return both matches for the hash") { + val result = getMatchesByHash(invoke(localFile)) assertResult( - (Some(HashModified(hash, lastModified)), - Set( - KeyModified(key, lastModified), - KeyModified(keyotherkey.remoteKey, lastModified))) - )(invoke(storageService, localFile)) + Set( + (hash, KeyModified(key, lastModified)), + (hash, KeyModified(keyOtherKey.remoteKey, lastModified))) + )(result) } } describe("when remote key does not exist and no others matches hash") { - val storageService = S3StorageServiceBuilder.defaultStorageService - it("should return (None, Set.empty)") { - val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey) - assertResult( - (None, - Set.empty) - )(invoke(storageService, localFile)) + val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), source, fileToKey) + it("should return no matches by key") { + val result = getMatchesByKey(invoke(localFile)) + assert(result.isEmpty) + } + it("should return no matches by hash") { + val result 
= getMatchesByHash(invoke(localFile)) + assert(result.isEmpty) } } describe("when remote key exists and no others match hash") { - val storageService = S3StorageServiceBuilder.defaultStorageService - it("should return (None, Set.nonEmpty)") { + val localFile = keyDiffHash + it("should return the match by key") { + val result = getMatchesByKey(invoke(localFile)) + assert(result.contains(HashModified(diffHash, lastModified))) + } + it("should return one match by hash") { + val result = getMatchesByHash(invoke(localFile)) assertResult( - (Some(HashModified(diffhash, lastModified)), - Set(KeyModified(keydiffhash.remoteKey, lastModified))) - )(invoke(storageService, keydiffhash)) + Set( + (diffHash, KeyModified(keyDiffHash.remoteKey, lastModified))) + )(result) } } } + private def md5HashMap(hash: MD5Hash) = { + Map("md5" -> hash) + } + describe("upload") { describe("when uploading a file") { @@ -90,7 +111,7 @@ class StorageServiceSuite val prefix = RemoteKey("prefix") val localFile = - LocalFile.resolve("root-file", rootHash, source, KeyGenerator.generateKey(source, prefix)) + LocalFile.resolve("root-file", md5HashMap(Root.hash), source, KeyGenerator.generateKey(source, prefix)) val bucket = Bucket("a-bucket") val remoteKey = RemoteKey("prefix/root-file") val uploadEventListener = new UploadEventListener(localFile) @@ -99,13 +120,13 @@ class StorageServiceSuite (amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload) val uploadResult = stub[UploadResult] (upload.waitForUploadResult _).when().returns(uploadResult) - (uploadResult.getETag _).when().returns(rootHash.hash) + (uploadResult.getETag _).when().returns(Root.hash.hash) (uploadResult.getKey _).when().returns(remoteKey.key) it("should return hash of uploaded file") { pending //FIXME: works okay on its own, but fails when run with others - val expected = UploadQueueEvent(remoteKey, rootHash) + val expected = UploadQueueEvent(remoteKey, Root.hash) val result = storageService.upload(localFile, 
bucket, uploadEventListener, 1) assertResult(expected)(result) } diff --git a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala index 49776f9..2d4c6fd 100644 --- a/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala +++ b/storage-aws/src/test/scala/net/kemitix/thorp/storage/aws/UploaderSuite.scala @@ -22,6 +22,11 @@ class UploaderSuite private val fileToKey = generateKey(config.source, config.prefix) _ val lastModified = LastModified(Instant.now()) + def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] = + Map( + "md5" -> hash + ) + describe("S3ClientMultiPartTransferManagerSuite") { describe("upload") { pending @@ -31,7 +36,7 @@ class UploaderSuite // dies when putObject is called val returnedKey = RemoteKey("returned-key") val returnedHash = MD5Hash("returned-hash") - val bigFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey) + val bigFile = LocalFile.resolve("small-file", md5HashMap(MD5Hash("the-hash")), source, fileToKey) val uploadEventListener = new UploadEventListener(bigFile) val amazonS3 = mock[AmazonS3] val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build