Refactor Copier.copyObject(AmazonS3.Client,Bucket,RemoteKey,MD5Hash,RemoteKey) (#137)

* [core] Convert HashService into a Hasher effect

* [core] LocalFileStreamSuite fix prep of test env

* [core] LocalFileStreamSuite Refactor

* [core] Add Hasher.hashObjectChunk

* [core] make MD5HashGenerator private

Moved MD5HashGenerator into the hasher package, together with the Hasher
effect, and made it private to that package; the Hasher effect is now its
only permitted user. Added hex and digest methods to Hasher.
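
For illustration, a minimal sketch of how callers now obtain hashes through the Hasher effect rather than calling MD5HashGenerator directly (the accessor and environment names are taken from the diff below; the file path and runner object are hypothetical):

    import java.nio.file.Paths

    import net.kemitix.thorp.core.hasher.Hasher
    import net.kemitix.thorp.filesystem.FileSystem
    import zio.DefaultRuntime

    object HasherSketch {

      // Environment supplying the Hasher and FileSystem services,
      // composed the same way as the test environments in this change.
      object Env extends Hasher.Live with FileSystem.Live

      def main(args: Array[String]): Unit = {
        val path = Paths.get("/tmp/example-file") // hypothetical path
        val hashes = new DefaultRuntime {}.unsafeRunSync {
          Hasher.hashObject(path).provide(Env) // yields Map[HashType, MD5Hash]
        }.toEither
        println(hashes)
      }
    }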

Similarly, the ETagGenerator in [storage-aws] moved into a hasher package
and was made private. It can now only be accessed through the S3Hasher
effect.
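
And a matching sketch of the S3 side: providing S3Hasher.Live instead of Hasher.Live (as Main does below) is enough for Hasher.hashObject to also return the ETag entry. Names come from the diff; the path and runner object are again hypothetical:

    import java.nio.file.Paths

    import net.kemitix.thorp.core.hasher.Hasher
    import net.kemitix.thorp.filesystem.FileSystem
    import net.kemitix.thorp.storage.aws.hasher.S3Hasher
    import zio.DefaultRuntime

    object S3HasherSketch {

      // Only the mixin changes: S3Hasher.Live supplies a Hasher.Service
      // that adds the multi-part ETag alongside the MD5 hash.
      object Env extends S3Hasher.Live with FileSystem.Live

      def main(args: Array[String]): Unit = {
        val path = Paths.get("/tmp/example-file") // hypothetical path
        val hashes = new DefaultRuntime {}.unsafeRunSync {
          Hasher.hashObject(path).provide(Env) // Map(MD5 -> ..., ETag -> ...)
        }.toEither
        println(hashes)
      }
    }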

* [domain] Add Monoid

* [domain] Sources use Monoid

* [core] ActionGenerator Refactor

* [core] Use >>= and *> operators

* [config] Refactoring

* [storage-aws] Refactoring

* [domain] Refactoring

* [core] Refactoring

* [config] refactoring

* Refactoring

* [core] PlanBuilder Refactoring

* [core] Make PlanBuilder into an object

* [storage-aws] Copier: reduce the number of parameters
Paul Campbell 2019-08-01 08:34:58 +01:00 committed by GitHub
parent a749b1f5b0
commit 899e0724c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
47 changed files with 519 additions and 540 deletions


@ -122,7 +122,7 @@ lazy val config = (project in file("config"))
.settings(testDependencies)
.settings(commandLineParsing)
.settings(assemblyJarName in assembly := "config.jar")
.dependsOn(domain)
.dependsOn(domain % "compile->compile;test->test")
.dependsOn(filesystem)
lazy val filesystem = (project in file("filesystem"))


@ -4,6 +4,7 @@ import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.aws.S3Storage
import net.kemitix.thorp.storage.aws.hasher.S3Hasher
import zio.{App, ZIO}
object Main extends App {
@ -13,6 +14,7 @@ object Main extends App {
with Console.Live
with Config.Live
with FileSystem.Live
with S3Hasher.Live
override def run(args: List[String]): ZIO[Environment, Nothing, Int] =
Program


@ -5,7 +5,6 @@ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.CoreTypes.CoreProgram
import net.kemitix.thorp.core._
import net.kemitix.thorp.domain.StorageQueueEvent
import net.kemitix.thorp.storage.aws.S3HashService.defaultHashService
import zio.ZIO
trait Program {
@ -27,7 +26,7 @@ trait Program {
private def execute = {
for {
plan <- PlanBuilder.createPlan(defaultHashService)
plan <- PlanBuilder.createPlan
archive <- UnversionedMirrorArchive.default(plan.syncTotals)
events <- applyPlan(archive, plan)
_ <- SyncLogging.logRunFinished(events)
@ -39,7 +38,7 @@ trait Program {
_ <- Console.putStrLn("There were errors:")
_ <- throwable match {
case ConfigValidationException(errors) =>
ZIO.foreach(errors)(error => Console.putStrLn(s"- $error"))
ZIO.foreach_(errors)(error => Console.putStrLn(s"- $error"))
}
} yield ()


@ -14,7 +14,7 @@ object ConfigOption {
case class Source(path: Path) extends ConfigOption {
override def update(config: Configuration): Configuration =
sources.modify(_ ++ path)(config)
sources.modify(_ + path)(config)
}
case class Bucket(name: String) extends ConfigOption {


@ -1,5 +1,5 @@
package net.kemitix.thorp.config
final case class ConfigValidationException(
errors: List[ConfigValidation]
errors: Seq[ConfigValidation]
) extends Exception


@ -22,17 +22,15 @@ sealed trait ConfigValidator {
def validateSources(
sources: Sources): Either[List[ConfigValidation], Sources] =
(for {
x <- sources.paths.foldLeft(List[ConfigValidation]()) {
(acc: List[ConfigValidation], path) =>
{
validateSource(path) match {
case Left(errors) => acc ++ errors
case Right(_) => acc
}
sources.paths.foldLeft(List[ConfigValidation]()) {
(acc: List[ConfigValidation], path) =>
{
validateSource(path) match {
case Left(errors) => acc ++ errors
case Right(_) => acc
}
}
} yield x) match {
}
} match {
case Nil => Right(sources)
case errors => Left(errors)
}


@ -17,10 +17,8 @@ trait ConfigurationBuilder {
def buildConfig(priorityOpts: ConfigOptions)
: ZIO[FileSystem, ConfigValidationException, Configuration] =
(for {
config <- getConfigOptions(priorityOpts).map(collateOptions)
valid <- ConfigValidator.validateConfig(config)
} yield valid)
(getConfigOptions(priorityOpts).map(collateOptions) >>=
ConfigValidator.validateConfig)
.catchAll(errors => ZIO.fail(ConfigValidationException(errors)))
private def getConfigOptions(priorityOpts: ConfigOptions) =


@ -3,24 +3,25 @@ package net.kemitix.thorp.config
import java.nio.file.Path
import net.kemitix.thorp.filesystem.FileSystem
import zio.{IO, ZIO}
import zio.{IO, TaskR, ZIO}
trait ParseConfigFile {
def parseFile(
filename: Path): ZIO[FileSystem, List[ConfigValidation], ConfigOptions] =
readFile(filename)
.map(ParseConfigLines.parseLines)
.catchAll(h =>
IO.fail(
List(ConfigValidation.ErrorReadingFile(filename, h.getMessage))))
filename: Path): ZIO[FileSystem, Seq[ConfigValidation], ConfigOptions] =
(readFile(filename) >>= ParseConfigLines.parseLines)
.catchAll(
h =>
IO.fail(
List(ConfigValidation.ErrorReadingFile(filename, h.getMessage))))
private def readFile(filename: Path) =
FileSystem
.exists(filename.toFile)
.flatMap(
if (_) FileSystem.lines(filename.toFile)
else ZIO.succeed(List.empty))
FileSystem.exists(filename.toFile) >>= readLines(filename)
private def readLines(filename: Path)(
exists: Boolean): TaskR[FileSystem, Seq[String]] =
if (exists) FileSystem.lines(filename.toFile)
else ZIO.succeed(Seq.empty)
}


@ -3,22 +3,16 @@ package net.kemitix.thorp.config
import java.nio.file.Paths
import java.util.regex.Pattern
import net.kemitix.thorp.config.ConfigOption.{
Bucket,
Debug,
Exclude,
Include,
Prefix,
Source
}
import net.kemitix.thorp.config.ConfigOption._
import zio.UIO
trait ParseConfigLines {
private val pattern = "^\\s*(?<key>\\S*)\\s*=\\s*(?<value>\\S*)\\s*$"
private val format = Pattern.compile(pattern)
def parseLines(lines: List[String]): ConfigOptions =
ConfigOptions(lines.flatMap(parseLine))
def parseLines(lines: Seq[String]): UIO[ConfigOptions] =
UIO(ConfigOptions(lines.flatMap(parseLine).toList))
private def parseLine(str: String) =
format.matcher(str) match {
@ -40,7 +34,7 @@ trait ParseConfigLines {
case _ => None
}
def truthy(value: String): Boolean =
private def truthy(value: String): Boolean =
value.toLowerCase match {
case "true" => true
case "yes" => true


@ -8,24 +8,16 @@ trait SourceConfigLoader {
val thorpConfigFileName = ".thorp.conf"
def loadSourceConfigs
: Sources => ZIO[FileSystem, List[ConfigValidation], ConfigOptions] =
sources => {
val sourceConfigOptions =
ConfigOptions(sources.paths.map(ConfigOption.Source))
val reduce: List[ConfigOptions] => ConfigOptions =
_.foldLeft(sourceConfigOptions) { (acc, co) =>
def loadSourceConfigs(
sources: Sources): ZIO[FileSystem, Seq[ConfigValidation], ConfigOptions] =
ZIO
.foreach(sources.paths) { path =>
ParseConfigFile.parseFile(path.resolve(thorpConfigFileName))
}
.map(_.foldLeft(ConfigOptions(sources.paths.map(ConfigOption.Source))) {
(acc, co) =>
acc ++ co
}
ZIO
.foreach(sources.paths) { path =>
ParseConfigFile.parseFile(path.resolve(thorpConfigFileName))
}
.map(reduce)
}
})
}


@ -1,12 +1,6 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.config
import net.kemitix.thorp.config.{
ConfigOption,
ConfigOptions,
ConfigQuery,
ConfigurationBuilder
}
import net.kemitix.thorp.domain.Sources
import net.kemitix.thorp.domain.{Sources, TemporaryFolder}
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec
import zio.DefaultRuntime


@ -1,8 +1,7 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.config
import java.nio.file.Paths
import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, ConfigQuery}
import net.kemitix.thorp.domain.Sources
import org.scalatest.FreeSpec


@ -1,12 +1,7 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.config
import java.nio.file.{Path, Paths}
import net.kemitix.thorp.config.{
ConfigOption,
ConfigOptions,
ConfigurationBuilder
}
import net.kemitix.thorp.domain.Filter.{Exclude, Include}
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem


@ -1,13 +1,7 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.config
import java.nio.file.{Path, Paths}
import net.kemitix.thorp.config.{
ConfigOption,
ConfigOptions,
ParseConfigFile,
Resource
}
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec
import zio.DefaultRuntime


@ -0,0 +1,88 @@
package net.kemitix.thorp.config
import java.nio.file.Paths
import org.scalatest.FunSpec
import zio.DefaultRuntime
class ParseConfigLinesTest extends FunSpec {
describe("parse single lines") {
describe("source") {
it("should parse") {
val expected =
Right(
ConfigOptions(
List(ConfigOption.Source(Paths.get("/path/to/source")))))
val result = invoke(List("source = /path/to/source"))
assertResult(expected)(result)
}
}
describe("bucket") {
it("should parse") {
val expected =
Right(ConfigOptions(List(ConfigOption.Bucket("bucket-name"))))
val result = invoke(List("bucket = bucket-name"))
assertResult(expected)(result)
}
}
describe("prefix") {
it("should parse") {
val expected =
Right(ConfigOptions(List(ConfigOption.Prefix("prefix/to/files"))))
val result = invoke(List("prefix = prefix/to/files"))
assertResult(expected)(result)
}
}
describe("include") {
it("should parse") {
val expected =
Right(ConfigOptions(List(ConfigOption.Include("path/to/include"))))
val result = invoke(List("include = path/to/include"))
assertResult(expected)(result)
}
}
describe("exclude") {
it("should parse") {
val expected =
Right(ConfigOptions(List(ConfigOption.Exclude("path/to/exclude"))))
val result = invoke(List("exclude = path/to/exclude"))
assertResult(expected)(result)
}
}
describe("debug - true") {
it("should parse") {
val expected = Right(ConfigOptions(List(ConfigOption.Debug())))
val result = invoke(List("debug = true"))
assertResult(expected)(result)
}
}
describe("debug - false") {
it("should parse") {
val expected = Right(ConfigOptions())
val result = invoke(List("debug = false"))
assertResult(expected)(result)
}
}
describe("comment line") {
it("should be ignored") {
val expected = Right(ConfigOptions())
val result = invoke(List("# ignore me"))
assertResult(expected)(result)
}
}
describe("unrecognised option") {
it("should be ignored") {
val expected = Right(ConfigOptions())
val result = invoke(List("unsupported = option"))
assertResult(expected)(result)
}
}
def invoke(lines: List[String]) = {
new DefaultRuntime {}.unsafeRunSync {
ParseConfigLines.parseLines(lines)
}.toEither
}
}
}


@ -13,35 +13,37 @@ object ActionGenerator {
): ZIO[Config, Nothing, Stream[Action]] =
for {
bucket <- Config.bucket
} yield
s3MetaData match {
// #1 local exists, remote exists, remote matches - do nothing
case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _)))
if localFile.matches(hash) =>
doNothing(bucket, key)
// #2 local exists, remote is missing, other matches - copy
case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty =>
copyFile(bucket, localFile, matchByHash)
// #3 local exists, remote is missing, other no matches - upload
case S3MetaData(localFile, matchByHash, None)
if matchByHash.isEmpty &&
isUploadAlreadyQueued(previousActions)(localFile) =>
uploadFile(bucket, localFile)
// #4 local exists, remote exists, remote no match, other matches - copy
case S3MetaData(localFile,
matchByHash,
Some(RemoteMetaData(_, hash, _)))
if !localFile.matches(hash) &&
matchByHash.nonEmpty =>
copyFile(bucket, localFile, matchByHash)
// #5 local exists, remote exists, remote no match, other no matches - upload
case S3MetaData(localFile, matchByHash, Some(_))
if matchByHash.isEmpty =>
uploadFile(bucket, localFile)
// fallback
case S3MetaData(localFile, _, _) =>
doNothing(bucket, localFile.remoteKey)
}
} yield genAction(s3MetaData, previousActions, bucket)
private def genAction(s3MetaData: S3MetaData,
previousActions: Stream[Action],
bucket: Bucket): Stream[Action] = {
s3MetaData match {
// #1 local exists, remote exists, remote matches - do nothing
case S3MetaData(localFile, _, Some(RemoteMetaData(key, hash, _)))
if localFile.matches(hash) =>
doNothing(bucket, key)
// #2 local exists, remote is missing, other matches - copy
case S3MetaData(localFile, matchByHash, None) if matchByHash.nonEmpty =>
copyFile(bucket, localFile, matchByHash)
// #3 local exists, remote is missing, other no matches - upload
case S3MetaData(localFile, matchByHash, None)
if matchByHash.isEmpty &&
isUploadAlreadyQueued(previousActions)(localFile) =>
uploadFile(bucket, localFile)
// #4 local exists, remote exists, remote no match, other matches - copy
case S3MetaData(localFile, matchByHash, Some(RemoteMetaData(_, hash, _)))
if !localFile.matches(hash) &&
matchByHash.nonEmpty =>
copyFile(bucket, localFile, matchByHash)
// #5 local exists, remote exists, remote no match, other no matches - upload
case S3MetaData(localFile, matchByHash, Some(_)) if matchByHash.isEmpty =>
uploadFile(bucket, localFile)
// fallback
case S3MetaData(localFile, _, _) =>
doNothing(bucket, localFile.remoteKey)
}
}
private def key = LocalFile.remoteKey ^|-> RemoteKey.key


@ -2,13 +2,14 @@ package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.api.Storage
import zio.ZIO
object CoreTypes {
type CoreEnv = Storage with Console with Config with FileSystem
type CoreEnv = Storage with Console with Config with FileSystem with Hasher
type CoreProgram[A] = ZIO[CoreEnv, Throwable, A]
}


@ -1,16 +0,0 @@
package net.kemitix.thorp.core
import java.nio.file.Path
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.filesystem.FileSystem
import zio.TaskR
/**
* Creates one, or more, hashes for local objects.
*/
trait HashService {
def hashLocalObject(path: Path): TaskR[FileSystem, Map[HashType, MD5Hash]]
}


@ -1,48 +1,43 @@
package net.kemitix.thorp.core
import java.io.File
import java.nio.file.Path
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.core.KeyGenerator.generateKey
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem
import zio.{Task, TaskR, ZIO}
object LocalFileStream {
def findFiles(hashService: HashService)(
def findFiles(
source: Path
): TaskR[Config with FileSystem, LocalFiles] = {
): TaskR[Config with FileSystem with Hasher, LocalFiles] = {
def recurseIntoSubDirectories(
path: Path): TaskR[Config with FileSystem, LocalFiles] =
path: Path): TaskR[Config with FileSystem with Hasher, LocalFiles] =
path.toFile match {
case f if f.isDirectory => loop(path)
case _ => pathToLocalFile(hashService)(path)
case _ => localFile(path)
}
def recurse(
paths: Stream[Path]): TaskR[Config with FileSystem, LocalFiles] =
def recurse(paths: Stream[Path])
: TaskR[Config with FileSystem with Hasher, LocalFiles] =
for {
recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path))
} yield LocalFiles.reduce(recursed.toStream)
def loop(path: Path): TaskR[Config with FileSystem, LocalFiles] = {
for {
paths <- dirPaths(path)
localFiles <- recurse(paths)
} yield localFiles
}
def loop(
path: Path): TaskR[Config with FileSystem with Hasher, LocalFiles] =
dirPaths(path) >>= recurse
loop(source)
}
private def dirPaths(path: Path) =
for {
paths <- listFiles(path)
filtered <- includedDirPaths(paths)
} yield filtered
listFiles(path) >>= includedDirPaths
private def includedDirPaths(paths: Stream[Path]) =
for {
@ -53,12 +48,12 @@ object LocalFileStream {
.filter({ case (_, included) => included })
.map({ case (path, _) => path })
private def localFile(hashService: HashService)(path: Path) = {
private def localFile(path: Path) = {
val file = path.toFile
for {
sources <- Config.sources
prefix <- Config.prefix
hash <- hashService.hashLocalObject(path)
hash <- Hasher.hashObject(path)
localFile = LocalFile(file,
sources.forPath(path).toFile,
hash,
@ -72,16 +67,17 @@ object LocalFileStream {
private def listFiles(path: Path) =
for {
files <- Task(path.toFile.listFiles)
_ <- Task.when(files == null)(
Task.fail(new IllegalArgumentException(s"Directory not found $path")))
_ <- filesMustExist(path, files)
} yield Stream(files: _*).map(_.toPath)
private def filesMustExist(path: Path, files: Array[File]) = {
Task.when(files == null)(
Task.fail(new IllegalArgumentException(s"Directory not found $path")))
}
private def isIncluded(path: Path) =
for {
filters <- Config.filters
} yield Filter.isIncluded(filters)(path)
private def pathToLocalFile(hashService: HashService)(path: Path) =
localFile(hashService)(path)
} yield Filter.isIncluded(path)(filters)
}


@ -3,30 +3,36 @@ package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action._
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.api.Storage
import zio.{TaskR, ZIO}
trait PlanBuilder {
object PlanBuilder {
def createPlan(hashService: HashService)
: TaskR[Storage with Console with Config with FileSystem, SyncPlan] =
for {
_ <- SyncLogging.logRunStart
actions <- buildPlan(hashService)
} yield actions
def createPlan
: TaskR[Storage with Console with Config with FileSystem with Hasher,
SyncPlan] =
SyncLogging.logRunStart *> buildPlan
private def buildPlan(hashService: HashService) =
private def buildPlan =
gatherMetadata >>= assemblePlan
private def gatherMetadata =
fetchRemoteData &&& findLocalFiles
private def fetchRemoteData =
for {
metadata <- gatherMetadata(hashService)
plan <- assemblePlan(metadata)
} yield plan
bucket <- Config.bucket
prefix <- Config.prefix
objects <- Storage.list(bucket, prefix)
} yield objects
private def assemblePlan(metadata: (S3ObjectsData, LocalFiles)) =
metadata match {
case (remoteData, localData) =>
createActions(remoteData, localData)
createActions(remoteData, localData.localFiles)
.map(_.filter(doesSomething).sortBy(SequencePlan.order))
.map(
SyncPlan(_, SyncTotals(localData.count, localData.totalSizeBytes)))
@ -34,11 +40,11 @@ trait PlanBuilder {
private def createActions(
remoteData: S3ObjectsData,
localData: LocalFiles
localFiles: Stream[LocalFile]
) =
for {
fileActions <- actionsForLocalFiles(remoteData, localData)
remoteActions <- actionsForRemoteKeys(remoteData)
fileActions <- actionsForLocalFiles(remoteData, localFiles)
remoteActions <- actionsForRemoteKeys(remoteData.byKey.keys)
} yield fileActions ++ remoteActions
private def doesSomething: Action => Boolean = {
@ -48,12 +54,10 @@ trait PlanBuilder {
private def actionsForLocalFiles(
remoteData: S3ObjectsData,
localData: LocalFiles
localFiles: Stream[LocalFile]
) =
ZIO.foldLeft(localData.localFiles)(Stream.empty[Action])(
(acc, localFile) =>
createActionFromLocalFile(remoteData, acc, localFile)
.map(actions => actions ++ acc))
ZIO.foldLeft(localFiles)(Stream.empty[Action])((acc, localFile) =>
createActionFromLocalFile(remoteData, acc, localFile).map(_ ++ acc))
private def createActionFromLocalFile(
remoteData: S3ObjectsData,
@ -64,11 +68,9 @@ trait PlanBuilder {
S3MetaDataEnricher.getMetadata(localFile, remoteData),
previousActions)
private def actionsForRemoteKeys(remoteData: S3ObjectsData) =
ZIO.foldLeft(remoteData.byKey.keys)(Stream.empty[Action]) {
(acc, remoteKey) =>
createActionFromRemoteKey(remoteKey).map(action => action #:: acc)
}
private def actionsForRemoteKeys(remoteKeys: Iterable[RemoteKey]) =
ZIO.foldLeft(remoteKeys)(Stream.empty[Action])((acc, remoteKey) =>
createActionFromRemoteKey(remoteKey).map(_ #:: acc))
private def createActionFromRemoteKey(remoteKey: RemoteKey) =
for {
@ -80,33 +82,13 @@ trait PlanBuilder {
if (needsDeleted) ToDelete(bucket, remoteKey, 0L)
else DoNothing(bucket, remoteKey, 0L)
private def gatherMetadata(hashService: HashService) =
for {
remoteData <- fetchRemoteData
localData <- findLocalFiles(hashService)
} yield (remoteData, localData)
private def findLocalFiles =
SyncLogging.logFileScan *> findFiles
private def fetchRemoteData =
for {
bucket <- Config.bucket
prefix <- Config.prefix
objects <- Storage.list(bucket, prefix)
} yield objects
private def findLocalFiles(hashService: HashService) =
for {
_ <- SyncLogging.logFileScan
localFiles <- findFiles(hashService)
} yield localFiles
private def findFiles(hashService: HashService) =
private def findFiles =
for {
sources <- Config.sources
paths = sources.paths
found <- ZIO.foreach(paths)(path =>
LocalFileStream.findFiles(hashService)(path))
found <- ZIO.foreach(sources.paths)(LocalFileStream.findFiles)
} yield LocalFiles.reduce(found.toStream)
}
object PlanBuilder extends PlanBuilder


@ -1,19 +0,0 @@
package net.kemitix.thorp.core
import java.nio.file.Path
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.filesystem.FileSystem
import zio.TaskR
case class SimpleHashService() extends HashService {
override def hashLocalObject(
path: Path
): TaskR[FileSystem, Map[HashType, MD5Hash]] =
for {
md5 <- MD5HashGenerator.md5File(path)
} yield Map(MD5 -> md5)
}


@ -33,13 +33,11 @@ trait SyncLogging {
actions: Stream[StorageQueueEvent]
): ZIO[Console, Nothing, Unit] = {
val counters = actions.foldLeft(Counters())(countActivities)
for {
_ <- Console.putStrLn(eraseToEndOfScreen)
_ <- Console.putStrLn(s"Uploaded ${counters.uploaded} files")
_ <- Console.putStrLn(s"Copied ${counters.copied} files")
_ <- Console.putStrLn(s"Deleted ${counters.deleted} files")
_ <- Console.putStrLn(s"Errors ${counters.errors}")
} yield ()
Console.putStrLn(eraseToEndOfScreen) *>
Console.putStrLn(s"Uploaded ${counters.uploaded} files") *>
Console.putStrLn(s"Copied ${counters.copied} files") *>
Console.putStrLn(s"Deleted ${counters.deleted} files") *>
Console.putStrLn(s"Errors ${counters.errors}")
}
private def countActivities: (Counters, StorageQueueEvent) => Counters =


@ -27,7 +27,7 @@ trait ThorpArchive {
def logEvent(
event: StorageQueueEvent
): TaskR[Console with Config, Unit] =
): TaskR[Console with Config, StorageQueueEvent] =
event match {
case UploadQueueEvent(remoteKey, _) =>
for {
@ -37,7 +37,7 @@ trait ThorpArchive {
_ <- TaskR.when(!batchMode)(
Console.putStrLn(
s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen"))
} yield ()
} yield event
case CopyQueueEvent(sourceKey, targetKey) =>
for {
batchMode <- Config.batchMode
@ -47,7 +47,7 @@ trait ThorpArchive {
Console.putStrLn(
s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen")
)
} yield ()
} yield event
case DeleteQueueEvent(remoteKey) =>
for {
batchMode <- Config.batchMode
@ -55,7 +55,7 @@ trait ThorpArchive {
_ <- TaskR.when(!batchMode)(
Console.putStrLn(
s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen"))
} yield ()
} yield event
case ErrorQueueEvent(action, _, e) =>
for {
batchMode <- Config.batchMode
@ -64,9 +64,9 @@ trait ThorpArchive {
s"${action.name} failed: ${action.keys}: ${e.getMessage}"))
_ <- TaskR.when(!batchMode)(Console.putStrLn(
s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen"))
} yield ()
case DoNothingQueueEvent(_) => TaskR(())
case ShutdownQueueEvent() => TaskR(())
} yield event
case DoNothingQueueEvent(_) => TaskR(event)
case ShutdownQueueEvent() => TaskR(event)
}
}


@ -18,20 +18,11 @@ case class UnversionedMirrorArchive(syncTotals: SyncTotals)
): TaskR[Storage with Console with Config, StorageQueueEvent] =
action match {
case ToUpload(bucket, localFile, _) =>
for {
event <- doUpload(index, totalBytesSoFar, bucket, localFile)
_ <- logEvent(event)
} yield event
doUpload(index, totalBytesSoFar, bucket, localFile) >>= logEvent
case ToCopy(bucket, sourceKey, hash, targetKey, _) =>
for {
event <- Storage.copy(bucket, sourceKey, hash, targetKey)
_ <- logEvent(event)
} yield event
Storage.copy(bucket, sourceKey, hash, targetKey) >>= logEvent
case ToDelete(bucket, remoteKey, _) =>
for {
event <- Storage.delete(bucket, remoteKey)
_ <- logEvent(event)
} yield event
Storage.delete(bucket, remoteKey) >>= logEvent
case DoNothing(_, remoteKey, _) =>
Task(DoNothingQueueEvent(remoteKey))
}


@ -0,0 +1,96 @@
package net.kemitix.thorp.core.hasher
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicReference
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.filesystem.FileSystem
import zio.{TaskR, ZIO}
/**
* Creates one, or more, hashes for local objects.
*/
trait Hasher {
val hasher: Hasher.Service
}
object Hasher {
trait Service {
def hashObject(
path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]]
def hashObjectChunk(
path: Path,
chunkNumber: Long,
chunkSize: Long): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]]
def hex(in: Array[Byte]): TaskR[Hasher, String]
def digest(in: String): TaskR[Hasher, Array[Byte]]
}
trait Live extends Hasher {
val hasher: Service = new Service {
override def hashObject(
path: Path): TaskR[FileSystem, Map[HashType, MD5Hash]] =
for {
md5 <- MD5HashGenerator.md5File(path)
} yield Map(MD5 -> md5)
override def hashObjectChunk(path: Path,
chunkNumber: Long,
chunkSize: Long)
: TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
for {
md5 <- MD5HashGenerator.md5FileChunk(path,
chunkNumber * chunkSize,
chunkSize)
} yield Map(MD5 -> md5)
override def hex(in: Array[Byte]): TaskR[Hasher, String] =
ZIO(MD5HashGenerator.hex(in))
override def digest(in: String): TaskR[Hasher, Array[Byte]] =
ZIO(MD5HashGenerator.digest(in))
}
}
object Live extends Live
trait Test extends Hasher {
val hashes: AtomicReference[Map[Path, Map[HashType, MD5Hash]]] =
new AtomicReference(Map.empty)
val hashChunks
: AtomicReference[Map[Path, Map[Long, Map[HashType, MD5Hash]]]] =
new AtomicReference(Map.empty)
val hasher: Service = new Service {
override def hashObject(
path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
ZIO(hashes.get()(path))
override def hashObjectChunk(path: Path,
chunkNumber: Long,
chunkSize: Long)
: TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
ZIO(hashChunks.get()(path)(chunkNumber))
override def hex(in: Array[Byte]): TaskR[Hasher, String] =
ZIO(MD5HashGenerator.hex(in))
override def digest(in: String): TaskR[Hasher, Array[Byte]] =
ZIO(MD5HashGenerator.digest(in))
}
}
object Test extends Test
final def hashObject(
path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
ZIO.accessM(_.hasher hashObject path)
final def hashObjectChunk(
path: Path,
chunkNumber: Long,
chunkSize: Long): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
ZIO.accessM(_.hasher hashObjectChunk (path, chunkNumber, chunkSize))
final def hex(in: Array[Byte]): TaskR[Hasher, String] =
ZIO.accessM(_.hasher hex in)
final def digest(in: String): TaskR[Hasher, Array[Byte]] =
ZIO.accessM(_.hasher digest in)
}


@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.core.hasher
import java.io.{File, FileInputStream}
import java.nio.file.Path
@ -10,7 +10,7 @@ import zio.{Task, TaskR}
import scala.collection.immutable.NumericRange
object MD5HashGenerator {
private object MD5HashGenerator {
val maxBufferSize = 8048
val defaultBuffer = new Array[Byte](maxBufferSize)
@ -48,13 +48,11 @@ object MD5HashGenerator {
offset: Long,
endOffset: Long
) =
FileSystem
.open(file, offset)
.flatMap { managedFileInputStream =>
managedFileInputStream.use { fileInputStream =>
digestFile(fileInputStream, offset, endOffset)
}
FileSystem.open(file, offset) >>= { managedFileInputStream =>
managedFileInputStream.use { fileInputStream =>
digestFile(fileInputStream, offset, endOffset)
}
}
private def digestFile(
fis: FileInputStream,


@ -1,14 +0,0 @@
package net.kemitix.thorp.core
import java.nio.file.Path
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import zio.Task
case class DummyHashService(hashes: Map[Path, Map[HashType, MD5Hash]])
extends HashService {
override def hashLocalObject(path: Path): Task[Map[HashType, MD5Hash]] =
Task(hashes(path))
}


@ -10,6 +10,7 @@ import net.kemitix.thorp.config.{
Resource
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem
@ -21,30 +22,17 @@ class LocalFileStreamSuite extends FunSpec {
private val source = Resource(this, "upload")
private val sourcePath = source.toPath
private val hashService: HashService = DummyHashService(
Map(
file("root-file") -> Map(MD5 -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash)
))
private def file(filename: String) =
sourcePath.resolve(Paths.get(filename))
private val configOptions = ConfigOptions(
List(
ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions,
ConfigOption.Source(sourcePath),
ConfigOption.Bucket("aBucket")
))
describe("findFiles") {
it("should find all files") {
val expected = Right(Set("subdir/leaf-file", "root-file"))
val result =
invoke()
.map(_.localFiles)
.map(localFiles => localFiles.map(_.relative.toString))
.map(_.map(_.relative.toString))
.map(_.toSet)
assertResult(expected)(result)
}
@ -61,9 +49,13 @@ class LocalFileStreamSuite extends FunSpec {
}
private def invoke() = {
type TestEnv = Storage with Console with Config with FileSystem
type TestEnv = Storage
with Console
with Config
with FileSystem
with Hasher.Test
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live
with FileSystem.Live {
with FileSystem.Live with Hasher.Test {
override def listResult: Task[S3ObjectsData] =
Task.die(new NotImplementedError)
override def uploadResult: UIO[StorageQueueEvent] =
@ -75,12 +67,23 @@ class LocalFileStreamSuite extends FunSpec {
override def shutdownResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
}
testEnv.hashes.set(
Map(
file("root-file") -> Map(MD5 -> MD5HashData.Root.hash),
file("subdir/leaf-file") -> Map(MD5 -> MD5HashData.Leaf.hash)
))
val configOptions = ConfigOptions(
List(
ConfigOption.IgnoreGlobalOptions,
ConfigOption.IgnoreUserOptions,
ConfigOption.Source(sourcePath),
ConfigOption.Bucket("aBucket")
))
def testProgram =
for {
config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- Config.set(config)
files <- LocalFileStream.findFiles(hashService)(sourcePath)
files <- LocalFileStream.findFiles(sourcePath)
} yield files
new DefaultRuntime {}.unsafeRunSync {


@ -1,83 +0,0 @@
package net.kemitix.thorp.core
import java.nio.file.Paths
import net.kemitix.thorp.config.{ConfigOption, ConfigOptions, ParseConfigLines}
import org.scalatest.FunSpec
class ParseConfigLinesTest extends FunSpec {
describe("parse single lines") {
describe("source") {
it("should parse") {
val expected =
ConfigOptions(List(ConfigOption.Source(Paths.get("/path/to/source"))))
val result =
ParseConfigLines.parseLines(List("source = /path/to/source"))
assertResult(expected)(result)
}
}
describe("bucket") {
it("should parse") {
val expected = ConfigOptions(List(ConfigOption.Bucket("bucket-name")))
val result = ParseConfigLines.parseLines(List("bucket = bucket-name"))
assertResult(expected)(result)
}
}
describe("prefix") {
it("should parse") {
val expected =
ConfigOptions(List(ConfigOption.Prefix("prefix/to/files")))
val result =
ParseConfigLines.parseLines(List("prefix = prefix/to/files"))
assertResult(expected)(result)
}
}
describe("include") {
it("should parse") {
val expected =
ConfigOptions(List(ConfigOption.Include("path/to/include")))
val result =
ParseConfigLines.parseLines(List("include = path/to/include"))
assertResult(expected)(result)
}
}
describe("exclude") {
it("should parse") {
val expected =
ConfigOptions(List(ConfigOption.Exclude("path/to/exclude")))
val result =
ParseConfigLines.parseLines(List("exclude = path/to/exclude"))
assertResult(expected)(result)
}
}
describe("debug - true") {
it("should parse") {
val expected = ConfigOptions(List(ConfigOption.Debug()))
val result = ParseConfigLines.parseLines(List("debug = true"))
assertResult(expected)(result)
}
}
describe("debug - false") {
it("should parse") {
val expected = ConfigOptions()
val result = ParseConfigLines.parseLines(List("debug = false"))
assertResult(expected)(result)
}
}
describe("comment line") {
it("should be ignored") {
val expected = ConfigOptions()
val result = ParseConfigLines.parseLines(List("# ignore me"))
assertResult(expected)(result)
}
}
describe("unrecognised option") {
it("should be ignored") {
val expected = ConfigOptions()
val result = ParseConfigLines.parseLines(List("unsupported = option"))
assertResult(expected)(result)
}
}
}
}


@ -11,6 +11,7 @@ import net.kemitix.thorp.config.{
}
import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem._
@ -25,8 +26,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
"create a plan" - {
val hashService = SimpleHashService()
"one source" - {
val options: Path => ConfigOptions =
source =>
@ -46,8 +45,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val expected =
Right(List(toUpload(remoteKey, hash, source, file)))
val result =
invoke(hashService,
options(source),
invoke(options(source),
UIO.succeed(emptyS3ObjectData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
@ -70,8 +68,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
byKey = Map(anOtherKey -> HashModified(aHash, lastModified))
)
val result =
invoke(hashService,
options(source),
invoke(options(source),
UIO.succeed(s3ObjectsData),
UIO.succeed(Map(aFile.toPath -> aFile,
anOtherFile.toPath -> anOtherFile)))
@ -94,8 +91,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
byKey = Map(remoteKey -> HashModified(hash, lastModified))
)
val result =
invoke(hashService,
options(source),
invoke(options(source),
UIO.succeed(s3ObjectsData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
@ -118,8 +114,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
Map(remoteKey -> HashModified(originalHash, lastModified))
)
val result =
invoke(hashService,
options(source),
invoke(options(source),
UIO.succeed(s3ObjectsData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
@ -139,8 +134,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
byKey = Map()
)
val result =
invoke(hashService,
options(source),
invoke(options(source),
UIO.succeed(s3ObjectsData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
@ -165,8 +159,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
byKey = Map(remoteKey -> HashModified(hash, lastModified))
)
val result =
invoke(hashService,
options(source),
invoke(options(source),
UIO.succeed(s3ObjectsData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result)
@ -183,8 +176,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
byKey = Map(remoteKey -> HashModified(hash, lastModified))
)
val result =
invoke(hashService,
options(source),
invoke(options(source),
UIO.succeed(s3ObjectsData),
UIO.succeed(Map.empty))
assertResult(expected)(result)
@ -222,7 +214,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
))
val result =
invoke(
hashService,
options(firstSource)(secondSource),
UIO.succeed(emptyS3ObjectData),
UIO.succeed(
@ -248,7 +239,6 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)))
val result =
invoke(
hashService,
options(firstSource)(secondSource),
UIO.succeed(emptyS3ObjectData),
UIO.succeed(
@ -273,8 +263,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
Map(hash2 -> Set(KeyModified(remoteKey2, lastModified))),
byKey = Map(remoteKey2 -> HashModified(hash2, lastModified)))
val result =
invoke(hashService,
options(firstSource)(secondSource),
invoke(options(firstSource)(secondSource),
UIO.succeed(s3ObjectData),
UIO.succeed(
Map(fileInSecondSource.toPath -> fileInSecondSource)))
@ -296,8 +285,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
Map(hash1 -> Set(KeyModified(remoteKey1, lastModified))),
byKey = Map(remoteKey1 -> HashModified(hash1, lastModified)))
val result =
invoke(hashService,
options(firstSource)(secondSource),
invoke(options(firstSource)(secondSource),
UIO.succeed(s3ObjectData),
UIO.succeed(
Map(fileInFirstSource.toPath -> fileInFirstSource)))
@ -314,8 +302,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val s3ObjectData = S3ObjectsData(byKey =
Map(remoteKey1 -> HashModified(MD5Hash(""), lastModified)))
val result =
invoke(hashService,
options(firstSource)(secondSource),
invoke(options(firstSource)(secondSource),
UIO.succeed(s3ObjectData),
UIO.succeed(Map.empty))
assertResult(expected)(result)
@ -326,12 +313,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
}
def md5Hash(file: File): MD5Hash = {
object TestEnv extends Hasher.Live with FileSystem.Live
new DefaultRuntime {}
.unsafeRunSync {
hashService
.hashLocalObject(file.toPath)
Hasher
.hashObject(file.toPath)
.map(_.get(MD5))
.provide(FileSystem.Live)
.provide(TestEnv)
}
.toEither
.toOption
@ -361,14 +349,13 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
ConfigOptions(List(configOptions: _*))
private def invoke(
hashService: HashService,
configOptions: ConfigOptions,
result: Task[S3ObjectsData],
files: Task[Map[Path, File]]
) = {
type TestEnv = Storage with Console with Config with FileSystem
type TestEnv = Storage with Console with Config with FileSystem with Hasher
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live
with FileSystem.Live {
with FileSystem.Live with Hasher.Live {
override def listResult: Task[S3ObjectsData] = result
override def uploadResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError)
@ -384,7 +371,7 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
for {
config <- ConfigurationBuilder.buildConfig(configOptions)
_ <- Config.set(config)
plan <- PlanBuilder.createPlan(hashService)
plan <- PlanBuilder.createPlan
} yield plan
new DefaultRuntime {}


@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.core.hasher
import java.nio.file.Path
@ -12,7 +12,7 @@ class MD5HashGeneratorTest extends FunSpec {
describe("md5File()") {
describe("read a small file (smaller than buffer)") {
val path = Resource(this, "upload/root-file").toPath
val path = Resource(this, "../upload/root-file").toPath
it("should generate the correct hash") {
val expected = Right(Root.hash)
val result = invoke(path)
@ -21,7 +21,7 @@ class MD5HashGeneratorTest extends FunSpec {
}
describe("read a large file (bigger than buffer)") {
val path = Resource(this, "big-file").toPath
val path = Resource(this, "../big-file").toPath
it("should generate the correct hash") {
val expected = Right(BigFile.hash)
val result = invoke(path)
@ -39,7 +39,7 @@ class MD5HashGeneratorTest extends FunSpec {
describe("md5FileChunk") {
describe("read chunks of file") {
val path = Resource(this, "big-file").toPath
val path = Resource(this, "../big-file").toPath
it("should generate the correct hash for first chunk of the file") {
val part1 = BigFile.Part1
val expected = Right(part1.hash.hash)


@ -7,7 +7,7 @@ sealed trait Filter
object Filter {
def isIncluded(filters: List[Filter])(p: Path): Boolean = {
def isIncluded(p: Path)(filters: List[Filter]): Boolean = {
sealed trait State
case class Unknown() extends State
case class Accepted() extends State


@ -0,0 +1,6 @@
package net.kemitix.thorp.domain
trait Monoid[T] {
def zero: T
def op(t1: T, t2: T): T
}


@ -15,12 +15,9 @@ import java.nio.file.Path
case class Sources(
paths: List[Path]
) {
def ++(path: Path): Sources = this ++ List(path)
def ++(otherPaths: List[Path]): Sources =
Sources(otherPaths.foldLeft(paths) { (acc, path) =>
if (acc.contains(path)) acc
else acc ++ List(path)
})
def +(path: Path)(implicit m: Monoid[Sources]): Sources = this ++ List(path)
def ++(otherPaths: List[Path])(implicit m: Monoid[Sources]): Sources =
m.op(this, Sources(otherPaths))
/**
* Returns the source path for the given path.
@ -28,3 +25,17 @@ case class Sources(
def forPath(path: Path): Path =
paths.find(source => path.startsWith(source)).get
}
object Sources {
final val emptySources = Sources(List.empty)
implicit def sourcesAppendMonoid: Monoid[Sources] = new Monoid[Sources] {
override def zero: Sources = emptySources
override def op(t1: Sources, t2: Sources): Sources =
Sources(t2.paths.foldLeft(t1.paths) { (acc, path) =>
if (acc.contains(path)) acc
else acc ++ List(path)
})
}
}


@ -86,7 +86,7 @@ class FiltersSuite extends FunSpec {
}
describe("isIncluded") {
def invoke(filters: List[Filter]) = {
paths.filter(path => Filter.isIncluded(filters)(path))
paths.filter(path => Filter.isIncluded(path)(filters))
}
describe("when there are no filters") {


@ -1,4 +1,4 @@
package net.kemitix.thorp.core
package net.kemitix.thorp.domain
import java.io.{File, IOException, PrintWriter}
import java.nio.file.attribute.BasicFileAttributes


@ -18,7 +18,7 @@ object FileSystem {
def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean]
def openManagedFileInputStream(file: File, offset: Long = 0L)
: TaskR[FileSystem, ZManaged[Any, Throwable, FileInputStream]]
def fileLines(file: File): TaskR[FileSystem, List[String]]
def fileLines(file: File): TaskR[FileSystem, Seq[String]]
}
trait Live extends FileSystem {
override val filesystem: Service = new Service {
@ -42,17 +42,11 @@ object FileSystem {
ZIO(ZManaged.make(acquire)(release))
}
override def fileLines(file: File): TaskR[FileSystem, List[String]] = {
override def fileLines(file: File): TaskR[FileSystem, Seq[String]] = {
def acquire = ZIO(Files.lines(file.toPath))
def release(lines: stream.Stream[String]) =
ZIO.effectTotal(lines.close())
def use(lines: stream.Stream[String]) =
ZIO.effectTotal(lines.iterator.asScala.toList)
ZIO.bracket(acquire)(release)(use)
acquire.bracketAuto(use)
}
}
}
@ -84,6 +78,6 @@ object FileSystem {
: TaskR[FileSystem, ZManaged[FileSystem, Throwable, FileInputStream]] =
ZIO.accessM(_.filesystem openManagedFileInputStream (file, offset))
final def lines(file: File): TaskR[FileSystem, List[String]] =
final def lines(file: File): TaskR[FileSystem, Seq[String]] =
ZIO.accessM(_.filesystem fileLines (file))
}


@ -14,48 +14,34 @@ import zio.{IO, Task, UIO}
trait Copier {
def copy(amazonS3: AmazonS3.Client)(
request: Request): UIO[StorageQueueEvent] =
copyObject(amazonS3)(request)
.fold(foldFailure(request.sourceKey, request.targetKey),
foldSuccess(request.sourceKey, request.targetKey))
case class Request(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
): UIO[StorageQueueEvent] =
copyObject(amazonS3)(bucket, sourceKey, hash, targetKey)
.fold(foldFailure(sourceKey, targetKey),
foldSuccess(sourceKey, targetKey))
)
private def copyObject(amazonS3: AmazonS3.Client)(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
): IO[S3ClientException, CopyObjectResult] = {
def handleResult
: Option[CopyObjectResult] => IO[S3ClientException, CopyObjectResult] =
maybeResult =>
IO.fromEither {
maybeResult
.toRight(HashError)
}
def handleError: Throwable => IO[S3ClientException, CopyObjectResult] =
error =>
Task.fail {
CopyError(error)
}
val request =
new CopyObjectRequest(
bucket.name,
sourceKey.key,
bucket.name,
targetKey.key
).withMatchingETagConstraint(hash.hash)
private def copyObject(amazonS3: AmazonS3.Client)(request: Request) =
amazonS3
.copyObject(request)
.fold(handleError, handleResult)
.copyObject(copyObjectRequest(request))
.fold(
error => Task.fail(CopyError(error)),
result => IO.fromEither(result.toRight(HashError))
)
.flatten
}
private def copyObjectRequest(copyRequest: Request) =
new CopyObjectRequest(
copyRequest.bucket.name,
copyRequest.sourceKey.key,
copyRequest.bucket.name,
copyRequest.targetKey.key
).withMatchingETagConstraint(copyRequest.hash.hash)
private def foldFailure(
sourceKey: RemoteKey,


@ -32,11 +32,7 @@ trait Lister {
token => request.withContinuationToken(token)
def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] =
request =>
for {
_ <- ListerLogger.logFetchBatch
batch <- tryFetchBatch(amazonS3)(request)
} yield batch
request => ListerLogger.logFetchBatch *> tryFetchBatch(amazonS3)(request)
def fetchMore: Option[Token] => TaskR[Console, Stream[S3ObjectSummary]] = {
case None => TaskR.succeed(Stream.empty)


@ -1,35 +0,0 @@
package net.kemitix.thorp.storage.aws
import java.nio.file.Path
import net.kemitix.thorp.core.{HashService, MD5HashGenerator}
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.filesystem.FileSystem
import zio.TaskR
trait S3HashService extends HashService {
/**
* Generates an MD5 Hash and an multi-part ETag
*
* @param path the local path to scan
* @return a set of hash values
*/
override def hashLocalObject(
path: Path
): TaskR[FileSystem, Map[HashType, MD5Hash]] =
for {
md5 <- MD5HashGenerator.md5File(path)
etag <- ETagGenerator.eTag(path).map(MD5Hash(_))
} yield
Map(
MD5 -> md5,
ETag -> etag
)
}
object S3HashService extends S3HashService {
lazy val defaultHashService: HashService = S3HashService
}


@ -38,7 +38,7 @@ object S3Storage {
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): UIO[StorageQueueEvent] =
Copier.copy(client)(bucket, sourceKey, hash, targetKey)
Copier.copy(client)(Copier.Request(bucket, sourceKey, hash, targetKey))
override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageQueueEvent] =


@ -28,22 +28,17 @@ trait Uploader {
.catchAll(handleError(localFile.remoteKey))
private def handleError(remoteKey: RemoteKey)(e: Throwable) =
UIO.succeed(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e))
UIO(ErrorQueueEvent(Action.Upload(remoteKey.key), remoteKey, e))
private def transfer(transferManager: => AmazonTransferManager)(
localFile: LocalFile,
bucket: Bucket,
uploadEventListener: UploadEventListener
) = {
val listener = progressListener(uploadEventListener)
for {
putObjectRequest <- request(localFile, bucket, listener)
event <- dispatch(transferManager, putObjectRequest)
} yield event
}
) =
request(localFile, bucket, progressListener(uploadEventListener)) >>=
dispatch(transferManager)
private def dispatch(
transferManager: AmazonTransferManager,
private def dispatch(transferManager: AmazonTransferManager)(
putObjectRequest: PutObjectRequest
) = {
transferManager


@ -1,29 +1,31 @@
package net.kemitix.thorp.storage.aws
package net.kemitix.thorp.storage.aws.hasher
import java.nio.file.Path
import com.amazonaws.services.s3.model.PutObjectRequest
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils
import net.kemitix.thorp.core.MD5HashGenerator
import net.kemitix.thorp.domain.MD5Hash
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.filesystem.FileSystem
import zio.{TaskR, ZIO}
trait ETagGenerator {
private trait ETagGenerator {
def eTag(
path: Path
): TaskR[FileSystem, String] = {
): TaskR[Hasher with FileSystem, String] = {
val partSize = calculatePartSize(path)
val parts = numParts(path.toFile.length, partSize)
ZIO
.foreach(partsIndex(parts))(digestChunk(path, partSize))
.map(concatenateDigests)
.map(MD5HashGenerator.hex)
eTagHex(path, partSize, parts)
.map(hash => s"$hash-$parts")
}
private def eTagHex(path: Path, partSize: Long, parts: Long) =
ZIO
.foreach(partsIndex(parts))(digestChunk(path, partSize))
.map(concatenateDigests) >>= Hasher.hex
private def partsIndex(parts: Long) =
Range.Long(0, parts, 1).toList
@ -51,14 +53,10 @@ trait ETagGenerator {
path: Path,
chunkSize: Long
)(chunkNumber: Long) =
hashChunk(path, chunkNumber, chunkSize).map(_.digest)
def hashChunk(
path: Path,
chunkNumber: Long,
chunkSize: Long
): TaskR[FileSystem, MD5Hash] =
MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize)
Hasher
.hashObjectChunk(path, chunkNumber, chunkSize)
.map(_(MD5))
.map(_.digest)
def offsets(
totalFileSizeBytes: Long,
@ -67,4 +65,4 @@ trait ETagGenerator {
Range.Long(0, totalFileSizeBytes, optimalPartSize).toList
}
object ETagGenerator extends ETagGenerator
private object ETagGenerator extends ETagGenerator


@ -0,0 +1,45 @@
package net.kemitix.thorp.storage.aws.hasher
import java.nio.file.Path
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.core.hasher.Hasher.Live.{hasher => CoreHasher}
import net.kemitix.thorp.core.hasher.Hasher.Service
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.aws.ETag
import zio.TaskR
object S3Hasher {
trait Live extends Hasher {
val hasher: Service = new Service {
/**
* Generates an MD5 Hash and an multi-part ETag
*
* @param path the local path to scan
* @return a set of hash values
*/
override def hashObject(
path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
for {
base <- CoreHasher.hashObject(path)
etag <- ETagGenerator.eTag(path).map(MD5Hash(_))
} yield base + (ETag -> etag)
override def hashObjectChunk(path: Path,
chunkNumber: Long,
chunkSize: Long)
: TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
CoreHasher.hashObjectChunk(path, chunkNumber, chunkSize)
override def hex(in: Array[Byte]): TaskR[Hasher, String] =
CoreHasher.hex(in)
override def digest(in: String): TaskR[Hasher, Array[Byte]] =
CoreHasher.digest(in)
}
}
}


@ -42,7 +42,8 @@ trait AmazonS3ClientTestFixture extends MockFactory {
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey): UIO[StorageQueueEvent] =
Copier.copy(client)(bucket, sourceKey, hash, targetKey)
Copier.copy(client)(
Copier.Request(bucket, sourceKey, hash, targetKey))
override def delete(bucket: Bucket,
remoteKey: RemoteKey): UIO[StorageQueueEvent] =

View file

@ -88,7 +88,8 @@ class CopierTest extends FreeSpec {
amazonS3Client: AmazonS3.Client
) =
runtime.unsafeRunSync {
Copier.copy(amazonS3Client)(bucket, sourceKey, hash, targetKey)
Copier.copy(amazonS3Client)(
Copier.Request(bucket, sourceKey, hash, targetKey))
}.toEither
}


@ -1,18 +1,20 @@
package net.kemitix.thorp.storage.aws
package net.kemitix.thorp.storage.aws.hasher
import java.nio.file.Path
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import net.kemitix.thorp.config.Resource
import net.kemitix.thorp.core.hasher.Hasher
import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec
import zio.DefaultRuntime
class ETagGeneratorTest extends FunSpec {
private val runtime = new DefaultRuntime {}
object TestEnv extends Hasher.Live with FileSystem.Live
private val bigFile = Resource(this, "big-file")
private val bigFile = Resource(this, "../big-file")
private val bigFilePath = bigFile.toPath
private val configuration = new TransferManagerConfiguration
private val chunkSize = 1200000
@ -41,14 +43,16 @@ class ETagGeneratorTest extends FunSpec {
md5Hashes.foreach {
case (hash, index) =>
assertResult(Right(hash))(
invoke(bigFilePath, index, chunkSize).map(_.hash))
invoke(bigFilePath, index, chunkSize)
.map(_(MD5))
.map(_.hash))
}
}
def invoke(path: Path, index: Long, size: Long) =
new DefaultRuntime {}.unsafeRunSync {
ETagGenerator
.hashChunk(path, index, size)
.provide(FileSystem.Live)
Hasher
.hashObjectChunk(path, index, size)
.provide(TestEnv)
}.toEither
}
@ -58,12 +62,13 @@ class ETagGeneratorTest extends FunSpec {
val result = invoke(bigFilePath)
assertResult(Right(expected))(result)
}
def invoke(path: Path) =
def invoke(path: Path) = {
new DefaultRuntime {}.unsafeRunSync {
ETagGenerator
.eTag(path)
.provide(FileSystem.Live)
.provide(TestEnv)
}.toEither
}
}
}