diff --git a/docs/src/main/asciidoc/overview.adoc b/docs/src/main/asciidoc/overview.adoc index 6c1ff011..ad33bb54 100644 --- a/docs/src/main/asciidoc/overview.adoc +++ b/docs/src/main/asciidoc/overview.adoc @@ -149,6 +149,14 @@ Flag to set the binder health as `down`, when any partitions on the topic, regar + Default: `false`. +spring.cloud.stream.kafka.binder.certificateStoreDirectory:: +When the truststore or keystore certificate location is given as a classpath URL (`classpath:...`), the binder copies the resource from the classpath location inside the JAR file to a location on the filesystem. +The file will be moved to the location specified as the value for this property which must be an existing directory on the filesystem that is writable by the process running the application. +If this value is not set and the certificate file is a classpath resource, then it will be moved to System's temp directory as returned by `System.getProperty("java.io.tmpdir")`. +This is also true, if this value is present, but the directory cannot be found on the filesystem or is not writable. ++ +Default: none. + [[kafka-consumer-properties]] ==== Kafka Consumer Properties diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java index 0dab3dee..3b0cb124 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java @@ -16,6 +16,13 @@ package org.springframework.cloud.stream.binder.kafka.properties; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -35,6 +42,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.stream.binder.HeaderMode; import org.springframework.cloud.stream.binder.ProducerProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties.CompressionType; +import org.springframework.core.io.DefaultResourceLoader; +import org.springframework.core.io.Resource; import org.springframework.expression.Expression; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -118,6 +127,14 @@ public class KafkaBinderConfigurationProperties { */ private Duration authorizationExceptionRetryInterval; + /** + * When a certificate store location is given as classpath URL (classpath:), then the binder + * moves the resource from the classpath location inside the JAR to a location on + * the filesystem. If this value is set, then this location is used, otherwise, the + * certificate file is copied to the directory returned by java.io.tmpdir. + */ + private String certificateStoreDirectory; + public KafkaBinderConfigurationProperties(KafkaProperties kafkaProperties) { Assert.notNull(kafkaProperties, "'kafkaProperties' cannot be null"); this.kafkaProperties = kafkaProperties; @@ -132,9 +149,62 @@ public class KafkaBinderConfigurationProperties { } public String getKafkaConnectionString() { + // We need to do a check on certificate file locations to see if they are given as classpath resources. + // If that is the case, then we will move them to a file system location and use those as the certificate locations. + // This is due to a limitation in Kafka itself in which it doesn't allow reading certificate resources from the classpath. + // See this: https://issues.apache.org/jira/browse/KAFKA-7685 + // and this: https://cwiki.apache.org/confluence/display/KAFKA/KIP-398%3A+Support+reading+trust+store+from+classpath + moveCertsToFileSystemIfNecessary(); + return toConnectionString(this.brokers, this.defaultBrokerPort); } + private void moveCertsToFileSystemIfNecessary() { + try { + final String trustStoreLocation = this.configuration.get("ssl.truststore.location"); + if (trustStoreLocation != null && trustStoreLocation.startsWith("classpath:")) { + final String fileSystemLocation = moveCertToFileSystem(trustStoreLocation, this.certificateStoreDirectory); + // Overriding the value with absolute filesystem path. + this.configuration.put("ssl.truststore.location", fileSystemLocation); + } + final String keyStoreLocation = this.configuration.get("ssl.keystore.location"); + if (keyStoreLocation != null && keyStoreLocation.startsWith("classpath:")) { + final String fileSystemLocation = moveCertToFileSystem(keyStoreLocation, this.certificateStoreDirectory); + // Overriding the value with absolute filesystem path. + this.configuration.put("ssl.keystore.location", fileSystemLocation); + } + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private String moveCertToFileSystem(String classpathLocation, String fileSystemLocation) throws IOException { + File targetFile; + final String tempDir = System.getProperty("java.io.tmpdir"); + Resource resource = new DefaultResourceLoader().getResource(classpathLocation); + if (StringUtils.hasText(fileSystemLocation)) { + final Path path = Paths.get(fileSystemLocation); + if (!Files.exists(path) || !Files.isDirectory(path) || !Files.isWritable(path)) { + logger.warn("The filesystem location to move the cert files (" + fileSystemLocation + ") " + + "is not found or a directory that is writable. The system temp folder (java.io.tmpdir) will be used instead."); + targetFile = new File(Paths.get(tempDir, resource.getFilename()).toString()); + } + else { + // the given location is verified to be a writable directory. + targetFile = new File(Paths.get(fileSystemLocation, resource.getFilename()).toString()); + } + } + else { + targetFile = new File(Paths.get(tempDir, resource.getFilename()).toString()); + } + + try (InputStream inputStream = resource.getInputStream()) { + Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + return targetFile.getAbsolutePath(); + } + public String getDefaultKafkaConnectionString() { return DEFAULT_KAFKA_CONNECTION_STRING; } @@ -363,6 +433,14 @@ public class KafkaBinderConfigurationProperties { this.considerDownWhenAnyPartitionHasNoLeader = considerDownWhenAnyPartitionHasNoLeader; } + public String getCertificateStoreDirectory() { + return this.certificateStoreDirectory; + } + + public void setCertificateStoreDirectory(String certificateStoreDirectory) { + this.certificateStoreDirectory = certificateStoreDirectory; + } + /** * Domain class that models transaction capabilities in Kafka. */ diff --git a/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java b/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java index 82b1baca..776bc4b9 100644 --- a/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java +++ b/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java @@ -16,10 +16,12 @@ package org.springframework.cloud.stream.binder.kafka.properties; +import java.nio.file.Paths; import java.util.Collections; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.assertj.core.util.Files; import org.junit.Test; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; @@ -105,4 +107,39 @@ public class KafkaBinderConfigurationPropertiesTest { assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG); } + + @Test + public void testCertificateFilesAreConvertedToAbsolutePathsFromClassPathResources() { + KafkaProperties kafkaProperties = new KafkaProperties(); + KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = + new KafkaBinderConfigurationProperties(kafkaProperties); + final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); + configuration.put("ssl.truststore.location", "classpath:testclient.truststore"); + configuration.put("ssl.keystore.location", "classpath:testclient.keystore"); + + kafkaBinderConfigurationProperties.getKafkaConnectionString(); + + assertThat(configuration.get("ssl.truststore.location")) + .isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.truststore").toString()); + assertThat(configuration.get("ssl.keystore.location")) + .isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.keystore").toString()); + } + + @Test + public void testCertificateFilesAreConvertedToGivenAbsolutePathsFromClassPathResources() { + KafkaProperties kafkaProperties = new KafkaProperties(); + KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = + new KafkaBinderConfigurationProperties(kafkaProperties); + final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); + configuration.put("ssl.truststore.location", "classpath:testclient.truststore"); + configuration.put("ssl.keystore.location", "classpath:testclient.keystore"); + kafkaBinderConfigurationProperties.setCertificateStoreDirectory("target"); + + kafkaBinderConfigurationProperties.getKafkaConnectionString(); + + assertThat(configuration.get("ssl.truststore.location")).isEqualTo( + Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString()); + assertThat(configuration.get("ssl.keystore.location")).isEqualTo( + Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString()); + } } diff --git a/spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.keystore b/spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.keystore new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.truststore b/spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.truststore new file mode 100644 index 00000000..e69de29b