42.JSON Web Token认证
阅读原文时间:2023年07月08日阅读:5

JSON Web Token认证介绍

  • 简称JWT认证,一般用于用户认证
  • JWT是一种相当新的标准,可用于基于token的身份验证
  • 与内置的TokenAuthentication方案不同,JWT不需要使用数据库来验证令牌
  • 优势:相较于传统的token,无需再服务端保存

基于传统token简单的实现认证

#传统的token认证
1.用户登录服务端返回token,并将token保存在服务端(缓存、db等都可以)
2.以后用户访问的时候,需要携带token,服务端获取携带token后去数据库获取token校验

#model示例
class UserInfo(models.Model):
    username = models.CharField(max_length=32)
    password = models.CharField(max_length=64)
    token = models.CharField(max_length=64, null=True, blank=True)

#view示例

class LoginView(APIView):

    def post(self, request, *args, **kwargs):
        user = request.data.get('username')
        pwd = request.data.get('password')
        # 密码验证成功生成token,否则返回错误
        user_object = UserInfo.objects.filter(username=user, password=pwd).first()
        if not user_object:
            return Response({'code': 1, 'error': '用户名或者密码错误'})
        # 使用uuid的方式生成token
        random_string = str(uuid.uuid4())
        # 数据库保存token
        user_object.token = random_string
        user_object.save()
        return Response({'code': 0, 'data': random_string})

class OrderView(APIView):

    def get(self, request, *args, **kwargs):
        # 请求的时候携带token,后端获取token
        token = request.query_params.get('token')
        if not token:# 没有获取到token
            return Response({'code': 2, 'error': '登录成功之后才能访问'})
        # 获取到的token 与数据库的token不匹配
        user_object = UserInfo.objects.filter(token=token).first()
        if not user_object:
            return Response({'code': 3, 'error': 'token无效'})

        return Response({'code': 0, 'data': 'order list'})

jwt实现原理及流程代码示例

用户登录成功之后,服务端使用jwt创建一个token,并给用户返回,不在服务端存储

token由Header、Payload、Signature三部分组成,这三部分之间以小数点连接

# jwt token 示例
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.keH6T3x1z7mmhKL1T3r9sQdAxxdzB6siemGMr_6ZOwU

#Header部分
第一段字符串:Header,内部包含算法和token类型,算法默认使用hs256,token类型是jwt
   {
    "alg": "HS256",
    "typ": "JWT"
  }
将上面JSON转换成字符串,然后做base64url加密,生成上述token第一段header字符串

# Payload部分
第二段字符串:Payload,我们自定义的值的地方,例如下方
{
    "id":"123",
    "name":"qi",
    "xxx":"aaa",
    "exp":1516239022 #超时时间,一般都要设置
}
将上面JSON转换成字符串,然后做base64url加密,生成上述token第二段Payload字符串
因为Payload部分是我们自定义的字段,可以使用base64url反解密,所以这部分不要放敏感内容

# Signature部分
第三段字符串:Signature
第一步:将第1、2部分的密文拼接起来
第二步:对钱2部分密文进行hs256加密 + 加盐
第三部:对hs256加密后的密文,在做base64url加密

以后用户再来访问的时候,需要携带token,后端需要对token进行校验

# 校验流程
第一步:获取token
第二步:对token进行切割,切割成三部分
第三步:对第二段进行base64url解密并获取payload信息
      获取超时时间验证是否超时
第四步:由于md5和hs256不能反解密
      我们再将第一段、第二段字符串拼接进行hs256加密
第五步:将新加密的密文和第三段通过base64解密后的密文对比
      如果密文相等,表示token未修改过,认证通过

'''
如果仿造jwt token,将第二段密文base64反解密,修改了过期时间之后再加密传给服务端
那么即时过期时间验证通过,和第三段密文不匹配,也验证不通过
如果把修改后的第二段和第一段通过加密生成匹配的密文去验证,但是由于第三部分不止加密,并且还加盐,所以也不会通过
'''

#安装
pip3.9 install pyjwt

#创建jwt token 流程示例

import jwt
import datetime

class LoginView(APIView):

    def post(self, request, *args, **kwargs):
        user = request.data.get('username')
        pwd = request.data.get('password')
        user_object = UserInfo.objects.filter(username=user, password=pwd).first()
        if user_object:
            # 加密的盐值,自定义,不能变化,不然验证的时候无法通过
            salt = 'ahjgdsashjgdhjsagfdfasghfdghas'

            # 构造header,加密算法除了hs256还支持其他算法,参考文档支持内容
            headers = {
                'typ': 'jwt',
                'alg': 'HS256'
            }

            # 构造payload
            payload = {
                'user_id': user_object.id,
                'username': user_object.username,
                # 超时时间设置为3分钟
                'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=3)
            }

            # 生成jwt token
            token = jwt.encode(payload=payload, key=salt, algorithm='HS256', headers=headers).decode('utf-8')
            return Response({'code': 0, 'data': token})

        else:
            return Response({'code': 1, 'error': '用户名或者密码错误'})

 # jwt token 校验流程示例

class getInfoLIst(APIView):

    def get(self, request, *args, **kwargs):
        salt = 'ahjgdsashjgdhjsagfdfasghfdghas'

        # 获取token
        token = request.query_params.get('token')

        try:
            # 从token中获取payload,进行校验,包喊过期时间、第三段密文校验
            # true是校验合法性,False是不校验合法性
            verified_payload = jwt.decode(token, salt, True)
            return Response({'code': 0, 'data': verified_payload['user_id']})

        # 如果token过期,会抛出ExpiredSignatureError异常
        except jwt.exceptions.ExpiredSignatureError:
            return Response({'code': 1, 'error': 'token失效'})
        # 如果token认证失败,会抛出DecodeError异常
        except jwt.DecodeError:
            return Response({'code': 2, 'error': 'token认证失败'})
        # 如果token非法,会抛出InvalidTokenError异常
        except jwt.InvalidTokenError:
            return Response({'code': 3, 'error': '非法的token'})

基于jwt和drf实现认证示例

# 自定义utils-创建token
import jwt
from django.conf import settings
import datetime
def create_token(payload, timeout=3):
    '''

    :param payload: payload是我们自定义的部分,不同view可能制定字段不一样
                    所以可以其他字段外部定义好传入,在基础上添加超时时间即可
    :param timeout: token超时时间,默认为3分钟,
    :return:
    '''
    salt = settings.SECRET_KEY

    headers = {
        'typ': 'jwt',
        'alg': 'HS256'
    }

    # 构造payload-给payload添加超时时间
    payload['exp'] = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)
    token = jwt.encode(payload=payload, key=salt, algorithm='HS256', headers=headers).decode('utf-8')
    return token

#views-创建token
# 导入创建token的方法
from jwt_demo.extensions.jwt_utils import create_token

class LoginView(APIView):

    def post(self, request, *args, **kwargs):
        user = request.data.get('username')
        pwd = request.data.get('password')
        user_object = UserInfo.objects.filter(username=user, password=pwd).first()
        if user_object:
            # 调用创建token的方法
            token = create_token({'user_id': user_object.id})

            return Response({'code': 0, 'data': token})

        else:
            return Response({'code': 1, 'error': '用户名或者密码错误'})

认证-创建auth.py文件
from django.conf import settings
# 认证异常的包
from rest_framework.exceptions import AuthenticationFailed

# 继承认证基类
'''
认证类中可以有三种操作
1.抛出异常,后续不再执行
2.返回一个元组,request.user,request.auth
3.返回None

'''

class JwtQueryParamsAuthentication(BaseAuthentication):

    def authenticate_header(self, request):
        salt = settings.SECRET_KEY

        token = request.query_params.get('token')

        try:
            # 进行校验
            payload = jwt.decode(token, salt, True)

        # 抛出异常
        except jwt.exceptions.ExpiredSignatureError:
            raise AuthenticationFailed({'code': 1, 'error': 'token失效'})
        except jwt.DecodeError:
            raise AuthenticationFailed({'code': 2, 'error': 'token认证失败'})
        except jwt.InvalidTokenError:
            raise AuthenticationFailed({'code': 3, 'error': '非法的token'})

        return (payload, token)

#view-认证

#导入自定义认证类
from jwt_demo.extensions.auth import JwtQueryParamsAuthentication

class getInfoLIst(APIView):
    #指定自定义认证类 或者直接settings中配置全局
    authentication_classes = [JwtQueryParamsAuthentication,]
    def get(self, request, *args, **kwargs):
        ...