Build. Translate. Understand.

F#のエージェントを用いた曲レコメンデーション

2025-07-28 19:40:25

原文: Song recommendations with F# agents by Mark Seemann

MailboxProcessor を小さな Recawr サンドイッチとして使う

この技術記事は、関数型プログラミングによる設計の代替アプローチというというシリーズの一部です。このシリーズでは、特定の問題に対して関数型プログラミングを活用する様々な代替手法について紹介しています。過去に紹介した Impureim サンドイッチ の設計パターンに対して、最も多く寄せられる疑問は「アルゴリズムの途中で追加の不純な読み取りを行う必要がある場合はどうするのか?」というものです。

このシリーズでは、そうした要件が避けられない場合にどのような代替手段が考えられるかを探ります。以前の記事では、一般的な代替手法として Transaction Script のような無秩序な処理を、自己完結した「フィルタ」による パイプ&フィルター アーキテクチャ にリファクタリングする方法を紹介しました。各フィルタはそれ自身が Recawr サンドイッチ です。

使用する技術によって、「フィルタ」は「アクター」「メッセージハンドラ」あるいはこの記事で紹介する「エージェント」などと呼ばれることがあります。一貫性のあるパターン言語を学ぶには Enterprise Integration Patterns を参照することをお勧めします。

以下のコードは、Git リポジトリの fsharp-agents ブランチから抜粋したものです。

Async と Task の違い

F# の標準ライブラリには MailboxProcessor というクラスがあり、「エージェント」と呼ばれることもあります。これは、内部キューから一度に1つずつメッセージを取り出し、バックグラウンドで実行されるインメモリのメッセージハンドラです。

非常に古くから存在しており、その API は Async<T> に基づいています。これは現在広く使われている Task<TResult> よりも前のものです。変換手段は存在しますが、例示コードを簡素化するため、まず SongService インターフェースを Async ベースに書き換えることにしました。

type SongService =
    abstract GetTopListenersAsync : songId : int -> Async<IReadOnlyCollection<User>>
    abstract GetTopScrobblesAsync : userName : string -> Async<IReadOnlyCollection<Scrobble>>

なお、慣用的な I プレフィックスがないため直感的にわかりにくいかもしれませんが、これはインターフェースです。抽象クラスであれば [<AbstractClass>] 属性が必要になります。

これは小規模な変更であり、task 式から async 式 への置き換えなど、数行の修正で済みました。

自分のスクロブル(再生記録)を収集する

以前の記事 と同様に、アルゴリズムの最初のステップとして、ユーザー名からそのユーザーのトップスクロブルを取得します。取得後、エージェントにメッセージとして送信する必要があります。F# エージェントはメッセージベースなので、まず適切なメッセージ型を定義します。

type private ScrobbleMessage = Scrobble of Scrobble | EndOfStream

Reactive Extensions for .NET を使用した以前の記事では、OnCompleted メソッドでストリームの終了を通知できました。一方、F# のエージェントはストリームではなくメッセージのコンシューマーなので、ScrobbleMessageScrobble または EndOfStream のいずれかになります。

メッセージ型が定義できたら、アルゴリズムの最初のステップは以下のように記述できます。

// string -> SongService -> MailboxProcessor<ScrobbleMessage> -> Async<unit>
let private gatherOwnScrobbles
    userName (songService : SongService) (channel : MailboxProcessor<_>) =
    async {
        // Impure
        let! scrobbles = songService.GetTopScrobblesAsync userName
 
        // Pure
        let scrobblesSnapshot =
            scrobbles
            |> Seq.sortByDescending _.ScrobbleCount
            |> Seq.truncate 100
            |> Seq.map Scrobble
 
        // Impure
        Seq.iter channel.Post scrobblesSnapshot
        channel.Post EndOfStream }

gatherOwnScrobbles はそれ自体がエージェントではなく、エージェントにメッセージを送信する関数です。この処理は Async<unit> を返します。つまり結果を返すのではなく、指定された channel にメッセージを送信します。

