Added project for Apache pulsar examples

This commit is contained in:
Syed Mansoor
2018-10-12 16:55:59 +11:00
parent 44a3d36bd9
commit 3596d3dc34
10 changed files with 536 additions and 0 deletions

View File

@@ -0,0 +1,48 @@
package com.baeldung;
import java.io.IOException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
public class ConsumerTest {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";
private static final String SUBSCRIPTION_NAME = "test-subscription";
public static void main(String[] args) throws IOException {
// Create a Pulsar client instance. A single instance can be shared across many
// producers and consumer within the same application
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.build();
//Configure consumer specific settings.
Consumer<byte[]> consumer = client.newConsumer()
.topic(TOPIC_NAME)
// Allow multiple consumers to attach to the same subscription
// and get messages dispatched as a queue
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(SUBSCRIPTION_NAME)
.subscribe();
// Once the consumer is created, it can be used for the entire application lifecycle
System.out.println("Created consumer for the topic "+ TOPIC_NAME);
do {
// Wait until a message is available
Message<byte[]> msg = consumer.receive();
// Extract the message as a printable string and then log
String content = new String(msg.getData());
System.out.println("Received message '"+content+"' with ID "+msg.getMessageId());
// Acknowledge processing of the message so that it can be deleted
consumer.acknowledge(msg);
} while (true);
}
}

View File

@@ -0,0 +1,58 @@
package com.baeldung;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import java.io.IOException;
import java.util.stream.IntStream;
public class ProducerTest {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";
public static void main(String[] args) throws IOException {
// Create a Pulsar client instance. A single instance can be shared across many
// producers and consumer within the same application
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.build();
// Configure producer specific settings
Producer<byte[]> producer = client.newProducer()
// Set the topic
.topic(TOPIC_NAME)
// Enable compression
.compressionType(CompressionType.LZ4)
.create();
// Once the producer is created, it can be used for the entire application life-cycle
System.out.println("Created producer for the topic "+TOPIC_NAME);
// Send 5 test messages
IntStream.range(1, 5).forEach(i -> {
String content = String.format("hi-pulsar-%d", i);
// Build a message object
Message<byte[]> msg = MessageBuilder.create()
.setContent(content.getBytes())
.build();
// Send each message and log message content and ID when successfully received
try {
MessageId msgId = producer.send(msg);
System.out.println("Published message '"+content+"' with the ID "+msgId);
} catch (PulsarClientException e) {
System.out.println(e.getMessage());
}
});
client.close();
}
}

View File

@@ -0,0 +1,59 @@
package com.baeldung.subscriptions;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.stream.IntStream;
public class ExclusiveSubscriptionTutorial {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";
private static final String SUBSCRIPTION_NAME = "test-subscription";
private static final SubscriptionType SUBSCRIPTION_TYPE = SubscriptionType.Exclusive;
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.build();
Producer<byte[]> producer = client.newProducer()
.topic(TOPIC_NAME)
.create();
ConsumerBuilder<byte[]> consumer1 = client.newConsumer()
.topic(TOPIC_NAME)
.subscriptionName(SUBSCRIPTION_NAME)
.subscriptionType(SUBSCRIPTION_TYPE);
ConsumerBuilder<byte[]> consumer2 = client.newConsumer()
.topic(TOPIC_NAME)
.subscriptionName(SUBSCRIPTION_NAME)
.subscriptionType(SUBSCRIPTION_TYPE);
IntStream.range(0, 999).forEach(i -> {
Message<byte[]> msg = MessageBuilder.create()
.setContent(String.format("message-%d", i).getBytes())
.build();
try {
producer.send(msg);
} catch (PulsarClientException e) {
System.out.println(e.getMessage());
}
});
// Consumer 1 can subscribe to the topic
consumer1.subscribe();
// Consumer 2 cannot due to the exclusive subscription held by consumer 1
consumer2.subscribeAsync()
.handle((consumer, exception) -> {
System.out.println(exception.getMessage());
return null;
});
}
}

View File

@@ -0,0 +1,76 @@
package com.baeldung.subscriptions;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.stream.IntStream;
public class FailoverSubscriptionTutorial {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "failover-subscription-test-topic";
private static final String SUBSCRIPTION_NAME = "test-subscription";
private static final SubscriptionType SUBSCRIPTION_TYPE = SubscriptionType.Failover;
private static final int NUM_MSGS = 10;
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.build();
Producer<byte[]> producer = client.newProducer()
.topic(TOPIC_NAME)
.create();
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
.topic(TOPIC_NAME)
.subscriptionName(SUBSCRIPTION_NAME)
.subscriptionType(SUBSCRIPTION_TYPE);
Consumer<byte[]> mainConsumer = consumerBuilder
.consumerName("consumer-a")
.messageListener((consumer, msg) -> {
System.out.println("Message received by main consumer");
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
System.out.println(e.getMessage());
}
})
.subscribe();
Consumer<byte[]> failoverConsumer = consumerBuilder
.consumerName("consumer-b")
.messageListener((consumer, msg) -> {
System.out.println("Message received by failover consumer");
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
System.out.println(e.getMessage());
}
})
.subscribe();
IntStream.range(0, NUM_MSGS).forEach(i -> {
Message<byte[]> msg = MessageBuilder.create()
.setContent(String.format("message-%d", i).getBytes())
.build();
try {
producer.send(msg);
Thread.sleep(100);
if (i > 5) mainConsumer.close();
} catch (InterruptedException | PulsarClientException e) {
System.out.println(e.getMessage());
}
});
}
}