2011-11-04

Finagle - 都会育ちのメッセージングフレームワーク

Scala でもやるかとぶつやく同僚を見て, たしかに Scala したいかも...などと意思の弱い私は気をそそられ, しかし特に書くものも思いつかずなんとなくウェブをぶらぶらしていた. そういえば Heroku が Scala をサポートしたニュースを読んだっけと検索すると, たしかにアナウンスがあった.

このアナウンスにあるサンプルコードは面白い. Lift でも Play でもなく, Twitter の RPC フレームワークである Finagle が使われている. Finagle でサンプルを書いた理由の一つは画面におさまる簡潔さ, あとは流行り物の目新しさだろうけれど, Heroku の勧める Polyglot Platform の ありかたを示す意味もある気がしなくもない.

Polyglot

Polyglot という言葉を最初に目にしたのは プロダクティブプログラマ だったと思う. この本がどのように Polyglot Programming を主張していたのかよく覚えていないけれど, 適材適所でプログラミング言語を混ぜることがあるのは確かだとうなずいたのは覚えている. 仕事で参加していたプロジェクトが割と polyglot で, いつも this と self のタイポに苦しんでいたからだ.

その時のプロジェクトは

という惨状. Ruby と C# と JavaScript は小規模だから良かったものの, Python/C++/ActionScript は毎日ぜんぶさわる有様だった. 私は C++ 以外でコードを書けてちょっとウキウキだったけれど, 隣席の若者はしばらく苦労していた気がする.

6 種類はやりすぎだとしても, 最近のウェブっぽいお仕事をしている人なら 2-3 種類は使うのではないか. 要するに 1. ウェブサーバや手元のマシンで動くラブリーな言語 (Ruby, Python など), 2. クライアント環境に強いられる言語 (Objective-C, ActionScript, JavaScript など), そして場合によっては 3. なんか速いのが欲しいときの言語 (Java, Scala, C++ など) くらいはさわることがあるだろう. どれか一つの言語に統一できればいいけれど, それはえてして Law of the instrument になる. だから無理せずいろんな言語を使い Polyglot しよう - おおよそこれが Heroku の主張だった. (私の主張ではないので反論は Heroku blog のコメント欄にどうぞ.)

サンプルに Finagle を使ったのは, 適材適所 Polyglot を提案する Heroku からのメッセージなのだ. バックエンドは Scala で書いて, Ruby とは RPC すればいいじゃない...私はこう解釈し, ひとり勝手に納得した.

Heroku からのメッセージがたぶん気のせいなのを認めるのにはやぶさかでない. けれど少なくとも Finagle の開発元たる Twitter での Scala 利用はそんなかんじだと Nick Kallen はインタビューで説明している. そして田舎(Mountain View)にいると都会(San Francisco)っ子の発言がなにもかもおしゃれで素敵なものに思える. Scala なおしゃれ Polyglotter を気取り, 今日は Finagle を拝むことにした.

Finagle

Finagle は Twitter 製の RPC フレームワークである...と聞くと 食傷気味な人もいるかもしれない. Thrift, ARVO, MessagePack(-RPC)... ウェブ世代の RPC は山ほどある. Scala で書けば良いってもんじゃない.

けれど, Finagle はこうした流れとは少し毛色が違う. Finagle は直列化より下のレイヤに重点があり, イメージとしては WCF なんかに近い. PRC よりメッセージングのフレームワークと呼ぶ方がふさわしい. 開発者の Marius A. Eriksen による Scala Days での講演 では "A Network Stack for the JVM" を謳っている. このスタックは各コンポーネントが pluggable にできており, たとえば直列化形式には Thrift を使うこともできる. Finagle 向けに 改造した Thrift コンパイラ がある.

プラガブルなネットワークスタックというアイデアもまた, 使い古しにみえるかもしれない. 昔は UNIX の STREAMS から 最近だと Java の MINANetty まで, よく知られたパターンといえる. 実際, Finagle は Netty をベースに作られている.

それなら...と次の疑問がわきあがる: なぜ Netty のフレームワークに従わず似たようなレイヤを再発明したのだろう. たぶん Scala でクールに書きたかったからだと私はおもう. 都会っ子だからってそんな若気の至りが許されるのか. 田舎の中年として最初はハラスメントな気持で一杯だったが, 先の講演を聞いてそれなりに納得した. たしかに Scala だと色々クールに書けるかもしれない.

