Create and use a cache of hashes for local files (#249)

* [domain] Define Hashes in domain package

* [filesystem] Load and parse any .thorp.cache files found

* [filesystem] Use cached file data when available and up-to-date

* [lib] FileScanner refactoring

* [filesystem] scan sub-dirs first to minimise time cache is on heap

* [filesystem] Write new cache data to temp file

* [lib] replace cache file when finished updating

* [filesystem] AppendLines to correct file with new lines

* [domain] decode HashType from String

* [filesystem] Store last modified time as epoch milliseconds

* [filesystem] parse lastmodified as a long

* [filesystem] use all hash values in cache

* [lib] FileScanner rearrange code

* [lib] Create and use a single cache file per source

* [storage-aws] Use ETag hash from cache when available

* [filesystem] Merge file data together correctly

* [filesystem] Handle exceptions thrown by Files.mode correctly

* [readme] Add section on caching

* [changelog] updated

* [changelog] add pending dependencies notes

* [lib] Filters should not name methods after their defining object

* [lib] Fix up test
This commit is contained in:
Paul Campbell 2019-10-27 19:53:00 +00:00 committed by GitHub
parent ed1f0ec7ee
commit f35ea9795d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 421 additions and 85 deletions

View file

@ -6,6 +6,17 @@ The format is based on [[https://keepachangelog.com/en/1.0.0/][Keep a Changelog]
[[https://semver.org/spec/v2.0.0.html][Semantic Versioning]]. [[https://semver.org/spec/v2.0.0.html][Semantic Versioning]].
* [1.0.0] - ???
** Added
- Create and use a cache of hashes for local files (#249)
** Dependencies
- Revert "[sbt] Rollback ~sbt-ci-release~ to ~1.3.2~ (#231)"
- Update ~sbt~ to ~1.3.3~ (#238)
* [0.10.0] - 2019-10-08 * [0.10.0] - 2019-10-08
This is the last ~v0.x~ feature release. The next feature release will be ~v1.x~. This is the last ~v0.x~ feature release. The next feature release will be ~v1.x~.

View file

@ -64,6 +64,10 @@ that can be written to a file.
Note, that ~include~ and ~exclude~ are cumulative across all Note, that ~include~ and ~exclude~ are cumulative across all
configuration files. configuration files.
* Caching
The last modified time for files is used to decide whether to calculate the hash values for the file. If a file has not been updated, then the hash values stored in the `.thorp.cache` file located in the root of the source is used. Otherwise the file will be read to caculate the the new hashes.
* Behaviour * Behaviour
When considering a local file, the following table governs what should happen: When considering a local file, the following table governs what should happen:

View file

@ -8,7 +8,7 @@ import net.kemitix.thorp.domain.Implicits._
final case class LocalFile private ( final case class LocalFile private (
file: File, file: File,
source: File, source: File,
hashes: Map[HashType, MD5Hash], hashes: Hashes,
remoteKey: RemoteKey, remoteKey: RemoteKey,
length: Long length: Long
) )

View file

@ -36,7 +36,7 @@ object RemoteObjects {
def remoteHasHash( def remoteHasHash(
remoteObjects: RemoteObjects, remoteObjects: RemoteObjects,
hashes: Map[HashType, MD5Hash] hashes: Hashes
): UIO[Option[(RemoteKey, MD5Hash)]] = ): UIO[Option[(RemoteKey, MD5Hash)]] =
UIO(remoteObjects.byHash.collectFirst { UIO(remoteObjects.byHash.collectFirst {
case (hash, key) if (hashes.values.exists(h => h == hash)) => (key, hash) case (hash, key) if (hashes.values.exists(h => h == hash)) => (key, hash)

View file

@ -0,0 +1,8 @@
package net.kemitix.thorp
import java.time.Instant
package object domain {
type Hashes = Map[HashType, MD5Hash]
type LastModified = Instant
}

View file

@ -0,0 +1,22 @@
package net.kemitix.thorp.filesystem
import net.kemitix.thorp.domain.{Hashes, LastModified}
case class FileData(
hashes: Hashes,
lastModified: LastModified
) {
def +(other: FileData): FileData = {
FileData(
hashes = this.hashes ++ other.hashes,
lastModified = lastModified // discards other.lastModified
)
}
}
object FileData {
def create(hashes: Hashes, lastModified: LastModified): FileData = FileData(
hashes = hashes,
lastModified = lastModified
)
}

View file

@ -1,10 +1,11 @@
package net.kemitix.thorp.filesystem package net.kemitix.thorp.filesystem
import java.io.{File, FileInputStream} import java.io.{File, FileInputStream, FileWriter}
import java.nio.file.{Files, Path} import java.nio.file.{Files, Path, StandardCopyOption}
import java.time.Instant
import java.util.stream import java.util.stream
import net.kemitix.thorp.domain.{RemoteKey, Sources} import net.kemitix.thorp.domain.{Hashes, RemoteKey, Sources}
import zio._ import zio._
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
@ -19,12 +20,19 @@ object FileSystem {
def openManagedFileInputStream(file: File, offset: Long) def openManagedFileInputStream(file: File, offset: Long)
: RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]] : RIO[FileSystem, ZManaged[Any, Throwable, FileInputStream]]
def fileLines(file: File): RIO[FileSystem, Seq[String]] def fileLines(file: File): RIO[FileSystem, Seq[String]]
def appendLines(lines: Iterable[String], file: File): UIO[Unit]
def isDirectory(file: File): RIO[FileSystem, Boolean] def isDirectory(file: File): RIO[FileSystem, Boolean]
def listFiles(path: Path): UIO[List[File]] def listFiles(path: Path): UIO[List[File]]
def listDirs(path: Path): UIO[List[Path]]
def length(file: File): ZIO[FileSystem, Nothing, Long] def length(file: File): ZIO[FileSystem, Nothing, Long]
def lastModified(file: File): UIO[Instant]
def hasLocalFile(sources: Sources, def hasLocalFile(sources: Sources,
prefix: RemoteKey, prefix: RemoteKey,
remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean]
def findCache(
directory: Path): ZIO[FileSystem with Hasher, Nothing, PathCache]
def getHashes(path: Path, fileData: FileData): ZIO[FileSystem, Any, Hashes]
def moveFile(source: Path, target: Path): UIO[Unit]
} }
trait Live extends FileSystem { trait Live extends FileSystem {
override val filesystem: Service = new Service { override val filesystem: Service = new Service {
@ -59,12 +67,28 @@ object FileSystem {
Task(file.isDirectory) Task(file.isDirectory)
override def listFiles(path: Path): UIO[List[File]] = override def listFiles(path: Path): UIO[List[File]] =
Task(List.from(path.toFile.listFiles())) Task {
.catchAll(_ => UIO.succeed(List.empty[File])) List
.from(path.toFile.listFiles())
.filterNot(_.isDirectory)
.filterNot(_.getName.contentEquals(PathCache.fileName))
.filterNot(_.getName.contentEquals(PathCache.tempFileName))
}.catchAll(_ => UIO.succeed(List.empty[File]))
override def listDirs(path: Path): UIO[List[Path]] =
Task(
List
.from(path.toFile.listFiles())
.filter(_.isDirectory)
.map(_.toPath))
.catchAll(_ => UIO.succeed(List.empty[Path]))
override def length(file: File): ZIO[FileSystem, Nothing, Long] = override def length(file: File): ZIO[FileSystem, Nothing, Long] =
UIO(file.length) UIO(file.length)
override def lastModified(file: File): UIO[Instant] =
UIO(Instant.ofEpochMilli(file.lastModified()))
override def hasLocalFile( override def hasLocalFile(
sources: Sources, sources: Sources,
prefix: RemoteKey, prefix: RemoteKey,
@ -77,6 +101,40 @@ object FileSystem {
.map(_ || accExists) .map(_ || accExists)
} }
} }
override def findCache(
directory: Path): ZIO[FileSystem with Hasher, Nothing, PathCache] =
for {
cacheFile <- UIO(directory.resolve(PathCache.fileName).toFile)
lines <- fileLines(cacheFile).catchAll(_ => UIO(List.empty))
cache <- PathCache.fromLines(lines)
} yield cache
override def getHashes(
path: Path,
fileData: FileData): ZIO[FileSystem, Any, Hashes] = {
val lastModified = Instant.ofEpochMilli(path.toFile.lastModified())
if (lastModified.isAfter(fileData.lastModified)) {
ZIO.fail("fileData is out-of-date")
} else {
ZIO.succeed(fileData.hashes)
}
}
override def appendLines(lines: Iterable[String], file: File): UIO[Unit] =
UIO.bracket(UIO(new FileWriter(file, true)))(fw => UIO(fw.close()))(
fw =>
UIO {
lines.map(line => fw.append(line + System.lineSeparator()))
})
override def moveFile(source: Path, target: Path): UIO[Unit] =
IO {
if (source.toFile.exists()) {
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE)
}
()
}.catchAll(_ => UIO.unit)
} }
} }
object Live extends Live object Live extends Live
@ -86,9 +144,13 @@ object FileSystem {
val fileLinesResult: Task[List[String]] val fileLinesResult: Task[List[String]]
val isDirResult: Task[Boolean] val isDirResult: Task[Boolean]
val listFilesResult: UIO[List[File]] val listFilesResult: UIO[List[File]]
val listDirsResult: UIO[List[Path]]
val lengthResult: UIO[Long] val lengthResult: UIO[Long]
val lastModifiedResult: UIO[Instant]
val managedFileInputStream: Task[ZManaged[Any, Throwable, FileInputStream]] val managedFileInputStream: Task[ZManaged[Any, Throwable, FileInputStream]]
val hasLocalFileResult: UIO[Boolean] val hasLocalFileResult: UIO[Boolean]
val pathCacheResult: UIO[PathCache]
val matchesResult: IO[Any, Hashes]
override val filesystem: Service = new Service { override val filesystem: Service = new Service {
@ -108,14 +170,33 @@ object FileSystem {
override def listFiles(path: Path): UIO[List[File]] = override def listFiles(path: Path): UIO[List[File]] =
listFilesResult listFilesResult
override def listDirs(path: Path): UIO[List[Path]] =
listDirsResult
override def length(file: File): UIO[Long] = override def length(file: File): UIO[Long] =
lengthResult lengthResult
override def lastModified(file: File): UIO[Instant] =
lastModifiedResult
override def hasLocalFile( override def hasLocalFile(
sources: Sources, sources: Sources,
prefix: RemoteKey, prefix: RemoteKey,
remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] = remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] =
hasLocalFileResult hasLocalFileResult
override def findCache(directory: Path): UIO[PathCache] =
pathCacheResult
override def getHashes(path: Path,
fileData: FileData): ZIO[FileSystem, Any, Hashes] =
matchesResult
override def appendLines(lines: Iterable[String], file: File): UIO[Unit] =
UIO.unit
override def moveFile(source: Path, target: Path): UIO[Unit] =
UIO.unit
} }
} }
@ -136,9 +217,18 @@ object FileSystem {
final def isDirectory(file: File): RIO[FileSystem, Boolean] = final def isDirectory(file: File): RIO[FileSystem, Boolean] =
ZIO.accessM(_.filesystem.isDirectory(file)) ZIO.accessM(_.filesystem.isDirectory(file))
/**
* Lists only files within the Path.
*/
final def listFiles(path: Path): ZIO[FileSystem, Nothing, List[File]] = final def listFiles(path: Path): ZIO[FileSystem, Nothing, List[File]] =
ZIO.accessM(_.filesystem.listFiles(path)) ZIO.accessM(_.filesystem.listFiles(path))
/**
* Lists only sub-directories within the Path.
*/
final def listDirs(path: Path): ZIO[FileSystem, Nothing, List[Path]] =
ZIO.accessM(_.filesystem.listDirs(path))
final def length(file: File): ZIO[FileSystem, Nothing, Long] = final def length(file: File): ZIO[FileSystem, Nothing, Long] =
ZIO.accessM(_.filesystem.length(file)) ZIO.accessM(_.filesystem.length(file))
@ -147,4 +237,26 @@ object FileSystem {
prefix: RemoteKey, prefix: RemoteKey,
remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] = remoteKey: RemoteKey): ZIO[FileSystem, Nothing, Boolean] =
ZIO.accessM(_.filesystem.hasLocalFile(sources, prefix, remoteKey)) ZIO.accessM(_.filesystem.hasLocalFile(sources, prefix, remoteKey))
final def findCache(
directory: Path): ZIO[FileSystem with Hasher, Nothing, PathCache] =
ZIO.accessM(_.filesystem.findCache(directory))
final def getHashes(path: Path,
fileData: FileData): ZIO[FileSystem, Any, Hashes] =
ZIO.accessM(_.filesystem.getHashes(path, fileData))
final def lastModified(file: File): ZIO[FileSystem, Nothing, Instant] =
ZIO.accessM(_.filesystem.lastModified(file))
final def appendLines(lines: Iterable[String],
file: File): ZIO[FileSystem, Nothing, Unit] =
ZIO.accessM(_.filesystem.appendLines(lines, file))
final def moveFile(
source: Path,
target: Path
): ZIO[FileSystem, Nothing, Unit] =
ZIO.accessM(_.filesystem.moveFile(source, target))
} }

View file

@ -4,7 +4,7 @@ import java.nio.file.Path
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import net.kemitix.thorp.domain.HashType.MD5 import net.kemitix.thorp.domain.HashType.MD5
import net.kemitix.thorp.domain.{HashType, MD5Hash} import net.kemitix.thorp.domain.{HashType, Hashes}
import zio.{RIO, ZIO} import zio.{RIO, ZIO}
/** /**
@ -15,27 +15,33 @@ trait Hasher {
} }
object Hasher { object Hasher {
trait Service { trait Service {
def typeFrom(str: String): ZIO[Hasher, IllegalArgumentException, HashType]
def hashObject( def hashObject(
path: Path): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]]
def hashObjectChunk(
path: Path, path: Path,
chunkNumber: Long, cachedFileData: Option[FileData]): RIO[Hasher with FileSystem, Hashes]
chunkSize: Long): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] def hashObjectChunk(path: Path,
chunkNumber: Long,
chunkSize: Long): RIO[Hasher with FileSystem, Hashes]
def hex(in: Array[Byte]): RIO[Hasher, String] def hex(in: Array[Byte]): RIO[Hasher, String]
def digest(in: String): RIO[Hasher, Array[Byte]] def digest(in: String): RIO[Hasher, Array[Byte]]
} }
trait Live extends Hasher { trait Live extends Hasher {
val hasher: Service = new Service { val hasher: Service = new Service {
override def hashObject( override def hashObject(
path: Path): RIO[FileSystem, Map[HashType, MD5Hash]] = path: Path,
for { cachedFileData: Option[FileData]): RIO[FileSystem, Hashes] =
md5 <- MD5HashGenerator.md5File(path) ZIO
} yield Map(MD5 -> md5) .fromOption(cachedFileData)
.flatMap(fileData => FileSystem.getHashes(path, fileData))
.orElse(for {
md5 <- MD5HashGenerator.md5File(path)
} yield Map(MD5 -> md5))
override def hashObjectChunk(path: Path, override def hashObjectChunk(
chunkNumber: Long, path: Path,
chunkSize: Long) chunkNumber: Long,
: RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] = chunkSize: Long): RIO[Hasher with FileSystem, Hashes] =
for { for {
md5 <- MD5HashGenerator.md5FileChunk(path, md5 <- MD5HashGenerator.md5FileChunk(path,
chunkNumber * chunkSize, chunkNumber * chunkSize,
@ -47,25 +53,33 @@ object Hasher {
override def digest(in: String): RIO[Hasher, Array[Byte]] = override def digest(in: String): RIO[Hasher, Array[Byte]] =
ZIO(MD5HashGenerator.digest(in)) ZIO(MD5HashGenerator.digest(in))
override def typeFrom(
str: String): ZIO[Hasher, IllegalArgumentException, HashType] =
if (str.contentEquals("MD5")) {
ZIO.succeed(MD5)
} else {
ZIO.fail(
new IllegalArgumentException("Unknown Hash Type: %s".format(str)))
}
} }
} }
object Live extends Live object Live extends Live
trait Test extends Hasher { trait Test extends Hasher {
val hashes: AtomicReference[Map[Path, Map[HashType, MD5Hash]]] = val hashes: AtomicReference[Map[Path, Hashes]] =
new AtomicReference(Map.empty) new AtomicReference(Map.empty)
val hashChunks val hashChunks: AtomicReference[Map[Path, Map[Long, Hashes]]] =
: AtomicReference[Map[Path, Map[Long, Map[HashType, MD5Hash]]]] =
new AtomicReference(Map.empty) new AtomicReference(Map.empty)
val hasher: Service = new Service { val hasher: Service = new Service {
override def hashObject( override def hashObject(path: Path, cachedFileData: Option[FileData])
path: Path): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] = : RIO[Hasher with FileSystem, Hashes] =
ZIO(hashes.get()(path)) ZIO(hashes.get()(path))
override def hashObjectChunk(path: Path, override def hashObjectChunk(
chunkNumber: Long, path: Path,
chunkSize: Long) chunkNumber: Long,
: RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] = chunkSize: Long): RIO[Hasher with FileSystem, Hashes] =
ZIO(hashChunks.get()(path)(chunkNumber)) ZIO(hashChunks.get()(path)(chunkNumber))
override def hex(in: Array[Byte]): RIO[Hasher, String] = override def hex(in: Array[Byte]): RIO[Hasher, String] =
@ -73,18 +87,23 @@ object Hasher {
override def digest(in: String): RIO[Hasher, Array[Byte]] = override def digest(in: String): RIO[Hasher, Array[Byte]] =
ZIO(MD5HashGenerator.digest(in)) ZIO(MD5HashGenerator.digest(in))
override def typeFrom(
str: String): ZIO[Hasher, IllegalArgumentException, HashType] =
Live.hasher.typeFrom(str)
} }
} }
object Test extends Test object Test extends Test
final def hashObject( final def hashObject(
path: Path): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] = path: Path,
ZIO.accessM(_.hasher hashObject path) cachedFileData: Option[FileData]): RIO[Hasher with FileSystem, Hashes] =
ZIO.accessM(_.hasher.hashObject(path, cachedFileData))
final def hashObjectChunk( final def hashObjectChunk(
path: Path, path: Path,
chunkNumber: Long, chunkNumber: Long,
chunkSize: Long): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] = chunkSize: Long): RIO[Hasher with FileSystem, Hashes] =
ZIO.accessM(_.hasher hashObjectChunk (path, chunkNumber, chunkSize)) ZIO.accessM(_.hasher hashObjectChunk (path, chunkNumber, chunkSize))
final def hex(in: Array[Byte]): RIO[Hasher, String] = final def hex(in: Array[Byte]): RIO[Hasher, String] =
@ -92,4 +111,9 @@ object Hasher {
final def digest(in: String): RIO[Hasher, Array[Byte]] = final def digest(in: String): RIO[Hasher, Array[Byte]] =
ZIO.accessM(_.hasher digest in) ZIO.accessM(_.hasher digest in)
final def typeFrom(
str: String): ZIO[Hasher, IllegalArgumentException, HashType] =
ZIO.accessM(_.hasher.typeFrom(str))
} }

View file

@ -0,0 +1,74 @@
package net.kemitix.thorp.filesystem
import java.nio.file.{Path, Paths}
import java.time.Instant
import java.util.regex.Pattern
import net.kemitix.thorp.domain.{HashType, MD5Hash}
import zio.{UIO, ZIO}
/**
* Meta data for files in the current source, as of the last time Thorp processed this directory.
*
* <p>N.B. Does not include sub-directories.</p>
*/
final case class PathCache(
data: PathCache.Data
) {
def get(path: Path): Option[FileData] = data.get(path)
}
object PathCache {
type Data = Map[Path, FileData]
val fileName = ".thorp.cache"
val tempFileName = ".thorp.cache.tmp"
def create(path: Path, fileData: FileData): UIO[Iterable[String]] =
UIO {
fileData.hashes.keys.map(hashType => {
val hash = fileData.hashes(hashType)
val modified = fileData.lastModified
String.join(":",
hashType.toString,
hash.in,
modified.toEpochMilli.toString,
path.toString)
})
}
private val pattern =
"^(?<hashtype>.+):(?<hash>.+):(?<modified>\\d+):(?<filename>.+)$"
private val format = Pattern.compile(pattern)
def fromLines(lines: Seq[String]): ZIO[Hasher, Nothing, PathCache] =
ZIO
.foreach(
lines
.map(format.matcher(_))
.filter(_.matches())) { matcher =>
for {
hashType <- Hasher.typeFrom(matcher.group("hashtype"))
} yield
(Paths.get(matcher.group("filename")) -> FileData
.create(
Map[HashType, MD5Hash](
hashType -> MD5Hash(matcher.group("hash"))),
Instant.ofEpochMilli(matcher.group("modified").toLong)
))
}
.catchAll({ _: IllegalArgumentException =>
UIO(List.empty)
})
.map(list => mergeFileData(list))
.map(map => PathCache(map))
private def mergeFileData(
list: List[(Path, FileData)]
): Data = {
list.foldLeft(Map.empty[Path, FileData]) { (acc, pair) =>
val (fileName, fileData) = pair
acc.updatedWith(fileName)(
_.map(fd => fd + fileData)
.orElse(Some(fileData)))
}
}
}

View file

@ -0,0 +1,5 @@
package net.kemitix.thorp
package object filesystem {
type FileName = String
}

View file

@ -6,15 +6,8 @@ import java.nio.file.Path
import net.kemitix.eip.zio.MessageChannel.{EChannel, ESender} import net.kemitix.eip.zio.MessageChannel.{EChannel, ESender}
import net.kemitix.eip.zio.{Message, MessageChannel} import net.kemitix.eip.zio.{Message, MessageChannel}
import net.kemitix.thorp.config.Config import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.{ import net.kemitix.thorp.domain._
Filter, import net.kemitix.thorp.filesystem._
HashType,
LocalFile,
MD5Hash,
RemoteKey,
Sources
}
import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import zio.clock.Clock import zio.clock.Clock
import zio.{RIO, UIO, ZIO} import zio.{RIO, UIO, ZIO}
@ -25,12 +18,21 @@ trait FileScanner {
object FileScanner { object FileScanner {
type RemoteHashes = Map[MD5Hash, RemoteKey] type RemoteHashes = Map[MD5Hash, RemoteKey]
type Hashes = Map[HashType, MD5Hash]
type ScannedFile = LocalFile type ScannedFile = LocalFile
type FileSender = ESender[Clock with Hasher with FileSystem with Config, type FileSender =
Throwable, ESender[Clock with Hasher with FileSystem with Config with FileScanner,
ScannedFile] Throwable,
ScannedFile]
type ScannerChannel = EChannel[Any, Throwable, ScannedFile] type ScannerChannel = EChannel[Any, Throwable, ScannedFile]
type CacheData = (Path, FileData)
type CacheChannel = EChannel[Any, Throwable, CacheData]
type CacheSender =
ESender[Clock with FileSystem with Hasher with FileScanner with Config,
Throwable,
CacheData]
final def scanSources: RIO[FileScanner, FileSender] =
ZIO.accessM(_.fileScanner.scanSources)
trait Service { trait Service {
def scanSources: RIO[FileScanner, FileSender] def scanSources: RIO[FileScanner, FileSender]
@ -40,49 +42,104 @@ object FileScanner {
val fileScanner: Service = new Service { val fileScanner: Service = new Service {
override def scanSources: RIO[FileScanner, FileSender] = override def scanSources: RIO[FileScanner, FileSender] =
RIO { channel => RIO { fileChannel =>
(for { (for {
sources <- Config.sources sources <- Config.sources
_ <- ZIO.foreach(sources.paths)(scanPath(channel)(_)) _ <- ZIO.foreach(sources.paths) { sourcePath =>
} yield ()) <* MessageChannel.endChannel(channel) for {
cacheSender <- scanSource(fileChannel)(sourcePath)
cacheReceiver <- cacheReceiver(sourcePath)
_ <- MessageChannel
.pointToPoint(cacheSender)(cacheReceiver)
.runDrain
_ <- FileSystem.moveFile(
sourcePath.resolve(PathCache.tempFileName),
sourcePath.resolve(PathCache.fileName))
} yield ()
}
} yield ()) <* MessageChannel.endChannel(fileChannel)
} }
private def scanPath(channel: ScannerChannel)(path: Path) private def scanSource(fileChannel: ScannerChannel)(
: ZIO[Clock with Config with Hasher with FileSystem, Throwable, Unit] = sourcePath: Path): RIO[FileScanner, CacheSender] =
RIO { cacheChannel =>
(for {
cache <- FileSystem.findCache(sourcePath)
_ <- scanPath(fileChannel, cacheChannel)(sourcePath, cache)
} yield ()) <* MessageChannel.endChannel(cacheChannel)
}
private def scanPath(
fileChannel: ScannerChannel,
cacheChannel: CacheChannel)(path: Path, cache: PathCache)
: ZIO[Clock with FileSystem with Hasher with FileScanner with Config,
Throwable,
Unit] =
for { for {
filters <- Config.filters dirs <- FileSystem.listDirs(path)
files <- FileSystem.listFiles(path) _ <- ZIO.foreach(dirs)(scanPath(fileChannel, cacheChannel)(_, cache))
_ <- ZIO.foreach(files)(handleFile(channel, filters)) files <- FileSystem.listFiles(path)
_ <- handleFiles(fileChannel, cacheChannel, cache, files)
} yield () } yield ()
private def handleFiles(
fileChannel: ScannerChannel,
cacheChannel: CacheChannel,
pathCache: PathCache,
files: List[File]
) =
ZIO.foreach(files) {
handleFile(fileChannel, cacheChannel, pathCache)
}
private def handleFile( private def handleFile(
channel: ScannerChannel, fileChannel: ScannerChannel,
filters: List[Filter] cacheChannel: CacheChannel,
)(file: File) = cache: PathCache
)(file: File)
: ZIO[Clock with FileSystem with Hasher with Config, Throwable, Unit] =
for { for {
isDir <- FileSystem.isDirectory(file) isIncluded <- Filters.isIncluded(file)
isIncluded <- UIO(Filters.isIncluded(file.toPath)(filters)) _ <- ZIO.when(isIncluded) {
_ <- ZIO.when(isIncluded && isDir)(scanPath(channel)(file.toPath)) sendHashedFile(fileChannel, cacheChannel)(file, cache)
_ <- ZIO.when(isIncluded && !isDir)(sendHashedFile(channel)(file)) }
} yield () } yield ()
private def sendHashedFile(channel: ScannerChannel)(file: File) = private def sendHashedFile(
fileChannel: ScannerChannel,
cacheChannel: CacheChannel
)(file: File, pathCache: PathCache) =
for { for {
sources <- Config.sources sources <- Config.sources
source <- Sources.forPath(file.toPath)(sources) source <- Sources.forPath(file.toPath)(sources)
prefix <- Config.prefix prefix <- Config.prefix
hashes <- Hasher.hashObject(file.toPath) path = source.relativize(file.toPath)
hashes <- Hasher.hashObject(file.toPath, pathCache.get(path))
remoteKey <- RemoteKey.from(source, prefix, file) remoteKey <- RemoteKey.from(source, prefix, file)
size <- FileSystem.length(file) size <- FileSystem.length(file)
localFile <- ZIO( fileMsg <- Message.create(
LocalFile(file, source.toFile, hashes, remoteKey, size)) LocalFile(file, source.toFile, hashes, remoteKey, size))
hashedFile <- Message.create(localFile) _ <- MessageChannel.send(fileChannel)(fileMsg)
_ <- MessageChannel.send(channel)(hashedFile) modified <- FileSystem.lastModified(file)
cacheMsg <- Message.create(
(path -> FileData.create(hashes, modified)))
_ <- MessageChannel.send(cacheChannel)(cacheMsg)
} yield () } yield ()
def cacheReceiver(sourcePath: Path)
: UIO[MessageChannel.UReceiver[FileSystem, CacheData]] = {
val tempFile = sourcePath.resolve(PathCache.tempFileName).toFile
UIO { message =>
val (path, fileData) = message.body
for {
line <- PathCache.create(path, fileData)
_ <- FileSystem.appendLines(line, tempFile)
} yield ()
}
}
} }
} }
object Live extends Live object Live extends Live
final def scanSources: RIO[FileScanner, FileSender] =
ZIO.accessM(_.fileScanner.scanSources)
} }

View file

@ -1,12 +1,20 @@
package net.kemitix.thorp.lib package net.kemitix.thorp.lib
import java.io.File
import java.nio.file.Path import java.nio.file.Path
import net.kemitix.thorp.config.Config
import net.kemitix.thorp.domain.Filter import net.kemitix.thorp.domain.Filter
import net.kemitix.thorp.domain.Filter.{Exclude, Include} import net.kemitix.thorp.domain.Filter.{Exclude, Include}
import zio.ZIO
object Filters { object Filters {
def isIncluded(file: File): ZIO[Config, Nothing, Boolean] =
for {
filters <- Config.filters
} yield isIncluded(file.toPath)(filters)
def isIncluded(p: Path)(filters: List[Filter]): Boolean = { def isIncluded(p: Path)(filters: List[Filter]): Boolean = {
sealed trait State sealed trait State
final case class Unknown() extends State final case class Unknown() extends State

View file

@ -11,7 +11,6 @@ import net.kemitix.thorp.domain.RemoteObjects.{
} }
import net.kemitix.thorp.domain._ import net.kemitix.thorp.domain._
import net.kemitix.thorp.filesystem.{FileSystem, Hasher} import net.kemitix.thorp.filesystem.{FileSystem, Hasher}
import net.kemitix.thorp.lib.FileScanner.Hashes
import net.kemitix.thorp.storage.Storage import net.kemitix.thorp.storage.Storage
import net.kemitix.thorp.uishell.UIEvent import net.kemitix.thorp.uishell.UIEvent
import zio._ import zio._

View file

@ -2,4 +2,4 @@ package net.kemitix.thorp.storage.aws
import net.kemitix.thorp.domain.HashType import net.kemitix.thorp.domain.HashType
object ETag extends HashType case object ETag extends HashType

View file

@ -2,12 +2,12 @@ package net.kemitix.thorp.storage.aws.hasher
import java.nio.file.Path import java.nio.file.Path
import net.kemitix.thorp.domain.{HashType, MD5Hash} import net.kemitix.thorp.domain.{HashType, Hashes, MD5Hash}
import net.kemitix.thorp.filesystem.Hasher.Live.{hasher => CoreHasher} import net.kemitix.thorp.filesystem.Hasher.Live.{hasher => CoreHasher}
import net.kemitix.thorp.filesystem.Hasher.Service import net.kemitix.thorp.filesystem.Hasher.Service
import net.kemitix.thorp.filesystem.{FileSystem, Hasher} import net.kemitix.thorp.filesystem.{FileData, FileSystem, Hasher}
import net.kemitix.thorp.storage.aws.ETag import net.kemitix.thorp.storage.aws.ETag
import zio.RIO import zio.{RIO, ZIO}
object S3Hasher { object S3Hasher {
@ -20,17 +20,20 @@ object S3Hasher {
* @param path the local path to scan * @param path the local path to scan
* @return a set of hash values * @return a set of hash values
*/ */
override def hashObject( override def hashObject(path: Path, cachedFileData: Option[FileData])
path: Path): RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] = : RIO[Hasher with FileSystem, Hashes] =
for { ZIO
base <- CoreHasher.hashObject(path) .fromOption(cachedFileData)
etag <- ETagGenerator.eTag(path).map(MD5Hash(_)) .flatMap(fileData => FileSystem.getHashes(path, fileData))
} yield base + (ETag -> etag) .orElse(for {
base <- CoreHasher.hashObject(path, cachedFileData)
etag <- ETagGenerator.eTag(path).map(MD5Hash(_))
} yield base + (ETag -> etag))
override def hashObjectChunk(path: Path, override def hashObjectChunk(
chunkNumber: Long, path: Path,
chunkSize: Long) chunkNumber: Long,
: RIO[Hasher with FileSystem, Map[HashType, MD5Hash]] = chunkSize: Long): RIO[Hasher with FileSystem, Hashes] =
CoreHasher.hashObjectChunk(path, chunkNumber, chunkSize) CoreHasher.hashObjectChunk(path, chunkNumber, chunkSize)
override def hex(in: Array[Byte]): RIO[Hasher, String] = override def hex(in: Array[Byte]): RIO[Hasher, String] =
@ -38,6 +41,15 @@ object S3Hasher {
override def digest(in: String): RIO[Hasher, Array[Byte]] = override def digest(in: String): RIO[Hasher, Array[Byte]] =
CoreHasher.digest(in) CoreHasher.digest(in)
override def typeFrom(
str: String): ZIO[Hasher, IllegalArgumentException, HashType] =
if (str.contentEquals("ETag")) {
RIO.succeed(ETag)
} else {
CoreHasher.typeFrom(str)
}
} }
} }