Writing gRPC Clients and Servers with C++20 Coroutines

The Callback Hell of C++ Async Programming

If you have ever written network programs in C++, you are probably very familiar with this scenario —

You need to send request B after request A completes, then send request C after B completes. So you end up writing code like this:

1
2
3
4
5
6
7
client.send(requestA, [](Response a) {
client.send(requestB(a), [](Response b) {
client.send(requestC(b), [](Response c) {
// Finally made it here...
});
});
});

This is the infamous “callback hell”. The deeper the nesting, the harder the code is to read, and error handling becomes a nightmare — each layer of callback needs to handle errors independently. One missed callback call in a branch, and the entire request silently disappears without a trace.

The callback pattern also introduces another thorny problem: lifetime management. When a callback executes, every captured object must still be alive, yet their lifetimes are often hard to predict. You end up littering the code with shared_ptr and shared_from_this() just to keep things alive — ugly and unnecessarily expensive.

The most maddening part is cancellation. Imagine a user closes a page and you want to cancel an in-progress multi-step operation, but every step in the callback chain may have already started. How do you propagate the cancel signal? How do you ensure all resources are properly cleaned up? This is nearly an unsolvable problem.

If you have worked with async programming in Go or Python, you have probably envied their concise coroutine syntax — expressing asynchronous logic with synchronous-looking code. The good news is that C++20 introduced coroutines, and C++ programmers finally have a proper tool for the job.

The Async World of gRPC

Before diving in, let’s take a look at what gRPC’s async model looks like and what new challenges it brings.

The C++ implementation of gRPC provides two async APIs: the classic CompletionQueue-based model and the Reactor pattern callback model. Clients typically use the Reactor pattern — inheriting various ClientXxxReactor classes and implementing callbacks; servers typically use the CompletionQueue model — registering operations with the queue and polling for results.

For a client-side server-streaming RPC, you need to inherit grpc::ClientReadReactor<T> and implement several callbacks:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MyReader : public grpc::ClientReadReactor<Response> {
public:
void OnReadDone(bool ok) override {
if (ok) {
// Process the received data, then call StartRead again
StartRead(&response_);
}
}

void OnDone(const grpc::Status &status) override {
// RPC finished
}

private:
Response response_;
};

The server side is even more complex. With the CompletionQueue-based model, you need to manually maintain a state machine:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
enum class State { WAIT, READ, WRITE, FINISH };

class Handler {
void Proceed() {
switch (state_) {
case State::WAIT:
// Register the next request, transition to READ state
break;
case State::READ:
// Read data, decide whether to continue reading or writing
break;
// ...
}
}
};

This state machine must be maintained by hand. A single wrong state transition can cause data loss or a crash. Every time you need to implement this logic for a new RPC method, you have to go through the same ordeal.

Worse still, both approaches make it very hard to support cancellation cleanly. In the Reactor model, cancellation means calling context->TryCancel() and waiting for the OnDone callback; in the CompletionQueue model, state transitions after cancellation require extra care.

The root of the problem is: gRPC’s async API is designed around callbacks, while we want to organize code around the logical flow.

Promise and Future: Bridging Callbacks and Coroutines

This is exactly where asyncio shines.

asyncio is an async framework built on C++20 coroutines and the libuv event loop. Its core design idea is simple: use Promise and Future as a bridge to connect the callback world and the coroutine world.

A Promise is an object that can be “resolved” or “rejected”. Future is its other face — you can co_await a Future, and the coroutine will automatically resume when the Promise is resolved.

Take the sleep function as an example, to see how asyncio does it:

1
2
3
4
5
6
7
8
9
10
asyncio::task::Task<void> asyncio::sleep(std::chrono::milliseconds ms) {
Promise<void> promise;

uv_timer_start(timer, [](uv_timer_t *handle) {
// Timer fires, resolve the promise, coroutine resumes
static_cast<Promise<void> *>(handle->data)->resolve();
}, ms.count(), 0);

co_return co_await promise.getFuture();
}

Apply the same treatment to gRPC callbacks: resolve or reject a Promise inside the callback, and co_await the corresponding Future on the coroutine side. This lets you write what would otherwise require nested callbacks as straight-line code.

Cancellation Support

Coroutine cancellation is also implemented through the Promise mechanism. asyncio provides a Cancellable wrapper that takes a Future and a cancellation function:

1
2
3
4
5
6
7
8
co_return co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
// Perform the actual cancellation here
context->TryCancel();
return {};
}
};

When external code calls task.cancel(), asyncio walks the task chain to find the Cancellable currently being awaited and executes its cancellation function. For gRPC, the cancellation function simply calls context->TryCancel(). gRPC then handles the cleanup, triggers the OnDone callback, the Promise is eventually rejected, and the coroutine ends with a cancellation error.

With this mechanism, we can cleanly add cancellation support to any gRPC async operation without introducing any special state variables into business code.

Defining the Sample Service

Before writing code, let’s define a sample service covering all four RPC types:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
syntax = "proto3";

package sample;

service SampleService {
// Unary RPC: simplest request-response pattern
rpc Echo(EchoRequest) returns (EchoResponse);

// Server streaming: server continuously pushes data to the client
rpc GetNumbers(GetNumbersRequest) returns (stream Number);

// Client streaming: client continuously sends data, server returns one result
rpc Sum(stream Number) returns (SumResponse);

// Bidirectional streaming: both sides can send and receive continuously
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

message EchoRequest { string message = 1; }
message EchoResponse { string message = 1; int64 timestamp = 2; }
message GetNumbersRequest { int32 value = 1; int32 count = 2; }
message Number { int32 value = 1; }
message SumResponse { int32 total = 1; int32 count = 2; }
message ChatMessage { string user = 1; string content = 2; int64 timestamp = 3; }

Each RPC pattern has its use: Echo is the canonical RPC, GetNumbers lets the server stream a batch of data, Sum lets the client stream data and get an aggregated result, and Chat is the most complex — a bidirectional stream where either side can send at any time.

Let’s implement them one by one, starting with the simplest: client Unary RPC.


Client Unary RPC

Unary RPC is the most straightforward: send one request, receive one response. The corresponding gRPC Reactor method signature is:

1
2
3
4
5
6
void Stub::async::Echo(
grpc::ClientContext *,
const EchoRequest *,
EchoResponse *,
std::function<void(grpc::Status)>
);

gRPC calls the callback we pass in when the RPC completes. Bridging it with Promise is natural:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
asyncio::task::Task<sample::EchoResponse> echo(
const sample::EchoRequest &request,
std::shared_ptr<grpc::ClientContext> context
) {
sample::EchoResponse response;
asyncio::Promise<void, std::string> promise;

stub->async()->Echo(
context.get(), &request, &response,
[&](const grpc::Status &status) {
if (!status.ok()) {
promise.reject(status.error_message());
return;
}

promise.resolve();
}
);

if (const auto result = co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
context->TryCancel();
return {};
}
}; !result)
throw co_await asyncio::error::StacktraceError<std::runtime_error>::make(result.error());