com.twitter.util.Future

Finagle は Scala で書かれた Future クラス を中心に非同期のプログラミングモデルを組み立てている. RPC サービスは Future として結果を返すし (Service.scala)), クライアントも同じスタイルで書く.

名前は同じでも, Twitter の Future と Java 標準のやつ は別物. コールバックに頼るモデルで, Python の Deferred なんかに近い. ノンブロッキングモデルに従う Netty の上で動くことを思えばまっとうなつくりといえる.

Marius A. Eriksen の講演で題材に使われていた Twitter 製 Scala 教材 から, 簡易検索エンジンのコードを一例として除いてみよう.

 
 def get(key: String) = try {
   println("GET", key)
   val queries = indices.map { idx =>
     idx.get(key) map { r => Some(r) } handle { case e => None }
   }

   Future.collect(queries) flatMap { results =>
     println("got results", results.mkString(","))
     results.find { _.isDefined } map { _.get } match {
       case Some(v) => Future.value(v)
       case None => Future.exception(new SearchbirdException("No such key"))
     }
   }
 } catch {
   case e =>
     println("got exc", e)
     throw e
 }

Future.collect() のところがポイント. 一見すると単なる Ruby っぽいコードだけれど, collect() の戻り値から flatMap に連なるコードに登場する変数は半分くらいが Future に染まっている. Future に対して collect() や flatMap() をしておくと, 非同期のコールバックが 帰ってきたタイミングでうまいこと仕事を進めてくれる. sharding されたバックエンドに まとめてリクエストを発行し, その結果を集計して返すような処理が簡潔に書ける.

個人的には Future の Seq を Seq の Future に変換する collect() の未来っぽさにときめいた. Future.scala は比較的善良なの Scala コードだったから, 暇なひとは眺めてみても面白いとおもう.

フィルタ/デコレータ

クライアントやサービスを直接使う部分は Finagle がもつ面白さの半分でしかない. フレームワークが提供する様々な付加機能の実装も面白い. たとえば RPC の呼び出しを失敗に応じて再試行してくれる RetryFilter なんてのがある.

class RetryingFilter[Req, Rep](
  backoffs: Stream[Duration],
  statsReceiver: StatsReceiver = NullStatsReceiver,
  shouldRetry: PartialFunction[Try[Rep], Boolean],
  timer: Timer
) extends SimpleFilter[Req, Rep] {
  private[this] val retriesStat = statsReceiver.stat("retries")

  private[this] def dispatch(
    request: Req, service: Service[Req, Rep],
    replyPromise: Promise[Rep],
    backoffs: Stream[Duration],
    count: Int = 0
  ) {
    val res = service(request) respond { res =>
      if (shouldRetry.isDefinedAt(res) && shouldRetry(res)) {
        backoffs match {
          case howlong #:: rest if howlong > 0.seconds =>
            timer.schedule(Time.now + howlong) {
              Trace.record("finagle.retry")
              dispatch(request, service, replyPromise, rest, count + 1)
            }
          ...
        }
        ...
      }
    }
    ...
  }

  def apply(request: Req, service: Service[Req, Rep]) = {
    val promise = new Promise[Rep]
    dispatch(request, service, promise, backoffs)
    promise
  }
}

ここでいう Filter (SimpleFilter) は ServletFilterRack の middleware などを Finagle の文脈で一般化したものだと思えばいい.

RPC 呼び出しの成否判定が PartialFunction になってるところも Scala ぽくて素敵だけれど, もっと面白いのは再試行間隔をあらわす backoff 変数が(無限)ストリームなところ.

object Backoff {
  private[this] def durations(next: Duration, f: Duration => Duration): Stream[Duration] =
    next #:: durations(f(next), f)

  def apply(next: Duration)(f: Duration => Duration) = durations(next, f)

  def exponential(start: Duration, multiplier: Int) =
    Backoff(start) { _ * multiplier }
  ...
}

無駄にかっこいい...なんすかその変な演算子は...

Filter という独立したインターフェイスではなく, RPC 本体をあらわす Service を decorate/proxy するタイプの補助機能もある. たとえば ShardingService.

