接口测试开发之:Python3,订单并发性能实战

news/2024/6/17 20:57:44 标签: python, 多线程

小屌丝:鱼哥,我想写一个接口订单并发性能,能不能给我讲一下
小鱼:接口订单并发?我前篇文章不是写过常见并发框架
,然后你在追加一个创建订单和生成订单不就可以了?
小屌丝:鱼哥,你说的可轻松,那你能不能来一个?
小鱼:好吧,那我就以我某个项目为例,我们实际的看一下,都需要哪些步骤。
小屌丝: 鱼哥,就你这一点,最招人稀罕。哈哈!
小鱼:挖草了~~

那么我们就来分析一下,订单并发性能,我们想要什么:
>>1.订单并发数
>>2.成功订单数
>>3.订单成功率
>>4.成功订单总响应时间
>>5.成功订单平均响应时间
>>6.TPS
有了上面我们想要的,那么我们就来分析如何获取这些信息:
并发订单数:即自定义的并发数,根据我们的想法,把并发200次,设置为20个线程,每个循环10次
成功订单数:就是获取响应值为成功的请求,先定义一个success_count ,初始值为0,每成功一次,增加1
订单成功率:成功订单数/总的订单数
成功订单总响应时间:每个成功订单的响应时间之和,所以我们定义一个sum_time,初始值为0.00,然后把每次成功的响应时间加起来
成功订单平均响应时间:成功订单总响应时间/成功订单数
TPS:成功并发数/成功订单平均响应时间
订单响应时间:在请求之前,获取一次时间,在断言成功之后,再次获取一次时间,这样二者之差,就是订单的响应时间。

了解了思路之后,我们就看实际源码:

python"># -*- coding: utf-8 -*-
"""
@ auth : carl_DJ
@ time : 2020-6-11
"""

import hashlib
import threading
from time import *
from datetime import datetime,timedelta
import requests
import json


'''初始化全局变量'''

#自定义全局变量需要的线程数,20
thread_num = 20
#自定义全局变量每个线程需要循环的数量,10
one_worker_num = 10
#设定最开始的总时间
sum_time = 0.00
#设定最开始的成功连接数
success_count = 0

''' 后台登录常规操作'''

username = '13388889999'
password = hashlib.md5(b'123456').hexdigest()  #设置密码,且是md5加密方式
url = "http://www.xxx.com/energy/user/login/"
form_data = {"username":username,"password":password}
login_response = requests.post(url,data=form_data)
c = login_response.cookies

 '''订单发送请求'''

def order():
    #引用全局变量
    global c
    global sum_time
    global success_count
    #获取执行发送订单请求前时间
    t1 = time()
    #设定url、form_data进行创建订单
    url1 = "http://www.xxx.com/energy/create_order/"
    from_data1 = {"restaurant_id":1136,
                  "menu_item_total":'12.00',
                  "menu_item_data": [{'id':2667868,'p':22,'q':3}]
                  }
    make_responst = requests.post(url1,data=from_data1,cookies = c)
    #获取请求结果
    res = make_responst.text
    #结果转换成字典赋值给变量id
    id = json.loads(res)['order_id']
    #断言判断是否提交成功
    assert  id != " "
    su_time =datetime.now()+ timedelta(hours=1)
    
    #设定url、form_data进行生成订单
    url2 = "http://www.xxx.com/energy/place_order/"
    from_data2 = {"restaurant_id": id,
                  "customer_name": 'carl_dj',
                  "mobile_number":username,
                  "delivery_address":"address message",
                  "pay_type":'cash',
                  "preorder":su_time
                  }
    place_responst = requests.post(url2, data=from_data2, cookies=c)
    res = place_responst.text
    #追加断言,判断结果是否有"success",有的话,说明订餐成功
    assert res == " success"
    print("订餐成功")
    #订单成功后,再次获取一下时间
    t2 = time()
    #获取订单的响应时间
    res_time = t2-t1
    #把响应时间写入txt文件
    result = open("E:\Private Folder\res.txt","a")  #路径直接写死,也可用os.path 来写路径
    result.write("成功订单响应时间:" + str(res_time)+ '\n')
    result.close()

    #也可以使用with打开文件,好处是不用关心文件是否关闭
    # with open ("E:\Private Folder\res.txt","a") as result1:
        # print(result1.read())

    #把每次成功订单数累加到全局变量sum_time中
    sum_time  = sum_time + res_time
    #把每次获取的成功订单数做累加,添加到全局变量success_count中
    success_count = success_count +1

'''嵌套指定循环次数的order()函数'''

def working()
    global one_worker_num
    for i in range(0,one_worker_num):
        order()

 '''自定义main()函数,来执行多线程'''
def main():
    global thread_num
    #自定义一个空的数组,用来存放线程组
    threads = []
    #设置循环次数
    for i in range(thread_num):
        #将working()函数存放到线程中
        t = threading.Thread(target=working,name="T"+ str(i))
        #设定守护线程
        t.setDaemon(True)
        threads.append(t)
    #启动循环执行
    for t in threads:
        t.start()
    ##设置阻塞线程
    for t in threads:
        t.join()

if __name__ == "__main__":
    main()
    total_order = thread_num*one_worker_num
    avg_time = sum_time/success_count
    '''执行完之后,需要把数据写入到txt文件中'''
    #订单并发总数
    result.write("并发订单数:"+ str(total_order)+ "\n")
    #成功并发数
    result.write("成功并发数:"+ str(success_count) + "\n")
    #订单成功率
    result.write("订单成功率:"+ str(success_count/total_order*100)+ "%" + "\n")
    #成功订单响应时间
    result.write("成功订单总响应时间:"+ str(sum_time)+"\n")
    #成功订单平均响应时间
    result.write("成功平均响应时间:"+str(sum_time/success_count)+"\n")
    #TPS事务数/秒
    result.write("TPS:"+str(success_count/avg_time) + "\n")  #tps = 并发成功数/平均响应时间
    result.close()


1.这里运用到了str(),
>>是因为响应时间是数字,而写入文件的时候是字符串类型,所以需要把最后的数字通过str()函数进行转化。
2.这里的文件路径是直接写死的,并没有使用os.path获取。
3.打开文件的方式 :open () 或者with open() 都可以,这里两种方法都写了。
>使用open()方法,最后别忘了close(),不然消耗资源…


http://www.niftyadmin.cn/n/1757210.html

相关文章

【计算机网络】服务器,防火墙

1.服务器的不同部署位置: 2.防火墙: 只允许发往特定服务器的特定应用程序的包通过 工作方式:包过滤 包过滤将接收方服务器的IP地址和端口号作为判断条件 3.服务器平衡负载 方法1:将请求分配给多台服务器 负载大的原因&#xff1…

移动app自动化测试工具发展历程--完整版

最近在总结关于移动app的自动化测试的系列文章,本来想在7月份推出这个系列,但是又担心7月份的天气太热,开空调费油, 所以索性,想到哪就整理到哪,持续的推出来吧! 今天先把移动app自动化测试工…

【java笔记】字符流,Properties,序列化,打印流

字符流 字符流字节流编码 package demo04;import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays;//字符流,中文操作更方便 public class Demo01 {public static void main(String[] args) throws IOE…

多进程启动appium服务

因为小鱼在上一章节写了 多台appium的启动, 然后就联想到 能不能搞一个多进程启动appium服务。 于是乎~~ 那就搞起来~ ~ 关于并发的问题,小鱼写过专题文章 《常见并发问题》 《多线程并发》 《多线程并发框架》 但是呢,今天小鱼不写多线程并…

【计算机网络】 从服务器到浏览器,网络包的旅程

1.服务器端通信 服务器的不同模块和所有客户端通信 服务器端的收发操作: (1)创建套接字 (2)等待连接,调用bind将端口号写入套接字。调用listen写入等待连接状态的控制信息, (3&…

【java笔记】进程和线程:线程同步,Lock锁

进程:正在运行的程序 线程:执行路径 单线程程序:记事本 多线程程序:扫雷 多线程的实现方式: package demo06; /* 多线程的实现方法 1.定义Thread类重写run方法 2.创建Thread类的对象 3.启动多线程 */ public class De…

解决adb报错“failed to create fdevent interrupt socketpair: Invalid argument“问题

今天在运行adb命令时,报错了,太nice了: adb.exe F 07-06 09:41:14 xxx 67xx fdevent_poll.cpp:64] failed to create fdevent interrupt socketpair: Invalid argument 看到这个问题,好比熟悉的陌生人!! …

Python实现Appium端口检测与释放

python实现端口检测与释放1. 监测端口1.1 socket是什么?1.2 socket本质是什么?2. 释放端口2.1 cmd 释放端口2.2 Python代码释放端口1. 监测端口 我们要引用的socket模块来校验端口是否被占用。 1.1 socket是什么? 简单一句话:网…