Polished KafkaBinderTests

polished KafkaBinderTests to account for changes in core related to partitioning - 00748985d6

Resolves #272
This commit is contained in:
Oleg Zhurakousky
2018-01-17 15:52:06 -05:00
parent 39dd68a8ad
commit bc562e3a77

View File

@@ -84,6 +84,7 @@ import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
@@ -580,7 +581,6 @@ public class KafkaBinderTests extends
AbstractKafkaTestBinder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(2);
producerProperties.getExtension().setHeaderPatterns(new String[]{MessageHeaders.CONTENT_TYPE});
DirectChannel moduleOutputChannel = createBindableChannel("output",
@@ -660,7 +660,6 @@ public class KafkaBinderTests extends
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
BindingProperties producerBindingProperties = createProducerBindingProperties(
producerProperties);
@@ -719,7 +718,6 @@ public class KafkaBinderTests extends
public void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
BindingProperties producerBindingProperties = createProducerBindingProperties(
producerProperties);
@@ -788,7 +786,6 @@ public class KafkaBinderTests extends
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(3);
consumerProperties.setBackOffInitialInterval(100);
@@ -1614,6 +1611,7 @@ public class KafkaBinderTests extends
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
producerProperties.setPartitionKeyExpression(new LiteralExpression("foo"));
DirectChannel moduleOutputChannel = createBindableChannel("output",
createProducerBindingProperties(producerProperties));