Qu'est-ce qu'un filtre ?

Je vais vous en donner la définition que j'ai apprise à la fac : il s'agit d'une commande qui lit son entrée standard et qui écrit sur sa sortie standard.
Autrement dit, il s'agit d'une commande qui va traiter les données qu'on lui donne pour en générer d'autres. Ces commandes peuvent ainsi être chaînées pour combiner des traitements.

Vous connaissez peut-être les commandes grep, cut, sort, uniq, tr ou encore sed : on peut toutes les qualifier de filtres !

L'API stream de Node.js

Qu'est-ce qu'un flux (stream) ?

Node.js fournit un module stream. Ce module contient des objets représentant des flux.
Un flux est un objet permettant d'abstraire une source ou une destination de données. Cela signifie que c'est un moyen de représenter un endroit où on va soit récupérer des informations, soit déposer des informations.
On distingue donc deux types de flux : les flux en lecture (readable ou input streams) ainsi que les flux en écriture (writable ou output streams).
À ces deux types de flux on peut ajouter un troisième : le flux en duplex. Il s'agit d'un flux qui combine les capacités des deux catégories : on peut y envoyer des données et recevoir des données en sa provenance.

En résumé :

Dans l'environnement de Node.js, on dispose de deux propriétés process.stdin (standard input) et process.stdout (standard output).
Ces objets permettent respectivement de lire l'entrée standard et d'écrire sur la sortie standard. Vous l'aurez sans doute deviné : ce sont des flux ! L'entrée standard est un flux en lecture (car on peut lire les données de l'entrée standard) et la sortie standard est un flux en écriture (car on peut y écrire des données).

La méthode .pipe()

Il est possible de piper les flux les uns vers les autres, de la même façon qu'on peut piper les commandes bash les unes vers les autres.
Ainsi, on peut écrire un programme Node.js qui écrit sur sa sortie standard tout ce qu'il reçoit sur son entrée très simplement : process.stdin.pipe(process.stdout);. Ce code signifie : « envoie les données du flux en lecture process.stdin dans le flux en écriture process.stdout ».

Enregistrons ce code dans un fichier filtre.js et utilisons la commande echo pour envoyer du texte sur son entrée : echo "Hello, world!" |node filtre.js.
Fabuleux, nous avons écrit notre premier filtre !

Hello, world!

Bon d'accord, il ne fait pas encore grand chose ; je vous propose d'ajouter un petit traitement au milieu. Pour cela nous allons utiliser…

Les flux de transformation

Le module stream contient un constructeur Transform. Ce constructeur permet de créer des flux de type duplex afin de facilement transformer les données qui y transitent.
À l'aide de Transform, modifions notre filtre existant afin qu'il mette en majuscules tout le texte qu'il reçoit !

'use strict';

const TransformStream = require('stream').Transform;

// Définissons un flux de transformation en étendant le constructeur du module stream.
class CapsLockStream extends TransformStream {
    // La méthode _transform() sera appelée par le prototype parent afin de transformer les
    // données. Elle doit appeler le callback fourni pour transmettre les nouvelles données.
    _transform(chunk, encoding, callback) {
        const chunkString = chunk.toString(encoding !== 'buffer' ? encoding : null);
        callback(null, chunkString.toUpperCase());
    }
}

// Cette fois-ci, on met notre flux de transformation entre les flux d'entrée et de sortie.
// On envoie les données de l'entrée standard dans le flux de transformation, puis les données du
// flux de transformation dans la sortie standard :
process.stdin.pipe(new CapsLockStream).pipe(process.stdout);

Si on relance notre script comme tout à l'heure (echo "Hello, world!" |node filtre.js), on peut constater que notre filtre fonctionne !

HELLO, WORLD!

Écrivons un véritable filtre

Maintenant que nous savons comment utiliser les flux de transformation, nous allons pouvoir nous essayer à l'écriture d'un filtre un peu moins trivial.
Le but du jeu sera donc d'écrire un filtre qui reçoit un flux d'URL (une par ligne) en entrée et les transforme pour afficher des objets JSON représentant les paramètres de recherche (query string) de chaque URL.

Nous allons pour cela utiliser le module url de Node.js pour parser les URL.

Avez-vous une idée de comment nous allons procéder ?
Allez, on se lance ! Commençons par écrire un flux de transformation qui reçoit les URL puis les renvoie presque sans rien faire histoire d'avoir un squelette de code et de vérifier que tout se passe bien.

'use strict';

const url = require('url');
const TransformStream = require('stream').Transform;

class QueryStringExtractor extends TransformStream {
    _transform(chunk, encoding, callback) {
        const urlString = chunk.toString(encoding !== 'buffer' ? encoding : null);
        callback(null, this.extractQueryString(urlString) + "\n");
    }

    extractQueryString(urlString) {
        // Ici on écrira le code qui extrait les paramètres de l'URL et les
        // renvoie formatés en JSON. En attendant on se contente de peu :
        return "Oh une URL : " + urlString;
    }
}

process.stdin.pipe(new QueryStringExtractor).pipe(process.stdout);

Créons également un fichier urls.txt qui contient quelques données de test :

http://example.cguille.net
http://example.cguille.net?toto=titi
http://example.cguille.net?foo=bar&baz=1
http://example.cguille.net/another/example/?vroum[foo]=bar
http://example.cguille.net/search?q=test+hey
http://example.cguille.net/search?q=h%C3%A9+oh

Et enfin testons notre filtre : node filtre.js < urls.txt.

Oh une URL : http://example.cguille.net
http://example.cguille.net?toto=titi
http://example.cguille.net?foo=bar&baz=1
http://example.cguille.net/another/example/?vroum[foo]=bar
http://example.cguille.net/search?q=test+hey
http://example.cguille.net/search?q=h%C3%A9+oh

Oups ! De toute évidence, le filtre de transformation n'a été appliqué qu'une seule fois à toutes les URL au lieu d'être appliqué à chaque URL. Que s'est-il passé ?

Nous sommes partis du principe que notre fonction _transform() allait être appelée une fois pour chaque URL, c'est-à-dire pour chaque ligne. Or c'est faux ! À aucun moment nous n'avons découpé en lignes le texte reçu en entrée.

Traiter l'entrée ligne par ligne

Nous avons donc besoin de traiter le flux ligne par ligne. Ce besoin est a priori assez courant, il est donc fort probable que quelqu'un ait déjà résolu le problème.
Bingo ! Le module npm byline a l'air de faire ce que l'on souhaite ! Si on l'installe (npm install --save byline), on peut utiliser le prototype LineStream qu'il fournit afin d'obtenir un flux ligne par ligne.

'use strict';

const url = require('url');
const TransformStream = require('stream').Transform;
const LineStream = require('byline').LineStream;// On importe le flux par ligne.

class QueryStringExtractor extends TransformStream {
    _transform(chunk, encoding, callback) {
        const urlString = chunk.toString(encoding !== 'buffer' ? encoding : null);
        callback(null, this.extractQueryString(urlString) + "\n");
    }

    extractQueryString(urlString) {
        return "Oh une URL : " + urlString;
    }
}

// On l'intercale entre l'entrée standard et notre flux de transformation:
process.stdin.pipe(new LineStream).pipe(new QueryStringExtractor).pipe(process.stdout);
Oh une URL : http://example.cguille.net
Oh une URL : http://example.cguille.net?toto=titi
Oh une URL : http://example.cguille.net?foo=bar&baz=1
Oh une URL : http://example.cguille.net/another/example/?vroum[foo]=bar
Oh une URL : http://example.cguille.net/search?q=test+hey
Oh une URL : http://example.cguille.net/search?q=h%C3%A9+oh

Voilà qui est beaucoup mieux ! Désormais, on obtient une transformation par URL.

Implémenter la transformation

Il ne nous reste plus qu'à récupérer l'information recherchée :

'use strict';

const url = require('url');
const TransformStream = require('stream').Transform;
const LineStream = require('byline').LineStream;

class QueryStringExtractor extends TransformStream {
    _transform(chunk, encoding, callback) {
        const urlString = chunk.toString(encoding !== 'buffer' ? encoding : null);
        callback(null, this.extractQueryString(urlString) + "\n");
    }

    extractQueryString(urlString) {
        const urlObject = url.parse(urlString, true);
        return JSON.stringify(urlObject.query);
    }
}

process.stdin.pipe(new LineStream).pipe(new QueryStringExtractor).pipe(process.stdout);
{}
{"toto":"titi"}
{"foo":"bar","baz":"1"}
{"vroum[foo]":"bar"}
{"q":"test hey"}
{"q":"hé oh"}

Gérer l'erreur EPIPE

Nous disions que l'un des grands intérêts de ce genre de commandes était de pouvoir les combiner avec d'autres. Que se passe-t-il si par exemple nous essayons de récupérer uniquement les deux premières lignes du résultat à l'aide de la commande node filtre.js < urls.txt |head -n2 ?

{}
{"toto":"titi"}
events.js:154
      throw er; // Unhandled 'error' event
      ^

Error: write EPIPE
    at exports._errnoException (util.js:856:11)
    at WriteWrap.afterWrite (net.js:767:14)

Horreur, tout explose ! Mais pourquoi donc ?
La commande head a brutalement fermé son entrée standard (c'est-à-dire notre sortie standard !) une fois qu'elle a lu suffisamment de lignes. Une fois cela fait, il n'est plus possible d'y envoyer quoi que ce soit, donc on ne doit plus écrire dans process.stdout.
On peut donc décider de simplement quitter le programme lorsque cette erreur se présente.

'use strict';

const url = require('url');
const TransformStream = require('stream').Transform;
const LineStream = require('byline').LineStream;

class QueryStringExtractor extends TransformStream {
    _transform(chunk, encoding, callback) {
        const urlString = chunk.toString(encoding !== 'buffer' ? encoding : null);
        callback(null, this.extractQueryString(urlString) + "\n");
    }

    extractQueryString(urlString) {
        const urlObject = url.parse(urlString, true);
        return JSON.stringify(urlObject.query);
    }
}

process.stdout.on('error', function (error) {
    if (error.code === 'EPIPE') {
        process.exit(0); // On ne peut plus rien écrire, arrêtons-nous tout de suite.
    }
    throw error; // Ce n'est pas l'erreur attendue ; oups !
});

process.stdin.pipe(new LineStream).pipe(new QueryStringExtractor).pipe(process.stdout);
{}
{"toto":"titi"}

Et voilà, tout va bien !