Dataflow の基本的な使い方 (GCP)
[履歴] [最終更新] (2021/05/02 23:47:54)
最近の投稿
注目の記事

概要

Dataflow は Apache Beam のマネージドサービスです。大規模データの分散処理が可能になります。簡単な使い方を記載します。

Dataflow テンプレートを用いる場合

GCP が提供するデータ処理のテンプレートを利用すると、Apache Beam の使い方を把握していなくても Dataflow を利用できます。

ここでは簡単な例として、Cloud Storage (GCS) 上の CSV ファイルを、こちらのページで使い方を把握した DLP API によって de-identification して、BigQuery に書き出すデータフローのテンプレートを使ってみます。

DLP テンプレートの準備

PERSON_NAME を検出する inspection-template を作ってみます。

Uploaded Image

de-identification-template は、CSV の特定のカラムのみ匿名化したいため、Record を選択しています。

Uploaded Image

匿名化したいカラムのヘッダーは pii であるとします。pii カラムのデータについて、検出された箇所を info type 名で置換するように設定しています。

Uploaded Image

「テスト」タブで動作検証できます。inspection-template を指定することを忘れないようにします。pii 列の PERSON_NAME が匿名化されていることが分かります。

Uploaded Image

BigQuery データセットの準備

DLP で匿名化したデータを格納する BigQuery のデータセットを準備します。

bq mk mydataset_20210501

GCS バケットの準備

DLP で匿名化したいデータを格納する GCS バケットを準備します。

gsutil mb gs://mybucket-20210501-4

サービスアカウントの準備

GCP の Compute Engine サービス等を有効化すると、以下のようなサービスアカウントが作成されます。

{project-number}-compute@developer.gserviceaccount.com

既定では Dataflow はこのサービスアカウントを用いて、必要な VM を起動したりします。プロジェクトの Editor ロールが付与された状態で作成されますが、更に Cloud Dataflow Service Agent ロールを付与しておきます。

Uploaded Image

Dataflow ジョブの作成

Dataflow templateData Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) を指定します。

Uploaded Image

GCS、Bigquery、DLP de-identification template の情報を指定します。

Uploaded Image

DLP inspection template の指定は必須ではありませんが、今回 PERSON_NAME 以外は検出してほしくないため、指定します。更に、Dataflow が起動する VM のタイプや台数、Dataflow が利用するサービスアカウント等を指定します。

Uploaded Image

ストリーミングタイプの Dataflow Job が作成されたことが確認できます。

Uploaded Image

指定した台数の VM が起動しています。

Uploaded Image

GCS への CSV ファイルアップロード

Dataflow Job のパラメータとして指定した GCS のパスに、CSV ファイルをアップロードします。

gsutil cp pii-sample.csv gs://mybucket-20210501-4/input/

pii-sample.csv

pii,non_pii
My name is Mike,My name is Mike
My phone number is 090-1234-1234,My phone number is 090-1234-1234

pii カラムの PERSON_NAME が匿名化されたことが確認できます。

Uploaded Image

Apache Beam 2.x Java SDK の利用例

先程用いた DLP による匿名化のテンプレートは、De-identification and re-identification of PII in large-scale datasets using Cloud DLP に記載されているユースケースの一つをサポートしています。

Dataflow テンプレートをそのまま利用できない場合は、Apache Beam SDK を用いて開発する必要があります。言語は Java または Python がサポートされています。以下では、既存のテンプレートを編集してカスタマイズする場合を想定しています。

Java8 および Maven3 のインストール

Dataflow テンプレートはオープンソースとして GitHub で公開されています

git clone git@github.com:GoogleCloudPlatform/DataflowTemplates.git

テンプレートは Java で記述されています。Java8 および Maven3 をローカル環境にインストールして、以下のようにビルドできます。

mvn clean compile

Java15 等ではビルドできないことに注意します。

$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: c:\Users\username\bin\apache-maven-3.8.1\bin\..
Java version: 1.8.0_291, vendor: Oracle Corporation, runtime: C:\Program Files\Java\jdk1.8.0_291\jre
Default locale: ja_JP, platform encoding: MS932
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"

DLPTextToBigQueryStreaming.java をテストするには以下のようにします。

mvn test -Dtest=DLPTextToBigQueryStreamingTest

実行結果の例

[INFO] Results:
[INFO] 
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO] 
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  44.991 s
[INFO] Finished at: 2021-05-02T17:29:09+09:00
[INFO] ------------------------------------------------------------------------

BigQuery データセットの準備

DLP で匿名化したデータを格納する BigQuery のデータセットを準備します。

