ZIO再訪

前回、ZIOについて紹介した記事を書いてから半年以上も経ってしまいました。エンジニアの岡村です。

今回も引き続きZIOについて、特に並行処理に焦点を当てて紹介していきます。

前回の記事をまだ見ていない方はこちらから確認できますので、よろしければご覧ください。

ZIOにおける並行処理について

ZIOにおけるeffectのデフォルト実行モデルは同期実行です。例えば、

import zio._

def run[A](io: RIO[ZEnv, A]): A = Runtime.default.unsafeRunTask(io)

def printT = s"[${Thread.currentThread().getName}]"

val goToShinjuku = ZIO.succeed("電車で新宿駅まで移動").debug(printT)
// goToShinjuku: ZIO[Any, Nothing, String] = <function1>
val readTheBook = ZIO.succeed("読書").debug(printT)
// readTheBook: ZIO[Any, Nothing, String] = <function1>

val seq = for {
  a <- goToShinjuku
  b <- readTheBook
} yield ()
// seq: ZIO[Any, Nothing, Unit] = zio.ZIO$FlatMap@69303452
run(seq)
// [Thread-1444]: 電車で新宿駅まで移動
// [Thread-1444]: 読書

と2つのeffectは順番に処理されます。
しかし私は電車通勤中、乗っている間の時間を有効利用するために読書がしたいのです。
複数のeffectを並列に処理するためには、ZIO.foreachParZIO.mapParなどの各種xxxParメソッドを使うか、より下位のモデルであるFiberを使う必要があります。

val zipped = goToShinjuku.zipPar(readTheBook)
// zipped: ZIO[Any, Nothing, (String, String)] = zio.ZIO$GetForkScope@255bd306
run(zipped)
// [zio-default-async-7]: 電車で新宿駅まで移動
// [zio-default-async-8]: 読書
// res2: (String, String) = (電車で新宿駅まで移動,読書)

zipParで2つのeffectを接続することで、並列で処理することができました。

ZIOにおける並行処理のプリミティブが Fiber です。
Fiberは一般的にグリーンスレッドや軽量スレッドと呼ばれるものの一種です。
JVMのスレッドであるjava.io.Threadと似たものではあるのですが、カーネルスレッドであるJVMスレッドと違い軽量で優れたパフォーマンスを発揮します。
実際の動きとしては、一つのJVMスレッドの上でたくさんのFiberが動作する様な形になります。Fiberはほぼ無制限に使えるJVMスレッドのようなものだと思ってもらって結構です。

しかしここで公式ドキュメントからの警告を心に留めて置きましょう。

Warning, if you are not an advanced programmer:

You should avoid fibers. If you can avoid fibers, then do it. ZIO gives you many concurrent primitives like raceWith, zipPar, foreachPar, and so forth, which allows you to avoid using fibers directly.

Fibers just like threads are low-level constructs. It's not generally recommended for an average programmer to manually use fibers. It is very easy to make lots of mistakes or to introduce performance problems by manually using them.

ZIOには上で挙げた様にzipParmapParforeachParcollectAllParなど手軽に並行処理を実装するためのメソッドが複数用意されています。
Fiberを直接触る前にそれらの機能を利用することができないか、検討しましょう。

実際に使ってみる

とはいえ、Fiberの使用を避けられないケースは存在します。
複数の並列で動作している処理の間で協調動作が必要な場合など、Fiberなど並行処理のプリミティブをマスターしていれば大いに役に立つでしょう。

まずFiberを作成するには、ZIOのeffectから.forkするだけです。そして.joinでFiberの完了を待つことができます。

def fork: URIO[R, Fiber[E, A]]
def join: IO[E, A]
val walkToOffice = ZIO.succeed("オフィスまで徒歩で移動").debug(printT)
// walkToOffice: ZIO[Any, Nothing, String] = <function1>

val fork1 = for {
  g <- goToShinjuku.fork
  r <- readTheBook.fork
  zipped = g.zip(r)
  result <- zipped.join
  _ <- walkToOffice
} yield ()
// fork1: ZIO[Any, Nothing, Unit] = zio.ZIO$FlatMap@4e89b600
run(fork1)
// [zio-default-async-10]: 電車で新宿駅まで移動
// [zio-default-async-11]: 読書
// [zio-default-async-14]: オフィスまで徒歩で移動

これで期待通り別々のスレッドで処理を動作させることができました。
さて、しかし上の処理では、新宿到着と読書の完了を共に待ってからオフィスに歩き始めることになります。
電車で新宿に着いたら、仮に読書中でも中断してオフィスに向かいたいですね。
ここでinterruptを使います。

def interrupt: UIO[Exit[E, A]]

Fiberのinterruptは、ネイティブなThreadのインタラプトに比べて安全で、多くの場合高速に動作すると言われています。
https://zio.dev/version-1.x/datatypes/fiber/index#interrupt-safe

import zio._
import zio.duration._
import java.util.concurrent.TimeUnit.SECONDS

def run[A](io: RIO[ZEnv, A]): A = Runtime.default.unsafeRunTask(io)

def printT = s"[${Thread.currentThread().getName}]"

val goToShinjuku = for {
  start <- clock.currentTime(SECONDS)
  _ <- ZIO.succeed(s"$start: 電車で新宿駅まで移動").debug(printT)
  _ <- ZIO.sleep(3.seconds)
  finish <- clock.currentTime(SECONDS)
  _ <- ZIO.succeed(s"$finish: 到着").debug(printT)
} yield ()
// goToShinjuku: ZIO[clock.package.Clock, Nothing, Unit] = zio.ZIO$FlatMap@24e67e7

