TTW_Bot_GO/internal/platforms/twitch_eventsub.go

417 lines
12 KiB
Go

package platforms
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"stream-bot/internal/logger"
"stream-bot/internal/twitchapi"
"sync"
"time"
"github.com/gorilla/websocket"
)
// TwitchEventSub реализует клиент EventSub через WebSocket
type TwitchEventSub struct {
manager *Manager
twitchAPI *twitchapi.TwitchAPI
conn *websocket.Conn
sessionID string
subscriptions map[string]bool // eventType -> подписана ли
mu sync.Mutex
stopCh chan struct{}
doneCh chan struct{}
ctx context.Context
cancel context.CancelFunc
}
// Структуры сообщений EventSub
type eventSubMessage struct {
Metadata struct {
MessageID string `json:"message_id"`
MessageType string `json:"message_type"` // session_welcome, session_keepalive, notification, session_revoke
MessageTimestamp string `json:"message_timestamp"`
SessionID string `json:"session_id,omitempty"`
SubscriptionType string `json:"subscription_type,omitempty"`
SubscriptionVersion string `json:"subscription_version,omitempty"`
} `json:"metadata"`
Payload struct {
Session struct {
ID string `json:"id"`
Status string `json:"status"`
ConnectedAt string `json:"connected_at"`
KeepaliveTimeoutSeconds int `json:"keepalive_timeout_seconds"`
ReconnectURL string `json:"reconnect_url"`
} `json:"session,omitempty"`
Subscription struct {
ID string `json:"id"`
Status string `json:"status"`
Type string `json:"type"`
Version string `json:"version"`
Condition map[string]string `json:"condition"`
Transport struct {
Method string `json:"method"`
SessionID string `json:"session_id"`
} `json:"transport"`
CreatedAt string `json:"created_at"`
} `json:"subscription,omitempty"`
Event json.RawMessage `json:"event,omitempty"`
} `json:"payload"`
}
func NewTwitchEventSub(manager *Manager, twitchAPI *twitchapi.TwitchAPI) *TwitchEventSub {
return &TwitchEventSub{
manager: manager,
twitchAPI: twitchAPI,
subscriptions: make(map[string]bool),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}
func (es *TwitchEventSub) Start() error {
es.ctx, es.cancel = context.WithCancel(context.Background())
logger.Info("Starting Twitch EventSub WebSocket client...")
go es.connect()
return nil
}
func (es *TwitchEventSub) Stop() {
if es.cancel != nil {
es.cancel()
}
if es.conn != nil {
_ = es.conn.Close()
}
select {
case <-es.doneCh:
logger.Info("Twitch EventSub stopped gracefully")
case <-time.After(3 * time.Second):
logger.Warn("Twitch EventSub stop timeout")
}
}
func (es *TwitchEventSub) connect() {
defer close(es.doneCh)
for {
select {
case <-es.ctx.Done():
return
default:
}
conn, _, err := websocket.DefaultDialer.Dial("wss://eventsub.wss.twitch.tv/ws", nil)
if err != nil {
logger.Error("EventSub WebSocket dial error: %v, reconnecting...", err)
select {
case <-es.ctx.Done():
return
case <-time.After(5 * time.Second):
continue
}
}
es.mu.Lock()
es.conn = conn
es.mu.Unlock()
err = es.readLoop(conn)
if err != nil {
logger.Error("EventSub read loop error: %v, reconnecting...", err)
_ = conn.Close()
select {
case <-es.ctx.Done():
return
case <-time.After(5 * time.Second):
continue
}
}
select {
case <-es.ctx.Done():
return
default:
}
}
}
func (es *TwitchEventSub) readLoop(conn *websocket.Conn) error {
for {
select {
case <-es.ctx.Done():
return es.ctx.Err()
default:
}
_, msg, err := conn.ReadMessage()
if err != nil {
return err
}
var envelope eventSubMessage
if err := json.Unmarshal(msg, &envelope); err != nil {
logger.Error("Failed to parse EventSub message: %v", err)
continue
}
switch envelope.Metadata.MessageType {
case "session_welcome":
es.handleWelcome(envelope)
case "session_keepalive":
// Ничего не логируем, чтобы не засорять логи
case "notification":
es.handleNotification(envelope)
case "session_revoke":
logger.Warn("EventSub session revoked, will reconnect")
return fmt.Errorf("session revoked")
default:
logger.Warn("Unknown EventSub message type: %s", envelope.Metadata.MessageType)
}
}
}
func (es *TwitchEventSub) handleWelcome(msg eventSubMessage) {
es.sessionID = msg.Payload.Session.ID
logger.Info("EventSub connected, session ID: %s", es.sessionID)
// После получения welcome подписываемся на события
broadcasterID, err := es.twitchAPI.GetBroadcasterID()
if err != nil {
logger.Error("Cannot get broadcaster ID for subscriptions: %v", err)
return
}
// Список событий для подписки
subscriptions := []struct {
Type string
Version string
Condition map[string]string
}{
{
Type: "channel.follow",
Version: "2",
Condition: map[string]string{
"broadcaster_user_id": broadcasterID,
"moderator_user_id": broadcasterID, // используем ID стримера как модератора
},
},
{
Type: "channel.subscribe",
Version: "1",
Condition: map[string]string{
"broadcaster_user_id": broadcasterID,
},
},
{
Type: "channel.subscription.gift",
Version: "1",
Condition: map[string]string{
"broadcaster_user_id": broadcasterID,
},
},
{
Type: "channel.raid",
Version: "1",
Condition: map[string]string{
"to_broadcaster_user_id": broadcasterID,
},
},
{
Type: "channel.channel_points_custom_reward_redemption.add",
Version: "1",
Condition: map[string]string{
"broadcaster_user_id": broadcasterID,
},
},
}
for _, sub := range subscriptions {
if err := es.subscribe(sub.Type, sub.Version, sub.Condition); err != nil {
logger.Error("Failed to subscribe to %s: %v", sub.Type, err)
} else {
es.mu.Lock()
es.subscriptions[sub.Type] = true
es.mu.Unlock()
logger.Info("Subscribed to %s", sub.Type)
}
}
}
// subscribe создаёт подписку через API Twitch, используя токен стримера (user_token)
func (es *TwitchEventSub) subscribe(eventType, version string, condition map[string]string) error {
// Берём токен стримера (user_token), т.к. для подписки на follow нужны права модератора
token, err := es.twitchAPI.GetUserToken() // добавим этот метод в twitchapi
if err != nil {
return err
}
clientID := es.twitchAPI.GetClientID()
subReq := struct {
Type string `json:"type"`
Version string `json:"version"`
Condition map[string]string `json:"condition"`
Transport struct {
Method string `json:"method"`
SessionID string `json:"session_id"`
} `json:"transport"`
}{
Type: eventType,
Version: version,
Condition: condition,
Transport: struct {
Method string `json:"method"`
SessionID string `json:"session_id"`
}{
Method: "websocket",
SessionID: es.sessionID,
},
}
body, _ := json.Marshal(subReq)
url := "https://api.twitch.tv/helix/eventsub/subscriptions"
req, _ := http.NewRequest("POST", url, bytes.NewReader(body))
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Client-Id", clientID)
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
if resp.StatusCode != 202 {
errBody, _ := io.ReadAll(resp.Body)
logger.Error("Twitch API error when subscribing to %s: status %d, body: %s", eventType, resp.StatusCode, string(errBody))
return fmt.Errorf("subscription failed: %d %s", resp.StatusCode, string(errBody))
}
logger.Info("Subscription to %s created successfully", eventType)
return nil
}
func (es *TwitchEventSub) handleNotification(msg eventSubMessage) {
eventType := msg.Metadata.SubscriptionType
logger.Info("EventSub notification received: %s", eventType)
var params map[string]string
switch eventType {
case "channel.follow":
var data struct {
UserName string `json:"user_name"`
UserLogin string `json:"user_login"`
FollowedAt string `json:"followed_at"`
}
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
logger.Error("Failed to parse follow event: %v", err)
return
}
logger.Info("Follow event: %s followed the channel at %s", data.UserName, data.FollowedAt)
params = map[string]string{
"username": data.UserName,
"user_login": data.UserLogin,
"followed_at": data.FollowedAt,
}
es.manager.OnEvent("twitch", "follow", params)
case "channel.subscribe":
var data struct {
UserName string `json:"user_name"`
Tier string `json:"tier"` // "1000", "2000", "3000"
IsGift bool `json:"is_gift"`
}
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
logger.Error("Failed to parse subscribe event: %v", err)
return
}
logger.Info("Subscribe event: %s subscribed with tier %s (is_gift=%v)", data.UserName, data.Tier, data.IsGift)
params = map[string]string{
"username": data.UserName,
"tier": data.Tier,
"is_gift": fmt.Sprintf("%t", data.IsGift),
}
es.manager.OnEvent("twitch", "subscribe", params)
case "channel.subscription.gift":
var data struct {
UserName string `json:"user_name"` // даритель
RecipientName string `json:"recipient_user_name"`
Tier string `json:"tier"`
CumulativeTotal int `json:"cumulative_total"`
}
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
logger.Error("Failed to parse gift sub event: %v", err)
return
}
logger.Info("Gift sub event: %s gifted %d subscription(s) (tier %s), recipient: %s", data.UserName, data.CumulativeTotal, data.Tier, data.RecipientName)
params = map[string]string{
"gifter": data.UserName,
"recipient": data.RecipientName,
"tier": data.Tier,
"cumulative_total": fmt.Sprintf("%d", data.CumulativeTotal),
}
es.manager.OnEvent("twitch", "gift_sub", params)
case "channel.raid":
var data struct {
FromBroadcasterName string `json:"from_broadcaster_user_name"`
Viewers int `json:"viewers"`
}
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
logger.Error("Failed to parse raid event: %v", err)
return
}
logger.Info("Raid event: %s raided with %d viewers", data.FromBroadcasterName, data.Viewers)
params = map[string]string{
"from": data.FromBroadcasterName,
"viewers": fmt.Sprintf("%d", data.Viewers),
}
es.manager.OnEvent("twitch", "raid", params)
case "channel.channel_points_custom_reward_redemption.add":
var data struct {
UserName string `json:"user_name"`
Reward struct {
Title string `json:"title"`
Cost int `json:"cost"`
} `json:"reward"`
UserInput string `json:"user_input"`
}
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
logger.Error("Failed to parse reward redemption event: %v", err)
return
}
logger.Info("Reward redemption: %s redeemed '%s' (cost %d) with input: %s", data.UserName, data.Reward.Title, data.Reward.Cost, data.UserInput)
params = map[string]string{
"username": data.UserName,
"reward_title": data.Reward.Title,
"reward_cost": fmt.Sprintf("%d", data.Reward.Cost),
"user_input": data.UserInput,
}
es.manager.OnEvent("twitch", "reward_redemption", params)
default:
logger.Warn("Unhandled event type: %s", eventType)
}
}
func (es *TwitchEventSub) IsConnected() bool {
es.mu.Lock()
defer es.mu.Unlock()
return es.conn != nil
}
func (es *TwitchEventSub) GetSubscriptions() []string {
es.mu.Lock()
defer es.mu.Unlock()
subs := make([]string, 0, len(es.subscriptions))
for s := range es.subscriptions {
subs = append(subs, s)
}
return subs
}