locust自定义负载策略。
阅读原文时间:2023年07月08日阅读:2

1.时间峰值策略

每运行一分钟启动100个用户,总运行时间为10分钟

class CustomShape(LoadTestShape):
        # 设置时限
        time_limit = 600
        # 设置产生率
        spawn_rate = 20

        def tick(self):
            '''
            设置 tick()函数
            并在tick()里面调用 get_run_time()方法
            '''

            # 调用get_run_time()方法
            run_time = self.get_run_time()
            # 运行时间在 10分钟之内,则继续执行
            if run_time < self.time_limit:
                uesr_count = round(run_time, -2)
                # 返回user_count,spawn_rate这两个参数
                return (uesr_count, self.spawn_rate)

            return None

2.时间阶段负载策略

class MyCustomShape(LoadTestShape):
    """
        ps:在不同的阶段 具有不同的用户数和 产生率的 图形形状
        time -- 持续时间经过多少秒后,进入到下个阶段
        users -- 总用户数
        spawn_rate -- 产生率,即每秒启动/停止的用户数
    """
    stages = [
        {"time": 10, "users": 10, "spawn_rate": 10},
        {"time": 30, "users": 30, "spawn_rate": 10},
        {"time": 60, "users": 60, "spawn_rate": 10},
        {"time": 200, "users": 120, "spawn_rate": 10},
    ]
    def tick(self):
        run_time = self.get_run_time()
        for stage in self.stages:
            if run_time < stage['time']:
                tick_data = (stage['users'],stage['spawn_rate'])
                return tick_data
        return None

3.逐步施压负载策略

class MyCustomShape(LoadTestShape):
    '''
        step_time -- 逐步加载时间长度
        step_load -- 用户每一步增加的量
        spawn_rate -- 用户在每一步的停止/启动的多少用户数
        time_limit -- 时间限制压测的执行时长
    '''

    # 逐步负载策略每隔30秒新增启动10个用户
    setp_time = 30
    setp_load = 10
    spawn_rate = 10
    time_limit = 300

    def tick(self):
        run_time = self.get_run_time()

        if run_time > self.time_limit:
            return None
        current_step = math.floor(run_time /self.setp_time) +1
        return(current_step * self.setp_load,self.spawn_rate)

4.双波峰负载策略

class MyCustomShape(LoadTestShape):
    '''
        step_time -- 逐步加载时间长度
        step_load -- 用户每一步增加的量
        spawn_rate -- 用户在每一步的停止/启动的多少用户数
        time_limit -- 时间限制压测的执行时长
    '''

    # 逐步负载策略每隔30秒新增启动10个用户
    setp_time = 30
    setp_load = 10
    spawn_rate = 10
    time_limit = 300

    def tick(self):
        run_time = self.get_run_time()

        if run_time > self.time_limit:
            return None
        current_step = math.floor(run_time /self.setp_time) +1
        return(current_step * self.setp_load,self.spawn_rate)