GitHub - houbb/mq: The message queue in java.(java 简易版本 mq 实现) (original) (raw)
mq
mq 是基于 netty 实现的 java mq 框架,类似于 rocket mq。
主要用于个人学习,由渐入深,理解 mq 的底层实现原理。
特性
- 基于 netty4 的客户端调用服务端
- timeout 超时处理
- broker 启动的 check 检测服务可用性
- load balance 负载均衡
- 基于 TAG 的消息过滤,broker 端实现
- 生产者的消息同步发送,ONE WAY 发送
- 生产消息的批量发送
- 消息状态的批量确认
- fail 支持 failOver failFast 等失败处理策略
- heartbeat 服务端心跳
- AT LEAST ONCE 最少一次原则
快速入门
测试
注册中心
依赖 maven 包:
com.github.houbb mq-broker 0.1.3代码实现:
MqBroker broker = new MqBroker(); broker.start();
消费者
依赖 maven 包:
com.github.houbb mq-consumer 0.1.3代码实现:
final MqConsumerPush mqConsumerPush = new MqConsumerPush(); mqConsumerPush.start();
mqConsumerPush.subscribe("TOPIC", "TAGA"); mqConsumerPush.registerListener(new IMqConsumerListener() { @Override public ConsumerStatus consumer(MqMessage mqMessage, IMqConsumerListenerContext context) { System.out.println("---------- 自定义 " + JSON.toJSONString(mqMessage)); return ConsumerStatus.SUCCESS; } });
生产者
依赖 maven 包:
com.github.houbb mq-producer 0.1.3代码实现:
MqProducer mqProducer = new MqProducer(); mqProducer.start();
String message = "HELLO MQ!"; MqMessage mqMessage = new MqMessage(); mqMessage.setTopic("TOPIC"); mqMessage.setTags(Arrays.asList("TAGA", "TAGB")); mqMessage.setPayload(message);
SendResult sendResult = mqProducer.send(mqMessage); System.out.println(JSON.toJSON(sendResult));
前言
工作至今,接触 mq 框架已经有很长时间。
但是对于其原理一直只是知道个大概,从来没有深入学习过。
以前一直想写,但由于各种原因被耽搁。
技术准备
这些技术的准备阶段,花费了比较长的时间。
也建议想写 mq 框架的有相关的知识储备。
其他 mq 框架使用的经验此处不再赘述。
快速迭代
原来一直想写 mq,却不行动的原因就是想的太多,做的太少。
想一下把全部写完,结果就是啥都没写。
所以本次的开发,每个代码分支做的事情实际很少,只做一个功能点。
陆陆续续经过近一个月的完善,对 mq 框架有了自己的体会和进一步的认知。
代码实现功能,主要参考 Apache Dubbo
文档
文档
文档将使用 markdown 文本的形式,补充 code 层面没有的东西。
【mq】从零开始实现 mq-02-如何实现生产者调用消费者?
【mq】从零开始实现 mq-03-引入 broker 中间人
【mq】从零开始实现 mq-06-消费者心跳检测 heartbeat
【mq】从零开始实现 mq-07-负载均衡 load balance
【mq】从零开始实现 mq-09-消费者拉取消息 pull message
【mq】从零开始实现 mq-10-消费者拉取消息回执 pull message ack
【mq】从零开始实现 mq-11-消费者消息回执添加分组信息 pull message ack groupName
代码注释
代码有详细的注释,便于阅读和后期维护。
测试
目前测试代码算不上完善。后续将陆续补全。
mq 模块
| 模块 | 说明 |
|---|---|
| mq-common | 公共代码 |
| mq-broker | 注册中心 |
| mq-producer | 服务端 |
| mq-consumer | 客户端 |
| mq-test | 测试模块 |
测试代码
这部分测试代码可以关注公众号【老马啸西风】,后台回复【mq】领取。
后期 ROAD-MAP
- all 模块
- check broker 启动检测
- 关闭时通知 register center
- 优雅关闭添加超时设置
- heartbeat 心跳检测机制
- 完善 load-balance 实现 + shardingkey 粘性消费、请求
- 失败重试的拓展
- 消费者 pull 策略实现
- pull 消息消费的 ACK 处理
- broker springboot 实现
- 消息的 ack 处理,要基于 groupName 进行处理
- 消息的回溯消费 offset
- 消息的批量发送,批量 ACK
- 添加注册鉴权,保证安全性
- 顺序消息
- 事务消息
- 定时消息
- 流量控制 back-press 反压
- 消息可靠性
- offline message 离线消息
- dead message 死信队列
- 断线重连
