diff --git a/pom.xml b/pom.xml index a27db9b8..ccd8e153 100644 --- a/pom.xml +++ b/pom.xml @@ -7,14 +7,14 @@ org.springframework.cloud spring-cloud-build - 3.0.0-M4 + 3.0.0-SNAPSHOT 1.8 - 2.5.3.RELEASE - 3.3.0.RELEASE - 2.5.0 + 2.6.3-SNAPSHOT + 5.4.0-SNAPSHOT + 2.6.0 1.1.0-SNAPSHOT 3.1.0-SNAPSHOT true @@ -51,13 +51,7 @@ kafka-clients ${kafka.version} - - org.apache.kafka - kafka-clients - ${kafka.version} - test - test - + org.springframework.kafka spring-kafka @@ -93,7 +87,13 @@ org.apache.kafka - kafka_2.11 + kafka_2.13 + ${kafka.version} + test + + + org.apache.kafka + kafka_2.13 test test ${kafka.version} @@ -112,6 +112,13 @@ + + org.apache.kafka + kafka-clients + ${kafka.version} + test + test + org.springframework.cloud spring-cloud-schema-registry-client diff --git a/spring-cloud-stream-binder-kafka-streams/pom.xml b/spring-cloud-stream-binder-kafka-streams/pom.xml index 3dcf71af..1b4c321c 100644 --- a/spring-cloud-stream-binder-kafka-streams/pom.xml +++ b/spring-cloud-stream-binder-kafka-streams/pom.xml @@ -64,19 +64,14 @@ spring-boot-autoconfigure-processor true - org.apache.kafka - kafka_2.12 - ${kafka.version} - test + kafka_2.13 org.apache.kafka - kafka_2.12 - ${kafka.version} + kafka_2.13 test - test diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionStateStoreTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionStateStoreTests.java index 12c9e619..1c703149 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionStateStoreTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsFunctionStateStoreTests.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.springframework.boot.SpringApplication; @@ -55,14 +54,13 @@ public class KafkaStreamsFunctionStateStoreTests { private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka(); @Test - @Ignore public void testKafkaStreamsFuncionWithMultipleStateStores() throws Exception { SpringApplication app = new SpringApplication(StateStoreTestApplication.class); app.setWebApplicationType(WebApplicationType.NONE); try (ConfigurableApplicationContext context = app.run("--server.port=0", "--spring.jmx.enabled=false", - "--spring.cloud.stream.function.definition=process", + "--spring.cloud.stream.function.definition=process;hello", "--spring.cloud.stream.bindings.process-in-0.destination=words", "--spring.cloud.stream.bindings.hello-in-0.destination=words", "--spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=testKafkaStreamsFuncionWithMultipleStateStores-123", diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/MultipleFunctionsInSameAppTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/MultipleFunctionsInSameAppTests.java index 484cc457..c6c2aa7f 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/MultipleFunctionsInSameAppTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/MultipleFunctionsInSameAppTests.java @@ -31,7 +31,6 @@ import org.apache.kafka.streams.kstream.KStream; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.springframework.boot.SpringApplication; @@ -50,7 +49,6 @@ import org.springframework.util.Assert; import static org.assertj.core.api.Assertions.assertThat; -@Ignore public class MultipleFunctionsInSameAppTests { @ClassRule diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/StreamToGlobalKTableFunctionTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/StreamToGlobalKTableFunctionTests.java index 60c11c01..84648e36 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/StreamToGlobalKTableFunctionTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/StreamToGlobalKTableFunctionTests.java @@ -36,7 +36,6 @@ import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.WallclockTimestampExtractor; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.springframework.boot.SpringApplication; @@ -57,7 +56,6 @@ import org.springframework.kafka.test.utils.KafkaTestUtils; import static org.assertj.core.api.Assertions.assertThat; -@Ignore public class StreamToGlobalKTableFunctionTests { @ClassRule diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/PerRecordAvroContentTypeTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/PerRecordAvroContentTypeTests.java index f87eaf1d..00c745a3 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/PerRecordAvroContentTypeTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/PerRecordAvroContentTypeTests.java @@ -32,7 +32,6 @@ import org.apache.kafka.streams.kstream.KStream; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.springframework.boot.SpringApplication; @@ -98,7 +97,6 @@ public class PerRecordAvroContentTypeTests { } @Test - @Ignore public void testPerRecordAvroConentTypeAndVerifySerialization() throws Exception { SpringApplication app = new SpringApplication(SensorCountAvroApplication.class); app.setWebApplicationType(WebApplicationType.NONE); diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/serde/MessageConverterDelegateSerdeTest.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/serde/MessageConverterDelegateSerdeTest.java index 4bface43..82c68ff0 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/serde/MessageConverterDelegateSerdeTest.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/serde/MessageConverterDelegateSerdeTest.java @@ -25,7 +25,6 @@ import java.util.UUID; import com.example.Sensor; import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.Ignore; import org.junit.Test; import org.springframework.cloud.schema.registry.avro.AvroSchemaMessageConverter; @@ -44,7 +43,6 @@ public class MessageConverterDelegateSerdeTest { @Test @SuppressWarnings("unchecked") - @Ignore public void testCompositeNonNativeSerdeUsingAvroContentType() { Random random = new Random(); Sensor sensor = new Sensor(); diff --git a/spring-cloud-stream-binder-kafka/pom.xml b/spring-cloud-stream-binder-kafka/pom.xml index d03f7745..67292441 100644 --- a/spring-cloud-stream-binder-kafka/pom.xml +++ b/spring-cloud-stream-binder-kafka/pom.xml @@ -64,21 +64,16 @@ org.apache.kafka kafka-clients - ${kafka.version} test org.apache.kafka - kafka_2.12 - ${kafka.version} - test + kafka_2.13 org.apache.kafka - kafka_2.12 - ${kafka.version} + kafka_2.13 test - test diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index 4ee82da7..db4f40ba 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -1464,7 +1464,7 @@ public class KafkaMessageChannelBinder extends super.onInit(); } catch (Exception ex) { - this.logger.error("Initialization errors: ", ex); + this.logger.error(ex, "Initialization errors: "); throw new RuntimeException(ex); } } diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListenerTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListenerTest.java index fdcb21df..5d1e4064 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListenerTest.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListenerTest.java @@ -22,18 +22,15 @@ import com.sun.security.auth.login.ConfigFile; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.ConfigurationPropertiesBindException; -import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration; import org.springframework.core.io.ClassPathResource; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * @author Marius Bogoevici @@ -46,6 +43,10 @@ public class KafkaBinderJaasInitializerListenerTest { @ClassRule public static EmbeddedKafkaRule kafkaEmbedded = new EmbeddedKafkaRule(1, true); + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withUserConfiguration(KafkaBinderConfiguration.class, KafkaAutoConfiguration.class); + @BeforeClass public static void setup() { System.setProperty(KAFKA_BROKERS_PROPERTY, @@ -58,7 +59,6 @@ public class KafkaBinderJaasInitializerListenerTest { } @Test - @Ignore("CI randomly fails this test, need to investigate further. ") public void testConfigurationParsedCorrectlyWithKafkaClientAndDefaultControlFlag() throws Exception { ConfigFile configFile = new ConfigFile( @@ -66,28 +66,28 @@ public class KafkaBinderJaasInitializerListenerTest { final AppConfigurationEntry[] kafkaConfigurationArray = configFile .getAppConfigurationEntry("KafkaClient"); - final ConfigurableApplicationContext context = SpringApplication.run( - SimpleApplication.class, - "--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true", - "--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true", - "--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab", - "--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM", - "--spring.jmx.enabled=false"); - javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration - .getConfiguration(); - final AppConfigurationEntry[] kafkaConfiguration = configuration - .getAppConfigurationEntry("KafkaClient"); - assertThat(kafkaConfiguration).hasSize(1); - assertThat(kafkaConfiguration[0].getOptions()) - .isEqualTo(kafkaConfigurationArray[0].getOptions()); - assertThat(kafkaConfiguration[0].getControlFlag()) - .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED); - context.close(); + this.contextRunner + .withPropertyValues("spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true", + "spring.cloud.stream.kafka.binder.jaas.options.storeKey=true", + "spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab", + "spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM", + "spring.jmx.enabled=false") + .run(context -> { + javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration + .getConfiguration(); + + final AppConfigurationEntry[] kafkaConfiguration = configuration + .getAppConfigurationEntry("KafkaClient"); + assertThat(kafkaConfiguration).hasSize(1); + assertThat(kafkaConfiguration[0].getOptions()) + .isEqualTo(kafkaConfigurationArray[0].getOptions()); + assertThat(kafkaConfiguration[0].getControlFlag()) + .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED); + }); } @Test - @Ignore("CI randomly fails this test, need to investigate further. ") public void testConfigurationParsedCorrectlyWithKafkaClientAndNonDefaultControlFlag() throws Exception { ConfigFile configFile = new ConfigFile( @@ -95,47 +95,25 @@ public class KafkaBinderJaasInitializerListenerTest { final AppConfigurationEntry[] kafkaConfigurationArray = configFile .getAppConfigurationEntry("KafkaClient"); - final ConfigurableApplicationContext context = SpringApplication.run( - SimpleApplication.class, - "--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true", - "--spring.cloud.stream.kafka.binder.jaas.controlFlag=requisite", - "--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true", - "--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab", - "--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM", - "--spring.jmx.enabled=false"); - javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration - .getConfiguration(); + this.contextRunner + .withPropertyValues("spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true", + "spring.cloud.stream.kafka.binder.jaas.controlFlag=requisite", + "spring.cloud.stream.kafka.binder.jaas.options.storeKey=true", + "spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab", + "spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM", + "spring.jmx.enabled=false") + .run(context -> { + javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration + .getConfiguration(); - final AppConfigurationEntry[] kafkaConfiguration = configuration - .getAppConfigurationEntry("KafkaClient"); - assertThat(kafkaConfiguration).hasSize(1); - assertThat(kafkaConfiguration[0].getOptions()) - .isEqualTo(kafkaConfigurationArray[0].getOptions()); - assertThat(kafkaConfiguration[0].getControlFlag()) - .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); - context.close(); + final AppConfigurationEntry[] kafkaConfiguration = configuration + .getAppConfigurationEntry("KafkaClient"); + assertThat(kafkaConfiguration).hasSize(1); + assertThat(kafkaConfiguration[0].getOptions()) + .isEqualTo(kafkaConfigurationArray[0].getOptions()); + assertThat(kafkaConfiguration[0].getControlFlag()) + .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); + context.close(); + }); } - - @Test - public void testConfigurationWithUnknownControlFlag() throws Exception { - ConfigFile configFile = new ConfigFile( - new ClassPathResource("jaas-sample-kafka-only.conf").getURI()); - - assertThatThrownBy(() -> SpringApplication.run(SimpleApplication.class, - "--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true", - "--spring.cloud.stream.kafka.binder.jaas.controlFlag=unknown", - "--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true", - "--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab", - "--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM", - "--spring.jmx.enabled=false")) - .isInstanceOf(ConfigurationPropertiesBindException.class) - .hasMessageContaining( - "Error creating bean with name 'configurationProperties'"); - } - - @SpringBootApplication - public static class SimpleApplication { - - } - }