当我将 `opentelemetry-instrumentation-psycopg2` 库与 psycopg2 的 `ThreadedConnectionPool` 一起使用时，遇到了递归问题。这个问题似乎只在存在并发时以一定概率出现，因此我编写了下面的示例来复现它。示例包括 OpenTelemetry Collector 配置、docker-compose 配置、Python 依赖列表以及复现脚本 `index.py`。
# otel-collector-config.yaml
# Receive OTLP (gRPC and HTTP), batch the spans, and send them both to the
# collector's own log output and to the Jaeger container.
receivers:
  otlp:
    protocols:
      grpc:
      http:

exporters:
  logging:
    loglevel: debug
  jaeger:
    endpoint: jaeger-all-in-one:14250
    tls:
      insecure: true

processors:
  batch:

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [logging, jaeger]
      processors: [batch]
# docker-compose file: Jaeger, the OpenTelemetry Collector and Postgres.
version: "3"
services:
  jaeger-all-in-one:
    image: jaegertracing/all-in-one:1.42
    restart: always
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    ports:
      - "16686:16686" # server frontend
      - "14268:14268" # HTTP collector
      - "14250:14250" # gRPC collector

  otel-collector:
    image: otel/opentelemetry-collector:0.72.0
    restart: always
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317" # OTLP gRPC receiver
      - "4318:4318" # OTLP Http receiver
    depends_on:
      - jaeger-all-in-one

  # database
  postgres:
    image: postgres:13.2-alpine
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=12345678
      - POSTGRES_DB=example
    ports:
      - "5432:5432"
psycopg2==2.9.7
opentelemetry-instrumentation-psycopg2>=0.33b0
opentelemetry-exporter-otlp>=1.12.0
import logging
import threading
import time
from psycopg2 import pool
import psycopg2
from opentelemetry import trace
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# this database uses the connection pool
class PostgresDatabasePool:
    """Run queries on connections borrowed from a psycopg2 ThreadedConnectionPool.

    Each call borrows a connection, runs one statement in its own
    transaction, commits it, and hands the connection back to the pool.
    """

    def __init__(self, minconn, maxconn, host, database, user, password, port):
        """Create the underlying ThreadedConnectionPool.

        Args:
            minconn: minimum number of connections kept by the pool.
            maxconn: maximum number of connections the pool hands out.
            host, database, user, password, port: connection parameters
                forwarded to psycopg2 for every pooled connection.
        """
        self.db_pool = pool.ThreadedConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            host=host,
            database=database,
            user=user,
            password=password,
            port=port,
        )

    def execute_query(self, query):
        """Execute ``query`` on a pooled connection and commit it.

        Any error (including an exhausted pool) is logged rather than raised.
        A borrowed connection is always returned to the pool.
        """
        conn = None
        try:
            conn = self.db_pool.getconn()
            # Pooled connections are not autocommit. ``with conn`` commits on
            # success and rolls back on error without closing the connection;
            # without an explicit commit the work is discarded when putconn()
            # rolls back or closes the connection.
            with conn, conn.cursor() as cursor:
                cursor.execute(query)
            logging.info("execute command: %s", query)
        except Exception as e:
            logging.error("SQL error: %s", e)
        finally:
            if conn is not None:
                self.db_pool.putconn(conn)

    def close(self):
        """Close every connection managed by the pool."""
        self.db_pool.closeall()
# this database uses a single direct connection (no pool)
class PostgresDatabase:
    """Run queries over one autocommit psycopg2 connection.

    The connection may be shared by several threads. psycopg2 cursors are
    not thread-safe, however, so every query runs on its own short-lived
    cursor.
    """

    def __init__(self, host, database, user, password, port):
        """Open the connection and switch it to autocommit mode."""
        self.conn = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password,
            port=port,
        )
        # Every statement is committed as soon as it executes.
        self.conn.autocommit = True
        # Kept for backward compatibility with code reading ``db.cursor``;
        # execute_query() no longer shares it between threads.
        self.cursor = self.conn.cursor()

    def execute_query(self, query):
        """Execute ``query``; errors are logged rather than raised."""
        if self.conn is None or self.conn.closed:
            logging.warning("Please connect first")
            return
        try:
            # A fresh cursor per call: concurrent callers must not share one.
            with self.conn.cursor() as cursor:
                cursor.execute(query)
            logging.info("execute command: %s", query)
        except Exception as e:
            logging.error("SQL error: %s", e)

    def close(self):
        """Close the legacy shared cursor and the connection."""
        if self.cursor is not None:
            self.cursor.close()
        if self.conn is not None:
            self.conn.close()
