Zookeeper详细教程、分布式协调服务原理

栏目: 服务器 · 发布时间: 7年前

内容简介:Zookeeper分布式服务框架是Apache Hadoop的一个子项目,主要为分布式系统提供协调服务以及一些数据管理问题,如命名服务、集群管理、分布式应用配置等。zookeeper可以将简单易用的接口和高效稳定的系统提供给用户。在大型网站中,zookeeper一直占据着重要地位,主要功能如下:zookeeper是为别的分布式程序服务的

Zookeeper分布式服务框架是Apache Hadoop的一个子项目,主要为分布式系统提供协调服务以及一些数据管理问题,如命名服务、集群管理、分布式应用配置等。zookeeper可以将简单易用的接口和高效稳定的系统提供给用户。

在大型网站中,zookeeper一直占据着重要地位,主要功能如下:

zookeeper是为别的分布式程序服务的

Zookeeper本身就是一个分布式程序(只要有半数以上节点存活,zk就能正常服务)

Zookeeper所提供的服务涵盖:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务

虽然说可以提供各种服务,但是zookeeper在底层其实只提供了两个功能:

为用户程序提供数据节点监听服务;

管理(存储,读取)用户程序提交的数据;

一、zookeeper集群安装

由于zookeeper的集群投票选主机制(下面会介绍),超过半数的节点投票才能完成选主。并且必须超过半数的节点存活才能提供服务。所以集群中节点数最好为奇数台,但不少于3台。我们将我们的zookeeper集群安装到三台虚拟机上。

1.1 环境准备

三台虚拟机

192.168.66.101

192.168.66.102

192.168.66.103

JDK安装包(jdk-7u71-linux-i586.tar.gz)

zookeeper安装包(zookeeper-3.4.5.tar.gz)

1.2 创建zk用户

登录三台虚拟机,添加zookeeper的管理用户,执行如下命令添加一个新用户,注意必须使用root用户权限来添加新用户,需要在三台虚拟机上都要创建一个新用户。

groupadd zkg #添加一个组

useradd zk -g zkg # 添加一个用户,并制定该用户属于zkg组

passwd # 给用户设置密码,下面提示输入密码,以及确认密码

Zookeeper详细教程、分布式协调服务原理 1.3 安装

三台虚拟机的用户创建完成之后,在一台虚拟机上来安装zookeeper,切换到刚创建的用户下执行如下命令

su zk #切换到zk用户

cd ~ # 进入zk用户的home目录

使用ftp工具将jdk安装包和zookeeper安装包上传至任一台虚拟机中zk用户home目录下,并解压至apps目录下

mkdir apps

tar -zxvf jdk-7u71-linux-i586.tar.gz -C apps/

tar -zxvf zookeeper-3.4.5.tar.gz -C apps/

按照下面的步骤,配置环境变量,JDK安装完毕,注意三台机器都需要配置

su # 切换到root用户

vi /etc/profile # 编辑系统配置文件,将下面三行内容粘贴至文件末尾

########################################################

export JAVA_HOME=/home/zk/apps/jdk-7u71-linux-i586

export ZOOKEEPER_HOME=/home/zk/apps/zookeeper-3.4.5

export PATH=$PATH:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin

#########################################################

source /etc/profile # 使修改后配置文件生效

su zk # 切换为zk用户

检查jdk的安装是否成功,只检查已有加压文件的机器

java -version # 查看jdk版本信息

修改zookeeper的配置文件

cd ~/apps/zookeeper-3.4.5/conf # 进入到zookeeper的配置文件存放目录

cp zoo_sample.cfg zoo.cfg # 将zoo_sample.cfg复制一份,并更名为zoo.cfg

vi zoo.cfg # 编辑zoo.cfg并将下面内容添加到文件末尾

#########################################################

server.1=192.168.66.101:2888:3888 # (主机名, 心跳端口、数据端口)

server.2=192.168.66.102:2888:3888

server.3=192.168.66.103:2888:3888

#########################################################

修改dataDir和dataLogDir的值

#########################################################

dataDir=/home/zk/apps/zookeeper-3.4.5/data

dataLogDir=/home/zk/apps/zookeeper-3.4.5/log

#########################################################

:wq #保存并退出

创建data和log目录,用于存放数据和日志信息

cd /home/zk/apps/zookeeper-3.4.5/

mkdir -m 755 data

mkdir -m 755 log

在data文件夹下新建myid文件,myid的文件内容为1

cd data/

echo 1 > myid #新建myid文件,并输入内容为1

将配置好的文件目录发至其他机器上

scp -r /home/zk/apps zk@192.168.66.102:/home/zk/ # 通过scp将apps目录发至其他机器,需要输入密码

scp -r /home/zk/apps zk@192.168.66.103:/home/zk/