co_return response;
}

The core logic is just a few lines: construct a Promise, decide resolve or reject based on Status inside the callback, then co_await the Promise‘s Future.

Cancellation support is woven in naturally — wrap the Future with Cancellable, call context->TryCancel() on cancellation. From the caller’s perspective, this function is indistinguishable from a normal synchronous function, yet it never blocks the event loop.

Client Streaming RPCs

Streaming RPCs are more complex than Unary, because data is transmitted one piece at a time and each read or write is an independent async operation.

Server Streaming (Reader)

For a server-streaming RPC, the client needs to inherit grpc::ClientReadReactor<T>. Each call to StartRead causes gRPC to invoke OnReadDone when data is ready.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
template<typename T>
class Reader final : public grpc::ClientReadReactor<T> {
public:
explicit Reader(std::shared_ptr<grpc::ClientContext> context) : mContext{std::move(context)} {
}

void OnDone(const grpc::Status &status) override {
if (!status.ok()) {
mDonePromise.reject(status.error_message());
return;
}

mDonePromise.resolve();
}

void OnReadDone(const bool ok) override {
// Resolve the per-read Promise when each read completes
std::exchange(mReadPromise, std::nullopt)->resolve(ok);
}

asyncio::task::Task<std::optional<T>> read() {
T element;

asyncio::Promise<bool> promise;
auto future = promise.getFuture();

mReadPromise.emplace(std::move(promise));
grpc::ClientReadReactor<T>::StartRead(&element);

// ok == false means the stream has ended
if (!co_await asyncio::task::Cancellable{
std::move(future),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
})
co_return std::nullopt;

co_return element;
}

asyncio::task::Task<void> done() {
if (const auto result = co_await mDonePromise.getFuture(); !result)
throw co_await asyncio::error::StacktraceError<std::runtime_error>::make(result.error());
}

private:
std::shared_ptr<grpc::ClientContext> mContext;
asyncio::Promise<void, std::string> mDonePromise;
std::optional<asyncio::Promise<bool>> mReadPromise;
};

A few details worth noting:

  • mReadPromise is wrapped in std::optional because each call to read() constructs a new Promise.
  • std::exchange is used in OnReadDone to take ownership of the Promise, signaling that a single read has completed.
  • done() waits for the entire stream to finish; it holds a separate mDonePromise distinct from the per-read promise.

Using Reader to consume a server-streaming RPC:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
asyncio::task::Task<void> getNumbers(
const sample::GetNumbersRequest &request,
std::shared_ptr<grpc::ClientContext> context
) {
Reader<sample::Number> reader{context};
stub->async()->GetNumbers(context.get(), &request, &reader);

reader.AddHold();
reader.StartCall();

while (true) {
auto number = co_await reader.read();

if (!number)
break; // stream ended

fmt::print("Received: {}\n", number->value());
}

reader.RemoveHold();
co_await reader.done(); // wait for the stream to fully finish and check for errors
}

AddHold and RemoveHold are gRPC Reactor lifecycle control mechanisms that prevent the Reactor from being destroyed while we hold it.

Client Streaming (Writer)

Client-streaming RPC is similar to server-streaming but in the opposite direction. Inherit grpc::ClientWriteReactor<T>; after each StartWrite completes, OnWriteDone is called back:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
template<typename T>
class Writer final : public grpc::ClientWriteReactor<T> {
public:
void OnWriteDone(const bool ok) override {
std::exchange(mWritePromise, std::nullopt)->resolve(ok);
}

void OnWritesDoneDone(const bool ok) override {
std::exchange(mWriteDonePromise, std::nullopt)->resolve(ok);
}

asyncio::task::Task<bool> write(const T element) {
asyncio::Promise<bool> promise;
auto future = promise.getFuture();

mWritePromise.emplace(std::move(promise));
grpc::ClientWriteReactor<T>::StartWrite(&element);

co_return co_await asyncio::task::Cancellable{
std::move(future),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
};
}

asyncio::task::Task<bool> writeDone() {
asyncio::Promise<bool> promise;
auto future = promise.getFuture();

mWriteDonePromise.emplace(std::move(promise));
grpc::ClientWriteReactor<T>::StartWritesDone();

co_return co_await asyncio::task::Cancellable{
std::move(future),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
};
}

// OnDone and done() are similar to Reader, omitted for brevity

private:
std::shared_ptr<grpc::ClientContext> mContext;
asyncio::Promise<void, std::string> mDonePromise;
std::optional<asyncio::Promise<bool>> mWritePromise;
std::optional<asyncio::Promise<bool>> mWriteDonePromise;
};

writeDone() corresponds to gRPC’s StartWritesDone, which signals to the server that the client has finished writing — equivalent to sending EOF on the stream.

Client Bidirectional Streaming

Bidirectional streaming is the most complex of the four patterns: the client simultaneously has both read and write capabilities. Fortunately, all that is needed is to merge the Reader and Writer logic into a single Stream class:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
template<typename RequestElement, typename ResponseElement>
class Stream final : public grpc::ClientBidiReactor<RequestElement, ResponseElement> {
public:
// OnReadDone, OnWriteDone, OnWritesDoneDone, OnDone are the same as before

asyncio::task::Task<std::optional<ResponseElement>> read() {
ResponseElement element;

asyncio::Promise<bool> promise;
auto future = promise.getFuture();

mReadPromise.emplace(std::move(promise));
grpc::ClientBidiReactor<RequestElement, ResponseElement>::StartRead(&element);

if (!co_await asyncio::task::Cancellable{
std::move(future),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
})
co_return std::nullopt;

co_return element;
}

asyncio::task::Task<bool> write(const RequestElement element) { /* same as Writer */ }
asyncio::task::Task<bool> writeDone() { /* same as Writer */ }
asyncio::task::Task<void> done() { /* same as Reader */ }

private:
std::shared_ptr<grpc::ClientContext> mContext;
asyncio::Promise<void, std::string> mDonePromise;
std::optional<asyncio::Promise<bool>> mReadPromise;
std::optional<asyncio::Promise<bool>> mWritePromise;
std::optional<asyncio::Promise<bool>> mWriteDonePromise;
};