bq mk mydataset_20210501

サービスアカウントの準備

ローカルPC で DLP API を実行して、結果を BigQuery に格納するためのサービスアカウントを準備します。

Uploaded Image

IntelliJ IDEA の設定

開発環境として IntelliJ IDEA を利用する場合は、DLPTextToBigQueryStreaming.java を以下のように実行できます。

「File → Project Structure...」で JDK8 を指定することを忘れないように注意します。

Uploaded Image

ローカル PC でのビルドおよび実行

ビルドした結果を GCS にアップロードして Dataflow Job を作成することでもソースコードの検証は行えます。そうではなく、ローカルPC で直接 Java を実行するには以下のようにします。

「Run → Run... → Edit Configurations...」

Environment variables に GOOGLE_APPLICATION_CREDENTIALS を設定して、サービスアカウントの鍵ファイルを指すようにしておきます。更に、Program arguments に必要な情報を指定しておきます:

--project=my-project-20210328
--dlpProjectId=my-project-20210328
--inputFilePattern=C:\Users\username\Desktop\pii-sample.csv
--inspectTemplateName=projects/my-project-20210328/locations/global/inspectTemplates/my-inspection-template-20210501
--deidentifyTemplateName=projects/my-project-20210328/locations/global/deidentifyTemplates/my-deidentification-template-20210501
--datasetName=mydataset_20210501

Uploaded Image

実行結果の例

Uploaded Image

BigQuery にも正しく格納されました。

Uploaded Image

Apache Beam 2.x Python SDK の利用例

インストール

Python から Apache Beam を利用するためには apache-beam をインストールします。setup.py に記載のとおり、GCP API を利用するための GCP_REQUIREMENTS も依存モジュールとしてインストールするように指定します。

python3 -m pip install apache-beam[gcp]

簡単なサンプル

my-dataflow-1.py

#!/usr/bin/python
# -*- coding: utf-8 -*-

from argparse import ArgumentParser

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import ReadFromText

import logging
logging.getLogger().setLevel(logging.INFO)


def Main():

    parser = ArgumentParser()

    parser.add_argument(
        '--input', type=str,
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process [default=%(default)s]')

    knownArgs, pipelineArgs = parser.parse_known_args()

    pipelineOptions = PipelineOptions(pipelineArgs)
    pipelineOptions.view_as(SetupOptions).save_main_session = True

    # パイプラインは with ブロックを抜けた時に実行されます。
    with beam.Pipeline(options=pipelineOptions) as p:
        lines = p | 'read from text' >> ReadFromText(knownArgs.input)
        lines | beam.Map(print)


if __name__ == '__main__':
    Main()

Apache Beam SDK には runner という概念があります。既定では DirectRunner が使われてローカル実行されます。

python3 my-dataflow-1.py --input ~/Desktop/pii-sample.csv

出力例

INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
...

pii,non_pii
My name is Mike,My name is Mike
My phone number is 090-1234-1234,My phone number is 090-1234-1234

DataflowRunner を指定して、その他の必要なオプションを pipelineArgs に渡すことで、GCP Dataflow で処理を実行できます。

export GOOGLE_APPLICATION_CREDENTIALS='C:\Users\username\Downloads\my-project-20210328-04084aee5ee7.json'

python3 my-dataflow-1.py \
--project my-project-20210328 \
--region us-central1 \
--runner DataflowRunner \
--staging_location gs://mybucket-20210501-4/staging \
--temp_location gs://mybucket-20210501-4/temp

作成された Dataflow Job

Uploaded Image

その他

Apache Beam で分散処理を行う際に必要となる関数の使い方は、以下のようなページに記載されています。

Apache Beam SDK で作成したパイプラインの利用方法について

Apache Beam SDK で DirectRunner を用いてローカル開発したパイプラインは、DataflowRunner で GCP 上で実行できます。

実行方法としては、以下の二つがあります。

  • 本ページで行ったように GCS に staging するのと同時に Job を作成する方法。
  • GCP で用意されているテンプレートのように GCS へのアップロードのみを行い、Dataflow Job は必要な時に Cloud Console (Web UI) 等から作成する方法。

更に後者については、Classic templates と Flex Templates の二つの選択肢があります。

関連ページ
    概要 GCP の基本的な使い方について、コマンド例などを記載します。 Web ブラウザ経由で gcloud コマンドを利用 (Cloud Shell) GCP において、Cloud Console (Web UI) によるリソース管理操作と同等の処理は、gcloud コマンドによって行うこともできます。Cloud Shell