IdentifiantMot de passe
Loading...
Mot de passe oublié ?Je m'inscris ! (gratuit)

Cours complet pour apprendre à programmer en D


précédentsommairesuivant

59. Concurrence par messages

La concurrence est à la fois similaire et différente du sujet que nous avons abordé dans le dernier chapitre, le parallélisme. Comme ces deux concepts impliquent tous les deux d'exécuter un programme sur des threads, et que le parallélisme est basé sur la concurrence, ils sont souvent confondus.

Voici les différences entre le parallélisme et la concurrence :

  • l'objectif principal du parallélisme est de profiter des multiples cœurs d'un processeur pour améliorer les performances d'un programme. La concurrence d'un autre côté, est un concept qui peut être utile même dans un environnement monocœur. La concurrence, c'est faire s'exécuter un programme sur plus d'un thread à la fois ;
  • en parallélisme, les tâches sont indépendantes les unes des autres, ce serait en fait un bogue si une tâche dépendait d'autres tâches qui s'exécutent au même moment. Dans le cas de la concurrence, il est tout à fait normal qu'un thread dépende des résultats d'autres threads ;
  • même si les deux modèles utilisent les threads du système, les threads du parallélisme sont encapsulés dans le concept de tâche. La concurrence utilise les threads explicitement ;
  • le parallélisme est facile à utiliser, et tant que les tâches sont indépendantes, il est facile de créer des programmes qui fonctionnent correctement. La concurrence est facile à mettre en place uniquement quand elle est basée sur les messages. Il est très difficile d'écrire des programmes concurrents s'ils sont basés sur le modèle traditionnel de la concurrence, qui implique le partage de données verrouillées.

Le langage D permet d'utiliser les deux modèles de concurrence : les messages et le partage de données. Nous allons parler des messages dans ce chapitre et du partage de données dans le prochain chapitre.

59-1. Concepts

  • Thread  : les systèmes d'exploitation considèrent les programmes comme des unités qu'ils appellent threads. L'exécution d'un programme D commence par la fonction main dans le thread qui a été assigné au programme par le système d'exploitation. Toutes les opérations du programme sont normalement exécutées dans ce thread. Le programme est libre de démarrer d'autres threads pour pouvoir exécuter plusieurs tâches simultanément. En fait, les tâches ont été vues dans le chapitre précédent, et sont basées sur des threads démarrés automatiquement par std.parallelism.
    Le système d'exploitation peut mettre en pause l'exécution des threads sans qu'on puisse prédire quand ni pour combien de temps. Même les opérations aussi simples qu'incrémenter une variable peuvent être interrompues pendant leur exécution :

     
    Sélectionnez
    i++;
  • L'opération ci-avant implique trois étapes : lire la valeur de la variable, l'incrémenter, et assigner la nouvelle valeur à la variable. Le thread peut être mis en pause entre ces étapes et continuer sans qu'on sache quand.

  • Message : les données qui sont transmises entre des threads sont appelées des messages. Les messages peuvent être composés d'une ou de plusieurs variables, de n'importe quel type.

  • Identifiant de thread : chaque thread a un identifiant, qui est utilisé pour spécifier le destinataire d'un message.

  • Père : n'importe quel thread qui démarre un autre thread est le père de ce thread.

  • Fils : n'importe quel thread qui est démarré par un propriétaire est appelé un fils.

59-2. Démarrer des threads

spawn() prend un pointeur vers une fonction en paramètre et démarrer un nouveau thread dans lequel va s'exécuter cette fonction. Toutes les opérations qui sont exécutées au sein de cette fonction, y compris des appels à d'autres fonctions, seront exécutées dans ce nouveau thread. La différence principale entre un thread démarré avec spawn() et un thread démarré avec task() est que spawn() rend possible le passage de messages entre les threads.

Dès qu'un nouveau thread est lancé, le père et le fils s'exécutent séparément comme s'ils étaient des programmes indépendants :

 
Sélectionnez
import std.stdio;
import std.concurrency;
import core.thread;
 
void fils() {
    foreach (i; 0 .. 5) {
        Thread.sleep(500.msecs);
        writeln(i, " (fils)");
    }
}
 
void main() {
    spawn(&fils);
 
    foreach (i; 0 .. 5) {
        Thread.sleep(300.msecs);
        writeln(i, " (main)");
    }
 
    writeln("main est terminé.");
}

