diff --git a/README.org b/README.org index 7228d84..6f1a137 100644 --- a/README.org +++ b/README.org @@ -20,6 +20,21 @@ hash of the file contents. -v, --verbose Verbosity level (1-5) #+end_example +* Behaviour + +When considering a local file, the following table governs what should happen: + +|---+------------+------------+------------------+--------------------+---------------------| +| # | local file | remote key | hash of same key | hash of other keys | action | +|---+------------+------------+------------------+--------------------+---------------------| +| 1 | exists | exists | matches | - | do nothing | +| 2 | exists | is missing | - | matches | copy from other key | +| 3 | exists | is missing | - | no matches | upload | +| 4 | exists | exists | no match | matches | copy from other key | +| 5 | exists | exists | no match | no matches | upload | +| 6 | is missing | exists | - | - | delete | +|---+------------+------------+------------------+--------------------+---------------------| + * Creating Native Images - Download and install GraalVM diff --git a/src/main/scala/net/kemitix/s3thorp/Action.scala b/src/main/scala/net/kemitix/s3thorp/Action.scala new file mode 100644 index 0000000..8f1a850 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/Action.scala @@ -0,0 +1,8 @@ +package net.kemitix.s3thorp + +sealed trait Action +case class ToUpload(localFile: LocalFile) extends Action +case class ToCopy(sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey) extends Action +case class ToDelete(remoteKey: RemoteKey) extends Action diff --git a/src/main/scala/net/kemitix/s3thorp/ActionGenerator.scala b/src/main/scala/net/kemitix/s3thorp/ActionGenerator.scala new file mode 100644 index 0000000..67d4df1 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/ActionGenerator.scala @@ -0,0 +1,48 @@ +package net.kemitix.s3thorp + +trait ActionGenerator + extends Logging { + + def createActions(s3MetaData: S3MetaData) + (implicit c: Config): Stream[Action] = + s3MetaData match { + + // #1 local exists, remote exists, remote matches - do nothing + case S3MetaData(localFile, _, Some(RemoteMetaData(_, remoteHash, _))) + if localFile.hash == remoteHash + => Stream.empty + + // #2 local exists, remote is missing, other matches - copy + case S3MetaData(localFile, otherMatches, None) + if otherMatches.nonEmpty + => copyFile(localFile, otherMatches) + + // #3 local exists, remote is missing, other no matches - upload + case S3MetaData(localFile, otherMatches, None) + if otherMatches.isEmpty + => uploadFile(localFile) + + // #4 local exists, remote exists, remote no match, other matches - copy + case S3MetaData(localFile, otherMatches, Some(RemoteMetaData(_, remoteHash, _))) + if localFile.hash != remoteHash && + otherMatches.nonEmpty + => copyFile(localFile, otherMatches) + + // #5 local exists, remote exists, remote no match, other no matches - upload + case S3MetaData(localFile, hashMatches, Some(_)) + if hashMatches.isEmpty + => uploadFile(localFile) + + case _ => Stream.empty + } + + private def uploadFile(localFile: LocalFile) = Stream(ToUpload(localFile)) + + private def copyFile(localFile: LocalFile, matchByHash: Set[RemoteMetaData]): Stream[Action] = + Stream(ToCopy( + sourceKey = matchByHash.head.remoteKey, + hash = localFile.hash, + targetKey = localFile.remoteKey + )) + +} diff --git a/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala b/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala new file mode 100644 index 0000000..e0db077 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/ActionSubmitter.scala @@ -0,0 +1,24 @@ +package net.kemitix.s3thorp + +import cats.effect.IO +import net.kemitix.s3thorp.awssdk.S3Client + +trait ActionSubmitter + extends S3Client + with Logging { + + def submitAction(action: Action) + (implicit c: Config): IO[S3Action] = { + action match { + case ToUpload(file) => + log4(s" Upload: ${file.relative}") + upload(file, c.bucket) + case ToCopy(sourceKey, hash, targetKey) => + log4(s" Copy: $sourceKey => $targetKey") + copy(c.bucket, sourceKey, hash, targetKey) + case ToDelete(remoteKey) => + log4(s" Delete: $remoteKey") + delete(c.bucket, remoteKey) + } + } +} diff --git a/src/main/scala/net/kemitix/s3thorp/Bucket.scala b/src/main/scala/net/kemitix/s3thorp/Bucket.scala new file mode 100644 index 0000000..3ad158b --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/Bucket.scala @@ -0,0 +1,5 @@ +package net.kemitix.s3thorp + +final case class Bucket(name: String) { + +} diff --git a/src/main/scala/net/kemitix/s3thorp/Config.scala b/src/main/scala/net/kemitix/s3thorp/Config.scala index b610b76..648726a 100644 --- a/src/main/scala/net/kemitix/s3thorp/Config.scala +++ b/src/main/scala/net/kemitix/s3thorp/Config.scala @@ -1,16 +1,11 @@ package net.kemitix.s3thorp import java.io.File -import java.nio.file.Path -import net.kemitix.s3thorp.Sync.{Bucket, LocalFile} - -case class Config(bucket: Bucket = "", - prefix: String = "", +case class Config(bucket: Bucket = Bucket(""), + prefix: RemoteKey = RemoteKey(""), verbose: Int = 1, - source: LocalFile + source: File ) { - - def relativePath(file: File): Path = source.toPath.relativize(file.toPath) - + require(source.isDirectory, s"Source must be a directory: $source") } diff --git a/src/main/scala/net/kemitix/s3thorp/HashModified.scala b/src/main/scala/net/kemitix/s3thorp/HashModified.scala new file mode 100644 index 0000000..6c8f8d0 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/HashModified.scala @@ -0,0 +1,6 @@ +package net.kemitix.s3thorp + +final case class HashModified(hash: MD5Hash, + modified: LastModified) { + +} diff --git a/src/main/scala/net/kemitix/s3thorp/KeyGenerator.scala b/src/main/scala/net/kemitix/s3thorp/KeyGenerator.scala index fa6ca40..8d5ad4b 100644 --- a/src/main/scala/net/kemitix/s3thorp/KeyGenerator.scala +++ b/src/main/scala/net/kemitix/s3thorp/KeyGenerator.scala @@ -4,11 +4,12 @@ import java.io.File trait KeyGenerator { - def generateKey(c: Config)(file: File): String = { + def generateKey(source: File, prefix: RemoteKey) + (file: File): RemoteKey = { val otherPath = file.toPath.toAbsolutePath - val sourcePath = c.source.toPath + val sourcePath = source.toPath val relativePath = sourcePath.relativize(otherPath) - s"${c.prefix}/$relativePath" + RemoteKey(s"${prefix.key}/$relativePath") } } diff --git a/src/main/scala/net/kemitix/s3thorp/KeyModified.scala b/src/main/scala/net/kemitix/s3thorp/KeyModified.scala new file mode 100644 index 0000000..2ae126f --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/KeyModified.scala @@ -0,0 +1,6 @@ +package net.kemitix.s3thorp + +final case class KeyModified(key: RemoteKey, + modified: LastModified) { + +} diff --git a/src/main/scala/net/kemitix/s3thorp/LastModified.scala b/src/main/scala/net/kemitix/s3thorp/LastModified.scala new file mode 100644 index 0000000..93b6349 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/LastModified.scala @@ -0,0 +1,7 @@ +package net.kemitix.s3thorp + +import java.time.Instant + +final case class LastModified(when: Instant) { + +} diff --git a/src/main/scala/net/kemitix/s3thorp/LocalFile.scala b/src/main/scala/net/kemitix/s3thorp/LocalFile.scala new file mode 100644 index 0000000..a1a5030 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/LocalFile.scala @@ -0,0 +1,25 @@ +package net.kemitix.s3thorp + +import java.io.File +import java.nio.file.Path + +case class LocalFile(file: File, + source: File, + keyGenerator: File => RemoteKey) + extends MD5HashGenerator { + + require(!file.isDirectory, s"LocalFile must not be a directory: $file") + + private lazy val myhash = md5File(file) + + def hash: MD5Hash = myhash + + // the equivalent location of the file on S3 + def remoteKey: RemoteKey = keyGenerator(file) + + def isDirectory: Boolean = file.isDirectory + + // the path of the file within the source + def relative: Path = source.toPath.relativize(file.toPath) + +} diff --git a/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala b/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala index 4586f29..5a5b971 100644 --- a/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala +++ b/src/main/scala/net/kemitix/s3thorp/LocalFileStream.scala @@ -2,18 +2,28 @@ package net.kemitix.s3thorp import java.io.File -trait LocalFileStream { +trait LocalFileStream + extends KeyGenerator + with Logging { - def streamDirectoryPaths(file: File): Stream[File] = - dirPaths(file) - .flatMap(f => recurseIntoSubDirectories(f)) + def findFiles(file: File) + (implicit c: Config): Stream[LocalFile] = { + log5(s"- Entering: $file") + val files = for { + f <- dirPaths(file) + fs <- recurseIntoSubDirectories(f) + } yield fs + log5(s"- Leaving: $file") + files + } - private def dirPaths(file: File): Stream[File] = Option(file.listFiles) - .getOrElse(throw new IllegalArgumentException(s"Directory not found $file")).toStream + private def dirPaths(file: File): Stream[File] = { + Option(file.listFiles) + .getOrElse(throw new IllegalArgumentException(s"Directory not found $file")).toStream + } - private def recurseIntoSubDirectories: File => Stream[File] = - file => - if (file.isDirectory) streamDirectoryPaths(file) - else Stream(file) + private def recurseIntoSubDirectories(file: File)(implicit c: Config): Stream[LocalFile] = + if (file.isDirectory) findFiles(file)(c) + else Stream(LocalFile(file, c.source, generateKey(c.source, c.prefix))) } diff --git a/src/main/scala/net/kemitix/s3thorp/MD5Hash.scala b/src/main/scala/net/kemitix/s3thorp/MD5Hash.scala new file mode 100644 index 0000000..8fbdfef --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/MD5Hash.scala @@ -0,0 +1,7 @@ +package net.kemitix.s3thorp + +final case class MD5Hash(hash: String) { + + require(!hash.contains("\"")) + +} diff --git a/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala b/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala new file mode 100644 index 0000000..0b1139e --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala @@ -0,0 +1,18 @@ +package net.kemitix.s3thorp + +import java.io.{File, FileInputStream} +import java.security.{DigestInputStream, MessageDigest} + +trait MD5HashGenerator + extends Logging { + + def md5File(file: File): MD5Hash = { + val buffer = new Array[Byte](8192) + val md5 = MessageDigest.getInstance("MD5") + val dis = new DigestInputStream(new FileInputStream(file), md5) + try { while (dis.read(buffer) != -1) { } } finally { dis.close() } + val hash = md5.digest.map("%02x".format(_)).mkString + MD5Hash(hash) + } + +} diff --git a/src/main/scala/net/kemitix/s3thorp/ParseArgs.scala b/src/main/scala/net/kemitix/s3thorp/ParseArgs.scala index 5141eb5..0793ba4 100644 --- a/src/main/scala/net/kemitix/s3thorp/ParseArgs.scala +++ b/src/main/scala/net/kemitix/s3thorp/ParseArgs.scala @@ -19,11 +19,11 @@ object ParseArgs { .required() .text("Source directory to sync to S3"), opt[String]('b', "bucket") - .action((str, c) => c.copy(bucket = str)) + .action((str, c) => c.copy(bucket = Bucket(str))) .required() .text("S3 bucket name"), opt[String]('p', "prefix") - .action((str, c) => c.copy(prefix = str)) + .action((str, c) => c.copy(prefix = RemoteKey(str))) .text("Prefix within the S3 Bucket"), opt[Int]('v', "verbose") .validate(i => diff --git a/src/main/scala/net/kemitix/s3thorp/RemoteKey.scala b/src/main/scala/net/kemitix/s3thorp/RemoteKey.scala new file mode 100644 index 0000000..da18f9d --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/RemoteKey.scala @@ -0,0 +1,11 @@ +package net.kemitix.s3thorp + +import java.io.File +import java.nio.file.Paths + +final case class RemoteKey(key: String) { + def asFile(implicit c: Config): File = + c.source.toPath.resolve(Paths.get(c.prefix.key).relativize(Paths.get(key))).toFile + def isMissingLocally(implicit c: Config): Boolean = + ! asFile.exists +} diff --git a/src/main/scala/net/kemitix/s3thorp/RemoteMetaData.scala b/src/main/scala/net/kemitix/s3thorp/RemoteMetaData.scala new file mode 100644 index 0000000..ee7f464 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/RemoteMetaData.scala @@ -0,0 +1,7 @@ +package net.kemitix.s3thorp + +final case class RemoteMetaData(remoteKey: RemoteKey, + hash: MD5Hash, + lastModified: LastModified) { + +} diff --git a/src/main/scala/net/kemitix/s3thorp/S3Action.scala b/src/main/scala/net/kemitix/s3thorp/S3Action.scala new file mode 100644 index 0000000..2f4f630 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/S3Action.scala @@ -0,0 +1,25 @@ +package net.kemitix.s3thorp + +sealed trait S3Action { + + // the remote key that was uploaded, deleted or otherwise updated by the action + def remoteKey: RemoteKey + + val order: Int + +} + +case class CopyS3Action(remoteKey: RemoteKey) extends S3Action { + override val order: Int = 1 +} +case class UploadS3Action(remoteKey: RemoteKey, + md5Hash: MD5Hash) extends S3Action { + override val order: Int = 2 +} +case class DeleteS3Action(remoteKey: RemoteKey) extends S3Action { + override val order: Int = 3 +} + +object S3Action { + implicit def ord[A <: S3Action]: Ordering[A] = Ordering.by(_.order) +} diff --git a/src/main/scala/net/kemitix/s3thorp/S3MetaData.scala b/src/main/scala/net/kemitix/s3thorp/S3MetaData.scala index 6139acc..aafd3ef 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3MetaData.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3MetaData.scala @@ -1,8 +1,6 @@ package net.kemitix.s3thorp -import net.kemitix.s3thorp.Sync.{MD5Hash, LastModified, LocalFile, RemoteKey} - +// For the LocalFile, the set of matching S3 objects with the same MD5Hash, and any S3 object with the same remote key case class S3MetaData(localFile: LocalFile, - remotePath: RemoteKey, - remoteHash: MD5Hash, - remoteLastModified: LastModified) \ No newline at end of file + matchByHash: Set[RemoteMetaData], + matchByKey: Option[RemoteMetaData]) \ No newline at end of file diff --git a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala index c4f6b6c..e00d94e 100644 --- a/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala +++ b/src/main/scala/net/kemitix/s3thorp/S3MetaDataEnricher.scala @@ -1,25 +1,21 @@ package net.kemitix.s3thorp -import java.io.File - -import net.kemitix.s3thorp.Sync.{LastModified, MD5Hash} -import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client} +import net.kemitix.s3thorp.awssdk.{S3ObjectsData, S3Client} trait S3MetaDataEnricher extends S3Client - with KeyGenerator with Logging { - def enrichWithS3MetaData(c: Config)(implicit hashLookup: HashLookup): File => Either[File, S3MetaData] = { - val remoteKey = generateKey(c)_ - file => { - log3(s"- Consider: ${c.relativePath(file)}")(c) - val key = remoteKey(file) - objectHead(key).map { - hlm: (MD5Hash, LastModified) => { - Right(S3MetaData(file, key, hlm._1.filter { c => c != '"' }, hlm._2)) - } - }.getOrElse(Left(file)) - } + def getMetadata(localFile: LocalFile) + (implicit c: Config, + s3ObjectsData: S3ObjectsData): S3MetaData = { + val (keyMatches: Option[HashModified], hashMatches: Set[KeyModified]) = getS3Status(localFile) + + S3MetaData(localFile, + matchByKey = keyMatches.map{kmAsRemoteMetaData(localFile.remoteKey)}, + matchByHash = hashMatches.map(km => RemoteMetaData(km.key, localFile.hash, km.modified))) } + + private def kmAsRemoteMetaData(key: RemoteKey): HashModified => RemoteMetaData = hm => RemoteMetaData(key, hm.hash, hm.modified) + } diff --git a/src/main/scala/net/kemitix/s3thorp/S3Uploader.scala b/src/main/scala/net/kemitix/s3thorp/S3Uploader.scala deleted file mode 100644 index 2c7c594..0000000 --- a/src/main/scala/net/kemitix/s3thorp/S3Uploader.scala +++ /dev/null @@ -1,23 +0,0 @@ -package net.kemitix.s3thorp - -import java.io.File - -import cats.effect.IO -import net.kemitix.s3thorp.Sync.MD5Hash -import net.kemitix.s3thorp.awssdk.S3Client - -trait S3Uploader - extends S3Client - with KeyGenerator - with Logging { - - def performUpload(c: Config): File => (File, IO[Either[Throwable, MD5Hash]]) = { - val remoteKey = generateKey(c) _ - file => { - val key = remoteKey(file) - val shortFile = c.relativePath(file) - log4(s" Upload: $shortFile")(c) - (file, upload(file, c.bucket, key)) - } - } -} diff --git a/src/main/scala/net/kemitix/s3thorp/Sync.scala b/src/main/scala/net/kemitix/s3thorp/Sync.scala index be2ae92..86be8f9 100644 --- a/src/main/scala/net/kemitix/s3thorp/Sync.scala +++ b/src/main/scala/net/kemitix/s3thorp/Sync.scala @@ -1,48 +1,59 @@ package net.kemitix.s3thorp -import java.io.File -import java.time.Instant +import java.nio.file.Paths -import cats.effect._ -import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, MD5Hash, RemoteKey} -import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client} +import cats.effect.IO +import cats.implicits._ +import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData} class Sync(s3Client: S3Client) extends LocalFileStream with S3MetaDataEnricher - with UploadSelectionFilter - with S3Uploader - with Logging { + with ActionGenerator + with ActionSubmitter + with SyncLogging { - override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) = - s3Client.upload(localFile, bucket, remoteKey) - - def run(c: Config): IO[Unit] = { - implicit val config: Config = c - log1(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}") - s3Client.listObjects(c.bucket, c.prefix).map { hashLookup => { - val stream: Stream[(File, IO[Either[Throwable, MD5Hash]])] = streamDirectoryPaths(c.source).map( - enrichWithS3MetaData(c)(hashLookup)).flatMap( - uploadRequiredFilter(c)).map( - performUpload(c)) - val count: Int = stream.foldLeft(0)((a: Int, io) => { - io._2.unsafeRunSync - log1(s"- Done: ${io._1}") - a + 1 - }) - log1(s"Uploaded $count files") - }} + def run(implicit c: Config): IO[Unit] = { + logRunStart(c).unsafeRunSync + listObjects(c.bucket, c.prefix) + .map { implicit s3ObjectsData => { + val actions = (for { + file <- findFiles(c.source) + meta = getMetadata(file) + action <- createActions(meta) + ioS3Action = submitAction(action) + } yield ioS3Action).sequence + val sorted = sort(actions) + val list = sorted.unsafeRunSync.toList + val delActions = (for { + key <- s3ObjectsData.byKey.keys + if key.isMissingLocally + ioDelAction = submitAction(ToDelete(key)) + } yield ioDelAction).toStream.sequence + val delList = delActions.unsafeRunSync.toList + logRunFinished(list ++ delList).unsafeRunSync + }} } - override def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = ??? -} - -object Sync { - - type Bucket = String // the S3 bucket name - type LocalFile = File // the file or directory - type RemoteKey = String // path within an S3 bucket - type MD5Hash = String // an MD5 hash - type LastModified = Instant // or scala equivalent - + private def sort(ioActions: IO[Stream[S3Action]]) = + ioActions.flatMap { actions => IO { actions.sorted } } + + override def upload(localFile: LocalFile, + bucket: Bucket)(implicit c: Config): IO[UploadS3Action] = + s3Client.upload(localFile, bucket) + + override def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey)(implicit c: Config): IO[CopyS3Action] = + s3Client.copy(bucket, sourceKey, hash, targetKey) + + override def delete(bucket: Bucket, + remoteKey: RemoteKey)(implicit c: Config): IO[DeleteS3Action] = + s3Client.delete(bucket, remoteKey) + + override def listObjects(bucket: Bucket, + prefix: RemoteKey + )(implicit c: Config): IO[S3ObjectsData] = + s3Client.listObjects(bucket, prefix) } diff --git a/src/main/scala/net/kemitix/s3thorp/SyncLogging.scala b/src/main/scala/net/kemitix/s3thorp/SyncLogging.scala new file mode 100644 index 0000000..88ed078 --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/SyncLogging.scala @@ -0,0 +1,40 @@ +package net.kemitix.s3thorp + +import cats.effect.IO + +// Logging for the Sync class +trait SyncLogging extends Logging { + + def logRunStart(c: Config): IO[Unit] = IO { + log1(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}")(c) + } + + def logRunFinished(actions: List[S3Action]) + (implicit c: Config): IO[Unit] = IO { + val counters = actions.foldLeft(Counters())(logActivity) + log1(s"Uploaded ${counters.uploaded} files") + log1(s"Copied ${counters.copied} files") + log1(s"Deleted ${counters.deleted} files") + } + + private def logActivity(implicit c: Config): (Counters, S3Action) => Counters = + (counters: Counters, s3Action: S3Action) => { + s3Action match { + case UploadS3Action(remoteKey, _) => + log1(s"- Uploaded: ${remoteKey.key}") + counters.copy(uploaded = counters.uploaded + 1) + case CopyS3Action(remoteKey) => + log1(s"- Copied: ${remoteKey.key}") + counters.copy(copied = counters.copied + 1) + case DeleteS3Action(remoteKey) => + log1(s"- Deleted: ${remoteKey.key}") + counters.copy(deleted = counters.deleted + 1) + case _ => counters + } + } + + case class Counters(uploaded: Int = 0, + deleted: Int = 0, + copied: Int = 0) + +} diff --git a/src/main/scala/net/kemitix/s3thorp/UploadSelectionFilter.scala b/src/main/scala/net/kemitix/s3thorp/UploadSelectionFilter.scala deleted file mode 100644 index 97e27a7..0000000 --- a/src/main/scala/net/kemitix/s3thorp/UploadSelectionFilter.scala +++ /dev/null @@ -1,35 +0,0 @@ -package net.kemitix.s3thorp - -import java.io.{File, FileInputStream} -import java.security.{DigestInputStream, MessageDigest} - -import net.kemitix.s3thorp.Sync.{LocalFile, MD5Hash} - -trait UploadSelectionFilter - extends Logging { - - private def md5File(localFile: LocalFile): MD5Hash = { - val buffer = new Array[Byte](8192) - val md5 = MessageDigest.getInstance("MD5") - - val dis = new DigestInputStream(new FileInputStream(localFile), md5) - try { while (dis.read(buffer) != -1) { } } finally { dis.close() } - - md5.digest.map("%02x".format(_)).mkString - } - - def uploadRequiredFilter(c: Config): Either[File, S3MetaData] => Stream[File] = { - case Left(file) => { - log5(s" Created: ${c.relativePath(file)}")(c) - Stream(file) - } - case Right(s3Metadata) => { - val localHash: MD5Hash = md5File(s3Metadata.localFile) - if (localHash != s3Metadata.remoteHash) { - log5(s" Updated: ${c.relativePath(s3Metadata.localFile)}")(c) - Stream(s3Metadata.localFile) - } - else Stream.empty - } - } -} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/HashLookup.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/HashLookup.scala deleted file mode 100644 index b6332d9..0000000 --- a/src/main/scala/net/kemitix/s3thorp/awssdk/HashLookup.scala +++ /dev/null @@ -1,11 +0,0 @@ -package net.kemitix.s3thorp.awssdk - -import net.kemitix.s3thorp.Sync.{LastModified, MD5Hash, RemoteKey} - -/** - * A list of objects and their MD5 hash values. - */ -case class HashLookup(byHash: Map[MD5Hash, (RemoteKey, LastModified)], - byKey: Map[RemoteKey, (MD5Hash, LastModified)]) { - -} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3Client.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3Client.scala index 8a06e66..9956a11 100644 --- a/src/main/scala/net/kemitix/s3thorp/awssdk/S3Client.scala +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3Client.scala @@ -1,24 +1,46 @@ package net.kemitix.s3thorp.awssdk import cats.effect.IO +import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient -import net.kemitix.s3thorp.Sync.{Bucket, LastModified, LocalFile, MD5Hash, RemoteKey} +import net.kemitix.s3thorp._ trait S3Client { - final def objectHead(remoteKey: RemoteKey)(implicit hashLookup: HashLookup): Option[(MD5Hash, LastModified)] = - hashLookup.byKey.get(remoteKey) + final def getS3Status(localFile: LocalFile) + (implicit s3ObjectsData: S3ObjectsData): (Option[HashModified], Set[KeyModified]) = { + val matchingByKey = s3ObjectsData.byKey.get(localFile.remoteKey) + val matchingByHash = s3ObjectsData.byHash.getOrElse(localFile.hash, Set()) + (matchingByKey, matchingByHash) + } - def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] + def listObjects(bucket: Bucket, + prefix: RemoteKey + )(implicit c: Config): IO[S3ObjectsData] - def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] + def upload(localFile: LocalFile, + bucket: Bucket + )(implicit c: Config): IO[UploadS3Action] + + def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey + )(implicit c: Config): IO[CopyS3Action] + + def delete(bucket: Bucket, + remoteKey: RemoteKey + )(implicit c: Config): IO[DeleteS3Action] } object S3Client { + def createClient(s3AsyncClient: S3AsyncClient): S3Client = { + new ThorpS3Client(S3CatsIOClient(s3AsyncClient)) + } + val defaultClient: S3Client = - new ThorpS3Client( - S3CatsIOClient(new JavaClientWrapper {}.underlying)) + createClient(new JavaClientWrapper {}.underlying) } \ No newline at end of file diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientLogging.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientLogging.scala new file mode 100644 index 0000000..3fc6aaf --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientLogging.scala @@ -0,0 +1,16 @@ +package net.kemitix.s3thorp.awssdk + +import net.kemitix.s3thorp.{Bucket, Config, LocalFile, Logging} + +trait S3ClientLogging + extends Logging { + + def logUploadStart(localFile: LocalFile, bucket: Bucket) + (implicit c: Config)= + log5(s"s3Client:upload:start: ${localFile.file}") + + def logUploadDone(localFile: LocalFile, bucket: Bucket) + (implicit c: Config) = + log5(s"s3Client:upload:done : ${localFile.file}") + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ObjectsByHash.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ObjectsByHash.scala new file mode 100644 index 0000000..c439fdd --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ObjectsByHash.scala @@ -0,0 +1,15 @@ +package net.kemitix.s3thorp.awssdk + +import net.kemitix.s3thorp.{KeyModified, LastModified, MD5Hash, RemoteKey} +import software.amazon.awssdk.services.s3.model.S3Object + +trait S3ObjectsByHash { + + def byHash(os: Stream[S3Object]): Map[MD5Hash, Set[KeyModified]] = { + val mD5HashToS3Objects: Map[MD5Hash, Stream[S3Object]] = os.groupBy(o => MD5Hash(o.eTag.filter{c => c != '"'})) + val hashToModifieds: Map[MD5Hash, Set[KeyModified]] = + mD5HashToS3Objects.mapValues { os => os.map { o => KeyModified(RemoteKey(o.key), LastModified(o.lastModified())) }.toSet } + hashToModifieds + } + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ObjectsData.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ObjectsData.scala new file mode 100644 index 0000000..ad5790a --- /dev/null +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ObjectsData.scala @@ -0,0 +1,11 @@ +package net.kemitix.s3thorp.awssdk + +import net.kemitix.s3thorp.{HashModified, KeyModified, MD5Hash, RemoteKey} + +/** + * A list of objects and their MD5 hash values. + */ +case class S3ObjectsData(byHash: Map[MD5Hash, Set[KeyModified]], + byKey: Map[RemoteKey, HashModified]) { + +} diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala index 557feb1..811e59b 100644 --- a/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala @@ -2,40 +2,83 @@ package net.kemitix.s3thorp.awssdk import cats.effect.IO import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient -import net.kemitix.s3thorp.Sync._ +import net.kemitix.s3thorp._ import software.amazon.awssdk.core.async.AsyncRequestBody -import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, PutObjectRequest, S3Object} +import software.amazon.awssdk.services.s3.model.{Bucket => _, _} import scala.collection.JavaConverters._ -private class ThorpS3Client(s3Client: S3CatsIOClient) extends S3Client { +private class ThorpS3Client(s3Client: S3CatsIOClient) + extends S3Client + with S3ObjectsByHash + with Logging { - def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = { + override def upload(localFile: LocalFile, + bucket: Bucket + )(implicit c: Config): IO[UploadS3Action] = { + log5(s"upload:bucket = ${bucket.name}, localFile = ${localFile.remoteKey}") val request = PutObjectRequest.builder() - .bucket(bucket) - .key(remoteKey) + .bucket(bucket.name) + .key(localFile.remoteKey.key) .build() - val body = AsyncRequestBody.fromFile(localFile) - s3Client.putObject(request, body).map(r => Right(r.eTag())) + val body = AsyncRequestBody.fromFile(localFile.file) + s3Client.putObject(request, body) + .map(_.eTag) + .map(_.filter{c => c != '"'}) + .map(MD5Hash) + .map(md5Hash => UploadS3Action(localFile.remoteKey, md5Hash)) } - private def asHashLookup: Stream[S3Object] => HashLookup = - os => HashLookup(byHash(os), byKey(os)) + override def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey + )(implicit c: Config): IO[CopyS3Action] = { + log5(s"copy:bucket = ${bucket.name}, sourceKey = ${sourceKey.key}, targetKey = ${targetKey.key}") + val request = CopyObjectRequest.builder() + .bucket(bucket.name) + .copySource(s"${bucket.name}/${sourceKey.key}") + .copySourceIfMatch(hash.hash) + .key(targetKey.key) + .build() + s3Client.copyObject(request) + .map(_ => CopyS3Action(targetKey)) + } - private def byHash(os: Stream[S3Object]) = - os.map{o => (o.eTag, (o.key, o.lastModified))}.toMap + override def delete(bucket: Bucket, + remoteKey: RemoteKey + )(implicit c: Config): IO[DeleteS3Action] = { + log5(s"delete:bucket = ${bucket.name}, remoteKey = ${remoteKey.key}") + val request = DeleteObjectRequest.builder() + .bucket(bucket.name) + .key(remoteKey.key) + .build() + s3Client.deleteObject(request) + .map(_ => DeleteS3Action(remoteKey)) + } + + private def asS3ObjectsData: Stream[S3Object] => S3ObjectsData = + os => S3ObjectsData(byHash(os), byKey(os)) private def byKey(os: Stream[S3Object]) = - os.map{o => (o.key(), (o.eTag(), o.lastModified()))}.toMap + os.map { o => { + val remoteKey = RemoteKey(o.key) + val hash = MD5Hash(o.eTag().filter { c => c != '"' }) + val lastModified = LastModified(o.lastModified()) + (remoteKey, HashModified(hash, lastModified)) + }}.toMap - def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = { + + def listObjects(bucket: Bucket, prefix: RemoteKey) + (implicit c: Config): IO[S3ObjectsData] = { + log5(s"listObjects:bucket = ${bucket.name}, prefix: ${prefix.key}") val request = ListObjectsV2Request.builder() - .bucket(bucket) - .prefix(prefix) + .bucket(bucket.name) + .prefix(prefix.key) .build() s3Client.listObjectsV2(request) .map(r => r.contents.asScala.toStream) - .map(asHashLookup) + .map(asS3ObjectsData) } } diff --git a/src/test/scala/net/kemitix/s3thorp/ActionGeneratorSuite.scala b/src/test/scala/net/kemitix/s3thorp/ActionGeneratorSuite.scala new file mode 100644 index 0000000..ec7dd00 --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/ActionGeneratorSuite.scala @@ -0,0 +1,106 @@ +package net.kemitix.s3thorp + +import java.time.Instant + +import org.scalatest.FunSpec + +class ActionGeneratorSuite + extends UnitTest + with KeyGenerator { + + private val source = Resource(this, "upload") + private val prefix = RemoteKey("prefix") + implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + private val fileToKey = generateKey(config.source, config.prefix) _ + val lastModified = LastModified(Instant.now()) + + new ActionGenerator { + + describe("create actions") { + + def invoke(input: S3MetaData) = createActions(input).toList + + describe("#1 local exists, remote exists, remote matches - do nothing") { + val theHash = MD5Hash("the-hash") + val theFile = aLocalFile("the-file", theHash, source, fileToKey) + val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified) + val input = S3MetaData(theFile, // local exists + matchByHash = Set(theRemoteMetadata), // remote matches + matchByKey = Some(theRemoteMetadata) // remote exists + ) + it("do nothing") { + val expected = List.empty // do nothing + val result = invoke(input) + assertResult(expected)(result) + } + } + describe("#2 local exists, remote is missing, other matches - copy") { + val theHash = MD5Hash("the-hash") + val theFile = aLocalFile("the-file", theHash, source, fileToKey) + val theRemoteKey = theFile.remoteKey + val otherRemoteKey = aRemoteKey(prefix, "other-key") + val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified) + val input = S3MetaData(theFile, // local exists + matchByHash = Set(otherRemoteMetadata), // other matches + matchByKey = None) // remote is missing + it("copy from other key") { + val expected = List(ToCopy(otherRemoteKey, theHash, theRemoteKey)) // copy + val result = invoke(input) + assertResult(expected)(result) + } + } + describe("#3 local exists, remote is missing, other no matches - upload") { + val theHash = MD5Hash("the-hash") + val theFile = aLocalFile("the-file", theHash, source, fileToKey) + val input = S3MetaData(theFile, // local exists + matchByHash = Set.empty, // other no matches + matchByKey = None) // remote is missing + it("upload") { + val expected = List(ToUpload(theFile)) // upload + val result = invoke(input) + assertResult(expected)(result) + } + } + describe("#4 local exists, remote exists, remote no match, other matches - copy") { + val theHash = MD5Hash("the-hash") + val theFile = aLocalFile("the-file", theHash, source, fileToKey) + val theRemoteKey = theFile.remoteKey + val oldHash = MD5Hash("old-hash") + val otherRemoteKey = aRemoteKey(prefix, "other-key") + val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified) + val oldRemoteMetadata = RemoteMetaData(theRemoteKey, + hash = oldHash, // remote no match + lastModified = lastModified) + val input = S3MetaData(theFile, // local exists + matchByHash = Set(otherRemoteMetadata), // other matches + matchByKey = Some(oldRemoteMetadata)) // remote exists + it("copy from other key") { + val expected = List(ToCopy(otherRemoteKey, theHash, theRemoteKey)) // copy + val result = invoke(input) + assertResult(expected)(result) + } + } + describe("#5 local exists, remote exists, remote no match, other no matches - upload") { + val theHash = MD5Hash("the-hash") + val theFile = aLocalFile("the-file", theHash, source, fileToKey) + val theRemoteKey = theFile.remoteKey + val oldHash = MD5Hash("old-hash") + val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified) + val input = S3MetaData(theFile, // local exists + matchByHash = Set.empty, // remote no match, other no match + matchByKey = Some(theRemoteMetadata) // remote exists + ) + it("upload") { + val expected = List(ToUpload(theFile)) // upload + val result = invoke(input) + assertResult(expected)(result) + } + } + describe("#6 local missing, remote exists - delete") { + it("TODO") { + pending + } + } + } + } +} diff --git a/src/test/scala/net/kemitix/s3thorp/DummyS3Client.scala b/src/test/scala/net/kemitix/s3thorp/DummyS3Client.scala index f53c234..4bced54 100644 --- a/src/test/scala/net/kemitix/s3thorp/DummyS3Client.scala +++ b/src/test/scala/net/kemitix/s3thorp/DummyS3Client.scala @@ -1,12 +1,26 @@ package net.kemitix.s3thorp import cats.effect.IO -import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, MD5Hash, RemoteKey} -import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client} +import net.kemitix.s3thorp.awssdk.{S3ObjectsData, S3Client} trait DummyS3Client extends S3Client { - override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = ??? + override def upload(localFile: LocalFile, + bucket: Bucket + )(implicit c: Config): IO[UploadS3Action] = ??? + + override def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey + )(implicit c: Config): IO[CopyS3Action] = ??? + + override def delete(bucket: Bucket, + remoteKey: RemoteKey + )(implicit c: Config): IO[DeleteS3Action] = ??? + + override def listObjects(bucket: Bucket, + prefix: RemoteKey + )(implicit c: Config): IO[S3ObjectsData] = ??? - override def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = ??? } diff --git a/src/test/scala/net/kemitix/s3thorp/KeyGeneratorSuite.scala b/src/test/scala/net/kemitix/s3thorp/KeyGeneratorSuite.scala new file mode 100644 index 0000000..d4b4b75 --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/KeyGeneratorSuite.scala @@ -0,0 +1,35 @@ +package net.kemitix.s3thorp + +import java.io.File + +import org.scalatest.FunSpec + +class KeyGeneratorSuite extends FunSpec { + + new KeyGenerator { + private val source: File = Resource(this, "upload") + private val prefix = RemoteKey("prefix") + implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + private val fileToKey = generateKey(config.source, config.prefix) _ + + describe("key generator") { + def resolve(subdir: String): File = { + source.toPath.resolve(subdir).toFile + } + + describe("when file is within source") { + it("has a valid key") { + val subdir = "subdir" + assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(resolve(subdir))) + } + } + + describe("when file is deeper within source") { + it("has a valid key") { + val subdir = "subdir/deeper/still" + assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(resolve(subdir))) + } + } + } + } +} \ No newline at end of file diff --git a/src/test/scala/net/kemitix/s3thorp/LocalFileStreamSuite.scala b/src/test/scala/net/kemitix/s3thorp/LocalFileStreamSuite.scala index 4f59dcf..a455d76 100644 --- a/src/test/scala/net/kemitix/s3thorp/LocalFileStreamSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/LocalFileStreamSuite.scala @@ -1,16 +1,15 @@ package net.kemitix.s3thorp -import java.io.File - import org.scalatest.FunSpec class LocalFileStreamSuite extends FunSpec with LocalFileStream { - describe("streamDirectoryPaths") { - var uploadResource = Resource(this, "upload") + describe("findFiles") { + val uploadResource = Resource(this, "upload") + val config: Config = Config(source = uploadResource) it("should find all files") { - val result: Set[String] = streamDirectoryPaths(uploadResource).toSet - .map { x: File => uploadResource.toPath.relativize(x.toPath).toString } + val result: Set[String] = findFiles(uploadResource)(config).toSet + .map { x: LocalFile => x.relative.toString } assertResult(Set("subdir/leaf-file", "root-file"))(result) } } diff --git a/src/test/scala/net/kemitix/s3thorp/Resource.scala b/src/test/scala/net/kemitix/s3thorp/Resource.scala index 57d14c4..8631dfb 100644 --- a/src/test/scala/net/kemitix/s3thorp/Resource.scala +++ b/src/test/scala/net/kemitix/s3thorp/Resource.scala @@ -7,7 +7,8 @@ import scala.util.Try object Resource { def apply(base: AnyRef, - name: String): File = + name: String): File = { Try(new File(base.getClass.getResource(name).getPath)) .getOrElse(throw new FileNotFoundException(name)) + } } diff --git a/src/test/scala/net/kemitix/s3thorp/S3ActionSuite.scala b/src/test/scala/net/kemitix/s3thorp/S3ActionSuite.scala new file mode 100644 index 0000000..94ce742 --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/S3ActionSuite.scala @@ -0,0 +1,19 @@ +package net.kemitix.s3thorp + +class S3ActionSuite extends UnitTest { + + describe("Ordering of types") { + val remoteKey = RemoteKey("remote-key") + val md5Hash = MD5Hash("md5hash") + val copy = CopyS3Action(remoteKey) + val upload = UploadS3Action(remoteKey, md5Hash) + val delete = DeleteS3Action(remoteKey) + val unsorted = List(delete, copy, upload) + it("should sort as copy < upload < delete ") { + val result = unsorted.sorted + val expected = List(copy, upload, delete) + assertResult(expected)(result) + } + } + +} diff --git a/src/test/scala/net/kemitix/s3thorp/S3MetaDataEnricherSuite.scala b/src/test/scala/net/kemitix/s3thorp/S3MetaDataEnricherSuite.scala index 8ea5445..37d1ea7 100644 --- a/src/test/scala/net/kemitix/s3thorp/S3MetaDataEnricherSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/S3MetaDataEnricherSuite.scala @@ -1,69 +1,133 @@ package net.kemitix.s3thorp -import java.io.File -import java.nio.file.Paths import java.time.Instant -import net.kemitix.s3thorp.awssdk.HashLookup -import org.scalatest.FunSpec +import net.kemitix.s3thorp.awssdk.S3ObjectsData -class S3MetaDataEnricherSuite extends FunSpec { +class S3MetaDataEnricherSuite + extends UnitTest + with KeyGenerator { - private val sourcePath = "/root/from/here/" - private val source = Paths.get(sourcePath).toFile - private val prefix = "prefix" - private val config = Config("bucket", prefix, source = source) - - new S3MetaDataEnricher with DummyS3Client { - describe("key generator") { - val subject = generateKey(config)_ - - def resolve(subdir: String): File = { - source.toPath.resolve(subdir).toFile - } - - describe("when file is within source") { - it("has a valid key") { - val subdir = "subdir" - assertResult(s"$prefix/$subdir")(subject(resolve(subdir))) - } - } - - describe("when file is deeper within source") { - it("has a valid key") { - val subdir = "subdir/deeper/still" - assertResult(s"$prefix/$subdir")(subject(resolve(subdir))) - } - } - } - } + private val source = Resource(this, "upload") + private val prefix = RemoteKey("prefix") + implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + private val fileToKey = generateKey(config.source, config.prefix) _ + val lastModified = LastModified(Instant.now()) describe("enrich with metadata") { - val local = "localFile" - val fileWithRemote = new File(sourcePath + local) - val fileWithNoRemote = new File(sourcePath + "noRemote") - val remoteKey = prefix + "/" + local - val hash = "hash" - val lastModified = Instant.now() - val hashLookup = HashLookup( - byHash = Map(hash -> (remoteKey, lastModified)), - byKey = Map(remoteKey -> (hash, lastModified)) - ) - describe("when remote exists") { - new S3MetaDataEnricher with DummyS3Client { - it("returns metadata") { - val expectedMetadata = S3MetaData(fileWithRemote, remoteKey, hash, lastModified) + new S3MetaDataEnricher with DummyS3Client { - val result = enrichWithS3MetaData(config)(hashLookup)(fileWithRemote) - assertResult(Right(expectedMetadata))(result) + describe("#1a local exists, remote exists, remote matches, other matches - do nothing") { + val theHash: MD5Hash = MD5Hash("the-file-hash") + val theFile: LocalFile = aLocalFile("the-file", theHash, source, fileToKey) + val theRemoteKey: RemoteKey = theFile.remoteKey + implicit val s3: S3ObjectsData = S3ObjectsData( + byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), + byKey = Map(theRemoteKey -> HashModified(theHash, lastModified)) + ) + val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified) + it("generates valid metadata") { + val expected = S3MetaData(theFile, + matchByHash = Set(theRemoteMetadata), + matchByKey = Some(theRemoteMetadata)) + val result = getMetadata(theFile) + assertResult(expected)(result) } } - } - describe("when remote doesn't exist") { - new S3MetaDataEnricher with DummyS3Client { - it("returns file to upload") { - val result = enrichWithS3MetaData(config)(hashLookup)(fileWithNoRemote) - assertResult(Left(fileWithNoRemote))(result) + describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") { + val theHash: MD5Hash = MD5Hash("the-file-hash") + val theFile: LocalFile = aLocalFile("the-file", theHash, source, fileToKey) + val theRemoteKey: RemoteKey = aRemoteKey(prefix, "the-file") + implicit val s3: S3ObjectsData = S3ObjectsData( + byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))), + byKey = Map(theRemoteKey -> HashModified(theHash, lastModified)) + ) + val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified) + it("generates valid metadata") { + val expected = S3MetaData(theFile, + matchByHash = Set(theRemoteMetadata), + matchByKey = Some(theRemoteMetadata)) + val result = getMetadata(theFile) + assertResult(expected)(result) + } + } + describe("#2 local exists, remote is missing, remote no match, other matches - copy") { + val theHash = MD5Hash("the-hash") + val theFile = aLocalFile("the-file", theHash, source, fileToKey) + val otherRemoteKey = RemoteKey("other-key") + implicit val s3: S3ObjectsData = S3ObjectsData( + byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))), + byKey = Map(otherRemoteKey -> HashModified(theHash, lastModified)) + ) + val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified) + it("generates valid metadata") { + val expected = S3MetaData(theFile, + matchByHash = Set(otherRemoteMetadata), + matchByKey = None) + val result = getMetadata(theFile) + assertResult(expected)(result) + } + } + describe("#3 local exists, remote is missing, remote no match, other no matches - upload") { + val theHash = MD5Hash("the-hash") + val theFile = aLocalFile("the-file", theHash, source, fileToKey) + implicit val s3: S3ObjectsData = S3ObjectsData( + byHash = Map(), + byKey = Map() + ) + it("generates valid metadata") { + val expected = S3MetaData(theFile, + matchByHash = Set.empty, + matchByKey = None) + val result = getMetadata(theFile) + assertResult(expected)(result) + } + } + describe("#4 local exists, remote exists, remote no match, other matches - copy") { + val theHash = MD5Hash("the-hash") + val theFile = aLocalFile("the-file", theHash, source, fileToKey) + val theRemoteKey = theFile.remoteKey + val oldHash = MD5Hash("old-hash") + val otherRemoteKey = aRemoteKey(prefix, "other-key") + implicit val s3: S3ObjectsData = S3ObjectsData( + byHash = Map( + oldHash -> Set(KeyModified(theRemoteKey, lastModified)), + theHash -> Set(KeyModified(otherRemoteKey, lastModified))), + byKey = Map( + theRemoteKey -> HashModified(oldHash, lastModified), + otherRemoteKey -> HashModified(theHash, lastModified) + ) + ) + val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified) + val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified) + it("generates valid metadata") { + val expected = S3MetaData(theFile, + matchByHash = Set(otherRemoteMetadata), + matchByKey = Some(theRemoteMetadata)) + val result = getMetadata(theFile) + assertResult(expected)(result) + } + } + describe("#5 local exists, remote exists, remote no match, other no matches - upload") { + val theHash = MD5Hash("the-hash") + val theFile = aLocalFile("the-file", theHash, source, fileToKey) + val theRemoteKey = theFile.remoteKey + val oldHash = MD5Hash("old-hash") + implicit val s3: S3ObjectsData = S3ObjectsData( + byHash = Map( + oldHash -> Set(KeyModified(theRemoteKey, lastModified)), + theHash -> Set.empty), + byKey = Map( + theRemoteKey -> HashModified(oldHash, lastModified) + ) + ) + val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified) + it("generates valid metadata") { + val expected = S3MetaData(theFile, + matchByHash = Set.empty, + matchByKey = Some(theRemoteMetadata)) + val result = getMetadata(theFile) + assertResult(expected)(result) } } } diff --git a/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala b/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala index f2cf7b9..ccfed62 100644 --- a/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/SyncSuite.scala @@ -2,83 +2,215 @@ package net.kemitix.s3thorp import java.io.File import java.time.Instant +import java.util.concurrent.CompletableFuture import cats.effect.IO -import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, MD5Hash, RemoteKey} -import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client} -import org.scalatest.FunSpec +import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData} +import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient +import software.amazon.awssdk.core.async.AsyncRequestBody +import software.amazon.awssdk.services.s3.{S3AsyncClient => JavaS3AsyncClient} +import software.amazon.awssdk.services.s3 +import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response, PutObjectRequest, PutObjectResponse} -class SyncSuite extends FunSpec { +class SyncSuite + extends UnitTest + with KeyGenerator { + + private val source = Resource(this, "upload") + private val prefix = RemoteKey("prefix") + implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + private val lastModified = LastModified(Instant.now) describe("s3client thunk") { - val testBucket = "bucket" - val testRemoteKey = "prefix/file" + val testBucket = Bucket("bucket") + val prefix = RemoteKey("prefix") + val source = new File("/") describe("upload") { - val md5Hash = "the-hash" - val testLocalFile = new File("file") + val md5Hash = MD5Hash("the-hash") + val testLocalFile = aLocalFile("file", md5Hash, source, generateKey(source, prefix)) val sync = new Sync(new S3Client with DummyS3Client { - override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = { - assert(localFile == testLocalFile) + override def upload(localFile: LocalFile, bucket: Bucket)(implicit c: Config) = IO { assert(bucket == testBucket) - assert(remoteKey == testRemoteKey) - IO(Right(md5Hash)) + UploadS3Action(localFile.remoteKey, md5Hash) } }) it("delegates unmodified to the S3Client") { - assertResult(Right(md5Hash))( - sync.upload(testLocalFile, testBucket, testRemoteKey). + assertResult(UploadS3Action(RemoteKey(prefix.key + "/file"), md5Hash))( + sync.upload(testLocalFile, testBucket). unsafeRunSync()) } } } describe("run") { - val testBucket = "bucket" + val testBucket = Bucket("bucket") val source = Resource(this, "upload") // source contains the files root-file and subdir/leaf-file - val config = Config("bucket", "prefix", source = source) + val config = Config(Bucket("bucket"), RemoteKey("prefix"), source = source) + val rootRemoteKey = RemoteKey("prefix/root-file") + val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file") describe("when all files should be uploaded") { - var uploadsRecord: Map[String, RemoteKey] = Map() - val sync = new Sync(new DummyS3Client{ - override def listObjects(bucket: Bucket, prefix: RemoteKey) = IO( - HashLookup( - byHash = Map(), - byKey = Map())) - override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) = { - if (bucket == testBucket) - uploadsRecord += (source.toPath.relativize(localFile.toPath).toString -> remoteKey) - IO(Right("some hash value")) - } - }) + val sync = new RecordingSync(testBucket, new DummyS3Client {}, S3ObjectsData( + byHash = Map(), + byKey = Map())) + sync.run(config).unsafeRunSync it("uploads all files") { - sync.run(config).unsafeRunSync - val expected = Map( - "subdir/leaf-file" -> "prefix/subdir/leaf-file", - "root-file" -> "prefix/root-file" + val expectedUploads = Map( + "subdir/leaf-file" -> leafRemoteKey, + "root-file" -> rootRemoteKey ) - assertResult(expected)(uploadsRecord) + assertResult(expectedUploads)(sync.uploadsRecord) + } + it("copies nothing") { + val expectedCopies = Map() + assertResult(expectedCopies)(sync.copiesRecord) + } + it("deletes nothing") { + val expectedDeletions = Set() + assertResult(expectedDeletions)(sync.deletionsRecord) } } describe("when no files should be uploaded") { - val rootHash = "a3a6ac11a0eb577b81b3bb5c95cc8a6e" - val leafHash = "208386a650bdec61cfcd7bd8dcb6b542" - val lastModified = Instant.now - var uploadsRecord: Map[String, RemoteKey] = Map() - val sync = new Sync(new S3Client with DummyS3Client { - override def listObjects(bucket: Bucket, prefix: RemoteKey) = IO( - HashLookup( - byHash = Map(rootHash -> ("prefix/root-file", lastModified), leafHash -> ("prefix/subdir/leaf-file", lastModified)), - byKey = Map("prefix/root-file" -> (rootHash, lastModified), "prefix/subdir/leaf-file" -> (leafHash, lastModified)))) - override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) = { - if (bucket == testBucket) - uploadsRecord += (source.toPath.relativize(localFile.toPath).toString -> remoteKey) - IO(Right("some hash value")) - } - }) + val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e") + val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542") + val s3ObjectsData = S3ObjectsData( + byHash = Map( + rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)), + leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), + byKey = Map( + RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) + val sync = new RecordingSync(testBucket, new DummyS3Client {}, s3ObjectsData) + sync.run(config).unsafeRunSync it("uploads nothing") { - sync.run(config).unsafeRunSync - val expected = Map() - assertResult(expected)(uploadsRecord) + val expectedUploads = Map() + assertResult(expectedUploads)(sync.uploadsRecord) + } + it("copies nothing") { + val expectedCopies = Map() + assertResult(expectedCopies)(sync.copiesRecord) + } + it("deletes nothing") { + val expectedDeletions = Set() + assertResult(expectedDeletions)(sync.deletionsRecord) + } + } + describe("when a file is renamed it is moved on S3 with no upload") { + // 'root-file-old' should be renamed as 'root-file' + val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e") + val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542") + val s3ObjectsData = S3ObjectsData( + byHash = Map( + rootHash -> Set(KeyModified(RemoteKey("prefix/root-file-old"), lastModified)), + leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))), + byKey = Map( + RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified), + RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified))) + val sync = new RecordingSync(testBucket, new DummyS3Client {}, s3ObjectsData) + sync.run(config).unsafeRunSync + it("uploads nothing") { + val expectedUploads = Map() + assertResult(expectedUploads)(sync.uploadsRecord) + } + it("copies the file") { + val expectedCopies = Map(RemoteKey("prefix/root-file-old") -> RemoteKey("prefix/root-file")) + assertResult(expectedCopies)(sync.copiesRecord) + } + it("deletes the original") { + val expectedDeletions = Set(RemoteKey("prefix/root-file-old")) + assertResult(expectedDeletions)(sync.deletionsRecord) + } + } + describe("when a file is copied it is copied on S3 with no upload") { + it("TODO") { + pending + } + } + describe("when a file is deleted locally it is deleted from S3") { + val deletedHash = MD5Hash("deleted-hash") + val deletedKey = RemoteKey("prefix/deleted-file") + val s3ObjectsData = S3ObjectsData( + byHash = Map( + deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))), + byKey = Map( + deletedKey -> HashModified(deletedHash, lastModified))) + val sync = new RecordingSync(testBucket, new DummyS3Client {}, s3ObjectsData) + sync.run(config).unsafeRunSync + it("deleted key") { + val expectedDeletions = Set(deletedKey) + assertResult(expectedDeletions)(sync.deletionsRecord) + } + } + describe("io actions execute") { + val recordingS3Client = new RecordingS3Client + val client = S3Client.createClient(recordingS3Client) + val sync = new Sync(client) + sync.run(config).unsafeRunSync + it("invokes the underlying Java s3client") { + val expected = Set( + PutObjectRequest.builder().bucket(testBucket.name).key(rootRemoteKey.key).build(), + PutObjectRequest.builder().bucket(testBucket.name).key(leafRemoteKey.key).build() + ) + val result = recordingS3Client.puts + assertResult(expected)(result) } } } + + class RecordingSync(testBucket: Bucket, s3Client: S3Client, s3ObjectsData: S3ObjectsData) + extends Sync(s3Client) { + + var uploadsRecord: Map[String, RemoteKey] = Map() + var copiesRecord: Map[RemoteKey, RemoteKey] = Map() + var deletionsRecord: Set[RemoteKey] = Set() + + override def listObjects(bucket: Bucket, prefix: RemoteKey)(implicit c: Config) = IO {s3ObjectsData} + + override def upload(localFile: LocalFile, + bucket: Bucket + )(implicit c: Config) = IO { + if (bucket == testBucket) + uploadsRecord += (localFile.relative.toString -> localFile.remoteKey) + UploadS3Action(localFile.remoteKey, MD5Hash("some hash value")) + } + + override def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey + )(implicit c: Config) = IO { + if (bucket == testBucket) + copiesRecord += (sourceKey -> targetKey) + CopyS3Action(targetKey) + } + + override def delete(bucket: Bucket, + remoteKey: RemoteKey + )(implicit c: Config) = IO { + if (bucket == testBucket) + deletionsRecord += remoteKey + DeleteS3Action(remoteKey) + } + } + + class RecordingS3Client extends S3AsyncClient { + var lists: Set[ListObjectsV2Request] = Set() + var puts: Set[PutObjectRequest] = Set() + override val underlying: s3.S3AsyncClient = new JavaS3AsyncClient { + override def serviceName(): String = "s3Recorder" + + override def close(): Unit = () + + override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request): CompletableFuture[ListObjectsV2Response] = { + lists += listObjectsV2Request + CompletableFuture.completedFuture(ListObjectsV2Response.builder().build()) + } + + override def putObject(putObjectRequest: PutObjectRequest, + requestBody: AsyncRequestBody): CompletableFuture[PutObjectResponse] = { + puts += putObjectRequest + CompletableFuture.completedFuture(PutObjectResponse.builder().eTag("not-null").build()) + } + + } + } } diff --git a/src/test/scala/net/kemitix/s3thorp/UnitTest.scala b/src/test/scala/net/kemitix/s3thorp/UnitTest.scala new file mode 100644 index 0000000..53befe2 --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/UnitTest.scala @@ -0,0 +1,17 @@ +package net.kemitix.s3thorp + +import java.io.File + +import org.scalatest.FunSpec + +abstract class UnitTest extends FunSpec { + + def aLocalFile(path: String, myHash: MD5Hash, source: File, fileToKey: File => RemoteKey): LocalFile = + new LocalFile(source.toPath.resolve(path).toFile, source, fileToKey) { + override def hash: MD5Hash = myHash + } + + def aRemoteKey(prefix: RemoteKey, path: String): RemoteKey = + RemoteKey(prefix.key + "/" + path) + +} diff --git a/src/test/scala/net/kemitix/s3thorp/UploadSelectionFilterSuite.scala b/src/test/scala/net/kemitix/s3thorp/UploadSelectionFilterSuite.scala deleted file mode 100644 index edae2e9..0000000 --- a/src/test/scala/net/kemitix/s3thorp/UploadSelectionFilterSuite.scala +++ /dev/null @@ -1,39 +0,0 @@ -package net.kemitix.s3thorp - -import java.io.File -import java.time.Instant - -import org.scalatest.FunSpec - -class UploadSelectionFilterSuite extends FunSpec { - - new UploadSelectionFilter { - describe("uploadRequiredFilter") { - val localFile = Resource(this, "test-file-for-hash.txt") - val localHash = "0cbfe978783bd7950d5da4ff85e4af37" - val config = Config("bucket", "prefix", source = localFile.getParentFile) - def invokeSubject(input: Either[File, S3MetaData]) = - uploadRequiredFilter(config)(input).toList - describe("when supplied a file") { - val input = Left(localFile) - it("should be marked for upload") { - assertResult(List(localFile))(invokeSubject(input)) - } - } - describe("when supplied S3MetaData") { - describe("when hash is different") { - val input = Right(S3MetaData(localFile, "", "doesn't match any hash", Instant.now)) - it("should be marked for upload") { - assertResult(List(localFile))(invokeSubject(input)) - } - } - describe("when hash is the same") { - val input = Right(S3MetaData(localFile, "", localHash, Instant.now)) - it("should not be marked for upload") { - assertResult(List())(invokeSubject(input)) - } - } - } - } - } -} diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientSuite.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientSuite.scala index e28670b..de38611 100644 --- a/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ClientSuite.scala @@ -5,55 +5,92 @@ import java.time.Instant import cats.effect.IO import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient -import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, RemoteKey} -import org.scalatest.FunSpec -import software.amazon.awssdk.services.s3.model._ +import net.kemitix.s3thorp._ +import software.amazon.awssdk.services.s3.model.{PutObjectRequest, PutObjectResponse} -class S3ClientSuite extends FunSpec { +class S3ClientSuite + extends UnitTest + with KeyGenerator { - describe("objectHead") { - val key = "key" - val hash = "hash" - val lastModified = Instant.now - val hashLookup: HashLookup = HashLookup( - byHash = Map(hash -> (key, lastModified)), - byKey = Map(key -> (hash, lastModified))) + val source = Resource(this, "../upload") - def invoke(self: S3Client, remoteKey: RemoteKey) = { - self.objectHead(remoteKey)(hashLookup) + private val prefix = RemoteKey("prefix") + implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source) + private val fileToKey = generateKey(config.source, config.prefix) _ + + describe("getS3Status") { + val hash = MD5Hash("hash") + val localFile = aLocalFile("the-file", hash, source, fileToKey) + val key = localFile.remoteKey + val keyotherkey = aLocalFile("other-key-same-hash", hash, source, fileToKey) + val diffhash = MD5Hash("diff") + val keydiffhash = aLocalFile("other-key-diff-hash", diffhash, source, fileToKey) + val lastModified = LastModified(Instant.now) + val s3ObjectsData: S3ObjectsData = S3ObjectsData( + byHash = Map( + hash -> Set(KeyModified(key, lastModified), KeyModified(keyotherkey.remoteKey, lastModified)), + diffhash -> Set(KeyModified(keydiffhash.remoteKey, lastModified))), + byKey = Map( + key -> HashModified(hash, lastModified), + keyotherkey.remoteKey -> HashModified(hash, lastModified), + keydiffhash.remoteKey -> HashModified(diffhash, lastModified))) + + def invoke(self: S3Client, localFile: LocalFile) = { + self.getS3Status(localFile)(s3ObjectsData) } describe("when remote key exists") { val s3Client = S3Client.defaultClient - it("should return Some(expected values)") { - assertResult(Some((hash, lastModified)))(invoke(s3Client, key)) + it("should return (Some, Set.nonEmpty)") { + assertResult( + (Some(HashModified(hash, lastModified)), + Set( + KeyModified(key, lastModified), + KeyModified(keyotherkey.remoteKey, lastModified))) + )(invoke(s3Client, localFile)) } } - describe("when remote key does not exist") { + describe("when remote key does not exist and no others matches hash") { val s3Client = S3Client.defaultClient - it("should return None") { - assertResult(None)(invoke(s3Client, "missing-key")) + it("should return (None, Set.empty)") { + val localFile = aLocalFile("missing-file", MD5Hash("unique"), source, fileToKey) + assertResult( + (None, + Set.empty) + )(invoke(s3Client, localFile)) + } + } + + describe("when remote key exists and no others match hash") { + val s3Client = S3Client.defaultClient + it("should return (None, Set.nonEmpty)") { + assertResult( + (Some(HashModified(diffhash, lastModified)), + Set(KeyModified(keydiffhash.remoteKey, lastModified))) + )(invoke(s3Client, keydiffhash)) } } } describe("upload") { - def invoke(s3Client: ThorpS3Client, localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) = - s3Client.upload(localFile, bucket, remoteKey).unsafeRunSync + def invoke(s3Client: ThorpS3Client, localFile: LocalFile, bucket: Bucket) = + s3Client.upload(localFile, bucket).unsafeRunSync describe("when uploading a file") { - val md5Hash = "the-md5hash" + val md5Hash = MD5Hash("the-md5hash") val s3Client = new ThorpS3Client( new S3CatsIOClient with JavaClientWrapper { override def putObject(putObjectRequest: PutObjectRequest, requestBody: RB) = - IO(PutObjectResponse.builder().eTag(md5Hash).build()) + IO(PutObjectResponse.builder().eTag(md5Hash.hash).build()) }) - val localFile: LocalFile = new File("/some/file") - val bucket: Bucket = "a-bucket" - val remoteKey: RemoteKey = "prefix/file" + val source = new File("/") + val prefix = RemoteKey("prefix") + val localFile: LocalFile = aLocalFile("/some/file", md5Hash, source, generateKey(source, prefix)) + val bucket: Bucket = Bucket("a-bucket") + val remoteKey: RemoteKey = RemoteKey("prefix/some/file") it("should return hash of uploaded file") { - assertResult(Right(md5Hash))(invoke(s3Client, localFile, bucket, remoteKey)) + assertResult(UploadS3Action(remoteKey, md5Hash))(invoke(s3Client, localFile, bucket)) } } } diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/S3ObjectsByHashSuite.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ObjectsByHashSuite.scala new file mode 100644 index 0000000..8caea29 --- /dev/null +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/S3ObjectsByHashSuite.scala @@ -0,0 +1,36 @@ +package net.kemitix.s3thorp.awssdk + +import java.time.Instant + +import net.kemitix.s3thorp.{KeyModified, LastModified, MD5Hash, RemoteKey, UnitTest} +import software.amazon.awssdk.services.s3.model.S3Object + +class S3ObjectsByHashSuite extends UnitTest { + + new S3ObjectsByHash { + describe("grouping s3 object together by their hash values") { + val hash = MD5Hash("hash") + val key1 = RemoteKey("key-1") + val key2 = RemoteKey("key-2") + val lastModified = LastModified(Instant.now) + val o1 = s3object(hash, key1, lastModified) + val o2 = s3object(hash, key2, lastModified) + val os = Stream(o1, o2) + it("should group by the hash value") { + val expected: Map[MD5Hash, Set[KeyModified]] = Map( + hash -> Set(KeyModified(key1, lastModified), KeyModified(key2, lastModified)) + ) + val result = byHash(os) + assertResult(expected)(result) + } + } + } + + private def s3object(md5Hash: MD5Hash, remoteKey: RemoteKey, lastModified: LastModified): S3Object = + S3Object.builder + .eTag(md5Hash.hash) + .key(remoteKey.key) + .lastModified(lastModified.when) + .build + +} diff --git a/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala b/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala index 5f8a8d3..7eb29d9 100644 --- a/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala +++ b/src/test/scala/net/kemitix/s3thorp/awssdk/ThorpS3ClientSuite.scala @@ -1,11 +1,13 @@ package net.kemitix.s3thorp.awssdk import java.time.Instant -import scala.collection.JavaConverters._ +import java.time.temporal.ChronoUnit +import scala.collection.JavaConverters._ import cats.effect.IO import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient +import net.kemitix.s3thorp.{Bucket, Config, HashModified, KeyModified, LastModified, MD5Hash, Main, RemoteKey, Resource} import org.scalatest.FunSpec import software.amazon.awssdk.services.s3 import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response, S3Object} @@ -13,37 +15,55 @@ import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjec class ThorpS3ClientSuite extends FunSpec { describe("listObjectsInPrefix") { - val h1 = "hash1" - val k1 = "key1" - val lm1 = Instant.now - val o1 = S3Object.builder.eTag(h1).key(k1).lastModified(lm1).build - val h2 = "hash2" - val k2 = "key2" - val lm2 = Instant.now.minusSeconds(200) - val o2 = S3Object.builder.eTag(h2).key(k2).lastModified(lm2).build - val myFakeResponse: IO[ListObjectsV2Response] = IO{ + val source = Resource(Main, "upload") + val prefix = RemoteKey("prefix") + implicit val config: Config = Config(Bucket("bucket"), prefix, source = source) + + val lm = LastModified(Instant.now) + + val h1 = MD5Hash("hash1") + + val k1a = RemoteKey("key1a") + val o1a = S3Object.builder.eTag(h1.hash).key(k1a.key).lastModified(lm.when).build + + val k1b = RemoteKey("key1b") + val o1b = S3Object.builder.eTag(h1.hash).key(k1b.key).lastModified(lm.when).build + + val h2 = MD5Hash("hash2") + val k2 = RemoteKey("key2") + val o2 = S3Object.builder.eTag(h2.hash).key(k2.key).lastModified(lm.when).build + + val myFakeResponse: IO[ListObjectsV2Response] = IO { ListObjectsV2Response.builder() - .contents(List(o1, o2).asJava) + .contents(List(o1a, o1b, o2).asJava) .build() } - val subject = new ThorpS3Client(new S3CatsIOClient { - override val underlying: S3AsyncClient = new S3AsyncClient { - override val underlying: s3.S3AsyncClient = new s3.S3AsyncClient { - override def serviceName(): String = "fake-s3-client" - - override def close(): Unit = () - } - } - override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request) = + val s3client = new ThorpS3Client(new MyS3CatsIOClient { + override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request): IO[ListObjectsV2Response] = myFakeResponse }) - it("should build list of hash lookups") { - val result: HashLookup = subject.listObjects("bucket", "prefix").unsafeRunSync() - val expected = HashLookup( - Map(h1 -> (k1, lm1), h2 -> (k2, lm2)), - Map(k1 -> (h1, lm1), k2 -> (h2, lm2))) + it("should build list of hash lookups, with duplicate objects grouped by hash") { + val expected = S3ObjectsData( + byHash = Map( + h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)), + h2 -> Set(KeyModified(k2, lm))), + byKey = Map( + k1a -> HashModified(h1, lm), + k1b -> HashModified(h1, lm), + k2 -> HashModified(h2, lm))) + val result: S3ObjectsData = s3client.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync() + assertResult(expected.byHash.keys)(result.byHash.keys) + assertResult(expected.byKey.keys)(result.byKey.keys) assertResult(expected)(result) } } + trait MyS3CatsIOClient extends S3CatsIOClient { + override val underlying: S3AsyncClient = new S3AsyncClient { + override val underlying: s3.S3AsyncClient = new s3.S3AsyncClient { + override def serviceName(): String = "fake-s3-client" + override def close(): Unit = () + } + } + } }