DEVLOG 2.5 RxJava (一) 手寫實(shí)現(xiàn)subscribeOn和observeOn操作符
參考內(nèi)容:動腦學(xué)院 RxJava 課程
代碼連接在這里:https://github.com/kolibreath/Practices/tree/master/HandsOnRxJava/core
有幫助的小伙伴可以給這篇文章投幣 點(diǎn)贊 收藏 然后star一下我的代碼倉庫咯~

春節(jié)期間聽了一下動腦學(xué)院關(guān)于RxJava的課程,雖然之前RxJava也經(jīng)常用過,不過當(dāng)時對于函數(shù)式編程的體驗(yàn)不深刻,特別是對于一些操作符根本沒法理解?,F(xiàn)在重新看看之后感覺好了很多。即使RxJava已經(jīng)不如當(dāng)時那么熱門,現(xiàn)在通過手寫一些操作符的方式更能更好地理解觀察者模式和裝飾器模式。
我個人感覺這門課還是非常好的,但是可能因?yàn)檎n程時常的限制,老師不能鞭辟入里的講解實(shí)現(xiàn)思路。所以我今天來復(fù)盤一下老師講的Observable#create,map,flatmap, subscribeOn以及observeOn的實(shí)現(xiàn)思路,并且指出老師的實(shí)現(xiàn)中可能存在的問題。
本文內(nèi)容分成以下幾個板塊:
手寫RxJava中幾個類之間的關(guān)系
手寫Observable#create
手寫Observable#map
手寫Observable#flatmap
手寫subscribeOn和observeOn
目前的話我也只看到手寫observeOn這里,雖然我可以將課程全部看完再進(jìn)行總結(jié),但是我還是習(xí)慣一邊思考一邊學(xué)習(xí),不想淺嘗輒止。如果在后面的課程中對于代碼本身有所修正,以后面的課程為準(zhǔn)。

手寫RxJava中幾個類之間的關(guān)系
RxJava是一種更加細(xì)致的觀察者模式。在以往的觀察者模式中,觀察者(Observer)的引用被 被觀察者(Observable)持有,事件通過被觀察者調(diào)用觀察者的方法傳遞。

我們來看看RxJava中是怎么處理的。在RxJava中抽象出了被觀察者的頂級父類ObservableSource。實(shí)現(xiàn)的類如ObservableCreate,ObservableMapper繼承Observable。同時事件傳遞的具體功能者通過Emitter實(shí)現(xiàn) 。
Observable和Emitter的綁定通過ObservableOnSubscribe實(shí)現(xiàn)。
綁定的具體含義我們通過Observable#create的實(shí)現(xiàn)來理解。

