PR-URL: https://github.com/hasura/graphql-engine-mono/pull/9420 Co-authored-by: Nicolas Beaussart <7281023+beaussan@users.noreply.github.com> GitOrigin-RevId: 31d983ae8573c91ac5bf11066770f776941c3a11
8.9 KiB
Subscriptions with OpenAPI-to-GraphQL
Since version 2.1.0, OpenAPI-to-GraphQL supports GraphQL subscription operations. In GraphQL, using a subscription query, clients subscribe to updates on the data defined in the query. In this scenario, when data changes, the server publishes these changes to all clients that have active subscriptions for that data.
The OpenAPI specification can define similar behavior using callbacks: a callback defines a request that the server may initiate in response to receiving another request. Callbacks can thus be used to model publish/subscribe behavior. I.e., when the server receives a request to update some data, it can then itself issue callback requests (outside of the first request/response cycle) to any number of subscribed clients to inform about the new data.
When the createSubscriptionsFromCallbacks
option is enabled, OpenAPI-to-GraphQL creates subscription fields from an operation's callback objects. In such cases, OpenAPI-to-GraphQL creates a subscribe function responsible to subscribing clients to receive results of callbacks being executed, and a special form of a resolve function, which pushes data updates to subscribed clients.
To create these two functions, OpenAPI-to-GraphQL relies on the popular graphql-subscriptions package, which provides a unified API to support different network transports (like WebSockets, MQTT, Redis etc. - see this list of supported transports).
A typical example of using OpenAPI-to-GraphQL to create a GraphQL server supporting subscriptions may look like this:
Creating PubSub instance
First, initialize a PubSub instance to spread events between your API and the GraphQL Server, in a pubsub.js
file.
import { EventEmitter2 } from 'eventEmitter2';
import { PubSub } = from 'graphql-subscriptions'
const eventEmitter = new EventEmitter2({
wildcard: true,
delimiter: '/'
});
// Create the PubSub instance (here by wrapping an EventEmitter client)
const pubsub = new PubSub()
export default pubsub
PubSub could also wrap an MQTT client connected to a broker, like in this example API.
import { connect } = from 'mqtt'
import { MQTTPubSub } = from 'graphql-mqtt-subscriptions'
const MQTT_PORT = 1883
// Create a PubSub instance (here by wrapping a MQTT client)
const client = connect(`mqtt://localhost:${MQTT_PORT}`)
const pubsub = new MQTTPubSub({
client
})
export default pubsub
GraphQL server
Create GraphQL schema, resolvers and endpoints.
import { createGraphQLSchema } from 'openapi-to-graphql';
import express from 'express';
import { graphqlExpress } from 'apollo-server-express';
import { execute, printSchema, subscribe } from 'graphql';
import { SubscriptionServer } from 'subscriptions-transport-ws';
import { createServer } from 'http';
import { pubsub } from './pubsub';
const HTTP_PORT = 3000;
const init = async () => {
// Let OpenAPI-to-GraphQL create the schema
const schema = await createGraphQLSchema(oasWithCallbackObjects, {
createSubscriptionsFromCallbacks: true,
});
// Log GraphQL schema...
const myGraphQLSchema = printSchema(schema);
console.log(myGraphQLSchema);
// Set up GraphQL server using Express.js
const app = express();
app.use('/graphql', graphqlExpress({ schema }));
// Wrap the Express server...
const wsServer = createServer(app);
// ...and set up the WebSocket for handling GraphQL subscriptions
wsServer.listen(HTTP_PORT, () => {
new SubscriptionServer(
{
execute,
subscribe,
schema,
onConnect: (params, socket, ctx) => {
// Add pubsub to context to be used by GraphQL subscribe field
return { pubsub };
},
},
{
server: wsServer,
path: '/subscriptions',
}
);
});
};
init();
API server
A simple example could be the following, when an HTTP client tries to create a device (via post('/api/devices')
route) an event is published by the PubSub instance.
If a callback like #/components/callbacks/DevicesEvent is declared in your OpenAPI schema and used in path /devices
for the post
Operation, a subscription field will be generated by OpenAPI-to-GraphQL.
import express from 'express';
import bodyParser from 'body-parser';
import pubsub from './pubsub';
const HTTP_PORT = 4000;
const Devices = {
'Audio-player': {
name: 'Audio-player',
userName: 'johnny',
},
Drone: {
name: 'Drone',
userName: 'eric',
},
};
const startServer = () => {
const app = express();
app.use(bodyParser.json());
const httpServer = app.listen(HTTP_PORT, () => {
app.get('/api/devices', (req, res) => {
res.status(200).send(Object.values(Devices));
});
app.post('/api/devices', (req, res) => {
if (req.body.userName && req.body.name) {
const device = req.body;
Devices[device.name] = device;
const packet = {
topic: `/api/${device.userName}/devices/${req.method.toUpperCase()}/${device.name}`,
payload: Buffer.from(JSON.stringify(device)),
};
// Use pubsub to publish the event
pubsub.publish(packet);
res.status(200).send(device);
} else {
res.status(404).send({
message: 'Wrong device schema',
});
}
});
app.get('/api/devices/:deviceName', (req, res) => {
if (req.params.deviceName in Devices) {
res.status(200).send(Devices[req.params.deviceName]);
} else {
res.status(404).send({
message: 'Wrong device ID.',
});
}
});
});
};
startServer();
GrapQL client
If any GraphQL (WS) client subscribed to the route defined by the callback (#/components/callbacks/DevicesEvent
), it will get the content transfered by PubSub.
import axios from 'axios'
import { SubscriptionClient } from 'subscriptions-transport-ws'
import pubsub from './pubsub'
const GRAPHQL_HTTP_PORT = 3000
const REST_HTTP_PORT = 4000
const device = {
userName: 'Carlos',
name: 'Bot'
}
const startClient = () => {
// Generate subscription via GraphQL WS API...
const client = new SubscriptionClient(
`ws://localhost:${GRAPHQL_HTTP_PORT}/subscriptions`
)
client.request({
query: `subscription watchDevice($topicInput: TopicInput!) {
devicesEventListener(topicInput: $topicInput) {
name
userName
status
}
}`,
operationName: 'watchDevice',
variables: {
topicInput: {
method: 'POST',
userName: `${device.userName}`
}
}
})
.subscribe({
next: {data} => {
console.log('Device created', data)
},
})
// ...or directly via PubSub instance like OpenAPI-to-GraphQL would do
pubsub.subscribe(`/api/${device.userName}/devices/POST/*`, (...args) => {
console.log('Device created', args)
})
// Trigger device creation via GraphQL HTTP API...
axios({
url: `http://localhost:${GRAPHQL_HTTP_PORT}/graphql`,
method: 'POST',
json: true,
data: {
query: `mutation($deviceInput: DeviceInput!) {
createDevice(deviceInput: $deviceInput) {
name
userName
}
}`,
variables: device,
},
})
// ...or via REST API like OpenAPI-to-GraphQL would do
axios({
url: `http://localhost:${REST_HTTP_PORT}/api/devices`,
method: 'POST',
json: true,
data: device,
})
}
startClient()
In this example, we rely on the subscriptions-transport-ws
package to create a SubscriptionServer
that manages WebSockets connections between the GraphQL clients and our server. We also rely on the graphqlExpress
server provided by the apollo-server-express
package to serve GraphQL from Express.js.
Concerning callbacks, as you can see in the example, the path (runtime expression) /api/{$request.body#/userName}/devices/{$request.body#/method}/+
is delimited by /
and ends with +
, these symbols are interpreted as delimiters and wildcard when using MQTT topics.
It needs to be adapted accordingly to the client wrapped in your PubSub instance, for eventEmitter2 you can use *
and define your own delimiter.
A helper might be provided in the future, to simplify this process.
Examples
You can also run the example provided in this project.
Start REST API server (HTTP and MQTT) :
npm run api_sub
Start GRAPHQL server (HTTP and WS) :
npm run start_dev_sub