修改其他机器上的myid

到192.168.66.102上:修改myid为:2

到192.168.66.103上:修改myid为:3

启动每台机器上的zookeeper

zkServer.sh start

查看集群状态

jps # 查看进程

zkServer.sh status #查看集群状态,主从信息

二、zookeeper的结构和命令

2.1 zookeeper特性

Zookeeper:一个leader,多个follower组成的集群

全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的

分布式读写,更新请求转发,由leader实施

更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行

数据更新原子性,一次数据更新要么成功,要么失败

实时性,在一定时间范围内,client能读到最新数据

2.2 zookeeper数据结构

层次化的目录结构,命名符合常规文件系统规范(见下图)

每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识

节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点,下一页详细讲解)

客户端应用可以在节点上设置监视器(后续详细讲解)

Zookeeper详细教程、分布式协调服务原理 2.3 节点类型

Znode有两种类型

短暂(ephemeral)(断开连接自己删除)

持久(persistent)(断开连接不删除)

Znode有四种形式的目录节点(默认是persistent )

PERSISTENT

PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )

EPHEMERAL

EPHEMERAL_SEQUENTIAL

创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护

在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序

2.4 zookeeper命令行操作

运行 zkCli.sh –server <ip>进入命令行工具

zkCli.sh –server 192.168.66.101

ls / #查看当前 ZooKeeper 中所包含的内容

create /zk "myData" # 创建一个新的 znode ,使用 create /zk myData 。这个命令创建了一个新的 znode 节点“ zk ”以及与它关联的字符串

get /zk # 我们运行 get 命令来确认 znode 是否包含我们所创建的字符串

get /zk watch

#监听这个节点的变化,当另外一个客户端改变/zk时,它会打出下面的

#WATCHER::

#WatchedEvent state:SyncConnected type:NodeDataChanged path:/zk

set /zk "zsl" #通过 set 命令来对 zk 所关联的字符串进行设置

delete /zk # 删除一个节点

rmr /zk # 删除一个节点

2.5 zookeeper Java api的使用

2.5.1 基本使用

org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话

它提供了表 1 所示几类主要方法

功能

描述

create

在本地目录树中创建一个节点

delete

删除一个节点

exists

测试本地是否存在目标节点

get/set data

从目标节点上读取 / 写数据

get/set ACL

获取 / 设置目标节点访问控制列表信息

get children

检索一个子节点上的列表

sync

等待要被传送的数据

2.5.2 是用Java API实现简单的增删改查

public class SimpleDemo {

// 会话超时时间,设置为与系统默认时间一致

private static final int SESSION_TIMEOUT = 30000;

// 创建 ZooKeeper 实例

ZooKeeper zk;

// 创建 Watcher 实例

Watcher wh = new Watcher() {

public void process(org.apache.zookeeper.WatchedEvent event)

{

System.out.println(event.toString());

}

};

// 初始化 ZooKeeper 实例

private void createZKInstance() throws IOException

{

zk = new ZooKeeper("weekend01:2181", SimpleDemo.SESSION_TIMEOUT, this.wh);

}

private void ZKOperations() throws IOException, InterruptedException, KeeperException

{

System.out.println("/n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent");

zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

System.out.println("/n2. 查看是否创建成功: ");

System.out.println(new String(zk.getData("/zoo2", false, null)));

System.out.println("/n3. 修改节点数据 ");

zk.setData("/zoo2", "shenlan211314".getBytes(), -1);

System.out.println("/n4. 查看是否修改成功: ");

System.out.println(new String(zk.getData("/zoo2", false, null)));

System.out.println("/n5. 删除节点 ");

zk.delete("/zoo2", -1);

System.out.println("/n6. 查看节点是否被删除: ");

System.out.println(" 节点状态: [" + zk.exists("/zoo2", false) + "]");

}

private void ZKClose() throws InterruptedException

{

zk.close();

}

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

SimpleDemo dm = new SimpleDemo();

dm.createZKInstance();

dm.ZKOperations();

dm.ZKClose();

}

}

2.5.3 Zookeeper的监听器工作机制

Zookeeper详细教程、分布式协调服务原理

监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑

监听器的注册是在获取数据的操作中实现:

getData(path,watch?)监听的事件是:节点数据变化事件

getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件

三、zookeeper的应用案例(分布式应用HA||分布式锁)

3.1 实现分布式应用的(主节点HA)及客户端动态更新主节点状态

某分布式系统中,主节点可以有多台,可以动态上下线

任意一台客户端都能实时感知到主节点服务器的上下线

Zookeeper详细教程、分布式协调服务原理

A、客户端实现

