ZIO - Scalaで型安全に合成可能な非同期処理を実現する

はじめに

はじめまして!昨年10月に中途入社したエンジニアのおかむです。
エンジニアとしてはそれなりに長くやってきて、インフラからフロントエンドまで幅広く経験しています。
苗字が社長の岡村さん(2021年7月からは会長になります)と同じで紛らわしいため、Adwaysでは愛称で呼んでもらってます。
(実は年齢も岡村さんと同じです)

自己紹介は短く切り上げて本題に入りましょう。

以下、Scalaのサンプルコードではmdocを使っています。

ZIOとは何か

https://zio.dev f:id:AdwaysEngineerBlog:20210426192201p:plain

ZIOは、Scalaで作用を扱う関数型プログラミングのライブラリです。

「作用」というのはちょっとわかりづらいので、多少の語弊は恐れず「処理」と言い換えても大きな問題はないと思います。
(英語ではeffectとよばれます)

ZIOで中心となる「作用を表す型」がzio.ZIO です。
scala.concurrent.Futureを知っているのであれば、遅延評価されるFutureのような型であると考えてもらってもいいでしょう。
類似の型に、cats-effectIOmonixTask が挙げられます。

まずはZIOという型の定義を見てみましょう
https://github.com/zio/zio/blob/bb2d0de772f22295afa3c5bf69808d39e6d9ef66/core/shared/src/main/scala/zio/ZIO.scala#L52

sealed trait ZIO[-R, +E, +A] extends Serializable with ZIOPlatformSpecific[R, E, A]

R, E, Aと3つの型パラメータをとる型であることがわかります。

  • R: 環境を表す型 (Environment/Requirement)
    作用を実行するのに必要な「環境」を表す型です。これがAnyの場合、必要な環境が存在せずそのまま実行できることを示します。
    ZIOでは、このR型を使ってDIのようなことが実現できます。

  • E: 失敗を表す型 (Failure type)
    この作用を実行するとどの様なエラーが発生する可能性があるかを表します。アプリケーションによっては一律Throwableを使って、特にエラー型を限定しない場合もあります (実質Futurecats.IOなどと同等)。
    Nothingの場合「予期される失敗はない」ということを表します。
    (絶対に失敗しないという保証ではありません。この値を取り扱うユーザがエラー処理を考える必要がないということです。)

  • A: 成功した作用の結果を表す型 (Success type)
    これがUnitの場合、特に意味のある値を返さないということです。もしNothingの場合、この作用が完了しない(失敗するまで永久に続く)ことを表します。

例えば ZIO[Any, IOException, Option[Int]] という場合、必要な環境型はなく、IOExceptionで失敗する可能性があり、成功するとOption[Int]を返す作用を表す型になります。

ZIO[R, E, A]の値は、R => Either[E, A]という関数にも似ていますが、非同期や並行制御など複雑な作用を扱えたりモナディックな合成ができるなど、より高度な機能を提供します。

ZIOにおいて作用を表す型はZIOのみですが、よく使われる型パラメータの組み合わせにはエイリアスが用意されています。
下記の様なエイリアスを利用することで、冗長な型の記述をある程度は避けられる様になっています。

type Task[A] = ZIO[Any, Throwable, A]
type IO[E, A] = ZIO[Any, E, A]
type UIO[A] = ZIO[Any, Nothing, A]
type RIO[R, A] = ZIO[R, Throwable, A]
type URIO[R, A] = ZIO[R, Nothing, A]

簡単なコード

と、定義やらなにやら解説するだけではわかりづらいと思うので、簡単なコードを書いてみましょう。

import zio._

def run[A](io: RIO[ZEnv, A]): A = Runtime.default.unsafeRunTask(io)

val pureValue = ZIO.succeed(1)
// pureValue: UIO[Int] = zio.ZIO$EffectTotal@15410753

val io = ZIO.effect { println("My first effect!") }
// io: Task[Unit] = zio.ZIO$EffectPartial@49186278
run(io)
// My first effect!

