DRF-认证权限频率
阅读原文时间:2022年04月04日阅读:1

目录

前后端混合开发,可以通过HttpResponse对象来设置cookie进而校验登录,现在前后端分离开发,用不到cookie,那么该怎么认证?DRF提供了认证的方法

我们知道在APIView执行的过程中,在dispatch方法中走了三大认证self.initial(request, *args, **kwargs)

def initial(self, request, *args, **kwargs):
    ···
    self.perform_authentication(request)  # 认证
    self.check_permissions(request)    # 权限
    self.check_throttles(request)    # 频率

需求

我们通过登录接口,来模拟认证登录,登录成功返回json字符串,并且携带随机字符串(uuid模拟生成token),通过token随机字符串来判断用户是否登录,登录了就更新token,首次登录就存token;

分析

  • 创建User表
  • 创建UserToken表,和User一对一关系
  • 前端传入用户名,密码
  • 数据库取校验用户信息
  • 校验成功,Token表内新增一条记录,返回给前端json格式字符串,字符串中带一个随机字符串

登录接口

模型

from django.db import models

class User(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=16)
    user_type = models.IntegerField(choices=((1, '超级管理员'), (2, '普通管理员'), (3, '普通用户')))

    def get_code(self):
        self.get_user_type_display()
        print(self.get_user_type_display())

class UserToken(models.Model):
    user = models.OneToOneField(to=User,on_delete=models.CASCADE)
    token = models.CharField(max_length=32)

视图

from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from rest_framework.decorators import action
from app01 import models

class UserView(ViewSet):
    @action(methods=['POST'], detail=False)
    def login(self, request, *args, **kwargs):
        # 获取数据
        username = request.data.get('username')
        password = request.data.get('password')
        user = models.User.objects.filter(username=username, password=password).first()
        if user:
            # 如果user有值说明登录成功,生产随机字符串,存入数据库,如果重复登录那么就更新随机字符串
            import uuid
            uuid_str = uuid.uuid4()
            # print(type(uuid_str)) # <class 'uuid.UUID'>
            token = str(uuid_str)
            # 如果存在就更新,如果不存在就新增,指定搜索对象,然后defaults指定更新内容
            models.UserToken.objects.update_or_create(user=user,defaults={'token': token} )
            # 返回随机字符串
            return Response({'code': 100, 'msg': '登录成功', 'token': token})
        return Response({'code': 101, 'msg': '登录失败,用户名或密码错误'})

路由

from django.contrib import admin
from django.urls import path,include
from app01 import views
from rest_framework.routers import SimpleRouter
router = SimpleRouter()
router.register('user',views.UserView,'user')
urlpatterns = [
    path('admin/', admin.site.urls),
    path('',include(router.urls))
]

认证

  • 局部使用:写一个认证类,通过authentication_classes参数指定认证类

    class BookView(ModelViewSet):
        # 局部使用
        authentication_classes = [LoginAuth,]
  • 全局使用:写一个认证类,settings.py配置,所有的视图类生效

    REST_FRAMEWORK={
          "DEFAULT_AUTHENTICATION_CLASSES":["app01.auth.LoginAuth",]
              }
  • 局部禁用:authentication_classes = []

我们知道平时生活中,有一些接口是认证后才能调用的,比如我们登录后才能查看个人站点内容等···

在执行视图函数之前执行了认证方法:self.perform_authentication(request)

这里写一个认证demo,只有登录过的才能查看Book表

'''auth.py'''
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from app01 import models

# 写一个类继承BaseAuthentication
class LoginAuth(BaseAuthentication):
    # 重写authenticate方法
    def authenticate(self, request):
        # 获取前端携带的token,token放在哪是自己规定的,比如从查询参数中获取
        token = request.query_params.get('token')
        # 比对随机字符串
        user_token = models.UserToken.objects.filter(token=token).first()
        if user_token:
            # 登录了,返回当前登录用户和token
            return user_token.user,token
        else:
            # 没有登录,抛异常
            raise AuthenticationFailed('您没有登录,请登录')

'''serializer.py'''
from rest_framework import serializers
from app01 import models
class BookSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.Book
        fields = '__all__'
'''models.py'''
class Book(models.Model):
    name = models.CharField(max_length=32)
    price = models.DecimalField(decimal_places=2,max_digits=5)
    author = models.CharField(max_length=32)
'''urls.py'''
from django.contrib import admin
from django.urls import path,include
from app01 import views
from rest_framework.routers import SimpleRouter
router = SimpleRouter()
router.register('user',views.UserView,'user')
router.register('books',views.BookView,'books')
urlpatterns = [
    path('admin/', admin.site.urls),
    path('',include(router.urls))
]
  • 返回的user_token和token值可以通过在视图类里重写list方法拿到

    '''views.py'''
    from .auth import LoginAuth
    class BookView(ModelViewSet):
        # 局部使用
        authentication_classes = [LoginAuth,]
        queryset = models.Book.objects.all()
        serializer_class = serializer.BookSerializer
    def list(self, request, *args, **kwargs):
        print(request.user) # User object (1)
        print(request.user.username) # HammerZe
        print(request.auth) # de914129-2f08-41a4-a7a9-de289badb659
        return super().list(request, *args, **kwargs)</code></pre></li>

