限流
唯一标识的构成 --> 'throttle' + scope+ 用户id / IP地址 组合的字符串!!
redis中存储的数据 --> {"唯一标识":[时间戳1,时间戳2,时间戳3]} 唯一标识用于构建缓存中的key!!
理解限流的基本逻辑.
扩展方面:
自定义错误提示;
多个限流类的验证过程中某个限流类验证失败抛出raise.
基于用户而言,不管是匿名用户还是已登陆的用户
- 频率类的全局和局部配置即可,但需注意,频率类的局部配置是在视图类中.
特别重要的是,全局/局部 关乎 在redis中维护键值对 应用于 全部路由(视图类)/特定的路由(视图类)
- 频率次数的全局和局部设置皆可,但需注意,频率次数的局部设置是在频率类中.
当然还可以基于API的访问次数.这个涉及ScopedRateThrottle,略.用到了再仔细探究.
2
3
4
5
6
7
8
9
10
11
12
13
# 限流基本逻辑
限流/限制访问频率
开发过程中,某个接口不想让用户访问过于频繁,就需要用到限流机制.
例如:以短信验证登陆为例.
短信平台自身限制每个账号1小时只能发10次验证;
但这限制的不够严谨.攻击者可以找上万个手机号,向后端的短信登陆接口发送请求,消耗短信数量.增加了维护的费用.
So,仅仅依靠短信平台的限制来防止"盗刷"是远远不够的.还需加入其他限制手段!
1> IP限制 (哪怕攻击者找了10万个手机号,也需要切换IP
2> 验证码 (减缓验证的速度
3> 像什么商品信息页面,加入防爬虫的机制
现目前的学习,限制访问频率,我们需要找到唯一标识
- 已登陆的用户,唯一标识> 用户信息主键(ID、用户名
- 匿名用户,唯一标识> IP
扩展“暂且了解即可”:针对IP,访问者可以通过代理IP来绕过IP的限制.
SO,现在很多网页和程序在IP限制的基础上,还会加入js算法以增加访问接口的难度(不是直接向该接口发送个json数据那么简单啦)
IP+JS算法(若想破解该JS算法,需要逆向)
★★★ 我们拿到唯一标识后,限制的逻辑是怎样的呢? 假设限制 3次/10min
<"用户ID" 维护一个 该用户ID曾经访问该接口的时间点列表/访问记录>
栗如- "9123563":[16:43, 16:42, 16:30, 15:20] 我们设定新加入的时间点是插入到列表下标为0的位置.
1. 获取当前时间 16:45
2. 当前时间-10分钟=计数开始时间 16:35
3. 剔除列表中时间点小于16:35的数据 [16:43, 16:42]
4. 计算列表的长度
- 超过,错误
- 未超过,可以访问
len([16:43, 16:42]) < 3 所以可以访问, 并将16:45这个时间点放到列表中 [16:45, 16:43, 16:42]
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 快速使用
POST格式向
http://127.0.0.1:8000/login/
登陆接口发送数据{"username":"xiaohong","password":"123456"}
;
连续发送5次后, 报错:
{"detail": "Request was throttled. Expected available in 56 seconds."}
settings.py
"""
mac安装redis,启动redis
Django安装django-redis,并在settings文件中进行配置!
"""
# -- redis
CACHES = {
"default": { # -- ▲ 这里default
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"CONNECTION_POOL_KWARGS": {"max_connections": 100},
"PASSWORD": "qwe123",
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ext/throttle.py
from rest_framework.throttling import SimpleRateThrottle
from django.core.cache import cache as default_cache # 读取配置文件中redis缓存的配置
class MyThrottle(SimpleRateThrottle):
scope = "XXX" # -- 必须设置
THROTTLE_RATES = {"XXX": "5/m"} # -- 全局设置/局部设置皆可,但一定要注意, 字典的key值一定要与scope设定的值一样!!
"""
这个SimpleRateThrottle里已经配置了,这里可以不用写,但要注意,它应用的是Django配置中"default"的那个redis数据库
若想应用Django配置中其他名字的redis,需要编写下面几行代码(看源码分析出的).
>>> from django.utils.connection import ConnectionProxy
>>> from django.core.cache import caches
>>> name = "xxxx"
>>> cache = ConnectionProxy(caches, name) # 源码里默认这里的name值是"default".
"""
cache = default_cache
def get_cache_key(self, request, view):
""" 构建缓存中的key """
# 若已经登陆,取到用户ID;否则获取到发起请求的用户的IP地址
if request.user:
ident = request.user.pk # 用户ID
else:
ident = self.get_ident(request) # IP地址
# self.cache_format --> 'throttle_%(scope)s_%(ident)s'
return self.cache_format % {"scope": self.scope, "ident": ident}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
models.py
"""
数据库迁移后,在数据库中手动添加数据
xiaoming 123123
xiaohong 123456
"""
from django.db import models
class UserInfo(models.Model):
"""用户表"""
username = models.CharField(verbose_name="用户名", max_length=32)
password = models.CharField(verbose_name="密码", max_length=64)
# 为了简便,此处临时将token放在数据库中;drf项目中token的存放,一般会借助jwt.
token = models.CharField(verbose_name="TOKEN", max_length=64, null=True, blank=True)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
app01/views.py 此处是局部配置限流.
import uuid
from rest_framework.response import Response
from rest_framework.views import APIView
from app01 import models
from ext.throttle import MyThrottle
class LoginView(APIView):
authentication_classes = []
permission_classes = []
throttle_classes = [MyThrottle, ] # 限流类也可以进行全局配置和局部配置!!
def post(self, request):
user = request.data.get("username")
pwd = request.data.get("password")
user_obj = models.UserInfo.objects.filter(username=user, password=pwd).first()
if not user_obj:
return Response({"status": False, "msg": "用户名或密码错误"})
token = str(uuid.uuid4())
user_obj.token = token
user_obj.save()
return Response({"status": True, "data": token})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 源码分析
★ 一定要清楚,源码中:
唯一标识的构成 --> 'throttle'字符串 + scope变量的值 + 用户id / IP地址 组合的字符串!!
redis中存储的数据 --> {"唯一标识":[时间戳1,时间戳2,时间戳3]} 唯一标识用于构建缓存中的key!!
假设,限流类是这样配置的!!
from rest_framework.throttling import SimpleRateThrottle
from django.core.cache import cache as default_cache
class MyThrottle(SimpleRateThrottle):
scope = "XXX"
THROTTLE_RATES = {"XXX": "5/m"}
cache = default_cache
def get_cache_key(self, request, view):
if request.user:
ident = request.user.pk
else:
ident = self.get_ident(request)
return self.cache_format % {"scope": self.scope, "ident": ident}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 对象加载
实例化限流类
实例化每个限流类.. 类的实例化会自动执行类的__init__
方法
读取源码, 可知, 在此构造方法中, 会读取当前限流类对限制频率的配置,将其解析,获得 访问次数num_requests和时间间隔duration.
将 访问次数和时间间隔 封装到当前限流类的实例对象中, 作为该对象独有的属性!
另外, 需知道:
1> 限流类的应用也可以全局和局部配置!!
2> 限制频率的配置局部和全局都可以配置!! {"xxx":"5/m"}
一定要清楚!! throttle_classes 全局指的配置文件, 局部指的视图类 ; THROTTLE_RATES 全局指的配置文件,局部指的是限流类!
# -- 全局配置
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': ['ext.per.IpThrottle','ext.per.UserThrottle' ],
'DEFAULT_THROTTLE_RATES': {
'ip': '3/m',
'user': '5/m',
}
}
2
3
4
5
6
7
8
(´▽`).
class APIView(View):
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES # 认证
permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES # 权限
throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES # 限流
def dispatch(self, request, *args, **kwargs):
self.args = args
self.kwargs = kwargs
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?
try:
self.initial(request, *args, **kwargs)
if request.method.lower() in self.http_method_names:
handler = getattr(self, request.method.lower(),self.http_method_not_allowed)
else:
handler = self.http_method_not_allowed
response = handler(request, *args, **kwargs)
except Exception as exc:
response = self.handle_exception(exc)
self.response = self.finalize_response(request, response, *args, **kwargs)
return self.response
def initial(self, request, *args, **kwargs):
self.perform_authentication(request) # 认证
self.check_permissions(request) # 权限
self.check_throttles(request) # 限流
def check_throttles(self, request):
throttle_durations = []
# self.get_throttles() 得到所有限流类的实例化对象列表 将其循环. 注意:此处的类实例化,需看__init__
for throttle in self.get_throttles():
# ★★★ 重点就是 throttle.allow_request(request, self)
# 视图类(SimpleRateThrottle) - SimpleRateThrottle(BaseThrottle)
# 根据继承关系,找到的是SimpleRateThrottle里的allow_request方法.
# allow_request方法的具体实现下面会阐述,这里不讲解. 就现目前看到的几行代码而言.
"""
1> 当所有限流类的实例化对象执行throttle.allow_request(request, self)都返回True时,
那么throttle_durations列表为空,意味着限流的验证结束/check_throttles限流方法执行完毕
接着往下就是执行视图函数.
- So,都返回True,就不限流呗.
2> 只要有一个throttle.allow_request(request, self)返回False,就会限流
将 <需要等待的时间 throttle.wait()计算需要等待的时间> 加入throttle_durations列表中
当有多个限流类的实例化对象返回False时,throttle_durations列表中就会有多个值!
"""
if not throttle.allow_request(request, self):
# 需要限流时
throttle_durations.append(throttle.wait())
if throttle_durations:
# 当设置的次数是负数的时候,throttle.wait()返回的值就是None!!
durations = [
duration for duration in throttle_durations
if duration is not None
]
duration = max(durations, default=None) # 多个限流不通过,等待最久的,过了这个时间,所有限流肯定都能通过
self.throttled(request, duration) # 该方法会抛出异常
def get_throttles(self):
# self.throttle_classes 可以局部设置、全局设置!
# throttle() 类实例化,会自动执行对象的__init__构造方法!!
# So,接下来源码定位到,SimpleRateThrottle类的__init__方法.
return [throttle() for throttle in self.throttle_classes]
def throttled(self, request, wait):
"""抛出异常,最大需要等待多久"""
# 当限流类的次数全部设置为负数,这里传递过去的wait值为None;
# 否则,该wait值是 等待的最大时间戳/最大等待时间.
raise exceptions.Throttled(wait)
class SimpleRateThrottle(BaseThrottle):
cache = default_cache
timer = time.time
cache_format = 'throttle_%(scope)s_%(ident)s'
scope = None
THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES # 频率次数 去配置文件里读,全局和局部都可以配置!
"""比如(全局 此处写了多个频率):
REST_FRAMEWORK = {
"DEFAULT_THROTTLE_RATES":{
"XXX":"5/m",
"x2":"3/h",
}
}
"""
def __init__(self):
# 限流类首次实例化时,实例化对象里面肯定没有'rate'成员. 会执行get_rate方法.
# (自己写的限流类里没有,其父类SimpleRateThrottle里有get_rate)
if not getattr(self, 'rate', None):
self.rate = self.get_rate()
# 假设self.rate的值是 "5/m"
# self.parse_rate(self.rate) 会对 "5/m"进行解析!
self.num_requests, self.duration = self.parse_rate(self.rate) # 5 60
def get_rate(self):
# 意味着,限流类里必须得设置 scope变量, 且不能设置bool值为False的值.
if not getattr(self, 'scope', None):
msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
self.__class__.__name__)
raise ImproperlyConfigured(msg)
try:
# THROTTLE_RATES'频率'、scope'名字' 都是可以在自己写的限流类里设置的类变量
# eg: THROTTLE_RATES = {"XXX":"5/m"} scope = "XXX"
# 若自己的限流类里没写THROTTLE_RATES,scope,会读取SimpleRateThrottle里的.
return self.THROTTLE_RATES[self.scope]
except KeyError:
msg = "No default throttle rate set for '%s' scope" % self.scope
raise ImproperlyConfigured(msg)
def parse_rate(self, rate):
if rate is None:
return (None, None)
num, period = rate.split('/')
num_requests = int(num)
# 秒 分 时 天
# period[0] 这里设计的很奇妙,意味着 我们设置频率时, "5/m" 和 "5/minute"是一样的效果!
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return (num_requests, duration)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# 是否限流
核心在于, 限流类实例化后, 执行的allow_request方法, 查看是否应该对请求进行节流.
源码分析到这里, 有几个值得注意的点再提一下:
1> 限流类里必须得设置 scope变量, 且不能设置bool值为False的值;
2> 若设置限流的频率 {"xxx":None}, 那么相当于该限流没设置;
3> 若唯一标识的值为None,那么相当于该限流没设置;
4> redis数据库里的访问记录会根据设置的时间自动清除数据的..
5> 程序中做了很多临界点的处理, 比如 设置的次数为0,-1..
class BaseThrottle:
"""
Rate throttling of requests.
"""
def allow_request(self, request, view):
"""
返回True表示允许访问,返回False表示不允许访问.
"""
raise NotImplementedError('.allow_request() must be overridden')
def get_ident(self, request):
"""
获取唯一标识 - IP地址. (下述代码不必深究) 其逻辑如下:
A 直接 访问B, B可以获取到请求者的IP地址 A.
A 通过F代理 访问B, 若代理不是高匿的,可以获取到整个代理关系 F>A,取到源头 A; 若代理是高匿的,结果为F.
"""
xff = request.META.get('HTTP_X_FORWARDED_FOR')
remote_addr = request.META.get('REMOTE_ADDR')
num_proxies = api_settings.NUM_PROXIES
if num_proxies is not None:
if num_proxies == 0 or xff is None:
return remote_addr
addrs = xff.split(',')
client_addr = addrs[-min(num_proxies, len(addrs))]
return client_addr.strip()
return ''.join(xff.split()) if xff else remote_addr
def wait(self):
"""
Optionally, return a recommended number of seconds to wait before
the next request.
当不允许访问时,有一个等待机制.等待多久才能访问.
"""
return None
class SimpleRateThrottle(BaseThrottle):
cache = default_cache
timer = time.time
cache_format = 'throttle_%(scope)s_%(ident)s'
scope = None
THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.get_rate() # 5/m
self.num_requests, self.duration = self.parse_rate(self.rate) # 5 60
def get_cache_key(self, request, view):
raise NotImplementedError('.get_cache_key() must be overridden')
def get_rate(self):pass
# ... ... 前面已分析过
def parse_rate(self, rate):pass
# ... ... 前面已分析过
def allow_request(self, request, view):
"""
查看是否应该对请求进行节流.
成功时调用' throttle_success'. 失败时调用' throttle_failure'.
"""
# !! 这两行语句,意味着我设置访问频率为None,相当于没进行限流/该频率的限流直接通过. eg: {"XXX": None} 亲测有效.
if self.rate is None:
return True
# 在频率类里重写了get_cache_key方法,该方法用于获取用户的唯一标识.
self.key = self.get_cache_key(request, view)
# !! 这两行语句,意味着get_cache_key方法设置返回值为None,相当于没进行限流/该频率的限流直接通过.
if self.key is None:
return True
self.history = self.cache.get(self.key, []) # 从redis中获取历史访问记录
self.now = self.timer() # 获取当前的时间戳
"""!该判断条件写的挺牛批的!要转下弯!
将表达式转换一下来理解:
循环列表,有值,并且当前时间减去列表的最后一个时间大于`self.duration`s
把这种数据pop掉,<这样列表中只有60s以内的访问时间!!!>
★ 也就是说,列表中的时间处于 (当前时间-时间间隔)至当前时间这段时间 才是合理的!!!
再换个说法,通过循环剔除掉历史时间列表中超出时间间隔的时间戳,如将1分钟之前的时间戳全部删除.
注意: self.history[-1] 该列表最后一个项 是里面的最早一次访问的时间 因为插入是从列表第0个位置插入的.
while self.history and self.now - self.history[-1] >= self.duration:
self.history.pop()
"""
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
# 记录列表的长度大于等于允许访问的次数,则超过了限制,return False
if len(self.history) >= self.num_requests:
return self.throttle_failure()
# 否则,正常访问,会将时间点插入记录列表,返回True
return self.throttle_success()
def throttle_success(self):
"""
Inserts the current request's timestamp along with the key into the cache.
将当前请求的时间戳与键一起插入进入缓存.
"""
self.history.insert(0, self.now)
self.cache.set(self.key, self.history, self.duration) # ▲!!redis中超过了最大间隔时间,该缓存会自动失效!!
return True
def throttle_failure(self):
return False
def wait(self):
"""以秒为单位返回建议的下一个请求时间."""
if self.history:
# 时间间隔 - (当前时间 - 历史中最早访问的记录) 该表达式得到的最早的访问记录失效需要多久,因为失效了才能继续插入.
# 3/m 当前时间16:47 [16:46,16:45,16:43] 16:47-16:43=4s 60s-4s=56s 即56s后16:43该记录才会失效.
remaining_duration = self.duration - (self.now - self.history[-1]) # 还需等待多久.
else:
remaining_duration = self.duration
available_requests = self.num_requests - len(self.history) + 1
# 0-2+1=-1 就是列表中还有值 但中途改了次数为0 self.history[-1]会报越界的错误
if available_requests <= 0:
return None
return remaining_duration / float(available_requests)
"""为什么wait方法里的两个if判断?? 这涉及临界条件的判断:
执行allow_request方法时
多久没访问了,下一次访问时,redis中的记录都已经自动删除了.self.history得到的是[];
并且访问次数的设置<0,导致len(self.history) >= self.num_requests结果为False,往上返回的是False.
接着分析执行流程,是会执行wait方法的. 此时的self.history为空,available_requests也是<=0的.
So,wait方法里的两个if判断就可以解释啦.
前面的 durations = [ duration for duration in throttle_durations if duration is not None],
为啥判断if duration is not None也可以一并解释啦!
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# 自定义错误提示
# 错误相关源码剖析
其实不用那么逐字逐行的理解, 你看它最后需要给什么.. 像Throttled它最后给的是
super().__init__(detail, code)
它中间经历了啥过程, 我不管, 我重写时, 只需要关注 我给detail赋什么值即可.
class APIView(View):
def initial(self, request, *args, **kwargs):
self.perform_authentication(request) # 认证
self.check_permissions(request) # 权限
self.check_throttles(request) # 限流
def check_throttles(self, request):
throttle_durations = []
for throttle in self.get_throttles():
if not throttle.allow_request(request, self):
throttle_durations.append(throttle.wait())
if throttle_durations:
durations = [
duration for duration in throttle_durations
if duration is not None
]
duration = max(durations, default=None)
self.throttled(request, duration)
def throttled(self, request, wait):
# 当限流类的次数全部设置为负数,这里传递过去的wait值为None;
# 否则,该wait值是 等待的最大时间戳/最大等待时间.
raise exceptions.Throttled(wait)
class APIException(Exception):
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
default_detail = _('A server error occurred.')
default_code = 'error'
def __init__(self, detail=None, code=None):
if detail is None:
detail = self.default_detail
if code is None:
code = self.default_code
self.detail = _get_error_details(detail, code)
def __str__(self):
return str(self.detail)
def get_codes(self):
return _get_codes(self.detail)
def get_full_details(self):
return _get_full_details(self.detail)
class Throttled(APIException):
status_code = status.HTTP_429_TOO_MANY_REQUESTS
default_detail = _('Request was throttled.')
extra_detail_singular = _('Expected available in {wait} second.')
extra_detail_plural = _('Expected available in {wait} seconds.')
default_code = 'throttled'
def __init__(self, wait=None, detail=None, code=None):
if detail is None:
detail = force_str(self.default_detail)
if wait is not None:
wait = math.ceil(wait) # 时间戳取整数
detail = ' '.join((
detail,
force_str(ngettext(self.extra_detail_singular.format(wait=wait),
self.extra_detail_plural.format(wait=wait),
wait))))
self.wait = wait
# 很明显,上述逻辑中,detail最终的值就是 错误信息提示的内容!!
# 那么我们只需要重写throttled方法即可,在哪重写呢?看源码中throttled方法中的self是谁!是视图类的实例化对象.
# So,我们需要在视图类中重写throttled方法!!
super().__init__(detail, code) # 本质还是将值传给APIException!!!
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# 方式一
读懂 Throttled类的逻辑, 还是用Throttled.
from rest_framework import exceptions
class LoginView(APIView):
authentication_classes = []
permission_classes = []
throttle_classes = [MyThrottle, ]
def post(self, request):pass
def throttled(self, request, wait):
if not wait:
msg = 'Request was throttled.'
else:
msg = f"需等待{int(wait)}s后才能访问!"
detail = {
"code": 1005,
"data": "访问频繁",
"wait": msg,
}
raise exceptions.Throttled(detail=detail)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 方式二(推荐)
源码
raise exceptions.Throttled(detail=detail)
, Throttled 本质就是一个继承 APIException的类..
最终还是得将 经过逻辑处理后的 detail值 传递到 APIException中..
So, 我们也可以不用 Throttled, 自己写一个类来继承 APIException. 这样扩展性更高, 推荐!!
from rest_framework import exceptions
from rest_framework import status
class ThrottleException(exceptions.APIException):
status_code = status.HTTP_429_TOO_MANY_REQUESTS
default_code = "throttled"
class LoginView(APIView):
authentication_classes = []
permission_classes = []
throttle_classes = [MyThrottle, ]
def post(self, request):pass
def throttled(self, request, wait):
if not wait:
msg = 'Request was throttled.'
else:
msg = f"需等待{int(wait)}s后才能访问!"
detail = {
"code": 1005,
"data": "访问频繁",
"wait": msg,
}
raise ThrottleException(detail)
#--- 还可以换一种写法!!!
from rest_framework import exceptions
from rest_framework import status
class ThrottleException(exceptions.APIException):
status_code = status.HTTP_429_TOO_MANY_REQUESTS
default_code = "throttled"
def __init__(self, wait, code=None):
if not wait:
msg = 'Request was throttled.'
else:
msg = f"需等待{int(wait)}s后才能访问!"
detail = {
"code": 1005,
"data": "访问频繁",
"wait": msg,
}
super().__init__(detail, code)
class LoginView(APIView):
authentication_classes = []
permission_classes = []
throttle_classes = [MyThrottle, ]
def post(self, request):pass
def throttled(self, request, wait):
raise ThrottleException(wait)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 多个限流类(扩展)
源码里的两种情况 + 自己扩展的一种
扩展的情况 关键在于,抛出的异常是会往上直到被捕获的!!原理类似于认证类验证时抛出的异常的捕获过程.
根据上面的分析, 主要是 APIView类里的 check_throttles 方法 + SimpleRateThrottle类里的 allow_request方法..
allow_request方法三种返回 (我们扩展的就是第三种):
1> True
2> Fasle
3> raise
源码里allow_request方法
- 返回True,表示当前限流类允许访问
check_throttles 里的for循环继续执行 后续限流类的allow_request方法;
- 返回False,表示当前限流类不允许访问,会计算等待时间加入throttle_durations列表中
check_throttles 里的for循环继续执行 后续限流类的allow_request方法;
当循环完毕,会找到throttle_durations列表中的最大等待时间,返回限流用户提示信息!
上述只是源码提供的流程,我们还可以分析源码,进行扩展.扩展/自定义第三种情况.
check_throttles 当前限流类不允许访问时,不返回False,而是直接抛出异常APIException类型的异常
该异常的抛出会导致循环结束,程序终止,也就是说, '[<后续的限流类都不会执行,不会管啦>]'.
异常会被APIView.dispatch里的try..except..捕获到!!
具体如何实现扩展呢? 关键在于在限流类里重写 throttle_failure 方法!!
from rest_framework.throttling import SimpleRateThrottle
from django.utils.connection import ConnectionProxy
from rest_framework import exceptions
from rest_framework import status
class ThrottledException(exceptions.APIException):
status_code = status.HTTP_429_TOO_MANY_REQUESTS
default_code = "throttled"
class MyThrottle(SimpleRateThrottle):
scope = "XXX"
THROTTLE_RATES = {"XXX": "3/m"}
def get_cache_key(self, request, view):
if request.user:
ident = request.user.pk
else:
ident = self.get_ident(request)
return self.cache_format % {"scope": self.scope, "ident": ident}
def throttle_failure(self):
wait = self.wait
detail = {
"code": 1005,
"data": "访问频率限制",
"wait": f'需等待{int(wait)}s后才能访问!',
}
raise ThrottledException(detail)
# Ps:还可以进一步封装,写个类A A(SimpleRateThrottle),里面重写throttle_failure方法
# MyThrottle限流类就可以有两个选择来继承 SimpleRateThrottle 和 A "回顾:权限里or的扩展也讲到了可以这么封装"
# 前者在限流执行过程中,allow_request方法是返回True和False;后者是返回True和raise 看应用场景来选择!!
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 思考题
Q: 若'3/m'的限流类进行了全局配置 . 依次访问了接口1、2、3, 接着再访问接口1, 会被限制吗? 为什么?
A:
会!!
再次提醒:
唯一标识的构成 --> 'throttle' + scope + 用户id / IP地址 组合的字符串!!
redis中存储的数据 --> {"唯一标识":[时间戳1,时间戳2,时间戳3]}
因为是全局配置的,所有接口读取的redis数据库中的数据的k值都是一样的,都是那同一个唯一标识!!
★ So,限流类我们通常不会进行全局配置.而是局部配置.而且每个限流类的get_cache_key方法返回的唯一标识都是不一样的!!
突然想到一个需求: 某个接口 不登陆就能访问 但未登录访问频率 5/m 登陆后访问频率 10/m
在前面的快速使用那,就有一点苗头了,但可惜的是,不管是登陆使用的user_id还是未登录使用的ip,其对应的访问频率都是一样的.
如何解决? 运用drf中已有的AnonRateThrottle、UserRateThrottle,这两个限流类.
有几个需要注意的地方:
1> 这两个限流类里的if判断使用了 from django.contrib import auth Django的auth认证组件
2> 关于scope,AnonRateThrottle默认是'anon',UserRateThrottle默认是'user'
- 我们需要在全局配置对应的访问频率
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_RATES': {
'anon': '3/m',
'user': '10/m'
}
}
- 在视图类里进行限流类的局部配置
class ThrottleTestAPI(GenericAPIView):
throttle_classes = [AnonRateThrottle,UserRateThrottle]
def get(self,request):
if request.user.is_authenticated:
return Response(data="你的请求次数有十次每分钟.")
return Response("你的请求次数只有三次每分钟.")
def post(self,request):
user_obj = auth.authenticate(username="admin",password="admin123")
auth.login(request,user_obj)
return Response("登录成功了!")
补充解释:
因为视图类是局部配置的,意味着 仅仅只是 ThrottleTestAPI 这个视图类 在限流方面 维护了三个键值对!
(若是全局配置的话,那就是所有的视图类 在限流方面 维护了这三个键值对.)
throttle_anon_IP -- 3/m
throttle_user_userID -- 10/m
throttle_user_IP -- 10/m
- 当未登录,访问ThrottleTestAPI这个视图类对应的路由,键throttle_anon_IP对应的值加1,键throttle_user_IP的值加1.
当未登录的状态1min访问3次后,AnonRateThrottle限流器通过不了啦!抛出异常!
- 当登录后,访问ThrottleTestAPI这个视图类对应的路由,AnonRateThrottle限流类直接通过
具体体现在源码里
def allow_request(self, request, view):
self.key = self.get_cache_key(request, view)
if self.key is None:
return True
因为已经登陆,键throttle_user_userID的值加1,直到1min访问10次后,UserRateThrottle限流类抛出异常.
https://pythondjango.cn/django/rest-framework/10-throttling/
https://www.cnblogs.com/yunya-cnblogs/p/13911110.html
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 限流的应用
无需登陆/匿名用户访问的接口限流 1分钟5次
需要登陆才能访问的接口限流 1分钟10次.
from rest_framework.throttling import SimpleRateThrottle
# 匿名用户限流类
class IpThrottle(SimpleRateThrottle):
scope = "ip"
THROTTLE_RATES = {"ip": "5/m"}
def get_cache_key(self, request, view):
ident = self.get_ident(request) # IP
return self.cache_format % {"scope": self.scope, "ident": ident}
# 已成功登陆用户限流类
class UserThrottle(SimpleRateThrottle):
scope = "user"
THROTTLE_RATES = {"user": "10/m"}
def get_cache_key(self, request, view):
ident = request.user.pk # 用户ID
return self.cache_format % {"scope": self.scope, "ident": ident}
"""
1. IpThrottle、UserThrottle 中的 THROTTLE_RATES都可以不写,在全局中进行配置!
但IpThrottle和UserThrottle通常不会在全局进行配置!
2. - IpThrottle应用于无需登陆就能访问的接口. 比如: /login 登陆接口. throttle_classes = [IpThrottle, ]
- UserThrottle应用于登陆成功后才能访问的接口. throttle_classes = [UserThrottle, ]
也就是认证成功后,requst.user中有了当前登陆用户的信息,接着对该用户访问的这个接口进行限流.
- 当然,理论逻辑上,登陆成功后才能访问的接口也可以进行IP的限流!IP和用户ID两个限流类也可以同时使用.
throttle_classes = [UserThrottle, IpThrottle] 但仔细想想,就此处而言,很鸡肋,5/m都通过不了,10/m还通过的了吗?
怎么运行的?再来看这部分关键代码:
def check_throttles(self, request):
throttle_durations = []
for throttle in self.get_throttles():
if not throttle.allow_request(request, self):
throttle_durations.append(throttle.wait())
if throttle_durations:
durations = [
duration for duration in throttle_durations
if duration is not None
]
duration = max(durations, default=None)
self.throttled(request, duration) # 报错.
分析: IpThrottle 5/m UserThrottle 10/m
1min里,前5次访问接口都没问题,第6次,IpThrottle().allow_request就会返回False,计算等待时间加入throttle_durations列表.
继续循环,UserThrottle().allow_request返回的是Ture,不用管. 循环结束.
往下执行,抛出限流的错误提示信息!!
Ps: 对一个认证用户进行限流不仅要限制每分钟的请求次数,还需要限制每小时的请求次数,就需应用多个限流类.
比如 5次/min + 200次/hour (因为1分钟5次,理论上1小时可以访问300次)
再来想想过程
假设40分钟访问了200次,想继续点时,两个限流类都通过不了啦,一个计算等等时间50多s,一个计算等待时间20分钟.
限流报错的提示信息肯定是还需等待1200s,也就是20分钟.
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 模拟
auth.py和per.py的代码就不贴在这了, 跟权限里的模拟那的代码一样!!
thr.py
import time
class SimpleRateThrottle:
scope = None
THROTTLE_RATES = {"XXX", None}
cache_format = 'throttle_%(scope)s_%(ident)s'
timer = time.time
VISIT_RECORD = {}
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.THROTTLE_RATES[self.scope]
self.num_requests, self.duration = self.parse_rate(self.rate) # 5 60
self.history = []
def get_cache_key(self, request, view):
raise NotImplementedError('.get_cache_key() must be overridden')
def parse_rate(self, rate):
if rate is None:
return None, None
num, period = rate.split('/')
num_requests = int(num)
duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
return num_requests, duration
def allow_request(self, request, view):
if self.rate is None:
return True
self.key = self.get_cache_key(request, view) # "throttle_XXX_127.0.0.1"
if self.key is None:
return True
self.now = self.timer()
self.history = self.VISIT_RECORD.get(self.key, [])
while self.history and self.history[-1] <= self.now - self.duration:
self.history.pop()
if len(self.history) >= self.num_requests:
return False
else:
self.history.insert(0, self.now)
self.VISIT_RECORD[self.key] = self.history
return True
def wait(self):
return self.duration - (self.now - self.history[-1]) # 得到还需等待多久.
class MyThrottle(SimpleRateThrottle):
scope = "XXX"
THROTTLE_RATES = {"XXX": "2/m"}
cache = "default_cache"
def get_cache_key(self, request, view):
if request.user:
ident = request.user.pk
else:
ident = "127.0.0.1"
# "throttle_XXX_127.0.0.1"
return self.cache_format % {"scope": self.scope, "ident": ident}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
run.py
from auth import QueryParamsAuthentication, HeaderAuthentication, NoAuthentication
from per import MyPermission1, MyPermission2, MyPermission3
from thr import MyThrottle
class View:
@classmethod
def as_view(cls, **initkwargs):
def view(request, *args, **kwargs):
self = cls(**initkwargs)
return self.dispatch(request, *args, **kwargs)
return view
def dispatch(self, request, *args, **kwargs):
pass
class APIView(View):
authentication_classes = [QueryParamsAuthentication, HeaderAuthentication]
permission_classes = [MyPermission1, MyPermission2]
throttle_classes = [MyThrottle, ]
@classmethod
def as_view(cls, **initkwargs):
# view = super(APIView, cls).as_view()
view = super().as_view(**initkwargs)
return view
def dispatch(self, request, *args, **kwargs):
self.args = args
self.kwargs = kwargs
request = self.initialize_request(request, *args, **kwargs) # 二次封装request
try:
self.initial(request, *args, **kwargs)
handler = getattr(self, request.method.lower())
return handler(request, *args, **kwargs)
except Exception as e:
print("dispatch捕获的报错>>:", e)
def initialize_request(self, request, *args, **kwargs):
return Request(
request,
authenticators=self.get_authenticators(),
)
def get_authenticators(self):
return [auth() for auth in self.authentication_classes]
def get_permissions(self):
return [permission() for permission in self.permission_classes]
def get_throttles(self):
return [throttle() for throttle in self.throttle_classes]
def initial(self, request, *args, **kwargs):
self.perform_authentication(request) # 进行认证
self.check_permissions(request) # 进行权限校验
self.check_throttles(request) # 进行限流校验
def perform_authentication(self, request):
request.user
def check_permissions(self, request):
for permission in self.get_permissions():
if not permission.has_permission(request, self):
self.permission_denied(
request,
message=getattr(permission, 'message', None),
code=getattr(permission, 'code', None)
)
def check_throttles(self, request):
throttle_durations = []
for throttle in self.get_throttles():
if not throttle.allow_request(request, self):
throttle_durations.append(throttle.wait())
if throttle_durations:
duration = max(throttle_durations, default=None)
raise Exception(f"限流,离下一次访问还需要{int(duration)}s!")
def permission_denied(self, request, message=None, code=None):
print(request.user, request.auth, request._authenticator)
if request.authenticators and not request._authenticator:
raise Exception("not_authenticated.")
raise Exception(f"permission_denied: {message}-{code}")
class Request:
def __init__(self, request, authenticators=None):
self._request = request
self.authenticators = authenticators or ()
def __getattr__(self, attr):
try:
return getattr(self._request, attr)
except AttributeError:
return self.__getattribute__(attr)
@property
def user(self):
if not hasattr(self, '_user'):
self._authenticate()
return self._user
@user.setter
def user(self, value):
self._user = value
def _authenticate(self):
for authenticator in self.authenticators:
try:
user_auth_tuple = authenticator.authenticate(self)
except Exception:
self._not_authenticated()
raise
if user_auth_tuple is not None:
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple
return
self._not_authenticated()
def _not_authenticated(self):
self._authenticator = None
self.user = None
self.auth = None
if __name__ == '__main__':
import random
import time
class DjangoRequest:
def __init__(self):
self.method = random.choice(['GET', 'POST']) # 模拟发送的请求类型
class InfoView(APIView):
authentication_classes = []
permission_classes = []
throttle_classes = [MyThrottle, ]
def __init__(self, **kwargs):
self.tip = "Hello"
for k, v in kwargs.items():
setattr(self, k, v)
def say(self):
return "123123"
def get(self, request, xx):
print(self.args, self.kwargs) # (111,) {}
print(request.user, request.auth)
return "InfoView-GET"
def post(self, request, xx):
print(self.args, self.kwargs) # (111,) {}
print(request.user, request.auth)
return "InfoView-POST"
request = DjangoRequest()
for _ in range(5):
print(InfoView.as_view(**{"height": 100, "wight": 200})(request, 111)) # 111是模拟路由参数传递!
print("======")
time.sleep(1) # 模拟手动点击发送请求的间隔时间.
""" 运行结果如下:
(111,) {}
None None
InfoView-GET
======
(111,) {}
None None
InfoView-GET
======
dispatch捕获的报错>>: 限流,离下一次访问还需要57s!
None
======
dispatch捕获的报错>>: 限流,离下一次访问还需要56s!
None
======
dispatch捕获的报错>>: 限流,离下一次访问还需要55s!
None
======
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191