Menu


Microsoft Most Valuable Person

継続渡しスタイル(CPS)再訪、パート5: CPSと非同期

原文

今日は、本当に長くて混乱させるようになりそうである。しかし、我々はどうにか終わりまでたどり着くだろう。

以下のタスクを考慮せよ:あなたは、URLのリストを得た。あなたは、各々のURLに関連した文書を取得したい。(議論のために、これが常に成功すると思おう。) あなたは古い派であるので、あなたはそれからネットワーク上に取付けられたテープ装置ストレージに文書のコピーをしたい。全く簡単だ。コードは2行。メソッドを作る価値はほとんどない:

void ArchiveDocuments(List<Url> urls)
{
  for(int i = 0; i < urls.Count; ++i)
    Archive(Fetch(urls[i]));
}

素晴らしい。問題は、URLのリストが長くてネットワーク接続が遅いならば、制御フローはこのようになりそうだということである:

  • 最初の文書を取得する。
  • 待って、待って、待って、待って、待つ……。
  • OK、最初の文書を得た。それをアーカイブする。
  • 待って、待って、待って、待つ……。
  • OK、それはアーカイブされた。第2の文書を取得する。
  • 待って、待って、待って、待つ……。
  • OK、第2の文書を得た。それをアーカイブする。
  • 待って、待って、待って、待つ……。

まったく何もしていないCPUと一緒に、あなたは待つことに時間の大部分を費やす。最初の文書をテープにアーカイブするのを待つ間、第2の文書がネットワークから届くのを待っていることもできたとあなたは合理的に思うかもしれない。しかし、一体全体どうやってあなたはそうするつもりなのか?

さて、我々は考えられる全ての制御フローを継続で造ることができるということを知っている。我々は、継続が基本的に、「次に来ること」を表現するデリゲートであるということを知っている。奇跡が起こり、我々にCPSを使ってFetchを実装するメソッドFetchAsyncを与えてくれる親切な人がいたと思おう:

void FetchAsync(Url url, Action<Document> continuation)

FetchAsyncは、どういうわけか――その方法を知っているのは誰だ?それはFeedAsyncの仕事である――非同期でそのタスクをなんとかすることができる。多分、それはスレッド・プールからスレッドをつかむか、I/O完了スレッドの上でスピンアップするか、あるいはWindowsの魔法をいくつかするかして、このスレッドのメッセージ・ループ上で完了メッセージをそれに送らせるのだろう。私は知らないし、今のところ私は気にかけない。私が気にするすべては、契約では、それが非同期でこのタスクをなんとかすることができるということと、非同期タスクが完了したとき、それは与えられた継続を呼ぶということである。

文書を取ってくるものの継続は文書を受け取ることに注目すること;思い出そう。継続を呼ぶことはサブルーチン呼び出しから値を返すことに論理的に等しい。そしてそれは、我々が同期のバージョンでしていたことである。

非常に素晴らしい。CPSに類似した何かでこのものを書き直し始めよう、そして、何があるかについて見よう。

継続をC#で書こうとするとき、すぐに我々は困難に陥る:一体全体どうやって、我々は我々が去ったループを「i」の値を正しく保ちながらまわり続けるつもりか?

明らかに、我々は2つの問題を解決する必要がある:

  • 最後に我々がここにいたとき、全ての局所変数の値は何であったか?
  • 制御フローはどこを、そしてどうやって拾いあげるのか?

最初の問題は、局所変数を閉じこめるデリゲートを作ることによって解決できる;我々は、そうすることが局所変数を捕えて、それらをクロージャに上げるということをすでに知っている。

第2の問題は、反復子ブロックで直面するのと同じ問題である。結局、彼らは同じ問題を解決しなければならない:yield returnの後、制御を再開しなさい。我々は昨日、C#コンパイラがどのようにそれをするかというRaymondの記事を見た:ステート・マシンと、適切な場所に分岐する「goto」をたくさん生み出す。明らかに、我々はこのメソッドをContinuation Passing Styleにする徹底した大規模翻訳を行いたくはない。認識の重荷とパフォーマンス・コストが単純に高すぎる。むしろ、我々は我々が反復子ブロックのために使用したのと同じトリックを使用することができる。

我々は、解決案を結合することによって、両方の問題を解決する:クロージャを得るようにラムダを作り、すべてをCPSに書き直すことなく継続を実装するステート・マシンとしてその内部を書き直そう:

