diff --git a/docs/src/main/asciidoc/overview.adoc b/docs/src/main/asciidoc/overview.adoc index 31771679..096eccf6 100644 --- a/docs/src/main/asciidoc/overview.adoc +++ b/docs/src/main/asciidoc/overview.adoc @@ -151,6 +151,9 @@ Default: `false`. spring.cloud.stream.kafka.binder.certificateStoreDirectory:: When the truststore or keystore certificate location is given as a classpath URL (`classpath:...`), the binder copies the resource from the classpath location inside the JAR file to a location on the filesystem. +This is true for both broker level certificates (`ssl.truststore.location` and `ssl.keystore.location`) and certificates intended for schema registry (`schema.registry.ssl.truststore.location` and `schema.registry.ssl.keystore.location`). +Keep in mind that the truststore and keystore classpath locations must be provided under `spring.cloud.stream.kafka.binder.configuration...`. +For example, `spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location`, ``spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location`, etc. The file will be moved to the location specified as the value for this property which must be an existing directory on the filesystem that is writable by the process running the application. If this value is not set and the certificate file is a classpath resource, then it will be moved to System's temp directory as returned by `System.getProperty("java.io.tmpdir")`. This is also true, if this value is present, but the directory cannot be found on the filesystem or is not writable. diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java index 06506bf4..c53566ee 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -163,24 +163,44 @@ public class KafkaBinderConfigurationProperties { private void moveCertsToFileSystemIfNecessary() { try { - final String trustStoreLocation = this.configuration.get("ssl.truststore.location"); - if (trustStoreLocation != null && trustStoreLocation.startsWith("classpath:")) { - final String fileSystemLocation = moveCertToFileSystem(trustStoreLocation, this.certificateStoreDirectory); - // Overriding the value with absolute filesystem path. - this.configuration.put("ssl.truststore.location", fileSystemLocation); - } - final String keyStoreLocation = this.configuration.get("ssl.keystore.location"); - if (keyStoreLocation != null && keyStoreLocation.startsWith("classpath:")) { - final String fileSystemLocation = moveCertToFileSystem(keyStoreLocation, this.certificateStoreDirectory); - // Overriding the value with absolute filesystem path. - this.configuration.put("ssl.keystore.location", fileSystemLocation); - } + moveBrokerCertsIfApplicable(); + moveSchemaRegistryCertsIfApplicable(); } catch (Exception e) { throw new IllegalStateException(e); } } + private void moveBrokerCertsIfApplicable() throws IOException { + final String trustStoreLocation = this.configuration.get("ssl.truststore.location"); + if (trustStoreLocation != null && trustStoreLocation.startsWith("classpath:")) { + final String fileSystemLocation = moveCertToFileSystem(trustStoreLocation, this.certificateStoreDirectory); + // Overriding the value with absolute filesystem path. + this.configuration.put("ssl.truststore.location", fileSystemLocation); + } + final String keyStoreLocation = this.configuration.get("ssl.keystore.location"); + if (keyStoreLocation != null && keyStoreLocation.startsWith("classpath:")) { + final String fileSystemLocation = moveCertToFileSystem(keyStoreLocation, this.certificateStoreDirectory); + // Overriding the value with absolute filesystem path. + this.configuration.put("ssl.keystore.location", fileSystemLocation); + } + } + + private void moveSchemaRegistryCertsIfApplicable() throws IOException { + String trustStoreLocation = this.configuration.get("schema.registry.ssl.truststore.location"); + if (trustStoreLocation != null && trustStoreLocation.startsWith("classpath:")) { + final String fileSystemLocation = moveCertToFileSystem(trustStoreLocation, this.certificateStoreDirectory); + // Overriding the value with absolute filesystem path. + this.configuration.put("schema.registry.ssl.truststore.location", fileSystemLocation); + } + final String keyStoreLocation = this.configuration.get("schema.registry.ssl.keystore.location"); + if (keyStoreLocation != null && keyStoreLocation.startsWith("classpath:")) { + final String fileSystemLocation = moveCertToFileSystem(keyStoreLocation, this.certificateStoreDirectory); + // Overriding the value with absolute filesystem path. + this.configuration.put("schema.registry.ssl.keystore.location", fileSystemLocation); + } + } + private String moveCertToFileSystem(String classpathLocation, String fileSystemLocation) throws IOException { File targetFile; final String tempDir = System.getProperty("java.io.tmpdir"); diff --git a/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java b/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java index 776bc4b9..48df4962 100644 --- a/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java +++ b/spring-cloud-stream-binder-kafka-core/src/test/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationPropertiesTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -142,4 +142,22 @@ public class KafkaBinderConfigurationPropertiesTest { assertThat(configuration.get("ssl.keystore.location")).isEqualTo( Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString()); } + + @Test + public void testCertificateFilesAreMovedForSchemaRegistryConfiguration() { + KafkaProperties kafkaProperties = new KafkaProperties(); + KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties = + new KafkaBinderConfigurationProperties(kafkaProperties); + final Map configuration = kafkaBinderConfigurationProperties.getConfiguration(); + configuration.put("schema.registry.ssl.truststore.location", "classpath:testclient.truststore"); + configuration.put("schema.registry.ssl.keystore.location", "classpath:testclient.keystore"); + kafkaBinderConfigurationProperties.setCertificateStoreDirectory("target"); + + kafkaBinderConfigurationProperties.getKafkaConnectionString(); + + assertThat(configuration.get("schema.registry.ssl.truststore.location")).isEqualTo( + Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString()); + assertThat(configuration.get("schema.registry.ssl.keystore.location")).isEqualTo( + Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString()); + } }