継続渡しスタイル(CPS)再訪、パート5: CPSと非同期

原文

今日の記事は本当に長いし、読んだ人も混乱するかもしれない。でもまあなんとか頑張ってやっていこう。

以下の作業を考えてみよう。URLのリストがあったときに、それぞれのURLで指示されている文書を取得したい。(話を簡単にするために、取得は全部成功するとしよう。)あなたは 旧世代 なので、ネットワークに接続されたテープドライブストレージに文書のコピーを作成したい。まったく簡単だ。コードは2行。メソッドを作るまでもないのだけど、一応こんな感じ。

void ArchiveDocuments(List<Url> urls)
{
  for(int i = 0; i < urls.Count; ++i)
    Archive(Fetch(urls[i]));
}

よさそうだ。ただし問題は、URLのリストが長くて、ネットワーク接続が遅い場合、制御フローがこんな感じになってしまうかもしれないということだ。

  • 最初の文書を取得する。
  • 待って、待って、待って、待って、待つ……。
  • OK、最初の文書を取得できたので、保存する。
  • 待って、待って、待って、待つ……。
  • OK、保存された。第2の文書を取得する。
  • 待って、待って、待って、待つ……。
  • OK、第2の文書を取得できたので、保存する。
  • 待って、待って、待って、待つ……。

時間の大部分はただ待つことに費やされ、その間CPUは何もしていない。合理的な人なら、最初の文書をテープに保存するのを待っている間に、第2の文書がネットワークから届くのも待てたはずだ と考えるかもしれない。しかし、一体全体どうやればいいのだろうか?

さて、私たちはもう、考えられる全ての制御フローは継続で作ることができる ということを知っている。それに、継続とは基本的に、「次に起こること」を表現するデリゲートであるということも知っている。ここで何か奇跡が起きて、だれか親切な人がCPSでFetchを実装したメソッドFetchAsyncをくれたとしよう。

void FetchAsync(Url url, Action<Document> continuation)

FetchAsyncは、どうにかして――FeedAsyncがやるべきことなので、別に誰も仕組みを知る必要はない――その作業を非同期で処理する。もしかしたら、スレッド・プールからスレッドを取り出すとか、I/O完了スレッドの上でスピンアップするとか、あるいはWindowsの魔法によって、スレッドのメッセージ・ループに向けて完了メッセージを送るとかするのかもしれない。よく知らないけど、今のところ気にしなくていい。気にしなきゃいけないのは、このメソッドは、(1)この作業を非同期で処理し、(2)非同期処理が完了したとき、指定した継続が呼び出される、そういう契約になっていることだけだ。

文書を取ってくるものの継続は文書を受け取ることに注目すること。思い出そう。継続を呼ぶことはサブルーチン呼び出しから値を返すことに論理的に等しい。そしてそれを、同期バージョンのプログラムでやっていたということを。

すごくいいね。では、CPSに似た何かでArchiveDocumentsを書き直してみてたらどうなるか、見てみようか。

C#で継続を書こうとするとすぐに、難題がふりかかる。一体全体どうやれば、ループを中断しても、iの値を正しく保ちながら繰り返し続けられるのだろう?

明らかに、2つの問題を解決しないといけない。

  • ここに前回いたときの、全てのローカル変数の値は何だっただろう?
  • 制御フローはどこから再開すればいいのだろう、そしてそれはどうやって?

最初の問題は、デリゲートを作ってローカル変数を閉じこめることで解決できる; そうすれば、ローカル変数をキャプチャしてクロージャーに巻き取ることができることを私たちは知っている。

2番目の問題は、イテレーターブロックで直面するのと同じ問題だ。結局、「yield return の次から制御を再開する」という同じ問題を解決しなければならない。昨日、C#コンパイラーがどうやってそれを実現しているかについてのRaymondの記事を読んだと思う:C#コンパイラーは、ステートマシンを1つと、適切な場所に分岐する goto をたくさん生成する。FetchAsyncについて、徹底した大規模変換によって継続渡しスタイルにしてしまうということは、明らかにやりたくない。理解能力にかかる負担と、パフォーマンス上のコストが単純に高すぎる。それよりは、イテレーターブロックの時にと同じトリックを使うほうがいい。

さあ、解決案を結合して両方の問題をいっぺんに解決しよう。ラムダを作ってクロージャを手に入れ、すべてをCPSに書き直さずに継続を実装するステートマシンとして内部を書き直そう。

enum State { Start, AfterFetch }

void ArchiveDocuments(List<Url> urls)
{
  State state = State.Start;
  int i;
  Document document;
  Action archiveDocuments = () =>
  {
    switch(state)
    {
      case State.Start:      goto Start;
      case State.AfterFetch: goto AfterFetch;
    }
    Start: ;
    for(i = 0; i < urls.Count; ++i)
    {
      state = State.AfterFetch;
      FetchAsync(urls[i],
        resultingDocument=>
        {
          document = resultingDocument;
          archiveDocuments();
        });
      return;
      AfterFetch: ;
      Archive(document);
    }
  };
  archiveDocuments();
}

注意深い読者は、複数の問題があることに気づくだろう。まず、再帰的なラムダには確定的代入の問題があること。今のところそれは無視しよう。次に、ブロックの外側から中へ飛ぶジャンプがあり、それはC#では違法なこと。それも無視しよう。

ここまでのストーリーを見直そう。

誰かが、たくさんのURLを引数にArchiveDocumentsを呼びだす。すると、メソッドの本当の仕事をするアクションが生成され、それが呼び出される。それから「本当の」メソッドは、Startラベルに制御が移動する。FetchAsyncは、最初の文書を非同期で取得し始める。契約上は、このメソッドはすぐにリターンして、非同期で仕事をする、ということになっているので、今ここでできることはこれ以上何もない。文書の到着を待つ必要がある。 ここでリターンするが、文書が取得されたとき再び呼び出されることがわかっている。

文書が取得されると、継続が呼び出される。継続は、文書の最新のコピーを参照するようにクロージャを更新する。どこから再開するべきかがわかるように、状態はすでに更新済みだ。それから継続は、本当のメソッドをまた呼び出す。

アクションは、AfterFetchラベルに分岐する。(技術的に違法なコードだということを無視しているので念のため。) ループの内側へジャンプして、中断したところから再開する。文書を同期的に保存する。文書が保存されたら、ループの先頭に戻り、次の文書を非同期で取得し始め、そしてまた全部が繰り返される。

正しい方向へ一歩進んだけれど、ゴールをまだ達成していないのは明らかだ。文書も同じように非同期で保存する必要がある。

ここでまた、さっきと同じことをできないだろうか? つまり、魔法の

void ArchiveAsync(Document document, Action continuation)

メソッドを発明し、それから、それを使うように上記のプログラムを変換することは?(元のArchive(document)voidを返すメソッドなので、非同期バージョンでは、継続は引数を取らない。)

愚直にそうしたらどうなるか見てみよう。

Action archiveDocuments = () =>
{
  switch(state)
  {
    case State.Start:        goto Start;
    case State.AfterFetch:   goto AfterFetch;
    case State.AfterArchive: goto AfterArchive;
  }
  Start: ;
  for(i = 0; i < urls.Count; ++i)
  {
    state = State.AfterFetch;
    FetchAsync(urls[i], resultingDocument=>
    {
      document = resultingDocument;
      archiveDocuments();
    });
    return;
    AfterFetch: ;
    state = State.AfterArchive;
    ArchiveAsync(document, archiveDocuments);
    return;
    AfterArchive: ;
  }
};

OK、さあこれで何が起きるだろうか? 文書を非同期で取得したらすぐにリターンする。取得が完了したら、保存のステップに分岐する。非同期の保存が始まり、すぐにリターンする。保存が完了したら、ループの終わりに分岐する、そして、すべてはやり直される。

おっといけない、必要な機能を実装し忘れた。2回目の取得の間の待ち時間は、最初の文書が保存される間の待ち時間に重なっていなければならない。保存処理が始まったとたんにリターンしようとしていたが、それでは何かが足りない。やりたかったことは、保存を始めて、それから次の取得を始めて、そして次の文書を保存し始める前にそれらの両方が完了するのを待つことだ。

