【Python基礎】並列処理:ThreadingとConcurrent

  • URLをコピーしました!
目次

Threading

前回、Pythonで並列処理する方法として、multiprocessingを試してみました。

今回はもう一つの並列処理の方法として、Threadingを試していきましょう。

正直に言って前回のmultiprocessingと今回のThreadingの違いは私自身がよく分かっていません。

ただどうやらmultiprocessingは複数のCPUを使って処理を分散させる方法(マルチプロセス)で、Threadingは一つのCPUで複数の処理を同時に行う方法(マルチスレッド)とのことでした。

とりあえず使ってみることにしましょう。

また前回、multiprocessingで処理時間を比較してみようとしたのですが、処理が簡単すぎたためかあまり違いが見られなかったので、今回は使い方の紹介を中心にしていきます。

基本の使い方:threading.Threadと.start()

まず簡単なプログラムを組んでみたのでそちらで紹介します。

import time
import threading

range_val = 100

def loopCalc1(i):
    for j in range(range_val):
        print(f'Thread1: {i}-{j}')

if __name__ == '__main__':
    time_start = time.time()
    
    for i in range(range_val):
        thread1 = threading.Thread(target=loopCalc1, args=(i,))
        thread1.start()
    
    time_end = time.time()
    
    print(f'{round(time_end-time_start,3)} sec')

最初にThreadingを使うためにインポートしています(import threading)。

プログラムの流れとしては、前回同様で指定した数を繰り返しつつ、さらにそれに対して指定した数を繰り返すという形になっています。

つまり、指定した数が10だとしたら、「0 – 0」、「0 – 1」、「0 – 2」…「5 – 0」、「5 – 1」、「5 – 2」…「9 – 7」、「9 – 8」、「9 – 9」という感じで数を繰り返すというわけです。

二つ目の数字の繰り返しを、loopCalc1関数で行なっています。

そしてThreadingを使っているのはこの部分。

    for i in range(range_val):
        thread1 = threading.Thread(target=loopCalc1, args=(i,))
        thread1.start()

「threding.Thread(target=処理したい関数, args=(引数1, 引数2, …))」としてスレッドを作成します。

ここが前回のmultiprocessingと違うところで、multiprocessingでは引数のリストを渡すことで、自動で処理を分割し、並列処理をしてくれました。

今回の場合、for関数を使って、値を一つ取得し、それを引数としてスレッドを作成しています(thread1 = threading.Thread(target=loopCalc1, args=(i,)))。

ちなみにargsが引数用のキーワードですが、ここの指定の仕方は「args=(引数1, 引数2, …)」のようにカッコと最後に「,(カンマ)」が必要ですので注意してください。

そして次の「thread1.start()」で、そのスレッドの処理をしています。

これがなぜ並列処理になるかというと、作成したThread(今回の場合、thread1)の処理が’終わっていなくても’、次の処理に進むということです。

通常、プログラムは上から順番に処理されていき、その際、上の処理が終わったら、次の処理に移るということを繰り返しています。

しかしこのThreadingでは上の処理がされていても、次に処理に取り掛かるということで、処理が並列化されているということです。

そのため先ほどのプログラムを実行するとこうなります。

実行結果
Thread1: 0-0
Thread1: 0-1
Thread1: 0-2
Thread1: 0-3
Thread1: 0-4
(中略)
Thread1: 78-5
Thread1: 78-6
Thread1: 78-7
Thread1: 9-6
Thread1: 62-0
(中略)
Thread1: 40-95
Thread1: 40-96
Thread1: 40-97
Thread1: 40-98
Thread1: 40-99

まず一つ目に途中で処理の順番が入れ替わっているのは、multiprocessingの時と同じです。

