Python进程间通讯与进程池超详细讲解
作者:alwaysrun 发布时间:2023-09-05 16:50:41
在《多进程并发与同步》中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式。
进程间通讯
multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯。
队列Queue
队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似:
put(obj[, block[, timeout]])
:插入数据到队列(默认阻塞,且没有超时时间);
若设定了超时且队列已满,会抛出queue.Full异常;
队列已关闭时,抛出ValueError异常
get([block[, timeout]])
:读取并删除一个元素;
若设定了超时且队列为空,会抛出queue.Empty异常;
队列已关闭时,抛出ValueError异常;若已阻塞后,再关闭则会一直阻塞;
qsize()
:返回一个近似队列长度(因多进程原因,长度会有误差);
empty()/full()
:队列空或慢(因多进程原因,会有误差);
close()
:关闭队列;
当主进程(创建Queue的)关闭队列时,子进程中的队列并没有关闭,所以getElement进程会一直阻塞等待(为保证能正常退出,需要设为后台进程):
def putElement(name, qu: multiprocessing.Queue):
try:
for i in range(10):
qu.put(f"{name}-{i + 1}")
time.sleep(.1)
except ValueError:
print("queue closed")
print(f"{name}: put complete")
def getElement(name, qu: multiprocessing.Queue):
try:
while True:
r = qu.get()
print(f"{name} recv: {r}")
except ValueError:
print("queue closed")
print(f"{name}: get complete")
if __name__ == '__main__':
qu = multiprocessing.Queue(100)
puts = [multiprocessing.Process(target=putElement, args=(f"send{i}", qu)) for i in range(10)]
gets = [multiprocessing.Process(target=getElement, args=(f"recv{i}", qu), daemon=True) for i in range(2)]
list(map(lambda f: f.start(), puts))
list(map(lambda f: f.start(), gets))
for f in puts:
f.join()
print("To close")
qu.close() # 只是main中的close了,其他进程中的并没有
管道Pipe
multiprocessing.Pipe([duplex])
返回一个连接对象对(conn1, conn2)
。若duplex为True(默认),创建的是双向管道;否则conn1只能用于接收消息,conn2只能用于发送消息:
send():发送消息;
recv():接收消息;
进程间的Pipe基于fork机制建立:
主进程创建Pipe:Pipe的两个Connections连接的的都是主进程;
创建子进程后,Pipe也被拷贝了一份:此时有了4个Connections;
主进程关闭一个Out Connection,子进程关闭一个In Connection:就建立好了一个输入在主进程,输出在子进程的管道。
def pipeProc(pipe):
outPipe, inPipe = pipe
inPipe.close() # 必须关闭,否则结束时不会收到EOFError异常
try:
while True:
r = outPipe.recv()
print("Recv:", r)
except EOFError:
print("RECV end")
if __name__ == '__main__':
outPipe, inPipe = multiprocessing.Pipe()
sub = multiprocessing.Process(target=pipeProc, args=((outPipe, inPipe),))
sub.start()
outPipe.close() # 必须在进程成功运行后,才可关闭
with inPipe:
for x in range(10):
inPipe.send(x)
time.sleep(.1)
print("send complete")
sub.join()
进程池Pool
虽然使用多进程能提高效率,但进程的创建与销毁会消耗较长时间;同时,过多进程会引起频繁的调度,也增加了开销。
进程池中有固定数量的进程:
请求到来时,从池中取出一个进程来处理任务;理完毕后,进程并不立即关闭,而是再放回进程池中;
当池中进程数量不够,请求就要等待,直到拿到空闲进程后才能继续执行;
池中进程的数量是固定的,隐藏同一时间最多有固定数量的进程在运行。
multiprocessing.Pool([processes[, initializer[, initargs]]])
processes:要创建进程数量(默认
os.cpu_count()
个),在需要时才会创建;initializer(*initargs):每个工作进程启动时执行的方法(一般processes为几就执行几次);
Pool类中主要方法:
apply(func[, args[, kwds]])
:以阻塞方式,从池中获取进程并执行func(*args,**kwargs)
;apply_async(func[, args[, kwds[, callback[, error_callback]]]])
:异步方式(从池中获取一个进程)执行func(*args,**kwargs)
,返回AsyncResult;map(func, iterable[, chunksize])/map_async
:map的并行版本(可同时处理多个任务),异步时返回MapResult;starmap(func, iterable[, chunksize])/starmap_async
:与map的区别是允许传入多个参数;imap(func, iterable[, chunksize])
:map的惰性版本(返回结果是可迭代对象),内存消耗会低些,返回迭代器IMapIterator;imap_unordered(func, iterable[, chunksize])
:imap返回的结果顺序与map顺序是相同的,而此方法返回的顺序是乱序的(不依次等待每个任务完成,先完成的先返回),返回迭代器IMapIterator;close()
:关闭,禁止继续提交任务(已提交任务会继续执行完成);terminate()
:立即终止所有任务;join()
:等待工作进程完成(必须已close或terminate了);
def poolWorker():
print(f"worker in process {os.getpid()}")
time.sleep(1)
def poolWorkerOne(name):
print(f"worker one {name} in process {os.getpid()}")
time.sleep(random.random())
return name
def poolWorkerTwo(first, second):
res = first + second
print(f"worker two {res} in process {os.getpid()}")
time.sleep(1./(first+1))
return res
def poolInit():
print("pool init")
if __name__ == '__main__':
workers = multiprocessing.Pool(5, poolInit) # poolInit会被调用5次(线程启动时)
with workers:
for i in range(5):
workers.apply_async(poolWorker)
arg = [(i, i) for i in range(10)]
workers.map_async(poolWorkerOne, arg)
results = workers.starmap_async(poolWorkerTwo, arg) # 每个元素(元组)会被拆分为独立的参数
print("Starmap:", results.get())
results = workers.imap_unordered(poolWorkerOne, arg)
for r in results: # r是乱序的(若使用imap,则与输入arg的顺序相同)
print("Unordered:", r)
# 必须保证workers已close了
workers.join()
来源:https://blog.csdn.net/alwaysrun/article/details/127185356
猜你喜欢
- 啥是依赖规范可以以各种形式指定项目的依赖项,取决于依赖项的类型以及安装项目可能需要的可选约束版本约束^ 约束编写规范允许的版本范围^1.2.
- 写这篇文章的缘由是我使用 reqeusts 库请求接口的时候, 直接使用请求参数里的 json 字段发送数据, 但是服务器无法识别我发送的数
- 本文实例讲述了PHP实现上传文件并存进数据库的方法。分享给大家供大家参考。具体如下:show_add.php文件如下:<?php &n
- 大家经常用的是Adodb.Stream,但这时就有个缺陷,就是不支持断点续传了。经常看到flashget中是红脸(即不支持断点续传)其实支持
- 问题: 1. 后台管理员只有一个用户: admin, 密码: admin 2. 当管理员登陆成功后, 可以管理前台会员信
- python random库简单使用demo当我们需要生成随机数或者从一个序列中随机选择元素时,可以使用 Python 内置的 random
- 使用Scrapy爬取豆瓣某影星的所有个人图片以莫妮卡·贝鲁奇为例1.首先我们在命令行进入到我们要创建的目录,输入 scrapy startp
- EM算法实例通过实例可以快速了解EM算法的基本思想,具体推导请点文末链接。图a是让我们预热的,图b是EM算法的实例。这是一个抛硬币的例子,H
- 进行已经矢量化后的字符串数据,可以使用pandas的Series数据对象的map方法。这样,对于未经矢量化的数据也可以先进行数据的矢量化转换
- msxml3.dll 错误 '80004005'未指定的错误/Project/lijiang_071017/include/
- 本文实例讲述了Python3.6实现根据电影名称(支持电视剧名称),获取下载链接的方法。分享给大家供大家参考,具体如下:做个笔记(pytho
- 本文实例为大家分享了JavaScript缓动动画函数的封装代码,供大家参考,具体内容如下本文将从封装缓动动画的以下几个部分进行封装(1、单个
- 设置模板路径在django项目下创建templats文件来存放html文件为了减少模板加载调用过程及模板本身的冗余代码,Django 提供了
- 我正在参加天池上的一个竞赛,刚开始用的是DenseNet121但是效果没有达到预期,因此开始尝试使用模型融合,将Desenet和Xcepti
- 过滤html代码的函数,当然也可以使用正则表达式。<%Function FilterHTML(strToFilter)&nb
- 首先,了解下原理。1,提供文本框进行查询内容的输入2,将查询信息提交页面程序处理3,程序页主要作用:接受查询信息,根据此信息调用特定的SQL
- 内置方法 说明 __init__(self,...) 初始化对象,在创建新对象时调用 __del__(self) 释放对
- 大家好,我是辰哥。辰哥之前就想着Python可不可以剪辑视频(提取视频的音频,视频截取等等),然后辰哥在网上一搜,还真找到了Python的一
- SMTPSMTP是发送邮件的协议,Python内置对SMTP的支持,可以发送纯文本邮件、HTML邮件以及带附件的邮件。Python对SMTP
- 最近网上流行着一些采集程序,更多人拿着这些东西在网上叫卖,很多不太懂的人看着那些程序眼羡,其实如果你懂一些ASP,了解自动采集程序的原理后,