ここでの問題は、次に何が起きるか決定するために、ArchiveAsyncをまだ同期メソッドのように扱っているということだ; ここまでにやったことはといえば、このメソッドの実際の制御フローを大いに難しくしただけで、論理的には新しい制御フローはまだ何も追加していない。必要なのは、非同期保存を開始して、リターンせず、それから他のこと――たとえば、次の取得を始めるなど――ができることだ。すぐにリターンしないということは、非同期処理の開始が継続を受け取ってはいけないという意味だ。なぜなら、「次に起こるべきこと」はすぐに起こるからだ! だからこそリターンはしない

それなら、継続を受け取るのは何だろう?

どうやら、メソッドFetchAsyncArchiveAsyncは、ニーズを満たすには不十分のようだ。FetchAsyncvoidではない値を返すと考えてみよう。たとえば、AsyncThingy<Document>、これは私が魔法のように発明した型で、「いつか文書を生み出しそうな非同期処理の状態を追跡している」ということを表現するものだが、これを返すとしたらどうなるだろう。継続を受け取るのは、この何か《Thingy》である。同様に、ArchiveAsyncは、型引数なしのAsyncThingyを返すかもしれない。なぜなら、こっちの処理は何も返さないからだ。これらを使ってメソッドを書き直してみよう。

AsyncThingy<Document> fetchThingy = null;
AsyncThingy archiveThingy = null;

Action archiveDocuments = () =>
{
  switch(state) { blah blah blah }
  Start: ;
  for(i = 0; i < urls.Count; ++i)
  {
    fetchThingy = FetchAsync(urls[i]);
    state = State.AfterFetch;
    fetchThingy.SetContinuation(archiveDocuments);
    return;
    AfterFetch: ;
    document = fetchThingy.GetResult();
    archiveThingy = ArchiveAsync(document);
    state = State.AfterArchive;
    archiveThingy.SetContinuation(archiveDocuments);
    return;
    AfterArchive: ;
  }
};

良くなったけれど、ゴールにはまだたどり着いていない。今のところ、基本的には前と同じ制御フローのままだ。とはいえ1つの入れ子にされたラムダは取り除かれている。それは、継続が完了した後では取得用の何か《Thingy》から文書を取り出すことができるからだ。継続が状態を更新するメカニズムはもういらないし、それはいいことだ。

でも、欲しい機能、つまり、前の文書を保存する間の待ち時間を、現在の文書取得と共有することについてはどうだろうか? 実現しなければならないのは、保存処理の継続を後になるまでセットしないことだ:

  Start: ;
  for(i = 0; i < urls.Count; ++i)
  {
    fetchThingy = FetchAsync(urls[i]);
    state = State.AfterFetch;
    fetchThingy.SetContinuation(archiveDocuments);
    return;
    AfterFetch: ;
    document = fetchThingy.GetResult();
    if (archiveThingy != null)
    {
      state = State.AfterArchive;
      archiveThingy.SetContinuation(archiveDocuments);
      return;
      AfterArchive: ;
    }
    archiveThingy = ArchiveAsync(document);
  }

欲しいものはこれだろうか? 中を見ていこう。最初に、文書を非同期で取得し始める。そこですぐにリターンする。文書が取得されたとき、継続は呼び出され、AfterFetchに分岐する。保存用の何か《Thingy》はまだないので、直後のところは飛ばして進む。取得された文書を取り出して、それを非同期で保存し始める。ここではまだリターンしない。ループの先頭まで戻って、次の文書を非同期で取得し始める。このとき、取得と保存を同時に待っている。 メソッドは、取得の継続をセットした直後にリターンする。

第2の文書の取得が完了したとき、その継続が呼び出される。継続は、AfterFetch に分岐する。今度は、前回開始した保存処理の何か《Thingy》があるので、それに継続をセットしてすぐにリターンする。保存処理が完了したときには、現在の取得処理と前回の保存処理が完了したということがわかっているので、続けて現在の文書を保存し始める。処理は非同期的に開始して、それからループの先頭まで戻って、次の文書を非同期で取得し始めて、必要なだけものごとが繰り返される。

やれやれ。

うまくできたかな? でもまだ何かが変な気がする。

第2の文書を取得している間に最初の文書の保存が完了したとしたらどうだろうか?その場合は、継続を割り当ててすぐにリターンするなんていう無駄なことを全部する必要はない。継続をすぐに呼び出すだけでいい。

