本記事では、Google で開発された、分散グラフ処理エンジンである Pregel について書かれた以下の論文について紹介します。
Pregel: A System for Large-Scale Graph Processing - Malewics et al. (Google) 2010
そもそも、グラフ構造のデータセットを処理するグラフ処理エンジン (Graph Compute Engines) は、大きく二種類に分別されます。
Pregel とは、後者である分散グラフ処理エンジンです。Google 社内で開発されたソフトウェアです。
特徴としては、メッセージパッシングをベースとしたアーキテクチャで、Supersteps と呼ばれる処理群を、ワーカーと呼ばれるマシンに分散させて実行します。
節を中心に据えた考え方 (Vertex-centric) をしており、節ごとに処理状態が完了したかどうかを示すフラグ、現在の値、メッセージキューなどを保持しています。
クラスター全体としてマスター・ワーカー構成をとっています。マスターが各ワーカーの状態を管理したり、グラフデータをパーティショニングして負荷分散させたりと,分散されたマシン上に置いてスケーラビリティと耐障害性を向上させるようなアーキテクチャとなっています。
本論文では、その Pregel のアーキテクチャ、C++ API、パフォーマンスについて紹介しています。
Supersteps とは、各節に対して実行される処理群です。
例えば、以下のスライドでは、Superstep ごとに各節がどのように更新されていくかを説明しています。
具体例で利用しているのは、「最も高い値を更新」です。すなわち、Superstep の段階を経るごとに、グラフの中に存在する最大値(ここでは 6
の値)が、各節に波及されていくのがわかります。
この時に実行される計算処理 Compute()
は、Pregel API においては抽象化されています。したがって、最大値を更新していくのみならず、その他の任意のアルゴリズムを実行できます。
例えば、PageRank アルゴリズムは、Pregel API を利用した場合以下のように実行できます。
class PageRankVertex
: public Vertex<double, void, double> {
public:
virtual void Compute(MessageIterator* msgs) {
if (superstep() >= 1) {
double sum = 0;
// メッセージキューにある値の合計値を計算する
for (; !msgs->Done(); msgs->Next())
sum += msgs->Value();
*MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
}
// この例では、30 ステップ処理するまで辺で繋がっている隣接する節に
// 計算途中にある PageRank の値を波及させていく
if (superstep() < 30) {
const int64 n = GetOutEdgeIterator().size();
SendMessageToAllNeighbors(GetValue() / n);
} else {
VoteToHalt();
}
}
}
マスターとワーカーを用いたクラスター全体のアーキテクチャとしては、以下のような構成となっています。
このとき、ワーカーはマスターに対して定期的に ping を送信し続けます。ping メッセージが途切れた時に、マスターはワーカーが何らかのハードウェア障害やネットワーク障害によって停止したと判断し、停止したワーカーに割り当てていたパーティションを再度他のワーカーに割り当てます。
この時、チェックポイントと呼ばれるコミットログのような実行履歴を保存し、正しく処理が実行されたステップがどこまでかをマスターは管理しています。したがって、パーティションを割り当て直した時に、最初から計算し直すような計算資源の無駄遣いを可能な限り避ける工夫もなされています。
各ワーカーごとに、Vertex ID をキー、その他処理に必要な一連の状態をバリューとした HashMap を保持しています。
処理に必要な一連の状態とは、以下の4つです。
A worker machine maintains the state of its portion of the graph in memory. Conceptually this can be though of as a map from vertex ID to the state of each vertex, where the state of each vertex consists of it current value, a list of its outgoing edges (the vertex ID for the edge's target, and the edge's current value), a queue containing incoming messages, and a flag specifying whether the vertex is active.
以上、Google で開発された、分散グラフ処理エンジンである Pregel について紹介しました。
Pregel のオープンソース実装としては、Apache Giraph や Project Pegasus などがあります。また別の機会にこれらのソフトウェアについても紹介していきます。