Not wrapping exceptions thrown in waitForUploadResult (#162)
* [storage-aws] Uploader move implementation to companion * [app] Program Refactoring * [storage-aws] AmazonTransferManager refactoring * [lib] UnversionedMirrorArchive refactoring * [console] Add Console.putStr * [uishell] UIShell show chosen actions * [storage-aws] AmazonTransferManager try to handle errors * [uishell] UIShell avoid line wrap with long file paths * [storage] Log when fetching remote summaries * Handle exceptions thrown in waitForUploadResult * [uishell] log errors * [console] Swap batch/non-batch error messages * fix tests
This commit is contained in:
parent
6adffd8da7
commit
5214bacc0b
18 changed files with 219 additions and 137 deletions
|
@ -5,7 +5,7 @@ import net.kemitix.eip.zio.{Message, MessageChannel}
|
||||||
import net.kemitix.thorp.cli.CliArgs
|
import net.kemitix.thorp.cli.CliArgs
|
||||||
import net.kemitix.thorp.config._
|
import net.kemitix.thorp.config._
|
||||||
import net.kemitix.thorp.console._
|
import net.kemitix.thorp.console._
|
||||||
import net.kemitix.thorp.domain.{Counters, StorageEvent}
|
import net.kemitix.thorp.domain.{Counters, SimpleLens, StorageEvent}
|
||||||
import net.kemitix.thorp.domain.StorageEvent.{
|
import net.kemitix.thorp.domain.StorageEvent.{
|
||||||
CopyEvent,
|
CopyEvent,
|
||||||
DeleteEvent,
|
DeleteEvent,
|
||||||
|
@ -52,7 +52,7 @@ trait Program {
|
||||||
: ZIO[Any,
|
: ZIO[Any,
|
||||||
Nothing,
|
Nothing,
|
||||||
MessageChannel.ESender[
|
MessageChannel.ESender[
|
||||||
Storage with Config with FileSystem with Hasher with Clock with FileScanner,
|
Storage with Config with FileSystem with Hasher with Clock with FileScanner with Console,
|
||||||
Throwable,
|
Throwable,
|
||||||
UIEvent]] = UIO { uiChannel =>
|
UIEvent]] = UIO { uiChannel =>
|
||||||
(for {
|
(for {
|
||||||
|
@ -97,13 +97,13 @@ trait Program {
|
||||||
|
|
||||||
private def countActivities: (Counters, StorageEvent) => Counters =
|
private def countActivities: (Counters, StorageEvent) => Counters =
|
||||||
(counters: Counters, s3Action: StorageEvent) => {
|
(counters: Counters, s3Action: StorageEvent) => {
|
||||||
val increment: Int => Int = _ + 1
|
def increment: SimpleLens[Counters, Int] => Counters =
|
||||||
|
_.modify(_ + 1)(counters)
|
||||||
s3Action match {
|
s3Action match {
|
||||||
case _: UploadEvent =>
|
case _: UploadEvent => increment(Counters.uploaded)
|
||||||
Counters.uploaded.modify(increment)(counters)
|
case _: CopyEvent => increment(Counters.copied)
|
||||||
case _: CopyEvent => Counters.copied.modify(increment)(counters)
|
case _: DeleteEvent => increment(Counters.deleted)
|
||||||
case _: DeleteEvent => Counters.deleted.modify(increment)(counters)
|
case _: ErrorEvent => increment(Counters.errors)
|
||||||
case _: ErrorEvent => Counters.errors.modify(increment)(counters)
|
|
||||||
case _ => counters
|
case _ => counters
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ object Console {
|
||||||
trait Service {
|
trait Service {
|
||||||
def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit]
|
def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit]
|
||||||
def putStrLn(line: String): ZIO[Console, Nothing, Unit]
|
def putStrLn(line: String): ZIO[Console, Nothing, Unit]
|
||||||
|
def putStr(line: String): ZIO[Console, Nothing, Unit]
|
||||||
}
|
}
|
||||||
|
|
||||||
trait Live extends Console {
|
trait Live extends Console {
|
||||||
|
@ -24,9 +25,15 @@ object Console {
|
||||||
putStrLn(line.en)
|
putStrLn(line.en)
|
||||||
override def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
|
override def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
|
||||||
putStrLnPrintStream(SConsole.out)(line)
|
putStrLnPrintStream(SConsole.out)(line)
|
||||||
|
override def putStr(line: String): ZIO[Console, Nothing, Unit] =
|
||||||
|
putStrPrintStream(SConsole.out)(line)
|
||||||
final def putStrLnPrintStream(stream: PrintStream)(
|
final def putStrLnPrintStream(stream: PrintStream)(
|
||||||
line: String): ZIO[Console, Nothing, Unit] =
|
line: String): ZIO[Console, Nothing, Unit] =
|
||||||
UIO(SConsole.withOut(stream)(SConsole.println(line)))
|
UIO(SConsole.withOut(stream)(SConsole.println(line)))
|
||||||
|
final def putStrPrintStream(stream: PrintStream)(
|
||||||
|
line: String): ZIO[Console, Nothing, Unit] =
|
||||||
|
UIO(SConsole.withOut(stream)(SConsole.print(line)))
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,6 +53,10 @@ object Console {
|
||||||
ZIO.succeed(())
|
ZIO.succeed(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def putStr(line: String): ZIO[Console, Nothing, Unit] = {
|
||||||
|
val _ = output.accumulateAndGet(List(line), (a, b) => a ++ b)
|
||||||
|
ZIO.succeed(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,6 +68,9 @@ object Console {
|
||||||
final def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
|
final def putStrLn(line: String): ZIO[Console, Nothing, Unit] =
|
||||||
ZIO.accessM(_.console putStrLn line)
|
ZIO.accessM(_.console putStrLn line)
|
||||||
|
|
||||||
|
final def putStr(line: String): ZIO[Console, Nothing, Unit] =
|
||||||
|
ZIO.accessM(_.console.putStr(line))
|
||||||
|
|
||||||
final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
|
final def putMessageLn(line: ConsoleOut): ZIO[Console, Nothing, Unit] =
|
||||||
ZIO.accessM(_.console putMessageLn line)
|
ZIO.accessM(_.console putMessageLn line)
|
||||||
|
|
||||||
|
|
|
@ -62,8 +62,8 @@ object ConsoleOut {
|
||||||
final case class ErrorQueueEventOccurred(action: ActionSummary, e: Throwable)
|
final case class ErrorQueueEventOccurred(action: ActionSummary, e: Throwable)
|
||||||
extends ConsoleOut.WithBatchMode {
|
extends ConsoleOut.WithBatchMode {
|
||||||
override def en: String =
|
override def en: String =
|
||||||
s"${action.name} failed: ${action.keys}: ${e.getMessage}"
|
|
||||||
override def enBatch: String =
|
|
||||||
s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen"
|
s"${RED}ERROR:$RESET ${action.name} ${action.keys}: ${e.getMessage}$eraseToEndOfScreen"
|
||||||
|
override def enBatch: String =
|
||||||
|
s"${action.name} failed: ${action.keys}: ${e.getMessage}"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,7 +100,10 @@ object LocalFileSystem extends LocalFileSystem {
|
||||||
sequencedAction = SequencedAction(action, actionCounter)
|
sequencedAction = SequencedAction(action, actionCounter)
|
||||||
event <- archive.update(uiChannel, sequencedAction, bytesCounter)
|
event <- archive.update(uiChannel, sequencedAction, bytesCounter)
|
||||||
_ <- eventsRef.update(list => event :: list)
|
_ <- eventsRef.update(list => event :: list)
|
||||||
_ <- uiActionFinished(uiChannel)(action, actionCounter, bytesCounter)
|
_ <- uiActionFinished(uiChannel)(action,
|
||||||
|
actionCounter,
|
||||||
|
bytesCounter,
|
||||||
|
event)
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,9 +115,11 @@ object LocalFileSystem extends LocalFileSystem {
|
||||||
private def uiActionFinished(uiChannel: UChannel[Any, UIEvent])(
|
private def uiActionFinished(uiChannel: UChannel[Any, UIEvent])(
|
||||||
action: Action,
|
action: Action,
|
||||||
actionCounter: Int,
|
actionCounter: Int,
|
||||||
bytesCounter: Long
|
bytesCounter: Long,
|
||||||
|
event: StorageEvent
|
||||||
) =
|
) =
|
||||||
Message.create(UIEvent.ActionFinished(action, actionCounter, bytesCounter)) >>=
|
Message.create(
|
||||||
|
UIEvent.ActionFinished(action, actionCounter, bytesCounter, event)) >>=
|
||||||
MessageChannel.send(uiChannel)
|
MessageChannel.send(uiChannel)
|
||||||
|
|
||||||
private def uiFileFound(uiChannel: UChannel[Any, UIEvent])(
|
private def uiFileFound(uiChannel: UChannel[Any, UIEvent])(
|
||||||
|
@ -246,7 +251,8 @@ object LocalFileSystem extends LocalFileSystem {
|
||||||
_ <- eventsRef.update(list => event :: list)
|
_ <- eventsRef.update(list => event :: list)
|
||||||
_ <- uiActionFinished(uiChannel)(action,
|
_ <- uiActionFinished(uiChannel)(action,
|
||||||
actionCounter,
|
actionCounter,
|
||||||
bytesCounter)
|
bytesCounter,
|
||||||
|
event)
|
||||||
} yield ()
|
} yield ()
|
||||||
}
|
}
|
||||||
} yield ()
|
} yield ()
|
||||||
|
|
|
@ -35,19 +35,30 @@ trait UnversionedMirrorArchive extends ThorpArchive {
|
||||||
localFile: LocalFile
|
localFile: LocalFile
|
||||||
) =
|
) =
|
||||||
for {
|
for {
|
||||||
batchMode <- Config.batchMode
|
settings <- listenerSettings(uiChannel,
|
||||||
upload <- Storage.upload(
|
index,
|
||||||
localFile,
|
totalBytesSoFar,
|
||||||
bucket,
|
bucket,
|
||||||
UploadEventListener.Settings(
|
localFile)
|
||||||
|
upload <- Storage.upload(localFile, bucket, settings)
|
||||||
|
} yield upload
|
||||||
|
|
||||||
|
private def listenerSettings(
|
||||||
uiChannel: UChannel[Any, UIEvent],
|
uiChannel: UChannel[Any, UIEvent],
|
||||||
|
index: Int,
|
||||||
|
totalBytesSoFar: Long,
|
||||||
|
bucket: Bucket,
|
||||||
|
localFile: LocalFile
|
||||||
|
) =
|
||||||
|
for {
|
||||||
|
batchMode <- Config.batchMode
|
||||||
|
} yield
|
||||||
|
UploadEventListener.Settings(uiChannel,
|
||||||
localFile,
|
localFile,
|
||||||
index,
|
index,
|
||||||
totalBytesSoFar,
|
totalBytesSoFar,
|
||||||
batchMode
|
batchMode)
|
||||||
)
|
|
||||||
)
|
|
||||||
} yield upload
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object UnversionedMirrorArchive extends UnversionedMirrorArchive
|
object UnversionedMirrorArchive extends UnversionedMirrorArchive
|
||||||
|
|
|
@ -337,7 +337,7 @@ class LocalFileSystemTest extends FreeSpec {
|
||||||
String.format("action chosen : %s : %s",
|
String.format("action chosen : %s : %s",
|
||||||
action.remoteKey.key,
|
action.remoteKey.key,
|
||||||
action.getClass.getSimpleName)
|
action.getClass.getSimpleName)
|
||||||
case ActionFinished(action, actionCounter, bytesCounter) =>
|
case ActionFinished(action, actionCounter, bytesCounter, event) =>
|
||||||
String.format("action finished : %s : %s",
|
String.format("action finished : %s : %s",
|
||||||
action.remoteKey.key,
|
action.remoteKey.key,
|
||||||
action.getClass.getSimpleName)
|
action.getClass.getSimpleName)
|
||||||
|
|
|
@ -2,15 +2,12 @@ package net.kemitix.thorp.storage.aws
|
||||||
|
|
||||||
import com.amazonaws.services.s3.model.PutObjectRequest
|
import com.amazonaws.services.s3.model.PutObjectRequest
|
||||||
import com.amazonaws.services.s3.transfer.TransferManager
|
import com.amazonaws.services.s3.transfer.TransferManager
|
||||||
import net.kemitix.thorp.storage.aws.AmazonUpload.{
|
import net.kemitix.thorp.storage.aws.AmazonUpload.InProgress
|
||||||
CompletableUpload,
|
import zio.{Task, UIO, ZIO}
|
||||||
InProgress
|
|
||||||
}
|
|
||||||
import zio.{Task, UIO}
|
|
||||||
|
|
||||||
trait AmazonTransferManager {
|
trait AmazonTransferManager {
|
||||||
def shutdownNow(now: Boolean): UIO[Unit]
|
def shutdownNow(now: Boolean): UIO[Unit]
|
||||||
def upload: PutObjectRequest => Task[InProgress]
|
def upload: PutObjectRequest => UIO[InProgress]
|
||||||
}
|
}
|
||||||
|
|
||||||
object AmazonTransferManager {
|
object AmazonTransferManager {
|
||||||
|
@ -20,11 +17,17 @@ object AmazonTransferManager {
|
||||||
def shutdownNow(now: Boolean): UIO[Unit] =
|
def shutdownNow(now: Boolean): UIO[Unit] =
|
||||||
UIO(transferManager.shutdownNow(now))
|
UIO(transferManager.shutdownNow(now))
|
||||||
|
|
||||||
def upload: PutObjectRequest => Task[InProgress] =
|
def upload: PutObjectRequest => UIO[InProgress] =
|
||||||
putObjectRequest =>
|
putObjectRequest =>
|
||||||
Task(transferManager.upload(putObjectRequest))
|
transfer(transferManager, putObjectRequest)
|
||||||
.map(CompletableUpload)
|
.mapError(e => InProgress.Errored(e))
|
||||||
|
.catchAll(e => UIO(e))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def transfer(transferManager: TransferManager,
|
||||||
|
putObjectRequest: PutObjectRequest): Task[InProgress] =
|
||||||
|
ZIO
|
||||||
|
.effect(transferManager.upload(putObjectRequest))
|
||||||
|
.map(InProgress.CompletableUpload)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,16 +2,27 @@ package net.kemitix.thorp.storage.aws
|
||||||
|
|
||||||
import com.amazonaws.services.s3.transfer.Upload
|
import com.amazonaws.services.s3.transfer.Upload
|
||||||
import com.amazonaws.services.s3.transfer.model.UploadResult
|
import com.amazonaws.services.s3.transfer.model.UploadResult
|
||||||
|
import zio.Task
|
||||||
|
|
||||||
object AmazonUpload {
|
object AmazonUpload {
|
||||||
|
|
||||||
|
// unsealed for testing :(
|
||||||
trait InProgress {
|
trait InProgress {
|
||||||
def waitForUploadResult: UploadResult
|
def waitForUploadResult: Task[UploadResult]
|
||||||
|
}
|
||||||
|
|
||||||
|
object InProgress {
|
||||||
|
|
||||||
|
final case class Errored(e: Throwable) extends InProgress {
|
||||||
|
override def waitForUploadResult: Task[UploadResult] =
|
||||||
|
Task.fail(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
final case class CompletableUpload(upload: Upload) extends InProgress {
|
final case class CompletableUpload(upload: Upload) extends InProgress {
|
||||||
override def waitForUploadResult: UploadResult =
|
override def waitForUploadResult: Task[UploadResult] =
|
||||||
upload.waitForUploadResult()
|
Task(upload.waitForUploadResult())
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import com.amazonaws.services.s3.model.{
|
||||||
ListObjectsV2Result,
|
ListObjectsV2Result,
|
||||||
S3ObjectSummary
|
S3ObjectSummary
|
||||||
}
|
}
|
||||||
|
import net.kemitix.thorp.console.Console
|
||||||
import net.kemitix.thorp.domain.{Bucket, RemoteKey, RemoteObjects}
|
import net.kemitix.thorp.domain.{Bucket, RemoteKey, RemoteObjects}
|
||||||
import net.kemitix.thorp.storage.Storage
|
import net.kemitix.thorp.storage.Storage
|
||||||
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
|
import net.kemitix.thorp.storage.aws.S3ObjectsByHash.byHash
|
||||||
|
@ -21,7 +22,7 @@ trait Lister {
|
||||||
def listObjects(amazonS3: AmazonS3.Client)(
|
def listObjects(amazonS3: AmazonS3.Client)(
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
prefix: RemoteKey
|
prefix: RemoteKey
|
||||||
): RIO[Storage, RemoteObjects] = {
|
): RIO[Storage with Console, RemoteObjects] = {
|
||||||
|
|
||||||
def request =
|
def request =
|
||||||
new ListObjectsV2Request()
|
new ListObjectsV2Request()
|
||||||
|
@ -31,16 +32,19 @@ trait Lister {
|
||||||
def requestMore: Token => ListObjectsV2Request =
|
def requestMore: Token => ListObjectsV2Request =
|
||||||
token => request.withContinuationToken(token)
|
token => request.withContinuationToken(token)
|
||||||
|
|
||||||
def fetchBatch: ListObjectsV2Request => Task[Batch] =
|
def fetchBatch: ListObjectsV2Request => RIO[Console, Batch] =
|
||||||
request => tryFetchBatch(amazonS3)(request)
|
request =>
|
||||||
|
for {
|
||||||
|
_ <- Console.putStrLn("Fetching remote summaries...")
|
||||||
|
batch <- tryFetchBatch(amazonS3)(request)
|
||||||
|
} yield batch
|
||||||
|
|
||||||
def fetchMore: Option[Token] => Task[LazyList[S3ObjectSummary]] = {
|
def fetchMore: Option[Token] => RIO[Console, LazyList[S3ObjectSummary]] = {
|
||||||
case None => RIO.succeed(LazyList.empty)
|
case None => RIO.succeed(LazyList.empty)
|
||||||
case Some(token) => fetch(requestMore(token))
|
case Some(token) => fetch(requestMore(token))
|
||||||
}
|
}
|
||||||
|
|
||||||
def fetch: ListObjectsV2Request => Task[LazyList[S3ObjectSummary]] =
|
def fetch: ListObjectsV2Request => RIO[Console, LazyList[S3ObjectSummary]] =
|
||||||
|
|
||||||
request =>
|
request =>
|
||||||
for {
|
for {
|
||||||
batch <- fetchBatch(request)
|
batch <- fetchBatch(request)
|
||||||
|
|
|
@ -1,10 +0,0 @@
|
||||||
package net.kemitix.thorp.storage.aws
|
|
||||||
|
|
||||||
import net.kemitix.thorp.console._
|
|
||||||
import zio.RIO
|
|
||||||
|
|
||||||
trait ListerLogger {
|
|
||||||
def logFetchBatch: RIO[Console, Unit] =
|
|
||||||
Console.putStrLn("Fetching remote summaries...")
|
|
||||||
}
|
|
||||||
object ListerLogger extends ListerLogger
|
|
|
@ -2,6 +2,7 @@ package net.kemitix.thorp.storage.aws
|
||||||
|
|
||||||
import com.amazonaws.services.s3.AmazonS3ClientBuilder
|
import com.amazonaws.services.s3.AmazonS3ClientBuilder
|
||||||
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
|
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
|
||||||
|
import net.kemitix.thorp.console.Console
|
||||||
import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent
|
import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.storage.Storage
|
import net.kemitix.thorp.storage.Storage
|
||||||
|
@ -19,8 +20,9 @@ object S3Storage {
|
||||||
AmazonTransferManager.Wrapper(
|
AmazonTransferManager.Wrapper(
|
||||||
TransferManagerBuilder.defaultTransferManager)
|
TransferManagerBuilder.defaultTransferManager)
|
||||||
|
|
||||||
override def listObjects(bucket: Bucket,
|
override def listObjects(
|
||||||
prefix: RemoteKey): RIO[Storage, RemoteObjects] =
|
bucket: Bucket,
|
||||||
|
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =
|
||||||
Lister.listObjects(client)(bucket, prefix)
|
Lister.listObjects(client)(bucket, prefix)
|
||||||
|
|
||||||
override def upload(
|
override def upload(
|
||||||
|
|
|
@ -11,13 +11,7 @@ import net.kemitix.thorp.domain.StorageEvent.{
|
||||||
ErrorEvent,
|
ErrorEvent,
|
||||||
UploadEvent
|
UploadEvent
|
||||||
}
|
}
|
||||||
import net.kemitix.thorp.domain.{
|
import net.kemitix.thorp.domain._
|
||||||
Bucket,
|
|
||||||
LocalFile,
|
|
||||||
MD5Hash,
|
|
||||||
RemoteKey,
|
|
||||||
StorageEvent
|
|
||||||
}
|
|
||||||
import net.kemitix.thorp.storage.aws.Uploader.Request
|
import net.kemitix.thorp.storage.aws.Uploader.Request
|
||||||
import net.kemitix.thorp.uishell.UploadProgressEvent.{
|
import net.kemitix.thorp.uishell.UploadProgressEvent.{
|
||||||
ByteTransferEvent,
|
ByteTransferEvent,
|
||||||
|
@ -29,43 +23,48 @@ import zio.UIO
|
||||||
|
|
||||||
trait Uploader {
|
trait Uploader {
|
||||||
|
|
||||||
def upload(transferManager: => AmazonTransferManager)(
|
def upload(
|
||||||
request: Request): UIO[StorageEvent] =
|
transferManager: => AmazonTransferManager
|
||||||
transfer(transferManager)(request)
|
)(request: Request): UIO[StorageEvent] =
|
||||||
.catchAll(handleError(request.localFile.remoteKey))
|
transfer(
|
||||||
|
transferManager,
|
||||||
|
putObjectRequest(request),
|
||||||
|
request.localFile.remoteKey
|
||||||
|
)
|
||||||
|
|
||||||
private def handleError(remoteKey: RemoteKey)(
|
private def transfer(transferManager: AmazonTransferManager,
|
||||||
e: Throwable): UIO[StorageEvent] =
|
putObjectRequest: PutObjectRequest,
|
||||||
UIO(ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, e))
|
remoteKey: RemoteKey): UIO[StorageEvent] = {
|
||||||
|
|
||||||
private def transfer(transferManager: => AmazonTransferManager)(
|
|
||||||
request: Request
|
|
||||||
) =
|
|
||||||
dispatch(transferManager)(putObjectRequest(request))
|
|
||||||
|
|
||||||
private def dispatch(transferManager: AmazonTransferManager)(
|
|
||||||
putObjectRequest: PutObjectRequest
|
|
||||||
) = {
|
|
||||||
transferManager
|
transferManager
|
||||||
.upload(putObjectRequest)
|
.upload(putObjectRequest)
|
||||||
.map(_.waitForUploadResult)
|
.flatMap(_.waitForUploadResult)
|
||||||
.map(uploadResult =>
|
.map(
|
||||||
UploadEvent(RemoteKey(uploadResult.getKey),
|
uploadResult =>
|
||||||
MD5Hash(uploadResult.getETag)))
|
UploadEvent(
|
||||||
|
RemoteKey(uploadResult.getKey),
|
||||||
|
MD5Hash(uploadResult.getETag)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.catchAll(handleError(remoteKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
private def putObjectRequest(
|
private def handleError(
|
||||||
request: Request
|
remoteKey: RemoteKey
|
||||||
) = {
|
)(e: Throwable): UIO[StorageEvent] =
|
||||||
|
UIO(ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, e))
|
||||||
|
|
||||||
|
private def putObjectRequest(request: Request) = {
|
||||||
val putRequest =
|
val putRequest =
|
||||||
new PutObjectRequest(request.bucket.name,
|
new PutObjectRequest(
|
||||||
|
request.bucket.name,
|
||||||
request.localFile.remoteKey.key,
|
request.localFile.remoteKey.key,
|
||||||
request.localFile.file)
|
request.localFile.file
|
||||||
.withMetadata(metadata(request.localFile))
|
).withMetadata(metadata(request.localFile))
|
||||||
if (request.uploadEventListener.batchMode) putRequest
|
if (request.uploadEventListener.batchMode) putRequest
|
||||||
else
|
else
|
||||||
putRequest.withGeneralProgressListener(
|
putRequest.withGeneralProgressListener(
|
||||||
progressListener(request.uploadEventListener))
|
progressListener(request.uploadEventListener)
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def metadata: LocalFile => ObjectMetadata = localFile => {
|
private def metadata: LocalFile => ObjectMetadata = localFile => {
|
||||||
|
@ -98,9 +97,11 @@ trait Uploader {
|
||||||
case e: ProgressEvent if isByteTransfer(e) =>
|
case e: ProgressEvent if isByteTransfer(e) =>
|
||||||
ByteTransferEvent(e.getEventType.name)
|
ByteTransferEvent(e.getEventType.name)
|
||||||
case e: ProgressEvent =>
|
case e: ProgressEvent =>
|
||||||
RequestEvent(e.getEventType.name,
|
RequestEvent(
|
||||||
|
e.getEventType.name,
|
||||||
e.getBytes,
|
e.getBytes,
|
||||||
e.getBytesTransferred)
|
e.getBytesTransferred
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -108,9 +109,7 @@ trait Uploader {
|
||||||
}
|
}
|
||||||
|
|
||||||
object Uploader extends Uploader {
|
object Uploader extends Uploader {
|
||||||
final case class Request(
|
final case class Request(localFile: LocalFile,
|
||||||
localFile: LocalFile,
|
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
uploadEventListener: UploadEventListener.Settings
|
uploadEventListener: UploadEventListener.Settings)
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package net.kemitix.thorp.storage.aws
|
package net.kemitix.thorp.storage.aws
|
||||||
|
|
||||||
|
import net.kemitix.thorp.console.Console
|
||||||
import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent
|
import net.kemitix.thorp.domain.StorageEvent.ShutdownEvent
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.storage.Storage
|
import net.kemitix.thorp.storage.Storage
|
||||||
|
@ -28,7 +29,7 @@ trait AmazonS3ClientTestFixture extends MockFactory {
|
||||||
override def listObjects(
|
override def listObjects(
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
prefix: RemoteKey
|
prefix: RemoteKey
|
||||||
): RIO[Storage, RemoteObjects] =
|
): RIO[Storage with Console, RemoteObjects] =
|
||||||
Lister.listObjects(client)(bucket, prefix)
|
Lister.listObjects(client)(bucket, prefix)
|
||||||
|
|
||||||
override def upload(
|
override def upload(
|
||||||
|
|
|
@ -8,11 +8,12 @@ import com.amazonaws.services.s3.model.{
|
||||||
ListObjectsV2Result,
|
ListObjectsV2Result,
|
||||||
S3ObjectSummary
|
S3ObjectSummary
|
||||||
}
|
}
|
||||||
|
import net.kemitix.thorp.console.Console
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.storage.Storage
|
import net.kemitix.thorp.storage.Storage
|
||||||
import org.scalatest.FreeSpec
|
import org.scalatest.FreeSpec
|
||||||
import org.scalatest.Matchers._
|
import org.scalatest.Matchers._
|
||||||
import zio.{DefaultRuntime, Task, UIO}
|
import zio.{DefaultRuntime, RIO, Task, UIO}
|
||||||
|
|
||||||
class ListerTest extends FreeSpec {
|
class ListerTest extends FreeSpec {
|
||||||
|
|
||||||
|
@ -114,12 +115,13 @@ class ListerTest extends FreeSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket,
|
def invoke(amazonS3Client: AmazonS3.Client)(bucket: Bucket,
|
||||||
prefix: RemoteKey) =
|
prefix: RemoteKey) = {
|
||||||
new DefaultRuntime {}.unsafeRunSync {
|
object TestEnv extends Storage.Test with Console.Test
|
||||||
Lister
|
val program: RIO[Storage with Console, RemoteObjects] = Lister
|
||||||
.listObjects(amazonS3Client)(bucket, prefix)
|
.listObjects(amazonS3Client)(bucket, prefix)
|
||||||
.provide(Storage.Test)
|
val runtime = new DefaultRuntime {}
|
||||||
}.toEither
|
runtime.unsafeRunSync(program.provide(TestEnv)).toEither
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ import net.kemitix.thorp.domain.StorageEvent.{
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import org.scalamock.scalatest.MockFactory
|
import org.scalamock.scalatest.MockFactory
|
||||||
import org.scalatest.FreeSpec
|
import org.scalatest.FreeSpec
|
||||||
import zio.{DefaultRuntime, Task}
|
import zio.{DefaultRuntime, Task, UIO}
|
||||||
import net.kemitix.thorp.filesystem.Resource
|
import net.kemitix.thorp.filesystem.Resource
|
||||||
import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener}
|
import net.kemitix.thorp.uishell.{UIEvent, UploadEventListener}
|
||||||
|
|
||||||
|
@ -35,18 +35,19 @@ class UploaderTest extends FreeSpec with MockFactory {
|
||||||
val uploadResult = new UploadResult
|
val uploadResult = new UploadResult
|
||||||
uploadResult.setKey(remoteKey.key)
|
uploadResult.setKey(remoteKey.key)
|
||||||
uploadResult.setETag(MD5Hash.hash(aHash))
|
uploadResult.setETag(MD5Hash.hash(aHash))
|
||||||
val inProgress = new AmazonUpload.InProgress {
|
|
||||||
override def waitForUploadResult: UploadResult = uploadResult
|
|
||||||
}
|
|
||||||
val listenerSettings =
|
val listenerSettings =
|
||||||
UploadEventListener.Settings(uiChannel, localFile, 0, 0, batchMode = true)
|
UploadEventListener.Settings(uiChannel, localFile, 0, 0, batchMode = true)
|
||||||
"when no error" in {
|
"when no error" in {
|
||||||
val expected =
|
val expected =
|
||||||
Right(UploadEvent(remoteKey, aHash))
|
Right(UploadEvent(remoteKey, aHash))
|
||||||
|
val inProgress = new AmazonUpload.InProgress {
|
||||||
|
override def waitForUploadResult: Task[UploadResult] =
|
||||||
|
Task(uploadResult)
|
||||||
|
}
|
||||||
new AmazonS3ClientTestFixture {
|
new AmazonS3ClientTestFixture {
|
||||||
(fixture.amazonS3TransferManager.upload _)
|
(fixture.amazonS3TransferManager.upload _)
|
||||||
.when()
|
.when()
|
||||||
.returns(_ => Task.succeed(inProgress))
|
.returns(_ => UIO.succeed(inProgress))
|
||||||
private val result =
|
private val result =
|
||||||
invoke(fixture.amazonS3TransferManager)(
|
invoke(fixture.amazonS3TransferManager)(
|
||||||
localFile,
|
localFile,
|
||||||
|
@ -61,10 +62,14 @@ class UploaderTest extends FreeSpec with MockFactory {
|
||||||
val expected =
|
val expected =
|
||||||
Right(
|
Right(
|
||||||
ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, exception))
|
ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, exception))
|
||||||
|
val inProgress = new AmazonUpload.InProgress {
|
||||||
|
override def waitForUploadResult: Task[UploadResult] =
|
||||||
|
Task.fail(exception)
|
||||||
|
}
|
||||||
new AmazonS3ClientTestFixture {
|
new AmazonS3ClientTestFixture {
|
||||||
(fixture.amazonS3TransferManager.upload _)
|
(fixture.amazonS3TransferManager.upload _)
|
||||||
.when()
|
.when()
|
||||||
.returns(_ => Task.fail(exception))
|
.returns(_ => UIO.succeed(inProgress))
|
||||||
private val result =
|
private val result =
|
||||||
invoke(fixture.amazonS3TransferManager)(
|
invoke(fixture.amazonS3TransferManager)(
|
||||||
localFile,
|
localFile,
|
||||||
|
@ -79,10 +84,14 @@ class UploaderTest extends FreeSpec with MockFactory {
|
||||||
val expected =
|
val expected =
|
||||||
Right(
|
Right(
|
||||||
ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, exception))
|
ErrorEvent(ActionSummary.Upload(remoteKey.key), remoteKey, exception))
|
||||||
|
val inProgress = new AmazonUpload.InProgress {
|
||||||
|
override def waitForUploadResult: Task[UploadResult] =
|
||||||
|
Task.fail(exception)
|
||||||
|
}
|
||||||
new AmazonS3ClientTestFixture {
|
new AmazonS3ClientTestFixture {
|
||||||
(fixture.amazonS3TransferManager.upload _)
|
(fixture.amazonS3TransferManager.upload _)
|
||||||
.when()
|
.when()
|
||||||
.returns(_ => Task.fail(exception))
|
.returns(_ => UIO.succeed(inProgress))
|
||||||
private val result =
|
private val result =
|
||||||
invoke(fixture.amazonS3TransferManager)(
|
invoke(fixture.amazonS3TransferManager)(
|
||||||
localFile,
|
localFile,
|
||||||
|
@ -97,14 +106,15 @@ class UploaderTest extends FreeSpec with MockFactory {
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
listenerSettings: UploadEventListener.Settings
|
listenerSettings: UploadEventListener.Settings
|
||||||
) = {
|
) = {
|
||||||
type TestEnv = Config
|
val program = Uploader
|
||||||
val testEnv: TestEnv = Config.Live
|
|
||||||
new DefaultRuntime {}.unsafeRunSync {
|
|
||||||
Uploader
|
|
||||||
.upload(transferManager)(
|
.upload(transferManager)(
|
||||||
Uploader.Request(localFile, bucket, listenerSettings))
|
Uploader.Request(localFile, bucket, listenerSettings))
|
||||||
.provide(testEnv)
|
val runtime = new DefaultRuntime {}
|
||||||
}.toEither
|
runtime
|
||||||
|
.unsafeRunSync(
|
||||||
|
program
|
||||||
|
.provide(Config.Live))
|
||||||
|
.toEither
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package net.kemitix.thorp.storage
|
package net.kemitix.thorp.storage
|
||||||
|
|
||||||
|
import net.kemitix.thorp.console.Console
|
||||||
import net.kemitix.thorp.domain._
|
import net.kemitix.thorp.domain._
|
||||||
import net.kemitix.thorp.uishell.UploadEventListener
|
import net.kemitix.thorp.uishell.UploadEventListener
|
||||||
import zio.{RIO, Task, UIO, ZIO}
|
import zio.{RIO, Task, UIO, ZIO}
|
||||||
|
@ -14,7 +15,7 @@ object Storage {
|
||||||
def listObjects(
|
def listObjects(
|
||||||
bucket: Bucket,
|
bucket: Bucket,
|
||||||
prefix: RemoteKey
|
prefix: RemoteKey
|
||||||
): RIO[Storage, RemoteObjects]
|
): RIO[Storage with Console, RemoteObjects]
|
||||||
|
|
||||||
def upload(
|
def upload(
|
||||||
localFile: LocalFile,
|
localFile: LocalFile,
|
||||||
|
@ -83,7 +84,7 @@ object Storage {
|
||||||
object Test extends Test
|
object Test extends Test
|
||||||
|
|
||||||
final def list(bucket: Bucket,
|
final def list(bucket: Bucket,
|
||||||
prefix: RemoteKey): RIO[Storage, RemoteObjects] =
|
prefix: RemoteKey): RIO[Storage with Console, RemoteObjects] =
|
||||||
ZIO.accessM(_.storage listObjects (bucket, prefix))
|
ZIO.accessM(_.storage listObjects (bucket, prefix))
|
||||||
|
|
||||||
final def upload(
|
final def upload(
|
||||||
|
|
|
@ -29,7 +29,8 @@ object UIEvent {
|
||||||
|
|
||||||
case class ActionFinished(action: Action,
|
case class ActionFinished(action: Action,
|
||||||
actionCounter: Int,
|
actionCounter: Int,
|
||||||
bytesCounter: Long)
|
bytesCounter: Long,
|
||||||
|
event: StorageEvent)
|
||||||
extends UIEvent
|
extends UIEvent
|
||||||
|
|
||||||
case class KeyFound(remoteKey: RemoteKey) extends UIEvent
|
case class KeyFound(remoteKey: RemoteKey) extends UIEvent
|
||||||
|
|
|
@ -5,6 +5,7 @@ import net.kemitix.thorp.config.Config
|
||||||
import net.kemitix.thorp.console.ConsoleOut.{
|
import net.kemitix.thorp.console.ConsoleOut.{
|
||||||
CopyComplete,
|
CopyComplete,
|
||||||
DeleteComplete,
|
DeleteComplete,
|
||||||
|
ErrorQueueEventOccurred,
|
||||||
UploadComplete
|
UploadComplete
|
||||||
}
|
}
|
||||||
import net.kemitix.thorp.console.{Console, ConsoleOut}
|
import net.kemitix.thorp.console.{Console, ConsoleOut}
|
||||||
|
@ -29,12 +30,13 @@ object UIShell {
|
||||||
case UIEvent.RemoteDataFetched(size) => remoteDataFetched(size)
|
case UIEvent.RemoteDataFetched(size) => remoteDataFetched(size)
|
||||||
case UIEvent.ShowSummary(counters) => showSummary(counters)
|
case UIEvent.ShowSummary(counters) => showSummary(counters)
|
||||||
case UIEvent.FileFound(localFile) => fileFound(localFile)
|
case UIEvent.FileFound(localFile) => fileFound(localFile)
|
||||||
case UIEvent.ActionChosen(action) => UIO(())
|
case UIEvent.ActionChosen(action) => actionChosen(action)
|
||||||
case UIEvent.AwaitingAnotherUpload(remoteKey, hash) =>
|
case UIEvent.AwaitingAnotherUpload(remoteKey, hash) =>
|
||||||
awaitingUpload(remoteKey, hash)
|
awaitingUpload(remoteKey, hash)
|
||||||
case UIEvent.AnotherUploadWaitComplete(action) =>
|
case UIEvent.AnotherUploadWaitComplete(action) =>
|
||||||
uploadWaitComplete(action)
|
uploadWaitComplete(action)
|
||||||
case UIEvent.ActionFinished(action, _, _) => actionFinished(action)
|
case UIEvent.ActionFinished(_, _, _, event) =>
|
||||||
|
actionFinished(event)
|
||||||
case UIEvent.KeyFound(_) => UIO(())
|
case UIEvent.KeyFound(_) => UIO(())
|
||||||
case UIEvent.RequestCycle(localFile,
|
case UIEvent.RequestCycle(localFile,
|
||||||
bytesTransferred,
|
bytesTransferred,
|
||||||
|
@ -45,17 +47,20 @@ object UIShell {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def actionFinished(
|
private def actionFinished(
|
||||||
action: Action): ZIO[Console with Config, Nothing, Unit] =
|
event: StorageEvent): ZIO[Console with Config, Nothing, Unit] =
|
||||||
for {
|
for {
|
||||||
batchMode <- Config.batchMode
|
batchMode <- Config.batchMode
|
||||||
_ <- action match {
|
_ <- event match {
|
||||||
case _: Action.DoNothing => UIO(())
|
case StorageEvent.DoNothingEvent(remoteKey) => UIO.unit
|
||||||
case ToUpload(_, localFile, _) =>
|
case StorageEvent.CopyEvent(sourceKey, targetKey) =>
|
||||||
Console.putMessageLnB(UploadComplete(localFile.remoteKey), batchMode)
|
|
||||||
case Action.ToCopy(_, sourceKey, _, targetKey, _) =>
|
|
||||||
Console.putMessageLnB(CopyComplete(sourceKey, targetKey), batchMode)
|
Console.putMessageLnB(CopyComplete(sourceKey, targetKey), batchMode)
|
||||||
case Action.ToDelete(_, remoteKey, _) =>
|
case StorageEvent.UploadEvent(remoteKey, md5Hash) =>
|
||||||
|
Console.putMessageLnB(UploadComplete(remoteKey), batchMode)
|
||||||
|
case StorageEvent.DeleteEvent(remoteKey) =>
|
||||||
Console.putMessageLnB(DeleteComplete(remoteKey), batchMode)
|
Console.putMessageLnB(DeleteComplete(remoteKey), batchMode)
|
||||||
|
case StorageEvent.ErrorEvent(action, remoteKey, e) =>
|
||||||
|
Console.putMessageLnB(ErrorQueueEventOccurred(action, e), batchMode)
|
||||||
|
case StorageEvent.ShutdownEvent() => UIO.unit
|
||||||
}
|
}
|
||||||
} yield ()
|
} yield ()
|
||||||
|
|
||||||
|
@ -111,4 +116,26 @@ object UIShell {
|
||||||
s"${Terminal.cursorPrevLine(statusHeight)}")
|
s"${Terminal.cursorPrevLine(statusHeight)}")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def actionAsString(action: Action): String = action match {
|
||||||
|
case Action.DoNothing(bucket, remoteKey, size) =>
|
||||||
|
s"Do nothing: ${remoteKey.key}"
|
||||||
|
case ToUpload(bucket, localFile, size) => s"Upload: ${localFile.remoteKey}"
|
||||||
|
case Action.ToCopy(bucket, sourceKey, hash, targetKey, size) =>
|
||||||
|
s"Copy: ${sourceKey.key} => ${targetKey.key}"
|
||||||
|
case Action.ToDelete(bucket, remoteKey, size) => s"Delete: ${remoteKey.key}"
|
||||||
|
}
|
||||||
|
|
||||||
|
def trimHeadTerminal(str: String): String = {
|
||||||
|
val width = Terminal.width
|
||||||
|
str.length match {
|
||||||
|
case l if l > width => str.substring(l - width)
|
||||||
|
case _ => str
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def actionChosen(action: Action): ZIO[Console with Config, Nothing, Unit] = {
|
||||||
|
Console.putStr(
|
||||||
|
trimHeadTerminal(actionAsString(action)) + eraseLineForward + "\r")
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue