Fetch all hashes (#45)

* [core] close files after calculating their MD5 hash

FileInputStream was never closed, so eventually ran into
ToManyFilesOpen.

Will come back to look at this again with IO.bracket for better
guarantee that FIS is closed.

Signed-off-by: Paul Campbell <pcampbell@kemitix.net>

* [aws-lib] Fetch all MD5 hashes under prefix

Initial request only returns the first 1000.
This commit is contained in:
Paul Campbell 2019-06-08 11:44:22 +01:00 committed by GitHub
parent 07f12ac19f
commit f6bf2700ff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 15 deletions

View file

@ -1,14 +1,14 @@
package net.kemitix.s3thorp.aws.lib package net.kemitix.s3thorp.aws.lib
import cats.effect.IO import cats.effect.IO
import com.amazonaws.services.s3.model.{CopyObjectResult, DeleteObjectsResult, ListObjectsV2Result, PutObjectResult} import com.amazonaws.services.s3.model.{CopyObjectResult, DeleteObjectsResult, ListObjectsV2Result, PutObjectResult, S3ObjectSummary}
import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey} import net.kemitix.s3thorp.domain.{Bucket, LocalFile, RemoteKey}
object S3ClientLogging { object S3ClientLogging {
def logListObjectsStart(bucket: Bucket, def logListObjectsStart(bucket: Bucket,
prefix: RemoteKey) prefix: RemoteKey)
(implicit info: Int => String => Unit): ListObjectsV2Result => IO[ListObjectsV2Result] = (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Stream[S3ObjectSummary]] =
in => IO { in => IO {
info(3)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}") info(3)(s"Fetch S3 Summary: ${bucket.name}:${prefix.key}")
in in
@ -16,7 +16,7 @@ object S3ClientLogging {
def logListObjectsFinish(bucket: Bucket, def logListObjectsFinish(bucket: Bucket,
prefix: RemoteKey) prefix: RemoteKey)
(implicit info: Int => String => Unit): ListObjectsV2Result => IO[Unit] = (implicit info: Int => String => Unit): Stream[S3ObjectSummary] => IO[Unit] =
_ => IO { _ => IO {
info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}") info(2)(s"Fetched S3 Summary: ${bucket.name}:${prefix.key}")
} }

View file

@ -2,7 +2,7 @@ package net.kemitix.s3thorp.aws.lib
import cats.effect.IO import cats.effect.IO
import com.amazonaws.services.s3.AmazonS3 import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.ListObjectsV2Request import com.amazonaws.services.s3.model.{ListObjectsV2Request, S3ObjectSummary}
import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart} import net.kemitix.s3thorp.aws.lib.S3ClientLogging.{logListObjectsFinish, logListObjectsStart}
import net.kemitix.s3thorp.aws.lib.S3ObjectsByHash.byHash import net.kemitix.s3thorp.aws.lib.S3ObjectsByHash.byHash
import net.kemitix.s3thorp.aws.lib.S3ObjectsByKey.byKey import net.kemitix.s3thorp.aws.lib.S3ObjectsByKey.byKey
@ -15,17 +15,43 @@ class S3ClientObjectLister(amazonS3: AmazonS3) {
def listObjects(bucket: Bucket, def listObjects(bucket: Bucket,
prefix: RemoteKey) prefix: RemoteKey)
(implicit info: Int => String => Unit): IO[S3ObjectsData] = { (implicit info: Int => String => Unit): IO[S3ObjectsData] = {
val request = new ListObjectsV2Request()
type Token = String
type Batch = (Stream[S3ObjectSummary], Option[Token])
val requestInitial = new ListObjectsV2Request()
.withBucketName(bucket.name) .withBucketName(bucket.name)
.withPrefix(prefix.key) .withPrefix(prefix.key)
IO {
amazonS3.listObjectsV2(request) val requestMore = (token:Token) => new ListObjectsV2Request()
}.bracket( .withBucketName(bucket.name)
.withPrefix(prefix.key)
.withContinuationToken(token)
def fetchBatch: ListObjectsV2Request => IO[Batch] =
request => IO{
val result = amazonS3.listObjectsV2(request)
val more: Option[Token] =
if (result.isTruncated) Some(result.getNextContinuationToken)
else None
(result.getObjectSummaries.asScala.toStream, more)
}
def fetchAll: ListObjectsV2Request => IO[Stream[S3ObjectSummary]] =
request =>
for {
batch <- fetchBatch(request)
(summaries, more) = batch
rest <- more match {
case None => IO{Stream()}
case Some(token) => fetchAll(requestMore(token))
}
} yield summaries ++ rest
fetchAll(requestInitial)
.bracket(
logListObjectsStart(bucket, prefix))( logListObjectsStart(bucket, prefix))(
logListObjectsFinish(bucket,prefix)) logListObjectsFinish(bucket,prefix))
.map(_.getObjectSummaries)
.map(_.asScala)
.map(_.toStream)
.map(os => S3ObjectsData(byHash(os), byKey(os))) .map(os => S3ObjectsData(byHash(os), byKey(os)))
} }

View file

@ -24,6 +24,7 @@ object MD5HashGenerator {
fis read buffer fis read buffer
val hash = md5PartBody(buffer) val hash = md5PartBody(buffer)
info(5)(s"md5:generated:${hash.hash}") info(5)(s"md5:generated:${hash.hash}")
fis.close
hash hash
} }