Use correct hash locally for comparing multi-part uploaded files (#82)

* [storage-aws] ETagGenerator add stub

* [core] MD5HashGenerator add hex and digest helpers

* [domain] MD5Hash can always provide base64 and also digest

Rather that store the base 64 digest some of the time, simply decode
it from the hex hash. The same for the binary digest.

MD5Hash is now cleaner now that it no longer has Option parameters.

* [core] MD5HashGenerator add stubs to allow reading file chunks

* [domain] MD5HashData add sub-objects

* [domain] MD5HashData move back into test where it belongs

* [sbt] add sbt-bloop plugin

* [domain] MD5HashData Add hash of big-file

* [domain] MD5HashData Add hash of big-file

* [core] MD5HashGenerator find end of chunk correctly

* [core] MD5HashGenerator offset is a Long

* [core] MD5HashGenerator don't read past the end of the file

* [storage-aws] ETagGenerator can reproduce ETags

* [storage-aws] ETagGeneratorTest added

* [storate-aws] ETagGenerator refactoring

* [storage-aws] ETageGenerator refactoring

* [core] SyncSuite remove redundant braces

* [storage-api] HashService added

* [storage-aws] S3HashService added

* [core] LocalFileStream refactoring

* [core] integrate HashService and ETagGenerator

* Optimise imports

* [domain] HexEncoder added to replace java 8 only DataTypeConverter

* [core] MD5HashGenerator refactoring

* [core] S3MetaDataEnricher refactoring

* [core] S3MetaDataEnricherSuite refactoring

* [storage-aws] ETagGeneratorTest refactoring

* [storage-aws] StorageServiceSuite refactoring

* [core] S3MetaDataEnricher refactoring

* [core] refactoring

* [storage-aws] refactoring
This commit is contained in:
Paul Campbell 2019-06-29 19:07:51 +01:00 committed by GitHub
parent 9418744136
commit ac9a52f93f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 568 additions and 221 deletions

View file

@ -72,6 +72,7 @@ lazy val core = (project in file("core"))
.settings(assemblyJarName in assembly := "core.jar") .settings(assemblyJarName in assembly := "core.jar")
.settings(testDependencies) .settings(testDependencies)
.dependsOn(`storage-api`) .dependsOn(`storage-api`)
.dependsOn(domain % "compile->compile;test->test")
lazy val `storage-api` = (project in file("storage-api")) lazy val `storage-api` = (project in file("storage-api"))
.settings(commonSettings) .settings(commonSettings)

View file

@ -4,6 +4,7 @@ import cats.effect.{ExitCode, IO}
import cats.implicits._ import cats.implicits._
import net.kemitix.thorp.core._ import net.kemitix.thorp.core._
import net.kemitix.thorp.domain.{Logger, StorageQueueEvent} import net.kemitix.thorp.domain.{Logger, StorageQueueEvent}
import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService
import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService import net.kemitix.thorp.storage.aws.S3StorageServiceBuilder.defaultStorageService
trait Program { trait Program {
@ -11,7 +12,7 @@ trait Program {
def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = { def apply(cliOptions: Seq[ConfigOption]): IO[ExitCode] = {
implicit val logger: Logger = new PrintLogger() implicit val logger: Logger = new PrintLogger()
for { for {
actions <- Synchronise(defaultStorageService, cliOptions).valueOrF(handleErrors) actions <- Synchronise(defaultStorageService, defaultHashService, cliOptions).valueOrF(handleErrors)
events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions) events <- handleActions(UnversionedMirrorArchive.default(defaultStorageService), actions)
_ <- SyncLogging.logRunFinished(events) _ <- SyncLogging.logRunFinished(events)
} yield ExitCode.Success } yield ExitCode.Success

View file

@ -49,8 +49,13 @@ object ActionGenerator {
private def copyFile(bucket: Bucket, private def copyFile(bucket: Bucket,
localFile: LocalFile, localFile: LocalFile,
matchByHash: Set[RemoteMetaData]): Stream[Action] = matchByHash: Set[RemoteMetaData]): Stream[Action] = {
matchByHash.headOption.map(_.remoteKey).toStream.map {sourceKey => val headOption = matchByHash.headOption
ToCopy(bucket, sourceKey, localFile.hash, localFile.remoteKey)} headOption.toStream.map { remoteMetaData =>
val sourceKey = remoteMetaData.remoteKey
val hash = remoteMetaData.hash
ToCopy(bucket, sourceKey, hash, localFile.remoteKey)
}
}
} }

View file

@ -7,11 +7,12 @@ import cats.effect.IO
import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.core.KeyGenerator.generateKey
import net.kemitix.thorp.domain import net.kemitix.thorp.domain
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.HashService
object LocalFileStream { object LocalFileStream {
def findFiles(file: File, def findFiles(file: File,
md5HashGenerator: File => IO[MD5Hash]) hashService: HashService)
(implicit c: Config, (implicit c: Config,
logger: Logger): IO[Stream[LocalFile]] = { logger: Logger): IO[Stream[LocalFile]] = {
@ -25,11 +26,10 @@ object LocalFileStream {
Stream(fs: _*) Stream(fs: _*)
.filter(f => filters(f.toPath))) .filter(f => filters(f.toPath)))
def recurseIntoSubDirectories(file: File)(implicit c: Config): IO[Stream[LocalFile]] = def recurseIntoSubDirectories(file: File): IO[Stream[LocalFile]] =
file match { file match {
case f if f.isDirectory => loop(file) case f if f.isDirectory => loop(file)
case _ => for(hash <- md5HashGenerator(file)) case _ => localFile(hashService, file)
yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file)))
} }
def recurse(fs: Stream[File]): IO[Stream[LocalFile]] = def recurse(fs: Stream[File]): IO[Stream[LocalFile]] =
@ -48,6 +48,12 @@ object LocalFileStream {
loop(file) loop(file)
} }
private def localFile(hashService: HashService, file: File)(implicit l: Logger, c: Config) = {
for {
hash <- hashService.hashLocalObject(file)
} yield Stream(domain.LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)(file)))
}
//TODO: Change this to return an Either[IllegalArgumentException, Array[File]] //TODO: Change this to return an Either[IllegalArgumentException, Array[File]]
private def listFiles(file: File) = { private def listFiles(file: File) = {
Option(file.listFiles) Option(file.listFiles)

View file

@ -10,51 +10,72 @@ import scala.collection.immutable.NumericRange
object MD5HashGenerator { object MD5HashGenerator {
def md5File(file: File) val maxBufferSize = 8048
(implicit logger: Logger): IO[MD5Hash] = { val defaultBuffer = new Array[Byte](maxBufferSize)
val maxBufferSize = 8048 def hex(in: Array[Byte]): String = {
val defaultBuffer = new Array[Byte](maxBufferSize) val md5 = MessageDigest getInstance "MD5"
def openFile = IO.pure(new FileInputStream(file)) md5 update in
def closeFile = {fis: FileInputStream => IO(fis.close())} (md5.digest map ("%02x" format _)).mkString
}
def nextChunkSize(currentOffset: Long) = { def digest(in: String): Array[Byte] = {
// a value between 1 and maxBufferSize val md5 = MessageDigest getInstance "MD5"
val toRead = file.length - currentOffset md5 update in.getBytes
val result = Math.min(maxBufferSize, toRead) md5.digest
result.toInt }
def md5File(file: File)(implicit logger: Logger): IO[MD5Hash] =
md5FileChunk(file, 0, file.length)
private def openFile(file: File, offset: Long) = IO {
val stream = new FileInputStream(file)
stream skip offset
stream
}
private def closeFile(fis: FileInputStream) = IO(fis.close())
private def readFile(file: File, offset: Long, endOffset: Long) =
for {
fis <- openFile(file, offset)
digest <- digestFile(fis, offset, endOffset)
_ <- closeFile(fis)
} yield digest
private def digestFile(fis: FileInputStream, offset: Long, endOffset: Long) =
IO {
val md5 = MessageDigest getInstance "MD5"
NumericRange(offset, endOffset, maxBufferSize)
.foreach(currentOffset => md5 update readToBuffer(fis, currentOffset, endOffset))
md5.digest
} }
def readToBuffer(fis: FileInputStream, private def readToBuffer(fis: FileInputStream,
currentOffset: Long) = { currentOffset: Long,
val buffer = endOffset: Long) = {
if (nextChunkSize(currentOffset) < maxBufferSize) new Array[Byte](nextChunkSize(currentOffset)) val buffer =
else defaultBuffer if (nextBufferSize(currentOffset, endOffset) < maxBufferSize)
fis read buffer new Array[Byte](nextBufferSize(currentOffset, endOffset))
buffer else defaultBuffer
} fis read buffer
buffer
}
def digestFile(fis: FileInputStream) = private def nextBufferSize(currentOffset: Long, endOffset: Long) = {
IO { val toRead = endOffset - currentOffset
val md5 = MessageDigest getInstance "MD5" val result = Math.min(maxBufferSize, toRead)
NumericRange(0, file.length, maxBufferSize) result.toInt
.foreach { currentOffset => { }
val buffer = readToBuffer(fis, currentOffset)
md5 update buffer
}}
md5.digest
}
def readFile =
for {
fis <- openFile
digest <- digestFile(fis)
_ <- closeFile(fis)
} yield digest
def md5FileChunk(file: File,
offset: Long,
size: Long)
(implicit logger: Logger): IO[MD5Hash] = {
val endOffset = Math.min(offset + size, file.length)
for { for {
_ <- logger.debug(s"md5:reading:size ${file.length}:$file") _ <- logger.debug(s"md5:reading:size ${file.length}:$file")
digest <- readFile digest <- readFile(file, offset, endOffset)
hash = MD5Hash.fromDigest(digest) hash = MD5Hash.fromDigest(digest)
_ <- logger.debug(s"md5:generated:${hash.hash}:$file") _ <- logger.debug(s"md5:generated:${hash.hash}:$file")
} yield hash } yield hash

View file

@ -10,13 +10,17 @@ object S3MetaDataEnricher {
val (keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData) val (keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData)
S3MetaData(localFile, S3MetaData(localFile,
matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) }, matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) },
matchByHash = hashMatches map { km => RemoteMetaData(km.key, localFile.hash, km.modified) }) matchByHash = hashMatches map { case (hash, km) => RemoteMetaData(km.key, hash, km.modified) })
} }
def getS3Status(localFile: LocalFile, def getS3Status(localFile: LocalFile,
s3ObjectsData: S3ObjectsData): (Option[HashModified], Set[KeyModified]) = { s3ObjectsData: S3ObjectsData): (Option[HashModified], Set[(MD5Hash, KeyModified)]) = {
val matchingByKey = s3ObjectsData.byKey.get(localFile.remoteKey) val matchingByKey = s3ObjectsData.byKey.get(localFile.remoteKey)
val matchingByHash = s3ObjectsData.byHash.getOrElse(localFile.hash, Set()) val matchingByHash = localFile.hashes
.map { case(_, md5Hash) =>
s3ObjectsData.byHash.getOrElse(md5Hash, Set())
.map(km => (md5Hash, km))
}.flatten.toSet
(matchingByKey, matchingByHash) (matchingByKey, matchingByHash)
} }

View file

@ -1,21 +1,21 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import cats.data.NonEmptyChain import cats.data.{EitherT, NonEmptyChain}
import cats.data.EitherT
import cats.effect.IO import cats.effect.IO
import cats.implicits._ import cats.implicits._
import net.kemitix.thorp.core.Action.DoNothing import net.kemitix.thorp.core.Action.DoNothing
import net.kemitix.thorp.domain.{Config, LocalFile, Logger, RemoteKey, S3ObjectsData} import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.storage.api.{HashService, StorageService}
trait Synchronise { trait Synchronise {
def apply(storageService: StorageService, def apply(storageService: StorageService,
hashService: HashService,
configOptions: Seq[ConfigOption]) configOptions: Seq[ConfigOption])
(implicit logger: Logger): EitherT[IO, List[String], Stream[Action]] = (implicit l: Logger): EitherT[IO, List[String], Stream[Action]] =
EitherT(ConfigurationBuilder.buildConfig(configOptions)) EitherT(ConfigurationBuilder.buildConfig(configOptions))
.swap.map(errorMessages).swap .swap.map(errorMessages).swap
.flatMap(config => useValidConfig(storageService, config)) .flatMap(config => useValidConfig(storageService, hashService)(config, l))
def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] = def errorMessages(errors: NonEmptyChain[ConfigValidation]): List[String] =
errors.map(cv => cv.errorMessage).toList errors.map(cv => cv.errorMessage).toList
@ -26,45 +26,50 @@ trait Synchronise {
} }
def useValidConfig(storageService: StorageService, def useValidConfig(storageService: StorageService,
config: Config) hashService: HashService)
(implicit logger: Logger): EitherT[IO, List[String], Stream[Action]] = { (implicit c: Config, l: Logger): EitherT[IO, List[String], Stream[Action]] = {
for { for {
_ <- EitherT.liftF(SyncLogging.logRunStart(config.bucket, config.prefix, config.source)) _ <- EitherT.liftF(SyncLogging.logRunStart(c.bucket, c.prefix, c.source))
actions <- gatherMetadata(storageService, logger, config) actions <- gatherMetadata(storageService, hashService)
.swap.map(error => List(error)).swap .swap.map(error => List(error)).swap
.map { .map {
case (remoteData, localData) => case (remoteData, localData) =>
(actionsForLocalFiles(config, localData, remoteData) ++ (actionsForLocalFiles(localData, remoteData) ++
actionsForRemoteKeys(config, remoteData)) actionsForRemoteKeys(remoteData))
.filter(removeDoNothing) .filter(removeDoNothing)
} }
} yield actions } yield actions
} }
private def gatherMetadata(storageService: StorageService, private def gatherMetadata(storageService: StorageService,
logger: Logger, hashService: HashService)
config: Config): EitherT[IO, String, (S3ObjectsData, Stream[LocalFile])] = (implicit l: Logger,
c: Config): EitherT[IO, String, (S3ObjectsData, Stream[LocalFile])] =
for { for {
remoteData <- fetchRemoteData(storageService, config) remoteData <- fetchRemoteData(storageService)
localData <- EitherT.liftF(findLocalFiles(config, logger)) localData <- EitherT.liftF(findLocalFiles(hashService))
} yield (remoteData, localData) } yield (remoteData, localData)
private def actionsForLocalFiles(config: Config, localData: Stream[LocalFile], remoteData: S3ObjectsData) = private def actionsForLocalFiles(localData: Stream[LocalFile], remoteData: S3ObjectsData)
localData.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(config, lf, remoteData) ++ acc) (implicit c: Config) =
localData.foldLeft(Stream[Action]())((acc, lf) => createActionFromLocalFile(lf, remoteData) ++ acc)
private def actionsForRemoteKeys(config: Config, remoteData: S3ObjectsData) = private def actionsForRemoteKeys(remoteData: S3ObjectsData)
remoteData.byKey.keys.foldLeft(Stream[Action]())((acc, rk) => createActionFromRemoteKey(config, rk) #:: acc) (implicit c: Config) =
remoteData.byKey.keys.foldLeft(Stream[Action]())((acc, rk) => createActionFromRemoteKey(rk) #:: acc)
private def fetchRemoteData(storageService: StorageService, config: Config) = private def fetchRemoteData(storageService: StorageService)(implicit c: Config) =
storageService.listObjects(config.bucket, config.prefix) storageService.listObjects(c.bucket, c.prefix)
private def findLocalFiles(implicit config: Config, l: Logger) = private def findLocalFiles(hashService: HashService)(implicit config: Config, l: Logger) =
LocalFileStream.findFiles(config.source, MD5HashGenerator.md5File(_)) LocalFileStream.findFiles(config.source, hashService)
private def createActionFromLocalFile(c: Config, lf: LocalFile, remoteData: S3ObjectsData) = private def createActionFromLocalFile(lf: LocalFile, remoteData: S3ObjectsData)
ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData)(c))(c) (implicit c: Config) =
ActionGenerator.createActions(S3MetaDataEnricher.getMetadata(lf, remoteData))
private def createActionFromRemoteKey(c: Config, rk: RemoteKey) = private def createActionFromRemoteKey(rk: RemoteKey)
(implicit c: Config) =
if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk) if (rk.isMissingLocally(c.source, c.prefix)) Action.ToDelete(c.bucket, rk)
else DoNothing(c.bucket, rk) else DoNothing(c.bucket, rk)

