[ZooKeeper] Apache Curator Framework Source Code Analysis: The Initialization Process

2023-08-25 09:45 Author: 懶時小窩

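Part 1: Introduction

Curator is a ZooKeeper client originally open-sourced by Netflix and is now a top-level Apache project.

Compared with the native ZooKeeper client, Curator works at a higher level of abstraction and reduces the amount of code needed for common ZooKeeper tasks. For example, Curator ships with connection retries, repeated Watcher registration, and handling of NodeExistsException.

According to the official description, it is a Java client API toolkit for distributed systems. It is built around a high-level API that makes it simpler and clearer to drive ZooKeeper when developing reliable distributed applications.

Curator consists of a series of modules. For most developers the commonly used ones are curator-framework and curator-recipes, the latter containing the well-known distributed locks.

Curator also includes a number of extensions, such as service discovery and a Java 8 asynchronous DSL.

Apache Curator is a Java/JVM client library for [Apache ZooKeeper](https://zookeeper.apache.org/), a distributed coordination service. Apache Curator includes a high-level API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.

In the words of the official site: Guava is to Java what Curator is to ZooKeeper.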

Part 2: ZooKeeper Version Support

The latest Curator release line is 5.x, which no longer supports ZooKeeper 3.4.x and earlier. After careful consideration, ZooKeeper 3.5.10 was chosen for reading the source code.

Curator 5.x introduced a number of breaking changes. The incompatibilities are:

  • The old ListenerContainer classes have been removed to avoid leaking Guava classes.

  • ConnectionHandlingPolicy and related classes have been removed.

  • The Reaper and ChildReaper classes/recipes have been removed. Use ZooKeeper container nodes instead.

  • newPersistentEphemeralNode() and newPathChildrenCache() have been removed from GroupMember.

  • ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService) has been removed from ServiceCacheBuilder.

  • ServiceProviderBuilder<T> executorService(CloseableExecutorService executorService) has been removed from ServiceProviderBuilder.

  • static boolean shouldRetry(int rc) has been removed from RetryLoop.

  • static boolean isRetryException(Throwable exception) has been removed from RetryLoop.

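Part 3: Official Website

Apache Curator

1. Download links

Curator Maven artifacts: https://mvnrepository.com/artifact/org.apache.curator

Curator jar downloads: https://cwiki.apache.org/confluence/display/CURATOR/Releases

Part 4: Quick Start

2. ZooKeeper cluster deployment

Before going further, set up a ZooKeeper cluster so that the code can be stepped through in a debugger. The setup process is covered in a separate article:

[[ZooKeeper: Building a ZooKeeper cluster on three Linux virtual machines]]

3. Adding the Maven dependencies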

The ZooKeeper and Curator versions chosen for this article are shown below. The ZooKeeper dependency bundled with Curator is excluded so that the ZooKeeper version can be pinned explicitly.

<curator.version>4.3.0</curator.version>
<zookeeper.version>3.5.10</zookeeper.version>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>${curator.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>${curator.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>${zookeeper.version}</version>
</dependency>

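4. Building a getting-started example

The most central, powerful, and commonly used feature of Curator is the distributed lock.

The getting-started demo shows that everything in Curator is built through CuratorFrameworkFactory. To lock and unlock with Curator, all that is needed is one CuratorFramework object for the ZooKeeper cluster being connected to.

CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)

The method above creates a connection to the ZooKeeper cluster using default values. The caller only needs to decide which retry policy to use.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

As the parameter values suggest, this policy retries with exponentially increasing wait intervals (base sleep time 1000 ms) and retries at most three times.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
        // Hosts in a ZooKeeper connect string are separated by commas, not semicolons.
        CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);
client.start();
// At this point we hold a ZooKeeper connection instance.
//.....

Once a CuratorFramework instance is available, ZooKeeper can be operated on directly through API calls.

client.create().forPath("/my/path", myData);

Another benefit of calling through the client instance is that it retries automatically when it hits problems such as network jitter, so developers do not have to implement the retry logic themselves.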

5. Reentrant lock (fair lock) example code

Below is the reentrant lock demo from the official site.

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
    try
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

With a few changes it can be used as-is.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
        CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);
client.start();
// At this point we hold a ZooKeeper connection instance.
//.....
// creatingParentsIfNeeded() creates the missing parent node /my first;
// without it, creating /my/path fails when /my does not exist yet.
client.create().creatingParentsIfNeeded().forPath("/my/path", "Test".getBytes());
InterProcessMutex lock = new InterProcessMutex(client, "/test/myLock");
lock.acquire();
try {
    // do some work inside of the critical section here
    Thread.sleep(3000);
} finally {
    lock.release();
}

The demo code is fairly simple, so the rest of the article moves straight on to the initialization process.

This article mainly covers Curator's initialization, its internal notification mechanism, and its session management.

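Part 5: Initialization Flowchart

The complete initialization flowchart is shown below. The following sections break down, step by step, how this diagram is put together.

[Figure: Curator source code analysis flowchart (Curator 源碼分析.drawio.png)]

The Drawio source file and the image are available here: https://pan.baidu.com/s/18PoMjkp11LztmNB3XgZ0qw?pwd=4bug (extraction code: 4bug)

Part 6: Initialization Source Code Analysis

6. CuratorFramework initialization

Initialization flowchart

The CuratorFramework initialization corresponds to the part of the diagram in the screenshot below. The objects and variables marked in red are the ones I consider relatively important.

image.png

CuratorFrameworkFactory.newClient() code analysis

Starting from CuratorFrameworkFactory.newClient(), the following walks through the whole initialization process step by step.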

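RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
        CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy);

Before acquiring a distributed lock we first have to connect to the ZooKeeper cluster, and the whole process takes two lines of code.

First we decide on the retry policy for connecting to ZooKeeper, then we build the Curator instance through CuratorFrameworkFactory. Internally Curator wraps the native ZooKeeper client in an extra layer that developers do not need to care about when using it.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);

The code above is a simple template. ExponentialBackoffRetry builds a retry policy whose wait time grows exponentially between retries, for example on the order of 1 second for the first retry, 2 seconds for the second, 4 seconds for the third, 8 seconds for the fourth, and so on, until the configured maximum number of retries is reached.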
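To make the growth of the wait time concrete, here is a minimal, JDK-only sketch of a randomized exponential backoff. It assumes the sleep time is computed as the base sleep time multiplied by a random factor between 1 and 2^(retryCount+1) - 1, which is my understanding of how ExponentialBackoffRetry behaves; the class BackoffSketch and its method names are hypothetical and are not Curator's API.

```java
import java.util.Random;

// Hypothetical sketch of a randomized exponential backoff; not Curator's actual class.
class BackoffSketch {
    private final Random random;
    private final int baseSleepTimeMs;
    private final int maxRetries;

    BackoffSketch(int baseSleepTimeMs, int maxRetries, Random random) {
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxRetries = maxRetries;
        this.random = random;
    }

    // A retry is allowed while the 0-based retry count is below maxRetries.
    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    // Assumed formula: base * random factor in [1, 2^(retryCount+1) - 1],
    // so only the upper bound of the wait doubles with each retry.
    long sleepTimeMs(int retryCount) {
        int bound = 1 << (retryCount + 1);
        return (long) baseSleepTimeMs * Math.max(1, random.nextInt(bound));
    }

    public static void main(String[] args) {
        BackoffSketch policy = new BackoffSketch(1000, 3, new Random());
        for (int retry = 0; policy.allowRetry(retry); retry++) {
            System.out.println("retry " + retry + " sleeps " + policy.sleepTimeMs(retry) + " ms");
        }
    }
}
```

Under this assumption the first retry always waits the base time, while later retries wait a random multiple of it, which spreads out reconnect attempts from many clients.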

Next, the instance is built with CuratorFrameworkFactory.

return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);

Two constants deserve attention here: DEFAULT_SESSION_TIMEOUT_MS (the default session timeout) and DEFAULT_CONNECTION_TIMEOUT_MS (the default connection timeout). They supply the default values when the caller passes only a connect string and a retry policy, and both can be overridden through system properties.

private static final int DEFAULT_SESSION_TIMEOUT_MS
    = Integer.getInteger("curator-default-session-timeout", 60 * 1000);
private static final int DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
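The sketch below shows how Integer.getInteger resolves such defaults. It uses the two property names quoted above; the class TimeoutDefaultsSketch is hypothetical, and unlike the real constants (which are static final and therefore read once when the class is loaded) it reads the properties on every call so the effect is easy to observe.

```java
// Hypothetical helper that mirrors how the defaults are looked up.
class TimeoutDefaultsSketch {
    // Returns the system property as an int, or 60000 when it is unset or not a number.
    static int sessionTimeoutMs() {
        return Integer.getInteger("curator-default-session-timeout", 60 * 1000);
    }

    // Returns the system property as an int, or 15000 when it is unset or not a number.
    static int connectionTimeoutMs() {
        return Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
    }

    public static void main(String[] args) {
        System.out.println("session timeout ms: " + sessionTimeoutMs());
        System.out.println("connection timeout ms: " + connectionTimeoutMs());
    }
}
```

In practice the override would be passed when launching the JVM, for example with -Dcurator-default-session-timeout=30000.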

Stepping further into the construction, we find the builder pattern.

return builder().
    connectString(connectString).
    sessionTimeoutMs(sessionTimeoutMs).
    connectionTimeoutMs(connectionTimeoutMs).
    retryPolicy(retryPolicy).
    build();

Once build() has finished, all subsequent calls actually go to a CuratorFrameworkImpl instance. Note that the Builder passes its own this reference to the CuratorFrameworkImpl constructor, which reads every setting from it.

return new CuratorFrameworkImpl(this);

The CuratorFrameworkImpl constructor contains a lot of code, so the relatively important components are annotated in the source below. The CuratorZookeeperClient object wraps the native ZooKeeper client, and many of Curator's operations are carried out through it.

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
    ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
    this.client = new CuratorZookeeperClient
        (
            localZookeeperFactory,
            builder.getEnsembleProvider(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionTimeoutMs(),
            builder.getWaitForShutdownTimeoutMs(),
            new Watcher()
            {
                @Override
                public void process(WatchedEvent watchedEvent)
                {
                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                    processEvent(event);
                }
            },
            builder.getRetryPolicy(),
            builder.canBeReadOnly(),
            builder.getConnectionHandlingPolicy()
        );
    // Detects disconnects and connection timeouts, sets Curator's connection state,
    // and triggers connection state notifications through connectionStateManager
    internalConnectionHandler = new StandardInternalConnectionHandler();

    // Receives event notifications; triggered by background operation events and connection state events
    listeners = new ListenerContainer<CuratorListener>();

    // Triggered when the background thread or a handler throws an exception
    unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
    // Queue of operations executed by the background thread
    backgroundOperations = new DelayQueue<OperationAndData<?>>();
    forcedSleepOperations = new LinkedBlockingQueue<>();
    // Namespace
    namespace = new NamespaceImpl(this, builder.getNamespace());

    // Thread factory, used when the background thread pool is initialized
    threadFactory = getThreadFactory(builder);

    maxCloseWaitMs = builder.getMaxCloseWaitMs();

    // Responsible for notifications when the connection state changes
    connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
    compressionProvider = builder.getCompressionProvider();
    aclProvider = builder.getAclProvider();

    // State of CuratorFrameworkImpl: LATENT before start() is called,
    // STARTED after start() is called, STOPPED after close() is called
    state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
    useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
    // Connection state error policy
    connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
    schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
    zk34CompatibilityMode = builder.isZk34CompatibilityMode();

    byte[] builderDefaultData = builder.getDefaultData();
    defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
    authInfos = buildAuths(builder);

    // Guaranteed delete: keeps retrying until the delete succeeds, implemented through recursive calls
    failedDeleteManager = new FailedDeleteManager(this);

    // Guaranteed removal of watches
    failedRemoveWatcherManager = new FailedRemoveWatchManager(this);

    namespaceFacadeCache = new NamespaceFacadeCache(this);

    // Tracks the available server nodes; the server list is fetched again
    // after the first connection and after every successful reconnect
    ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());

    runSafeService = makeRunSafeService(builder);
}

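The purpose of newClient is to build the ZooKeeper connection instance together with a series of supporting core components: background operations, connection events, error monitoring, listener containers, namespaces, load balancing, and so on.

7. CuratorZookeeperClient initialization

CuratorZookeeperClient initialization flowchart

The CuratorZookeeperClient initialization is shown in the diagram below:

image.png

CuratorZookeeperClient initialization code analysis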

As mentioned above, the initialization of CuratorFrameworkImpl includes an important step that initializes the CuratorZookeeperClient. Let us look at what that step does.

public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
        int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
        RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
    // With the standard connection handling policy, once a Disconnected event is received and the client
    // has not reconnected to a server within the allowed time, an Expired event is triggered proactively
    this.connectionHandlingPolicy = connectionHandlingPolicy;
    if ( sessionTimeoutMs < connectionTimeoutMs )
    {
        log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
    }
    // Retry policy and ensemble provider must not be null
    retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
    ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");

    this.connectionTimeoutMs = connectionTimeoutMs;
    this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
    // The defaultWatcher that Curator registers on the native client; it receives
    // connection-state events and is responsible for reconnecting after a timeout
    state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);

    // Set the retry policy
    setRetryPolicy(retryPolicy);
}

ConnectionState is the defaultWatcher that Curator registers on the native client. It receives events related to the connection state and is responsible for operations such as reconnecting after a timeout.

Next, the ConnectionState constructor.

ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
    this.ensembleProvider = ensembleProvider;
    this.sessionTimeoutMs = sessionTimeoutMs;
    this.connectionTimeoutMs = connectionTimeoutMs;
    this.tracer = tracer;
    this.connectionHandlingPolicy = connectionHandlingPolicy;
    if ( parentWatcher != null )
    {
        // There can be only one defaultWatcher, so parentWatchers lets the defaultWatcher
        // call back every parent watcher when it receives an event
        parentWatchers.offer(parentWatcher);
    }

    handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}

parentWatchers is a ConcurrentLinkedQueue, a thread-safe queue that belongs to the basics of JDK concurrent programming. It works as follows:

ConcurrentLinkedQueue: an unbounded thread-safe queue based on linked nodes. The queue orders elements FIFO (first in, first out). The head of the queue is the element that has been in the queue the longest, and the tail is the element that has been in the queue the shortest. New elements are inserted at the tail, and retrieval operations obtain elements from the head. ConcurrentLinkedQueue is an appropriate choice when many threads share access to a common collection. The queue does not permit null elements.

private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
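The fan-out idea behind parentWatchers can be sketched with the JDK alone. In this simplified illustration a single default watcher forwards each event to every parent watcher in registration order; the class ParentWatcherSketch is hypothetical, and plain strings and Consumer callbacks stand in for ZooKeeper's WatchedEvent and Watcher types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for a default watcher that forwards events to parent watchers.
class ParentWatcherSketch {
    // Thread-safe FIFO queue: watchers are called back in registration order.
    private final Queue<Consumer<String>> parentWatchers = new ConcurrentLinkedQueue<>();

    void addParentWatcher(Consumer<String> watcher) {
        parentWatchers.offer(watcher);
    }

    // Called once per event; every registered parent watcher receives the same event.
    void process(String event) {
        for (Consumer<String> watcher : parentWatchers) {
            watcher.accept(event);
        }
    }

    // Registers two watchers, delivers one event, and returns what they recorded.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        ParentWatcherSketch defaultWatcher = new ParentWatcherSketch();
        defaultWatcher.addParentWatcher(e -> seen.add("framework:" + e));
        defaultWatcher.addParentWatcher(e -> seen.add("extra:" + e));
        defaultWatcher.process("SyncConnected");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Iterating a ConcurrentLinkedQueue never throws ConcurrentModificationException, so watchers can be added from other threads while an event is being delivered.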

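8. ConnectionStateManager initialization

ConnectionStateManager initialization flowchart

ConnectionStateManager mainly holds a reference to the client, creates the listener manager through the connection state listener manager factory, and builds a thread pool that allows only one thread to run.

By design, each Curator client has exactly one thread responsible for this work.

image.png

ConnectionStateManager initialization code analysis

The framework initialization code includes the initialization of ConnectionStateManager, which is mainly responsible for maintaining the state and for notifying listeners of connection state changes.

// Responsible for notifications when the connection state changes
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());

As can be seen, a listener has to be registered in order to observe state changes. The ways of registering one are covered in detail later in this article. For now, let us look at the relevant member variables and the constructor.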

// Queue of connection state events waiting to be delivered
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);

// Listeners that need to be notified
private final UnaryListenerManager<ConnectionStateListener> listeners;

// Running state of the ConnectionStateManager
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);

ConnectionStateManager#ConnectionStateManager

/**
Params:
client – the client
threadFactory – thread factory to use or null for a default
sessionTimeoutMs – the ZK session timeout in milliseconds
sessionExpirationPercent – percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all
managerFactory – manager factory to use
*/
public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerManagerFactory managerFactory)
{
    this.client = client;
    this.sessionTimeoutMs = sessionTimeoutMs;
    this.sessionExpirationPercent = sessionExpirationPercent;
    if ( threadFactory == null )
    {
        threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
    }
    // Single-thread pool that processes the event queue
    service = Executors.newSingleThreadExecutor(threadFactory);
    // Build the listener manager
    listeners = managerFactory.newManager(client);
}
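The combination of a bounded event queue, a listener collection, and a single-thread executor can be illustrated with a small JDK-only sketch. It is a simplified model under stated assumptions, not Curator's implementation: the class StateDispatcherSketch and its method names are hypothetical, states are plain strings, and the queue capacity of 25 is an arbitrary choice for the example.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model: one thread drains a state queue and notifies listeners in order.
class StateDispatcherSketch implements AutoCloseable {
    private final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(25);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Starts the single dispatcher thread; it blocks on the queue until a state arrives.
    void start() {
        service.execute(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String newState = eventQueue.take();
                    for (Consumer<String> listener : listeners) {
                        listener.accept(newState);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown requested
            }
        });
    }

    // Producer side: returns false instead of blocking when the queue is full.
    boolean addStateChange(String newState) {
        return eventQueue.offer(newState);
    }

    @Override
    public void close() {
        service.shutdownNow();
    }

    // Queues two state changes and returns them in the order the listener saw them.
    static List<String> demo() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        try (StateDispatcherSketch dispatcher = new StateDispatcherSketch()) {
            dispatcher.addListener(s -> { received.add(s); done.countDown(); });
            dispatcher.start();
            dispatcher.addStateChange("CONNECTED");
            dispatcher.addStateChange("SUSPENDED");
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because only one thread consumes the queue, listeners always observe state changes in the order they were queued, which is the main point of the single-thread design.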

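9. CuratorFrameworkImpl startup

The main work done when CuratorFrameworkImpl starts is:

  1. Start ConnectionStateManager, which prepares the delivery of connection events.

  2. Start CuratorZookeeperClient, which establishes the session with the server.

  3. Start a single-thread thread pool whose thread watches the background operation queue, repeatedly taking elements from it and executing them.

CuratorFrameworkImpl startup flowchart

image.png

Client connection: client.start();

Calling start

client.start();

The internal logic of client.start() is shown below. The code is fairly simple, and the comments explain the details.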

public void start()
{
    log.info("Starting");
    // Use CAS to switch the running state to STARTED; the switch cannot be reversed
    // LATENT: CuratorFramework.start() has not yet been called
    // STARTED: CuratorFramework.start() has been called
    if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
    {
        throw new IllegalStateException("Cannot be started more than once");
    }

    try
    {
        // ordering dependency - must be called before client.start()
        connectionStateManager.start();
        // Build a connection state listener
        final ConnectionStateListener listener = new ConnectionStateListener()
        {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState)
            {
                // CONNECTED: sent for the first successful connection to the server.
                // Note: any given CuratorFramework instance receives this message only once.
                // RECONNECTED: A suspended, lost, or read-only connection has been re-established
                // If the client has connected or reconnected
                if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
                {
                    logAsErrorConnectionErrors.set(true);
                }
            }

            @Override
            public boolean doNotDecorate()
            {
                return true;
            }
        };
        // Register the listener
        this.getConnectionStateListenable().addListener(listener);
        // Set the global started flag to true and start ConnectionState
        client.start();
        // Build the thread pool
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        // Submit a Callable task (a task with a return value)
        executorService.submit(new Callable<Object>()
        {
            @Override
            public Object call() throws Exception
            {
                // Key part: run the background operations loop
                backgroundOperationsLoop();
                return null;
            }
        });

        if ( ensembleTracker != null )
        {
            ensembleTracker.start();
        }

        log.info(schemaSet.toDocumentation());
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        handleBackgroundOperationException(null, e);
    }
}

A CAS operation updates the current state to STARTED, and the if statement shows that start() must not be called more than once. This is similar to the design of the JDK Thread class, where start() may also be called only once.
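The start-once guard is easy to reproduce with the JDK's AtomicReference. The following is a minimal sketch of the same pattern; the class StartOnceSketch is hypothetical, and only the three state names and the exception message are taken from the code above.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical component that may be started exactly once.
class StartOnceSketch {
    enum State { LATENT, STARTED, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    // compareAndSet succeeds only for the first caller that sees LATENT,
    // so concurrent or repeated start() calls fail fast.
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    void close() {
        state.set(State.STOPPED);
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        StartOnceSketch component = new StartOnceSketch();
        component.start();
        try {
            component.start();
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```

Since the only transition into STARTED is from LATENT, an instance that has been closed cannot be restarted either; a new instance has to be created.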

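If the CAS operation succeeds, a connection state listener is built. The listener checks whether the client has connected or reconnected, and if so sets logAsErrorConnectionErrors to true.

Next comes the key part, backgroundOperationsLoop().

Background polling of operations: backgroundOperationsLoop()

As its name suggests, backgroundOperationsLoop() is a background loop. The overall flow of the background task is as follows: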

private void backgroundOperationsLoop()
{
    try
    {
        while ( state.get() == CuratorFrameworkState.STARTED )
        {
            OperationAndData<?> operationAndData;
            try
            {
                operationAndData = backgroundOperations.take();
                if ( debugListener != null )
                {
                    debugListener.listen(operationAndData);
                }
                // Execute the background operation
                performBackgroundOperation(operationAndData);
            }
            catch ( InterruptedException e )
            {
                // The interrupt is deliberately swallowed here.
                // swallow the interrupt as it's only possible from either a background
                // operation and, thus, doesn't apply to this loop or the instance
                // is being closed in which case the while test will get it
            }
        }
    }
    finally
    {
        log.info("backgroundOperationsLoop exiting");
    }
}

OperationAndData implements the Delayed interface so that the blocking queue can delay retries.
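How a Delayed element postpones its own retrieval can be shown with the JDK's DelayQueue directly. In this minimal sketch the class DelayedOperationSketch is a hypothetical stand-in for OperationAndData: an element queued with a delay only becomes available to take() once that delay has expired.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an operation that can be queued with a retry delay.
class DelayedOperationSketch implements Delayed {
    final String name;
    private final long readyAtNanos;

    DelayedOperationSketch(String name, long delayMs) {
        this.name = name;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    // Remaining delay; zero or negative means the element may be taken now.
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Elements that become ready sooner sort first.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    // Queues a delayed retry before an immediate operation and returns the take() order.
    static String demo() {
        DelayQueue<DelayedOperationSketch> queue = new DelayQueue<>();
        queue.add(new DelayedOperationSketch("retry-later", 200));
        queue.add(new DelayedOperationSketch("run-now", 0));
        try {
            String first = queue.take().name;   // available immediately
            String second = queue.take().name;  // blocks for roughly 200 ms
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This is why a single consumer thread is enough: an operation waiting for its retry time does not block operations that are ready to run.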

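The logic of backgroundOperationsLoop() is as follows:

  1. Keep looping for as long as the current state is STARTED.

  2. Take an operation object from the blocking queue. The initialization code shows that the queue is a DelayQueue, a thread-safe blocking queue with delay support, so OperationAndData necessarily implements the Delayed interface.

backgroundOperations = new DelayQueue<OperationAndData<?>>();

  3. If a debug listener exists, pass the OperationAndData to it.

  4. Call performBackgroundOperation, which invokes callPerformBackgroundOperation on the OperationAndData object taken from the queue.

  5. If the ZooKeeper cluster cannot be reached, the else branch is taken and the reconnect checks begin. If the conditions are met, the operation is put back into a queue to wait for the next retry. (Note that this is an active retry performed synchronously.)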

void performBackgroundOperation(OperationAndData<?> operationAndData)
{
    try
    {
        if ( !operationAndData.isConnectionRequired() || client.isConnected() )
        {
            operationAndData.callPerformBackgroundOperation();
        }
        else
        {
            // Allow connection resets, timeouts, etc. to occur
            client.getZooKeeper();  // important - allow connection resets, timeouts, etc. to occur

            // If the connection has timed out, throw CuratorConnectionLossException
            if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
            {
                throw new CuratorConnectionLossException();
            }
            // Otherwise push the operation into forcedSleepOperations to sleep and wait for the reconnect
            sleepAndQueueOperation(operationAndData);
        }
    }
    catch ( Throwable e )
    {
        // Check for thread interruption
        ThreadUtils.checkInterrupted(e);

        /**
         * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
         * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
         * and callbacks need to get invoked, etc.
         */

        // Handle the connection loss exception
        if ( e instanceof CuratorConnectionLossException )
        {
            WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
            CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
            // If the retry policy still allows another retry
            if ( checkBackgroundRetry(operationAndData, event) )
            {
                // Push the operation back onto the backgroundOperations queue to retry
                queueOperation(operationAndData);
            }
            else
            {
                // Give up retrying
                logError("Background retry gave up", e);
            }
        }
        else
        {
            // Otherwise handle the background operation exception
            handleBackgroundOperationException(operationAndData, e);
        }
    }
}

While we are here, a word on how the background thread decides whether to retry: it evaluates the retry policy supplied by the user, which is a classic implementation of the strategy pattern.

client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData)
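The strategy pattern mentioned here can be sketched without Curator. In the example below the interface RetryDecision and the class RetryStrategySketch are hypothetical simplifications: Curator's real allowRetry also receives a sleeper argument (the third parameter in the call above), which is left out to keep the sketch short.

```java
// Hypothetical, simplified retry strategy: decides from the retry count and elapsed time.
interface RetryDecision {
    boolean allowRetry(int retryCount, long elapsedTimeMs);
}

class RetryStrategySketch {
    // Strategy 1: allow a fixed number of retries.
    static RetryDecision retryNTimes(int n) {
        return (retryCount, elapsedTimeMs) -> retryCount < n;
    }

    // Strategy 2: allow retries until a total time budget is used up.
    static RetryDecision retryUntilElapsed(long maxElapsedMs) {
        return (retryCount, elapsedTimeMs) -> elapsedTimeMs < maxElapsedMs;
    }

    // The caller depends only on the interface, so policies are interchangeable.
    // Simulates attempts that each take msPerAttempt and counts the retries allowed.
    static int retriesAllowed(RetryDecision policy, long msPerAttempt) {
        int retryCount = 0;
        long elapsedMs = 0;
        while (policy.allowRetry(retryCount, elapsedMs)) {
            retryCount++;
            elapsedMs += msPerAttempt;
        }
        return retryCount;
    }

    public static void main(String[] args) {
        System.out.println(retriesAllowed(retryNTimes(3), 100));
        System.out.println(retriesAllowed(retryUntilElapsed(1000), 250));
    }
}
```

The background loop never needs to know which policy is in use; swapping the policy changes the retry behaviour without touching the loop.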

operationAndData.callPerformBackgroundOperation(): executing the background task

OperationAndData implements the Delayed interface and delegates to a BackgroundOperation, which has different implementations selected through polymorphism. The method body is a single line:

void callPerformBackgroundOperation() throws Exception
{
    operation.performBackgroundOperation(this);
}

operation.performBackgroundOperation(this); corresponds to BackgroundOperation#performBackgroundOperation.

image.png

BackgroundOperation has many concrete implementations, which correspond to the common ZooKeeper operations. The this that is passed in is the operationAndData object.

image.png

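10. Session management

  1. The client's connection state is managed through ConnectionState, which is responsible for attempting to reconnect after a timeout.

  2. ConnectionStateManager is responsible for connection state changes and their notification.

  3. ConnectionHandlingPolicy determines when the connection timeout policy is triggered.

While the background thread polls the operation queue, it attempts to reconnect when the state changes, and a client reconnect must be reported to the corresponding listeners. So how does Curator notify clients of session state changes and reconnect after a session timeout?

Connection event listening and state changes: ConnectionState#process

The code of ConnectionState#process shows that events related to the connection state have the type Watcher.Event.EventType.None and are delivered to all Watchers.

ConnectionState acts as the defaultWatcher, and its event callback looks like this: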

public void process(WatchedEvent event)
{
    if ( LOG_EVENTS )
    {
        log.debug("ConnectState watcher: " + event);
    }

    if ( event.getType() == Watcher.Event.EventType.None )
    {
        // isConnected: the client's current connection state;
        // true means connected (SyncConnected and ConnectedReadOnly states)
        boolean wasConnected = isConnected.get();
        // Decide based on org.apache.zookeeper.Watcher.Event.KeeperState
        boolean newIsConnected = checkState(event.getState(), wasConnected);
        if ( newIsConnected != wasConnected )
        {
            // The connection state has changed, so update it
            isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
            if ( newIsConnected )
            {
                // Connected again: update the negotiated session timeout
                lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());
                log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
            }
        }
    }

    // Notify the parentWatchers. Note that one parentWatcher was passed in during
    // initialization; it calls CuratorFrameworkImpl.processEvent
    for ( Watcher parentWatcher : parentWatchers )
    {
        OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        parentWatcher.process(event);
        trace.commit();
    }
}

As the last comment points out, the method iterates over parentWatchers and calls process on each one. There is always at least one Watcher here by default, because during initialization a Watcher is registered and passed in as the parentWatcher.

this.client = new CuratorZookeeperClient
    (
        localZookeeperFactory,
        builder.getEnsembleProvider(),
        builder.getSessionTimeoutMs(),
        builder.getConnectionTimeoutMs(),
        builder.getWaitForShutdownTimeoutMs(),
        new Watcher()
        {
            @Override
            public void process(WatchedEvent watchedEvent)
            {
                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                // This is the parentWatcher passed in during initialization; it calls CuratorFrameworkImpl.processEvent
                processEvent(event);
            }
        },
        builder.getRetryPolicy(),
        builder.canBeReadOnly(),
        builder.getConnectionHandlingPolicy()
    );

image.png

This notification callback comes up again later in the article, so a general impression is enough for now.

Connection state checking and handling: ConnectionState#checkState

Connection state checking and handling take place in the ConnectionState#checkState method.

boolean newIsConnected = checkState(event.getState(), wasConnected);

private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
    boolean isConnected = wasConnected;
    boolean checkNewConnectionString = true;
    switch ( state )
    {
    default:
    case Disconnected:
    {
        isConnected = false;
        break;
    }

    case SyncConnected:
    case ConnectedReadOnly:
    {
        isConnected = true;
        break;
    }
    // Authentication failure
    case AuthFailed:
    {
        isConnected = false;
        log.error("Authentication failed");
        break;
    }

    case Expired:
    {
        isConnected = false;
        checkNewConnectionString = false;
        handleExpiredSession();
        break;
    }

    case SaslAuthenticated:
    {
        // NOP
        break;
    }
    }
    // the session expired is logged in handleExpiredSession, so not log here
    if (state != Event.KeeperState.Expired) {
        new EventTrace(state.toString(), tracer.get(), getSessionId()).commit();
    }

    if ( checkNewConnectionString )
    {
        // If the server list has changed, update it
        String newConnectionString = handleHolder.getNewConnectionString();
        if ( newConnectionString != null )
        {
            handleNewConnectionString(newConnectionString);
        }
    }

    return isConnected;
}

The method decides from the connection state whether the connection is healthy; a return value of true means the connection is normal. When the session expires (Expired), handleExpiredSession is called to perform a reset, which is the passive session reconnect. For every state other than Expired, an event trace is recorded.

Note the RetryPolicy: this policy is consulted in both active and passive reconnects.

parentWatchers registration and callback

The last part of the method that handles state changes notifies all parentWatchers. Let us see what this loop does.

To repeat, one parentWatcher is passed in during initialization, and it calls CuratorFrameworkImpl.processEvent. The following shows how it is registered and called back.

// Notify the parentWatchers. Note that one parentWatcher was passed in during
// initialization; it calls CuratorFrameworkImpl.processEvent
for ( Watcher parentWatcher : parentWatchers )
{
    OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
    parentWatcher.process(event);
    trace.commit();
}

Let us go straight to the code behind this default Watcher callback, CuratorFrameworkImpl#processEvent(event).

new Watcher()
{
    @Override
    public void process(WatchedEvent watchedEvent)
    {
        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
        // Process the event
        processEvent(event);
    }
},

The logic of processEvent(event) is shown below. For WATCHED events it first checks and converts the connection state, and then it notifies every registered CuratorListener.

private void processEvent(final CuratorEvent curatorEvent)
{
    if ( curatorEvent.getType() == CuratorEventType.WATCHED )
    {
        // State conversion
        validateConnection(curatorEvent.getWatchedEvent().getState());
    }
    // Notify all registered CuratorListeners
    listeners.forEach(new Function<CuratorListener, Void>()
    {
        @Override
        public Void apply(CuratorListener listener)
        {
            try
            {
                OperationTrace trace = client.startAdvancedTracer("EventListener");
                // Deliver the event to the listener
                listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                trace.commit();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                logError("Event listener threw exception", e);
            }
            return null;
        }
    });
}

validateConnection contains the code that converts the connection state.

CuratorFrameworkImpl#validateConnection

void validateConnection(Watcher.Event.KeeperState state)
{
    if ( state == Watcher.Event.KeeperState.Disconnected )
    {
        internalConnectionHandler.suspendConnection(this);
    }
    else if ( state == Watcher.Event.KeeperState.Expired )
    {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    else if ( state == Watcher.Event.KeeperState.SyncConnected )
    {
        internalConnectionHandler.checkNewConnection(this);
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
        unSleepBackgroundOperations();
    }
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
    {
        internalConnectionHandler.checkNewConnection(this);
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}

As can be seen, the actual state change is handled by the ConnectionStateManager component. The mapping between the native ZooKeeper client states and the states wrapped by Curator is shown in the table below:

image.png
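The mapping can also be written down as a small sketch derived from the validateConnection code. The class StateMappingSketch and both enums are hypothetical stand-ins for the real ZooKeeper and Curator types. Two points are assumptions rather than something shown in this article: that suspendConnection results in the SUSPENDED state, and that states not handled by validateConnection produce no state change. The conversion of the first successful connection into CONNECTED is not modeled.

```java
// Hypothetical sketch of the state mapping performed by validateConnection.
class StateMappingSketch {
    enum KeeperState { Disconnected, SyncConnected, ConnectedReadOnly, Expired, AuthFailed }
    enum CuratorState { SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    // Returns the Curator state queued for a ZooKeeper state,
    // or null when no state change is queued.
    static CuratorState map(KeeperState state) {
        switch (state) {
            case Disconnected:
                return CuratorState.SUSPENDED;   // assumption: effect of suspendConnection
            case Expired:
                return CuratorState.LOST;
            case SyncConnected:
                return CuratorState.RECONNECTED; // first connect is reported as CONNECTED (not modeled)
            case ConnectedReadOnly:
                return CuratorState.READ_ONLY;
            default:
                return null;                     // assumption: other states are ignored here
        }
    }

    public static void main(String[] args) {
        for (KeeperState state : KeeperState.values()) {
            System.out.println(state + " -> " + map(state));
        }
    }
}
```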

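Also note that the Expired, SyncConnected, and ConnectedReadOnly branches each add a ConnectionState through addStateChange. The purpose of this call is to notify all ConnectionStateListeners registered in listeners.

connectionStateManager.addStateChange(ConnectionState.READ_ONLY);

How the notification is delivered is described below.

11. Notification mechanism

What is a notification for? It simply means calling back the registered listener's callback function promptly when an event occurs. Curator provides different listener registrations and callbacks for different components.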

// Custom listener: CuratorListener
client.getCuratorListenable().addListener((_fk, e) -> {
    if (e.getType().equals(CuratorEventType.WATCHED)) {
        log.info("test");
    }
});
ConnectionStateListener connectionStateListener = (client1, newState) -> {
    // Some details
    log.info("newState => " + newState);
};

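Listeners can be registered in the following ways:

  • One-time Watch notification

  • Registering a CuratorListener

  • Registering a ConnectionStateListener

  • Registering an UnhandledErrorListener

  • Callback notification when a background operation completes

  • The cache mechanism, which registers repeatedly

One-time Watch notification

A one-time watch has to be registered again after every trigger, using a call like the one below. This touches on NodeCache and related components; since the prerequisite code has not been covered yet, it is skipped for now.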

client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
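To make the "register again every time" point concrete, here is a minimal in-memory sketch of one-time watch semantics. It does not use ZooKeeper or Curator: OneShotWatchSketch, watch, change and RepeatingWatcher are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory model of one-time watches; not ZooKeeper's implementation.
class OneShotWatchSketch {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Register a watcher for a path; it fires at most once.
    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Simulate a change: detach the watchers registered for the path, then fire them.
    void change(String path) {
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            fired.forEach(w -> w.accept(path));
        }
    }

    // A watcher that registers itself again each time it fires.
    static final class RepeatingWatcher implements Consumer<String> {
        final OneShotWatchSketch store;
        int hits;

        RepeatingWatcher(OneShotWatchSketch store) {
            this.store = store;
        }

        @Override
        public void accept(String path) {
            hits++;
            store.watch(path, this); // re-register for the next change
        }
    }

    public static void main(String[] args) {
        OneShotWatchSketch store = new OneShotWatchSketch();

        int[] oneShotHits = {0};
        store.watch("/my/path", p -> oneShotHits[0]++);
        store.change("/my/path"); // fires
        store.change("/my/path"); // does not fire: the watch was consumed
        System.out.println("one-shot hits: " + oneShotHits[0]);

        RepeatingWatcher repeating = new RepeatingWatcher(store);
        store.watch("/my/path", repeating);
        store.change("/my/path");
        store.change("/my/path");
        System.out.println("repeating hits: " + repeating.hits);
    }
}
```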

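Registering a CuratorListener

The implementation is simple: the listener is added to the CuratorFrameworkImpl.listeners container, and when a background thread completes an operation it notifies every listener in that container.

For example, creating a path on ZK asynchronously triggers a CuratorEventType.CREATE event. In addition, when a connection state event fires, parentWatcher also calls back these listeners, as in the code below: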

/**
 * connect ZK, register watchers
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port,
                                 String root, final WatcherCallBack watcher)
{
    CuratorFramework fk = Utils.newCurator(conf, servers, port, root);

    // Custom listener: CuratorListener
    fk.getCuratorListenable().addListener(new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception {
            if (e.getType().equals(CuratorEventType.WATCHED)) {
                WatchedEvent event = e.getWatchedEvent();

                watcher.execute(event.getState(), event.getType(), event.getPath());
            }
        }
    });

    fk.start();
    return fk;
}

CuratorFrameworkImpl#processEvent

The processEvent method always calls back the registered CuratorListeners.

private void processEvent(final CuratorEvent curatorEvent)
{
    if ( curatorEvent.getType() == CuratorEventType.WATCHED )
    {
        validateConnection(curatorEvent.getWatchedEvent().getState());
    }

    listeners.forEach(new Function<CuratorListener, Void>()
    {
        @Override
        public Void apply(CuratorListener listener)
        {
            try
            {
                OperationTrace trace = client.startAdvancedTracer("EventListener");
                listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                trace.commit();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                logError("Event listener threw exception", e);
            }
            return null;
        }
    });
}
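One detail in processEvent is worth isolating: each listener is called inside its own try/catch, so a listener that throws does not prevent the remaining listeners from being notified. The following is a minimal sketch of that pattern using only the JDK. ListenerFanOutSketch and its EventListener interface are hypothetical stand-ins (the event is a plain String here), and the errors list stands in for logError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ListenerFanOutSketch {
    // Stand-in for CuratorListener; the event is just a String in this sketch.
    interface EventListener {
        void eventReceived(String event) throws Exception;
    }

    private final List<EventListener> listeners = new CopyOnWriteArrayList<>();
    final List<String> errors = new ArrayList<>(); // stands in for logError

    void addListener(EventListener listener) {
        listeners.add(listener);
    }

    // Deliver the event to every listener; failures are recorded, not propagated.
    void processEvent(String event) {
        for (EventListener listener : listeners) {
            try {
                listener.eventReceived(event);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag
                }
                errors.add("Event listener threw exception: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        ListenerFanOutSketch framework = new ListenerFanOutSketch();
        framework.addListener(event -> { throw new IllegalStateException("boom"); });
        framework.addListener(event -> System.out.println("second listener got " + event));
        framework.processEvent("WATCHED");
        System.out.println(framework.errors);
    }
}
```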

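The callbacks themselves are triggered by the various operation builder implementations. Digging into them is fairly involved; it is enough to be aware of the concept here and to read the relevant implementation when needed.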

image.png

Registering a ConnectionStateListener

If a ConnectionStateListener is added, it is notified whenever the connection state changes.

ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        // Some details
    }
};
client.getConnectionStateListenable().addListener(connectionStateListener);

The ConnectionStateListener callbacks are fired inside ConnectionStateManager. So far only its initialization has been covered, so the part that calls back ConnectionStateListener is described next.

How does ConnectionStateManager call back ConnectionStateListener?

CuratorFrameworkImpl#validateConnection

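The session mechanism section above mentioned the call that adds a ConnectionState. This section describes how, once a ConnectionState change is received, the manager calls back the listeners registered on it.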

void validateConnection(Watcher.Event.KeeperState state)
{
    // ......
    else if ( state == Watcher.Event.KeeperState.Expired )
    {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    else if ( state == Watcher.Event.KeeperState.SyncConnected )
    {
        unSleepBackgroundOperations();
    }
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
    {
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}

The actual processing is done in the following method.

ConnectionStateManager#processEvents

private void processEvents()
{
    while ( state.get() == State.STARTED )
    {
        try
        {
            int useSessionTimeoutMs = getUseSessionTimeoutMs();
            long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
            long pollMaxMs = useSessionTimeoutMs - elapsedMs;

            final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
            if ( newState != null )
            {
                if ( listeners.isEmpty() )
                {
                    log.warn("There are no ConnectionStateListeners registered.");
                }
                // Key part: when a state change arrives, call back the listeners
                listeners.forEach(listener -> listener.stateChanged(client, newState));
            }
            else if ( sessionExpirationPercent > 0 )
            {
                synchronized(this)
                {
                    checkSessionExpiration();
                }
            }
        }
        catch ( InterruptedException e )
        {
            // swallow the interrupt as it's only possible from either a background
            // operation and, thus, doesn't apply to this loop or the instance
            // is being closed in which case the while test will get it
        }
    }
}

The important part of the code above is really just one line:

listeners.forEach(listener -> listener.stateChanged(client, newState));
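Besides that line, the first three statements of the loop body decide how long eventQueue.poll may block. The arithmetic can be worked through in isolation. This is a sketch that copies only the two expressions from processEvents; PollTimeoutSketch is a hypothetical name, and the current time is passed in as nowMs (instead of calling System.currentTimeMillis()) so that the numbers are reproducible.

```java
class PollTimeoutSketch {
    // Same arithmetic as elapsedMs and pollMaxMs in processEvents.
    static long pollMaxMs(int useSessionTimeoutMs, long startOfSuspendedEpoch, long nowMs) {
        long elapsedMs = startOfSuspendedEpoch == 0
                ? useSessionTimeoutMs / 2
                : nowMs - startOfSuspendedEpoch;
        return useSessionTimeoutMs - elapsedMs;
    }

    public static void main(String[] args) {
        // Not suspended (epoch == 0): poll for half of the timeout.
        System.out.println(pollMaxMs(60_000, 0, 1_000_000));       // prints 30000
        // Suspended 10 s ago: poll only for the remaining 50 s.
        System.out.println(pollMaxMs(60_000, 990_000, 1_000_000)); // prints 50000
    }
}
```

In other words, while the connection is healthy the loop wakes up at least every half timeout, and once a suspension has started the wait shrinks to whatever is left of the timeout window.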

How does processEvents itself get invoked? This was already shown in the CuratorFrameworkImpl startup flow chart drawn earlier.

image.png

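ConnectionStateManager holds an ExecutorService, and reading the code shows that it is a single-threaded executor. The intent is clear: one dedicated thread runs this loop, polling the event queue and notifying the listeners registered on ConnectionStateManager whenever the state changes.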
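The shape of this design (a producer offering states to a queue, and one dedicated thread polling the queue and fanning out to listeners) can be sketched with JDK classes alone. The class below is an illustration under simplifying assumptions, not Curator's ConnectionStateManager: StateDispatcherSketch and runDemo are hypothetical names, states are plain Strings, the poll timeout is a fixed 100 ms, and session expiration checks are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class StateDispatcherSketch {
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor();
    private volatile boolean started;

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Start the single worker thread that runs the polling loop.
    void start() {
        started = true;
        service.execute(this::processEvents);
    }

    // Producer side: called by whoever detects a state change.
    void addStateChange(String newState) {
        eventQueue.offer(newState);
    }

    void close() {
        started = false;
        service.shutdownNow(); // interrupts a blocked poll
    }

    private void processEvents() {
        while (started) {
            try {
                String newState = eventQueue.poll(100, TimeUnit.MILLISECONDS);
                if (newState != null) {
                    listeners.forEach(l -> l.accept(newState));
                }
            } catch (InterruptedException e) {
                // Interrupted by close(); the loop condition then ends the loop.
            }
        }
    }

    // Posts the given states and returns what a listener received, in order.
    static List<String> runDemo(String... states) {
        StateDispatcherSketch manager = new StateDispatcherSketch();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(states.length);
        manager.addListener(s -> {
            received.add(s);
            done.countDown();
        });
        manager.start();
        for (String s : states) {
            manager.addStateChange(s);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            manager.close();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(runDemo("CONNECTED", "SUSPENDED", "RECONNECTED"));
    }
}
```

Because a single thread consumes a FIFO queue, listeners see state changes in the order they were posted, and a slow listener delays later notifications rather than reordering them.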

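Registering an UnhandledErrorListener

Likewise, these listeners are added to CuratorFrameworkImpl.unhandledErrorListeners. They are triggered when a background operation throws an exception or when a handler throws an exception.

How to register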


/**
 * connect ZK, register watchers
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port,
                                 String root, final WatcherCallBack watcher)
{
    CuratorFramework fk = Utils.newCurator(conf, servers, port, root);

    // Custom listener: UnhandledErrorListener
    fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
        @Override
        public void unhandledError(String msg, Throwable error) {
            String errmsg = "Unrecoverable zookeeper error, halting process: " + msg;
            LOG.error(errmsg, error);
            JStormUtils.halt_process(1, "Unrecoverable zookeeper error");
        }
    });

    fk.start();
    return fk;
}

How is it triggered?

The triggering code is in the CuratorFrameworkImpl#logError method; note what the apply method does here.

void logError(String reason, final Throwable e)
{
    // Unrelated code omitted
    unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
    {
        @Override
        public Void apply(UnhandledErrorListener listener)
        {
            listener.unhandledError(localReason, e);
            return null;
        }
    });

    // Unrelated code omitted
}

Callback notification when a background operation completes

For operations such as setData, a callback can be passed in through the fluent (chained) API. Once the operation completes, the callback is executed.

public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
    // this is another method of getting notification of an async completion
    client.setData().inBackground(callback).forPath(path, payload);
}
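The general pattern behind inBackground(callback) is "run the operation on another thread, then hand the result to the callback". A minimal JDK-only sketch of that pattern follows. Everything in it is hypothetical: BackgroundCallbackSketch, its Callback interface and setDataInBackground are invented for illustration, the in-memory map replaces the ZooKeeper server, and the result code 0 simply stands for success. Curator's real BackgroundCallback receives a CuratorFramework and a CuratorEvent instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class BackgroundCallbackSketch {
    // Simplified stand-in for a completion callback.
    interface Callback {
        void processResult(String path, int resultCode);
    }

    private final Map<String, byte[]> store = new ConcurrentHashMap<>();
    private final ExecutorService background = Executors.newSingleThreadExecutor();

    // Returns immediately; the write and the callback run on the background thread.
    void setDataInBackground(String path, byte[] payload, Callback callback) {
        background.execute(() -> {
            store.put(path, payload);
            callback.processResult(path, 0); // 0 stands for success in this sketch
        });
    }

    void close() {
        background.shutdown();
    }

    // Runs one background write and reports what the callback observed.
    static String runDemo() {
        BackgroundCallbackSketch client = new BackgroundCallbackSketch();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>("timeout");
        Thread caller = Thread.currentThread();
        byte[] payload = "Test".getBytes(StandardCharsets.UTF_8);
        client.setDataInBackground("/my/path", payload, (path, rc) -> {
            boolean async = Thread.currentThread() != caller;
            result.set(path + " rc=" + rc + " async=" + async);
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            client.close();
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```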

The cache mechanism, which registers repeatedly

Curator's cache mechanism is a large topic in its own right. The cache types in Curator include:

  • Path Cache

  • Node Cache

  • Tree Cache

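Before it is used, a cache is compared against the node data on the server. When the data differs, the watch mechanism triggers a callback that refreshes the local cache and registers the Watch again. A new Watcher is also registered on every reconnect, which is intended to keep the Watcher from ever being lost.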
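The refresh-and-register-again cycle described above can be sketched as follows. This is a toy model under stated assumptions, not how Curator's caches are implemented: FakeServer and NodeCache are hypothetical classes invented for the example, the server keeps at most one one-time watch per path, and reconnect handling is not modeled.

```java
import java.util.HashMap;
import java.util.Map;

class CacheRefreshSketch {
    // Hypothetical server: holds data and at most one one-time watch per path.
    static final class FakeServer {
        private final Map<String, String> data = new HashMap<>();
        private final Map<String, Runnable> watches = new HashMap<>();

        // Read the current value and leave a one-time watch behind.
        String getAndWatch(String path, Runnable watcher) {
            watches.put(path, watcher);
            return data.get(path);
        }

        void set(String path, String value) {
            data.put(path, value);
            Runnable watcher = watches.remove(path); // one-time: consumed on change
            if (watcher != null) {
                watcher.run();
            }
        }
    }

    // Local copy of one node that re-reads and re-registers on every change.
    static final class NodeCache {
        private final FakeServer server;
        private final String path;
        private String current;

        NodeCache(FakeServer server, String path) {
            this.server = server;
            this.path = path;
            refresh();
        }

        private void refresh() {
            // Reading and registering the next watch happen in one step.
            current = server.getAndWatch(path, this::refresh);
        }

        String get() {
            return current;
        }
    }

    public static void main(String[] args) {
        FakeServer server = new FakeServer();
        server.set("/my/path", "v1");
        NodeCache cache = new NodeCache(server, "/my/path");
        System.out.println(cache.get()); // prints v1
        server.set("/my/path", "v2");
        server.set("/my/path", "v3");
        System.out.println(cache.get()); // prints v3
    }
}
```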

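12 Summary

From the notification mechanism and session management sections, we have learned that:

  • Client notifications are delivered synchronously.

  • connectionStateManager.listeners are notified asynchronously by an internal thread pool.

  • For CuratorFrameworkImpl.listeners, connection state notifications run synchronously on the watcher notification thread, while notifications issued by the background thread are asynchronous.

  • Registering too many watchers may cause watchers to be lost after a reconnect.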

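13 Reviewing the initialization process

When the Curator framework implementation CuratorFrameworkImpl starts, it first starts the connection state manager ConnectionStateManager and then starts the client CuratorZookeeperClient.

Constructing CuratorFrameworkImpl initializes the client CuratorZookeeperClient. Note that a Watcher is passed in by default at this point to handle CuratorEvents.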

image.png

The key point in the CuratorZookeeperClient startup is starting its ConnectionState object. (When CuratorZookeeperClient is constructed, it initializes the ConnectionState and passes its internal Watcher to it.)

image.png

ConnectionState implements Watcher. When the connection is established, it invokes the Watcher passed in by CuratorZookeeperClient to handle the relevant events. That Watcher is the one supplied when CuratorFrameworkImpl initializes CuratorZookeeperClient.

image.png

The actual business logic of the client watcher lives in CuratorFrameworkImpl, in the processEvent method. Its main job is to iterate over the listeners in CuratorFrameworkImpl's internal listener container and let them handle the CuratorEvent. This CuratorEvent is a wrapper around the native WatchedEvent.

image.png

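Starting the connection state manager ConnectionStateManager mainly makes use of the listeners held in the connection state listener container (UnaryListenerManager<ConnectionStateListener>), which is exposed as a Listenable (called ListenerContainer in earlier versions).

The listeners in ConnectionStateManager are triggered by consuming events from the connection state event queue, a BlockingQueue. The queue holds the entries that are offered after each ConnectionState change.

This takes us back to the section on registering a ConnectionStateListener: after a state change there is the small step connectionStateManager.addStateChange(XXXX); at the end.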

void validateConnection(Watcher.Event.KeeperState state)
{
    // ......
    else if ( state == Watcher.Event.KeeperState.Expired )
    {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    else if ( state == Watcher.Event.KeeperState.SyncConnected )
    {
        unSleepBackgroundOperations();
    }
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
    {
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}

Drilling down through the code, we eventually arrive at the following:

private void postState(ConnectionState state)
{
    log.info("State change: " + state);

    notifyAll();

    while ( !eventQueue.offer(state) )
    {
        eventQueue.poll();
        log.warn("ConnectionStateManager queue full - dropping events to make room");
    }
}
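The while loop in postState implements a "drop the oldest entry when full" policy on a bounded queue: offer fails without blocking when the queue is full, so the head is polled away until the new state fits. A small self-contained sketch of the same loop is shown below. DropOldestQueueSketch is a hypothetical name, states are plain Strings, and the capacity of 2 is chosen only to make the effect visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class DropOldestQueueSketch {
    private final BlockingQueue<String> eventQueue;

    DropOldestQueueSketch(int capacity) {
        eventQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Same loop shape as postState: never block, discard the oldest entry instead.
    void postState(String state) {
        while (!eventQueue.offer(state)) {
            eventQueue.poll(); // queue full: drop the head to make room
        }
    }

    // Snapshot of the queued states, oldest first.
    List<String> pending() {
        return new ArrayList<>(eventQueue);
    }

    public static void main(String[] args) {
        DropOldestQueueSketch manager = new DropOldestQueueSketch(2);
        manager.postState("CONNECTED");
        manager.postState("SUSPENDED");
        manager.postState("LOST"); // evicts CONNECTED
        System.out.println(manager.pending()); // prints [SUSPENDED, LOST]
    }
}
```

The trade-off is that the producer is never blocked by a slow consumer, at the cost of listeners possibly missing the oldest state changes.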

@since 4.2.0 return type has changed from ListenerContainer to Listenable

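Part7 Final words

This article introduced the basic usage of Curator, analyzed the initialization of the Curator components from the source code, and briefly walked through the source-level call paths for session management and the notification mechanism.

Below is an overall diagram that summarizes the source code covered in this article. If you find mistakes in this analysis or have questions, feedback and discussion are welcome.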

Curator 源碼分析.drawio.png

Finally, here is the complete demo code:


import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

@Slf4j
public class CuratorTestExample {

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy);

        // Connect to ZK; the connection is opened by client.start() below

        // Custom listener: CuratorListener
        client.getCuratorListenable().addListener((_fk, e) -> {
            if (e.getType().equals(CuratorEventType.WATCHED)) {
                log.info("test");
            }
        });
        ConnectionStateListener connectionStateListener = (client1, newState) -> {
            // Some details
            log.info("newState => " + newState);
        };
        // 11:31:17.026 [Curator-ConnectionStateManager-0] INFO com.zxd.interview.zkcurator.CuratorTestExample - newState => CONNECTED
        client.getConnectionStateListenable().addListener(connectionStateListener);
        client.start();
        // At this point we hold a connected ZK client instance.
        //.....
        // Create the znode, creating parent nodes if necessary
        client.create().creatingParentsIfNeeded().withProtection().forPath("/my/path", "Test".getBytes());
        InterProcessMutex lock = new InterProcessMutex(client, "/my/path");
        lock.acquire();
        try {
            // do some work inside of the critical section here
            Thread.sleep(1000);
        } finally {
            lock.release();
        }
    }
}

Part8 Recommended reading

A detailed guide to using the ZK client Curator - Zhihu (zhihu.com)

https://cloud.tencent.com/developer/article/1648976?areaSource=106005.14

Curator directory watching | Ravitn Blog (donaldhan.github.io)

