API实例一:
login.py文件
#!/usr/bin/env python
#!coding:utf-8
from flask import Flask,jsonify
from flask_restful import Api,Resource,reqparse
app=Flask(__name__)
api=Api(app)
class LoginView(Resource):
def get(self):
return {'status':0,'msg':'ok','data':'this is a login page'}
def post(self):
parser=reqparse.RequestParser()
parser.add\_argument('username', type=str, required=True, help='用户名不能为空')
parser.add\_argument('password',type=str,required=True,help='账户密码不能为空')
parser.add\_argument('age',type=int,help='年龄必须为正正数')
parser.add\_argument('sex',type=str,help='性别只能是男或者女',choices=\['女','男'\])
args=parser.parse\_args()
return jsonify(args)
api.add_resource(LoginView,'/login',endpoint='login')
if __name__ == '__main__':
app.run(debug=True,port=5000)
requestTest.py文件
import json
import requests
r=requests.get(
url='http://localhost:8888/hi/hello',
params={'name':'wuya'}
)
print('获取请求地址:',r.url)
print(r.text)
print(r.status_code)
print(r.headers)
print(r.content)
print(r.elapsed.total_seconds())
r=requests.post(
url='http://127.0.0.1:5000/login',
headers={'content-type':'application/json'},
data=json.dumps({'username':'lyl','sex':'女','age':'18','password':'123'})
)
'''
1、当请求头的数据格式为application/json的时候,在post请求方法里面,请求参数使用json
2、当请求头的数据格式为application/json的时候,在post请求方法里面,请求参数使用data,但需要对请求参数进行序列
化的处理,如json.dumps(请求参数)
'''
print(r.status_code)
print(r.json())
API实例二:
import json
import requests
'''3、当请求数据格式为application/x-www-form-urlencoded,在post请求方法里面,请求参数使用data'''
r=requests.post(
url='https://www.lagou.com/jobs/v2/positionAjax.json',
params={'needAddtionalResult':False},
data={'first':'False','pn':2,'kd':'测试工程师'},
headers={'content-type':'application/json;charset=UTF-8',
'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36',
'cookie':'user_trace_token=20210906103906-65bbbd6f-9e8e-4321-a96e-5f115f8463f7; Hm_lvt_4233e74dff0ae5bd0a3d81c6ccf756e6=1630895946; _ga=GA1.2.1317827876.1630895947; LGUID=20210906103907-f70589f0-5dda-435c-9eca-075aecdfe049; LG_HAS_LOGIN=1; hasDeliver=0; privacyPolicyPopup=false; sajssdk_2015_cross_new_user=1; showExpriedIndex=1; showExpriedCompanyHome=1; showExpriedMyPublish=1; RECOMMEND_TIP=true; __SAFETY_CLOSE_TIME__22604012=1; _gid=GA1.2.1600152785.1630896153; gate_login_token=9d6759af759749588c6fa28c07a6b883ea1ab9a1c185db1d4eca844493eb3a35; index_location_city=%E5%85%A8%E5%9B%BD; LGSID=20210906172329-26b0ed28-7b04-4e5b-a7fb-5304cf5e43c6; PRE_UTM=m_cf_cpt_baidu_pcbt; PRE_HOST=; PRE_SITE=https%3A%2F%2Fwww.lagou.com%2Fwn%2Fresume%2FregisterComplete; PRE_LAND=https%3A%2F%2Fwww.lagou.com%2Flanding-page%2Fpc%2Fsearch.html%3Futm%5Fsource%3Dm%5Fcf%5Fcpt%5Fbaidu%5Fpcbt; WEBTJ-ID=20210906172604-17bba6dd3ac470-0e283bbb795d0f-c343365-1327104-17bba6dd3ad336; JSESSIONID=ABAAABAABEIABCI0CF59EE5096AF9E27F5DAAEABCD8A6CF; sensorsdata2015session=%7B%7D; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%2222604012%22%2C%22first_id%22%3A%2217bb8fa0aea4c0-0bb9a32b499cff-c343365-1327104-17bb8fa0aeb96a%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%2C%22%24os%22%3A%22Windows%22%2C%22%24browser%22%3A%22Chrome%22%2C%22%24browser_version%22%3A%2292.0.4515.159%22%7D%2C%22%24device_id%22%3A%2217bb8fa0aea4c0-0bb9a32b499cff-c343365-1327104-17bb8fa0aeb96a%22%7D'
}
)
print(r.status_code)
print(json.dumps(r.json(),indent=True,ensure_ascii=False))
API实例三:
add.py
from flask import Flask,make_response,jsonify,abort,request
from flask_restful import Api,Resource
from flask_httpauth import HTTPBasicAuth
from flask import Flask
from flask_jwt import JWT, jwt_required, current_identity
from werkzeug.security import safe_str_cmp
app=Flask(__name__)
app.debug = True
app.config['SECRET_KEY'] = 'super-secret'
api=Api(app=app)
auth=HTTPBasicAuth()
@auth.get_password
def get_password(name):
if name=='admin':
return 'admin'
@auth.error_handler
def authoorized():
return make_response(jsonify({'msg':"请认证"}),403)
books=[
{'id':1,'author':'wuya','name':'Python接口自动化测试实战','done':True},
{'id':2,'author':'无涯','name':'Selenium3自动化测试实战','done':False}
]
class User(object):
def __init__(self, id, username, password):
self.id = id
self.username = username
self.password = password
def \_\_str\_\_(self):
return "User(id='%s')" % self.id
users = [
User(1, 'wuya', 'asd888'),
User(2, 'admin', 'asd888'),
User(3,'share','asd888')
]
username_table = {u.username: u for u in users}
userid_table = {u.id: u for u in users}
def authenticate(username, password):
user = username_table.get(username, None)
if user and safe_str_cmp(user.password.encode('utf-8'), password.encode('utf-8')):
return user
def identity(payload):
user_id = payload['identity']
return userid_table.get(user_id, None)
jwt = JWT(app, authenticate, identity)
class Books(Resource):
# decorators = [auth.login_required]
decorators=[jwt_required()]
def get(self):
return jsonify({'status':0,'msg':'ok','datas':books})
def post(self):
if not request.json:
return jsonify({'status':1001,'msg':'请求参数不是JSON的数据,请检查,谢谢!'})
else:
book = {
'id': books\[-1\]\['id'\] + 1,
'author': request.json.get('author'),
'name': request.json.get('name'),
'done': True
}
books.append(book)
return {'status':1002,'msg': '添加书籍成功','datas':book}
# return jsonify({'status':1002,'msg': '添加书籍成功','datas':book}, 201)
class Book(Resource):
# decorators = [auth.login_required]
decorators = [jwt_required()]
def get(self,book\_id):
book = list(filter(lambda t: t\['id'\] == book\_id, books))
if len(book) == 0:
return jsonify({'status': 1003, 'msg': '很抱歉,您查询的书的信息不存在'})
else:
return jsonify({'status': 0, 'msg': 'ok', 'datas': book})
def put(self,book\_id):
book = list(filter(lambda t: t\['id'\] == book\_id, books))
if len(book) == 0:
return jsonify({'status': 1003, 'msg': '很抱歉,您查询的书的信息不存在'})
elif not request.json:
return jsonify({'status': 1001, 'msg': '请求参数不是JSON的数据,请检查,谢谢!'})
elif 'author' not in request.json:
return jsonify({'status': 1004, 'msg': '请求参数author不能为空'})
elif 'name' not in request.json:
return jsonify({'status': 1005, 'msg': '请求参数name不能为空'})
elif 'done' not in request.json:
return jsonify({'status': 1006, 'msg': '请求参数done不能为空'})
elif type(request.json\['done'\])!=bool:
return jsonify({'status': 1007, 'msg': '请求参数done为bool类型'})
else:
book\[0\]\['author'\] = request.json.get('author', book\[0\]\['author'\])
book\[0\]\['name'\] = request.json.get('name', book\[0\]\['name'\])
book\[0\]\['done'\] = request.json.get('done', book\[0\]\['done'\])
return jsonify({'status': 1008, 'msg': '更新书的信息成功', 'datas': book})
def delete(self,book\_id):
book = list(filter(lambda t: t\['id'\] == book\_id, books))
if len(book) == 0:
return jsonify({'status': 1003, 'msg': '很抱歉,您查询的书的信息不存在'})
else:
books.remove(book\[0\])
return jsonify({'status': 1009, 'msg': '删除书籍成功'})
api.add_resource(Books,'/v1/api/books')
api.add_resource(Book,'/v1/api/book/
if __name__ == '__main__':
app.run(debug=True)
(1)、requestTest.py文件
import request
import json
def login():
r=requests.post(
url='http://localhost:5000/auth',
json={"username":"wuya","password":"asd888"})
# print(r.json())
return r.json()['access_token']
def getAllBooks():
r=requests.get(
url='http://localhost:5000/v1/api/books',
headers={'Authorization':'jwt {0}'.format(login())})
print(r.text)
def addBook():
r=requests.post(
url='http://localhost:5000/v1/api/books',
json={'name':'接口测试','author':'无涯课堂','done':True},
headers={'Authorization':'jwt {0}'.format(login())}
)
return r.json()['datas']['id']
addBook()
def getBook():
r=requests.get(
url='http://localhost:5000/v1/api/book/{0}'.format(addBook()),
headers={'Authorization':'jwt {0}'.format(login())}
)
print(r.text)
def setBook():
r = requests.put(
url='http://localhost:5000/v1/api/book/{0}'.format(addBook()),
headers={'Authorization': 'jwt {0}'.format(login())},
json={'name': '接口测试', 'author': '无涯课堂', 'done': True}
)
print(json.dumps(r.json(),indent=True,ensure_ascii=False))
def delBook():
r = requests.delete(
url='http://localhost:5000/v1/api/book/{0}'.format(addBook()),
headers={'Authorization': 'jwt {0}'.format(login())},
)
print(json.dumps(r.json(),indent=True,ensure_ascii=False))
delBook()
'''
动态参数解决思路:
1、执行上个接口后,拿到授权的返回值(token)
2、把获取到的上个接口的返回值传给下个接口的输入部分(请求头&请求参数)
'''
(2)test_pytest_api.py文件:
import requests
import json
import pytest
url='http://localhost:5000'
def token():
r=requests.post(
url=url+'/auth',
json={'username':'wuya','password':'asd888'}
)
return r.json()['access_token']
def getHeaders():
return {'Authorization': 'jwt {0}'.format(token())}
def allBooks():
r = requests.get(
url=url + '/v1/api/books',
headers=getHeaders()
)
return r
def addBook():
r = requests.post(
url=url + '/v1/api/books',
json={'name': '接口测试', 'author': '无涯课堂', 'done': True},
headers=getHeaders()
)
json.dump(r.json()['datas']['id'],open('bookID','w'))
return r
def getBookID():
return json.load(open('bookID'))
def getBook():
'''查看数据信息'''
r = requests.get(
url=url + '/v1/api/book/{0}'.format(getBookID()),
headers=getHeaders()
)
return r
def delBook():
r = requests.delete(
url=url + '/v1/api/book/{0}'.format(getBookID()),
headers=getHeaders()
)
return r
def test_get_book():
'''测试点:查看新添加书记的信息'''
addBook()
r=getBook()
delBook()
assert r.status_code==200
assert r.json()['status']==0
assert r.json()['datas'][0]['id']==int(getBookID())
def test_add_book():
'''测试点:查看新添加书记的信息'''
r=addBook()
delBook()
assert r.status_code==200
assert r.json()['status']==1002
assert r.json()['datas']['name']=='接口测试'
def test_del_book():
'''测试点:查看新添加书记的信息'''
addBook()
r=delBook()
assert r.status_code==200
assert r.json()['status']==1009
assert len(allBooks().json()['datas'])==2
if __name__ == '__main__':
pytest.main(['-s','-v','test_pytest_api.py'])
(3)、test_unittest_api.py文件
import requests
import json
import unittest
class ApiTest(unittest.TestCase):
url='http://localhost:5000'
def token(self):
r=requests.post(
url=self.url+'/auth',
json={'username':'wuya','password':'asd888'}
)
return r.json()\['access\_token'\]
def getHeaders(self):
return {'Authorization': 'jwt {0}'.format(self.token())}
def allBooks(self):
r = requests.get(
url=self.url + '/v1/api/books',
headers=self.getHeaders()
)
return r
def addBook(self):
r = requests.post(
url=self.url + '/v1/api/books',
json={'name': '接口测试', 'author': '无涯课堂', 'done': True},
headers=self.getHeaders()
)
json.dump(r.json()\['datas'\]\['id'\],open('bookID','w'))
return r
def getBookID(self):
return json.load(open('bookID'))
def getBook(self):
'''查看数据信息'''
r = requests.get(
url=self.url + '/v1/api/book/{0}'.format(self.getBookID()),
headers=self.getHeaders()
)
return r
def delBook(self):
r = requests.delete(
url=self.url + '/v1/api/book/{0}'.format(self.getBookID()),
headers=self.getHeaders()
)
return r
def test\_get\_book(self):
'''测试点:查看书记的信息'''
self.addBook()
r=self.getBook()
self.delBook()
self.assertEqual(r.status\_code,200)
self.assertEqual(r.json()\['status'\],0)
self.assertEqual(r.json()\['datas'\]\['name'\],'接口测试')
def test\_add\_book(self):
'''测试点:查看新添加书记的信息'''
r=self.addBook()
self.delBook()
self.assertEqual(r.status\_code,200)
self.assertEqual(r.json()\['status'\],1002)
self.assertEqual(r.json()\['datas'\]\['name'\],'接口测试')
def test\_del\_book(self):
'''测试点:查看删除书记的信息'''
self.addBook()
r=self.delBook()
self.assertEqual(r.status\_code,200)
self.assertEqual(r.json()\['status'\],1009)
self.assertEqual(len(self.allBooks().json()\['datas'\]),2)
手机扫一扫
移动阅读更方便
你可能感兴趣的文章