总结

  • 返回的第一个(user_token.user),给了request.user,就是当前登录用户对象
  • 返回的第二个(token),给了request.auth,就是token串
  • 局部禁用和全局配置使用的时候要注意,全局如果认证的时候是每个视图函数都认证,就比如登录视图认证登录,那么就死循环了,不认证不能登录,就相当于做核酸需要核酸单···

和认证一样,都是写一个类去继承,写权限继承BasePermission,重写has_permission方法,判断如果有权限,返回True,如果没有权限,返回False

然后局部使用或者全局使用,或局部禁用

作用

  • 权限控制可以限制用户对于视图的访问和对于具体数据对象的访问
  • 认证通过, 可以进行下一步验证 (频率认证)
  • 认证失败, 抛出权限异常结果

使用

  • 局部使用:permission_classes = [UserPermission, ]

  • 全局使用:

    REST_FRAMEWORK={
                "DEFAULT_PERMISSION_CLASSES":["app01.auth.UserPermission",]
            }
  • 局部禁用:permission_classes = []

需求

  • 认证登录成功后,普通用户只能查询一条或所有
  • 管理员登录后才能通过权限认证进行修改,增加,删除操作

权限类

class UserPermission(BasePermission):
    def has_permission(self, request, view):
        # 没有权限的提示信息
        self.message = '您是:%s,没有权限' % request.user.get_user_type_display()
        # 如果有权限,返回True,没有权限返回False
        # 权限类,在认证类之后,request.user有了当前登录用户
        user_type = request.user.user_type
        print(user_type)
        if user_type < 3:  # 只要不是1,2,就没有权限
            return True
        else:
            return False

视图

from .auth import LoginAuth, UserPermission
from rest_framework.mixins import RetrieveModelMixin, DestroyModelMixin, UpdateModelMixin,ListModelMixin,CreateModelMixin
from rest_framework.viewsets import GenericViewSet

class BookView(RetrieveModelMixin,ListModelMixin,GenericViewSet):
    # 局部使用,普通用户登录后只能获取一条或所有
    authentication_classes = [LoginAuth, ]
    queryset = models.Book.objects.all()
    serializer_class = serializer.BookSerializer

class BookDetailView(CreateModelMixin, DestroyModelMixin, UpdateModelMixin, GenericViewSet):
    # 局部使用,普通用户没有权限
    authentication_classes = [LoginAuth, ]
    permission_classes = [UserPermission, ]
    queryset = models.Book.objects.all()
    serializer_class = serializer.BookSerializer

路由

from rest_framework.routers import SimpleRouter
router = SimpleRouter()
router.register('user',views.UserView,'user')
router.register('books',views.BookView,'books')
router.register('bookdetail',views.BookDetailView,'bookdetail')
urlpatterns = [
    path('admin/', admin.site.urls),
    path('',include(router.urls))
]

总结

  • 5个接口分成了俩视图类写
  • BookView:获取所有,获取单条API
  • BookDetailView:删除,修改,新增API
  • 这俩视图都需要登录:authentication_classes = [LoginAuth, ]
  • BookView只要登陆就可以操作
  • BookDetailView必须有权限才能,加了一个权限,permission_classes = [UserPermission, ]

步骤

  • 第一步:写一个类,继承BasePermission,重写has_permission,判断如果有权限,返回True,如果没有权限,返回False
  • 第二步:局部使用和全局使用

注意

  • 如果使用ModelViewSet快速写五个接口,那么在验证认证和权限的时候就会错乱,获取和修改等操作都在一个视图里了,分开写会好一点

作用

  • 限制视图接口被访问的频率次数
  • 限制条件 : IP、ID、唯一键
  • 频率周期 : 时(h)、分(m)、秒(s)
  • 频率次数 : [num] / s
  • 没有达到限制频率可正常访问接口
  • 达到了频率限制次数, 在限制时间内不能进行访问, 超过时间后可以正常访问

使用

频率类

# 频率类
class IPThrottle(SimpleRateThrottle):
    scope = 'ip'

    # get_cache_key返回什么就以什么方法做限制,限制条件必须唯一,比如用户id
    def get_cache_key(self, request, view):
        # 限制ip地址,从request.META字典中获取ip
        '''
        request.META:请求头中的数据
        '''
        return request.META.get('REMOTE_ADDR')  # 客户端ip

配置文件