public class AppClient {

private String groupNode = "sgroup";

private ZooKeeper zk;

private Stat stat = new Stat();

private volatile List<String> serverList;

/**

  • 连接zookeeper
    */
    public void connectZookeeper() throws Exception {
    zk
    = new ZooKeeper("localhost:4180,localhost:4181,localhost:4182", 5000, new Watcher() {
    public void process(WatchedEvent event) {
    // 如果发生了"/sgroup"节点下的子节点变化事件, 更新server列表, 并重新注册监听
    if (event.getType() == EventType.NodeChildrenChanged
    && ("/" + groupNode).equals(event.getPath())) {
    try {
    updateServerList();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }
    });
    updateServerList();
    }
    /**
  • 更新server列表
    */
    private void updateServerList() throws Exception {
    List<String> newServerList = new ArrayList<String>();
    // 获取并监听groupNode的子节点变化
    // watch参数为true, 表示监听子节点变化事件.
    // 每次都需要重新注册监听, 因为一次注册, 只能监听一次事件, 如果还想继续保持监听, 必须重新注册
    List<String> subList = zk.getChildren("/" + groupNode, true);
    for (String subNode : subList) {
    // 获取每个子节点下关联的server地址
    byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);
    newServerList.add(new String(data, "utf-8"));
    }
    // 替换server列表
    serverList = newServerList;
    System.out.println("server list updated: " + serverList);
    }
    /**
  • client的工作逻辑写在这个方法中
  • 此处不做任何处理, 只让client sleep
    */
    public void handle() throws InterruptedException {
    Thread.sleep(Long.MAX_VALUE);
    }
    public static void main(String[] args) throws Exception {
    AppClient ac = new AppClient();
    ac.connectZookeeper();
    ac.handle();
    }
    }
    B、服务器端实现

public class AppServer {

private String groupNode = "sgroup";

private String subNode = "sub";

/**

  • 连接zookeeper
  • @param address server的地址
    */
    public void connectZookeeper(String address) throws Exception {
    ZooKeeper zk = new ZooKeeper(
    "localhost:4180,localhost:4181,localhost:4182",
    5000, new Watcher() {
    public void process(WatchedEvent event) {
    // 不做处理
    }
    });
    // 在"/sgroup"下创建子节点
    // 子节点的类型设置为EPHEMERAL_SEQUENTIAL, 表明这是一个临时节点, 且在子节点的名称后面加上一串数字后缀
    // 将server的地址数据关联到新创建的子节点上
    String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"),
    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("create: " + createdPath);
    }
    /**
  • server的工作逻辑写在这个方法中
  • 此处不做任何处理, 只让server sleep
    */
    public void handle() throws InterruptedException {
    Thread.sleep(Long.MAX_VALUE);
    }
    public static void main(String[] args) throws Exception {
    // 在参数中指定server的地址
    if (args.length == 0) {
    System.err.println("The first argument must be server address");
    System.exit(1);
    }
    AppServer as = new AppServer();
    as.connectZookeeper(args[0]);
    as.handle();
    }
    }
    3.2 分布式共享锁的简单实现

客户端A

public class DistributedClient {

// 超时时间

private static final int SESSION_TIMEOUT = 5000;

// zookeeper server列表

private String hosts = "localhost:4180,localhost:4181,localhost:4182";

private String groupNode = "locks";

private String subNode = "sub";

private ZooKeeper zk;

// 当前client创建的子节点

private String thisPath;

// 当前client等待的子节点

private String waitPath;

private CountDownLatch latch = new CountDownLatch(1);

/**

  • 连接zookeeper

    */

    public void connectZookeeper() throws Exception {

    zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {

    public void process(WatchedEvent event) {

    try {

    // 连接建立时, 打开latch, 唤醒wait在该latch上的线程

    if (event.getState() == KeeperState.SyncConnected) {

    latch.countDown();

    }

    // 发生了waitPath的删除事件

    if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {

    doSomething();

    }

    } catch (Exception e) {

    e.printStackTrace();

    }

    }

    });

    // 等待连接建立

    latch.await();

    // 创建子节点

    thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,

    CreateMode.EPHEMERAL_SEQUENTIAL);

    // wait一小会, 让结果更清晰一些

    Thread.sleep(10);

    // 注意, 没有必要监听"/locks"的子节点的变化情况

    List<String> childrenNodes = zk.getChildren("/" + groupNode, false);

    // 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁

