feat(backend): migrate feed runtime pull state to fetch_state

Move pull metadata to feed_fetch_state and unify scheduler decisions around next_check_at with a global FUSION_PULL_MAX_BACKOFF cap. This keeps feed core data stable while making retry/cache-aware pulling behavior explicit across API, docs, and frontend types.
This commit is contained in:
Yuan
2026-02-14 15:25:43 +08:00
parent 8ca4d9e461
commit 62301ea572
24 changed files with 1737 additions and 365 deletions
+2 -2
View File
@@ -26,8 +26,8 @@ FUSION_PULL_TIMEOUT=30
# Maximum concurrent pulls (default: 10)
FUSION_PULL_CONCURRENCY=10
# Maximum backoff time in seconds (default: 604800 = 7 days)
FUSION_PULL_MAX_BACKOFF=604800
# Global max scheduling delay in seconds (default: 172800 = 48 hours)
FUSION_PULL_MAX_BACKOFF=172800
# Login rate limiting
# Max failed attempts per window (default: 10)
+1 -1
View File
@@ -63,7 +63,7 @@ Common keys:
- `FUSION_DB_PATH` (default `fusion.db`)
- `FUSION_PASSWORD` (required unless `FUSION_ALLOW_EMPTY_PASSWORD=true`)
- `FUSION_PORT` (default `8080`)
- `FUSION_PULL_INTERVAL`, `FUSION_PULL_TIMEOUT`, `FUSION_PULL_CONCURRENCY`
- `FUSION_PULL_INTERVAL`, `FUSION_PULL_TIMEOUT`, `FUSION_PULL_CONCURRENCY`, `FUSION_PULL_MAX_BACKOFF`
- `FUSION_CORS_ALLOWED_ORIGINS`, `FUSION_TRUSTED_PROXIES`
- `FUSION_OIDC_*` for optional SSO
+2 -2
View File
@@ -19,7 +19,7 @@ type Config struct {
PullInterval int // Pull interval in seconds (default: 1800 = 30 min)
PullTimeout int // Request timeout in seconds (default: 30)
PullConcurrency int // Max concurrent pulls (default: 10)
PullMaxBackoff int // Max backoff time in seconds (default: 604800 = 7 days)
PullMaxBackoff int // Global max scheduling delay in seconds (default: 172800 = 48 hours)
LoginRateLimit int // Max failed login attempts per window (default: 10)
LoginWindow int // Login rate limit window in seconds (default: 60)
@@ -90,7 +90,7 @@ func Load() (*Config, error) {
if err != nil {
return nil, err
}
pullMaxBackoff, err := getEnvInt("FUSION_PULL_MAX_BACKOFF", 604800, 1)
pullMaxBackoff, err := getEnvInt("FUSION_PULL_MAX_BACKOFF", 172800, 1)
if err != nil {
return nil, err
}
+13
View File
@@ -35,3 +35,16 @@ func TestLoadParsesCORSAndPrivateFeedSettings(t *testing.T) {
t.Fatalf("unexpected second trusted proxy: %q", cfg.TrustedProxies[1])
}
}
func TestLoadUsesDefaultPullMaxBackoff(t *testing.T) {
t.Setenv("FUSION_PASSWORD", "secret")
cfg, err := Load()
if err != nil {
t.Fatalf("Load() failed: %v", err)
}
if cfg.PullMaxBackoff != 172800 {
t.Fatalf("expected default PullMaxBackoff to be 172800, got %d", cfg.PullMaxBackoff)
}
}
+35 -9
View File
@@ -15,20 +15,46 @@ type Feed struct {
Name string `json:"name"`
Link string `json:"link"`
SiteURL string `json:"site_url,omitempty"`
LastBuild int64 `json:"last_build"`
// LastFailureAt is Unix timestamp of the most recent pull failure.
LastFailureAt int64 `json:"last_failure_at"`
Failure string `json:"failure,omitempty"`
Failures int64 `json:"failures"`
Suspended bool `json:"suspended"`
Proxy string `json:"proxy,omitempty"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
Suspended bool `json:"suspended"`
Proxy string `json:"proxy,omitempty"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
FetchState FeedFetchState `json:"fetch_state"`
UnreadCount int64 `json:"unread_count"`
ItemCount int64 `json:"item_count"`
}
// FeedFetchState stores runtime pull metadata for a feed.
// Time fields are Unix seconds; 0 means unknown/unset.
type FeedFetchState struct {
// ETag is used for If-None-Match conditional requests.
ETag string `json:"etag,omitempty"`
// LastModified stores raw HTTP Last-Modified for If-Modified-Since.
LastModified string `json:"last_modified,omitempty"`
// CacheControl stores raw HTTP Cache-Control value.
CacheControl string `json:"cache_control,omitempty"`
// ExpiresAt is parsed from the HTTP Expires header.
ExpiresAt int64 `json:"expires_at"`
// LastCheckedAt is the last fetch attempt time (success or failure).
LastCheckedAt int64 `json:"last_checked_at"`
// NextCheckAt is the earliest next fetch time decided by pull policy.
NextCheckAt int64 `json:"next_check_at"`
// LastHTTPStatus is the last HTTP status code observed during fetch.
LastHTTPStatus int `json:"last_http_status"`
// RetryAfterUntil is derived from Retry-After and blocks fetch before this time.
RetryAfterUntil int64 `json:"retry_after_until"`
// LastSuccessAt is the last successful check time (includes 200 and 304).
LastSuccessAt int64 `json:"last_success_at"`
// LastErrorAt is the most recent fetch failure time.
LastErrorAt int64 `json:"last_error_at"`
// LastError keeps the latest fetch error message.
LastError string `json:"last_error,omitempty"`
// ConsecutiveFailures is reset on success and incremented on each failure.
ConsecutiveFailures int64 `json:"consecutive_failures"`
}
// Item represents a feed item.
type Item struct {
ID int64 `json:"id"`
-57
View File
@@ -1,57 +0,0 @@
package pull
import (
"math"
"time"
"github.com/0x2E/fusion/internal/model"
)
const backoffBase = 1.8
// CalculateBackoff computes exponential backoff duration.
// Formula: interval × (1.8 ^ failures), capped at maxBackoff.
func CalculateBackoff(interval time.Duration, failures int64, maxBackoff time.Duration) time.Duration {
if failures == 0 {
return 0
}
backoff := float64(interval) * math.Pow(backoffBase, float64(failures))
duration := time.Duration(backoff)
if duration > maxBackoff {
return maxBackoff
}
return duration
}
// ShouldSkip determines if feed fetch should be skipped based on backoff.
// Returns true if feed is suspended or still in backoff period.
func ShouldSkip(feed *model.Feed, interval, maxBackoff time.Duration) bool {
now := time.Now().Unix()
// Skip if suspended
if feed.Suspended {
return true
}
// Skip if in backoff period
if feed.Failures > 0 {
backoff := CalculateBackoff(interval, feed.Failures, maxBackoff)
base := feed.LastFailureAt
if base <= 0 {
base = feed.LastBuild
}
nextPull := base + int64(backoff.Seconds())
if now < nextPull {
return true
}
}
// Skip if recently updated (within interval)
if now-feed.LastBuild < int64(interval.Seconds()) {
return true
}
return false
}
-157
View File
@@ -1,157 +0,0 @@
package pull
import (
"testing"
"time"
"github.com/0x2E/fusion/internal/model"
)
func TestCalculateBackoff(t *testing.T) {
interval := 30 * time.Minute
maxBackoff := 7 * 24 * time.Hour
tests := []struct {
name string
failures int64
wantApprox time.Duration // Approximate expected value
}{
{
name: "no failures",
failures: 0,
wantApprox: 0,
},
{
name: "1 failure",
failures: 1,
wantApprox: 54 * time.Minute, // 30 * 1.8 = 54
},
{
name: "2 failures",
failures: 2,
wantApprox: 97 * time.Minute, // 30 * 1.8^2 = 97.2
},
{
name: "3 failures",
failures: 3,
wantApprox: 175 * time.Minute, // 30 * 1.8^3 = 174.96
},
{
name: "5 failures",
failures: 5,
wantApprox: 9*time.Hour + 30*time.Minute, // 30 * 1.8^5 = 567.65 min ≈ 9.5h
},
{
name: "100 failures (exceeds max)",
failures: 100,
wantApprox: maxBackoff,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := CalculateBackoff(interval, tt.failures, maxBackoff)
if tt.failures == 0 {
if got != 0 {
t.Errorf("CalculateBackoff() = %v, want 0", got)
}
return
}
if tt.failures == 100 {
if got != maxBackoff {
t.Errorf("CalculateBackoff() = %v, want max backoff %v", got, maxBackoff)
}
return
}
// Allow 10% tolerance for floating point calculations
tolerance := float64(tt.wantApprox) * 0.1
diff := float64(got - tt.wantApprox)
if diff < 0 {
diff = -diff
}
if diff > tolerance {
t.Errorf("CalculateBackoff() = %v, want approximately %v (tolerance: %.0f)", got, tt.wantApprox, tolerance)
}
})
}
}
func TestShouldSkip(t *testing.T) {
interval := 30 * time.Minute
maxBackoff := 7 * 24 * time.Hour
now := time.Now().Unix()
tests := []struct {
name string
feed *model.Feed
want bool
}{
{
name: "suspended feed",
feed: &model.Feed{
Suspended: true,
LastBuild: now - 3600,
Failures: 0,
},
want: true,
},
{
name: "recently updated (10 min ago)",
feed: &model.Feed{
Suspended: false,
LastBuild: now - 600, // 10 minutes ago
Failures: 0,
},
want: true,
},
{
name: "ready to pull (40 min ago)",
feed: &model.Feed{
Suspended: false,
LastBuild: now - 2400, // 40 minutes ago
Failures: 0,
},
want: false,
},
{
name: "1 failure, in backoff period",
feed: &model.Feed{
Suspended: false,
LastBuild: now - 1800, // 30 minutes ago (backoff is 54 min)
Failures: 1,
},
want: true,
},
{
name: "1 failure, backoff expired",
feed: &model.Feed{
Suspended: false,
LastBuild: now - 3600, // 60 minutes ago (backoff is 54 min)
Failures: 1,
},
want: false,
},
{
name: "never pulled before",
feed: &model.Feed{
Suspended: false,
LastBuild: 0,
Failures: 0,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := ShouldSkip(tt.feed, interval, maxBackoff)
if got != tt.want {
t.Errorf("ShouldSkip() = %v, want %v", got, tt.want)
}
})
}
}
+94 -10
View File
@@ -25,38 +25,66 @@ type ParsedItem struct {
PubDate int64
}
// FetchAndParse fetches RSS/Atom feed and parses into items.
// Returns parsed items and optional site URL discovered from feed metadata.
func FetchAndParse(ctx context.Context, feed *model.Feed, timeout time.Duration, allowPrivateFeeds bool) ([]*ParsedItem, string, error) {
type FetchResult struct {
Items []*ParsedItem
SiteURL string
HTTPStatus int
NotModified bool
ETag string
LastModified string
CacheControl string
ExpiresAt int64
RetryAfterUntil int64
}
// FetchAndParse fetches RSS/Atom feed with conditional request headers.
// It returns fetch metadata plus parsed items when response status is 200.
func FetchAndParse(ctx context.Context, feed *model.Feed, timeout time.Duration, allowPrivateFeeds bool) (*FetchResult, error) {
result := &FetchResult{}
if err := httpc.ValidateRequestURL(ctx, feed.Link, allowPrivateFeeds); err != nil {
return nil, "", fmt.Errorf("validate feed url: %w", err)
return nil, fmt.Errorf("validate feed url: %w", err)
}
client, err := httpc.NewClient(timeout, feed.Proxy, allowPrivateFeeds)
if err != nil {
return nil, "", fmt.Errorf("create client: %w", err)
return nil, fmt.Errorf("create client: %w", err)
}
req, err := http.NewRequestWithContext(ctx, "GET", feed.Link, nil)
if err != nil {
return nil, "", fmt.Errorf("create request: %w", err)
return nil, fmt.Errorf("create request: %w", err)
}
httpc.SetDefaultHeaders(req)
setConditionalHeaders(req, feed)
resp, err := client.Do(req)
if err != nil {
return nil, "", fmt.Errorf("fetch feed: %w", err)
return nil, fmt.Errorf("fetch feed: %w", err)
}
defer resp.Body.Close()
now := time.Now().Unix()
result.HTTPStatus = resp.StatusCode
result.ETag = strings.TrimSpace(resp.Header.Get("ETag"))
result.LastModified = strings.TrimSpace(resp.Header.Get("Last-Modified"))
result.CacheControl = strings.TrimSpace(resp.Header.Get("Cache-Control"))
result.ExpiresAt = parseHTTPTime(resp.Header.Get("Expires"))
result.RetryAfterUntil = parseRetryAfter(resp.Header.Get("Retry-After"), now)
if resp.StatusCode == http.StatusNotModified {
result.NotModified = true
return result, nil
}
if resp.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("HTTP %d", resp.StatusCode)
return result, fmt.Errorf("HTTP %d", resp.StatusCode)
}
fp := gofeed.NewParser()
parsedFeed, err := fp.Parse(resp.Body)
if err != nil {
return nil, "", fmt.Errorf("parse feed: %w", err)
return result, fmt.Errorf("parse feed: %w", err)
}
siteURL := normalizeSiteURL(parsedFeed.Link)
@@ -74,7 +102,63 @@ func FetchAndParse(ctx context.Context, feed *model.Feed, timeout time.Duration,
items = append(items, mapItem(item, baseURL))
}
return items, siteURL, nil
result.Items = items
result.SiteURL = siteURL
return result, nil
}
func setConditionalHeaders(req *http.Request, feed *model.Feed) {
if req == nil || feed == nil {
return
}
if etag := strings.TrimSpace(feed.FetchState.ETag); etag != "" {
req.Header.Set("If-None-Match", etag)
}
if lastModified := strings.TrimSpace(feed.FetchState.LastModified); lastModified != "" {
req.Header.Set("If-Modified-Since", lastModified)
}
}
func parseHTTPTime(value string) int64 {
value = strings.TrimSpace(value)
if value == "" {
return 0
}
parsed, err := http.ParseTime(value)
if err != nil {
return 0
}
return parsed.Unix()
}
func parseRetryAfter(value string, now int64) int64 {
value = strings.TrimSpace(value)
if value == "" {
return 0
}
if seconds, err := strconv.ParseInt(value, 10, 64); err == nil {
if seconds <= 0 {
return 0
}
return now + seconds
}
parsed, err := http.ParseTime(value)
if err != nil {
return 0
}
unix := parsed.Unix()
if unix <= now {
return 0
}
return unix
}
func normalizeSiteURL(raw string) string {
+81
View File
@@ -1,14 +1,95 @@
package pull
import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/0x2E/fusion/internal/model"
"github.com/mmcdole/gofeed"
)
func TestFetchAndParseSendsConditionalHeadersAndHandles304(t *testing.T) {
var gotIfNoneMatch string
var gotIfModifiedSince string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotIfNoneMatch = r.Header.Get("If-None-Match")
gotIfModifiedSince = r.Header.Get("If-Modified-Since")
w.Header().Set("ETag", `"next-etag"`)
w.Header().Set("Last-Modified", "Mon, 02 Jan 2006 15:04:05 GMT")
w.WriteHeader(http.StatusNotModified)
}))
defer server.Close()
feed := &model.Feed{
Link: server.URL,
FetchState: model.FeedFetchState{
ETag: `"prev-etag"`,
LastModified: "Mon, 01 Jan 2006 15:04:05 GMT",
},
}
result, err := FetchAndParse(context.Background(), feed, 5*time.Second, true)
if err != nil {
t.Fatalf("FetchAndParse() failed: %v", err)
}
if gotIfNoneMatch != `"prev-etag"` {
t.Fatalf("expected If-None-Match header set, got %q", gotIfNoneMatch)
}
if gotIfModifiedSince != "Mon, 01 Jan 2006 15:04:05 GMT" {
t.Fatalf("expected If-Modified-Since header set, got %q", gotIfModifiedSince)
}
if !result.NotModified {
t.Fatalf("expected NotModified=true, got false")
}
if result.HTTPStatus != http.StatusNotModified {
t.Fatalf("expected status 304, got %d", result.HTTPStatus)
}
}
func TestFetchAndParseParsesCacheMetadata(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/rss+xml")
w.Header().Set("Cache-Control", "public, max-age=600")
w.Header().Set("Expires", time.Unix(1700000600, 0).UTC().Format(http.TimeFormat))
w.Header().Set("ETag", `"v1"`)
w.Header().Set("Last-Modified", "Mon, 02 Jan 2006 15:04:05 GMT")
_, _ = w.Write([]byte(`<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0"><channel><title>Demo</title><link>https://example.com</link>
<item><guid>g1</guid><title>Item</title><link>https://example.com/1</link></item>
</channel></rss>`))
}))
defer server.Close()
feed := &model.Feed{Link: server.URL}
result, err := FetchAndParse(context.Background(), feed, 5*time.Second, true)
if err != nil {
t.Fatalf("FetchAndParse() failed: %v", err)
}
if result.NotModified {
t.Fatal("expected NotModified=false for 200 response")
}
if result.HTTPStatus != http.StatusOK {
t.Fatalf("expected status 200, got %d", result.HTTPStatus)
}
if result.CacheControl != "public, max-age=600" {
t.Fatalf("expected cache-control metadata, got %q", result.CacheControl)
}
if result.ExpiresAt != 1700000600 {
t.Fatalf("expected expires_at=1700000600, got %d", result.ExpiresAt)
}
if len(result.Items) != 1 {
t.Fatalf("expected 1 item, got %d", len(result.Items))
}
}
func TestMapItemFallbackGUIDWhenMissingGUIDAndLink(t *testing.T) {
now := time.Now().UTC().Truncate(time.Second)
item := &gofeed.Item{
+107 -11
View File
@@ -10,6 +10,7 @@ import (
"github.com/0x2E/fusion/internal/config"
"github.com/0x2E/fusion/internal/model"
"github.com/0x2E/fusion/internal/pullpolicy"
"github.com/0x2E/fusion/internal/store"
"golang.org/x/sync/semaphore"
)
@@ -65,8 +66,17 @@ func (p *Puller) pullAll(ctx context.Context) {
return
}
now := time.Now().Unix()
_, _ = p.dispatchFeeds(ctx, feeds, func(feed *model.Feed) bool {
return !ShouldSkip(feed, p.interval, p.maxBackoff)
state := pullpolicy.FeedRuntimeState{
Suspended: feed.Suspended,
RetryAfterUntil: feed.FetchState.RetryAfterUntil,
NextCheckAt: feed.FetchState.NextCheckAt,
ConsecutiveFailures: feed.FetchState.ConsecutiveFailures,
LastErrorAt: feed.FetchState.LastErrorAt,
LastCheckedAt: feed.FetchState.LastCheckedAt,
}
return !pullpolicy.ShouldSkip(now, state, p.interval, p.maxBackoff)
})
}
@@ -74,17 +84,94 @@ func (p *Puller) pullAll(ctx context.Context) {
func (p *Puller) pullFeed(ctx context.Context, feed *model.Feed) {
p.logger.Debug("pulling feed", "feed_id", feed.ID, "feed_name", feed.Name)
items, siteURL, err := FetchAndParse(ctx, feed, p.timeout, p.config.AllowPrivateFeeds)
result, err := FetchAndParse(ctx, feed, p.timeout, p.config.AllowPrivateFeeds)
checkedAt := time.Now().Unix()
if err != nil {
p.logger.Warn("failed to fetch feed", "feed_id", feed.ID, "feed_name", feed.Name, "error", err)
if err := p.store.UpdateFeedFailure(feed.ID, err.Error()); err != nil {
httpStatus := 0
retryAfterUntil := int64(0)
if result != nil {
httpStatus = result.HTTPStatus
retryAfterUntil = result.RetryAfterUntil
}
if err := p.store.UpdateFeedFetchFailure(feed.ID, store.UpdateFeedFetchFailureParams{
CheckedAt: checkedAt,
HTTPStatus: httpStatus,
LastError: err.Error(),
RetryAfterUntil: retryAfterUntil,
IntervalSeconds: int64(p.interval.Seconds()),
MaxBackoff: int64(p.maxBackoff.Seconds()),
}); err != nil {
p.logger.Error("failed to record failure", "feed_id", feed.ID, "error", err)
}
p.logger.Warn("failed to fetch feed", "feed_id", feed.ID, "feed_name", feed.Name, "status", httpStatus, "error", err)
return
}
inputs := make([]store.BatchCreateItemInput, 0, len(items))
for _, item := range items {
if result.NotModified {
etag := result.ETag
// Some servers reply 304 without echoing validators; keep the previous
// ones so future conditional requests remain effective.
if strings.TrimSpace(etag) == "" {
etag = feed.FetchState.ETag
}
lastModified := result.LastModified
if strings.TrimSpace(lastModified) == "" {
lastModified = feed.FetchState.LastModified
}
cacheControl := result.CacheControl
if strings.TrimSpace(cacheControl) == "" {
cacheControl = feed.FetchState.CacheControl
}
expiresAt := result.ExpiresAt
if expiresAt == 0 {
expiresAt = feed.FetchState.ExpiresAt
}
nextCheckAt := pullpolicy.ComputeNextCheckAt(
checkedAt,
p.interval,
p.maxBackoff,
0,
result.RetryAfterUntil,
cacheControl,
expiresAt,
)
if err := p.store.UpdateFeedFetchSuccess(feed.ID, store.UpdateFeedFetchSuccessParams{
CheckedAt: checkedAt,
HTTPStatus: result.HTTPStatus,
ETag: etag,
LastModified: lastModified,
CacheControl: cacheControl,
ExpiresAt: expiresAt,
RetryAfterUntil: result.RetryAfterUntil,
NextCheckAt: nextCheckAt,
}); err != nil {
p.logger.Error("failed to persist not-modified state", "feed_id", feed.ID, "error", err)
return
}
p.logger.Debug("feed not modified", "feed_id", feed.ID, "feed_name", feed.Name)
return
}
nextCheckAt := pullpolicy.ComputeNextCheckAt(
checkedAt,
p.interval,
p.maxBackoff,
0,
result.RetryAfterUntil,
result.CacheControl,
result.ExpiresAt,
)
inputs := make([]store.BatchCreateItemInput, 0, len(result.Items))
for _, item := range result.Items {
inputs = append(inputs, store.BatchCreateItemInput{
GUID: item.GUID,
Title: item.Title,
@@ -100,14 +187,23 @@ func (p *Puller) pullFeed(ctx context.Context, feed *model.Feed) {
return
}
if err := p.store.UpdateFeedLastBuild(feed.ID, time.Now().Unix()); err != nil {
p.logger.Error("failed to update last_build", "feed_id", feed.ID, "error", err)
if err := p.store.UpdateFeedFetchSuccess(feed.ID, store.UpdateFeedFetchSuccessParams{
CheckedAt: checkedAt,
HTTPStatus: result.HTTPStatus,
ETag: result.ETag,
LastModified: result.LastModified,
CacheControl: result.CacheControl,
ExpiresAt: result.ExpiresAt,
RetryAfterUntil: result.RetryAfterUntil,
NextCheckAt: nextCheckAt,
}); err != nil {
p.logger.Error("failed to update fetch state", "feed_id", feed.ID, "error", err)
return
}
if strings.TrimSpace(feed.SiteURL) == "" && siteURL != "" {
if err := p.store.UpdateFeedSiteURLIfEmpty(feed.ID, siteURL); err != nil {
p.logger.Warn("failed to auto-fill site_url", "feed_id", feed.ID, "site_url", siteURL, "error", err)
if strings.TrimSpace(feed.SiteURL) == "" && result.SiteURL != "" {
if err := p.store.UpdateFeedSiteURLIfEmpty(feed.ID, result.SiteURL); err != nil {
p.logger.Warn("failed to auto-fill site_url", "feed_id", feed.ID, "site_url", result.SiteURL, "error", err)
}
}
+79 -2
View File
@@ -6,6 +6,7 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"sync/atomic"
"testing"
"time"
@@ -13,6 +14,82 @@ import (
"github.com/0x2E/fusion/internal/store"
)
func TestRefreshFeedPreservesValidatorsWhen304OmitHeaders(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), "test.db")
st, err := store.New(dbPath)
if err != nil {
t.Fatalf("create store: %v", err)
}
defer st.Close()
var requestCount int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count := atomic.AddInt32(&requestCount, 1)
if count == 1 {
w.Header().Set("Content-Type", "application/rss+xml")
w.Header().Set("ETag", `"etag-v1"`)
w.Header().Set("Last-Modified", "Mon, 02 Jan 2006 15:04:05 GMT")
w.Header().Set("Cache-Control", "max-age=86400")
w.Header().Set("Expires", "Tue, 03 Jan 2006 15:04:05 GMT")
_, _ = fmt.Fprint(w, `<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0"><channel><title>Demo</title><link>https://example.com</link>
<item><guid>g1</guid><title>Item</title><link>https://example.com/1</link></item>
</channel></rss>`)
return
}
w.WriteHeader(http.StatusNotModified)
}))
defer server.Close()
feed, err := st.CreateFeed(1, "Feed A", server.URL, "", "")
if err != nil {
t.Fatalf("create feed: %v", err)
}
p := New(st, &config.Config{
PullInterval: 1800,
PullTimeout: 5,
PullConcurrency: 1,
PullMaxBackoff: 604800,
AllowPrivateFeeds: true,
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := p.RefreshFeed(ctx, feed.ID); err != nil {
t.Fatalf("first refresh: %v", err)
}
if err := p.RefreshFeed(ctx, feed.ID); err != nil {
t.Fatalf("second refresh: %v", err)
}
updatedFeed, err := st.GetFeed(feed.ID)
if err != nil {
t.Fatalf("get feed: %v", err)
}
if updatedFeed.FetchState.ETag != `"etag-v1"` {
t.Fatalf("etag = %q, want %q", updatedFeed.FetchState.ETag, `"etag-v1"`)
}
if updatedFeed.FetchState.LastModified != "Mon, 02 Jan 2006 15:04:05 GMT" {
t.Fatalf("last_modified = %q, want %q", updatedFeed.FetchState.LastModified, "Mon, 02 Jan 2006 15:04:05 GMT")
}
if updatedFeed.FetchState.CacheControl != "max-age=86400" {
t.Fatalf("cache_control = %q, want %q", updatedFeed.FetchState.CacheControl, "max-age=86400")
}
expires, err := http.ParseTime("Tue, 03 Jan 2006 15:04:05 GMT")
if err != nil {
t.Fatalf("parse expires: %v", err)
}
if updatedFeed.FetchState.ExpiresAt != expires.Unix() {
t.Fatalf("expires_at = %d, want %d", updatedFeed.FetchState.ExpiresAt, expires.Unix())
}
}
func TestRefreshAllWaitsForRunningJobs(t *testing.T) {
dbPath := filepath.Join(t.TempDir(), "test.db")
st, err := store.New(dbPath)
@@ -62,8 +139,8 @@ func TestRefreshAllWaitsForRunningJobs(t *testing.T) {
t.Fatalf("list feeds: %v", err)
}
for _, feed := range feeds {
if feed.LastBuild <= 0 {
t.Fatalf("feed %d last_build = %d, want > 0", feed.ID, feed.LastBuild)
if feed.FetchState.LastSuccessAt <= 0 {
t.Fatalf("feed %d last_success_at = %d, want > 0", feed.ID, feed.FetchState.LastSuccessAt)
}
}
}
+175
View File
@@ -0,0 +1,175 @@
package pullpolicy
import (
"math"
"strconv"
"strings"
"time"
)
const backoffBase = 1.8
type FeedRuntimeState struct {
Suspended bool
RetryAfterUntil int64
NextCheckAt int64
ConsecutiveFailures int64
LastErrorAt int64
LastCheckedAt int64
}
func ShouldSkip(now int64, state FeedRuntimeState, interval, maxBackoff time.Duration) bool {
if state.Suspended {
return true
}
if state.RetryAfterUntil > now {
return true
}
if state.NextCheckAt > now {
return true
}
if state.NextCheckAt > 0 {
return false
}
if state.ConsecutiveFailures > 0 {
backoff := CalculateBackoff(interval, state.ConsecutiveFailures, maxBackoff)
base := state.LastErrorAt
if base <= 0 {
base = state.LastCheckedAt
}
nextPull := base + int64(backoff.Seconds())
if now < nextPull {
return true
}
}
if now-state.LastCheckedAt < int64(interval.Seconds()) {
return true
}
return false
}
func CalculateBackoff(interval time.Duration, failures int64, maxBackoff time.Duration) time.Duration {
if failures == 0 {
return 0
}
backoff := float64(interval) * math.Pow(backoffBase, float64(failures))
duration := time.Duration(backoff)
if duration > maxBackoff {
return maxBackoff
}
return duration
}
func ComputeNextCheckAt(
now int64,
interval, maxBackoff time.Duration,
consecutiveFailures int64,
retryAfterUntil int64,
cacheControl string,
expiresAt int64,
) int64 {
return ComputeNextCheckAtSeconds(
now,
int64(interval.Seconds()),
int64(maxBackoff.Seconds()),
consecutiveFailures,
retryAfterUntil,
cacheControl,
expiresAt,
)
}
func ComputeNextCheckAtSeconds(
now int64,
intervalSeconds int64,
maxBackoffSeconds int64,
consecutiveFailures int64,
retryAfterUntil int64,
cacheControl string,
expiresAt int64,
) int64 {
if intervalSeconds <= 0 {
intervalSeconds = 1
}
if maxBackoffSeconds <= 0 {
maxBackoffSeconds = intervalSeconds
}
branchDelay := intervalSeconds
if retryAfterDelay := retryAfterUntil - now; retryAfterDelay > branchDelay {
branchDelay = retryAfterDelay
}
if cacheMaxAge := parseCacheControlMaxAgeSeconds(cacheControl); cacheMaxAge > 0 {
if cacheMaxAge > branchDelay {
branchDelay = cacheMaxAge
}
}
if expiresDelay := expiresAt - now; expiresDelay > branchDelay {
branchDelay = expiresDelay
}
if consecutiveFailures > 0 {
backoffSeconds := calculateBackoffSeconds(intervalSeconds, consecutiveFailures, maxBackoffSeconds)
if backoffSeconds > branchDelay {
branchDelay = backoffSeconds
}
}
if branchDelay < 0 {
branchDelay = 0
}
if branchDelay > maxBackoffSeconds {
branchDelay = maxBackoffSeconds
}
return now + branchDelay
}
func calculateBackoffSeconds(intervalSeconds, failures, maxBackoffSeconds int64) int64 {
if failures <= 0 {
return 0
}
backoff := float64(intervalSeconds) * math.Pow(backoffBase, float64(failures))
seconds := int64(backoff)
if seconds > maxBackoffSeconds {
return maxBackoffSeconds
}
return seconds
}
func parseCacheControlMaxAgeSeconds(cacheControl string) int64 {
if strings.TrimSpace(cacheControl) == "" {
return 0
}
for _, part := range strings.Split(cacheControl, ",") {
token := strings.TrimSpace(strings.ToLower(part))
if !strings.HasPrefix(token, "max-age=") {
continue
}
raw := strings.TrimSpace(strings.TrimPrefix(token, "max-age="))
seconds, err := strconv.ParseInt(raw, 10, 64)
if err != nil || seconds <= 0 {
return 0
}
return seconds
}
return 0
}
@@ -0,0 +1,210 @@
package pullpolicy
import (
"testing"
"time"
)
func TestComputeNextCheckAtSecondsPicksStrictestDelay(t *testing.T) {
now := int64(1000)
got := ComputeNextCheckAtSeconds(
now,
60,
86400,
2,
1120,
"public, max-age=300",
1250,
)
// max(now+interval=1060, retry_after=1120, expires=1250, backoff=1194, cache=1300)
want := int64(1300)
if got != want {
t.Fatalf("ComputeNextCheckAtSeconds() = %d, want %d", got, want)
}
}
func TestComputeNextCheckAtSecondsBackoffCapped(t *testing.T) {
now := int64(200)
got := ComputeNextCheckAtSeconds(
now,
60,
500,
20,
0,
"",
0,
)
want := now + 500
if got != want {
t.Fatalf("ComputeNextCheckAtSeconds() = %d, want %d", got, want)
}
}
func TestComputeNextCheckAtSecondsSuccessBranchCappedByGlobalMax(t *testing.T) {
now := int64(1000)
got := ComputeNextCheckAtSeconds(
now,
60,
120,
0,
0,
"public, max-age=600",
0,
)
want := now + 120
if got != want {
t.Fatalf("ComputeNextCheckAtSeconds() = %d, want %d", got, want)
}
}
func TestComputeNextCheckAtSecondsRetryAfterCappedByGlobalMax(t *testing.T) {
now := int64(2000)
got := ComputeNextCheckAtSeconds(
now,
60,
300,
0,
now+3600,
"",
0,
)
want := now + 300
if got != want {
t.Fatalf("ComputeNextCheckAtSeconds() = %d, want %d", got, want)
}
}
func TestComputeNextCheckAtSecondsUsesSafeDefaults(t *testing.T) {
now := int64(123)
got := ComputeNextCheckAtSeconds(
now,
0,
0,
0,
0,
"",
0,
)
want := now + 1
if got != want {
t.Fatalf("ComputeNextCheckAtSeconds() = %d, want %d", got, want)
}
}
func TestParseCacheControlMaxAgeSeconds(t *testing.T) {
tests := []struct {
name string
input string
want int64
}{
{name: "normal", input: "public, max-age=600", want: 600},
{name: "uppercase", input: "MAX-AGE=120", want: 120},
{name: "invalid", input: "max-age=abc", want: 0},
{name: "missing", input: "no-store", want: 0},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := parseCacheControlMaxAgeSeconds(tt.input)
if got != tt.want {
t.Fatalf("parseCacheControlMaxAgeSeconds(%q) = %d, want %d", tt.input, got, tt.want)
}
})
}
}
func TestComputeNextCheckAtDurationWrapper(t *testing.T) {
now := int64(100)
got := ComputeNextCheckAt(
now,
30*time.Minute,
7*24*time.Hour,
1,
0,
"",
0,
)
if got <= now {
t.Fatalf("ComputeNextCheckAt() = %d, want > %d", got, now)
}
}
func TestShouldSkip(t *testing.T) {
interval := 30 * time.Minute
maxBackoff := 7 * 24 * time.Hour
now := time.Now().Unix()
tests := []struct {
name string
state FeedRuntimeState
want bool
}{
{
name: "suspended feed",
state: FeedRuntimeState{
Suspended: true,
LastCheckedAt: now - 3600,
},
want: true,
},
{
name: "recently updated (10 min ago)",
state: FeedRuntimeState{
LastCheckedAt: now - 600,
},
want: true,
},
{
name: "ready to pull (40 min ago)",
state: FeedRuntimeState{
LastCheckedAt: now - 2400,
},
want: false,
},
{
name: "1 failure, in backoff period",
state: FeedRuntimeState{
LastCheckedAt: now - 1800,
ConsecutiveFailures: 1,
LastErrorAt: now - 1800,
},
want: true,
},
{
name: "1 failure, backoff expired",
state: FeedRuntimeState{
LastCheckedAt: now - 3600,
ConsecutiveFailures: 1,
LastErrorAt: now - 3600,
},
want: false,
},
{
name: "never pulled before",
state: FeedRuntimeState{},
want: false,
},
{
name: "explicit next_check_at in future",
state: FeedRuntimeState{
NextCheckAt: now + 300,
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := ShouldSkip(now, tt.state, interval, maxBackoff)
if got != tt.want {
t.Errorf("ShouldSkip() = %v, want %v", got, tt.want)
}
})
}
}
+322 -26
View File
@@ -7,18 +7,27 @@ import (
"strings"
"github.com/0x2E/fusion/internal/model"
"github.com/0x2E/fusion/internal/pullpolicy"
)
func (s *Store) ListFeeds() ([]*model.Feed, error) {
rows, err := s.db.Query(`
SELECT f.id, f.group_id, f.name, f.link, f.site_url, f.last_build, f.last_failure_at,
f.failure, f.failures, f.suspended, f.proxy, f.created_at, f.updated_at,
SELECT f.id, f.group_id, f.name, f.link, f.site_url,
f.suspended, f.proxy, f.created_at, f.updated_at,
COALESCE(fs.etag, ''), COALESCE(fs.last_modified, ''), COALESCE(fs.cache_control, ''),
COALESCE(fs.expires_at, 0), COALESCE(fs.last_checked_at, 0), COALESCE(fs.next_check_at, 0),
COALESCE(fs.last_http_status, 0), COALESCE(fs.retry_after_until, 0), COALESCE(fs.last_success_at, 0),
COALESCE(fs.last_error_at, 0), COALESCE(fs.last_error, ''), COALESCE(fs.consecutive_failures, 0),
COALESCE(SUM(CASE WHEN i.unread = 1 THEN 1 ELSE 0 END), 0) AS unread_count,
COALESCE(COUNT(i.id), 0) AS item_count
FROM feeds f
LEFT JOIN feed_fetch_state fs ON fs.feed_id = f.id
LEFT JOIN items i ON i.feed_id = f.id
GROUP BY f.id, f.group_id, f.name, f.link, f.site_url, f.last_build, f.last_failure_at,
f.failure, f.failures, f.suspended, f.proxy, f.created_at, f.updated_at
GROUP BY f.id, f.group_id, f.name, f.link, f.site_url,
f.suspended, f.proxy, f.created_at, f.updated_at,
fs.etag, fs.last_modified, fs.cache_control, fs.expires_at, fs.last_checked_at,
fs.next_check_at, fs.last_http_status, fs.retry_after_until, fs.last_success_at,
fs.last_error_at, fs.last_error, fs.consecutive_failures
ORDER BY f.id
`)
if err != nil {
@@ -30,7 +39,31 @@ func (s *Store) ListFeeds() ([]*model.Feed, error) {
for rows.Next() {
f := &model.Feed{}
var suspended int
if err := rows.Scan(&f.ID, &f.GroupID, &f.Name, &f.Link, &f.SiteURL, &f.LastBuild, &f.LastFailureAt, &f.Failure, &f.Failures, &suspended, &f.Proxy, &f.CreatedAt, &f.UpdatedAt, &f.UnreadCount, &f.ItemCount); err != nil {
if err := rows.Scan(
&f.ID,
&f.GroupID,
&f.Name,
&f.Link,
&f.SiteURL,
&suspended,
&f.Proxy,
&f.CreatedAt,
&f.UpdatedAt,
&f.FetchState.ETag,
&f.FetchState.LastModified,
&f.FetchState.CacheControl,
&f.FetchState.ExpiresAt,
&f.FetchState.LastCheckedAt,
&f.FetchState.NextCheckAt,
&f.FetchState.LastHTTPStatus,
&f.FetchState.RetryAfterUntil,
&f.FetchState.LastSuccessAt,
&f.FetchState.LastErrorAt,
&f.FetchState.LastError,
&f.FetchState.ConsecutiveFailures,
&f.UnreadCount,
&f.ItemCount,
); err != nil {
return nil, err
}
f.Suspended = intToBool(suspended)
@@ -43,10 +76,38 @@ func (s *Store) GetFeed(id int64) (*model.Feed, error) {
f := &model.Feed{}
var suspended int
err := s.db.QueryRow(`
SELECT id, group_id, name, link, site_url, last_build, last_failure_at, failure, failures, suspended, proxy, created_at, updated_at
FROM feeds
WHERE id = :id
`, sql.Named("id", id)).Scan(&f.ID, &f.GroupID, &f.Name, &f.Link, &f.SiteURL, &f.LastBuild, &f.LastFailureAt, &f.Failure, &f.Failures, &suspended, &f.Proxy, &f.CreatedAt, &f.UpdatedAt)
SELECT f.id, f.group_id, f.name, f.link, f.site_url,
f.suspended, f.proxy, f.created_at, f.updated_at,
COALESCE(fs.etag, ''), COALESCE(fs.last_modified, ''), COALESCE(fs.cache_control, ''),
COALESCE(fs.expires_at, 0), COALESCE(fs.last_checked_at, 0), COALESCE(fs.next_check_at, 0),
COALESCE(fs.last_http_status, 0), COALESCE(fs.retry_after_until, 0), COALESCE(fs.last_success_at, 0),
COALESCE(fs.last_error_at, 0), COALESCE(fs.last_error, ''), COALESCE(fs.consecutive_failures, 0)
FROM feeds f
LEFT JOIN feed_fetch_state fs ON fs.feed_id = f.id
WHERE f.id = :id
`, sql.Named("id", id)).Scan(
&f.ID,
&f.GroupID,
&f.Name,
&f.Link,
&f.SiteURL,
&suspended,
&f.Proxy,
&f.CreatedAt,
&f.UpdatedAt,
&f.FetchState.ETag,
&f.FetchState.LastModified,
&f.FetchState.CacheControl,
&f.FetchState.ExpiresAt,
&f.FetchState.LastCheckedAt,
&f.FetchState.NextCheckAt,
&f.FetchState.LastHTTPStatus,
&f.FetchState.RetryAfterUntil,
&f.FetchState.LastSuccessAt,
&f.FetchState.LastErrorAt,
&f.FetchState.LastError,
&f.FetchState.ConsecutiveFailures,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("%w: feed", ErrNotFound)
@@ -59,7 +120,13 @@ func (s *Store) GetFeed(id int64) (*model.Feed, error) {
}
func (s *Store) CreateFeed(groupID int64, name, link, siteURL, proxy string) (*model.Feed, error) {
result, err := s.db.Exec(`
tx, err := s.db.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
result, err := tx.Exec(`
INSERT INTO feeds (group_id, name, link, site_url, proxy)
VALUES (:group_id, :name, :link, :site_url, :proxy)
`, sql.Named("group_id", groupID), sql.Named("name", name), sql.Named("link", link),
@@ -73,6 +140,17 @@ func (s *Store) CreateFeed(groupID int64, name, link, siteURL, proxy string) (*m
return nil, err
}
if _, err := tx.Exec(`
INSERT INTO feed_fetch_state (feed_id, next_check_at)
VALUES (:feed_id, unixepoch())
`, sql.Named("feed_id", id)); err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return s.GetFeed(id)
}
@@ -151,9 +229,15 @@ func (s *Store) UpdateFeed(id int64, params UpdateFeedParams) error {
return nil
}
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
setClauses = append(setClauses, "updated_at = unixepoch()")
query := fmt.Sprintf("UPDATE feeds SET %s WHERE id = :id", strings.Join(setClauses, ", "))
result, err := s.db.Exec(query, args...)
result, err := tx.Exec(query, args...)
if err != nil {
return err
}
@@ -164,7 +248,61 @@ func (s *Store) UpdateFeed(id int64, params UpdateFeedParams) error {
if rows == 0 {
return fmt.Errorf("%w: feed", ErrNotFound)
}
return nil
if params.Link != nil {
if _, err := tx.Exec(`
INSERT INTO feed_fetch_state (
feed_id,
etag,
last_modified,
cache_control,
expires_at,
last_checked_at,
next_check_at,
last_http_status,
retry_after_until,
last_success_at,
last_error_at,
last_error,
consecutive_failures,
updated_at
)
VALUES (
:feed_id,
'',
'',
'',
0,
0,
unixepoch(),
0,
0,
0,
0,
'',
0,
unixepoch()
)
ON CONFLICT(feed_id) DO UPDATE SET
etag = '',
last_modified = '',
cache_control = '',
expires_at = 0,
last_checked_at = 0,
next_check_at = unixepoch(),
last_http_status = 0,
retry_after_until = 0,
last_success_at = 0,
last_error_at = 0,
last_error = '',
consecutive_failures = 0,
updated_at = unixepoch()
`, sql.Named("feed_id", id)); err != nil {
return err
}
}
return tx.Commit()
}
// DeleteFeed removes a feed and all its items in a transaction.
@@ -203,24 +341,168 @@ func (s *Store) DeleteFeed(id int64) error {
return tx.Commit()
}
// UpdateFeedLastBuild records successful feed fetch and resets failure counters.
// This allows feeds to auto-recover from temporary network issues.
func (s *Store) UpdateFeedLastBuild(id int64, lastBuild int64) error {
type UpdateFeedFetchSuccessParams struct {
CheckedAt int64
HTTPStatus int
ETag string
LastModified string
CacheControl string
ExpiresAt int64
RetryAfterUntil int64
NextCheckAt int64
}
func (s *Store) UpdateFeedFetchSuccess(id int64, params UpdateFeedFetchSuccessParams) error {
_, err := s.db.Exec(`
UPDATE feeds
SET last_build = :last_build, last_failure_at = 0, failures = 0, failure = '', updated_at = unixepoch()
WHERE id = :id
`, sql.Named("last_build", lastBuild), sql.Named("id", id))
INSERT INTO feed_fetch_state (
feed_id,
etag,
last_modified,
cache_control,
expires_at,
last_checked_at,
next_check_at,
last_http_status,
retry_after_until,
last_success_at,
last_error_at,
last_error,
consecutive_failures,
updated_at
)
VALUES (
:feed_id,
:etag,
:last_modified,
:cache_control,
:expires_at,
:last_checked_at,
:next_check_at,
:last_http_status,
:retry_after_until,
:last_success_at,
0,
'',
0,
unixepoch()
)
ON CONFLICT(feed_id) DO UPDATE SET
etag = excluded.etag,
last_modified = excluded.last_modified,
cache_control = excluded.cache_control,
expires_at = excluded.expires_at,
last_checked_at = excluded.last_checked_at,
next_check_at = excluded.next_check_at,
last_http_status = excluded.last_http_status,
retry_after_until = excluded.retry_after_until,
last_success_at = excluded.last_success_at,
last_error_at = 0,
last_error = '',
consecutive_failures = 0,
updated_at = unixepoch()
`,
sql.Named("feed_id", id),
sql.Named("etag", strings.TrimSpace(params.ETag)),
sql.Named("last_modified", strings.TrimSpace(params.LastModified)),
sql.Named("cache_control", strings.TrimSpace(params.CacheControl)),
sql.Named("expires_at", params.ExpiresAt),
sql.Named("last_checked_at", params.CheckedAt),
sql.Named("next_check_at", params.NextCheckAt),
sql.Named("last_http_status", params.HTTPStatus),
sql.Named("retry_after_until", params.RetryAfterUntil),
sql.Named("last_success_at", params.CheckedAt),
)
return err
}
func (s *Store) UpdateFeedFailure(id int64, failure string) error {
_, err := s.db.Exec(`
UPDATE feeds
SET failures = failures + 1, failure = :failure, last_failure_at = unixepoch(), updated_at = unixepoch()
WHERE id = :id
`, sql.Named("failure", failure), sql.Named("id", id))
return err
type UpdateFeedFetchFailureParams struct {
CheckedAt int64
HTTPStatus int
LastError string
RetryAfterUntil int64
IntervalSeconds int64
MaxBackoff int64
}
func (s *Store) UpdateFeedFetchFailure(id int64, params UpdateFeedFetchFailureParams) error {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
if _, err := tx.Exec(`
INSERT INTO feed_fetch_state (
feed_id,
last_checked_at,
next_check_at,
last_http_status,
retry_after_until,
last_error_at,
last_error,
consecutive_failures,
updated_at
)
VALUES (
:feed_id,
:last_checked_at,
:next_check_at,
:last_http_status,
:retry_after_until,
:last_error_at,
:last_error,
1,
unixepoch()
)
ON CONFLICT(feed_id) DO UPDATE SET
last_checked_at = excluded.last_checked_at,
next_check_at = excluded.next_check_at,
last_http_status = excluded.last_http_status,
retry_after_until = excluded.retry_after_until,
last_error_at = excluded.last_error_at,
last_error = excluded.last_error,
consecutive_failures = feed_fetch_state.consecutive_failures + 1,
updated_at = unixepoch()
`,
sql.Named("feed_id", id),
sql.Named("last_checked_at", params.CheckedAt),
sql.Named("next_check_at", params.CheckedAt),
sql.Named("last_http_status", params.HTTPStatus),
sql.Named("retry_after_until", params.RetryAfterUntil),
sql.Named("last_error_at", params.CheckedAt),
sql.Named("last_error", strings.TrimSpace(params.LastError)),
); err != nil {
return err
}
var newFailures int64
if err := tx.QueryRow(`
SELECT consecutive_failures
FROM feed_fetch_state
WHERE feed_id = :feed_id
`, sql.Named("feed_id", id)).Scan(&newFailures); err != nil {
return err
}
nextCheckAt := pullpolicy.ComputeNextCheckAtSeconds(
params.CheckedAt,
params.IntervalSeconds,
params.MaxBackoff,
newFailures,
params.RetryAfterUntil,
"",
0,
)
if _, err := tx.Exec(`
UPDATE feed_fetch_state
SET next_check_at = :next_check_at, updated_at = unixepoch()
WHERE feed_id = :feed_id
`, sql.Named("next_check_at", nextCheckAt), sql.Named("feed_id", id)); err != nil {
return err
}
return tx.Commit()
}
func (s *Store) UpdateFeedSiteURLIfEmpty(id int64, siteURL string) error {
@@ -277,6 +559,16 @@ func (s *Store) BatchCreateFeeds(inputs []BatchCreateFeedsInput) (*BatchCreateFe
}
defer stmt.Close()
stateStmt, err := tx.Prepare(`
INSERT INTO feed_fetch_state (feed_id, next_check_at)
VALUES (:feed_id, unixepoch())
ON CONFLICT(feed_id) DO UPDATE SET next_check_at = excluded.next_check_at, updated_at = unixepoch()
`)
if err != nil {
return nil, err
}
defer stateStmt.Close()
seenLinks := make(map[string]bool, len(inputs))
for _, input := range inputs {
@@ -313,6 +605,10 @@ func (s *Store) BatchCreateFeeds(inputs []BatchCreateFeedsInput) (*BatchCreateFe
continue
}
if _, err := stateStmt.Exec(sql.Named("feed_id", id)); err != nil {
return nil, fmt.Errorf("init fetch state for %s: %w", input.Link, err)
}
result.Created++
result.CreatedIDs = append(result.CreatedIDs, id)
}
+227 -34
View File
@@ -3,6 +3,7 @@ package store
import (
"database/sql"
"errors"
"sync"
"testing"
"time"
)
@@ -91,8 +92,8 @@ func TestCreateFeed(t *testing.T) {
t.Error("expected suspended to default to false")
}
if feed.Failures != 0 {
t.Error("expected failures to default to 0")
if feed.FetchState.ConsecutiveFailures != 0 {
t.Error("expected consecutive_failures to default to 0")
}
if feed.ID == 0 || feed.CreatedAt == 0 || feed.UpdatedAt == 0 {
@@ -173,12 +174,55 @@ func TestUpdateFeed(t *testing.T) {
}
}
func TestUpdateFeedLinkResetsFetchState(t *testing.T) {
store, _ := setupTestDB(t)
defer closeStore(t, store)
group := mustCreateGroup(t, store, "Group")
feed := mustCreateFeed(t, store, group.ID, "Feed", "https://example.com/feed.xml", "https://example.com", "")
checkedAt := time.Now().Unix()
if err := store.UpdateFeedFetchSuccess(feed.ID, UpdateFeedFetchSuccessParams{
CheckedAt: checkedAt,
HTTPStatus: 200,
ETag: "etag-v1",
LastModified: "Mon, 02 Jan 2006 15:04:05 GMT",
NextCheckAt: checkedAt + 3600,
}); err != nil {
t.Fatalf("UpdateFeedFetchSuccess() failed: %v", err)
}
newLink := "https://example.com/new-feed.xml"
if err := store.UpdateFeed(feed.ID, UpdateFeedParams{Link: &newLink}); err != nil {
t.Fatalf("UpdateFeed() failed: %v", err)
}
updated, err := store.GetFeed(feed.ID)
if err != nil {
t.Fatalf("GetFeed() failed: %v", err)
}
if updated.Link != newLink {
t.Fatalf("expected link %q, got %q", newLink, updated.Link)
}
if updated.FetchState.ETag != "" {
t.Fatalf("expected etag to be reset, got %q", updated.FetchState.ETag)
}
if updated.FetchState.LastModified != "" {
t.Fatalf("expected last_modified to be reset, got %q", updated.FetchState.LastModified)
}
if updated.FetchState.ConsecutiveFailures != 0 {
t.Fatalf("expected consecutive_failures reset to 0, got %d", updated.FetchState.ConsecutiveFailures)
}
}
func TestDeleteFeed(t *testing.T) {
store, _ := setupTestDB(t)
defer closeStore(t, store)
group := mustCreateGroup(t, store, "Test Group")
feed := mustCreateFeed(t, store, group.ID, "Test Feed", "https://example.com/feed", "https://example.com", "")
item := mustCreateItem(t, store, feed.ID, "guid-1", "Item 1", "https://example.com/item1", "Content 1", time.Now().Unix())
bookmark := mustCreateBookmark(t, store, &item.ID, "https://example.com/item1", "Item 1", "Content 1", item.PubDate, "Test Feed")
@@ -206,20 +250,46 @@ func TestDeleteFeed(t *testing.T) {
}
}
func TestUpdateFeedLastBuild(t *testing.T) {
func TestUpdateFeedFetchSuccess(t *testing.T) {
store, _ := setupTestDB(t)
defer closeStore(t, store)
group := mustCreateGroup(t, store, "Test Group")
feed := mustCreateFeed(t, store, group.ID, "Test Feed", "https://example.com/feed", "https://example.com", "")
if err := store.UpdateFeedFailure(feed.ID, "test error"); err != nil {
t.Fatalf("UpdateFeedFailure() failed: %v", err)
seedCheckedAt := time.Now().Unix()
if err := store.UpdateFeedFetchSuccess(feed.ID, UpdateFeedFetchSuccessParams{
CheckedAt: seedCheckedAt,
HTTPStatus: 200,
CacheControl: "max-age=1800",
ExpiresAt: seedCheckedAt + 1800,
NextCheckAt: seedCheckedAt + 1800,
}); err != nil {
t.Fatalf("UpdateFeedFetchSuccess() seed failed: %v", err)
}
lastBuild := time.Now().Unix()
if err := store.UpdateFeedLastBuild(feed.ID, lastBuild); err != nil {
t.Fatalf("UpdateFeedLastBuild() failed: %v", err)
if err := store.UpdateFeedFetchFailure(feed.ID, UpdateFeedFetchFailureParams{
CheckedAt: time.Now().Unix(),
HTTPStatus: 500,
LastError: "test error",
IntervalSeconds: 1800,
MaxBackoff: 604800,
}); err != nil {
t.Fatalf("UpdateFeedFetchFailure() failed: %v", err)
}
checkedAt := time.Now().Unix()
nextCheckAt := time.Now().Add(30 * time.Minute).Unix()
if err := store.UpdateFeedFetchSuccess(feed.ID, UpdateFeedFetchSuccessParams{
CheckedAt: checkedAt,
HTTPStatus: 200,
ETag: "abc",
LastModified: "Mon, 02 Jan 2006 15:04:05 GMT",
CacheControl: "max-age=1800",
ExpiresAt: checkedAt + 1800,
NextCheckAt: nextCheckAt,
}); err != nil {
t.Fatalf("UpdateFeedFetchSuccess() failed: %v", err)
}
updated, err := store.GetFeed(feed.ID)
@@ -227,32 +297,56 @@ func TestUpdateFeedLastBuild(t *testing.T) {
t.Fatalf("GetFeed() failed: %v", err)
}
if updated.LastBuild != lastBuild {
t.Errorf("expected last_build %d, got %d", lastBuild, updated.LastBuild)
if updated.FetchState.LastSuccessAt != checkedAt {
t.Errorf("expected last_success_at %d, got %d", checkedAt, updated.FetchState.LastSuccessAt)
}
if updated.Failure != "" {
t.Error("expected failure to be cleared")
if updated.FetchState.LastError != "" {
t.Error("expected last_error to be cleared")
}
if updated.Failures != 0 {
t.Error("expected failures to be cleared")
if updated.FetchState.ConsecutiveFailures != 0 {
t.Error("expected consecutive_failures to be cleared")
}
if updated.LastFailureAt != 0 {
t.Error("expected last_failure_at to be cleared")
if updated.FetchState.LastErrorAt != 0 {
t.Error("expected last_error_at to be cleared")
}
if updated.FetchState.ETag != "abc" {
t.Errorf("expected etag abc, got %q", updated.FetchState.ETag)
}
if updated.FetchState.NextCheckAt != nextCheckAt {
t.Errorf("expected next_check_at %d, got %d", nextCheckAt, updated.FetchState.NextCheckAt)
}
}
func TestUpdateFeedFailure(t *testing.T) {
func TestUpdateFeedFetchFailure(t *testing.T) {
store, _ := setupTestDB(t)
defer closeStore(t, store)
group := mustCreateGroup(t, store, "Test Group")
feed := mustCreateFeed(t, store, group.ID, "Test Feed", "https://example.com/feed", "https://example.com", "")
seedCheckedAt := time.Now().Unix()
if err := store.UpdateFeedFetchSuccess(feed.ID, UpdateFeedFetchSuccessParams{
CheckedAt: seedCheckedAt,
HTTPStatus: 200,
CacheControl: "max-age=1800",
ExpiresAt: seedCheckedAt + 1800,
NextCheckAt: seedCheckedAt + 1800,
}); err != nil {
t.Fatalf("UpdateFeedFetchSuccess() seed failed: %v", err)
}
errorMsg1 := "first error"
if err := store.UpdateFeedFailure(feed.ID, errorMsg1); err != nil {
t.Fatalf("UpdateFeedFailure() failed: %v", err)
firstCheckedAt := time.Now().Unix()
if err := store.UpdateFeedFetchFailure(feed.ID, UpdateFeedFetchFailureParams{
CheckedAt: firstCheckedAt,
HTTPStatus: 500,
LastError: errorMsg1,
IntervalSeconds: 1800,
MaxBackoff: 604800,
}); err != nil {
t.Fatalf("UpdateFeedFetchFailure() failed: %v", err)
}
updated1, err := store.GetFeed(feed.ID)
@@ -260,21 +354,35 @@ func TestUpdateFeedFailure(t *testing.T) {
t.Fatalf("GetFeed() failed: %v", err)
}
if updated1.Failure != errorMsg1 {
t.Errorf("expected failure %q, got %q", errorMsg1, updated1.Failure)
if updated1.FetchState.LastError != errorMsg1 {
t.Errorf("expected last_error %q, got %q", errorMsg1, updated1.FetchState.LastError)
}
if updated1.Failures != 1 {
t.Errorf("expected failures count to be 1, got %d", updated1.Failures)
if updated1.FetchState.ConsecutiveFailures != 1 {
t.Errorf("expected consecutive_failures to be 1, got %d", updated1.FetchState.ConsecutiveFailures)
}
if updated1.LastFailureAt == 0 {
t.Error("expected last_failure_at to be set after failure")
if updated1.FetchState.LastErrorAt == 0 {
t.Error("expected last_error_at to be set after failure")
}
firstFailureAt := updated1.LastFailureAt
if updated1.FetchState.CacheControl != "max-age=1800" {
t.Errorf("expected cache_control to remain from last success, got %q", updated1.FetchState.CacheControl)
}
if updated1.FetchState.ExpiresAt != seedCheckedAt+1800 {
t.Errorf("expected expires_at to remain from last success, got %d", updated1.FetchState.ExpiresAt)
}
firstFailureAt := updated1.FetchState.LastErrorAt
firstNextCheckAt := updated1.FetchState.NextCheckAt
errorMsg2 := "second error"
if err := store.UpdateFeedFailure(feed.ID, errorMsg2); err != nil {
t.Fatalf("UpdateFeedFailure() failed: %v", err)
secondCheckedAt := time.Now().Unix()
if err := store.UpdateFeedFetchFailure(feed.ID, UpdateFeedFetchFailureParams{
CheckedAt: secondCheckedAt,
HTTPStatus: 502,
LastError: errorMsg2,
IntervalSeconds: 1800,
MaxBackoff: 604800,
}); err != nil {
t.Fatalf("UpdateFeedFetchFailure() failed: %v", err)
}
updated2, err := store.GetFeed(feed.ID)
@@ -282,15 +390,100 @@ func TestUpdateFeedFailure(t *testing.T) {
t.Fatalf("GetFeed() failed: %v", err)
}
if updated2.Failure != errorMsg2 {
t.Errorf("expected failure %q, got %q", errorMsg2, updated2.Failure)
if updated2.FetchState.LastError != errorMsg2 {
t.Errorf("expected last_error %q, got %q", errorMsg2, updated2.FetchState.LastError)
}
if updated2.Failures != 2 {
t.Errorf("expected failures count to be 2, got %d", updated2.Failures)
if updated2.FetchState.ConsecutiveFailures != 2 {
t.Errorf("expected consecutive_failures to be 2, got %d", updated2.FetchState.ConsecutiveFailures)
}
if updated2.LastFailureAt < firstFailureAt {
t.Error("expected last_failure_at to be monotonic")
if updated2.FetchState.LastErrorAt < firstFailureAt {
t.Error("expected last_error_at to be monotonic")
}
if updated2.FetchState.CacheControl != "max-age=1800" {
t.Errorf("expected cache_control to remain from last success, got %q", updated2.FetchState.CacheControl)
}
if updated2.FetchState.ExpiresAt != seedCheckedAt+1800 {
t.Errorf("expected expires_at to remain from last success, got %d", updated2.FetchState.ExpiresAt)
}
if updated2.FetchState.NextCheckAt <= firstNextCheckAt {
t.Error("expected next_check_at to increase with more failures")
}
}
func TestUpdateFeedFetchFailureIgnoresFreshnessHeadersForScheduling(t *testing.T) {
store, _ := setupTestDB(t)
defer closeStore(t, store)
group := mustCreateGroup(t, store, "Test Group")
feed := mustCreateFeed(t, store, group.ID, "Test Feed", "https://example.com/feed", "https://example.com", "")
checkedAt := int64(1700000000)
if err := store.UpdateFeedFetchFailure(feed.ID, UpdateFeedFetchFailureParams{
CheckedAt: checkedAt,
HTTPStatus: 500,
LastError: "upstream failure",
RetryAfterUntil: 0,
IntervalSeconds: 60,
MaxBackoff: 600,
}); err != nil {
t.Fatalf("UpdateFeedFetchFailure() failed: %v", err)
}
updated, err := store.GetFeed(feed.ID)
if err != nil {
t.Fatalf("GetFeed() failed: %v", err)
}
if updated.FetchState.NextCheckAt >= checkedAt+3600 {
t.Fatalf("expected failure scheduling to ignore freshness headers, got next_check_at=%d", updated.FetchState.NextCheckAt)
}
}
func TestUpdateFeedFetchFailureConcurrentIncrements(t *testing.T) {
store, _ := setupTestDB(t)
defer closeStore(t, store)
group := mustCreateGroup(t, store, "Test Group")
feed := mustCreateFeed(t, store, group.ID, "Test Feed", "https://example.com/feed", "https://example.com", "")
checkedAt := time.Now().Unix()
params := UpdateFeedFetchFailureParams{
CheckedAt: checkedAt,
HTTPStatus: 500,
LastError: "concurrent error",
IntervalSeconds: 1800,
MaxBackoff: 604800,
}
var wg sync.WaitGroup
var err1 error
var err2 error
wg.Add(2)
go func() {
defer wg.Done()
err1 = store.UpdateFeedFetchFailure(feed.ID, params)
}()
go func() {
defer wg.Done()
err2 = store.UpdateFeedFetchFailure(feed.ID, params)
}()
wg.Wait()
if err1 != nil {
t.Fatalf("first UpdateFeedFetchFailure() failed: %v", err1)
}
if err2 != nil {
t.Fatalf("second UpdateFeedFetchFailure() failed: %v", err2)
}
updated, err := store.GetFeed(feed.ID)
if err != nil {
t.Fatalf("GetFeed() failed: %v", err)
}
if updated.FetchState.ConsecutiveFailures != 2 {
t.Fatalf("expected consecutive_failures=2 after concurrent failures, got %d", updated.FetchState.ConsecutiveFailures)
}
}
+30 -16
View File
@@ -11,7 +11,7 @@ func TestMigrate(t *testing.T) {
defer closeStore(t, store)
// Verify all expected tables exist
tables := []string{"groups", "feeds", "items", "bookmarks", "schema_migrations", "items_fts"}
tables := []string{"groups", "feeds", "feed_fetch_state", "items", "bookmarks", "schema_migrations", "items_fts"}
for _, table := range tables {
var count int
query := "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=:table"
@@ -100,27 +100,38 @@ func TestMigrateLegacySchema(t *testing.T) {
}
var (
feed1GroupID int64
feed1Failures int64
feed1Suspended int
feed1Proxy string
feed1SiteURL string
feed1LastBuild int64
feed1LastFailure int64
feed1GroupID int64
feed1Suspended int
feed1Proxy string
feed1SiteURL string
feed1LastSuccess int64
feed1ConsecutiveFails int64
feed1LastErrorAt int64
feed1LastError string
)
err = store.db.QueryRow(`
SELECT group_id, failures, suspended, proxy, site_url, last_build, last_failure_at
SELECT group_id, suspended, proxy, site_url
FROM feeds
WHERE link = 'https://legacy.example/feed-a.xml'
`).Scan(&feed1GroupID, &feed1Failures, &feed1Suspended, &feed1Proxy, &feed1SiteURL, &feed1LastBuild, &feed1LastFailure)
`).Scan(&feed1GroupID, &feed1Suspended, &feed1Proxy, &feed1SiteURL)
if err != nil {
t.Fatalf("query migrated feed failed: %v", err)
}
err = store.db.QueryRow(`
SELECT last_success_at, consecutive_failures, last_error_at, last_error
FROM feed_fetch_state
WHERE feed_id = (SELECT id FROM feeds WHERE link = 'https://legacy.example/feed-a.xml')
`).Scan(&feed1LastSuccess, &feed1ConsecutiveFails, &feed1LastErrorAt, &feed1LastError)
if err != nil {
t.Fatalf("query migrated feed state failed: %v", err)
}
if feed1GroupID != 2 {
t.Errorf("expected feed-a group_id=2, got %d", feed1GroupID)
}
if feed1Failures != 3 {
t.Errorf("expected failures=3, got %d", feed1Failures)
if feed1ConsecutiveFails != 3 {
t.Errorf("expected consecutive_failures=3, got %d", feed1ConsecutiveFails)
}
if feed1Suspended != 1 {
t.Errorf("expected suspended=1, got %d", feed1Suspended)
@@ -131,11 +142,14 @@ func TestMigrateLegacySchema(t *testing.T) {
if feed1SiteURL != "" {
t.Errorf("expected empty site_url default, got %q", feed1SiteURL)
}
if feed1LastBuild == 0 {
t.Error("expected last_build to be converted to unix timestamp")
if feed1LastSuccess == 0 {
t.Error("expected last_success_at to be converted to unix timestamp")
}
if feed1LastFailure != 0 {
t.Errorf("expected last_failure_at default to 0, got %d", feed1LastFailure)
if feed1LastErrorAt != 0 {
t.Errorf("expected last_error_at default to 0, got %d", feed1LastErrorAt)
}
if feed1LastError != "timeout" {
t.Errorf("expected last_error='timeout', got %q", feed1LastError)
}
var orphanGroupID int64
@@ -0,0 +1,49 @@
-- Split runtime fetch metadata from static feed config.
CREATE TABLE IF NOT EXISTS feed_fetch_state (
feed_id INTEGER PRIMARY KEY REFERENCES feeds(id) ON UPDATE CASCADE ON DELETE CASCADE,
etag TEXT NOT NULL DEFAULT '',
last_modified TEXT NOT NULL DEFAULT '',
cache_control TEXT NOT NULL DEFAULT '',
expires_at INTEGER NOT NULL DEFAULT 0,
last_checked_at INTEGER NOT NULL DEFAULT 0,
next_check_at INTEGER NOT NULL DEFAULT 0,
last_http_status INTEGER NOT NULL DEFAULT 0,
retry_after_until INTEGER NOT NULL DEFAULT 0,
last_success_at INTEGER NOT NULL DEFAULT 0,
last_error_at INTEGER NOT NULL DEFAULT 0,
last_error TEXT NOT NULL DEFAULT '',
consecutive_failures INTEGER NOT NULL DEFAULT 0,
updated_at INTEGER NOT NULL DEFAULT (unixepoch())
);
INSERT INTO feed_fetch_state (
feed_id,
last_success_at,
last_error_at,
last_error,
consecutive_failures,
last_checked_at,
next_check_at,
updated_at
)
SELECT
id,
last_build,
last_failure_at,
failure,
failures,
CASE
WHEN last_failure_at > last_build THEN last_failure_at
ELSE last_build
END,
0,
updated_at
FROM feeds;
CREATE INDEX IF NOT EXISTS idx_feed_fetch_state_next_check_at ON feed_fetch_state(next_check_at);
ALTER TABLE feeds DROP COLUMN last_build;
ALTER TABLE feeds DROP COLUMN last_failure_at;
ALTER TABLE feeds DROP COLUMN failure;
ALTER TABLE feeds DROP COLUMN failures;
+2
View File
@@ -6,3 +6,5 @@
- `old-database-schema.md`: legacy schema snapshot kept for migration work
Release note: API and design docs should be updated together with any behavior or contract changes.
Current breaking API note: feed runtime fields are now under `feed.fetch_state.*` (top-level `last_build/last_failure_at/failure/failures` removed).
+114 -11
View File
@@ -37,6 +37,7 @@ backend/
│ ├── handler/ # HTTP handlers + middleware
│ ├── store/ # SQL persistence + migrations
│ ├── pull/ # fetch/parse/schedule/backoff
│ ├── pullpolicy/ # pure pull scheduling policy
│ ├── auth/ # password + OIDC helpers
│ ├── model/ # API/storage models
│ └── pkg/httpc/ # HTTP client + SSRF guards
@@ -44,7 +45,10 @@ backend/
## 5. Database schema (current)
Source of truth: `backend/internal/store/migrations/001_initial.sql`.
Source of truth:
- `backend/internal/store/migrations/001_initial.sql`
- `backend/internal/store/migrations/002_feed_fetch_state.sql`
Legacy compatibility: when an old pre-`schema_migrations` database is detected,
backend first creates a timestamped `.bak` backup, builds a fresh temporary
@@ -60,11 +64,52 @@ atomically swaps files, then records baseline version `1`.
### feeds
- Core: `id`, `group_id`, `name`, `link`, `site_url`
- Pull status: `last_build`, `last_failure_at`, `failure`, `failures`, `suspended`
- Runtime control: `suspended`
- Network: `proxy`
- Meta: `created_at`, `updated_at`
- Unique: `link`
### feed_fetch_state
- Per-feed runtime fetch state keyed by `feed_id`
- Conditional request metadata: `etag`, `last_modified`
- HTTP cache hints: `cache_control`, `expires_at`, `retry_after_until`
- Scheduler state: `last_checked_at`, `next_check_at`
- Outcome state: `last_http_status`, `last_success_at`, `last_error_at`, `last_error`, `consecutive_failures`
- `feed_id` references `feeds(id)` with `ON DELETE CASCADE`
- API shape: runtime fields are exposed under `feed.fetch_state.*`.
### Feed runtime state map
```mermaid
erDiagram
FEEDS ||--|| FEED_FETCH_STATE : has
FEEDS {
int id PK
int group_id
string name
string link
bool suspended
}
FEED_FETCH_STATE {
int feed_id PK,FK
string etag
string last_modified
string cache_control
int expires_at
int retry_after_until
int last_checked_at
int next_check_at
int last_http_status
int last_success_at
int last_error_at
string last_error
int consecutive_failures
}
```
### items
- `id`, `feed_id`, `guid`, `title`, `link`, `content`, `pub_date`, `unread`, `created_at`
@@ -84,10 +129,10 @@ atomically swaps files, then records baseline version `1`.
## 6. Data integrity and cascade strategy
- No database foreign-key constraints.
- Cascade rules are explicit in store transactions:
- Cascade rules are explicit in store transactions for group/feed/item/bookmark lifecycles:
- Delete group: move feeds to group `1`, then delete group.
- Delete feed: set matching bookmarks `item_id=NULL`, delete items, then delete feed.
- `feed_fetch_state` uses a direct foreign key to `feeds(id)` for guaranteed runtime-state cleanup.
This keeps behavior explicit and avoids hidden DB-level side effects.
@@ -103,27 +148,85 @@ This keeps behavior explicit and avoids hidden DB-level side effects.
Detailed contract: `docs/openapi.yaml`.
### Breaking API change (feed runtime fields)
- Feed runtime pull fields moved from top-level `feed.*` to nested `feed.fetch_state.*`.
- Removed top-level fields: `last_build`, `last_failure_at`, `failure`, `failures`.
- Clients that still decode old fields must update to `fetch_state` before upgrading.
## 8. Feed pull strategy
### Scheduler
- Pull interval: `FUSION_PULL_INTERVAL` (default 1800s)
- Concurrency limit: `FUSION_PULL_CONCURRENCY` (default 10)
- Request timeout: `FUSION_PULL_TIMEOUT` (default 30s)
- Pull interval: `FUSION_PULL_INTERVAL` (default `1800s`)
- Concurrency limit: `FUSION_PULL_CONCURRENCY` (default `10`)
- Request timeout: `FUSION_PULL_TIMEOUT` (default `30s`)
- Global max scheduling delay: `FUSION_PULL_MAX_BACKOFF` (default `48h`)
### Next-check bound
- `next_check_at` is always computed from branch delay, then capped by `FUSION_PULL_MAX_BACKOFF`.
- Success branch (`200/304`):
- `success_delay = max(interval, retry_after_delay, freshness_delay)`
- `freshness_delay` comes from `Cache-Control max-age` and/or `Expires`.
- Failure branch:
- `failure_delay = max(interval, retry_after_delay, backoff_delay)`
- Final rule (both branches):
- `next_check_at = now + min(branch_delay, pull_max_backoff)`
- `suspended` remains an explicit skip switch and is not affected by this cap.
### Skip policy
Periodic pull skips feed when:
- Feed is suspended, or
- Feed is in exponential backoff window, or
- Last successful update is within the regular interval.
- Current time is before `retry_after_until`, or
- Current time is before `next_check_at`.
### Backoff
- Formula: `interval * (1.8 ^ failures)`
- Capped by `FUSION_PULL_MAX_BACKOFF` (default 7 days)
- Failure timestamp source: `last_failure_at` (fallback to `last_build`)
- `failures` here is the updated `consecutive_failures` value after the current failure is recorded.
- `backoff_delay` is capped by `FUSION_PULL_MAX_BACKOFF`.
- Failure branch computes `next_check_at` from the strictest delay source:
- pull interval
- `Retry-After`
- exponential backoff
- Final `next_check_at` (success/failure) uses the same `FUSION_PULL_MAX_BACKOFF` cap.
- Failure counter and `next_check_at` are updated in one DB transaction to avoid stale-counter races during concurrent refresh failures.
- Failure updates do not overwrite `cache_control` / `expires_at`; these freshness fields are only refreshed on successful `200/304` checks.
### Conditional requests
- Request headers: `If-None-Match` (from `etag`) and `If-Modified-Since` (from `last_modified`)
- Response handling:
- `200`: parse items and refresh validators/cache metadata
- `304`: treat as successful check without item parsing
- If a `304` response omits validators or cache headers, previous stored values are kept.
### Pull decision flow
```mermaid
flowchart TD
A[Start periodic pull] --> B{feed.suspended?}
B -- yes --> Z[Skip]
B -- no --> C{now < retry_after_until?}
C -- yes --> Z
C -- no --> D{now < next_check_at?}
D -- yes --> Z
D -- no --> E[Send GET with If-None-Match/If-Modified-Since]
E --> F{HTTP status}
F -- 200 --> G[Parse items and upsert]
G --> H[Update success state]
F -- 304 --> H
F -- other/error --> I[Update failure state in one transaction]
H --> J[Reset consecutive_failures to 0]
J --> K[Compute success delay and cap to now+pull_max_backoff]
I --> L[Increment consecutive_failures]
L --> M[Compute failure delay and cap to now+pull_max_backoff]
```
### Manual refresh
+47 -14
View File
@@ -775,12 +775,10 @@ components:
- group_id
- name
- link
- last_build
- last_failure_at
- failures
- suspended
- created_at
- updated_at
- fetch_state
- unread_count
- item_count
properties:
@@ -796,17 +794,6 @@ components:
type: string
site_url:
type: string
last_build:
type: integer
format: int64
last_failure_at:
type: integer
format: int64
failure:
type: string
failures:
type: integer
format: int64
suspended:
type: boolean
proxy:
@@ -817,6 +804,8 @@ components:
updated_at:
type: integer
format: int64
fetch_state:
$ref: "#/components/schemas/FeedFetchState"
unread_count:
type: integer
format: int64
@@ -824,6 +813,50 @@ components:
type: integer
format: int64
FeedFetchState:
type: object
required:
- expires_at
- last_checked_at
- next_check_at
- last_http_status
- retry_after_until
- last_success_at
- last_error_at
- consecutive_failures
properties:
etag:
type: string
last_modified:
type: string
cache_control:
type: string
expires_at:
type: integer
format: int64
last_checked_at:
type: integer
format: int64
next_check_at:
type: integer
format: int64
last_http_status:
type: integer
retry_after_until:
type: integer
format: int64
last_success_at:
type: integer
format: int64
last_error_at:
type: integer
format: int64
last_error:
type: string
consecutive_failures:
type: integer
format: int64
FeedEnvelope:
type: object
required: [data]
@@ -1,5 +1,5 @@
import { useState, useEffect } from "react";
import { ChevronDown, Save, Trash2, X } from "lucide-react";
import { useState, useEffect, useRef } from "react";
import { AlertCircle, ChevronDown, Save, Trash2, X } from "lucide-react";
import {
Dialog,
DialogContent,
@@ -23,6 +23,11 @@ import {
CollapsibleTrigger,
} from "@/components/ui/collapsible";
import { Switch } from "@/components/ui/switch";
import {
Tooltip,
TooltipContent,
TooltipTrigger,
} from "@/components/ui/tooltip";
import { useUIStore } from "@/store";
import { useGroups } from "@/queries/groups";
import { useUpdateFeed, useDeleteFeed } from "@/queries/feeds";
@@ -30,6 +35,7 @@ import type { UpdateFeedRequest } from "@/lib/api";
import { toast } from "sonner";
import { useI18n } from "@/lib/i18n";
import { cn } from "@/lib/utils";
import { useIsMobile } from "@/hooks/use-mobile";
export function EditFeedDialog() {
const { t } = useI18n();
@@ -47,6 +53,10 @@ export function EditFeedDialog() {
const [isSubmitting, setIsSubmitting] = useState(false);
const [isDeleteOpen, setIsDeleteOpen] = useState(false);
const [isDeleting, setIsDeleting] = useState(false);
const [isMobileErrorTooltipOpen, setIsMobileErrorTooltipOpen] =
useState(false);
const urlInputRef = useRef<HTMLInputElement>(null);
const isMobile = useIsMobile();
useEffect(() => {
if (editingFeed) {
@@ -56,6 +66,7 @@ export function EditFeedDialog() {
setProxy(editingFeed.proxy ?? "");
setSuspended(editingFeed.suspended);
setIsAdvancedOpen(!!editingFeed.proxy);
setIsMobileErrorTooltipOpen(false);
}
}, [editingFeed]);
@@ -72,6 +83,7 @@ export function EditFeedDialog() {
const handleClose = () => {
setEditFeedOpen(false);
resetForm();
setIsMobileErrorTooltipOpen(false);
};
const handleSubmit = async () => {
@@ -154,11 +166,44 @@ export function EditFeedDialog() {
<DialogContent
className="flex w-full max-w-[480px] flex-col gap-0 overflow-hidden p-0"
showCloseButton={false}
onOpenAutoFocus={(event) => {
event.preventDefault();
urlInputRef.current?.focus();
}}
>
{/* Header */}
<DialogHeader className="flex flex-row items-center justify-between border-b px-5 py-4">
<DialogTitle className="text-base font-semibold">
{t("feed.edit.title")}
<DialogTitle className="flex items-center gap-1.5 text-base font-semibold">
<span>{t("feed.edit.title")}</span>
{editingFeed?.fetch_state.last_error && (
<Tooltip
open={isMobile ? isMobileErrorTooltipOpen : undefined}
onOpenChange={(open) => {
if (!isMobile || open) return;
setIsMobileErrorTooltipOpen(false);
}}
>
<TooltipTrigger asChild>
<button
type="button"
aria-label={t("feeds.status.error")}
onClick={() => {
if (!isMobile) return;
setIsMobileErrorTooltipOpen((open) => !open);
}}
className="inline-flex cursor-help items-center text-destructive"
>
<AlertCircle className="h-4 w-4" />
</button>
</TooltipTrigger>
<TooltipContent
side="bottom"
className="max-w-sm whitespace-normal break-words"
>
{editingFeed.fetch_state.last_error.trim()}
</TooltipContent>
</Tooltip>
)}
</DialogTitle>
<Button variant="ghost" size="icon-sm" onClick={handleClose}>
<X className="h-[18px] w-[18px] text-muted-foreground" />
@@ -173,6 +218,7 @@ export function EditFeedDialog() {
{t("feed.add.urlLabel")}
</label>
<Input
ref={urlInputRef}
placeholder={t("feed.add.urlPlaceholder")}
value={url}
onChange={(e) => setUrl(e.target.value)}
+16 -3
View File
@@ -12,17 +12,30 @@ export interface Feed {
name: string;
link: string;
site_url?: string;
last_build: number;
failure?: string;
failures: number;
suspended: boolean;
proxy?: string;
created_at: number;
updated_at: number;
fetch_state: FeedFetchState;
unread_count: number;
item_count: number;
}
export interface FeedFetchState {
etag?: string;
last_modified?: string;
cache_control?: string;
expires_at: number;
last_checked_at: number;
next_check_at: number;
last_http_status: number;
retry_after_until: number;
last_success_at: number;
last_error_at: number;
last_error?: string;
consecutive_failures: number;
}
export interface Item {
id: number;
feed_id: number;
+14
View File
@@ -0,0 +1,14 @@
const FEED_ERROR_PREVIEW_LENGTH = 48;
function normalizeFeedError(error: string): string {
return error.replace(/\s+/g, " ").trim();
}
export function getFeedErrorPreview(error: string): string {
const normalizedError = normalizeFeedError(error);
if (normalizedError.length <= FEED_ERROR_PREVIEW_LENGTH) {
return normalizedError;
}
return `${normalizedError.slice(0, FEED_ERROR_PREVIEW_LENGTH)}...`;
}
+67 -6
View File
@@ -1,6 +1,7 @@
import { useMemo, useState } from "react";
import { createLazyFileRoute } from "@tanstack/react-router";
import {
AlertCircle,
ChevronDown,
ChevronRight,
Download,
@@ -35,12 +36,18 @@ import {
} from "@/components/ui/dropdown-menu";
import { Input } from "@/components/ui/input";
import { ScrollArea } from "@/components/ui/scroll-area";
import {
Tooltip,
TooltipContent,
TooltipTrigger,
} from "@/components/ui/tooltip";
import { feedAPI, groupAPI } from "@/lib/api";
import type { Feed, Group } from "@/lib/api";
import { getFeedErrorPreview } from "@/lib/feed-error";
import { getFaviconUrl } from "@/lib/api/favicon";
import { generateOPML, downloadFile } from "@/lib/opml";
import { useI18n } from "@/lib/i18n";
import { cn } from "@/lib/utils";
import { cn, formatDate } from "@/lib/utils";
import {
useFeedLookup,
useMoveFeedsToGroup,
@@ -49,6 +56,7 @@ import {
import { useDeleteGroup, useGroups, useUpdateGroup } from "@/queries/groups";
import { useUIStore } from "@/store";
import { FeedFavicon } from "@/components/feed/feed-favicon";
import { useIsMobile } from "@/hooks/use-mobile";
export const Route = createLazyFileRoute("/feeds")({
component: FeedsPage,
@@ -86,6 +94,10 @@ function FeedsPage() {
const [isDeleting, setIsDeleting] = useState(false);
const [refreshConfirmOpen, setRefreshConfirmOpen] = useState(false);
const [mobileErrorTooltipFeedId, setMobileErrorTooltipFeedId] = useState<
number | null
>(null);
const isMobile = useIsMobile();
const statusFilterLabels: Record<StatusFilter, string> = {
all: t("feeds.status.all"),
@@ -106,7 +118,8 @@ function FeedsPage() {
) {
return false;
}
if (statusFilter === "error" && !feed.failure) return false;
if (statusFilter === "error" && !feed.fetch_state.last_error)
return false;
if (statusFilter === "paused" && !feed.suspended) return false;
return true;
};
@@ -455,14 +468,61 @@ function FeedsPage() {
</div>
</div>
<div className="flex shrink-0 items-center gap-1.5 sm:gap-2.5">
{feed.failure && (
<span className="h-1.5 w-1.5 shrink-0 rounded-full bg-destructive" />
{feed.fetch_state.last_error && (
<Tooltip
open={
isMobile
? mobileErrorTooltipFeedId === feed.id
: undefined
}
onOpenChange={(open) => {
if (!isMobile || open) return;
setMobileErrorTooltipFeedId((current) =>
current === feed.id ? null : current,
);
}}
>
<TooltipTrigger asChild>
<button
type="button"
aria-label={t("feeds.status.error")}
onClick={() => {
if (!isMobile) return;
setMobileErrorTooltipFeedId((current) =>
current === feed.id ? null : feed.id,
);
}}
className="flex items-center gap-1 rounded-sm text-xs text-destructive"
>
<AlertCircle className="h-3.5 w-3.5 shrink-0" />
<span className="hidden max-w-56 truncate font-medium sm:inline">
{getFeedErrorPreview(
feed.fetch_state.last_error,
)}
</span>
</button>
</TooltipTrigger>
<TooltipContent
side="top"
className="max-w-sm whitespace-normal break-words"
>
{feed.fetch_state.last_error.trim()}
</TooltipContent>
</Tooltip>
)}
{feed.suspended && (
<Pause className="h-3.5 w-3.5 shrink-0 text-muted-foreground" />
)}
<span className="hidden text-xs text-muted-foreground sm:inline">
{t("feeds.itemCount", { count: feed.item_count })}
{t("feeds.itemCount", {
count: feed.item_count,
})}{" "}
·{" "}
{feed.fetch_state.last_checked_at > 0
? formatDate(
feed.fetch_state.last_checked_at,
)
: t("common.unknown")}
</span>
<button
type="button"
@@ -521,7 +581,8 @@ function FeedsPage() {
{" "}
{t("feeds.deleteGroup.moveHint", {
count,
target: target?.name ?? t("feeds.deleteGroup.targetDefault"),
target:
target?.name ?? t("feeds.deleteGroup.targetDefault"),
})}
</>
);