When working with Kafka, it is very convenient to verify that we are producing the message correctly if we are the producer. One way to do this is to validate/assert that the message has successfully landed in a topic.
In Java, you can create a producer which will consume a message of the topic “topic_name” and then validate that it has been successfully produced. You can see an example of how to do this in the following code.
import org.springframework.kafka.core.ConsumerFactory ;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory ;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer ;
import org.springframework.kafka.support.serializer.JsonDeserializer ;
import org.springframework.kafka.test.utils.KafkaTestUtils ;
private KafkaMessage getKafkaMessage () {
ConsumerRecord < String , KafkaMessage > record = null ;
Consumer < String , KafkaMessage > consumer = getConsumer ();
ConsumerRecords < String , KafkaMessage > records = KafkaTestUtils . getRecords ( consumer , 2000 , 1 );
if ( records . count () > 0 ) {
record = records . iterator (). next ();
}
consumer . close ();
if (! isNull ( record )) {
if (! isNull ( record . value ())) {
return record . value ();
}
}
return null ;
}
private Consumer < String , KafkaMessage > getConsumer () {
Map < String , Object > props = new HashMap <>();
props . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , kafkaServer );
props . put ( ConsumerConfig . GROUP_ID_CONFIG , "group_id" );
props . put ( ConsumerConfig . AUTO_OFFSET_RESET_CONFIG , "earliest" );
props . put ( JsonDeserializer . TRUSTED_PACKAGES , "*" );
ConsumerFactory < String , KafkaMessage > cf = new DefaultKafkaConsumerFactory <>(
props ,
new StringDeserializer (),
new ErrorHandlingDeserializer <>( new JsonDeserializer <>( KafkaMessage . class , false )));
Consumer < String , KafkaMessage > consumerServiceTest = cf . createConsumer ();
consumerServiceTest . subscribe ( Collections . singleton ( kafkaPrefix + "topic_name" ));
return consumerServiceTest ;
}
And the KafkaMessage class is:
import com.fasterxml.jackson.annotation.JsonProperty ;
import lombok.Data ;
@Data
public class KafkaMessage {
@JsonProperty ( "id" )
String id ;
@JsonProperty ( "responseTesting" )
ResponseTesting responseTesting ;
}