When working with Kafka, it is very convenient to verify that we are producing the message correctly if we are the producer. One way to do this is to validate/assert that the message has successfully landed in a topic.
In Java, you can create a producer which will consume a message of the topic “topic_name” and then validate that it has been successfully produced. You can see an example of how to do this in the following code.
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.test.utils.KafkaTestUtils;
private KafkaMessage getKafkaMessage() {
ConsumerRecord<String, KafkaMessage> record = null;
Consumer<String, KafkaMessage> consumer = getConsumer();
ConsumerRecords<String, KafkaMessage> records = KafkaTestUtils.getRecords(consumer, 2000, 1);
if (records.count() > 0) {
record = records.iterator().next();
}
consumer.close();
if (!isNull(record)) {
if (!isNull(record.value())) {
return record.value();
}
}
return null;
}
private Consumer<String, KafkaMessage> getConsumer() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
ConsumerFactory<String, KafkaMessage> cf = new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new ErrorHandlingDeserializer<>(new JsonDeserializer<>(KafkaMessage.class, false)));
Consumer<String, KafkaMessage> consumerServiceTest = cf.createConsumer();
consumerServiceTest.subscribe(Collections.singleton(kafkaPrefix + "topic_name"));
return consumerServiceTest;
}
And the KafkaMessage class is:
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class KafkaMessage{
@JsonProperty("id")
String id;
@JsonProperty("responseTesting")
ResponseTesting responseTesting;
}