    if (childrenNodes.size() == 1) {

    doSomething();

    } else {

    String thisNode = thisPath.substring(("/" + groupNode + "/").length());

    // 排序

    Collections.sort(childrenNodes);

    int index = childrenNodes.indexOf(thisNode);

    if (index == -1) {

    // never happened

    } else if (index == 0) {

    // inddx == 0, 说明thisNode在列表中最小, 当前client获得锁

    doSomething();

    } else {

    // 获得排名比thisPath前1位的节点

    this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);

    // 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法

    zk.getData(waitPath, true, new Stat());

    }

    }

    }

    private void doSomething() throws Exception {

    try {

    System.out.println("gain lock: " + thisPath);

    Thread.sleep(2000);

    // do something

    } finally {

    System.out.println("finished: " + thisPath);

    // 将thisPath删除, 监听thisPath的client将获得通知

    // 相当于释放锁

    zk.delete(this.thisPath, -1);

    }

    }

    public static void main(String[] args) throws Exception {

    for (int i = 0; i < 10; i++) {

    new Thread() {

    public void run() {

    try {

    DistributedClient dl = new DistributedClient();

    dl.connectZookeeper();

    } catch (Exception e) {

    e.printStackTrace();

    }

    }

    }.start();

    }

    Thread.sleep(Long.MAX_VALUE);

    }

    }

    分布式多进程模式实现

public class DistributedClientMy {

// 超时时间

private static final int SESSION_TIMEOUT = 5000;

// zookeeper server列表

private String hosts = "spark01:2181,spark02:2181,spark03:2181";

private String groupNode = "locks";

private String subNode = "sub";

private boolean haveLock = false;

private ZooKeeper zk;

// 当前client创建的子节点

private volatile String thisPath;

/**

  • 连接zookeeper
    */
    public void connectZookeeper() throws Exception {
    zk = new ZooKeeper("spark01:2181", SESSION_TIMEOUT, new Watcher() {
    public void process(WatchedEvent event) {
    try {
    // 子节点发生变化
    if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
    // thisPath是否是列表中的最小节点
    List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
    String thisNode = thisPath.substring(("/" + groupNode + "/").length());
    // 排序
    Collections.sort(childrenNodes);
    if (childrenNodes.indexOf(thisNode) == 0) {
    doSomething();
    thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    }
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    });
    // 创建子节点
    thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);
    // wait一小会, 让结果更清晰一些
    Thread.sleep(new Random().nextInt(1000));
    // 监听子节点的变化
    List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
    // 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁
    if (childrenNodes.size() == 1) {
    doSomething();
    thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    }
    /**
  • 共享资源的访问逻辑写在这个方法中
    */
    private void doSomething() throws Exception {
    try {
    System.out.println("gain lock: " + thisPath);
    Thread.sleep(2000);
    // do something
    } finally {
    System.out.println("finished: " + thisPath);
    // 将thisPath删除, 监听thisPath的client将获得通知
    // 相当于释放锁
    zk.delete(this.thisPath, -1);
    }
    }
    public static void main(String[] args) throws Exception {
    DistributedClientMy dl = new DistributedClientMy();
    dl.connectZookeeper();
    Thread.sleep(Long.MAX_VALUE);
    }
    }
    四、zookeeper的选举机制(全新集群paxos)

以一个简单的例子来说明整个选举的过程。

假设有五台服务器组成的zookeeper集群,它们的id从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的.假设这些服务器依序启动,来看看会发生什么。

服务器1启动,此时只有它一台服务器启动了,它发出去的报没有任何响应,所以它的选举状态一直是LOOKING状态

服务器2启动,它与最开始启动的服务器1进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以id值较大的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是3),所以服务器1,2还是继续保持LOOKING状态.

服务器3启动,根据前面的理论分析,服务器3成为服务器1,2,3中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的leader.

服务器4启动,根据前面的分析,理论上服务器4应该是服务器1,2,3,4中最大的,但是由于前面已经有半数以上的服务器选举了服务器3,所以它只能接收当小弟的命了.

服务器5启动,同4一样,当小弟

五、非全新集群的选举机制(数据恢复)

那么,初始化的时候,是按照上述的说明进行选举的,但是当zookeeper运行了一段时间之后,有机器down掉,重新选举时,选举过程就相对复杂了。

需要加入数据id、leader id和逻辑时钟。

数据id:数据新的id就大,数据每次更新都会更新id。

Leader id:就是我们配置的myid中的值,每个机器一个。

逻辑时钟:这个值从0开始递增,每次选举对应一个值,也就是说: 如果在同一次选举中,那么这个值应该是一致的 ; 逻辑时钟值越大,说明这一次选举leader的进程更新.

选举的标准就变成:

1、逻辑时钟小的选举结果被忽略,重新投票

2、统一逻辑时钟后,数据id大的胜出

3、数据id相同的情况下,leader id大的胜出

根据这个规则选出leader。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Mission Python

Mission Python

Sean McManus / No Starch Press / 2018-9-18 / GBP 24.99

Launch into coding with Mission Python, a space-themed guide to building a complete computer game in Python. You'll learn programming fundamentals like loops, strings, and lists as you build Escape!, ......一起来看看 《Mission Python》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

URL 编码/解码
URL 编码/解码

URL 编码/解码