Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,11 +669,19 @@ type NatsAuthentication struct {
NatsTokenBasedAuthentication `yaml:"token,inline"`
}

type NatsTLSConfiguration struct {
InsecureSkipVerify bool `yaml:"insecure_skip_verify,omitempty" envDefault:"false"`
CaFile string `yaml:"ca_file,omitempty"`
CertFile string `yaml:"cert_file,omitempty"`
KeyFile string `yaml:"key_file,omitempty"`
}

type NatsEventSource struct {
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
DeleteDurableConsumersOnShutdown bool `yaml:"experiment_delete_durable_consumers_on_shutdown"`
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
TLS *NatsTLSConfiguration `yaml:"tls,omitempty"`
DeleteDurableConsumersOnShutdown bool `yaml:"experiment_delete_durable_consumers_on_shutdown"`
}

func (n NatsEventSource) GetID() string {
Expand Down
27 changes: 27 additions & 0 deletions router/pkg/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2550,6 +2550,33 @@
}
]
},
"tls": {
"type": "object",
"description": "TLS configuration for the NATS provider.",
"additionalProperties": false,
"properties": {
"insecure_skip_verify": {
"type": "boolean",
"default": false,
"description": "Skip server certificate verification. Not recommended for production use."
},
"ca_file": {
"type": "string",
"description": "Path to a custom CA certificate file (PEM) used to verify the server certificate.",
"format": "file-path"
},
"cert_file": {
"type": "string",
"description": "Path to the client certificate file (PEM) for mTLS.",
Comment thread
vatsalpatel marked this conversation as resolved.
Outdated
"format": "file-path"
},
"key_file": {
"type": "string",
"description": "Path to the client private key file (PEM) for mTLS.",
"format": "file-path"
}
Comment thread
vatsalpatel marked this conversation as resolved.
}
},
"experiment_delete_durable_consumers_on_shutdown": {
"type": "boolean",
"description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down normally. Defaults to false. NOTE: This option is experimental and may change in future versions.",
Expand Down
72 changes: 72 additions & 0 deletions router/pkg/config/config_events_nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,75 @@ events:
_, err := LoadConfig([]string{f})
require.NoError(t, err)
}

func TestValidNatsTLSInsecureSkipVerify(t *testing.T) {
t.Parallel()

f := createTempFileFromFixture(t, `
version: "1"

graph:
token: "token"

events:
providers:
nats:
- id: default
url: "nats://localhost:4222"
tls:
insecure_skip_verify: true

`)

_, err := LoadConfig([]string{f})
require.NoError(t, err)
}

func TestValidNatsTLSWithFilePaths(t *testing.T) {
t.Parallel()

f := createTempFileFromFixture(t, `
version: "1"

graph:
token: "token"

events:
providers:
nats:
- id: default
url: "nats://localhost:4222"
tls:
ca_file: "/tmp/ca.pem"
cert_file: "/tmp/client.crt"
key_file: "/tmp/client.key"

`)

_, err := LoadConfig([]string{f})
require.NoError(t, err)
}

func TestInvalidNatsTLSUnknownField(t *testing.T) {
t.Parallel()

f := createTempFileFromFixture(t, `
version: "1"

graph:
token: "token"

events:
providers:
nats:
- id: default
url: "nats://localhost:4222"
tls:
unknown_field: true

`)

_, err := LoadConfig([]string{f})
require.ErrorContains(t, err, "errors while loading config files: router config validation error for")
require.ErrorContains(t, err, "additional properties 'unknown_field' not allowed")
}
2 changes: 2 additions & 0 deletions router/pkg/config/testdata/config_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@
"ID": "default",
"URL": "nats://localhost:4222",
"Authentication": null,
"TLS": null,
"DeleteDurableConsumersOnShutdown": false
},
{
Expand All @@ -685,6 +686,7 @@
},
"Token": null
},
"TLS": null,
"DeleteDurableConsumersOnShutdown": false
}
],
Expand Down
41 changes: 41 additions & 0 deletions router/pkg/pubsub/nats/provider_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package nats

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"
"time"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -114,6 +117,44 @@ func buildNatsOptions(eventSource config.NatsEventSource, logger *zap.Logger) ([
}
}

if eventSource.TLS != nil {
tlsCfg := &tls.Config{
InsecureSkipVerify: eventSource.TLS.InsecureSkipVerify,
}

if eventSource.TLS.InsecureSkipVerify {
logger.Warn("TLS InsecureSkipVerify is enabled for NATS provider. This is not recommended for production environments.", zap.String("provider_id", eventSource.ID))
}

if eventSource.TLS.CaFile != "" {
caPEM, err := os.ReadFile(eventSource.TLS.CaFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file %q: %w", eventSource.TLS.CaFile, err)
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(caPEM) {
return nil, fmt.Errorf("failed to parse CA certificate from %q", eventSource.TLS.CaFile)
}
tlsCfg.RootCAs = pool
}

if eventSource.TLS.CertFile != "" || eventSource.TLS.KeyFile != "" {
if eventSource.TLS.CertFile == "" || eventSource.TLS.KeyFile == "" {
return nil, fmt.Errorf("both cert_file and key_file must be provided for mTLS on NATS provider %q", eventSource.ID)
}
if eventSource.TLS.CaFile == "" {
return nil, fmt.Errorf("ca_file is required when mTLS credentials are configured for NATS provider %q", eventSource.ID)
}
Comment thread
vatsalpatel marked this conversation as resolved.
Outdated
cert, err := tls.LoadX509KeyPair(eventSource.TLS.CertFile, eventSource.TLS.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load mTLS certificate/key for NATS provider %q: %w", eventSource.ID, err)
}
tlsCfg.Certificates = []tls.Certificate{cert}
}

opts = append(opts, nats.Secure(tlsCfg))
}

return opts, nil
}

Expand Down
166 changes: 166 additions & 0 deletions router/pkg/pubsub/nats/provider_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ package nats

import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -67,6 +76,114 @@ func TestBuildNatsOptions(t *testing.T) {
})
}

func TestBuildNatsOptionsWithTLS(t *testing.T) {
t.Run("insecure skip verify", func(t *testing.T) {
Comment thread
vatsalpatel marked this conversation as resolved.
Outdated
cfg := config.NatsEventSource{
ID: "test-nats",
URL: "nats://localhost:4222",
TLS: &config.NatsTLSConfiguration{
InsecureSkipVerify: true,
},
}
logger := zaptest.NewLogger(t)

opts, err := buildNatsOptions(cfg, logger)
require.NoError(t, err)
require.Greater(t, len(opts), 7) // base options + TLS option
Comment thread
vatsalpatel marked this conversation as resolved.
Outdated
})

t.Run("missing ca file returns error", func(t *testing.T) {
cfg := config.NatsEventSource{
ID: "test-nats",
URL: "nats://localhost:4222",
TLS: &config.NatsTLSConfiguration{
CaFile: "/nonexistent/ca.pem",
},
}
logger := zaptest.NewLogger(t)

_, err := buildNatsOptions(cfg, logger)
require.ErrorContains(t, err, "failed to read CA file")
})

t.Run("cert file without key file returns error", func(t *testing.T) {
cfg := config.NatsEventSource{
ID: "test-nats",
URL: "nats://localhost:4222",
TLS: &config.NatsTLSConfiguration{
CertFile: "/tmp/client.crt",
},
}
logger := zaptest.NewLogger(t)

_, err := buildNatsOptions(cfg, logger)
require.ErrorContains(t, err, "both cert_file and key_file must be provided")
})

t.Run("key file without cert file returns error", func(t *testing.T) {
cfg := config.NatsEventSource{
ID: "test-nats",
URL: "nats://localhost:4222",
TLS: &config.NatsTLSConfiguration{
KeyFile: "/tmp/client.key",
},
}
logger := zaptest.NewLogger(t)

_, err := buildNatsOptions(cfg, logger)
require.ErrorContains(t, err, "both cert_file and key_file must be provided")
})

t.Run("mtls without ca file returns error", func(t *testing.T) {
cfg := config.NatsEventSource{
ID: "test-nats",
URL: "nats://localhost:4222",
TLS: &config.NatsTLSConfiguration{
CertFile: "/tmp/client.crt",
KeyFile: "/tmp/client.key",
},
}
logger := zaptest.NewLogger(t)

_, err := buildNatsOptions(cfg, logger)
require.ErrorContains(t, err, "ca_file is required when mTLS credentials are configured")
})

t.Run("ca file only succeeds", func(t *testing.T) {
caFile, _, _ := generateTestCerts(t)
cfg := config.NatsEventSource{
ID: "test-nats",
URL: "nats://localhost:4222",
TLS: &config.NatsTLSConfiguration{
CaFile: caFile,
},
}
logger := zaptest.NewLogger(t)

opts, err := buildNatsOptions(cfg, logger)
require.NoError(t, err)
require.Greater(t, len(opts), 7)
})

t.Run("mtls with ca file succeeds", func(t *testing.T) {
caFile, certFile, keyFile := generateTestCerts(t)
cfg := config.NatsEventSource{
ID: "test-nats",
URL: "nats://localhost:4222",
TLS: &config.NatsTLSConfiguration{
CaFile: caFile,
CertFile: certFile,
KeyFile: keyFile,
},
}
logger := zaptest.NewLogger(t)

opts, err := buildNatsOptions(cfg, logger)
require.NoError(t, err)
require.Greater(t, len(opts), 7)
})
}

func TestPubSubProviderBuilderFactory(t *testing.T) {
t.Run("creates provider with configured adapters", func(t *testing.T) {
providerId := "test-provider"
Expand All @@ -92,3 +209,52 @@ func TestPubSubProviderBuilderFactory(t *testing.T) {
assert.NotNil(t, natsProvider.Adapter)
})
}

// writeTempPEM writes PEM-encoded bytes to a temp file and returns its path.
func writeTempPEM(t *testing.T, data []byte) string {
t.Helper()
f, err := os.CreateTemp(t.TempDir(), "*.pem")
require.NoError(t, err)
_, err = f.Write(data)
require.NoError(t, err)
require.NoError(t, f.Close())
return f.Name()
}

// generateTestCerts creates a self-signed CA, and a client cert signed by that CA.
// Returns (caFile, certFile, keyFile) paths.
func generateTestCerts(t *testing.T) (caFile, certFile, keyFile string) {
t.Helper()

caKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
require.NoError(t, err)
caTemplate := &x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "test-ca"},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: time.Now().Add(time.Hour),
IsCA: true,
BasicConstraintsValid: true,
}
caDER, err := x509.CreateCertificate(rand.Reader, caTemplate, caTemplate, &caKey.PublicKey, caKey)
require.NoError(t, err)
caPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: caDER})

clientKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
require.NoError(t, err)
clientTemplate := &x509.Certificate{
SerialNumber: big.NewInt(2),
Subject: pkix.Name{CommonName: "test-client"},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: time.Now().Add(time.Hour),
}
clientDER, err := x509.CreateCertificate(rand.Reader, clientTemplate, caTemplate, &clientKey.PublicKey, caKey)
require.NoError(t, err)
clientCertPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: clientDER})

clientKeyDER, err := x509.MarshalECPrivateKey(clientKey)
require.NoError(t, err)
clientKeyPEM := pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: clientKeyDER})

return writeTempPEM(t, caPEM), writeTempPEM(t, clientCertPEM), writeTempPEM(t, clientKeyPEM)
}
Loading