APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

RDSとFivetranをPrivateLink接続し、Databricksにincremental syncする

 

はじめに

GLB事業部Lakehouse部の阿部です。
今回は、FivetranとRDSをAWS PrivateLinkを使って接続し、DestinationのDatabricksワークスペースにincremental sync(差分更新)する方法を解説します。

Fivetranとは、クラウドベースのELT(Extract, Load, Transform)ツールです。
各種データソースから、DWHやSaaS型のデータストアへのデータの取り込みを簡単かつ迅速に行うために開発されました。
また、AWS PrivateLinkとはAWS同士の仮想ネットワークであるVPCに対し、ネットワーク間のトラフィックをインターネットを経由せずにプライベート接続する方法です。
FivetranはVPNやPrivate Linkなどの閉域網接続をサポートしており、本記事ではPrivateLink接続の方で進めます。

注意
本検証はデータソースの準備やセキュリティの設定に手間がかかるため、PrivateLink接続の方法だけ知りたい人は PrivateLinkの接続準備から見ていただければと思います。

目次

本記事の対象者

  • Private Linkの手順をざっくり知りたい人
  • Fivetranに対してPrivateLink接続を検討している人

前提条件

  • EC2にSSH接続できていること
  • EC2上にインストールするPostgreSQLのバージョンは15
  • AMIはAmazon Linux 2023 AMI
  • インスタンスタイプはt2.micro
  • FivetranのPlanはBusiness Critical Planである(Private Linkに必要)

全体構成

以下に全体構成図を示します。(リソースはすべてAWSの同一リージョン内にあります)
※データの流れは、データソース → Fivetran → Databricksですが、Fivetranがトリガーとなってデータソースからデータを移行するロジックになっています。

データソースにはEC2にインストールしたPostgreSQLを使用しており、FivetranとPrivateLink接続しています。
次に、FivetranはDatabricksワークスペースと接続するためにPartner Connectで接続しています。

※PrivateLink接続ができるかどうかまず検証したかったため、Fivetran-Databricks間の方ではPrivateLink接続はしておりません。

AWS側での構成図も示します。

