417 lines
12 KiB
Go
417 lines
12 KiB
Go
package platforms
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"stream-bot/internal/logger"
|
|
"stream-bot/internal/twitchapi"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// TwitchEventSub реализует клиент EventSub через WebSocket
|
|
type TwitchEventSub struct {
|
|
manager *Manager
|
|
twitchAPI *twitchapi.TwitchAPI
|
|
conn *websocket.Conn
|
|
sessionID string
|
|
subscriptions map[string]bool // eventType -> подписана ли
|
|
mu sync.Mutex
|
|
stopCh chan struct{}
|
|
doneCh chan struct{}
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// Структуры сообщений EventSub
|
|
type eventSubMessage struct {
|
|
Metadata struct {
|
|
MessageID string `json:"message_id"`
|
|
MessageType string `json:"message_type"` // session_welcome, session_keepalive, notification, session_revoke
|
|
MessageTimestamp string `json:"message_timestamp"`
|
|
SessionID string `json:"session_id,omitempty"`
|
|
SubscriptionType string `json:"subscription_type,omitempty"`
|
|
SubscriptionVersion string `json:"subscription_version,omitempty"`
|
|
} `json:"metadata"`
|
|
Payload struct {
|
|
Session struct {
|
|
ID string `json:"id"`
|
|
Status string `json:"status"`
|
|
ConnectedAt string `json:"connected_at"`
|
|
KeepaliveTimeoutSeconds int `json:"keepalive_timeout_seconds"`
|
|
ReconnectURL string `json:"reconnect_url"`
|
|
} `json:"session,omitempty"`
|
|
Subscription struct {
|
|
ID string `json:"id"`
|
|
Status string `json:"status"`
|
|
Type string `json:"type"`
|
|
Version string `json:"version"`
|
|
Condition map[string]string `json:"condition"`
|
|
Transport struct {
|
|
Method string `json:"method"`
|
|
SessionID string `json:"session_id"`
|
|
} `json:"transport"`
|
|
CreatedAt string `json:"created_at"`
|
|
} `json:"subscription,omitempty"`
|
|
Event json.RawMessage `json:"event,omitempty"`
|
|
} `json:"payload"`
|
|
}
|
|
|
|
func NewTwitchEventSub(manager *Manager, twitchAPI *twitchapi.TwitchAPI) *TwitchEventSub {
|
|
return &TwitchEventSub{
|
|
manager: manager,
|
|
twitchAPI: twitchAPI,
|
|
subscriptions: make(map[string]bool),
|
|
stopCh: make(chan struct{}),
|
|
doneCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (es *TwitchEventSub) Start() error {
|
|
es.ctx, es.cancel = context.WithCancel(context.Background())
|
|
logger.Info("Starting Twitch EventSub WebSocket client...")
|
|
go es.connect()
|
|
return nil
|
|
}
|
|
|
|
func (es *TwitchEventSub) Stop() {
|
|
if es.cancel != nil {
|
|
es.cancel()
|
|
}
|
|
if es.conn != nil {
|
|
_ = es.conn.Close()
|
|
}
|
|
select {
|
|
case <-es.doneCh:
|
|
logger.Info("Twitch EventSub stopped gracefully")
|
|
case <-time.After(3 * time.Second):
|
|
logger.Warn("Twitch EventSub stop timeout")
|
|
}
|
|
}
|
|
|
|
func (es *TwitchEventSub) connect() {
|
|
defer close(es.doneCh)
|
|
for {
|
|
select {
|
|
case <-es.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
conn, _, err := websocket.DefaultDialer.Dial("wss://eventsub.wss.twitch.tv/ws", nil)
|
|
if err != nil {
|
|
logger.Error("EventSub WebSocket dial error: %v, reconnecting...", err)
|
|
select {
|
|
case <-es.ctx.Done():
|
|
return
|
|
case <-time.After(5 * time.Second):
|
|
continue
|
|
}
|
|
}
|
|
es.mu.Lock()
|
|
es.conn = conn
|
|
es.mu.Unlock()
|
|
|
|
err = es.readLoop(conn)
|
|
if err != nil {
|
|
logger.Error("EventSub read loop error: %v, reconnecting...", err)
|
|
_ = conn.Close()
|
|
select {
|
|
case <-es.ctx.Done():
|
|
return
|
|
case <-time.After(5 * time.Second):
|
|
continue
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-es.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (es *TwitchEventSub) readLoop(conn *websocket.Conn) error {
|
|
for {
|
|
select {
|
|
case <-es.ctx.Done():
|
|
return es.ctx.Err()
|
|
default:
|
|
}
|
|
_, msg, err := conn.ReadMessage()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var envelope eventSubMessage
|
|
if err := json.Unmarshal(msg, &envelope); err != nil {
|
|
logger.Error("Failed to parse EventSub message: %v", err)
|
|
continue
|
|
}
|
|
|
|
switch envelope.Metadata.MessageType {
|
|
case "session_welcome":
|
|
es.handleWelcome(envelope)
|
|
case "session_keepalive":
|
|
// Ничего не логируем, чтобы не засорять логи
|
|
case "notification":
|
|
es.handleNotification(envelope)
|
|
case "session_revoke":
|
|
logger.Warn("EventSub session revoked, will reconnect")
|
|
return fmt.Errorf("session revoked")
|
|
default:
|
|
logger.Warn("Unknown EventSub message type: %s", envelope.Metadata.MessageType)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (es *TwitchEventSub) handleWelcome(msg eventSubMessage) {
|
|
es.sessionID = msg.Payload.Session.ID
|
|
logger.Info("EventSub connected, session ID: %s", es.sessionID)
|
|
|
|
// После получения welcome подписываемся на события
|
|
broadcasterID, err := es.twitchAPI.GetBroadcasterID()
|
|
if err != nil {
|
|
logger.Error("Cannot get broadcaster ID for subscriptions: %v", err)
|
|
return
|
|
}
|
|
|
|
// Список событий для подписки
|
|
subscriptions := []struct {
|
|
Type string
|
|
Version string
|
|
Condition map[string]string
|
|
}{
|
|
{
|
|
Type: "channel.follow",
|
|
Version: "2",
|
|
Condition: map[string]string{
|
|
"broadcaster_user_id": broadcasterID,
|
|
"moderator_user_id": broadcasterID, // используем ID стримера как модератора
|
|
},
|
|
},
|
|
{
|
|
Type: "channel.subscribe",
|
|
Version: "1",
|
|
Condition: map[string]string{
|
|
"broadcaster_user_id": broadcasterID,
|
|
},
|
|
},
|
|
{
|
|
Type: "channel.subscription.gift",
|
|
Version: "1",
|
|
Condition: map[string]string{
|
|
"broadcaster_user_id": broadcasterID,
|
|
},
|
|
},
|
|
{
|
|
Type: "channel.raid",
|
|
Version: "1",
|
|
Condition: map[string]string{
|
|
"to_broadcaster_user_id": broadcasterID,
|
|
},
|
|
},
|
|
{
|
|
Type: "channel.channel_points_custom_reward_redemption.add",
|
|
Version: "1",
|
|
Condition: map[string]string{
|
|
"broadcaster_user_id": broadcasterID,
|
|
},
|
|
},
|
|
}
|
|
|
|
for _, sub := range subscriptions {
|
|
if err := es.subscribe(sub.Type, sub.Version, sub.Condition); err != nil {
|
|
logger.Error("Failed to subscribe to %s: %v", sub.Type, err)
|
|
} else {
|
|
es.mu.Lock()
|
|
es.subscriptions[sub.Type] = true
|
|
es.mu.Unlock()
|
|
logger.Info("Subscribed to %s", sub.Type)
|
|
}
|
|
}
|
|
}
|
|
|
|
// subscribe создаёт подписку через API Twitch, используя токен стримера (user_token)
|
|
func (es *TwitchEventSub) subscribe(eventType, version string, condition map[string]string) error {
|
|
// Берём токен стримера (user_token), т.к. для подписки на follow нужны права модератора
|
|
token, err := es.twitchAPI.GetUserToken() // добавим этот метод в twitchapi
|
|
if err != nil {
|
|
return err
|
|
}
|
|
clientID := es.twitchAPI.GetClientID()
|
|
|
|
subReq := struct {
|
|
Type string `json:"type"`
|
|
Version string `json:"version"`
|
|
Condition map[string]string `json:"condition"`
|
|
Transport struct {
|
|
Method string `json:"method"`
|
|
SessionID string `json:"session_id"`
|
|
} `json:"transport"`
|
|
}{
|
|
Type: eventType,
|
|
Version: version,
|
|
Condition: condition,
|
|
Transport: struct {
|
|
Method string `json:"method"`
|
|
SessionID string `json:"session_id"`
|
|
}{
|
|
Method: "websocket",
|
|
SessionID: es.sessionID,
|
|
},
|
|
}
|
|
|
|
body, _ := json.Marshal(subReq)
|
|
url := "https://api.twitch.tv/helix/eventsub/subscriptions"
|
|
req, _ := http.NewRequest("POST", url, bytes.NewReader(body))
|
|
req.Header.Set("Authorization", "Bearer "+token)
|
|
req.Header.Set("Client-Id", clientID)
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func(Body io.ReadCloser) {
|
|
_ = Body.Close()
|
|
}(resp.Body)
|
|
|
|
if resp.StatusCode != 202 {
|
|
errBody, _ := io.ReadAll(resp.Body)
|
|
logger.Error("Twitch API error when subscribing to %s: status %d, body: %s", eventType, resp.StatusCode, string(errBody))
|
|
return fmt.Errorf("subscription failed: %d %s", resp.StatusCode, string(errBody))
|
|
}
|
|
logger.Info("Subscription to %s created successfully", eventType)
|
|
return nil
|
|
}
|
|
|
|
func (es *TwitchEventSub) handleNotification(msg eventSubMessage) {
|
|
eventType := msg.Metadata.SubscriptionType
|
|
logger.Info("EventSub notification received: %s", eventType)
|
|
|
|
var params map[string]string
|
|
|
|
switch eventType {
|
|
case "channel.follow":
|
|
var data struct {
|
|
UserName string `json:"user_name"`
|
|
UserLogin string `json:"user_login"`
|
|
FollowedAt string `json:"followed_at"`
|
|
}
|
|
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
|
logger.Error("Failed to parse follow event: %v", err)
|
|
return
|
|
}
|
|
logger.Info("Follow event: %s followed the channel at %s", data.UserName, data.FollowedAt)
|
|
params = map[string]string{
|
|
"username": data.UserName,
|
|
"user_login": data.UserLogin,
|
|
"followed_at": data.FollowedAt,
|
|
}
|
|
es.manager.OnEvent("twitch", "follow", params)
|
|
|
|
case "channel.subscribe":
|
|
var data struct {
|
|
UserName string `json:"user_name"`
|
|
Tier string `json:"tier"` // "1000", "2000", "3000"
|
|
IsGift bool `json:"is_gift"`
|
|
}
|
|
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
|
logger.Error("Failed to parse subscribe event: %v", err)
|
|
return
|
|
}
|
|
logger.Info("Subscribe event: %s subscribed with tier %s (is_gift=%v)", data.UserName, data.Tier, data.IsGift)
|
|
params = map[string]string{
|
|
"username": data.UserName,
|
|
"tier": data.Tier,
|
|
"is_gift": fmt.Sprintf("%t", data.IsGift),
|
|
}
|
|
es.manager.OnEvent("twitch", "subscribe", params)
|
|
|
|
case "channel.subscription.gift":
|
|
var data struct {
|
|
UserName string `json:"user_name"` // даритель
|
|
RecipientName string `json:"recipient_user_name"`
|
|
Tier string `json:"tier"`
|
|
CumulativeTotal int `json:"cumulative_total"`
|
|
}
|
|
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
|
logger.Error("Failed to parse gift sub event: %v", err)
|
|
return
|
|
}
|
|
logger.Info("Gift sub event: %s gifted %d subscription(s) (tier %s), recipient: %s", data.UserName, data.CumulativeTotal, data.Tier, data.RecipientName)
|
|
params = map[string]string{
|
|
"gifter": data.UserName,
|
|
"recipient": data.RecipientName,
|
|
"tier": data.Tier,
|
|
"cumulative_total": fmt.Sprintf("%d", data.CumulativeTotal),
|
|
}
|
|
es.manager.OnEvent("twitch", "gift_sub", params)
|
|
|
|
case "channel.raid":
|
|
var data struct {
|
|
FromBroadcasterName string `json:"from_broadcaster_user_name"`
|
|
Viewers int `json:"viewers"`
|
|
}
|
|
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
|
logger.Error("Failed to parse raid event: %v", err)
|
|
return
|
|
}
|
|
logger.Info("Raid event: %s raided with %d viewers", data.FromBroadcasterName, data.Viewers)
|
|
params = map[string]string{
|
|
"from": data.FromBroadcasterName,
|
|
"viewers": fmt.Sprintf("%d", data.Viewers),
|
|
}
|
|
es.manager.OnEvent("twitch", "raid", params)
|
|
|
|
case "channel.channel_points_custom_reward_redemption.add":
|
|
var data struct {
|
|
UserName string `json:"user_name"`
|
|
Reward struct {
|
|
Title string `json:"title"`
|
|
Cost int `json:"cost"`
|
|
} `json:"reward"`
|
|
UserInput string `json:"user_input"`
|
|
}
|
|
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
|
logger.Error("Failed to parse reward redemption event: %v", err)
|
|
return
|
|
}
|
|
logger.Info("Reward redemption: %s redeemed '%s' (cost %d) with input: %s", data.UserName, data.Reward.Title, data.Reward.Cost, data.UserInput)
|
|
params = map[string]string{
|
|
"username": data.UserName,
|
|
"reward_title": data.Reward.Title,
|
|
"reward_cost": fmt.Sprintf("%d", data.Reward.Cost),
|
|
"user_input": data.UserInput,
|
|
}
|
|
es.manager.OnEvent("twitch", "reward_redemption", params)
|
|
|
|
default:
|
|
logger.Warn("Unhandled event type: %s", eventType)
|
|
}
|
|
}
|
|
|
|
func (es *TwitchEventSub) IsConnected() bool {
|
|
es.mu.Lock()
|
|
defer es.mu.Unlock()
|
|
return es.conn != nil
|
|
}
|
|
|
|
func (es *TwitchEventSub) GetSubscriptions() []string {
|
|
es.mu.Lock()
|
|
defer es.mu.Unlock()
|
|
subs := make([]string, 0, len(es.subscriptions))
|
|
for s := range es.subscriptions {
|
|
subs = append(subs, s)
|
|
}
|
|
return subs
|
|
}
|