機能に関する詳細
現在、業務でwebsocket
を使用したチャットアプリケーションの開発に関わっている。自分が担当しているのはサーバーサイドのみで、実装はgolang
を使用している。websocket
のライブラリはgorilla/websocketを採用している
サーバーサイドの websocket
のメッセージのやりとりの部分はgorilla/websocketのexamplesにあったものをパクリスペクトしている。そりゃ、ライブラリ作った人たちが作ったサンプルに間違いはないでしょという風に考えた
このサンプルに実装されているのはだいたい以下の通り
- client.go -> clientに関する情報(struct)を生成。メッセージの受信送信のためのスレッドの実装
- hub.go -> チャネル経由で受け取ったメッセージを配信するための処理の実装
- main.go -> サーバーを起動してwebsocketの接続を受け付ける(switching protocol)
これだけでも十分な実装だが、自分が担当している業務では追加の機能要件がある。1つはredis
のpubsub
機能を使用した購読済みの別サーバーへのメッセージのpublish
機能の実装で、これは予想の通り、kubernetes
を使用してサーバーがスケールする予定であるため必要になる
とは言いつつも、参加当時はkubernetes
に関する知識は皆無だったので、どのようにスケールするかなんて考えていなかった。というか、考えられなかったので、後のやらかしポイントになる
合わせて、メッセージの送受には並行処理が欠かせない。1つのプロセス、もしくはスレッドで処理を占有してしまうと、チャットとは呼べない代物が出来上がる。メッセージがサーバーに受信して送信されるまで、誰もメッセージを送ることが出来ないようなものになる。接続数が少ないうちは何とかなるかもしれないが、接続数に比例してストレスを感じるようになるだろう
やらかしたこと
開発初期当時は環境の準備や実装の遅れなどの問題で、シングルサーバーでの動作確認しか行なっておらず、上手く動いているものだと錯覚していた。実際に、複数サーバーにしてみて動作を検証したはずなのだが、見落としていた様だ
では実際に何をやらかしていたかという話に入る。サーバーでのwebsocket
のメッセージの処理とgolang
の並行処理をするためにgoroutines
でどのようにスレッドを用意しているかを簡単に説明していく。ちなみにチャンネルはpubsub
のチャンネルを指しており、チャネルはgolang
のスレッド間通信で扱うデータ構造を指しており、タイポではない
setup
- サーバーを起動する。この時に合わせて、Hub(examplesのhub.go)を
goroutines
を使用して立ち上げて待機させる redis
のpubsub
機能を利用して対象のチャンネルを購読(subscribe)して、publish
されたメッセージを受信するためのスレッドを待機させる- ユーザーがブラウザ経由で
websocket
の新規セッションのrequestを送信する - サーバー側で接続を許可して、このユーザー専用のメッセージ読み込みスレッドとメッセージ書き込みスレッドを立ち上げる
- 新規でユーザーが接続される毎に同様に専用のスレッドを立ち上げる(認証で新規ユーザーなのかを判定している)
この時点でユーザーが2人であるとfixすると存在しているスレッドの内訳は以下の様に7つになる
- メインスレッド(httpのrequestを受け付ける)
- Hub(メイン処理を実行)スレッド
pubsub
の受信用スレッド- ユーザーA専用の読み取りスレッド
- ユーザーB専用の読み取りスレッド
- ユーザーA専用の書き込みスレッド
- ユーザーB専用の書き込みスレッド
メッセージの受信と送信
webscoket
のセッションを張っているユーザーから新規のメッセージが送信される- ユーザー専用の読み取りスレッドでメッセージを読み込んで
redis
のpubsub
機能で購読済みのチャンネルにメッセージをpublish
publish
されたメッセージを受信用のスレッドで受け取り、チャンネルを購読しているサーバーにメッセージを配信してチャネルを通じてHubにメッセージを渡す- Hubで受け取ったメッセージを元に、処理を行い対象のユーザーに配信するために各ユーザー専用の書き込みスレッドにメッセージをチャネル経由で送る
- 専用の書き込みスレッドで受け取ったメッセージを
websocket
経由でユーザーに送信する
これが概要。あっ....これをスケールすると...
コードはかなり、省略してアレンジしたものを載せておく。動作は保証出来ないがイメージが伝わればと思う
hub.go(読み取りスレッド部分)
func (c *Client) readPump() { defer func() { c.hub.unregister <- c c.conn.Close() }() : : message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) // publishは自前実装の関数。メッセージをpubsubを利用して送信する publish(message) } }
pubsub.go(pubsub
のメッセージを受信してHubに渡すスレッド) hub自体は起動時に引数で受け取る
func SubscribeChannel(channelName string, h *Hub) { : : // メッセージの受信ループを作成 for { switch val := psc.Receive().(type) { case redis.Message: var message *Message h.broadcast <- message } } }
hub.go(Hub)
type Hub struct { // userを識別するためにstringをkeyに持つmapに変更 clients map[string]*Client broadcast chan []byte register chan *Client unregister chan *Client } func (h *Hub) Run(m *database.Mongo) { for { select { // websocket経由で受信したメッセージを読み取るスレッドから受信 case binaryMessage := <-h.Broadcast: switch message.Type { case "MESSAGE": h.Client[message.UserID].Send <- binaryMessage } } } }
スケールした結果
何と同じメッセージがスケールしたサーバーの数(Pods
)だけ、送信されているではないか。そりゃそうだ。読み取ったメッセージをpublish
して、受け取り配信するためのスレッドと各サーバー毎に立ち上がっているのだから。ついでに処理もサーバー台数分だけ走るので、databaseの値もめちゃくちゃになっている。開発初期当時は、kubernetes
のスケールへの理解が不足しており、redis
のpubsub
のpublish
を担当するスレッドも同様にスケールされるという意識が無かった
単純にpublish
をどのタイミングですれば良いのかという判断が甘かった
改善した結果
構成に手を加えて、スケールしても問題なく、処理が1度のみ走り、メッセージが複数回、送信されないようにした。setupの部分の更新はなく、全体の流れは以下の様になった。大きな変更としてはスレッド間で扱うチャネルを1つ追加した。元のexamplesに実装されているのはbroadCast
というチャネルでこれは、読み取り専用プロセスからHubに渡すために使用するチャネルとして扱い、新規で追加したチャネルはpubsub
経由で受信したメッセージを受信するためのチャネルとして用意をした
元のHubのstruct
type Hub struct { // Registered clients. clients map[*Client]bool // Inbound messages from the clients. broadcast chan []byte // Register requests from the clients. register chan *Client // Unregister requests from clients. unregister chan *Client }
チャネルを追加
type Pubsub struct { // 送信先 Target string Message []byte } type Hub struct { // userを識別するためにstringをkeyに持つmapに変更 clients map[string]*Client broadcast chan []byte register chan *Client unregister chan *Client pubsub chan *Pubsub }
メッセージの受信と送信
webscoket
のセッションを張っているユーザーから新規のメッセージが送信される- ユーザー専用の読み取りスレッドでメッセージを読み込んでHubにチャネル経由でメッセージを渡す
- メッセージを元に処理を行い、
redis
のpubsub
機能で購読済みのチャンネルにメッセージをpublish
publish
されたメッセージを受信用のスレッドで受け取り、チャンネルを購読しているサーバーにメッセージを配信して別のチャネルを通じてHubにメッセージを渡す- Hubで受け取ったメッセージから送信先のユーザーの
websocket
のセッションが自身のサーバーに存在するかを確認。存在している場合に、ユーザー専用の書き込みスレッドにチャネル経由でメッセージを渡す - 専用の書き込みスレッドで受け取ったメッセージを
websocket
経由でユーザーに送信する
このように変更することで無事に、スケールした状態でもメッセージが1通のみ送信されるようなり、冷や汗が引いた。並行処理プログラミングの経験も、スケールの経験も全く無かったので大変、勉強になったし、どのように考えれば良いのかが理解出来た
hub.go(読み取りスレッド部分)
func (c *Client) readPump() { defer func() { c.hub.unregister <- c c.conn.Close() }() : : message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) // exampleの通りチャネルを経由してHubにメッセージを渡す c.hub.broadcast <- message } }
hub.go(Hub)
func (h *Hub) Run(m *database.Mongo) { for { select { : : // websocket経由で受信したメッセージを読み取るスレッドから受信 case binaryMessage := <-h.Broadcast: switch message.Type { case "MESSAGE": h.Client[message.UserID].Send <- binaryMessage // 配信に必要な情報をpubsub経由で渡す publish(&Pubsub{Target: "対象のユーザーを認識する値(eg: unique id)", Message: message}) } } } }
pubsub.go(pubsub
のメッセージを受信してHubに渡すスレッド)
func SubscribeChannel(channelName string, h *Hub) { : : // メッセージの受信ループを作成 for { switch val := psc.Receive().(type) { case redis.Message: var message *Message // 全体配信用のチャネルにメッセージを渡す h.pubsub <- message } } }
hub.go(Hub)のpubsub
用のメッセージの受信
func (h *Hub) Run(m *database.Mongo) { for { select { case publish := <-h.pubsub: if val, ok := h.Clients[publish.Target]; ok { val.Send <- publish.Message } : : } }
こんな感じで落ち着いた。並行処理の設計は奥が深い...
draw.ioというサービスを構成図に書くのに初めて使ってみたが、かなり良いのでオススメ
おまけ Elixir/Erlangの勉強が意外なところで役に立った
並行処理と聞いて、反応せずにはいられない。Elixir
・Erlang
の得意とするところだ。実際に実装に使用しているのはgolang
だが、このチャットアプリを実装するにあたって、「プログラミングElixir」や「プログラミング Erlang」で得たプロセスの設計方法やアクターモデルの考え方はgoroutines
を扱う上で非常に参考になった
スレッドの設計をし直す時にも「プログラミング Erlang」でJoe
がどのように複数プロセスでの処理を考えるのか(最初にメッセージシーケンス図を書くらしい)を参考に、シーケンス図をそこそこ書いた
まさか、こんな形で役に立つとは思わなかったがElixir
・Erlang
万歳