· 4 years ago · Dec 28, 2020, 07:10 PM
1<?php
2/** @noinspection PhpUndefinedClassInspection */
3
4declare(strict_types=1);
5
6namespace mypexteam\icaregions\repository\region\events\thread;
7
8use Exception;
9use mypexteam\icaregions\data\RegionEvent;
10use pocketmine\snooze\SleeperNotifier;
11use pocketmine\Thread;
12use SQLite3;
13use Threaded;
14
/**
 * Worker thread that owns the SQLite3 connection used to store region events.
 *
 * Queries are queued with {@see RegionEventThread::pushQuery()} and executed in
 * the order they were queued. Every executed query produces exactly one entry
 * in the receive queue, collected with {@see RegionEventThread::popResult()}:
 * either the query result or an Exception describing the failure.
 *
 * Queue entries are stored as serialized arrays.
 */
final class RegionEventThread extends Thread{

	public const QUERY_TYPE_SELECT = 0;
	public const QUERY_TYPE_UPDATE = 1;

	/** @var Threaded serialized [queryId, queryType, query, args] entries waiting to be executed */
	private $sendQueue;

	/** @var Threaded serialized [queryId, result|Exception] entries waiting to be collected */
	private $recvQueue;

	/** @var SleeperNotifier woken once when connection setup has finished, successfully or not */
	private $connectionNotifier;

	/** @var SleeperNotifier woken whenever a query result or query error has been queued */
	private $queryNotifier;

	/** @var string path passed to the SQLite3 constructor */
	private $pathToFile;

	/** @var bool set by flagForShutdown(); ends the main loop of run() */
	private $flagForShutdown = false;

	/**
	 * @param SleeperNotifier $connectionNotifier woken when the database has been opened (or failed to open)
	 * @param SleeperNotifier $queryNotifier      woken when a query result is available
	 * @param string          $pathToFile         SQLite database file
	 */
	public function __construct(
		SleeperNotifier $connectionNotifier,
		SleeperNotifier $queryNotifier,
		string $pathToFile
	){
		$this->sendQueue = new Threaded();
		$this->recvQueue = new Threaded();
		$this->connectionNotifier = $connectionNotifier;
		$this->queryNotifier = $queryNotifier;
		$this->pathToFile = $pathToFile;
	}

