📡 Orleans Streams: Reactive Messaging for Distributed Grains


Microsoft Orleans is best known for its virtual actor model, where 🌾 Grains represent independent units of state and behavior.
But when your app needs to react to real-time data, such as stock price updates, chat messages 💬, IoT sensor readings 🌡️, or live game events 🎮, you need something more dynamic.

That's where Orleans Streams come in.


⚙️ What Are Orleans Streams?

Think of Orleans Streams as event pipelines that connect grains, clients, and external systems.
They provide a pub/sub model: producers publish messages to a stream and consumers subscribe to it, without either side knowing about the other.


💡 Why Streams Matter

Without streams, grains would need to constantly poll each other or send direct messages, tightly coupling components and creating unnecessary complexity.

With streams:

  • Grains publish events to named channels.
  • Other grains (or clients) subscribe to those events.
  • Orleans routes each event to the stream's subscribers; delivery and ordering guarantees depend on the configured stream provider.

Streams are:

  • Reactive
    • Grains react to events instead of constantly polling for updates.
    • The system pushes messages to subscribers whenever something happens.
    • Example: A ChatRoomGrain pushes new messages to subscribed UserGrains automatically, so users never have to poll.
    • Benefits: reduces latency and CPU usage, and simplifies logic.
  • Asynchronous
    • Sending and receiving messages happens in the background.
    • With a queue-backed provider, such as the in-memory streams used in the example that follows, publishers don't wait for consumers to process the message.
    • Example: The publisher calls OnNextAsync on the stream, and each subscriber processes the event independently.
    • Benefits: non-blocking and highly scalable; many grains can handle thousands of messages concurrently.
  • Location-Transparent
    • Grains can publish and subscribe without knowing each other's physical location in the cluster.
    • Example: A SensorGrain on one node can send data to a DashboardGrain on another node; Orleans takes care of routing.
    • Benefits: simplifies distributed system design and allows grains to migrate, restart, or scale without breaking event delivery.

🚀 Getting Started with Streams

Let's walk through a simple example: imagine a chat room app.

Each chat room has a unique stream ID, and users (grains) subscribe to it to receive messages.
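Publisher and subscriber only need to agree on the stream's identity: the provider name, a namespace, and a key. As a minimal sketch, assuming Orleans 7 or later, a small helper can keep those values in one place. The `ChatStreams` class and its members are hypothetical names introduced here for illustration; they are not part of Orleans.

```csharp
using System;
using Orleans.Runtime;
using Orleans.Streams;

// Hypothetical helper, not part of Orleans: centralizes the stream identity
// that publishers and subscribers of a chat room must share.
public static class ChatStreams
{
    public const string ProviderName = "chat-streams";
    public const string Namespace = "ChatNamespace";

    // A stream is identified by a namespace plus a key (here the chat room's Guid).
    public static StreamId RoomStreamId(Guid chatId) => StreamId.Create(Namespace, chatId);

    // Looks up the room's stream on an already-resolved stream provider.
    public static IAsyncStream<string> GetRoomStream(IStreamProvider provider, Guid chatId) =>
        provider.GetStream<string>(RoomStreamId(chatId));
}
```

A grain could then call `ChatStreams.GetRoomStream(provider, chatId)` rather than repeating the string literals in every class.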

1️⃣ ChatRoom Grain - Stream publisher 💬

A ChatRoomGrain might look like this:

using Orleans;
using Orleans.Streams;

public interface IChatRoomGrain : IGrainWithGuidKey
{
    Task SendMessageAsync(string message);
}

public class ChatRoomGrain : Grain, IChatRoomGrain
{
    private readonly IClusterClient _clusterClient;
    private IAsyncStream<string>? _stream;

    public ChatRoomGrain(IClusterClient clusterClient)
    {
        _clusterClient = clusterClient;
    }

    public override Task OnActivateAsync(CancellationToken cancellationToken)
    {
        // Resolve the provider registered as "chat-streams" in the silo configuration.
        var streamProvider = _clusterClient.GetStreamProvider("chat-streams");

        // The stream is identified by a namespace plus this grain's Guid key,
        // so every chat room gets its own stream.
        _stream = streamProvider.GetStream<string>("ChatNamespace", this.GetPrimaryKey());

        return Task.CompletedTask;
    }

    public async Task SendMessageAsync(string message)
    {
        // Publish the message to the chat room's stream.
        await _stream!.OnNextAsync(message);
    }
}

When someone sends a message, the grain pushes it into the stream, and Orleans delivers it asynchronously to every current subscriber.
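The example publishes plain strings, but a stream can carry any type Orleans can serialize. The following is a sketch only, assuming Orleans 7 or later with its source-generated serializer. `ChatMessage`, `ITypedChatRoomGrain`, and `TypedChatRoomGrain` are hypothetical names, and the grain resolves the provider through the `GetStreamProvider` grain extension instead of an injected client.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Orleans;
using Orleans.Runtime;
using Orleans.Streams;

// Hypothetical event type: carries the sender and timestamp along with the text.
[GenerateSerializer]
public sealed class ChatMessage
{
    [Id(0)] public Guid SenderId { get; set; }
    [Id(1)] public string Text { get; set; } = string.Empty;
    [Id(2)] public DateTimeOffset SentAt { get; set; }
}

public interface ITypedChatRoomGrain : IGrainWithGuidKey
{
    Task SendMessageAsync(Guid senderId, string text);
}

public class TypedChatRoomGrain : Grain, ITypedChatRoomGrain
{
    private IAsyncStream<ChatMessage>? _stream;

    public override Task OnActivateAsync(CancellationToken cancellationToken)
    {
        // Same provider and namespace as the string-based example; only the payload type differs.
        _stream = this.GetStreamProvider("chat-streams")
            .GetStream<ChatMessage>(StreamId.Create("ChatNamespace", this.GetPrimaryKey()));
        return base.OnActivateAsync(cancellationToken);
    }

    public Task SendMessageAsync(Guid senderId, string text) =>
        _stream!.OnNextAsync(new ChatMessage
        {
            SenderId = senderId,
            Text = text,
            SentAt = DateTimeOffset.UtcNow
        });
}
```

Subscribers would then implement `IAsyncObserver<ChatMessage>` instead of `IAsyncObserver<string>`. A stream of one payload type should not share a namespace and key with a stream of another, so a real application would likely pick a separate namespace for the typed variant.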

2️⃣ User grain - Stream subscriber 📬

Now, users can subscribe to a chat stream and get real-time updates:

using Orleans;
using Orleans.Streams;

public interface IUserGrain : IGrainWithGuidKey
{
    Task Subscribe(Guid chatId);
}

public class UserGrain : Grain, IUserGrain, IAsyncObserver<string>
{
    private readonly IClusterClient _clusterClient;

    public UserGrain(IClusterClient clusterClient)
    {
        _clusterClient = clusterClient;
    }

    public async Task Subscribe(Guid chatId)
    {
        // Look up the same provider, namespace, and key that the chat room publishes to.
        var provider = _clusterClient.GetStreamProvider("chat-streams");
        var stream = provider.GetStream<string>("ChatNamespace", chatId);

        // Register this grain as an observer of the stream.
        await stream.SubscribeAsync(this);
    }

    // Called for every message published to the stream.
    Task IAsyncObserver<string>.OnNextAsync(string item, StreamSequenceToken? token)
    {
        Console.WriteLine($"New message received: {item}");
        return Task.CompletedTask;
    }

    // Called when the producer signals that the stream has completed.
    Task IAsyncObserver<string>.OnCompletedAsync() => Task.CompletedTask;

    // Called when the stream reports an error; here it is surfaced as a faulted task.
    Task IAsyncObserver<string>.OnErrorAsync(Exception ex) => Task.FromException(ex);
}
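One detail the subscriber above leaves open is what happens when `Subscribe` is called twice, or when the grain is deactivated and later reactivated. With explicit subscriptions the subscription itself is tracked by the pub/sub store, while the observer has to be re-attached by the new activation. The sketch below, assuming Orleans 7 or later, makes `Subscribe` idempotent and adds an `Unsubscribe` method. `IResumingUserGrain` and `ResumingUserGrain` are hypothetical names, not part of the original example.

```csharp
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Runtime;
using Orleans.Streams;

// Hypothetical variant of the user grain; interface and class names are illustrative.
public interface IResumingUserGrain : IGrainWithGuidKey
{
    Task Subscribe(Guid chatId);
    Task Unsubscribe(Guid chatId);
}

public class ResumingUserGrain : Grain, IResumingUserGrain, IAsyncObserver<string>
{
    private IAsyncStream<string> GetChatStream(Guid chatId) =>
        this.GetStreamProvider("chat-streams")
            .GetStream<string>(StreamId.Create("ChatNamespace", chatId));

    public async Task Subscribe(Guid chatId)
    {
        var stream = GetChatStream(chatId);

        // Subscriptions this grain already holds on the stream, if any.
        var handles = await stream.GetAllSubscriptionHandles();
        if (handles.Count == 0)
        {
            await stream.SubscribeAsync(this);
            return;
        }

        // Re-attach this activation as the observer instead of subscribing a second time.
        foreach (var handle in handles)
        {
            await handle.ResumeAsync(this);
        }
    }

    public async Task Unsubscribe(Guid chatId)
    {
        var handles = await GetChatStream(chatId).GetAllSubscriptionHandles();
        foreach (var handle in handles)
        {
            await handle.UnsubscribeAsync();
        }
    }

    public Task OnNextAsync(string item, StreamSequenceToken? token = null)
    {
        Console.WriteLine($"New message received: {item}");
        return Task.CompletedTask;
    }

    public Task OnCompletedAsync() => Task.CompletedTask;

    public Task OnErrorAsync(Exception ex)
    {
        Console.WriteLine($"Stream error: {ex.Message}");
        return Task.CompletedTask;
    }
}
```

To resume automatically after a reactivation, the grain would also need to remember which chat rooms it joined, for example in persistent grain state, and call the resume logic from `OnActivateAsync`. That part is omitted here.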

3️⃣ Define stream and publish a message

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Orleans;
using Orleans.Hosting;
using Orleans.Streams;

// Create a generic HostBuilder for hosting the Orleans silo
var builder = new HostBuilder();

// Configure Orleans inside the host
builder.UseOrleans(static siloBuilder =>
{
    // Use localhost clustering (single-node cluster for development/testing)
    siloBuilder.UseLocalhostClustering();

    // Add in-memory grain storage for persistence (required for pub/sub bookkeeping)
    siloBuilder.AddMemoryGrainStorage("PubSubStore");

    // Add an in-memory stream provider named "chat-streams"
    // This enables grains to publish and subscribe to streams
    siloBuilder.AddMemoryStreams("chat-streams");
});

// Build and start the host (silo is now running)
using var host = builder.Build();
await host.StartAsync();

// Create a new ChatRoomGrain using the IGrainFactory
//    The Guid.NewGuid() ensures a unique chat room ID
var chatRoom = host.Services.GetRequiredService<IGrainFactory>()
    .GetGrain<IChatRoomGrain>(Guid.NewGuid());

// Create a new UserGrain using the IGrainFactory
var user = host.Services.GetRequiredService<IGrainFactory>()
    .GetGrain<IUserGrain>(Guid.NewGuid());

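// Subscribe the user to the chat room's stream
//    The user will now receive any messages sent to this chat room
await user.Subscribe(chatRoom.GetGrainId().GetGuidKey());

// Send a message to the chat room
//    This publishes the message to the room's stream via the "chat-streams" provider
//    All subscribed users (like the one we just created) will receive it
await chatRoom.SendMessageAsync("Hello world!");

// Delivery is asynchronous, so keep the silo alive until the message has been printed
Console.WriteLine("Press Enter to stop the silo...");
Console.ReadLine();
await host.StopAsync();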

The diagram below illustrates the pub/sub flow:

┌───────────────┐       ┌───────────────────┐       ┌─────────────┐
│ ChatRoomGrain │       │ Orleans Stream    │       │ UserGrain   │
│ (Publisher)   │──────>│ ("chat-streams")  │──────>│ (Subscriber)│
└───────────────┘       └───────────────────┘       └─────────────┘
            publishes message         delivers message

🧩 When to Use Streams

Use streams when you need:

  • Real-time notifications or broadcasts between grains.
  • Event-driven workflows (IoT, telemetry, chat, trading).
  • Integration with external event systems like Kafka or Azure Event Hubs.

Avoid streams when:

  • You need strict request/response semantics; use regular grain method calls instead.
  • You need cross-silo transaction guarantees; Orleans Streams don't support distributed transactions.
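To make the request/response case concrete, here is a hedged sketch of a query that is better served by a direct grain call than by a stream. `IChatHistoryGrain` and `ChatHistoryGrain` are hypothetical and keep their data in memory only; they are not part of the chat example above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Orleans;

// Hypothetical grain: the caller needs an answer, so this is a plain grain call.
public interface IChatHistoryGrain : IGrainWithGuidKey
{
    Task AppendAsync(string message);
    Task<List<string>> GetRecentAsync(int count);
}

public class ChatHistoryGrain : Grain, IChatHistoryGrain
{
    // In-memory only for this sketch; the history is lost when the grain deactivates.
    private readonly List<string> _messages = new();

    public Task AppendAsync(string message)
    {
        _messages.Add(message);
        return Task.CompletedTask;
    }

    public Task<List<string>> GetRecentAsync(int count)
    {
        // Return at most `count` of the newest messages, oldest first.
        var recent = _messages.TakeLast(Math.Max(count, 0)).ToList();
        return Task.FromResult(recent);
    }
}
```

A caller awaits the result directly, for example `var recent = await grainFactory.GetGrain<IChatHistoryGrain>(roomId).GetRecentAsync(20);`, where `grainFactory` and `roomId` are assumed to be available in the calling code. The returned task carries both the data and any failure, which a stream subscription does not give the publisher.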