其实今天是我生日来着,本来想着放个假今天博客只更一篇,不过想到计划不能轻易被打破,大晚上地还是起来补了这一篇。←_←
这个内容是在《Redis深度历险:核心原理与应用实践》这本书里面看到的,众所周知List类型可以用来实现一个异步队列,不过只能实现一个即时的生产与消费。而使用Sorted_set(下面称Zset)这个类型可以实现一个延时消费的队列。
思想是通过将消息序列化成一个字符串作为Zset的value,到期处理时间作为score,然后用多个线程轮询Zset获取到期的任务(使用zrem
命令)进行处理。之所以使用多个线程轮询是保证一个线程挂了还有其他线程可以处理,而redis的单线程模型决定了zrem
要么成功,要么失败,在多线程环境下是安全的,避免多个线程同时获取消息而导致消息重复消费。
下面直接上代码,来源是书上的代码,我做了一点修改:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import java.lang.reflect.Type;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@Slf4j
public class RedisDelayingQueue<T> {
//消息
static class TaskItem<T>{
public String id;
public T msg;
}
//JSON转对象需要泛型Type
private Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();
private Jedis jedis;
//队列的键
private String queueKey;
public RedisDelayingQueue(Jedis jedis,String queueKey){
this.jedis = jedis;
this.queueKey = queueKey;
}
//产生一条消息
public void delay(T msg){
TaskItem<T> task = new TaskItem<>();
task.id = UUID.randomUUID().toString();
task.msg = msg;
String s = JSON.toJSONString(task);
//随机产生延迟时间,在2s~10s的范围内
Random random = new Random();
int i = random.nextInt(8000)+2000;
//使用zadd添加消息,第二个参数score为到期时间,第三个参数为value
jedis.zadd(queueKey,System.currentTimeMillis()+i,s);
log.info("产生消息:{},延迟时间:{}",msg,i);
}
public void loop(){
//轮询检查是否有到期的消息
while (!Thread.interrupted()){
//zrangeByScore按score进行排序
//后面两个参数offset=0和count=1代表只取出第一条到期时间最小的数据
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(),0,1);
if(values.isEmpty()){
//如果没有到期的消息,等待0.1秒后再进行轮询
try {
Thread.sleep(100);
}catch (InterruptedException e){
break;
}
continue;
}
String s = values.iterator().next();
//如果抢到了消息,进行消费
if(jedis.zrem(queueKey, s)>0){
TaskItem<T> task = JSON.parseObject(s,TaskType);
this.handleMsg(task.msg);
}
}
}
//消费消息
public void handleMsg(T msg) {
log.info("消息被消费:{}",msg);
}
public static void main(String[] args) {
//建立两个连接,防止两个comsumer争用一个jedis对象轮询,会产生read time out错误
Jedis jedis1 = new Jedis("192.168.176.128");
Jedis jedis2 = new Jedis("192.168.176.128");
RedisDelayingQueue<String> queue1 = new RedisDelayingQueue<>(jedis1, "q-demo");
RedisDelayingQueue<String> queue2 = new RedisDelayingQueue<>(jedis2, "q-demo");
//消息生产者
Thread producer = new Thread(()->{
for(int i = 0;i<10;i++){
queue1.delay("codehole"+i);
}
},"producer");
//消息消费者
Thread consumer1 = new Thread(()-> queue1.loop(),"consumer1");
Thread consumer2 = new Thread(()-> queue2.loop(),"consumer2");
producer.start();
consumer1.start();
consumer2.start();
try {
//等待消息生产完成
producer.join();
//主线程休眠11000毫秒,保证消息全部过期
Thread.sleep(11000);
//设置两个消费者线程打断标记位,让它们在下一个循环结束退出
consumer1.interrupt();
consumer2.interrupt();
//等待两个消费者线程完成
consumer1.join();
consumer2.join();
}catch (InterruptedException e){
}
}
}
运行程序,观察结果(不要在意时间,睡了一觉半夜起来补的博客):
01:54:38.587 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole0,延迟时间:8532
01:54:38.591 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole1,延迟时间:8939
01:54:38.591 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole2,延迟时间:4227
01:54:38.594 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole3,延迟时间:9167
01:54:38.595 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole4,延迟时间:9748
01:54:38.595 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole5,延迟时间:2505
01:54:38.596 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole6,延迟时间:7359
01:54:38.596 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole7,延迟时间:6994
01:54:38.597 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole8,延迟时间:6011
01:54:38.597 [producer] INFO com.rhett.test.RedisDelayingQueue - 产生消息:codehole9,延迟时间:3717
01:54:41.221 [consumer1] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole5
01:54:42.331 [consumer1] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole9
01:54:42.836 [consumer1] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole2
01:54:44.623 [consumer2] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole8
01:54:45.633 [consumer2] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole7
01:54:45.973 [consumer1] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole6
01:54:47.145 [consumer2] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole0
01:54:47.551 [consumer2] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole1
01:54:47.788 [consumer1] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole3
01:54:48.359 [consumer2] INFO com.rhett.test.RedisDelayingQueue - 消息被消费:codehole4
可以看到消息的延迟消费效果已经实现了,并且消息没有被重复消费。
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/%e4%bd%bf%e7%94%a8redis%e5%ae%9e%e7%8e%b0%e4%b8%80%e4%b8%aa%e5%bb%b6%e6%97%b6%e9%98%9f%e5%88%97/