golangを学ぶ上での壁goroutine
golang
を業務で使い始めてから約半年になりました。業務ではgolang
を使ってWebsocket
を使ったチャットサーバーを作っています。golang
の文法は非常にシンプルで分かりやすくシンプルなので、可読性が非常に良いです。普通にgolang
を逐次処理として記述する中では大した問題は発生しないかと思いますが、golang
の何が難しいのかというとgoroutine
と呼ばれるgolang
に実装されている並行処理
をするための軽量なスレッドの扱いです。
go-tour-jp.appspot.com
(concurrencyって公式が言っているので並行処理なんでしょう)
goroutine
の文法自体は非常にシンプルで実行したい関数の前にgo
と記述するのみです。
package main import ( "fmt" "time" ) func main(){ // Your code here! go print("hello") go print("world") // 先にメインスレッドが終了してしまうため出力されるまで待機 time.Sleep(1 * time.Second) } func print(v interface{}) { fmt.Println(v) }
実行結果
hello world
書いたきっかけ
goroutine
の習得は元々、並行処理に関する知識があったものの手こずりました。 業務でチャットサーバーを書いており、その大半以上がgoroutine
を使ってマルチスレッドでの並行処理になります。なので嫌でもgoroutine
を書かなければなりませんでした。おかげさまでgoroutine
を使ったgolang
での並行処理
はある程度思うように記述出来るようになりました。
ふと「他の人はどうやって学習してんだろ...」と思い「goroutine 入門」と検索してみました。サンプルコードをいくつか見てみましたが、goroutine
を使ってfmt.Println("hello")
を出すようなものが多い印象を受けました。「うーん、これって面白いのかな」という思いとそのコードから実務に繋がるイメージが湧かなかったので「なんか面白いサンプル作りながら学べる記事があったら最高やな」と思い立って自分で書いてみようという気持ちになりました。
参考としている考え方
並行処理
を記述する上でElixir
というプログラミング言語で学んだアクターモデル
やメッセージパッシング
といった分散メモリにおけるマルチプロセスの同期方法は非常に参考になりました。今回は詳しくは話しませんが、自分の記述するgoroutine
のコードは非常にElixir
の影響を受けていると思います。のちに登場します、channel
というデータ型はElixir
でのメッセージッパッシング
のような操作を行うことが出来ます。
そうすることで排他制御や同期処理が非常に楽になり、golang
に実装されているsysc
パッケージを使うこともほとんどなくなるでしょう。実際に公式のgolang
チュートリアル「A Tour of Go」でも言及されています。
sync パッケージは同期する際に役に立つ方法を提供していますが、別の方法があるためそれほど必要ありません。 (次のスライドで説明します)
今回作るもの
何かしらのイベントを駆動して動く処理にしたいなーと思い考えた結果、シンプルで分かりやすいタイマー処理に落ち着きました。今回作るものはusedTimer
と勝手に名付けたものです。
usedTimer
には以下のような仕様があります。
- 特定の周期A毎にメインスレッドに対して「Golangはいいぞ」というメッセージを飛ばす
- 特定の周期B毎に時計が壊れる(処理が停止)ため、再起動する必要がある
- 電気代もかかるため、再起動の上限はN回まで
全体のコードを用意してありますので「どゆこと」という方はまず動かして試してみてください。golang
がinstallされているかdocker
がinstallされていればすぐに動かすことが出来ます。
仕様からコードへの置き換え
今は「なんとなーく」で問題ないので、こんな感じで作っていくというイメージを展開しておきます。
まず特定の周期A毎にメインスレッドに対して
という仕様に対応するためには、メインスレッドと別にgoroutine
を使ってスレッドを用意してあげる必要がありそうです。
さらに立ち上げた別のスレッドから「Golangはいいぞ」というメッセージを飛ばす
とあるため周期A毎にメッセージを飛ばす必要があります。スレッド間でメッセージを飛ばすためにはchannel
というデータ型を使います。立ち上げたスレッド -> メインスレッド
にメッセージを飛ばす処理も記述する必要があるでしょう。
この立ち上げたスレッドは周期B毎に停止をするという処理は記述自体は出来そうですが、故障したことをメインスレッドに伝える必要があります。それは再起動の処理をしてもらうためです。先ほどと同じようにchannel
を使ったメッセージを飛ばしてあげれば上手く出来そうな気がします。
補足: スレッド間でメッセージを飛ばすってどういうこと?何のため?
一言で言ってしまえば、スレッド同士でイベントを管理するため、主に同期をとる必要があるためです。AのスレッドからはBのスレッドで何が起きているのか分かりません。共通のグローバルな変数に何が起きているのかを都度、更新しても良いのですが排他制御(順番に書き込める、誰かが書き込んでいる最中は書き込めないようにする...etc)を作る必要があり、非常に面倒です。
そのため、スレッド同士でメッセージを通知することで「あ、Aのスレッドのこの処理が終わったんだ」「あ、Aでこのイベントが始まった」ということを認知することが出来るのです。
作成開始
では早速、作っていきましょう。まずは以下のようにディレクトリとファイルを作成してください。$GOPATH
配下に配置するようにお願いします。
main └ main.go timer ├ model.go └ timer.go
メッセージの定義
最初に行うのはスレッド間で送信しあうメッセージの設計です。/timer/model.go
に対してメッセージの定義を行います。今回は先ほど記述したようにメッセージを使って2つの通知をする必要があります。
- 「Golangはいいぞ」というメッセージを飛ばす
- 故障したことをメインスレッドに伝える必要
この2つに対応できて、後に上司から「やっぱこんときもメインスレッドにメッセージ飛ばしてくれる?」と追い討ちをかけられても問題ないようにメッセージの定義を行います。メッセージの定義には構造体
を用います。
/timer/timer.go
package timer // スレッド間で用いるchannel用のstruct定義 type Protocol struct { // 失敗時のmessageなのかを判定するために用意 Type string Message string Error error }
これは業務でチャットサーバーを作り続ける中で洗礼された最終形に近い形のものです。Type
というフィールドに何のメッセージなのかを与えます(eg: 通常のメッセージ, 故障時のメッセージ etc...)。このType
のフィールドを参照してもらうことで何のメッセージが飛んできたのかを判別することが出来ます。Type
はstring型
なのでメッセージの種類はいくらでも増やすことが出来ます。
Type
に対応するデータをMessage
やError
に与えます。通常のメッセージであれば以下のようになります。
msg := &Protocol{ Type: "MESSAGE", Message: "Golangはいいぞ", }
しかし、残念なことにせっかく定義した構造体もこのままではスレッド間で飛ばすメッセージとしては使用することが出来ません。先ほど紹介したようにこの構造体をchannel型
というデータにしてあげる必要があります。usedTimer
を使ってくれる人に「うわーchannel型
ってどうやって作るねん」と意識してもらいたくないので、関数で作成出来るようにしておきましょう。
/timer/timer.go
// 構造体を元にチャネルを作成 func CreateProtocolChannel() chan *Protocol { return make(chan *Protocol) }
make(chan *対象の構造体)
とすることで構造体をベースとしてchannel型
を作成することが出来ます。
timerの作成
スレッド間で飛ばすメッセージの定義が完了したので、ついにメインとなるusedTimer
を作成していきます。といっても、やることは単純です。まずは特定周期A毎に「Golangはいいぞ」というメッセージを飛ばす処理までを作成しましょう。
goroutine
で立ち上げることを想定しているので関数にします。また、このusedTimer
内は3つの引数を受け取ることにします。
- iterTimeInt -> メインスレッドにメッセージを送る周期Aの秒数
- breakTimeInt ->
usedTimer
が故障を起こす周期Bの秒数 - ptc(protocol) -> メッセージをメインスレッドに送信するための
channel型
もちろんこれが正解ではないので、他の方法で周期を指定して頂いても構いません。例えば、共通のスコープを持つ変数を用意する方法がありますが、個人的にはあまり好きではないので避けるようにしています。
まずは関数UsedTimer
の全体を見てみてください。順に詳細を追っていきます。
// 仕様について // 1. 指定した周期毎に" "というメッセージをchannel経由で送信する // 2. 指定した周期毎に時間切れとなり、errorを返して実行loopをbreakする func UsedTimer(iterTimeInt, breakTimeInt int, ptc chan *Protocol) { // int型をtime.Duration型へ変換(second) iterTime := time.Duration(iterTimeInt) * time.Second breakTime := time.Duration(breakTimeInt) * time.Second // 周期毎にイベントを発生させるtickerを作成 iterTicker := time.NewTicker(iterTime) breakTicker := time.NewTicker(breakTime) defer func() { iterTicker.Stop() breakTicker.Stop() }() for { select { // メッセージを送信する処理 case <-iterTicker.C: ptc <- &Protocol{ Type: "MESSAGE", Message: "Golangはいいぞ", } case <-breakTicker.C: ptc <- &Protocol{ Type: "ERROR", Error: errors.New("[Error] Timer was broken"), } // breakだとスレッドがkillできないので注意 return } } }
まずは引数で受け取った周期A,Bの値をint型
からtime.Duration型
に変換します。変換した値を用いて、time
モジュールに用意されているTicker
を作成します。Ticker
は指定した周期毎に作成したスレッドに対してchannel型
を通じてメッセージを飛ばしてくれます。
最後にdefer
構文を使用して、スレッドが停止する際(goroutine
で実行する関数がreturnする時)にTicker
を停止させるようにしておきます。
// int型をtime.Duration型へ変換(second) iterTime := time.Duration(iterTimeInt) * time.Second breakTime := time.Duration(breakTimeInt) * time.Second // 周期毎にイベントを発生させるtickerを作成 iterTicker := time.NewTicker(iterTime) breakTicker := time.NewTicker(breakTime) defer func() { iterTicker.Stop() breakTicker.Stop() }()
準備は出来ました。次にTicker
から周期A,B毎にメッセージを受け取るための処理を記述しましょう。メッセージを受信し続ける必要があるため、多言語で言う所のwhile loop
をfor
構文を用いて作成します。条件を与えなければ意図的に無限ループを作り出すことが出来ます。仮にfor
構文がないとどうなるでしょうか。一度、メッセージを受信した後に関数がnil
を返すか分かりませんが実行を終了します。その結果、次にメッセージを受信することは出来ません。
for { select { // メッセージを送信する処理 case <-iterTicker.C: ptc <- &Protocol{ Type: "MESSAGE", Message: "Golangはいいぞ", } case <-breakTicker.C: ptc <- &Protocol{ Type: "ERROR", Error: errors.New("[Error] Timer was broken"), } // breakだとスレッドがkillできないので注意 return } }
次に見慣れない構文select
についてです。これはswitch
構文のgoroutine
版だと思えば良いです。case <- channel型
とすることで特定のchannel
型を持つメッセージを受信することが出来ます。今回は先ほど作成した2つのTicker
を受信するようにしています。
iterTicker.C
では周期A毎にメインスレッドにメッセージを送るため、breakTicker.C
は周期B毎に故障したことをメッセージとして送るために用意しています。もしやりたことが増えて、第3のTicker
を追加した時にはselect
にも同じように追加するだけで簡単に拡張することが出来ます。
select { // メッセージを送信する処理 case <-iterTicker.C: ptc <- &Protocol{ Type: "MESSAGE", Message: "Golangはいいぞ", } : case <- newTiceker.C: : : }
次にメインスレッドにメッセージを送る部分についてです。この項目が終了した時点でgoroutine
を快適に扱うために必要なchannel型
, メッセージの受信
, メッセージの受信
に触れたことになります。少し練習をすれば、すぐにgoroutine
を使った並行処理が書けるようになるでしょう。
メッセージの送信はselect
の内部でメッセージを受信した時と書き方が似ています。対象のchannel型
に対してch <- データ構造
とすることでchannel型
を通じてメッセージを送ることが出来ます。今回、メッセージとして定義しているのはProtocol
という構造体なので、該当するデータを作成してメッセージを送信します。
// 周期A毎に送るメッセージ ptc <- &Protocol{ Type: "MESSAGE", Message: "Golangはいいぞ", } // 周期B毎に送るエラーメッセージ ptc <- &Protocol{ Type: "ERROR", Message: errors.New("[Error] Timer was broken"), }
これでgoroutine
で実行する関数側の処理は記述が完了しました。あとはgoroutine
を管理するためのスケジューラーを記述してあげるだけです。
Schedulerの作成
まずは全体のコードをご覧ください。無名関数を使っている部分はありますが、すでに最もややこしいchannel型
でのメッセージ受信部分については触れているので最低限、何をやっているのかは分かるのではないかと思います。
// timerの動作管理を行うスケジューラー。再起動まで行う func Scheduler(iterTimeInt, breakTimeInt, revivalNum int) { // 通信用のチャネルを作成 ch := CreateProtocolChannel() // 何度も再起動させたいので簡略化のため無名関数を採用 receiver := func() error { // 作成したUsedTimerからのメッセージを受け取るloopを作成 for { select { case msg := <-ch: switch msg.Type { case "MESSAGE": log.Print(msg.Message) case "ERROR": log.Print(msg.Error.Error()) return msg.Error } } } } // 監視のための処理を記述 counter := 0 for { // goroutineを用いてスレッドを作成 go UsedTimer(iterTimeInt, breakTimeInt, ch) if err := receiver(); err != nil { log.Print("[Main] Restart timer thread. Please wait 3 seconds ...") time.Sleep(3 * time.Second) counter += 1 } if counter >= revivalNum { log.Println("[Main] Reached revival limit. so stop worker ...") return } } }
最初に立ち上げた別スレッドからメッセージを受け取るためのchannel型
のデータを作成します。channel型
の作成は先ほどすでに関数化したので、呼び出した値を変数に保持するのみです。
// 通信用のチャネルを作成
ch := CreateProtocolChannel()
今回は、スレッドを特定関数まで再起動するという仕様があるでの繰り返し使うことを想定してchannel
型を通じてメッセージを受信する部分を無名関数で使いやすくしてみました。受信の処理は先ほどとほとんど同じですが、1点だけ異なるところがあります。channel型
を通じて得たメッセージの値を参照したい時はselect
のブロックの中でcase 変数名 := <-channel型:
とします。
この場合はUsedTimer
から送られてくるProtocol構造体
の値がmsg
に保持されます。送られきたメッセージに保持されているType
のfiledを見ることで何のメッセージなのかを判定します。
for { select { case msg := <-ch: switch msg.Type { case "MESSAGE": log.Print(msg.Message) case "ERROR": log.Print(msg.Error.Error()) return msg.Error } } }
MESSAGE
という周期A毎に送られてくるメッセージを受信した場合には送られてきたメッセージのMessage
fieldの値を表示します。ERROR
というusedTimer
が故障した際に送信されるメッセージを受信した時には無名関数内からreturn err
としてエラーを返して受信ループを停止させます。
この作成したメッセージ受信の無名関数とgoroutine
を使って別スレッドの立ち上げとメッセージの受信を行います。
// 監視のための処理を記述 counter := 0 for { // goroutineを用いてスレッドを作成 go UsedTimer(iterTimeInt, breakTimeInt, ch) if err := receiver(); err != nil { log.Print("[Main] Restart timer thread. Please wait 3 seconds ...") time.Sleep(3 * time.Second) counter += 1 } if counter >= revivalNum { log.Println("[Main] Reached revival limit. so stop worker ...") return } }
再起動にはfor
の無限ループを使いますが、今回は再起動回数に上限があるので、再起動を行なった回数を記録するためのインクリメント変数counter
を用意して、再起動するたびに+1
します。この値が引数で受け取った上限値を超えた時に処理を終了させます。
再起動は別スレッドからエラーメッセージを受け取った際に行いますが、都度都度、即再起動していては思わぬ負荷をかけてしまう可能性が考えられるため、必ず3秒待機してから再起動するようにもしてみました(eg: usedTimer
の壊れる周期が不定期で0.1sごとに壊れるごとが重なるような場合など)。
// 特に問題がなければメッセージの受信ループが発生 if err := receiver(); err != nil { log.Print("[Main] Restart timer thread. Please wait 3 seconds ...") time.Sleep(3 * time.Second) counter += 1 }
これでtimer
パッケージの記述が終了しました。最後にmain.go
から処理を呼び出します。
/main/main.go
package main import ( "log" "used_timer/timer" ) func main() { log.Print("[Main] Start timer ...") // 2秒毎にlogを出し、起動から10秒経過した時にbreak timer.Scheduler(2, 10, 5) }
動作確認
さっそく動作を確認してみましょう。golang
がinstall済みで今回作成したプロジェクトが$GOPATH
配下にあるとして以下のコマンドを実行します。
$ cd used_timer $ go run main/main.go
上手く作れていれば、2秒毎に「Golangはいいぞ」というlogが表示されて、10秒毎にusedTimer
が故障して「[Error] Timer was broken」というlogが出力されます。一度、スレッドはkillされてしまいますが、3秒後に再び新たなスレッドが作成されて同じようにlogが表示されるようになるでしょう。
この一連の処理が5回まで行われれば成功です。
実行結果
2020/04/07 00:36:48 [Main] Start timer ... 2020/04/07 00:36:50 Golangはいいぞ 2020/04/07 00:36:52 Golangはいいぞ 2020/04/07 00:36:54 Golangはいいぞ 2020/04/07 00:36:56 Golangはいいぞ 2020/04/07 00:36:58 Golangはいいぞ 2020/04/07 00:36:58 [Error] Timer was broken 2020/04/07 00:36:58 [Main] Restart timer thread. Please wait 3 seconds ... 2020/04/07 00:37:03 Golangはいいぞ 2020/04/07 00:37:05 Golangはいいぞ 2020/04/07 00:37:07 Golangはいいぞ 2020/04/07 00:37:09 Golangはいいぞ 2020/04/07 00:37:11 Golangはいいぞ 2020/04/07 00:37:11 [Error] Timer was broken 2020/04/07 00:37:11 [Main] Restart timer thread. Please wait 3 seconds ... 2020/04/07 00:37:16 Golangはいいぞ 2020/04/07 00:37:18 Golangはいいぞ 2020/04/07 00:37:20 Golangはいいぞ 2020/04/07 00:37:22 Golangはいいぞ 2020/04/07 00:37:24 Golangはいいぞ 2020/04/07 00:37:24 [Error] Timer was broken 2020/04/07 00:37:24 [Main] Restart timer thread. Please wait 3 seconds ... 2020/04/07 00:37:29 Golangはいいぞ 2020/04/07 00:37:31 Golangはいいぞ 2020/04/07 00:37:33 Golangはいいぞ 2020/04/07 00:37:35 Golangはいいぞ 2020/04/07 00:37:37 Golangはいいぞ 2020/04/07 00:37:37 [Error] Timer was broken 2020/04/07 00:37:37 [Main] Restart timer thread. Please wait 3 seconds ... 2020/04/07 00:37:42 Golangはいいぞ 2020/04/07 00:37:44 Golangはいいぞ 2020/04/07 00:37:46 Golangはいいぞ 2020/04/07 00:37:48 Golangはいいぞ 2020/04/07 00:37:50 Golangはいいぞ 2020/04/07 00:37:50 [Error] Timer was broken 2020/04/07 00:37:50 [Main] Restart timer thread. Please wait 3 seconds ... 2020/04/07 00:37:53 [Main] Reached revival limit. so stop worker ...
上手くいっているようです!!
まとめ
goroutine
を使用したスレッドの立ち上げchannel型
のデータを使ったスレッド同士のメッセージ送受信channel型
に構造体を使用したイベントを切り分け- スレッドの再起動と簡単なサンプル
と簡単なサンプルに実務で役に立つであろうgoroutine
に関する知識を詰め込みました。このサンプルを通して少しでもgoroutine
への理解が深まったのであれば何よりです。では、良い並行処理ライフを。
おまけ1: 関数がnilを返しているのかどうか
返していないっぽいですね。 play.golang.org
おまけ2: 並行処理の学習方法について
筆者は1年前までは並行処理の「へ」の字も知りませんでした。並行処理の知識が深まったのはElixir
というプログラミング言語のおかげです。今回のサンプルコードの設計のベースになっている考え方の多くはElixir
から輸入したものです。
「え、どんな言語やねん」と興味を持っていただけたのなら以下の記事をぜひ読んでみてください。きっと楽しんで頂けると思います。