搜尋

concurrent.futures 平行任務處理

Python 在執行時,通常是採用同步的任務處理模式 ( 一個處理完成後才會接下去處理第二個 ),然而 Python 的標準函式「concurrent.futures」,提供了平行任務處理 ( 非同步 ) 的功能,能夠同時處理多個任務,這篇教學會介紹 concurrent.futures 的用法。

本篇使用的 Python 版本為 3.7.12,所有範例可使用 Google Colab 實作,不用安裝任何軟體 ( 參考:使用 Google Colab )

同步與非同步

同步和非同步的常見說法是:「同步模式下,每個任務必須按照順序執行,後面的任務必須等待前面的任務執行完成,在非同步模式下,後面的任務不用等前面的,各自執行各自的任務」,也可以想像成「同一個步道 vs 不同步道」,透過步道的方式,會更容易明白同步和非同步。( 因為有時會將同步與非同步的中文字面意思,想成「一起走」或「不要一起走」,很容易搞錯 )

  • 同步:「同一個步道」,只能依序排隊前進
  • 非同步:「不 ( 非 ) 同步道」,可以各走各的

Python 教學 - concurrent.futures 平行任務處理

Thread 和 Process

concurrent.futures 提供了 ThreadPoolExecutorProcessPoolExecutor 兩種可以平行處理任務的實作方法,ThreadPoolExecutor 是針對 Thread ( 執行緒 ),ProcessPoolExecutor 是針對 Process ( 程序 ),下方是 Thread 和 Process 的簡單差異說明:

英文 中文 說明
Thread 執行緒 程式執行任務的基本單位。
Process 程序 啟動應用程式時產生的執行實體,需要一定的 CPU 與記憶體資源,Process 由一到多個 Thread 組成,同一個 Process 裡的 Thread 可以共用記憶體資源。

import concurrent.futures

要使用 concurrent.futures 必須先 import concurrent.futures 模組,或使用 from 的方式,單獨 import 特定的類型。

更多資訊可以參考 Python 官方文件:concurrent.futures 啟動平行任務

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

ThreadPoolExecutor

ThreadPoolExecutor 會透過 Thread 的方式建立多個 Executors ( 執行器 ) ,執行並處理多個任務 ( tasks ),ThreadPoolExecutor 有四個參數,最常用的為 max_workers:

參數 說明
max_workers Thread 的數量,預設 5 ( CPU number * 5,每個 CPU 可以處理 5 個 Thread),數量越多,運行速度會越快,如果設定小於等於 0 會發生錯誤。
thread_name_prefix Thread 的名稱,預設 ''。
initializer 每個 Thread 啟動時調用的可調用對象,預設 None。
initargs 傳遞給初始化程序的參數,使用 tuple,預設 ()。

使用 ThreadPoolExecutor 後,就能使用 Executors 的相關方法:

方法 參數 說明
submit fn, *args, **kwargs 執行某個函式。
map func, *iterables 使用 map 的方式,使用某個函式執行可迭代的內容。
shutdown wait 完成執行後回傳信號,釋放正在使用的任何資源,wait 預設 True 會在所有對象完成後才回傳信號,wait 設定 False 則會在執行後立刻回傳。

舉例來說,下方的程式碼執行後,會按照順序 ( 同步 ) 顯示出數字,前一個任務尚未處理完,就不會執行後續的工作。

import time
def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)

test(2)
test(3)
test(4)

# 0 1 0 1 2 0 1 2 3

如果改成 ThreadPoolExecutor 的方式,就會發現三個函式就會一起進行 ( 如果執行的函式大於 5,可再設定 max_workers 的數值 )。

import time
from concurrent.futures import ThreadPoolExecutor

def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)

executor = ThreadPoolExecutor()  # 設定一個執行 Thread 的啟動器

a = executor.submit(test, 2)     # 啟動第一個 test 函式
b = executor.submit(test, 3)     # 啟動第二個 test 函式
c = executor.submit(test, 4)     # 啟動第三個 test 函式
executor.shutdown()              # 關閉啟動器 ( 如果沒有使用,則啟動器會處在鎖住的狀態而無法繼續 )

# 0 0 0 1 1 1 2 2 3

上述的做法,可以改用 with...as 的方式 ( 有點類似 open 的 with )。

import time
from concurrent.futures import ThreadPoolExecutor

def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)

with ThreadPoolExecutor() as executor:    # 改用 with...as
    executor.submit(test, 2)
    executor.submit(test ,3)
    executor.submit(test, 4)

# 0 0 0 1 1 1 2 2 3

上述的範例,也可以改用 map 的做法:

import time
from concurrent.futures import ThreadPoolExecutor

def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)

with ThreadPoolExecutor() as executor:
    executor.map(test, [2,3,4])

# 0 0 0 1 1 1 2 2 3

