package com.transnal.modules.wiki.service.impl;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.transnal.core.redis.cache.GeccoRedis;
import com.transnal.core.redis.lock.LockType;
import com.transnal.core.redis.lock.RedisLockClient;
import com.transnal.core.tool.api.R;
import com.transnal.modules.wiki.dto.DlWikiDTO;
import com.transnal.modules.wiki.service.IDlWikiService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
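
/**
 * Redis-backed draft ("stash") cache for wiki documents.
 * <p>
 * {@link #setStash(DlWikiDTO)} buffers a submitted document in Redis: its id goes into a
 * ZSet scored by submission time, and the DTO itself is stored under a per-document content key.
 * The scheduled consumer started in {@link #consume()} flushes entries submitted more than
 * 30 seconds ago to the database through {@link IDlWikiService} and removes them from Redis,
 * guarding each entry with a fair Redis lock so a flush never races with a new submission.
 */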
@Slf4j
@Service
@RequiredArgsConstructor
public class DlWikiCacheService {
    // Wiki document service used to persist flushed drafts
    private final IDlWikiService wikiService;
    // Redis helper
    private final GeccoRedis geccoRedis;
    // Redis distributed lock client
    private final RedisLockClient redisLockClient;
    // Key of the ZSet holding pending document ids, scored by submission time
    private final String stashKey = "gecco:wiki:stash";
    // Key prefix for the stashed document content
    private final String stashContentKey = "gecco:wiki:content";
    // Lock key prefix guarding a single document's stash entry
    private final String setStashLock = "setStashLock";
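
    /**
     * Starts the background consumer: a single-threaded scheduler that runs every
     * 5 seconds and flushes stashed documents submitted more than 30 seconds ago
     * to the database, removing them from Redis afterwards.
     */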
    @PostConstruct
    public void consume() {
        // Single-threaded scheduler; the flush task runs every 5 seconds
        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
        exec.scheduleAtFixedRate(() -> {
            // Catch everything so an unexpected exception never kills the scheduled task
            try {
                // Fetch the ids of entries submitted more than 30 seconds ago
                DateTime dateTime = DateUtil.offsetSecond(new Date(), -30);
                Set<Object> set = geccoRedis.getZSetOps().rangeByScore(stashKey, 0,
                        Double.parseDouble(DateUtil.format(dateTime, "yyyyMMddHHmmss")));
                log.info("stashConsume: {} entries pending", CollUtil.size(set));
                if (CollUtil.isEmpty(set)) {
                    return;
                }
                // Count of successfully processed entries
                int i = 0;
                for (Object o : set) {
                    // Per-document lock so a new submission cannot race with this flush
                    String stashLockKey = setStashLock + o.toString();
                    boolean stashLock = redisLockClient.tryLock(stashLockKey, LockType.FAIR, 5, 10, TimeUnit.SECONDS);
                    if (!stashLock) {
                        log.info("stashConsume: failed to acquire lock {}", stashLockKey);
                        continue;
                    }
                    // Catch per-entry exceptions so one bad entry does not block the remaining ones
                    try {
                        // Load the stashed content
                        String dtoKey = stashContentKey + ":" + o.toString();
                        DlWikiDTO dto = geccoRedis.get(dtoKey);
                        if (dto != null && dto.getContent() != null) {
                            // Persist to the database
                            wikiService.submit(dto);
                        }
                        // Remove the Redis entries
                        geccoRedis.getZSetOps().remove(stashKey, o);
                        geccoRedis.del(dtoKey);
                        i++;
                    } catch (Exception e) {
                        log.error("stashConsume: failed to process entry {}", o, e);
                    } finally {
                        // Release the lock
                        redisLockClient.unLock(stashLockKey, LockType.FAIR);
                    }
                }
                log.info("stashConsume: {} entries processed", i);
            } catch (Exception e) {
                log.error("stashConsume: scheduled flush failed", e);
            }
        }, 1, 5, TimeUnit.SECONDS);
    }

    /**
     * Stash the submitted content in Redis.
     * The document id is added to the ZSet with the current timestamp as its score,
     * and the DTO is stored under a per-document key; the scheduled consumer
     * flushes it to the database later.
     */
    public R setStash(DlWikiDTO dto) {
        if (dto == null || dto.getId() == null) {
            return R.fail("Invalid parameters");
        }
        // Per-document lock so the scheduled consumer cannot flush this entry while it is being rewritten
        String stashLockKey = setStashLock + dto.getId();
        boolean stashLock = false;
        try {
            stashLock = redisLockClient.tryLock(stashLockKey, LockType.FAIR, 5, 10, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("setStash: error while acquiring lock {}", stashLockKey, e);
        }
        if (!stashLock) {
            return R.fail("setStash failed to acquire lock");
        }
        try {
            Long id = dto.getId();
            // Add to the ZSet; if the id already exists, its score (submission time) is updated
            geccoRedis.getZSetOps().add(stashKey, id, Double.parseDouble(DateUtil.format(new Date(), "yyyyMMddHHmmss")));
            // Store the content
            geccoRedis.set(stashContentKey + ":" + id, dto);
            return R.status(true);
        } catch (Exception e) {
            log.error("setStash: failed to stash wiki {}", dto.getId(), e);
            return R.fail(e.getMessage());
        } finally {
            // Release the lock
            redisLockClient.unLock(stashLockKey, LockType.FAIR);
        }
    }

    /**
     * Reads the stashed content from Redis, or returns null if nothing is stashed.
     */
    public DlWikiDTO getStash(Long id) {
        if (id == null) {
            return null;
        }
        return geccoRedis.get(stashContentKey + ":" + id);
    }
}