When using a bidirectional stream, reading and writing are two independent tasks that run concurrently:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
asyncio::task::Task<void> chat(std::shared_ptr<grpc::ClientContext> context) {
Stream<sample::ChatMessage, sample::ChatMessage> stream{context};
stub->async()->Chat(context.get(), &stream);

stream.AddHold();
stream.StartCall();

co_await all(
// Read task
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (const auto msg = co_await stream.read()) {
fmt::print("Received: {}\n", msg->content());
}
}),
// Write task
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
sample::ChatMessage msg;
msg.set_content("Hello!");
co_await stream.write(msg);
co_await stream.writeDone();
})
);

stream.RemoveHold();
co_await stream.done();
}

all() waits for both the read and write subtasks simultaneously. If either fails, it cancels the other and returns the error. This is the task-tree mechanism of asyncio in action — structured concurrency.

Wrapping GenericClient

The three streaming wrappers above all follow the same pattern, so it is time to unify them with templates. GenericClient provides one call overload for each of the four RPC types:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
template<typename T>
class GenericClient {
using Stub = T::Stub;
using AsyncStub = class Stub::async;

public:
explicit GenericClient(std::unique_ptr<Stub> stub) : mStub{std::move(stub)} {
}

protected:
// 1. Unary RPC
template<typename Request, typename Response>
asyncio::task::Task<Response>
call(
void (AsyncStub::*method)(grpc::ClientContext *, const Request *, Response *,
std::function<void(grpc::Status)>),
std::shared_ptr<grpc::ClientContext> context,
Request request
) {
Response response;
asyncio::Promise<void, std::string> promise;

std::invoke(
method,
mStub->async(),
context.get(),
&request,
&response,
[&](const grpc::Status &status) {
if (!status.ok()) {
promise.reject(status.error_message());
return;
}

promise.resolve();
}
);

if (const auto result = co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
context->TryCancel();
return {};
}
}; !result)
throw co_await asyncio::error::StacktraceError<std::runtime_error>::make(result.error());

co_return response;
}

// 2. Server streaming: push data into a channel via Sender
template<typename Request, typename Element>
asyncio::task::Task<void>
call(
void (AsyncStub::*method)(grpc::ClientContext *, const Request *,
grpc::ClientReadReactor<Element> *),
std::shared_ptr<grpc::ClientContext> context,
Request request,
asyncio::Sender<Element> sender
) {
Reader<Element> reader{context};
std::invoke(method, mStub->async(), context.get(), &request, &reader);

reader.AddHold();
reader.StartCall();

const auto result = co_await asyncio::error::capture(
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (true) {
auto element = co_await reader.read();

if (!element)
break;

co_await asyncio::error::guard(sender.send(*std::move(element)));
}
})
);

reader.RemoveHold();
co_await reader.done();

if (!result)
std::rethrow_exception(result.error());
}

// 3. Client streaming: read data from Receiver and write into the stream
template<typename Response, typename Element>
asyncio::task::Task<Response>
call(
void (AsyncStub::*method)(grpc::ClientContext *, Response *,
grpc::ClientWriteReactor<Element> *),
std::shared_ptr<grpc::ClientContext> context,
asyncio::Receiver<Element> receiver
) {
Response response;
Writer<Element> writer{context};
std::invoke(method, mStub->async(), context.get(), &response, &writer);

writer.AddHold();
writer.StartCall();

const auto result = co_await asyncio::error::capture(
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (true) {
auto element = co_await receiver.receive();

if (!element) {
if (!co_await writer.writeDone())
fmt::print(stderr, "Write done failed\n");

if (element.error() != asyncio::ReceiveError::Disconnected)
throw co_await asyncio::error::StacktraceError<std::system_error>::make(element.error());

break;
}

co_await writer.write(*std::move(element));
}
})
);

writer.RemoveHold();
co_await writer.done();

if (!result)
std::rethrow_exception(result.error());

co_return response;
}

// 4. Bidirectional streaming: hold both a Receiver (input) and a Sender (output)
template<typename RequestElement, typename ResponseElement>
asyncio::task::Task<void>
call(
void (AsyncStub::*method)(grpc::ClientContext *,
grpc::ClientBidiReactor<RequestElement, ResponseElement> *),
std::shared_ptr<grpc::ClientContext> context,
asyncio::Receiver<RequestElement> receiver,
asyncio::Sender<ResponseElement> sender
) {
Stream<RequestElement, ResponseElement> stream{context};
std::invoke(method, mStub->async(), context.get(), &stream);

stream.AddHold();
stream.StartCall();

const auto result = co_await asyncio::error::capture(
all(
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (true) {
auto element = co_await stream.read();

if (!element)
break;

if (const auto res = co_await sender.send(*std::move(element)); !res) {
context->TryCancel();
throw co_await asyncio::error::StacktraceError<std::system_error>::make(res.error());
}
}
}),
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
while (true) {
auto element = co_await receiver.receive();

if (!element) {
if (!co_await stream.writeDone())
fmt::print(stderr, "Write done failed\n");

if (element.error() != asyncio::ReceiveError::Disconnected)
throw co_await asyncio::error::StacktraceError<std::system_error>::make(element.error());

break;
}

co_await stream.write(*std::move(element));
}
})
)
);

stream.RemoveHold();
co_await stream.done();

if (!result)
std::rethrow_exception(result.error());
}

private:
std::unique_ptr<Stub> mStub;
};

The four overloads are distinguished automatically by parameter types — the compiler selects the correct overload based on the method pointer type passed in. This is a classic use of template metaprogramming: different call patterns map to different function signatures, with zero runtime overhead.

For streaming RPCs, GenericClient uses asyncio::channel as the data conduit: Sender writes data into the channel, Receiver reads from it. The channel’s close signal (Receiver receiving a Disconnected error) naturally maps to stream EOF.

