はじめに
はじめまして!昨年10月に中途入社したエンジニアのおかむです。
エンジニアとしてはそれなりに長くやってきて、インフラからフロントエンドまで幅広く経験しています。
苗字が社長の岡村さん(2021年7月からは会長になります)と同じで紛らわしいため、Adwaysでは愛称で呼んでもらってます。
(実は年齢も岡村さんと同じです)
自己紹介は短く切り上げて本題に入りましょう。
以下、Scalaのサンプルコードではmdocを使っています。
ZIOとは何か
ZIOは、Scalaで作用を扱う関数型プログラミングのライブラリです。
「作用」というのはちょっとわかりづらいので、多少の語弊は恐れず「処理」と言い換えても大きな問題はないと思います。
(英語ではeffectとよばれます)
ZIOで中心となる「作用を表す型」がzio.ZIO
です。
scala.concurrent.Future
を知っているのであれば、遅延評価されるFuture
のような型であると考えてもらってもいいでしょう。
類似の型に、cats-effect
のIO
やmonix
のTask
が挙げられます。
まずはZIO
という型の定義を見てみましょう
https://github.com/zio/zio/blob/bb2d0de772f22295afa3c5bf69808d39e6d9ef66/core/shared/src/main/scala/zio/ZIO.scala#L52
sealed trait ZIO[-R, +E, +A] extends Serializable with ZIOPlatformSpecific[R, E, A]
R
, E
, A
と3つの型パラメータをとる型であることがわかります。
R
: 環境を表す型 (Environment/Requirement)
作用を実行するのに必要な「環境」を表す型です。これがAny
の場合、必要な環境が存在せずそのまま実行できることを示します。
ZIO
では、このR
型を使ってDIのようなことが実現できます。E
: 失敗を表す型 (Failure type)
この作用を実行するとどの様なエラーが発生する可能性があるかを表します。アプリケーションによっては一律Throwable
を使って、特にエラー型を限定しない場合もあります (実質Future
やcats.IO
などと同等)。
Nothing
の場合「予期される失敗はない」ということを表します。
(絶対に失敗しないという保証ではありません。この値を取り扱うユーザがエラー処理を考える必要がないということです。)A
: 成功した作用の結果を表す型 (Success type)
これがUnit
の場合、特に意味のある値を返さないということです。もしNothing
の場合、この作用が完了しない(失敗するまで永久に続く)ことを表します。
例えば ZIO[Any, IOException, Option[Int]]
という場合、必要な環境型はなく、IOException
で失敗する可能性があり、成功するとOption[Int]
を返す作用を表す型になります。
ZIO[R, E, A]
の値は、R => Either[E, A]
という関数にも似ていますが、非同期や並行制御など複雑な作用を扱えたりモナディックな合成ができるなど、より高度な機能を提供します。
ZIOにおいて作用を表す型はZIO
のみですが、よく使われる型パラメータの組み合わせにはエイリアスが用意されています。
下記の様なエイリアスを利用することで、冗長な型の記述をある程度は避けられる様になっています。
type Task[A] = ZIO[Any, Throwable, A] type IO[E, A] = ZIO[Any, E, A] type UIO[A] = ZIO[Any, Nothing, A] type RIO[R, A] = ZIO[R, Throwable, A] type URIO[R, A] = ZIO[R, Nothing, A]
簡単なコード
と、定義やらなにやら解説するだけではわかりづらいと思うので、簡単なコードを書いてみましょう。
import zio._ def run[A](io: RIO[ZEnv, A]): A = Runtime.default.unsafeRunTask(io) val pureValue = ZIO.succeed(1) // pureValue: UIO[Int] = zio.ZIO$EffectTotal@15410753 val io = ZIO.effect { println("My first effect!") } // io: Task[Unit] = zio.ZIO$EffectPartial@49186278 run(io) // My first effect!
ZIO.succeed
というメソッドは純粋な値を受け取ってZIOの値を作成します。
ZIO.effect
というメソッドは名前渡しで受け取った式をZIOの値にラップします。
シグニチャは def effect[A](effect: => A): Task[A]
となります。Task[A]
というのはZIO[Any, Throwable, A]
の別名です。
上記コードを実行すると、コンソールに My first effect! と表示されます。
コードだけを見てもわかりづらいかもしれませんが、ZIOの値を作っただけではその中に定義された処理は実行されません。
Runtime
のunsafeRunTask
などに値を渡すことで初めて処理が実行されます。
さて、これをコンソール表示が3度繰り返されるように変更してみましょう。
val threeTimes = for { _ <- io _ <- io _ <- io } yield () // threeTimes: ZIO[Any, Throwable, Unit] = zio.ZIO$FlatMap@59841390 run(threeTimes) // My first effect! // My first effect! // My first effect!
今回は後で比較するためにfor
で書いていますが、 io *> io *> io
とか io.replicateM(3)
のように書くこともできます (結果の型は多少変化しますが)
さて、上記コードを見ると、io
という変数に入れた値を再利用して何度も使っていますが、ZIOの値は参照透過なので
val threeTimes2 = for { _ <- ZIO.effect { println("My first effect!") } _ <- ZIO.effect { println("My first effect!") } _ <- ZIO.effect { println("My first effect!") } } yield () // threeTimes2: ZIO[Any, Throwable, Unit] = zio.ZIO$FlatMap@1e053ba run(threeTimes2) // My first effect! // My first effect! // My first effect!
と書いた場合と全く同じ意味になります。
Map FlatMap and other combinators
上のサンプルコードでfor
文を使っていましたから分かる人も多いとは思いますが、map
/flatMap
が使えます。
val int = ZIO.succeed(10) // int: UIO[Int] = zio.ZIO$EffectTotal@605d1b30 val added = int.map { n => n + 1 } // added: ZIO[Any, Nothing, Int] = zio.ZIO$FlatMap@2bde66dd val subtracted = int.flatMap { n => ZIO.effect(n - 1) } // subtracted: ZIO[Any, Throwable, Int] = zio.ZIO$FlatMap@f0f3885 run(added.zip(subtracted)) // res3: (Int, Int) = (11, 9)
また、ZIO
では結果型だけでなくエラー型も保持しているため、bimap
やfold
ができます。
(bimap
= bi-directional map
、日本語だと双方向マップになります。成功時のAと失敗時のEの両方を同時にmapできます)
// def bimap[E2, B](f: E => E2, g: A => B)(implicit ev: CanFail[E]): ZIO[R, E2, B] // def fold[B](failure: E => B, success: A => B)(implicit ev: CanFail[E]): URIO[R, B] // def foldM[R1 <: R, E2, B](failure: E => ZIO[R1, E2, B], success: A => ZIO[R1, E2, B])(implicit ev: CanFail[E]): ZIO[R1, E2, B] val ok = ZIO.effect("Ok!") // ok: Task[String] = zio.ZIO$EffectPartial@693ccd84 val fail = ZIO.fail(new RuntimeException("fail")) // fail: IO[RuntimeException, Nothing] = zio.ZIO$Fail@3806602e case class WrappedError(e: Throwable) extends Throwable(e) case class WrappedResult(s: String) val okBimapped = ok.bimap(e => WrappedError(e), a => WrappedResult(a)) // okBimapped: ZIO[Any, WrappedError, WrappedResult] = <function1> val failBimapped = fail.bimap(e => WrappedError(e), a => WrappedResult(a)) // failBimapped: ZIO[Any, WrappedError, WrappedResult] = <function1> run(okBimapped) // res4: WrappedResult = WrappedResult("Ok!") run(failBimapped.either) // make error to Left value // res5: Either[WrappedError, WrappedResult] = Left( // WrappedError(java.lang.RuntimeException: fail) // ) // make error to Left value val folded = fail.fold(e => s"Error: $e", a => s"Success: $a") // folded: URIO[Any, String] = <function1> run(folded) // res6: String = "Error: java.lang.RuntimeException: fail" val folededM = ok.foldM(e => ZIO.effect(s"Error: $e"), a => ZIO.effect(s"Success: $a")) // folededM: ZIO[Any, Throwable, String] = <function1> run(folededM) // res7: String = "Success: Ok!"
エラーからの復帰にはcatchAll
やcatchSome
を使います。catchSome
はFuture
のrecoverWith
とほぼ同等です。
// def catchAll[R1 <: R, E2, A1 >: A](h: E => ZIO[R1, E2, A1])(implicit ev: CanFail[E]): ZIO[R1, E2, A1] // def catchSome[R1 <: R, E1 >: E, A1 >: A](pf: PartialFunction[E, ZIO[R1, E1, A1]])(implicit ev: CanFail[E]): ZIO[R1, E1, A1] sealed abstract class BaseError extends Exception final class SomeError extends BaseError final class ShouldDie extends BaseError final class IgnoreMe extends BaseError val mayFail: IO[BaseError, String] = ZIO.fail(new IgnoreMe) // mayFail: IO[BaseError, String] = zio.ZIO$Fail@53ee50a1 val catched = mayFail.catchAll { e => ZIO.effect(s"catching error: $e") } // catched: ZIO[Any, Throwable, String] = <function1> run(catched) // res8: String = "catching error: repl.MdocSession$App$IgnoreMe" val notCatched = mayFail.catchSome { case e: SomeError => ZIO.effect(s"Catching SomeError") case e: ShouldDie => ZIO.die(new RuntimeException("Dying")) } // notCatched: ZIO[Any, Throwable, String] = <function1> run(notCatched.either) // res9: Either[Throwable, String] = Left(repl.MdocSession$App$IgnoreMe)
また、ZIO
のコンパニオンオブジェクトにも多くの便利なメソッドが定義されています。
mapN
やcollectAll
などを使って、複数のZIOの値を1つにまとめることができます。
// def mapN[R, E, A, B, C](zio1: ZIO[R, E, A], zio2: ZIO[R, E, B])(f: (A, B) => C): ZIO[R, E, C] // def collectAll[R, E, A, Collection[+Element] <: Iterable[Element]](in: Collection[ZIO[R, E, A]])(implicit bf: BuildFrom[Collection[ZIO[R, E, A]], A, Collection[A]]): ZIO[R, E, Collection[A]] val n3 = ZIO.succeed(3) // n3: UIO[Int] = zio.ZIO$EffectTotal@47c38e38 val n5 = ZIO.succeed(5) // n5: UIO[Int] = zio.ZIO$EffectTotal@79705fc val n8: UIO[Int] = ZIO.mapN(n3, n5)(_ + _) // n8: UIO[Int] = zio.ZIO$FlatMap@126ae175 run(n8) // res10: Int = 8 val merged: UIO[Seq[Int]] = ZIO.collectAll(Seq(n3, n5)) // merged: UIO[Seq[Int]] = zio.ZIO$FlatMap@758a189d run(merged) // res11: Seq[Int] = List(3, 5)
Bonus: ほんの少し変更するだけで並列に実行させることができます!
val parN8: UIO[Int] = ZIO.mapParN(n3, n5)(_ + _) // parN8: UIO[Int] = zio.ZIO$GetForkScope@1581c147 run(parN8) // res12: Int = 8 val parMerged: UIO[Seq[Int]] = ZIO.collectAllPar(Seq(n3, n5)) // parMerged: UIO[Seq[Int]] = zio.ZIO$FlatMap@24e7ff91 run(parMerged) // res13: Seq[Int] = List(3, 5)
ここで紹介した関数以外にも、多くのメソッドがあります。
ZIO cheatsheetやJavadoc、最終的にはソースコードに当たることになると思いますが、どんなメソッドが用意されているのかちょっと眺めてみるのも面白いと思います。
バリデーション
ZIOに限らずmap
やflatMap
では失敗するとそれ以降の処理は実行されません。
そのため、バリデーション - つまりデータを検査して問題のある箇所をエラーとして返すような目的の処理では使いづらいことがあります。
例)
入力された名前と年齢をバリデーションしたい。
名前は空でないこと
年齢は18以上であること
ここで素直にmap
/flatMap
で実装すると、
case class Input(name: String, age: Int) def checkName(name: String): IO[String, String] = { if (name.isEmpty) ZIO.fail("Name should not be empty") else ZIO.succeed(name) } def checkAge(age: Int): IO[String, Int] = { if (age < 18) ZIO.fail("Age should be >= 18") else ZIO.succeed(age) } val input = Input("", 15) // input: Input = Input("", 15) val result: IO[String, Input] = for { name <- checkName(input.name) age <- checkAge(input.age) } yield Input(name, age) // result: IO[String, Input] = zio.ZIO$Fail@5a2c001f run(result.either) // res14: Either[String, Input] = Left("Name should not be empty")
というような形になります。(エラーは一旦文字列のエラーメッセージとしてString
にしています)
これでとりあえず入力の検査自体はできているのですが、名前が空の時には年齢のチェックが走らず、Name should not be emptyというメッセージだけが返されます。
もちろんこの挙動で十分なケースもあるのですが、一部がエラーになったとしても常に全体をチェックして、失敗した項目全てのエラーメッセージをリストなどで返して欲しいユースケースは少なくないはずです。
ある程度Scalaを触ってきた方なら、scalazのValidation
(ValidationNEL
)やcatsのValidated
型を想起されるでしょうか。
ZIOでは別の型を用意するわけではなく、メソッドとして用意されています。
// def validate[R1 <: R, E1 >: E, B](that: ZIO[R1, E1, B]): ZIO[R1, E1, (A, B)] // def parallelErrors[E1 >: E]: ZIO[R, ::[E1], A] val validated: IO[::[String], (String, Int)] = checkName(input.name).validate(checkAge(input.age)) .parallelErrors // validated: IO[::[String], (String, Int)] = <function1> run(validated.either) // res15: Either[::[String], (String, Int)] = Left( // List("Name should not be empty", "Age should be >= 18") // )
これで結果型のEは::[String]
となります。::
というのは見慣れない型かもしれませんが、Scalaの標準に含まれる型でconsと呼ばれる「必ず1つ以上の要素を含むリスト」です。
1 :: 2 :: Nil
などと書いてList
を作成する際、List
の::
というメソッドが生成する値の型が::
です。(ややこしい)
(::[A]
はList[A]
のサブタイプで、map
などを呼び出した結果は全てただのList[A]
になってしまうため、「空でない」という情報は失われます)
validate
とparallelErrors
の動作を理解するには、ZIOのエラー型E
の扱いについて多少知っておく必要があります。
ZIOでは内部的に、エラー型E
を Cause[E]
という型にラップして保持しており、複数のエラーやエラーの履歴を追跡できるようになっています。
validate
では発生したエラーをCause[E]
の中に積んでいく様なイメージになります。
validate
時点での結果型はZIO[R, E, A]
でしかないため、内部に積まれたエラー情報全てにアクセスすることはできないのですが、parallelErrors
を呼びだすことでCause[E]
に積まれたエラーを取り出して::[E]
としてアクセスできるようになります。
とまぁ解説すると長くなってしまうのですが、コードとしてはシンプルに実装可能です。
validate
やvalidateWith
、flatMap
などを組み合わせることで、実装者の意図に合わせたバリデーションロジックが実装できます。
エラー時のStacktrace
前述した様にZIO
ではCause[E]
の中に様々なエラー情報を保持しています。
そのため副次的な効果として、非同期処理に絡んで発生するエラーのスタックトレースに含まれる情報量が多く、scala.concurrent.Future
などと比べてデバッグや問題箇所の特定が容易になります。
一例をあげてみましょう。
def asyncDbCall(): Future[Int] = ... def countPromotion: Future[Int] = ...asyncDbCall()... def countMedia: Future[Int] = ...asyncDbCall()...
こんなコードがあり、asyncDbCall()
の中で例外が発生すると、スタックトレースは以下の様になります。
Exception in thread "main" java.lang.RuntimeException at example.Hello$.$anonfun$asyncDbCall$1(Hello.scala:13) Hello.scala:13 at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) Future.scala:659 at scala.util.Success.$anonfun$map$1(Try.scala:255) Try.scala:255 at scala.util.Success.map(Try.scala:213) Try.scala:213 at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) Future.scala:292 at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) Promise.scala:33 at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) Promise.scala:33 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) Promise.scala:64 at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
asyncDbCallでエラーが発生したことまではわかりますが、asyncDbCallがどこで呼ばれたのかさっぱりわかりません。
これをZIOを使う様に変更します。
def asyncDbCallZio(): Task[Int] = { ZIO.fromFuture(_ => asyncDbCall()) } def countMediaZio: Task[Int] = { for { _ <- ZIO.effect(println("Counting!")) r <- asyncDbCallZio() } yield r } def doWork: Task[Int] = { for { _ <- ZIO.effect(println("Let's do some work")) c <- countMediaZio } yield c }
結果、同じ様にasyncDbCallの中でエラーが発生した場合のスタックトレースは以下の様になります。
Fiber failed. An unchecked error was produced. java.lang.RuntimeException at example.Hello$.$anonfun$asyncDbCall$1(Hello.scala:13) Hello.scala:13 at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) Future.scala:659 at scala.util.Success.$anonfun$map$1(Try.scala:255) Try.scala:255 at scala.util.Success.map(Try.scala:213) Try.scala:213 at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) Future.scala:292 at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) Promise.scala:33 at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) Promise.scala:33 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) Promise.scala:64 at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) Fiber:Id(1617612889740,1) was supposed to continue to: <empty trace> Fiber:Id(1617612889740,1) execution trace: at zio.ZIO.orDieWith(ZIO.scala:1060) ZIO.scala:1060 at zio.ZIO.onInterrupt(ZIO.scala:992) ZIO.scala:992 at zio.ZIO$._IdentityFn(ZIO.scala:4024) ZIO.scala:4024 at zio.ZIO$.fromFuture(ZIO.scala:3201) ZIO.scala:3201 2 at zio.ZIO$.effectAsyncInterrupt(ZIO.scala:2621) ZIO.scala:2621 at zio.ZIO$.fromFuture(ZIO.scala:3193) ZIO.scala:3193 at zio.ZIO$.fromFuture(ZIO.scala:3191) ZIO.scala:3191 at zio.ZIO$.fromFuture(ZIO.scala:3190) ZIO.scala:3190 at example.HelloZio$.countMediaZio(Hello.scala:43) Hello.scala:43 at example.HelloZio$.countMediaZio(Hello.scala:42) Hello.scala:42 at example.HelloZio$.doWork(Hello.scala:50) Hello.scala:50 at example.HelloZio$.doWork(Hello.scala:49) Hello.scala:49 Fiber:Id(1617612889740,1) was spawned by: Fiber:Id(1617612889637,0) was supposed to continue to: a future continuation at zio.App.main(App.scala:57) App.scala:57 a future continuation at zio.App.main(App.scala:56) App.scala:56 Fiber:Id(1617612889637,0) ZIO Execution trace: <empty trace> ...
大分量が増えてちょっと読むのが大変ではあるのですが、
ZIO.scala:3191 at zio.ZIO$.fromFuture(ZIO.scala:3190) ZIO.scala:3190 at example.HelloZio$.countMediaZio(Hello.scala:43) Hello.scala:43 at example.HelloZio$.countMediaZio(Hello.scala:42) Hello.scala:42 at example.HelloZio$.doWork(Hello.scala:50) Hello.scala:50 at example.HelloZio$.doWork(Hello.scala:49) Hello.scala:49
というようにどこから呼ばれたのかちゃんと追跡できていることがわかります。
他にも、例外のfinalizerでさらに例外が発生した場合も元の例外情報を失わないなど、ZIOには実用上の利点が多くあります。
最後に
ZIOについて話そうと思えば、まだまだ
R
:環境型を使ったDI- 複雑な非同期制御やエラーハンドリング
- ZStreamを使ったストリーミング処理
など話題は尽きません。
複雑な機能も存在するZIO
ですが、単純にscala.concurrent.Future
を置き換える様な使い方でも多くの実用上のメリットを享受することができると思います。
みなさんも、是非ZIOを使ってみてください。