前言
最近使用helm3安装好了kafka和rabbitmq,并且想集成到spring中,发现集成不是那么简单的,虽然有官方实例,但是实例上面缺少必要的代码所以通过自己摸索一步步完成,分享给大家。
操作
首先,安装好kafka安装和rabbitmq安装环境,安装好之后,我们就可以配置spring了。
1、首先引入相关依赖包:
org.springframework.cloud
spring-cloud-stream-binder-kafka
org.springframework.cloud
spring-cloud-stream-binder-rabbit
2、配置application.yml文件
下面是我的配置,如下所示:
spring:
cloud:
stream:
function:
definition: testKafkaOut;testKafkaIn;testRabbitOut;testRabbitIn
bindings:
testKafkaOut-out-0:
binder: kafka-binder
destination: test
#设置消息类型,本次为json,文本则设置"text/plain"
content-type: application/json
testKafkaIn-in-0:
binder: kafka-binder
destination: test
content-type: application/json
group: log_group
testRabbitOut-out-0:
binder: rabbit-binder
destination: dev
content-type: application/json
testRabbitIn-in-0:
binder: rabbit-binder
destination: dev
content-type: application/json
group: dev-group
binders:
kafka-binder:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: xxx.xxx.xxx.xxx:xxxx
auto-create-topics: true
rabbit-binder:
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: xxx.xxx.xxx.xxx
port: xxxx
username: user
password: password
virtual-host: dev
这样就完成了spring cloud stream
和kafka
以及rabbitmq
的配置。
3、收发消息
新建一个java类进行收发消息操作,如下所示:
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
import java.util.function.Supplier;
@Component
public class MessageProcessor {
@Bean
public Supplier> testKafkaOut() {
return () -> MessageBuilder.withPayload("Hello from Kafka!").build();
}
@Bean
public Consumer> testKafkaIn() {
return message -> System.out.println("Received from Kafka: " + message.getPayload());
}
@Bean
public Supplier> testRabbitOut() {
return () -> MessageBuilder.withPayload("Hello from RabbitMQ!").build();
}
@Bean
public Consumer> testRabbitIn() {
return message -> System.out.println("Received from RabbitMQ: " + message.getPayload());
}
}
或者使用下面简单的写法:
发消息:
@Autowired
private StreamBridge streamBridge;
...
streamBridge.send("testRabbitOut-out-0", "hello rabbitmq");
streamBridge.send("testKafkaOut-out-0", "hello kafka");
...
收消息:
@Component
public class ConsumersHandler {
@Bean
public Consumer testKafkaIn(){
return str -> {
System.out.println("Success Rescive message from kafka: " + str);
};
}
@Bean
public Consumer testRabbitIn() {
return str -> {
System.out.println("Success Rescive message from rabbitmq: " + str);
};
}
}
两种写法都可以,就看你自己的选择了,下面是收到消息的打印结果:
Success Rescive message from rabbitmq: hello rabbitmq
Success Rescive message from kafka: hello kafka
总结
1、配置application.yml文件的时候要注意function.definition
的写法,如下所示:
function:
definition: testKafkaOut;testKafkaIn;testRabbitOut;testRabbitIn
而我一开始写成了:
testKafkaOut,testKafkaIn,testRabbitOut,testRabbitIn
导致报错:
kafka-binder,rabbit-binder, and no default binder has been set.
别小看这个问题,因为我的粗心花了一周才解决,唉!
2、同样是application.yml中的配置default-binder
可以不用设置,因为我们在每个bindings中已经指定了binder了
3、下面的写法,应用程序启动之后报:rabbitmq binder是找不到
,所以使用了localhost:5672
,如下所示:
spring:
cloud:
stream:
kafka:
binder:
auto-create-topics: true
brokers: xxx.xxx.xxx.xxx:xxxx
rabbit:
binder:
host: xxx.xxx.xxx.xxx
port: xxxx
username: user
password: password
virtual-host: dev
这个写法有问题,所以推荐我上面的binders
写法。
4、我开始借助了chatgpt3.5,它有时候给的代码都是spring cloud strem 3.1
之前的,于是我使用了4.0之后给出的代码是最新的,大家可以试试chatgpt4,这里有个推荐的地址
引用
Spring Cloud Stream 函数式编程整合 kafka/rabbit
spring-cloud-stream-samples
Spring Cloud Stream 整合Kafka
Spring Cloud Stream Rabbit 3.1.3 入门实践