やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

Coroutines入門とThreadLocalとの併用に気を付けるべし

早いものでKotlinを仕事で書き始めて3ヶ月が経ちました。
ありがたい?ことにKotlinの非同期処理の扱いについて全く知らないな...と反省した機会に恵まれたので、色々と調べてみた結果、Kotlinで非同期処理を行う場合には大きく2つの選択肢があることが分かりました。

  • Coroutines(kotlinx.coroutines.Coroutines)を使う
    • 言語標準機能ではなく、ライブラリとして提供しているのが面白い🦆
  • Java標準ライブラリから提供されているjava.util.concurrentを使う

基本的にはCorountinesを使えば問題ないのですが、実はシンプルなCoroutinesを使えないかもしれないケースが存在します。 最終的になぜ2つの選択肢があるのか考えたいと思いますが、まずは自分のような「Coroutines is ナニソレ」状態から脱却すべく前提知識としてCoroutinesについて軽く触れていきたいと思います。
「あー、ここに書いてあることは知ってるぜ」と感じたら最後の「混ぜるな危険☠️」まで飛ばしてください。

Coroutines

CoroutinesというワードからGoのGoroutinesを連想して、独自のスケジューラーによってOSレベルのスレッドではなく軽量スレッドが動作しているのだろう...と勝手に想像していましたが、実際にはJavaのスレッド機構を利用して実装されているそうです。 ただCoroutines独自のコンテキストスイッチを行なっており、OS依存ではないためピュアなJavaのスレッド処理と比べて軽量かつ高速に動作します。

また、Java19系でバーチャルスレッドという軽量スレッドを動作させるための機構が導入されました。
今後はJavaでのスレッド処理の事情が変わってくるやもしれませんし、そうなればKotlin側にも何かしらの仕組みが導入される可能性があります。

簡単な使い方

Coroutinesの詳しい使い方は、すでに多くの方が解説していると思うので、自分のメモ程度に使い方を記載します。
新しくCoroutinesを起動する方法としてよくあるのは、以下2つの関数によるものです。

  • runBlocking: 完了まで処理をブロックする
  • launch: 完了まで処理をブロックしない
    • runBlocking内部で実行しないとエラーになる

※launchの戻り値(Job型)に対して.join()を実行すると完了までブロックする

import kotlinx.coroutines.*

import kotlinx.coroutines.*

fun main() {
    runBlocking {
        launch { println("A") }
        launch { println("B") }
        
        println("C")
    }

    runBlocking {
        println("D")
    }
}

// 実行結果:
// C
// A
// B
// D

コードには2つのrunBlockingが登場しますが、実際に処理がブロックされて順に結果が出力されています。
興味深いのは1つ目のrunBlocking内部の実行結果です。launchは処理をブロックしないため、実行する度にC->A->BだったりC->B->Aだったり...するのかと思いましたが、何度実行してもC->A->Bの順番になります。

解釈としてはrunBlocking内部に記述した処理は上から順に実行されていき、新しく起動したCoroutinesは都度、スケジューリングされて実行されるということでしょう。試しにprintln("A")の前にdelay(1000L)を書いてみると実行結果がC->B->Aとなりました。

CoroutineScopeとCoroutineContext

Coroutinesが実行される上で重要なりそうな2つの概念についても軽く触れたいと思います。
特にCoroutinesContextについて知っておくと、後に触れますがCorountinesを使えないかもしれないケースの1つである、ThreadLocalとの組み合わせが危険である理由が見えてくるでしょう。

CoroutineScope

runBlockingのブロック内部ではCoroutineScopeというスコープが適用されています。
同じようにrunBlockingも1つのCoroutineScopeを宣言しているのですが、明示的にcoroutineScopeと記述することで新たなスコープを宣言できます。スコープ内の処理が全て完了するまで次のスコープの処理は実行されないため、前の実行結果とは異なりprintln("A")が先に表示されていないことが分かります。

import kotlinx.coroutines.*

fun main() = runBlocking {
    coroutineScope {
        launch { println("B")}
    }
    println("A")
}

// B
// A

非同期処理を実装する際に、タスクのグルーピングをしたい場合にはCoroutineScopeは便利そうです。
関数として切り出す場合はsuspend funと宣言する必要があります。

import kotlinx.coroutines.*

fun main() {
    runBlocking {
        doSomethings()
        println("done.")
    }
}


suspend fun doSomethings() {
    coroutineScope {
        launch {
            println("do A")
        }
        println("do B")
    }
}

// do B
// do A
// done.

CoroutineContext

CoroutineContextとはCoroutinesが保持するコンテキスト情報です。
Dispatcher(どのスレッド・スレッドプールで実行されるか)やJobNameなどの情報を持っており、Dispatcherを起動時に指定すれば、どのスレッド・スレッドプールで処理を実行するかをある程度、制御することができます。

うまく使えば特定の処理のリソースを制限したり、コンテキストを分離して安全に情報を扱うことができそうです。 Dispatcherを指定しない場合は起動元のDispatcher情報が継承されますが、実行結果を見てみると、それぞれが別のスレッドで実行されていることが分かります。

import kotlinx.coroutines.*

fun main() {
    runBlocking {
        println("A: ${Thread.currentThread().name}")

        launch { 
            println("B: ${Thread.currentThread().name}")
            
            launch {
                println("C: ${Thread.currentThread().name}")       
            }
        }
    }

    println("D: ${Thread.currentThread().name}")
}

// A: main @coroutine#1
// B: main @coroutine#2
// C: main @coroutine#3
// D: main

次に任意のスレッドを事前にプールするケースを見てみます。
実行結果から、適当にスレッドが切り替わりdelayの前後で処理が実行されたスレッドが異なる場合があることが分かりました。 もしかしたら実行環境や状態によっては実行されるスレッドが同一になる可能性があります。

import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() {
    val threadPool = Executors.newFixedThreadPool(2)
    runBlocking(threadPool.asCoroutineDispatcher()) {
        repeat(4) {
            launch {
                println("before[$it]: ${Thread.currentThread().name}")
                delay(1000L)
                println("after[$it]: ${Thread.currentThread().name}")
            }
        }
    }
}

// 実行結果(見やすさのために順序を入れ替えてあります)
// before[0]: pool-1-thread-2
// before[1]: pool-1-thread-1
// before[2]: pool-1-thread-2
// before[3]: pool-1-thread-1
// after[0]: pool-1-thread-2
// after[1]: pool-1-thread-1
// after[2]: pool-1-thread-1
// after[3]: pool-1-thread-1

混ぜるな危険☠️

基本的には手軽で便利なCoroutinesを使いたです。
しかし、残念ながらCoroutinesの使用を控えたいケースに遭遇しました。
例えば、使用しているライブラリがjava.util.concurrentが提供しているThreadLocalをベースに実装されているとCoroutinesでは、思ったように値の管理ができない可能性があります。

これはThreadLocalが特定のスレッドのみに対して値を保持するため、スレッドが切り替わると値が消失してしまうためです。 先ほどの実行結果から分かるようにCoroutinesは別スレッドで実行されることがある・することを指定できるので、思わぬデータの不整合を生み出してしまうかもしれません。 つまりThreadLocalをグローバルもしくは高域なスコープを持つ、ストレージのような使い方を想定している場合に注意が必要です。

以下のコードではThreadLocal<String>型の値に対してsetを使い値を更新したものの、他スレッドから値を参照した場合に、取得されるデータが異なることを検証したコードです。

import kotlinx.coroutines.*

fun main() {
    val threadLocal = ThreadLocal<String>()
    println("[Main] ${Thread.currentThread().name}") // [Main] main
    
    // contextにDispatchers.IOを指定
    // main関数が実行されるスレッドとは異なるスレッドで実行される
    runBlocking(Dispatchers.IO) {
        threadLocal.set("🍎")
        println("[runBlocking] ${Thread.currentThread().name}") // [runBlocking] DefaultDispatcher-worker-1 @coroutine#1
        println("[runBlocking] ${threadLocal.get()}") // [runBlocking] 🍎

        // launch(Dispatchers.Default)はまた別のスレッドで実行される
        val job = launch(Dispatchers.IO) {
            println("[runBlocking] ${Thread.currentThread().name}") // [runBlocking] DefaultDispatcher-worker-3 @coroutine#2
            println("[Job] Before: ${threadLocal.get()}") // [Job] Before: null
            threadLocal.set("🍊")
            println("[Job] After: ${threadLocal.get()}") // [Job] After: 🍊
        }

        job.join()
        println("[runBlocking] ${threadLocal.get()}") // [runBlocking] 🍎
    }

    println("[Main] ${threadLocal.get()}") // [Main] null
}

つまりCoroutinesとJava標準ライブラリで実装された非同期処理は混ぜるな危険であり、注意深く実装しないと思わぬデータ不整合を引き起こす可能性があります。「なんでKotlinでJava標準ライブラリで非同期処理書いたの?」と思ったのですが、これはリリース時期による影響が考えられます。

Coroutinesは2018年10月30日のKotlin1.3にてリリースされました。
そのため、それ以前に実装されたプログラムではJava標準ライブラリでの実装をする必要があり、現在もCoroutinesへの書き換えができていない・何かしらの理由によりできない状況だと考察できます。

Kotlin 1.3リリース - コルーチン、Kotlin/Nativeベータ | Post Blog

対策

ヘンリーでこの問題に立ち向かったid:agtnさんがネ申記事を公開してくれているので、解説を委ねます。
この記事を書こうと思ったのは、社内でThreadLocalの話を耳にした際に「Kotlinの非同期処理、全然分かってない...」と感じたからです。面白いテーマに触れる機会を頂き、ありがとうございました。

dev.henry.jp

参考文献