programmez-en-d/concurrence.whata

1115 lines
36 KiB
Plaintext

[set
title = "Concurrence par messages"
partAs = chapitre
translator = "Quentin Ladeveze"
]
La concurrence est à la fois similaire et différente du sujet que nous avons abordé dans le dernier chapitre, le parallélisme. Comme ces deux concepts impliquent tout les deux d'éxecuter un programme sur des [* threads], et que le parallélisme est basé sur la concurrence, ils sont souvent confondus.
Voici les différences entre le parallélisme et la concurrence~ :
- L'objectif principal du parallélisme est de profiter des multiples coeurs d'un processeur pour améliorer les performances d'un programme. La concurrence d'un autre côté, est un concept qui peut être utile même dans un environnement mono-coeur. La concurrence, c'est faire s'éxecuter un programme sur plus d'un [* thread] à la fois.
- En parallélisme, les tâches sont indépendantes les unes des autres, ce serait en fait un bug si une tâche dépendait d'autres tâches qui s'éxecutent au même moment. Dans le cas de la concurrence, il est tout à fait normal qu'un [* thread] dépende des résultats d'autres [* threads]
- Même si les deux modèles utilisent les [* threads] du système, les [* threads] du parallélisme sont encapsulées dans le concept de tâche. La concurrence utilise les [* threads] explicitement.
- Le parallélisme est facile à utiliser, et tant que les tâches sont indépendantes, il est facile de créer des programmes qui fonctionnent correctement. La concurrence est facile à mettre en place uniquement quand elle est basée sur les [* messages]. Il est très difficile d'écrire des programmes concurrents si ils sont basés sur le modèle traditionel de la concurrence, qui implique le partage de données verrouillées.
D permet d'utiliser les deux modèles de concurrence~ : les messages et le partage de données. Nous allons parler des messages dans ce chapitre et du partage de données dans le prochain chapitre.
[ = Concepts
- [** [* Thread]]~ : Les systèmes d'exploitation considèrent les programmes comme des unités qu'ils appellent [* threads]. L'éxecution d'un programme D commence par la fonction [** main] dans le [* thread] qui a été assigné au programme par le système d'exploitation. Toutes les opérations du programme sont normalement executées dans ce [* thread]. Le programme est libre de démarrer d'autres [* threads] pour pouvoir exécuter plusieurs tâches simultanément. En fait, les tâches ont été vues dans le chapitre précédent, et sont basées sur des [* threads] démarés automatiquement par [c std.parallelism].
Le système d'exploitation peut mettre en pause l'exécution des [* threads] sans qu'on puisse prédire quand ni pour combien de temps. Même les opérations aussi simples que incrémenter une variable peuvent être interrompues pendant leur éxecution:
[code=d <<< i++; >>>]
L'opération ci-avant implique trois étapes: Lire la valeur de la variable, l'incrémenter, et assigner la nouvelle valeur à la variable. Le [* thread] peut être mis en pause entre ces étapes et continuer sans qu'on sache quand.
- [** Message]~ : Les données qui sont transmises entre des [* threads] sont appelés des messages. Les mesages peuvent être composés d'une ou de plusieurs variables, de n'importe quel type.
- [** Identifiant de [* thread]]~ : Chaque [* thread] a un identifiant, qui est utilisé pour spécifiant le destinataire d'un message.
- [** Père]~ : N'importe quel [* thread] qui démarre un autre [* thread] est le père de ce [* thread].
- [** Fils]~ : N'importe quel [* thread] qui est démarré par un propriétaire est appellé un fils.
]
[ = Démarrer des [* threads]
[c spawn()] prend un pointeur vers une fonction en paramètre et démarrer un nouveau [* thread] dans lequel va s'éxecuter cette fonction. Toutes les opérations qui sont éxecutées au sein de cette fonction, y compris des appels à d'autres fonctions, seront exécutées dans ce nouveau [* thread]. La différence principale entre un [* thread] démarré avec [c spawn()] et un [* thread] démarré avec [c task()] est que [c spawn()] rend possible le passage de messages entre les [* threads].
Dès qu'un nouveau [* thread] est lancé, le père et le fils s'éxecutent séparément comme si ils étaient des programmes indépendants:
[code=d <<<
import std.stdio;
import std.concurrency;
import core.thread;
void fils() {
foreach (i; 0 .. 5) {
Thread.sleep(500.msecs);
writeln(i, " (fils)");
}
}
void main() {
spawn(&fils);
foreach (i; 0 .. 5) {
Thread.sleep(300.msecs);
writeln(i, " (main)");
}
writeln("main est terminé.");
}
>>>]
Les exemples dans ce chapitre font des appels à la fonction [c Thread.sleep] pour ralentir les [* threads] et montrer qu'ils s'éxecutent simultanément. La sortie du programme montre que les deux [* threads], celui dans lequel s'éxecute [c main()] et l'autre qui a été démarré par [c spawn()], s'exécutent indépendament en même temps:
[output <<<
0 (main)
0 (fils)
1 (main)
2 (main)
1 (fils)
3 (main)
2 (fils)
4 (main)
main est terminé.
3 (fils)
4 (fils)
>>>]
Le programme attend que tout les [* threads] aient fini de s'éxecuter. On peut le voir dans le sortie ci-avant, en observant que [c fils()] continue son éxecution, même après que main se soit terminé après avoir affiché "main est terminé."
Les paramètres que la fonction démarrée dans un thread prend sont passés à [c spawn()] comme arguments. Les deux [* threads] fils dans le programme suivant affichent quatre nombres chacun. Ils prennent le nombre de départ en paramètre:
[code=d <<<
import std.stdio;
import std.concurrency;
import core.thread;
void fils(int nbInitial) {
foreach (i; 0 .. 4) {
Thread.sleep(500.msecs);
writeln(nbInitial + i);
}
}
void main() {
foreach (i; 1 .. 3) {
spawn(&fils, i * 10);
}
}
>>>]
La sortie d'un de ces threads est précedée d'une flèche ([** >]):
[output <<<
10
> 20
11
> 21
12
> 22
13
> 23
>>>]
La sortie du programme peut être différente entre deux éxecutions en fonction de comment le système d'exploitation a mis en pause et remis en route les [* threads].
Chaque système d'exploitation a une limite de [* threads] qui peuvent s'éxecuter simultanément. Cette limite peut être reglée par utilisateur, pour tout le système ou selon un autre critère. Les performances globales d'un système peuvent être alterées si il y a plus de [* threads] qui sont occupés que de coeurs dans le processeur. Quand un [* thread] est occupé à réaliser des opérations, on dit qu'il est [** attaché au processeur]. De nombreux [* threads] passent beaucoup de temps sans réaliser d'opérations, à attendre qu'un évenement se produise comme une entrée de l'utilisateur, l'arrivée de données sur une connection réseau, la fin d'un appel à [c Thread.sleep], etc. Ces [* threads] sont dits [** attachés à l'E/S] ([* E/S~ : Entrée/Sortie]). Quand ce genre de [* thread] s'éxecute, un programme peut se permettre de démarrer plus de [* threads] que de coeurs sans dégrader ses performances. Comme dans chaque décision qui concerne les performances d'un programme, il faut réaliser des mesures pour être certains des ressources que consomme effectivement chaque [* thread].
]
[ = Identifiants de [* threads]
[c thisTid()] retourne l'identifiant du [* thread] courrant. On l'utilise communément sans paranthèses:
[code=d <<<
import std.stdio;
import std.concurrency;
void printTid(string tag) {
writefln("%s: %s", tag, thisTid);
}
void fils() {
printTid("Fils");
}
void main() {
spawn(&fils);
printTid("Père");
}
>>>]
La type du retour de [c thisTid()] et [c Tid], ce qui n'a pas beaucoup de sens dans ce programme, car [c Tid] ne redéfinie pas sa fonction [c toString()]:
[output <<<
Père : Tid(std.concurrency.MessageBox)
Fils : Tid(std.concurrency.MessageBox)
>>>]
La valeur de retour de [c spawn()], que nous avions ignoré jusqu'à maintenant, est l'identifiant du [* thread] fils:
[code=d <<< Tid monFils = spawn(&fils); >>>]
Inversement, l'id du [* thread] père peut être obtenu grâce à la fonction [c ownerTid()].
En résumé, le père est identifié par [c ownerTid()] et le fils par la valeur retournée par [c spawn()].
]
[ = Le passage de messages
[c send()] envoie des messages et [c receiveOnly()] attend un message d'un type particulier. (Il y a aussi [c prioritySend()], [c receive()] et [c receiveTimeout()], que l'on expliquera plus tard dans ce chapitre.)
La père dans le programme ci-après envoie à son fils un message de type [c int] et attend en retour un message de type [c double]. Les [* threads] continuent de s'envoyer des messages jusqu'à ce que le père reçoive un [c int] négatif. Voici le [* thread] père:
[code=d <<<
void main() {
Tid fils = spawn(&fonctionFille);
foreach (valeur; 1 .. 5) {
fils.send(value);
double resultat = receiveOnly!double();
writefln("envoyé: %s, reçu: %s", valeur, resultat);
}
/* Envoie d'une valeur négative au fils pour que le programme
* s'arrête */
fils.send(-1);
}
>>>]
[c main()] stocke la valeur retournée par [c spawn()] dans la variable [c fils] et utilise cette valeur quand elle envoie des messages au fils.
De l'autre côté, le fils reçoit un [c int], utilise cette valeur dans un calcul, et envoie le résultat de type [c double] à son père:
[code=d <<<
void fonctionFille(){
int valeur = 0;
while(valeur >= 0){
valeur = receiveOnly!int();
double resultat = to!double(valeur) / 5;
ownerTid.send(resultat);
}
}
>>>]
Le [* thread] principale affiche les messages qu'il envoie, et ceux qu'il reçoit:
[output <<<
envoyé : 1, reçu : 0.2
envoyé : 2, reçu : 0.4
envoyé : 3, reçu : 0.6
envoyé : 4, reçu : 0.8
>>>]
Il est possible d'envoyer plus qu'une valeur dans un même message. Le message suivant est fait de trois parties:
[code=d <<< ownerTid.send(thisTid, 42, 1.5); >>>]
Les valeurs passées ensemble dans un message forment un tuple pour celui qui reçoit le message. Dans ces cas, les paramètres du template de [c receiveOnly()] doivent correspondre aux membres du tuple:
[code=d <<<
auto message = receiveOnly!(Tid, int, double)();
auto sender = message[0];
auto integer = message[1];
auto floating = message[2];
>>>]
Si les types ne correspondent pas, une exception [c MessageMismatch] est levée:
[code=d <<<
import std.concurrency;
void fonctionFils() {
ownerTid.send("bonjour"); // ← Sending string
}
void main() {
spawn(&fonctionFils);
auto message = receiveOnly!double(); // ← Expecting double
}
>>>]
La sortie:
[output <<<
std.concurrency.MessageMismatch@std/concurrency.d(235):
Unexpected message type: expected 'double', got 'immutable(char)[]'
>>>]
Les exceptions que le fils peut lever ne sont pas attrapées par le père. Une solution possible est de faire envoyer l'exception en tant que message par le fils. Nous verrons celà plus tard dans le chapitre.
]
[ = Exemple
Utilisons ce que nous avons vu jusqu'à présent dans un programme de simulation.
Le programme va simuler des robots indépendants qui bougent au hasard dans un espace en deux dimensions. Le mouvement de chaque robot est géré par un [* thread] qui prend trois informations quand il est démarré:
- Le numéro (id) du robot: cette information est renvoyée au père pour identifier le robot qui envoie un message.
- L'origine: ce sont les coordonnées de départ du robot.
- La durée entre chaque étape: cette information est utilisée pour déterminer quand est-ce que le robot bougera.
Ces informations peuvent être stockées dans cette structure [c Job]:
[code=d <<<
struct Job {
size_t robotId;
Position origine;
Duration reposDuree;
}
>>>]
La fonction qui fait bouger les robots dans un [* thread] envoie l'id du robot et ses mouvements au [* thread] père continuellement:
[code=d <<<
void bougeurRobot(Job job){
Position depuis = job.origin;
while(true){
Thread.sleep(job.reposDuree);
Position to = voisinAuHasard(depuis);
Movement mouvement = Movement(depuis, vers);
depuis = vers;
ownerTid.send(MovementMessage(job.robotId, mouvement));
}
}
>>>]
Le père ne fait qu'attendre ces messages dans une boucle infinie. Il identifie les robots par leurs ids qui sont spécifiés dans les messages. Le père ne fait qu'afficher les différents mouvement:
[code=d <<<
while(true){
auto message = receiveOnly!MovementMessage();
writefln("%s %s", robots[message.robotId], message.mouvement);
}
>>>]
Tout les messages dans ce programme vont du fils au père. Le passage de messages implique des schémas de communication bien plus complexes dans des vrais programmes.
Voici le programme complet:
[code=d <<<
import std.stdio;
import std.random;
import std.string;
import std.concurrency;
import core.thread;
struct Position {
int ligne;
int colonne;
string toString() {
return format("%s,%s", ligne, colonne);
}
}
struct Movement {
Position depuis;
Position vers;
string toString() {
return ((depuis == vers)
? format("%s (idle)", depuis)
: format("%s -> %s", depuis, vers));
}
}
class Robot {
string image;
Duration reposDuree;
this(string image, Duration reposDuree) {
this.image = image;
this.reposDuree = reposDuree;
}
override string toString() {
return format("%s(%s)", image, reposDuree);
}
}
/* Retourne une position au hasard */
Position positionHasard() {
return Position(uniform!"[]"(-10, 10),
uniform!"[]"(-10, 10));
}
/* Retourne une coordonnée à une distance d'au maximum un de la coordonnée courant */
int pasAuHasard(int courant) {
return courant + uniform!"[]"(-1, 1);
}
/* Retourne un voisin de la position passée en paramètre.
* Cela peut être un des voisins dans les huit directions,
* ou la position elle-même. */
Position voisinAuHasard(Position position) {
return Position(pasAuHasard(position.line),
pasAuHasard(position.column));
}
struct Job {
size_t robotId;
Position origine;
Duration reposDuree;
}
struct MovementMessage {
size_t robotId;
Movement mouvement;
}
void bougeurRobot(Job job) {
Position from = job.origine;
while (true) {
Thread.sleep(job.reposDuree);
Position to = voisinAuHasard(from);
Movement mouvement = Movement(from, to);
from = to;
ownerTid.send(MovementMessage(job.robotId, mouvement));
}
}
void main() {
/* Des robots avec des durées de repos différentes. */
Robot[] robots = [ new Robot("A", 600.msecs),
new Robot("B", 2000.msecs),
new Robot("C", 5000.msecs) ];
/* Démarre un bougeurRobot pour chaque robot. */
foreach (robotId, robot; robots) {
spawn(&bougeurRobot, Job(robotId,
positionHasard(),
robot.reposDuree));
}
/* Affiche les infos de chaque mouvement des robots. */
while (true) {
auto message = receiveOnly!MovementMessage();
/* Affiche le mouvement dans ce robot. */
writefln("%s %s",
robots[message.robotId], message.movement);
}
}
>>>]
Le programme affiche chaque mouvement jusqu'à ce que ce qu'il soit coupé:
[output <<<
A(600 ms) 6,2 -> 7,3
A(600 ms) 7,3 -> 8,3
A(600 ms) 8,3 -> 7,3
B(2 secs) -7,-4 -> -6,-3
A(600 ms) 7,3 -> 6,2
A(600 ms) 6,2 -> 7,1
A(600 ms) 7,1 (idle)
B(2 secs) -6,-3 (idle)
A(600 ms) 7,1 -> 7,2
A(600 ms) 7,2 -> 7,3
C(5 secs) -4,-4 -> -3,-5
A(600 ms) 7,3 -> 6,4
...
>>>]
Ce programme montre comment la concurrence par passage de messages peut être utile: Les mouvements des robots sont calculés indépendament par des [* threads] séparés qui ne savent rien les uns des autres. C'est le père qui traite le processus d'affichage en [* série] en reçevant les messages un par un.
]
[ = Attendre différents types de messages
[c receiveOnly()] peut attendre un type de message. [c receive()] par contre peut attendre plus d'un type de message. Cette fonction peut trier les messages et les distribuer à des fonctions déléguées. Quand un message arrive, son type est comparé à celui qu'accepte chaque fonction déléguée. La déléguée qui correspond au type prend alors le message en charge.
Par exemple, l'appel à la fonction [c receive()] suivant spécifie deux fonctions qui prennent en charge les messages de type [c string] et [c int] respectivement:
[code=d <<<
void fonctionFils(){
bool termine = false;
while(!termine){
void intHandler(int message){
writeln("prise en charge d'un int : ", message);
if(message == -1){
writeln("sortie");
termine = true;
}
}
void stringHandler(string message){
writeln("prise en charge d'un string : ", message);
}
receive(&intHandler, &stringHandler);
}
}
>>>]
Les messages de type [c int] correspondront à [c intHandler()] et les messages de type [c string] correspondront à [c stringHandler()]. Le [* thread] fils ci-avant peut être testé dans le programme suivant:
[code=d <<<
import std.stdio;
import std.concurrency;
//...
void main(){
auto fils = spawn(&fonctionFils);
fils.send(10);
fils.send(42);
fils.send("hello");
fils.send(-1);
}
>>>]
La sortie du programme montre que les messages sont bien répartis aux fonctions correspondantes à la réception:
[output <<<
prise en charge d'un int : 10
prise en charge d'un int : 42
prise en charge d'un string : hello
prise en charge d'un int : -1
sortie
>>>]
Les lambdas-fonctions et les objets qui implémentent la fonction membre [c opCall()] peuvent aussi être passés en paramètre à [c receive()] en tant que fonctions qui prennent en charge des messages. Le programme suivant gère les messages en les passant à des lambda-fonctions. Il définie également un type [c Exit] utilisé dans la communication avec le [* thread] pour préciser si oui ou non il faut terminer le programme. Utiliser un type explicité pour cela est plus clair que d'envoyer une valeur arbitraire comme -1, tel que nous l'avons fait dans l'exemple précedent.
Il y a trois fonctions anonymes ci-après qui sont passées à [c receive()] comme fonctions qui prennent en charge des messages:
[code=d <<<
import std.stdio;
import std.concurrency;
struct Exit(){
}
void fonctionFils(){
bool termine = false;
while(!termine){
receive(
(int message) {
writeln("message int : ", message);
},
(string message) {
writeln("message string : ", message);
},
(Exit message) {
writeln("exiting");
termine = true;
}
);
}
}
void main(){
auto fils = spawn(&fonctionFils);
fils.send(10);
fils.send(42);
fils.send("coucou");
fils.send(Exit());
}
>>>]
]
[ == Recevoir n'importe quel type de message
[c std.variant.Variant] est un type qui peut encapsuler n'importe quel type de données. Les messages qui ne correspondent à aucune fonction spécifiées avant dans la liste des arguments de [c receive()] sont toujours passées à la fonction qui prend un [c Variant] en paramètre:
[code=d <<<
import std.stdio;
import std.concurrency;
void fonctionFils(){
receive(
(int message){
/* ... */
},
(string message){
/* ... */
},
(Variant message){
writeln("Message innatendu : ", message);
}
);
}
struct SpecialMessage{
// ...
}
void main(){
auto fils = spawn(&fonctionFils);
fils.send(SpecialMessage);
}
>>>]
La sortie:
[output <<<
Message innatendu : SpecialMessage()
>>>]
Les détails concernant [c Variant] sortent du cadre de ce chapitre.
]
[ = Attendre des messages pendant un certain temps
Il se peut qu'on veuille attendre un message uniquement dans un certain délai de temps. Il se peut par exemple que l'expéditeur ai fini son exécution. [c receiveTimeout()] empêche le [* thread] recevant d'attendre indéfiniment. La fonction renvoie [c true] si un message a été reçu et [c false] dans le cas contraire.
[code=d <<<
import std.stdio;
import std.concurrency;
import core.thread;
void fonctionFils() {
Thread.sleep(3.seconds);
ownerTid.send("bonjour");
}
void main() {
spawn(&fonctionFils);
writeln("En attente d'un message");
bool recu = false;
while (!recu) {
recu = receiveTimeout(600.msecs,
(string message) {
writeln("recu: ", message);
});
if (!recu) {
writeln("... Pas de message pour l'instant");
/* ... D'autres opérations peuvent être effectuées ici ...*/
}
}
}
>>>]
Le père ci-avant attend pour un message pendant 600 millisecondes. Il peut continuer à exécuter d'autre opération si aucun message ne s'éxecute pendant ce temps.
[output <<<
En attente d'un message
... Pas de message pour l'instant
... Pas de message pour l'instant
... Pas de message pour l'instant
recu: bonjour
>>>]
]
[ = Exceptions pendant l'exécution du [* thread] fils
Comme nous l'avons vu dans le chapitre précedent, quand on utilise [c std.parallelism], une exception levée dans une tâche peut être automatiquement attrapée par le [* thread] père. Cela permet de traiter ces exceptions:
[code=d <<<
try{
theTask.yieldForce();
} catch (Exception e) {
writefln("Erreur détectée dans la tâche : '%s'", exec.msg);
}
>>>]
[c std.concurrency] ne permet pas de base de traiter les exceptions de cette façon. Cepe dant, les exceptions peuvent être attrapées et envoyées explicitement par le [* thread] fils. Comme nous le verrons ci-après, il est aussi possible de reçevoir des exceptions [c OwnerTerminated] et [c LinkTerminated] en tant que message.
La fonction [c calculer()] ci-après reçoit des messages de type [c string], les converti en [c double], leur ajoute 0.5 et renvoie le résultat par message:
[code=d <<<
void calculer(){
while(true){
auto message = receiveOnly!string();
ownerTid.send(to!double(message) + 0.5);
}
}
>>>]
La fonction [c to!double()] peut lever une exception si la chaîne de caractère n'est pas convertible en valeur [c double]. Comme cette exception entrainerait la fin de l'éxecution de la fonction fille, le père dans le programme suivant ne pourrait recevoir qu'un seul message:
[code=d <<<
import std.stdio;
import std.concurrency;
import std.conv;
// ...
void main() {
Tid calculator = spawn(&calculer);
calculator.send("1.2");
calculator.send("hello"); // ← Entrée incorrecte
calculator.send("3.4");
foreach (i; 0 .. 3) {
auto message = receiveOnly!double();
writefln("résultat %s: %s", i, message);
}
}
>>>]
Le père reçoit la réponse 1.7 pour "1.2", mais parce que la fonction fille s'est terminée, le père sera bloqué à attendre un message qui n'arrivera jamais.
[output <<<
result 0: 1.7
← waiting for a message that will never arrive
>>>]
Ce que peut faire la fonction fille, c'est capter l'exception et l'envoyer dans un message d'erreur spécial. La programme suivant envoie la raison de l'échec dans une structure [c EchecCalcul] dans un message. De plus, ce programme utilise un type [c Exit] pour signaler quand la fonction fille doit se terminer:
[code=d <<<
import std.stdio;
import std.concurrency;
import std.conv;
struct EchecCalcul {
string reason;
}
struct Exit {
}
void calculate() {
bool termine = false;
while (!termine) {
receive(
(string message) {
try {
ownerTid.send(to!double(message) + 0.5);
} catch (Exception exc) {
ownerTid.send(EchecCalcul(exc.msg));
}
},
(Exit message) {
termine = true;
});
}
}
void main() {
Tid calculator = spawn(&calculate);
calculator.send("1.2");
calculator.send("hello"); // ← entrée incorrecte
calculator.send("3.4");
calculator.send(Exit());
foreach (i; 0 .. 3) {
writef("resultat %s: ", i);
receive(
(double message) {
writeln(message);
},
(EchecCalcul message) {
writefln("ERREUR! '%s'", message.reason);
});
}
}
>>>]
Cette fois-ci, la raison de l'échec est affichée par le père:
[output <<<
resultat 0: 1.7
resultat 1: ERREUR! 'no digits seen'
resultat 2: 3.9
>>>]
Un autre moyen de faire ça serait d'envoyer l'exception en elle-même au [* thread] père. La père peut alors utiliser l'exception ou la lever à nouveau:
[code=d <<<
// ... Dans le fils ...
try{
//...
} catch (share(Exception) exc){
ownerTid.send(exc);
}},
// ... Dans le père ...
receive(
//...
(shared(Exception) exc){
throw exc;
}
);
>>>]
]
[ = Détecter la fin de l'éxecution d'un [* thread]
Les [* threads] peuvent détecter la fin de l'exécution d'un receveur de message.
[ == L'exception [c OwnerTerminated]
Cette exception est levée à la réception d'un message du père, si le père a terminé son exécution. Le [* thread] père suivant se contente d'envoyer deux messages à son fils et se termine. Cela lance une [c OwnerTerminated] dans le [* thread] fils:
[code=d <<<
import std.stdio;
import std.concurrency;
void main() {
spawn(&fonctionIntermediaire);
}
void fonctionIntermediaire() {
auto fils = spawn(&fonctionFils);
fils.send(1);
fils.send(2);
} // ← Se termine après avoir envoyé deux messages
void fonctionFils() {
while (true) {
auto m = receiveOnly!int(); // ← Une exception est levée
// si le père est terminé.
writeln("Message: ", m);
}
}
>>>]
La sortie~ :
[output <<<
Message: 1
Message: 2
std.concurrency.OwnerTerminated@std/concurrency.d(248):
Owner terminated
>>>]
Le fils peut attraper l'exception est finir son exécution proprement:
[code=d <<<
void fonctionFille(){
bool termine = false;
while(!termine){
try{
auto m = receiveOnly!int();
writeln("Message: ", m);
} catch(OwnerTerminated exc){
writeln("La fonction mère est terminée.");
termine = true;
}
}
}
>>>]
La sortie:
[output <<<
Message: 1
Message: 2
La fonction mère est terminée.
>>>]
Nous allons voir dans la prochaine partie que cette exception peut aussi être reçu en tant que message.
]
[ == L'exception [c LinkTerminated]
[c spawnLinked()] est utilisé de la même façon que [c spawn()]. Quand un [* thread] fils lancé par [c spawnLinked()] se termine, le [* thread] père est informé par le lancement d'une exception [c LinkTerminated]:
[code=d <<<
import std.stdio;
import std.concurrency;
void main() {
auto fils = spawnLinked(&fonctionFille);
while (true) {
auto m = receiveOnly!int(); // ← Une exception est
// levée si le fils
// est terminé.
writeln("Message: ", m);
}
}
void fonctionFille() {
ownerTid.send(10);
ownerTid.send(20);
} // ← Se termine après avoir envoyé deux valeurs.
>>>]
Le fils ci-avant se termine après avoir envoyé deux messaes. Comme il a été démarré par [c spawnLinked()], le père est notifié de la fin de l'éxecution du fils par une exception [c LinkTerminated].
[output <<<
Message: 10
Message: 20
std.concurrency.LinkTerminated@std/concurrency.d(263):
Link terminated
>>>]
Le père peut attraper cette exception et faire quelque chose pour terminer son exécution proprement:
[code=d <<<
bool termine = false;
while (!termine) {
try {
auto m = receiveOnly!int();
writeln("Message: ", m);
} catch (LinkTerminated exc) {
writeln("Le fils s'est terminé");
termine = true;
}
}
>>>]
]
[ == Recevoir les exceptions en tant que messages
Les exceptions [c OwnerTerminated] et [c LinkTerminated] peuvent également être reçues comme des messages. Le code suivant le montre pour une exception [c OwnerTerminated]:
[code=d <<<
bool termine = false;
while (!termine) {
receive(
(int message) {
writeln("Message: ", message);
},
(OwnerTerminated exc) {
writeln("Le fils s'est terminé, on quitte le programme.");
termine = true;
}
);
}
>>>]
]
]
[ = Gestion de boîte aux lettres
Chaque [* thread] a une boîte aux lettres dédiée qui contient tout les messages envoyés à ce [* thread]. Le nombre de message dans une boîte aux lettres peut croître ou décroître en fonction du temps que met le [* thread] à recevoir les réponses et à y répondre. Une boîte aux lettres qui grossit continuellement sature le système et résulte probablement d'un défaut dans la conception du programme. Cela peut également signifier que le [* thread] ne traitera jamais les messages les plus récents.
[c setMaxMailboxSite()] est une fonction utilisée pour limiter le nombre de messages qu'une boîte aux lettres peut contenir. Ses trois paramètres précisent la boîte aux lettres, le nombre maximum de messages qu'elle peut contenir, et ce qui doit être fait quand la boîte aux lettres est pleine, dans cet ordre. Il y a quatre options pour le dernier paramètre:
- [c OnCrowding.block]: L'expéditeur est mis en pause jusqu'à ce qu'il y ai de la place dans la boîte aux lettres.
- [c OnCrowding.ignore]: Le message est ignoré.
- [c OnCrowding.throwException]: une exception [c MailboxFull] est levée à l'expéditeur du message.
- une fonction de type [bool function(Tid)]: la fonction spécifiée est appellée
Avant d'examiner un exemple de [c setMaxMailboxSize()], parlons de ce qui peut ammener une boîte aux lettres à grossir continuellement. Dans le programme suivant, le fils envoie des messages les uns à la suite des autres mais le père passe un peu de temps à recevoir chaque message:
[code=d <<<
/* ATTENTION: votre système peut avoir
* un comportement imprédictible quand
* ce programme est exécuté.*/
import std.concurrency;
import core.thread;
void fonctionFille() {
while (true) {
ownerTid.send(42); // ← Produit continuellement des messages
}
}
void main() {
spawn(&fonctionFille);
while (true) {
receive(
(int message) {
//Prend du temps à chaque message
Thread.sleep(1.seconds);
});
}
}
>>>]
Parce que le destinataire est plus lent que l'expéditeur, la mémoire que le programme ci-avant utilise va grandir sans s'arrêter. Pour se prévenir de ce phénomène, le père peut limite la taille de sa boîte aux lettres avant de démarrer la fonction fille:
[code=d <<<
void main() {
setMaxMailboxSize(thisTid, 1000, OnCrowding.block);
spawn(&fonctionFille);
// ...
}
>>>]
L'appel à [c setMaxMailboxSize()] défini la taille de la boîte aux lettres du [* thread] principal à 1000. [c OnCrowding.block] fait attendre l'expéditeur jusqu'à ce qu'il y ai de la place dans la boîte aux lettres.
L'exemple suivant utilise [c onCrowding.throwException], qui lance une exception [c MailboxFull] à l'expéditeur quand la boîte aux lettres est pleine:
[code=d <<<
import std.concurrency;
import core.thread;
void fonctionFille() {
while (true) {
try {
ownerTid.send(42);
} catch (MailboxFull exc) {
/* Echec de l'envoie, on réessaye dans une seconde. */
Thread.sleep(1.msecs);
}
}
}
void main() {
setMaxMailboxSize(thisTid, 1000, OnCrowding.throwException);
spawn(&fonctionFille);
while (true) {
receive(
(int message) {
Thread.sleep(1.seconds);
});
}
}
>>>]
]
[ = Messages prioritaires
Certains messages peuvent être prioritaires par rapport aux messages ordinaires grâce à la fonction [c prioritySend()]. Ces messages sont traités avant les autres dans la boîte aux lettres:
[code=d <<< prioritySend(pereTid, ImportantMessage(100)); >>>]
Si le destinataire n'a pas de fonction qui gère le type du message prioritaire, une excepton [c PriorityMessageException] est levée~ :
[output <<<
std.concurrency.PriorityMessageException@std/concurrency.d(280):
Priority message
>>>]
]
[ = [* Threads] nommés
Dans les programmes simples que nous avons vu avant, il était facile d'utiliser les ids des pères et des fils. Transmettre les ids de [* threads] en [* threads] peut être extrêmement compliqué dans des programmes qui utilisent plus que quelques [* threads]. Pour rendre tout cela moins complexe, il est possible d'assigner des noms aux [* threads] qui sont accessibles depuis n'importe quel [* thread].
Les trois fonctions suivantes sont une interface pour accéder à un tableau associatif au quel chaque [* thread] a accès:
- [c register()]~ : associe un [* thread] à un nom.
- [c locate()]~ : retourne le [* thread] associé à un nom. Si il n'y a pas de [* thread] associé au nom, alors [c Tid.init] est retourné.
- [c unregister()]~ : désassocie l'association entre un [* thread] et un nom.
La programme suivant démarre deux [* threads] qui communiquement entre eux par leurs noms. Ces [* threads] s'envoient des messages l'un à l'autre jusqu'à ce qu'ils recoivent un message [c Exit]:
[code=d <<<
import std.stdio;
import std.concurrency;
import core.thread;
struct Exit{}{
}
void main(){
// Un thread dont le partenaire s'appelle "second"
auto premier = spawn(&player, "second");
register("premier", premier);
scope(exit) unregister("premier");
// Un thread dont le partenaire s'appelle "premier"
auto second = spawn(&player, "first");
register("second", second);
scope(exit) unregister("second");
Thread.sleep(2.seconds);
prioritySend(first, Exit());
prioritySend(second, Exit());
// Pour que les appels à unregister() soient effectués, main()
// doit attendre que les fils aient terminé leur exécution.
thread_joinAll();
}
void player(string nomDuPartenaire){
Tid partenaire;
while(partenaire == Tid.init){
Thread.sleep(1.msecs);
partnaire = locate(nomDuPartenaire);
}
bool termine = false;
while(!termine){
partner.send("hello " ~ nomDuPartenaire);
receive(
(string message){
writeln("Message: ", message);
Thread.sleep(500.msecs);
},
(Exit message){
writefln("%s, Je me termine", nomDuPartenaire);
termine = true;
}
);
}
}
>>>]
L'appel à [c thread_joinAll()] que l'on peut voir à la fin du main fait attendre le père jusqu'à ce que les fils aient terminé.
La sortie:
[output <<<
Message: hello second
Message: hello first
Message: hello second
Message: hello first
Message: hello first
Message: hello second
Message: hello first
Message: hello second
second, Je me termine.
first, Je me termine.
>>>]
]
[ = Résumé
- Quand les [* threads] ne doivent pas communiquer entre eux, préferez le [* parallélisme], qui a été vu dans le chapitre précédent. N'utilisez la concurrence que quand les [* threads] doivent interagir.
- La concurrence par partage de mémoire étant difficile à implémenter correctement, préferez la concurrence par messages, qui a été couverte dans ce chapitre.
- [c spawn()] et [c spawnLinked()] démarrent des [* threads].
- [c thisTid] est l'id du [* thread] courrant.
- [c ownerTid] est l'id du [* thread] père du [* thread] courrant.
- [c send()] et [c prioritySend()] envoient des messages.
- [c receiveOnly()], [c receive()] et [c receiveTimeout()] attendent des messages.
- [c Variant] correspond à n'importe quel type de message.
- [c setMaxMailboxSize()] limite la taille de la boîte aux lettres.
- [c register()], [c unregister()] et [c locate()] permettent d'identifier les [* threads] par des noms.
- Des exceptions peuvent être levées pendant le passage de messages~ : [c MessageMismatch], [c OwnerTerminated], [c LinkTerminated], [c MailboxFull] et [c PriorityMessageException].
- Le père peut attraper automatiquement les exceptions levées par un fils.
]