Create FileSystem as full ZIO effect module (#135)

* [changelog] updated

* [sbt] Add filesystem module

* [filesystem] stub effect module

* [filesystem] Add fileExists

* [filesystem] Add openFile

* [filesystem] Add fileLines

* [config] Use FileSystem.fileExists

* [filesystem] FileSystem.Test implement test methods

* [filesystem] Replace package object with FS object

* [storage] merge storage package object into Storage object

* [console] merge console package with Console object
This commit is contained in:
Paul Campbell 2019-07-30 08:07:26 +01:00 committed by GitHub
parent f6ce262f2b
commit a93781007d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 394 additions and 244 deletions

View file

@ -21,6 +21,7 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
- [internal] Don't use String as key in Map for hashes (#124) - [internal] Don't use String as key in Map for hashes (#124)
- [internal] Convert Storage to full ZIO effect module (#133) - [internal] Convert Storage to full ZIO effect module (#133)
- [internal] Convert Config to full ZIO effect module (#134) - [internal] Convert Config to full ZIO effect module (#134)
- [internal] Create FileSystem ZIO effect module (#135)
** Dependencies ** Dependencies

View file

@ -59,6 +59,7 @@ val zioDependencies = Seq(
// cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> domain // cli -> thorp-lib -> storage-aws -> core -> storage-api -> console -> domain
// core -> config -> domain // core -> config -> domain
// config -> filesystem
lazy val thorp = (project in file(".")) lazy val thorp = (project in file("."))
.settings(commonSettings) .settings(commonSettings)
@ -122,6 +123,13 @@ lazy val config = (project in file("config"))
.settings(commandLineParsing) .settings(commandLineParsing)
.settings(assemblyJarName in assembly := "config.jar") .settings(assemblyJarName in assembly := "config.jar")
.dependsOn(domain) .dependsOn(domain)
.dependsOn(filesystem)
lazy val filesystem = (project in file("filesystem"))
.settings(commonSettings)
.settings(zioDependencies)
.settings(testDependencies)
.settings(assemblyJarName in assembly := "filesystem.jar")
lazy val domain = (project in file("domain")) lazy val domain = (project in file("domain"))
.settings(commonSettings) .settings(commonSettings)

View file

@ -2,12 +2,17 @@ package net.kemitix.thorp.cli
import net.kemitix.thorp.config.Config import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console import net.kemitix.thorp.console.Console
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.aws.S3Storage import net.kemitix.thorp.storage.aws.S3Storage
import zio.{App, ZIO} import zio.{App, ZIO}
object Main extends App { object Main extends App {
object LiveThorpApp extends S3Storage.Live with Console.Live with Config.Live object LiveThorpApp
extends S3Storage.Live
with Console.Live
with Config.Live
with FileSystem.Live
override def run(args: List[String]): ZIO[Environment, Nothing, Int] = override def run(args: List[String]): ZIO[Environment, Nothing, Int] =
Program Program

View file

@ -17,7 +17,7 @@ trait Program {
cli <- CliArgs.parse(args) cli <- CliArgs.parse(args)
config <- ConfigurationBuilder.buildConfig(cli) config <- ConfigurationBuilder.buildConfig(cli)
_ <- setConfiguration(config) _ <- setConfiguration(config)
_ <- ZIO.when(showVersion(cli))(putStrLn(version)) _ <- ZIO.when(showVersion(cli))(Console.putStrLn(version))
_ <- ZIO.when(!showVersion(cli))(execute.catchAll(handleErrors)) _ <- ZIO.when(!showVersion(cli))(execute.catchAll(handleErrors))
} yield () } yield ()
} }
@ -37,10 +37,10 @@ trait Program {
private def handleErrors(throwable: Throwable) = private def handleErrors(throwable: Throwable) =
for { for {
_ <- putStrLn("There were errors:") _ <- Console.putStrLn("There were errors:")
_ <- throwable match { _ <- throwable match {
case ConfigValidationException(errors) => case ConfigValidationException(errors) =>
ZIO.foreach(errors)(error => putStrLn(s"- $error")) ZIO.foreach(errors)(error => Console.putStrLn(s"- $error"))
} }
} yield () } yield ()

View file

@ -2,7 +2,8 @@ package net.kemitix.thorp.config
import java.nio.file.Paths import java.nio.file.Paths
import zio.{IO, TaskR} import net.kemitix.thorp.filesystem.FileSystem
import zio.ZIO
/** /**
* Builds a configuration from settings in a file within the * Builds a configuration from settings in a file within the
@ -15,15 +16,14 @@ trait ConfigurationBuilder {
private val userHome = Paths.get(System.getProperty("user.home")) private val userHome = Paths.get(System.getProperty("user.home"))
def buildConfig(priorityOpts: ConfigOptions) def buildConfig(priorityOpts: ConfigOptions)
: IO[ConfigValidationException, Configuration] = : ZIO[FileSystem, ConfigValidationException, Configuration] =
(for { (for {
config <- getConfigOptions(priorityOpts).map(collateOptions) config <- getConfigOptions(priorityOpts).map(collateOptions)
valid <- ConfigValidator.validateConfig(config) valid <- ConfigValidator.validateConfig(config)
} yield valid) } yield valid)
.catchAll(errors => TaskR.fail(ConfigValidationException(errors))) .catchAll(errors => ZIO.fail(ConfigValidationException(errors)))
private def getConfigOptions( private def getConfigOptions(priorityOpts: ConfigOptions) =
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
for { for {
sourceOpts <- SourceConfigLoader.loadSourceConfigs( sourceOpts <- SourceConfigLoader.loadSourceConfigs(
ConfigQuery.sources(priorityOpts)) ConfigQuery.sources(priorityOpts))
@ -31,15 +31,13 @@ trait ConfigurationBuilder {
globalOpts <- globalOptions(priorityOpts ++ sourceOpts ++ userOpts) globalOpts <- globalOptions(priorityOpts ++ sourceOpts ++ userOpts)
} yield priorityOpts ++ sourceOpts ++ userOpts ++ globalOpts } yield priorityOpts ++ sourceOpts ++ userOpts ++ globalOpts
private val emptyConfig = IO.succeed(ConfigOptions()) private val emptyConfig = ZIO.succeed(ConfigOptions())
private def userOptions( private def userOptions(priorityOpts: ConfigOptions) =
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
if (ConfigQuery.ignoreUserOptions(priorityOpts)) emptyConfig if (ConfigQuery.ignoreUserOptions(priorityOpts)) emptyConfig
else ParseConfigFile.parseFile(userHome.resolve(userConfigFilename)) else ParseConfigFile.parseFile(userHome.resolve(userConfigFilename))
private def globalOptions( private def globalOptions(priorityOpts: ConfigOptions) =
priorityOpts: ConfigOptions): IO[List[ConfigValidation], ConfigOptions] =
if (ConfigQuery.ignoreGlobalOptions(priorityOpts)) emptyConfig if (ConfigQuery.ignoreGlobalOptions(priorityOpts)) emptyConfig
else ParseConfigFile.parseFile(globalConfig) else ParseConfigFile.parseFile(globalConfig)

View file

@ -1,32 +1,25 @@
package net.kemitix.thorp.config package net.kemitix.thorp.config
import java.nio.file.{Files, Path} import java.nio.file.Path
import zio.{IO, Task} import net.kemitix.thorp.filesystem.{FS, FileSystem}
import zio.{IO, ZIO}
import scala.collection.JavaConverters._
trait ParseConfigFile { trait ParseConfigFile {
def parseFile(filename: Path): IO[List[ConfigValidation], ConfigOptions] = def parseFile(
filename: Path): ZIO[FileSystem, List[ConfigValidation], ConfigOptions] =
readFile(filename) readFile(filename)
.map(ParseConfigLines.parseLines) .map(ParseConfigLines.parseLines)
.catchAll(h => .catchAll(h =>
IO.fail( IO.fail(
List(ConfigValidation.ErrorReadingFile(filename, h.getMessage)))) List(ConfigValidation.ErrorReadingFile(filename, h.getMessage))))
private def readFile(filename: Path): Task[List[String]] = { private def readFile(filename: Path) =
if (Files.exists(filename)) readFileThatExists(filename) FS.exists(filename.toFile)
else IO(List()) .flatMap(
} if (_) FS.lines(filename.toFile)
else ZIO.succeed(List.empty))
private def readFileThatExists(filename: Path): Task[List[String]] =
for {
lines <- IO(Files.lines(filename))
list = lines.iterator.asScala.toList
//FIXME: use a bracket to close the file
_ <- IO(lines.close())
} yield list
} }

View file

@ -1,13 +1,15 @@
package net.kemitix.thorp.config package net.kemitix.thorp.config
import net.kemitix.thorp.domain.Sources import net.kemitix.thorp.domain.Sources
import zio.IO import net.kemitix.thorp.filesystem.FileSystem
import zio.ZIO
trait SourceConfigLoader { trait SourceConfigLoader {
val thorpConfigFileName = ".thorp.conf" val thorpConfigFileName = ".thorp.conf"
def loadSourceConfigs: Sources => IO[List[ConfigValidation], ConfigOptions] = def loadSourceConfigs
: Sources => ZIO[FileSystem, List[ConfigValidation], ConfigOptions] =
sources => { sources => {
val sourceConfigOptions = val sourceConfigOptions =
@ -18,7 +20,8 @@ trait SourceConfigLoader {
acc ++ co acc ++ co
} }
IO.foreach(sources.paths) { path => ZIO
.foreach(sources.paths) { path =>
ParseConfigFile.parseFile(path.resolve(thorpConfigFileName)) ParseConfigFile.parseFile(path.resolve(thorpConfigFileName))
} }
.map(reduce) .map(reduce)

View file

@ -51,4 +51,13 @@ object Console {
object Test extends Test object Test extends Test
final val consoleService: ZIO[Console, Nothing, Console.Service] =
ZIO.access(_.console)
final def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
ZIO.accessM(_.console putStrLn line)
final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
ZIO.accessM(_.console putStrLn line)
} }

View file

@ -1,16 +0,0 @@
package net.kemitix.thorp
import zio.ZIO
package object console {
final val consoleService: ZIO[Console, Nothing, Console.Service] =
ZIO.access(_.console)
final def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
ZIO.accessM(_.console putStrLn line)
final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
ZIO.accessM(_.console putStrLn line)
}

View file

@ -2,12 +2,13 @@ package net.kemitix.thorp.core
import net.kemitix.thorp.config.Config import net.kemitix.thorp.config.Config
import net.kemitix.thorp.console.Console import net.kemitix.thorp.console.Console
import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.api.Storage import net.kemitix.thorp.storage.api.Storage
import zio.ZIO import zio.ZIO
object CoreTypes { object CoreTypes {
type CoreEnv = Storage with Console with Config type CoreEnv = Storage with Console with Config with FileSystem
type CoreProgram[A] = ZIO[CoreEnv, Throwable, A] type CoreProgram[A] = ZIO[CoreEnv, Throwable, A]
} }

View file

@ -0,0 +1,16 @@
package net.kemitix.thorp.core
import java.nio.file.Path
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.filesystem.FileSystem
import zio.TaskR
/**
* Creates one, or more, hashes for local objects.
*/
trait HashService {
def hashLocalObject(path: Path): TaskR[FileSystem, Map[HashType, MD5Hash]]
}

View file

@ -5,27 +5,29 @@ import java.nio.file.Path
import net.kemitix.thorp.config._ import net.kemitix.thorp.config._
import net.kemitix.thorp.core.KeyGenerator.generateKey import net.kemitix.thorp.core.KeyGenerator.generateKey
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.filesystem.FileSystem
import zio.{Task, TaskR, ZIO} import zio.{Task, TaskR, ZIO}
object LocalFileStream { object LocalFileStream {
def findFiles(hashService: HashService)( def findFiles(hashService: HashService)(
source: Path source: Path
): TaskR[Config, LocalFiles] = { ): TaskR[Config with FileSystem, LocalFiles] = {
def recurseIntoSubDirectories(path: Path): TaskR[Config, LocalFiles] = def recurseIntoSubDirectories(
path: Path): TaskR[Config with FileSystem, LocalFiles] =
path.toFile match { path.toFile match {
case f if f.isDirectory => loop(path) case f if f.isDirectory => loop(path)
case _ => pathToLocalFile(hashService)(path) case _ => pathToLocalFile(hashService)(path)
} }
def recurse(paths: Stream[Path]): TaskR[Config, LocalFiles] = def recurse(
paths: Stream[Path]): TaskR[Config with FileSystem, LocalFiles] =
for { for {
recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path)) recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path))
} yield LocalFiles.reduce(recursed.toStream) } yield LocalFiles.reduce(recursed.toStream)
def loop(path: Path): TaskR[Config, LocalFiles] = { def loop(path: Path): TaskR[Config with FileSystem, LocalFiles] = {
for { for {
paths <- dirPaths(path) paths <- dirPaths(path)

View file

@ -5,7 +5,8 @@ import java.nio.file.Path
import java.security.MessageDigest import java.security.MessageDigest
import net.kemitix.thorp.domain.MD5Hash import net.kemitix.thorp.domain.MD5Hash
import zio.{Task, UIO, ZManaged} import net.kemitix.thorp.filesystem.{FS, FileSystem}
import zio.{Task, TaskR}
import scala.collection.immutable.NumericRange import scala.collection.immutable.NumericRange
@ -26,14 +27,14 @@ object MD5HashGenerator {
md5.digest md5.digest
} }
def md5File(path: Path): Task[MD5Hash] = def md5File(path: Path): TaskR[FileSystem, MD5Hash] =
md5FileChunk(path, 0, path.toFile.length) md5FileChunk(path, 0, path.toFile.length)
def md5FileChunk( def md5FileChunk(
path: Path, path: Path,
offset: Long, offset: Long,
size: Long size: Long
): Task[MD5Hash] = { ): TaskR[FileSystem, MD5Hash] = {
val file = path.toFile val file = path.toFile
val endOffset = Math.min(offset + size, file.length) val endOffset = Math.min(offset + size, file.length)
for { for {
@ -47,22 +48,12 @@ object MD5HashGenerator {
offset: Long, offset: Long,
endOffset: Long endOffset: Long
) = ) =
openFile(file, offset) FS.open(file, offset)
.use(digestFile(_, offset, endOffset)) .flatMap { managedFileInputStream =>
managedFileInputStream.use { fileInputStream =>
private def openFile( digestFile(fileInputStream, offset, endOffset)
file: File, }
offset: Long
) =
ZManaged.make {
Task {
val stream = new FileInputStream(file)
stream skip offset
stream
} }
}(closeFile)
private def closeFile(fis: FileInputStream) = UIO(fis.close())
private def digestFile( private def digestFile(
fis: FileInputStream, fis: FileInputStream,

View file

@ -4,14 +4,14 @@ import net.kemitix.thorp.config._
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action._ import net.kemitix.thorp.core.Action._
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage._ import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.api.{HashService, Storage} import net.kemitix.thorp.storage.api.Storage
import zio.{TaskR, ZIO} import zio.{TaskR, ZIO}
trait PlanBuilder { trait PlanBuilder {
def createPlan(hashService: HashService) def createPlan(hashService: HashService)
: TaskR[Storage with Console with Config, SyncPlan] = : TaskR[Storage with Console with Config with FileSystem, SyncPlan] =
for { for {
_ <- SyncLogging.logRunStart _ <- SyncLogging.logRunStart
actions <- buildPlan(hashService) actions <- buildPlan(hashService)
@ -75,7 +75,7 @@ trait PlanBuilder {
bucket <- getBucket bucket <- getBucket
prefix <- getPrefix prefix <- getPrefix
sources <- getSources sources <- getSources
needsDeleted = remoteKey.isMissingLocally(sources, prefix) needsDeleted <- Remote.isMissingLocally(sources, prefix)(remoteKey)
} yield } yield
if (needsDeleted) ToDelete(bucket, remoteKey, 0L) if (needsDeleted) ToDelete(bucket, remoteKey, 0L)
else DoNothing(bucket, remoteKey, 0L) else DoNothing(bucket, remoteKey, 0L)
@ -90,7 +90,7 @@ trait PlanBuilder {
for { for {
bucket <- getBucket bucket <- getBucket
prefix <- getPrefix prefix <- getPrefix
objects <- listObjects(bucket, prefix) objects <- Storage.list(bucket, prefix)
} yield objects } yield objects
private def findLocalFiles(hashService: HashService) = private def findLocalFiles(hashService: HashService) =

View file

@ -0,0 +1,30 @@
package net.kemitix.thorp.core
import java.nio.file.Path
import net.kemitix.thorp.domain.{RemoteKey, Sources}
import net.kemitix.thorp.filesystem.{FS, FileSystem}
import zio.{TaskR, ZIO}
object Remote {
def isMissingLocally(sources: Sources, prefix: RemoteKey)(
remoteKey: RemoteKey
): TaskR[FileSystem, Boolean] =
existsLocally(sources, prefix)(remoteKey)
.map(exists => !exists)
def existsLocally(sources: Sources, prefix: RemoteKey)(
remoteKey: RemoteKey
): TaskR[FileSystem, Boolean] = {
def existsInSource(source: Path) =
remoteKey.asFile(source, prefix) match {
case Some(file) => FS.exists(file)
case None => ZIO.succeed(false)
}
ZIO
.foreach(sources.paths)(existsInSource)
.map(lb => lb.exists(l => l))
}
}

View file

@ -4,14 +4,14 @@ import java.nio.file.Path
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.{HashType, MD5Hash} import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.filesystem.FileSystem
import zio.Task import zio.TaskR
case class SimpleHashService() extends HashService { case class SimpleHashService() extends HashService {
override def hashLocalObject( override def hashLocalObject(
path: Path path: Path
): Task[Map[HashType, MD5Hash]] = ): TaskR[FileSystem, Map[HashType, MD5Hash]] =
for { for {
md5 <- MD5HashGenerator.md5File(path) md5 <- MD5HashGenerator.md5File(path)
} yield Map(MD5 -> md5) } yield Map(MD5 -> md5)

View file

@ -19,13 +19,14 @@ trait SyncLogging {
bucket <- getBucket bucket <- getBucket
prefix <- getPrefix prefix <- getPrefix
sources <- getSources sources <- getSources
_ <- putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources)) _ <- Console.putMessageLn(ConsoleOut.ValidConfig(bucket, prefix, sources))
} yield () } yield ()
def logFileScan: ZIO[Config with Console, Nothing, Unit] = def logFileScan: ZIO[Config with Console, Nothing, Unit] =
for { for {
sources <- getSources sources <- getSources
_ <- putStrLn(s"Scanning local files: ${sources.paths.mkString(", ")}...") _ <- Console.putStrLn(
s"Scanning local files: ${sources.paths.mkString(", ")}...")
} yield () } yield ()
def logRunFinished( def logRunFinished(
@ -33,11 +34,11 @@ trait SyncLogging {
): ZIO[Console, Nothing, Unit] = { ): ZIO[Console, Nothing, Unit] = {
val counters = actions.foldLeft(Counters())(countActivities) val counters = actions.foldLeft(Counters())(countActivities)
for { for {
_ <- putStrLn(eraseToEndOfScreen) _ <- Console.putStrLn(eraseToEndOfScreen)
_ <- putStrLn(s"Uploaded ${counters.uploaded} files") _ <- Console.putStrLn(s"Uploaded ${counters.uploaded} files")
_ <- putStrLn(s"Copied ${counters.copied} files") _ <- Console.putStrLn(s"Copied ${counters.copied} files")
_ <- putStrLn(s"Deleted ${counters.deleted} files") _ <- Console.putStrLn(s"Deleted ${counters.deleted} files")
_ <- putStrLn(s"Errors ${counters.errors}") _ <- Console.putStrLn(s"Errors ${counters.errors}")
} yield () } yield ()
} }

View file

@ -31,32 +31,34 @@ trait ThorpArchive {
event match { event match {
case UploadQueueEvent(remoteKey, _) => case UploadQueueEvent(remoteKey, _) =>
for { for {
_ <- TaskR.when(batchMode)(putStrLn(s"Uploaded: ${remoteKey.key}")) _ <- TaskR.when(batchMode)(
Console.putStrLn(s"Uploaded: ${remoteKey.key}"))
_ <- TaskR.when(!batchMode)( _ <- TaskR.when(!batchMode)(
putStrLn( Console.putStrLn(
s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen")) s"${GREEN}Uploaded:$RESET ${remoteKey.key}$eraseToEndOfScreen"))
} yield () } yield ()
case CopyQueueEvent(sourceKey, targetKey) => case CopyQueueEvent(sourceKey, targetKey) =>
for { for {
_ <- TaskR.when(batchMode)( _ <- TaskR.when(batchMode)(
putStrLn(s"Copied: ${sourceKey.key} => ${targetKey.key}")) Console.putStrLn(s"Copied: ${sourceKey.key} => ${targetKey.key}"))
_ <- TaskR.when(!batchMode)( _ <- TaskR.when(!batchMode)(
putStrLn( Console.putStrLn(
s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen") s"${GREEN}Copied:$RESET ${sourceKey.key} => ${targetKey.key}$eraseToEndOfScreen")
) )
} yield () } yield ()
case DeleteQueueEvent(remoteKey) => case DeleteQueueEvent(remoteKey) =>
for { for {
_ <- TaskR.when(batchMode)(putStrLn(s"Deleted: $remoteKey")) _ <- TaskR.when(batchMode)(Console.putStrLn(s"Deleted: $remoteKey"))
_ <- TaskR.when(!batchMode)( _ <- TaskR.when(!batchMode)(
putStrLn( Console.putStrLn(
s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen")) s"${GREEN}Deleted:$RESET ${remoteKey.key}$eraseToEndOfScreen"))
} yield () } yield ()
case ErrorQueueEvent(action, _, e) => case ErrorQueueEvent(action, _, e) =>
for { for {
_ <- TaskR.when(batchMode)( _ <- TaskR.when(batchMode)(
putStrLn(s"${action.name} failed: ${action.keys}: ${e.getMessage}")) Console.putStrLn(
_ <- TaskR.when(!batchMode)(putStrLn( s"${action.name} failed: ${action.keys}: ${e.getMessage}"))
_ <- TaskR.when(!batchMode)(Console.putStrLn(
s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen")) s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen"))
} yield () } yield ()
case DoNothingQueueEvent(_) => TaskR(()) case DoNothingQueueEvent(_) => TaskR(())

View file

@ -4,7 +4,6 @@ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage._
import net.kemitix.thorp.storage.api.Storage import net.kemitix.thorp.storage.api.Storage
import zio.{Task, TaskR} import zio.{Task, TaskR}
@ -26,12 +25,12 @@ case class UnversionedMirrorArchive(
} yield event } yield event
case ToCopy(bucket, sourceKey, hash, targetKey, _) => case ToCopy(bucket, sourceKey, hash, targetKey, _) =>
for { for {
event <- copyObject(bucket, sourceKey, hash, targetKey) event <- Storage.copy(bucket, sourceKey, hash, targetKey)
_ <- logEvent(event, batchMode) _ <- logEvent(event, batchMode)
} yield event } yield event
case ToDelete(bucket, remoteKey, _) => case ToDelete(bucket, remoteKey, _) =>
for { for {
event <- deleteObject(bucket, remoteKey) event <- Storage.delete(bucket, remoteKey)
_ <- logEvent(event, batchMode) _ <- logEvent(event, batchMode)
} yield event } yield event
case DoNothing(_, remoteKey, _) => case DoNothing(_, remoteKey, _) =>
@ -44,7 +43,8 @@ case class UnversionedMirrorArchive(
bucket: Bucket, bucket: Bucket,
localFile: LocalFile localFile: LocalFile
) = ) =
upload(localFile, Storage.upload(
localFile,
bucket, bucket,
batchMode, batchMode,
UploadEventListener(localFile, index, syncTotals, totalBytesSoFar), UploadEventListener(localFile, index, syncTotals, totalBytesSoFar),

View file

@ -6,6 +6,7 @@ import net.kemitix.thorp.config._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime import zio.DefaultRuntime
@ -163,8 +164,8 @@ class ActionGeneratorSuite extends FunSpec {
input: S3MetaData, input: S3MetaData,
previousActions: Stream[Action] previousActions: Stream[Action]
) = { ) = {
type TestEnv = Config type TestEnv = Config with FileSystem
val testEnv: TestEnv = new Config.Live {} val testEnv: TestEnv = new Config.Live with FileSystem.Live {}
def testProgram = def testProgram =
for { for {

View file

@ -7,6 +7,7 @@ import net.kemitix.thorp.config.{
ConfigurationBuilder ConfigurationBuilder
} }
import net.kemitix.thorp.domain.Sources import net.kemitix.thorp.domain.Sources
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime import zio.DefaultRuntime
@ -34,9 +35,10 @@ class ConfigOptionTest extends FunSpec with TemporaryFolder {
} }
private def invoke(configOptions: ConfigOptions) = { private def invoke(configOptions: ConfigOptions) = {
val runtime = new DefaultRuntime {} new DefaultRuntime {}.unsafeRunSync {
runtime.unsafeRunSync { ConfigurationBuilder
ConfigurationBuilder.buildConfig(configOptions) .buildConfig(configOptions)
.provide(FileSystem.Live)
}.toEither }.toEither
} }
} }

View file

@ -9,6 +9,7 @@ import net.kemitix.thorp.config.{
} }
import net.kemitix.thorp.domain.Filter.{Exclude, Include} import net.kemitix.thorp.domain.Filter.{Exclude, Include}
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime import zio.DefaultRuntime
@ -167,9 +168,10 @@ class ConfigurationBuilderTest extends FunSpec with TemporaryFolder {
} }
private def invoke(configOptions: ConfigOptions) = { private def invoke(configOptions: ConfigOptions) = {
val runtime = new DefaultRuntime {} new DefaultRuntime {}.unsafeRunSync {
runtime.unsafeRunSync { ConfigurationBuilder
ConfigurationBuilder.buildConfig(configOptions) .buildConfig(configOptions)
.provide(FileSystem.Live)
}.toEither }.toEither
} }

View file

@ -3,7 +3,6 @@ package net.kemitix.thorp.core
import java.nio.file.Path import java.nio.file.Path
import net.kemitix.thorp.domain.{HashType, MD5Hash} import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.storage.api.HashService
import zio.Task import zio.Task
case class DummyHashService(hashes: Map[Path, Map[HashType, MD5Hash]]) case class DummyHashService(hashes: Map[Path, Map[HashType, MD5Hash]])

View file

@ -6,7 +6,8 @@ import net.kemitix.thorp.config._
import net.kemitix.thorp.console._ import net.kemitix.thorp.console._
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, Storage} import net.kemitix.thorp.filesystem.FileSystem
import net.kemitix.thorp.storage.api.Storage
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.{DefaultRuntime, Task, UIO} import zio.{DefaultRuntime, Task, UIO}
@ -54,8 +55,9 @@ class LocalFileStreamSuite extends FunSpec {
} }
private def invoke() = { private def invoke() = {
type TestEnv = Storage with Console with Config type TestEnv = Storage with Console with Config with FileSystem
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live { val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live
with FileSystem.Live {
override def listResult: Task[S3ObjectsData] = override def listResult: Task[S3ObjectsData] =
Task.die(new NotImplementedError) Task.die(new NotImplementedError)
override def uploadResult: UIO[StorageQueueEvent] = override def uploadResult: UIO[StorageQueueEvent] =

View file

@ -4,15 +4,12 @@ import java.nio.file.Path
import net.kemitix.thorp.config.Resource import net.kemitix.thorp.config.Resource
import net.kemitix.thorp.domain.MD5HashData.{BigFile, Root} import net.kemitix.thorp.domain.MD5HashData.{BigFile, Root}
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime import zio.DefaultRuntime
class MD5HashGeneratorTest extends FunSpec { class MD5HashGeneratorTest extends FunSpec {
private val runtime = new DefaultRuntime {}
private val source = Resource(this, "upload")
describe("md5File()") { describe("md5File()") {
describe("read a small file (smaller than buffer)") { describe("read a small file (smaller than buffer)") {
val path = Resource(this, "upload/root-file").toPath val path = Resource(this, "upload/root-file").toPath
@ -32,12 +29,13 @@ class MD5HashGeneratorTest extends FunSpec {
} }
} }
def invoke(path: Path) = { def invoke(path: Path) =
runtime.unsafeRunSync { new DefaultRuntime {}.unsafeRunSync {
MD5HashGenerator.md5File(path) MD5HashGenerator
.md5File(path)
.provide(testEnv)
}.toEither }.toEither
} }
}
describe("md5FileChunk") { describe("md5FileChunk") {
describe("read chunks of file") { describe("read chunks of file") {
@ -56,11 +54,15 @@ class MD5HashGeneratorTest extends FunSpec {
} }
} }
def invoke(path: Path, offset: Long, size: Long) = { def invoke(path: Path, offset: Long, size: Long) =
runtime.unsafeRunSync { new DefaultRuntime {}.unsafeRunSync {
MD5HashGenerator.md5FileChunk(path, offset, size) MD5HashGenerator
.md5FileChunk(path, offset, size)
.provide(testEnv)
}.toEither }.toEither
} }
}
type TestEnv = FileSystem
val testEnv: TestEnv = new FileSystem.Live {}
} }

View file

@ -8,6 +8,7 @@ import net.kemitix.thorp.config.{
ParseConfigFile, ParseConfigFile,
Resource Resource
} }
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime import zio.DefaultRuntime
@ -44,9 +45,10 @@ class ParseConfigFileTest extends FunSpec {
} }
private def invoke(filename: Path) = { private def invoke(filename: Path) = {
val runtime = new DefaultRuntime {} new DefaultRuntime {}.unsafeRunSync {
runtime.unsafeRunSync { ParseConfigFile
ParseConfigFile.parseFile(filename) .parseFile(filename)
.provide(FileSystem.Live)
}.toEither }.toEither
} }
} }

View file

@ -8,7 +8,8 @@ import net.kemitix.thorp.console._
import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload} import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.{HashService, Storage} import net.kemitix.thorp.filesystem._
import net.kemitix.thorp.storage.api.Storage
import org.scalatest.FreeSpec import org.scalatest.FreeSpec
import zio.{DefaultRuntime, Task, UIO} import zio.{DefaultRuntime, Task, UIO}
@ -42,7 +43,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val result = val result =
invoke(hashService, invoke(hashService,
options(source), options(source),
UIO.succeed(emptyS3ObjectData)) UIO.succeed(emptyS3ObjectData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -65,7 +67,9 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val result = val result =
invoke(hashService, invoke(hashService,
options(source), options(source),
UIO.succeed(s3ObjectsData)) UIO.succeed(s3ObjectsData),
UIO.succeed(Map(aFile.toPath -> aFile,
anOtherFile.toPath -> anOtherFile)))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -87,7 +91,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val result = val result =
invoke(hashService, invoke(hashService,
options(source), options(source),
UIO.succeed(s3ObjectsData)) UIO.succeed(s3ObjectsData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -110,7 +115,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val result = val result =
invoke(hashService, invoke(hashService,
options(source), options(source),
UIO.succeed(s3ObjectsData)) UIO.succeed(s3ObjectsData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -130,7 +136,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val result = val result =
invoke(hashService, invoke(hashService,
options(source), options(source),
UIO.succeed(s3ObjectsData)) UIO.succeed(s3ObjectsData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -153,7 +160,10 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
byKey = Map(remoteKey -> HashModified(hash, lastModified)) byKey = Map(remoteKey -> HashModified(hash, lastModified))
) )
val result = val result =
invoke(hashService, options(source), UIO.succeed(s3ObjectsData)) invoke(hashService,
options(source),
UIO.succeed(s3ObjectsData),
UIO.succeed(Map(file.toPath -> file)))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -168,7 +178,10 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
byKey = Map(remoteKey -> HashModified(hash, lastModified)) byKey = Map(remoteKey -> HashModified(hash, lastModified))
) )
val result = val result =
invoke(hashService, options(source), UIO.succeed(s3ObjectsData)) invoke(hashService,
options(source),
UIO.succeed(s3ObjectsData),
UIO.succeed(Map.empty))
assertResult(expected)(result) assertResult(expected)(result)
}) })
} }
@ -203,9 +216,14 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource) toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)
)) ))
val result = val result =
invoke(hashService, invoke(
hashService,
options(firstSource)(secondSource), options(firstSource)(secondSource),
UIO.succeed(emptyS3ObjectData)) UIO.succeed(emptyS3ObjectData),
UIO.succeed(
Map(fileInFirstSource.toPath -> fileInFirstSource,
fileInSecondSource.toPath -> fileInSecondSource))
)
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -224,9 +242,14 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val expected = Right(List( val expected = Right(List(
toUpload(remoteKey1, hash1, firstSource, fileInFirstSource))) toUpload(remoteKey1, hash1, firstSource, fileInFirstSource)))
val result = val result =
invoke(hashService, invoke(
hashService,
options(firstSource)(secondSource), options(firstSource)(secondSource),
UIO.succeed(emptyS3ObjectData)) UIO.succeed(emptyS3ObjectData),
UIO.succeed(
Map(fileInFirstSource.toPath -> fileInFirstSource,
fileInSecondSource.toPath -> fileInSecondSource))
)
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -247,7 +270,9 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val result = val result =
invoke(hashService, invoke(hashService,
options(firstSource)(secondSource), options(firstSource)(secondSource),
UIO.succeed(s3ObjectData)) UIO.succeed(s3ObjectData),
UIO.succeed(
Map(fileInSecondSource.toPath -> fileInSecondSource)))
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -268,7 +293,9 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val result = val result =
invoke(hashService, invoke(hashService,
options(firstSource)(secondSource), options(firstSource)(secondSource),
UIO.succeed(s3ObjectData)) UIO.succeed(s3ObjectData),
UIO.succeed(
Map(fileInFirstSource.toPath -> fileInFirstSource)))
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -284,7 +311,8 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
val result = val result =
invoke(hashService, invoke(hashService,
options(firstSource)(secondSource), options(firstSource)(secondSource),
UIO.succeed(s3ObjectData)) UIO.succeed(s3ObjectData),
UIO.succeed(Map.empty))
assertResult(expected)(result) assertResult(expected)(result)
}) })
}) })
@ -295,7 +323,10 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
def md5Hash(file: File): MD5Hash = { def md5Hash(file: File): MD5Hash = {
new DefaultRuntime {} new DefaultRuntime {}
.unsafeRunSync { .unsafeRunSync {
hashService.hashLocalObject(file.toPath).map(_.get(MD5)) hashService
.hashLocalObject(file.toPath)
.map(_.get(MD5))
.provide(FileSystem.Live)
} }
.toEither .toEither
.toOption .toOption
@ -327,10 +358,12 @@ class PlanBuilderTest extends FreeSpec with TemporaryFolder {
private def invoke( private def invoke(
hashService: HashService, hashService: HashService,
configOptions: ConfigOptions, configOptions: ConfigOptions,
result: Task[S3ObjectsData] result: Task[S3ObjectsData],
files: Task[Map[Path, File]]
) = { ) = {
type TestEnv = Storage with Console with Config type TestEnv = Storage with Console with Config with FileSystem
val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live { val testEnv: TestEnv = new Storage.Test with Console.Test with Config.Live
with FileSystem.Live {
override def listResult: Task[S3ObjectsData] = result override def listResult: Task[S3ObjectsData] = result
override def uploadResult: UIO[StorageQueueEvent] = override def uploadResult: UIO[StorageQueueEvent] =
Task.die(new NotImplementedError) Task.die(new NotImplementedError)

View file

@ -7,16 +7,6 @@ final case class RemoteKey(
key: String key: String
) { ) {
def isMissingLocally(
sources: Sources,
prefix: RemoteKey
): Boolean =
!sources.paths.exists(source =>
asFile(source, prefix) match {
case Some(file) => file.exists
case None => false
})
def asFile( def asFile(
source: Path, source: Path,
prefix: RemoteKey prefix: RemoteKey
@ -39,4 +29,5 @@ final case class RemoteKey(
object RemoteKey { object RemoteKey {
val key: SimpleLens[RemoteKey, String] = val key: SimpleLens[RemoteKey, String] =
SimpleLens[RemoteKey, String](_.key, b => a => b.copy(key = a)) SimpleLens[RemoteKey, String](_.key, b => a => b.copy(key = a))
} }

View file

@ -0,0 +1,19 @@
package net.kemitix.thorp.filesystem
import java.io.{File, FileInputStream}
import zio.{TaskR, ZIO, ZManaged}
object FS {
final def exists(file: File): ZIO[FileSystem, Throwable, Boolean] =
ZIO.accessM(_.filesystem fileExists file)
final def open(file: File, offset: Long = 0)
: TaskR[FileSystem, ZManaged[FileSystem, Throwable, FileInputStream]] =
ZIO.accessM(_.filesystem openManagedFileInputStream (file, offset))
final def lines(file: File): TaskR[FileSystem, List[String]] =
ZIO.accessM(_.filesystem fileLines (file))
}

View file

@ -0,0 +1,79 @@
package net.kemitix.thorp.filesystem
import java.io.{File, FileInputStream}
import java.nio.file.{Files, Path}
import java.util.stream
import zio.{Task, TaskR, UIO, ZIO, ZManaged}
import scala.collection.JavaConverters._
trait FileSystem {
val filesystem: FileSystem.Service
}
object FileSystem {
trait Service {
def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean]
def openManagedFileInputStream(file: File, offset: Long = 0L)
: TaskR[FileSystem, ZManaged[Any, Throwable, FileInputStream]]
def fileLines(file: File): TaskR[FileSystem, List[String]]
}
trait Live extends FileSystem {
override val filesystem: Service = new Service {
override def fileExists(
file: File
): ZIO[FileSystem, Throwable, Boolean] = ZIO(file.exists)
override def openManagedFileInputStream(file: File, offset: Long)
: TaskR[FileSystem, ZManaged[Any, Throwable, FileInputStream]] = {
def acquire =
Task {
val stream = new FileInputStream(file)
stream skip offset
stream
}
def release(fis: FileInputStream) =
UIO(fis.close())
ZIO(ZManaged.make(acquire)(release))
}
override def fileLines(file: File): TaskR[FileSystem, List[String]] = {
def acquire = ZIO(Files.lines(file.toPath))
def release(lines: stream.Stream[String]) =
ZIO.effectTotal(lines.close())
def use(lines: stream.Stream[String]) =
ZIO.effectTotal(lines.iterator.asScala.toList)
ZIO.bracket(acquire)(release)(use)
}
}
}
object Live extends Live
trait Test extends FileSystem {
val fileExistsResultMap: Task[Map[Path, File]]
val fileLinesResult: Task[List[String]]
val managedFileInputStream: Task[ZManaged[Any, Throwable, FileInputStream]]
override val filesystem: Service = new Service {
override def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean] =
fileExistsResultMap.map(m => m.keys.exists(_ equals file.toPath))
override def openManagedFileInputStream(file: File, offset: Long)
: TaskR[FileSystem, ZManaged[Any, Throwable, FileInputStream]] =
managedFileInputStream
override def fileLines(file: File): TaskR[FileSystem, List[String]] =
fileLinesResult
}
}
}

View file

@ -1,15 +0,0 @@
package net.kemitix.thorp.storage.api
import java.nio.file.Path
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import zio.Task
/**
* Creates one, or more, hashes for local objects.
*/
trait HashService {
def hashLocalObject(path: Path): Task[Map[HashType, MD5Hash]]
}

View file

@ -92,4 +92,33 @@ object Storage {
Task.die(new NotImplementedError) Task.die(new NotImplementedError)
} }
final def list(
bucket: Bucket,
prefix: RemoteKey): TaskR[Storage with Console, S3ObjectsData] =
ZIO.accessM(_.storage listObjects (bucket, prefix))
final def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(
_.storage upload (localFile, bucket, batchMode, uploadEventListener, tryCount))
final def copy(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey))
final def delete(
bucket: Bucket,
remoteKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(_.storage delete (bucket, remoteKey))
} }

View file

@ -1,42 +0,0 @@
package net.kemitix.thorp
import net.kemitix.thorp.console.Console
import net.kemitix.thorp.domain._
import net.kemitix.thorp.storage.api.Storage
import zio.{TaskR, ZIO}
package object storage {
final val storageService: ZIO[Storage, Nothing, Storage.Service] =
ZIO.access(_.storage)
final def listObjects(
bucket: Bucket,
prefix: RemoteKey): TaskR[Storage with Console, S3ObjectsData] =
ZIO.accessM(_.storage listObjects (bucket, prefix))
final def upload(
localFile: LocalFile,
bucket: Bucket,
batchMode: Boolean,
uploadEventListener: UploadEventListener,
tryCount: Int
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(
_.storage upload (localFile, bucket, batchMode, uploadEventListener, tryCount))
final def copyObject(
bucket: Bucket,
sourceKey: RemoteKey,
hash: MD5Hash,
targetKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(_.storage copy (bucket, sourceKey, hash, targetKey))
final def deleteObject(
bucket: Bucket,
remoteKey: RemoteKey
): ZIO[Storage, Nothing, StorageQueueEvent] =
ZIO.accessM(_.storage delete (bucket, remoteKey))
}

View file

@ -7,20 +7,19 @@ import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils import com.amazonaws.services.s3.transfer.internal.TransferManagerUtils
import net.kemitix.thorp.core.MD5HashGenerator import net.kemitix.thorp.core.MD5HashGenerator
import net.kemitix.thorp.domain.MD5Hash import net.kemitix.thorp.domain.MD5Hash
import zio.Task import net.kemitix.thorp.filesystem.FileSystem
import zio.{TaskR, ZIO}
trait ETagGenerator { trait ETagGenerator {
def eTag( def eTag(
path: Path path: Path
): Task[String] = { ): TaskR[FileSystem, String] = {
val partSize = calculatePartSize(path) val partSize = calculatePartSize(path)
val parts = numParts(path.toFile.length, partSize) val parts = numParts(path.toFile.length, partSize)
Task ZIO
.foreach(partsIndex(parts)) { chunkNumber => .foreach(partsIndex(parts))(digestChunk(path, partSize))
digestChunk(path, partSize)(chunkNumber) .map(concatenateDigests)
}
.map(parts => concatenateDigests(parts))
.map(MD5HashGenerator.hex) .map(MD5HashGenerator.hex)
.map(hash => s"$hash-$parts") .map(hash => s"$hash-$parts")
} }
@ -48,19 +47,17 @@ trait ETagGenerator {
fullParts + incompletePart fullParts + incompletePart
} }
def digestChunk( private def digestChunk(
path: Path, path: Path,
chunkSize: Long chunkSize: Long
)( )(chunkNumber: Long) =
chunkNumber: Long
): Task[Array[Byte]] =
hashChunk(path, chunkNumber, chunkSize).map(_.digest) hashChunk(path, chunkNumber, chunkSize).map(_.digest)
def hashChunk( def hashChunk(
path: Path, path: Path,
chunkNumber: Long, chunkNumber: Long,
chunkSize: Long chunkSize: Long
): Task[MD5Hash] = ): TaskR[FileSystem, MD5Hash] =
MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize) MD5HashGenerator.md5FileChunk(path, chunkNumber * chunkSize, chunkSize)
def offsets( def offsets(

View file

@ -5,6 +5,6 @@ import zio.TaskR
trait ListerLogger { trait ListerLogger {
def logFetchBatch: TaskR[Console, Unit] = def logFetchBatch: TaskR[Console, Unit] =
putStrLn("Fetching remote summaries...") Console.putStrLn("Fetching remote summaries...")
} }
object ListerLogger extends ListerLogger object ListerLogger extends ListerLogger

View file

@ -2,11 +2,11 @@ package net.kemitix.thorp.storage.aws
import java.nio.file.Path import java.nio.file.Path
import net.kemitix.thorp.core.MD5HashGenerator import net.kemitix.thorp.core.{HashService, MD5HashGenerator}
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.{HashType, MD5Hash} import net.kemitix.thorp.domain.{HashType, MD5Hash}
import net.kemitix.thorp.storage.api.HashService import net.kemitix.thorp.filesystem.FileSystem
import zio.Task import zio.TaskR
trait S3HashService extends HashService { trait S3HashService extends HashService {
@ -18,7 +18,7 @@ trait S3HashService extends HashService {
*/ */
override def hashLocalObject( override def hashLocalObject(
path: Path path: Path
): Task[Map[HashType, MD5Hash]] = ): TaskR[FileSystem, Map[HashType, MD5Hash]] =
for { for {
md5 <- MD5HashGenerator.md5File(path) md5 <- MD5HashGenerator.md5File(path)
etag <- ETagGenerator.eTag(path).map(MD5Hash(_)) etag <- ETagGenerator.eTag(path).map(MD5Hash(_))

View file

@ -4,6 +4,7 @@ import java.nio.file.Path
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration import com.amazonaws.services.s3.transfer.TransferManagerConfiguration
import net.kemitix.thorp.config.Resource import net.kemitix.thorp.config.Resource
import net.kemitix.thorp.filesystem.FileSystem
import org.scalatest.FunSpec import org.scalatest.FunSpec
import zio.DefaultRuntime import zio.DefaultRuntime
@ -44,8 +45,10 @@ class ETagGeneratorTest extends FunSpec {
} }
} }
def invoke(path: Path, index: Long, size: Long) = def invoke(path: Path, index: Long, size: Long) =
runtime.unsafeRunSync { new DefaultRuntime {}.unsafeRunSync {
ETagGenerator.hashChunk(path, index, size) ETagGenerator
.hashChunk(path, index, size)
.provide(FileSystem.Live)
}.toEither }.toEither
} }
@ -56,8 +59,10 @@ class ETagGeneratorTest extends FunSpec {
assertResult(Right(expected))(result) assertResult(Right(expected))(result)
} }
def invoke(path: Path) = def invoke(path: Path) =
runtime.unsafeRunSync { new DefaultRuntime {}.unsafeRunSync {
ETagGenerator.eTag(path) ETagGenerator
.eTag(path)
.provide(FileSystem.Live)
}.toEither }.toEither
} }