APC 技術ブログ

株式会社エーピーコミュニケーションズの技術ブログです。

株式会社 エーピーコミュニケーションズの技術ブログです。

DatabricksとKXをConnectする方法: PyKX with kdb Insights license

はじめに

GLB事業部Lakehouse部の鄭(ジョン)です。

この記事ではKXのPyKXを利用し、DatabricksとKXをConnectする方法を紹介いたします。
Databricks上でkdb Insights licenseを使用してKXのプロダクトのkdb+を利用する検証を行います。
kdb+は世界最速の時系列データベース及び分析エンジンです。

KXのプロダクトの詳細情報は記事をご確認ください。 techblog.ap-com.co.jp

目次

PyKXの紹介

PyKXは世界最速の時系列データベースkdb+に対するPythonの初のインターフェースであり、ベクトルプログラミング言語のq言語を基本的に使用します。 PyKXはq/kdb+をPythonと統合するためにPythonファーストのアプローチをとります。 膨大な量のin-memoryおよびdisk上のtime-series dataを効率的にクエリおよび分析できる機能をユーザーに提供します。

  • PyKXの説明は以下のKXのドキュメントを引用します。 code.kx.com

検証の流れは次のようです。
➀ kdb Insights licenseをダウンロードする
➁ PyKXをインストールする
③ kdb Insights licenseを利用し、PyKXをインポートする
④ KXのq言語を使ってみる
⑤ DatabricksのEDA機能を使ってみる
⑥ PyKXとPandasの性能比較

検証

1. kdb Insights licenseをダウンロードする

kdb+を使用するためにkdb Insights licenseをダウンロードします。 kdb Insights licenseは、KXのkdb Insights Personal Edition 12 Months Free Trialで無料体験が可能です。

kx.com

上記のページから個人情報を入力したら、メールにライセンスが届きます。

ライセンスはメールに書いてあるbase64エンコーディングバージョンと、【kdb Insights Personal Edition License Download File】を押したら自動的にインストールされるLICファイルバージョンがあります。

2. PyKXをインストールする

Databricks上でPyKXをインストールします。

pip install pykx

3. PyKXをインポートする

kdb Insights licenseを利用し、PyKXをインポートします。

import pykx as kx

kdb Insights licenseを登録する方法は二つあります。  

➀ライセンスのファイルをノートブックと同じロケーションに保存する方法です。
pythonのバージョンが3.10以上ならできます。
自動的にライセンスを読み込みます。

➁ライセンスを入力する方法です。
入力の方法は下のドキュメントにあります。

code.kx.com

【1. kdb Insights licenseをダウンロードする】を行った方はY / Y / 2 を順番で入力した後、メールに届いたlicense keyをコピーし、入力したらImportができます。

4. KXのq言語を使ってみる

PyKXをインポートしたらDatabricksのpythonでKXのq言語を使用できます。

kx.q.til(10)

Databricksにあるテーブルと同じテーブルをkdb+に作成できます。

  • KXの公開データ(Time series)をDatabricksにインストールした後、pandasを利用してロードします。
import pandas as pd
weather = pd.read_csv("自分のファイルのパス")
  • Databricksにあるデータをロードして、kdb+にインポートします。
# Creating the same table into kdb+
kx.q["weather"] = weather
kx.q["weather"]

PyKXのSQLを使ってみます。

kx.q.sql("SELECT * FROM weather limit 5")

  • sqlにあるほとんどの機能は使用可能です。
kx.q.sql("SELECT * FROM weather  ORDER BY name limit 5")

kx.q.sql("SELECT * FROM weather WHERE borough = 'Staten Island' limit 5")

kdb+にあるデータをロードして、pandasのテーブルに変換した後、csvでDatabricksに保存します。

# Creating the same table into pandas
df = kx.q["weather"].pd()
df.head(5)

kx.q.write.csv('保存するパス/weather_kdb.csv', df)

5. DatabricksのEDA機能を使ってみる

Databricksはワークスペースの機能を使って簡単なEDAが可能です。
kdb+にあるデータをロードして、Databricksで視覚化をします。

display(df)

【+】ボタンを押したらVisualizationとData Profileが出ます。

Data Profileを使うとFeaturesの特徴を簡単に探究できます。
Numeric FeaturesとCategorical Featuresで分類して表示します。

Numeric Featuresは欠損値と基本統計量、簡単なグラフを見ることができます。

Categorical Featuresでは、ユニークな値の個数、最も多くの変数とその頻度数を簡単に見ることができます。

Visualizationを使ったらUIを利用してノーコードで視覚化が可能です。

