DC's blog DC's blog
首页
  • 计算机基础
  • linux基础
  • mysql
  • git
  • 数据结构与算法
  • axure
  • english
  • docker
  • opp
  • oop
  • 网络并发编程
  • 不基础的py基础
  • 设计模式
  • html
  • css
  • javascript
  • jquery
  • UI
  • 第一次学vue
  • 第二次学vue
  • Django
  • drf
  • drf_re
  • 温故知新
  • flask
  • 前后端不分离

    • BBS
    • 订单系统
    • CRM
  • 前后端部分分离

    • pear-admin-flask
    • pear-admin-django
  • 前后端分离

    • 供应链系统
  • 理论基础
  • py数据分析包
  • 机器学习
  • 深度学习
  • 华中科大的网课
  • cursor
  • deepseek
  • 杂文
  • 罗老师语录
  • 关于我

    • me
  • 分类
  • 归档
GitHub (opens new window)

DC

愿我一生欢喜,不为世俗所及.
首页
  • 计算机基础
  • linux基础
  • mysql
  • git
  • 数据结构与算法
  • axure
  • english
  • docker
  • opp
  • oop
  • 网络并发编程
  • 不基础的py基础
  • 设计模式
  • html
  • css
  • javascript
  • jquery
  • UI
  • 第一次学vue
  • 第二次学vue
  • Django
  • drf
  • drf_re
  • 温故知新
  • flask
  • 前后端不分离

    • BBS
    • 订单系统
    • CRM
  • 前后端部分分离

    • pear-admin-flask
    • pear-admin-django
  • 前后端分离

    • 供应链系统
  • 理论基础
  • py数据分析包
  • 机器学习
  • 深度学习
  • 华中科大的网课
  • cursor
  • deepseek
  • 杂文
  • 罗老师语录
  • 关于我

    • me
  • 分类
  • 归档
GitHub (opens new window)
  • BBS

  • 订单平台

    • 单点知识
    • 表结构
    • 用户名登陆
    • 短信登陆
    • 菜单和权限
    • 级别管理
    • 客户管理
    • 分页和搜索
    • 价格策略
    • 交易中心
    • message组件
    • 我的交易列表
    • worker
      • 连接redis
      • 连接mysql
      • 上下文管理器
        • 待优化代码
        • with上下文
        • 基本框架
        • 关键代码
        • 优化后的代码
      • 一些思考
      • 打包
      • 补充
    • 部署之代码同步
    • 部署之线上运行
    • 张sir的部署
  • CRM

  • flask+layui

  • django+layui

  • 供应链

  • 实战
  • 订单平台
DC
2024-05-07
目录

worker

# 连接redis

import redis

import redis

REDIS_PARAMS = {
    "host": '127.0.0.1',
    "port": "6379",
    "password": '',
    "encoding": 'utf-8'
}

conn = redis.Redis(**REDIS_PARAMS)
# -- rpop从右开始拿,没有就结束; brpop没有傻傻的一直等,当然可以设置最多等待时间   
#    eg: oid --> (b'YANG_TASK_QUEUE', b'2022081409424192778452289117')      
oid = conn.brpop(settings.QUEUE_TASK_NAME, timeout=5)
1
2
3
4
5
6
7
8
9
10
11
12
13

用户创建订单后, 订单信息就会存在redis中,我们从中取数据!!

image-20240507121816950


# 连接mysql

import pymysql

注意: pymql执行sql语句有两种方式!!
1> cursor.execute('select * from hosts where id > %s', [12, ])
2> cursor.execute('select * from hosts where id > %(n1)s', {"n1":12})

import pymysql

MYSQL_CONN_PARAMS = {
    'host': "127.0.0.1",
    'port': 3306,
    'user': 'root',
    'passwd': "admin1234",
    'charset': "utf8",
    'db': "order",
}
conn = pymysql.connect(**MYSQL_CONN_PARAMS) 
cursor = conn.cursor(pymysql.cursors.DictCursor)                                                                 
sql = 'select * from web_order where oid=2024050702594654374820779848'                                           
cursor.execute(sql)                                                                                                    
# 无DictCursor --> fetchone的结果元祖 (6, 1, 1, '2024050702594654374820779848') 
# 有DictCursor --> fetchone的结果字典 {'id': 6, 'active': 1, 'status': 1, 'oid': '2024050702594654374820779848'}
row_dict = cursor.fetchone()                                                                                                                                                                      
cursor.close()
conn.close()    


