zookeeper学习02 使用

栏目: 编程工具 · 发布时间: 5年前

内容简介:知识点:1)zookeeper客户端的使用2)zoo.cfg里面配置信息的讲解

知识点:

1)zookeeper客户端的使用

2)zoo.cfg里面配置信息的讲解

3)zookeeper的一些常见概念模型

4)zookeeper java客户端的使用

zoo.cfg配置文件参数分析

tickTime= 2000 zookeeper最小时间单位长度(ms)

initLimit=10 结果是10

tickTime follower节点启动之后与leader节点完成数据同步的时间

tickTime follower节点与leader节点进行心跳检测的最大延迟时间

dataDir=/tmp/zookeeper 表示zookeeper服务器存储快照文件的目录

dataLogDir 表示配置zookeeper事务日志的存储日志 默认指定在dataDir目录下

clientport 表示客户端和服务器建立连接的端口号:2181【默认】

客户端的操作

先进入客户端 shzkCli.sh

zookeeper学习02 使用

看看他有哪些命令

zookeeper学习02 使用

zookeeper中的一些概念 1)数据模型 zookeeper的数据模型和文件系统类似,每一个节点称为:znode 是zookeeper中的最小数据单元,每个znode上都可以保存数据和挂载子节点,从而构成一个层次化的属性结构。

zookeeper学习02 使用

持久化节点 :节点创建后会一直存在zookeeper服务器上,直到主动给删除

持久化有序节点:每个节点都会为他的一级子节点维护一个顺序

临时节点:临时节点的生命周期和客户端的会话保持一致,当客户端会话失效,该节点自动清理

临时有序节点:临时节点上多了一个顺序性的特性

会话:四种状态

NOT CONNECTED 未连接

CONNECTING 连接中

CONNECTED 已连接

CLOSED 关闭

watcher

zookeeper 提供了分布式数据发布/订阅 ,zookeeper允许客户端向服务器注册一个watcher监听,当服务器端的节点触发指定事件的时候会触发watcher,服务端会向客户端发送一个事件通知

watcher的通知是一次性,一旦触发一次通知后,该watcher就失效了。

ACL

zookeeper提供了控制节点访问权限的功能,用于有效保证zookeeper中数据的安全性,避免误操作而导致系统出现重大事故。

create/read/write/delete/admin

zookeeper命令

根据上面的说话,zookeeper是基于类似于文件系统的数据结构

我们先看下它的根

zookeeper学习02 使用

没有节点的时候我们先创建一个节点

创建节点命令

1) create [-s] [-e] path data acl

-s 代表是否是有序的

-e 代表是否是临时的

默认是持久化节点

path 代表路径

data 代表数据

acl 代表权限

create /lulf 123

zookeeper学习02 使用

我们再来看下这个节点下的是否有值

zookeeper学习02 使用

创建子节点必须要携带完整路径

2)get path [watch]

获取指定path的信息

zookeeper学习02 使用

3) set path data [version]

修改节点path对应的data

zookeeper学习02 使用

4) delete path [version]

删除指定节点

version表示锁的概念【乐观锁】

数据库中有个version字段来控制数据行的版本好

zookeeper学习02 使用

stat信息

节点的状态信息

cZxid = 0x100000006 节点被创建时的事务id

ctime = Mon Aug 27 17:05:03 CST 2018 节点被创建时的时间

mZxid = 0x10000000d 节点被最后一次修改的事务id

mtime = Mon Aug 27 18:58:20 CST 2018 节点被最后一次修改的时间

pZxid = 0x100000007 子节点被最后一次修改的事务id

cversion = 1 子节点的版本号

dataVersion = 1 数据版本号

aclVersion = 0 accesscontrol 节点修改权限

ephemeralOwner = 0x0 创建临时节点的时候,会有一个sessionid,该值存储的就是这个id

dataLength = 3 数据长度

numChildren = 1 子节点数

java API的使用 1)导入jar包

<dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.8</version>
    </dependency>
复制代码

2)建立连接

