Producer

A producer is a process that attaches to a topic and publishes messages to a Danube broker. The Danube broker processes the messages.

Access mode is a mechanism that determines the permissions of producers on topics.

  • Shared - Multiple producers can publish on a topic.
  • Exclusive - If there is already a producer connected, other producers trying to publish on this topic get errors immediately.
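
The two access modes can be illustrated with a small, self-contained model. This is only a sketch of the rule described above, not Danube's implementation: `Topic`, `AttachError`, and `attach` are hypothetical names, and the sketch assumes the access mode is a fixed property of the topic.

```rust
/// Access mode of a topic, matching the two modes listed above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AccessMode {
    Shared,
    Exclusive,
}

/// Hypothetical error for a rejected producer (not a Danube type).
#[derive(Debug, PartialEq)]
enum AttachError {
    ExclusiveProducerConnected,
}

/// Toy topic that tracks its access mode and the attached producers.
struct Topic {
    mode: AccessMode,
    producers: Vec<String>,
}

impl Topic {
    fn new(mode: AccessMode) -> Self {
        Topic { mode, producers: Vec::new() }
    }

    /// Attach a producer, enforcing the access mode.
    fn attach(&mut self, producer: &str) -> Result<(), AttachError> {
        // Exclusive: reject immediately if a producer is already connected.
        if self.mode == AccessMode::Exclusive && !self.producers.is_empty() {
            return Err(AttachError::ExclusiveProducerConnected);
        }
        // Shared, or first producer on an exclusive topic: accept.
        self.producers.push(producer.to_string());
        Ok(())
    }
}

fn main() {
    let mut shared = Topic::new(AccessMode::Shared);
    println!("{:?}", shared.attach("producer_a")); // Ok(())
    println!("{:?}", shared.attach("producer_b")); // Ok(())

    let mut exclusive = Topic::new(AccessMode::Exclusive);
    println!("{:?}", exclusive.attach("producer_a")); // Ok(())
    println!("{:?}", exclusive.attach("producer_b")); // Err(ExclusiveProducerConnected)
}
```

In the sketch, the second producer on the exclusive topic fails right away instead of waiting, which mirrors the "get errors immediately" behavior.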

Before an application creates a producer or consumer, the client library runs a setup phase with the following steps:

  • The client attempts to determine the owner of the topic by sending a Lookup request to a broker.
  • Once the client library has the broker address, it creates an RPC connection (or reuses an existing connection from the pool) and, in a later stage, authenticates it.
  • Within this connection, the clients (producer, consumer) and brokers exchange RPC commands. At this point, the client sends a command to create the producer or consumer, and the broker complies after running some validation checks.
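
The setup phase above can be sketched with plain standard-library types. This is an illustrative model only: `Client`, `Connection`, `lookup`, `connect`, and the broker address `broker-1` are hypothetical, the lookup is a local table rather than a real RPC, and authentication and broker-side validation are left out.

```rust
use std::collections::HashMap;

/// Stand-in for an RPC connection to a single broker.
struct Connection {
    broker_addr: String,
}

/// Toy client: a lookup table plus a connection pool.
struct Client {
    topic_owners: HashMap<String, String>, // topic -> owning broker (what Lookup would return)
    pool: HashMap<String, Connection>,     // broker address -> open connection
    connections_opened: usize,             // counts new connections, to show pool reuse
}

impl Client {
    fn new(topic_owners: HashMap<String, String>) -> Self {
        Client { topic_owners, pool: HashMap::new(), connections_opened: 0 }
    }

    /// Step 1: find the broker that owns the topic.
    fn lookup(&self, topic: &str) -> Option<String> {
        self.topic_owners.get(topic).cloned()
    }

    /// Step 2: reuse a pooled connection or open a new one.
    fn connect(&mut self, broker_addr: &str) -> &Connection {
        if !self.pool.contains_key(broker_addr) {
            self.connections_opened += 1;
            self.pool.insert(
                broker_addr.to_string(),
                Connection { broker_addr: broker_addr.to_string() },
            );
        }
        &self.pool[broker_addr]
    }

    /// Step 3: send the "create producer" command over the connection.
    /// A real broker would run its validation checks at this point.
    fn create_producer(&mut self, topic: &str, name: &str) -> Result<String, String> {
        let broker = self
            .lookup(topic)
            .ok_or_else(|| format!("no owner found for topic {}", topic))?;
        let conn = self.connect(&broker);
        Ok(format!("{} attached to {} via {}", name, topic, conn.broker_addr))
    }
}

fn main() {
    let mut owners = HashMap::new();
    owners.insert("/default/topic_a".to_string(), "broker-1".to_string());
    owners.insert("/default/topic_b".to_string(), "broker-1".to_string());

    let mut client = Client::new(owners);
    println!("{:?}", client.create_producer("/default/topic_a", "producer_a"));
    println!("{:?}", client.create_producer("/default/topic_b", "producer_b"));
    // Both topics live on the same broker, so only one connection is opened.
    println!("connections opened: {}", client.connections_opened);
}
```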

Create Producer

Rust:

let topic = "/default/topic_test";
let producer_name = "producer_test";

// Create the producer
let mut producer = danube_client
    .new_producer()
    .with_topic(topic)
    .with_name(producer_name)
    .with_schema("my_schema".into(), SchemaType::String)
    .build();

producer.create().await?;

producer
    .send("Hello Danube".as_bytes().into(), None)
    .await?;

Go:

topic := "/default/topic_test"
producerName := "producer_test"

producer, err := client.NewProducer(ctx).
    WithName(producerName).
    WithTopic(topic).
    WithSchema("test_schema", danube.SchemaType_STRING).
    Build()
if err != nil {
    log.Fatalf("unable to initialize the producer: %v", err)
}

if err := producer.Create(ctx); err != nil {
    log.Fatalf("Failed to create producer: %v", err)
}

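// i is a message counter defined by the surrounding code (for example, a loop index)
payload := fmt.Sprintf("Hello Danube %d", i)
// Convert string to bytes
bytesPayload := []byte(payload)

messageID, err := producer.Send(ctx, bytesPayload, nil)
if err != nil {
    log.Fatalf("Failed to send message: %v", err)
}
log.Printf("The message with id %v was sent", messageID)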

Create Producer with partitioned topic

Here we create a producer on a partitioned topic. A partitioned topic is implemented as N internal topics, where N is the number of partitions. When publishing to a partitioned topic, each message is routed to one of the partitions, which may be served by different brokers. The distribution of partitions across brokers is handled automatically.
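
To make the idea of N internal topics concrete, here is a minimal sketch of how a client could name the partitions and spread messages over them. Both the `-part-{i}` suffix and the round-robin policy are assumptions chosen for illustration; the actual naming and routing used by Danube may differ, and `partition_topics` and `RoundRobinRouter` are hypothetical helpers.

```rust
/// Build the names of the N internal topics behind a partitioned topic.
/// The "-part-{i}" suffix is an assumed naming scheme, used only for illustration.
fn partition_topics(topic: &str, partitions: usize) -> Vec<String> {
    (0..partitions)
        .map(|i| format!("{}-part-{}", topic, i))
        .collect()
}

/// Round-robin router: each call returns the index of the next partition.
struct RoundRobinRouter {
    partitions: usize,
    next: usize,
}

impl RoundRobinRouter {
    fn new(partitions: usize) -> Self {
        assert!(partitions > 0, "a partitioned topic needs at least one partition");
        RoundRobinRouter { partitions, next: 0 }
    }

    fn route(&mut self) -> usize {
        let chosen = self.next;
        // Wrap around after the last partition.
        self.next = (self.next + 1) % self.partitions;
        chosen
    }
}

fn main() {
    let topics = partition_topics("/default/topic_test", 3);
    let mut router = RoundRobinRouter::new(topics.len());

    // Five messages cycle through the three internal topics.
    for n in 0..5 {
        let idx = router.route();
        println!("message {} -> {}", n, topics[idx]);
    }
}
```

With three partitions, the five messages in the sketch go to partitions 0, 1, 2, 0, 1 in that order.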

Rust:

let topic = "/default/topic_test";
let producer_name = "producer_test";
let partitions = 3;

// Create the producer
let mut producer = danube_client
    .new_producer()
    .with_topic(topic)
    .with_name(producer_name)
    .with_schema("my_schema".into(), SchemaType::String)
    .with_partitions(partitions)
    .build();

producer.create().await?;

producer
    .send("Hello Danube".as_bytes().into(), None)
    .await?;

Go:

topic := "/default/topic_test"
producerName := "producer_test"

producer, err := client.NewProducer(ctx).
    WithName(producerName).
    WithTopic(topic).
    WithSchema("test_schema", danube.SchemaType_STRING).
    WithPartitions(3).
    Build()
if err != nil {
    log.Fatalf("unable to initialize the producer: %v", err)
}

if err := producer.Create(ctx); err != nil {
    log.Fatalf("Failed to create producer: %v", err)
}

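// i is a message counter defined by the surrounding code (for example, a loop index)
payload := fmt.Sprintf("Hello Danube %d", i)
// Convert string to bytes
bytesPayload := []byte(payload)

messageID, err := producer.Send(ctx, bytesPayload, nil)
if err != nil {
    log.Fatalf("Failed to send message: %v", err)
}
log.Printf("The message with id %v was sent", messageID)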

Create Producer with reliable dispatch topic

Here we create a producer on a topic with reliable dispatch. This strategy guarantees message delivery through a store-and-forward mechanism: when a message arrives, it is first stored in the chosen storage backend and only then dispatched to subscribers.
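
The store-and-forward idea can be sketched with an in-memory queue standing in for the storage backend. This is a toy model under stated assumptions, not the broker's implementation: `ReliableTopic`, `dispatch`, and `ack` are hypothetical names, and the sketch drops a message once it is acknowledged, whereas a retention policy such as `RetainUntilExpire` would keep it until a time limit is reached.

```rust
use std::collections::VecDeque;

/// Toy reliable topic: an in-memory queue plays the role of the storage backend.
struct ReliableTopic {
    stored: VecDeque<Vec<u8>>, // messages stored but not yet acknowledged
}

impl ReliableTopic {
    fn new() -> Self {
        ReliableTopic { stored: VecDeque::new() }
    }

    /// Store: the message is persisted before any dispatch happens.
    fn publish(&mut self, payload: &[u8]) {
        self.stored.push_back(payload.to_vec());
    }

    /// Forward: hand the oldest stored message to a subscriber.
    /// The message stays in storage until it is acknowledged.
    fn dispatch(&self) -> Option<&[u8]> {
        self.stored.front().map(|m| m.as_slice())
    }

    /// Acknowledge: the subscriber confirmed receipt, so the oldest message is dropped.
    fn ack(&mut self) {
        self.stored.pop_front();
    }
}

fn main() {
    let mut topic = ReliableTopic::new();
    topic.publish(b"first");
    topic.publish(b"second");

    // Without an acknowledgment, the same message is offered again.
    println!("{:?}", topic.dispatch().map(|m| String::from_utf8_lossy(m)));
    println!("{:?}", topic.dispatch().map(|m| String::from_utf8_lossy(m)));

    // After the acknowledgment, dispatch moves on to the next stored message.
    topic.ack();
    println!("{:?}", topic.dispatch().map(|m| String::from_utf8_lossy(m)));
}
```

Because the message is stored before it is forwarded, a subscriber that fails before acknowledging can receive it again, which is what makes the delivery reliable in this model.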

Rust:

use danube_client::{
    ConfigReliableOptions, ConfigRetentionPolicy, DanubeClient, SchemaType,
};

let topic = "/default/topic_test";
let producer_name = "producer_test";

let reliable_options =
    ConfigReliableOptions::new(5, ConfigRetentionPolicy::RetainUntilExpire, 3600);

// Create the producer
let mut producer = danube_client
    .new_producer()
    .with_topic(topic)
    .with_name(producer_name)
    .with_schema("my_schema".into(), SchemaType::String)
    .with_reliable_dispatch(reliable_options)
    .build();

producer.create().await?;

producer
    .send("Hello Danube".as_bytes().into(), None)
    .await?;

Go:

topic := "/default/topic_test"
producerName := "producer_test"

// For reliable strategy
reliableOpts := danube.NewReliableOptions(
    10, // 10MB segment size
    danube.RetainUntilExpire,
    3600, // retention period in seconds
)
reliableStrategy := danube.NewReliableDispatchStrategy(reliableOpts)

producer, err := client.NewProducer(ctx).
    WithName(producerName).
    WithTopic(topic).
    WithSchema("test_schema", danube.SchemaType_STRING).
    WithDispatchStrategy(reliableStrategy).
    Build()
if err != nil {
    log.Fatalf("unable to initialize the producer: %v", err)
}

if err := producer.Create(ctx); err != nil {
    log.Fatalf("Failed to create producer: %v", err)
}

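// i is a message counter defined by the surrounding code (for example, a loop index)
payload := fmt.Sprintf("Hello Danube %d", i)
// Convert string to bytes
bytesPayload := []byte(payload)

messageID, err := producer.Send(ctx, bytesPayload, nil)
if err != nil {
    log.Fatalf("Failed to send message: %v", err)
}
log.Printf("The message with id %v was sent", messageID)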

Complete examples

For complete code examples of using producers and consumers, check the links: