· 7 years ago · May 15, 2018, 11:46 AM
1package scheduler.threads;
2
3import java.io.BufferedReader;
4import java.io.IOException;
5import java.io.InputStreamReader;
6import java.io.PrintWriter;
7import java.net.Socket;
8import java.security.NoSuchAlgorithmException;
9import java.security.SecureRandom;
10import java.util.ArrayList;
11import java.util.List;
12import java.util.concurrent.ExecutionException;
13import java.util.concurrent.Future;
14
15import javax.crypto.Cipher;
16import javax.crypto.KeyGenerator;
17import javax.crypto.SecretKey;
18import javax.crypto.spec.IvParameterSpec;
19
20import org.bouncycastle.util.encoders.Base64;
21
22import managementcomponent.Channel;
23import managementcomponent.ChannelBase64;
24import managementcomponent.TCPChannel;
25
26import scheduler.Scheduler;
27import scheduler.TaskEngine;
28import scheduler.ThreadPool;
29import scheduler.enumerations.Load;
30import scheduler.enumerations.TaskEngineState;
31
32public class ThreadManageManagementComponentMessages implements Runnable {
33
34 private Scheduler _scheduler = null;
35
36 private Socket _socket = null;
37
38 private BufferedReader _in = null;
39
40 private PrintWriter _out = null;
41
42 private String _identifier = null;
43
44 private Boolean _active = true;
45
46 private List<Future<TaskEngine>> _futureList = null;
47
48 public ThreadManageManagementComponentMessages(Scheduler scheduler, Socket socket) {
49 _scheduler = scheduler;
50 _socket = socket;
51 _scheduler.get_clients().put(_socket.toString(), this);
52 }
53
54 @Override
55 public void run() {
56 // TODO authenticate
57 Channel channel = new TCPChannel();
58 channel = new ChannelBase64(channel);
59 channel.setComponentSocket(_socket);
60
61 try {
62
63 byte[] msg = channel.receiveMessage();
64 String message = new String(msg);
65 System.out.println("message=" + message.length());
66
67 try {
68 // System.out.println(_scheduler.privateKey);
69 Cipher c = Cipher.getInstance("RSA/NONE/OAEPWithSHA384AndMGF1Padding");
70 c.init(Cipher.DECRYPT_MODE, _scheduler.privateKey);
71 byte[] rsaDecryptedMsg = c.doFinal(message.getBytes());
72
73 message = new String(rsaDecryptedMsg);
74 } catch (Exception e) {
75 e.printStackTrace();
76 }
77 message += " ";
78 String[] parts = message.split("!login ");
79 String command = parts[0];
80 String componentChallenge = parts[1];
81
82 String challenge = channel.generateChallenge();
83
84 // Get the KeyGenerator
85 KeyGenerator kgen = KeyGenerator.getInstance("AES");
86 kgen.init(256);
87 // Generate the secret key specs.
88 SecretKey skey = kgen.generateKey();
89
90 SecureRandom secureRandom = new SecureRandom();
91 final byte[] number = new byte[16];
92 secureRandom.nextBytes(number);
93
94 IvParameterSpec iv = new IvParameterSpec(number.clone());
95
96 _scheduler.setPublicKey("manager");
97 channel.setPublicKey(_scheduler.publicKey);
98 System.out.println(componentChallenge);
99
100 try {
101 Cipher c = Cipher.getInstance("RSA/NONE/OAEPWithSHA384AndMGF1Padding");
102 c.init(Cipher.ENCRYPT_MODE, _scheduler.publicKey);
103 String s = "!ok " + new String(Base64.encode(componentChallenge.getBytes())) + " "
104 + new String(Base64.encode(challenge.getBytes())) + " "
105 + new String(Base64.encode(skey.getEncoded())) + " "
106 + new String(Base64.encode(iv.getIV()));
107 byte[] rsaEncryptedMsg = c.doFinal(s.getBytes());
108
109 channel.sendMessage(rsaEncryptedMsg);
110 } catch (Exception e) {
111 e.printStackTrace();
112 }
113
114
115 //System.out.println(message);
116 } catch (IOException e2) {
117 e2.printStackTrace();
118 } catch (NoSuchAlgorithmException e) {
119 // TODO Auto-generated catch block
120 e.printStackTrace();
121 }
122
123 try {
124 _futureList = new ArrayList<Future<TaskEngine>>();
125 _in = new BufferedReader(new InputStreamReader(_socket.getInputStream()));
126 _out = new PrintWriter(_socket.getOutputStream(), true);
127 } catch (IOException e) {
128 e.printStackTrace();
129 }
130
131 System.out.println("Managementcomponent connected");
132 String input = "";
133 try {
134 while (_active) {
135 input = _in.readLine();
136 if (input == null) break;
137 if (input.startsWith("!requestEngine")) {
138 requestEngine(input);
139 }
140 else {
141 _out.println("Unknown command.");
142 }
143 }
144 _scheduler.get_clients().remove(_socket.toString());
145 _in.close();
146 _out.close();
147 _socket.close();
148 } catch (IOException e) {
149 try {
150 _socket.close();
151 _scheduler.get_clients().remove(_socket.toString());
152 } catch (Exception e1) {
153
154 }
155 }
156 }
157
158
159 private void requestEngine(String input) {
160 Load load = null;
161 String id = null;
162 String parts[] = (input + " ").split(" ");
163 if (parts.length < 3) {
164 _out.println("!request <load>");
165 return;
166 }
167 try {
168 load = Load.valueOf(parts[1]);
169 id = parts[2];
170 } catch (Exception e) {
171 _out.println("!request <load>");
172 return;
173 }
174
175 if (_scheduler.get_engines().size() == 0) {
176 _out.println("No engines available.");
177 return;
178 }
179
180 // Update all current engines loads
181
182 getTaskEnginesLoad();
183
184
185 TaskEngine currentEngine = null;
186 float energy = Float.MAX_VALUE;
187 float newLoad = 0.0f;
188
189 for (TaskEngine engine : _scheduler.get_engines().values()) {
190 if (engine.get_state() != TaskEngineState.ONLINE) {
191 continue;
192 }
193
194 newLoad = _scheduler.getLoadValueOf(engine.get_load()) + _scheduler.getLoadValueOf(load);
195 if (newLoad == 0.99f) newLoad = 1.0f;
196 if (newLoad <= 1.0f && _scheduler.calculateConsumption(engine, newLoad) < energy) {
197 currentEngine = engine;
198 energy = _scheduler.calculateConsumption(engine, newLoad);
199 }
200 }
201
202 if (currentEngine == null) {
203 _out.println("Not enough capacity. Try again later.");
204 return;
205 }
206
207 newLoad = _scheduler.getLoadValueOf(currentEngine.get_load()) + _scheduler.getLoadValueOf(load);
208 if (newLoad == 0.99f) newLoad = 1.0f;
209 // Update local load
210 _scheduler.get_engines().get(currentEngine.get_identifier()).set_load(_scheduler.getLoadOf(newLoad));
211 _out.println("!assignedEngine " + currentEngine.get_taskEngineHost() + " " + currentEngine.get_tcpPort()
212 + " " + id);
213 }
214
215 private void requestEngines() {
216
217 }
218
219 private void getTaskEnginesLoad() {
220
221 for (TaskEngine taskEngine : _scheduler.get_engines().values()) {
222 if (taskEngine.get_state() != TaskEngineState.OFFLINE) {
223 CallableTaskEngineManageLoad tmpCallable = new CallableTaskEngineManageLoad(taskEngine);
224 _futureList.add(ThreadPool.getInstance().getExecutor().submit(tmpCallable));
225 }
226 }
227
228 for (Future<TaskEngine> future : _futureList) {
229 try {
230 _scheduler.get_engines().put(future.get().get_identifier(), future.get());
231 } catch (InterruptedException e) {
232 e.printStackTrace();
233 } catch (ExecutionException e) {
234 e.printStackTrace();
235 }
236 }
237 }
238
239 public void set_active(Boolean b) {
240 _active = b;
241 try {
242 _socket.close();
243 } catch (IOException e) {
244 }
245 }
246
247
248}