Dropdownで多様なグラフを選択できます。
* グラフの種類の詳細は下記URLをご参照ください。

learn.microsoft.com

  • longitudeとlatitudeがあったらマッピングが可能です。

  • カラムの設定を通じて簡単にLineグラフを作られます。(X、Y、Group byなどのカラム)

作成したグラフはワークスペースで確認できます。

  • マップのlayerはワークスペースでも調整可能です。

  • LineグラフはPNGファイルでイメージを保存できます。

6. PyKXとPandasの性能比較

PyKXとPandasの性能を比較します。

比較の方法は下の資料を参考しました。

  • kdb+のq言語とPandasの集計速度の比較:

www.jameslittle.me.uk

  • PyKXとPandasの速度比較及び視覚化:

kx.com

PyKXとPandasのcsv read機能を両方使って、データを読み込みます。
csvファイルをPyKXで読み込んだ場合は、PyKXのテーブルとして保存されて、
Pandasで読み込んだ場合は、Pandasのデータフレームとして保存されます。

  • PyKXの利用
weather = kx.q.read.csv("自分のファイルのパス")
type (weather)

  • Pandasの利用
weather_pd = pd.read_csv("自分のファイルのパス")
type (weather_pd)

データのカラムタイプを確認します。
Pandasでデータを読み込んだ場合は、timestampを他のtypeで認識する可能性があります。
タイプがtimestampでない場合は、日付、時間など時間に関連する機能を使用できません。
もしかして、Pandasのデータフレームが必要な場合は、Pykxでデータを読み込んだ後、Pykxでテーブルをpdに変換すればtimestampを使えます。

timestampのtimeを基準にsumとavg計算した時、掛かる時間で性能を比較します。
timestampは"2022.07.28D00:00:00.000000000"と同じ形です。
集計結果は次のようなテーブルを作成します。

時系列の集計を100番計算した時、掛かる時間を"%%capture"と"%timeit -n 100"を利用して、各変数に保存します。

  • PyKXの利用
%%capture pykx_time_res #コードの結果を保存する
%timeit -n 100 kx.q('grp:select sum airtemp, avg airtemp by timestamp.time from weather') #100番計算した時、掛かる時間を測定
  • Pandasの利用
%%capture pandas_time_res #コードの結果を保存する
%timeit -n 100 grp=weather_pd_q.groupby(weather_pd_q.timestamp.dt.time).agg({'airtemp': ['sum', 'mean']}) #100番計算した時、掛かる時間を測定

視覚化に使用する関数を作成します。関数は、KX Academyにある関数を利用しました。
一部のコードは、今回の検証に合わせて変更して使用しました。

kx.com

  • ➀ 文字列処理関数
# ➀ 文字列処理関数
def fix_res_string(v) -> str:
    return v.stdout.split("\n")[0].replace("+-", "±")
  • ➁ 入力された値をPandasとPyKXにディクショナリー化する関数
# ➁ 入力された値をPandasとPyKXにディクショナリー化する関数
def create_res_dict(pandas_time_res, pykx_time_res) -> pd.DataFrame:
    return pd.DataFrame(
        {
            "Time": {
                "Pandas": fix_res_string(pandas_time_res),
                "PyKX": fix_res_string(pykx_time_res),
            }
        }
    )
  • ③ 入力されたディクショナリーをデータフレーム化する関数
# ③ 入力されたディクショナリーをデータフレーム化する関数
def parse_time_vals(d: dict) -> pd.DataFrame:
    all_rows = pd.DataFrame()
    for k, v in d.items():

        if "±" not in v:
            one_row = pd.DataFrame(
                {"avg_time": "", "avg_dev": "", "runs": "", "loops": ""}, index=[k]
            )

        else:
            avg_time, rest1, rest2 = v.split(" ± ")
            avg_dev, _ = rest1.split(" per loop")
            rest2 = rest2.strip(" std. dev. of ")
            rest2 = rest2.strip(" loops each)")
            runs, loops = rest2.split(" runs, ")
            one_row = pd.DataFrame(
                {"avg_time": avg_time, "avg_dev": avg_dev, "runs": runs, "loops": loops}, index=[k]
            )

        all_rows = pd.concat([all_rows, one_row])

    return all_rows.reset_index().rename(columns={"index": "syntax"})
  • ④ "%timeit-n 100"で測定した値を入力すれば、➀・➁番関数を計算した後、③関数のデータフレームを返す関数
