gRPC Streaming PT-2
A WebSocket alternative.
This article will cover server-side and full-duplex gRPC streaming.
Part two of two-part series.

Pre-requisite to this is part one of the series to understand gRPC streaming and how it is better than WebSockets, with an introduction to the client streaming implementation of both the client and server side.
Server Streaming
In this section, we will cover the second type of gRPC streaming, where the server sends a series of messages upon a single request from the server. Unlike client streaming, the server terminates the connection by making the end call, which means the server has control of the request.
How to define the proto function?
syntax = 'proto3';
package locationTrackingApp;
service Location {
// the stream keyword is used with the response
rpc getLocations (LoactionGetReq) returns (stream LoactionGetRes);
}
message LoactionGetReq {
string driverId = 1;
}
message LoactionGetRes {
string longitude = 1;
string latitudes = 2;
}
Now let's dig into the implementation of the server-side first.
We are using node js for server-side implementation.
Server:
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
function getLocations(call) {
// the call controll the request
const id = call.request.driverId
for (let i = 0; i < 10; i++) {
let res = {
longitude: "1." + i,
latitudes: "0.2"
}
// write the message to client side
// client will receive the data as a "data" event
call.write(res)
}
// this throws the "end" call to the client
// and ends the response
call.end()
}
const server = new grpc.Server();
const packageDefinition = protoLoader.loadSync("./Path_to_Proto/locationStream.proto", {});
const locationTrackingApp =
grpc.loadPackageDefinition(packageDefinition).locationTrackingApp;
// add the function defination of getLocation to bind with the server
server.addService(locationTrackingApp.Location.service, {
getLocations: getLocations,
});
// start the server
server.bindAsync("0.0.0.0:50051", credentials, (err, port) => {
if (err) {
console.error(err)
return
}
server.start();
console.log(`Server running at http://0.0.0.0:${port}`);
});
How to interact with the server?
We will see client implementations in NodeJs and Java.
NodeJs Implementation
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
const packageDefinition = protoLoader.loadSync("./proto/locationStream.proto", {});
const locationTrackingApp =
grpc.loadPackageDefinition(packageDefinition).locationTrackingApp;
const client = new locationTrackingApp.Location(
"localhost:50051",
grpc.credentials.createInsecure()
);
const data = {
driverId: "1",
}
var stream = client.getLocations(data)
stream.on("data", (data) => {
console.log(data)
})
stream.on("end", () => {
console.log("Driver Reached")
})
Java Implementation
final ManagedChannel channel = NettyChannelBuilder
.forTarget("localhost:50051")
.usePlaintext()
.build();
final LocationGrpc.LocationStub stub = LocationGrpc.newStub(channel);
System.out.println("===Get Location Server Streaming===");
StreamObserver<LoactionGetRes> responseGetObserver = new StreamObserver<>() {
@Override
public void onNext(LoactionGetRes res) {
System.out.println("Server Stream: "+res);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("====Finished getLocation===");
}};
System.out.println("requestObserver stream started");
stub.getLocations(LoactionGetReq
.newBuilder()
.setDriverId("java_driver")
.build(),
responseGetObserver);
Java Android Implementation
For Android, most of the code will remain the same. The only change is the channel. For Android, we use OkHttpChannelBuilder to create a gRPC channel between our application and the server.
final ManagedChannel channel = OkHttpChannelBuilder
.forTarget("localhost:50051")
.usePlaintext()
.build();
final LocationGrpc.LocationStub stub = LocationGrpc.newStub(channel);
System.out.println("===Get Location Server Streaming===");
StreamObserver<LoactionGetRes> responseGetObserver = new StreamObserver<>() {
@Override
public void onNext(LoactionGetRes res) {
System.out.println("Server Stream: "+res);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("====Finished getLocation===");
}};
System.out.println("requestObserver stream started");
stub.getLocations(LoactionGetReq
.newBuilder()
.setDriverId("java_driver")
.build(),
responseGetObserver);
How to use the code sample with your application?
You will need to make the process wait for the completion of the server stream. One way is to sleep the main thread until all responses from the server are received.
Full-Duplex Streaming
One of the most common applications of full-duplex communication is a chat system. Consider any online messaging application; you receive instant messages when you are online, similar to a call we have on our phones.
You can design an application of your own where you can perform instant messages between two users.
How would you achieve this?
The implementation is simple, following the guidelines mentioned in this section of the article for implementing full-duplex streaming of gRPC.
We will have a server-side and a client. We will write the server and client in NodeJs.
Proto File
syntax = 'proto3';
package chatPackage;
// Our Chatservice definition.
service Chat {
rpc Chat(stream ChatRequest) returns (stream ChatResponse) {};
}
message ChatRequest {
string message = 1;
}
message ChatResponse {
string username = 1;
string message = 2;
}
Server
I will implement a simple chat system where the server will broadcast messages to all clients.
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
const packageDefinition = protoLoader.loadSync("./proto/chat.proto", {});
const chatPackage = grpc.loadPackageDefinition(packageDefinition).chatPackage;
// Create a server
const server = new grpc.Server();
// store the information of active clients
const callObjByUsername = new Map();
function Chat(call) {
call.on("data", (req) => {
// metadata is a new term used here, relate it to
// header in a rest call
const username = call.metadata.get("username")[0];
// req (request) is the parameter
const msg = req.message;
console.log(username, req.message);
// search if user exists
for (let [user, usersCall] of callObjByUsername) {
// broadcast message to all the clients
if (username !== user) {
usersCall.write({
username: username,
message: msg,
});
}
}
// if this is a new user add it to the active user map
if (callObjByUsername.get(username) === undefined) {
callObjByUsername.set(username, call);
}
});
// if any users ends their session remove them from active list
call.on("end", () => {
const username = call.metadata.get("username")[0];
callObjByUsername.delete(username);
for (let [user, usersCall] of callObjByUsername) {
// inform all the clients, a user have left
usersCall.write({
username: username,
message: "Has Left the Chat!",
});
}
console.log(`${username} is ending their chat session`);
// sending a goodbye message to the user who ended their session
call.write({
username: "Server",
message: `See you later ${username}`,
});
call.end();
});
};
// add the Chat as a services to the gRPC server
server.addService(chatPackage.Chat.service, {
chat: Chat,
});
// bind server to an IP:PORT and start the server
server.bindAsync(
"0.0.0.0:50051",
grpc.ServerCredentials.createInsecure(),
(err, port) => {
if (err) {
// in case of error log the error and end the callback
console.error(err)
return
}
console.log(`Server running at http://0.0.0.0:${port}`);
server.start();
}
);
The above code implements a chat server that broadcasts an incoming call to a list of active clients. We have used metadata similar to the header in a rest HTTP call.
Client NodeJs
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
// to read console input
const readline = require("readline");
const packageDefinition = protoLoader.loadSync("./proto/chat.proto", {});
const chatPackage = grpc.loadPackageDefinition(packageDefinition).chatPackage;
// Create a server
const client = new chatPackage.Chat(
"localhost:50051",
grpc.credentials.createInsecure()
);
// defining we need to take input from console
const io = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
// when you are running the file pass the username argument
// node client.js username
const username = process.argv[2];
if (!username) console.error("No username, can't join chat"), process.exit();
// set the metadata of the request, we are setting username here
const metadata = new grpc.Metadata();
metadata.set("username", username);
const call = client.Chat(metadata);
// register the user to the active client list
call.write({
message: "register",
});
// handle response from server
call.on("data", (chunk) => {
console.log(`${chunk.username} ==> ${chunk.message}`);
});
io.on("line", (line) => {
// add soon as we have a new line on the console check
if (line === "quit") {
// if users have initated to end the call
call.end();
} else {
// else write the message from users to send to other clients
call.write({
message: line,
});
}
});
Output
Conclusion
For you to explore, I’ve left the full-duplex client implementation in java.
I hope this will give you a clear understanding of how to use gRPC streaming services. If you have any questions, feel free to write me.
Hint on implementing a full-duplex client in java using two StreamObserver is the way to start.
PS: You can refer to the first part to see how to get a secure server implemented.