залил
This commit is contained in:
@@ -0,0 +1,213 @@
|
||||
package platforms
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"stream-bot/internal/commands"
|
||||
"stream-bot/internal/db"
|
||||
"stream-bot/internal/events"
|
||||
"stream-bot/internal/logger"
|
||||
"stream-bot/internal/notifications"
|
||||
"stream-bot/internal/userstats"
|
||||
"stream-bot/internal/webservices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Platform interface {
|
||||
Connect() error
|
||||
Disconnect()
|
||||
SendMessage(text string) error
|
||||
GetName() string
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
platforms map[string]Platform
|
||||
cmdProc *commands.Processor
|
||||
eventProc *events.Processor
|
||||
mu sync.RWMutex
|
||||
userStats *userstats.Store
|
||||
notifMgr *notifications.Manager
|
||||
webServices *webservices.Manager
|
||||
}
|
||||
|
||||
func NewManager(cmdProc *commands.Processor, eventProc *events.Processor, notifMgr *notifications.Manager, webSrv *webservices.Manager, twitchClientID, twitchClientSecret string) *Manager {
|
||||
m := &Manager{
|
||||
platforms: make(map[string]Platform),
|
||||
cmdProc: cmdProc,
|
||||
eventProc: eventProc,
|
||||
userStats: userstats.NewStore(),
|
||||
notifMgr: notifMgr,
|
||||
webServices: webSrv,
|
||||
}
|
||||
m.platforms["twitch"] = NewTwitchPlatform(m, twitchClientID, twitchClientSecret)
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *Manager) ConnectAll() {
|
||||
for name, p := range m.platforms {
|
||||
if err := p.Connect(); err != nil {
|
||||
logger.Error("Failed to connect %s: %v", name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) StopAll() {
|
||||
for _, p := range m.platforms {
|
||||
p.Disconnect()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) GetPlatform(name string) Platform {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.platforms[name]
|
||||
}
|
||||
|
||||
func (m *Manager) OnChatMessage(platform, channel, username, message string, isMod, isBroadcaster, isVip, isSubscriber bool) {
|
||||
// Обновляем статистику пользователя
|
||||
m.userStats.Update(username, func(u *userstats.UserStats) {
|
||||
u.MessageCount++
|
||||
u.LastActive = time.Now()
|
||||
u.IsMod = isMod
|
||||
u.IsVip = isVip
|
||||
u.IsSubscriber = isSubscriber
|
||||
})
|
||||
if m.notifMgr != nil {
|
||||
_ = m.notifMgr.PlayEvent("new_message")
|
||||
}
|
||||
m.webServices.SendChatMessage(webservices.ChatMessage{
|
||||
Username: username,
|
||||
Message: message,
|
||||
IsMod: isMod,
|
||||
IsVip: isVip,
|
||||
IsSub: isSubscriber,
|
||||
Timestamp: time.Now().Unix(),
|
||||
})
|
||||
// Обработка команды
|
||||
if len(message) == 0 || message[0] != '!' {
|
||||
// Проверка на отметку: если пользователь отмечен и это его первое сообщение за стрим (сегодня)
|
||||
m.checkAndSendMarkNotification(username, platform, channel)
|
||||
return
|
||||
}
|
||||
|
||||
parts := strings.SplitN(message, " ", 2)
|
||||
trigger := strings.TrimPrefix(parts[0], "!")
|
||||
args := ""
|
||||
if len(parts) > 1 {
|
||||
args = parts[1]
|
||||
}
|
||||
resp, _, err := m.cmdProc.ProcessCommand(trigger, username, platform, isMod, isBroadcaster, args)
|
||||
if err != nil {
|
||||
logger.Error("Command error: %v", err)
|
||||
return
|
||||
}
|
||||
if resp != "" {
|
||||
if p := m.GetPlatform(platform); p != nil {
|
||||
_ = p.SendMessage(resp)
|
||||
}
|
||||
}
|
||||
// После команды тоже проверяем отметку (можно вынести в общее место)
|
||||
m.checkAndSendMarkNotification(username, platform, channel)
|
||||
}
|
||||
|
||||
// checkAndSendMarkNotification отправляет сообщение, если пользователь отмечен и сегодня ещё не отмечали
|
||||
func (m *Manager) checkAndSendMarkNotification(username, platform, channel string) {
|
||||
marked, lastDate, err := db.IsUserMarked(username, platform)
|
||||
if err != nil {
|
||||
logger.Error("Failed to check marked user: %v", err)
|
||||
return
|
||||
}
|
||||
if !marked {
|
||||
return
|
||||
}
|
||||
today := time.Now().Format("2006-01-02")
|
||||
if lastDate == today {
|
||||
return
|
||||
}
|
||||
// Отправляем сообщение с упоминанием пользователя, а не канала
|
||||
msg := fmt.Sprintf("Время кое-кого отметить! Отмечен @%s", username)
|
||||
if p := m.GetPlatform(platform); p != nil {
|
||||
_ = p.SendMessage(msg)
|
||||
}
|
||||
_ = db.UpdateMarkedUserDate(username, platform, time.Now())
|
||||
}
|
||||
|
||||
func (m *Manager) OnEvent(platform, eventName string, params map[string]string) {
|
||||
m.eventProc.ProcessEvent(platform, eventName, params)
|
||||
if m.webServices != nil {
|
||||
// Преобразуем map[string]string в map[string]interface{}
|
||||
data := make(map[string]interface{})
|
||||
for k, v := range params {
|
||||
data[k] = v
|
||||
}
|
||||
// Исправленный вызов:
|
||||
m.webServices.SendAlertEvent(webservices.AlertEvent{
|
||||
Type: eventName,
|
||||
Data: data,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) IsConnected(platform string) bool {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
if p, ok := m.platforms[platform]; ok {
|
||||
if tw, ok := p.(*TwitchPlatform); ok {
|
||||
return tw.IsConnected()
|
||||
}
|
||||
// для других платформ можно добавить аналогично
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *Manager) GetAllUsers() []*userstats.UserStats {
|
||||
return m.userStats.GetAll()
|
||||
}
|
||||
|
||||
func (m *Manager) UpdateUserFlags(username string, isVip, isMod, isSubscriber bool) {
|
||||
m.userStats.Update(username, func(u *userstats.UserStats) {
|
||||
if isVip {
|
||||
u.IsVip = isVip
|
||||
}
|
||||
if isMod {
|
||||
u.IsMod = isMod
|
||||
}
|
||||
if isSubscriber {
|
||||
u.IsSubscriber = isSubscriber
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) UpdateUserMarked(username string, marked bool) {
|
||||
m.userStats.Update(username, func(u *userstats.UserStats) {
|
||||
u.IsMarked = marked
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) SetVip(username string, isVip bool) {
|
||||
m.userStats.Update(username, func(u *userstats.UserStats) {
|
||||
u.IsVip = isVip
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) SetMod(username string, isMod bool) {
|
||||
m.userStats.Update(username, func(u *userstats.UserStats) {
|
||||
u.IsMod = isMod
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) SetMarked(username string, isMarked bool) {
|
||||
m.userStats.Update(username, func(u *userstats.UserStats) {
|
||||
u.IsMarked = isMarked
|
||||
})
|
||||
}
|
||||
|
||||
func (m *Manager) GetTwitchEventSubStatus() (connected bool, subscriptions []string, err error) {
|
||||
tw, ok := m.platforms["twitch"].(*TwitchPlatform)
|
||||
if !ok {
|
||||
return false, nil, fmt.Errorf("twitch platform not available")
|
||||
}
|
||||
connected, subscriptions = tw.EventSubStatus()
|
||||
return connected, subscriptions, nil
|
||||
}
|
||||
@@ -0,0 +1,234 @@
|
||||
package platforms
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"stream-bot/internal/db"
|
||||
"stream-bot/internal/logger"
|
||||
"stream-bot/internal/twitchapi"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/gempir/go-twitch-irc/v4"
|
||||
)
|
||||
|
||||
type TwitchPlatform struct {
|
||||
client *twitch.Client
|
||||
manager *Manager
|
||||
channel string
|
||||
botLogin string
|
||||
connected bool
|
||||
mu sync.RWMutex
|
||||
eventSub *TwitchEventSub
|
||||
twitchAPI *twitchapi.TwitchAPI
|
||||
}
|
||||
|
||||
func NewTwitchPlatform(mgr *Manager, clientID, clientSecret string) *TwitchPlatform {
|
||||
twitchAPI := twitchapi.New(clientID, clientSecret)
|
||||
return &TwitchPlatform{
|
||||
manager: mgr,
|
||||
twitchAPI: twitchAPI,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) GetName() string {
|
||||
return "twitch"
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) Connect() error {
|
||||
tokens, err := db.GetPlatformTokens("twitch")
|
||||
if err != nil || tokens == nil || tokens.BotToken == "" {
|
||||
logger.Warn("Twitch bot token not set, skipping connection")
|
||||
return fmt.Errorf("no bot token")
|
||||
}
|
||||
t.botLogin = tokens.BotLogin
|
||||
if t.botLogin == "" {
|
||||
t.botLogin = "justinfan123"
|
||||
}
|
||||
t.channel = tokens.UserLogin
|
||||
if t.channel == "" {
|
||||
logger.Warn("Twitch user login not set, cannot join channel")
|
||||
return fmt.Errorf("no channel name")
|
||||
}
|
||||
|
||||
t.client = twitch.NewClient(t.botLogin, "oauth:"+tokens.BotToken)
|
||||
t.client.OnPrivateMessage(func(msg twitch.PrivateMessage) {
|
||||
badges := msg.Tags["badges"]
|
||||
isMod := strings.Contains(badges, "moderator/1")
|
||||
isVip := strings.Contains(badges, "vip/1")
|
||||
isSubscriber := strings.Contains(badges, "subscriber/")
|
||||
isBroadcaster := strings.Contains(badges, "broadcaster/1")
|
||||
|
||||
t.manager.OnChatMessage("twitch", msg.Channel, msg.User.Name, msg.Message, isMod, isBroadcaster, isVip, isSubscriber)
|
||||
})
|
||||
|
||||
t.client.Join(t.channel)
|
||||
go func() {
|
||||
_ = t.client.Connect()
|
||||
}()
|
||||
|
||||
t.mu.Lock()
|
||||
t.connected = true
|
||||
t.mu.Unlock()
|
||||
|
||||
// EventSub использует уже существующий twitchAPI
|
||||
t.eventSub = NewTwitchEventSub(t.manager, t.twitchAPI)
|
||||
if err := t.eventSub.Start(); err != nil {
|
||||
logger.Warn("Failed to start EventSub: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) Disconnect() {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if t.eventSub != nil {
|
||||
t.eventSub.Stop()
|
||||
}
|
||||
if t.client != nil {
|
||||
t.client.Disconnect()
|
||||
}
|
||||
t.connected = false
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) IsConnected() bool {
|
||||
t.mu.RLock()
|
||||
defer t.mu.RUnlock()
|
||||
return t.connected
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) SendMessage(text string) error {
|
||||
if t.client == nil {
|
||||
return fmt.Errorf("not connected")
|
||||
}
|
||||
t.client.Say(t.channel, text)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) TimeoutUser(username string, seconds int) {
|
||||
t.client.Say(t.channel, fmt.Sprintf("/timeout %s %d", username, seconds))
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) BanUser(username string) {
|
||||
t.client.Say(t.channel, fmt.Sprintf("/ban %s", username))
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) UnbanUser(username string) {
|
||||
t.client.Say(t.channel, fmt.Sprintf("/unban %s", username))
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) AddVip(username string) {
|
||||
t.client.Say(t.channel, fmt.Sprintf("/vip %s", username))
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) RemoveVip(username string) {
|
||||
t.client.Say(t.channel, fmt.Sprintf("/unvip %s", username))
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) AddMod(username string) {
|
||||
t.client.Say(t.channel, fmt.Sprintf("/mod %s", username))
|
||||
}
|
||||
|
||||
func (t *TwitchPlatform) RemoveMod(username string) {
|
||||
t.client.Say(t.channel, fmt.Sprintf("/unmod %s", username))
|
||||
}
|
||||
|
||||
// GetClientID удалён – используем t.twitchAPI.GetClientID() при необходимости
|
||||
|
||||
func (t *TwitchPlatform) EventSubStatus() (connected bool, subscriptions []string) {
|
||||
if t.eventSub == nil {
|
||||
return false, nil
|
||||
}
|
||||
return t.eventSub.IsConnected(), t.eventSub.GetSubscriptions()
|
||||
}
|
||||
|
||||
// TimeoutUserViaAPI таймаут через API
|
||||
func (t *TwitchPlatform) TimeoutUserViaAPI(username string, seconds int) error {
|
||||
broadcasterID, err := t.twitchAPI.GetBroadcasterID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
moderatorID := broadcasterID
|
||||
userID, err := t.twitchAPI.GetUserID(username)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return t.twitchAPI.TimeoutUser(broadcasterID, moderatorID, userID, seconds)
|
||||
}
|
||||
|
||||
// BanUserViaAPI бан через API
|
||||
func (t *TwitchPlatform) BanUserViaAPI(username string) error {
|
||||
broadcasterID, err := t.twitchAPI.GetBroadcasterID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userID, err := t.twitchAPI.GetUserID(username)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return t.twitchAPI.BanUser(broadcasterID, broadcasterID, userID)
|
||||
}
|
||||
|
||||
// UnbanUserViaAPI разбан через API
|
||||
func (t *TwitchPlatform) UnbanUserViaAPI(username string) error {
|
||||
broadcasterID, err := t.twitchAPI.GetBroadcasterID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userID, err := t.twitchAPI.GetUserID(username)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return t.twitchAPI.UnbanUser(broadcasterID, broadcasterID, userID)
|
||||
}
|
||||
|
||||
// AddVipViaAPI добавить VIP
|
||||
func (t *TwitchPlatform) AddVipViaAPI(username string) error {
|
||||
broadcasterID, err := t.twitchAPI.GetBroadcasterID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userID, err := t.twitchAPI.GetUserID(username)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return t.twitchAPI.AddVip(broadcasterID, userID)
|
||||
}
|
||||
|
||||
// RemoveVipViaAPI удалить VIP
|
||||
func (t *TwitchPlatform) RemoveVipViaAPI(username string) error {
|
||||
broadcasterID, err := t.twitchAPI.GetBroadcasterID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userID, err := t.twitchAPI.GetUserID(username)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return t.twitchAPI.RemoveVip(broadcasterID, userID)
|
||||
}
|
||||
|
||||
// AddModViaAPI добавить модератора
|
||||
func (t *TwitchPlatform) AddModViaAPI(username string) error {
|
||||
broadcasterID, err := t.twitchAPI.GetBroadcasterID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userID, err := t.twitchAPI.GetUserID(username)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return t.twitchAPI.AddMod(broadcasterID, userID)
|
||||
}
|
||||
|
||||
// RemoveModViaAPI удалить модератора
|
||||
func (t *TwitchPlatform) RemoveModViaAPI(username string) error {
|
||||
broadcasterID, err := t.twitchAPI.GetBroadcasterID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userID, err := t.twitchAPI.GetUserID(username)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return t.twitchAPI.RemoveMod(broadcasterID, userID)
|
||||
}
|
||||
@@ -0,0 +1,134 @@
|
||||
package platforms
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"stream-bot/internal/logger"
|
||||
"time"
|
||||
)
|
||||
|
||||
type TwitchAuth struct {
|
||||
clientID string
|
||||
clientSecret string
|
||||
redirectURI string
|
||||
server *http.Server
|
||||
waitCh chan string
|
||||
}
|
||||
|
||||
func NewTwitchAuth(clientID, clientSecret string) *TwitchAuth {
|
||||
return &TwitchAuth{
|
||||
clientID: clientID,
|
||||
clientSecret: clientSecret,
|
||||
redirectURI: "http://localhost:8089",
|
||||
waitCh: make(chan string, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (ta *TwitchAuth) GenerateAuthURL(scope []string, state string) string {
|
||||
url := "https://id.twitch.tv/oauth2/authorize?" +
|
||||
"client_id=" + ta.clientID +
|
||||
"&redirect_uri=" + ta.redirectURI +
|
||||
"&response_type=token" +
|
||||
"&scope=" + scopeString(scope) +
|
||||
"&state=" + state
|
||||
return url
|
||||
}
|
||||
|
||||
func scopeString(scopes []string) string {
|
||||
s := ""
|
||||
for i, sc := range scopes {
|
||||
if i > 0 {
|
||||
s += "+"
|
||||
}
|
||||
s += sc
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (ta *TwitchAuth) StartTempServer() error {
|
||||
if ta.server != nil {
|
||||
return nil
|
||||
}
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/", ta.handleCallback)
|
||||
ta.server = &http.Server{
|
||||
Addr: ":8089",
|
||||
Handler: mux,
|
||||
}
|
||||
go func() {
|
||||
if err := ta.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
logger.Error("OAuth server error: %v", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ta *TwitchAuth) StopTempServer() {
|
||||
if ta.server != nil {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
_ = ta.server.Shutdown(ctx)
|
||||
ta.server = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (ta *TwitchAuth) handleCallback(w http.ResponseWriter, r *http.Request) {
|
||||
html := `
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Twitch Auth</title>
|
||||
<style>
|
||||
body { font-family: sans-serif; text-align: center; margin-top: 50px; }
|
||||
.success { color: green; }
|
||||
.error { color: red; }
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h3>Авторизация Twitch</h3>
|
||||
<p>Обработка токена...</p>
|
||||
<script>
|
||||
const hash = window.location.hash.substring(1);
|
||||
const params = new URLSearchParams(hash);
|
||||
const accessToken = params.get('access_token');
|
||||
const state = params.get('state');
|
||||
if (accessToken && state) {
|
||||
fetch('http://localhost:8080/api/platforms/twitch/auth/callback?token=' + encodeURIComponent(accessToken) + '&state=' + encodeURIComponent(state))
|
||||
.then(res => {
|
||||
if (res.ok) return res.text();
|
||||
throw new Error('Server error: ' + res.status);
|
||||
})
|
||||
.then(data => {
|
||||
document.body.innerHTML = '<h3 class="success">✅ Токен успешно сохранён! Теперь можно закрыть эту вкладку.</h3>';
|
||||
})
|
||||
.catch(err => {
|
||||
document.body.innerHTML = '<h3 class="error">❌ Ошибка сохранения токена: ' + err.message + '</h3><p>Пожалуйста, закройте это окно и проверьте настройки в боте.</p>';
|
||||
});
|
||||
} else {
|
||||
document.body.innerHTML = '<h3 class="error">❌ Токен не получен или отсутствует state</h3>';
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
`
|
||||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||||
_, _ = w.Write([]byte(html))
|
||||
}
|
||||
|
||||
func (ta *TwitchAuth) WaitForToken(timeout time.Duration) (string, error) {
|
||||
select {
|
||||
case token := <-ta.waitCh:
|
||||
return token, nil
|
||||
case <-time.After(timeout):
|
||||
return "", fmt.Errorf("timeout waiting for token")
|
||||
}
|
||||
}
|
||||
|
||||
func (ta *TwitchAuth) SetTokenCallback(token string) {
|
||||
select {
|
||||
case ta.waitCh <- token:
|
||||
default:
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,416 @@
|
||||
package platforms
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"stream-bot/internal/logger"
|
||||
"stream-bot/internal/twitchapi"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// TwitchEventSub реализует клиент EventSub через WebSocket
|
||||
type TwitchEventSub struct {
|
||||
manager *Manager
|
||||
twitchAPI *twitchapi.TwitchAPI
|
||||
conn *websocket.Conn
|
||||
sessionID string
|
||||
subscriptions map[string]bool // eventType -> подписана ли
|
||||
mu sync.Mutex
|
||||
stopCh chan struct{}
|
||||
doneCh chan struct{}
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// Структуры сообщений EventSub
|
||||
type eventSubMessage struct {
|
||||
Metadata struct {
|
||||
MessageID string `json:"message_id"`
|
||||
MessageType string `json:"message_type"` // session_welcome, session_keepalive, notification, session_revoke
|
||||
MessageTimestamp string `json:"message_timestamp"`
|
||||
SessionID string `json:"session_id,omitempty"`
|
||||
SubscriptionType string `json:"subscription_type,omitempty"`
|
||||
SubscriptionVersion string `json:"subscription_version,omitempty"`
|
||||
} `json:"metadata"`
|
||||
Payload struct {
|
||||
Session struct {
|
||||
ID string `json:"id"`
|
||||
Status string `json:"status"`
|
||||
ConnectedAt string `json:"connected_at"`
|
||||
KeepaliveTimeoutSeconds int `json:"keepalive_timeout_seconds"`
|
||||
ReconnectURL string `json:"reconnect_url"`
|
||||
} `json:"session,omitempty"`
|
||||
Subscription struct {
|
||||
ID string `json:"id"`
|
||||
Status string `json:"status"`
|
||||
Type string `json:"type"`
|
||||
Version string `json:"version"`
|
||||
Condition map[string]string `json:"condition"`
|
||||
Transport struct {
|
||||
Method string `json:"method"`
|
||||
SessionID string `json:"session_id"`
|
||||
} `json:"transport"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
} `json:"subscription,omitempty"`
|
||||
Event json.RawMessage `json:"event,omitempty"`
|
||||
} `json:"payload"`
|
||||
}
|
||||
|
||||
func NewTwitchEventSub(manager *Manager, twitchAPI *twitchapi.TwitchAPI) *TwitchEventSub {
|
||||
return &TwitchEventSub{
|
||||
manager: manager,
|
||||
twitchAPI: twitchAPI,
|
||||
subscriptions: make(map[string]bool),
|
||||
stopCh: make(chan struct{}),
|
||||
doneCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (es *TwitchEventSub) Start() error {
|
||||
es.ctx, es.cancel = context.WithCancel(context.Background())
|
||||
logger.Info("Starting Twitch EventSub WebSocket client...")
|
||||
go es.connect()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es *TwitchEventSub) Stop() {
|
||||
if es.cancel != nil {
|
||||
es.cancel()
|
||||
}
|
||||
if es.conn != nil {
|
||||
_ = es.conn.Close()
|
||||
}
|
||||
select {
|
||||
case <-es.doneCh:
|
||||
logger.Info("Twitch EventSub stopped gracefully")
|
||||
case <-time.After(3 * time.Second):
|
||||
logger.Warn("Twitch EventSub stop timeout")
|
||||
}
|
||||
}
|
||||
|
||||
func (es *TwitchEventSub) connect() {
|
||||
defer close(es.doneCh)
|
||||
for {
|
||||
select {
|
||||
case <-es.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
conn, _, err := websocket.DefaultDialer.Dial("wss://eventsub.wss.twitch.tv/ws", nil)
|
||||
if err != nil {
|
||||
logger.Error("EventSub WebSocket dial error: %v, reconnecting...", err)
|
||||
select {
|
||||
case <-es.ctx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
continue
|
||||
}
|
||||
}
|
||||
es.mu.Lock()
|
||||
es.conn = conn
|
||||
es.mu.Unlock()
|
||||
|
||||
err = es.readLoop(conn)
|
||||
if err != nil {
|
||||
logger.Error("EventSub read loop error: %v, reconnecting...", err)
|
||||
_ = conn.Close()
|
||||
select {
|
||||
case <-es.ctx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-es.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (es *TwitchEventSub) readLoop(conn *websocket.Conn) error {
|
||||
for {
|
||||
select {
|
||||
case <-es.ctx.Done():
|
||||
return es.ctx.Err()
|
||||
default:
|
||||
}
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var envelope eventSubMessage
|
||||
if err := json.Unmarshal(msg, &envelope); err != nil {
|
||||
logger.Error("Failed to parse EventSub message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
switch envelope.Metadata.MessageType {
|
||||
case "session_welcome":
|
||||
es.handleWelcome(envelope)
|
||||
case "session_keepalive":
|
||||
// Ничего не логируем, чтобы не засорять логи
|
||||
case "notification":
|
||||
es.handleNotification(envelope)
|
||||
case "session_revoke":
|
||||
logger.Warn("EventSub session revoked, will reconnect")
|
||||
return fmt.Errorf("session revoked")
|
||||
default:
|
||||
logger.Warn("Unknown EventSub message type: %s", envelope.Metadata.MessageType)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (es *TwitchEventSub) handleWelcome(msg eventSubMessage) {
|
||||
es.sessionID = msg.Payload.Session.ID
|
||||
logger.Info("EventSub connected, session ID: %s", es.sessionID)
|
||||
|
||||
// После получения welcome подписываемся на события
|
||||
broadcasterID, err := es.twitchAPI.GetBroadcasterID()
|
||||
if err != nil {
|
||||
logger.Error("Cannot get broadcaster ID for subscriptions: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Список событий для подписки
|
||||
subscriptions := []struct {
|
||||
Type string
|
||||
Version string
|
||||
Condition map[string]string
|
||||
}{
|
||||
{
|
||||
Type: "channel.follow",
|
||||
Version: "2",
|
||||
Condition: map[string]string{
|
||||
"broadcaster_user_id": broadcasterID,
|
||||
"moderator_user_id": broadcasterID, // используем ID стримера как модератора
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "channel.subscribe",
|
||||
Version: "1",
|
||||
Condition: map[string]string{
|
||||
"broadcaster_user_id": broadcasterID,
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "channel.subscription.gift",
|
||||
Version: "1",
|
||||
Condition: map[string]string{
|
||||
"broadcaster_user_id": broadcasterID,
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "channel.raid",
|
||||
Version: "1",
|
||||
Condition: map[string]string{
|
||||
"to_broadcaster_user_id": broadcasterID,
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "channel.channel_points_custom_reward_redemption.add",
|
||||
Version: "1",
|
||||
Condition: map[string]string{
|
||||
"broadcaster_user_id": broadcasterID,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, sub := range subscriptions {
|
||||
if err := es.subscribe(sub.Type, sub.Version, sub.Condition); err != nil {
|
||||
logger.Error("Failed to subscribe to %s: %v", sub.Type, err)
|
||||
} else {
|
||||
es.mu.Lock()
|
||||
es.subscriptions[sub.Type] = true
|
||||
es.mu.Unlock()
|
||||
logger.Info("Subscribed to %s", sub.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe создаёт подписку через API Twitch, используя токен стримера (user_token)
|
||||
func (es *TwitchEventSub) subscribe(eventType, version string, condition map[string]string) error {
|
||||
// Берём токен стримера (user_token), т.к. для подписки на follow нужны права модератора
|
||||
token, err := es.twitchAPI.GetUserToken() // добавим этот метод в twitchapi
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
clientID := es.twitchAPI.GetClientID()
|
||||
|
||||
subReq := struct {
|
||||
Type string `json:"type"`
|
||||
Version string `json:"version"`
|
||||
Condition map[string]string `json:"condition"`
|
||||
Transport struct {
|
||||
Method string `json:"method"`
|
||||
SessionID string `json:"session_id"`
|
||||
} `json:"transport"`
|
||||
}{
|
||||
Type: eventType,
|
||||
Version: version,
|
||||
Condition: condition,
|
||||
Transport: struct {
|
||||
Method string `json:"method"`
|
||||
SessionID string `json:"session_id"`
|
||||
}{
|
||||
Method: "websocket",
|
||||
SessionID: es.sessionID,
|
||||
},
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(subReq)
|
||||
url := "https://api.twitch.tv/helix/eventsub/subscriptions"
|
||||
req, _ := http.NewRequest("POST", url, bytes.NewReader(body))
|
||||
req.Header.Set("Authorization", "Bearer "+token)
|
||||
req.Header.Set("Client-Id", clientID)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func(Body io.ReadCloser) {
|
||||
_ = Body.Close()
|
||||
}(resp.Body)
|
||||
|
||||
if resp.StatusCode != 202 {
|
||||
errBody, _ := io.ReadAll(resp.Body)
|
||||
logger.Error("Twitch API error when subscribing to %s: status %d, body: %s", eventType, resp.StatusCode, string(errBody))
|
||||
return fmt.Errorf("subscription failed: %d %s", resp.StatusCode, string(errBody))
|
||||
}
|
||||
logger.Info("Subscription to %s created successfully", eventType)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es *TwitchEventSub) handleNotification(msg eventSubMessage) {
|
||||
eventType := msg.Metadata.SubscriptionType
|
||||
logger.Info("EventSub notification received: %s", eventType)
|
||||
|
||||
var params map[string]string
|
||||
|
||||
switch eventType {
|
||||
case "channel.follow":
|
||||
var data struct {
|
||||
UserName string `json:"user_name"`
|
||||
UserLogin string `json:"user_login"`
|
||||
FollowedAt string `json:"followed_at"`
|
||||
}
|
||||
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
||||
logger.Error("Failed to parse follow event: %v", err)
|
||||
return
|
||||
}
|
||||
logger.Info("Follow event: %s followed the channel at %s", data.UserName, data.FollowedAt)
|
||||
params = map[string]string{
|
||||
"username": data.UserName,
|
||||
"user_login": data.UserLogin,
|
||||
"followed_at": data.FollowedAt,
|
||||
}
|
||||
es.manager.OnEvent("twitch", "follow", params)
|
||||
|
||||
case "channel.subscribe":
|
||||
var data struct {
|
||||
UserName string `json:"user_name"`
|
||||
Tier string `json:"tier"` // "1000", "2000", "3000"
|
||||
IsGift bool `json:"is_gift"`
|
||||
}
|
||||
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
||||
logger.Error("Failed to parse subscribe event: %v", err)
|
||||
return
|
||||
}
|
||||
logger.Info("Subscribe event: %s subscribed with tier %s (is_gift=%v)", data.UserName, data.Tier, data.IsGift)
|
||||
params = map[string]string{
|
||||
"username": data.UserName,
|
||||
"tier": data.Tier,
|
||||
"is_gift": fmt.Sprintf("%t", data.IsGift),
|
||||
}
|
||||
es.manager.OnEvent("twitch", "subscribe", params)
|
||||
|
||||
case "channel.subscription.gift":
|
||||
var data struct {
|
||||
UserName string `json:"user_name"` // даритель
|
||||
RecipientName string `json:"recipient_user_name"`
|
||||
Tier string `json:"tier"`
|
||||
CumulativeTotal int `json:"cumulative_total"`
|
||||
}
|
||||
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
||||
logger.Error("Failed to parse gift sub event: %v", err)
|
||||
return
|
||||
}
|
||||
logger.Info("Gift sub event: %s gifted %d subscription(s) (tier %s), recipient: %s", data.UserName, data.CumulativeTotal, data.Tier, data.RecipientName)
|
||||
params = map[string]string{
|
||||
"gifter": data.UserName,
|
||||
"recipient": data.RecipientName,
|
||||
"tier": data.Tier,
|
||||
"cumulative_total": fmt.Sprintf("%d", data.CumulativeTotal),
|
||||
}
|
||||
es.manager.OnEvent("twitch", "gift_sub", params)
|
||||
|
||||
case "channel.raid":
|
||||
var data struct {
|
||||
FromBroadcasterName string `json:"from_broadcaster_user_name"`
|
||||
Viewers int `json:"viewers"`
|
||||
}
|
||||
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
||||
logger.Error("Failed to parse raid event: %v", err)
|
||||
return
|
||||
}
|
||||
logger.Info("Raid event: %s raided with %d viewers", data.FromBroadcasterName, data.Viewers)
|
||||
params = map[string]string{
|
||||
"from": data.FromBroadcasterName,
|
||||
"viewers": fmt.Sprintf("%d", data.Viewers),
|
||||
}
|
||||
es.manager.OnEvent("twitch", "raid", params)
|
||||
|
||||
case "channel.channel_points_custom_reward_redemption.add":
|
||||
var data struct {
|
||||
UserName string `json:"user_name"`
|
||||
Reward struct {
|
||||
Title string `json:"title"`
|
||||
Cost int `json:"cost"`
|
||||
} `json:"reward"`
|
||||
UserInput string `json:"user_input"`
|
||||
}
|
||||
if err := json.Unmarshal(msg.Payload.Event, &data); err != nil {
|
||||
logger.Error("Failed to parse reward redemption event: %v", err)
|
||||
return
|
||||
}
|
||||
logger.Info("Reward redemption: %s redeemed '%s' (cost %d) with input: %s", data.UserName, data.Reward.Title, data.Reward.Cost, data.UserInput)
|
||||
params = map[string]string{
|
||||
"username": data.UserName,
|
||||
"reward_title": data.Reward.Title,
|
||||
"reward_cost": fmt.Sprintf("%d", data.Reward.Cost),
|
||||
"user_input": data.UserInput,
|
||||
}
|
||||
es.manager.OnEvent("twitch", "reward_redemption", params)
|
||||
|
||||
default:
|
||||
logger.Warn("Unhandled event type: %s", eventType)
|
||||
}
|
||||
}
|
||||
|
||||
func (es *TwitchEventSub) IsConnected() bool {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
return es.conn != nil
|
||||
}
|
||||
|
||||
func (es *TwitchEventSub) GetSubscriptions() []string {
|
||||
es.mu.Lock()
|
||||
defer es.mu.Unlock()
|
||||
subs := make([]string, 0, len(es.subscriptions))
|
||||
for s := range es.subscriptions {
|
||||
subs = append(subs, s)
|
||||
}
|
||||
return subs
|
||||
}
|
||||
Reference in New Issue
Block a user