BAEL-202: Asynchronous Operations in Couchbase

This commit is contained in:
Kevin Gilmore
2016-08-07 21:43:06 -05:00
parent 6d61497eec
commit f8e67f8dd5
25 changed files with 1294 additions and 0 deletions

View File

@@ -0,0 +1,89 @@
package com.baeldung.couchbase.person;
import com.baeldung.couchbase.service.CouchbaseEntity;
public class Person implements CouchbaseEntity {
private String id;
private String type;
private String name;
private String homeTown;
Person() {}
public Person(Builder b) {
this.id = b.id;
this.type = b.type;
this.name = b.name;
this.homeTown = b.homeTown;
}
@Override
public String getId() {
return id;
}
@Override
public void setId(String id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getHomeTown() {
return homeTown;
}
public void setHomeTown(String homeTown) {
this.homeTown = homeTown;
}
public static class Builder {
private String id;
private String type;
private String name;
private String homeTown;
public static Builder newInstance() {
return new Builder();
}
public Person build() {
return new Person(this);
}
public Builder id(String id) {
this.id = id;
return this;
}
public Builder type(String type) {
this.type = type;
return this;
}
public Builder name(String name) {
this.name = name;
return this;
}
public Builder homeTown(String homeTown) {
this.homeTown = homeTown;
return this;
}
}
}

View File

@@ -0,0 +1,26 @@
package com.baeldung.couchbase.person;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import com.baeldung.couchbase.service.AbstractCrudService;
import com.baeldung.couchbase.service.BucketService;
@Service
public class PersonCrudService extends AbstractCrudService<Person> {
@Autowired
public PersonCrudService(
@Qualifier("TutorialBucketService") BucketService bucketService,
PersonDocumentConverter converter) {
super(bucketService, converter);
}
@PostConstruct
private void init() {
loadBucket();
}
}

View File

@@ -0,0 +1,31 @@
package com.baeldung.couchbase.person;
import org.springframework.stereotype.Service;
import com.baeldung.couchbase.service.JsonDocumentConverter;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
@Service
public class PersonDocumentConverter implements JsonDocumentConverter<Person> {
@Override
public JsonDocument toDocument(Person p) {
JsonObject content = JsonObject.empty()
.put("type", "Person")
.put("name", p.getName())
.put("homeTown", p.getHomeTown());
return JsonDocument.create(p.getId(), content);
}
@Override
public Person fromDocument(JsonDocument doc) {
JsonObject content = doc.content();
Person p = new Person();
p.setId(doc.id());
p.setType("Person");
p.setName(content.getString("name"));
p.setHomeTown(content.getString("homeTown"));
return p;
}
}

View File

@@ -0,0 +1,29 @@
package com.baeldung.couchbase.person;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.couchbase.client.core.CouchbaseException;
@Service
public class RegistrationService {
@Autowired
private PersonCrudService crud;
public void registerNewPerson(String name, String homeTown) {
Person person = new Person();
person.setName(name);
person.setHomeTown(homeTown);
crud.create(person);
}
public Person findRegistrant(String id) {
try{
return crud.read(id);
}
catch(CouchbaseException e) {
return crud.readFromReplica(id);
}
}
}

View File

@@ -0,0 +1,27 @@
package com.baeldung.couchbase.service;
import com.couchbase.client.java.Bucket;
public abstract class AbstractBucketService implements BucketService {
private ClusterService clusterService;
private Bucket bucket;
protected void openBucket() {
bucket = clusterService.openBucket(getBucketName(), getBucketPassword());
}
protected abstract String getBucketName();
protected abstract String getBucketPassword();
public AbstractBucketService(ClusterService clusterService) {
this.clusterService = clusterService;
}
@Override
public Bucket getBucket() {
return bucket;
}
}

View File

@@ -0,0 +1,174 @@
package com.baeldung.couchbase.service;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.couchbase.client.core.BackpressureException;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.ReplicaMode;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.util.retry.RetryBuilder;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
private static final Logger logger = LoggerFactory.getLogger(AbstractCrudService.class);
private BucketService bucketService;
private Bucket bucket;
private JsonDocumentConverter<T> converter;
public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
this.bucketService = bucketService;
this.converter = converter;
}
protected void loadBucket() {
bucket = bucketService.getBucket();
}
@Override
public void create(T t) {
if(t.getId() == null) {
t.setId(UUID.randomUUID().toString());
}
JsonDocument doc = converter.toDocument(t);
bucket.insert(doc);
}
@Override
public T read(String id) {
JsonDocument doc = bucket.get(id);
return (doc == null ? null : converter.fromDocument(doc));
}
@Override
public T readFromReplica(String id) {
List<JsonDocument> docs = bucket.getFromReplica(id, ReplicaMode.FIRST);
return (docs.isEmpty() ? null : converter.fromDocument(docs.get(0)));
}
@Override
public void update(T t) {
JsonDocument doc = converter.toDocument(t);
bucket.upsert(doc);
}
@Override
public void delete(String id) {
bucket.remove(id);
}
@Override
public List<T> readBulk(Iterable<String> ids) {
final AsyncBucket asyncBucket = bucket.async();
Observable<JsonDocument> asyncOperation = Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
public Observable<JsonDocument> call(String key) {
return asyncBucket.get(key);
}
});
final List<T> items = new ArrayList<T>();
try {
asyncOperation.toBlocking()
.forEach(new Action1<JsonDocument>() {
public void call(JsonDocument doc) {
T item = converter.fromDocument(doc);
items.add(item);
}
});
} catch (Exception e) {
logger.error("Error during bulk get", e);
}
return items;
}
@Override
public void createBulk(Iterable<T> items) {
final AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
if(t.getId() == null) {
t.setId(UUID.randomUUID().toString());
}
JsonDocument doc = converter.toDocument(t);
return asyncBucket.insert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}
@Override
public void updateBulk(Iterable<T> items) {
final AsyncBucket asyncBucket = bucket.async();
Observable
.from(items)
.flatMap(new Func1<T, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(final T t) {
JsonDocument doc = converter.toDocument(t);
return asyncBucket.upsert(doc)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}
@Override
public void deleteBulk(Iterable<String> ids) {
final AsyncBucket asyncBucket = bucket.async();
Observable
.from(ids)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@SuppressWarnings("unchecked")
@Override
public Observable<JsonDocument> call(String key) {
return asyncBucket.remove(key)
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
.max(10)
.build());
}
})
.last()
.toBlocking()
.single();
}
@Override
public boolean exists(String id) {
return bucket.exists(id);
}
}