package com.lulf.javaapi;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class CreateSessionDemo {

	private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
			+ "192.168.48.131:2181,192.168.48.132:2181";
	private static CountDownLatch countDownLatch=new CountDownLatch(1);
	public static void main(String[] args) throws IOException, InterruptedException {
		ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new Watcher() {
			
			@Override
			public void process(WatchedEvent watchedEvent) {
				//如果当前连接状态是连接成功的,通过计数器去控制
				if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
					countDownLatch.countDown();
					System.out.println(watchedEvent.getState());
				}
			}
		});
		countDownLatch.await();
		System.out.println("test  "+zooKeeper.getState());
	}
}
复制代码

3)创建节点

zookeeper学习02 使用
package com.lulf.javaapi;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

public class CreateNodeDemo implements Watcher {

	private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
			+ "192.168.48.131:2181,192.168.48.132:2181";
	private static CountDownLatch countDownLatch = new CountDownLatch(1);
	private static ZooKeeper zooKeeper = null;
	private static Stat stat = new Stat();

	public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
		zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new CreateNodeDemo());
		countDownLatch.await();
		System.out.println("test  " + zooKeeper.getState());
		// 创建节点
		String path = "/FXP";
		byte[] data = "1234".getBytes();
		List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
		CreateMode createMode = CreateMode.EPHEMERAL;
		String result = zooKeeper.create(path, data, acl, createMode);
		zooKeeper.getData(path, true, stat);// 增加一个watcher
		System.out.println("创建成功  " + result);

		// 修改数据 -1代表不用管他版本号
		zooKeeper.setData(path, "311".getBytes(), -1);

		//删除节点	
		zooKeeper.delete(path, -1);
		Thread.sleep(2000);
		
		//创建节点和子节点【这边要注意只有持久化节点才有子节点】
		zooKeeper.create("/LULF", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		TimeUnit.SECONDS.sleep(1);
		zooKeeper.create("/LULF/lulf1-1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		TimeUnit.SECONDS.sleep(1);
		
		

	}

	@Override
	public void process(WatchedEvent watchedEvent) {
		// 如果当前连接状态是连接成功的,通过计数器去控制
		if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
			if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getType()) {
				countDownLatch.countDown();
				System.out.println(watchedEvent.getState());
			} else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
				try {
					System.out.println("路径 " + watchedEvent.getPath() + "改变后的值 "
							+ zooKeeper.getData(watchedEvent.getPath(), true, new Stat()));
				} catch (KeeperException | InterruptedException e) {
					e.printStackTrace();
				}
			} else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {

			} else if (watchedEvent.getType() == Event.EventType.NodeCreated) {
				try {
					System.out.println("路径 " + watchedEvent.getPath() + "节点的值 "
							+ zooKeeper.getData(watchedEvent.getPath(), true, new Stat()));
				} catch (KeeperException | InterruptedException e) {
					e.printStackTrace();
				}
			} else if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
					System.out.println("删除路径 " + watchedEvent.getPath());
			}
		}
	}
}
复制代码

权限控制模式

schema:授权对象

ip : 192.168.48.131

digest : username password

world :开放式的权限控制模式 访问权限对所有用户开放 world:anyone

super :超级用户 可以对zookeeper上的数据节点进行操作

zookeeper学习02 使用

如上可以对创建节点做多个要求

连接状态

1)keeperStat:expored

在一定的时间内客户端没有收到服务器的通知,则认为当前的会话已经过期了,

2)disconnected 断开连接状态

3)keeperstat.syncConnected 客户端和服务器端在某个节点上建立连接,并且完成一次ersion,

zxid的同步

4)keeperstatus.authFailed 授权失败

NodeCreated 当节点被创建时触发

NodeChildrenChanged 标识子节点被创建,被删除,子节点数据发生变化

NodeDataChanged 节点数据发生改变

NodeDeleted 节点被删除

None 当客户端和服务器连接状态发生变化的时候,事件类型就是None

zkclient

1)引入jar包

<dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.10</version>
    </dependency>
复制代码

2)创建会话

zookeeper学习02 使用
zookeeper学习02 使用

它的watcher怎么做呢?

zookeeper学习02 使用

curator

Curator本身是Netfilx公司开源的zookeeper客户端

curator提供了各种应用场景的实现封装

curator-framework 提供了fluent风格的api

curator-replice 提供了实现封装

1)创建seesion

ExponentialBackoffRetry 衰减重试 减少系统负担 间隔时间 充实次数

