worker
# 连接redis
import redis
import redis
REDIS_PARAMS = {
"host": '127.0.0.1',
"port": "6379",
"password": '',
"encoding": 'utf-8'
}
conn = redis.Redis(**REDIS_PARAMS)
# -- rpop从右开始拿,没有就结束; brpop没有傻傻的一直等,当然可以设置最多等待时间
# eg: oid --> (b'YANG_TASK_QUEUE', b'2022081409424192778452289117')
oid = conn.brpop(settings.QUEUE_TASK_NAME, timeout=5)
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
用户创建订单后, 订单信息就会存在redis中,我们从中取数据!!
# 连接mysql
import pymysql
注意: pymql执行sql语句有两种方式!!
1> cursor.execute('select * from hosts where id > %s', [12, ])
2> cursor.execute('select * from hosts where id > %(n1)s', {"n1":12})
import pymysql
MYSQL_CONN_PARAMS = {
'host': "127.0.0.1",
'port': 3306,
'user': 'root',
'passwd': "admin1234",
'charset': "utf8",
'db': "order",
}
conn = pymysql.connect(**MYSQL_CONN_PARAMS)
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'select * from web_order where oid=2024050702594654374820779848'
cursor.execute(sql)
# 无DictCursor --> fetchone的结果元祖 (6, 1, 1, '2024050702594654374820779848')
# 有DictCursor --> fetchone的结果字典 {'id': 6, 'active': 1, 'status': 1, 'oid': '2024050702594654374820779848'}
row_dict = cursor.fetchone()
cursor.close()
conn.close()
# ===== ==== ===
通过fetchone取到的数据,{'id': 6, 'active': 1, 'status': 1, 'oid': '2024050702594654374820779848'}
需要通过row_dict["id"],row_dict['status']这种方式取值,较为繁琐!
可以创建一个类,将从数据库中取到的这些字段值封装进去:(用起来会方便很多~
class DbRow(object):
def __init__(self, id, oid, status, url, count):
self.id = id
self.oid = oid # 订单号
self.status = status # 订单状态
self.url = url # 刷量的url
self.count = count # 刷的播放量
row_obj = DbRow(**row_dict)
row_obj.id
row_obj.status
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 上下文管理器
# 待优化代码
下面这段代码, 在获取订单信息和更新订单状态的时候, 都用到连接mysql的代码!!
import redis
import settings
import pymysql
class DbRow(object):
def __init__(self, id, oid, status, url, count):
self.id = id
self.oid = oid # 订单号
self.status = status # 订单状态
self.url = url # 刷量的url
self.count = count # 刷的播放量
def get_redis_task():
# 直接连接redis
conn = redis.Redis(**settings.REDIS_PARAMS)
# rpop从右开始拿,没有就结束 brpop没有傻傻的一直等,当然可以设置最多等待时间
oid = conn.brpop(settings.QUEUE_TASK_NAME, timeout=5)
if not oid:
return
# eg: oid --> (b'YANG_TASK_QUEUE', b'2022081409424192778452289117')
return oid[1].decode('utf-8')
def get_order_info_by_id(oid):
"""获取订单信息"""
conn = pymysql.connect(
host='127.0.0.1', port=3306, user='root', passwd='admin1234', charset="utf8", db="order")
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'select id, oid, status, url, count from web_order where oid=%s and status=1'
cursor.execute(sql, [oid, ])
# (6, 1, 1, '2024050702594654374820779848') /
# {'id': 6, 'active': 1, 'status': 1, 'oid': '2024050702594654374820779848'}
row_dict = cursor.fetchone()
cursor.close()
conn.close()
if not row_dict: # 只有当订单状态为1,即待执行时,才需要获取订单信息
return
row_obj = DbRow(**row_dict)
return row_obj
def update_order_status(oid, status):
"""更新订单状态"""
conn = pymysql.connect(
host='127.0.0.1', port=3306, user='root', passwd='admin1234', charset="utf8", db="order")
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'update web_order set status=%s where oid=%s'
cursor.execute(sql, [status, oid, ])
conn.commit()
cursor.close()
conn.close()
def run():
while True:
# 1.去redis的队列中获取待执行的订单号
# oid = get_redis_task()
# if not oid:
# continue # 有订单号才能继续往下走
oid = '2024050702594654374820779848'
# 2.连接数据库获取订单信息 <需连接数据库>
row_obj = get_order_info_by_id(oid)
if not row_obj:
continue
# 3.更新订单状态 待执行->正在执行 <需连接数据库>
update_order_status(oid, 2)
if __name__ == '__main__':
run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# with上下文
__enter__
以及__exit__
文件管理open就自带上下文管理 用的时候打开,不用时自己关闭! 我们也可以自己写上下文管理器,执行with语句时,连接数据库,with代码块执行完后,关闭数据库!!
# 基本框架
往下面这段代码中填充内容!
class Connect:
def __init__(self):
pass
def __enter__(self):
# To do:连接数据库
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# To do:关闭数据库
pass
def fetch_one(self):
pass
if __name__ == '__main__':
""" ★★★
with语句,进去时自动执行__enter__,出来时自动执行__exit__
__enter__返回啥,conn就是啥, 那么此处若__enter__返回self,那么conn就是Connect()这个实例对象!!
"""
with Connect() as conn:
conn.fetch_one()
""" 相当于,这么一回事:
with 获取连接:
执行SQL(执行完毕后,自动将连接交还给连接池)
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 关键代码
这段代码就够用啦!!
class Connect(object):
def __init__(self):
self.conn = None
self.cursor = None
def __enter__(self):
self.conn = pymysql.connect(**settings.MYSQL_CONN_PARAMS)
self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.conn.close()
def exec(self, sql, **kwargs):
"""增加、更新、删除时使用该方法!"""
self.cursor.execute(sql, kwargs)
self.conn.commit()
def fetch_one(self, sql, **kwargs):
"""查看一条数据"""
self.cursor.execute(sql, kwargs)
result = self.cursor.fetchone()
return result
def fetch_all(self, sql, **kwargs):
"""查看所有数据"""
self.cursor.execute(sql, kwargs)
result = self.cursor.fetchall()
return result
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 优化后的代码
mysql的连接关闭使用上上下文啦!!
import redis
import settings
import pymysql
class DbRow(object):
def __init__(self, id, oid, status, url, count):
self.id = id
self.oid = oid # 订单号
self.status = status # 订单状态
self.url = url # 刷量的url
self.count = count # 刷的播放量
class Connect:
def __init__(self):
self.conn = conn = pymysql.connect(**settings.MYSQL_CONN_PARAMS)
self.cursor = conn.cursor(pymysql.cursors.DictCursor)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.cursor.close()
self.conn.close()
def exec(self, sql, **kwargs):
self.cursor.execute(sql, kwargs)
self.conn.commit()
def fetch_one(self, sql, **kwargs):
self.cursor.execute(sql, kwargs)
result = self.cursor.fetchone()
return result
def fetch_all(self, sql, **kwargs):
self.cursor.execute(sql, kwargs)
result = self.cursor.fetchall()
return result
def get_redis_task():
conn = redis.Redis(**settings.REDIS_PARAMS) # 直接连接redis
# rpop从右开始拿,没有就结束 brpop没有傻傻的一直等,当然可以设置最多等待时间
oid = conn.brpop(settings.QUEUE_TASK_NAME, timeout=5)
if not oid:
return
# eg: oid --> (b'YANG_TASK_QUEUE', b'2022081409424192778452289117')
return oid[1].decode('utf-8')
def get_order_info_by_id(oid):
with Connect() as conn:
row_dict = conn.fetch_one(
"select id,oid,status,url,count from web_order where oid=%(oid)s and status=1",
oid=oid
)
if not row_dict:
return
row_object = DbRow(**row_dict)
return row_object
def update_order_status(oid, status):
with Connect() as conn:
conn.exec("update web_order set status=%(status)s where oid=%(oid)s", status=status, oid=oid)
def run():
while True:
# 1.去redis的队列中获取待执行的订单号
# oid = get_redis_task()
# if not oid:
# continue # 有订单号才能继续往下走
oid = '2024050702574353748298051742'
# 2.连接数据库获取订单信息
row_obj = get_order_info_by_id(oid)
if not row_obj:
continue
# 3.更新订单状态 待执行->正在执行
update_order_status(oid, 2)
import brush # -- 这里面就是写的逆向爬虫,具体算法就不说啦!!略.
# 4.执行订单 -- 使用线程池!!并发最多开到20
pool = ThreadPoolExecutor(20)
for i in range(row_obj.count): # 假设要给该视频地址刷100个播放量
pool.submit(brush.task, row_obj.url)
pool.shutdown() # 组塞住,等待20线程把100播放量的任务执行完成,程序才往下走
# 5.再次更新订单状态 正在执行->已完成
update_order_status(oid, 3)
if __name__ == '__main__':
run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# 一些思考
(*≧ω≦)
Q: 如果中途程序崩溃了,没有刷完怎么办
程序突然关闭
- 正常运行时,500次,在数据库更新一次,已刷多少.
- 崩溃
> 状态,正在执行
> 查看已经刷了多少
- 重新打开时,先去找到所有的正在执行的订单,重新加入队列去执行(所有的减去已刷的).
还可以在本地记录日志,记录跑了多少,奔溃时查看日志,再补刷!
Q: 刷单程序 使用的是 远程数据库?
A: 是的.
Q: 日志相关
A:
- logging模块,内置模块+线程安全.
- 第三方模块日志模块.
- sentry日志&探针.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 打包
将几个脚本打包成可执行文件!!
相当于打包成了一个客户端, 发给任何人都, 它都可以去执行.
- 可执行文件打包
pip install pyinstaller
windows环境下打包出来的就是 windows包;
mac环境下打包出来的就是 mac包.
# -- app.py 是右键运行的那个py文件!! 打包完后出现一个dist文件夹,双击里面的可执行文件即可.
pyinstaller -D app.py
pyinstaller -F app.py # -F的方式dist文件夹里就只会有一个文件
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 补充
(・_・;
python执行js代码!!
# Python调用JavaScript代码
# - 安装nodejs + 配置环境变量
# - 安装pyexecjs模块 pip install pyexecjs
import execjs # pip install pyexecjs
javascript_file = execjs.compile("""
function createGUID(e) {
e = e || 32;
for (var t = "", r = 1; r <= e; r++) {
t += Math.floor(16 * Math.random()).toString(16);
}
return t;
}
""")
guid = javascript_file.call('createGUID')
print(guid)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try..except.. 看错误的调用栈,即哪一行出错啦!
try:
pass
except Exception as e:
print(e)
import traceback
print(traceback.format_exc())
exit()
1
2
3
4
5
6
7
2
3
4
5
6
7