DynamoDBからの全データ取得におけるプラクティス

弊社は、サーバーレスアーキテクチャの開発を得意としており、Amazon DynamoDBやAWS Lambda等のサービスを活用した効率的でスケーラブルなシステム設計を提供しています。
その中で、アプリケーション開発において「画面に全データを一覧表示する」という仕様はよく見られるものの、NoSQLデータベースでは適切なアプローチを選ばないとパフォーマンスやコストの課題が発生します。
本記事では、DynamoDBから全データを取得する方法と、その実験結果をもとにした設計上の注意点について解説します。
全データ取得の課題
DynamoDBはスケーラビリティに優れたデータベースですが、設計次第で処理性能が大きく変化します。
特に以下の点に注意が必要です:
- Scanのリスク: Scanはテーブル全体をフルスキャンするため、膨大なデータ量の場合に時間がかかり、コストも増加します。
- Queryの可能性: 適切に設計されたパーティションキーとソートキーを利用することで、特定のデータセットを効率的に取得できます。
実験概要
DynamoDBで3万件、10万件のデータを用意し、ScanとQueryの処理時間を比較しました。以下の2つのアプローチを検証しています。
- 全データを単一パーティションにまとめる
- データを複数パーティションに分散させる
データはAWS公式サンプルリポジトリ aws-samples/csv-to-dynamodb のtestfile.csvを使用。
実験のパターンごとに少しデータを修正し、投入します。
実験1: 単一パーティションにまとめる
手法
- パーティションキー: 固定値(例:
DATA)を設定するGSIを作成。 - 目的: 単一パーティションにデータを格納し、Queryで効率的に全データを取得する。
データ構造例:
| DataType (PK) | uuid | Country | ItemType |
|---|---|---|---|
| DATA | 535113847 | Azerbaijan | Snacks |
| DATA | 874708545 | Panama | Cosmetics |
Queryのソースコード(AWS Lambda用)
import boto3
from boto3.dynamodb.conditions import Key
def lambda_handler(event, context):
table_name = 'test_datas'
region = 'us-east-1'
gsi_name = 'DataType-index'
# DynamoDBクライアントの初期化
dynamodb = boto3.resource('dynamodb', region_name=region)
table = dynamodb.Table(table_name)
# クエリ条件 DataType が DATA
key_condition_expression = Key('DataType').eq('DATA')
try:
options = {
'IndexName': gsi_name,
'KeyConditionExpression': key_condition_expression,
'ReturnConsumedCapacity': 'TOTAL'
}
while True:
response = table.query(**options)
next_token = response.get('LastEvaluatedKey', None)
print(next_token)
if next_token:
options['ExclusiveStartKey'] = next_token
else:
break
return {
'statusCode': 200
}
except Exception as e:
print('エラー:', e)
return {
'statusCode': 500,
'body': 'サーバーエラーが発生しました'
}
Scanのソースコード(AWS Lambda用)
import boto3
def lambda_handler(event, context):
table_name = 'test_datas_'
region = 'us-east-1'
# DynamoDBクライアントの初期化
dynamodb = boto3.resource('dynamodb', region_name=region)
table = dynamodb.Table(table_name)
try:
options = {
'ReturnConsumedCapacity': 'TOTAL'
}
while True:
response = table.scan(**options)
next_token = response.get('LastEvaluatedKey', None)
print(next_token)
if next_token:
options['ExclusiveStartKey'] = next_token
else:
break
return {
'statusCode': 200
}
except Exception as e:
print('エラー:', e)
return {
'statusCode': 500,
'body': 'サーバーエラーが発生しました'
}
結果(Lambdaのウォームスタート)
| データ量 | メモリ (MB) | Query (秒) | Scan (秒) |
|---|---|---|---|
| 3万件 | 512 | 15 | 15 |
| 2048 | 4 | 5 | |
| 10万件 | 512 | 50 | 50 |
| 2048 | 13 | 15 |
考察
- 全データ取得という観点ではScanもQueryもあまり処理時間の違いはないことがわかります。
- データ量が増えると取得時間が数十秒かかるため、ユーザー体験の悪化が懸念されます。
- メモリを増やすことで処理時間が短縮されますが、それでも数万件以上では非同期処理や別の工夫が必要です。
実験2: データを複数パーティションに分散する
手法
- パーティションキー: シャード番号やランダム識別子を付加。
- 目的: データを複数のパーティションに分散し、並列処理で取得速度を向上させる。
データ構造例:
| ItemCategory (PK) | uuid | DataType | Country | ItemType |
|---|---|---|---|---|
| ItemCategory#11 | 535113847 | DATA | Azerbaijan | Snacks |
| ItemCategory#5 | 874708545 | DATA | Panama | Cosmetics |
並列でクエリを実行するソースコード(AWS Lambda用)
import boto3
import os
from botocore.config import Config
from concurrent.futures import ThreadPoolExecutor, as_completed
# DynamoDB テーブル名
TABLE_NAME = "test_datas"
SHARD_COUNT = 12 # シャードの数
custom_config = Config(
max_pool_connections=12
)
# DynamoDB クライアントの作成
dynamodb = boto3.client('dynamodb', config=custom_config)
# 最大ワーカー数
max_workers = min(32, os.cpu_count() * 5)
def query_shard(partition_key):
"""指定されたシャードをクエリする関数"""
try:
items = []
options = {
'TableName': TABLE_NAME,
'IndexName': 'ItemCategory-index',
'KeyConditionExpression': "ItemCategory = :pk",
'ExpressionAttributeValues': {":pk": {"S": partition_key}}
}
while True:
response = dynamodb.query(**options)
items.extend(response.get("Items", []))
next_token = response.get('LastEvaluatedKey', None)
if next_token:
options['ExclusiveStartKey'] = next_token
else:
break
print(f"{partition_key}: {len(items)}件")
return items
except Exception as e:
print(f"Error querying {partition_key}: {e}")
return []
def query_all_shards():
"""すべてのシャードを並列でクエリする"""
print(f'max_workers: {max_workers}')
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# シャードごとにクエリを実行
futures = {
executor.submit(query_shard, f"ItemCategory#{shard_number}"): shard_number
for shard_number in range(1, SHARD_COUNT + 1)
}
# 結果を集約
results = []
for future in as_completed(futures):
try:
shard_result = future.result()
results.extend(shard_result)
except Exception as e:
print(f"Error in shard {futures[future]}: {e}")
return results
def lambda_handler(event, context):
"""Lambda 関数のエントリポイント"""
results = query_all_shards()
print(f'{len(results)}件')
return {
"statusCode": 200,
"body": {
}
}結果(Lambdaのウォームスタート)
| データ量 | メモリ (MB) | Query (秒) |
|---|---|---|
| 3万件 | 512 | 11 |
| 2048 | 3.3 | |
| 10万件 | 512 | Out of memory Error |
| 2048 | 10 |
考察
- パーティション分散により取得速度が向上しました。
- データ量が増えるとOut of Memoryエラーが発生する可能性があるため、クエリ方法やメモリ管理の工夫が必要です。
まとめ
DynamoDBで全データを取得する際のベストプラクティスは以下の通りです。
- データ量が少ない場合: 単一パーティションでQueryを使用しても問題ありません。
- データ量が多い場合: パーティションを分散し、並列でQueryを実行することで効率的にデータを取得可能です。
- 非同期処理: データ量が多い場合は同期処理ではなく、非同期でデータを取得する設計を検討しましょう。
- 仕様の再検討: 全データを一覧表示する代わりに、条件を絞り込んで表示する仕様に変更できないかをまず検討するべきでしょう。
効率的なデータ取得には、データ設計とアプリケーション設計のバランスが欠かせません。本記事を参考に、DynamoDBを活用したパフォーマンスの高いアプリケーションを構築してください!
この記事をシェアする
合同会社raisexでは一緒に働く仲間を募集中です。
ご興味のある方は以下の採用情報をご確認ください。