From 242804294dc3a074bcb1ba3d02fac789eff790d5 Mon Sep 17 00:00:00 2001 From: Paul Campbell Date: Wed, 14 Aug 2019 10:10:23 +0100 Subject: [PATCH] [storage-aws] force upload progress updates to strick sequence Prevents the situation where a second progress update arrives while the first is still on the process of being processed. The new update is blocked until the first is completed. --- .../main/scala/net/kemitix/thorp/storage/aws/Uploader.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala index b14427f..b6dead7 100644 --- a/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala +++ b/storage-aws/src/main/scala/net/kemitix/thorp/storage/aws/Uploader.scala @@ -1,5 +1,7 @@ package net.kemitix.thorp.storage.aws +import java.util.concurrent.locks.StampedLock + import com.amazonaws.event.ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT import com.amazonaws.event.{ProgressEvent, ProgressListener} import com.amazonaws.services.s3.model.{ObjectMetadata, PutObjectRequest} @@ -75,8 +77,11 @@ trait Uploader { listenerSettings => new ProgressListener { private val listener = UploadEventListener.listener(listenerSettings) + private val lock = new StampedLock override def progressChanged(progressEvent: ProgressEvent): Unit = { + val writeLock = lock.writeLock() listener(eventHandler(progressEvent)) + lock.unlock(writeLock) } private def eventHandler: ProgressEvent => UploadEvent =