你的浏览器不支持canvas

做你害怕做的事情,然后你会发现,不过如此。

使用Redis的ZSet实现延时队列

时间: 作者: 黄运鑫

本文章属原创文章,未经作者许可,禁止转载,复制,下载,以及用作商业用途。原作者保留所有解释权。


业务场景

  • 用户在编辑文档的时,用户不需要点击保存按钮,前端实时将内容提交到后台,类似飞书云文档的实时保存
  • 由于文档支持多人协同编辑,所以同一个文档的提交有并发场景

实现思路

  • 计划使用Redis临时存储内容,等到用户无提交动作30秒后,再将内容保存到数据库,减少数据库压力
  • 这就需要Redis记录提交时间,每次保存更新提交时间,并且可以根据提交时间筛选数据
  • RedisZSet符合这个场景。可以使用ZSet做一个延时队列,value存储内容idscore存储提交时间,用户每次提交时更新score
  • ZSet不存储数据内容只记录数据id和提交时间,所以还需要使用普通的key-value存储数据,key为内容idvalue为内容数据
  • 消费队列时,根据ZSetscore查询30秒之前的内容id,内容保存到数据库后删除ZSetkey-value的数据
  • 需要注意的是:Redis读取内容到删除内容这个过程,需要对内容加锁,在此期间禁止修改内容,防止读取内容到删除内容之间用户又提交了新内容,造成新内容丢失

实现代码

  • Controller代码如下:
/**
 * 文档资源 控制器
 */
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("/wiki/v1")
@Api(value = "文档资源", tags = "文档资源")
public class DlWikiController extends GeccoController {
    private final DlWikiCacheService dlWikiCacheService;

    /**
     * 文档暂存
     */
    @PostMapping("/stash")
    @ApiOperationSupport(order = 1)
    @ApiOperation(value = "文档暂存", notes = "传入dto")
    public R stash(@RequestBody DlWikiDTO dto) {
        return dlWikiCacheService.setStash(dto);
    }
}
  • Service代码如下:
package com.transnal.modules.wiki.service.impl;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.transnal.core.redis.cache.GeccoRedis;
import com.transnal.core.redis.lock.LockType;
import com.transnal.core.redis.lock.RedisLockClient;
import com.transnal.core.tool.api.R;
import com.transnal.modules.wiki.dto.DlWikiDTO;
import com.transnal.modules.wiki.service.IDlWikiService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
@RequiredArgsConstructor
public class DlWikiCacheService {
    //文档服务
    private final IDlWikiService wikiService;
    //Redis工具类
    private final GeccoRedis geccoRedis;
    //Redis锁工具类
    private final RedisLockClient redisLockClient;
    //ZSet的key
    private final String stashKey = "gecco:wiki:stash";
    //文档内容的key
    private final String stashContentKey = "gecco:wiki:content";
    //文档内容锁
    private final String setStashLock = "setStashLock";

    @PostConstruct
    public void consume() {
        //创建线程池,线程数量为1,每隔5秒执行一次
        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
        exec.scheduleAtFixedRate(() -> {
            //捕获异常,防止异常导致线程停止
            try {
                //获取30秒之前提交的数据id
                DateTime dateTime = DateUtil.offsetSecond(new Date(), -30);
                Set<Object> set = geccoRedis.getZSetOps().rangeByScore(stashKey, 0,
                    Double.parseDouble(DateUtil.format(dateTime, "yyyyMMddHHmmss")));
                log.info("stashConsume:待处理{}条", CollUtil.size(set));
                if (CollUtil.isEmpty(set)) {
                    return;
                }

                //处理成功计数
                int i = 0;
                for (Object o : set) {
                    //内容锁,防止处理数据期间,用户又提交了新数据
                    String stashLockKey = setStashLock + o.toString();
                    boolean stashLock = redisLockClient.tryLock(stashLockKey, LockType.FAIR, 5, 10, TimeUnit.SECONDS);
                    if (!stashLock) {
                        log.info("stashConsume:数据锁获取失败{}", stashLockKey);
                        continue;
                    }
                    //捕获异常,防止某条数据异常,导致处理不到后续数据
                    try {
                        //获取数据内容
                        String dtoKey = stashContentKey + ":" + o.toString();
                        DlWikiDTO dto = geccoRedis.get(dtoKey);
                        if (dto != null && dto.getContent() != null) {
                            //更新数据库
                            wikiService.submit(dto);
                        }
                        //删除redis内容
                        geccoRedis.getZSetOps().remove(stashKey, o);
                        geccoRedis.del(dtoKey);
                        i++;
                    } catch (Exception e) {
                        log.info("stashConsume:异常");
                        e.printStackTrace();
                    } finally {
                        //解锁
                        redisLockClient.unLock(stashLockKey, LockType.FAIR);
                    }
                }
                log.info("stashConsume:处理成功{}条", i);
            } catch (Exception e) {
                log.info("stashConsume:异常");
                e.printStackTrace();
            }
        }, 1, 5, TimeUnit.SECONDS);
    }

    /**
     * 内容保存到redis
     */
    public R setStash(DlWikiDTO dto) {
        if (dto == null || dto.getId() == null) {
            return R.fail("参数错误");
        }

        //内容锁,防止处理数据期间,用户又提交了新数据
        String stashLockKey = setStashLock + dto.getId();
        try {
            boolean stashLock = redisLockClient.tryLock(stashLockKey, LockType.FAIR, 5, 10, TimeUnit.SECONDS);
            if (!stashLock) {
                return R.fail("setStash锁获取失败");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            Long id = dto.getId();
            //保存ZSet,如果id已存在,则会更新score
            geccoRedis.getZSetOps().add(stashKey, id, Double.parseDouble(DateUtil.format(new Date(), "yyyyMMddHHmmss")));
            //保存内容
            geccoRedis.set(stashContentKey + ":" + id, dto);
            return R.status(true);
        } catch (Exception e) {
            e.printStackTrace();
            return R.fail(e.getMessage());
        } finally {
            //解锁
            redisLockClient.unLock(stashLockKey, LockType.FAIR);
        }
    }

    /**
     * 获取redis的内容
     */
    public DlWikiDTO getStash(Long id) {
        if (id == null) {
            return null;
        }
        return geccoRedis.get(stashContentKey + ":" + id);
    }
}

对于本文内容有问题或建议的小伙伴,欢迎在文章底部留言交流讨论。