一文了解如何在window上轻松实现多进程任务

参考

苏神写的parallel_apply link
python中multiprocessing文档 link

前言

在之前的工作中,我经常使用的多进程方式是进程池,在看了苏神写的parallel_apply之后,发现多进程使用队列的方式运行,从显示上(tqdm显示运行进度),灵活性上都更加好 经过更深入的使用之后,我发现其实进程池也可以比较灵活的传递参数。然而,在运行时发现多进程在windows上的支持很不友好,于是打算写一篇在windows上容易使用的多进程方法,当然也可以在linux上使用。

使用多进程来提高程序的运行效率是非常重要的,通常可以在数据处理,矩阵运算等方面应用。

进程在windows上使用的注意点

1.必须得保证你要使用多进程处理的程序是可序列化的,即可以转换为可存储或传输的形式,如generator就是不可序列化的形式,因此想要对jieba分词使用多进程就是不可行的
2.多个进程之间是独立的,所以想在多个进程中更新同一个类属性是不可行的,除非是使用multiprocessing中带的Array,List等方法,并将这些变量作为输入传入到进程中去,不过这个方法我并没有尝试过。
3.每个进程使用的函数都是一次性的,因此若是传入的类函数中,初始时赋值了变量,则这个变量在每一次运行时都会执行,若这个赋值变量的运行时间很长,则会导致使用多进程甚至比单独执行更慢,以下是例子:

import json
class myClass:
	def __init__(self):
		with open(file_name, 'r', encoding='utf-8') as f:
			self.data = json.load(f)
	
	def forward(self, idx):
		return self.data[idx]

上述例子中,初始化__init__函数中,需要载入数据来执行forward函数,这种情况下将foward放入多进程中,会极大的影响效率,甚至比单独执行更慢。
4.在if __name__ == '__main__'或者函数中设定的全局变量无法在多进程中使用,因为如果子进程中的代码尝试访问一个全局变量,它所看到的值(如果有)可能和父进程中执行 Process.start 那一刻的值不一样。例子如下:

import json
from multiprocessing import Pool
def forward(idx):
	return data[idx]

if __name__ == '__main__':
	global data
	with open(file_name, 'r', encoding='utf-8') as f:
		data = json.load(f)
	with Pool(6) as pool:
		pool.map(forward, [1,2,3,4,5])

而下面这种方式是可以的,因为全局变量是知识模块级别的变量

import json
from multiprocessing import Pool
with open(file_name, 'r', encoding='utf-8') as f:
	data = json.load(f)
def forward(idx):
	return data[idx]

if __name__ == '__main__':
	with Pool(6) as pool:
		pool.map(forward, [1,2,3,4,5])

5.在使用多进程时,应当使用if __name__ == '__main__',从而保护程序的入口点,不使用时可能会出现各种各样的错误。

方法

进程池

进程池是比较简单的方法,只需设定好池中进程的数量,使用map(function, args)即可使用,也可以使用starmap(function, args)的方法,map与starmap的区别在于map仅能传递一个参数,而starmap可以传递多个参数,例子如下

from multiprocessing import Pool
from tqdm import tqdm
def func(data):
    x = data[0]
    y = data[1]
    return x+y

def func1(x, y):
	return x+y

def simple_calculate():
	for data in tqdm(list(zip(*[range(10000000), \
        range(10000000)]))):
        func(data)

if __name__ == '__main__':
    with Pool(6) as pool:
        pool.map(func, tqdm(list(zip(*[range(10000000), \
        range(10000000)]))))
    with Pool(6) as pool:
    	pool.starmap(func1, tqdm(list(zip(*[range(10000000), \
        range(10000000)]))))
    simple_calculate()

map调用的是func,每次传入的参数为data, 而starmap每次传入的参数为x,y。使用进程池运行上述例子的时间大约是4s,而简单的运算simple_calculation为3s,显然在简单的数值运算上,多进程并没有很好的性能

队列

队列,顾名思义是将任务放入到队列中,然后每个进程从队列中取出任务执行的过程。因此使用队列进行多进程操作包括:
1.将任务放入到队列中,我们使用Queue.put()方法
2.启动进程,我们是用Process(target=target, args=args).start()
3.获取结果,我们是用Queue.get()方法
4.终止进程,我们将终止信号STOP传入到每个进程中,使得进程停止的方法是iter(iterable, sentinel)中的sentinel参数,当iter获得sentinel参数时,迭代停止。
队列的方法可以需在worker函数中,将函数func的输入变为*data,将调用的函数funcs自定义为1个参数或者多个参数,这对于调用别人写好的函数非常方便,下面是使用队列的例子:

from tqdm import tqdm
from multiprocessing import Process, Queue

def worker(input, output):
    for idx, func, data in iter(input.get, 'STOP'):
        result = func(*data)
        output.put((idx, result))


