# Spring Cloud Stream Multi-binder Kafka Application with Security (JAAS)
This is a sample application that demonstrates how to connect to multiple Kafka clusters with security enabled, using a separate binder for each cluster.
The application uses two Kafka clusters, both secured with SASL/PLAIN over the SASL_PLAINTEXT protocol (configured through JAAS).
## Detailed instructions for setting up the clusters
If you already have two clusters with security (SASL/PLAIN) enabled, you can skip this section. However, it may still be worth going over these instructions in order to avoid any inconsistencies with your environment.
* Download the latest Apache Kafka distribution (version 1.0.0 or above)
* Extract the same archive into two directories (let's call them cluster-1 and cluster-2 for the purposes of this sample).
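For example, assuming the 2.8.2 release from the Apache archive (the version here is just an illustration; substitute whichever release you actually downloaded):
```
$ wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
$ mkdir cluster-1 cluster-2
$ tar -xzf kafka_2.13-2.8.2.tgz -C cluster-1 --strip-components=1
$ tar -xzf kafka_2.13-2.8.2.tgz -C cluster-2 --strip-components=1
```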
* cd cluster-1/config
* Create a new file `kafka_server_jaas.conf` and add the following content. The `KafkaServer` section defines the broker's own login credentials (`username`/`password`) as well as the set of users the broker accepts (each `user_<name>="<password>"` entry); the `Client` section holds the credentials the broker uses when connecting to ZooKeeper:
```
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret";
};
```
* Create another file called `zookeeper_jaas.conf` and add the following content. The `Server` section defines the credentials that ZooKeeper accepts from connecting clients (the brokers):
```
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
```
* Edit the file server.properties and make sure that the following properties are set. Some of them are already present in the default configuration (possibly with different values), so locate and modify those rather than adding duplicates:
```
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
```
* Edit the file zookeeper.properties and add the following properties:
```
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
```
* Edit the file producer.properties and add the following content:
```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```
* Edit the file consumer.properties and add the following content:
```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```
* Terminal 1 - Start ZooKeeper for cluster-1:
```
$ cd <PATH-TO-CLUSTER-1>
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-1>/config/zookeeper_jaas.conf"
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
```
* Terminal 2 - Start the Kafka broker for cluster-1:
```
$ cd <PATH-TO-CLUSTER-1>
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-1>/config/kafka_server_jaas.conf"
$ ./bin/kafka-server-start.sh config/server.properties
```
* Now repeat the same steps for the second cluster, with different ports and different credentials.
* cd cluster-2/config
* Create a new file `kafka_server_jaas.conf` and add the following content:
```
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="foo"
    password="foo-secret"
    user_foo="foo-secret";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="foo"
    password="foo-secret";
};
```
* Create another file called `zookeeper_jaas.conf` and add the following content:
```
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="foo"
    password="foo-secret"
    user_foo="foo-secret";
};
```
* Edit the file server.properties and make sure that the following properties are set. Some of them are already present in the default configuration, so locate and modify those rather than adding duplicates. Note that cluster-2 needs its own listener port, log directory, and ZooKeeper connection so that it does not clash with cluster-1:
```
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
listeners=SASL_PLAINTEXT://localhost:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-cluster2-logs
zookeeper.connect=localhost:2182
```
* Edit the file zookeeper.properties and add the following properties:
```
clientPort=2182
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
```
* Edit the file producer.properties and add the following content:
```
bootstrap.servers=localhost:9093
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```
* Edit the file consumer.properties and add the following content:
```
bootstrap.servers=localhost:9093
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```
* Terminal 3 - Start ZooKeeper for cluster-2:
```
$ cd <PATH-TO-CLUSTER-2>
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-2>/config/zookeeper_jaas.conf"
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
```
* Terminal 4 - Start the Kafka broker for cluster-2:
```
$ cd <PATH-TO-CLUSTER-2>
$ export KAFKA_OPTS="-Djava.security.auth.login.config=<PATH-TO-CLUSTER-2>/config/kafka_server_jaas.conf"
$ ./bin/kafka-server-start.sh config/server.properties
```
* At this point, you should have two ZooKeeper instances and two Kafka brokers running. Verify this by running `jps` (or another OS tool).
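For example, `jps` should list two `QuorumPeerMain` (ZooKeeper) entries and two `Kafka` entries; the process IDs below are purely illustrative:
```
$ jps
55697 QuorumPeerMain
55705 QuorumPeerMain
55800 Kafka
55810 Kafka
55900 Jps
```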
## Running the application
The application wires together three functional beans (described in detail in the next section): a supplier that produces records to a topic on cluster-1, a function that consumes those records from cluster-1 and forwards them to a topic on cluster-2, and a consumer that logs what arrives on the cluster-2 topic.
The topic names used on each cluster are defined in the application's configuration.
Run the `MultiBinderKafkaJaasSample` application from your IDE or the command line.
## Running and verifying the application
Review the application's configuration for the Kafka client security settings.
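The exact contents depend on the sample's own configuration file, but a minimal sketch of a two-binder setup with SASL/PLAIN, using property names from the Spring Cloud Stream Kafka binder documentation, might look like this (topic/destination settings omitted; `kafka2` carries the `foo` credentials analogously):
```
spring.cloud.function.definition=supply;receive;consume

# One binder per cluster
spring.cloud.stream.binders.kafka1.type=kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.binders.kafka2.type=kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9093

# Route each function's bindings to the right binder
spring.cloud.stream.bindings.supply-out-0.binder=kafka1
spring.cloud.stream.bindings.receive-in-0.binder=kafka1
spring.cloud.stream.bindings.receive-out-0.binder=kafka2
spring.cloud.stream.bindings.consume-in-0.binder=kafka2

# Client-side security for the kafka1 binder (repeat analogously for kafka2)
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.loginModule=org.apache.kafka.common.security.plain.PlainLoginModule
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.username=admin
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.binder.jaas.options.password=admin-secret
```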
The application contains a supplier (`supply`) that produces data to a topic on Kafka cluster-1.
From this topic, a function (`receive`) consumes the records and then produces them to a topic on Kafka cluster-2.
The application provides another consumer (`consume`) that reads the data from this topic on Kafka cluster-2 and logs it on the console.
This is the function definition: `supply;receive;consume`.
You can simply run the main application and watch the test consumer print the data on the console; a sketch of these beans is shown below.
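For reference, here is a minimal sketch of what these three beans could look like. The bean names follow the function definition above, but the payload type and the actual logic in the sample may differ:
```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MultiBinderKafkaJaasSample {

    public static void main(String[] args) {
        SpringApplication.run(MultiBinderKafkaJaasSample.class, args);
    }

    // Polled by the framework; publishes records to the input topic on cluster-1.
    @Bean
    public Supplier<String> supply() {
        return () -> "test data";
    }

    // Consumes from cluster-1 and forwards the record, uppercased, to cluster-2.
    @Bean
    public Function<String, String> receive() {
        return String::toUpperCase;
    }

    // Consumes from cluster-2 and logs the record on the console.
    @Bean
    public Consumer<String> consume() {
        return data -> System.out.println("Received: " + data);
    }
}
```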
* Terminal 5
Create a file called `kafka_client_jaas.conf` with the cluster-1 client credentials and add the following content (save it to any directory, but for this example, let's say it is in `/home/username`):
```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret";
};
```
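Note that cluster-2 was configured above to accept only the user `foo`, so the `admin` credentials will not authenticate against it. Create a second client JAAS file for cluster-2 in the same directory; the name `kafka_client_jaas_cluster2.conf` is just an example, used in the consumer command below:
```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="foo"
    password="foo-secret";
};
```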
You can also change the function definition to `spring.cloud.function.definition: receive`.
In that case, only the single `receive` function runs, and it is your responsibility to send data to its input topic and to read from its output topic.
For that, you can follow the instructions below.
* Terminal 6 - Produce data to the cluster-1 input topic.
$ `export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas.conf"`
Go to a Kafka installation directory (it doesn't matter whether it is the cluster-1 or cluster-2 directory, since `--broker-list` overrides the bootstrap servers in the config file).
$ `./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic foo --producer.config=config/producer.properties`
* Terminal 7 (or split the previous terminal into two panes) - Consume data from the cluster-2 output topic, where the data produced above is expected to arrive after flowing through the processor.
$ `export KAFKA_OPTS="-Djava.security.auth.login.config=/home/username/kafka_client_jaas_cluster2.conf"`
Go to a Kafka installation directory (again, cluster-1 or cluster-2 does not matter, since `--bootstrap-server` overrides the value in the config file).
$ `./bin/kafka-console-consumer.sh --topic bar --consumer.config=config/consumer.properties --bootstrap-server=localhost:9093`
* Now start typing some text in the terminal session where the console producer is running against the cluster-1 input topic (`foo`).
Then verify that the same text appears, in uppercase, in the terminal session where the console consumer is running against the cluster-2 output topic (`bar`). For example, typing `hello` on the producer side should print `HELLO` on the consumer side.
PS: Once you are done with the testing, remember to stop the application, the console consumer and producer (if you are running them), and the local Kafka clusters used for testing.