Redis 成功 与 Boot 能否适宜做信息队列 Redis 信息队列! Redission List Spring
散布式系统中必备的一个两边件就是信息队列,经过信息队列你能对服务间启动异步解耦、流量消峰、成功最终分歧性。
目前市面上曾经有RabbitMQ、RochetMQ、ActiveMQ、Kafka等,有人会问:“Redis 适宜做信息队列么?”
在回答这个疑问之前,你先从实质思索。
我将联合信息队列的特点,剖析经常使用 Redis 的 List 作为信息队列的成功原理,并分享如何把 SpringBoot 与 Redission 整合来操作 Redis 运用到名目中。
信息队列是一种异步的服务间通讯形式,实用于散布式和微服务架构。信息在被处置和删除之前不时存储在队列上。
每条信息仅可被一位用户处置一次性。信息队列可被用于分别重量级处置、缓冲或批处置上班以及缓解高峰期上班负载。
信息队列在实践运行中包括如下四个场景。
信息队列满足哪些特性
信息有序性
信息是异步处置的,然而消费者须要依照消费者发送信息的顺序来消费,防止发生后发送的信息被先处置的状况。
重复信息处置
消费者或许由于网络疑问发生信息重传造成消费者或许会收到多条重复信息。
雷同的信息重复屡次的话或许会形成一业务逻辑屡次口头,须要确保如何防止重复消费疑问。
牢靠性
一次性保证信息的传递。假设发送信息时接纳者无法用,信息队列会保管信息,直到成功地传递它。
当消费者重启后,可以继续读取信息启动处置,防止信息遗漏。
消费者经常使用LPUSH key element[element...]将信息拔出到队列的头部,假设 key 不存在则会创立一个空的队列再拔出信息。
如下,消费者向队列 queue 先后拔出了 “Java”、“码哥字节”、“Go”,前往值示意信息拔出队列后的个数。
> LPUSH queue Java 码哥字节 Go(integer) 3
确实,List 并没有提供相似于 Kafka 的 ConsumeGroup ,会经常使用多个消费者筹划给你续组成一个消费组来分担处置队列信息。不过在 Redis 5.0 之后,提供了 Streams 数据类型,前面我会引见到。
消费者经常使用RPOP key依次读取队列的信息,先进先出,所以 “Java”会先读敞开费:
> RPOP queue"Java"> RPOP queue"码哥字节"> RPOP queue"Go"
图2-13
别快乐的太早,LPUSH、RPOP存在一共性能危险,消费者向队列拔出数据的时刻,List 并不会被动通知消费者及时消费。
程序须要不时轮询并判别能否为空再口头消费逻辑,这就会造成即使没有新信息写入队列,消费者也在不停地调用RPOP命令占用CPU资源。
请叫我贴心哥 Redis,我提供了BLPOP、BRPOP阻塞读取的命令,消费者在读取队列没有数据的时刻智能阻塞,直到有新的信息写入队列,才会继续读取新信息口头业务逻辑。
BRPOP queue 0
参数 0 示意阻塞期待期间无止期,哪怕是烟花易冷人事易分,雨纷繁旧故里草木深,斑驳的城门盘踞着老树根,石板上回荡的是再等,不时等到“心上人”来。
其实这就是幂等,关于同一条信息,消费者收到后处置一次性的结果和屡次的结果是分歧的。
实质就是消费者在处置信息的时刻解体了,无法再读取信息,不足一个信息确认牢靠机制。
我提供了BRPOPLPUSH source destination timeout指令,含意是阻塞的形式从source队列读取信息的同时把这条信息复制到另一个destination队列中(备份),并且是原子操作。
不过这个指令在 6.2 版本被BLMOVE取代。接上去,上才艺!消费者经常使用LPUSH把信息依次从存入order:pay队列队头(左端)。
LPUSH order:pay "谢霸戈"LPUSH order:pay "肖材吉"
消费者消费信息的时刻在while循环经常使用BLMOVE以阻塞的形式从队列order:pay队尾(右端)弹出信息“谢霸戈”,同时把该信息复制到队列order:pay:back队头(左端),该操作是原子性的,最后一个参数 timeout = 0 示意继续期待。
BLMOVE order:pay order:pay:back RIGHT LEFT 0
假设消费信息“谢霸戈”成功,那就经常使用LREM把队列order:pay:back的“谢霸戈”信息删除,从而成功 ACK 确认机制。
LREM order:pay:back 0 "谢霸戈"
倒数第二个参数 count 的含意如下。
消费意外的话,运行程序经常使用BRPOP order:pay:back从备份队列再次读取信息处置即可。
在 Java 中,你可以应用 Redission 封装的 API 来极速成功队列,接上去我将基于 SpringBoot 2.1.4 版原本教你如何整合 Redisson。
增加依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.3</version></dependency>
application.yaml引入 Redisson 性能文件。
spring:application:name: redissionredis:redisson:file: classpath:redisson-config.yaml
创立redisson-config.yaml性能。
singleServerConfig:idleConnectionTimeout: 10000connectTimeout: 10000timeout: 3000retryAttempts: 3retryInterval: 1500password: magebytesubscriptionsPerConnection: 5clientName: redissonClientaddress: "redis://127.0.0.1:6379"subscriptionConnectionMinimumIdleSize: 1subscriptionConnectionPoolSize: 50connectionMinimumIdleSize: 24connectionPoolSize: 64database: 0dnsMonitoringInterval: 5000threads: 16nettyThreads: 32codec: !<org.redisson.codec.Kryo5Codec> {}transportMode: "NIO"
在代码中,我经常使用的是阻塞双端队列,消费者开启死循环,口头指令。
@Slf4j@Servicepublic class QueueService {@Autowiredprivate RedissonClient redissonClient;private static final String ORDER_PAY_QUEUE = "order:pay";private static final String ORDER_PAY_BACK_QUEUE = "order:pay:back";/*** 消费者发送信息到队列头部** @param message*/public void sendMessage(String message) {RBlockingDeque<String> orderPayQueue = redissonClient.getBlockingDeque(ORDER_PAY_QUEUE);try {orderPayQueue.putFirst(message);log.info("将信息: {} 拔出到队列 {}。", message, ORDER_PAY_QUEUE);} catch (InterruptedException e) {e.printStackTrace();}}/*** 消费者消费信息*/public void onMessage() {RBlockingDeque<String> orderPayQueue = redissonClient.getBlockingDeque(ORDER_PAY_QUEUE);while (true) {// BLMOVE order:pay order:pay:back RIGHT LEFT 0String message = orderPayQueue.move(Duration.ofSeconds(0), DequeMoveArgs.pollLast().addFirstTo(ORDER_PAY_BACK_QUEUE));log.info("从队列 {} 中读取到信息:{},并把信息复制到 {} 队列.", ORDER_PAY_QUEUE, message, ORDER_PAY_BACK_QUEUE);// 消费反常,从 ORDER_PAY_BACK_QUEUE 删除这条信息,LREM order:pay:back 0 messageremoveBackQueueMessage(message, ORDER_PAY_BACK_QUEUE);}}/*** 从队列中删除信息* @param message* @param queueName*/private void removeBackQueueMessage(String message, String queueName) {RBlockingDeque<String> orderPayBackDeque = redissonClient.getBlockingDeque(queueName);boolean remove = orderPayBackDeque.remove(message);log.info("消费反常,删除队列 {} 的信息 {}。", queueName, message);}}
单元测试
RunWith(SpringRunner.class)@SpringBootTest(classes = RedissionApplication.class)public class RedissionApplicationTests {@Autowiredprivate QueueService queueService;@Testpublic void testQueue() throws InterruptedException {new Thread(() -> {for (int i = 0; i < 1000; i++) {queueService.sendMessage("信息" + i);}}).start();new Thread(() -> queueService.onMessage()).start();Thread.currentThread().join();}}
可以经常使用 List 数据结构来成功信息队列,满足先进先出。
Redis 是一个十分轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。
而 Kafka、RabbitMQ 部署时,触及额外的组件,例如 Kafka 的运转就须要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 普通被以为是重量级的信息队列。
须要留意的是,咱们要防止消费者过快,消费者过慢造成的信息沉积占用 Redis 的内存。
在信息量不大的状况下经常使用 Redis 作为信息队列,他能给咱们带来高性能的信息读写,这仿佛也是一个很好信息队列处置打算。