From fa31882e518fe5aac72635814ac12497d4bb9198 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Fri, 24 May 2019 07:46:18 +0100 Subject: [PATCH] [S3MetaDataEnricher,ActionSubmitter] return streams Help to perpetuate the map/flatMap structure within for-comprehension in Sync's run method. Added DoNothing and DoNothingS3Action --- .../scala/net/kemitix/s3thorp/Action.scala | 1 + .../net/kemitix/s3thorp/ActionGenerator.scala | 26 +++++++++++------- .../net/kemitix/s3thorp/ActionSubmitter.scala | 27 ++++++++++--------- .../scala/net/kemitix/s3thorp/S3Action.scala | 4 +++ .../kemitix/s3thorp/S3MetaDataEnricher.scala | 14 +++++----- src/main/scala/net/kemitix/s3thorp/Sync.scala | 16 +++++------ 6 files changed, 49 insertions(+), 39 deletions(-) diff --git a/src/main/scala/net/kemitix/s3thorp/Action.scala b/src/main/scala/net/kemitix/s3thorp/Action.scala index 8f1a850..f261ec5 100644 --- a/src/main/scala/net/kemitix/s3thorp/Action.scala +++ b/src/main/scala/net/kemitix/s3thorp/Action.scala @@ -1,6 +1,7 @@ package net.kemitix.s3thorp sealed trait Action +case class DoNothing(remoteKey: RemoteKey) extends Action case class ToUpload(localFile: LocalFile) extends Action case class ToCopy(sourceKey: RemoteKey, hash: MD5Hash, diff --git a/src/main/scala/net/kemitix/s3thorp/ActionGenerator.scala b/src/main/scala/net/kemitix/s3thorp/ActionGenerator.scala index 67d4df1..235c8fa 100644 --- a/src/main/scala/net/kemitix/s3thorp/ActionGenerator.scala +++ b/src/main/scala/net/kemitix/s3thorp/ActionGenerator.scala @@ -8,9 +8,9 @@ trait ActionGenerator s3MetaData match { // #1 local exists, remote exists, remote matches - do nothing - case S3MetaData(localFile, _, Some(RemoteMetaData(_, remoteHash, _))) + case S3MetaData(localFile, _, Some(RemoteMetaData(remoteKey, remoteHash, _))) if localFile.hash == remoteHash - => Stream.empty + => doNothing(remoteKey) // #2 local exists, remote is missing, other matches - copy case S3MetaData(localFile, otherMatches, None) @@ -33,16 +33,22 @@ trait ActionGenerator if hashMatches.isEmpty => uploadFile(localFile) - case _ => Stream.empty } - private def uploadFile(localFile: LocalFile) = Stream(ToUpload(localFile)) + private def doNothing(remoteKey: RemoteKey) = + Stream( + DoNothing(remoteKey)) - private def copyFile(localFile: LocalFile, matchByHash: Set[RemoteMetaData]): Stream[Action] = - Stream(ToCopy( - sourceKey = matchByHash.head.remoteKey, - hash = localFile.hash, - targetKey = localFile.remoteKey - )) + private def uploadFile(localFile: LocalFile) = + Stream( + ToUpload(localFile)) + + private def copyFile(localFile: LocalFile, + matchByHash: Set[RemoteMetaData]): Stream[Action] = + Stream( + ToCopy( + sourceKey = matchByHash.head.remoteKey, + hash = localFile.hash, + targetKey = localFile.remoteKey)) } diff --git a/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala b/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala index e0db077..044894f 100644 --- a/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala +++ b/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala @@ -8,17 +8,20 @@ trait ActionSubmitter with Logging { def submitAction(action: Action) - (implicit c: Config): IO[S3Action] = { - action match { - case ToUpload(file) => - log4(s" Upload: ${file.relative}") - upload(file, c.bucket) - case ToCopy(sourceKey, hash, targetKey) => - log4(s" Copy: $sourceKey => $targetKey") - copy(c.bucket, sourceKey, hash, targetKey) - case ToDelete(remoteKey) => - log4(s" Delete: $remoteKey") - delete(c.bucket, remoteKey) - } + (implicit c: Config): Stream[IO[S3Action]] = { + Stream( + action match { + case ToUpload(file) => + log4(s" Upload: ${file.relative}") + upload(file, c.bucket) + case ToCopy(sourceKey, hash, targetKey) => + log4(s" Copy: $sourceKey => $targetKey") + copy(c.bucket, sourceKey, hash, targetKey) + case ToDelete(remoteKey) => + log4(s" Delete: $remoteKey") + delete(c.bucket, remoteKey) + case DoNothing(remoteKey) => IO { + DoNothingS3Action(remoteKey)} + }) } } diff --git a/src/main/scala/net/kemitix/s3thorp/S3Action.scala b/src/main/scala/net/kemitix/s3thorp/S3Action.scala index 2f4f630..903dc9e 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3Action.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3Action.scala @@ -9,6 +9,10 @@ sealed trait S3Action { } +case class DoNothingS3Action(remoteKey: RemoteKey) extends S3Action { + override val order: Int = 0 +} + case class CopyS3Action(remoteKey: RemoteKey) extends S3Action { override val order: Int = 1 } diff --git a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala index e00d94e..a9fff3f 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala @@ -8,14 +8,12 @@ trait S3MetaDataEnricher def getMetadata(localFile: LocalFile) (implicit c: Config, - s3ObjectsData: S3ObjectsData): S3MetaData = { - val (keyMatches: Option[HashModified], hashMatches: Set[KeyModified]) = getS3Status(localFile) - - S3MetaData(localFile, - matchByKey = keyMatches.map{kmAsRemoteMetaData(localFile.remoteKey)}, - matchByHash = hashMatches.map(km => RemoteMetaData(km.key, localFile.hash, km.modified))) + s3ObjectsData: S3ObjectsData): Stream[S3MetaData] = { + val (keyMatches, hashMatches) = getS3Status(localFile) + Stream( + S3MetaData(localFile, + matchByKey = keyMatches map { hm => RemoteMetaData(localFile.remoteKey, hm.hash, hm.modified) }, + matchByHash = hashMatches map { km => RemoteMetaData(km.key, localFile.hash, km.modified) })) } - private def kmAsRemoteMetaData(key: RemoteKey): HashModified => RemoteMetaData = hm => RemoteMetaData(key, hm.hash, hm.modified) - } diff --git a/src/main/scala/net/kemitix/s3thorp/Sync.scala b/src/main/scala/net/kemitix/s3thorp/Sync.scala index 86be8f9..c556e54 100644 --- a/src/main/scala/net/kemitix/s3thorp/Sync.scala +++ b/src/main/scala/net/kemitix/s3thorp/Sync.scala @@ -1,7 +1,5 @@ package net.kemitix.s3thorp -import java.nio.file.Paths - import cats.effect.IO import cats.implicits._ import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData} @@ -17,18 +15,18 @@ class Sync(s3Client: S3Client) logRunStart(c).unsafeRunSync listObjects(c.bucket, c.prefix) .map { implicit s3ObjectsData => { - val actions = (for { + val actions = for { file <- findFiles(c.source) - meta = getMetadata(file) - action <- createActions(meta) - ioS3Action = submitAction(action) - } yield ioS3Action).sequence - val sorted = sort(actions) + data <- getMetadata(file) + action <- createActions(data) + s3Action <- submitAction(action) + } yield s3Action + val sorted = sort(actions.sequence) val list = sorted.unsafeRunSync.toList val delActions = (for { key <- s3ObjectsData.byKey.keys if key.isMissingLocally - ioDelAction = submitAction(ToDelete(key)) + ioDelAction <- submitAction(ToDelete(key)) } yield ioDelAction).toStream.sequence val delList = delActions.unsafeRunSync.toList logRunFinished(list ++ delList).unsafeRunSync