diff --git a/spring-pulsar/pom.xml b/spring-pulsar/pom.xml new file mode 100644 index 0000000000..4c2fc0d9b4 --- /dev/null +++ b/spring-pulsar/pom.xml @@ -0,0 +1,48 @@ + + + 4.0.0 + spring-pulsar + 0.0.1-SNAPSHOT + spring-pulsar + Intro to Apache Pulsar with Spring + + + com.baeldung + parent-boot-3 + 0.0.1-SNAPSHOT + ../parent-boot-3 + + + + 17 + 0.2.0 + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.pulsar + spring-pulsar-spring-boot-starter + 0.2.0 + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/spring-pulsar/src/main/java/com/baeldung/springpulsar/PulsarConsumer.java b/spring-pulsar/src/main/java/com/baeldung/springpulsar/PulsarConsumer.java new file mode 100644 index 0000000000..7a483d97cb --- /dev/null +++ b/spring-pulsar/src/main/java/com/baeldung/springpulsar/PulsarConsumer.java @@ -0,0 +1,59 @@ +package com.baeldung.springpulsar; + +import org.apache.pulsar.client.api.DeadLetterPolicy; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.schema.SchemaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.pulsar.annotation.PulsarListener; +import org.springframework.pulsar.listener.AckMode; +import org.springframework.stereotype.Service; + +@Service +public class PulsarConsumer { + + private static final String STRING_TOPIC = "string-topic"; + private static final String USER_TOPIC = "user-topic"; + private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic"; + private final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumer.class); + + @PulsarListener( + subscriptionName = "string-topic-subscription", + topics = STRING_TOPIC, + subscriptionType = SubscriptionType.Shared + ) + public void stringTopicListener(String str) { + LOGGER.info("Received String message: {}", str); + } + + @Bean + DeadLetterPolicy deadLetterPolicy() { + return DeadLetterPolicy.builder() + .maxRedeliverCount(10) + .deadLetterTopic(USER_DEAD_LETTER_TOPIC) + .build(); + } + + @PulsarListener( + subscriptionName = "user-topic-subscription", + topics = USER_TOPIC, + subscriptionType = SubscriptionType.Shared, + schemaType = SchemaType.JSON, + ackMode = AckMode.RECORD, + deadLetterPolicy = "deadLetterPolicy", + properties = {"ackTimeout=60s"} + ) + public void userTopicListener(User user) { + LOGGER.info("Received user object with email: {}", user.getEmail()); + } + + @PulsarListener( + subscriptionName = "dead-letter-topic-subscription", + topics = USER_DEAD_LETTER_TOPIC, + subscriptionType = SubscriptionType.Shared + ) + public void userDlqTopicListener(User user) { + LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail()); + } +} diff --git a/spring-pulsar/src/main/java/com/baeldung/springpulsar/PulsarProducer.java b/spring-pulsar/src/main/java/com/baeldung/springpulsar/PulsarProducer.java new file mode 100644 index 0000000000..9415b640dc --- /dev/null +++ b/spring-pulsar/src/main/java/com/baeldung/springpulsar/PulsarProducer.java @@ -0,0 +1,37 @@ +package com.baeldung.springpulsar; + +import org.apache.pulsar.client.api.ProducerAccessMode; +import org.apache.pulsar.client.api.PulsarClientException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.pulsar.core.PulsarTemplate; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +@Component +public class PulsarProducer { + + @Autowired + private PulsarTemplate template; + @Autowired + private PulsarTemplate stringTemplate; + + private static final String USER_TOPIC = "user-topic"; + private static final String USER_TOPIC_STR = "string-topic"; + + public void sendMessageToPulsarTopic(User user) throws PulsarClientException { + template.newMessage(user) + .withProducerCustomizer(pc -> { + pc.accessMode(ProducerAccessMode.Shared); + }) + .withMessageCustomizer(mc -> { + mc.deliverAfter(10L, TimeUnit.SECONDS); + }) + .withTopic(USER_TOPIC) + .send(); + } + + public void sendStringMessageToPulsarTopic(String str) throws PulsarClientException { + stringTemplate.send(USER_TOPIC_STR, str); + } +} diff --git a/spring-pulsar/src/main/java/com/baeldung/springpulsar/SpringPulsarApplication.java b/spring-pulsar/src/main/java/com/baeldung/springpulsar/SpringPulsarApplication.java new file mode 100644 index 0000000000..8fd978df76 --- /dev/null +++ b/spring-pulsar/src/main/java/com/baeldung/springpulsar/SpringPulsarApplication.java @@ -0,0 +1,15 @@ +package com.baeldung.springpulsar; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.pulsar.annotation.EnablePulsar; + +@EnablePulsar +@SpringBootApplication +public class SpringPulsarApplication { + + public static void main(String[] args) { + SpringApplication.run(SpringPulsarApplication.class, args); + } + +} diff --git a/spring-pulsar/src/main/java/com/baeldung/springpulsar/User.java b/spring-pulsar/src/main/java/com/baeldung/springpulsar/User.java new file mode 100644 index 0000000000..1175b16d26 --- /dev/null +++ b/spring-pulsar/src/main/java/com/baeldung/springpulsar/User.java @@ -0,0 +1,31 @@ +package com.baeldung.springpulsar; + +public class User { + + private String email; + private String firstName; + + public User() { + } + + public User(String email, String firstName) { + this.email = email; + this.firstName = firstName; + } + + public String getEmail() { + return email; + } + + public void setEmail(String email) { + this.email = email; + } + + public String getFirstName() { + return firstName; + } + + public void setFirstName(String firstName) { + this.firstName = firstName; + } +} diff --git a/spring-pulsar/src/main/resources/application.yaml b/spring-pulsar/src/main/resources/application.yaml new file mode 100644 index 0000000000..a35ed70643 --- /dev/null +++ b/spring-pulsar/src/main/resources/application.yaml @@ -0,0 +1,12 @@ +server: + port: 8085 + +spring: + pulsar: + client: + service-url: pulsar://localhost:6650 + defaults: + type-mappings: + - message-type: com.baeldung.springpulsar.User + schema-info: + schema-type: JSON \ No newline at end of file