DC's blog DC's blog
首页
  • 计算机基础
  • linux基础
  • mysql
  • git
  • 数据结构与算法
  • axure
  • english
  • docker
  • opp
  • oop
  • 网络并发编程
  • 不基础的py基础
  • 设计模式
  • html
  • css
  • javascript
  • jquery
  • UI
  • 第一次学vue
  • 第二次学vue
  • Django
  • drf
  • drf_re
  • 温故知新
  • flask
  • 前后端不分离

    • BBS
    • 订单系统
    • CRM
  • 前后端部分分离

    • pear-admin-flask
    • pear-admin-django
  • 前后端分离

    • 供应链系统
  • 理论基础
  • py数据分析包
  • 机器学习
  • 深度学习
  • 华中科大的网课
  • cursor
  • deepseek
  • 杂文
  • 罗老师语录
  • 关于我

    • me
  • 分类
  • 归档
GitHub (opens new window)

DC

愿我一生欢喜,不为世俗所及.
首页
  • 计算机基础
  • linux基础
  • mysql
  • git
  • 数据结构与算法
  • axure
  • english
  • docker
  • opp
  • oop
  • 网络并发编程
  • 不基础的py基础
  • 设计模式
  • html
  • css
  • javascript
  • jquery
  • UI
  • 第一次学vue
  • 第二次学vue
  • Django
  • drf
  • drf_re
  • 温故知新
  • flask
  • 前后端不分离

    • BBS
    • 订单系统
    • CRM
  • 前后端部分分离

    • pear-admin-flask
    • pear-admin-django
  • 前后端分离

    • 供应链系统
  • 理论基础
  • py数据分析包
  • 机器学习
  • 深度学习
  • 华中科大的网课
  • cursor
  • deepseek
  • 杂文
  • 罗老师语录
  • 关于我

    • me
  • 分类
  • 归档
GitHub (opens new window)
  • python面向过程

  • python面向对象

  • 网络并发编程

    • 计算机网络储备
    • TCP简单实现
    • TCP粘包问题
    • 文件传输、UDP
    • socketserver模块
    • 进程理论储备
    • 进程开发必用
    • 进程开发必知
      • 守护进程
        • daemon属性
        • 迷惑人的例子
      • 互斥锁
        • 模拟多人抢票
        • 并发抢票
        • 问题分析
        • 串行抢票
        • 加互斥锁解决
        • 实现代码
        • ☆join与加锁
      • 队列
        • 进程间的通信
        • Queue基本用法
        • 需要掌握的
        • 需要了解的
    • 生产者消费者模型
    • 线程开发
    • GIL详解
    • 进程池与线程池
    • 协程
    • 网络IO模型
    • 轮询长轮询
    • channels
    • 小项目之手撸ORM
    • 小项目之仿优酷
    • 脚本编写
  • 不基础的py基础

  • 设计模式

  • python_Need
  • 网络并发编程
DC
2022-10-28
目录

进程开发必知

# 守护进程

守护 即 伴随!
守护进程其实就是一个"子进程".
守护进程的 生命周期 会伴随着 主进程的代码运行完毕 而结束/自动销毁. 只守护主进程代码运行的过程!
注意哦!主/父进程的活干完了,不意味着主进程就结束了!!父进程会等着所有子进程死掉后,给它们收尸..

Q: 为什么要用守护进程?
A: 从两个关键字入手分析
     1> 进程: 当父进程需要将一个任务并发出去执行,可以开启一个子进程.
     2> 守护: 若子进程在父进程代码运行完毕后就没有存在的意义了.
                    就应该将该子进程设置为守护进程,会在父进程代码结束后死掉(这不是僵尸哦!死掉资源会全部释放)

# daemon属性

p1.daemon = True 一定要放到 p1.start() 之前!!

import time
from multiprocessing import Process


def task(x):
    print(f"{x} is running!")
    time.sleep(3)


if __name__ == '__main__':
    p1 = Process(target=task, args=("守护进程",))
    p2 = Process(target=task, args=("正常的子进程",))
    p1.daemon = True  # -- 一定要放到p1.start()之前
    p1.start()
    p2.start()
    print("主")

