博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
跟着实例学习ZooKeeper的用法: 计数器
阅读量:6703 次
发布时间:2019-06-25

本文共 6942 字,大约阅读时间需要 23 分钟。

这一篇文章我们将学习使用Curator来实现计数器。 顾名思义,计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数,一个用long来计数。

SharedCount

这个类使用int类型来计数。 主要涉及三个类。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount代表计数器, 可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。

例子代码:

package com.colobu.zkrecipe.counter;import java.io.IOException;import java.util.List;import java.util.Random;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.shared.SharedCount;import org.apache.curator.framework.recipes.shared.SharedCountListener;import org.apache.curator.framework.recipes.shared.SharedCountReader;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;import com.google.common.collect.Lists;public class SharedCounterExample implements SharedCountListener{    private static final int QTY = 5;    private static final String PATH = "/examples/counter";    public static void main(String[] args) throws IOException, Exception {        final Random rand = new Random();        SharedCounterExample example = new SharedCounterExample();        try (TestingServer server = new TestingServer()) {            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));            client.start();            SharedCount baseCount = new SharedCount(client, PATH, 0);            baseCount.addListener(example);            baseCount.start();            List
examples = Lists.newArrayList(); ExecutorService service = Executors.newFixedThreadPool(QTY); for (int i = 0; i < QTY; ++i) { final SharedCount count = new SharedCount(client, PATH, 0); examples.add(count); Callable
task = new Callable
() { @Override public Void call() throws Exception { count.start(); Thread.sleep(rand.nextInt(10000)); System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))); return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); for (int i = 0; i < QTY; ++i) { examples.get(i).close(); } baseCount.close(); } } @Override public void stateChanged(CuratorFramework arg0, ConnectionState arg1) { System.out.println("State changed: " + arg1.toString()); } @Override public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception { System.out.println("Counter's value is changed to " + newCount); }}

在这个例子中,我们使用baseCount来监听计数值(addListener方法)。 任意的SharedCount, 只要使用相同的path,都可以得到这个计数值。 然后我们使用5个线程为计数值增加一个10以内的随机数。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

这里我们使用trySetCount去设置计数器。 第一个参数提供当前的VersionedValue,如果期间其它client更新了此计数值, 你的更新可能不成功, 但是这时你的client更新了最新的值,所以失败了你可以尝试再更新一次。 而setCount是强制更新计数器的值。

注意计数器必须start,使用完之后必须调用close关闭它。

在这里再重复一遍前面讲到的, 强烈推荐你监控ConnectionStateListener, 尽管我们的有些例子没有监控它。 在本例中SharedCountListener扩展了ConnectionStateListener。 这一条针对所有的Curator recipes都适用,后面的文章中就不专门提示了。

DistributedAtomicLong

再看一个Long类型的计数器。 除了计数的范围比SharedCount大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。 还记得InterProcessMutex是什么吗? 它是我们前面讲的分布式可重入锁。 这和上面的计数器的实现有显著的不同。

可以从它的内部实现DistributedAtomicValue.trySet中看出端倪。

AtomicValue
trySet(MakeValue makeValue) throws Exception { MutableAtomicValue
result = new MutableAtomicValue
(null, null, false); tryOptimistic(result, makeValue); if ( !result.succeeded() && (mutex != null) ) { tryWithMutex(result, makeValue); } return result; }

此计数器有一系列的操作:

  • get(): 获取当前值
  • increment(): 加一
  • decrement(): 减一
  • add(): 增加特定的值
  • subtract(): 减去特定的值
  • trySet(): 尝试设置计数值
  • forceSet(): 强制设置计数值

必须检查返回结果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。

我们下面的例子中使用5个线程对计数器进行加一操作,如果成功,将操作前后的值打印出来。

package com.colobu.zkrecipe.counter;import java.io.IOException;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.atomic.AtomicValue;import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.retry.RetryNTimes;import org.apache.curator.test.TestingServer;import com.google.common.collect.Lists;public class DistributedAtomicLongExample {    private static final int QTY = 5;    private static final String PATH = "/examples/counter";    public static void main(String[] args) throws IOException, Exception {        try (TestingServer server = new TestingServer()) {            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));            client.start();            List
examples = Lists.newArrayList(); ExecutorService service = Executors.newFixedThreadPool(QTY); for (int i = 0; i < QTY; ++i) { final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10)); examples.add(count); Callable
task = new Callable
() { @Override public Void call() throws Exception { try { //Thread.sleep(rand.nextInt(1000)); AtomicValue
value = count.increment(); //AtomicValue
value = count.decrement(); //AtomicValue
value = count.add((long)rand.nextInt(20)); System.out.println("succeed: " + value.succeeded()); if (value.succeeded()) System.out.println("Increment: from " + value.preValue() + " to " + value.postValue()); } catch (Exception e) { e.printStackTrace(); } return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); } }}

转载地址:http://dizlo.baihongyu.com/

你可能感兴趣的文章
我的友情链接
查看>>
java中四种进制的转换
查看>>
git多个远程仓库
查看>>
Linux之命令
查看>>
Android 6.0 特性
查看>>
shell 脚本作业
查看>>
程序员老司机都要错的 Python 陷阱与缺陷列表
查看>>
《netty入门与实战》笔记-06:心跳与空闲检测
查看>>
使用javascript开发的视差滚动效果的云彩 极客标签 - 做最棒的极客知识分享平台...
查看>>
SSM整合框架
查看>>
【安全牛学习笔记】CONTROL FRAME
查看>>
信号量 互斥锁 条件变量的区别(讲的很好,值得收藏)
查看>>
Linux学习笔记十二周四次课(4月26日)
查看>>
Linux之用户管理
查看>>
写一个shell脚本,把192.168.0.0/24网段在线的ip列出来。 思路: for循环, 0
查看>>
JDK安装与环境变量配置
查看>>
多字符与宽字符相互之间的转换
查看>>
CentOS 7.3 mini 解压安装二进制 mariadb 10.2.10
查看>>
RAD Studio增书签功能 单一窗口就可检视受标记
查看>>
硬盘基本知识(一)
查看>>