It is done, here is the UDP version of the msgpack rpc, client and server.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
const { encode, decode } = require('msgpack5')();
const dgram = require('dgram');

const PORT = 8081;

// this implementation will only support small messages (size as allowed by udp/ip itself)
// to find if client and server are still connected, the client will send ping messages
// the server will expect ping and respond. but the server will not initiate a ping himself.
// when the server does not receive a ping, the client get removed
module.exports.createServer = function createServer(api){
const server = dgram.createSocket('udp4');

const clients = {};
server.clients = clients;

server.on('connect',(...args)=>{console.log('server: onConnect', args)})
server.on('message', async (message, rinfo)=>{
var request = {}
try {
request = decode(message);
} catch(err) {
const response = encode({id: request.id, error: true, r: err.message})
server.send(response,0,response.length,client.rinfo.port, client.rinfo.address);
return;
}

// clients get identified by their address and port, here is the logic for clearing inactive clients
clientKey = rinfo.address+':'+rinfo.port;
const client = clients[clientKey] = clients[clientKey] || { rinfo, time: Date.now() };
clearTimeout(client.disconnectTimeout);
client.disconnectTimeout = setTimeout(()=>{},1000*60);

// execution and response is similar as in the tcp version
try {
const result = await api[request.method](...(request.args||[]));
const response = encode({id: request.id, r: result});
server.send(response,0,response.length,client.rinfo.port, client.rinfo.address);
} catch(err) {
const response = encode({id: request.id, error:true, r: err.message})
server.send(response,0,response.length,client.rinfo.port, client.rinfo.address);
}

//console.log('server: onMessage', message+'', rinfo);
//server.send('what',0,4,rinfo.port+1, rinfo.address,(err)=>{console.log('sendDone', err)})
});
server.on('listen',(...args)=>{console.log('server: onListen', args)})
server.on('error',(...args)=>{console.log('server: onError', args)})

api.__getMethods__ = async ()=>{
return methods
};

// the ping response is simple
api.__ping__ = async ()=>{
return true;
};

const methods = Object.keys(api)

return server;
}


module.exports.createClient = function createClient(host,port){
var client = dgram.createSocket('udp4')
var nextId = 0;
client.connect(port, host, function(){
// this callback will not receive any error. guess it is because it only need to bind a local port
api.__getMethods__().then(methodNames=>{
methodNames.forEach(addAPI);
client.ready.resolve();
});
});

client.on('message', (message, rinfo)=>{
const response = decode(message);
if(!requests[response.id])return;
if(requests[response.id]){
requests[response.id].resolve(response.r)
}// todo: handle reject
delete requests[response.id];
});

const api = {};
function addAPI(name){
api[name] = (...args)=> request(name,args);
}
addAPI('__getMethods__');
addAPI('__ping__');

const requests = [];
const request =function request (method, args) {
const id = nextId++;
client.send(encode({method,args,id}),port, host);
return requests[id] = getDeferrer()
}

client.pingInterval = setInterval(()=>{
client.send(encode({method: '__ping__', id: nextId++}),port, host)
}, 1000*30)

client.ready = getDeferrer()
client.api = api;

return client
}

function getDeferrer(){
var _resolve, _reject;
const p = new Promise((res,rej)=>{
_resolve = res; _reject=rej;
});
p.resolve = _resolve;
p.reject = _reject;
return p;
}


var server = module.exports.createServer({
add: (a,b)=> a+b,
getClientCount: ()=>Object.keys(server.clients).length
});

server.bind(PORT)

setTimeout(async ()=>{
console.log('start client')
const rpcClient = module.exports.createClient('localhost',PORT);
console.log('wait for ready')
await rpcClient.ready;
const {api} = rpcClient;


const sum = await api.add(1,2);
console.log({sum})
const textSum = await api.add('hallo ', 'world')
console.log({textSum})
console.log('number of clients', await api.getClientCount())


const rpcClient2 = module.exports.createClient('localhost',PORT);
console.log('wait for ready2')
await rpcClient2.ready;

console.log('number of clients', await rpcClient2.api.getClientCount());

// sending long data like the following line fails without any error
// console.log('long data', await rpcClient2.api.add('a'.repeat(1000*100),'b'));
process.exit();
},1000);

I think the logic is not horribly more complicated. But the silently not delivering the messages is difficult. I am glad to have done this little exercise, as I now understand better why most of the internet is using tcp for data transfer.

Not sure if or when I keep going with this little rpc-library project, But I think I would go forward with the TCP version. Do you understand the code? Do you have any questions? You can contact me on dev.to/bias.

back to developer log

Did you like this article? You also might be interested in the schema less API.