DynamoDB は MongoDB/Cassandra/Couchbase といった NoSQL データベースの一つです。DynamoDB のオプション機能としてテーブルのストリームを有効にできます。こちらのページの Kinesis Stream と同様に、ストリームが有効なテーブルを更新すると、その更新内容を Lambda 関数で取得して何らかの処理を行うことができます。簡単な使用例をまとめます。
Lambda 関数が必要とする基本的な権限を付与します。
DynamoDB ストリームからデータを取得する必要があるため AWSLambdaDynamoDBExecutionRole
を追加で付与します。
ロール名を設定して作成します。
こちらのページの事前準備と同様の手順で AWS CLI コマンドを利用できるようにしておきます。以下のコマンドで、更新内容を CloudWatch ログ出力するだけの Lambda 関数を登録します。
ProcessDynamoDBStream.js
console.log('Loading function');
exports.handler = function(event, context, callback) {
// https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Global_Objects/JSON/stringify
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(function(record) {
console.log(record.eventID);
console.log(record.eventName);
// %j はオブジェクト用の出力フォーマット
console.log('DynamoDB Record: %j', record.dynamodb);
});
callback(null, "message");
};
zip 化します。
zip -r ProcessDynamoDBStream.zip ProcessDynamoDBStream.js
aws lambda create-function \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessDynamoDBStream \
--zip-file fileb://ProcessDynamoDBStream.zip \
--role arn:aws:iam::123412341234:role/lambda-dynamodb-execution-role \
--handler ProcessDynamoDBStream.handler \
--runtime nodejs6.10
aws lambda invoke \
--profile lambda-test-user-20170929 \
--invocation-type RequestResponse \
--function-name ProcessDynamoDBStream \
--region ap-northeast-1 \
--payload file://input.txt \
output.txt
{
"Records":[
{
"eventID":"1",
"eventName":"INSERT",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"111",
"SizeBytes":26,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"2",
"eventName":"MODIFY",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"222",
"SizeBytes":59,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"3",
"eventName":"REMOVE",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"333",
"SizeBytes":38,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
}
]
}
AWS コンソールからテーブルを作成してストリーム機能を有効化します。その後、Lambda 関数のトリガーとして DynamoDB ストリームを ARN 指定で追加します。
aws lambda create-event-source-mapping \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessDynamoDBStream \
--event-source arn:aws:dynamodb:ap-northeast-1:123412341234:table/mytable20171010/stream/2017-10-10T13:59:23.177 \
--batch-size 100 \
--starting-position TRIM_HORIZON
正しく設定されたことは list-event-source-mappings コマンドで確認できます。
aws lambda list-event-source-mappings \
--profile lambda-test-user-20170929 \
--region ap-northeast-1 \
--function-name ProcessDynamoDBStream \
--event-source arn:aws:dynamodb:ap-northeast-1:123412341234:table/mytable20171010/stream/2017-10-10T13:59:23.177
コンソールから Table に Item を追加すると Lambda で処理されて CloudWatch にログが出力されるようになります。