18.1 数据库基础
18.1.1 数据库类型对比
18.1.2 DB-API规范
核心接口:
- connect() - 建立连接
- cursor() - 创建游标
- execute() - 执行SQL
- fetchone()/fetchall() - 获取结果
- commit()/rollback() - 事务控制
18.2 SQLite操作
18.2.1 基本CRUD
import sqlite3
# Open (or create) the database file. The default thread check is kept
# because this script only ever uses the connection from a single thread.
conn = sqlite3.connect('example.db')
try:
    cursor = conn.cursor()

    # Create the table on the first run; later runs reuse it.
    cursor.execute(
        "CREATE TABLE IF NOT EXISTS users "
        "(id INTEGER PRIMARY KEY, name TEXT, age INTEGER)"
    )

    # Insert a row. "?" placeholders let the driver bind the values, which
    # avoids building SQL by string concatenation.
    cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('Alice', 25))

    # Commit right after the write so the row is persisted even if a later
    # step raises.
    conn.commit()

    # Query the rows back and print them as a list of tuples.
    cursor.execute("SELECT * FROM users WHERE age > ?", (20,))
    print(cursor.fetchall())
finally:
    # Release the database handle whether or not the statements succeeded.
    conn.close()
18.2.2 高级特性
# Use the connection as a context manager.
# NOTE: for sqlite3, `with conn:` only commits (on success) or rolls back
# (on exception) the current transaction; it does NOT close the connection,
# so the connection is closed explicitly in `finally`.
conn = sqlite3.connect('example.db')
try:
    with conn:
        conn.row_factory = sqlite3.Row  # rows become addressable by column name
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users")
        for row in cursor:
            print(row['name'], row['age'])
finally:
    conn.close()

# In-memory database: it lives only as long as this connection is open.
mem_db = sqlite3.connect(':memory:')
18.3 MySQL/PostgreSQL
18.3.1 PyMySQL示例
import pymysql
# Connect to MySQL. DictCursor makes every fetched row a dict keyed by
# column name instead of a tuple.
conn = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='test',
    cursorclass=pymysql.cursors.DictCursor,
)

try:
    with conn.cursor() as cursor:
        # Run a parameterized query; the driver escapes the bound value.
        sql = "SELECT * FROM users WHERE email=%s"
        cursor.execute(sql, ('test@example.com',))
        result = cursor.fetchone()
        print(result)
finally:
    # Close the connection even when the query raises.
    conn.close()
18.3.2 psycopg2示例
import psycopg2
# Connect to PostgreSQL.
conn = psycopg2.connect(
    host="localhost",
    database="test",
    user="postgres",
    password="password",
)

try:
    # `with conn` wraps one transaction: commit on success, rollback on an
    # exception. It does not close the connection, hence the outer `finally`.
    with conn:
        with conn.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO products (name, price)
                VALUES (%s, %s)
                RETURNING id
                """,
                ("Laptop", 999.99),
            )
            # RETURNING id yields a single row whose first column is the new id.
            product_id = cursor.fetchone()[0]
            print(f"插入记录ID: {product_id}")
finally:
    conn.close()
18.4 ORM框架
18.4.1 SQLAlchemy核心
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Define the model
Base = declarative_base()


class User(Base):
    """ORM mapping for the ``users`` table (id, name, age)."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    age = Column(Integer)
# Create the engine and a session factory bound to it
engine = create_engine('sqlite:///example.db')
# Create the mapped tables if they are missing, so the query below does not
# depend on another script having created `users` first.
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

try:
    # Query: all users older than 20
    users = session.query(User).filter(User.age > 20).all()
    for user in users:
        print(user.name, user.age)

    # Insert a new row and persist it
    new_user = User(name='Bob', age=30)
    session.add(new_user)
    session.commit()
except Exception:
    # Undo the partial unit of work before propagating the error.
    session.rollback()
    raise
finally:
    session.close()
18.4.2 Django ORM
# models.py
from django.db import models
class Product(models.Model):
    """A product with a name, a decimal price and a creation timestamp."""

    name = models.CharField(max_length=100)
    # Up to 10 digits in total, 2 of them after the decimal point.
    price = models.DecimalField(max_digits=10, decimal_places=2)
    # Set once, when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        """Return the product name as the human-readable representation."""
        return self.name
# 查询示例
from app.models import Product
# Create a record. A DecimalField should be fed a Decimal rather than a
# float so no binary floating-point rounding sneaks into the price.
from decimal import Decimal

Product.objects.create(name="Mouse", price=Decimal("29.99"))

