赞
踩
mixins.ListModelMixin, mixins.CreateModelMixin, GenericViewSet
mixin包括:
CreateModelMixin 定义create方法 对应post
ListModelMixin 有自己的方法list()可以重写,对应get方法
RetrieveModelMixin 定义retrieve方法 对应get方法
UpdateModelMixin 定义update方法 对应put patch 方法
DestroyModelMixin 定义delete方法 对应delete
RetrieveModelMixin对于具体的商品信息进行了获取,序列化。这个在后面的商品详情页会介绍到。
UpdateModelMixin中对于部分更新还是全部更新进行了判断。
DestroyModelMixin用来连接我们的delete方法,在我们delete时有一些必要的操作,如设置返回状态204等。
GenericAPIView 继承自APIView:新增加了过滤、分页、序列化
GenericAPIView结合各种mixin可以组合成ListAPIView、RetrieveAPIView、等等,新增了get、post等方法
想用过滤、分页、序列化 等功能使用mixin,很方便
https://blog.csdn.net/summer2day/article/details/81367781
如果不在url中做转换 list 换为 get 等等
则直接用list方法替代get方法
搜索过滤:
可以看源码
queryset = queryset.filter(reduce(operator.and_, conditions)) 源码中查询的一句代码,根据search_fields 和
输入的条件查询
搜索样式:默认?search=条件1,条件2...... 如果?search=admin,1628080190@qq.com 可以模糊查询
filter_backends = [filters.SearchFilter]
search_fields = ["username", "email"]
router = DefaultRouter()
router.register(r'user_info', views.UserViewSet, basename="user")
get post请求直接对应list create url正常就行,/user_ifo/1/ 可以对应retrieve方法 获取一条数据,list通常获取多条数据。
put delete 要想直接对应update destory方法,则url有变化。都对应/user_ifo/1/ 形式 表示修改/删除id为1的数据。
参数可自己带也可以根据参数去更新删除对应数据,就得重写方法。源代码get_object就是获取id为1..的数据,所以最好重写一下
token的组成:
1、头部-Header。默认是JWT,然后头部进行base64加密
2、payload 有效载荷,就是Token 的具体内容, 包含签发时间和过期时间,还有其他信息。
3、签名Signature 签名的组成是,把header、payload分别通过base64进行编码,然后拼接在一起。即对前两部分的签名,防止数据篡改。
下面代码对token进行校验的时候实际上也会对这三部分进行校验(源代码也有体现),校验都通过才会认为是有效token。
- import jwt
-
- from rest_framework_jwt.authentication import JSONWebTokenAuthentication
- from django.utils.encoding import smart_text
- from rest_framework import exceptions
- from rest_framework.authentication import get_authorization_header
- from rest_framework_jwt.settings import api_settings
- from django.utils.translation import ugettext
-
-
- class CustomAuthentication(JSONWebTokenAuthentication):
-
- def get_jwt_value(self, request):
- """
- 获取并校验前端请求头中token信息
- """
- auth = get_authorization_header(request).split()
- auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()
-
- if not auth or smart_text(auth[0].lower()) != auth_header_prefix:
- return None
-
- if len(auth) == 1:
- msg = ugettext('Invalid Authorization header. No credentials provided.')
- raise exceptions.AuthenticationFailed(msg)
- elif len(auth) > 2:
- msg = ugettext('Invalid Authorization header. Credentials string should not contain spaces.')
- raise exceptions.AuthenticationFailed(msg)
-
- return auth[1]
-
- def authenticate(self, request, username=None, password=None):
- """
- Returns a two-tuple of `User` and token if a valid signature has been
- supplied using JWT-based authentication.
- """
- jwt_value = self.get_jwt_value(request)
- if jwt_value is None:
- # 没有携带token或者格式不对 返回406
- raise exceptions.NotAcceptable()
- try:
- payload = api_settings.JWT_DECODE_HANDLER(jwt_value)
- except jwt.ExpiredSignature:
- # token过期 返回401
- msg = ugettext('Signature has expired.')
- raise exceptions.AuthenticationFailed(msg)
- except jwt.DecodeError:
- # token解析错误,返回400
- msg = ugettext('Error decoding signature.')
- raise exceptions.ParseError(msg)
- except jwt.InvalidTokenError:
- # 无效token,返回400
- raise exceptions.ParseError()
-
- # token认证完成再认证用户名身份
- user = self.authenticate_credentials(payload)
-
- return user, jwt_value
- setting.py 中配置:
-
- # AUTH_USER_MODEL = 'user.User'
-
- # 登录认证,没有就不用,有了token可以不要
- # AUTHENTICATION_BACKENDS = ('utils.login_auth.LoginAuthentication',)
-
- # token认证
- REST_FRAMEWORK = {
- 'DEFAULT_AUTHENTICATION_CLASSES': ['utils.authentic.CustomAuthentication'],
- 'DEFAULT_PERMISSION_CLASSES': ['utils.permission.MyPermission'],
- }
-
- # 设置Token有效时间和认证token信息的前缀,默认在api_settings也有
- # 这里过期时间是从登录开始传第一次token到前端算起,加密token的时候会把这个时间当做参数进行加密,通过解密来判断过期。所以每次成功请求都会生成新的token,只有两次访问时间超过JWT_EXPIRATION_DELTA才会过期。
-
- 所以这里都有一个漏洞就是在你设定的时间内就会过期前端会重新登录。所以根据业务需求可能得每次发送成功请求都要重新更新token,那这样每次的请求token都不一样。也算增加了安全性。
- JWT_AUTH = {
- 'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=500),
- 'JWT_AUTH_HEADER_PREFIX': 'JWT',
- }
-
-
-
-
-
- # redis每次请求浏览后有缓存,下次访问的时候先从redis里面取,然后再从数据库里面取。
- # 但是即使数据改了也不会更新,需要手动更新。所以选择合适的场景使用很重要。
- # 6379/1, 1是redis的数据库序号,redis默认允许16个数据库,redis不设置数据库序号默认使用的是0。
- CACHES = {
- "default": {
- "BACKEND": "django_redis.cache.RedisCache",
- "LOCATION": "redis://127.0.0.1:6379",
- "OPTIONS": {
- "CLIENT_CLASS": "django_redis.client.DefaultClient",
- # 连接池数量,如果decode_responses不设置为True的话,使用get_redis_connection读取的数据是bytes,需要decode为utf-8
- "CONNECTION_POOL_KWARGS": {"max_connections": 100, "decode_responses": True},
- # "PASSWORD": "123456",
- }
- }
- }
-
- # 当关闭redis时默认会出现异常,配置这个给所有缓存配置相同的忽略行为
- # DJANGO_REDIS_IGNORE_EXCEPTIONS = True
- from django.shortcuts import render, redirect
- from django.contrib import auth
- from django.contrib.auth.decorators import login_required
-
-
- def login(request):
- if request.method == 'POST':
- username = request.POST.get('username')
- passwd = request.POST.get('password')
- user = auth.authenticate(username=username, password=passwd)
- # 我们一般会重写authenticate方法,假如这个验证成功的话,这个user就有值,就可以进行登录
- # 这个依赖于session,将验证过的用户赋值给request.user属性
- auth.login(request, user)
- # 这个是把这个user封装进这个request里面,下面就可以直接进行调用了,通过request.user进行调用,进行登录验证
- # 就是将这个user和密码写进这个sessions里面,下次过来的时候就可以直接进行访问了,带着这个cookies进行匹配
- if user:
- # 浏览器打开网址会自动生成一个sessionId,F12可以查看到,然后服务器会保存这个sessionId和用户的登录信息
- # 还有过期时间保存到django_session表中。下次浏览器访问界面就是根据sessionId找到并识别用户
- return redirect('/index/')
- else:
- return render(request, 'login.html')
-
- # login_required就是进行登录认证,只要request.user有值request.user.is_authenticated()
- # 源码直接返回True,否则就是匿名用户。返回False
-
-
- @login_required(login_url='/login')
- def index(request):
- print('进入这个首页的页面')
-
-
- def logout(request):
- print('注销')
- auth.logout(request)
- # 这个相当于把这个request里面的user给清除掉,清除掉session_id,注销掉用户变成匿名用户
- # 源码中有 request.session.flush(),将session的数据都删除,并且cookies也失效。
- # 数据库表中对应的这条数据也会删除
from django.conf.urls import url, include from rest_framework.routers import DefaultRouter from user import views router = DefaultRouter() router.register(r'user_info', views.UserViewSet, basename="user") router.register(r'group_info', views.GroupViewSet, basename="group") router.register(r'permission_info', views.PermissionViewSet, basename="permission") router.register(r'group_permission_info', views.GroupPermissionViewSet, basename="group_permission") urlpatterns = [ url(r'', include(router.urls)), url(r'login', views.UserLoginView.as_view()), url(r'get_group_dpt_region_info', views.CommonView.as_view()), url(r'get_user_all_permission', views.CommonView.as_view()) ]paginate.py 分页 from rest_framework.pagination import PageNumberPagination class MyPagination(PageNumberPagination): """ page_size:(默认一页显示的条数,可在settings配置中设置或这里配置或路由中自动配置) page_size_query_param:通过在路由上传递数据来让每一页显示多少条数据(默认为None)(?page_size=10&page=2) max_page_size:最大显示多少个数据(默认为None) page_query_param:通过什么数据翻页(默认通过?page=页码进行翻页) """ page_size = 10 page_size_query_param = 'page_size' page_query_param = 'page'
- import logging
- import datetime
- import re
-
- from django.db import transaction
- from rest_framework import filters
- from rest_framework.mixins import ListModelMixin, CreateModelMixin, UpdateModelMixin, \
- DestroyModelMixin, RetrieveModelMixin
- from rest_framework.views import APIView
- from rest_framework.viewsets import GenericViewSet, ModelViewSet
- from rest_framework_jwt.settings import api_settings
- from rest_framework.response import Response
- from django.contrib.auth.models import Group, Permission, ContentType
- from django_redis import get_redis_connection
-
- from master.models import Region, Department
- from user.models import User
- from utils.paginate import MyPagination
- from user.serializers import UserSerializer, RegionSerializer, DepartmentSerializer, \
- GroupSerializer, PermissionSerializer
- from utils.permission import MyPermission
-
- # 统一规定返回给前端的格式,{"code": 1, "value": "", "token": ""}
- # code:0代表失败,1代表成功,2代表没有权限。value:代表返回给前端的值,token:代表令牌,成功请求的都携带
- EXPIRE_TIME = 60 # 过期时间,redis设置用户密码输入错误登录次数限制,超过限制则等待时间过后才能再次登录
-
- logger = logging.getLogger('user')
-
-
- class UserLoginView(APIView):
- # 不需要登录认证和权限认证
- authentication_classes = []
- permission_classes = []
-
- @staticmethod
- def post(request):
- username = request.data.get('username')
- password = request.data.get('password')
- # __iexact忽略大小写,__exact精确= 都类似like语句
- user = User.objects.filter(username__exact=username, is_active=True).first()
- if not user:
- return Response({"code": 0, "value": "username not exist", "token": ""})
- redis = get_redis_connection()
- if redis.get(username):
- # 锁定之后重新解锁再输入密码只有一次机会
- redis.setex("%s_login_times" % username, EXPIRE_TIME * 5, 3)
- return Response({"code": 0, "value": "Your account has been locked. Please try again later!", "token": ""})
- if user.check_password(password) or user.password == password:
- # 更新登录时间
- user.last_login = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- user.save()
- if redis.get(username):
- redis.delete(username)
- if redis.get("%s_login_times" % username):
- redis.delete("%s_login_times" % username)
- token = api_settings.JWT_ENCODE_HANDLER(api_settings.JWT_PAYLOAD_HANDLER(user))
- return Response({'code': 1, 'value': user.username, "token": token})
- else:
- if redis.get("%s_login_times" % username) and int(redis.get("%s_login_times" % username)) >= 3:
- redis.setex(username, EXPIRE_TIME, "true")
- elif redis.get("%s_login_times" % username):
- redis.setex("%s_login_times" % username, EXPIRE_TIME, int(redis.get("%s_login_times" % username))+1)
- else:
- redis.setex("%s_login_times" % username, EXPIRE_TIME, 1)
- return Response({"code": 0, "value": "password error", "token": ""})
-
-
- class CommonView(APIView):
- """
- 一些不固定model的请求 不需要权限认证,需要登录认证
- """
- permission_classes = []
- authentication_classes = []
-
- @staticmethod
- def get(request):
- """
- 获取group/region/department用于select框
- """
- try:
- regions = Region.objects.filter(active=0).order_by('name')
- departments = Department.objects.filter(active=0).order_by('name')
- groups = Group.objects.order_by('name').all()
- ser_region = RegionSerializer(instance=regions, many=True)
- ser_department = DepartmentSerializer(instance=departments, many=True)
- ser_group = GroupSerializer(instance=groups, many=True)
- data = {
- 'region_info': ser_region.data,
- 'department_info': ser_department.data,
- 'group_info': ser_group.data
- }
- return Response({'code': 1, 'value': data, 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- @staticmethod
- def post(request):
- try:
- user_group = request.user.groups.all()
- if user_group:
- permission_list = []
- for group in user_group:
- user_permission = group.permissions.all()
- if user_permission:
- for permission in user_permission:
- permission_list.append(permission.codename)
- if permission_list:
- return Response({'code': 1, 'value': permission_list, 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- return Response({'code': 1, 'value': '', 'token': ''})
-
-
- class UserViewSet(ModelViewSet):
- permission_classes = [MyPermission]
- queryset = User.objects.filter(is_active=True).order_by('username')
- serializer_class = UserSerializer
- pagination_class = MyPagination
- # 搜索功能,SearchFilter是搜索过滤,只针对当前查询过滤,所以不在settings.py中配置
- filter_backends = [filters.SearchFilter]
- # 搜索字段,也可以使用双下划线在Foreign Key或ManyToManyField上执行相关查找:
- search_fields = ["username"]
-
- def list(self, request, *args, **kwargs):
- """
- get请求,分页获取user info
- """
- try:
- ser_user = super().list(self, request, *args, **kwargs)
- if ser_user.status_code == 200:
- return Response({'code': 1, 'value': ser_user.data, 'token': ''})
- return Response({'code': 0, 'value': 'get info fail, errorCode %s' % ser_user.status_code, 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- def create(self, request, *args, **kwargs):
- """
- Add User
- """
- try:
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- serializer.validated_data['password'] = request.data.get('password')
- serializer.validated_data['create_by'] = request.user.username
- serializer.validated_data['update_by'] = request.user.username
- serializer.validated_data['group_id'] = request.data.getlist('groups')
- serializer.save()
- return Response({'code': 1, 'value': "Add Success", "token": ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- pattern = re.compile(r".*重复键违反唯一约束.+键值.*username.*=(.+).*已经存在", re.S)
- result = pattern.findall(str(e))
- if result:
- res = re.search(r'[\w]+', result[0])
- return Response({'code': 0, 'value': 'username: %s is exist' % res.group(), 'token': ''})
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- def update(self, request, *args, **kwargs):
- """
- update user,根据路由中的pk找user
- """
- try:
- instance = self.get_object()
- if instance.username == request.user.username or request.user.is_superuser:
- serializer = self.get_serializer(instance, data=request.data)
- serializer.is_valid(raise_exception=True)
- serializer.validated_data['password'] = request.data.get('password')
- serializer.validated_data['update_by'] = request.user.username
- serializer.validated_data['group_id'] = request.data.getlist('groups')
- serializer.save()
- return Response({'code': 1, 'value': "Update Success", "token": ''})
- else:
- return Response({'code': 0, 'value': 'Insufficient Authority!', 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- def destroy(self, request, *args, **kwargs):
- """
- delete user,根据路由中的pk找user
- """
- try:
- user = self.get_object()
- if request.user.is_superuser:
- user.is_active = False
- user.update_by = user.username
- user.save()
- return Response({'code': 1, 'value': "Delete Success", "token": ""})
- else:
- return Response({'code': 0, 'value': 'Insufficient Authority!', 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
-
- class GroupViewSet(ListModelMixin, CreateModelMixin, DestroyModelMixin, GenericViewSet):
- permission_classes = [MyPermission]
- queryset = Group.objects.all()
- serializer_class = GroupSerializer
- # 根据ordering_fields中指定的字段排序,可以指定多个
- filter_backends = [filters.OrderingFilter]
- ordering_fields = ["name"]
-
- def list(self, request, *args, **kwargs):
- try:
- # 没有这句每次都去读缓存,不能有效进行增删改查。
- queryset = self.filter_queryset(self.get_queryset())
- ser_group = self.get_serializer(queryset, many=True)
- return Response({'code': 1, 'value': ser_group.data, 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- def create(self, request, *args, **kwargs):
- try:
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- serializer.save()
- return Response({'code': 1, 'value': "Add Success", "token": ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- pattern = re.compile(r".*group with this name already exists.*", re.S)
- if pattern.findall(str(e)):
- return Response({'code': 0, 'value': 'group: %s is exist' % request.data.get('name'), 'token': ''})
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- def destroy(self, request, *args, **kwargs):
- """
- 先删除多对多关联关系再删除group
- """
- try:
- instance = self.get_object()
- instance.permissions.clear()
- instance.delete()
- return Response({'code': 1, 'value': "Delete Success", "token": ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
-
- class PermissionViewSet(ModelViewSet):
- """
- 序列化采用serializers.Serializer时,如果保存和更新数据用序列化的save()时必须重写create和update方法进行保存和更新
- 序列化采用serializers.ModelSerializer(继承Serializer)时,它已经重写了create和update方法
- """
- permission_classes = [MyPermission]
- queryset = Permission.objects.all()
- serializer_class = PermissionSerializer
- pagination_class = MyPagination
- filter_backends = [filters.SearchFilter]
- search_fields = ["name", "codename"]
-
- def list(self, request, *args, **kwargs):
- try:
- ser_user = super().list(self, request, *args, **kwargs)
- if ser_user.status_code == 200:
- return Response({'code': 1, 'value': ser_user.data, 'token': ''})
- return Response({'code': 0, 'value': 'get info fail, errorCode %s' % ser_user.status_code, 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- def create(self, request, *args, **kwargs):
- try:
- serializer = self.get_serializer(data=request.data)
- serializer.is_valid(raise_exception=True)
- content_type_obj = ContentType.objects.filter(model='user', app_label='user').first()
- serializer.validated_data['content_type'] = content_type_obj
- serializer.save()
- return Response({'code': 1, 'value': "Add Success", 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- pattern = re.compile(r".*重复键违反唯一约束.+键值.*已经存在", re.S)
- if pattern.findall(str(e)):
- return Response({'code': 0, 'value': 'codename: %s is exist'
- % request.data.get('codename'), 'token': ''})
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- def update(self, request, *args, **kwargs):
- try:
- instance = self.get_object()
- serializer = self.get_serializer(instance, data=request.data)
- serializer.is_valid(raise_exception=True)
- serializer.save()
- return Response({'code': 1, 'value': "Update Success", "token": ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- def destroy(self, request, *args, **kwargs):
- try:
- instance = self.get_object()
- instance.delete()
- return Response({'code': 1, 'value': "Delete Success", "token": ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
-
- class GroupPermissionViewSet(RetrieveModelMixin, UpdateModelMixin, GenericViewSet):
- permission_classes = [MyPermission]
- queryset = Group.objects.all()
- serializer_class = GroupSerializer
-
- def retrieve(self, request, *args, **kwargs):
- """
- 返回当前组拥有的权限和所有权限
- """
- try:
- instance = self.get_object()
- serializer = self.get_serializer(instance)
- return Response({'code': 1, 'value': serializer.data, 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- result = re.findall(r"You do not have permission to perform this action", str(e))
- if result:
- return Response({'code': 2, 'value': 'Insufficient Authority!', 'token': ''})
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
- def update(self, request, *args, **kwargs):
- """
- 更新group权限,前端传入权限id
- """
- permission_id = request.data.getlist('permission_data[id]')
- try:
- instance = self.get_object()
- if permission_id:
- instance.permissions.set(permission_id)
- return Response({'code': 1, 'value': "Save Success", "token": ''})
- instance.permissions.clear()
- return Response({'code': 1, 'value': 'Clear Permission Success', 'token': ''})
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- return Response({'code': 0, 'value': 'database error', 'token': ''})
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。