輸入文字,停止函式執行

透過平行任務處理的方法,就能輕鬆做到「輸入文字,停止正在執行的函式」,以下方的例子而言,run 是一個具有「無窮迴圈」的函式,如果不使用平行任務處理,在 run 後方的程式都無法運作 ( 會被無窮迴圈卡住 ),而 keyin 是一個具有「input」指令的函式,如果不使用平行任務處理,在 keyin 後方的程式也無法運作 ( 會被 input 卡住 ),因此如果使用 concurrent.futures,就能讓兩個函式同時運行,搭配全域變數的做法,就能在輸入特定指令時,停止另外函式的運作。

import time
from concurrent.futures import ThreadPoolExecutor

a = True               # 定義 a 為 True

def run():
    global a           # 定義 a 是全域變數
    while a:           # 如果 a 為 True
        print(123)     # 不斷顯示 123
        time.sleep(1)  # 每隔一秒

def keyin():
    global a           # 定義 a 是全域變數
    if input() == 'a':
        a = False      # 如果輸入的是 a,就讓 a 為 False,停止 run 函式中的迴圈

executor = ThreadPoolExecutor()
e1 = executor.submit(run)
e2 = executor.submit(keyin)
executor.shutdown()

ProcessPoolExecutor

ProcessPoolExecutor 會透過 Process 的方式建立多個 Executors ( 執行器 ),執行並處理多個程序,ProcessPoolExecutor 有四個參數,最常用的為 max_workers:

參數 說明
max_workers Process 的數量,預設為機器的 CPU 數量,如果 max_workers 小於等於 0 或大於等於 61 會發生錯誤。
thread_name_prefix Thread 的名稱,預設 ''。
initializer 每個 Thread 啟動時調用的可調用對象,預設 None。
initargs 傳遞給初始化程序的參數,使用 tuple,預設 ()。

使用 ProcessPoolExecutor 後,就能使用 Executors 的相關方法:

方法 參數 說明
submit fn, *args, **kwargs 執行某個函式。
map func, *iterables 使用 map 的方式,使用某個函式執行可迭代的內容。
shutdown wait 完成執行後回傳信號,釋放正在使用的任何資源,wait 預設 True 會在所有對象完成後才回傳信號,wait 設定 False 則會在執行後立刻回傳。

ProcessPoolExecutor 的用法基本上和 ThreadPoolExecutor 很像,但 ProcessPoolExecutor 主要會用做處理比較需要運算的程式,ThreadPoolExecutor 會使用於等待輸入和輸出 ( I/O ) 的程式,兩者執行後也會有些差別,ProcessPoolExecutor 執行後最後是顯示運算結果,而 ThreadPoolExecutor 則是顯示過程。

import time
from concurrent.futures import ProcessPoolExecutor

def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)
    print()

with ProcessPoolExecutor() as executor:
    executor.map(test, [4,5,6])

Python 教學 - concurrent.futures 平行任務處理

如果是使用 ThreadPoolExecutor 則會如下圖的結果:

Python 教學 - concurrent.futures 平行任務處理

此外,Python 3.5 之後 map() 方法多了 chunksize 參數可以使用,該參數只對 ProcessPoolExecutor 有效,可以提升處理大量可迭代物件的執行效能,chunksize 預設 1,數值越大效能越好 ( 以電腦本身 CPU 的效能為主 )。

import time
from concurrent.futures import ProcessPoolExecutor

def test(n):
    for i in range(n):
        print(i, end=' ')
        time.sleep(0.2)
    print()

with ProcessPoolExecutor() as executor:
    executor.map(test, [4,5,6], chunksize=5)  # 設定 chunksize

小結

Python 的 concurrent.futures 內建函式庫是一個相當方便的函式庫,不僅可以讓原本同步的執行變成非同步,大幅減少工作時間,用法上也比使用 multiprocessing、threading、asyncio 容易得多,是相當推薦的內建函式庫。

意見回饋

如果有任何建議或問題,可傳送「意見表單」給我,謝謝~

Python 教學

基本介紹

Python 學習導讀 關於 Python 使用 Google Colab 使用 Anaconda 使用 Python 虛擬環境

資料型別

變數 variable 變數 ( 全域、區域 ) 數字 number 文字與字串 string 文字與字串 ( 常用方法 ) 文字與字串 ( 格式化 ) 串列 list 串列 ( 常用方法 ) 元組/數組 tuple 字典 dictionary 集合 set

語法觀念

縮排和註解 運算子 operator 邏輯判斷 ( if、elif、else ) 邏輯判斷 ( and 和 or ) 重複迴圈 ( for、while ) 例外處理 ( try、except ) 生成式 comprehension 物件類別 class 物件繼承 inheritance 匯入模組 import

函式操作

