Since the inception of the blocking collection, there has been a possible conflict between one thread executing Add (or TryAdd) and another executing CompleteAdding. In short, TryAdd followed this simple pseudocode:
if [not completed]
    add element
(Add, in case you don’t work with TOmniBlockingCollection on a daily basis, just calls TryAdd.)
CompleteAdding was not an iota more complicated:
mark [completed]
signal [completed]
(The reason for two ‘completed’ flags is that the first one interacts with TryAdd and the second with TryTake.)
I knew when I wrote the code that the following scenario is not only possible but very likely:
Thread 1 (TryAdd):          if [not completed]
Thread 2 (CompleteAdding):  mark [completed]
Thread 2 (CompleteAdding):  signal [completed]
Thread 1 (TryAdd):          add element
- The first thread checks whether the ‘completed’ flag has been set. It hasn’t, so the thread continues.
- The second thread marks the collection ‘completed’.
- The first thread then adds the element to the collection.
I didn’t consider that a problem, though. You see, I had forgotten that there’s a third party in the game: the reader. TryTake is complicated, but if I stick to the relevant parts, I can describe the basic reading algorithm as:
if not [take]
    wait [timeout, completed, new element]
    if completed, exit
    else repeat from beginning
(TryTake first tries to read an element from the collection and, if that fails, waits until the timeout has occurred, ‘completed’ is signaled, or a new element has appeared in the queue. Then it either retries the read or exits.)
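To make the three pieces of pseudocode concrete, here is a minimal Python model of the original algorithm, bug included. It is not OTL code: a lock-protected deque and a threading.Condition stand in for the lock-free queue and the Windows events, timeout=None models INFINITE, and the class and method names are hypothetical.

```python
import collections
import threading
import time


class NaiveBlockingCollection:
    """Model of the ORIGINAL algorithm (including its bug); not OTL code."""

    def __init__(self):
        self._items = collections.deque()
        self._lock = threading.Lock()
        self._changed = threading.Condition(self._lock)  # 'new element' / 'completed' wake-ups
        self._completed_mark = False    # checked by try_add
        self._completed_signal = False  # checked by try_take

    def try_add(self, value):
        if self._completed_mark:        # if [not completed] ...
            return False
        # ... CompleteAdding may run right here: the race described above
        with self._lock:                # add element
            self._items.append(value)
            self._changed.notify_all()
        return True

    def complete_adding(self):
        self._completed_mark = True     # mark [completed]
        with self._lock:                # signal [completed]
            self._completed_signal = True
            self._changed.notify_all()

    def try_take(self, timeout=None):
        """Returns (True, value) or (False, None); timeout=None models INFINITE."""
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._lock:
            while True:
                if self._items:                      # [take]
                    return True, self._items.popleft()
                remaining = None
                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:               # timeout
                        return False, None
                # wait [timeout, completed, new element]; like a manual-reset
                # event, an already signaled 'completed' does not block
                if not self._completed_signal:
                    self._changed.wait(remaining)
                if self._completed_signal:           # if completed, exit
                    return False, None               # ...without retrying [take]!
                # else repeat from beginning
```

Used from a single thread, the model behaves as expected; the trouble only appears when the steps of different threads interleave.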
A few days ago I found out (while developing some new functionality in the Pipeline high-level parallelization construct) that this can lead to a very strange situation: TryTake, called with a timeout of INFINITE, returns without fetching data although there is still data stored in the collection. One way to get this result is:
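Reader (TryTake):                if not [take]
Writer (TryAdd):                 if [not completed]
Another writer (CompleteAdding): mark [completed]
Another writer (CompleteAdding): signal [completed]
Reader (TryTake):                wait [timeout, completed, new element]
Reader (TryTake):                if completed, exit
Writer (TryAdd):                 add element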
- Reader tries to fetch data from the collection and fails because the collection is empty.
- Writer checks that the collection is not ‘completed’ and continues.
- Another writer marks the collection ‘completed’.
- Reader notices that the collection has been ‘completed’ and exits, although an element is about to be stored in the queue.
- A retried [take] at this point would not help, as the collection really is empty at the moment.
- Writer adds the element to the collection.
- Bang! You’re dead!
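This interleaving can be replayed deterministically, so the outcome doesn’t depend on thread timing. The Python lines below execute the steps in exactly the order listed above, using plain variables as hypothetical stand-ins for the collection’s internals.

```python
from collections import deque

# Hypothetical stand-ins for the collection's state; no real threads are used.
queue = deque()
completed_mark = False    # read by TryAdd
completed_signal = False  # read by TryTake

# Reader: if not [take]          -> the queue is empty, so the take fails
taken = queue.popleft() if queue else None
# Writer: if [not completed]     -> passes
writer_may_add = not completed_mark
# Another writer: mark [completed], signal [completed]
completed_mark = True
completed_signal = True
# Reader: wait [...] ends at once, then: if completed, exit
reader_exits = completed_signal
# Writer: add element
if writer_may_add:
    queue.append(42)

print(f"reader took: {taken}, reader exited: {reader_exits}, left in queue: {list(queue)}")
# prints: reader took: None, reader exited: True, left in queue: [42]
```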
Possible Solution
After I discovered this, I spent a few hours in swearing mode while at the same time thinking about a possible solution. Gradually I came up with a simple fix that doesn’t slow down either [Try]Add or [Try]Take much. CompleteAdding, on the other hand, may execute quite a bit slower than before, but that’s usually not a problem. The solution was to a) repeat the [take] part in TryTake after the completion is signaled and b) make sure that TryAdd and CompleteAdding cannot execute at the same time. In pseudocode:
TryAdd:
    mark [in add]
    if [not completed]
        add element
    unmark [in add]
CompleteAdding:
    while [in add] or not atomically mark [completed]
        wait
    signal [completed]
TryTake:
    if not [take]
        while not ([take] or [timeout] or [completed]) do
            wait [timeout, completed, new element]
TryAdd simply increments a counter at the beginning and decrements it on exit. The counter is incremented and decremented in an interlocked (atomic) manner, so that several threads can modify it at the same time and still get the correct result.
CompleteAdding first checks whether the counter is greater than 0. If so, it waits a little and retries, hoping that any TryAdd has completed its execution in the meantime. When the [in add] count is 0, it signals the ‘completed’ state by atomically exchanging the [in add] value of 0 for a special marker representing the ‘completed’ state (a compare-and-swap). If another TryAdd has managed to sneak in and increment [in add] first, this atomic exchange fails and CompleteAdding retries.
TryTake is less “smart” and always retries the internal [take] operation.
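Here is the fixed algorithm as a Python model. It is a sketch, not OTL code: AtomicInt emulates the interlocked counter with a lock (Python has no user-level compare-and-swap), COMPLETED_FLAG is a hypothetical marker bit packed into the same integer as the [in add] count, time.sleep(0) plays the role of the busy-wait, and all names are hypothetical.

```python
import collections
import threading
import time


class AtomicInt:
    """Stand-in for an interlocked integer; a lock provides the atomicity."""

    def __init__(self, value=0):
        self._value = value
        self._lock = threading.Lock()

    def add(self, delta):
        with self._lock:
            self._value += delta
            return self._value

    def cas(self, expected, new):
        """Compare-and-swap: store `new` only if the current value is `expected`."""
        with self._lock:
            if self._value != expected:
                return False
            self._value = new
            return True

    @property
    def value(self):
        with self._lock:
            return self._value


COMPLETED_FLAG = 1 << 30  # hypothetical 'completed' marker bit


class FixedBlockingCollection:
    """Model of the fixed algorithm; not OTL code."""

    def __init__(self):
        self._items = collections.deque()
        self._lock = threading.Lock()
        self._changed = threading.Condition(self._lock)
        self._add_count_and_completed = AtomicInt(0)
        self._completed_signal = False

    def is_completed(self):
        return (self._add_count_and_completed.value & COMPLETED_FLAG) != 0

    def try_add(self, value):
        self._add_count_and_completed.add(1)        # mark [in add]
        try:
            if self.is_completed():                 # cannot change while our count is > 0
                return False
            with self._lock:                        # add element
                self._items.append(value)
                self._changed.notify_all()
            return True
        finally:
            self._add_count_and_completed.add(-1)   # unmark [in add]

    def complete_adding(self):
        while True:
            if self.is_completed():                 # already completed
                return
            if self._add_count_and_completed.cas(0, COMPLETED_FLAG):  # no active writers
                with self._lock:                    # signal [completed]
                    self._completed_signal = True
                    self._changed.notify_all()
                return
            time.sleep(0)                           # yield, then retry

    def try_take(self, timeout=None):
        """Returns (True, value) or (False, None); timeout=None models INFINITE."""
        deadline = None if timeout is None else time.monotonic() + timeout
        with self._lock:
            while True:
                if self._items:                     # [take], retried on every pass
                    return True, self._items.popleft()
                if self._completed_signal:          # exit only after a failed [take]
                    return False, None
                remaining = None
                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:              # timeout
                        return False, None
                self._changed.wait(remaining)       # wait [timeout, completed, new element]
```

The reader-side change is the order of checks in try_take: [take] is evaluated before [completed] on every pass, so the reader gives up only after a take fails once completion has been signaled. The CAS in complete_adding guarantees that no element can arrive after that point.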
Testing
Before I fixed the code, I implemented a simple unit test that consistently failed with the old blocking collection.

procedure TestIOmniBlockingCollection.TestCompleteAdding;
var
  coll     : IOmniBlockingCollection;
  iTest    : integer;
  lastAdded: integer;
  lastRead : TOmniValue;
begin
  for iTest := 1 to 1000 do begin
    coll := TOmniBlockingCollection.Create;
    lastAdded := -1;
    lastRead := -2;
    Parallel.Join([
      procedure
      var
        i: integer;
      begin
        for i := 1 to 100000 do begin
          if not coll.TryAdd(i) then
            break;
          lastAdded := i;
        end;
      end,
      procedure
      begin
        Sleep(1);
        coll.CompleteAdding;
      end,
      procedure
      begin
        while coll.TryTake(lastRead, INFINITE) do
          ;
      end
    ]).Execute;
    if (lastAdded > 0) and (lastRead.AsInteger > 0) and (lastAdded <> lastRead.AsInteger) then
      break; //for iTest
  end;
  CheckEquals(lastAdded, lastRead.AsInteger);
end;

The base of the test is Parallel.Join, which executes three threads in parallel: the first writes data to the collection, the second calls CompleteAdding after a short wait, and the third reads the data. At the end, the last written element is compared to the last read element, and the test fails if they differ. The whole test is repeated many times because we can’t be sure that it will fail on any single run. (In fact, it fails in approximately 1% of runs.) Repeating everything 1000 times is enough to reproduce the problem consistently.
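The shape of this three-thread test carries over to other languages. Below is a Python sketch of the harness, assuming any collection object with try_add, complete_adding and a blocking try_take that returns (ok, value). To keep the block self-contained, it exercises a hypothetical LockedBlockingCollection that simply serializes everything with one mutex. That is a deliberately simpler technique than the lock-free OTL fix, used here only as a known-correct subject for the harness.

```python
import threading
import time
from collections import deque


class LockedBlockingCollection:
    """Hypothetical stand-in: one mutex serializes every operation.
    NOT the OTL fix; just a correct collection for the harness to exercise."""

    def __init__(self):
        self._items = deque()
        self._cond = threading.Condition()
        self._completed = False

    def try_add(self, value):
        with self._cond:
            if self._completed:
                return False
            self._items.append(value)
            self._cond.notify_all()
            return True

    def complete_adding(self):
        with self._cond:
            self._completed = True
            self._cond.notify_all()

    def try_take(self):
        """Blocks without a timeout (like INFINITE); returns (ok, value)."""
        with self._cond:
            while not self._items and not self._completed:
                self._cond.wait()
            if self._items:
                return True, self._items.popleft()
            return False, None


def run_complete_adding_test(make_collection, iterations=100, items=10_000):
    """Writer, completer and reader race each other; returns the number of
    iterations in which the last written and last read elements differ."""
    failures = 0
    for _ in range(iterations):
        coll = make_collection()
        last = {"added": -1, "read": -2}

        def writer():
            for i in range(1, items + 1):
                if not coll.try_add(i):
                    break
                last["added"] = i

        def completer():
            time.sleep(0.001)  # like Sleep(1) in the Delphi test
            coll.complete_adding()

        def reader():
            while True:
                ok, value = coll.try_take()
                if not ok:
                    break
                last["read"] = value

        threads = [threading.Thread(target=f) for f in (writer, completer, reader)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        if last["added"] > 0 and last["added"] != last["read"]:
            failures += 1
    return failures
```

For a correct collection, run_complete_adding_test(LockedBlockingCollection) returns 0. Unlike the Delphi test, the sketch counts failures instead of breaking out on the first one, and it also flags the case where the reader exits without having read anything while data was written.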
The Fix
After I had a working unit test and an idea about the possible fix, writing the correct code was simple. TryAdd now wraps itself in Increment/Decrement:

function TOmniBlockingCollection.TryAdd(const value: TOmniValue): boolean;
begin
  obcAddCountAndCompleted.Increment;
  try
    // IsCompleted can not change during the execution of this function
    Result := not IsCompleted;
    if Result then begin
      // throttling code, not important for our scenario
      obcCollection.Enqueue(value);
      // throttling code, not important for our scenario
    end;
  finally obcAddCountAndCompleted.Decrement; end;
end;

CompleteAdding is implemented using a busy loop:
procedure TOmniBlockingCollection.CompleteAdding;
begin
  repeat
    if IsCompleted then // CompleteAdding was already called
      Exit;
    // there must be no active writers
    if obcAddCountAndCompleted.CAS(0, CCompletedFlag) then begin
      // tell blocked readers to quit
      Win32Check(SetEvent(obcCompletedSignal));
      Exit;
    end;
    asm pause; end; // don’t use 100% of one core
  until false;
end;

IsCompleted is trivial:
function TOmniBlockingCollection.IsCompleted: boolean;
begin
  Result := (obcAddCountAndCompleted.Value AND CCompletedFlag) = CCompletedFlag;
end;
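The Delphi code packs the active-writer count and the ‘completed’ marker into one integer, obcAddCountAndCompleted. The Python lines below trace how that value evolves, assuming a hypothetical CCompletedFlag of 1 shl 30 and modelling Increment, Decrement and CAS as plain single-threaded arithmetic. They show why the marker is a flag bit: a TryAdd that starts after completion still increments the counter first, and the flag bit survives that increment.

```python
# Hypothetical value standing in for OTL's CCompletedFlag constant.
COMPLETED_FLAG = 1 << 30


def is_completed(counter):
    # mirrors IsCompleted: (value AND CCompletedFlag) = CCompletedFlag
    return (counter & COMPLETED_FLAG) == COMPLETED_FLAG


def cas(counter, expected, new):
    # single-threaded model of CAS: returns (succeeded, resulting value)
    return (True, new) if counter == expected else (False, counter)


trace = []
counter = 0
counter += 1                                   # TryAdd: Increment
ok, counter = cas(counter, 0, COMPLETED_FLAG)  # CompleteAdding while a writer is active
trace.append((ok, is_completed(counter)))      # CAS fails, CompleteAdding will retry
counter -= 1                                   # TryAdd: Decrement
ok, counter = cas(counter, 0, COMPLETED_FLAG)  # the retry succeeds
trace.append((ok, is_completed(counter)))
counter += 1                                   # a late TryAdd still increments first...
trace.append((None, is_completed(counter)))    # ...but sees the flag and gives up
counter -= 1                                   # ...and decrements on exit
print(trace, counter == COMPLETED_FLAG)
# prints: [(False, False), (True, True), (None, True)] True
```

With a plain marker value (say, -1) instead of a separate bit, that late increment would turn the marker into 0 and the collection would briefly look ‘not completed’ again.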
Any plans for making a FireMonkey version of OTL?
I don't know - if possible, then sure. I don't yet have any idea where the problems can occur.
More important for me will be the Mac OS/X and x64 support.
I don't see why OTL wouldn't work with Firemonkey? No visuals. Or have I missed something?
@gabr: FireMonkey is required to support GUI in OSX, so I take that as a yes :)
@Babnik: OTL is currently doing two things that may prevent it from being cross platform. 1. It uses Windows APIs (which have to be replaced with the appropriate OSX APIs). 2. It contains assembly code, which might be a problem - not sure.
@Lars. Ok, but that's still nothing to do with Firemonkey. Perhaps the question was 'any plans to support cross-platform compilation in Delphi XE2?' I'm not criticizing, just trying to understand what Firemonkey is. My handle is that it is a cross platform 'visual' library. OTL could or could not be cross platform, but I don't think that has anything to do with Firemonkey. I think you can write a cross platform app in Delphi XE2 without Firemonkey. Admittedly it would have no gui, but such applications exist. Correct me if I'm wrong.
There are certainly ways to get OTL cross compiled, the question though is if that would be "enough".
Mac OS X has something called "Grand Central Dispatch", which is, as its name implies, the central authority/API that deals with execution objects and parallelism. It provides queues, thread pools, etc., which work very efficiently if their concept is understood correctly.
Naively porting threads from Windows to Mac would probably work, if only "sub-optimally".
From what I gathered by talking to Mac developers, multithreading on OS/X is sub-optimal whatever you do. First OTL port will most likely target only the "it works, however bad" level.
The x64 target is more important to me and will be completed and released as soon as possible after the release of the XE2.
Sounds like a good plan.
I like this kind of article. Discussing how you found an error and what you did to correct it makes for one of my favorite reads.
This kind of problem is what always makes me think twice when using multi-threading.
Continuing the off-topic discussion about FMX: I am afraid of it... :(
But I hope you can port OTL to it. That would say a lot about how strong Firemonkey will be.
regards,
EMB
> This makes havoc when the application [excepts]
I suppose it should be "expects". Otherwise nice article!