
Run a Kafka producer and consumer

To publish and consume your first messages, follow these steps:

  • Export the authentication configuration:

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/conf/kafka_jaas.conf"
    
  • Create a new topic with a single partition and a single replica:

    $ /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper SERVER-IP:2181 --replication-factor 1 --partitions 1 --topic test
    

    --replication-factor sets how many servers keep a copy of the topic's log, and --partitions sets the number of partitions created for the topic.

  • Start a new producer on the same Kafka server and publish messages to the topic. Remember to replace SERVER-IP with your server’s public IP address. Type each message on its own line; every line is sent as a separate message.

    $ /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list SERVER-IP:9092 --producer.config /opt/bitnami/kafka/conf/producer.properties --topic test
    
    this is my first message
    and this one my second
    
  • Press Ctrl+D when you have finished entering your messages.

  • Start a consumer to read and display the messages from the beginning of the topic:

    $ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config /opt/bitnami/kafka/conf/consumer.properties --from-beginning
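
The steps above can also be run without typing messages interactively. The following is a minimal sketch, not part of the Bitnami tooling: the function names (describe_topic, send_messages, read_messages) and the KAFKA_BIN, KAFKA_CONF, SERVER_IP and TOPIC variables are hypothetical, and it assumes KAFKA_OPTS has already been exported as shown in the first step. It relies on the console producer reading messages from standard input, one per line, and on the consumer's --max-messages option to exit after a fixed number of messages.

```shell
# Hypothetical wrappers around the commands shown above.
# Override these variables to match your installation.
KAFKA_BIN="${KAFKA_BIN:-/opt/bitnami/kafka/bin}"
KAFKA_CONF="${KAFKA_CONF:-/opt/bitnami/kafka/conf}"
SERVER_IP="${SERVER_IP:-SERVER-IP}"   # replace with your server's public IP address
TOPIC="${TOPIC:-test}"

# Show the partition and replica assignment of the topic.
describe_topic() {
    "$KAFKA_BIN/kafka-topics.sh" --describe \
        --zookeeper "$SERVER_IP:2181" --topic "$TOPIC"
}

# Publish every line read from standard input as one message.
send_messages() {
    "$KAFKA_BIN/kafka-console-producer.sh" \
        --broker-list "$SERVER_IP:9092" \
        --producer.config "$KAFKA_CONF/producer.properties" \
        --topic "$TOPIC"
}

# Read $1 messages from the beginning of the topic, then exit.
# Note: the guide's consumer command uses localhost:9092; this sketch
# assumes the same SERVER_IP works for both tools.
read_messages() {
    "$KAFKA_BIN/kafka-console-consumer.sh" \
        --bootstrap-server "$SERVER_IP:9092" \
        --topic "$TOPIC" \
        --consumer.config "$KAFKA_CONF/consumer.properties" \
        --from-beginning --max-messages "$1"
}
```

With these functions loaded in the current shell, the two example messages could be sent and read back like this:

    $ printf '%s\n' 'this is my first message' 'and this one my second' | send_messages
    $ read_messages 2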
    

Troubleshooting

If you get an error message like this when starting a new producer:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:49)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:103)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:390)
[...]

You might also see a similar error:

[ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:61)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:103)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)

If you get either of these errors, the client authentication is probably not configured. Run the following command so that KAFKA_OPTS points to the kafka_jaas.conf file, which contains the credentials required by the client:

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/conf/kafka_jaas.conf"
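
Before exporting the variable, it can help to confirm that the file exists and actually defines the KafkaClient entry named in the error. A minimal sketch follows; check_jaas is a hypothetical helper name, and the grep test only looks for the section name, not for valid credentials.

```shell
# Hypothetical helper: validate a JAAS file, then export KAFKA_OPTS.
# Returns 1 if the file is not readable, 2 if it has no KafkaClient entry.
# Usage: check_jaas /opt/bitnami/kafka/conf/kafka_jaas.conf
check_jaas() {
    jaas_file="$1"
    if [ ! -r "$jaas_file" ]; then
        echo "JAAS file not readable: $jaas_file" >&2
        return 1
    fi
    if ! grep -q 'KafkaClient' "$jaas_file"; then
        echo "No KafkaClient entry found in $jaas_file" >&2
        return 2
    fi
    export KAFKA_OPTS="-Djava.security.auth.login.config=$jaas_file"
}
```

Define the function in the same shell session in which you start the producer or consumer, so that the exported variable is visible to those commands.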

If you are using your own client, it is likely that you are not setting the authentication parameters properly.
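
As a starting point for your own client, the sketch below writes a properties file with the standard Kafka client SASL settings. It assumes the server uses the SASL/PLAIN mechanism over an unencrypted listener (SASL_PLAINTEXT), which this guide does not confirm; check kafka_jaas.conf and the server configuration for the actual mechanism. CLIENT_PROPS, USERNAME and PASSWORD are placeholders, and the sasl.jaas.config property requires a Kafka client of version 0.10.2 or later.

```shell
# Assumed output location; set CLIENT_PROPS to write the file elsewhere.
CLIENT_PROPS="${CLIENT_PROPS:-client.properties}"

# Assumption: SASL/PLAIN over SASL_PLAINTEXT. Replace USERNAME and PASSWORD
# with the credentials stored in kafka_jaas.conf.
cat > "$CLIENT_PROPS" <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";
EOF
```

The same three keys can be passed to most Kafka client libraries as configuration properties. Supplying the credentials through sasl.jaas.config is an alternative to pointing the JVM at a JAAS file with KAFKA_OPTS.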

Last modification January 2, 2019