Les exemples dans ce chapitre font des appels à la fonction Thread.sleep pour ralentir les threads et montrer qu'ils s'exécutent simultanément. La sortie du programme montre que les deux threads, celui dans lequel s'exécute main() et l'autre qui a été démarré par spawn(), s'exécutent indépendamment en même temps :

 
Sélectionnez
0 (main)
0 (fils)
1 (main)
2 (main)
1 (fils)
3 (main)
2 (fils)
4 (main)
main est terminé
3 (fils)
4 (fils)

Le programme attend que tous les threads aient fini de s'exécuter. On peut le voir dans la sortie ci-avant, en observant que fils() continue son exécution, même après que main se soit terminé après avoir affiché « main est terminé ».

Les paramètres que la fonction démarrée dans un thread prend sont passés à spawn() comme arguments. Les deux threads fils dans le programme suivant affichent quatre nombres chacun. Ils prennent le nombre de départ en paramètre :

 
Sélectionnez
import std.stdio;
import std.concurrency;
import core.thread;
 
void fils(int nbInitial) {
    foreach (i; 0 .. 4) {
        Thread.sleep(500.msecs);
        writeln(nbInitial + i);
    }
}
 
void main() {
    foreach (i; 1 .. 3) {
        spawn(&fils, i * 10);
    }
}

La sortie d'un de ces threads est précédée d'une flèche () :

 
Sélectionnez
10
> 20
11
> 21
12
> 22
13
> 23

La sortie du programme peut être différente entre deux exécutions en fonction de comment le système d'exploitation a mis en pause et remis en route les threads.

Chaque système d'exploitation a une limite de threads qui peuvent s'exécuter simultanément. Cette limite peut être réglée par l'utilisateur, pour tout le système ou selon un autre critère. Les performances globales d'un système peuvent être altérées s'il y a plus de threads qui sont occupés que de cœurs dans le processeur. Quand un thread est occupé à réaliser des opérations, on dit qu'il est attaché au processeur. De nombreux threads passent beaucoup de temps sans réaliser d'opérations, à attendre qu'un événement se produise comme une entrée de l'utilisateur, l'arrivée de données sur une connexion réseau, la fin d'un appel à Thread.sleep, etc. Ces threads sont dits attachés à l'E/S (E/S : Entrée/Sortie). Quand ce genre de thread s'exécute, un programme peut se permettre de démarrer plus de threads que de cœurs sans dégrader ses performances. Comme dans chaque décision qui concerne les performances d'un programme, il faut réaliser des mesures pour être certains des ressources que consomme effectivement chaque thread.

59-3. Identifiants de threads

thisTid() retourne l'identifiant du thread courant. On l'utilise communément sans parenthèse :

 
Sélectionnez
import std.stdio;
import std.concurrency;
 
void printTid(string tag) {
    writefln("%s: %s", tag, thisTid);
}
 
void fils() {
    printTid("Fils");
}
 
void main() {
    spawn(&fils);
    printTid("Père");
}

Le type du retour de thisTid() et Tid, ce qui n'a pas beaucoup de sens dans ce programme, car Tid ne redéfinit pas sa fonction toString():

 
Sélectionnez
Père : Tid(std.concurrency.MessageBox)
Fils : Tid(std.concurrency.MessageBox)

La valeur de retour de spawn(), que nous avions ignoré jusqu'à maintenant, est l'identifiant du thread fils :

 
Sélectionnez
Tid monFils = spawn(&fils);

Inversement, l'id du thread père peut être obtenu grâce à la fonction ownerTid().

En résumé, le père est identifié par ownerTid() et le fils par la valeur retournée par spawn().

59-4. Le passage de messages

send() envoi des messages et receiveOnly() attend un message d'un type particulier. (Il y a aussi prioritySend(), receive() et receiveTimeout(), que l'on expliquera plus tard dans ce chapitre.)

Le père dans le programme ci-après envoie à son fils un message de type int et attend en retour un message de type double. Les threads continuent de s'envoyer des messages jusqu'à ce que le père reçoive un int négatif. Voici le thread père :

 
Sélectionnez
void main() {
    Tid fils = spawn(&fonctionFille);
 
    foreach (valeur; 1 .. 5) {
        fils.send(value);
        double resultat = receiveOnly!double();
        writefln("envoyé: %s, reçu: %s", valeur, resultat);
    }
 
    /* Envoie d'une valeur négative au fils pour que le programme
        * s'arrête */
    fils.send(-1);
}

main() stocke la valeur retournée par spawn() dans la variable fils et utilise cette valeur quand elle envoie des messages au fils.

