From 37ac41093e245efd03cda5eb3040ffc51996b2c9 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Thu, 23 May 2019 18:19:51 +0100 Subject: [PATCH] Improved S3Client logging (#17) * [ThorpS3Client] Log event when event actually occurs * [MD5HashGenerator] log activity reading md5 hash for local files * [awssdk] Extract logging into S3ClientLogging * [S3ClientLogging] raise logging levels * [SyncLogging] Remove per-file logging * [S3ClientLogging] More readable messages --- .../scala/net/kemitix/s3thorp/LocalFile.scala | 1 + .../kemitix/s3thorp/MD5HashGenerator.scala | 4 +- .../net/kemitix/s3thorp/SyncLogging.scala | 3 - .../s3thorp/awssdk/S3ClientLogging.scala | 78 ++++++++++-- .../s3thorp/awssdk/ThorpS3Client.scala | 117 +++++++++--------- .../scala/net/kemitix/s3thorp/UnitTest.scala | 3 +- 6 files changed, 138 insertions(+), 68 deletions(-) diff --git a/src/main/scala/net/kemitix/s3thorp/LocalFile.scala b/src/main/scala/net/kemitix/s3thorp/LocalFile.scala index a1a5030..8c8dd1a 100644 --- a/src/main/scala/net/kemitix/s3thorp/LocalFile.scala +++ b/src/main/scala/net/kemitix/s3thorp/LocalFile.scala @@ -6,6 +6,7 @@ import java.nio.file.Path case class LocalFile(file: File, source: File, keyGenerator: File => RemoteKey) + (implicit c: Config) extends MD5HashGenerator { require(!file.isDirectory, s"LocalFile must not be a directory: $file") diff --git a/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala b/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala index 0b1139e..56491cd 100644 --- a/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala +++ b/src/main/scala/net/kemitix/s3thorp/MD5HashGenerator.scala @@ -6,12 +6,14 @@ import java.security.{DigestInputStream, MessageDigest} trait MD5HashGenerator extends Logging { - def md5File(file: File): MD5Hash = { + def md5File(file: File)(implicit c: Config): MD5Hash = { + log5(s"md5file:reading:${file.length}:$file") val buffer = new Array[Byte](8192) val md5 = MessageDigest.getInstance("MD5") val dis = new DigestInputStream(new FileInputStream(file), md5) try { while (dis.read(buffer) != -1) { } } finally { dis.close() } val hash = md5.digest.map("%02x".format(_)).mkString + log5(s"md5file:generated:$hash:$file") MD5Hash(hash) } diff --git a/src/main/scala/net/kemitix/s3thorp/SyncLogging.scala b/src/main/scala/net/kemitix/s3thorp/SyncLogging.scala index 88ed078..c2374f1 100644 --- a/src/main/scala/net/kemitix/s3thorp/SyncLogging.scala +++ b/src/main/scala/net/kemitix/s3thorp/SyncLogging.scala @@ -21,13 +21,10 @@ trait SyncLogging extends Logging { (counters: Counters, s3Action: S3Action) => { s3Action match { case UploadS3Action(remoteKey, _) => - log1(s"- Uploaded: ${remoteKey.key}") counters.copy(uploaded = counters.uploaded + 1) case CopyS3Action(remoteKey) => - log1(s"- Copied: ${remoteKey.key}") counters.copy(copied = counters.copied + 1) case DeleteS3Action(remoteKey) => - log1(s"- Deleted: ${remoteKey.key}") counters.copy(deleted = counters.deleted + 1) case _ => counters } diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientLogging.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientLogging.scala index 3fc6aaf..a9b0e33 100644 --- a/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientLogging.scala +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/S3ClientLogging.scala @@ -1,16 +1,80 @@ package net.kemitix.s3thorp.awssdk -import net.kemitix.s3thorp.{Bucket, Config, LocalFile, Logging} +import cats.effect.IO +import net.kemitix.s3thorp.{Bucket, Config, LocalFile, Logging, RemoteKey} +import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, DeleteObjectResponse, ListObjectsV2Response, PutObjectResponse} trait S3ClientLogging extends Logging { - def logUploadStart(localFile: LocalFile, bucket: Bucket) - (implicit c: Config)= - log5(s"s3Client:upload:start: ${localFile.file}") + def logListObjectsStart(bucket: Bucket, + prefix: RemoteKey) + (implicit c: Config): ListObjectsV2Response => IO[ListObjectsV2Response] = { + in => IO { + log3(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") + in + } + } - def logUploadDone(localFile: LocalFile, bucket: Bucket) - (implicit c: Config) = - log5(s"s3Client:upload:done : ${localFile.file}") + def logListObjectsFinish(bucket: Bucket, + prefix: RemoteKey) + (implicit c: Config): ListObjectsV2Response => IO[Unit] = { + in => IO { + log2(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") + } + } + + def logUploadStart(localFile: LocalFile, + bucket: Bucket) + (implicit c: Config): PutObjectResponse => IO[PutObjectResponse] = { + in => IO { + log4(s"Uploading: ${bucket.name}:${localFile.remoteKey}") + in + } + } + + def logUploadFinish(localFile: LocalFile, + bucket: Bucket) + (implicit c: Config): PutObjectResponse => IO[Unit] = { + in =>IO { + log3(s"Uploaded: ${bucket.name}:${localFile.remoteKey}") + } + } + + def logCopyStart(bucket: Bucket, + sourceKey: RemoteKey, + targetKey: RemoteKey) + (implicit c: Config): CopyObjectResponse => IO[CopyObjectResponse] = { + in => IO { + log4(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") + in + } + } + + def logCopyFinish(bucket: Bucket, + sourceKey: RemoteKey, + targetKey: RemoteKey) + (implicit c: Config): CopyObjectResponse => IO[Unit] = { + in => IO { + log3(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}") + } + } + + def logDeleteStart(bucket: Bucket, + remoteKey: RemoteKey) + (implicit c: Config): DeleteObjectResponse => IO[DeleteObjectResponse] = { + in => IO { + log4(s"Delete: ${bucket.name}:${remoteKey.key}") + in + } + } + + def logDeleteFinish(bucket: Bucket, + remoteKey: RemoteKey) + (implicit c: Config): DeleteObjectResponse => IO[Unit] = { + in => IO { + log3(s"Deleted: ${bucket.name}:${remoteKey.key}") + } + } } diff --git a/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala b/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala index 811e59b..dfa8a97 100644 --- a/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala +++ b/src/main/scala/net/kemitix/s3thorp/awssdk/ThorpS3Client.scala @@ -11,74 +11,79 @@ import scala.collection.JavaConverters._ private class ThorpS3Client(s3Client: S3CatsIOClient) extends S3Client with S3ObjectsByHash - with Logging { + with S3ClientLogging { - override def upload(localFile: LocalFile, - bucket: Bucket - )(implicit c: Config): IO[UploadS3Action] = { - log5(s"upload:bucket = ${bucket.name}, localFile = ${localFile.remoteKey}") - val request = PutObjectRequest.builder() + override def listObjects(bucket: Bucket, + prefix: RemoteKey) + (implicit c: Config): IO[S3ObjectsData] = { + val request = ListObjectsV2Request.builder .bucket(bucket.name) - .key(localFile.remoteKey.key) - .build() - val body = AsyncRequestBody.fromFile(localFile.file) - s3Client.putObject(request, body) - .map(_.eTag) - .map(_.filter{c => c != '"'}) - .map(MD5Hash) - .map(md5Hash => UploadS3Action(localFile.remoteKey, md5Hash)) + .prefix(prefix.key).build + s3Client.listObjectsV2(request) + .bracket( + logListObjectsStart(bucket, prefix))( + logListObjectsFinish(bucket,prefix)) + .map(_.contents) + .map(_.asScala) + .map(_.toStream) + .map(os => S3ObjectsData(byHash(os), byKey(os))) } - override def copy(bucket: Bucket, - sourceKey: RemoteKey, - hash: MD5Hash, - targetKey: RemoteKey - )(implicit c: Config): IO[CopyS3Action] = { - log5(s"copy:bucket = ${bucket.name}, sourceKey = ${sourceKey.key}, targetKey = ${targetKey.key}") - val request = CopyObjectRequest.builder() - .bucket(bucket.name) - .copySource(s"${bucket.name}/${sourceKey.key}") - .copySourceIfMatch(hash.hash) - .key(targetKey.key) - .build() - s3Client.copyObject(request) - .map(_ => CopyS3Action(targetKey)) - } - - override def delete(bucket: Bucket, - remoteKey: RemoteKey - )(implicit c: Config): IO[DeleteS3Action] = { - log5(s"delete:bucket = ${bucket.name}, remoteKey = ${remoteKey.key}") - val request = DeleteObjectRequest.builder() - .bucket(bucket.name) - .key(remoteKey.key) - .build() - s3Client.deleteObject(request) - .map(_ => DeleteS3Action(remoteKey)) - } - - private def asS3ObjectsData: Stream[S3Object] => S3ObjectsData = - os => S3ObjectsData(byHash(os), byKey(os)) - private def byKey(os: Stream[S3Object]) = os.map { o => { val remoteKey = RemoteKey(o.key) - val hash = MD5Hash(o.eTag().filter { c => c != '"' }) + val hash = MD5Hash(o.eTag() filter stripQuotes) val lastModified = LastModified(o.lastModified()) (remoteKey, HashModified(hash, lastModified)) }}.toMap - - def listObjects(bucket: Bucket, prefix: RemoteKey) - (implicit c: Config): IO[S3ObjectsData] = { - log5(s"listObjects:bucket = ${bucket.name}, prefix: ${prefix.key}") - val request = ListObjectsV2Request.builder() + override def upload(localFile: LocalFile, + bucket: Bucket) + (implicit c: Config): IO[UploadS3Action] = { + val request = PutObjectRequest.builder .bucket(bucket.name) - .prefix(prefix.key) - .build() - s3Client.listObjectsV2(request) - .map(r => r.contents.asScala.toStream) - .map(asS3ObjectsData) + .key(localFile.remoteKey.key).build + val body = AsyncRequestBody.fromFile(localFile.file) + s3Client.putObject(request, body) + .bracket( + logUploadStart(localFile, bucket))( + logUploadFinish(localFile, bucket)) + .map(_.eTag) + .map(_ filter stripQuotes) + .map(MD5Hash) + .map(UploadS3Action(localFile.remoteKey, _)) + } + + private def stripQuotes: Char => Boolean = _ != '"' + + override def copy(bucket: Bucket, + sourceKey: RemoteKey, + hash: MD5Hash, + targetKey: RemoteKey) + (implicit c: Config): IO[CopyS3Action] = { + val request = CopyObjectRequest.builder + .bucket(bucket.name) + .copySource(s"${bucket.name}/${sourceKey.key}") + .copySourceIfMatch(hash.hash) + .key(targetKey.key).build + s3Client.copyObject(request) + .bracket( + logCopyStart(bucket, sourceKey, targetKey))( + logCopyFinish(bucket, sourceKey,targetKey)) + .map(_ => CopyS3Action(targetKey)) + } + + override def delete(bucket: Bucket, + remoteKey: RemoteKey) + (implicit c: Config): IO[DeleteS3Action] = { + val request = DeleteObjectRequest.builder + .bucket(bucket.name) + .key(remoteKey.key).build + s3Client.deleteObject(request) + .bracket( + logDeleteStart(bucket, remoteKey))( + logDeleteFinish(bucket, remoteKey)) + .map(_ => DeleteS3Action(remoteKey)) } } diff --git a/src/test/scala/net/kemitix/s3thorp/UnitTest.scala b/src/test/scala/net/kemitix/s3thorp/UnitTest.scala index 53befe2..c3a8163 100644 --- a/src/test/scala/net/kemitix/s3thorp/UnitTest.scala +++ b/src/test/scala/net/kemitix/s3thorp/UnitTest.scala @@ -6,7 +6,8 @@ import org.scalatest.FunSpec abstract class UnitTest extends FunSpec { - def aLocalFile(path: String, myHash: MD5Hash, source: File, fileToKey: File => RemoteKey): LocalFile = + def aLocalFile(path: String, myHash: MD5Hash, source: File, fileToKey: File => RemoteKey) + (implicit c: Config): LocalFile = new LocalFile(source.toPath.resolve(path).toFile, source, fileToKey) { override def hash: MD5Hash = myHash }