def _delete_forever(span_name, database, query, repetitions=5, interval=5):
    """Loop forever: run ``query`` ``repetitions`` times in one span, then sleep.

    Args:
        span_name: name of the INTERNAL span that wraps each batch of queries.
        database: object exposing ``execute_query(query)``.
        query: SQL statement to execute.
        repetitions: number of executions per span.
        interval: seconds to sleep between batches.
    """
    while True:
        with tracer.start_as_current_span(span_name, kind=trace.SpanKind.INTERNAL):
            for _ in range(repetitions):
                database.execute_query(query)
        # Sleep outside the span so the span measures only the queries.
        time.sleep(interval)


def delete1_with_db_pool(thread_name):
    """Repeatedly clear public.test1 through the connection pool.

    ``thread_name`` is accepted for the Thread ``args`` but not used.
    """
    _delete_forever("delete1_with_db_pool", dbPool, "DELETE FROM public.test1;")


def delete2_with_db_pool(thread_name):
    """Repeatedly clear public.test2 through the connection pool.

    ``thread_name`` is accepted for the Thread ``args`` but not used.
    """
    _delete_forever("delete2_with_db_pool", dbPool, "DELETE FROM public.test2;")


def delete1(thread_name):
    """Repeatedly clear public.test1 through the single shared connection.

    ``thread_name`` is accepted for the Thread ``args`` but not used.
    """
    _delete_forever("delete1", db, "DELETE FROM public.test1;")


def delete2(thread_name):
    """Repeatedly clear public.test2 through the single shared connection.

    ``thread_name`` is accepted for the Thread ``args`` but not used.
    """
    _delete_forever("delete2", db, "DELETE FROM public.test2;")
# --- tracing ---------------------------------------------------------------
# Export spans in batches over OTLP/gRPC to the collector on localhost:4317.
resource = Resource(attributes={SERVICE_NAME: 'Demo_Bug'})
provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(endpoint='localhost:4317', insecure=True)
provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("test", "1.0.0")

# --- psycopg2 instrumentation ----------------------------------------------
# Instrument before any connection is created so that the pooled and the
# direct connections below are both opened through instrumented psycopg2.
Psycopg2Instrumentor().instrument()

# --- databases -------------------------------------------------------------
# Same Postgres instance for both access styles (see docker-compose).
_DB_SETTINGS = dict(
    host="localhost",
    database="example",
    user="root",
    password="12345678",
    port=5432,
)
dbPool = PostgresDatabasePool(1, 5, **_DB_SETTINGS)
db = PostgresDatabase(**_DB_SETTINGS)

# --- schema ----------------------------------------------------------------
db.execute_query("CREATE TABLE IF NOT EXISTS public.test1 (id serial NOT NULL , text varchar(64) NOT NULL);")
db.execute_query("CREATE TABLE IF NOT EXISTS public.test2 (id serial NOT NULL , text varchar(64) NOT NULL);")

# --- workers ---------------------------------------------------------------
# Two threads share the pool, two share the single direct connection.
# Passing ``name=`` as well makes the thread names identical on every Python
# version instead of relying on version-dependent default names.
_WORKERS = (
    (delete1_with_db_pool, "Thread-1"),
    (delete2_with_db_pool, "Thread-2"),
    (delete1, "Thread-3"),
    (delete2, "Thread-4"),
)
threads = [
    threading.Thread(target=target, args=(name,), name=name)
    for target, name in _WORKERS
]

for thread in threads:
    thread.start()

# The workers loop forever, so this blocks until the process is stopped.
for thread in threads:
    thread.join()
# Start Jaeger, the OpenTelemetry Collector and Postgres in the background.
docker-compose up -d
# Install the Python dependencies listed in requirements.txt.
pip install -r requirements.txt
# Run the reproduction script.
python index.py
我怀疑问题出在我们使用 psycopg2 库中的 `pool.ThreadedConnectionPool()` 来复用数据库连接，而 `opentelemetry-instrumentation-psycopg2` 似乎还没有考虑到这种用法。
我已经在 GitHub 的 opentelemetry-python-contrib 仓库中提交了这个 issue。
后来我切换了不同的 Python 版本，运行同样的代码进行测试，结果出现了差异。
使用 Python 3.6 到 3.8 测试时会出现递归问题，而使用 Python 3.9 到 3.11 时则不会出现。这一点很奇怪。
以下是切换 Python 版本后的测试结果：
因此我怀疑根本原因与 Python 版本有关：如果你想使用 psycopg2 的 `ThreadedConnectionPool`，请确保 Python 版本为 3.9 或更高；否则可能会遇到递归问题。