手寫Observable#create
我們在寫create之前理清一下思路:
我們是要進(jìn)行鏈?zhǔn)秸{(diào)用,create一般是Observable的一個靜態(tài)方法,并且后面需要跟一些如map,flatmap等等操作。所以我們不妨寫一個類,ObservableCreate繼承Observable。
事件的通知使用Emitter解耦,并且Emitter實(shí)例可以通過ObservableOnSubscribe的實(shí)現(xiàn),因此ObservableCreate中需要持有一個ObservableOnSubscribe。
Emitter的具體實(shí)現(xiàn)需要指明。
基于以上三點(diǎn),我相信ObservableCreate的思路還是相當(dāng)清晰的:
之后需要在Observable中實(shí)現(xiàn)靜態(tài)方法,并且返回ObservableCreate(作為Observable的一個實(shí)現(xiàn)):
這里有一個小細(xì)節(jié),Emitter是直接調(diào)用對應(yīng)的觀察者的對應(yīng)方法(Emitter#onNext和Observer#onNext相對應(yīng)),這點(diǎn)和原始的觀察者模式一模一樣。
具體的思路圖如下:
需要實(shí)現(xiàn)Observable#create 需要實(shí)現(xiàn)吧ObservableOnSubscribe Emitter,最后實(shí)現(xiàn)鏈?zhǔn)秸{(diào)用上的Observable#subscribe中的Observer lambda。

所以O(shè)bservable#subscribe意味著Emitter#onNext會調(diào)用Observer#onNext

手寫實(shí)現(xiàn)map和flatmap
map和flatmap都是轉(zhuǎn)換操作符,他們的區(qū)別在于map是對于類型的轉(zhuǎn)換。所以在我們首先可以在Observable中給出如下定義:
原類型是T,改變之后的類型是U。我們需要具體實(shí)現(xiàn)一個ObservableMap。
我們來整理一下思路:
上游的類型是T,我們通過實(shí)現(xiàn)Function進(jìn)行一個轉(zhuǎn)換,稱為U類型
下游的觀察者接收到的類型應(yīng)該也是U類型
觀察者的實(shí)現(xiàn)在Observable#subscribe中給出,我們可以通過【包裝】這個Observer。具體的思路可以是在onNext中進(jìn)行類型轉(zhuǎn)換,然后再發(fā)射出去
具體的代碼如下:
這里有一個細(xì)節(jié),考慮一下為什么還需要再subscribeActual中重新調(diào)用Observable#subscribe? 在沒有調(diào)用map之前,如果emitter發(fā)射一個事件,就會間接調(diào)用Observer#onNext,Observer中定義的onNext是通過Observer#subscribe傳入的,因?yàn)槲覀冄b飾的MapObserver也調(diào)用了傳入的Observer#onNext,所以需要重新進(jìn)行subscribe。
flatmap和map的思路類似,但是flatmap是輸出一個Observable,那必然不可能在onNext中直接進(jìn)行映射類型,然后發(fā)射。
我們可以采取類似的思路,首先進(jìn)行類型轉(zhuǎn)換,將原始的T類型得到一個Obsevable<R>,使用這個Observable<R>,訂閱傳入的Observer lambda。
這里還是需要注意上面的細(xì)節(jié),事件的傳遞是通過subscribe展開的。具體的代碼如下,可以看看注釋:

手寫subscribeOn和observeOn
subscribeOn的具體的作用是指定事件發(fā)射的線程;observeOn是指定事件被接受的線程。在考慮具體的實(shí)現(xiàn)之前,我們首先看看RxJava的實(shí)現(xiàn)。對于以下代碼,是否添加subscribeOn、observeOn的區(qū)別構(gòu)成了三個版本:
都沒有添加subscribeOn
添加了subscribeon無observeOn
兩者都添加
三種代碼的輸出圖如下:
如果看不清的小伙伴可以右鍵,選擇在新標(biāo)簽頁打開放大看。
相同的顏色表示在同一個線程。比如第二張圖,subscribeOn是綠色,Map1和Map2,onNext都是綠色,表示在相同的線程。

可以觀察到這樣的規(guī)律:
在沒有observeOn的情況下,subscribeOn可以影響所有和事件發(fā)射、轉(zhuǎn)換相關(guān)的操作符的線程。并且影響Observer#onNext onComplete onError
在有observeOn的情況下只能影響它上面的操作符,并且observeOn指定了observer的線程。
onSubscribe不隨著subscribeOn和observeOn的線程調(diào)度變化而變化。
特別是最后一條規(guī)律,在課程中老師寫代碼時,雖然直接在subscribe的時候立刻寫observer.onSubscribe非常合理,但是仔細(xì)品一品的話還是有還原原版的RxJava的操作的味道的,因?yàn)橐WConSubscribe的線程不是subscribeOn指定的線程。
對于subscribeOn的實(shí)現(xiàn),我們肯定需要將事件發(fā)射的線程切換走。調(diào)用subscribeOn的時候,其實(shí)我們只需要將observable.subscribe(observer)在其他線程執(zhí)行即可,具體的原因我歸結(jié)在了這張圖中:

因?yàn)閛bservable.subscribe(observer)意味著Emitter#onNext會調(diào)用observer#onNext,修改這里就會修改上面從create走出來的邏輯的線程。
記得上面說過的那個點(diǎn),onSubscribe方法和創(chuàng)建初始的Observable的線程一致的問題嗎?我們可以在observable.subscribe(observer)被調(diào)度之前調(diào)用observable#onSubscribe方法,但是這里必須要重新寫一個Observer,這是為了避免onSubscribe被調(diào)用兩次。
具體的代碼如下:
observeOn就需要將接受事件的方法如Observer#onNext等放入新的線程。這里需要注意開一個隊列挨個消費(fèi)事件,這個是為了避免在事件消費(fèi)過程中出現(xiàn)問題,需要終止消費(fèi)行為。不過這里在課程中老師的消費(fèi)事件的幾個變量沒有說明清楚作用,而且處理的機(jī)制存在問題。不過這個不是重點(diǎn),我直接原樣照搬過來了,著重體會在其他線程中處理事件的邏輯就行。
代碼如下: