Tags: aws lambda function dynamodb s3
DynamoDB is a great NoSQL service from AWS. It is often necessary to export data from a DynamoDB table.
First, let us review our use case. Our Lambda function will read all items from a DynamoDB table and export them as JSON to S3.
![]() |
import json | |
import traceback | |
import boto3 | |
def lambda_handler(event, context):
    """
    Export Dynamodb to s3 (JSON)

    The event must contain 'tableName', 's3_bucket', 's3_object' (used as
    a key prefix) and 'filename'. All items of the table are scanned and
    written as one JSON document to s3_bucket under s3_object + filename.

    Returns a dict with 'statusCode' (200 on success, 400 on any error)
    and 'status' ('Success' or the formatted traceback).
    """
    statusCode = 200
    statusMessage = 'Success'
    try:
        # parse the payload
        tableName = event['tableName']
        s3_bucket = event['s3_bucket']
        s3_object = event['s3_object']
        filename = event['filename']

        # scan the dynamodb
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table(tableName)
        response = table.scan()
        data = response['Items']

        # maximum data set limit is 1MB per scan call, so keep following
        # LastEvaluatedKey until it is absent. scan() lives on the Table
        # object, not on the service resource.
        while 'LastEvaluatedKey' in response:
            response = table.scan(
                ExclusiveStartKey=response['LastEvaluatedKey'])
            data.extend(response['Items'])

        # export JSON to s3 bucket (first argument is the bucket name)
        s3 = boto3.resource('s3')
        s3.Object(s3_bucket, s3_object + filename).put(Body=json.dumps(data))
    except Exception:
        # report any failure to the caller instead of raising
        statusCode = 400
        statusMessage = traceback.format_exc()
    return {
        "statusCode": statusCode,
        "status": statusMessage
    }
We can create a payload to test our Lambda function:
{
"tableName": "DynamoDB_Table_name",
"s3_bucket": "s3_bucket_name",
"s3_object": "s3_object_name",
"filename": "output.json"
}
However, sometimes we might encounter an error for certain values in DynamoDB, because boto3 returns DynamoDB numbers as Python Decimal objects:
TypeError: Object of type Decimal is not JSON serializable.
We can add a custom JSONEncoder class to our Lambda function to handle them.
import json | |
import traceback | |
import boto3 | |
from collections.abc import Mapping, Iterable | |
from decimal import Decimal | |
class DecimalEncoder(json.JSONEncoder):
    """
    JSON encoder that writes Decimal values as plain JSON numbers.

    encode() is overridden (rather than default()) so the Decimal's exact
    digits are emitted as a numeric literal. Mappings are written as
    objects and every other non-string iterable as an array; containers
    are joined by hand with ', ' and ': ' separators.
    """

    def encode(self, obj):
        """Return the JSON text for obj, recursing into containers."""
        if isinstance(obj, Mapping):
            pairs = (
                f'{self.encode(k)}: {self.encode(v)}'
                for (k, v) in obj.items()
            )
            return '{' + ', '.join(pairs) + '}'
        if isinstance(obj, Iterable) and not isinstance(obj, str):
            return '[' + ', '.join(map(self.encode, obj)) + ']'
        if isinstance(obj, Decimal):
            # normalize() gets rid of trailing 0s, ':f' prevents
            # scientific notation
            return f'{obj.normalize():f}'
        # plain scalars (str, int, float, bool, None) use the stock encoder
        return super().encode(obj)
