Recent versions of JavaScript and version 12 of nodejs have made a great feature available, one that also prompted me to reimplement one of my best node modules. This time I am talking about the ‘for await’ loop and nodejs streams’ support for asynchronous iteration.

One after the other. Streams are the structure you need when you have to handle more data than fits into memory at once. Your machine has 16 GB of RAM? To process a file bigger than that efficiently, we need a way to work through it chunk after chunk. When building a stream pipeline with multiple steps, it is important to keep the different speeds working together. Loading a file from disk, for example from a backup, can be very fast compared to inserting the data into a database somewhere on the network, which needs to build indexes and more.
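To illustrate the basic pattern before the real example: since nodejs 10 (and stable in 12), every Readable stream is an async iterable, so a plain for await loop can consume it chunk by chunk and the stream waits while the loop body does slow work. A minimal sketch, assuming some large file and a hypothetical handleChunk function for the slow step:

const fs = require('fs');

async function main() {
  // any Readable stream can be consumed with for await
  const readStream = fs.createReadStream('./some-huge-file.txt'); // placeholder file name

  for await (const chunk of readStream) {
    // the stream pauses until we ask for the next chunk,
    // so a slow consumer automatically slows down the producer
    await handleChunk(chunk);
  }
}

async function handleChunk(chunk) {
  // stand-in for the slow step, e.g. a database insert
  console.log('processing', chunk.length, 'bytes');
}

main().catch(console.error);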

With version 12 of nodejs, streams and for await loops work together very well. Here is my example for importing the planet data of OpenStreetMap into a database, where I found the database to be slower than reading the file stream.

const bz2 = require('unbzip2-stream');
const https = require('https');
const xml = require('txml');

const planetMapUrl = "https://ftp5.gwdg.de/pub/misc/openstreetmap/planet.openstreetmap.org/planet/2019/planet-190930.osm.bz2";
const planetFilePreamble = `<?xml version="1.0" encoding="UTF-8"?>
<osm license="http://opendatacommons.org/licenses/odbl/1-0/" copyright="OpenStreetMap and contributors" version="0.6" generator="planet-dump-ng 1.1.6" attribution="http://www.openstreetmap.org/copyright" timestamp="2019-09-29T23:59:55Z">
<bound box="-90,-180,90,180" origin="http://www.openstreetmap.org/api/0.6"/>`;


main().catch(err => console.log(err));

async function main() {
  let frameCount = 0;
  let xmlItems = 0;
  let dataSize = 0;

  // download the planet file and decompress it on the fly
  const fileReadStream = (await get(planetMapUrl))
    .pipe(bz2());

  // log some statistics about the raw download
  fileReadStream.on('data', (data) => {
    frameCount++;
    dataSize += data.length;
    console.log('data', { frameCount, size: dataSize, xmlItems });
  });

  // skip the preamble and emit one parsed XML node at a time
  const xmlStream = fileReadStream.pipe(xml.transformStream(planetFilePreamble.length));

  const typeStats = {};

  for await (const node of xmlStream) {
    xmlItems++;
    await sleep(1); // simulate a slow consumer, e.g. a database insert
    if (!typeStats[node.tagName]) typeStats[node.tagName] = 0;
    typeStats[node.tagName]++;
    if (xmlItems % 100 === 0)
      console.log('loop', xmlItems, typeStats);
  }
}


function get(url) {
  return new Promise((resolve, reject) => {
    https.get(url, resolve).on('error', reject);
  });
}

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms || 0));
}

In this code, you first see the imports and the configuration for processing the planet file, followed by the main function. At the bottom we have the two utility functions get and sleep. Within the main function, we create a read stream that downloads directly from the remote server provided by the OpenStreetMap project. Just for some statistics, we add an on ‘data’ listener to that download stream, which is decompressed on the fly.

Next we pipe the text stream into the XML transformStream. Using the for await loop, we can process one item at a time. With a little more logic we could easily collect the nodes into batches and process many of them at once, for example to insert many items with a single SQL INSERT statement (see the sketch below). The loop sleeps a little to simulate a slow database here and also maintains some statistics.
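A batching variant of the loop above could look roughly like this. This is only a sketch, not from the article’s code: insertBatch stands in for whatever bulk insert your database driver offers, and BATCH_SIZE is an arbitrary value.

// variant of the for await loop above, collecting nodes into batches
const BATCH_SIZE = 500; // arbitrary batch size
let batch = [];

for await (const node of xmlStream) {
  batch.push(node);
  if (batch.length >= BATCH_SIZE) {
    await insertBatch(batch); // hypothetical bulk insert, one round trip per batch
    batch = [];
  }
}
if (batch.length) await insertBatch(batch); // flush the remainder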

Now let’s take a look at the output.
[Screenshot: for-async-loop, CLI output]

This shows that we can control the speed of the entire stream just by using a for loop. I think this is a great step for building scalable applications with very clean code. It is really awesome that it is possible to process the data without downloading the entire planet file upfront. What do you think?

By the way, to implement the transformStream for xml, I used the through2 module. It was very easy to move from the existing event-based implementation to this stream support.
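Roughly, such a transform stream buffers the incoming text, parses every complete element it can find, and pushes the parsed objects downstream in object mode. A simplified sketch of that idea, assuming a hypothetical parseCompleteElements helper that returns the parsed nodes plus the unparsed rest of the buffer (this is not the actual txml implementation, just the general shape):

const through2 = require('through2');

function xmlTransformStream() {
  let buffer = '';

  return through2.obj(function (chunk, enc, callback) {
    buffer += chunk.toString();

    // parseCompleteElements is a hypothetical helper: it parses all fully
    // received elements and returns them plus the leftover text
    const { nodes, rest } = parseCompleteElements(buffer);
    buffer = rest;

    for (const node of nodes) this.push(node); // emit each parsed node as one object
    callback();
  });
}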