REST_FRAMEWORK={
    'DEFAULT_THROTTLE_RATES': {
        'ip': '3/m'  # minute_3是scope的字符串,一分钟访问3次
}

局部使用

class BookView(RetrieveModelMixin, ListModelMixin, GenericViewSet):
    authentication_classes = [LoginAuth, ] # 登录认证
    permission_classes = [UserPermission, ] # 权限限制
    throttle_classes = [IPThrottle, ]  # 频率限制

    queryset = models.Book.objects.all()
    serializer_class = serializer.BookSerializer

全局使用

REST_FRAMEWORK={

    'DEFAULT_THROTTLE_CLASSES': (  # 全局配置频率类
        'app01.auth.IPThrottle'
    ),
          }

总结

  1. 写一个类,继承SimpleRateThrottle,重写类属性scope,scope值自定义,配置文件中一致就行,重写get_cache_key方法,返回什么限制什么
  2. 在配置文件中配置,限制频率
  3. 局部/全局使用

模型

from django.db import models

# Create your models here.

class User(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=16)
    user_type = models.IntegerField(choices=((1, '超级管理员'), (2, '普通管理员'), (3, '普通用户')))

class UserToken(models.Model):
    user = models.OneToOneField(to=User,on_delete=models.CASCADE)
    token = models.CharField(max_length=32)

class Book(models.Model):
    name = models.CharField(max_length=32)
    price = models.DecimalField(decimal_places=2,max_digits=5)
    author = models.CharField(max_length=32)

视图

from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet

from app01 import models
from app01 import serializer

class UserView(ViewSet):
    @action(methods=['POST'], detail=False)
    def login(self, request, *args, **kwargs):
        # 获取数据
        username = request.data.get('username')
        password = request.data.get('password')
        user = models.User.objects.filter(username=username, password=password).first()
        if user:
            # 如果user有值说明登录成功,生产随机字符串,存入数据库,如果重复登录那么就更新随机字符串
            import uuid
            uuid_str = uuid.uuid4()
            # print(type(uuid_str)) # <class 'uuid.UUID'>
            token = str(uuid_str)
            # 如果存在就更新,如果不存在就新增,指定搜索对象,然后defaults指定更新内容
            models.UserToken.objects.update_or_create(user=user, defaults={'token': token})
            # 返回随机字符串
            return Response({'code': 100, 'msg': '登录成功', 'token': token})
        return Response({'code': 101, 'msg': '登录失败,用户名或密码错误'})

from .auth import LoginAuth, UserPermission, IPThrottle

from rest_framework.mixins import RetrieveModelMixin, DestroyModelMixin, UpdateModelMixin, ListModelMixin, \
    CreateModelMixin
from rest_framework.viewsets import GenericViewSet

class BookView(RetrieveModelMixin, ListModelMixin, GenericViewSet):
    # 局部使用,普通用户登录后只能获取一条或所有
    authentication_classes = [LoginAuth, ]
    throttle_classes = [IPThrottle, ]
    queryset = models.Book.objects.all()
    serializer_class = serializer.BookSerializer

class BookDetailView(CreateModelMixin, DestroyModelMixin, UpdateModelMixin, GenericViewSet):
    # 局部使用,普通用户没有权限
    authentication_classes = [LoginAuth, ]
    permission_classes = [UserPermission, ]
    queryset = models.Book.objects.all()
    serializer_class = serializer.BookSerializer

序列化器

from rest_framework import serializers
from app01 import models
class BookSerializer(serializers.ModelSerializer):
    class Meta:
        model = models.Book
        fields = '__all__'

认证权限频率类

from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.permissions import BasePermission
from rest_framework.throttling import SimpleRateThrottle

from app01 import models

# 认证类
class LoginAuth(BaseAuthentication):
    # 重写authenticate方法
    def authenticate(self, request):
        # 获取前端携带的token,token放在哪是自己规定的,比如从查询参数中获取
        token = request.query_params.get('token')
        # 比对随机字符串
        user_token = models.UserToken.objects.filter(token=token).first()
        if user_token:
            # 登录了,返回当前登录用户和token
            return user_token.user, token
        else:
            # 没有登录,抛异常
            raise AuthenticationFailed('您没有登录,请登录')

# 权限类
class UserPermission(BasePermission):
    def has_permission(self, request, view):
        # 没有权限的提示信息
        self.message = '您是:%s,没有权限' % request.user.get_user_type_display()
        # 如果有权限,返回True,没有权限返回False
        # 权限类,在认证类之后,request.user有了当前登录用户
        user_type = request.user.user_type
        print(user_type)
        if user_type < 3:  # 只要不是1,2,就没有权限
            return True
        else:
            return False

# 频率类
class IPThrottle(SimpleRateThrottle):
    scope = 'ip'

    # get_cache_key返回什么就以什么方法做限制,限制条件必须唯一,比如用户id
    def get_cache_key(self, request, view):
        # 限制ip地址,从request.META字典中获取ip
        '''
        request.META:请求头中的数据
        '''
        return request.META.get('REMOTE_ADDR')  # 客户端ip

配置文件

REST_FRAMEWORK={
    'DEFAULT_THROTTLE_RATES': {
        'ip': '3/m'  # minute_3是scope的字符串,一分钟访问3次
},

路由

from django.contrib import admin
from django.urls import path,include
from app01 import views
from rest_framework.routers import SimpleRouter
router = SimpleRouter()
router.register('user',views.UserView,'user')
router.register('books',views.BookView,'books')
router.register('bookdetail',views.BookDetailView,'bookdetail')
urlpatterns = [
    path('admin/', admin.site.urls),
    path('',include(router.urls))
]