您好,我正在使用 Locust 库来加载测试服务器,该服务器侦听 TCP/IP 套接字请求并期待自定义 protobuf 消息。我有一个向服务器发送消息的模拟器,通过 Locust,我可以创建多个实例并同时发送请求。然而,我的请求的统计数据没有显示在我的仪表板上,并且
@events.request.add_listener
也不起作用,但用户正在生成并发出我的服务器收到的请求。我缺少什么?如果我使用原始套接字,如何在 localhost:8089 上的仪表板上获取统计信息?我在下面列出了 locustfile 和我的客户端类。
locustfile.py
from locust import User, task
from client.protobuf import ProtobufMsg
from helpers.fleetsync_excel import ConfigFile, ExcelData
from helpers.fleetsync_general import GpsCollection, Machine
from helpers.gps_generator import Field, Route
from locust_fleetsync_wrapper import FleetSyncLocustClient
class FleetSyncUser(User):
"""
A minimal Locust user class that provides an FleetSyncClient to its subclasses
"""
abstract = True # dont instantiate this as an actual user when running Locust
def __init__(self, environment):
super().__init__(environment)
self.client = FleetSyncLocustClient(self.host, request_event=environment.events.request)
self.excel_data_helper = ExcelData()
self.config_file = ConfigFile()
self.machine = self.start_machine("farmcentredummy")
def start_machine(self, user_name) -> Machine:
"""Set the terminal_id and device_id as configured on the selected user sheet.
Reading these from acc server or test server columns using machine name and account id"""
row = self.config_file.get_randow_row_from_sheet(user_name)
machine_type = row[1]
terminal_id = row[4]
device_id = row[5]
if machine_type is None or terminal_id is None or device_id is None:
raise ValueError(f"terminal ID and/or device ID missing for machine '{machine_type}', '{terminal_id}', '{device_id}'"
f"in {self.config_file.NAME} sheet:{user_name}")
print(f"{machine_type} with terminal_id:{terminal_id} and device_id:{device_id} started")
return Machine(machine_type, terminal_id, device_id)
class Terminal(FleetSyncUser):
host = "<my-url.com>"
def on_start(self):
self.client.connect_terminal()
@task
def simulate_route(self):
"""Simulate gps route configured in ts_config sheet 'route'"""
gpsroute_config = self.config_file.get_row_from_sheet("route", "field1")
route = Route(self.machine, gpsroute_config)
gps_coordinates = route.generate_gps_coordinates()
self.send_gps(gps_coordinates)
self.send_sovs(route.speed_handler.sovs)
@task
def simulate_field(self):
"""Simulate gps and speed on field configured in ts_config sheet 'field'"""
gpsfield_config = self.config_file.get_row_from_sheet("field", "datapoint")
field = Field(self.machine, gpsfield_config)
gps_coordinates = field.generate_gps_coordinates_and_speed_sov()
self.send_gps(gps_coordinates)
self.send_sovs(field.speed_handler.sovs)
@task
def simulate_gps_data(self):
"""Simulate gps data configured in ts_config sheet 'gps'"""
gps_config = self.config_file.get_row_from_sheet("gps", "real-tractor")
gps_coordinates = self.excel_data_helper.read_gps_coordinates(gps_config[1], gps_config[2])
self.send_gps(gps_coordinates)
def send_gps(self, gps_coordinates: GpsCollection):
pb_msg = ProtobufMsg(self.machine.terminal_id, self.machine.device_id)
for gps in gps_coordinates.records:
pb_msg.add_gps_coordinate(gps)
self.send_message(pb_msg)
def send_sovs(self, sovs):
pb_msg = ProtobufMsg(self.machine.terminal_id, self.machine.device_id)
for sov in sovs.records:
pb_msg.add_sov_data(sov)
self.send_message(pb_msg)
def send_message(self, pb_msg: ProtobufMsg):
# when datapoints reaches limit 250 the message must be sent (data sent via several messages)
if pb_msg.datapoints >= ProtobufMsg.datapoints_limit:
response = self.client.send_protobuf_message(pb_msg.msg)
print(f"Terminal: {self.machine.terminal_id} received: {response}")
pb_msg.datapoints = 0
pb_msg.update_request_msg()
locust_fleetsync_wrapper.py
import time
from client.fleetsync_client import FleetSyncClient
class FleetSyncLocustClient(FleetSyncClient):
def __init__(self, host, request_event):
super().__init__(host)
self._request_event = request_event
def __getattr__(self, name):
print("running __getattr__")
func = FleetSyncClient.__dict__[name]
def wrapper(*args, **kwargs):
request_meta = {
"request_type": "socket",
"name": name,
"start_time": time.time(),
"response_length": 0, # calculating this for an xmlrpc.client response would be too hard
"response": None,
"context": {}, # see HttpUser if you actually want to implement contexts
"exception": None,
}
start_perf_counter = time.perf_counter()
try:
request_meta["response"] = func(*args, **kwargs)
except Exception as e:
request_meta["exception"] = e
request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
self._request_event.fire(**request_meta) # This is what makes the request actually get logged in Locust
return request_meta["response"]
return wrapper
fleetsync_client
import ssl
import struct
import sys
import time
import certifi
from client.kvgmprotobuf import PbUnion_pb2
import socket
class FleetSyncClient():
CERTIFICATE = '<my_certificate_file>'
def __init__(self, host):
self.hostname = host
self.ssock = None
def connect_terminal(self):
try:
# Set up the SSL context
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.load_cert_chain(certfile=self.CERTIFICATE)
# Open the SSL socket to the server
with socket.create_connection((self.hostname, 443)) as sock:
try:
self.ssock = context.wrap_socket(sock, server_hostname=self.hostname)
print(f"Connected to FarmCentre server: {self.hostname}")
return
except ssl.CertificateError as e:
print("Certificate validation error:", e)
print("Please check the server certifcate")
except ssl.SSLError as e:
print(f"Connection with server {self.hostname} failed: {e}")
except Exception as e:
print("An error occurred:", e)
except Exception as err:
print(f"Connection with server {self.hostname} failed unexpected {err=}, {type(err)=}")
print("Failed to connect to server")
sys.exit(1)
def disconnect_terminal(self):
if self.ssock is not None:
try:
self.ssock.close()
except Exception as err:
print(f"Close connection failed{err=}, {type(err)=}")
sys.exit(1)
def abort(self, text):
print(f"###Error: {text}")
self.disconnect_terminal()
sys.exit(1)
def _send_message(self, data):
try:
# add data length 4 bytes MSB first
data = struct.pack('<I', len(data)) + data
# add two currently not used bytes
data = b'\x00\x00' + data
# add total length 4 bytes MSB first
data = struct.pack('<I', len(data)) + data
# print(binascii.hexlify(data))
####print(f"Sending {len(data)} bytes")
self.ssock.sendall(data)
except socket.error as e:
self.abort(f"Error sending data: {e}")
try:
# receive the response
self.ssock.settimeout(30)
header_bytes = self.ssock.recv(10)
# kvgmprotobuf message length is the last 4 bytes of the header
protobuf_msg_len = struct.unpack('<I', header_bytes[-4:])[0]
protobuf_response = b""
while len(protobuf_response) < protobuf_msg_len:
chunk = self.ssock.recv(1024)
protobuf_response += chunk
# print(f"Protobuf message: {binascii.hexlify(protobuf_response)}")
####print(f"Server response: {len(protobuf_response)} protobuf bytes")
return protobuf_response
except socket.error as e:
self.abort(f"Error receiving response: {e}")
# send kvgmprotobuf message to the server and return the server response kvgmprotobuf message
def send_protobuf_message(self, message):
####Logger.debug(f"sent message:\n{message}")
response_data = self._send_message(message.SerializeToString())
# parse the serialized response bytes to a kvgmprotobuf message
response_msg = PbUnion_pb2.PbUnion()
response_msg.ParseFromString(response_data)
####print(f"{response_msg}")
return response_msg
问题出在 getattr 和包装函数上。它检索返回响应的函数,但它需要 *args 作为秒参数和 self 作为第一个参数
request_meta["response"] = func(self, *args)
然后你需要像这样调用getattr
wrapper = self.client.__getattr__("send_protobuf_message")
response = wrapper(pb_msg.msg)