APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

データパイプラインにおけるデータ取り込みの設計

はじめに

こんにちは、GLB事業部Lakehouse部の陳(チェン)です。

久しぶりのブログ投稿になる本日は、Databricksにおけるデータを取り込む際、スキーマ変更対応の機能の紹介です。

これまでの実務上、データパイプラインを作成する際、最初に悩むところはデータの取り込みです。 おおよその場合、データパイプラインを構築する際、ある程度、取り込むデータのスキーマを決める必要がありました。 しかし、システムを運用中に、データ取得元のIoT機器やアプリの仕様変更などを理由に、取り込むデータのスキーマが変更されることがしばしばあります。

本日は、この「取り込むデータのスキーマが変更される」際に、どのような対処をとるのが可能かを、幾つかの予想プランを提示します。 提示されたプランをヒントに、システム構築の初期段階で、あらかじめデータカラムの変更についての見通しを織り込んで、より円滑なシステム(データパイプライン)の運用へのヒントになれれば幸いです。

また、前置きになりますが、本記事では、概念的な説明(オプションの選択による結果の変化)を行います。そのため、データパイプラインの構築やDatabricks上の細かい操作を省略しています。ご了承いただければ幸いです。

前提条件

想定状況

下記Fig_1のように、幾つかの取り込むデータに変更が生じる場合を想定します。

  • (a) 元データと関係なく、取り込まれるデータは不変
  • (b) 新規データを取り込むテーブル上に保持してほしい場合
    1. 新規データを事前把握した上、コード面を改変
    2. 新規データを事前把握できない場合、新規データを新規カラムとして追加
    3. 新規データを事前把握できない場合、新規データを固定のカラムに保存し、後日にまとめて検討

Fig_1

ここで提示された状況はすべてではありませんが、大半の悩みを解決できると思います。 特殊なリクエストがある場合、データエンジニアに問い合わせするのがおすすめです。

テストデータ

下記Fig_2でまとめて示したcsvファイルの情報はテストデータの中身です。 本記事では、3つのcsvファイルを用意し、想定状況に応じて順番でAutoloaderにより取り込みます。 今回は、apc_test_1.csvファイルを最初に取り込んでから次のcsvファイル(apc_test_2.csvあるいはapc_test_3.csv)を取り込むことを想定しています。

また、apc_test_1.csvを除く、それぞれのcsvファイルはapc_test_1.csvに対してどのような変更点があるかを記載しております。 参照しながら記事を読んでいただければと思います。

Fig_2

それぞれの悩みに対応する解決法

このセクションで、それぞれの状況におけるAutoloaderのオプションの選択、SQLコードの書き方、およびデータパイプラインの動きをご紹介します。

パタン:(a) 取り込むスキーマ不変

収集されたデータに関係なく、データパイプラインに取り込むデータの内容を変更しない場合は、こちらのサンプルコードを示した通り、事前にスキーマを指定しておくと便利です。

CREATE OR REFRESH STREAMING LIVE TABLE acp_blog_test
COMMENT "for APC技術ブログ"
AS SELECT
  col1,col2,col3
FROM cloud_files("<フォルダーのパスをここに記載>", "csv",
  Map("header", "True", "schema", "col1 STRING, col2 STRING, col3 STRING")) 

このコードを利用して、サンプルcsvのapc_test_1、 apc_test_2、apc_test_3を順番に取り込み、その結果は下記Fig_3に示されています。 一回目の取り込みで、そのままapc_test_1の情報を取得しました.。二回目の取り込みになりますと、apc_test_2にcol4が含まれているにも関わらず、col1〜3の情報しか取り込まれていません。三回目の取り込みになりますと、apc_test_3はcol1〜2の情報しか保持していませんが、取り込み時にcol3は自動的に「null」として代入されます。

Fig_3

パタン:(b)-1 事前コード変更による対応

データパイプラインを構築当初のデータ構造はapc_test_1だと想定して、元コードはこちらになります。 このコードを使用して一回目のデータパイプラインを起動し、データを取り込みます。

CREATE OR REFRESH STREAMING LIVE TABLE acp_blog_test
COMMENT "for APC技術ブログ"
AS SELECT
  col1,col2,col3
FROM cloud_files("<フォルダーのパスをここに記載>", "csv",
  Map("header", "True", "schema", "col1 STRING, col2 STRING, col3 STRING")) 

次に、データ構造の変更は事前に分かり、apc_test_2のように、col4が追加される場合を想定します。データパイプラインを起動する前に、下記のコードの通り、SELECT文にcol4の選択を追加し、schema指定の部分に"col4 STRING"を付け加え、col4の型を合わせて指定しておきます。このままパイプラインを起動します。

CREATE OR REFRESH STREAMING LIVE TABLE acp_blog_test --LIVE
COMMENT "for APC技術ブログ"
AS SELECT
  col1,col2,col3, col4
