package platforms import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "stream-bot/internal/logger" "stream-bot/internal/twitchapi" "sync" "time" "github.com/gorilla/websocket" ) // TwitchEventSub реализует клиент EventSub через WebSocket type TwitchEventSub struct { manager *Manager twitchAPI *twitchapi.TwitchAPI conn *websocket.Conn sessionID string subscriptions map[string]bool // eventType -> подписана ли mu sync.Mutex stopCh chan struct{} doneCh chan struct{} ctx context.Context cancel context.CancelFunc } // Структуры сообщений EventSub type eventSubMessage struct { Metadata struct { MessageID string `json:"message_id"` MessageType string `json:"message_type"` // session_welcome, session_keepalive, notification, session_revoke MessageTimestamp string `json:"message_timestamp"` SessionID string `json:"session_id,omitempty"` SubscriptionType string `json:"subscription_type,omitempty"` SubscriptionVersion string `json:"subscription_version,omitempty"` } `json:"metadata"` Payload struct { Session struct { ID string `json:"id"` Status string `json:"status"` ConnectedAt string `json:"connected_at"` KeepaliveTimeoutSeconds int `json:"keepalive_timeout_seconds"` ReconnectURL string `json:"reconnect_url"` } `json:"session,omitempty"` Subscription struct { ID string `json:"id"` Status string `json:"status"` Type string `json:"type"` Version string `json:"version"` Condition map[string]string `json:"condition"` Transport struct { Method string `json:"method"` SessionID string `json:"session_id"` } `json:"transport"` CreatedAt string `json:"created_at"` } `json:"subscription,omitempty"` Event json.RawMessage `json:"event,omitempty"` } `json:"payload"` } func NewTwitchEventSub(manager *Manager, twitchAPI *twitchapi.TwitchAPI) *TwitchEventSub { return &TwitchEventSub{ manager: manager, twitchAPI: twitchAPI, subscriptions: make(map[string]bool), stopCh: make(chan struct{}), doneCh: make(chan struct{}), } } func (es *TwitchEventSub) Start() error { es.ctx, es.cancel = context.WithCancel(context.Background()) logger.Info("Starting Twitch EventSub WebSocket client...") go es.connect() return nil } func (es *TwitchEventSub) Stop() { if es.cancel != nil { es.cancel() } if es.conn != nil { _ = es.conn.Close() } select { case <-es.doneCh: logger.Info("Twitch EventSub stopped gracefully") case <-time.After(3 * time.Second): logger.Warn("Twitch EventSub stop timeout") } } func (es *TwitchEventSub) connect() { defer close(es.doneCh) for { select { case <-es.ctx.Done(): return default: } conn, _, err := websocket.DefaultDialer.Dial("wss://eventsub.wss.twitch.tv/ws", nil) if err != nil { logger.Error("EventSub WebSocket dial error: %v, reconnecting...", err) select { case <-es.ctx.Done(): return case <-time.After(5 * time.Second): continue } } es.mu.Lock() es.conn = conn es.mu.Unlock() err = es.readLoop(conn) if err != nil { logger.Error("EventSub read loop error: %v, reconnecting...", err) _ = conn.Close() select { case <-es.ctx.Done(): return case <-time.After(5 * time.Second): continue } } select { case <-es.ctx.Done(): return default: } } } func (es *TwitchEventSub) readLoop(conn *websocket.Conn) error { for { select { case <-es.ctx.Done(): return es.ctx.Err() default: } _, msg, err := conn.ReadMessage() if err != nil { return err } var envelope eventSubMessage if err := json.Unmarshal(msg, &envelope); err != nil { logger.Error("Failed to parse EventSub message: %v", err) continue } switch envelope.Metadata.MessageType { case "session_welcome": es.handleWelcome(envelope) case "session_keepalive": // Ничего не логируем, чтобы не засорять логи case "notification": es.handleNotification(envelope) case "session_revoke": logger.Warn("EventSub session revoked, will reconnect") return fmt.Errorf("session revoked") default: logger.Warn("Unknown EventSub message type: %s", envelope.Metadata.MessageType) } } } func (es *TwitchEventSub) handleWelcome(msg eventSubMessage) { es.sessionID = msg.Payload.Session.ID logger.Info("EventSub connected, session ID: %s", es.sessionID) // После получения welcome подписываемся на события broadcasterID, err := es.twitchAPI.GetBroadcasterID() if err != nil { logger.Error("Cannot get broadcaster ID for subscriptions: %v", err) return } // Список событий для подписки subscriptions := []struct { Type string Version string Condition map[string]string }{ { Type: "channel.follow", Version: "2", Condition: map[string]string{ "broadcaster_user_id": broadcasterID, "moderator_user_id": broadcasterID, // используем ID стримера как модератора }, }, { Type: "channel.subscribe", Version: "1", Condition: map[string]string{ "broadcaster_user_id": broadcasterID, }, }, { Type: "channel.subscription.gift", Version: "1", Condition: map[string]string{ "broadcaster_user_id": broadcasterID, }, }, { Type: "channel.raid", Version: "1", Condition: map[string]string{ "to_broadcaster_user_id": broadcasterID, }, }, { Type: "channel.channel_points_custom_reward_redemption.add", Version: "1", Condition: map[string]string{ "broadcaster_user_id": broadcasterID, }, }, } for _, sub := range subscriptions { if err := es.subscribe(sub.Type, sub.Version, sub.Condition); err != nil { logger.Error("Failed to subscribe to %s: %v", sub.Type, err) } else { es.mu.Lock() es.subscriptions[sub.Type] = true es.mu.Unlock() logger.Info("Subscribed to %s", sub.Type) } } } // subscribe создаёт подписку через API Twitch, используя токен стримера (user_token) func (es *TwitchEventSub) subscribe(eventType, version string, condition map[string]string) error { // Берём токен стримера (user_token), т.к. для подписки на follow нужны права модератора token, err := es.twitchAPI.GetUserToken() // добавим этот метод в twitchapi if err != nil { return err } clientID := es.twitchAPI.GetClientID() subReq := struct { Type string `json:"type"` Version string `json:"version"` Condition map[string]string `json:"condition"` Transport struct { Method string `json:"method"` SessionID string `json:"session_id"` } `json:"transport"` }{ Type: eventType, Version: version, Condition: condition, Transport: struct { Method string `json:"method"` SessionID string `json:"session_id"` }{ Method: "websocket", SessionID: es.sessionID, }, } body, _ := json.Marshal(subReq) url := "https://api.twitch.tv/helix/eventsub/subscriptions" req, _ := http.NewRequest("POST", url, bytes.NewReader(body)) req.Header.Set("Authorization", "Bearer "+token) req.Header.Set("Client-Id", clientID) req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer func(Body io.ReadCloser) { _ = Body.Close() }(resp.Body) if resp.StatusCode != 202 { errBody, _ := io.ReadAll(resp.Body) logger.Error("Twitch API error when subscribing to %s: status %d, body: %s", eventType, resp.StatusCode, string(errBody)) return fmt.Errorf("subscription failed: %d %s", resp.StatusCode, string(errBody)) } logger.Info("Subscription to %s created successfully", eventType) return nil } func (es *TwitchEventSub) handleNotification(msg eventSubMessage) { eventType := msg.Metadata.SubscriptionType logger.Info("EventSub notification received: %s", eventType) var params map[string]string switch eventType { case "channel.follow": var data struct { UserName string `json:"user_name"` UserLogin string `json:"user_login"` FollowedAt string `json:"followed_at"` } if err := json.Unmarshal(msg.Payload.Event, &data); err != nil { logger.Error("Failed to parse follow event: %v", err) return } logger.Info("Follow event: %s followed the channel at %s", data.UserName, data.FollowedAt) params = map[string]string{ "username": data.UserName, "user_login": data.UserLogin, "followed_at": data.FollowedAt, } es.manager.OnEvent("twitch", "follow", params) case "channel.subscribe": var data struct { UserName string `json:"user_name"` Tier string `json:"tier"` // "1000", "2000", "3000" IsGift bool `json:"is_gift"` } if err := json.Unmarshal(msg.Payload.Event, &data); err != nil { logger.Error("Failed to parse subscribe event: %v", err) return } logger.Info("Subscribe event: %s subscribed with tier %s (is_gift=%v)", data.UserName, data.Tier, data.IsGift) params = map[string]string{ "username": data.UserName, "tier": data.Tier, "is_gift": fmt.Sprintf("%t", data.IsGift), } es.manager.OnEvent("twitch", "subscribe", params) case "channel.subscription.gift": var data struct { UserName string `json:"user_name"` // даритель RecipientName string `json:"recipient_user_name"` Tier string `json:"tier"` CumulativeTotal int `json:"cumulative_total"` } if err := json.Unmarshal(msg.Payload.Event, &data); err != nil { logger.Error("Failed to parse gift sub event: %v", err) return } logger.Info("Gift sub event: %s gifted %d subscription(s) (tier %s), recipient: %s", data.UserName, data.CumulativeTotal, data.Tier, data.RecipientName) params = map[string]string{ "gifter": data.UserName, "recipient": data.RecipientName, "tier": data.Tier, "cumulative_total": fmt.Sprintf("%d", data.CumulativeTotal), } es.manager.OnEvent("twitch", "gift_sub", params) case "channel.raid": var data struct { FromBroadcasterName string `json:"from_broadcaster_user_name"` Viewers int `json:"viewers"` } if err := json.Unmarshal(msg.Payload.Event, &data); err != nil { logger.Error("Failed to parse raid event: %v", err) return } logger.Info("Raid event: %s raided with %d viewers", data.FromBroadcasterName, data.Viewers) params = map[string]string{ "from": data.FromBroadcasterName, "viewers": fmt.Sprintf("%d", data.Viewers), } es.manager.OnEvent("twitch", "raid", params) case "channel.channel_points_custom_reward_redemption.add": var data struct { UserName string `json:"user_name"` Reward struct { Title string `json:"title"` Cost int `json:"cost"` } `json:"reward"` UserInput string `json:"user_input"` } if err := json.Unmarshal(msg.Payload.Event, &data); err != nil { logger.Error("Failed to parse reward redemption event: %v", err) return } logger.Info("Reward redemption: %s redeemed '%s' (cost %d) with input: %s", data.UserName, data.Reward.Title, data.Reward.Cost, data.UserInput) params = map[string]string{ "username": data.UserName, "reward_title": data.Reward.Title, "reward_cost": fmt.Sprintf("%d", data.Reward.Cost), "user_input": data.UserInput, } es.manager.OnEvent("twitch", "reward_redemption", params) default: logger.Warn("Unhandled event type: %s", eventType) } } func (es *TwitchEventSub) IsConnected() bool { es.mu.Lock() defer es.mu.Unlock() return es.conn != nil } func (es *TwitchEventSub) GetSubscriptions() []string { es.mu.Lock() defer es.mu.Unlock() subs := make([]string, 0, len(es.subscriptions)) for s := range es.subscriptions { subs = append(subs, s) } return subs }