View File

@@ -0,0 +1,8 @@
package com.baeldung.couchbase.service;
import com.couchbase.client.java.Bucket;
public interface BucketService {
Bucket getBucket();
}

View File

@@ -0,0 +1,8 @@
package com.baeldung.couchbase.service;
import com.couchbase.client.java.Bucket;
public interface ClusterService {
Bucket openBucket(String name, String password);
}

View File

@@ -0,0 +1,36 @@
package com.baeldung.couchbase.service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.springframework.stereotype.Service;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
@Service
public class ClusterServiceImpl implements ClusterService {
private Cluster cluster;
private Map<String, Bucket> buckets = new ConcurrentHashMap<>();
@PostConstruct
private void init() {
CouchbaseEnvironment env = DefaultCouchbaseEnvironment.create();
cluster = CouchbaseCluster.create(env, "localhost");
}
@Override
synchronized public Bucket openBucket(String name, String password) {
if(!buckets.containsKey(name)) {
Bucket bucket = cluster.openBucket(name, password);
buckets.put(name, bucket);
}
return buckets.get(name);
}
}

View File

@@ -0,0 +1,9 @@
package com.baeldung.couchbase.service;
public interface CouchbaseEntity {
String getId();
void setId(String id);
}

View File

@@ -0,0 +1,26 @@
package com.baeldung.couchbase.service;
import java.util.List;
public interface CrudService<T> {
void create(T t);
T read(String id);
T readFromReplica(String id);
void update(T t);
void delete(String id);
List<T> readBulk(Iterable<String> ids);
void createBulk(Iterable<T> items);
void updateBulk(Iterable<T> items);
void deleteBulk(Iterable<String> ids);
boolean exists(String id);
}

View File

@@ -0,0 +1,10 @@
package com.baeldung.couchbase.service;
import com.couchbase.client.java.document.JsonDocument;
public interface JsonDocumentConverter<T> {
JsonDocument toDocument(T t);
T fromDocument(JsonDocument doc);
}

View File

@@ -0,0 +1,32 @@
package com.baeldung.couchbase.service;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
@Service
@Qualifier("TutorialBucketService")
public class TutorialBucketService extends AbstractBucketService {
@PostConstruct
void init() {
openBucket();
}
@Autowired
public TutorialBucketService(ClusterService clusterService) {
super(clusterService);
}
@Override
protected String getBucketName() {
return "baeldung-tutorial";
}
@Override
protected String getBucketPassword() {
return "";
}
}

View File

@@ -0,0 +1,17 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>web - %date [%thread] %-5level %logger{36} - %message%n
</pattern>
</encoder>
</appender>
<logger name="org.springframework" level="WARN" />
<logger name="com.baeldung" level="DEBUG" />
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View File

@@ -0,0 +1,13 @@
package com.baeldung.couchbase;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { IntegrationTestConfig.class })
@TestExecutionListeners(listeners = { DependencyInjectionTestExecutionListener.class })
public abstract class IntegrationTest {
}

View File

@@ -0,0 +1,9 @@
package com.baeldung.couchbase;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan(basePackages={"com.baeldung.couchbase"})
public class IntegrationTestConfig {
}

View File

