使用原始套接字请求的Python Locust

问题描述 投票:0回答:1

您好,我正在使用 Locust 库来加载测试服务器,该服务器侦听 TCP/IP 套接字请求并期待自定义 protobuf 消息。我有一个向服务器发送消息的模拟器,通过 Locust,我可以创建多个实例并同时发送请求。然而,我的请求的统计数据没有显示在我的仪表板上,并且

@events.request.add_listener
也不起作用,但用户正在生成并发出我的服务器收到的请求。我缺少什么?如果我使用原始套接字,如何在 localhost:8089 上的仪表板上获取统计信息?我在下面列出了 locustfile 和我的客户端类。

locustfile.py

from locust import User, task

from client.protobuf import ProtobufMsg
from helpers.fleetsync_excel import ConfigFile, ExcelData
from helpers.fleetsync_general import GpsCollection, Machine
from helpers.gps_generator import Field, Route
from locust_fleetsync_wrapper import FleetSyncLocustClient


class FleetSyncUser(User):
    """
    A minimal Locust user class that provides an FleetSyncClient to its subclasses
    """

    abstract = True  # dont instantiate this as an actual user when running Locust

    def __init__(self, environment):
        super().__init__(environment)
        self.client = FleetSyncLocustClient(self.host, request_event=environment.events.request)

        self.excel_data_helper = ExcelData()
        self.config_file = ConfigFile()
        self.machine = self.start_machine("farmcentredummy")


    def start_machine(self, user_name) -> Machine:
        """Set the terminal_id and device_id as configured on the selected user sheet.
        Reading these from acc server or test server columns using machine name and account id"""

        row = self.config_file.get_randow_row_from_sheet(user_name)

        machine_type = row[1]
        terminal_id = row[4]
        device_id = row[5]

        if machine_type is None or terminal_id is None or device_id is None:
            raise ValueError(f"terminal ID and/or device ID missing for machine '{machine_type}', '{terminal_id}', '{device_id}'"
                        f"in {self.config_file.NAME} sheet:{user_name}")
        
        print(f"{machine_type} with terminal_id:{terminal_id} and device_id:{device_id} started")
        return Machine(machine_type, terminal_id, device_id)





class Terminal(FleetSyncUser):

    host = "<my-url.com>"

    def on_start(self):
        self.client.connect_terminal()

    
    @task
    def simulate_route(self):
        """Simulate gps route configured in ts_config sheet 'route'"""
        gpsroute_config = self.config_file.get_row_from_sheet("route", "field1")
        route = Route(self.machine, gpsroute_config)
        gps_coordinates = route.generate_gps_coordinates()
        self.send_gps(gps_coordinates)
        self.send_sovs(route.speed_handler.sovs)


    
    @task
    def simulate_field(self):
        """Simulate gps and speed on field configured in ts_config sheet 'field'"""
        gpsfield_config = self.config_file.get_row_from_sheet("field", "datapoint")
        field = Field(self.machine, gpsfield_config)
        gps_coordinates = field.generate_gps_coordinates_and_speed_sov()
        self.send_gps(gps_coordinates)
        self.send_sovs(field.speed_handler.sovs)


    
    @task
    def simulate_gps_data(self):
        """Simulate gps data configured in ts_config sheet 'gps'"""
        gps_config = self.config_file.get_row_from_sheet("gps", "real-tractor")

        gps_coordinates = self.excel_data_helper.read_gps_coordinates(gps_config[1], gps_config[2])
        self.send_gps(gps_coordinates)
    
    



    def send_gps(self, gps_coordinates: GpsCollection):
        pb_msg = ProtobufMsg(self.machine.terminal_id, self.machine.device_id)
        for gps in gps_coordinates.records:
            pb_msg.add_gps_coordinate(gps)
            self.send_message(pb_msg)
        


    def send_sovs(self, sovs):
        pb_msg = ProtobufMsg(self.machine.terminal_id, self.machine.device_id)
        for sov in sovs.records:
            pb_msg.add_sov_data(sov)
            self.send_message(pb_msg)


    def send_message(self, pb_msg: ProtobufMsg):
        # when datapoints reaches limit 250 the message must be sent (data sent via several messages)
        if pb_msg.datapoints >= ProtobufMsg.datapoints_limit:
                response = self.client.send_protobuf_message(pb_msg.msg)
                print(f"Terminal: {self.machine.terminal_id} received: {response}")
                pb_msg.datapoints = 0
                pb_msg.update_request_msg()

locust_fleetsync_wrapper.py

