Update zio to 1.0.0-RC11 (#148)
* Update zio to 1.0.0-RC11
* Update to be compatible with ZIO-1.0.0-RC11
parent 8fad680a96
commit 9752d96ab4

19 changed files with 78 additions and 81 deletions
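Background for the hunks that follow: ZIO 1.0.0-RC11 renamed the type alias `TaskR` to `RIO` (companion object included), so almost every change below is a mechanical rename of imports and return types. A minimal sketch of what the alias means, assuming the RC11 API (`RioSketch` and its members are illustrative, not part of thorp):

import zio.{RIO, Task, ZIO}

object RioSketch {
  // type RIO[-R, +A] = ZIO[R, Throwable, A] -- the same alias was
  // previously named TaskR, so this is a rename, not a semantic change.

  // An effect that needs an Int in its environment and may fail with Throwable:
  val doubled: RIO[Int, Int] = ZIO.access[Int](n => n * 2)

  // Supplying the environment leaves a plain Task (RIO[Any, A]):
  val result: Task[Int] = doubled.provide(21) // succeeds with 42
}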
build.sbt
@@ -54,7 +54,7 @@ val awsSdkDependencies = Seq(
 )

 val zioDependencies = Seq(
   libraryDependencies ++= Seq (
-    "dev.zio" %% "zio" % "1.0.0-RC10-1"
+    "dev.zio" %% "zio" % "1.0.0-RC11"
   )
 )

ParseConfigFile.scala
@@ -3,7 +3,7 @@ package net.kemitix.thorp.config
 import java.io.File

 import net.kemitix.thorp.filesystem.FileSystem
-import zio.{IO, TaskR, ZIO}
+import zio.{IO, RIO, ZIO}

 trait ParseConfigFile {

@@ -14,7 +14,7 @@ trait ParseConfigFile {
         IO.fail(List(ConfigValidation.ErrorReadingFile(file, h.getMessage))))

   private def readLines(file: File)(
-      exists: Boolean): TaskR[FileSystem, Seq[String]] =
+      exists: Boolean): RIO[FileSystem, Seq[String]] =
     if (exists) FileSystem.lines(file)
     else ZIO.succeed(Seq.empty)

LocalFileStream.scala
@@ -7,29 +7,28 @@ import net.kemitix.thorp.config.Config
 import net.kemitix.thorp.core.hasher.Hasher
 import net.kemitix.thorp.domain.Sources
 import net.kemitix.thorp.filesystem.FileSystem
-import zio.{Task, TaskR, ZIO}
+import zio.{Task, RIO, ZIO}

 object LocalFileStream {

   def findFiles(
       source: Path
-  ): TaskR[Config with FileSystem with Hasher, LocalFiles] = {
+  ): RIO[Config with FileSystem with Hasher, LocalFiles] = {

     def recurseIntoSubDirectories(
-        path: Path): TaskR[Config with FileSystem with Hasher, LocalFiles] =
+        path: Path): RIO[Config with FileSystem with Hasher, LocalFiles] =
       path.toFile match {
         case f if f.isDirectory => loop(path)
         case _                  => localFile(path)
       }

     def recurse(paths: Stream[Path])
-      : TaskR[Config with FileSystem with Hasher, LocalFiles] =
+      : RIO[Config with FileSystem with Hasher, LocalFiles] =
       for {
         recursed <- ZIO.foreach(paths)(path => recurseIntoSubDirectories(path))
       } yield LocalFiles.reduce(recursed.toStream)

-    def loop(
-        path: Path): TaskR[Config with FileSystem with Hasher, LocalFiles] =
+    def loop(path: Path): RIO[Config with FileSystem with Hasher, LocalFiles] =
       dirPaths(path) >>= recurse

     loop(source)
@@ -40,7 +39,7 @@ object LocalFileStream {

   private def includedDirPaths(paths: Stream[Path]) =
     for {
-      flaggedPaths <- TaskR.foreach(paths)(path =>
+      flaggedPaths <- RIO.foreach(paths)(path =>
                        isIncluded(path).map((path, _)))
     } yield
       flaggedPaths.toStream

PlanBuilder.scala
@@ -7,13 +7,13 @@ import net.kemitix.thorp.core.hasher.Hasher
 import net.kemitix.thorp.domain._
 import net.kemitix.thorp.filesystem.FileSystem
 import net.kemitix.thorp.storage.api.Storage
-import zio.{TaskR, ZIO}
+import zio.{RIO, ZIO}

 object PlanBuilder {

   def createPlan
-    : TaskR[Storage with Console with Config with FileSystem with Hasher,
+    : RIO[Storage with Console with Config with FileSystem with Hasher,
             SyncPlan] =
     SyncLogging.logRunStart *> buildPlan

   private def buildPlan =

Remote.scala
@@ -4,19 +4,19 @@ import java.nio.file.Path

 import net.kemitix.thorp.domain.{RemoteKey, Sources}
 import net.kemitix.thorp.filesystem.FileSystem
-import zio.{TaskR, ZIO}
+import zio.{RIO, ZIO}

 object Remote {

   def isMissingLocally(sources: Sources, prefix: RemoteKey)(
       remoteKey: RemoteKey
-  ): TaskR[FileSystem, Boolean] =
+  ): RIO[FileSystem, Boolean] =
     existsLocally(sources, prefix)(remoteKey)
       .map(exists => !exists)

   def existsLocally(sources: Sources, prefix: RemoteKey)(
       remoteKey: RemoteKey
-  ): TaskR[FileSystem, Boolean] = {
+  ): RIO[FileSystem, Boolean] = {
     def existsInSource(source: Path) =
       RemoteKey.asFile(source, prefix)(remoteKey) match {
         case Some(file) => FileSystem.exists(file)

ThorpArchive.scala
@@ -11,17 +11,17 @@ import net.kemitix.thorp.console._
 import net.kemitix.thorp.domain.StorageQueueEvent
 import net.kemitix.thorp.domain.StorageQueueEvent._
 import net.kemitix.thorp.storage.api.Storage
-import zio.{TaskR, ZIO}
+import zio.{RIO, ZIO}

 trait ThorpArchive {

   def update(
       sequencedAction: SequencedAction,
       totalBytesSoFar: Long
-  ): TaskR[Storage with Console with Config, StorageQueueEvent]
+  ): RIO[Storage with Console with Config, StorageQueueEvent]

   def logEvent(
-      event: StorageQueueEvent): TaskR[Console with Config, StorageQueueEvent] =
+      event: StorageQueueEvent): RIO[Console with Config, StorageQueueEvent] =
     event match {
       case UploadQueueEvent(remoteKey, _) =>
         ZIO(event) <* Console.putMessageLn(UploadComplete(remoteKey))

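ThorpArchive's `logEvent` leans on ZIO's sequencing operators: `<*` runs both effects and keeps the left result, `*>` (used in PlanBuilder above) keeps the right, and `>>=` (used in UnversionedMirrorArchive below) is flatMap. A standalone illustration, assuming the RC11 API (`CombinatorSketch` is illustrative, not part of thorp):

import zio.{Task, ZIO}

object CombinatorSketch {
  val a: Task[Int]    = ZIO.succeed(1)
  val b: Task[String] = ZIO.succeed("two")

  val keepLeft:  Task[Int]    = a <* b // runs a then b, yields 1
  val keepRight: Task[String] = a *> b // runs a then b, yields "two"
  val bound:     Task[Int]    = a >>= (n => ZIO.succeed(n + 1)) // flatMap
}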
UnversionedMirrorArchive.scala
@@ -6,7 +6,7 @@ import net.kemitix.thorp.core.Action.{DoNothing, ToCopy, ToDelete, ToUpload}
 import net.kemitix.thorp.domain.StorageQueueEvent.DoNothingQueueEvent
 import net.kemitix.thorp.domain._
 import net.kemitix.thorp.storage.api.Storage
-import zio.{Task, TaskR}
+import zio.{Task, RIO}

 case class UnversionedMirrorArchive(syncTotals: SyncTotals)
     extends ThorpArchive {
@@ -14,7 +14,7 @@ case class UnversionedMirrorArchive(syncTotals: SyncTotals)
   override def update(
       sequencedAction: SequencedAction,
       totalBytesSoFar: Long
-  ): TaskR[Storage with Console with Config, StorageQueueEvent] =
+  ): RIO[Storage with Console with Config, StorageQueueEvent] =
     sequencedAction match {
       case SequencedAction(ToUpload(bucket, localFile, _), index) =>
         doUpload(index, totalBytesSoFar, bucket, localFile) >>= logEvent

Hasher.scala
@@ -6,7 +6,7 @@ import java.util.concurrent.atomic.AtomicReference
 import net.kemitix.thorp.domain.HashType.MD5
 import net.kemitix.thorp.domain.{HashType, MD5Hash}
 import net.kemitix.thorp.filesystem.FileSystem
-import zio.{TaskR, ZIO}
+import zio.{RIO, ZIO}

 /**
   * Creates one, or more, hashes for local objects.
@@ -17,18 +17,18 @@ trait Hasher {
 object Hasher {
   trait Service {
     def hashObject(
-        path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]]
+        path: Path): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]]
     def hashObjectChunk(
         path: Path,
         chunkNumber: Long,
-        chunkSize: Long): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]]
-    def hex(in: Array[Byte]): TaskR[Hasher, String]
-    def digest(in: String): TaskR[Hasher, Array[Byte]]
+        chunkSize: Long): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]]
+    def hex(in: Array[Byte]): RIO[Hasher, String]
+    def digest(in: String): RIO[Hasher, Array[Byte]]
   }
   trait Live extends Hasher {
     val hasher: Service = new Service {
       override def hashObject(
-          path: Path): TaskR[FileSystem, Map[HashType, MD5Hash]] =
+          path: Path): RIO[FileSystem, Map[HashType, MD5Hash]] =
         for {
           md5 <- MD5HashGenerator.md5File(path)
         } yield Map(MD5 -> md5)
@@ -36,17 +36,17 @@ object Hasher {
       override def hashObjectChunk(path: Path,
                                    chunkNumber: Long,
                                    chunkSize: Long)
-        : TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
+        : RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] =
         for {
           md5 <- MD5HashGenerator.md5FileChunk(path,
                                                chunkNumber * chunkSize,
                                                chunkSize)
         } yield Map(MD5 -> md5)

-      override def hex(in: Array[Byte]): TaskR[Hasher, String] =
+      override def hex(in: Array[Byte]): RIO[Hasher, String] =
         ZIO(MD5HashGenerator.hex(in))

-      override def digest(in: String): TaskR[Hasher, Array[Byte]] =
+      override def digest(in: String): RIO[Hasher, Array[Byte]] =
         ZIO(MD5HashGenerator.digest(in))
     }
   }
@@ -60,37 +60,37 @@ object Hasher {
       new AtomicReference(Map.empty)
     val hasher: Service = new Service {
       override def hashObject(
-          path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
+          path: Path): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] =
         ZIO(hashes.get()(path))

       override def hashObjectChunk(path: Path,
                                    chunkNumber: Long,
                                    chunkSize: Long)
-        : TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
+        : RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] =
         ZIO(hashChunks.get()(path)(chunkNumber))

-      override def hex(in: Array[Byte]): TaskR[Hasher, String] =
+      override def hex(in: Array[Byte]): RIO[Hasher, String] =
         ZIO(MD5HashGenerator.hex(in))

-      override def digest(in: String): TaskR[Hasher, Array[Byte]] =
+      override def digest(in: String): RIO[Hasher, Array[Byte]] =
         ZIO(MD5HashGenerator.digest(in))
     }
   }
   object Test extends Test

   final def hashObject(
-      path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
+      path: Path): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] =
     ZIO.accessM(_.hasher hashObject path)

   final def hashObjectChunk(
       path: Path,
       chunkNumber: Long,
-      chunkSize: Long): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
+      chunkSize: Long): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] =
     ZIO.accessM(_.hasher hashObjectChunk (path, chunkNumber, chunkSize))

-  final def hex(in: Array[Byte]): TaskR[Hasher, String] =
+  final def hex(in: Array[Byte]): RIO[Hasher, String] =
     ZIO.accessM(_.hasher hex in)

-  final def digest(in: String): TaskR[Hasher, Array[Byte]] =
+  final def digest(in: String): RIO[Hasher, Array[Byte]] =
     ZIO.accessM(_.hasher digest in)
 }

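The Hasher file above is ZIO's environment ("module") pattern end to end: the trait exposes a `Service`, `Live` and `Test` implementations provide it, and the `final def` accessors reach it through `ZIO.accessM`. The same shape in miniature, assuming the RC11 API (the `Counter` module is hypothetical, for illustration only):

import java.util.concurrent.atomic.AtomicInteger

import zio.{RIO, ZIO}

trait Counter {
  val counter: Counter.Service
}

object Counter {
  trait Service {
    def next: RIO[Any, Int]
  }
  trait Live extends Counter {
    val counter: Service = new Service {
      private val n = new AtomicInteger(0)
      def next: RIO[Any, Int] = ZIO(n.incrementAndGet())
    }
  }
  object Live extends Live

  // Accessor: the resulting effect requires a Counter in its environment.
  final def next: RIO[Counter, Int] = ZIO.accessM(_.counter.next)
}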
MD5HashGenerator.scala
@@ -6,7 +6,7 @@ import java.security.MessageDigest

 import net.kemitix.thorp.domain.MD5Hash
 import net.kemitix.thorp.filesystem.FileSystem
-import zio.{Task, TaskR}
+import zio.{Task, RIO}

 import scala.collection.immutable.NumericRange

@@ -27,14 +27,14 @@ private object MD5HashGenerator {
     md5.digest
   }

-  def md5File(path: Path): TaskR[FileSystem, MD5Hash] =
+  def md5File(path: Path): RIO[FileSystem, MD5Hash] =
     md5FileChunk(path, 0, path.toFile.length)

   def md5FileChunk(
       path: Path,
       offset: Long,
       size: Long
-  ): TaskR[FileSystem, MD5Hash] = {
+  ): RIO[FileSystem, MD5Hash] = {
     val file = path.toFile
     val endOffset = Math.min(offset + size, file.length)
     for {

DummyStorageService.scala
@@ -5,7 +5,7 @@ import java.io.File
 import net.kemitix.thorp.console._
 import net.kemitix.thorp.domain._
 import net.kemitix.thorp.storage.api.Storage
-import zio.{TaskR, UIO}
+import zio.{RIO, UIO}

 case class DummyStorageService(s3ObjectData: S3ObjectsData,
                                uploadFiles: Map[File, (RemoteKey, MD5Hash)])
@@ -17,8 +17,8 @@ case class DummyStorageService(s3ObjectData: S3ObjectsData,
   override def listObjects(
       bucket: Bucket,
       prefix: RemoteKey
-  ): TaskR[Console, S3ObjectsData] =
-    TaskR(s3ObjectData)
+  ): RIO[Console, S3ObjectsData] =
+    RIO(s3ObjectData)

   override def upload(
       localFile: LocalFile,

FileSystem.scala
@@ -4,7 +4,7 @@ import java.io.{File, FileInputStream}
 import java.nio.file.{Files, Path}
 import java.util.stream

-import zio.{Task, TaskR, UIO, ZIO, ZManaged}
+import zio.{Task, RIO, UIO, ZIO, ZManaged}

 import scala.collection.JavaConverters._

@@ -17,8 +17,8 @@ object FileSystem {
   trait Service {
     def fileExists(file: File): ZIO[FileSystem, Throwable, Boolean]
     def openManagedFileInputStream(file: File, offset: Long = 0L)
-      : TaskR[FileSystem, ZManaged[Any, Throwable, FileInputStream]]
-    def fileLines(file: File): TaskR[FileSystem, Seq[String]]
+      : RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]]
+    def fileLines(file: File): RIO[FileSystem, Seq[String]]
   }
   trait Live extends FileSystem {
     override val filesystem: Service = new Service {
@@ -27,7 +27,7 @@ object FileSystem {
       ): ZIO[FileSystem, Throwable, Boolean] = ZIO(file.exists)

       override def openManagedFileInputStream(file: File, offset: Long)
-        : TaskR[FileSystem, ZManaged[Any, Throwable, FileInputStream]] = {
+        : RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]] = {

         def acquire =
           Task {
@@ -42,7 +42,7 @@ object FileSystem {
         ZIO(ZManaged.make(acquire)(release))
       }

-      override def fileLines(file: File): TaskR[FileSystem, Seq[String]] = {
+      override def fileLines(file: File): RIO[FileSystem, Seq[String]] = {
         def acquire = ZIO(Files.lines(file.toPath))
         def use(lines: stream.Stream[String]) =
           ZIO.effectTotal(lines.iterator.asScala.toList)
@@ -63,10 +63,10 @@ object FileSystem {
         fileExistsResultMap.map(m => m.keys.exists(_ equals file.toPath))

       override def openManagedFileInputStream(file: File, offset: Long)
-        : TaskR[FileSystem, ZManaged[Any, Throwable, FileInputStream]] =
+        : RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]] =
         managedFileInputStream

-      override def fileLines(file: File): TaskR[FileSystem, List[String]] =
+      override def fileLines(file: File): RIO[FileSystem, List[String]] =
         fileLinesResult
     }
   }
@@ -75,9 +75,9 @@ object FileSystem {
     ZIO.accessM(_.filesystem fileExists file)

   final def open(file: File, offset: Long = 0)
-    : TaskR[FileSystem, ZManaged[FileSystem, Throwable, FileInputStream]] =
+    : RIO[FileSystem, ZManaged[FileSystem, Throwable, FileInputStream]] =
     ZIO.accessM(_.filesystem openManagedFileInputStream (file, offset))

-  final def lines(file: File): TaskR[FileSystem, Seq[String]] =
+  final def lines(file: File): RIO[FileSystem, Seq[String]] =
     ZIO.accessM(_.filesystem fileLines (file))
 }

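`FileSystem.open` returns a `ZManaged`, ZIO's bracketed acquire/use/release resource, built with `ZManaged.make(acquire)(release)` in the Live implementation above. A self-contained sketch of that shape, assuming the RC11 API (the path handling is illustrative):

import java.io.FileInputStream

import zio.{Task, ZIO, ZManaged}

object ManagedSketch {
  // Acquisition may fail (Task); release must not (ZIO.effectTotal).
  def managedStream(path: String): ZManaged[Any, Throwable, FileInputStream] =
    ZManaged.make(Task(new FileInputStream(path)))(fis =>
      ZIO.effectTotal(fis.close()))

  // use brackets the resource: the stream is closed on success and on failure.
  def firstByte(path: String): Task[Int] =
    managedStream(path).use(fis => Task(fis.read()))
}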
Storage.scala
@@ -3,7 +3,7 @@ package net.kemitix.thorp.storage.api
 import net.kemitix.thorp.config.Config
 import net.kemitix.thorp.console.Console
 import net.kemitix.thorp.domain._
-import zio.{Task, TaskR, UIO, ZIO}
+import zio.{Task, RIO, UIO, ZIO}

 trait Storage {
   val storage: Storage.Service
@@ -15,7 +15,7 @@ object Storage {
     def listObjects(
         bucket: Bucket,
         prefix: RemoteKey
-    ): TaskR[Storage with Console, S3ObjectsData]
+    ): RIO[Storage with Console, S3ObjectsData]

     def upload(
         localFile: LocalFile,
@@ -50,7 +50,7 @@ object Storage {

       override def listObjects(
           bucket: Bucket,
-          prefix: RemoteKey): TaskR[Storage with Console, S3ObjectsData] =
+          prefix: RemoteKey): RIO[Storage with Console, S3ObjectsData] =
         listResult

       override def upload(
@@ -90,9 +90,8 @@ object Storage {
       Task.die(new NotImplementedError)
   }

-  final def list(
-      bucket: Bucket,
-      prefix: RemoteKey): TaskR[Storage with Console, S3ObjectsData] =
+  final def list(bucket: Bucket,
+                 prefix: RemoteKey): RIO[Storage with Console, S3ObjectsData] =
     ZIO.accessM(_.storage listObjects (bucket, prefix))

   final def upload(

Copier.scala
@@ -45,7 +45,7 @@ trait Copier {

   private def foldFailure(
       sourceKey: RemoteKey,
-      targetKey: RemoteKey): S3ClientException => StorageQueueEvent = {
+      targetKey: RemoteKey): Throwable => StorageQueueEvent = {
     case error: SdkClientException =>
       errorEvent(sourceKey, targetKey, error)
     case error =>

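Copier is the one change here that is more than a rename: the fold handler widens from `S3ClientException => StorageQueueEvent` to `Throwable => StorageQueueEvent`, so it must now be total over `Throwable` (the error channel of `RIO`/`Task`) rather than over a narrowed exception type. The pattern in isolation, with the event type simplified to `String` (only `SdkClientException` is the real AWS SDK class; the rest is illustrative):

import com.amazonaws.SdkClientException

object FoldFailureSketch {
  // Total over Throwable: the SDK failure is reported specially,
  // anything else falls through to the generic case.
  def foldFailure(sourceKey: String, targetKey: String): Throwable => String = {
    case e: SdkClientException =>
      s"client error copying $sourceKey -> $targetKey: ${e.getMessage}"
    case e =>
      s"error copying $sourceKey -> $targetKey: ${e.getMessage}"
  }
}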
Lister.scala
@@ -9,7 +9,7 @@ import net.kemitix.thorp.console._
 import net.kemitix.thorp.domain.{Bucket, RemoteKey, S3ObjectsData}
 import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
 import net.kemitix.thorp.storage.aws.S3ObjectsByKey.byKey
-import zio.{Task, TaskR}
+import zio.{Task, RIO}

 import scala.collection.JavaConverters._

@@ -21,7 +21,7 @@ trait Lister {
   def listObjects(amazonS3: AmazonS3.Client)(
       bucket: Bucket,
       prefix: RemoteKey
-  ): TaskR[Console, S3ObjectsData] = {
+  ): RIO[Console, S3ObjectsData] = {

     def request =
       new ListObjectsV2Request()
@@ -31,15 +31,15 @@ trait Lister {
     def requestMore: Token => ListObjectsV2Request =
       token => request.withContinuationToken(token)

-    def fetchBatch: ListObjectsV2Request => TaskR[Console, Batch] =
+    def fetchBatch: ListObjectsV2Request => RIO[Console, Batch] =
       request => ListerLogger.logFetchBatch *> tryFetchBatch(amazonS3)(request)

-    def fetchMore: Option[Token] => TaskR[Console, Stream[S3ObjectSummary]] = {
-      case None        => TaskR.succeed(Stream.empty)
+    def fetchMore: Option[Token] => RIO[Console, Stream[S3ObjectSummary]] = {
+      case None        => RIO.succeed(Stream.empty)
       case Some(token) => fetch(requestMore(token))
     }

-    def fetch: ListObjectsV2Request => TaskR[Console, Stream[S3ObjectSummary]] =
+    def fetch: ListObjectsV2Request => RIO[Console, Stream[S3ObjectSummary]] =
       request =>
         for {
           batch <- fetchBatch(request)

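Lister's `fetchMore` encodes S3 continuation-token pagination: `None` terminates, `Some(token)` requests the next batch, and `fetch` concatenates the results. The same recursion in miniature, assuming only the RC11 API (`Page` and `fetchPage` are hypothetical stand-ins for the AWS request and response types):

import zio.{RIO, ZIO}

object PaginationSketch {
  final case class Page(items: List[String], nextToken: Option[String])

  // Apply fetchPage until no continuation token remains, concatenating pages.
  def fetchAll(
      fetchPage: Option[String] => RIO[Any, Page]): RIO[Any, List[String]] = {
    def loop(token: Option[String]): RIO[Any, List[String]] =
      fetchPage(token).flatMap { page =>
        page.nextToken match {
          case None           => ZIO.succeed(page.items)
          case some @ Some(_) => loop(some).map(page.items ++ _)
        }
      }
    loop(None)
  }
}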
ListerLogger.scala
@@ -1,10 +1,10 @@
 package net.kemitix.thorp.storage.aws

 import net.kemitix.thorp.console._
-import zio.TaskR
+import zio.RIO

 trait ListerLogger {
-  def logFetchBatch: TaskR[Console, Unit] =
+  def logFetchBatch: RIO[Console, Unit] =
     Console.putStrLn("Fetching remote summaries...")
 }
 object ListerLogger extends ListerLogger

S3Storage.scala
@@ -8,7 +8,7 @@ import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
 import net.kemitix.thorp.domain._
 import net.kemitix.thorp.storage.api.Storage
 import net.kemitix.thorp.storage.api.Storage.Service
-import zio.{TaskR, UIO, ZIO}
+import zio.{RIO, UIO, ZIO}

 object S3Storage {
   trait Live extends Storage {
@@ -19,9 +19,8 @@ object S3Storage {
     private val transferManager: AmazonTransferManager =
       AmazonTransferManager(TransferManagerBuilder.defaultTransferManager)

-    override def listObjects(
-        bucket: Bucket,
-        prefix: RemoteKey): TaskR[Console, S3ObjectsData] =
+    override def listObjects(bucket: Bucket,
+                             prefix: RemoteKey): RIO[Console, S3ObjectsData] =
       Lister.listObjects(client)(bucket, prefix)

     override def upload(

ETagGenerator.scala
@@ -9,13 +9,13 @@ import net.kemitix.thorp.core.hasher.Hasher
 import net.kemitix.thorp.domain.HashType.MD5
 import net.kemitix.thorp.domain.MD5Hash
 import net.kemitix.thorp.filesystem.FileSystem
-import zio.{TaskR, ZIO}
+import zio.{RIO, ZIO}

 private trait ETagGenerator {

   def eTag(
       path: Path
-  ): TaskR[Hasher with FileSystem, String] = {
+  ): RIO[Hasher with FileSystem, String] = {
     val partSize = calculatePartSize(path)
     val parts = numParts(path.toFile.length, partSize)
     eTagHex(path, partSize, parts)

S3Hasher.scala
@@ -8,7 +8,7 @@ import net.kemitix.thorp.core.hasher.Hasher.Service
 import net.kemitix.thorp.domain.{HashType, MD5Hash}
 import net.kemitix.thorp.filesystem.FileSystem
 import net.kemitix.thorp.storage.aws.ETag
-import zio.TaskR
+import zio.RIO

 object S3Hasher {

@@ -22,7 +22,7 @@ object S3Hasher {
     * @return a set of hash values
     */
   override def hashObject(
-      path: Path): TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
+      path: Path): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] =
     for {
       base <- CoreHasher.hashObject(path)
       etag <- ETagGenerator.eTag(path).map(MD5Hash(_))
@@ -31,13 +31,13 @@ object S3Hasher {
   override def hashObjectChunk(path: Path,
                                chunkNumber: Long,
                                chunkSize: Long)
-    : TaskR[Hasher with FileSystem, Map[HashType, MD5Hash]] =
+    : RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] =
     CoreHasher.hashObjectChunk(path, chunkNumber, chunkSize)

-  override def hex(in: Array[Byte]): TaskR[Hasher, String] =
+  override def hex(in: Array[Byte]): RIO[Hasher, String] =
     CoreHasher.hex(in)

-  override def digest(in: String): TaskR[Hasher, Array[Byte]] =
+  override def digest(in: String): RIO[Hasher, Array[Byte]] =
     CoreHasher.digest(in)
 }

AmazonS3ClientTestFixture.scala
@@ -6,7 +6,7 @@ import net.kemitix.thorp.domain.StorageQueueEvent.ShutdownQueueEvent
 import net.kemitix.thorp.domain._
 import net.kemitix.thorp.storage.api.Storage
 import org.scalamock.scalatest.MockFactory
-import zio.{TaskR, UIO, ZIO}
+import zio.{RIO, UIO, ZIO}

 trait AmazonS3ClientTestFixture extends MockFactory {

@@ -26,7 +26,7 @@ trait AmazonS3ClientTestFixture extends MockFactory {
       override def listObjects(
           bucket: Bucket,
           prefix: RemoteKey
-      ): TaskR[Console, S3ObjectsData] =
+      ): RIO[Console, S3ObjectsData] =
         Lister.listObjects(client)(bucket, prefix)

       override def upload(