ZIO.succeedというメソッドは純粋な値を受け取ってZIOの値を作成します。
ZIO.effectというメソッドは名前渡しで受け取った式をZIOの値にラップします。
シグニチャは def effect[A](effect: => A): Task[A] となります。Task[A]というのはZIO[Any, Throwable, A]の別名です。
上記コードを実行すると、コンソールに My first effect! と表示されます。
コードだけを見てもわかりづらいかもしれませんが、ZIOの値を作っただけではその中に定義された処理は実行されません。
RuntimeunsafeRunTaskなどに値を渡すことで初めて処理が実行されます。

さて、これをコンソール表示が3度繰り返されるように変更してみましょう。

val threeTimes = for {
  _ <- io
  _ <- io
  _ <- io
} yield ()
// threeTimes: ZIO[Any, Throwable, Unit] = zio.ZIO$FlatMap@59841390

run(threeTimes)
// My first effect!
// My first effect!
// My first effect!

今回は後で比較するためにfor で書いていますが、 io *> io *> io とか io.replicateM(3) のように書くこともできます (結果の型は多少変化しますが)
さて、上記コードを見ると、io という変数に入れた値を再利用して何度も使っていますが、ZIOの値は参照透過なので

val threeTimes2 = for {
  _ <- ZIO.effect { println("My first effect!") }
  _ <- ZIO.effect { println("My first effect!") }
  _ <- ZIO.effect { println("My first effect!") }
} yield ()
// threeTimes2: ZIO[Any, Throwable, Unit] = zio.ZIO$FlatMap@1e053ba

run(threeTimes2)
// My first effect!
// My first effect!
// My first effect!

と書いた場合と全く同じ意味になります。

Map FlatMap and other combinators

上のサンプルコードでfor文を使っていましたから分かる人も多いとは思いますが、map/flatMapが使えます。

val int = ZIO.succeed(10)
// int: UIO[Int] = zio.ZIO$EffectTotal@605d1b30
val added = int.map { n => n + 1 }
// added: ZIO[Any, Nothing, Int] = zio.ZIO$FlatMap@2bde66dd
val subtracted = int.flatMap { n => ZIO.effect(n - 1) }
// subtracted: ZIO[Any, Throwable, Int] = zio.ZIO$FlatMap@f0f3885

run(added.zip(subtracted))
// res3: (Int, Int) = (11, 9)

また、ZIOでは結果型だけでなくエラー型も保持しているため、bimapfoldができます。
(bimap = bi-directional map、日本語だと双方向マップになります。成功時のAと失敗時のEの両方を同時にmapできます)

// def bimap[E2, B](f: E => E2, g: A => B)(implicit ev: CanFail[E]): ZIO[R, E2, B]
// def fold[B](failure: E => B, success: A => B)(implicit ev: CanFail[E]): URIO[R, B]
// def foldM[R1 <: R, E2, B](failure: E => ZIO[R1, E2, B], success: A => ZIO[R1, E2, B])(implicit ev: CanFail[E]): ZIO[R1, E2, B]

val ok = ZIO.effect("Ok!")
// ok: Task[String] = zio.ZIO$EffectPartial@693ccd84
val fail = ZIO.fail(new RuntimeException("fail"))
// fail: IO[RuntimeException, Nothing] = zio.ZIO$Fail@3806602e

case class WrappedError(e: Throwable) extends Throwable(e)
case class WrappedResult(s: String)

val okBimapped = ok.bimap(e => WrappedError(e), a => WrappedResult(a))
// okBimapped: ZIO[Any, WrappedError, WrappedResult] = <function1>
val failBimapped = fail.bimap(e => WrappedError(e), a => WrappedResult(a))
// failBimapped: ZIO[Any, WrappedError, WrappedResult] = <function1>

run(okBimapped)
// res4: WrappedResult = WrappedResult("Ok!")
run(failBimapped.either) // make error to Left value
// res5: Either[WrappedError, WrappedResult] = Left(
//   WrappedError(java.lang.RuntimeException: fail)
// ) // make error to Left value

val folded = fail.fold(e => s"Error: $e", a => s"Success: $a")
// folded: URIO[Any, String] = <function1>
run(folded)
// res6: String = "Error: java.lang.RuntimeException: fail"

