Master de II. ULL. 1er cuatrimestre. 2020/2021
En el paradigma de paralelismo conocido como Farm o Granja de Procesadores, la tarea que se quiere realizar se divide en un conjunto de sub-tareas cuyo número es bastante mayor que el de procesadores disponibles.
Una ventaja que tiene este paradigma es que consigue equilibrar la carga de trabajo entre las máquinas, ya que cada procesador solicita una nueva sub-tarea en cuanto termina la anterior.
En el siguiente código mostramos cómo usar los sockets ROUTER y DEALER de 0MQ junto con los clusters de Node.js para crear un borrador de una granja de trabajadores:
Fichero connecting-robust-microservices-chapter-4/microservices/dealer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
'use strict';
const PORT = require("./port.js");
const ins = require("./ins.js");
const cluster = require("cluster");
const zmq = require("zeromq");
const numWorkers = require("os").cpus().length;
/**
 * Picks a pseudo-random number by scaling Math.random() onto the span
 * starting at `min` and rounding the result down.
 *
 * For integer bounds the result is an integer n with min <= n < max.
 *
 * @param {number} min - Lower bound (inclusive for integer bounds).
 * @param {number} max - Upper bound (exclusive for integer bounds).
 * @returns {number} The floored random value.
 */
const randomBetween = (min, max) => {
  const span = max - min;
  const scaled = Math.random() * span + min;
  return Math.floor(scaled);
};
/**
 * Worker side of the farm.
 *
 * Opens a DEALER socket whose identity is read from the "identity"
 * environment variable, connects it to the broker on PORT and then loops:
 * it announces 'ready', waits for a reply, and (unless the reply is 'stop')
 * counts one task and announces 'ready' again after a random delay that
 * simulates work. On 'stop' it reports how many tasks it handled, detaches
 * its own listener and closes the socket.
 */
function workerTask() {
  const socket = zmq.socket('dealer');
  socket.identity = process.env.identity;
  console.log(`identity ${socket.identity} process ${process.pid} port = ${PORT}`);
  socket.connect(`tcp://localhost:${PORT}`);

  // Number of work items received so far.
  let completed = 0;

  // Tells the broker this worker can take another task.
  const requestWork = () => socket.send(['ready']);

  // Keep taking workload from the broker until it says 'stop'.
  socket.on('message', function handleMessage(...frames) {
    const [firstFrame] = frames;
    const command = firstFrame.toString('utf8');

    if (command !== 'stop') {
      completed += 1;
      // Simulate some work before asking for the next task.
      setTimeout(requestWork, randomBetween(0, 500));
      return;
    }

    console.log(`Completed: ${completed} tasks (${socket.identity} ${process.pid})`);
    // removeListener comes from EventEmitter; it needs the same function
    // reference that was registered, hence the named function expression.
    socket.removeListener('message', handleMessage);
    socket.close();
  });

  // Kick off the request/reply cycle.
  requestWork();
}
/**
 * Master side of the farm.
 *
 * Binds a ROUTER socket on PORT and forks `numWorkers` cluster workers, each
 * with an "identity" environment variable ("worker0", "worker1", ...).
 * Every incoming message is answered with 'more work' while the 5000 ms
 * window (measured from start-up) is still open, and with 'stop' afterwards.
 * Once as many 'stop' replies as workers have been sent, the broker socket
 * is closed and the cluster is disconnected.
 */
function main() {
  const router = zmq.socket('router');
  router.bind(`tcp://*:${PORT}`);

  const deadline = Date.now() + 5000;
  let stoppedWorkers = 0;

  router.on('message', (...frames) => {
    // The ROUTER socket delivers the sender's identity as the first frame;
    // sending it back as the first frame routes the reply to that worker.
    const [workerId] = frames;
    const now = Date.now();

    if (now < deadline) {
      router.send([workerId, 'more work']);
      return;
    }

    router.send([workerId, 'stop']);
    stoppedWorkers += 1;
    if (stoppedWorkers !== numWorkers) {
      return;
    }

    // Defer the shutdown to a later event-loop turn via setImmediate
    // instead of closing the socket in the same tick as the send above.
    setImmediate(() => {
      router.close();
      cluster.disconnect();
    });
  });

  for (let index = 0; index < numWorkers; index += 1) {
    cluster.fork({ identity: `worker${index}` });
  }
}
// Entry point: the process that launched the script acts as the broker,
// while every process forked through cluster runs the worker loop.
if (cluster.isMaster) {
  main();
} else {
  workerTask();
}
Observa que, pese a que el worker envía solamente `['ready']`:
1
2
3
4
const sendMessage = () => dealer.send(['ready']);
...
// Simulate some work
setTimeout(sendMessage, randomBetween(0, 500));
En el master recibimos como primer elemento la identity del worker:
1
2
3
broker.on('message', function (...args) {
// console.log("Inside Master. args = "+ins(args.map(x => x.toString())));
const identity = args[0]
Consultemos la documentación de 0MQ:
The zmq_socket()
man page describes it thus:
When receiving messages a
ZMQ_ROUTER
socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers. When sending messages a ZMQ_ROUTER
socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to.
Cuando ejecuto este programa, obtengo una salida parecida a esta:
1
2
3
4
5
6
7
8
9
[~/.../microservices(master)]$ node dealer.js
identity worker0 process 56820 port = 60300
identity worker2 process 56822 port = 60300
identity worker3 process 56823 port = 60300
identity worker1 process 56821 port = 60300
Completed: 24 tasks (worker3 56823)
Completed: 22 tasks (worker2 56822)
Completed: 19 tasks (worker1 56821)
Completed: 18 tasks (worker0 56820)
Usando 0MQ y paralelismo de granja, compute en paralelo una aproximación al número π aprovechando la siguiente fórmula:
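\[\int_{0}^{1} \frac{4}{1+x^2}\, dx = 4 \arctan(x) \Big|_{0}^{1} = 4 \left( \frac{\pi}{4} - 0 \right) = \pi\]
Para computar π aproxime la integral mediante sumas de áreas de rectángulos:
\[\pi \approx \sum_{i=0}^{N-1} \frac{1}{N} \cdot \frac{4}{1+x_i^2}, \qquad x_i = \frac{i + 1/2}{N}\]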
Escriba un chat de línea de comandos - con rooms - usando 0MQ.
En el servidor:
1
2
3
4
5
6
7
8
9
publisher.send( ["room-1", // topic
JSON.stringify(
{
type: "message",
from: user,
content: content
}
)
]
En el cliente:
1
2
3
4
5
subscriber.on("message", (room, data) => {
console.log(room.toString());
const message = JSON.parse(data);
...
});
Fichero local/src/javascript/learning/readline-examples/small-cli.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const readline = require('readline');

// Line-oriented interface over stdin/stdout with a fixed prompt.
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
  prompt: 'DSI> '
});

/** Prints a farewell message and terminates the process with status 0. */
const bye = () => {
  console.log('Have a great day!');
  process.exit(0);
};

// Command table: each key is a command the user may type. `default` is the
// fallback handler for unrecognised input and is not itself a command.
const methods = {
  hello: () => console.log("world"),
  exit: () => bye(),
  default: (line) => console.log(`Say what? I might have heard '${line.trim()}'`),
};

/**
 * Tells whether `name` is a command the user is allowed to invoke.
 *
 * Only own properties of `methods` count, so inherited members such as
 * `toString` or `constructor` are rejected, and the `default` fallback is
 * excluded because it expects the raw input line as its argument.
 *
 * @param {string} name - Trimmed user input.
 * @returns {boolean} True when `methods[name]` is a real command.
 */
const isCommand = (name) =>
  name !== 'default' && Object.prototype.hasOwnProperty.call(methods, name);

rl.prompt();

rl.on('line', (line) => {
  const choice = line.trim();
  if (isCommand(choice)) methods[choice]();
  else methods.default(line);
  rl.prompt();
}).on('close', bye); // pass the handler itself so EOF (Ctrl-D) really says goodbye and exits