Implementing the Concrete Client

With GenericClient in place, implementing a concrete service client is straightforward:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Client final : public GenericClient<sample::SampleService> {
public:
using GenericClient::GenericClient;

static Client make(const std::string &address) {
return Client{sample::SampleService::NewStub(grpc::CreateChannel(address, grpc::InsecureChannelCredentials()))};
}

asyncio::task::Task<sample::EchoResponse>
echo(
sample::EchoRequest request,
std::unique_ptr<grpc::ClientContext> context = std::make_unique<grpc::ClientContext>()
) {
co_return co_await call(&sample::SampleService::Stub::async::Echo, std::move(context), std::move(request));
}

asyncio::task::Task<void>
getNumbers(
sample::GetNumbersRequest request,
asyncio::Sender<sample::Number> sender,
std::unique_ptr<grpc::ClientContext> context = std::make_unique<grpc::ClientContext>()
) {
co_await call(
&sample::SampleService::Stub::async::GetNumbers,
std::move(context),
std::move(request),
std::move(sender)
);
}

asyncio::task::Task<sample::SumResponse> sum(
asyncio::Receiver<sample::Number> receiver,
std::unique_ptr<grpc::ClientContext> context = std::make_unique<grpc::ClientContext>()
) {
co_return co_await call(&sample::SampleService::Stub::async::Sum, std::move(context), std::move(receiver));
}

asyncio::task::Task<void>
chat(
asyncio::Receiver<sample::ChatMessage> receiver,
asyncio::Sender<sample::ChatMessage> sender,
std::unique_ptr<grpc::ClientContext> context = std::make_unique<grpc::ClientContext>()
) {
co_return co_await call(
&sample::SampleService::Stub::async::Chat,
std::move(context),
std::move(receiver),
std::move(sender)
);
}
};

Here is how to call all four RPC types concurrently, showcasing the elegance of asyncio’s concurrent programming model:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
asyncio::task::Task<void> asyncMain(const int argc, char *argv[]) {
auto client = Client::make("localhost:50051");

co_await all(
// Unary RPC
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
sample::EchoRequest req;
req.set_message("Hello gRPC!");
const auto resp = co_await client.echo(req);
fmt::print("Echo: {}\n", resp.message());
}),

// Server streaming + client streaming, connected via a channel
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
sample::GetNumbersRequest req;
req.set_value(1);
req.set_count(5);

auto [sender, receiver] = asyncio::channel<sample::Number>();

const auto result = co_await all(
client.getNumbers(req, std::move(sender)),
client.sum(std::move(receiver))
);

const auto &resp = std::get<sample::SumResponse>(result);
fmt::print("Sum: {}, count: {}\n", resp.total(), resp.count());
}),

// Bidirectional streaming
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
auto [inSender, inReceiver] = asyncio::channel<sample::ChatMessage>();
auto [outSender, outReceiver] = asyncio::channel<sample::ChatMessage>();

co_await all(
client.chat(std::move(outReceiver), std::move(inSender)),
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
sample::ChatMessage msg;
msg.set_content("Hello server!");
co_await asyncio::error::guard(outSender.send(std::move(msg)));
outSender.close();
}),
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
const auto msg = co_await asyncio::error::guard(inReceiver.receive());
fmt::print("Chat reply: {}\n", msg.content());
})
);
})
);
}

The channel-based pipeline connecting getNumbers and sum is especially worth noting: numbers produced by the server-streaming RPC flow directly through the channel into the client-streaming RPC. The whole pipeline looks like synchronous code, but is fully asynchronous underneath.


Server Concurrency Model

The server has one extra dimension compared to the client: concurrency. A production-grade service must handle multiple client requests simultaneously, which requires the framework to dynamically manage an unknown number of concurrent tasks.

How CompletionQueue Works

The heart of the gRPC async server is CompletionQueue. The usage is roughly: when registering an async operation with gRPC, pass in a void *tag; when the operation completes, call CompletionQueue::Next() to retrieve the tag and learn whether the operation succeeded (bool ok):

1
2
3
4
5
6
7
void *tag{};
bool ok{};

while (cq->Next(&tag, &ok)) {
// tag points to the object we passed in at registration; ok is the result
dispatch(tag, ok);
}

But Next() is a blocking call — it blocks until an event arrives. Calling it directly on the event loop thread would freeze all of asyncio. The solution is to run it in a separate thread using asyncio::toThread:

1
2
3
4
5
6
7
8
9
asyncio::toThread([this] {
void *tag{};
bool ok{};

while (mCompletionQueue->Next(&tag, &ok)) {
// "Post" the completion event back to the event loop thread
static_cast<asyncio::Promise<bool> *>(tag)->resolve(ok);
}
})

Bridging with Promise

The other key question is: what does the tag point to? When registering an async operation, we pass the address of an asyncio::Promise<bool> as the tag:

1
2
3
4
5
6
7
8
9
10
11
asyncio::Promise<bool> promise;

service->RequestEcho(
context.get(), &request, &writer,
mCompletionQueue.get(), mCompletionQueue.get(),
&promise // <-- the tag is the address of the Promise
);

// Suspend the coroutine, wait for CompletionQueue notification
if (!co_await promise.getFuture())
break; // ok == false means the CompletionQueue has shut down

When the operation completes, Next() returns tag = &promise, which then calls promise->resolve(ok). This resolve wakes up the coroutine suspended at co_await promise.getFuture(), seamlessly delivering the gRPC completion notification to the coroutine world.

The entire GenericServer::run() coordinates the two sides exactly this way:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
virtual asyncio::task::Task<void> run() {
co_await all(
dispatch(), // coroutine side: wait for and dispatch requests
asyncio::toThread(
[this] { // thread side: block-poll the CompletionQueue
void *tag{};
bool ok{};

while (mCompletionQueue->Next(&tag, &ok)) {
static_cast<asyncio::Promise<bool> *>(tag)->resolve(ok);
}
}
)
);
}

dispatch() and toThread(...) run concurrently: the thread continuously pulls completion events from CompletionQueue and resolves the corresponding Promise; the coroutine side waits at co_await promise.getFuture() to be woken up, then handles a new request or a read/write completion each time. The two sides are decoupled through Promise and never block each other.

Dynamic Concurrency with TaskGroup

