やわらかテック

仕事の話などを書きます。執筆にはAIを使っていません

Coroutines入門とThreadLocalとの併用に気を付けるべし

早いものでKotlinを仕事で書き始めて3ヶ月が経ちました。
ありがたい?ことにKotlinの非同期処理の扱いについて全く知らないな...と反省した機会に恵まれたので、色々と調べてみた結果、Kotlinで非同期処理を行う場合には大きく2つの選択肢があることが分かりました。

  • Coroutines(kotlinx.coroutines.Coroutines)を使う
    • 言語標準機能ではなく、ライブラリとして提供しているのが面白い🦆
  • Java標準ライブラリから提供されているjava.util.concurrentを使う

基本的にはCorountinesを使えば問題ないのですが、実はシンプルなCoroutinesを使えないかもしれないケースが存在します。 最終的になぜ2つの選択肢があるのか考えたいと思いますが、まずは自分のような「Coroutines is ナニソレ」状態から脱却すべく前提知識としてCoroutinesについて軽く触れていきたいと思います。
「あー、ここに書いてあることは知ってるぜ」と感じたら最後の「混ぜるな危険☠️」まで飛ばしてください。

Coroutines

CoroutinesというワードからGoのGoroutinesを連想して、独自のスケジューラーによってOSレベルのスレッドではなく軽量スレッドが動作しているのだろう...と勝手に想像していましたが、実際にはJavaのスレッド機構を利用して実装されているそうです。 ただCoroutines独自のコンテキストスイッチを行なっており、OS依存ではないためピュアなJavaのスレッド処理と比べて軽量かつ高速に動作します。

また、Java19系でバーチャルスレッドという軽量スレッドを動作させるための機構が導入されました。
今後はJavaでのスレッド処理の事情が変わってくるやもしれませんし、そうなればKotlin側にも何かしらの仕組みが導入される可能性があります。

簡単な使い方

Coroutinesの詳しい使い方は、すでに多くの方が解説していると思うので、自分のメモ程度に使い方を記載します。
新しくCoroutinesを起動する方法としてよくあるのは、以下2つの関数によるものです。

  • runBlocking: 完了まで処理をブロックする
  • launch: 完了まで処理をブロックしない
    • runBlocking内部で実行しないとエラーになる

※launchの戻り値(Job型)に対して.join()を実行すると完了までブロックする

import kotlinx.coroutines.*

import kotlinx.coroutines.*

fun main() {
    runBlocking {
        launch { println("A") }
        launch { println("B") }
        
        println("C")
    }

    runBlocking {
        println("D")
    }
}

// 実行結果:
// C
// A
// B
// D

コードには2つのrunBlockingが登場しますが、実際に処理がブロックされて順に結果が出力されています。
興味深いのは1つ目のrunBlocking内部の実行結果です。launchは処理をブロックしないため、実行する度にC->A->BだったりC->B->Aだったり...するのかと思いましたが、何度実行してもC->A->Bの順番になります。

解釈としてはrunBlocking内部に記述した処理は上から順に実行されていき、新しく起動したCoroutinesは都度、スケジューリングされて実行されるということでしょう。試しにprintln("A")の前にdelay(1000L)を書いてみると実行結果がC->B->Aとなりました。

CoroutineScopeとCoroutineContext

Coroutinesが実行される上で重要なりそうな2つの概念についても軽く触れたいと思います。
特にCoroutinesContextについて知っておくと、後に触れますがCorountinesを使えないかもしれないケースの1つである、ThreadLocalとの組み合わせが危険である理由が見えてくるでしょう。

CoroutineScope

runBlockingのブロック内部ではCoroutineScopeというスコープが適用されています。
同じようにrunBlockingも1つのCoroutineScopeを宣言しているのですが、明示的にcoroutineScopeと記述することで新たなスコープを宣言できます。スコープ内の処理が全て完了するまで次のスコープの処理は実行されないため、前の実行結果とは異なりprintln("A")が先に表示されていないことが分かります。