De l'autre côté, le fils reçoit un int, utilise cette valeur dans un calcul, et envoie le résultat de type double à son père :

 
Sélectionnez
void fonctionFille(){
        int valeur = 0;
 
        while(valeur >= 0){
                valeur = receiveOnly!int();
                double resultat = to!double(valeur) / 5;
                ownerTid.send(resultat);
        }
}

Le thread principal affiche les messages qu'il envoie, et ceux qu'il reçoit :

 
Sélectionnez
envoyé : 1, reçu : 0.2
envoyé : 2, reçu : 0.4
envoyé : 3, reçu : 0.6
envoyé : 4, reçu : 0.8

Il est possible d'envoyer plus qu'une valeur dans un même message. Le message suivant est fait de trois parties :

 
Sélectionnez
ownerTid.send(thisTid, 42, 1.5);

Les valeurs passées ensemble dans un message forment un tuple pour celui qui reçoit le message. Dans ces cas, les paramètres du template de receiveOnly() doivent correspondre aux membres du tuple :

 
Sélectionnez
auto message = receiveOnly!(Tid, int, double)();
 
auto sender   = message[0];
auto integer  = message[1];
auto floating = message[2];

Si les types ne correspondent pas, une exception MessageMismatch est levée :

 
Sélectionnez
import std.concurrency;
 
void fonctionFils() {
    ownerTid.send("bonjour");    // ← Sending string
}
 
void main() {
    spawn(&fonctionFils);
 
    auto message = receiveOnly!double();    // ← Expecting double
}

La sortie :

 
Sélectionnez
std.concurrency.MessageMismatch@std/concurrency.d(235):
Unexpected message type: expected 'double', got 'immutable(char)[]'

Les exceptions que le fils peut lever ne sont pas attrapées par le père. Une solution possible est de faire envoyer l'exception en tant que message par le fils. Nous verrons cela plus tard dans le chapitre.

59-5. Exemple

Utilisons ce que nous avons vu jusqu'à présent dans un programme de simulation.

Le programme va simuler des robots indépendants qui bougent au hasard dans un espace en deux dimensions. Le mouvement de chaque robot est géré par un thread qui prend trois informations quand il est démarré :

  • le numéro (id) du robot : cette information est renvoyée au père pour identifier le robot qui envoie un message ;
  • l'origine : ce sont les coordonnées de départ du robot ;
  • la durée entre chaque étape : cette information est utilisée pour déterminer quand le robot bougera.

Ces informations peuvent être stockées dans cette structure Job :

 
Sélectionnez
struct Job {
        size_t robotId;
        Position origine;
        Duration reposDuree;
}

La fonction qui fait bouger les robots dans un thread envoie l'id du robot et ses mouvements au thread père continuellement :

 
Sélectionnez
void bougeurRobot(Job job){
        Position depuis = job.origin;
 
        while(true){
                Thread.sleep(job.reposDuree);
 
                Position to = voisinAuHasard(depuis);
                Movement mouvement = Movement(depuis, vers);
                depuis = vers;
                ownerTid.send(MovementMessage(job.robotId, mouvement));
        }
}

Le père ne fait qu'attendre ces messages dans une boucle infinie. Il identifie les robots par leurs id qui sont spécifiés dans les messages. Le père ne fait qu'afficher les différents mouvements :

 
Sélectionnez
while(true){
        auto message = receiveOnly!MovementMessage();
 
        writefln("%s %s", robots[message.robotId], message.mouvement);
}

Tous les messages dans ce programme vont du fils au père. Le passage de messages implique des schémas de communication bien plus complexes dans de vrais programmes.

Voici le programme complet :

 
Sélectionnez
import std.stdio;
import std.random;
import std.string;
import std.concurrency;
import core.thread;
 
struct Position {
    int ligne;
    int colonne;
 
    string toString() {
        return format("%s,%s", ligne, colonne);
    }
}
 
struct Movement {
    Position depuis;
    Position vers;
 
    string toString() {
        return ((depuis == vers)
                ? format("%s (idle)", depuis)
                : format("%s -> %s", depuis, vers));
    }
}
 
class Robot {
    string image;
    Duration reposDuree;
 
    this(string image, Duration reposDuree) {
        this.image = image;
        this.reposDuree = reposDuree;
    }
 
    override string toString() {
        return format("%s(%s)", image, reposDuree);
    }
}
 
/* Retourne une position au hasard */
Position positionHasard() {
    return Position(uniform!"[]"(-10, 10),
                    uniform!"[]"(-10, 10));
}
 
