Learn Python Programming in 30 Days: 12. Python Concurrency

12.1 Concurrency Fundamentals

12.1.1 Comparison of Concurrency Models

Python offers three main concurrency models; which one fits depends on the workload:

  • Multithreading (threading): several threads within one process, limited by the GIL; best for I/O-bound tasks
  • Multiprocessing (multiprocessing): separate processes with true parallelism across CPU cores; best for CPU-bound tasks
  • Asynchronous coroutines (asyncio): cooperative scheduling in a single thread; best for large numbers of I/O-bound tasks

12.1.2 The GIL (Global Interpreter Lock)

Key effects

  • Only one thread executes Python bytecode at any given moment
  • CPU-bound tasks are affected significantly (see the timing sketch below)
  • I/O-bound tasks can still benefit from multithreading
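
To see the GIL's effect on CPU-bound work, here is a minimal timing sketch (the count_down function and the value of N are illustrative, not from this lesson); on CPython the two-thread run is typically no faster than the serial one:

import threading
import time

def count_down(n):
    # Pure CPU-bound loop; the running thread holds the GIL throughout
    while n > 0:
        n -= 1

N = 10_000_000

# Serial: the workload executed twice in the main thread
start = time.perf_counter()
count_down(N)
count_down(N)
print(f"Serial:      {time.perf_counter() - start:.2f}s")

# Two threads: same total work, but the GIL prevents parallel execution
start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Two threads: {time.perf_counter() - start:.2f}s")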

12.2 Multithreading

12.2.1 The threading Module

Basic usage

import threading
import time

def worker(num):
    print(f"Worker {num} started")
    # Simulate some work
    time.sleep(1)
    print(f"Worker {num} finished")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

12.2.2 Thread Synchronization

Lock example

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
    
    def increment(self):
        with self.lock:  # The lock is acquired and released automatically
            self.value += 1

counter = Counter()

def increment_worker():
    for _ in range(100000):
        counter.increment()

threads = [threading.Thread(target=increment_worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Final count: {counter.value}")  # Should be 200000; without the lock it would often be lower

12.3 Multiprocessing

12.3.1 The multiprocessing Module

Basic usage

from multiprocessing import Process
import os

def task(name):
    print(f"Child process {name} (PID: {os.getpid()}) running...")
    # CPU-bound computation
    result = sum(i*i for i in range(1000000))
    print(f"Child process {name} finished, result = {result}")

if __name__ == '__main__':
    processes = []
    for i in range(4):  # e.g. one process per core on a 4-core CPU
        p = Process(target=task, args=(i,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

12.3.2 Process Pools

from multiprocessing import Pool

def cpu_intensive(n):
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with Pool(4) as pool:  # 4 worker processes
        # map distributes the calls across the worker processes in parallel
        results = pool.map(cpu_intensive, range(10000, 10010))
        print(results)

12.4 Asynchronous Programming (asyncio)

12.4.1 Coroutine Basics

import asyncio

async def fetch_data(url):
    print(f"Start fetching {url}")
    await asyncio.sleep(2)  # Simulate an I/O operation
    print(f"Finished fetching {url}")
    return f"data from {url}"

async def main():
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    
    results = await asyncio.gather(task1, task2)
    print(results)

asyncio.run(main())

12.4.2 Asynchronous I/O

import aiohttp
import asyncio

async def fetch_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = [
        "http://example.com",
        "http://example.org",
        "http://example.net"
    ]
    tasks = [fetch_page(url) for url in urls]
    pages = await asyncio.gather(*tasks)
    print(f"Fetched {len(pages)} pages")

asyncio.run(main())

12.5 Concurrency Utilities

12.5.1 concurrent.futures

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request

URLS = ['http://example.com', 'http://example.org']

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

with ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {
        executor.submit(load_url, url, 60): url 
        for url in URLS
    }
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"{url} page length: {len(data)} bytes")
        except Exception as e:
            print(f"{url} failed to load: {e}")

12.5.2 Queue-Based Communication

from queue import Queue
from threading import Thread

def producer(q, items):
    for item in items:
        print(f"Produced: {item}")
        q.put(item)
    q.put(None)  # Sentinel value signalling the end of the stream

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumed: {item}")

q = Queue()
producer_thread = Thread(target=producer, args=(q, [1,2,3]))
consumer_thread = Thread(target=consumer, args=(q,))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

12.6 Worked Examples

Example 1: A Concurrent Web Crawler

import aiohttp
import asyncio
from urllib.parse import urljoin
from bs4 import BeautifulSoup

async def crawl(start_url, max_depth=2):
    visited = set()
    queue = [(start_url, 0)]
    
    async with aiohttp.ClientSession() as session:
        while queue:
            url, depth = queue.pop(0)
            if url in visited or depth > max_depth:
                continue
                
            try:
                print(f"Crawling: {url}")
                async with session.get(url) as response:
                    html = await response.text()
                    visited.add(url)
                    
                    if depth < max_depth:
                        soup = BeautifulSoup(html, 'html.parser')
                        # Queue every link on the page for the next depth level
                        for link in soup.find_all('a', href=True):
                            next_url = urljoin(url, link['href'])
                            if next_url not in visited:
                                queue.append((next_url, depth + 1))
            except Exception as e:
                print(f"Failed to crawl {url}: {e}")

asyncio.run(crawl("http://example.com"))
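
As written, the loop above awaits each request in turn, so the crawl is effectively sequential. To make it genuinely concurrent, the URLs at each depth level can be fetched as a batch with asyncio.gather; a minimal sketch under that assumption (fetch_one and fetch_batch are illustrative helpers, not part of the example above):

import asyncio
import aiohttp

async def fetch_one(session, url):
    # Fetch a single page, returning None on failure
    try:
        async with session.get(url) as response:
            return await response.text()
    except Exception as e:
        print(f"Failed to crawl {url}: {e}")
        return None

async def fetch_batch(urls):
    # Fetch one depth level concurrently over a shared session
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch_one(session, url) for url in urls))

pages = asyncio.run(fetch_batch(["http://example.com", "http://example.org"]))
print(f"Fetched {sum(p is not None for p in pages)} pages")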

Example 2: A Real-Time Data Processing Pipeline

import threading
import queue
import random
import time

class DataPipeline:
    def __init__(self):
        self.raw_data_queue = queue.Queue()
        self.processed_data = []
        self.lock = threading.Lock()
    
    def data_source(self):
        """Simulate a data source"""
        while True:
            data = random.randint(1, 100)
            self.raw_data_queue.put(data)
            time.sleep(0.1)
    
    def data_processor(self):
        """Worker thread that processes raw data"""
        while True:
            data = self.raw_data_queue.get()
            # Simulate processing latency
            time.sleep(0.2)
            result = data * 2
            
            with self.lock:
                self.processed_data.append(result)
                print(f"Processed: {data} -> {result} (queue size: {self.raw_data_queue.qsize()})")
    
    def start(self):
        """Start the processing pipeline"""
        threads = [
            threading.Thread(target=self.data_source, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True),
            threading.Thread(target=self.data_processor, daemon=True)
        ]
        
        for t in threads:
            t.start()
        
        try:
            while True:
                time.sleep(1)
                with self.lock:
                    print(f"Results processed so far: {len(self.processed_data)}")
        except KeyboardInterrupt:
            print("Stopping the pipeline")

if __name__ == '__main__':
    pipeline = DataPipeline()
    pipeline.start()

12.7 Learning Roadmap

12.8 Summary

  1. Key points
  • Understand the impact of the GIL and how to work around it
  • Master the thread synchronization primitives
  • Choose different concurrency strategies for CPU-bound and I/O-bound tasks
  • Get comfortable with the async/await programming model
  2. Practical advice
  • Use multithreading or asyncio for I/O-bound work
  • Use multiprocessing for CPU-bound work
  • Always protect shared data with locks
  • Keep the number of concurrent tasks under control
  3. Where to go next
  • Distributed task queues (Celery)
  • Event-driven architectures
  • Asynchronous database drivers
  • Combining coroutines and generators in depth
  4. Common pitfalls
  • Race conditions in multithreaded code
  • Deadlocks caused by locks that are never released
  • Blocking calls inside async functions (see the sketch after this list)
  • Performance bottlenecks in inter-process communication
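
On the blocking-call pitfall: a blocking call such as time.sleep (or a synchronous HTTP request) inside a coroutine stalls the entire event loop. Here is a minimal sketch of the problem and one common fix, wrapping the blocking call with asyncio.to_thread (Python 3.9+); the handler names are illustrative:

import asyncio
import time

async def bad_handler():
    # Blocks the whole event loop: no other coroutine can run during this second
    time.sleep(1)

async def good_handler():
    # Non-blocking pause: control returns to the event loop while waiting
    await asyncio.sleep(1)

async def offloaded_handler():
    # Run a blocking function in a worker thread so the loop stays responsive
    await asyncio.to_thread(time.sleep, 1)

async def main():
    await asyncio.gather(good_handler(), offloaded_handler(), bad_handler())

asyncio.run(main())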

More Python learning notes and tips are published here regularly; stay tuned!

