// Pasted Jul 29, 2018, 12:44 AM
/**
 * Static service configuration.
 *
 * `callback` describes the HTTP endpoint that receives a POST whenever an
 * item finishes production (the request is built in addItemToQueue).
 */
var settings = {
  callback: {
    hostname: '127.0.0.1',
    port: 4567,
    path: '/finish_production',
    // Sent verbatim in the X-PQS-Secret-Key request header.
    // NOTE(review): placeholder secret committed to source; replace it
    // before deploying anywhere that matters.
    secretKey: 'my-super-secret-key'
  }
};
9
var http = require('http'),
    uuid = require('uuid'),
    redis = require('redis'),
    express = require('express');
14
// HTTP application and the single Redis connection shared by every route
// handler and by addItemToQueue.
var app = express.createServer(),
    redisClient = redis.createClient();

// An EventEmitter 'error' with no listener is thrown and terminates the
// process, so connection problems reported by the client are logged here
// instead.
redisClient.on('error', function(error){
  console.error("Redis error: " + error);
});

// Parse request bodies so that handlers can read `request.body`.
app.use(express.bodyParser());
19
20
/**
 * Schedule the completion of an item that is currently in production.
 *
 * After `item.duration` milliseconds the head of the queue's Redis list is
 * popped and its hash deleted; the next item (if any) is marked active and
 * scheduled in turn, otherwise the queue is marked idle. The configured
 * callback endpoint is then notified with the UUID of the finished item,
 * regardless of how the Redis commands turn out.
 *
 * @param {string} queueIdentifier Identifier of the queue (Redis key "queues:<identifier>").
 * @param {{uuid: string, duration: number}} item Item in production; `duration` is the timer delay in milliseconds.
 */
function addItemToQueue(queueIdentifier, item){
  console.log("Start production of item " + item.uuid + ".");

  var queueKey = "queues:" + queueIdentifier,
      listKey = queueKey + ":items";

  setTimeout(function(){
    advanceQueue(queueIdentifier, queueKey, listKey);
    notifyItemFinished(item.uuid);
  }, item.duration);
}

/**
 * Remove the finished head of the list and start the next item, or mark the
 * queue idle when the list is empty.
 *
 * Redis failures are logged and the remaining steps are still attempted:
 * this is best-effort bookkeeping and must not throw from a timer callback.
 */
function advanceQueue(queueIdentifier, queueKey, listKey){
  redisClient.lpop(listKey, function(popError, finishedUuid){
    if(popError){
      console.error("Redis LPOP failed for " + listKey + ": " + popError);
    }

    redisClient.lindex(listKey, 0, function(peekError, nextItemUuid){
      if(peekError){
        console.error("Redis LINDEX failed for " + listKey + ": " + peekError);
      }

      // Only delete a hash for an item that was really popped; otherwise the
      // key would be the meaningless "items:undefined" / "items:null".
      if(finishedUuid){
        redisClient.del('items:' + finishedUuid);
      }

      if(!nextItemUuid){
        redisClient.hset(queueKey, 'status', 'idle');
        return;
      }

      var nextItemKey = 'items:' + nextItemUuid;

      redisClient.hset(nextItemKey, 'status', 'active');

      redisClient.hget(nextItemKey, 'duration', function(durationError, storedDuration){
        if(durationError){
          console.error("Redis HGET failed for " + nextItemKey + ": " + durationError);
        }

        // Redis returns the stored duration as a string; convert it
        // explicitly and fall back to 0 when it is missing or malformed.
        var duration = parseInt(storedDuration, 10);

        addItemToQueue(queueIdentifier, {
          uuid: nextItemUuid,
          duration: isNaN(duration) ? 0 : duration
        });
      });
    });
  });
}

/**
 * POST the UUID of a finished item to the endpoint described by
 * `settings.callback`. Failures are logged, never thrown.
 */
function notifyItemFinished(itemUuid){
  var body = JSON.stringify({
        uuid: itemUuid
      }),
      options = {
        host: settings.callback.hostname,
        port: settings.callback.port,
        path: settings.callback.path,
        method: 'POST',
        headers: {
          'X-PQS-Secret-Key': settings.callback.secretKey,
          'Content-Type': 'application/json',
          // Content-Length is a byte count, not a count of UTF-16 code units.
          'Content-Length': Buffer.byteLength(body)
        }
      },
      request = http.request(options, function(response){
        // The response body is not needed, but it has to be consumed once a
        // response listener is attached.
        response.resume();
      });

  request.on('error', function(error){
    console.error("Callback for item " + itemUuid + " failed: " + error);
  });
  request.end(body);
}
72
73
/**
 * Create a queue.
 *
 * Request body parameters:
 * - identifier: Identifies the queue; it is stored under the Redis key
 *   "queues:<identifier>".
 *
 * Responds with the JSON-encoded queue ({ identifier, status }) once it has
 * been written, or with { error: ... } when the identifier is missing (400)
 * or the write fails (500).
 *
 * NOTE: posting an identifier that already exists overwrites that queue's
 * status with 'idle'.
 */