/* Retourne une coordonnée à une distance d'au maximum un de la coordonnée courante */
int pasAuHasard(int courant) {
    return courant + uniform!"[]"(-1, 1);
}
 
/* Retourne un voisin de la position passée en paramètre.
 * Cela peut être un des voisins dans les huit directions,
 * ou la position elle-même. */
Position voisinAuHasard(Position position) {
    return Position(pasAuHasard(position.line),
                    pasAuHasard(position.column));
}
 
struct Job {
    size_t robotId;
    Position origine;
    Duration reposDuree;
}
 
struct MovementMessage {
    size_t robotId;
    Movement mouvement;
}
 
void bougeurRobot(Job job) {
    Position from = job.origine;
 
    while (true) {
        Thread.sleep(job.reposDuree);
 
        Position to = voisinAuHasard(from);
        Movement mouvement = Movement(from, to);
        from = to;
 
        ownerTid.send(MovementMessage(job.robotId, mouvement));
    }
}
 
void main() {
    /* Des robots avec des durées de repos différentes. */
    Robot[] robots = [ new Robot("A",  600.msecs),
                       new Robot("B", 2000.msecs),
                       new Robot("C", 5000.msecs) ];
 
    /* Démarre un bougeurRobot pour chaque robot. */
    foreach (robotId, robot; robots) {
        spawn(&bougeurRobot, Job(robotId,
                               positionHasard(),
                               robot.reposDuree));
    }
 
    /* Affiche les infos de chaque mouvement des robots. */
    while (true) {
        auto message = receiveOnly!MovementMessage();
 
        /* Affiche le mouvement dans ce robot. */
        writefln("%s %s",
                 robots[message.robotId], message.movement);
    }
}

Le programme affiche chaque mouvement jusqu'à ce qu'il soit coupé :

 
Sélectionnez
A(600 ms) 6,2 -> 7,3
A(600 ms) 7,3 -> 8,3
A(600 ms) 8,3 -> 7,3
B(2 secs) -7,-4 -> -6,-3
A(600 ms) 7,3 -> 6,2
A(600 ms) 6,2 -> 7,1
A(600 ms) 7,1 (idle)
B(2 secs) -6,-3 (idle)
A(600 ms) 7,1 -> 7,2
A(600 ms) 7,2 -> 7,3
C(5 secs) -4,-4 -> -3,-5
A(600 ms) 7,3 -> 6,4
...

Ce programme montre comment la concurrence par passage de messages peut être utile : les mouvements des robots sont calculés indépendamment par des threads séparés qui ne savent rien les uns des autres. C'est le père qui traite le processus d'affichage en série en recevant les messages un par un.

59-6. Attendre différents types de messages

receiveOnly() peut attendre un type de message. receive() par contre peut attendre plus d'un type de message. Cette fonction peut trier les messages et les distribuer à des fonctions déléguées. Quand un message arrive, son type est comparé à celui qu'accepte chaque fonction déléguée. La déléguée qui correspond au type prend alors le message en charge.

Par exemple, l'appel à la fonction receive() suivant spécifie deux fonctions qui prennent en charge les messages de type string et int respectivement :

 
Sélectionnez
void fonctionFils(){
        bool termine = false;
 
        while(!termine){
                void intHandler(int message){
                        writeln("prise en charge d'un int : ", message);
 
                        if(message == -1){
                                writeln("sortie");
                                termine = true;
                        }
                }
 
                void stringHandler(string message){
                        writeln("prise en charge d'un string : ", message);
                }
 
                receive(&intHandler, &stringHandler);
        }
}

Les messages de type int correspondront à intHandler() et les messages de type string correspondront à stringHandler(). Le thread fils ci-avant peut être testé dans le programme suivant :

 
Sélectionnez
import std.stdio;
import std.concurrency;
 
//...
 
void main(){
        auto fils = spawn(&fonctionFils);
 
        fils.send(10);
        fils.send(42);
        fils.send("hello");
        fils.send(-1);
}

La sortie du programme montre que les messages sont bien répartis aux fonctions correspondantes à la réception :

 
Sélectionnez
prise en charge d'un int : 10
prise en charge d'un int : 42
prise en charge d'un string : hello
prise en charge d'un int : -1
sortie

Les lambdas-fonctions et les objets qui implémentent la fonction membre opCall() peuvent aussi être passés en paramètre à receive() en tant que fonctions qui prennent en charge des messages. Le programme suivant gère les messages en les passant à des lambda-fonctions. Il définit également un type Exit utilisé dans la communication avec le thread pour préciser si oui ou non il faut terminer le programme. Utiliser un type explicité pour cela est plus clair que d'envoyer une valeur arbitraire comme -1, tel que nous l'avons fait dans l'exemple précédent.