View file

@ -22,7 +22,7 @@ class ActionGeneratorSuite
describe("#1 local exists, remote exists, remote matches - do nothing") { describe("#1 local exists, remote exists, remote matches - do nothing") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified) val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified)
val input = S3MetaData(theFile, // local exists val input = S3MetaData(theFile, // local exists
matchByHash = Set(theRemoteMetadata), // remote matches matchByHash = Set(theRemoteMetadata), // remote matches
@ -36,7 +36,7 @@ class ActionGeneratorSuite
} }
describe("#2 local exists, remote is missing, other matches - copy") { describe("#2 local exists, remote is missing, other matches - copy") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val otherRemoteKey = prefix.resolve("other-key") val otherRemoteKey = prefix.resolve("other-key")
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified) val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
@ -51,7 +51,7 @@ class ActionGeneratorSuite
} }
describe("#3 local exists, remote is missing, other no matches - upload") { describe("#3 local exists, remote is missing, other no matches - upload") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val input = S3MetaData(theFile, // local exists val input = S3MetaData(theFile, // local exists
matchByHash = Set.empty, // other no matches matchByHash = Set.empty, // other no matches
matchByKey = None) // remote is missing matchByKey = None) // remote is missing
@ -63,7 +63,7 @@ class ActionGeneratorSuite
} }
describe("#4 local exists, remote exists, remote no match, other matches - copy") { describe("#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash") val oldHash = MD5Hash("old-hash")
val otherRemoteKey = prefix.resolve("other-key") val otherRemoteKey = prefix.resolve("other-key")
@ -82,7 +82,7 @@ class ActionGeneratorSuite
} }
describe("#5 local exists, remote exists, remote no match, other no matches - upload") { describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash") val oldHash = MD5Hash("old-hash")
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified) val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
@ -102,4 +102,8 @@ class ActionGeneratorSuite
} }
} }
} }
private def md5HashMap(theHash: MD5Hash) = {
Map("md5" -> theHash)
}
} }

