Kafka integration testing (embedded Kafka) · November 16, 2021 java kafka

Simulation of a consumer for testing our producer.

/**
 * Integration test for {@code ProducerService} running against an embedded Kafka broker.
 *
 * <p>A test-local consumer subscribes to {@code TOPIC_OUT} and is used to check what the
 * producer under test actually published.
 */
@ExtendWith(SpringExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = {"TOPIC_OUT"})
public class ProducerServiceIntegrationTest {

    private static final String TOPIC_OUT = "TOPIC_OUT";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ProducerService producerService;

    /**
     * Builds an {@code ExampleDTO} fixture.
     *
     * @param name        value stored through {@code setName}
     * @param description value stored through {@code setDescription}
     * @return a new DTO carrying the given name and description
     */
    public ExampleDTO mockExampleDTO(String name, String description) {
        ExampleDTO exampleDTO = new ExampleDTO();
        exampleDTO.setDescription(description);
        exampleDTO.setName(name);
        return exampleDTO;
    }

    /**
     * We verify the output in the topic, with a simulated consumer.
     */
    @Test
    public void itShould_ProduceCorrectExampleDTO_to_TOPIC_OUT() {
        // GIVEN
        ExampleDTO exampleDTO = mockExampleDTO("name", "description");

        // simulated consumer
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("group_consumer_test", "false", embeddedKafkaBroker);
        // Read from the beginning of the topic so the record is found regardless of when
        // the consumer's partition assignment completes.
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources: the consumer is closed even when an assertion below fails.
        try (Consumer<String, ExampleDTO> consumerServiceTest = new KafkaConsumer<>(
                consumerProps,
                new StringDeserializer(),
                new JsonDeserializer<>(ExampleDTO.class, false))) {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumerServiceTest, TOPIC_OUT);

            // WHEN
            producerService.send(exampleDTO);

            // THEN
            ConsumerRecord<String, ExampleDTO> consumerRecordOfExampleDTO =
                    KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_OUT);
            ExampleDTO valueReceived = consumerRecordOfExampleDTO.value();

            assertEquals("description", valueReceived.getDescription());
            assertEquals("name", valueReceived.getName());
        }
    }
}

Simulation of a producer for testing our consumer.
@ExtendWith(SpringExtension.class) @SpringBootTest @DirtiesContext @EmbeddedKafka(topics = {"TOPIC_IN"}) public class ConsumerServiceIntegrationTest { Logger log = LoggerFactory.getLogger(ConsumerServiceIntegrationTest.class); private static final String TOPIC_IN = "TOPIC_IN"; @Autowired private EmbeddedKafkaBroker embeddedKafkaBroker; @Autowired private ExampleRepository exampleRepository; public ExampleDTO mockExampleDTO(String name, String description) { ExampleDTO exampleDTO = new ExampleDTO(); exampleDTO.setDescription(description); exampleDTO.setName(name); return exampleDTO; } /** * We verify the output in the topic. But aslo in the database. */ @Test public void itShould_ConsumeCorrectExampleDTO_from_TOPIC_IN_and_should_saveCorrectExampleEntity() throws ExecutionException, InterruptedException { // GIVEN ExampleDTO exampleDTO = mockExampleDTO("Un nom 2", "Une description 2"); // simulation producer Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString()); log.info("props {}", producerProps); Producer<String, ExampleDTO> producerTest = new KafkaProducer(producerProps, new StringSerializer(), new JsonSerializer<ExampleDTO>()); // WHEN producerTest.send(new ProducerRecord(TOPIC_IN, "", exampleDTO)); // THEN // we must have 1 entity inserted // We cannot predict when the insertion into the database will occur. So we wait until the value is present. Thank to Awaitility. await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> { var exampleEntityList = exampleRepository.findAll(); assertEquals(1, exampleEntityList.size()); ExampleEntity firstEntity = exampleEntityList.get(0); assertEquals(exampleDTO.getDescription(), firstEntity.getDescription()); assertEquals(exampleDTO.getName(), firstEntity.getName()); }); producerTest.close(); } } All the infomation comes from this blog. And all the code is inside this github repository. Share: Twitter, Facebook