With the CompletionQueue bridge in place, the concurrency problem becomes clear. asyncio‘s TaskGroup is exactly what is needed to manage dynamically spawned concurrent tasks:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
asyncio::task::TaskGroup group;

while (true) {
// Wait for the next request
asyncio::Promise<bool> promise;
service->RequestEcho(context, &request, &writer, cq, cq, &promise);

if (!co_await promise.getFuture())
break; // CompletionQueue has shut down

// Spawn an independent handler task for this request
auto task = asyncio::task::spawn(handleRequest(...));

// Add to TaskGroup; the task removes itself automatically when it completes
group.add(task);

// Attach an error callback so unhandled exceptions are not silently lost
task.future().fail([](const auto &e) {
fmt::print(stderr, "Unhandled exception: {}\n", e);
});
}

// On shutdown: cancel all in-flight requests and wait for them to finish
std::ignore = group.cancel();
co_await group;

A few important design decisions here:

  1. Accept loop decoupled from handler tasks: the loop waiting for new requests never blocks on handler logic, ensuring high concurrency.
  2. TaskGroup manages lifetimes: after each handler task joins the TaskGroup, a single cancel-and-wait call at shutdown drains all of them cleanly.
  3. Error isolation: a single request failure does not affect the rest of the service; errors are captured by the fail callback and logged, while other requests continue processing.

Server Unary RPC

With the concurrency model as background, implementing server Unary RPC is straightforward. The server uses grpc::ServerAsyncResponseWriter<Response> to send the response:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
asyncio::task::Task<void>
handleUnary(
sample::SampleService::AsyncService *service,
grpc::ServerCompletionQueue *cq
) {
asyncio::task::TaskGroup group;

while (true) {
auto context = std::make_unique<grpc::ServerContext>();
sample::EchoRequest request;
grpc::ServerAsyncResponseWriter<sample::EchoResponse> writer{context.get()};

{
asyncio::Promise<bool> promise;
service->RequestEcho(context.get(), &request, &writer, cq, cq, &promise);

if (!co_await promise.getFuture())
break;
}

auto task = asyncio::task::spawn(
[
context = std::move(context), request = std::move(request), writer = std::move(writer)
]() mutable -> asyncio::task::Task<void> {
// Execute business logic
const auto response = co_await asyncio::error::capture(echo(std::move(request)));

asyncio::Promise<bool> promise;

if (response) {
writer.Finish(*response, grpc::Status::OK, &promise);
}
else {
writer.FinishWithError(
grpc::Status{grpc::StatusCode::INTERNAL, fmt::to_string(response.error())},
&promise
);
}

// Wait for Finish to complete; cancellation is supported here too
if (!co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
context->TryCancel();
return {};
}
})
throw co_await asyncio::error::StacktraceError<std::system_error>::make(
asyncio::task::Error::Cancelled
);
}
);

group.add(task);
task.future().fail([](const auto &e) {
fmt::print(stderr, "Unhandled exception: {}\n", e);
});
}

std::ignore = group.cancel();
co_await group;
}

asyncio::error::capture is a utility that catches exceptions thrown by a coroutine and converts them to std::expected, enabling error handling with value semantics rather than exceptions. This way, even if business logic throws, we can gracefully convert it into a gRPC error status returned to the client instead of crashing the entire program.

Server Streaming RPCs

Server streaming RPCs require separate async wrappers for client-side reading and server-side writing. Unlike the client’s Reactor pattern, all server-side read/write operations go through CompletionQueue — pass a Promise address as the tag, and when the operation completes Next() retrieves it and calls resolve, delivering the completion notification to the coroutine.

Server-Side Reader (Client Streaming)

When the server reads data from the client, it uses grpc::ServerAsyncReader<Response, Element>. It has two type parameters: Response is the type returned to the client at the end, and Element is the type of each stream element read.

There is a subtle problem with this design: the Response type leaks into Reader itself, meaning different response types require different Reader instances. We work around this with type erasure — hide the concrete Response type inside Impl, and expose only the generic IReader interface to the outside:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
template<typename Element>
class Reader {
public:
// IReader allows different underlying gRPC reader types to share one façade
class IReader {
public:
virtual ~IReader() = default;
virtual asyncio::task::Task<std::optional<Element>> read() = 0;
};

template<typename Response>
class Impl : public IReader {
public:
Impl(
std::shared_ptr<grpc::ServerContext> context,
std::shared_ptr<grpc::ServerAsyncReader<Response, Element>> reader
) : mContext{std::move(context)}, mReader{std::move(reader)} {
}

asyncio::task::Task<std::optional<Element>> read() override {
Element element;
asyncio::Promise<bool> promise;

mReader->Read(&element, &promise);

if (!co_await asyncio::task::Cancellable{
promise.getFuture(),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
}) {
// Distinguish between "cancelled" and "stream ended"
if (co_await asyncio::task::cancelled)
throw co_await asyncio::error::StacktraceError<std::system_error>::make(
asyncio::task::Error::Cancelled
);

co_return std::nullopt;
}

co_return element;
}

private:
std::shared_ptr<grpc::ServerContext> mContext;
std::shared_ptr<grpc::ServerAsyncReader<Response, Element>> mReader;
};

explicit Reader(std::unique_ptr<IReader> reader) : mReader{std::move(reader)} {
}

asyncio::task::Task<std::optional<Element>> read() {
co_return co_await mReader->read();
}

private:
std::unique_ptr<IReader> mReader;
};

Notice the co_await asyncio::task::cancelled check in the read() implementation — this is the key to distinguishing “reached end of stream (ok == false)” from “task was cancelled”. The former is a normal termination signal; the latter should propagate a cancellation error upward.

With Reader in place, the client-streaming request accept loop mirrors Unary: wait for a new request, construct a Reader and pass it to the business function, then send back the Response returned by the business function via reader->Finish():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
asyncio::task::TaskGroup group;

