Kinesis Stream はデータ集計などに便利な機能を有するキューのようなものを提供します。SQS と似ていますが、リアルタイム性の有無など用途が異なります。本ページでは、 AWS Lambda が Kinesis Stream からデータを定期的に取得して処理するための設定方法をまとめます。
zip パッケージの作成
以下のようなファイルを作ります。Kinesis Stream に入力された文字列を CloudWatch にログ出力しています。
ProcessKinesisRecords.js
console.log('Loading function');
exports.handler = function(event, context, callback) {
event.Records.forEach(function(record) {
// base64 エンコーディングされたデータをデコードします。
// https://nodejs.org/dist/latest-v6.x/docs/api/buffer.html#buffer_class_method_buffer_from_string_encoding
var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
callback(null, "message");
};
依存ライブラリはないため js ファイルのみを zip 化します。
zip -r ProcessKinesisRecords.zip ProcessKinesisRecords.js
IAM ロールの作成
AWS コンソールを利用して、Lambda 関数が利用する lambda-kinesis-execution-role
ロールを作成します。ロールには Kinesis Stream からデータを取得するための AWSLambdaKinesisExecutionRole
権限を付与します。
Lambda 用の設定がなされたロールを選択
Kinesis 権限を付与
ロール名を設定
Lambda 関数の登録
以下のコマンドでアップロードします。lambda-test-user-20170929
はLambda 関数全般の権限が付与された IAM です。
aws lambda create-function \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessKinesisRecords \
--zip-file fileb://ProcessKinesisRecords.zip \
--role arn:aws:iam::123412341234:role/lambda-kinesis-execution-role \
--handler ProcessKinesisRecords.handler \
--runtime nodejs6.10
Lambda 関数のテスト実行
CLI から aws lambda invoke
コマンドでテストすることもできますが、ここでは簡単のためコンソールからテスト実行します。様々なテンプレートが用意されています。ここでは Kinesis Stream からのデータを模擬した Kinesis
テンプレートを選択します。
Kinesis Stream の作成
AmazonKinesisFullAccess
権限を lambda-test-user-20170929
に付与してから、以下のコマンドで Kinesis Stream を作成します。
aws kinesis create-stream \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--stream-name examplestream \
--shard-count 1
正常に作成されたことを確認します。
aws kinesis describe-stream \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--stream-name examplestream
Kinesis Stream からデータを取得するための Lambda 設定
S3 が Lambda を実行する場合と異なり、Lambda が Kinesis Stream から能動的にデータを取得します。そのための設定 Event Source Mapping を以下の create-event-source-mapping コマンドで行います。
aws lambda create-event-source-mapping \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:ap-northeast-1:123412341234:stream/examplestream \
--batch-size 100 \
--starting-position TRIM_HORIZON
一度に最大 100 個のデータを取得するように --batch-size
で設定しています。また --starting-position
では Kinesis Stream のどの位置のイベントからデータ取得を開始するかを指定しています。TRIM_HORIZON
はキューの最も古いイベントからデータ取得を開始するための設定です。その他に以下のようなものがあります。
LATEST
最新のイベントからデータ取得を開始AT_SEQUENCE_NUMBER
指定した位置StartingSequenceNumber
のイベントからデータ取得を開始AFTER_SEQUENCE_NUMBER
指定した位置StartingSequenceNumber
の「次の」イベントからデータ取得を開始AT_TIMESTAMP
指定した時刻Timestamp
のタイムスタンプを有するイベントからデータ取得を開始
正しく設定されたことは list-event-source-mappings コマンドで確認できます。
aws lambda list-event-source-mappings \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:ap-northeast-1:123412341234:stream/examplestream
Kinesis Stream にデータを投入
実際には何らかの集計対象ログ等を監視するクライアントアプリケーションが行うことですが、ここでは検証のため put-record コマンドを用いてデータを投入してみます。CloudWatch ログに --data
で指定した文字列が出力されていることが確認できれば成功です。
aws kinesis put-record \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--stream-name examplestream \
--data "TEST TEST TEST" \
--partition-key shardId-000000000000
関連記事
- AWS EC2 インスタンスの選定方法準仮想化と完全仮想化 AWS のインスタンスタイプが準仮想化と完全仮想化のどちらの仮想化技術を採用したハードウェアであるかによって、使用できる AMI (OS) が異なるのは以下の理由によります。 準仮想化 ParaVirtualization (PV) において OS は自分が仮想化用のハードウェア上で動作していることを知っています。つまり仮想化用にカスタマイズされた専用の OS が必要になりま...
- OpenVPN で二つの VPC をつなぐための設定インターネット VPN (Virtual Private Network) には二拠点間の通信を暗号化する方式によって IPsec-VPN や SSL-VPN などがあります。OpenVPN は SSL-VPN の実装のひとつです。AWS VPC を二つ用意してそれらを OpenVPN で接続してみます。 VPC の構成 myvpc-1 (10.1.0.0/16) mysubnet-1-1 (10...
- Windows Server EC2 インスタンスへの CAL インストールWindows サーバを AWS EC2 インスタンスとして起動した後は、RDP 接続が必要となります。RDP 接続を行なうためには Remote Desktop Services (RDS) ライセンスが必要となります。 Windows Server のライセンスは、管理用途の RDS 接続を許可しています。その際に RDS ライセンスは不要です。ただし、[同時接続数が 2 という制限があります...
- AWS Lambda の基本的な使い方AWS Lambda はイベントドリブンな「関数」を登録できるサービスです。例えば S3 に画像がアップロードされたときにサムネイル用のサイズに加工する処理が記述された関数を登録できます。基本的な使い方をまとめます。 事前準備 関数の登録はブラウザで AWS コンソールにログインして行うこともできますが、本ページでは AWS C
- AWS 落穂拾い (Data Engineering)Kinesis Kinesis Streams データは 3 AZ にレプリケーションされます。Amazon Kinesis Data Streams FAQs 24 時間 (既定値) から 1 年までデータ保持できます。[Changing the Data Retention Period](https://docs.aws.amazon