目录
服务端配置
1. config 目录添加 kafka_server_jaas.conf 配置文件 2. kafka-run-class.sh 添加 3. config/server.properties
客户端接入改造
综合考虑性能影响、管理成本、安全等级要求以及接入便利程度,鉴权采用 SASL_PLAINTEXT(SASL/PLAIN 机制)方式。每个集群会分配统一的访问账号及密码用于客户端访问。
服务端配置
1. config 目录添加kafka_server_jaas.conf 配置文件
内容: - KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="7f8d9dsf789ds7ffsdfdsfu9"
- user_admin="7f8d9dsf789ds7ffsdfdsfu9"
- user_alice="xjfkddjfdssifds";
- };
2. kafka-run-class.sh 添加
- KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/home/finance/App/kafka_2.12-2.5.1/config/kafka_server_jaas.conf"
3. config/server.properties
添加 - security.inter.broker.protocol=SASL_PLAINTEXT
- sasl.mechanism.inter.broker.protocol=PLAIN
- sasl.enabled.mechanisms=PLAIN
- # listeners、advertised.listeners 添加对应的 SASL_PLAINTEXT 监听器
- listeners=SASL_PLAINTEXT://10.193.196.112:9092
- advertised.listeners=SASL_PLAINTEXT://10.193.196.112:9092
客户端接入改造
就是在java程序中将安全参数配置进来
在生产者、消费者的属性配置中加入以下配置(用户名和密码需与服务端 kafka_server_jaas.conf 中对应的 user_<用户名> 条目一致) - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=alice password=alice;");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
Spring Boot 高版本应当支持在 properties 文件中直接配置这些参数(例如通过 spring.kafka.properties.* 前缀),此处未做验证。
完整配置示例
此方法先通过 KafkaProperties 读取 properties 配置文件中的参数,再向其中追加新的参数 - import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.config.KafkaListenerContainerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
- import java.util.Map;
- /**
- * kafka配置
- */
- @Configuration
- @EnableKafka
- public class KafkaProducerConfig {
- @Autowired
- private KafkaProperties kafkaProperties;
- /**
- * 消费者配置
- */
- @Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- Map<String, Object> props = kafkaProperties.buildConsumerProperties();
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
- factory.setConcurrency(2);
- factory.getContainerProperties().setPollTimeout(1500);
- return factory;
- }
- /**
- * 生产者配置
- */
- @Bean
- public KafkaTemplate<String, String> kafkaTemplate() {
- Map<String, Object> props = kafkaProperties.buildProducerProperties();
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
- }
- }
以上为个人经验,希望能给大家一个参考,也希望大家多多支持中国红客联盟。