我将记录放入流中(我从 AWS 文档获取脚本):
import datetime
import json
import random
import boto3
STREAM_NAME = "apache-flink-demo-input-1"
def get_data():
return {
'event_time': datetime.datetime.now().isoformat(),
'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
'price': round(random.random() * 100, 2)}
def generate(stream_name, kinesis_client):
while True:
data = get_data()
print(data)
response = kinesis_client.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey="1")
print(response)
print()
if __name__ == '__main__':
generate(STREAM_NAME,
boto3.client(
'kinesis',
aws_access_key_id="XXXX", # Hidden
aws_secret_access_key="XXXX", # Hidden
region_name='us-west-1'))
我得到的回复是每条记录都成功:
{'ShardId': 'shardId-000000000003', 'SequenceNumber': '49644599907995286627749093391250621075271860022832791602', 'ResponseMetadata': {'RequestId': 'f0e04f67-94fa-934b-ad60-6e3e40d06706', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f0e04f67-94fa-934b-ad60-6e3e40d06706', 'x-amz-id-2': 'KLepCUNwgrZMY639W6QhdyFRsfeiqAnBqdLWdPSdDKxgDnGUqRGNly/dCPE/GHGVhUN3YXXB98PUvtWRHidOM4PRI4/edeaI', 'date': 'Sat, 16 Sep 2023 21:33:13 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '110'}, 'RetryAttempts': 0}}
看来您看错了流
STREAM_NAME = "apache-flink-demo-input-1"
但是,您在屏幕截图中发布的流是输出流。您可能想检查适当的流。