gRPC Streaming PT-1
A WebSocket alternative.
In this article, we will cover gRPC streaming and its different types. Compare it with WebSockets.
Part one of two-part series.
What are Web Sockets?
We all know HTTP/HTTPS, a unidirectional request-response protocol between clients and servers. A connection lifecycle in HTTP protocol is the span of requests and responses.
Similar is the concept of WebSockets, a bidirectional stream in a client-server architecture. A connection lifecycle in WebSocket protocol can exist for a limitless period until some external interference terminates the connection.
Restful APIs are an example of the HTTP protocol, whereas a real-time chat system is an example of the WebSocket protocol.
Uses of WebSockets
Websockets have various uses, including real-time web applications, chat applications, and gaming applications.
In applications where we want continuous streams of data over the network, we can use WebSockets.
In cases where we want to get data only one time, we can use HTTP.
What is gRPC Stream?
GRPC offers three different streaming modes client streaming, server streaming, and full-duplex streaming ( including both client and server).
In client streaming, the server will receive a series of messages from the client and respond when there are no more messages.
When using server streaming, the client makes a request, the server responds with a string of messages, and the client reads each one until there are none left.
Full-duplex enables both client and server to send each other a series of messages until no more messages are left.
A bidirectional stream can be suitable in scenarios where you have a logical requirement of continuous message sharing between client and server.
Why use gRPC Stream?
Why use gRPC has a simple answer. To make it more understandable, we can compare it with WebSockets.
gRPC uses HTTP/2.0, whereas WebSockets use HTTP/1.1. Using gRPC offers a higher security system as they provide built-in encryption, whereas WebSockets require access control. Moreover, gRPC uses binary data format for communication, while WebSockets use data formats like JSON, MTTQ, etc.
Hence, gRPC provides a more secure and fast-stream communication channel than WebSockets.
Moreover, gRPC supports multiplexing, which enables a client to send and receive multiple messages on a single connection, reducing the overhead of creating an HTTP/2 connection every time, hence, making a single reusable connection.
How to implement gRPC streaming?
Client Streaming
First, let us look into
What is client streaming?
- It is communication between the client and server, where the client sends a stream of messages to the server.
- After the stream ends (prompted by the client) server responds once.
Proto File
We are using a client stream where a driver’s location is updated, and we have some dummy variables added.
syntax = 'proto3';
package locationTrackingApp;
service Location {
rpc updateLocation (stream LoactionReq) returns (LoactionRes);
}
message LoactionRepq {
string driverId = 1;
string longitude = 2;
string latitudes = 3;
}
message LoactionRes {
}
Implementation
- We will be using NodeJs to implement a solution for a server first.
- Note: I have added an SSL certificate for secure communication between the client and server. However, you can ignore that if you want.
Server
// grpc-js is the grpc library for node js,
// where proto-loader is a library to compiler proto on runtime
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
const fs = require("fs");
// load the certificates
function getServerCredentials() {
const serverCert = fs.readFileSync("./Path_To_Cert/server-cert.pem");
const serverKey = fs.readFileSync("./Path_To_Cert/server-key.pem");
const serverCredentials = grpc.ServerCredentials.createSsl(
null,
[
{
cert_chain: serverCert,
private_key: serverKey,
},
],
false
);
return serverCredentials;
}
function main() {
const server = new grpc.Server();
const packageDefinition = protoLoader.loadSync("./Path_To_Proto/locationStream.proto", {});
// we get the package name from the loaded proto file and register our
// service defination to it
const locationTrackingApp =
grpc.loadPackageDefinition(packageDefinition).locationTrackingApp;
// Add the service
server.addService(locationTrackingApp.Location.service, {
updateLocation: updateLocation,
});
const credentials = getServerCredentials();
// PS: to remove credentials use grpc.ServerCredentials.createInsecure()
server.bindAsync("0.0.0.0:50051", credentials, (err, port) => {
if (err) {
console.error(err)
return
}
server.start();
console.log(`Server running at http://0.0.0.0:${port}`);
});
}
var driverLocations = {};
function updateLocation(call, callback) {
call.on('data', (req) => {
if (!driverLocations[req.driverId]) {
driverLocations = {
...driverLocations,
[req.driverId]: [{
longitude: req.longitude,
latitudes: req.latitudes
}]
}
} else {
driverLocations[req.driverId].push({
longitude: req.longitude,
latitudes: req.latitudes
})
}
})
call.on('end', () => {
console.log("Stream Completed")
console.log(driverLocations)
callback(null, {});
});
}
The client stream provides two events:
- Data: event of the writing of messages from the client.
- End: event of stream end from the client.
Client
I am sharing two client code samples, node and java (in case you are working in an android environment).
NodeJS Client
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
const fs = require("fs");
function getChannelCredentials() {
const rootCert = fs.readFileSync("./Path_To_Cert/ca-cert.pem");
const channelCredentials = grpc.ChannelCredentials.createSsl(rootCert);
return channelCredentials;
}
function main() {
const packageDefinition = protoLoader.loadSync("./proto/locationStream.proto", {});
const locationTrackingApp =
grpc.loadPackageDefinition(packageDefinition).locationTrackingApp;
const client = new locationTrackingApp.Location(
"localhost:50051",
getChannelCredentials()
);
// here we establish a link with our server
const stream = client.updateLocation((error, res) => {
if (error) {
console.error(error)
return;
}
console.log(res)
});
// with the same link / stream valiable we will write as many messages we want
for (let i = 0; i < 10; i++) {
// this is the "data" event
stream.write({
driverId: "driver",
longitude: "2." + i,
latitudes: "3.1"
});
}
// this send the "end" event
stream.end();
}
main();
Java Client:
// loading the certificates and building a secure channel with the server.
final SslContext sslCerts = loadTLSCredentials();
final ManagedChannel channel = NettyChannelBuilder
.forTarget("localhost:50051")
.sslContext(sslCerts)
.build();
final LocationGrpc.LocationStub stub = LocationGrpc.newStub(channel);
// are this is a stream we will use StreamObserver to
// observe response from server "responseObserver"
StreamObserver<LoactionRes> responseObserver = new StreamObserver<>() {
@Override
// function to handle response from server
public void onNext(LoactionRes res) {
System.out.println(res);
}
@Override
// handle error if any received from server
public void onError(Throwable throwable) {
throwable.printStackTrace();
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("Finished UpdateLocation");
finishLatch.countDown();
}
};
// now we will create a stream with the secure channel
// we will use the responseObserver to send message stream
StreamObserver<LoactionReq> responseObserver= stub.updateLocation(responseObserver);
System.out.println("requestObserver stream started");
for(int i=0;i<10;i++){
// this emmit the "data" event
requestObserver.onNext(LoactionReq.
newBuilder()
.setDriverId("driver_java_1_tester")
.setLatitudes("lat"+i)
.setLongitude("long")
.build());
// java requires a minute wait as
// processing is done in asynchronous manner
Thread.sleep(5);
}
// this emits the "end" event
requestObserver.onCompleted();
// How to load certificates
public static SslContext loadTLSCredentials() throws SSLException {
File serverCACertFile = new File("Path_To_Certs/ca-cert.pem");
return GrpcSslContexts.forClient()
.trustManager(serverCACertFile)
.build();
}
In the android environment our certificates and channel creation updates to:
Resources res = getResources();
CertificateFactory cf = CertificateFactory.getInstance("X.509");
InputStream mainstream = res.openRawResource(R.raw.ca_cert);
Certificate ca = cf.generateCertificate(mainstream);
KeyStore kStore = KeyStore.getInstance(KeyStore.getDefaultType());
kStore.load(null, null);
kStore.setCertificateEntry("ca", ca);
TrustManagerFactory tmf = TrustManagerFactory
.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(kStore);
TrustManager[] trustManagers = tmf.getTrustManagers();
if (trustManagers.length != 1 || !(trustManagers[0] instanceof X509TrustManager)) {
throw new IllegalStateException("Unexpected default trust managers:" + Arrays.toString(trustManagers));
}
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, tmf.getTrustManagers(), null);
SSLSocketFactory sslSocketFactory = context.getSocketFactory();
final ManagedChannel channel = OkHttpChannelBuilder
.forTarget("IP:PORT | domain ")
.useTransportSecurity()
.overrideAuthority("IP:PORT | domain")
.sslSocketFactory(sslSocketFactory)
.build();
In Android, we add our certificates as a raw resource, which we load and set as a certificate. We use the certificate to initialize our stub (gRPC instance).
For details, you can view my other article:
Continue reading for server-side streaming and full-duplex streaming.