Handle renames (#14)

* [sync] move thunks to s3client to bottom of class

Also, use the thunk methods from within run rather than accessing the
s3client object directly.

* Layout tweaks to put each parameter on own line

* [syncsuite] value renames and move sync.run outside it() call

Future tests will be evaluating the result of that call, so this
avoids repeatedly calling it.

* Add first pass at copy methods and some delete stubs

* [Bucket] Convert from type alias for String to a case class

* [SyncSuite] mark new tests as pending

* [RemoteKey] Convert from type alias for String to a case class

* [MD5Hash] Convert from type alias for String to a case class

* [LastModified] Convert from type alias for String to a case class

* [LocalFile] Revert to using a normal File

* [Sync] Use a for-comprehension and restructure S3MetaData

The for-comprehension will make it easier to generate multiple actions
out of the stream of enriched metadata. The restructured S3MetaData
avoids the need to wrap it in an Either in some cases.

* [ToUpload] Add a wrapper to indicate action required on File

* [S3Action] Stub actions for IO events

* [S3Action] Use UploadS3Action

* [Sync] Fix formatting when echoing parameters

* [logging] Change log level down to 4 for listing every file considered

* [Sync] Use a case class to hold counters

* [HashModified] Add case class to replace MD5Hash, LastModified tuples

* [logging] Move file considered logging to source of files

Rather than logging this where adding meta data, move to where the
files are being initially identified.

* [logging] Log all final counters

* Pass Config and HashLookup as implicit parameters

* [LocalFileStream] rename method as findFiles

* [S3MetaDataEnricher] rename method as getMetadata

* Rename selection filter and uploader trait and methods

* [MD5HashGenerator] Extract as trait

* [Action] Convert ToUpload into an Action sealed trait

* [ActionGenerator] refactored and removed logging

* fix up tests

* [LocalFileStream] adjust logging

* [RemoteMetaData] Added

* [ActionGenerator] remove redundant braces

* [LocalFile] Added as wrapper for File

* [Sync] run: remove redundant braces

* [Sync] run: rename HashLookup as S3ObjectsData

* WIP - toward copy action

* Extract S3ObjectsByHash for grouping

* extract internal wrapper for S3CatsIOClient

Remove some boilerplate from the middle of a test

* Explicitly name the Map parameters in expected result

* All lastModified are the same to avoid confusion

We aren't testing this field, just that the keys and hash values are correct.

* Rename variable

* space out object creation

* Fix test - error in expected result

Code has been working for ages!

* [readme] condense and simplify behaviour table, adding option delete

Reduce the complexity by only noting the distinct attributes leading
to each action.

Add the action of delete when a local file is missing.

* [S3MetaDataEnricherSuite] rename tests and note missing tests

* [ActionGeneratorSuite] rename tests and note missing tests

* Note unwritten tests as such

* [ActionGenerator]  #2 local exists, remote is missing, other matches

* [S3ClientSuite] fix tests

* [S3MetaDataEnricherSuite] #2a local exists, remote is missing, remote matches, other matches - copy

* [S3MetaDataEnricherSuite] drop 'remote is missing, remote matches'

Impossible to represent this combination

* [S3MetaDataEnricherSuite] #3 local exists, remote is missing, remote no match, other no matches - upload

* [S3MetaDataEnricherSuite] Tests #1-3 rename variables consistently

* [S3MetadataEnricherSuite] #4 local exists, remote exists, remote no match, other matches - copy

* [S3MetadataEnricherSuite] #5 local exists, remote exists, remote no match, other no matches - upload

* [S3MetadataEnricherSuite] drop test #6 - no way to make request

* [ActionGeneratorSuite] standardise tests 2-4

* [ActionGeneratorSuite] #1 local exists, remote exists, remote matches - do nothing

* [ActionGeneratorSuite] Comment expected outcome

* [ActionGeneratorSuite] #5 local exists, remote exists, remote no match, other no matches - upload

* [Action] Add ToDelete case class

* Use ToDelete and fix up return types for DeleteS3Action

* [ActionGenerator] Add explicit case for #1

* [ActionGenerator] Add explicit check for local exists in #2

* [ActionGenerator] match case against #3

* [ActionGenerator] simplify case and match against #5

* [ActionGenerator] Add case for #4

* [ActionGenerator] Remove explicit checks for file existing

If we are called with a LocalFile parameter then we assume the file exists.

* [ActionGenerator] Avoid #1 matching condition #5

* [ActionGeneratorSuite] enable tests

* [test] remove stray println

* [SyncSuite] Add test helper RecordingSync

* [SyncSuite] Use RecordingSync

* [SyncSuite] enable rename test - excluding delete test

* [Sync] log and increment counters for copy and delete

* [Sync] Use case matched RemoteKey in log message

* [Sync] Reorder actions to do copy then upload then delete

* [S3Action] Drop Move as a distinct action

Can be implemented as a Copy followed by a Delete.

* [S3Action] Actions are ordered Copy, Upload then Delete

This allows sequencing of actions so that all the quick-to-accomplish
copies take place before bandwidth/time-costly updates or destructive
deletes. Deletes come last, after they have had the opportunity to be
used as the source for any copies.

* [Sync] Use S3Action's default sorting

* [Sync] extract logging of activity

* [SyncLogging] Extract logging out of Sync

Single Responsibility principle - Sync knows nothing about how it
logs, it just delegates to SyncLogging.

* [Sync] Rename variables and extract sort into private def

* [SyncLogging] Use IO context

* [SyncLogging] Remove moved counter

* [SyncLogging] Clean up and log start of run config info

* Verify that IO actions are evaluated before the program terminates

* [Sync] ensure logging runs

* [ActionGenerator] Don't upload files every time

* [ActionGenerator] fix remote hash for #5

* [SyncSuite] Add tests for delete and delete after rename

* [RemoteKey] Add asFile and isMissingLocally helpers

* [Sync] Generate delete actions

* Remove old extensions upon MD5HashGenerator

* [MD5Hash] prevent confusion by never allowing quotes

This means we need to filter quotes from md5hash values at source

* [Sync] ensure start log message is run

* [ThorpS3Client] Fix passing parameters for source key

* [ThorpS3Client] reformat byKey for clarity

* [S3Client] Add level 5 logging around s3 sdk calls

* fix up tests
Paul Campbell 2019-05-22 13:55:03 +01:00 committed by GitHub
parent 00743c425c
commit eacfc37095
43 changed files with 1124 additions and 382 deletions


@ -20,6 +20,21 @@ hash of the file contents.
-v, --verbose <value> Verbosity level (1-5)
#+end_example
* Behaviour
When considering a local file, the following table governs what should happen:
|---+------------+------------+------------------+--------------------+---------------------|
| # | local file | remote key | hash of same key | hash of other keys | action |
|---+------------+------------+------------------+--------------------+---------------------|
| 1 | exists | exists | matches | - | do nothing |
| 2 | exists | is missing | - | matches | copy from other key |
| 3 | exists | is missing | - | no matches | upload |
| 4 | exists | exists | no match | matches | copy from other key |
| 5 | exists | exists | no match | no matches | upload |
| 6 | is missing | exists | - | - | delete |
|---+------------+------------+------------------+--------------------+---------------------|
* Creating Native Images
- Download and install GraalVM


@ -0,0 +1,8 @@
package net.kemitix.s3thorp
sealed trait Action
case class ToUpload(localFile: LocalFile) extends Action
case class ToCopy(sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey) extends Action
case class ToDelete(remoteKey: RemoteKey) extends Action


@ -0,0 +1,48 @@
package net.kemitix.s3thorp
trait ActionGenerator
extends Logging {
def createActions(s3MetaData: S3MetaData)
(implicit c: Config): Stream[Action] =
s3MetaData match {
// #1 local exists, remote exists, remote matches - do nothing
case S3MetaData(localFile, _, Some(RemoteMetaData(_, remoteHash, _)))
if localFile.hash == remoteHash
=> Stream.empty
// #2 local exists, remote is missing, other matches - copy
case S3MetaData(localFile, otherMatches, None)
if otherMatches.nonEmpty
=> copyFile(localFile, otherMatches)
// #3 local exists, remote is missing, other no matches - upload
case S3MetaData(localFile, otherMatches, None)
if otherMatches.isEmpty
=> uploadFile(localFile)
// #4 local exists, remote exists, remote no match, other matches - copy
case S3MetaData(localFile, otherMatches, Some(RemoteMetaData(_, remoteHash, _)))
if localFile.hash != remoteHash &&
otherMatches.nonEmpty
=> copyFile(localFile, otherMatches)
// #5 local exists, remote exists, remote no match, other no matches - upload
case S3MetaData(localFile, hashMatches, Some(_))
if hashMatches.isEmpty
=> uploadFile(localFile)
case _ => Stream.empty
}
private def uploadFile(localFile: LocalFile) = Stream(ToUpload(localFile))
private def copyFile(localFile: LocalFile, matchByHash: Set[RemoteMetaData]): Stream[Action] =
Stream(ToCopy(
sourceKey = matchByHash.head.remoteKey,
hash = localFile.hash,
targetKey = localFile.remoteKey
))
}
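
The five cases matched above correspond to rows 1 to 5 of the behaviour table in the README. As a rough illustration only, the same decision can be written as a pure function over plain strings. The names `BehaviourSketch`, `Decision`, `decide` and friends are hypothetical stand-ins and are not part of this commit:

```scala
// A minimal sketch of the decision table, using simplified stand-in types.
// None of these names exist in the project; they are assumptions for illustration.
object BehaviourSketch {
  sealed trait Decision
  case object DoNothing extends Decision
  final case class CopyFrom(otherKey: String) extends Decision
  case object Upload extends Decision

  // localHash:   hash of the local file
  // sameKeyHash: hash stored under the file's own remote key, if that key exists
  // otherKeys:   other remote keys whose content hash equals localHash
  def decide(localHash: String,
             sameKeyHash: Option[String],
             otherKeys: Set[String]): Decision =
    sameKeyHash match {
      case Some(h) if h == localHash => DoNothing                // row 1
      case _ if otherKeys.nonEmpty   => CopyFrom(otherKeys.head) // rows 2 and 4
      case _                         => Upload                   // rows 3 and 5
    }
}
```

Row 6 (delete) is not covered here because, as in the commit, it is driven by the remote key listing rather than by a local file.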


@ -0,0 +1,24 @@
package net.kemitix.s3thorp
import cats.effect.IO
import net.kemitix.s3thorp.awssdk.S3Client
trait ActionSubmitter
extends S3Client
with Logging {
def submitAction(action: Action)
(implicit c: Config): IO[S3Action] = {
action match {
case ToUpload(file) =>
log4(s" Upload: ${file.relative}")
upload(file, c.bucket)
case ToCopy(sourceKey, hash, targetKey) =>
log4(s" Copy: $sourceKey => $targetKey")
copy(c.bucket, sourceKey, hash, targetKey)
case ToDelete(remoteKey) =>
log4(s" Delete: $remoteKey")
delete(c.bucket, remoteKey)
}
}
}


@ -0,0 +1,5 @@
package net.kemitix.s3thorp
final case class Bucket(name: String) {
}


@ -1,16 +1,11 @@
package net.kemitix.s3thorp
import java.io.File
import java.nio.file.Path
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile}
case class Config(bucket: Bucket = "",
prefix: String = "",
case class Config(bucket: Bucket = Bucket(""),
prefix: RemoteKey = RemoteKey(""),
verbose: Int = 1,
source: LocalFile
source: File
) {
def relativePath(file: File): Path = source.toPath.relativize(file.toPath)
require(source.isDirectory, s"Source must be a directory: $source")
}


@ -0,0 +1,6 @@
package net.kemitix.s3thorp
final case class HashModified(hash: MD5Hash,
modified: LastModified) {
}


@ -4,11 +4,12 @@ import java.io.File
trait KeyGenerator {
def generateKey(c: Config)(file: File): String = {
def generateKey(source: File, prefix: RemoteKey)
(file: File): RemoteKey = {
val otherPath = file.toPath.toAbsolutePath
val sourcePath = c.source.toPath
val sourcePath = source.toPath
val relativePath = sourcePath.relativize(otherPath)
s"${c.prefix}/$relativePath"
RemoteKey(s"${prefix.key}/$relativePath")
}
}
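
To make the key derivation concrete, here is a small sketch of the same relativize-then-prefix idea using `java.nio.file.Path` and a plain `String` in place of `RemoteKey`. `KeySketch` is a hypothetical name, and the example paths assume a Unix-like file system, where the path separator is already `/`:

```scala
import java.nio.file.{Path, Paths}

// A minimal sketch, assuming Unix-style paths; KeySketch is not part of the project.
object KeySketch {
  // Build the remote key for `file` by taking its path relative to `source`
  // and placing it under `prefix`.
  def generateKey(source: Path, prefix: String)(file: Path): String = {
    val relative = source.relativize(file.toAbsolutePath)
    s"$prefix/$relative"
  }
}
```

For example, with a source of `/data/site` and a prefix of `prefix`, the file `/data/site/css/main.css` would map to the key `prefix/css/main.css`.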


@ -0,0 +1,6 @@
package net.kemitix.s3thorp
final case class KeyModified(key: RemoteKey,
modified: LastModified) {
}


@ -0,0 +1,7 @@
package net.kemitix.s3thorp
import java.time.Instant
final case class LastModified(when: Instant) {
}


@ -0,0 +1,25 @@
package net.kemitix.s3thorp
import java.io.File
import java.nio.file.Path
case class LocalFile(file: File,
source: File,
keyGenerator: File => RemoteKey)
extends MD5HashGenerator {
require(!file.isDirectory, s"LocalFile must not be a directory: $file")
private lazy val myhash = md5File(file)
def hash: MD5Hash = myhash
// the equivalent location of the file on S3
def remoteKey: RemoteKey = keyGenerator(file)
def isDirectory: Boolean = file.isDirectory
// the path of the file within the source
def relative: Path = source.toPath.relativize(file.toPath)
}


@ -2,18 +2,28 @@ package net.kemitix.s3thorp
import java.io.File
trait LocalFileStream {
trait LocalFileStream
extends KeyGenerator
with Logging {
def streamDirectoryPaths(file: File): Stream[File] =
dirPaths(file)
.flatMap(f => recurseIntoSubDirectories(f))
def findFiles(file: File)
(implicit c: Config): Stream[LocalFile] = {
log5(s"- Entering: $file")
val files = for {
f <- dirPaths(file)
fs <- recurseIntoSubDirectories(f)
} yield fs
log5(s"- Leaving: $file")
files
}
private def dirPaths(file: File): Stream[File] = Option(file.listFiles)
private def dirPaths(file: File): Stream[File] = {
Option(file.listFiles)
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file")).toStream
}
private def recurseIntoSubDirectories: File => Stream[File] =
file =>
if (file.isDirectory) streamDirectoryPaths(file)
else Stream(file)
private def recurseIntoSubDirectories(file: File)(implicit c: Config): Stream[LocalFile] =
if (file.isDirectory) findFiles(file)(c)
else Stream(LocalFile(file, c.source, generateKey(c.source, c.prefix)))
}


@ -0,0 +1,7 @@
package net.kemitix.s3thorp
final case class MD5Hash(hash: String) {
require(!hash.contains("\""))
}
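
The `require` above means that quotes must be stripped from ETag values before an `MD5Hash` is constructed, as the commit message notes. A sketch of that pattern, with a hypothetical `Md5` class and `fromETag` helper standing in for the project's types:

```scala
// A minimal sketch; Md5 and fromETag are illustrative names, not project code.
object ETagSketch {
  final case class Md5(hash: String) {
    // Reject any value that still carries quote characters.
    require(!hash.contains("\""), "hash must not contain quotes")
  }

  // Strip the surrounding double quotes that S3 ETag values typically carry.
  def fromETag(eTag: String): Md5 = Md5(eTag.filter(_ != '"'))
}
```

Filtering at the source keeps every later comparison between local and remote hashes a plain equality check.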


@ -0,0 +1,18 @@
package net.kemitix.s3thorp
import java.io.{File, FileInputStream}
import java.security.{DigestInputStream, MessageDigest}
trait MD5HashGenerator
extends Logging {
def md5File(file: File): MD5Hash = {
val buffer = new Array[Byte](8192)
val md5 = MessageDigest.getInstance("MD5")
val dis = new DigestInputStream(new FileInputStream(file), md5)
try { while (dis.read(buffer) != -1) { } } finally { dis.close() }
val hash = md5.digest.map("%02x".format(_)).mkString
MD5Hash(hash)
}
}
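
The same streaming digest can be tried without touching the file system. The sketch below reads from any `InputStream` and returns the hex string directly. `Md5Sketch`, `md5OfStream` and `md5OfString` are hypothetical names used only for this illustration:

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.security.{DigestInputStream, MessageDigest}

// A minimal sketch of the streaming MD5 calculation; not part of the project.
object Md5Sketch {
  def md5OfStream(in: InputStream): String = {
    val buffer = new Array[Byte](8192)
    val md5 = MessageDigest.getInstance("MD5")
    val dis = new DigestInputStream(in, md5)
    // Reading through the DigestInputStream updates the digest as a side effect.
    try { while (dis.read(buffer) != -1) {} } finally { dis.close() }
    md5.digest().map("%02x".format(_)).mkString
  }

  def md5OfString(text: String): String =
    md5OfStream(new ByteArrayInputStream(text.getBytes("UTF-8")))
}
```

As a sanity check, the MD5 of the empty input is `d41d8cd98f00b204e9800998ecf8427e`.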


@ -19,11 +19,11 @@ object ParseArgs {
.required()
.text("Source directory to sync to S3"),
opt[String]('b', "bucket")
.action((str, c) => c.copy(bucket = str))
.action((str, c) => c.copy(bucket = Bucket(str)))
.required()
.text("S3 bucket name"),
opt[String]('p', "prefix")
.action((str, c) => c.copy(prefix = str))
.action((str, c) => c.copy(prefix = RemoteKey(str)))
.text("Prefix within the S3 Bucket"),
opt[Int]('v', "verbose")
.validate(i =>


@ -0,0 +1,11 @@
package net.kemitix.s3thorp
import java.io.File
import java.nio.file.Paths
final case class RemoteKey(key: String) {
def asFile(implicit c: Config): File =
c.source.toPath.resolve(Paths.get(c.prefix.key).relativize(Paths.get(key))).toFile
def isMissingLocally(implicit c: Config): Boolean =
! asFile.exists
}
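
`asFile` is the inverse of key generation: it removes the prefix from the key and resolves the remainder against the source directory. A sketch of that mapping with plain paths and strings, where `AsFileSketch` and `asLocalPath` are hypothetical names and the paths assume a Unix-like file system:

```scala
import java.nio.file.{Path, Paths}

// A minimal sketch of mapping a remote key back to a local path; not project code.
object AsFileSketch {
  def asLocalPath(source: Path, prefix: String, key: String): Path = {
    // Path of the key relative to the prefix, e.g. "prefix/css/main.css" -> "css/main.css"
    val relative = Paths.get(prefix).relativize(Paths.get(key))
    source.resolve(relative)
  }
}
```

A remote key is then considered missing locally when the resulting path does not exist on disk.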


@ -0,0 +1,7 @@
package net.kemitix.s3thorp
final case class RemoteMetaData(remoteKey: RemoteKey,
hash: MD5Hash,
lastModified: LastModified) {
}


@ -0,0 +1,25 @@
package net.kemitix.s3thorp
sealed trait S3Action {
// the remote key that was uploaded, deleted or otherwise updated by the action
def remoteKey: RemoteKey
val order: Int
}
case class CopyS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 1
}
case class UploadS3Action(remoteKey: RemoteKey,
md5Hash: MD5Hash) extends S3Action {
override val order: Int = 2
}
case class DeleteS3Action(remoteKey: RemoteKey) extends S3Action {
override val order: Int = 3
}
object S3Action {
implicit def ord[A <: S3Action]: Ordering[A] = Ordering.by(_.order)
}
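
To illustrate how the `order` field and the implicit `Ordering` give the Copy, Upload, Delete sequence, here is a reduced sketch with string keys. `OrderingSketch` and its `Done` hierarchy are hypothetical stand-ins for `S3Action` and its subclasses:

```scala
// A minimal sketch of sorting completed actions by a fixed order; not project code.
object OrderingSketch {
  sealed trait Done { def key: String; def order: Int }
  final case class Copied(key: String)   extends Done { val order = 1 }
  final case class Uploaded(key: String) extends Done { val order = 2 }
  final case class Deleted(key: String)  extends Done { val order = 3 }

  // Sort purely on the numeric order field.
  implicit val byOrder: Ordering[Done] = Ordering.by[Done, Int](_.order)

  val unsorted: List[Done] =
    List(Deleted("old.txt"), Uploaded("new.txt"), Copied("moved.txt"))

  // Picks up `byOrder` implicitly: copies first, then uploads, then deletes.
  val sorted: List[Done] = unsorted.sorted
}
```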


@ -1,8 +1,6 @@
package net.kemitix.s3thorp
import net.kemitix.s3thorp.Sync.{MD5Hash, LastModified, LocalFile, RemoteKey}
// For the LocalFile, the set of matching S3 objects with the same MD5Hash, and any S3 object with the same remote key
case class S3MetaData(localFile: LocalFile,
remotePath: RemoteKey,
remoteHash: MD5Hash,
remoteLastModified: LastModified)
matchByHash: Set[RemoteMetaData],
matchByKey: Option[RemoteMetaData])


@ -1,25 +1,21 @@
package net.kemitix.s3thorp
import java.io.File
import net.kemitix.s3thorp.Sync.{LastModified, MD5Hash}
import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client}
import net.kemitix.s3thorp.awssdk.{S3ObjectsData, S3Client}
trait S3MetaDataEnricher
extends S3Client
with KeyGenerator
with Logging {
def enrichWithS3MetaData(c: Config)(implicit hashLookup: HashLookup): File => Either[File, S3MetaData] = {
val remoteKey = generateKey(c)_
file => {
log3(s"- Consider: ${c.relativePath(file)}")(c)
val key = remoteKey(file)
objectHead(key).map {
hlm: (MD5Hash, LastModified) => {
Right(S3MetaData(file, key, hlm._1.filter { c => c != '"' }, hlm._2))
}
}.getOrElse(Left(file))
}
def getMetadata(localFile: LocalFile)
(implicit c: Config,
s3ObjectsData: S3ObjectsData): S3MetaData = {
val (keyMatches: Option[HashModified], hashMatches: Set[KeyModified]) = getS3Status(localFile)
S3MetaData(localFile,
matchByKey = keyMatches.map{kmAsRemoteMetaData(localFile.remoteKey)},
matchByHash = hashMatches.map(km => RemoteMetaData(km.key, localFile.hash, km.modified)))
}
private def kmAsRemoteMetaData(key: RemoteKey): HashModified => RemoteMetaData = hm => RemoteMetaData(key, hm.hash, hm.modified)
}


@ -1,23 +0,0 @@
package net.kemitix.s3thorp
import java.io.File
import cats.effect.IO
import net.kemitix.s3thorp.Sync.MD5Hash
import net.kemitix.s3thorp.awssdk.S3Client
trait S3Uploader
extends S3Client
with KeyGenerator
with Logging {
def performUpload(c: Config): File => (File, IO[Either[Throwable, MD5Hash]]) = {
val remoteKey = generateKey(c) _
file => {
val key = remoteKey(file)
val shortFile = c.relativePath(file)
log4(s" Upload: $shortFile")(c)
(file, upload(file, c.bucket, key))
}
}
}


@ -1,48 +1,59 @@
package net.kemitix.s3thorp
import java.io.File
import java.time.Instant
import java.nio.file.Paths
import cats.effect._
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client}
import cats.effect.IO
import cats.implicits._
import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData}
class Sync(s3Client: S3Client)
extends LocalFileStream
with S3MetaDataEnricher
with UploadSelectionFilter
with S3Uploader
with Logging {
with ActionGenerator
with ActionSubmitter
with SyncLogging {
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) =
s3Client.upload(localFile, bucket, remoteKey)
def run(c: Config): IO[Unit] = {
implicit val config: Config = c
log1(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}")
s3Client.listObjects(c.bucket, c.prefix).map { hashLookup => {
val stream: Stream[(File, IO[Either[Throwable, MD5Hash]])] = streamDirectoryPaths(c.source).map(
enrichWithS3MetaData(c)(hashLookup)).flatMap(
uploadRequiredFilter(c)).map(
performUpload(c))
val count: Int = stream.foldLeft(0)((a: Int, io) => {
io._2.unsafeRunSync
log1(s"- Done: ${io._1}")
a + 1
})
log1(s"Uploaded $count files")
def run(implicit c: Config): IO[Unit] = {
logRunStart(c).unsafeRunSync
listObjects(c.bucket, c.prefix)
.map { implicit s3ObjectsData => {
val actions = (for {
file <- findFiles(c.source)
meta = getMetadata(file)
action <- createActions(meta)
ioS3Action = submitAction(action)
} yield ioS3Action).sequence
val sorted = sort(actions)
val list = sorted.unsafeRunSync.toList
val delActions = (for {
key <- s3ObjectsData.byKey.keys
if key.isMissingLocally
ioDelAction = submitAction(ToDelete(key))
} yield ioDelAction).toStream.sequence
val delList = delActions.unsafeRunSync.toList
logRunFinished(list ++ delList).unsafeRunSync
}}
}
override def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = ???
}
object Sync {
type Bucket = String // the S3 bucket name
type LocalFile = File // the file or directory
type RemoteKey = String // path within an S3 bucket
type MD5Hash = String // an MD5 hash
type LastModified = Instant // or scala equivalent
private def sort(ioActions: IO[Stream[S3Action]]) =
ioActions.flatMap { actions => IO { actions.sorted } }
override def upload(localFile: LocalFile,
bucket: Bucket)(implicit c: Config): IO[UploadS3Action] =
s3Client.upload(localFile, bucket)
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey)(implicit c: Config): IO[CopyS3Action] =
s3Client.copy(bucket, sourceKey, hash, targetKey)
override def delete(bucket: Bucket,
remoteKey: RemoteKey)(implicit c: Config): IO[DeleteS3Action] =
s3Client.delete(bucket, remoteKey)
override def listObjects(bucket: Bucket,
prefix: RemoteKey
)(implicit c: Config): IO[S3ObjectsData] =
s3Client.listObjects(bucket, prefix)
}
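
The rename handling in `run` can be pictured without any IO: local files produce copy or upload steps, and remote keys with no local counterpart produce deletes afterwards, so a rename becomes a copy followed by a delete. The sketch below models this with plain maps from key to hash. `RenameSketch`, `Step` and `plan` are hypothetical names, and the real code works on `LocalFile`, `S3ObjectsData` and `IO` values instead:

```scala
// A minimal, IO-free sketch of planning a sync; not the project's implementation.
object RenameSketch {
  sealed trait Step
  final case class Copy(from: String, to: String) extends Step
  final case class Upload(key: String) extends Step
  final case class Delete(key: String) extends Step

  // local and remote both map a key to the content hash stored under it.
  def plan(local: Map[String, String], remote: Map[String, String]): List[Step] = {
    val forLocal: List[Step] = local.toList.flatMap { case (key, hash) =>
      if (remote.get(key).contains(hash)) Nil // already up to date
      else remote.find { case (_, h) => h == hash }.map(_._1) match {
        case Some(otherKey) => List(Copy(otherKey, key)) // same content exists elsewhere
        case None           => List(Upload(key))
      }
    }
    // Remote keys with no local file are deleted last, after serving as copy sources.
    val deletes: List[Step] =
      remote.keys.toList.filterNot(local.contains).map(Delete(_))
    forLocal ++ deletes
  }
}
```

Renaming `old.txt` to `new.txt` locally, with the content unchanged, would therefore plan a copy from `old.txt` to `new.txt` followed by a delete of `old.txt`, with no upload.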


@ -0,0 +1,40 @@
package net.kemitix.s3thorp
import cats.effect.IO
// Logging for the Sync class
trait SyncLogging extends Logging {
def logRunStart(c: Config): IO[Unit] = IO {
log1(s"Bucket: ${c.bucket.name}, Prefix: ${c.prefix.key}, Source: ${c.source}")(c)
}
def logRunFinished(actions: List[S3Action])
(implicit c: Config): IO[Unit] = IO {
val counters = actions.foldLeft(Counters())(logActivity)
log1(s"Uploaded ${counters.uploaded} files")
log1(s"Copied ${counters.copied} files")
log1(s"Deleted ${counters.deleted} files")
}
private def logActivity(implicit c: Config): (Counters, S3Action) => Counters =
(counters: Counters, s3Action: S3Action) => {
s3Action match {
case UploadS3Action(remoteKey, _) =>
log1(s"- Uploaded: ${remoteKey.key}")
counters.copy(uploaded = counters.uploaded + 1)
case CopyS3Action(remoteKey) =>
log1(s"- Copied: ${remoteKey.key}")
counters.copy(copied = counters.copied + 1)
case DeleteS3Action(remoteKey) =>
log1(s"- Deleted: ${remoteKey.key}")
counters.copy(deleted = counters.deleted + 1)
case _ => counters
}
}
case class Counters(uploaded: Int = 0,
deleted: Int = 0,
copied: Int = 0)
}
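
The counter handling is a fold over the completed actions. Stripped of logging and `IO`, the idea looks roughly like this, where `CountersSketch`, `Done` and `tally` are hypothetical names:

```scala
// A minimal sketch of tallying actions with foldLeft; not project code.
object CountersSketch {
  sealed trait Done
  final case class Uploaded(key: String) extends Done
  final case class Copied(key: String) extends Done
  final case class Deleted(key: String) extends Done

  final case class Counters(uploaded: Int = 0, copied: Int = 0, deleted: Int = 0)

  // Start from zeroed counters and bump one field per action.
  def tally(actions: List[Done]): Counters =
    actions.foldLeft(Counters()) {
      case (c, _: Uploaded) => c.copy(uploaded = c.uploaded + 1)
      case (c, _: Copied)   => c.copy(copied = c.copied + 1)
      case (c, _: Deleted)  => c.copy(deleted = c.deleted + 1)
    }
}
```

Because `Done` is sealed and all three cases are handled, no catch-all case is needed in this reduced version.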


@ -1,35 +0,0 @@
package net.kemitix.s3thorp
import java.io.{File, FileInputStream}
import java.security.{DigestInputStream, MessageDigest}
import net.kemitix.s3thorp.Sync.{LocalFile, MD5Hash}
trait UploadSelectionFilter
extends Logging {
private def md5File(localFile: LocalFile): MD5Hash = {
val buffer = new Array[Byte](8192)
val md5 = MessageDigest.getInstance("MD5")
val dis = new DigestInputStream(new FileInputStream(localFile), md5)
try { while (dis.read(buffer) != -1) { } } finally { dis.close() }
md5.digest.map("%02x".format(_)).mkString
}
def uploadRequiredFilter(c: Config): Either[File, S3MetaData] => Stream[File] = {
case Left(file) => {
log5(s" Created: ${c.relativePath(file)}")(c)
Stream(file)
}
case Right(s3Metadata) => {
val localHash: MD5Hash = md5File(s3Metadata.localFile)
if (localHash != s3Metadata.remoteHash) {
log5(s" Updated: ${c.relativePath(s3Metadata.localFile)}")(c)
Stream(s3Metadata.localFile)
}
else Stream.empty
}
}
}


@ -1,11 +0,0 @@
package net.kemitix.s3thorp.awssdk
import net.kemitix.s3thorp.Sync.{LastModified, MD5Hash, RemoteKey}
/**
* A list of objects and their MD5 hash values.
*/
case class HashLookup(byHash: Map[MD5Hash, (RemoteKey, LastModified)],
byKey: Map[RemoteKey, (MD5Hash, LastModified)]) {
}


@ -1,24 +1,46 @@
package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.Sync.{Bucket, LastModified, LocalFile, MD5Hash, RemoteKey}
import net.kemitix.s3thorp._
trait S3Client {
final def objectHead(remoteKey: RemoteKey)(implicit hashLookup: HashLookup): Option[(MD5Hash, LastModified)] =
hashLookup.byKey.get(remoteKey)
final def getS3Status(localFile: LocalFile)
(implicit s3ObjectsData: S3ObjectsData): (Option[HashModified], Set[KeyModified]) = {
val matchingByKey = s3ObjectsData.byKey.get(localFile.remoteKey)
val matchingByHash = s3ObjectsData.byHash.getOrElse(localFile.hash, Set())
(matchingByKey, matchingByHash)
}
def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup]
def listObjects(bucket: Bucket,
prefix: RemoteKey
)(implicit c: Config): IO[S3ObjectsData]
def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]]
def upload(localFile: LocalFile,
bucket: Bucket
)(implicit c: Config): IO[UploadS3Action]
def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit c: Config): IO[CopyS3Action]
def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit c: Config): IO[DeleteS3Action]
}
object S3Client {
def createClient(s3AsyncClient: S3AsyncClient): S3Client = {
new ThorpS3Client(S3CatsIOClient(s3AsyncClient))
}
val defaultClient: S3Client =
new ThorpS3Client(
S3CatsIOClient(new JavaClientWrapper {}.underlying))
createClient(new JavaClientWrapper {}.underlying)
}


@ -0,0 +1,16 @@
package net.kemitix.s3thorp.awssdk
import net.kemitix.s3thorp.{Bucket, Config, LocalFile, Logging}
trait S3ClientLogging
extends Logging {
def logUploadStart(localFile: LocalFile, bucket: Bucket)
(implicit c: Config)=
log5(s"s3Client:upload:start: ${localFile.file}")
def logUploadDone(localFile: LocalFile, bucket: Bucket)
(implicit c: Config) =
log5(s"s3Client:upload:done : ${localFile.file}")
}


@ -0,0 +1,15 @@
package net.kemitix.s3thorp.awssdk
import net.kemitix.s3thorp.{KeyModified, LastModified, MD5Hash, RemoteKey}
import software.amazon.awssdk.services.s3.model.S3Object
trait S3ObjectsByHash {
def byHash(os: Stream[S3Object]): Map[MD5Hash, Set[KeyModified]] = {
val mD5HashToS3Objects: Map[MD5Hash, Stream[S3Object]] = os.groupBy(o => MD5Hash(o.eTag.filter{c => c != '"'}))
val hashToModifieds: Map[MD5Hash, Set[KeyModified]] =
mD5HashToS3Objects.mapValues { os => os.map { o => KeyModified(RemoteKey(o.key), LastModified(o.lastModified())) }.toSet }
hashToModifieds
}
}
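
Grouping by hash is what lets one hash map to several remote keys, which the copy logic relies on. A reduced sketch of the same grouping over a plain case class, where `GroupingSketch` and `Obj` are hypothetical and stand in for the SDK's `S3Object`:

```scala
// A minimal sketch of grouping remote objects by content hash; not project code.
object GroupingSketch {
  final case class Obj(key: String, eTag: String)

  def byHash(objects: List[Obj]): Map[String, Set[String]] =
    objects
      .groupBy(o => o.eTag.filter(_ != '"')) // strip quotes before using the hash as a key
      .map { case (hash, os) => hash -> os.map(_.key).toSet }
}
```

Two objects with identical content end up in the same set, so either can serve as the source of a copy.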


@ -0,0 +1,11 @@
package net.kemitix.s3thorp.awssdk
import net.kemitix.s3thorp.{HashModified, KeyModified, MD5Hash, RemoteKey}
/**
* A list of objects and their MD5 hash values.
*/
case class S3ObjectsData(byHash: Map[MD5Hash, Set[KeyModified]],
byKey: Map[RemoteKey, HashModified]) {
}


@ -2,40 +2,83 @@ package net.kemitix.s3thorp.awssdk
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.Sync._
import net.kemitix.s3thorp._
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, PutObjectRequest, S3Object}
import software.amazon.awssdk.services.s3.model.{Bucket => _, _}
import scala.collection.JavaConverters._
private class ThorpS3Client(s3Client: S3CatsIOClient) extends S3Client {
private class ThorpS3Client(s3Client: S3CatsIOClient)
extends S3Client
with S3ObjectsByHash
with Logging {
def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = {
override def upload(localFile: LocalFile,
bucket: Bucket
)(implicit c: Config): IO[UploadS3Action] = {
log5(s"upload:bucket = ${bucket.name}, localFile = ${localFile.remoteKey}")
val request = PutObjectRequest.builder()
.bucket(bucket)
.key(remoteKey)
.bucket(bucket.name)
.key(localFile.remoteKey.key)
.build()
val body = AsyncRequestBody.fromFile(localFile)
s3Client.putObject(request, body).map(r => Right(r.eTag()))
val body = AsyncRequestBody.fromFile(localFile.file)
s3Client.putObject(request, body)
.map(_.eTag)
.map(_.filter{c => c != '"'})
.map(MD5Hash)
.map(md5Hash => UploadS3Action(localFile.remoteKey, md5Hash))
}
private def asHashLookup: Stream[S3Object] => HashLookup =
os => HashLookup(byHash(os), byKey(os))
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit c: Config): IO[CopyS3Action] = {
log5(s"copy:bucket = ${bucket.name}, sourceKey = ${sourceKey.key}, targetKey = ${targetKey.key}")
val request = CopyObjectRequest.builder()
.bucket(bucket.name)
.copySource(s"${bucket.name}/${sourceKey.key}")
.copySourceIfMatch(hash.hash)
.key(targetKey.key)
.build()
s3Client.copyObject(request)
.map(_ => CopyS3Action(targetKey))
}
private def byHash(os: Stream[S3Object]) =
os.map{o => (o.eTag, (o.key, o.lastModified))}.toMap
override def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit c: Config): IO[DeleteS3Action] = {
log5(s"delete:bucket = ${bucket.name}, remoteKey = ${remoteKey.key}")
val request = DeleteObjectRequest.builder()
.bucket(bucket.name)
.key(remoteKey.key)
.build()
s3Client.deleteObject(request)
.map(_ => DeleteS3Action(remoteKey))
}
private def asS3ObjectsData: Stream[S3Object] => S3ObjectsData =
os => S3ObjectsData(byHash(os), byKey(os))
private def byKey(os: Stream[S3Object]) =
os.map{o => (o.key(), (o.eTag(), o.lastModified()))}.toMap
os.map { o => {
val remoteKey = RemoteKey(o.key)
val hash = MD5Hash(o.eTag().filter { c => c != '"' })
val lastModified = LastModified(o.lastModified())
(remoteKey, HashModified(hash, lastModified))
}}.toMap
def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = {
def listObjects(bucket: Bucket, prefix: RemoteKey)
(implicit c: Config): IO[S3ObjectsData] = {
log5(s"listObjects:bucket = ${bucket.name}, prefix: ${prefix.key}")
val request = ListObjectsV2Request.builder()
.bucket(bucket)
.prefix(prefix)
.bucket(bucket.name)
.prefix(prefix.key)
.build()
s3Client.listObjectsV2(request)
.map(r => r.contents.asScala.toStream)
.map(asHashLookup)
.map(asS3ObjectsData)
}
}


@ -0,0 +1,106 @@
package net.kemitix.s3thorp
import java.time.Instant
import org.scalatest.FunSpec
class ActionGeneratorSuite
extends UnitTest
with KeyGenerator {
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val fileToKey = generateKey(config.source, config.prefix) _
val lastModified = LastModified(Instant.now())
new ActionGenerator {
describe("create actions") {
def invoke(input: S3MetaData) = createActions(input).toList
describe("#1 local exists, remote exists, remote matches - do nothing") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theRemoteMetadata = RemoteMetaData(theFile.remoteKey, theHash, lastModified)
val input = S3MetaData(theFile, // local exists
matchByHash = Set(theRemoteMetadata), // remote matches
matchByKey = Some(theRemoteMetadata) // remote exists
)
it("do nothing") {
val expected = List.empty // do nothing
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#2 local exists, remote is missing, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theRemoteKey = theFile.remoteKey
val otherRemoteKey = aRemoteKey(prefix, "other-key")
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
val input = S3MetaData(theFile, // local exists
matchByHash = Set(otherRemoteMetadata), // other matches
matchByKey = None) // remote is missing
it("copy from other key") {
val expected = List(ToCopy(otherRemoteKey, theHash, theRemoteKey)) // copy
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#3 local exists, remote is missing, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val input = S3MetaData(theFile, // local exists
matchByHash = Set.empty, // other no matches
matchByKey = None) // remote is missing
it("upload") {
val expected = List(ToUpload(theFile)) // upload
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val otherRemoteKey = aRemoteKey(prefix, "other-key")
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
val oldRemoteMetadata = RemoteMetaData(theRemoteKey,
hash = oldHash, // remote no match
lastModified = lastModified)
val input = S3MetaData(theFile, // local exists
matchByHash = Set(otherRemoteMetadata), // other matches
matchByKey = Some(oldRemoteMetadata)) // remote exists
it("copy from other key") {
val expected = List(ToCopy(otherRemoteKey, theHash, theRemoteKey)) // copy
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
val input = S3MetaData(theFile, // local exists
matchByHash = Set.empty, // remote no match, other no match
matchByKey = Some(theRemoteMetadata) // remote exists
)
it("upload") {
val expected = List(ToUpload(theFile)) // upload
val result = invoke(input)
assertResult(expected)(result)
}
}
describe("#6 local missing, remote exists - delete") {
it("TODO") {
pending
}
}
}
}
}
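The numbered cases in this suite (#1 to #5) amount to a small decision table. A minimal sketch of that table follows, assuming simplified stand-in types: `Remote`, `DecisionSketch`, and `decide` are hypothetical names for illustration, and `ToUpload` here carries a key rather than the `LocalFile` used in the suite. It is not the project's actual `ActionGenerator`.

```scala
// Hypothetical, simplified stand-ins for the types used in the suite above.
final case class RemoteKey(key: String)
final case class MD5Hash(hash: String)
final case class Remote(key: RemoteKey, hash: MD5Hash)

sealed trait Action
final case class ToUpload(target: RemoteKey) extends Action
final case class ToCopy(source: RemoteKey, hash: MD5Hash, target: RemoteKey) extends Action

object DecisionSketch {
  // localKey/localHash describe the local file; matchByKey and matchByHash
  // mirror the two S3MetaData fields exercised by cases #1 to #5.
  def decide(localKey: RemoteKey,
             localHash: MD5Hash,
             matchByHash: Set[Remote],
             matchByKey: Option[Remote]): List[Action] =
    matchByKey match {
      // #1: the remote key exists with the same hash, so do nothing
      case Some(remote) if remote.hash == localHash => Nil
      case _ =>
        matchByHash.headOption match {
          // #2, #4: the same content is stored under another key, so copy it
          case Some(other) => List(ToCopy(other.key, localHash, localKey))
          // #3, #5: the content is not on the remote at all, so upload it
          case None => List(ToUpload(localKey))
        }
    }
}
```

Case #6 (local missing, remote exists) is left out because the suite still marks it as pending.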

@ -1,12 +1,26 @@
package net.kemitix.s3thorp
import cats.effect.IO
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client}
import net.kemitix.s3thorp.awssdk.{S3ObjectsData, S3Client}
trait DummyS3Client extends S3Client {
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = ???
override def upload(localFile: LocalFile,
bucket: Bucket
)(implicit c: Config): IO[UploadS3Action] = ???
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit c: Config): IO[CopyS3Action] = ???
override def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit c: Config): IO[DeleteS3Action] = ???
override def listObjects(bucket: Bucket,
prefix: RemoteKey
)(implicit c: Config): IO[S3ObjectsData] = ???
override def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = ???
}

@ -0,0 +1,35 @@
package net.kemitix.s3thorp
import java.io.File
import org.scalatest.FunSpec
class KeyGeneratorSuite extends FunSpec {
new KeyGenerator {
private val source: File = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val fileToKey = generateKey(config.source, config.prefix) _
describe("key generator") {
def resolve(subdir: String): File = {
source.toPath.resolve(subdir).toFile
}
describe("when file is within source") {
it("has a valid key") {
val subdir = "subdir"
assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(resolve(subdir)))
}
}
describe("when file is deeper within source") {
it("has a valid key") {
val subdir = "subdir/deeper/still"
assertResult(RemoteKey(s"${prefix.key}/$subdir"))(fileToKey(resolve(subdir)))
}
}
}
}
}

@ -1,16 +1,15 @@
package net.kemitix.s3thorp
import java.io.File
import org.scalatest.FunSpec
class LocalFileStreamSuite extends FunSpec with LocalFileStream {
describe("streamDirectoryPaths") {
var uploadResource = Resource(this, "upload")
describe("findFiles") {
val uploadResource = Resource(this, "upload")
val config: Config = Config(source = uploadResource)
it("should find all files") {
val result: Set[String] = streamDirectoryPaths(uploadResource).toSet
.map { x: File => uploadResource.toPath.relativize(x.toPath).toString }
val result: Set[String] = findFiles(uploadResource)(config).toSet
.map { x: LocalFile => x.relative.toString }
assertResult(Set("subdir/leaf-file", "root-file"))(result)
}
}

@ -7,7 +7,8 @@ import scala.util.Try
object Resource {
def apply(base: AnyRef,
name: String): File =
name: String): File = {
Try(new File(base.getClass.getResource(name).getPath))
.getOrElse(throw new FileNotFoundException(name))
}
}

@ -0,0 +1,19 @@
package net.kemitix.s3thorp
class S3ActionSuite extends UnitTest {
describe("Ordering of types") {
val remoteKey = RemoteKey("remote-key")
val md5Hash = MD5Hash("md5hash")
val copy = CopyS3Action(remoteKey)
val upload = UploadS3Action(remoteKey, md5Hash)
val delete = DeleteS3Action(remoteKey)
val unsorted = List(delete, copy, upload)
it("should sort as copy < upload < delete") {
val result = unsorted.sorted
val expected = List(copy, upload, delete)
assertResult(expected)(result)
}
}
}
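The ordering this suite checks (copy before upload before delete) could be provided by an implicit `Ordering` in the companion object. The sketch below assumes a hypothetical `rank` member and plain `String` fields; the real `S3Action` types may define their ordering differently.

```scala
// Hypothetical sketch: each action type carries a rank used only for sorting.
sealed trait S3Action { def rank: Int }
final case class CopyS3Action(key: String) extends S3Action { val rank: Int = 1 }
final case class UploadS3Action(key: String, hash: String) extends S3Action { val rank: Int = 2 }
final case class DeleteS3Action(key: String) extends S3Action { val rank: Int = 3 }

object S3Action {
  // Found through implicit scope whenever a List[S3Action] is sorted.
  implicit val ordering: Ordering[S3Action] = Ordering.by[S3Action, Int](_.rank)
}
```

Sorting copies and uploads ahead of deletes means a renamed file is copied to its new key before the old key is removed.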

@ -1,69 +1,133 @@
package net.kemitix.s3thorp
import java.io.File
import java.nio.file.Paths
import java.time.Instant
import net.kemitix.s3thorp.awssdk.HashLookup
import org.scalatest.FunSpec
import net.kemitix.s3thorp.awssdk.S3ObjectsData
class S3MetaDataEnricherSuite extends FunSpec {
class S3MetaDataEnricherSuite
extends UnitTest
with KeyGenerator {
private val sourcePath = "/root/from/here/"
private val source = Paths.get(sourcePath).toFile
private val prefix = "prefix"
private val config = Config("bucket", prefix, source = source)
new S3MetaDataEnricher with DummyS3Client {
describe("key generator") {
val subject = generateKey(config)_
def resolve(subdir: String): File = {
source.toPath.resolve(subdir).toFile
}
describe("when file is within source") {
it("has a valid key") {
val subdir = "subdir"
assertResult(s"$prefix/$subdir")(subject(resolve(subdir)))
}
}
describe("when file is deeper within source") {
it("has a valid key") {
val subdir = "subdir/deeper/still"
assertResult(s"$prefix/$subdir")(subject(resolve(subdir)))
}
}
}
}
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val fileToKey = generateKey(config.source, config.prefix) _
val lastModified = LastModified(Instant.now())
describe("enrich with metadata") {
val local = "localFile"
val fileWithRemote = new File(sourcePath + local)
val fileWithNoRemote = new File(sourcePath + "noRemote")
val remoteKey = prefix + "/" + local
val hash = "hash"
val lastModified = Instant.now()
val hashLookup = HashLookup(
byHash = Map(hash -> (remoteKey, lastModified)),
byKey = Map(remoteKey -> (hash, lastModified))
)
describe("when remote exists") {
new S3MetaDataEnricher with DummyS3Client {
it("returns metadata") {
val expectedMetadata = S3MetaData(fileWithRemote, remoteKey, hash, lastModified)
val result = enrichWithS3MetaData(config)(hashLookup)(fileWithRemote)
assertResult(Right(expectedMetadata))(result)
describe("#1a local exists, remote exists, remote matches, other matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = aLocalFile("the-file", theHash, source, fileToKey)
val theRemoteKey: RemoteKey = theFile.remoteKey
implicit val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
)
val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile)
assertResult(expected)(result)
}
}
describe("#1b local exists, remote exists, remote matches, other no matches - do nothing") {
val theHash: MD5Hash = MD5Hash("the-file-hash")
val theFile: LocalFile = aLocalFile("the-file", theHash, source, fileToKey)
val theRemoteKey: RemoteKey = aRemoteKey(prefix, "the-file")
implicit val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(theRemoteKey, lastModified))),
byKey = Map(theRemoteKey -> HashModified(theHash, lastModified))
)
val theRemoteMetadata = RemoteMetaData(theRemoteKey, theHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set(theRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile)
assertResult(expected)(result)
}
describe("when remote doesn't exist") {
new S3MetaDataEnricher with DummyS3Client {
it("returns file to upload") {
val result = enrichWithS3MetaData(config)(hashLookup)(fileWithNoRemote)
assertResult(Left(fileWithNoRemote))(result)
}
describe("#2 local exists, remote is missing, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val otherRemoteKey = RemoteKey("other-key")
implicit val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
byKey = Map(otherRemoteKey -> HashModified(theHash, lastModified))
)
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set(otherRemoteMetadata),
matchByKey = None)
val result = getMetadata(theFile)
assertResult(expected)(result)
}
}
describe("#3 local exists, remote is missing, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
implicit val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(),
byKey = Map()
)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set.empty,
matchByKey = None)
val result = getMetadata(theFile)
assertResult(expected)(result)
}
}
describe("#4 local exists, remote exists, remote no match, other matches - copy") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
val otherRemoteKey = aRemoteKey(prefix, "other-key")
implicit val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(
oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
theHash -> Set(KeyModified(otherRemoteKey, lastModified))),
byKey = Map(
theRemoteKey -> HashModified(oldHash, lastModified),
otherRemoteKey -> HashModified(theHash, lastModified)
)
)
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
val otherRemoteMetadata = RemoteMetaData(otherRemoteKey, theHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set(otherRemoteMetadata),
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile)
assertResult(expected)(result)
}
}
describe("#5 local exists, remote exists, remote no match, other no matches - upload") {
val theHash = MD5Hash("the-hash")
val theFile = aLocalFile("the-file", theHash, source, fileToKey)
val theRemoteKey = theFile.remoteKey
val oldHash = MD5Hash("old-hash")
implicit val s3: S3ObjectsData = S3ObjectsData(
byHash = Map(
oldHash -> Set(KeyModified(theRemoteKey, lastModified)),
theHash -> Set.empty),
byKey = Map(
theRemoteKey -> HashModified(oldHash, lastModified)
)
)
val theRemoteMetadata = RemoteMetaData(theRemoteKey, oldHash, lastModified)
it("generates valid metadata") {
val expected = S3MetaData(theFile,
matchByHash = Set.empty,
matchByKey = Some(theRemoteMetadata))
val result = getMetadata(theFile)
assertResult(expected)(result)
}
}
}

@ -2,83 +2,215 @@ package net.kemitix.s3thorp
import java.io.File
import java.time.Instant
import java.util.concurrent.CompletableFuture
import cats.effect.IO
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client}
import org.scalatest.FunSpec
import net.kemitix.s3thorp.awssdk.{S3Client, S3ObjectsData}
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.{S3AsyncClient => JavaS3AsyncClient}
import software.amazon.awssdk.services.s3
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response, PutObjectRequest, PutObjectResponse}
class SyncSuite extends FunSpec {
class SyncSuite
extends UnitTest
with KeyGenerator {
private val source = Resource(this, "upload")
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val lastModified = LastModified(Instant.now)
describe("s3client thunk") {
val testBucket = "bucket"
val testRemoteKey = "prefix/file"
val testBucket = Bucket("bucket")
val prefix = RemoteKey("prefix")
val source = new File("/")
describe("upload") {
val md5Hash = "the-hash"
val testLocalFile = new File("file")
val md5Hash = MD5Hash("the-hash")
val testLocalFile = aLocalFile("file", md5Hash, source, generateKey(source, prefix))
val sync = new Sync(new S3Client with DummyS3Client {
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = {
assert(localFile == testLocalFile)
override def upload(localFile: LocalFile, bucket: Bucket)(implicit c: Config) = IO {
assert(bucket == testBucket)
assert(remoteKey == testRemoteKey)
IO(Right(md5Hash))
UploadS3Action(localFile.remoteKey, md5Hash)
}
})
it("delegates unmodified to the S3Client") {
assertResult(Right(md5Hash))(
sync.upload(testLocalFile, testBucket, testRemoteKey).
assertResult(UploadS3Action(RemoteKey(prefix.key + "/file"), md5Hash))(
sync.upload(testLocalFile, testBucket).
unsafeRunSync())
}
}
}
describe("run") {
val testBucket = "bucket"
val testBucket = Bucket("bucket")
val source = Resource(this, "upload")
// source contains the files root-file and subdir/leaf-file
val config = Config("bucket", "prefix", source = source)
val config = Config(Bucket("bucket"), RemoteKey("prefix"), source = source)
val rootRemoteKey = RemoteKey("prefix/root-file")
val leafRemoteKey = RemoteKey("prefix/subdir/leaf-file")
describe("when all files should be uploaded") {
var uploadsRecord: Map[String, RemoteKey] = Map()
val sync = new Sync(new DummyS3Client{
override def listObjects(bucket: Bucket, prefix: RemoteKey) = IO(
HashLookup(
val sync = new RecordingSync(testBucket, new DummyS3Client {}, S3ObjectsData(
byHash = Map(),
byKey = Map()))
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) = {
if (bucket == testBucket)
uploadsRecord += (source.toPath.relativize(localFile.toPath).toString -> remoteKey)
IO(Right("some hash value"))
}
})
it("uploads all files") {
sync.run(config).unsafeRunSync
val expected = Map(
"subdir/leaf-file" -> "prefix/subdir/leaf-file",
"root-file" -> "prefix/root-file"
it("uploads all files") {
val expectedUploads = Map(
"subdir/leaf-file" -> leafRemoteKey,
"root-file" -> rootRemoteKey
)
assertResult(expected)(uploadsRecord)
assertResult(expectedUploads)(sync.uploadsRecord)
}
it("copies nothing") {
val expectedCopies = Map()
assertResult(expectedCopies)(sync.copiesRecord)
}
it("deletes nothing") {
val expectedDeletions = Set()
assertResult(expectedDeletions)(sync.deletionsRecord)
}
}
describe("when no files should be uploaded") {
val rootHash = "a3a6ac11a0eb577b81b3bb5c95cc8a6e"
val leafHash = "208386a650bdec61cfcd7bd8dcb6b542"
val lastModified = Instant.now
var uploadsRecord: Map[String, RemoteKey] = Map()
val sync = new Sync(new S3Client with DummyS3Client {
override def listObjects(bucket: Bucket, prefix: RemoteKey) = IO(
HashLookup(
byHash = Map(rootHash -> ("prefix/root-file", lastModified), leafHash -> ("prefix/subdir/leaf-file", lastModified)),
byKey = Map("prefix/root-file" -> (rootHash, lastModified), "prefix/subdir/leaf-file" -> (leafHash, lastModified))))
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) = {
if (bucket == testBucket)
uploadsRecord += (source.toPath.relativize(localFile.toPath).toString -> remoteKey)
IO(Right("some hash value"))
}
})
it("uploads nothing") {
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
val s3ObjectsData = S3ObjectsData(
byHash = Map(
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file"), lastModified)),
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
byKey = Map(
RemoteKey("prefix/root-file") -> HashModified(rootHash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
val sync = new RecordingSync(testBucket, new DummyS3Client {}, s3ObjectsData)
sync.run(config).unsafeRunSync
val expected = Map()
assertResult(expected)(uploadsRecord)
it("uploads nothing") {
val expectedUploads = Map()
assertResult(expectedUploads)(sync.uploadsRecord)
}
it("copies nothing") {
val expectedCopies = Map()
assertResult(expectedCopies)(sync.copiesRecord)
}
it("deletes nothing") {
val expectedDeletions = Set()
assertResult(expectedDeletions)(sync.deletionsRecord)
}
}
describe("when a file is renamed it is moved on S3 with no upload") {
// 'root-file-old' should be renamed as 'root-file'
val rootHash = MD5Hash("a3a6ac11a0eb577b81b3bb5c95cc8a6e")
val leafHash = MD5Hash("208386a650bdec61cfcd7bd8dcb6b542")
val s3ObjectsData = S3ObjectsData(
byHash = Map(
rootHash -> Set(KeyModified(RemoteKey("prefix/root-file-old"), lastModified)),
leafHash -> Set(KeyModified(RemoteKey("prefix/subdir/leaf-file"), lastModified))),
byKey = Map(
RemoteKey("prefix/root-file-old") -> HashModified(rootHash, lastModified),
RemoteKey("prefix/subdir/leaf-file") -> HashModified(leafHash, lastModified)))
val sync = new RecordingSync(testBucket, new DummyS3Client {}, s3ObjectsData)
sync.run(config).unsafeRunSync
it("uploads nothing") {
val expectedUploads = Map()
assertResult(expectedUploads)(sync.uploadsRecord)
}
it("copies the file") {
val expectedCopies = Map(RemoteKey("prefix/root-file-old") -> RemoteKey("prefix/root-file"))
assertResult(expectedCopies)(sync.copiesRecord)
}
it("deletes the original") {
val expectedDeletions = Set(RemoteKey("prefix/root-file-old"))
assertResult(expectedDeletions)(sync.deletionsRecord)
}
}
describe("when a file is copied it is copied on S3 with no upload") {
it("TODO") {
pending
}
}
describe("when a file is deleted locally it is deleted from S3") {
val deletedHash = MD5Hash("deleted-hash")
val deletedKey = RemoteKey("prefix/deleted-file")
val s3ObjectsData = S3ObjectsData(
byHash = Map(
deletedHash -> Set(KeyModified(RemoteKey("prefix/deleted-file"), lastModified))),
byKey = Map(
deletedKey -> HashModified(deletedHash, lastModified)))
val sync = new RecordingSync(testBucket, new DummyS3Client {}, s3ObjectsData)
sync.run(config).unsafeRunSync
it("deleted key") {
val expectedDeletions = Set(deletedKey)
assertResult(expectedDeletions)(sync.deletionsRecord)
}
}
describe("io actions execute") {
val recordingS3Client = new RecordingS3Client
val client = S3Client.createClient(recordingS3Client)
val sync = new Sync(client)
sync.run(config).unsafeRunSync
it("invokes the underlying Java s3client") {
val expected = Set(
PutObjectRequest.builder().bucket(testBucket.name).key(rootRemoteKey.key).build(),
PutObjectRequest.builder().bucket(testBucket.name).key(leafRemoteKey.key).build()
)
val result = recordingS3Client.puts
assertResult(expected)(result)
}
}
}
class RecordingSync(testBucket: Bucket, s3Client: S3Client, s3ObjectsData: S3ObjectsData)
extends Sync(s3Client) {
var uploadsRecord: Map[String, RemoteKey] = Map()
var copiesRecord: Map[RemoteKey, RemoteKey] = Map()
var deletionsRecord: Set[RemoteKey] = Set()
override def listObjects(bucket: Bucket, prefix: RemoteKey)(implicit c: Config) = IO {s3ObjectsData}
override def upload(localFile: LocalFile,
bucket: Bucket
)(implicit c: Config) = IO {
if (bucket == testBucket)
uploadsRecord += (localFile.relative.toString -> localFile.remoteKey)
UploadS3Action(localFile.remoteKey, MD5Hash("some hash value"))
}
override def copy(bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
)(implicit c: Config) = IO {
if (bucket == testBucket)
copiesRecord += (sourceKey -> targetKey)
CopyS3Action(targetKey)
}
override def delete(bucket: Bucket,
remoteKey: RemoteKey
)(implicit c: Config) = IO {
if (bucket == testBucket)
deletionsRecord += remoteKey
DeleteS3Action(remoteKey)
}
}
class RecordingS3Client extends S3AsyncClient {
var lists: Set[ListObjectsV2Request] = Set()
var puts: Set[PutObjectRequest] = Set()
override val underlying: s3.S3AsyncClient = new JavaS3AsyncClient {
override def serviceName(): String = "s3Recorder"
override def close(): Unit = ()
override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request): CompletableFuture[ListObjectsV2Response] = {
lists += listObjectsV2Request
CompletableFuture.completedFuture(ListObjectsV2Response.builder().build())
}
override def putObject(putObjectRequest: PutObjectRequest,
requestBody: AsyncRequestBody): CompletableFuture[PutObjectResponse] = {
puts += putObjectRequest
CompletableFuture.completedFuture(PutObjectResponse.builder().eTag("not-null").build())
}
}
}
}

@ -0,0 +1,17 @@
package net.kemitix.s3thorp
import java.io.File
import org.scalatest.FunSpec
abstract class UnitTest extends FunSpec {
def aLocalFile(path: String, myHash: MD5Hash, source: File, fileToKey: File => RemoteKey): LocalFile =
new LocalFile(source.toPath.resolve(path).toFile, source, fileToKey) {
override def hash: MD5Hash = myHash
}
def aRemoteKey(prefix: RemoteKey, path: String): RemoteKey =
RemoteKey(prefix.key + "/" + path)
}

@ -1,39 +0,0 @@
package net.kemitix.s3thorp
import java.io.File
import java.time.Instant
import org.scalatest.FunSpec
class UploadSelectionFilterSuite extends FunSpec {
new UploadSelectionFilter {
describe("uploadRequiredFilter") {
val localFile = Resource(this, "test-file-for-hash.txt")
val localHash = "0cbfe978783bd7950d5da4ff85e4af37"
val config = Config("bucket", "prefix", source = localFile.getParentFile)
def invokeSubject(input: Either[File, S3MetaData]) =
uploadRequiredFilter(config)(input).toList
describe("when supplied a file") {
val input = Left(localFile)
it("should be marked for upload") {
assertResult(List(localFile))(invokeSubject(input))
}
}
describe("when supplied S3MetaData") {
describe("when hash is different") {
val input = Right(S3MetaData(localFile, "", "doesn't match any hash", Instant.now))
it("should be marked for upload") {
assertResult(List(localFile))(invokeSubject(input))
}
}
describe("when hash is the same") {
val input = Right(S3MetaData(localFile, "", localHash, Instant.now))
it("should not be marked for upload") {
assertResult(List())(invokeSubject(input))
}
}
}
}
}
}

@ -5,55 +5,92 @@ import java.time.Instant
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, RemoteKey}
import org.scalatest.FunSpec
import software.amazon.awssdk.services.s3.model._
import net.kemitix.s3thorp._
import software.amazon.awssdk.services.s3.model.{PutObjectRequest, PutObjectResponse}
class S3ClientSuite extends FunSpec {
class S3ClientSuite
extends UnitTest
with KeyGenerator {
describe("objectHead") {
val key = "key"
val hash = "hash"
val lastModified = Instant.now
val hashLookup: HashLookup = HashLookup(
byHash = Map(hash -> (key, lastModified)),
byKey = Map(key -> (hash, lastModified)))
val source = Resource(this, "../upload")
def invoke(self: S3Client, remoteKey: RemoteKey) = {
self.objectHead(remoteKey)(hashLookup)
private val prefix = RemoteKey("prefix")
implicit private val config: Config = Config(Bucket("bucket"), prefix, source = source)
private val fileToKey = generateKey(config.source, config.prefix) _
describe("getS3Status") {
val hash = MD5Hash("hash")
val localFile = aLocalFile("the-file", hash, source, fileToKey)
val key = localFile.remoteKey
val keyotherkey = aLocalFile("other-key-same-hash", hash, source, fileToKey)
val diffhash = MD5Hash("diff")
val keydiffhash = aLocalFile("other-key-diff-hash", diffhash, source, fileToKey)
val lastModified = LastModified(Instant.now)
val s3ObjectsData: S3ObjectsData = S3ObjectsData(
byHash = Map(
hash -> Set(KeyModified(key, lastModified), KeyModified(keyotherkey.remoteKey, lastModified)),
diffhash -> Set(KeyModified(keydiffhash.remoteKey, lastModified))),
byKey = Map(
key -> HashModified(hash, lastModified),
keyotherkey.remoteKey -> HashModified(hash, lastModified),
keydiffhash.remoteKey -> HashModified(diffhash, lastModified)))
def invoke(self: S3Client, localFile: LocalFile) = {
self.getS3Status(localFile)(s3ObjectsData)
}
describe("when remote key exists") {
val s3Client = S3Client.defaultClient
it("should return Some(expected values)") {
assertResult(Some((hash, lastModified)))(invoke(s3Client, key))
it("should return (Some, Set.nonEmpty)") {
assertResult(
(Some(HashModified(hash, lastModified)),
Set(
KeyModified(key, lastModified),
KeyModified(keyotherkey.remoteKey, lastModified)))
)(invoke(s3Client, localFile))
}
}
describe("when remote key does not exist") {
describe("when remote key does not exist and no others match hash") {
val s3Client = S3Client.defaultClient
it("should return None") {
assertResult(None)(invoke(s3Client, "missing-key"))
it("should return (None, Set.empty)") {
val localFile = aLocalFile("missing-file", MD5Hash("unique"), source, fileToKey)
assertResult(
(None,
Set.empty)
)(invoke(s3Client, localFile))
}
}
describe("when remote key exists and no others match hash") {
val s3Client = S3Client.defaultClient
it("should return (Some, Set.nonEmpty)") {
assertResult(
(Some(HashModified(diffhash, lastModified)),
Set(KeyModified(keydiffhash.remoteKey, lastModified)))
)(invoke(s3Client, keydiffhash))
}
}
}
describe("upload") {
def invoke(s3Client: ThorpS3Client, localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) =
s3Client.upload(localFile, bucket, remoteKey).unsafeRunSync
def invoke(s3Client: ThorpS3Client, localFile: LocalFile, bucket: Bucket) =
s3Client.upload(localFile, bucket).unsafeRunSync
describe("when uploading a file") {
val md5Hash = "the-md5hash"
val md5Hash = MD5Hash("the-md5hash")
val s3Client = new ThorpS3Client(
new S3CatsIOClient with JavaClientWrapper {
override def putObject(putObjectRequest: PutObjectRequest, requestBody: RB) =
IO(PutObjectResponse.builder().eTag(md5Hash).build())
IO(PutObjectResponse.builder().eTag(md5Hash.hash).build())
})
val localFile: LocalFile = new File("/some/file")
val bucket: Bucket = "a-bucket"
val remoteKey: RemoteKey = "prefix/file"
val source = new File("/")
val prefix = RemoteKey("prefix")
val localFile: LocalFile = aLocalFile("/some/file", md5Hash, source, generateKey(source, prefix))
val bucket: Bucket = Bucket("a-bucket")
val remoteKey: RemoteKey = RemoteKey("prefix/some/file")
it("should return hash of uploaded file") {
assertResult(Right(md5Hash))(invoke(s3Client, localFile, bucket, remoteKey))
assertResult(UploadS3Action(remoteKey, md5Hash))(invoke(s3Client, localFile, bucket))
}
}
}

@ -0,0 +1,36 @@
package net.kemitix.s3thorp.awssdk
import java.time.Instant
import net.kemitix.s3thorp.{KeyModified, LastModified, MD5Hash, RemoteKey, UnitTest}
import software.amazon.awssdk.services.s3.model.S3Object
class S3ObjectsByHashSuite extends UnitTest {
new S3ObjectsByHash {
describe("grouping s3 objects together by their hash values") {
val hash = MD5Hash("hash")
val key1 = RemoteKey("key-1")
val key2 = RemoteKey("key-2")
val lastModified = LastModified(Instant.now)
val o1 = s3object(hash, key1, lastModified)
val o2 = s3object(hash, key2, lastModified)
val os = Stream(o1, o2)
it("should group by the hash value") {
val expected: Map[MD5Hash, Set[KeyModified]] = Map(
hash -> Set(KeyModified(key1, lastModified), KeyModified(key2, lastModified))
)
val result = byHash(os)
assertResult(expected)(result)
}
}
}
private def s3object(md5Hash: MD5Hash, remoteKey: RemoteKey, lastModified: LastModified): S3Object =
S3Object.builder
.eTag(md5Hash.hash)
.key(remoteKey.key)
.lastModified(lastModified.when)
.build
}
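The grouping this suite expects from `byHash` can be illustrated with plain collections. This is a rough sketch: `Obj` and `GroupSketch` are hypothetical stand-ins (the real code works on the AWS SDK `S3Object` and also carries `LastModified`), and the project's `S3ObjectsByHash` trait may be implemented differently.

```scala
// Hypothetical stand-ins; the real suite builds software.amazon.awssdk S3Object values.
final case class MD5Hash(hash: String)
final case class RemoteKey(key: String)
final case class Obj(hash: MD5Hash, key: RemoteKey)

object GroupSketch {
  // Collect every key that shares a hash, so duplicate content is visible as one entry.
  def byHash(os: Seq[Obj]): Map[MD5Hash, Set[RemoteKey]] =
    os.groupBy(_.hash).map { case (h, group) => h -> group.map(_.key).toSet }
}
```

Keeping a set of keys per hash is what lets a later step find an existing remote copy of a file's content and issue a copy instead of an upload.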

@ -1,11 +1,13 @@
package net.kemitix.s3thorp.awssdk
import java.time.Instant
import scala.collection.JavaConverters._
import java.time.temporal.ChronoUnit
import scala.collection.JavaConverters._
import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.S3AsyncClient
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.{Bucket, Config, HashModified, KeyModified, LastModified, MD5Hash, Main, RemoteKey, Resource}
import org.scalatest.FunSpec
import software.amazon.awssdk.services.s3
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjectsV2Response, S3Object}
@ -13,20 +15,49 @@ import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, ListObjec
class ThorpS3ClientSuite extends FunSpec {
describe("listObjectsInPrefix") {
val h1 = "hash1"
val k1 = "key1"
val lm1 = Instant.now
val o1 = S3Object.builder.eTag(h1).key(k1).lastModified(lm1).build
val h2 = "hash2"
val k2 = "key2"
val lm2 = Instant.now.minusSeconds(200)
val o2 = S3Object.builder.eTag(h2).key(k2).lastModified(lm2).build
val myFakeResponse: IO[ListObjectsV2Response] = IO{
val source = Resource(Main, "upload")
val prefix = RemoteKey("prefix")
implicit val config: Config = Config(Bucket("bucket"), prefix, source = source)
val lm = LastModified(Instant.now)
val h1 = MD5Hash("hash1")
val k1a = RemoteKey("key1a")
val o1a = S3Object.builder.eTag(h1.hash).key(k1a.key).lastModified(lm.when).build
val k1b = RemoteKey("key1b")
val o1b = S3Object.builder.eTag(h1.hash).key(k1b.key).lastModified(lm.when).build
val h2 = MD5Hash("hash2")
val k2 = RemoteKey("key2")
val o2 = S3Object.builder.eTag(h2.hash).key(k2.key).lastModified(lm.when).build
val myFakeResponse: IO[ListObjectsV2Response] = IO {
ListObjectsV2Response.builder()
.contents(List(o1, o2).asJava)
.contents(List(o1a, o1b, o2).asJava)
.build()
}
val subject = new ThorpS3Client(new S3CatsIOClient {
val s3client = new ThorpS3Client(new MyS3CatsIOClient {
override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request): IO[ListObjectsV2Response] =
myFakeResponse
})
it("should build list of hash lookups, with duplicate objects grouped by hash") {
val expected = S3ObjectsData(
byHash = Map(
h1 -> Set(KeyModified(k1a, lm), KeyModified(k1b, lm)),
h2 -> Set(KeyModified(k2, lm))),
byKey = Map(
k1a -> HashModified(h1, lm),
k1b -> HashModified(h1, lm),
k2 -> HashModified(h2, lm)))
val result: S3ObjectsData = s3client.listObjects(Bucket("bucket"), RemoteKey("prefix")).unsafeRunSync()
assertResult(expected.byHash.keys)(result.byHash.keys)
assertResult(expected.byKey.keys)(result.byKey.keys)
assertResult(expected)(result)
}
}
trait MyS3CatsIOClient extends S3CatsIOClient {
override val underlying: S3AsyncClient = new S3AsyncClient {
override val underlying: s3.S3AsyncClient = new s3.S3AsyncClient {
override def serviceName(): String = "fake-s3-client"
@ -34,16 +65,5 @@ class ThorpS3ClientSuite extends FunSpec {
override def close(): Unit = ()
}
}
override def listObjectsV2(listObjectsV2Request: ListObjectsV2Request) =
myFakeResponse
})
it("should build list of hash lookups") {
val result: HashLookup = subject.listObjects("bucket", "prefix").unsafeRunSync()
val expected = HashLookup(
Map(h1 -> (k1, lm1), h2 -> (k2, lm2)),
Map(k1 -> (h1, lm1), k2 -> (h2, lm2)))
assertResult(expected)(result)
}
}
}