while (true) {
auto context = std::make_shared<grpc::ServerContext>();
auto reader = std::make_shared<grpc::ServerAsyncReader<sample::SumResponse, sample::Number>>(context.get());

{
asyncio::Promise<bool> promise;
service->RequestSum(context.get(), reader.get(), cq, cq, &promise);

if (!co_await promise.getFuture())
break;
}

auto task = asyncio::task::spawn(
[context = std::move(context), reader = std::move(reader)]() -> asyncio::task::Task<void> {
using Impl = Reader<sample::Number>::Impl<sample::SumResponse>;

const auto response = co_await asyncio::error::capture(
sum(Reader<sample::Number>{std::make_unique<Impl>(context, reader)})
);

asyncio::Promise<bool> promise;

if (response) {
reader->Finish(*response, grpc::Status::OK, &promise);
}
else {
reader->FinishWithError(
grpc::Status{grpc::StatusCode::INTERNAL, fmt::to_string(response.error())},
&promise
);
}

if (!co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
context->TryCancel();
return {};
}
})
throw co_await asyncio::error::StacktraceError<std::system_error>::make(
asyncio::task::Error::Cancelled
);
}
);

group.add(task);
task.future().fail([](const auto &e) {
fmt::print(stderr, "Unhandled exception: {}\n", e);
});
}

std::ignore = group.cancel();
co_await group;

Server-Side Writer (Server Streaming)

For writing data from server to client, grpc::ServerAsyncWriter<T> is used:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template<typename T>
class Writer {
public:
Writer(std::shared_ptr<grpc::ServerContext> context, std::shared_ptr<grpc::ServerAsyncWriter<T>> writer)
: mContext{std::move(context)}, mWriter{std::move(writer)} {
}

asyncio::task::Task<void> write(const T element) {
asyncio::Promise<bool> promise;
mWriter->Write(element, &promise);

if (!co_await asyncio::task::Cancellable{
promise.getFuture(),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
})
throw co_await asyncio::error::StacktraceError<std::system_error>::make(
asyncio::task::Error::Cancelled
);
}

private:
std::shared_ptr<grpc::ServerContext> mContext;
std::shared_ptr<grpc::ServerAsyncWriter<T>> mWriter;
};

The server-side Writer is slightly simpler than the client-side one — the server does not need to send WritesDone (that is the client’s privilege).

The original gRPC design has the stream end via the Writer’s Finish or FinishWithError, but we apply a similar redesign as for Reader.

The server-streaming request accept loop mirrors client streaming: wait for a new request, construct a Writer and pass it to the business function; after the business function returns, call writer->Finish() to end the stream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
asyncio::task::TaskGroup group;

while (true) {
auto context = std::make_shared<grpc::ServerContext>();

sample::GetNumbersRequest request;
auto writer = std::make_shared<grpc::ServerAsyncWriter<sample::Number>>(context.get());

{
asyncio::Promise<bool> promise;
service->RequestGetNumbers(context.get(), &request, writer.get(), cq, cq, &promise);

if (!co_await promise.getFuture())
break;
}

auto task = asyncio::task::spawn(
[
context = std::move(context), request = std::move(request), writer = std::move(writer)
]() mutable -> asyncio::task::Task<void> {
const auto result = co_await asyncio::error::capture(
getNumbers(std::move(request), Writer{context, writer})
);

asyncio::Promise<bool> promise;

if (result) {
writer->Finish(grpc::Status::OK, &promise);
}
else {
writer->Finish(
grpc::Status{grpc::StatusCode::INTERNAL, fmt::to_string(result.error())},
&promise
);
}

if (!co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
context->TryCancel();
return {};
}
})
throw co_await asyncio::error::StacktraceError<std::system_error>::make(
asyncio::task::Error::Cancelled
);
}
);

group.add(task);
task.future().fail([](const auto &e) {
fmt::print(stderr, "Unhandled exception: {}\n", e);
});
}

std::ignore = group.cancel();
co_await group;

Server Bidirectional Streaming

The server bidirectional stream wraps grpc::ServerAsyncReaderWriter<ResponseElement, RequestElement>, combining both read and write directions:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
template<typename RequestElement, typename ResponseElement>
class Stream {
public:
Stream(
std::shared_ptr<grpc::ServerContext> context,
std::shared_ptr<grpc::ServerAsyncReaderWriter<ResponseElement, RequestElement>> stream
) : mContext{std::move(context)}, mStream{std::move(stream)} {
}

asyncio::task::Task<std::optional<RequestElement>> read() {
RequestElement element;
asyncio::Promise<bool> promise;

mStream->Read(&element, &promise);

if (!co_await asyncio::task::Cancellable{
promise.getFuture(),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
}) {
if (co_await asyncio::task::cancelled)
throw co_await asyncio::error::StacktraceError<std::system_error>::make(
asyncio::task::Error::Cancelled
);

co_return std::nullopt;
}

co_return element;
}

asyncio::task::Task<void> write(const ResponseElement element) {
asyncio::Promise<bool> promise;
mStream->Write(element, &promise);

if (!co_await asyncio::task::Cancellable{
promise.getFuture(),
[this]() -> std::expected<void, std::error_code> {
mContext->TryCancel();
return {};
}
})
throw co_await asyncio::error::StacktraceError<std::system_error>::make(
asyncio::task::Error::Cancelled
);
}

private:
std::shared_ptr<grpc::ServerContext> mContext;
std::shared_ptr<grpc::ServerAsyncReaderWriter<ResponseElement, RequestElement>> mStream;
};

The bidirectional streaming request accept loop follows the same familiar pattern: register with ServerAsyncReaderWriter, construct a Stream and pass it to the business function chat; after chat returns, call stream->Finish() to wrap up:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
asyncio::task::TaskGroup group;

while (true) {
auto context = std::make_shared<grpc::ServerContext>();
auto stream = std::make_shared<grpc::ServerAsyncReaderWriter<
sample::ChatMessage, sample::ChatMessage>>(context.get());

{
asyncio::Promise<bool> promise;
service->RequestChat(context.get(), stream.get(), cq, cq, &promise);

if (!co_await promise.getFuture())
break;
}

auto task = asyncio::task::spawn(
[context = std::move(context), stream = std::move(stream)]() -> asyncio::task::Task<void> {
const auto result = co_await asyncio::error::capture(
chat(Stream{context, stream})
);

asyncio::Promise<bool> promise;

if (result) {
stream->Finish(grpc::Status::OK, &promise);
}
else {
stream->Finish(
grpc::Status{grpc::StatusCode::INTERNAL, fmt::to_string(result.error())},
&promise
);
}

if (!co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
context->TryCancel();
return {};
}
})
throw co_await asyncio::error::StacktraceError<std::system_error>::make(
asyncio::task::Error::Cancelled
);
}
);

