· 9 years ago · Oct 10, 2016, 08:58 PM
1var dnode = require('./node_modules/dnode');
2var _ = require('lodash');
3var schedule = require('node-schedule');
4var NR = require("node-resque");
5var promise = require('bluebird');
// pg-promise initialisation options.
var options = {
  promiseLib: promise // override the default (es6 promise) with bluebird
};
var pgp = require('./node_modules/pg-promise')(options);

// PostgreSQL connection settings.
// Each value can be overridden through the conventional PG* environment
// variables so credentials do not have to live in source control; when a
// variable is unset the previous hard-coded development value is used.
var cn = {
  host: process.env.PGHOST || 'localhost',
  port: parseInt(process.env.PGPORT, 10) || 5432,
  database: process.env.PGDATABASE || 'postgres',
  user: process.env.PGUSER || 'postgres',
  password: process.env.PGPASSWORD || 'apassword'
};

// Shared database handle used by the worker tasks below.
var db = pgp(cn);
18
19
20/////////////////////////////
21// SET UP REDIS CONNECTION //
22/////////////////////////////
23
// Redis connection settings handed to every node-resque component created in
// this file (scheduler, worker, queue).
// Optional settings that are currently left unset:
//   namespace: 'resque'
//   looping: true
//   options: {password: 'abc'}
var connectionDetails = {
  pkg: 'ioredis',
  host: '127.0.0.1',
  port: 6379,
  database: 0,
  password: null
};
35
36
37// var queue = new NR.queue({connection: connectionDetails}, jobs);
38
39///////////////////////
40// START A SCHEDULER //
41///////////////////////
42
// Create the scheduler and start it as soon as its connection is ready.
var scheduler = new NR.scheduler({connection: connectionDetails});

scheduler.connect(function onSchedulerConnected(){
  scheduler.start();
});

// Log every scheduler lifecycle event to stdout.
scheduler
  .on('start', function onStart(){
    console.log("scheduler started");
  })
  .on('end', function onEnd(){
    console.log("scheduler ended");
  })
  .on('poll', function onPoll(){
    console.log("scheduler polling");
  })
  .on('master', function onMaster(state){
    console.log("scheduler became master");
  })
  .on('error', function onError(error){
    console.log("scheduler error >> " + error);
  })
  .on('working_timestamp', function onWorkingTimestamp(timestamp){
    console.log("scheduler working timestamp " + timestamp);
  })
  .on('transferred_job', function onTransferredJob(timestamp, job){
    console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job));
  });
55
56
// Graceful shutdown: stop the scheduler first, then the worker, then exit.
// (A bulk "UPDATE masteruser SET is_live = false" before exiting was sketched
// here at one point but is not executed.)
var shutdown = function(){
  var exitProcess = function(){
    console.log('bye.');
    process.exit();
  };

  var stopWorker = function(){
    worker.end(exitProcess);
  };

  scheduler.end(stopWorker);
};

// Run the same shutdown sequence for both termination signals.
['SIGTERM', 'SIGINT'].forEach(function(signal){
  process.on(signal, shutdown);
});
72
73
74//////////////////
75// WORKER TASKS //
76//////////////////
77
/**
 * Job definitions handed to the node-resque worker and queue.
 *
 * Both jobs use the 'jobLock' and 'retry' plugins (3 retries, 5 s apart).
 * Each `perform` reports its outcome through `callback(error, result)` only
 * once its asynchronous work has finished, so failures actually reach the
 * retry plugin instead of every job being reported as an immediate success.
 */
