12.1 并发编程基础
12.1.1 并发模型对比
12.1.2 GIL全局解释器锁
关键影响:
- 同一时间只有一个线程执行Python字节码
- 对CPU密集型任务影响显著
- I/O密集型任务仍可受益于多线程
12.2 多线程编程
12.2.1 threading模块
基本使用:
import threading
def worker(num):
print(f"Worker {num} 开始执行")
# 模拟工作
import time
time.sleep(1)
print(f"Worker {num} 执行完成")
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
12.2.2 线程同步
锁机制示例:
import threading
class Counter:
def __init__(self):
self.value = 0
self.lock = threading.Lock()
def increment(self):
with self.lock: # 自动获取和释放锁
self.value += 1
counter = Counter()
def increment_worker():
for _ in range(100000):
counter.increment()
threads = [threading.Thread(target=increment_worker) for _ in range(2)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终计数: {counter.value}") # 应为200000
12.3 多进程编程
12.3.1 multiprocessing模块
基本使用:
from multiprocessing import Process
import os
def task(name):
print(f"子进程 {name} (PID: {os.getpid()}) 执行中...")
# CPU密集型计算
result = sum(i*i for i in range(1000000))
print(f"子进程 {name} 完成")
if __name__ == '__main__':
processes = []
for i in range(4): # 4核CPU常用
p = Process(target=task, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
12.3.2 进程池
from multiprocessing import Pool
def cpu_intensive(n):
return sum(i * i for i in range(n))
if __name__ == '__main__':
with Pool(4) as pool: # 4个工作进程
# map方法并行处理
results = pool.map(cpu_intensive, range(10000, 10010))
print(results)
12.4 异步编程(asyncio)
12.4.1 协程基础
import asyncio
async def fetch_data(url):
print(f"开始获取 {url}")
await asyncio.sleep(2) # 模拟IO操作
print(f"完成获取 {url}")
return f"{url} 的数据"
async def main():
task1 = asyncio.create_task(fetch_data("url1"))
task2 = asyncio.create_task(fetch_data("url2"))
results = await asyncio.gather(task1, task2)
print(results)
asyncio.run(main())
12.4.2 异步IO操作
import aiohttp
import asyncio
async def fetch_page(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"http://example.com",
"http://example.org",
"http://example.net"
]
tasks = [fetch_page(url) for url in urls]
pages = await asyncio.gather(*tasks)
print(f"获取了 {len(pages)} 个页面")
asyncio.run(main())
12.5 并发工具
12.5.1 concurrent.futures
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request
URLS = ['http://example.com', 'http://example.org']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
with ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {
executor.submit(load_url, url, 60): url
for url in URLS
}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
print(f"{url} 页面长度为 {len(data)}")
except Exception as e:
print(f"{url} 获取失败: {e}")
12.5.2 队列通信
from queue import Queue
from threading import Thread
def producer(q, items):
for item in items:
print(f"生产: {item}")
q.put(item)
q.put(None) # 结束信号
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"消费: {item}")
q = Queue()
producer_thread = Thread(target=producer, args=(q, [1,2,3]))
consumer_thread = Thread(target=consumer, args=(q,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
12.6 应用举例
案例1:并发Web爬虫
import aiohttp
import asyncio
from urllib.parse import urljoin
from bs4 import BeautifulSoup
async def crawl(start_url, max_depth=2):
visited = set()
queue = [(start_url, 0)]
async with aiohttp.ClientSession() as session:
while queue:
url, depth = queue.pop(0)
if url in visited or depth > max_depth:
continue
try:
print(f"抓取: {url}")
async with session.get(url) as response:
html = await response.text()
visited.add(url)
if depth < max_depth:
soup = BeautifulSoup(html, 'html.parser')
tasks = []
for link in soup.find_all('a', href=True):
next_url = urljoin(url, link['href'])
if next_url not in visited:
queue.append((next_url, depth + 1))
except Exception as e:
print(f"抓取失败 {url}: {e}")
asyncio.run(crawl("http://example.com"))
案例2:实时数据处理管道
import threading
import queue
import random
import time
class DataPipeline:
def __init__(self):
self.raw_data_queue = queue.Queue()
self.processed_data = []
self.lock = threading.Lock()
def data_source(self):
"""模拟数据源"""
while True:
data = random.randint(1, 100)
self.raw_data_queue.put(data)
time.sleep(0.1)
def data_processor(self):
"""数据处理工作线程"""
while True:
data = self.raw_data_queue.get()
# 模拟处理延迟
time.sleep(0.2)
result = data * 2
with self.lock:
self.processed_data.append(result)
print(f"处理数据: {data} -> {result} (队列大小: {self.raw_data_queue.qsize()})")
def start(self):
"""启动处理管道"""
threads = [
threading.Thread(target=self.data_source, daemon=True),
threading.Thread(target=self.data_processor, daemon=True),
threading.Thread(target=self.data_processor, daemon=True)
]
for t in threads:
t.start()
try:
while True:
time.sleep(1)
with self.lock:
print(f"当前处理结果数: {len(self.processed_data)}")
except KeyboardInterrupt:
print("停止管道")
if __name__ == '__main__':
pipeline = DataPipeline()
pipeline.start()
12.7 学习路线图
12.8 学习总结
- 核心要点:
- 理解GIL的影响和应对策略
- 掌握线程同步原语的使用
- 区分CPU密集和IO密集任务的并发方案
- 熟悉async/await编程模型
- 实践建议:
- IO密集型使用多线程或异步
- CPU密集型使用多进程
- 共享数据必须加锁保护
- 合理控制并发数量
- 进阶方向:
- 分布式任务队列(Celery)
- 基于事件的驱动架构
- 异步数据库驱动
- 协程与生成器的深度结合
- 常见陷阱:
- 多线程中的竞态条件
- 忘记释放锁导致的死锁
- 异步函数中阻塞调用
- 进程间通信的性能瓶颈
持续更新Python编程学习日志与技巧,敬请关注!
#编程# #学习# #在头条记录我的2025# #python#