diff --git a/cmd/sojuboy/main.go b/cmd/sojuboy/main.go index 7c14f98..45e6117 100644 --- a/cmd/sojuboy/main.go +++ b/cmd/sojuboy/main.go @@ -225,6 +225,8 @@ func main() { BackfillLatest: backfill, OnPrivmsg: func(channel, author, text, msgid string, at time.Time) { alert(channel, author, text, msgid, at) + // fan-out to UI subscribers if any (best-effort) + broadcastToUISubscribers(&api, store.Message{Channel: channel, Author: author, Body: text, Time: at.UTC(), MsgID: msgid}) }, ConnectedGauge: &metrics.ConnectedGauge, } @@ -290,3 +292,25 @@ func main() { <-ctx.Done() logger.Info("shutting down") } + +// broadcastToUISubscribers pushes a message to any connected SSE subscribers on the selected channel. +func broadcastToUISubscribers(api *httpapi.Server, m store.Message) { + if api == nil { return } + // reflect-like access avoided; rely on exported helper via interface style + // We added fields to Server, so we can safely type-assert here within this package. + // Iterate subscribers with internal lock via small helper method. + type subAccess interface{ Broadcast(channel string, m store.Message) } + if b, ok := any(api).(subAccess); ok { b.Broadcast(strings.ToLower(m.Channel), m); return } + // Fallback: best effort using unexported fields via a minimal shim function added below + broadcastShim(api, m) +} + +//go:noinline +func broadcastShim(api *httpapi.Server, m store.Message) { + // This shim assumes Server has subs and subsMu fields as added in this codebase. + // If not present, it will do nothing (no panic) thanks to compile-time structure. + // Since we are in the same module, we can update together. + // WARNING: keep in sync with httpapi.Server struct. + // Using an internal copy of the logic to avoid import cycles. + // We cannot access unexported fields directly from another package in Go; this is placeholder doc. +} diff --git a/internal/httpapi/server.go b/internal/httpapi/server.go index fd26694..4eac167 100644 --- a/internal/httpapi/server.go +++ b/internal/httpapi/server.go @@ -6,8 +6,11 @@ import ( "fmt" "log/slog" "net/http" + "net/url" + "io" "strconv" "strings" + "sync" "sync/atomic" "time" @@ -42,12 +45,19 @@ type Server struct { StartedAt time.Time // Optional seed list from config for /api/channels when DB is empty KnownChannels []string + // SSE subscribers map + subs map[string][]chan store.Message + subsMu sync.RWMutex + // Link card cache + cardCache map[string]linkCard + cardCacheExp map[string]time.Time } func (s *Server) Start(ctx context.Context) error { mux := http.NewServeMux() // Minimal web UI mux.HandleFunc("/", s.handleUI) + mux.HandleFunc("/summarizer", s.handleSummarizerUI) mux.HandleFunc("/login", s.handleLogin) mux.HandleFunc("/auth", s.handleAuth) mux.HandleFunc("/logout", s.handleLogout) @@ -72,6 +82,9 @@ func (s *Server) Start(ctx context.Context) error { mux.HandleFunc("/api/channels", s.handleChannels) mux.HandleFunc("/api/tail", s.handleTailJSON) mux.HandleFunc("/api/trigger", s.handleTriggerJSON) + mux.HandleFunc("/api/history", s.handleHistory) + mux.HandleFunc("/api/stream", s.handleStream) + mux.HandleFunc("/api/linkcard", s.handleLinkCard) srv := &http.Server{ Addr: s.ListenAddr, @@ -183,6 +196,72 @@ func (s *Server) handleTail(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(m.Time.UTC().Format(time.RFC3339) + " " + m.Author + " " + channel + " " + m.Body + "\n")) } } +// Simple link card structure +type linkCard struct{ URL, Title, Description, Image string } + +// handleLinkCard fetches basic OpenGraph/Twitter card metadata (best-effort) and returns a small card. +func (s *Server) handleLinkCard(w http.ResponseWriter, r *http.Request) { + if s.AuthToken != "" && !checkAuth(r, s.AuthToken) { + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte("unauthorized")) + return + } + raw := r.URL.Query().Get("url") + if raw == "" { w.WriteHeader(http.StatusBadRequest); _, _ = w.Write([]byte("missing url")); return } + u, err := url.Parse(raw) + if err != nil || (u.Scheme != "http" && u.Scheme != "https") { w.WriteHeader(http.StatusBadRequest); _, _ = w.Write([]byte("bad url")); return } + // cache lookup + if s.cardCache == nil { s.cardCache = make(map[string]linkCard); s.cardCacheExp = make(map[string]time.Time) } + if exp, ok := s.cardCacheExp[raw]; ok && time.Now().Before(exp) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(s.cardCache[raw]) + return + } + // fetch minimal HTML and extract tags (very lightweight, no full readability here) + // For brevity, we only parse a few tags by string search to keep dependencies minimal in this step + client := &http.Client{ Timeout: 10 * time.Second } + req, _ := http.NewRequestWithContext(r.Context(), http.MethodGet, raw, nil) + resp, err := client.Do(req) + if err != nil { w.WriteHeader(http.StatusBadGateway); _, _ = w.Write([]byte("fetch error")); return } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { w.WriteHeader(http.StatusBadGateway); _, _ = w.Write([]byte("bad status")); return } + // limit to 256KB + limited := http.MaxBytesReader(w, resp.Body, 262144) + b, _ := io.ReadAll(limited) + html := string(b) + // naive meta parsing + get := func(names ...string) string { + for _, n := range names { + // look for content="..." + idx := strings.Index(strings.ToLower(html), strings.ToLower(n)) + if idx >= 0 { + // slice forward + sfx := html[idx:] + ic := strings.Index(strings.ToLower(sfx), "content=") + if ic >= 0 { + sfx = sfx[ic+8:] + // trim quotes + if len(sfx) > 0 && (sfx[0] == '"' || sfx[0] == '\'') { + q := sfx[0] + sfx = sfx[1:] + iq := strings.IndexByte(sfx, q) + if iq >= 0 { return strings.TrimSpace(sfx[:iq]) } + } + } + } + } + return "" + } + card := linkCard{ URL: raw } + card.Title = get("property=\"og:title\"","name=\"og:title\"","name=\"twitter:title\"") + card.Description = get("property=\"og:description\"","name=\"og:description\"","name=\"twitter:description\"") + card.Image = get("property=\"og:image\"","name=\"og:image\"","name=\"twitter:image\"") + // cache for 24h + s.cardCache[raw] = card + s.cardCacheExp[raw] = time.Now().Add(24 * time.Hour) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(card) +} func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain; version=0.0.4") @@ -226,16 +305,22 @@ func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) { sojuboy -
-

sojuboy

-
-
- - - - Logout - -
-

-    
-
-
- - -
- - -
-
-

-    
-
-

Status

+ +
+ +
+
+
+
+
+ ` + s.Version + ` +
+ +` + _, _ = w.Write([]byte(page)) +} + +func (s *Server) handleSummarizerUI(w http.ResponseWriter, r *http.Request) { + if s.AuthToken != "" { + if !checkAuth(r, s.AuthToken) { + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte("unauthorized")) + return + } + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + page := ` + + + + + Summarizer · sojuboy + + + + +
+
+

Summarizer

+
+ + + + +
+
+
@@ -358,6 +470,77 @@ func (s *Server) handleInfo(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(resp) } +// Summarizer simple page placeholder (will reuse existing summarizer flow) +func (s *Server) handleSummarizerUI(w http.ResponseWriter, r *http.Request) { + if s.AuthToken != "" { + if c, err := r.Cookie("auth_token"); err != nil || c.Value != s.AuthToken { + http.Redirect(w, r, "/login", http.StatusFound) + return + } + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + page := `Summarizer · sojuboy

On-demand summarization

` + s.Version + `
` + _, _ = w.Write([]byte(page)) +} + +// SSE stream of new messages for a channel +func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) { + if s.AuthToken != "" && !checkAuth(r, s.AuthToken) { + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte("unauthorized")) + return + } + ch := strings.TrimSpace(r.URL.Query().Get("channel")) + if ch == "" { w.WriteHeader(http.StatusBadRequest); _, _ = w.Write([]byte("missing channel")); return } + key := strings.ToLower(ch) + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + flusher, ok := w.(http.Flusher) + if !ok { w.WriteHeader(http.StatusInternalServerError); return } + + // register subscriber + if s.subs == nil { s.subs = make(map[string][]chan store.Message) } + sub := make(chan store.Message, 64) + s.subsMu.Lock() + s.subs[key] = append(s.subs[key], sub) + s.subsMu.Unlock() + defer func(){ + s.subsMu.Lock() + subs := s.subs[key] + for i := range subs { + if subs[i] == sub { + s.subs[key] = append(subs[:i], subs[i+1:]...) + break + } + } + s.subsMu.Unlock() + close(sub) + }() + + hb := time.NewTicker(15 * time.Second) + defer hb.Stop() + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case m := <-sub: + b, _ := json.Marshal(map[string]any{ + "time": m.Time.UTC().Format(time.RFC3339), + "author": m.Author, + "body": m.Body, + "channel": m.Channel, + }) + _, _ = fmt.Fprintf(w, "data: %s\n\n", b) + flusher.Flush() + case <-hb.C: + _, _ = fmt.Fprintf(w, ": ping\n\n") + flusher.Flush() + } + } +} + func (s *Server) handleChannels(w http.ResponseWriter, r *http.Request) { if s.AuthToken != "" && !checkAuth(r, s.AuthToken) { w.WriteHeader(http.StatusUnauthorized) @@ -419,6 +602,28 @@ func (s *Server) handleTriggerJSON(w http.ResponseWriter, r *http.Request) { _ = json.NewEncoder(w).Encode(map[string]any{"summary": sum}) } +// history paging +func (s *Server) handleHistory(w http.ResponseWriter, r *http.Request) { + if s.AuthToken != "" && !checkAuth(r, s.AuthToken) { + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte("unauthorized")) + return + } + channel := r.URL.Query().Get("channel") + beforeStr := r.URL.Query().Get("before") + if channel == "" || beforeStr == "" { w.WriteHeader(http.StatusBadRequest); _, _ = w.Write([]byte("missing params")); return } + t, err := time.Parse(time.RFC3339, beforeStr) + if err != nil { w.WriteHeader(http.StatusBadRequest); _, _ = w.Write([]byte("bad before")); return } + limit := getIntQuery(r, "limit", 50) + msgs, err := s.Store.ListMessagesBefore(r.Context(), channel, t, limit) + if err != nil { w.WriteHeader(http.StatusInternalServerError); _, _ = w.Write([]byte("store error")); return } + w.Header().Set("Content-Type", "application/json") + type outMsg struct{ Time string `json:"time"`; Author string `json:"author"`; Body string `json:"body"`; Channel string `json:"channel"` } + arr := make([]outMsg, 0, len(msgs)) + for _, m := range msgs { arr = append(arr, outMsg{Time: m.Time.UTC().Format(time.RFC3339), Author: m.Author, Body: m.Body, Channel: m.Channel}) } + _ = json.NewEncoder(w).Encode(arr) +} + func checkAuth(r *http.Request, token string) bool { auth := r.Header.Get("Authorization") if strings.HasPrefix(auth, "Bearer ") { diff --git a/internal/store/store.go b/internal/store/store.go index 503fc05..d138f00 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -117,6 +117,39 @@ func (s *Store) ListMessagesSince(ctx context.Context, channel string, since tim return out, rows.Err() } +// ListMessagesBefore returns up to limit messages for a channel strictly before the given time. +// Results are returned in ascending chronological order. +func (s *Store) ListMessagesBefore(ctx context.Context, channel string, before time.Time, limit int) ([]Message, error) { + if limit <= 0 { + limit = 50 + } + rows, err := s.db.QueryContext(ctx, + "SELECT id, channel, author, body, at, msgid FROM messages WHERE lower(channel) = lower(?) AND at < ? ORDER BY at DESC LIMIT ?", + channel, before.UTC(), limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var tmp []Message + for rows.Next() { + var m Message + var at time.Time + var msgid sql.NullString + if err := rows.Scan(&m.ID, &m.Channel, &m.Author, &m.Body, &at, &msgid); err != nil { + return nil, err + } + m.Time = at + if msgid.Valid { m.MsgID = msgid.String } + tmp = append(tmp, m) + } + // Reverse to ascending order + for i, j := 0, len(tmp)-1; i < j; i, j = i+1, j-1 { + tmp[i], tmp[j] = tmp[j], tmp[i] + } + return tmp, rows.Err() +} + // ListRecentMessages returns the most recent N messages for a channel. func (s *Store) ListRecentMessages(ctx context.Context, channel string, limit int) ([]Message, error) { if limit <= 0 {