Writing a WebSocket Server with Go

The WebSocket protocol, as outlined in RFC 6455, enables persistent, bidirectional communication between clients and servers without the need for repeated HTTP requests. This makes it particularly suitable for applications requiring continuous data exchange, such as online gaming or real-time trading platforms.
In this guide, we'll walk through creating a WebSocket server in Go using the Gin framework.
Prerequisites:
- Go version 1.15 or higher.
Server Setup:
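Initialize the Go Project and Import Dependencies: Begin by setting up your Go module and fetching the packages used in this guide. The rate limiter used later comes from golang.org/x/time/rate, so fetch it as well:
go get github.com/gin-gonic/gin
go get nhooyr.io/websocket
go get github.com/sirupsen/logrus
go get golang.org/x/time/rate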
Configure the Gin Engine: In your main function, initialize the Gin engine and define a default route:
engine := gin.New()
engine.Use(gin.Recovery())
engine.GET("/", func(c *gin.Context) {
	c.JSON(http.StatusOK, map[string]interface{}{
		"service": "Go WebSocket Demo",
		"version": "v1.0",
		"time":    time.Now().Unix(),
	})
})
Implement WebSocket Handling: Create a directory named ws or websocket, and within it, create a file named ws.go. Define the Subscriber and ListSubscribers types:
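// Subscriber represents a single connected client.
type Subscriber struct {
	Id        string      // client ID taken from the URL path
	Message   chan []byte // buffered queue of outgoing messages
	closeSlow func()      // closes the connection when the client cannot keep up
}

// ListSubscribers tracks every active subscriber.
type ListSubscribers struct {
	subscriberMessageBuffer int           // capacity of each Subscriber.Message channel
	publishLimiter          *rate.Limiter // throttles broadcasts
	subscribersMu           sync.Mutex    // guards subscribers
	subscribers             map[*Subscriber]struct{}
}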
The Subscriber type represents a client connected to the server, while ListSubscribers manages all active subscribers.
Manage Subscribers: Implement methods to add and remove subscribers:
func (as *ListSubscribers) addSubscriber(s *Subscriber) {
	as.subscribersMu.Lock()
	as.subscribers[s] = struct{}{}
	as.subscribersMu.Unlock()
}

func (as *ListSubscribers) deleteSubscriber(s *Subscriber) {
	as.subscribersMu.Lock()
	delete(as.subscribers, s)
	as.subscribersMu.Unlock()
}
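To see why both methods take the lock, the following standalone sketch registers and removes subscribers from many goroutines at once. It uses only the standard library; the registry type, its count method, and the churn helper are hypothetical stand-ins for illustration, not code from the demo repository.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a trimmed-down stand-in for the article's Subscriber type.
type subscriber struct {
	id string
}

// registry is a hypothetical stand-in for ListSubscribers.
type registry struct {
	mu   sync.Mutex
	subs map[*subscriber]struct{}
}

func newRegistry() *registry {
	return &registry{subs: make(map[*subscriber]struct{})}
}

func (r *registry) add(s *subscriber) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.subs[s] = struct{}{}
}

func (r *registry) remove(s *subscriber) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.subs, s)
}

// count is an assumed helper (not in the article) used to inspect the set.
func (r *registry) count() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.subs)
}

// churn adds n subscribers concurrently, removes every odd-numbered one,
// and returns how many remain registered.
func churn(r *registry, n int) int {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s := &subscriber{id: fmt.Sprint(i)}
			r.add(s)
			if i%2 == 1 {
				r.remove(s)
			}
		}(i)
	}
	wg.Wait()
	return r.count()
}

func main() {
	fmt.Println(churn(newRegistry(), 100)) // 50
}
```

Go maps are not safe for concurrent writes, so without the mutex a run like this may abort at runtime. Each WebSocket connection is served on its own goroutine, which is why the demo guards the map the same way.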
Set Up WebSocket Routes: Define a Handler type and set up WebSocket routes:
type Handler struct{}

// ListSubs is the package-level subscriber registry shared by all handlers.
// It is declared here because the handlers below refer to it.
var ListSubs *ListSubscribers

func WsHandler(engine *gin.Engine) {
	handler := &Handler{}
	ListSubs = &ListSubscribers{
		subscriberMessageBuffer: 16,
		subscribers:             make(map[*Subscriber]struct{}),
		publishLimiter:          rate.NewLimiter(rate.Every(time.Millisecond*100), 8),
	}
	group := engine.Group("ws")
	{
		group.GET(":id", handler.SubscribeHandler)
		group.POST("", handler.PublishToAll)
		group.POST(":id", handler.PublishToOne)
	}
}
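The publishLimiter throttles broadcasts from the server to subscribers. With rate.Every(time.Millisecond*100) and a burst of 8, it permits up to 8 broadcasts back to back and then refills one token every 100 milliseconds, which is a sustained rate of about 10 broadcasts per second.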
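The arguments to rate.NewLimiter describe a token bucket: the second argument (8) is the burst size, and rate.Every(100ms) refills one token per 100 milliseconds. The sketch below models that behavior with only the standard library. It is an illustrative model, not the implementation inside golang.org/x/time/rate; the bucket type and the allowAt, countAllowed, and demo names are hypothetical, and time is passed in explicitly so the result is deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// bucket is a simplified token bucket (hypothetical; not the x/time/rate code).
type bucket struct {
	burst    float64       // maximum number of tokens the bucket can hold
	interval time.Duration // time needed to refill one token
	tokens   float64
	last     time.Time
}

func newBucket(interval time.Duration, burst int, start time.Time) *bucket {
	return &bucket{burst: float64(burst), interval: interval, tokens: float64(burst), last: start}
}

// allowAt refills tokens for the time elapsed since the previous call,
// caps them at the burst size, then spends one token if available.
func (b *bucket) allowAt(now time.Time) bool {
	elapsed := now.Sub(b.last)
	b.last = now
	b.tokens += float64(elapsed) / float64(b.interval)
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// countAllowed reports how many of n back-to-back attempts at time now pass.
func countAllowed(b *bucket, now time.Time, n int) int {
	allowed := 0
	for i := 0; i < n; i++ {
		if b.allowAt(now) {
			allowed++
		}
	}
	return allowed
}

// demo tries 10 publishes at three points in time and returns the counts.
func demo() []int {
	start := time.Unix(0, 0)
	b := newBucket(100*time.Millisecond, 8, start)
	return []int{
		countAllowed(b, start, 10),                           // the initial burst
		countAllowed(b, start.Add(100*time.Millisecond), 10), // one token refilled
		countAllowed(b, start.Add(time.Second), 10),          // refill capped at the burst
	}
}

func main() {
	fmt.Println(demo()) // [8 1 8]
}
```

One difference from this model: the demo calls Wait on its limiter, which blocks until a token is available instead of rejecting the call.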
Handle Subscriptions: Implement the SubscribeHandler and subscribe methods to manage client connections:
func (h *Handler) SubscribeHandler(c *gin.Context) {
	wsConn, err := websocket.Accept(c.Writer, c.Request, nil)
	if err != nil {
		// Accept has already written an HTTP error response, so only log here.
		log.Error(err.Error())
		return
	}
	defer wsConn.Close(websocket.StatusInternalError, "")

	err = h.subscribe(c, wsConn)
	if errors.Is(err, context.Canceled) {
		return
	}
	if websocket.CloseStatus(err) == websocket.StatusNormalClosure ||
		websocket.CloseStatus(err) == websocket.StatusGoingAway {
		return
	}
	if err != nil {
		log.Error(err.Error())
		return
	}
}
func (h *Handler) subscribe(c *gin.Context, wsConn *websocket.Conn) error {
	// This endpoint only writes. CloseRead returns a context that is
	// cancelled once the connection is closed.
	ctx := wsConn.CloseRead(c)

	agentId := c.Param("id")
	if len(agentId) < 1 {
		return errors.New("id is missing")
	}

	s := &Subscriber{
		Id:      agentId,
		Message: make(chan []byte, ListSubs.subscriberMessageBuffer),
		closeSlow: func() {
			wsConn.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
		},
	}
	ListSubs.addSubscriber(s)
	defer ListSubs.deleteSubscriber(s)

	// Forward queued messages until the connection closes or a write fails.
	for {
		select {
		case msg := <-s.Message:
			err := writeTimeout(ctx, time.Second*5, wsConn, msg)
			if err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// writeTimeout writes msg as a text message, giving up after timeout.
func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return c.Write(ctx, websocket.MessageText, msg)
}
The SubscribeHandler function accepts WebSocket connections and delegates to the subscribe method, which registers the subscriber, forwards queued messages to the connection, and removes the subscriber when the connection ends.
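The core of subscribe is a loop that selects between the subscriber's message channel and the connection's context. The sketch below isolates that loop so it can run without a WebSocket connection. It assumes a write callback in place of writeTimeout; the pump and run names are hypothetical and exist only for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// pump forwards queued messages to write until ctx is cancelled or a write
// fails. The write callback is a hypothetical stand-in for writeTimeout.
func pump(ctx context.Context, msgs <-chan []byte, write func([]byte) error) error {
	for {
		select {
		case msg := <-msgs:
			if err := write(msg); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// run queues two messages, waits until both are written, then cancels the
// context the way a closed connection would. It returns the written messages
// and whether pump stopped with context.Canceled.
func run() ([]string, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	msgs := make(chan []byte, 16)
	msgs <- []byte("hello")
	msgs <- []byte("world")

	var written []string
	bothWritten := make(chan struct{})
	write := func(b []byte) error {
		written = append(written, string(b))
		if len(written) == 2 {
			close(bothWritten)
		}
		return nil
	}

	result := make(chan error, 1)
	go func() { result <- pump(ctx, msgs, write) }()

	<-bothWritten // both messages delivered
	cancel()      // simulate the connection closing
	err := <-result
	return written, errors.Is(err, context.Canceled)
}

func main() {
	written, cancelled := run()
	fmt.Println(written, cancelled) // [hello world] true
}
```

This is also why SubscribeHandler treats context.Canceled as a normal exit: it is the error the loop returns when the connection goes away.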
Publish Messages: Implement methods to broadcast messages to all subscribers or a specific subscriber:
func (h *Handler) PublishToAll(c *gin.Context) {
	jsonBody := make(map[string]interface{})
	// ShouldBindJSON leaves the response untouched on failure, so the
	// handler can send its own 400 for a malformed body.
	if err := c.ShouldBindJSON(&jsonBody); err != nil {
		c.JSON(http.StatusBadRequest, map[string]interface{}{
			"message": "publish error",
		})
		return
	}
	message, _ := jsonBody["message"].(string)
	h.publish([]byte(message))
	c.Writer.WriteHeader(http.StatusAccepted)
}
func (h *Handler) publish(msg []byte) {
	ListSubs.subscribersMu.Lock()
	defer ListSubs.subscribersMu.Unlock()
	ListSubs.publishLimiter.Wait(context.Background())
	for s := range ListSubs.subscribers {
		select {
		case s.Message <- msg:
		default:
			go s.closeSlow()
		}
	}
}
func (h *Handler) PublishToOne(c *gin.Context) {
	jsonBody := make(map[string]interface{})
	// ShouldBindJSON leaves the response untouched on failure, so the
	// handler can send its own 400 for a malformed body.
	if err := c.ShouldBindJSON(&jsonBody); err != nil {
		c.JSON(http.StatusBadRequest, map[string]interface{}{
			"message": "publish error",
		})
		return
	}
	message, _ := jsonBody["message"].(string)
	agentId := c.Param("id")
	ListSubs.subscribersMu.Lock()
	defer ListSubs.subscribersMu.Unlock()
	for s := range ListSubs.subscribers {
		if s.Id == agentId {
			select {
			case s.Message <- []byte(message):
			default:
				go s.closeSlow()
			}
			break
		}
	}
	c.Writer.WriteHeader(http.StatusAccepted)
}
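Both publish paths rely on the same idiom: a select with a default case, so that a full subscriber buffer never blocks the publisher. The standalone sketch below shows that idiom in isolation, using only the standard library. The trySend and fill helpers are hypothetical names introduced for this example.

```go
package main

import "fmt"

// trySend queues msg without blocking. It returns false when the buffer is
// full, which is the point where the article's handlers call closeSlow.
func trySend(ch chan<- []byte, msg []byte) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

// fill attempts n sends into a channel with the given buffer size and
// returns how many were accepted. Nothing reads from the channel, which
// models a client that has stopped consuming messages.
func fill(buffer, n int) int {
	ch := make(chan []byte, buffer)
	accepted := 0
	for i := 0; i < n; i++ {
		if trySend(ch, []byte("tick")) {
			accepted++
		}
	}
	return accepted
}

func main() {
	// With a buffer of 16 (the demo's subscriberMessageBuffer), the 17th
	// message onward is rejected instead of blocking the publisher.
	fmt.Println(fill(16, 20)) // 16
}
```

The trade-off is that a slow client is disconnected rather than allowed to stall delivery to everyone else, since publish holds the subscribers mutex while it sends.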
Summary
Your WebSocket server in Go now supports:
- A GET /ws/:id route for clients to connect via WebSocket and subscribe to messages.
- A POST /ws route to broadcast a message to all connected clients.
- A POST /ws/:id route to send a message to one specific subscriber by ID.
Key Points to Remember
- Each subscriber is identified by an Id, which is passed through the WebSocket path.
- Messages are sent using buffered channels to avoid blocking.
- A rate.Limiter is used to control how often messages can be broadcast, preventing overload.
- Slow consumers are closed using the closeSlow function.
- All subscriber state is protected using a sync.Mutex.
Test Client with Postman
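Postman now supports WebSocket requests, so it can be used to test the server:
- Connect to the server with id 111 (GET /ws/111).
- Publish a message to all subscribers with the publish-all API (POST /ws).
- Publish a message to subscriber id 111 with the publish API (POST /ws/111).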
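If you would rather script the two publish calls than click through Postman, a small Go client can send the same JSON body. The sketch below is a minimal example using only the standard library. The publish helper is a hypothetical name, and newStubServer is a stand-in that only mimics the demo's 202 response so the example runs on its own; against the real server you would pass its base URL instead.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// publish POSTs {"message": msg} to /ws (broadcast) when id is empty, or to
// /ws/<id> (one subscriber) otherwise, and returns the HTTP status code.
func publish(baseURL, id, msg string) (int, error) {
	body, err := json.Marshal(map[string]string{"message": msg})
	if err != nil {
		return 0, err
	}
	url := baseURL + "/ws"
	if id != "" {
		url += "/" + id
	}
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

// newStubServer is a stand-in for the real server (an assumption for this
// sketch): it answers 202 to any POST whose JSON body has a string "message".
func newStubServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		var body map[string]interface{}
		if r.Method != http.MethodPost || json.NewDecoder(r.Body).Decode(&body) != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		if _, ok := body["message"].(string); !ok {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	}))
}

func main() {
	srv := newStubServer()
	defer srv.Close()

	all, err := publish(srv.URL, "", "hello everyone")
	fmt.Println(all, err) // 202 <nil>

	one, err := publish(srv.URL, "111", "hello 111")
	fmt.Println(one, err) // 202 <nil>
}
```

A 202 from the demo server only means the message was queued for delivery; the WebSocket client connected with the matching id is where it should appear.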


Repository: go-websocket-demo
Thank you for reading. <3