From f6bf2700ff17e14314c9bf026481a497d21a34d0 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Sat, 8 Jun 2019 11:44:22 +0100 Subject: [PATCH] Fetch all hashes (#45) * [core] close files after calculating their MD5 hash FileInputStream was never closed, so eventually ran into ToManyFilesOpen. Will come back to look at this again with IO.bracket for better guarantee that FIS is closed. Signed-off-by: Paul Campbell * [aws-lib] Fetch all MD5 hashes under prefix Initial request only returns the first 1000. --- .../s3thorp/aws/lib/S3ClientLogging.scala | 6 +-- .../aws/lib/S3ClientObjectLister.scala | 50 ++++++++++++++----- .../MD5HashGenerator.scala | 1 + 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala index 1d28ff0..b14dcfc 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala @@ -1,14 +1,14 @@ package net.kemitix.s3thorp.aws.lib import cats.effect.IO -import com.amazonaws.services.s3.model.{CopyObjectResult, DeleteObjectsResult, ListObjectsV2Result, PutObjectResult} +import com.amazonaws.services.s3.model.{CopyObjectResult, DeleteObjectsResult, ListObjectsV2Result, PutObjectResult, S3ObjectSummary} import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey} object S3ClientLogging { def logListObjectsStart(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): ListObjectsV2Result => IO[ListObjectsV2Result] = + (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Stream[S3ObjectSummary]] = in => IO { info(3)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") in @@ -16,7 +16,7 @@ object S3ClientLogging { def logListObjectsFinish(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): ListObjectsV2Result => IO[Unit] = + (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Unit] = _ => IO { info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala index d87828a..f901425 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala @@ -2,7 +2,7 @@ package net.kemitix.s3thorp.aws.lib import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 -import com.amazonaws.services.s3.model.ListObjectsV2Request +import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart} import net.kemitix.s3thorp.aws.lib.S3ObjectsByHash.byHash import net.kemitix.s3thorp.aws.lib.S3ObjectsByKey.byKey @@ -15,17 +15,43 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { def listObjects(bucket: Bucket, prefix: RemoteKey) (implicit info: Int => String => Unit): IO[S3ObjectsData] = { - val request = new ListObjectsV2Request() - .withBucketName(bucket.name) - .withPrefix(prefix.key) - IO { - amazonS3.listObjectsV2(request) - }.bracket( - logListObjectsStart(bucket, prefix))( - logListObjectsFinish(bucket,prefix)) - .map(_.getObjectSummaries) - .map(_.asScala) - .map(_.toStream) + + type Token = String + type Batch = (Stream[S3ObjectSummary], Option[Token]) + + val requestInitial = new ListObjectsV2Request() + .withBucketName(bucket.name) + .withPrefix(prefix.key) + + val requestMore = (token:Token) => new ListObjectsV2Request() + .withBucketName(bucket.name) + .withPrefix(prefix.key) + .withContinuationToken(token) + + def fetchBatch: ListObjectsV2Request => IO[Batch] = + request => IO{ + val result = amazonS3.listObjectsV2(request) + val more: Option[Token] = + if (result.isTruncated) Some(result.getNextContinuationToken) + else None + (result.getObjectSummaries.asScala.toStream, more) + } + + def fetchAll: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = + request => + for { + batch <- fetchBatch(request) + (summaries, more) = batch + rest <- more match { + case None => IO{Stream()} + case Some(token) => fetchAll(requestMore(token)) + } + } yield summaries ++ rest + + fetchAll(requestInitial) + .bracket( + logListObjectsStart(bucket, prefix))( + logListObjectsFinish(bucket,prefix)) .map(os => S3ObjectsData(byHash(os), byKey(os))) } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala index 7af9e9a..a552f4f 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala @@ -24,6 +24,7 @@ object MD5HashGenerator { fis read buffer val hash = md5PartBody(buffer) info(5)(s"md5:generated:${hash.hash}") + fis.close hash }