class ShardingService[Req, Rep](
  distributor: Distributor[Service[Req, Rep]],
  hash: Req => Option[Long]
) extends Service[Req, Rep] {

  def apply(request: Req): Future[Rep] = {
    hash(request) map { hash =>
        val shard = distributor.nodeForHash(hash)
        if (shard.isAvailable)
          shard(request)
        else
          Future.exception(ShardingService.ShardNotAvailableException)
    } getOrElse(Future.exception(ShardingService.NotShardableException))
  }

  override def isAvailable: Boolean = distributor.nodes exists { _.isAvailable }
  override def release() = distributor.nodes foreach { _.release() }
}

Service の リスト(distributor) から適当なインスタンスを選んで呼び出すクラス. リクエストに応じて Distributor が shard 先の実サービスを選んでくれるらしい. 当然 Distributor はインターフェイス (trait) になっており, KetamaDistributor のような consistent hash 実装があったりする. かっこいい.

マニュアルのページ で宣伝されているほかの機能もだいたい似たかんじで実装されている. それにしてもこんな flexible にするほど色々サービスの実装が必要なのだろうか. 都会のコードはむずかしいです...

スタックとの統合: Memcached, Kestrel, Ostrich

さてデザインの話はこのくらいで切り上げ, 野次馬っぽい見方もしていきたい.

Finagle のトップディレクトリ には, いくつか気になる名前のサブディレクトリがある. finagle-kestrelfinagle-memcached なんてのを見ると, 各種ストレージへのアクセスは Finagle スタイルで抽象化されていることがわかる. 本来 RPC でないシステムにも一貫した API を与えるのは良いアイデアに思える. なお finagle-mysql とかがないのは, そのレイヤを shard manager である Gizzard に押し込み, Gizzard 相手には Thrift でアクセスするためだと思われる. この Gizzard も Scala で書かれていて何やってるのかさっぱりわからんのだけれど, これは万一気がむいた時にでも読みたいとおもいます.

他のモジュールも眺めてみる. finagle-ostrichOstrich と呼ばれる集計ライブラリとのインテグレーション.

Finagle の本体であるコアパッケージは, 各所に性能を追跡する instrumentation が埋め込まれている. たとえば先の RetryingFilter にも統計収集用のコードがある.

class RetryingFilter[Req, Rep](
  backoffs: Stream[Duration],
  statsReceiver: StatsReceiver = NullStatsReceiver,
  shouldRetry: PartialFunction[Try[Rep], Boolean], // これ
  timer: Timer
) extends SimpleFilter[Req, Rep] {
  private[this] val retriesStat = statsReceiver.stat("retries") // これ

  private[this] def dispatch(
    request: Req, service: Service[Req, Rep],
    replyPromise: Promise[Rep],
    backoffs: Stream[Duration],
    count: Int = 0
  ) {
    val res = service(request) respond { res =>
      if (shouldRetry.isDefinedAt(res) && shouldRetry(res)) {
        backoffs match {
          ....
          case _ =>
            retriesStat.add(count) // これ
            ...
        }
      ...
      }
    ...
    }
  ...
  }

 }

"retries" と名付けられたインスタンスが再試行回数の分布を追跡しているのがわかる.

finagle-ostrich モジュールはコアパッケージで定義された StatsReceiver などのインターフェイスを実装している. コアは集計すべき情報を提供し, finagle-ostrich と Ostrich がその情報を実際に集計, 記録する役割分担がある.

Ostrich 自体は小さなライブラリで, それほど多機能ではない. ヒストグラム のような 統計記録のためのデータ構造を定義したり, 収集した統計情報を定期的にロガー (com.twitter.logging) に書き出したり, 統計収集の有効/無効フラグを制御する HTTP の API を提供したりといった程度. ただ Ostrich が Finagle と別のパッケージに切り出されているということは, 少なくとも Scala で書かれたいくつかのコンポーネントがこれを使い回しているのだろう.

Instrumentation のマメさはコードベースの成熟度を推し量る指標になる. だから Ostrich 本体よりも, Finagle のコアの側に含まれる instrumentation を眺める方が面白い. QPS や遅延のようなよくある指標以外に何を記録しているのか調べれば, multitier なアーキテクチャのもつ性能上の関心事がわかりそうでしょ.

適当に grep 結果を眺めてみよう:

この二つはあるサービスがバックエンドとの接続を終え準備完了するまでの時間を記録している. システムが起動するまでの時間が案外重要な指標なのかしら.

これは netstat で見えるような値を接続単位で記録している. multitier だとフロントエンドとバックエンドの接続は維持することが多いので, こういう接続単位の情報を眺めるのにも意味があるんだろうね.