def lambda_handler(event, context):
    """
    Export Dynamodb to s3 (JSON)

    The event must contain 'tableName', 's3_bucket', 's3_object' (used as
    a key prefix) and 'filename'. All items of the table are scanned,
    serialized with DecimalEncoder and written as one JSON document to
    s3_bucket under s3_object + filename.

    Returns a dict with 'statusCode' (200 on success, 400 on any error)
    and 'status' ('Success' or the formatted traceback).
    """
    statusCode = 200
    statusMessage = 'Success'
    try:
        # parse the payload
        tableName = event['tableName']
        s3_bucket = event['s3_bucket']
        s3_object = event['s3_object']
        filename = event['filename']

        # scan the dynamodb
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table(tableName)
        response = table.scan()
        data = response['Items']

        # maximum data set limit is 1MB per scan call, so keep following
        # LastEvaluatedKey until it is absent. scan() lives on the Table
        # object, not on the service resource.
        while 'LastEvaluatedKey' in response:
            response = table.scan(
                ExclusiveStartKey=response['LastEvaluatedKey'])
            data.extend(response['Items'])

        # export JSON to s3 bucket; the Decimal-aware body must be the one
        # uploaded, otherwise the Decimal TypeError comes straight back
        s3 = boto3.resource('s3')
        body = json.dumps(data, cls=DecimalEncoder)
        s3.Object(s3_bucket, s3_object + filename).put(Body=body)
    except Exception:
        # report any failure to the caller instead of raising
        statusCode = 400
        statusMessage = traceback.format_exc()
    return {
        "statusCode": statusCode,
        "status": statusMessage
    }
Another way to export data is to use the boto3 client, which is a low-level interface to AWS services.
import json | |
import traceback | |
import boto3 | |
def get_data(table_name, client):
    """
    Get data from DynamoDB

    Scans table_name with the given client, following LastEvaluatedKey
    from page to page, and returns the 'Items' of all pages as one list.
    """
    items = []
    scan_kwargs = {'TableName': table_name}
    while True:
        page = client.scan(**scan_kwargs)
        items.extend(page['Items'])
        next_key = page.get('LastEvaluatedKey')
        if not next_key:
            # no continuation key: this was the final page
            return items
        # resume the next scan where this page stopped
        scan_kwargs['ExclusiveStartKey'] = next_key
def lambda_handler(event, context):
    """
    Export Dynamodb to s3 (JSON)

    The event must contain 'tableName', 's3_bucket', 's3_object' (used as
    a key prefix) and 'filename'. Items are read through the low-level
    boto3 client via get_data() and written as one JSON document to
    s3_bucket under s3_object + filename.

    Returns a dict with 'statusCode' (200 on success, 400 on any error)
    and 'status' ('Success' or the formatted traceback).
    """
    statusCode = 200
    statusMessage = 'Success'
    try:
        # parse the payload
        tableName = event['tableName']
        s3_bucket = event['s3_bucket']
        s3_object = event['s3_object']
        filename = event['filename']

        # scan the dynamodb; only the low-level client is needed here
        client = boto3.client('dynamodb')
        data = get_data(tableName, client)

        # export JSON to s3 bucket
        s3 = boto3.resource('s3')
        s3.Object(s3_bucket, s3_object + filename).put(Body=json.dumps(data))
    except Exception:
        # report any failure to the caller instead of raising
        statusCode = 400
        statusMessage = traceback.format_exc()
    return {
        "statusCode": statusCode,
        "status": statusMessage
    }
However, the boto3 client generates DynamoDB-typed JSON. Here is a simple Python script that converts it back to normalized JSON using the dynamodb_json
library.
import time | |
import uuid | |
from datetime import datetime | |
from decimal import Decimal | |
from dynamodb_json import json_util as json2 | |
import json | |
import sys | |
# Convert a DynamoDB-JSON export into plain JSON.
# Usage: python <this script> <input file> <output file>
filename = sys.argv[1]
output = sys.argv[2]

# read the exported DynamoDB JSON
with open(filename) as f:
    data = json.load(f)

# json_util provides loads()/dumps() (there is no load()); loads() is
# expected to accept the already-parsed object as well as a JSON string
# -- NOTE(review): confirm against the installed dynamodb_json version
data_new = json2.loads(data)

# write the normalized JSON
with open(output, 'w') as outfile:
    json.dump(data_new, outfile)