Neo4j Kafka Connect Introduced

★★ intermediate

October 16, 2021

本記事では、Neo4j と Apache Kafka の間でデータを連携するための手法として、Kafka Connect Plugin for Neo4j の概要について紹介します。

Kafka Connect

まず、Kafka Connect Plugin について紹介します。

Kafka は、Neo4j に限らず、Elasticsearch や AWS S3 など、様々な外部データソースと連携するために、Kafka Connect API を提供しています。

当初は、それぞれ自前で Producer と Consumer を実装する必要がありました。しかし、それぞれのデータソースと連携する実装の殆どは、共通化できる余地が大いにあります。

各データソースのクライアント初期化、ネットワークが分断したときのリトライ処理、スケールさせるためのマルチスレッド処理、書き込み先が busy な状態のときのバッファリング処理などなど。全て自分たちで実装するのも可能では有りますが、実装コストやテストによる品質担保など、確実にサービスの開発自体に割く時間が無くなってきます。

そこで、ユーザーがビジネスロジックの実装に注力できるように、Producer API と Consumer API を更に抽象化した Kafka Connect API が生まれました。

Kafka Connect Plugin の実態は JAR ファイルです。Kafka クラスターとは別に Kafka Connect クラスターを稼働させ、それぞれの Kafka Connect プロセスはその Connect クラスターによって管理させます。設定ファイルを通して、Kafka Connect Plugin ごとにスケールさせたりリトライ処理を任せることができます。

Kafka Connect Plugin for Neo4j

先述した Kafka Connect API を利用して実装されたのが、Kafka Connect Plugin for Neo4j です。開発はオープンソースで行われており、neo4j-contrib/neo4j-streams で公開されています。

Kafka Connect の用語として、Kafka にデータを入れる場合 Source Plugin、Kafka からデータを出す場合 Sink Plugin と呼びます。この Kafka Connect Plugin for Neo4j は、Source/Sink の両方サポートしています。

v4.1.0

2021 年 9 月、v4.1.0 がリリースされました。これによる大きな改善が2つあります。

第一に、Neo4j Aura との双方向のデータ連携が可能となりました。これまでは Sink Connector しか利用できなかったため、Kafka から Neo4j への書き込みはできたものの、その逆は自分たちで実装する必要が有りました。Neo4j から Kafka にデータを送信するための Source Connector が完成したことによって、より柔軟なアーキテクチャを組めるようになりました。

Kafka には Kafka で Confluent Cloud というマネージドサービスが存在します。したがって、例えば Neo4j と Kafka を組み合わせたデータ連携をしたいスタートアップが、しかし自分たちで運用するリソースもスキルセットも無い段階で、両方マネージドサービスを使うことで、運用の負荷から解放されながら、サービスの開発に集中することができるようになりました。

第二に、Neo4j-Streams Plugin が Deprecated になりました。Kafka Connect Plugin が開発される前は、Neo4j と同居させる形で別のプラグインを同じマシンにインストールさせる必要が有りました。そもそもインストールする運用負荷がありますし、Neo4j 本体とメモリや CPU といったマシンリソースを共有するので、リソース管理にも難がありました。プラグインのプロセス自体のモニタリングも最適解がない状態でした。

上記の課題を、Kafka Connect Plugin を利用することによって解決できました。Kafka Connect Cluster を通じて分離された環境でスケールさせることができますし、Kafka Connect が提供するモニタリングも十分に揃っています。もちろん、Neo4j と同居させる形で一つ一つインストールしていく手間も省けました。

最後に

以上、Kafka Connect Plugin を利用した Neo4j と Kafka 間でのデータ連携について紹介しました。

Recommended Posts

  1. ★★★ advanced
    neo4j-causal-clustering-introduced-routing
    本記事では、Neo4j における Causal Clustering (因果クラスタリング) について説明します。 クラスタリングの目的 データベースを運用する場合、単一のインスタンスを稼働させるか、Causal Clustering を用いてクラスター構成を組むか(クラスタリング)のいずれかを選択することになるでしょう。 Single Instance…
  2. ★★★ advanced
    neo4j-bolt-handshake-protocol-introduced
    Bolt Protocol において、サーバーとクライアントが接続を開始するためのプロトコルとして、Bolt Handshake Protocol Specification の仕様が策定されています。 本記事では、Bolt Handshake Protocol の概要について説明します。 Handshake Workflow…