我正在开发一个使用 Stripe API 处理订阅的 Telegram 机器人。我目前遇到的问题是:如何把 Telegram 聊天 ID 作为客户端 ID(client_reference_id)传给 Stripe,这样当 Stripe webhook 把数据发回给我时,我也能拿到用户的 Telegram 聊天 ID 并将其保存到数据库中。下面是我的 Flask 代码;我不知道哪里做错了,但一直收到 400 Bad Request 错误。
import logging
from flask import Flask, request, jsonify
import stripe
import psycopg2
from asgiref.wsgi import WsgiToAsgi
# Stripe configuration (use test key for testing)
# NOTE(review): placeholder value. Nothing in the visible code assigns this to
# stripe.api_key — confirm whether any Stripe API call here needs it.
stripe_api_key = "stripe_api_key"
# Webhook signing secret; webhook() passes it to stripe.Webhook.construct_event
# to verify incoming payloads. Placeholder value.
endpoint_secret = 'endpoint_secret'
# Database connection details
# Read by get_db_connection() and handed to psycopg2.connect().
DB_NAME = "xyz"
DB_USER = "user"
DB_PASSWORD = "0000"
DB_HOST = "localhost"
DB_PORT = "5432"
# Enable logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Flask application
app = Flask(__name__)
asgi_app = WsgiToAsgi(app) # Wrap the Flask app with WsgiToAsgi adapter
# Database connection function
在此输入
def get_db_connection():
    """Open a new PostgreSQL connection using the module-level DB_* settings.

    Returns:
        A new psycopg2 connection, or None when connecting raised. The error
        is logged instead of being propagated, so callers must check for None.
    """
    try:
        connect_kwargs = {
            "dbname": DB_NAME,
            "user": DB_USER,
            "password": DB_PASSWORD,
            "host": DB_HOST,
            "port": DB_PORT,
        }
        return psycopg2.connect(**connect_kwargs)
    except Exception as exc:
        logging.error(f"Database connection error: {exc}")
        return None
# Handle incoming webhook events from Stripe
# Plan names keyed by the checkout session's amount_total; any other amount
# is recorded as 'Unknown'.
_PLAN_BY_AMOUNT = {695: 'Weekly', 1995: 'Monthly'}


@app.route('/webhook', methods=['POST'])
def webhook():
    """Receive Stripe webhook events and record completed checkouts.

    The request body is verified against the ``Stripe-Signature`` header with
    ``endpoint_secret``. For ``checkout.session.completed`` events the
    session's ``client_reference_id`` is used as the chat ID, the plan is
    inferred from ``amount_total``, and the matching ``user_subscription`` row
    is updated. All other event types are acknowledged and ignored.

    Returns:
        A JSON body and HTTP status:
        400 when the payload or its signature is rejected;
        500 when the subscription could not be persisted, so that Stripe
        retries the delivery instead of the event being lost;
        200 otherwise.
    """
    payload = request.get_data(as_text=True)
    sig_header = request.headers.get('Stripe-Signature')
    try:
        event = stripe.Webhook.construct_event(payload, sig_header, endpoint_secret)
    except ValueError as e:
        # Invalid payload
        logger.error(f"Invalid payload: {e}")
        return jsonify(success=False), 400
    except stripe.error.SignatureVerificationError as e:
        # Invalid signature
        logger.error(f"Signature verification failed: {e}")
        return jsonify(success=False), 400

    if event['type'] != 'checkout.session.completed':
        # Nothing to store for other event types; acknowledge them.
        return jsonify(success=True), 200

    session = event['data']['object']
    chat_id = session['client_reference_id']
    if not chat_id:
        # The checkout session was created without client_reference_id, so
        # there is no chat to attribute the payment to. A retry cannot fix
        # that, so acknowledge the event instead of running an UPDATE that
        # can never match a row.
        logger.error(
            "checkout.session.completed without client_reference_id; "
            "subscription not recorded"
        )
        return jsonify(success=True), 200

    # Infer the subscription plan based on the amount
    subscription_plan = _PLAN_BY_AMOUNT.get(session['amount_total'], 'Unknown')

    conn = get_db_connection()
    if conn is None:
        logger.error("Failed to connect to the database.")
        return jsonify(success=False), 500

    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "UPDATE user_subscription SET subscription_plan = %s WHERE chat_id = %s",
                (subscription_plan, chat_id)
            )
            updated_rows = cursor.rowcount
        conn.commit()
    except Exception:
        logger.exception(f"Database error while updating chat_id {chat_id}")
        return jsonify(success=False), 500
    finally:
        conn.close()

    if updated_rows == 0:
        # The statement succeeded but no row has this chat_id.
        logger.warning(f"No user_subscription row found for chat_id {chat_id}")
    else:
        logger.info(f"Updated subscription plan for chat_id {chat_id}")
    return jsonify(success=True), 200