# ===== ==== ===
通过fetchone取到的数据,{'id': 6, 'active': 1, 'status': 1, 'oid': '2024050702594654374820779848'}
需要通过row_dict["id"],row_dict['status']这种方式取值,较为繁琐!
可以创建一个类,将从数据库中取到的这些字段值封装进去:(用起来会方便很多~
class DbRow(object):
    def __init__(self, id, oid, status, url, count):
        self.id = id
        self.oid = oid  # 订单号
        self.status = status  # 订单状态
        self.url = url  # 刷量的url
        self.count = count  # 刷的播放量
        
row_obj = DbRow(**row_dict)
row_obj.id
row_obj.status
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 上下文管理器

# 待优化代码

下面这段代码, 在获取订单信息和更新订单状态的时候, 都用到连接mysql的代码!!

import redis
import settings
import pymysql


class DbRow(object):
    def __init__(self, id, oid, status, url, count):
        self.id = id
        self.oid = oid  # 订单号
        self.status = status  # 订单状态
        self.url = url  # 刷量的url
        self.count = count  # 刷的播放量


def get_redis_task():
    # 直接连接redis
    conn = redis.Redis(**settings.REDIS_PARAMS)
    # rpop从右开始拿,没有就结束 brpop没有傻傻的一直等,当然可以设置最多等待时间
    oid = conn.brpop(settings.QUEUE_TASK_NAME, timeout=5)
    if not oid:
        return
    # eg: oid --> (b'YANG_TASK_QUEUE', b'2022081409424192778452289117')
    return oid[1].decode('utf-8')


def get_order_info_by_id(oid):
    """获取订单信息"""
    conn = pymysql.connect(
        host='127.0.0.1', port=3306, user='root', passwd='admin1234', charset="utf8", db="order")
    cursor = conn.cursor(pymysql.cursors.DictCursor)

    sql = 'select  id, oid, status, url, count from web_order where oid=%s and status=1'
    cursor.execute(sql, [oid, ])
    # (6, 1, 1, '2024050702594654374820779848') / 
    # {'id': 6, 'active': 1, 'status': 1, 'oid': '2024050702594654374820779848'}
    row_dict = cursor.fetchone()

    cursor.close()
    conn.close()

    if not row_dict:  # 只有当订单状态为1,即待执行时,才需要获取订单信息
        return

    row_obj = DbRow(**row_dict)
    return row_obj


def update_order_status(oid, status):
    """更新订单状态"""
    conn = pymysql.connect(
        host='127.0.0.1', port=3306, user='root', passwd='admin1234', charset="utf8", db="order")
    cursor = conn.cursor(pymysql.cursors.DictCursor)

    sql = 'update web_order set status=%s where oid=%s'
    cursor.execute(sql, [status, oid, ])
    conn.commit()

    cursor.close()
    conn.close()


def run():
    while True:
        # 1.去redis的队列中获取待执行的订单号
        # oid = get_redis_task()
        # if not oid:
        #     continue  # 有订单号才能继续往下走
        
        oid = '2024050702594654374820779848'
        
        # 2.连接数据库获取订单信息  <需连接数据库>
        row_obj = get_order_info_by_id(oid)
        if not row_obj:
            continue
            
        # 3.更新订单状态 待执行->正在执行  <需连接数据库>
        update_order_status(oid, 2)


if __name__ == '__main__':
    run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

# with上下文

__enter__ 以及 __exit__

文件管理open就自带上下文管理 用的时候打开,不用时自己关闭! 我们也可以自己写上下文管理器,执行with语句时,连接数据库,with代码块执行完后,关闭数据库!!

# 基本框架

往下面这段代码中填充内容!

class Connect:
    def __init__(self):
        pass

    def __enter__(self):
        # To do:连接数据库
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # To do:关闭数据库
        pass

    def fetch_one(self):
        pass


if __name__ == '__main__':
    """ ★★★
    with语句,进去时自动执行__enter__,出来时自动执行__exit__
    __enter__返回啥,conn就是啥, 那么此处若__enter__返回self,那么conn就是Connect()这个实例对象!!
    """
    with Connect() as conn:
        conn.fetch_one()
  
""" 相当于,这么一回事:
with 获取连接:
    执行SQL(执行完毕后,自动将连接交还给连接池)
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 关键代码

这段代码就够用啦!!

class Connect(object):
    def __init__(self):
        self.conn = None
        self.cursor = None

    def __enter__(self):
        self.conn = pymysql.connect(**settings.MYSQL_CONN_PARAMS)
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()

    def exec(self, sql, **kwargs):
        """增加、更新、删除时使用该方法!"""
        self.cursor.execute(sql, kwargs)
        self.conn.commit()

    def fetch_one(self, sql, **kwargs):
        """查看一条数据"""
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchone()
        return result

    def fetch_all(self, sql, **kwargs):
        """查看所有数据"""
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchall()
        return result
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 优化后的代码

mysql的连接关闭使用上上下文啦!!

import redis
import settings
import pymysql


class DbRow(object):
    def __init__(self, id, oid, status, url, count):
        self.id = id
        self.oid = oid  # 订单号
        self.status = status  # 订单状态
        self.url = url  # 刷量的url
        self.count = count  # 刷的播放量


class Connect:
    def __init__(self):
        self.conn = conn = pymysql.connect(**settings.MYSQL_CONN_PARAMS)
        self.cursor = conn.cursor(pymysql.cursors.DictCursor)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cursor.close()
        self.conn.close()

    def exec(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        self.conn.commit()

    def fetch_one(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchone()
        return result

    def fetch_all(self, sql, **kwargs):
        self.cursor.execute(sql, kwargs)
        result = self.cursor.fetchall()
        return result


def get_redis_task():
    conn = redis.Redis(**settings.REDIS_PARAMS)  # 直接连接redis
    # rpop从右开始拿,没有就结束 brpop没有傻傻的一直等,当然可以设置最多等待时间
    oid = conn.brpop(settings.QUEUE_TASK_NAME, timeout=5)
    if not oid:
        return
    # eg: oid --> (b'YANG_TASK_QUEUE', b'2022081409424192778452289117')
    return oid[1].decode('utf-8')


def get_order_info_by_id(oid):
    with Connect() as conn:
        row_dict = conn.fetch_one(
            "select id,oid,status,url,count from web_order where oid=%(oid)s and status=1",
            oid=oid
        )
    if not row_dict:
        return
    row_object = DbRow(**row_dict)
    return row_object


def update_order_status(oid, status):
    with Connect() as conn:
        conn.exec("update web_order set status=%(status)s where oid=%(oid)s", status=status, oid=oid)


def run():
    while True:
        # 1.去redis的队列中获取待执行的订单号
        # oid = get_redis_task()
        # if not oid:
        #     continue  # 有订单号才能继续往下走

        oid = '2024050702574353748298051742'
        # 2.连接数据库获取订单信息
        row_obj = get_order_info_by_id(oid)
        if not row_obj:
            continue

        # 3.更新订单状态 待执行->正在执行
        update_order_status(oid, 2)
        
        import brush  # -- 这里面就是写的逆向爬虫,具体算法就不说啦!!略.
        
        # 4.执行订单 -- 使用线程池!!并发最多开到20
        pool = ThreadPoolExecutor(20)
        for i in range(row_obj.count):  # 假设要给该视频地址刷100个播放量
            pool.submit(brush.task, row_obj.url)
        pool.shutdown()  # 组塞住,等待20线程把100播放量的任务执行完成,程序才往下走

        # 5.再次更新订单状态 正在执行->已完成
        update_order_status(oid, 3)


if __name__ == '__main__':
    run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

# 一些思考

(*≧ω≦)

Q: 如果中途程序崩溃了,没有刷完怎么办
程序突然关闭
- 正常运行时,500次,在数据库更新一次,已刷多少.
- 崩溃
  > 状态,正在执行
  > 查看已经刷了多少
- 重新打开时,先去找到所有的正在执行的订单,重新加入队列去执行(所有的减去已刷的).

还可以在本地记录日志,记录跑了多少,奔溃时查看日志,再补刷!

Q: 刷单程序 使用的是 远程数据库?
A: 是的.

Q: 日志相关
A:
  - logging模块,内置模块+线程安全.
  - 第三方模块日志模块.
  - sentry日志&探针.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 打包

将几个脚本打包成可执行文件!!

相当于打包成了一个客户端, 发给任何人都, 它都可以去执行.

- 可执行文件打包
  pip install pyinstaller

windows环境下打包出来的就是 windows包;
mac环境下打包出来的就是 mac包.

# -- app.py 是右键运行的那个py文件!! 打包完后出现一个dist文件夹,双击里面的可执行文件即可.
pyinstaller -D app.py
pyinstaller -F app.py  # -F的方式dist文件夹里就只会有一个文件
1
2
3
4
5
6
7
8
9

# 补充

(・_・;

python执行js代码!!

# Python调用JavaScript代码
#   - 安装nodejs + 配置环境变量
#   - 安装pyexecjs模块  pip install pyexecjs

import execjs  # pip install pyexecjs

javascript_file = execjs.compile("""
function createGUID(e) {
    e = e || 32;
    for (var t = "", r = 1; r <= e; r++) {
        t += Math.floor(16 * Math.random()).toString(16);
    }
    return t;
}
""")
guid = javascript_file.call('createGUID')
print(guid)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

try..except.. 看错误的调用栈,即哪一行出错啦!

try:
    pass
except Exception as e:
    print(e)
    import traceback
    print(traceback.format_exc())
    exit()
1
2
3
4
5
6
7

我的交易列表
部署之代码同步

← 我的交易列表 部署之代码同步→

最近更新
01
deepseek本地部署+知识库
02-17
02
实操-微信小程序
02-14
03
教学-cursor深度探讨
02-13
更多文章>
Theme by Vdoing | Copyright © 2023-2025 DC | One Piece
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式