View file

@ -0,0 +1,11 @@
package net.kemitix.thorp.core
import java.io.File
import cats.effect.IO
import net.kemitix.thorp.domain.{Logger, MD5Hash}
import net.kemitix.thorp.storage.api.HashService
case class DummyHashService(hashes: Map[File, Map[String, MD5Hash]]) extends HashService {
override def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] = IO.pure(hashes(file))
}

View file

@ -1,22 +1,29 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import java.io.File import java.nio.file.Paths
import cats.effect.IO import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5HashData}
import net.kemitix.thorp.domain.{Config, LocalFile, Logger, MD5Hash} import net.kemitix.thorp.storage.api.HashService
import org.scalatest.FunSpec import org.scalatest.FunSpec
class LocalFileStreamSuite extends FunSpec { class LocalFileStreamSuite extends FunSpec {
val uploadResource = Resource(this, "upload") private val uploadResource = Resource(this, "upload")
implicit val config: Config = Config(source = uploadResource) private val hashService: HashService = DummyHashService(Map(
file("root-file") -> Map("md5" -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash)
))
private def file(filename: String) =
uploadResource.toPath.resolve(Paths.get(filename)).toFile
implicit private val config: Config = Config(source = uploadResource)
implicit private val logger: Logger = new DummyLogger implicit private val logger: Logger = new DummyLogger
val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file)
describe("findFiles") { describe("findFiles") {
it("should find all files") { it("should find all files") {
val result: Set[String] = val result: Set[String] =
LocalFileStream.findFiles(uploadResource, md5HashGenerator).unsafeRunSync.toSet LocalFileStream.findFiles(uploadResource, hashService).unsafeRunSync.toSet
.map { x: LocalFile => x.relative.toString } .map { x: LocalFile => x.relative.toString }
assertResult(Set("subdir/leaf-file", "root-file"))(result) assertResult(Set("subdir/leaf-file", "root-file"))(result)
} }

View file

@ -1,11 +0,0 @@
package net.kemitix.thorp.core
import net.kemitix.thorp.domain.MD5Hash
object MD5HashData {
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e", Some("o6asEaDrV3uBs7tclcyKbg=="))
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542", Some("IIOGplC97GHPzXvY3La1Qg=="))
}

View file

@ -1,6 +1,6 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import net.kemitix.thorp.core.MD5HashData.rootHash import net.kemitix.thorp.domain.MD5HashData.Root
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import org.scalatest.FunSpec import org.scalatest.FunSpec
@ -11,20 +11,35 @@ class MD5HashGeneratorTest extends FunSpec {
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
implicit private val logger: Logger = new DummyLogger implicit private val logger: Logger = new DummyLogger
describe("read a small file (smaller than buffer)") { describe("read a small file (smaller than buffer)") {
val file = Resource(this, "upload/root-file") val file = Resource(this, "upload/root-file")
it("should generate the correct hash") { it("should generate the correct hash") {
val result = MD5HashGenerator.md5File(file).unsafeRunSync val result = MD5HashGenerator.md5File(file).unsafeRunSync
assertResult(rootHash)(result) assertResult(Root.hash)(result)
}
} }
describe("read a large file (bigger than buffer)") { }
val file = Resource(this, "big-file") describe("read a large file (bigger than buffer)") {
it("should generate the correct hash") { val file = Resource(this, "big-file")
val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8", Some("sasfdoATjm23MJIAWE412A==")) it("should generate the correct hash") {
val result = MD5HashGenerator.md5File(file).unsafeRunSync val expected = MD5HashData.BigFile.hash
assertResult(expected)(result) val result = MD5HashGenerator.md5File(file).unsafeRunSync
} assertResult(expected)(result)
} }
}
describe("read chunks of file") {
val file = Resource(this, "big-file")
it("should generate the correct hash for first chunk of the file") {
val part1 = MD5HashData.BigFile.Part1
val expected = part1.hash
val result = MD5HashGenerator.md5FileChunk(file, part1.offset, part1.size).unsafeRunSync
assertResult(expected)(result)
}
it("should generate the correcy hash for second chunk of the file") {
val part2 = MD5HashData.BigFile.Part2
val expected = part2.hash
val result = MD5HashGenerator.md5FileChunk(file, part2.offset, part2.size).unsafeRunSync
assertResult(expected)(result)
}
}
} }

View file

@ -15,11 +15,21 @@ class S3MetaDataEnricherSuite
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _ private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
val lastModified = LastModified(Instant.now()) val lastModified = LastModified(Instant.now())
def getMatchesByKey(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Option[HashModified] = {
val (byKey, _) = status
byKey
}
def getMatchesByHash(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Set[(MD5Hash, KeyModified)] = {
val (_, byHash) = status
byHash
}
describe("enrich with metadata") { describe("enrich with metadata") {
describe("#1a local exists, remote exists, remote matches, other matches - do nothing") { describe("#1a local exists, remote exists, remote matches, other matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash") val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val theRemoteKey: RemoteKey = theFile.remoteKey val theRemoteKey: RemoteKey = theFile.remoteKey
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
@ -36,7 +46,7 @@ class S3MetaDataEnricherSuite
} }
describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") { describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash") val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile: LocalFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val theRemoteKey: RemoteKey = prefix.resolve("the-file") val theRemoteKey: RemoteKey = prefix.resolve("the-file")
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
@ -53,7 +63,7 @@ class S3MetaDataEnricherSuite
} }
describe("#2 local exists, remote is missing, remote no match, other matches - copy") { describe("#2 local exists, remote is missing, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val otherRemoteKey = RemoteKey("other-key") val otherRemoteKey = RemoteKey("other-key")
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))), byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
@ -70,7 +80,7 @@ class S3MetaDataEnricherSuite
} }
describe("#3 local exists, remote is missing, remote no match, other no matches - upload") { describe("#3 local exists, remote is missing, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(), byHash = Map(),
byKey = Map() byKey = Map()
@ -85,7 +95,7 @@ class S3MetaDataEnricherSuite
} }
describe("#4 local exists, remote exists, remote no match, other matches - copy") { describe("#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash") val oldHash = MD5Hash("old-hash")
val otherRemoteKey = prefix.resolve("other-key") val otherRemoteKey = prefix.resolve("other-key")
@ -110,7 +120,7 @@ class S3MetaDataEnricherSuite
} }
describe("#5 local exists, remote exists, remote no match, other no matches - upload") { describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash") val theHash = MD5Hash("the-hash")
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey) val theFile = LocalFile.resolve("the-file", md5HashMap(theHash), source, fileToKey)
val theRemoteKey = theFile.remoteKey val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash") val oldHash = MD5Hash("old-hash")
val s3: S3ObjectsData = S3ObjectsData( val s3: S3ObjectsData = S3ObjectsData(
@ -132,13 +142,17 @@ class S3MetaDataEnricherSuite
} }
} }
private def md5HashMap(theHash: MD5Hash) = {
Map("md5" -> theHash)
}
describe("getS3Status") { describe("getS3Status") {
val hash = MD5Hash("hash") val hash = MD5Hash("hash")
val localFile = LocalFile.resolve("the-file", hash, source, fileToKey) val localFile = LocalFile.resolve("the-file", md5HashMap(hash), source, fileToKey)
val key = localFile.remoteKey val key = localFile.remoteKey
val keyOtherKey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey) val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), source, fileToKey)
val diffHash = MD5Hash("diff") val diffHash = MD5Hash("diff")
val keyDiffHash = LocalFile.resolve("other-key-diff-hash", diffHash, source, fileToKey) val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), source, fileToKey)
val lastModified = LastModified(Instant.now) val lastModified = LastModified(Instant.now)
val s3ObjectsData: S3ObjectsData = S3ObjectsData( val s3ObjectsData: S3ObjectsData = S3ObjectsData(
byHash = Map( byHash = Map(
@ -154,32 +168,32 @@ class S3MetaDataEnricherSuite
} }
describe("when remote key exists") { describe("when remote key exists") {
it("should return (Some, Set.nonEmpty)") { it("should return a result for matching key") {
assertResult( val result = getMatchesByKey(invoke(localFile))
(Some(HashModified(hash, lastModified)), assert(result.contains(HashModified(hash, lastModified)))
Set(
KeyModified(key, lastModified),
KeyModified(keyOtherKey.remoteKey, lastModified)))
)(invoke(localFile))
} }
} }
describe("when remote key does not exist and no others matches hash") { describe("when remote key does not exist and no others matches hash") {
it("should return (None, Set.empty)") { val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), source, fileToKey)
val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey) it("should return no matches by key") {
assertResult( val result = getMatchesByKey(invoke(localFile))
(None, assert(result.isEmpty)
Set.empty) }
)(invoke(localFile)) it("should return no matches by hash") {
val result = getMatchesByHash(invoke(localFile))
assert(result.isEmpty)
} }
} }
describe("when remote key exists and no others match hash") { describe("when remote key exists and no others match hash") {
it("should return (None, Set.nonEmpty)") { it("should return match by key") {
assertResult( val result = getMatchesByKey(invoke(keyDiffHash))
(Some(HashModified(diffHash, lastModified)), assert(result.contains(HashModified(diffHash, lastModified)))
Set(KeyModified(keyDiffHash.remoteKey, lastModified))) }
)(invoke(keyDiffHash)) it("should return only itself in match by hash") {
val result = getMatchesByHash(invoke(keyDiffHash))
assert(result.equals(Set((diffHash, KeyModified(keyDiffHash.remoteKey,lastModified)))))
} }
} }

