Sending and Receiving JSON messages in Kafka

Sometime back i wrote couple of articles for Java World about Kafka Big data messaging with Kafka, Part 1 and Big data messaging with Kafka, Part 2, you can find basic Producer and Consumer for Kafka along with some basic samples. I wanted to figure out how do i pass JSON message using Kafka. It looks like Kafak Connect provides a simple JSON Serializer org.apache.kafka.connect.json.JsonSerializer and Desrializer org.apache.kafka.connect.json.JsonDeserializer that uses Jackson JSON parser. I wanted to figure out how to use it, so i built following sample

  • First i did create a Contact Object, which is a simple Pojo that has 3 fields contactId, firstName and lastName. Take a look at main() method, in which i create simple object of Contact and then convert it to JSON and write to console.
  • Next i created, which reads values in CSV format like 1,Sunil,Patil from command line and parse it to Contact object first. Then i convert Contact object into JSONNode and pass it as value to Kafka, The JSONSerializer converts the JsonNode into byte[] The producer code is mostly same as one required for passing String, with difference that on line 35, i am creating object of com.fasterxml.jackson.databind.ObjectMapper and then on line 41 i am converting Contact object into JSONNode by calling objectMapper.valueToTree(contact)
  • Since i am using org.apache.kafka.connect.json.JsonSerializer on the producer i have to use org.apache.kafka.connect.json.JsonDeserializer on the Consumer, Then while creating KafkaConsumer object i declare that i will get String key and JSONNode as value. Then once i get messages from Kafka i am calling mapper.treeToValue(jsonNode,Contact.class) to read the message and convert it back to Contact object.

Now you can run the producer and consumer with same topic name and it should work