他にもロードバランスの負荷や各種の遅延などをちまちま測っている. エラい.

Dapper-like Tracing, BigBrotherBird

Finagle は Dapper 風の 分散プロファイリング(tracing)をサポートを売りにしている. ここでいう分散プロファイリングというのは, 複数ノードをまたぐ RPC の呼び出し関係を追跡し, どの呼び出しにどれだけ時間がかかったかを追跡するようなもの. RPC のメッセージにトレース用の ID などを紛れこませ, その情報をもとにログをとって分析する.

Finagle は tracing のために自身の Thrift 用メッセージングスタックを細工している. (ThriftServerFramedCodec.scala など.) メッセージを送信するときデータに トレーシング用識別子 をうめこみ, 受信先でそれを取り出して記録する. こうした instrumentation をもとに Dapper でいう span を組み立ててログを出力する, のだろう.

トレーシングの仕組み自体はプロトコルに依存しないけれど, HTTP 側の実装にはぜんぜん tracing のコードがなかった. 単に必要がないから作ってないかんじ. ヒマな人は Finagle の HTTP まわりをいじって tracing 機構を組み込んでみると面白いかもしれない.

さて Ostrich と同じく, tracing についてもコアはインターフェイスだけを提供している. 実際のログ収集を担当するのは b3 (BigBrothrBird) と呼ばれるコンポーネントのしごとらしい. finagle-b3 モジュールに インテグレーション用のコードがあった.

ただ大変残念なことに, finagle-b3 には ログを書き出すコードまでしかツリーに含まれていない. Scribe で転送されたログの集計や プロファイル結果の表示部分は, 今のところオープンソースじゃないらしい. トレースしても結果が見れないなら意味ないじゃんか...

ドキュメントなどを物色したところコードは https://github.com/imownbey/BigBrotherBird にホストされている様子を窺えたので, そのうちオープンソースいならないかなーと期待しつつ imownbey さんを follow しておいた. ストーカーめ... オープンソースにしなくてもいいので, Heroku の add-on にして使えるようにしてほしいです.

finagle-b3 をみてもわかるように, Finagle をきちんと使おうとするとデプロイの面倒は多そうだ. 開発元が内製したツールをそのままオープンソースにしただけといった風情なので, 運用環境に乗せる部分は文書もツールも揃っていない. オープンソースにしている時点で立派なので文句を言う気にはならないけれど, もうちょっと敷居が低い感じになればなあ.

RPC と AOP

といったところでだいたいひとめぐりして気がすんだ. Scala 成分も補充できて心なしか都会っ子になれた気がする.

RPC で使うようなオブジェクトの直列化は, どこかリフレクション(イントロスペクション)に似ている. プログラミング言語が本来もっている表現能力を越えてオブジェクトをさわることができる. C++ でオブジェクトの直列化ツールを使うと楽しい理由の一つは, このドーピング感にあると個人的には思う. 言語にはないリフレクションがツールの力で使えるようになる.

そして RPC の下にあるメッセージング機構のプラガビリティは AOP に似ている. フィルタやデコレータを使えば, RPC の呼び出し前後をフックできる. プログラマはこのフックを AOP の Pointcut がわりに使える. 統計情報収集やプロファイリングなんて Pointcut の典型的な応用だ.

メッセージングフレームワークの制約を受け入れることで, Scala の言語が本来持っていなかったアスペクト指向の真似事ができる. C++ で直列化ツールを使うのに似たドーピング感がある. (最近の Java フレームワークはバイトコード生成を使ったアスペクト指向ができるので, 言語の限界は曖昧になっているけれど...)

Future の抽象もノンブロッキングの面倒をなんとなく隠してくれて, そこには Erlang 的軽量プロセスに通じる幻を見れなくもない. 冒頭で紹介したインタビューの中で Nick Kallen は Actor より Future を推していた. Finagle を見たあとならその気持もわかる.

ドーピングでハイになり未来の幻をキメる一方, 一連のスタックとの統合や instrumentation を忘れない手堅さもある. そんな都会派フレームワークの Finagle で Polyglot な明日を感じてみたいものでありますが, 気のせいかもしれません.

東京には来週かえります. 都会だ! あらあらかしこ.

( 写真: http://www.flickr.com/photos/kevineddy/3195193007/ )