进程开发必知
# 守护进程
守护 即 伴随!
守护进程其实就是一个"子进程".
守护进程的 生命周期 会伴随着 主进程的代码运行完毕 而结束/自动销毁. 只守护主进程代码运行的过程!
注意哦!主/父进程的活干完了,不意味着主进程就结束了!!父进程会等着所有子进程死掉后,给它们收尸..
Q: 为什么要用守护进程?
A: 从两个关键字入手分析
1> 进程: 当父进程需要将一个任务并发出去执行,可以开启一个子进程.
2> 守护: 若子进程在父进程代码运行完毕后就没有存在的意义了.
就应该将该子进程设置为守护进程,会在父进程代码结束后死掉(这不是僵尸哦!死掉资源会全部释放)
# daemon属性
p1.daemon = True
一定要放到p1.start()
之前!!
import time
from multiprocessing import Process
def task(x):
print(f"{x} is running!")
time.sleep(3)
if __name__ == '__main__':
p1 = Process(target=task, args=("守护进程",))
p2 = Process(target=task, args=("正常的子进程",))
p1.daemon = True # -- 一定要放到p1.start()之前
p1.start()
p2.start()
print("主")
"""
主
正常的子进程 is running!
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
设置子进程p1为守护进程;
接着发送了两个信号,OS还没把进程造出来,没来得及运行p1、p2的代码呢.
父进程就已经把 print("主")
运行啦!
一旦在屏幕中打印出"主"就意外着父进程代码运行完毕,守护进程p1将会结束生命!!
即守护进程p1没有机会再运行自己的代码..
而父进程会等着p2子进程运行完,替它收完尸后,再安详的死去.
# 迷惑人的例子
关键在于理解守护进程什么时候死掉. 时刻要有并发的思想!
import time
from multiprocessing import Process
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
if __name__ == '__main__':
p1 = Process(target=foo)
p2 = Process(target=bar)
p1.daemon = True
p1.start() # -- 现目前的科技水平几乎不可能发信号的同时,OS就把p1造出来..因为造p1得申请内存空间等.
p2.start()
print("main") # -- 执行过程:在内存中造个字符串"主",从内存中取主进程造好的字符串"主",扔到屏幕中显示.
""" -- 在执行完`print("main")`,p1、p2都还没造出来呢.(我们的机器普遍都是这个结果)
main
456
end456
"""
""" -- 在执行`print("main")`过程中,p1造出来啦.会立刻执行p1进程里的代码.把"123"扔到屏幕上
但总有个先来后到,先"主"、后"123".(机器的性能比较卓越)
main
123
456
end456
"""
""" -- 在执行`p2.start()`过程中,p1造出来啦!(机器的性能超级牛皮)
123
main
456
end456
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 互斥锁
进程之间需要通信,但进程之间内存是隔离的
互斥锁将多个并发的任务对修改共享数据的操作变成串行,牺牲了效率,但保证了数据的安全性!
# 模拟多人抢票
# -- db.txt 文件当作数据库
{"count",1}
2
# 并发抢票
抢票要经历两个阶段,查票环节和购票环节.
import json
import os
import random
import time
from multiprocessing import Process
def check():
with open('db.txt', 'rt', encoding='utf-8') as f:
dic = json.load(f)
time.sleep(random.randint(1, 3))
print(f"{os.getpid()} 查询到剩余票数为: {dic['count']}! ")
# -- 购票逻辑:先查票 -- 余票减一 -- 再写回数据文件.
def get():
# -- 购票的时候,票可能已经被抢走了,所以购票时还得查询一下
with open('db.txt', 'rt', encoding='utf-8') as f: # -- S端开始查数据
dic = json.load(f)
time.sleep(random.randint(1, 3)) # -- 模拟C端读数据的网络延迟 S-->C
# (查到的数据要1-3S才能到C端)
# -- 开始购票
if dic['count'] > 0:
dic['count'] -= 1 # -- 仅仅只是在内存里的操作,得刷新到硬盘db.txt文件里
time.sleep(random.randint(1, 3)) # -- 模拟S端写数据的网络延迟 C-->S
# (要写入的数据要1-3S才能到达S端)
with open('db.txt', 'wt', encoding='utf-8') as f: # -- S端开始写入数据
json.dump(dic, f)
print(f"{os.getpid()} 您购票成功!")
else:
print(f"尊敬的{os.getpid()}用户:没有余票啦!购票失败!")
def task():
check() # -- 查票
get() # -- 购票
if __name__ == '__main__':
# -- 模拟多个人并发请抢票
for _ in range(10):
p = Process(target=task)
p.start()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 问题分析
并发运行效率高,但竞争写入同一文件,数据写入错乱..
# -- 运行结果如下:
23866 查询到剩余票数为: 1!
23868 查询到剩余票数为: 1!
23870 查询到剩余票数为: 1!
23863 查询到剩余票数为: 1!
23865 查询到剩余票数为: 1!
23867 查询到剩余票数为: 1!
23872 查询到剩余票数为: 1!
23864 查询到剩余票数为: 1!
23869 查询到剩余票数为: 1!
23871 查询到剩余票数为: 1!
23868 您购票成功!
23865 您购票成功!
23866 您购票成功!
23867 您购票成功!
23870 您购票成功!
23872 您购票成功!
23863 您购票成功!
23864 您购票成功!
23871 您购票成功!
23869 您购票成功!
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Amazing啊,只有一张票,结果10个人全部购票成功,奇了个怪了! (つД`)ノ
问题的根源出在: 查票可以并发,但 购票不能并发 ..
购票的逻辑一定是三步走,先查票 - 有票则余票减1 - 再写回数据文件.
因为有网络延迟的缘故,10个人查到的票都是一样的结果,都减一重新覆盖掉数据文件..
不就意味着10个人购买到的是同一张票吗? 所以抢票不能并发着来! 得一个一个来!
具体来说, 该抢票程序中共有三处 time.sleep(random.randint(1, 3))
语句 模拟了三次网络延迟.
第一次: 最少1s的延迟,已经让10个并发的子进程都查询到了剩余票数.
第二次: 同上.跟第一次一样是读数据. 而且10个人获取到的剩余票数都是一样的!
第三次: 开始购票,10个子进程都先修改了自己内存中的数据.依次写入db.txt.但每次的写入都是 {"count",0}
.
# 串行抢票
要保证共享数据的安全, 修改共享数据的行为就不能让其并发的去运行. 但是效率变低啦!!
那么,将并发抢票程序改为 串行 (一个进程完完整整执行完后,才会执行下一个进程) 可以吗?yes.
因为第一个子进程走了一遍购票流程后,数据文件里的余票数减一..
后面的子进程购票行为是基于修改后的数据文件进行的操作..就没啥问题啦!!
if __name__ == '__main__':
for _ in range(10):
p = Process(target=task)
p.start()
p.join() # -- 只需要加这一行代码即可!
"""
23908 查询到剩余票数为: 1!
23908 您购票成功!
23909 查询到剩余票数为: 0!
尊敬的23909用户:没有余票啦!购票失败!
23910 查询到剩余票数为: 0!
尊敬的23910用户:没有余票啦!购票失败!
23911 查询到剩余票数为: 0!
尊敬的23911用户:没有余票啦!购票失败!
... ... ...
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 加互斥锁解决
在你我纳闷这效率也太慢的过程中,应该也敏锐的发现了,上述的串行抢票程序存在的不合理的地方..
查票和购票都变成串行的啦!! 应该只将购票环节变为串行,查票环节应为并发!!
互斥锁 就是将某一并发操作变成串行的,牺牲了效率,保证了安全!
# 实现代码
这里给 get()
操作加锁, 这10个进程谁先抢到锁,谁进行购票,购票完成后释放锁,其余进程再抢锁购票.. 没有锁是不能进行购票的!!锁只有一把,所以每次购票行为都会完完整整的运行完再会进行下一个购票.
import json
import os
import random
import time
from multiprocessing import Process, Lock, set_start_method
def check():
with open('db.txt', 'rt', encoding='utf-8') as f:
dic = json.load(f)
time.sleep(random.randint(1, 3))
print(f"{os.getpid()} 查询到剩余票数为: {dic['count']}! ")
def get():
with open('db.txt', 'rt', encoding='utf-8') as f:
dic = json.load(f)
time.sleep(random.randint(1, 3))
if dic['count'] > 0:
dic['count'] -= 1
time.sleep(random.randint(1, 3))
with open('db.txt', 'wt', encoding='utf-8') as f:
json.dump(dic, f)
print(f"{os.getpid()} 您购票成功!")
else:
print(f"尊敬的{os.getpid()}用户:没有余票啦!购票失败!")
def task(mutex):
check()
# -- 只要购票操作涉及到对共享数据改的行为,需要对购票操作加锁
""" 下方的三行代码可以简写
with mutex:
get()
"""
mutex.acquire() # -- 加锁,互斥锁不能连续的acquire,必须是release以后才能重新acquire
get()
mutex.release() # -- 释放锁
if __name__ == '__main__':
"""
# -- ★ 特别说明
在mac上运行加锁的并发程序,需要在main()里得加这行语句!!!
set_start_method('fork')
原因:MAC电脑默认启动进程的方式是fork,而python默认的方式是spawn
所以需要将python启动进程的方式做修改
"""
set_start_method('fork')
mutex = Lock() # -- 在父进程里造了个互斥锁对象
for _ in range(10):
p = Process(target=task, args=(mutex,)) # -- 通过参数传递让每个子进程都去抢这把锁
p.start()
"""
24528 查询到剩余票数为: 1!
24530 查询到剩余票数为: 1!
24526 查询到剩余票数为: 1!
... ... ...
24534 查询到剩余票数为: 1!
24528 您购票成功!
尊敬的24530用户:没有余票啦!购票失败!
尊敬的24526用户:没有余票啦!购票失败!
... ... ...
尊敬的24534用户:没有余票啦!购票失败!
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
step1: 父进程很快的发完10个信号,10个子进程查看余票的操作 check()
是并发执行的!
step2: 查完票就会去抢锁,先查完票的肯定会先抢到锁.. 假如进程24528抢到了锁,它就会去执行 get()
操作;
将购票流程 查票 - 余票减1 - 写入数据文件 完完整整的运行完后,释放锁..
注意,进程24528释放锁的时候,数据文件已经改了,后续的进程拿到锁访问到的是已修改的数据.
step3: 其它子进程再抢锁, 抢到锁的才能执行 get()
操作..
但凡并发编程里出现修改共享数据,就得让大家一个一个的去改!但没办法,降低效率保证数据安全!
# ☆join与加锁
join整体串行; 互斥锁局部串行(有抢锁操作).
1> join: 将执行任务的 所有代码 整体串行!
循环里添加 p.join()
意味着10个进程都将依次完完整整的依次执行 check()
、get()
这两个操作..
进程1执行完check()
、get()
后,进程2再开始执行check()
、get()
... 是串行执行的!
2> 互斥锁: 可以将要执行任务的 部分代码 (只涉及到修改共享数据的代码)变成串行! 其余部分都是并行.
加锁操作, 只是给 涉及到共享数据修改 的 get()
操作加了锁, 10个进程谁抢到锁,谁才能执行 get()
操作! 锁也只有一把, 抢到锁的进程将get()
完完整整执行完后才会释放锁, 其它进程再抢.. 没有涉及到共享数据修改的其它操作, 该并发执行的就并发执行!!
def task(mutex):
check() # -- 并行
with mutex: # -- 抢锁串行
get()
other_func1() # -- 并行
other_func2() # -- 并行
2
3
4
5
6
若task()的代码进行如下修改,跟join大同小异!!只不过多了抢锁,不是按照for循环的顺序来的罢了.
def task(mutex):
mutex.acquire()
check()
get()
mutex.release()
2
3
4
5
# 队列
# 进程间的通信
基于文件、IPC机制
用文件共享数据实现进程间通信, 存在以下问题:
1> 效率低(共享数据基于文件, 而 文件是硬盘上的数据 )
2> 需要自己加锁处理 (共享伴随着竞争!!)
加锁可以保证多个进程修改同一块数据时, 同一时间只能有一个进程可以进行修改,即串行的修改!
加锁导致速度慢了,但牺牲了速度却保证了数据安全!!
在生产环境中能不自己处理锁的问题就别自己处理, 加锁是一个复杂且危险的行为
(eg: 忘记释放锁了,后面的都在那等着都进不去)
我们需要找寻一种解决方案能够兼顾: 1> 效率高( 多个进程共享一块内存的数据 ) 2> 帮我们处理好锁问题
这就不得不提到mutiprocessing模块为我们提供的 **基于消息的IPC进程通信机制: 队列和管道 **
△ 队列和管道都是共享的内存空间.
△ 队列底层是以管道+锁的方式实现的! (管道就是内存空间,在讲述subprocess模块的时候就已经提到过啦!)
我们用队列!!!(⁎⁍̴̛ᴗ⁍̴̛⁎)
管道相关用法可参考: https://www.cnblogs.com/linhaifeng/articles/7428874.html#_label7
题外话:
在linux中输入 `free -m`
显示结果的shared字段表明了共享内存有多少M!!
2
3
# Queue基本用法
队列默认的特点: 先进先出FIFO.
注意: 1> 队列占用的是内存空间
2> 不应该往队列中放大数据,应该只存放数据量较小的消息!
# 需要掌握的
from multiprocessing import Queue
# -- 参数maxsize默认为0,设置的参数值小于等于0,表明队列大小无限制.
# 队列占用的是内存空间,我们不应该设置为无限制!
q = Queue(3)
q.put('first')
q.put({"k": "second"})
q.put(["third"])
# q.put(4) # -- 因为设置的队列大小为3 继续加的话会在这里阻塞住(锁的效果)
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # -- 因为队列中只要3个数据 取不出来第4个,会在这阻塞住
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 需要了解的
注意:只有在block为True的时候,设置timeout超时时间才有意义!!
from multiprocessing import Queue
q = Queue(3)
"""
put方法的参数 block默认为True,timeout默认为None
队列大小为3,前3个肯定能添加进去!!
第4个会阻塞
1> 不加timeout,timeout默认为None,会一直阻塞,直到队列有空间
2> 设置timeout=3,不会一直阻塞,阻塞3秒后,队列还是没有空间,会报错 "queue.Full" 告知队列已满
注意:只有在block为True的时候,设置timeout超时时间才有意义!!
"""
q.put('first', block=True, timeout=3)
q.put({"k": "second"}, block=True, timeout=3)
q.put(["third"], block=True, timeout=3)
# q.put(4, block=True, timeout=3)
"""
get方法的参数 block默认为True,timeout默认为None
取第4个时,会阻塞3秒,3秒后队列里还是没有数据的话,会报错 "queue.Empty" 告知队列为空
"""
print(q.get(block=True, timeout=3))
print(q.get(block=True, timeout=3))
print(q.get(block=True, timeout=3))
# print(q.get(block=True, timeout=3))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
若put添加操作 block = False
, 当队列已满, 直接立刻报错, 压根不会阻塞.设置timeout也没用.
同理, 若get取数据操作 block = False
, 当队列为空, 直接立刻报错, 等都不会等.
q.put_nowait(4)
等同于 q.put(4, block=False)
q.get_nowait()
等同于 q.get(block=False)