1// #include "stdafx.h"
2#include "assert.h"
3#include <stdio.h>
4#include <string.h>
5#include <stdlib.h>
6// #include <windows.h>
7#include <time.h>
8#include "shm_types.h"
9// #include <processthreadsapi.h>
10#include "unordered_map.h"
11
12// moved to shm_types.h
13// #ifndef P_CC_MSVC
14// #include "safe_lib.h"
15// #endif
16
17ShmSuperblock *superblock;
18ShmChunk superblock_desc;
19vl char * superblock_mmap[SHM_BLOCK_COUNT / SHM_BLOCK_GROUP_SIZE];
20#if (defined(P_OS_WIN) || defined(P_OS_WIN64))
21 ShmPrivateEvents *private_events;
22#endif
23
24static ThreadContext thread_local;
25
26// Debugging
27ShmProcessHandle child_processes[64] = {0,};
28ShmProcessHandle parent_process = 0;
29char *assert_prefix;
30
31#ifndef P_CC_MSVC
32 #define unlikely(x) __builtin_expect((x),0)
33#endif
34
35void _myassert(bool condition, char *condition_msg, char *message, char *file, int line)
36{
37	if (unlikely(!(condition))) {
38
39		if (assert_prefix)
40			fputs(assert_prefix, stderr);
41		fprintf(stderr, "assertion \"");
42		fprintf(stderr, "%s", condition_msg);
43		fprintf(stderr, "\" at ");
44		fputs(file, stderr);
45		fprintf(stderr, "->");
46		fprintf(stderr, "%d", line);
47		if ((message))
48		{
49			fprintf(stderr, ":");
50			char *msg = message;
51			fputs(msg, stderr);
52		}
53		fprintf(stderr, "\n");
54		if (DebugPause())
55			assert(false);
56	}
57}
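
// _myassert is presumably the backend for the myassert()/myassert_msg() macros used
// throughout this file (defined in a header elsewhere); a plausible sketch, not taken
// from this file:
/*
	#define myassert(cond)          _myassert((cond), #cond, NULL, __FILE__, __LINE__)
	#define myassert_msg(cond, msg) _myassert((cond), #cond, (msg), __FILE__, __LINE__)
*/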
58
59bool
60result_is_abort(int result)
61{
62 return result == RESULT_ABORT || result == RESULT_PREEMPTED;
63}
64
65bool
66result_is_repeat(int result)
67{
68 return result == RESULT_REPEAT || result == RESULT_WAIT || result == RESULT_WAIT_SIGNAL;
69}
70
71void
72memclear(vl void *dst, ShmInt count)
73{
74 memset(CAST_VL(dst), 0, (puint)count);
75}
76// strcpy/strncpy?
77
78// ~63^n different identifiers are possible ('0' appears twice in the alphabet below and '_' is never picked because of % 64)
79void _make_filename(char *buf)
80{
81 // Create a random filename for the shared memory object.
82 // number of random bytes to use for name
83 int nbytes = (SHM_SAFE_NAME_LENGTH - sizeof SHM_NAME_PREFIX);
84 assert(nbytes >= 2); // '_SHM_NAME_PREFIX too long'
85 strcpy_s(buf, SHM_SAFE_NAME_LENGTH, SHM_NAME_PREFIX);
86 for (int i = 0; i<nbytes; ++i)
87 buf[isizeof(SHM_NAME_PREFIX) + i - 1] = "01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz+_"[(unsigned int)rand() % 64];
88 buf[isizeof(SHM_NAME_PREFIX) + nbytes - 1] = '\0';
89 return;
90}
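
// A minimal usage sketch (illustrative only, not from this file); the buffer must
// hold at least SHM_SAFE_NAME_LENGTH bytes and receives a NUL-terminated name
// starting with SHM_NAME_PREFIX:
/*
	char shm_name[SHM_SAFE_NAME_LENGTH];
	_make_filename(shm_name);
	// shm_name can now be passed to the platform shared-memory mapping API
*/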
91
92void
93superblock_check_mmap_inited(int group_index)
94{
95 if (!superblock_mmap[group_index])
96 {
97 // block not mapped into memory yet
98 SectorAllocationResult alloc_result = shm_allocate_sector((__ShmChunk*)&superblock->block_groups[group_index], false, SHM_FIXED_CHUNK_SIZE * SHM_BLOCK_GROUP_SIZE);
99 myassert(alloc_result.data);
100 shm_sector_register_handle(&superblock->block_groups[group_index], alloc_result.FileMapping,
101 superblock->coordinator_process, false);
102 superblock_mmap[group_index] = alloc_result.data;
103 }
104}
105
106vl void *
107superblock_get_block(int index)
108{
109 if (!superblock)
110 return NULL;
111 int group_index = index / SHM_BLOCK_GROUP_SIZE;
112 int itemindex = index % SHM_BLOCK_GROUP_SIZE;
113 if (index >= superblock->block_count)
114 {
115 myassert(false);
116 return NULL;
117 }
118 superblock_check_mmap_inited(group_index);
119 return superblock_mmap[group_index] + itemindex * SHM_FIXED_CHUNK_SIZE;
120}
121
122vl void *
123superblock_get_block_noside(int index)
124{
125 if (!superblock)
126 return NULL;
127 int group_index = index / SHM_BLOCK_GROUP_SIZE;
128 int itemindex = index % SHM_BLOCK_GROUP_SIZE;
129 if (index >= superblock->block_count)
130 return NULL;
131 myassert(superblock_mmap[group_index]);
132 return superblock_mmap[group_index] + itemindex * SHM_FIXED_CHUNK_SIZE;
133}
134
135void
136lock_taken(ThreadContext *self, ShmLock *lock, ShmPointer shm_lock)
137{
138 self->next = EMPTY_SHM;
139}
140
141int
142signal_thread(ThreadContext *self, ShmPointer target)
143{
144 ThreadContext *target_ptr = SPTR(target);
145 if (target_ptr)
146 {
147 myassert(target_ptr->self == target);
148 // target_ptr->ready = 1; // atomic
149 // if (target_ptr->ready_event != 0)
150 // SetEvent(target_ptr->ready_event);
151 shm_event_signal(&target_ptr->ready);
152 return RESULT_OK;
153 }
154 else
155 return RESULT_INVALID;
156}
157
158int
159thread_reset_preempted(ThreadContext *self)
160{
161 ShmPointer thread = p_atomic_shm_pointer_get(&self->thread_preempted);
162 myassert(thread != NONE_SHM);
163 if (SBOOL(thread))
164 {
165 signal_thread(self, thread);
166 p_atomic_shm_pointer_set(&self->thread_preempted, EMPTY_SHM);
167 return RESULT_OK;
168 }
169 return RESULT_INVALID;
170}
171
172void
173thread_reset_signal(ThreadContext *thread)
174{
175 shm_event_reset(&thread->ready);
176}
177
178bool
179preempt_thread(ThreadContext *self, ThreadContext *target)
180{
181 myassert(target->magic == SHM_THREAD_MAGIC);
182 myassert(p_atomic_shm_pointer_get(&target->thread_preempted) != NONE_SHM);
183 // we might also try to signal lower priority waiter already stored target->thread_preempted,
184 // so we can replace thread_preempted with our pointer and return true.
185 if (p_atomic_shm_pointer_get(&target->thread_preempted) == self->self || PCAS2(&target->thread_preempted, self->self, EMPTY_SHM))
186 return true;
187 else
188 return false;
189}
190
191#define COMPARED_LOW_HIGH 1
192#define COMPARED_HIGH_LOW -1
193
194// basically returns the sign of self->last_start - target->last_start (0 when either is 0 or they are equal)
195int
196shm_thread_start_compare(ShmInt thread1, ShmInt thread2)
197{
198 if (thread1 == 0 || thread2 == 0 || thread1 == thread2)
199 return 0;
200 else if (thread1 - thread2 > 0)
201 return 1;
202 else
203 return -1;
204}
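
// Illustration of the comparison semantics (sketch, assuming no counter wraparound):
// a smaller non-zero last_start means the thread started earlier and thus has higher priority, e.g.
/*
	shm_thread_start_compare(20, 10) == COMPARED_LOW_HIGH;   // thread1 started later   => lower priority
	shm_thread_start_compare(10, 20) == COMPARED_HIGH_LOW;   // thread1 started earlier => higher priority
	shm_thread_start_compare(0, 10)  == 0;                    // 0 means "no transaction running"
*/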
205
206ThreadContext *
207shm_pointer_to_thread(ShmPointer value)
208{
209 ThreadContext *thread = shm_pointer_to_pointer(value);
210 myassert(thread->self == value);
211 myassert(thread->magic == SHM_THREAD_MAGIC);
212 myassert(thread->index >= 0 && thread->index < isizeof(ShmReaderBitmap));
213 return thread;
214}
215
216int
217take_read_lock__checks(ThreadContext *self, ShmLock *lock, ShmPointer next_writer, ShmPointer writer_lock,
218 ShmInt *oldest_writer_out, bool *lock_is_mine_out)
219{
220 if (SBOOL(p_atomic_shm_pointer_get(&self->thread_preempted)))
221 return RESULT_PREEMPTED;
222
223 bool the_lock_is_mine = false;
224 if (writer_lock == self->self)
225 {
226 the_lock_is_mine = true;
227 writer_lock = LOCK_UNLOCKED;
228 }
229 if (lock_is_mine_out) *lock_is_mine_out = the_lock_is_mine;
230
231 myassert(self->index >= 0 && self->index < isizeof(ShmReaderBitmap));
232 if ((atomic_bitmap_get(&lock->reader_lock) & atomic_bitmap_thread_mask(self->index)) &&
233 (SBOOL(writer_lock) == false || the_lock_is_mine))
234 {
235 return RESULT_OK; // already taken exclusively
236 }
237
238 myassert(self->last_start != 0);
239 ShmInt oldest_writer = 0;
240 if (SBOOL(writer_lock))
241 {
242 ThreadContext *that_thread = shm_pointer_to_thread(writer_lock);
243 if (that_thread)
244 {
245 ShmInt that_thread_start = p_atomic_int_get(&that_thread->last_start);
246 myassert(self->last_start != that_thread_start);
247 if (that_thread_start != 0)
248 {
249 if (oldest_writer == 0)
250 {
251 oldest_writer = that_thread_start;
252 }
253 else if (shm_thread_start_compare(oldest_writer, that_thread_start) == COMPARED_LOW_HIGH)
254 {
255 oldest_writer = that_thread_start;
256 }
257 }
258 }
259 }
260 // ShmPointer next_writer = p_atomic_shm_pointer_get(&lock->next_writer);
261 // myassert(next_writer != self->self);
262 if (!the_lock_is_mine && SBOOL(next_writer))
263 {
264 ThreadContext *that_thread = shm_pointer_to_thread(next_writer);
265 if (that_thread)
266 {
267 ShmInt that_thread_start = p_atomic_int_get(&that_thread->last_start);
268 myassert(self->last_start != that_thread_start);
269 if (that_thread_start != 0)
270 {
271 if (oldest_writer == 0)
272 {
273 oldest_writer = that_thread_start;
274 }
275 else if (shm_thread_start_compare(oldest_writer, that_thread_start) == COMPARED_LOW_HIGH)
276 {
277 oldest_writer = that_thread_start;
278 }
279 }
280 }
281 }
282 if (oldest_writer_out) *oldest_writer_out = oldest_writer;
283	// kind of an optimization; we could also do this check after acquiring the "reader_lock"
284 if (!the_lock_is_mine && oldest_writer != 0)
285 {
286 if (shm_thread_start_compare(self->last_start, oldest_writer) == COMPARED_LOW_HIGH)
287 {
288 self->private_data->times_aborted++;
289 return RESULT_PREEMPTED;
290 }
291 }
292
293 return RESULT_INVALID;
294}
295
296// Last modification 26.08.2020
297// RESULT_WAIT means there are low priority writers still holding the lock and we need to wait for them to abort e.g. Sleep(0), yield(), ShmEvent.
298// RESULT_REPEAT means something changed and we need to rerun this function.
299// RESULT_ABORT means there are high priority threads or we just have to abort anyway.
300// RESULT_PREEMPTED specifically for being preempted by another thread
301// RESULT_WAIT and RESULT_REPEAT keep the acquired lock so other threads, especially readers, can see it.
302//
303// Reader-writer contention is resolved with the following considerations:
304// * If there's at least one reader older (higher priority) than writer, then readers are allowed to proceed and writer is aborted.
305// Reader->reader examination is expensive and undesirable for possible cohorting, so we rely on writer status, because writer has to examine readers anyway.
306// Active writer lock means there are no older readers (most probably).
307// * Race conditions may cause both the reader and writer lock flags to be active at the same moment.
308// Taking a strict next_writer barrier (aka intermediate lock) with subsequent reader cross-check can help to completely avoid this,
309// but the benefits are not worth the trouble.
310// * Usage of a write barrier (next_writer) for convoy-blocking new readers is not the best idea because
311// we don't know when the next writer will start its transaction,
312// so reading might eventually be blocked forever despite the fact we could have a good reader concurrency.
313// * Contending locker acquires second lock, but cannot use it for data access (because the first lock is already in use).
314//     Second lock is used for contention resolution; it can become useful for data access after the contending lock is released.
315// * Writer either aborts when there's older reader, or preempts every reader otherwise.
316// * Receiving signals from every aborted reader will be expensive, so we should either spinlock with writer or send signal with LAST READER only.
317//     Absence of other readers is easily determined because new readers usually cannot appear when a high priority writer lock is set.
318// * On contention with readers, a writer sets "NEXT_WRITER" (kinda "a writer waiting for readers") instead of taking "writer_lock",
319// so a higher priority writer can come and take over the whole lock.
320//     "next_writer" is more like a hint than an actual lock, but we still need to ensure it is cleared correctly,
321// so I'd prefer to set it as late as possible and only by owning thread.
322// Thus high priority writer may come and set lock->writer_lock directly while lock->next_writer waits for readers,
323//     or overwrite lock->next_writer and become a new waiter (we don't even need to signal the old waiter in the latter case, because the queue mechanism will send a signal).
324// * Detect higher priority contenders before locking, preempt low priority contenders after locking. Thus low priority contenders will be aware of higher priority contender.
325// if check_exclusive() return Ok;
326// if !check_highest_priority() return Preempted;
327// acquire_lock();
328// if preempt_contenders() return Wait;
329// else return Ok
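// A hypothetical caller loop for the scheme above (shm_event_wait and the abort
// path are assumed names, not taken from this file):
/*
	int rc;
	do {
		rc = take_read_lock(self, lock, shm_lock);
		if (rc == RESULT_WAIT || rc == RESULT_WAIT_SIGNAL)
			shm_event_wait(&self->ready);  // wait until a contender signals us
	} while (result_is_repeat(rc));
	if (result_is_abort(rc))
		; // abort and restart the transaction, releasing any locks taken so far
*/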
330int
331take_read_lock(ThreadContext *self, ShmLock *lock, ShmPointer shm_lock)
332{
333 ShmPointer writer_lock = p_atomic_shm_pointer_get(&lock->writer_lock);
334 ShmPointer next_writer = p_atomic_shm_pointer_get(&lock->next_writer);
335 ShmInt oldest_writer = 0;
336 bool the_lock_is_mine = false;
337 int check_result = take_read_lock__checks(self, lock, next_writer, writer_lock, &oldest_writer, &the_lock_is_mine);
338 if (check_result != RESULT_INVALID)
339 return check_result;
340 // not preempted and there are no higher priority writers.
341
342 // before preempting writers we need to publish our lock so other writers will not try to get writer lock again.
343 bool old_value = atomic_bitmap_set(&lock->reader_lock, self->index);
344 if (!old_value)
345 {
346 // p_atomic_int_add(&lock->readers_count, 1);
347 self->private_data->read_locks_taken++;
348 }
349
350 // Preempt low priority writers.
351	// Also we must not forget to verify (in each branch) the absence of unexpected writer locks placed before we published our reader lock.
352 if (!the_lock_is_mine)
353 {
354 if (oldest_writer != 0 && shm_thread_start_compare(self->last_start, oldest_writer) == COMPARED_HIGH_LOW)
355 {
356 myassert(SBOOL(writer_lock) || SBOOL(next_writer));
357 // thread cannot get older
358 bool can_signal_back = true;
359			bool contended = false; // we can continue when the next_writer reset was successful, but writer_lock should be taken seriously
360 if (SBOOL(next_writer))
361 {
362 // next_writer field is not so important, we can just overwrite it, but send signal in case next_writer thread is waiting for progress.
363 if (PCAS2(&lock->next_writer, LOCK_UNLOCKED, next_writer))
364 {
365 ThreadContext *that_thread = shm_pointer_to_thread(next_writer);
366 // can_signal_back = preempt_thread(self, that_thread) && can_signal_back;
367					// actually, we don't care about next_writer as long as it's able to see our lock and won't make any writes.
368 myassert(shm_thread_start_compare(self->last_start, that_thread->last_start) != COMPARED_LOW_HIGH); // coz threads cannot get older
369 preempt_thread(self, that_thread);
370 shm_event_signal(&that_thread->ready);
371 }
372 else
373 return RESULT_REPEAT;
374 }
375 else if (SBOOL(p_atomic_shm_pointer_get(&lock->next_writer)))
376 return RESULT_REPEAT;
377
378 ShmPointer writer_lock_2 = p_atomic_shm_pointer_get(&lock->writer_lock);
379 if (writer_lock_2 == self->self)
380 writer_lock_2 = LOCK_UNLOCKED;
381
382			if (writer_lock_2 != writer_lock) // we compare the field with the value from the start of the function because the whole function takes a short time to complete
383 return RESULT_REPEAT;
384 if (SBOOL(writer_lock))
385 {
386 // writer_lock cannot be modified that easily. We need to wait for writer to willingly release the lock.
387 ThreadContext *that_thread = shm_pointer_to_thread(writer_lock);
388 myassert(shm_thread_start_compare(self->last_start, that_thread->last_start) != COMPARED_LOW_HIGH); // coz threads cannot get older
389 can_signal_back = preempt_thread(self, that_thread) && can_signal_back;
390 shm_event_signal(&that_thread->ready);
391 contended = true;
392 }
393 else if (SBOOL(writer_lock_2))
394 return RESULT_REPEAT;
395
396 if (contended)
397 {
398 self->private_data->times_waiting++;
399 if (can_signal_back)
400 return RESULT_WAIT_SIGNAL;
401 else
402 return RESULT_WAIT;
403 }
404 }
405 else if (SBOOL(p_atomic_shm_pointer_get(&lock->writer_lock)))
406 {
407 // There suddenly appeared a new writer_lock right before we published our read lock.
408 // It's okay to have conflicting reader and writer lock active, but we cannot use our read lock unless we are sure nobody's using the writer lock and vice versa
409 return RESULT_REPEAT;
410 }
411 }
412
413 return RESULT_OK_LOCKED;
414}
415
416bool
417shm_cell_have_write_lock(ThreadContext *thread, ShmLock *lock)
418{
419 return p_atomic_shm_pointer_get(&lock->writer_lock) == thread->self;
420}
421
422bool
423shm_cell_have_read_lock(ThreadContext *thread, ShmLock *lock)
424{
425 return atomic_bitmap_check_me(&lock->reader_lock, thread->index);
426}
427
428void
429shm_cell_check_write_lock(ThreadContext *thread, ShmLock *lock)
430{
431 // myassert(atomic_bitmap_check_exclusive(&lock->reader_lock, thread->index));
432 myassert(shm_cell_have_write_lock(thread, lock));
433 myassert(p_atomic_int_get(&lock->writers_count) == 1);
434
435 ShmInt readers_count = p_atomic_int_get(&lock->readers_count);
436 ShmInt reference;
437 if (shm_cell_have_read_lock(thread, lock))
438 reference = 1;
439 else
440 reference = 0;
441
442 myassert(readers_count == reference);
443}
444
445void
446shm_cell_check_read_lock(ThreadContext *thread, ShmLock *lock)
447{
448 // myassert(atomic_bitmap_check_exclusive(&lock->reader_lock, thread->index));
449 myassert(shm_cell_have_read_lock(thread, lock));
450 myassert(p_atomic_int_get(&lock->readers_count) > 0);
451	// On very rare occasions lock->readers_count can be higher for a very short period of time with "thread->preempted" assigned
452 if (shm_cell_have_write_lock(thread, lock))
453 myassert(p_atomic_int_get(&lock->writers_count) == 1);
454 else
455 myassert(p_atomic_int_get(&lock->writers_count) == 0);
456}
457
458void
459shm_cell_check_read_write_lock(ThreadContext *thread, ShmLock *lock)
460{
461 if (shm_cell_have_write_lock(thread, lock))
462 shm_cell_check_write_lock(thread, lock);
463 else
464 shm_cell_check_read_lock(thread, lock);
465}
466
467void
468_thread_set_pending_lock(ThreadContext *thread, ShmPointer shm_lock)
469{
470 p_atomic_shm_pointer_set(&thread->pending_lock, shm_lock);
471 p_atomic_int_add(&thread->private_data->pending_lock_count, 1);
472}
473
474void
475notify_next_writer(ThreadContext *thread, ShmLock *lock);
476
477bool
478thread_queue_to_lock(ThreadContext *thread, ShmLock *lock, ShmPointer container_shm)
479{
480 myassert(thread->index >= 0 && thread->index < isizeof(ShmReaderBitmap));
481 bool in_queue = atomic_bitmap_check_me(&lock->queue_threads, thread->index);
482 if (!in_queue)
483 {
484 myassert(thread->pending_lock != container_shm);
485 // if (SBOOL(thread->pending_lock))
486 // {
487 // thread_unqueue_from_lock(thread);
488 // }
489 myassert(SBOOL(thread->pending_lock) == false); // we always unqueue after abort or success (but not repeat)
490
491 // shm_event_reset(&thread->ready);
492 // p_atomic_int_add(&superblock->debug_lock_count, 1);
493 _thread_set_pending_lock(thread, container_shm);
494 atomic_bitmap_set(&lock->queue_threads, thread->index);
495 return true;
496 }
497 else
498 myassert(thread->pending_lock == container_shm);
499
500 return false;
501}
502
503bool
504_thread_unqueue_from_lock(ThreadContext *thread, ShmLock *lock)
505{
506 bool rslt = PCAS2(&lock->next_writer, LOCK_UNLOCKED, thread->self);
507
508 p_atomic_shm_pointer_set(&thread->pending_lock, EMPTY_SHM);
509 p_atomic_int_dec_and_test(&thread->private_data->pending_lock_count);
510 atomic_bitmap_reset(&lock->queue_threads, thread->index);
511 return rslt;
512}
513
514void
515thread_unqueue_from_lock(ThreadContext *thread)
516{
517 if (SBOOL(thread->pending_lock))
518 {
519 ShmContainer *container = shm_pointer_to_pointer(thread->pending_lock);
520 _thread_unqueue_from_lock(thread, &container->lock);
521 }
522}
523
524void
525update_tickets(ThreadContext *thread, vl ShmInt *cell)
526{
527 ShmInt ticket = superblock->ticket;
528 *cell += ticket - thread->private_data->last_known_ticket;
529 thread->private_data->last_known_ticket = ticket;
530}
531
532ShmPointer
533lock_queue_find_highest_priority(volatile ShmReaderBitmap *bitmap, int self_index, int *found_index, ShmInt *found_last_start)
534{
535 ShmReaderBitmap contenders = atomic_bitmap_exclude_thread(*bitmap, self_index);
536 ShmInt oldest_start = 0;
537 int oldest_index = -1;
538 for (int idx = 0; idx < MAX_THREAD_COUNT; ++idx)
539 {
540 if ((contenders & atomic_bitmap_thread_mask(idx)) != 0)
541 {
542 ThreadContext *thread = shm_pointer_to_pointer(superblock->threads.threads[idx]);
543 ShmInt thread_start = thread->last_start;
544 if (thread_start != 0)
545 {
546 if (oldest_start == 0 || shm_thread_start_compare(thread_start, oldest_start) == COMPARED_HIGH_LOW)
547 {
548 oldest_start = thread_start;
549 oldest_index = idx;
550 }
551 }
552 }
553 }
554 if (found_index) *found_index = oldest_index;
555 if (found_last_start) *found_last_start = oldest_start;
556 if (oldest_index == -1)
557 return EMPTY_SHM;
558 else
559 return superblock->threads.threads[oldest_index];
560}
561
562ShmInt
563next_writer_get_last_start(ShmPointer next_writer)
564{
565 ThreadContext *thread = shm_pointer_to_pointer(next_writer);
566 myassert(thread || !SBOOL(next_writer));
567 if (thread)
568 return thread->last_start;
569 else
570 return 0;
571}
572
573bool
574atomic_bitmap_has_higher_priority(ShmReaderBitmap contenders, ShmInt last_start)
575{
576 for (int i = 0; i < 64; ++i)
577 {
578 if ((contenders & (UINT64_C(1) << i)) != 0)
579 {
580 ThreadContext *thread = shm_pointer_to_pointer(superblock->threads.threads[i]);
581 myassert(last_start != thread->last_start);
582 if (thread && shm_thread_start_compare(last_start, thread->last_start) == COMPARED_LOW_HIGH)
583 {
584 return true;
585 }
586
587 }
588 }
589 return false;
590}
591
592#define thread_debug_register_line(line) do {\
593 self->private_data->last_writer_lock = lock->writer_lock; \
594 self->private_data->last_writer_lock_pntr = lock; \
595 self->private_data->last_operation = __LINE__; \
596} while (0);
597
598#define thread_debug_register_result(times_var, tickets_var) do {\
599 self->private_data->times_var++; \
600 update_tickets(self, &self->private_data->tickets_var); \
601 self->private_data->last_writer_lock = lock->writer_lock; \
602 self->private_data->last_writer_lock_pntr = lock; \
603 self->private_data->last_operation = __LINE__; \
604} while (0);
605
606int
607preempt_readers(ThreadContext *self, ShmLock *lock)
608{
609 ShmReaderBitmap contenders = atomic_bitmap_contenders(&lock->reader_lock, self->index);
610 // should be checking this condition before calling preempt_readers_or_abort()
611 if (false && atomic_bitmap_has_higher_priority(contenders, self->last_start))
612 {
613 if (shm_cell_have_write_lock(self, lock))
614 lock->release_line = __LINE__;
615 thread_debug_register_result(times_aborted4, tickets_aborted4);
616 return RESULT_PREEMPTED;
617 }
618 bool preempted = false;
619 ShmReaderBitmap signalled_readers = 0;
620 ShmInt myindex = self->index;
621 for (int idx = 0; idx < 64; ++idx)
622 {
623 // contenders exclude the current thread (self->index)
624 if (atomic_bitmap_check_me(&contenders, idx))
625 {
626 ThreadContext *thread = shm_pointer_to_pointer(superblock->threads.threads[idx]);
627 if (thread)
628 switch (shm_thread_start_compare(self->last_start, thread->last_start))
629 {
630 case COMPARED_HIGH_LOW:
631 {
632 // thread->last_start, but thread cannot get older
633 preempt_thread(self, thread);
634 preempted = true;
635 signalled_readers |= atomic_bitmap_thread_mask(idx);
636 break;
637 }
638 case COMPARED_LOW_HIGH:
639 return RESULT_PREEMPTED;
640 break;
641 case 0:
642 // just remember this case exists. For example, thread just ended its transaction and/or about to start a new one.
643 break;
644 }
645 };
646 }
647 // myassert(preempted, NULL); // contenders might be changed in the process so false assertions will appear on rare occasions
648 if (preempted)
649 {
650 if ((signalled_readers & atomic_bitmap_get(&lock->reader_lock)) != 0)
651 return RESULT_WAIT_SIGNAL; // readers still active and will signal us using lock->next_writer (you've already set next_writer here, right?)
652 else
653 return RESULT_OK;
654 // we might still get some contending readers unless next_writer/writer_lock is set. And even then some higher priority reader might come in.
655 // So RESULT_OK doesn't mean there are no contenders.
656 }
657 else
658 return RESULT_OK;
659}
660
661bool random_flinch = false;
662
663int
664take_write_lock__checks(ThreadContext *self, ShmLock *lock, ShmPointer container_shm, bool strict)
665{
666 // assert the queueing consistency
667 myassert(atomic_bitmap_check_me(&lock->queue_threads, self->index) == (self->pending_lock == container_shm));
668
669 if (SBOOL(p_atomic_shm_pointer_get(&self->thread_preempted)))
670 {
671 thread_debug_register_result(times_aborted1, tickets_aborted1);
672 self->private_data->last_operation = __LINE__;
673 self->private_data->last_operation_rslt = RESULT_PREEMPTED;
674 return RESULT_PREEMPTED;
675 }
676
677 if (random_flinch && rand() % 128 == 3)
678 {
679 thread_debug_register_line(__LINE__);
680 self->private_data->last_operation_rslt = RESULT_INVALID;
681 return RESULT_PREEMPTED;
682 }
683
684 myassert(self->index >= 0 && self->index < isizeof(ShmReaderBitmap));
685	// we might be having contention with readers after we've got the lock, because someone might not have seen our lock yet.
686 if (atomic_bitmap_check_exclusive(&lock->reader_lock, self->index))
687 {
688 if (shm_cell_have_write_lock(self, lock))
689 {
690 myassert(p_atomic_int_get(&lock->writers_count) <= 1);
691 thread_debug_register_line(__LINE__);
692 self->private_data->last_operation_rslt = RESULT_OK;
693 return RESULT_OK; // already have the valid lock
694 }
695 }
696 else
697 {
698 // don't even try to get the lock in case there are higher priority locks
699 ShmReaderBitmap contenders = atomic_bitmap_contenders(&lock->reader_lock, self->index);
700 for (int i = 0; i < 64; ++i)
701 {
702 if ((contenders & atomic_bitmap_thread_mask(i)) != 0)
703 {
704 ThreadContext *thread = shm_pointer_to_pointer(superblock->threads.threads[i]);
705 myassert(self->last_start != thread->last_start);
706 if (thread)
707 {
708 switch (shm_thread_start_compare(self->last_start, thread->last_start))
709 {
710 case COMPARED_LOW_HIGH:
711 thread_debug_register_result(times_aborted2, tickets_aborted2);
712 self->private_data->last_operation_rslt = RESULT_PREEMPTED;
713 return RESULT_PREEMPTED;
714 break;
715 case COMPARED_HIGH_LOW:
716 if (strict) // for postchecking right after we've got the lock
717 {
718 thread_debug_register_line(__LINE__);
719 self->private_data->last_operation_rslt = RESULT_REPEAT;
720 return RESULT_REPEAT;
721 }
722 break;
723 }
724 }
725 }
726 }
727 if (random_flinch && rand() % 128 == 3)
728 {
729 thread_debug_register_line(__LINE__);
730 self->private_data->last_operation_rslt = RESULT_INVALID;
731 return RESULT_REPEAT;
732 }
733 /*if (atomic_bitmap_has_higher_priority(contenders, self->last_start))
734 {
735 // no assertion here because we might not have the lock
736 if (shm_cell_have_write_lock(self, lock))
737 {
738 // lock->release_line = __LINE__;
739 // p_atomic_pointer_set(&lock->writer_lock, LOCK_UNLOCKED);
740 // self->private_data->write_locks_taken--;
741 }
742 thread_debug_register_result(times_aborted2, tickets_aborted2);
743 self->private_data->last_operation_rslt = RESULT_PREEMPTED;
744 return RESULT_PREEMPTED;
745 }*/
746 }
747
748 // thread is not preempted, doesn't have the lock yet, there are no higher priority readers
749 thread_debug_register_line(__LINE__);
750 return RESULT_INVALID;
751}
752
753// Last modification 26.08.2020
754// RESULT_WAIT means there are low priority writers still holding the lock and we need to wait for them to abort e.g. Sleep(0), yield(), ShmEvent.
755// RESULT_REPEAT means something changed and we need to rerun this function.
756// RESULT_ABORT means there are high priority threads or we just have to abort anyway.
757// RESULT_PREEMPTED specifically for being preempted by another thread
758// RESULT_WAIT and RESULT_REPEAT keep the acquired lock so other threads, especially readers, can see it.
759//
760// Reader-writer contention is resolved with the following considerations:
761// * If there's at least one reader older (higher priority) than writer, then readers are allowed to proceed and writer is aborted.
762// Reader->reader examination is expensive and undesirable for possible cohorting, so we rely on writer status, because writer has to examine readers anyway.
763// Active writer lock means there are no older readers (most probably).
764// * Race conditions may cause both the reader and writer lock flags to be active at the same moment.
765// Taking a strict next_writer barrier (aka intermediate lock) with subsequent reader cross-check can help to completely avoid this,
766// but the benefits are not worth the trouble.
767// * Usage of a write barrier (next_writer) for convoy-blocking new readers is not the best idea because
768// we don't know when the next writer will start its transaction,
769// so reading might eventually be blocked forever despite the fact we could have a good reader concurrency.
770// * Contending locker acquires second lock, but cannot use it for data access (because the first lock is already in use).
771//     Second lock is used for contention resolution; it can become useful for data access after the contending lock is released.
772// * Writer either aborts when there's older reader, or preempts every reader otherwise.
773// * Receiving signals from every aborted reader will be expensive, so we should either spinlock with writer or send signal with LAST READER only.
774//     Absence of other readers is easily determined because new readers usually cannot appear when a high priority writer lock is set.
775// * On contention with readers, a writer sets "NEXT_WRITER" (kinda "a writer waiting for readers") instead of taking "writer_lock",
776// so a higher priority writer can come and take over the whole lock.
777//     "next_writer" is more like a hint than an actual lock, but we still need to ensure it is cleared correctly,
778// so I'd prefer to set it as late as possible and only by owning thread.
779// Thus high priority writer may come and set lock->writer_lock directly while lock->next_writer waits for readers,
780//     or overwrite lock->next_writer and become a new waiter (we don't even need to signal the old waiter in the latter case, because the queue mechanism will send a signal).
781// * Detect higher priority contenders before locking, preempt low priority contenders after locking. Thus low priority contenders will be aware of higher priority contender.
782//
783// if higher_priority_threads_exist() || preempted then return Preempted;
784// if exclusive_lock_owned() then return Ok;
785// queue_for_lock();
786// set_barrier();
787// if preempt_low_priorities()
788// return wait;
789// acquire_lock();
790// if preempt_low_priorities() {
791// if highest_priority_threads_exist() || preempted then
792// return Preempted;
793// if preempt_low_priorities() then
794// continue;
795// return Wait; // waiting algorithm:
796// // if higher_priority_threads_exist() || preempted then
797// // release_lock();
798// // preempt_low_priorities();
799// // wait_signal();
800// }
801// return Ok
802int
803take_write_lock(ThreadContext *self, ShmLock *lock, ShmPointer container_shm)
804{
805 myassert(self->last_start != 0);
806 int check_rslt = take_write_lock__checks(self, lock, container_shm, false);
807 if (check_rslt != RESULT_INVALID)
808 return check_rslt;
809 // Thread is not preempted, doesn't have the lock yet, there are no higher priority readers...
810
811 // It's possible to reach this place with "writer_lock = self" due to contending readers.
812 // if (shm_cell_have_write_lock(self, lock) == false)
813 // No, always queue to keep the conditions clear for any writer-contender. Unqueue on transaction end (not on retain-retry) or next writer lock acquisition.
814 {
815 thread_queue_to_lock(self, lock, container_shm);
816 thread_reset_signal(self); // prepare to wait for signal in queue
817 }
818 // Could've moved both queueing and writer priority checks into take_write_lock__checks,
819	// but it's better to separate those strict conditions (take_write_lock__checks) from the more volatile conditions in the writer queue.
820
821 // Remember: we cancel our transaction due to higher priority writer before taking lock, but preempt other transactions after we took the lock.
822	// We have to check the writer_lock, because each transaction can be queued to a single lock only, i.e. it may hold writer_lock while being absent from queue_threads.
823 ShmPointer current_writer_lock = p_atomic_shm_pointer_get(&lock->writer_lock);
824 if (SBOOL(current_writer_lock) && current_writer_lock != self->self)
825 {
826 ThreadContext *that_thread = shm_pointer_to_thread(current_writer_lock);
827 myassert(that_thread);
828 if (shm_thread_start_compare(self->last_start, that_thread->last_start) == COMPARED_LOW_HIGH)
829 {
830 self->private_data->last_wait_oldest = current_writer_lock;
831 self->private_data->last_wait_oldest_index = that_thread->index;
832 self->private_data->last_wait_queue = lock->queue_threads;
833 self->private_data->last_wait_writer_lock = lock->writer_lock;
834 self->private_data->last_wait_next_writer = lock->next_writer;
835 thread_debug_register_line(__LINE__);
836 self->private_data->last_operation_rslt = RESULT_PREEMPTED;
837 return RESULT_PREEMPTED;
838 }
839 }
840 ShmInt oldest_start;
841 int oldest_index = -1;
842 ShmPointer oldest = lock_queue_find_highest_priority(&lock->queue_threads, self->index, NULL, &oldest_start); // slow function
843 if (SBOOL(oldest) && shm_thread_start_compare(self->last_start, oldest_start) == COMPARED_LOW_HIGH)
844 {
845 // there is a higher priority writer in the queue
846 // signal_thread(self, oldest); - normally we should never need this
847 self->private_data->last_wait_oldest = oldest;
848 self->private_data->last_wait_oldest_index = oldest_index;
849 self->private_data->last_wait_queue = lock->queue_threads;
850 self->private_data->last_wait_writer_lock = lock->writer_lock;
851 self->private_data->last_wait_next_writer = lock->next_writer;
852 thread_debug_register_line(__LINE__);
853 self->private_data->last_operation_rslt = RESULT_PREEMPTED;
854 return RESULT_PREEMPTED; // Hm-m-m, this way we don't need the queue anymore, but a single highest priority thread.
855 }
856 // Thread is not preempted, doesn't have the lock yet, there are no higher priority readers...
857 // ...
858 // ... and there are no higher priority writers.
859 // Now we have two routes:
860 // - There are contending low priority readers and we will wait with next_writer set
861 // - Otherwise just take writer_lock
862
863	// The code below was written by a tired me, so you should check it twice.
864
865 // We could place this code somewhere below, inside conditions, but it's so much easier to understand the code if we always set the next_writer.
866	// We are most likely the highest priority thread here.
867 ShmPointer next_writer = p_atomic_shm_pointer_get(&lock->next_writer); // we haven't read the next_writer yet
868 if (next_writer != self->self)
869 {
870 if (SBOOL(next_writer))
871 {
872 ThreadContext *that_thread = shm_pointer_to_thread(next_writer);
873 if (shm_thread_start_compare(self->last_start, that_thread->last_start) == COMPARED_LOW_HIGH)
874 {
875 thread_debug_register_line(__LINE__);
876 self->private_data->last_operation_rslt = RESULT_REPEAT;
877 return RESULT_REPEAT; // that's a new high priority thread, we can't CAS it away.
878 }
879 }
880 if (random_flinch && rand() % 128 == 3)
881 {
882 thread_debug_register_line(__LINE__);
883 self->private_data->last_operation_rslt = RESULT_INVALID;
884 return RESULT_REPEAT;
885 }
886 if (!PCAS2(&lock->next_writer, self->self, next_writer))
887 {
888 self->private_data->last_operation = __LINE__;
889 self->private_data->last_operation_rslt = RESULT_REPEAT;
890 return RESULT_REPEAT;
891 }
892 }
893 // Barrier is active, now we can check the contenders
894 ShmReaderBitmap contenders = atomic_bitmap_contenders(&lock->reader_lock, self->index);
895 if (contenders != 0)
896 {
897 int rslt = preempt_readers(self, lock);
898 if (rslt != RESULT_OK)
899 {
900 self->private_data->last_writer_lock = lock->writer_lock;
901 self->private_data->last_writer_lock_pntr = lock;
902 self->private_data->last_operation = __LINE__;
903 self->private_data->last_operation_rslt = rslt;
904 return rslt; // either preempted by new reader or wait for readers to abort.
905 }
906 // else preempt_readers() detected no contenders on its exit, so we should try acquiring writer_lock
907 }
908
909 if (!PCAS2(&lock->writer_lock, self->self, LOCK_UNLOCKED) && !PCAS2(&lock->writer_lock, self->self, DEBUG_SHM))
910 {
911 self->private_data->last_writer_lock = lock->writer_lock;
912 self->private_data->last_writer_lock_pntr = lock;
913 self->private_data->last_operation = __LINE__;
914 self->private_data->last_operation_rslt = RESULT_REPEAT;
915 return RESULT_REPEAT;
916 }
917 self->private_data->write_locks_taken++;
918 p_atomic_int_add(&superblock->debug_lock_count, 1);
919 // lock_taken(self, lock, container_shm);
920 self->private_data->last_writer_lock = lock->writer_lock;
921 self->private_data->last_writer_lock_pntr = lock;
922 self->private_data->last_operation = __LINE__;
923 self->private_data->last_operation_rslt = RESULT_OK;
924 if (random_flinch && rand() % 128 == 3)
925 {
926 thread_debug_register_line(__LINE__);
927 self->private_data->last_operation_rslt = RESULT_INVALID;
928 return RESULT_REPEAT;
929 }
930
931 // There's a small, but significant chance for a reader to set its reader lock right after it checked the lock->writer_lock.
932 // It will abort its transaction really soon and will not restart it.
933 contenders = atomic_bitmap_contenders(&lock->reader_lock, self->index);
934 if (contenders != 0)
935 {
936 for (int idx = 0; idx < 64; ++idx)
937 {
938 if (atomic_bitmap_check_me(&contenders, idx))
939 {
940 ThreadContext *thread = shm_pointer_to_pointer(superblock->threads.threads[idx]);
941 if (thread && shm_thread_start_compare(self->last_start, thread->last_start) == COMPARED_HIGH_LOW)
942 {
943 // myassert_msg(false, "Got new readers while writer_lock and next_writer barriers were active");
944 return RESULT_WAIT;
945 }
946 }
947 }
948 }
949 if (random_flinch && rand() % 128 == 3)
950 {
951 thread_debug_register_line(__LINE__);
952 self->private_data->last_operation_rslt = RESULT_INVALID;
953 return RESULT_WAIT;
954 }
955
956 int after_check_rslt = take_write_lock__checks(self, lock, container_shm, true);
957 if (after_check_rslt == RESULT_INVALID)
958 after_check_rslt = RESULT_OK; // not preempted and has exclusive lock (no other readers, even low priority ones)
959 if (after_check_rslt != RESULT_OK)
960 {
961 self->private_data->last_operation = __LINE__;
962 self->private_data->last_operation_rslt = after_check_rslt;
963 }
964 return after_check_rslt;
965 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
966
967 // lock->next_writer transaction start
968 ShmPointer prev_next_writer = p_atomic_shm_pointer_get(&lock->next_writer);
969 ShmInt prev_next_writer_start = next_writer_get_last_start(prev_next_writer);
970 // let LR = locking right
971 bool had_LR = prev_next_writer == self->self;
972 bool got_LR = false;
973 if (!had_LR)
974 {
975 if (shm_thread_start_compare(self->last_start, prev_next_writer_start) == COMPARED_LOW_HIGH)
976 {
977 thread_debug_register_result(times_aborted3, tickets_aborted3);
978 // DebugPause();
979 return RESULT_WAIT_SIGNAL; // we are in queue but we need to wait for the current lock->next_writer
980 }
981
982 if (!PCAS2(&lock->next_writer, self->self, prev_next_writer))
983 {
984 thread_debug_register_result(times_repeated, tickets_repeated);
985 return RESULT_REPEAT; // lock->next_writer changed
986 }
987 got_LR = true;
988 }
989 // lock->next_writer transaction finish.
990
991 // WHERE DO WE PLACE IT? Somewhere after we take the lock
992 // the writer lock acts as a barrier, there's no use in preempting readers before taking the lock because readers will keep coming again.
993 if (shm_cell_have_write_lock(self, lock) || false)
994 {
995 myassert(p_atomic_int_get(&lock->writers_count) <= 1);
996
997 // preempt readers or abort
998 thread_reset_signal(self);
999 int readers_result = preempt_readers(self, lock);
1000
1001 if (readers_result != RESULT_OK)
1002 {
1003 if (!atomic_bitmap_check_exclusive(&lock->reader_lock, self->index))
1004 {
1005				// readers are still active and still hold their read locks
1006 thread_debug_register_result(times_waiting, tickets_waiting);
1007 return readers_result; // wait for the preempted threads
1008 // On rare occasions we might be waiting for a reader that will never signal us.
1009 }
1010 }
1011 // otherwise we don't expect new readers as documented in preempt_readers()
1012 }
1013
1014 if (had_LR || got_LR)
1015 {
1016		// next_writer as a barrier is useless. The next_writer check is useless too because we just read it.
1017 // // cross-check with readers after locking
1018 // int readers_result = preempt_readers_or_abort(self, contenders, lock);
1019
1020 // if (p_atomic_shm_pointer_get(&lock->next_writer) != self->self)
1021 // {
1022 // thread_debug_register_result(times_aborted5, tickets_aborted5);
1023 // return RESULT_PREEMPTED; // about to be preempted
1024 // }
1025
1026 // transform the preliminary lock into the actual lock
1027 if (PCAS2(&lock->writer_lock, self->self, LOCK_UNLOCKED) || PCAS2(&lock->writer_lock, self->self, DEBUG_SHM))
1028 {
1029 self->private_data->write_locks_taken++;
1030
1031 myassert(EMPTY_SHM == p_atomic_shm_pointer_get(&lock->transaction_data));
1032
1033 myassert(lock->writers_count == 0);
1034
1035 // // cross-check with next_writer under lock
1036 // if (p_atomic_shm_pointer_get(&lock->next_writer) != self->self)
1037 // {
1038 // // about to be preempted. -- So what? Putting this check after every line hurts readability.
1039 // signal_thread(self, p_atomic_shm_pointer_get(&lock->next_writer));
1040 // p_atomic_pointer_set(&lock->writer_lock, LOCK_UNLOCKED); // we are still in the lock->queue_threads
1041 // self->private_data->write_locks_taken--;
1042 // thread_debug_register_result(times_aborted6, tickets_aborted6);
1043 // return RESULT_PREEMPTED;
1044 // }
1045
1046 bool next_writer_released = false;
1047 // Find a new next_writer
1048 // Usually we are the highest priority writer unless lock->next_writer was changed by higher priority writer.
1049 for (int newnext_cycle = 0; newnext_cycle <= 5; ++newnext_cycle)
1050 {
1051 myassert_msg(newnext_cycle < 5, "Too long looping to select a new lock->next_writer");
1052 int new_next_index = -1;
1053 ShmPointer new_next = lock_queue_find_highest_priority(&lock->queue_threads, self->index, &new_next_index, NULL);
1054 if (new_next_index != -1)
1055 {
1056 // CAS on lock->next_writer is the last step in the _thread_release_pending_lock() after excluding itself from lock->queue_threads,
1057 // so we need to make sure this new next_writer is not going through these steps right now, otherwise it might never release the lock->next_writer.
1058 if (atomic_bitmap_check_me(&lock->queue_threads, new_next_index))
1059 {
1060 next_writer_released = PCAS2(&lock->next_writer, new_next, self->self);
1061 }
1062 else
1063 {
1064 continue;
1065 }
1066 }
1067 break;
1068 }
1069
1070 // leave the queue
1071 bool next_writer_released2 = _thread_unqueue_from_lock(self, lock);
1072 next_writer_released = next_writer_released2 || next_writer_released;
1073 if (!next_writer_released)
1074 {
1075 // about to be preempted
1076 signal_thread(self, p_atomic_shm_pointer_get(&lock->next_writer));
1077 p_atomic_pointer_set(&lock->writer_lock, LOCK_UNLOCKED); // we are not in queue now
1078 self->private_data->write_locks_taken--;
1079 thread_debug_register_result(times_aborted7, tickets_aborted7);
1080 return RESULT_PREEMPTED;
1081 }
1082
1083 p_atomic_int_add(&superblock->debug_lock_count, 1);
1084
1085 p_atomic_int_add(&lock->writers_count, 1);
1086 lock_taken(self, lock, container_shm);
1087 self->private_data->last_writer_lock_pntr = lock;
1088 self->private_data->last_writer_lock = lock->writer_lock;
1089 self->private_data->last_operation = __LINE__;
1090 // cross-check under lock
1091 if (atomic_bitmap_check_exclusive(&lock->reader_lock, self->index))
1092 {
1093 shm_cell_check_write_lock(self, lock);
1094 return RESULT_OK;
1095 }
1096 else
1097 // someone got a reading lock while we were acquiring writing lock
1098 return RESULT_REPEAT;
1099 }
1100 else
1101 {
1102 ShmPointer writer_lock = p_atomic_shm_pointer_get(&lock->writer_lock);
1103 ThreadContext *thread = shm_pointer_to_pointer(writer_lock);
1104 if (thread)
1105 {
1106 myassert(self->last_start != thread->last_start);
1107 switch (shm_thread_start_compare(self->last_start, thread->last_start))
1108 {
1109 case COMPARED_HIGH_LOW:
1110 // thread cannot get older
1111 thread_debug_register_result(times_waiting2, tickets_waiting2);
1112 if (preempt_thread(self, thread))
1113 return RESULT_WAIT;
1114 else
1115 return RESULT_ABORT;
1116 break;
1117 case COMPARED_LOW_HIGH:
1118 // no assertion here because we might not have the lock
1119 if (shm_cell_have_write_lock(self, lock))
1120 {
1121 lock->release_line = __LINE__;
1122 signal_thread(self, p_atomic_shm_pointer_get(&lock->next_writer));
1123 p_atomic_pointer_set(&lock->writer_lock, LOCK_UNLOCKED);
1124 self->private_data->write_locks_taken--;
1125 }
1126 thread_debug_register_result(times_aborted8, tickets_aborted8);
1127 return RESULT_PREEMPTED;
1128 // return RESULT_WAIT;
1129 break;
1130 }
1131 thread_debug_register_result(times_repeated, tickets_repeated);
1132 return RESULT_REPEAT; // volatility, quickly attempt to run this routine once again
1133 }
1134 else
1135 {
1136				thread_debug_register_result(times_repeated, tickets_repeated);
1137 return RESULT_REPEAT; // volatility, quickly attempt to run this routine once again
1138 }
1139 }
1140 }
1141 else
1142 {
1143 thread_debug_register_result(times_aborted9, tickets_aborted9);
1144 return RESULT_ABORT;
1145 }
1146
1147}
1148
1149// when we don't want to wait for contention on RESULT_WAIT or RESULT_REPEAT from take_write_lock
1150/*void
1151untake_write_lock(ThreadContext *self, ShmLock *lock, ShmPointer shm_lock)
1152{
1153 // if (shm_cell_have_write_lock(self, lock) && p_atomic_int_get(&lock->writer_lock_ensured) == 0)
1154 {
1155 myassert(lock->writer_lock_ensured == 0, NULL);
1156 lock->release_line = __LINE__;
1157 p_atomic_pointer_set(&lock->writer_lock, LOCK_UNLOCKED);
1158 }
1159}*/
1160
1161void
1162notify_next_writer(ThreadContext *thread, ShmLock *lock)
1163{
1164 ThreadContext *that_thread = NULL;
1165 ShmPointer next_writer = p_atomic_shm_pointer_get(&lock->next_writer);
1166 if (SBOOL(next_writer))
1167 that_thread = shm_pointer_to_thread(next_writer); // next_writer has the highest priority most likely
1168 else
1169 {
1170 ShmPointer oldest = lock_queue_find_highest_priority(&lock->queue_threads, thread->index, NULL, NULL); // slow function
1171 if (SBOOL(oldest))
1172 that_thread = shm_pointer_to_thread(oldest);
1173 }
1174 // ensure the thread has exclusive access
1175 if (that_thread)
1176 {
1177 ShmReaderBitmap contenders = atomic_bitmap_contenders(&lock->reader_lock, that_thread->index);
1178 if (contenders == 0)
1179 shm_event_signal(&that_thread->ready);
1180 }
1181 // next_writer might still run away and never take our lock due to
1182 // some kind of timeout or conditional execution,
1183	// so we might try to implement an infinite cycle here that runs until consistency is reached.
1184}
1185
1186void
1187_shm_cell_unlock(ThreadContext *thread, ShmLock *lock, ShmInt type)
1188{
1189 myassert(type == TRANSACTION_ELEMENT_READ || type == TRANSACTION_ELEMENT_WRITE);
1190 bool test = false;
1191 if (TRANSACTION_ELEMENT_READ == type)
1192 test = atomic_bitmap_check_me(&lock->reader_lock, thread->index);
1193 else
1194 test = shm_cell_have_write_lock(thread, lock);
1195
1196 myassert_msg(test, "Trying to unlock the cell we don't own.");
1197
1198 bool had_lock = false;
1199 if (type == TRANSACTION_ELEMENT_READ)
1200 {
1201 had_lock = atomic_bitmap_reset(&lock->reader_lock, thread->index);
1202 // if (had_lock)
1203 myassert(had_lock);
1204 notify_next_writer(thread, lock);
1205
1206 thread->private_data->read_locks_taken--;
1207 // p_atomic_int_dec_and_test(&lock->readers_count);
1208 // myassert(p_atomic_int_get(&lock->readers_count) >= 0);
1209 }
1210 else
1211 {
1212 if (shm_cell_have_write_lock(thread, lock))
1213 {
1214 // p_atomic_pointer_set(&lock->writer_lock, LOCK_UNLOCKED);
1215 lock->prev_lock = lock->writer_lock;
1216 // p_atomic_int_set(&lock->writer_lock_ensured, 0); // always set/reset under writer_lock
1217
1218 // _thread_release_pending_lock(thread, lock);
1219 myassert(thread->pending_lock == EMPTY_SHM);
1220 //_thread_unqueue_from_lock(thread, lock);
1221
1222 p_atomic_int_dec_and_test(&superblock->debug_lock_count);
1223 // p_atomic_int_dec_and_test(&lock->writers_count);
1224 thread->private_data->write_locks_taken--;
1225
1226 myassert(lock->writers_count == 0);
1227 myassert_msg(lock->writer_lock == thread->self, "Our lock has been taken by someone else.");
1228 lock->release_line = __LINE__;
1229 p_atomic_pointer_set(&lock->writer_lock, LOCK_UNLOCKED);
1230 // first indicate there's nobody home, then signal to those who have begun waiting right before we left the home.
1231 // signal_thread(thread, lock->next_writer);
1232
1233 notify_next_writer(thread, lock);
1234 had_lock = true;
1235
1236 }
1237 }
1238 if (!had_lock)
1239 myassert_msg(had_lock, "Unlocking the cell we don't own.");
1240}
1241
1242static bool allocation_failed = false;
1243
1244struct alloc_more_closure {
1245 vl ShmInt *new_index;
1246 ShmInt old_index;
1247};
1248
1249int
1250superblock_alloc_more_cb(void *data)
1251{
1252 struct alloc_more_closure *closure = data;
1253 // if (superblock->block_count > oldcount)
1254 if (closure->old_index != -2 && p_atomic_int_get(closure->new_index) != closure->old_index)
1255 return RESULT_REPEAT;
1256 else
1257 return RESULT_INVALID;
1258}
1259
1260static bool allocation_rand_debug[0x10] = { false, };
1261static int allocation_rand_debug_index = 0;
1262
1263// Atomically claims the next unused block index, stores it into new_index and initializes the ShmChunkHeader (only then does the atomic operation end)
1264// "old_index" should be -2 to disable the concurrent-allocation check
1265int
1266superblock_alloc_more(ShmPointer thread, int type, vl ShmInt *new_index, int old_index)
1267{
1268 if (superblock->block_count >= SHM_BLOCK_COUNT)
1269 {
1270 if (!allocation_failed)
1271 {
1272 allocation_failed = TRUE;
1273 fprintf(stderr, "Shared block count limit exceeded. superblock_alloc_more failed, crash imminent.\n");
1274 }
1275 return RESULT_FAILURE;
1276 }
1277	// The call must abort in case there was an unexpected allocation in another thread (kind of a compare-and-swap).
1278	// We can't read the old value here, because the decision to call superblock_alloc_more was made in the outer function.
1279 // ShmInt oldcount = superblock->block_count;
1280
1281 // take the lock
1282 /*while (InterlockedCompareExchange(&superblock->lock, 1, 0) != 0) {
1283 if (superblock->block_count > oldcount)
1284 {
1285 // new block has been allocated by someone else. Return it
1286 return 0;
1287 }
1288 };*/
1289 // https://stackoverflow.com/questions/17182877/is-it-ok-to-use-a-code-block-as-an-argument-for-a-c-macro
1290 // if (superblock->block_count > oldcount)
1291 // https://stackoverflow.com/questions/23292828/macro-taking-code-statement-as-argument-fails-when-stdmap-is-present
1292 // https://stackoverflow.com/questions/8978997/how-can-i-see-the-output-of-the-visual-c-preprocessor
1293 // take_spinlock(CAS2, &superblock->lock, thread, 0, {
1294 // if (superblock->block_count > oldcount)
1295 // return RESULT_ABORT; // somebody have allocated block concurrently. Go into next iteration outside this function
1296 // });
1297
1298 struct alloc_more_closure closure = { new_index, old_index };
1299 int rslt = shm_lock_acquire_with_cb(&superblock->lock, superblock_alloc_more_cb, &closure);
1300 myassert(rslt != RESULT_INVALID);
1301 if (rslt != RESULT_OK && rslt != RESULT_INVALID)
1302 {
1303 if (shm_lock_owned(&superblock->lock))
1304 shm_lock_release(&superblock->lock);
1305 if (result_is_repeat(rslt))
1306 return RESULT_REPEAT;
1307 return rslt;
1308 }
1309 myassert(shm_lock_owned(&superblock->lock));
1310 if (alloc_flinch)
1311 {
1312 bool should_flinch = rand() % 4 == 1; // shared memory allocations are very rare
1313 allocation_rand_debug[allocation_rand_debug_index] = should_flinch;
1314 allocation_rand_debug_index++;
1315 allocation_rand_debug_index &= 0x10 - 1;
1316 if (should_flinch)
1317 {
1318 shm_lock_release(&superblock->lock);
1319 return RESULT_REPEAT;
1320 }
1321 }
1322	// same check as in superblock_alloc_more_cb, to ensure nobody acquired a new block before we got the lock.
1323 if (closure.old_index != -2 && p_atomic_int_get(closure.new_index) != closure.old_index)
1324 {
1325 shm_lock_release(&superblock->lock);
1326 return RESULT_REPEAT;
1327 }
1328 ShmInt newindex = superblock->block_count;
1329
1330 // if (superblock->block_count > oldcount)
1331 // {
1332 // release_spinlock(&superblock->lock, 0, thread);
1333 // // new block has been allocated by someone else. Return it
1334 // return 0;
1335 // }
1336
1337 rslt = RESULT_OK;
1338 int group_idx = newindex / SHM_BLOCK_GROUP_SIZE;
1339 int itemindex = newindex % SHM_BLOCK_GROUP_SIZE;
1340 ShmChunkHeader *newblock = NULL;
1341 if (itemindex == 0)
1342 {
1343 // allocating a new group
1344 // ShmChunkHeader *newblock = allocate_block((__ShmChunk *)NULL, true, SHM_FIXED_CHUNK_SIZE);
1345 SectorAllocationResult alloc_result = shm_allocate_sector((__ShmChunk*)&superblock->block_groups[group_idx], true, SHM_FIXED_CHUNK_SIZE * SHM_BLOCK_GROUP_SIZE);
1346 myassert(alloc_result.data);
1347 shm_sector_register_handle(&superblock->block_groups[group_idx], alloc_result.FileMapping,
1348 superblock->coordinator_process, false);
1349 newblock = alloc_result.data;
1350 if (newblock)
1351 {
1352 newblock->type = type;
1353 newblock->used = SHM_FIXED_CHUNK_HEADER_SIZE;
1354 superblock_mmap[group_idx] = (vl char *)newblock;
1355 }
1356 }
1357 else
1358 {
1359 // using a block from existing group
1360 vl char *block = superblock_mmap[group_idx];
1361 if (!block)
1362 {
1363 superblock_check_mmap_inited(group_idx);
1364 block = superblock_mmap[group_idx];
1365 }
1366 if (block)
1367 {
1368 newblock = (ShmChunkHeader *)(intptr_t)(block + SHM_FIXED_CHUNK_SIZE * itemindex);
1369 newblock->type = type;
1370 newblock->used = SHM_FIXED_CHUNK_HEADER_SIZE;
1371 }
1372 }
1373
1374 if (newblock)
1375 {
1376 p_atomic_int_set(&superblock->block_count, newindex + 1); // success
1377 if (new_index)
1378 *new_index = newindex;
1379 }
1380 else
1381 {
1382 rslt = RESULT_FAILURE;
1383 }
1384
1385 // release_spinlock(&superblock->lock, 0, thread);
1386 shm_lock_release(&superblock->lock);
1387 return rslt;
1388}
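
// A hypothetical call-site sketch (the block type value and index variable are
// illustrative, not taken from this file): pass -2 as old_index to skip the
// concurrent-allocation check and retry while another thread races us.
/*
	vl ShmInt my_block_index = -1;
	int rc;
	do {
		rc = superblock_alloc_more(thread, some_block_type, &my_block_index, -2);
	} while (rc == RESULT_REPEAT);
	myassert(rc == RESULT_OK);
*/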
1389
1390// Transactions
1391
1392ShmTransactionElement *
1393thread_register_lock(ThreadContext *thread, ShmLock *lock, ShmPointer container_shm, int container_type, ShmInt type)
1394{
1395 myassert(type == TRANSACTION_ELEMENT_READ || type == TRANSACTION_ELEMENT_WRITE);
1396 bool owned = false;
1397 if (type == TRANSACTION_ELEMENT_WRITE)
1398 {
1399 myassert_msg(lock->writer_lock == thread->self, "Trying to register a write lock we don't own.");
1400 owned = lock->writer_lock == thread->self;
1401 }
1402 else if (type == TRANSACTION_ELEMENT_READ)
1403 {
1404 myassert_msg(atomic_bitmap_check_me(&lock->reader_lock, thread->index), "Trying to register a read lock we don't own.");
1405 }
1406
1407 // verify the container is not in the list yet
1408 ShmTransactionElement *element = thread->current_transaction;
1409 while (element)
1410 {
1411 myassert(element->type != type || element->container != container_shm);
1412 element = element->next;
1413 }
1414
1415 // append the lock into the locks list
1416 if (type == TRANSACTION_ELEMENT_WRITE)
1417 {
1418 myassert(!SBOOL(p_atomic_shm_pointer_get(&lock->transaction_data)));
1419 // lock->transaction_data = newone_shm;
1420 }
1421
1422 ShmPointer newone_shm;
1423 ShmTransactionElement *newone = (ShmTransactionElement *)get_mem(thread, &newone_shm, sizeof(ShmTransactionElement),
1424 SHM_TRANSACTION_ELEMENT_DEBUG_ID);
1425
1426 newone->owner = thread->self;
1427 newone->type = type;
1428 newone->container_type = container_type;
1429 newone->container = container_shm;
1430
1431 newone->next = thread->current_transaction;
1432 newone->next_shm = thread->current_transaction_shm;
1433 thread->current_transaction = newone;
1434 thread->current_transaction_shm = newone_shm;
1435
1436 return newone;
1437
1438 // if (type == TRANSACTION_ELEMENT_WRITE)
1439 // {
1440 // p_atomic_pointer_set(&lock->transaction_data, newone_shm); // transaction_data is more like a flag than an actual pointer
1441 // }
1442}
1443
1444void
1445thread_unregister_last_lock(ThreadContext *thread, ShmTransactionElement *element, ShmLock *lock, ShmPointer container_shm, int container_type, ShmInt type)
1446{
1447 ShmPointer to_release = thread->current_transaction_shm;
1448 myassert(shm_pointer_to_pointer(to_release) == element);
1449 myassert(element->container_type == container_type);
1450 myassert(element->container == container_shm);
1451 myassert(element->type == type);
1452
1453 thread->current_transaction = element->next;
1454 thread->current_transaction_shm = element->next_shm;
1455 free_mem(thread, to_release, sizeof(ShmTransactionElement));
1456}
1457
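/* Illustrative sketch (not compiled): thread_register_lock() pushes onto the
 * thread's LIFO list of transaction elements, so thread_unregister_last_lock()
 * may only undo the most recent registration. The sketch assumes the calling
 * thread already owns the corresponding read lock; the surrounding control flow
 * is invented for illustration. */
#if 0
static void example_register_unregister(ThreadContext *thread, ShmLock *lock, ShmPointer container_shm)
{
	ShmTransactionElement *element =
		thread_register_lock(thread, lock, container_shm, CONTAINER_CELL, TRANSACTION_ELEMENT_READ);
	// ... the locking attempt gets aborted ...
	// element is still the head of thread->current_transaction, so it can be popped:
	thread_unregister_last_lock(thread, element, lock, container_shm, CONTAINER_CELL, TRANSACTION_ELEMENT_READ);
}
#endif
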
1458// here container_shm points to the parent structure, aligned to the correct memory-manager boundary
1459int
1460transaction_lock_read(ThreadContext *thread, ShmLock *lock, ShmPointer container_shm, int container_type, bool *lock_taken)
1461{
1462 assert(thread != NULL);
1463 myassert_msg(thread->transaction_mode != TRANSACTION_NONE, "thread->transaction_mode != TRANSACTION_NONE");
1464 if (thread->async_mode)
1465 {
1466 switch (thread->transaction_lock_mode)
1467 {
1468 case LOCKING_ALL:
1469 case LOCKING_WRITE:
1470 {
1471 bool had_write_lock = shm_cell_have_write_lock(thread, lock);
1472 bool had_read_lock = atomic_bitmap_check_me(&lock->reader_lock, thread->index);
1473 int rslt = take_read_lock(thread, lock, container_shm);
1474 if (rslt == RESULT_OK_LOCKED)
1475 {
1476 myassert(!had_read_lock);
1477 if (!had_read_lock)
1478 thread_register_lock(thread, lock, container_shm, container_type, TRANSACTION_ELEMENT_READ);
1479 if (lock_taken)
1480 *lock_taken = true;
1481 rslt = RESULT_OK;
1482 }
1483
1484 if (rslt == RESULT_OK)
1485 {
1486 myassert(atomic_bitmap_check_me(&lock->reader_lock, thread->index));
1487 myassert(p_atomic_shm_pointer_get(&lock->writer_lock) == 0 || p_atomic_shm_pointer_get(&lock->writer_lock) == thread->self);
1488 }
1489 return rslt;
1490 break;
1491 }
1492 case LOCKING_NONE:
1493 break;
1494 default:
1495 myassert_msg(false, "Invalid thread->transaction_lock_mode");
1496 return RESULT_FAILURE;
1497 }
1498 }
1499 else
1500 {
1501 switch (thread->transaction_lock_mode)
1502 {
1503 case LOCKING_ALL:
1504 case LOCKING_WRITE:
1505 {
1506 bool had_write_lock = shm_cell_have_write_lock(thread, lock);
1507 bool had_read_lock = atomic_bitmap_check_me(&lock->reader_lock, thread->index);
1508 bool new_write_lock = had_write_lock;
1509 bool new_read_lock = had_read_lock;
1510 if (had_write_lock)
1511 {
1512 // write lock implies read access
1513 int check_result = take_write_lock__checks(thread, lock, container_shm, false);
1514 if (RESULT_OK == check_result || RESULT_INVALID == check_result)
1515 return RESULT_OK;
1516 else
1517 return check_result;
1518 }
1519 if (had_read_lock)
1520 {
1521 // we don't need to enter the negotiation cycle because we've already reached exclusive ownership once.
1522 ShmPointer writer_lock = p_atomic_shm_pointer_get(&lock->writer_lock);
1523 ShmPointer next_writer = p_atomic_shm_pointer_get(&lock->next_writer);
1524 int check_result = take_read_lock__checks(thread, lock, next_writer, writer_lock, NULL, NULL);
1525 if (RESULT_OK_LOCKED == check_result || RESULT_OK == check_result || RESULT_INVALID == check_result)
1526 return RESULT_OK;
1527 else
1528 return check_result;
1529 }
1530 else
1531 {
1532 ShmTransactionElement *registered_lock = NULL;
1533 int final_result = RESULT_INVALID;
1534 do {
1535 int rslt = take_read_lock(thread, lock, container_shm);
1536 if (rslt == RESULT_OK_LOCKED || rslt == RESULT_OK)
1537 {
1538 registered_lock = thread_register_lock(thread, lock, container_shm, container_type, TRANSACTION_ELEMENT_READ);
1539 final_result = RESULT_OK;
1540 new_read_lock = true;
1541 break;
1542 }
1543 if (result_is_abort(rslt))
1544 {
1545 if (!had_read_lock && new_read_lock)
1546 {
1547 myassert(registered_lock);
1548 thread_unregister_last_lock(thread, registered_lock, lock, container_shm, container_type, TRANSACTION_ELEMENT_READ);
1549 registered_lock = NULL;
1550 new_read_lock = false;
1551 }
1552 if (shm_cell_have_read_lock(thread, lock))
1553 _shm_cell_unlock(thread, lock, TRANSACTION_ELEMENT_READ); // might not be registered yet
1554 final_result = rslt;
1555 break;
1556 }
1557 if (rslt == RESULT_WAIT_SIGNAL)
1558 {
1559 ShmPointer writer_lock = p_atomic_shm_pointer_get(&lock->writer_lock);
1560 ShmPointer next_writer = p_atomic_shm_pointer_get(&lock->next_writer);
1561 int check_result = take_read_lock__checks(thread, lock, next_writer, writer_lock, NULL, NULL);
1562 if (RESULT_OK_LOCKED == check_result || RESULT_OK == check_result || RESULT_INVALID == check_result)
1563 ; // good
1564 else
1565 {
1566 final_result = check_result;
1567 break;
1568 }
1569
1570 // This call is very problematic and we really need to ensure another thread would wake us up
1571 int wait_rslt = shm_event_wait(&thread->ready, 1, DEBUG_SHM_EVENTS);
1572 myassert(wait_rslt != RESULT_FAILURE);
1573 }
1574 else
1575 continue; // RESULT_WAIT || RESULT_REPEAT
1576 } while (1);
1577
1578 if (final_result == RESULT_OK)
1579 {
1580 if (lock_taken)
1581 *lock_taken = true;
1582 // count the lock only after we've ensured exclusive access at least once.
1583 p_atomic_int_add(&lock->readers_count, 1);
1584 shm_cell_check_read_lock(thread, lock);
1585 }
1586 else
1587 myassert(shm_cell_have_read_lock(thread, lock) == false);
1588
1589 return final_result;
1590 }
1591 break;
1592 }
1593 case LOCKING_NONE:
1594 break;
1595 default:
1596 myassert_msg(false, "Invalid thread->transaction_lock_mode");
1597 return RESULT_FAILURE;
1598 }
1599 }
1600 return RESULT_OK;
1601}
1602
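/* Illustrative sketch (not compiled): a read-side caller of transaction_lock_read().
 * A transaction must already be active on the thread (the function asserts it).
 * The helper name and the decision to bubble abort-class results up to the caller
 * are assumptions for illustration. */
#if 0
static int example_read_locked(ThreadContext *thread, ShmContainer *cont, ShmPointer cont_shm)
{
	bool taken = false;
	int rslt = transaction_lock_read(thread, &cont->lock, cont_shm, CONTAINER_CELL, &taken);
	if (rslt != RESULT_OK)
		return rslt; // abort-class results mean the whole transaction should be retried
	// ... read the container's committed fields here ...
	return RESULT_OK;
}
#endif
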
1603int last_take_write_lock_result = 0;
1604
1605int
1606transaction_lock_write(ThreadContext *thread, ShmLock *lock, ShmPointer lock_shm, int container_type, bool *lock_taken)
1607{
1608 assert(thread != NULL);
1609 myassert_msg(thread->transaction_mode != TRANSACTION_NONE, "thread->transaction_mode != TRANSACTION_NONE");
1610 bool had_write_lock;
1611 bool had_read_lock;
1612	// TRANSACTION_TRANSIENT usually does not hold more than one lock at a time
1613 if (thread->async_mode)
1614 {
1615 int rslt = RESULT_OK;
1616 switch (thread->transaction_lock_mode)
1617 {
1618 case LOCKING_NONE:
1619 break;
1620 case LOCKING_WRITE:
1621 case LOCKING_ALL:
1622 had_write_lock = shm_cell_have_write_lock(thread, lock);
1623 had_read_lock = atomic_bitmap_check_me(&lock->reader_lock, thread->index);
1624 int orig_rslt = rslt = take_write_lock(thread, lock, lock_shm);
1625 if (rslt == RESULT_OK_LOCKED)
1626 {
1627 myassert(!had_write_lock);
1628 rslt = RESULT_OK;
1629 }
1630 if (rslt == RESULT_OK)
1631 {
1632 shm_cell_check_write_lock(thread, lock);
1633 if (!had_write_lock)
1634 {
1635 thread_register_lock(thread, lock, lock_shm, container_type, TRANSACTION_ELEMENT_WRITE);
1636 if (lock_taken) *lock_taken = true;
1637 }
1638 }
1639
1640 if (rslt == RESULT_OK)
1641 shm_cell_check_write_lock(thread, lock);
1642 return rslt;
1643 break;
1644 default:
1645 myassert_msg(false, "Invalid thread->transaction_lock_mode");
1646 return RESULT_FAILURE;
1647 }
1648 }
1649 else
1650 {
1651 int rslt = RESULT_OK;
1652 // from take_spinlock
1653 // int backoff = 64;
1654		switch (thread->transaction_lock_mode) // the cases below are LOCKING_* values, as in the async branch above
1655 {
1656 case LOCKING_NONE:
1657 break;
1658 case LOCKING_WRITE:
1659 case LOCKING_ALL:
1660 had_write_lock = shm_cell_have_write_lock(thread, lock);
1661 had_read_lock = shm_cell_have_read_lock(thread, lock);
1662 bool new_write_lock = had_write_lock;
1663 bool new_read_lock = had_read_lock;
1664 int final_result = RESULT_INVALID;
1665 ShmTransactionElement *registered_lock = NULL;
1666 if (had_write_lock)
1667 {
1668			int rslt = take_write_lock__checks(thread, lock, lock_shm, false); // we don't care about low priorities here; we took this lock long ago
1669			// low-priority readers contending with us is fine because we obtained the exclusive lock first.
1670 if (rslt != RESULT_PREEMPTED)
1671 return RESULT_OK;
1672 else
1673 return RESULT_PREEMPTED; // lock had to be registered by now
1674 }
1675 else do
1676 {
1677 final_result = RESULT_INVALID;
1678 int orig_rslt = rslt = take_write_lock(thread, lock, lock_shm);
1679 thread->private_data->last_take_write_lock_result = rslt;
1680 if (result_is_abort(rslt))
1681 {
1682 if (!had_write_lock && new_write_lock)
1683 {
1684 myassert(registered_lock);
1685 thread_unregister_last_lock(thread, registered_lock, lock, lock_shm, container_type, TRANSACTION_ELEMENT_WRITE);
1686 registered_lock = NULL;
1687 new_write_lock = false;
1688 }
1689 // unqueue and possibly unlock
1690 thread_unqueue_from_lock(thread); // we can unqueue any time, but should signal only after unqueueing and preferably after releasing the lock.
1691 if (shm_cell_have_write_lock(thread, lock))
1692				_shm_cell_unlock(thread, lock, TRANSACTION_ELEMENT_WRITE); // might not be registered yet
1693 final_result = RESULT_ABORT;
1694 break;
1695 }
1696 if (rslt == RESULT_OK_LOCKED || rslt == RESULT_OK)
1697 {
1698 new_write_lock = true;
1699 if (!had_write_lock)
1700 {
1701 registered_lock = thread_register_lock(thread, lock, lock_shm, container_type, TRANSACTION_ELEMENT_WRITE);
1702 }
1703 // Unqueue only
1704 thread_unqueue_from_lock(thread);
1705 // We can unqueue any time, but should signal waiters only after unqueueing and preferably after releasing the lock.
1706 // notify_next_writer(thread, lock);
1707
1708 final_result = RESULT_OK;
1709 break;
1710 }
1711 if (rslt == RESULT_WAIT_SIGNAL)
1712 {
1713 int rslt = take_write_lock__checks(thread, lock, lock_shm, false);
1714 if (rslt == RESULT_PREEMPTED)
1715 {
1716 // same as a regular abort: unqueue and possibly unlock
1717 thread_unqueue_from_lock(thread); // we can unqueue any time, but should signal only after unqueueing and preferably after releasing the lock.
1718 if (shm_cell_have_write_lock(thread, lock))
1719					_shm_cell_unlock(thread, lock, TRANSACTION_ELEMENT_WRITE); // might not be registered yet
1720 final_result = rslt;
1721 break;
1722 }
1723 // This call is very problematic and we really need to ensure another thread would wake us up
1724 int wait_rslt = shm_event_wait(&thread->ready, 1, DEBUG_SHM_EVENTS); // debug
1725 // int wait_rslt = shm_event_wait(&thread->ready, 1, false);
1726 myassert(wait_rslt != RESULT_FAILURE);
1727 }
1728 else
1729 continue; // RESULT_WAIT || RESULT_REPEAT
1730 /*
1731 // from take_spinlock
1732 if (backoff <= SPINLOCK_MAX_BACKOFF)
1733 {
1734 for (int i = 0; i < backoff; ++i)
1735 SHM_SMT_PAUSE;
1736 backoff *= 2;
1737 }
1738 else if (backoff <= SPINLOCK_MAX_SLEEP_BACKOFF)
1739 {
1740 Sleep(0);
1741 backoff *= 2;
1742 // int wait_rslt = shm_event_wait(&thread->ready, 2);
1743 // myassert(wait_rslt != RESULT_FAILURE, NULL);
1744 }
1745 else
1746 break;*/
1747 } while (1);
1748
1749 myassert_msg(final_result != RESULT_INVALID, "something very wrong happened");
1750 if (final_result == RESULT_OK)
1751 {
1752 if (lock_taken) *lock_taken = true;
1753 p_atomic_int_add(&lock->writers_count, 1);
1754 shm_cell_check_write_lock(thread, lock);
1755 }
1756 else
1757 myassert(shm_cell_have_write_lock(thread, lock) == false);
1758 return final_result;
1759 break;
1760 default:
1761 myassert_msg(false, "Invalid thread->transaction_lock_mode");
1762 return RESULT_FAILURE;
1763 }
1764 }
1765
1766 return RESULT_OK;
1767}
1768
1769// status is the result of the last locking operation
1770int
1771transaction_unlock_local(ThreadContext *thread, ShmLock *lock, ShmPointer lock_shm, int status, bool lock_taken)
1772{
1773 return RESULT_OK;
1774 assert(thread->transaction_mode != LOCKING_NONE);
1775 if ( ! lock_taken)
1776 return RESULT_OK;
1777 if (thread->transaction_mode == TRANSACTION_TRANSIENT)
1778 {
1779 myassert_msg(shm_cell_have_write_lock(thread, lock) ||
1780 atomic_bitmap_check_me(&lock->reader_lock, thread->index), "Thread does not own the lock.");
1781 // lock->id = 0;
1782 commit_transaction(thread, NULL);
1783 }
1784	else // LOCKING_WRITE || LOCKING_ALL
1785 {
1786 // handle the wait or abort status at higher level
1787 }
1788
1789 return RESULT_OK;
1790}
1791
1792int
1793transaction_push_mode(ThreadContext *thread, int transaction_mode, int transaction_lock_mode)
1794{
1795 myassert_msg(thread->private_data->transaction_stack->count < TRANSACTION_STACK_SIZE,
1796 "thread->transaction_stack->count < TRANSACTION_STACK_SIZE");
1797
1798 int idx = thread->private_data->transaction_stack->count; // this way we can at least catch some misuse
1799 thread->private_data->transaction_stack->modes[idx].transaction_mode = thread->transaction_mode;
1800 thread->private_data->transaction_stack->modes[idx].transaction_lock_mode = thread->transaction_lock_mode;
1801 thread->private_data->transaction_stack->count++;
1802
1803 thread->transaction_mode = transaction_mode;
1804 thread->transaction_lock_mode = transaction_lock_mode;
1805 return idx;
1806}
1807
1808int
1809transaction_pop_mode(ThreadContext *thread, int *transaction_mode, int *transaction_lock_mode)
1810{
1811 myassert_msg(thread->private_data->transaction_stack->count > 0, "thread->private_data->transaction_stack->count > 0");
1812
1813 int idx = thread->private_data->transaction_stack->count - 1; // this way we can at least catch some misuse
1814 thread->transaction_mode = thread->private_data->transaction_stack->modes[idx].transaction_mode;
1815 thread->transaction_lock_mode = thread->private_data->transaction_stack->modes[idx].transaction_lock_mode;
1816 thread->private_data->transaction_stack->count--;
1817 if (transaction_mode)
1818 *transaction_mode = thread->transaction_mode;
1819 if (transaction_lock_mode)
1820 *transaction_lock_mode = thread->transaction_lock_mode;
1821
1822 return thread->private_data->transaction_stack->count;
1823}
1824
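/* Illustrative sketch (not compiled): transaction_push_mode() and transaction_pop_mode()
 * must stay balanced; start_transaction() and commit_transaction()/abort_transaction()
 * below rely on this pairing. The assertion on the returned depth is an assumption
 * for illustration. */
#if 0
static void example_mode_nesting(ThreadContext *thread)
{
	int depth = transaction_push_mode(thread, TRANSACTION_PERSISTENT, LOCKING_WRITE);
	// ... nested work, possibly pushing and popping further levels ...
	int remaining = transaction_pop_mode(thread, NULL, NULL);
	myassert(remaining == depth); // balanced push/pop restores the previous depth
}
#endif
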
1825int
1826transaction_parent_mode(ThreadContext *thread, int *transaction_mode, int *transaction_lock_mode)
1827{
1828 int idx = thread->private_data->transaction_stack->count - 1;
1829 if (thread->private_data->transaction_stack->count > 0)
1830 {
1832		if (transaction_mode)
1833			*transaction_mode = thread->private_data->transaction_stack->modes[idx].transaction_mode;
1834		if (transaction_lock_mode)
1835			*transaction_lock_mode = thread->private_data->transaction_stack->modes[idx].transaction_lock_mode;
1836	}
1841 return idx;
1842}
1843
1844int
1845shm_cell_unlock(ThreadContext *thread, ShmCell *item, ShmInt type);
1846int
1847shm_queue_unlock(ThreadContext *thread, ShmQueue *queue, ShmInt type);
1848int
1849shm_list_unlock(ThreadContext *thread, ShmList *list, ShmInt type);
1850int
1851shm_dict_unlock(ThreadContext *thread, ShmDict *dict, ShmInt type);
1852int
1853shm_undict_unlock(ThreadContext *thread, ShmUnDict *dict, ShmInt type);
1854int
1855shm_cell_rollback(ThreadContext *thread, ShmCell *item);
1856int
1857shm_cell_commit(ThreadContext *thread, ShmCell *item);
1858int
1859shm_queue_rollback(ThreadContext *thread, ShmQueue *queue);
1860int
1861shm_queue_commit(ThreadContext *thread, ShmQueue *queue);
1862int
1863shm_list_rollback(ThreadContext *thread, ShmList *list);
1864int
1865shm_list_commit(ThreadContext *thread, ShmList *list);
1866int
1867shm_dict_rollback(ThreadContext *thread, ShmDict *dict);
1868int
1869shm_dict_commit(ThreadContext *thread, ShmDict *dict);
1870int
1871shm_undict_commit(ThreadContext *thread, ShmUnDict *dict);
1872int
1873shm_undict_rollback(ThreadContext *thread, ShmUnDict *dict);
1874
1875int
1876transaction_end(ThreadContext *thread, bool rollback)
1877{
1878 ShmTransactionElement *element = thread->current_transaction;
1879 while (element)
1880 {
1881 myassert_msg(element->owner == NONE_SHM || element->owner == thread->self,
1882 "element->owner == NONE_SHM || element->owner == thread->self");
1883 if (element->type == TRANSACTION_ELEMENT_WRITE)
1884 {
1885 switch (element->container_type)
1886 {
1887 case CONTAINER_NONE:
1888 break;
1889 case CONTAINER_CELL:
1890 {
1891 CellRef cell;
1892 init_cell_ref(element->container, &cell);
1893 if (rollback)
1894 shm_cell_rollback(thread, cell.local);
1895 else
1896 shm_cell_commit(thread, cell.local);
1897 break;
1898 }
1899 case CONTAINER_QUEUE:
1900 {
1901 QueueRef queue;
1902 init_queue_ref(element->container, &queue);
1903 if (rollback)
1904 shm_queue_rollback(thread, queue.local);
1905 else
1906 shm_queue_commit(thread, queue.local);
1907 break;
1908 }
1909 case CONTAINER_LIST:
1910 {
1911 ListRef list;
1912 init_list_ref(element->container, &list);
1913 if (rollback)
1914 shm_list_rollback(thread, list.local);
1915 else
1916 shm_list_commit(thread, list.local);
1917 break;
1918 }
1919 case CONTAINER_DICT_DELTA:
1920 {
1921 DictRef dict;
1922 init_dict_ref(element->container, &dict);
1923 if (rollback)
1924 shm_dict_rollback(thread, dict.local);
1925 else
1926 shm_dict_commit(thread, dict.local);
1927 break;
1928 }
1929 case CONTAINER_UNORDERED_DICT:
1930 {
1931 UnDictRef undict;
1932 init_undict_ref(element->container, &undict);
1933 if (rollback)
1934 shm_undict_rollback(thread, undict.local);
1935 else
1936 shm_undict_commit(thread, undict.local);
1937 break;
1938 }
1939 case CONTAINER_PROMISE:
1940 {
1941 ShmPromise *promise = shm_pointer_to_pointer(element->container);
1942 if (rollback)
1943 shm_promise_rollback(thread, promise);
1944 else
1945 shm_promise_commit(thread, promise);
1946 break;
1947 }
1948 default:
1949 {
1950 char buf[40];
1951 snprintf(buf, 40, "transaction_end: element is invalid %d", element->container_type);
1952 myassert_msg(false, buf);
1953 }
1954 }
1955 }
1956 element = element->next;
1957 }
1958 return RESULT_OK;
1959}
1960
1961ShmTransactionElement *
1962transaction_unlock_all(ThreadContext *thread)
1963{
1964 myassert(shm_pointer_to_pointer(thread->current_transaction_shm) == thread->current_transaction);
1965 ShmTransactionElement *rslt = thread->current_transaction;
1966 while (thread->current_transaction)
1967 {
1968 ShmTransactionElement *element = thread->current_transaction;
1969
1970 myassert(element->type == TRANSACTION_ELEMENT_READ || element->type == TRANSACTION_ELEMENT_WRITE);
1971
1972 if (element->container_type != CONTAINER_NONE)
1973 {
1974 ShmContainer *container = shm_pointer_to_pointer(element->container);
1975 if (element->type == TRANSACTION_ELEMENT_READ)
1976 {
1977 p_atomic_int_dec_and_test(&container->lock.readers_count);
1978 myassert(p_atomic_int_get(&container->lock.readers_count) >= 0);
1979 int ref = shm_cell_have_write_lock(thread, &container->lock) ? 1 : 0;
1980 myassert(p_atomic_int_get(&container->lock.writers_count) == ref);
1981 }
1982 else
1983 {
1984 p_atomic_int_dec_and_test(&container->lock.writers_count);
1985 myassert(p_atomic_int_get(&container->lock.writers_count) == 0);
1986 int ref = shm_cell_have_read_lock(thread, &container->lock) ? 1 : 0;
1987 myassert(p_atomic_int_get(&container->lock.readers_count) == ref);
1988 }
1989 }
1990 switch (element->container_type)
1991 {
1992 case CONTAINER_NONE:
1993 break;
1994 case CONTAINER_CELL:
1995 {
1996 CellRef cell;
1997 init_cell_ref(element->container, &cell);
1998 shm_cell_unlock(thread, cell.local, element->type);
1999 break;
2000 }
2001 case CONTAINER_QUEUE:
2002 {
2003 QueueRef queue;
2004 init_queue_ref(element->container, &queue);
2005 shm_queue_unlock(thread, queue.local, element->type);
2006 break;
2007 }
2008 case CONTAINER_LIST:
2009 {
2010 ListRef list;
2011 init_list_ref(element->container, &list);
2012 shm_list_unlock(thread, list.local, element->type);
2013 break;
2014 }
2015 case CONTAINER_DICT_DELTA:
2016 {
2017 DictRef dict;
2018 init_dict_ref(element->container, &dict);
2019 shm_dict_unlock(thread, dict.local, element->type);
2020 break;
2021 }
2022 case CONTAINER_UNORDERED_DICT:
2023 {
2024 UnDictRef undict;
2025 init_undict_ref(element->container, &undict);
2026 shm_undict_unlock(thread, undict.local, element->type);
2027 break;
2028 }
2029 case CONTAINER_PROMISE:
2030 {
2031 ShmPromise *promise = shm_pointer_to_pointer(element->container);
2032 shm_promise_unlock(thread, promise, element->type);
2033 break;
2034 }
2035 default:
2036		myassert_msg(false, "transaction_unlock_all: element is invalid");
2037 }
2038
2039 ShmPointer to_release = thread->current_transaction_shm;
2040 thread->current_transaction_shm = thread->current_transaction->next_shm;
2041 thread->current_transaction = thread->current_transaction->next;
2042 // free_mem(thread, to_release, sizeof(ShmTransactionElement));
2043 myassert(shm_pointer_to_pointer(thread->current_transaction_shm) == thread->current_transaction);
2044 }
2045 return rslt;
2046}
2047
2048int
2049transaction_length(ThreadContext *thread)
2050{
2051 int cnt = 0;
2052 ShmTransactionElement *element = thread->current_transaction;
2053 while (element)
2054 {
2055 myassert_msg(element->owner == NONE_SHM || element->owner == thread->self,
2056 "element->owner == NONE_SHM || element->owner == thread->self");
2057 cnt++;
2058 myassert(shm_pointer_to_pointer(element->next_shm) == element->next);
2059 element = element->next;
2060 }
2061 return cnt;
2062}
2063
2064// is_initial determines whether it is the first try on this transaction or a retry
2065int
2066start_transaction(ThreadContext *thread, int mode, int locking_mode, int is_initial, int *recursion_count)
2067{
2068 assert(mode != LOCKING_NONE);
2069
2070 int newmode = thread->transaction_mode;
2071 if (mode > thread->transaction_mode)
2072 {
2073 newmode = mode;
2074 }
2075 int newlocking = thread->transaction_lock_mode;
2076 if (locking_mode > thread->transaction_lock_mode)
2077 {
2078 newlocking = locking_mode;
2079 }
2080 if (thread->debug_starts || thread->transaction_mode < TRANSACTION_PERSISTENT)
2081 {
2082 if (superblock->debug_lock_count > superblock->debug_max_lock_count)
2083 fprintf(stderr, "superblock->debug_lock_count %d\n", superblock->debug_lock_count);
2084 if (thread->private_data->pending_lock_count > 1)
2085 fprintf(stderr, "more than one pending lock\n");
2086 myassert(superblock->debug_lock_count <= superblock->debug_max_lock_count);
2087 myassert(thread->private_data->pending_lock_count <= 1);
2088
2089 myassert(thread->pending_lock == EMPTY_SHM);
2090 thread_unqueue_from_lock(thread); // signal we are not in queue as soon as possible.
2091 // Transaction end is a better place, but we still need it here for transient->persistent transition.
2092		if (is_initial) // for scheduling/prioritization
2093 {
2094 int ticket = p_atomic_int_add(&superblock->ticket, 1);
2095 if (ticket != 0) // zero is reserved
2096 thread->last_start = ticket;
2097 else
2098 thread->last_start = p_atomic_int_add(&superblock->ticket, 1);
2099
2100 superblock->ticket_history[thread->last_start % 64] = thread->self;
2101 }
2102		thread_reset_preempted(thread); // reset the "preempted" flag last, because a non-running transaction cannot be preempted
2103		// and we only start registering preemption after the transaction has really started.
2104 }
2105 int rslt = transaction_push_mode(thread, newmode, newlocking); // always push for corresponding pop
2106 if (recursion_count)
2107 *recursion_count = rslt;
2108
2109 // if (is_initial) // for scheduling/prioritization
2110 // thread->last_start = GetTickCount(); // rel fence, atomic
2111
2112 return RESULT_OK;
2113}
2114
2115void
2116continue_transaction(ThreadContext *thread)
2117{
2118 shm_event_reset(&thread->ready);
2119}
2120
2121int
2122abort_transaction(ThreadContext *thread, int *recursion_count)
2123{
2124 int mode;
2125 int rslt = transaction_pop_mode(thread, &mode, NULL);
2126 if (recursion_count)
2127 *recursion_count = rslt;
2128 if (thread->debug_starts || thread->transaction_mode <= TRANSACTION_TRANSIENT)
2129 {
2130 myassert(thread->pending_lock == EMPTY_SHM);
2131 thread_unqueue_from_lock(thread);
2132 }
2133	// We cannot unconditionally release locks for other waiters ASAP here:
2134	// aborts can also come from transient transactions, and those should not trigger a global abort.
2135 if (thread->transaction_mode <= TRANSACTION_TRANSIENT)
2136 {
2137 transaction_end(thread, true);
2138 ShmTransactionElement *debug = transaction_unlock_all(thread);
2139		thread_reset_preempted(thread); // not strictly needed when the transaction ends, but do it anyway.
2140 myassert(thread->private_data->read_locks_taken == 0);
2141 myassert(thread->private_data->write_locks_taken == 0);
2142 }
2143 if (mode <= LOCKING_NONE)
2144 p_atomic_int_set(&thread->test_finished, 0);
2145
2146 return RESULT_OK;
2147}
2148
2149int
2150abort_transaction_retaining(ThreadContext *thread)
2151{
2152 // int mode;
2153 // int rslt = transaction_pop_mode(thread, &mode, NULL);
2154 // if (recursion_count)
2155 // *recursion_count = rslt;
2156 int mode;
2157 if (transaction_parent_mode(thread, &mode, NULL) < 0)
2158 mode = TRANSACTION_NONE;
2159 if (thread->debug_starts || mode <= TRANSACTION_TRANSIENT)
2160 {
2161 myassert(thread->pending_lock == EMPTY_SHM);
2162 thread_unqueue_from_lock(thread);
2163 }
2164 if (mode <= TRANSACTION_TRANSIENT)
2165 {
2166 transaction_end(thread, true);
2167 ShmTransactionElement *debug = transaction_unlock_all(thread);
2168 thread_reset_preempted(thread);
2169 myassert(thread->private_data->write_locks_taken == 0);
2170 myassert(thread->private_data->read_locks_taken == 0);
2171 }
2172 // if (mode <= LOCKING_NONE)
2173 // p_atomic_int_set(&thread->test_finished, 0);
2174
2175 return RESULT_OK;
2176}
2177
2178int
2179abort_transaction_retaining_debug_preempt(ThreadContext *thread)
2180{
2181 // thread->thread_preempted = 0;
2182 transaction_end(thread, true);
2183 ShmTransactionElement *debug = transaction_unlock_all(thread);
2184 thread_reset_preempted(thread);
2185 myassert(thread->private_data->write_locks_taken == 0);
2186 myassert(thread->private_data->read_locks_taken == 0);
2187 return RESULT_OK;
2188}
2189
2190bool transient_pause = false;
2191
2192// similar to abort_transaction_retaining
2193int
2194transient_commit(ThreadContext *thread)
2195{
2196 int mode = thread->transaction_mode;
2197 if (mode <= TRANSACTION_TRANSIENT)
2198 {
2199 if (transient_pause) Sleep(0);
2200 myassert(thread->pending_lock == EMPTY_SHM);
2201 thread_unqueue_from_lock(thread);
2202 transaction_end(thread, false);
2203 ShmTransactionElement *debug = transaction_unlock_all(thread);
2204 thread_reset_preempted(thread);
2205 myassert(thread->private_data->write_locks_taken == 0);
2206 myassert(thread->private_data->read_locks_taken == 0);
2207 return RESULT_OK;
2208 }
2209 return RESULT_INVALID;
2210}
2211
2212int
2213transient_abort(ThreadContext *thread)
2214{
2215	// same as transient_commit, except transaction_end() is called with rollback = true
2216 int mode = thread->transaction_mode;
2217 if (mode <= TRANSACTION_TRANSIENT)
2218 {
2219 myassert(thread->pending_lock == EMPTY_SHM);
2220 thread_unqueue_from_lock(thread);
2221 transaction_end(thread, true);
2222 ShmTransactionElement *debug = transaction_unlock_all(thread);
2223 thread_reset_preempted(thread);
2224 myassert(thread->private_data->write_locks_taken == 0);
2225 myassert(thread->private_data->read_locks_taken == 0);
2226 return RESULT_OK;
2227 }
2228 return RESULT_INVALID;
2229}
2230
2231void
2232transient_check_clear(ThreadContext *thread)
2233{
2234 if (thread->transaction_mode == TRANSACTION_TRANSIENT)
2235 {
2236 myassert(SBOOL(thread->current_transaction_shm) == false);
2237 myassert(thread->current_transaction == NULL);
2238 }
2239}
2240
2241int
2242commit_transaction(ThreadContext *thread, int *recursion_count)
2243{
2244 int mode;
2245 int rslt = transaction_pop_mode(thread, &mode, NULL);
2246 if (recursion_count)
2247 *recursion_count = rslt;
2248 myassert(mode == thread->transaction_mode);
2249
2250 // if (thread->debug_starts)
2251 // if (p_atomic_shm_pointer_get(&thread->thread_preempted) != 0)
2252 // p_atomic_shm_pointer_set(&thread->thread_preempted, 0);
2253 if (mode <= TRANSACTION_TRANSIENT)
2254 {
2255 // persistent transactions are aborted as a whole.
2256 myassert(thread->pending_lock == EMPTY_SHM);
2257 thread_unqueue_from_lock(thread);
2258 transaction_end(thread, false);
2259 ShmTransactionElement *debug = transaction_unlock_all(thread);
2260 thread_reset_preempted(thread);
2261 // can mark transaction as finished here, no local pointers allowed afterwards
2262 myassert(thread->private_data->write_locks_taken == 0);
2263 myassert(thread->private_data->read_locks_taken == 0);
2264 }
2265 if (mode <= LOCKING_NONE)
2266 p_atomic_int_set(&thread->test_finished, 0);
2267 return rslt;
2268}
2269
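/* Illustrative sketch (not compiled): the intended lifecycle around a single write
 * lock, pairing start_transaction() with commit_transaction()/abort_transaction()
 * and retrying on abort-class results. The retry policy, the use of
 * continue_transaction() before a retry, and the helper name are assumptions for
 * illustration. */
#if 0
static int example_locked_update(ThreadContext *thread, ShmCell *cell, ShmPointer cell_shm)
{
	int is_initial = 1;
	for (;;)
	{
		start_transaction(thread, TRANSACTION_PERSISTENT, LOCKING_WRITE, is_initial, NULL);
		is_initial = 0;
		bool taken = false;
		int rslt = transaction_lock_write(thread, &cell->lock, cell_shm, CONTAINER_CELL, &taken);
		if (rslt == RESULT_OK)
		{
			// ... stage changes into cell->new_data and set cell->has_new_data ...
			commit_transaction(thread, NULL); // publishes staged data and releases the locks
			return RESULT_OK;
		}
		abort_transaction(thread, NULL); // rolls back and releases whatever was taken
		if (!result_is_abort(rslt))
			return rslt; // hard failure, not a preemption/abort
		continue_transaction(thread); // reset the wakeup event before retrying
	}
}
#endif
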
2270int
2271init_thread_context(ThreadContext **context)
2272{
2273 myassert(superblock);
2274 int index = -1;
2275 int heap_index = -1;
2276 // those two are linked anyway
2277 take_spinlock(CAS2, &superblock->superheap.lock, 1, 0, {
2278 if (CAS2(&superblock->superheap.lock, 1, 2))
2279 {
2280 // initial
2281 heap_index = 0;
2282 break;
2283 }
2284 });
2285 take_spinlock(CAS2, &superblock->threads.lock, 1, 0, {
2286 if (CAS2(&superblock->threads.lock, 1, 2))
2287 {
2288 // initial
2289 index = 0;
2290 break;
2291 }
2292 });
2293 myassert(superblock->superheap.lock == 1);
2294 myassert(superblock->threads.lock == 1);
2295
2296 if (heap_index == -1)
2297 {
2298 for (int i = 0; i < MAX_THREAD_COUNT; ++i)
2299 {
2300 if (superblock->superheap.heaps[i].thread == 0)
2301 {
2302 heap_index = i;
2303 break;
2304 }
2305 }
2306 };
2307 if (index == -1)
2308 {
2309 for (int i = 0; i < MAX_THREAD_COUNT; ++i)
2310 {
2311 if (superblock->threads.threads[i] == 0)
2312 {
2313 index = i;
2314 break;
2315 }
2316 }
2317 };
2318
2319 int rslt = RESULT_FAILURE;
2320 *context = 0;
2321 if (index >= 0 && heap_index >= 0)
2322 {
2323 ShmPointer self = EMPTY_SHM;
2324 ThreadContext *thread = (ThreadContext *)get_mem(NULL, &self, sizeof(ThreadContext), SHM_THREAD_CONTEXT_ID);
2325 memset(CAST_VL(thread), 0, sizeof(ThreadContext));
2326 thread->magic = SHM_THREAD_MAGIC;
2327 thread->self = self;
2328 thread->private_data = calloc(1, sizeof(ThreadContextPrivate));
2329
2330 superblock->threads.threads[index] = self;
2331 superblock->superheap.heaps[heap_index].thread = self; // claiming the heap slot
2332 // superblock->superheap.heaps[heap_index].size = sizeof(ShmHeap); - inited in init_superblock
2333 thread->index = index;
2334 myassert(thread->index >= 0 && thread->index < isizeof(ShmReaderBitmap));
2335
2336 thread->thread_state = THREAD_NORMAL;
2337 thread->thread_preempted = EMPTY_SHM;
2338 thread->transaction_mode = LOCKING_NONE;
2339 // thread->last_start = GetTickCount();
2340 thread->last_start = 0;
2341 thread->test_finished = FALSE;
2342 thread->async_mode = FALSE;
2343 thread->free_list = DEBUG_SHM;
2344 thread->heap = pack_shm_pointer((intptr_t)&superblock->superheap.heaps[heap_index] - (intptr_t)superblock, SHM_INVALID_BLOCK);
2345 myassert(shm_pointer_to_pointer_root(thread->heap) == &superblock->superheap.heaps[heap_index]);
2346
2347 thread->local_vars = (LocalReferenceBlock *)get_mem(NULL, &thread->local_vars_shm, sizeof(LocalReferenceBlock), THREAD_LOCAL_VARS_DEBUG_ID);
2348 memset(thread->local_vars, 0, sizeof(LocalReferenceBlock));
2349
2350 // thread->transaction_stack = (ShmTransactionStack *)get_mem(NULL, &thread->transaction_stack_shm, sizeof(ShmTransactionStack));
2351 thread->private_data->transaction_stack = calloc(1, sizeof(ShmTransactionStack));
2352 thread->private_data->transaction_stack->count = 0;
2353
2354 thread->current_transaction_shm = EMPTY_SHM;
2355
2356 thread->pending_lock = EMPTY_SHM;
2357
2358 // thread->ready = 0;
2359 // thread->ready_event = 0;
2360 shm_event_init(&thread->ready);
2361
2362 thread->waiting_for_lock = EMPTY_SHM;
2363 thread->next = EMPTY_SHM;
2364
2365 // moved above
2366 // superblock->threads.threads[index] = self;
2367
2368 // superblock->superheap.heaps[heap_index].lock = 0;
2369 shm_simple_lock_init(&superblock->superheap.heaps[heap_index].lock); // it won't be used earlier anyway
2370 superblock->superheap.heaps[heap_index].owner = ShmGetCurrentThreadId();
2371 superblock->superheap.heaps[heap_index].count = 0;
2372 superblock->superheap.heaps[heap_index].large_segment = EMPTY_SHM;
2373 superblock->superheap.heaps[heap_index].fixed_sectors.head = EMPTY_SHM;
2374 superblock->superheap.heaps[heap_index].fixed_sectors.tail = EMPTY_SHM;
2375 superblock->superheap.heaps[heap_index].flex_sectors.head = EMPTY_SHM;
2376 superblock->superheap.heaps[heap_index].flex_sectors.tail = EMPTY_SHM;
2377
2378 *context = thread;
2379 rslt = RESULT_OK;
2380 }
2381
2382 if (superblock->threads.lock == 1)
2383 superblock->threads.lock = 0;
2384 if (superblock->superheap.lock == 1)
2385 superblock->superheap.lock = 0;
2386 return rslt;
2387}
2388
2389void
2390shm_thread_reset_debug_counters(ThreadContext *thread)
2391{
2392 thread->private_data->times_aborted = 0;
2393 thread->private_data->times_waiting = 0;
2394 thread->private_data->times_waiting2 = 0;
2395 thread->private_data->times_repeated = 0;
2396 thread->private_data->times_aborted1 = 0;
2397 thread->private_data->times_aborted2 = 0;
2398 thread->private_data->times_aborted3 = 0;
2399 thread->private_data->times_aborted4 = 0;
2400 thread->private_data->times_aborted5 = 0;
2401 thread->private_data->times_aborted6 = 0;
2402 thread->private_data->times_aborted7 = 0;
2403 thread->private_data->times_aborted8 = 0;
2404 thread->private_data->times_aborted9 = 0;
2405 thread->private_data->tickets_aborted = 0;
2406 thread->private_data->tickets_waiting = 0;
2407 thread->private_data->tickets_waiting2 = 0;
2408 thread->private_data->tickets_repeated = 0;
2409 thread->private_data->tickets_aborted1 = 0;
2410 thread->private_data->tickets_aborted2 = 0;
2411 thread->private_data->tickets_aborted3 = 0;
2412 thread->private_data->tickets_aborted4 = 0;
2413 thread->private_data->tickets_aborted5 = 0;
2414 thread->private_data->tickets_aborted6 = 0;
2415 thread->private_data->tickets_aborted7 = 0;
2416 thread->private_data->tickets_aborted8 = 0;
2417 thread->private_data->tickets_aborted9 = 0;
2418}
2419
2420// ShmPointer routines
2421
2422ShmWord
2423shm_pointer_get_block(ShmPointer pntr)
2424{
2425 return (pntr) >> SHM_OFFSET_BITS;
2426}
2427
2428ShmWord
2429shm_pointer_get_offset(ShmPointer pntr)
2430{
2431 return (pntr) & SHM_INVALID_OFFSET; // 18 bits
2432}
2433
2434bool
2435shm_pointer_is_valid(ShmPointer pntr)
2436{
2437 return SBOOL(pntr) && shm_pointer_get_offset(pntr) != SHM_INVALID_OFFSET &&
2438 shm_pointer_get_offset(pntr) > SHM_FIXED_CHUNK_HEADER_SIZE; // only internal routines can read the header
2439}
2440
2441bool
2442SBOOL(ShmPointer pntr)
2443{
2444 return shm_pointer_get_block(pntr) != SHM_INVALID_BLOCK && pntr != NONE_SHM;
2445}
2446
2447void *
2448shm_pointer_to_pointer_root(ShmPointer pntr)
2449{
2450 ShmWord idx = shm_pointer_get_block(pntr);
2451 vl char *block;
2452 if (idx == SHM_INVALID_BLOCK)
2453 block = (vl char *)superblock;
2454 else
2455 block = superblock_get_block(idx);
2456 if (!block)
2457 return NULL;
2458 ShmWord offset = shm_pointer_get_offset(pntr);
2459 if (idx == SHM_INVALID_BLOCK)
2460 {
2461 myassert(offset < isizeof(ShmSuperblock));
2462 if (offset >= isizeof(ShmSuperblock))
2463 return NULL;
2464 }
2465 else
2466 {
2467 // myassert(offset < superblock->blocks[idx].size, NULL);
2468 // if (offset >= superblock->blocks[idx].size)
2469 // return NULL;
2470
2471 myassert(offset < SHM_FIXED_CHUNK_SIZE);
2472 if (offset >= SHM_FIXED_CHUNK_SIZE)
2473 return NULL;
2474 }
2475 return CAST_VL(block + offset);
2476}
2477
2478void *
2479shm_pointer_to_pointer_unsafe(ShmPointer pntr)
2480{
2481 ShmWord idx = shm_pointer_get_block(pntr);
2482 vl char *block;
2483 if (idx == SHM_INVALID_BLOCK)
2484 return NULL;
2485 else
2486 block = superblock_get_block(idx);
2487 if (!block)
2488 return NULL;
2489 ShmWord offset = shm_pointer_get_offset(pntr);
2490 if (idx == SHM_INVALID_BLOCK)
2491 return NULL;
2492 else
2493 {
2494 myassert(offset < SHM_FIXED_CHUNK_SIZE);
2495 if (offset >= SHM_FIXED_CHUNK_SIZE)
2496 return NULL;
2497 }
2498 return CAST_VL(block + offset);
2499}
2500
2501void *
2502shm_pointer_to_pointer(ShmPointer pntr)
2503{
2504 if (!shm_pointer_is_valid(pntr))
2505 return NULL;
2506 return shm_pointer_to_pointer_unsafe(pntr);
2507}
2508
2509
2510void *
2511shm_pointer_to_pointer_no_side(ShmPointer pntr)
2512{
2513 ShmWord idx = shm_pointer_get_block(pntr);
2514 vl char *block;
2515	block = (idx != SHM_INVALID_BLOCK) ? superblock_mmap[idx / SHM_BLOCK_GROUP_SIZE] : NULL; // don't index superblock_mmap with an invalid block
2516 if (!block)
2517 return NULL;
2518 ShmWord offset = shm_pointer_get_offset(pntr);
2519 if (idx == SHM_INVALID_BLOCK)
2520 {
2521 return NULL;
2522 }
2523 else
2524 {
2525 // myassert(offset < superblock->blocks[idx].size, NULL);
2526 // if (offset >= superblock->blocks[idx].size)
2527 // return NULL;
2528 myassert(offset < SHM_FIXED_CHUNK_SIZE);
2529 if (offset >= SHM_FIXED_CHUNK_SIZE)
2530 return NULL;
2531 }
2532 return CAST_VL(block + SHM_FIXED_CHUNK_SIZE * (idx % SHM_BLOCK_GROUP_SIZE) + offset);
2533}
2534
2535void *
2536SPTR(ShmPointer pntr)
2537{
2538 return shm_pointer_to_pointer(pntr);
2539}
2540
2541ShmPointer
2542pack_shm_pointer(ShmWord offset, ShmWord block)
2543{
2544 uintptr_t _offset = (uintptr_t)offset;
2545 uintptr_t _block = (uintptr_t)block;
2546 myassert(_offset <= SHM_INVALID_OFFSET);
2547 myassert(_block <= SHM_INVALID_BLOCK);
2548 _offset &= SHM_INVALID_OFFSET;
2549 _block &= SHM_INVALID_BLOCK;
2550 return (_block << SHM_OFFSET_BITS) | _offset;
2551}
2552
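/* Worked example: a ShmPointer is just (block << SHM_OFFSET_BITS) | offset, so the
 * pack/unpack round trip below holds for any in-range pair (the concrete values are
 * purely illustrative). */
#if 0
static void example_pack_roundtrip(void)
{
	ShmPointer p = pack_shm_pointer(/* offset */ 0x40, /* block */ 3);
	myassert(shm_pointer_get_block(p) == 3);
	myassert(shm_pointer_get_offset(p) == 0x40);
}
#endif
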
2553ShmPointer
2554pointer_to_shm_pointer(void *pntr, int block)
2555{
2556 myassert(block <= SHM_INVALID_BLOCK);
2557 vl char *block_base = superblock_get_block_noside(block);
2558 int offset = (char*)pntr - block_base;
2559 myassert(offset >= 0);
2560 myassert(offset < SHM_FIXED_CHUNK_SIZE);
2561 return pack_shm_pointer(offset, block);
2562}
2563
2564ShmPointer
2565shm_pointer_shift(ShmPointer pointer, int offset)
2566{
2567 int block = shm_pointer_get_block(pointer);
2568 int orig_offset = shm_pointer_get_offset(pointer);
2569 orig_offset = orig_offset + offset;
2570 if (orig_offset >= SHM_INVALID_OFFSET)
2571 return EMPTY_SHM;
2572 return pack_shm_pointer(orig_offset, block);
2573}
2574
2575// ShmCell
2576
2577ShmLock *
2578shm_cell_to_lock(ShmPointer cell_shm)
2579{
2580 ShmCell *cell = shm_pointer_to_pointer(cell_shm);
2581 if (cell)
2582 {
2583 return &cell->lock;
2584 }
2585 else
2586 return NULL;
2587}
2588
2589bool
2590init_cell_ref(ShmPointer cell_shm, CellRef *cell)
2591{
2592 cell->shared = cell_shm;
2593 cell->local = shm_pointer_to_pointer(cell_shm);
2594 return !!cell->local;
2595}
2596
2597bool
2598init_queuecell_ref(ShmPointer cell_shm, QueueCellRef *cell)
2599{
2600 return init_cell_ref(cell_shm, (CellRef *) cell);
2601}
2602
2603bool
2604init_queue_ref(ShmPointer cell_shm, QueueRef *cell)
2605{
2606 return init_cell_ref(cell_shm, (CellRef *) cell);
2607}
2608
2609bool
2610init_list_ref(ShmPointer cell_shm, ListRef *cell)
2611{
2612 return init_cell_ref(cell_shm, (CellRef *)cell);
2613}
2614
2615bool
2616init_dict_ref(ShmPointer cell_shm, DictRef *cell)
2617{
2618 return init_cell_ref(cell_shm, (CellRef *)cell);
2619}
2620
2621bool
2622init_undict_ref(ShmPointer cell_shm, UnDictRef *cell)
2623{
2624 return init_cell_ref(cell_shm, (CellRef *)cell);
2625}
2626
2627vl char *
2628debug_id_to_str(int id)
2629{
2630 id &= 0xFFFFFF; // filter off the "0x1D >> 24" part
2631 myassert(id >= 0 && id < MAX_DEBUG_ID);
2632 return superblock->type_debug_ids[id];
2633}
2634
2635int
2636init_superblock(const char *id)
2637{
2638 uint32_t v1 = (((uintptr_t)SHM_INVALID_BLOCK) << SHM_OFFSET_BITS) +
2639 SHM_INVALID_OFFSET;
2640 uint32_t v2 = ~(uint32_t)0;
2641 myassert(v1 == v2);
2642
2643 init_mm_maps();
2644 init_coordinator();
2645#if (defined(P_OS_WIN) || defined(P_OS_WIN64))
2646 init_private_events();
2647#endif
2648
2649 long memsize = sizeof(ShmSuperblock);
2650 bool existing = !!id;
2651 if (existing)
2652 {
2653 memclear(&superblock_desc.id[0], SHM_SAFE_NAME_LENGTH + 1);
2654 strcpy_s(CAST_VL(&superblock_desc.id[0]), SHM_SAFE_NAME_LENGTH + 1, id);
2655 }
2656 srand((unsigned)time(NULL)); // otherwise we will be getting same names every time
2657 // superblock = allocate_block((__ShmChunk *)&superblock_desc, !existing, memsize, true, NULL, &hFileMapping);
2658 SectorAllocationResult alloc_result = shm_allocate_sector((__ShmChunk*)&superblock_desc, !existing, memsize);
2659 superblock = alloc_result.data;
2660 if (!superblock)
2661 return RESULT_FAILURE;
2662 ShmProcessID coordinator_process = 0;
2663 bool i_am_the_coordinator = false;
2664 if (existing)
2665 coordinator_process = superblock->coordinator_process;
2666 else
2667 {
2668 // superblock->coordinator_process is not initialized yet
2669 i_am_the_coordinator = !existing;
2670 }
2671 shm_sector_register_handle(&superblock_desc, alloc_result.FileMapping, coordinator_process, i_am_the_coordinator);
2672
2673 if (!existing) {
2674 memclear(superblock, sizeof(ShmSuperblock));
2675 superblock->type = SHM_BLOCK_TYPE_SUPER;
2676
2677 superblock->coordinator_process = ShmGetCurrentProcessId();
2678 memclear(&superblock->coordinator_data, sizeof(ShmCoordinatorData));
2679 shm_simple_lock_init(&superblock->lock);
2680
2681 superblock->block_count = 0;
2682 superblock->ticket = 0;
2683 superblock->mm_last_used_root_sector = (ShmInt)-1;
2684 superblock->last_available_event_id = 0;
2685 // memclear(superblock->block_groups);
2686 superblock->threads.size = sizeof(ShmThreads);
2687 superblock->threads.count = MAX_THREAD_COUNT;
2688 superblock->threads.lock = 2;
2689
2690 superblock->superheap.size = sizeof(ShmSuperheap);
2691 superblock->superheap.count = MAX_THREAD_COUNT;
2692 superblock->superheap.lock = 2;
2693 for (int i = 0; i < MAX_THREAD_COUNT; ++i)
2694 {
2695 superblock->superheap.heaps[i].size = sizeof(ShmHeap);
2696 }
2697 superblock->superheap.self = pack_shm_pointer((intptr_t)&superblock->superheap - (intptr_t)superblock, SHM_INVALID_BLOCK);
2698
2699 superblock->has_grabage = 0;
2700 // event initialization uses superblock variable, so we do it last.
2701 shm_event_init(&superblock->has_grabage_event);
2702
2703 init_debug_type_ids(&superblock->type_debug_ids);
2704
2705 start_coordinator();
2706 }
2707 return RESULT_OK;
2708}
2709
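/* Illustrative sketch (not compiled): typical startup. The coordinator passes NULL
 * to create a fresh superblock; other processes pass the id they received from the
 * coordinator. How the id is exchanged between processes is outside this file and
 * only assumed here. */
#if 0
static ThreadContext *example_attach(const char *shared_id /* NULL in the coordinator */)
{
	if (init_superblock(shared_id) != RESULT_OK)
		return NULL;
	ThreadContext *thread = NULL;
	if (init_thread_context(&thread) != RESULT_OK)
		return NULL;
	return thread;
}
#endif
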
2710////////////// Basic Pointer Operations //////////////////
2711
2712//////////////////////////////////// Types /////////////////////
2713
2714static ShmPointer pchar_type_desc = EMPTY_SHM;
2715
2716ShmPointer get_pchar_type_desc(ThreadContext *thread)
2717{
2718 if (shm_pointer_is_valid(pchar_type_desc))
2719 return pchar_type_desc;
2720 // ShmPointer id = 1;
2721 // if (thread)
2722 // id = thread->self;
2723 if (get_mem(thread, &pchar_type_desc, sizeof(ShmPointer), SHM_TYPE_DESC_DEBUG_ID))
2724 return pchar_type_desc;
2725 else
2726 return EMPTY_SHM;
2727}
2728
2729int
2730shm_value_get_length(ShmValueHeader *value)
2731{
2732 myassert(value->size > isizeof(ShmValueHeader));
2733 return value->size - isizeof(ShmValueHeader);
2734}
2735
2736void *
2737shm_value_get_data(ShmValueHeader *value)
2738{
2739 return (void *)(((uintptr_t)value) + sizeof(ShmValueHeader));
2740}
2741
2742void
2743init_container(ShmContainer *cell, ShmInt size, ShmInt type)
2744{
2745 myassert((type & SHM_TYPE_FLAG_REFCOUNTED) == SHM_TYPE_FLAG_REFCOUNTED);
2746 myassert((type & SHM_TYPE_FLAG_CELL) == SHM_TYPE_FLAG_CELL);
2747 memset(CAST_VL(cell), 0x1B, (puint)size);
2748 cell->refcount = 1;
2749 cell->size = size;
2750 cell->type = type;
2751 cell->lock.reader_lock = 0;
2752 // cell->lock.rw_barrier_thread = EMPTY_SHM;
2753 // cell->lock.rw_barrier_value = 0;
2754 // cell->lock.writer_lock_ensured = 0;
2755 cell->lock.writer_lock = LOCK_UNLOCKED;
2756 // cell->lock.next_writer = 0;
2757 cell->lock.next_writer = LOCK_UNLOCKED;
2758 // cell->lock.queue = NONE_SHM;
2759 cell->lock.queue_threads = 0;
2760 cell->lock.transaction_data = EMPTY_SHM;
2761 cell->lock.prev_lock = EMPTY_SHM;
2762 cell->lock.release_line = __LINE__;
2763 cell->lock.writers_count = 0;
2764 cell->lock.readers_count = 0;
2765}
2766
2767//////////////////////////////////// Queue ///////////////////////
2768
2769static void
2770init_cell(ShmCell *cell, ShmInt size, ShmInt type)
2771{
2772 init_container((ShmContainer *)cell, size, type);
2773 cell->data = EMPTY_SHM;
2774 cell->has_new_data = false;
2775 cell->new_data = EMPTY_SHM;
2776}
2777
2778vl void *
2779new_shm_refcounted_block(ThreadContext *thread, PShmPointer shm_pointer, int total_size, ShmInt type, int debug_id)
2780{
2781 myassert((type & SHM_TYPE_FLAG_REFCOUNTED) == SHM_TYPE_FLAG_REFCOUNTED);
2782 ShmRefcountedBlock *new_block = get_mem(thread, shm_pointer, total_size, debug_id);
2783 memclear(new_block, total_size);
2784 new_block->type = type;
2785 new_block->size = total_size;
2786 new_block->refcount = 1;
2787 return new_block;
2788}
2789
2790ShmValueHeader *
2791new_shm_value(ThreadContext *thread, ShmInt item_size, ShmInt type, PShmPointer shm_pointer)
2792{
2793 if (shm_pointer) *shm_pointer = EMPTY_SHM;
2794 int total_size = item_size + isizeof(ShmValueHeader);
2795 ShmValueHeader *value = new_shm_refcounted_block(thread, shm_pointer, total_size, type | SHM_TYPE_FLAG_REFCOUNTED, SHM_VALUE_DEBUG_ID);
2796 return value;
2797}
2798
2799ShmInt
2800shm_value_get_size(ShmValueHeader *value)
2801{
2802 myassert_msg(value->size >= isizeof(ShmValueHeader), "value->size >= sizeof(ShmValueHeader)");
2803 if (value->size >= isizeof(ShmValueHeader))
2804 return value->size - isizeof(ShmValueHeader);
2805 else
2806 return 0;
2807}
2808
2809ShmValueHeader *
2810new_bytes_zt(ThreadContext *thread, const char *value, ShmInt size, PShmPointer result_shm)
2811{
2812 ShmValueHeader *rslt = new_shm_value(thread, size+1, SHM_TYPE_BYTES, result_shm);
2813 if (!shm_value_get_data(rslt))
2814 {
2815		fprintf(stderr, "Zero value data on allocation\n");
2816 return NULL;
2817 }
2818 strncpy_s(shm_value_get_data(rslt), (size_t)size + 1, value, (size_t)size);
2819 return rslt;
2820}
2821
2822void
2823shm_cell_get_data(ThreadContext *thread, ShmCell *cell, PShmPointer data)
2824{
2825 if (shm_cell_have_write_lock(thread, &cell->lock))
2826 {
2827 // cell is locked by me
2828 if (cell->has_new_data)
2829 *data = cell->new_data;
2830 else
2831 *data = cell->data;
2832 }
2833 else
2834 *data = cell->data;
2835}
2836
2837int
2838shm_cell_commit(ThreadContext *thread, ShmCell *item)
2839{
2840 shm_cell_check_write_lock(thread, &item->lock);
2841 if (item->has_new_data)
2842 {
2843 item->has_new_data = false;
2844 shm_pointer_move(thread, &item->data, &item->new_data);
2845 }
2846 return RESULT_OK;
2847}
2848
2849int
2850shm_cell_rollback(ThreadContext *thread, ShmCell *item)
2851{
2852 shm_cell_check_write_lock(thread, &item->lock);
2853 if (item->has_new_data)
2854 {
2855 item->has_new_data = false;
2856 shm_pointer_empty(thread, &item->new_data);
2857 }
2858 return RESULT_OK;
2859}
2860
2861int
2862shm_cell_unlock(ThreadContext *thread, ShmCell *item, ShmInt type)
2863{
2864 // myassert(item->lock.id == thread->self, "item->lock.id == thread->self");
2865 if (TRANSACTION_ELEMENT_WRITE == type)
2866 p_atomic_shm_pointer_set(&item->lock.transaction_data, EMPTY_SHM);
2867 _shm_cell_unlock(thread, &item->lock, type);
2868 return RESULT_OK;
2869}
2870
2871ShmValueHeader *
2872do_cell_get_value(ThreadContext *thread, ShmCell *item, PShmPointer value_shm, bool acquire)
2873{
2874 if (value_shm) *value_shm = EMPTY_SHM;
2875 if (!item)
2876 return NULL;
2877 ShmQueueCell *queue_item = (ShmQueueCell *)item;
2878 ShmPointer data;
2879 shm_cell_get_data(thread, item, &data);
2880 if (value_shm)
2881 *value_shm = data;
2882 if (!shm_pointer_is_valid(data) && data != NONE_SHM)
2883 return NULL;
2884
2885 ShmValueHeader *value = (ShmValueHeader *)shm_pointer_to_pointer(data);
2886 if (acquire)
2887 p_atomic_int_inc(&value->refcount);
2888
2889 return value;
2890}
2891
2892// Returns acquired value.
2893// Lock is not needed because item is already acquired and result is immutable.
2894ShmValueHeader *
2895cell_unlocked_acq_value(ThreadContext *thread, ShmCell *item, PShmPointer value_shm)
2896{
2897 return do_cell_get_value(thread, item, value_shm, true);
2898}
2899
2900ShmValueHeader *
2901cell_unlocked_get_value(ThreadContext *thread, ShmCell *item, PShmPointer value_shm)
2902{
2903 return do_cell_get_value(thread, item, value_shm, false);
2904}
2905
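/* Illustrative sketch (not compiled): reading a cell's value under a short
 * transient transaction. The returned value is acquired, so the caller is
 * responsible for releasing it later; the helper name and error handling are
 * assumptions for illustration. */
#if 0
static ShmValueHeader *example_read_cell(ThreadContext *thread, CellRef cell)
{
	ShmValueHeader *value = NULL;
	start_transaction(thread, TRANSACTION_TRANSIENT, LOCKING_ALL, 1, NULL);
	bool taken = false;
	if (transaction_lock_read(thread, &cell.local->lock, cell.shared, CONTAINER_CELL, &taken) == RESULT_OK)
	{
		ShmPointer value_shm = EMPTY_SHM;
		value = cell_unlocked_acq_value(thread, cell.local, &value_shm); // acquired: release when done
		commit_transaction(thread, NULL);
	}
	else
		abort_transaction(thread, NULL);
	return value;
}
#endif
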
2906// ShmRefString
2907
2908void
2909ASCII_from_UCS4(Py_UCS1 *to, const Py_UCS4 *from, ShmInt length)
2910{
2911 for (int i = 0; i < length; ++i)
2912 {
2913 to[i] = (Py_UCS1)from[i];
2914 myassert(from[i] != 0);
2915 }
2916}
2917
2918void
2919UCS4_from_ASCII(Py_UCS4 *to, const Py_UCS1 *from, ShmInt length)
2920{
2921 for (int i = 0; i < length; ++i)
2922 {
2923 to[i] = from[i];
2924 myassert(from[i] != 0);
2925 }
2926}
2927
2928static const unsigned char number_map[10] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
2929
2930int
2931UCS4_format_number(Py_UCS4 *to, int number)
2932{
2933 int len = 0;
2934 if (number <= 0)
2935 {
2936 to[0] = '0';
2937 return 1;
2938 }
2939 int tmp = number;
2940 while (tmp > 0)
2941 {
2942 tmp = tmp / 10;
2943 len++;
2944 }
2945 for (int i = len - 1; i >= 0; --i)
2946 {
2947 to[i] = number_map[number % 10];
2948 number = number / 10;
2949 }
2950 return len;
2951}
2952
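/* Example: UCS4_format_number(buf, 1234) writes the four characters '1','2','3','4'
 * (no terminator) and returns 4; any number <= 0 is rendered as the single digit '0'. */
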
2953ShmPointer
2954shm_ref_string_new_ascii(ThreadContext *thread, const Py_UCS1 *s, int len)
2955{
2956 ShmPointer rslt;
2957 int total_size = isizeof(ShmRefString) - isizeof(Py_UCS4) + len * isizeof(Py_UCS4);
2958 vl ShmRefString *ref = new_shm_refcounted_block(thread, &rslt, total_size, SHM_TYPE_REF_UNICODE, SHM_REF_STRING_DEBUG_ID);
2959 if (ref)
2960 {
2961 Py_UCS4 *data = (Py_UCS4 *)(intptr_t)&ref->chars;
2962 UCS4_from_ASCII(data, s, len);
2963 }
2964 return rslt;
2965}
2966
2967ShmPointer
2968shm_ref_string_new(ThreadContext *thread, const Py_UCS4 *s, int len)
2969{
2970 ShmPointer rslt;
2971 int total_size = isizeof(ShmRefString) - isizeof(Py_UCS4) + len * isizeof(Py_UCS4);
2972 vl ShmRefString *ref = new_shm_refcounted_block(thread, &rslt, total_size, SHM_TYPE_REF_UNICODE, SHM_REF_STRING_DEBUG_ID);
2973 if (ref)
2974 {
2975 Py_UCS4 *data = (Py_UCS4 *)&ref->chars;
2976 memcpy(data, s, len * sizeof(Py_UCS4));
2977 }
2978 return rslt;
2979}
2980
2981RefString
2982shm_ref_string_get(ShmPointer p)
2983{
2984 RefString rslt;
2985 ShmRefString *s = shm_pointer_to_pointer(p);
2986 if (!s)
2987 {
2988 rslt.data = NULL;
2989 rslt.len = 0;
2990 return rslt;
2991 }
2992 myassert((s->type & SHM_TYPE_MASK) == (SHM_TYPE_REF_UNICODE & SHM_TYPE_MASK));
2993 int strlen = (s->size - (isizeof(ShmRefString) - isizeof(Py_UCS4))) / isizeof(Py_UCS4);
2994 rslt.len = strlen;
2995 rslt.data = &s->chars;
2996 return rslt;
2997}
2998
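/* Illustrative sketch (not compiled): round-tripping a refcounted string.
 * shm_ref_string_get() returns a view into shared memory; nothing is copied. */
#if 0
static void example_ref_string(ThreadContext *thread)
{
	ShmPointer s = shm_ref_string_new_ascii(thread, (const Py_UCS1 *)"key", 3);
	RefString view = shm_ref_string_get(s);
	myassert(view.len == 3);
}
#endif
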
2999// FNV_prime = 2^24 + 2^8 + 0x93 = 16777619
3000#define FNV_prime 16777619
3001#define FNV_basis 2166136261
3002
3003// FNV-1a hash
3004uint32_t
3005hash_string(const Py_UCS4 *s, int len)
3006{
3007 uint32_t hash = FNV_basis;
3008 for (int i = 0; i < len; ++i)
3009 {
3010 hash = hash ^ s[i];
3011 hash = hash * FNV_prime;
3012 }
3013 return hash;
3014}
3015
3016uint32_t
3017hash_string_ascii(const char *s, int len)
3018{
3019 uint32_t hash = 0;
3020 for (int i = 0; i < len; ++i)
3021 {
3022 hash = hash ^ (unsigned char)s[i];
3023 hash = hash * FNV_prime;
3024 }
3025 return hash;
3026}
3027
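/* Worked example: hash_string() is 32-bit FNV-1a, so for the single character 'a' it
 * yields ((2166136261 ^ 0x61) * 16777619) mod 2^32 = 0xE40C292C, the standard FNV-1a
 * test vector. Note that hash_string_ascii() seeds with 0 instead of FNV_basis, so the
 * two functions do not produce matching hashes for identical ASCII input. */
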
3028ShmValueHeader *
3029new_shm_unicode_value(ThreadContext *thread, const char *s, int length, PShmPointer shm_pointer)
3030{
3031 ShmValueHeader *header = new_shm_value(thread, length * isizeof(Py_UCS4), SHM_TYPE_UNICODE, shm_pointer);
3032 Py_UCS4 *data = (Py_UCS4 *)shm_value_get_data(header);
3033 UCS4_from_ASCII(data, (const Py_UCS1 *)s, length);
3034 return header;
3035}
3036
3037// ShmQueue
3038
3039ShmQueue *
3040new_shm_queue(ThreadContext *thread, PShmPointer shm_pointer)
3041{
3042 // if (shm_pointer) *shm_pointer = EMPTY_SHM; - done by get_mem
3043 ShmQueue *queue = (ShmQueue *)get_mem(thread, shm_pointer, sizeof(ShmQueue), SHM_QUEUE_DEBUG_ID);
3044 init_container((ShmContainer *)queue, sizeof(ShmQueue), SHM_TYPE_QUEUE);
3045 queue->head = EMPTY_SHM;
3046 queue->new_head = NONE_SHM;
3047 queue->tail = EMPTY_SHM;
3048 queue->new_tail = NONE_SHM;
3049 queue->count = 0;
3050 queue->type_desc = get_pchar_type_desc(thread);
3051 queue->changes_shm = EMPTY_SHM;
3052 return queue;
3053}
3054
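/* Illustrative sketch (not compiled): creating a queue and appending one value.
 * shm_queue_append_do() is defined further below and takes the queue's write lock
 * itself, so a transaction must already be active; the exact transaction wrapping
 * used here is an assumption for illustration. */
#if 0
static void example_queue_append(ThreadContext *thread)
{
	ShmPointer queue_shm = EMPTY_SHM;
	new_shm_queue(thread, &queue_shm);
	QueueRef queue;
	init_queue_ref(queue_shm, &queue);

	ShmPointer value_shm = EMPTY_SHM;
	new_bytes_zt(thread, "payload", 7, &value_shm);

	start_transaction(thread, TRANSACTION_TRANSIENT, LOCKING_WRITE, 1, NULL);
	CellRef cell;
	shm_queue_append_do(thread, queue, value_shm, &cell, true); // consume: the queue takes ownership of value_shm
	commit_transaction(thread, NULL);
}
#endif
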
3055// Allocate new value for the queue. Operates on detached objects, thus does not block the queue.
3056// Returns a private pointer to the new item (the shared pointer is returned via shm_pointer)
3057ShmValueHeader *
3058shm_queue_new_value(ThreadContext *thread, QueueRef queue, ShmInt item_size, ShmInt type, PShmPointer shm_pointer)
3059{
3060 return new_shm_value(thread, item_size, type, shm_pointer);
3061}
3062
3063ShmQueueCell *
3064shm_queue_new_cell(ThreadContext *thread, QueueRef queue, PShmPointer shm_pointer)
3065{
3066 if (shm_pointer) *shm_pointer = EMPTY_SHM;
3067 int total_size = sizeof(ShmQueueCell);
3068 ShmQueueCell *new_block = (ShmQueueCell *)get_mem(thread, shm_pointer, total_size, SHM_QUEUE_CELL_DEBUG_ID);
3069 memclear(new_block, total_size);
3070 init_cell((ShmCell *)new_block, total_size, SHM_TYPE_QUEUE_CELL);
3071 new_block->next = EMPTY_SHM;
3072 new_block->new_next = NONE_SHM;
3073 return new_block;
3074}
3075
3076void
3077shm_queuecell_get_next(ThreadContext *thread, ShmQueueCell *cell, PShmPointer next)
3078{
3079 if (shm_cell_have_write_lock(thread, &cell->lock))
3080 {
3081 // cell is locked by me
3082 if (cell->new_next == NONE_SHM)
3083 *next = cell->next;
3084 else
3085 *next = cell->new_next;
3086 }
3087 else
3088 *next = cell->next;
3089}
3090
3091int
3092shm_queue_changes_check_inited(ThreadContext *thread, ShmQueue *queue, ShmQueueChanges **rslt)
3093{
3094 if ( ! SBOOL(queue->changes_shm) )
3095 {
3096 ShmPointer shm_pointer;
3097 ShmQueueChanges *new = (ShmQueueChanges *)get_mem(thread, &shm_pointer, sizeof(ShmQueueChanges), SHM_QUEUE_CHANGES_DEBUG_ID);
3098 new->type = SHM_TYPE_QUEUE_CHANGES;
3099 new->size = sizeof(ShmQueueChanges);
3100 new->count = 0;
3101 queue->changes_shm = shm_pointer; // publish
3102 if (rslt)
3103 *rslt = new;
3104 return RESULT_OK;
3105 }
3106 else if (rslt)
3107 {
3108 *rslt = shm_pointer_to_pointer(queue->changes_shm);
3109 }
3110 return RESULT_OK;
3111}
3112
3113int
3114shm_queue_changes_push(ThreadContext *thread, QueueRef queue, QueueCellRef cell)
3115{
3116 if (SBOOL(p_atomic_shm_pointer_get(&cell.local->lock.transaction_data)))
3117 return RESULT_OK;
3118 ShmQueueChanges *changes = NULL;
3119 shm_queue_changes_check_inited(thread, queue.local, &changes);
3120 myassert_msg(changes->count < DELTA_ARRAY_SIZE, "changes->count < DELTA_ARRAY_SIZE");
3121 int idx = changes->count;
3122 changes->cells[idx] = cell.shared;
3123 changes->count++;
3124 p_atomic_shm_pointer_set(&cell.local->lock.transaction_data, queue.shared);
3125 return RESULT_OK;
3126}
3127
3128int
3129shm_queue_changes_clear(ThreadContext *thread, ShmQueue *queue)
3130{
3131 ShmQueueChanges *changes = shm_pointer_to_pointer(queue->changes_shm);
3132 if (changes)
3133 {
3134 changes->count = 0;
3135 }
3136 return RESULT_OK;
3137}
3138
3139int
3140shm_queue_commit(ThreadContext *thread, ShmQueue *queue)
3141{
3142 shm_cell_check_write_lock(thread, &queue->base.lock);
3143 // order of operations in this function is not verified for dirty reads.
3144 ShmQueueChanges *changes = shm_pointer_to_pointer(queue->changes_shm);
3145 for (int i = 0; i < changes->count; ++i)
3146 {
3147 ShmQueueCell *cell = shm_pointer_to_pointer(changes->cells[i]);
3148 if (cell->has_new_data)
3149 {
3150 cell->has_new_data = false; // leak is better than wild pointer
3151 shm_pointer_move(thread, &cell->data, &cell->new_data);
3152 }
3153 if (cell->new_next != NONE_SHM)
3154 {
3155 shm_next_pointer_move(thread, &cell->next, &cell->new_next);
3156 }
3157 }
3158
3159 if (queue->new_tail != NONE_SHM)
3160 {
3161 queue->tail = queue->new_tail;
3162 queue->new_tail = NONE_SHM;
3163 }
3164 if (queue->new_head != NONE_SHM)
3165 {
3166 shm_next_pointer_move(thread, &queue->head, &queue->new_head);
3167 }
3168 return RESULT_OK;
3169}
3170
3171int
3172shm_queue_rollback(ThreadContext *thread, ShmQueue *queue)
3173{
3174 shm_cell_check_write_lock(thread, &queue->base.lock);
3175 // order of operations in this function is not verified for dirty reads.
3176 ShmQueueChanges *changes = shm_pointer_to_pointer(queue->changes_shm);
3177 // changes are not implemented in the shm_queue_append_do
3178 for (int i = 0; i < changes->count; ++i)
3179 {
3180 ShmQueueCell *cell = shm_pointer_to_pointer(changes->cells[i]);
3181 if (cell->has_new_data)
3182 {
3183 cell->has_new_data = false; // leak is better than wild pointer
3184 shm_pointer_empty(thread, &cell->new_data);
3185 }
3186 if (cell->new_next != NONE_SHM)
3187 {
3188 shm_next_pointer_empty(thread, &cell->new_next);
3189 }
3190 }
3191
3192 if (queue->new_tail != NONE_SHM)
3193 {
3194 // tail is not refcounted
3195 queue->new_tail = NONE_SHM;
3196 }
3197 if (queue->new_head != NONE_SHM)
3198 {
3199 shm_next_pointer_empty(thread, &queue->new_head);
3200 }
3201 return RESULT_OK;
3202}
3203
3204int
3205shm_queue_unlock(ThreadContext *thread, ShmQueue *queue, ShmInt type)
3206{
3207 // order of operations in this function is not verified for dirty reads.
3208
3209 // locks are global for the whole queue now
3210	// ShmQueueChanges *changes = shm_pointer_to_pointer(queue->changes);
3211 // for (i = 0; i < changes->count; ++i)
3212 // {
3213 // ShmCell *cell = shm_pointer_to_pointer(changes->cells[i]);
3214 // shm_cell_unlock(thread, cell);
3215 //}
3216 shm_queue_changes_clear(thread, queue);
3217
3218 // myassert(queue->base.lock.id == thread->self, "queue->lock.id == thread->self");
3219 if (TRANSACTION_ELEMENT_WRITE == type)
3220 p_atomic_shm_pointer_set(&queue->base.lock.transaction_data, EMPTY_SHM);
3221 // queue->base.lock.id = 0;
3222 _shm_cell_unlock(thread, &queue->base.lock, type);
3223 return RESULT_OK;
3224}
3225
3226// rslt is unacquired, but owned by ShmQueue
3227int
3228shm_queue_append_do(ThreadContext *thread, QueueRef queue, ShmPointer value, CellRef *rslt, bool consume)
3229{
3230 shm_queue_changes_check_inited(thread, queue.local, NULL);
3231 rslt->shared = EMPTY_SHM;
3232 //if (InterlockedCompareExchange(&queue->cell.lock, 1, 0) == 0)
3233 // take_spinlock(&queue->cell.lock, {}); // modification lock
3234 // start_transaction(thread, TRANSACTION_TRANSIENT, LOCKING_WRITE, false, NULL);
3235 bool lock_taken1 = false;
3236 if_failure(
3237 transaction_lock_write(thread, &queue.local->base.lock, queue.shared, CONTAINER_QUEUE, &lock_taken1),
3238 {
3239 if (consume)
3240 shm_pointer_release(thread, value);
3241 transient_abort(thread);
3242 return status;
3243 }
3244 );
3245 {
3246 QueueCellRef tail;
3247 bool lock_taken2 = false;
3248 if (queue.local->new_tail != NONE_SHM)
3249 init_queuecell_ref(queue.local->new_tail, &tail);
3250 else
3251 init_queuecell_ref(queue.local->tail, &tail);
3252 //if (init_queuecell_ref(queue.local->tail, &tail))
3253 //{
3254 // if_failure(transaction_lock_write(thread, &tail.local->cell.lock, tail.shared, CONTAINER_CELL, &lock_taken2),
3255 // {
3256 // transaction_unlock_local(thread, &queue.local->cell.lock, queue.shared, status, lock_taken1);
3257 // return status;
3258 // }
3259 // );
3260 // // now tail and head are both locked
3261 //}
3262 // new implementation locks the whole queue by locking the ShmQueue
3263 ShmCell *new_cell = (ShmCell *)shm_queue_new_cell(thread, queue, &rslt->shared);
3264 if ( ! consume )
3265 shm_pointer_acq(value);
3266		new_cell->data = value; // the cell is still private to us, so we can store into new_cell->data directly
3267 bool rslt_consumed = false;
3268
3269 if (tail.local)
3270 {
3271 assert( ! shm_pointer_is_valid(tail.local->next));
3272 shm_queue_changes_push(thread, queue, tail);
3273 shm_pointer_empty(thread, &tail.local->new_next);
3274 tail.local->new_next = rslt->shared; // ensure the tail does not point to unlinked item before becoming visible
3275 rslt_consumed = true;
3276 }
3277
3278 // queue.local->tail = rslt->shared; // now publish the tail item
3279 queue.local->new_tail = rslt->shared;
3280
3281 // InterlockedIncrement(&queue.local->count); // increment last on addition, decrement first on removal
3282 queue.local->count++; // queue is locked
3283 QueueCellRef head;
3284 if (queue.local->new_head != NONE_SHM)
3285 init_queuecell_ref(queue.local->new_head, &head);
3286 else
3287 init_queuecell_ref(queue.local->head, &head);
3288 myassert((tail.local != NULL) == (head.local != NULL));
3289 // if (!shm_pointer_is_valid(queue.local->head))
3290 if ( ! head.local )
3291 {
3292 // add a new head
3293 // assert( ! shm_pointer_is_valid(queue.local->cell.new_head));
3294 shm_pointer_empty(thread, &queue.local->new_head); // empty queue becomes valid here, so we publish it last, when all the structures are ready
3295 queue.local->new_head = rslt->shared; // consume
3296 myassert(rslt_consumed == false);
3297 rslt_consumed = true;
3298 }
3299 //if (tail.local)
3300 // transaction_unlock_local(thread, &tail.local->cell.lock, tail.shared, RESULT_OK, lock_taken2);
3301
3302 // commit
3303 //if (thread_transaction_mode == TRANSACTION_TRANSIENT && lock_taken1)
3304 // shm_queue_commit(thread, queue);
3305 //int status = transaction_unlock_local(thread, &queue.local->cell.lock, queue.shared, RESULT_OK, lock_taken1);
3306 // Should register the changes with queue->changes_shm here
3307 transient_commit(thread);
3308 rslt->local = new_cell;
3309 return RESULT_OK;
3310 };
3311}
3312
3313// Appends a new cell pointing to the existing "value" (value->refcount == 1).
3314// Takes ownership of "value".
3315// rslt is a private pointer to the new cell (its shared counterpart is "new_cell_shm").
3316// Returns the locking result.
3317int
3318shm_queue_append_consume(ThreadContext *thread, QueueRef queue, ShmPointer value, CellRef *rslt)
3319{
3320 return shm_queue_append_do(thread, queue, value, rslt, true);
3321}
3322
3323int
3324shm_queue_append(ThreadContext *thread, QueueRef queue, ShmPointer value, CellRef *rslt)
3325{
3326 return shm_queue_append_do(thread, queue, value, rslt, false);
3327}
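/*
 * Minimal usage sketch for the append API (illustrative only; assumes a valid ThreadContext,
 * a QueueRef obtained elsewhere and an already-created value ShmPointer; error handling is
 * reduced to a single retry check):
 *
 *   CellRef cell;
 *   int status = shm_queue_append(thread, queue, value, &cell);   // keeps our reference to value
 *   if (result_is_repeat(status))
 *       status = shm_queue_append(thread, queue, value, &cell);   // lock contention, try again
 *   // shm_queue_append_consume() would instead take ownership of "value".
 */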
3328
3329// Acquires the first cell in the queue
3330ShmCell *
3331do_shm_queue_get_first(ThreadContext *thread, QueueRef queue, PShmPointer shm_pointer, bool acquire)
3332{
3333 if (shm_pointer) *shm_pointer = EMPTY_SHM;
3334 ShmPointer data = EMPTY_SHM;
3335 if (shm_cell_have_write_lock(thread, &queue.local->base.lock))
3336 {
3337 // cell is locked by me
3338 if (SBOOL(queue.local->new_head))
3339 data = queue.local->new_head;
3340 else
3341 data = queue.local->head;
3342 }
3343 else
3344 data = queue.local->head;
3345
3346 if (!shm_pointer_is_valid(data))
3347 return NULL;
3348 //while (1)
3349 {
3350 // queue->head might be released during our operation
3351 //if (InterlockedCompareExchange(&queue->cell.lock, 1, 0) == 0)
3352 // take_spinlock(&queue->cell.lock, {}); - not needed with memory releaser(GC)
3353 {
3354 if (shm_pointer_is_valid(data))
3355 {
3356 if (shm_pointer)
3357 *shm_pointer = data;
3358 ShmCell *item = (ShmCell *)shm_pointer_to_pointer(data);
3359 if (acquire)
3360 p_atomic_int_inc(&item->refcount);
3361 // release_spinlock(&queue->cell.lock);
3362 return item;
3363 }
3364 else
3365 {
3366 // release_spinlock(&queue->cell.lock);
3367 return NULL;
3368 }
3369 }
3370 }
3371}
3372
3373ShmCell *
3374shm_queue_acq_first(ThreadContext *thread, QueueRef queue, PShmPointer shm_pointer)
3375{
3376 return do_shm_queue_get_first(thread, queue, shm_pointer, true);
3377}
3378
3379ShmCell *
3380shm_queue_get_first(ThreadContext *thread, QueueRef queue, PShmPointer shm_pointer)
3381{
3382 return do_shm_queue_get_first(thread, queue, shm_pointer, false);
3383}
3384
3385ShmCell *
3386do_shm_queue_cell_get_next(ThreadContext *thread, ShmCell *item, PShmPointer next_shm, bool acquire)
3387{
3388 if (next_shm) *next_shm = EMPTY_SHM;
3389 if (!item)
3390 return NULL;
3391 ShmQueueCell *queue_item = (ShmQueueCell *)item;
3392 if (!shm_pointer_is_valid(queue_item->next))
3393 return NULL;
3394 //while (1)
3395 {
3396 //if (InterlockedCompareExchange(&queue_item->cell.lock, 1, 0) == 0)
3397 //take_spinlock(&queue_item->cell.lock, {}); - not needed with memory releaser(GC)
3398 {
3399 // Double check under lock.
3400 // The next item cannot be unlinked without locking this item, so queue_item->next->refcount >= 1.
3401 if (shm_pointer_is_valid(queue_item->next))
3402 {
3403 if (next_shm)
3404 *next_shm = queue_item->next;
3405 ShmCell *next_item = (ShmCell *)shm_pointer_to_pointer(queue_item->next);
3406 if (acquire)
3407 p_atomic_int_inc(&next_item->refcount);
3408 //release_spinlock(&queue_item->cell.lock);
3409 return next_item;
3410 }
3411 else
3412 {
3413 //release_spinlock(&queue_item->cell.lock);
3414 return NULL;
3415 }
3416 }
3417 }
3418}
3419
3420// "item" shall be already acquired, so it won't be released, and we can safely lock it to step.
3421// Returns acquired next cell.
3422ShmCell *
3423shm_queue_cell_acq_next(ThreadContext *thread, ShmCell *item, PShmPointer next_shm)
3424{
3425 return do_shm_queue_cell_get_next(thread, item, next_shm, true);
3426}
3427
3428ShmCell *
3429shm_queue_cell_get_next(ThreadContext *thread, ShmCell *item, PShmPointer next_shm)
3430{
3431 return do_shm_queue_cell_get_next(thread, item, next_shm, false);
3432}
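/*
 * Iteration sketch (assumes the caller releases each acquired cell with shm_pointer_release();
 * this mirrors how the acq/get pairs above are intended to be used, but is not taken verbatim
 * from the original sources):
 *
 *   ShmPointer cur_shm;
 *   ShmCell *cur = shm_queue_acq_first(thread, queue, &cur_shm);
 *   while (cur)
 *   {
 *       // ... inspect cur->data ...
 *       ShmPointer next_shm;
 *       ShmCell *next = shm_queue_cell_acq_next(thread, cur, &next_shm);
 *       shm_pointer_release(thread, cur_shm);  // drop our reference to the current cell
 *       cur = next;
 *       cur_shm = next_shm;
 *   }
 */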
3433
3434// end ShmQueue
3435
3436// ///////////////////////////////////////////////////////////////////////
3437
3438// ShmList
3439
3440ShmList *
3441new_shm_list(ThreadContext *thread, PShmPointer result)
3442{
3443 ShmList *list = get_mem(thread, result, sizeof(ShmList), SHM_LIST_DEBUG_ID);
3444 init_container((ShmContainer *)list, sizeof(ShmList), SHM_TYPE_LIST);
3445 list->top_block = EMPTY_SHM;
3446 list->count = 0;
3447 list->new_count = -1;
3448 list->deleted = 0;
3449 list->new_deleted = -1;
3450 list->type_desc = get_pchar_type_desc(thread);
3451 list->changes_shm = EMPTY_SHM;
3452 list->inited = 0;
3453 return list;
3454}
3455
3456int
3457shm_list_changes_check_inited(ThreadContext *thread, ShmList *list, ShmListChanges **rslt)
3458{
3459 if (!SBOOL(list->changes_shm))
3460 {
3461 ShmPointer shm_pointer;
3462 ShmListChanges *new = (ShmListChanges *)get_mem(thread, &shm_pointer, sizeof(ShmListChanges), SHM_LIST_CHANGES_DEBUG_ID);
3463 new->type = SHM_TYPE_LIST_CHANGES;
3464 new->size = sizeof(ShmListChanges);
3465 new->count = 0;
3466 list->changes_shm = shm_pointer; // publish
3467 if (rslt)
3468 *rslt = new;
3469 return RESULT_OK;
3470 }
3471 else if (rslt)
3472 {
3473 *rslt = shm_pointer_to_pointer(list->changes_shm);
3474 }
3475 return RESULT_OK;
3476}
3477
3478int
3479shm_list_changes_push(ThreadContext *thread, ShmList *list, ShmListBlock *block, ShmInt block_index, ShmInt index)
3480{
3481 if (block->cells[index].changed)
3482 return RESULT_OK;
3483 ShmListChanges *changes = NULL;
3484 shm_list_changes_check_inited(thread, list, &changes);
3485 myassert(changes->count < DELTA_ARRAY_SIZE);
3486 if (changes->count >= DELTA_ARRAY_SIZE)
3487 return RESULT_FAILURE;
3488 int idx = changes->count;
3489 changes->cells[idx].block_index = block_index;
3490 changes->cells[idx].item_index = index;
3491 changes->count++;
3492 block->cells[index].changed = true;
3493 return RESULT_OK;
3494}
3495
3496int
3497shm_list_changes_clear(ThreadContext *thread, ShmList *list)
3498{
3499 ShmListChanges *changes = shm_pointer_to_pointer(list->changes_shm);
3500 if (changes)
3501 {
3502 changes->count = 0;
3503 }
3504 return RESULT_OK;
3505}
3506
3507ShmListCounts
3508shm_list_block_get_count(ShmListBlock *block, bool owned)
3509{
3510 ShmListCounts result = SHM_LIST_INVALID_COUNTS;
3511
3512 if (owned && block->new_count != -1)
3513 result.count = block->new_count;
3514 else
3515 result.count = block->count;
3516
3517 if (owned && block->new_deleted != -1)
3518 result.deleted = block->new_deleted;
3519 else
3520 result.deleted = block->deleted;
3521
3522 return result;
3523}
3524
3525ShmListCounts
3526shm_list_index_get_count(ShmListIndexItem *item, bool owned)
3527{
3528 ShmListCounts rslt = { .count = item->count, .deleted = item->deleted };
3529 if (owned && item->new_count != -1)
3530 rslt.count = item->new_count;
3531 if (owned && item->new_deleted != -1)
3532 rslt.deleted = item->new_deleted;
3533 return rslt;
3534}
3535
3536ShmListCounts
3537shm_list_get_fast_count(ThreadContext *thread, ShmList *list, bool owned)
3538{
3539 ShmListCounts rslt = { .count = p_atomic_int_get(&list->count), .deleted = p_atomic_int_get(&list->deleted) };
3540 if (owned)
3541 {
3542 ShmInt new_count = p_atomic_int_get(&list->new_count);
3543 ShmInt new_deleted = p_atomic_int_get(&list->new_deleted);
3544 if (new_count != -1)
3545 rslt.count = new_count;
3546 if (new_deleted != -1)
3547 rslt.deleted = new_deleted;
3548 }
3549 return rslt;
3550}
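/*
 * The counters use -1 as a "no pending value" sentinel: new_count/new_deleted are -1 outside a
 * transaction and hold the prospective values while the owning thread has the write lock.
 * Small illustration (owned == true means the caller holds the write lock on the list):
 *
 *   count = 5, new_count = -1  ->  shm_list_get_fast_count(...).count == 5 for everyone
 *   count = 5, new_count =  6  ->  6 for the owner, 5 for every other (non-owning) reader
 */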
3551
3552ShmPointer
3553shm_list_cell_get_data(ShmListCell *cell, bool owned)
3554{
3555 if (owned && cell->has_new_data)
3556 return cell->new_data;
3557 else
3558 return cell->data;
3559}
3560void
3561shm_list_block_verify_clean(ShmListBlock *block)
3562{
3563 if (block)
3564 {
3565 myassert(block->new_count == -1);
3566 myassert(block->new_deleted == -1);
3567 myassert(block->refcount > 0);
3568 myassert(block->capacity >= block->count + block->deleted);
3569 myassert(block->capacity >= block->count);
3570 }
3571}
3572
3573int
3574shm_list_get_count(ThreadContext *thread, ListRef list, ShmListCounts *result, bool debug)
3575{
3576 transient_check_clear(thread);
3577 if_failure(
3578 transaction_lock_read(thread, &list.local->base.lock, list.shared, CONTAINER_LIST, NULL),
3579 {
3580 transient_abort(thread);
3581 return status;
3582 }
3583 );
3584 shm_cell_check_read_write_lock(thread, &list.local->base.lock);
3585 bool owned = shm_cell_have_write_lock(thread, &list.local->base.lock);
3586
3587 ShmRefcountedBlock *top_block = shm_pointer_to_pointer(list.local->top_block);
3588 ShmListCounts collected_counts = { 0, 0 };
3589 if (top_block == NULL)
3590 ; // zero
3591 else if (SHM_TYPE_LIST_BLOCK == top_block->type)
3592 {
3593 if (debug)
3594 {
3595 ShmListBlock *block = (ShmListBlock *)top_block;
3596 ShmListCounts cnts = shm_list_block_get_count(block, owned);
3597 collected_counts.count = cnts.count;
3598 collected_counts.deleted = cnts.deleted;
3599 }
3600 }
3601 else if (SHM_TYPE_LIST_INDEX == top_block->type)
3602 {
3603 ShmListIndex *index_block = (ShmListIndex *)top_block;
3604 if (debug)
3605 {
3606 for (int i = 0; i < index_block->index_size; ++i)
3607 {
3608 ShmListCounts cnts = shm_list_index_get_count(&index_block->cells[i], owned);
3609 collected_counts.count += cnts.count;
3610 collected_counts.deleted += cnts.deleted;
3611 }
3612 }
3613 }
3614 else
3615 myassert(false);
3616
3617 ShmListCounts counts = shm_list_get_fast_count(thread, list.local, owned);
3618
3619 if (debug)
3620 {
3621 myassert(counts.count == collected_counts.count);
3622 myassert(counts.deleted == collected_counts.deleted);
3623 }
3624
3625 transient_commit(thread);
3626 *result = counts;
3627 return RESULT_OK;
3628}
3629
3630static void
3631shm_list_validate_indexed_block(ShmListIndexItem *index_item, ShmListBlock *block)
3632{
3633 myassert(block->type == SHM_TYPE_LIST_BLOCK);
3634 myassert(index_item->count == block->count);
3635 myassert(index_item->new_count == block->new_count);
3636 myassert(index_item->deleted == block->deleted);
3637 myassert(index_item->new_deleted == block->new_deleted);
3638}
3639
3640// either list_index or first_block is present. I know C data types are horrible.
3641ShmListBlockRef
3642shm_list_get_block(int block_index, ShmListIndex *list_index, ShmListBlockRef first_block, ShmListIndexItem **index_item)
3643{
3644 ShmListBlockRef result;
3645 if (NULL == list_index)
3646 {
3647 assert(block_index == 0);
3648 return first_block;
3649 }
3650
3651 myassert(block_index >= 0 && block_index < list_index->index_size);
3652 ShmListIndexItem *item = &list_index->cells[block_index];
3653 *index_item = item;
3654 result.shared = item->block;
3655 result.local = shm_pointer_to_pointer(result.shared);
3656
3657 if (result.local)
3658 {
3659 shm_list_validate_indexed_block(item, result.local);
3660 }
3661 return result;
3662}
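/*
 * Call shapes, for reference: with a single block the list has no index, so callers pass
 * list_index == NULL, first_block == {top_block_shm, top_block} and block_index must be 0;
 * once an index exists, first_block is ignored and the block is looked up through
 * list_index->cells[block_index] (index_item is filled only in that case).
 */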
3663
3664typedef struct {
3665 ShmListIndex *index;
3666 ShmListIndexItem *cell;
3667 int cell_ii;
3668 ShmPointer block_shm;
3669} shm_list__index_desc;
3670
3671typedef struct {
3672 ShmInt itemindex;
3673 ShmListBlock *block;
3674} shm_list__block_desc;
3675
3676// Almost never fails
3677// Item is accessed by result.block[result.itemindex]
3678shm_list__block_desc
3679shm_list_get_item_desc(ThreadContext *thread, ListRef list, ShmInt itemindex, bool owned, shm_list__index_desc *index_desc)
3680{
3681 if (index_desc) {
3682 index_desc->index = NULL;
3683 index_desc->cell = NULL;
3684 index_desc->cell_ii = - 1;
3685 index_desc->block_shm = EMPTY_SHM;
3686 }
3687
3688 shm_cell_check_read_write_lock(thread, &list.local->base.lock);
3689
3690 ShmListIndex *list_index = NULL;
3691 ShmListBlockRef first_block = { EMPTY_SHM, NULL };
3692 ShmRefcountedBlock *top_block = shm_pointer_to_pointer(list.local->top_block);
3693 int block_count = 0;
3694 if (top_block == NULL)
3695 {
3696 shm_list__block_desc rslt;
3697 rslt.block = NULL;
3698 rslt.itemindex = -1;
3699 return rslt;
3700 }
3701 else if (SHM_TYPE_LIST_BLOCK == top_block->type)
3702 {
3703 first_block.local = (ShmListBlock *)top_block;
3704 first_block.shared = list.local->top_block;
3705 block_count = 1;
3706 }
3707 else if (SHM_TYPE_LIST_INDEX == top_block->type)
3708 {
3709 list_index = (ShmListIndex *)top_block;
3710 block_count = list_index->index_size;
3711 }
3712 else
3713 myassert_msg(false, "Invalid top_block->type");
3714
3715 ShmListCounts counts = shm_list_get_fast_count(thread, list.local, owned);
3716
3717 ShmInt local_ii = itemindex + counts.deleted;
3718 int target_block_index = 0;
3719 if (list_index)
3720 {
3721 for (int idx = 0; idx < block_count; ++idx)
3722 {
3723 ShmListCounts block_counts = shm_list_index_get_count(&list_index->cells[idx], owned);
3724 if (local_ii >= block_counts.count + block_counts.deleted)
3725 {
3726 local_ii = local_ii - (block_counts.count + block_counts.deleted);
3727 target_block_index = idx + 1;
3728 }
3729 else
3730 break;
3731 }
3732 if (target_block_index == list_index->index_size)
3733 {
3734 // index is outside range
3735 shm_list__block_desc rslt;
3736 rslt.block = NULL;
3737 rslt.itemindex = -1;
3738 return rslt;
3739 }
3740
3741 myassert(target_block_index >= 0 && target_block_index < list_index->index_size);
3742 }
3743 else
3744 myassert(target_block_index == 0);
3745
3746 ShmListIndexItem *index_item = NULL;
3747 ShmListBlockRef block = shm_list_get_block(target_block_index, list_index, first_block, &index_item);
3748
3749 if (index_desc) {
3750 index_desc->index = list_index;
3751 index_desc->cell = index_item;
3752 index_desc->cell_ii = target_block_index;
3753 index_desc->block_shm = index_item ? index_item->block : first_block.shared;
3754 }
3755
3756 shm_list__block_desc rslt;
3757 rslt.block = block.local;
3758 rslt.itemindex = local_ii;
3759 return rslt;
3760}
3761
3762int
3763shm_list_get_item_do(ThreadContext *thread, ListRef list, ShmInt index, ShmPointer *result)
3764{
3765 *result = EMPTY_SHM;
3766 if_failure(
3767 transaction_lock_read(thread, &list.local->base.lock, list.shared, CONTAINER_LIST, NULL),
3768 {
3769 transient_abort(thread);
3770 return status;
3771 }
3772 );
3773 shm_cell_check_read_write_lock(thread, &list.local->base.lock);
3774 bool owned = shm_cell_have_write_lock(thread, &list.local->base.lock);
3775
3776 shm_list__block_desc block_desc = shm_list_get_item_desc(thread, list, index, owned, NULL);
3777 myassert(block_desc.block);
3778
3779 ShmListCounts cnts = shm_list_block_get_count(block_desc.block, owned);
3780 myassert(block_desc.itemindex >= cnts.deleted);
3781 if (block_desc.itemindex >= cnts.deleted + cnts.count)
3782 {
3783 transient_abort(thread);
3784 myassert(false);
3785 return RESULT_FAILURE;
3786 }
3787 int ii = block_desc.itemindex;
3788 myassert(ii >= 0 && ii < block_desc.block->capacity);
3789
3790 ShmListCell *cell = NULL;
3791 cell = &block_desc.block->cells[block_desc.itemindex];
3792 if (cell)
3793 {
3794 ShmPointer value = shm_list_cell_get_data(cell, owned);
3795 *result = value;
3796 }
3797 transient_commit(thread);
3798 return RESULT_OK;
3799}
3800
3801int
3802shm_list_get_item(ThreadContext *thread, ListRef list, ShmInt index, ShmPointer *result)
3803{
3804 return shm_list_get_item_do(thread, list, index, result);
3805}
3806
3807int
3808shm_list_acq_item(ThreadContext *thread, ListRef list, ShmInt index, ShmPointer *result)
3809{
3810 int rslt = shm_list_get_item_do(thread, list, index, result);
3811 if (rslt == RESULT_OK)
3812 shm_pointer_acq(*result);
3813 return rslt;
3814}
3815
3816void
3817fputc_ascii(char c, FILE *file)
3818{
3819 fputc(c, file);
3820 fputc('\0', file);
3821}
3822
3823void
3824shm_list_print_to_file(FILE *file, ShmList *list)
3825{
3826 ShmListIndex *list_index = NULL;
3827 ShmListBlock *first_block = NULL;
3828 ShmRefcountedBlock *top_block = shm_pointer_to_pointer(list->top_block);
3829 int block_count = 0;
3830 if (top_block == NULL)
3831 myassert(false);
3832 else if (SHM_TYPE_LIST_BLOCK == top_block->type)
3833 {
3834 first_block = (ShmListBlock *)top_block;
3835 block_count = 1;
3836 }
3837 else if (SHM_TYPE_LIST_INDEX == top_block->type)
3838 {
3839 list_index = (ShmListIndex *)top_block;
3840 block_count = list_index->index_size;
3841 }
3842 else
3843 myassert_msg(false, "Invalid top_block->type");
3844
3845 for (int idx = 0; idx < block_count; ++idx)
3846 {
3847 ShmListBlock *block;
3848 if (list_index)
3849 block = shm_pointer_to_pointer(list_index->cells[idx].block);
3850 else
3851 block = first_block;
3852 myassert(block);
3853
3854 for (int item = 0; item < block->count; ++item)
3855 {
3856 ShmValueHeader *header = shm_pointer_to_pointer(block->cells[item].data);
3857 if (header)
3858 {
3859 if (shm_type_get_type(header->type) == SHM_TYPE_UNICODE)
3860 {
3861 RefString s = shm_ref_string_get(block->cells[item].data);
3862 for (int i = 0; i < s.len; ++i)
3863 {
3864 // little endian
3865 Py_UCS4 c = s.data[i];
3866 fputc(c & 0xFF, file);
3867 fputc((c & 0xFF00) >> 8, file);
3868 }
3869 }
3870 else
3871 {
3872 char *s = shm_value_get_data(header);
3873 int len = shm_value_get_length(header);
3874 for (int i = 0; i < len; ++i)
3875 {
3876 fputc_ascii(s[i], file);
3877 }
3878 }
3879 fputc_ascii('\n', file);
3880 }
3881 else
3882 {
3883 fputc_ascii('n', file);
3884 fputc_ascii('o', file);
3885 fputc_ascii('n', file);
3886 fputc_ascii('e', file);
3887 fputc_ascii('\n', file);
3888 }
3889 }
3890
3891 fputc('-', file);
3892 fputc('\0', file);
3893 fputc('-', file);
3894 fputc('\0', file);
3895 fputc('\n', file);
3896 fputc('\0', file);
3897 }
3898}
3899
3900ShmListBlock *
3901shm_list_new_block(ThreadContext *thread, ShmList *list, ShmPointer *result_shm, int capacity, int debug_id)
3902{
3903 int block_size = isizeof(ShmListBlockHeader) + isizeof(ShmListCell) * capacity;
3904 ShmListBlock* rslt = new_shm_refcounted_block(thread, result_shm, block_size, SHM_TYPE_LIST_BLOCK, debug_id);
3905 rslt->capacity = capacity;
3906 rslt->count = 0;
3907 rslt->new_count = -1;
3908 rslt->deleted = 0;
3909 rslt->new_deleted = -1;
3910 rslt->count_added_after_relocation = 0;
3911 return rslt;
3912}
3913
3914void
3915shm_list_init_cell(ShmListCell *new_cell, ShmPointer container_shm)
3916{
3917 memclear(new_cell, sizeof(ShmListCell));
3918 // ShmContainedBlock
3919 new_cell->type = SHM_TYPE_LIST_CELL;
3920 new_cell->size = sizeof(ShmListCell);
3921 new_cell->container = container_shm;
3922 // ShmCellBase
3923 new_cell->data = EMPTY_SHM;
3924 new_cell->has_new_data = false;
3925 new_cell->new_data = EMPTY_SHM;
3926 // ShmListCell
3927 new_cell->changed = false;
3928}
3929
3930static int shm_list_append_do__old_count;
3931static int shm_list_append_do__new_count;
3932static ShmListBlock *shm_list_append_do__last_block;
3933
3934
3935// 64k / 28 = 2300 max elements in the single block.
3936// 512k / 12 = 24k max blocks in index.
3937// 2.3k * 24k = 55M max elements, 1500 Mb of memory.
3938ShmInt
3939shm_list_max_index_size() {
3940 myassert(max_heap_block_size != 0);
3941 ShmInt rslt = (max_heap_block_size - isizeof(ShmListIndexHeader)) / isizeof(ShmListIndexItem);
3942 myassert(rslt > 0);
3943 return rslt;
3944}
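/*
 * Worked version of the capacity estimate above (the byte figures are the same rough values as
 * in the comment; exact struct sizes depend on the build):
 *
 *   elements per block ~= (64 KiB - sizeof(ShmListBlockHeader)) / sizeof(ShmListCell)
 *                      ~= 65536 / 28 ~= 2300
 *   blocks per index   ~= (max_heap_block_size - sizeof(ShmListIndexHeader)) / sizeof(ShmListIndexItem)
 *   total              ~= 2300 * 24000 ~= 55M elements, i.e. roughly 1.5 GB of cell storage.
 */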
3945
3946// Not adapted for dirty reads.
3947//
3948int
3949shm_list_append_do(ThreadContext *thread, ListRef list, ShmPointer value, bool consume, ShmInt *out_index)
3950{
3951 shm_list_append_do__old_count = -1;
3952 shm_list_append_do__new_count = -1;
3953 shm_list_append_do__last_block = NULL;
3954
3955 shm_list_changes_check_inited(thread, list.local, NULL);
3956 // rslt->shared = EMPTY_SHM;
3957 bool lock_taken = false;
3958 if_failure(
3959 transaction_lock_write(thread, &list.local->base.lock, list.shared, CONTAINER_LIST, &lock_taken),
3960 {
3961 if (consume && SBOOL(value))
3962 shm_pointer_release(thread, value);
3963 transient_abort(thread);
3964 return status;
3965 }
3966 );
3967 shm_cell_check_write_lock(thread, &list.local->base.lock);
3968 // Below we ensure that the block has no previous transaction data if lock_taken = true.
3969 // We might consider checking all the blocks this way.
3970
3971 {
3972 bool owned = true;
3973
3974 ShmListBlock *tail_block = NULL;
3975 ShmPointer tail_block_shm = EMPTY_SHM;
3976 // ShmListIndex *list_index = NULL;
3977 // ShmListIndexItem *index_item = NULL;
3978 // int index_cell_ii = 0;
3979 // int old_count = -1;
3980 // index_item, list_index, old_count local vars should stay in sync
3981 shm_list__index_desc index_desc = {
3982 .index = NULL,
3983 .cell = NULL,
3984 .cell_ii = -1,
3985 .block_shm = EMPTY_SHM,
3986 };
3987
3988 ShmRefcountedBlock *top_block = shm_pointer_to_pointer(list.local->top_block);
3989 if (top_block == NULL)
3990 {
3991 // create initial block
3992 myassert(list.local->inited == false);
3993 ShmPointer new_top_shm = EMPTY_SHM;
3994 tail_block = shm_list_new_block(thread, list.local, &new_top_shm, 8, SHM_LIST_BLOCK_FIRST_DEBUG_ID);
3995 tail_block_shm = new_top_shm;
3996 top_block = (ShmRefcountedBlock *)tail_block;
3997 index_desc.index = NULL;
3998 index_desc.cell = NULL;
3999 index_desc.cell_ii = 0;
4000 index_desc.block_shm = new_top_shm;
4001 shm_pointer_move(thread, &list.local->top_block, &new_top_shm);
4002 list.local->inited = true;
4003 }
4004 else if (SHM_TYPE_LIST_BLOCK == top_block->type)
4005 {
4006 tail_block = (ShmListBlock *)top_block;
4007 tail_block_shm = list.local->top_block;
4008 index_desc.index = NULL;
4009 index_desc.cell = NULL;
4010 index_desc.cell_ii = 0;
4011 index_desc.block_shm = tail_block_shm;
4012 }
4013 else if (SHM_TYPE_LIST_INDEX == top_block->type)
4014 {
4015 index_desc.index = (ShmListIndex *)top_block;
4016 index_desc.cell_ii = index_desc.index->index_size - 1;
4017 index_desc.cell = &index_desc.index->cells[index_desc.cell_ii];
4018 myassert(index_desc.index->index_size > 0);
4019 myassert(isizeof(ShmListIndexHeader) + ((intptr_t)index_desc.cell - (intptr_t)&index_desc.index->cells[0]) <
4020 index_desc.index->size);
4021 tail_block = shm_pointer_to_pointer(index_desc.cell->block);
4022 tail_block_shm = index_desc.cell->block;
4023 index_desc.block_shm = tail_block_shm;
4024 shm_list_validate_indexed_block(index_desc.cell, tail_block);
4025 }
4026 else
4027 {
4028 myassert(false);
4029 if (consume && SBOOL(value))
4030 shm_pointer_release(thread, value);
4031 return RESULT_FAILURE;
4032 }
4033
4034 ShmInt old_capacity = tail_block->capacity;
4035 ShmListCounts old_cnts = shm_list_block_get_count(tail_block, owned);
4036 myassert(old_cnts.count >= 0);
4037 shm_list_append_do__old_count = old_cnts.count;
4038
4039 myassert(tail_block);
4040 myassert(tail_block->type == SHM_TYPE_LIST_BLOCK);
4041 int tail_block_id = mm_block_get_debug_id(tail_block, tail_block_shm);
4042 myassert(SHM_LIST_BLOCK_FIRST_DEBUG_ID == tail_block_id || SHM_LIST_BLOCK_DEBUG_ID == tail_block_id);
4043
4044 if (index_desc.index)
4045 myassert(index_desc.cell);
4046 if (lock_taken)
4047 {
4048			// verify it has no data from previous transactions, see the description at the beginning of the function.
4049 shm_list_block_verify_clean(tail_block);
4050 if (index_desc.index)
4051 {
4052 myassert(index_desc.cell);
4053 myassert(index_desc.cell->new_count == -1);
4054 myassert(index_desc.cell->new_deleted == -1);
4055 }
4056 }
4057
4058 ShmInt max_index_size = shm_list_max_index_size();
4059 ShmInt max_block_capacity = (medium_size_map[4] - isizeof(ShmListBlockHeader)) / isizeof(ShmListCell);
4060
4061 if (old_cnts.count + old_cnts.deleted >= old_capacity)
4062 {
4063 // Block reallocation is constrained by indexes from ShmListChanges.
4064 // We can safely grow the blocks and index, but cannot shift existing offsets.
4065 // Moreover, we are not forced to rollback the reallocation for as long as we keep the indexes,
4066 // thus no "ShmList->new_index" and "ShmListIndex->new_cells".
4067 if (old_capacity >= max_block_capacity)
4068 {
4069 // create a new block and rebuild index
4070 ShmPointer new_tail_shm = EMPTY_SHM;
4071 ShmListBlock *old_tail = tail_block;
4072 ShmListIndex *old_index = index_desc.index;
4073 tail_block = shm_list_new_block(thread, list.local, &new_tail_shm, max_block_capacity / 4, SHM_LIST_BLOCK_DEBUG_ID);
4074
4075 int new_index_capacity = 2; // first block already exists
4076 if (index_desc.index)
4077 new_index_capacity = index_desc.index->index_size + 1;
4078 myassert(new_index_capacity > 0 && new_index_capacity < max_index_size);
4079 // alignment is not a problem here because everything is ShmInt
4080 int index_size = isizeof(ShmListIndexHeader) + isizeof(ShmListIndexItem) * new_index_capacity;
4081 ShmPointer new_index_shm = EMPTY_SHM;
4082 ShmListIndex *new_index_block = new_shm_refcounted_block(thread, &new_index_shm, index_size, SHM_TYPE_LIST_INDEX, SHM_LIST_INDEX_DEBUG_ID);
4083 new_index_block->index_size = new_index_capacity;
4084
4085 int last_idx = new_index_capacity - 1;
4086 // clone the old index into new one
4087 if (old_index)
4088 {
4089 myassert(index_desc.index->index_size > 0);
4090 memcpy(CAST_VL(&new_index_block->cells[0]), CAST_VL(&old_index->cells[0]),
4091 sizeof(ShmListIndexItem) * (puint)old_index->index_size);
4092 // Clear the references from original index, because they are contained in the new index now
4093 for (int i = 0; i < old_index->index_size; i++)
4094 old_index->cells[i].block = EMPTY_SHM;
4095 }
4096 else
4097 {
4098 myassert(list.local->top_block == index_desc.block_shm);
4099 // move top_block into first position
4100 shm_pointer_move(thread, &new_index_block->cells[0].block, &list.local->top_block);
4101 new_index_block->cells[0].count = old_tail->count;
4102 new_index_block->cells[0].new_count = old_tail->new_count;
4103 new_index_block->cells[0].deleted = old_tail->deleted;
4104 new_index_block->cells[0].new_deleted = old_tail->new_deleted;
4105 }
4106
4107 memset(CAST_VL(&new_index_block->cells[last_idx]), 0xF9, sizeof(ShmListIndexItem));
4108
4109 new_index_block->cells[last_idx].block = new_tail_shm; // consume
4110 new_index_block->cells[last_idx].count = tail_block->count;
4111 new_index_block->cells[last_idx].new_count = tail_block->new_count;
4112 new_index_block->cells[last_idx].deleted = tail_block->deleted;
4113 new_index_block->cells[last_idx].new_deleted = tail_block->new_deleted;
4114
4115 // index_item, list_index, old_count local vars should stay in sync
4116 index_desc.cell = &new_index_block->cells[last_idx];
4117 index_desc.index = new_index_block;
4118 index_desc.cell_ii = last_idx;
4119 index_desc.block_shm = new_tail_shm;
4120 old_cnts.count = tail_block->count;
4121 old_cnts.deleted = tail_block->deleted;
4122 myassert(old_cnts.count == 0);
4123 myassert(old_cnts.deleted == 0);
4124
4125 // shm_pointer_move is not thread-safe on weak ordering arch
4126 shm_pointer_move(thread, &list.local->top_block, &new_index_shm);
4127 }
4128 else
4129 {
4130 // Grow-reallocate the block.
4131 ShmInt new_capacity;
4132 if (old_capacity > 64)
4133 new_capacity = old_capacity + old_capacity / 4;
4134 else
4135 new_capacity = old_capacity * 2;
4136
4137 if (new_capacity > max_block_capacity)
4138 new_capacity = max_block_capacity;
4139
4140 ShmPointer new_tail_shm = EMPTY_SHM;
4141 int block_size = isizeof(ShmListBlockHeader) + isizeof(ShmListCell) * new_capacity;
4142 int old_block_size = isizeof(ShmListBlockHeader) + isizeof(ShmListCell) * old_capacity;
4143 myassert(old_block_size < block_size);
4144 ShmListBlock *old_tail_block = tail_block;
4145 myassert(index_desc.block_shm != EMPTY_SHM);
4146 int old_tail_block_id = mm_block_get_debug_id(old_tail_block, index_desc.block_shm);
4147 myassert(SHM_LIST_BLOCK_FIRST_DEBUG_ID == old_tail_block_id || SHM_LIST_BLOCK_DEBUG_ID == old_tail_block_id);
4148
4149 tail_block = shm_list_new_block(thread, list.local, &new_tail_shm, new_capacity, old_tail_block_id);
4150 myassert(tail_block->size == block_size);
4151
4152 shm_list_append_do__last_block = tail_block;
4153
4154 tail_block->count = old_tail_block->count;
4155 tail_block->new_count = old_tail_block->new_count;
4156 tail_block->deleted = old_tail_block->deleted;
4157 tail_block->new_deleted = old_tail_block->new_deleted;
4158
4159 memcpy(CAST_VL(&tail_block->cells[0]), CAST_VL(&old_tail_block->cells[0]), (size_t)(isizeof(ShmListCell) * old_capacity));
4160 memset(CAST_VL(&tail_block->cells[old_capacity]), 0xF7, (size_t)(block_size - old_block_size));
4161
4162 if (index_desc.cell)
4163 shm_pointer_move(thread, &index_desc.cell->block, &new_tail_shm);
4164 else
4165 shm_pointer_move(thread, &list.local->top_block, &new_tail_shm);
4166 }
4167 }
4168
4169 int idx = old_cnts.count + old_cnts.deleted;
4170 myassert(idx < tail_block->capacity);
4171 ShmListCell *new_cell = &tail_block->cells[idx];
4172 shm_list_init_cell(new_cell, list.shared);
4173
4174 // update cell
4175 if (!consume)
4176 shm_pointer_acq(value);
4177 shm_pointer_move(thread, &new_cell->new_data, &value);
4178 new_cell->has_new_data = true;
4179 // update block
4180 tail_block->new_count = old_cnts.count + 1;
4181 // update index
4182 if (index_desc.cell)
4183 index_desc.cell->new_count = old_cnts.count + 1;
4184 // update list
4185 ShmListCounts old_total_counts = shm_list_get_fast_count(thread, list.local, owned);
4186 p_atomic_int_set(&list.local->new_count, old_total_counts.count + 1);
4187
4188 if (out_index)
4189 *out_index = old_total_counts.count;
4190
4191 shm_list_append_do__new_count = old_cnts.count + 1;
4192
4193 shm_list_changes_push(thread, list.local, tail_block, index_desc.cell_ii, idx);
4194 transient_commit(thread);
4195 // rslt->local = new_cell;
4196
4197 return RESULT_OK;
4198 };
4199}
4200
4201int
4202shm_list_append(ThreadContext *thread, ListRef list, ShmPointer value, ShmInt *index)
4203{
4204 return shm_list_append_do(thread, list, value, false, index);
4205}
4206
4207int
4208shm_list_append_consume(ThreadContext *thread, ListRef list, ShmPointer value, ShmInt *index)
4209{
4210 return shm_list_append_do(thread, list, value, true, index);
4211}
4212
4213// result is acquired
4214int
4215shm_list_popleft(ThreadContext *thread, ListRef list, ShmPointer *result)
4216{
4217 shm_pointer_empty(thread, result);
4218
4219 bool lock_taken = false;
4220 if_failure(
4221 transaction_lock_write(thread, &list.local->base.lock, list.shared, CONTAINER_LIST, &lock_taken),
4222 {
4223 transient_abort(thread);
4224 return status;
4225 }
4226 );
4227
4228 shm_cell_check_write_lock(thread, &list.local->base.lock);
4229 // Below we ensure that the block has no previous transaction data if lock_taken = true.
4230 // We might consider checking all the blocks this way.
4231
4232 bool owned = true;
4233
4234 shm_list__index_desc index_desc;
4235 shm_list__block_desc block_desc = shm_list_get_item_desc(thread, list, 0, owned, &index_desc);
4236 if (block_desc.itemindex == -1)
4237 {
4238 transient_commit(thread);
4239 return RESULT_OK;
4240 }
4241 myassert(block_desc.block);
4242
4243 if (index_desc.index)
4244 myassert(index_desc.cell);
4245 if (lock_taken)
4246 {
4247		// verify it has no data from previous transactions, see the description at the beginning of the function.
4248 shm_list_block_verify_clean(block_desc.block);
4249 if (index_desc.index)
4250 {
4251 myassert(index_desc.cell->new_count == -1);
4252 myassert(index_desc.cell->new_deleted == -1);
4253 }
4254 }
4255
4256 ShmListCounts old_cnts = shm_list_block_get_count(block_desc.block, owned);
4257 if (block_desc.itemindex >= old_cnts.count + old_cnts.deleted)
4258 {
4259 myassert(false); // should've been handled by shm_list_get_item_desc
4260 transient_commit(thread);
4261 return RESULT_OK;
4262 }
4263 int ii = block_desc.itemindex;
4264 myassert(ii >= 0 && ii < block_desc.block->capacity);
4265
4266 ShmListCell *cell = &block_desc.block->cells[ii];
4267 // Update cell
4268 // shm_pointer_empty(thread, &cell->new_data);
4269 if (cell->has_new_data)
4270 shm_pointer_move(thread, result, &cell->new_data);
4271 else
4272 {
4273 *result = cell->data;
4274 shm_pointer_acq(*result);
4275 cell->new_data = EMPTY_SHM;
4276 }
4277 cell->has_new_data = true;
4278 // Update block
4279 block_desc.block->new_count = old_cnts.count - 1;
4280 block_desc.block->new_deleted = old_cnts.deleted + 1;
4281 // Update index
4282 if (index_desc.cell)
4283 {
4284 index_desc.cell->new_count = old_cnts.count - 1;
4285 index_desc.cell->new_deleted = old_cnts.deleted + 1;
4286 }
4287 // Update list
4288 ShmListCounts old_total_counts = shm_list_get_fast_count(thread, list.local, owned);
4289 p_atomic_int_set(&list.local->new_count, old_total_counts.count - 1);
4290 p_atomic_int_set(&list.local->new_deleted, old_total_counts.deleted + 1);
4291
4292 shm_list_changes_push(thread, list.local, block_desc.block, index_desc.cell_ii, ii);
4293
4294 transient_commit(thread);
4295 myassert(SBOOL(*result));
4296 return RESULT_OK;
4297}
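/*
 * Minimal FIFO usage sketch combining the list primitives above (illustrative only; assumes a
 * valid ThreadContext, a ListRef obtained elsewhere, and an existing value ShmPointer):
 *
 *   ShmInt index;
 *   if (shm_list_append(thread, list, value, &index) == RESULT_OK)   // our reference to value is kept
 *   {
 *       ShmPointer first = EMPTY_SHM;
 *       if (shm_list_popleft(thread, list, &first) == RESULT_OK && SBOOL(first))
 *       {
 *           // ... use the value ...
 *           shm_pointer_release(thread, first);   // popleft returns an acquired reference
 *       }
 *   }
 */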
4298
4299int
4300shm_list_commit(ThreadContext *thread, ShmList *list)
4301{
4302 shm_cell_check_write_lock(thread, &list->base.lock);
4303
4304 ShmListChanges *changes = shm_pointer_to_pointer(list->changes_shm);
4305 myassert(changes);
4306
4307 ShmListIndex *list_index = NULL;
4308 ShmListBlockRef first_block = {EMPTY_SHM, NULL};
4309 ShmRefcountedBlock *top_block = shm_pointer_to_pointer(list->top_block);
4310 if (top_block == NULL)
4311 myassert(false);
4312 else if (SHM_TYPE_LIST_BLOCK == top_block->type)
4313 {
4314 first_block.local = (ShmListBlock *)top_block;
4315 first_block.shared = list->top_block;
4316 }
4317 else if (SHM_TYPE_LIST_INDEX == top_block->type)
4318 list_index = (ShmListIndex *)top_block;
4319 else
4320 myassert_msg(false, "Invalid top_block->type");
4321
4322	// We should be sorting the changes list by block_index and then processing their items for each block in batches.
4323	// Dirty reads are kinda supported for growing the block size, because new_size is applied after the modification cycle, thus showing the smallest valid block frame.
4324	// Dirty reads are not implemented for removal -- the removal is not implemented at all.
4325 for (int i = 0; i < changes->count; ++i)
4326 {
4327 ShmListChangeItem *item = &changes->cells[i];
4328 ShmListIndexItem *index_item = NULL;
4329 ShmListBlockRef block = shm_list_get_block(item->block_index, list_index, first_block, &index_item);
4330
4331 myassert(block.local);
4332 ShmListCounts counts = shm_list_block_get_count(block.local, true);
4333
4334 if (index_item)
4335 {
4336 // done by shm_list_get_block
4337 // myassert(index_item->count == old_count);
4338 // myassert(index_item->new_count == new_count);
4339 }
4340 else
4341 {
4342 ShmListCounts total_counts = shm_list_get_fast_count(thread, list, true);
4343 myassert(total_counts.count == counts.count);
4344 myassert(total_counts.deleted == counts.deleted);
4345 }
4346 // myassert(counts.count != EVIL_MARK);
4347
4348 // this calculation holds true for as long as new elements are appended to the tail and old elements are removed from the head.
4349 // "deleted" count can only grow by removing items from "count". Thus new_deleted + new_count >= deleted + count.
4350 ShmInt max_actual_capacity = counts.count + counts.deleted;
4351 myassert(max_actual_capacity <= block.local->capacity);
4352
4353 ShmInt ii = item->item_index; // this index is from start of the block, not from the first actual data item.
4354 myassert(ii >= 0 && ii < max_actual_capacity);
4355
4356 if (block.local->cells[ii].has_new_data)
4357 {
4358 block.local->cells[ii].has_new_data = false; // leak is better than wild pointer
4359 if (ii >= counts.deleted)
4360 myassert(block.local->cells[ii].new_data != EMPTY_SHM);
4361 else
4362 myassert(block.local->cells[ii].new_data == EMPTY_SHM);
4363 shm_pointer_move_atomic(thread, &block.local->cells[ii].data, &block.local->cells[ii].new_data);
4364 myassert(block.local->cells[ii].changed);
4365 block.local->cells[ii].changed = false;
4366 }
4367 }
4368 // second pass for setting the block->count, thus only valid part of block would be visible.
4369 bool rebuild_index = false;
4370 for (int i = 0; i < changes->count; ++i)
4371 {
4372 ShmListChangeItem *item = &changes->cells[i];
4373 ShmListIndexItem *index_item = NULL;
4374 ShmListBlockRef block = shm_list_get_block(item->block_index, list_index, first_block, &index_item);
4375 myassert(block.local);
4376 ShmInt new_count = block.local->new_count;
4377 ShmInt new_deleted = block.local->new_deleted;
4378 if (new_count != -1)
4379 {
4380 // not sure about the order
4381 p_atomic_int_set(&block.local->count, new_count);
4382 p_atomic_int_set(&block.local->new_count, -1);
4383 if (index_item)
4384 {
4385 myassert(index_item->new_count == new_count);
4386 p_atomic_int_set(&index_item->count, new_count);
4387 p_atomic_int_set(&index_item->new_count, -1);
4388 }
4389
4390 block.local->count_added_after_relocation += new_count - block.local->count;
4391
4392 if (new_count == 0)
4393 rebuild_index = true;
4394 }
4395 if (new_deleted != -1)
4396 {
4397 p_atomic_int_set(&block.local->deleted, new_deleted);
4398 p_atomic_int_set(&block.local->new_deleted, -1);
4399 if (index_item)
4400 {
4401 myassert(index_item->new_deleted == new_deleted);
4402 p_atomic_int_set(&index_item->deleted, new_deleted);
4403 p_atomic_int_set(&index_item->new_deleted, -1);
4404 }
4405 }
4406 }
4407
4408 if (rebuild_index && list_index != NULL) {
4409 // Some blocks are empty now, delete them.
4410 // Do not delete the last block.
4411 myassert(list_index->index_size > 1);
4412 int first_valid = -1;
4413 for (int i = 0; i < list_index->index_size; i++)
4414 {
4415 myassert(list_index->cells[i].new_count == -1); // transaction committed
4416 myassert(list_index->cells[i].new_deleted == -1); // transaction committed
4417 if (first_valid != -1)
4418 myassert(list_index->cells[i].count != 0); // can't have empty blocks in the middle
4419 if (list_index->cells[i].count != 0)
4420 first_valid = i;
4421 }
4422 // Do not delete the last block.
4423 if (first_valid == -1)
4424 first_valid = list_index->index_size - 1;
4425 int final_index_size = list_index->index_size - first_valid;
4426 if (final_index_size < 2) {
4427 // single block left
4428 int last_index = list_index->index_size - 1;
4429 shm_pointer_move(thread, &list->top_block, &list_index->cells[last_index].block);
4430 list->count = list_index->cells[last_index].count;
4431 list->new_count = list_index->cells[last_index].new_count;
4432 list->deleted = list_index->cells[last_index].deleted;
4433 list->new_deleted = list_index->cells[last_index].new_deleted;
4434 for (int i = 0; i < last_index; i++)
4435 shm_pointer_empty(thread, &list_index->cells[i].block);
4436 }
4437 else
4438 {
4439 myassert(final_index_size > 0 && final_index_size < shm_list_max_index_size());
4440
4441 ShmListCounts old_totals = shm_list_get_fast_count(thread, list, true);
4442 ShmInt delta_deleted = 0;
4443 ShmListCounts new_counts = { 0, 0 };
4444
4445 for (int i = 0; i < first_valid; i++)
4446 {
4447 ShmListCounts index_counts = shm_list_index_get_count(&list_index->cells[i], true);
4448 myassert(index_counts.deleted >= 0);
4449 myassert(index_counts.count == 0);
4450 delta_deleted += index_counts.deleted;
4451 }
4452
4453 for (int i = first_valid; i < list_index->index_size; i++)
4454 {
4455 ShmListCounts index_counts = shm_list_index_get_count(&list_index->cells[i], true);
4456 myassert(index_counts.deleted >= 0);
4457 myassert(index_counts.count != 0);
4458 new_counts.deleted += index_counts.deleted;
4459 new_counts.count += index_counts.count;
4460 }
4461
4462 myassert(delta_deleted + new_counts.deleted == old_totals.deleted);
4463 myassert(new_counts.count == old_totals.count);
4464
4465 // alignment is not a problem here because everything is ShmInt
4466 int index_size = isizeof(ShmListIndexHeader) + isizeof(ShmListIndexItem) * final_index_size;
4467 ShmPointer new_index_shm = EMPTY_SHM;
4468 ShmListIndex *new_index_block = new_shm_refcounted_block(thread, &new_index_shm, index_size, SHM_TYPE_LIST_INDEX, SHM_LIST_INDEX_DEBUG_ID);
4469 new_index_block->index_size = final_index_size;
4470
4471 memcpy(CAST_VL(&new_index_block->cells[0]), CAST_VL(&list_index->cells[first_valid]),
4472 sizeof(ShmListIndexItem) * (puint)final_index_size);
4473 // clear copied blocks, release the deleted.
4474 for (int i = 0; i < first_valid; i++)
4475 shm_pointer_empty(thread, &list_index->cells[i].block);
4476 for (int i = first_valid; i < list_index->index_size - 1; i++)
4477 p_atomic_shm_pointer_set(&list_index->cells[i].block, EMPTY_SHM);
4478
4479 p_atomic_shm_pointer_set(&list->top_block, new_index_shm);
4480 list->new_deleted = new_counts.deleted;
4481 }
4482 }
4483
4484 if (list->new_count != -1)
4485 {
4486 p_atomic_int_set(&list->count, list->new_count);
4487 p_atomic_int_set(&list->new_count, -1);
4488 }
4489 if (list->new_deleted != -1)
4490 {
4491 p_atomic_int_set(&list->deleted, list->new_deleted);
4492 p_atomic_int_set(&list->new_deleted, -1);
4493 }
4494
4495 return RESULT_OK;
4496}
4497
4498int
4499shm_list_rollback(ThreadContext *thread, ShmList *list)
4500{
4501 shm_cell_check_write_lock(thread, &list->base.lock);
4502
4503 ShmListChanges *changes = shm_pointer_to_pointer(list->changes_shm);
4504 myassert(changes);
4505
4506 ShmListIndex *list_index = NULL;
4507 ShmListBlockRef first_block = { EMPTY_SHM, NULL };
4508 ShmRefcountedBlock *top_block = shm_pointer_to_pointer(list->top_block);
4509 if (top_block == NULL)
4510 myassert(false);
4511 else if (SHM_TYPE_LIST_BLOCK == top_block->type)
4512 {
4513 first_block.local = (ShmListBlock *)top_block;
4514 first_block.shared = list->top_block;
4515 }
4516 else if (SHM_TYPE_LIST_INDEX == top_block->type)
4517 list_index = (ShmListIndex *)top_block;
4518 else
4519 myassert_msg(false, "Invalid top_block->type");
4520
4521 for (int i = 0; i < changes->count; ++i)
4522 {
4523 ShmListChangeItem *item = &changes->cells[i];
4524 ShmListIndexItem *index_item = NULL;
4525 ShmListBlockRef block = shm_list_get_block(item->block_index, list_index, first_block, &index_item);
4526 myassert(block.local);
4527 ShmListCounts counts = shm_list_block_get_count(block.local, true);
4528 if (index_item)
4529 {
4530 // done by shm_list_get_block
4531 // myassert(index_item->count == old_count);
4532 // myassert(index_item->new_count == new_count);
4533 }
4534 else
4535 {
4536 ShmListCounts total_counts = shm_list_get_fast_count(thread, list, true);
4537 myassert(total_counts.count == counts.count);
4538 myassert(total_counts.deleted == counts.deleted);
4539 }
4540 // myassert(counts.count != EVIL_MARK);
4541
4542 // this calculation holds true for as long as new elements are appended to the tail and old elements are removed from the head.
4543 // "deleted" count can only grow by removing items from "count". Thus new_deleted + new_count >= deleted + count.
4544 ShmInt max_actual_capacity = counts.count + counts.deleted;
4545 myassert(max_actual_capacity <= block.local->capacity);
4546
4547 ShmInt ii = item->item_index;
4548 myassert(ii >= 0 && ii < max_actual_capacity);
4549
4550 if (block.local->cells[ii].has_new_data)
4551 {
4552 block.local->cells[ii].has_new_data = false; // leak is better than wild pointer
4553 if (ii >= counts.deleted)
4554 myassert(block.local->cells[ii].new_data != EMPTY_SHM);
4555 else
4556 myassert(block.local->cells[ii].new_data == EMPTY_SHM);
4557 shm_pointer_empty_atomic(thread, &block.local->cells[ii].new_data);
4558
4559 myassert(block.local->cells[ii].changed);
4560 block.local->cells[ii].changed = false;
4561 }
4562 }
4563 for (int i = 0; i < changes->count; ++i)
4564 {
4565 ShmListChangeItem *item = &changes->cells[i];
4566 ShmListIndexItem *index_item = NULL;
4567 ShmListBlockRef block = shm_list_get_block(item->block_index, list_index, first_block, &index_item);
4568 myassert(block.local);
4569 ShmInt new_count = block.local->new_count;
4570 ShmInt new_deleted = block.local->new_deleted;
4571
4572 if (new_count != -1)
4573 {
4574 p_atomic_int_set(&block.local->new_count, -1);
4575 if (index_item)
4576 {
4577 myassert(index_item->new_count == new_count);
4578 p_atomic_int_set(&index_item->new_count, -1);
4579 }
4580 }
4581
4582 if (new_deleted != -1)
4583 {
4584 p_atomic_int_set(&block.local->new_deleted, -1);
4585 if (index_item)
4586 {
4587 myassert(index_item->new_deleted == new_deleted);
4588 p_atomic_int_set(&index_item->new_deleted, -1);
4589 }
4590 }
4591 }
4592
4593 if (list->new_count != -1)
4594 {
4595 p_atomic_int_set(&list->new_count, -1);
4596 }
4597 if (list->new_deleted != -1)
4598 {
4599 p_atomic_int_set(&list->new_deleted, -1);
4600 }
4601 return RESULT_OK;
4602}
4603
4604int
4605shm_list_unlock(ThreadContext *thread, ShmList *list, ShmInt type)
4606{
4607 if (TRANSACTION_ELEMENT_WRITE == type)
4608 p_atomic_shm_pointer_set(&list->base.lock.transaction_data, EMPTY_SHM);
4609 shm_list_changes_clear(thread, list);
4610 _shm_cell_unlock(thread, &list->base.lock, type);
4611 return RESULT_OK;
4612}
4613// end of ShmList
4614
4615
4616// Start of ShmDict
4617void
4618shm_dict_init_key(ShmDictKey *key, const char *s)
4619{
4620 key->key = (char *)(intptr_t)s;
4621 key->key_ucs = NULL;
4622 key->key_owned = false;
4623 key->keysize = (int)strlen(s);
4624 key->hash = hash_string_ascii(key->key, key->keysize);
4625}
4626
4627void
4628shm_dict_init_key_consume(ShmDictKey *key, const char *s)
4629{
4630 key->key = (char *)(intptr_t)s;
4631 key->key_ucs = NULL;
4632 key->key_owned = true;
4633 key->keysize = (int)strlen(s);
4634 key->hash = hash_string_ascii(key->key, key->keysize);
4635}
4636
4637void
4638shm_dict_free_key(ShmDictKey *key)
4639{
4640 if (key->key_owned)
4641 {
4642 char *tmp = key->key;
4643 key->key = NULL;
4644 free(tmp);
4645 }
4646}
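/*
 * Key-handling sketch (illustrative; shm_dict_init_key() borrows the string, the *_consume
 * variant takes ownership of a malloc'ed copy and shm_dict_free_key() releases it):
 *
 *   ShmDictKey key;
 *   shm_dict_init_key(&key, "some-key");   // key.hash / key.keysize are derived from the string
 *   // ... use the key with the dict functions below ...
 *   shm_dict_free_key(&key);               // no-op here because the key is not owned
 */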
4647
4648// ShmDict
4649ShmDict *
4650new_shm_dict(ThreadContext *thread, PShmPointer shm_pointer)
4651{
4652 // if (shm_pointer) *shm_pointer = EMPTY_SHM; - done by get_mem
4653 ShmDict *dict = (ShmDict*) get_mem(thread, shm_pointer, sizeof(ShmDict), SHM_DICT_DEBUG_ID);
4654 init_cell((ShmCell*) dict, sizeof(ShmDict), SHM_TYPE_DICT);
4655 return dict;
4656}
4657
4658ShmPointer
4659shm_dict_element_owned_get_value(ShmDictElement *element)
4660{
4661 if (element->has_new_data)
4662 return element->new_data;
4663 else
4664 return element->data;
4665}
4666
4667ShmPointer
4668shm_dict_element_get_value(ShmDictElement *element, bool owned)
4669{
4670 if ( ! element)
4671 return EMPTY_SHM;
4672 if (owned)
4673 return shm_dict_element_owned_get_value(element);
4674 else
4675 return element->data;
4676}
4677
4678int
4679shm_dict_push_delta(ThreadContext *thread, PShmPointer delta, int type, ShmPointer element, ShmPointer array)
4680{
4681 ShmDictDeltaArray *delta_array = shm_pointer_to_pointer(*delta);
4682 if ( ! delta_array)
4683 {
4684 delta_array = get_mem(thread, delta, sizeof(ShmDictDeltaElement) * DELTA_ARRAY_SIZE, SHM_DICT_DELTA_ARRAY_DEBUG_ID);
4685 if ( ! delta_array) return RESULT_FAILURE;
4686 delta_array->size = DELTA_ARRAY_SIZE;
4687 delta_array->count = 0;
4688 }
4689
4690 /*char buffer[20];
4691 OutputDebugStringA("delta push ");
4692 _itoa_s(delta_array->count, buffer, 20, 10);
4693 OutputDebugStringA(buffer);
4694 OutputDebugStringA(" ");
4695 _itoa_s(delta_array->size, buffer, 20, 10);
4696 OutputDebugStringA(buffer);
4697 OutputDebugStringA("\n");*/
4698 if (!(delta_array->count < delta_array->size))
4699 DebugBreak();
4700 // myassert_msg(delta_array->count < delta_array->size, "delta array overflow");
4701 if ( ! (delta_array->count < delta_array->size) ) return RESULT_FAILURE;
4702
4703 int idx = delta_array->count;
4704 delta_array->deltas[idx].type = type;
4705 delta_array->deltas[idx].value = element;
4706 delta_array->deltas[idx].parent_element = array;
4707 delta_array->count++;
4708
4709 return RESULT_OK;
4710}
4711
4712typedef vl2 struct {
4713 bool owned;
4714 uint32_t hash;
4715 const char *key;
4716 int keysize;
4717
4718 ShmDictElement *first_empty;
4719 ShmPointer first_empty_shm;
4720 int first_empty_level;
4721 ShmPointer first_empty_array_shm;
4722
4723 ShmDictElement *deepest;
4724 ShmPointer deepest_shm;
4725 int deepest_level;
4726 ShmPointer deepest_array_shm;
4727} DictFindData;
4728
4729void
4730dict_init_find_data(DictFindData *data)
4731{
4732 memset(data, 0, sizeof(DictFindData));
4733 data->first_empty_shm = EMPTY_SHM;
4734 data->first_empty_array_shm = EMPTY_SHM;
4735 data->deepest_shm = EMPTY_SHM;
4736 data->deepest_array_shm = EMPTY_SHM;
4737}
4738
4739void
4740dict_init_node(ShmDictElement *element, int size)
4741{
4742 for (int i = 0; i < size; ++i)
4743 {
4744 element[i].data = EMPTY_SHM;
4745 element[i].has_new_data = false;
4746 element[i].new_data = EMPTY_SHM;
4747 element[i].key = EMPTY_SHM;
4748 element[i].key_debug = NULL;
4749 element[i].nested = EMPTY_SHM;
4750 }
4751}
4752
4753// "deepest" is the last node visited on the hash path, whether spare or in use.
4754// Returns a non-NULL element only when some level holds an in-use element with an exact hash and key match.
4755
4756ShmDictElement *
4757find_nested(ShmDictElement *array, ShmPointer array_shm, uint32_t mask, uint32_t level,
4758 DictFindData *find_data, PShmPointer result_shm)
4759{
4760 if (array == NULL) return NULL;
4761 assert(level < (sizeof(uint32_t) * 8 - DICT_LEVEL_BITS - 1) / DICT_LEVEL_BITS);
4762 uint32_t level_mult = 1 << (level*DICT_LEVEL_BITS);
4763 uint32_t prev_levels_mask = level_mult - 1;
4764 uint32_t all_levels_mask = (level_mult << DICT_LEVEL_BITS) - 1;
4765 uint32_t level_mask = all_levels_mask & (~prev_levels_mask);
4766 // e.g. for level = 2 and DICT_LEVEL_BITS = 2: level_mult = 0x10; prev_levels_mask = 0xF; all_levels_mask = 0x3F; level_mask = 0x30;
4767
4768	// for (int idx = 0; idx < DICT_LEVEL_SIZE; ++idx) {
4769	//     if (level_mult*idx == (hash & level_mask)) {
4770 uint32_t idx = (find_data->hash & level_mask) / level_mult;
4771 // fprintf(stderr, "level %i idx %i\n", level, idx);
4772 myassert_msg(0 <= idx && idx < DICT_LEVEL_SIZE, "idx >= 0 && idx < DICT_LEVEL_SIZE");
4773 find_data->deepest = &array[idx];
4774 find_data->deepest_level = level;
4775 find_data->deepest_shm = shm_pointer_shift(array_shm, ((char*)&array[idx]) - ((char*)array));
4776 find_data->deepest_array_shm = array_shm;
4777
4778 bool filled = false;
4779 bool found = false;
4780 ShmPointer data = shm_dict_element_get_value(&array[idx], find_data->owned);
4781 if (data != EMPTY_SHM && data != DEBUG_SHM)
4782 {
4783 filled = true;
4784 if (array[idx].hash == find_data->hash)
4785 {
4786 const char *data_key = shm_pointer_to_pointer(array[idx].key);
4787 if (find_data->keysize == array[idx].keysize && strncmp(data_key, find_data->key, find_data->keysize) == 0)
4788 {
4789 found = true;
4790 }
4791 }
4792 }
4793 if (found)
4794 {
4795 *result_shm = find_data->deepest_shm;
4796 return find_data->deepest;
4797 }
4798 else
4799 {
4800 if ( ! filled && find_data->first_empty == NULL)
4801 {
4802 find_data->first_empty = &array[idx];
4803 find_data->first_empty_level = level;
4804 find_data->first_empty_shm = shm_pointer_shift(array_shm, ((char*)&array[idx]) - ((char*)array));
4805 find_data->first_empty_array_shm = array_shm;
4806 }
4807 }
4808 ShmDictElement *nested = (ShmDictElement *) shm_pointer_to_pointer(array[idx].nested);
4809 return find_nested(nested, array[idx].nested, mask, level + 1, find_data, result_shm);
4810}
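/*
 * Worked example of the per-level bucketing used above, assuming DICT_LEVEL_BITS == 2
 * (the same assumption as the mask example inside find_nested; the real value may differ):
 *
 *   hash = 0x2D = 0b101101
 *   level 0: idx = (hash >> 0) & 0x3 = 1
 *   level 1: idx = (hash >> 2) & 0x3 = 3
 *   level 2: idx = (hash >> 4) & 0x3 = 2
 *
 * so a lookup walks array[1] -> nested[3] -> nested[2] until it hits an exact key match or an
 * empty slot.
 */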
4811
4812ShmDictElement *
4813shm_dict_do_find(ShmDictElement *root, ShmPointer root_shm, DictFindData *find_data, PShmPointer result_shm)
4814{
4815 uint32_t mask = ~((1 << DICT_LEVEL_BITS) - 1); // FFFFFFF8 for DICT_LEVEL_SIZE = 8
4816 assert(false);
4817 return find_nested(root, root_shm, mask, 0, find_data, result_shm);
4818}
4819
4820int
4821shm_dict_find_append_element(ThreadContext *thread, DictRef dict, ShmDictKey *key, bool can_append,
4822 ShmDictElement **result, PShmPointer result_shm, PShmPointer array_shm)
4823{
4824 bool lock_taken = false;
4825 // start_transaction(thread, TRANSACTION_TRANSIENT, LOCKING_WRITE, false, NULL);
4826 if_failure(transaction_lock_write(thread, &dict.local->lock, dict.shared, CONTAINER_DICT_DELTA, &lock_taken),
4827 {
4828 return status;
4829 });
4830 ShmPointer root_shm = dict.local->data;
4831 ShmDictElement *root = (ShmDictElement *) shm_pointer_to_pointer(dict.local->data);
4832 if ( ! root)
4833 {
4834 if (!can_append)
4835 {
4836 *result = NULL;
4837 *result_shm = EMPTY_SHM;
4838 *array_shm = EMPTY_SHM;
4839 return RESULT_OK;
4840 }
4841 root = (ShmDictElement *) get_mem(thread, &root_shm, sizeof(ShmDictElement) * DICT_LEVEL_SIZE, SHM_DICT_ELEMENT_ARRAY_DEBUG_ID);
4842 if ( ! root) {
4843 // transaction_unlock_local(thread, &dict.local->lock, dict.shared, RESULT_FAILURE, lock_taken);
4844 // abort_transaction(thread, NULL);
4845 return RESULT_FAILURE;
4846 }
4847 myassert(SBOOL(root_shm));
4848 dict_init_node(root, DICT_LEVEL_SIZE);
4849 dict.local->data = root_shm;
4850 // Let's just allocate it on demand and never deallocate
4851 // shm_dict_push_delta(dict.local, DICT_DELTA_NEW_ROOT, root_shm, dict.shared);
4852 }
4853
4854 uint32_t mask = ((1 << DICT_LEVEL_BITS) - 1); // 7 for DICT_LEVEL_SIZE = 8
4855 DictFindData find_data;
4856 dict_init_find_data(&find_data);
4857 find_data.owned = true;
4858 find_data.hash = key->hash;
4859 find_data.key = key->key;
4860 find_data.keysize = key->keysize;
4861 *result = find_nested(root, root_shm, mask, 0, &find_data, result_shm);
4862 if ( ! *result)
4863 {
4864 if (!can_append)
4865 {
4866 *result = NULL;
4867 *result_shm = EMPTY_SHM;
4868 *array_shm = EMPTY_SHM;
4869 return RESULT_OK;
4870 }
4871 myassert_msg(find_data.deepest && find_data.deepest_level >= 0, "find_data.deepest && find_data.deepest_level >= 0");
4872 myassert_msg(find_data.deepest_array_shm != EMPTY_SHM, "find_data.deepest_array_shm != EMPTY_SHM");
4873 // ShmPointer data = shm_dict_element_owned_get_value(find_data.deepest);
4874 // element is empty when both new_data and data are empty, meaning a committed removal of the value.
4875 // if (data != EMPTY_SHM)
4876 if (find_data.first_empty == NULL)
4877 {
4878 // deepest is already used, obviously.
4879 // Still no way to reclaim other empty nodes except deepest.
4880 ShmPointer node_shm;
4881 ShmDictElement *node = get_mem(thread, &node_shm, sizeof(ShmDictElement) * DICT_LEVEL_SIZE, SHM_DICT_ELEMENT_ARRAY_DEBUG_ID);
4882 if ( ! node) {
4883 // transaction_unlock_local(thread, &dict.local->lock, dict.shared, RESULT_FAILURE, lock_taken);
4884 // abort_transaction(thread, NULL);
4885 return RESULT_FAILURE;
4886 }
4887 dict_init_node(node, DICT_LEVEL_SIZE);
4888 myassert(shm_dict_push_delta(thread, &dict.local->delta, DICT_DELTA_NEW_NODE, node_shm, find_data.deepest_shm));
4889 find_data.deepest->nested = node_shm;
4890
4891 // find_data.deepest_level ++;
4892 *array_shm = node_shm;
4893 int index = (key->hash >> (DICT_LEVEL_BITS*(find_data.deepest_level+1))) & mask;
4894 myassert_msg(index >= 0 && index < DICT_LEVEL_SIZE, "index >= 0 && index < DICT_LEVEL_SIZE");
4895 // fprintf(stderr, "created new node at level %i for insertion at idx %i hash %08x\n", find_data.deepest_level, index, key->hash);
4896 *result = &node[index];
4897 *result_shm = shm_pointer_shift(node_shm, ((char*)&node[index]) - ((char*)node));
4898 }
4899 else
4900 {
4901 *result = find_data.first_empty;
4902 *result_shm = find_data.first_empty_shm;
4903 *array_shm = find_data.first_empty_array_shm;
4904 }
4905 // claim the item
4906 if (SBOOL((*result)->key))
4907 free_mem(thread, (*result)->key, -1);
		char *s = get_mem(thread, &(*result)->key, key->keysize, SHM_DICT_ELEMENT_KEY_DEBUG_ID);
		if ( ! s)
			return RESULT_FAILURE;
		memcpy(s, key->key, (size_t)key->keysize);
4910 (*result)->keysize = key->keysize;
4911 (*result)->hash = key->hash;
4912 (*result)->key_debug = s;
4913 }
4914
4915 // int status = transaction_unlock_local(thread, &dict.local->lock, dict.shared, RESULT_OK, lock_taken);
4916 // commit_transaction(thread, NULL);
4917 if ( ! *result)
4918 return RESULT_FAILURE;
4919 else
4920 return RESULT_OK;
4921}
4922
4923int
4924shm_dict_set_consume(ThreadContext *thread, DictRef dict, ShmDictKey *key,
4925 ShmPointer value, ShmDictElement **result, DictProfiling *profiling)
4926{
4927 if (result)
4928 *result = NULL;
4929 bool lock_taken1 = false;
4930 uint64_t tmp;
4931 tmp = rdtsc();
4932 // start_transaction(thread, TRANSACTION_TRANSIENT, LOCKING_WRITE, false, NULL);
4933 if (profiling) profiling->counter1 += rdtsc() - tmp;
4934 tmp = rdtsc();
4935 if_failure(transaction_lock_write(thread, &dict.local->lock, dict.shared, CONTAINER_DICT_DELTA, &lock_taken1),
4936 {
4937 if (SBOOL(value))
4938 shm_pointer_release(thread, value);
4939 transient_abort(thread);
4940 return status;
4941 });
4942 if (profiling) profiling->counter2 += rdtsc() - tmp;
4943 tmp = rdtsc();
4944 {
4945 //ShmDictElement *element = shm_dict_find(thread, dict, hash, key, keysize));
4946 //if ( ! element)
4947 ShmDictElement *element = NULL;
4948 ShmPointer element_shm = EMPTY_SHM;
4949 ShmPointer array_shm = EMPTY_SHM;
4950 {
4951 if_failure(shm_dict_find_append_element(thread, dict, key, value != EMPTY_SHM, &element, &element_shm, &array_shm),
4952 {
4953 if (SBOOL(value))
4954 shm_pointer_release(thread, value);
4955 transient_abort(thread);
4956 return status;
4957 });
4958 }
4959 if (profiling) profiling->counter3 += rdtsc() - tmp;
4960 tmp = rdtsc();
4961 if (element == NULL && value == EMPTY_SHM)
4962 {
4963 transient_commit(thread);
4964 return RESULT_OK;
4965 }
4966 myassert_msg(element != NULL, "shm_dict_set_consume: element");
4967 tmp = rdtsc();
4968 if (shm_dict_element_get_value(element, true) != value)
4969 {
4970 if (profiling) profiling->counter4 += rdtsc() - tmp;
4971 tmp = rdtsc();
4972 myassert(shm_dict_push_delta(thread, &dict.local->delta, DICT_DELTA_CHANGED, value, element_shm));
4973
4974 // release valid value only
4975 if (element->has_new_data && shm_pointer_is_valid(element->new_data))
4976 shm_pointer_empty(thread, &element->new_data);
		if (true || !element->has_new_data || element->new_data != element->data) // the "true ||" deliberately disables the else branch below
4978 {
4979 element->has_new_data = true;
4980 element->new_data = value; // consume
4981 }
4982 else
4983 {
4984 // we can live without this branch
4985 element->has_new_data = false;
4986 element->new_data = EMPTY_SHM;
4987 shm_pointer_release(thread, value);
4988 }
4989 if (profiling) profiling->counter5 += rdtsc() - tmp;
4990 }
4991 else
4992 {
4993 tmp = rdtsc();
4994 shm_pointer_release(thread, value); // just release
4995 if (profiling) profiling->counter6 += rdtsc() - tmp;
4996 }
4997
4998 tmp = rdtsc();
4999 if (result)
5000 *result = element;
5001 // transaction_unlock_local(thread, &dict.local->lock, dict.shared, RESULT_OK, lock_taken1);
5002 transient_commit(thread);
5003 if (profiling) profiling->counter7 += rdtsc() - tmp;
5004 return RESULT_OK;
5005 };
5006}
5007
5008int
5009shm_dict_set_empty(ThreadContext *thread, DictRef dict, ShmDictKey *key, DictProfiling *profiling)
5010{
5011 return shm_dict_set_consume(thread, dict, key, EMPTY_SHM, NULL, profiling);
5012}
5013
5014int
5015shm_dict_get(ThreadContext *thread, DictRef dict, ShmDictKey *key,
5016 PShmPointer result_value, ShmDictElement **result_element)
5017{
5018 *result_value = EMPTY_SHM;
5019 *result_element = NULL;
5020 bool lock_taken = false;
5021 if_failure(transaction_lock_read(thread, &dict.local->lock, dict.shared, CONTAINER_DICT_DELTA, &lock_taken),
5022 {
5023 return status;
5024 });
5025 shm_cell_check_read_write_lock(thread, &dict.local->lock);
5026 ShmPointer root_shm = dict.local->data;
5027 ShmDictElement *root = (ShmDictElement *) shm_pointer_to_pointer(dict.local->data);
5028 if (root)
5029 {
5030 uint32_t mask = ~((1 << DICT_LEVEL_BITS) - 1); // FFFFFFF8 for DICT_LEVEL_SIZE = 8, FFFFFFFC for DICT_LEVEL_SIZE = 4
5031 ShmPointer found_shm = EMPTY_SHM;
5032 DictFindData find_data;
5033 dict_init_find_data(&find_data);
5034 find_data.owned = shm_cell_have_write_lock(thread, &dict.local->lock);
5035 find_data.hash = key->hash;
5036 find_data.key = key->key;
5037 find_data.keysize = key->keysize;
5038 ShmDictElement *found = find_nested(root, root_shm, mask, 0, &find_data, &found_shm);
5039
5040 if (found) {
5041 ShmPointer value = shm_dict_element_get_value(found, find_data.owned);
5042 if (value != EMPTY_SHM && value != DEBUG_SHM)
5043 {
5044 *result_value = value;
5045 *result_element = found;
5046 }
5047 }
5048 }
5049 // transaction_unlock_local(thread, &dict.local->lock, dict.shared, RESULT_OK, lock_taken);
5050 return RESULT_OK;
5051}
5052
5053void
5054shm_dict_destroy(ThreadContext *thread, ShmDict *dict, ShmPointer dict_shm)
5055{
5056 // TBD
5057}
5058
5059int
5060dict_nested_count(ShmDictElement *root, bool owned)
5061{
5062 if (root == NULL)
5063 return 0;
5064 int rslt = 0;
5065 for (int i = 0; i < DICT_LEVEL_SIZE; ++i)
5066 {
5067 if (shm_dict_element_get_value(&root[i], owned) != EMPTY_SHM)
5068 rslt++;
5069 ShmDictElement *nested = (ShmDictElement *)shm_pointer_to_pointer(root[i].nested);
5070 if (nested)
5071 rslt += dict_nested_count(nested, owned);
5072 }
5073 return rslt;
5074}
5075
5076int
5077shm_dict_get_count_debug(ThreadContext *thread, DictRef *dict)
5078{
5079 int rslt = 0;
5080 ShmDictElement *root = (ShmDictElement *)shm_pointer_to_pointer_no_side(dict->local->data);
5081 if (root)
5082 {
5083 rslt += dict_nested_count(root, shm_cell_have_write_lock(thread, &dict->local->lock));
5084 return rslt;
5085 }
5086 else
5087 return -1;
5088}
5089
5090int
5091shm_dict_get_count(ThreadContext *thread, DictRef dict)
5092{
5093 int rslt = 0;
5094 ShmDictElement *root = (ShmDictElement *)shm_pointer_to_pointer(dict.local->data);
5095 if (!root)
5096 return -1;
5097 rslt += dict_nested_count(root, shm_cell_have_write_lock(thread, &dict.local->lock));
5098 return rslt;
5099}
5100
5101void
5102shm_dict_delta_clear(ThreadContext *thread, ShmDictDeltaArray *delta_array)
5103{
5104 delta_array->count = 0;
5105}
5106
5107int
5108shm_dict_rollback(ThreadContext *thread, ShmDict *dict)
5109{
5110 shm_cell_check_write_lock(thread, &dict->lock);
5111
5112 int rslt = RESULT_OK;
5113 ShmDictDeltaArray *delta_array = shm_pointer_to_pointer(dict->delta);
5114 if (delta_array)
5115 {
5116 for (int i = 0; i < delta_array->count; ++i)
5117 {
5118 switch (delta_array->deltas[i].type)
5119 {
			case DICT_DELTA_NEW_ROOT: // created root array
				continue;
5123 case DICT_DELTA_NEW_NODE: // created nested array
5124 {
5125 ShmDictElement *array = shm_pointer_to_pointer(delta_array->deltas[i].parent_element);
5126 if (!array)
5127 {
5128 rslt = RESULT_FAILURE;
5129 break;
5130 }
5131 // free the new element on abort?
5132 break;
5133 }
5134 case DICT_DELTA_DEL_ROOT: // removed root array
5135 break;
5136 case DICT_DELTA_DEL_NODE: // removed nested array
5137 break;
5138 case DICT_DELTA_CHANGED:
5139 {
5140 // both value and element are not refcounted
5141 ShmDictElement *item = shm_pointer_to_pointer(delta_array->deltas[i].parent_element);
5142 if (item->has_new_data && item->new_data == delta_array->deltas[i].value)
5143 {
5144 item->has_new_data = false;
5145 shm_pointer_move(thread, &item->data, &item->new_data); // swap pointer and free unused one
5146 }
5147 // otherwise deltas[i].value is probably freed already
5148 break;
5149 }
5150 }
5151 }
5152 }
5153 // OutputDebugStringA("delta rollback");
	return rslt;
5155}
5156
5157int
5158shm_dict_commit(ThreadContext *thread, ShmDict *dict)
5159{
5160 shm_cell_check_write_lock(thread, &dict->lock);
5161
5162 int rslt = RESULT_OK;
5163 // OutputDebugStringA("delta commit");
5164 ShmDictDeltaArray *delta_array = shm_pointer_to_pointer(dict->delta);
5165 if (delta_array)
5166 {
5167 for (int i = 0; i < delta_array->count; ++i)
5168 {
5169 switch (delta_array->deltas[i].type)
5170 {
			case DICT_DELTA_NEW_ROOT: // created root array
				continue;
5174 case DICT_DELTA_NEW_NODE: // created nested array
5175 {
5176 ShmDictElement *array = shm_pointer_to_pointer(delta_array->deltas[i].parent_element);
5177 if (!array)
5178 {
5179 rslt = RESULT_FAILURE;
5180 break;
5181 }
5182 // do nothing, we only free the new element on abort
5183 break;
5184 }
5185 case DICT_DELTA_DEL_ROOT: // removed root array
5186 break;
5187 case DICT_DELTA_DEL_NODE: // removed nested array
5188 break;
5189 case DICT_DELTA_CHANGED:
5190 {
5191 ShmDictElement *item = shm_pointer_to_pointer(delta_array->deltas[i].parent_element);
5192 // myassert(item->new_data != item->data, NULL);
5193 if (item->has_new_data && item->new_data == delta_array->deltas[i].value)
5194 {
5195 item->has_new_data = false;
5196 shm_pointer_move(thread, &item->data, &item->new_data); // swap pointer and free unused one
5197 }
5198 // otherwise deltas[i].value is probably freed already
5199 }
5200 }
5201 }
5202 }
5203
5204 // shm_dict_delta_clear(thread, dict);
5205 return rslt;
5206}
5207
5208int
5209shm_dict_unlock(ThreadContext *thread, ShmDict *dict, ShmInt type)
5210{
5211 // myassert(dict->lock.id == thread->self, "dict->lock.id == thread->self");
5212 if (TRANSACTION_ELEMENT_WRITE == type)
5213 p_atomic_shm_pointer_set(&dict->lock.transaction_data, EMPTY_SHM);
5214 ShmDictDeltaArray *delta_array = shm_pointer_to_pointer(dict->delta);
5215 if (delta_array)
5216 shm_dict_delta_clear(thread, delta_array);
5217 _shm_cell_unlock(thread, &dict->lock, type);
5218 return RESULT_OK;
5219}
5220
5221// end ShmDict
5222
5223// start ShmUnDict
5224void
5225shm_undict_table_destroy(ThreadContext *thread, hash_table_header *header);
5226void
5227shm_undict_index_destroy(ThreadContext *thread, hash_table_index_header *index, bool deep);
5228
5229ShmUnDict *
5230new_shm_undict(ThreadContext *thread, PShmPointer shm_pointer)
5231{
5232 // if (shm_pointer) *shm_pointer = EMPTY_SHM; - done by get_mem
5233 ShmUnDict *undict = get_mem(thread, shm_pointer, sizeof(ShmUnDict), SHM_UNDICT_DEBUG_ID);
5234 init_container((ShmContainer *)undict, sizeof(ShmUnDict), SHM_TYPE_UNDICT);
5235 undict->class_name = EMPTY_SHM;
5236 undict->buckets = EMPTY_SHM;
5237 undict->count = 0;
5238 undict->deleted_count = 0;
5239 undict->delta_buckets = EMPTY_SHM; // ShmUnDictTableHeader
5240 undict->delta_count = 0;
5241 undict->delta_deleted_count = 0;
5242 return undict;
5243}
5244
5245int
5246_shm_dict_get_element_size(ShmInt type)
5247{
5248 if (type == SHM_TYPE_UNDICT_TABLE || type == SHM_TYPE_UNDICT_INDEX)
5249 return sizeof(hash_bucket);
5250 else if (type == SHM_TYPE_UNDICT_DELTA_TABLE || type == SHM_TYPE_UNDICT_DELTA_INDEX)
5251 return sizeof(hash_delta_bucket);
5252 else
5253 {
5254 myassert_msg(false, "Invalid unordered dict table type");
5255 return 0;
5256 }
5257}
5258
5259int
5260shm_dict_get_element_size(hash_table_header *header)
5261{
5262 return _shm_dict_get_element_size(header->type);
5263}
5264
5265hash_table_header *
5266new_dict_table(ThreadContext *thread, ShmPointer *out_pntr, ShmInt type, int log_count, int item_size)
5267{
5268 type = type & (~1);
5269 myassert(log_count > 0 && log_count < SHM_UNDICT_LOGCOUNT_LIMIT); // 64k * 4k
5270 *out_pntr = EMPTY_SHM;
5271 const int element_size = _shm_dict_get_element_size(type);
5272 myassert(element_size == item_size);
5273 ShmWord block_total_size = isizeof(hash_table_header) - isizeof(hash_bucket) + element_size * (1 << log_count);
5274 if (block_total_size < 7000)
5275 {
5276 // simple in-place table
5277 hash_table_header *new_block = new_shm_refcounted_block(thread, out_pntr, block_total_size, type,
5278 type == SHM_TYPE_UNDICT_DELTA_TABLE ? SHM_UNDICT_DELTA_TABLE_DEBUG_ID : SHM_UNDICT_TABLE_DEBUG_ID);
5279 new_block->is_index = false;
5280 new_block->log_count = log_count;
5281 return new_block;
5282 }
5283 else
5284 {
5285 int log_index = 0;
5286 do {
5287 log_index++;
5288 block_total_size = isizeof(hash_table_header) - isizeof(hash_bucket) + element_size * (1 << (log_count - log_index));
5289 } while (block_total_size >= 7000);
5290 myassert(log_index > 0 && log_index < 16); // 64k * 4k = 256M seems decent
5291
5292 ShmWord index_total_size = isizeof(hash_table_index_header) - isizeof(ShmPointer) + isizeof(ShmPointer) * (1 << log_index);
5293 hash_table_index_header *new_block = new_shm_refcounted_block(thread, out_pntr, index_total_size, type | 1,
5294 type == SHM_TYPE_UNDICT_DELTA_TABLE ? SHM_UNDICT_DELTA_TABLE_INDEX_DEBUG_ID : SHM_UNDICT_TABLE_INDEX_DEBUG_ID);
5295 new_block->is_index = true;
5296 new_block->log_count = log_count;
5297 new_block->index_log_size = log_index;
5298 new_block->relocated = false;
5299
5300 ShmPointer *index_blocks = &new_block->blocks;
5301 for (int idx = 0; idx < (1 << log_index); ++idx)
5302 {
5303 myassert(index_blocks[idx] != guard_bytes);
5304 hash_table_header *subblock = new_shm_refcounted_block(thread, &index_blocks[idx], block_total_size, type,
5305 type == SHM_TYPE_UNDICT_DELTA_TABLE ? SHM_UNDICT_DELTA_TABLE_BLOCK_DEBUG_ID : SHM_UNDICT_TABLE_BLOCK_DEBUG_ID);
5306 subblock->log_count = log_count - log_index;
5307 subblock->is_index = false;
5308 }
5309
5310 return (hash_table_header *)new_block;
5311 }
5312}
5313
5314// ShmUndict
5315
5316bool
5317shm_undict_get_table(ShmPointer p, hash_table_header **header_pntr, int item_size)
5318{
5319 hash_table_header *header = shm_pointer_to_pointer(p);
5320 if (header_pntr)
5321 *header_pntr = header;
5322
5323 if (header == NULL)
5324 return false;
5325 const int element_size = shm_dict_get_element_size(header);
5326 myassert(element_size == item_size);
5327
5328 return true;
5329}
5330
5331bool
5332shm_undict_compare_false(hash_bucket *bucket, ShmUnDictKey *key)
5333{
5334 return false;
5335}
5336
5337bool
5338shm_undict_compare_key1(hash_bucket *bucket, ShmUnDictKey *key)
5339{
5340 RefString s = shm_ref_string_get(bucket->key);
5341
5342 if (key->keysize != s.len)
5343 return false;
5344 else
5345 {
5346 for (int i = 0; i < s.len; ++i)
5347 {
5348 if (s.data[i] != key->key1[i])
5349 return false;
5350 }
5351 return true;
5352 }
5353}
5354bool
5355shm_undict_compare_key2(hash_bucket *bucket, ShmUnDictKey *key)
5356{
5357 RefString s = shm_ref_string_get(bucket->key);
5358
5359 if (key->keysize != s.len)
5360 return false;
5361 else
5362 {
5363 for (int i = 0; i < s.len; ++i)
5364 {
5365 if (s.data[i] != key->key2[i])
5366 return false;
5367 }
5368 return true;
5369 }
5370}
5371bool
5372shm_undict_compare_key4(hash_bucket *bucket, ShmUnDictKey *key)
5373{
5374 RefString s = shm_ref_string_get(bucket->key);
5375
5376 return key->keysize == s.len && memcmp(s.data, key->key4, (size_t)(s.len * isizeof(Py_UCS4))) == 0;
5377}
5378
5379bool
5380shm_undict_compare_ref_string(hash_bucket *bucket, ShmUnDictKey *key)
5381{
5382 RefString s = shm_ref_string_get(bucket->key);
5383 RefString s2 = shm_ref_string_get(key->key_shm);
5384
5385 return s2.len == s.len && memcmp(s.data, s2.data, (size_t)(s.len * isizeof(Py_UCS4))) == 0;
5386}
5387
5388typedef enum {
5389 key_invalid, key1, key2, key4, key_shm
5390} ShmUndictKeyType;
5391
5392ShmUndictKeyType
5393shm_undict_key_type(ShmUnDictKey *key)
5394{
5395 if (key->key1)
5396 {
5397 myassert(!key->key2);
5398 myassert(!key->key4);
5399 myassert(!SBOOL(key->key_shm));
5400 return key1;
5401 }
5402 else if (key->key2)
5403 {
5404 myassert(!key->key1);
5405 myassert(!key->key4);
5406 myassert(!SBOOL(key->key_shm));
5407 return key2;
5408 }
5409 else if (key->key4)
5410 {
5411 myassert(!key->key1);
5412 myassert(!key->key2);
5413 myassert(!SBOOL(key->key_shm));
5414 return key4;
5415 }
5416 else if (SBOOL(key->key_shm))
5417 {
5418 myassert(!key->key1);
5419 myassert(!key->key2);
5420 myassert(!key->key4);
5421 return key_shm;
5422 }
5423 myassert(false);
5424 return key_invalid;
5425}
5426
5427hash_key_compare
5428shm_undict_key_compare_func(ShmUnDictKey *key)
5429{
5430 switch (shm_undict_key_type(key))
5431 {
5432 case key1:
5433 return shm_undict_compare_key1;
5434 case key2:
5435 return shm_undict_compare_key2;
5436 case key4:
5437 return shm_undict_compare_key4;
5438 case key_shm:
5439 return shm_undict_compare_ref_string;
5440 case key_invalid:
5441 myassert(false);
5442 }
5443 myassert(false);
5444 return NULL;
5445}
5446
5447ShmPointer
5448shm_undict_key_to_ref_string(ThreadContext *thread, ShmUnDictKey *key)
5449{
5450 switch (shm_undict_key_type(key))
5451 {
5452 case key1:
5453 return shm_ref_string_new_ascii(thread, key->key1, key->keysize);
5454 case key2:
5455 myassert_msg(false, "Not implemented");
5456 break;
5457 case key4:
5458 return shm_ref_string_new(thread, key->key4, key->keysize);
5459 break;
5460 case key_shm:
5461 shm_pointer_acq(key->key_shm);
5462 return key->key_shm;
5463 break;
5464 case key_invalid:
5465 myassert(false);
5466 }
5467 myassert(false);
5468 return EMPTY_SHM;
5469}
5470
5471void
5472shm_undict_validate_header(hash_table_header *header, bool is_delta)
5473{
5474 if (header->is_index)
5475 myassert(header->type == (is_delta ? SHM_TYPE_UNDICT_DELTA_INDEX : SHM_TYPE_UNDICT_INDEX));
5476 else
5477 myassert(header->type == (is_delta ? SHM_TYPE_UNDICT_DELTA_TABLE : SHM_TYPE_UNDICT_TABLE));
5478
5479 myassert(SHM_TYPE_RELEASE_MARK != (SHM_TYPE_RELEASE_MARK & (puint)header->type));
5480}
5481
5482
5483void
5484shm_undict_grow_table(ThreadContext *thread, ShmUnDict *dict, bool is_delta, int increment, int item_size,
5485 hash_table_header **created_header, volatile ShmInt *created_bucket_count)
5486{
5487 ShmPointer old_buckets;
5488 if (is_delta)
5489 old_buckets = dict->delta_buckets;
5490 else
5491 old_buckets = dict->buckets;
5492
5493 hash_table_header *header = shm_pointer_to_pointer(old_buckets);
5494 shm_undict_validate_header(header, is_delta);
5495 const int element_size = shm_dict_get_element_size(header);
5496 myassert(element_size == item_size);
5497 myassert(header->log_count < SHM_UNDICT_LOGCOUNT_LIMIT);
5498 if (!header->is_index)
5499 {
5500 const int old_size = isizeof(hash_table_header) - isizeof(hash_bucket) + element_size * (1 << header->log_count);
5501 myassert(header->size == old_size);
5502 }
5503
5504 myassert(increment > 0 && increment < SHM_UNDICT_LOGCOUNT_LIMIT);
5505 int new_log_count = header->log_count + increment;
5506 myassert(new_log_count < SHM_UNDICT_LOGCOUNT_LIMIT);
5507 ShmPointer new_table_shm;
5508 hash_table_header *new_header = new_dict_table(thread, &new_table_shm, header->type, new_log_count, element_size);
5509
5510 hash_func_args new_table_args;
5511 ShmInt new_count = 0;
5512 ShmInt new_deleted_count = 0;
5513 new_table_args.thread = thread;
5514 new_table_args.bucket_item_size = element_size;
5515 new_table_args.header = new_header;
5516 new_table_args.item_count = &new_count;
5517 new_table_args.deleted_count = &new_deleted_count;
5518 new_table_args.bucket_count = 1 << new_log_count;
	new_table_args.compare_key = shm_undict_compare_false; // We just copy, we don't need strict comparison.
5520
5521 ShmUnDictKey key = EMPTY_SHM_UNDICT_KEY;
5522
5523 p_atomic_int_set(&header->relocated, true);
5524
5525 for (long src_index = 0; src_index < (1 << header->log_count); ++src_index)
5526 {
5527 hash_bucket *src_item = get_bucket_at_index(header, src_index, element_size);
5528
5529 bool is_deleted = false;
5530 switch (bucket_get_state(src_item))
5531 {
5532 case HASH_BUCKET_STATE_DELETED_RESERVED:
5533 myassert(is_delta);
5534 is_deleted = true;
			// fall through
5536 case HASH_BUCKET_STATE_SET:
5537 {
5538 key.hash = src_item->key_hash;
5539 find_position_result find_rslt = hash_find_position(new_table_args, &key);
5540 myassert(find_rslt.found == -1);
5541 myassert(find_rslt.last_free >= 0);
5542 myassert(find_rslt.found != -2);
5543 hash_bucket *new_item = get_bucket_at_index(new_table_args.header, find_rslt.last_free, new_table_args.bucket_item_size);
5544 shm_pointer_move_atomic(thread, &new_item->key, &src_item->key);
5545 myassert(shm_pointer_refcount(thread, new_item->key) > 0);
5546 shm_pointer_move_atomic(thread, &new_item->key_hash, &src_item->key_hash);
5547 shm_pointer_move_atomic(thread, &new_item->value, &src_item->value);
5548 if (is_delta)
5549 ((hash_delta_bucket *)new_item)->orig_item = ((hash_delta_bucket *)src_item)->orig_item;
5550
5551 if (is_deleted)
5552 new_deleted_count++;
5553 else
5554 new_count++;
5555
5556 break;
5557 }
5558 case HASH_BUCKET_STATE_DELETED:
5559 // skip
5560 myassert(SBOOL(src_item->key) == false);
5561 break;
5562 case HASH_BUCKET_STATE_EMPTY:
5563 // skip
5564 break;
5565 default:
5566 myassert(false);
5567 }
5568 }
5569 // shm_undict_debug_print(thread, dict);
5570
5571 // we've already trashed the table -- might be a good idea to unlink the index->table references
5572 if (header->is_index)
5573 shm_undict_index_destroy(thread, (hash_table_index_header *)header, false);
5574
5575 if (is_delta)
5576 {
5577 myassert(new_count == dict->delta_count);
5578 myassert(new_deleted_count == dict->delta_deleted_count);
5579 shm_pointer_move(thread, &dict->delta_buckets, &new_table_shm);
5580 }
5581 else
5582 {
5583 myassert(new_count == dict->count);
5584 myassert(new_deleted_count == 0);
5585 shm_pointer_move(thread, &dict->buckets, &new_table_shm);
5586 }
5587
5588 if (created_header)
5589 *created_header = new_header;
5590 if (created_bucket_count)
5591 *created_bucket_count = new_table_args.bucket_count;
5592}
5593
5594bool
5595grow_or_allocate_delta(ThreadContext *thread, ShmUnDict *dict, hash_func_args *delta_args, hash_table_header **delta_header)
5596{
5597 if (*delta_args->item_count + *delta_args->deleted_count > delta_args->bucket_count / 2 + 1)
5598 {
5599 myassert(delta_args->header);
5600 shm_undict_grow_table(thread, dict, true, 1, delta_args->bucket_item_size, &delta_args->header, &delta_args->bucket_count);
5601 return true;
5602 }
5603 myassert((delta_args->header != NULL) == (*delta_header != NULL));
5604 if (!delta_args->header)
5605 {
5606 new_dict_table(thread, &dict->delta_buckets, SHM_TYPE_UNDICT_DELTA_TABLE, 2, sizeof(hash_delta_bucket)); // 4 items by default
5607 shm_undict_get_table(dict->delta_buckets, delta_header, delta_args->bucket_item_size);
5608 delta_args->bucket_count = 1 << (*delta_header)->log_count;
5609 return true;
5610 }
5611 return false;
5612}
5613
5614int
5615shm_undict_set_consume(ThreadContext *thread, UnDictRef dict, ShmUnDictKey *key,
5616 ShmPointer value, DictProfiling *profiling)
5617{
5618 bool lock_taken1 = false;
5619 uint64_t tmp;
5620 tmp = rdtsc();
5621 if (profiling) profiling->counter1 += rdtsc() - tmp;
5622 tmp = rdtsc();
5623 bool had_write_lock = shm_cell_have_write_lock(thread, &dict.local->lock);
5624 if_failure(transaction_lock_write(thread, &dict.local->lock, dict.shared, CONTAINER_UNORDERED_DICT, &lock_taken1),
5625 {
5626 if (SBOOL(value))
5627 shm_pointer_release(thread, value);
5628 transient_abort(thread);
5629 return status;
5630 });
5631 shm_cell_check_write_lock(thread, &dict.local->lock);
5632 if (!had_write_lock)
5633 {
5634 myassert(dict.local->delta_buckets == EMPTY_SHM);
5635 myassert(dict.local->delta_count == 0);
5636 myassert(dict.local->delta_deleted_count == 0);
5637 }
5638 if (profiling) profiling->counter2 += rdtsc() - tmp;
5639 tmp = rdtsc();
5640 {
5641 //ShmDictElement *element = shm_dict_find(thread, dict, hash, key, keysize));
5642 //if ( ! element)
5643 hash_func_args delta_args;
5644 delta_args.thread = thread;
5645 delta_args.bucket_item_size = sizeof(hash_delta_bucket);
5646 shm_undict_get_table(dict.local->delta_buckets, &delta_args.header, delta_args.bucket_item_size);
5647 delta_args.item_count = &dict.local->delta_count;
5648 delta_args.deleted_count = &dict.local->delta_deleted_count;
5649 delta_args.bucket_count = delta_args.header ? (1 << delta_args.header->log_count) : 0;
5650 delta_args.compare_key = shm_undict_key_compare_func(key);
5651 hash_func_args orig_args;
5652 orig_args.thread = thread;
5653 orig_args.bucket_item_size = sizeof(hash_bucket);
5654 shm_undict_get_table(dict.local->buckets, &orig_args.header, orig_args.bucket_item_size);
		orig_args.item_count = &dict.local->count;
		orig_args.deleted_count = &dict.local->deleted_count;
5657 orig_args.bucket_count = orig_args.header ? (1 << orig_args.header->log_count) : 0;
5658 orig_args.compare_key = shm_undict_key_compare_func(key);
5659 bool modified = false;
5660 {
5661 find_position_result rslt_delta = { -1, -1 };
5662 hash_delta_bucket *found_delta_bucket = NULL;
5663 hash_delta_bucket *new_delta_bucket = NULL;
5664 myassert(SBOOL(dict.local->delta_buckets) == (delta_args.header != NULL));
5665 if (delta_args.header)
5666 {
5667 if (*delta_args.item_count + *delta_args.deleted_count > delta_args.bucket_count / 2 + 1)
5668 shm_undict_grow_table(thread, dict.local, true, 1, delta_args.bucket_item_size, &delta_args.header, &delta_args.bucket_count);
5669 for (int cycle = 1; cycle <= 2; ++cycle)
5670 {
5671 rslt_delta = hash_find_position(delta_args, key);
5672
5673 if (rslt_delta.found >= 0)
5674 {
5675 // replacing previously modified item
5676 myassert(rslt_delta.found < delta_args.bucket_count);
5677 found_delta_bucket = (hash_delta_bucket *)
5678 get_bucket_at_index(delta_args.header, rslt_delta.found, delta_args.bucket_item_size);
5679 }
5680 else if (rslt_delta.last_free >= 0)
5681 {
5682 myassert(rslt_delta.last_free < delta_args.bucket_count);
5683 new_delta_bucket = (hash_delta_bucket *)
5684 get_bucket_at_index(delta_args.header, rslt_delta.last_free, delta_args.bucket_item_size);
5685 }
5686
5687 if (rslt_delta.found == -2)
5688 {
5689 myassert(cycle != 2);
5690 myassert(!found_delta_bucket && !new_delta_bucket);
5691 shm_undict_grow_table(thread, dict.local, true, 1, delta_args.bucket_item_size, &delta_args.header, &delta_args.bucket_count);
5692 }
5693 if (rslt_delta.found == -1 && rslt_delta.last_free == -1)
5694 break;
5695 }
5696 }
5697 myassert(rslt_delta.found > -2);
5698 if (found_delta_bucket)
5699 {
5700 switch (bucket_get_state((hash_bucket*)found_delta_bucket))
5701 {
5702 case HASH_BUCKET_STATE_SET:
5703 case HASH_BUCKET_STATE_DELETED_RESERVED:
5704 {
5705 // update the existing delta item
5706 bool was_valid = found_delta_bucket->value != EMPTY_SHM;
5707 bool set_valid = value != EMPTY_SHM;
5708 shm_pointer_move(thread, &found_delta_bucket->value, &value);
5709 if (was_valid != set_valid)
5710 {
5711 if (was_valid)
5712 {
5713 (*delta_args.deleted_count)++;
5714 (*delta_args.item_count)--;
5715 }
5716 else
5717 {
5718
5719 (*delta_args.deleted_count)--;
5720 (*delta_args.item_count)++;
5721 }
5722 }
5723
5724 modified = true;
5725 break;
5726 }
5727 default:
5728 myassert(false);
5729 }
5730 }
5731 else
5732 {
5733 find_position_result persist_probe = hash_find_position(orig_args, key);
5734 if (persist_probe.found >= 0)
5735 {
5736 if (grow_or_allocate_delta(thread, dict.local, &delta_args, &delta_args.header)) {
5737 find_position_result new_delta_probe = hash_find_position(delta_args, key);
5738 myassert(new_delta_probe.found == -1 && new_delta_probe.last_free >= 0);
5739 new_delta_bucket = (hash_delta_bucket *)
5740 get_bucket_at_index(delta_args.header, new_delta_probe.last_free, delta_args.bucket_item_size);
5741 }
5742 myassert(new_delta_bucket);
5743 myassert(persist_probe.found < orig_args.bucket_count);
5744 hash_bucket *bucket = get_bucket_at_index(orig_args.header, persist_probe.found, orig_args.bucket_item_size);
5745 // create a linked item in the dict.local->delta_buckets
5746 myassert(bucket_get_state(bucket) == HASH_BUCKET_STATE_SET);
5747 // validate_bucket(bucket); - done by hash_find_position
5748 shm_pointer_copy(thread, &new_delta_bucket->key, bucket->key);
5749 new_delta_bucket->key_hash = bucket->key_hash;
5750 shm_pointer_move(thread, &new_delta_bucket->value, &value);
5751 new_delta_bucket->orig_item = persist_probe.found;
5752
					if (new_delta_bucket->value == EMPTY_SHM)
						(*delta_args.deleted_count)++;
					else
						(*delta_args.item_count)++;
5757
5758 modified = true;
5759 }
5760 else
5761 {
5762 if (value != EMPTY_SHM)
5763 {
5764 // create a brand new item in the dict.local->delta_buckets
5765 if (grow_or_allocate_delta(thread, dict.local, &delta_args, &delta_args.header)) {
5766 find_position_result new_delta_rslt = hash_find_position(delta_args, key);
5767 myassert(new_delta_rslt.found == -1 && new_delta_rslt.last_free >= 0);
5768 new_delta_bucket = (hash_delta_bucket *)
5769 get_bucket_at_index(delta_args.header, new_delta_rslt.last_free, delta_args.bucket_item_size);
5770 }
5771 new_delta_bucket->key = shm_undict_key_to_ref_string(thread, key);
5772 new_delta_bucket->key_hash = key->hash;
5773 shm_pointer_move(thread, &new_delta_bucket->value, &value);
5774 new_delta_bucket->orig_item = -1;
5775
5776 dict.local->delta_count++;
5777 modified = true;
5778 }
5779 }
5780 }
5781 }
5782 if (profiling) profiling->counter3 += rdtsc() - tmp;
5783 if (modified)
5784 {
5785 tmp = rdtsc();
		// we probably need to verify the dict.local->lock.changes_shm value
5787 // myassert(shm_dict_push_delta(thread, &dict.local->delta, DICT_DELTA_CHANGED, value, element_shm));
5788 }
5789 if (SBOOL(value))
5790 {
5791 tmp = rdtsc();
5792 shm_pointer_release(thread, value);
5793 if (profiling) profiling->counter6 += rdtsc() - tmp;
5794 }
5795
5796 tmp = rdtsc();
5797 transient_commit(thread);
5798 if (profiling) profiling->counter7 += rdtsc() - tmp;
5799 return RESULT_OK;
5800 };
5801}
5802
5803int
5804shm_undict_set_empty(ThreadContext *thread, UnDictRef dict, ShmUnDictKey *key,
5805 DictProfiling *profiling)
5806{
5807 return shm_undict_set_consume(thread, dict, key, EMPTY_SHM, profiling);
5808}
5809
5810int
5811shm_undict_get_do(ThreadContext *thread, UnDictRef dict, ShmUnDictKey *key, ShmPointer *value, bool acquire)
5812{
5813 myassert(value);
5814 myassert(*value == EMPTY_SHM);
5815 if_failure(transaction_lock_read(thread, &dict.local->lock, dict.shared, CONTAINER_UNORDERED_DICT, NULL),
5816 {
5817 transient_abort(thread);
5818 return status;
5819 });
5820 shm_cell_check_read_lock(thread, &dict.local->lock);
5821
5822 if (shm_cell_have_write_lock(thread, &dict.local->lock))
5823 {
5824 // search the item in the delta table first
5825 hash_func_args delta_args;
5826 delta_args.thread = thread;
		delta_args.bucket_item_size = sizeof(hash_delta_bucket);
5828 shm_undict_get_table(dict.local->delta_buckets, &delta_args.header, delta_args.bucket_item_size);
5829 delta_args.item_count = &dict.local->delta_count;
5830 delta_args.deleted_count = &dict.local->delta_deleted_count;
5831 delta_args.bucket_count = delta_args.header ? (1 << delta_args.header->log_count) : 0;
5832 delta_args.compare_key = shm_undict_key_compare_func(key);
5833
5834 find_position_result delta_rslt = hash_find_position(delta_args, key);
5835 if (delta_rslt.found >= 0)
5836 {
5837 hash_delta_bucket *found = (hash_delta_bucket *)get_bucket_at_index(delta_args.header, delta_rslt.found, delta_args.bucket_item_size);
5838 ShmInt state = bucket_get_state((hash_bucket *)found);
5839 myassert(state == HASH_BUCKET_STATE_SET || state == HASH_BUCKET_STATE_DELETED_RESERVED);
5840 ShmPointer retval = EMPTY_SHM;
5841 if (state == HASH_BUCKET_STATE_SET)
5842 {
5843 retval = found->value;
5844 if (acquire)
5845 shm_pointer_acq(retval);
5846 else
5847 myassert_msg(thread->transaction_mode != TRANSACTION_TRANSIENT,
5848 "Handling pointer within a transient transaction without incrementing refcount leads to undefined behaivour.");
5849 }
5850 *value = retval;
5851 transient_commit(thread);
5852 return RESULT_OK;
5853 }
5854 }
5855 else
5856 myassert(EMPTY_SHM == dict.local->delta_buckets);
5857
5858 hash_func_args persist_args;
5859 persist_args.thread = thread;
5860 persist_args.bucket_item_size = sizeof(hash_bucket);
5861 shm_undict_get_table(dict.local->buckets, &persist_args.header, persist_args.bucket_item_size);
5862 persist_args.item_count = &dict.local->count;
5863 persist_args.deleted_count = &dict.local->deleted_count;
5864 persist_args.bucket_count = persist_args.header ? (1 << persist_args.header->log_count) : 0;
5865 persist_args.compare_key = shm_undict_key_compare_func(key);
5866
5867 ShmPointer retval2 = EMPTY_SHM;
5868 find_position_result persist_rslt = hash_find_position(persist_args, key);
5869 if (persist_rslt.found >= 0)
5870 {
5871 hash_bucket *found = get_bucket_at_index(persist_args.header, persist_rslt.found, persist_args.bucket_item_size);
5872 ShmInt state = bucket_get_state(found);
5873 myassert(state == HASH_BUCKET_STATE_SET);
5874 retval2 = found->value;
5875 if (acquire)
5876 shm_pointer_acq(retval2);
5877 else
5878 myassert_msg(thread->transaction_mode != TRANSACTION_TRANSIENT,
5879 "Handling pointer within a transient transaction without incrementing refcount leads to undefined behaivour.");
5880 }
5881 *value = retval2;
5882
5883 transient_commit(thread);
5884 return RESULT_OK;
5885}
5886
5887int
5888shm_undict_get(ThreadContext *thread, UnDictRef dict, ShmUnDictKey *key, ShmPointer *value)
5889{
5890 return shm_undict_get_do(thread, dict, key, value, false);
5891}
5892
5893int
5894shm_undict_acq(ThreadContext *thread, UnDictRef dict, ShmUnDictKey *key, ShmPointer *value)
5895{
5896 return shm_undict_get_do(thread, dict, key, value, true);
5897}
5898
5899volatile char *
5900shm_undict_debug_scan_for_item(ShmUnDict *dict, ShmUnDictKey *key)
5901{
5902 hash_table_header *header;
5903 shm_undict_get_table(dict->buckets, &header, sizeof(hash_bucket));
5904 int item_count = (1 << header->log_count);
5905 for (int idx = 0; idx < item_count; ++idx)
5906 {
5907 hash_bucket *bucket = get_bucket_at_index(header, idx, sizeof(hash_bucket));
5908 if (bucket->key_hash == key->hash)
5909 return (volatile char *)bucket;
5910 }
5911 return NULL;
5912}
5913
5914void
5915print_items_to_file(FILE *pFile, ShmUnDict *dict)
5916{
5917 hash_table_header *header;
5918 shm_undict_get_table(dict->buckets, &header, sizeof(hash_bucket));
5919 int bucket_count = (1 << header->log_count);
5920 for (int idx = 0; idx < bucket_count; ++idx)
5921 {
5922 hash_bucket *bucket = get_bucket_at_index(header, idx, sizeof(hash_bucket));
5923 RefString s = shm_ref_string_get(bucket->key);
5924 if (s.data)
5925 {
5926 for (int i = 0; i < s.len; ++i)
5927 fputc((unsigned char)s.data[i], pFile);
5928 if (SBOOL(bucket->value))
5929 {
5930 fprintf(pFile, ": %d\n", bucket->value);
5931 }
5932 else
5933 fprintf(pFile, ": empty\n");
5934 }
5935 else
5936 fprintf(pFile, "none\n");
5937 }
5938}
5939
5940
5941void
5942shm_undict_debug_print(ThreadContext *thread, ShmUnDict *dict, bool full)
5943{
5944 hash_table_header *persist_header;
5945 shm_undict_get_table(dict->buckets, &persist_header, sizeof(hash_bucket));
5946
5947 if (full)
5948 printf("===========================\n");
5949 if (!persist_header)
5950 {
5951 printf("EMPTY\n");
5952 return;
5953 }
5954 int count = 0;
5955 for (int index = 0; index < (1 << persist_header->log_count); ++index)
5956 {
5957 hash_bucket *bucket = get_bucket_at_index(persist_header, index, sizeof(hash_bucket));
5958 if (bucket_get_state(bucket) == HASH_BUCKET_STATE_SET)
5959 {
5960 count++;
5961 if (full)
5962 {
5963 RefString s = shm_ref_string_get(bucket->key);
5964 ShmValueHeader *val = shm_pointer_to_pointer(bucket->value);
5965 const char *data = shm_value_get_data(val);
5966 for (int i = 0; i < s.len; ++i)
5967 putchar((unsigned char)s.data[i]);
5968 putchar('=');
5969 // for (int i = 0; i < shm_value_get_length(val); ++i)
5970 // putchar(data[i]);
5971 printf("%s", data);
5972 putchar('\n');
5973 }
5974 }
5975 }
5976 printf("count: %d\n", count);
5977 if (full)
5978 printf("===========================\n");
5979}
5980
5981void
5982release_delta_table(ThreadContext *thread, ShmUnDict *dict, bool invalidated)
5983{
5984 hash_table_header *delta_header;
5985 if (shm_undict_get_table(dict->delta_buckets, &delta_header, sizeof(hash_delta_bucket)))
5986 {
5987 for (int delta_index = 0; delta_index < (1 << delta_header->log_count); ++delta_index)
5988 {
5989 hash_bucket *bucket = get_bucket_at_index(delta_header, delta_index, sizeof(hash_delta_bucket));
5990 myassert(bucket->key != ((__ShmPointer)-1));
5991 switch (bucket_get_state(bucket))
5992 {
5993 case HASH_BUCKET_STATE_DELETED_RESERVED:
5994 case HASH_BUCKET_STATE_SET:
5995 if (SBOOL(bucket->value))
5996 shm_pointer_empty(thread, &bucket->value);
5997 if (SBOOL(bucket->key))
5998 {
5999 shm_pointer_empty(thread, &bucket->key);
6000 bucket->key = 0; // memset-friendly
6001 }
6002 break;
6003 case HASH_BUCKET_STATE_DELETED:
6004 myassert_msg(invalidated, "state = HASH_BUCKET_STATE_DELETED");
6005 break;
6006 case HASH_BUCKET_STATE_EMPTY:
6007 break;
6008 default:
6009 myassert(false);
6010 }
6011 }
6012 dict->delta_count = 0;
6013 dict->delta_deleted_count = 0;
6014 shm_pointer_empty(thread, &dict->delta_buckets);
6015 }
6016}
6017
6018void
6019release_persistent_table(ThreadContext *thread, ShmUnDict *dict)
6020{
6021 hash_table_header *header;
6022 if (shm_undict_get_table(dict->buckets, &header, sizeof(hash_bucket)))
6023 {
6024 for (int index = 0; index < (1 << header->log_count); ++index)
6025 {
6026 hash_bucket *bucket = get_bucket_at_index(header, index, sizeof(hash_bucket));
6027 myassert(bucket->key != ((__ShmPointer)-1));
6028 switch (bucket_get_state(bucket))
6029 {
6030 case HASH_BUCKET_STATE_DELETED_RESERVED:
6031 myassert(false);
6032 break;
6033 case HASH_BUCKET_STATE_DELETED:
6034 case HASH_BUCKET_STATE_EMPTY:
6035 break;
6036 case HASH_BUCKET_STATE_SET:
6037 shm_pointer_empty(thread, &bucket->value);
6038 if (SBOOL(bucket->key))
6039 {
6040 shm_pointer_empty(thread, &bucket->key);
6041 bucket->key = 0; // memset-friendly
6042 }
6043 break;
6044 default:
6045 myassert(false);
6046 }
6047 }
6048 dict->count = 0;
6049 dict->deleted_count = 0;
6050 shm_pointer_empty(thread, &dict->buckets);
6051 }
6052}
6053
6054int
6055shm_undict_clear(ThreadContext *thread, UnDictRef dict)
6056{
6057 if_failure(transaction_lock_write(thread, &dict.local->lock, dict.shared, CONTAINER_UNORDERED_DICT, NULL),
6058 {
6059 transient_abort(thread);
6060 return status;
6061 });
6062 release_delta_table(thread, dict.local, false);
6063
6064 myassert(dict.local->delta_buckets == EMPTY_SHM);
6065 myassert(dict.local->delta_count == 0);
6066 myassert(dict.local->delta_deleted_count == 0);
6067
6068 dict.local->delta_buckets = NONE_SHM;
6069
6070 return RESULT_OK;
6071}
6072
6073void
6074shm_undict_destroy(ThreadContext *thread, ShmUnDict *dict, ShmPointer dict_shm)
6075{
6076 // Clear all references in data items so we won't need several more cycles to dereference them (after destroying the block and index itself)
	hash_table_header *persistent = shm_pointer_to_pointer(dict->buckets);
	if (persistent)
	{
		if (persistent->is_index)
			shm_undict_index_destroy(thread, (hash_table_index_header *)persistent, true);
		else
			shm_undict_table_destroy(thread, persistent);
	}

	hash_table_header *delta = shm_pointer_to_pointer(dict->delta_buckets);
	if (delta)
	{
		if (delta->is_index)
			shm_undict_index_destroy(thread, (hash_table_index_header *)delta, true);
		else
			shm_undict_table_destroy(thread, delta);
	}
6088}
6089
6090void
6091shm_undict_table_destroy(ThreadContext *thread, hash_table_header *header)
6092{
6093 ShmInt clear_type = (puint)header->type & (~SHM_TYPE_RELEASE_MARK);
6094 myassert(clear_type == SHM_TYPE_UNDICT_DELTA_TABLE || clear_type == SHM_TYPE_UNDICT_TABLE);
6095 header->relocated = true;
6096 myassert(header->log_count > 0 && header->log_count < 16);
6097 int cnt = 1 << header->log_count;
	if (clear_type == SHM_TYPE_UNDICT_DELTA_TABLE)
6099 {
6100 hash_delta_bucket *buckets = (hash_delta_bucket *)&header->buckets;
6101 for (int idx = 0; idx < cnt; idx++)
6102 {
6103 shm_pointer_empty_atomic(thread, &buckets[idx].key);
6104 shm_pointer_empty_atomic(thread, &buckets[idx].value);
6105 }
6106 }
6107 else
6108 {
6109 hash_bucket *buckets = &header->buckets;
6110 for (int idx = 0; idx < cnt; idx++)
6111 {
6112 shm_pointer_empty_atomic(thread, &buckets[idx].key);
6113 shm_pointer_empty_atomic(thread, &buckets[idx].value);
6114 }
6115 }
6116}
6117
6118void
6119shm_undict_index_destroy(ThreadContext *thread, hash_table_index_header *index, bool deep)
6120{
6121 ShmInt clear_type = (puint)index->type & (~SHM_TYPE_RELEASE_MARK);
6122 myassert(clear_type == SHM_TYPE_UNDICT_INDEX || clear_type == SHM_TYPE_UNDICT_DELTA_INDEX);
6123 myassert(SHM_TYPE_RELEASE_MARK != (SHM_TYPE_RELEASE_MARK & (puint)index->type));
6124 index->relocated = true;
6125 ShmPointer *index_blocks = &index->blocks;
6126 int index_count = 1 << index->index_log_size;
6127 for (int idx = 0; idx < index_count; ++idx)
6128 {
6129 myassert(index_blocks[idx] != guard_bytes);
6130 if (deep)
6131 {
6132 hash_table_header *subblock = shm_pointer_to_pointer(index_blocks[idx]);
6133 myassert(subblock->type == SHM_TYPE_UNDICT_DELTA_TABLE || subblock->type == SHM_TYPE_UNDICT_TABLE);
6134 shm_undict_table_destroy(thread, subblock);
6135 }
6136 if (SBOOL(p_atomic_shm_pointer_get(&index_blocks[idx])))
6137 shm_pointer_empty_atomic(thread, &index_blocks[idx]);
6138 }
6139}
6140
6141int
6142shm_undict_commit(ThreadContext *thread, ShmUnDict *dict)
6143{
6144 shm_cell_check_write_lock(thread, &dict->lock);
6145 // move data from delta into persistent table
6146 hash_table_header *delta_header;
6147 if (shm_undict_get_table(dict->delta_buckets, &delta_header, sizeof(hash_delta_bucket)))
6148 {
6149 bool persistent_relocated = false;
6150 hash_func_args persist_args;
6151 persist_args.thread = thread;
6152 persist_args.bucket_item_size = sizeof(hash_bucket);
6153 if (!SBOOL(dict->buckets))
6154 {
6155 new_dict_table(thread, &dict->buckets, SHM_TYPE_UNDICT_TABLE, 2, sizeof(hash_bucket)); // 4 items by default
6156 persistent_relocated = true;
6157 }
6158 shm_undict_get_table(dict->buckets, &persist_args.header, persist_args.bucket_item_size);
6159 myassert(persist_args.header);
6160 persist_args.item_count = &dict->count;
6161 persist_args.deleted_count = &dict->deleted_count;
6162 persist_args.bucket_count = 1 << persist_args.header->log_count;
6163 persist_args.compare_key = shm_undict_compare_ref_string;
6164 ShmUnDictKey key = EMPTY_SHM_UNDICT_KEY;
6165
6166 for (int delta_index = 0; delta_index < (1 << delta_header->log_count); ++delta_index)
6167 {
6168 if (*persist_args.item_count + *persist_args.deleted_count > persist_args.bucket_count / 2 + 1)
6169 {
6170 shm_undict_grow_table(thread, dict, false, 1, persist_args.bucket_item_size, &persist_args.header, &persist_args.bucket_count);
6171 persistent_relocated = true;
6172 }
6173
6174 hash_delta_bucket *delta_bucket = (hash_delta_bucket *)get_bucket_at_index(delta_header, delta_index, sizeof(hash_delta_bucket));
6175 switch (bucket_get_state((hash_bucket *)delta_bucket))
6176 {
6177 case HASH_BUCKET_STATE_DELETED_RESERVED:
6178 case HASH_BUCKET_STATE_SET:
6179 {
6180 if (*persist_args.item_count + *persist_args.deleted_count > persist_args.bucket_count / 2 + 1)
6181 shm_undict_grow_table(thread, dict, false, 1, persist_args.bucket_item_size, &persist_args.header, &persist_args.bucket_count);
6182
6183 find_position_result persist_rslt = { .found = -1, .last_free = -1 };
6184 key.hash = delta_bucket->key_hash;
6185 key.key_shm = delta_bucket->key;
6186 for (int cycle = 1; cycle <= 4; ++cycle)
6187 {
6188 myassert(cycle < 4);
6189
6190 persist_rslt = hash_find_position(persist_args, &key);
6191 if (persist_rslt.found == -2)
6192 {
6193 shm_undict_grow_table(thread, dict, false, 1, persist_args.bucket_item_size, &persist_args.header, &persist_args.bucket_count);
6194 persistent_relocated = true;
6195 }
6196 else
6197 break;
6198 }
6199 myassert(persist_rslt.found > -2);
6200 if (persist_rslt.last_free < 0 && delta_bucket->orig_item < 0)
6201 DebugBreak();
6202 if (delta_bucket->orig_item >= 0)
6203 {
6204 myassert(persist_rslt.found >= 0);
6205 if (!persistent_relocated)
6206 myassert(persist_rslt.found == delta_bucket->orig_item);
6207
6208 hash_bucket *found_bucket = get_bucket_at_index(persist_args.header, persist_rslt.found, sizeof(hash_bucket));
6209
6210 myassert(bucket_get_state(found_bucket) == HASH_BUCKET_STATE_SET);
6211 int persist_state = bucket_get_state(found_bucket);
6212 bool was_valid = persist_state == HASH_BUCKET_STATE_SET;
6213 bool set_valid = delta_bucket->value != EMPTY_SHM;
6214 // modification
6215 shm_pointer_copy(thread, &found_bucket->value, delta_bucket->value);
6216 // shm_pointer_empty(thread, &delta_buckets[delta_index].key);
6217 delta_bucket->key_hash = 1; // effectively making it HASH_BUCKET_STATE_DELETED, which is invalid for delta table.
6218
6219 if (was_valid != set_valid)
6220 {
6221 if (was_valid)
6222 {
6223 (*persist_args.deleted_count)++;
6224 (*persist_args.item_count)--;
6225 }
6226 else
6227 {
6228 (*persist_args.item_count)++;
6229 if (persist_state == HASH_BUCKET_STATE_DELETED)
6230 (*persist_args.deleted_count)--;
6231 }
6232 }
6233 if (set_valid == false && was_valid)
6234 {
						// The element is in the temporary HASH_BUCKET_STATE_DELETED_RESERVED state.
						// Save the hash because bucket_delete() is going to modify it.
						uint32_t hash = found_bucket->key_hash;
						ShmPointer deleted_key = bucket_delete(found_bucket);
6239 found_bucket = NULL;
6240 myassert(shm_pointer_refcount(thread, deleted_key) > 0);
6241 shm_pointer_release(thread, deleted_key);
6242 // now the persist_rslt.found's state is HASH_BUCKET_STATE_DELETED
6243 hash_compact_tail(persist_args, persist_rslt.found, hash);
6244 }
6245 }
6246 else
6247 {
6248 myassert(persist_rslt.last_free >= 0);
6249 if (delta_bucket->value != EMPTY_SHM)
6250 {
6251 // allocate a new persistent item
6252 hash_bucket *last_free = get_bucket_at_index(persist_args.header, persist_rslt.last_free, sizeof(hash_bucket));
6253 // modification
6254 shm_pointer_copy(thread, &last_free->value, delta_bucket->value);
6255 shm_pointer_copy(thread, &last_free->key, delta_bucket->key);
6256 myassert(shm_pointer_refcount(thread, delta_bucket->key) >= 2);
6257 last_free->key_hash = delta_bucket->key_hash;
6258 (*persist_args.item_count)++;
6259 delta_bucket->key_hash = 1; // effectively making it HASH_BUCKET_STATE_DELETED, which is invalid for delta table.
6260 }
6261 }
6262 break;
6263 }
6264 case HASH_BUCKET_STATE_EMPTY:
6265 // skip
6266 break;
6267 default:
6268 myassert(false);
6269 }
6270 }
6271
6272 dict->delta_count = 0;
6273 dict->delta_deleted_count = 0;
6274 release_delta_table(thread, dict, true);
6275 shm_pointer_empty(thread, &dict->delta_buckets);
6276 }
6277 else if (dict->delta_buckets == NONE_SHM)
6278 {
6279 release_persistent_table(thread, dict);
6280 dict->delta_buckets = EMPTY_SHM;
6281 }
6282 myassert(dict->delta_buckets == EMPTY_SHM);
6283 myassert(dict->delta_count == 0);
6284 myassert(dict->delta_deleted_count == 0);
6285 shm_cell_check_write_lock(thread, &dict->lock);
6286 return RESULT_OK;
6287}
6288
6289int
6290shm_undict_rollback(ThreadContext *thread, ShmUnDict *dict)
6291{
6292 shm_cell_check_write_lock(thread, &dict->lock);
6293 // clear the delta table and release the values (and keys if not owned by persistent table)
6294 release_delta_table(thread, dict, false);
6295 myassert(dict->delta_buckets == EMPTY_SHM);
6296 myassert(dict->delta_count == 0);
6297 myassert(dict->delta_deleted_count == 0);
6298 return RESULT_OK;
6299}
6300
6301int
6302shm_undict_unlock(ThreadContext *thread, ShmUnDict *dict, ShmInt type)
6303{
6304 // myassert(dict->lock.id == thread->self, "dict->lock.id == thread->self");
6305 if (TRANSACTION_ELEMENT_WRITE == type)
6306 p_atomic_shm_pointer_set(&dict->lock.transaction_data, EMPTY_SHM);
6307 _shm_cell_unlock(thread, &dict->lock, type);
6308 return RESULT_OK;
6309}
6310
6311// end ShmUnDict
6312
6313vl void *
6314shm_pointer_acq(ShmPointer pointer)
6315{
6316 ShmRefcountedBlock *block = (ShmRefcountedBlock *)shm_pointer_to_pointer(pointer);
6317 if (!block)
6318 return NULL;
6319 myassert(!(block->type & SHM_TYPE_FLAG_CONTAINED) != !(block->type & SHM_TYPE_FLAG_REFCOUNTED));
6320 if (block->type & SHM_TYPE_FLAG_CONTAINED)
6321 block = shm_pointer_to_pointer(((ShmContainedBlock*)block)->container);
6322 else
6323 myassert(block->type & SHM_TYPE_FLAG_REFCOUNTED);
6324
6325 myassert(block);
6326 p_atomic_int_add(&block->refcount, 1);
6327 return block;
6328}
6329
6330ShmRefcountedBlock *
6331shm_pointer_to_refcounted(ThreadContext *thread, ShmPointer pointer)
6332{
6333 if (SBOOL(pointer) == false)
6334 return NULL;
6335 ShmRefcountedBlock *ref_block = (ShmRefcountedBlock *)shm_pointer_to_pointer(pointer);
6336 myassert(ref_block);
6337 myassert(!((ref_block->type & SHM_TYPE_FLAG_CONTAINED) && (ref_block->type & SHM_TYPE_FLAG_REFCOUNTED))); // mutually exclusive
6338 if (ref_block->type & SHM_TYPE_FLAG_CONTAINED)
6339 ref_block = shm_pointer_to_pointer(((ShmContainedBlock*)ref_block)->container);
6340 else
6341 myassert(ref_block->type & SHM_TYPE_FLAG_REFCOUNTED ||
6342 ref_block->type == SHM_TYPE_LIST_BLOCK ||
6343 ref_block->type == SHM_TYPE_LIST_CHANGES ||
6344 ref_block->type == SHM_TYPE_QUEUE_CHANGES ||
6345 ref_block->type == SHM_TYPE_DICT_DELTA);
6346
6347 myassert(((puint)ref_block->type & SHM_TYPE_RELEASE_MARK) == 0); // otherwise it was released
6348
6349 return ref_block;
6350}
6351
6352void
6353shm_refcounted_block_before_release(ThreadContext *thread, ShmPointer shm_pointer, ShmRefcountedBlock *block)
6354{
6355 int obj_type = block->type & SHM_TYPE_MASK;
6356 if (obj_type == (SHM_TYPE_LIST_BLOCK & SHM_TYPE_MASK))
6357 {
6358 int id = mm_block_get_debug_id(block, shm_pointer);
6359 myassert(SHM_LIST_BLOCK_FIRST_DEBUG_ID == id || SHM_LIST_BLOCK_DEBUG_ID == id);
6360 }
6361 if (obj_type == (SHM_TYPE_LIST_INDEX & SHM_TYPE_MASK))
6362 {
6363 int id = mm_block_get_debug_id(block, shm_pointer);
6364 myassert(SHM_LIST_INDEX_DEBUG_ID == id);
6365 }
6366}
6367
6368// true if item released
6369bool
6370shm_pointer_release(ThreadContext *thread, ShmPointer pointer)
6371{
6372 ShmRefcountedBlock *block = shm_pointer_to_refcounted(thread, pointer);
6373 if (block == NULL)
6374 return false;
6375
6376 ShmInt final_cnt = p_atomic_int_dec(&block->refcount);
6377 myassert(final_cnt >= 0);
6378 if (final_cnt == 0)
6379 {
		block->type |= 0x10000000; // mark the type as released before freeing
6381 shm_refcounted_block_before_release(thread, pointer, block);
6382 free_mem(thread, pointer, block->size);
6383 }
6384 return final_cnt == 0;
6385}
6386
6387void
6388shm_refcounted_block_destroy(ThreadContext *thread, ShmPointer shm_pointer, ShmRefcountedBlock *obj)
6389{
6390 // release nested structures for containers
6391 int obj_type = obj->type & SHM_TYPE_MASK;
6392 switch (obj_type)
6393 {
6394 case SHM_TYPE_CELL & SHM_TYPE_MASK:
6395 // shm_cell_destroy(thread, (ShmCell *)obj, shm_pointer);
6396 break;
6397 case SHM_TYPE_LIST & SHM_TYPE_MASK:
6398 // shm_list_destroy(thread, (ShmList *)obj, shm_pointer);
6399 break;
6400 case SHM_TYPE_LIST_BLOCK & SHM_TYPE_MASK:
6401 // not implemented
6402 {
6403 int id = mm_block_get_debug_id(obj, shm_pointer);
6404 myassert(SHM_LIST_BLOCK_FIRST_DEBUG_ID == id || SHM_LIST_BLOCK_DEBUG_ID == id);
6405 }
6406 break;
6407 case SHM_TYPE_LIST_CELL & SHM_TYPE_MASK:
6408 myassert(false); // not refcounted
6409 break;
6410 case SHM_TYPE_LIST_INDEX & SHM_TYPE_MASK:
6411 {
6412 int id = mm_block_get_debug_id(obj, shm_pointer);
6413 myassert(SHM_LIST_INDEX_DEBUG_ID == id);
6414 }
6415 break;
6416 case SHM_TYPE_LIST_CHANGES & SHM_TYPE_MASK:
6417 break;
6418 case SHM_TYPE_QUEUE & SHM_TYPE_MASK:
6419 // not implemented
6420 break;
6421 case SHM_TYPE_QUEUE_CELL & SHM_TYPE_MASK:
6422 // ??
6423 break;
6424 case SHM_TYPE_QUEUE_CHANGES & SHM_TYPE_MASK:
6425 // ??
6426 break;
	case SHM_TYPE_DICT & SHM_TYPE_MASK:
		// not implemented; destruction disabled for now
		// shm_dict_destroy(thread, (ShmDict *)obj, shm_pointer);
		break;
	case SHM_TYPE_DICT_DELTA & SHM_TYPE_MASK:
		// ???
		break;
	case SHM_TYPE_UNDICT & SHM_TYPE_MASK:
		// destruction disabled for now
		// shm_undict_destroy(thread, (ShmUnDict *)obj, shm_pointer);
		break;
	case SHM_TYPE_UNDICT_TABLE & SHM_TYPE_MASK:
	case SHM_TYPE_UNDICT_DELTA_TABLE & SHM_TYPE_MASK:
		// Disabled: calling shm_undict_table_destroy here used to fail, see the trace below.
		/*
		#4  0x00442b8b in _myassert (condition=false, condition_msg=0x461843 "ref_block", message=0x0, file=0x461047 "shm_types.c", line=5844) at shm_types.c:49
		#5  0x004511e9 in shm_pointer_to_refcounted (thread=0xb5aa0014, pointer=1) at shm_types.c:5844
		#6  0x00451325 in shm_pointer_release (thread=0xb5aa0014, pointer=1) at shm_types.c:5864
		#7  0x00451666 in shm_pointer_empty_atomic (thread=0xb5aa0014, pointer=0xb5ba0148) at shm_types.c:6023
		#8  0x004505d2 in shm_undict_table_destroy (thread=0xb5aa0014, header=0xb5ba0110) at shm_types.c:5624
		#9  0x004514a4 in shm_refcounted_block_destroy (thread=0xb5aa0014, shm_pointer=1048848, obj=0xb5ba0110) at shm_types.c:5934
		#10 0x00456393 in free_mem (thread=0xb5aa0014, shm_pointer=1048848, size=88) at MM.c:1251
		#11 0x00451388 in shm_pointer_release (thread=0xb5aa0014, pointer=1048848) at shm_types.c:5875
		#12 0x0045161e in shm_pointer_empty (thread=0xb5aa0014, pointer=0xb5ba00c4) at shm_types.c:6014
		#13 0x004500ca in release_delta_table (thread=0xb5aa0014, dict=0xb5ba0080, invalidated=true) at shm_types.c:5525
		#14 0x00450e4b in shm_undict_commit (thread=0xb5aa0014, dict=0xb5ba0080) at shm_types.c:5782
		#15 0x0044680a in transaction_end (thread=0xb5aa0014, rollback=false) at shm_types.c:1926
		#16 0x00447750 in commit_transaction (thread=0xb5aa0014, recursion_count=0x0)
		*/
		// shm_undict_table_destroy(thread, (hash_table_header *)obj);
		break;
	case SHM_TYPE_UNDICT_INDEX & SHM_TYPE_MASK:
	case SHM_TYPE_UNDICT_DELTA_INDEX & SHM_TYPE_MASK:
		// destruction disabled for now
		// shm_undict_index_destroy(thread, (hash_table_index_header *)obj, true);
		break;
6469 }
6470}
6471
6472int
6473shm_pointer_refcount(ThreadContext *thread, ShmPointer pointer)
6474{
6475 ShmRefcountedBlock *block = shm_pointer_to_refcounted(thread, pointer);
6476
6477 return p_atomic_int_get(&block->refcount);
6478}
6479
6480LocalReference *
6481thread_local_ref(ThreadContext *thread, ShmPointer pointer)
6482{
6483 assert((thread->local_vars->count < LOCAL_REFERENCE_BLOCK_SIZE));
6484 //thread_local.local_references[thread_local.last_free_local_ref] = pointer;
6485 //thread_local.last_free_local_ref ++ ;
6486 //return &thread_local.local_references[thread_local.last_free_local_ref];
6487
6488 int idx = thread->local_vars->count;
6489 LocalReference *rslt = &(thread->local_vars->references[idx]);
6490 // Increment count only when the item is completely valid, decrement count before we invalidate the item,
6491 // so a reader from outside won't corrupt memory.
6492 rslt->shared = pointer;
6493 rslt->local = NULL;
6494 rslt->owned = FALSE;
6495 thread->local_vars->count += 1;
6496 return rslt;
6497}
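
/*
 * Usage sketch (illustrative, not part of the original source): a local
 * reference pins a shared pointer for the duration of an operation and is
 * cleared afterwards. "some_pointer" is a hypothetical ShmPointer obtained
 * elsewhere.
 *
 *     LocalReference *ref = thread_local_ref(thread, some_pointer);
 *     // ... use ref->shared while the operation runs ...
 *     thread_local_clear_ref(thread, ref);   // releases the pointer only if ref->owned was set
 */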
6498
6499void
6500thread_local_clear_ref(ThreadContext *thread, LocalReference *reference)
6501{
6502	assert((char*)&thread->local_vars->references[0] <= (char*)reference &&
6503		(char*)reference < (char*)&thread->local_vars->references[LOCAL_REFERENCE_BLOCK_SIZE]
6504	);
6505 // more strict assertion
6506 assert(thread->local_vars->count > 0);
6507 assert( (char*)reference <= (char*)&thread->local_vars->references[thread->local_vars->count - 1]
6508 );
6509 //if (thread_local.last_free_local_ref > 0 && &thread_local.local_references[thread_local.last_free_local_ref - 1] == pointer)
6510 // thread_local.last_free_local_ref--;
6511 ShmPointer ref = reference->shared;
6512 reference->local = NULL;
6513 reference->shared = EMPTY_SHM;
6514 if (reference->owned)
6515 {
6516 reference->owned = FALSE;
6517		bool rslt = shm_pointer_release(thread, ref);
6518		(void)rslt; // assert(rslt >= 0);
6519 }
6520}
6521
6522void
6523thread_local_clear_refs(ThreadContext *thread)
6524{
6525 //long tmp = thread_local.last_free_local_ref;
6526 //memset(&thread_local.local_references[0], 0, tmp * sizeof(ShmPointer));
6527 //thread_local.last_free_local_ref = 0;
6528
6529 while (thread->local_vars->count > 0)
6530 {
6531 int idx = thread->local_vars->count - 1;
6532 thread_local_clear_ref(thread, & thread->local_vars->references[idx]);
6533 thread->local_vars->count--;
6534 }
6535}
6536
6537PShmPointer
6538shm_pointer_empty(ThreadContext *thread, PShmPointer pointer)
6539{
6540 ShmPointer tmp = *pointer;
6541 *pointer = EMPTY_SHM;
6542 shm_pointer_release(thread, tmp);
6543 return pointer;
6544}
6545
6546PShmPointer
6547shm_pointer_empty_atomic(ThreadContext *thread, PShmPointer pointer)
6548{
6549 ShmPointer tmp = p_atomic_shm_pointer_get(pointer);
6550 p_atomic_shm_pointer_set(pointer, EMPTY_SHM);
6551 shm_pointer_release(thread, tmp);
6552 return pointer;
6553}
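
/*
 * Usage sketch (illustrative, not part of the original source): both helpers
 * clear a stored ShmPointer field and release its previous value; the atomic
 * variant is for fields that other threads may read concurrently. "cell" is a
 * hypothetical structure holding ShmPointer fields.
 *
 *     shm_pointer_empty(thread, &cell->private_value);        // field touched by one thread only
 *     shm_pointer_empty_atomic(thread, &cell->shared_value);  // field visible to other threads
 */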
6554
6555PShmPointer
6556shm_next_pointer_empty(ThreadContext *thread, PShmPointer pointer)
6557{
6558 ShmPointer tmp = *pointer;
6559 *pointer = NONE_SHM;
6560 shm_pointer_release(thread, tmp);
6561 return pointer;
6562}
6563
6564ShmPointer
6565shm_pointer_copy(ThreadContext *thread, PShmPointer dest, ShmPointer source)
6566{
6567 ShmPointer tmp_rel = *dest;
6568 if (SBOOL(source))
6569 shm_pointer_acq(source);
6570
6571 *dest = source;
6572
6573 if (SBOOL(tmp_rel))
6574 shm_pointer_release(thread, tmp_rel);
6575 return source;
6576}
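
/*
 * Usage sketch (illustrative, not part of the original source): shm_pointer_copy()
 * acquires the new value before releasing the old one, so overwriting a field
 * with the value it already holds never drops the last reference in between.
 * "dict->value" and "source" are hypothetical names.
 *
 *     shm_pointer_copy(thread, &dict->value, source);  // safe even when dict->value == source
 */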
6577
6578PShmPointer
6579shm_pointer_move(ThreadContext *thread, PShmPointer pointer, PShmPointer newval)
6580{
6581 ShmPointer tmp = *pointer;
6582 *pointer = *newval;
6583 *newval = EMPTY_SHM;
6584 shm_pointer_release(thread, tmp);
6585 return pointer;
6586}
6587
6588PShmPointer
6589shm_pointer_move_atomic(ThreadContext *thread, PShmPointer pointer, PShmPointer newval)
6590{
6591 ShmPointer tmp = p_atomic_shm_pointer_get(pointer);
6592 p_atomic_shm_pointer_set(pointer, p_atomic_shm_pointer_get(newval));
6593 p_atomic_shm_pointer_set(newval, EMPTY_SHM);
6594 shm_pointer_release(thread, tmp);
6595 return pointer;
6596}
6597
6598PShmPointer
6599shm_next_pointer_move(ThreadContext *thread, PShmPointer pointer, PShmPointer newval)
6600{
6601 ShmPointer tmp = *pointer;
6602 *pointer = *newval;
6603 *newval = NONE_SHM;
6604 shm_pointer_release(thread, tmp);
6605 return pointer;
6606}
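
/*
 * Usage sketch (illustrative, not part of the original source): the *_move
 * helpers transfer ownership from "newval" into "pointer" without changing the
 * refcount of the moved value; only the previous contents of "pointer" are
 * released, and "newval" is reset (EMPTY_SHM, or NONE_SHM for "next" links).
 * "node" and "tmp" are hypothetical.
 *
 *     ShmPointer tmp = freshly_allocated_value;        // caller owns one reference
 *     shm_pointer_move(thread, &node->value, &tmp);    // node->value now owns it, tmp == EMPTY_SHM
 */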
6607