[core] MD5HashGenerator uses IO to return where there is file IO (#47)
* [core] MD5HashGenerator uses IO to return where there is file IO This required that LocalFile in the domain module no longer be supplied with a function to convert a File into an MD5Hash. Because such a function requires reading the file it now must use IO, which we don't allow in the domain module. Unfortunate ripple effects out to users of MD5HashGenerator and LocalFile. * [aws-lib] Add own copy of test class MD5HashData
This commit is contained in:
parent
f6bf2700ff
commit
7ffa386b29
14 changed files with 185 additions and 159 deletions
|
@ -0,0 +1,11 @@
|
||||||
|
package net.kemitix.s3thorp.aws.lib
|
||||||
|
|
||||||
|
import net.kemitix.s3thorp.domain.MD5Hash
|
||||||
|
|
||||||
|
object MD5HashData {
|
||||||
|
|
||||||
|
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
|
||||||
|
|
||||||
|
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
|
||||||
|
|
||||||
|
}
|
|
@ -1,6 +1,5 @@
|
||||||
package net.kemitix.s3thorp.aws.lib
|
package net.kemitix.s3thorp.aws.lib
|
||||||
|
|
||||||
import java.io.File
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
import com.amazonaws.services.s3.AmazonS3
|
import com.amazonaws.services.s3.AmazonS3
|
||||||
|
@ -9,7 +8,8 @@ import com.amazonaws.services.s3.transfer.model.UploadResult
|
||||||
import com.amazonaws.services.s3.transfer.{TransferManager, Upload}
|
import com.amazonaws.services.s3.transfer.{TransferManager, Upload}
|
||||||
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
|
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
|
||||||
import net.kemitix.s3thorp.aws.api.{S3Client, UploadProgressListener}
|
import net.kemitix.s3thorp.aws.api.{S3Client, UploadProgressListener}
|
||||||
import net.kemitix.s3thorp.core.{KeyGenerator, MD5HashGenerator, Resource, S3MetaDataEnricher}
|
import net.kemitix.s3thorp.aws.lib.MD5HashData.rootHash
|
||||||
|
import net.kemitix.s3thorp.core.{KeyGenerator, Resource, S3MetaDataEnricher}
|
||||||
import net.kemitix.s3thorp.domain._
|
import net.kemitix.s3thorp.domain._
|
||||||
import org.scalamock.scalatest.MockFactory
|
import org.scalamock.scalatest.MockFactory
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
@ -25,15 +25,14 @@ class S3ClientSuite
|
||||||
implicit private val logInfo: Int => String => Unit = l => m => ()
|
implicit private val logInfo: Int => String => Unit = l => m => ()
|
||||||
implicit private val logWarn: String => Unit = w => ()
|
implicit private val logWarn: String => Unit = w => ()
|
||||||
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
|
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
|
||||||
private val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
|
|
||||||
|
|
||||||
describe("getS3Status") {
|
describe("getS3Status") {
|
||||||
val hash = MD5Hash("hash")
|
val hash = MD5Hash("hash")
|
||||||
val localFile = LocalFile.resolve("the-file", hash, source, fileToKey, fileToHash)
|
val localFile = LocalFile.resolve("the-file", hash, source, fileToKey)
|
||||||
val key = localFile.remoteKey
|
val key = localFile.remoteKey
|
||||||
val keyotherkey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey, fileToHash)
|
val keyotherkey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey)
|
||||||
val diffhash = MD5Hash("diff")
|
val diffhash = MD5Hash("diff")
|
||||||
val keydiffhash = LocalFile.resolve("other-key-diff-hash", diffhash, source, fileToKey, fileToHash)
|
val keydiffhash = LocalFile.resolve("other-key-diff-hash", diffhash, source, fileToKey)
|
||||||
val lastModified = LastModified(Instant.now)
|
val lastModified = LastModified(Instant.now)
|
||||||
val s3ObjectsData: S3ObjectsData = S3ObjectsData(
|
val s3ObjectsData: S3ObjectsData = S3ObjectsData(
|
||||||
byHash = Map(
|
byHash = Map(
|
||||||
|
@ -63,7 +62,7 @@ class S3ClientSuite
|
||||||
describe("when remote key does not exist and no others matches hash") {
|
describe("when remote key does not exist and no others matches hash") {
|
||||||
val s3Client = S3ClientBuilder.defaultClient
|
val s3Client = S3ClientBuilder.defaultClient
|
||||||
it("should return (None, Set.empty)") {
|
it("should return (None, Set.empty)") {
|
||||||
val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey, fileToHash)
|
val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey)
|
||||||
assertResult(
|
assertResult(
|
||||||
(None,
|
(None,
|
||||||
Set.empty)
|
Set.empty)
|
||||||
|
@ -91,23 +90,23 @@ class S3ClientSuite
|
||||||
val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager)
|
val s3Client = new ThorpS3Client(amazonS3, amazonS3TransferManager)
|
||||||
|
|
||||||
val prefix = RemoteKey("prefix")
|
val prefix = RemoteKey("prefix")
|
||||||
val md5Hash = MD5HashGenerator.md5File(source.toPath.resolve("root-file").toFile)
|
val localFile =
|
||||||
val localFile: LocalFile = LocalFile.resolve("root-file", md5Hash, source, KeyGenerator.generateKey(source, prefix), fileToHash)
|
LocalFile.resolve("root-file", rootHash, source, KeyGenerator.generateKey(source, prefix))
|
||||||
val bucket: Bucket = Bucket("a-bucket")
|
val bucket = Bucket("a-bucket")
|
||||||
val remoteKey: RemoteKey = RemoteKey("prefix/root-file")
|
val remoteKey = RemoteKey("prefix/root-file")
|
||||||
val progressListener = new UploadProgressListener(localFile)
|
val progressListener = new UploadProgressListener(localFile)
|
||||||
|
|
||||||
val upload = stub[Upload]
|
val upload = stub[Upload]
|
||||||
(amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload)
|
(amazonS3TransferManager upload (_: PutObjectRequest)).when(*).returns(upload)
|
||||||
val uploadResult = stub[UploadResult]
|
val uploadResult = stub[UploadResult]
|
||||||
(upload.waitForUploadResult _).when().returns(uploadResult)
|
(upload.waitForUploadResult _).when().returns(uploadResult)
|
||||||
(uploadResult.getETag _).when().returns(md5Hash.hash)
|
(uploadResult.getETag _).when().returns(rootHash.hash)
|
||||||
(uploadResult.getKey _).when().returns(remoteKey.key)
|
(uploadResult.getKey _).when().returns(remoteKey.key)
|
||||||
|
|
||||||
it("should return hash of uploaded file") {
|
it("should return hash of uploaded file") {
|
||||||
pending
|
pending
|
||||||
//FIXME: works okay on its own, but fails when run with others
|
//FIXME: works okay on its own, but fails when run with others
|
||||||
val expected = UploadS3Action(remoteKey, md5Hash)
|
val expected = UploadS3Action(remoteKey, rootHash)
|
||||||
val result = s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync
|
val result = s3Client.upload(localFile, bucket, progressListener, config.multiPartThreshold, 1, config.maxRetries).unsafeRunSync
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,17 +1,13 @@
|
||||||
package net.kemitix.s3thorp.aws.lib
|
package net.kemitix.s3thorp.aws.lib
|
||||||
|
|
||||||
import java.io.File
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
import com.amazonaws.AmazonClientException
|
import com.amazonaws.services.s3.AmazonS3
|
||||||
import com.amazonaws.event.ProgressListener
|
|
||||||
import com.amazonaws.services.s3.{AmazonS3, model}
|
|
||||||
import com.amazonaws.services.s3.transfer.model.UploadResult
|
|
||||||
import com.amazonaws.services.s3.transfer._
|
import com.amazonaws.services.s3.transfer._
|
||||||
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
|
import net.kemitix.s3thorp.aws.api.S3Action.UploadS3Action
|
||||||
import net.kemitix.s3thorp.aws.api.UploadProgressListener
|
import net.kemitix.s3thorp.aws.api.UploadProgressListener
|
||||||
import net.kemitix.s3thorp.core.KeyGenerator.generateKey
|
import net.kemitix.s3thorp.core.KeyGenerator.generateKey
|
||||||
import net.kemitix.s3thorp.core.{MD5HashGenerator, Resource}
|
import net.kemitix.s3thorp.core.Resource
|
||||||
import net.kemitix.s3thorp.domain._
|
import net.kemitix.s3thorp.domain._
|
||||||
import org.scalamock.scalatest.MockFactory
|
import org.scalamock.scalatest.MockFactory
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
@ -26,7 +22,6 @@ class S3ClientTransferManagerSuite
|
||||||
implicit private val logInfo: Int => String => Unit = l => m => ()
|
implicit private val logInfo: Int => String => Unit = l => m => ()
|
||||||
implicit private val logWarn: String => Unit = w => ()
|
implicit private val logWarn: String => Unit = w => ()
|
||||||
private val fileToKey = generateKey(config.source, config.prefix) _
|
private val fileToKey = generateKey(config.source, config.prefix) _
|
||||||
private val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
|
|
||||||
val lastModified = LastModified(Instant.now())
|
val lastModified = LastModified(Instant.now())
|
||||||
|
|
||||||
describe("S3ClientMultiPartTransferManagerSuite") {
|
describe("S3ClientMultiPartTransferManagerSuite") {
|
||||||
|
@ -34,7 +29,7 @@ class S3ClientTransferManagerSuite
|
||||||
val transferManager = stub[TransferManager]
|
val transferManager = stub[TransferManager]
|
||||||
val uploader = new S3ClientTransferManager(transferManager)
|
val uploader = new S3ClientTransferManager(transferManager)
|
||||||
describe("small-file") {
|
describe("small-file") {
|
||||||
val smallFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey, fileToHash)
|
val smallFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey)
|
||||||
it("should be a small-file") {
|
it("should be a small-file") {
|
||||||
assert(smallFile.file.length < 5 * 1024 * 1024)
|
assert(smallFile.file.length < 5 * 1024 * 1024)
|
||||||
}
|
}
|
||||||
|
@ -43,7 +38,7 @@ class S3ClientTransferManagerSuite
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("big-file") {
|
describe("big-file") {
|
||||||
val bigFile = LocalFile.resolve("big-file", MD5Hash("the-hash"), source, fileToKey, fileToHash)
|
val bigFile = LocalFile.resolve("big-file", MD5Hash("the-hash"), source, fileToKey)
|
||||||
it("should be a big-file") {
|
it("should be a big-file") {
|
||||||
assert(bigFile.file.length > 5 * 1024 * 1024)
|
assert(bigFile.file.length > 5 * 1024 * 1024)
|
||||||
}
|
}
|
||||||
|
@ -60,7 +55,7 @@ class S3ClientTransferManagerSuite
|
||||||
// dies when putObject is called
|
// dies when putObject is called
|
||||||
val returnedKey = RemoteKey("returned-key")
|
val returnedKey = RemoteKey("returned-key")
|
||||||
val returnedHash = MD5Hash("returned-hash")
|
val returnedHash = MD5Hash("returned-hash")
|
||||||
val bigFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey, fileToHash)
|
val bigFile = LocalFile.resolve("small-file", MD5Hash("the-hash"), source, fileToKey)
|
||||||
val progressListener = new UploadProgressListener(bigFile)
|
val progressListener = new UploadProgressListener(bigFile)
|
||||||
val amazonS3 = mock[AmazonS3]
|
val amazonS3 = mock[AmazonS3]
|
||||||
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
|
val amazonS3TransferManager = TransferManagerBuilder.standard().withS3Client(amazonS3).build
|
||||||
|
|
|
@ -2,35 +2,57 @@ package net.kemitix.s3thorp.core
|
||||||
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
|
import cats.effect.IO
|
||||||
import net.kemitix.s3thorp.core.KeyGenerator.generateKey
|
import net.kemitix.s3thorp.core.KeyGenerator.generateKey
|
||||||
import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash}
|
import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash}
|
||||||
|
|
||||||
object LocalFileStream {
|
object LocalFileStream {
|
||||||
|
|
||||||
def findFiles(file: File,
|
def findFiles(file: File,
|
||||||
md5HashGenerator: File => MD5Hash,
|
md5HashGenerator: File => IO[MD5Hash],
|
||||||
info: Int => String => Unit)
|
info: Int => String => Unit)
|
||||||
(implicit c: Config): Stream[LocalFile] = {
|
(implicit c: Config): IO[Stream[LocalFile]] = {
|
||||||
def loop(file: File): Stream[LocalFile] = {
|
|
||||||
info(2)(s"- Entering: $file")
|
def loop(file: File): IO[Stream[LocalFile]] = {
|
||||||
val files = for {
|
|
||||||
f <- dirPaths(file)
|
def dirPaths(file: File): IO[Stream[File]] =
|
||||||
.filter { f => f.isDirectory || c.filters.forall { filter => filter isIncluded f.toPath } }
|
IO {
|
||||||
.filter { f => c.excludes.forall { exclude => exclude isIncluded f.toPath } }
|
Option(file.listFiles)
|
||||||
fs <- recurseIntoSubDirectories(f)
|
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file"))
|
||||||
} yield fs
|
}
|
||||||
info(5)(s"- Leaving: $file")
|
.map(fs =>
|
||||||
files
|
Stream(fs: _*)
|
||||||
|
.filter(isIncluded))
|
||||||
|
|
||||||
|
def recurseIntoSubDirectories(file: File)(implicit c: Config): IO[Stream[LocalFile]] =
|
||||||
|
file match {
|
||||||
|
case f if f.isDirectory => loop(file)
|
||||||
|
case _ => for(hash <- md5HashGenerator(file))
|
||||||
|
yield Stream(LocalFile(file, c.source, hash, generateKey(c.source, c.prefix)))
|
||||||
|
}
|
||||||
|
|
||||||
|
def filterIsIncluded(f: File): Boolean =
|
||||||
|
f.isDirectory || c.filters.forall { filter => filter isIncluded f.toPath }
|
||||||
|
|
||||||
|
def excludeIsIncluded(f: File): Boolean =
|
||||||
|
c.excludes.forall { exclude => exclude isIncluded f.toPath }
|
||||||
|
|
||||||
|
def isIncluded(f: File): Boolean =
|
||||||
|
filterIsIncluded(f) && excludeIsIncluded(f)
|
||||||
|
|
||||||
|
def recurse(fs: Stream[File]): IO[Stream[LocalFile]] =
|
||||||
|
fs.foldLeft(IO.pure(Stream.empty[LocalFile]))((acc, f) =>
|
||||||
|
recurseIntoSubDirectories(f)
|
||||||
|
.flatMap(lfs => acc.map(s => s ++ lfs)))
|
||||||
|
|
||||||
|
for {
|
||||||
|
_ <- IO(info(2)(s"- Entering: $file"))
|
||||||
|
fs <- dirPaths(file)
|
||||||
|
lfs <- recurse(fs)
|
||||||
|
_ <- IO(info(5)(s"- Leaving : $file"))
|
||||||
|
} yield lfs
|
||||||
}
|
}
|
||||||
|
|
||||||
def dirPaths(file: File): Stream[File] =
|
|
||||||
Option(file.listFiles)
|
|
||||||
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file")).toStream
|
|
||||||
|
|
||||||
def recurseIntoSubDirectories(file: File)(implicit c: Config): Stream[LocalFile] =
|
|
||||||
if (file.isDirectory) loop(file)
|
|
||||||
else Stream(LocalFile(file, c.source, generateKey(c.source, c.prefix), md5HashGenerator))
|
|
||||||
|
|
||||||
loop(file)
|
loop(file)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,29 +3,42 @@ package net.kemitix.s3thorp.core
|
||||||
import java.io.{File, FileInputStream}
|
import java.io.{File, FileInputStream}
|
||||||
import java.security.MessageDigest
|
import java.security.MessageDigest
|
||||||
|
|
||||||
|
import cats.effect.IO
|
||||||
import net.kemitix.s3thorp.domain.MD5Hash
|
import net.kemitix.s3thorp.domain.MD5Hash
|
||||||
|
|
||||||
object MD5HashGenerator {
|
object MD5HashGenerator {
|
||||||
|
|
||||||
def md5File(file: File)
|
def md5File(file: File)
|
||||||
(implicit info: Int => String => Unit): MD5Hash = {
|
(implicit info: Int => String => Unit): IO[MD5Hash] =
|
||||||
val hash = md5FilePart(file, 0, file.length)
|
md5FilePart(file, 0, file.length)
|
||||||
hash
|
|
||||||
}
|
|
||||||
|
|
||||||
def md5FilePart(file: File,
|
def md5FilePart(file: File,
|
||||||
offset: Long,
|
offset: Long,
|
||||||
size: Long)
|
size: Long)
|
||||||
(implicit info: Int => String => Unit): MD5Hash = {
|
(implicit info: Int => String => Unit): IO[MD5Hash] = {
|
||||||
info(5)(s"md5:reading:offset $offset:size $size:$file")
|
|
||||||
val fis = new FileInputStream(file)
|
|
||||||
fis skip offset
|
|
||||||
val buffer = new Array[Byte](size.toInt)
|
val buffer = new Array[Byte](size.toInt)
|
||||||
fis read buffer
|
|
||||||
val hash = md5PartBody(buffer)
|
def readIntoBuffer = {
|
||||||
info(5)(s"md5:generated:${hash.hash}")
|
fis: FileInputStream =>
|
||||||
fis.close
|
IO {
|
||||||
hash
|
fis skip offset
|
||||||
|
fis read buffer
|
||||||
|
fis
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def closeFile = {fis: FileInputStream => IO(fis.close())}
|
||||||
|
|
||||||
|
def openFile = IO(new FileInputStream(file))
|
||||||
|
|
||||||
|
def readFile = openFile.bracket(readIntoBuffer)(closeFile)
|
||||||
|
|
||||||
|
for {
|
||||||
|
_ <- IO(info(5)(s"md5:reading:offset $offset:size $size:$file"))
|
||||||
|
_ <- readFile
|
||||||
|
hash = md5PartBody(buffer)
|
||||||
|
_ <- IO (info(5)(s"md5:generated:${hash.hash}"))
|
||||||
|
} yield hash
|
||||||
}
|
}
|
||||||
|
|
||||||
def md5PartBody(partBody: Array[Byte]): MD5Hash = {
|
def md5PartBody(partBody: Array[Byte]): MD5Hash = {
|
||||||
|
|
|
@ -6,12 +6,11 @@ object S3MetaDataEnricher {
|
||||||
|
|
||||||
def getMetadata(localFile: LocalFile,
|
def getMetadata(localFile: LocalFile,
|
||||||
s3ObjectsData: S3ObjectsData)
|
s3ObjectsData: S3ObjectsData)
|
||||||
(implicit c: Config): Stream[S3MetaData] = {
|
(implicit c: Config): S3MetaData = {
|
||||||
val (keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData)
|
val (keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData)
|
||||||
Stream(
|
S3MetaData(localFile,
|
||||||
S3MetaData(localFile,
|
matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) },
|
||||||
matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) },
|
matchByHash = hashMatches map { km => RemoteMetaData(km.key, localFile.hash, km.modified) })
|
||||||
matchByHash = hashMatches map { km => RemoteMetaData(km.key, localFile.hash, km.modified) }))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def getS3Status(localFile: LocalFile,
|
def getS3Status(localFile: LocalFile,
|
||||||
|
|
|
@ -4,7 +4,7 @@ import java.io.File
|
||||||
|
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
import cats.implicits._
|
import cats.implicits._
|
||||||
import net.kemitix.s3thorp.aws.api.S3Client
|
import net.kemitix.s3thorp.aws.api.{S3Action, S3Client}
|
||||||
import net.kemitix.s3thorp.core.Action.ToDelete
|
import net.kemitix.s3thorp.core.Action.ToDelete
|
||||||
import net.kemitix.s3thorp.core.ActionGenerator.createActions
|
import net.kemitix.s3thorp.core.ActionGenerator.createActions
|
||||||
import net.kemitix.s3thorp.core.ActionSubmitter.submitAction
|
import net.kemitix.s3thorp.core.ActionSubmitter.submitAction
|
||||||
|
@ -16,38 +16,36 @@ import net.kemitix.s3thorp.domain.{Config, MD5Hash, S3ObjectsData}
|
||||||
object Sync {
|
object Sync {
|
||||||
|
|
||||||
def run(s3Client: S3Client,
|
def run(s3Client: S3Client,
|
||||||
md5HashGenerator: File => MD5Hash,
|
md5HashGenerator: File => IO[MD5Hash],
|
||||||
info: Int => String => Unit,
|
info: Int => String => Unit,
|
||||||
warn: String => Unit,
|
warn: String => Unit,
|
||||||
error: String => Unit)
|
error: String => Unit)
|
||||||
(implicit c: Config): IO[Unit] = {
|
(implicit c: Config): IO[Unit] = {
|
||||||
def copyUploadActions(s3Data: S3ObjectsData) = {
|
|
||||||
for {actions <- {
|
|
||||||
for {
|
|
||||||
file <- findFiles(c.source, md5HashGenerator, info)
|
|
||||||
data <- getMetadata(file, s3Data)
|
|
||||||
action <- createActions(data)
|
|
||||||
s3Action <- submitAction(s3Client, action)(c, info, warn)
|
|
||||||
} yield s3Action
|
|
||||||
}.sequence
|
|
||||||
} yield actions.sorted
|
|
||||||
}
|
|
||||||
|
|
||||||
def deleteActions(s3ObjectsData: S3ObjectsData) = {
|
def copyUploadActions(s3Data: S3ObjectsData): IO[Stream[S3Action]] =
|
||||||
|
(for {
|
||||||
|
sFiles <- findFiles(c.source, md5HashGenerator, info)
|
||||||
|
sData <- IO(sFiles.map(file => getMetadata(file, s3Data)))
|
||||||
|
sActions <- IO(sData.flatMap(s3MetaData => createActions(s3MetaData)))
|
||||||
|
sS3Actions <- IO(sActions.flatMap(action => submitAction(s3Client, action)(c, info, warn)))
|
||||||
|
} yield sS3Actions.sequence)
|
||||||
|
.flatten
|
||||||
|
.map(streamS3Actions => streamS3Actions.sorted)
|
||||||
|
|
||||||
|
def deleteActions(s3ObjectsData: S3ObjectsData): IO[Stream[S3Action]] =
|
||||||
(for {
|
(for {
|
||||||
key <- s3ObjectsData.byKey.keys
|
key <- s3ObjectsData.byKey.keys
|
||||||
if key.isMissingLocally(c.source, c.prefix)
|
if key.isMissingLocally(c.source, c.prefix)
|
||||||
ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key))(c, info, warn)
|
ioDelAction <- submitAction(s3Client, ToDelete(c.bucket, key))(c, info, warn)
|
||||||
} yield ioDelAction).toStream.sequence
|
} yield ioDelAction).toStream.sequence
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
_ <- logRunStart(info)
|
_ <- logRunStart(info)
|
||||||
s3data <- s3Client.listObjects(c.bucket, c.prefix)(info)
|
s3data <- s3Client.listObjects(c.bucket, c.prefix)(info)
|
||||||
_ <- logFileScan(info)
|
_ <- logFileScan(info)
|
||||||
copyUploadActions <- copyUploadActions(s3data)
|
copyUploadActions <- copyUploadActions(s3data)
|
||||||
deleteAction <- deleteActions(s3data)
|
deleteActions <- deleteActions(s3data)
|
||||||
_ <- logRunFinished(copyUploadActions ++ deleteAction, info)
|
_ <- logRunFinished(copyUploadActions ++ deleteActions, info)
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
package net.kemitix.s3thorp.core
|
package net.kemitix.s3thorp.core
|
||||||
|
|
||||||
import java.io.File
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
import net.kemitix.s3thorp.core.Action.{DoNothing, ToCopy, ToUpload}
|
import net.kemitix.s3thorp.core.Action.{DoNothing, ToCopy, ToUpload}
|
||||||
|
@ -16,7 +15,6 @@ class ActionGeneratorSuite
|
||||||
implicit private val config: Config = Config(bucket, prefix, source = source)
|
implicit private val config: Config = Config(bucket, prefix, source = source)
|
||||||
implicit private val logInfo: Int => String => Unit = l => i => ()
|
implicit private val logInfo: Int => String => Unit = l => i => ()
|
||||||
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
|
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
|
||||||
private val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
|
|
||||||
val lastModified = LastModified(Instant.now())
|
val lastModified = LastModified(Instant.now())
|
||||||
|
|
||||||
describe("create actions") {
|
describe("create actions") {
|
||||||
|
@ -25,7 +23,7 @@ class ActionGeneratorSuite
|
||||||
|
|
||||||
describe("#1 local exists, remote exists, remote matches - do nothing") {
|
describe("#1 local exists, remote exists, remote matches - do nothing") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified)
|
val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified)
|
||||||
val input = S3MetaData(theFile, // local exists
|
val input = S3MetaData(theFile, // local exists
|
||||||
matchByHash = Set(theRemoteMetadata), // remote matches
|
matchByHash = Set(theRemoteMetadata), // remote matches
|
||||||
|
@ -39,7 +37,7 @@ class ActionGeneratorSuite
|
||||||
}
|
}
|
||||||
describe("#2 local exists, remote is missing, other matches - copy") {
|
describe("#2 local exists, remote is missing, other matches - copy") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val theRemoteKey = theFile.remoteKey
|
val theRemoteKey = theFile.remoteKey
|
||||||
val otherRemoteKey = prefix.resolve("other-key")
|
val otherRemoteKey = prefix.resolve("other-key")
|
||||||
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
|
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
|
||||||
|
@ -54,7 +52,7 @@ class ActionGeneratorSuite
|
||||||
}
|
}
|
||||||
describe("#3 local exists, remote is missing, other no matches - upload") {
|
describe("#3 local exists, remote is missing, other no matches - upload") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val input = S3MetaData(theFile, // local exists
|
val input = S3MetaData(theFile, // local exists
|
||||||
matchByHash = Set.empty, // other no matches
|
matchByHash = Set.empty, // other no matches
|
||||||
matchByKey = None) // remote is missing
|
matchByKey = None) // remote is missing
|
||||||
|
@ -66,7 +64,7 @@ class ActionGeneratorSuite
|
||||||
}
|
}
|
||||||
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
|
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val theRemoteKey = theFile.remoteKey
|
val theRemoteKey = theFile.remoteKey
|
||||||
val oldHash = MD5Hash("old-hash")
|
val oldHash = MD5Hash("old-hash")
|
||||||
val otherRemoteKey = prefix.resolve("other-key")
|
val otherRemoteKey = prefix.resolve("other-key")
|
||||||
|
@ -85,7 +83,7 @@ class ActionGeneratorSuite
|
||||||
}
|
}
|
||||||
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
|
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val theRemoteKey = theFile.remoteKey
|
val theRemoteKey = theFile.remoteKey
|
||||||
val oldHash = MD5Hash("old-hash")
|
val oldHash = MD5Hash("old-hash")
|
||||||
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
||||||
|
|
|
@ -2,6 +2,7 @@ package net.kemitix.s3thorp.core
|
||||||
|
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
|
import cats.effect.IO
|
||||||
import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash}
|
import net.kemitix.s3thorp.domain.{Config, LocalFile, MD5Hash}
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
|
||||||
|
@ -10,12 +11,12 @@ class LocalFileStreamSuite extends FunSpec {
|
||||||
val uploadResource = Resource(this, "upload")
|
val uploadResource = Resource(this, "upload")
|
||||||
val config: Config = Config(source = uploadResource)
|
val config: Config = Config(source = uploadResource)
|
||||||
implicit private val logInfo: Int => String => Unit = l => i => ()
|
implicit private val logInfo: Int => String => Unit = l => i => ()
|
||||||
val md5HashGenerator: File => MD5Hash = file => MD5HashGenerator.md5File(file)
|
val md5HashGenerator: File => IO[MD5Hash] = file => MD5HashGenerator.md5File(file)
|
||||||
|
|
||||||
describe("findFiles") {
|
describe("findFiles") {
|
||||||
it("should find all files") {
|
it("should find all files") {
|
||||||
val result: Set[String] =
|
val result: Set[String] =
|
||||||
LocalFileStream.findFiles(uploadResource, md5HashGenerator, logInfo)(config).toSet
|
LocalFileStream.findFiles(uploadResource, md5HashGenerator, logInfo)(config).unsafeRunSync.toSet
|
||||||
.map { x: LocalFile => x.relative.toString }
|
.map { x: LocalFile => x.relative.toString }
|
||||||
assertResult(Set("subdir/leaf-file", "root-file"))(result)
|
assertResult(Set("subdir/leaf-file", "root-file"))(result)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
package net.kemitix.s3thorp.core
|
||||||
|
|
||||||
|
import net.kemitix.s3thorp.domain.MD5Hash
|
||||||
|
|
||||||
|
object MD5HashData {
|
||||||
|
|
||||||
|
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
|
||||||
|
|
||||||
|
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
|
||||||
|
|
||||||
|
}
|
|
@ -2,6 +2,7 @@ package net.kemitix.s3thorp.core
|
||||||
|
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
|
|
||||||
|
import net.kemitix.s3thorp.core.MD5HashData.rootHash
|
||||||
import net.kemitix.s3thorp.domain.{Bucket, Config, MD5Hash, RemoteKey}
|
import net.kemitix.s3thorp.domain.{Bucket, Config, MD5Hash, RemoteKey}
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
|
||||||
|
@ -15,25 +16,23 @@ class MD5HashGeneratorTest extends FunSpec {
|
||||||
describe("read a small file (smaller than buffer)") {
|
describe("read a small file (smaller than buffer)") {
|
||||||
val file = Resource(this, "upload/root-file")
|
val file = Resource(this, "upload/root-file")
|
||||||
it("should generate the correct hash") {
|
it("should generate the correct hash") {
|
||||||
val expected = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
|
val result = MD5HashGenerator.md5File(file).unsafeRunSync
|
||||||
val result = MD5HashGenerator.md5File(file)
|
assertResult(rootHash)(result)
|
||||||
assertResult(expected)(result)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("read a buffer") {
|
describe("read a buffer") {
|
||||||
val file = Resource(this, "upload/root-file")
|
val file = Resource(this, "upload/root-file")
|
||||||
val buffer: Array[Byte] = Files.readAllBytes(file.toPath)
|
val buffer: Array[Byte] = Files.readAllBytes(file.toPath)
|
||||||
it("should generate the correct hash") {
|
it("should generate the correct hash") {
|
||||||
val expected = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
|
|
||||||
val result = MD5HashGenerator.md5PartBody(buffer)
|
val result = MD5HashGenerator.md5PartBody(buffer)
|
||||||
assertResult(expected)(result)
|
assertResult(rootHash)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("read a large file (bigger than buffer)") {
|
describe("read a large file (bigger than buffer)") {
|
||||||
val file = Resource(this, "big-file")
|
val file = Resource(this, "big-file")
|
||||||
it("should generate the correct hash") {
|
it("should generate the correct hash") {
|
||||||
val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
|
val expected = MD5Hash("b1ab1f7680138e6db7309200584e35d8")
|
||||||
val result = MD5HashGenerator.md5File(file)
|
val result = MD5HashGenerator.md5File(file).unsafeRunSync
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,14 +43,14 @@ class MD5HashGeneratorTest extends FunSpec {
|
||||||
describe("when starting at the beginning of the file") {
|
describe("when starting at the beginning of the file") {
|
||||||
it("should generate the correct hash") {
|
it("should generate the correct hash") {
|
||||||
val expected = MD5Hash("aadf0d266cefe0fcdb241a51798d74b3")
|
val expected = MD5Hash("aadf0d266cefe0fcdb241a51798d74b3")
|
||||||
val result = MD5HashGenerator.md5FilePart(file, 0, halfFileLength)
|
val result = MD5HashGenerator.md5FilePart(file, 0, halfFileLength).unsafeRunSync
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("when starting in the middle of the file") {
|
describe("when starting in the middle of the file") {
|
||||||
it("should generate the correct hash") {
|
it("should generate the correct hash") {
|
||||||
val expected = MD5Hash("16e08d53ca36e729d808fd5e4f7e35dc")
|
val expected = MD5Hash("16e08d53ca36e729d808fd5e4f7e35dc")
|
||||||
val result = MD5HashGenerator.md5FilePart(file, halfFileLength, halfFileLength)
|
val result = MD5HashGenerator.md5FilePart(file, halfFileLength, halfFileLength).unsafeRunSync
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
package net.kemitix.s3thorp.core
|
package net.kemitix.s3thorp.core
|
||||||
|
|
||||||
import java.io.File
|
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
|
|
||||||
import net.kemitix.s3thorp.aws.api.S3Client
|
|
||||||
import net.kemitix.s3thorp.core.S3MetaDataEnricher.{getMetadata, getS3Status}
|
import net.kemitix.s3thorp.core.S3MetaDataEnricher.{getMetadata, getS3Status}
|
||||||
import net.kemitix.s3thorp.domain._
|
import net.kemitix.s3thorp.domain._
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
@ -16,14 +14,13 @@ class S3MetaDataEnricherSuite
|
||||||
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||||
implicit private val logInfo: Int => String => Unit = l => i => ()
|
implicit private val logInfo: Int => String => Unit = l => i => ()
|
||||||
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
|
private val fileToKey = KeyGenerator.generateKey(config.source, config.prefix) _
|
||||||
private val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
|
|
||||||
val lastModified = LastModified(Instant.now())
|
val lastModified = LastModified(Instant.now())
|
||||||
|
|
||||||
describe("enrich with metadata") {
|
describe("enrich with metadata") {
|
||||||
|
|
||||||
describe("#1a local exists, remote exists, remote matches, other matches - do nothing") {
|
describe("#1a local exists, remote exists, remote matches, other matches - do nothing") {
|
||||||
val theHash: MD5Hash = MD5Hash("the-file-hash")
|
val theHash: MD5Hash = MD5Hash("the-file-hash")
|
||||||
val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val theRemoteKey: RemoteKey = theFile.remoteKey
|
val theRemoteKey: RemoteKey = theFile.remoteKey
|
||||||
val s3: S3ObjectsData = S3ObjectsData(
|
val s3: S3ObjectsData = S3ObjectsData(
|
||||||
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
|
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
|
||||||
|
@ -31,16 +28,16 @@ class S3MetaDataEnricherSuite
|
||||||
)
|
)
|
||||||
val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
|
val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
|
||||||
it("generates valid metadata") {
|
it("generates valid metadata") {
|
||||||
val expected = Stream(S3MetaData(theFile,
|
val expected = S3MetaData(theFile,
|
||||||
matchByHash = Set(theRemoteMetadata),
|
matchByHash = Set(theRemoteMetadata),
|
||||||
matchByKey = Some(theRemoteMetadata)))
|
matchByKey = Some(theRemoteMetadata))
|
||||||
val result = getMetadata(theFile, s3)
|
val result = getMetadata(theFile, s3)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") {
|
describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") {
|
||||||
val theHash: MD5Hash = MD5Hash("the-file-hash")
|
val theHash: MD5Hash = MD5Hash("the-file-hash")
|
||||||
val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile: LocalFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val theRemoteKey: RemoteKey = prefix.resolve("the-file")
|
val theRemoteKey: RemoteKey = prefix.resolve("the-file")
|
||||||
val s3: S3ObjectsData = S3ObjectsData(
|
val s3: S3ObjectsData = S3ObjectsData(
|
||||||
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
|
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
|
||||||
|
@ -48,16 +45,16 @@ class S3MetaDataEnricherSuite
|
||||||
)
|
)
|
||||||
val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
|
val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
|
||||||
it("generates valid metadata") {
|
it("generates valid metadata") {
|
||||||
val expected = Stream(S3MetaData(theFile,
|
val expected = S3MetaData(theFile,
|
||||||
matchByHash = Set(theRemoteMetadata),
|
matchByHash = Set(theRemoteMetadata),
|
||||||
matchByKey = Some(theRemoteMetadata)))
|
matchByKey = Some(theRemoteMetadata))
|
||||||
val result = getMetadata(theFile, s3)
|
val result = getMetadata(theFile, s3)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("#2 local exists, remote is missing, remote no match, other matches - copy") {
|
describe("#2 local exists, remote is missing, remote no match, other matches - copy") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val otherRemoteKey = RemoteKey("other-key")
|
val otherRemoteKey = RemoteKey("other-key")
|
||||||
val s3: S3ObjectsData = S3ObjectsData(
|
val s3: S3ObjectsData = S3ObjectsData(
|
||||||
byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
|
byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
|
||||||
|
@ -65,31 +62,31 @@ class S3MetaDataEnricherSuite
|
||||||
)
|
)
|
||||||
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
|
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
|
||||||
it("generates valid metadata") {
|
it("generates valid metadata") {
|
||||||
val expected = Stream(S3MetaData(theFile,
|
val expected = S3MetaData(theFile,
|
||||||
matchByHash = Set(otherRemoteMetadata),
|
matchByHash = Set(otherRemoteMetadata),
|
||||||
matchByKey = None))
|
matchByKey = None)
|
||||||
val result = getMetadata(theFile, s3)
|
val result = getMetadata(theFile, s3)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("#3 local exists, remote is missing, remote no match, other no matches - upload") {
|
describe("#3 local exists, remote is missing, remote no match, other no matches - upload") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val s3: S3ObjectsData = S3ObjectsData(
|
val s3: S3ObjectsData = S3ObjectsData(
|
||||||
byHash = Map(),
|
byHash = Map(),
|
||||||
byKey = Map()
|
byKey = Map()
|
||||||
)
|
)
|
||||||
it("generates valid metadata") {
|
it("generates valid metadata") {
|
||||||
val expected = Stream(S3MetaData(theFile,
|
val expected = S3MetaData(theFile,
|
||||||
matchByHash = Set.empty,
|
matchByHash = Set.empty,
|
||||||
matchByKey = None))
|
matchByKey = None)
|
||||||
val result = getMetadata(theFile, s3)
|
val result = getMetadata(theFile, s3)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
|
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val theRemoteKey = theFile.remoteKey
|
val theRemoteKey = theFile.remoteKey
|
||||||
val oldHash = MD5Hash("old-hash")
|
val oldHash = MD5Hash("old-hash")
|
||||||
val otherRemoteKey = prefix.resolve("other-key")
|
val otherRemoteKey = prefix.resolve("other-key")
|
||||||
|
@ -105,16 +102,16 @@ class S3MetaDataEnricherSuite
|
||||||
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
||||||
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
|
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
|
||||||
it("generates valid metadata") {
|
it("generates valid metadata") {
|
||||||
val expected = Stream(S3MetaData(theFile,
|
val expected = S3MetaData(theFile,
|
||||||
matchByHash = Set(otherRemoteMetadata),
|
matchByHash = Set(otherRemoteMetadata),
|
||||||
matchByKey = Some(theRemoteMetadata)))
|
matchByKey = Some(theRemoteMetadata))
|
||||||
val result = getMetadata(theFile, s3)
|
val result = getMetadata(theFile, s3)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
|
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey, fileToHash)
|
val theFile = LocalFile.resolve("the-file", theHash, source, fileToKey)
|
||||||
val theRemoteKey = theFile.remoteKey
|
val theRemoteKey = theFile.remoteKey
|
||||||
val oldHash = MD5Hash("old-hash")
|
val oldHash = MD5Hash("old-hash")
|
||||||
val s3: S3ObjectsData = S3ObjectsData(
|
val s3: S3ObjectsData = S3ObjectsData(
|
||||||
|
@ -127,9 +124,9 @@ class S3MetaDataEnricherSuite
|
||||||
)
|
)
|
||||||
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
||||||
it("generates valid metadata") {
|
it("generates valid metadata") {
|
||||||
val expected = Stream(S3MetaData(theFile,
|
val expected = S3MetaData(theFile,
|
||||||
matchByHash = Set.empty,
|
matchByHash = Set.empty,
|
||||||
matchByKey = Some(theRemoteMetadata)))
|
matchByKey = Some(theRemoteMetadata))
|
||||||
val result = getMetadata(theFile, s3)
|
val result = getMetadata(theFile, s3)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
|
@ -138,20 +135,20 @@ class S3MetaDataEnricherSuite
|
||||||
|
|
||||||
describe("getS3Status") {
|
describe("getS3Status") {
|
||||||
val hash = MD5Hash("hash")
|
val hash = MD5Hash("hash")
|
||||||
val localFile = LocalFile.resolve("the-file", hash, source, fileToKey, fileToHash)
|
val localFile = LocalFile.resolve("the-file", hash, source, fileToKey)
|
||||||
val key = localFile.remoteKey
|
val key = localFile.remoteKey
|
||||||
val keyotherkey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey, fileToHash)
|
val keyOtherKey = LocalFile.resolve("other-key-same-hash", hash, source, fileToKey)
|
||||||
val diffhash = MD5Hash("diff")
|
val diffHash = MD5Hash("diff")
|
||||||
val keydiffhash = LocalFile.resolve("other-key-diff-hash", diffhash, source, fileToKey, fileToHash)
|
val keyDiffHash = LocalFile.resolve("other-key-diff-hash", diffHash, source, fileToKey)
|
||||||
val lastModified = LastModified(Instant.now)
|
val lastModified = LastModified(Instant.now)
|
||||||
val s3ObjectsData: S3ObjectsData = S3ObjectsData(
|
val s3ObjectsData: S3ObjectsData = S3ObjectsData(
|
||||||
byHash = Map(
|
byHash = Map(
|
||||||
hash -> Set(KeyModified(key, lastModified), KeyModified(keyotherkey.remoteKey, lastModified)),
|
hash -> Set(KeyModified(key, lastModified), KeyModified(keyOtherKey.remoteKey, lastModified)),
|
||||||
diffhash -> Set(KeyModified(keydiffhash.remoteKey, lastModified))),
|
diffHash -> Set(KeyModified(keyDiffHash.remoteKey, lastModified))),
|
||||||
byKey = Map(
|
byKey = Map(
|
||||||
key -> HashModified(hash, lastModified),
|
key -> HashModified(hash, lastModified),
|
||||||
keyotherkey.remoteKey -> HashModified(hash, lastModified),
|
keyOtherKey.remoteKey -> HashModified(hash, lastModified),
|
||||||
keydiffhash.remoteKey -> HashModified(diffhash, lastModified)))
|
keyDiffHash.remoteKey -> HashModified(diffHash, lastModified)))
|
||||||
|
|
||||||
def invoke(localFile: LocalFile) = {
|
def invoke(localFile: LocalFile) = {
|
||||||
getS3Status(localFile, s3ObjectsData)
|
getS3Status(localFile, s3ObjectsData)
|
||||||
|
@ -163,14 +160,14 @@ class S3MetaDataEnricherSuite
|
||||||
(Some(HashModified(hash, lastModified)),
|
(Some(HashModified(hash, lastModified)),
|
||||||
Set(
|
Set(
|
||||||
KeyModified(key, lastModified),
|
KeyModified(key, lastModified),
|
||||||
KeyModified(keyotherkey.remoteKey, lastModified)))
|
KeyModified(keyOtherKey.remoteKey, lastModified)))
|
||||||
)(invoke(localFile))
|
)(invoke(localFile))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
describe("when remote key does not exist and no others matches hash") {
|
describe("when remote key does not exist and no others matches hash") {
|
||||||
it("should return (None, Set.empty)") {
|
it("should return (None, Set.empty)") {
|
||||||
val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey, fileToHash)
|
val localFile = LocalFile.resolve("missing-file", MD5Hash("unique"), source, fileToKey)
|
||||||
assertResult(
|
assertResult(
|
||||||
(None,
|
(None,
|
||||||
Set.empty)
|
Set.empty)
|
||||||
|
@ -181,9 +178,9 @@ class S3MetaDataEnricherSuite
|
||||||
describe("when remote key exists and no others match hash") {
|
describe("when remote key exists and no others match hash") {
|
||||||
it("should return (None, Set.nonEmpty)") {
|
it("should return (None, Set.nonEmpty)") {
|
||||||
assertResult(
|
assertResult(
|
||||||
(Some(HashModified(diffhash, lastModified)),
|
(Some(HashModified(diffHash, lastModified)),
|
||||||
Set(KeyModified(keydiffhash.remoteKey, lastModified)))
|
Set(KeyModified(keyDiffHash.remoteKey, lastModified)))
|
||||||
)(invoke(keydiffhash))
|
)(invoke(keyDiffHash))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import java.time.Instant
|
||||||
import cats.effect.IO
|
import cats.effect.IO
|
||||||
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action}
|
import net.kemitix.s3thorp.aws.api.S3Action.{CopyS3Action, DeleteS3Action, UploadS3Action}
|
||||||
import net.kemitix.s3thorp.aws.api.{S3Client, UploadProgressListener}
|
import net.kemitix.s3thorp.aws.api.{S3Client, UploadProgressListener}
|
||||||
|
import net.kemitix.s3thorp.core.MD5HashData.{leafHash, rootHash}
|
||||||
import net.kemitix.s3thorp.domain._
|
import net.kemitix.s3thorp.domain._
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
|
||||||
|
@ -17,16 +18,13 @@ class SyncSuite
|
||||||
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
|
||||||
implicit private val logInfo: Int => String => Unit = l => i => ()
|
implicit private val logInfo: Int => String => Unit = l => i => ()
|
||||||
implicit private val logWarn: String => Unit = w => ()
|
implicit private val logWarn: String => Unit = w => ()
|
||||||
def logError: String => Unit = e => ()
|
private def logError: String => Unit = e => ()
|
||||||
private val lastModified = LastModified(Instant.now)
|
private val lastModified = LastModified(Instant.now)
|
||||||
val fileToKey: File => RemoteKey = KeyGenerator.generateKey(source, prefix)
|
private val fileToKey: File => RemoteKey = KeyGenerator.generateKey(source, prefix)
|
||||||
val fileToHash = (file: File) => MD5HashGenerator.md5File(file)
|
private val rootFile = LocalFile.resolve("root-file", rootHash, source, fileToKey)
|
||||||
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
|
private val leafFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, fileToKey)
|
||||||
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
|
|
||||||
val rootFile = LocalFile.resolve("root-file", rootHash, source, fileToKey, fileToHash)
|
|
||||||
val leafFile = LocalFile.resolve("subdir/leaf-file", leafHash, source, fileToKey, fileToHash)
|
|
||||||
|
|
||||||
val md5HashGenerator: File => MD5Hash = file => MD5HashGenerator.md5File(file)
|
private val md5HashGenerator = MD5HashGenerator.md5File(_)
|
||||||
|
|
||||||
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile) = {
|
def putObjectRequest(bucket: Bucket, remoteKey: RemoteKey, localFile: LocalFile) = {
|
||||||
(bucket.name, remoteKey.key, localFile.file)
|
(bucket.name, remoteKey.key, localFile.file)
|
||||||
|
@ -84,8 +82,6 @@ class SyncSuite
|
||||||
}
|
}
|
||||||
describe("when a file is renamed it is moved on S3 with no upload") {
|
describe("when a file is renamed it is moved on S3 with no upload") {
|
||||||
// 'root-file-old' should be renamed as 'root-file'
|
// 'root-file-old' should be renamed as 'root-file'
|
||||||
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
|
|
||||||
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
|
|
||||||
val s3ObjectsData = S3ObjectsData(
|
val s3ObjectsData = S3ObjectsData(
|
||||||
byHash = Map(
|
byHash = Map(
|
||||||
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file-old"), lastModified)),
|
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file-old"), lastModified)),
|
||||||
|
@ -128,7 +124,7 @@ class SyncSuite
|
||||||
assertResult(expectedDeletions)(s3Client.deletionsRecord)
|
assertResult(expectedDeletions)(s3Client.deletionsRecord)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
describe("when a file is file is excluded") {
|
describe("when a file is excluded") {
|
||||||
val configWithExclusion = config.copy(excludes = List(Exclude("leaf")))
|
val configWithExclusion = config.copy(excludes = List(Exclude("leaf")))
|
||||||
val s3ObjectsData = S3ObjectsData(Map(), Map())
|
val s3ObjectsData = S3ObjectsData(Map(), Map())
|
||||||
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
val s3Client = new RecordingClient(testBucket, s3ObjectsData)
|
||||||
|
|
|
@ -3,19 +3,10 @@ package net.kemitix.s3thorp.domain
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
final case class LocalFile(
|
final case class LocalFile(file: File, source: File, hash: MD5Hash, keyGenerator: File => RemoteKey) {
|
||||||
file: File,
|
|
||||||
source: File,
|
|
||||||
keyGenerator: File => RemoteKey,
|
|
||||||
md5HashGenerator: File => MD5Hash,
|
|
||||||
suppliedHash: Option[MD5Hash] = None) {
|
|
||||||
|
|
||||||
require(!file.isDirectory, s"LocalFile must not be a directory: $file")
|
require(!file.isDirectory, s"LocalFile must not be a directory: $file")
|
||||||
|
|
||||||
private lazy val myhash = suppliedHash.getOrElse(md5HashGenerator(file))
|
|
||||||
|
|
||||||
def hash: MD5Hash = myhash
|
|
||||||
|
|
||||||
// the equivalent location of the file on S3
|
// the equivalent location of the file on S3
|
||||||
def remoteKey: RemoteKey = keyGenerator(file)
|
def remoteKey: RemoteKey = keyGenerator(file)
|
||||||
|
|
||||||
|
@ -28,14 +19,10 @@ final case class LocalFile(
|
||||||
|
|
||||||
object LocalFile {
|
object LocalFile {
|
||||||
def resolve(path: String,
|
def resolve(path: String,
|
||||||
myHash: MD5Hash,
|
md5Hash: MD5Hash,
|
||||||
source: File,
|
source: File,
|
||||||
fileToKey: File => RemoteKey,
|
fileToKey: File => RemoteKey): LocalFile = {
|
||||||
fileToHash: File => MD5Hash): LocalFile =
|
val file = source.toPath.resolve(path).toFile
|
||||||
LocalFile(
|
LocalFile(file, source, md5Hash, fileToKey)
|
||||||
file = source.toPath.resolve(path).toFile,
|
}
|
||||||
source = source,
|
|
||||||
keyGenerator = fileToKey,
|
|
||||||
md5HashGenerator = fileToHash,
|
|
||||||
suppliedHash = Some(myHash))
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue