Kafka Java Producer and Kerberos

Kalpesh

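I am getting an error while sending messages to a Kafka topic in a Kerberized environment. Our cluster runs on HDP 2.3.

I followed this guide: http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

However, I have to run kinit explicitly before I can send messages to the Kafka topic. I tried to perform the kinit from a Java class instead, but that does not work either. Please find the code below: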

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

    public static void main(String[] args) {

        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
        boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);

        if (!authStatus) {
            System.out.println("Authentication failed, try something else " + authStatus);
        } else {
            System.out.println("Authentication successful " + authStatus);
        }

        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "true");

        try {
            long events = Long.parseLong("3");
            Random rnd = new Random();

            Properties props = new Properties();
            System.out.println("After broker list- " + args[0]);

            props.put("metadata.broker.list", args[0]);
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            props.put("security.protocol", "PLAINTEXTSASL");

            //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");


            System.out.println("After config prop -1");

            ProducerConfig config = new ProducerConfig(props);

            System.out.println("After config prop -2 config" + config);

            Producer<String, String> producer = new Producer<String, String>(config);

            System.out.println("After config prop -3");

            for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                Date runtime = new Date();
                // Note the trailing dot, so the key is a well-formed IPv4 address.
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + " www.example.com, " + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);

                System.out.println("After config prop -1 data" + data);

                producer.send(data);
            }
            producer.close();

        } catch (Throwable th) {
            th.printStackTrace();

        }
    }
}

pom.xml: all dependencies are downloaded from the Hortonworks repository.

        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.9.0.2.3.4.0-3485</version>
            </dependency>

            <dependency>
                <groupId>org.jasypt</groupId>
                <artifactId>jasypt-spring31</artifactId>
                <version>1.9.2</version>
                <scope>compile</scope>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.1.2.3.4.0-3485</version>
            </dependency>

        </dependencies>

Errors. Case 1: when I specify my own user's kafka_jass.conf

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
After config prop -2 configkafka.producer.ProducerConfig@643293ae
java.lang.SecurityException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:379)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
        at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
Caused by: java.io.IOException: Configuration Error:
        Line 6: expected [controlFlag]
        at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
        at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
        at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
        at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
        at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)

MyUser_Kafka_jass.conf

KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   doNotPrompt=true
   useTicketCache=true
   renewTicket=true
   principal="ctadmin/[email protected]";
   useKeyTab=true
   serviceName="kafka"
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   client=true;
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/ctadmin.keytab"
   storeKey=true
   useTicketCache=true
   serviceName="zookeeper"
   principal="ctadmin/[email protected]";
};

Case 2: when I specify Kafka's own JAAS file

Java config name: /etc/krb5.conf
Loaded from Java config
javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user
        at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
        at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
        at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
        at kafka.producer.Producer.<init>(Producer.scala:50)
        at kafka.producer.Producer.<init>(Producer.scala:73)
        at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
        at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)

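If I run kinit before starting the application, everything works fine; otherwise it fails with the errors above. I cannot do this in a production environment, so please help me if there is a way for the application itself to handle it. Let me know if you need more details.

Thanks :)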

Kalpesh

I don't know what mistake I made the first time, but after redoing the steps below, it works fine.

First, grant the principal full access to the topic:

bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181

Create the JAAS file: kafka-jaas.conf

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="[email protected]"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/etc/security/keytabs/ctadmin.keytab"
 client=true;
};
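A JAAS file like the one above can be sanity-checked on its own before a producer is started, which may help with parse errors such as the "expected [controlFlag]" message in the question. The following is only a sketch under assumptions: the class name JaasConfigCheck and its helper methods are made up for illustration, and it relies solely on the JDK's file-based "JavaLoginConfig" configuration type. It does not contact the KDC or perform a login.

```java
import java.io.File;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical helper, not part of Kafka: parses a JAAS file and prints
// the keytab-related options of the KafkaClient entry.
final class JaasConfigCheck {

    /** Parses the file without replacing the JVM-wide JAAS configuration. */
    static Configuration load(File jaasFile) throws Exception {
        // "JavaLoginConfig" is the JDK's standard file-based Configuration type.
        return Configuration.getInstance("JavaLoginConfig",
                new URIParameter(jaasFile.toURI()));
    }

    /** Summarizes the first login module of an entry, or returns null if the entry is absent. */
    static String describe(Configuration conf, String entryName) {
        AppConfigurationEntry[] entries = conf.getAppConfigurationEntry(entryName);
        if (entries == null || entries.length == 0) {
            return null;
        }
        AppConfigurationEntry entry = entries[0];
        Map<String, ?> options = entry.getOptions();
        return entry.getLoginModuleName()
                + " useKeyTab=" + options.get("useKeyTab")
                + " keyTab=" + options.get("keyTab")
                + " principal=" + options.get("principal");
    }

    public static void main(String[] args) throws Exception {
        // args[0]: path to the JAAS file, e.g. the kafka-jaas.conf created above.
        Configuration conf = load(new File(args[0]));
        String summary = describe(conf, "KafkaClient");
        System.out.println(summary == null ? "No KafkaClient entry found" : summary);
    }
}
```

If the file has a syntax problem, load() is expected to fail with an exception instead of returning a configuration, so a broken entry shows up before the producer is involved.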

Java program:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    public static void main(String[] args) {
        String topic = args[0];

        Properties props = new Properties();
        props.put("metadata.broker.list", "{Hostname}:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (int i = 0; i < 10; i++) {
            producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
        }
        // Release the producer's network resources once all messages are sent.
        producer.close();
    }
}

Run the application:

java -Djava.security.auth.login.config=/home/ctadmin/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp kafka-testing-0.0.1-jar-with-dependencies.jar com.ct.test.kafka.KafkaProducer
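Since the command above depends on two -D options pointing at real files, a small pre-flight check at the start of main can make a mistyped path obvious before the producer is created. This is a rough sketch, not something from the original answer: the class SecureClientPreflight and its problems method are hypothetical names, and it assumes both properties hold plain file paths (java.security.auth.login.config may also be given as a URL, which this check does not handle).

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: verifies that the Kerberos-related system properties
// used in the run command are set and point to readable files.
final class SecureClientPreflight {

    static final String[] FILE_PROPERTIES = {
        "java.security.auth.login.config",
        "java.security.krb5.conf"
    };

    /** Returns one message per missing or unreadable setting; empty if all look fine. */
    static List<String> problems(Properties props) {
        List<String> problems = new ArrayList<String>();
        for (String key : FILE_PROPERTIES) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                problems.add(key + " is not set");
            } else if (!Files.isReadable(Paths.get(value))) {
                problems.add(key + " points to an unreadable file: " + value);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> problems = problems(System.getProperties());
        for (String problem : problems) {
            System.err.println("Preflight: " + problem);
        }
        if (!problems.isEmpty()) {
            System.exit(1);
        }
        System.out.println("Preflight OK");
    }
}
```

Calling SecureClientPreflight.problems(System.getProperties()) as the first statement of the producer's main would be one way to use it; the check says nothing about whether the keytab or principal inside the JAAS file is valid.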