	/**
	 * Thread entry point: opens the database, makes sure the `region_events`
	 * table exists, then executes queued queries until flagged for shutdown.
	 *
	 * A setup failure is reported as an Exception with query id -1 and only
	 * wakes the connection notifier.
	 *
	 * NOTE: exceptions are deliberately constructed in this method rather than
	 * in a helper that receives the SQLite3 handle. They are serialized together
	 * with their stack trace, and a frame carrying a SQLite3 object as an
	 * argument would presumably make serialize() fail.
	 */
	public function run(){
		$this->registerClassLoader();

		try{
			$sqlite3 = new SQLite3($this->pathToFile);
		}catch(Exception $exception){
			// SQLite3::__construct() throws when the database cannot be opened;
			// report it instead of dying silently and leaving the owner waiting.
			$this->pushException(-1, $exception, false);
			$this->connectionNotifier->wakeupSleeper();
			return;
		}

		$tableReady = $sqlite3->exec(
			<<<SQL
CREATE TABLE IF NOT EXISTS `region_events` (
	id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
	type INTEGER NOT NULL,
	sender TEXT NOT NULL,
	x INTEGER NOT NULL,
	y INTEGER NOT NULL,
	z INTEGER NOT NULL,
	region_name TEXT NOT NULL,
	block BLOB NOT NULL,
	time TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
SQL
		);

		if($tableReady === false){
			$this->pushException(-1, new Exception(
				$sqlite3->lastErrorMsg(),
				$sqlite3->lastErrorCode()
			), false);

			$sqlite3->close();
			$this->connectionNotifier->wakeupSleeper();
			return;
		}

		$this->connectionNotifier->wakeupSleeper();
		while(!$this->flagForShutdown){
			// Queries already queued are always drained, even after shutdown was flagged.
			while($this->hasQuery()){
				$this->popQuery($queryId, $queryType, $query, $queryParams);

				$statement = $sqlite3->prepare($query);
				if($statement === false){
					$this->pushException($queryId, new Exception(
						$sqlite3->lastErrorMsg(),
						$sqlite3->lastErrorCode()
					));

					continue;
				}

				// Each parameter is a [name-or-index, value, SQLITE3_* type] triple.
				// bindValue() is used because nothing here needs by-reference binding.
				foreach($queryParams as $queryParam){
					$statement->bindValue(
						$queryParam[0],
						$queryParam[1],
						$queryParam[2]
					);
				}

				$result = $statement->execute();
				if($result === false){
					$this->pushException($queryId, new Exception(
						$sqlite3->lastErrorMsg(),
						$sqlite3->lastErrorCode()
					));

					$statement->close();
					continue;
				}

				switch($queryType){
					case self::QUERY_TYPE_SELECT:
						$events = [];
						while(($row = $result->fetchArray(SQLITE3_ASSOC)) !== false){
							$events[] = RegionEvent::fromState($row);
						}

						$this->pushSelectResult($queryId, $events);
						break;

					case self::QUERY_TYPE_UPDATE:
						$this->pushUpdateResult($queryId, $sqlite3->lastInsertRowID());
						break;

					default:
						// Without an answer the issuer of this query id would never be notified.
						$this->pushException($queryId, new Exception(
							"Unknown query type $queryType"
						));
						break;
				}

				$result->finalize();
				$statement->close();

				// Short timed wait (timeout of 1) between successful queries, kept from
				// the original implementation; performed while holding the monitor.
				$this->synchronized(function() : void{
					$this->wait(1);
				});
			}

			// Check-then-wait must be atomic with respect to pushQuery() and
			// flagForShutdown(), otherwise a notification sent between the check
			// and the wait would be lost and the thread would sleep indefinitely.
			$this->synchronized(function() : void{
				if(!$this->flagForShutdown && !$this->hasQuery()){
					$this->wait();
				}
			});
		}

		$sqlite3->close();
	}

	/**
	 * Asks the thread to stop once the queries queued so far have been executed.
	 * Also wakes the thread in case it is idle and waiting for work.
	 */
	public function flagForShutdown() : void{
		$this->synchronized(function() : void{
			$this->flagForShutdown = true;
			$this->notify();
		});
	}

	/**
	 * Queues a query for execution and wakes the thread.
	 *
	 * @param int     $queryId   identifier echoed back with the result
	 * @param int     $queryType one of the QUERY_TYPE_* constants
	 * @param string  $query     SQL text, prepared on the worker thread
	 * @param array[] $args      bound parameters, each a [name-or-index, value, SQLITE3_* type] triple
	 */
	public function pushQuery(
		int $queryId,
		int $queryType,
		string $query,
		array $args = []
	) : void{
		$payload = serialize([$queryId, $queryType, $query, $args]);

		$this->synchronized(function() use ($payload) : void{
			$this->sendQueue[] = $payload;

			$this->notifyOne();
		});
	}

	/**
	 * @return bool whether at least one query is waiting to be executed
	 */
	private function hasQuery() : bool{
		return $this->sendQueue->count() > 0;
	}

	/**
	 * Removes the oldest queued query and writes its parts into the given variables.
	 *
	 * The parameters are nullable because they are pure output parameters:
	 * callers pass variables that may not be initialised yet, and typed
	 * by-reference parameters are validated when the function is entered.
	 * Must only be called when hasQuery() returned true.
	 *
	 * @param int|null    $queryId
	 * @param int|null    $queryType
	 * @param string|null $query
	 * @param array|null  $args
	 */
	private function popQuery(
		?int &$queryId,
		?int &$queryType,
		?string &$query,
		?array &$args
	) : void{
		$this->synchronized(function() use (&$queryId, &$queryType, &$query, &$args) : void{
			// shift() takes the oldest entry so queries run in submission order.
			[$queryId, $queryType, $query, $args] = unserialize($this->sendQueue->shift());
		});
	}

	/**
	 * Appends a serialized [queryId, data] entry to the receive queue.
	 *
	 * @param int   $queryId
	 * @param mixed $data    result payload or Exception
	 * @param bool  $wakeup  whether to wake the query notifier afterwards
	 */
	private function pushResult(int $queryId, $data, bool $wakeup) : void{
		$payload = serialize([$queryId, $data]);

		$this->synchronized(function() use ($payload, $wakeup) : void{
			$this->recvQueue[] = $payload;

			if($wakeup){
				$this->queryNotifier->wakeupSleeper();
			}
		});
	}

	/**
	 * Reports a failure for the given query id.
	 *
	 * @param int       $queryId   -1 is used by run() for connection setup errors
	 * @param Exception $exception
	 * @param bool      $wakeup    false when the caller wakes a different notifier itself
	 */
	private function pushException(int $queryId, Exception $exception, bool $wakeup = true) : void{
		$this->pushResult($queryId, $exception, $wakeup);
	}

	/**
	 * @param int           $queryId
	 * @param RegionEvent[] $regionEvents
	 */
	private function pushSelectResult(int $queryId, array $regionEvents) : void{
		$this->pushResult($queryId, $regionEvents, true);
	}

	/**
	 * @param int $queryId
	 * @param int $lastInsertId row id reported by the connection after the statement ran
	 */
	private function pushUpdateResult(int $queryId, int $lastInsertId) : void{
		$this->pushResult($queryId, $lastInsertId, true);
	}

	/**
	 * @return bool whether at least one result is waiting to be collected
	 */
	public function hasResult() : bool{
		return $this->recvQueue->count() > 0;
	}

	/**
	 * Removes the oldest queued result and writes it into the given variables.
	 * Must only be called when hasResult() returned true.
	 *
	 * @param int|null $queryId receives the id passed to pushQuery(), or -1 for a setup error
	 * @param mixed    $data    receives RegionEvent[] for selects, the last insert row id
	 *                          for updates, or an Exception on failure
	 */
	public function popResult(?int &$queryId, &$data) : void{
		$this->synchronized(function() use (&$queryId, &$data) : void{
			// shift() delivers results in the order they were produced.
			[$queryId, $data] = unserialize($this->recvQueue->shift());
		});
	}
}
235}