田舎で並行処理の夢を見る

試していることなど...需要がないかもしれないけど細々とアウトプットしてます

websocketを使ったgolang産チャットアプリのやらかしスレッド設計をkubernetesでスケール可能に修正した

機能に関する詳細

現在、業務でwebsocketを使用したチャットアプリケーションの開発に関わっている。自分が担当しているのはサーバーサイドのみで、実装はgolangを使用している。websocketのライブラリはgorilla/websocketを採用している

github.com

サーバーサイドの websocketのメッセージのやりとりの部分はgorilla/websocketのexamplesにあったものをパクリスペクトしている。そりゃ、ライブラリ作った人たちが作ったサンプルに間違いはないでしょという風に考えた

github.com

このサンプルに実装されているのはだいたい以下の通り

  • client.go -> clientに関する情報(struct)を生成。メッセージの受信送信のためのスレッドの実装
  • hub.go -> チャネル経由で受け取ったメッセージを配信するための処理の実装
  • main.go -> サーバーを起動してwebsocketの接続を受け付ける(switching protocol)

これだけでも十分な実装だが、自分が担当している業務では追加の機能要件がある。1つはredispubsub機能を使用した購読済みの別サーバーへのメッセージのpublish機能の実装で、これは予想の通り、kubernetesを使用してサーバーがスケールする予定であるため必要になる
とは言いつつも、参加当時はkubernetesに関する知識は皆無だったので、どのようにスケールするかなんて考えていなかった。というか、考えられなかったので、後のやらかしポイントになる

合わせて、メッセージの送受には並行処理が欠かせない。1つのプロセス、もしくはスレッドで処理を占有してしまうと、チャットとは呼べない代物が出来上がる。メッセージがサーバーに受信して送信されるまで、誰もメッセージを送ることが出来ないようなものになる。接続数が少ないうちは何とかなるかもしれないが、接続数に比例してストレスを感じるようになるだろう

やらかしたこと

f:id:takamizawa46:20191216225158p:plain
開発初期当時は環境の準備や実装の遅れなどの問題で、シングルサーバーでの動作確認しか行なっておらず、上手く動いているものだと錯覚していた。実際に、複数サーバーにしてみて動作を検証したはずなのだが、見落としていた様だ

では実際に何をやらかしていたかという話に入る。サーバーでのwebsocketのメッセージの処理とgolangの並行処理をするためにgoroutinesでどのようにスレッドを用意しているかを簡単に説明していく。ちなみにチャンネルはpubsubのチャンネルを指しており、チャネルはgolangのスレッド間通信で扱うデータ構造を指しており、タイポではない

setup

  • サーバーを起動する。この時に合わせて、Hub(examplesのhub.go)をgoroutinesを使用して立ち上げて待機させる
  • redispubsub機能を利用して対象のチャンネルを購読(subscribe)して、publishされたメッセージを受信するためのスレッドを待機させる
  • ユーザーがブラウザ経由でwebsocketの新規セッションのrequestを送信する
  • サーバー側で接続を許可して、このユーザー専用のメッセージ読み込みスレッドとメッセージ書き込みスレッドを立ち上げる
  • 新規でユーザーが接続される毎に同様に専用のスレッドを立ち上げる(認証で新規ユーザーなのかを判定している)

この時点でユーザーが2人であるとfixすると存在しているスレッドの内訳は以下の様に7つになる

  • メインスレッド(httpのrequestを受け付ける)
  • Hub(メイン処理を実行)スレッド
  • pubsubの受信用スレッド
  • ユーザーA専用の読み取りスレッド
  • ユーザーB専用の読み取りスレッド
  • ユーザーA専用の書き込みスレッド
  • ユーザーB専用の書き込みスレッド

メッセージの受信と送信

f:id:takamizawa46:20191216225218p:plain

  • webscoketのセッションを張っているユーザーから新規のメッセージが送信される
  • ユーザー専用の読み取りスレッドでメッセージを読み込んでredispubsub機能で購読済みのチャンネルにメッセージをpublish
  • publishされたメッセージを受信用のスレッドで受け取り、チャンネルを購読しているサーバーにメッセージを配信してチャネルを通じてHubにメッセージを渡す
  • Hubで受け取ったメッセージを元に、処理を行い対象のユーザーに配信するために各ユーザー専用の書き込みスレッドにメッセージをチャネル経由で送る
  • 専用の書き込みスレッドで受け取ったメッセージをwebsocket経由でユーザーに送信する

これが概要。あっ....これをスケールすると...

コードはかなり、省略してアレンジしたものを載せておく。動作は保証出来ないがイメージが伝わればと思う
hub.go(読み取りスレッド部分)

func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()
    :
    :
        message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
        // publishは自前実装の関数。メッセージをpubsubを利用して送信する
        publish(message)
    }
}

pubsub.go(pubsubのメッセージを受信してHubに渡すスレッド) hub自体は起動時に引数で受け取る

func SubscribeChannel(channelName string, h *Hub) {
    :
    :
    // メッセージの受信ループを作成
    for {
        switch val := psc.Receive().(type) {
        case redis.Message:
            var message *Message
            h.broadcast <- message
        }
    }
}

hub.go(Hub)

type Hub struct {
         // userを識別するためにstringをkeyに持つmapに変更
    clients map[string]*Client
    broadcast chan []byte
    register chan *Client
    unregister chan *Client
}

