C#のReactive Extensionsを用いた曲レコメンデーション
2025-07-23 21:00:25
原文: Song recommendations with C# Reactive Extensions by Mark Seemann
Observable を小さな Recawr サンドイッチとして使う
この記事は、関数型プログラミングによる設計の代替アプローチというシリーズの一部です。前回の記事では、パイプ&フィルター型アーキテクチャについての一般的な考察を紹介しました。今回はそのC#による実装例を紹介します。
ここで取り上げるコードは、Gitリポジトリの rx ブランチに含まれており、その名のとおり ReactiveX(別名 Reactive Extensions for .NET)を使用しています。
正直に言えば、もともとデータのシーケンスを扱うアルゴリズムをリファクタリングしているため、特に「リアクティブ」な処理があるわけではありません。したがって、このアーキテクチャにはそぐわないと感じる方もいるかもしれません。ただ、実際のコードは 約10分かかる ため、進捗の報告やキャンセル処理を可能にするようなアーキテクチャへの移行は妥当とも言えます。
この例にReactive Extensionsを使う主な利点は、メッセージバスベースの本格的なフレームワークと比べて軽量であることにあります。そのおかげで、アーキテクチャの本質に集中できる、という 開発体験 が得られます。
自分自身のスクロブル(再生履歴)を処理する
まずは、ユーザー自身の上位 スクロブルを見つける処理から始めましょう。内容を忘れてしまった方は、Oleksii Holub の 元記事 や、私の記事 実装の仕様化 を参照してください。
Reactive Extensions を使う場合、IObservable
IObservable<T> は モナド であり、合成性が高いため選択肢は明白です。一方、IObserver<T> は 反変ファンクター を形成しますが、言語サポートが弱いため、モナド を選ぶべきです。
まずは型とイニシャライザを定義します:
public sealed class HandleOwnScrobblesObservable : IObservable<Scrobble>
{
private readonly string userName;
private readonly SongService _songService;
public HandleOwnScrobblesObservable(string userName, SongService songService)
{
this.userName = userName;
_songService = songService;
}
// Implementation goes here...
userName から有限のスクロブルストリームを生成したいので、クラスには IObservable<Scrobble> を実装させます。データ取得のためには songService も必要です。
「なぜ _songService は先頭にアンダースコアがあるのに、userName にはないのか?」と疑問に思うかもしれません。それは、Oleksii Golub がそのような命名をしていたからで、私は必ずしもそれに従う必要はないと考えています。
既に動作するコードがあるため、実装は比較的簡単です。
public IDisposable Subscribe(IObserver<Scrobble> observer)
{
return Observable.Create<Scrobble>(Produce).Subscribe(observer);
}
private async Task Produce(IObserver<Scrobble> obs)
{
// Impure
var scrobbles = await _songService.GetTopScrobblesAsync(userName);
// Pure
var scrobblesSnapshot = scrobbles
.OrderByDescending(s => s.ScrobbleCount)
.Take(100);
// Impure
foreach (var scrobble in scrobblesSnapshot)
obs.OnNext(scrobble);
obs.OnCompleted();
}
Reactive Extensions に不慣れな方にとって最も難しく感じられるかもしれないのは、Subscribe メソッドをどう実装するかを考えるときでしょう。実装の多くを自前で書かずに済ませる方法を見つけるのが厄介なのです。とはいえ Rx は再利用可能なライブラリですから、そのための構成要素が備わっていて当然ですし、実際に存在します。
Subscribe の実装で最も簡単なのは、Observable.Create に委譲する方法のようです。これは式を受け取る API で、実装をラムダ式でそのまま書くことも可能です。ただし今回は、ライブラリの仕様とコードの結びつきを少し緩めるために、private なヘルパーメソッドを使いました。
最初の「不純な」ステップは、すでに「リファレンス実装」で見たものと同じですし、「純粋な」ステップもおなじみのはずです。最後の不純なステップでは、Produce メソッドがスクロブルを、リスナーとして待機しているすべての購読者に対して送信します。
もしこのようなアーキテクチャを Reactive Extensions 以外のフレームワークで実装したい場合には、このステップの置き換えを検討する必要があります。たとえば分散メッセージベースのフレームワークを使っている場合、メッセージを送信する手段として「メッセージバス」を使うことになるかもしれません。そうした環境では、obs.OnNext の代わりに bus.Publish のような呼び出しになるでしょう。その際、各スクロブルを明示的なメッセージオブジェクトにラップし、相関 ID などを付け加える必要も出てくるかもしれません。
多くのメッセージベースのフレームワーク(たとえば NServiceBus など)では、メッセージを受け取るための「ハンドラー」を実装することが求められます。そこでは、ステートレスで長寿命なオブジェクト上の Handle メソッドにメッセージが届けられるという形が一般的です。こうした構造により、堅牢で分散的なシステムを構築できる一方で、メッセージの調整や相関のための追加作業も発生します。
このコード例では、userName は単なるクラスフィールドであり、オブジェクトがメッセージの生成を終えたあとは、obs.OnCompleted() を呼び出してストリームの完了を通知します。
Rx を使った今回の実装は、ここまで述べてきたような一部のメッセージベースのシステムと比べて簡素です。だからこそ、この記事ではこの手法を選びました。ただし、それがより「優れている」という意味ではありません。こうしたシンプルさは、得られる機能が限られるという代償のもとに成り立っています。このシステムには永続化がなく、また私自身はこの分野の専門家ではありませんが、分散システムに容易に拡張できるとも思っていません。とはいえ、改めて言うまでもないかもしれませんが、そうした機能が常に必要とは限りません。私が伝えたいのは、こうした設計上のトレードオフに目を向けてほしい、という点なのです。
他のリスナーを処理する
HandleOwnScrobblesObservable オブジェクトは Scrobble を発行します。次の「フィルター」はどのようなものでしょうか? それは HandleOtherListenersObservable というクラスで実装されている別の Observable ストリームです。これは IObservable<User> を実装しており、クラスの宣言・コンストラクタ・Subscribe 実装はいずれも前述のものとよく似ています。主な違いは Produce メソッドです。
private async Task Produce(IObserver<User> obs)
{
// Impure
var otherListeners = await _songService
.GetTopListenersAsync(scrobble.Song.Id);
// Pure
var otherListenersSnapshot = otherListeners
.Where(u => u.TotalScrobbleCount >= 10_000)
.OrderByDescending(u => u.TotalScrobbleCount)
.Take(20);
// Impure
foreach (var otherListener in otherListenersSnapshot)
obs.OnNext(otherListener);
obs.OnCompleted();
}
リファレンスアーキテクチャと比べて、特に驚く点はありません。最も重要なのは、これもまた小さな Recawr サンドイッチ の一例であるという点です。
アルゴリズムの3つ目のステップは、HandleOtherScrobblesObservable によって処理され、HandleOtherListenersObservable とほぼ同様の構造になっています。詳細は Git リポジトリで確認できます。
合成
これら3つの Observable ストリームが、曲レコメンデーションアルゴリズムを構成する主なビルディングブロックになります。観察してほしいのは、この Observable が「ファンアウト(拡散)」構造になっている点です。最初のステップは userName 1つから最大100個のスクロブルを生成します。そして各スクロブルごとに HandleOtherListenersObservable のインスタンスを1つ生成し、それぞれが最大20件の User 通知を生成し、さらに続きます。
抽象的に見ると、HandleOwnScrobblesObservable のコンストラクタは string → IObservable<Scrobble> の関数と見なせます。同様に、HandleOtherListenersObservable は Scrobble → IObservable<User>、HandleOtherScrobblesObservable は User → IObservable<Song> です。
さて、この3つをどう合成するか?
それはまさに クライスリ矢印 であり、実際には モナディックバインド を使って合成します。C# では通常 SelectMany を使います。
public async Task<IReadOnlyList<Song>> GetRecommendationsAsync(string userName)
{
// 1. 自ユーザーのトップスクロブルを取得
// 2. 同じ曲を聴いた他ユーザーを取得
// 3. そのユーザーのトップスクロブルを取得
// 4. 曲を集約してレコメンドを生成
var songs = await
new HandleOwnScrobblesObservable(userName, _songService)
.SelectMany(s => new HandleOtherListenersObservable(s, _songService))
.SelectMany(u => new HandleOtherScrobblesObservable(u, _songService))
.ToList();
return songs
.OrderByDescending(s => s.Rating)
.Take(200)
.ToArray();
}
アルゴリズムの4番目のステップは Observable として実装されておらず、標準的な LINQ パイプラインとして表現されています。なぜならここでソートが必要になるからです。Observable が無限のストリームである可能性がある以上、OrderByDescending のようなメソッドがないのは当然です(とはいえ、System.Reactive ライブラリには Min や Max が定義されていますが、それらが無限ストリームにどう動作するのかは調べていません)。
この OrderByDescending / Take / ToArray パイプラインに専用の関数を定義することもできますが、私はそれを Fairbairn 閾値 以下の処理だと考えています。
クエリ構文
アルゴリズムはクエリ構文でも合成できます。私はその方が見た目が美しいと思っています。
IObservable<Song> obs =
from scr in new HandleOwnScrobblesObservable(userName, _songService)
from usr in new HandleOtherListenersObservable(scr, _songService)
from sng in new HandleOtherScrobblesObservable(usr, _songService)
select sng;
IList<Song> songs = await obs.ToList();
return songs
.OrderByDescending(s => s.Rating)
.Take(200)
.ToArray();
このコードスニペットでは、変数型を var ではなく明示的に記述してあります。型を読み取りやすくするためです。
結論
この記事では、曲レコメンデーション問題をパイプ&フィルター型アーキテクチャにリファクタリングする実装例を紹介しました。Reactive Extensions for .NET を使うことで、合成・分解の構造を最も簡潔に表現できるため、採用しました。より複雑で分散的な非同期メッセージベースのシステムにも応用できるよう、この記事からアイデアを拡張していただければ幸いです。
次回は、その方向へ少し踏み込んだ例を紹介します。