Run a Kafka producer and consumer
To publish and collect your first message, follow these instructions:
-
Export the authentication configuration:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
-
Declare a new topic with a single partition and only one replica:
$ /opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server SERVER-IP:9092 --replication-factor 1 --partitions 1 --topic test
The --replication-factor parameter sets how many brokers keep a copy of the topic's log, and the --partitions parameter sets the number of partitions created for the topic.
-
Start a new producer on the same Kafka server and publish messages to the topic. Remember to replace SERVER-IP with your server’s public IP address. Type each message on its own line, then press CTRL-D to end the input and send any pending messages.
$ /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server SERVER-IP:9092 --producer.config /opt/bitnami/kafka/config/producer.properties --topic test
this is my first message
and this one my second
-
Start a consumer to read and display the messages from the beginning of the topic:
$ /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server SERVER-IP:9092 --consumer.config /opt/bitnami/kafka/config/consumer.properties --topic test --from-beginning
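The producer and consumer steps above can also be run without an interactive prompt, which is handy for a quick smoke test. The following is a minimal sketch, not part of the original instructions: the function names produce_lines and consume_n and the KAFKA_BIN and KAFKA_CONF variables are hypothetical helpers, and the paths default to the ones used in this guide. It relies on the console producer reading one message per line from standard input and on the console consumer's --max-messages option to stop after a fixed number of messages.

```shell
# Assumed install locations, taken from the commands in this guide.
KAFKA_BIN="${KAFKA_BIN:-/opt/bitnami/kafka/bin}"
KAFKA_CONF="${KAFKA_CONF:-/opt/bitnami/kafka/config}"

# Send each line of standard input as one message (hypothetical helper).
# $1: broker host (SERVER-IP), $2: topic
produce_lines() {
  "$KAFKA_BIN/kafka-console-producer.sh" --bootstrap-server "$1:9092" \
    --producer.config "$KAFKA_CONF/producer.properties" --topic "$2"
}

# Read a fixed number of messages from the start of the topic, then exit
# (hypothetical helper).
# $1: broker host (SERVER-IP), $2: topic, $3: number of messages to read
consume_n() {
  "$KAFKA_BIN/kafka-console-consumer.sh" --bootstrap-server "$1:9092" \
    --consumer.config "$KAFKA_CONF/consumer.properties" --topic "$2" \
    --from-beginning --max-messages "$3"
}

# Usage (replace SERVER-IP as in the steps above):
#   printf '%s\n' "this is my first message" "and this one my second" | produce_lines SERVER-IP test
#   consume_n SERVER-IP test 2
```

Both helpers still need KAFKA_OPTS to be exported as in the first step, because they call the same console tools.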
Troubleshooting
You might get an error message like this when starting a new producer:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:49)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:103)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:390)
[...]
You might also see this error:
[ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:61)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:131)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:103)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
If you get either of these errors, the client authentication is probably not configured in your current shell session. Run the following command to point the Kafka client tools at the kafka_jaas.conf file, which contains the required client credentials:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf"
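To confirm that the setting is in place before retrying, a small check along the following lines may help. This is a sketch under stated assumptions: check_kafka_jaas is a hypothetical helper name, and the check assumes the JAAS file path contains no spaces. It only inspects the value of KAFKA_OPTS and looks for the KafkaClient section named in the error message; it does not validate the credentials themselves.

```shell
# Hypothetical helper: verify that KAFKA_OPTS points at a readable JAAS file
# that contains a KafkaClient entry.
check_kafka_jaas() {
  # Extract the path that follows -Djava.security.auth.login.config=
  jaas_path=$(printf '%s\n' "${KAFKA_OPTS:-}" |
    sed -n 's/.*-Djava\.security\.auth\.login\.config=\([^ ]*\).*/\1/p')
  if [ -z "$jaas_path" ]; then
    echo "KAFKA_OPTS does not set java.security.auth.login.config" >&2
    return 1
  fi
  if [ ! -r "$jaas_path" ]; then
    echo "JAAS file is not readable: $jaas_path" >&2
    return 2
  fi
  # The console clients look for a section named KafkaClient in this file.
  if ! grep -q 'KafkaClient' "$jaas_path"; then
    echo "No KafkaClient entry in $jaas_path" >&2
    return 3
  fi
  echo "JAAS configuration found: $jaas_path"
}

# Usage:
#   check_kafka_jaas && echo "ready to start the producer or consumer"
```

The variable must also be exported, as in the command above, so that the Kafka scripts started from the shell inherit it.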
If you see these errors with your own client, its authentication parameters are most likely missing or set incorrectly.
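For a client that does not read KAFKA_OPTS, Kafka clients also accept the JAAS settings inline through the sasl.jaas.config property. The sketch below writes such a properties file. It assumes the server uses SASL over plaintext with the PLAIN mechanism, which this guide does not state, so check the values against kafka_jaas.conf and the server configuration first. The helper name write_client_properties and the file name, username and password in the usage lines are hypothetical placeholders.

```shell
# Hypothetical helper: write a client properties file with inline JAAS settings.
# Assumes SASL_PLAINTEXT with the PLAIN mechanism; adjust to match your server.
# $1: output file, $2: username, $3: password
write_client_properties() {
  cat > "$1" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$2" password="$3";
EOF
}

# Usage (placeholder credentials and file name):
#   write_client_properties client-sasl.properties USERNAME PASSWORD
#   /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server SERVER-IP:9092 \
#     --producer.config client-sasl.properties --topic test
```

The same three properties can be passed to a client library's configuration instead of a file.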