Day68 - Redis 与 MySQL 双写一致性
本部分涵盖 Redis 与 MySQL 数据一致性问题及解决方案。
1. 问题背景
1.1 为什么需要双写一致
在高并发系统中,Redis 作为缓存层配合 MySQL 使用:
graph TB
subgraph 典型架构
Client["客户端"]
Redis["Redis 缓存"]
MySQL["MySQL 数据库"]
end
Client --> Redis
Redis -->|"命中"| Client
Client -->|"miss"| MySQL
MySQL -->|"回填"| Redis
1.2 一致性风险
| 风险 | 场景 |
|---|---|
| 并发读写 | 缓存更新时,数据库被其他事务修改 |
| 主从延迟 | 读写分离架构下的延迟问题 |
| 删除失败 | 缓存删除失败导致数据不一致 |
2. Cache-Aside 模式
2.1 读操作
sequenceDiagram
participant C as 客户端
participant R as Redis
participant M as MySQL
C->>R: 1. 查缓存
alt 缓存命中
R-->>C: 返回数据
else 缓存未命中
R-->>C: miss
C->>M: 2. 查数据库
M-->>C: 返回数据
C->>R: 3. 回填缓存
R-->>C: OK
end
2.2 写操作(删除缓存)
def update_user(user_id, data):
# 1. 更新数据库
db.execute("UPDATE users SET ... WHERE id=?", user_id, data)
# 2. 删除缓存
redis.delete(f"user:{user_id}")
3. 延迟双删
3.1 问题场景
sequenceDiagram
participant T1 as 线程A(写)
participant R as Redis
participant M as MySQL
participant T2 as 线程B(读)
T1->>R: 1. 删除缓存
T1->>M: 2. 更新数据库
T2->>R: 3. 查缓存(miss)
T2->>M: 4. 查数据库(旧数据)
T2->>R: 5. 回填缓存(旧数据)
T1-->>T2: (写完成)
3.2 延迟双删解决方案
import time
def update_user(user_id, data):
# 1. 先删除缓存
redis.delete(f"user:{user_id}")
# 2. 更新数据库
db.execute("UPDATE users SET ... WHERE id=?", user_id, data)
# 3. 延迟再次删除缓存
time.sleep(0.5) # 等待读请求完成回填
redis.delete(f"user:{user_id}")
3.3 延迟时间计算
据《Redis 缓存核心问题:缓存与数据库双写一致性》(阿里云开发者社区),延迟时间计算:
# 延迟时间 = 读请求平均耗时 × 1.5 + 主从同步最大延迟
# 公式考虑:
# 1. 覆盖读请求查库+写缓存的完整流程
# 2. 考虑主从同步延迟
# 3. 1.5倍冗余系数
4. 更新策略对比
4.1 四种策略
| 策略 | 读 | 写 | 特点 |
|---|---|---|---|
| Cache-Aside | 先缓存后数据库 | 先数据库后删缓存 | 最常用 |
| Read-Through | 缓存加载 | 先数据库后删缓存 | 缓存自动加载 |
| Write-Through | 先缓存后数据库 | 先缓存后数据库 | 强一致,性能差 |
| Write-Behind | 先缓存后数据库 | 先缓存后异步写库 | 异步高性能 |
4.2 Cache-Aside(旁路缓存)
class CacheAside:
def read(self, key):
# 1. 查缓存
value = redis.get(key)
if value:
return value
# 2. 缓存未命中,查数据库
value = mysql.query(key)
# 3. 回填缓存
if value:
redis.setex(key, 3600, value)
return value
def write(self, key, value):
# 1. 先更新数据库
mysql.update(key, value)
# 2. 再删除缓存
redis.delete(key)
4.3 Write-Through(穿透式写入)
class WriteThrough:
def write(self, key, value):
# 1. 先写缓存
redis.set(key, value)
# 2. 再写数据库
mysql.update(key, value)
# 特点:强一致,但性能差
4.4 Write-Behind(回写式)
class WriteBehind:
def write(self, key, value):
# 1. 先写缓存(异步)
redis.set(key, value)
# 2. 异步写数据库
queue.put((key, value))
def sync_worker(self):
while True:
key, value = queue.get()
mysql.update(key, value)
5. Binlog 同步方案
5.1 架构原理
graph LR
subgraph 数据同步链路
M["MySQL"]
B["Binlog"]
C["Canal/Debezium"]
K["Kafka"]
R["Redis"]
end
M -->|"写入"| B
B -->|"订阅"| C
C -->|"解析"| K
K -->|"消费"| R
据《5分钟读懂 MySQL+Redis 双写一致性实现流程》(CSDN),Binlog 同步的核心优势:
- 业务无侵入:代码只关心写数据库
- 高性能:异步同步,不影响主链路
- 强保证:基于 Binlog 顺序同步
5.2 Canal 配置
# 1. 开启 Binlog
[mysqld]
log-bin = mysql-bin
binlog-format = ROW
# 2. 创建 Canal 用户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal;
FLUSH PRIVILEGES;
# 3. 启动 Canal
./bin/startup.sh
5.3 Java 消费 Canal
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.*;
public class CanalSync {
public void sync(CanalConnector connector) {
while (true) {
Message message = connector.get(100);
for (Entry entry : message.getEntries()) {
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
for (RowData rowData : rowChange.getRowDatasList()) {
// 根据操作类型同步
if (rowChange.getEventType() == EventType.UPDATE) {
// 同步更新
syncUpdate(rowData);
} else if (rowChange.getEventType() == EventType.DELETE) {
// 同步删除
syncDelete(rowData);
}
}
}
}
}
}
}
6. 分布式锁方案
6.1 分布式锁实现
import redis
import uuid
class DistributedLock:
def __init__(self, redis_client):
self.redis = redis_client
def acquire(self, key, timeout=10):
"""获取锁"""
lock_value = str(uuid.uuid4())
result = self.redis.set(key, lock_value, nx=True, ex=timeout)
return lock_value if result else None
def release(self, key, lock_value):
"""释放锁"""
# 使用 Lua 脚本保证原子性
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(script, 1, key, lock_value)
6.2 带锁的缓存更新
def update_with_lock(key, data):
lock_key = f"lock:{key}"
lock_value = lock.acquire(lock_key, 10)
if not lock_value:
# 获取锁失败,等待后重试
time.sleep(0.1)
return update_with_lock(key, data)
try:
# 1. 更新数据库
db.execute("UPDATE ... WHERE id=?", key, data)
# 2. 删除缓存
redis.delete(key)
finally:
# 3. 释放锁
lock.release(lock_key, lock_value)
7. 方案对比与选型
7.1 方案对比
| 方案 | 一致性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| Cache-Aside | 最终一致 | 高 | 低 | 读多写少 |
| 延迟双删 | 更好一致性 | 中 | 低 | 一致性要求高的写操作 |
| Write-Through | 强一致 | 中 | 中 | 写多读少 |
| Binlog 同步 | 最终一致 | 高 | 高 | 大型项目最佳实践 |
7.2 选型建议
-- 读多写少场景
-- 推荐:Cache-Aside + 过期时间
-- 特点:简单高效,容忍短暂不一致
-- 一致性要求高
-- 推荐:Cache-Aside + 延迟双删
-- 特点:降低不一致窗口
-- 大型分布式系统
-- 推荐:Binlog 同步(Canal/Debezium)
-- 特点:业务无侵入,可靠性高
7.3 最佳实践
# 完整实现示例
class CacheService:
def __init__(self):
self.redis = RedisClient()
self.db = MySQLClient()
self.lock = DistributedLock(self.redis)
def get(self, key):
# 1. 查缓存
value = self.redis.get(key)
if value:
return json.loads(value)
# 2. 查数据库
value = self.db.query(key)
# 3. 回填缓存
if value:
self.redis.setex(key, 3600, json.dumps(value))
return value
def update(self, key, data):
# 获取分布式锁
lock_value = self.lock.acquire(f"lock:{key}")
if not lock_value:
time.sleep(0.1)
return self.update(key, data)
try:
# 1. 更新数据库
self.db.update(key, data)
# 2. 删除缓存
self.redis.delete(key)
finally:
self.lock.release(f"lock:{key}", lock_value)
def delete(self, key):
# 1. 删除数据库
self.db.delete(key)
# 2. 删除缓存
self.redis.delete(key)
8. 生产环境建议
8.1 关键配置
# Redis 配置
maxmemory 2gb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec
# MySQL 配置
binlog_format = ROW
expire_logs_days = 7
sync_binlog = 1
8.2 监控告警
| 监控项 | 阈值建议 |
|---|---|
| Redis CPU | > 80% 报警 |
| Redis 内存 | > 80% 报警 |
| 缓存命中率 | < 90% 报警 |
| 慢查询数量 | 关注趋势 |
| 主从延迟 | > 1s 报警 |
8.3 降级策略
def get_with_fallback(key):
try:
# 1. 尝试从缓存获取
value = redis.get(key)
if value:
return value
# 2. 缓存未命中,查数据库
value = mysql.query(key)
if value:
redis.setex(key, 3600, value)
return value
except RedisException:
# 3. Redis 故障,降级到数据库
return mysql.query(key)