【Python基礎】並列処理:multiprocessing(mapとstarmap)

  • URLをコピーしました!
目次

multiprocessing

前回、matplotlibで大量の画像を出力した際に起こるメモリ不足でプログラムが落ちてしまう現象の対処法を試してみました。

そして何とかメモリ使用量は安定化したのですが、せっかくなら少しでも早く処理できるようにしたいという思いが芽生え、並列処理のやり方を勉強してみました。

調べてみるとどうやらmultiprocessingとthreadingという二つのライブラリがある模様。

どこがどう違うのかよく分かりませんが、とりあえず使い方を学ぶということで、今回はmultiprocessingを試してみました。

比較対象のプログラム

multiprocessingの効果を見るために、まずは並列処理ではない普通の処理(逐次処理)のプログラムの作成から行いました。

import time

range_val = 1000

def loopCalc(i):
    for j in range(range_val):
        print(f'{i}-{j}')
        
if __name__ == '__main__':
    time_start = time.time()
    
    for i in range(range_val):
        loopCalc(i)
        
    time_end = time.time()

    print(f'{round(time_end-time_start,3)} sec')

プログラムの流れとしては、指定した数を繰り返しつつ、さらにそれに対して指定した数を繰り返すという形になっています。

つまり、指定した数が10だとしたら、「0 – 0」、「0 – 1」、「0 – 2」…「5 – 0」、「5 – 1」、「5 – 2」…「9 – 7」、「9 – 8」、「9 – 9」という感じで数を繰り返すというわけです。

そして実行前後で時間を取得し、その差分から処理時間を取得します。

プログラムとしては、まず今回は処理時間を測定したいため、「time」モジュールをインポートしています(import time)。

「range_val = 1000」は繰り返しの回数です。

loonCalc関数で2つ目の数字の繰り返しをしています。

def loopCalc(i):
    for j in range(range_val):
        print(f'{i}-{j}')

 「if __name__ == ‘__main__’」の中で時間を取得しつつ、一つ目の数字の繰り返しをしています。

if __name__ == '__main__':
    time_start = time.time()
    
    for i in range(range_val):
        loopCalc(i)
        
    time_end = time.time()

    print(f'{round(time_end-time_start,3)} sec')

このプログラムをJupyter Notebook上ではなく、ターミナル(Windowsならコマンドプロンプト)上で実行します。

これは後ほど使うmultiprocessingがJupyter Notebook上では使えないためです。

そして実行するとこんな感じの結果が得られます。

実行結果
0-0
0-1
0-2
0-3
0-4
0-5
(中略)
999-995
999-996
999-997
999-998
999-999
2.967 sec

これを3回繰り返してみると「1回目:2.967 sec」、「2回目:3.047 sec」、「3回目:3.061 sec」となりました。

multiprocessing その1:引数が一つの場合(map)

それではmultiprocessingの使い方を勉強していきましょう。

先ほどのプログラムをmultiprocesingに対応させたものがこちらです。

import time
from multiprocessing import Pool

range_val = 1000

def loopCalc(i):
    for j in range(range_val):
        print(f'{i}-{j}')

if __name__ == '__main__':
    time_start = time.time()
    
    with Pool(4) as pool:
        pool.map(loopCalc, range(range_val))
        
    time_end = time.time()
    
    print(f'{round(time_end-time_start,3)} sec')

まず最初に変わっているのは「from multiprocessing import Pool」を追加し、multiprocessingの「Pool」をインポートしていることです。

次に並列処理のところで「with Pool(4) as pool:」としていますが、「Pool(4)」は4つのプロセス(CPU)を同時に使って並列処理するという意味です。

そして「pool.map(loopCalc, range(range_val))」で実行していますが、「pool.map(実行する関数, 引数のリスト)」という形で実行します。

このようにfor関数で繰り返し処理するのではなく、引数のリストを渡すことで、内部でそのリストに従って並列処理してくれるというわけです。

