// Package httpapi serves the HTTP endpoints of sojuboy: health and readiness
// probes, plain-text and JSON views of stored messages, on-demand channel
// digests, metrics, and a minimal web UI.
package httpapi

import (
	"context"
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"sojuboy/internal/store"
	"sojuboy/internal/summarizer"
)

const (
	// defaultSummarizerTimeout bounds a summarization when
	// Server.SummarizerTimeout is zero or negative.
	defaultSummarizerTimeout = 5 * time.Minute

	// defaultDigestWindow is the look-back window used when a trigger request
	// carries no "window" query parameter.
	defaultDigestWindow = "6h"

	// readHeaderTimeout caps how long a client may take to send its request
	// headers. No write timeout is set because a digest request may
	// legitimately run for as long as the summarizer timeout.
	readHeaderTimeout = 10 * time.Second

	// uiPage is the body served at "/". It is spelled with explicit escapes so
	// that whitespace-only lines survive editors that trim trailing blanks.
	uiPage = " sojuboy\n" +
		"\n" +
		"sojuboy\n" +
		"\n" +
		"\n" +
		"    \n" +
		"\n" +
		"    \n" +
		"\n" +
		"Status\n" +
		"\n"
)

// Metrics holds the process counters exposed by /metrics and /api/info.
// Every field is read and written through sync/atomic.
type Metrics struct {
	MessagesIngested  int64 // counter
	NotificationsSent int64 // counter
	MessagesPruned    int64 // counter
	ConnectedGauge    int64 // 0/1
}

// Server is the HTTP front end. Configure the fields, then call Start.
type Server struct {
	ListenAddr string
	// AuthToken protects the data endpoints; an empty value disables auth.
	AuthToken  string
	Store      *store.Store
	Summarizer summarizer.Summarizer
	Notifier   interface {
		Notify(context.Context, string, string) error
	}
	Logger  *slog.Logger
	Metrics *Metrics
	// Ready, when set, decides whether /ready answers 200 or 503.
	Ready func() bool
	// Optional timeout override for summarizer
	SummarizerTimeout time.Duration
	// Build/runtime info for UI
	Version   string
	Commit    string
	BuiltAt   string
	StartedAt time.Time
	// Optional seed list from config for /api/channels when DB is empty
	KnownChannels []string
}

// Start registers all routes and serves on ListenAddr until the listener
// stops. When ctx is cancelled the server is shut down gracefully. The value
// returned is whatever http.Server.ListenAndServe returns, which is
// http.ErrServerClosed after a shutdown.
func (s *Server) Start(ctx context.Context) error {
	mux := http.NewServeMux()

	// Minimal web UI
	mux.HandleFunc("/", s.handleUI)

	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		writeText(w, http.StatusOK, "ok")
	})
	mux.HandleFunc("/ready", func(w http.ResponseWriter, _ *http.Request) {
		if s.Ready != nil && !s.Ready() {
			writeText(w, http.StatusServiceUnavailable, "not ready")
			return
		}
		writeText(w, http.StatusOK, "ready")
	})
	mux.HandleFunc("/trigger", s.handleTrigger)
	mux.HandleFunc("/tail", s.handleTail)
	mux.HandleFunc("/metrics", s.handleMetrics)

	// JSON endpoints for UI
	mux.HandleFunc("/api/info", s.handleInfo)
	mux.HandleFunc("/api/channels", s.handleChannels)
	mux.HandleFunc("/api/tail", s.handleTailJSON)
	mux.HandleFunc("/api/trigger", s.handleTriggerJSON)

	srv := &http.Server{
		Addr:              s.ListenAddr,
		Handler:           mux,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	go func() {
		<-ctx.Done()
		if err := srv.Shutdown(context.Background()); err != nil {
			s.logError("http shutdown", "err", err)
		}
	}()

	if s.Logger != nil {
		s.Logger.Info("http listening", "addr", s.ListenAddr)
	}
	return srv.ListenAndServe()
}

// digest is the result of a successful summarization request.
type digest struct {
	channel string
	window  time.Duration
	summary string
}

// buildDigest reads "channel" and "window" from the query string, loads the
// channel's messages from the store for that window and summarizes them.
// On any failure it writes the error response itself and returns false.
func (s *Server) buildDigest(w http.ResponseWriter, r *http.Request) (digest, bool) {
	query := r.URL.Query()

	channel := query.Get("channel")
	if channel == "" {
		writeText(w, http.StatusBadRequest, "missing channel")
		return digest{}, false
	}

	rawWindow := query.Get("window")
	if rawWindow == "" {
		rawWindow = defaultDigestWindow
	}
	window, err := time.ParseDuration(rawWindow)
	if err != nil {
		writeText(w, http.StatusBadRequest, "bad window")
		return digest{}, false
	}

	msgs, err := s.Store.ListMessagesSince(r.Context(), channel, time.Now().Add(-window))
	if err != nil {
		s.logError("http trigger store", "err", err)
		writeText(w, http.StatusInternalServerError, "store error")
		return digest{}, false
	}

	if s.Summarizer == nil {
		writeText(w, http.StatusServiceUnavailable, "summarizer not configured")
		return digest{}, false
	}

	// Timeout summarization using configurable timeout (default 5m)
	timeout := s.SummarizerTimeout
	if timeout <= 0 {
		timeout = defaultSummarizerTimeout
	}
	sumCtx, cancel := context.WithTimeout(r.Context(), timeout)
	defer cancel()

	summary, err := s.Summarizer.Summarize(sumCtx, channel, msgs, window)
	if err != nil {
		s.logError("http trigger summarizer", "err", err)
		writeText(w, http.StatusBadGateway, "summarizer error")
		return digest{}, false
	}
	return digest{channel: channel, window: window, summary: summary}, true
}

// pushDigest sends d through the Notifier, if one is configured. A failed
// push is logged and does not count towards NotificationsSent; it never
// fails the HTTP request, which still returns the summary.
func (s *Server) pushDigest(ctx context.Context, d digest) {
	if s.Notifier == nil {
		return
	}
	title := fmt.Sprintf("IRC digest %s (%s)", d.channel, d.window)
	if err := s.Notifier.Notify(ctx, title, d.summary); err != nil {
		s.logError("http trigger notify", "err", err)
		return
	}
	if s.Metrics != nil {
		atomic.AddInt64(&s.Metrics.NotificationsSent, 1)
	}
}

// handleTrigger summarizes a channel, pushes the digest to the Notifier and
// returns the summary as plain text.
func (s *Server) handleTrigger(w http.ResponseWriter, r *http.Request) {
	if !s.authorize(w, r) {
		return
	}
	d, ok := s.buildDigest(w, r)
	if !ok {
		return
	}
	s.pushDigest(r.Context(), d)

	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	_, _ = w.Write([]byte(d.summary))
}

// handleTail writes the most recent messages of a channel as plain text, one
// per line, in the reverse of the order the store returns them. The "limit"
// query parameter defaults to 50.
func (s *Server) handleTail(w http.ResponseWriter, r *http.Request) {
	if !s.authorize(w, r) {
		return
	}
	channel := r.URL.Query().Get("channel")
	if channel == "" {
		writeText(w, http.StatusBadRequest, "missing channel")
		return
	}
	limit := getIntQuery(r, "limit", 50)

	msgs, err := s.Store.ListRecentMessages(r.Context(), channel, limit)
	if err != nil {
		s.logError("http tail store", "err", err)
		writeText(w, http.StatusInternalServerError, "store error")
		return
	}

	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	for i := len(msgs) - 1; i >= 0; i-- {
		m := msgs[i]
		_, _ = fmt.Fprintf(w, "%s %s %s %s\n", m.Time.UTC().Format(time.RFC3339), m.Author, channel, m.Body)
	}
}

// metricsSnapshot is a point-in-time copy of the Metrics counters.
type metricsSnapshot struct {
	ingested  int64
	sent      int64
	pruned    int64
	connected int64
}

// snapshotMetrics atomically loads every counter; all values are zero when no
// Metrics is configured.
func (s *Server) snapshotMetrics() metricsSnapshot {
	if s.Metrics == nil {
		return metricsSnapshot{}
	}
	return metricsSnapshot{
		ingested:  atomic.LoadInt64(&s.Metrics.MessagesIngested),
		sent:      atomic.LoadInt64(&s.Metrics.NotificationsSent),
		pruned:    atomic.LoadInt64(&s.Metrics.MessagesPruned),
		connected: atomic.LoadInt64(&s.Metrics.ConnectedGauge),
	}
}

// handleMetrics writes the counters as "name value" text lines.
func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain; version=0.0.4")
	m := s.snapshotMetrics()
	_, _ = fmt.Fprintf(w, "sojuboy_messages_ingested_total %d\n", m.ingested)
	_, _ = fmt.Fprintf(w, "sojuboy_notifications_sent_total %d\n", m.sent)
	_, _ = fmt.Fprintf(w, "sojuboy_messages_pruned_total %d\n", m.pruned)
	_, _ = fmt.Fprintf(w, "sojuboy_connected %d\n", m.connected)
}

// --- Web UI handlers ---

// handleUI serves the UI page at exactly "/". Because "/" is the mux
// catch-all pattern, every other path that reaches it gets an empty 404.
func (s *Server) handleUI(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/" {
		w.WriteHeader(http.StatusNotFound)
		return
	}
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	_, _ = w.Write([]byte(uiPage))
}

// handleInfo reports build information, uptime and the current counters as a
// JSON object. It requires no authentication.
func (s *Server) handleInfo(w http.ResponseWriter, r *http.Request) {
	m := s.snapshotMetrics()
	s.writeJSON(w, map[string]any{
		"version":           s.Version,
		"commit":            s.Commit,
		"builtAt":           s.BuiltAt,
		"startedAt":         s.StartedAt.Format(time.RFC3339),
		"uptime":            time.Since(s.StartedAt).Round(time.Second).String(),
		"messagesIngested":  m.ingested,
		"notificationsSent": m.sent,
		"messagesPruned":    m.pruned,
		"connected":         m.connected == 1,
	})
}