@@ -0,0 +1,220 @@
package com.baeldung.couchbase.person;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import com.baeldung.couchbase.IntegrationTest;
import com.baeldung.couchbase.service.BucketService;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;
public class PersonCrudServiceTest extends IntegrationTest {
@Autowired
private PersonCrudService personService;
@Autowired
@Qualifier("TutorialBucketService")
private BucketService bucketService;
@Autowired
private PersonDocumentConverter converter;
private Bucket bucket;
@PostConstruct
private void init() {
bucket = bucketService.getBucket();
}
@Test
public final void givenRandomPerson_whenCreate_thenPersonPersisted() {
//create person
Person person = randomPerson();
personService.create(person);
//check results
assertNotNull(person.getId());
assertNotNull(bucket.get(person.getId()));
//cleanup
bucket.remove(person.getId());
}
@Test
public final void givenId_whenRead_thenReturnsPerson() {
//create and insert person document
String id = insertRandomPersonDocument().id();
//read person and check results
assertNotNull(personService.read(id));
//cleanup
bucket.remove(id);
}
@Test
public final void givenNewHometown_whenUpdate_thenNewHometownPersisted() {
//create and insert person document
JsonDocument doc = insertRandomPersonDocument();
//update person
Person expected = converter.fromDocument(doc);
String updatedHomeTown = RandomStringUtils.randomAlphabetic(12);
expected.setHomeTown(updatedHomeTown);
personService.update(expected);
//check results
JsonDocument actual = bucket.get(expected.getId());
assertNotNull(actual);
assertNotNull(actual.content());
assertEquals(expected.getHomeTown(), actual.content().getString("homeTown"));
//cleanup
bucket.remove(expected.getId());
}
@Test
public final void givenRandomPerson_whenDelete_thenPersonNotInBucket() {
//create and insert person document
String id = insertRandomPersonDocument().id();
//delete person and check results
personService.delete(id);
assertNull(bucket.get(id));
}
@Test
public final void givenIds_whenReadBulk_thenReturnsOnlyPersonsWithMatchingIds() {
List<String> ids = new ArrayList<>();
//add some person documents
for(int i=0; i<5; i++) {
ids.add(insertRandomPersonDocument().id());
}
//perform bulk read
List<Person> persons = personService.readBulk(ids);
//check results
for(Person person : persons) {
assertTrue(ids.contains(person.getId()));
}
//cleanup
for(String id : ids) {
bucket.remove(id);
}
}
@Test
public final void givenPersons_whenInsertBulk_thenPersonsAreInserted() {
//create some persons
List<Person> persons = new ArrayList<>();
for(int i=0; i<5; i++) {
persons.add(randomPerson());
}
//perform bulk insert
personService.createBulk(persons);
//check results
for(Person person : persons) {
assertNotNull(bucket.get(person.getId()));
}
//cleanup
for(Person person : persons) {
bucket.remove(person.getId());
}
}
@Test
public final void givenPersons_whenUpdateBulk_thenPersonsAreUpdated() {
List<String> ids = new ArrayList<>();
//add some person documents
for(int i=0; i<5; i++) {
ids.add(insertRandomPersonDocument().id());
}
//load persons from Couchbase
List<Person> persons = new ArrayList<>();
for(String id : ids) {
persons.add(converter.fromDocument(bucket.get(id)));
}
//modify persons
for(Person person : persons) {
person.setHomeTown(RandomStringUtils.randomAlphabetic(10));
}
//perform bulk update
personService.updateBulk(persons);
//check results
for(Person person : persons) {
JsonDocument doc = bucket.get(person.getId());
assertEquals(person.getName(), doc.content().getString("name"));
assertEquals(person.getHomeTown(), doc.content().getString("homeTown"));
}
//cleanup
for(String id : ids) {
bucket.remove(id);
}
}
@Test
public void givenIds_whenDeleteBulk_thenPersonsAreDeleted() {
List<String> ids = new ArrayList<>();
//add some person documents
for(int i=0; i<5; i++) {
ids.add(insertRandomPersonDocument().id());
}
//perform bulk delete
personService.deleteBulk(ids);
//check results
for(String id : ids) {
assertNull(bucket.get(id));
}
}
private JsonDocument insertRandomPersonDocument() {
Person expected = randomPersonWithId();
JsonDocument doc = converter.toDocument(expected);
return bucket.insert(doc);
}
private Person randomPerson() {
return Person.Builder.newInstance()
.name(RandomStringUtils.randomAlphabetic(10))
.homeTown(RandomStringUtils.randomAlphabetic(10))
.build();
}
private Person randomPersonWithId() {
return Person.Builder.newInstance()
.id(UUID.randomUUID().toString())
.name(RandomStringUtils.randomAlphabetic(10))
.homeTown(RandomStringUtils.randomAlphabetic(10))
.build();
}
}

View File

@@ -0,0 +1,34 @@
package com.baeldung.couchbase.service;
import static org.junit.Assert.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
import com.baeldung.couchbase.IntegrationTest;
import com.baeldung.couchbase.IntegrationTestConfig;
import com.couchbase.client.java.Bucket;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = { IntegrationTestConfig.class })
@TestExecutionListeners(listeners = { DependencyInjectionTestExecutionListener.class })
public class ClusterServiceTest extends IntegrationTest {
@Autowired
private ClusterService couchbaseService;
private Bucket defaultBucket;
@Test
public void whenOpenBucket_thenBucketIsNotNull() throws Exception {
defaultBucket = couchbaseService.openBucket("default", "");
assertNotNull(defaultBucket);
assertFalse(defaultBucket.isClosed());
defaultBucket.close();
}
}