Use with ZIO
Setup
You will need these dependencies:
- sbt
// Artifacts are pinned to their Scala 2.13 builds.
libraryDependencies ++= Seq(
  "uk.gov.nationalarchives" % "da-s3-client_2.13" % "0.1.123",
  "dev.zio" % "zio-interop-reactivestreams_2.13" % "2.0.2",
  "dev.zio" % "zio-interop-cats_2.13" % "23.0.0.5"
)
- Maven
<dependencies>
  <!-- S3 client -->
  <dependency>
    <groupId>uk.gov.nationalarchives</groupId>
    <artifactId>da-s3-client_2.13</artifactId>
    <version>0.1.123</version>
  </dependency>
  <!-- ZIO interop modules -->
  <dependency>
    <groupId>dev.zio</groupId>
    <artifactId>zio-interop-reactivestreams_2.13</artifactId>
    <version>2.0.2</version>
  </dependency>
  <dependency>
    <groupId>dev.zio</groupId>
    <artifactId>zio-interop-cats_2.13</artifactId>
    <version>23.0.0.5</version>
  </dependency>
</dependencies>
- Gradle
dependencies {
    // S3 client
    implementation "uk.gov.nationalarchives:da-s3-client_2.13:0.1.123"
    // ZIO interop modules
    implementation "dev.zio:zio-interop-reactivestreams_2.13:2.0.2"
    implementation "dev.zio:zio-interop-cats_2.13:23.0.0.5"
}
`zio-interop-cats` is needed so that the ZIO `Task` can be used with the Cats type classes.
`zio-interop-reactivestreams` converts between a `ZStream` and a `Publisher`.
Examples
import java.nio.ByteBuffer

import software.amazon.awssdk.core.async.SdkPublisher
import software.amazon.awssdk.services.s3.model.{DeleteObjectsResponse, HeadObjectResponse}
import software.amazon.awssdk.transfer.s3.model._
import uk.gov.nationalarchives.DAS3Client
import zio._
import zio.interop.catz._
import zio.interop.reactivestreams._
import zio.stream.Stream
// Client instance whose effect type is the ZIO Task and whose stream type is a ZIO byte stream.
val s3Client = DAS3Client[Task, Stream[Throwable, Byte]]()
/** Uploads the bytes of `stream` through `s3Client.upload` using the bucket "bucket" and key "key".
  *
  * @param stream        the bytes to upload
  * @param contentLength the length passed on to `s3Client.upload`
  * @return a `Task` yielding the `CompletedUpload` produced by the client
  */
def upload(stream: Stream[Throwable, Byte], contentLength: Long): Task[CompletedUpload] =
  stream.chunks
    .map(chunk => ByteBuffer.wrap(chunk.toArray[Byte])) // one ByteBuffer per stream chunk
    .toPublisher // Stream[Throwable, Byte] is now a Publisher[ByteBuffer]
    .flatMap(byteBufferPublisher => s3Client.upload("bucket", "key", contentLength, byteBufferPublisher))
/** Downloads the object at `key` in `bucket` and exposes its content as a ZIO byte stream.
  *
  * @param bucket the bucket passed to `s3Client.download`
  * @param key    the key passed to `s3Client.download`
  * @return a `Task` yielding a stream of every byte delivered by the downloaded publisher
  */
def download(bucket: String, key: String): Task[Stream[Throwable, Byte]] =
  s3Client
    .download(bucket, key)
    // Publisher[ByteBuffer] to Stream[Throwable, Byte].
    // Each ByteBuffer can hold many bytes, so all of its remaining bytes are emitted;
    // a single relative ByteBuffer.get() would yield only one byte per buffer.
    .map(_.toZIOStream().mapConcatChunk(buffer => Chunk.fromByteBuffer(buffer)))
/** Delegates to `s3Client.copy` with the given source and destination locations.
  *
  * @param sourceBucket      bucket of the object to copy
  * @param sourceKey         key of the object to copy
  * @param destinationBucket bucket to copy into
  * @param destinationKey    key to copy to
  * @return a `Task` yielding the `CompletedCopy` produced by the client
  */
def copy(
    sourceBucket: String,
    sourceKey: String,
    destinationBucket: String,
    destinationKey: String
): Task[CompletedCopy] =
  s3Client.copy(sourceBucket, sourceKey, destinationBucket, destinationKey)
/** Delegates to `s3Client.headObject` for the given bucket and key.
  *
  * @param bucket the bucket containing the object
  * @param key    the key of the object
  * @return a `Task` yielding the `HeadObjectResponse` produced by the client
  */
def headObject(bucket: String, key: String): Task[HeadObjectResponse] =
  s3Client.headObject(bucket, key)
/** Delegates to `s3Client.deleteObjects` for the given bucket and keys.
  *
  * @param bucket the bucket containing the objects
  * @param keys   the keys passed to the client for deletion
  * @return a `Task` yielding the `DeleteObjectsResponse` produced by the client
  */
def deleteObjects(bucket: String, keys: List[String]): Task[DeleteObjectsResponse] =
  s3Client.deleteObjects(bucket, keys)
/** Delegates to `s3Client.listCommonPrefixes` for the given bucket and key prefix.
  *
  * @param bucket           the bucket passed to the client
  * @param keysPrefixedWith the key prefix passed to the client
  * @return a `Task` yielding an `SdkPublisher[String]` produced by the client
  */
def listCommonPrefixes(bucket: String, keysPrefixedWith: String): Task[SdkPublisher[String]] =
  s3Client.listCommonPrefixes(bucket, keysPrefixedWith)