val readTheBook = (for {
  start <- clock.currentTime(SECONDS)
  _ <- ZIO.succeed(s"$start: 読書").debug(printT)
  _ <- ZIO.sleep(10.seconds)
  finish <- clock.currentTime(SECONDS)
  _ <- ZIO.succeed(s"$finish: 読了").debug(printT)
} yield ()).onInterrupt(clock.currentTime(SECONDS).flatMap(sec => ZIO.effect(s"$sec: 読書中断").debug(printT).orDie))
// readTheBook: ZIO[clock.package.Clock, Nothing, Unit] = zio.ZIO$CheckInterrupt@3361f044

val walkToOffice = clock.currentTime(SECONDS).flatMap(sec => ZIO.succeed(s"$sec: オフィスまで徒歩で移動").debug(printT))
// walkToOffice: ZIO[clock.package.Clock, Nothing, String] = zio.ZIO$FlatMap@63b61f8b

val fork = for {
  g <- goToShinjuku.fork
  r <- readTheBook.fork
  _ <- g.join
  _ <- r.interrupt
  _ <- walkToOffice
} yield ()
// fork: ZIO[clock.package.Clock, Nothing, Unit] = zio.ZIO$FlatMap@371397eb
run(fork)
// [zio-default-async-15]: 1639459634: 電車で新宿駅まで移動
// [zio-default-async-16]: 1639459634: 読書
// [zio-default-async-17]: 1639459637: 到着
// [zio-default-async-19]: 1639459637: 読書中断
// [zio-default-async-20]: 1639459637: オフィスまで徒歩で移動

これで期待通りの動作になりました。

Supervision

Fiberの基本的な使い方については理解できたでしょうか。
最後にFiberがsupervisedであることを説明しておきます。

ZIOのアプリケーションでは、全ての処理はFiberの中で動作します。あるFiberから作成された新しいFiberは、明示的に指定されない限り作成したFiberを親とした子Fiberとなります。
Fiberがsupervisedであるというのは、ある子Fiberが常に親Fiberの管理下(スコープの中)にあるということです。
親のFiberが終了したら、動作している子Fiberは全てinterruptされます。
この構造が、ZIOにおいてFiberのライフサイクルを安全に管理し、動作させる基本となっています。

さて、よくできたsupervisionの仕組みですが、バックグラウンドで動き続ける様なFiberを作成したい時には、作成元のFiberを親にしてしまうと不都合なケースがあります。(作成元のFiberが終了しても動き続けて欲しいですからね)
この時使うのが、.forkDaemonで、アプリケーション全体のroot fiberを親として子Fiberを作成することができます。

import zio.console._

val inner = putStrLn("Inner job is running...")
  .delay(1.seconds)
  .forever
  .onInterrupt(putStrLn("Inner job interrupted.").orDie)
// inner: ZIO[Console with clock.package.Clock, java.io.IOException, Nothing] = zio.ZIO$CheckInterrupt@704e0ffc

val outer = (
  for {
    _ <- putStrLn("Outer job is started.")
    f <- inner.forkDaemon
    _ <- putStrLn("Outer job is running...").delay(1.seconds).forever
    _ <- f.join
  } yield ()
).onInterrupt(putStrLn("Outer job interrupted.").orDie)
// outer: ZIO[Console with clock.package.Clock, java.io.IOException, Unit] = zio.ZIO$CheckInterrupt@7e82e1b2

val app = for {
  fiber <- outer.fork
  _ <- fiber.interrupt.delay(3.seconds)
  _ <- ZIO.sleep(5.seconds)
  _ <- putStrLn("Finishing my app")
} yield ()
// app: ZIO[Console with clock.package.Clock, java.io.IOException, Unit] = zio.ZIO$FlatMap@36e6485

run(app)
// Outer job is started.
// Inner job is running...
// Outer job is running...
// Inner job is running...
// Outer job is running...
// Outer job interrupted.
// Inner job is running...
// Inner job is running...
// Finishing my app

Inner jobはforkDaemonで作成され、Outer jobではなくアプリケーションのroot fiberの直接の子として動作しているため、Outer Jobが中断されてもアプリケーションが終了するまで動作し続けます。

ちなみにapp

val app = for {
  fiber <- outer.fork
  _ <- fiber.interrupt.delay(3.seconds)
  _ <- ZIO.never
} yield ()

と変更すれば、Ctrl+Cなどで終了されない限り動作し続けるようになります。

まとめ

今回はZIOの並行処理について簡単に説明しました。
繰り返しになりますが、ZIOには各種並行処理を手軽に実現するためのメソッドが用意されています。どうしても必要なケース以外で直接Fiberを扱うのは避けましょう。

今回は紹介できませんでしたが、FiberRefなど、Fiberやそのsupervisionについて理解していれば便利に活用できる機能もあります。
もしFiberを使いこなして複雑な並行処理を設計・実装する機会があればお試しください。

追記

ZIO 2.0がRC間近です。
移行用scalafixルールも用意されていますので、比較的簡単に移行できるはずです。
https://zio.dev/next/howto/migrate/zio-2.x-migration-guide

RCがリリースされたら、よりハイパフォーマンスで使い勝手の向上したZIO 2.0を是非試してみてください。