AWS Glue を利用すると Apache Spark をサーバーレスに実行できます。基本的な使い方を把握する目的で、S3 と RDS からデータを Redshift に ETL (Extract, Transform, and Load) してみます。2017/12/22 に東京リージョンでも利用できるようになりました。また、本ページでは Python を利用しますが、新たに Scala サポートされています。
AWS Glue ETL 概略図
AWS Glue を ETL サービスとして利用する場合のシステム概略図は以下のようになります。
- Data Source から Data Target に対して ETL します。本ページでは Data Source は S3 と RDS であり、Data Target は Redshift となります。
- ETL は Job として実行されます。Job は cron のように定期実行したり、Lambda などからイベント駆動で実行できます。任意のタイミングで手動実行することもできます。
- ETL Job は AWS Glue 内でサーバーレスに処理されます。内部的には Apache Spark が稼動しており、特に Python で記述された PySpark のスクリプトを利用します。
- ETL Job を定義する際は Data Source を Data Catalog から選択します。Data Target は Job 定義時に新規作成するか、Data Catalog から選択します。
- Data Catalog は Crawler が収集した情報をもとに作成されたメタデータです。データのカタログであり、実際のデータではありません。
- Data Catalog は Table として管理されます。複数の Table を Database としてまとめて管理します。
AWS Glue が利用する IAM ロールの作成
IAM ロールを AWS service で Glue
を選択して新規作成します。ここではロール名を my-glue-role-20171124
として作成しますが、権限の制限された IAM で作業している場合等を考慮する場合は "AWSGlueServiceRole" という文字列で始まる名称を設定します。ポリシーとしては Glue, S3, RDS, Redshift を操作するために必要なものを設定します。
- ロール名:
my-glue-role-20171124
- AWS service:
Glue
- ポリシー
AWSGlueServiceRole
AmazonS3FullAccess
AmazonRDSFullAccess
AmazonRedshiftFullAccess
S3 と RDS に検証用のデータを用意
S3 と RDS を Glue と同じリージョンに作成して検証用のデータを格納します。今回、RDS は Glue と同じ VPC 内のパブリックサブネットに所属させますが、実際には踏み台サーバーをパブリックサブネットに用意して、RDS はプライベートサブネットに所属させた方が安全です。また、S3 はインターネットを経由しないように VPC Endpoints を作成することもできますが、今回は設定しないで進めます。必要になり次第後から設定できます。
VPC
Glue から RDS や後述の Redshift にアクセスするためには VPC に関して以下の設定が必要であることに注意します。
- RDS や Redshift は Glue と同じ VPC に存在する必要があります。
- enableDnsHostnames と enableDnsSupport が true になっている必要があります。
- Glue は Public IP を持ちません。そのため、VPC 内に Glue から利用するプライベートサブネットを新規に作成して、NAT Gateway をデフォルトゲートウェイとしてルーティングテーブルを設定します。NAT Gateway 自体はパブリックサブネットに所属させる必要があることに注意してください。これは Glue から RDS/Redshift にインターネット経由でアクセスすることを意味してはおらず、RDS/Redshift の Publicly Accessible 設定は No でも Yes でも問題ありません。今回は簡単のためインターネット経由で操作したいため Yes として RDS/Redshift はパブリックサブネットに所属させます。
プライベートサブネットのルーティングテーブルで、デフォルトゲートウェイを「パブリックサブネットに所属する NAT Gateway」に設定
S3
新規バケット my-bucket-20171124
を作成して、以下のような JSON ファイルをアップロードします。
s3://my-bucket-20171124/s3.json
{"pstr":"aaa","pint":1}
{"pstr":"bbb","pint":2}
{"pstr":"ccc","pint":3}
{"pstr":"ddd","pint":4}
{"pstr":"eee","pint":5}
RDS
新規 MySQL DB とユーザーを以下のように作成します。セキュリティグループは後に VPC 内の Glue からアクセスできるように「同じセキュリティグループが設定されているアクセス元であれば許可する」という設定がなされたものを追加するようにします。インターネットからアクセスできる必要はなく、Publicly Accessible は No でも Yes でも問題ありません。mydw.xxxx.us-east-1.redshift.amazonaws.com
や myrdsdb.xxxx.us-east-1.rds.amazonaws.com
といったホスト名は VPC 内ではプライベート IP に解決されます。
- DB 名: myrdsdb
- ユーザー名: myuser
- パスワード: mypass
myrdsdb 内に以下のようなテーブルとレコードを作成します。
GRANT ALL ON myrdsdb.* TO 'myuser'@'%' IDENTIFIED BY 'mypass';
CREATE TABLE myrdsdb.myrdstable (id INT PRIMARY KEY AUTO_INCREMENT, cstr VARCHAR(32), cint INT);
INSERT INTO myrdsdb.myrdstable (cstr, cint) VALUES ('aaa', -1), ('bbb', -2), ('ccc', -3), ('ddd', -4), ('eee', -5);
Redshift の設定
ETL 先となる Redshift を Glue と同じ VPC のパブリックサブネットに作成します。実際には踏み台サーバーをパブリックサブネットに用意して、Redshift はプライベートサブネットに所属させた方が安全です。
psql -h mydw.xxxx.us-east-1.redshift.amazonaws.com -p 5439 mydw username
ユーザーと DB mydw
内のテーブルを作成します。
CREATE USER myuser WITH PASSWORD 'myPassword20171124';
CREATE TABLE mytable (cstr VARCHAR(32), cint INTEGER);
削除時
DROP USER myuser;
DROP TABLE mytable;
セキュリティグループは後に VPC 内の Glue からアクセスできるように「同じセキュリティグループが設定されているアクセス元であれば許可する」という設定がなされたものを追加するようにします。インターネットからアクセスできる必要はなく、Publicly Accessible は No でも Yes でも問題ありません。mydw.xxxx.us-east-1.redshift.amazonaws.com
や myrdsdb.xxxx.us-east-1.rds.amazonaws.com
といったホスト名は VPC 内ではプライベート IP に解決されます。
Crawler を設定してカタログ Table を作成
Glue コンソールから Crawler を設定して S3 と RDS それぞれのカタログ Table を作成します。こちらの公式ブログに記載されているとおり GUI 設定を繰り返すだけです。
Database の作成
Glue におけるメタデータ格納用 Table をまとめるための Database を作成します。
- Database name: mygluedb
Connections の作成
JDBC 接続で利用する認証情報を登録します。Connection type で RDS や Redshift を指定すると JDBC 設定が多少簡略化されますが、ここでは Connection type JDBC として設定します。設定後は「Test connection」ボタンで正常に接続できるか試験できます。
RDS
- Connection name: my-rds-connection
- Connection type: JDBC
- JDBC URL: VPC 内の Glue がアクセスするためのエンドポイントを設定します。JDBC URL のサンプルはこちらです。
jdbc:mysql://myrdsdb.xxxx.us-east-1.rds.amazonaws.com:3306/myrdsdb
のようになります。 - Username: myuser
- Password: mypass
- VPC: Glue が所属する VPC の設定です。RDS と同じ VPC を指定します。
- Subnet: Glue が所属する Subnet の設定です。RDS が存在する Subnet と通信できるものを指定します。前述のとおり Glue は Public IP を持たないため、NAT Gateway にルーティングされるプライベート Subnet を選択します。
- Security groups: Glue に設定する Security groups です。RDS にも設定した「同じセキュリティグループが設定されているアクセス元であれば許可する」という設定がなされたものを追加します。この RDS と同一のセキュリティグループを設定することで、RDS へのアクセスが許可されるようになります。
Redshift
- Connection name: my-redshift-connection
- Connection type: JDBC
- JDBC URL: VPC 内の Glue がアクセスするためのエンドポイントを設定します。JDBC URL のサンプルはこちらです。
jdbc:redshift://mydw.xxxx.us-east-1.redshift.amazonaws.com:5439/mydw
のようになります。 - Username: myuser
- Password: myPassword20171124
- VPC: Glue が所属する VPC の設定です。Redshift と同じ VPC を指定します。
- Subnet: Glue が所属する Subnet の設定です。Redshift が存在する Subnet と通信できるものを指定します。前述のとおり Glue は Public IP を持たないため、NAT Gateway にルーティングされるプライベート Subnet を選択します。
- Security groups: Glue に設定する Security groups です。Redshift にも設定した「同じセキュリティグループが設定されているアクセス元であれば許可する」という設定がなされたものを追加します。この Redshift と同一のセキュリティグループを設定することで、Redshift へのアクセスが許可されるようになります。
Crawlers の作成
S3 のデータをカタログ化する Crawler を登録します。
- Crawler name: my-s3-crawler-20171124
- Data store: S3
- Crawl data in: Specified path in my account
- Include path: s3://my-bucket-20171124/s3.json
- Choose an existing IAM role: my-glue-role-20171124
- Frequency: Run on demand
- Database: mygluedb
- Prefix added to tables:
mys3prefix_
RDS のデータをカタログ化する Crawler を登録します。
- Crawler name: my-rds-crawler-20171124
- Data store: JDBC
- Connection: my-rds-connection
- Include path: myrdsdb/myrdstable
- Choose an existing IAM role: my-glue-role-20171124
- Frequency: Run on demand
- Database: mygluedb
- Prefix added to tables:
myrdsprefix_
それぞれ手動実行して CloudWatch にログが出力されることを確認します。今回のように Data store 毎に Crawlers を分けることは必須ではなく、似たような Data store であれば同じ Crawler にまとめて登録します。結果として一つの Crawler から複数の Table が生成されます。Crawler は Table に対して大量に作成するようなものではありません。
ETL Job を作成
Glue コンソールから Job を登録します。具体的には ETL のソースとターゲットを設定します。設定内容をもとに PySpark スクリプトが自動生成されます。
- Name: myjob-20171124
- IAM role: my-glue-role-20171124
- This job runs: A proposed script generated by AWS Glue (スクリプトを自動生成する設定にします。ここで自分のスクリプトを最初から設定することもできます)
- Script file name: myjob-20171124
- S3 path where the script is stored: s3://aws-glue-scripts-xxxx-us-west-2/admin (スクリプトは S3 上に生成されます)
- Temporary directory: s3://aws-glue-scripts-xxxx-us-west-2/tmp (Job 実行のためには一時ファイルなどが必要になるため S3 のフォルダを指定する必要があります)
- Choose your data sources:
mys3prefix_s3_json
(ここでは S3 を指定していますが、後でスクリプトを編集することで RDS も Source として扱えます) - Choose your data targets: Create tables in your data target (ETL 出力先は Redshift を指定します)
- Data store: JDBC
- Connection: my-redshift-connection
- Database name: mydw
- Map the source columns to target columns: PySpark スクリプト自動生成のヒントとなります。使用しない Source 情報は Target から除外します。
- Job 作成後、Edit job で
my-rds-connection
を Required connections に追加します。
PySpark 開発環境の構築
生成された PySpark スクリプトは Job の「Edit script」ボタンからエディタを開いて確認できます。このまま編集して保存することもできますが、動作検証の度に Run job するのは時間がかかります。以下の開発環境を構築して十分に検証してから Run job すると効率的です。
エンドポイントの作成
Apache Spark への踏み台となる EC2 インスタンスを、開発環境から利用するエンドポイントとして作成します。Glue コンソールの Dev endpoints から GUI で作成できます。READY 状態のエンドポイントは課金対象となります。安くはないため、費用を抑えるためには例えば DPU を 2 として作成します。DPU 1 だと作成に失敗します。
- Development endpoint name: myendpoint-20171124
- IAM role: my-glue-role-20171124 (本ページ上部で作成した Glue から利用する IAM ロール)
- Data processing units (DPUs): 2
- Networking: インターネット経由で S3 のみ利用する場合は Skip networking information で問題ありません。RDS や Redshift を利用する場合は Choose a connection から作成済みのものを選択して VPC にこれから作成する EC2 インスタンスを所属させる必要があります。
- Public key contents: これからエンドポイントとして作成する、Apache Spark への踏み台となる EC2 インスタンスに SSH するための公開鍵を登録します。普段使用しているものを登録するか、ここで新規に作成します。
作成されるまでしばらくかかります。
PySpark 開発環境の選択
エンドポイントとして作成した Apache Spark への踏み台となる EC2 インスタンスを利用する開発環境としては、以下の三つの構築方法があります。
- 登録した公開鍵に対応する秘密鍵でエンドポイントに SSH して REPL シェルを利用
- 登録した公開鍵に対応する秘密鍵でエンドポイントに SSH して
-L
オプションでポート転送した状態で、Apache Zeppelin を ローカル PC で起動してエンドポイントに接続 - Glue サービスが提供する CloudFormation で Apache Zeppelin が動作する EC2 インスタンスを構築
三番目の選択肢が便利ですが、費用がかかります。二番目の選択肢は若干手間がかかりますが、費用が抑えられます。一番目の選択肢は簡単な検証時に有用です。SSH コマンドは作成したエンドポイントの詳細画面からコピーして利用できます。また、二番目の選択肢において Docker を利用する場合は、ホスト側のサービスをコンテナ内から利用することになるため ssh ポート転送時に -g
オプションを付与する必要があることや、Zeppelin 設定時に localhost ではなくホストの IP を指定する必要があることに注意します。
IAM ロールの作成
三番目の選択肢で Apache Zeppelin が稼動する EC2 インスタンスを CloudFormation で起動する場合は、事前に EC2 インスタンスで利用する IAM ロールを作成する必要があります。S3, RDS, Redshift すべてを利用する場合は以下のようになります。
- ロール名:
my-ec2-glue-notebook-role-20171124
- AWS service:
EC2
- ポリシー
AWSGlueServiceNotebookRole
AmazonS3FullAccess
AmazonRDSFullAccess
AmazonRedshiftFullAccess
PySpark スクリプトを編集して ETL Job を実行
構築した開発環境で PySpark スクリプトを編集します。検証まで完了したら ETL Job として実行します。以下に編集例を記載します。Zeppelin 操作時は %pyspark
で記述し始める必要があることに注意します。
Data Source から情報を取得して表示する例
%pyspark
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import udf
from pyspark.sql.functions import desc
# AWS Glue を操作するオブジェクト
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# S3 からデータを取得
srcS3 = glueContext.create_dynamic_frame.from_catalog(
database = 'mygluedb',
table_name = 'mys3prefix_s3_json')
# 情報を表示
print 'Count:', srcS3.count()
srcS3.printSchema()
# S3 から取得したデータを、加工せずにそのまま S3 に CSV フォーマットで出力
glueContext.write_dynamic_frame.from_options(
frame = srcS3,
connection_type = 's3',
connection_options = {
'path': 's3://my-bucket-20171124/target'
},
format = 'csv')
S3 出力例
$ aws s3 ls s3://my-bucket-20171124/target/
2017-11-26 15:59:44 40 run-1511679582568-part-r-00000
$ aws s3 cp s3://my-bucket-20171124/target/run-1511679582568-part-r-00000 -
pstr,pint
aaa,1
bbb,2
ccc,3
ddd,4
eee,5
標準出力例
Count: 5
root
|-- pstr: string
|-- pint: int
ETL Job として実行する際は %pyspark
を削除して、以下のように Job の初期化と終了の処理を追記します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import udf
from pyspark.sql.functions import desc
# AWS Glue を操作するオブジェクト
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Job の初期化 ←追加
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# S3 からデータを取得
srcS3 = glueContext.create_dynamic_frame.from_catalog(
database = 'mygluedb',
table_name = 'mys3prefix_s3_json')
# 情報を表示
print 'Count:', srcS3.count()
srcS3.printSchema()
# S3 から取得したデータを、加工せずにそのまま S3 に CSV フォーマットで出力
glueContext.write_dynamic_frame.from_options(
frame = srcS3,
connection_type = 's3',
connection_options = {
'path': 's3://my-bucket-20171124/target'
},
format = 'csv')
# Job を終了 ←追加
job.commit()
S3 出力例
$ aws s3 ls s3://my-bucket-20171124/target/
2017-11-26 15:59:44 40 run-1511679582568-part-r-00000
$ aws s3 cp s3://my-bucket-20171124/target/run-1511679582568-part-r-00000 -
pstr,pint
aaa,1
bbb,2
ccc,3
ddd,4
eee,5
CloudWatch 出力一部例
LogType:stdout
Log Upload Time:Sun Nov 26 06:59:44 +0000 2017
LogLength:46
Log Contents:
Count: 5
root
|-- pstr: string
|-- pint: int
End of LogType:stdout
PySpark チートシート
AWS Glue DynamicFrame と Apache Spark DataFrame は toDF()
および fromDF()
で互いに変換できます。AWS Glue が提供する DynamicFrame では表現できない処理が存在する場合、「データソースから取得したデータのキャストや不要なフィールドの除去」および「最終的な構造の変換とデータ出力」の間に発生する「データ変換」は Apache Spark の DataFrame に変換して記述します。DataFrame 関連のドキュメントを調査する際には Python の dir と help 関数の存在も知っておきます。
DynamicFrame / 入力データの整形
filter
srcS3.filter(lambda r: r['pint'] > 1).toDF().show()
+----+----+
|pint|pstr|
+----+----+
| 2| bbb|
| 3| ccc|
| 4| ddd|
| 5| eee|
+----+----+
フィールドを選択
srcS3.select_fields(['pstr']).toDF().show()
+----+
|pstr|
+----+
| aaa|
| bbb|
| ccc|
| ddd|
| eee|
+----+
フィールドを除外
srcS3.drop_fields(['pstr']).toDF().show()
+----+
|pint|
+----+
| 1|
| 2|
| 3|
| 4|
| 5|
+----+
フィールドの名称を変更
srcS3.rename_field('pstr', 'pstr2').toDF().show()
+----+-----+
|pint|pstr2|
+----+-----+
| 1| aaa|
| 2| bbb|
| 3| ccc|
| 4| ddd|
| 5| eee|
+----+-----+
型変換
キャスト可能な型はこちらです。
srcS3.resolveChoice(specs = [('pstr', 'cast:int')]).toDF().show()
+----+----+
|pstr|pint|
+----+----+
|null| 1|
|null| 2|
|null| 3|
|null| 4|
|null| 5|
+----+----+
DynamicFrame / データの変換
Join
Join.apply(srcS3, srcS3, 'pstr', 'pstr').toDF().show()
+----+----+-----+-----+
|pint|pstr|.pint|.pstr|
+----+----+-----+-----+
| 2| bbb| 2| bbb|
| 4| ddd| 4| ddd|
| 5| eee| 5| eee|
| 3| ccc| 3| ccc|
| 1| aaa| 1| aaa|
+----+----+-----+-----+
以下のように記述することもできます。
srcS3.join('pstr', 'pstr', srcS3).toDF().show()
+----+----+-----+-----+
|pint|pstr|.pint|.pstr|
+----+----+-----+-----+
| 2| bbb| 2| bbb|
| 4| ddd| 4| ddd|
| 5| eee| 5| eee|
| 3| ccc| 3| ccc|
| 1| aaa| 1| aaa|
+----+----+-----+-----+
SplitFields
指定したフィールドのみから成るフレームと、残りのフィールドから成るフレームの二つを含む DynamicFrameCollection を返します。DynamicFrameCollection からは select で DynamicFrame を取り出せます。
dfc = SplitFields.apply(srcS3, ['pstr'])
for dyf_name in dfc.keys():
dfc.select(dyf_name).toDF().show()
+----+
|pstr|
+----+
| aaa|
| bbb|
| ccc|
| ddd|
| eee|
+----+
+----+
|pint|
+----+
| 1|
| 2|
| 3|
| 4|
| 5|
+----+
map
def my_f(dyr):
dyr['pint'] = dyr['pint'] + 10
return dyr
srcS3.map(my_f).toDF().show()
+----+----+
|pint|pstr|
+----+----+
| 11| aaa|
| 12| bbb|
| 13| ccc|
| 14| ddd|
| 15| eee|
+----+----+
unnest
dyf = srcS3.apply_mapping([
('pstr', 'string', 'proot.str', 'string'),
('pint', 'int', 'proot.int', 'int')
])
dyf.printSchema()
dyf2 = dyf.unnest()
dyf2.printSchema()
root
|-- proot: struct
| |-- str: string
| |-- int: int
root
|-- proot.str: string
|-- proot.int: int
collection / select
dfc = SplitFields.apply(srcS3, ['pstr'], 'split_off', 'remaining')
dfc.select('split_off').toDF().show()
dfc.select('remaining').toDF().show()
+----+
|pstr|
+----+
| aaa|
| bbb|
| ccc|
| ddd|
| eee|
+----+
+----+
|pint|
+----+
| 1|
| 2|
| 3|
| 4|
| 5|
+----+
collection / map
def my_f(dyf, ctx):
df = dyf.toDF()
return DynamicFrame.fromDF(df.union(df), glueContext, 'dyf')
dfc = SplitFields.apply(srcS3, ['pstr'])
dfc2 = dfc.map(my_f)
for dyf_name in dfc2.keys():
dfc2.select(dyf_name).toDF().show()
+----+
|pstr|
+----+
| aaa|
| bbb|
| ccc|
| ddd|
| eee|
| aaa|
| bbb|
| ccc|
| ddd|
| eee|
+----+
+----+
|pint|
+----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 1|
| 2|
| 3|
| 4|
| 5|
+----+
DynamicFrame / 変換後データの整形および出力
DynamicFrame への変換、フレーム構造の変更
# 相互変換
df = srcS3.toDF()
dyf = DynamicFrame.fromDF(df, glueContext, 'dyf')
dyf.printSchema()
# 構造変更
dyf2 = dyf.apply_mapping([
('pstr', 'string', 'proot.str', 'string'),
('pint', 'int', 'proot.int', 'int')
])
dyf2.printSchema()
root
|-- pstr: string
|-- pint: int
root
|-- proot: struct
| |-- str: string
| |-- int: int
RDB に格納できるような複数フレームに分割
フレーム分割のイメージ図はこちらです。
from pyspark.sql.functions import array
# 検証用にリスト形式のフィールドを追加します。
df = srcS3.toDF()
df2 = df.withColumn('plist', array(df.pstr, df.pint))
df2.show()
dyf = DynamicFrame.fromDF(df2, glueContext, 'dyf')
# リレーショナルな関係にある複数のフレームに分割します。
dfc = dyf.relationalize('my_dyf_root', 's3://my-bucket-20171124/tmp')
for dyf_name in dfc.keys():
print dyf_name
dfc.select(dyf_name).toDF().show()
+----+----+--------+
|pstr|pint| plist|
+----+----+--------+
| aaa| 1|[aaa, 1]|
| bbb| 2|[bbb, 2]|
| ccc| 3|[ccc, 3]|
| ddd| 4|[ddd, 4]|
| eee| 5|[eee, 5]|
+----+----+--------+
my_dyf_root
+----+----+-----+
|pstr|pint|plist|
+----+----+-----+
| aaa| 1| 1|
| bbb| 2| 2|
| ccc| 3| 3|
| ddd| 4| 4|
| eee| 5| 5|
+----+----+-----+
my_dyf_root_plist
+---+-----+---------+
| id|index|plist.val|
+---+-----+---------+
| 1| 0| aaa|
| 1| 1| 1|
| 2| 0| bbb|
| 2| 1| 2|
| 3| 0| ccc|
| 3| 1| 3|
| 4| 0| ddd|
| 4| 1| 4|
| 5| 0| eee|
| 5| 1| 5|
+---+-----+---------+
DataFrame / データの変換
直接 SQL を記述
メソッドチェーンではなく直接 SQL で記述できることを知っておきます。
df = srcS3.toDF()
df.createOrReplaceTempView('temptable')
sql_df = spark.sql('SELECT * FROM temptable')
sql_df.show()
print spark.sql('SELECT * FROM temptable LIMIT 1').first().pstr
+----+----+
|pstr|pint|
+----+----+
| aaa| 1|
| bbb| 2|
| ccc| 3|
| ddd| 4|
| eee| 5|
+----+----+
aaa
Python オブジェクトへの変換
SQL で解決することが難しい特殊な処理は、以下の手順で Python オブジェクトに変換することで解決できることがあります。
タプルのリストに変換
df = srcS3.toDF()
tuples = df.rdd.map(lambda row: (row.pstr, row.pint)).collect()
print tuples
[(u'aaa', 1), (u'bbb', 2), (u'ccc', 3), (u'ddd', 4), (u'eee', 5)]
処理を行った結果が例えばタプルのリストの場合、DataFrame には以下のように変換できます。
from pyspark.sql import Row
spark.createDataFrame(map(lambda tup: Row(pstr2=tup[0], pint2=tup[1]), tuples)).show()
+-----+-----+
|pint2|pstr2|
+-----+-----+
| 1| aaa|
| 2| bbb|
| 3| ccc|
| 4| ddd|
| 5| eee|
+-----+-----+
Apache Spark は基本的にすべての処理をメモリ上で行います。AWS Glue のように Cluster mode で動作する Apache Spark の場合、一つの Driver と複数の Executor が登場します。PySpark スクリプトの処理は Driver が実行します。DataFrame を含む Resilient Distributed Dataset (RDD) は複数の Executor のメモリ上に分割して展開されます。DataFrame の Executors への分割数は repartition() で調整できます。RDD を処理する場合は複数の Executor がタスクを分割します。そのため RDD を print しても Driver の標準出力ではタスクの実行結果を確認できません。上記 collect()
は複数の Executor に存在する RDD を Driver に集めるためのものです。そのため collect()
で集めて扱うデータサイズが巨大な場合は Driver のメモリ不足に注意します。また、以下のようなエラーが発生する場合があります。
Container killed by YARN for exceeding memory limits. xxx GB of yyy GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Executors における RDD 処理において、Driver 上の Python オブジェクトを Executors と都度共有するようなことはせず、Python オブジェクトの処理はすべて Driver 上で処理して完全に DataFrame (RDD) 化してから Executors に展開することで解消する可能性があります。例えば RDD の変換処理における udf()
で Driver 上の Python オブジェクトを参照することは避けます。また、扱う RDD がそもそも Executor のメモリ容量に対して非常に大きい場合などは DPU を増やすことで対応できます。
空の DataFrame を作成
emptyRDD、StructType、StructField を利用します。
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import LongType
df = spark.createDataFrame(sc.emptyRDD(), StructType([
StructField("pint", LongType(), False)
]))
ペア RDD の生成
DataFrame からペア RDD とよばれる特殊な RDD を生成するためには KeyBy を利用します。
rdd = df.rdd.keyBy(lambda row: row.pint)
subtractByKey といった処理が可能になります。
rdd2 = df2.rdd.keyBy(lambda row: row.pint)
df3 = spark.createDataFrame(rdd.subtractByKey(rdd2).values())
S3 から直接読み込み
Crawler でカタログ Table を作成していない場合であっても、スキーマ情報が事前に分かっている場合は S3 から直接データを取得できます。JSON の場合は以下のようになります。
df = spark.read.json('s3://my-bucket-20171124/s3.json')
フィールドの追加
後述の select
で同様の処理を記述できますが withColumn
によってフィールドを追加できることを知っておきます。関数を用いて動的にフィールドの値を設定する場合は以下のようになります。前述の通り udf()
内では Driver 上のオブジェクトを参照しないように注意します。
from pyspark.sql.types import IntegerType
def my_f(x):
return x * 2
df = srcS3.toDF()
df.withColumn('pint2', udf(my_f, IntegerType())(df['pint'])).show()
+----+----+-----+
|pstr|pint|pint2|
+----+----+-----+
| aaa| 1| 2|
| bbb| 2| 4|
| ccc| 3| 6|
| ddd| 4| 8|
| eee| 5| 10|
+----+----+-----+
定数のフィールドを追加する場合は lit
を利用します。
from pyspark.sql.functions import lit
df = srcS3.toDF()
df.withColumn('pint2', lit(123)).show()
+----+----+-----+
|pstr|pint|pint2|
+----+----+-----+
| aaa| 1| 123|
| bbb| 2| 123|
| ccc| 3| 123|
| ddd| 4| 123|
| eee| 5| 123|
+----+----+-----+
select
df = srcS3.toDF()
df.select(df['pstr'], df['pstr'].substr(1,3).alias('pstr2'), (df['pint'] % 2).alias('peo')).show()
+----+-----+---+
|pstr|pstr2|peo|
+----+-----+---+
| aaa| aaa| 1|
| bbb| bbb| 0|
| ccc| ccc| 1|
| ddd| ddd| 0|
| eee| eee| 1|
+----+-----+---+
distinct、dropDuplicates
from pyspark.sql.functions import lit
df = srcS3.toDF().limit(2)
df2 = df.union(df).withColumn('pint2', lit(123))
df2.show()
+----+----+-----+
|pstr|pint|pint2|
+----+----+-----+
| aaa| 1| 123|
| bbb| 2| 123|
| aaa| 1| 123|
| bbb| 2| 123|
+----+----+-----+
すべてのフィールドに関して、重複を除去するためには distinct
を利用します。
df2.distinct().show()
+----+----+-----+
|pstr|pint|pint2|
+----+----+-----+
| bbb| 2| 123|
| aaa| 1| 123|
+----+----+-----+
特定のフィールドに関して、重複を除去するためには dropDuplicates
を利用します。
df2.dropDuplicates(['pint2']).show()
+----+----+-----+
|pstr|pint|pint2|
+----+----+-----+
| aaa| 1| 123|
+----+----+-----+
where
df = srcS3.toDF()
df.where(df['pint'] > 1).show()
+----+----+
|pstr|pint|
+----+----+
| bbb| 2|
| ccc| 3|
| ddd| 4|
| eee| 5|
+----+----+
groupBy/集約関数
count
df = srcS3.toDF()
df.groupBy(df['pint'] % 2).count().show()
+----------+-----+
|(pint % 2)|count|
+----------+-----+
| 1| 3|
| 0| 2|
+----------+-----+
df.groupBy(df['pstr'], (df['pint'] % 2).alias('peo')).count().show()
+----+---+-----+
|pstr|peo|count|
+----+---+-----+
| ccc| 1| 1|
| eee| 1| 1|
| ddd| 0| 1|
| aaa| 1| 1|
| bbb| 0| 1|
+----+---+-----+
sum
df = srcS3.toDF()
df.groupBy(df['pint'] % 2).sum('pint').show()
+----------+---------+
|(pint % 2)|sum(pint)|
+----------+---------+
| 1| 9|
| 0| 6|
+----------+---------+
min/max
df = srcS3.toDF()
df.groupBy(df['pint'] % 2).min('pint').show()
+----------+---------+
|(pint % 2)|min(pint)|
+----------+---------+
| 1| 1|
| 0| 2|
+----------+---------+
df.groupBy(df['pint'] % 2).max('pint').show()
+----------+---------+
|(pint % 2)|max(pint)|
+----------+---------+
| 1| 5|
| 0| 4|
+----------+---------+
avg
df = srcS3.toDF()
df.groupBy(df['pint'] % 2).avg('pint').show()
+----------+---------+
|(pint % 2)|avg(pint)|
+----------+---------+
| 1| 3.0|
| 0| 3.0|
+----------+---------+
orderBy
df = srcS3.toDF()
df.orderBy(desc('pstr'), 'pint').show()
+----+----+
|pstr|pint|
+----+----+
| eee| 5|
| ddd| 4|
| ccc| 3|
| bbb| 2|
| aaa| 1|
+----+----+
RDD 永続化について
RDD の操作には
- DynamicFrame の filter や DataFrame の where のように、クラスタ全体の Executors に分割して配置された RDD すべてを利用する必要のない「変換」
- DynamicFrame の count や DataFrame の collect のように、クラスタ全体の Executors に分割して配置された RDD すべてを利用しなければならない「アクション」
の二つが存在します。「変換」の操作結果は RDD です。「変換」は「アクション」で必要になるまで実行されません。これを遅延評価とよびます。また、同じ「変換」であっても「アクション」で必要になる度に繰り返し遅延評価されます。
RDD 永続化による高速化
cache() を利用すると、アクション実行のために遅延評価されて変換された結果の RDD が Executors のメモリに保存されます。
df = src.xxx.xxx.xxx.xxx ← 複雑な RDD 変換
df.cache() ← 次のアクションで永続化します
df.count() ← RDD 永続化はここでなされます
上記複雑な RDD 変換の結果が永続化されている状態で RDD を利用したアクションを再度実行したとしても
df.count()
複雑な RDD 変換は再度実行されません。cached がない状態で上記アクションを実行すると、再度以下のような複雑な RDD 変換が遅延評価されてしまいます。HDFS からのデータ読み出しを含めると時間がかかるため、複数のアクションで必要になる RDD は永続化するようにします。
src.xxx.xxx.xxx.xxx.count()
cache()
は storageLevel を詳細に指定できる persist()
の略記法のようなものです。永続化した RDD が不要になったら unpersist()
で Executors から明示的に削除するようにします。
df.unpersist()
可視化
Apache Zeppelin を利用している場合
df = srcS3.toDF()
df.createOrReplaceTempView('df')
として結果を保存した状態で、以下のようにしてグラフ化できます。
%sql
select * from df
PySpark スクリプトを記述する際の参考資料
実際に開発を行う際に都度辞書的に利用できるリファレンスには以下のようなものがあります。
macOS の Homebrew を利用できる場合は、Apache Spark をインストールして、実行しながらリファレンスの内容を確認できます。
brew install apache-spark
Python
pyspark
Scala
spark-shell
関連記事
- AWS EC2 インスタンスの選定方法準仮想化と完全仮想化 AWS のインスタンスタイプが準仮想化と完全仮想化のどちらの仮想化技術を採用したハードウェアであるかによって、使用できる AMI (OS) が異なるのは以下の理由によります。 準仮想化 ParaVirtualization (PV) において OS は自分が仮想化用のハードウェア上で動作していることを知っています。つまり仮想化用にカスタマイズされた専用の OS が必要になりま...
- OpenVPN で二つの VPC をつなぐための設定インターネット VPN (Virtual Private Network) には二拠点間の通信を暗号化する方式によって IPsec-VPN や SSL-VPN などがあります。OpenVPN は SSL-VPN の実装のひとつです。AWS VPC を二つ用意してそれらを OpenVPN で接続してみます。 VPC の構成 myvpc-1 (10.1.0.0/16) mysubnet-1-1 (10...
- Windows Server EC2 インスタンスへの CAL インストールWindows サーバを AWS EC2 インスタンスとして起動した後は、RDP 接続が必要となります。RDP 接続を行なうためには Remote Desktop Services (RDS) ライセンスが必要となります。 Windows Server のライセンスは、管理用途の RDS 接続を許可しています。その際に RDS ライセンスは不要です。ただし、[同時接続数が 2 という制限があります...
- AWS Lambda の基本的な使い方AWS Lambda はイベントドリブンな「関数」を登録できるサービスです。例えば S3 に画像がアップロードされたときにサムネイル用のサイズに加工する処理が記述された関数を登録できます。基本的な使い方をまとめます。 事前準備 関数の登録はブラウザで AWS コンソールにログインして行うこともできますが、本ページでは AWS C
- AWS 落穂拾い (Data Engineering)Kinesis Kinesis Streams データは 3 AZ にレプリケーションされます。Amazon Kinesis Data Streams FAQs 24 時間 (既定値) から 1 年までデータ保持できます。[Changing the Data Retention Period](https://docs.aws.amazon