Refactor ActionGenerator.genAction(S3MetaData,Stream,Bucket) (#143)
* [domain] rename S3MetaData as MatchedMetadata * [domain] rename S3ObjectsData as RemoteObjects * [core] ActionGenerator refactoring * [core] ActionGenerator.createAction renamed and no longer a stream * [core] ActionGenerator refactor * [core] ActionGenerator Usage of head on collections
This commit is contained in:
parent
e41e29127f
commit
07ca6b962f
16 changed files with 185 additions and 177 deletions
|
@ -3,45 +3,53 @@ package net.kemitix.thorp.core
|
|||
import net.kemitix.thorp.config.Config
|
||||
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
|
||||
import net.kemitix.thorp.domain._
|
||||
import zio.ZIO
|
||||
import zio.RIO
|
||||
|
||||
object ActionGenerator {
|
||||
|
||||
def createActions(
|
||||
s3MetaData: S3MetaData,
|
||||
def createAction(
|
||||
matchedMetadata: MatchedMetadata,
|
||||
previousActions: Stream[Action]
|
||||
): ZIO[Config, Nothing, Stream[Action]] =
|
||||
): RIO[Config, Action] =
|
||||
for {
|
||||
bucket <- Config.bucket
|
||||
} yield genAction(s3MetaData, previousActions, bucket)
|
||||
} yield
|
||||
genAction(formattedMetadata(matchedMetadata, previousActions), bucket)
|
||||
|
||||
private def genAction(s3MetaData: S3MetaData,
|
||||
previousActions: Stream[Action],
|
||||
bucket: Bucket): Stream[Action] = {
|
||||
s3MetaData match {
|
||||
// #1 local exists, remote exists, remote matches - do nothing
|
||||
case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _)))
|
||||
if LocalFile.matchesHash(localFile)(hash) =>
|
||||
doNothing(bucket, key)
|
||||
// #2 local exists, remote is missing, other matches - copy
|
||||
case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty =>
|
||||
copyFile(bucket, localFile, matchByHash)
|
||||
// #3 local exists, remote is missing, other no matches - upload
|
||||
case S3MetaData(localFile, matchByHash, None)
|
||||
if matchByHash.isEmpty &&
|
||||
isUploadAlreadyQueued(previousActions)(localFile) =>
|
||||
uploadFile(bucket, localFile)
|
||||
// #4 local exists, remote exists, remote no match, other matches - copy
|
||||
case S3MetaData(localFile, matchByHash, Some(RemoteMetaData(_, hash, _)))
|
||||
if !LocalFile.matchesHash(localFile)(hash) &&
|
||||
matchByHash.nonEmpty =>
|
||||
copyFile(bucket, localFile, matchByHash)
|
||||
// #5 local exists, remote exists, remote no match, other no matches - upload
|
||||
case S3MetaData(localFile, matchByHash, Some(_)) if matchByHash.isEmpty =>
|
||||
uploadFile(bucket, localFile)
|
||||
// fallback
|
||||
case S3MetaData(localFile, _, _) =>
|
||||
doNothing(bucket, localFile.remoteKey)
|
||||
private def formattedMetadata(
|
||||
matchedMetadata: MatchedMetadata,
|
||||
previousActions: Stream[Action]): TaggedMetadata = {
|
||||
val remoteExists = matchedMetadata.matchByKey.nonEmpty
|
||||
val remoteMatches = remoteExists && matchedMetadata.matchByKey.exists(m =>
|
||||
LocalFile.matchesHash(matchedMetadata.localFile)(m.hash))
|
||||
val anyMatches = matchedMetadata.matchByHash.nonEmpty
|
||||
TaggedMetadata(matchedMetadata,
|
||||
previousActions,
|
||||
remoteExists,
|
||||
remoteMatches,
|
||||
anyMatches)
|
||||
}
|
||||
|
||||
case class TaggedMetadata(
|
||||
matchedMetadata: MatchedMetadata,
|
||||
previousActions: Stream[Action],
|
||||
remoteExists: Boolean,
|
||||
remoteMatches: Boolean,
|
||||
anyMatches: Boolean
|
||||
)
|
||||
|
||||
private def genAction(taggedMetadata: TaggedMetadata,
|
||||
bucket: Bucket): Action = {
|
||||
taggedMetadata match {
|
||||
case TaggedMetadata(md, _, exists, matches, _) if exists && matches =>
|
||||
doNothing(bucket, md.localFile.remoteKey)
|
||||
case TaggedMetadata(md, _, _, _, any) if any =>
|
||||
copyFile(bucket, md.localFile, md.matchByHash.head)
|
||||
case TaggedMetadata(md, previous, _, _, _)
|
||||
if isUploadAlreadyQueued(previous)(md.localFile) =>
|
||||
uploadFile(bucket, md.localFile)
|
||||
case TaggedMetadata(md, _, _, _, _) =>
|
||||
doNothing(bucket, md.localFile.remoteKey)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -59,29 +67,22 @@ object ActionGenerator {
|
|||
private def doNothing(
|
||||
bucket: Bucket,
|
||||
remoteKey: RemoteKey
|
||||
) =
|
||||
Stream(DoNothing(bucket, remoteKey, 0L))
|
||||
) = DoNothing(bucket, remoteKey, 0L)
|
||||
|
||||
private def uploadFile(
|
||||
bucket: Bucket,
|
||||
localFile: LocalFile
|
||||
) =
|
||||
Stream(ToUpload(bucket, localFile, localFile.file.length))
|
||||
) = ToUpload(bucket, localFile, localFile.file.length)
|
||||
|
||||
private def copyFile(
|
||||
bucket: Bucket,
|
||||
localFile: LocalFile,
|
||||
matchByHash: Set[RemoteMetaData]
|
||||
): Stream[Action] =
|
||||
matchByHash
|
||||
.map { remoteMetaData =>
|
||||
ToCopy(bucket,
|
||||
remoteMetaData.remoteKey,
|
||||
remoteMetaData.hash,
|
||||
localFile.remoteKey,
|
||||
localFile.file.length)
|
||||
}
|
||||
.toStream
|
||||
.take(1)
|
||||
remoteMetaData: RemoteMetaData
|
||||
): Action =
|
||||
ToCopy(bucket,
|
||||
remoteMetaData.remoteKey,
|
||||
remoteMetaData.hash,
|
||||
localFile.remoteKey,
|
||||
localFile.file.length)
|
||||
|
||||
}
|
||||
|
|
|
@ -29,22 +29,22 @@ object PlanBuilder {
|
|||
objects <- Storage.list(bucket, prefix)
|
||||
} yield objects
|
||||
|
||||
private def assemblePlan(metadata: (S3ObjectsData, LocalFiles)) =
|
||||
private def assemblePlan(metadata: (RemoteObjects, LocalFiles)) =
|
||||
metadata match {
|
||||
case (remoteData, localData) =>
|
||||
createActions(remoteData, localData.localFiles)
|
||||
case (remoteObjects, localData) =>
|
||||
createActions(remoteObjects, localData.localFiles)
|
||||
.map(_.filter(doesSomething).sortBy(SequencePlan.order))
|
||||
.map(
|
||||
SyncPlan(_, SyncTotals(localData.count, localData.totalSizeBytes)))
|
||||
}
|
||||
|
||||
private def createActions(
|
||||
remoteData: S3ObjectsData,
|
||||
remoteObjects: RemoteObjects,
|
||||
localFiles: Stream[LocalFile]
|
||||
) =
|
||||
for {
|
||||
fileActions <- actionsForLocalFiles(remoteData, localFiles)
|
||||
remoteActions <- actionsForRemoteKeys(remoteData.byKey.keys)
|
||||
fileActions <- actionsForLocalFiles(remoteObjects, localFiles)
|
||||
remoteActions <- actionsForRemoteKeys(remoteObjects.byKey.keys)
|
||||
} yield fileActions ++ remoteActions
|
||||
|
||||
private def doesSomething: Action => Boolean = {
|
||||
|
@ -53,19 +53,19 @@ object PlanBuilder {
|
|||
}
|
||||
|
||||
private def actionsForLocalFiles(
|
||||
remoteData: S3ObjectsData,
|
||||
remoteObjects: RemoteObjects,
|
||||
localFiles: Stream[LocalFile]
|
||||
) =
|
||||
ZIO.foldLeft(localFiles)(Stream.empty[Action])((acc, localFile) =>
|
||||
createActionFromLocalFile(remoteData, acc, localFile).map(_ ++ acc))
|
||||
createActionFromLocalFile(remoteObjects, acc, localFile).map(_ #:: acc))
|
||||
|
||||
private def createActionFromLocalFile(
|
||||
remoteData: S3ObjectsData,
|
||||
remoteObjects: RemoteObjects,
|
||||
previousActions: Stream[Action],
|
||||
localFile: LocalFile
|
||||
) =
|
||||
ActionGenerator.createActions(
|
||||
S3MetaDataEnricher.getMetadata(localFile, remoteData),
|
||||
ActionGenerator.createAction(
|
||||
S3MetaDataEnricher.getMetadata(localFile, remoteObjects),
|
||||
previousActions)
|
||||
|
||||
private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) =
|
||||
|
|
|
@ -6,10 +6,10 @@ object S3MetaDataEnricher {
|
|||
|
||||
def getMetadata(
|
||||
localFile: LocalFile,
|
||||
s3ObjectsData: S3ObjectsData
|
||||
): S3MetaData = {
|
||||
val (keyMatches, hashMatches) = getS3Status(localFile, s3ObjectsData)
|
||||
S3MetaData(
|
||||
remoteObjects: RemoteObjects
|
||||
): MatchedMetadata = {
|
||||
val (keyMatches, hashMatches) = getS3Status(localFile, remoteObjects)
|
||||
MatchedMetadata(
|
||||
localFile,
|
||||
matchByKey = keyMatches.map { hm =>
|
||||
RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified)
|
||||
|
@ -22,13 +22,13 @@ object S3MetaDataEnricher {
|
|||
|
||||
def getS3Status(
|
||||
localFile: LocalFile,
|
||||
s3ObjectsData: S3ObjectsData
|
||||
remoteObjects: RemoteObjects
|
||||
): (Option[HashModified], Set[(MD5Hash, KeyModified)]) = {
|
||||
val matchingByKey = s3ObjectsData.byKey.get(localFile.remoteKey)
|
||||
val matchingByKey = remoteObjects.byKey.get(localFile.remoteKey)
|
||||
val matchingByHash = localFile.hashes
|
||||
.map {
|
||||
case (_, md5Hash) =>
|
||||
s3ObjectsData.byHash
|
||||
remoteObjects.byHash
|
||||
.getOrElse(md5Hash, Set())
|
||||
.map(km => (md5Hash, km))
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
theRemoteMetadata = RemoteMetaData(theFile.remoteKey,
|
||||
theHash,
|
||||
lastModified)
|
||||
input = S3MetaData(
|
||||
input = MatchedMetadata(
|
||||
theFile, // local exists
|
||||
matchByHash = Set(theRemoteMetadata), // remote matches
|
||||
matchByKey = Some(theRemoteMetadata) // remote exists
|
||||
|
@ -72,7 +72,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey,
|
||||
theHash,
|
||||
lastModified)
|
||||
input = S3MetaData(
|
||||
input = MatchedMetadata(
|
||||
theFile, // local exists
|
||||
matchByHash = Set(otherRemoteMetadata), // other matches
|
||||
matchByKey = None) // remote is missing
|
||||
|
@ -100,9 +100,9 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
sourcePath,
|
||||
sources,
|
||||
prefix)
|
||||
input = S3MetaData(theFile, // local exists
|
||||
matchByHash = Set.empty, // other no matches
|
||||
matchByKey = None) // remote is missing
|
||||
input = MatchedMetadata(theFile, // local exists
|
||||
matchByHash = Set.empty, // other no matches
|
||||
matchByKey = None) // remote is missing
|
||||
} yield (theFile, input)
|
||||
it("upload") {
|
||||
env.map({
|
||||
|
@ -134,7 +134,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
oldRemoteMetadata = RemoteMetaData(theRemoteKey,
|
||||
hash = oldHash, // remote no match
|
||||
lastModified = lastModified)
|
||||
input = S3MetaData(
|
||||
input = MatchedMetadata(
|
||||
theFile, // local exists
|
||||
matchByHash = Set(otherRemoteMetadata), // other matches
|
||||
matchByKey = Some(oldRemoteMetadata)) // remote exists
|
||||
|
@ -167,7 +167,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
theRemoteKey = theFile.remoteKey
|
||||
oldHash = MD5Hash("old-hash")
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
||||
input = S3MetaData(
|
||||
input = MatchedMetadata(
|
||||
theFile, // local exists
|
||||
matchByHash = Set.empty, // remote no match, other no match
|
||||
matchByKey = Some(theRemoteMetadata) // remote exists
|
||||
|
@ -196,7 +196,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
}
|
||||
|
||||
private def invoke(
|
||||
input: S3MetaData,
|
||||
input: MatchedMetadata,
|
||||
previousActions: Stream[Action]
|
||||
) = {
|
||||
type TestEnv = Config with FileSystem
|
||||
|
@ -206,7 +206,7 @@ class ActionGeneratorSuite extends FunSpec {
|
|||
for {
|
||||
config <- ConfigurationBuilder.buildConfig(configOptions)
|
||||
_ <- Config.set(config)
|
||||
actions <- ActionGenerator.createActions(input, previousActions)
|
||||
actions <- ActionGenerator.createAction(input, previousActions)
|
||||
} yield actions
|
||||
|
||||
new DefaultRuntime {}.unsafeRunSync {
|
||||
|
|
|
@ -7,7 +7,7 @@ import net.kemitix.thorp.domain._
|
|||
import net.kemitix.thorp.storage.api.Storage
|
||||
import zio.{RIO, UIO}
|
||||
|
||||
case class DummyStorageService(s3ObjectData: S3ObjectsData,
|
||||
case class DummyStorageService(remoteObjects: RemoteObjects,
|
||||
uploadFiles: Map[File, (RemoteKey, MD5Hash)])
|
||||
extends Storage.Service {
|
||||
|
||||
|
@ -17,8 +17,8 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData,
|
|||
override def listObjects(
|
||||
bucket: Bucket,
|
||||
prefix: RemoteKey
|
||||
): RIO[Console, S3ObjectsData] =
|
||||
RIO(s3ObjectData)
|
||||
): RIO[Console, RemoteObjects] =
|
||||
RIO(remoteObjects)
|
||||
|
||||
override def upload(
|
||||
localFile: LocalFile,
|
||||
|
|
|
@ -56,7 +56,7 @@ class LocalFileStreamSuite extends FunSpec {
|
|||
with Hasher.Test
|
||||
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live
|
||||
with FileSystem.Live with Hasher.Test {
|
||||
override def listResult: Task[S3ObjectsData] =
|
||||
override def listResult: Task[RemoteObjects] =
|
||||
Task.die(new NotImplementedError)
|
||||
override def uploadResult: UIO[StorageQueueEvent] =
|
||||
Task.die(new NotImplementedError)
|
||||
|
|
|
@ -8,12 +8,12 @@ import net.kemitix.thorp.domain.HashType.MD5
|
|||
import net.kemitix.thorp.domain._
|
||||
import org.scalatest.FunSpec
|
||||
|
||||
class S3MetaDataEnricherSuite extends FunSpec {
|
||||
val lastModified = LastModified(Instant.now())
|
||||
private val source = Resource(this, "upload")
|
||||
private val sourcePath = source.toPath
|
||||
private val sources = Sources(List(sourcePath))
|
||||
private val prefix = RemoteKey("prefix")
|
||||
class MatchedMetadataEnricherSuite extends FunSpec {
|
||||
private val lastModified = LastModified(Instant.now())
|
||||
private val source = Resource(this, "upload")
|
||||
private val sourcePath = source.toPath
|
||||
private val sources = Sources(List(sourcePath))
|
||||
private val prefix = RemoteKey("prefix")
|
||||
|
||||
def getMatchesByKey(
|
||||
status: (Option[HashModified], Set[(MD5Hash, KeyModified)]))
|
||||
|
@ -41,19 +41,19 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
sources,
|
||||
prefix)
|
||||
theRemoteKey = theFile.remoteKey
|
||||
s3 = S3ObjectsData(
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
|
||||
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
|
||||
)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
|
||||
} yield (theFile, theRemoteMetadata, s3)
|
||||
} yield (theFile, theRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
case (theFile, theRemoteMetadata, s3) => {
|
||||
val expected = S3MetaData(theFile,
|
||||
matchByHash = Set(theRemoteMetadata),
|
||||
matchByKey = Some(theRemoteMetadata))
|
||||
val result = getMetadata(theFile, s3)
|
||||
case (theFile, theRemoteMetadata, remoteObjects) => {
|
||||
val expected = MatchedMetadata(theFile,
|
||||
matchByHash = Set(theRemoteMetadata),
|
||||
matchByKey = Some(theRemoteMetadata))
|
||||
val result = getMetadata(theFile, remoteObjects)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
})
|
||||
|
@ -69,19 +69,19 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
sources,
|
||||
prefix)
|
||||
theRemoteKey: RemoteKey = RemoteKey.resolve("the-file")(prefix)
|
||||
s3: S3ObjectsData = S3ObjectsData(
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
|
||||
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
|
||||
)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
|
||||
} yield (theFile, theRemoteMetadata, s3)
|
||||
} yield (theFile, theRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
case (theFile, theRemoteMetadata, s3) => {
|
||||
val expected = S3MetaData(theFile,
|
||||
matchByHash = Set(theRemoteMetadata),
|
||||
matchByKey = Some(theRemoteMetadata))
|
||||
val result = getMetadata(theFile, s3)
|
||||
case (theFile, theRemoteMetadata, remoteObjects) => {
|
||||
val expected = MatchedMetadata(theFile,
|
||||
matchByHash = Set(theRemoteMetadata),
|
||||
matchByKey = Some(theRemoteMetadata))
|
||||
val result = getMetadata(theFile, remoteObjects)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
})
|
||||
|
@ -97,7 +97,7 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
sources,
|
||||
prefix)
|
||||
otherRemoteKey = RemoteKey("other-key")
|
||||
s3: S3ObjectsData = S3ObjectsData(
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
|
||||
byKey = Map(otherRemoteKey -> HashModified(theHash, lastModified))
|
||||
|
@ -105,14 +105,15 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey,
|
||||
theHash,
|
||||
lastModified)
|
||||
} yield (theFile, otherRemoteMetadata, s3)
|
||||
} yield (theFile, otherRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
case (theFile, otherRemoteMetadata, s3) => {
|
||||
val expected = S3MetaData(theFile,
|
||||
matchByHash = Set(otherRemoteMetadata),
|
||||
matchByKey = None)
|
||||
val result = getMetadata(theFile, s3)
|
||||
case (theFile, otherRemoteMetadata, remoteObjects) => {
|
||||
val expected = MatchedMetadata(theFile,
|
||||
matchByHash =
|
||||
Set(otherRemoteMetadata),
|
||||
matchByKey = None)
|
||||
val result = getMetadata(theFile, remoteObjects)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
})
|
||||
|
@ -127,14 +128,16 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
sourcePath,
|
||||
sources,
|
||||
prefix)
|
||||
s3: S3ObjectsData = S3ObjectsData()
|
||||
} yield (theFile, s3)
|
||||
remoteObjects = RemoteObjects()
|
||||
} yield (theFile, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
case (theFile, s3) => {
|
||||
case (theFile, remoteObjects) => {
|
||||
val expected =
|
||||
S3MetaData(theFile, matchByHash = Set.empty, matchByKey = None)
|
||||
val result = getMetadata(theFile, s3)
|
||||
MatchedMetadata(theFile,
|
||||
matchByHash = Set.empty,
|
||||
matchByKey = None)
|
||||
val result = getMetadata(theFile, remoteObjects)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
})
|
||||
|
@ -152,7 +155,7 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
theRemoteKey = theFile.remoteKey
|
||||
oldHash = MD5Hash("old-hash")
|
||||
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
|
||||
s3: S3ObjectsData = S3ObjectsData(
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
|
||||
theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
|
||||
|
@ -165,14 +168,18 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey,
|
||||
theHash,
|
||||
lastModified)
|
||||
} yield (theFile, theRemoteMetadata, otherRemoteMetadata, s3)
|
||||
} yield (theFile, theRemoteMetadata, otherRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
case (theFile, theRemoteMetadata, otherRemoteMetadata, s3) => {
|
||||
val expected = S3MetaData(theFile,
|
||||
matchByHash = Set(otherRemoteMetadata),
|
||||
matchByKey = Some(theRemoteMetadata))
|
||||
val result = getMetadata(theFile, s3)
|
||||
case (theFile,
|
||||
theRemoteMetadata,
|
||||
otherRemoteMetadata,
|
||||
remoteObjects) => {
|
||||
val expected = MatchedMetadata(theFile,
|
||||
matchByHash =
|
||||
Set(otherRemoteMetadata),
|
||||
matchByKey = Some(theRemoteMetadata))
|
||||
val result = getMetadata(theFile, remoteObjects)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
})
|
||||
|
@ -189,7 +196,7 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
prefix)
|
||||
theRemoteKey = theFile.remoteKey
|
||||
oldHash = MD5Hash("old-hash")
|
||||
s3: S3ObjectsData = S3ObjectsData(
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash = Map(oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
|
||||
theHash -> Set.empty),
|
||||
byKey = Map(
|
||||
|
@ -197,14 +204,14 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
)
|
||||
)
|
||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
|
||||
} yield (theFile, theRemoteMetadata, s3)
|
||||
} yield (theFile, theRemoteMetadata, remoteObjects)
|
||||
it("generates valid metadata") {
|
||||
env.map({
|
||||
case (theFile, theRemoteMetadata, s3) => {
|
||||
val expected = S3MetaData(theFile,
|
||||
matchByHash = Set.empty,
|
||||
matchByKey = Some(theRemoteMetadata))
|
||||
val result = getMetadata(theFile, s3)
|
||||
case (theFile, theRemoteMetadata, remoteObjects) => {
|
||||
val expected = MatchedMetadata(theFile,
|
||||
matchByHash = Set.empty,
|
||||
matchByKey = Some(theRemoteMetadata))
|
||||
val result = getMetadata(theFile, remoteObjects)
|
||||
assertResult(expected)(result)
|
||||
}
|
||||
})
|
||||
|
@ -237,7 +244,7 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
sources,
|
||||
prefix)
|
||||
lastModified = LastModified(Instant.now)
|
||||
s3ObjectsData = S3ObjectsData(
|
||||
remoteObjects = RemoteObjects(
|
||||
byHash = Map(
|
||||
hash -> Set(KeyModified(key, lastModified),
|
||||
KeyModified(keyOtherKey.remoteKey, lastModified)),
|
||||
|
@ -249,17 +256,17 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
keyDiffHash.remoteKey -> HashModified(diffHash, lastModified)
|
||||
)
|
||||
)
|
||||
} yield (s3ObjectsData, localFile, keyDiffHash, diffHash)
|
||||
} yield (remoteObjects, localFile, keyDiffHash, diffHash)
|
||||
|
||||
def invoke(localFile: LocalFile, s3ObjectsData: S3ObjectsData) = {
|
||||
def invoke(localFile: LocalFile, s3ObjectsData: RemoteObjects) = {
|
||||
getS3Status(localFile, s3ObjectsData)
|
||||
}
|
||||
|
||||
describe("when remote key exists") {
|
||||
it("should return a result for matching key") {
|
||||
env.map({
|
||||
case (s3ObjectsData, localFile: LocalFile, _, _) =>
|
||||
val result = getMatchesByKey(invoke(localFile, s3ObjectsData))
|
||||
case (remoteObjects, localFile: LocalFile, _, _) =>
|
||||
val result = getMatchesByKey(invoke(localFile, remoteObjects))
|
||||
assert(result.contains(HashModified(hash, lastModified)))
|
||||
})
|
||||
}
|
||||
|
@ -275,10 +282,10 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
} yield (localFile)
|
||||
it("should return no matches by key") {
|
||||
env.map({
|
||||
case (s3ObjectsData, _, _, _) => {
|
||||
case (remoteObjects, _, _, _) => {
|
||||
env2.map({
|
||||
case (localFile) => {
|
||||
val result = getMatchesByKey(invoke(localFile, s3ObjectsData))
|
||||
val result = getMatchesByKey(invoke(localFile, remoteObjects))
|
||||
assert(result.isEmpty)
|
||||
}
|
||||
})
|
||||
|
@ -287,10 +294,10 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
}
|
||||
it("should return no matches by hash") {
|
||||
env.map({
|
||||
case (s3ObjectsData, _, _, _) => {
|
||||
case (remoteObjects, _, _, _) => {
|
||||
env2.map({
|
||||
case (localFile) => {
|
||||
val result = getMatchesByHash(invoke(localFile, s3ObjectsData))
|
||||
val result = getMatchesByHash(invoke(localFile, remoteObjects))
|
||||
assert(result.isEmpty)
|
||||
}
|
||||
})
|
||||
|
@ -301,13 +308,13 @@ class S3MetaDataEnricherSuite extends FunSpec {
|
|||
|
||||
describe("when remote key exists and no others match hash") {
|
||||
env.map({
|
||||
case (s3ObjectsData, _, keyDiffHash, diffHash) => {
|
||||
case (remoteObjects, _, keyDiffHash, diffHash) => {
|
||||
it("should return match by key") {
|
||||
val result = getMatchesByKey(invoke(keyDiffHash, s3ObjectsData))
|
||||
val result = getMatchesByKey(invoke(keyDiffHash, remoteObjects))
|
||||
assert(result.contains(HashModified(diffHash, lastModified)))
|
||||
}
|
||||
it("should return only itself in match by hash") {
|
||||
val result = getMatchesByHash(invoke(keyDiffHash, s3ObjectsData))
|
||||
val result = getMatchesByHash(invoke(keyDiffHash, remoteObjects))
|
||||
assert(
|
||||
result.equals(Set(
|
||||
(diffHash, KeyModified(keyDiffHash.remoteKey, lastModified)))))
|
|
@ -22,7 +22,7 @@ import zio.{DefaultRuntime, Task, UIO}
|
|||
class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||
|
||||
private val lastModified: LastModified = LastModified()
|
||||
private val emptyS3ObjectData = S3ObjectsData()
|
||||
private val emptyRemoteObjects = RemoteObjects()
|
||||
|
||||
"create a plan" - {
|
||||
|
||||
|
@ -46,7 +46,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
Right(List(toUpload(remoteKey, hash, source, file)))
|
||||
val result =
|
||||
invoke(options(source),
|
||||
UIO.succeed(emptyS3ObjectData),
|
||||
UIO.succeed(emptyRemoteObjects),
|
||||
UIO.succeed(Map(file.toPath -> file)))
|
||||
assertResult(expected)(result)
|
||||
})
|
||||
|
@ -62,14 +62,14 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val aHash = md5Hash(aFile)
|
||||
val anOtherKey = RemoteKey("other")
|
||||
val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey)))
|
||||
val s3ObjectsData = S3ObjectsData(
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(aHash -> Set(KeyModified(anOtherKey, lastModified))),
|
||||
byKey = Map(anOtherKey -> HashModified(aHash, lastModified))
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
UIO.succeed(s3ObjectsData),
|
||||
UIO.succeed(remoteObjects),
|
||||
UIO.succeed(Map(aFile.toPath -> aFile,
|
||||
anOtherFile.toPath -> anOtherFile)))
|
||||
assertResult(expected)(result)
|
||||
|
@ -85,14 +85,14 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val hash = md5Hash(file)
|
||||
// DoNothing actions should have been filtered out of the plan
|
||||
val expected = Right(List())
|
||||
val s3ObjectsData = S3ObjectsData(
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(hash -> Set(KeyModified(remoteKey, lastModified))),
|
||||
byKey = Map(remoteKey -> HashModified(hash, lastModified))
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
UIO.succeed(s3ObjectsData),
|
||||
UIO.succeed(remoteObjects),
|
||||
UIO.succeed(Map(file.toPath -> file)))
|
||||
assertResult(expected)(result)
|
||||
})
|
||||
|
@ -107,7 +107,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val originalHash = MD5Hash("original-file-content")
|
||||
val expected =
|
||||
Right(List(toUpload(remoteKey, currentHash, source, file)))
|
||||
val s3ObjectsData = S3ObjectsData(
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash = Map(originalHash -> Set(
|
||||
KeyModified(remoteKey, lastModified))),
|
||||
byKey =
|
||||
|
@ -115,7 +115,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
UIO.succeed(s3ObjectsData),
|
||||
UIO.succeed(remoteObjects),
|
||||
UIO.succeed(Map(file.toPath -> file)))
|
||||
assertResult(expected)(result)
|
||||
})
|
||||
|
@ -128,14 +128,14 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val hash = md5Hash(file)
|
||||
val sourceKey = RemoteKey("other-key")
|
||||
val expected = Right(List(toCopy(sourceKey, hash, remoteKey)))
|
||||
val s3ObjectsData = S3ObjectsData(
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(hash -> Set(KeyModified(sourceKey, lastModified))),
|
||||
byKey = Map()
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
UIO.succeed(s3ObjectsData),
|
||||
UIO.succeed(remoteObjects),
|
||||
UIO.succeed(Map(file.toPath -> file)))
|
||||
assertResult(expected)(result)
|
||||
})
|
||||
|
@ -154,13 +154,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val hash = md5Hash(file)
|
||||
// DoNothing actions should have been filtered out of the plan
|
||||
val expected = Right(List())
|
||||
val s3ObjectsData = S3ObjectsData(
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
|
||||
byKey = Map(remoteKey -> HashModified(hash, lastModified))
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
UIO.succeed(s3ObjectsData),
|
||||
UIO.succeed(remoteObjects),
|
||||
UIO.succeed(Map(file.toPath -> file)))
|
||||
assertResult(expected)(result)
|
||||
})
|
||||
|
@ -171,13 +171,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
withDirectory(source => {
|
||||
val hash = MD5Hash("file-content")
|
||||
val expected = Right(List(toDelete(remoteKey)))
|
||||
val s3ObjectsData = S3ObjectsData(
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash = Map(hash -> Set(KeyModified(remoteKey, lastModified))),
|
||||
byKey = Map(remoteKey -> HashModified(hash, lastModified))
|
||||
)
|
||||
val result =
|
||||
invoke(options(source),
|
||||
UIO.succeed(s3ObjectsData),
|
||||
UIO.succeed(remoteObjects),
|
||||
UIO.succeed(Map.empty))
|
||||
assertResult(expected)(result)
|
||||
})
|
||||
|
@ -215,7 +215,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val result =
|
||||
invoke(
|
||||
options(firstSource)(secondSource),
|
||||
UIO.succeed(emptyS3ObjectData),
|
||||
UIO.succeed(emptyRemoteObjects),
|
||||
UIO.succeed(
|
||||
Map(fileInFirstSource.toPath -> fileInFirstSource,
|
||||
fileInSecondSource.toPath -> fileInSecondSource))
|
||||
|
@ -240,7 +240,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val result =
|
||||
invoke(
|
||||
options(firstSource)(secondSource),
|
||||
UIO.succeed(emptyS3ObjectData),
|
||||
UIO.succeed(emptyRemoteObjects),
|
||||
UIO.succeed(
|
||||
Map(fileInFirstSource.toPath -> fileInFirstSource,
|
||||
fileInSecondSource.toPath -> fileInSecondSource))
|
||||
|
@ -258,13 +258,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
createFile(secondSource, filename2, "file-2-content")
|
||||
val hash2 = md5Hash(fileInSecondSource)
|
||||
val expected = Right(List())
|
||||
val s3ObjectData = S3ObjectsData(
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))),
|
||||
byKey = Map(remoteKey2 -> HashModified(hash2, lastModified)))
|
||||
val result =
|
||||
invoke(options(firstSource)(secondSource),
|
||||
UIO.succeed(s3ObjectData),
|
||||
UIO.succeed(remoteObjects),
|
||||
UIO.succeed(
|
||||
Map(fileInSecondSource.toPath -> fileInSecondSource)))
|
||||
assertResult(expected)(result)
|
||||
|
@ -280,13 +280,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
val hash1 = md5Hash(fileInFirstSource)
|
||||
withDirectory(secondSource => {
|
||||
val expected = Right(List())
|
||||
val s3ObjectData = S3ObjectsData(
|
||||
val remoteObjects = RemoteObjects(
|
||||
byHash =
|
||||
Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))),
|
||||
byKey = Map(remoteKey1 -> HashModified(hash1, lastModified)))
|
||||
val result =
|
||||
invoke(options(firstSource)(secondSource),
|
||||
UIO.succeed(s3ObjectData),
|
||||
UIO.succeed(remoteObjects),
|
||||
UIO.succeed(
|
||||
Map(fileInFirstSource.toPath -> fileInFirstSource)))
|
||||
assertResult(expected)(result)
|
||||
|
@ -299,11 +299,11 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
withDirectory(firstSource => {
|
||||
withDirectory(secondSource => {
|
||||
val expected = Right(List(toDelete(remoteKey1)))
|
||||
val s3ObjectData = S3ObjectsData(byKey =
|
||||
val remoteObjects = RemoteObjects(byKey =
|
||||
Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified)))
|
||||
val result =
|
||||
invoke(options(firstSource)(secondSource),
|
||||
UIO.succeed(s3ObjectData),
|
||||
UIO.succeed(remoteObjects),
|
||||
UIO.succeed(Map.empty))
|
||||
assertResult(expected)(result)
|
||||
})
|
||||
|
@ -354,13 +354,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
|||
|
||||
private def invoke(
|
||||
configOptions: ConfigOptions,
|
||||
result: Task[S3ObjectsData],
|
||||
result: Task[RemoteObjects],
|
||||
files: Task[Map[Path, File]]
|
||||
) = {
|
||||
type TestEnv = Storage with Console with Config with FileSystem with Hasher
|
||||
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live
|
||||
with FileSystem.Live with Hasher.Live {
|
||||
override def listResult: Task[S3ObjectsData] = result
|
||||
override def listResult: Task[RemoteObjects] = result
|
||||
override def uploadResult: UIO[StorageQueueEvent] =
|
||||
Task.die(new NotImplementedError)
|
||||
override def copyResult: UIO[StorageQueueEvent] =
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package net.kemitix.thorp.domain
|
||||
|
||||
// For the LocalFile, the set of matching S3 objects with the same MD5Hash, and any S3 object with the same remote key
|
||||
final case class S3MetaData(
|
||||
final case class MatchedMetadata(
|
||||
localFile: LocalFile,
|
||||
matchByHash: Set[RemoteMetaData],
|
||||
matchByKey: Option[RemoteMetaData]
|
|
@ -3,7 +3,7 @@ package net.kemitix.thorp.domain
|
|||
/**
|
||||
* A list of objects and their MD5 hash values.
|
||||
*/
|
||||
final case class S3ObjectsData(
|
||||
final case class RemoteObjects(
|
||||
byHash: Map[MD5Hash, Set[KeyModified]] = Map.empty,
|
||||
byKey: Map[RemoteKey, HashModified] = Map.empty
|
||||
)
|
|
@ -15,7 +15,7 @@ object Storage {
|
|||
def listObjects(
|
||||
bucket: Bucket,
|
||||
prefix: RemoteKey
|
||||
): RIO[Storage with Console, S3ObjectsData]
|
||||
): RIO[Storage with Console, RemoteObjects]
|
||||
|
||||
def upload(
|
||||
localFile: LocalFile,
|
||||
|
@ -40,7 +40,7 @@ object Storage {
|
|||
|
||||
trait Test extends Storage {
|
||||
|
||||
def listResult: Task[S3ObjectsData]
|
||||
def listResult: Task[RemoteObjects]
|
||||
def uploadResult: UIO[StorageQueueEvent]
|
||||
def copyResult: UIO[StorageQueueEvent]
|
||||
def deleteResult: UIO[StorageQueueEvent]
|
||||
|
@ -50,7 +50,7 @@ object Storage {
|
|||
|
||||
override def listObjects(
|
||||
bucket: Bucket,
|
||||
prefix: RemoteKey): RIO[Storage with Console, S3ObjectsData] =
|
||||
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =
|
||||
listResult
|
||||
|
||||
override def upload(
|
||||
|
@ -78,7 +78,7 @@ object Storage {
|
|||
}
|
||||
|
||||
object Test extends Test {
|
||||
override def listResult: Task[S3ObjectsData] =
|
||||
override def listResult: Task[RemoteObjects] =
|
||||
Task.die(new NotImplementedError)
|
||||
override def uploadResult: UIO[StorageQueueEvent] =
|
||||
Task.die(new NotImplementedError)
|
||||
|
@ -91,7 +91,7 @@ object Storage {
|
|||
}
|
||||
|
||||
final def list(bucket: Bucket,
|
||||
prefix: RemoteKey): RIO[Storage with Console, S3ObjectsData] =
|
||||
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =
|
||||
ZIO.accessM(_.storage listObjects (bucket, prefix))
|
||||
|
||||
final def upload(
|
||||
|
|
|
@ -6,7 +6,7 @@ import com.amazonaws.services.s3.model.{
|
|||
S3ObjectSummary
|
||||
}
|
||||
import net.kemitix.thorp.console._
|
||||
import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData}
|
||||
import net.kemitix.thorp.domain.{Bucket, RemoteKey, RemoteObjects}
|
||||
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
|
||||
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
|
||||
import zio.{Task, RIO}
|
||||
|
@ -21,7 +21,7 @@ trait Lister {
|
|||
def listObjects(amazonS3: AmazonS3.Client)(
|
||||
bucket: Bucket,
|
||||
prefix: RemoteKey
|
||||
): RIO[Console, S3ObjectsData] = {
|
||||
): RIO[Console, RemoteObjects] = {
|
||||
|
||||
def request =
|
||||
new ListObjectsV2Request()
|
||||
|
@ -48,7 +48,7 @@ trait Lister {
|
|||
|
||||
fetch(request)
|
||||
.map(summaries => {
|
||||
S3ObjectsData(byHash(summaries), byKey(summaries))
|
||||
RemoteObjects(byHash(summaries), byKey(summaries))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ object S3Storage {
|
|||
AmazonTransferManager(TransferManagerBuilder.defaultTransferManager)
|
||||
|
||||
override def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey): RIO[Console, S3ObjectsData] =
|
||||
prefix: RemoteKey): RIO[Console, RemoteObjects] =
|
||||
Lister.listObjects(client)(bucket, prefix)
|
||||
|
||||
override def upload(
|
||||
|
|
|
@ -26,7 +26,7 @@ trait AmazonS3ClientTestFixture extends MockFactory {
|
|||
override def listObjects(
|
||||
bucket: Bucket,
|
||||
prefix: RemoteKey
|
||||
): RIO[Console, S3ObjectsData] =
|
||||
): RIO[Console, RemoteObjects] =
|
||||
Lister.listObjects(client)(bucket, prefix)
|
||||
|
||||
override def upload(
|
||||
|
|
|
@ -34,7 +34,7 @@ class ListerTest extends FreeSpec {
|
|||
RemoteKey(key) -> HashModified(MD5Hash(etag),
|
||||
LastModified(nowInstant))
|
||||
)
|
||||
val expected = Right(S3ObjectsData(expectedHashMap, expectedKeyMap))
|
||||
val expected = Right(RemoteObjects(expectedHashMap, expectedKeyMap))
|
||||
new AmazonS3ClientTestFixture {
|
||||
(fixture.amazonS3Client.listObjectsV2 _)
|
||||
.when()
|
||||
|
@ -65,7 +65,7 @@ class ListerTest extends FreeSpec {
|
|||
RemoteKey(key2) -> HashModified(MD5Hash(etag2),
|
||||
LastModified(nowInstant))
|
||||
)
|
||||
val expected = Right(S3ObjectsData(expectedHashMap, expectedKeyMap))
|
||||
val expected = Right(RemoteObjects(expectedHashMap, expectedKeyMap))
|
||||
new AmazonS3ClientTestFixture {
|
||||
(fixture.amazonS3Client.listObjectsV2 _)
|
||||
.when()
|
||||
|
|
|
@ -38,7 +38,7 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
|||
sources,
|
||||
prefix)
|
||||
lastModified = LastModified(Instant.now)
|
||||
s3ObjectsData = S3ObjectsData(
|
||||
s3ObjectsData = RemoteObjects(
|
||||
byHash = Map(
|
||||
hash -> Set(KeyModified(key, lastModified),
|
||||
KeyModified(keyOtherKey.remoteKey, lastModified)),
|
||||
|
@ -59,7 +59,7 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
|||
diffHash,
|
||||
key)
|
||||
|
||||
def invoke(localFile: LocalFile, s3ObjectsData: S3ObjectsData) =
|
||||
def invoke(localFile: LocalFile, s3ObjectsData: RemoteObjects) =
|
||||
S3MetaDataEnricher.getS3Status(localFile, s3ObjectsData)
|
||||
|
||||
def getMatchesByKey(
|
||||
|
|
Loading…
Reference in a new issue