Use with ZIO

Setup

You will need these dependencies:

sbt
// Each artifact name already carries the explicit _2.13 suffix, which is why a single % is used
// between the organisation and the artifact name.
libraryDependencies ++= Seq(
  "uk.gov.nationalarchives" % "da-s3-client_2.13" % "0.1.123",
  "dev.zio" % "zio-interop-reactivestreams_2.13" % "2.0.2",
  "dev.zio" % "zio-interop-cats_2.13" % "23.0.0.5"
)
Maven
<dependencies>
  <!-- The _2.13 suffix on each artifactId is part of the artifact name and must be kept. -->
  <dependency>
    <groupId>uk.gov.nationalarchives</groupId>
    <artifactId>da-s3-client_2.13</artifactId>
    <version>0.1.123</version>
  </dependency>
  <dependency>
    <groupId>dev.zio</groupId>
    <artifactId>zio-interop-reactivestreams_2.13</artifactId>
    <version>2.0.2</version>
  </dependency>
  <dependency>
    <groupId>dev.zio</groupId>
    <artifactId>zio-interop-cats_2.13</artifactId>
    <version>23.0.0.5</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  // Coordinates are group:artifact:version; the _2.13 suffix is part of the artifact name.
  implementation "uk.gov.nationalarchives:da-s3-client_2.13:0.1.123"
  implementation "dev.zio:zio-interop-reactivestreams_2.13:2.0.2"
  implementation "dev.zio:zio-interop-cats_2.13:23.0.0.5"
}

zio-interop-cats is needed so that the ZIO Task can be used with the Cats type classes.

zio-interop-reactivestreams converts between a ZIO ZStream and a Reactive Streams Publisher.

Examples

import java.nio.ByteBuffer

import software.amazon.awssdk.core.async.SdkPublisher
import software.amazon.awssdk.services.s3.model.{DeleteObjectsResponse, HeadObjectResponse}
import software.amazon.awssdk.transfer.s3.model._
import uk.gov.nationalarchives.DAS3Client
import zio._
import zio.interop.catz._
import zio.interop.reactivestreams._
import zio.stream.Stream

// Client instance shared by the example methods in this section; Task is the effect type.
val s3Client = DAS3Client[Task, Stream[Throwable, Byte]]()
/** Uploads the bytes of a ZIO stream through `s3Client.upload`.
  *
  * @param stream        the bytes to upload
  * @param contentLength forwarded to the client as the content length
  * @param bucket        destination bucket; defaults to the placeholder `"bucket"`
  * @param key           destination key; defaults to the placeholder `"key"`
  * @return the `CompletedUpload` returned by the client
  */
def upload(
    stream: Stream[Throwable, Byte],
    contentLength: Long,
    bucket: String = "bucket",
    key: String = "key"
): Task[CompletedUpload] =
  stream.chunks
    // Each chunk is wrapped in one ByteBuffer: Stream[Throwable, Byte] => Publisher[ByteBuffer]
    .map(chunk => ByteBuffer.wrap(chunk.toArray[Byte]))
    .toPublisher
    .flatMap(publisher => s3Client.upload(bucket, key, contentLength, publisher))

/** Downloads an object through `s3Client.download` and exposes its content as a ZIO byte stream.
  *
  * @param bucket the bucket to read from
  * @param key    the key of the object to read
  * @return a `Task` producing a stream of the bytes carried by the published buffers
  */
def download(bucket: String, key: String): Task[Stream[Throwable, Byte]] =
  s3Client
    .download(bucket, key)
    // Publisher[ByteBuffer] => Stream[Throwable, Byte]. All remaining bytes of every buffer are
    // emitted; a bare ByteBuffer.get() would read only a single byte from each buffer.
    .map(_.toZIOStream().mapConcatChunk(buffer => Chunk.fromByteBuffer(buffer)))

/** Delegates to `s3Client.copy`, passing the source and destination bucket/key unchanged.
  *
  * @return the `CompletedCopy` returned by the client
  */
def copy(
    sourceBucket: String,
    sourceKey: String,
    destinationBucket: String,
    destinationKey: String
): Task[CompletedCopy] =
  s3Client.copy(sourceBucket, sourceKey, destinationBucket, destinationKey)

/** Delegates to `s3Client.headObject` for the given bucket and key.
  *
  * @return the `HeadObjectResponse` returned by the client
  */
def headObject(bucket: String, key: String): Task[HeadObjectResponse] =
  s3Client.headObject(bucket, key)

/** Delegates to `s3Client.deleteObjects` with the bucket and the list of keys.
  *
  * @return the `DeleteObjectsResponse` returned by the client
  */
def deleteObjects(bucket: String, keys: List[String]): Task[DeleteObjectsResponse] =
  s3Client.deleteObjects(bucket, keys)

/** Delegates to `s3Client.listCommonPrefixes` with the bucket and the key prefix.
  *
  * @return the `SdkPublisher[String]` returned by the client
  */
def listCommonPrefixes(bucket: String, keysPrefixedWith: String): Task[SdkPublisher[String]] =
  s3Client.listCommonPrefixes(bucket, keysPrefixedWith)