diff --git a/apache-zookeeper/pom.xml b/apache-zookeeper/pom.xml new file mode 100644 index 0000000000..6d49d74ade --- /dev/null +++ b/apache-zookeeper/pom.xml @@ -0,0 +1,30 @@ + + 4.0.0 + com.baeldung + apache-zookeeper + 0.0.1-SNAPSHOT + jar + + + + org.apache.zookeeper + zookeeper + 3.3.2 + + + com.sun.jmx + jmxri + + + com.sun.jdmk + jmxtools + + + javax.jms + jms + + + + + diff --git a/apache-zookeeper/src/main/java/com/baeldung/zookeeper/connection/ZKConnection.java b/apache-zookeeper/src/main/java/com/baeldung/zookeeper/connection/ZKConnection.java new file mode 100644 index 0000000000..0678250d57 --- /dev/null +++ b/apache-zookeeper/src/main/java/com/baeldung/zookeeper/connection/ZKConnection.java @@ -0,0 +1,33 @@ +package com.baeldung.zookeeper.connection; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.ZooKeeper; + +public class ZKConnection { + private ZooKeeper zoo; + final CountDownLatch connectionLatch = new CountDownLatch(1); + + public ZKConnection() { + } + + public ZooKeeper connect(String host) throws IOException, InterruptedException { + zoo = new ZooKeeper(host, 2000, new Watcher() { + public void process(WatchedEvent we) { + if (we.getState() == KeeperState.SyncConnected) { + connectionLatch.countDown(); + } + } + }); + connectionLatch.await(); + return zoo; + } + + public void close() throws InterruptedException { + zoo.close(); + } +} diff --git a/apache-zookeeper/src/main/java/com/baeldung/zookeeper/manager/ZKManager.java b/apache-zookeeper/src/main/java/com/baeldung/zookeeper/manager/ZKManager.java new file mode 100644 index 0000000000..0c0ad52123 --- /dev/null +++ b/apache-zookeeper/src/main/java/com/baeldung/zookeeper/manager/ZKManager.java @@ -0,0 +1,35 @@ +package com.baeldung.zookeeper.manager; + +import org.apache.zookeeper.KeeperException; + +public interface ZKManager { + /** + * Create a Znode and save some data + * + * @param path + * @param data + * @throws KeeperException + * @throws InterruptedException + */ + public void create(String path, byte[] data) throws KeeperException, InterruptedException; + + /** + * Get ZNode Data + * + * @param path + * @param boolean watchFlag + * @throws KeeperException + * @throws InterruptedException + */ + public Object getZNodeData(String path, boolean watchFlag); + + /** + * Update the ZNode Data + * + * @param path + * @param data + * @throws KeeperException + * @throws InterruptedException + */ + public void update(String path, byte[] data) throws KeeperException, InterruptedException, KeeperException; +} diff --git a/apache-zookeeper/src/main/java/com/baeldung/zookeeper/manager/ZKManagerImpl.java b/apache-zookeeper/src/main/java/com/baeldung/zookeeper/manager/ZKManagerImpl.java new file mode 100644 index 0000000000..adf76bc0f2 --- /dev/null +++ b/apache-zookeeper/src/main/java/com/baeldung/zookeeper/manager/ZKManagerImpl.java @@ -0,0 +1,58 @@ +package com.baeldung.zookeeper.manager; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; + +import com.baeldung.zookeeper.connection.ZKConnection; + +public class ZKManagerImpl implements ZKManager { + private static ZooKeeper zkeeper; + private static ZKConnection zkConnection; + + public ZKManagerImpl() { + initialize(); + } + + /** * Initialize connection */ + private void initialize() { + try { + zkConnection = new ZKConnection(); + zkeeper = zkConnection.connect("localhost"); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + } + + public void closeConnection() { + try { + zkConnection.close(); + } catch (InterruptedException e) { + System.out.println(e.getMessage()); + } + } + + public void create(String path, byte[] data) throws KeeperException, InterruptedException { + zkeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + public Object getZNodeData(String path, boolean watchFlag) { + try { + byte[] b = null; + b = zkeeper.getData(path, null, null); + String data = new String(b, "UTF-8"); + System.out.println(data); + return data; + } catch (Exception e) { + System.out.println(e.getMessage()); + } + return null; + } + + public void update(String path, byte[] data) throws KeeperException, InterruptedException { + int version = zkeeper.exists(path, true) + .getVersion(); + zkeeper.setData(path, data, version); + } +}