ですが、最後に処理時間(print(f'{round(time_end-time_start,3)} sec’))が表示されるようになっているはずなのですが、最後には表示されていません。

どこにあるか探してみると、処理の途中にありました。

Thread1: 35-3
Thread1: 18-2
Thread1: 18-3
Thread1: 18-4
Thread1: 27-0
0.019 sec
Thread1: 46-6
Thread1: 16-0
Thread1: 80-0
Thread1: 81-0
Thread1: 81-1

このようにThreadの処理が終わっていなくても、次の処理に取り掛かるというのがThreadingの特徴だと認識しています。

スレッドの処理を待つ .join()

ただ先ほどのようにスレッドの処理の後に他の処理をしたい場合もあります。

その場合は、「.join()」を使用します。

ということで「.join()」を追加してみたのがこちらです。

import time
import threading

range_val = 100

def loopCalc1(i):
    for j in range(range_val):
        print(f'Thread1: {i}-{j}')

if __name__ == '__main__':
    time_start = time.time()
    
    thread_list = []
    for i in range(range_val):
        thread1 = threading.Thread(target=loopCalc1, args=(i,))
        thread1.start()
        thread1.join()
    
    time_end = time.time()
    
    print(f'{round(time_end-time_start,3)} sec')

実行結果
Thread1: 0-0
Thread1: 0-1
Thread1: 0-2
Thread1: 0-3
Thread1: 0-4
(中略)
hread1: 99-95
Thread1: 99-96
Thread1: 99-97
Thread1: 99-98
Thread1: 99-99
0.06 sec

確かに一つの処理が終わって次の処理に行っているため、数字が順番に増加していっています。

しかしこれでは処理が並列化されているわけではなく、逐次処理と同じことをしているだけになってしまいます。

loopCalc1関数を使うところだけ並列処理し、その処理が終わったら、処理時間を表示するためにはこのようにします。

import time
import threading

range_val = 100

def loopCalc1(i):
    for j in range(range_val):
        print(f'Thread1: {i}-{j}')

if __name__ == '__main__':
    time_start = time.time()
    
    thread_list = []
    for i in range(range_val):
        thread1 = threading.Thread(target=loopCalc1, args=(i,))
        thread1.start()
        thread_list.append(thread1)
        
    for th in thread_list:
        th.join()
    
    time_end = time.time()
    
    print(f'{round(time_end-time_start,3)} sec')

実行結果
Thread1: 0-0
Thread1: 0-1
Thread1: 0-2
Thread1: 0-3
Thread1: 0-4
(中略)
Thread1: 64-20
Thread1: 64-21
Thread1: 64-22
Thread1: 25-54
Thread1: 56-54
Thread1: 56-55
(中略)
Thread1: 92-99
Thread1: 6-96
Thread1: 6-97
Thread1: 6-98
Thread1: 6-99
0.265 sec

Threadingの部分だけ抜き出してみましょう。

    thread_list = []
    for i in range(range_val):
        thread1 = threading.Thread(target=loopCalc1, args=(i,))
        thread1.start()
        thread_list.append(thread1)
        
    for th in thread_list:
        th.join()

「.join()」でスレッドの処理を待つ場合、立てたスレッドに対して、「.join()」が必要になります。

今回の場合、for関数を使って、「thread1」というスレッドが大量に生成され、並列的に処理されていきます。

つまり大量に生成されたそれぞれのスレッドに対して、「.join()」をしなければいけません。

そこで「thread_list」というリストを作成し、その中に作成したスレッドを格納しています(thread_list.append(thread1))。

そしてスレッドを立てるためのfor関数が終了したら、そのまま処理が下に進みます。

そこでfor関数を使って、「thread_list」内のスレッドに対して、「.join()」をすることで、スレッドの処理待ちをしているということです。

これにより全てのスレッドの処理が終わってから、下の処理へと進むようにできるというわけです。

複数の異なる処理を並列化

先ほどお話しした通り、Threadingではその処理の終了を待たずに、下の処理へと進むことができます。

そのため、異なる処理を並列化することも可能です。

例えばこんな感じ。

import time
import threading

range_val = 100

def loopCalc1(i):
    for j in range(range_val):
        print(f'Thread1: {i}-{j}')
        
def loopCalc2(i):
    for j in range(range_val):
        print(f'Thread2: {i}-{j}')

if __name__ == '__main__':
    time_start = time.time()
    
    thread_list = []
    for i in range(range_val):
        thread1 = threading.Thread(target=loopCalc1, args=(i,))
        thread2 = threading.Thread(target=loopCalc2, args=(i,))
        thread1.start()
        thread2.start()
        thread_list.append(thread1)
        thread_list.append(thread2)
        
    for th in thread_list:
        th.join()
    
    time_end = time.time()
    
    print(f'{round(time_end-time_start,3)} sec')

新たにloopCalc2関数を作成し、loopCalc2関数の処理用のThreadを追加しました(thread2 = threading.Thread(target=loopCalc2, args=(i,)))。

これを実行するとこうなります。

実行結果
Thread1: 0-0
Thread2: 0-0
Thread2: 0-1
Thread2: 1-0
Thread2: 1-1
Thread2: 1-2
(中略)
Thread1: 89-41
Thread2: 7-59
Thread2: 7-60
Thread1: 17-33
Thread1: 56-68
(中略)
Thread1: 0-95
Thread1: 0-96
Thread1: 0-97
Thread1: 0-98
Thread1: 0-99
0.611 sec

このようにthread1の処理とthread2の処理を並列的に行うことができます。

今回、thread1とthread2が交互に現れていないのは、あくまでも処理の時間差の問題で、処理が終了したものから表示されているだけです。

concurrent.futures:ThreadPoolExecutor

ここまでThreadingライブラリに関して紹介してきましたが、Threadingは一つのCPUで複数の処理を同時に行う方法(マルチスレッド)の方法において、最近では「concurrent」というより優秀なモジュールがあるようです。

ということで試してみました。

from concurrent.futures import ThreadPoolExecutor
import time

range_val = 100

def loopCalc1(i):
    for j in range(range_val):
        print(f'Thread1: {i}-{j}')

if __name__ == '__main__':
    time_start = time.time()
    
    with ThreadPoolExecutor(max_workers=4) as executor:
        for i in range(range_val):
            executor.submit(loopCalc1, i)
    
    time_end = time.time()
    
    print(f'{round(time_end-time_start,3)} sec')

まずは「from concurrent.futures import ThreadPoolExecutor」でインポートします。

使用しているのはこの3行。

    with ThreadPoolExecutor(max_workers=4) as executor:
        for i in range(range_val):
            executor.submit(loopCalc1, i)

まず「with ThreadPoolExecutor(max_workers=4) as executor:」でいくつのスレッドを同時に処理するか決めています。

今回の場合は、「max_workers=4」なので、4つのスレッドを同時に処理するという設定になっています。

そしてThreadingの時のように、「executor.submit(処理する関数, 引数)」という形で実行しています。

この「ThreadPoolExecutor」の場合、スレッドの開始のための「.start()」はないようです。

今回は「with構文」を使って、ThreadPoolExecutorを実行していますので、このwith構文内部での処理が終わるのを待って、下の処理へと進んでいきます。

with構文を使わないで書いた場合、そのまま下の処理へと進んでいってしまいますので、処理を待つ「.shutdown()」が必要です。

ということでwith構文を使わない場合は、こんな感じになります。

    executor = ThreadPoolExecutor(max_workers=4)
    for i in range(range_val):
        executor.submit(loopCalc1, i)
        
    executor.shutdown()

concurrent.futures:ProcessPoolExecutor

「concurrent.futures」の優秀な点の一つとして、先ほどのマルチスレッドとmultiprocessingのようなマルチプロセスを簡単に変更できるということがあります。

変更するのは単純に「ThreadPoolExecutor」を「ProcessPoolExecutor」に変更するだけです。

ということでこんな感じ。

from concurrent.futures import ProcessPoolExecutor
import time

range_val = 100

def loopCalc1(i):
    for j in range(range_val):
        print(f'Thread1: {i}-{j}')

if __name__ == '__main__':
    time_start = time.time()
    
    with ProcessPoolExecutor(max_workers=4) as executor:
        for i in range(range_val):
            executor.submit(loopCalc1, i)
    
    time_end = time.time()
    
    print(f'{round(time_end-time_start,3)} sec')

処理によって、マルチプロセスが向いているのか、マルチスレッドが向いているのかがあることでしょうし、こんなに簡単に切り替えられるとなると確かにいいですね。

これでマルチプロセス(multiprocessing、ProcessPoolExecutor)、マルチスレッド(Threading、ThreadPoolExecutor)の使い方が分かりました。

ということで次回はもう少し重い処理を使って、それぞれの効果を見ていきたいと思います。

ではでは今回はこんな感じで。

よかったらシェアしてね!
  • URLをコピーしました!

コメント

コメントする

目次