var jobs = {
  "followTask": {

    plugins: [ 'jobLock', 'retry' ],
    pluginOptions: {
      jobLock: {},
      retry: {
        retryLimit: 3,
        retryDelay: (1000 * 5)
      }
    },

    /**
     * Pops the next user to follow for `screen_name`, asks the dnode service
     * on port 7070 to follow that user, and records the follow in the
     * `newfollows` table.
     *
     * @param {string} screen_name - account on whose behalf the follow is made
     * @param {string} oauthToken - forwarded to remote.postNewFollow
     * @param {string} oauthTokenSecret - forwarded to remote.postNewFollow
     * @param {function} callback - called once as (error) or (null, screen_name)
     */
    perform: function(screen_name, oauthToken, oauthTokenSecret, callback){
      // Several failure paths can fire (dnode 'error' event, rejected insert);
      // make sure the worker callback is invoked exactly once.
      var finished = false;
      var finish = function(error){
        if (finished) { return; }
        finished = true;
        if (error) {
          callback(error);
        } else {
          callback(null, screen_name);
        }
      };

      // NOTE(review): this gopiredis connection is never closed; its API is
      // not visible in this file, so no close call is added here.
      var gredis = new NR.gopiredis({connection: connectionDetails});
      gredis.connect(function(){
        gredis.popusertofollow(screen_name, function(usertofollow){
          // Nothing was popped (presumably the set is empty - confirm against
          // gopiredis); there is no one to follow, so finish without error.
          if (!usertofollow) {
            finish(null);
            return;
          }

          var client = dnode.connect(7070, function (remote, conn) {
            remote.postNewFollow(usertofollow, oauthToken, oauthTokenSecret, function (test) {
              // The remote call has answered; release the dnode connection.
              conn.end();

              // Without an id there is nothing valid to record; report a
              // failure instead of throwing inside the dnode callback.
              if (!test || !test['id_str']) {
                finish(new Error('postNewFollow returned no user id for ' + usertofollow));
                return;
              }

              db.query('INSERT INTO newfollows(screen_name, followed_screen_name, date_added, user_id) SELECT ${screen_name},${followed_screen_name},CURRENT_DATE,${user_id}', {
                screen_name: screen_name,
                followed_screen_name: usertofollow,
                user_id: test['id_str']
              })
              .then(function(){
                console.log('inserted into new follows table');
              })
              .then(function(){ finish(null); }, finish);
            });
          });

          // An unhandled 'error' event would crash the process; turn it into
          // a job failure instead.
          client.on('error', finish);
        });
      });
    }
  },

  "populateRedis": {

    plugins: [ 'jobLock', 'retry' ],
    pluginOptions: {
      jobLock: {},
      retry: {
        retryLimit: 3,
        retryDelay: (1000 * 5)
      }
    },

    /**
     * Computes the seed users that `screen_name` has not followed yet, hands
     * them to gopiredis, and schedules a recurring "followTask" enqueue at
     * seconds 10, 30 and 50 of every minute.
     *
     * @param {string} screen_name - account to populate and schedule for
     * @param {string} oauthToken - passed through to the scheduled followTask
     * @param {string} oauthTokenSecret - passed through to the scheduled followTask
     * @param {function} callback - called once as (error) or (null, screen_name)
     */
    perform: function(screen_name, oauthToken, oauthTokenSecret, callback){
      var pluckUserIds = function(rows){
        return rows.map(function(row){ return row['user_id']; });
      };

      // Both result sets are required before the difference can be computed,
      // so wait for the two queries together rather than racing them.
      Promise.all([
        db.any("select user_id from seeds where flag = 'true'"),
        db.any("select user_id from newfollows where screen_name = ${sname}", {
          sname: screen_name
        })
      ])
      .then(function(results){
        var seedIds = pluckUserIds(results[0]);
        var followedIds = pluckUserIds(results[1]);
        var differenced = _.difference(seedIds, followedIds);

        //ADD FOLLOWABLE USERS INTO A REDIS SET
        // NOTE(review): completion of adduserstofollow is not observable from
        // this file, so the job does not wait for it.
        var gredis = new NR.gopiredis({connection: connectionDetails});
        gredis.connect(function(){
          gredis.adduserstofollow(screen_name, differenced);
        });

        //SCHEDULE
        // NOTE(review): running this job twice for the same screen_name
        // registers a second schedule under the same name - confirm whether
        // the earlier one should be cancelled first.
        schedule.scheduleJob(screen_name, '10,30,50 * * * * *', function(){
          // Only the master scheduler enqueues; checking first avoids opening
          // a connection that would go unused.
          if (!scheduler.master) { return; }

          var queue = new NR.queue({connection: connectionDetails}, jobs);
          queue.on('error', function(error){ console.log(error); });
          queue.connect(function(){
            console.log('>>> enquing scheduled job' + ':'+ screen_name);
            queue.enqueue('nonpriority', "followTask", [screen_name, oauthToken, oauthTokenSecret], function(){
              // A fresh queue is created on every tick; close it once the job
              // is enqueued so connections do not pile up.
              queue.end(function(){});
            });
          });
        });
      })
      .then(function(){
        callback(null, screen_name);
      }, function(error){
        console.log(error);
        callback(error);
      });
    }
  }
};
174
175
176///////////////////////////////
177// START ONE WORKER PROCESS ///
178///////////////////////////////
179
// Single worker that serves the 'priority' and 'nonpriority' queues using the
// job definitions above.
var worker = new NR.worker({
  connection: connectionDetails,
  queues: ['priority', 'nonpriority']
}, jobs);

worker.connect(function onWorkerConnected(){
  // optional: cleanup any previous improperly shutdown workers on this host
  worker.workerCleanup();
  worker.start();
});
185
186
///////////////////////////////
// REGISTERING WORKER EVENTS //
///////////////////////////////
190
// Log every worker lifecycle and job event to stdout.
worker
  .on('start', function onStart(){
    console.log("worker started");
  })
  .on('end', function onEnd(){
    console.log("worker ended");
  })
  .on('cleaning_worker', function onCleaningWorker(staleWorker, pid){
    console.log("cleaning old worker " + staleWorker);
  })
  .on('poll', function onPoll(queue){
    console.log("worker polling " + queue);
  })
  .on('job', function onJob(queue, job){
    console.log("working job " + queue + " " + JSON.stringify(job));
  })
  .on('reEnqueue', function onReEnqueue(queue, job, plugin){
    console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job));
  })
  .on('success', function onSuccess(queue, job, result){
    console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result);
  })
  .on('failure', function onFailure(queue, job, failure){
    console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure);
  })
  .on('error', function onError(queue, job, error){
    console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error);
  })
  .on('pause', function onPause(){
    console.log("worker paused");
  });