Ken WagatsumaSRE at Neo4j

Paper Review: "Pregel: A System for Large-Scale Graph Processing"

★★★ advanced

October 02, 2021

本記事では、Google で開発された、分散グラフ処理エンジンである Pregel について書かれた以下の論文について紹介します。

Pregel: A System for Large-Scale Graph Processing - Malewics et al. (Google) 2010

About Pregel

そもそも、グラフ構造のデータセットを処理するグラフ処理エンジン (Graph Compute Engines) は、大きく二種類に分別されます。

  1. 単一マシン上で稼働することを前提に設計されたグラフ処理エンジン 例:Cassovary
  2. 分散アーキテクチャ上で可動することを前提に設計されたグラフ処理エンジン 例:Apache Giraph, Project Pegasus

Pregel とは、後者である分散グラフ処理エンジンです。Googlek 社内で開発されたソフトウェアです。

特徴としては、メッセージパッシングをベースとしたアーキテクチャで、Supersteps と呼ばれる処理群を、ワーカーと呼ばれるマシンに分散させて実行します。

節を中心に据えた考え方 (Vertex-centric) をしており、節ごとに処理状態が完了したかどうかを示すフラグ、現在の値、メッセージキューなどを保持しています。

クラスター全体としてマスター・ワーカー構成をとっています。マスターが各ワーカーの状態を管理したり、グラフデータをパーティショニングして負荷分散させたりと,分散されたマシン上に置いてスケーラビリティと耐障害性を向上させるようなアーキテクチャとなっています。

本論文では、その Pregel のアーキテクチャ、C++ API、パフォーマンスについて紹介しています。

Supersteps

Supersteps とは、各節に対して実行される処理群です。

例えば、以下のスライドでは、Superstep ごとに各節がどのように更新されていくかを説明しています。

具体例で利用しているのは、「最も高い値を更新」です。すなわち、Superstep の段階を経るごとに、グラフの中に存在する最大値(ここでは 6 の値)が、各節に波及されていくのがわかります。

この時に実行される計算処理 Compute() は、Pregel API においては抽象化されています。したがって、最大値を更新していくのみならず、その他の任意のアルゴリズムを実行できます。

例えば、PageRank アルゴリズムは、Pregel API を利用した場合以下のように実行できます。

class PageRankVertex
    : public Vertex<double, void, double> {
public:
    virtual void Compute(MessageIterator* msgs) {
        if (superstep() >= 1) {
            double sum = 0;
            // メッセージキューにある値の合計値を計算する
            for (; !msgs->Done(); msgs->Next())
                sum += msgs->Value();
            *MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
        }

        // この例では、30 ステップ処理するまで辺で繋がっている隣接する節に
        // 計算途中にある PageRank の値を波及させていく
        if (superstep() < 30) {
            const int64 n = GetOutEdgeIterator().size();
            SendMessageToAllNeighbors(GetValue() / n);
        } else {
            VoteToHalt();
        }
    }
}

Master and Worker Architecture

マスターとワーカーを用いたクラスター全体のアーキテクチャとしては、以下のような構成となっています。

Master and Worker Architecture

  • マスターは、グラフデータの処理自体に責務を持たず、他のワーカーの状態管理のみを行います
  • ワーカーは、まずマスターにクラスターに登録したことを知らせるメッセージを送信します
  • マスターは、各ワーカーにどれくらいのパーティションを任せるかを決定します
  • マスターが、各ワーカーに Superstep に対して実行指示を出します

Fault Tolerance

このとき、ワーカーはマスターに対して定期的に ping を送信し続けます。ping メッセージが途切れた時に、マスターはワーカーが何らかのハードウェア障害やネットワーク障害によって停止したと判断し、停止したワーカーに割り当てていたパーティションを再度他のワーカーに割り当てます。

この時、チェックポイントと呼ばれるコミットログのような実行履歴を保存し、正しく処理が実行されたステップがどこまでかをマスターは管理しています。したがって、パーティションを割り当て直した時に、最初から計算し直すような計算資源の無駄遣いを可能な限り避ける工夫もなされています。

Worker Implementation

各ワーカーごとに、Vertex ID をキー、その他処理に必要な一連の状態をバリューとした HashMap を保持しています。

処理に必要な一連の状態とは、以下の4つです。

  1. current value: 現在の値
  2. outgoing edges: 繋がってる Edges の ID
  3. message queue: 周辺の Vertex から送られてきたメッセージとそれを保持するキュー
  4. voted to halt?: 処理を停止したかどうかを示すフラグ

State of Graph in Worker

A worker machine maintains the state of its portion of the graph in memory. Conceptually this can be though of as a map from vertex ID to the state of each vertex, where the state of each vertex consists of it current value, a list of its outgoing edges (the vertex ID for the edge's target, and the edge's current value), a queue containing incoming messages, and a flag specifying whether the vertex is active.

最後に

以上、Google で開発された、分散グラフ処理エンジンである Pregel について紹介しました。

Pregel のオープンソース実装としては、Apache GiraphProject Pegasus などがあります。また別の機会にこれらのソフトウェアについても紹介していきます。

Recommended Posts

  1. ★ introductory
    portfolio
    本記事では、グラフデータベースとはそもそも何なのかについて、ソフトウェアエンジニアリングを習いたての人や、他業種からきた人にも伝わる説明を目指してみます。 データベースとは そもそも、データベースとは何でしょうか。 Web エンジニアであれば、MySQL や PostgreSQL などのリレーショナルデータベース(以下:RDBMS)をよく利用しているでしょう。Memcached や Redis…
  2. ★ introductory
    pandra-papers
    Graph Database を利用したユースケースの一例として、Neo4j を利用した国際コンソーシアム(ICIJ)によるリーク文書の解析事例について紹介します。 Internationa Consortium of Investigative Journalists: "Pandora Papers: An offshore data tsunami…