Some history required …
First there was a good idea with a somewhat patchy implementation:
Three steps to the blocking collection: [2] Dynamically allocated queue.
Then there was a partial solution, which depended on my being able to solve another problem. Still, it was a good solution:
Releasing queue memory without the MREW lock.
In the end, the final (actually, the original) problem was also solved:
Bypassing the ABA problem.
And now to the results …
This article describes a lock-free, (nearly) O(1) insert/remove, dynamically allocated queue that doesn’t require a garbage collector. It can be implemented on any hardware that supports an 8-byte compare-and-swap operation (in the Intel world, that means at least a Pentium). The code uses an 8-byte atomic move in some places, but those can easily be changed into an 8-byte CAS if the platform doesn’t support such an operation. In the current implementation, the Move64 (8-byte move) function uses SSE2 instructions and therefore requires a Pentium 4. The code can, however, be conditionally compiled with CAS64 instead of Move64, enabling it to run on Pentium 1 to 3 processors. (See the notes in the code for more information.) The code requires a memory manager that allows memory to be released in a different thread from the one where it was allocated. [Obviously, Windows on the Intel platform satisfies all these conditions.]
Although the dynamic queue has been designed with the OmniThreadLibrary (OTL for short) in mind, there’s also a small sample implementation that doesn’t depend on the OTL: GpLockFreeQueue.pas. This implementation can store only int64 elements (or anything you can cast into 8 bytes), while the OTL implementation from OtlContainers stores TOmniValue data. [The latter is a kind of variant record used inside the OTL to store “anything” from a byte to a string/wide string/object/interface.] Because of that, the GpLockFreeQueue implementation is smaller and faster, but slightly more limited. Both are released under the BSD license.
Memory layout
Data is stored in slots. Each slot uses 16 bytes and contains a byte-sized tag, a word-sized offset, and up to 13 bytes of data. The implementation in OtlContainers uses all of those 13 bytes to store a TOmniValue, while the implementation in GpLockFreeQueue uses only 8 bytes and keeps the rest unused.
The following notation is used to represent a slot: [tag|offset|value].
In reality, the value field comes first in the record because it must be 4-aligned; the reason for that will be revealed in a moment. In GpLockFreeQueue, a slot is defined as:
TGpLFQueueTaggedValue = packed record
  Value   : int64;
  Tag     : TGpLFQueueTag;
  Offset  : word;
  Stuffing: array [1..5] of byte;
end;
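The tag type itself is not listed in this article. Judging from the tag names used in the code excerpts below (tagFree, tagAllocating and friends), it is an enumeration along these lines; this is a sketch, and the exact order of values in GpLockFreeQueue may differ:

TGpLFQueueTag = (tagFree, tagAllocating, tagAllocated, tagRemoving,
  tagEndOfList, tagExtending, tagBlockPointer, tagDestroying,
  tagHeader, tagSentinel);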
Slots do not stand by themselves; they are allocated in blocks. The default block size is 64 KB (4096 slots of 16 bytes), but it can be varied from 64 bytes (four slots) to 1 MB (65536 slots). In this article, I’ll be using 5-slot blocks, as they are big enough to demonstrate all the nooks and crannies of the algorithm and small enough to fit in one line of text.
During the allocation, each block is formatted as follows:
[Header|0|4] [Sentinel|1|0] [Free|2|0] [Free|3|0] [EndOfList|4|0]
The first slot is marked as a Header and has the value field initialized to “number of slots in the block minus one”. [The highest value that can be stored in the header’s value field is 65535; therefore the maximum number of slots in a block is 65536.] This value is atomically decremented each time a slot is dequeued; when it drops to zero, the block can be released. (More on that in: Releasing queue memory without the MREW lock.) InterlockedDecrement, which is used to decrement this value, requires its argument to be 4-aligned, and that’s the reason the value field is stored first in the slot.
The second slot is a Sentinel. Slots from the third onwards are tagged Free and are used to store data. The last slot is tagged EndOfList and is used to link two blocks. All slots have the offset field initialized to the sequence number of the slot: in the Header this value is 0, in the Sentinel 1, and so on, up to the EndOfList, where it is set to 4 (the number of slots in the block minus one). This value is used in Dequeue to calculate the address of the Header slot just before the Header’s value is decremented.
In addition to dynamically allocated (and released) memory blocks, the queue uses head and tail tagged pointers. Both are 8-byte values consisting of two 4-byte fields: slot and tag. The following notation is used to represent a tagged pointer: [slot|tag].
The slot field contains the address of the current head/tail slot, while the tag field contains the tag of the current slot. The motivation behind this scheme is explained in the Bypassing the ABA problem post.
Tail and head pointers are modified using 8-byte CAS and move operations and must therefore be 8-aligned.
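A sketch of how such a tagged pointer could be declared follows. The code below accesses fields named Slot and Tag; the padding field is my assumption, and it must stay zeroed so that the whole high dword equals Ord(Tag), which is what the 8-byte CAS compares against:

TGpLFQueueTaggedPointer = packed record
  Slot    : PGpLFQueueTaggedValue; // low 4 bytes: address of the current slot
  Tag     : TGpLFQueueTag;         // tag of the current slot
  Stuffing: array [1..3] of byte;  // pad to 8 bytes; always zero
end; // instances must be 8-aligned for CAS64/Move64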
By putting all that together, we get a snapshot of the queue state. This is the initial state of a queue with five-slot blocks:
Head:[B1:2|Free]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] H:[Free|2|0] [Free|3|0] [EndOfList|4|0]
The memory block begins at address B1 and contains five slots, initialized as described before. The tail pointer points to the second slot of block B1 (B1:1; I’m using the form address:offset), which is tagged Sentinel, and the head pointer points to the third slot (B1:2), the first Free slot. Here we see the sole reason for the Sentinel: it stands between the tail and the head when the queue is empty.
Enqueue
In theory, the enqueue operation is simple: the element is stored in the next available slot and the queue head is advanced. In practice, however, multithreading makes things much more complicated.
To prevent thread conflicts, each enqueueing thread must first take ownership of the head. It does this by swapping the queue head tag from Free to Allocating, or from EndOfList to Extending. To prevent ABA problems, the head pointer and head tag are swapped together (with the same head pointer and the new tag) in one atomic 8-byte compare-and-swap. Enqueue then does its work and at the end swaps (head pointer, tag) to (next head pointer, Free|EndOfList), which allows other threads to proceed with their enqueue operations.
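The CAS64 primitive used for this is not listed in the article. The introduction notes that an 8-byte CAS is required, which on Intel means the cmpxchg8b instruction (Pentium and later); below is a portable sketch of what CAS64 has to do, assuming a Delphi/Windows combination that exposes InterlockedCompareExchange64 (the parameter names are mine, matched to the calls in the code below):

// Requires the Windows unit (InterlockedCompareExchange64) and SysUtils (Int64Rec).
function CAS64(oldSlot: pointer; oldTag: cardinal;
  newSlot: pointer; newTag: cardinal; var destination): boolean;
var
  comparand, exchange: int64;
begin
  Int64Rec(comparand).Lo := cardinal(oldSlot); // slot lives in the low dword
  Int64Rec(comparand).Hi := oldTag;            // tag lives in the high dword
  Int64Rec(exchange).Lo := cardinal(newSlot);
  Int64Rec(exchange).Hi := newTag;
  // Store 'exchange' into destination only if it still holds 'comparand'.
  Result := InterlockedCompareExchange64(PInt64(@destination)^, exchange, comparand) = comparand;
end;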
Let’s start with the empty list.
Head:[B1:2|Free]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] H:[Free|2|0] [Free|3|0] [EndOfList|4|0]
Enqueue first swaps [B1:2|Free] with [B1:2|Allocating].
Head:[B1:2|Allocating]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] H:[Free|2|0] [Free|3|0] [EndOfList|4|0]
The green colour indicates an atomic change. Only the head tag has changed; the data in the B1 memory block is not modified. Head still points to a slot tagged Free (slot B1:2). This is fine, as enqueueing threads take no interest in this tag at all.
Data is then stored in the slot and its tag is changed to Allocated. Again, this makes no difference to enqueuers, as the head pointer has not been updated yet. It also doesn’t allow a Dequeue on this slot to proceed, because the head is adjacent to the tail, which points to a Sentinel; in that case Dequeue treats the queue as empty (as we’ll see later).
Head:[B1:2|Allocating]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] H:[Allocated|2|42] [Free|3|0] [EndOfList|4|0]
Red colour marks “unsafe” modification.
At the end, the head is unlocked by storing the address of the next slot (the first free slot, B1:3) and the next slot’s tag (Free).
Head:[B1:3|Free]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] H:[Free|3|0] [EndOfList|4|0]
Teal colour marks an atomic 8-byte move used to store the new data into the head pointer. If the target platform doesn’t support such a move, an 8-byte CAS can be used instead.
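For illustration, here is a Move64 sketch using SSE2, as mentioned in the introduction. The exact instruction sequence is my guess; the essential point is that a movq store to 8-aligned memory executes as a single atomic 8-byte write on SSE2-capable processors:

procedure Move64(newSlot: pointer; newTag: cardinal; var destination);
asm
  // Delphi register convention: EAX = newSlot, EDX = newTag, ECX = @destination.
  movd      xmm0, eax             // low dword  := slot address
  movd      xmm1, edx
  punpckldq xmm0, xmm1            // high dword := tag
  movq      qword ptr [ecx], xmm0 // one atomic 8-byte store (destination is 8-aligned)
end;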
After those changes, the head points to the next free slot and the data is stored in the queue.
Let’s assume that another Enqueue is called and stores number 17 in the queue. Nothing new happens here.
Head:[B1:4|EndOfList]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
The next Enqueue must do something new, as there are no free slots in the current block. To extend the queue, the thread first swaps the EndOfList tag with the Extending tag. By doing this, the thread takes ownership of the queue head.
Head:[B1:4|Extending]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
A new block is allocated and initialized (see the section on memory management, below).
Head:[B1:4|Extending]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
B2:[Header|0|4] [Sentinel|1|0] [Free|2|0] [Free|3|0] [EndOfList|4|0]
Data is stored in the first free slot of block B2.
Head:[B1:4|Extending]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] [Free|3|0] [EndOfList|4|0]
The last slot of block B1 is modified to point to the second slot of the next block (the Sentinel, B2:1), and the BlockPointer tag is stored into that slot.
Head:[B1:4|Extending]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[BlockPointer|4|B2:1]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] [Free|3|0] [EndOfList|4|0]
At the end, the head is updated to point to the first free slot (B2:3).
Head:[B2:3|Free]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] [BlockPointer|4|B2:1]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]
That completes the Enqueue. The list head is now unlocked.
The actual code is no more complicated than this description (code taken from GpLockFreeQueue):
procedure TGpLockFreeQueue.Enqueue(const value: int64);
var
  extension: PGpLFQueueTaggedValue;
  next     : PGpLFQueueTaggedValue;
  head     : PGpLFQueueTaggedValue;
begin
  repeat // take ownership of the head
    head := obcHeadPointer.Slot;
    if (obcHeadPointer.Tag = tagFree)
       and CAS64(head, Ord(tagFree), head, Ord(tagAllocating), obcHeadPointer^)
    then
      break
    else if (obcHeadPointer.Tag = tagEndOfList)
       and CAS64(head, Ord(tagEndOfList), head, Ord(tagExtending), obcHeadPointer^)
    then
      break
    else
      asm pause; end;
  until false;
  if obcHeadPointer.Tag = tagAllocating then begin // a free slot was available
    next := NextSlot(head);
    head.Value := value;
    head.Tag := tagAllocated;
    Move64(next, Ord(next.Tag), obcHeadPointer^); // unlock the head
  end
  else begin // tagExtending: the block is full, link a new one
    extension := AllocateBlock;
    Inc(extension, 2);               // skip the Header and Sentinel slots
    extension.Tag := tagAllocated;
    extension.Value := value;
    Dec(extension);                  // point at the new block's Sentinel
    head.Value := int64(extension);
    head.Tag := tagBlockPointer;
    Inc(extension, 2);               // point at the first Free slot
    Move64(extension, Ord(extension.Tag), obcHeadPointer^); // unlock the head
    PreallocateMemory;
  end;
end;
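NextSlot, used above, is not listed in the article; it is presumably nothing more than typed pointer arithmetic, along these lines:

function TGpLockFreeQueue.NextSlot(slot: PGpLFQueueTaggedValue): PGpLFQueueTaggedValue;
begin
  Result := slot;
  Inc(Result); // advances by SizeOf(TGpLFQueueTaggedValue), i.e. one 16-byte slot
end;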
Dequeue
Enqueue is simple, but Dequeue is a whole new bag of problems. It has to handle the Sentinel slot, and because of that there are five possible scenarios:
- Skip the sentinel.
- Read the data (tail doesn’t catch the head).
- Read the data (tail does catch the head).
- The queue is empty.
- Follow the BlockPointer tag.
To prevent thread conflicts, a dequeueing thread takes ownership of the tail. It does this by swapping the tail tag from Allocated or Sentinel to Removing, or from BlockPointer to Destroying. Again, those changes are done atomically, by swapping the tail pointer and tail tag in one go.
Let’s walk through all five scenarios now.
1 – Skip the sentinel
Let’s start with a queue state where two slots are allocated and the head points to the EndOfList slot.
Head:[B1:4|EndOfList]
Tail:[B1:1|Sentinel]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
The code first locks the tail.
Head:[B1:4|EndOfList]
Tail:[B1:1|Removing]
B1:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
As there is no data in the Sentinel slot, the tail is immediately updated to point to the next slot.
Head:[B1:4|EndOfList]
Tail:[B1:2|Allocated]
B1:[Header|0|4] [Sentinel|1|0] T:[Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
There’s no need to update the tag in slot 1, as no other thread can reach it again. Because the slot is now unreachable, the code decrements the count in B1’s Header slot (from 4 to 3).
Head:[B1:4|EndOfList]
Tail:[B1:2|Allocated]
B1:[Header|0|3] [Sentinel|1|0] T:[Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
Because the original tag was Sentinel, the code immediately retries from the beginning. The queue is now in scenario 2 (there is data, and the tail is not immediately before the head).
2 - Read the data (tail doesn’t catch the head)
Again, the tail is locked.
Head:[B1:4|EndOfList]
Tail:[B1:2|Removing]
B1:[Header|0|3] [Sentinel|1|0] T:[Allocated|2|42] [Allocated|3|17] H:[EndOfList|4|0]
The code then reads the value from the slot (42) and advances the tail to slot B1:3.
Head:[B1:4|EndOfList]
Tail:[B1:3|Allocated]
B1:[Header|0|3] [Sentinel|1|0] [Allocated|2|42] T:[Allocated|3|17] H:[EndOfList|4|0]
Again, there is no need to change the slot tag. Slot 2 is now unreachable, and the Header count is decremented.
Head:[B1:4|EndOfList]
Tail:[B1:3|Allocated]
B1:[Header|0|2] [Sentinel|1|0] [Allocated|2|42] T:[Allocated|3|17] H:[EndOfList|4|0]
The code has retrieved the data and can now return from the Dequeue method.
3 - Read the data (tail does catch the head)
If Dequeue is now called for a second time, we have scenario 3: there is data in the queue, but the head pointer is next to the tail pointer. Because of that, the tail cannot be incremented. Instead, the code replaces the tail slot tag with a Sentinel.
It is entirely possible that the head will change the very next moment, which would mean the Sentinel wasn’t really needed. Luckily, that doesn’t hurt much: the next Dequeue will skip the Sentinel, retry, and fetch the next element from the queue.
The code starts in a well-known manner, by taking ownership of the tail.
Head:[B1:4|EndOfList]
Tail:[B1:3|Removing]
B1:[Header|0|2] [Sentinel|1|0] [Allocated|2|42] T:[Allocated|3|17] H:[EndOfList|4|0]
The code then reads the value from the slot, but because the head was next to the tail when Dequeue was called, the code doesn’t increment the tail and doesn’t decrement the Header counter. Instead, the Sentinel tag is put into the tail tag.
Head:[B1:4|EndOfList]
Tail:[B1:3|Sentinel]
B1:[Header|0|2] [Sentinel|1|0] [Allocated|2|42] T:[Allocated|3|17] H:[EndOfList|4|0]
It doesn’t matter that the slot tag is still Allocated, as no one will read it again.
4 - The queue is empty
If Dequeue were called now, it would return immediately with status empty, because the tail tag is Sentinel and the tail has caught the head.
5 - Follow the BlockPointer tag
In the last scenario, the tail is pointing to a BlockPointer.
Head:[B2:3|Free]
Tail:[B1:4|EndOfList]
B1:[Header|0|1] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] T:[BlockPointer|4|B2:1]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]
As expected, the code first takes ownership of the tail.
Head:[B2:3|Free]
Tail:[B1:4|Destroying]
B1:[Header|0|1] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] T:[BlockPointer|4|B2:1]
B2:[Header|0|4] [Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]
We know that the first slot in the next block is a Sentinel. We also know that the head is not pointing to that slot, because that’s how Enqueue works (when a new block is allocated, the head is set to point past the Sentinel). Therefore, it is safe to update the tail to point to the Sentinel slot of block B2.
Head:[B2:3|Free]
Tail:[B2:1|Sentinel]
B1:[Header|0|1] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] [BlockPointer|4|B2:1]
B2:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]
By doing the swap, the ownership of the tail is released.
The Header count is then decremented.
Head:[B2:3|Free]
Tail:[B2:1|Sentinel]
B1:[Header|0|0] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] [BlockPointer|4|B2:1]
B2:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]
Because the count is now zero, the code destroys block B1. (Note that the Header count decrement is atomic, so only one thread can actually reach zero.) While the block is being destroyed, other threads may be calling Dequeue.
Head:[B2:3|Free]
Tail:[B2:1|Sentinel]
B1:[Header|0|0] [Sentinel|1|0] [Allocated|2|42] [Allocated|3|17] [BlockPointer|4|B2:1] (being released)
B2:[Header|0|4] T:[Sentinel|1|0] [Allocated|2|57] H:[Free|3|0] [EndOfList|4|0]
Because the tail tag was originally BlockPointer, the code retries immediately and continues with scenario 1.
The actual code is tricky, because some code paths are shared between scenarios (code taken from GpLockFreeQueue):
function TGpLockFreeQueue.Dequeue(var value: int64): boolean;
var
  caughtTheHead: boolean;
  tail         : PGpLFQueueTaggedValue;
  header       : PGpLFQueueTaggedValue;
  next         : PGpLFQueueTaggedValue;
  tag          : TGpLFQueueTag;
begin
  tag := tagSentinel;
  Result := true;
  while Result and (tag = tagSentinel) do begin
    repeat // take ownership of the tail
      tail := obcTailPointer.Slot;
      caughtTheHead := NextSlot(obcTailPointer.Slot) = obcHeadPointer.Slot;
      if (obcTailPointer.Tag = tagAllocated)
         and CAS64(tail, Ord(tagAllocated), tail, Ord(tagRemoving), obcTailPointer^) then
      begin
        tag := tagAllocated;
        break;
      end
      else if (obcTailPointer.Tag = tagSentinel) then begin
        if caughtTheHead then begin // scenario 4: the queue is empty
          Result := false;
          break;
        end
        else if CAS64(tail, Ord(tagSentinel), tail, Ord(tagRemoving), obcTailPointer^) then begin
          tag := tagSentinel;
          break;
        end
      end
      else if (obcTailPointer.Tag = tagBlockPointer)
         and CAS64(tail, Ord(tagBlockPointer), tail, Ord(tagDestroying), obcTailPointer^) then
      begin
        tag := tagBlockPointer;
        break;
      end
      else
        asm pause; end;
    until false;
    if Result then begin
      header := tail;
      Dec(header, header.Offset); // walk back Offset slots to the block's Header
      if tag in [tagSentinel, tagAllocated] then begin
        next := NextSlot(tail);
        if tag = tagAllocated then // scenarios 2, 3: fetch the data
          value := tail.Value;
        if caughtTheHead then begin
          Move64(tail, Ord(tagSentinel), obcTailPointer^); // scenario 3: retag the slot as Sentinel
          header := nil;                                   // do not decrement the Header count
        end
        else
          Move64(next, Ord(next.Tag), obcTailPointer^);    // scenarios 1, 2: advance the tail
      end
      else begin // scenario 5: follow the BlockPointer to the next block
        next := PGpLFQueueTaggedValue(tail.Value);
        Move64(next, Ord(tagSentinel), obcTailPointer^);
        tag := tagSentinel; // forces a retry
      end;
      if assigned(header) and (InterlockedDecrement(PInteger(header)^) = 0) then
        ReleaseBlock(header);
    end;
  end;
end;
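To put both halves together, here is a minimal single-threaded usage sketch. It assumes a parameterless constructor; the real constructor may accept the block size and other options:

var
  queue: TGpLockFreeQueue;
  value: int64;
begin
  queue := TGpLockFreeQueue.Create;
  try
    queue.Enqueue(42);
    queue.Enqueue(17);
    while queue.Dequeue(value) do // returns False once the queue is empty
      Writeln(value);             // prints 42, then 17 (FIFO order)
  finally
    queue.Free;
  end;
end;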
Memory management
In the dynamic queue described above, special consideration goes to memory allocation and deallocation, because most of the time that is the slowest part of the enqueue/dequeue.
Memory is always released after the queue tail is unlocked. That way, other threads may dequeue from the same queue while the releasing thread frees the memory.
Allocation is trickier, because Enqueue only discovers that it needs memory after the head is locked. The trick here is to use one preallocated memory block, which is reused inside Enqueue; this is much faster than calling the allocator. After the head is unlocked, Enqueue preallocates the next block of memory. This slows down the current thread, but does not block other threads from enqueueing into the same queue.
Dequeue also tries to help with that: if the preallocated block is not present when a block must be released, Dequeue stores the released block away for the next Enqueue to use.
Also, one such block is preallocated when the queue is initially created.
If this explanation is unclear, look at the program flow below. It describes the code flow through an Enqueue that has to allocate a memory block and through a Dequeue that has to release one. Identifiers in parentheses refer to the methods listed below.
Enqueue:
- lock the head
- detect EndOfList
- use the cached block if available, otherwise allocate a new block (AllocateBlock)
- unlock the head
- if there is no cached block, allocate new block and store it away (PreallocateMemory)
Dequeue:
- lock the tail
- process last slot in the block
- unlock the tail
- decrement the header count
- if the header count has dropped to zero:
  - if there is no cached block, store this one away (ReleaseBlock)
  - otherwise, release the block
All manipulations of the cached block are done atomically. All allocations are optimistic: if the preallocated block is missing, a new memory block is allocated and partitioned, and only then does the code try to swap it into the preallocated-block variable. If the compare-and-swap fails at that point, another thread went through the same routine just slightly faster, and the allocated (and partitioned) block is thrown away. It may look like quite some work is done in vain, but in reality the preallocated block is rarely thrown away.
I tested other, more complicated schemes (a small four-slot stack, for example), but they invariably behaved worse than this simple approach.
function TGpLockFreeQueue.AllocateBlock: PGpLFQueueTaggedValue;
var
  cached: PGpLFQueueTaggedValue;
begin
  cached := obcCachedBlock;
  if assigned(cached) and CAS32(cached, nil, obcCachedBlock) then
    Result := cached
  else begin
    Result := AllocMem(obcBlockSize);
    PartitionMemory(Result);
  end;
end;
procedure TGpLockFreeQueue.PreallocateMemory;
var
  memory: PGpLFQueueTaggedValue;
begin
  if not assigned(obcCachedBlock) then begin
    memory := AllocMem(obcBlockSize);
    PartitionMemory(memory);
    if not CAS32(nil, memory, obcCachedBlock) then
      FreeMem(memory);
  end;
end;
procedure TGpLockFreeQueue.ReleaseBlock(firstSlot: PGpLFQueueTaggedValue; forceFree: boolean);
begin
  // forceFree is presumably declared with a default of False in the class
  // interface, which is why Dequeue can call ReleaseBlock with one parameter.
  if forceFree or assigned(obcCachedBlock) then
    FreeMem(firstSlot)
  else begin
    ZeroMemory(firstSlot, obcBlockSize);
    PartitionMemory(firstSlot);
    if not CAS32(nil, firstSlot, obcCachedBlock) then
      FreeMem(firstSlot);
  end;
end;
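CAS32, the 4-byte counterpart of CAS64, is likewise not listed in this article; presumably it is a lock cmpxchg instruction. A portable sketch of what it has to do, assuming InterlockedCompareExchangePointer is available (parameter order matched to the calls above: expected old value, new value, destination):

function CAS32(oldValue, newValue: pointer; var destination): boolean;
begin
  // Swap destination to newValue only if it still holds oldValue.
  Result := InterlockedCompareExchangePointer(PPointer(@destination)^, newValue, oldValue) = oldValue;
end;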
As you can see in the code fragments above, memory is initialized (formatted into slots) at the time it is allocated. This also helps with the general performance.
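PartitionMemory itself is not listed here. Given the block layout described earlier, it must do something along these lines (a sketch; obcNumSlots is an assumed field holding the number of slots per block, and AllocMem/ZeroMemory have already zeroed the memory):

procedure TGpLockFreeQueue.PartitionMemory(memory: PGpLFQueueTaggedValue);
var
  iSlot: integer;
begin
  memory.Tag := tagHeader;
  memory.Value := obcNumSlots - 1; // counts slots still to be retired
  Inc(memory);
  memory.Tag := tagSentinel;       // second slot: the sentinel
  memory.Offset := 1;
  Inc(memory);
  for iSlot := 2 to obcNumSlots - 2 do begin
    memory.Tag := tagFree;         // data slots
    memory.Offset := iSlot;
    Inc(memory);
  end;
  memory.Tag := tagEndOfList;      // last slot: will link to the next block
  memory.Offset := obcNumSlots - 1;
end;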
Performance
Tests were again performed using the 32_Queue project in the Tests branch of the OTL tree.
The test framework sets up the following data path:
source queue –> N threads –> channel queue –> M threads –> destination queue
The source queue is filled with numbers from 1 to 1,000,000. Then 1 to 8 threads are set up to read from the source queue and write into the channel queue, and another 1 to 8 threads are set up to read from the channel queue and write to the destination queue. The application then starts the clock and starts all threads. When all numbers have been moved to the destination queue, the clock is stopped and the contents of the destination queue are verified. Thread creation time is not included in the measured time.
All in all, this results in two million reads and two million writes distributed over three queues. The tests are very brutal, as all threads just hammer on the queues, doing nothing else. The table below contains the average, minimum, and maximum times of 5 runs on a 2.67 GHz computer with two 4-core CPUs. Data from the current implementation (“new code”) is compared to the original implementation (“old code”). Best times are marked green.
New code

|              | average [min–max], all data in milliseconds | millions of queue operations per second |
| N = 1, M = 1 | 590 [559–682]    | 6.78 |
| N = 2, M = 2 | 838 [758–910]    | 4.77 |
| N = 3, M = 3 | 1095 [1054–1173] | 3.65 |
| N = 4, M = 4 | 1439 [1294–1535] | 2.78 |
| N = 8, M = 8 | 1674 [1303–2217] | 2.39 |
| N = 1, M = 7 | 1619 [1528–1822] | 2.47 |
| N = 7, M = 1 | 1525 [1262–1724] | 2.62 |
Old code

|              | average [min–max], all data in milliseconds | millions of queue operations per second |
| N = 1, M = 1 | 707 [566–834]    | 5.66 |
| N = 2, M = 2 | 996 [950–1031]   | 4.02 |
| N = 3, M = 3 | 1065 [1055–1074] | 3.76 |
| N = 4, M = 4 | 1313 [1247–1358] | 3.04 |
| N = 8, M = 8 | 1520 [1482–1574] | 2.63 |
| N = 1, M = 7 | 3880 [3559–4152] | 1.03 |
| N = 7, M = 1 | 1314 [1299–1358] | 3.04 |
The new implementation is faster when fewer threads are used and slightly slower when the number of threads increases. The best thing is that there is no weird speed drop in the N = 1, M = 7 case. The small slowdown with a higher number of threads doesn’t bother me much, as this test case really stresses the queue. In all practical applications there should be much more code doing real work, and the queue load would rapidly drop.
If your code depends on accessing a shared queue from multiple threads that enqueue/dequeue most of the time, there’s a simple solution: change the code! I believe that multithreaded code should not fight over each piece of data, but cooperate. A possible solution is to split the data into packets and schedule the packets to the shared queue; each thread then dequeues one packet and processes all the data stored within it, as sketched below.
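A hypothetical sketch of that packet idea follows; TDataPacket, its size, and ProcessItem are made up for illustration:

const
  CPacketSize = 256;
type
  PDataPacket = ^TDataPacket;
  TDataPacket = record
    Count: integer;
    Data : array [0..CPacketSize - 1] of int64;
  end;

// Producer: fill a packet, then enqueue a single pointer instead of many values.
procedure EnqueuePacket(queue: TGpLockFreeQueue; const items: array of int64);
var
  packet: PDataPacket;
  i     : integer;
begin
  Assert(Length(items) <= CPacketSize);
  New(packet);
  packet.Count := Length(items);
  for i := 0 to packet.Count - 1 do
    packet.Data[i] := items[i];
  queue.Enqueue(int64(packet)); // the queue stores anything castable into 8 bytes
end;

// Consumer: one dequeue yields a whole batch of work.
procedure ProcessPackets(queue: TGpLockFreeQueue);
var
  value : int64;
  packet: PDataPacket;
  i     : integer;
begin
  while queue.Dequeue(value) do begin
    packet := PDataPacket(value);
    for i := 0 to packet.Count - 1 do
      ProcessItem(packet.Data[i]); // ProcessItem: your worker routine (hypothetical)
    Dispose(packet);
  end;
end;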
Wrapup
The code will be released in OmniThreadLibrary 1.5 (but you can use it already if you fetch the HEAD from the SVN). It has passed a very rigorous stress test and I believe it is working. If you find any problems, please let me know. I’m also interested in any ports to different languages (a C version would be nice).