I do like Craig's implementation of a lockless queue. It is small, neat, and working. I also like that he approached multithreading from a communication viewpoint. I do, however, have several issues with how it is integrated into the application. While I wholly understand the need for a simple demo that viewers can follow, I feel that the Delphi world is already full of such examples, which makes it hard for a newcomer to the multithreaded world to find appropriate patterns to copy.
Hence I decided to rewrite Craig's code with different objectives in mind. Instead of speed, I focused on flexibility, ease of use, and good multithreaded programming patterns. Before I jump into my solution, however, I must articulate the bad programming practices in the Lockless demo. (This is strictly subjective reasoning. Your mileage may vary. It is, alas, reasoning supported by many, many years of writing bad multithreaded code - and not yet enough years of writing good code.)
So, what are my peeves here?
- The queue Craig implemented supports only one-to-one communication. In other words, it is a single producer, single consumer queue. That is useful from time to time, but in a multithreaded world we constantly run into situations where we would like to have multiple writers and/or multiple readers. I therefore prefer a multiple producer, multiple consumer queue over a one-to-one approach, even if the latter is faster.
- To read from the queue, the worker thread constantly polls it for data. That is the equivalent of a small child asking "Are we there yet?" on repeat. I believe that this programming practice - which I absolutely hate - is a consequence of the TThread design and of the original TThread demos, which are all written as a "while not Terminated do" loop.
- The main thread also polls the queue. What's more, it does so from the OnIdle event handler. This handler fires very often when you move the mouse over the form or when you type, but it is only called once or twice per second when the mouse and keyboard are idle. It is also not called at all if the application doesn't have focus. All that makes it a bad candidate for polling: you never know when a message sent from the worker thread will actually be processed.
If you don't believe my OnIdle claims, run the OnIdle demo from my GitHub.
If you don't want to use OmniThreadLibrary, there's a way to achieve all that with standard Delphi RTL. Even more, the solution will support all platforms. The queue will not be as fast as Craig's implementation, but - as I always say - if the speed of your program depends on processing millions of messages per second, your architecture is all wrong anyway.
I will move away from a lockless solution. Writing a lockless multiple producer, multiple consumer queue is hard, as you quickly run into the ABA problem. OmniThreadLibrary nevertheless implements two such queues: TOmniBaseBoundedQueue from the OtlContainers unit implements a fixed-size queue and TOmniBaseQueue from the same unit implements a dynamically allocated queue. In this example, however, I promised to stay with the Delphi RTL, so I will not use these implementations.
My code addresses the previous complaints with three simple changes:
- TThreadedQueue<T> from System.Generics.Collections implements a multiple producer, multiple consumer queue.
- The worker thread is written so that it responds to external events and uses no CPU time when nothing is going on. In our case, "a message has arrived" is such an event, and it is implemented with TEvent from System.SyncObjs.
- Delphi also offers a good way to interrupt the main thread: TThread.Queue.
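How TThreadedQueue<T> behaves when it is full or empty depends on the push and pop timeouts passed to its constructor (both default to INFINITE). The post does not show how TMessageQueue<T> creates its internal queue; the console sketch below assumes zero timeouts, which make pushing into a full queue and popping from an empty queue fail immediately with wrTimeout instead of blocking. The program and variable names are illustrative only. Note that it uses string items: with T = Integer, the PopItem(var AQueueSize: Integer) and PopItem(var AItem: T) overloads would clash.

```delphi
program ThreadedQueueTimeouts;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs, System.Generics.Collections;

var
  Queue      : TThreadedQueue<string>;
  PushResults: array [1..3] of TWaitResult;
  PopResults : array [1..3] of TWaitResult;
  Values     : array [1..3] of string;
  i          : Integer;

begin
  // Depth 2, push timeout 0, pop timeout 0: PushItem and PopItem never block.
  Queue := TThreadedQueue<string>.Create(2, 0, 0);
  try
    for i := 1 to 3 do
      PushResults[i] := Queue.PushItem('msg' + IntToStr(i)); // third push finds the queue full
    for i := 1 to 3 do
      PopResults[i] := Queue.PopItem(Values[i]);             // third pop finds the queue empty
  finally
    Queue.Free;
  end;
  Writeln('Third push timed out: ', PushResults[3] = wrTimeout);
  Writeln('Third pop timed out: ', PopResults[3] = wrTimeout);
  Writeln('Popped: ', Values[1], ', ', Values[2]);
end.
```

With infinite timeouts the third PushItem would instead block until another thread pops an item, which is what you want for a blocking consumer but not for a loop that has to stop when the queue is drained.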
All queueing and threading logic is implemented in the CommThread unit. Most of the implementation is wrapped in the queue class TMessageQueue<T> and the base thread class TCommThread<TToThread, TToMain>.
TMessageQueue
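```delphi
uses
  System.Classes, System.SyncObjs, System.Generics.Collections;

type
  TMessageProc<T> = reference to procedure (const data: T);

  TMessageQueue<T> = class
  strict private
    FEvent   : TEvent;             // set by Send to wake up waiting threads
    FQueue   : TThreadedQueue<T>;  // thread-safe storage for the messages
    FReceiver: TMessageProc<T>;    // optional main-thread receiver
    procedure DispatchMessages;
    function  MakeCallback(const value: T): TThreadProcedure;
  public
    constructor Create(numItems: integer;
      const messageReceiver: TMessageProc<T> = nil);
    function  Receive(var value: T): boolean;
    function  Send(const value: T): boolean;
    property Event: TEvent read FEvent;
  end;
```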
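The TMessageQueue<T> constructor takes the queue size (numItems) and an optional message receiver (messageReceiver). If you provide the receiver, the queue assumes that you want to receive messages in the main thread: Send writes the data into the queue and then forwards all waiting messages to the receiver via TThread.Queue.
If you don't provide the messageReceiver parameter, the queue will assume that you want to receive messages in one or more background threads. In this mode, TMessageQueue<T> sets its Event whenever Send succeeds; background threads wait on that event and then call Receive to fetch the data.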
In both cases, anybody can call Send to write data to the queue. You can even use the queue to send messages back to yourself.
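The post shows Send but not the constructor, Receive, or the cleanup code. Purely as an illustration, here is a minimal console sketch of how the background-thread mode could be built from TThreadedQueue<T> and TEvent. The class name TMessageQueueSketch<T>, the zero push/pop timeouts, and the manual-reset event are my assumptions, not necessarily what the CommThread unit does, and the main-thread receiver path is left out.

```delphi
program MessageQueueSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs, System.Generics.Collections;

type
  // Hypothetical reconstruction of the background-thread mode only.
  TMessageQueueSketch<T> = class
  strict private
    FEvent: TEvent;
    FQueue: TThreadedQueue<T>;
  public
    constructor Create(numItems: integer);
    destructor Destroy; override;
    function Receive(var value: T): boolean;
    function Send(const value: T): boolean;
    property Event: TEvent read FEvent;
  end;

constructor TMessageQueueSketch<T>.Create(numItems: integer);
begin
  inherited Create;
  // Assumed zero timeouts: Send fails at once on a full queue and
  // Receive returns False at once on an empty one, so nothing blocks.
  FQueue := TThreadedQueue<T>.Create(numItems, 0, 0);
  // Manual-reset event, initially not set; readers call ResetEvent themselves.
  FEvent := TEvent.Create(nil, True, False, '');
end;

destructor TMessageQueueSketch<T>.Destroy;
begin
  FQueue.Free;
  FEvent.Free;
  inherited;
end;

function TMessageQueueSketch<T>.Receive(var value: T): boolean;
begin
  Result := (FQueue.PopItem(value) = wrSignaled);
end;

function TMessageQueueSketch<T>.Send(const value: T): boolean;
begin
  Result := (FQueue.PushItem(value) = wrSignaled);
  if Result then
    FEvent.SetEvent; // wake up any thread waiting on Event
end;

var
  Queue   : TMessageQueueSketch<string>;
  Reader  : TThread;
  Received: string;

begin
  Queue := TMessageQueueSketch<string>.Create(10);
  Reader := TThread.CreateAnonymousThread(
    procedure
    var
      s: string;
    begin
      // Wait (using no CPU), reset the event, then drain the queue.
      if Queue.Event.WaitFor(5000) = wrSignaled then
      begin
        Queue.Event.ResetEvent;
        while Queue.Receive(s) do
          Received := Received + s;
      end;
    end);
  Reader.FreeOnTerminate := False;
  Reader.Start;
  Queue.Send('hello');
  Reader.WaitFor;
  Reader.Free;
  Writeln(Received);
  Queue.Free;
end.
```

Because Send pushes the data before it sets the event, a reader that wakes up always finds the message already in the queue, no matter whether it started waiting before or after Send was called.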
```delphi
function TMessageQueue<T>.MakeCallback(const value: T): TThreadProcedure;
begin
  // Each call creates a new closure with its own copy of value.
  Result :=
    procedure
    begin
      FReceiver(value);
    end;
end;

procedure TMessageQueue<T>.DispatchMessages;
var
  value: T;
begin
  // Forward all waiting messages to the main thread.
  while FQueue.PopItem(value) = wrSignaled do
    TThread.Queue(nil, MakeCallback(value));
end;

function TMessageQueue<T>.Send(const value: T): boolean;
begin
  Result := (FQueue.PushItem(value) = wrSignaled);
  if Result then
  begin
    if assigned(FEvent) then
      FEvent.SetEvent;   // wake up waiting background threads
    if assigned(FReceiver) then
      DispatchMessages;  // push messages to the main thread
  end;
end;
```
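Why does DispatchMessages call a separate MakeCallback function instead of writing the anonymous procedure inline? Delphi anonymous methods capture variables, not values. An inline closure would capture the local value variable itself, and when Send runs in a background thread, TThread.Queue executes the closures later, so they could all see the same stale value. Calling a helper gives each closure its own copy. The console program below (all names are illustrative) demonstrates the difference with a plain while loop.

```delphi
program CaptureDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

type
  TIntFunc = reference to function: Integer;

var
  DirectResults, HelperResults: string;

// Each call has its own parameter, so each returned closure keeps its own number.
function MakeGetter(value: Integer): TIntFunc;
begin
  Result :=
    function: Integer
    begin
      Result := value;
    end;
end;

procedure Run;
var
  value: Integer;
  direct, viaHelper: array [0..2] of TIntFunc;
  i: Integer;
begin
  value := 0;
  while value < 3 do
  begin
    // Captures the variable "value" itself; all three closures share it.
    direct[value] :=
      function: Integer
      begin
        Result := value;
      end;
    // Captures a copy made by MakeGetter.
    viaHelper[value] := MakeGetter(value);
    Inc(value);
  end;
  // The closures run after the loop, as queued callbacks would.
  for i := 0 to 2 do
  begin
    DirectResults := DirectResults + IntToStr(direct[i]()) + ' ';
    HelperResults := HelperResults + IntToStr(viaHelper[i]()) + ' ';
  end;
end;

begin
  Run;
  Writeln('Inline closures: ', DirectResults);
  Writeln('Via helper:      ', HelperResults);
end.
```

The inline closures all report the final value of the shared variable, while the helper-made closures report 0, 1 and 2.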
To simplify writing a thread that uses two such communication channels (each channel transfers data in only one direction), I wrote a TCommThread<TToThread, TToMain> class. It has two generic parameters: TToThread specifies the type of data that is sent to the background thread and TToMain specifies the type of data that is sent to the main thread. If you want to send different types of data, you can use TValue (from System.Rtti) as the type parameter.
```delphi
type
  TCommThread<TToThread, TToMain> = class(TThread)
  strict private
    FToThread: TMessageQueue<TToThread>;
    FToMain  : TMessageQueue<TToMain>;
  protected
    // Implemented in a derived class; called once for each received message.
    procedure ProcessMessage(const data: TToThread); virtual; abstract;
    function  SendToMain(const value: TToMain): boolean;
    procedure TerminatedSet; override;
    procedure Execute; override;
  public
    constructor Create(AQueueToThread: TMessageQueue<TToThread>;
      AQueueToMain: TMessageQueue<TToMain>);
  end;
```
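```delphi
procedure TCommThread<TToThread, TToMain>.Execute;
var
  data: TToThread;
begin
  // Sleep (using no CPU) until a message arrives or Terminate is called.
  while FToThread.Event.WaitFor = wrSignaled do
  begin
    if Terminated then
      break;
    // Reset before draining, so a message sent during the drain sets the event again.
    FToThread.Event.ResetEvent;
    while FToThread.Receive(data) do
      ProcessMessage(data);
  end;
end;
```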
To be able to stop the worker without sending it a message, the code overrides the TerminatedSet method, which is called when you call TThread.Terminate. The overridden version sets the event on which Execute waits. Because of that, WaitFor in Execute stops waiting, the code sees that the Terminated flag is set, and exits.
```delphi
procedure TCommThread<TToThread, TToMain>.TerminatedSet;
begin
  inherited;
  FToThread.Event.SetEvent;
end;
```
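The TerminatedSet trick can be tried in isolation. This console sketch (the class name TWaitingThread and the Loops counter are made up for the demo) blocks a thread on a TEvent with no timeout and shows that Terminate alone is enough to wake it up and let Execute exit. TerminatedSet is a protected virtual method of TThread in recent Delphi versions; older versions don't have it.

```delphi
program TerminatedSetDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs;

type
  TWaitingThread = class(TThread)
  strict private
    FWakeUp: TEvent;
  protected
    procedure Execute; override;
    procedure TerminatedSet; override;
  public
    constructor Create;
    destructor Destroy; override;
  end;

var
  Loops: Integer; // counts wake-ups that were not caused by Terminate

constructor TWaitingThread.Create;
begin
  FWakeUp := TEvent.Create(nil, True, False, ''); // manual reset, initially not set
  inherited Create(False);
end;

destructor TWaitingThread.Destroy;
begin
  inherited;      // TThread.Destroy terminates and waits if still running
  FWakeUp.Free;   // freed only after the thread can no longer use it
end;

procedure TWaitingThread.Execute;
begin
  while FWakeUp.WaitFor(INFINITE) = wrSignaled do
  begin
    if Terminated then
      Break;
    FWakeUp.ResetEvent;
    Inc(Loops);   // a real worker would drain its message queue here
  end;
end;

procedure TWaitingThread.TerminatedSet;
begin
  inherited;
  FWakeUp.SetEvent; // wake up Execute so it can notice Terminated
end;

var
  Worker: TWaitingThread;

begin
  Worker := TWaitingThread.Create;
  try
    TThread.Sleep(100);  // the worker is now blocked in WaitFor, using no CPU
    Worker.Terminate;    // sets Terminated, then calls TerminatedSet
    Worker.WaitFor;      // returns promptly because the event was set
    Writeln('Worker stopped; other wake-ups: ', Loops);
  finally
    Worker.Free;
  end;
end.
```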
To implement a worker thread, you have to write a derived class that implements the ProcessMessage method.
```delphi
type
  TCopyThread = class(TCommThread<string, string>)
  protected
    procedure ProcessMessage(const data: string); override;
  end;
```
The implementation in the demo code just passes the data back to the main thread and reports an error if the queue is full.
```delphi
procedure TCopyThread.ProcessMessage(const data: string);
begin
  if not SendToMain('Processed: ' + data) then
    // The queue to the main thread is full; report the problem there.
    TThread.Queue(nil,
      procedure
      begin
        frmThreadedQueue.Log(Format('*** Thread failed to post message [%s]', [data]));
      end);
end;
```
To test all that, the main form creates two communication channels and multiple worker threads.
```delphi
procedure TfrmThreadedQueue.FormCreate(Sender: TObject);
var
  i: integer;
begin
  // No receiver: the workers read this queue in background threads.
  FQueueToWorkers := TMessageQueue<string>.Create(100);
  // Receiver given: messages are delivered to HandleWorkerMessage in the main thread.
  FQueueToMain := TMessageQueue<string>.Create(100, HandleWorkerMessage);
  SetLength(FWorkers, TThread.ProcessorCount);
  for i := Low(FWorkers) to High(FWorkers) do
    FWorkers[i] := TCopyThread.Create(FQueueToWorkers, FQueueToMain);
end;
```
```delphi
procedure TfrmThreadedQueue.FormDestroy(Sender: TObject);
var
  i: integer;
begin
  // Stop and destroy all workers before destroying the queues they use.
  for i := Low(FWorkers) to High(FWorkers) do
  begin
    FWorkers[i].Terminate;
    FWorkers[i].Free;
  end;
  FreeAndNil(FQueueToWorkers);
  FreeAndNil(FQueueToMain);
end;
```
A click on a button then sends multiple messages to FQueueToWorkers.
```delphi
procedure TfrmThreadedQueue.Button1Click(Sender: TObject);
var
  i: integer;
  msg: string;
begin
  ListBox1.Clear;
  msg := Edit1.Text;
  for i := 1 to SpinEdit1.Value do
    if not FQueueToWorkers.Send(msg + ' #' + i.ToString) then
      Log(Format('*** Main failed to post message [%d]', [i]));
end;
```
These messages are then processed by the workers on a first-come, first-served basis. Because there are multiple workers, the messages will arrive back at the main thread shuffled.
If you only need 1:1 communication, the CommThread unit also implements a wrapper class TSingleCommThread<TToThread, TToMain>, which internally manages its own communication channels. As the channels are not exposed to the owner, it also implements the SendToThread method, which sends a message to the thread.
```delphi
type
  TSingleCommThread<TToThread, TToMain> = class(TCommThread<TToThread, TToMain>)
  strict private
    FToThreadQueue: TMessageQueue<TToThread>;
    FToMainQueue  : TMessageQueue<TToMain>;
  public
    constructor Create(numItems: integer;
      const messageReceiver: TMessageProc<TToMain> = nil);
    destructor Destroy; override;
    function SendToThread(const value: TToThread): boolean;
  end;
```
Let's recap:
- This implementation allows multiple producers to send data to multiple workers over one communication channel. It is indeed slower than an optimized lockless solution, but this is rarely important.
- Worker threads don't spend CPU time waiting for messages. Instead, each worker thread waits on an event, which uses no CPU time.
- The main thread also doesn't spend CPU time waiting for messages. Instead, messages are pushed to it via TThread.Queue.
And the most important fact:
- Message processing is decoupled from thread logic. Thread logic is handled in the base class; you only have to derive from it and write one message-processing method. The same goes for the main thread - you only have to write one message-processing method and that is that.
Yes, a multiple producer, multiple consumer queue that is event-driven and self-contained is the solution for many threading applications. Once you learn how to express your multitasking problem in those terms, the rest is much easier.
Using TThreadedQueue brings back the horrors of the TMonitor bugs in the early generics versions, which made my implementation of a similar framework using TThreadedQueue fall flat on its face. It was finally fixed in XE2 Update 4.
Thanks for a well written article.
Hi,
Thank you for this article. Nice one.
I read Gary's article, and I was a bit confused too, mainly for two reasons:
- The same one your article points out (the usefulness of a one-to-one queue).
- The fact that the current implementation easily exposes developers to hard concurrency problems in a "bad use" case. It is *very easy* to get there.
I usually like Gary's articles a lot; I hope he'll post a more advanced implementation later.
Who is Gary?
Might be affiliated to Alice...
Man, why is this thread stuff still hard? :-/
Hello
Thank you for this article and your book (Delphi High Performance). I am using them to guide myself into parallel processing.
I am using your code to implement a queued database worker thread. When I send a query to the input queue and the output queue signals a result, I want to be sure I get the results of my query and not somebody else's. I can include an identifier to tag my results, but if I receive a result that is not mine, that result set is already popped and no one else will get a chance to examine it.
I could send the result back to the output queue if I don't want it and let the output queue re-signal, but then I run the risk of re-examining the same result repeatedly until the result I want appears.
I could possibly try to peek at the result before I pop it, but that seems to be moving away from the spirit of your code.
Do you have thoughts on how I might solve this problem?
Thanks
David
You could, for example, create one output queue per client and include it in the request data. The DB worker thread would then fetch a request, process it, and write the output to the queue referenced in the request record.
BTW, a better place for such discussions is https://en.delphipraxis.net/forum/32-omnithreadlibrary/
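The per-client output queue idea can be sketched as follows. To keep the program self-contained, it uses TThreadedQueue<T> directly instead of the post's TMessageQueue<T>, and the worker simply blocks in PopItem (which uses no CPU while waiting) instead of running the event-driven TCommThread loop. The TDbRequest record, its fields, and the query strings are hypothetical.

```delphi
program PerClientReplyQueues;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs, System.Generics.Collections;

type
  // Hypothetical request: the client tells the worker where to send the answer.
  TDbRequest = record
    Query  : string;
    ReplyTo: TThreadedQueue<string>;
  end;

var
  Requests        : TThreadedQueue<TDbRequest>;
  ClientA, ClientB: TThreadedQueue<string>;
  Worker          : TThread;
  AnswerA, AnswerB: string;

function MakeRequest(const query: string; replyTo: TThreadedQueue<string>): TDbRequest;
begin
  Result.Query := query;
  Result.ReplyTo := replyTo;
end;

begin
  Requests := TThreadedQueue<TDbRequest>.Create(100); // default timeouts: INFINITE
  ClientA := TThreadedQueue<string>.Create(10);
  ClientB := TThreadedQueue<string>.Create(10);

  Worker := TThread.CreateAnonymousThread(
    procedure
    var
      request: TDbRequest;
    begin
      // Blocks until a request arrives; returns wrAbandoned after DoShutDown.
      while Requests.PopItem(request) = wrSignaled do
        request.ReplyTo.PushItem('Result of ' + request.Query);
    end);
  Worker.FreeOnTerminate := False;
  Worker.Start;

  Requests.PushItem(MakeRequest('query A', ClientA));
  Requests.PushItem(MakeRequest('query B', ClientB));

  // Each client reads only its own queue, so it never sees somebody else's result.
  AnswerA := ClientA.PopItem;
  AnswerB := ClientB.PopItem;
  Writeln(AnswerA);
  Writeln(AnswerB);

  Requests.DoShutDown; // wakes the worker so its loop can end
  Worker.WaitFor;
  Worker.Free;
  Requests.Free;
  ClientA.Free;
  ClientB.Free;
end.
```

Because each request carries its own reply queue, a client never has to pop, inspect, and push back somebody else's result.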