我正在使用快速 API 后端,我已将其设置为与 AWS 的 DynamoDB 客户端配合使用。我已经声明了以下包装器,可以帮助我管理数据库的读取和写入。
(一个名为database的自定义模块包含两个文件,第一个定义了一个名为DeviceDataManagaer的类,第二个有一个init)。
class DeviceDataManager():
def __init__(self, dyn_resource):
"""
: param dyn_resource: A Boto3 DynamoDB resource.
"""
self.dyn_resource = dyn_resource
self.device_table = None
self.master_order_table = None
self.master_history_table = None
def load_tables(
self,
device_table: str,
master_order_table:str,
master_history_table:str
) -> bool:
"""
Attempts to load the given tables, storing them in a disctionary that is stored
as a member variable. Returns a boolean indicating whether all tables were
loaded or not.
"""
table_names = (device_table, master_order_table, master_history_table)
table_existence = [False] * len(table_names)
loading_tables = []
for i, table_in in enumerate(table_names):
try:
table = self.dyn_resource.Table(table_in)
table.load()
table_existence[i] = True
except ClientError as err:
if err.response["Error"]["Code"] == "ResourceNotFoundException":
table_existence[i] = False
else:
logger.error(
"Couldn't check for existence of tables. Here's why: %s: %s",
err.response["Error"]["Code"],
err.response["Error"]["Message"],
)
raise
else:
loading_tables.append(table)
self.device_table = loading_tables[0]
self.master_order_table, self.master_history_table = loading_tables[1:3]
return all(table_existence)
def get_device_data(self, serial: str) -> dict | None:
"""
Gets an item from the Device Data Table
"""
try:
# response = self.device_table.get_item(Key={"Serial_Number": serial, "Local_Time_Str": "string"})
# FIXME: Replace arg to Serial_Number
if True:
print(repr(serial))
response = self.device_table.query(
KeyConditionExpression=(
Key('Serial_Number').eq(serial)
)
)
except ClientError as err:
logger.error(
"Couldn't get item from %s. Here's why: %s: %s",
self.device_table.name,
err.response["Error"]["Code"],
err.response["Error"]["Message"],
)
raise
else:
return response.get("Item")
# ... More methods here...
我在数据库中有三个表,其中 _init_ 如下所示:
import os, pathlib, boto3
from dotenv import load_dotenv
from .DeviceDataManager import DeviceDataManager
base_dir = pathlib.Path("June Presentation/app").parent.parent.parent
# Load DynamoDB Credentials
key_path = "./app/environment vars/aws.env"
if os.path.exists(base_dir.joinpath(key_path)):
load_dotenv(base_dir.joinpath(key_path))
else:
raise RuntimeError(f"Credentials not found for DeviceDataManager's DynamoDB Client at {base_dir.joinpath(key_path)}")
class __Config:
DB_REGION_NAME = os.getenv('DB_REGION_NAME')
DB_ACCESS_KEY_ID = os.getenv('DB_ACCESS_KEY_ID')
DB_SECRET_ACCESS_KEY = os.getenv('DB_SECRET_ACCESS_KEY')
def get_device_db(
device_table: str = "Device_Data",
order_table: str = "Master_Order",
history_table: str = "Master_History"
) -> DeviceDataManager:
"""
Creates an instance of DeviceDataManager allowing access to the DynamoDB's table.
"""
db = DeviceDataManager(
boto3.resource(
'dynamodb',
region_name=__Config.DB_REGION_NAME,
aws_access_key_id=__Config.DB_ACCESS_KEY_ID,
aws_secret_access_key=__Config.DB_SECRET_ACCESS_KEY
)
)
if (not db.load_tables(device_table, order_table, history_table)):
raise FileNotFoundError("One or more tables not found!")
return db
Device_Data 表具有以下模型:
from pydantic import BaseModel
from pydantic import BaseModel, conlist
from decimal import Decimal
class Device(BaseModel):
Serial_Number: str
class DeviceData(Device):
Local_Time_Str: str # Sort Key
local_ip: str
touch_mode: int
location: str
region: str
country: str
latitude: str
longitude: str
temperature: Decimal
condition: str
wind_speed: Decimal
humidity: Decimal
fan_status_arr: conlist(int, min_length=5, max_length=5)
fan_selecting_pwm: int
另外两个主表工作正常,但 Device_Data 表让我抓狂。它有一个名为“Serial_Number”的 Parrtition_Key 和一个名为“Local_Time_Str”的排序键。我已经测试了与数据库的连接,其他一切正常。当我使用分区和排序键调用 device_data.get_item 时,它也可以工作,但由于某种原因,当我仅使用分区键使用 table.query() 时,我收到 404 Not Found 响应,即使当我在 AWS 上尝试时也是如此控制台(在它有效的网站上)。
HTTP GET 方法如下(针对 fast-api 路由器)
from fastapi import APIRouter, Depends, HTTPException
from ..models.models import DeviceData, MasterData
from ..database import get_device_db, DeviceDataManager
from ..internal.Authentication import get_current_active_user
from ..models.Authentication import User
from typing import Annotated
router = APIRouter(
prefix="/device",
tags=["device"],
responses={404: {"description": "Not found"}},
)
# Instantiate the DynamoDB Table Manager (For Device Data)
try:
db: DeviceDataManager = get_device_db()
except FileNotFoundError as err:
raise print(err)
@router.get("/get-device", response_model=DeviceData)
async def get_item(
serial: str,
current_user: Annotated[User, Depends(get_current_active_user)]
) -> DeviceData | None:
print("NotImplementedWarning!") #FIXME
item = db.get_device_data(serial)
print(item) # DEBUG
if item:
return item
raise HTTPException(status_code=404, detail="Item not found")
如果您能帮助解决此问题,我将不胜感激。
我尝试了带有分区键和排序键的 table.get_item() ,效果很好。我还在 AWS DDB 控制台上尝试了查询,它也有效。我在线阅读了 AWS DDB 的文档和其他文章,但没有任何内容突出或以任何方式帮助我。
正如 @jarmod 在评论中指出的,这个问题非常简单,而且是一个基本的拼写错误。将“Item”替换为“Items”可以解决该问题。
def get_device_data(self, serial: str) -> dict | None:
"""
Gets an item from the Device Data Table
"""
try:
# response = self.device_table.get_item(Key={"Serial_Number": serial, "Local_Time_Str": "string"})
# FIXME: Replace arg to Serial_Number
if True:
print(repr(serial))
response = self.device_table.query(
KeyConditionExpression=(
Key('Serial_Number').eq(serial)
)
)
except ClientError as err:
logger.error(
"Couldn't get item from %s. Here's why: %s: %s",
self.device_table.name,
err.response["Error"]["Code"],
err.response["Error"]["Message"],
)
raise
else:
return response.get("Item")