awskafka

Run a Kafka producer and consumer

To publish and collect your first message, follow these instructions:

  • Export the authentication configuration:

      $ export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
    
  • Declare a new topic with a single partition and only one replica:

      $ /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server SERVER-IP:9092 --replication-factor 1 --partitions 1 --topic test
    

    The --replication-factor parameter indicates how many servers will have a copy of the logs, and the --partitions parameter controls the number of partitions that will be created for the topic.

  • Start a new producer on the same Kafka server and generates a message in the topic. Remember to replace SERVER-IP with your server’s public IP address. Press CTRL-D to generate the message.

      $ /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server SERVER-IP:9092 --producer.config /opt/bitnami/kafka/config/producer.properties --topic test
    
      this is my first message
      and this one my second
    
  • Collect and display the first message in the consumer:

      $ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server SERVER-IP:9092 --consumer.config /opt/bitnami/kafka/config/consumer.properties --topic test --from-beginning
    

Troubleshooting

You might get an error message like this when starting a new producer:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:49)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:103)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:390)
[...]

You might also see this error:

[ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:61)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:103)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)

If you get any of those errors, then it is possible you did not configure the authentication. Run the following command to export the kafka_jaas.conf file with the required credentials for the client.

    $ export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"

If you are using your own client, it is likely that you did not set the authentication parameters properly.

Last modification May 22, 2023