# Complex query
from django.db.models import DecimalField, ExpressionWrapper, F, Q

products = Product.objects.filter(
    # (price < 100 OR name starts with "M") AND created in 2023
    Q(price__lt=100) | Q(name__startswith="M"),
    created_at__year=2023,
).annotate(
    # Multiplying a DecimalField by a float mixes field types, which Django
    # cannot resolve on its own; use a Decimal factor and state the result
    # type explicitly.
    discounted_price=ExpressionWrapper(
        F('price') * Decimal('0.9'),
        output_field=DecimalField(max_digits=10, decimal_places=2),
    )
)
18.5 NoSQL数据库
18.5.1 MongoDB
from pymongo import MongoClient
# Connect to MongoDB and select the database and collection
client = MongoClient('mongodb://localhost:27017/')
db = client.mydatabase
collection = db.users

# Insert one document and keep the id assigned to it
user = {"name": "Alice", "age": 25, "hobbies": ["coding", "reading"]}
insert_result = collection.insert_one(user)
inserted_id = insert_result.inserted_id

# Aggregation: keep documents with age > 20, then count them per name
match_stage = {"$match": {"age": {"$gt": 20}}}
group_stage = {"$group": {"_id": "$name", "count": {"$sum": 1}}}
pipeline = [match_stage, group_stage]
results = collection.aggregate(pipeline)
18.5.2 Redis
import redis
# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# String operations: values come back as bytes
r.set('foo', 'bar')
foo_value = r.get('foo')
print(foo_value)  # b'bar'

# Hash operations: write several fields in one call, then read them all
user_fields = {
    'name': 'Alice',
    'age': '25',
}
r.hset('user:1000', mapping=user_fields)
user_hash = r.hgetall('user:1000')
print(user_hash)  # {b'name': b'Alice', b'age': b'25'}
18.6 数据库连接池
18.6.1 连接池实现
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# Raw SQL must be wrapped in text(); SQLAlchemy 2.0 no longer accepts a plain
# string in Connection.execute().
from sqlalchemy import text

# Create an engine backed by a connection pool
engine = create_engine(
    'mysql+pymysql://user:password@localhost/db',
    poolclass=QueuePool,
    pool_size=5,       # connections kept open in the pool
    max_overflow=10,   # extra connections allowed beyond pool_size
    pool_timeout=30,   # seconds to wait for a free connection
)

# Borrow a connection; leaving the `with` block hands it back to the pool.
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))
    print(result.fetchall())
18.6.2 连接池管理
import psycopg2
from psycopg2 import pool
# Create a pool holding between 1 and 10 connections
connection_pool = psycopg2.pool.SimpleConnectionPool(
    minconn=1,
    maxconn=10,
    host="localhost",
    database="test",
    user="postgres",
    password="password",
)

try:
    # Borrow a connection from the pool
    conn = connection_pool.getconn()
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM products")
            print(cursor.fetchall())
    finally:
        # Always hand the connection back, even if the query failed.
        connection_pool.putconn(conn)
finally:
    # The script is done with the pool: close every pooled connection.
    connection_pool.closeall()
18.7 应用举例
案例1:电商订单
from sqlalchemy import create_engine, ForeignKey
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime
Base = declarative_base()