View file

@ -1,15 +1,16 @@
package net.kemitix.thorp.core package net.kemitix.thorp.core
import java.io.File import java.io.File
import java.nio.file.Paths
import java.time.Instant import java.time.Instant
import cats.data.EitherT import cats.data.EitherT
import cats.effect.IO import cats.effect.IO
import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.core.MD5HashData.{leafHash, rootHash} import net.kemitix.thorp.domain.MD5HashData.{Leaf, Root}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent} import net.kemitix.thorp.domain.StorageQueueEvent.{CopyQueueEvent, DeleteQueueEvent, UploadQueueEvent}
import net.kemitix.thorp.storage.api.StorageService import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, StorageService}
import org.scalatest.FunSpec import org.scalatest.FunSpec
class SyncSuite class SyncSuite
@ -34,12 +35,23 @@ class SyncSuite
// source contains the files root-file and subdir/leaf-file // source contains the files root-file and subdir/leaf-file
val rootRemoteKey = RemoteKey("prefix/root-file") val rootRemoteKey = RemoteKey("prefix/root-file")
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
val rootFile: LocalFile = LocalFile.resolve("root-file", rootHash, source, _ => rootRemoteKey) val rootFile: LocalFile = LocalFile.resolve("root-file", md5HashMap(Root.hash), source, _ => rootRemoteKey)
val leafFile: LocalFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, _ => leafRemoteKey)
private def md5HashMap(md5Hash: MD5Hash): Map[String, MD5Hash] = {
Map("md5" -> md5Hash)
}
val leafFile: LocalFile = LocalFile.resolve("subdir/leaf-file", md5HashMap(Leaf.hash), source, _ => leafRemoteKey)
val hashService = DummyHashService(Map(
file("root-file") -> Map("md5" -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map("md5" -> MD5HashData.Leaf.hash)
))
def invokeSubject(storageService: StorageService, def invokeSubject(storageService: StorageService,
hashService: HashService,
configOptions: List[ConfigOption]): Either[List[String], Stream[Action]] = { configOptions: List[ConfigOption]): Either[List[String], Stream[Action]] = {
Synchronise(storageService, configOptions).value.unsafeRunSync Synchronise(storageService, hashService, configOptions).value.unsafeRunSync
} }
describe("when all files should be uploaded") { describe("when all files should be uploaded") {
@ -50,22 +62,26 @@ class SyncSuite
val expected = Right(Set( val expected = Right(Set(
ToUpload(testBucket, rootFile), ToUpload(testBucket, rootFile),
ToUpload(testBucket, leafFile))) ToUpload(testBucket, leafFile)))
val result = invokeSubject(storageService, configOptions) val result = invokeSubject(storageService, hashService, configOptions)
assertResult(expected)(result.map(_.toSet)) assertResult(expected)(result.map(_.toSet))
} }
} }
private def file(filename: String) =
source.toPath.resolve(Paths.get(filename)).toFile
describe("when no files should be uploaded") { describe("when no files should be uploaded") {
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = Map( byHash = Map(
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), Root.hash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)),
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
byKey = Map( byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified)))
val storageService = new RecordingStorageService(testBucket, s3ObjectsData) val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("no actions") { it("no actions") {
val expected = Stream() val expected = Stream()
val result = invokeSubject(storageService, configOptions) val result = invokeSubject(storageService, hashService, configOptions)
assert(result.isRight) assert(result.isRight)
assertResult(expected)(result.right.get) assertResult(expected)(result.right.get)
} }
@ -76,18 +92,18 @@ class SyncSuite
// 'root-file-old' should be renamed as 'root-file' // 'root-file-old' should be renamed as 'root-file'
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = Map( byHash = Map(
rootHash -> Set(KeyModified(sourceKey, lastModified)), Root.hash -> Set(KeyModified(sourceKey, lastModified)),
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
byKey = Map( byKey = Map(
sourceKey -> HashModified(rootHash, lastModified), sourceKey -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified)))
val storageService = new RecordingStorageService(testBucket, s3ObjectsData) val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("copies the file and deletes the original") { it("copies the file and deletes the original") {
val expected = Stream( val expected = Stream(
ToCopy(testBucket, sourceKey, rootHash, targetKey), ToCopy(testBucket, sourceKey, Root.hash, targetKey),
ToDelete(testBucket, sourceKey) ToDelete(testBucket, sourceKey)
) )
val result = invokeSubject(storageService, configOptions) val result = invokeSubject(storageService, hashService, configOptions)
assert(result.isRight) assert(result.isRight)
assertResult(expected)(result.right.get) assertResult(expected)(result.right.get)
} }
@ -102,19 +118,19 @@ class SyncSuite
val deletedKey = RemoteKey("prefix/deleted-file") val deletedKey = RemoteKey("prefix/deleted-file")
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = Map( byHash = Map(
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), Root.hash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)),
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)), Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified)),
deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))), deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))),
byKey = Map( byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified), RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified),
deletedKey -> HashModified(deletedHash, lastModified))) deletedKey -> HashModified(deletedHash, lastModified)))
val storageService = new RecordingStorageService(testBucket, s3ObjectsData) val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("deleted key") { it("deleted key") {
val expected = Stream( val expected = Stream(
ToDelete(testBucket, deletedKey) ToDelete(testBucket, deletedKey)
) )
val result = invokeSubject(storageService, configOptions) val result = invokeSubject(storageService,hashService, configOptions)
assert(result.isRight) assert(result.isRight)
assertResult(expected)(result.right.get) assertResult(expected)(result.right.get)
} }
@ -122,15 +138,15 @@ class SyncSuite
describe("when a file is excluded") { describe("when a file is excluded") {
val s3ObjectsData = S3ObjectsData( val s3ObjectsData = S3ObjectsData(
byHash = Map( byHash = Map(
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), Root.hash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)),
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), Leaf.hash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
byKey = Map( byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), RemoteKey("prefix/root-file") -> HashModified(Root.hash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) RemoteKey("prefix/subdir/leaf-file") -> HashModified(Leaf.hash, lastModified)))
val storageService = new RecordingStorageService(testBucket, s3ObjectsData) val storageService = new RecordingStorageService(testBucket, s3ObjectsData)
it("is not uploaded") { it("is not uploaded") {
val expected = Stream() val expected = Stream()
val result = invokeSubject(storageService, ConfigOption.Exclude("leaf") :: configOptions) val result = invokeSubject(storageService, hashService, ConfigOption.Exclude("leaf") :: configOptions)
assert(result.isRight) assert(result.isRight)
assertResult(expected)(result.right.get) assertResult(expected)(result.right.get)
} }
@ -147,20 +163,17 @@ class SyncSuite
override def upload(localFile: LocalFile, override def upload(localFile: LocalFile,
bucket: Bucket, bucket: Bucket,
uploadEventListener: UploadEventListener, uploadEventListener: UploadEventListener,
tryCount: Int): IO[UploadQueueEvent] = { tryCount: Int): IO[UploadQueueEvent] =
IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hash)) IO.pure(UploadQueueEvent(localFile.remoteKey, localFile.hashes("md5")))
}
override def copy(bucket: Bucket, override def copy(bucket: Bucket,
sourceKey: RemoteKey, sourceKey: RemoteKey,
hash: MD5Hash, hashes: MD5Hash,
targetKey: RemoteKey): IO[CopyQueueEvent] = { targetKey: RemoteKey): IO[CopyQueueEvent] =
IO.pure(CopyQueueEvent(targetKey)) IO.pure(CopyQueueEvent(targetKey))
}
override def delete(bucket: Bucket, override def delete(bucket: Bucket,
remoteKey: RemoteKey): IO[DeleteQueueEvent] = { remoteKey: RemoteKey): IO[DeleteQueueEvent] =
IO.pure(DeleteQueueEvent(remoteKey)) IO.pure(DeleteQueueEvent(remoteKey))
}
} }
} }

