214 lines
5.5 KiB
Go
214 lines
5.5 KiB
Go
package httpapi
|
|
|
|
import (
	"context"
	"crypto/subtle"
	"fmt"
	"log/slog"
	"net/http"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"sojuboy/internal/store"
	"sojuboy/internal/summarizer"
)
|
|
|
|
// Metrics holds the counters and gauges rendered by the /metrics handler.
// The handler reads every field with sync/atomic, so writers should update
// them atomically as well.
type Metrics struct {
	// MessagesIngested is a counter.
	MessagesIngested int64
	// NotificationsSent is a counter.
	NotificationsSent int64
	// MessagesPruned is a counter.
	MessagesPruned int64
	// ConnectedGauge is a gauge that is either 0 or 1.
	ConnectedGauge int64
}
|
|
|
|
type Server struct {
|
|
ListenAddr string
|
|
AuthToken string
|
|
Store *store.Store
|
|
Summarizer summarizer.Summarizer
|
|
Notifier interface {
|
|
Notify(context.Context, string, string) error
|
|
}
|
|
Logger *slog.Logger
|
|
Metrics *Metrics
|
|
Ready func() bool
|
|
// Optional timeout override for summarizer
|
|
SummarizerTimeout time.Duration
|
|
}
|
|
|
|
func (s *Server) Start(ctx context.Context) error {
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("ok"))
|
|
})
|
|
mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
|
|
if s.Ready != nil && !s.Ready() {
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
_, _ = w.Write([]byte("not ready"))
|
|
return
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("ready"))
|
|
})
|
|
mux.HandleFunc("/trigger", s.handleTrigger)
|
|
mux.HandleFunc("/tail", s.handleTail)
|
|
mux.HandleFunc("/metrics", s.handleMetrics)
|
|
|
|
srv := &http.Server{
|
|
Addr: s.ListenAddr,
|
|
Handler: mux,
|
|
}
|
|
go func() {
|
|
<-ctx.Done()
|
|
_ = srv.Shutdown(context.Background())
|
|
}()
|
|
if s.Logger != nil {
|
|
s.Logger.Info("http listening", "addr", s.ListenAddr)
|
|
}
|
|
return srv.ListenAndServe()
|
|
}
|
|
|
|
func (s *Server) handleTrigger(w http.ResponseWriter, r *http.Request) {
|
|
if s.AuthToken != "" {
|
|
if !checkAuth(r, s.AuthToken) {
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
_, _ = w.Write([]byte("unauthorized"))
|
|
return
|
|
}
|
|
}
|
|
channel := r.URL.Query().Get("channel")
|
|
if channel == "" {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte("missing channel"))
|
|
return
|
|
}
|
|
windowStr := r.URL.Query().Get("window")
|
|
if windowStr == "" {
|
|
windowStr = "6h"
|
|
}
|
|
window, err := time.ParseDuration(windowStr)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte("bad window"))
|
|
return
|
|
}
|
|
ctx := r.Context()
|
|
msgs, err := s.Store.ListMessagesSince(ctx, channel, time.Now().Add(-window))
|
|
if err != nil {
|
|
if s.Logger != nil {
|
|
s.Logger.Error("http trigger store", "err", err)
|
|
}
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
_, _ = w.Write([]byte("store error"))
|
|
return
|
|
}
|
|
if s.Summarizer == nil {
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
_, _ = w.Write([]byte("summarizer not configured"))
|
|
return
|
|
}
|
|
// Timeout summarization using configurable timeout (default 5m)
|
|
tout := s.SummarizerTimeout
|
|
if tout <= 0 {
|
|
tout = 5 * time.Minute
|
|
}
|
|
ctxSum, cancel := context.WithTimeout(ctx, tout)
|
|
defer cancel()
|
|
summary, err := s.Summarizer.Summarize(ctxSum, channel, msgs, window)
|
|
if err != nil {
|
|
if s.Logger != nil {
|
|
s.Logger.Error("http trigger summarizer", "err", err)
|
|
}
|
|
w.WriteHeader(http.StatusBadGateway)
|
|
_, _ = w.Write([]byte("summarizer error"))
|
|
return
|
|
}
|
|
if s.Notifier != nil {
|
|
title := fmt.Sprintf("IRC digest %s (%s)", channel, window)
|
|
_ = s.Notifier.Notify(ctx, title, summary)
|
|
if s.Metrics != nil {
|
|
atomic.AddInt64(&s.Metrics.NotificationsSent, 1)
|
|
}
|
|
}
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
_, _ = w.Write([]byte(summary))
|
|
}
|
|
|
|
func (s *Server) handleTail(w http.ResponseWriter, r *http.Request) {
|
|
if s.AuthToken != "" {
|
|
if !checkAuth(r, s.AuthToken) {
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
_, _ = w.Write([]byte("unauthorized"))
|
|
return
|
|
}
|
|
}
|
|
channel := r.URL.Query().Get("channel")
|
|
if channel == "" {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
_, _ = w.Write([]byte("missing channel"))
|
|
return
|
|
}
|
|
limit := getIntQuery(r, "limit", 50)
|
|
msgs, err := s.Store.ListRecentMessages(r.Context(), channel, limit)
|
|
if err != nil {
|
|
if s.Logger != nil {
|
|
s.Logger.Error("http tail store", "err", err)
|
|
}
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
_, _ = w.Write([]byte("store error"))
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
for i := len(msgs) - 1; i >= 0; i-- {
|
|
m := msgs[i]
|
|
_, _ = w.Write([]byte(m.Time.UTC().Format(time.RFC3339) + " " + m.Author + " " + channel + " " + m.Body + "\n"))
|
|
}
|
|
}
|
|
|
|
func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/plain; version=0.0.4")
|
|
msgs := int64(0)
|
|
nots := int64(0)
|
|
pruned := int64(0)
|
|
conn := int64(0)
|
|
if s.Metrics != nil {
|
|
msgs = atomic.LoadInt64(&s.Metrics.MessagesIngested)
|
|
nots = atomic.LoadInt64(&s.Metrics.NotificationsSent)
|
|
pruned = atomic.LoadInt64(&s.Metrics.MessagesPruned)
|
|
conn = atomic.LoadInt64(&s.Metrics.ConnectedGauge)
|
|
}
|
|
_, _ = fmt.Fprintf(w, "sojuboy_messages_ingested_total %d\n", msgs)
|
|
_, _ = fmt.Fprintf(w, "sojuboy_notifications_sent_total %d\n", nots)
|
|
_, _ = fmt.Fprintf(w, "sojuboy_messages_pruned_total %d\n", pruned)
|
|
_, _ = fmt.Fprintf(w, "sojuboy_connected %d\n", conn)
|
|
}
|
|
|
|
// checkAuth reports whether r carries token through any of the supported
// channels, tried in this order:
//
//  1. "Authorization: Bearer <token>" header
//  2. "token" URL query parameter
//  3. HTTP Basic auth with user "token" and the token as password
//  4. "X-Auth-Token" header
//
// Candidates are compared with crypto/subtle so that the comparison time
// does not depend on how many leading bytes of a guess are correct.
// ConstantTimeCompare returns immediately when lengths differ, so the
// token length itself is not hidden.
//
// Callers are expected to skip the check when no token is configured.
func checkAuth(r *http.Request, token string) bool {
	want := []byte(token)
	matches := func(candidate string) bool {
		return subtle.ConstantTimeCompare([]byte(candidate), want) == 1
	}

	if bearer, ok := strings.CutPrefix(r.Header.Get("Authorization"), "Bearer "); ok && matches(bearer) {
		return true
	}
	if matches(r.URL.Query().Get("token")) {
		return true
	}
	if user, pass, ok := r.BasicAuth(); ok && user == "token" && matches(pass) {
		return true
	}
	return matches(r.Header.Get("X-Auth-Token"))
}
|
|
|
|
// getIntQuery returns the URL query parameter key parsed as a base-10 int.
// It returns def when the parameter is absent, empty, or not a valid
// integer. No range check is applied to the parsed value.
func getIntQuery(r *http.Request, key string, def int) int {
	raw := r.URL.Query().Get(key)
	if raw == "" {
		return def
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return def
	}
	return parsed
}
|