やわらかテック

興味のあること。業務を通して得られた発見。個人的に試してみたことをアウトプットしています🍵

【goroutineによる並行処理】サンプルを作りながら学ぶgoroutine入門

golangを学ぶ上での壁goroutine

golangを業務で使い始めてから約半年になりました。業務ではgolangを使ってWebsocketを使ったチャットサーバーを作っています。golangの文法は非常にシンプルで分かりやすくシンプルなので、可読性が非常に良いです。普通にgolangを逐次処理として記述する中では大した問題は発生しないかと思いますが、golangの何が難しいのかというとgoroutineと呼ばれるgolangに実装されている並行処理をするための軽量なスレッドの扱いです。

go-tour-jp.appspot.com
(concurrencyって公式が言っているので並行処理なんでしょう)

goroutineの文法自体は非常にシンプルで実行したい関数の前にgoと記述するのみです。

package main
import (
  "fmt"
  "time"
)  
  
func main(){
    // Your code here!
    go print("hello")
    go print("world")
    
    // 先にメインスレッドが終了してしまうため出力されるまで待機
    time.Sleep(1 * time.Second)
    
}

func print(v interface{}) {
  fmt.Println(v)
}

実行結果

hello
world

play.golang.org

書いたきっかけ

goroutineの習得は元々、並行処理に関する知識があったものの手こずりました。 業務でチャットサーバーを書いており、その大半以上がgoroutineを使ってマルチスレッドでの並行処理になります。なので嫌でもgoroutineを書かなければなりませんでした。おかげさまでgoroutineを使ったgolangでの並行処理はある程度思うように記述出来るようになりました。

ふと「他の人はどうやって学習してんだろ...」と思い「goroutine 入門」と検索してみました。サンプルコードをいくつか見てみましたが、goroutineを使ってfmt.Println("hello")を出すようなものが多い印象を受けました。「うーん、これって面白いのかな」という思いとそのコードから実務に繋がるイメージが湧かなかったので「なんか面白いサンプル作りながら学べる記事があったら最高やな」と思い立って自分で書いてみようという気持ちになりました。

参考としている考え方

並行処理を記述する上でElixirというプログラミング言語で学んだアクターモデルメッセージパッシングといった分散メモリにおけるマルチプロセスの同期方法は非常に参考になりました。今回は詳しくは話しませんが、自分の記述するgoroutineのコードは非常にElixirの影響を受けていると思います。のちに登場します、channelというデータ型はElixirでのメッセージッパッシングのような操作を行うことが出来ます。

そうすることで排他制御や同期処理が非常に楽になり、golangに実装されているsyscパッケージを使うこともほとんどなくなるでしょう。実際に公式のgolangチュートリアル「A Tour of Go」でも言及されています。

sync パッケージは同期する際に役に立つ方法を提供していますが、別の方法があるためそれほど必要ありません。 (次のスライドで説明します)

今回作るもの

何かしらのイベントを駆動して動く処理にしたいなーと思い考えた結果、シンプルで分かりやすいタイマー処理に落ち着きました。今回作るものはusedTimerと勝手に名付けたものです。

usedTimerには以下のような仕様があります。

  • 特定の周期A毎にメインスレッドに対して「Golangはいいぞ」というメッセージを飛ばす
  • 特定の周期B毎に時計が壊れる(処理が停止)ため、再起動する必要がある
  • 電気代もかかるため、再起動の上限はN回まで

全体のコードを用意してありますので「どゆこと」という方はまず動かして試してみてください。golangがinstallされているかdockerがinstallされていればすぐに動かすことが出来ます。

github.com

仕様からコードへの置き換え

今は「なんとなーく」で問題ないので、こんな感じで作っていくというイメージを展開しておきます。

まず特定の周期A毎にメインスレッドに対してという仕様に対応するためには、メインスレッドと別にgoroutineを使ってスレッドを用意してあげる必要がありそうです。
さらに立ち上げた別のスレッドから「Golangはいいぞ」というメッセージを飛ばすとあるため周期A毎にメッセージを飛ばす必要があります。スレッド間でメッセージを飛ばすためにはchannelというデータ型を使います。立ち上げたスレッド -> メインスレッドにメッセージを飛ばす処理も記述する必要があるでしょう。

