import me

"あなたの当たり前が誰かのためになる"

RxKotlin:{Merge, Zip, CombineLatest}編

はじめに

前々回はこちらについて書き、

rozkey.hatenablog.com

 

前回はこちらについて書きました。

rozkey.hatenablog.com

 

今回はその続きで、RxJavaにおける合成オペレータの紹介です。

 

合成オペレータとは

勝手に呼んでいるのですが、(公式にあるのかな?ちょっと調べてみないと分からん)複数のFlowable / Observableに対して1つにまとめたり揃ったタイミングで通知したりするオペレータのことを僕はそう呼んでます。

 

Merge

Mergeメソッドは、複数のFlowable / Observableを1つにまとめて同時に実行するオペレータです。複数のデータの流れを一つにまとめることができます。

サンプルプログラムを見ていきましょう。

fun main(args: Array<String>) {
val ob1 = Observable.interval(500L, TimeUnit.MILLISECONDS)
.take(10)
.map { data -> data.toString() + "A" }
val ob2 = Observable.interval(300L, TimeUnit.MILLISECONDS)
.take(20)
.map { data -> data * 3 }

Observable.zip(ob1, ob2, {data1, data2 -> Arrays.asList(data1, data2)})
.subscribe({
println("Zip data : " + it.toString())
}, {})
Thread.sleep(10000L)
}

どっちの値か分かりやすいように変換を行っています。

 

500ミリ秒ごとに数値を生成して10件のみAという文字列を加えて文字に変換したObservableと、300ミリ秒ごとに数値を生成して20件のみ3倍にしたObservableを合成して同時に実行するサンプルプログラムです。

 

実行結果は以下になります。

f:id:rozkey59:20180211192954g:plain

若干カーソルがウザイ感じがしますが気にしないでください。

確かに二つのObservableで生成されたデータが一つに結合されて同時に一つのObservableにて実行されていることが分かります。

 

Zip

Zipメソッドは、複数のFlowable / Observableの通知データが揃ったタイミングで新しいデータを生成することができます。

サンプルプログラムを見ていきましょう。

fun main(args: Array<String>) {
val ob1 = Observable.interval(500L, TimeUnit.MILLISECONDS)
.take(10)
.map { data -> data.toString() + "A" }
val ob2 = Observable.interval(300L, TimeUnit.MILLISECONDS)
.take(20)
.map { data -> data * 3 }

Observable.zip(ob1, ob2, {data1, data2 -> Arrays.asList(data1, data2)})
.subscribe({
println("Zip data : " + it.toString())
}, {})
Thread.sleep(10000L)
}

今回は 500ミリ秒ごとに数値を生成して10件のみAという文字列を加えて文字に変換したObservableと、300ミリ秒ごとに数値を生成して20件のみ3倍にしたObservableをzipメソッドを読んで実行するサンプルプログラムです。

 

Mergeの時と比較するために最初に用意しているObservableを変えていません。

実行結果が以下になります。

f:id:rozkey59:20180211193438g:plain

Mergeと比較すると確かにデータがそろったタイミングで新しいデータを生成できていることが分かります。逆にデータがそろわない場合は生成していないことが分かります。

 

CombineLatest

CombineLatestメソッドでは、複数のFlowable / Observableがデータを通知するたびに新しくデータを生成しています。

サンプルプログラムを見ていきましょう。

fun main(args: Array<String>) {
val ob1 = Observable.interval(500L, TimeUnit.MILLISECONDS)
.take(10)
.map { data -> data.toString() + "A" }
val ob2 = Observable.interval(300L, TimeUnit.MILLISECONDS)
.take(20)
.map { data -> data * 3 }

Observable.combineLatest(ob1, ob2, {data1, data2 -> Arrays.asList(data1, data2)})
.subscribe({
println("CombineLatest data : " + it.toString())
}, {})
Thread.sleep(10000L)
}

zipの時と変わったのがcombineLatestの部分だけです。

Zipとどう違うのか比較するために実行結果を見てみましょう。

f:id:rozkey59:20180211210830g:plain

片方のObservableの通知があるたびに新しいデータが生成されていることが分かります。zipでは揃わないと新しいデータが生成されなかったのに対して、どちらか片方通知があることで新しくデータを生成していくことができます。

 

まとめ

今回は合成オペレータのMerge, Zip, CombineLatestについての紹介でした。

実際にAPIから取ってきたデータを扱う際に有効的に使えそうだなとサンプルを作成して動かしてみて思いました。

今回使用した例に関しては、どうしても合成するタイプのオペレータは例が作り辛かったので値や処理等は変えていますが下記資料を参考にしています。

下記の本はRxJavaを理解するのに非常にわかりやすいので、もっと詳しく知りたい方は是非購入して読んでみてください。

RxJavaリアクティブプログラミング 著:須田智之