函式 function 匿名函式 lambda 遞迴 recursion 產生器 generator 裝飾器 decorator 閉包 closure

內建函式/方法

輸入與輸出 數學計算 字串操作與轉換 迭代物件轉換 迭代物件操作 檔案讀寫 ( open ) eval() 與 exec()

標準函式庫/模組

隨機數 random 數學 math 數學統計函式 statistics 時間與日期 datetime 時間處理 time 日曆 calendar 使用正規表達式 re 檔案操作 os 查找匹配檔案 glob 高階檔案操作 shutil 高效迭代器 itertools 容器資料型態 collections CSV 檔案操作 JSON 檔案操作 concurrent.futures

網路爬蟲

Python 網路爬蟲導讀 關於網路爬蟲 破解反爬蟲的方法 Requests 函式庫 Beautiful Soup 函式庫 Selenium 函式庫 爬取 PTT 文章標題 自動下載 PTT 正妹圖片 同時下載多張圖片 爬取空氣品質指標 ( AQI ) 爬取氣象預報 爬取現在天氣 LINE Notify 雷達回波圖 LINE Notify 即時地震資訊 爬取臺灣銀行牌告匯率 爬取 Yahoo 股市即時股價 爬取 LINE TODAY 留言 批次下載 Pinterest 圖片 登入 Mobile01 截圖下載 Twitter 自動上傳圖文

網頁服務與應用

Flask 函式庫 使用 ngrok 服務 Google Cloud Functions 串接 Gmail 寄送電子郵件 讀取 Google 試算表 寫入 Google 試算表 發送 LINE Notify 通知

LINE BOT 教學

LINE BOT 教學導讀 建立 LINE Channel 設定 Colab 開發環境 建立並串接 Webhook 解析 LINE 的訊息 自動回覆訊息 主動推播訊息 建立圖文選單 切換圖文選單 發送樣板訊息 發送 Flex Message 使用 Requests 傳送訊息 使用 LINE URL Scheme 氣象機器人 (1) 氣象機器人 (2) 氣象機器人 (3) 氣象機器人 (4)

OpenCV 教學

OpenCV 教學導讀 OpenCV 函式庫 開啟並顯示圖片 寫入並儲存圖片 讀取並播放影片 寫入並儲存影片 取得影像資訊 旋轉/翻轉/改變尺寸 影像的幾何變形 影像的色彩轉換 繪製各種形狀 影像加入文字 剪裁影像 調整對比和亮度 負片效果 影像模糊化 影像的疊加與相減 二值化黑白影像 影像的侵蝕與膨脹 影像邊緣偵測 影像遮罩 馬賽克效果 子母畫面影片 萬花筒影片效果 多畫面延遲播放影片 搞笑全景影片合成效果 凸透鏡效果 ( 魚眼效果 ) 倒數計時自動拍照效果 線性漸層填色 合成半透明圖片 將指定的顏色變透明 處理 gif 動畫 影片轉透明 gif 動畫 辨識 QRCode 和 BarCode 掃描 QRCode 切換效果 偵測滑鼠事件 滑鼠選取自動馬賽克 即時在影片中繪圖 偵測鍵盤行為 加入滑桿 ( Trackbar )

AI 影像辨識教學

AI 影像辨識教學導讀 OpenCV 人臉偵測 OpenCV 人臉馬賽克 OpenCV 五官偵測 OpenCV 汽車偵測 OpenCV 行人偵測 OpenCV 辨識不同人臉 OpenCV 單物件追蹤 OpenCV 多物件追蹤 OpenCV 抓取特定顏色 OpenCV 追蹤並標記顏色 情緒辨識與年齡偵測 辨識微笑,拍照儲存 使用 Mediapipe Mediapipe 人臉偵測 Mediapipe 人臉網格 Mediapipe 手掌偵測 Mediapipe 姿勢偵測 Mediapipe 全身偵測 Mediapipe 物體偵測 Mediapipe 人物去背 Mediapipe 手勢辨識 辨識比中指,自動馬賽克 用手指在影片中畫圖 手指擦除鏡子霧氣效果 Jupyter 安裝 Tensorflow 使用 Teachable Machine 辨識剪刀、石頭、布 辨識是否戴口罩 辨識手寫數字

NumPy 教學

NumPy 教學導讀 NumPy 函式庫 資料型態 建立陣列 讀取陣列 迭代陣列 陣列項目賦值 修改陣列形狀 修改陣列項目 填充陣列 分割陣列 合併陣列 陣列排序 廣播 搜尋陣列項目 算數計算 數學函式 隨機數 字串操作處理

matplotlib 圖表