View file

@ -0,0 +1,23 @@
package net.kemitix.thorp.domain
import java.math.BigInteger
trait HexEncoder {
def encode(bytes: Array[Byte]): String = {
val bigInteger = new BigInteger(1, bytes)
String.format("%0" + (bytes.length << 1) + "x", bigInteger)
}
def decode(hexString: String): Array[Byte] = {
val byteArray = new BigInteger(hexString, 16).toByteArray
if (byteArray(0) == 0) {
val output = new Array[Byte](byteArray.length - 1)
System.arraycopy(byteArray, 1, output, 0, output.length)
output
} else byteArray
}
}
object HexEncoder extends HexEncoder

View file

@ -3,7 +3,7 @@ package net.kemitix.thorp.domain
import java.io.File import java.io.File
import java.nio.file.Path import java.nio.file.Path
final case class LocalFile(file: File, source: File, hash: MD5Hash, remoteKey: RemoteKey) { final case class LocalFile(file: File, source: File, hashes: Map[String, MD5Hash], remoteKey: RemoteKey) {
require(!file.isDirectory, s"LocalFile must not be a directory: $file") require(!file.isDirectory, s"LocalFile must not be a directory: $file")
@ -12,16 +12,18 @@ final case class LocalFile(file: File, source: File, hash: MD5Hash, remoteKey: R
// the path of the file within the source // the path of the file within the source
def relative: Path = source.toPath.relativize(file.toPath) def relative: Path = source.toPath.relativize(file.toPath)
def matches(other: MD5Hash): Boolean = hash.hash == other.hash def matches(other: MD5Hash): Boolean = hashes.values.exists(other equals _)
def md5base64: Option[String] = hashes.get("md5").map(_.hash64)
} }
object LocalFile { object LocalFile {
def resolve(path: String, def resolve(path: String,
md5Hash: MD5Hash, md5Hashes: Map[String, MD5Hash],
source: File, source: File,
fileToKey: File => RemoteKey): LocalFile = { fileToKey: File => RemoteKey): LocalFile = {
val file = source.toPath.resolve(path).toFile val file = source.toPath.resolve(path).toFile
LocalFile(file, source, md5Hash, fileToKey(file)) LocalFile(file, source, md5Hashes, fileToKey(file))
} }
} }

View file

@ -4,13 +4,18 @@ import java.util.Base64
import net.kemitix.thorp.domain.QuoteStripper.stripQuotes import net.kemitix.thorp.domain.QuoteStripper.stripQuotes
final case class MD5Hash(in: String, hash64: Option[String] = None) { final case class MD5Hash(in: String) {
lazy val hash: String = in filter stripQuotes lazy val hash: String = in filter stripQuotes
lazy val digest: Array[Byte] = HexEncoder.decode(hash)
lazy val hash64: String = Base64.getEncoder.encodeToString(digest)
} }
object MD5Hash { object MD5Hash {
def fromDigest(digest: Array[Byte]): MD5Hash = def fromDigest(digest: Array[Byte]): MD5Hash = {
MD5Hash((digest map ("%02x" format _)).mkString, Some(Base64.getEncoder.encodeToString(digest))) val hexDigest = (digest map ("%02x" format _)).mkString
MD5Hash(hexDigest)
}
} }

View file

@ -0,0 +1,28 @@
package net.kemitix.thorp.domain
object MD5HashData {
object Root {
val hash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val base64 = "o6asEaDrV3uBs7tclcyKbg=="
}
object Leaf {
val hash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
val base64 = "IIOGplC97GHPzXvY3La1Qg=="
}
object BigFile {
val hash = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
object Part1 {
val offset = 0
val size = 1048576
val hash = MD5Hash("39d4a9c78b9cfddf6d241a201a4ab726")
}
object Part2 {
val offset = 1048576
val size = 1048576
val hash = MD5Hash("af5876f3a3bc6e66f4ae96bb93d8dae0")
}
}
}

View file

@ -0,0 +1,17 @@
package net.kemitix.thorp.domain
import org.scalatest.FunSpec
class MD5HashTest extends FunSpec {
describe("recover base64 hash") {
it("should recover base 64 #1") {
val rootHash = MD5HashData.Root.hash
assertResult(MD5HashData.Root.base64)(rootHash.hash64)
}
it("should recover base 64 #2") {
val leafHash = MD5HashData.Leaf.hash
assertResult(MD5HashData.Leaf.base64)(leafHash.hash64)
}
}
}

1
project/plugins.sbt Normal file
View file

@ -0,0 +1 @@
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.3.2")

View file

@ -0,0 +1,15 @@
package net.kemitix.thorp.storage.api
import java.io.File
import cats.effect.IO
import net.kemitix.thorp.domain.{Logger, MD5Hash}
/**
* Creates one, or more, hashes for local objects.
*/
trait HashService {
def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]]
}

