我有下面这个客户端,它使用自定义的 protobuf 消息向我的服务器发送请求。我该如何让它对 gevent 友好,以便能够并行建立连接?目前看来连接是逐个串行建立的:即使我指定每秒创建 50 个用户,创建这 50 个用户也要花很长时间。
https://www.gevent.org/intro.html#monkey-patching
# gevent's monkey patching has to run before the modules it replaces are
# imported, so it comes first in the file. patch_all() is used rather than
# patch_socket() alone: with only socket patched, the ssl layer used by this
# client stays blocking and concurrent greenlets end up running one after
# another.
from gevent import monkey
monkey.patch_all()

import socket
import ssl
import struct
import sys
import time

import certifi

from client.kvgmprotobuf import PbUnion_pb2
class FleetSyncClient:
    """TLS client that exchanges length-prefixed protobuf messages with a server.

    Wire format used for requests (all integers little-endian, ``'<I'``):

        4 bytes  total length of everything that follows (6 + payload length)
        2 bytes  currently unused, always zero
        4 bytes  payload length
        N bytes  serialized protobuf payload

    Responses are read as a 10-byte header whose last 4 bytes carry the
    payload length, followed by that many payload bytes.

    Any connection, send or receive failure terminates the process through
    ``sys.exit(1)``.
    """

    # Client certificate file handed to SSLContext.load_cert_chain().
    CERTIFICATE = 'mycertificate.pem'

    # Size in bytes of the fixed response header.
    _HEADER_SIZE = 10

    def __init__(self, host):
        """Remember the server host name; no connection is opened yet."""
        self.hostname = host
        self.ssock = None

    def connect_terminal(self):
        """Open a TLS connection to ``self.hostname`` on port 443.

        On success the wrapped socket is stored in ``self.ssock``. On any
        failure a diagnostic is printed and the process exits with status 1.
        """
        try:
            # Set up the SSL context.
            # NOTE(review): hostname checking and certificate verification are
            # switched off here, so the server's identity is not validated --
            # confirm this is intended outside of test environments.
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            context.load_cert_chain(certfile=self.CERTIFICATE)

            # Open the SSL socket to the server. The raw socket is not managed
            # by a ``with`` block: on success the TLS socket built on top of it
            # must outlive this method, so it is only closed on failure.
            sock = socket.create_connection((self.hostname, 443))
            try:
                self.ssock = context.wrap_socket(sock, server_hostname=self.hostname)
            except ssl.CertificateError as e:
                print("Certificate validation error:", e)
                print("Please check the server certifcate")
            except ssl.SSLError as e:
                print(f"Connection with server {self.hostname} failed: {e}")
            except Exception as e:
                print("An error occurred:", e)
            else:
                print(f"Connected to FarmCentre server: {self.hostname}")
                return
            # The handshake failed: release the raw socket before giving up.
            sock.close()
        except Exception as err:
            print(f"Connection with server {self.hostname} failed unexpected {err=}, {type(err)=}")
        print("Failed to connect to DAS server")
        sys.exit(1)

    def disconnect_terminal(self):
        """Close the TLS socket if one is open; exit with status 1 if closing fails."""
        if self.ssock is None:
            return
        try:
            self.ssock.close()
        except Exception as err:
            print(f"Close connection failed{err=}, {type(err)=}")
            sys.exit(1)

    def abort(self, text):
        """Print ``text`` as an error, close the connection and exit with status 1."""
        print(f"###Error: {text}")
        self.disconnect_terminal()
        sys.exit(1)

    def _recv_exact(self, count):
        """Read exactly ``count`` bytes from ``self.ssock``.

        ``recv`` may return fewer bytes than requested, so it is called until
        the requested amount has arrived.

        Raises:
            ConnectionError: the peer closed the connection before ``count``
                bytes were received.
        """
        received = bytearray()
        while len(received) < count:
            chunk = self.ssock.recv(count - len(received))
            if not chunk:
                # An empty read means end-of-stream; without this check the
                # loop would never terminate.
                raise ConnectionError(
                    f"connection closed after {len(received)} of {count} bytes"
                )
            received += chunk
        return bytes(received)

    def _send_message(self, data):
        """Frame ``data``, send it, and return the payload of the server's reply.

        Args:
            data: serialized protobuf request bytes.

        Returns:
            The response payload bytes, exactly as long as the length
            announced in the response header.

        Send and receive errors (including a premature close by the server)
        are reported through ``abort``, which exits the process.
        """
        try:
            # Payload length, 4 bytes, little-endian ('<I').
            data = struct.pack('<I', len(data)) + data
            # Two currently unused bytes.
            data = b'\x00\x00' + data
            # Total length of the above, 4 bytes, little-endian.
            data = struct.pack('<I', len(data)) + data
            self.ssock.sendall(data)
        except socket.error as e:
            self.abort(f"Error sending data: {e}")
        try:
            # Receive the response.
            self.ssock.settimeout(30)
            header_bytes = self._recv_exact(self._HEADER_SIZE)
            # The protobuf message length is the last 4 bytes of the header.
            protobuf_msg_len = struct.unpack('<I', header_bytes[-4:])[0]
            # Read exactly the announced length so that bytes belonging to a
            # following message are left in the socket.
            return self._recv_exact(protobuf_msg_len)
        except socket.error as e:
            self.abort(f"Error receiving response: {e}")

    def send_protobuf_message(self, message):
        """Send a protobuf message and return the server's reply as a ``PbUnion``.

        Args:
            message: object providing ``SerializeToString()``.

        Returns:
            A ``PbUnion_pb2.PbUnion`` parsed from the response payload.
        """
        response_data = self._send_message(message.SerializeToString())
        # Parse the serialized response bytes into a protobuf message.
        response_msg = PbUnion_pb2.PbUnion()
        response_msg.ParseFromString(response_data)
        return response_msg
你应该尽早应用 gevent 的猴子补丁,而且不应只修补 socket:除非你有充分的理由不这样做,否则请使用 `patch_all()`。
如果你在文件最顶部执行 `import locust`,它会为你修补所有内容(因此你应该删除自己的修补代码)。
这里还有一个使用 gRPC 的示例:https://github.com/locustio/locust/tree/master/examples/grpc(我知道你实际上并没有使用 gRPC,但其中可能仍有一些对你有用的内容)。
此外(也许您在代码中的其他地方执行此操作),为了获得并发性,您还必须生成单独的 greenlet。也许这是一个很好的起点:https://sdiehl.github.io/gevent-tutorial/