文章背景图

Day68-Redis与MySQL双写一致性

2026-06-04
0
-
- 分钟
|

Day68 - Redis 与 MySQL 双写一致性

本部分涵盖 Redis 与 MySQL 数据一致性问题及解决方案。


1. 问题背景

1.1 为什么需要双写一致

在高并发系统中,Redis 作为缓存层配合 MySQL 使用:

graph TB
    subgraph 典型架构
        Client["客户端"]
        Redis["Redis 缓存"]
        MySQL["MySQL 数据库"]
    end
    
    Client --> Redis
    Redis -->|"命中"| Client
    Client -->|"miss"| MySQL
    MySQL -->|"回填"| Redis

1.2 一致性风险

风险 场景
并发读写 缓存更新时,数据库被其他事务修改
主从延迟 读写分离架构下的延迟问题
删除失败 缓存删除失败导致数据不一致

2. Cache-Aside 模式

2.1 读操作

sequenceDiagram
    participant C as 客户端
    participant R as Redis
    participant M as MySQL
    
    C->>R: 1. 查缓存
    alt 缓存命中
        R-->>C: 返回数据
    else 缓存未命中
        R-->>C: miss
        C->>M: 2. 查数据库
        M-->>C: 返回数据
        C->>R: 3. 回填缓存
        R-->>C: OK
    end

2.2 写操作(删除缓存)

def update_user(user_id, data):
    # 1. 更新数据库
    db.execute("UPDATE users SET ... WHERE id=?", user_id, data)
    
    # 2. 删除缓存
    redis.delete(f"user:{user_id}")

3. 延迟双删

3.1 问题场景

sequenceDiagram
    participant T1 as 线程A(写)
    participant R as Redis
    participant M as MySQL
    participant T2 as 线程B(读)
    
    T1->>R: 1. 删除缓存
    T1->>M: 2. 更新数据库
    T2->>R: 3. 查缓存(miss)
    T2->>M: 4. 查数据库(旧数据)
    T2->>R: 5. 回填缓存(旧数据)
    T1-->>T2: (写完成)

3.2 延迟双删解决方案

import time

def update_user(user_id, data):
    # 1. 先删除缓存
    redis.delete(f"user:{user_id}")
    
    # 2. 更新数据库
    db.execute("UPDATE users SET ... WHERE id=?", user_id, data)
    
    # 3. 延迟再次删除缓存
    time.sleep(0.5)  # 等待读请求完成回填
    redis.delete(f"user:{user_id}")

3.3 延迟时间计算

据《Redis 缓存核心问题:缓存与数据库双写一致性》(阿里云开发者社区),延迟时间计算:

# 延迟时间 = 读请求平均耗时 × 1.5 + 主从同步最大延迟
# 公式考虑:
# 1. 覆盖读请求查库+写缓存的完整流程
# 2. 考虑主从同步延迟
# 3. 1.5倍冗余系数

4. 更新策略对比

4.1 四种策略

策略 特点
Cache-Aside 先缓存后数据库 先数据库后删缓存 最常用
Read-Through 缓存加载 先数据库后删缓存 缓存自动加载
Write-Through 先缓存后数据库 先缓存后数据库 强一致,性能差
Write-Behind 先缓存后数据库 先缓存后异步写库 异步高性能

4.2 Cache-Aside(旁路缓存)

class CacheAside:
    def read(self, key):
        # 1. 查缓存
        value = redis.get(key)
        if value:
            return value
        
        # 2. 缓存未命中,查数据库
        value = mysql.query(key)
        
        # 3. 回填缓存
        if value:
            redis.setex(key, 3600, value)
        
        return value
    
    def write(self, key, value):
        # 1. 先更新数据库
        mysql.update(key, value)
        
        # 2. 再删除缓存
        redis.delete(key)

4.3 Write-Through(穿透式写入)

class WriteThrough:
    def write(self, key, value):
        # 1. 先写缓存
        redis.set(key, value)
        
        # 2. 再写数据库
        mysql.update(key, value)
        
        # 特点:强一致,但性能差

4.4 Write-Behind(回写式)

class WriteBehind:
    def write(self, key, value):
        # 1. 先写缓存(异步)
        redis.set(key, value)
        
        # 2. 异步写数据库
        queue.put((key, value))
        
    def sync_worker(self):
        while True:
            key, value = queue.get()
            mysql.update(key, value)

5. Binlog 同步方案

5.1 架构原理

graph LR
    subgraph 数据同步链路
        M["MySQL"]
        B["Binlog"]
        C["Canal/Debezium"]
        K["Kafka"]
        R["Redis"]
    end
    
    M -->|"写入"| B
    B -->|"订阅"| C
    C -->|"解析"| K
    K -->|"消费"| R

据《5分钟读懂 MySQL+Redis 双写一致性实现流程》(CSDN),Binlog 同步的核心优势:

  1. 业务无侵入:代码只关心写数据库
  2. 高性能:异步同步,不影响主链路
  3. 强保证:基于 Binlog 顺序同步

