Is AWS SDK calculating MD5Hash again for a local file? (#50)

* [aws-lib] Uploader provide request with the already calculated md5 hash

* [aws-lib] remove unused accepts method

* [aws-lib] Uploader refactoring

* [domain] Config remove unused threshold and max retries items

* [core] Show upload errors in summary

* [domain] LocalFile add helper to explicitly compare by hash value

Looking to add an optional field to MD5Hash but we want to do our
checks here only on the hash value, not whether a digest is available
or not.

* [core] Sync refactoring

* [core] SyncSuite invoke subject inside it method and after declaring expectations

* [core] SyncSuite use the localfile hash rather than something arbitrary

* [cli] Add `--no-global` and `--no-user` options

* [core] LocalFileStream refactoring

* [core] SyncSuite: ignore user and global configuration files

* [domain] MD5Hash now can optionally store the base64 encoded hash

* [core] MD5HashGenerator pass the digest to MD5Hash

* [aws-lib] Uploader use the base64 encoded hash

* [changelog] updated
This commit is contained in:
Paul Campbell 2019-06-21 19:20:35 +01:00 committed by GitHub
parent 7e9db432d7
commit 761c1c9784
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 138 additions and 98 deletions

View file

@ -5,7 +5,7 @@ All notable changes to this project will be documented in this file.
The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]], and this project adheres to
[[https://semver.org/spec/v2.0.0.html][Semantic Versioning]].
* [0.5.0] - ????-??-??
* [0.5.0] - 2019-06-21
** Added
@ -15,6 +15,8 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
- Read config from ~.thorp.conf~ in source directory (#71)
- Read config from ~$HOME/.config/thorp.conf~ and ~/etc/thorp.conf~
(#73)
- Add ~--no-global~ and ~--no-user~ options (#50)
- Display any upload errors in summary report (#50)
** Changed
@ -22,6 +24,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
- Suppress Transfer event messages (#64)
- Better error message when source not found (#51)
- Reduced logging (#59)
- Prevent AWS SDK from recalculating MD5 hash (#50)
** Fixed

View file

@ -22,6 +22,8 @@ hash of the file contents.
-i, --include <value> Include matching paths
-x, --exclude <value> Exclude matching paths
-d, --debug Enable debug logging
--no-global Ignore global configuration
--no-user Ignore user configuration
#+end_example
If you don't provide a ~source~ the current directory will be used.
@ -33,7 +35,7 @@ The ~--include~ and ~--exclude~ parameters can be used more than once.
Configuration will be read from these files:
- Global: ~/etc/thorp.conf~
- User: ~~/.config/thorp.conf~
- User: ~ ~/.config/thorp.conf~
- Source: ~${source}/.thorp.conf~
Command line arguments override those in Source, which override those

View file

@ -13,9 +13,7 @@ trait S3Client {
def upload(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
tryCount: Int)
(implicit logger: Logger): IO[S3Action]
def copy(bucket: Bucket,

View file

@ -31,11 +31,9 @@ class ThorpS3Client(amazonS3Client: => AmazonS3,
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
tryCount: Int)
(implicit logger: Logger): IO[S3Action] =
uploader.upload(localFile, bucket, progressListener, multiPartThreshold, 1, maxRetries)
uploader.upload(localFile, bucket, progressListener, 1)
override def delete(bucket: Bucket,
remoteKey: RemoteKey)

View file

@ -2,7 +2,7 @@ package net.kemitix.thorp.aws.lib
import cats.effect.IO
import com.amazonaws.event.{ProgressEvent, ProgressEventType, ProgressListener}
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest}
import com.amazonaws.services.s3.transfer.model.UploadResult
import com.amazonaws.services.s3.transfer.{TransferManager => AmazonTransferManager}
import net.kemitix.thorp.aws.api.S3Action.{ErroredS3Action, UploadS3Action}
@ -15,25 +15,20 @@ import scala.util.Try
class Uploader(transferManager: => AmazonTransferManager) {
def accepts(localFile: LocalFile)
(implicit multiPartThreshold: Long): Boolean =
localFile.file.length >= multiPartThreshold
def upload(localFile: LocalFile,
bucket: Bucket,
uploadProgressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
tryCount: Int)
(implicit logger: Logger): IO[S3Action] =
for {
_ <- logMultiPartUploadStart(localFile, tryCount)
upload <- transfer(localFile, bucket, uploadProgressListener)
_ <- logMultiPartUploadFinished(localFile)
} yield upload match {
action = upload match {
case Right(r) => UploadS3Action(RemoteKey(r.getKey), MD5Hash(r.getETag))
case Left(e) => ErroredS3Action(localFile.remoteKey, e)
}
_ <- logMultiPartUploadFinished(localFile)
} yield action
private def transfer(localFile: LocalFile,
bucket: Bucket,
@ -48,9 +43,13 @@ class Uploader(transferManager: => AmazonTransferManager) {
}
}
private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest =
private def request(localFile: LocalFile, bucket: Bucket, listener: ProgressListener): PutObjectRequest = {
val metadata = new ObjectMetadata()
localFile.hash.hash64.foreach(metadata.setContentMD5)
new PutObjectRequest(bucket.name, localFile.remoteKey.key, localFile.file)
.withMetadata(metadata)
.withGeneralProgressListener(listener)
}
private def progressListener(uploadProgressListener: UploadProgressListener) =
new ProgressListener {

View file

@ -106,7 +106,7 @@ class S3ClientSuite
pending
//FIXME: works okay on its own, but fails when run with others
val expected = UploadS3Action(remoteKey, rootHash)
val result = s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries)
val result = s3Client.upload(localFile, bucket, progressListener, 1)
assertResult(expected)(result)
}
}

View file

@ -24,28 +24,6 @@ class UploaderSuite
val lastModified = LastModified(Instant.now())
describe("S3ClientMultiPartTransferManagerSuite") {
describe("accepts") {
val transferManager = stub[TransferManager]
val uploader = new Uploader(transferManager)
describe("small-file") {
val smallFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey)
it("should be a small-file") {
assert(smallFile.file.length < 5 * 1024 * 1024)
}
it("should not accept small-file") {
assertResult(false)(uploader.accepts(smallFile)(config.multiPartThreshold))
}
}
describe("big-file") {
val bigFile = LocalFile.resolve("big-file", MD5Hash("the-hash"), source, fileToKey)
it("should be a big-file") {
assert(bigFile.file.length > 5 * 1024 * 1024)
}
it("should accept big-file") {
assertResult(true)(uploader.accepts(bigFile)(config.multiPartThreshold))
}
}
}
describe("upload") {
pending
// how much of this test is testing the amazonTransferManager
@ -61,7 +39,7 @@ class UploaderSuite
val uploader = new Uploader(amazonS3TransferManager)
it("should upload") {
val expected = UploadS3Action(returnedKey, returnedHash)
val result = uploader.upload(bigFile, config.bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries)
val result = uploader.upload(bigFile, config.bucket, progressListener, 1)
assertResult(expected)(result)
}
}

View file

@ -32,7 +32,13 @@ object ParseArgs {
.text("Exclude matching paths"),
opt[Unit]('d', "debug")
.action((_, cos) => ConfigOption.Debug() :: cos)
.text("Enable debug logging")
.text("Enable debug logging"),
opt[Unit]("no-global")
.action((_, cos) => ConfigOption.IgnoreGlobalOptions :: cos)
.text("Ignore global configuration"),
opt[Unit]("no-user")
.action((_, cos) => ConfigOption.IgnoreUserOptions :: cos)
.text("Ignore user configuration")
)
}

View file

@ -11,7 +11,7 @@ object ActionGenerator {
// #1 local exists, remote exists, remote matches - do nothing
case S3MetaData(localFile, _, Some(RemoteMetaData(remoteKey, remoteHash, _)))
if localFile.hash == remoteHash
if localFile.matches(remoteHash)
=> doNothing(c.bucket, remoteKey)
// #2 local exists, remote is missing, other matches - copy
@ -26,7 +26,7 @@ object ActionGenerator {
// #4 local exists, remote exists, remote no match, other matches - copy
case S3MetaData(localFile, otherMatches, Some(RemoteMetaData(_, remoteHash, _)))
if localFile.hash != remoteHash &&
if !localFile.matches(remoteHash) &&
otherMatches.nonEmpty
=> copyFile(c.bucket, localFile, otherMatches)

View file

@ -18,7 +18,7 @@ object ActionSubmitter {
for {
_ <- logger.info(s" Upload: ${localFile.relative}")
progressListener = new UploadProgressListener(localFile)
action <- s3Client.upload(localFile, bucket, progressListener, c.multiPartThreshold, 1, c.maxRetries)
action <- s3Client.upload(localFile, bucket, progressListener, 1)
} yield action
case ToCopy(bucket, sourceKey, hash, targetKey) =>
for {

View file

@ -28,4 +28,11 @@ object ConfigOption {
case class Debug() extends ConfigOption {
override def update(config: Config): Config = config.copy(debug = true)
}
case object IgnoreUserOptions extends ConfigOption {
override def update(config: Config): Config = config
}
case object IgnoreGlobalOptions extends ConfigOption {
override def update(config: Config): Config = config
}
}

View file

@ -0,0 +1,19 @@
package net.kemitix.thorp.core
/** Queries over a collection of configuration options. */
trait ConfigQuery {

  /** True when the options request that the user config file be skipped. */
  def ignoreUserOptions(configOptions: Seq[ConfigOption]): Boolean =
    configOptions.contains(ConfigOption.IgnoreUserOptions)

  /** True when the options request that the global config file be skipped. */
  def ignoreGlobalOptions(configOptions: Seq[ConfigOption]): Boolean =
    configOptions.contains(ConfigOption.IgnoreGlobalOptions)
}

object ConfigQuery extends ConfigQuery

View file

@ -23,8 +23,8 @@ trait ConfigurationBuilder {
val source = findSource(priorityOptions)
for {
sourceOptions <- sourceOptions(source)
userOptions <- userOptions()
globalOptions <- globalOptions()
userOptions <- userOptions(priorityOptions ++ sourceOptions)
globalOptions <- globalOptions(priorityOptions ++ sourceOptions ++ userOptions)
collected = priorityOptions ++ sourceOptions ++ userOptions ++ globalOptions
config = collateOptions(collected)
} yield validateConfig(config).toEither
@ -39,11 +39,13 @@ trait ConfigurationBuilder {
private def sourceOptions(source: File): IO[Seq[ConfigOption]] =
readFile(source, ".thorp.conf")
private def userOptions(): IO[Seq[ConfigOption]] =
readFile(userHome, ".config/thorp.conf")
private def userOptions(higherPriorityOptions: Seq[ConfigOption]): IO[Seq[ConfigOption]] =
if (ConfigQuery.ignoreUserOptions(higherPriorityOptions)) IO(List())
else readFile(userHome, ".config/thorp.conf")
private def globalOptions(): IO[Seq[ConfigOption]] =
parseFile(Paths.get("/etc/thorp.conf"))
private def globalOptions(higherPriorityOptions: Seq[ConfigOption]): IO[Seq[ConfigOption]] =
if (ConfigQuery.ignoreGlobalOptions(higherPriorityOptions)) IO(List())
else parseFile(Paths.get("/etc/thorp.conf"))
private def userHome = new File(System.getProperty("user.home"))

View file

@ -20,10 +20,7 @@ object LocalFileStream {
def loop(file: File): IO[Stream[LocalFile]] = {
def dirPaths(file: File): IO[Stream[File]] =
IO.pure {
Option(file.listFiles)
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file"))
}
IO(listFiles(file))
.map(fs =>
Stream(fs: _*)
.filter(f => filters(f.toPath)))
@ -50,4 +47,10 @@ object LocalFileStream {
loop(file)
}
//TODO: Change this to return an Either[IllegalArgumentException, Array[File]]
private def listFiles(file: File) = {
Option(file.listFiles)
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file"))
}
}

View file

@ -42,20 +42,20 @@ object MD5HashGenerator {
val buffer = readToBuffer(fis, currentOffset)
md5 update buffer
}}
(md5.digest map ("%02x" format _)).mkString
md5.digest
}
def readFile: IO[String] =
def readFile =
for {
fis <- openFile
md5 <- digestFile(fis)
digest <- digestFile(fis)
_ <- closeFile(fis)
} yield md5
} yield digest
for {
_ <- logger.debug(s"md5:reading:size ${file.length}:$file")
md5 <- readFile
hash = MD5Hash(md5)
digest <- readFile
hash = MD5Hash.fromDigest(digest)
_ <- logger.debug(s"md5:generated:${hash.hash}:$file")
} yield hash
}

View file

@ -24,8 +24,13 @@ trait Sync {
(configOptions: Seq[ConfigOption])
(implicit defaultLogger: Logger): IO[Either[List[String], Unit]] =
buildConfig(configOptions).flatMap {
case Right(config) => runWithValidConfig(s3Client, defaultLogger, config)
case Left(errors) => IO.pure(Left(errorMessages(errors.toList)))
case Right(config) =>
}
private def runWithValidConfig(s3Client: S3Client,
defaultLogger: Logger,
config: Config) = {
for {
_ <- run(config, s3Client, defaultLogger.withDebug(config.debug))
} yield Right(())

View file

@ -1,6 +1,7 @@
package net.kemitix.thorp.core
import cats.effect.IO
import cats.implicits._
import net.kemitix.thorp.aws.api.S3Action
import net.kemitix.thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, ErroredS3Action, UploadS3Action}
import net.kemitix.thorp.domain.{Config, Logger}
@ -16,6 +17,15 @@ object SyncLogging {
logger: Logger): IO[Unit] =
logger.info(s"Scanning local files: ${c.source}...")
def logErrors(actions: Stream[S3Action])
(implicit logger: Logger): IO[Unit] =
for {
_ <- actions.map {
case ErroredS3Action(k, e) => logger.warn(s"${k.key}: ${e.getMessage}")
case _ => IO.unit
}.sequence
} yield ()
def logRunFinished(actions: Stream[S3Action])
(implicit c: Config,
logger: Logger): IO[Unit] = {
@ -25,10 +35,12 @@ object SyncLogging {
_ <- logger.info(s"Copied ${counters.copied} files")
_ <- logger.info(s"Deleted ${counters.deleted} files")
_ <- logger.info(s"Errors ${counters.errors}")
_ <- logErrors(actions)
} yield ()
}
private def countActivities(implicit c: Config): (Counters, S3Action) => Counters =
private def countActivities(implicit c: Config,
logger: Logger): (Counters, S3Action) => Counters =
(counters: Counters, s3Action: S3Action) => {
s3Action match {
case _: UploadS3Action =>

View file

@ -4,7 +4,7 @@ import net.kemitix.thorp.domain.MD5Hash
object MD5HashData {
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e", Some("o6asEaDrV3uBs7tclcyKbg=="))
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")

View file

@ -21,7 +21,7 @@ class MD5HashGeneratorTest extends FunSpec {
describe("read a large file (bigger than buffer)") {
val file = Resource(this, "big-file")
it("should generate the correct hash") {
val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8", Some("sasfdoATjm23MJIAWE412A=="))
val result = MD5HashGenerator.md5File(file).unsafeRunSync
assertResult(expected)(result)
}

View file

@ -19,7 +19,9 @@ class SyncSuite
private val configOptions = List(
ConfigOption.Source(source.toPath),
ConfigOption.Bucket("bucket"),
ConfigOption.Prefix("prefix")
ConfigOption.Prefix("prefix"),
ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions
)
implicit private val logger: Logger = new DummyLogger
private val lastModified = LastModified(Instant.now)
@ -41,20 +43,21 @@ class SyncSuite
val s3Client = new RecordingClient(testBucket, S3ObjectsData(
byHash = Map(),
byKey = Map()))
invokeSubject(s3Client, configOptions)
it("uploads all files") {
val expectedUploads = Map(
"subdir/leaf-file" -> leafRemoteKey,
"root-file" -> rootRemoteKey
)
"root-file" -> rootRemoteKey)
invokeSubject(s3Client, configOptions)
assertResult(expectedUploads)(s3Client.uploadsRecord)
}
it("copies nothing") {
val expectedCopies = Map()
invokeSubject(s3Client, configOptions)
assertResult(expectedCopies)(s3Client.copiesRecord)
}
it("deletes nothing") {
val expectedDeletions = Set()
invokeSubject(s3Client, configOptions)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
}
}
@ -67,17 +70,19 @@ class SyncSuite
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
invokeSubject(s3Client, configOptions)
it("uploads nothing") {
val expectedUploads = Map()
invokeSubject(s3Client, configOptions)
assertResult(expectedUploads)(s3Client.uploadsRecord)
}
it("copies nothing") {
val expectedCopies = Map()
invokeSubject(s3Client, configOptions)
assertResult(expectedCopies)(s3Client.copiesRecord)
}
it("deletes nothing") {
val expectedDeletions = Set()
invokeSubject(s3Client, configOptions)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
}
}
@ -91,17 +96,19 @@ class SyncSuite
RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
invokeSubject(s3Client, configOptions)
it("uploads nothing") {
invokeSubject(s3Client, configOptions)
val expectedUploads = Map()
assertResult(expectedUploads)(s3Client.uploadsRecord)
}
it("copies the file") {
val expectedCopies = Map(RemoteKey("prefix/root-file-old") -> RemoteKey("prefix/root-file"))
invokeSubject(s3Client, configOptions)
assertResult(expectedCopies)(s3Client.copiesRecord)
}
it("deletes the original") {
val expectedDeletions = Set(RemoteKey("prefix/root-file-old"))
invokeSubject(s3Client, configOptions)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
}
}
@ -119,20 +126,20 @@ class SyncSuite
byKey = Map(
deletedKey -> HashModified(deletedHash, lastModified)))
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
invokeSubject(s3Client, configOptions)
it("deleted key") {
val expectedDeletions = Set(deletedKey)
invokeSubject(s3Client, configOptions)
assertResult(expectedDeletions)(s3Client.deletionsRecord)
}
}
describe("when a file is excluded") {
val s3ObjectsData = S3ObjectsData(Map(), Map())
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
invokeSubject(s3Client, ConfigOption.Exclude("leaf") :: configOptions)
it("is not uploaded") {
val expectedUploads = Map(
"root-file" -> rootRemoteKey
)
invokeSubject(s3Client, ConfigOption.Exclude("leaf") :: configOptions)
assertResult(expectedUploads)(s3Client.uploadsRecord)
}
}
@ -154,13 +161,11 @@ class SyncSuite
override def upload(localFile: LocalFile,
bucket: Bucket,
progressListener: UploadProgressListener,
multiPartThreshold: Long,
tryCount: Int,
maxRetries: Int)
tryCount: Int)
(implicit logger: Logger): IO[UploadS3Action] = {
if (bucket == testBucket)
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)
IO.pure(UploadS3Action(localFile.remoteKey, MD5Hash("some hash value")))
IO.pure(UploadS3Action(localFile.remoteKey, localFile.hash))
}
override def copy(bucket: Bucket,

View file

@ -2,14 +2,8 @@ package net.kemitix.thorp.domain
import java.io.File
final case class Config(
bucket: Bucket = Bucket(""),
final case class Config(bucket: Bucket = Bucket(""),
prefix: RemoteKey = RemoteKey(""),
filters: List[Filter] = List(),
multiPartThreshold: Long = 1024 * 1024 * 5,
maxRetries: Int = 3,
debug: Boolean = false,
source: File
) {
require(multiPartThreshold >= 1024 * 1024 * 5, s"Threshold for multi-part upload is 5Mb: '$multiPartThreshold'")
}
source: File)

View file

@ -15,6 +15,8 @@ final case class LocalFile(file: File, source: File, hash: MD5Hash, keyGenerator
// the path of the file within the source
def relative: Path = source.toPath.relativize(file.toPath)
def matches(other: MD5Hash): Boolean = hash.hash == other.hash
}
object LocalFile {

View file

@ -1,9 +1,16 @@
package net.kemitix.thorp.domain
import java.util.Base64
import net.kemitix.thorp.domain.QuoteStripper.stripQuotes
final case class MD5Hash(in: String) {
/** An MD5 hash value, optionally paired with the base64 encoding of the
  * raw digest (used to populate the Content-MD5 object metadata).
  *
  * @param in     the hex-encoded hash, possibly surrounded by quotes
  * @param hash64 base64 encoding of the raw digest bytes, when available
  */
final case class MD5Hash(in: String, hash64: Option[String] = None) {
  // The hex hash with any surrounding quote characters stripped.
  lazy val hash: String = in.filter(stripQuotes)
}

object MD5Hash {
  /** Builds an MD5Hash from a raw digest, recording both the hex and
    * base64 encodings of the same bytes. */
  def fromDigest(digest: Array[Byte]): MD5Hash = {
    val hex = digest.map(b => "%02x".format(b)).mkString
    val base64 = Base64.getEncoder.encodeToString(digest)
    MD5Hash(hex, Some(base64))
  }
}