group.add(task);
task.future().fail([](const auto &e) {
fmt::print(stderr, "Unhandled exception: {}\n", e);
});
}

std::ignore = group.cancel();
co_await group;

Wrapping GenericServer

With all the components in place, GenericServer encapsulates the “accept request → dispatch handler → graceful shutdown” flow into four handle overloads, one per RPC type:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
template<typename T>
class GenericServer {
using AsyncService = T::AsyncService;

protected:
// Unary RPC: handler signature is Task<Response>(Request)
template<typename Service, typename Request, typename Response, typename F>
requires std::derived_from<AsyncService, Service> &&
requires(F &&f, Request request) {
{ std::invoke(std::forward<F>(f), std::move(request)) } -> std::same_as<asyncio::task::Task<Response>>;
}
asyncio::task::Task<void>
handle(
void (Service::*method)(grpc::ServerContext *,
Request *,
grpc::ServerAsyncResponseWriter<Response> *,
grpc::CompletionQueue *,
grpc::ServerCompletionQueue *,
void *),
F handler
) {
asyncio::task::TaskGroup group;

while (true) {
auto context = std::make_unique<grpc::ServerContext>();
Request request;
grpc::ServerAsyncResponseWriter<Response> writer{context.get()};

{
asyncio::Promise<bool> promise;

std::invoke(
method,
mService,
context.get(),
&request,
&writer,
mCompletionQueue.get(),
mCompletionQueue.get(),
&promise
);

if (!co_await promise.getFuture())
break;
}

auto task = asyncio::task::spawn(
[
&handler, context = std::move(context), request = std::move(request), writer = std::move(writer)
]() mutable -> asyncio::task::Task<void> {
const auto response = co_await asyncio::error::capture(std::invoke(handler, std::move(request)));

asyncio::Promise<bool> promise;

if (response) {
writer.Finish(*response, grpc::Status::OK, &promise);
}
else {
writer.FinishWithError(
grpc::Status{grpc::StatusCode::INTERNAL, fmt::to_string(response.error())},
&promise
);
}

if (!co_await asyncio::task::Cancellable{
promise.getFuture(),
[&]() -> std::expected<void, std::error_code> {
context->TryCancel();
return {};
}
})
throw co_await asyncio::error::StacktraceError<std::system_error>::make(
asyncio::task::Error::Cancelled
);
}
);

group.add(task);
task.future().fail([](const auto &e) {
fmt::print(stderr, "Unhandled exception: {}", e);
});
}

std::ignore = group.cancel();
co_await group;
}

// The server streaming, client streaming, and bidi streaming overloads
// follow the exact same structure as Unary. The only differences are
// the method pointer type and how the IO wrapper is constructed:
// server streaming → Writer{context, writer} + writer->Finish()
// client streaming → Reader<E>{make_unique<Impl>(context, reader)} + reader->Finish() / FinishWithError()
// bidi streaming → Stream{context, stream} + stream->Finish()

// Server streaming: handler signature is Task<void>(Request, Writer<Element>)
template<typename Service, typename Request, typename Element, typename F>
requires std::derived_from<AsyncService, Service> &&
requires(F &&f, Request request, Writer<Element> writer) {
{
std::invoke(std::forward<F>(f), std::move(request), std::move(writer))
} -> std::same_as<asyncio::task::Task<void>>;
}
asyncio::task::Task<void>
handle(
void (Service::*method)(grpc::ServerContext *,
Request *,
grpc::ServerAsyncWriter<Element> *,
grpc::CompletionQueue *,
grpc::ServerCompletionQueue *,
void *),
F handler
) { /* same as Unary, omitted */ }

// Client streaming: handler signature is Task<Response>(Reader<Element>)
template<typename Service, typename Element, typename Response, typename F>
requires std::derived_from<AsyncService, Service> &&
requires(F &&f, Reader<Element> reader) {
{ std::invoke(std::forward<F>(f), std::move(reader)) } -> std::same_as<asyncio::task::Task<Response>>;
}
asyncio::task::Task<void>
handle(
void (Service::*method)(grpc::ServerContext *,
grpc::ServerAsyncReader<Response, Element> *,
grpc::CompletionQueue *,
grpc::ServerCompletionQueue *,
void *),
F handler
) { /* same as Unary, omitted */ }

// Bidirectional streaming: handler signature is Task<void>(Stream<Req, Resp>)
template<typename Service, typename RequestElement, typename ResponseElement, typename F>
requires std::derived_from<AsyncService, Service> &&
requires(F &&f, Stream<RequestElement, ResponseElement> stream) {
{ std::invoke(std::forward<F>(f), std::move(stream)) } -> std::same_as<asyncio::task::Task<void>>;
}
asyncio::task::Task<void>
handle(
void (Service::*method)(grpc::ServerContext *,
grpc::ServerAsyncReaderWriter<ResponseElement, RequestElement> *,
grpc::CompletionQueue *,
grpc::ServerCompletionQueue *,
void *),
F handler
) { /* same as Unary, omitted */ }

public:
asyncio::task::Task<void> shutdown() {
co_await asyncio::toThread([=, this] {
mServer->Shutdown();
mCompletionQueue->Shutdown();
});
}

virtual asyncio::task::Task<void> run() {
co_await all(
dispatch(),
asyncio::toThread(
[this] {
void *tag{};
bool ok{};

while (mCompletionQueue->Next(&tag, &ok)) {
static_cast<asyncio::Promise<bool> *>(tag)->resolve(ok);
}
}
)
);
}

private:
virtual asyncio::task::Task<void> dispatch() = 0;

std::unique_ptr<grpc::Server> mServer;
std::unique_ptr<AsyncService> mService;
std::unique_ptr<grpc::ServerCompletionQueue> mCompletionQueue;
};

The requires constraints enforce correct handler signatures — the compiler checks at instantiation time, and if you pass a handler with the wrong signature you get a clear compile-time error instead of a runtime crash.

Implementing the Concrete Server

