本記事では、Neo4j Driver として公式サポートされている、neo4j-go-driver
におけるコネクションプールの実装について見ていきます。
本記事では、v4.3 branch の最新である
5a14c7024ca3203d89d54ae34bbfbc2886249401
commit hash のソースコードを前提にしています。
Driver
インスタンスは、NewDriver()
を利用して生成します。
driver, err = neo4j.NewDriver("neo4j://localhost:7687", neo4j.BasicAuth("username", "password", ""))
if err != nil {
return err // handle error
}
defer driver.Close()
この時、README に以下に記載されている通り、Driver ごとにデータベースへのコネクションをプールしているので、アプリケーションごとに一つ Driver
インスタンスを生成すれば十分です。
Each Driver instance maintains a pool of connections inside, as a result, it is recommended to only use one driver per application.
このコネクションプールの実装について確認してみましょう。
NewDriver()
が呼ばれたタイミングで、コネクションプールも生成します。デフォルトでは、コネクションプールの最大値は 100、各コネクションが維持される時間が 1 時間となっています。
// driver.go
func NewDriver() {
//...
// Let the pool use the same logid as the driver to simplify log reading.
d.pool = pool.New(d.config.MaxConnectionPoolSize, d.config.MaxConnectionLifetime, d.connector.Connect, d.log, d.logId)
//...
}
//...
type driver struct {
//...
pool *pool.Pool
//...
}
ここで、コネクションプールの実態は neo4j/internal/pool
の package pool
で実装されています。
まず、複数のサーバーの接続を Pool.server
フィールドで管理しています。Go の map はスレッドセーフではないので、複数の goroutine からアクセスする時に相互ロックを取る必要があるので、sync.Mutex
を利用しています。
つまり、このプールの実装は(バグがない限り)スレッドセーフです。
// pool.go
type Pool struct {
maxSize int
maxAge time.Duration
//...
servers map[string]*server
serversMut sync.Mutex
queueMut sync.Mutex
queue list.List
//...
}
queue
についても同様にスレッドセーフを意識して書かれています。
キューとは、接続を開始したいクライアントからのリクエストの内、プールしているコネクションが利用できない時のための待ち行列です。標準ライブラリである container/list の双方向連結リストを用いて実装されています。
// pool.go
// Add a waiting request to the queue and unlock the queue to let other threads that returns
// their connections access the queue.
q := &qitem{
servers: serverNames,
wakeup: make(chan bool),
}
e := p.queue.PushBack(q)
では、コネクションプールに存在するサーバーの内、どのサーバーに接続するかはどのように判断しているのでしょうか。
neo4j-go-driver では、Penalty という概念を導入しています。サーバーごとにこのペナルティを計算し、ペナルティの値(uint32
)が少ないサーバーを選択する実装となっています。
ペナルティはどの様に計算されているのでしょうか?
以下のコードを見てみると、「接続失敗した回数」や「処理に忙しい接続中のコネクションの数」、「初回の接続かどうか」 をもとにペナルティを計算しています。コネクションプールに存在するサーバーの内、クライアントの観点から見た時に最もパフォーマンスが良さそうなサーバーを選んでいます。
ペナルティの値が同じ時は、ラウンドロビンで決定することで一部のサーバーに負荷が偏らない工夫も見られます。
// server.go
const newConnectionPenalty = uint32(1 << 8)
// Calculates a penalty value for how this server compares to other servers
// when there is more than one server to choose from. The lower penalty the better choice.
func (s *server) calculatePenalty(now time.Time) uint32 {
penalty := uint32(0)
// If a connect to the server has failed recently, add a penalty
if s.hasFailedConnect(now) {
penalty = 1 << 31
}
// The more busy connections, the higher penalty
numBusy := uint32(s.busy.Len())
if numBusy > 0xff {
numBusy = 0xff
}
penalty |= numBusy << 16
// If there are no idle connections, add a penalty as the cost of connect would
// add to the transaction time
if s.idle.Len() == 0 {
penalty |= newConnectionPenalty
}
// Use last round-robin value as lowest priority penalty, so when all other is equal we will
// make sure to spread usage among the servers. And yes it will wrap around once in a while
// but since number of busy servers weights higher it will even out pretty fast.
penalty |= (s.roundRobin & 0xff)
return penalty
}
また、ここでビット演算を用いることで、省メモリを実現しています。また、ペナルティの種類ごとに優先順位をつけています。例えば、「接続失敗した回数(hasFailedConnect
)」は、「処理に忙しい接続中のコネクションの数(numBusy
)」より上位ビットなので、接続失敗したことが多いと、処理に忙しいコネクションの数の少ないサーバーより選択されにくいことがわかります。
variable | description | bit |
---|---|---|
newConnectionPenalty | 初回の接続かどうか | 1 << 8 |
hasFailedConnect | 接続失敗した回数 | 1 << 31 |
numBusy = 2 | 処理に忙しい接続中のコネクションの数(少) | 2 << 16 |
numBusy = 32 | 処理に忙しい接続中のコネクションの数(中) | 32 << 16 |
numBusy >= 255 | 処理に忙しい接続中のコネクションの数が(大) | 255 << 16 |
以上を一言で言い換えると、neo4j-go-driver のコネクションプールのペナルティの優先順には、「処理に忙しい接続中のコネクションが少ないサーバーの内、接続失敗回数が少なく、すでに接続を確立したサーバー」 を最優先している、とも言えるでしょう。
以上、neo4j-go-driver のコネクションプールの実装についての紹介を行いました。