< back

Neo4j Bolt Driver for Go: Code Reading - Connection Pool

本記事では、Neo4j Driver として公式サポートされている、neo4j-go-driver におけるコネクションプールの実装について見ていきます。

本記事では、v4.3 branch の最新である 5a14c7024ca3203d89d54ae34bbfbc2886249401 commit hash のソースコードを前提にしています。

About Connection Pool

Driver インスタンスは、NewDriver() を利用して生成します。

driver, err = neo4j.NewDriver("neo4j://localhost:7687", neo4j.BasicAuth("username", "password", ""))
if err != nil {
    return err // handle error
}
defer driver.Close()

この時、README に以下に記載されている通り、Driver ごとにデータベースへのコネクションをプールしているので、アプリケーションごとに一つ Driver インスタンスを生成すれば十分です。

Each Driver instance maintains a pool of connections inside, as a result, it is recommended to only use one driver per application.

このコネクションプールの実装について確認してみましょう。

コネクションプールの作成

NewDriver() が呼ばれたタイミングで、コネクションプールも生成します。デフォルトでは、コネクションプールの最大値は 100、各コネクションが維持される時間が 1 時間となっています。

// driver.go
func NewDriver() {
    //...
    // Let the pool use the same logid as the driver to simplify log reading.
    d.pool = pool.New(d.config.MaxConnectionPoolSize, d.config.MaxConnectionLifetime, d.connector.Connect, d.log, d.logId)
    //...
}

//...

type driver struct {
    //...
    pool      *pool.Pool
    //...
}

Thread Safety

ここで、コネクションプールの実態は neo4j/internal/poolpackage pool で実装されています。

まず、複数のサーバーの接続を Pool.server フィールドで管理しています。Go の map はスレッドセーフではないので、複数の goroutine からアクセスする時に相互ロックを取る必要があるので、sync.Mutex を利用しています。

つまり、このプールの実装は(バグがない限り)スレッドセーフです

// pool.go
type Pool struct {
    maxSize    int
    maxAge     time.Duration
    //...
    servers    map[string]*server
    serversMut sync.Mutex
    queueMut   sync.Mutex
    queue      list.List
    //...
}

queue についても同様にスレッドセーフを意識して書かれています。

キューとは、接続を開始したいクライアントからのリクエストの内、プールしているコネクションが利用できない時のための待ち行列です。標準ライブラリである container/list双方向連結リストを用いて実装されています。

// pool.go

// Add a waiting request to the queue and unlock the queue to let other threads that returns
// their connections access the queue.
q := &qitem{
    servers: serverNames,
    wakeup:  make(chan bool),
}
e := p.queue.PushBack(q)

Penalty

では、コネクションプールに存在するサーバーの内、どのサーバーに接続するかはどのように判断しているのでしょうか。

neo4j-go-driver では、Penalty という概念を導入しています。サーバーごとにこのペナルティを計算し、ペナルティの値(uint32)が少ないサーバーを選択する実装となっています。

ペナルティはどの様に計算されているのでしょうか?

以下のコードを見てみると、「接続失敗した回数」や「処理に忙しい接続中のコネクションの数」、「初回の接続かどうか」 をもとにペナルティを計算しています。コネクションプールに存在するサーバーの内、クライアントの観点から見た時に最もパフォーマンスが良さそうなサーバーを選んでいます。

ペナルティの値が同じ時は、ラウンドロビンで決定することで一部のサーバーに負荷が偏らない工夫も見られます。

// server.go

const newConnectionPenalty = uint32(1 << 8)

// Calculates a penalty value for how this server compares to other servers
// when there is more than one server to choose from. The lower penalty the better choice.
func (s *server) calculatePenalty(now time.Time) uint32 {
    penalty := uint32(0)

    // If a connect to the server has failed recently, add a penalty
    if s.hasFailedConnect(now) {
        penalty = 1 << 31
    }
    // The more busy connections, the higher penalty
    numBusy := uint32(s.busy.Len())
    if numBusy > 0xff {
        numBusy = 0xff
    }
    penalty |= numBusy << 16
    // If there are no idle connections, add a penalty as the cost of connect would
    // add to the transaction time
    if s.idle.Len() == 0 {
        penalty |= newConnectionPenalty
    }
    // Use last round-robin value as lowest priority penalty, so when all other is equal we will
    // make sure to spread usage among the servers. And yes it will wrap around once in a while
    // but since number of busy servers weights higher it will even out pretty fast.
    penalty |= (s.roundRobin & 0xff)

    return penalty
}

また、ここでビット演算を用いることで、省メモリを実現しています。また、ペナルティの種類ごとに優先順位をつけています。例えば、「接続失敗した回数(hasFailedConnect)」は、「処理に忙しい接続中のコネクションの数(numBusy)」より上位ビットなので、接続失敗したことが多いと、処理に忙しいコネクションの数の少ないサーバーより選択されにくいことがわかります。

variable description bit
newConnectionPenalty 初回の接続かどうか 1 << 8
hasFailedConnect 接続失敗した回数 1 << 31
numBusy = 2 処理に忙しい接続中のコネクションの数(少) 2 << 16
numBusy = 32 処理に忙しい接続中のコネクションの数(中) 32 << 16
numBusy >= 255 処理に忙しい接続中のコネクションの数が(大) 255 << 16

以上を一言で言い換えると、neo4j-go-driver のコネクションプールのペナルティの優先順には、「処理に忙しい接続中のコネクションが少ないサーバーの内、接続失敗回数が少なく、すでに接続を確立したサーバー」 を最優先している、とも言えるでしょう。

最後に

以上、neo4j-go-driver のコネクションプールの実装についての紹介を行いました。

October 10, 2021

Recommended

  1. Neo4j を VectorDB として使う

    Neo4j は v5.13 で Vector search index を実装した。使い方は他の VectorDB と同様で、ベクタライズしたデータを Vector Index に格納する。Cypher クエリを通じて近似解を導出する。 この記事では、Vector Index の使い方について紹介する。具体例では、ノ…
  2. Neo4j Causal Clustering 紹介

    本記事では、Neo4j における Causal Clustering (因果クラスタリング) について説明します。 クラスタリングの目的 データベースを運用する場合、単一のインスタンスを稼働させるか、Causal Clustering を用いてクラスター構成を組むか(クラスタリング)のいずれかを選択することになるでし…
  3. Neo4j Bolt Handshake Protocol Introduced

    Bolt Protocol において、サーバーとクライアントが接続を開始するためのプロトコルとして、Bolt Handshake Protocol Specification の仕様が策定されています。 本記事では、Bolt Handshake Protocol の概要について説明します。 Handshake Wor…