モーダルを閉じる工作HardwareHub ロゴ画像

工作HardwareHubは、ロボット工作や電子工作に関する情報やモノが行き交うコミュニティサイトです。さらに詳しく

利用規約プライバシーポリシー に同意したうえでログインしてください。

目次目次を開く/閉じる

Kinesis Stream から AWS Lambda でデータを取得して処理

モーダルを閉じる

ステッカーを選択してください

お支払い手続きへ
モーダルを閉じる

お支払い内容をご確認ください

購入商品
」ステッカーの表示権
メッセージ
料金
(税込)
決済方法
GooglePayマーク
決済プラットフォーム
確認事項

利用規約をご確認のうえお支払いください

※カード情報はGoogleアカウント内に保存されます。本サイトやStripeには保存されません

※記事の執筆者は購入者のユーザー名を知ることができます

※購入後のキャンセルはできません

作成日作成日
2017/10/07
最終更新最終更新
2020/01/11
記事区分記事区分
一般公開

目次

    フロントからバックエンドまで、幅広い技術をカバーしています。

    Kinesis Stream はデータ集計などに便利な機能を有するキューのようなものを提供します。SQS と似ていますが、リアルタイム性の有無など用途が異なります。本ページでは、 AWS Lambda が Kinesis Stream からデータを定期的に取得して処理するための設定方法をまとめます。

    zip パッケージの作成

    以下のようなファイルを作ります。Kinesis Stream に入力された文字列を CloudWatch にログ出力しています。

    ProcessKinesisRecords.js

    console.log('Loading function');
    
    exports.handler = function(event, context, callback) {
      event.Records.forEach(function(record) {
        // base64 エンコーディングされたデータをデコードします。
        // https://nodejs.org/dist/latest-v6.x/docs/api/buffer.html#buffer_class_method_buffer_from_string_encoding
        var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
      });
      callback(null, "message");
    };
    

    依存ライブラリはないため js ファイルのみを zip 化します。

    zip -r ProcessKinesisRecords.zip ProcessKinesisRecords.js
    

    IAM ロールの作成

    AWS コンソールを利用して、Lambda 関数が利用する lambda-kinesis-execution-role ロールを作成します。ロールには Kinesis Stream からデータを取得するための AWSLambdaKinesisExecutionRole 権限を付与します。

    Lambda 用の設定がなされたロールを選択

    Kinesis 権限を付与

    ロール名を設定

    Lambda 関数の登録

    以下のコマンドでアップロードします。lambda-test-user-20170929 はLambda 関数全般の権限が付与された IAM です。

    aws lambda create-function \
    --profile lambda-test-user-20170929 \
    --region ap-northeast-1 \
    --function-name ProcessKinesisRecords \
    --zip-file fileb://ProcessKinesisRecords.zip \
    --role arn:aws:iam::123412341234:role/lambda-kinesis-execution-role \
    --handler ProcessKinesisRecords.handler \
    --runtime nodejs6.10
    

    Lambda 関数のテスト実行

    CLI から aws lambda invoke コマンドでテストすることもできますが、ここでは簡単のためコンソールからテスト実行します。様々なテンプレートが用意されています。ここでは Kinesis Stream からのデータを模擬した Kinesis テンプレートを選択します。

    Kinesis Stream の作成

    AmazonKinesisFullAccess 権限を lambda-test-user-20170929 に付与してから、以下のコマンドで Kinesis Stream を作成します。

    aws kinesis create-stream \
    --profile lambda-test-user-20170929 \
    --region ap-northeast-1 \
    --stream-name examplestream \
    --shard-count 1
    

    正常に作成されたことを確認します。

    aws kinesis describe-stream \
    --profile lambda-test-user-20170929 \
    --region ap-northeast-1 \
    --stream-name examplestream
    

    Kinesis Stream からデータを取得するための Lambda 設定

    S3 が Lambda を実行する場合と異なり、Lambda が Kinesis Stream から能動的にデータを取得します。そのための設定 Event Source Mapping を以下の create-event-source-mapping コマンドで行います。

    aws lambda create-event-source-mapping \
    --profile lambda-test-user-20170929 \
    --region ap-northeast-1 \
    --function-name ProcessKinesisRecords \
    --event-source arn:aws:kinesis:ap-northeast-1:123412341234:stream/examplestream \
    --batch-size 100 \
    --starting-position TRIM_HORIZON
    

    一度に最大 100 個のデータを取得するように --batch-size で設定しています。また --starting-position では Kinesis Stream のどの位置のイベントからデータ取得を開始するかを指定しています。TRIM_HORIZON はキューの最も古いイベントからデータ取得を開始するための設定です。その他に以下のようなものがあります。

    • LATEST 最新のイベントからデータ取得を開始
    • AT_SEQUENCE_NUMBER 指定した位置 StartingSequenceNumber のイベントからデータ取得を開始
    • AFTER_SEQUENCE_NUMBER 指定した位置 StartingSequenceNumber の「次の」イベントからデータ取得を開始
    • AT_TIMESTAMP 指定した時刻 Timestamp のタイムスタンプを有するイベントからデータ取得を開始

    正しく設定されたことは list-event-source-mappings コマンドで確認できます。

    aws lambda list-event-source-mappings \
    --profile lambda-test-user-20170929 \
    --region ap-northeast-1 \
    --function-name ProcessKinesisRecords \
    --event-source arn:aws:kinesis:ap-northeast-1:123412341234:stream/examplestream
    

    Kinesis Stream にデータを投入

    実際には何らかの集計対象ログ等を監視するクライアントアプリケーションが行うことですが、ここでは検証のため put-record コマンドを用いてデータを投入してみます。CloudWatch ログに --data で指定した文字列が出力されていることが確認できれば成功です。

    aws kinesis put-record \
    --profile lambda-test-user-20170929 \
    --region ap-northeast-1 \
    --stream-name examplestream \
    --data "TEST TEST TEST" \
    --partition-key shardId-000000000000
    
    Likeボタン(off)0
    詳細設定を開く/閉じる
    アカウント プロフィール画像

    フロントからバックエンドまで、幅広い技術をカバーしています。

    記事の執筆者にステッカーを贈る

    有益な情報に対するお礼として、またはコメント欄における質問への返答に対するお礼として、 記事の読者は、執筆者に有料のステッカーを贈ることができます。

    >>さらに詳しくステッカーを贈る
    ステッカーを贈る コンセプト画像

    Feedbacks

    Feedbacks コンセプト画像

      ログインするとコメントを投稿できます。

      ログインする

      関連記事

      • AWS EC2 インスタンスの選定方法
        準仮想化と完全仮想化 AWS のインスタンスタイプが準仮想化と完全仮想化のどちらの仮想化技術を採用したハードウェアであるかによって、使用できる AMI (OS) が異なるのは以下の理由によります。 準仮想化 ParaVirtualization (PV) において OS は自分が仮想化用のハードウェア上で動作していることを知っています。つまり仮想化用にカスタマイズされた専用の OS が必要になりま...
        yuki_coderyuki_coder8/30/2017に更新
        いいねアイコン画像0
      • OpenVPN で二つの VPC をつなぐための設定
        インターネット VPN (Virtual Private Network) には二拠点間の通信を暗号化する方式によって IPsec-VPN や SSL-VPN などがあります。OpenVPN は SSL-VPN の実装のひとつです。AWS VPC を二つ用意してそれらを OpenVPN で接続してみます。 VPC の構成 myvpc-1 (10.1.0.0/16) mysubnet-1-1 (10...
        takuyatakuya8/11/2017に更新
        いいねアイコン画像0
      • Windows Server EC2 インスタンスへの CAL インストール
        サムネイル画像-8bae43ea3b
        Windows サーバを AWS EC2 インスタンスとして起動した後は、RDP 接続が必要となります。RDP 接続を行なうためには Remote Desktop Services (RDS) ライセンスが必要となります。 Windows Server のライセンスは、管理用途の RDS 接続を許可しています。その際に RDS ライセンスは不要です。ただし、[同時接続数が 2 という制限があります...
        けんちゃんけんちゃん11/18/2023に更新
        いいねアイコン画像0
      • AWS Lambda の基本的な使い方
        サムネイル画像-9285163f6b
        AWS Lambda はイベントドリブンな「関数」を登録できるサービスです。例えば S3 に画像がアップロードされたときにサムネイル用のサイズに加工する処理が記述された関数を登録できます。基本的な使い方をまとめます。 事前準備 関数の登録はブラウザで AWS コンソールにログインして行うこともできますが、本ページでは AWS C
        yuki_coderyuki_coder1/18/2020に更新
        いいねアイコン画像0
      • AWS 落穂拾い (Data Engineering)
        Kinesis Kinesis Streams データは 3 AZ にレプリケーションされます。Amazon Kinesis Data Streams FAQs 24 時間 (既定値) から 1 年までデータ保持できます。[Changing the Data Retention Period](https://docs.aws.amazon
        yuki_coderyuki_coder12/22/2024に更新
        いいねアイコン画像0