enum State { Start, AfterFetch }

void ArchiveDocuments(List<Url> urls)
{
  State state = State.Start;
  int i;
  Document document;
  Action archiveDocuments = () =>
  {
    switch(state)
    {
      case State.Start:      goto Start;
      case State.AfterFetch: goto AfterFetch;
    }
    Start: ;
    for(i = 0; i < urls.Count; ++i)
    {
      state = State.AfterFetch;
      FetchAsync(urls[i],
        resultingDocument=>
        {
          document = resultingDocument;
          archiveDocuments();
        });
      return;
      AfterFetch: ;
      Archive(document);
    }
  };
  archiveDocuments();
}

気を配る読者は、我々がここで複数の問題を持っている点に注意するだろう。まず、我々が再帰的なラムダを持っているので、我々は確実な代入の問題を持っている。今のところそれを無視しよう。次に、我々はブロックの外側から中への分岐を持っている。そして、それはC#で違法である。それも無視しよう。

ここまでのストーリーを見直そう。

誰かが、たくさんのURLでArchiveDocumentsを呼ぶ。それは、メソッドの本当の仕事をするアクションを生み出して、それを呼び出す。「本当の」メソッドは、それからStartラベルに制御を送る。FetchAsyncは、最初の文書を非同期で取ってき始める。その契約がそれがすぐにリターンして、その仕事を非同期でするということであるので、これ以上我々がここでできることは現在何もない。我々は、文書の接近を待つ必要がある。我々はリターンして、文書が取得されたとき我々が再び呼ばれるということを知っている。

文書が取得されたとき、継続は呼び出される。継続は、文書の最新のコピーを持つようにクロージャを変異させる。我々がどこをもう一度拾いあげるべきかについてわかるように、我々は状態をすでに変異させた。継続は、それからメソッドを呼び出す。

メソッドは、AfterFetchラベルに分岐する。(念のため、技術的に違法なコードであることを無視している。) 我々はループの中央へ飛んで、我々が離れて去ったところを拾う。我々は、文書を同期的にアーカイブする。それがアーカイブされると、我々はループの先頭まで後ろに分岐して、次の文書を非同期で取ってき始めて、全部が繰り返される。

これは正しい方向への一歩だが、明らかに、我々はまだ我々のゴールを達成していない。我々は、同様に文書を非同期にアーカイブする必要がある。

我々は、まさに我々がした同じことをすることができなかったか?つまり、魔法の

void ArchiveAsync(Document document, Action continuation)

メソッドを発明し、それから、それを使うように上記のプログラムを変換することは?(元ののArchive(document)はvoidを返すメソッドであるので、非同期バージョンにおいて、その継続は引数を取らない。)

素朴にそうすることで何が起きるか見てみよう。

Action archiveDocuments = () =>
{
  switch(state)
  {
    case State.Start:        goto Start;
    case State.AfterFetch:   goto AfterFetch;
    case State.AfterArchive: goto AfterArchive;
  }
  Start: ;
  for(i = 0; i < urls.Count; ++i)
  {
    state = State.AfterFetch;
    FetchAsync(urls[i], resultingDocument=>
    {
      document = resultingDocument;
      archiveDocuments();
    });
    return;
    AfterFetch: ;
    state = State.AfterArchive;
    ArchiveAsync(document, archiveDocuments);
    return;
    AfterArchive: ;
  }
};

OK、現在、何が起きるだろうか?我々は非同期で取得してすぐにリターンする。取得が完了するとき、その完了はアーカイブのステップに分岐する。それは非同期でアーカイブし始め、そしてすぐにリターンする。アーカイブが完了するとき、その完了はループの終わりまで分岐する、そして、すべてはやり直される。

しまった。我々は、実際に我々が必要とする機能を実装するのを忘れた:2回目の取得のための待ち時間は、最初の文書がアーカイブされるための待ち時間に重ならなければならない。我々は、アーカイブするタスクを始めた直後にリターンする。何かが足りない。我々がしたいことは、アーカイバを始めて、それから次の取得を始めて、それから次の文書をアーカイブし始める前にそれらの両方が完了するのを待つことである。

