feat: initial Beta 1 release

- soju raw connector with event playback and CHATHISTORY fallback
- SQLite store with msgid de-dup and retention job
- Mentions + Pushover + tuning; structured JSON logs
- Summaries: concise, link-following, multi-line grouping
- HTTP: /healthz, /ready, /tail, /trigger, /metrics
- Docker: distroless, healthcheck, version metadata
- Docs: README, CHANGELOG, compose
This commit is contained in:
Thomas Cravey 2025-08-15 18:06:28 -05:00
commit 2954e85e7a
19 changed files with 1983 additions and 0 deletions

254
cmd/sojuboy/main.go Normal file
View file

@ -0,0 +1,254 @@
package main
import (
"context"
"flag"
"io"
"log/slog"
"net/http"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"sojuboy/internal/config"
"sojuboy/internal/httpapi"
"sojuboy/internal/logging"
"sojuboy/internal/notifier"
"sojuboy/internal/scheduler"
"sojuboy/internal/soju"
"sojuboy/internal/store"
"sojuboy/internal/summarizer"
)
var (
version = "dev"
commit = ""
builtAt = ""
)
type rateLimiter struct {
mu sync.Mutex
last map[string]time.Time // key: channel|keyword
}
func (r *rateLimiter) allow(key string, minInterval time.Duration) bool {
r.mu.Lock()
defer r.mu.Unlock()
if r.last == nil {
r.last = make(map[string]time.Time)
}
now := time.Now()
if t, ok := r.last[key]; ok {
if now.Sub(t) < minInterval {
return false
}
}
r.last[key] = now
return true
}
func main() {
cfg := config.FromEnv()
health := flag.Bool("health", false, "run healthcheck and exit")
flag.Parse()
if *health {
client := &http.Client{Timeout: 3 * time.Second}
resp, err := client.Get("http://127.0.0.1:8080/ready")
if err != nil {
os.Exit(1)
}
defer resp.Body.Close()
_, _ = io.Copy(io.Discard, resp.Body)
if resp.StatusCode == 200 {
os.Exit(0)
}
os.Exit(1)
}
logger := logging.New(cfg.LogLevel)
logger.Info("starting sojuboy", "version", version, "commit", commit, "builtAt", builtAt)
if cfg.LogLevel == "debug" {
logger.Info("config loaded", "config", cfg.Redact())
}
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
st, err := store.Open(ctx, cfg.StorePath)
if err != nil {
logger.Error("open store", "err", err)
os.Exit(1)
}
defer st.Close()
metrics := &httpapi.Metrics{}
var nt notifier.Notifier
if cfg.Notifier == "pushover" {
nt = notifier.NewPushover(cfg.PushoverUserKey, cfg.PushoverAPIToken)
} else {
logger.Error("unsupported notifier", "notifier", cfg.Notifier)
os.Exit(1)
}
var sum summarizer.Summarizer
if cfg.LLMProvider == "openai" {
ai := summarizer.NewOpenAI(cfg.OpenAIAPIKey, cfg.OpenAIBaseURL, cfg.OpenAIModel, cfg.OpenAIMaxTokens)
// apply summarizer tuning from env
ai.ApplyConfig(cfg)
sum = ai
} else {
logger.Warn("no summarizer configured", "provider", cfg.LLMProvider)
}
// HTTP API
api := httpapi.Server{
ListenAddr: cfg.HTTPListen,
AuthToken: cfg.HTTPToken,
Store: st,
Summarizer: sum,
Notifier: nt,
Logger: slog.New(slog.NewJSONHandler(os.Stdout, nil)), // legacy interface still expects *log.Logger; keep minimal text via adapter if needed
Metrics: metrics,
Ready: func() bool {
return atomic.LoadInt64(&metrics.ConnectedGauge) == 1
},
}
go func() {
if err := api.Start(ctx); err != nil && err != http.ErrServerClosed {
logger.Error("http api", "err", err)
os.Exit(1)
}
}()
// Rate limiter and mention filter
rl := &rateLimiter{}
allowedChannel := func(channel string) bool { return cfg.IsChannelAllowed(channel) }
isUrgent := func(text string) bool { return config.ContainsAnyWordFold(text, cfg.UrgentKeywords) }
// IRC ingestion alert function
mentionChecker := func(text string) bool {
return config.ContainsMention(text, cfg.Nick, cfg.Keywords)
}
alert := func(channel, author, text, msgid string, at time.Time) {
logger.Debug("ingest", "ts", at.UTC(), "channel", channel, "author", author, "body", text, "msgid", msgid)
if err := st.InsertMessage(ctx, store.Message{
Channel: channel,
Author: author,
Body: text,
Time: at.UTC(),
MsgID: msgid,
}); err != nil {
logger.Error("store insert", "err", err)
} else {
atomic.AddInt64(&metrics.MessagesIngested, 1)
}
if mentionChecker(text) && allowedChannel(channel) {
if nt != nil {
if !cfg.NotifyBackfill && time.Since(at) > 5*time.Minute {
logger.Debug("mention suppressed", "reason", "backfill", "channel", channel, "author", author)
return
}
if config.WithinQuietHours(at, cfg.QuietHours) && !isUrgent(text) {
logger.Debug("mention suppressed", "reason", "quiet_hours", "channel", channel, "author", author)
return
}
key := channel + "|" + cfg.Nick
if !rl.allow(key, cfg.MentionMinInterval) {
logger.Debug("mention suppressed", "reason", "rate_limit", "channel", channel, "author", author)
return
}
if err := nt.Notify(ctx, "IRC mention in "+channel, author+": "+text); err != nil {
logger.Error("mention notify", "err", err)
} else {
atomic.AddInt64(&metrics.NotificationsSent, 1)
logger.Debug("mention notified", "channel", channel, "author", author)
}
}
}
}
// Choose connector (raw only for soju)
backfill := config.GetEnvInt("CHATHISTORY_LATEST", 0)
raw := soju.RawClient{
Server: cfg.IRCServer,
Port: cfg.IRCPort,
UseTLS: cfg.IRCTLS,
Nick: cfg.Nick,
Username: cfg.Username,
Realname: cfg.Realname,
Password: cfg.Password,
Channels: cfg.Channels,
BackfillLatest: backfill,
OnPrivmsg: func(channel, author, text, msgid string, at time.Time) {
alert(channel, author, text, msgid, at)
},
ConnectedGauge: &metrics.ConnectedGauge,
}
logger.Info("irc", "auth", "raw", "username", cfg.Username)
go func() {
if err := raw.Run(ctx); err != nil {
logger.Error("irc client", "err", err)
}
}()
// Scheduler for digests
if sum != nil && cfg.DigestCron != "" {
job := func(now time.Time) {
for _, ch := range cfg.Channels {
if config.WithinQuietHours(now, cfg.QuietHours) {
continue
}
window := cfg.DigestWindow
since := now.Add(-window)
msgs, err := st.ListMessagesSince(ctx, ch, since)
if err != nil {
logger.Error("digest fetch", "err", err)
continue
}
summary, err := sum.Summarize(ctx, ch, msgs, window)
if err != nil {
logger.Error("digest summarize", "err", err)
continue
}
if nt != nil {
title := "IRC digest " + ch + " (" + window.String() + ")"
if err := nt.Notify(ctx, title, summary); err != nil {
logger.Error("digest notify", "err", err)
} else {
atomic.AddInt64(&metrics.NotificationsSent, 1)
logger.Debug("digest notified", "channel", ch, "window", window.String())
}
}
}
}
if err := scheduler.Start(ctx, cfg.DigestCron, job, slog.NewLogLogger(logger.Handler(), slog.LevelInfo)); err != nil {
logger.Error("scheduler", "err", err)
}
}
// Daily retention prune job at 03:00 local
if cfg.RetentionDays > 0 {
pruneSpec := "0 0 3 * * *" // sec min hour dom mon dow
pruneJob := func(now time.Time) {
cutoff := now.AddDate(0, 0, -cfg.RetentionDays)
if n, err := st.DeleteOlderThan(ctx, cutoff); err != nil {
logger.Error("retention prune", "err", err)
} else if n > 0 {
atomic.AddInt64(&metrics.MessagesPruned, n)
logger.Info("retention pruned", "count", n, "cutoff", cutoff.Format(time.RFC3339))
}
}
if err := scheduler.Start(ctx, pruneSpec, pruneJob, slog.NewLogLogger(logger.Handler(), slog.LevelInfo)); err != nil {
logger.Error("scheduler prune", "err", err)
}
}
<-ctx.Done()
logger.Info("shutting down")
}