func (h *Hub) Run(m *database.Mongo) {
    for {
        select {
        // websocket経由で受信したメッセージを読み取るスレッドから受信
        case binaryMessage := <-h.Broadcast:
            switch message.Type {
            case "MESSAGE":
                h.Client[message.UserID].Send <- binaryMessage
            }
        }
    }
}

スケールした結果

何と同じメッセージがスケールしたサーバーの数(Pods)だけ、送信されているではないか。そりゃそうだ。読み取ったメッセージをpublishして、受け取り配信するためのスレッドと各サーバー毎に立ち上がっているのだから。ついでに処理もサーバー台数分だけ走るので、databaseの値もめちゃくちゃになっている。開発初期当時は、kubernetesのスケールへの理解が不足しており、redispubsubpublishを担当するスレッドも同様にスケールされるという意識が無かった

単純にpublishをどのタイミングですれば良いのかという判断が甘かった

改善した結果

f:id:takamizawa46:20191216225241p:plain
構成に手を加えて、スケールしても問題なく、処理が1度のみ走り、メッセージが複数回、送信されないようにした。setupの部分の更新はなく、全体の流れは以下の様になった。大きな変更としてはスレッド間で扱うチャネルを1つ追加した。元のexamplesに実装されているのはbroadCastというチャネルでこれは、読み取り専用プロセスからHubに渡すために使用するチャネルとして扱い、新規で追加したチャネルはpubsub経由で受信したメッセージを受信するためのチャネルとして用意をした

元のHubのstruct

type Hub struct {
    // Registered clients.
    clients map[*Client]bool

    // Inbound messages from the clients.
    broadcast chan []byte

    // Register requests from the clients.
    register chan *Client

    // Unregister requests from clients.
    unregister chan *Client
}

チャネルを追加

type Pubsub struct {
        // 送信先
        Target string
        Message []byte
}

type Hub struct {
         // userを識別するためにstringをkeyに持つmapに変更
    clients map[string]*Client
    broadcast chan []byte
    register chan *Client
    unregister chan *Client
    pubsub chan *Pubsub
}

メッセージの受信と送信

  • webscoketのセッションを張っているユーザーから新規のメッセージが送信される
  • ユーザー専用の読み取りスレッドでメッセージを読み込んでHubにチャネル経由でメッセージを渡す
  • メッセージを元に処理を行い、redispubsub機能で購読済みのチャンネルにメッセージをpublish
  • publishされたメッセージを受信用のスレッドで受け取り、チャンネルを購読しているサーバーにメッセージを配信して別のチャネルを通じてHubにメッセージを渡す
  • Hubで受け取ったメッセージから送信先のユーザーのwebsocketのセッションが自身のサーバーに存在するかを確認。存在している場合に、ユーザー専用の書き込みスレッドにチャネル経由でメッセージを渡す
  • 専用の書き込みスレッドで受け取ったメッセージをwebsocket経由でユーザーに送信する

このように変更することで無事に、スケールした状態でもメッセージが1通のみ送信されるようなり、冷や汗が引いた。並行処理プログラミングの経験も、スケールの経験も全く無かったので大変、勉強になったし、どのように考えれば良いのかが理解出来た

hub.go(読み取りスレッド部分)

func (c *Client) readPump() {
    defer func() {
        c.hub.unregister <- c
        c.conn.Close()
    }()
    :
    :
        message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
        // exampleの通りチャネルを経由してHubにメッセージを渡す
        c.hub.broadcast <- message
    }
}

hub.go(Hub)

func (h *Hub) Run(m *database.Mongo) {
    for {
        select {
        :
        :
        // websocket経由で受信したメッセージを読み取るスレッドから受信
        case binaryMessage := <-h.Broadcast:
            switch message.Type {
            case "MESSAGE":
                h.Client[message.UserID].Send <- binaryMessage
                // 配信に必要な情報をpubsub経由で渡す
                publish(&Pubsub{Target: "対象のユーザーを認識する値(eg: unique id)", Message: message})
            }
        }
    }
}

pubsub.go(pubsubのメッセージを受信してHubに渡すスレッド)

func SubscribeChannel(channelName string, h *Hub) {
    :
    :
    // メッセージの受信ループを作成
    for {
        switch val := psc.Receive().(type) {
        case redis.Message:
            var message *Message
            // 全体配信用のチャネルにメッセージを渡す
            h.pubsub <- message
        }
    }
}

hub.go(Hub)のpubsub用のメッセージの受信

func (h *Hub) Run(m *database.Mongo) {
    for {
        select {
        case publish := <-h.pubsub:
            if val, ok := h.Clients[publish.Target]; ok {
                val.Send <- publish.Message
            }
         :
         :
    }
}

こんな感じで落ち着いた。並行処理の設計は奥が深い...
draw.ioというサービスを構成図に書くのに初めて使ってみたが、かなり良いのでオススメ

おまけ Elixir/Erlangの勉強が意外なところで役に立った

並行処理と聞いて、反応せずにはいられない。ElixirErlangの得意とするところだ。実際に実装に使用しているのはgolangだが、このチャットアプリを実装するにあたって、「プログラミングElixir」や「プログラミング Erlang」で得たプロセスの設計方法やアクターモデルの考え方はgoroutinesを扱う上で非常に参考になった

スレッドの設計をし直す時にも「プログラミング Erlang」でJoeがどのように複数プロセスでの処理を考えるのか(最初にメッセージシーケンス図を書くらしい)を参考に、シーケンス図をそこそこ書いた
まさか、こんな形で役に立つとは思わなかったがElixirErlang万歳

参考文献