ここの問題は、次に何が起きるか決定するために、我々がまだArchiveAsyncをまるでそれが同期だったかのように扱っているということである;我々がここまでしたすべては、どんな新しい論理的制御フローでも実際に加えることなく、このメソッドの実際の制御フローを大いに難しくすることである。我々が必要とするものは、非同期アーカイブを始めて、リターンせず、それから他のこと――次の取得を始めるなど――をすることができることである。我々がすぐにリターンしないならば、それは非同期の開始が継続を受け取ってはならないことを意味する。なぜなら、「次に起こること」はこれから起ころうとしているからだ!我々は、リターンしていない。

それでは、何が継続を受け取るか?

明らかに、我々のメソッドFetchAsyncとArchiveAsyncは、我々のニーズを満たすには不十分である。FetchAsyncがvoidではなく、むしろ値を返すと考えてみよう。そう、AsyncThingy<Document>、私がまさに魔法のように発明した「私はいつか文書を生産しそうな非同期処理の状態を追跡している」ということを表現する型を返すと。継続を受け取るのは、このアレである。同様に、ArchiveAsyncは、型引数なしのAsyncThingyを返すことができる。なぜなら、これはvoidを返すからである。これを用いて我々のメソッドを書き直そう。

AsyncThingy<Document> fetchThingy = null;
AsyncThingy archiveThingy = null;

Action archiveDocuments = () =>
{
  switch(state) { blah blah blah }
  Start: ;
  for(i = 0; i < urls.Count; ++i)
  {
    fetchThingy = FetchAsync(urls[i]);
    state = State.AfterFetch;
    fetchThingy.SetContinuation(archiveDocuments);
    return;
    AfterFetch: ;
    document = fetchThingy.GetResult();
    archiveThingy = ArchiveAsync(document);
    state = State.AfterArchive;
    archiveThingy.SetContinuation(archiveDocuments);
    return;
    AfterArchive: ;
  }
};

良くなったが、まだたどり着いていない。現在、我々は基本的に前と同じ制御フローを持っている。しかし我々は1つの入れ子にされたラムダを取り除いた。なぜなら、我々が継続が完了した後で取得のアレから文書を取り出すことができるからである。我々は継続が状態を変異させるメカニズムを持っている必要はない。そして、それは親切である。

しかし、我々が欲しい機能、すなわち、前の文書をアーカイブするための待ち時間を、現在の文書取得と共有することはどうだろうか?我々がしなければならないのは、アーカイブのタスクの継続を後になるまでセットしないことだ:

  Start: ;
  for(i = 0; i < urls.Count; ++i)
  {
    fetchThingy = FetchAsync(urls[i]);
    state = State.AfterFetch;
    fetchThingy.SetContinuation(archiveDocuments);
    return;
    AfterFetch: ;
    document = fetchThingy.GetResult();
    if (archiveThingy != null)
    {
      state = State.AfterArchive;
      archiveThingy.SetContinuation(archiveDocuments);
      return;
      AfterArchive: ;
    }
    archiveThingy = ArchiveAsync(document);
  }

これは、我々が欲しいものであるか?中を見ていこう。最初に、我々は文書を非同期で取ってき始める。我々はすぐにリターンする。文書が取得されたとき、継続は呼び出され、我々はAfterFetchに分岐する。アーカイブのアレがまだないので、我々は結果をスキップして、進む。我々は取得された文書を得て、それを非同期でアーカイブし始める。我々は、ここでリターンしない。我々はループのトップまで戻って、非同期で次の文書を取ってき始める。現在我々は、取得とアーカイブを同時に待っている。メソッドは、取得の継続をセットした直後にリターンする。

第2の文書の取得が終了したとき、その継続は呼び出される。その継続は、AfterFetchに分岐する。今度は、以前開始したアーカイブのタスクのアレがあるので、我々はその継続をセットしてすぐにリターンする。アーカイブのタスクが完了するとき、それから、我々は現在の取得タスクと前のアーカイブタスクが完了したということを知っているので、我々は続けて現在の文書をアーカイブし始める。我々はそれを非同期で始めて、それから後ろにループして、次の文書を非同期で取ってき始め、ものごとは必要に応じて繰り返す。

やれやれ。

我々はうまくやることができたか?何かがここであまり正しくないように思える。

第2の文書を取得する間に最初の文書のアーカイブが完了するならばどうだろうか?その場合、我々は継続を割り当ててすぐにリターンするというとりとめのない長話を全て行う必要はない。継続をすぐに呼び出させるだけでいい。

