// sojuboy/internal/httpapi/server.go
// 846 lines · 24 KiB · Go

package httpapi
import (
	"context"
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	xhtml "golang.org/x/net/html"

	"sojuboy/internal/store"
	"sojuboy/internal/summarizer"
)
// Metrics holds the process counters that handleMetrics and handleInfo report.
// The handlers in this file read and update every field through sync/atomic,
// so a single *Metrics value can be shared between goroutines.
type Metrics struct {
// MessagesIngested is reported as sojuboy_messages_ingested_total.
MessagesIngested int64 // counter
// NotificationsSent is reported as sojuboy_notifications_sent_total; the
// trigger handlers increment it when a push is requested.
NotificationsSent int64 // counter
// MessagesPruned is reported as sojuboy_messages_pruned_total.
MessagesPruned int64 // counter
// ConnectedGauge is reported as sojuboy_connected; handleInfo treats the
// value 1 as "connected" and anything else as "not connected".
ConnectedGauge int64 // 0/1
}
// Server is the HTTP front end: it serves the web UI, the plain-text and JSON
// API endpoints, the SSE message stream and the link-preview helpers.
// Exported fields are configuration supplied by the caller before Start;
// unexported fields are runtime state owned by the handlers.
type Server struct {
// ListenAddr is used as http.Server.Addr by Start.
ListenAddr string
// AuthToken is the shared secret checked by checkAuth and by the cookie
// checks of the UI handlers. When it is empty the handlers in this file skip
// authentication entirely.
AuthToken string
// Store is the message store queried by the tail, history, channel and
// trigger handlers.
Store *store.Store
// Summarizer produces digests; the trigger and link-summary handlers answer
// 503 when it is nil.
Summarizer summarizer.Summarizer
// Notifier, when non-nil, is called with a title and a summary when a
// trigger request explicitly asks for a push.
Notifier interface {
Notify(context.Context, string, string) error
}
// Logger is optional; every use in this file is guarded by a nil check.
Logger *slog.Logger
// Metrics is optional; handlers report zero values when it is nil.
Metrics *Metrics
// Ready, when non-nil, decides whether /ready answers 200 or 503.
Ready func() bool
// SummarizerTimeout overrides the summarizer deadline; values <= 0 fall back
// to 5 minutes in the handlers.
SummarizerTimeout time.Duration
// Build/runtime info returned by /api/info.
Version string
Commit string
BuiltAt string
StartedAt time.Time
// KnownChannels is an optional seed list from config, returned by
// /api/channels when the store reports no channels.
KnownChannels []string
// subs maps a lowercased channel name to the SSE subscriber channels
// registered by handleStream; subsMu guards it.
subs map[string][]chan store.Message
subsMu sync.RWMutex
// cardCache and cardCacheExp hold link cards and their expiry times, keyed by
// the raw URL string. NOTE(review): no mutex field on this struct covers the
// cache maps below.
cardCache map[string]linkCard
cardCacheExp map[string]time.Time
// summaryCache and summaryCacheExp hold link summaries and their expiry
// times, keyed by the raw URL string.
summaryCache map[string]string
summaryCacheExp map[string]time.Time
}
// Start registers every route, starts an HTTP server on s.ListenAddr and
// blocks until that server stops.
//
// When ctx is cancelled the server is shut down gracefully for up to ten
// seconds; connections that are still open after that (SSE streams never
// become idle on their own) are closed forcibly.
//
// The return value is whatever http.Server.ListenAndServe returns, which is
// http.ErrServerClosed after a shutdown triggered through ctx.
func (s *Server) Start(ctx context.Context) error {
	mux := http.NewServeMux()

	// Minimal web UI (templated).
	mux.HandleFunc("/", s.handleUIDash)
	mux.HandleFunc("/summarizer", s.handleUISummarizer)

	// Serve embedded static files under /static/.
	if sub, err := fs.Sub(staticFS, "static"); err == nil {
		mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.FS(sub))))
	} else if s.Logger != nil {
		s.Logger.Warn("http static assets unavailable", "err", err)
	}

	mux.HandleFunc("/login", s.handleLogin)
	mux.HandleFunc("/auth", s.handleAuth)
	mux.HandleFunc("/logout", s.handleLogout)

	// Liveness and readiness probes.
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})
	mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		if s.Ready != nil && !s.Ready() {
			w.WriteHeader(http.StatusServiceUnavailable)
			_, _ = w.Write([]byte("not ready"))
			return
		}
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ready"))
	})

	// Plain-text endpoints.
	mux.HandleFunc("/trigger", s.handleTrigger)
	mux.HandleFunc("/tail", s.handleTail)
	mux.HandleFunc("/metrics", s.handleMetrics)

	// JSON endpoints for UI.
	mux.HandleFunc("/api/info", s.handleInfo)
	mux.HandleFunc("/api/channels", s.handleChannels)
	mux.HandleFunc("/api/tail", s.handleTailJSON)
	mux.HandleFunc("/api/trigger", s.handleTriggerJSON)
	mux.HandleFunc("/api/history", s.handleHistory)
	mux.HandleFunc("/api/stream", s.handleStream)
	mux.HandleFunc("/api/linkcard", s.handleLinkCard)
	mux.HandleFunc("/api/linksummary", s.handleLinkSummary)

	srv := &http.Server{
		Addr:    s.ListenAddr,
		Handler: mux,
		// Bound the time a client may take to send its request headers. No
		// WriteTimeout is set because /api/stream keeps responses open.
		ReadHeaderTimeout: 10 * time.Second,
	}

	// stopped lets the shutdown goroutine exit when ListenAndServe returns on
	// its own (for example because the address is already in use).
	stopped := make(chan struct{})
	defer close(stopped)
	go func() {
		select {
		case <-ctx.Done():
		case <-stopped:
			return
		}
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			if s.Logger != nil {
				s.Logger.Warn("http graceful shutdown incomplete, closing connections", "err", err)
			}
			// Best effort: nothing useful can be done with a Close error here.
			_ = srv.Close()
		}
	}()

	if s.Logger != nil {
		s.Logger.Info("http listening", "addr", s.ListenAddr)
	}
	return srv.ListenAndServe()
}
// handleTrigger summarizes the messages of one channel over a time window and
// returns the summary as plain text.
//
// Query parameters:
//   - channel: required channel name.
//   - window:  Go duration string, defaults to "6h".
//   - push:    "1" or "true" (case-insensitive) additionally sends the summary
//     through s.Notifier when one is configured.
//
// Responses: 401 on failed auth, 400 on a missing channel or bad window, 500
// on store errors, 503 when no summarizer is configured, 502 when the
// summarizer fails.
func (s *Server) handleTrigger(w http.ResponseWriter, r *http.Request) {
	if s.AuthToken != "" && !checkAuth(r, s.AuthToken) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}
	query := r.URL.Query()
	channel := query.Get("channel")
	if channel == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("missing channel"))
		return
	}
	windowStr := query.Get("window")
	if windowStr == "" {
		windowStr = "6h"
	}
	window, err := time.ParseDuration(windowStr)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("bad window"))
		return
	}
	ctx := r.Context()
	msgs, err := s.Store.ListMessagesSince(ctx, channel, time.Now().Add(-window))
	if err != nil {
		if s.Logger != nil {
			s.Logger.Error("http trigger store", "err", err)
		}
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("store error"))
		return
	}
	if s.Summarizer == nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		_, _ = w.Write([]byte("summarizer not configured"))
		return
	}
	// Timeout summarization using configurable timeout (default 5m).
	tout := s.SummarizerTimeout
	if tout <= 0 {
		tout = 5 * time.Minute
	}
	ctxSum, cancel := context.WithTimeout(ctx, tout)
	defer cancel()
	summary, err := s.Summarizer.Summarize(ctxSum, channel, msgs, window)
	if err != nil {
		if s.Logger != nil {
			s.Logger.Error("http trigger summarizer", "err", err)
		}
		w.WriteHeader(http.StatusBadGateway)
		_, _ = w.Write([]byte("summarizer error"))
		return
	}
	// Only push if explicitly requested.
	pushArg := query.Get("push")
	if (strings.EqualFold(pushArg, "1") || strings.EqualFold(pushArg, "true")) && s.Notifier != nil {
		title := fmt.Sprintf("IRC digest %s (%s)", channel, window)
		// A failed push must not fail the request (the summary is still
		// returned), but it must not be counted as sent either.
		if err := s.Notifier.Notify(ctx, title, summary); err != nil {
			if s.Logger != nil {
				s.Logger.Error("http trigger notify", "err", err)
			}
		} else if s.Metrics != nil {
			atomic.AddInt64(&s.Metrics.NotificationsSent, 1)
		}
	}
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	_, _ = w.Write([]byte(summary))
}
// handleTail writes the most recent messages of a channel as plain text, one
// "<RFC3339 time> <author> <channel> <body>" line per message. The store result
// is emitted in reverse index order.
//
// Query parameters: channel (required) and limit (defaults to 50).
func (s *Server) handleTail(w http.ResponseWriter, r *http.Request) {
	if s.AuthToken != "" && !checkAuth(r, s.AuthToken) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}

	channel := r.URL.Query().Get("channel")
	if channel == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("missing channel"))
		return
	}

	msgs, err := s.Store.ListRecentMessages(r.Context(), channel, getIntQuery(r, "limit", 50))
	if err != nil {
		if s.Logger != nil {
			s.Logger.Error("http tail store", "err", err)
		}
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("store error"))
		return
	}

	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	total := len(msgs)
	for n := 0; n < total; n++ {
		// Walk the result from its last element to its first.
		msg := msgs[total-1-n]
		fields := []string{msg.Time.UTC().Format(time.RFC3339), msg.Author, channel, msg.Body}
		_, _ = w.Write([]byte(strings.Join(fields, " ") + "\n"))
	}
}
// linkCard is the JSON payload returned by handleLinkCard and the value type
// stored in Server.cardCache. handleLinkCard fills either HTML (oEmbed result),
// Image alone (the URL itself is an image), or Title/Description/Image taken
// from the page's meta tags.
type linkCard struct {
// URL is the raw URL the card was requested for.
URL string `json:"url"`
// Title comes from og:title or twitter:title.
Title string `json:"title"`
// Description comes from og:description, twitter:description or description.
Description string `json:"description"`
// Image is an image URL: either the requested URL itself or one of the
// og:image / twitter:image meta values.
Image string `json:"image"`
// HTML is the embed markup returned by the oEmbed endpoint.
HTML string `json:"html"`
}
// linkCardCacheMu serializes access to Server.cardCache and
// Server.cardCacheExp. net/http serves every request on its own goroutine and
// unsynchronized map writes are a fatal runtime error.
var linkCardCacheMu sync.Mutex

const (
	// linkCardTTL is how long a fetched card stays cached.
	linkCardTTL = 24 * time.Hour
	// linkCardMaxBody caps how much of a page is read for meta tags (768 KiB).
	linkCardMaxBody = 786432
)

// handleLinkCard fetches basic OpenGraph/Twitter card metadata (best-effort)
// for the URL in the "url" query parameter and returns it as a JSON linkCard.
// Results are cached per raw URL for 24 hours.
//
// Responses: 401 on failed auth, 400 on a missing or non-http(s) URL, 502 when
// the remote fetch fails, answers with a non-2xx status, or cannot be parsed.
//
// NOTE(review): the handler fetches caller-supplied URLs from the server side,
// including internal addresses; it relies on the auth check to limit who may
// do that.
func (s *Server) handleLinkCard(w http.ResponseWriter, r *http.Request) {
	if s.AuthToken != "" && !checkAuth(r, s.AuthToken) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}
	raw := r.URL.Query().Get("url")
	if raw == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("missing url"))
		return
	}
	u, err := url.Parse(raw)
	if err != nil || (u.Scheme != "http" && u.Scheme != "https") {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("bad url"))
		return
	}

	if card, ok := s.lookupLinkCard(raw); ok {
		writeLinkCardJSON(w, card)
		return
	}

	// Special handling for X/Twitter posts via oEmbed; any failure falls
	// through to the generic fetch below.
	host := strings.ToLower(u.Host)
	isTweetHost := host == "x.com" || host == "twitter.com" || strings.HasSuffix(host, ".twitter.com")
	if isTweetHost && strings.Contains(strings.ToLower(u.Path), "/status/") {
		if card, ok := fetchTweetLinkCard(r.Context(), raw); ok {
			s.storeLinkCard(raw, card)
			writeLinkCardJSON(w, card)
			return
		}
	}

	// Fetch the page itself and extract tags using a tolerant HTML parser.
	req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, raw, nil)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("bad url"))
		return
	}
	req.Header.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36")
	req.Header.Set("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8")
	req.Header.Set("Accept-Language", "en-US,en;q=0.9")
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		w.WriteHeader(http.StatusBadGateway)
		_, _ = w.Write([]byte("fetch error"))
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		w.WriteHeader(http.StatusBadGateway)
		_, _ = w.Write([]byte("bad status"))
		return
	}

	// If image content-type, return as image card.
	if ct := strings.ToLower(resp.Header.Get("Content-Type")); strings.HasPrefix(ct, "image/") {
		card := linkCard{URL: raw, Image: raw}
		s.storeLinkCard(raw, card)
		writeLinkCardJSON(w, card)
		return
	}

	// Read at most linkCardMaxBody bytes. io.LimitReader simply truncates, so a
	// large page is parsed up to the cap instead of being rejected;
	// http.MaxBytesReader is meant for request bodies and would also flag the
	// inbound connection for closing.
	doc, err := xhtml.Parse(io.LimitReader(resp.Body, linkCardMaxBody))
	if err != nil {
		w.WriteHeader(http.StatusBadGateway)
		_, _ = w.Write([]byte("parse error"))
		return
	}
	title, desc, img := extractLinkCardMeta(doc)

	// Normalize the image URL: protocol-relative values inherit the page's
	// scheme, other non-absolute values are resolved against the page URL.
	img = strings.TrimSpace(img)
	switch {
	case img == "":
		// nothing to normalize
	case strings.HasPrefix(img, "//"):
		img = u.Scheme + ":" + img
	case !strings.HasPrefix(img, "http://") && !strings.HasPrefix(img, "https://"):
		if ref, err := url.Parse(img); err == nil {
			img = u.ResolveReference(ref).String()
		}
	}

	card := linkCard{URL: raw, Title: strings.TrimSpace(title), Description: strings.TrimSpace(desc), Image: img}
	s.storeLinkCard(raw, card)
	writeLinkCardJSON(w, card)
}

