Kinesis Stream はデータ集計などに便利な機能を有するキューのようなものを提供します。SQS と似ていますが、リアルタイム性の有無など用途が異なります。本ページでは、こちらのページで使い方を把握した AWS Lambda が Kinesis Stream からデータを定期的に取得して処理するための設定方法をまとめます。
以下のようなファイルを作ります。Kinesis Stream に入力された文字列を CloudWatch にログ出力しています。
ProcessKinesisRecords.js
console.log('Loading function');
exports.handler = function(event, context, callback) {
event.Records.forEach(function(record) {
// base64 エンコーディングされたデータをデコードします。
// https://nodejs.org/dist/latest-v6.x/docs/api/buffer.html#buffer_class_method_buffer_from_string_encoding
var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
callback(null, "message");
};
依存ライブラリはないため js ファイルのみを zip 化します。
zip -r ProcessKinesisRecords.zip ProcessKinesisRecords.js
AWS コンソールを利用して、Lambda 関数が利用する lambda-kinesis-execution-role
ロールを作成します。ロールには Kinesis Stream からデータを取得するための AWSLambdaKinesisExecutionRole
権限を付与します。
Lambda 用の設定がなされたロールを選択
Kinesis 権限を付与
ロール名を設定
以下のコマンドでアップロードします。lambda-test-user-20170929
はこちらのページの事前準備で登録した、Lambda 関数全般の権限が付与された IAM です。
aws lambda create-function \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessKinesisRecords \
--zip-file fileb://ProcessKinesisRecords.zip \
--role arn:aws:iam::123412341234:role/lambda-kinesis-execution-role \
--handler ProcessKinesisRecords.handler \
--runtime nodejs6.10
CLI から aws lambda invoke
コマンドでテストすることもできますが、ここでは簡単のためコンソールからテスト実行します。様々なテンプレートが用意されています。ここでは Kinesis Stream からのデータを模擬した Kinesis
テンプレートを選択します。
AmazonKinesisFullAccess
権限を lambda-test-user-20170929
に付与してから、以下のコマンドで Kinesis Stream を作成します。
aws kinesis create-stream \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--stream-name examplestream \
--shard-count 1
正常に作成されたことを確認します。
aws kinesis describe-stream \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--stream-name examplestream
S3 が Lambda を実行する場合と異なり、Lambda が Kinesis Stream から能動的にデータを取得します。そのための設定 Event Source Mapping を以下の create-event-source-mapping コマンドで行います。
aws lambda create-event-source-mapping \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:ap-northeast-1:123412341234:stream/examplestream \
--batch-size 100 \
--starting-position TRIM_HORIZON
一度に最大 100 個のデータを取得するように --batch-size
で設定しています。また --starting-position
では Kinesis Stream のどの位置のイベントからデータ取得を開始するかを指定しています。TRIM_HORIZON
はキューの最も古いイベントからデータ取得を開始するための設定です。その他に以下のようなものがあります。
LATEST
最新のイベントからデータ取得を開始AT_SEQUENCE_NUMBER
指定した位置 StartingSequenceNumber
のイベントからデータ取得を開始AFTER_SEQUENCE_NUMBER
指定した位置 StartingSequenceNumber
の「次の」イベントからデータ取得を開始AT_TIMESTAMP
指定した時刻 Timestamp
のタイムスタンプを有するイベントからデータ取得を開始正しく設定されたことは list-event-source-mappings コマンドで確認できます。
aws lambda list-event-source-mappings \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:ap-northeast-1:123412341234:stream/examplestream
実際には何らかの集計対象ログ等を監視するクライアントアプリケーションが行うことですが、ここでは検証のため put-record コマンドを用いてデータを投入してみます。CloudWatch ログに --data
で指定した文字列が出力されていることが確認できれば成功です。
aws kinesis put-record \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--stream-name examplestream \
--data "TEST TEST TEST" \
--partition-key shardId-000000000000