From 6be541c070ecb1e903552c82f9e2319a29c65f8f Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 17 Nov 2020 13:49:04 -0500 Subject: [PATCH] Copy cert files from classpath to file-system (#989) * Copy cert files from classpath to file-system If `ssl.truststore.location` and `ssl.keystore.location` are provided as classpath resources, convert them to absolute paths on the filesystem. This is because of a restriction in the Kafka client in which it does not allow certificates to be read from the classpath. See these issues for more details: https://issues.apache.org/jira/browse/KAFKA-7685 https://cwiki.apache.org/confluence/display/KAFKA/KIP-398%3A+Support+reading+trust+store+from+classpath This commit allows the Spring Cloud Stream application to provide the cert files as classpath: reosources, but the binder internally move them to a locations on the local filesystem and then use that absolute path as the value for cert locations. Adding properties for optional paths to move the files to. If no values are provided for these properties, then use the system's /tmp directory. Adding tests and docs. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/985 * Addressing PR review comments * Addressing further PR review comments * Consolidate keystore/truststore filesystem locations into a single property * Addressing PR review --- docs/src/main/asciidoc/overview.adoc | 8 ++ .../KafkaBinderConfigurationProperties.java | 78 +++++++++++++++++++ ...afkaBinderConfigurationPropertiesTest.java | 37 +++++++++ .../src/test/resources/testclient.keystore | 0 .../src/test/resources/testclient.truststore | 0 5 files changed, 123 insertions(+) create mode 100644 spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.keystore create mode 100644 spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.truststore diff --git a/docs/src/main/asciidoc/overview.adoc b/docs/src/main/asciidoc/overview.adoc index 6c1ff011..ad33bb54 100644 --- a/docs/src/main/asciidoc/overview.adoc +++ b/docs/src/main/asciidoc/overview.adoc @@ -149,6 +149,14 @@ Flag to set the binder health as `down`, when any partitions on the topic, regar + Default: `false`. +spring.cloud.stream.kafka.binder.certificateStoreDirectory:: +When the truststore or keystore certificate location is given as a classpath URL (`classpath:...`), the binder copies the resource from the classpath location inside the JAR file to a location on the filesystem. +The file will be moved to the location specified as the value for this property which must be an existing directory on the filesystem that is writable by the process running the application. +If this value is not set and the certificate file is a classpath resource, then it will be moved to System's temp directory as returned by `System.getProperty("java.io.tmpdir")`. +This is also true, if this value is present, but the directory cannot be found on the filesystem or is not writable. ++ +Default: none. + [[kafka-consumer-properties]] ==== Kafka Consumer Properties diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java index 0dab3dee..3b0cb124 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java @@ -16,6 +16,13 @@ package org.springframework.cloud.stream.binder.kafka.properties; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -35,6 +42,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.stream.binder.HeaderMode; import org.springframework.cloud.stream.binder.ProducerProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties.CompressionType; +import org.springframework.core.io.DefaultResourceLoader; +import org.springframework.core.io.Resource; import org.springframework.expression.Expression; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -118,6 +127,14 @@ public class KafkaBinderConfigurationProperties { */ private Duration authorizationExceptionRetryInterval; + /** + * When a certificate store location is given as classpath URL (classpath:), then the binder + * moves the resource from the classpath location inside the JAR to a location on + * the filesystem. If this value is set, then this location is used, otherwise, the + * certificate file is copied to the directory returned by java.io.tmpdir. + */ + private String certificateStoreDirectory; + public KafkaBinderConfigurationProperties(KafkaProperties kafkaProperties) { Assert.notNull(kafkaProperties, "'kafkaProperties' cannot be null"); this.kafkaProperties = kafkaProperties; @@ -132,9 +149,62 @@ public class KafkaBinderConfigurationProperties { } public String getKafkaConnectionString() { + // We need to do a check on certificate file locations to see if they are given as classpath resources. + // If that is the case, then we will move them to a file system location and use those as the certificate locations. + // This is due to a limitation in Kafka itself in which it doesn't allow reading certificate resources from the classpath. + // See this: https://issues.apache.org/jira/browse/KAFKA-7685 + // and this: https://cwiki.apache.org/confluence/display/KAFKA/KIP-398%3A+Support+reading+trust+store+from+classpath + moveCertsToFileSystemIfNecessary(); + return toConnectionString(this.brokers, this.defaultBrokerPort); } + private void moveCertsToFileSystemIfNecessary() { + try { + final String trustStoreLocation = this.configuration.get("ssl.truststore.location"); + if (trustStoreLocation != null && trustStoreLocation.startsWith("classpath:")) { + final String fileSystemLocation = moveCertToFileSystem(trustStoreLocation, this.certificateStoreDirectory); + // Overriding the value with absolute filesystem path. + this.configuration.put("ssl.truststore.location", fileSystemLocation); + } + final String keyStoreLocation = this.configuration.get("ssl.keystore.location"); + if (keyStoreLocation != null && keyStoreLocation.startsWith("classpath:")) { + final String fileSystemLocation = moveCertToFileSystem(keyStoreLocation, this.certificateStoreDirectory); + // Overriding the value with absolute filesystem path. + this.configuration.put("ssl.keystore.location", fileSystemLocation); + } + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private String moveCertToFileSystem(String classpathLocation, String fileSystemLocation) throws IOException { + File targetFile; + final String tempDir = System.getProperty("java.io.tmpdir"); + Resource resource = new DefaultResourceLoader().getResource(classpathLocation); + if (StringUtils.hasText(fileSystemLocation)) { + final Path path = Paths.get(fileSystemLocation); + if (!Files.exists(path) || !Files.isDirectory(path) || !Files.isWritable(path)) { + logger.warn("The filesystem location to move the cert files (" + fileSystemLocation + ") " + + "is not found or a directory that is writable. The system temp folder (java.io.tmpdir) will be used instead."); + targetFile = new File(Paths.get(tempDir, resource.getFilename()).toString()); + } + else { + // the given location is verified to be a writable directory. + targetFile = new File(Paths.get(fileSystemLocation, resource.getFilename()).toString()); + } + } + else { + targetFile = new File(Paths.get(tempDir, resource.getFilename()).toString()); + } + + try (InputStream inputStream = resource.getInputStream()) { + Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + return targetFile.getAbsolutePath(); + } + public String getDefaultKafkaConnectionString() { return DEFAULT_KAFKA_CONNECTION_STRING; } @@ -363,6 +433,14 @@ public class KafkaBinderConfigurationProperties { this.considerDownWhenAnyPartitionHasNoLeader = considerDownWhenAnyPartitionHasNoLeader; } + public String getCertificateStoreDirectory() { + return this.certificateStoreDirectory; + } + + public void setCertificateStoreDirectory(String certificateStoreDirectory) { + this.certificateStoreDirectory = certificateStoreDirectory; + } + /** * Domain class that models transaction capabilities in Kafka. */ diff --git a/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java b/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java index 82b1baca..776bc4b9 100644 --- a/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java +++ b/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java @@ -16,10 +16,12 @@ package org.springframework.cloud.stream.binder.kafka.properties; +import java.nio.file.Paths; import java.util.Collections; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.assertj.core.util.Files; import org.junit.Test; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; @@ -105,4 +107,39 @@ public class KafkaBinderConfigurationPropertiesTest { assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG); } + + @Test + public void testCertificateFilesAreConvertedToAbsolutePathsFromClassPathResources() { + KafkaProperties kafkaProperties = new KafkaProperties(); + KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = + new KafkaBinderConfigurationProperties(kafkaProperties); + final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); + configuration.put("ssl.truststore.location", "classpath:testclient.truststore"); + configuration.put("ssl.keystore.location", "classpath:testclient.keystore"); + + kafkaBinderConfigurationProperties.getKafkaConnectionString(); + + assertThat(configuration.get("ssl.truststore.location")) + .isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.truststore").toString()); + assertThat(configuration.get("ssl.keystore.location")) + .isEqualTo(Paths.get(System.getProperty("java.io.tmpdir"), "testclient.keystore").toString()); + } + + @Test + public void testCertificateFilesAreConvertedToGivenAbsolutePathsFromClassPathResources() { + KafkaProperties kafkaProperties = new KafkaProperties(); + KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = + new KafkaBinderConfigurationProperties(kafkaProperties); + final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); + configuration.put("ssl.truststore.location", "classpath:testclient.truststore"); + configuration.put("ssl.keystore.location", "classpath:testclient.keystore"); + kafkaBinderConfigurationProperties.setCertificateStoreDirectory("target"); + + kafkaBinderConfigurationProperties.getKafkaConnectionString(); + + assertThat(configuration.get("ssl.truststore.location")).isEqualTo( + Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString()); + assertThat(configuration.get("ssl.keystore.location")).isEqualTo( + Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString()); + } } diff --git a/spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.keystore b/spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.keystore new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.truststore b/spring-cloud-stream-binder-kafka-core/src/test/resources/testclient.truststore new file mode 100644 index 00000000..e69de29b