Skip to content

Consumer

A consumer is a process that attaches to a topic via a subscription and then receives messages.

Subscription Types - describe the way the consumers receive the messages from topics

  • Exclusive - Only one consumer can subscribe, guaranteeing message order.
  • Shared - Multiple consumers can subscribe, messages are delivered round-robin, offering good scalability but no order guarantee.
  • Failover - Similar to shared subscriptions, but multiple consumers can subscribe, and one actively receives messages.

Example

let topic = "/default/test_topic";

  // Create the Exclusive consumer
let mut consumer = danube_client
    .new_consumer()
    .with_topic(topic.to_string())
    .with_consumer_name(consumer_name.to_string())
    .with_subscription(format!("test_subscription_{}", consumer_name))
    .with_subscription_type(SubType::Exclusive)
    .build();

// Subscribe to the topic
consumer.subscribe().await?;

// Start receiving messages
let mut message_stream = consumer.receive().await?;

 if let Some(stream_message) = message_stream.recv().await {

    //process the message and ack for receive
    consumer.ack(&stream_message).await?

 }
ctx := context.Background()
topic := "/default/topic_test"
consumerName := "consumer_test"
subscriptionName := "subscription_test"
subType := danube.Exclusive

consumer, err := client.NewConsumer(ctx).
    WithConsumerName(consumerName).
    WithTopic(topic).
    WithSubscription(subscriptionName).
    WithSubscriptionType(subType).
    Build()
if err != nil {
    log.Fatalf("Failed to initialize the consumer: %v", err)
}

if err := consumer.Subscribe(ctx); err != nil {
    log.Fatalf("Failed to subscribe: %v", err)
}
log.Printf("The Consumer %s was created", consumerName)



// Receiving messages
stream, err := consumer.Receive(ctx)
if err != nil {
    log.Fatalf("Failed to receive messages: %v", err)
}

for msg := range stream {
    fmt.Printf("Received message: %+v\n", string(msg.GetPayload()))

    if _, err := consumer.Ack(ctx, msg); err != nil {
        log.Fatalf("Failed to acknowledge message: %v", err)
    }
}

Complete example

For complete code examples of using producers and consumers, check the links: