Improved S3Client logging (#17)
* [ThorpS3Client] Log event when event actually occurs * [MD5HashGenerator] log activity reading md5 hash for local files * [awssdk] Extract logging into S3ClientLogging * [S3ClientLogging] raise logging levels * [SyncLogging] Remove per-file logging * [S3ClientLogging] More readable messages
This commit is contained in:
parent
0fe9b86471
commit
37ac41093e
6 changed files with 138 additions and 68 deletions
|
@ -6,6 +6,7 @@ import java.nio.file.Path
|
|||
case class LocalFile(file: File,
|
||||
source: File,
|
||||
keyGenerator: File => RemoteKey)
|
||||
(implicit c: Config)
|
||||
extends MD5HashGenerator {
|
||||
|
||||
require(!file.isDirectory, s"LocalFile must not be a directory: $file")
|
||||
|
|
|
@ -6,12 +6,14 @@ import java.security.{DigestInputStream, MessageDigest}
|
|||
trait MD5HashGenerator
|
||||
extends Logging {
|
||||
|
||||
def md5File(file: File): MD5Hash = {
|
||||
def md5File(file: File)(implicit c: Config): MD5Hash = {
|
||||
log5(s"md5file:reading:${file.length}:$file")
|
||||
val buffer = new Array[Byte](8192)
|
||||
val md5 = MessageDigest.getInstance("MD5")
|
||||
val dis = new DigestInputStream(new FileInputStream(file), md5)
|
||||
try { while (dis.read(buffer) != -1) { } } finally { dis.close() }
|
||||
val hash = md5.digest.map("%02x".format(_)).mkString
|
||||
log5(s"md5file:generated:$hash:$file")
|
||||
MD5Hash(hash)
|
||||
}
|
||||
|
||||
|
|
|
@ -21,13 +21,10 @@ trait SyncLogging extends Logging {
|
|||
(counters: Counters, s3Action: S3Action) => {
|
||||
s3Action match {
|
||||
case UploadS3Action(remoteKey, _) =>
|
||||
log1(s"- Uploaded: ${remoteKey.key}")
|
||||
counters.copy(uploaded = counters.uploaded + 1)
|
||||
case CopyS3Action(remoteKey) =>
|
||||
log1(s"- Copied: ${remoteKey.key}")
|
||||
counters.copy(copied = counters.copied + 1)
|
||||
case DeleteS3Action(remoteKey) =>
|
||||
log1(s"- Deleted: ${remoteKey.key}")
|
||||
counters.copy(deleted = counters.deleted + 1)
|
||||
case _ => counters
|
||||
}
|
||||
|
|
|
@ -1,16 +1,80 @@
|
|||
package net.kemitix.s3thorp.awssdk
|
||||
|
||||
import net.kemitix.s3thorp.{Bucket, Config, LocalFile, Logging}
|
||||
import cats.effect.IO
|
||||
import net.kemitix.s3thorp.{Bucket, Config, LocalFile, Logging, RemoteKey}
|
||||
import software.amazon.awssdk.services.s3.model.{CopyObjectResponse, DeleteObjectResponse, ListObjectsV2Response, PutObjectResponse}
|
||||
|
||||
trait S3ClientLogging
|
||||
extends Logging {
|
||||
|
||||
def logUploadStart(localFile: LocalFile, bucket: Bucket)
|
||||
(implicit c: Config)=
|
||||
log5(s"s3Client:upload:start: ${localFile.file}")
|
||||
def logListObjectsStart(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit c: Config): ListObjectsV2Response => IO[ListObjectsV2Response] = {
|
||||
in => IO {
|
||||
log3(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}")
|
||||
in
|
||||
}
|
||||
}
|
||||
|
||||
def logUploadDone(localFile: LocalFile, bucket: Bucket)
|
||||
(implicit c: Config) =
|
||||
log5(s"s3Client:upload:done : ${localFile.file}")
|
||||
def logListObjectsFinish(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit c: Config): ListObjectsV2Response => IO[Unit] = {
|
||||
in => IO {
|
||||
log2(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}")
|
||||
}
|
||||
}
|
||||
|
||||
def logUploadStart(localFile: LocalFile,
|
||||
bucket: Bucket)
|
||||
(implicit c: Config): PutObjectResponse => IO[PutObjectResponse] = {
|
||||
in => IO {
|
||||
log4(s"Uploading: ${bucket.name}:${localFile.remoteKey}")
|
||||
in
|
||||
}
|
||||
}
|
||||
|
||||
def logUploadFinish(localFile: LocalFile,
|
||||
bucket: Bucket)
|
||||
(implicit c: Config): PutObjectResponse => IO[Unit] = {
|
||||
in =>IO {
|
||||
log3(s"Uploaded: ${bucket.name}:${localFile.remoteKey}")
|
||||
}
|
||||
}
|
||||
|
||||
def logCopyStart(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
targetKey: RemoteKey)
|
||||
(implicit c: Config): CopyObjectResponse => IO[CopyObjectResponse] = {
|
||||
in => IO {
|
||||
log4(s"Copy: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
|
||||
in
|
||||
}
|
||||
}
|
||||
|
||||
def logCopyFinish(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
targetKey: RemoteKey)
|
||||
(implicit c: Config): CopyObjectResponse => IO[Unit] = {
|
||||
in => IO {
|
||||
log3(s"Copied: ${bucket.name}:${sourceKey.key} => ${targetKey.key}")
|
||||
}
|
||||
}
|
||||
|
||||
def logDeleteStart(bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit c: Config): DeleteObjectResponse => IO[DeleteObjectResponse] = {
|
||||
in => IO {
|
||||
log4(s"Delete: ${bucket.name}:${remoteKey.key}")
|
||||
in
|
||||
}
|
||||
}
|
||||
|
||||
def logDeleteFinish(bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit c: Config): DeleteObjectResponse => IO[Unit] = {
|
||||
in => IO {
|
||||
log3(s"Deleted: ${bucket.name}:${remoteKey.key}")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -11,74 +11,79 @@ import scala.collection.JavaConverters._
|
|||
private class ThorpS3Client(s3Client: S3CatsIOClient)
|
||||
extends S3Client
|
||||
with S3ObjectsByHash
|
||||
with Logging {
|
||||
with S3ClientLogging {
|
||||
|
||||
override def upload(localFile: LocalFile,
|
||||
bucket: Bucket
|
||||
)(implicit c: Config): IO[UploadS3Action] = {
|
||||
log5(s"upload:bucket = ${bucket.name}, localFile = ${localFile.remoteKey}")
|
||||
val request = PutObjectRequest.builder()
|
||||
override def listObjects(bucket: Bucket,
|
||||
prefix: RemoteKey)
|
||||
(implicit c: Config): IO[S3ObjectsData] = {
|
||||
val request = ListObjectsV2Request.builder
|
||||
.bucket(bucket.name)
|
||||
.key(localFile.remoteKey.key)
|
||||
.build()
|
||||
val body = AsyncRequestBody.fromFile(localFile.file)
|
||||
s3Client.putObject(request, body)
|
||||
.map(_.eTag)
|
||||
.map(_.filter{c => c != '"'})
|
||||
.map(MD5Hash)
|
||||
.map(md5Hash => UploadS3Action(localFile.remoteKey, md5Hash))
|
||||
.prefix(prefix.key).build
|
||||
s3Client.listObjectsV2(request)
|
||||
.bracket(
|
||||
logListObjectsStart(bucket, prefix))(
|
||||
logListObjectsFinish(bucket,prefix))
|
||||
.map(_.contents)
|
||||
.map(_.asScala)
|
||||
.map(_.toStream)
|
||||
.map(os => S3ObjectsData(byHash(os), byKey(os)))
|
||||
}
|
||||
|
||||
override def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey
|
||||
)(implicit c: Config): IO[CopyS3Action] = {
|
||||
log5(s"copy:bucket = ${bucket.name}, sourceKey = ${sourceKey.key}, targetKey = ${targetKey.key}")
|
||||
val request = CopyObjectRequest.builder()
|
||||
.bucket(bucket.name)
|
||||
.copySource(s"${bucket.name}/${sourceKey.key}")
|
||||
.copySourceIfMatch(hash.hash)
|
||||
.key(targetKey.key)
|
||||
.build()
|
||||
s3Client.copyObject(request)
|
||||
.map(_ => CopyS3Action(targetKey))
|
||||
}
|
||||
|
||||
override def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey
|
||||
)(implicit c: Config): IO[DeleteS3Action] = {
|
||||
log5(s"delete:bucket = ${bucket.name}, remoteKey = ${remoteKey.key}")
|
||||
val request = DeleteObjectRequest.builder()
|
||||
.bucket(bucket.name)
|
||||
.key(remoteKey.key)
|
||||
.build()
|
||||
s3Client.deleteObject(request)
|
||||
.map(_ => DeleteS3Action(remoteKey))
|
||||
}
|
||||
|
||||
private def asS3ObjectsData: Stream[S3Object] => S3ObjectsData =
|
||||
os => S3ObjectsData(byHash(os), byKey(os))
|
||||
|
||||
private def byKey(os: Stream[S3Object]) =
|
||||
os.map { o => {
|
||||
val remoteKey = RemoteKey(o.key)
|
||||
val hash = MD5Hash(o.eTag().filter { c => c != '"' })
|
||||
val hash = MD5Hash(o.eTag() filter stripQuotes)
|
||||
val lastModified = LastModified(o.lastModified())
|
||||
(remoteKey, HashModified(hash, lastModified))
|
||||
}}.toMap
|
||||
|
||||
|
||||
def listObjects(bucket: Bucket, prefix: RemoteKey)
|
||||
(implicit c: Config): IO[S3ObjectsData] = {
|
||||
log5(s"listObjects:bucket = ${bucket.name}, prefix: ${prefix.key}")
|
||||
val request = ListObjectsV2Request.builder()
|
||||
override def upload(localFile: LocalFile,
|
||||
bucket: Bucket)
|
||||
(implicit c: Config): IO[UploadS3Action] = {
|
||||
val request = PutObjectRequest.builder
|
||||
.bucket(bucket.name)
|
||||
.prefix(prefix.key)
|
||||
.build()
|
||||
s3Client.listObjectsV2(request)
|
||||
.map(r => r.contents.asScala.toStream)
|
||||
.map(asS3ObjectsData)
|
||||
.key(localFile.remoteKey.key).build
|
||||
val body = AsyncRequestBody.fromFile(localFile.file)
|
||||
s3Client.putObject(request, body)
|
||||
.bracket(
|
||||
logUploadStart(localFile, bucket))(
|
||||
logUploadFinish(localFile, bucket))
|
||||
.map(_.eTag)
|
||||
.map(_ filter stripQuotes)
|
||||
.map(MD5Hash)
|
||||
.map(UploadS3Action(localFile.remoteKey, _))
|
||||
}
|
||||
|
||||
private def stripQuotes: Char => Boolean = _ != '"'
|
||||
|
||||
override def copy(bucket: Bucket,
|
||||
sourceKey: RemoteKey,
|
||||
hash: MD5Hash,
|
||||
targetKey: RemoteKey)
|
||||
(implicit c: Config): IO[CopyS3Action] = {
|
||||
val request = CopyObjectRequest.builder
|
||||
.bucket(bucket.name)
|
||||
.copySource(s"${bucket.name}/${sourceKey.key}")
|
||||
.copySourceIfMatch(hash.hash)
|
||||
.key(targetKey.key).build
|
||||
s3Client.copyObject(request)
|
||||
.bracket(
|
||||
logCopyStart(bucket, sourceKey, targetKey))(
|
||||
logCopyFinish(bucket, sourceKey,targetKey))
|
||||
.map(_ => CopyS3Action(targetKey))
|
||||
}
|
||||
|
||||
override def delete(bucket: Bucket,
|
||||
remoteKey: RemoteKey)
|
||||
(implicit c: Config): IO[DeleteS3Action] = {
|
||||
val request = DeleteObjectRequest.builder
|
||||
.bucket(bucket.name)
|
||||
.key(remoteKey.key).build
|
||||
s3Client.deleteObject(request)
|
||||
.bracket(
|
||||
logDeleteStart(bucket, remoteKey))(
|
||||
logDeleteFinish(bucket, remoteKey))
|
||||
.map(_ => DeleteS3Action(remoteKey))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -6,7 +6,8 @@ import org.scalatest.FunSpec
|
|||
|
||||
abstract class UnitTest extends FunSpec {
|
||||
|
||||
def aLocalFile(path: String, myHash: MD5Hash, source: File, fileToKey: File => RemoteKey): LocalFile =
|
||||
def aLocalFile(path: String, myHash: MD5Hash, source: File, fileToKey: File => RemoteKey)
|
||||
(implicit c: Config): LocalFile =
|
||||
new LocalFile(source.toPath.resolve(path).toFile, source, fileToKey) {
|
||||
override def hash: MD5Hash = myHash
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue