目录
身份认证组件一般都配置在全局settings中。
# settings.py
# drf框架自定义配置
REST_FRAMEWORK = {
# 认证组件
'DEFAULT_AUTHENTICATION_CLASSES': [
# 'rest_framework.authentication.SessionAuthentication',
# 'rest_framework.authentication.BasicAuthentication'
'rest_framework_jwt.authentication.JSONWebTokenAuthentication'
],
}
在视图类中用authentication_classes类属性配置身份认证模块:
# views.py
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
class UserListViewSet(mixins.ListModelMixin, GenericViewSet):
authentication_classes = [JSONWebTokenAuthentication]
queryset = models.User.objects.filter(is_active=True).all()
serializer_class = serializers.UserModelSerializer
其中BaseAuthentication
是用来自定义身份认证类需要继承的基类。
其他的类是drf默认提前写好的几种身份认证类,可以直接使用:
BasicAuthentication
SessionAuthentication
TokenAuthentication
def get_authorization_header(request):
# 前台在请求头用authorization携带认证字符串token给后台
auth = request.META.get('HTTP_AUTHORIZATION', b'')
# auth == token字符串 将其编码成二进制
if isinstance(auth, str):
# Work around django test client oddness
auth = auth.encode(HTTP_HEADER_ENCODING)
return auth
class BasicAuthentication(BaseAuthentication):
www_authenticate_realm = 'api'
def authenticate(self, request):
auth = get_authorization_header(request).split()
# auth按空格拆分,拆分的列表结果长度为2才合法
if not auth or auth[0].lower() != b'basic':
# 没有token,认证方法直接返回None,代表游客(匿名用户)
return None
# 可以推论出auth拆分的结果为2份,结构为:['basic','token字符串']
if len(auth) == 1:
msg = _('Invalid basic header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid basic header. Credentials string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
# 结论:提交了token,格式有误,抛异常,代表非法用户
try:
# 反解token(auth是被拆分的列表,0是头,1是token)
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
except (TypeError, UnicodeDecodeError, binascii.Error):
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
raise exceptions.AuthenticationFailed(msg)
# 结论:提交了token,格式有误,抛异常,代表非法用户
userid, password = auth_parts[0], auth_parts[2]
return self.authenticate_credentials(userid, password, request)
# 结论:提交了token,解析成功,返回(user,None)组成的元组,代表合法用户
# 元组0位user会被储存到request.user中,
# 元组1位token会被存储到request.auth中,通常可以不用保存,所以可以用None填充
def authenticate_credentials(self, userid, password, request=None):
"""
Authenticate the userid and password against username and password
with optional request for context.
"""
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
user = authenticate(request=request, **credentials)
if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))
if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
return (user, None)
def authenticate_header(self, request):
return 'Basic realm="%s"' % self.www_authenticate_realm
class SessionAuthentication(BaseAuthentication):
def authenticate(self, request):
# Get the session-based user from the underlying HttpRequest object
user = getattr(request._request, 'user', None)
# Unauthenticated, CSRF validation not required
if not user or not user.is_active:
return None
self.enforce_csrf(request)
# CSRF passed with authenticated user
return (user, None)
class TokenAuthentication(BaseAuthentication):
keyword = 'Token'
model = None
def authenticate(self, request):
auth = get_authorization_header(request).split()
if not auth or auth[0].lower() != self.keyword.lower().encode():
return None
if len(auth) == 1:
msg = _('Invalid token header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid token header. Token string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)
try:
token = auth[1].decode()
except UnicodeError:
msg = _('Invalid token header. Token string should not contain invalid characters.')
raise exceptions.AuthenticationFailed(msg)
return self.authenticate_credentials(token)
rf-jwt为我们提供了其已经写好的身份认证类:JSONWebTokenAuthentication
特点:
前端在向后端发送请求时需携带的token格式为:
{'Authorization':'jwt abc.def.xyz'} // token需以jwt开头
后端如何配置:
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
class UserListViewSet(ListModelMixin,GenericViewSet):
# ---------------三大认证配置---------------我是分割线
# authentication_classes = [authentications.MyAuthentication]
authentication_classes = [JSONWebTokenAuthentication]
# -----------------正常逻辑-----------------我是分割线
queryset = models.User.objects.filter(is_active=True).all()
serializer_class = serializers.UserModelSerializer
如何自定义身份认证类:
从请求头中拿到前台提交的token(一般从HTTP_AUTHORIZATION中拿,也可以与前台约定)
没有token,返回None,代表游客
有token,进入校验
from rest_framework.authentication import BaseAuthentication
class MyAuthentication(BaseAuthentication):
def authenticate(self, request):
# 不设置任何身份认证规则,直接全部通过,进入下一轮认证(权限认证)
pass
重点:
结论:不管系统默认、或是全局settings配置的是何认证与权限组件,登录接口不用参与任何认证与权限的校验所以,登录接口一定要进行'身份认证'与'权限认证'的局部禁用
# views.py
from rest_framework.views import APIView
class LoginAPIView(APIView):
authentication_classes = []
pagination_class = []
permission_classes = []
def post(self,request,*args,**kwargs):
user_ser = serializers.LoginModelSerializer(data=request.data)
user_ser.is_valid(raise_exception=True)
return APIResponse(results={
'username':user_ser.content.get('user').username,
'token':user_ser.content.get('token')
})
# serializers.py
from rest_framework_jwt.serializers import jwt_payload_handler,jwt_encode_handler
class LoginModelSerializer(serializers.ModelSerializer):
username = serializers.CharField(
max_length=64,
min_length=3
)
password = serializers.CharField(
max_length=64,
min_length=3
)
class Meta:
model = models.User
fields = ['username','password']
# 在全局钩子中完成token的签发
def validate(self, attrs):
# 先从model表中查出user对象
user = self._validate_user(attrs)
# 将user对象包装进载荷中
payload = jwt_payload_handler(user)
# 将载荷签发入token
token = jwt_encode_handler(payload)
# 将对象和token储存进serializer对象中,就可以在视图类中调用
self.content = {
'user':user,
'token':token
}
return attrs
def _validate_user(self,attrs):
username = attrs.get('username')
password = attrs.get('password')
# 多方式登陆
if re.match(r'.*@.*',username):
user = models.User.objects.filter(email=username).first() # type:models.User
elif re.match(r'^1[3-9][0-9]{9}$',username):
user = models.User.objects.filter(mobile=username).first() # type:models.User
else:
user = models.User.objects.filter(username=username).first() # type:models.User
if not user or not user.check_password(password):
raise serializers.ValidationError({'message':'用户信息系有误!请重新登录!'})
return user
一般权限认证不做全局配置,因为每个功能对应不同的权限,不好配。
使用permission_classes类属性配置:
from rest_framework.permissions import IsAdminUser,IsAuthenticated,IsAuthenticatedOrReadOnly,AllowAny
class UserViewSet(ViewSet):
# 配置django默认提供的权限认证模块
permission_classes = [IsAuthenticated]
drf默认提供了几种权限认证模块:
from rest_framework.permissions import IsAuthenticated, IsAdminUser, AllowAny, IsAuthenticatedOrReadOnly
- AllowAny | 游客和登录用户有全权限
- IsAuthenticated | 只有登录用户有全权限
- IsAdminUser | 只有后台用户(admin用户)有全权限
- IsAuthenticatedOrReadOnly | 游客有读权限,登录用户有全权限
如果有特殊需要,需要自定义权限类
如:只有superuser有权限、只有vip用户有权限、只有某ip网段用户有权限、只有某个视图及其子类有权限
如何自定义权限类:
继承BasePermission续写permission类;
重写has_permission方法;
from rest_framework.permissions import BasePermission
class VIPUserPermission(BasePermission):
def has_permission(self, request, view):
for group in request.user.groups.all():
# 存在于vip组即有权限
if group.name.lower() == 'vip':
return True
# 否则没有权限
return False
class UserViewSet(ViewSet):
# 局部配置哪些权限可以执行此操作
permission_classes = [permissions.VIPUserPermission]
def retrieve(self,request,*args,**kwargs):
return APIResponse(results={
'username':request.user.username,
'email':request.user.email,
'mobile':request.user.mobile,
'create_time':request.user.date_joined,
})
需全局与局部配置配合。
局部配置节流模式,全局配置节流模式的流量。
# settings.py
REST_FRAMEWORK = {
# 频率组件:频率类一般做局部配置,但是频率调节在settings中配置
'DEFAULT_THROTTLE_RATES': {
'user': '5/min',
'anon': '3/min',
'mobile': '1/min'
},
}
使用throttle_classes类属性配置:
from rest_framework.viewsets import ViewSet
class UserViewSet(ViewSet):
throttle_classes = []
drf默认提供了几种节流认证模式:
from rest_framework.throttling import AnonRateThrottle,UserRateThrottle,ScopedRateThrottle
- AnonRateThrottle | scope = 'anon'
- UserRateThrottle | scope = 'user'
- ScopedRateThrottle | scope_attr = 'throttle_scope'
如果有特殊需要,需要自定义频率类.
如:对ip进行限次、对电话进行限制、对视图某些信息进行限次.
如何自定义:
继承SimpleRateThrottle节流基类续写throrrle类;
设置scope字符串类属性,同时在settings中进行drf配置DEFAULT_THROTTLE_RATES
重写get_catch_key方法
from rest_framework.throttling import SimpleRateThrottle
class ModileRateThrottle(SimpleRateThrottle):
scope = 'mobile'
def get_cache_key(self, request, view):
# 游客和没有手机号的用户不做限制
if not request.user.is_authenticated or request.user.mobile:
return None
# 设置用户唯一识别码
return self.cache_format % {
'scope':self.scope,
'ident':request.user.mobile
}
from rest_framework.viewsets import ViewSet
class UserViewSet(ViewSet):
# throttle_classes = [UserRateThrottle]
throttle_classes = [throttles.ModileRateThrottle]
def retrieve(self,request,*args,**kwargs):
return APIResponse(results={
'username':request.user.username,
'email':request.user.email,
'mobile':request.user.mobile,
'create_time':request.user.date_joined,
})
手机扫一扫
移动阅读更方便
你可能感兴趣的文章