The final Server implementation simply inherits GenericServer and fills in the business logic and dispatch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class Server final : public GenericServer<sample::SampleService> {
public:
using GenericServer::GenericServer;

static Server make(const std::string &address) {
auto service = std::make_unique<sample::SampleService::AsyncService>();

grpc::ServerBuilder builder;

builder.AddListeningPort(address, grpc::InsecureServerCredentials());
builder.RegisterService(service.get());

auto completionQueue = builder.AddCompletionQueue();
auto server = builder.BuildAndStart();

return {std::move(server), std::move(service), std::move(completionQueue)};
}

private:
// Unary: return Response directly; errors are automatically converted to gRPC error status
static asyncio::task::Task<sample::EchoResponse> echo(sample::EchoRequest request) {
sample::EchoResponse response;
response.set_message(request.message());
response.set_timestamp(std::time(nullptr));
co_return response;
}

// Server streaming: accept a Writer, write elements one by one
static asyncio::task::Task<void>
getNumbers(sample::GetNumbersRequest request, Writer<sample::Number> writer) {
for (int i = 0; i < request.count(); ++i) {
sample::Number number;
number.set_value(request.value() + i);
co_await writer.write(number);
}
}

// Client streaming: accept a Reader, read and aggregate
static asyncio::task::Task<sample::SumResponse> sum(Reader<sample::Number> reader) {
int total{0}, count{0};
while (const auto number = co_await reader.read()) {
total += number->value();
++count;
}
sample::SumResponse response;
response.set_total(total);
response.set_count(count);
co_return response;
}

// Bidirectional streaming: read one message, echo one back
static asyncio::task::Task<void> chat(Stream<sample::ChatMessage, sample::ChatMessage> stream) {
while (const auto message = co_await stream.read()) {
sample::ChatMessage response;
response.set_user("Server");
response.set_timestamp(std::time(nullptr));
response.set_content(fmt::format("Echo: {}", message->content()));
co_await stream.write(response);
}
}

// Bind method pointers to handlers and start the listening loop for each RPC
asyncio::task::Task<void> dispatch() override {
co_await all(
handle(&sample::SampleService::AsyncService::RequestEcho, echo),
handle(&sample::SampleService::AsyncService::RequestGetNumbers, getNumbers),
handle(&sample::SampleService::AsyncService::RequestSum, sum),
handle(&sample::SampleService::AsyncService::RequestChat, chat)
);
}
};

Business code and framework code are completely separated. The four static functions echo, getNumbers, sum, and chat only need to care about their own logic: accept request parameters, perform IO through Reader/Writer/Stream, and return results. Everything else — waiting for new requests, handling concurrency, writing back status at completion, graceful shutdown — is handled entirely by GenericServer.

Graceful Shutdown

In production, a process must ensure all in-progress RPCs are properly handled before exiting. The graceful shutdown pattern with asyncio + gRPC looks like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
asyncio::task::Task<void> asyncMain(const int argc, char *argv[]) {
auto server = Server::make("0.0.0.0:50051");
auto signal = asyncio::Signal::make();

co_await race(
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
asyncio::sync::Event event;

co_await asyncio::task::Cancellable{
all(
server.run(),
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
co_await asyncio::error::guard(event.wait());
co_await server.shutdown(); // notify the gRPC server to shut down
})
),
[&]() -> std::expected<void, std::error_code> {
event.set(); // trigger the shutdown sequence
return {};
}
};
}),
asyncio::task::spawn([&]() -> asyncio::task::Task<void> {
co_await asyncio::error::guard(signal.on(SIGINT));
})
);
}

The full shutdown sequence:

  1. SIGINT is received; the signal-watching task inside race completes.
  2. race cancels the other task, triggering the Cancellable cancellation function, which calls event.set().
  3. event.wait() returns, and server.shutdown() is called — it runs mServer->Shutdown() and mCompletionQueue->Shutdown() in a thread pool, telling gRPC to stop accepting new requests and close the CompletionQueue.
  4. Once CompletionQueue is closed, the thread-pool task in GenericServer::run() returns, and server.run() completes.
  5. Meanwhile, the accept loops in each handle() method exit because promise.getFuture() returns false, then cancel all in-flight handler tasks in the TaskGroup and wait for them to finish.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
asyncio::task::Task<void> shutdown() {
co_await asyncio::toThread([=, this] {
mServer->Shutdown();
mCompletionQueue->Shutdown();
});
}

virtual asyncio::task::Task<void> run() {
co_await all(
dispatch(),
asyncio::toThread(
[this] {
void *tag{};
bool ok{};
// Block on Next(), forwarding each completion event to the corresponding Promise
while (mCompletionQueue->Next(&tag, &ok))
static_cast<asyncio::Promise<bool> *>(tag)->resolve(ok);
}
)
);
}

asyncio::toThread moves blocking operations to the thread pool, ensuring the event loop thread is never blocked. CompletionQueue::Next is blocking and must run in a separate thread; Server::Shutdown may also block and is likewise offloaded.

Summary

Looking back over the journey, it is clear what asyncio fundamentally changes about gRPC async programming:

Callbacks become coroutines. The Promise/Future pair is the cornerstone of the entire approach. Every gRPC async callback — OnReadDone, OnWriteDone, OnDone, and the tags returned from CompletionQueue — is uniformly converted into resolve/reject calls on a Promise. The coroutine side needs only co_await.

Cancellation support with zero business-code intrusion. Through the Cancellable wrapper, every async await point naturally gains cancellation capability. Business code does not need to carry a context parameter or check a cancellation flag. The cancellation signal propagates automatically along the task chain until it reaches the waiting gRPC operation and calls TryCancel().

TaskGroup solves dynamic concurrency. The server must handle an unknown number of concurrent requests simultaneously. TaskGroup allows tasks to be added dynamically, cancelled in bulk, and awaited in bulk — exactly the right solution for this scenario.

Template constraints improve safety. GenericClient and GenericServer use overloading and requires constraints so that the compiler verifies handler signatures at instantiation time, turning potential runtime errors into compile-time failures.

Graceful shutdown without boilerplate. The event + race + Cancellable combination cleanly implements the full “receive signal → trigger shutdown → wait for all tasks” sequence, with almost no extra state variables.

The final business code demonstrates the value of all this: echo, getNumbers, sum, and chat read like ordinary synchronous functions — no nested callbacks, no state machines, no explicit lifetime management. Yet behind them is a complete, production-ready async gRPC service with high concurrency, cancellation support, and graceful shutdown.

This is the promise of coroutines: make async code as readable as sync code, while retaining all the performance advantages of async execution.

Full source code: GitHub