Writing WebSocket with Go

The WebSocket protocol, as outlined in RFC 6455, enables persistent, bidirectional communication between clients and servers without the need for repeated HTTP requests. This makes it particularly suitable for applications requiring continuous data exchange, such as online gaming or real-time trading platforms.

In this guide, we'll walk through creating a WebSocket server in Go using the Gin framework.

Prerequisites:

  • Go version 1.15 or higher.

Server Setup:

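Initialize the Go Project and Import Dependencies: Begin by setting up your Go application and installing the required packages. The code below also uses rate.Limiter, which comes from golang.org/x/time/rate:

go get github.com/gin-gonic/gin
go get nhooyr.io/websocket
go get github.com/sirupsen/logrus
go get golang.org/x/time/rate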

Configure the Gin Engine: In your main function, initialize the Gin engine and define a default route:

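engine := gin.New()
engine.Use(gin.Recovery()) // recover from panics in handlers instead of crashing
engine.GET("/", func(c *gin.Context) {
    c.JSON(http.StatusOK, map[string]interface{}{
        "service": "Go WebSocket Demo",
        "version": "v1.0",
        "time":    time.Now().Unix(),
    })
})
// Register the WebSocket routes. WsHandler is defined in a later step; the
// package name ws assumes the directory name chosen there.
ws.WsHandler(engine)
// Run with no arguments listens on :8080 unless the PORT variable is set.
engine.Run()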

Implement WebSocket Handling: Create a directory named ws or websocket, and within it, create a file named ws.go. Define the Subscriber and ListSubscribers types:

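// Subscriber is a single connected WebSocket client.
type Subscriber struct {
    Id        string      // client ID taken from the URL path
    Message   chan []byte // buffered queue of outgoing messages
    closeSlow func()      // closes the connection when the client cannot keep up
}

// ListSubscribers tracks every active subscriber.
type ListSubscribers struct {
    subscriberMessageBuffer int                      // capacity of each Subscriber.Message channel
    publishLimiter          *rate.Limiter            // throttles publishes (golang.org/x/time/rate)
    subscribersMu           sync.Mutex               // guards subscribers
    subscribers             map[*Subscriber]struct{} // set of active subscribers
}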

The Subscriber represents a client connected to the server, while ListSubscribers manages all active subscribers.

Manage Subscribers: Implement methods to add and remove subscribers:

func (as *ListSubscribers) addSubscriber(s *Subscriber) {
    as.subscribersMu.Lock()
    as.subscribers[s] = struct{}{}
    as.subscribersMu.Unlock()
}

func (as *ListSubscribers) deleteSubscriber(s *Subscriber) {
    as.subscribersMu.Lock()
    delete(as.subscribers, s)
    as.subscribersMu.Unlock()
}
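
To see why the mutex matters, here is a minimal, standard-library-only sketch of the same pattern: a set keyed by pointer and guarded by a sync.Mutex. The names client, registry, and churn are hypothetical stand-ins for illustration, not part of the demo repository. Because the map is keyed by pointer, two connections that share an ID would still be two separate entries.

```go
package main

import (
	"fmt"
	"sync"
)

// client is a hypothetical stand-in for the article's Subscriber type.
type client struct{ id string }

// registry mirrors the shape of ListSubscribers: a set guarded by a mutex.
type registry struct {
	mu      sync.Mutex
	clients map[*client]struct{}
}

func newRegistry() *registry {
	return &registry{clients: make(map[*client]struct{})}
}

func (r *registry) add(c *client) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.clients[c] = struct{}{}
}

func (r *registry) remove(c *client) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.clients, c)
}

func (r *registry) size() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.clients)
}

// churn registers n clients from n goroutines, then removes every second one.
func churn(r *registry, n int) {
	clients := make([]*client, n)
	var wg sync.WaitGroup
	for i := range clients {
		clients[i] = &client{id: fmt.Sprint(i)}
		wg.Add(1)
		go func(c *client) {
			defer wg.Done()
			r.add(c) // concurrent map writes are safe only because of the lock
		}(clients[i])
	}
	wg.Wait()
	for i := 0; i < n; i += 2 {
		r.remove(clients[i])
	}
}

func main() {
	r := newRegistry()
	churn(r, 100)
	fmt.Println(r.size()) // 100 added, 50 removed
}
```

Without the lock, the concurrent add calls would be a data race on the map, which the Go runtime may report as a fatal "concurrent map writes" error.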

Set Up WebSocket Routes: Define a Handler type and set up WebSocket routes:

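type Handler struct {
}

// ListSubs is the package-level registry shared by all handlers.
var ListSubs *ListSubscribers

// WsHandler initializes the registry and registers the WebSocket routes.
func WsHandler(engine *gin.Engine) {
    handler := &Handler{}
    ListSubs = &ListSubscribers{
        subscriberMessageBuffer: 16,
        subscribers:             make(map[*Subscriber]struct{}),
        publishLimiter:          rate.NewLimiter(rate.Every(time.Millisecond*100), 8),
    }
    group := engine.Group("ws")
    {
        group.GET(":id", handler.SubscribeHandler) // GET  /ws/:id  subscribe over WebSocket
        group.POST("", handler.PublishToAll)       // POST /ws      publish to every subscriber
        group.POST(":id", handler.PublishToOne)    // POST /ws/:id  publish to one subscriber
    }
}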

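The publishLimiter throttles how often the server broadcasts to subscribers. rate.Every(time.Millisecond*100) refills one token every 100 milliseconds, which sustains about 10 publishes per second, and the second argument allows bursts of up to 8 publishes.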

Handle Subscriptions: Implement the SubscribeHandler and subscribe methods to manage client connections:

func (h *Handler) SubscribeHandler(c *gin.Context) {
    // Accept upgrades the HTTP request to a WebSocket connection. On failure
    // it has already written an error response, so only log the error here.
    wsConn, err := websocket.Accept(c.Writer, c.Request, nil)
    if err != nil {
        log.Error(err.Error())
        return
    }
    // Fallback close; harmless if the connection has already been closed.
    defer wsConn.Close(websocket.StatusInternalError, "")

    err = h.subscribe(c, wsConn)
    // A canceled context or a normal close from the client is not an error.
    if errors.Is(err, context.Canceled) {
        return
    }
    if websocket.CloseStatus(err) == websocket.StatusNormalClosure ||
        websocket.CloseStatus(err) == websocket.StatusGoingAway {
        return
    }
    if err != nil {
        log.Error(err.Error())
        return
    }
}

func (h *Handler) subscribe(c *gin.Context, wsConn *websocket.Conn) error {
    ctx := wsConn.CloseRead(c)
    agentId := c.Param("id")
    if len(agentId) < 1 {
        return errors.New("id is missing")
    }
    s := &Subscriber{
        Id:      agentId,
        Message: make(chan []byte, ListSubs.subscriberMessageBuffer),
        closeSlow: func() {
            wsConn.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
        },
    }
    ListSubs.addSubscriber(s)
    defer ListSubs.deleteSubscriber(s)

    for {
        select {
        case msg := <-s.Message:
            err := writeTimeout(ctx, time.Second*5, wsConn, msg)
            if err != nil {
                return err
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()

    return c.Write(ctx, websocket.MessageText, msg)
}

The SubscribeHandler function accepts WebSocket connections and delegates to the subscribe method, which manages the lifecycle of a subscriber.

Publish Messages: Implement methods to broadcast messages to all subscribers or a specific subscriber:

func (h *Handler) PublishToAll(c *gin.Context) {
    jsonBody := make(map[string]interface{})
    // ShouldBindJSON leaves the response untouched on failure, so the handler
    // controls the status code (BindJSON would already have written a 400).
    if err := c.ShouldBindJSON(&jsonBody); err != nil {
        c.JSON(http.StatusBadRequest, map[string]interface{}{
            "message": "publish error",
        })
        return
    }
    message, _ := jsonBody["message"].(string)
    h.publish([]byte(message))
    c.Writer.WriteHeader(http.StatusAccepted)
}

func (h *Handler) publish(msg []byte) {
    ListSubs.subscribersMu.Lock()
    defer ListSubs.subscribersMu.Unlock()
    ListSubs.publishLimiter.Wait(context.Background())
    for s := range ListSubs.subscribers {
        select {
        case s.Message <- msg:
        default:
            go s.closeSlow()
        }
    }
}

func (h *Handler) PublishToOne(c *gin.Context) {
    jsonBody := make(map[string]interface{})
    // A malformed body is a client error, so answer with 400 Bad Request.
    if err := c.ShouldBindJSON(&jsonBody); err != nil {
        c.JSON(http.StatusBadRequest, map[string]interface{}{
            "message": "publish error",
        })
        return
    }
    message, _ := jsonBody["message"].(string)
    agentId := c.Param("id")

    ListSubs.subscribersMu.Lock()
    defer ListSubs.subscribersMu.Unlock()

    // Deliver to the first subscriber whose Id matches, then stop searching.
    for s := range ListSubs.subscribers {
        if s.Id == agentId {
            select {
            case s.Message <- []byte(message):
            default:
                go s.closeSlow()
            }
            break
        }
    }

    c.Writer.WriteHeader(http.StatusAccepted)
}
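
Both publish paths rely on a non-blocking send: select with a default case delivers the message if the subscriber's buffer has room and otherwise falls through immediately. The following is a minimal sketch of that idiom using plain channels. The broadcast function is hypothetical and returns the indexes of full subscribers, where the article's code would call closeSlow.

```go
package main

import "fmt"

// broadcast tries a non-blocking send of msg to every subscriber channel and
// returns the indexes of the subscribers whose buffers were full.
func broadcast(subscribers []chan []byte, msg []byte) []int {
	var slow []int
	for i, ch := range subscribers {
		select {
		case ch <- msg:
			// Delivered into the subscriber's buffer.
		default:
			// Buffer full: the article's code would call closeSlow here.
			slow = append(slow, i)
		}
	}
	return slow
}

func main() {
	fast := make(chan []byte, 2)
	full := make(chan []byte, 1)
	full <- []byte("backlog") // this subscriber has not drained its buffer

	slow := broadcast([]chan []byte{fast, full}, []byte("tick"))
	fmt.Println(slow, len(fast), len(full)) // [1] 1 1
}
```

The design choice here is that one stalled client never delays the others: the publisher drops the slow connection rather than waiting for it.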

Summary

Your WebSocket server in Go now supports:

  • A GET /ws/:id route for clients to connect via WebSocket and subscribe to messages.
  • A POST /ws route to broadcast a message to all connected clients.
  • A POST /ws/:id route to send a message to one specific subscriber by ID.
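
Both POST routes read the text to send from the message field of a JSON body. As a sketch of what a Go caller might look like, the helper below builds such a request with the standard library. newPublishRequest is a hypothetical name, and the base URL http://localhost:8080 is an assumption about where the server listens.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// newPublishRequest builds a POST request for the publish routes.
// An empty id targets POST /ws (all subscribers); otherwise POST /ws/<id>.
// baseURL is an assumption, for example "http://localhost:8080".
func newPublishRequest(baseURL, id, message string) (*http.Request, error) {
	// The handlers read the text from the "message" field of the JSON body.
	body, err := json.Marshal(map[string]string{"message": message})
	if err != nil {
		return nil, err
	}
	target := baseURL + "/ws"
	if id != "" {
		target += "/" + url.PathEscape(id)
	}
	req, err := http.NewRequest(http.MethodPost, target, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	req, err := newPublishRequest("http://localhost:8080", "111", "hello")
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	fmt.Println(req.Method, req.URL.Path) // POST /ws/111

	// To send it against a running server:
	//   resp, err := http.DefaultClient.Do(req)
	// The handlers answer a successful publish with 202 Accepted.
}
```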

Key Points to Remember

  • Each subscriber is identified by an Id taken from the :id segment of the WebSocket URL path.
  • Messages are sent using buffered channels to avoid blocking.
  • A rate.Limiter is used to control how often messages can be broadcast, preventing overload.
  • Slow consumers are closed using the closeSlow function.
  • All subscriber state is protected using a sync.Mutex.

Test Client with Postman

Postman supports WebSocket requests, so it can act as the test client.

Connect to the server with ID 111.

Publish a message to all subscribers with the POST /ws endpoint.

Publish a message to subscriber 111 with the POST /ws/111 endpoint.

Repository: go-websocket-demo

Thank you for reading. <3