# ④ "%timeit-n 100"で測定した値を入力すれば、➀・➁番関数を計算した後、③関数のデータフレームを返す関数
def parse_vals(pandas_time_res, pykx_time_res) -> tuple[pd.DataFrame]:
    res_df = create_res_dict(pandas_time_res, pykx_time_res)
    return parse_time_vals(dict(res_df["Time"]))
  • ⑤ データフレームを比較する関数(PyKX/Pandas)
# ⑤ データフレームを比較する関数(PyKX/Pandas)
def compare(df: pd.DataFrame, metric: str, syntax1: str, syntax2: str) -> float:
    factor = float(df[df["syntax"] == syntax1][metric].values) / float(
        df[df["syntax"] == syntax2][metric].values
    )
    str_factor = f"{factor:,.2f} times less" if factor > 1 else f"{1/factor:,.2f} time more"
    print(
        color.BOLD
        + f"\nThe '{metric}' for '{syntax2}' is {str_factor} than '{syntax1}'.\n"
        + color.END
    )
    return factor
  • ⑥ 時間を変換する関数
# ⑥ 時間を変換する関数
def fix_time(l: list[str]) -> list[int]:
    fixed_l = []
    for v in l:
        if v.endswith(" ns"):
            fixed_l.append(float(v.strip(" ns")) / 1000 / 1000)
        elif v.endswith(" µs"):
            fixed_l.append(float(v.strip(" µs")) / 1000)
        elif v.endswith(" us"):
            fixed_l.append(float(v.strip(" us")) / 1000)
        elif v.endswith(" ms"):
            fixed_l.append(float(v.strip(" ms")))
        elif v.endswith(" s"):
            fixed_l.append(float(v.strip(" s")) * 1000)
        elif "min " in v:
            mins, secs = v.split("min ")
            total_secs = (float(mins) * 60) + float(secs.strip(" s"))
            fixed_l.append(total_secs * 1000)
        else:
            fixed_l.append(v)

    return fixed_l
  • ⑦ 視覚化する関数
from tabulate import tabulate
import matplotlib.pyplot as plt
# ⑦ 視覚化する関数
def graph_time_data(df_to_graph: pd.DataFrame) -> pd.DataFrame:
    print(tabulate(df_to_graph, headers="keys", tablefmt="psql"))

    fig, ax = plt.subplots(figsize=(7, 3))

    df = df_to_graph.copy()
    df["avg_time"] = fix_time(df["avg_time"])
    df["avg_dev"] = fix_time(df["avg_dev"])
    df["upper_dev"] = df["avg_time"] + df["avg_dev"]
    df["lower_dev"] = df["avg_time"] - df["avg_dev"]

    _ = compare(df, "avg_time", "Pandas", "PyKX")

    df.plot(ax=ax, kind="bar", x="syntax", y="avg_time", rot=0)
    # df.plot(ax=ax, kind="scatter", x="syntax", y="upper_dev", color="yellow")
    # df.plot(ax=ax, kind="scatter", x="syntax", y="lower_dev", color="orange")

    ax.set_title("Pandas VS Pykx Time Taken")
    ax.set_ylabel("Average Time (ms)")
    ax.get_legend().remove()
    ax.set(xlabel=None)

    plt.show()

    return df
  • class colorの作成
# class colorの作成
class color:
    PURPLE = "\033[95m"
    CYAN = "\033[96m"
    DARKCYAN = "\033[36m"
    BLUE = "\033[94m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    RED = "\033[91m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
    END = "\033[0m"

作成した関数を利用して、視覚化します。
timestampを利用して集計する場合、pykxがpandasより11.7倍速いという結果が出ました。

time_res_df = parse_vals(
    pandas_time_res, pykx_time_res
)

numeric_time_res_df = graph_time_data(time_res_df)

時系列分析ではPyKXの方が、timestampタイプの使用および集計速度で優秀な結果が出ました。

まとめ

今回の投稿ではDatabricks上でkdb Insights licenseを登録してKXのkdbを利用してみました。
PyKXというライブラリを使用すると、PythonからKXのKDBにアクセスでき、q言語を使用できます。
Databricksの様々な機能とKXのKDB+の両方を使いたい方にお勧めします。

最後までご覧いただきありがとうございます。
引き続きどうぞよろしくお願い致します!

私たちはDatabricksを用いたデータ分析基盤の導入から内製化支援まで幅広く支援をしております。
もしご興味がある方は、お問い合わせ頂ければ幸いです。

www.ap-com.co.jp

また、一緒に働いていただける仲間も募集中です!
APCにご興味がある方の連絡をお待ちしております。

www.ap-com.co.jp