前回の記事と同様に、すべてのスクロブルを送信した後、gatherOwnScrobbles は最後に EndOfStream メッセージを送ってストリームの終了を示します。この実装でもこの方法は機能します。というのも、F# のエージェントは(私の観測できた範囲では)メッセージを順番通りに処理しているようだからです。ただし、メッセージバスに基づいた分散メッセージングフレームワークを使用していて、かつ複数のマシン上でハンドラーが動作しているような場合には、常にそうであるとは限りません。そのようなケースでは、パイプ&フィルターを用いた曲レコメンデーションにも書いたように、この記事と前回の記事の内容から応用的に考える必要があります。このような場合には、Enterprise Integration Patterns で紹介されているようなパターン言語が役に立つかもしれません。たとえば Message Sequence を検討するとよいでしょう。

この gatherOwnScrobbles 関数は、明確なステップを持つ小さな Recawr サンドイッチ に該当します。

次のステップとして、これらのスクロブルメッセージを受け取る MailboxProcessor を実装します。

他のリスナーを収集する

gatherOwnScrobbles によって投稿されたメッセージを処理するために、エージェントを作成します。ただし、このエージェントはアルゴリズムを完結させるものではありません。代わりに、さらに別のエージェントが処理する可能性のある追加のメッセージを公開します。そのため、別のメッセージ型が必要です。

type private UserMessage = User of User | EndOfStream

ここからパターンが見え始めます:payload ケースと、これ以上メッセージが来ないことを示すケースです。

以下の処理は、Scrobbleメッセージを処理し、ユーザーメッセージを公開するエージェントを作成します。

// SongService -> MailboxProcessor<UserMessage> -> MailboxProcessor<ScrobbleMessage>
let private gatherOtherListeners
    (songService : SongService) (channel : MailboxProcessor<_>) =
    MailboxProcessor.Start <| fun inbox ->
        let rec loop () = async {
            let! message = inbox.Receive ()
            match message with
            | Scrobble scrobble ->
                // Impure
                let! otherListeners =
                    songService.GetTopListenersAsync scrobble.Song.Id
 
                // Pure
                let otherListenersSnapshot =
                    otherListeners
                    |> Seq.filter (fun u -> u.TotalScrobbleCount >= 10_000)
                    |> Seq.sortByDescending _.TotalScrobbleCount
                    |> Seq.truncate 20
                    |> Seq.map User
 
                // Impure
                Seq.iter channel.Post otherListenersSnapshot
                return! loop ()
            | ScrobbleMessage.EndOfStream -> channel.Post EndOfStream }
        loop ()

エージェントの初期化と実装に必要なインフラ(MailboxProcessor)はさておき、主なメッセージハンドラは再び小さな「Recawrサンドイッチ」構造です。match 式のもう一つのケースでは、EndOfStream を別の EndOfStream にマッピングしています。このケースでは loop を再帰的に呼び出していない点に注目してください。これは、EndOfStream メッセージを受信した時点で、それ以降のメッセージ処理をすべて停止することを意味します。

loop は、メッセージを受信する「マークなしの」非純粋な処理から始まっていることに気づいたかもしれません。メッセージが届いたら、それにマッチします。サンドイッチの層が3つ以上あるように見えるかもしれませんが、以前説明したように、サンドイッチの層は必ずしも3つである必要はありません

メッセージの受信と分配を1行に圧縮することも可能です:

match! input.Receive () with

ですが、F# エージェントに慣れていない読者のために、明示的な message 変数をコードに含めることで、可読性を高めた方が良いと判断しました。これは、可読性向上を目的に変数を明示する一例です。

gatherOtherScrobbles によって作成される第三のエージェントは、gatherOtherListeners によって公開されたメッセージを処理し、さらに多くの曲メッセージを公開します。このメッセージ型の詳細には後ほど触れますが、エージェントの構造は上記のものと似ています。詳細について気になる場合は、Gitリポジトリをご覧いただくことをお勧めします。

