9.2 信号量:Semaphore
锁同时只允许一个线程更改数据,而信号量是同时允许一定数量的进程更改数据 。假如有一下应用场景:有10个人吃饭,但只有一张餐桌,只允许做3个人,没上桌的人不允许吃饭,已上桌吃完饭离座之后,下面的人才能抢占桌子继续吃饭,如果不用信号量,肯定是10人一窝蜂一起吃饭:
from multiprocessing import Process
import time
import randomdef fun(i):print('{}号顾客上座,开始吃饭'.format(i))time.sleep(random.randint(3, 8))print('{}号顾客吃完饭了,离座'.format(i))if __name__ == '__main__':for i in range(10):p = Process(target=fun, args=(i,))p.start()
运行结果:
用了信号量,实现了轮流吃饭,每次只有3个人吃饭:
示例代码:
from multiprocessing import Process
import time
import random
from multiprocessing import Semaphoredef fun(i , sem):sem.acquire()print('{}号顾客上座,开始吃饭'.format(i))time.sleep(random.randint(3, 8))print('{}号顾客吃完饭了,离座'.format(i))sem.release()if __name__ == '__main__':sem = Semaphore(3)for i in range(10):p = Process(target=fun, args=(i,sem))p.start()
运行结果:
事实上,Semaphore的作用也类似于锁,只不过在锁机制上添加了一个计数器,允许多个人拥有“钥匙”。
9.3 事件:Event
python进程的事件用于主进程控制其他子进程的执行,Event类有如下几个主要方法:
1)wait() 插入在进程中插入一个标记(flag)默认为 False,当 flag为False时,程序会停止运行进入阻塞状态;
2)set() 使flag为True,程序会进入非阻塞状态
3)clear() 使flag为False,程序会停止运行,进入阻塞状态
4)is_set() 判断flag 是否为True,是的话返回True,不是则返回False
有如下需求:获取当前时间的秒数的个位数,如果小于5,则设置子进程阻塞,如果大于5则设置子进程非阻塞。代码如下:
from multiprocessing import Event, Process
import time
from datetime import datetimedef func(e):print('子进程:开始运行……')while True:print('子进程:现在事件秒数是{}'.format(datetime.now().second))e.wait() # 阻塞等待信号 这里插入了一个flag 默认为 Falsetime.sleep(1)if __name__ == '__main__':e = Event()p = Process(target=func, args=(e,))p.start()for i in range(10):s = int(str(datetime.now().second)[-1]) # 获取当前秒数的个位数if s < 5:print('子进程进入阻塞状态')e.clear() # 使插入的flag为False 进程进入阻塞状态else:print('子进程取消阻塞状态')e.set() # 使插入的flag为True,进程进入非阻塞状态time.sleep(1)e.set()time.sleep(3)p.terminate()print("主进程运行结束……")
运行结果:
10、内置线程池
示例代码:
import time
import os
import random
from multiprocessing.pool import ThreadPooldef task():print(f'开始执行任务:{os.getpid()}')time.sleep(random.randint(0, 5))print(f"执行任务结束:{os.getpid()}")if __name__ == '__main__':pool = ThreadPool(2)for i in range(5):pool.apply_async(task)pool.close()pool.join()
运行结果: