[sync] use listObjects and show count of files uploaded at end

This commit is contained in:
Paul Campbell 2019-05-15 08:15:21 +01:00
parent 74be5ec1ac
commit ed6550e134
14 changed files with 117 additions and 170 deletions

View file

@ -2,28 +2,18 @@ package net.kemitix.s3thorp
import java.io.File
import fs2.Stream
import cats.effect.IO
trait LocalFileStream {
def streamDirectoryPaths(file: File): Stream[IO, File] =
{
Stream.eval(IO(file)).
flatMap(file => Stream.fromIterator[IO, File](dirPaths(file))).
flatMap(recurseIntoSubDirectories)
}
def streamDirectoryPaths(file: File): Stream[File] =
dirPaths(file)
.flatMap(f => recurseIntoSubDirectories(f))
private def dirPaths(file: File): Iterator[File] = {
Option(file.listFiles).map(_.iterator).
getOrElse(throw new IllegalArgumentException(s"Directory not found $file"))
}
private def dirPaths(file: File): Stream[File] = Option(file.listFiles)
.getOrElse(throw new IllegalArgumentException(s"Directory not found $file")).toStream
private def recurseIntoSubDirectories: File => Stream[IO, File] =
private def recurseIntoSubDirectories: File => Stream[File] =
file =>
if (file.isDirectory) streamDirectoryPaths(file)
else Stream(file)
}

View file

@ -2,29 +2,24 @@ package net.kemitix.s3thorp
import java.io.File
import fs2.Stream
import cats.effect.IO
import net.kemitix.s3thorp.awssdk.S3Client
import net.kemitix.s3thorp.Sync.{LastModified, MD5Hash}
import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client}
trait S3MetaDataEnricher
extends S3Client
with KeyGenerator
with Logging {
def enrichWithS3MetaData(c: Config): File => Stream[IO, Either[File, S3MetaData]] = {
def enrichWithS3MetaData(c: Config)(implicit hashLookup: HashLookup): File => Either[File, S3MetaData] = {
val remoteKey = generateKey(c)_
file =>
Stream.eval({
logger.info(s"- Consider: ${c.relativePath(file)}")
val key = remoteKey(file)
for {
head <- objectHead(c.bucket, key)
} yield head.map {
case (hash, lastModified) => {
val cleanHash = hash.filter{c=>c!='"'}
Right(S3MetaData(file, key, cleanHash, lastModified))
}
}.getOrElse(Left(file))
})
file => {
logger.info(s"- Consider: ${c.relativePath(file)}")
val key = remoteKey(file)
objectHead(key).map {
hlm: (MD5Hash, LastModified) => {
Right(S3MetaData(file, key, hlm._1.filter { c => c != '"' }, hlm._2))
}
}.getOrElse(Left(file))
}
}
}

View file

@ -2,8 +2,8 @@ package net.kemitix.s3thorp
import java.io.File
import fs2.Stream
import cats.effect.IO
import net.kemitix.s3thorp.Sync.MD5Hash
import net.kemitix.s3thorp.awssdk.S3Client
trait S3Uploader
@ -11,16 +11,13 @@ trait S3Uploader
with KeyGenerator
with Logging {
def performUpload(c: Config): File => Stream[IO, Unit] = {
def performUpload(c: Config): File => (File, IO[Either[Throwable, MD5Hash]]) = {
val remoteKey = generateKey(c) _
file => {
val key = remoteKey(file)
val shortFile = c.relativePath(file)
Stream.eval(for {
_ <- IO(logger.info(s" Upload: $shortFile"))
_ <- upload(file, c.bucket, key)
_ <- IO(logger.info(s" Done: $shortFile"))
} yield ())
logger.info(s" Upload: $shortFile")
(file, upload(file, c.bucket, key))
}
}
}

View file

@ -4,8 +4,8 @@ import java.io.File
import java.time.Instant
import cats.effect._
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, RemoteKey}
import net.kemitix.s3thorp.awssdk.S3Client
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client}
class Sync(s3Client: S3Client)
extends LocalFileStream
@ -14,20 +14,26 @@ class Sync(s3Client: S3Client)
with S3Uploader
with Logging {
override def objectHead(bucket: Bucket, remoteKey: RemoteKey)=
s3Client.objectHead(bucket, remoteKey)
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) =
s3Client.upload(localFile, bucket, remoteKey)
def run(c: Config): IO[Unit] = {
logger.info(s"Bucket: ${c.bucket}, Prefix: ${c.prefix}, Source: ${c.source}")
streamDirectoryPaths(c.source).flatMap(
enrichWithS3MetaData(c)).flatMap(
uploadRequiredFilter(c)).flatMap(
performUpload(c)).compile.drain
s3Client.listObjects(c.bucket, c.prefix).map { hashLookup => {
val stream: Stream[(File, IO[Either[Throwable, MD5Hash]])] = streamDirectoryPaths(c.source).map(
enrichWithS3MetaData(c)(hashLookup)).flatMap(
uploadRequiredFilter(c)).map(
performUpload(c))
val count: Int = stream.foldLeft(0)((a: Int, io) => {
io._2.unsafeRunSync
logger.info(s"- Done: ${io._1}")
a + 1
})
logger.info(s"Uploaded $count files")
}}
}
override def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = ???
}
object Sync {

View file

@ -1,10 +1,9 @@
package net.kemitix.s3thorp
import fs2.Stream
import cats.effect.IO
import net.kemitix.s3thorp.Sync.{MD5Hash, LocalFile}
import java.security.{MessageDigest, DigestInputStream}
import java.io.{File, FileInputStream}
import java.security.{DigestInputStream, MessageDigest}
import net.kemitix.s3thorp.Sync.{LocalFile, MD5Hash}
trait UploadSelectionFilter
extends Logging {
@ -19,22 +18,18 @@ trait UploadSelectionFilter
md5.digest.map("%02x".format(_)).mkString
}
def uploadRequiredFilter(c: Config): Either[File, S3MetaData] => Stream[IO, File] = {
def uploadRequiredFilter(c: Config): Either[File, S3MetaData] => Stream[File] = {
case Left(file) => {
logger.info(s" Created: ${c.relativePath(file)}")
Stream(file)
}
case Right(s3Metadata) =>
Stream.eval(for {
localHash <- IO(md5File(s3Metadata.localFile))
} yield (s3Metadata.localFile, localHash)).
filter { case (_, localHash) => localHash != s3Metadata.remoteHash }.
map {
case (localFile,_) => {
logger.info(s" Updated: ${c.relativePath(localFile)}")
localFile
}
}
case Right(s3Metadata) => {
val localHash: MD5Hash = md5File(s3Metadata.localFile)
if (localHash != s3Metadata.remoteHash) {
logger.info(s" Updated: ${c.relativePath(s3Metadata.localFile)}")
Stream(s3Metadata.localFile)
}
else Stream.empty
}
}
}

View file

@ -6,7 +6,10 @@ import net.kemitix.s3thorp.Sync.{Bucket, LastModified, LocalFile, MD5Hash, Remot
trait S3Client {
def objectHead(bucket: Bucket, remoteKey: RemoteKey): IO[Option[(MD5Hash, LastModified)]]
final def objectHead(remoteKey: RemoteKey)(implicit hashLookup: HashLookup): Option[(MD5Hash, LastModified)] =
hashLookup.byKey.get(remoteKey)
def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup]
def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]]