# Handle GET requests to retrieve subscription data
@app.route('/subscription/<int:chat_id>', methods=['GET'])
def get_subscription(chat_id):
    """Look up the stored subscription plan for one chat.

    Args:
        chat_id: Integer chat ID taken from the URL path.

    Returns:
        A JSON body and HTTP status: 200 with ``chat_id`` and
        ``subscription_plan`` when a row exists, 404 when none does, and 500
        when the connection or the query fails.
    """
    conn = get_db_connection()
    if conn is None:
        logging.error("Failed to connect to the database.")
        return jsonify({"error": "Database connection error"}), 500

    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT chat_id, subscription_plan FROM user_subscription WHERE chat_id = %s",
            (chat_id,)
        )
        row = cursor.fetchone()
        if not row:
            return jsonify({"error": "Subscription not found"}), 404
        stored_chat_id, plan = row
        return jsonify({"chat_id": stored_chat_id, "subscription_plan": plan}), 200
    except Exception as exc:
        logging.error(f"Database error: {exc}")
        return jsonify({"error": "Database error"}), 500
    finally:
        # Runs on every path above, including the early returns.
        cursor.close()
        conn.close()
# Index route for testing
@app.route('/')
def index():
    """Return a fixed plain-text body, so the service can be checked by hand."""
    return 'Perfectly Working'
if __name__ == '__main__':
    # When run as a script, serve the ASGI-wrapped Flask app with uvicorn on
    # all interfaces, port 4242.
    import uvicorn

    uvicorn.run(asgi_app, host='0.0.0.0', port=4242)
这是我在 Django 中的 API。创建结帐会话时,可以通过 `metadata` 变量附带自定义数据(例如 Telegram 聊天 ID):
# Create a Stripe Checkout session in subscription mode and return its URL.
# NOTE(review): this is the interior of a view whose signature is not shown;
# `price`, `seats`, `yearly`, `monthly`, `request`, `settings`, `Response` and
# `status` are defined outside this snippet, and its indentation was lost.
try:
checkout_session = stripe.checkout.Session.create(
line_items=[
{
'price': f'{price}',
'quantity': seats,
},
],
payment_method_types=['card', ],
mode='subscription',
# {CHECKOUT_SESSION_ID} is left as a literal placeholder in the URL string.
success_url=settings.SITE_URL + 'payment?success=true&session_id={CHECKOUT_SESSION_ID}',
cancel_url=settings.SITE_URL + 'payment?canceled=true',
# Custom key/value pairs attached to the session. The Telegram chat ID can be
# added here as another key; the webhook view reads these values back from the
# event's `metadata`.
metadata={
"price_id": f'{price}',
"email": f"{request.user.email}",
"seats": f"{seats}",
"yearly": f"{yearly}",
"monthly": f"{monthly}"
}
)
# The caller receives the hosted checkout page URL to redirect the user to.
redirect_url = checkout_session.url
return Response(data=redirect_url, status=status.HTTP_200_OK)
# Any failure is reported as a generic 400; the exception bound to `e` is not
# used or logged.
except Exception as e:
return Response({'error': 'Something went wrong when creating stripe checkout session'},
status=status.HTTP_400_BAD_REQUEST)
现在,在你的 webhook 中,当 `checkout.session.completed` 事件触发时,你可以像下面这样把这些数据取回来。这是我的 API:
class WebhookApiView(APIView):
    """Webhook endpoint that unpacks ``checkout.session.completed`` events."""

    def post(self, request, *args, **kwargs):
        """Read the checkout metadata back out of a posted Stripe event.

        For ``checkout.session.completed`` events, the values stored in the
        session's ``metadata`` are extracted, together with the subscription
        ID, and the user is looked up by email.

        NOTE(review): the payload is taken from ``request.data`` as-is; no
        signature verification is visible in this snippet — confirm it
        happens elsewhere.
        """
        payload = request.data
        event_type = payload.get('type')
        if event_type == "checkout.session.completed":
            # Resolve the metadata dict once instead of re-walking the payload
            # for every key.
            metadata = payload.get('data', {}).get('object', {}).get('metadata', {})
            seats, price_id, email, yearly, monthly = (
                metadata.get(key)
                for key in ('seats', 'price_id', 'email', 'yearly', 'monthly')
            )
            subscription_id = payload.get('data').get('object')['subscription']
            user = User.objects.get(email=email)
            # remaining code comes here
如果你想在本地测试,必须使用 Stripe CLI。在终端中运行以下命令:
stripe listen --forward-to http://0.0.0.0:8000/app/webhook/
当结帐会话完成时,它会自动调用您的 webhook API。