matplotlib 教學導讀 matplotlib 函式庫 Figure 和 Axes Figure 參數設定 建立多個子圖表 設定圖表標籤 設定座標軸位置 設定座標軸刻度文字 資料文字標記 加入顏色對照表 使用極座標系統 使用 3D 圖表 圖表顯示中文 下載儲存圖表 顯示圖片 製作圖表動畫 ( 圖表 ) 折線圖 ( 圖表 ) 散布圖 ( 圖表 ) 長條圖 ( 圖表 ) 圓餅圖 ( 圖表 ) 甜甜圈圖 ( 圖表 ) 等高線圖 ( 圖表 ) 階梯折線圖 ( 圖表 ) 堆疊折線圖 ( 圖表 ) 堆疊長條圖 ( 圖表 ) 極座標長條圖 ( 圖表 ) 極座標散布圖 ( 圖表 ) 3D 柱狀長條圖 ( 圖表 ) 3D 散布圖

Tkinter 設計介面

建立 Tkinter 視窗 Label 標籤 Button 按鈕 Radiobutton 單選按鈕 Checkbutton 複選按鈕 Entry 單行輸入框 Text 多行輸入框 Listbox 列表選擇框 Frame 框架 Scrollbar 滾動條 ( 範例 ) Label 製作時鐘 ( 範例 ) 點擊按鈕開檔案

實用範例

定時自動螢幕截圖 LINE Notify 傳送螢幕截圖 批次重新命名檔案 批次圖片轉檔 批次調整圖片尺寸 調整圖片亮度和對比 裁切與旋轉圖片 拼接多張圖片 圖片加上 logo 浮水印 圖片加上文字浮水印 圖片馬賽克效果 讀取與修改圖片 Exif 圖片轉文字 ( OCR ) 讀取聲音資訊、輸出聲音 聲音剪輯與串接 聲音音量調整 聲音混合與反轉 改變聲音速度 播放聲音 麥克風錄音 顯示聲波圖形 影片轉檔 取出影片聲音或加入聲音 影片剪輯與合併 影片混合與排列顯示 改變影片尺寸、旋轉翻轉 調整影片速度、倒轉影片 調整影片亮度/對比/顏色 影片轉 gif 動畫 影片中加入文字 影片自動加上字幕 影片截圖、圖片轉影片 下載 Youtube 影片 下載 Youtube 清單影片 產生 QRCode 產生 BarCode 讀取 PDF 內容 PDF 拆分/合併/插入/刪除 讀取 EXCEL 內容 寫入資料到 EXCEL CSV 寫入 EXCEL

基礎範例

攝氏/華氏轉換 公分/英吋換算 判斷平年與閏年 找出不重複字元 找出中間的字元 大樂透電腦選號 下載進度條 星號金字塔 數字金字塔 猜數字 ( 猜大猜小 ) 猜數字 ( 幾 A 幾 B ) 簡單時鐘 ( 世界時間 ) 計算 BMI 數值 計算年紀 ( 歲、月、天 ) 產生身分證字號 ( 隨機 ) 檢查身分證字號 羅馬數字轉換

數學範例

兩個數字的四則運算 計算多個數字的總和 費波那契數列 九九乘法表 質因數分解 快速找出質數 最小公倍數 ( 多個數字 ) 最大公因數 ( 多個數字 )

ZeroJudge 解答

關於 ZeroJudge a001: 哈囉 a002: 簡易加法 a003: 兩光法師占卜術 a004: 文文的求婚 a005: Eva 的回家作業 a006: 一元二次方程式 a009: 解碼器 a010: 因數分解 a013: 羅馬數字 a015: 矩陣的翻轉 a017: 五則運算 a020: 身分證檢驗 a021: 大數運算 a022: 迴文 a024: 最大公因數(GCD) a034: 二進位制轉換 a038: 數字翻轉 a040: 阿姆斯壯數 a042: 平面圓形切割 a044: 空間切割 a053: Sagit's 計分程式 a054: 電話客服中心 a058: MOD3 a059: 完全平方和 a065: 提款卡密碼 a095: 麥哲倫的陰謀 a104: 排序 a147: Print it all a148: You Cannot Pass?! a149: 乘乘樂 a215: 明明愛數數 a216: 數數愛明明 a224: 明明愛明明 a225: 明明愛排列 a244: 新手訓練~for+if a248: 新手訓練~陣列應用 a263: 日期差幾天 a271: 彩色蘿蔔 a291: nAnB problem a410: 解方程 a414: 位元運算之進位篇 a417: 螺旋矩陣 a524: 手機之謎 a528: 大數排序 a647: 投資專家 a693: 吞食天地 a738: 最大公約數 a746: 畫蛇添足 a799: 正值國 a915: 二維點排序 b265: Conformity b294: 經濟大恐荒 b367: 翻轉世界 b374: 求眾數 b511: 換銅板 b558: 求數列第 n 項 e267: Group Reverse d073: 分組報告 d294: 算算算 Easy d485: 我愛偶數 d827: 買鉛筆