Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions docs-website/router/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,10 @@ events:
user_info:
username: "username"
password: "password"
tls:
ca_file: /path/to/ca.crt
cert_file: /path/to/client.crt
key_file: /path/to/client.key
kafka:
- id: my-kafka
tls:
Expand Down Expand Up @@ -1746,6 +1750,60 @@ events:
| | authentication.user_info | <Icon icon="square" /> | User-Info based authentication. | |
| | authentication.user_info.username | <Icon icon="square" /> | Username. | |
| | authentication.user_info.password | <Icon icon="square" /> | Password. | |
| | tls | <Icon icon="square" /> | TLS configuration for the NATS provider. | |
| | tls.insecure_skip_verify | <Icon icon="square" /> | Skip server certificate verification. Not recommended for production. | false |
| | tls.ca_file | <Icon icon="square" /> | Path to a custom CA certificate file (PEM) used to verify the server certificate. | |
| | tls.cert_file | <Icon icon="square" /> | Path to the client certificate file (PEM) for mTLS. | |
| | tls.key_file | <Icon icon="square" /> | Path to the client private key file (PEM) for mTLS. | |

#### TLS

The NATS provider supports four TLS modes:

| Mode | Config |
|---|---|
| No TLS | Omit the `tls` block |
| TLS with skip verify | `insecure_skip_verify: true` |
| TLS with custom CA | `ca_file: /path/to/ca.crt` |
| Mutual TLS (mTLS) | `ca_file` + `cert_file` + `key_file` |

Comment thread
coderabbitai[bot] marked this conversation as resolved.
<CodeGroup>
```yaml Skip verify (dev/self-signed)
events:
providers:
nats:
- id: my-nats
url: "nats://localhost:4222"
tls:
insecure_skip_verify: true
```

```yaml Custom CA (1-way TLS)
events:
providers:
nats:
- id: my-nats
url: "nats://localhost:4222"
tls:
ca_file: /path/to/ca.crt
```

```yaml Mutual TLS (mTLS)
events:
providers:
nats:
- id: my-nats
url: "nats://localhost:4222"
tls:
ca_file: /path/to/ca.crt
cert_file: /path/to/client.crt
key_file: /path/to/client.key
```
</CodeGroup>

<Warning>
`insecure_skip_verify: true` disables server certificate verification. Do not use in production.
</Warning>

### Kafka Provider

Expand Down
16 changes: 12 additions & 4 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,11 +669,19 @@ type NatsAuthentication struct {
NatsTokenBasedAuthentication `yaml:"token,inline"`
}

type NatsTLSConfiguration struct {
InsecureSkipVerify bool `yaml:"insecure_skip_verify,omitempty" envDefault:"false"`
CaFile string `yaml:"ca_file,omitempty"`
CertFile string `yaml:"cert_file,omitempty"`
KeyFile string `yaml:"key_file,omitempty"`
}

type NatsEventSource struct {
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
DeleteDurableConsumersOnShutdown bool `yaml:"experiment_delete_durable_consumers_on_shutdown"`
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
TLS *NatsTLSConfiguration `yaml:"tls,omitempty"`
DeleteDurableConsumersOnShutdown bool `yaml:"experiment_delete_durable_consumers_on_shutdown"`
}

func (n NatsEventSource) GetID() string {
Expand Down
27 changes: 27 additions & 0 deletions router/pkg/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2550,6 +2550,33 @@
}
]
},
"tls": {
"type": "object",
"description": "TLS configuration for the NATS provider.",
"additionalProperties": false,
"properties": {
"insecure_skip_verify": {
"type": "boolean",
"default": false,
"description": "Skip server certificate verification. Not recommended for production use."
},
"ca_file": {
"type": "string",
"description": "Path to a custom CA certificate file (PEM) used to verify the server certificate.",
"format": "file-path"
},
"cert_file": {
"type": "string",
"description": "Path to the client certificate file (PEM) for mTLS.",
Comment thread
vatsalpatel marked this conversation as resolved.
Outdated
"format": "file-path"
},
"key_file": {
"type": "string",
"description": "Path to the client private key file (PEM) for mTLS.",
"format": "file-path"
}
Comment thread
vatsalpatel marked this conversation as resolved.
}
},
"experiment_delete_durable_consumers_on_shutdown": {
"type": "boolean",
"description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down normally. Defaults to false. NOTE: This option is experimental and may change in future versions.",
Expand Down
72 changes: 72 additions & 0 deletions router/pkg/config/config_events_nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,75 @@ events:
_, err := LoadConfig([]string{f})
require.NoError(t, err)
}