// handleChannels returns the channels known to the store as a JSON array,
// falling back to KnownChannels when the store has none.
func (s *Server) handleChannels(w http.ResponseWriter, r *http.Request) {
	if !s.authorize(w, r) {
		return
	}
	chs, err := s.Store.ListChannels(r.Context())
	if err != nil {
		s.logError("http channels store", "err", err)
		writeText(w, http.StatusInternalServerError, "store error")
		return
	}
	if len(chs) == 0 {
		// Appending onto a non-nil slice keeps the response a JSON array
		// ("[]") rather than "null" when there is nothing to seed from.
		chs = append([]string{}, s.KnownChannels...)
	}
	s.writeJSON(w, chs)
}

// handleTailJSON is the JSON counterpart of handleTail. The "limit" query
// parameter defaults to 100.
func (s *Server) handleTailJSON(w http.ResponseWriter, r *http.Request) {
	if !s.authorize(w, r) {
		return
	}
	channel := r.URL.Query().Get("channel")
	if channel == "" {
		writeText(w, http.StatusBadRequest, "missing channel")
		return
	}
	limit := getIntQuery(r, "limit", 100)

	msgs, err := s.Store.ListRecentMessages(r.Context(), channel, limit)
	if err != nil {
		s.logError("http tail store", "err", err)
		writeText(w, http.StatusInternalServerError, "store error")
		return
	}

	type outMsg struct {
		Time    string `json:"time"`
		Author  string `json:"author"`
		Body    string `json:"body"`
		Channel string `json:"channel"`
	}
	out := make([]outMsg, 0, len(msgs))
	for i := len(msgs) - 1; i >= 0; i-- {
		m := msgs[i]
		out = append(out, outMsg{
			Time:    m.Time.UTC().Format(time.RFC3339),
			Author:  m.Author,
			Body:    m.Body,
			Channel: channel,
		})
	}
	s.writeJSON(w, out)
}

// handleTriggerJSON summarizes a channel and returns {"summary": ...}. The
// digest is pushed to the Notifier only when the query contains push=1.
func (s *Server) handleTriggerJSON(w http.ResponseWriter, r *http.Request) {
	if !s.authorize(w, r) {
		return
	}
	push := r.URL.Query().Get("push") == "1"

	d, ok := s.buildDigest(w, r)
	if !ok {
		return
	}
	if push {
		s.pushDigest(r.Context(), d)
	}
	s.writeJSON(w, map[string]any{"summary": d.summary})
}

// authorize reports whether the request may proceed. With no AuthToken
// configured every request is allowed; otherwise a failed check is answered
// with 401 and false is returned.
func (s *Server) authorize(w http.ResponseWriter, r *http.Request) bool {
	if s.AuthToken == "" || checkAuth(r, s.AuthToken) {
		return true
	}
	writeText(w, http.StatusUnauthorized, "unauthorized")
	return false
}

// logError logs at error level when a Logger is configured.
func (s *Server) logError(msg string, args ...any) {
	if s.Logger != nil {
		s.Logger.Error(msg, args...)
	}
}

// writeJSON encodes v as the JSON response body. An encoding or write failure
// can only be logged because the status line has already been sent.
func (s *Server) writeJSON(w http.ResponseWriter, v any) {
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(v); err != nil {
		s.logError("http encode json", "err", err)
	}
}

// writeText sends status followed by body. The write error is ignored
// deliberately: it means the client has gone away.
func writeText(w http.ResponseWriter, status int, body string) {
	w.WriteHeader(status)
	_, _ = w.Write([]byte(body))
}

// checkAuth reports whether r carries token in any accepted form: an
// "Authorization: Bearer" header, a "token" query parameter, HTTP basic auth
// with user name "token", or an "X-Auth-Token" header. The secret is compared
// in constant time so that response timing does not reveal how much of a
// guess matched.
func checkAuth(r *http.Request, token string) bool {
	want := []byte(token)
	matches := func(candidate string) bool {
		return subtle.ConstantTimeCompare([]byte(candidate), want) == 1
	}

	if bearer, ok := strings.CutPrefix(r.Header.Get("Authorization"), "Bearer "); ok && matches(bearer) {
		return true
	}
	if matches(r.URL.Query().Get("token")) {
		return true
	}
	if user, pass, ok := r.BasicAuth(); ok && user == "token" && matches(pass) {
		return true
	}
	return matches(r.Header.Get("X-Auth-Token"))
}

// getIntQuery returns the query parameter key parsed as an int, or def when
// the parameter is absent, empty or not a valid integer.
func getIntQuery(r *http.Request, key string, def int) int {
	raw := r.URL.Query().Get(key)
	if raw == "" {
		return def
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return def
	}
	return n
}