レコメンド情報の収集

最終的なエージェントは少し異なります。このエージェントには、以下の2つの責務があります:

  • 曲メッセージを処理すること
  • レコメンド情報が準備できたらそれを返すこと

この追加の責務のため、メッセージ型は単純な2分岐の判別共用体ではありません。代わりに、これまでに登場しなかった第三のケースを持ちます。

type private SongMessage =
    | Song of Song
    | EndOfStream
    | Fetch of AsyncReplyChannel<Option<IReadOnlyCollection<Song>>>

SongEndOfStream ケースは、gatherOtherScrobbles エージェントによって公開されるメッセージです。

では、第三のケース Fetch は何をするのでしょうか? AsyncReplyChannel をペイロードとして持つという点が奇妙に見えるかもしれませんが、これは F# のエージェントが リクエスト・リプライ パターンをサポートする方法です。以下でこの仕組みがどのように機能するかを説明します。

// unit -> MailboxProcessor<SongMessage>
let private collectRecommendations () =
    MailboxProcessor.Start <| fun input ->
        let rec loop recommendations isDone = async {
            let! message = input.Receive ()
            match message with
            | Song song -> return! loop (song :: recommendations) false
            | SongMessage.EndOfStream ->
                let recommendations =
                    recommendations
                    |> List.sortByDescending _.Rating
                    |> List.truncate 200
                return! loop recommendations true
            | Fetch replyChannel ->
                if isDone then
                    replyChannel.Reply (Some recommendations)
                else
                    replyChannel.Reply None
                return! loop recommendations isDone }
        loop [] false

このエージェントの目的は、前のエージェントがレコメンドした曲をすべて収集することです。EndOfStream メッセージを受け取ると、曲をソートし、上位200件だけを保持します。

再帰的な loop 関数は、recommendationsisDone の2つのパラメータを受け取ります。ループは空の曲リストと false のフラグから始まります。新しい Song が届くと、曲をリストに追加して再帰します。その場合、フラグは false のままです。

EndOfStream メッセージを受信すると、最終的なレコメンド情報が計算されます。以後は、フラグを立てた状態(true に設定)で loop を再帰的に呼び出します。ただし、他のエージェントと異なり、EndOfStream メッセージを受け取った後でも処理を停止しません。

実行中はいつでも Fetch メッセージが届く可能性があります。これは、レコメンド情報を返すようリクエストするメッセージです。レコメンド情報が準備できている場合は Some ケースに包んで返され、未準備であれば None が返されます。

これにより、レコメンド情報が準備されるまでポーリングする仕組みが可能になります。これがどのように機能するか、次のセクションで説明します。

結果のポーリング

MailboxProcessor クラスには PostAndAsyncReply というメソッドが定義されており、これは SongMessage 型の Fetch ケースに適合しています。このため、以下のようなポーリング機構が実装可能です:

let rec private poll (agent : MailboxProcessor<_>) = task {
    match! agent.PostAndAsyncReply Fetch with
    | Some result -> return result
    | None -> return! poll agent }

この再帰的な処理では PostAndAsyncReply を使ってエージェントにポーリングを行い、有益な返信が返ってくるまで繰り返します。これは主に説明目的のコードなので、いくつかの簡略化をしています。

まず、このコードは事実上ビジーループを実装しています。None が返された場合、即座に再帰して再度試行します。より合理的な実装では、少し遅延を挟むべきですが、最適な遅延時間を見つけるには試行錯誤が必要になるかもしれません。パフォーマンスが気になる場合は、実際にパフォーマンス比較を行うことをお勧めします。このコードはデモ目的であるため、本格的なパフォーマンスチューニングはしていません。観察上は、この密なループでもテストの実行速度に影響は見られません。

次に、poll ループは有益な結果を得るまで続きます。もしその結果が永遠に返ってこなかったらどうなるでしょうか?より堅牢な実装では、一定時間内に応答が得られなかった場合に諦めるようなタイムアウト処理を入れることをお勧めします。

さらに、この poll 処理は型検査上どうして正しく動作するのでしょうか?MailboxProcessor<'Msg> オブジェクトに対して、PostAndAsyncReply メソッドは次のような型を持ちます:

(AsyncReplyChannel<'Reply> -> 'Msg) -> Async<'Reply>

(タイムアウトパラメータは無視しています)

上記の Fetch ケースのコンストラクタは PostAndAsyncReply の型と一致しています。なぜなら、以下のような型を持つからです:

AsyncReplyChannel<Option<IReadOnlyCollection<Song>>> -> SongMessage

これにより、'ReplyOption<IReadOnlyCollection<Song>> と推論され、'MsgSongMessage であると導かれます。したがって、agent パラメータは MailboxProcessor<SongMessage> 型であると推論されます。

合成

すべての構成要素が揃ったので、これらを同期的なメソッドとして合成できます。以下の例では、GetRecommendationsAsync メソッドの型や観察可能な挙動には変更がありません。Task から Async への変更(前述)により、FakeSongService にいくつか簡単な変更を加えましたが、それ以外のテストコードには変更を加えていません。

まず最初の試みとして、以下のように F# の慣用的な左から右へのパイプライン演算子を使ってエージェントを合成できます。

type RecommendationsProvider (songService : SongService) =
    member _.GetRecommendationsAsync userName =
        let collect = collectRecommendations ()
        task {
            do! collect
                |> gatherOtherScrobbles songService
                |> gatherOtherListeners songService
                |> gatherOwnScrobbles userName songService
            return! poll collect }

最初に task 式がすべてのエージェントを起動し、その後 collect エージェントが結果を返すまで poll を行います。

この合成はすべてのテストをパスしますが、少なくとも2つの問題があります。1つは合成の順序が逆になっているように見えることです。コード上では collect から始まり gatherOtherScrobbles に進んでいくように見えますが、実際の処理順序は逆です。この合成は「下から上」、あるいはコードを1行に並べたときの「右から左」と理解するべきです。この問題とは別に、もう一つの問題もありますが、まずはこちらを見ていきましょう。

この問題を解消するために、私はまず逆方向のパイプライン演算子 <| を試しました。しかし、構文の優先順位の関係でかっこが必要となり、結局逆方向の演算子を使う利点がありませんでした。

type RecommendationsProvider (songService : SongService) =
    member _.GetRecommendationsAsync userName =
        let collect = collectRecommendations ()
        task {
            do!
               gatherOwnScrobbles userName songService (
               gatherOtherListeners songService (
               gatherOtherScrobbles songService (
               collect)))
            return! poll collect }

この合成では、やや非標準的なコードフォーマットが使われています。例えば、collectgatherOtherScrobbles の内部にネストされているため、本来は右にインデントされるべきです。同様に、gatherOtherScrobblesgatherOtherListeners にネストされているため、その右にインデントされるべきです。より慣用的なフォーマットでは以下のようになるでしょう。

type RecommendationsProvider (songService : SongService) =
    member _.GetRecommendationsAsync userName =
        let collect = collectRecommendations ()
        task {
            do! gatherOwnScrobbles userName songService (
                    gatherOtherListeners songService (
                        gatherOtherScrobbles songService collect))
            return! poll collect }

しかしこのフォーマットは、合成の意味よりもコードの構造の方を強調してしまいます。私はあまり良い書き方とは思いません。

とはいえ、この問題は別の問題によって無意味になります。MailboxProcessor クラスは IDisposable を実装しているため、使い終わったら確実に破棄すべきです。これ自体は可能ですが、合成の順序が再び逆になります。

type RecommendationsProvider (songService : SongService) =
    member _.GetRecommendationsAsync userName = task {
        use collect = collectRecommendations ()
        use otherScrobbles = gatherOtherScrobbles songService collect
        use otherListeners = gatherOtherListeners songService otherScrobbles
        do! gatherOwnScrobbles userName songService otherListeners
 
        return! poll collect }

