Key-Shared Subscriptions Guide
Key-Shared subscriptions route messages to consumers based on their routing key. All messages with the same key are guaranteed to be delivered to the same consumer, in order. This combines the parallelism of Shared subscriptions with per-key ordering guarantees.
For architecture details, see Key-Shared Dispatch Architecture.
When to Use Key-Shared
Key-Shared is the right choice when you need:
- Per-key ordering with multi-consumer parallelism: messages for the same key are processed sequentially, but different keys are processed in parallel
- Key-affinity processing: all events for a given entity (order, user, device) go to the same consumer
- Selective routing: specific consumers handle specific subsets of keys via filtering
Comparison with Other Subscription Types
| Requirement | Subscription Type |
|---|---|
| Total ordering, single consumer | Exclusive |
| Maximum throughput, no ordering | Shared |
| Ordering + high availability | Failover |
| Per-key ordering + parallelism | Key-Shared |
Producer: Sending with Routing Keys
Tag each message with a routing key using send_with_key(). Messages sent with the same key will always be delivered to the same consumer.
```rust
let mut producer = client
    .new_producer()
    .with_topic("/default/orders")
    .with_name("orders_producer")
    .with_reliable_dispatch()
    .build()?;

producer.create().await?;

// All "payment" messages go to the same consumer
producer.send_with_key(b"Payment for #1001".to_vec(), None, "payment").await?;

// "shipping" messages go to (potentially) a different consumer
producer.send_with_key(b"Order #1001 shipped".to_vec(), None, "shipping").await?;

// Same key = same consumer, guaranteed
producer.send_with_key(b"Payment for #1002".to_vec(), None, "payment").await?;
```
```go
producer, err := client.NewProducer().
	WithName("orders_producer").
	WithTopic("/default/orders").
	WithDispatchStrategy(danube.NewReliableDispatchStrategy()).
	Build()
if err != nil {
	// Handle the build error instead of leaving err unused
	log.Fatalf("failed to build producer: %v", err)
}

producer.Create(ctx)

// All "payment" messages go to the same consumer
producer.SendWithKey(ctx, []byte("Payment for #1001"), nil, "payment")

// "shipping" messages go to (potentially) a different consumer
producer.SendWithKey(ctx, []byte("Order #1001 shipped"), nil, "shipping")

// Same key = same consumer, guaranteed
producer.SendWithKey(ctx, []byte("Payment for #1002"), nil, "payment")
```
```python
producer = (
    client.new_producer()
    .with_topic("/default/orders")
    .with_name("orders_producer")
    .with_dispatch_strategy(DispatchStrategy.RELIABLE)
    .build()
)

await producer.create()

# All "payment" messages go to the same consumer
await producer.send_with_key(b"Payment for #1001", None, "payment")

# "shipping" messages go to (potentially) a different consumer
await producer.send_with_key(b"Order #1001 shipped", None, "shipping")

# Same key = same consumer, guaranteed
await producer.send_with_key(b"Payment for #1002", None, "payment")
```
```java
Producer producer = client.newProducer()
        .withTopic("/default/orders")
        .withName("orders_producer")
        .withDispatchStrategy(DispatchStrategy.RELIABLE)
        .build();

producer.create();

// All "payment" messages go to the same consumer
producer.sendWithKey("Payment for #1001".getBytes(), Map.of(), "payment");

// "shipping" messages go to (potentially) a different consumer
producer.sendWithKey("Order #1001 shipped".getBytes(), Map.of(), "shipping");

// Same key = same consumer, guaranteed
producer.sendWithKey("Payment for #1002".getBytes(), Map.of(), "payment");
```
Key points:
- send_with_key(payload, attributes, routing_key) tags the message with a routing key
- Messages with the same routing key always go to the same consumer
- If you use send() instead of send_with_key(), the producer name is used as the fallback routing key
- Routing keys can be any string: order IDs, user IDs, device IDs, event types, etc.
Consumer: Automatic Key Distribution
The simplest Key-Shared consumer uses automatic key distribution via consistent hashing. The broker assigns keys to consumers automatically; no configuration is needed.
```rust
let mut consumer = client
    .new_consumer()
    .with_topic("/default/orders")
    .with_consumer_name("worker_1")
    .with_subscription("orders_sub")
    .with_subscription_type(SubType::KeyShared)
    .build()?;

consumer.subscribe().await?;
let mut stream = consumer.receive().await?;

while let Some(message) = stream.recv().await {
    let key = message.routing_key.as_deref().unwrap_or("<none>");
    let payload = String::from_utf8_lossy(&message.payload);
    println!("📥 key={:<10} | '{}'", key, payload);
    consumer.ack(&message).await?;
}
```
```go
consumer, err := client.NewConsumer().
	WithConsumerName("worker_1").
	WithTopic("/default/orders").
	WithSubscription("orders_sub").
	WithSubscriptionType(danube.KeyShared).
	Build()
if err != nil {
	// Handle the build error instead of leaving err unused
	log.Fatalf("failed to build consumer: %v", err)
}

consumer.Subscribe(ctx)
stream, _ := consumer.Receive(ctx)

for msg := range stream {
	fmt.Printf("📥 key=%s payload=%s\n",
		msg.GetRoutingKey(), string(msg.GetPayload()))
	consumer.Ack(ctx, msg)
}
```
```python
consumer = (
    client.new_consumer()
    .with_topic("/default/orders")
    .with_consumer_name("worker_1")
    .with_subscription("orders_sub")
    .with_subscription_type(SubType.KEY_SHARED)
    .build()
)

await consumer.subscribe()
queue = await consumer.receive()

while True:
    message = await queue.get()
    key = message.routing_key if message.HasField("routing_key") else ""
    print(f"📥 key={key} payload={message.payload.decode()}")
    await consumer.ack(message)
```
```java
Consumer consumer = client.newConsumer()
        .withTopic("/default/orders")
        .withConsumerName("worker_1")
        .withSubscription("orders_sub")
        .withSubscriptionType(SubType.KEY_SHARED)
        .build();

consumer.subscribe();

consumer.receive().subscribe(new Flow.Subscriber<>() {
    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }

    @Override
    public void onNext(StreamMessage msg) {
        String key = msg.routingKey() != null ? msg.routingKey() : "";
        System.out.printf("📥 key=%s payload=%s%n", key, new String(msg.payload()));
        consumer.ack(msg);
    }

    @Override public void onError(Throwable t) {}
    @Override public void onComplete() {}
});
```
Running Multiple Consumers
Start multiple consumers with the same subscription to see key distribution:
Expected output:
Each consumer receives a distinct subset of routing keys:
```
# consumer_1 output
📥 key=shipping   | 'Order #1001 shipped via express'
📥 key=shipping   | 'Order #1002 shipped via standard'

# consumer_2 output
📥 key=payment    | 'Payment received for order #1001'
📥 key=payment    | 'Payment received for order #1002'
📥 key=invoice    | 'Invoice generated for order #1001'
```
The exact key-to-consumer assignment depends on consistent hashing, but it is deterministic: re-running produces the same distribution.
Consumer: Key Filtering
Key filtering gives consumers explicit control over which keys they handle. Each consumer declares glob patterns and only receives messages whose routing key matches.
```rust
// This consumer only receives "payment" and "invoice" messages
let mut consumer = client
    .new_consumer()
    .with_topic("/default/orders")
    .with_consumer_name("payments_worker")
    .with_subscription("orders_filtered")
    .with_subscription_type(SubType::KeyShared)
    .with_key_filter("payment")
    .with_key_filter("invoice")
    .build()?;

consumer.subscribe().await?;
let mut stream = consumer.receive().await?;

while let Some(message) = stream.recv().await {
    let key = message.routing_key.as_deref().unwrap_or("<none>");
    println!("📥 key={:<10} | '{}'", key, String::from_utf8_lossy(&message.payload));
    consumer.ack(&message).await?;
}
```
```go
// This consumer only receives "payment" and "invoice" messages
consumer, err := client.NewConsumer().
	WithConsumerName("payments_worker").
	WithTopic("/default/orders").
	WithSubscription("orders_filtered").
	WithSubscriptionType(danube.KeyShared).
	WithKeyFilter("payment").
	WithKeyFilter("invoice").
	Build()
if err != nil {
	// Handle the build error instead of leaving err unused
	log.Fatalf("failed to build consumer: %v", err)
}

consumer.Subscribe(ctx)
stream, _ := consumer.Receive(ctx)

for msg := range stream {
	fmt.Printf("📥 key=%s payload=%s\n",
		msg.GetRoutingKey(), string(msg.GetPayload()))
	consumer.Ack(ctx, msg)
}
```
```python
# This consumer only receives "payment" and "invoice" messages
consumer = (
    client.new_consumer()
    .with_topic("/default/orders")
    .with_consumer_name("payments_worker")
    .with_subscription("orders_filtered")
    .with_subscription_type(SubType.KEY_SHARED)
    .with_key_filter("payment")
    .with_key_filter("invoice")
    .build()
)

await consumer.subscribe()
queue = await consumer.receive()

while True:
    message = await queue.get()
    key = message.routing_key if message.HasField("routing_key") else ""
    print(f"📥 key={key} payload={message.payload.decode()}")
    await consumer.ack(message)
```
```java
// This consumer only receives "payment" and "invoice" messages
Consumer consumer = client.newConsumer()
        .withTopic("/default/orders")
        .withConsumerName("payments_worker")
        .withSubscription("orders_filtered")
        .withSubscriptionType(SubType.KEY_SHARED)
        .withKeyFilter("payment")
        .withKeyFilter("invoice")
        .build();

consumer.subscribe();

consumer.receive().subscribe(new Flow.Subscriber<>() {
    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }

    @Override
    public void onNext(StreamMessage msg) {
        String key = msg.routingKey() != null ? msg.routingKey() : "";
        System.out.printf("📥 key=%s payload=%s%n", key, new String(msg.payload()));
        consumer.ack(msg);
    }

    @Override public void onError(Throwable t) {}
    @Override public void onComplete() {}
});
```
Filter Patterns
Filters use glob syntax:
| Pattern | Matches |
|---|---|
| "payment" | Exact match only |
| "ship*" | "shipping", "shipment", etc. |
| "eu-west-?" | "eu-west-1", "eu-west-2", etc. |
| "*" | Everything (same as no filter) |
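To get a feel for how these patterns behave, the matching rules can be approximated locally. The sketch below uses Python's standard fnmatch module as a stand-in for the broker's matcher; that substitution is an assumption, since fnmatch also understands [seq] character classes and the broker's glob dialect may differ. The helper name matches_any is hypothetical.

```python
from fnmatch import fnmatchcase
from typing import Iterable


def matches_any(routing_key: str, patterns: Iterable[str]) -> bool:
    """Return True if the routing key matches at least one glob pattern.

    Assumption: fnmatchcase approximates the broker's glob rules for
    '*' (any run of characters) and '?' (exactly one character).
    """
    return any(fnmatchcase(routing_key, pattern) for pattern in patterns)


# The patterns from the table, checked against sample routing keys
checks = {
    ("payment", "payment"): matches_any("payment", ["payment"]),
    ("payments", "payment"): matches_any("payments", ["payment"]),
    ("shipment", "ship*"): matches_any("shipment", ["ship*"]),
    ("eu-west-1", "eu-west-?"): matches_any("eu-west-1", ["eu-west-?"]),
    ("eu-west-10", "eu-west-?"): matches_any("eu-west-10", ["eu-west-?"]),
}
for (key, pattern), matched in checks.items():
    print(f"{key!r} vs {pattern!r}: {matched}")
```

Note that "?" consumes exactly one character, so under these rules "eu-west-?" would not match a two-digit suffix such as "eu-west-10".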
Multiple Filters
You can set multiple filters at once or chain individual calls:
Expected Filtered Output
```
# payments consumer (filters: "payment", "invoice")
📥 key=payment    | 'Payment received for order #1001'
📥 key=payment    | 'Payment received for order #1002'
📥 key=invoice    | 'Invoice generated for order #1001'

# logistics consumer (filters: "ship*", "return")
📥 key=shipping   | 'Order #1001 shipped via express'
📥 key=return     | 'Return request for order #998'
📥 key=shipping   | 'Order #1002 shipped via standard'
```
Design Behavior
Per-Key Ordering
At most one message per routing key is in-flight at any time. If a second message arrives for a key that already has an unacknowledged message, the second message is queued until the first is acked. This guarantees per-key ordering even with multiple keys being processed in parallel.
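The one-in-flight-per-key rule can be modelled in a few lines. This is a toy illustration of the behavior described above, not the broker's implementation; the class PerKeyDispatcher and its methods are hypothetical names.

```python
from collections import defaultdict, deque


class PerKeyDispatcher:
    """Toy model: at most one unacknowledged message per routing key."""

    def __init__(self):
        self.in_flight = {}                # key -> message awaiting ack
        self.pending = defaultdict(deque)  # key -> messages queued behind it
        self.delivered = []                # (key, message) in dispatch order

    def publish(self, key, message):
        if key in self.in_flight:
            # The key already has an unacked message: queue behind it
            self.pending[key].append(message)
        else:
            self._deliver(key, message)

    def ack(self, key):
        # Acking releases the key and dispatches the next queued message
        del self.in_flight[key]
        if self.pending[key]:
            self._deliver(key, self.pending[key].popleft())

    def _deliver(self, key, message):
        self.in_flight[key] = message
        self.delivered.append((key, message))


dispatcher = PerKeyDispatcher()
dispatcher.publish("payment", "p1")
dispatcher.publish("payment", "p2")   # held back until p1 is acked
dispatcher.publish("shipping", "s1")  # different key, dispatched immediately
dispatcher.ack("payment")             # releases p2
print(dispatcher.delivered)
```

In this model the second payment message is only dispatched after the first is acked, while the shipping message is not blocked by either.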
Consumer Elasticity
When consumers join or leave, key-to-consumer mappings are rebalanced using consistent hashing. Only approximately 1/N of existing key assignments are remapped (where N is the number of consumers). The rest remain stable.
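The roughly 1/N remapping can be checked with a small simulation. The sketch below builds a generic hash ring with virtual nodes; the hash function (SHA-256), the number of virtual nodes, and the HashRing class are all assumptions made for illustration, and the broker's actual ring parameters may differ.

```python
import bisect
import hashlib


def _hash(value: str) -> int:
    """Stable 64-bit hash derived from SHA-256 (illustrative choice)."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")


class HashRing:
    """Generic consistent-hash ring with virtual nodes per consumer."""

    def __init__(self, consumers, vnodes=100):
        # Each consumer owns `vnodes` points on the ring
        self._ring = sorted(
            (_hash(f"{consumer}#{i}"), consumer)
            for consumer in consumers
            for i in range(vnodes)
        )
        self._points = [point for point, _ in self._ring]

    def consumer_for(self, key: str) -> str:
        # A key belongs to the first ring point clockwise from its hash
        idx = bisect.bisect_right(self._points, _hash(key)) % len(self._ring)
        return self._ring[idx][1]


keys = [f"order-{i}" for i in range(10_000)]
before = HashRing(["worker_1", "worker_2", "worker_3"])
after = HashRing(["worker_1", "worker_2", "worker_3", "worker_4"])

moved = [k for k in keys if before.consumer_for(k) != after.consumer_for(k)]
print(f"{len(moved) / len(keys):.1%} of keys moved after adding worker_4")
```

In this model every key that moves lands on the newly added consumer, and keys that stay keep their original owner, which is the stability property the paragraph above describes.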
Unmatched Keys
If a message's routing key matches no consumer's filters, the message is skipped by this subscription. The offset is treated as implicitly acked and the cursor advances past it. The message still remains in the topic and is available for delivery to other subscriptions.
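The cursor behavior for unmatched keys can be sketched as follows. This is a simplified model under several assumptions: fnmatch stands in for the broker's glob matcher, matched messages are treated as acked immediately, and when several consumers match, the first one wins (the real dispatcher would choose among them differently). The function dispatch and its arguments are hypothetical.

```python
from fnmatch import fnmatchcase


def dispatch(log, filters_by_consumer):
    """Walk a topic log once.

    Returns (deliveries, skipped_offsets, cursor), where cursor is the
    next offset this subscription would read.
    """
    deliveries, skipped, cursor = [], [], 0
    for offset, (key, payload) in enumerate(log):
        target = next(
            (
                consumer
                for consumer, patterns in filters_by_consumer.items()
                if any(fnmatchcase(key, pattern) for pattern in patterns)
            ),
            None,
        )
        if target is None:
            skipped.append(offset)  # no filter matches: implicitly acked
        else:
            deliveries.append((target, offset, payload))
        cursor = offset + 1  # the cursor advances in both cases
    return deliveries, skipped, cursor


log = [("payment", "a"), ("audit", "b"), ("shipping", "c")]
filters = {
    "payments_worker": ["payment", "invoice"],
    "logistics_worker": ["ship*", "return"],
}
deliveries, skipped, cursor = dispatch(log, filters)
print(deliveries, skipped, cursor)
```

The "audit" message at offset 1 is skipped for this subscription only; the log itself is unchanged, so another subscription reading the same topic would still see it.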
Acknowledgment and Retry
Key-Shared supports the same ack/nack semantics as other reliable subscription types:
- Ack: confirms successful processing, frees the key lock, advances the cursor
- Nack: rejects the message with an optional delay and reason, triggers redelivery according to the failure policy
For failure policy configuration, see Dispatch Strategy - Failure Handling.
Use Cases
Order Processing
Route all events for the same order to the same consumer:
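One way to do this is to derive the routing key from the order ID. The sketch below is a minimal illustration: the helper order_routing_key and the "order-<id>" key format are hypothetical, and the send call in the comment is the send_with_key form from the Python producer example earlier in this guide.

```python
def order_routing_key(order_id: int) -> str:
    """Hypothetical helper: one routing key per order."""
    return f"order-{order_id}"


events = [
    (1001, "created"),
    (1002, "created"),
    (1001, "paid"),
    (1001, "shipped"),
]

# Pair each payload with the key it would be sent under
keyed = [
    (order_routing_key(order_id), f"Order #{order_id} {what}".encode())
    for order_id, what in events
]

# With the Python producer shown earlier, each pair would be sent as:
#     await producer.send_with_key(payload, None, key)
for key, payload in keyed:
    print(key, payload)
```

All three events for order 1001 share one key, so they would reach a single consumer in the order they were sent, while order 1002 can be processed in parallel elsewhere.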
Multi-Tenant Workloads
Dedicate consumers to specific tenants using key filtering:
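A possible key scheme is to prefix every routing key with the tenant name and give each consumer a matching glob filter. The helpers tenant_key and tenant_filter and the "tenant.entity" format are hypothetical; fnmatch is used only to check the patterns locally and is assumed to agree with the broker's handling of "*".

```python
from fnmatch import fnmatchcase


def tenant_key(tenant: str, entity_id: str) -> str:
    """Hypothetical key format: '<tenant>.<entity>'."""
    return f"{tenant}.{entity_id}"


def tenant_filter(tenant: str) -> str:
    """Glob pattern selecting every key that belongs to one tenant."""
    return f"{tenant}.*"


# With the Python consumer builder shown earlier, a dedicated consumer
# would add the filter as:
#     .with_key_filter(tenant_filter("acme"))
acme_filter = tenant_filter("acme")
for key in (tenant_key("acme", "order-1"), tenant_key("acme2", "order-1")):
    print(key, fnmatchcase(key, acme_filter))
```

Including the separator in the pattern keeps tenants with a shared prefix apart: "acme.*" does not match keys that belong to "acme2".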
IoT Device Routing
Route device telemetry by device ID for per-device state tracking:
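Because every message for a device reaches the same consumer, that consumer can keep per-device state in local memory. The sketch below shows only the state-tracking side and assumes the device ID is used as the routing key; DeviceState and apply_reading are hypothetical names, and the readings are made-up sample values.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class DeviceState:
    readings: int = 0
    last_value: Optional[float] = None


def apply_reading(states: Dict[str, DeviceState], device_id: str, value: float) -> None:
    """Update the in-memory state for one device (keyed by routing key)."""
    state = states.setdefault(device_id, DeviceState())
    state.readings += 1
    state.last_value = value


# In a consumer loop, device_id would come from message.routing_key
states: Dict[str, DeviceState] = {}
for device_id, value in [("sensor-7", 21.5), ("sensor-9", 5.0), ("sensor-7", 22.0)]:
    apply_reading(states, device_id, value)
print(states)
```

This local state is lost if the consumer stops, and keys may move to another consumer on rebalance, so state that must survive either event needs to be stored externally.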
Full Examples
For complete runnable examples, see the client repositories: