线程开发
# 线程理论
在传统操作系统中, 每个进程有一个地址空间(即每个进程都会占用独一份的内存空间), 而且每个进程内默认就有一个控制线程(即每个进程内自带一个线程)..
纠正一个概念:
在前面的并发编程的学习中,我们一直说进程怎么怎么运行,这是不够准确的!
准确点说,进程不是一个执行单位!进程只是一个资源单位!
# 多角度分析
要清楚的知道! 进程是资源单位,线程是cpu执行和调度的基本单位!
# 比喻工厂
工厂 -- OS ; 车间 -- 进程 ; 流水线 -- 线程.
若把操作系统比喻为一座工厂.
在工厂内每造出一个 车间 --- 启动一个 进程.
造车轱辘的车间 -- 放造车轱辘的一堆原材料
造发动机的车间 -- 放造发动机的一堆原材料
每个车间内至少有一条 流水线 --- 每个进程内至少有一个 线程.
流水线会去取车间里的原材料进行加工
# py文件角度
从运行一个python文件的角度诠释进程与线程.
我们说的 "<进程的运行>" 本质上说的是进程里的线程的运行.
进程 -- 开辟了一块内存空间,运行过程中产生的数据都往里面放 线程的运行 -- 基于造好的空间,从上到下的执行代码.
即一说QQ进程,就意味着在内存里有一块隔离的空间专门存放QQ程序运行过程当中相关的数据!
# 抽象的概念
进程和线程都是一个抽象的概念!!
单说进程, 那么进程描述的就是一个程序的运行过程. 此时..
进程 = 资源申请(开辟内存空间) + 程序执行(运行代码)
既提到了进程又提及了线程, 那么..
进程 = 资源申请(开辟内存空间)
线程 = 相应代码的执行过程(前提是进程已经存在/空间已经造好了)
抽象话术 | 大白话 |
---|---|
一个进程启动了 | 关于这个程序的空间划分好了! |
一个进程销毁了 | 这个空间被释放掉啦! |
线程运行到某一行 | 指的是空间已经造好了,单指代码运行到哪一行啦. |
一个线程运行完毕 | 跟这个线程有关的代码已经运行完了 |
有三个线程 | 有三段代码在运行 |
有三个进程 | 有三个空间被申请好了 |
注意!提一嘴,cpu是执行代码的,cpu不涉及资源的申请.
前面并发编程学习中说cpu在多个进程之间来回切换,准确点应该说是 cpu在多个线程之间来回切换.
执行的是线程对应的代码!! 运行代码的过程中,用到的数据跟进程要/从进程空间中拿!!
# 进程 vs 线程
1> 内存共享or隔离
多个进程之间内存空间彼此隔离
<同一进程下>的多个线程共享该进程内的数据
2> 创建速度
造线程的速度要远远快于造进程!!
浅想/浅尝一下什么时候用线程,什么时候用进程!
假设我们需要开发一个文本处理工具,该工具至少包含三方面的功能:
1> 接收用户的输入;
2> 将输入的内容从内存中取出来格式化打印到屏幕上;
3> 把内存里的内容定期的往硬盘里刷.
(初步分析:这三个功能肯定不能是串行.难道刷数据到硬盘的时候就不允许用户输入吗?这是不合理的)
并发的方案有两种:
■ 方案一 多进程 -- 开三个进程(每个进程都自带一个线程)
■ 方案二 多线程 -- 开一个进程,该进程内有三个线程
再次强调,起了10个进程,每个进程里都自带一个线程,叫多线程吗?No.多线程指的是同一个进程内开启了多个线程!!
先说结论,这两个解决方案,第二个方案多线程胜出!分析如下:
若是方案一,三个进程分别负责功能1、2、3.
第一个进程输入的数据要给第二个进程在屏幕上打印,第一个进程内存中的数据要拷贝给第三个进程.
但进程之间内存彼此隔离,解决该问题涉及到进程间的通信.
So,弊端:数据被重复拷贝,进程之间的通信实现复杂.
若是方案二.可以解决方案一的弊端.
该进程里的这三个线程产生的数据都在进程的内存里,数据共享.
造线程的速度也远远快于造进程的速度.
Ps:这里只是进行了简单的分析,为什么选择方案二,分析的还不够透彻!!但可以明确的跟你说,大多数场景下都会用多线程,这也并不意味着多进程没用,当后续了解了GIL后,就晓得多进程的优势啦!!
扩张阅读:`https://www.liaoxuefeng.com/wiki/1016959663602400/1017631469467456`
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 开启线程的两种方式
为了方便描述,通常会将进程里自带的那个线程叫做主线程.
但要晓得,同一进程下的多个线程没有父子之分!大家地位都一样!!不要误解啦.
# 方式一
右键执行代码,会启一个进程,进程里的原生线程会从上到下执行代码.
当运行到 t.start()
时,发送信号,在该进程里又启了个线程. 启线程的速度超级快!
Ps: 这里其它线程是主线程开启的,当然,其它线程里可以再开线程.不管怎样,线程都在该进程里.
import time
from threading import Thread
def task(x):
print("%s is running." % x)
time.sleep(2)
print("%s is done." % x)
if __name__ == '__main__': # -- 启线程可以不加main()函数,但都会习惯性的加上.
t = Thread(target=task, args=('子线程',))
t.start()
print("主") # -- 站在资源角度,主进程;站在执行角度,主线程.
"""
子线程 is running.
主
子线程 is done.
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
★ 思考一个问题,打印出"主"后,程序还未结束,是因为主线程在等子线程结束后,才结束的吗?
No! 这里的等是因为进程必须等待其内部所有线程运行完毕才结束!
再回顾下,多进程里,主进程等子进程是因为主进程要给子进程收尸.
# 方式二
import time
from threading import Thread
class Mythread(Thread):
def __init__(self, x):
# -- 我疑惑过这个构造函数为啥不传参数,看了源码才发现,Thread的构造方法的参数都设置成了默认参数
super().__init__()
self.x = x
def run(self):
print("%s is running." % self.x)
time.sleep(2)
print("%s is done." % self.x)
if __name__ == '__main__':
t = Mythread('子线程')
t.start()
print("主")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 开多进程与多线程的区别
在 一个进程下 开启多个线程与在 一个进程下 开启多个子进程的区别
# 开启速度
线程的开启速度快
import time
from multiprocessing import Process
from threading import Thread
def task(x):
print("%s is running." % x)
time.sleep(2)
print("%s is done." % x)
if __name__ == '__main__':
# t = Process(target=task, args=('子进程',))
# t.start() # -- 在发出信号的同时,不可能同时将子进程造出来.
# print("主")
# -- 造进程的运行结果如下:
"""
主
子进程 is running.
子进程 is done.
"""
t = Thread(target=task, args=('子线程',))
t.start() # -- 几乎在信号发出的同时线程立马就造出来了!!
print("主")
# -- 造线程的运行结果如下:
"""
子线程 is running.
主
子线程 is done.
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 共享数据
同一进程下的多个线程共享该进程内的数据
from threading import Thread
x = 100
def task():
global x
x = 0
if __name__ == '__main__':
t = Thread(target=task)
print("开线程之前x的值为:", x)
t.start()
t.join() # -- 虽然发信号同时,线程就造出来啦.保险起见,这里还是join下,让主线程等子线程运行完.
print("开线程之后x的值为:", x)
"""
开线程之前x的值为: 100
开线程之后x的值为: 0
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
虽然同一进程下的多个线程数据共享,但以后我们是分布式运行程序的.
那就意味着我们的线程是分散到一台台机器上的,不同机器上的线程是不共享数据的..
所以,以后线程之间的通信我们也需要借助一个套接字软件..
该套接字软件实现了网络版本的队列功能..(不是单机版本的队列哦),线程也可以实现生产者消费者模型的.
# 查看PID
在主进程下开启多个线程,每个线程都跟主进程的pid一样
开多个进程,每个进程都有不同的pid
import os
from multiprocessing import Process
from threading import Thread
def task():
print(os.getpid())
if __name__ == '__main__':
# -- part1
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
print('主线程/主进程pid', os.getpid())
# -- part2
p1 = Process(target=task)
p2 = Process(target=task)
p1.start()
p2.start()
print('主进程pid', os.getpid())
"""
28717
28717
主线程/主进程pid 28717
主进程pid 28717
28719
28720
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 线程对象的其它方法
Thread类实例化对象的方法:
1> is_alive()
: 查看线程是否存活
2> setName()
: 设置线程名
3> join()
: 让主线程等着子线程运行完毕后,主线程再往下走
threading模块提供的一些方法:
1> current_thread()
: 返回当前的线程变量
2> enumerate()
: 返回一个包含正在运行的/活跃的线程对象的list
正在运行指线程启动后、结束前,不包括启动前和终止后的线程.
3> active_count()
: 返回正在运行的/活跃的线程数量, 与 len(threading.enumerate())
有相同的结果.
import time
from threading import Thread, current_thread, enumerate, active_count
def task():
time.sleep(2)
print("在子线程里查看自己的线程名:", current_thread().name)
if __name__ == '__main__':
t = Thread(target=task)
t.start()
print(enumerate()) # -- 连同主线程在内有两个运行的线程
t.join()
print(t.is_alive())
print(enumerate())
print(active_count())
print("在主线程里查看自己的线程名:", current_thread().name)
print("在主线程里查看子线程的线程名:", t.name) # -- 当然可以在Thread里自己指定线程名
print("主")
"""
[<_MainThread(MainThread, started 4556516864)>, <Thread(Thread-1, started 123145552031744)>]
在子线程里查看自己的线程名: Thread-1
False
[<_MainThread(MainThread, started 4556516864)>]
1
在主线程里查看自己的线程名: MainThread
在主线程里查看子线程的线程名: Thread-1
主
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 守护线程
无论是进程还是线程, 都遵循 "守护 进程/线程 会等待 主进程/主线程 运行完毕后被销毁"
需要特别强调! 运行完毕并非终止运行!
# 概念
1> 主进程 在其代码结束后就已经算运行完毕了 (守护进程在此时就被回收). 然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源 (否则会产生僵尸进程) 才会结束.
2> 主线程 在其他非守护线程运行完毕后才算运行完毕 (守护线程在此时就被回收). 因为主线程的结束意味着进程的结束, 进程整体的资源都将被回收, 而进程必须保证非守护线程都运行完毕后才能结束.
import time
from threading import Thread
def task(x):
print("%s is running." % x)
time.sleep(2)
print("%s is done." % x)
"""
守护线程会在本进程内所有非守护线程都死掉了才跟着死.即守护线程守护的是整个进程的运行周期.
-- 守护线程就是车间里的监工,监察车间里所有的流水线,流水线都停工了,就没有监察的必要了.
-- 守护进程就像是个太监,守护着皇上,皇上不干活了,太监就可以去死了.
"""
if __name__ == '__main__':
t = Thread(target=task, args=('守护线程',))
t.daemon = True
t.start() # -- 运行到此处,守护线程立马就起来了.
print("主") # -- 运行到此处,所有的非守护线程都运行完了.
"""
守护线程 is running.
主
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 迷惑人的例子
import time
from threading import Thread
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
t1 = Thread(target=foo)
t2 = Thread(target=bar)
t1.daemon = True
t1.start()
t2.start()
print("main-------")
"""
123
456
main------- # -- 主线程结束了
end123
end456 # -- 子线程t2结束了
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
悄悄告诉你,若foo里睡3秒,bar里睡1秒, "end123" 就打印不出来啦!!!
# 互斥锁
将多个并发任务对共享数据的修改变成"串行",牺牲了效率,保证了数据的安全!
这里串行加引号是因为不是严格意义上的串行,不是挨着一个一个来的,需要抢,谁抢到了谁运行
# 模拟共享数据的修改
用实验证明线程修改共享数据是不安全的!
不加锁 -- 并发执行,速度快,数据不安全.
import time
from threading import Thread
x = 100
def task():
global x
"""
x -= 1 # -- 这样实验,结果为0,但并不能证明线程修改共享数据就是安全的!因为线程启动速度太快!
"""
temp = x
# -- 0.1s的时间,完成100次for循环足矣,保证100个线程都起来啦,都拿到一个值为100的临时变量temp
time.sleep(0.1)
# -- 睡醒后,这100个线程都进行了`x = 100 - 1`的赋值操作,确实也是减了100次
x = temp - 1
if __name__ == '__main__':
t_l = []
start = time.time()
for _ in range(100):
t = Thread(target=task)
t_l.append(t)
t.start()
for t in t_l: # -- 用于保证100个线程运行完毕
t.join()
stop = time.time()
print(x, stop - start) # -- 99 0.12547802925109863
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 加锁
未加锁部分并发执行, 加锁部分"串行"执行,速度慢,但数据安全
注意哦,若在start之后立即使用join,会将100个线程的任务"串行"执行,而加锁只是将任务中 涉及共享数据修改的部分 "串行"执行,相比之下,明显加锁的效率更高!
import time
from threading import Thread, Lock
# -- 没必要传给每一个线程,因为这个锁造出来后,所有线程都能看到
# 因为同一进程下的多个线程共享该进程内的数据
# 回顾下,在父进程里造的锁,子进程看不到,需要通过参数传给每一个子进程(锁只有一把)
mutex = Lock()
x = 100
def task():
# -- 未加锁的代码并发运行
global x
"""
with mutex:
temp = x
time.sleep(0.1)
x = temp - 1
"""
# -- 加锁的代码"串行"运行
mutex.acquire()
temp = x
# -- 在第一个线程(因为for循环,起线程的速度也很快,所以必定为第一个线程)的睡眠期间
# 其余99个线程已经起来啦,之后锁的争抢就说不准谁抢到了
time.sleep(0.1)
x = temp - 1
mutex.release()
if __name__ == '__main__':
t_l = []
start = time.time()
for _ in range(100):
t = Thread(target=task)
t_l.append(t)
t.start()
for t in t_l:
t.join()
stop = time.time()
print(x, stop - start) # 0 10.318571090698242 -- 100*0.1=10 加锁部分是"串行"执行的
# 10秒多,多的这点时间是造线程以及切换线程的时间
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 死锁现象与递归锁
进程和线程都有死锁和递归锁哦!
# 死锁现象
两个或两个以上的进程或线程在执行过程中, 因争夺资源而造成的一种互相等待的现象.
若无外力作用, 它们都将无法继续运行下去. 此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程.
import time
from threading import Thread, Lock, active_count
mutexA = Lock()
mutexB = Lock()
class Mythread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print("[%s]拿到A锁!" % self.name)
mutexB.acquire()
print("[%s]拿到B锁!" % self.name)
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print("[%s]拿到B锁!" % self.name)
time.sleep(1) # -- [Thread-1]线程在这里睡一秒,足够其它线程起来啦!
mutexA.acquire()
print("[%s]拿到A锁!" % self.name)
mutexA.release()
mutexB.release()
if __name__ == '__main__':
for i in range(10):
t = Mythread()
t.start()
print(active_count()) # 11 -- 值为11,代表所有线程都已经产生啦
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
实验结果: 线程都已经产生了,但程序却阻塞住了.
线程启动速度很快!线程1肯定会先起来.(for循环第一个嘛)
[Thread-1]执行f1函数体代码时,很悠闲的拿到了A锁、B锁、释放B锁、释放A锁
[Thread-1]接着执行f2函数体代码,先拿到B锁,睡了一秒
(线程1走到这一步的过程中没有任何竞争者). 注意哦,睡的这一秒足够其它所有的线程都起来啦.
在线程1释放掉A锁后,[Thread-2]抢到了A锁,继续执行,发现B锁还处于被占用的状态.
[Thread-1]睡完后,继续执行,发现A锁被占用了.
线程1、线程2都等待着对方释放锁.
自己在编程过程中加锁就极有可能导致死锁情况的产生!
# 递归锁
死锁解决方案: 使用Rlock递归锁 -- 可以连续的acquire
Ps: 互斥锁是不能连续的acquire的!!
递归锁 -- 在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock
这个RLock内部维护着一个Lock和一个counter变量, counter记录了acquire的次数, 从而使得资源可以被多次require. 直到一个线程所有的acquire都被release, 其他的线程才能获得资源!
# -- 只用改动几行代码即可
import time
from threading import Thread, active_count, RLock
# mutexA = Lock()
# mutexB = Lock()
# -- 使用递归锁的话,这里的A锁以及B锁,实际上都是同一把锁
# 简写: mutexA = mutexB = RLock()
obj = RLock()
mutexA = obj
mutexB = obj
"""
[Thread-1]拿到A锁!
[Thread-1]拿到B锁!
[Thread-2]拿到A锁!
[Thread-2]拿到B锁!
[Thread-1]拿到B锁!
... ... ...
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
首先再次明确递归锁的计数不为0的话,其它线程只能原地等待.为0的时候,其它线程才能抢锁.
即一个线程拿到锁,counter加1,该线程内又碰到加锁的情况, 则counter继续加1, 这期间所有其他线程都只能等待, 等待该线程释放所有锁, 即counter递减到0为止!
根据运行结果分析代码流程:
线程1拿到递归锁,运行f1的代码,锁的计数加1、再加1、减1、再减1.
此时递归锁的计数为0.所有线程(包括线程1)重新开始抢锁!
线程2抢到了,线程2开始运行f1的代码. 仅管线程1想继续运行f2的代码,但奈何没有抢到锁,也就只好原地等待.
# 信号量
进程和线程都有信号量! 信号量控制同一时刻并发执行的任务数
信号量Semaphore管理一个内置的计数器
每当调用acquire()时内置计数器-1,调用release() 时内置计数器+1.
计数器不能小于0! 当计数器为0时, acquire()将阻塞线程直到其他线程调用release().
# --
import random
import time
from threading import Thread, Semaphore, current_thread
sm = Semaphore(5) # -- 信号量为5,同一时间最大活跃的只有5个
# 同时只有5个线程可以获得semaphore,即可以限制最大连接数为5
# Ps:互斥锁同一时间运行的只有1个
def task():
sm.acquire()
print("[%s]正在上厕所!" % current_thread().name)
time.sleep(random.randint(1, 4)) # -- 模拟每个人上厕所的时间
sm.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task)
t.start()
"""
上来就有5个人抢到锁了,其余人等着,除非有人释放掉,其他人就可以抢释放的锁了.
不需要5个人都释放锁后才能抢锁,有人释放了锁就可以抢..
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
信号量 就跟 公共厕所 一样. 有多个坑可以同时服务多个人.
互斥锁 就跟 独立卫生间 一样. 同一时间只能有一个人.
提一嘴, 信号量与进程池是完全不同的概念, 进程池Pool(4), 最大只能产生4个进程, 而且从头到尾都只是这四个进程, 不会产生新的, 而信号量是产生一堆线程/进程...
# 定时器
定时器, 指定n秒后执行某操作
from threading import Timer, current_thread
def task(x):
print("%s running." % x)
print(current_thread().name) # -- 是一个线程!
if __name__ == '__main__':
# -- 时间间隔、任务、任务的参数
t = Timer(3, task, args=(10,)) # -- 3秒后运行一次
t.start()
print("主")
"""
主
10 running.
Thread-1
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 线程queue
import queue
, 用法与进程Queue一样
import queue
# -- ★ 队列: 先进先出
q = queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
# -- ★ 堆栈: last in first out 先进后出
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
# -- ★ 优先级队列: 数字越小,优先级越高 优先级高的先出队
q = queue.PriorityQueue()
q.put((3, 'data1')) # -- 放元祖和列表皆可,第一个元素是优先级,第二个元素是数据
q.put((-10, 'data2'))
q.put((11, 'data3'))
print(q.get())
print(q.get())
print(q.get())
"""
1
2
3
3
2
1
(-10, 'data2')
(3, 'data1')
(11, 'data3')
"""
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41