val folededM = ok.foldM(e => ZIO.effect(s"Error: $e"), a => ZIO.effect(s"Success: $a"))
// folededM: ZIO[Any, Throwable, String] = <function1>
run(folededM)
// res7: String = "Success: Ok!"

エラーからの復帰にはcatchAllcatchSomeを使います。catchSomeFuturerecoverWithとほぼ同等です。

// def catchAll[R1 <: R, E2, A1 >: A](h: E => ZIO[R1, E2, A1])(implicit ev: CanFail[E]): ZIO[R1, E2, A1]
// def catchSome[R1 <: R, E1 >: E, A1 >: A](pf: PartialFunction[E, ZIO[R1, E1, A1]])(implicit ev: CanFail[E]): ZIO[R1, E1, A1]

sealed abstract class BaseError extends Exception
final class SomeError extends BaseError
final class ShouldDie extends BaseError
final class IgnoreMe extends BaseError

val mayFail: IO[BaseError, String] = ZIO.fail(new IgnoreMe)
// mayFail: IO[BaseError, String] = zio.ZIO$Fail@53ee50a1

val catched = mayFail.catchAll { e =>
  ZIO.effect(s"catching error: $e")
}
// catched: ZIO[Any, Throwable, String] = <function1>
run(catched)
// res8: String = "catching error: repl.MdocSession$App$IgnoreMe"

val notCatched = mayFail.catchSome {
  case e: SomeError => ZIO.effect(s"Catching SomeError")
  case e: ShouldDie => ZIO.die(new RuntimeException("Dying"))
}
// notCatched: ZIO[Any, Throwable, String] = <function1>
run(notCatched.either)
// res9: Either[Throwable, String] = Left(repl.MdocSession$App$IgnoreMe)

また、ZIOのコンパニオンオブジェクトにも多くの便利なメソッドが定義されています。

mapNcollectAllなどを使って、複数のZIOの値を1つにまとめることができます。

// def mapN[R, E, A, B, C](zio1: ZIO[R, E, A], zio2: ZIO[R, E, B])(f: (A, B) => C): ZIO[R, E, C]
// def collectAll[R, E, A, Collection[+Element] <: Iterable[Element]](in: Collection[ZIO[R, E, A]])(implicit bf: BuildFrom[Collection[ZIO[R, E, A]], A, Collection[A]]): ZIO[R, E, Collection[A]]

val n3 = ZIO.succeed(3)
// n3: UIO[Int] = zio.ZIO$EffectTotal@47c38e38
val n5 = ZIO.succeed(5)
// n5: UIO[Int] = zio.ZIO$EffectTotal@79705fc

val n8: UIO[Int] = ZIO.mapN(n3, n5)(_ + _)
// n8: UIO[Int] = zio.ZIO$FlatMap@126ae175
run(n8)
// res10: Int = 8

val merged: UIO[Seq[Int]] = ZIO.collectAll(Seq(n3, n5))
// merged: UIO[Seq[Int]] = zio.ZIO$FlatMap@758a189d
run(merged)
// res11: Seq[Int] = List(3, 5)

Bonus: ほんの少し変更するだけで並列に実行させることができます!

val parN8: UIO[Int] = ZIO.mapParN(n3, n5)(_ + _)
// parN8: UIO[Int] = zio.ZIO$GetForkScope@1581c147
run(parN8)
// res12: Int = 8

val parMerged: UIO[Seq[Int]] = ZIO.collectAllPar(Seq(n3, n5))
// parMerged: UIO[Seq[Int]] = zio.ZIO$FlatMap@24e7ff91
run(parMerged)
// res13: Seq[Int] = List(3, 5)

ここで紹介した関数以外にも、多くのメソッドがあります。
ZIO cheatsheetJavadoc、最終的にはソースコードに当たることになると思いますが、どんなメソッドが用意されているのかちょっと眺めてみるのも面白いと思います。

バリデーション

ZIOに限らずmapflatMapでは失敗するとそれ以降の処理は実行されません。
そのため、バリデーション - つまりデータを検査して問題のある箇所をエラーとして返すような目的の処理では使いづらいことがあります。

例)
入力された名前と年齢をバリデーションしたい。
名前は空でないこと
年齢は18以上であること

ここで素直にmap/flatMapで実装すると、