// lookupLinkCard returns the cached card for raw if it has not expired yet.
// Expired entries are removed on the way.
func (s *Server) lookupLinkCard(raw string) (linkCard, bool) {
	linkCardCacheMu.Lock()
	defer linkCardCacheMu.Unlock()
	exp, ok := s.cardCacheExp[raw]
	if !ok {
		return linkCard{}, false
	}
	if !time.Now().Before(exp) {
		delete(s.cardCache, raw)
		delete(s.cardCacheExp, raw)
		return linkCard{}, false
	}
	return s.cardCache[raw], true
}

// storeLinkCard caches card under raw for linkCardTTL, creating the cache maps
// on first use.
func (s *Server) storeLinkCard(raw string, card linkCard) {
	linkCardCacheMu.Lock()
	defer linkCardCacheMu.Unlock()
	if s.cardCache == nil {
		s.cardCache = make(map[string]linkCard)
	}
	if s.cardCacheExp == nil {
		s.cardCacheExp = make(map[string]time.Time)
	}
	s.cardCache[raw] = card
	s.cardCacheExp[raw] = time.Now().Add(linkCardTTL)
}

// writeLinkCardJSON sends card as the JSON response body.
func writeLinkCardJSON(w http.ResponseWriter, card linkCard) {
	w.Header().Set("Content-Type", "application/json")
	// An encode error here means the client went away; nothing to recover.
	_ = json.NewEncoder(w).Encode(card)
}

