一文了解如何在window上轻松实现多进程任务
参考
苏神写的parallel_apply link
python中multiprocessing文档 link
前言
在之前的工作中,我经常使用的多进程方式是进程池,在看了苏神写的parallel_apply之后,发现多进程使用队列的方式运行,从显示上(tqdm显示运行进度),灵活性上都更加好 经过更深入的使用之后,我发现其实进程池也可以比较灵活的传递参数。然而,在运行时发现多进程在windows上的支持很不友好,于是打算写一篇在windows上容易使用的多进程方法,当然也可以在linux上使用。
使用多进程来提高程序的运行效率是非常重要的,通常可以在数据处理,矩阵运算等方面应用。
进程在windows上使用的注意点
1.必须得保证你要使用多进程处理的程序是可序列化的,即可以转换为可存储或传输的形式,如generator就是不可序列化的形式,因此想要对jieba分词使用多进程就是不可行的
2.多个进程之间是独立的,所以想在多个进程中更新同一个类属性是不可行的,除非是使用multiprocessing中带的Array,List等方法,并将这些变量作为输入传入到进程中去,不过这个方法我并没有尝试过。
3.每个进程使用的函数都是一次性的,因此若是传入的类函数中,初始时赋值了变量,则这个变量在每一次运行时都会执行,若这个赋值变量的运行时间很长,则会导致使用多进程甚至比单独执行更慢,以下是例子:
import json
class myClass:
def __init__(self):
with open(file_name, 'r', encoding='utf-8') as f:
self.data = json.load(f)
def forward(self, idx):
return self.data[idx]
上述例子中,初始化__init__
函数中,需要载入数据来执行forward
函数,这种情况下将foward放入多进程中,会极大的影响效率,甚至比单独执行更慢。
4.在if __name__ == '__main__'
或者函数中设定的全局变量无法在多进程中使用,因为如果子进程中的代码尝试访问一个全局变量,它所看到的值(如果有)可能和父进程中执行 Process.start 那一刻的值不一样。例子如下:
import json
from multiprocessing import Pool
def forward(idx):
return data[idx]
if __name__ == '__main__':
global data
with open(file_name, 'r', encoding='utf-8') as f:
data = json.load(f)
with Pool(6) as pool:
pool.map(forward, [1,2,3,4,5])
而下面这种方式是可以的,因为全局变量是知识模块级别的变量
import json
from multiprocessing import Pool
with open(file_name, 'r', encoding='utf-8') as f:
data = json.load(f)
def forward(idx):
return data[idx]
if __name__ == '__main__':
with Pool(6) as pool:
pool.map(forward, [1,2,3,4,5])
5.在使用多进程时,应当使用if __name__ == '__main__'
,从而保护程序的入口点,不使用时可能会出现各种各样的错误。
方法
进程池
进程池是比较简单的方法,只需设定好池中进程的数量,使用map(function, args)
即可使用,也可以使用starmap(function, args)
的方法,map
与starmap的区别在于map
仅能传递一个参数,而starmap
可以传递多个参数,例子如下
from multiprocessing import Pool
from tqdm import tqdm
def func(data):
x = data[0]
y = data[1]
return x+y
def func1(x, y):
return x+y
def simple_calculate():
for data in tqdm(list(zip(*[range(10000000), \
range(10000000)]))):
func(data)
if __name__ == '__main__':
with Pool(6) as pool:
pool.map(func, tqdm(list(zip(*[range(10000000), \
range(10000000)]))))
with Pool(6) as pool:
pool.starmap(func1, tqdm(list(zip(*[range(10000000), \
range(10000000)]))))
simple_calculate()
map
调用的是func,每次传入的参数为data
, 而starmap每次传入的参数为x,y
。使用进程池运行上述例子的时间大约是4s,而简单的运算simple_calculation
为3s,显然在简单的数值运算上,多进程并没有很好的性能
队列
队列,顾名思义是将任务放入到队列中,然后每个进程从队列中取出任务执行的过程。因此使用队列进行多进程操作包括:
1.将任务放入到队列中,我们使用Queue.put()
方法
2.启动进程,我们是用Process(target=target, args=args).start()
3.获取结果,我们是用Queue.get()
方法
4.终止进程,我们将终止信号STOP
传入到每个进程中,使得进程停止的方法是iter(iterable, sentinel)
中的sentinel
参数,当iter
获得sentinel
参数时,迭代停止。
队列的方法可以需在worker
函数中,将函数func
的输入变为*data
,将调用的函数funcs
自定义为1个参数或者多个参数,这对于调用别人写好的函数非常方便,下面是使用队列的例子:
from tqdm import tqdm
from multiprocessing import Process, Queue
def worker(input, output):
for idx, func, data in iter(input.get, 'STOP'):
result = func(*data)
output.put((idx, result))
def multi_process(func, iterable, nworkers):
NUMBER_OF_PROCESSES = nworkers
results = []
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for idx, task in enumerate(iterable):
task_queue.put((idx, func, task))
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# get result
for _ in tqdm(range(len(iterable))):
results.append(done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
# the results are unordered, so use original idx to recovery order
results.sort(key=lambda x: x[0])
return [x[1] for x in results]
def funcs(x,y):
return x+y
if __name__ == '__main__':
multi_process(funcs, tqdm(list(zip(*[range(10000000), \
range(10000000)]))), 6)
在这个例子中,执行时间为5-6分钟,远超于普通计算与进程池,在观察中发现,对于1000w数据,将任务放到进程池中的速度为30w/s,而获取结果的速度为3w/s。
总结
可以发现,当计算任务非常简单的时候,使用多进程带来的提升效果不明显,当任务数非常庞大的时候,应当选择进程池的方法提升效率,当任务数不大的时候,可以同时选择进程池与队列的方式。
我在pypi上传了spft(standard python function tools)的包,可以简单快速的上手多进程任务,使用pip install spft
可以下载,使用如下:
from spft.multiprocess import multi_process
multi_process(func, iterable, worker_num, is_queue)
multi_process
可以自动匹配函数使用单参数或多参数,is_queue
决定使用进程池或队列方式。
Pypi的上传方式
首先运行python setup.py sdist bdist_wheel
,在dist
目录下生成.whl
和.gz
文件,然后确认是否下载twine包,pip install twine
,接着运行twine upload dist/*
即可。
更新 2022.8.24
最近在使用库处理数据的时候,速度能够提升8倍,但是进度条显示有几个数据没有处理完,同时程序跑到最后不会终止,一开始我怀疑是多进程无法处理大量数据的情况,后来终于发现,原来数据中存在脏乱数据,使用单进程的时候,遇到错误的数据程序会exit,但不会报错,因此在多进程的处理过程中,一个进行挂掉了但不会影响其他进程的处理,但是挂掉的进程无法stop导致程序跑到最后不会终止
更新 2022.9.1
上述多进程的使用方法都是输入完整的数据,输出完整的句子。但是在日常使用过程中,我们会遇到数据量非常大,没办法全部载入到内存中的情况,同时我们也希望能够每次读取已经处理好的数据,也就是“生产者消费者原则”。因此我们希望设计一个多进程的生成器,能够实现对多进程中处理好的数据进行实时读取,方法如下:
def multi_process_generator(func, iterable, worker_num):
"""
:param func: 调用的函数
:param iterable: 可迭代对象
:param worker_num: 进程数量
:yield: 处理好的结果
"""
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for idx, task in enumerate(iterable):
task_queue.put((idx, func, task))
# Start worker processes
for i in range(worker_num):
if func.__code__.co_argcount == 1:
Process(target=worker_single, args=(task_queue, done_queue)).start()
else:
Process(target=worker, args=(task_queue, done_queue)).start()
# get result
for _ in tqdm(iterable):
yield done_queue.get()
# Tell child processes to stop
for i in range(worker_num):
task_queue.put('STOP')
上述方法已经整合到spft-0.0.4版本中了,使用方法如下:
from spft.multiprocess import multi_process, multi_process_generator
def funcs(data):
x = data[0]
y = data[1]
return x+y
for data in multi_process_generator(funcs, list(zip(*[range(100), range(100)])), 6):
print(data)
更新2022.9.8
对于9月1日更新的多进程生成器,存在一个缺陷,当数据量非常庞大时,不停的将数据塞入task_queue
中会导致内存爆炸,物理机死机,因此我们需要限制task_queue
队列中最多可以放置的数据个数,可以采用设置max_queue_size的方式,例如task_queue = Queue(max_queue_size)
使用多进程时,模型与方法被重复进行了n次
在使用多进程的时候,明明只想构建模型一次,或者加载数据一次,但是每次都会运行和进程数一样的次数,甚至导致内存爆炸电脑奔溃。最近终于找到了原因,windows中对于没有if __name__ == '__main__'
的方法,在使用多进程的时候都会启动n次,因为windows没有标识符,所以我们只需要把所以只想运行一次的程序放到我们设定的__main__
标识符下面,将多进程调用的程序放到标识符外面,就可以在windows上顺利使用多进程进行加速了!
frozon_support问题
当我们不在标识符__main__
下面运行程序的时候,程序会报frozon_support
的问题,不过我们可以将多进程的方法写到一个函数中,然后再在标识符下面运行多进程函数,就可以避免这个问题了。
更新2022.9.9 (joblib)
在使用一些函数的时候,我们希望使用functional.partial
固定某些参数,并使用多进程去调用这些函数,传入指定的参数,在这种情况下,使用multiprocess
可能就没这么方便了,但是可以使用joblib
来进行多进程的使用
from tqdm import tqdm
import requests
from functools import partial
from joblib import Parallel, delayed
process_fn = partial(requests.post, url='http://0.0.0.0:8000/deploy')
Parallel(n_jobs=4)(delayed(process_fn)(params={'param': param}) for param in tqdm(params, desc='load param'))
通过joblib,我们可以轻松的指定params参数进行多进程的传输。