def multi_process(func, iterable, nworkers):
    NUMBER_OF_PROCESSES = nworkers
    results = []

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for idx, task in enumerate(iterable):
        task_queue.put((idx, func, task))

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # get result
    for _ in tqdm(range(len(iterable))):
        results.append(done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

    # the results are unordered, so use original idx to recovery order
    results.sort(key=lambda x: x[0])
    return [x[1] for x in results]

def funcs(x,y):
    return x+y

if __name__ == '__main__':
	multi_process(funcs, tqdm(list(zip(*[range(10000000), \
	range(10000000)]))), 6)

在这个例子中,执行时间为5-6分钟,远超于普通计算与进程池,在观察中发现,对于1000w数据,将任务放到进程池中的速度为30w/s,而获取结果的速度为3w/s。

总结

可以发现,当计算任务非常简单的时候,使用多进程带来的提升效果不明显,当任务数非常庞大的时候,应当选择进程池的方法提升效率,当任务数不大的时候,可以同时选择进程池与队列的方式。
我在pypi上传了spft(standard python function tools)的包,可以简单快速的上手多进程任务,使用pip install spft可以下载,使用如下:

from spft.multiprocess import multi_process
multi_process(func, iterable, worker_num, is_queue)

multi_process可以自动匹配函数使用单参数或多参数,is_queue决定使用进程池或队列方式。

Pypi的上传方式

首先运行python setup.py sdist bdist_wheel,在dist目录下生成.whl.gz文件,然后确认是否下载twine包,pip install twine,接着运行twine upload dist/*即可。

更新 2022.8.24

最近在使用库处理数据的时候,速度能够提升8倍,但是进度条显示有几个数据没有处理完,同时程序跑到最后不会终止,一开始我怀疑是多进程无法处理大量数据的情况,后来终于发现,原来数据中存在脏乱数据,使用单进程的时候,遇到错误的数据程序会exit,但不会报错,因此在多进程的处理过程中,一个进行挂掉了但不会影响其他进程的处理,但是挂掉的进程无法stop导致程序跑到最后不会终止

更新 2022.9.1

上述多进程的使用方法都是输入完整的数据,输出完整的句子。但是在日常使用过程中,我们会遇到数据量非常大,没办法全部载入到内存中的情况,同时我们也希望能够每次读取已经处理好的数据,也就是“生产者消费者原则”。因此我们希望设计一个多进程的生成器,能够实现对多进程中处理好的数据进行实时读取,方法如下:

def multi_process_generator(func, iterable, worker_num):
    """
    :param func: 调用的函数
    :param iterable: 可迭代对象
    :param worker_num: 进程数量
    :yield: 处理好的结果
    """
    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for idx, task in enumerate(iterable):
        task_queue.put((idx, func, task))

    # Start worker processes
    for i in range(worker_num):
        if func.__code__.co_argcount == 1:
            Process(target=worker_single, args=(task_queue, done_queue)).start()
        else:
            Process(target=worker, args=(task_queue, done_queue)).start()

    # get result
    for _ in tqdm(iterable):
        yield done_queue.get()

    # Tell child processes to stop
    for i in range(worker_num):
        task_queue.put('STOP')

上述方法已经整合到spft-0.0.4版本中了,使用方法如下:

from spft.multiprocess import multi_process, multi_process_generator
def funcs(data):
    x = data[0]
    y = data[1]
    return x+y
for data in multi_process_generator(funcs, list(zip(*[range(100), range(100)])), 6):
    print(data)

更新2022.9.8

对于9月1日更新的多进程生成器,存在一个缺陷,当数据量非常庞大时,不停的将数据塞入task_queue中会导致内存爆炸,物理机死机,因此我们需要限制task_queue队列中最多可以放置的数据个数,可以采用设置max_queue_size的方式,例如task_queue = Queue(max_queue_size)

使用多进程时,模型与方法被重复进行了n次

在使用多进程的时候,明明只想构建模型一次,或者加载数据一次,但是每次都会运行和进程数一样的次数,甚至导致内存爆炸电脑奔溃。最近终于找到了原因,windows中对于没有if __name__ == '__main__'的方法,在使用多进程的时候都会启动n次,因为windows没有标识符,所以我们只需要把所以只想运行一次的程序放到我们设定的__main__标识符下面,将多进程调用的程序放到标识符外面,就可以在windows上顺利使用多进程进行加速了!

frozon_support问题

当我们不在标识符__main__下面运行程序的时候,程序会报frozon_support的问题,不过我们可以将多进程的方法写到一个函数中,然后再在标识符下面运行多进程函数,就可以避免这个问题了。

更新2022.9.9 (joblib)

在使用一些函数的时候,我们希望使用functional.partial固定某些参数,并使用多进程去调用这些函数,传入指定的参数,在这种情况下,使用multiprocess可能就没这么方便了,但是可以使用joblib来进行多进程的使用

from tqdm import tqdm
import requests
from functools import partial
from joblib import Parallel, delayed
process_fn = partial(requests.post, url='http://0.0.0.0:8000/deploy')
Parallel(n_jobs=4)(delayed(process_fn)(params={'param': param}) for param in tqdm(params, desc='load param'))

通过joblib,我们可以轻松的指定params参数进行多进程的传输。