同じように、FetchAsync は現在のマシン上に自分自身のローカル・キャッシュを保持していて、最近、別のプロセスがその文書をすでに取得済みだったとしよう。そうなると多分、取得処理は時々、時間のかかる非同期処理を何もせずにすぐ完了することがある。

これらのシナリオを扱うために、処理がすでに完了しているかどうかを示すために、SetContinuationboolを返すとしよう。trueの場合はまだ非同期処理が残っているし、そうでなければfalseになる。(つまり、何か《Thingy》が本当に遅れて継続を実行しそうならtrueで、そうじゃないならfalseになる。)全体をその最終形に書き換えよう。

void ArchiveDocuments(List<Url> urls)
{
  State state = State.Start;
  int i;
  Document document;
  AsyncThingy<Document> fetchThingy = null;
  AsyncThingy archiveThing = null;
  Action archiveDocuments = () =>
  {
    switch(state)
    {
      case State.Start:        goto Start;
      case State.AfterFetch:   goto AfterFetch;
      case State.AfterArchive: goto AfterArchive;
    }
    Start: ;
    for(i = 0; i < urls.Count; ++i)
    {
      fetchThingy = FetchAsync(urls[i]);
      state = State.AfterFetch;
      if (fetchThingy.SetContinuation(archiveDocuments))
      return;
      AfterFetch: ;
      document = fetchThingy.GetResult();
      if (archiveThingy != null)
      {
        state = State.AfterArchive;
        if (archiveThingy.SetContinuation(archiveDocuments))
          return;
        AfterArchive: ;
      }
      archiveThingy = ArchiveAsync(document);
    }
  };
  archiveDocuments();
}

これで完了だ。そのコードをよく読んで、最初のものと比べてみよう:

void ArchiveDocuments(List<Url> urls)
{
  for(int i = 0; i < urls.Count; ++i)
    Archive(Fetch(urls[i]));
}

なんということでしょう! なんとひどく混乱させてしまったんだろう! 全く明解な2行のコードだったのに、これまでに見た中でも最悪な2ダースのスパゲッティコードになってしまった。そしてもちろん、ラベルがスコープ内になく、確定的代入のエラーがあるので、このコードはまだコンパイルさえ通らない。それらの問題を解決するために、ここからさらにコードを書き直す必要がある。

CPSの是非について、私が何と言っていたか覚えているかな?

  • 利点:任意の複雑で面白い制御フローは、単純な部品から造ることができる――その通り。
  • 欠点:継続による制御フローの具象化は、読むのが難しくて、よく考えるのも難しい――その通り。
  • 欠点:制御フローのメカニズムを表現するコードは、コードの意味を完全にかき消してしまう――その通り。
  • 欠点:普通のコードの制御フローのCPS化は、コンパイラには得意かもしれないが、他のほとんど誰もが得意ではない――その通り。

これは知的なエクササイズの類ではない。非同期を扱うとき、実際の人々は結局いつも事実上上記に等しいコードを書くことになる。そして、皮肉にも、プロセッサーがより速くてより安くなったにもかかわらず、とても多くの時間をプロセッサーに制約されないものを待つことに費やしている。多くのプログラムでは、費やされる時間の多くは、ネットワーク・パケットがイギリスから日本まで旅行をしてまた戻ってくるとか、あるいはディスクが回転するとか、そういったことを待つことであり、プロセッサー使用率はゼロに張り付いている。

そのうえ、非同期操作をさらに組み合わせたい場合に起こることについては、まだ何も話していない。ArchiveDocumentsを、デリゲートを受け取る非同期操作にしたいならどうなるだろう。そうすると、それを呼ぶ全てのコードは、同じようにCPSで書かれていないといけなくなる。汚染がただ広がっていく。

非同期ロジックを正しく書くことは重要で、しかもそれは今後もっと重要になりそうだ。そしてDuggan教授が賢明にも言ったように、ここで示した道具は「自分の頭の上に立って、自分自身を裏返す」ものだ。

継続渡しスタイルは確かに強力だけれど、その力を利用するのには、上のコードよりも良いやり方がないといけない。

次回予告:素晴らしい冒険!


インデックスへ戻る


Last Update: 2018-07-03 07:34:40