(https://fivetran.com/docs/databases/connection-options#awsprivatelinkから引用)

NLB(Network Load balancer)を作成後、endpoint serviceからendpointを用意し、Fivetran側のendpointと接続する構成です。

データソース(PostgreSQL)の準備

EC2 instanceにインストールしたpostgreSQLをデータソースとします。
以下の記事を参考にしてEC2にpostgreSQLをインストールしました。

EC2にPostgreSQLをインストールしてサンプルDBを作成しよう!

dvdrentalというデータベースに対してサンプルデータを投入しています。

$ psql dvdrental
psql (15.0)
Type "help" for help.

dvdrental-# \dt
             List of relations
 Schema |     Name      | Type  |  Owner
--------+---------------+-------+----------
 public | actor         | table | postgres
 public | address       | table | postgres
 public | category      | table | postgres
 public | city          | table | postgres
 public | country       | table | postgres
 public | customer      | table | postgres
 public | film          | table | postgres
 public | film_actor    | table | postgres
 public | film_category | table | postgres
 public | inventory     | table | postgres
 public | language      | table | postgres
 public | payment       | table | postgres
 public | rental        | table | postgres
 public | staff         | table | postgres
 public | store         | table | postgres
(15 rows)

例としてactorカラムを確認します。

dvdrental=# select * from actor limit 5;
 actor_id | first_name |  last_name   |      last_update
----------+------------+--------------+------------------------
        1 | Penelope   | Guiness      | 2013-05-26 14:47:57.62
        2 | Nick       | Wahlberg     | 2013-05-26 14:47:57.62
        3 | Ed         | Chase        | 2013-05-26 14:47:57.62
        4 | Jennifer   | Davis        | 2013-05-26 14:47:57.62
        5 | Johnny     | Lollobrigida | 2013-05-26 14:47:57.62
(5 rows)

外部接続の設定

PostgreSQLが外部と接続できるように設定が必要です。

このままでは外部接続できませんが、 netstatコマンドを実行し、TCPの通信状態を調べます。

$ netstat -an | grep 5432
tcp        0      0 127.0.0.1:5432          0.0.0.0:*               LISTEN
tcp        0      0 127.0.0.1:5432          127.0.0.1:39052         TIME_WAIT
unix  2      [ ACC ]     STREAM     LISTENING     30990    /var/run/postgresql/.s.PGSQL.5432

127.0.0.1はローカルループバックアドレス、つまりlocalhostからの接続のみを受け付けるため、外部接続はできません。
外部接続できるようにPostgreSQLとAWS側の設定を進めていきます。

PostgreSQLの設定を変更

viコマンドでpostgresql.confファイルを編集し、外部からの接続を許可します。

sudo vi /var/lib/pgsql/data/postgresql.conf

このパスは、PostgreSQLのインストール方法やバージョンによって異なる場合があります。
パスがわからない場合はfindコマンドで調べます。

$ sudo find / -name postgresql.conf
/etc/postgresql-setup/upgrade/postgresql.conf
find: ‘/proc/2545’: No such file or directory
/var/lib/pgsql/data/postgresql.conf
/usr/lib/tmpfiles.d/postgresql.conf

viコマンドでpostgresql.confファイルを開いた後、以下の行を追加します。

listen_addresses = '*'
port = 5432

この変更により、PostgreSQLはすべてのインターフェイスでリッスン(待ち受け状態)になります。
つまり、localhostの外と接続できるようになりました。

pg_hba.confの設定を変更

接続許可のルールを設定するために、pg_hba.confを編集します。 ディレクトリがわからない場合は、先ほどのようにfindコマンドで検索しましょう。

sudo vi /var/lib/pgsql/data/pg_hba.conf

viコマンドでエディターをファイルを開いた後、以下の行を追加します。

host    all             all             0.0.0.0/0               md5

注意
この行を追加すると、任意のIPアドレスからの接続が許可され、MD5ハッシュを使用したパスワード認証が必要になります。
セキュリティ上の理由から、この設定は公開ネットワーク上のデータベースサーバーには推奨されません。
VPNや専用のネットワーク上、またはファイアウォールで適切に保護された環境内でのみ使用することが推奨されます。

ファイアウォールの設定

Amazon Linuxにはデフォルトで'firewalld'が利用されている場合があります。
次のコマンドで5432ポートを開きます。

$ sudo firewall-cmd --add-port=5432/tcp --permanent
$ sudo firewall-cmd --reload

AWSのセキュリティグループの設定

AWS Management ConsoleからEC2に対するセキュリティグループの設定を変更し、5432ポートへのアクセスを許可する必要があります。
インバウンドルールの編集から以下ルールを追加します。

注意
インバウンドルールは、特定のIPアドレスのみ許可しましょう。
今回は検証のため、すべてのIPアドレスからのアクセスを許可しています。

セキュリティの確認:

最後に以下の観点からセキュリティチェックをします。
検証レベルのため注意しませんが、業務で使用する場合はセキュリティグループの確認やPostgreSQLへのアクセス制御を再確認しましょう。

  • 強固なパスワードポリシーを適用しているか。
  • 外部からのアクセスを必要最小限に許可しているか。
  • 定期的にセキュリティの監査を実施しているか。

PostgreSQLを再起動してデータソース側の準備は完了です。

$ sudo systemctl restart postgresql

PrivateLinkの接続準備

PrivateLink接続するには以下の手順を踏みます。

  1. ターゲットグループの作成
  2. ロードバランサーの作成
  3. endpoint serviceの設定

手順1, 2では、EC2のコンソールからターゲットグループを作成後、ロードバランサーであるNLBを作成してターゲットグループに紐付けます。
ここでは具体的な手順は割愛しますが、以下の記事が参考になると思います。

【初心者】AWS PrivateLink を使ってみる #AWS - Qiita

手順3では、自分のVPC内にエンドポイントサービスを作成します。
Private Link接続をするにはProvider側にエンドポイントサービス、Consumer側にインターフェイスエンドポイントを作成する必要があります。 以下の画像に示すように、VPC内にあるロードバランサーのタイプを選択後、あらかじめ作成済みのNLBを選択します。

Fivetranのドキュメントを見ると、Fivetranのaccount managerにサービス名を送ってくださいとあります。(私はFivetranの方に連絡してservice nameを伝えました)

Safelist Fivetran’s AWS VPC Account ID (arn:aws:iam::834469178297:root) to allow access to your VPC endpoint service. Send the service name (VPCe) to your Fivetran account manager. For example, com.amazonaws.vpce.<region_id>.vpce-svc-xxxxxxxxxxxxxxxxx.

service name(エンドポイントサービス名)

その後、Fivetranからエンドポイント受付リクエストが届くため、プリンシパルを許可します。
私はプリンシパルの許可を手動で行いました。

以上でAWS側の設定が完了しました。

FivetranとのPartner Connectの手順

DatabricksワークスペースとFivetranの接続にPartner Connectを使います。
FivetranからのデータのDestinationとして、Databricksのワークスペースを設定します。
具体的な手順は、以下の記事で解説しております。

techblog.ap-com.co.jp

コネクタの追加

Destinationを選択後、Fivetranのダッシュボードからコネクタを選択します。
コネクタにはPostgreSQL RDSを選択します。

  • Destination schema prefix: schemaの先頭にプレフィックスが付きます。

  • Host: FivetranのHost nameを入れます。(Fivetranから頂いたhost名を入れます)

  • Port: PostgreSQLのPort番号を入れます。デフォルトでは5432です。

  • User: PostgeSQLのユーザー名を入れます。

  • password: PostgreSQLのパスワードを入れます。

  • Database: PostgreSQLの中のデータベースを入れます。先述した、サンプルデータを入れたdvdrentalを入力します。

  • Connection Method: PrivateLinkを使うためConnect via Private Networkingと入力します。

Require TLS when using PrivateLinkは、Private Linkの接続をTLSで暗号化するかどうか設定できます。
今回は暗号化しませんが、業務で使う場合は暗号化しましょう。

  • Update Method: incrementalに更新するにはTeleport syncを使うため、Detect Changes via Fivetran Teleport Syncを選択します。

Incremental updates

PostgreSQLのバージョンによって利用できるUpdate Methodが決まっております。
古いバージョンをお使いの場合は、Fivetranがサポートしていない場合があるためバージョンは要確認です。

https://fivetran.com/docs/databases/postgresql#supportedconfigurations

Save Testボタンをクリックします。

接続テストが成功しました!

Start initial Syncをクリックし、PostgreSQLのデータをDatabricksに移行します。

Databricksワークスペースに移り、CatalogからPartner Connectで設定したスキーマを確認すると、'postgres_rds_abe_public'というスキーマにdvdrentalのテーブルを確認できました。

Incremental処理の検証

検証の最後に、Fivetranがレコードを差分で取り込めているか確認するため、actorテーブルにレコードを1行insertしてFIvetranとDatabricksを同期後、ターゲットテーブルとlogを確認します。

PostgreSQLを起動後、以下のクエリーを実行してactorカラムにレコードをinsertします。

$ sudo systemctl start postgreSQL

dvdrental=# insert into values (abe,ka,'2023-10-20 15:04:57.21');
$ INSERT INTO actor (first_name, last_name, last_update) VALUES ('abe', 'ka', '2023-10-20 15:04:57.21');
INSERT 0 1

次に、Fivetran画面右上のSYNC NOWをクリックし、今すぐsyncします。

メニューのLogsからlogを確認します。

”count”カラムを見ると、1行だけ読み込めたことがわかります。

Databricksのワークスペースから、sync先のdelta tableを確認します。

select * 
from ka_abe.postgres_rds_abe_public.actor
order by last_update desc

一番上の行にinsertしたレコードも確認できました。

おわりに

データソースとFivetranをPrivateLink接続する方法を紹介しました。
データソースの準備とセキュリティの設定に時間がかかりましたが、PrivateLinkの設定は比較的簡単にできると思います。
セキュリティの設定は他のデータソースを使う場合にも応用できるため、データソースを外部接続する際には本記事を参考にしていただければ幸いです。

Lakehouse部ではデータ&AI案件での開発及びコンサルティングを行うエンジニア/PMを募集しています。
他部署でもエンジニアを募集しておりますので、APCにご興味がある方はカジュアル面談か求人一覧からご応募をお待ちしております!