View file

@ -0,0 +1,53 @@
package net.kemitix.thorp.storage.aws
import java.io.File
import cats.effect.IO
import cats.implicits._
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils
import net.kemitix.thorp.core.MD5HashGenerator
import net.kemitix.thorp.domain.{Logger, MD5Hash}
trait ETagGenerator {
def eTag(file: File)(implicit l: Logger): IO[String]= {
val partSize = calculatePartSize(file)
val parts = numParts(file.length, partSize)
partsIndex(parts)
.map(digestChunk(file, partSize)).sequence
.map(concatenateDigests)
.map(MD5HashGenerator.hex)
.map(hash => s"$hash-$parts")
}
private def partsIndex(parts: Long) =
Range.Long(0, parts, 1).toList
private def concatenateDigests: List[Array[Byte]] => Array[Byte] =
lab => lab.foldLeft(Array[Byte]())((acc, ab) => acc ++ ab)
private def calculatePartSize(file: File) = {
val request = new PutObjectRequest("", "", file)
val configuration = new TransferManagerConfiguration
TransferManagerUtils.calculateOptimalPartSize(request, configuration)
}
private def numParts(fileLength: Long, optimumPartSize: Long) = {
val fullParts = Math.floorDiv(fileLength, optimumPartSize)
val incompletePart = if (Math.floorMod(fileLength, optimumPartSize) > 0) 1 else 0
fullParts + incompletePart
}
def offsets(totalFileSizeBytes: Long, optimalPartSize: Long): List[Long] =
Range.Long(0, totalFileSizeBytes, optimalPartSize).toList
def digestChunk(file: File, chunkSize: Long)(chunkNumber: Long)(implicit l: Logger): IO[Array[Byte]] =
hashChunk(file, chunkNumber, chunkSize).map(_.digest)
def hashChunk(file: File, chunkNumber: Long, chunkSize: Long)(implicit l: Logger): IO[MD5Hash] =
MD5HashGenerator.md5FileChunk(file, chunkNumber * chunkSize, chunkSize)
}
object ETagGenerator extends ETagGenerator