Il y a trois fonctions anonymes ci-après qui sont passées à receive() comme fonctions qui prennent en charge des messages :

 
Sélectionnez
import std.stdio;
import std.concurrency;
 
struct Exit(){
}
 
void fonctionFils(){
        bool termine = false;
 
        while(!termine){
                receive(
                                (int message) {
                                        writeln("message int : ", message);
                                },
                                (string message) {
                                        writeln("message string : ", message);
                                },
                                (Exit message) {
                                        writeln("exiting");
                                        termine = true;
                                }
                           );
        }
}
 
void main(){
        auto fils = spawn(&fonctionFils);
 
        fils.send(10);
        fils.send(42);
        fils.send("coucou");
        fils.send(Exit());
}

59-6-1. Recevoir n'importe quel type de message

std.variant.Variant est un type qui peut encapsuler n'importe quel type de données. Les messages qui ne correspondent à aucune fonction spécifiée avant dans la liste des arguments de receive() sont toujours passées à la fonction qui prend un Variant en paramètre :

 
Sélectionnez
import std.stdio;
import std.concurrency;
 
void fonctionFils(){
        receive(
                        (int message){
                                /* ... */
                        },
                        (string message){
                                /* ... */
                        },
                        (Variant message){
                                writeln("Message inattendu : ", message);
                        }
                   );
}
 
struct SpecialMessage{
        // ...
}
 
void main(){
        auto fils = spawn(&fonctionFils);
        fils.send(SpecialMessage);
}

La sortie :

 
Sélectionnez
Message inattendu : SpecialMessage()

Les détails concernant Variant sortent du cadre de ce chapitre.

59-7. Attendre des messages pendant un certain temps

Il se peut qu'on veuille attendre un message uniquement dans un certain délai de temps. Il se peut par exemple que l'expéditeur ait fini son exécution. receiveTimeout() empêche le thread recevant d'attendre indéfiniment. La fonction renvoie true si un message a été reçu et false dans le cas contraire.

 
Sélectionnez
import std.stdio;
import std.concurrency;
import core.thread;
 
void fonctionFils() {
    Thread.sleep(3.seconds);
    ownerTid.send("bonjour");
}
 
void main() {
    spawn(&fonctionFils);
 
    writeln("En attente d'un message");
    bool recu = false;
    while (!recu) {
        recu = receiveTimeout(600.msecs,
                                  (string message) {
                                      writeln("recu: ", message);
                                });
 
        if (!recu) {
            writeln("... Pas de message pour l'instant");
 
            /* ... D'autres opérations peuvent être effectuées ici ...*/
        }
    }
}

Le père ci-avant attend pour un message pendant 600 millisecondes. Il peut continuer à exécuter d'autres opérations si aucun message ne s'exécute pendant ce temps.

 
Sélectionnez
En attente d'un message
... Pas de message pour l'instant
... Pas de message pour l'instant
... Pas de message pour l'instant
recu: bonjour

59-8. Exceptions pendant l'exécution du thread fils

Comme nous l'avons vu dans le chapitre précédent, quand on utilise std.parallelism, une exception levée dans une tâche peut être automatiquement attrapée par le thread père. Cela permet de traiter ces exceptions :

 
Sélectionnez
try{
        theTask.yieldForce();
} catch (Exception e) {
        writefln("Erreur détectée dans la tâche : '%s'", exec.msg);
}

std.concurrency ne permet pas de base de traiter les exceptions de cette façon. Cependant, les exceptions peuvent être attrapées et envoyées explicitement par le thread fils. Comme nous le verrons ci-après, il est aussi possible de recevoir des exceptions OwnerTerminated et LinkTerminated en tant que message.

La fonction calculer() ci-après reçoit des messages de type string, les convertit en double, leur ajoute 0.5 et renvoie le résultat par message :

 
Sélectionnez
void calculer(){
        while(true){
                auto message = receiveOnly!string();
                ownerTid.send(to!double(message) + 0.5);
        }
}

La fonction to!double() peut lever une exception si la chaîne de caractères n'est pas convertible en valeur double. Comme cette exception entraînerait la fin de l'exécution de la fonction fille, le père dans le programme suivant ne pourrait recevoir qu'un seul message :

 
Sélectionnez
import std.stdio;
import std.concurrency;
import std.conv;
 
// ...
 
