Upgrade dependencies

Spring Kafka -> 2.6.3-SNAPSHOT
Spring Integration Kafka -> 5.4.0-SNAPSHOT
Kafka version -> 2.6.0
Use Kafka_2.13 for tests

Ungignore the Jaas security tests.
Unignore a few Kafka Streams binder tests.
This commit is contained in:
Soby Chacko
2020-10-16 16:44:32 -04:00
parent 50ec8f0919
commit 514530db22
10 changed files with 69 additions and 104 deletions

31
pom.xml
View File

@@ -7,14 +7,14 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>3.0.0-M4</version>
<version>3.0.0-SNAPSHOT</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.5.3.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.3.0.RELEASE</spring-integration-kafka.version>
<kafka.version>2.5.0</kafka.version>
<spring-kafka.version>2.6.3-SNAPSHOT</spring-kafka.version>
<spring-integration-kafka.version>5.4.0-SNAPSHOT</spring-integration-kafka.version>
<kafka.version>2.6.0</kafka.version>
<spring-cloud-schema-registry.version>1.1.0-SNAPSHOT</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.1.0-SNAPSHOT</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
@@ -51,13 +51,7 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
@@ -93,7 +87,13 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<classifier>test</classifier>
<scope>test</scope>
<version>${kafka.version}</version>
@@ -112,6 +112,13 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-schema-registry-client</artifactId>

View File

@@ -64,19 +64,14 @@
<artifactId>spring-boot-autoconfigure-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Following dependencies are needed to support Kafka 1.1.0 client-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
<artifactId>kafka_2.13</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<artifactId>kafka_2.13</artifactId>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<!-- Following dependencies are only provided for testing and won't be packaged with the binder apps-->
<dependency>

View File