"""
主
正常的子进程 is running!
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

设置子进程p1为守护进程;
接着发送了两个信号,OS还没把进程造出来,没来得及运行p1、p2的代码呢.
父进程就已经把 print("主") 运行啦!
一旦在屏幕中打印出"主"就意外着父进程代码运行完毕,守护进程p1将会结束生命!!
即守护进程p1没有机会再运行自己的代码..
而父进程会等着p2子进程运行完,替它收完尸后,再安详的死去.

# 迷惑人的例子

关键在于理解守护进程什么时候死掉. 时刻要有并发的思想!

import time
from multiprocessing import Process


def foo():
    print(123)
    time.sleep(1)
    print("end123")


def bar():
    print(456)
    time.sleep(3)
    print("end456")


if __name__ == '__main__':
    p1 = Process(target=foo)
    p2 = Process(target=bar)

    p1.daemon = True
    p1.start()     # -- 现目前的科技水平几乎不可能发信号的同时,OS就把p1造出来..因为造p1得申请内存空间等.
    p2.start()
    print("main")  # -- 执行过程:在内存中造个字符串"主",从内存中取主进程造好的字符串"主",扔到屏幕中显示.

""" -- 在执行完`print("main")`,p1、p2都还没造出来呢.(我们的机器普遍都是这个结果)
main
456
end456
"""

""" -- 在执行`print("main")`过程中,p1造出来啦.会立刻执行p1进程里的代码.把"123"扔到屏幕上
       但总有个先来后到,先"主"、后"123".(机器的性能比较卓越)
main
123
456
end456
"""

""" -- 在执行`p2.start()`过程中,p1造出来啦!(机器的性能超级牛皮)
123
main
456
end456
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# 互斥锁

进程之间需要通信,但进程之间内存是隔离的
互斥锁将多个并发的任务对修改共享数据的操作变成串行,牺牲了效率,但保证了数据的安全性!

# 模拟多人抢票

# -- db.txt 文件当作数据库
{"count",1}
1
2
# 并发抢票

抢票要经历两个阶段,查票环节和购票环节.

import json
import os
import random
import time
from multiprocessing import Process


def check():
    with open('db.txt', 'rt', encoding='utf-8') as f:
        dic = json.load(f)
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()} 查询到剩余票数为: {dic['count']}! ")


# -- 购票逻辑:先查票 -- 余票减一 -- 再写回数据文件.
def get():
    # -- 购票的时候,票可能已经被抢走了,所以购票时还得查询一下
    with open('db.txt', 'rt', encoding='utf-8') as f:  # -- S端开始查数据
        dic = json.load(f)
    time.sleep(random.randint(1, 3))  # -- 模拟C端读数据的网络延迟 S-->C 
                                      #    (查到的数据要1-3S才能到C端)
    # -- 开始购票
    if dic['count'] > 0:
        dic['count'] -= 1  # -- 仅仅只是在内存里的操作,得刷新到硬盘db.txt文件里
        time.sleep(random.randint(1, 3))  # -- 模拟S端写数据的网络延迟 C-->S 
                                          #    (要写入的数据要1-3S才能到达S端)
        with open('db.txt', 'wt', encoding='utf-8') as f:  # -- S端开始写入数据
            json.dump(dic, f)
        print(f"{os.getpid()} 您购票成功!")
    else:
        print(f"尊敬的{os.getpid()}用户:没有余票啦!购票失败!")


def task():
    check()  # -- 查票
    get()  # -- 购票


if __name__ == '__main__':
    # -- 模拟多个人并发请抢票
    for _ in range(10):
        p = Process(target=task)
        p.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 问题分析

并发运行效率高,但竞争写入同一文件,数据写入错乱..

# -- 运行结果如下:
23866 查询到剩余票数为: 1! 
23868 查询到剩余票数为: 1! 
23870 查询到剩余票数为: 1! 
23863 查询到剩余票数为: 1! 
23865 查询到剩余票数为: 1! 
23867 查询到剩余票数为: 1! 
23872 查询到剩余票数为: 1! 
23864 查询到剩余票数为: 1! 
23869 查询到剩余票数为: 1! 
23871 查询到剩余票数为: 1! 
23868 您购票成功!
23865 您购票成功!
23866 您购票成功!
23867 您购票成功!
23870 您购票成功!
23872 您购票成功!
23863 您购票成功!
23864 您购票成功!
23871 您购票成功!
23869 您购票成功!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

Amazing啊,只有一张票,结果10个人全部购票成功,奇了个怪了! (つД`)ノ

问题的根源出在: 查票可以并发,但 购票不能并发 ..
        购票的逻辑一定是三步走,先查票 - 有票则余票减1 - 再写回数据文件.
        因为有网络延迟的缘故,10个人查到的票都是一样的结果,都减一重新覆盖掉数据文件..
        不就意味着10个人购买到的是同一张票吗? 所以抢票不能并发着来! 得一个一个来!

具体来说, 该抢票程序中共有三处 time.sleep(random.randint(1, 3)) 语句 模拟了三次网络延迟.
第一次: 最少1s的延迟,已经让10个并发的子进程都查询到了剩余票数.
第二次: 同上.跟第一次一样是读数据. 而且10个人获取到的剩余票数都是一样的!
第三次: 开始购票,10个子进程都先修改了自己内存中的数据.依次写入db.txt.但每次的写入都是 {"count",0}.

# 串行抢票

要保证共享数据的安全, 修改共享数据的行为就不能让其并发的去运行. 但是效率变低啦!!

那么,将并发抢票程序改为 串行 (一个进程完完整整执行完后,才会执行下一个进程) 可以吗?yes.
因为第一个子进程走了一遍购票流程后,数据文件里的余票数减一..
后面的子进程购票行为是基于修改后的数据文件进行的操作..就没啥问题啦!!

if __name__ == '__main__':
    for _ in range(10):
        p = Process(target=task)
        p.start()
        p.join()  # -- 只需要加这一行代码即可!

"""
23908 查询到剩余票数为: 1! 
23908 您购票成功!
23909 查询到剩余票数为: 0! 
尊敬的23909用户:没有余票啦!购票失败!
23910 查询到剩余票数为: 0! 
尊敬的23910用户:没有余票啦!购票失败!
23911 查询到剩余票数为: 0! 
尊敬的23911用户:没有余票啦!购票失败!
... ... ...
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 加互斥锁解决

在你我纳闷这效率也太慢的过程中,应该也敏锐的发现了,上述的串行抢票程序存在的不合理的地方..
查票和购票都变成串行的啦!! 应该只将购票环节变为串行,查票环节应为并发!!

互斥锁 就是将某一并发操作变成串行的,牺牲了效率,保证了安全!

# 实现代码

这里给 get() 操作加锁, 这10个进程谁先抢到锁,谁进行购票,购票完成后释放锁,其余进程再抢锁购票.. 没有锁是不能进行购票的!!锁只有一把,所以每次购票行为都会完完整整的运行完再会进行下一个购票.

import json
import os
import random
import time
from multiprocessing import Process, Lock, set_start_method


def check():
    with open('db.txt', 'rt', encoding='utf-8') as f:
        dic = json.load(f)
    time.sleep(random.randint(1, 3))
    print(f"{os.getpid()} 查询到剩余票数为: {dic['count']}! ")


def get():
    with open('db.txt', 'rt', encoding='utf-8') as f:
        dic = json.load(f)
    time.sleep(random.randint(1, 3))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(random.randint(1, 3))
        with open('db.txt', 'wt', encoding='utf-8') as f:
            json.dump(dic, f)
        print(f"{os.getpid()} 您购票成功!")
    else:
        print(f"尊敬的{os.getpid()}用户:没有余票啦!购票失败!")


def task(mutex):
    check()
    # -- 只要购票操作涉及到对共享数据改的行为,需要对购票操作加锁
    """ 下方的三行代码可以简写
    with mutex:
        get()
    """
    mutex.acquire()  # -- 加锁,互斥锁不能连续的acquire,必须是release以后才能重新acquire
    get()
    mutex.release()  # -- 释放锁


if __name__ == '__main__':
    """
    # -- ★ 特别说明
    在mac上运行加锁的并发程序,需要在main()里得加这行语句!!!
    set_start_method('fork') 
    原因:MAC电脑默认启动进程的方式是fork,而python默认的方式是spawn
        所以需要将python启动进程的方式做修改
    """
    set_start_method('fork')   
    mutex = Lock()  # -- 在父进程里造了个互斥锁对象
    for _ in range(10):
        p = Process(target=task, args=(mutex,))  # -- 通过参数传递让每个子进程都去抢这把锁
        p.start()
        
"""
24528 查询到剩余票数为: 1! 
24530 查询到剩余票数为: 1! 
24526 查询到剩余票数为: 1! 
... ... ...
24534 查询到剩余票数为: 1! 
24528 您购票成功!
尊敬的24530用户:没有余票啦!购票失败!
尊敬的24526用户:没有余票啦!购票失败!
... ... ...
尊敬的24534用户:没有余票啦!购票失败!
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

step1: 父进程很快的发完10个信号,10个子进程查看余票的操作 check() 是并发执行的!
step2: 查完票就会去抢锁,先查完票的肯定会先抢到锁.. 假如进程24528抢到了锁,它就会去执行 get() 操作;
            将购票流程 查票 - 余票减1 - 写入数据文件 完完整整的运行完后,释放锁..
注意,进程24528释放锁的时候,数据文件已经改了,后续的进程拿到锁访问到的是已修改的数据. step3: 其它子进程再抢锁, 抢到锁的才能执行 get() 操作..

但凡并发编程里出现修改共享数据,就得让大家一个一个的去改!但没办法,降低效率保证数据安全!

# ☆join与加锁

join整体串行; 互斥锁局部串行(有抢锁操作).

1> join: 将执行任务的 所有代码 整体串行! 
循环里添加 p.join() 意味着10个进程都将依次完完整整的依次执行 check()、get() 这两个操作..
进程1执行完check()、get() 后,进程2再开始执行check()、get() ... 是串行执行的!

2> 互斥锁: 可以将要执行任务的 部分代码 (只涉及到修改共享数据的代码)变成串行! 其余部分都是并行.
加锁操作, 只是给 涉及到共享数据修改 的 get() 操作加了锁, 10个进程谁抢到锁,谁才能执行 get() 操作! 锁也只有一把, 抢到锁的进程将get() 完完整整执行完后才会释放锁, 其它进程再抢.. 没有涉及到共享数据修改的其它操作, 该并发执行的就并发执行!!

def task(mutex):
    check()       # -- 并行
    with mutex:   # -- 抢锁串行
        get()
    other_func1() # -- 并行
    other_func2() # -- 并行
1
2
3
4
5
6

若task()的代码进行如下修改,跟join大同小异!!只不过多了抢锁,不是按照for循环的顺序来的罢了.

def task(mutex):
    mutex.acquire() 
    check()
    get()
    mutex.release()
1
2
3
4
5

# 队列

# 进程间的通信

基于文件、IPC机制

用文件共享数据实现进程间通信, 存在以下问题:
     1> 效率低(共享数据基于文件, 而 文件是硬盘上的数据 )
     2> 需要自己加锁处理 (共享伴随着竞争!!)
          加锁可以保证多个进程修改同一块数据时, 同一时间只能有一个进程可以进行修改,即串行的修改!
          加锁导致速度慢了,但牺牲了速度却保证了数据安全!!
在生产环境中能不自己处理锁的问题就别自己处理, 加锁是一个复杂且危险的行为
(eg: 忘记释放锁了,后面的都在那等着都进不去)

我们需要找寻一种解决方案能够兼顾: 1> 效率高( 多个进程共享一块内存的数据 ) 2> 帮我们处理好锁问题
这就不得不提到mutiprocessing模块为我们提供的 **基于消息的IPC进程通信机制: 队列和管道 **
△ 队列和管道都是共享的内存空间.
△ 队列底层是以管道+锁的方式实现的! (管道就是内存空间,在讲述subprocess模块的时候就已经提到过啦!)

我们用队列!!!(⁎⁍̴̛ᴗ⁍̴̛⁎)
管道相关用法可参考: https://www.cnblogs.com/linhaifeng/articles/7428874.html#_label7

题外话:
  在linux中输入 `free -m` 
  显示结果的shared字段表明了共享内存有多少M!!
1
2
3

# Queue基本用法

队列默认的特点: 先进先出FIFO.

注意: 1> 队列占用的是内存空间
          2> 不应该往队列中放大数据,应该只存放数据量较小的消息!

# 需要掌握的
from multiprocessing import Queue

# -- 参数maxsize默认为0,设置的参数值小于等于0,表明队列大小无限制.
#    队列占用的是内存空间,我们不应该设置为无限制!
q = Queue(3)

q.put('first')
q.put({"k": "second"})
q.put(["third"])
# q.put(4)  # -- 因为设置的队列大小为3 继续加的话会在这里阻塞住(锁的效果)

print(q.get())
print(q.get())
print(q.get())
# print(q.get())  # -- 因为队列中只要3个数据 取不出来第4个,会在这阻塞住
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 需要了解的

注意:只有在block为True的时候,设置timeout超时时间才有意义!!

from multiprocessing import Queue

q = Queue(3)

"""
put方法的参数 block默认为True,timeout默认为None
队列大小为3,前3个肯定能添加进去!!
第4个会阻塞
    1> 不加timeout,timeout默认为None,会一直阻塞,直到队列有空间
    2> 设置timeout=3,不会一直阻塞,阻塞3秒后,队列还是没有空间,会报错 "queue.Full" 告知队列已满
注意:只有在block为True的时候,设置timeout超时时间才有意义!!
"""
q.put('first', block=True, timeout=3)
q.put({"k": "second"}, block=True, timeout=3)
q.put(["third"], block=True, timeout=3)
# q.put(4, block=True, timeout=3)

"""
get方法的参数 block默认为True,timeout默认为None
取第4个时,会阻塞3秒,3秒后队列里还是没有数据的话,会报错 "queue.Empty" 告知队列为空
"""
print(q.get(block=True, timeout=3))
print(q.get(block=True, timeout=3))
print(q.get(block=True, timeout=3))
# print(q.get(block=True, timeout=3))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

若put添加操作 block = False , 当队列已满, 直接立刻报错, 压根不会阻塞.设置timeout也没用.
同理, 若get取数据操作 block = False , 当队列为空, 直接立刻报错, 等都不会等.

q.put_nowait(4) 等同于 q.put(4, block=False)
q.get_nowait() 等同于 q.get(block=False)


进程开发必用
生产者消费者模型

← 进程开发必用 生产者消费者模型→

最近更新
01
deepseek本地部署+知识库
02-17
02
实操-微信小程序
02-14
03
教学-cursor深度探讨
02-13
更多文章>
Theme by Vdoing | Copyright © 2023-2025 DC | One Piece
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式