[S3MetaDataEnricher,ActionSubmitter] return streams

Help to perpetuate the map/flatMap structure within for-comprehension
in Sync's run method.

Added DoNothing and DoNothingS3Action
This commit is contained in:
Paul Campbell 2019-05-24 07:46:18 +01:00
parent bffc6c032c
commit fa31882e51
6 changed files with 49 additions and 39 deletions

View file

@ -1,6 +1,7 @@
package net.kemitix.s3thorp package net.kemitix.s3thorp
sealed trait Action sealed trait Action
case class DoNothing(remoteKey: RemoteKey) extends Action
case class ToUpload(localFile: LocalFile) extends Action case class ToUpload(localFile: LocalFile) extends Action
case class ToCopy(sourceKey: RemoteKey, case class ToCopy(sourceKey: RemoteKey,
hash: MD5Hash, hash: MD5Hash,

View file

@ -8,9 +8,9 @@ trait ActionGenerator
s3MetaData match { s3MetaData match {
// #1 local exists, remote exists, remote matches - do nothing // #1 local exists, remote exists, remote matches - do nothing
case S3MetaData(localFile, _, Some(RemoteMetaData(_, remoteHash, _))) case S3MetaData(localFile, _, Some(RemoteMetaData(remoteKey, remoteHash, _)))
if localFile.hash == remoteHash if localFile.hash == remoteHash
=> Stream.empty => doNothing(remoteKey)
// #2 local exists, remote is missing, other matches - copy // #2 local exists, remote is missing, other matches - copy
case S3MetaData(localFile, otherMatches, None) case S3MetaData(localFile, otherMatches, None)
@ -33,16 +33,22 @@ trait ActionGenerator
if hashMatches.isEmpty if hashMatches.isEmpty
=> uploadFile(localFile) => uploadFile(localFile)
case _ => Stream.empty
} }
private def uploadFile(localFile: LocalFile) = Stream(ToUpload(localFile)) private def doNothing(remoteKey: RemoteKey) =
Stream(
DoNothing(remoteKey))
private def copyFile(localFile: LocalFile, matchByHash: Set[RemoteMetaData]): Stream[Action] = private def uploadFile(localFile: LocalFile) =
Stream(ToCopy( Stream(
sourceKey = matchByHash.head.remoteKey, ToUpload(localFile))
hash = localFile.hash,
targetKey = localFile.remoteKey private def copyFile(localFile: LocalFile,
)) matchByHash: Set[RemoteMetaData]): Stream[Action] =
Stream(
ToCopy(
sourceKey = matchByHash.head.remoteKey,
hash = localFile.hash,
targetKey = localFile.remoteKey))
} }

View file

@ -8,17 +8,20 @@ trait ActionSubmitter
with Logging { with Logging {
def submitAction(action: Action) def submitAction(action: Action)
(implicit c: Config): IO[S3Action] = { (implicit c: Config): Stream[IO[S3Action]] = {
action match { Stream(
case ToUpload(file) => action match {
log4(s" Upload: ${file.relative}") case ToUpload(file) =>
upload(file, c.bucket) log4(s" Upload: ${file.relative}")
case ToCopy(sourceKey, hash, targetKey) => upload(file, c.bucket)
log4(s" Copy: $sourceKey => $targetKey") case ToCopy(sourceKey, hash, targetKey) =>
copy(c.bucket, sourceKey, hash, targetKey) log4(s" Copy: $sourceKey => $targetKey")
case ToDelete(remoteKey) => copy(c.bucket, sourceKey, hash, targetKey)
log4(s" Delete: $remoteKey") case ToDelete(remoteKey) =>
delete(c.bucket, remoteKey) log4(s" Delete: $remoteKey")
} delete(c.bucket, remoteKey)
case DoNothing(remoteKey) => IO {
DoNothingS3Action(remoteKey)}
})
} }
} }

View file

@ -9,6 +9,10 @@ sealed trait S3Action {
} }
case class DoNothingS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 0
}
case class CopyS3Action(remoteKey: RemoteKey) extends S3Action { case class CopyS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 1 override val order: Int = 1
} }

View file

@ -8,14 +8,12 @@ trait S3MetaDataEnricher
def getMetadata(localFile: LocalFile) def getMetadata(localFile: LocalFile)
(implicit c: Config, (implicit c: Config,
s3ObjectsData: S3ObjectsData): S3MetaData = { s3ObjectsData: S3ObjectsData): Stream[S3MetaData] = {
val (keyMatches: Option[HashModified], hashMatches: Set[KeyModified]) = getS3Status(localFile) val (keyMatches, hashMatches) = getS3Status(localFile)
Stream(
S3MetaData(localFile, S3MetaData(localFile,
matchByKey = keyMatches.map{kmAsRemoteMetaData(localFile.remoteKey)}, matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) },
matchByHash = hashMatches.map(km => RemoteMetaData(km.key, localFile.hash, km.modified))) matchByHash = hashMatches map { km => RemoteMetaData(km.key, localFile.hash, km.modified) }))
} }
private def kmAsRemoteMetaData(key: RemoteKey): HashModified => RemoteMetaData = hm => RemoteMetaData(key, hm.hash, hm.modified)
} }

View file

@ -1,7 +1,5 @@
package net.kemitix.s3thorp package net.kemitix.s3thorp
import java.nio.file.Paths
import cats.effect.IO import cats.effect.IO
import cats.implicits._ import cats.implicits._
import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData} import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData}
@ -17,18 +15,18 @@ class Sync(s3Client: S3Client)
logRunStart(c).unsafeRunSync logRunStart(c).unsafeRunSync
listObjects(c.bucket, c.prefix) listObjects(c.bucket, c.prefix)
.map { implicit s3ObjectsData => { .map { implicit s3ObjectsData => {
val actions = (for { val actions = for {
file <- findFiles(c.source) file <- findFiles(c.source)
meta = getMetadata(file) data <- getMetadata(file)
action <- createActions(meta) action <- createActions(data)
ioS3Action = submitAction(action) s3Action <- submitAction(action)
} yield ioS3Action).sequence } yield s3Action
val sorted = sort(actions) val sorted = sort(actions.sequence)
val list = sorted.unsafeRunSync.toList val list = sorted.unsafeRunSync.toList
val delActions = (for { val delActions = (for {
key <- s3ObjectsData.byKey.keys key <- s3ObjectsData.byKey.keys
if key.isMissingLocally if key.isMissingLocally
ioDelAction = submitAction(ToDelete(key)) ioDelAction <- submitAction(ToDelete(key))
} yield ioDelAction).toStream.sequence } yield ioDelAction).toStream.sequence
val delList = delActions.unsafeRunSync.toList val delList = delActions.unsafeRunSync.toList
logRunFinished(list ++ delList).unsafeRunSync logRunFinished(list ++ delList).unsafeRunSync