app.post('/queues', function(request, response){
  var identifier = request.body ? request.body.identifier : undefined;

  // Without this guard a missing identifier would create "queues:undefined".
  if(identifier === undefined || identifier === null || identifier === ''){
    response.statusCode = 400;
    response.send(JSON.stringify({ error: 'identifier_required' }));
    return;
  }

  var queue = {
        identifier: identifier,
        status: 'idle'
      },
      queueKey = 'queues:' + queue.identifier;

  // Answer only after Redis has acknowledged the write, so that a failed
  // write is not reported to the client as a created queue.
  redisClient.hmset(
    queueKey,
    'identifier', queue.identifier,
    'status', queue.status,
    function(error){
      if(error){
        console.error("Redis HMSET failed for " + queueKey + ": " + error);
        response.statusCode = 500;
        response.send(JSON.stringify({ error: 'queue_not_created' }));
        return;
      }

      response.send(JSON.stringify(queue));
    }
  );
});
95
96
/**
 * Add an item to a queue.
 *
 * Request body parameters:
 * - identifier: Identifies the item. Stored with the item, not otherwise
 *   used by the system.
 * - duration: Expressed in seconds; stored and scheduled in milliseconds.
 * - callbackUrl: Listed historically, but this handler does not read it;
 *   the completion callback target comes from `settings.callback`.
 *
 * When the queue is idle the item is started immediately, otherwise it is
 * appended in the 'idle' state. Responds with { queue_status } holding the
 * queue status observed before the item was added, or with { error: ... }.
 *
 * NOTE(review): the exists / status check / status update sequence is made
 * of separate Redis commands, so two concurrent requests may both observe an
 * idle queue. Confirm whether that matters for the expected load.
 *
 * TODO: Create the queue on the fly.
 */
app.post('/queues/:identifier/items', function(request, response){
  var queueIdentifier = request.param('identifier'),
      queueKey = "queues:" + queueIdentifier,
      body = request.body || {},
      // Seconds -> whole milliseconds. The same value is stored and
      // scheduled, whichever state the item starts in.
      duration = Math.floor(body.duration * 1000);

  // A missing or non-numeric duration would otherwise become a NaN timer
  // delay and complete the item immediately.
  if(!isFinite(duration) || duration < 0){
    response.statusCode = 400;
    response.send(JSON.stringify({ error: 'invalid_duration' }));
    return;
  }

  var failOnRedisError = function(command, error){
    console.error("Redis " + command + " failed for " + queueKey + ": " + error);
    response.statusCode = 500;
    response.send(JSON.stringify({ error: 'storage_failure' }));
  };

  redisClient.exists(queueKey, function(existsError, exists){
    if(existsError){
      failOnRedisError('EXISTS', existsError);
      return;
    }

    if(!exists){
      // Status code deliberately left at its default: existing clients
      // already receive this error body that way.
      response.send(JSON.stringify({
        error: 'queues_does_not_exist'
      }));
      return;
    }

    redisClient.hget(queueKey, 'status', function(statusError, queueStatus){
      if(statusError){
        failOnRedisError('HGET', statusError);
        return;
      }

      var itemUuid = uuid.generate().toLowerCase(),
          itemKey = 'items:' + itemUuid,
          listKey = queueKey + ":items",
          startsNow = (queueStatus === 'idle');

      // Create the item (active when it starts right away, idle otherwise).
      redisClient.hmset(
        itemKey,
        'status', startsNow ? 'active' : 'idle',
        'identifier', body.identifier,
        'duration', duration
      );

      if(startsNow){
        redisClient.hset(queueKey, 'status', 'active');
      }

      redisClient.rpush(listKey, itemUuid);

      if(startsNow){
        addItemToQueue(queueIdentifier, {
          uuid: itemUuid,
          duration: duration
        });
      }

      response.send(JSON.stringify({ queue_status: queueStatus }));
    });
  });
});
155
156
// Start accepting HTTP requests on port 1808, bound to 'localhost'.
app.listen(1808, 'localhost');