case class Input(name: String, age: Int)

def checkName(name: String): IO[String, String] = {
  if (name.isEmpty) ZIO.fail("Name should not be empty")
  else ZIO.succeed(name)
}

def checkAge(age: Int): IO[String, Int] = {
  if (age < 18) ZIO.fail("Age should be >= 18")
  else ZIO.succeed(age)
}

val input = Input("", 15)
// input: Input = Input("", 15)

val result: IO[String, Input] = for {
  name <- checkName(input.name)
  age <- checkAge(input.age)
} yield Input(name, age)
// result: IO[String, Input] = zio.ZIO$Fail@5a2c001f

run(result.either)
// res14: Either[String, Input] = Left("Name should not be empty")

というような形になります。(エラーは一旦文字列のエラーメッセージとしてStringにしています)
これでとりあえず入力の検査自体はできているのですが、名前が空の時には年齢のチェックが走らず、Name should not be emptyというメッセージだけが返されます。
もちろんこの挙動で十分なケースもあるのですが、一部がエラーになったとしても常に全体をチェックして、失敗した項目全てのエラーメッセージをリストなどで返して欲しいユースケースは少なくないはずです。
ある程度Scalaを触ってきた方なら、scalazのValidationValidationNEL)やcatsのValidated型を想起されるでしょうか。
ZIOでは別の型を用意するわけではなく、メソッドとして用意されています。

// def validate[R1 <: R, E1 >: E, B](that: ZIO[R1, E1, B]): ZIO[R1, E1, (A, B)]
// def parallelErrors[E1 >: E]: ZIO[R, ::[E1], A]

val validated: IO[::[String], (String, Int)] =
  checkName(input.name).validate(checkAge(input.age))
    .parallelErrors
// validated: IO[::[String], (String, Int)] = <function1>

run(validated.either)
// res15: Either[::[String], (String, Int)] = Left(
//   List("Name should not be empty", "Age should be >= 18")
// )

これで結果型のEは::[String]となります。::というのは見慣れない型かもしれませんが、Scalaの標準に含まれる型でconsと呼ばれる「必ず1つ以上の要素を含むリスト」です。
1 :: 2 :: Nilなどと書いてListを作成する際、List::というメソッドが生成する値の型が::です。(ややこしい)
::[A]List[A]のサブタイプで、mapなどを呼び出した結果は全てただのList[A]になってしまうため、「空でない」という情報は失われます)

validateparallelErrorsの動作を理解するには、ZIOのエラー型Eの扱いについて多少知っておく必要があります。
ZIOでは内部的に、エラー型ECause[E] という型にラップして保持しており、複数のエラーやエラーの履歴を追跡できるようになっています。
validateでは発生したエラーをCause[E]の中に積んでいく様なイメージになります。
validate時点での結果型はZIO[R, E, A]でしかないため、内部に積まれたエラー情報全てにアクセスすることはできないのですが、parallelErrorsを呼びだすことでCause[E]に積まれたエラーを取り出して::[E]としてアクセスできるようになります。

とまぁ解説すると長くなってしまうのですが、コードとしてはシンプルに実装可能です。
validatevalidateWithflatMapなどを組み合わせることで、実装者の意図に合わせたバリデーションロジックが実装できます。

エラー時のStacktrace

前述した様にZIOではCause[E]の中に様々なエラー情報を保持しています。
そのため副次的な効果として、非同期処理に絡んで発生するエラーのスタックトレースに含まれる情報量が多く、scala.concurrent.Futureなどと比べてデバッグや問題箇所の特定が容易になります。

一例をあげてみましょう。

def asyncDbCall(): Future[Int] = ...

def countPromotion: Future[Int] = ...asyncDbCall()...
def countMedia: Future[Int] = ...asyncDbCall()...

こんなコードがあり、asyncDbCall()の中で例外が発生すると、スタックトレースは以下の様になります。

Exception in thread "main" java.lang.RuntimeException
    at example.Hello$.$anonfun$asyncDbCall$1(Hello.scala:13)
Hello.scala:13
    at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
Future.scala:659
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
Try.scala:255
    at scala.util.Success.map(Try.scala:213)