@@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
@@ -55,14 +54,13 @@ public class KafkaStreamsFunctionStateStoreTests {
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
@Test
@Ignore
public void testKafkaStreamsFuncionWithMultipleStateStores() throws Exception {
SpringApplication app = new SpringApplication(StateStoreTestApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=process",
"--spring.cloud.stream.function.definition=process;hello",
"--spring.cloud.stream.bindings.process-in-0.destination=words",
"--spring.cloud.stream.bindings.hello-in-0.destination=words",
"--spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=testKafkaStreamsFuncionWithMultipleStateStores-123",

View File

@@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
@@ -50,7 +49,6 @@ import org.springframework.util.Assert;
import static org.assertj.core.api.Assertions.assertThat;
@Ignore
public class MultipleFunctionsInSameAppTests {
@ClassRule

View File

@@ -36,7 +36,6 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
@@ -57,7 +56,6 @@ import org.springframework.kafka.test.utils.KafkaTestUtils;
import static org.assertj.core.api.Assertions.assertThat;
@Ignore
public class StreamToGlobalKTableFunctionTests {
@ClassRule

View File

@@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
@@ -98,7 +97,6 @@ public class PerRecordAvroContentTypeTests {
}
@Test
@Ignore
public void testPerRecordAvroConentTypeAndVerifySerialization() throws Exception {
SpringApplication app = new SpringApplication(SensorCountAvroApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);

View File

@@ -25,7 +25,6 @@ import java.util.UUID;
import com.example.Sensor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.cloud.schema.registry.avro.AvroSchemaMessageConverter;
@@ -44,7 +43,6 @@ public class MessageConverterDelegateSerdeTest {
@Test
@SuppressWarnings("unchecked")
@Ignore
public void testCompositeNonNativeSerdeUsingAvroContentType() {
Random random = new Random();
Sensor sensor = new Sensor();

View File

@@ -64,21 +64,16 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
<artifactId>kafka_2.13</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<artifactId>kafka_2.13</artifactId>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
</dependencies>

View File

@@ -1464,7 +1464,7 @@ public class KafkaMessageChannelBinder extends
super.onInit();
}
catch (Exception ex) {
this.logger.error("Initialization errors: ", ex);
this.logger.error(ex, "Initialization errors: ");
throw new RuntimeException(ex);
}
}

View File

@@ -22,18 +22,15 @@ import com.sun.security.auth.login.ConfigFile;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesBindException;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* @author Marius Bogoevici
@@ -46,6 +43,10 @@ public class KafkaBinderJaasInitializerListenerTest {
@ClassRule
public static EmbeddedKafkaRule kafkaEmbedded = new EmbeddedKafkaRule(1, true);
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withUserConfiguration(KafkaBinderConfiguration.class, KafkaAutoConfiguration.class);
@BeforeClass
public static void setup() {
System.setProperty(KAFKA_BROKERS_PROPERTY,
@@ -58,7 +59,6 @@ public class KafkaBinderJaasInitializerListenerTest {
}
@Test
@Ignore("CI randomly fails this test, need to investigate further. ")
public void testConfigurationParsedCorrectlyWithKafkaClientAndDefaultControlFlag()
throws Exception {
ConfigFile configFile = new ConfigFile(
@@ -66,28 +66,28 @@ public class KafkaBinderJaasInitializerListenerTest {
final AppConfigurationEntry[] kafkaConfigurationArray = configFile
.getAppConfigurationEntry("KafkaClient");
final ConfigurableApplicationContext context = SpringApplication.run(
SimpleApplication.class,
"--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true",
"--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true",
"--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab",
"--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM",
"--spring.jmx.enabled=false");
javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration
.getConfiguration();
final AppConfigurationEntry[] kafkaConfiguration = configuration
.getAppConfigurationEntry("KafkaClient");
assertThat(kafkaConfiguration).hasSize(1);
assertThat(kafkaConfiguration[0].getOptions())
.isEqualTo(kafkaConfigurationArray[0].getOptions());
assertThat(kafkaConfiguration[0].getControlFlag())
.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED);
context.close();
this.contextRunner
.withPropertyValues("spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true",
"spring.cloud.stream.kafka.binder.jaas.options.storeKey=true",
"spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab",
"spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM",
"spring.jmx.enabled=false")
.run(context -> {
javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration
.getConfiguration();
final AppConfigurationEntry[] kafkaConfiguration = configuration
.getAppConfigurationEntry("KafkaClient");
assertThat(kafkaConfiguration).hasSize(1);
assertThat(kafkaConfiguration[0].getOptions())
.isEqualTo(kafkaConfigurationArray[0].getOptions());
assertThat(kafkaConfiguration[0].getControlFlag())
.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED);
});
}
@Test
@Ignore("CI randomly fails this test, need to investigate further. ")
public void testConfigurationParsedCorrectlyWithKafkaClientAndNonDefaultControlFlag()
throws Exception {
ConfigFile configFile = new ConfigFile(
@@ -95,47 +95,25 @@ public class KafkaBinderJaasInitializerListenerTest {
final AppConfigurationEntry[] kafkaConfigurationArray = configFile
.getAppConfigurationEntry("KafkaClient");
final ConfigurableApplicationContext context = SpringApplication.run(
SimpleApplication.class,
"--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true",
"--spring.cloud.stream.kafka.binder.jaas.controlFlag=requisite",
"--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true",
"--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab",
"--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM",
"--spring.jmx.enabled=false");
javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration
.getConfiguration();
this.contextRunner
.withPropertyValues("spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true",
"spring.cloud.stream.kafka.binder.jaas.controlFlag=requisite",
"spring.cloud.stream.kafka.binder.jaas.options.storeKey=true",
"spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab",
"spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM",
"spring.jmx.enabled=false")
.run(context -> {
javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration
.getConfiguration();
final AppConfigurationEntry[] kafkaConfiguration = configuration
.getAppConfigurationEntry("KafkaClient");
assertThat(kafkaConfiguration).hasSize(1);
assertThat(kafkaConfiguration[0].getOptions())
.isEqualTo(kafkaConfigurationArray[0].getOptions());
assertThat(kafkaConfiguration[0].getControlFlag())
.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
context.close();
final AppConfigurationEntry[] kafkaConfiguration = configuration
.getAppConfigurationEntry("KafkaClient");
assertThat(kafkaConfiguration).hasSize(1);
assertThat(kafkaConfiguration[0].getOptions())
.isEqualTo(kafkaConfigurationArray[0].getOptions());
assertThat(kafkaConfiguration[0].getControlFlag())
.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
context.close();
});
}
@Test
public void testConfigurationWithUnknownControlFlag() throws Exception {
ConfigFile configFile = new ConfigFile(
new ClassPathResource("jaas-sample-kafka-only.conf").getURI());
assertThatThrownBy(() -> SpringApplication.run(SimpleApplication.class,
"--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true",
"--spring.cloud.stream.kafka.binder.jaas.controlFlag=unknown",
"--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true",
"--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab",
"--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM",
"--spring.jmx.enabled=false"))
.isInstanceOf(ConfigurationPropertiesBindException.class)
.hasMessageContaining(
"Error creating bean with name 'configurationProperties'");
}
@SpringBootApplication
public static class SimpleApplication {
}
}