class User(Base):
    """A customer; ``orders`` lists the Order rows that reference this user."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    # Paired with Order.user via back_populates.
    orders = relationship("Order", back_populates="user")
class Product(Base):
    """A sellable product; ``order_items`` lists the order lines using it."""

    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    # NOTE(review): Float is inexact for money; Numeric may be a better fit.
    price = Column(Float)

    # Paired with OrderItem.product via back_populates.
    order_items = relationship("OrderItem", back_populates="product")
class Order(Base):
    """An order placed by one user and made up of OrderItem lines."""

    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    # No default is configured, so this stays NULL unless set explicitly.
    created_at = Column(DateTime)

    # Paired with User.orders and OrderItem.order via back_populates.
    user = relationship("User", back_populates="orders")
    items = relationship("OrderItem", back_populates="order")
class OrderItem(Base):
    """One line of an order: a product and the quantity ordered."""

    __tablename__ = 'order_items'

    id = Column(Integer, primary_key=True)
    order_id = Column(Integer, ForeignKey('orders.id'))
    product_id = Column(Integer, ForeignKey('products.id'))
    quantity = Column(Integer)

    # Paired with Order.items and Product.order_items via back_populates.
    order = relationship("Order", back_populates="items")
    product = relationship("Product", back_populates="order_items")
# Usage example
engine = create_engine('sqlite:///ecommerce.db')
Base.metadata.create_all(engine)  # create any tables that are missing
Session = sessionmaker(bind=engine)
session = Session()

try:
    # Create test data
    new_user = User(name="Alice")
    session.add(new_user)

    product1 = Product(name="Laptop", price=999.99)
    product2 = Product(name="Mouse", price=29.99)
    session.add_all([product1, product2])

    # Build an order for the user with two order lines.
    order = Order(user=new_user)
    order.items = [
        OrderItem(product=product1, quantity=1),
        OrderItem(product=product2, quantity=2),
    ]
    session.add(order)
    session.commit()

    # Look the user up again and print each order with its lines.
    user = session.query(User).filter_by(name="Alice").first()
    for order in user.orders:
        print(f"订单 {order.id}:")
        for item in order.items:
            print(f" - {item.product.name} x{item.quantity}")
finally:
    # Release the session's database resources even when a step fails.
    session.close()
案例2:缓存策略实现
import sqlite3
import redis
import json
from datetime import datetime
class CachedDatabase:
    """Article store backed by SQLite with a Redis cache in front of reads.

    Reads check Redis first and fall back to SQLite, caching the row as JSON
    for ``CACHE_TTL_SECONDS``. Creating an article deletes the cache entry
    for its id.
    """

    # Lifetime of a cached article, in seconds.
    CACHE_TTL_SECONDS = 60

    def __init__(self, db_file=':memory:', redis_client=None):
        """Open the SQLite database and attach a Redis client.

        Args:
            db_file: SQLite path; the default is a private in-memory database.
            redis_client: Object providing ``get``, ``setex`` and ``delete``.
                When omitted, a client for localhost:6379 (db 0) is created.
        """
        if redis_client is None:
            redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.redis = redis_client
        self.db_conn = sqlite3.connect(db_file)
        self._init_db()

    def _init_db(self):
        """Create the ``articles`` table if it does not exist yet."""
        cursor = self.db_conn.cursor()
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS articles "
            "(id INTEGER PRIMARY KEY, title TEXT, content TEXT, "
            "created_at TIMESTAMP)"
        )
        self.db_conn.commit()

    @staticmethod
    def _cache_key(article_id):
        """Return the Redis key under which an article is cached."""
        return f"article:{article_id}"

    def get_article(self, article_id):
        """Return the article as a dict, or None if the id is unknown.

        The cache is consulted first; on a miss the row is read from SQLite
        and written to the cache before being returned.
        """
        cache_key = self._cache_key(article_id)
        cached_data = self.redis.get(cache_key)
        if cached_data:
            print("从缓存读取")
            return json.loads(cached_data)

        print("从数据库读取")
        cursor = self.db_conn.cursor()
        # Name the columns so the positional mapping below cannot drift if
        # the table layout ever changes.
        cursor.execute(
            "SELECT id, title, content, created_at FROM articles WHERE id=?",
            (article_id,),
        )
        row = cursor.fetchone()
        if row is None:
            return None

        article = {
            'id': row[0],
            'title': row[1],
            'content': row[2],
            'created_at': row[3],
        }
        # Populate the cache; the entry expires after CACHE_TTL_SECONDS.
        self.redis.setex(cache_key, self.CACHE_TTL_SECONDS, json.dumps(article))
        return article

    def create_article(self, title, content):
        """Insert a new article and return its id.

        Any cache entry stored under the new id is deleted afterwards.
        """
        cursor = self.db_conn.cursor()
        created_at = datetime.now().isoformat()
        cursor.execute(
            "INSERT INTO articles (title, content, created_at) VALUES (?, ?, ?)",
            (title, content, created_at),
        )
        self.db_conn.commit()
        # Id of the row that was just inserted
        article_id = cursor.lastrowid
        # Drop a possibly stale cache entry stored under the same id.
        self.redis.delete(self._cache_key(article_id))
        return article_id

    def close(self):
        """Close the underlying SQLite connection."""
        self.db_conn.close()
# Usage example
cache_db = CachedDatabase()

# Create a test article and remember its id
article_id = cache_db.create_article("Python数据库编程", "本文介绍Python中的各种数据库操作方法...")

# First read: served from the database, which also fills the cache
article = cache_db.get_article(article_id)

# Second read: served from the cache
article = cache_db.get_article(article_id)
18.8 知识图谱
持续更新Python编程学习日志与技巧,敬请关注!
#编程# #学习# #在头条记录我的2025# #python#