Try.scala:213
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
Future.scala:292
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
Promise.scala:33
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
Promise.scala:33
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
Promise.scala:64
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

asyncDbCallでエラーが発生したことまではわかりますが、asyncDbCallがどこで呼ばれたのかさっぱりわかりません。

これをZIOを使う様に変更します。

def asyncDbCallZio(): Task[Int] = {
  ZIO.fromFuture(_ => asyncDbCall())
}

def countMediaZio: Task[Int] = {
  for {
    _ <- ZIO.effect(println("Counting!"))
    r <- asyncDbCallZio()
  } yield r
}

def doWork: Task[Int] = {
  for {
    _ <- ZIO.effect(println("Let's do some work"))
    c <- countMediaZio
  } yield c
}

結果、同じ様にasyncDbCallの中でエラーが発生した場合のスタックトレースは以下の様になります。

Fiber failed.
An unchecked error was produced.
java.lang.RuntimeException
    at example.Hello$.$anonfun$asyncDbCall$1(Hello.scala:13)
Hello.scala:13
    at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
Future.scala:659
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
Try.scala:255
    at scala.util.Success.map(Try.scala:213)
Try.scala:213
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
Future.scala:292
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
Promise.scala:33
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
Promise.scala:33
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
Promise.scala:64
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

Fiber:Id(1617612889740,1) was supposed to continue to: <empty trace>

Fiber:Id(1617612889740,1) execution trace:
  at zio.ZIO.orDieWith(ZIO.scala:1060)
ZIO.scala:1060
  at zio.ZIO.onInterrupt(ZIO.scala:992)
ZIO.scala:992
  at zio.ZIO$._IdentityFn(ZIO.scala:4024)
ZIO.scala:4024
  at zio.ZIO$.fromFuture(ZIO.scala:3201)
ZIO.scala:3201
2
  at zio.ZIO$.effectAsyncInterrupt(ZIO.scala:2621)
ZIO.scala:2621
  at zio.ZIO$.fromFuture(ZIO.scala:3193)
ZIO.scala:3193
  at zio.ZIO$.fromFuture(ZIO.scala:3191)
ZIO.scala:3191
  at zio.ZIO$.fromFuture(ZIO.scala:3190)
ZIO.scala:3190
  at example.HelloZio$.countMediaZio(Hello.scala:43)
Hello.scala:43
  at example.HelloZio$.countMediaZio(Hello.scala:42)
Hello.scala:42
  at example.HelloZio$.doWork(Hello.scala:50)
Hello.scala:50
  at example.HelloZio$.doWork(Hello.scala:49)
Hello.scala:49

Fiber:Id(1617612889740,1) was spawned by:

Fiber:Id(1617612889637,0) was supposed to continue to:
  a future continuation at zio.App.main(App.scala:57)
App.scala:57
  a future continuation at zio.App.main(App.scala:56)
App.scala:56

Fiber:Id(1617612889637,0) ZIO Execution trace: <empty trace>


...

大分量が増えてちょっと読むのが大変ではあるのですが、

ZIO.scala:3191
  at zio.ZIO$.fromFuture(ZIO.scala:3190)
ZIO.scala:3190
  at example.HelloZio$.countMediaZio(Hello.scala:43)
Hello.scala:43
  at example.HelloZio$.countMediaZio(Hello.scala:42)
Hello.scala:42
  at example.HelloZio$.doWork(Hello.scala:50)
Hello.scala:50
  at example.HelloZio$.doWork(Hello.scala:49)
Hello.scala:49

というようにどこから呼ばれたのかちゃんと追跡できていることがわかります。
他にも、例外のfinalizerでさらに例外が発生した場合も元の例外情報を失わないなど、ZIOには実用上の利点が多くあります。

最後に

ZIOについて話そうと思えば、まだまだ

  • R:環境型を使ったDI
  • 複雑な非同期制御やエラーハンドリング
  • ZStreamを使ったストリーミング処理

など話題は尽きません。

複雑な機能も存在するZIOですが、単純にscala.concurrent.Futureを置き換える様な使い方でも多くの実用上のメリットを享受することができると思います。

みなさんも、是非ZIOを使ってみてください。