diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala index 1d28ff0..b14dcfc 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientLogging.scala @@ -1,14 +1,14 @@ package net.kemitix.s3thorp.aws.lib import cats.effect.IO -import com.amazonaws.services.s3.model.{CopyObjectResult, DeleteObjectsResult, ListObjectsV2Result, PutObjectResult} +import com.amazonaws.services.s3.model.{CopyObjectResult, DeleteObjectsResult, ListObjectsV2Result, PutObjectResult, S3ObjectSummary} import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey} object S3ClientLogging { def logListObjectsStart(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): ListObjectsV2Result => IO[ListObjectsV2Result] = + (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Stream[S3ObjectSummary]] = in => IO { info(3)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") in @@ -16,7 +16,7 @@ object S3ClientLogging { def logListObjectsFinish(bucket: Bucket, prefix: RemoteKey) - (implicit info: Int => String => Unit): ListObjectsV2Result => IO[Unit] = + (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Unit] = _ => IO { info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") } diff --git a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala index d87828a..f901425 100644 --- a/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala +++ b/aws-lib/src/main/scala/net/kemitix/s3thorp/aws/lib/S3ClientObjectLister.scala @@ -2,7 +2,7 @@ package net.kemitix.s3thorp.aws.lib import cats.effect.IO import com.amazonaws.services.s3.AmazonS3 -import com.amazonaws.services.s3.model.ListObjectsV2Request +import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary} import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart} import net.kemitix.s3thorp.aws.lib.S3ObjectsByHash.byHash import net.kemitix.s3thorp.aws.lib.S3ObjectsByKey.byKey @@ -15,17 +15,43 @@ class S3ClientObjectLister(amazonS3: AmazonS3) { def listObjects(bucket: Bucket, prefix: RemoteKey) (implicit info: Int => String => Unit): IO[S3ObjectsData] = { - val request = new ListObjectsV2Request() - .withBucketName(bucket.name) - .withPrefix(prefix.key) - IO { - amazonS3.listObjectsV2(request) - }.bracket( - logListObjectsStart(bucket, prefix))( - logListObjectsFinish(bucket,prefix)) - .map(_.getObjectSummaries) - .map(_.asScala) - .map(_.toStream) + + type Token = String + type Batch = (Stream[S3ObjectSummary], Option[Token]) + + val requestInitial = new ListObjectsV2Request() + .withBucketName(bucket.name) + .withPrefix(prefix.key) + + val requestMore = (token:Token) => new ListObjectsV2Request() + .withBucketName(bucket.name) + .withPrefix(prefix.key) + .withContinuationToken(token) + + def fetchBatch: ListObjectsV2Request => IO[Batch] = + request => IO{ + val result = amazonS3.listObjectsV2(request) + val more: Option[Token] = + if (result.isTruncated) Some(result.getNextContinuationToken) + else None + (result.getObjectSummaries.asScala.toStream, more) + } + + def fetchAll: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] = + request => + for { + batch <- fetchBatch(request) + (summaries, more) = batch + rest <- more match { + case None => IO{Stream()} + case Some(token) => fetchAll(requestMore(token)) + } + } yield summaries ++ rest + + fetchAll(requestInitial) + .bracket( + logListObjectsStart(bucket, prefix))( + logListObjectsFinish(bucket,prefix)) .map(os => S3ObjectsData(byHash(os), byKey(os))) } diff --git a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala index 7af9e9a..a552f4f 100644 --- a/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala +++ b/core/src/main/scala/net.kemitix.s3thorp.core/MD5HashGenerator.scala @@ -24,6 +24,7 @@ object MD5HashGenerator { fis read buffer val hash = md5PartBody(buffer) info(5)(s"md5:generated:${hash.hash}") + fis.close hash }