RetryNTimes 指定最大重试次数

ReteyOneTime 仅仅重试一次

RetryUnitilElapsed 一直重试知道规定的时间

zookeeper学习02 使用

俩种创立连接的风格

zookeeper学习02 使用

2)增删改

zookeeper学习02 使用

3)异步操作及事务处理 Collection transresult=curatorFramework.inTransaction().create().forPath("/trans", "111".getBytes()).and().setData() .forPath("/lll", "555".getBytes()).and().commit();

package com.lulf.curator;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class CuratorOperationDemo {
	public static void main(String[] args) throws Exception {
		CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
		System.out.println("连接成功。。。。");
		// 创建节点
		curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
				.forPath("/curator/curator1-1/curator1-1-1", "123445".getBytes());
		// 删除节点 默认情况下,version是-1
		curatorFramework.delete().deletingChildrenIfNeeded().forPath("/curator");
		// 获取数据
		Stat stat = new Stat();
		byte[] bs = curatorFramework.getData().storingStatIn(stat).forPath("/curator");
		String result = new String(bs);
		// 更新
		Stat stat1 = curatorFramework.setData().forPath("/curator", "312".getBytes());

		// 异步操作
		ExecutorService service = Executors.newFixedThreadPool(1);
		curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT)
				.inBackground(new BackgroundCallback() {
					@Override
					public void processResult(CuratorFramework arg0, CuratorEvent arg1) throws Exception {
						System.out.println(Thread.currentThread().getName() + "-->" + arg1.getResultCode());
					}
				}, service).forPath("/Allen", "allen".getBytes());
		// 事务【curator独有的】
		Collection<CuratorTransactionResult> transresult=curatorFramework.inTransaction().create().forPath("/trans", "111".getBytes()).and().setData()
				.forPath("/lll", "555".getBytes()).and().commit();
		
		System.out.println("success");

	}

}
复制代码

4)事件处理

package com.lulf.curator;

import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.zookeeper.CreateMode;

public class CuratorEventDemo {
	public static void main(String[] args) throws Exception {
		CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
		//System.out.println("连接成功。。。");
		// curator事件处理
		// 三种watcher来做节点监听
		// pathcache 监视一个路径下子节点的创建,删除节点,数据更新
		// NodeCache 监视一个节点的创建,更新,删除
		// TreeCache pathcache和nodecache的结合(监视路径下的创建更新和删除事件),
		// 缓存路径下所有子节点的数据
		
		//node变化NodeCache
//		NodeCache cache = new NodeCache(curatorFramework, "/lulf", false);
//		cache.start(true);
//		cache.getListenable()
//				.addListener(() -> System.out.println("节点数据发生变化" + new String(cache.getCurrentData().getData())));
//		curatorFramework.setData().forPath("/lulf", "gbcq".getBytes());
	
		
		//pathcache
		PathChildrenCache cache2=new PathChildrenCache(curatorFramework, "/event", false);
		cache2.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
		cache2.getListenable().addListener((curatorFramework1,PathChildrenCacheEvent)->{
			switch (PathChildrenCacheEvent.getType()) {
			case CHILD_ADDED:
				System.out.println("增加子节点");
				break;
            case CHILD_REMOVED:
            	System.out.println("删除子节点");
				break; 
            case CHILD_UPDATED:
            	System.out.println("更新子节点");
				break; 
			default:
				break;
			}
		});
		curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event", "1".getBytes());
		TimeUnit.SECONDS.sleep(1);
		curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1", "1.1".getBytes());
		TimeUnit.SECONDS.sleep(1);
		curatorFramework.setData().forPath("/event/event1", "1.2".getBytes());
		curatorFramework.delete().forPath("/event/event1");
		System.in.read();
	}
}
复制代码

以上所述就是小编给大家介绍的《zookeeper学习02 使用》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

图形程序开发人员指南

图形程序开发人员指南

Michael Abrash / 前导工作室 / 机械工业出版社 / 1998 / 128

Michael Abrash's classic Graphics Programming Black Book is a compilation of Michael's previous writings on assembly language and graphics programming (including from his "Graphics Programming" column......一起来看看 《图形程序开发人员指南》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

MD5 加密
MD5 加密

MD5 加密工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试