import time
from client.fleetsync_client import FleetSyncClient


class FleetSyncLocustClient(FleetSyncClient):
    


    def __init__(self, host, request_event):
        super().__init__(host)
        self._request_event = request_event

    def __getattr__(self, name):
        print("running __getattr__")
        func = FleetSyncClient.__dict__[name]

        def wrapper(*args, **kwargs):
            request_meta = {
                "request_type": "socket",
                "name": name,
                "start_time": time.time(),
                "response_length": 0,  # calculating this for an xmlrpc.client response would be too hard
                "response": None,
                "context": {},  # see HttpUser if you actually want to implement contexts
                "exception": None,
            }
            start_perf_counter = time.perf_counter()
            try:
                request_meta["response"] = func(*args, **kwargs)
            except Exception as e:
                request_meta["exception"] = e
            request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
            self._request_event.fire(**request_meta)  # This is what makes the request actually get logged in Locust
            return request_meta["response"]

        return wrapper

fleetsync_client

import ssl
import struct
import sys
import time
import certifi
from client.kvgmprotobuf import PbUnion_pb2
import socket

class FleetSyncClient():
    CERTIFICATE = '<my_certificate_file>'
    

    def __init__(self, host):

        self.hostname = host
        self.ssock = None

                

    
    def connect_terminal(self):

        try:
            # Set up the SSL context
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE   
            context.load_cert_chain(certfile=self.CERTIFICATE)

            # Open the SSL socket to the server
            with socket.create_connection((self.hostname, 443)) as sock:
                try:
                    self.ssock = context.wrap_socket(sock, server_hostname=self.hostname)
                    print(f"Connected to FarmCentre server: {self.hostname}")
                    return
                except ssl.CertificateError as e:
                    print("Certificate validation error:", e)
                    print("Please check the server certifcate")
                except ssl.SSLError as e:
                    print(f"Connection with server {self.hostname} failed: {e}")
                except Exception as e:
                    print("An error occurred:", e)
        except Exception as err:
            print(f"Connection with server {self.hostname} failed unexpected {err=}, {type(err)=}")
        print("Failed to connect to server")
        sys.exit(1)

    
    def disconnect_terminal(self):
        if self.ssock is not None:
            try:
                self.ssock.close()
            except Exception as err:
                print(f"Close connection failed{err=}, {type(err)=}")
                sys.exit(1)


    def abort(self, text):
        print(f"###Error: {text}")
        self.disconnect_terminal()
        sys.exit(1)




    def _send_message(self, data):
        try:
            # add data length 4 bytes MSB first
            data = struct.pack('<I', len(data)) + data
            # add two currently not used bytes
            data = b'\x00\x00' + data
            # add total length 4 bytes MSB first
            data = struct.pack('<I', len(data)) + data
            # print(binascii.hexlify(data))
            ####print(f"Sending {len(data)} bytes")
            self.ssock.sendall(data)
        except socket.error as e:
            self.abort(f"Error sending data: {e}")
        try:
            # receive the response
            self.ssock.settimeout(30)
            header_bytes = self.ssock.recv(10)
            # kvgmprotobuf message length is the last 4 bytes of the header
            protobuf_msg_len = struct.unpack('<I', header_bytes[-4:])[0]
            protobuf_response = b""
            while len(protobuf_response) < protobuf_msg_len:
                chunk = self.ssock.recv(1024)
                protobuf_response += chunk
            # print(f"Protobuf message: {binascii.hexlify(protobuf_response)}")
            ####print(f"Server response: {len(protobuf_response)} protobuf bytes")
            return protobuf_response
        except socket.error as e:
            self.abort(f"Error receiving response: {e}")

    # send kvgmprotobuf message to the server and return the server response kvgmprotobuf message
    def send_protobuf_message(self, message):
        ####Logger.debug(f"sent message:\n{message}")
        response_data = self._send_message(message.SerializeToString())
        # parse the serialized response bytes to a kvgmprotobuf message
        response_msg = PbUnion_pb2.PbUnion()
        response_msg.ParseFromString(response_data)
        ####print(f"{response_msg}")
        return response_msg
python python-3.x locust
1个回答
0
投票

问题出在 getattr 和包装函数上。它检索返回响应的函数,但它需要 *args 作为秒参数和 self 作为第一个参数

request_meta["response"] = func(self, *args)

然后你需要像这样调用getattr

wrapper = self.client.__getattr__("send_protobuf_message")
response = wrapper(pb_msg.msg)
© www.soinside.com 2019 - 2024. All rights reserved.