A Detailed Guide to Curator, the ZooKeeper Client

[TOC]


Introduction

Curator is an open-source ZooKeeper client framework from Netflix. It takes care of many of the low-level details of ZooKeeper client development, including connection management and reconnection, repeated Watcher registration, NodeExistsException handling, and more. Patrick Hunt (of ZooKeeper) gave Curator high praise with the line "Guava is to Java what Curator is to ZooKeeper".

Background and trivia:
The origin of the name ZooKeeper is rather interesting. The following is adapted from the book "From Paxos to ZooKeeper: Distributed Consistency Principles and Practice":
ZooKeeper originated with a research group at Yahoo Research. At the time, the researchers found that many large systems inside Yahoo needed a similar system for distributed coordination, but those systems often had distributed single points of failure. So Yahoo's developers set out to build a general-purpose distributed coordination framework with no single point of failure. Early in the project, considering that many projects were named after animals (the famous Pig project, for example), Yahoo's engineers wanted an animal name for this project too. Raghu Ramakrishnan, chief scientist of the research institute at the time, joked: "At this rate, we'll turn into a zoo." Everyone then said: let's just call it the zookeeper, because with all the animal-named distributed components put together, Yahoo's whole distributed system looked like a large zoo, and ZooKeeper was exactly what would coordinate that distributed environment. And so the name ZooKeeper was born.

Curator is without doubt the Swiss Army knife among ZooKeeper clients. The word translates as "curator" or "custodian"; whether or not the development team named it that deliberately, my guess is that the name says Curator is ZooKeeper's keeper (stretching the metaphor a little: Curator is the director of the zoo).

Curator contains several modules:
curator-framework: a wrapper over ZooKeeper's low-level API
curator-client: client-side utilities, such as retry policies
curator-recipes: packaged high-level features such as cache-based event listening, elections, distributed locks, distributed counters, distributed barriers, and so on

Maven dependencies (this article uses Curator version 2.12.0, which targets ZooKeeper 3.4.x; mixing other versions can cause compatibility problems and is very likely to make node operations fail):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Curator's Basic API

Creating a Session

1. Creating a client with the static factory method

An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

The newClient static factory method takes the following key parameters:

Parameter: Description
connectionString: the server list, in the format host1:port1,host2:port2,…
retryPolicy: the retry policy; several policies are built in, and you can also implement the RetryPolicy interface yourself
sessionTimeoutMs: the session timeout in milliseconds, default 60000 ms
connectionTimeoutMs: the connection creation timeout in milliseconds, default 15000 ms

2. Creating a session with the fluent-style API

The core parameters become fluent settings. An example:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

3. Creating a session with an isolated namespace

To isolate different ZooKeeper businesses from one another, each business needs to be assigned an independent namespace, i.e. a designated ZooKeeper root path (in official terms: adding the "chroot" feature to ZooKeeper). In the example below, when the client specifies the independent namespace "/base", all of that client's operations on ZooKeeper data nodes are performed relative to that path. Setting a chroot maps a client application to one subtree on the ZooKeeper server, which is very useful for isolating applications from each other when multiple applications share one ZooKeeper cluster.

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

Starting the Client

Once the session has been created successfully, take the client instance and simply call its start() method:

client.start();

Data Node Operations

Creating Data Nodes

ZooKeeper node creation modes:

  • PERSISTENT: persistent
  • PERSISTENT_SEQUENTIAL: persistent with a sequence number
  • EPHEMERAL: ephemeral
  • EPHEMERAL_SEQUENTIAL: ephemeral with a sequence number

Create a node with empty initial content

client.create().forPath("path");

Note: if no node attributes are set, the creation mode defaults to persistent and the content defaults to empty.

Create a node with initial content

client.create().forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral) and empty content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

Create a node with a specified creation mode (ephemeral) and initial content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral) and initial content, automatically creating parent nodes recursively

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

This creatingParentContainersIfNeeded() interface is extremely useful, because normally a developer creating a child node must first check whether its parent node exists; creating it directly when the parent is missing throws NoNodeException. With creatingParentContainersIfNeeded(), Curator automatically and recursively creates all the parent nodes that are needed.

Deleting Data Nodes

Delete a node

client.delete().forPath("path");

Note: this method can only delete leaf nodes; otherwise an exception is thrown.

Delete a node and recursively delete all of its children

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing deletion at a specific version

client.delete().withVersion(10086).forPath("path");

Delete a node with guaranteed deletion

client.delete().guaranteed().forPath("path");

The guaranteed() interface is a safeguard: as long as the client session is valid, Curator keeps retrying the delete operation in the background until the node is successfully deleted.

Note: the fluent interfaces above can be freely combined, for example:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

Reading Node Data

Read the data content of a node

client.getData().forPath("path");

Note: the return value of this method is byte[].

Read the data content of a node while also obtaining the node's stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

Updating Node Data

更新二个节点的多寡内容

client.setData().forPath("path","data".getBytes());

Note: this interface returns a Stat instance.

Update the data content of a node, forcing the update at a specific version

client.setData().withVersion(10086).forPath("path","data".getBytes());

Checking Whether a Node Exists

client.checkExists().forPath("path");

Note: this method returns a Stat instance and is used to check whether the ZNode exists.
Additional methods (watch or background processing) can be chained, ending with
forPath() to specify the ZNode to operate on.
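
For example, a minimal sketch of checking existence while registering a watcher in the same chain (the watcher body and path are illustrative assumptions):

Stat stat = client.checkExists()
        .usingWatcher((CuratorWatcher) watchedEvent ->
                System.out.println("watched event: " + watchedEvent.getType()))
        .forPath("path");
System.out.println(stat == null ? "node does not exist" : "node exists, version: " + stat.getVersion());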

Getting All Child Node Paths of a Node

client.getChildren().forPath("path");

只顾:该方式的重临值为List<String>,得到ZNode的子节点帕特h列表。
可以调用额外的措施(监控、后台处理依然取得状态watch, background or get
stat) 并在最终调用forPath()内定要操作的父ZNode
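
Similarly, a minimal sketch of fetching the children in the background (the callback body is illustrative):

client.getChildren()
      .inBackground((curatorFramework, curatorEvent) ->
              System.out.println("children of " + curatorEvent.getPath() + ": " + curatorEvent.getChildren()))
      .forPath("path");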

Transactions

CuratorFramework instances include an inTransaction() interface method, which starts a ZooKeeper transaction. You can compose create, setData, check, and/or delete operations and then call commit() to submit them as a single atomic operation. An example:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

Asynchronous Interface

The create, delete, update, and read methods above are all synchronous. Curator also provides an asynchronous interface, introducing the BackgroundCallback interface for handling the result information the server returns after an asynchronous call. The key callback value in BackgroundCallback is a CuratorEvent, which contains the event type, the response code, and the node's details.

CuratorEventType

Event type: corresponding method on the CuratorFramework instance
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #usingWatcher(Watcher)
CLOSING #close()

Response codes (#getResultCode())

Code: Meaning
0: OK, the call succeeded
-4: ConnectionLoss, the client lost its connection to the server
-110: NodeExists, the node already exists
-112: SessionExpired, the session has expired

An example of creating a node asynchronously:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s", curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");

Note: if #inBackground() does not specify an executor, Curator's EventThread is used by default for the asynchronous processing.

Curator Recipes (Advanced Features)

Note: you must first add the curator-recipes dependency. The text below only explains and demonstrates the use of some recipe features; it does not attempt a source-level exploration.

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Important reminder: it is strongly recommended to monitor the connection state with a ConnectionStateListener. When the connection state is LOST, all APIs under curator-recipes will fail or time out, even though none of the examples below use one.
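
A minimal sketch of registering such a listener (the reactions shown are illustrative):

client.getConnectionStateListenable().addListener((curatorFramework, newState) -> {
    if (newState == ConnectionState.LOST) {
        System.out.println("connection LOST: recipes on this client can no longer be trusted");
    } else if (newState == ConnectionState.RECONNECTED) {
        System.out.println("connection RECONNECTED: state may need to be rebuilt");
    }
});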

Caches

ZooKeeper natively supports event listening through registered Watchers, but a developer has to re-register them repeatedly (a Watcher is registered once and fires only once). Cache is Curator's wrapper around event listening; it can be seen as a local cached view of events, and it automatically handles re-registration for the developer. Curator provides three kinds of caches for listening to node changes.

Path Cache

Path Cache watches the children of a ZNode. When a child is added, updated, or deleted, the Path Cache updates its state to contain the latest set of children along with their data and state, and the state changes are reported through a PathChildrenCacheListener.

In practice, four classes are involved:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

Create a Path Cache through the constructor below:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use the cache you must call its start method, and call its close method when done.
The startup behavior can be controlled with a StartMode.

StartMode has the following three options:

  1. NORMAL: normal initialization.
  2. BUILD_INITIAL_CACHE: calls rebuild() before start() returns.
  3. POST_INITIALIZED_EVENT: sends a PathChildrenCacheEvent.Type#INITIALIZED event once the cache has finished initializing its data.

public void addListener(PathChildrenCacheListener listener) adds a listener for changes to the cache.

getCurrentData() returns a List<ChildData> object that you can iterate to obtain all the children.

Setting/updating and removing are actually performed with the client (CuratorFramework),
not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("Event type: " + event.getType());
            if (null != event.getData()) {
                System.out.println("Node data: " + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter in new PathChildrenCache(client, PATH, true) is set to false, then event.getData().getData() and data.getData() in the example return null, and the cache does not cache node data.

Note: the Thread.sleep(10) calls in the example can be commented out, but then not all listener events will fire; this is probably related to how PathChildrenCache is implemented, as events cannot be triggered too frequently!

Node Cache

Node Cache is similar to Path Cache, except that Node Cache watches only one specific node. It involves the following three classes:

  • NodeCache – the Node Cache implementation class
  • NodeCacheListener – the node listener
  • ChildData – node data

Note: to use the cache, you still have to call its start() method, and call close() when finished.

getCurrentData() gets the node's current state, and from that state you can obtain the current value.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep(100) calls in the example can be commented out, but then not all listener events will fire; this is probably related to how NodeCache is implemented, as events cannot be triggered too frequently!

Note: NodeCache can only watch state changes of a single node.

Tree Cache

Tree Cache watches all the nodes of an entire subtree, something like a combination of Path Cache and Node Cache. It mainly involves the following four classes:

  • TreeCache – the Tree Cache implementation class
  • TreeCacheListener – the listener class
  • TreeCacheEvent – the triggered event class
  • ChildData – node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("Event type: " + event.getType() +
                        " | path: " + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example does not use Thread.sleep(10), yet the events still fire the expected number of times.

Note: during initialization (when start() is called), TreeCache calls the TreeCacheListener instance back with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null; at that moment event.getData().getPath() is very likely to throw a NullPointerException, so you should handle and guard against this case.

Leader Election

In distributed computing, leader election is an important capability. The election process goes like this: one process is designated as the organizer, which distributes the tasks to the nodes. Before the task starts, no node knows who the leader (or coordinator) is. After the election algorithm has run, every node ends up recognizing one unique node as the task leader. Besides that, an election also frequently takes place when the leader crashes unexpectedly, in which case a new leader has to be elected.

In a ZooKeeper cluster, the leader is responsible for writes and synchronizes the followers through the Zab protocol; reads can be handled by either the leader or a follower.

Curator has two leader-election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients take turns acting as the leader, without interruption. With the latter, once a leader is elected, the leadership is not handed over unless a client goes down and triggers a new election.

LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start();

假若运转,LeaderLatch会和其他使用相同latch
path的别样LeaderLatch交涉,然后里面二个最后会被公投为leader,可以通过hasLeadership办法查看LeaderLatch实例是还是不是leader:

leaderLatch.hasLeadership( ); //重临true表达当前实例是leader

Like the JDK's CountDownLatch, LeaderLatch blocks (in await) while waiting to become the leader. Once you no longer use a LeaderLatch, you must call its close method.
If it is the leader, close releases the leadership, and the other participants elect a new leader.

public void await() throws InterruptedException, EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

Exception handling:
A LeaderLatch instance can add a ConnectionStateListener to watch for connection problems. On
SUSPENDED or LOST, the leader no longer considers itself the leader. If a LOST connection is
RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one. Users of LeaderLatch must take into account connection problems that can cause the leadership to be lost.
Using a ConnectionStateListener is strongly recommended.

An example of using LeaderLatch:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

You can add the curator-test module dependency for convenient testing, without needing to run a real ZooKeeper server:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

First we create 10 LeaderLatch instances; after they start, one of them is elected leader.
Because the election takes some time, you may not get the leader immediately after start.
hasLeadership checks whether this instance is the leader, returning true if it is.
The current leader's ID can be obtained with .getLeader().getId().
The leadership can only be released with close.
await is a blocking method: it waits to acquire the leadership, but will not necessarily succeed.

LeaderSelector

Using LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector; its constructors are:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Similar to LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, your listener's takeLeadership() method is called when the instance acquires the leadership, and takeLeadership() returns only when the leadership is released.
When you no longer use the LeaderSelector instance, you should call its close method.

Exception handling:
LeaderSelectorListener extends ConnectionStateListener. A LeaderSelector must watch for connection state changes. If an instance becomes the leader, it should respond to SUSPENDED or LOST. When SUSPENDED appears, the instance must assume it may no longer be the leader until it has reconnected successfully. If LOST appears, the instance is no longer the leader and its takeLeadership method returns.

Important: the recommended handling is to throw a CancelLeadershipException when SUSPENDED or LOST is received. This interrupts the LeaderSelector instance and cancels the thread that is executing the takeLeadership method. This is important enough that you should consider extending LeaderSelectorListenerAdapter, which provides the recommended handling logic.

The example below is adapted from the official examples:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

You can distribute tasks and so on inside takeLeadership, and never return if you want this instance to remain the leader forever, for instance with an infinite loop. Calling
leaderSelector.autoRequeue() guarantees that this instance can acquire the leadership again after releasing it.
Here we use an AtomicInteger to record how many times this client has acquired the leadership; the election is "fair", and every client has the same chance of acquiring the leadership.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

By comparison, a LeaderLatch must call close() to release the leadership, whereas a LeaderSelector can control the leadership through its LeaderSelectorListener and release it at the appropriate moment, so that every node has a chance to acquire it. LeaderSelector therefore offers better flexibility and controllability, and in leader-election scenarios it is recommended to prefer LeaderSelector.

Distributed Locks

Notes:

1. It is recommended to monitor the connection state with a ConnectionStateListener, because when the connection is LOST you no longer hold the lock.

2. Distributed locks are globally synchronous: at any point in time, no two clients hold the same lock.

Shared Reentrant Lock

Shared means the lock is globally visible and any client can request it.
Reentrant works like the JDK's ReentrantLock: the same client can acquire the lock again while already holding it, without blocking.
It is implemented by the InterProcessMutex class. Its constructor is:

public InterProcessMutex(CuratorFramework client, String path)

Acquire the lock through acquire(); a timeout variant is also provided:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

Release the lock with release(). InterProcessMutex instances are reusable.

Revoking: the ZooKeeper recipes wiki defines a cooperative revocation mechanism.
To make a mutex revocable, call the method below:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. The listener is called when another process or thread wants you to release the lock.
Parameters:
listener - the listener

If you want to request revocation of the current lock, call attemptRevoke(); note that the RevocationListener is called back when the lock should be released.

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
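
Putting makeRevocable and attemptRevoke together, a hedged sketch of cooperative revocation (the path is illustrative; the actual release should still be performed by the thread that owns the lock):

InterProcessMutex lock = new InterProcessMutex(client, "/examples/revocableLock");
lock.makeRevocable(forLock ->
        System.out.println("revocation requested: the owning thread should release the lock"));
lock.acquire();
// meanwhile, any client can ask the current holders to release:
for (String participantNode : lock.getParticipantNodes()) {
    Revoker.attemptRevoke(client, participantNode);
}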

One more reminder on error handling: it is still strongly recommended to handle connection state changes with a ConnectionStateListener.
When the connection is LOST, you no longer hold the lock.

First let's create a simulated shared resource.
This resource is expected to be accessed by a single thread at a time; otherwise there will be concurrency problems.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // in a real environment we would access/maintain a shared resource here
        // with the lock, this example never throws the IllegalStateException for concurrent access
        // without the lock, the sleep below makes the exception very easy to trigger
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Then create an InterProcessMutexDemo class, which performs a complete access cycle: request the lock, use the resource, and release the lock.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is simple: it spawns five clients, each of which repeats the request-lock / access-resource / release-lock cycle 50 times, every client in its own thread.
The output shows the lock being used exclusively by one instance at a time.

Since the lock is reentrant, you can call acquire() repeatedly within one thread, and it keeps returning true while the thread holds the lock.

You should not use the same InterProcessMutex across threads.
Instead, create a new InterProcessMutex instance in each thread, all with the same path; they will then share one lock.

Shared Lock (non-reentrant)

Compared with the InterProcessMutex above, this lock simply lacks the reentrant capability, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex, and it is used in the same way as InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 已获取到互斥锁");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 再次获取到互斥锁");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

When you run it, you will find that exactly one client succeeds in acquiring the first lock (its first acquire() returns true) and then blocks on the second acquire(), timing out while trying to acquire the lock a second time; all the other clients block on the first acquire(), time out, and throw exceptions.

This demonstrates that InterProcessSemaphoreMutex really is non-reentrant.

Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a pair of related locks: one for read operations and one for writes. The read lock can be held by multiple processes at the same time as long as the write lock is not in use, while the write lock, once held, blocks all readers.

This lock is reentrant. A thread that holds the write lock can re-enter the read lock, but the read lock cannot enter the write lock. This also means the write lock can be downgraded to a read lock, for example: acquire the write lock, acquire the read lock, release the write lock, then later release the read lock. Upgrading from the read lock to the write lock is not allowed.
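
A minimal sketch of that downgrade sequence (the path is illustrative):

InterProcessReadWriteLock rwLock = new InterProcessReadWriteLock(client, "/examples/rwLock");
rwLock.writeLock().acquire();  // take the write lock first
rwLock.readLock().acquire();   // reentrant: the write-lock holder may also take the read lock
rwLock.writeLock().release();  // downgrade: only the read lock is held from here on
try {
    // ... read under the read lock ...
} finally {
    rwLock.readLock().release();
}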

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. To use it, first create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock as needed; both locks are of type InterProcessMutex.

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // note: the write lock must be acquired before the read lock, not the other way around!!!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " acquired the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " acquired the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing the read and write locks");
            readLock.release();
            writeLock.release();
        }
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Shared Semaphore

A counting semaphore works like the JDK's Semaphore.
Where the JDK Semaphore maintains a set of permits, Curator maintains leases (Lease).
There are two ways to control the semaphore's maximum number of leases. The first: the user supplies a path and specifies the maximum lease count. The second: the user supplies a path and uses a SharedCountReader. Without a SharedCountReader, you must make sure that all instances across all processes use the same (maximum) lease count; otherwise you may end up with instances in process A using a maximum of 10 leases while instances in process B use a maximum of 20, at which point the leases become meaningless.
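
A minimal sketch of the second approach (the paths are illustrative): the maximum lease count is read from a SharedCount, so every process observes the same limit:

SharedCount maxLeases = new SharedCount(client, "/examples/semaphoreCount", 10);
maxLeases.start(); // the reader must be started before the semaphore uses it
InterProcessSemaphoreV2 semaphore =
        new InterProcessSemaphoreV2(client, "/examples/semaphore", maxLeases);
Lease lease = semaphore.acquire();
try {
    // ... use the protected resource ...
} finally {
    lease.close(); // always return the lease
}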

Calling acquire() returns a lease object.
The client must close the lease in a finally block, or the lease is lost. However,
if the client session is dropped for some reason, such as a crash,
the leases held by that client are closed automatically,
so that other clients can use them. Leases can also be returned like this:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request several leases at once; if the semaphore does not currently have enough leases, the requesting thread blocks.
Overloads with a timeout are also provided:

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes used with Shared Semaphore include the following:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire 5 leases, which we return to the semaphore at the end.
Then we request one more lease; the semaphore still has 5 leases left, so the request can be satisfied and returns one lease, leaving 4.
We then request 5 more leases; only 4 remain, so the request blocks until the timeout and still cannot be satisfied, returning null (when the leases are insufficient, the call blocks until the timeout and then returns null without throwing; with no timeout set, it blocks indefinitely).

All the locks described above are fair locks. From ZooKeeper's point of view,
every client obtains the lock in the order it requested it; there is no unfair preemption.

Multi Shared Lock

Multi Shared Lock is a container of locks. When acquire() is called,
all the contained locks are acquired; if that fails, every acquired lock is released.
Likewise, calling release releases all the locks (failures are ignored).
Essentially, it represents a group of locks: acquire and release operations on it are forwarded to all the locks it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

它的构造函数须求包罗的锁的汇聚,或许一组ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

Usage is the same as for Shared Lock.

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

We create a new InterProcessMultiLock containing one reentrant lock and one non-reentrant lock.
After acquire() is called, you can see that the thread holds both locks;
after release(), both locks are released.

One last repetition:
it is strongly recommended to monitor the connection state with a ConnectionStateListener; when the connection state is LOST, the lock is lost.

Distributed Counters

As the name suggests, counters count. With ZooKeeper you can implement a counter shared by a whole cluster:
as long as the same path is used, the latest counter value is obtained,
and this is guaranteed by ZooKeeper's consistency. Curator has two counters:
one counts with an int (SharedCount), the other with a long (DistributedAtomicLong).

Distributed int Counter: SharedCount

This class counts with the int type. It mainly involves three classes.

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount represents the counter.
You can add a SharedCountListener to it; when the counter changes, the listener is notified of the change. A SharedCountReader can read the latest value,
both the plain value and the version-carrying VersionedValue.

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

In this example we use baseCount to watch the counter value (addListener adds the SharedCountListener). Any SharedCount obtains this counter value as long as it uses the same path.
We then use five threads, each adding a random number under 10 to the counter. Changes made to the counter by SharedCounts on the same path are called back to baseCount's SharedCountListener.

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

此间大家应用trySetCount去设置计数器。
率先个参数提供当前的VersionedValue,即使时期其余client更新了此计数值,
你的换代或然不成功,
然而此时你的client更新了最新的值,所以退步了您可以尝试再更新一遍。
setCount是强制更新计数器的值
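
A minimal sketch of the resulting compare-and-set retry loop (incrementing by one is illustrative; count is a started SharedCount as in the example above):

while (true) {
    VersionedValue<Integer> current = count.getVersionedValue();
    if (count.trySetCount(current, current.getValue() + 1)) {
        break; // our version matched and the update went through
    }
    // another client won the race; the counter has been refreshed, so just retry
}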

Note that the counter must be started, and must be closed with close when you are done with it.

A ConnectionStateListener is strongly recommended;
in this example, SharedCountListener extends ConnectionStateListener.

Distributed long Counter: DistributedAtomicLong

Now a counter of type Long. Apart from having a larger counting range than SharedCount,
it first tries to set the counter value optimistically;
if that fails (for example because the counter was updated by another client in the meantime),
it uses an InterProcessMutex to update the value.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

    AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

This counter offers a series of operations:

  • get(): get the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a specific value
  • subtract(): subtract a specific value
  • trySet(): try to set the counter value
  • forceSet(): forcibly set the counter value

You must check succeeded() on the returned result; it indicates whether the operation succeeded.
If the operation succeeded, preValue() is the value before the operation,
and postValue() is the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

Distributed Queues

Curator also simplifies working with ephemeral nodes, and it provides ZK-recipe implementations of distributed queues. Using ZK's
PERSISTENT_SEQUENTIAL nodes,
the items put into a queue are guaranteed to be ordered.
With a single consumer taking data from the queue, it behaves first-in-first-out, which is the defining property of a queue.
If you strictly require ordering, you have to use a single consumer; you can use leader election so that only the leader acts as the sole consumer.

However, according to the author of Netflix's Curator,
ZooKeeper really is not suited to being a queue, or rather, ZK does not make a good queue; for the details see
Tech Note 4.

There are five reasons:

  1. ZK has a 1MB transport limit.
    In practice ZNodes must be relatively small, whereas a queue holds many messages, some of them quite large.
  2. ZK starts up very slowly when there are many nodes, and using a queue creates many ZNodes.
    You need to raise initLimit and syncLimit considerably.
  3. Large ZNodes are hard to clean up; Netflix had to build a dedicated program just for this.
  4. When a ZNode contains a very large number of children, ZK's performance degrades badly.
  5. ZK's database lives entirely in memory; a large number of queued items means a lot of memory.

Even so, Curator still provides several Queue implementations.
When the number of items and the item sizes are not too large, they can still be used after weighing the trade-offs.

Distributed Queue: DistributedQueue

DistributedQueue is the most ordinary kind of queue. It uses the following four classes:

  • QueueBuilder – queues are created with QueueBuilder, which is also the builder for the other queue types
  • QueueConsumer – the consumer interface for queue messages
  • QueueSerializer –
    the serialization/deserialization interface for queue messages, providing serialization and deserialization of the objects in the queue
  • DistributedQueue – the queue implementation class

QueueConsumer is the consumer; it receives the queue's data. The logic that processes queued data goes into QueueConsumer.consumeMessage().

Normally a message is first removed from the queue and then handed to a consumer, but these are two steps rather than one atomic operation. You can call the builder's lockPath() to use a consumer lock: while a consumer holds the lock on a message, other consumers cannot consume it, and if consumption fails or the process dies, the message can be handed to another process. This costs some performance. Ideally, use the queue in single-consumer mode, as in the sketch after this paragraph.
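
A minimal sketch of enabling that consumer lock while building the queue (the lock path is illustrative; client, consumer, and the serializer are the helpers from the demo below):

QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
DistributedQueue<String> queue = builder
        .lockPath("/example/queueLock") // a message stays locked while a consumer processes it
        .buildQueue();
queue.start();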

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10);// wait for the messages to be consumed
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer implementation
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Defines the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; because the PATH is the same, the consumers compete for the messages.

Distributed Queue with Ids: DistributedIdQueue

DistributedIdQueue is similar to the queue above, except that an ID can be set for every element in the queue,
and any element can be removed from the queue by its ID. It involves these classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

Create it with:

builder.buildIdQueue()

Put an element:

queue.put(aMessage, messageId);

Remove an element:

int numberRemoved = queue.remove(messageId);

In this example, some elements are removed before any consumer has consumed them, so the consumers never receive the deleted messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

Distributed Priority Queue: DistributedPriorityQueue

A priority queue orders the queued elements by priority: the smaller the priority value,
the closer to the front the element sits and the earlier it is consumed.
It involves the classes below:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

Create it with the builder.buildPriorityQueue(minItemsBeforeRefresh) method.
When the priority queue receives element add/remove events, it pauses processing of the current element queue and then refreshes the queue. minItemsBeforeRefresh specifies the minimum number of items active in the current queue before such a refresh takes place.
Set it to the smallest amount of out-of-order processing your program can tolerate.

Putting an element into the queue requires a priority:

queue.put(aMessage, priority);

Example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that the priority setting is not taking effect. That is because the priority only matters for elements backed up in the queue: if consumption is fast enough, an element may already have been consumed before the next one is enqueued, in which case DistributedPriorityQueue degenerates into DistributedQueue.

Distributed Delay Queue: DistributedDelayQueue

The JDK also has a DelayQueue; you may be familiar with it.
DistributedDelayQueue provides similar functionality: each element carries a delay value,
and a consumer receives an element only after that time has arrived. It involves the following four classes.

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

Create it with statements like:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

When putting an element you can specify delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not an interval from now, such as 20 milliseconds,
but a future timestamp, e.g. System.currentTimeMillis() + 10000.
If delayUntilEpoch is already in the past, the message is consumed immediately.

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

Although all kinds of queues have been implemented above, you may have noticed that none of them implements a JDK-like interface.
SimpleDistributedQueue provides an interface essentially consistent with the JDK's (although it does not implement the Queue interface).
Creating one is simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Add an element:

public boolean offer(byte[] data) throws Exception

Take an element:

public byte[] take() throws Exception

Other methods are provided as well:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method, but there is a take method.

take() blocks until it succeeds, whereas
poll() returns null immediately when the queue is empty.

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("发送一条消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("发送一条消息失败:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

In practice, however, 100 messages are sent but everything after the first one fails to be consumed. A likely cause is visible in the demo itself: Consumer.run() calls take() only once, so the consumer thread consumes a single message and then exits; see the looping sketch below.
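
If the single take() is indeed the culprit, a hedged fix is to loop inside the consumer thread:

@Override
public void run() {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            byte[] data = queue.take(); // blocks until an element is available
            System.out.println("consumed a message: " + new String(data, "UTF-8"));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

For reference, the demo recommended by the official documentation uses the following APIs: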

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

In actual use, however, the consumption-blocking problem was still observed.

Distributed Barrier: Barrier

A distributed barrier is a class that blocks the waiting processes on all nodes until a certain condition is satisfied,
at which point all the nodes continue together.

For example, in a horse race, the horses arrive at the starting gate one after another;
at the starting signal, all the horses dash out at once.

DistributedBarrier

The DistributedBarrier class implements the barrier functionality. Its constructor is as follows:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

第三你须求设置栅栏,它将卡住在它上边等待的线程:

setBarrier();

The threads that need to block then call the method that waits for the release condition:

public void waitOnBarrier()

When the condition is satisfied, remove the barrier and all the waiting threads continue:

removeBarrier();

Exception handling: DistributedBarrier
monitors the connection state; waitOnBarrier() throws an exception when the connection drops.

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

This example creates a controlBarrier to set and remove the barrier.
We create five threads that wait on this barrier;
only after the barrier is removed do all the threads continue.

If you do not set the barrier at the beginning, none of the threads will block.

Double Barrier: DistributedDoubleBarrier

A double barrier allows clients to synchronize at both the start and the end of a computation. When enough processes have entered the double barrier, the computation starts;
when it is finished, they leave the barrier. The double-barrier class is DistributedDoubleBarrier.
Its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. When enter() is called, members block until all the members have called enter();
when leave() is called, it likewise blocks the calling thread until all the members have called leave().
It is like a 100-meter race: the starting gun fires,
all the runners start running, and the race is over only when all the runners have crossed the finish line.

DistributedDoubleBarrier monitors the connection state; enter() and leave() throw exceptions when the connection drops.

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}

References:
"From Paxos to ZooKeeper: Distributed Consistency Principles and Practice"
The blog series "Learning ZooKeeper Usage by Example"

Project repository:
https://github.com/zjcscut/curator-seed
(The Markdown source of this article actually carries a [toc] navigation table, which makes it easy to jump between sections, but Jianshu does not support it. The MD source is in the project's /resources/md directory; help yourself. The article was written with Typora, and opening it with Typora is recommended.)

End on 2017-5-13 13:10.
Help yourselves!
I am throwable, working hard in Guangzhou: at the office by day, with irregular overtime at night and on weekends, and persistently writing blog posts in my free evenings.
I hope my articles bring you something of value. Keep at it!
