[JAVA] kafka添加安全验证配置方式

1466 0
王子 2022-11-9 09:40:49 | 显示全部楼层 |阅读模式
目录

    服务端配置
      1. config 目录添加kafka_server_jaas.conf 配置文件2. kafka-run-class.sh 添加3. config/server.properties
    客户端接入改造
      完整配置示例


    综合考虑性能影响、管理成本、安全等级要求,接入便利程度。 鉴权采用SASL+PLAINTEXT 方式。每个集群会分配统一的访问账号及密码用于客户端访问。

服务端配置


1. config 目录添加kafka_server_jaas.conf 配置文件

内容:
  1. KafkaServer {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"
  4. password="7f8d9dsf789ds7ffsdfdsfu9"
  5. user_admin="7f8d9dsf789ds7ffsdfdsfu9"
  6. user_alice="xjfkddjfdssifds";
  7. };
复制代码
2. kafka-run-class.sh 添加
  1. KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/home/finance/App/kafka_2.12-2.5.1/config/kafka_server_jaas.conf"
复制代码
3. config/server.properties

添加
  1. security.inter.broker.protocol=SASL_PLAINTEXT
  2. sasl.mechanism.inter.broker.protocol=PLAIN
  3. sasl.enabled.mechanisms=PLAIN
  4. listener, advertised.listeners 添加对应SASL_PLAINTEXT 监听器
  5. listeners=SASL_PLAINTEXT://10.193.196.112:9092
  6. advertised.listeners=SASL_PLAINTEXT://10.193.196.112:9092
复制代码
客户端接入改造

就是在java程序中将安全参数配置进来
生产者、消费者属性配置加入一下配置
  1. props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=alice password=alice;");
  2. props.put("security.protocol", "SASL_PLAINTEXT");
  3. props.put("sasl.mechanism", "PLAIN");
复制代码
springboot 高版本估计有支持在properties文件中直接配置,此处没有验证。

完整配置示例

此方法,可以用KafkaProperties 获取properties配置文件中的参数后,再往里面添加新的参数
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.kafka.annotation.EnableKafka;
  7. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  8. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  9. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  10. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  11. import org.springframework.kafka.core.KafkaTemplate;
  12. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  13. import java.util.Map;
  14. /**
  15. * kafka配置
  16. */
  17. @Configuration
  18. @EnableKafka
  19. public class KafkaProducerConfig {
  20.         @Autowired
  21.         private KafkaProperties kafkaProperties;
  22.         /**
  23.          * 消费者配置
  24.          */
  25.         @Bean
  26.         public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  27.                 ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  28.                 Map<String, Object> props = kafkaProperties.buildConsumerProperties();
  29.                 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;");
  30.                 props.put("security.protocol", "SASL_PLAINTEXT");
  31.                 props.put("sasl.mechanism", "PLAIN");
  32.                 factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
  33.                 factory.setConcurrency(2);
  34.                 factory.getContainerProperties().setPollTimeout(1500);
  35.                 return factory;
  36.         }
  37.         /**
  38.          * 生产者配置
  39.          */
  40.         @Bean
  41.         public KafkaTemplate<String, String> kafkaTemplate() {
  42.                 Map<String, Object> props = kafkaProperties.buildProducerProperties();
  43.                 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=user1 password=pass1;");
  44.                 props.put("security.protocol", "SASL_PLAINTEXT");
  45.                 props.put("sasl.mechanism", "PLAIN");
  46.                 return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
  47.         }
  48. }
复制代码
以上为个人经验,希望能给大家一个参考,也希望大家多多支持中国红客联盟。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

中国红客联盟公众号

联系站长QQ:5520533

admin@chnhonker.com
Copyright © 2001-2025 Discuz Team. Powered by Discuz! X3.5 ( 粤ICP备13060014号 )|天天打卡 本站已运行