引言

在现代软件开发中,多任务处理能力已成为提升应用性能的关键因素。特别是在数据采集、API调用和后台处理等场景中,合理使用多任务编程可以显著提高程序的吞吐量和响应速度。本文将深入探讨Python中三种主要的多任务开发方式:多线程、多进程和协程,并通过实际案例展示它们的应用场景和最佳实践。

多任务开发的应用场景

在深入技术细节前,我们需要明确不同多任务方案的适用场景:

一、多线程开发

1. 使用Thread类

Python的threading模块提供了Thread类,用于创建和管理线程。以下是一个基本的多线程实现示例:

import threading
import time

def worker(num):
    print(f'Worker: {num} starting')
    time.sleep(1)  # 模拟耗时操作
    print(f'Worker: {num} finishing')

# 创建多个线程
threads = []
for i in range(5):
    # 创建线程,target参数为任务处理函数,args为参数元组
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.setDaemon(True)  # 设为守护线程
    t.start()  # 开始线程

# 等待所有线程完成
for t in threads:
    t.join()

print("所有线程执行完毕")

2. 使用线程池

对于大量任务的处理,使用线程池更为高效。Python的concurrent.futures模块提供了ThreadPoolExecutor类:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task_function(x):
    # 模拟耗时操作
    time.sleep(1)
    return x * x

def main():
    tasks = range(100)  # 假设有100个任务
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        # 提交任务到线程池,并收集Future对象
        future_to_result = {executor.submit(task_function, arg): arg for arg in tasks}
        
        # 等待任务完成并收集结果
        for future in as_completed(future_to_result):
            arg = future_to_result[future]
            try:
                result = future.result()
                print(f"任务 {arg} 的结果是 {result}")
            except Exception as e:
                print(f"任务 {arg} 遇到异常:{e}")

if __name__ == "__main__":
    main()

3. 分批处理任务

在处理大量任务时,我们通常需要分批处理以避免资源耗尽:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def task_function(x):
    # 模拟耗时操作
    time.sleep(1)
    return x * x

def main():
    tasks = list(range(200))  # 假设有200个任务
    batch_size = 20  # 每次提交的任务数量
    batch_num = 1
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        start_index = 0
        while start_index < len(tasks):
            print(f"第{batch_num}批任务,开始")
            end_index = min(start_index + batch_size, len(tasks))
            current_batch = tasks[start_index:end_index]
            
            future_to_result = {}
            for arg in current_batch:
                future = executor.submit(task_function, arg)
                future_to_result[future] = arg
                
            # 等待当前批次的任务完成并收集结果
            for future in as_completed(future_to_result):
                arg = future_to_result[future]
                try:
                    result = future.result()
                    print(f"任务 {arg} 的结果是 {result}")
                except Exception as e:
                    print(f"任务 {arg} 遇到异常:{e}")
                    
            start_index += batch_size
            batch_num += 1

if __name__ == "__main__":
    main()

4. Thread与ThreadPoolExecutor对比