- Simulation of a consumer for testing our producer.
/**
 * Integration test for {@link ProducerService} running against an embedded Kafka broker.
 *
 * <p>A test-side consumer is attached to {@code TOPIC_OUT} and used to check the record
 * that the service publishes.
 */
@ExtendWith(SpringExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "TOPIC_OUT"})
public class ProducerServiceIntegrationTest {

    private static final String TOPIC_OUT = "TOPIC_OUT";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ProducerService producerService;

    /**
     * Builds an {@link ExampleDTO} fixture with the given field values.
     *
     * @param name        value stored through {@code setName}
     * @param description value stored through {@code setDescription}
     * @return a new DTO carrying both values
     */
    public ExampleDTO mockExampleDTO(String name, String description) {
        ExampleDTO exampleDTO = new ExampleDTO();
        exampleDTO.setDescription(description);
        exampleDTO.setName(name);
        return exampleDTO;
    }

    /**
     * Verifies the output written to the topic, using a simulated consumer.
     */
    @Test
    public void itShould_ProduceCorrectExampleDTO_to_TOPIC_OUT() {
        // GIVEN
        ExampleDTO exampleDTO = mockExampleDTO("name", "description");

        // Simulated consumer: read from the earliest offset so the record is seen
        // regardless of when the subscription becomes active.
        Map<String, Object> consumerProps =
                KafkaTestUtils.consumerProps("group_consumer_test", "false", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources guarantees the consumer is closed even when an assertion
        // fails or no record arrives, so a failing test does not leak the consumer.
        try (Consumer<String, ExampleDTO> consumerServiceTest = new KafkaConsumer<String, ExampleDTO>(
                consumerProps,
                new StringDeserializer(),
                // NOTE(review): the 'false' flag presumably disables type-header lookup; confirm.
                new JsonDeserializer<>(ExampleDTO.class, false))) {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumerServiceTest, TOPIC_OUT);

            // WHEN
            producerService.send(exampleDTO);

            // THEN
            ConsumerRecord<String, ExampleDTO> consumerRecordOfExampleDTO =
                    KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_OUT);
            ExampleDTO valueReceived = consumerRecordOfExampleDTO.value();
            assertEquals("description", valueReceived.getDescription());
            assertEquals("name", valueReceived.getName());
        }
    }
}
- Simulation of a producer for testing our consumer.
/**
 * Integration test for the application's Kafka consumer, running against an embedded broker.
 *
 * <p>A test-side producer publishes to {@code TOPIC_IN}; the test then waits for the
 * corresponding entity to appear in {@link ExampleRepository}.
 */
@ExtendWith(SpringExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = {"TOPIC_IN"})
public class ConsumerServiceIntegrationTest {

    private static final Logger log = LoggerFactory.getLogger(ConsumerServiceIntegrationTest.class);

    private static final String TOPIC_IN = "TOPIC_IN";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ExampleRepository exampleRepository;

    /**
     * Builds an {@link ExampleDTO} fixture with the given field values.
     *
     * @param name        value stored through {@code setName}
     * @param description value stored through {@code setDescription}
     * @return a new DTO carrying both values
     */
    public ExampleDTO mockExampleDTO(String name, String description) {
        ExampleDTO exampleDTO = new ExampleDTO();
        exampleDTO.setDescription(description);
        exampleDTO.setName(name);
        return exampleDTO;
    }

    /**
     * Publishes a DTO to the input topic and verifies that a matching entity
     * ends up in the database.
     *
     * @throws ExecutionException   if the broker rejects the record
     * @throws InterruptedException if the thread is interrupted while waiting for the send to complete
     */
    @Test
    public void itShould_ConsumeCorrectExampleDTO_from_TOPIC_IN_and_should_saveCorrectExampleEntity() throws ExecutionException, InterruptedException {
        // GIVEN
        ExampleDTO exampleDTO = mockExampleDTO("Un nom 2", "Une description 2");

        // Simulated producer
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
        log.info("props {}", producerProps);

        // try-with-resources closes the producer even when the assertions below fail.
        try (Producer<String, ExampleDTO> producerTest = new KafkaProducer<>(
                producerProps, new StringSerializer(), new JsonSerializer<ExampleDTO>())) {

            // WHEN
            // Block on the returned Future so a failed send is reported here,
            // rather than showing up later as an unexplained Awaitility timeout.
            producerTest.send(new ProducerRecord<>(TOPIC_IN, "", exampleDTO)).get();

            // THEN
            // Exactly one entity must have been inserted.
            // The moment of the database insert cannot be predicted, so poll with
            // Awaitility until the assertions hold (or ten seconds elapse).
            await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> {
                var exampleEntityList = exampleRepository.findAll();
                assertEquals(1, exampleEntityList.size());
                ExampleEntity firstEntity = exampleEntityList.get(0);
                assertEquals(exampleDTO.getDescription(), firstEntity.getDescription());
                assertEquals(exampleDTO.getName(), firstEntity.getName());
            });
        }
    }
}
All the information comes from this blog.
And all the code is inside this github repository.