import kotlinx.coroutines.*

fun main() = runBlocking {
    coroutineScope {
        launch { println("B")}
    }
    println("A")
}

// B
// A

非同期処理を実装する際に、タスクのグルーピングをしたい場合にはCoroutineScopeは便利そうです。
関数として切り出す場合はsuspend funと宣言する必要があります。

import kotlinx.coroutines.*

fun main() {
    runBlocking {
        doSomethings()
        println("done.")
    }
}


suspend fun doSomethings() {
    coroutineScope {
        launch {
            println("do A")
        }
        println("do B")
    }
}

// do B
// do A
// done.

CoroutineContext

CoroutineContextとはCoroutinesが保持するコンテキスト情報です。
Dispatcher(どのスレッド・スレッドプールで実行されるか)やJobNameなどの情報を持っており、Dispatcherを起動時に指定すれば、どのスレッド・スレッドプールで処理を実行するかをある程度、制御することができます。

うまく使えば特定の処理のリソースを制限したり、コンテキストを分離して安全に情報を扱うことができそうです。 Dispatcherを指定しない場合は起動元のDispatcher情報が継承されますが、実行結果を見てみると、それぞれが別のスレッドで実行されていることが分かります。

import kotlinx.coroutines.*

fun main() {
    runBlocking {
        println("A: ${Thread.currentThread().name}")

        launch { 
            println("B: ${Thread.currentThread().name}")
            
            launch {
                println("C: ${Thread.currentThread().name}")       
            }
        }
    }

    println("D: ${Thread.currentThread().name}")
}

// A: main @coroutine#1
// B: main @coroutine#2
// C: main @coroutine#3
// D: main

次に任意のスレッドを事前にプールするケースを見てみます。
実行結果から、適当にスレッドが切り替わりdelayの前後で処理が実行されたスレッドが異なる場合があることが分かりました。 もしかしたら実行環境や状態によっては実行されるスレッドが同一になる可能性があります。

import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() {
    val threadPool = Executors.newFixedThreadPool(2)
    runBlocking(threadPool.asCoroutineDispatcher()) {
        repeat(4) {
            launch {
                println("before[$it]: ${Thread.currentThread().name}")
                delay(1000L)
                println("after[$it]: ${Thread.currentThread().name}")
            }
        }
    }
}

// 実行結果(見やすさのために順序を入れ替えてあります)
// before[0]: pool-1-thread-2
// before[1]: pool-1-thread-1
// before[2]: pool-1-thread-2
// before[3]: pool-1-thread-1
// after[0]: pool-1-thread-2
// after[1]: pool-1-thread-1
// after[2]: pool-1-thread-1
// after[3]: pool-1-thread-1

混ぜるな危険☠️

基本的には手軽で便利なCoroutinesを使いたです。
しかし、残念ながらCoroutinesの使用を控えたいケースに遭遇しました。
例えば、使用しているライブラリがjava.util.concurrentが提供しているThreadLocalをベースに実装されているとCoroutinesでは、思ったように値の管理ができない可能性があります。

これはThreadLocalが特定のスレッドのみに対して値を保持するため、スレッドが切り替わると値が消失してしまうためです。 先ほどの実行結果から分かるようにCoroutinesは別スレッドで実行されることがある・することを指定できるので、思わぬデータの不整合を生み出してしまうかもしれません。 つまりThreadLocalをグローバルもしくは高域なスコープを持つ、ストレージのような使い方を想定している場合に注意が必要です。

以下のコードではThreadLocal<String>型の値に対してsetを使い値を更新したものの、他スレッドから値を参照した場合に、取得されるデータが異なることを検証したコードです。

import kotlinx.coroutines.*

fun main() {
    val threadLocal = ThreadLocal<String>()
    println("[Main] ${Thread.currentThread().name}") // [Main] main
    
    // contextにDispatchers.IOを指定
    // main関数が実行されるスレッドとは異なるスレッドで実行される
    runBlocking(Dispatchers.IO) {
        threadLocal.set("🍎")
        println("[runBlocking] ${Thread.currentThread().name}") // [runBlocking] DefaultDispatcher-worker-1 @coroutine#1
        println("[runBlocking] ${threadLocal.get()}") // [runBlocking] 🍎

        // launch(Dispatchers.Default)はまた別のスレッドで実行される
        val job = launch(Dispatchers.IO) {
            println("[runBlocking] ${Thread.currentThread().name}") // [runBlocking] DefaultDispatcher-worker-3 @coroutine#2
            println("[Job] Before: ${threadLocal.get()}") // [Job] Before: null
            threadLocal.set("🍊")
            println("[Job] After: ${threadLocal.get()}") // [Job] After: 🍊
        }

        job.join()
        println("[runBlocking] ${threadLocal.get()}") // [runBlocking] 🍎
    }

    println("[Main] ${threadLocal.get()}") // [Main] null
}

つまりCoroutinesとJava標準ライブラリで実装された非同期処理は混ぜるな危険であり、注意深く実装しないと思わぬデータ不整合を引き起こす可能性があります。「なんでKotlinでJava標準ライブラリで非同期処理書いたの?」と思ったのですが、これはリリース時期による影響が考えられます。

Coroutinesは2018年10月30日のKotlin1.3にてリリースされました。
そのため、それ以前に実装されたプログラムではJava標準ライブラリでの実装をする必要があり、現在もCoroutinesへの書き換えができていない・何かしらの理由によりできない状況だと考察できます。

Kotlin 1.3リリース - コルーチン、Kotlin/Nativeベータ | Post Blog

対策

ヘンリーでこの問題に立ち向かったid:agtnさんがネ申記事を公開してくれているので、解説を委ねます。
この記事を書こうと思ったのは、社内でThreadLocalの話を耳にした際に「Kotlinの非同期処理、全然分かってない...」と感じたからです。面白いテーマに触れる機会を頂き、ありがとうございました。

dev.henry.jp

参考文献

【書評】脳に収まるコードの書き方とは結局、何なのか

オライリーから発売された「脳に収まるコードの書き方」という書籍を読了しました。
発売されるまで全く情報をキャッチできていなかったのですが、自分の近辺でこの書籍を購入している方がちらほらといて、特に内容・目次を精査することもなくノリで買ってしまいました。

僕はオライリーから発行される書籍の表紙に描かれる生き物が何なのか毎回、ワクワクしているのですが、この書籍では葬送のフリーレンに登場する防御魔法のようなヘックスが描かれています。まさかこれが書籍の内容を象徴するものだったとは...(後に分かります)

ネタバレ: 脳に収まるコードって何

コンピューターと人間の脳の違い

まずは、本書のタイトルの一部にある「脳に収まるコード」がどんなコードなのかを追っていきたいと思います。
その前置きとして、コンピューターと人間の脳の違いについて本書では紹介があります。コンピューターは膨大な情報を正確に記憶装置(RAM, HDDなど)へ記録して読み取ることができますが、複雑な思考・判断をすることができません。 一方で我々、人間の脳は複雑な思考・判断をしたり、意思決定をすることができますが、膨大な情報を正確に記憶することを苦手としています。

さらに人間の記憶には短期記憶長期記憶というものがあるそうです。
目の前に現れた情報は短期記憶として扱われた後、何かしらのきっかけによって脳が重要だと判断した情報は、短期記憶から長期記憶になることで長い間、脳に記憶され続けることになります。 コンピューターでは高速な読み書きが可能なメモリ、大容量だが読み書きが低速なストレージに分かれているのが一般的なアーキテクチャですが、これが脳を模倣したものなのかは分かりません。詳しい方がいたら教えてください。

コードを読んでいる時、情報はどう扱われているのか

前置きが長くなりましたが、人間がコードを読む時、情報はどのように扱われているのでしょうか。
コードに登場する情報には変数名、関数の引数・戻り値、型...などがありますが、それらはまずは短期記憶として記憶されます。その後、全体的なアーキテクチャ、頻出するパターンやなどは、先ほど記述したように段々と長期記憶になることがありますが、ほとんどの情報は短期記憶として役目を終えていくとのことです。

となると、変数名などの情報が多く短期記憶に記録されていればコードをスムーズに読むことができそうですが、残念ながら人間の短期記憶の容量は非常に小さく、おおよそ7つのことまでしか覚えられないと言われています。心理学の分野ではマジカルナンバー7という言葉があります。

マジカルナンバーとは、人間が瞬間的に保持できる情報の数は「7±2」であるとするもの。
アメリカのハーバード大学の心理学者、ジョージ・ミラー教授(George Armitage Miller)による1956年の論文「The Magical number seven, plus or minus two」で登場

マジカルナンバー7±2(ミラーの法則)とは 意味/解説 - シマウマ用語集

ここでタイトルを回収したいと思います。
つまり「脳に収まるコード」とは7つまでの情報に集約されたコードのことを指していました。
どこまでを情報としてカウントアップするのかについて正確な定義はありませんでしたが、自分は関心事の数という理解をしています。 例えば、以下のサンプルコードは情報が5つなので、脳に収まるコードだと判断することができます。

// 動作しない適当なコードです
fun main(args: Array<String>) {
    val cofigures = Configure.getConfig() // 1.設定の取得
    val middlewares = Middleware.getMiddlewares() // 2.ミドルウェア一覧の取得
    val routing = Router.getRouter() // 3.ルーターの取得
    
    // 4.サーバー情報の設定
    val server = Server(
      configure = configure,
      middlewares = middlewares,
      routing = routing,
      protocol = 'https',
    )

    // 5.サーバーの起動
    server.run(port = 8080)
}

書籍の表紙にあったヘックスの数が7個だったと気づいた方はいるでしょうか。
これは本書ではヘックスフラワーと呼ばれており、コードが持つ情報を以下のように書き込むことで脳に収まるコードかを判断するために使用されていました。

多くの章ではどのようにしてコードの情報を減らす・集約・分割する...というテクニックが紹介されています。
主にテスト駆動開発やAAA(Arrange・Act・Assert)、カプセル化、KISSの原則など...。よく周知されたテクニックだったので、自分目線では目新しいものはありませんでしたが、これらのテクニックを上手く使って脳に過負荷を与えないようにコードが変化していく過程が面白く学びがありました。

なぜ脳にコードを収めたいのか

ケント・ベック著の「実践パターン」という書籍でも触れられているように「コードは書いている時間よりも読んでいる時間の方が圧倒的に多い」です。 本書は実践パターンに影響を受けている印象があり「コードは読んでいる時間の方が多い」という視点で、コードの可読性が組織の開発パフォーマンスに大きく影響を与えることを指摘しています。

さらにコードは負債であり、開発年月が長くなればなるほど持続可能性が低下していきます。
これは自分の経験上からも間違いなくて、いつの間にか知らない機能のモジュールが追加されていたり、関数の引数が爆発的に増えていたりと日々、コードは複雑になり、変更することが難しくなっていきます。 そのために「脳に収まるコードを保ち続けなければならない」という主張があり、サイクロマティック複雑度やリファクタリング、CI/CDによる継続的デリバリーなどの文脈の話が登場してきます。

PS. 残念ながら「実践パターン」はすでに絶版となっているようで、Amazonでは15,000円近い値段となっており入手するのが難しい状況です。たまたま市立の図書館においてあったので、僕は読むことができました。図書館マジでオススメです。

勝手に対象読者

先ほども書いたように「真新しいトピックは正直ない」というのが自分の感想です。
偉大な先人達によって生み出されてきたテクニック、ツールやプロセスに対してリスペクトを送りつつ「脳に収まるコードかどうか」という筆者オリジナルを視点を加えて情報を集約したという印象でした。しかし、先ほども書いたように実際にテクニックを駆使して、脳に過負荷を与えないようにコードを変化させていく過程が秀逸であり、 凄腕プログラマーの実装の過程を覗き見ることができる一冊という感じです。

何か新しい開発方法やコーディングのテクニックを求めている方にはもっと良い書籍があるでしょう。
開発の経験を積んできた方が今までのやり方はどうだったんだろうかと振り返り、再発見するために読んでみるというのが良さそうです。

うーん、それにしてもフリーレンの防御魔法にしか見えない...

https://times-abema.ismcdn.jp/mwimgs/4/c/1200w/img_4c5a1132ff29b38fcd7d3e646a8c2938363871.jpg

少しでも「ええな〜」と思ったらはてなスター・はてなブックマーク・シェアを頂けると励みになります。

明日から使えるgRPC入門

業務でgRPCを使う機会があるのですが、個人で軽く触ったことがある程度で正しく理解できているか不安だったので、改めてインプットし直しました。この記事はさくらインターネットさんが公開されている記事から特に重要だと感じた箇所を抽出して、自身の備忘録として短くまとめたものです。

knowledge.sakura.ad.jp

実は前回も読んでいるはずなのですが、頭から抜けている箇所が多くありました...。

そもそもRPCって何

RPCとは通信プロトコルの1つで「遠隔手続き呼び出し」と訳せるようにクライント・サーバーモデルであり、どこかにあるサーバーに定義された関数をクライアントが呼び出すことで実現されます。 ここでいう通信プロトコルとは「HTTPHTTPSのような通信に使用されるプロトコルよりも上位の概念」だと認識しておくと理解がしやすいと思います。私はプロトコル・通信プロトコルという言葉が同じように使用されていて、とても混乱しました。

意外にもRPCの歴史は古く1976年にはRFCが発表されていたそうです。

遠隔手続き呼出し (RPC) の考え方は、少なくともRFC 707が発表された1976年まで遡る。
最初にRPCを商用に実用化したのはゼロックスの「Courier」であり、1981年のことであった。

遠隔手続き呼出し - Wikipedia

続きを読む

【書評】データ指向アプリケーションデザインを読了して見える世界

1ヶ月ほど読み進めていた「データ指向アプリケーションデザイン」を読了しました。
オライリーから出版されている本の中でも、かなり分厚い部類の書籍だったのと、章ごとの情報量が凄かったので結構、時間がかかってしまいました。

個人的には難易度の高い書籍だと思うのですが、それでも多くの方からオススメされてきた書籍でもあります。
今まで読んでは飽き...読んでは飽き...を繰り返していましたが、ようやく一貫して読了したので簡単に書評を書いてみたいと思います。

デカい...! 分厚い...!

続きを読む

SELECTの結果から複数のデータを複数INSERTする

SQLで初期データを作成したいというのは、よくあるケースかなと思います。
例えば全ての企業(companies)に対して初期ユーザー(users)を1件登録する必要があるとします。企業とユーザーは1対多の関係にあり、以下のようにINSERTSELECTを組み合わせることで簡単に全ての企業に対して初期ユーザーの追加が完了します。

企業一覧

postgres=# SELECT * FROM companies;
 id | name
----+------
  1 | A社
  2 | B社
  3 | C社
(3 rows)

各企業に初期ユーザーを追加

INSERT INTO users (name, company_id)
SELECT
  '初期ユーザー',
  companies.id
FROM
  companies
;

postgres=# SELECT * FROM users;
 id |     name     | company_id
----+--------------+------------
  1 | 初期ユーザー |          1
  2 | 初期ユーザー |          2
  3 | 初期ユーザー |          3
(3 rows)

ではSELECTの実行結果から複数のデータを複数INSERTするにはどうすれば良いでしょうか。
先ほどの例でいうと、初期ユーザとして「岡部・椎名・橋田」を登録するにはどのようなクエリを記述すれば良いでしょうか。自分が「SELECT INSERT 複数行」といったワードで調べた限り、この問題を解決する方法を紹介している方を発見することはできませんでした。

続きを読む