void main() {
    Tid calculator = spawn(&calculer);
 
    calculator.send("1.2");
    calculator.send("hello");  // ← Entrée incorrecte
    calculator.send("3.4");
 
    foreach (i; 0 .. 3) {
        auto message = receiveOnly!double();
        writefln("résultat %s: %s", i, message);
    }
}

Le père reçoit la réponse 1.7 pour « 1.2 », mais parce que la fonction fille s'est terminée, le père sera bloqué à attendre un message qui n'arrivera jamais.

 
Sélectionnez
result 0: 1.7
                 ← waiting for a message that will never arrive

Ce que peut faire la fonction fille, c'est capter l'exception et l'envoyer dans un message d'erreur spécial. La programme suivant envoie la raison de l'échec dans une structure EchecCalcul dans un message. De plus, ce programme utilise un type Exit pour signaler quand la fonction fille doit se terminer :

 
Sélectionnez
import std.stdio;
import std.concurrency;
import std.conv;
 
struct EchecCalcul {
    string reason;
}
 
struct Exit {
}
 
void calculate() {
    bool termine = false;
 
    while (!termine) {
        receive(
            (string message) {
                try {
                    ownerTid.send(to!double(message) + 0.5);
 
                } catch (Exception exc) {
                    ownerTid.send(EchecCalcul(exc.msg));
                }
            },
 
            (Exit message) {
                termine = true;
            });
    }
}
 
void main() {
    Tid calculator = spawn(&calculate);
 
    calculator.send("1.2");
    calculator.send("hello");  // ← entrée incorrecte
    calculator.send("3.4");
    calculator.send(Exit());
 
    foreach (i; 0 .. 3) {
        writef("resultat %s: ", i);
 
        receive(
            (double message) {
                writeln(message);
            },
 
            (EchecCalcul message) {
                writefln("ERREUR! '%s'", message.reason);
            });
    }
}

Cette fois-ci, la raison de l'échec est affichée par le père :

 
Sélectionnez
resultat 0: 1.7
resultat 1: ERREUR! 'no digits seen'
resultat 2: 3.9

Un autre moyen de faire serait d'envoyer l'exception en elle-même au thread père. La père peut alors utiliser l'exception ou la lever à nouveau :

 
Sélectionnez
// ... Dans le fils ...
try{
        //...
 
} catch (share(Exception) exc){
        ownerTid.send(exc);
}},
 
// ... Dans le père ...
receive(
        //...
        (shared(Exception) exc){
                throw exc;
        }
);

59-9. Détecter la fin de l'exécution d'un thread

Les threads peuvent détecter la fin de l'exécution d'un receveur de messages.

59-9-1. L'exception OwnerTerminated

Cette exception est levée à la réception d'un message du père, si le père a terminé son exécution. Le thread père suivant se contente d'envoyer deux messages à son fils et se termine. Cela lance une OwnerTerminated dans le thread fils :

 
Sélectionnez
import std.stdio;
import std.concurrency;
 
void main() {
    spawn(&fonctionIntermediaire);
}
 
void fonctionIntermediaire() {
    auto fils = spawn(&fonctionFils);
    fils.send(1);
    fils.send(2);
}  // ← Se termine après avoir envoyé deux messages
 
void fonctionFils() {
    while (true) {
        auto m = receiveOnly!int(); // ← Une exception est levée
                                    //  si le père est terminé.
        writeln("Message: ", m);
    }
}

La sortie :

 
Sélectionnez
Message: 1
Message: 2
std.concurrency.OwnerTerminated@std/concurrency.d(248):
Owner terminated

Le fils peut attraper l'exception et finir son exécution proprement :

 
Sélectionnez
void fonctionFille(){
        bool termine = false;
 
        while(!termine){
                try{
                        auto m = receiveOnly!int();
                        writeln("Message: ", m);
                } catch(OwnerTerminated exc){
                        writeln("La fonction mère est terminée.");
                        termine = true;
                }
        }
}

La sortie :

 
Sélectionnez
Message: 1
Message: 2
La fonction mère est terminée.

Nous allons voir dans la prochaine partie que cette exception peut aussi être reçue en tant que message.

59-9-2. L'exception LinkTerminated

spawnLinked() est utilisée de la même façon que spawn(). Quand un thread fils lancé par spawnLinked() se termine, le thread père est informé par le lancement d'une exception LinkTerminated :

 
Sélectionnez
import std.stdio;
import std.concurrency;
 
void main() {
    auto fils = spawnLinked(&fonctionFille);
 
    while (true) {
        auto m = receiveOnly!int(); // ← Une exception est
                                    //   levée si le fils
                                    //   est terminé.
        writeln("Message: ", m);
    }
}
 
