feat: add embedded nats server discovery (#127624)
Some checks are pending
Actionlint / Lint GitHub Actions files (push) Waiting to run
Backend Code Checks / Detect whether code changed (push) Waiting to run
Backend Code Checks / Validate Backend Configs (push) Blocked by required conditions
Backend Unit Tests / Detect whether code changed (push) Waiting to run
Backend Unit Tests / Grafana (1/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana (2/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana (3/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana (4/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana (5/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana (6/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana (7/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana (8/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana Enterprise (1/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana Enterprise (2/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana Enterprise (3/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana Enterprise (4/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana Enterprise (5/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana Enterprise (6/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana Enterprise (7/8) (push) Blocked by required conditions
Backend Unit Tests / Grafana Enterprise (8/8) (push) Blocked by required conditions
Backend Unit Tests / All backend unit tests complete (push) Blocked by required conditions
Build Go (matrix) / darwin/amd64 (push) Waiting to run
Build Go (matrix) / linux/amd64 (push) Waiting to run
Build Go (matrix) / linux/armv6 (push) Waiting to run
Build Go (matrix) / linux/armv7 (push) Waiting to run
Build Go (matrix) / linux/amd64 (enterprise) (push) Waiting to run
Build Go (matrix) / darwin/arm64 (push) Waiting to run
Build Go (matrix) / linux/arm64 (push) Waiting to run
Build Go (matrix) / windows/arm64 (push) Waiting to run
Build Go (matrix) / linux/s390x (push) Waiting to run
Build Go (matrix) / darwin/amd64 (enterprise) (push) Waiting to run
Build Go (matrix) / linux/armv6 (enterprise) (push) Waiting to run
Build Go (matrix) / linux/armv7 (enterprise) (push) Waiting to run
Build Go (matrix) / darwin/arm64 (enterprise) (push) Waiting to run
Build Go (matrix) / linux/arm64 (enterprise) (push) Waiting to run
Build Go (matrix) / windows/arm64 (enterprise) (push) Waiting to run
Build Go (matrix) / linux/s390x (enterprise) (push) Waiting to run
Build Go (matrix) / verify rpm stig (linux/amd64) (push) Waiting to run
Lint Frontend / Detect whether code changed (push) Waiting to run
Lint Frontend / Lint (push) Blocked by required conditions
Lint Frontend / Typecheck (push) Blocked by required conditions
Lint Frontend / Typecheck (TSGO/TS7) (push) Blocked by required conditions
Lint Frontend / Verify API clients (push) Waiting to run
Lint Frontend / Verify OpenAPI specs (push) Blocked by required conditions
Lint Frontend / Verify API clients (enterprise) (push) Waiting to run
Lint Frontend / Verify packed frontend packages (push) Blocked by required conditions
Lint Frontend / Check circular dependencies (push) Blocked by required conditions
Lint Frontend / lint-knip (push) Blocked by required conditions
Lint Frontend / Validate yarn install (push) Blocked by required conditions
golangci-lint / Detect whether code changed (push) Waiting to run
golangci-lint / go-fmt (push) Blocked by required conditions
golangci-lint / lint-go (push) Blocked by required conditions
govulncheck / govulncheck (push) Waiting to run
Crowdin Upload Action / upload-sources-to-crowdin (push) Waiting to run
Verify i18n / verify-i18n (push) Waiting to run
Documentation / Build & Verify Docs (push) Waiting to run
Policybot / Check .policy.yml is valid (push) Waiting to run
End-to-end tests / Detect whether code changed (push) Waiting to run
End-to-end tests / Build backend (push) Blocked by required conditions
End-to-end tests / Build frontend (push) Blocked by required conditions
End-to-end tests / Verify Storybook (Playwright) (push) Blocked by required conditions
End-to-end tests / Playwright E2E tests (1/8) (push) Blocked by required conditions
End-to-end tests / Playwright E2E tests (2/8) (push) Blocked by required conditions
End-to-end tests / Playwright E2E tests (3/8) (push) Blocked by required conditions
End-to-end tests / Playwright E2E tests (4/8) (push) Blocked by required conditions
End-to-end tests / Playwright E2E tests (5/8) (push) Blocked by required conditions
End-to-end tests / Playwright E2E tests (6/8) (push) Blocked by required conditions
End-to-end tests / Playwright E2E tests (7/8) (push) Blocked by required conditions
End-to-end tests / Playwright E2E tests (8/8) (push) Blocked by required conditions
End-to-end tests / All Playwright tests complete (push) Blocked by required conditions
End-to-end tests / Report Playwright benchmarks (push) Blocked by required conditions
End-to-end tests / Publish metrics (push) Waiting to run
End-to-end tests / All E2E tests complete (push) Blocked by required conditions
Frontend tests / Decoupled plugin tests (push) Blocked by required conditions
Frontend tests / Packages unit tests (push) Blocked by required conditions
Frontend tests / Detect whether code changed (push) Waiting to run
Frontend tests / Generate golden files (push) Blocked by required conditions
Frontend tests / Unit tests (1 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (10 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (11 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (12 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (13 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (14 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (15 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (16 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (2 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (3 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (4 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (5 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (6 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (7 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (8 / 16) (push) Blocked by required conditions
Frontend tests / Unit tests (9 / 16) (push) Blocked by required conditions
Frontend tests / All frontend unit tests complete (push) Blocked by required conditions
Integration Tests (pgvector) / pgvector (push) Waiting to run
Integration Tests / Detect whether code changed (push) Waiting to run
Integration Tests / Sqlite (1/4) (push) Waiting to run
Integration Tests / Sqlite (2/4) (push) Waiting to run
Integration Tests / Postgres (11/16) (push) Waiting to run
Integration Tests / Sqlite (3/4) (push) Waiting to run
Integration Tests / Sqlite (4/4) (push) Waiting to run
Integration Tests / MySQL (1/16) (push) Waiting to run
Integration Tests / MySQL (10/16) (push) Waiting to run
Integration Tests / MySQL (11/16) (push) Waiting to run
Integration Tests / MySQL (12/16) (push) Waiting to run
Integration Tests / MySQL (13/16) (push) Waiting to run
Integration Tests / MySQL (14/16) (push) Waiting to run
Integration Tests / MySQL (15/16) (push) Waiting to run
Integration Tests / MySQL (16/16) (push) Waiting to run
Integration Tests / MySQL (2/16) (push) Waiting to run
Integration Tests / MySQL (3/16) (push) Waiting to run
Integration Tests / MySQL (4/16) (push) Waiting to run
Integration Tests / MySQL (5/16) (push) Waiting to run
Integration Tests / MySQL (6/16) (push) Waiting to run
Integration Tests / MySQL (7/16) (push) Waiting to run
Integration Tests / MySQL (8/16) (push) Waiting to run
Integration Tests / MySQL (9/16) (push) Waiting to run
Integration Tests / Postgres (1/16) (push) Waiting to run
Integration Tests / Postgres (10/16) (push) Waiting to run
Integration Tests / Postgres (12/16) (push) Waiting to run
Integration Tests / Postgres (13/16) (push) Waiting to run
Integration Tests / Postgres (14/16) (push) Waiting to run
Integration Tests / Postgres (15/16) (push) Waiting to run
Integration Tests / Postgres (16/16) (push) Waiting to run
Integration Tests / Postgres (2/16) (push) Waiting to run
Integration Tests / Postgres (3/16) (push) Waiting to run
Integration Tests / Postgres (4/16) (push) Waiting to run
Integration Tests / Postgres (5/16) (push) Waiting to run
Integration Tests / Postgres (6/16) (push) Waiting to run
Integration Tests / Postgres (7/16) (push) Waiting to run
Integration Tests / Postgres (8/16) (push) Waiting to run
Integration Tests / Postgres (9/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (1/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (10/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (11/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (12/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (13/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (14/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (15/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (16/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (2/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (3/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (4/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (5/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (6/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (7/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (8/16) (push) Waiting to run
Integration Tests / Sqlite Enterprise (9/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (1/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (10/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (11/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (12/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (13/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (14/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (15/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (16/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (2/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (3/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (4/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (5/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (6/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (7/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (8/16) (push) Waiting to run
Integration Tests / MySQL Enterprise (9/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (1/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (10/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (11/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (12/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (13/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (14/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (15/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (16/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (2/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (3/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (4/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (5/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (6/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (7/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (8/16) (push) Waiting to run
Integration Tests / Postgres Enterprise (9/16) (push) Waiting to run
Reject GitHub secrets / reject-gh-secrets (push) Waiting to run
Build Release Packages / setup (push) Waiting to run
Build Release Packages / Dispatch grafana-enterprise build (push) Blocked by required conditions
Build Release Packages / docker / linux-armv7 (push) Blocked by required conditions
Build Release Packages / build frontend (push) Blocked by required conditions
Build Release Packages / build backend / darwin-amd64 (push) Blocked by required conditions
Build Release Packages / build backend / linux-amd64 (push) Blocked by required conditions
Build Release Packages / build backend / windows-amd64 (push) Blocked by required conditions
Build Release Packages / build backend / linux-armv6 (push) Blocked by required conditions
Build Release Packages / build backend / linux-armv7 (push) Blocked by required conditions
Build Release Packages / build backend / darwin-arm64 (push) Blocked by required conditions
Build Release Packages / build backend / linux-arm64 (push) Blocked by required conditions
Build Release Packages / build backend / windows-arm64 (push) Blocked by required conditions
Build Release Packages / build backend / linux-s390x (push) Blocked by required conditions
Build Release Packages / build backend / linux-riscv64 (push) Blocked by required conditions
Build Release Packages / targz / darwin-amd64 (push) Blocked by required conditions
Build Release Packages / targz / linux-amd64 (push) Blocked by required conditions
Build Release Packages / targz / windows-amd64 (push) Blocked by required conditions
Build Release Packages / targz / linux-armv6 (push) Blocked by required conditions
Build Release Packages / targz / linux-armv7 (push) Blocked by required conditions
Build Release Packages / targz / darwin-arm64 (push) Blocked by required conditions
Build Release Packages / targz / linux-arm64 (push) Blocked by required conditions
Build Release Packages / targz / windows-arm64 (push) Blocked by required conditions
Build Release Packages / targz / linux-s390x (push) Blocked by required conditions
Build Release Packages / targz / linux-riscv64 (push) Blocked by required conditions
Build Release Packages / deb / rpm / linux-amd64 (push) Blocked by required conditions
Build Release Packages / deb / rpm / linux-armv6 (push) Blocked by required conditions
Build Release Packages / deb / rpm / linux-armv7 (push) Blocked by required conditions
Build Release Packages / deb / rpm / linux-arm64 (push) Blocked by required conditions
Build Release Packages / deb / rpm / linux-s390x (push) Blocked by required conditions
Build Release Packages / verify targz (linux-amd64) (push) Blocked by required conditions
Build Release Packages / deb / rpm / linux-riscv64 (push) Blocked by required conditions
Build Release Packages / verify rpm stig (linux-amd64) (push) Blocked by required conditions
Build Release Packages / docker / linux-amd64 (push) Blocked by required conditions
Build Release Packages / docker / linux-arm64 (push) Blocked by required conditions
Build Release Packages / docker / linux-s390x (push) Blocked by required conditions
Build Release Packages / verify packages (linux-amd64) (push) Blocked by required conditions
Build Release Packages / / windows / windows-amd64 (push) Blocked by required conditions
Build Release Packages / / windows / windows-arm64 (push) Blocked by required conditions
Build Release Packages / Upload targz / darwin-amd64 (push) Blocked by required conditions
Build Release Packages / Upload targz / darwin-arm64 (push) Blocked by required conditions
Build Release Packages / Upload targz / linux-amd64 (push) Blocked by required conditions
Build Release Packages / Upload targz / linux-arm64 (push) Blocked by required conditions
Build Release Packages / Upload targz / linux-armv6 (push) Blocked by required conditions
Build Release Packages / Upload targz / linux-armv7 (push) Blocked by required conditions
Build Release Packages / Upload targz / linux-s390x (push) Blocked by required conditions
Build Release Packages / Upload targz / linux-riscv64 (push) Blocked by required conditions
Build Release Packages / Upload targz / windows-amd64 (push) Blocked by required conditions
Build Release Packages / Upload targz / windows-arm64 (push) Blocked by required conditions
Build Release Packages / Upload deb/rpm / linux-amd64 (push) Blocked by required conditions
Build Release Packages / Upload deb/rpm / linux-arm64 (push) Blocked by required conditions
Build Release Packages / Upload deb/rpm / linux-armv6 (push) Blocked by required conditions
Build Release Packages / Upload deb/rpm / linux-armv7 (push) Blocked by required conditions
Build Release Packages / Upload deb/rpm / linux-s390x (push) Blocked by required conditions
Build Release Packages / Upload deb/rpm / linux-riscv64 (push) Blocked by required conditions
Build Release Packages / Upload docker (alpine) / linux-amd64 (push) Blocked by required conditions
Build Release Packages / Upload docker (alpine) / linux-arm64 (push) Blocked by required conditions
Build Release Packages / Upload docker (alpine) / linux-armv7 (push) Blocked by required conditions
Build Release Packages / Upload docker (alpine) / linux-s390x (push) Blocked by required conditions
Build Release Packages / Upload docker (alpine-slim) / linux-amd64 (push) Blocked by required conditions
Build Release Packages / Upload docker (alpine-slim) / linux-arm64 (push) Blocked by required conditions
Build Release Packages / Upload docker (alpine-slim) / linux-armv7 (push) Blocked by required conditions
Build Release Packages / Upload docker (alpine-slim) / linux-s390x (push) Blocked by required conditions
Build Release Packages / Upload docker (ubuntu) / linux-amd64 (push) Blocked by required conditions
Build Release Packages / Upload docker (ubuntu) / linux-arm64 (push) Blocked by required conditions
Build Release Packages / Upload docker (ubuntu) / linux-armv7 (push) Blocked by required conditions
Build Release Packages / Upload docker (ubuntu) / linux-s390x (push) Blocked by required conditions
Build Release Packages / Upload docker (ubuntu-slim) / linux-amd64 (push) Blocked by required conditions
Build Release Packages / Upload docker (ubuntu-slim) / linux-arm64 (push) Blocked by required conditions
Build Release Packages / Upload docker (ubuntu-slim) / linux-armv7 (push) Blocked by required conditions
Build Release Packages / Upload docker (ubuntu-slim) / linux-s390x (push) Blocked by required conditions
Build Release Packages / Upload docker (distroless) / linux-amd64 (push) Blocked by required conditions
Build Release Packages / Upload docker (distroless) / linux-arm64 (push) Blocked by required conditions
Build Release Packages / Upload docker (distroless) / linux-armv7 (push) Blocked by required conditions
Build Release Packages / Upload docker (distroless) / linux-s390x (push) Blocked by required conditions
Build Release Packages / Upload docker (distroless-slim) / linux-amd64 (push) Blocked by required conditions
Build Release Packages / Upload docker (distroless-slim) / linux-arm64 (push) Blocked by required conditions
Build Release Packages / Upload docker (distroless-slim) / linux-armv7 (push) Blocked by required conditions
Build Release Packages / Upload docker (distroless-slim) / linux-s390x (push) Blocked by required conditions
Build Release Packages / Upload windows / windows-amd64 (push) Blocked by required conditions
Build Release Packages / Upload windows / windows-arm64 (push) Blocked by required conditions
Build Release Packages / Run Meticulous tests (push) Blocked by required conditions
Build Release Packages / Dispatch publish NPM canaries (push) Blocked by required conditions
Build Release Packages / notify-pr (push) Blocked by required conditions
Run dashboard schema v2 e2e / dashboard-schema-v2-e2e (push) Waiting to run
Shellcheck / Shellcheck scripts (push) Waiting to run
Run Storybook a11y tests / Detect whether code changed (push) Waiting to run
Run Storybook a11y tests / Run Storybook a11y tests (dark theme) (push) Blocked by required conditions
Run Storybook a11y tests / Run Storybook a11y tests (deut_prot_dark theme) (push) Blocked by required conditions
Run Storybook a11y tests / Run Storybook a11y tests (deut_prot_light theme) (push) Blocked by required conditions
Run Storybook a11y tests / Run Storybook a11y tests (light theme) (push) Blocked by required conditions
Run Storybook a11y tests / Run Storybook a11y tests (tritanopia_dark theme) (push) Blocked by required conditions
Run Storybook a11y tests / Run Storybook a11y tests (tritanopia_light theme) (push) Blocked by required conditions
Run Storybook a11y tests / Storybook a11y tests (push) Blocked by required conditions
Swagger generated code / Detect whether code changed (push) Waiting to run
Swagger generated code / Verify committed API specs match (push) Blocked by required conditions
Dispatch sync to mirror / dispatch-job (push) Waiting to run

* feat: add discovery

* fix: optimize

* test: add integration tests

* fix: redundancy

* fix: address comments
This commit is contained in:
Mustafa Sencer Özcan 2026-07-01 20:51:46 +02:00 committed by GitHub
parent 0c2163d391
commit 149bd5ecc1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 864 additions and 11 deletions

View file

@ -144,7 +144,8 @@ enabled = false
# Bus topology: "embedded" runs an in-process Core NATS server; "external" connects to client_urls only.
mode = embedded
# Comma-separated Core NATS client URLs. In embedded mode the local server is prepended automatically.
# Comma-separated NATS client connection URLs (for pub/sub). In embedded mode the local server's
# URL is prepended automatically, so this only needs external brokers/peers to also connect to.
client_urls =
# Embedded server listener settings (conventional NATS ports).
@ -152,9 +153,17 @@ listen_address = 127.0.0.1
client_port = 4222
cluster_port = 6222
# Advertise address for peer discovery. If empty, listen_address is used.
# Advertise address for peer discovery. If empty, listen_address is used. The default listen_address
# (127.0.0.1) only clusters on a single host: peers advertise a loopback route that resolves to
# themselves. For multi-host HA, set listen_address to a routable address, or bind 0.0.0.0 and set
# advertise_address to a routable address (0.0.0.0 is coerced to loopback when advertised).
advertise_address =
# Embedded cluster peer discovery. discovery_interval is how often a node refreshes its heartbeat and
# reconciles routes; discovery_ttl is how long a peer is trusted after its last heartbeat before eviction.
discovery_interval = 5s
discovery_ttl = 30s
# Transport security for client connections.
tls_enabled = false
tls_ca_cert_path =

324
pkg/infra/nats/discovery.go Normal file
View file

@ -0,0 +1,324 @@
package nats
import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"time"
natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resource/kv"
)
const (
// discoveryClusterName scopes peer rows so independent Grafana clusters can
// share one DB without cross-wiring meshes; matches Cluster.Name.
discoveryClusterName = "grafana"
// Fallbacks used when the config supplies no interval/TTL. Keep the TTL a
// comfortable multiple of the interval so one missed tick doesn't evict a peer.
// newDiscovery substitutes these whenever the configured value is non-positive.
//
// defaultDiscoveryInterval is the default period between discovery ticks
// (heartbeat refresh plus route reconciliation).
defaultDiscoveryInterval = 5 * time.Second
// defaultDiscoveryTTL is the default age after which a peer row counts as
// stale; at the defaults it spans six intervals.
defaultDiscoveryTTL = 30 * time.Second
)
// peer is an embedded NATS server advertising the cluster route URL peers dial.
type peer struct {
// ServerName identifies the node: it forms the registry key and is what
// reconcile compares against to skip the node's own row.
ServerName string
// RouteURL is the advertised cluster route. An empty value means the peer
// has not advertised a route yet and is skipped during reconciliation; only
// "nats" and "tls" schemes are accepted when routes are applied.
RouteURL string
}
// peerRegistry is the persistence seam for cluster membership (topology metadata
// only, never event state). Production is KV-backed; tests supply a fake.
type peerRegistry interface {
// upsert records or refreshes this node's row (a heartbeat).
upsert(ctx context.Context, p peer) error
// listActive returns every peer whose heartbeat is within ttl, pruning rows
// older than ttl as a side effect (self-healing via re-registration). Pruning
// is best-effort: the returned set already excludes stale peers, so a failed
// delete only leaves rows to be retried next tick.
//
// Callers rely on the distinction between the two failure modes: a read
// failure returns a nil slice with the error, while a prune failure returns
// the valid (non-nil) peer set together with the error.
listActive(ctx context.Context, ttl time.Duration) ([]peer, error)
// remove deletes this node's row on graceful shutdown.
remove(ctx context.Context, serverName string) error
}
// discovery peers embedded NATS replicas through a KV-backed registry: each node
// advertises its route URL and periodically solicits routes to live peers, so a
// multi-replica deployment self-assembles into a mesh without static config.
type discovery struct {
// log receives heartbeat, discovery and route-reload diagnostics.
log log.Logger
// server is the embedded NATS server whose routes are reloaded.
server *natsserver.Server
// baseOpts is the option template copied on every reload; only Routes is
// overwritten on the copy.
baseOpts natsserver.Options
// registry stores and lists cluster membership.
registry peerRegistry
// self is this node's own registry entry, re-upserted on every tick.
self peer
// interval is the period between ticks.
interval time.Duration
// ttl is how long a peer stays live after its last heartbeat.
ttl time.Duration
// routes is the applied peer route set; only touched by the loop goroutine.
routes map[string]struct{}
}
// discoveryOptions bundles the embedded server's base options with the
// discovery loop cadence. baseOpts is the template applyRoutes reloads routes
// from; interval/ttl fall back to package defaults when non-positive.
type discoveryOptions struct {
// baseOpts is stored on the discovery value and copied for each route reload.
baseOpts natsserver.Options
// interval is the tick period; values <= 0 select defaultDiscoveryInterval.
interval time.Duration
// ttl is the peer liveness window; values <= 0 select defaultDiscoveryTTL.
ttl time.Duration
}
// newDiscovery builds a discovery loop for server, backed by registry and
// advertising self. The cadence comes from opts; a non-positive interval or ttl
// is replaced by the corresponding package default. The applied route set
// starts out empty.
func newDiscovery(logger log.Logger, server *natsserver.Server, registry peerRegistry, self peer, opts discoveryOptions) *discovery {
	d := &discovery{
		log:      logger,
		server:   server,
		baseOpts: opts.baseOpts,
		registry: registry,
		self:     self,
		// Start from the defaults and override only with usable values.
		interval: defaultDiscoveryInterval,
		ttl:      defaultDiscoveryTTL,
		routes:   make(map[string]struct{}),
	}
	if opts.interval > 0 {
		d.interval = opts.interval
	}
	if opts.ttl > 0 {
		d.ttl = opts.ttl
	}
	return d
}
// run executes the discovery loop: one tick right away, then one per interval
// until ctx is cancelled. The result is always nil, so registry trouble only
// degrades clustering and never takes the server down with it.
func (d *discovery) run(ctx context.Context) error {
	// Heartbeat and reconcile up front instead of idling for a full interval.
	d.tick(ctx)

	heartbeat := time.NewTicker(d.interval)
	defer heartbeat.Stop()

	cancelled := ctx.Done()
	for {
		select {
		case <-heartbeat.C:
			d.tick(ctx)
		case <-cancelled:
			return nil
		}
	}
}
// tick performs one discovery round: refresh this node's heartbeat, list the
// live peers, and reconcile the server's routes against them. A heartbeat
// failure is logged and does not stop the rest of the round.
func (d *discovery) tick(ctx context.Context) {
	if hbErr := d.registry.upsert(ctx, d.self); hbErr != nil {
		d.log.Warn("nats peer heartbeat failed", "server_name", d.self.ServerName, "err", hbErr)
	}

	// A single scan per round: listActive yields the live peers and prunes
	// stale rows in the same call.
	live, listErr := d.registry.listActive(ctx, d.ttl)
	if listErr != nil {
		d.log.Warn("nats peer discovery degraded", "err", listErr)
	}
	// A failed read comes back with no peer set at all; hold on to the current
	// routes instead of reconciling to nothing and dismantling the mesh. A
	// failed prune still carries a valid peer set, which is reconciled as usual.
	if listErr != nil && live == nil {
		return
	}
	d.reconcile(live)
}
// reconcile derives the wanted route set from peers and reloads the server only
// when it differs from the set applied last; NATS then solicits the added
// routes and closes the dropped ones. The applied set is updated only after a
// successful reload, so a failed one is retried on the next call.
func (d *discovery) reconcile(peers []peer) {
	want := make(map[string]struct{}, len(peers))
	for i := range peers {
		candidate := peers[i]
		// Keep everything except our own row and peers with no route advertised yet.
		isSelf := candidate.ServerName == d.self.ServerName
		if !isSelf && candidate.RouteURL != "" {
			want[candidate.RouteURL] = struct{}{}
		}
	}

	if sameRouteSet(d.routes, want) {
		return
	}

	err := d.applyRoutes(want)
	if err != nil {
		d.log.Error("failed to apply nats cluster routes", "err", err)
		return
	}

	d.routes = want
	d.log.Info("reconciled nats cluster routes", "peers", len(want))
}
// applyRoutes reloads the embedded server so that its cluster routes are exactly
// the usable entries of routes. Entries that fail to parse, use a scheme other
// than "nats" or "tls", or carry no host are logged and left out; the error
// returned is whatever ReloadOptions reports.
func (d *discovery) applyRoutes(routes map[string]struct{}) error {
	urls := make([]*url.URL, 0, len(routes))
	for r := range routes {
		u, err := url.Parse(r)
		if err != nil {
			// A bad URL still counts as "applied" (see caller), so we log once and
			// don't retry it every tick until the peer re-advertises a valid one.
			d.log.Warn("skipping invalid peer route url", "url", r, "err", err)
			continue
		}
		if u.Scheme != "nats" && u.Scheme != "tls" {
			d.log.Warn("skipping invalid peer route url scheme", "url", r, "scheme", u.Scheme)
			continue
		}
		// url.Parse accepts host-less values such as "nats://" or "nats://:6222".
		// Such a route names no peer to dial, so drop it like any other invalid
		// URL instead of handing it to the server.
		if u.Hostname() == "" {
			d.log.Warn("skipping peer route url without host", "url", r)
			continue
		}
		urls = append(urls, u)
	}
	// Reload from a copy: ReloadOptions mutates what it's given, so we keep baseOpts
	// stable and rebuild Routes from scratch each time.
	opts := d.baseOpts
	opts.Routes = urls
	return d.server.ReloadOptions(&opts)
}
// deregister drops this node's registry row so peers stop dialing it right away
// rather than waiting out the TTL. It is best-effort: a failure is only logged,
// and the TTL prune remains the backstop.
func (d *discovery) deregister(ctx context.Context) {
	name := d.self.ServerName
	err := d.registry.remove(ctx, name)
	if err == nil {
		return
	}
	d.log.Warn("failed to deregister nats peer", "server_name", name, "err", err)
}
// sameRouteSet reports whether a and b hold exactly the same route URLs. Nil and
// empty maps compare equal.
func sameRouteSet(a, b map[string]struct{}) bool {
	if len(a) != len(b) {
		return false
	}
	// With equal sizes, every key of one set being present in the other is
	// enough to establish equality.
	for route := range b {
		_, present := a[route]
		if !present {
			return false
		}
	}
	return true
}
// peerRecord is the JSON value stored per peer under kv.NATSPeersSection.
// updatedAt is epoch seconds; the store filters on it in memory since the KV
// section is a plain key/value table with no queryable timestamp column.
type peerRecord struct {
// ServerName is the peer's name; the store rebuilds the row key from it when
// pruning stale rows.
ServerName string `json:"serverName"`
// RouteURL is the cluster route URL the peer advertises.
RouteURL string `json:"routeURL"`
// UpdatedAt is the heartbeat time in Unix seconds, stamped by the writing
// store's clock and compared against the reading store's clock.
// NOTE(review): presumably assumes clock skew between nodes stays well below
// the TTL — confirm.
UpdatedAt int64 `json:"updatedAt"`
}
// kvPeerStore is the KV-backed peerRegistry. Keys are "<clusterName>/<serverName>"
// so peers of independent clusters sharing one KV never cross-wire, and a cluster's
// rows form a contiguous prefix range for listing.
type kvPeerStore struct {
// kv is the underlying store; all rows live in kv.NATSPeersSection.
kv kv.KV
// clusterName is the key prefix that scopes this store's rows.
clusterName string
// now supplies the current time for heartbeat stamps and staleness cutoffs;
// newKVPeerStore sets it to time.Now.
now func() time.Time
}
// newKVPeerStore returns a peer store over the given KV whose rows are scoped
// to clusterName and whose clock is the wall clock.
func newKVPeerStore(store kv.KV, clusterName string) *kvPeerStore {
	s := new(kvPeerStore)
	s.kv = store
	s.clusterName = clusterName
	s.now = time.Now
	return s
}
// peerKey returns the row key for serverName: the cluster name, a slash, then
// the server name.
func (s *kvPeerStore) peerKey(serverName string) string {
	const separator = "/"
	return s.clusterName + separator + serverName
}
// upsert writes p's row with a fresh heartbeat timestamp, creating it or
// overwriting the previous one. It returns the first error from encoding,
// opening the row, writing, or closing the writer.
func (s *kvPeerStore) upsert(ctx context.Context, p peer) error {
	rec := peerRecord{
		ServerName: p.ServerName,
		RouteURL:   p.RouteURL,
		UpdatedAt:  s.now().Unix(),
	}
	payload, err := json.Marshal(rec)
	if err != nil {
		return err
	}

	writer, err := s.kv.Save(ctx, kv.NATSPeersSection, s.peerKey(p.ServerName))
	if err != nil {
		return err
	}

	// The writer is closed exactly once either way; a write failure takes
	// precedence over whatever Close reports.
	_, writeErr := writer.Write(payload)
	closeErr := writer.Close()
	if writeErr != nil {
		return writeErr
	}
	return closeErr
}
// listActive scans this cluster's rows and returns the peers whose heartbeat is
// no older than ttl, deleting the older rows in one batch. A read failure
// returns a nil slice and the error. A failed delete returns the live peers
// together with the error; that slice is never nil, which lets callers tell the
// two cases apart.
func (s *kvPeerStore) listActive(ctx context.Context, ttl time.Duration) ([]peer, error) {
	oldestAllowed := s.now().Add(-ttl).Unix()

	var expired []string
	// Deliberately non-nil even when empty: nil is reserved for read failures.
	active := []peer{}
	for rec, err := range s.records(ctx) {
		if err != nil {
			return nil, err
		}
		if rec.UpdatedAt >= oldestAllowed {
			active = append(active, peer{ServerName: rec.ServerName, RouteURL: rec.RouteURL})
			continue
		}
		expired = append(expired, s.peerKey(rec.ServerName))
	}

	if len(expired) == 0 {
		return active, nil
	}
	// One round-trip, best-effort prune: the live set already leaves these out,
	// so rows that fail to delete simply wait for the next tick.
	return active, s.kv.BatchDelete(ctx, kv.NATSPeersSection, expired)
}
// remove deletes the row registered for serverName in this cluster.
func (s *kvPeerStore) remove(ctx context.Context, serverName string) error {
	key := s.peerKey(serverName)
	return s.kv.Delete(ctx, kv.NATSPeersSection, key)
}
// records returns an iterator over this cluster's peer records. Each iteration
// costs two round-trips: a Keys scan across the cluster's prefix range and a
// BatchGet for the values. A row deleted between the two (by a concurrent
// prune or remove) is absent from the BatchGet result and is therefore skipped
// rather than failing the read. The first scan, fetch, or decode error is
// yielded with a zero record and ends the iteration.
func (s *kvPeerStore) records(ctx context.Context) func(func(peerRecord, error) bool) {
	rangeStart := s.clusterName + "/"
	return func(yield func(peerRecord, error) bool) {
		scan := kv.ListOptions{
			StartKey: rangeStart,
			EndKey:   kv.PrefixRangeEnd(rangeStart),
		}

		var found []string
		for key, err := range s.kv.Keys(ctx, kv.NATSPeersSection, scan) {
			if err != nil {
				yield(peerRecord{}, err)
				return
			}
			found = append(found, key)
		}
		// Nothing registered: skip the second round-trip entirely.
		if len(found) == 0 {
			return
		}

		for item, err := range s.kv.BatchGet(ctx, kv.NATSPeersSection, found) {
			var rec peerRecord
			if err == nil {
				rec, err = decodePeer(item)
			}
			if err != nil {
				yield(peerRecord{}, err)
				return
			}
			if !yield(rec, nil) {
				return
			}
		}
	}
}
// decodePeer reads kvp's value to the end, closes it, and decodes the JSON
// payload into a peerRecord. Both read and decode failures are wrapped with the
// offending key so a bad row can be identified from the error alone.
func decodePeer(kvp kv.KeyValue) (peerRecord, error) {
	// The close error is ignored: the value has been fully consumed (or has
	// already failed) by the time it runs, so it carries no extra information.
	defer func() { _ = kvp.Value.Close() }()
	data, err := io.ReadAll(kvp.Value)
	if err != nil {
		return peerRecord{}, fmt.Errorf("read nats peer %q: %w", kvp.Key, err)
	}
	var rec peerRecord
	if err := json.Unmarshal(data, &rec); err != nil {
		return peerRecord{}, fmt.Errorf("decode nats peer %q: %w", kvp.Key, err)
	}
	return rec, nil
}

View file

@ -0,0 +1,135 @@
package nats
import (
"context"
"testing"
"time"
badger "github.com/dgraph-io/badger/v4"
natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource/kv"
"github.com/grafana/grafana/pkg/util/testutil"
)
// TestIntegrationDiscoveryEmbeddedCluster starts two embedded servers sharing a
// KV-backed peer registry and asserts they self-assemble into a working cluster,
// exercising the real ProvideServer/dskit lifecycle end-to-end.
func TestIntegrationDiscoveryEmbeddedCluster(t *testing.T) {
	testutil.SkipIntegrationTestInShortMode(t)

	t.Run("servers discover each other and form a cluster route", func(t *testing.T) {
		_, a, b, _, _ := startTwoNodeCluster(t)

		// Each node's discovery loop registers itself and reloads the other's route;
		// once solicited both servers report an active cluster peer.
		require.Eventually(t, func() bool {
			return numRoutes(a) >= 1 && numRoutes(b) >= 1
		}, 15*time.Second, 100*time.Millisecond, "embedded servers did not form a cluster route")
	})

	t.Run("a message published on one node reaches a subscriber on the peer", func(t *testing.T) {
		ctx, _, _, cfgA, cfgB := startTwoNodeCluster(t)

		// Publisher connects (in-process) to node A, subscriber to node B, so a
		// delivery proves interest and the message crossed the cluster route.
		pub := ProvidePublisher(cfgA.cfg, ProvideNATSConfig(cfgA.cfg, cfgA.srv), prometheus.NewRegistry())
		sub := ProvideSubscriber(cfgB.cfg, ProvideNATSConfig(cfgB.cfg, cfgB.srv), prometheus.NewRegistry())
		startService(t, ctx, pub)
		startService(t, ctx, sub)

		const subject = "grafana.integration.cluster"
		payload := []byte("hello")
		received := make(chan []byte, 1)
		_, err := sub.Subscribe(ctx, subject, func(_ string, data []byte) {
			// Non-blocking send: only the first delivery matters, later
			// retries must not stall the subscriber callback.
			select {
			case received <- data:
			default:
			}
		})
		require.NoError(t, err)

		// Retry until the route is up and interest has propagated cluster-wide.
		// The loop runs on the test goroutine (not inside require.Eventually,
		// whose condition executes on a separate goroutine) so the require
		// assertions below may legitimately call FailNow.
		deadline := time.After(15 * time.Second)
		for {
			require.NoError(t, pub.Publish(ctx, subject, payload))
			select {
			case got := <-received:
				require.Equal(t, payload, got)
				return
			case <-deadline:
				require.FailNow(t, "message did not cross the cluster route")
			case <-time.After(50 * time.Millisecond):
				// Not delivered yet; publish again.
			}
		}
	})
}
// nodeCfg bundles a node's full Cfg with the Server it drives, so callers can
// wire a publisher/subscriber to that specific node.
type nodeCfg struct {
// cfg is the settings object the node's Server was built from.
cfg *setting.Cfg
// srv is the Server that was created from cfg.
srv *Server
}
// startTwoNodeCluster creates two embedded servers that share a single
// in-memory peer registry and starts both through startService. It returns the
// context the services run under (cancelled by a test cleanup), the two
// servers, and a nodeCfg for each pairing the server with its settings.
func startTwoNodeCluster(t *testing.T) (context.Context, *Server, *Server, nodeCfg, nodeCfg) {
	t.Helper()

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	registry := newSharedTestKV(t)
	first, firstCfg := newTestDiscoveryServer(t, registry)
	second, secondCfg := newTestDiscoveryServer(t, registry)

	startService(t, ctx, first)
	startService(t, ctx, second)

	nodeA := nodeCfg{cfg: firstCfg, srv: first}
	nodeB := nodeCfg{cfg: secondCfg, srv: second}
	return ctx, first, second, nodeA, nodeB
}
// newTestDiscoveryServer builds an embedded-mode Server bound to 127.0.0.1 on
// randomly assigned client and cluster ports, then points it at store so it
// shares that peer registry with any other server given the same store. It
// returns the server together with the Cfg it was built from.
func newTestDiscoveryServer(t *testing.T, store kv.KV) (*Server, *setting.Cfg) {
	t.Helper()

	natsSettings := setting.NATSSettings{
		Enabled:       true,
		Mode:          setting.NATSModeEmbedded,
		ListenAddress: "127.0.0.1",
		// RANDOM_PORT lets NATS pick free client/cluster ports so two servers
		// can run side by side; routeURLForServer reads the bound port back.
		ClientPort:        natsserver.RANDOM_PORT,
		ClusterPort:       natsserver.RANDOM_PORT,
		DiscoveryInterval: 50 * time.Millisecond,
		DiscoveryTTL:      time.Minute,
	}
	cfg := setting.NewCfg()
	cfg.NATS = natsSettings

	srv, err := ProvideServer(cfg, nil, prometheus.NewRegistry())
	require.NoError(t, err)

	// ProvideServer leaves kv nil without a sqlStore; inject the shared registry
	// so both servers discover each other through it.
	srv.kv = store
	return srv, cfg
}
// newSharedTestKV opens an in-memory Badger database (closed by a test cleanup)
// and wraps it as a kv.KV. Tests hand the same instance to several servers so
// they register their peer rows in one place, standing in for the production
// DB-backed registry.
func newSharedTestKV(t *testing.T) kv.KV {
	t.Helper()

	opts := badger.DefaultOptions("").WithInMemory(true).WithLogger(nil)
	db, err := badger.Open(opts)
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})
	return kv.NewBadgerKV(db)
}
// numRoutes reports how many cluster routes s's embedded server currently has,
// or 0 when no embedded server has been set. The server pointer is read under
// s.mu's read lock, the lock the discovery loop also holds, which keeps the
// -race build clean.
func numRoutes(s *Server) int {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if srv := s.server; srv != nil {
		return srv.NumRoutes()
	}
	return 0
}

View file

@ -0,0 +1,101 @@
package nats
import (
"context"
"testing"
"time"
badger "github.com/dgraph-io/badger/v4"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/storage/unified/resource/kv"
)
// newTestKVStore returns a kvPeerStore for clusterName backed by a fresh
// in-memory Badger database (closed by a test cleanup). The store's clock reads
// *now on every call, so a test can move time by assigning to the pointed-to
// value and get deterministic TTL behaviour.
func newTestKVStore(t *testing.T, clusterName string, now *time.Time) *kvPeerStore {
	t.Helper()

	opts := badger.DefaultOptions("").WithInMemory(true).WithLogger(nil)
	db, err := badger.Open(opts)
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, db.Close())
	})

	store := newKVPeerStore(kv.NewBadgerKV(db), clusterName)
	store.now = func() time.Time { return *now }
	return store
}
// TestKVPeerStore covers the kvPeerStore contract against an in-memory KV:
// round-tripping peers, isolation between clusters, TTL-based exclusion with
// pruning, and single-row removal.
func TestKVPeerStore(t *testing.T) {
	const ttl = 30 * time.Second
	ctx := context.Background()

	t.Run("upsert then listActive round-trips the route url", func(t *testing.T) {
		clock := time.Unix(1_000, 0)
		store := newTestKVStore(t, "grafana", &clock)

		want := []peer{
			{ServerName: "a", RouteURL: "nats://10.0.0.1:6222"},
			{ServerName: "b", RouteURL: "nats://10.0.0.2:6222"},
		}
		for _, p := range want {
			require.NoError(t, store.upsert(ctx, p))
		}

		got, err := store.listActive(ctx, ttl)
		require.NoError(t, err)
		require.ElementsMatch(t, want, got)
	})

	t.Run("only returns peers of the same cluster", func(t *testing.T) {
		fixed := time.Unix(1_000, 0)
		db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true).WithLogger(nil))
		require.NoError(t, err)
		t.Cleanup(func() { require.NoError(t, db.Close()) })

		// Two stores with different cluster names over the very same KV.
		shared := kv.NewBadgerKV(db)
		clock := func() time.Time { return fixed }
		clusterA := &kvPeerStore{kv: shared, clusterName: "cluster-a", now: clock}
		clusterB := &kvPeerStore{kv: shared, clusterName: "cluster-b", now: clock}

		memberA := peer{ServerName: "a1", RouteURL: "nats://10.0.0.1:6222"}
		memberB := peer{ServerName: "b1", RouteURL: "nats://10.0.0.2:6222"}
		require.NoError(t, clusterA.upsert(ctx, memberA))
		require.NoError(t, clusterB.upsert(ctx, memberB))

		gotA, err := clusterA.listActive(ctx, ttl)
		require.NoError(t, err)
		require.Equal(t, []peer{memberA}, gotA)

		gotB, err := clusterB.listActive(ctx, ttl)
		require.NoError(t, err)
		require.Equal(t, []peer{memberB}, gotB)
	})

	t.Run("excludes and prunes peers older than the ttl", func(t *testing.T) {
		clock := time.Unix(1_000, 0)
		store := newTestKVStore(t, "grafana", &clock)

		require.NoError(t, store.upsert(ctx, peer{ServerName: "stale", RouteURL: "nats://10.0.0.1:6222"}))
		clock = clock.Add(ttl + time.Second)
		fresh := peer{ServerName: "fresh", RouteURL: "nats://10.0.0.2:6222"}
		require.NoError(t, store.upsert(ctx, fresh))

		// stale's heartbeat is now older than the ttl; only fresh is active, and
		// listActive prunes the stale row as a side effect.
		got, err := store.listActive(ctx, ttl)
		require.NoError(t, err)
		require.Equal(t, []peer{fresh}, got)

		_, err = store.kv.Get(ctx, kv.NATSPeersSection, store.peerKey("stale"))
		require.ErrorIs(t, err, kv.ErrNotFound)

		reader, err := store.kv.Get(ctx, kv.NATSPeersSection, store.peerKey("fresh"))
		require.NoError(t, err)
		require.NoError(t, reader.Close())
	})

	t.Run("remove deletes a single peer", func(t *testing.T) {
		clock := time.Unix(1_000, 0)
		store := newTestKVStore(t, "grafana", &clock)

		require.NoError(t, store.upsert(ctx, peer{ServerName: "a", RouteURL: "nats://10.0.0.1:6222"}))
		require.NoError(t, store.remove(ctx, "a"))

		got, err := store.listActive(ctx, ttl)
		require.NoError(t, err)
		require.Empty(t, got)
	})
}

View file

@ -0,0 +1,155 @@
package nats
import (
"context"
"sync"
"testing"
"time"
natsserver "github.com/nats-io/nats-server/v2/server"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/log"
)
// fakeRegistryRow is one fakeRegistry entry: the advertised peer and the fake
// clock reading at its last upsert.
type fakeRegistryRow struct {
	peer peer
	seen time.Time
}

// fakeRegistry is an in-memory peerRegistry keyed by server name, with a
// settable clock so TTL behaviour is deterministic. mu guards now and rows.
type fakeRegistry struct {
	mu   sync.Mutex
	now  time.Time
	rows map[string]fakeRegistryRow
}

// newFakeRegistry returns an empty registry whose clock starts at Unix second 1000.
func newFakeRegistry() *fakeRegistry {
	return &fakeRegistry{
		now:  time.Unix(1_000, 0),
		rows: make(map[string]fakeRegistryRow),
	}
}

// upsert stores p under its server name, stamped with the registry's current
// fake time. It never fails.
func (f *fakeRegistry) upsert(_ context.Context, p peer) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	f.rows[p.ServerName] = fakeRegistryRow{peer: p, seen: f.now}
	return nil
}
// listActive returns the peers last seen no earlier than ttl before the fake
// clock's current time, deleting every older row as it goes. The result is nil
// when nothing is active, and the error is always nil.
func (f *fakeRegistry) listActive(_ context.Context, ttl time.Duration) ([]peer, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	var active []peer
	oldest := f.now.Add(-ttl)
	for name, row := range f.rows {
		if !row.seen.Before(oldest) {
			active = append(active, row.peer)
			continue
		}
		// Deleting the entry currently being visited is safe during a map range.
		delete(f.rows, name)
	}
	return active, nil
}
// remove drops serverName's row if present. It never fails.
func (f *fakeRegistry) remove(_ context.Context, serverName string) error {
	f.mu.Lock()
	delete(f.rows, serverName)
	f.mu.Unlock()
	return nil
}
// advance moves the registry's fake clock forward by d.
func (f *fakeRegistry) advance(d time.Duration) {
	f.mu.Lock()
	defer f.mu.Unlock()

	later := f.now.Add(d)
	f.now = later
}
// newTestDiscovery builds a discovery against a clustered in-process server so
// applyRoutes exercises the real ReloadOptions route-solicitation path. Ports are
// OS-assigned (natsserver.RANDOM_PORT) to avoid collisions; the advertised peer
// routes are unreachable stubs, which NATS solicits asynchronously without
// failing the reload.
//
// The server is shut down by a test cleanup that is registered before the
// readiness check, so a server that fails to become ready is still stopped.
func newTestDiscovery(t *testing.T, reg peerRegistry, self peer) *discovery {
	t.Helper()
	opts := natsserver.Options{
		Host:            "127.0.0.1",
		Port:            natsserver.RANDOM_PORT,
		NoLog:           true,
		NoSigs:          true,
		JetStream:       false,
		NoSystemAccount: true,
		Cluster: natsserver.ClusterOpts{
			Name: discoveryClusterName,
			Host: "127.0.0.1",
			Port: natsserver.RANDOM_PORT,
		},
	}
	srv, err := natsserver.NewServer(&opts)
	require.NoError(t, err)

	// Register the shutdown first: require.True below aborts the test on
	// failure, and anything registered after it would never run.
	t.Cleanup(srv.Shutdown)
	go srv.Start()
	require.True(t, srv.ReadyForConnections(5*time.Second), "test nats server not ready")

	return newDiscovery(log.NewNopLogger(), srv, reg, self, discoveryOptions{baseOpts: opts})
}
func TestDiscoveryReconcile(t *testing.T) {
ctx := context.Background()
t.Run("adds peer routes and skips self and empty routes", func(t *testing.T) {
reg := newFakeRegistry()
self := peer{ServerName: "self", RouteURL: "nats://10.0.0.1:6222"}
require.NoError(t, reg.upsert(ctx, self))
require.NoError(t, reg.upsert(ctx, peer{ServerName: "peer-a", RouteURL: "nats://10.0.0.2:6222"}))
require.NoError(t, reg.upsert(ctx, peer{ServerName: "peer-b", RouteURL: "nats://10.0.0.3:6222"}))
require.NoError(t, reg.upsert(ctx, peer{ServerName: "peer-empty", RouteURL: ""}))
d := newTestDiscovery(t, reg, self)
d.tick(ctx)
require.Equal(t, map[string]struct{}{
"nats://10.0.0.2:6222": {},
"nats://10.0.0.3:6222": {},
}, d.routes)
})
t.Run("drops a peer once its heartbeat ages past the ttl", func(t *testing.T) {
reg := newFakeRegistry()
self := peer{ServerName: "self", RouteURL: "nats://10.0.0.1:6222"}
require.NoError(t, reg.upsert(ctx, self))
require.NoError(t, reg.upsert(ctx, peer{ServerName: "peer-a", RouteURL: "nats://10.0.0.2:6222"}))
d := newTestDiscovery(t, reg, self)
d.tick(ctx)
require.Contains(t, d.routes, "nats://10.0.0.2:6222")
// peer-a stops heartbeating; self keeps ticking.
reg.advance(d.ttl + time.Second)
d.tick(ctx) // heartbeats self, prunes peer-a, reconciles
require.Empty(t, d.routes)
peers, err := reg.listActive(ctx, d.ttl)
require.NoError(t, err)
require.Len(t, peers, 1, "only self should remain after prune")
})
t.Run("deregister removes self", func(t *testing.T) {
reg := newFakeRegistry()
self := peer{ServerName: "self", RouteURL: "nats://10.0.0.1:6222"}
d := newTestDiscovery(t, reg, self)
require.NoError(t, reg.upsert(ctx, self))
d.deregister(ctx)
peers, err := reg.listActive(ctx, d.ttl)
require.NoError(t, err)
require.Empty(t, peers)
})
}
// TestSameRouteSet checks sameRouteSet on identical sets, disjoint sets of equal
// size, and a set compared with a strict superset.
func TestSameRouteSet(t *testing.T) {
	type routeSet = map[string]struct{}

	cases := []struct {
		left, right routeSet
		same        bool
	}{
		{left: routeSet{"a": {}}, right: routeSet{"a": {}}, same: true},
		{left: routeSet{"a": {}}, right: routeSet{"b": {}}, same: false},
		{left: routeSet{"a": {}}, right: routeSet{"a": {}, "b": {}}, same: false},
	}
	for _, tc := range cases {
		require.Equal(t, tc.same, sameRouteSet(tc.left, tc.right))
	}
}

View file

@ -16,9 +16,10 @@ import (
natsclient "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource/kv"
)
const (
@ -38,18 +39,33 @@ type Server struct {
cfg setting.NATSSettings
log log.Logger
metrics *serverMetrics
kv kv.KV
mu sync.RWMutex
server *natsserver.Server
opts *natsserver.Options
mu sync.RWMutex
server *natsserver.Server
opts *natsserver.Options
discovery *discovery
}
func ProvideServer(cfg *setting.Cfg, _ *sqlstore.SQLStore, reg prometheus.Registerer) (*Server, error) {
func ProvideServer(cfg *setting.Cfg, sqlStore db.DB, reg prometheus.Registerer) (*Server, error) {
s := &Server{
cfg: cfg.NATS,
log: log.New("infra.nats.server"),
}
// A sqlStore is required to wire DB-backed peer discovery. When present (the
// monolith always injects it) the embedded server clusters through discovery;
// when absent it runs as a single standalone node. Module mode passes nil but
// is prevented from enabling embedded NATS upstream (see module_server.go), so
// in practice a running embedded server in production always has discovery.
if !s.IsDisabled() && sqlStore != nil {
sqlKV, err := kv.NewSQLKV(sqlStore.GetEngine().DB().DB, sqlStore.GetDialect().DriverName())
if err != nil {
return nil, fmt.Errorf("create nats discovery kv: %w", err)
}
s.kv = sqlKV
}
// Only register the embedded-server metrics when this instance actually runs
// the embedded server; external/Cloud mode owns nothing here.
if !s.IsDisabled() {
@ -109,14 +125,30 @@ func (s *Server) dialOptions() []natsclient.Option {
}
// running is the service's running phase. When peer discovery has been wired
// it runs the discovery loop and returns its result; otherwise it simply
// blocks until ctx is cancelled and returns nil.
func (s *Server) running(ctx context.Context) error {
	// Snapshot the pointer under the read lock; it is assigned while holding
	// s.mu when the embedded server starts.
	s.mu.RLock()
	d := s.discovery
	s.mu.RUnlock()
	if d == nil {
		<-ctx.Done()
		return nil
	}
	return d.run(ctx)
}
// stopping is the service's shutdown phase. For an enabled server it removes
// this node from the peer registry (when discovery is wired) and then shuts
// the server down. It always returns nil.
func (s *Server) stopping(_ error) error {
	const deregisterTimeout = 5 * time.Second

	if s.IsDisabled() {
		return nil
	}

	s.mu.RLock()
	disc := s.discovery
	s.mu.RUnlock()

	if disc != nil {
		// running's ctx is already cancelled by now, so deregister on a fresh
		// bounded context. Best-effort: the TTL prune is the backstop.
		ctx, cancel := context.WithTimeout(context.Background(), deregisterTimeout)
		defer cancel()
		disc.deregister(ctx)
	}

	s.shutdown()
	return nil
}
@ -157,14 +189,52 @@ func (s *Server) startEmbeddedServer(_ context.Context) error {
s.mu.Lock()
s.server = server
s.opts = &opts
// Wire peer discovery only when a KV is available. The monolith always injects a
// sqlStore, so production embedded servers cluster through discovery; module mode
// cannot enable embedded NATS (see module_server.go). A nil KV therefore only
// happens for a single standalone node (e.g. tests), which runs without clustering.
if s.kv != nil {
s.discovery = newDiscovery(
s.log,
server,
newKVPeerStore(s.kv, opts.Cluster.Name),
peer{ServerName: opts.ServerName, RouteURL: routeURL},
discoveryOptions{
baseOpts: opts,
interval: s.cfg.DiscoveryInterval,
ttl: s.cfg.DiscoveryTTL,
},
)
}
s.mu.Unlock()
s.metrics.embeddedServerUp.Set(1)
s.log.Info("started embedded nats server", "client_url", clientURL, "route_url", routeURL)
// Discovery advertises route_url to peers via the shared DB. A loopback route
// resolves to the advertising node itself, so peers on other hosts can never
// dial it and clustering silently no-ops. Warn rather than fail: a single
// standalone node on default config is a legitimate case.
if s.kv != nil && isLoopbackRouteURL(routeURL) {
s.log.Warn("embedded nats advertising a loopback route url; multi-host clustering will not form. "+
"Set listen_address to a routable address, or bind 0.0.0.0 and set advertise_address to a routable address",
"route_url", routeURL)
}
return nil
}
// isLoopbackRouteURL reports whether the route URL's host is a loopback address,
// which only clusters within a single host.
//
// The host counts as loopback when it is a loopback IP literal (IPv4 127.0.0.0/8
// or IPv6 ::1) or the lowercase name "localhost". Other hostnames are never
// resolved and are reported as non-loopback, as are URLs that fail to parse or
// carry no host.
func isLoopbackRouteURL(routeURL string) bool {
	u, err := url.Parse(routeURL)
	if err != nil {
		return false
	}
	host := u.Hostname()
	// "localhost" is reserved for loopback, so peers on other hosts can no more
	// dial it than they can dial 127.0.0.1.
	if host == "localhost" {
		return true
	}
	ip := net.ParseIP(host)
	return ip != nil && ip.IsLoopback()
}
func (s *Server) serverOptions() *natsserver.Options {
return &natsserver.Options{
ServerName: defaultServerNamePrefix + randomSuffix(),

View file

@ -50,3 +50,18 @@ func TestServer(t *testing.T) {
require.Error(t, s.Health(context.Background()))
})
}
// TestIsLoopbackRouteURL checks isLoopbackRouteURL on loopback and non-loopback
// IP literals, a plain hostname, and a string that is not a URL.
func TestIsLoopbackRouteURL(t *testing.T) {
	tests := []struct {
		routeURL string
		loopback bool
	}{
		{routeURL: "nats://127.0.0.1:6222", loopback: true},
		{routeURL: "nats://[::1]:6222", loopback: true},
		{routeURL: "nats://10.0.0.5:6222", loopback: false},
		// unresolved hostname is not treated as loopback
		{routeURL: "nats://example:6222", loopback: false},
		{routeURL: "not a url", loopback: false},
	}
	for _, tc := range tests {
		t.Run(tc.routeURL, func(t *testing.T) {
			require.Equal(t, tc.loopback, isLoopbackRouteURL(tc.routeURL))
		})
	}
}

View file

@ -252,9 +252,11 @@ func (s *ModuleServer) Run() error {
m.RegisterInvisibleModule(modules.NATS, func() (services.Service, error) {
// The embedded server relies on DB-backed peer discovery that is not wired
// in module mode, so only external NATS is supported here for now.
// in module mode (no sqlStore is injected here), so only external NATS is
// supported. Fail fast rather than fall through to ProvideServer, which would
// accept the nil sqlStore and start a standalone node without peer discovery,
// so operators get a mode-specific message instead of a silently unclustered server.
if s.cfg.NATS.Enabled && s.cfg.NATS.Embedded() {
s.log.Warn("embedded NATS is not supported in module mode; configure [nats] mode=external")
return nil, fmt.Errorf("embedded NATS is not supported in module mode; configure [nats] mode=external")
}
natsServer, err := nats.ProvideServer(s.cfg, nil, s.registerer)
if err != nil {

View file

@ -176,4 +176,6 @@ func (oss *OSSMigrations) AddMigration(mg *Migrator) {
ualert.AddRuleAlertRoutingColumns(mg)
accesscontrol.AddManagedRoutesPermissions(mg)
addNatsDiscoveryMigrations(mg)
}

View file

@ -0,0 +1,25 @@
package migrations
import (
. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
)
// addNatsDiscoveryMigrations registers the migrations for nats_discovery_peers,
// the key/value table behind the nats/peers KV section that embedded NATS peer
// discovery reads and writes.
//
// The table is created by the core migrator rather than the unified
// resource-store migrator because nats.ProvideServer builds the discovery KV
// directly on the core Grafana sqlStore. The resource-store migrator is skipped
// entirely for the file/unified-grpc/unified-kv-grpc storage types, so placing
// the table there would leave embedded NATS without it in those setups.
func addNatsDiscoveryMigrations(mg *Migrator) {
	keyPath := &Column{Name: "key_path", Type: DB_NVarchar, Length: 2048, Nullable: false, IsPrimaryKey: true, IsLatin: true}
	value := &Column{Name: "value", Type: DB_Text, Nullable: false}
	peersTable := Table{
		Name:    "nats_discovery_peers",
		Columns: []*Column{keyPath, value},
	}
	mg.AddMigration("create table nats_discovery_peers", NewAddTableMigration(peersTable))

	// Postgres only: switch key_path to the "C" collation.
	collation := NewRawSQLMigration("").
		Postgres(`ALTER TABLE nats_discovery_peers ALTER COLUMN key_path TYPE VARCHAR(2048) COLLATE "C";`)
	mg.AddMigration("Change key_path collation of nats_discovery_peers in postgres", collation)
}

View file

@ -2,6 +2,7 @@ package setting
import (
"fmt"
"time"
"github.com/grafana/grafana/pkg/util"
)
@ -31,6 +32,13 @@ type NATSSettings struct {
ClusterPort int
AdvertiseAddress string
// DiscoveryInterval is how often an embedded node refreshes its registry
// heartbeat and reconciles cluster routes. DiscoveryTTL is how long a peer is
// trusted after its last heartbeat before its route is dropped and row pruned;
// keep it a comfortable multiple of DiscoveryInterval.
DiscoveryInterval time.Duration
DiscoveryTTL time.Duration
TLS NATSTLSSettings
Auth NATSAuthSettings
}
@ -73,6 +81,9 @@ func readNATSSettings(cfg *Cfg) error {
ClientPort: section.Key("client_port").MustInt(4222),
ClusterPort: section.Key("cluster_port").MustInt(6222),
AdvertiseAddress: section.Key("advertise_address").MustString(""),
DiscoveryInterval: section.Key("discovery_interval").MustDuration(5 * time.Second),
DiscoveryTTL: section.Key("discovery_ttl").MustDuration(30 * time.Second),
TLS: NATSTLSSettings{
Enabled: section.Key("tls_enabled").MustBool(false),
CACertPath: section.Key("tls_ca_cert_path").MustString(""),

View file

@ -31,6 +31,7 @@ const (
SearchSnapshotDataSection = "search/snapshot-data"
StatsDailySection = "stats/daily"
StatsAggregatesSection = "stats/aggregates"
NATSPeersSection = "nats/peers"
)
// validSaveSections is the set of sections accepted by SqlKV.Save.
@ -44,6 +45,7 @@ var validSaveSections = map[string]bool{
SearchSnapshotDataSection: true,
StatsDailySection: true,
StatsAggregatesSection: true,
NATSPeersSection: true,
}
var _ KV = &SqlKV{}
@ -129,6 +131,8 @@ func (k *SqlKV) getQueryBuilder(section string) (*queryBuilder, error) {
tableName = "resource_stats_daily"
case StatsAggregatesSection:
tableName = "resource_stats_aggregates"
case NATSPeersSection:
tableName = "nats_discovery_peers"
default:
return nil, fmt.Errorf("invalid section: %s", section)
}