早いものでKotlinを仕事で書き始めて3ヶ月が経ちました。
ありがたい?ことにKotlinの非同期処理の扱いについて全く知らないな...と反省した機会に恵まれたので、色々と調べてみた結果、Kotlinで非同期処理を行う場合には大きく2つの選択肢があることが分かりました。
Coroutines
(kotlinx.coroutines.Coroutines
)を使う- 言語標準機能ではなく、ライブラリとして提供しているのが面白い🦆
- Java標準ライブラリから提供されている
java.util.concurrent
を使う
基本的にはCorountines
を使えば問題ないのですが、実はシンプルなCoroutines
を使えないかもしれないケースが存在します。
最終的になぜ2つの選択肢があるのか考えたいと思いますが、まずは自分のような「Coroutines
is ナニソレ」状態から脱却すべく前提知識としてCoroutines
について軽く触れていきたいと思います。
「あー、ここに書いてあることは知ってるぜ」と感じたら最後の「混ぜるな危険☠️」まで飛ばしてください。
Coroutines
Coroutines
というワードからGoのGoroutines
を連想して、独自のスケジューラーによってOSレベルのスレッドではなく軽量スレッドが動作しているのだろう...と勝手に想像していましたが、実際にはJavaのスレッド機構を利用して実装されているそうです。
ただCoroutines
独自のコンテキストスイッチを行なっており、OS依存ではないためピュアなJavaのスレッド処理と比べて軽量かつ高速に動作します。
また、Java19系でバーチャルスレッドという軽量スレッドを動作させるための機構が導入されました。
今後はJavaでのスレッド処理の事情が変わってくるやもしれませんし、そうなればKotlin側にも何かしらの仕組みが導入される可能性があります。
簡単な使い方
Coroutines
の詳しい使い方は、すでに多くの方が解説していると思うので、自分のメモ程度に使い方を記載します。
新しくCoroutines
を起動する方法としてよくあるのは、以下2つの関数によるものです。
- runBlocking: 完了まで処理をブロックする
- launch: 完了まで処理をブロックしない
runBlocking
内部で実行しないとエラーになる
※launchの戻り値(Job型)に対して.join()
を実行すると完了までブロックする
import kotlinx.coroutines.* import kotlinx.coroutines.* fun main() { runBlocking { launch { println("A") } launch { println("B") } println("C") } runBlocking { println("D") } } // 実行結果: // C // A // B // D
コードには2つのrunBlocking
が登場しますが、実際に処理がブロックされて順に結果が出力されています。
興味深いのは1つ目のrunBlocking
内部の実行結果です。launch
は処理をブロックしないため、実行する度にC->A->B
だったりC->B->A
だったり...するのかと思いましたが、何度実行してもC->A->B
の順番になります。
解釈としてはrunBlocking
内部に記述した処理は上から順に実行されていき、新しく起動したCoroutines
は都度、スケジューリングされて実行されるということでしょう。試しにprintln("A")
の前にdelay(1000L)
を書いてみると実行結果がC->B->A
となりました。
CoroutineScopeとCoroutineContext
Coroutines
が実行される上で重要なりそうな2つの概念についても軽く触れたいと思います。
特にCoroutinesContextについて知っておくと、後に触れますがCorountines
を使えないかもしれないケースの1つである、ThreadLocalとの組み合わせが危険である理由が見えてくるでしょう。
CoroutineScope
runBlocking
のブロック内部ではCoroutineScopeというスコープが適用されています。
同じようにrunBlocking
も1つのCoroutineScopeを宣言しているのですが、明示的にcoroutineScope
と記述することで新たなスコープを宣言できます。スコープ内の処理が全て完了するまで次のスコープの処理は実行されないため、前の実行結果とは異なりprintln("A")
が先に表示されていないことが分かります。
import kotlinx.coroutines.* fun main() = runBlocking { coroutineScope { launch { println("B")} } println("A") } // B // A
非同期処理を実装する際に、タスクのグルーピングをしたい場合にはCoroutineScopeは便利そうです。
関数として切り出す場合はsuspend fun
と宣言する必要があります。
import kotlinx.coroutines.* fun main() { runBlocking { doSomethings() println("done.") } } suspend fun doSomethings() { coroutineScope { launch { println("do A") } println("do B") } } // do B // do A // done.
CoroutineContext
CoroutineContextとはCoroutines
が保持するコンテキスト情報です。
Dispatcher(どのスレッド・スレッドプールで実行されるか)やJobNameなどの情報を持っており、Dispatcherを起動時に指定すれば、どのスレッド・スレッドプールで処理を実行するかをある程度、制御することができます。
うまく使えば特定の処理のリソースを制限したり、コンテキストを分離して安全に情報を扱うことができそうです。 Dispatcherを指定しない場合は起動元のDispatcher情報が継承されますが、実行結果を見てみると、それぞれが別のスレッドで実行されていることが分かります。
import kotlinx.coroutines.* fun main() { runBlocking { println("A: ${Thread.currentThread().name}") launch { println("B: ${Thread.currentThread().name}") launch { println("C: ${Thread.currentThread().name}") } } } println("D: ${Thread.currentThread().name}") } // A: main @coroutine#1 // B: main @coroutine#2 // C: main @coroutine#3 // D: main
次に任意のスレッドを事前にプールするケースを見てみます。
実行結果から、適当にスレッドが切り替わりdelay
の前後で処理が実行されたスレッドが異なる場合があることが分かりました。
もしかしたら実行環境や状態によっては実行されるスレッドが同一になる可能性があります。
import kotlinx.coroutines.* import java.util.concurrent.Executors fun main() { val threadPool = Executors.newFixedThreadPool(2) runBlocking(threadPool.asCoroutineDispatcher()) { repeat(4) { launch { println("before[$it]: ${Thread.currentThread().name}") delay(1000L) println("after[$it]: ${Thread.currentThread().name}") } } } } // 実行結果(見やすさのために順序を入れ替えてあります) // before[0]: pool-1-thread-2 // before[1]: pool-1-thread-1 // before[2]: pool-1-thread-2 // before[3]: pool-1-thread-1 // after[0]: pool-1-thread-2 // after[1]: pool-1-thread-1 // after[2]: pool-1-thread-1 // after[3]: pool-1-thread-1
混ぜるな危険☠️
基本的には手軽で便利なCoroutines
を使いたです。
しかし、残念ながらCoroutines
の使用を控えたいケースに遭遇しました。
例えば、使用しているライブラリがjava.util.concurrent
が提供しているThreadLocal
をベースに実装されているとCoroutines
では、思ったように値の管理ができない可能性があります。
これはThreadLocal
が特定のスレッドのみに対して値を保持するため、スレッドが切り替わると値が消失してしまうためです。
先ほどの実行結果から分かるようにCoroutines
は別スレッドで実行されることがある・することを指定できるので、思わぬデータの不整合を生み出してしまうかもしれません。
つまりThreadLocal
をグローバルもしくは高域なスコープを持つ、ストレージのような使い方を想定している場合に注意が必要です。
以下のコードではThreadLocal<String>
型の値に対してset
を使い値を更新したものの、他スレッドから値を参照した場合に、取得されるデータが異なることを検証したコードです。
import kotlinx.coroutines.* fun main() { val threadLocal = ThreadLocal<String>() println("[Main] ${Thread.currentThread().name}") // [Main] main // contextにDispatchers.IOを指定 // main関数が実行されるスレッドとは異なるスレッドで実行される runBlocking(Dispatchers.IO) { threadLocal.set("🍎") println("[runBlocking] ${Thread.currentThread().name}") // [runBlocking] DefaultDispatcher-worker-1 @coroutine#1 println("[runBlocking] ${threadLocal.get()}") // [runBlocking] 🍎 // launch(Dispatchers.Default)はまた別のスレッドで実行される val job = launch(Dispatchers.IO) { println("[runBlocking] ${Thread.currentThread().name}") // [runBlocking] DefaultDispatcher-worker-3 @coroutine#2 println("[Job] Before: ${threadLocal.get()}") // [Job] Before: null threadLocal.set("🍊") println("[Job] After: ${threadLocal.get()}") // [Job] After: 🍊 } job.join() println("[runBlocking] ${threadLocal.get()}") // [runBlocking] 🍎 } println("[Main] ${threadLocal.get()}") // [Main] null }
つまりCoroutines
とJava標準ライブラリで実装された非同期処理は混ぜるな危険であり、注意深く実装しないと思わぬデータ不整合を引き起こす可能性があります。「なんでKotlinでJava標準ライブラリで非同期処理書いたの?」と思ったのですが、これはリリース時期による影響が考えられます。
Coroutines
は2018年10月30日のKotlin1.3にてリリースされました。
そのため、それ以前に実装されたプログラムではJava標準ライブラリでの実装をする必要があり、現在もCoroutines
への書き換えができていない・何かしらの理由によりできない状況だと考察できます。
Kotlin 1.3リリース - コルーチン、Kotlin/Nativeベータ | Post Blog
対策
ヘンリーでこの問題に立ち向かったid:agtnさんがネ申記事を公開してくれているので、解説を委ねます。
この記事を書こうと思ったのは、社内でThreadLocal
の話を耳にした際に「Kotlinの非同期処理、全然分かってない...」と感じたからです。面白いテーマに触れる機会を頂き、ありがとうございました。