この立ち上げたスレッドは周期B毎に停止をするという処理は記述自体は出来そうですが、故障したことをメインスレッドに伝える必要があります。それは再起動の処理をしてもらうためです。先ほどと同じようにchannelを使ったメッセージを飛ばしてあげれば上手く出来そうな気がします。

補足: スレッド間でメッセージを飛ばすってどういうこと?何のため?

一言で言ってしまえば、スレッド同士でイベントを管理するため、主に同期をとる必要があるためです。AのスレッドからはBのスレッドで何が起きているのか分かりません。共通のグローバルな変数に何が起きているのかを都度、更新しても良いのですが排他制御(順番に書き込める、誰かが書き込んでいる最中は書き込めないようにする...etc)を作る必要があり、非常に面倒です。

そのため、スレッド同士でメッセージを通知することで「あ、Aのスレッドのこの処理が終わったんだ」「あ、Aでこのイベントが始まった」ということを認知することが出来るのです。

作成開始

では早速、作っていきましょう。まずは以下のようにディレクトリとファイルを作成してください。$GOPATH配下に配置するようにお願いします。

main
└ main.go
timer
├ model.go
└ timer.go

メッセージの定義

最初に行うのはスレッド間で送信しあうメッセージの設計です。/timer/model.goに対してメッセージの定義を行います。今回は先ほど記述したようにメッセージを使って2つの通知をする必要があります。

  • Golangはいいぞ」というメッセージを飛ばす
  • 故障したことをメインスレッドに伝える必要

この2つに対応できて、後に上司から「やっぱこんときもメインスレッドにメッセージ飛ばしてくれる?」と追い討ちをかけられても問題ないようにメッセージの定義を行います。メッセージの定義には構造体を用います。

/timer/timer.go

package timer

// スレッド間で用いるchannel用のstruct定義
type Protocol struct {
    // 失敗時のmessageなのかを判定するために用意
    Type    string
    Message string
    Error   error
}

これは業務でチャットサーバーを作り続ける中で洗礼された最終形に近い形のものです。Typeというフィールドに何のメッセージなのかを与えます(eg: 通常のメッセージ, 故障時のメッセージ etc...)。このTypeのフィールドを参照してもらうことで何のメッセージが飛んできたのかを判別することが出来ます。Typestring型なのでメッセージの種類はいくらでも増やすことが出来ます。

Typeに対応するデータをMessageErrorに与えます。通常のメッセージであれば以下のようになります。

msg := &Protocol{
      Type: "MESSAGE",
      Message: "Golangはいいぞ",
    }

しかし、残念なことにせっかく定義した構造体もこのままではスレッド間で飛ばすメッセージとしては使用することが出来ません。先ほど紹介したようにこの構造体をchannel型というデータにしてあげる必要があります。usedTimerを使ってくれる人に「うわーchannel型ってどうやって作るねん」と意識してもらいたくないので、関数で作成出来るようにしておきましょう。

/timer/timer.go

// 構造体を元にチャネルを作成
func CreateProtocolChannel() chan *Protocol {
    return make(chan *Protocol)
}

make(chan *対象の構造体)とすることで構造体をベースとしてchannel型を作成することが出来ます。

timerの作成

スレッド間で飛ばすメッセージの定義が完了したので、ついにメインとなるusedTimerを作成していきます。といっても、やることは単純です。まずは特定周期A毎に「Golangはいいぞ」というメッセージを飛ばす処理までを作成しましょう。
goroutineで立ち上げることを想定しているので関数にします。また、このusedTimer内は3つの引数を受け取ることにします。

  • iterTimeInt -> メインスレッドにメッセージを送る周期Aの秒数
  • breakTimeInt -> usedTimerが故障を起こす周期Bの秒数
  • ptc(protocol) -> メッセージをメインスレッドに送信するためのchannel型

もちろんこれが正解ではないので、他の方法で周期を指定して頂いても構いません。例えば、共通のスコープを持つ変数を用意する方法がありますが、個人的にはあまり好きではないので避けるようにしています。

まずは関数UsedTimerの全体を見てみてください。順に詳細を追っていきます。

// 仕様について
// 1. 指定した周期毎に" "というメッセージをchannel経由で送信する
// 2. 指定した周期毎に時間切れとなり、errorを返して実行loopをbreakする
func UsedTimer(iterTimeInt, breakTimeInt int, ptc chan *Protocol) {
    // int型をtime.Duration型へ変換(second)
    iterTime := time.Duration(iterTimeInt) * time.Second
    breakTime := time.Duration(breakTimeInt) * time.Second

    // 周期毎にイベントを発生させるtickerを作成
    iterTicker := time.NewTicker(iterTime)
    breakTicker := time.NewTicker(breakTime)
    defer func() {
        iterTicker.Stop()
        breakTicker.Stop()
    }()

    for {
        select {
        // メッセージを送信する処理
        case <-iterTicker.C:
            ptc <- &Protocol{
                Type:    "MESSAGE",
                Message: "Golangはいいぞ",
            }
        case <-breakTicker.C:
            ptc <- &Protocol{
                Type:  "ERROR",
                Error: errors.New("[Error] Timer was broken"),
            }
            // breakだとスレッドがkillできないので注意
            return
        }
    }
}

まずは引数で受け取った周期A,Bの値をint型からtime.Duration型に変換します。変換した値を用いて、timeモジュールに用意されているTickerを作成します。Tickerは指定した周期毎に作成したスレッドに対してchannel型を通じてメッセージを飛ばしてくれます。
最後にdefer構文を使用して、スレッドが停止する際(goroutineで実行する関数がreturnする時)にTickerを停止させるようにしておきます。

// int型をtime.Duration型へ変換(second)
iterTime := time.Duration(iterTimeInt) * time.Second
breakTime := time.Duration(breakTimeInt) * time.Second

// 周期毎にイベントを発生させるtickerを作成
iterTicker := time.NewTicker(iterTime)
breakTicker := time.NewTicker(breakTime)
defer func() {
    iterTicker.Stop()
    breakTicker.Stop()
}()

準備は出来ました。次にTickerから周期A,B毎にメッセージを受け取るための処理を記述しましょう。メッセージを受信し続ける必要があるため、多言語で言う所のwhile loopfor構文を用いて作成します。条件を与えなければ意図的に無限ループを作り出すことが出来ます。仮にfor構文がないとどうなるでしょうか。一度、メッセージを受信した後に関数がnilを返すか分かりませんが実行を終了します。その結果、次にメッセージを受信することは出来ません。

for {
        select {
        // メッセージを送信する処理
        case <-iterTicker.C:
            ptc <- &Protocol{
                Type:    "MESSAGE",
                Message: "Golangはいいぞ",
            }
        case <-breakTicker.C:
            ptc <- &Protocol{
                Type:  "ERROR",
                Error: errors.New("[Error] Timer was broken"),
            }
            // breakだとスレッドがkillできないので注意
            return
        }
    }

次に見慣れない構文selectについてです。これはswitch構文のgoroutine版だと思えば良いです。case <- channel型とすることで特定のchannel型を持つメッセージを受信することが出来ます。今回は先ほど作成した2つのTickerを受信するようにしています。
iterTicker.Cでは周期A毎にメインスレッドにメッセージを送るため、breakTicker.Cは周期B毎に故障したことをメッセージとして送るために用意しています。もしやりたことが増えて、第3のTickerを追加した時にはselectにも同じように追加するだけで簡単に拡張することが出来ます。

select {
        // メッセージを送信する処理
        case <-iterTicker.C:
            ptc <- &Protocol{
                Type:    "MESSAGE",
                Message: "Golangはいいぞ",
            }
            :
        case <- newTiceker.C:
            :
            :
        }

次にメインスレッドにメッセージを送る部分についてです。この項目が終了した時点でgoroutineを快適に扱うために必要なchannel型, メッセージの受信, メッセージの受信に触れたことになります。少し練習をすれば、すぐにgoroutineを使った並行処理が書けるようになるでしょう。
メッセージの送信はselectの内部でメッセージを受信した時と書き方が似ています。対象のchannel型に対してch <- データ構造とすることでchannel型を通じてメッセージを送ることが出来ます。今回、メッセージとして定義しているのはProtocolという構造体なので、該当するデータを作成してメッセージを送信します。

// 周期A毎に送るメッセージ
ptc <- &Protocol{
    Type:    "MESSAGE",
    Message: "Golangはいいぞ",
}

// 周期B毎に送るエラーメッセージ
ptc <- &Protocol{
    Type:    "ERROR",
    Message: errors.New("[Error] Timer was broken"),
}

これでgoroutineで実行する関数側の処理は記述が完了しました。あとはgoroutineを管理するためのスケジューラーを記述してあげるだけです。

Schedulerの作成

まずは全体のコードをご覧ください。無名関数を使っている部分はありますが、すでに最もややこしいchannel型でのメッセージ受信部分については触れているので最低限、何をやっているのかは分かるのではないかと思います。

// timerの動作管理を行うスケジューラー。再起動まで行う
func Scheduler(iterTimeInt, breakTimeInt, revivalNum int) {
    // 通信用のチャネルを作成
    ch := CreateProtocolChannel()
    // 何度も再起動させたいので簡略化のため無名関数を採用
    receiver := func() error {
        // 作成したUsedTimerからのメッセージを受け取るloopを作成
        for {
            select {
            case msg := <-ch:
                switch msg.Type {
                case "MESSAGE":
                    log.Print(msg.Message)
                case "ERROR":
                    log.Print(msg.Error.Error())
                    return msg.Error
                }
            }
        }
    }

    // 監視のための処理を記述
    counter := 0
    for {
        // goroutineを用いてスレッドを作成
        go UsedTimer(iterTimeInt, breakTimeInt, ch)
        if err := receiver(); err != nil {
            log.Print("[Main] Restart timer thread. Please wait 3 seconds ...")
            time.Sleep(3 * time.Second)
            counter += 1
        }

        if counter >= revivalNum {
            log.Println("[Main] Reached revival limit. so stop worker ...")
            return
        }
    }
}

最初に立ち上げた別スレッドからメッセージを受け取るためのchannel型のデータを作成します。channel型の作成は先ほどすでに関数化したので、呼び出した値を変数に保持するのみです。

// 通信用のチャネルを作成
ch := CreateProtocolChannel()

今回は、スレッドを特定関数まで再起動するという仕様があるでの繰り返し使うことを想定してchannel型を通じてメッセージを受信する部分を無名関数で使いやすくしてみました。受信の処理は先ほどとほとんど同じですが、1点だけ異なるところがあります。channel型を通じて得たメッセージの値を参照したい時はselectのブロックの中でcase 変数名 := <-channel型:とします。
この場合はUsedTimerから送られてくるProtocol構造体の値がmsgに保持されます。送られきたメッセージに保持されているTypeのfiledを見ることで何のメッセージなのかを判定します。

for {
    select {
        case msg := <-ch:
            switch msg.Type {
        case "MESSAGE":
            log.Print(msg.Message)
        case "ERROR":
            log.Print(msg.Error.Error())
            return msg.Error
        }
    }
}

MESSAGEという周期A毎に送られてくるメッセージを受信した場合には送られてきたメッセージのMessagefieldの値を表示します。ERRORというusedTimerが故障した際に送信されるメッセージを受信した時には無名関数内からreturn errとしてエラーを返して受信ループを停止させます。

この作成したメッセージ受信の無名関数とgoroutineを使って別スレッドの立ち上げとメッセージの受信を行います。

// 監視のための処理を記述
counter := 0
for {
    // goroutineを用いてスレッドを作成
    go UsedTimer(iterTimeInt, breakTimeInt, ch)
    if err := receiver(); err != nil {
        log.Print("[Main] Restart timer thread. Please wait 3 seconds ...")
        time.Sleep(3 * time.Second)
        counter += 1
    }
    if counter >= revivalNum {
        log.Println("[Main] Reached revival limit. so stop worker ...")
        return
    }
}

再起動にはforの無限ループを使いますが、今回は再起動回数に上限があるので、再起動を行なった回数を記録するためのインクリメント変数counterを用意して、再起動するたびに+1します。この値が引数で受け取った上限値を超えた時に処理を終了させます。

再起動は別スレッドからエラーメッセージを受け取った際に行いますが、都度都度、即再起動していては思わぬ負荷をかけてしまう可能性が考えられるため、必ず3秒待機してから再起動するようにもしてみました(eg: usedTimerの壊れる周期が不定期で0.1sごとに壊れるごとが重なるような場合など)。

// 特に問題がなければメッセージの受信ループが発生
if err := receiver(); err != nil {
    log.Print("[Main] Restart timer thread. Please wait 3 seconds ...")
    time.Sleep(3 * time.Second)
    counter += 1
}

これでtimerパッケージの記述が終了しました。最後にmain.goから処理を呼び出します。

/main/main.go

package main

import (
    "log"
    "used_timer/timer"
)

func main() {
    log.Print("[Main] Start timer ...")
    // 2秒毎にlogを出し、起動から10秒経過した時にbreak
    timer.Scheduler(2, 10, 5)
}

動作確認

さっそく動作を確認してみましょう。golangがinstall済みで今回作成したプロジェクトが$GOPATH配下にあるとして以下のコマンドを実行します。

$ cd used_timer $ go run main/main.go

上手く作れていれば、2秒毎に「Golangはいいぞ」というlogが表示されて、10秒毎にusedTimerが故障して「[Error] Timer was broken」というlogが出力されます。一度、スレッドはkillされてしまいますが、3秒後に再び新たなスレッドが作成されて同じようにlogが表示されるようになるでしょう。
この一連の処理が5回まで行われれば成功です。

実行結果

2020/04/07 00:36:48 [Main] Start timer ...
2020/04/07 00:36:50 Golangはいいぞ
2020/04/07 00:36:52 Golangはいいぞ
2020/04/07 00:36:54 Golangはいいぞ
2020/04/07 00:36:56 Golangはいいぞ
2020/04/07 00:36:58 Golangはいいぞ
2020/04/07 00:36:58 [Error] Timer was broken
2020/04/07 00:36:58 [Main] Restart timer thread. Please wait 3 seconds ...
2020/04/07 00:37:03 Golangはいいぞ
2020/04/07 00:37:05 Golangはいいぞ
2020/04/07 00:37:07 Golangはいいぞ
2020/04/07 00:37:09 Golangはいいぞ
2020/04/07 00:37:11 Golangはいいぞ
2020/04/07 00:37:11 [Error] Timer was broken
2020/04/07 00:37:11 [Main] Restart timer thread. Please wait 3 seconds ...
2020/04/07 00:37:16 Golangはいいぞ
2020/04/07 00:37:18 Golangはいいぞ
2020/04/07 00:37:20 Golangはいいぞ
2020/04/07 00:37:22 Golangはいいぞ
2020/04/07 00:37:24 Golangはいいぞ
2020/04/07 00:37:24 [Error] Timer was broken
2020/04/07 00:37:24 [Main] Restart timer thread. Please wait 3 seconds ...
2020/04/07 00:37:29 Golangはいいぞ
2020/04/07 00:37:31 Golangはいいぞ
2020/04/07 00:37:33 Golangはいいぞ
2020/04/07 00:37:35 Golangはいいぞ
2020/04/07 00:37:37 Golangはいいぞ
2020/04/07 00:37:37 [Error] Timer was broken
2020/04/07 00:37:37 [Main] Restart timer thread. Please wait 3 seconds ...
2020/04/07 00:37:42 Golangはいいぞ
2020/04/07 00:37:44 Golangはいいぞ
2020/04/07 00:37:46 Golangはいいぞ
2020/04/07 00:37:48 Golangはいいぞ
2020/04/07 00:37:50 Golangはいいぞ
2020/04/07 00:37:50 [Error] Timer was broken
2020/04/07 00:37:50 [Main] Restart timer thread. Please wait 3 seconds ...
2020/04/07 00:37:53 [Main] Reached revival limit. so stop worker ...

上手くいっているようです!!

まとめ

  • goroutineを使用したスレッドの立ち上げ
  • channel型のデータを使ったスレッド同士のメッセージ送受信
  • channel型に構造体を使用したイベントを切り分け
  • スレッドの再起動と簡単なサンプル

と簡単なサンプルに実務で役に立つであろうgoroutineに関する知識を詰め込みました。このサンプルを通して少しでもgoroutineへの理解が深まったのであれば何よりです。では、良い並行処理ライフを。

おまけ1: 関数がnilを返しているのかどうか

返していないっぽいですね。 play.golang.org

おまけ2: 並行処理の学習方法について

筆者は1年前までは並行処理の「へ」の字も知りませんでした。並行処理の知識が深まったのはElixirというプログラミング言語のおかげです。今回のサンプルコードの設計のベースになっている考え方の多くはElixirから輸入したものです。

「え、どんな言語やねん」と興味を持っていただけたのなら以下の記事をぜひ読んでみてください。きっと楽しんで頂けると思います。

www.okb-shelf.work

参考文献