この問題は完全に解決できないわけではありませんが、私はこの方向にこれ以上時間を費やすつもりはありません。エージェントを生成して返す「ファクトリーアクション」の代わりに、各エージェントを個別のオブジェクトにリファクタリングすれば、そのオブジェクトが破棄されると、内部に含まれる他のエージェントも一緒に破棄されるようにできるでしょう。そうすれば、上記のようにオブジェクトを合成し、最外部のオブジェクトだけを破棄すれば済むでしょう。

評価

エージェントを綺麗に合成し、自己文書化的なコードとして扱えるようにするための試行錯誤から分かったのは、F# のエージェントは ReactiveX ほど合成しやすくないということです。公平を期すと、MailboxProcessor クラスは Reactive Extensions よりも古いため、より新しい技術より劣っていることを責めるべきではありません。

主な問題の1つは、エージェントが IObservable<T> のように自然に合成できないことです。私は MailboxProcessor<'Msg>モナド にできないか一瞬検討しましたが、『Thinking with Types』から得た共変・反変に関する知識により、この型が不変であるとすぐに理解しました。これは、Post メソッドが 'Msg に対して反変である一方、その他のメソッドは共変であるためです。すべてのメソッドの変位性を計算するまでもなく、互換性のない2つのメンバーがあることから不変と判断できます。

もう一つの問題点は、collectRecommendations アクションの見た目が煩雑であることです。提示されている形では、綺麗な Impureim サンドイッチには見えません。ほとんどの「中間」のメッセージ処理を純粋関数に抽出することは可能ですが、Fetch ケースの存在がそれを難しくしています。replyChannel.Reply の呼び出しには副作用があり、その副作用を他の場所に移すためのリファクタリング方法はありますが、副作用を発生させるには replyChannel へのアクセスが必要です。その副作用を返すアクションとして外部に取り出すこともできますが、その価値はあまり感じません。一般に、このリクエスト・リプライ型 API は関数型として魅力的とは言えません。

これだけ問題点を挙げてしまうと、このようなアーキテクチャを使う価値があるのかと疑問を抱くかもしれません。ただし、ここで指摘した問題の多くは MailboxProcessor 特有のものであることに注意してください。別のメッセージベースのフレームワークを使用した場合には、同じ問題に直面しない可能性があります。

また、エージェントのチェーンにメッセージを投稿し、最終的に1つのエージェントから結果を取得するという合成が「ムダに手間がかかっている」ように思えるかもしれません。しかしこのコードは、約10分ほどかかる同期処理をリファクタリング したものです。詳細は不明ですが、おそらく元となるコードベースでは10分間同期的に待つことはなく、バックグラウンドのバッチジョブとして結果を JSON ファイルなどの永続ビューに書き出しているのではないかと推測されます。そうであれば、新しいバッチジョブを起動してバックグラウンドで完了を待つだけの非同期メッセージベースアーキテクチャは理にかなっており、ポーリングループは不要です。誰かがリクエストしたときには、最新の永続ビューを返すだけです。そしてそのビューは、定期的にバックグラウンドジョブによって更新される仕組みになります。

結論

前回の記事では、Reactive Extensions を使って自己完結型の Recawr サンドイッチを合成しましたが、今回は F# のエージェントを使うことで、より標準的なメッセージベースアーキテクチャに近づいています。一部の詳細は F# エージェントに特有かもしれませんが、非純粋な処理を複数のサンドイッチに分解して合成する方法の参考になることを願っています。

このアプローチがこの連載で扱っている根本的な問題にとって常に最適な解決策であるとは限りませんが、データセットが大きく実行時間も長い場合には適切です。全体の問題を「投げっぱなし(fire-and-forget)」型アーキテクチャに変換できるなら、メッセージベースシステムを採用することをお勧めします。

一方で、同期的な性質を維持する必要がある場合には、より汎用的な 大きなハンマー に頼る必要があるかもしれません。

次回: フリーモナドを用いた曲レコメンデーション


インデックスへ戻る