worker
  # 连接redis
import redis
import redis
REDIS_PARAMS = {
    "host": '127.0.0.1',
    "port": "6379",
    "password": '',
    "encoding": 'utf-8'
}
conn = redis.Redis(**REDIS_PARAMS)
# -- rpop从右开始拿,没有就结束; brpop没有傻傻的一直等,当然可以设置最多等待时间   
#    eg: oid --> (b'YANG_TASK_QUEUE', b'2022081409424192778452289117')      
oid = conn.brpop(settings.QUEUE_TASK_NAME, timeout=5)
 1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
用户创建订单后, 订单信息就会存在redis中,我们从中取数据!!

# 连接mysql
import pymysql
注意: pymql执行sql语句有两种方式!!
1> cursor.execute('select * from hosts where id > %s', [12, ])
2> cursor.execute('select * from hosts where id > %(n1)s', {"n1":12})
import pymysql
MYSQL_CONN_PARAMS = {
    'host': "127.0.0.1",
    'port': 3306,
    'user': 'root',
    'passwd': "admin1234",
    'charset': "utf8",
    'db': "order",
}
conn = pymysql.connect(**MYSQL_CONN_PARAMS) 
cursor = conn.cursor(pymysql.cursors.DictCursor)                                                                 
sql = 'select * from web_order where oid=2024050702594654374820779848'                                           
cursor.execute(sql)                                                                                                    
# 无DictCursor --> fetchone的结果元祖 (6, 1, 1, '2024050702594654374820779848') 
# 有DictCursor --> fetchone的结果字典 {'id': 6, 'active': 1, 'status': 1, 'oid': '2024050702594654374820779848'}
row_dict = cursor.fetchone()                                                                                                                                                                      
cursor.close()
conn.close()    
# ===== ==== ===
通过fetchone取到的数据,{'id': 6, 'active': 1, 'status': 1, 'oid': '2024050702594654374820779848'}
需要通过row_dict["id"],row_dict['status']这种方式取值,较为繁琐!
可以创建一个类,将从数据库中取到的这些字段值封装进去:(用起来会方便很多~
class DbRow(object):
    def __init__(self, id, oid, status, url, count):
        self.id = id
        self.oid = oid  # 订单号
        self.status = status  # 订单状态
        self.url = url  # 刷量的url
        self.count = count  # 刷的播放量
        
row_obj = DbRow(**row_dict)
row_obj.id
row_obj.status
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 上下文管理器
# 待优化代码
下面这段代码, 在获取订单信息和更新订单状态的时候, 都用到连接mysql的代码!!
import redis
import settings
import pymysql
class DbRow(object):
    def __init__(self, id, oid, status, url, count):
        self.id = id
        self.oid = oid  # 订单号
        self.status = status  # 订单状态
        self.url = url  # 刷量的url
        self.count = count  # 刷的播放量
def get_redis_task():
    # 直接连接redis
    conn = redis.Redis(**settings.REDIS_PARAMS)
    # rpop从右开始拿,没有就结束 brpop没有傻傻的一直等,当然可以设置最多等待时间
    oid = conn.brpop(settings.QUEUE_TASK_NAME, timeout=5)
    if not oid:
        return
    # eg: oid --> (b'YANG_TASK_QUEUE', b'2022081409424192778452289117')
    return oid[1].decode('utf-8')
def get_order_info_by_id(oid):
    """获取订单信息"""
    conn = pymysql.connect(
        host='127.0.0.1', port=3306, user='root', passwd='admin1234', charset="utf8", db="order")
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    sql = 'select  id, oid, status, url, count from web_order where oid=%s and status=1'
    cursor.execute(sql, [oid, ])
    # (6, 1, 1, '2024050702594654374820779848') / 
    # {'id': 6, 'active': 1, 'status': 1, 'oid': '2024050702594654374820779848'}
    row_dict = cursor.fetchone()
    cursor.close()
    conn.close()
    if not row_dict:  # 只有当订单状态为1,即待执行时,才需要获取订单信息
        return
    row_obj = DbRow(**row_dict)
    return row_obj
def update_order_status(oid, status):
    """更新订单状态"""
    conn = pymysql.connect(
        host='127.0.0.1', port=3306, user='root', passwd='admin1234', charset="utf8", db="order")
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    sql = 'update web_order set status=%s where oid=%s'
    cursor.execute(sql, [status, oid, ])
    conn.commit()
    cursor.close()
    conn.close()
def run():
    while True:
        # 1.去redis的队列中获取待执行的订单号
        # oid = get_redis_task()
        # if not oid:
        #     continue  # 有订单号才能继续往下走
        
        oid = '2024050702594654374820779848'
        
        # 2.连接数据库获取订单信息  <需连接数据库>
        row_obj = get_order_info_by_id(oid)
        if not row_obj:
            continue
            
        # 3.更新订单状态 待执行->正在执行  <需连接数据库>
        update_order_status(oid, 2)
if __name__ == '__main__':
    run()
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# with上下文
__enter__以及__exit__
文件管理open就自带上下文管理 用的时候打开,不用时自己关闭! 我们也可以自己写上下文管理器,执行with语句时,连接数据库,with代码块执行完后,关闭数据库!!
# 基本框架
往下面这段代码中填充内容!
class Connect:
    def __init__(self):
        pass
    def __enter__(self):
        # To do:连接数据库
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # To do:关闭数据库
        pass
    def fetch_one(self):
        pass
if __name__ == '__main__':
    """ ★★★
    with语句,进去时自动执行__enter__,出来时自动执行__exit__
    __enter__返回啥,conn就是啥, 那么此处若__enter__返回self,那么conn就是Connect()这个实例对象!!
    """
    with Connect() as conn:
        conn.fetch_one()
  
""" 相当于,这么一回事:
with 获取连接:
    执行SQL(执行完毕后,自动将连接交还给连接池)
"""
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 关键代码
这段代码就够用啦!!
class Connect(object):
    def __init__(self):
        self.conn = None
        self.cursor = None
    def __enter__(self):
        self.conn = pymysql.connect(**settings.MYSQL_CONN_PARAMS)
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()
    def exec(self, sql, **kwargs):
        """增加、更新、删除时使用该方法!"""
        self.cursor.execute(sql, kwargs)
        self.conn.commit()
    def fetch_one(self, sql, **kwargs):
        """查看一条数据"""
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchone()
        return result
    def fetch_all(self, sql, **kwargs):
        """查看所有数据"""
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchall()
        return result
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 优化后的代码
mysql的连接关闭使用上上下文啦!!
import redis
import settings
import pymysql
class DbRow(object):
    def __init__(self, id, oid, status, url, count):
        self.id = id
        self.oid = oid  # 订单号
        self.status = status  # 订单状态
        self.url = url  # 刷量的url
        self.count = count  # 刷的播放量
class Connect:
    def __init__(self):
        self.conn = conn = pymysql.connect(**settings.MYSQL_CONN_PARAMS)
        self.cursor = conn.cursor(pymysql.cursors.DictCursor)
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()
    def exec(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        self.conn.commit()
    def fetch_one(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchone()
        return result
    def fetch_all(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchall()
        return result
def get_redis_task():
    conn = redis.Redis(**settings.REDIS_PARAMS)  # 直接连接redis
    # rpop从右开始拿,没有就结束 brpop没有傻傻的一直等,当然可以设置最多等待时间
    oid = conn.brpop(settings.QUEUE_TASK_NAME, timeout=5)
    if not oid:
        return
    # eg: oid --> (b'YANG_TASK_QUEUE', b'2022081409424192778452289117')
    return oid[1].decode('utf-8')
def get_order_info_by_id(oid):
    with Connect() as conn:
        row_dict = conn.fetch_one(
            "select id,oid,status,url,count from web_order where oid=%(oid)s and status=1",
            oid=oid
        )
    if not row_dict:
        return
    row_object = DbRow(**row_dict)
    return row_object
def update_order_status(oid, status):
    with Connect() as conn:
        conn.exec("update web_order set status=%(status)s where oid=%(oid)s", status=status, oid=oid)
def run():
    while True:
        # 1.去redis的队列中获取待执行的订单号
        # oid = get_redis_task()
        # if not oid:
        #     continue  # 有订单号才能继续往下走
        oid = '2024050702574353748298051742'
        # 2.连接数据库获取订单信息
        row_obj = get_order_info_by_id(oid)
        if not row_obj:
            continue
        # 3.更新订单状态 待执行->正在执行
        update_order_status(oid, 2)
        
        import brush  # -- 这里面就是写的逆向爬虫,具体算法就不说啦!!略.
        
        # 4.执行订单 -- 使用线程池!!并发最多开到20
        pool = ThreadPoolExecutor(20)
        for i in range(row_obj.count):  # 假设要给该视频地址刷100个播放量
            pool.submit(brush.task, row_obj.url)
        pool.shutdown()  # 组塞住,等待20线程把100播放量的任务执行完成,程序才往下走
        # 5.再次更新订单状态 正在执行->已完成
        update_order_status(oid, 3)
if __name__ == '__main__':
    run()
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# 一些思考
(*≧ω≦)
Q: 如果中途程序崩溃了,没有刷完怎么办
程序突然关闭
- 正常运行时,500次,在数据库更新一次,已刷多少.
- 崩溃
  > 状态,正在执行
  > 查看已经刷了多少
- 重新打开时,先去找到所有的正在执行的订单,重新加入队列去执行(所有的减去已刷的).
还可以在本地记录日志,记录跑了多少,奔溃时查看日志,再补刷!
Q: 刷单程序 使用的是 远程数据库?
A: 是的.
Q: 日志相关
A:
  - logging模块,内置模块+线程安全.
  - 第三方模块日志模块.
  - sentry日志&探针.
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 打包
将几个脚本打包成可执行文件!!
相当于打包成了一个客户端, 发给任何人都, 它都可以去执行.
- 可执行文件打包
  pip install pyinstaller
windows环境下打包出来的就是 windows包;
mac环境下打包出来的就是 mac包.
# -- app.py 是右键运行的那个py文件!! 打包完后出现一个dist文件夹,双击里面的可执行文件即可.
pyinstaller -D app.py
pyinstaller -F app.py  # -F的方式dist文件夹里就只会有一个文件
 1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 补充
(・_・;
python执行js代码!!
# Python调用JavaScript代码
#   - 安装nodejs + 配置环境变量
#   - 安装pyexecjs模块  pip install pyexecjs
import execjs  # pip install pyexecjs
javascript_file = execjs.compile("""
function createGUID(e) {
    e = e || 32;
    for (var t = "", r = 1; r <= e; r++) {
        t += Math.floor(16 * Math.random()).toString(16);
    }
    return t;
}
""")
guid = javascript_file.call('createGUID')
print(guid)
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try..except.. 看错误的调用栈,即哪一行出错啦!
try:
    pass
except Exception as e:
    print(e)
    import traceback
    print(traceback.format_exc())
    exit()
 1
2
3
4
5
6
7
2
3
4
5
6
7