【SPSS】ModelerからPython経由でBigQueryに接続 ~拡張ノード インポート~

【SPSS】ModelerからPython経由でBigQueryに接続 ~拡張ノード インポート~

ModelerからBigQueryへの接続について、Windows版ではODBC接続がサポートされていますが、Mac版は現時点では未だサポートされていません。
拡張ノードを使えば、サポート有無に関わらずPython経由でBigQueryに接続できるのでは?
ということで、実際にデータをインポートできるか試してみました。
※エクスポート編はこちら

実行環境

  • IBM SPSS Modeler Client 18.3
  • Python 3.8.9

準備

1.Pythonに下記の3パッケージをインストール
  インストール方法は、使用するPythonによって異なります。

  • google-cloud-bigquery
  • pyarrow
  • pandas

  インストール方法1ModelerについているPythonを使用する場合 ~Pythonをあまり使わない人向け~
   下記手順で「google-cloud-bigquery」と「pyarrow」をインストール
   ※「pandas」はデフォルトでインストール済みですが、ストリーム実行時にエラーが出る場合は「–upgrade」オプションをつけて再インストールしてみてください

   ・Windowsでの手順 
   ・Macでの手順

  インストール方法2既存のPythonを使用したい場合 ~Pythonをよく使っている人向け~
   手順1.Pythonに上記の3パッケージをインストール
   手順2.ModelerでPythonのパスを指定 ※未設定の場合のみ      

2.BigQueryのサービスアカウントキー(jsonファイル)をローカルに保存
  サービスアカウントキーの作成方法

手順

1.Modelerの[入力]パレットから「拡張のインポート」ノードを選択

2.ノードを開き、「Python for Spark」を選択

3.「Pythonシンタックス」に下記を記述

#Modelerで使用
import spss.pyspark.runtime
from pyspark.sql.types import *
asContext = spss.pyspark.runtime.getContext()
sqlContext = asContext.getSparkSQLContext()

#BigQueryで使用
from google.cloud import bigquery
import os
import pandas
import pyarrow

#スキーマを作成(後続ノードに渡すフィールド(カラム)を設定)
fieldList =[
    StructField('AAAAAAAAAA',IntegerType(),True),
    StructField('BBBBBBBBBB',StringType(),True)
]
outputSchema = StructType(fieldList)

#BigQuery認証用jsonファイルを指定(ローカルに保存)
#Windowsの場合は「r'ファイルパス'」("\"を"\\"または"/"に変換でもできるが面倒)
credentials_json = './XXXXX.json'

#BigQuery環境変数(PC側で環境変数を設定しても良いが、今回はここで設定)
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_json

#Modelerで使用
asContext.setSparkOutputSchema(outputSchema)
if not asContext.isComputeDataModelOnly():

    #BigQuery接続用
    bigquery_client = bigquery.Client()

    #接続するプロジェクトID
    project = 'XXXXXXXXXX'
    #データセット名
    dataset = 'YYYYYYYYYY'
    #テーブル名
    table = 'ZZZZZZZZZZ'
    #SQL文
    query = 'SELECT AAAAAAAAAA,BBBBBBBBBB FROM `{0}.{1}.{2}`'.format(project, dataset, table)
    
    df = bigquery_client.query(query).to_dataframe()

    #Modelerで使用(後続ノードに結果を渡す)
    outputData = sqlContext.createDataFrame(df)
    asContext.setSparkOutputData(outputData)

4.[OK]をクリック

5.テーブルノードを接続して実行できるか確認

今回は拡張ノードを使用しましたが、「拡張機能」であれば変数の入力欄だけを表示するオリジナルノードを作成できます。
コーディングに慣れていない方と共有する場合は、その方が便利かもしれませんね。

また、PythonからBigQueryへ接続できるか確認したい場合は、こちら↓をご参照ください。

※参考
https://www.ibm.com/docs/ja/spss-modeler/18.3.0?topic=spark-scripting-python
BigQuery API クライアント ライブラリ