5.2 Canal 配置

# 1. 开启 Binlog
[mysqld]
log-bin = mysql-bin
binlog-format = ROW

# 2. 创建 Canal 用户
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal;
FLUSH PRIVILEGES;

# 3. 启动 Canal
./bin/startup.sh

5.3 Java 消费 Canal

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.*;

public class CanalSync {
    public void sync(CanalConnector connector) {
        while (true) {
            Message message = connector.get(100);
            
            for (Entry entry : message.getEntries()) {
                if (entry.getEntryType() == EntryType.ROWDATA) {
                    RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                    
                    for (RowData rowData : rowChange.getRowDatasList()) {
                        // 根据操作类型同步
                        if (rowChange.getEventType() == EventType.UPDATE) {
                            // 同步更新
                            syncUpdate(rowData);
                        } else if (rowChange.getEventType() == EventType.DELETE) {
                            // 同步删除
                            syncDelete(rowData);
                        }
                    }
                }
            }
        }
    }
}

6. 分布式锁方案

6.1 分布式锁实现

import redis
import uuid

class DistributedLock:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def acquire(self, key, timeout=10):
        """获取锁"""
        lock_value = str(uuid.uuid4())
        result = self.redis.set(key, lock_value, nx=True, ex=timeout)
        return lock_value if result else None
    
    def release(self, key, lock_value):
        """释放锁"""
        # 使用 Lua 脚本保证原子性
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(script, 1, key, lock_value)

6.2 带锁的缓存更新

def update_with_lock(key, data):
    lock_key = f"lock:{key}"
    lock_value = lock.acquire(lock_key, 10)
    
    if not lock_value:
        # 获取锁失败,等待后重试
        time.sleep(0.1)
        return update_with_lock(key, data)
    
    try:
        # 1. 更新数据库
        db.execute("UPDATE ... WHERE id=?", key, data)
        
        # 2. 删除缓存
        redis.delete(key)
    finally:
        # 3. 释放锁
        lock.release(lock_key, lock_value)

7. 方案对比与选型

7.1 方案对比

方案 一致性 性能 复杂度 适用场景
Cache-Aside 最终一致 读多写少
延迟双删 更好一致性 一致性要求高的写操作
Write-Through 强一致 写多读少
Binlog 同步 最终一致 大型项目最佳实践

7.2 选型建议

-- 读多写少场景
-- 推荐:Cache-Aside + 过期时间
-- 特点:简单高效,容忍短暂不一致

-- 一致性要求高
-- 推荐:Cache-Aside + 延迟双删
-- 特点:降低不一致窗口

-- 大型分布式系统
-- 推荐:Binlog 同步(Canal/Debezium)
-- 特点:业务无侵入,可靠性高

7.3 最佳实践

# 完整实现示例
class CacheService:
    def __init__(self):
        self.redis = RedisClient()
        self.db = MySQLClient()
        self.lock = DistributedLock(self.redis)
    
    def get(self, key):
        # 1. 查缓存
        value = self.redis.get(key)
        if value:
            return json.loads(value)
        
        # 2. 查数据库
        value = self.db.query(key)
        
        # 3. 回填缓存
        if value:
            self.redis.setex(key, 3600, json.dumps(value))
        
        return value
    
    def update(self, key, data):
        # 获取分布式锁
        lock_value = self.lock.acquire(f"lock:{key}")
        if not lock_value:
            time.sleep(0.1)
            return self.update(key, data)
        
        try:
            # 1. 更新数据库
            self.db.update(key, data)
            
            # 2. 删除缓存
            self.redis.delete(key)
        finally:
            self.lock.release(f"lock:{key}", lock_value)
    
    def delete(self, key):
        # 1. 删除数据库
        self.db.delete(key)
        
        # 2. 删除缓存
        self.redis.delete(key)

8. 生产环境建议

8.1 关键配置

# Redis 配置
maxmemory 2gb
maxmemory-policy allkeys-lru
appendonly yes
appendfsync everysec

# MySQL 配置
binlog_format = ROW
expire_logs_days = 7
sync_binlog = 1

8.2 监控告警

监控项 阈值建议
Redis CPU > 80% 报警
Redis 内存 > 80% 报警
缓存命中率 < 90% 报警
慢查询数量 关注趋势
主从延迟 > 1s 报警

8.3 降级策略

def get_with_fallback(key):
    try:
        # 1. 尝试从缓存获取
        value = redis.get(key)
        if value:
            return value
        
        # 2. 缓存未命中,查数据库
        value = mysql.query(key)
        if value:
            redis.setex(key, 3600, value)
        return value
    
    except RedisException:
        # 3. Redis 故障,降级到数据库
        return mysql.query(key)
原创

Day68-Redis与MySQL双写一致性

本文链接: Day68-Redis与MySQL双写一致性

本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。

评论交流

文章目录