Telegram ChatID 在 Stripe webhooks 中作为客户端参考 ID 传递

问题描述 投票:0回答:1

我正在开发一个使用 stripe API 进行订阅的电报机器人。我目前面临着在 stripe webhooks 中将电报聊天 ID 作为客户端 ID 传递的问题,这样当我从 stripe 获取数据时,我还可以获取用户的电报聊天 ID 以将其保存到我的数据库中。 我在下面提供了我的烧瓶代码,我不知道我做错了什么,但收到了 Bad Request 400 错误

import logging
from flask import Flask, request, jsonify
import stripe
import psycopg2
from asgiref.wsgi import WsgiToAsgi

# Stripe configuration (use test key for testing)
stripe_api_key = "stripe_api_key"
endpoint_secret = 'endpoint_secret'  

# Database connection details
DB_NAME = "xyz"
DB_USER = "user"
DB_PASSWORD = "0000"
DB_HOST = "localhost"
DB_PORT = "5432"

# Enable logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask application
app = Flask(__name__)
asgi_app = WsgiToAsgi(app)  # Wrap the Flask app with WsgiToAsgi adapter

# Database connection function

在此输入


def get_db_connection():
    try:
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        return conn
    except Exception as e:
        logging.error(f"Database connection error: {e}")
        return None

# Handle incoming webhook events from Stripe
@app.route('/webhook', methods=['POST'])
def webhook():
    payload = request.get_data(as_text=True)
    sig_header = request.headers.get('Stripe-Signature')

    try:
        event = stripe.Webhook.construct_event(payload, sig_header, endpoint_secret)
    except ValueError as e:
        # Invalid payload
        logging.error(f"Invalid payload: {e}")
        return jsonify(success=False), 400
    except stripe.error.SignatureVerificationError as e:
        # Invalid signature
        logging.error(f"Signature verification failed: {e}")
        return jsonify(success=False), 400

    # Handle the event
    if event['type'] == 'checkout.session.completed':
        session = event['data']['object']
        chat_id = session['client_reference_id']
        amount_total = session['amount_total']
        
        # Infer the subscription plan based on the amount
        subscription_plan = 'Weekly' if amount_total == 695 else 'Monthly' if amount_total == 1995 else 'Unknown'

        # Update the database with subscription information
        conn = get_db_connection()
        if conn is not None:
            cursor = conn.cursor()
            try:
                cursor.execute(
                    "UPDATE user_subscription SET subscription_plan = %s WHERE chat_id = %s",
                    (subscription_plan, chat_id)
                )
                conn.commit()
                logging.info(f"Updated subscription plan for chat_id {chat_id}")
            except Exception as e:
                logging.error(f"Database error: {e}")
            finally:
                cursor.close()
                conn.close()
        else:
            logging.error("Failed to connect to the database.")

    return jsonify(success=True), 200

# Handle GET requests to retrieve subscription data
@app.route('/subscription/<int:chat_id>', methods=['GET'])
def get_subscription(chat_id):
    conn = get_db_connection()
    if conn is not None:
        cursor = conn.cursor()
        try:
            cursor.execute(
                "SELECT chat_id, subscription_plan FROM user_subscription WHERE chat_id = %s",
                (chat_id,)
            )
            result = cursor.fetchone()
            if result:
                response = {
                    "chat_id": result[0],
                    "subscription_plan": result[1]
                }
                return jsonify(response), 200
            else:
                return jsonify({"error": "Subscription not found"}), 404
        except Exception as e:
            logging.error(f"Database error: {e}")
            return jsonify({"error": "Database error"}), 500
        finally:
            cursor.close()
            conn.close()
    else:
        logging.error("Failed to connect to the database.")
        return jsonify({"error": "Database connection error"}), 500

# Index route for testing
@app.route('/')
def index():
    return 'Perfectly Working'

if __name__ == '__main__':
    from uvicorn import run
    run(asgi_app, host='0.0.0.0', port=4242)

python flask stripe-payments telegram python-telegram-bot
1个回答
0
投票

这是我在

django
中的 API。变量

    try:
        checkout_session = stripe.checkout.Session.create(
            line_items=[
                {
                    'price': f'{price}',
                    'quantity': seats,
                },
            ],
            payment_method_types=['card', ],
            mode='subscription',
            success_url=settings.SITE_URL + 'payment?success=true&session_id={CHECKOUT_SESSION_ID}',
            cancel_url=settings.SITE_URL + 'payment?canceled=true',

        # Here you can send telegramChatID
            metadata={
                "price_id": f'{price}',        
                "email": f"{request.user.email}",
                "seats": f"{seats}",
                "yearly": f"{yearly}",
                "monthly": f"{monthly}"
            }
        )

        redirect_url = checkout_session.url
        return Response(data=redirect_url, status=status.HTTP_200_OK)
    except Exception as e:
        return Response({'error': 'Something went wrong when creating stripe checkout session'},
                        status=status.HTTP_400_BAD_REQUEST)

现在在你的 webhook 中,当

checkout.session.completed
调用时,你可以得到这样的结果:这是我的 API:

class WebhookApiView(APIView):

    def post(self, request, *args, **kwargs):
        payload = request.data

        event_type = payload.get('type')
        if event_type == "checkout.session.completed":
            seats = payload.get('data', {}).get('object', {}).get('metadata', {}).get('seats')
            price_id = payload.get('data', {}).get('object', {}).get('metadata', {}).get('price_id')
            email = payload.get('data', {}).get('object', {}).get('metadata', {}).get('email')
            yearly = payload.get('data', {}).get('object', {}).get('metadata', {}).get('yearly')
            monthly = payload.get('data', {}).get('object', {}).get('metadata', {}).get('monthly')
            subscription_id = payload.get('data').get('object')['subscription']
            user = User.objects.get(email=email)
            # remaining code comes here

如果你想在本地测试它,你必须使用

stripe cli
: 在终端中运行此命令:
 stripe listen --forward-to http://0.0.0.0:8000/app/webhook/
当结帐会话完成时,它会自动调用您的 webhook API。

© www.soinside.com 2019 - 2024. All rights reserved.