Fixing a random test failure on CI

This commit is contained in:
Soby Chacko
2020-05-22 16:11:18 -04:00
parent 0ac83e510f
commit b6c94c96e5

View File

@@ -66,14 +66,15 @@ import static org.mockito.Mockito.verify;
@RunWith(SpringRunner.class)
@ContextConfiguration
@DirtiesContext
@Ignore
public abstract class DeserializationErrorHandlerByKafkaTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
"DeserializationErrorHandlerByKafkaTests-In",
"abc-DeserializationErrorHandlerByKafkaTests-In",
"xyz-DeserializationErrorHandlerByKafkaTests-In",
"DeserializationErrorHandlerByKafkaTests-out",
"error.DeserializationErrorHandlerByKafkaTests-In.group",
"error.abc-DeserializationErrorHandlerByKafkaTests-In.group",
"error.xyz-DeserializationErrorHandlerByKafkaTests-In.group",
"error.word1.groupx",
"error.word2.groupx");
@@ -99,7 +100,7 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "DeserializationErrorHandlerByKafkaTests-out");
embeddedKafka.consumeFromEmbeddedTopics(consumer, "DeserializationErrorHandlerByKafkaTests-out", "DeserializationErrorHandlerByKafkaTests-out");
}
@AfterClass
@@ -111,6 +112,8 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
}
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.destination=abc-DeserializationErrorHandlerByKafkaTests-In",
"spring.cloud.stream.bindings.output.destination=DeserializationErrorHandlerByKafkaTests-Out",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=deser-kafka-dlq",
"spring.cloud.stream.bindings.input.group=group",
"spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq",
@@ -125,7 +128,7 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("DeserializationErrorHandlerByKafkaTests-In");
template.setDefaultTopic("abc-DeserializationErrorHandlerByKafkaTests-In");
template.sendDefault(1, null, "foobar");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("foobar",
@@ -134,10 +137,10 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
Consumer<String, String> consumer1 = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer1, "error.DeserializationErrorHandlerByKafkaTests-In.group");
embeddedKafka.consumeFromAnEmbeddedTopic(consumer1, "error.abc-DeserializationErrorHandlerByKafkaTests-In.group");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1,
"error.DeserializationErrorHandlerByKafkaTests-In.group");
"error.abc-DeserializationErrorHandlerByKafkaTests-In.group");
assertThat(cr.value()).isEqualTo("foobar");
assertThat(cr.partition()).isEqualTo(0); // custom partition function
@@ -150,6 +153,8 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
}
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.destination=xyz-DeserializationErrorHandlerByKafkaTests-In",
"spring.cloud.stream.bindings.output.destination=DeserializationErrorHandlerByKafkaTests-Out",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=deser-kafka-dlq",
"spring.cloud.stream.bindings.input.group=group",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.deserializationExceptionHandler=sendToDlq",
@@ -164,7 +169,7 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("DeserializationErrorHandlerByKafkaTests-In");
template.setDefaultTopic("xyz-DeserializationErrorHandlerByKafkaTests-In");
template.sendDefault(1, null, "foobar");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("foobar",
@@ -173,10 +178,10 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
Consumer<String, String> consumer1 = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer1, "error.DeserializationErrorHandlerByKafkaTests-In.group");
embeddedKafka.consumeFromAnEmbeddedTopic(consumer1, "error.xyz-DeserializationErrorHandlerByKafkaTests-In.group");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1,
"error.DeserializationErrorHandlerByKafkaTests-In.group");
"error.xyz-DeserializationErrorHandlerByKafkaTests-In.group");
assertThat(cr.value()).isEqualTo("foobar");
assertThat(cr.partition()).isEqualTo(0); // custom partition function