-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer_program.js
44 lines (41 loc) · 1.34 KB
/
consumer_program.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
var config = require('./consumer_config.js');
var stompit = require('stompit');
var log4js = require('log4js');
var handler = require('./consumer_message_handler.js');
// Logging setup: load the console and file appenders, then attach a file
// appender to the category this program logs under.
const LOG_CATEGORY = 'consumer';
const LOG_FILE_PATH = 'logs/consumer.log';

log4js.loadAppender('console');
log4js.loadAppender('file');

const fileAppender = log4js.appenders.file(LOG_FILE_PATH);
log4js.addAppender(fileAppender, LOG_CATEGORY);

// Logger shared by every callback below; TRACE is passed as the level.
const logger = log4js.getLogger(LOG_CATEGORY);
logger.setLevel('TRACE');

// Running count of messages read and acknowledged since start-up.
let totalConsumed = 0;
/**
 * Connects to the broker using `config.connectionOptions`, subscribes to
 * `/queue/<config.queueName>` in `client-individual` ack mode, and for every
 * received message:
 *   1. reads the body as a UTF-8 string,
 *   2. acknowledges the message and bumps `totalConsumed`,
 *   3. parses the body as JSON,
 *   4. passes the parsed object to `handler.handleMessage`.
 *
 * Every failure (connection, subscription, body read, JSON parse, handler)
 * is logged through `logger.error` and stops processing of that message only.
 */
stompit.connect(config.connectionOptions, (error, client) => {
  if (error) {
    return logger.error('Connection Error: ' + error);
  }

  const subscribeHeaders = {
    'destination': '/queue/' + config.queueName,
    'ack': 'client-individual'
  };

  client.subscribe(subscribeHeaders, (error, message) => {
    if (error) {
      return logger.error('Subscribtion Error: ' + error);
    }

    message.readString('utf-8', (error, body) => {
      if (error) {
        return logger.error('Consumption Error: ' + error);
      }

      // The message is acknowledged before it is parsed or handled.
      // NOTE(review): this presumably means a message that fails below is not
      // redelivered by the broker - confirm that this is the intended policy.
      client.ack(message);
      totalConsumed++;

      // Declared locally (the original assigned to an undeclared name, which
      // created an implicit global) and parsed defensively: a malformed body
      // must not throw out of this callback and take the whole consumer down.
      let messageObj;
      try {
        messageObj = JSON.parse(body);
      } catch (parseError) {
        return logger.error('Consumption Error: ' + parseError);
      }

      // A body of literal `null` is valid JSON but has no properties to read.
      if (messageObj === null) {
        return logger.error('Consumption Error: message body is null');
      }

      logger.info('message consumed - message id: ' + messageObj.message_id + ' - total (' + totalConsumed + ')');

      handler.handleMessage(messageObj, (error, lastId) => {
        if (error) {
          return logger.error(error);
        }
        logger.info('>>> Patch #' + lastId + ' sent to redis');
      });
    });
  });
});