View file

@ -4,23 +4,12 @@ import cats.effect.IO
import com.github.j5ik2o.reactive.aws.s3.cats.S3CatsIOClient
import net.kemitix.s3thorp.Sync._
import software.amazon.awssdk.core.async.AsyncRequestBody
import software.amazon.awssdk.services.s3.model.{HeadObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Object}
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, PutObjectRequest, S3Object}
import scala.collection.JavaConverters._
private class ThorpS3Client(s3Client: S3CatsIOClient) extends S3Client {
def objectHead(bucket: Bucket, remoteKey: RemoteKey): IO[Option[(MD5Hash, LastModified)]] = {
val request = HeadObjectRequest.builder()
.bucket(bucket)
.key(remoteKey)
.build()
s3Client.headObject(request).attempt.map {
case Right(r) => Some((r.eTag(), r.lastModified()))
case Left(_) => None
}
}
def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = {
val request = PutObjectRequest.builder()
.bucket(bucket)

View file

@ -1,12 +1,12 @@
package net.kemitix.s3thorp
import cats.effect.IO
import net.kemitix.s3thorp.Sync.{Bucket, LastModified, LocalFile, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.awssdk.S3Client
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client}
trait DummyS3Client extends S3Client {
override def objectHead(bucket: Bucket, remoteKey: RemoteKey): IO[Option[(MD5Hash, LastModified)]] = ???
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey): IO[Either[Throwable, MD5Hash]] = ???
override def listObjects(bucket: Bucket, prefix: RemoteKey): IO[HashLookup] = ???
}

View file

@ -0,0 +1,15 @@
package net.kemitix.s3thorp
import org.scalatest.FunSpec
/** Checks that `streamDirectoryPaths` yields the files found beneath a directory,
  * including those in sub-directories.
  */
class LocalFileStreamSuite extends FunSpec with LocalFileStream {
  describe("streamDirectoryPaths") {
    // Never reassigned, so an immutable val is sufficient.
    // NOTE(review): Resource is defined elsewhere; presumably it resolves the
    // "upload" test-resource directory to a java.io.File - confirm.
    val uploadResource = Resource(this, "upload")
    it("should find all files") {
      // The stream is built from File.listFiles, which gives no ordering guarantee,
      // so sort the relative paths before comparing instead of depending on the
      // directory-listing order of the host filesystem.
      val result: List[String] = streamDirectoryPaths(uploadResource).toList
        .map(x => uploadResource.toPath.relativize(x.toPath).toString)
        .sorted
      assertResult(List("root-file", "subdir/leaf-file"))(result)
    }
  }
}

View file

@ -4,7 +4,7 @@ import java.io.File
import java.nio.file.Paths
import java.time.Instant
import cats.effect.IO
import net.kemitix.s3thorp.awssdk.HashLookup
import org.scalatest.FunSpec
class S3MetaDataEnricherSuite extends FunSpec {
@ -40,26 +40,30 @@ class S3MetaDataEnricherSuite extends FunSpec {
describe("enrich with metadata") {
val local = "localFile"
val localFile = new File(sourcePath + local)
val fileWithRemote = new File(sourcePath + local)
val fileWithNoRemote = new File(sourcePath + "noRemote")
val remoteKey = prefix + "/" + local
val hash = "hash"
val lastModified = Instant.now()
val hashLookup = HashLookup(
byHash = Map(hash -> (remoteKey, lastModified)),
byKey = Map(remoteKey -> (hash, lastModified))
)
describe("when remote exists") {
val hash = "hash"
val lastModified = Instant.now()
new S3MetaDataEnricher with DummyS3Client {
override def objectHead(bucket: String, key: String) = IO(Some((hash, lastModified)))
it("returns metadata") {
val expectedMetadata = S3MetaData(localFile, s"$prefix/$local", hash, lastModified)
val expectedMetadata = S3MetaData(fileWithRemote, remoteKey, hash, lastModified)
val result = enrichWithS3MetaData(config)(localFile).compile.toList.unsafeRunSync().head
val result = enrichWithS3MetaData(config)(hashLookup)(fileWithRemote)
assertResult(Right(expectedMetadata))(result)
}
}
}
describe("when remote doesn't exist") {
new S3MetaDataEnricher with DummyS3Client {
override def objectHead(bucket: String, key: String) = IO(None)
it("returns file to upload") {
val result = enrichWithS3MetaData(config)(localFile).compile.toList.unsafeRunSync().head
assertResult(Left(localFile))(result)
val result = enrichWithS3MetaData(config)(hashLookup)(fileWithNoRemote)
assertResult(Left(fileWithNoRemote))(result)
}
}
}

View file

@ -1,28 +0,0 @@
package net.kemitix.s3thorp
import java.io.File
import cats.effect.IO
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, RemoteKey}
import org.scalatest.FunSpec
/** Exercises `performUpload` from [[S3Uploader]] using a stubbed `upload`.
  *
  * The tests are registered from inside an anonymous `S3Uploader` instance so that
  * `performUpload` can be called directly.
  */
class S3UploaderSuite extends FunSpec {
new S3Uploader {
// Hash value handed back by the stubbed upload below.
val md5Hash = "the-md5hash"
// Left unimplemented (???); calling it would throw NotImplementedError.
override def objectHead(bucket: String, key: String) = ???
// Stub: ignores its arguments and reports success with the fixed hash.
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) =
IO(Right(md5Hash))
describe("upload") {
val config: Config = Config("bucket", "prefix", new File("/path/to/files"))
// Runs the stream returned by performUpload to completion and collects what it emits.
def invoke(file: File) =
performUpload(config)(file).compile.toList.unsafeRunSync()
it("should return") {
val result = invoke(new File("/path/to/files/a-file-to-upload.txt"))
// Exactly one Unit element is expected for the single uploaded file.
assertResult(List(()))(result)
}
}
}
}

View file

@ -5,7 +5,7 @@ import java.time.Instant
import cats.effect.IO
import net.kemitix.s3thorp.Sync.{Bucket, LocalFile, MD5Hash, RemoteKey}
import net.kemitix.s3thorp.awssdk.S3Client
import net.kemitix.s3thorp.awssdk.{HashLookup, S3Client}
import org.scalatest.FunSpec
class SyncSuite extends FunSpec {
@ -13,22 +13,6 @@ class SyncSuite extends FunSpec {
describe("s3client thunk") {
val testBucket = "bucket"
val testRemoteKey = "prefix/file"
describe("objectHead") {
val md5Hash = "md5Hash"
val lastModified = Instant.now()
val sync = new Sync(new S3Client with DummyS3Client {
override def objectHead(bucket: String, key: String) = {
assert(bucket == testBucket)
assert(key == testRemoteKey)
IO(Some((md5Hash, lastModified)))
}
})
it("delegates unmodified to the S3Client") {
assertResult(Some((md5Hash, lastModified)))(
sync.objectHead(testBucket, testRemoteKey).
unsafeRunSync())
}
}
describe("upload") {
val md5Hash = "the-hash"
val testLocalFile = new File("file")
@ -54,9 +38,11 @@ class SyncSuite extends FunSpec {
val config = Config("bucket", "prefix", source)
describe("when all files should be uploaded") {
var uploadsRecord: Map[String, RemoteKey] = Map()
val sync = new Sync(new S3Client {
override def objectHead(bucket: Bucket, remoteKey: RemoteKey) =
IO(None)
val sync = new Sync(new DummyS3Client{
override def listObjects(bucket: Bucket, prefix: RemoteKey) = IO(
HashLookup(
byHash = Map(),
byKey = Map()))
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) = {
if (bucket == testBucket)
uploadsRecord += (source.toPath.relativize(localFile.toPath).toString -> remoteKey)
@ -75,14 +61,13 @@ class SyncSuite extends FunSpec {
describe("when no files should be uploaded") {
val rootHash = "a3a6ac11a0eb577b81b3bb5c95cc8a6e"
val leafHash = "208386a650bdec61cfcd7bd8dcb6b542"
val lastModified = Instant.now
var uploadsRecord: Map[String, RemoteKey] = Map()
val sync = new Sync(new S3Client {
override def objectHead(bucket: Bucket, remoteKey: RemoteKey) = IO(
remoteKey match {
case "prefix/root-file" => Some((rootHash, Instant.now))
case "prefix/subdir/leaf-file" => Some((leafHash, Instant.now))
case _ => None
})
val sync = new Sync(new S3Client with DummyS3Client {
override def listObjects(bucket: Bucket, prefix: RemoteKey) = IO(
HashLookup(
byHash = Map(rootHash -> ("prefix/root-file", lastModified), leafHash -> ("prefix/subdir/leaf-file", lastModified)),
byKey = Map("prefix/root-file" -> (rootHash, lastModified), "prefix/subdir/leaf-file" -> (leafHash, lastModified))))
override def upload(localFile: LocalFile, bucket: Bucket, remoteKey: RemoteKey) = {
if (bucket == testBucket)
uploadsRecord += (source.toPath.relativize(localFile.toPath).toString -> remoteKey)

View file

@ -13,7 +13,7 @@ class UploadSelectionFilterSuite extends FunSpec {
val localHash = "0cbfe978783bd7950d5da4ff85e4af37"
val config = Config("bucket", "prefix", localFile.getParentFile)
def invokeSubject(input: Either[File, S3MetaData]) =
uploadRequiredFilter(config)(input).compile.toList.unsafeRunSync()
uploadRequiredFilter(config)(input).toList
describe("when supplied a file") {
val input = Left(localFile)
it("should be marked for upload") {

View file

@ -12,34 +12,30 @@ import software.amazon.awssdk.services.s3.model._
class S3ClientSuite extends FunSpec {
describe("objectHead") {
def invoke(self: S3Client) = {
self.objectHead("bucket", "remoteKey").unsafeRunSync()
val key = "key"
val hash = "hash"
val lastModified = Instant.now
val hashLookup: HashLookup = HashLookup(
byHash = Map(hash -> (key, lastModified)),
byKey = Map(key -> (hash, lastModified)))
def invoke(self: S3Client, remoteKey: RemoteKey) = {
self.objectHead(remoteKey)(hashLookup)
}
describe("when underlying client response is okay") {
val expectedHash = "hash"
val expectedLastModified = Instant.now
val s3Client = new ThorpS3Client(new S3CatsIOClient with JavaClientWrapper {
override def headObject(headObjectRequest: HeadObjectRequest) =
IO(HeadObjectResponse.builder().
eTag(expectedHash).
lastModified(expectedLastModified).
build())
})
describe("when remote key exists") {
val s3Client = S3Client.defaultClient
it("should return Some(expected values)") {
assertResult(Some((expectedHash, expectedLastModified)))(invoke(s3Client))
assertResult(Some((hash, lastModified)))(invoke(s3Client, key))
}
}
describe("when underlying client throws NoSuchKeyException") {
val s3Client = new ThorpS3Client(new S3CatsIOClient with JavaClientWrapper {
override def headObject(headObjectRequest: HeadObjectRequest) =
IO(throw NoSuchKeyException.builder().build())
})
it("should return None") {
assertResult(None)(invoke(s3Client))
}
describe("when remote key does not exist") {
val s3Client = S3Client.defaultClient
it("should return None") {
assertResult(None)(invoke(s3Client, "missing-key"))
}
}
}