Ken WagatsumaSRE at Neo4j

Neo4j Bolt Driver for Go: Code Reading - Connection Pool

★★★ advanced

October 10, 2021

本記事では、Neo4j Driver として公式サポートされている、neo4j-go-driver におけるコネクションプールの実装について見ていきます。

本記事では、v4.3 branch の最新である 5a14c7024ca3203d89d54ae34bbfbc2886249401 commit hash のソースコードを前提にしています。

About Connection Pool

Driver インスタンスは、NewDriver() を利用して生成します。

driver, err = neo4j.NewDriver("neo4j://localhost:7687", neo4j.BasicAuth("username", "password", ""))
if err != nil {
    return err // handle error
}
defer driver.Close()

この時、README に以下に記載されている通り、Driver ごとにデータベースへのコネクションをプールしているので、アプリケーションごとに一つ Driver インスタンスを生成すれば十分です。

Each Driver instance maintains a pool of connections inside, as a result, it is recommended to only use one driver per application.

このコネクションプールの実装について確認してみましょう。

コネクションプールの作成

NewDriver() が呼ばれたタイミングで、コネクションプールも生成します。デフォルトでは、コネクションプールの最大値は 100、各コネクションが維持される時間が 1 時間となっています。

// driver.go
func NewDriver() {
    //...
    // Let the pool use the same logid as the driver to simplify log reading.
    d.pool = pool.New(d.config.MaxConnectionPoolSize, d.config.MaxConnectionLifetime, d.connector.Connect, d.log, d.logId)
    //...
}

//...

type driver struct {
    //...
    pool      *pool.Pool
    //...
}

Thread Safety

ここで、コネクションプールの実態は neo4j/internal/poolpackage pool で実装されています。

まず、複数のサーバーの接続を Pool.server フィールドで管理しています。Go の map はスレッドセーフではないので、複数の goroutine からアクセスする時に相互ロックを取る必要があるので、sync.Mutex を利用しています。

つまり、このプールの実装は(バグがない限り)スレッドセーフです

// pool.go
type Pool struct {
    maxSize    int
    maxAge     time.Duration
    //...
    servers    map[string]*server
    serversMut sync.Mutex
    queueMut   sync.Mutex
    queue      list.List
    //...
}

queue についても同様にスレッドセーフを意識して書かれています。

キューとは、接続を開始したいクライアントからのリクエストの内、プールしているコネクションが利用できない時のための待ち行列です。標準ライブラリである container/list双方向連結リストを用いて実装されています。

// pool.go

// Add a waiting request to the queue and unlock the queue to let other threads that returns
// their connections access the queue.
q := &qitem{
    servers: serverNames,
    wakeup:  make(chan bool),
}
e := p.queue.PushBack(q)

Penalty

では、コネクションプールに存在するサーバーの内、どのサーバーに接続するかはどのように判断しているのでしょうか。

neo4j-go-driver では、Penalty という概念を導入しています。サーバーごとにこのペナルティを計算し、ペナルティの値(uint32)が少ないサーバーを選択する実装となっています。

ペナルティはどの様に計算されているのでしょうか?

以下のコードを見てみると、「接続失敗した回数」や「処理に忙しい接続中のコネクションの数」、「初回の接続かどうか」 をもとにペナルティを計算しています。コネクションプールに存在するサーバーの内、クライアントの観点から見た時に最もパフォーマンスが良さそうなサーバーを選んでいます。

ペナルティの値が同じ時は、ラウンドロビンで決定することで一部のサーバーに負荷が偏らない工夫も見られます。

// server.go

const newConnectionPenalty = uint32(1 << 8)

// Calculates a penalty value for how this server compares to other servers
// when there is more than one server to choose from. The lower penalty the better choice.
func (s *server) calculatePenalty(now time.Time) uint32 {
    penalty := uint32(0)

    // If a connect to the server has failed recently, add a penalty
    if s.hasFailedConnect(now) {
        penalty = 1 << 31
    }
    // The more busy connections, the higher penalty
    numBusy := uint32(s.busy.Len())
    if numBusy > 0xff {
        numBusy = 0xff
    }
    penalty |= numBusy << 16
    // If there are no idle connections, add a penalty as the cost of connect would
    // add to the transaction time
    if s.idle.Len() == 0 {
        penalty |= newConnectionPenalty
    }
    // Use last round-robin value as lowest priority penalty, so when all other is equal we will
    // make sure to spread usage among the servers. And yes it will wrap around once in a while
    // but since number of busy servers weights higher it will even out pretty fast.
    penalty |= (s.roundRobin & 0xff)

    return penalty
}

また、ここでビット演算を用いることで、省メモリを実現しています。また、ペナルティの種類ごとに優先順位をつけています。例えば、「接続失敗した回数(hasFailedConnect)」は、「処理に忙しい接続中のコネクションの数(numBusy)」より上位ビットなので、接続失敗したことが多いと、処理に忙しいコネクションの数の少ないサーバーより選択されにくいことがわかります。

variabledescriptionbit
newConnectionPenalty初回の接続かどうか1 << 8
hasFailedConnect接続失敗した回数1 << 31
numBusy = 2処理に忙しい接続中のコネクションの数(少)2 << 16
numBusy = 32処理に忙しい接続中のコネクションの数(中)32 << 16
numBusy >= 255処理に忙しい接続中のコネクションの数が(大)255 << 16

以上を一言で言い換えると、neo4j-go-driver のコネクションプールのペナルティの優先順には、「処理に忙しい接続中のコネクションが少ないサーバーの内、接続失敗回数が少なく、すでに接続を確立したサーバー」 を最優先している、とも言えるでしょう。

最後に

以上、neo4j-go-driver のコネクションプールの実装についての紹介を行いました。

Recommended Posts

  1. ★★★ advanced
    neo4j-causal-clustering-introduced-routing
    本記事では、Neo4j における Causal Clustering (因果クラスタリング) について説明します。 クラスタリングの目的 データベースを運用する場合、単一のインスタンスを稼働させるか、Causal Clustering を用いてクラスター構成を組むか(クラスタリング)のいずれかを選択することになるでしょう。 Single Instance…
  2. ★★★ advanced
    neo4j-bolt-handshake-protocol-introduced
    Bolt Protocol において、サーバーとクライアントが接続を開始するためのプロトコルとして、Bolt Handshake Protocol Specification の仕様が策定されています。 本記事では、Bolt Handshake Protocol の概要について説明します。 Handshake Workflow…