// fetchTweetLinkCard asks the publish.twitter.com oEmbed endpoint for embed
// markup. It reports false on any transport, status or decoding failure.
func fetchTweetLinkCard(ctx context.Context, raw string) (linkCard, bool) {
	oembed := "https://publish.twitter.com/oembed?omit_script=1&hide_thread=1&dnt=1&align=center&url=" + url.QueryEscape(raw)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, oembed, nil)
	if err != nil {
		return linkCard{}, false
	}
	client := &http.Client{Timeout: 8 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return linkCard{}, false
	}
	// Close on every path, including non-2xx responses.
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return linkCard{}, false
	}
	var payload struct {
		HTML string `json:"html"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil || payload.HTML == "" {
		return linkCard{}, false
	}
	return linkCard{URL: raw, HTML: payload.HTML}, true
}

// extractLinkCardMeta walks the parsed document and returns the first title,
// description and image found in its <meta> tags. The "property" attribute
// takes precedence over "name" when both are present.
func extractLinkCardMeta(doc *xhtml.Node) (title, desc, img string) {
	var walk func(*xhtml.Node)
	walk = func(n *xhtml.Node) {
		if n.Type == xhtml.ElementNode && strings.EqualFold(n.Data, "meta") {
			var property, name, content string
			for _, a := range n.Attr {
				switch {
				case strings.EqualFold(a.Key, "property"):
					property = a.Val
				case strings.EqualFold(a.Key, "name"):
					name = a.Val
				case strings.EqualFold(a.Key, "content"):
					content = a.Val
				}
			}
			key := strings.ToLower(property)
			if key == "" {
				key = strings.ToLower(name)
			}
			switch key {
			case "og:title", "twitter:title":
				if title == "" {
					title = content
				}
			case "og:description", "twitter:description", "description":
				if desc == "" {
					desc = content
				}
			case "og:image", "og:image:url", "og:image:secure_url", "twitter:image", "twitter:image:src":
				if img == "" {
					img = content
				}
			}
		}
		for c := n.FirstChild; c != nil; c = c.NextSibling {
			walk(c)
		}
	}
	walk(doc)
	return title, desc, img
}
// linkSummaryCacheMu serializes access to Server.summaryCache and
// Server.summaryCacheExp. net/http serves every request on its own goroutine
// and unsynchronized map writes are a fatal runtime error.
var linkSummaryCacheMu sync.Mutex

// handleLinkSummary returns a brief AI summary for the URL in the "url" query
// parameter as {"summary": "..."}; results are cached per raw URL for 24h.
//
// The URL is handed to the summarizer as the body of a single synthetic
// message on the "#links" channel. The summarizer deadline is
// s.SummarizerTimeout (default 5m) capped at 2 minutes.
//
// Responses: 401 on failed auth, 400 on a missing url, 503 when no summarizer
// is configured, 502 when the summarizer fails.
func (s *Server) handleLinkSummary(w http.ResponseWriter, r *http.Request) {
	if s.AuthToken != "" && !checkAuth(r, s.AuthToken) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}
	raw := r.URL.Query().Get("url")
	if raw == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("missing url"))
		return
	}
	if s.Summarizer == nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		_, _ = w.Write([]byte("summarizer not configured"))
		return
	}

	// Cache lookup. Reading a nil map is fine, so the maps are only created
	// when something is stored.
	linkSummaryCacheMu.Lock()
	exp, found := s.summaryCacheExp[raw]
	cached := s.summaryCache[raw]
	linkSummaryCacheMu.Unlock()
	if found && time.Now().Before(exp) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]any{"summary": cached})
		return
	}

	msgs := []store.Message{{Channel: "#links", Author: "link", Body: raw, Time: time.Now().UTC()}}
	tout := s.SummarizerTimeout
	if tout <= 0 {
		tout = 5 * time.Minute
	}
	if tout > 2*time.Minute {
		tout = 2 * time.Minute
	}
	ctx, cancel := context.WithTimeout(r.Context(), tout)
	defer cancel()
	// The lock is deliberately not held across this potentially slow call.
	sum, err := s.Summarizer.Summarize(ctx, "#links", msgs, 0)
	if err != nil {
		w.WriteHeader(http.StatusBadGateway)
		_, _ = w.Write([]byte("summarizer error"))
		return
	}
	if sum == "" {
		sum = "(no summary)"
	}

	linkSummaryCacheMu.Lock()
	if s.summaryCache == nil {
		s.summaryCache = make(map[string]string)
	}
	if s.summaryCacheExp == nil {
		s.summaryCacheExp = make(map[string]time.Time)
	}
	s.summaryCache[raw] = sum
	s.summaryCacheExp[raw] = time.Now().Add(24 * time.Hour)
	linkSummaryCacheMu.Unlock()

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]any{"summary": sum})
}
// handleMetrics writes the four Metrics values as plain-text
// "<name> <value>" lines. All values are reported as 0 when s.Metrics is nil.
func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain; version=0.0.4")

	var ingested, sent, pruned, connected int64
	if m := s.Metrics; m != nil {
		ingested = atomic.LoadInt64(&m.MessagesIngested)
		sent = atomic.LoadInt64(&m.NotificationsSent)
		pruned = atomic.LoadInt64(&m.MessagesPruned)
		connected = atomic.LoadInt64(&m.ConnectedGauge)
	}

	_, _ = fmt.Fprintf(w, "sojuboy_messages_ingested_total %d\n", ingested)
	_, _ = fmt.Fprintf(w, "sojuboy_notifications_sent_total %d\n", sent)
	_, _ = fmt.Fprintf(w, "sojuboy_messages_pruned_total %d\n", pruned)
	_, _ = fmt.Fprintf(w, "sojuboy_connected %d\n", connected)
}
// --- Web UI handlers ---
// handleUIDash renders dashboard.tmpl for the exact path "/". Any other path
// gets a bare 404. When an auth token is configured, requests without a
// matching auth_token cookie are redirected to /login.
func (s *Server) handleUIDash(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		w.WriteHeader(http.StatusNotFound)
		return
	}
	if s.AuthToken != "" {
		cookie, err := r.Cookie("auth_token")
		signedIn := err == nil && cookie.Value == s.AuthToken
		if !signedIn {
			http.Redirect(w, r, "/login", http.StatusFound)
			return
		}
	}
	s.render(w, "dashboard.tmpl", map[string]any{})
}
// handleUISummarizer renders summarizer.tmpl. When an auth token is
// configured, requests without a matching auth_token cookie are redirected to
// /login.
func (s *Server) handleUISummarizer(w http.ResponseWriter, r *http.Request) {
	if s.AuthToken != "" {
		cookie, err := r.Cookie("auth_token")
		signedIn := err == nil && cookie.Value == s.AuthToken
		if !signedIn {
			http.Redirect(w, r, "/login", http.StatusFound)
			return
		}
	}
	s.render(w, "summarizer.tmpl", map[string]any{})
}
// handleInfo returns build and runtime information as a JSON object: version,
// commit, builtAt, startedAt (RFC 3339), uptime (rounded to seconds) and the
// current Metrics values. Counters are 0 and connected is false when
// s.Metrics is nil.
func (s *Server) handleInfo(w http.ResponseWriter, r *http.Request) {
	var (
		ingested, sent, pruned int64
		connected              bool
	)
	if m := s.Metrics; m != nil {
		ingested = atomic.LoadInt64(&m.MessagesIngested)
		sent = atomic.LoadInt64(&m.NotificationsSent)
		pruned = atomic.LoadInt64(&m.MessagesPruned)
		connected = atomic.LoadInt64(&m.ConnectedGauge) == 1
	}

	payload := map[string]any{
		"version":           s.Version,
		"commit":            s.Commit,
		"builtAt":           s.BuiltAt,
		"startedAt":         s.StartedAt.Format(time.RFC3339),
		"uptime":            time.Since(s.StartedAt).Round(time.Second).String(),
		"messagesIngested":  ingested,
		"notificationsSent": sent,
		"messagesPruned":    pruned,
		"connected":         connected,
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(payload)
}
// handleStream serves a Server-Sent Events stream of new messages for the
// channel named in the "channel" query parameter.
//
// The handler registers a buffered subscriber channel under the lowercased
// channel name, forwards every message it receives as a "data:" event with a
// JSON object (time, author, body, channel), and sends a comment line every
// 15 seconds as a heartbeat. It returns when the request context is done.
//
// Responses before streaming starts: 401 on failed auth, 400 on a missing
// channel, 500 when the ResponseWriter cannot flush.
func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
	if s.AuthToken != "" && !checkAuth(r, s.AuthToken) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}
	ch := strings.TrimSpace(r.URL.Query().Get("channel"))
	if ch == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("missing channel"))
		return
	}
	key := strings.ToLower(ch) // Normalize channel name to lowercase

	// Check for streaming support before any SSE header is set, so the error
	// response is not labelled as an event stream.
	flusher, ok := w.(http.Flusher)
	if !ok {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	// Register subscriber. The map is created under the lock: several streams
	// can start at the same time.
	sub := make(chan store.Message, 64)
	s.subsMu.Lock()
	if s.subs == nil {
		s.subs = make(map[string][]chan store.Message)
	}
	s.subs[key] = append(s.subs[key], sub)
	s.subsMu.Unlock()

	defer func() {
		s.subsMu.Lock()
		defer s.subsMu.Unlock()
		current := s.subs[key]
		for i := range current {
			if current[i] != sub {
				continue
			}
			// Build a fresh slice instead of shifting elements in place, so a
			// reader that still holds the old slice never sees it change.
			remaining := make([]chan store.Message, 0, len(current)-1)
			remaining = append(remaining, current[:i]...)
			remaining = append(remaining, current[i+1:]...)
			if len(remaining) == 0 {
				delete(s.subs, key)
			} else {
				s.subs[key] = remaining
			}
			break
		}
		// sub is intentionally not closed: this goroutine is the receiver, and
		// a sender that still holds the channel would panic on a closed one.
		// The channel is garbage collected once unreferenced.
	}()

	hb := time.NewTicker(15 * time.Second) // SSE heartbeat
	defer hb.Stop()
	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case m := <-sub:
			// Marshalling a map of strings cannot fail.
			b, _ := json.Marshal(map[string]any{
				"time":    m.Time.UTC().Format(time.RFC3339),
				"author":  m.Author,
				"body":    m.Body,
				"channel": m.Channel,
			})
			_, _ = fmt.Fprintf(w, "data: %s\n\n", b)
			flusher.Flush()
		case <-hb.C:
			_, _ = fmt.Fprintf(w, ": ping\n\n")
			flusher.Flush()
		}
	}
}
// handleChannels returns the channel names known to the store as a JSON
// array. When the store reports none, s.KnownChannels is returned instead.
// The response is always an array, never null.
//
// Responses: 401 on failed auth, 500 on store errors.
func (s *Server) handleChannels(w http.ResponseWriter, r *http.Request) {
	if s.AuthToken != "" && !checkAuth(r, s.AuthToken) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}
	chs, err := s.Store.ListChannels(r.Context())
	if err != nil {
		if s.Logger != nil {
			s.Logger.Error("http channels store", "err", err)
		}
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("store error"))
		return
	}
	if len(chs) == 0 && len(s.KnownChannels) > 0 {
		chs = append(chs, s.KnownChannels...)
	}
	if chs == nil {
		// A nil slice would be encoded as JSON null; clients expect a list.
		chs = []string{}
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(chs)
}
// handleTailJSON is the JSON variant of the tail endpoint: it returns the most
// recent messages of a channel as an array of {time, author, body, channel}
// objects, emitting the store result in reverse index order. The channel field
// carries the requested channel name.
//
// Query parameters: channel (required) and limit (defaults to 100).
func (s *Server) handleTailJSON(w http.ResponseWriter, r *http.Request) {
	type outMsg struct {
		Time    string `json:"time"`
		Author  string `json:"author"`
		Body    string `json:"body"`
		Channel string `json:"channel"`
	}

	if s.AuthToken != "" && !checkAuth(r, s.AuthToken) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}

	channel := r.URL.Query().Get("channel")
	if channel == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("missing channel"))
		return
	}

	msgs, err := s.Store.ListRecentMessages(r.Context(), channel, getIntQuery(r, "limit", 100))
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("store error"))
		return
	}

	// Fill the output back to front so the last store element comes first.
	total := len(msgs)
	out := make([]outMsg, total)
	for i := 0; i < total; i++ {
		msg := msgs[i]
		out[total-1-i] = outMsg{
			Time:    msg.Time.UTC().Format(time.RFC3339),
			Author:  msg.Author,
			Body:    msg.Body,
			Channel: channel,
		}
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(out)
}
// handleTriggerJSON is the JSON variant of the trigger endpoint: it summarizes
// the messages of one channel over a time window and returns
// {"summary": "..."}.
//
// Query parameters:
//   - channel: required channel name.
//   - window:  Go duration string, defaults to "6h".
//   - push:    "1" or "true" (case-insensitive) additionally sends the summary
//     through s.Notifier when one is configured.
//
// Responses: 401 on failed auth, 400 on a missing channel or bad window, 500
// on store errors, 503 when no summarizer is configured, 502 when the
// summarizer fails.
func (s *Server) handleTriggerJSON(w http.ResponseWriter, r *http.Request) {
	if s.AuthToken != "" && !checkAuth(r, s.AuthToken) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}
	query := r.URL.Query()
	channel := query.Get("channel")
	if channel == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("missing channel"))
		return
	}
	win := query.Get("window")
	if win == "" {
		win = "6h"
	}
	// Accept the same spellings as the plain-text trigger endpoint.
	pushArg := query.Get("push")
	push := pushArg == "1" || strings.EqualFold(pushArg, "true")
	dur, err := time.ParseDuration(win)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("bad window"))
		return
	}
	msgs, err := s.Store.ListMessagesSince(r.Context(), channel, time.Now().Add(-dur))
	if err != nil {
		if s.Logger != nil {
			s.Logger.Error("http trigger json store", "err", err)
		}
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("store error"))
		return
	}
	if s.Summarizer == nil {
		w.WriteHeader(http.StatusServiceUnavailable)
		_, _ = w.Write([]byte("summarizer not configured"))
		return
	}
	tout := s.SummarizerTimeout
	if tout <= 0 {
		tout = 5 * time.Minute
	}
	ctx, cancel := context.WithTimeout(r.Context(), tout)
	defer cancel()
	sum, err := s.Summarizer.Summarize(ctx, channel, msgs, dur)
	if err != nil {
		if s.Logger != nil {
			s.Logger.Error("http trigger json summarizer", "err", err)
		}
		w.WriteHeader(http.StatusBadGateway)
		_, _ = w.Write([]byte("summarizer error"))
		return
	}
	if push && s.Notifier != nil {
		title := fmt.Sprintf("IRC digest %s (%s)", channel, dur)
		// A failed push must not fail the request (the summary is still
		// returned), but it must not be counted as sent either.
		if err := s.Notifier.Notify(r.Context(), title, sum); err != nil {
			if s.Logger != nil {
				s.Logger.Error("http trigger json notify", "err", err)
			}
		} else if s.Metrics != nil {
			atomic.AddInt64(&s.Metrics.NotificationsSent, 1)
		}
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]any{"summary": sum})
}
// handleHistory pages backwards through a channel: it returns the messages the
// store lists before the RFC 3339 timestamp given in "before", as an array of
// {time, author, body, channel} objects in store order.
//
// Query parameters: channel and before (both required), limit (defaults to 50).
// Responses: 401 on failed auth, 400 on missing or malformed parameters, 500
// on store errors.
func (s *Server) handleHistory(w http.ResponseWriter, r *http.Request) {
	type outMsg struct {
		Time    string `json:"time"`
		Author  string `json:"author"`
		Body    string `json:"body"`
		Channel string `json:"channel"`
	}

	if s.AuthToken != "" && !checkAuth(r, s.AuthToken) {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}

	params := r.URL.Query()
	channel, beforeStr := params.Get("channel"), params.Get("before")
	if channel == "" || beforeStr == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("missing params"))
		return
	}
	before, err := time.Parse(time.RFC3339, beforeStr)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("bad before"))
		return
	}

	msgs, err := s.Store.ListMessagesBefore(r.Context(), channel, before, getIntQuery(r, "limit", 50))
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("store error"))
		return
	}

	out := make([]outMsg, len(msgs))
	for i, msg := range msgs {
		out[i] = outMsg{
			Time:    msg.Time.UTC().Format(time.RFC3339),
			Author:  msg.Author,
			Body:    msg.Body,
			Channel: msg.Channel,
		}
	}

	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(out)
}
// checkAuth reports whether r carries token in any of the supported places,
// checked in this order:
//
//  1. "Authorization: Bearer <token>" header
//  2. "token" query parameter
//  3. HTTP Basic auth with user "token" and the token as password
//  4. "X-Auth-Token" header
//  5. "auth_token" cookie
//
// Candidates are compared with subtle.ConstantTimeCompare so that the time
// taken does not depend on how many leading bytes of a guess are correct
// (only the length can still be inferred). Callers in this file only invoke
// checkAuth when a non-empty token is configured.
func checkAuth(r *http.Request, token string) bool {
	want := []byte(token)
	matches := func(candidate string) bool {
		return subtle.ConstantTimeCompare([]byte(candidate), want) == 1
	}

	if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "Bearer ") {
		if matches(strings.TrimPrefix(auth, "Bearer ")) {
			return true
		}
	}
	if matches(r.URL.Query().Get("token")) {
		return true
	}
	if user, pass, ok := r.BasicAuth(); ok && user == "token" && matches(pass) {
		return true
	}
	if matches(r.Header.Get("X-Auth-Token")) {
		return true
	}
	// Cookie-based
	if c, err := r.Cookie("auth_token"); err == nil && matches(c.Value) {
		return true
	}
	return false
}
// getIntQuery returns the query parameter key of r parsed as a decimal int.
// It returns def when the parameter is absent, empty, or not a valid int.
func getIntQuery(r *http.Request, key string, def int) int {
	raw := r.URL.Query().Get(key)
	if raw == "" {
		return def
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return def
	}
	return parsed
}
// --- Login handlers ---
// handleLogin serves the sign-in form, which posts the access token to /auth.
// It redirects to "/" instead when no auth token is configured or when the
// request already carries a matching auth_token cookie.
func (s *Server) handleLogin(w http.ResponseWriter, r *http.Request) {
	const loginPage = `<!doctype html>
<html>
<head>
<meta charset="utf-8"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<title>Sign in · sojuboy</title>
<link rel="stylesheet" href="https://unpkg.com/@picocss/pico@2/css/pico.min.css">
<style>body{padding:1rem;} main{max-width:480px;margin:auto;margin-top:15vh}</style>
</head>
<body>
<main class="container">
<article>
<h2>Sign in</h2>
<form id="f" method="post" action="/auth">
<label>Access token
<input type="password" name="token" autocomplete="current-password" required placeholder="HTTP_TOKEN"/>
</label>
<button type="submit">Continue</button>
</form>
</article>
</main>
</body>
</html>`

	// Nothing to sign in to when auth is disabled.
	if s.AuthToken == "" {
		http.Redirect(w, r, "/", http.StatusFound)
		return
	}
	// If already authed, go to UI.
	cookie, err := r.Cookie("auth_token")
	if err == nil && cookie.Value == s.AuthToken {
		http.Redirect(w, r, "/", http.StatusFound)
		return
	}

	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	_, _ = w.Write([]byte(loginPage))
}
// handleAuth validates the "token" form value against s.AuthToken and, on
// success, sets the auth_token cookie for 7 days and redirects to "/".
//
// The cookie is HttpOnly and SameSite=Lax; it is marked Secure when the
// request arrived over TLS or carries "X-Forwarded-Proto: https".
//
// Responses: 400 when the form cannot be parsed, 401 when the token is empty,
// no auth token is configured, or the token does not match.
func (s *Server) handleAuth(w http.ResponseWriter, r *http.Request) {
	if err := r.ParseForm(); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("bad request"))
		return
	}
	tok := r.Form.Get("token")
	// Constant-time comparison: the response time must not reveal how many
	// leading bytes of a guessed token are correct.
	if tok == "" || s.AuthToken == "" || subtle.ConstantTimeCompare([]byte(tok), []byte(s.AuthToken)) != 1 {
		w.WriteHeader(http.StatusUnauthorized)
		_, _ = w.Write([]byte("unauthorized"))
		return
	}
	// set cookie for 7 days
	maxAge := 7 * 24 * 60 * 60
	secure := r.TLS != nil || strings.EqualFold(r.Header.Get("X-Forwarded-Proto"), "https")
	http.SetCookie(w, &http.Cookie{Name: "auth_token", Value: tok, Path: "/", MaxAge: maxAge, HttpOnly: true, Secure: secure, SameSite: http.SameSiteLaxMode})
	w.Header().Set("Location", "/")
	w.WriteHeader(http.StatusFound)
}
// handleLogout expires the auth_token cookie (MaxAge -1) and redirects to
// /login.
func (s *Server) handleLogout(w http.ResponseWriter, r *http.Request) {
	expired := &http.Cookie{
		Name:   "auth_token",
		Value:  "",
		Path:   "/",
		MaxAge: -1,
	}
	http.SetCookie(w, expired)
	http.Redirect(w, r, "/login", http.StatusFound)
}
// Broadcast sends a message to all SSE subscribers for a given channel.
//
// The channel name is trimmed and lowercased to form the subscriber key; an
// empty key is ignored. Sends never block: when a subscriber's buffer is full
// the message is dropped for that subscriber and a warning is logged.
//
// The read lock is held for the whole fan-out. Subscribers are removed (and
// their channels possibly closed) only under the write lock, so holding the
// read lock guarantees that every channel in the list is still registered
// while it is being sent to.
func (s *Server) Broadcast(channel string, m store.Message) {
	key := strings.ToLower(strings.TrimSpace(channel))
	if key == "" {
		return
	}
	s.subsMu.RLock()
	defer s.subsMu.RUnlock()
	for _, sub := range s.subs[key] {
		select {
		case sub <- m:
			// delivered
		default:
			if s.Logger != nil {
				s.Logger.Warn("sse subscriber buffer full, dropping message", "channel", key)
			}
		}
	}
}