void fonctionFille() {
    ownerTid.send(10);
    ownerTid.send(20);
}  // ← Se termine après avoir envoyé deux valeurs.

Le fils ci-avant se termine après avoir envoyé deux messages. Comme il a été démarré par spawnLinked(), le père est notifié de la fin de l'exécution du fils par une exception LinkTerminated.

 
Sélectionnez
Message: 10
Message: 20
std.concurrency.LinkTerminated@std/concurrency.d(263):
Link terminated

Le père peut attraper cette exception et faire quelque chose pour terminer son exécution proprement :

 
Sélectionnez
bool termine = false;
 
while (!termine) {
    try {
        auto m = receiveOnly!int();
        writeln("Message: ", m);
 
    } catch (LinkTerminated exc) {
        writeln("Le fils s'est terminé");
        termine = true;
    }
}

59-9-3. Recevoir les exceptions en tant que messages

Les exceptions OwnerTerminated et LinkTerminated peuvent également être reçues comme des messages. Le code suivant le montre pour une exception OwnerTerminated :

 
Sélectionnez
bool termine = false;
 
while (!termine) {
    receive(
        (int message) {
            writeln("Message: ", message);
        },
 
        (OwnerTerminated exc) {
            writeln("Le fils s'est terminé, on quitte le programme.");
            termine = true;
        }
    );
}

59-10. Gestion de boîte aux lettres

Chaque thread a une boîte aux lettres dédiée qui contient tous les messages envoyés à ce thread. Le nombre de messages dans une boîte aux lettres peut croître ou décroître en fonction du temps que met le thread à recevoir les réponses et à y répondre. Une boîte aux lettres qui grossit continuellement sature le système et résulte probablement d'un défaut dans la conception du programme. Cela peut également signifier que le thread ne traitera jamais les messages les plus récents.

setMaxMailboxSite() est une fonction utilisée pour limiter le nombre de messages qu'une boîte aux lettres peut contenir. Ses trois paramètres précisent la boîte aux lettres, le nombre maximum de messages qu'elle peut contenir, et ce qui doit être fait quand la boîte aux lettres est pleine, dans cet ordre. Il y a quatre options pour le dernier paramètre :

  • OnCrowding.block: l'expéditeur est mis en pause jusqu'à ce qu'il y ait de la place dans la boîte aux lettres ;
  • OnCrowding.ignore: le message est ignoré ;
  • OnCrowding.throwException: une exception MailboxFull est levée à l'expéditeur du message ;
  • une fonction de type function(Tid): la fonction spécifiée est appelée.

Avant d'examiner un exemple de setMaxMailboxSize(), parlons de ce qui peut amener une boîte aux lettres à grossir continuellement. Dans le programme suivant, le fils envoie des messages les uns à la suite des autres, mais le père passe un peu de temps à recevoir chaque message :

 
Sélectionnez
/* ATTENTION: votre système peut avoir
* un comportement imprédictible quand
* ce programme est exécuté.*/
import std.concurrency;
import core.thread;
 
void fonctionFille() {
    while (true) {
        ownerTid.send(42);    // ← Produit continuellement des messages
    }
}
 
void main() {
    spawn(&fonctionFille);
 
    while (true) {
        receive(
            (int message) {
                //Prend du temps à chaque message
                Thread.sleep(1.seconds);
            });
    }
}

Parce que le destinataire est plus lent que l'expéditeur, la mémoire que le programme ci-avant utilise va grandir sans s'arrêter. Pour se prémunir de ce phénomène, le père peut limiter la taille de sa boîte aux lettres avant de démarrer la fonction fille :

 
Sélectionnez
void main() {
    setMaxMailboxSize(thisTid, 1000, OnCrowding.block);
 
    spawn(&fonctionFille);
// ...
}

L'appel à setMaxMailboxSize() définit la taille de la boîte aux lettres du thread principal à 1000. OnCrowding.block fait attendre l'expéditeur jusqu'à ce qu'il y ait de la place dans la boîte aux lettres.

L'exemple suivant utilise onCrowding.throwException, qui lance une exception MailboxFull à l'expéditeur quand la boîte aux lettres est pleine :

 
Sélectionnez
import std.concurrency;
import core.thread;
 
void fonctionFille() {
    while (true) {
        try {
            ownerTid.send(42);
 
        } catch (MailboxFull exc) {
            /* Échec de l'envoi, on réessaye dans une seconde. */
            Thread.sleep(1.msecs);
        }
    }
}
 
void main() {
    setMaxMailboxSize(thisTid, 1000, OnCrowding.throwException);
 
    spawn(&fonctionFille);
 
    while (true) {
        receive(
            (int message) {
                Thread.sleep(1.seconds);
            });
    }
}

59-11. Messages prioritaires

Certains messages peuvent être prioritaires par rapport aux messages ordinaires grâce à la fonction prioritySend(). Ces messages sont traités avant les autres dans la boîte aux lettres :

 
Sélectionnez
prioritySend(pereTid, ImportantMessage(100));

Si le destinataire n'a pas de fonction qui gère le type du message prioritaire, une exception PriorityMessageException est levée :

 
Sélectionnez
std.concurrency.PriorityMessageException@std/concurrency.d(280):
Priority message

59-12. Threads nommés

Dans les programmes simples que nous avons vus avant, il était facile d'utiliser les id des pères et des fils. Transmettre les id de threads en threads peut être extrêmement compliqué dans des programmes qui utilisent plus que quelques threads. Pour rendre tout cela moins complexe, il est possible d'assigner des noms aux threads qui sont accessibles depuis n'importe quel thread.

Les trois fonctions suivantes sont une interface pour accéder à un tableau associatif auquel chaque thread a accès :

  • register() : associe un thread à un nom ;
  • locate() : retourne le thread associé à un nom. S'il n'y a pas de thread associé au nom, alors Tid.init est retourné ;
  • unregister() : dissocie l'association entre un thread et un nom.

La programme suivant démarre deux threads qui communiquent entre eux par leur nom. Ces threads s'envoient des messages l'un à l'autre jusqu'à ce qu'ils reçoivent un message Exit :

 
Sélectionnez
import std.stdio;
import std.concurrency;
import core.thread;
 
struct Exit{}{
}
 
void main(){
        // Un thread dont le partenaire s'appelle "second"
        auto premier = spawn(&player, "second");
        register("premier", premier);
        scope(exit) unregister("premier");
 
        // Un thread dont le partenaire s'appelle "premier"
        auto second = spawn(&player, "first");
        register("second", second);
        scope(exit) unregister("second");
 
        Thread.sleep(2.seconds);
 
        prioritySend(first, Exit());
        prioritySend(second, Exit());
 
        // Pour que les appels à unregister() soient effectués, main()
        // doit attendre que les fils aient terminé leur exécution.
        thread_joinAll();
}
 
void player(string nomDuPartenaire){
        Tid partenaire;
 
        while(partenaire == Tid.init){
                Thread.sleep(1.msecs);
                partnaire = locate(nomDuPartenaire);
        }
 
        bool termine = false;
 
        while(!termine){
                partner.send("hello " ~ nomDuPartenaire);
                receive(
                                (string message){
                                        writeln("Message: ", message);
                                        Thread.sleep(500.msecs);
                                },
                                (Exit message){
                                        writefln("%s, Je me termine", nomDuPartenaire);
                                        termine = true;
                                }
                           );
        }
}

L'appel à thread_joinAll() que l'on peut voir à la fin du main fait attendre le père jusqu'à ce que les fils aient terminé.

La sortie :

 
Sélectionnez
Message: hello second
Message: hello first
Message: hello second
Message: hello first
Message: hello first
Message: hello second
Message: hello first
Message: hello second
second, Je me termine.
first, Je me termine.

59-13. Résumé

  • Quand les threads ne doivent pas communiquer entre eux, préférez le parallélisme, qui a été vu dans le chapitre précédent. N'utilisez la concurrence que quand les threads doivent interagir.
  • La concurrence par partage de mémoire étant difficile à implémenter correctement, préférez la concurrence par messages, qui a été couverte dans ce chapitre.
  • spawn() et spawnLinked() démarrent des threads.
  • thisTid est l'id du thread courant.
  • ownerTid est l'id du thread père du thread courant.
  • send() et prioritySend() envoient des messages.
  • receiveOnly(), receive() et receiveTimeout() attendent des messages.
  • Variant correspond à n'importe quel type de message.
  • setMaxMailboxSize() limite la taille de la boîte aux lettres.
  • register(), unregister() et locate() permettent d'identifier les threads par des noms.
  • Des exceptions peuvent être levées pendant le passage de messages : MessageMismatch, OwnerTerminated, LinkTerminated, MailboxFull et PriorityMessageException.
  • Le père peut attraper automatiquement les exceptions levées par un fils.

précédentsommairesuivant