また「Jupyter Notebook上ではmultiprocessing」は使えませんので、ターミナルやコマンドプロンプト上で実行するようにしてください。

ちなみにJupyter Notebook上でmultiprocessingを使うと、処理が進まず、エラーも出ずに、止まったままになります。

これを実行するとこうなります。

0-0
0-1
0-2
0-3
0-4
0-5
(中略)
162-155
37-533
96-347
162-156
37-534
(中略)
881-995
881-996
881-997
881-998
881-999
3.041 sec

注目すべきは途中で処理の順番が変わっていることです。

先ほどの逐次処理の場合は、0から999へと順番に処理されていますが、multiprocessingの場合は並列で処理されるため、順番がずれるということがあるわけです。

処理時間的には「1回目:3.041 sec」、「2回目:3.103 sec」、「3回目:3.140 sec」と逐次処理とあまり変わりませんでした。

多分処理が単純すぎて、並列処理の恩恵が得られなかったのだと思われます。

multiprocessing その2:引数が二つ以上の場合(starmap)

先ほどの「pool.map(実行する関数, 引数のリスト)」では、引数が一つしか入れられません。

しかし関数では引数が二つ以上の場合も多々あります。

その場合にmultiprocessingを使いたい時は「starmap」を使用します。

この場合は引数のリストを作成し、それをさらにリストとしてまとめて、「pool.starmap(実行する関数, 引数のリストのリスト)」とします。

つまり引数がA、B、Cと3つある場合には、「[[A1, B1, C1],[A2, B2, C2],[A3, B3, C3]…]」のようにそれぞれの値を入れた2次元リストを作成するというわけです。

もちろん、Cがファイルパスのように固定な引数の場合、「[[A1, B1, C],[A2, B2, C],[A3, B3, C]…]」とすることも可能です。

考え方として、処理したい値の組み合わせを全て網羅したリストを作成し、それをmultiprocessingに渡すということです。

それでは試してみましょう。

import time
from multiprocessing import Pool
import itertools

range_val = 1000

def loopCalc(i, j):
    print(f'{i}-{j}')

if __name__ == '__main__':
    time_start = time.time()
    
    i_list = range(range_val)
    j_list = range(range_val)
    
    combination_list = itertools.product(i_list, j_list)
    
    with Pool(4) as pool:
        pool.starmap(loopCalc, combination_list)
        
    time_end = time.time()
    
    print(f'{round(time_end-time_start,3)} sec')

今回は結構大きく変わっています。

まずloopCalc関数ですが、二つの数字を引数として入力し、それを表示するだけになっています。

def loopCalc(i, j):
    print(f'{i}-{j}')

先ほどお話しした引数の全ての組み合わせを作成するため、i_list、j_listで二つのリストを作成し、itertools.productを使って、二つのリストの全組み合わせを作成しています。

    i_list = range(range_val)
    j_list = range(range_val)
    
    combination_list = itertools.product(i_list, j_list)

そしてその全組み合わせを格納したリストを使ってmultiprocessingを実行しています。

    with Pool(4) as pool:
        pool.starmap(loopCalc, combination_list)

実行してみるとこんな感じです。

0-0
0-1
0-2
0-3
0-4
0-5
(中略)
502-486
483-366
428-826
502-487
483-367
(中略)
999-995
999-996
999-997
999-998
999-999
3.258 sec

やはり途中で順番が変わっているのが分かります。

また処理時間的には「1回目:3.258 sec」、「2回目:3.350 sec」、「3回目:3.501 sec」と少し時間が延びてしまいました。

先ほど同様、処理が簡単すぎるため、multiprocessingの恩恵が得られなかったのとあわせて、全組み合わせのリストを作ったことから、先ほどよりも処理が遅くなってしまったと考えられます。

何はともあれ、これでmultiprocessingを使った並列処理は使えるようになりました。

次回はもう一つの並列処理「threading」を試してみましょう。

ではでは今回はこんな感じで。

よかったらシェアしてね!
  • URLをコピーしました!

コメント

コメントする

目次