View file

@ -1,8 +1,7 @@
package net.kemitix.thorp.storage.aws package net.kemitix.thorp.storage.aws
import cats.effect.IO
import cats.data.EitherT import cats.data.EitherT
import cats.implicits._ import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
import net.kemitix.thorp.domain import net.kemitix.thorp.domain
@ -11,7 +10,7 @@ import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.util.{Success, Try} import scala.util.Try
class S3ClientObjectLister(amazonS3: AmazonS3) { class S3ClientObjectLister(amazonS3: AmazonS3) {

View file

@ -0,0 +1,31 @@
package net.kemitix.thorp.storage.aws
import java.io.File
import cats.effect.IO
import net.kemitix.thorp.core.MD5HashGenerator
import net.kemitix.thorp.domain.{Logger, MD5Hash}
import net.kemitix.thorp.storage.api.HashService
trait S3HashService extends HashService {
/**
* Generates an MD5 Hash and an multi-part ETag
*
* @param file the local file to scan
* @return a set of hash values
*/
override def hashLocalObject(file: File)(implicit l: Logger): IO[Map[String, MD5Hash]] =
for {
md5 <- MD5HashGenerator.md5File(file)
etag <- ETagGenerator.eTag(file).map(MD5Hash(_))
} yield Map(
"md5" -> md5,
"etag" -> etag
)
}
object S3HashService extends S3HashService {
lazy val defaultHashService: HashService = S3HashService
}

View file

@ -40,7 +40,7 @@ class Uploader(transferManager: => AmazonTransferManager) {
private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = { private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = {
val metadata = new ObjectMetadata() val metadata = new ObjectMetadata()
localFile.hash.hash64.foreach(metadata.setContentMD5) localFile.md5base64.foreach(metadata.setContentMD5)
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file) new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withMetadata(metadata) .withMetadata(metadata)
.withGeneralProgressListener(listener) .withGeneralProgressListener(listener)

View file

@ -0,0 +1,51 @@
package net.kemitix.thorp.storage.aws
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import net.kemitix.thorp.core.Resource
import net.kemitix.thorp.domain.MD5Hash
import org.scalatest.FunSpec
class ETagGeneratorTest extends FunSpec {
private val bigFile = Resource(this, "big-file")
private val configuration = new TransferManagerConfiguration
private val chunkSize = 1200000
configuration.setMinimumUploadPartSize(chunkSize)
private val logger = new DummyLogger
describe("Create offsets") {
it("should create offsets") {
val offsets = ETagGenerator.offsets(bigFile.length, chunkSize)
.foldRight(List[Long]())((l: Long, a: List[Long]) => l :: a)
assertResult(List(0, chunkSize, chunkSize * 2, chunkSize * 3, chunkSize * 4))(offsets)
}
}
def test(expected: String, result: MD5Hash): Unit = {
assertResult(expected)(result.hash)
}
describe("create md5 hash for each chunk") {
it("should create expected hash for chunks") {
val md5Hashes = List(
"68b7d37e6578297621e06f01800204f1",
"973475b14a7bda6ad8864a7f9913a947",
"b9adcfc5b103fe2dd5924a5e5e6817f0",
"5bd6e10a99fef100fe7bf5eaa0a42384",
"8a0c1d0778ac8fcf4ca2010eba4711eb"
).zipWithIndex
md5Hashes.foreach { case (hash, index) =>
test(hash, ETagGenerator.hashChunk(bigFile, index, chunkSize)(logger).unsafeRunSync)
}
}
}
describe("create etag for whole file") {
val expected = "f14327c90ad105244c446c498bfe9a7d-2"
it("should match aws etag for the file") {
val result = ETagGenerator.eTag(bigFile)(logger).unsafeRunSync
assertResult(expected)(result)
}
}
}

View file

@ -6,11 +6,10 @@ import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.PutObjectRequest import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.model.UploadResult import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{TransferManager, Upload} import com.amazonaws.services.s3.transfer.{TransferManager, Upload}
import net.kemitix.thorp.core.MD5HashData.rootHash
import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher} import net.kemitix.thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher}
import net.kemitix.thorp.domain.MD5HashData.Root
import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.UploadQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.StorageService
import org.scalamock.scalatest.MockFactory import org.scalamock.scalatest.MockFactory
import org.scalatest.FunSpec import org.scalatest.FunSpec
@ -27,60 +26,82 @@ class StorageServiceSuite
describe("getS3Status") { describe("getS3Status") {
val hash = MD5Hash("hash") val hash = MD5Hash("hash")
val localFile = LocalFile.resolve("the-file", hash, source, fileToKey) val localFile = LocalFile.resolve("the-file", md5HashMap(hash), source, fileToKey)
val key = localFile.remoteKey val key = localFile.remoteKey
val keyotherkey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey) val keyOtherKey = LocalFile.resolve("other-key-same-hash", md5HashMap(hash), source, fileToKey)
val diffhash = MD5Hash("diff") val diffHash = MD5Hash("diff")
val keydiffhash = LocalFile.resolve("other-key-diff-hash", diffhash, source, fileToKey) val keyDiffHash = LocalFile.resolve("other-key-diff-hash", md5HashMap(diffHash), source, fileToKey)
val lastModified = LastModified(Instant.now) val lastModified = LastModified(Instant.now)
val s3ObjectsData: S3ObjectsData = S3ObjectsData( val s3ObjectsData: S3ObjectsData = S3ObjectsData(
byHash = Map( byHash = Map(
hash -> Set(KeyModified(key, lastModified), KeyModified(keyotherkey.remoteKey, lastModified)), hash -> Set(KeyModified(key, lastModified), KeyModified(keyOtherKey.remoteKey, lastModified)),
diffhash -> Set(KeyModified(keydiffhash.remoteKey, lastModified))), diffHash -> Set(KeyModified(keyDiffHash.remoteKey, lastModified))),
byKey = Map( byKey = Map(
key -> HashModified(hash, lastModified), key -> HashModified(hash, lastModified),
keyotherkey.remoteKey -> HashModified(hash, lastModified), keyOtherKey.remoteKey -> HashModified(hash, lastModified),
keydiffhash.remoteKey -> HashModified(diffhash, lastModified))) keyDiffHash.remoteKey -> HashModified(diffHash, lastModified)))
def invoke(self: StorageService, localFile: LocalFile) = { def invoke(localFile: LocalFile) =
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData) S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
def getMatchesByKey(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Option[HashModified] = {
val (byKey, _) = status
byKey
} }
describe("when remote key exists") { def getMatchesByHash(status: (Option[HashModified], Set[(MD5Hash, KeyModified)])): Set[(MD5Hash, KeyModified)] = {
val storageService = S3StorageServiceBuilder.defaultStorageService val (_, byHash) = status
it("should return (Some, Set.nonEmpty)") { byHash
}
describe("when remote key exists, unmodified and other key matches the hash") {
it("should return the match by key") {
val result = getMatchesByKey(invoke(localFile))
assert(result.contains(HashModified(hash, lastModified)))
}
it("should return both matches for the hash") {
val result = getMatchesByHash(invoke(localFile))
assertResult( assertResult(
(Some(HashModified(hash, lastModified)), Set(
Set( (hash, KeyModified(key, lastModified)),
KeyModified(key, lastModified), (hash, KeyModified(keyOtherKey.remoteKey, lastModified)))
KeyModified(keyotherkey.remoteKey, lastModified))) )(result)
)(invoke(storageService, localFile))
} }
} }
describe("when remote key does not exist and no others matches hash") { describe("when remote key does not exist and no others matches hash") {
val storageService = S3StorageServiceBuilder.defaultStorageService val localFile = LocalFile.resolve("missing-file", md5HashMap(MD5Hash("unique")), source, fileToKey)
it("should return (None, Set.empty)") { it("should return no matches by key") {
val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey) val result = getMatchesByKey(invoke(localFile))
assertResult( assert(result.isEmpty)
(None, }
Set.empty) it("should return no matches by hash") {
)(invoke(storageService, localFile)) val result = getMatchesByHash(invoke(localFile))
assert(result.isEmpty)
} }
} }
describe("when remote key exists and no others match hash") { describe("when remote key exists and no others match hash") {
val storageService = S3StorageServiceBuilder.defaultStorageService val localFile = keyDiffHash
it("should return (None, Set.nonEmpty)") { it("should return the match by key") {
val result = getMatchesByKey(invoke(localFile))
assert(result.contains(HashModified(diffHash, lastModified)))
}
it("should return one match by hash") {
val result = getMatchesByHash(invoke(localFile))
assertResult( assertResult(
(Some(HashModified(diffhash, lastModified)), Set(
Set(KeyModified(keydiffhash.remoteKey, lastModified))) (diffHash, KeyModified(keyDiffHash.remoteKey, lastModified)))
)(invoke(storageService, keydiffhash)) )(result)
} }
} }
} }
private def md5HashMap(hash: MD5Hash) = {
Map("md5" -> hash)
}
describe("upload") { describe("upload") {
describe("when uploading a file") { describe("when uploading a file") {
@ -90,7 +111,7 @@ class StorageServiceSuite
val prefix = RemoteKey("prefix") val prefix = RemoteKey("prefix")
val localFile = val localFile =
LocalFile.resolve("root-file", rootHash, source, KeyGenerator.generateKey(source, prefix)) LocalFile.resolve("root-file", md5HashMap(Root.hash), source, KeyGenerator.generateKey(source, prefix))
val bucket = Bucket("a-bucket") val bucket = Bucket("a-bucket")
val remoteKey = RemoteKey("prefix/root-file") val remoteKey = RemoteKey("prefix/root-file")
val uploadEventListener = new UploadEventListener(localFile) val uploadEventListener = new UploadEventListener(localFile)
@ -99,13 +120,13 @@ class StorageServiceSuite
(amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload) (amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload)
val uploadResult = stub[UploadResult] val uploadResult = stub[UploadResult]
(upload.waitForUploadResult _).when().returns(uploadResult) (upload.waitForUploadResult _).when().returns(uploadResult)
(uploadResult.getETag _).when().returns(rootHash.hash) (uploadResult.getETag _).when().returns(Root.hash.hash)
(uploadResult.getKey _).when().returns(remoteKey.key) (uploadResult.getKey _).when().returns(remoteKey.key)
it("should return hash of uploaded file") { it("should return hash of uploaded file") {
pending pending
//FIXME: works okay on its own, but fails when run with others //FIXME: works okay on its own, but fails when run with others
val expected = UploadQueueEvent(remoteKey, rootHash) val expected = UploadQueueEvent(remoteKey, Root.hash)
val result = storageService.upload(localFile, bucket, uploadEventListener, 1) val result = storageService.upload(localFile, bucket, uploadEventListener, 1)
assertResult(expected)(result) assertResult(expected)(result)
} }

View file

@ -22,6 +22,11 @@ class UploaderSuite
private val fileToKey = generateKey(config.source, config.prefix) _ private val fileToKey = generateKey(config.source, config.prefix) _
val lastModified = LastModified(Instant.now()) val lastModified = LastModified(Instant.now())
def md5HashMap(hash: MD5Hash): Map[String, MD5Hash] =
Map(
"md5" -> hash
)
describe("S3ClientMultiPartTransferManagerSuite") { describe("S3ClientMultiPartTransferManagerSuite") {
describe("upload") { describe("upload") {
pending pending
@ -31,7 +36,7 @@ class UploaderSuite
// dies when putObject is called // dies when putObject is called
val returnedKey = RemoteKey("returned-key") val returnedKey = RemoteKey("returned-key")
val returnedHash = MD5Hash("returned-hash") val returnedHash = MD5Hash("returned-hash")
val bigFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey) val bigFile = LocalFile.resolve("small-file", md5HashMap(MD5Hash("the-hash")), source, fileToKey)
val uploadEventListener = new UploadEventListener(bigFile) val uploadEventListener = new UploadEventListener(bigFile)
val amazonS3 = mock[AmazonS3] val amazonS3 = mock[AmazonS3]
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build