官方地址:https://spring.io/projects/spring-cloud-stream#overview 中文指导手册地址:https://m.wang1314.com/doc/webapp/topic/20971999.html
消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
如果是需要Stream整合的就将依赖改为spring-cloud-starter-stream-kafka
自己创建一个实现的接口以及里面的方法 注意:在这个服务实现类里面不是使用@Service注解了,因为不是web应用,而是Stream消息驱动,是与中间件进行打交道的不是与数据库
同样的参考如下流程图
与8801一样
配置文件与消息生产的区别在于:
与消息生产者一样
必须要有@Component注解注入到Spring容器中
除了启动的端口号不一样之外其他的配置都一样
目前是8802/8803同时都收到了,存在重复消费问题 解决方案:分组和持久化属性group
自定义配置分组,自定义分为同一个组,解决重复消费问题
分别给8801、8802进行分组【orderA】
orderB是历史记录,上面的配置以及都分为了ordeerA组,进入orderA组可以查看实际的消费者数量 同一组内会发生竞争关系,只有其中一个可以消费,启动项目测试是否为真
通过上述,解决了重复消费问题,再看看持久化
8802因为取消了groupA的分组所以获取不到持久化的数据(如果重启mq也会消失)
8803保存groupA的分组所以在启动的时候就会将持久化的数据消费
您需要 登录 才可以下载或查看,没有账号?立即注册
使用道具 举报
本版积分规则 发表回复 回帖后跳转到最后一页
荣誉红客
2 小时前
5 小时前
昨天 08:06
昨天 07:59
中国红客联盟公众号
联系站长QQ:5520533