func TestValidNatsTLSInsecureSkipVerify(t *testing.T) {
t.Parallel()

f := createTempFileFromFixture(t, `
version: "1"

graph:
token: "token"

events:
providers:
nats:
- id: default
url: "nats://localhost:4222"
tls:
insecure_skip_verify: true

`)

_, err := LoadConfig([]string{f})
require.NoError(t, err)
}

func TestValidNatsTLSWithFilePaths(t *testing.T) {
t.Parallel()

f := createTempFileFromFixture(t, `
version: "1"

graph:
token: "token"

events:
providers:
nats:
- id: default
url: "nats://localhost:4222"
tls:
ca_file: "/tmp/ca.pem"
cert_file: "/tmp/client.crt"
key_file: "/tmp/client.key"

`)

_, err := LoadConfig([]string{f})
require.NoError(t, err)
}

func TestInvalidNatsTLSUnknownField(t *testing.T) {
t.Parallel()

f := createTempFileFromFixture(t, `
version: "1"

graph:
token: "token"

events:
providers:
nats:
- id: default
url: "nats://localhost:4222"
tls:
unknown_field: true

`)

_, err := LoadConfig([]string{f})
require.ErrorContains(t, err, "errors while loading config files: router config validation error for")
require.ErrorContains(t, err, "additional properties 'unknown_field' not allowed")
}
2 changes: 2 additions & 0 deletions router/pkg/config/testdata/config_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@
"ID": "default",
"URL": "nats://localhost:4222",
"Authentication": null,
"TLS": null,
"DeleteDurableConsumersOnShutdown": false
},
{
Expand All @@ -685,6 +686,7 @@
},
"Token": null
},
"TLS": null,
"DeleteDurableConsumersOnShutdown": false
}
],
Expand Down
41 changes: 41 additions & 0 deletions router/pkg/pubsub/nats/provider_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package nats

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"
"time"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -114,6 +117,44 @@ func buildNatsOptions(eventSource config.NatsEventSource, logger *zap.Logger) ([
}
}

if eventSource.TLS != nil {
tlsCfg := &tls.Config{
InsecureSkipVerify: eventSource.TLS.InsecureSkipVerify,
}

if eventSource.TLS.InsecureSkipVerify {
logger.Warn("TLS InsecureSkipVerify is enabled for NATS provider. This is not recommended for production environments.", zap.String("provider_id", eventSource.ID))
}

if eventSource.TLS.CaFile != "" {
caPEM, err := os.ReadFile(eventSource.TLS.CaFile)
if err != nil {
return nil, fmt.Errorf("failed to read CA file %q: %w", eventSource.TLS.CaFile, err)
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(caPEM) {
return nil, fmt.Errorf("failed to parse CA certificate from %q", eventSource.TLS.CaFile)
}
tlsCfg.RootCAs = pool
}

if eventSource.TLS.CertFile != "" || eventSource.TLS.KeyFile != "" {
if eventSource.TLS.CertFile == "" || eventSource.TLS.KeyFile == "" {
return nil, fmt.Errorf("both cert_file and key_file must be provided for mTLS on NATS provider %q", eventSource.ID)
}
if eventSource.TLS.CaFile == "" {
return nil, fmt.Errorf("ca_file is required when mTLS credentials are configured for NATS provider %q", eventSource.ID)
}
Comment thread
vatsalpatel marked this conversation as resolved.
Outdated
cert, err := tls.LoadX509KeyPair(eventSource.TLS.CertFile, eventSource.TLS.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load mTLS certificate/key for NATS provider %q: %w", eventSource.ID, err)
}
tlsCfg.Certificates = []tls.Certificate{cert}
}

opts = append(opts, nats.Secure(tlsCfg))
}

return opts, nil
}

Expand Down
Loading
Loading