モーダルを閉じる工作HardwareHub ロゴ画像

工作HardwareHubは、ロボット工作や電子工作に関する情報やモノが行き交うコミュニティサイトです。さらに詳しく

利用規約プライバシーポリシー に同意したうえでログインしてください。

AWS Glue の基本的な使い方

モーダルを閉じる

ステッカーを選択してください

お支払い手続きへ
モーダルを閉じる

お支払い内容をご確認ください

購入商品
」ステッカーの表示権
メッセージ
料金
(税込)
決済方法
GooglePayマーク
決済プラットフォーム
確認事項

利用規約をご確認のうえお支払いください

※カード情報はGoogleアカウント内に保存されます。本サイトやStripeには保存されません

※記事の執筆者は購入者のユーザー名を知ることができます

※購入後のキャンセルはできません

作成日作成日
2017/11/24
最終更新最終更新
2021/09/07
記事区分記事区分
一般公開

目次

    Pythonの初心者向け記事を多数執筆。データ解析にも挑戦中です。

    AWS Glue を利用すると Apache Spark をサーバーレスに実行できます。基本的な使い方を把握する目的で、S3 と RDS からデータを Redshift に ETL (Extract, Transform, and Load) してみます。2017/12/22 に東京リージョンでも利用できるようになりました。また、本ページでは Python を利用しますが、新たに Scala サポートされています

    AWS Glue ETL 概略図

    AWS Glue を ETL サービスとして利用する場合のシステム概略図は以下のようになります

    • Data Source から Data Target に対して ETL します。本ページでは Data Source は S3 と RDS であり、Data Target は Redshift となります。
    • ETL は Job として実行されます。Job は cron のように定期実行したり、Lambda などからイベント駆動で実行できます。任意のタイミングで手動実行することもできます。
    • ETL Job は AWS Glue 内でサーバーレスに処理されます。内部的には Apache Spark が稼動しており、特に Python で記述された PySpark のスクリプトを利用します。
    • ETL Job を定義する際は Data Source を Data Catalog から選択します。Data Target は Job 定義時に新規作成するか、Data Catalog から選択します。
    • Data Catalog は Crawler が収集した情報をもとに作成されたメタデータです。データのカタログであり、実際のデータではありません。
    • Data Catalog は Table として管理されます。複数の Table を Database としてまとめて管理します。

    AWS Glue が利用する IAM ロールの作成

    IAM ロールを AWS service で Glue を選択して新規作成します。ここではロール名を my-glue-role-20171124 として作成しますが、権限の制限された IAM で作業している場合等を考慮する場合は "AWSGlueServiceRole" という文字列で始まる名称を設定します。ポリシーとしては Glue, S3, RDS, Redshift を操作するために必要なものを設定します。

    • ロール名: my-glue-role-20171124
    • AWS service: Glue
    • ポリシー
      • AWSGlueServiceRole
      • AmazonS3FullAccess
      • AmazonRDSFullAccess
      • AmazonRedshiftFullAccess

    S3 と RDS に検証用のデータを用意

    S3 と RDS を Glue と同じリージョンに作成して検証用のデータを格納します。今回、RDS は Glue と同じ VPC 内のパブリックサブネットに所属させますが、実際には踏み台サーバーをパブリックサブネットに用意して、RDS はプライベートサブネットに所属させた方が安全です。また、S3 はインターネットを経由しないように VPC Endpoints を作成することもできますが、今回は設定しないで進めます。必要になり次第後から設定できます。

    VPC

    Glue から RDS や後述の Redshift にアクセスするためには VPC に関して以下の設定が必要であることに注意します。

    • RDS や Redshift は Glue と同じ VPC に存在する必要があります。
    • enableDnsHostnames と enableDnsSupport が true になっている必要があります
    • Glue は Public IP を持ちません。そのため、VPC 内に Glue から利用するプライベートサブネットを新規に作成して、NAT Gateway をデフォルトゲートウェイとしてルーティングテーブルを設定します。NAT Gateway 自体はパブリックサブネットに所属させる必要があることに注意してください。これは Glue から RDS/Redshift にインターネット経由でアクセスすることを意味してはおらず、RDS/Redshift の Publicly Accessible 設定は No でも Yes でも問題ありません。今回は簡単のためインターネット経由で操作したいため Yes として RDS/Redshift はパブリックサブネットに所属させます。

    プライベートサブネットのルーティングテーブルで、デフォルトゲートウェイを「パブリックサブネットに所属する NAT Gateway」に設定

    S3

    新規バケット my-bucket-20171124 を作成して、以下のような JSON ファイルをアップロードします。

    s3://my-bucket-20171124/s3.json

    {"pstr":"aaa","pint":1}
    {"pstr":"bbb","pint":2}
    {"pstr":"ccc","pint":3}
    {"pstr":"ddd","pint":4}
    {"pstr":"eee","pint":5}
    

    RDS

    新規 MySQL DB とユーザーを以下のように作成します。セキュリティグループは後に VPC 内の Glue からアクセスできるように「同じセキュリティグループが設定されているアクセス元であれば許可する」という設定がなされたものを追加するようにします。インターネットからアクセスできる必要はなく、Publicly Accessible は No でも Yes でも問題ありません。mydw.xxxx.us-east-1.redshift.amazonaws.commyrdsdb.xxxx.us-east-1.rds.amazonaws.com といったホスト名は VPC 内ではプライベート IP に解決されます。

    • DB 名: myrdsdb
    • ユーザー名: myuser
    • パスワード: mypass

    myrdsdb 内に以下のようなテーブルとレコードを作成します。

    GRANT ALL ON myrdsdb.* TO 'myuser'@'%' IDENTIFIED BY 'mypass';
    CREATE TABLE myrdsdb.myrdstable (id INT PRIMARY KEY AUTO_INCREMENT, cstr VARCHAR(32), cint INT);
    INSERT INTO myrdsdb.myrdstable (cstr, cint) VALUES ('aaa', -1), ('bbb', -2), ('ccc', -3), ('ddd', -4), ('eee', -5);
    

    Redshift の設定

    ETL 先となる Redshift を Glue と同じ VPC のパブリックサブネットに作成します。実際には踏み台サーバーをパブリックサブネットに用意して、Redshift はプライベートサブネットに所属させた方が安全です。

    psql -h mydw.xxxx.us-east-1.redshift.amazonaws.com -p 5439 mydw username
    

    ユーザーと DB mydw 内のテーブルを作成します。

    CREATE USER myuser WITH PASSWORD 'myPassword20171124';
    CREATE TABLE mytable (cstr VARCHAR(32), cint INTEGER);
    

    削除時

    DROP USER myuser;
    DROP TABLE mytable;
    

    セキュリティグループは後に VPC 内の Glue からアクセスできるように「同じセキュリティグループが設定されているアクセス元であれば許可する」という設定がなされたものを追加するようにします。インターネットからアクセスできる必要はなく、Publicly Accessible は No でも Yes でも問題ありません。mydw.xxxx.us-east-1.redshift.amazonaws.commyrdsdb.xxxx.us-east-1.rds.amazonaws.com といったホスト名は VPC 内ではプライベート IP に解決されます。

    Crawler を設定してカタログ Table を作成

    Glue コンソールから Crawler を設定して S3 と RDS それぞれのカタログ Table を作成します。こちらの公式ブログに記載されているとおり GUI 設定を繰り返すだけです。

    Database の作成

    Glue におけるメタデータ格納用 Table をまとめるための Database を作成します。

    • Database name: mygluedb

    Connections の作成

    JDBC 接続で利用する認証情報を登録します。Connection type で RDS や Redshift を指定すると JDBC 設定が多少簡略化されますが、ここでは Connection type JDBC として設定します。設定後は「Test connection」ボタンで正常に接続できるか試験できます。

    RDS

    • Connection name: my-rds-connection
    • Connection type: JDBC
    • JDBC URL: VPC 内の Glue がアクセスするためのエンドポイントを設定します。JDBC URL のサンプルはこちらです。jdbc:mysql://myrdsdb.xxxx.us-east-1.rds.amazonaws.com:3306/myrdsdb のようになります。
    • Username: myuser
    • Password: mypass
    • VPC: Glue が所属する VPC の設定です。RDS と同じ VPC を指定します。
    • Subnet: Glue が所属する Subnet の設定です。RDS が存在する Subnet と通信できるものを指定します。前述のとおり Glue は Public IP を持たないため、NAT Gateway にルーティングされるプライベート Subnet を選択します。
    • Security groups: Glue に設定する Security groups です。RDS にも設定した「同じセキュリティグループが設定されているアクセス元であれば許可する」という設定がなされたものを追加します。この RDS と同一のセキュリティグループを設定することで、RDS へのアクセスが許可されるようになります。

    Redshift

    • Connection name: my-redshift-connection
    • Connection type: JDBC
    • JDBC URL: VPC 内の Glue がアクセスするためのエンドポイントを設定します。JDBC URL のサンプルはこちらです。jdbc:redshift://mydw.xxxx.us-east-1.redshift.amazonaws.com:5439/mydw のようになります。
    • Username: myuser
    • Password: myPassword20171124
    • VPC: Glue が所属する VPC の設定です。Redshift と同じ VPC を指定します。
    • Subnet: Glue が所属する Subnet の設定です。Redshift が存在する Subnet と通信できるものを指定します。前述のとおり Glue は Public IP を持たないため、NAT Gateway にルーティングされるプライベート Subnet を選択します。
    • Security groups: Glue に設定する Security groups です。Redshift にも設定した「同じセキュリティグループが設定されているアクセス元であれば許可する」という設定がなされたものを追加します。この Redshift と同一のセキュリティグループを設定することで、Redshift へのアクセスが許可されるようになります。

    Crawlers の作成

    S3 のデータをカタログ化する Crawler を登録します。

    • Crawler name: my-s3-crawler-20171124
    • Data store: S3
      • Crawl data in: Specified path in my account
      • Include path: s3://my-bucket-20171124/s3.json
    • Choose an existing IAM role: my-glue-role-20171124
    • Frequency: Run on demand
    • Database: mygluedb
    • Prefix added to tables: mys3prefix_

    RDS のデータをカタログ化する Crawler を登録します。

    • Crawler name: my-rds-crawler-20171124
    • Data store: JDBC
      • Connection: my-rds-connection
      • Include path: myrdsdb/myrdstable
    • Choose an existing IAM role: my-glue-role-20171124
    • Frequency: Run on demand
    • Database: mygluedb
    • Prefix added to tables: myrdsprefix_

    それぞれ手動実行して CloudWatch にログが出力されることを確認します。今回のように Data store 毎に Crawlers を分けることは必須ではなく、似たような Data store であれば同じ Crawler にまとめて登録します。結果として一つの Crawler から複数の Table が生成されます。Crawler は Table に対して大量に作成するようなものではありません

    ETL Job を作成

    Glue コンソールから Job を登録します。具体的には ETL のソースとターゲットを設定します。設定内容をもとに PySpark スクリプトが自動生成されます。

    • Name: myjob-20171124
    • IAM role: my-glue-role-20171124
    • This job runs: A proposed script generated by AWS Glue (スクリプトを自動生成する設定にします。ここで自分のスクリプトを最初から設定することもできます)
    • Script file name: myjob-20171124
    • S3 path where the script is stored: s3://aws-glue-scripts-xxxx-us-west-2/admin (スクリプトは S3 上に生成されます)
    • Temporary directory: s3://aws-glue-scripts-xxxx-us-west-2/tmp (Job 実行のためには一時ファイルなどが必要になるため S3 のフォルダを指定する必要があります)
    • Choose your data sources: mys3prefix_s3_json (ここでは S3 を指定していますが、後でスクリプトを編集することで RDS も Source として扱えます)
    • Choose your data targets: Create tables in your data target (ETL 出力先は Redshift を指定します)
      • Data store: JDBC
      • Connection: my-redshift-connection
      • Database name: mydw
    • Map the source columns to target columns: PySpark スクリプト自動生成のヒントとなります。使用しない Source 情報は Target から除外します。
    • Job 作成後、Edit job で my-rds-connection を Required connections に追加します。

    PySpark 開発環境の構築

    生成された PySpark スクリプトは Job の「Edit script」ボタンからエディタを開いて確認できます。このまま編集して保存することもできますが、動作検証の度に Run job するのは時間がかかります。以下の開発環境を構築して十分に検証してから Run job すると効率的です。

    エンドポイントの作成

    Apache Spark への踏み台となる EC2 インスタンスを、開発環境から利用するエンドポイントとして作成します。Glue コンソールの Dev endpoints から GUI で作成できます。READY 状態のエンドポイントは課金対象となります。安くはないため、費用を抑えるためには例えば DPU を 2 として作成します。DPU 1 だと作成に失敗します。

    • Development endpoint name: myendpoint-20171124
    • IAM role: my-glue-role-20171124 (本ページ上部で作成した Glue から利用する IAM ロール)
    • Data processing units (DPUs): 2
    • Networking: インターネット経由で S3 のみ利用する場合は Skip networking information で問題ありません。RDS や Redshift を利用する場合は Choose a connection から作成済みのものを選択して VPC にこれから作成する EC2 インスタンスを所属させる必要があります。
    • Public key contents: これからエンドポイントとして作成する、Apache Spark への踏み台となる EC2 インスタンスに SSH するための公開鍵を登録します。普段使用しているものを登録するか、ここで新規に作成します。

    作成されるまでしばらくかかります。

    PySpark 開発環境の選択

    エンドポイントとして作成した Apache Spark への踏み台となる EC2 インスタンスを利用する開発環境としては、以下の三つの構築方法があります。

    三番目の選択肢が便利ですが、費用がかかります。二番目の選択肢は若干手間がかかりますが、費用が抑えられます。一番目の選択肢は簡単な検証時に有用です。SSH コマンドは作成したエンドポイントの詳細画面からコピーして利用できます。また、二番目の選択肢において Docker を利用する場合は、ホスト側のサービスをコンテナ内から利用することになるため ssh ポート転送時に -g オプションを付与する必要があることや、Zeppelin 設定時に localhost ではなくホストの IP を指定する必要があることに注意します。

    IAM ロールの作成

    三番目の選択肢で Apache Zeppelin が稼動する EC2 インスタンスを CloudFormation で起動する場合は、事前に EC2 インスタンスで利用する IAM ロールを作成する必要があります。S3, RDS, Redshift すべてを利用する場合は以下のようになります。

    • ロール名: my-ec2-glue-notebook-role-20171124
    • AWS service: EC2
    • ポリシー
      • AWSGlueServiceNotebookRole
      • AmazonS3FullAccess
      • AmazonRDSFullAccess
      • AmazonRedshiftFullAccess

    PySpark スクリプトを編集して ETL Job を実行

    構築した開発環境で PySpark スクリプトを編集します。検証まで完了したら ETL Job として実行します。以下に編集例を記載します。Zeppelin 操作時は %pyspark で記述し始める必要があることに注意します。

    Data Source から情報を取得して表示する例

    %pyspark
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import udf
    from pyspark.sql.functions import desc
    
    # AWS Glue を操作するオブジェクト
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    # S3 からデータを取得
    srcS3 = glueContext.create_dynamic_frame.from_catalog(
        database = 'mygluedb',
        table_name = 'mys3prefix_s3_json')
    
    # 情報を表示
    print 'Count:', srcS3.count()
    srcS3.printSchema()
    
    # S3 から取得したデータを、加工せずにそのまま S3 に CSV フォーマットで出力
    glueContext.write_dynamic_frame.from_options(
        frame = srcS3,
        connection_type = 's3',
        connection_options = {
            'path': 's3://my-bucket-20171124/target'
        },
        format = 'csv')
    

    S3 出力例

    $ aws s3 ls s3://my-bucket-20171124/target/
    2017-11-26 15:59:44         40 run-1511679582568-part-r-00000
    $ aws s3 cp s3://my-bucket-20171124/target/run-1511679582568-part-r-00000 -
    pstr,pint
    aaa,1
    bbb,2
    ccc,3
    ddd,4
    eee,5
    

    標準出力例

    Count: 5
    root
    |-- pstr: string
    |-- pint: int
    

    ETL Job として実行する際は %pyspark を削除して、以下のように Job の初期化と終了の処理を追記します。

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import udf
    from pyspark.sql.functions import desc
    
    # AWS Glue を操作するオブジェクト
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    
    # Job の初期化  ←追加
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    # S3 からデータを取得
    srcS3 = glueContext.create_dynamic_frame.from_catalog(
        database = 'mygluedb',
        table_name = 'mys3prefix_s3_json')
    
    # 情報を表示
    print 'Count:', srcS3.count()
    srcS3.printSchema()
    
    # S3 から取得したデータを、加工せずにそのまま S3 に CSV フォーマットで出力
    glueContext.write_dynamic_frame.from_options(
        frame = srcS3,
        connection_type = 's3',
        connection_options = {
            'path': 's3://my-bucket-20171124/target'
        },
        format = 'csv')
    
    # Job を終了  ←追加
    job.commit()
    

    S3 出力例

    $ aws s3 ls s3://my-bucket-20171124/target/
    2017-11-26 15:59:44         40 run-1511679582568-part-r-00000
    $ aws s3 cp s3://my-bucket-20171124/target/run-1511679582568-part-r-00000 -
    pstr,pint
    aaa,1
    bbb,2
    ccc,3
    ddd,4
    eee,5
    

    CloudWatch 出力一部例

    LogType:stdout
    Log Upload Time:Sun Nov 26 06:59:44 +0000 2017
    LogLength:46
    Log Contents:
    Count: 5
    root
    |-- pstr: string
    |-- pint: int
    
    End of LogType:stdout
    

    PySpark チートシート

    AWS Glue DynamicFrame と Apache Spark DataFrame は toDF() および fromDF() で互いに変換できます。AWS Glue が提供する DynamicFrame では表現できない処理が存在する場合、「データソースから取得したデータのキャストや不要なフィールドの除去」および「最終的な構造の変換とデータ出力」の間に発生する「データ変換」は Apache Spark の DataFrame に変換して記述します。DataFrame 関連のドキュメントを調査する際には Python の dir と help 関数の存在も知っておきます。

    DynamicFrame / 入力データの整形

    filter

    srcS3.filter(lambda r: r['pint'] > 1).toDF().show()
    +----+----+
    |pint|pstr|
    +----+----+
    |   2| bbb|
    |   3| ccc|
    |   4| ddd|
    |   5| eee|
    +----+----+
    

    フィールドを選択

    srcS3.select_fields(['pstr']).toDF().show()
    +----+
    |pstr|
    +----+
    | aaa|
    | bbb|
    | ccc|
    | ddd|
    | eee|
    +----+
    

    フィールドを除外

    srcS3.drop_fields(['pstr']).toDF().show()
    +----+
    |pint|
    +----+
    |   1|
    |   2|
    |   3|
    |   4|
    |   5|
    +----+
    

    フィールドの名称を変更

    srcS3.rename_field('pstr', 'pstr2').toDF().show()
    +----+-----+
    |pint|pstr2|
    +----+-----+
    |   1|  aaa|
    |   2|  bbb|
    |   3|  ccc|
    |   4|  ddd|
    |   5|  eee|
    +----+-----+
    

    型変換

    キャスト可能な型はこちらです。

    srcS3.resolveChoice(specs = [('pstr', 'cast:int')]).toDF().show()
    +----+----+
    |pstr|pint|
    +----+----+
    |null|   1|
    |null|   2|
    |null|   3|
    |null|   4|
    |null|   5|
    +----+----+
    

    DynamicFrame / データの変換

    Join

    Join.apply(srcS3, srcS3, 'pstr', 'pstr').toDF().show()
    +----+----+-----+-----+
    |pint|pstr|.pint|.pstr|
    +----+----+-----+-----+
    |   2| bbb|    2|  bbb|
    |   4| ddd|    4|  ddd|
    |   5| eee|    5|  eee|
    |   3| ccc|    3|  ccc|
    |   1| aaa|    1|  aaa|
    +----+----+-----+-----+
    

    以下のように記述することもできます。

    srcS3.join('pstr', 'pstr', srcS3).toDF().show()
    +----+----+-----+-----+
    |pint|pstr|.pint|.pstr|
    +----+----+-----+-----+
    |   2| bbb|    2|  bbb|
    |   4| ddd|    4|  ddd|
    |   5| eee|    5|  eee|
    |   3| ccc|    3|  ccc|
    |   1| aaa|    1|  aaa|
    +----+----+-----+-----+
    

    SplitFields

    指定したフィールドのみから成るフレームと、残りのフィールドから成るフレームの二つを含む DynamicFrameCollection を返します。DynamicFrameCollection からは select で DynamicFrame を取り出せます。

    dfc = SplitFields.apply(srcS3, ['pstr'])
    for dyf_name in dfc.keys():
        dfc.select(dyf_name).toDF().show()
    +----+
    |pstr|
    +----+
    | aaa|
    | bbb|
    | ccc|
    | ddd|
    | eee|
    +----+
    +----+
    |pint|
    +----+
    |   1|
    |   2|
    |   3|
    |   4|
    |   5|
    +----+
    

    map

    def my_f(dyr):
        dyr['pint'] = dyr['pint'] + 10
        return dyr
    
    srcS3.map(my_f).toDF().show()
    
    +----+----+
    |pint|pstr|
    +----+----+
    |  11| aaa|
    |  12| bbb|
    |  13| ccc|
    |  14| ddd|
    |  15| eee|
    +----+----+
    

    unnest

    dyf = srcS3.apply_mapping([
      ('pstr', 'string', 'proot.str', 'string'),
      ('pint', 'int', 'proot.int', 'int')
    ])
    dyf.printSchema()
    dyf2 = dyf.unnest()
    dyf2.printSchema()
    root
    |-- proot: struct
    |    |-- str: string
    |    |-- int: int
    root
    |-- proot.str: string
    |-- proot.int: int
    

    collection / select

    dfc = SplitFields.apply(srcS3, ['pstr'], 'split_off', 'remaining')
    dfc.select('split_off').toDF().show()
    dfc.select('remaining').toDF().show()
    +----+
    |pstr|
    +----+
    | aaa|
    | bbb|
    | ccc|
    | ddd|
    | eee|
    +----+
    +----+
    |pint|
    +----+
    |   1|
    |   2|
    |   3|
    |   4|
    |   5|
    +----+
    

    collection / map

    def my_f(dyf, ctx):
        df = dyf.toDF()
        return DynamicFrame.fromDF(df.union(df), glueContext, 'dyf')
    
    dfc = SplitFields.apply(srcS3, ['pstr'])
    dfc2 = dfc.map(my_f)
    for dyf_name in dfc2.keys():
        dfc2.select(dyf_name).toDF().show()
    
    +----+
    |pstr|
    +----+
    | aaa|
    | bbb|
    | ccc|
    | ddd|
    | eee|
    | aaa|
    | bbb|
    | ccc|
    | ddd|
    | eee|
    +----+
    +----+
    |pint|
    +----+
    |   1|
    |   2|
    |   3|
    |   4|
    |   5|
    |   1|
    |   2|
    |   3|
    |   4|
    |   5|
    +----+
    

    DynamicFrame / 変換後データの整形および出力

    DynamicFrame への変換フレーム構造の変更

    # 相互変換
    df = srcS3.toDF()
    dyf = DynamicFrame.fromDF(df, glueContext, 'dyf')
    dyf.printSchema()
    
    # 構造変更
    dyf2 = dyf.apply_mapping([
      ('pstr', 'string', 'proot.str', 'string'),
      ('pint', 'int', 'proot.int', 'int')
    ])
    dyf2.printSchema()
    
    root
    |-- pstr: string
    |-- pint: int
    root
    |-- proot: struct
    |    |-- str: string
    |    |-- int: int
    

    RDB に格納できるような複数フレームに分割

    フレーム分割のイメージ図はこちらです。

    from pyspark.sql.functions import array
    
    # 検証用にリスト形式のフィールドを追加します。
    df = srcS3.toDF()
    df2 = df.withColumn('plist', array(df.pstr, df.pint))
    df2.show()
    
    dyf = DynamicFrame.fromDF(df2, glueContext, 'dyf')
    
    # リレーショナルな関係にある複数のフレームに分割します。
    dfc = dyf.relationalize('my_dyf_root', 's3://my-bucket-20171124/tmp')
    for dyf_name in dfc.keys():
        print dyf_name
        dfc.select(dyf_name).toDF().show()
    
    +----+----+--------+
    |pstr|pint|   plist|
    +----+----+--------+
    | aaa|   1|[aaa, 1]|
    | bbb|   2|[bbb, 2]|
    | ccc|   3|[ccc, 3]|
    | ddd|   4|[ddd, 4]|
    | eee|   5|[eee, 5]|
    +----+----+--------+
    my_dyf_root
    +----+----+-----+
    |pstr|pint|plist|
    +----+----+-----+
    | aaa|   1|    1|
    | bbb|   2|    2|
    | ccc|   3|    3|
    | ddd|   4|    4|
    | eee|   5|    5|
    +----+----+-----+
    my_dyf_root_plist
    +---+-----+---------+
    | id|index|plist.val|
    +---+-----+---------+
    |  1|    0|      aaa|
    |  1|    1|        1|
    |  2|    0|      bbb|
    |  2|    1|        2|
    |  3|    0|      ccc|
    |  3|    1|        3|
    |  4|    0|      ddd|
    |  4|    1|        4|
    |  5|    0|      eee|
    |  5|    1|        5|
    +---+-----+---------+
    

    DataFrame / データの変換

    直接 SQL を記述

    メソッドチェーンではなく直接 SQL で記述できることを知っておきます。

    df = srcS3.toDF()
    df.createOrReplaceTempView('temptable')
    sql_df = spark.sql('SELECT * FROM temptable')
    sql_df.show()
    print spark.sql('SELECT * FROM temptable LIMIT 1').first().pstr
    +----+----+
    |pstr|pint|
    +----+----+
    | aaa|   1|
    | bbb|   2|
    | ccc|   3|
    | ddd|   4|
    | eee|   5|
    +----+----+
    aaa
    

    Python オブジェクトへの変換

    SQL で解決することが難しい特殊な処理は、以下の手順で Python オブジェクトに変換することで解決できることがあります。

    タプルのリストに変換

    df = srcS3.toDF()
    tuples = df.rdd.map(lambda row: (row.pstr, row.pint)).collect()
    print tuples
    [(u'aaa', 1), (u'bbb', 2), (u'ccc', 3), (u'ddd', 4), (u'eee', 5)]
    

    処理を行った結果が例えばタプルのリストの場合、DataFrame には以下のように変換できます。

    from pyspark.sql import Row
    spark.createDataFrame(map(lambda tup: Row(pstr2=tup[0], pint2=tup[1]), tuples)).show()
    +-----+-----+
    |pint2|pstr2|
    +-----+-----+
    |    1|  aaa|
    |    2|  bbb|
    |    3|  ccc|
    |    4|  ddd|
    |    5|  eee|
    +-----+-----+
    

    Apache Spark は基本的にすべての処理をメモリ上で行います。AWS Glue のように Cluster mode で動作する Apache Spark の場合、一つの Driver と複数の Executor が登場します。PySpark スクリプトの処理は Driver が実行します。DataFrame を含む Resilient Distributed Dataset (RDD) は複数の Executor のメモリ上に分割して展開されます。DataFrame の Executors への分割数は repartition() で調整できます。RDD を処理する場合は複数の Executor がタスクを分割します。そのため RDD を print しても Driver の標準出力ではタスクの実行結果を確認できません。上記 collect() は複数の Executor に存在する RDD を Driver に集めるためのものです。そのため collect() で集めて扱うデータサイズが巨大な場合は Driver のメモリ不足に注意します。また、以下のようなエラーが発生する場合があります。

    Container killed by YARN for exceeding memory limits. xxx GB of yyy GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

    Executors における RDD 処理において、Driver 上の Python オブジェクトを Executors と都度共有するようなことはせず、Python オブジェクトの処理はすべて Driver 上で処理して完全に DataFrame (RDD) 化してから Executors に展開することで解消する可能性があります。例えば RDD の変換処理における udf() で Driver 上の Python オブジェクトを参照することは避けます。また、扱う RDD がそもそも Executor のメモリ容量に対して非常に大きい場合などは DPU を増やすことで対応できます。

    空の DataFrame を作成

    emptyRDDStructTypeStructField を利用します。

    from pyspark.sql.types import StructType
    from pyspark.sql.types import StructField
    from pyspark.sql.types import LongType
    df = spark.createDataFrame(sc.emptyRDD(), StructType([
        StructField("pint", LongType(), False)
    ]))
    

    ペア RDD の生成

    DataFrame からペア RDD とよばれる特殊な RDD を生成するためには KeyBy を利用します。

    rdd = df.rdd.keyBy(lambda row: row.pint)
    

    subtractByKey といった処理が可能になります。

    rdd2 = df2.rdd.keyBy(lambda row: row.pint)
    df3 = spark.createDataFrame(rdd.subtractByKey(rdd2).values())
    

    S3 から直接読み込み

    Crawler でカタログ Table を作成していない場合であっても、スキーマ情報が事前に分かっている場合は S3 から直接データを取得できます。JSON の場合は以下のようになります。

    df = spark.read.json('s3://my-bucket-20171124/s3.json')
    

    フィールドの追加

    後述の select で同様の処理を記述できますが withColumn によってフィールドを追加できることを知っておきます。関数を用いて動的にフィールドの値を設定する場合は以下のようになります。前述の通り udf() 内では Driver 上のオブジェクトを参照しないように注意します。

    from pyspark.sql.types import IntegerType
    def my_f(x):
        return x * 2
    df = srcS3.toDF()
    df.withColumn('pint2', udf(my_f, IntegerType())(df['pint'])).show()
    +----+----+-----+
    |pstr|pint|pint2|
    +----+----+-----+
    | aaa|   1|    2|
    | bbb|   2|    4|
    | ccc|   3|    6|
    | ddd|   4|    8|
    | eee|   5|   10|
    +----+----+-----+
    

    定数のフィールドを追加する場合は lit を利用します。

    from pyspark.sql.functions import lit
    df = srcS3.toDF()
    df.withColumn('pint2', lit(123)).show()
    +----+----+-----+
    |pstr|pint|pint2|
    +----+----+-----+
    | aaa|   1|  123|
    | bbb|   2|  123|
    | ccc|   3|  123|
    | ddd|   4|  123|
    | eee|   5|  123|
    +----+----+-----+
    

    select

    df = srcS3.toDF()
    df.select(df['pstr'], df['pstr'].substr(1,3).alias('pstr2'), (df['pint'] % 2).alias('peo')).show()
    +----+-----+---+
    |pstr|pstr2|peo|
    +----+-----+---+
    | aaa|  aaa|  1|
    | bbb|  bbb|  0|
    | ccc|  ccc|  1|
    | ddd|  ddd|  0|
    | eee|  eee|  1|
    +----+-----+---+
    

    distinct、dropDuplicates

    from pyspark.sql.functions import lit
    df = srcS3.toDF().limit(2)
    df2 = df.union(df).withColumn('pint2', lit(123))
    df2.show()
    +----+----+-----+
    |pstr|pint|pint2|
    +----+----+-----+
    | aaa|   1|  123|
    | bbb|   2|  123|
    | aaa|   1|  123|
    | bbb|   2|  123|
    +----+----+-----+
    

    すべてのフィールドに関して、重複を除去するためには distinct を利用します。

    df2.distinct().show()
    +----+----+-----+
    |pstr|pint|pint2|
    +----+----+-----+
    | bbb|   2|  123|
    | aaa|   1|  123|
    +----+----+-----+
    

    特定のフィールドに関して、重複を除去するためには dropDuplicates を利用します。

    df2.dropDuplicates(['pint2']).show()
    +----+----+-----+
    |pstr|pint|pint2|
    +----+----+-----+
    | aaa|   1|  123|
    +----+----+-----+
    

    where

    df = srcS3.toDF()
    df.where(df['pint'] > 1).show()
    +----+----+
    |pstr|pint|
    +----+----+
    | bbb|   2|
    | ccc|   3|
    | ddd|   4|
    | eee|   5|
    +----+----+
    

    groupBy/集約関数

    count

    df = srcS3.toDF()
    df.groupBy(df['pint'] % 2).count().show()
    +----------+-----+
    |(pint % 2)|count|
    +----------+-----+
    |         1|    3|
    |         0|    2|
    +----------+-----+
    df.groupBy(df['pstr'], (df['pint'] % 2).alias('peo')).count().show()
    +----+---+-----+
    |pstr|peo|count|
    +----+---+-----+
    | ccc|  1|    1|
    | eee|  1|    1|
    | ddd|  0|    1|
    | aaa|  1|    1|
    | bbb|  0|    1|
    +----+---+-----+
    

    sum

    df = srcS3.toDF()
    df.groupBy(df['pint'] % 2).sum('pint').show()
    +----------+---------+
    |(pint % 2)|sum(pint)|
    +----------+---------+
    |         1|        9|
    |         0|        6|
    +----------+---------+
    

    min/max

    df = srcS3.toDF()
    df.groupBy(df['pint'] % 2).min('pint').show()
    +----------+---------+
    |(pint % 2)|min(pint)|
    +----------+---------+
    |         1|        1|
    |         0|        2|
    +----------+---------+
    df.groupBy(df['pint'] % 2).max('pint').show()
    +----------+---------+
    |(pint % 2)|max(pint)|
    +----------+---------+
    |         1|        5|
    |         0|        4|
    +----------+---------+
    

    avg

    df = srcS3.toDF()
    df.groupBy(df['pint'] % 2).avg('pint').show()
    +----------+---------+
    |(pint % 2)|avg(pint)|
    +----------+---------+
    |         1|      3.0|
    |         0|      3.0|
    +----------+---------+
    

    orderBy

    df = srcS3.toDF()
    df.orderBy(desc('pstr'), 'pint').show()
    +----+----+
    |pstr|pint|
    +----+----+
    | eee|   5|
    | ddd|   4|
    | ccc|   3|
    | bbb|   2|
    | aaa|   1|
    +----+----+
    

    RDD 永続化について

    RDD の操作には

    • DynamicFrame の filter や DataFrame の where のように、クラスタ全体の Executors に分割して配置された RDD すべてを利用する必要のない「変換」
    • DynamicFrame の count や DataFrame の collect のように、クラスタ全体の Executors に分割して配置された RDD すべてを利用しなければならない「アクション」

    の二つが存在します。「変換」の操作結果は RDD です。「変換」は「アクション」で必要になるまで実行されません。これを遅延評価とよびます。また、同じ「変換」であっても「アクション」で必要になる度に繰り返し遅延評価されます。

    RDD 永続化による高速化

    cache() を利用すると、アクション実行のために遅延評価されて変換された結果の RDD が Executors のメモリに保存されます。

    df = src.xxx.xxx.xxx.xxx  ← 複雑な RDD 変換
    df.cache()  ← 次のアクションで永続化します
    df.count()  ← RDD 永続化はここでなされます
    

    上記複雑な RDD 変換の結果が永続化されている状態で RDD を利用したアクションを再度実行したとしても

    df.count()
    

    複雑な RDD 変換は再度実行されません。cached がない状態で上記アクションを実行すると、再度以下のような複雑な RDD 変換が遅延評価されてしまいます。HDFS からのデータ読み出しを含めると時間がかかるため、複数のアクションで必要になる RDD は永続化するようにします。

    src.xxx.xxx.xxx.xxx.count()
    

    cache()storageLevel詳細に指定できる persist() の略記法のようなものです。永続化した RDD が不要になったら unpersist() で Executors から明示的に削除するようにします。

    df.unpersist()
    

    可視化

    Apache Zeppelin を利用している場合

    df = srcS3.toDF()
    df.createOrReplaceTempView('df')
    

    として結果を保存した状態で、以下のようにしてグラフ化できます。

    %sql
    select * from df
    

    PySpark スクリプトを記述する際の参考資料

    実際に開発を行う際に都度辞書的に利用できるリファレンスには以下のようなものがあります。

    macOS の Homebrew を利用できる場合は、Apache Spark をインストールして、実行しながらリファレンスの内容を確認できます。

    brew install apache-spark
    

    Python

    pyspark
    

    Scala

    spark-shell
    
    Likeボタン(off)0
    詳細設定を開く/閉じる
    アカウント プロフィール画像

    Pythonの初心者向け記事を多数執筆。データ解析にも挑戦中です。

    記事の執筆者にステッカーを贈る

    有益な情報に対するお礼として、またはコメント欄における質問への返答に対するお礼として、 記事の読者は、執筆者に有料のステッカーを贈ることができます。

    >>さらに詳しくステッカーを贈る
    ステッカーを贈る コンセプト画像

    Feedbacks

    Feedbacks コンセプト画像

      ログインするとコメントを投稿できます。

      ログインする

      関連記事

      • AWS EC2 インスタンスの選定方法
        準仮想化と完全仮想化 AWS のインスタンスタイプが準仮想化と完全仮想化のどちらの仮想化技術を採用したハードウェアであるかによって、使用できる AMI (OS) が異なるのは以下の理由によります。 準仮想化 ParaVirtualization (PV) において OS は自分が仮想化用のハードウェア上で動作していることを知っています。つまり仮想化用にカスタマイズされた専用の OS が必要になりま...
        yuki_coderyuki_coder8/30/2017に更新
        いいねアイコン画像0
      • OpenVPN で二つの VPC をつなぐための設定
        インターネット VPN (Virtual Private Network) には二拠点間の通信を暗号化する方式によって IPsec-VPN や SSL-VPN などがあります。OpenVPN は SSL-VPN の実装のひとつです。AWS VPC を二つ用意してそれらを OpenVPN で接続してみます。 VPC の構成 myvpc-1 (10.1.0.0/16) mysubnet-1-1 (10...
        takuyatakuya8/11/2017に更新
        いいねアイコン画像0
      • Windows Server EC2 インスタンスへの CAL インストール
        サムネイル画像-8bae43ea3b
        Windows サーバを AWS EC2 インスタンスとして起動した後は、RDP 接続が必要となります。RDP 接続を行なうためには Remote Desktop Services (RDS) ライセンスが必要となります。 Windows Server のライセンスは、管理用途の RDS 接続を許可しています。その際に RDS ライセンスは不要です。ただし、[同時接続数が 2 という制限があります...
        けんちゃんけんちゃん11/18/2023に更新
        いいねアイコン画像0
      • AWS Lambda の基本的な使い方
        サムネイル画像-9285163f6b
        AWS Lambda はイベントドリブンな「関数」を登録できるサービスです。例えば S3 に画像がアップロードされたときにサムネイル用のサイズに加工する処理が記述された関数を登録できます。基本的な使い方をまとめます。 事前準備 関数の登録はブラウザで AWS コンソールにログインして行うこともできますが、本ページでは AWS C
        yuki_coderyuki_coder1/18/2020に更新
        いいねアイコン画像0
      • AWS 落穂拾い (Data Engineering)
        Kinesis Kinesis Streams データは 3 AZ にレプリケーションされます。Amazon Kinesis Data Streams FAQs 24 時間 (既定値) から 1 年までデータ保持できます。[Changing the Data Retention Period](https://docs.aws.amazon
        yuki_coderyuki_coder12/22/2024に更新
        いいねアイコン画像0