diff --git a/persistence-modules/java-cassandra/pom.xml b/persistence-modules/java-cassandra/pom.xml
index ad80fc8a83..6df75edc56 100644
--- a/persistence-modules/java-cassandra/pom.xml
+++ b/persistence-modules/java-cassandra/pom.xml
@@ -43,6 +43,11 @@
java-driver-query-builder
${datastax-cassandra.version}
+
+ io.netty
+ netty-transport
+ ${netty-transport-version}
+
@@ -50,6 +55,7 @@
3.1.2
3.1.1.0
4.1.0
+ 4.1.71.Final
-
\ No newline at end of file
+
diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/Application.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/Application.java
new file mode 100644
index 0000000000..598b72338e
--- /dev/null
+++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/Application.java
@@ -0,0 +1,60 @@
+package com.baeldung.cassandra.batch;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baeldung.cassandra.batch.domain.Product;
+import com.baeldung.cassandra.batch.repository.KeyspaceRepository;
+import com.baeldung.cassandra.batch.repository.ProductRepository;
+import com.datastax.oss.driver.api.core.CqlSession;
+
+public class Application {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Application.class);
+
+ public static void main(String[] args) {
+ new Application().run();
+ }
+
+ public void run() {
+ CassandraConnector connector = new CassandraConnector();
+ connector.connect("127.0.0.1", 9042, "datacenter1");
+ CqlSession session = connector.getSession();
+
+ KeyspaceRepository keyspaceRepository = new KeyspaceRepository(session);
+
+ keyspaceRepository.createKeyspace("testKeyspace", 1);
+ keyspaceRepository.useKeyspace("testKeyspace");
+
+ ProductRepository productRepository = new ProductRepository(session);
+
+ productRepository.createProductTable("testKeyspace");
+ productRepository.createProductByIdTable("testKeyspace");
+ productRepository.createProductByIdTable("testKeyspace");
+ Product product = getProduct();
+ productRepository.insertProductBatch(product);
+
+ Product productV1 = getProduct();
+ Product productV2 = getProduct();
+
+ productRepository.insertProductVariantBatch(productV1, productV2);
+
+
+ List products = productRepository.selectAllProduct("testKeyspace");
+ products.forEach(x -> LOG.info(x.toString()));
+ connector.close();
+ }
+
+ private Product getProduct() {
+ Product product = new Product();
+ product.setProductName("Banana");
+ product.setDescription("Banana");
+ product.setPrice(12f);
+
+ return product;
+ }
+}
+
+
diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/CassandraConnector.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/CassandraConnector.java
new file mode 100644
index 0000000000..02d63c62f6
--- /dev/null
+++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/CassandraConnector.java
@@ -0,0 +1,31 @@
+package com.baeldung.cassandra.batch;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+
+import java.net.InetSocketAddress;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class CassandraConnector {
+
+ private CqlSession session;
+
+ public void connect(final String node, final Integer port, final String dataCenter) {
+ CqlSessionBuilder builder = CqlSession.builder();
+ builder.addContactPoint(new InetSocketAddress(node, port));
+ if (StringUtils.isNotBlank(dataCenter)) {
+ builder.withLocalDatacenter(dataCenter);
+ }
+
+ session = builder.build();
+ }
+
+ public CqlSession getSession() {
+ return this.session;
+ }
+
+ public void close() {
+ session.close();
+ }
+}
diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/domain/Product.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/domain/Product.java
new file mode 100644
index 0000000000..a787225fae
--- /dev/null
+++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/domain/Product.java
@@ -0,0 +1,79 @@
+package com.baeldung.cassandra.batch.domain;
+
+import java.util.UUID;
+
+public class Product {
+
+ public Product() {
+ super();
+ }
+
+ public Product(UUID productId, UUID variantId, String productName, String description, float price) {
+ super();
+ this.productId = productId;
+ this.variantId = variantId;
+ this.productName = productName;
+ this.description = description;
+ this.price = price;
+ }
+
+ public Product(UUID productId, String productName, String description, float price) {
+ super();
+ this.productId = productId;
+ this.productName = productName;
+ this.description = description;
+ this.price = price;
+ }
+
+ private UUID productId;
+ private UUID variantId;
+ private String productName;
+ private String description;
+ private float price;
+
+ public UUID getProductId() {
+ return productId;
+ }
+
+ public void setProductId(UUID productId) {
+ this.productId = productId;
+ }
+
+ public UUID getVariantId() {
+ return variantId;
+ }
+
+ public void setVariantId(UUID variantId) {
+ this.variantId = variantId;
+ }
+
+ public String getProductName() {
+ return productName;
+ }
+
+ public void setProductName(String productName) {
+ this.productName = productName;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public float getPrice() {
+ return price;
+ }
+
+ public void setPrice(float price) {
+ this.price = price;
+ }
+
+ @Override
+ public String toString() {
+ return "Product [productId=" + productId + ", variantId=" + variantId + ", productName=" + productName
+ + ", description=" + description + ", price=" + price + "]";
+ }
+}
diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/KeyspaceRepository.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/KeyspaceRepository.java
new file mode 100644
index 0000000000..6d09af0bd3
--- /dev/null
+++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/KeyspaceRepository.java
@@ -0,0 +1,27 @@
+package com.baeldung.cassandra.batch.repository;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.CqlSession;
+
+import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
+import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
+
+public class KeyspaceRepository {
+ private final CqlSession session;
+
+ public KeyspaceRepository(CqlSession session) {
+ this.session = session;
+ }
+
+ public void createKeyspace(String keyspaceName, int numberOfReplicas) {
+ CreateKeyspace createKeyspace = SchemaBuilder.createKeyspace(keyspaceName)
+ .ifNotExists()
+ .withSimpleStrategy(numberOfReplicas);
+
+ session.execute(createKeyspace.build());
+ }
+
+ public void useKeyspace(String keyspace) {
+ session.execute("USE " + CqlIdentifier.fromCql(keyspace));
+ }
+}
diff --git a/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/ProductRepository.java b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/ProductRepository.java
new file mode 100644
index 0000000000..106db133d1
--- /dev/null
+++ b/persistence-modules/java-cassandra/src/main/java/com/baeldung/cassandra/batch/repository/ProductRepository.java
@@ -0,0 +1,188 @@
+package com.baeldung.cassandra.batch.repository;
+
+import com.baeldung.cassandra.batch.domain.Product;
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.BatchStatement;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
+import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
+import com.datastax.oss.driver.api.querybuilder.schema.CreateTable;
+import com.datastax.oss.driver.api.querybuilder.select.Select;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+public class ProductRepository {
+
+ private static final String PRODUCT_TABLE_NAME = "product";
+ private static final String PRODUCT_BY_ID_TABLE_NAME = "product_by_id";
+ private static final String PRODUCT_BY_NAME_TABLE_NAME = "product_by_name";
+
+ private final CqlSession session;
+
+ public ProductRepository(CqlSession session) {
+ this.session = session;
+ }
+
+ public void createProductTable(String keyspace) {
+ CreateTable createTable = SchemaBuilder.createTable(PRODUCT_TABLE_NAME).ifNotExists()
+ .withPartitionKey("product_id", DataTypes.UUID)
+ .withClusteringColumn("variant_id", DataTypes.UUID)
+ .withColumn("product_name", DataTypes.TEXT)
+ .withColumn("description", DataTypes.TEXT)
+ .withColumn("price", DataTypes.FLOAT);
+
+ executeStatement(createTable.build(), keyspace);
+ }
+
+ public void createProductByIdTable(String keyspace) {
+ CreateTable createTable = SchemaBuilder.createTable(PRODUCT_BY_ID_TABLE_NAME).ifNotExists()
+ .withPartitionKey("product_id", DataTypes.UUID)
+ .withColumn("product_name", DataTypes.TEXT)
+ .withColumn("title", DataTypes.TEXT)
+ .withColumn("description", DataTypes.TEXT)
+ .withColumn("price", DataTypes.FLOAT);
+
+ executeStatement(createTable.build(), keyspace);
+ }
+
+ public void createProductTableByName(String keyspace) {
+ CreateTable createTable = SchemaBuilder.createTable(PRODUCT_BY_NAME_TABLE_NAME).ifNotExists()
+ .withPartitionKey("product_name", DataTypes.TEXT)
+ .withColumn("product_id", DataTypes.UUID)
+ .withColumn("description", DataTypes.TEXT)
+ .withColumn("price", DataTypes.FLOAT);
+
+ executeStatement(createTable.build(), keyspace);
+ }
+
+ /**
+ * Insert two variant Product into same table using a batch query.
+ *
+ * @param Product
+ */
+ public void insertProductVariantBatch(Product productVariant1,Product productVariant2) {
+ UUID productId = UUID.randomUUID();
+ BoundStatement productBoundStatement1 = this.getProductVariantInsertStatement(productVariant1,productId);
+ BoundStatement productBoundStatement2 = this.getProductVariantInsertStatement(productVariant2,productId);
+
+ BatchStatement batch = BatchStatement.newInstance(DefaultBatchType.UNLOGGED,
+ productBoundStatement1,productBoundStatement2);
+
+ session.execute(batch);
+ }
+
+
+ /**
+ * Insert two same Product into related tables using a batch query.
+ *
+ * @param book
+ */
+ public void insertProductBatch(Product product) {
+ UUID productId = UUID.randomUUID();
+
+ BoundStatement productBoundStatement1 = this.getProductInsertStatement(product,productId,PRODUCT_BY_ID_TABLE_NAME);
+ BoundStatement productBoundStatement2 = this.getProductInsertStatement(product,productId,PRODUCT_BY_NAME_TABLE_NAME);
+
+ BatchStatement batch = BatchStatement.newInstance(DefaultBatchType.LOGGED,
+ productBoundStatement1,productBoundStatement2);
+
+ session.execute(batch);
+ }
+
+ public List selectAllProduct(String keyspace) {
+ Select select = QueryBuilder.selectFrom(PRODUCT_TABLE_NAME).all();
+
+ ResultSet resultSet = executeStatement(select.build(), keyspace);
+
+ List result = new ArrayList<>();
+
+ resultSet.forEach(x -> result.add(new Product(x.getUuid("product_id"), x.getUuid("variant_id"),
+ x.getString("product_name"), x.getString("description"), x.getFloat("price"))));
+
+ return result;
+ }
+
+ public List selectAllProductByName(String keyspace) {
+ Select select = QueryBuilder.selectFrom(PRODUCT_BY_NAME_TABLE_NAME).all();
+
+ ResultSet resultSet = executeStatement(select.build(), keyspace);
+
+ List result = new ArrayList<>();
+
+ resultSet.forEach(x -> result.add(new Product(x.getUuid("product_id"),
+ x.getString("product_name"), x.getString("description"), x.getFloat("price"))));
+
+ return result;
+ }
+
+ public List selectAllProductById(String keyspace) {
+ Select select = QueryBuilder.selectFrom(PRODUCT_BY_ID_TABLE_NAME).all();
+
+ ResultSet resultSet = executeStatement(select.build(), keyspace);
+
+ List result = new ArrayList<>();
+
+ resultSet.forEach(x -> result.add(new Product(x.getUuid("product_id"),
+ x.getString("product_name"), x.getString("description"), x.getFloat("price"))));
+
+ return result;
+ }
+
+ /**
+ * Delete table.
+ *
+ * @param tableName the name of the table to delete.
+ */
+ public void deleteTable(String tableName) {
+ StringBuilder sb = new StringBuilder("DROP TABLE IF EXISTS ").append(tableName);
+
+ final String query = sb.toString();
+ session.execute(query);
+ }
+
+ private ResultSet executeStatement(SimpleStatement statement, String keyspace) {
+ if (keyspace != null) {
+ statement.setKeyspace(CqlIdentifier.fromCql(keyspace));
+ }
+
+ return session.execute(statement);
+ }
+
+ private BoundStatement getProductVariantInsertStatement(Product product,UUID productId) {
+ String insertQuery = new StringBuilder("").append("INSERT INTO ").append(PRODUCT_TABLE_NAME)
+ .append("(product_id,variant_id,product_name,description,price) ").append("VALUES (").append(":product_id")
+ .append(", ").append(":variant_id").append(", ").append(":product_name").append(", ")
+ .append(":description").append(", ").append(":price").append(");").toString();
+
+ PreparedStatement preparedStatement = session.prepare(insertQuery);
+
+ return preparedStatement.bind(productId, UUID.randomUUID(),
+ product.getProductName(),
+ product.getDescription(),
+ product.getPrice());
+ }
+
+ private BoundStatement getProductInsertStatement(Product product,UUID productId,String productTableName) {
+ String cqlQuery1 = new StringBuilder("").append("INSERT INTO ").append(productTableName)
+ .append("(product_id,product_name,description,price) ").append("VALUES (").append(":product_id")
+ .append(", ").append(":product_name").append(", ").append(":description").append(", ")
+ .append(":price").append(");").toString();
+
+ PreparedStatement preparedStatement = session.prepare(cqlQuery1);
+
+ return preparedStatement.bind(productId,
+ product.getProductName(),
+ product.getDescription(),
+ product.getPrice());
+ }
+
+
+}
diff --git a/persistence-modules/java-cassandra/src/test/java/com/baeldung/cassandra/batch/epository/ProductRepositoryIntegrationTest.java b/persistence-modules/java-cassandra/src/test/java/com/baeldung/cassandra/batch/epository/ProductRepositoryIntegrationTest.java
new file mode 100644
index 0000000000..55dc3dad9f
--- /dev/null
+++ b/persistence-modules/java-cassandra/src/test/java/com/baeldung/cassandra/batch/epository/ProductRepositoryIntegrationTest.java
@@ -0,0 +1,133 @@
+package com.baeldung.cassandra.batch.epository;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.thrift.transport.TTransportException;
+import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import com.baeldung.cassandra.batch.CassandraConnector;
+import com.baeldung.cassandra.batch.domain.Product;
+import com.baeldung.cassandra.batch.repository.KeyspaceRepository;
+import com.baeldung.cassandra.batch.repository.ProductRepository;
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ProductRepositoryIntegrationTest {
+
+ private KeyspaceRepository schemaRepository;
+
+ private ProductRepository productRepository;
+
+ private CqlSession session;
+
+ private final String KEYSPACE_NAME = "testBaeldungKeyspace";
+ private final String PRODUCT = "product";
+
+ @BeforeClass
+ public static void init() throws ConfigurationException, TTransportException, IOException, InterruptedException {
+ // Start an embedded Cassandra Server
+ EmbeddedCassandraServerHelper.startEmbeddedCassandra(20000L);
+ }
+
+ @Before
+ public void connect() {
+ CassandraConnector client = new CassandraConnector();
+ client.connect("127.0.0.1", 9142,"datacenter1");
+ session = client.getSession();
+ schemaRepository = new KeyspaceRepository(client.getSession());
+ schemaRepository.createKeyspace(KEYSPACE_NAME, 1);
+ schemaRepository.useKeyspace(KEYSPACE_NAME);
+ productRepository = new ProductRepository(client.getSession());
+ }
+
+ @Test
+ public void whenCreatingAProductTable_thenCreatedCorrectly() {
+ productRepository.deleteTable(KEYSPACE_NAME);
+ productRepository.createProductTable(KEYSPACE_NAME);
+
+ ResultSet result = session.execute("SELECT * FROM " + KEYSPACE_NAME + "." + PRODUCT + ";");
+
+ List colDef = new ArrayList<>();
+
+ result.getColumnDefinitions().forEach(columnDef -> colDef.add(columnDef));
+ List columnNames = colDef.stream().map(ColumnDefinition::getName).map(CqlIdentifier::toString).collect(Collectors.toList());
+ assertEquals(columnNames.size(), 5);
+ assertTrue(columnNames.contains("product_id"));
+ assertTrue(columnNames.contains("variant_id"));
+ assertTrue(columnNames.contains("product_name"));
+ assertTrue(columnNames.contains("description"));
+ assertTrue(columnNames.contains("price"));
+ }
+
+ @Test
+ public void whenCreatingRelatedProductBatch_thenCreatedCorrectly() {
+ productRepository.deleteTable(KEYSPACE_NAME);
+ productRepository.createProductTableByName(KEYSPACE_NAME);
+ productRepository.createProductByIdTable(KEYSPACE_NAME);
+
+ Product product = getTestProduct();
+ productRepository.insertProductBatch(product);
+ List productByIdList = productRepository.selectAllProductById(KEYSPACE_NAME);
+ List productByNameList = productRepository.selectAllProductByName(KEYSPACE_NAME);
+
+ assertEquals(productByIdList.size(), 1);
+ assertEquals(productByNameList.size(), 1);
+ assertEquals(productByIdList.get(0).getProductName(), "Banana");
+ assertEquals(productByNameList.get(0).getProductName(), "Banana");
+ assertEquals(productByIdList.get(0).getDescription(), "Banana");
+ assertEquals(productByNameList.get(0).getDescription(), "Banana");
+ assertEquals(productByIdList.get(0).getPrice(), 12f,0f);
+ assertEquals(productByNameList.get(0).getPrice(), 12f,0f);
+ }
+
+ @Test
+ public void whenCreatingMultiVariantProductBatch_thenCreatedCorrectly() {
+ productRepository.deleteTable(KEYSPACE_NAME);
+ productRepository.createProductTable(KEYSPACE_NAME);
+
+ Product productV1 = getTestProduct();
+ Product productV2 = getTestProduct();
+ productRepository.insertProductVariantBatch(productV1, productV2);
+ List productList = productRepository.selectAllProduct(KEYSPACE_NAME);
+
+ assertEquals(productList.size(), 2);
+ assertEquals(productList.get(0).getProductName(), "Banana");
+ assertEquals(productList.get(1).getProductName(), "Banana");
+ assertEquals(productList.get(0).getDescription(), "Banana");
+ assertEquals(productList.get(1).getDescription(), "Banana");
+ assertEquals(productList.get(0).getPrice(), 12f,0f);
+ assertEquals(productList.get(1).getPrice(), 12f,0f);
+ }
+
+
+
+ @AfterClass
+ public static void cleanup() {
+ EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
+ }
+
+ private Product getTestProduct() {
+ Product product = new Product();
+ product.setProductName("Banana");
+ product.setDescription("Banana");
+ product.setPrice(12f);
+
+ return product;
+ }
+}