Upgrade to Scala 2.13.0 (#176)
This commit is contained in:
parent
64a01dea0c
commit
321773f04c
23 changed files with 162 additions and 131 deletions
|
@ -1,6 +1,6 @@
|
||||||
language: scala
|
language: scala
|
||||||
scala:
|
scala:
|
||||||
- 2.12.8
|
- 2.13.0
|
||||||
jdk:
|
jdk:
|
||||||
- openjdk8
|
- openjdk8
|
||||||
- openjdk11
|
- openjdk11
|
||||||
|
|
14
build.sbt
14
build.sbt
|
@ -16,17 +16,23 @@ inThisBuild(List(
|
||||||
|
|
||||||
val commonSettings = Seq(
|
val commonSettings = Seq(
|
||||||
sonatypeProfileName := "net.kemitix",
|
sonatypeProfileName := "net.kemitix",
|
||||||
scalaVersion := "2.12.8",
|
scalaVersion := "2.13.0",
|
||||||
scalacOptions ++= Seq(
|
scalacOptions ++= Seq(
|
||||||
"-Ywarn-unused-import",
|
"-Ywarn-unused:imports",
|
||||||
"-Xfatal-warnings",
|
"-Xfatal-warnings",
|
||||||
"-feature",
|
"-feature",
|
||||||
"-deprecation",
|
"-deprecation",
|
||||||
"-unchecked",
|
"-unchecked",
|
||||||
"-language:postfixOps",
|
"-language:postfixOps",
|
||||||
"-language:higherKinds",
|
"-language:higherKinds",
|
||||||
"-Ypartial-unification"),
|
"-language:higherKinds"),
|
||||||
wartremoverErrors ++= Warts.unsafe.filterNot(wart => List(Wart.Any, Wart.Nothing, Wart.Serializable).contains(wart)),
|
wartremoverErrors ++= Warts.unsafe.filterNot(wart => List(
|
||||||
|
Wart.Any,
|
||||||
|
Wart.Nothing,
|
||||||
|
Wart.Serializable,
|
||||||
|
Wart.NonUnitStatements,
|
||||||
|
Wart.StringPlusAny
|
||||||
|
).contains(wart)),
|
||||||
test in assembly := {}
|
test in assembly := {}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -10,8 +10,8 @@ object ActionGenerator {
|
||||||
|
|
||||||
def createActions(
|
def createActions(
|
||||||
matchedMetadata: MatchedMetadata,
|
matchedMetadata: MatchedMetadata,
|
||||||
previousActions: Stream[Action]
|
previousActions: LazyList[Action]
|
||||||
): RIO[Config, Stream[Action]] =
|
): RIO[Config, LazyList[Action]] =
|
||||||
for {
|
for {
|
||||||
bucket <- Config.bucket
|
bucket <- Config.bucket
|
||||||
} yield
|
} yield
|
||||||
|
@ -19,7 +19,7 @@ object ActionGenerator {
|
||||||
|
|
||||||
private def formattedMetadata(
|
private def formattedMetadata(
|
||||||
matchedMetadata: MatchedMetadata,
|
matchedMetadata: MatchedMetadata,
|
||||||
previousActions: Stream[Action]): TaggedMetadata = {
|
previousActions: LazyList[Action]): TaggedMetadata = {
|
||||||
val remoteExists = matchedMetadata.matchByKey.nonEmpty
|
val remoteExists = matchedMetadata.matchByKey.nonEmpty
|
||||||
val remoteMatches = remoteExists && matchedMetadata.matchByKey.exists(m =>
|
val remoteMatches = remoteExists && matchedMetadata.matchByKey.exists(m =>
|
||||||
LocalFile.matchesHash(matchedMetadata.localFile)(m.hash))
|
LocalFile.matchesHash(matchedMetadata.localFile)(m.hash))
|
||||||
|
@ -33,14 +33,14 @@ object ActionGenerator {
|
||||||
|
|
||||||
final case class TaggedMetadata(
|
final case class TaggedMetadata(
|
||||||
matchedMetadata: MatchedMetadata,
|
matchedMetadata: MatchedMetadata,
|
||||||
previousActions: Stream[Action],
|
previousActions: LazyList[Action],
|
||||||
remoteExists: Boolean,
|
remoteExists: Boolean,
|
||||||
remoteMatches: Boolean,
|
remoteMatches: Boolean,
|
||||||
anyMatches: Boolean
|
anyMatches: Boolean
|
||||||
)
|
)
|
||||||
|
|
||||||
private def genAction(taggedMetadata: TaggedMetadata,
|
private def genAction(taggedMetadata: TaggedMetadata,
|
||||||
bucket: Bucket): Stream[Action] = {
|
bucket: Bucket): LazyList[Action] = {
|
||||||
taggedMetadata match {
|
taggedMetadata match {
|
||||||
case TaggedMetadata(md, _, remoteExists, remoteMatches, _)
|
case TaggedMetadata(md, _, remoteExists, remoteMatches, _)
|
||||||
if remoteExists && remoteMatches =>
|
if remoteExists && remoteMatches =>
|
||||||
|
@ -58,7 +58,7 @@ object ActionGenerator {
|
||||||
private def key = LocalFile.remoteKey ^|-> RemoteKey.key
|
private def key = LocalFile.remoteKey ^|-> RemoteKey.key
|
||||||
|
|
||||||
def isNotUploadAlreadyQueued(
|
def isNotUploadAlreadyQueued(
|
||||||
previousActions: Stream[Action]
|
previousActions: LazyList[Action]
|
||||||
)(
|
)(
|
||||||
localFile: LocalFile
|
localFile: LocalFile
|
||||||
): Boolean = !previousActions.exists {
|
): Boolean = !previousActions.exists {
|
||||||
|
@ -69,21 +69,21 @@ object ActionGenerator {
|
||||||
private def doNothing(
|
private def doNothing(
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
remoteKey: RemoteKey
|
remoteKey: RemoteKey
|
||||||
) = Stream(DoNothing(bucket, remoteKey, 0L))
|
) = LazyList(DoNothing(bucket, remoteKey, 0L))
|
||||||
|
|
||||||
private def uploadFile(
|
private def uploadFile(
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
localFile: LocalFile
|
localFile: LocalFile
|
||||||
) = Stream(ToUpload(bucket, localFile, localFile.file.length))
|
) = LazyList(ToUpload(bucket, localFile, localFile.file.length))
|
||||||
|
|
||||||
private def copyFile(
|
private def copyFile(
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
remoteMetaData: Set[RemoteMetaData]
|
remoteMetaData: Set[RemoteMetaData]
|
||||||
) =
|
) =
|
||||||
remoteMetaData
|
LazyList
|
||||||
|
.from(remoteMetaData)
|
||||||
.take(1)
|
.take(1)
|
||||||
.toStream
|
|
||||||
.map(
|
.map(
|
||||||
other =>
|
other =>
|
||||||
ToCopy(bucket,
|
ToCopy(bucket,
|
||||||
|
|
|
@ -3,6 +3,6 @@ package net.kemitix.thorp.core
|
||||||
import net.kemitix.thorp.domain.StorageQueueEvent
|
import net.kemitix.thorp.domain.StorageQueueEvent
|
||||||
|
|
||||||
final case class EventQueue(
|
final case class EventQueue(
|
||||||
events: Stream[StorageQueueEvent],
|
events: LazyList[StorageQueueEvent],
|
||||||
bytesInQueue: Long
|
bytesInQueue: Long
|
||||||
)
|
)
|
||||||
|
|
|
@ -22,11 +22,11 @@ object LocalFileStream {
|
||||||
case _ => localFile(path)
|
case _ => localFile(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
def recurse(paths: Stream[Path])
|
def recurse(paths: LazyList[Path])
|
||||||
: RIO[Config with FileSystem with Hasher, LocalFiles] =
|
: RIO[Config with FileSystem with Hasher, LocalFiles] =
|
||||||
for {
|
for {
|
||||||
recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path))
|
recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path))
|
||||||
} yield LocalFiles.reduce(recursed.toStream)
|
} yield LocalFiles.reduce(LazyList.from(recursed))
|
||||||
|
|
||||||
def loop(path: Path): RIO[Config with FileSystem with Hasher, LocalFiles] =
|
def loop(path: Path): RIO[Config with FileSystem with Hasher, LocalFiles] =
|
||||||
dirPaths(path) >>= recurse
|
dirPaths(path) >>= recurse
|
||||||
|
@ -37,12 +37,13 @@ object LocalFileStream {
|
||||||
private def dirPaths(path: Path) =
|
private def dirPaths(path: Path) =
|
||||||
listFiles(path) >>= includedDirPaths
|
listFiles(path) >>= includedDirPaths
|
||||||
|
|
||||||
private def includedDirPaths(paths: Stream[Path]) =
|
private def includedDirPaths(paths: LazyList[Path]) =
|
||||||
for {
|
for {
|
||||||
flaggedPaths <- RIO.foreach(paths)(path =>
|
flaggedPaths <- RIO.foreach(paths)(path =>
|
||||||
isIncluded(path).map((path, _)))
|
isIncluded(path).map((path, _)))
|
||||||
} yield
|
} yield
|
||||||
flaggedPaths.toStream
|
LazyList
|
||||||
|
.from(flaggedPaths)
|
||||||
.filter({ case (_, included) => included })
|
.filter({ case (_, included) => included })
|
||||||
.map({ case (path, _) => path })
|
.map({ case (path, _) => path })
|
||||||
|
|
||||||
|
@ -63,7 +64,7 @@ object LocalFileStream {
|
||||||
for {
|
for {
|
||||||
files <- Task(path.toFile.listFiles)
|
files <- Task(path.toFile.listFiles)
|
||||||
_ <- filesMustExist(path, files)
|
_ <- filesMustExist(path, files)
|
||||||
} yield Stream(files: _*).map(_.toPath)
|
} yield LazyList.from(files.toIndexedSeq).map(_.toPath)
|
||||||
|
|
||||||
private def filesMustExist(path: Path, files: Array[File]) =
|
private def filesMustExist(path: Path, files: Array[File]) =
|
||||||
Task {
|
Task {
|
||||||
|
|
|
@ -3,7 +3,7 @@ package net.kemitix.thorp.core
|
||||||
import net.kemitix.thorp.domain.LocalFile
|
import net.kemitix.thorp.domain.LocalFile
|
||||||
|
|
||||||
final case class LocalFiles(
|
final case class LocalFiles(
|
||||||
localFiles: Stream[LocalFile],
|
localFiles: LazyList[LocalFile],
|
||||||
count: Long,
|
count: Long,
|
||||||
totalSizeBytes: Long
|
totalSizeBytes: Long
|
||||||
) {
|
) {
|
||||||
|
@ -16,9 +16,9 @@ final case class LocalFiles(
|
||||||
}
|
}
|
||||||
|
|
||||||
object LocalFiles {
|
object LocalFiles {
|
||||||
val empty: LocalFiles = LocalFiles(Stream.empty, 0L, 0L)
|
val empty: LocalFiles = LocalFiles(LazyList.empty, 0L, 0L)
|
||||||
def reduce: Stream[LocalFiles] => LocalFiles =
|
def reduce: LazyList[LocalFiles] => LocalFiles =
|
||||||
list => list.foldLeft(LocalFiles.empty)((acc, lf) => acc ++ lf)
|
list => list.foldLeft(LocalFiles.empty)((acc, lf) => acc ++ lf)
|
||||||
def one(localFile: LocalFile): LocalFiles =
|
def one(localFile: LocalFile): LocalFiles =
|
||||||
LocalFiles(Stream(localFile), 1, localFile.file.length)
|
LocalFiles(LazyList(localFile), 1, localFile.file.length)
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,14 +31,14 @@ object PlanBuilder {
|
||||||
.map(syncPlan(localData))
|
.map(syncPlan(localData))
|
||||||
}
|
}
|
||||||
|
|
||||||
private def syncPlan(localData: LocalFiles): Stream[Action] => SyncPlan =
|
private def syncPlan(localData: LocalFiles): LazyList[Action] => SyncPlan =
|
||||||
SyncPlan.create(_, syncTotal(localData))
|
SyncPlan.create(_, syncTotal(localData))
|
||||||
|
|
||||||
private def syncTotal(localData: LocalFiles): SyncTotals =
|
private def syncTotal(localData: LocalFiles): SyncTotals =
|
||||||
SyncTotals.create(localData.count, localData.totalSizeBytes, 0L)
|
SyncTotals.create(localData.count, localData.totalSizeBytes, 0L)
|
||||||
|
|
||||||
private def createActions(remoteObjects: RemoteObjects,
|
private def createActions(remoteObjects: RemoteObjects,
|
||||||
localFiles: Stream[LocalFile]) =
|
localFiles: LazyList[LocalFile]) =
|
||||||
for {
|
for {
|
||||||
fileActions <- actionsForLocalFiles(remoteObjects, localFiles)
|
fileActions <- actionsForLocalFiles(remoteObjects, localFiles)
|
||||||
remoteActions <- actionsForRemoteKeys(remoteObjects.byKey.keys)
|
remoteActions <- actionsForRemoteKeys(remoteObjects.byKey.keys)
|
||||||
|
@ -50,15 +50,15 @@ object PlanBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def actionsForLocalFiles(remoteObjects: RemoteObjects,
|
private def actionsForLocalFiles(remoteObjects: RemoteObjects,
|
||||||
localFiles: Stream[LocalFile]) =
|
localFiles: LazyList[LocalFile]) =
|
||||||
ZIO.foldLeft(localFiles)(Stream.empty[Action])(
|
ZIO.foldLeft(localFiles)(LazyList.empty[Action])(
|
||||||
(acc, localFile) =>
|
(acc, localFile) =>
|
||||||
createActionsFromLocalFile(remoteObjects, acc, localFile)
|
createActionsFromLocalFile(remoteObjects, acc, localFile)
|
||||||
.map(_ #::: acc)
|
.map(_ #::: acc)
|
||||||
)
|
)
|
||||||
|
|
||||||
private def createActionsFromLocalFile(remoteObjects: RemoteObjects,
|
private def createActionsFromLocalFile(remoteObjects: RemoteObjects,
|
||||||
previousActions: Stream[Action],
|
previousActions: LazyList[Action],
|
||||||
localFile: LocalFile) =
|
localFile: LocalFile) =
|
||||||
ActionGenerator.createActions(
|
ActionGenerator.createActions(
|
||||||
S3MetaDataEnricher.getMetadata(localFile, remoteObjects),
|
S3MetaDataEnricher.getMetadata(localFile, remoteObjects),
|
||||||
|
@ -66,7 +66,7 @@ object PlanBuilder {
|
||||||
)
|
)
|
||||||
|
|
||||||
private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) =
|
private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) =
|
||||||
ZIO.foldLeft(remoteKeys)(Stream.empty[Action])(
|
ZIO.foldLeft(remoteKeys)(LazyList.empty[Action])(
|
||||||
(acc, remoteKey) => createActionFromRemoteKey(remoteKey).map(_ #:: acc)
|
(acc, remoteKey) => createActionFromRemoteKey(remoteKey).map(_ #:: acc)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -90,6 +90,6 @@ object PlanBuilder {
|
||||||
sources <- Config.sources
|
sources <- Config.sources
|
||||||
found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles)
|
found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles)
|
||||||
_ <- Console.putStrLn(s"Found ${found.flatMap(_.localFiles).size} files")
|
_ <- Console.putStrLn(s"Found ${found.flatMap(_.localFiles).size} files")
|
||||||
} yield LocalFiles.reduce(found.toStream)
|
} yield LocalFiles.reduce(LazyList.from(found))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,9 +29,9 @@ trait PlanExecutor {
|
||||||
bytesCounter: Ref[Long]
|
bytesCounter: Ref[Long]
|
||||||
): ZIO[Storage with Console with Config,
|
): ZIO[Storage with Console with Config,
|
||||||
Throwable,
|
Throwable,
|
||||||
Stream[StorageQueueEvent]] = {
|
LazyList[StorageQueueEvent]] = {
|
||||||
ZIO.foldLeft(syncPlan.actions)(Stream.empty[StorageQueueEvent]) {
|
ZIO.foldLeft(syncPlan.actions)(LazyList.empty[StorageQueueEvent]) {
|
||||||
(stream: Stream[StorageQueueEvent], action) =>
|
(stream: LazyList[StorageQueueEvent], action) =>
|
||||||
val result: ZIO[Storage with Console with Config,
|
val result: ZIO[Storage with Console with Config,
|
||||||
Throwable,
|
Throwable,
|
||||||
StorageQueueEvent] =
|
StorageQueueEvent] =
|
||||||
|
|
|
@ -3,12 +3,12 @@ package net.kemitix.thorp.core
|
||||||
import net.kemitix.thorp.domain.SyncTotals
|
import net.kemitix.thorp.domain.SyncTotals
|
||||||
|
|
||||||
final case class SyncPlan private (
|
final case class SyncPlan private (
|
||||||
actions: Stream[Action],
|
actions: LazyList[Action],
|
||||||
syncTotals: SyncTotals
|
syncTotals: SyncTotals
|
||||||
)
|
)
|
||||||
|
|
||||||
object SyncPlan {
|
object SyncPlan {
|
||||||
val empty: SyncPlan = SyncPlan(Stream.empty, SyncTotals.empty)
|
val empty: SyncPlan = SyncPlan(LazyList.empty, SyncTotals.empty)
|
||||||
def create(actions: Stream[Action], syncTotals: SyncTotals): SyncPlan =
|
def create(actions: LazyList[Action], syncTotals: SyncTotals): SyncPlan =
|
||||||
SyncPlan(actions, syncTotals)
|
SyncPlan(actions, syncTotals)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
|
|
||||||
describe("create actions") {
|
describe("create actions") {
|
||||||
|
|
||||||
val previousActions = Stream.empty[Action]
|
val previousActions = LazyList.empty[Action]
|
||||||
|
|
||||||
describe("#1 local exists, remote exists, remote matches - do nothing") {
|
describe("#1 local exists, remote exists, remote matches - do nothing") {
|
||||||
val theHash = MD5Hash("the-hash")
|
val theHash = MD5Hash("the-hash")
|
||||||
|
@ -46,7 +46,7 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
env.map({
|
env.map({
|
||||||
case (theFile, input) => {
|
case (theFile, input) => {
|
||||||
val expected =
|
val expected =
|
||||||
Right(Stream(
|
Right(LazyList(
|
||||||
DoNothing(bucket, theFile.remoteKey, theFile.file.length + 1)))
|
DoNothing(bucket, theFile.remoteKey, theFile.file.length + 1)))
|
||||||
val result = invoke(input, previousActions)
|
val result = invoke(input, previousActions)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
|
@ -74,7 +74,7 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
env.map({
|
env.map({
|
||||||
case (theFile, theRemoteKey, input, otherRemoteKey) => {
|
case (theFile, theRemoteKey, input, otherRemoteKey) => {
|
||||||
val expected = Right(
|
val expected = Right(
|
||||||
Stream(
|
LazyList(
|
||||||
ToCopy(bucket,
|
ToCopy(bucket,
|
||||||
otherRemoteKey,
|
otherRemoteKey,
|
||||||
theHash,
|
theHash,
|
||||||
|
@ -100,7 +100,7 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
it("upload") {
|
it("upload") {
|
||||||
env.map({
|
env.map({
|
||||||
case (theFile, input) => {
|
case (theFile, input) => {
|
||||||
val expected = Right(Stream(
|
val expected = Right(LazyList(
|
||||||
ToUpload(bucket, theFile, theFile.file.length))) // upload
|
ToUpload(bucket, theFile, theFile.file.length))) // upload
|
||||||
val result = invoke(input, previousActions)
|
val result = invoke(input, previousActions)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
|
@ -134,7 +134,7 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
env.map({
|
env.map({
|
||||||
case (theFile, theRemoteKey, input, otherRemoteKey) => {
|
case (theFile, theRemoteKey, input, otherRemoteKey) => {
|
||||||
val expected = Right(
|
val expected = Right(
|
||||||
Stream(
|
LazyList(
|
||||||
ToCopy(bucket,
|
ToCopy(bucket,
|
||||||
otherRemoteKey,
|
otherRemoteKey,
|
||||||
theHash,
|
theHash,
|
||||||
|
@ -167,8 +167,8 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
it("upload") {
|
it("upload") {
|
||||||
env.map({
|
env.map({
|
||||||
case (theFile, input) => {
|
case (theFile, input) => {
|
||||||
val expected = Right(
|
val expected = Right(LazyList(
|
||||||
Stream(ToUpload(bucket, theFile, theFile.file.length))) // upload
|
ToUpload(bucket, theFile, theFile.file.length))) // upload
|
||||||
val result = invoke(input, previousActions)
|
val result = invoke(input, previousActions)
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
|
@ -188,7 +188,7 @@ class ActionGeneratorSuite extends FunSpec {
|
||||||
|
|
||||||
private def invoke(
|
private def invoke(
|
||||||
input: MatchedMetadata,
|
input: MatchedMetadata,
|
||||||
previousActions: Stream[Action]
|
previousActions: LazyList[Action]
|
||||||
) = {
|
) = {
|
||||||
type TestEnv = Config with FileSystem
|
type TestEnv = Config with FileSystem
|
||||||
val testEnv: TestEnv = new Config.Live with FileSystem.Live {}
|
val testEnv: TestEnv = new Config.Live with FileSystem.Live {}
|
||||||
|
|
|
@ -6,6 +6,8 @@ import net.kemitix.thorp.domain.HashType.MD5
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
|
||||||
|
import scala.collection.MapView
|
||||||
|
|
||||||
class MatchedMetadataEnricherSuite extends FunSpec {
|
class MatchedMetadataEnricherSuite extends FunSpec {
|
||||||
private val source = Resource(this, "upload")
|
private val source = Resource(this, "upload")
|
||||||
private val sourcePath = source.toPath
|
private val sourcePath = source.toPath
|
||||||
|
@ -37,8 +39,8 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
||||||
prefix)
|
prefix)
|
||||||
theRemoteKey = theFile.remoteKey
|
theRemoteKey = theFile.remoteKey
|
||||||
remoteObjects = RemoteObjects(
|
remoteObjects = RemoteObjects(
|
||||||
byHash = Map(theHash -> Set(theRemoteKey)),
|
byHash = MapView(theHash -> Set(theRemoteKey)),
|
||||||
byKey = Map(theRemoteKey -> theHash)
|
byKey = MapView(theRemoteKey -> theHash)
|
||||||
)
|
)
|
||||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
|
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
|
||||||
} yield (theFile, theRemoteMetadata, remoteObjects)
|
} yield (theFile, theRemoteMetadata, remoteObjects)
|
||||||
|
@ -65,8 +67,8 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
||||||
prefix)
|
prefix)
|
||||||
theRemoteKey: RemoteKey = RemoteKey.resolve("the-file")(prefix)
|
theRemoteKey: RemoteKey = RemoteKey.resolve("the-file")(prefix)
|
||||||
remoteObjects = RemoteObjects(
|
remoteObjects = RemoteObjects(
|
||||||
byHash = Map(theHash -> Set(theRemoteKey)),
|
byHash = MapView(theHash -> Set(theRemoteKey)),
|
||||||
byKey = Map(theRemoteKey -> theHash)
|
byKey = MapView(theRemoteKey -> theHash)
|
||||||
)
|
)
|
||||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
|
theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash)
|
||||||
} yield (theFile, theRemoteMetadata, remoteObjects)
|
} yield (theFile, theRemoteMetadata, remoteObjects)
|
||||||
|
@ -93,8 +95,8 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
||||||
prefix)
|
prefix)
|
||||||
otherRemoteKey = RemoteKey("other-key")
|
otherRemoteKey = RemoteKey("other-key")
|
||||||
remoteObjects = RemoteObjects(
|
remoteObjects = RemoteObjects(
|
||||||
byHash = Map(theHash -> Set(otherRemoteKey)),
|
byHash = MapView(theHash -> Set(otherRemoteKey)),
|
||||||
byKey = Map(otherRemoteKey -> theHash)
|
byKey = MapView(otherRemoteKey -> theHash)
|
||||||
)
|
)
|
||||||
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
|
otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash)
|
||||||
} yield (theFile, otherRemoteMetadata, remoteObjects)
|
} yield (theFile, otherRemoteMetadata, remoteObjects)
|
||||||
|
@ -148,9 +150,9 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
||||||
oldHash = MD5Hash("old-hash")
|
oldHash = MD5Hash("old-hash")
|
||||||
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
|
otherRemoteKey = RemoteKey.resolve("other-key")(prefix)
|
||||||
remoteObjects = RemoteObjects(
|
remoteObjects = RemoteObjects(
|
||||||
byHash =
|
byHash = MapView(oldHash -> Set(theRemoteKey),
|
||||||
Map(oldHash -> Set(theRemoteKey), theHash -> Set(otherRemoteKey)),
|
theHash -> Set(otherRemoteKey)),
|
||||||
byKey = Map(
|
byKey = MapView(
|
||||||
theRemoteKey -> oldHash,
|
theRemoteKey -> oldHash,
|
||||||
otherRemoteKey -> theHash
|
otherRemoteKey -> theHash
|
||||||
)
|
)
|
||||||
|
@ -186,8 +188,8 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
||||||
theRemoteKey = theFile.remoteKey
|
theRemoteKey = theFile.remoteKey
|
||||||
oldHash = MD5Hash("old-hash")
|
oldHash = MD5Hash("old-hash")
|
||||||
remoteObjects = RemoteObjects(
|
remoteObjects = RemoteObjects(
|
||||||
byHash = Map(oldHash -> Set(theRemoteKey), theHash -> Set.empty),
|
byHash = MapView(oldHash -> Set(theRemoteKey), theHash -> Set.empty),
|
||||||
byKey = Map(theRemoteKey -> oldHash)
|
byKey = MapView(theRemoteKey -> oldHash)
|
||||||
)
|
)
|
||||||
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
|
theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash)
|
||||||
} yield (theFile, theRemoteMetadata, remoteObjects)
|
} yield (theFile, theRemoteMetadata, remoteObjects)
|
||||||
|
@ -230,11 +232,11 @@ class MatchedMetadataEnricherSuite extends FunSpec {
|
||||||
sources,
|
sources,
|
||||||
prefix)
|
prefix)
|
||||||
remoteObjects = RemoteObjects(
|
remoteObjects = RemoteObjects(
|
||||||
byHash = Map(
|
byHash = MapView(
|
||||||
hash -> Set(key, keyOtherKey.remoteKey),
|
hash -> Set(key, keyOtherKey.remoteKey),
|
||||||
diffHash -> Set(keyDiffHash.remoteKey)
|
diffHash -> Set(keyDiffHash.remoteKey)
|
||||||
),
|
),
|
||||||
byKey = Map(
|
byKey = MapView(
|
||||||
key -> hash,
|
key -> hash,
|
||||||
keyOtherKey.remoteKey -> hash,
|
keyOtherKey.remoteKey -> hash,
|
||||||
keyDiffHash.remoteKey -> diffHash
|
keyDiffHash.remoteKey -> diffHash
|
||||||
|
|
|
@ -19,6 +19,8 @@ import net.kemitix.thorp.storage.api.Storage
|
||||||
import org.scalatest.FreeSpec
|
import org.scalatest.FreeSpec
|
||||||
import zio.{DefaultRuntime, Task, UIO}
|
import zio.{DefaultRuntime, Task, UIO}
|
||||||
|
|
||||||
|
import scala.collection.MapView
|
||||||
|
|
||||||
class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
|
|
||||||
private val emptyRemoteObjects = RemoteObjects.empty
|
private val emptyRemoteObjects = RemoteObjects.empty
|
||||||
|
@ -62,8 +64,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
val anOtherKey = RemoteKey("other")
|
val anOtherKey = RemoteKey("other")
|
||||||
val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey)))
|
val expected = Right(List(toCopy(anOtherKey, aHash, remoteKey)))
|
||||||
val remoteObjects = RemoteObjects(
|
val remoteObjects = RemoteObjects(
|
||||||
byHash = Map(aHash -> Set(anOtherKey)),
|
byHash = MapView(aHash -> Set(anOtherKey)),
|
||||||
byKey = Map(anOtherKey -> aHash)
|
byKey = MapView(anOtherKey -> aHash)
|
||||||
)
|
)
|
||||||
val result =
|
val result =
|
||||||
invoke(options(source),
|
invoke(options(source),
|
||||||
|
@ -84,8 +86,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
// DoNothing actions should have been filtered out of the plan
|
// DoNothing actions should have been filtered out of the plan
|
||||||
val expected = Right(List())
|
val expected = Right(List())
|
||||||
val remoteObjects = RemoteObjects(
|
val remoteObjects = RemoteObjects(
|
||||||
byHash = Map(hash -> Set(remoteKey)),
|
byHash = MapView(hash -> Set(remoteKey)),
|
||||||
byKey = Map(remoteKey -> hash)
|
byKey = MapView(remoteKey -> hash)
|
||||||
)
|
)
|
||||||
val result =
|
val result =
|
||||||
invoke(options(source),
|
invoke(options(source),
|
||||||
|
@ -105,8 +107,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
val expected =
|
val expected =
|
||||||
Right(List(toUpload(remoteKey, currentHash, source, file)))
|
Right(List(toUpload(remoteKey, currentHash, source, file)))
|
||||||
val remoteObjects = RemoteObjects(
|
val remoteObjects = RemoteObjects(
|
||||||
byHash = Map(originalHash -> Set(remoteKey)),
|
byHash = MapView(originalHash -> Set(remoteKey)),
|
||||||
byKey = Map(remoteKey -> originalHash)
|
byKey = MapView(remoteKey -> originalHash)
|
||||||
)
|
)
|
||||||
val result =
|
val result =
|
||||||
invoke(options(source),
|
invoke(options(source),
|
||||||
|
@ -124,8 +126,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
val sourceKey = RemoteKey("other-key")
|
val sourceKey = RemoteKey("other-key")
|
||||||
val expected = Right(List(toCopy(sourceKey, hash, remoteKey)))
|
val expected = Right(List(toCopy(sourceKey, hash, remoteKey)))
|
||||||
val remoteObjects = RemoteObjects(
|
val remoteObjects = RemoteObjects(
|
||||||
byHash = Map(hash -> Set(sourceKey)),
|
byHash = MapView(hash -> Set(sourceKey)),
|
||||||
byKey = Map.empty
|
byKey = MapView.empty
|
||||||
)
|
)
|
||||||
val result =
|
val result =
|
||||||
invoke(options(source),
|
invoke(options(source),
|
||||||
|
@ -149,8 +151,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
// DoNothing actions should have been filtered out of the plan
|
// DoNothing actions should have been filtered out of the plan
|
||||||
val expected = Right(List())
|
val expected = Right(List())
|
||||||
val remoteObjects = RemoteObjects(
|
val remoteObjects = RemoteObjects(
|
||||||
byHash = Map(hash -> Set(remoteKey)),
|
byHash = MapView(hash -> Set(remoteKey)),
|
||||||
byKey = Map(remoteKey -> hash)
|
byKey = MapView(remoteKey -> hash)
|
||||||
)
|
)
|
||||||
val result =
|
val result =
|
||||||
invoke(options(source),
|
invoke(options(source),
|
||||||
|
@ -166,8 +168,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
val hash = MD5Hash("file-content")
|
val hash = MD5Hash("file-content")
|
||||||
val expected = Right(List(toDelete(remoteKey)))
|
val expected = Right(List(toDelete(remoteKey)))
|
||||||
val remoteObjects = RemoteObjects(
|
val remoteObjects = RemoteObjects(
|
||||||
byHash = Map(hash -> Set(remoteKey)),
|
byHash = MapView(hash -> Set(remoteKey)),
|
||||||
byKey = Map(remoteKey -> hash)
|
byKey = MapView(remoteKey -> hash)
|
||||||
)
|
)
|
||||||
val result =
|
val result =
|
||||||
invoke(options(source),
|
invoke(options(source),
|
||||||
|
@ -253,8 +255,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
val hash2 = md5Hash(fileInSecondSource)
|
val hash2 = md5Hash(fileInSecondSource)
|
||||||
val expected = Right(List())
|
val expected = Right(List())
|
||||||
val remoteObjects =
|
val remoteObjects =
|
||||||
RemoteObjects(byHash = Map(hash2 -> Set(remoteKey2)),
|
RemoteObjects(byHash = MapView(hash2 -> Set(remoteKey2)),
|
||||||
byKey = Map(remoteKey2 -> hash2))
|
byKey = MapView(remoteKey2 -> hash2))
|
||||||
val result =
|
val result =
|
||||||
invoke(options(firstSource)(secondSource),
|
invoke(options(firstSource)(secondSource),
|
||||||
UIO.succeed(remoteObjects),
|
UIO.succeed(remoteObjects),
|
||||||
|
@ -274,8 +276,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
withDirectory(secondSource => {
|
withDirectory(secondSource => {
|
||||||
val expected = Right(List())
|
val expected = Right(List())
|
||||||
val remoteObjects =
|
val remoteObjects =
|
||||||
RemoteObjects(byHash = Map(hash1 -> Set(remoteKey1)),
|
RemoteObjects(byHash = MapView(hash1 -> Set(remoteKey1)),
|
||||||
byKey = Map(remoteKey1 -> hash1))
|
byKey = MapView(remoteKey1 -> hash1))
|
||||||
val result =
|
val result =
|
||||||
invoke(options(firstSource)(secondSource),
|
invoke(options(firstSource)(secondSource),
|
||||||
UIO.succeed(remoteObjects),
|
UIO.succeed(remoteObjects),
|
||||||
|
@ -292,8 +294,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
|
||||||
withDirectory(secondSource => {
|
withDirectory(secondSource => {
|
||||||
val expected = Right(List(toDelete(remoteKey1)))
|
val expected = Right(List(toDelete(remoteKey1)))
|
||||||
val remoteObjects =
|
val remoteObjects =
|
||||||
RemoteObjects(byHash = Map.empty,
|
RemoteObjects(byHash = MapView.empty,
|
||||||
byKey = Map(remoteKey1 -> MD5Hash("")))
|
byKey = MapView(remoteKey1 -> MD5Hash("")))
|
||||||
val result =
|
val result =
|
||||||
invoke(options(firstSource)(secondSource),
|
invoke(options(firstSource)(secondSource),
|
||||||
UIO.succeed(remoteObjects),
|
UIO.succeed(remoteObjects),
|
||||||
|
|
|
@ -15,11 +15,11 @@ import zio.{DefaultRuntime, ZIO}
|
||||||
|
|
||||||
class PlanExecutorTest extends FreeSpec {
|
class PlanExecutorTest extends FreeSpec {
|
||||||
|
|
||||||
private def subject(in: Stream[Int]): ZIO[Any, Throwable, Stream[Int]] =
|
private def subject(in: LazyList[Int]): ZIO[Any, Throwable, LazyList[Int]] =
|
||||||
ZIO.foldLeft(in)(Stream.empty[Int])((s, i) => ZIO(i #:: s)).map(_.reverse)
|
ZIO.foldLeft(in)(LazyList.empty[Int])((s, i) => ZIO(i #:: s)).map(_.reverse)
|
||||||
|
|
||||||
"zio foreach on a stream can be a stream" in {
|
"zio foreach on a stream can be a stream" in {
|
||||||
val input = (1 to 1000000).toStream
|
val input = LazyList.from(1 to 1000000)
|
||||||
val program = subject(input)
|
val program = subject(input)
|
||||||
val result = new DefaultRuntime {}.unsafeRunSync(program).toEither
|
val result = new DefaultRuntime {}.unsafeRunSync(program).toEither
|
||||||
assertResult(Right(input))(result)
|
assertResult(Right(input))(result)
|
||||||
|
@ -29,7 +29,8 @@ class PlanExecutorTest extends FreeSpec {
|
||||||
val nActions = 100000
|
val nActions = 100000
|
||||||
val bucket = Bucket("bucket")
|
val bucket = Bucket("bucket")
|
||||||
val remoteKey = RemoteKey("remoteKey")
|
val remoteKey = RemoteKey("remoteKey")
|
||||||
val input = (1 to nActions).toStream.map(DoNothing(bucket, remoteKey, _))
|
val input =
|
||||||
|
LazyList.from(1 to nActions).map(DoNothing(bucket, remoteKey, _))
|
||||||
|
|
||||||
val syncTotals = SyncTotals.empty
|
val syncTotals = SyncTotals.empty
|
||||||
val archiveTask = UnversionedMirrorArchive.default(syncTotals)
|
val archiveTask = UnversionedMirrorArchive.default(syncTotals)
|
||||||
|
@ -45,7 +46,8 @@ class PlanExecutorTest extends FreeSpec {
|
||||||
new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither
|
new DefaultRuntime {}.unsafeRunSync(program.provide(TestEnv)).toEither
|
||||||
|
|
||||||
val expected = Right(
|
val expected = Right(
|
||||||
(1 to nActions).toStream
|
LazyList
|
||||||
|
.from(1 to nActions)
|
||||||
.map(_ => StorageQueueEvent.DoNothingQueueEvent(remoteKey)))
|
.map(_ => StorageQueueEvent.DoNothingQueueEvent(remoteKey)))
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,9 @@ trait HexEncoder {
|
||||||
def decode(hexString: String): Array[Byte] =
|
def decode(hexString: String): Array[Byte] =
|
||||||
hexString
|
hexString
|
||||||
.replaceAll("[^0-9A-Fa-f]", "")
|
.replaceAll("[^0-9A-Fa-f]", "")
|
||||||
|
.toSeq
|
||||||
.sliding(2, 2)
|
.sliding(2, 2)
|
||||||
|
.map(_.unwrap)
|
||||||
.toArray
|
.toArray
|
||||||
.map(Integer.parseInt(_, 16).toByte)
|
.map(Integer.parseInt(_, 16).toByte)
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ package net.kemitix.thorp.domain
|
||||||
|
|
||||||
object NonUnit {
|
object NonUnit {
|
||||||
@specialized def ~*[A](evaluateForSideEffectOnly: A): Unit = {
|
@specialized def ~*[A](evaluateForSideEffectOnly: A): Unit = {
|
||||||
val _: A = evaluateForSideEffectOnly
|
val _ = evaluateForSideEffectOnly
|
||||||
() //Return unit to prevent warning due to discarding value
|
() //Return unit to prevent warning due to discarding value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,16 +1,18 @@
|
||||||
package net.kemitix.thorp.domain
|
package net.kemitix.thorp.domain
|
||||||
|
|
||||||
|
import scala.collection.MapView
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A list of objects and their MD5 hash values.
|
* A list of objects and their MD5 hash values.
|
||||||
*/
|
*/
|
||||||
final case class RemoteObjects private (
|
final case class RemoteObjects private (
|
||||||
byHash: Map[MD5Hash, Set[RemoteKey]],
|
byHash: MapView[MD5Hash, Set[RemoteKey]],
|
||||||
byKey: Map[RemoteKey, MD5Hash]
|
byKey: MapView[RemoteKey, MD5Hash]
|
||||||
)
|
)
|
||||||
|
|
||||||
object RemoteObjects {
|
object RemoteObjects {
|
||||||
val empty: RemoteObjects = RemoteObjects(Map.empty, Map.empty)
|
val empty: RemoteObjects = RemoteObjects(MapView.empty, MapView.empty)
|
||||||
def create(byHash: Map[MD5Hash, Set[RemoteKey]],
|
def create(byHash: MapView[MD5Hash, Set[RemoteKey]],
|
||||||
byKey: Map[RemoteKey, MD5Hash]): RemoteObjects =
|
byKey: MapView[RemoteKey, MD5Hash]): RemoteObjects =
|
||||||
RemoteObjects(byHash, byKey)
|
RemoteObjects(byHash, byKey)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ import java.util.stream
|
||||||
|
|
||||||
import zio.{Task, RIO, UIO, ZIO, ZManaged}
|
import zio.{Task, RIO, UIO, ZIO, ZManaged}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
trait FileSystem {
|
trait FileSystem {
|
||||||
val filesystem: FileSystem.Service
|
val filesystem: FileSystem.Service
|
||||||
|
|
|
@ -11,12 +11,12 @@ import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
|
||||||
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
|
import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
|
||||||
import zio.{Task, RIO}
|
import zio.{Task, RIO}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
trait Lister {
|
trait Lister {
|
||||||
|
|
||||||
private type Token = String
|
private type Token = String
|
||||||
case class Batch(summaries: Stream[S3ObjectSummary], more: Option[Token])
|
case class Batch(summaries: LazyList[S3ObjectSummary], more: Option[Token])
|
||||||
|
|
||||||
def listObjects(amazonS3: AmazonS3.Client)(
|
def listObjects(amazonS3: AmazonS3.Client)(
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
|
@ -34,12 +34,12 @@ trait Lister {
|
||||||
def fetchBatch: ListObjectsV2Request => RIO[Console, Batch] =
|
def fetchBatch: ListObjectsV2Request => RIO[Console, Batch] =
|
||||||
request => ListerLogger.logFetchBatch *> tryFetchBatch(amazonS3)(request)
|
request => ListerLogger.logFetchBatch *> tryFetchBatch(amazonS3)(request)
|
||||||
|
|
||||||
def fetchMore: Option[Token] => RIO[Console, Stream[S3ObjectSummary]] = {
|
def fetchMore: Option[Token] => RIO[Console, LazyList[S3ObjectSummary]] = {
|
||||||
case None => RIO.succeed(Stream.empty)
|
case None => RIO.succeed(LazyList.empty)
|
||||||
case Some(token) => fetch(requestMore(token))
|
case Some(token) => fetch(requestMore(token))
|
||||||
}
|
}
|
||||||
|
|
||||||
def fetch: ListObjectsV2Request => RIO[Console, Stream[S3ObjectSummary]] =
|
def fetch: ListObjectsV2Request => RIO[Console, LazyList[S3ObjectSummary]] =
|
||||||
request =>
|
request =>
|
||||||
for {
|
for {
|
||||||
batch <- fetchBatch(request)
|
batch <- fetchBatch(request)
|
||||||
|
@ -60,8 +60,8 @@ trait Lister {
|
||||||
.map(result => Batch(objectSummaries(result), moreToken(result)))
|
.map(result => Batch(objectSummaries(result), moreToken(result)))
|
||||||
|
|
||||||
private def objectSummaries(
|
private def objectSummaries(
|
||||||
result: ListObjectsV2Result): Stream[S3ObjectSummary] =
|
result: ListObjectsV2Result): LazyList[S3ObjectSummary] =
|
||||||
result.getObjectSummaries.asScala.toStream
|
LazyList.from(result.getObjectSummaries.asScala)
|
||||||
|
|
||||||
private def moreToken(result: ListObjectsV2Result): Option[String] =
|
private def moreToken(result: ListObjectsV2Result): Option[String] =
|
||||||
if (result.isTruncated) Some(result.getNextContinuationToken)
|
if (result.isTruncated) Some(result.getNextContinuationToken)
|
||||||
|
|
|
@ -3,14 +3,16 @@ package net.kemitix.thorp.storage.aws
|
||||||
import com.amazonaws.services.s3.model.S3ObjectSummary
|
import com.amazonaws.services.s3.model.S3ObjectSummary
|
||||||
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
|
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
|
||||||
|
|
||||||
|
import scala.collection.MapView
|
||||||
|
|
||||||
object S3ObjectsByHash {
|
object S3ObjectsByHash {
|
||||||
|
|
||||||
def byHash(
|
def byHash(
|
||||||
os: Stream[S3ObjectSummary]
|
os: LazyList[S3ObjectSummary]
|
||||||
): Map[MD5Hash, Set[RemoteKey]] = {
|
): MapView[MD5Hash, Set[RemoteKey]] = {
|
||||||
val mD5HashToS3Objects: Map[MD5Hash, Stream[S3ObjectSummary]] =
|
val mD5HashToS3Objects: Map[MD5Hash, LazyList[S3ObjectSummary]] =
|
||||||
os.groupBy(o => MD5Hash(o.getETag.filter(_ != '"')))
|
os.groupBy(o => MD5Hash(o.getETag.filter(_ != '"')))
|
||||||
mD5HashToS3Objects.mapValues { os =>
|
mD5HashToS3Objects.view.mapValues { os =>
|
||||||
os.map(_.getKey).map(RemoteKey(_)).toSet
|
os.map(_.getKey).map(RemoteKey(_)).toSet
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,15 +3,19 @@ package net.kemitix.thorp.storage.aws
|
||||||
import com.amazonaws.services.s3.model.S3ObjectSummary
|
import com.amazonaws.services.s3.model.S3ObjectSummary
|
||||||
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
|
import net.kemitix.thorp.domain.{MD5Hash, RemoteKey}
|
||||||
|
|
||||||
|
import scala.collection.MapView
|
||||||
|
|
||||||
object S3ObjectsByKey {
|
object S3ObjectsByKey {
|
||||||
|
|
||||||
def byKey(os: Stream[S3ObjectSummary]): Map[RemoteKey, MD5Hash] =
|
def byKey(os: LazyList[S3ObjectSummary]): MapView[RemoteKey, MD5Hash] =
|
||||||
os.map { o =>
|
os.map { o =>
|
||||||
{
|
{
|
||||||
val remoteKey = RemoteKey(o.getKey)
|
val remoteKey = RemoteKey(o.getKey)
|
||||||
val hash = MD5Hash(o.getETag)
|
val hash = MD5Hash(o.getETag)
|
||||||
(remoteKey, hash)
|
(remoteKey, hash)
|
||||||
}
|
}
|
||||||
}.toMap
|
}
|
||||||
|
.toMap
|
||||||
|
.view
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ import net.kemitix.thorp.console._
|
||||||
import net.kemitix.thorp.domain.NonUnit.~*
|
import net.kemitix.thorp.domain.NonUnit.~*
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import org.scalatest.FreeSpec
|
import org.scalatest.FreeSpec
|
||||||
|
import org.scalatest.Matchers._
|
||||||
import zio.internal.PlatformLive
|
import zio.internal.PlatformLive
|
||||||
import zio.{Runtime, Task, UIO}
|
import zio.{Runtime, Task, UIO}
|
||||||
|
|
||||||
|
@ -29,16 +30,17 @@ class ListerTest extends FreeSpec {
|
||||||
val etag = "etag"
|
val etag = "etag"
|
||||||
val expectedHashMap = Map(MD5Hash(etag) -> Set(RemoteKey(key)))
|
val expectedHashMap = Map(MD5Hash(etag) -> Set(RemoteKey(key)))
|
||||||
val expectedKeyMap = Map(RemoteKey(key) -> MD5Hash(etag))
|
val expectedKeyMap = Map(RemoteKey(key) -> MD5Hash(etag))
|
||||||
val expected = Right(RemoteObjects(expectedHashMap, expectedKeyMap))
|
|
||||||
new AmazonS3ClientTestFixture {
|
new AmazonS3ClientTestFixture {
|
||||||
~*(
|
|
||||||
(fixture.amazonS3Client.listObjectsV2 _)
|
(fixture.amazonS3Client.listObjectsV2 _)
|
||||||
.when()
|
.when()
|
||||||
.returns(_ => {
|
.returns(_ => {
|
||||||
UIO.succeed(objectResults(nowDate, key, etag, false))
|
UIO.succeed(objectResults(nowDate, key, etag, truncated = false))
|
||||||
}))
|
})
|
||||||
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
|
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
|
||||||
~*(assertResult(expected)(result))
|
private val hashMap = result.map(_.byHash).map(m => Map.from(m))
|
||||||
|
private val keyMap = result.map(_.byKey).map(m => Map.from(m))
|
||||||
|
hashMap should be(Right(expectedHashMap))
|
||||||
|
keyMap should be(Right(expectedKeyMap))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,19 +58,23 @@ class ListerTest extends FreeSpec {
|
||||||
RemoteKey(key1) -> MD5Hash(etag1),
|
RemoteKey(key1) -> MD5Hash(etag1),
|
||||||
RemoteKey(key2) -> MD5Hash(etag2)
|
RemoteKey(key2) -> MD5Hash(etag2)
|
||||||
)
|
)
|
||||||
val expected = Right(RemoteObjects(expectedHashMap, expectedKeyMap))
|
|
||||||
new AmazonS3ClientTestFixture {
|
new AmazonS3ClientTestFixture {
|
||||||
~*(
|
|
||||||
(fixture.amazonS3Client.listObjectsV2 _)
|
(fixture.amazonS3Client.listObjectsV2 _)
|
||||||
.when()
|
.when()
|
||||||
.returns(_ => UIO(objectResults(nowDate, key1, etag1, true)))
|
.returns(_ =>
|
||||||
.noMoreThanOnce())
|
UIO(objectResults(nowDate, key1, etag1, truncated = true)))
|
||||||
~*(
|
.noMoreThanOnce()
|
||||||
|
|
||||||
(fixture.amazonS3Client.listObjectsV2 _)
|
(fixture.amazonS3Client.listObjectsV2 _)
|
||||||
.when()
|
.when()
|
||||||
.returns(_ => UIO(objectResults(nowDate, key2, etag2, false))))
|
.returns(_ =>
|
||||||
|
UIO(objectResults(nowDate, key2, etag2, truncated = false)))
|
||||||
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
|
private val result = invoke(fixture.amazonS3Client)(bucket, prefix)
|
||||||
~*(assertResult(expected)(result))
|
private val hashMap = result.map(_.byHash).map(m => Map.from(m))
|
||||||
|
private val keyMap = result.map(_.byKey).map(m => Map.from(m))
|
||||||
|
hashMap should be(Right(expectedHashMap))
|
||||||
|
keyMap should be(Right(expectedKeyMap))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,12 +12,12 @@ class S3ObjectsByHashSuite extends FunSpec {
|
||||||
val key2 = RemoteKey("key-2")
|
val key2 = RemoteKey("key-2")
|
||||||
val o1 = s3object(hash, key1)
|
val o1 = s3object(hash, key1)
|
||||||
val o2 = s3object(hash, key2)
|
val o2 = s3object(hash, key2)
|
||||||
val os = Stream(o1, o2)
|
val os = LazyList(o1, o2)
|
||||||
it("should group by the hash value") {
|
it("should group by the hash value") {
|
||||||
val expected: Map[MD5Hash, Set[RemoteKey]] = Map(
|
val expected: Map[MD5Hash, Set[RemoteKey]] = Map(
|
||||||
hash -> Set(key1, key2)
|
hash -> Set(key1, key2)
|
||||||
)
|
)
|
||||||
val result = S3ObjectsByHash.byHash(os)
|
val result = Map.from(S3ObjectsByHash.byHash(os))
|
||||||
assertResult(expected)(result)
|
assertResult(expected)(result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,8 @@ import net.kemitix.thorp.domain._
|
||||||
import org.scalamock.scalatest.MockFactory
|
import org.scalamock.scalatest.MockFactory
|
||||||
import org.scalatest.FunSpec
|
import org.scalatest.FunSpec
|
||||||
|
|
||||||
|
import scala.collection.MapView
|
||||||
|
|
||||||
class StorageServiceSuite extends FunSpec with MockFactory {
|
class StorageServiceSuite extends FunSpec with MockFactory {
|
||||||
|
|
||||||
private val source = Resource(this, "upload")
|
private val source = Resource(this, "upload")
|
||||||
|
@ -36,11 +38,11 @@ class StorageServiceSuite extends FunSpec with MockFactory {
|
||||||
sources,
|
sources,
|
||||||
prefix)
|
prefix)
|
||||||
s3ObjectsData = RemoteObjects(
|
s3ObjectsData = RemoteObjects(
|
||||||
byHash = Map(
|
byHash = MapView(
|
||||||
hash -> Set(key, keyOtherKey.remoteKey),
|
hash -> Set(key, keyOtherKey.remoteKey),
|
||||||
diffHash -> Set(keyDiffHash.remoteKey)
|
diffHash -> Set(keyDiffHash.remoteKey)
|
||||||
),
|
),
|
||||||
byKey = Map(
|
byKey = MapView(
|
||||||
key -> hash,
|
key -> hash,
|
||||||
keyOtherKey.remoteKey -> hash,
|
keyOtherKey.remoteKey -> hash,
|
||||||
keyDiffHash.remoteKey -> diffHash
|
keyDiffHash.remoteKey -> diffHash
|
||||||
|
|
Loading…
Reference in a new issue