FROM cloud_files("<フォルダーのパスをここに記載>", "csv",
  Map("header", "True", "schema", "col1 STRING, col2 STRING, col3 STRING, col4 STRING")) 

実行結果は下記Fig_4で示された通り、二回目の取り込みでcol4が取り込まれました。また、一回目の取り込まれたデータにcol4が付け加えられ、「null」値が代入されることが分かります。

Fig_4

パタン:(b)-2 新規カラムとして追加

事前にスキーマの変更を把握していない仮定の元で例を示します。 この場合、操作が少し煩雑になりますので、ご了承ください。

最初はこれまで通り、apc_test_1を取り込みます。 コードの中の「Map("cloudFiles.schemaEvolutionMode", "addNewColumns")」という文言を見ていきたいです。 ここで、Databricks特有の「schemaEvolutionMode(スキーマ進化)」という機能を駆使し、取り込まれたデータに未知なスキーマが含まれているかどうかを自動検出するように設定しておきます。

CREATE OR REFRESH STREAMING LIVE TABLE acp_blog_test --LIVE
COMMENT "for APC技術ブログ"
AS SELECT
  *
FROM cloud_files("<フォルダーのパスをここに記載>", "csv",
  Map("cloudFiles.schemaEvolutionMode", "addNewColumns")) 

この設定ですと、一回目apc_test_1のcsvファイルは無事に取り込んだことを確認できます。次に、上記のコードのままでapc_test_2をデータパイプラインを実行しますと、下記Fig_5のように、データパイプラインの実行が停止し、警告(warning)が表示されます。

ここの「警告」を「リマインド」や「注意喚起」として受け取ってほしいです。単純に取り込まれたデータの中に、これまで検出されていないカラムが検出され、パイプラインが再起動された際、この新たなカラムが新スキーマとして読み込まれ、テーブルが再構成される、との意味です。また、警告を出した意味はデータエンジニアにデータの中身を確認してほしい、と受け取ることができます。

Fig_5

警告文の中身を確認した上、データパイプラインを再開します。そうしますと、テーブルの中身が下記Fig_6に示された通りです。col4が新たに追加されます。同時に、古いデータにおいて、col4の値は「null」として代入されます。

Fig_6

パタン:(b)-3 新しいカラムをまとめて保存しておく

事前にスキーマの変更を把握していない、かつこれまでと違うデータが入ってきてもデータパイプラインが止まってほしくはありません。違うデータは一旦どこかで保存しておいて、後でまとめて確認したい場合、こちらの方法が適切だと思います。

ここで、「schemaEvolutionMode(スキーマ進化)」という機能を利用し、取り込まれたデータに未知なスキーマが含まれているかどうかを自動検出するように設定します。前節と違うのは、新しいカラムが検出された場合、これらのデータを一旦、「_rescued_data」というカラムにまとめて保存しておくことです。コードの中の「Map("cloudFiles.schemaEvolutionMode", "rescue")」と設定します。詳細の設定方法がこちらです。

CREATE OR REFRESH STREAMING LIVE TABLE acp_blog_test --LIVE
COMMENT "for APC技術ブログ"
AS SELECT
  *
FROM cloud_files("<フォルダーのパスをここに記載>", "csv",
  Map("cloudFiles.schemaEvolutionMode", "rescue")) 

取り込みの結果は下記Fig_7に示されています。一回目の取り込みは通常通りに取り込まれています。二回目は警告文が出されず、これまでと違うカラムは「_rescued_data」というカラムにまとめて記録されています。同時に、ファイルのパスも記録されており、後日にまとめて元ファイルの中身の確認がしやすいように配慮しています。

Fig_7

おわりに

本記事はいかがでしょうか。

データパイプラインの基本の「キ」ともいえる、データ取り込みの設計について、簡単にご紹介しました。 ここで紹介された例はすべての状況に対応できるものではありませんが、データパイプラインにおけるデータ取り込み設計のヒントになれれば幸いです。

説明が省略されていましたが、紹介したコードの中に、SELECT文では、時折カラムを明示していったり、「*」を用いて全カラムを選択するようにしています。 こちらの用途もニーズに応じて変更可能であり、その変更に伴いデータパイプラインの動作が変わります。 もっと詳しく知りたい方はぜひDatabricksの公式ドキュメントをご一読ください。 それぞれのデータパイプラインの特性に応じて、設計を行うのもデータエンジニアリングの醍醐味です。

最後に

私たちはDatabricksを用いたデータ分析基盤の導入から内製化支援まで幅広く支援をしております。 もしご興味がある方は、お問い合わせ頂ければ幸いです。

また、一緒に働いていただける仲間も募集中です! APCにご興味がある方の連絡をお待ちしております。