· 9 years ago · Oct 11, 2016, 12:22 AM
1var dnode = require('./node_modules/dnode');
2var _ = require('lodash');
3var schedule = require('node-schedule');
4var NR = require("node-resque");
5var promise = require('bluebird');
6var options = {
7 promiseLib: promise // override the default (es6 promise)
8};
9var pgp = require('./node_modules/pg-promise')(options);
10var cn = {
11 host: 'localhost',
12 port: 5432,
13 database: 'postgres',
14 user: 'postgres',
15 password: 'apassword'
16};
17var db = pgp(cn);
18
19
20/////////////////////////////
21// SET UP REDIS CONNECTION //
22/////////////////////////////
23
24var connectionDetails = {
25
26 pkg: 'ioredis',
27 host: '127.0.0.1',
28 password: null,
29 port: 6379,
30 database: 0,
31 // namespace: 'resque',
32 // looping: true,
33 // options: {password: 'abc'},
34};
35
36
37// var queue = new NR.queue({connection: connectionDetails}, jobs);
38
39///////////////////////
40// START A SCHEDULER //
41///////////////////////
42
43var scheduler = new NR.scheduler({connection: connectionDetails});
44 scheduler.connect(function(){
45 scheduler.start();
46});
47
48scheduler.on('start', function(){ console.log("scheduler started"); });
49scheduler.on('end', function(){ console.log("scheduler ended"); });
50scheduler.on('poll', function(){ console.log("scheduler polling"); });
51scheduler.on('master', function(state){ console.log("scheduler became master"); });
52scheduler.on('error', function(error){ console.log("scheduler error >> " + error); });
53scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); });
54scheduler.on('transferred_job', function(timestamp, job){ console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job)); });
55
56
57var shutdown = function(){
58 scheduler.end(function(){
59 worker.end(function(){
60 console.log('bye.');
61 // db.query('UPDATE masteruser SET is_live = false')
62 // .then(function(data){
63 // console.log('updated all entries in masteruser is_live to deactivated!');
64 process.exit();
65 // })
66 });
67 });
68};
69
70process.on('SIGTERM', shutdown);
71process.on('SIGINT', shutdown);
72
73
74//////////////////
75// WORKER TASKS //
76//////////////////
77
78var jobs = {
79 "followTask": {
80
81 plugins: [ 'jobLock', 'retry' ],
82 pluginOptions: {
83 jobLock: {},
84 retry: {
85 retryLimit: 3,
86 retryDelay: (1000 * 5),
87 }
88 },
89
90 perform: function(screen_name, oauthToken, oauthTokenSecret, callback){
91
92 var gredis = new NR.gopiredis({connection: connectionDetails});
93 gredis.connect(function(){
94 gredis.popusertofollow(screen_name, function(usertofollow){
95 dnode.connect(7070, function (remote, conn) {
96 remote.postNewFollow(usertofollow, oauthToken, oauthTokenSecret, function (test) {
97
98 //TO DO ADD USER_ID
99 db.query('INSERT INTO newfollows(screen_name, followed_screen_name, date_added, user_id) SELECT ${screen_name},${followed_screen_name},CURRENT_DATE,${user_id}', {
100 screen_name: screen_name,
101 followed_screen_name: usertofollow,
102 user_id:test['id_str']
103 })
104 .then(function(data){
105 console.log('inserted into new follows table');
106 gredis.end(function(){console.log('gredis ended inside pop redis')})
107
108 })
109 });
110 });
111 });
112 });
113 callback(null,screen_name);
114 },
115 },
116
117 "populateRedis": {
118
119 plugins: [ 'jobLock', 'retry' ],
120 pluginOptions: {
121 jobLock: {},
122 retry: {
123 retryLimit: 3,
124 retryDelay: (1000 * 5),
125 }
126 },
127
128 perform: function(screen_name, oauthToken, oauthTokenSecret, callback){
129 var dbcall1 = [];
130 var dbcall2 = [];
131 var differenced = [];
132 var sjob;
133
134 db.any("select user_id from seeds where flag = 'true'")
135 .then(function (data){
136 for(i=0; i<Object.keys(data).length; i++) {
137 dbcall1.push(data[i]['user_id']);
138 }
139 })
140 .catch(function (error) {console.log(error);});
141
142 db.any("select user_id from newfollows where screen_name = ${sname}", {
143 sname: screen_name
144 })
145 .then(function (data){
146 for(i=0; i<Object.keys(data).length; i++) {
147 dbcall2.push(data[i]['user_id']);
148 }
149 differenced = _.difference(dbcall1,dbcall2);
150
151 //ADD FOLLOWABLE USERS INTO A REDIS SET
152 var gredis = new NR.gopiredis({connection: connectionDetails});
153 gredis.connect(function(){
154 gredis.adduserstofollow(screen_name, differenced);
155 });
156
157 //SCHEDULE
158 var sjob = schedule.scheduleJob(screen_name, '10,30,50 * * * * *', function(){
159 var queue = new NR.queue({connection: connectionDetails}, jobs);
160 queue.on('error', function(error){ console.log(error); });
161 queue.connect(function(){
162 if(scheduler.master){
163 console.log('>>> enquing scheduled job' + ':'+ screen_name);
164 queue.enqueue('nonpriority', "followTask", [screen_name, oauthToken, oauthTokenSecret]);
165 // queue.end(function(){console.log('queue connection closed')});
166 }
167 });
168 });
169 })
170 .catch(function (error) {console.log(error);});
171
172 callback(null,screen_name);
173 },
174 },
175};
176
177
178///////////////////////////////
179// START ONE WORKER PROCESS ///
180///////////////////////////////
181
182var worker = new NR.worker({connection: connectionDetails, queues: ['priority', 'nonpriority']}, jobs);
183worker.connect(function(){
184 worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers on this host
185 worker.start();
186});
187
188
189///////////////////////////////
190// REGESTERING WORKER EVENTS //
191///////////////////////////////
192
193worker.on('start', function(){ console.log("worker started"); });
194worker.on('end', function(){ console.log("worker ended"); });
195worker.on('cleaning_worker', function(worker, pid){ console.log("cleaning old worker " + worker); });
196worker.on('poll', function(queue){ console.log("worker polling " + queue); });
197worker.on('job', function(queue, job){ console.log("working job " + queue + " " + JSON.stringify(job)); });
198worker.on('reEnqueue', function(queue, job, plugin){ console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); });
199worker.on('success', function(queue, job, result){ console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result); });
200worker.on('failure', function(queue, job, failure){ console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure); });
201worker.on('error', function(queue, job, error){ console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error); });
202worker.on('pause', function(){ console.log("worker paused"); });