Dataflow は Apache Beam のマネージドサービスです。大規模データの分散処理が可能になります。簡単な使い方を記載します。
GCP が提供するデータ処理のテンプレートを利用すると、Apache Beam の使い方を把握していなくても Dataflow を利用できます。
ここでは簡単な例として、Cloud Storage (GCS) 上の CSV ファイルを、こちらのページで使い方を把握した DLP API によって de-identification して、BigQuery に書き出すデータフローのテンプレートを使ってみます。
PERSON_NAME
を検出する inspection-template を作ってみます。
de-identification-template は、CSV の特定のカラムのみ匿名化したいため、Record
を選択しています。
匿名化したいカラムのヘッダーは pii
であるとします。pii
カラムのデータについて、検出された箇所を info type 名で置換するように設定しています。
「テスト」タブで動作検証できます。inspection-template を指定することを忘れないようにします。pii
列の PERSON_NAME
が匿名化されていることが分かります。
DLP で匿名化したデータを格納する BigQuery のデータセットを準備します。
bq mk mydataset_20210501
DLP で匿名化したいデータを格納する GCS バケットを準備します。
gsutil mb gs://mybucket-20210501-4
GCP の Compute Engine サービス等を有効化すると、以下のようなサービスアカウントが作成されます。
{project-number}-compute@developer.gserviceaccount.com
既定では Dataflow はこのサービスアカウントを用いて、必要な VM を起動したりします。プロジェクトの Editor
ロールが付与された状態で作成されますが、更に Cloud Dataflow Service Agent
ロールを付与しておきます。
Dataflow template
に Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) を指定します。
GCS、Bigquery、DLP de-identification template の情報を指定します。
DLP inspection template の指定は必須ではありませんが、今回 PERSON_NAME
以外は検出してほしくないため、指定します。更に、Dataflow が起動する VM のタイプや台数、Dataflow が利用するサービスアカウント等を指定します。
ストリーミングタイプの Dataflow Job が作成されたことが確認できます。
指定した台数の VM が起動しています。
Dataflow Job のパラメータとして指定した GCS のパスに、CSV ファイルをアップロードします。
gsutil cp pii-sample.csv gs://mybucket-20210501-4/input/
pii-sample.csv
pii,non_pii
My name is Mike,My name is Mike
My phone number is 090-1234-1234,My phone number is 090-1234-1234
pii
カラムの PERSON_NAME
が匿名化されたことが確認できます。
先程用いた DLP による匿名化のテンプレートは、De-identification and re-identification of PII in large-scale datasets using Cloud DLP に記載されているユースケースの一つをサポートしています。
Dataflow テンプレートをそのまま利用できない場合は、Apache Beam SDK を用いて開発する必要があります。言語は Java または Python がサポートされています。以下では、既存のテンプレートを編集してカスタマイズする場合を想定しています。
Dataflow テンプレートはオープンソースとして GitHub で公開されています。
git clone git@github.com:GoogleCloudPlatform/DataflowTemplates.git
テンプレートは Java で記述されています。Java8 および Maven3 をローカル環境にインストールして、以下のようにビルドできます。
mvn clean compile
Java15 等ではビルドできないことに注意します。
$ mvn --version
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: c:\Users\username\bin\apache-maven-3.8.1\bin\..
Java version: 1.8.0_291, vendor: Oracle Corporation, runtime: C:\Program Files\Java\jdk1.8.0_291\jre
Default locale: ja_JP, platform encoding: MS932
OS name: "windows 10", version: "10.0", arch: "amd64", family: "windows"
DLPTextToBigQueryStreaming.java をテストするには以下のようにします。
mvn test -Dtest=DLPTextToBigQueryStreamingTest
実行結果の例
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 44.991 s
[INFO] Finished at: 2021-05-02T17:29:09+09:00
[INFO] ------------------------------------------------------------------------
DLP で匿名化したデータを格納する BigQuery のデータセットを準備します。
bq mk mydataset_20210501
ローカルPC で DLP API を実行して、結果を BigQuery に格納するためのサービスアカウントを準備します。
開発環境として IntelliJ IDEA を利用する場合は、DLPTextToBigQueryStreaming.java
を以下のように実行できます。
「File → Project Structure...」で JDK8 を指定することを忘れないように注意します。
ビルドした結果を GCS にアップロードして Dataflow Job を作成することでもソースコードの検証は行えます。そうではなく、ローカルPC で直接 Java を実行するには以下のようにします。
「Run → Run... → Edit Configurations...」
Environment variables に GOOGLE_APPLICATION_CREDENTIALS
を設定して、サービスアカウントの鍵ファイルを指すようにしておきます。更に、Program arguments に必要な情報を指定しておきます:
--project=my-project-20210328
--dlpProjectId=my-project-20210328
--inputFilePattern=C:\Users\username\Desktop\pii-sample.csv
--inspectTemplateName=projects/my-project-20210328/locations/global/inspectTemplates/my-inspection-template-20210501
--deidentifyTemplateName=projects/my-project-20210328/locations/global/deidentifyTemplates/my-deidentification-template-20210501
--datasetName=mydataset_20210501
実行結果の例
BigQuery にも正しく格納されました。
Python から Apache Beam を利用するためには apache-beam をインストールします。setup.py に記載のとおり、GCP API を利用するための GCP_REQUIREMENTS
も依存モジュールとしてインストールするように指定します。
python3 -m pip install apache-beam[gcp]
my-dataflow-1.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from argparse import ArgumentParser
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import ReadFromText
import logging
logging.getLogger().setLevel(logging.INFO)
def Main():
parser = ArgumentParser()
parser.add_argument(
'--input', type=str,
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process [default=%(default)s]')
knownArgs, pipelineArgs = parser.parse_known_args()
pipelineOptions = PipelineOptions(pipelineArgs)
pipelineOptions.view_as(SetupOptions).save_main_session = True
# パイプラインは with ブロックを抜けた時に実行されます。
with beam.Pipeline(options=pipelineOptions) as p:
lines = p | 'read from text' >> ReadFromText(knownArgs.input)
lines | beam.Map(print)
if __name__ == '__main__':
Main()
Apache Beam SDK には runner という概念があります。既定では DirectRunner
が使われてローカル実行されます。
python3 my-dataflow-1.py --input ~/Desktop/pii-sample.csv
出力例
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
...
pii,non_pii
My name is Mike,My name is Mike
My phone number is 090-1234-1234,My phone number is 090-1234-1234
DataflowRunner
を指定して、その他の必要なオプションを pipelineArgs
に渡すことで、GCP Dataflow で処理を実行できます。
export GOOGLE_APPLICATION_CREDENTIALS='C:\Users\username\Downloads\my-project-20210328-04084aee5ee7.json'
python3 my-dataflow-1.py \
--project my-project-20210328 \
--region us-central1 \
--runner DataflowRunner \
--staging_location gs://mybucket-20210501-4/staging \
--temp_location gs://mybucket-20210501-4/temp
作成された Dataflow Job
Apache Beam で分散処理を行う際に必要となる関数の使い方は、以下のようなページに記載されています。
Apache Beam SDK で DirectRunner
を用いてローカル開発したパイプラインは、DataflowRunner
で GCP 上で実行できます。
実行方法としては、以下の二つがあります。
更に後者については、Classic templates と Flex Templates の二つの選択肢があります。