同様に、FetchAsyncが現在のマシン上にそれ自身のローカル・キャッシュを保持していて、別のプロセスには最近その文書をすでに取ってきたものがあると仮定しよう;おそらく、取得するものは時々、どんな高価な非同期操作もすることなくすぐ完了することができる。

これらのシナリオを取り扱うために、SetContinuationがboolを返し、タスクがこれまでにすでに完了しているかどうか示していると思おう。タスクがまだ非同期の仕事をするならばtrueで、さもなければfalseになる。(つまり、アレが後で実際に継続を実施しそうであるならばtrueで、そうではないならばfalseになる。)その最終的な形に全部を書こう:

void ArchiveDocuments(List<Url> urls)
{
  State state = State.Start;
  int i;
  Document document;
  AsyncThingy<Document> fetchThingy = null;
  AsyncThingy archiveThing = null;
  Action archiveDocuments = () =>
  {
    switch(state)
    {
      case State.Start:        goto Start;
      case State.AfterFetch:   goto AfterFetch;
      case State.AfterArchive: goto AfterArchive;
    }
    Start: ;
    for(i = 0; i < urls.Count; ++i)
    {
      fetchThingy = FetchAsync(urls[i]);
      state = State.AfterFetch;
      if (fetchThingy.SetContinuation(archiveDocuments))
      return;
      AfterFetch: ;
      document = fetchThingy.GetResult();
      if (archiveThingy != null)
      {
        state = State.AfterArchive;
        if (archiveThingy.SetContinuation(archiveDocuments))
          return;
        AfterArchive: ;
      }
      archiveThingy = ArchiveAsync(document);
    }
  };
  archiveDocuments();
}

これで完了だ。そのコードをよく読んで、我々が始めた以下のものと比較しよう:

void ArchiveDocuments(List<Url> urls)
{
  for(int i = 0; i < urls.Count; ++i)
    Archive(Fetch(urls[i]));
}

なんということでしょう!我々は、なんというひどい混乱を作ったのだろう!我々は、全く鮮明な2行のコードをあなたがこれまでに見た中で最もひどいスパゲッティの2ダース行に拡大した。そして、もちろん、ラベルがスコープ内になく、確実な代入のエラーがあるので、それはまだコンパイルさえできない。我々は、まだそれらの問題を解決するためにさらにコードを書き直す必要がある。

私がCPSの是非について何を言っていたか覚えているだろうか?

  • 利点:任意に複雑で面白い制御フローは、単純な部品から造ることができる――その通り。
  • 欠点:継続による制御フローの具象化は、読むのが難しくて、推論するのが難しい――その通り。
  • 欠点:制御フローのメカニズムを表現するコードは、コードの意味を完全に圧倒する――その通り。
  • 欠点:普通のコードの制御フローのCPS化はコンパイラが得意であるようなことで、他のほとんど誰もが得意ではない――その通り。

これは知的な練習問題ではない。非同期を扱うとき、実際の人々は結局いつも事実上上記に等しいコードを書くことになる。そして、皮肉にも、プロセッサーがより速くてより安くなったにもかかわらず、我々はとても多くの時間をプロセッサーに結び付いていないものを待つことに費やす。多くのプログラムでは、費やされる時間の多くは、ネットワーク・パケットがイギリスから日本まで旅行をしてまた戻ってくるとか、あるいはディスクが回転するとか、そういったことを待つことであり、プロセッサーはゼロに固定されている。

そして、あなたが非同期操作をさらに組み立てたい場合に起こることについて、私は話してさえいなかった。ArchiveDocumentsを、デリゲートを受け取る非同期操作にしたいと仮定する。そうすると、それを呼ぶ全てのコードは、同様にCPSで書かれなければならない。汚染がまさに広がっていく。

非同期ロジックを正しくすることは重要であり、それは将来さらに重要になりそうである。そしてDuggan教授が賢明にも言ったように、我々があなたに与えたツールはあなたを「あなたの頭の上に立っていて、裏返しにあなた自身を一転させ」る。

確かにContinuation passing styleは強力だが、その力の使用を上のコードより良い方法で果たす方法があるべきだ。

次回:素晴らしい冒険!


インデックスへ戻る


Last Update: 2012-09-26 09:38:58