实战RxJava之combineLatest & distinctUntilChanged

最近项目中碰到了这样一个需求:

  1. 在每当Tab切换到特定页面,并满足onResume状态
  2. 构建得到的查询条件和上一次成功查询条件不一致

时,查询并获取特定的信息。

通常的做法是:

  1. 添加切换Tab和onResume的Listener,在onResume的Listener中保存onResume状态。在切换Tab的Listener触发时检查onResume状态,满足则进入下一步
  2. 构建相应的查询条件,将查询条件和上一次成功的条件进行比较,如果不一致则查询并进入下一步
  3. 查询成功时记录上一次成功查询条件,失败时重置上一次成功查询条件为空

看上去可以Work,实际上大多数运行状况中也是可以的,但这里有个隐藏的前提条件:

Tab切换到特定页面的事件后,onResume状态已经被设置好了

如果不满足这个条件,那么查询永远不会执行。

实际上,我们希望无论Tab切换到特定页面满足onResume状态的先后顺序,只要在条件满足时就进行下一步。在onResume的Listener中在加一个检查显然不够好,如果条件增加,再加上一些异步,很快代码就会一团糟。

好在我们有RxJava。其中的combineLatestdistinctUntilChanged能很好的解决问题。Marbles图能很直观的看出来这两个操作符的作用:

combineLatest
http://reactivex.io/documentation/operators/combinelatest.html

combineLatest

combineLatestzip类似,不同的时它会在一堆Observable中任意一个发射数据项时,将剩余Observable的最近的数据项,加上当前数据项,一起交给我们定义好的FunctionFunction将这些数据项合成一个,并在新的Observable中发射。第一次发射数据时会满足条件:所有原始数据源都发射过至少一次数据项。

distinctUntilChanged
http://reactivex.io/documentation/operators/distinct.html

distinctUntilChanged

distinctUntilChanged会在发射的数据项变化时,发射一次变化的数据项。

用Kotlin写的样例代码:

class RxPS {
    // Precondition may be from EventBus
    val resumeSubject: BehaviorSubject<Boolean> = BehaviorSubject.create<Boolean>()
    val tabSubject: BehaviorSubject<Int> = BehaviorSubject.create<Int>() 

    // Dummy Data
    var place = 0
    var name = ""
    fun fetchPlace(): Single<Int> = Single.just(place)
    fun fetchName(): Single<String> = Single.just(name)

    var isLoaded = false

    fun subscribe() {
        combineLatestAND(
                resumeSubject,
                tabSubject.map { it == 1 }
        ).filter { it }.flatMapSingle {
            zipPlaceName(fetchPlace(), fetchName())
            //Single.zip(fetchPlace(), fetchName(), BiFunction<Int, String, Pair<Int, String>> { t1, t2 -> Pair(t1, t2) }) 
            //当然也可以这样
        }.distinctUntilChanged { t1: Pair<Int, String>, t2: Pair<Int, String> ->
            isLoaded && t1 == t2
        }.flatMapSingle {
            //do some load
            Single.just(it).subscribeOn(Schedulers.io()).delay(1, TimeUnit.SECONDS)
        }.observeOn(Schedulers.single()).subscribe ({
            isLoaded = true
            println(it)
        }, { err ->
            isLoaded = false
            println(err)
        })
    }
}

fun combineLatestAND(vararg sources: Observable<Boolean>): Observable<Boolean> {
    val combineANDFunction: Function<Array<Any>, Boolean> = Function { t -> t.find { it == false } == null }
    return Observable.combineLatest(sources, combineANDFunction)
}

fun zipPlaceName(place: Single<Int>, name: Single<String>): Single<Pair<Int, String>> {
    val sources: List<Single<out Any>> = listOf(place, name)
    val zipFunction: Function<Array<Any>, Pair<Int, String>> = Function { t ->
        Pair(t[0] as Int, t[1] as String)
    }
    return Single.zip(sources, zipFunction)
}

combineANDFunction就是用于合并数据项的函数。注意由于RxJava由于Java的原因,Function<Array<Any>,XXXX>的第一个参数只能是Any。

这里有个小Trick,就是在distinctUntilChanged中判断isLoaded。不愿意这样写可以再写层flatMap,根据isLoaded构建不同的Rx流。

条件的判断和构建查询条件都用的是Rx,之后再多条件和异步都可以搞定。

Finish and Enjoy!!

comments powered by Disqus