Skip to content

Client Setup and Configuration

This guide covers how to configure and connect Danube clients to your broker.

Basic Connection

Connect to a Danube broker using a gRPC endpoint:

use danube_client::DanubeClient;

/// Minimal example: build a Danube client for a broker reached over a plain
/// (non-TLS) `http://` endpoint.
///
/// # Errors
/// Propagates any error produced while building the client.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `build()` is awaited and its result unwrapped with `?`, so a failure
    // ends `main` with that error.
    let client = DanubeClient::builder()
        .service_url("http://127.0.0.1:6650")
        .build()
        .await?;

    // `client` is not used further in this minimal example.
    Ok(())
}
import (
    "log"

    "github.com/danube-messaging/danube-go"
)

// main builds a Danube client for the broker at 127.0.0.1:6650 and exits
// with a logged error if construction fails.
func main() {
    client, err := danube.NewClient().ServiceURL("127.0.0.1:6650").Build()
    if err != nil {
        log.Fatalf("failed to create client: %v", err)
    }
    // Blank assignment keeps the example compiling while client is otherwise unused.
    _ = client
}
import asyncio
from danube import DanubeClientBuilder

async def main() -> None:
    """Build a Danube client for the broker at http://127.0.0.1:6650."""
    # The builder chain is parenthesised so the result of build() is what gets awaited.
    client = await (
        DanubeClientBuilder()
        .service_url("http://127.0.0.1:6650")
        .build()
    )

asyncio.run(main())
import com.danubemessaging.client.DanubeClient;

/**
 * Minimal example: builds a Danube client for a broker reached over a plain
 * (non-TLS) {@code http://} endpoint and closes it when done.
 */
public class Main {
    /**
     * Builds the client, leaves room for application code, and always closes the client.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building, using, or closing the client fails
     */
    public static void main(String[] args) throws Exception {
        DanubeClient client = DanubeClient.builder()
                .serviceUrl("http://127.0.0.1:6650")
                .build();

        try {
            // use client ...
        } finally {
            // Close in finally so the client is released even if the code above throws.
            client.close();
        }
    }
}

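Endpoint format: http://host:port for plaintext connections, or https://host:port when TLS is enabled. (The Go examples on this page pass a bare host:port without a scheme.)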


TLS Configuration

For secure production environments, enable TLS encryption:

use danube_client::DanubeClient;
use rustls::crypto;
use tokio::sync::OnceCell;

/// Ensures the rustls crypto provider installation below runs at most once.
static CRYPTO_PROVIDER: OnceCell<()> = OnceCell::const_new();

/// Example: connect to a TLS-enabled broker, trusting the CA certificate at
/// `./certs/ca-cert.pem`.
///
/// # Errors
/// Propagates errors from `with_tls` and from building the client.
///
/// # Panics
/// Panics if the default `CryptoProvider` cannot be installed.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize crypto provider (required once)
    CRYPTO_PROVIDER.get_or_init(|| async {
        // Use the `ring`-backed provider as the default.
        let crypto_provider = crypto::ring::default_provider();
        crypto_provider
            .install_default()
            .expect("Failed to install default CryptoProvider");
    })
    .await;

    // `https://` scheme plus a CA certificate (PEM); `with_tls` is fallible, hence the `?`.
    let client = DanubeClient::builder()
        .service_url("https://127.0.0.1:6650")
        .with_tls("./certs/ca-cert.pem")?
        .build()
        .await?;

    // `client` is not used further in this minimal example.
    Ok(())
}
import (
    "log"

    "github.com/danube-messaging/danube-go"
)

// main builds a Danube client that connects over TLS, trusting the CA
// certificate at ./certs/ca-cert.pem.
func main() {
    // TLS with custom CA certificate
    // WithTLS returns an error alongside the builder, so it is checked before Build.
    builder, err := danube.NewClient().
        ServiceURL("127.0.0.1:6650").
        WithTLS("./certs/ca-cert.pem")
    if err != nil {
        log.Fatalf("failed to configure TLS: %v", err)
    }

    client, err := builder.Build()
    if err != nil {
        log.Fatalf("failed to create client: %v", err)
    }
    // Blank assignment keeps the example compiling while client is otherwise unused.
    _ = client
}

For mutual TLS (mTLS) with client certificates:

// mTLS with CA, client cert, and client key
// Fragment: meant to be placed inside a function body (it uses := and log).
// Arguments are passed in the order CA certificate, client certificate, client key.
builder, err := danube.NewClient().
    ServiceURL("127.0.0.1:6650").
    WithMTLS("./certs/ca-cert.pem", "./certs/client-cert.pem", "./certs/client-key.pem")
if err != nil {
    log.Fatalf("failed to configure mTLS: %v", err)
}

// Build is a separate step with its own error check.
client, err := builder.Build()
if err != nil {
    log.Fatalf("failed to create client: %v", err)
}
import asyncio
from danube import DanubeClientBuilder

async def main() -> None:
    """Build a Danube client that connects over TLS using a custom CA certificate."""
    # TLS with custom CA certificate
    # The URL uses the https:// scheme, which TLS connections require.
    client = await (
        DanubeClientBuilder()
        .service_url("https://127.0.0.1:6650")
        .with_tls("./certs/ca-cert.pem")
        .build()
    )

asyncio.run(main())

For mutual TLS (mTLS) with client certificates:

# mTLS with CA, client cert, and client key
# Fragment: uses await, so it must run inside an async function.
# Arguments are passed in the order CA certificate, client certificate, client key.
client = await (
    DanubeClientBuilder()
    .service_url("https://127.0.0.1:6650")
    .with_mtls(
        "./certs/ca-cert.pem",
        "./certs/client-cert.pem",
        "./certs/client-key.pem",
    )
    .build()
)
import com.danubemessaging.client.DanubeClient;
import java.nio.file.Path;

// TLS with custom CA certificate
// Fragment: meant to be placed inside a method body.
// The certificate location is passed as a java.nio.file.Path, not a String.
DanubeClient client = DanubeClient.builder()
        .serviceUrl("https://127.0.0.1:6650")
        .withTls(Path.of("./certs/ca-cert.pem"))
        .build();

For mutual TLS (mTLS) with client certificates:

// mTLS with CA, client cert, and client key
// Fragment: meant to be placed inside a method body.
// Arguments are passed in the order CA certificate, client certificate, client key.
DanubeClient client = DanubeClient.builder()
        .serviceUrl("https://127.0.0.1:6650")
        .withMutualTls(
                Path.of("./certs/ca-cert.pem"),
                Path.of("./certs/client-cert.pem"),
                Path.of("./certs/client-key.pem"))
        .build();

Requirements:

  • CA certificate file (PEM format)
  • HTTPS URL (https:// instead of http://)
  • Broker must be configured with TLS enabled

Certificate paths:

  • Relative: ./certs/ca-cert.pem
  • Absolute: /etc/danube/certs/ca-cert.pem

JWT Authentication

For authenticated environments, use API keys to obtain JWT tokens:

use danube_client::DanubeClient;
use rustls::crypto;
use tokio::sync::OnceCell;

/// Ensures the rustls crypto provider installation below runs at most once.
static CRYPTO_PROVIDER: OnceCell<()> = OnceCell::const_new();

/// Example: connect over TLS and authenticate with an API key read from the
/// `DANUBE_API_KEY` environment variable.
///
/// # Errors
/// Returns an error if `DANUBE_API_KEY` is unset, not valid Unicode, or blank,
/// if `with_tls` fails, or if the client cannot be built.
///
/// # Panics
/// Panics if the default `CryptoProvider` cannot be installed.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    CRYPTO_PROVIDER.get_or_init(|| async {
        // Use the `ring`-backed provider as the default.
        let crypto_provider = crypto::ring::default_provider();
        crypto_provider
            .install_default()
            .expect("Failed to install default CryptoProvider");
    })
    .await;

    // Report a missing key through `main`'s `Result` instead of panicking, and
    // treat a blank value as unset, matching the Go and Java examples.
    let api_key = std::env::var("DANUBE_API_KEY")
        .ok()
        .filter(|key| !key.trim().is_empty())
        .ok_or("DANUBE_API_KEY environment variable not set")?;

    // Custom CA certificate for TLS, plus the API key for authentication.
    let client = DanubeClient::builder()
        .service_url("https://127.0.0.1:6650")
        .with_tls("./certs/ca-cert.pem")?
        .with_api_key(api_key)
        .build()
        .await?;

    // `client` is not used further in this minimal example.
    Ok(())
}
import (
    "log"
    "os"

    "github.com/danube-messaging/danube-go"
)

// main reads the API key from the DANUBE_API_KEY environment variable and
// builds a Danube client authenticated with it.
func main() {
    // os.Getenv returns "" for an unset variable, so empty and unset are handled alike.
    apiKey := os.Getenv("DANUBE_API_KEY")
    if apiKey == "" {
        log.Fatal("DANUBE_API_KEY environment variable not set")
    }

    // WithAPIKey automatically enables TLS with system CA roots
    client, err := danube.NewClient().
        ServiceURL("127.0.0.1:6650").
        WithAPIKey(apiKey).
        Build()
    if err != nil {
        log.Fatalf("failed to create client: %v", err)
    }
    // Blank assignment keeps the example compiling while client is otherwise unused.
    _ = client
}

To combine API key authentication with a custom CA certificate:

// Fragment: meant to be placed inside a function body where apiKey is already
// defined (for example, read from DANUBE_API_KEY).
// WithTLS returns an error alongside the builder, so it is checked before continuing.
builder, err := danube.NewClient().
    ServiceURL("127.0.0.1:6650").
    WithTLS("./certs/ca-cert.pem")
if err != nil {
    log.Fatalf("failed to configure TLS: %v", err)
}

// The API key is added to the TLS-configured builder before building.
client, err := builder.WithAPIKey(apiKey).Build()
if err != nil {
    log.Fatalf("failed to create client: %v", err)
}
import asyncio
import os
from danube import DanubeClientBuilder

async def main() -> None:
    """Build a Danube client authenticated with an API key.

    The key is read from the DANUBE_API_KEY environment variable.

    Raises:
        RuntimeError: if DANUBE_API_KEY is unset or empty.
    """
    # Fail with an explicit message instead of a bare KeyError; an empty value
    # counts as unset, matching the Go and Java examples.
    api_key = os.environ.get("DANUBE_API_KEY")
    if not api_key:
        raise RuntimeError("DANUBE_API_KEY environment variable not set")

    # with_api_key automatically enables TLS with system CA roots
    client = await (
        DanubeClientBuilder()
        .service_url("https://127.0.0.1:6650")
        .with_api_key(api_key)
        .build()
    )

asyncio.run(main())

To combine API key authentication with a custom CA certificate:

# Fragment: uses await, so it must run inside an async function where
# api_key is already defined (for example, read from DANUBE_API_KEY).
client = await (
    DanubeClientBuilder()
    .service_url("https://127.0.0.1:6650")
    .with_tls("./certs/ca-cert.pem")
    .with_api_key(api_key)
    .build()
)
import com.danubemessaging.client.DanubeClient;

// Fragment: meant to be placed inside a method body.
// System.getenv returns null for an unset variable; blank values are rejected too.
String apiKey = System.getenv("DANUBE_API_KEY");
if (apiKey == null || apiKey.isBlank()) {
    throw new IllegalStateException("DANUBE_API_KEY environment variable not set");
}

// withApiKey automatically enables TLS and exchanges the key for a JWT token
DanubeClient client = DanubeClient.builder()
        .serviceUrl("https://127.0.0.1:6650")
        .withApiKey(apiKey)
        .build();

To combine API key authentication with a custom CA certificate:

import java.nio.file.Path;

// Fragment: meant to be placed inside a method body where apiKey is already
// defined (for example, read from DANUBE_API_KEY).
DanubeClient client = DanubeClient.builder()
        .serviceUrl("https://127.0.0.1:6650")
        .withTls(Path.of("./certs/ca-cert.pem"))
        .withApiKey(apiKey)
        .build();

How it works:

  1. The client exchanges the API key for a JWT token on the first request
  2. The token is cached and automatically renewed when it expires
  3. The token is included in the Authorization header of all requests
  4. The default token lifetime is 1 hour

Security best practices:

  • Store API keys in environment variables
  • Never hardcode API keys in source code
  • Use different API keys per environment (dev/staging/prod)
  • Rotate API keys regularly

Environment-Based Configuration

# Production
# Broker endpoint; the https:// scheme is used because TLS is enabled.
export DANUBE_URL=https://danube.example.com:6650
# Path to the CA certificate file (PEM format).
export DANUBE_CA_CERT=/etc/danube/certs/ca.pem
# API key read by the JWT authentication examples; replace the placeholder with a real key.
export DANUBE_API_KEY=your-secret-api-key

Next Steps