Client Setup and Configuration
This guide covers how to configure and connect Danube clients to your broker.
Basic Connection
Connect to a Danube broker using its gRPC endpoint:
Endpoint format: http://host:port or https://host:port for TLS
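To catch a malformed endpoint before a client is built, the URL can be checked up front. The following is a minimal sketch using only the Python standard library; `parse_endpoint` is a hypothetical helper and not part of any Danube client. It accepts only the two URL forms listed above. The Go examples in this guide pass `host:port` without a scheme, so this check applies to the URL form only.

```python
from typing import Tuple
from urllib.parse import urlsplit


def parse_endpoint(url: str) -> Tuple[str, int, bool]:
    """Split a broker endpoint into (host, port, use_tls).

    Accepts http://host:port and https://host:port only;
    raises ValueError for anything else.
    """
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https"):
        raise ValueError(f"expected http:// or https:// scheme, got {url!r}")
    if not parts.hostname:
        raise ValueError(f"missing host in {url!r}")
    if parts.port is None:
        raise ValueError(f"missing port in {url!r}")
    # https:// means the connection is expected to use TLS.
    return parts.hostname, parts.port, parts.scheme == "https"
```

The returned flag can be used to decide whether a CA certificate has to be supplied before connecting.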
TLS Configuration
For secure production environments, enable TLS encryption:
use danube_client::DanubeClient;
use rustls::crypto;
use tokio::sync::OnceCell;

static CRYPTO_PROVIDER: OnceCell<()> = OnceCell::const_new();

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize crypto provider (required once)
    CRYPTO_PROVIDER.get_or_init(|| async {
        let crypto_provider = crypto::ring::default_provider();
        crypto_provider
            .install_default()
            .expect("Failed to install default CryptoProvider");
    })
    .await;

    let client = DanubeClient::builder()
        .service_url("https://127.0.0.1:6650")
        .with_tls("./certs/ca-cert.pem")?
        .build()
        .await?;

    Ok(())
}
package main

import (
	"log"

	"github.com/danube-messaging/danube-go"
)

func main() {
	// TLS with custom CA certificate
	builder, err := danube.NewClient().
		ServiceURL("127.0.0.1:6650").
		WithTLS("./certs/ca-cert.pem")
	if err != nil {
		log.Fatalf("failed to configure TLS: %v", err)
	}

	client, err := builder.Build()
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}
	_ = client
}
For mutual TLS (mTLS) with client certificates:
// mTLS with CA, client cert, and client key
builder, err := danube.NewClient().
	ServiceURL("127.0.0.1:6650").
	WithMTLS("./certs/ca-cert.pem", "./certs/client-cert.pem", "./certs/client-key.pem")
if err != nil {
	log.Fatalf("failed to configure mTLS: %v", err)
}

client, err := builder.Build()
if err != nil {
	log.Fatalf("failed to create client: %v", err)
}
import asyncio
from danube import DanubeClientBuilder

async def main():
    # TLS with custom CA certificate
    client = await (
        DanubeClientBuilder()
        .service_url("https://127.0.0.1:6650")
        .with_tls("./certs/ca-cert.pem")
        .build()
    )

asyncio.run(main())
For mutual TLS (mTLS) with client certificates:
Requirements:
- CA certificate file (PEM format)
- HTTPS URL (https:// instead of http://)
- Broker must be configured with TLS enabled
Certificate paths:
- Relative: ./certs/ca-cert.pem
- Absolute: /etc/danube/certs/ca-cert.pem
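A relative certificate path is resolved against the process's working directory, which is a common source of "file not found" errors at startup. The sketch below is one way to validate the path before handing it to a client. `check_ca_cert` is a hypothetical helper, and it only looks for the PEM header line; it does not parse or verify the certificate.

```python
from pathlib import Path

PEM_MARKER = "-----BEGIN CERTIFICATE-----"


def check_ca_cert(path: str) -> Path:
    """Return the absolute CA path, or raise if it is missing or not PEM."""
    resolved = Path(path).expanduser().resolve()
    if not resolved.is_file():
        raise FileNotFoundError(f"CA certificate not found: {resolved}")
    # Cheap sanity check only: a PEM certificate contains this header line.
    text = resolved.read_text(encoding="ascii", errors="replace")
    if PEM_MARKER not in text:
        raise ValueError(f"{resolved} does not look like a PEM certificate")
    return resolved
```

Passing the resolved absolute path onward avoids surprises when the application is started from a different directory.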
JWT Authentication
When the broker is running with auth.mode: tls, all client requests must carry a valid JWT token. Tokens are created offline using danube-admin and passed to the client at construction time — no API key exchange occurs at runtime.
Creating a Token
Tokens are generated offline using danube-admin (no broker connection needed):
# Create a service account token (default: 1-year TTL)
danube-admin security tokens create \
  --subject my-app \
  --secret-key your-secret-key

# Create a token with custom TTL and issuer
danube-admin security tokens create \
  --subject my-app \
  --ttl 24h \
  --issuer danube-auth \
  --secret-key your-secret-key
The --secret-key value must match the broker's jwt.secret_key configuration. The token's sub claim (subject) is the principal name used for RBAC authorization.
For a complete guide on tokens, roles, bindings, and RBAC setup, see Security Concepts.
Connecting with a Token
Use with_token to authenticate. This automatically enables TLS:
use danube_client::DanubeClient;
use rustls::crypto;
use tokio::sync::OnceCell;

static CRYPTO_PROVIDER: OnceCell<()> = OnceCell::const_new();

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    CRYPTO_PROVIDER.get_or_init(|| async {
        let crypto_provider = crypto::ring::default_provider();
        crypto_provider
            .install_default()
            .expect("Failed to install default CryptoProvider");
    })
    .await;

    let token = std::env::var("DANUBE_TOKEN")
        .expect("DANUBE_TOKEN environment variable not set");

    let client = DanubeClient::builder()
        .service_url("https://127.0.0.1:6650")
        .with_tls("./certs/ca-cert.pem")?
        .with_token(&token)
        .build()
        .await?;

    Ok(())
}
package main

import (
	"log"
	"os"

	"github.com/danube-messaging/danube-go"
)

func main() {
	token := os.Getenv("DANUBE_TOKEN")
	if token == "" {
		log.Fatal("DANUBE_TOKEN environment variable not set")
	}

	// WithToken automatically enables TLS with system CA roots
	client, err := danube.NewClient().
		ServiceURL("127.0.0.1:6650").
		WithToken(token).
		Build()
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}
	_ = client
}
To combine token authentication with a custom CA certificate:
import asyncio
import os
from danube import DanubeClientBuilder

async def main():
    token = os.environ["DANUBE_TOKEN"]

    # with_token automatically enables TLS with system CA roots
    client = await (
        DanubeClientBuilder()
        .service_url("https://127.0.0.1:6650")
        .with_token(token)
        .build()
    )

asyncio.run(main())
To combine token authentication with a custom CA certificate:
import com.danubemessaging.client.DanubeClient;

String token = System.getenv("DANUBE_TOKEN");
if (token == null || token.isBlank()) {
    throw new IllegalStateException("DANUBE_TOKEN environment variable not set");
}

// withToken automatically enables TLS
DanubeClient client = DanubeClient.builder()
        .serviceUrl("https://127.0.0.1:6650")
        .withToken(token)
        .build();
To combine token authentication with a custom CA certificate:
How it works:
- Token is created offline using danube-admin security tokens create
- Client sends the token as Authorization: Bearer <token> on every gRPC request
- Broker validates the token signature and expiration, then checks RBAC policies
- No server-side token exchange: the client never calls an authentication RPC
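Because the broker rejects expired tokens, it can help to inspect a token's claims on the client side before connecting. The sketch below decodes the payload segment of a JWT with the Python standard library. It assumes the token follows the usual three-segment JWT layout and carries the standard `exp` claim alongside `sub`. It does not verify the signature, so it is a diagnostic aid only and not a substitute for broker-side validation. `read_claims` and `seconds_until_expiry` are hypothetical helper names.

```python
import base64
import json
import time
from typing import Optional


def read_claims(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying its signature."""
    segments = token.split(".")
    if len(segments) != 3:
        raise ValueError("expected three dot-separated JWT segments")
    payload = segments[1]
    # JWT uses unpadded base64url; restore the padding before decoding.
    padded = payload + "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def seconds_until_expiry(token: str, now: Optional[float] = None) -> Optional[float]:
    """Return seconds left before `exp`, or None if the token has no `exp`."""
    exp = read_claims(token).get("exp")
    if exp is None:
        return None
    return exp - (time.time() if now is None else now)
```

A startup check might log a warning when `seconds_until_expiry` is small or negative, and print the `sub` claim to confirm which principal the RBAC bindings need to cover.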
Security best practices:
- Store tokens in environment variables or secret managers
- Never hardcode tokens in source code
- Use short-lived tokens with --ttl for production workloads
- Assign minimal RBAC permissions via roles and bindings
Token Rotation
For environments where tokens need to be refreshed at runtime (e.g., Kubernetes projected volumes, vault-injected secrets), use with_token_supplier. The supplier function is called on every gRPC request to get the current token:
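As a minimal sketch of such a supplier, the function below returns a callable that re-reads a token file whenever the file changes. It assumes that with_token_supplier accepts a zero-argument callable returning the token string; the exact signature is not shown in this guide, so check it against the client you use. The name `file_token_supplier` and any file path passed to it are hypothetical.

```python
from pathlib import Path
from typing import Callable


def file_token_supplier(path: str) -> Callable[[], str]:
    """Build a zero-argument callable that returns the current token.

    The file is re-read only when its modification time changes, so
    calling the supplier on every request stays cheap.
    """
    token_path = Path(path)
    cached_mtime = None
    cached_token = ""

    def supplier() -> str:
        nonlocal cached_mtime, cached_token
        mtime = token_path.stat().st_mtime_ns
        if mtime != cached_mtime:
            # File was rotated (or this is the first call): reload it.
            cached_token = token_path.read_text(encoding="utf-8").strip()
            cached_mtime = mtime
        return cached_token

    return supplier
```

Caching on modification time matters here because the supplier runs on every gRPC request; an uncached file read per request would add avoidable I/O to the hot path.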
Environment-Based Configuration
# Production
export DANUBE_URL=https://danube.example.com:6650
export DANUBE_CA_CERT=/etc/danube/certs/ca.pem
export DANUBE_TOKEN=$(danube-admin security tokens create \
  --subject my-app --secret-key your-secret-key)
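On the application side, these variables can be collected in one place before building a client. The following sketch reads the three variables exported above into a small settings object. `DanubeSettings` and `load_settings` are hypothetical names, and treating DANUBE_CA_CERT and DANUBE_TOKEN as optional is an assumption rather than documented behavior.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DanubeSettings:
    url: str
    ca_cert: Optional[str]
    token: Optional[str]


def load_settings(env: Mapping[str, str] = os.environ) -> DanubeSettings:
    """Read DANUBE_URL, DANUBE_CA_CERT and DANUBE_TOKEN from the environment."""
    url = env.get("DANUBE_URL")
    if not url:
        raise RuntimeError("DANUBE_URL environment variable not set")
    return DanubeSettings(
        url=url,
        # Empty strings are treated the same as unset variables.
        ca_cert=env.get("DANUBE_CA_CERT") or None,
        token=env.get("DANUBE_TOKEN") or None,
    )
```

Taking the environment as a parameter keeps the function easy to test with a plain dictionary, and failing fast on a missing URL surfaces misconfiguration at startup instead of at the first request.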
Next Steps
- Producer Guide - Start sending messages
- Consumer Guide - Start receiving messages
- Schema Registry - Add type safety with schemas
- Security Concepts - Full RBAC setup guide