Two separate topics are covered in this example:
- Sending data from any thread (main or background) to a TThread-based worker.
- Sending data from a TThread-based worker to a form.
1. Sending data from multiple producers to a single worker
To send data form a form to a thread, we need a message queue. This example uses a TOmniMessageQueue object for that purpose. An instance of this object is created in the main thread. All threads – the main thread, the worker threads, and possible other data-producing threads – use the same shared object which is written with thread-safety in mind.
1.a Initialization and cleanup
The TOmniMessageQueue constructor takes a maximum queue size for a parameter. TWorker is just a simple TThread descendant which accepts the instance of the message queue as a parameter so it can read from the queue.
FCommandQueue := TOmniMessageQueue.Create(1000);FWorker := TWorker.Create(FCommandQueue);
The shutdown sequence is fairly standard. Stop is used instead of Terminate so it can set internal event which is used to signal the thread to stop.
if assigned(FWorker) then begin FWorker.Stop;
FWorker.WaitFor;
FreeAndNil(FWorker);end;FreeAndNil(FCommandQueue);
1.b Sending data to the worker
To put some data into a queue, use its Enqueue method. It accepts a TOmniMessage record. Each TOmniMessage contains an integer message ID (not used in this example) and a TOmniValue data which, in turn, can hold any data type.
procedure TfrmTThreadComm.Query(value: integer);begin if not FCommandQueue.Enqueue(TOmniMessage.Create(0 {ignored}, value)) then raise Exception.Create('Command queue is full!');end;
Enqueue returns False if the queue is full. (A TOmniMessageQueue can only hold as much elements as specified in the constructor call.)
The example also shows how everything works correctly if two threads are started at the same time and both write to the message queue.
var th1: TThread; th2: TThread;begin th1 := TThread.CreateAnonymousThread( procedure begin Query(Random(1000)); end); th2 := TThread.CreateAnonymousThread( procedure begin Query(Random(1000)); end); th1.Start; th2.Start;end;
1.c Receiving the data
The worker’s Execute method waits on two handles in a loop. If a FStopEvent (an internal event) is signalled, the loop will exit. If the message queue’s GetNewMessageEvent (a THandle-returning method) gets signalled, a new data has arrived to the queue. In that case, the code loops to empty the message queue and then waits again for something to happen.
procedure TWorker.Execute;var handles: array [0..1] of THandle; msg : TOmniMessage;begin handles[0] := FStopEvent.Handle; handles[1] := FCommandQueue.GetNewMessageEvent; while WaitForMultipleObjects(2, @handles, false, INFINITE) =
(WAIT_OBJECT_0 + 1) do
begin while FCommandQueue.TryDequeue(msg) do begin //process the message … end; end;end;
2. Sending data from a worker to the form
To send messages from a worker thread to a form we need another instance of TOmniMessageQueue. As we can’t wait on a handle in the main thread (that would block the user interface), we’ll use a different notification mechanism – a window message observer.
2.a Initialization and cleanup
We create the queue just as in the first part. Then we create a window message observer and at the end we Attach it to the message queue. A window message observer sends a window message to some window each time a message queue changes. The four parameters passed to CreateContainerWindowsMessageObserver are the handle of the window that will receive those messages, a message ID, WPARAM, and LPARAM.
FResponseQueue := TOmniMessageQueue.Create(1000, false);
FResponseObserver := CreateContainerWindowsMessageObserver(Handle,
MSG_WORKER_RESULT, 0, 0);
FResponseQueue.ContainerSubject.Attach(FResponseObserver,coiNotifyOnAllInserts);
While shutting down, we first have to Detach the observer from the queue. Then we destroy the observer and empty the response queue (ProcessResults) to process any results that may still be waiting inside.
FResponseQueue.ContainerSubject.Detach(FResponseObserver,coiNotifyOnAllInserts);
FreeAndNil(FResponseObserver);
ProcessResults;
FreeAndNil(FResponseQueue);
2.b Sending data to the form
To send a data, we use exactly the same approach as in 1.b.
if not FResponseQueue.Enqueue(TOmniMessage.Create(0 {ignored},
Format('= %d', [msg.MsgData.AsInteger * 2]))) then raise Exception.Create('Response queue is full!');
2.c Receiving the data
On the receiving side (the form) we have to set up a message function associated with the message that is sent from the window message observer. In this method we’ll call another method ProcessResults.
const MSG_WORKER_RESULT = WM_USER;
procedure WorkerResult(var msg: TMessage); message MSG_WORKER_RESULT;
procedure TfrmTThreadComm.WorkerResult(var msg: TMessage);begin ProcessResults;end;
As a final step, ProcessResults reads data from the message queue and displays each element in a listbox.
procedure TfrmTThreadComm.ProcessResults;var msg: TOmniMessage;begin while FResponseQueue.TryDequeue(msg) do begin //msg.MsgID is ignored in this demo //msg.MsgData contains a string, generated by the worker lbLog.ItemIndex := lbLog.Items.Add(msg.MsgData);
end;end;
3. Using a TOmniBlockingCollection instead of TOmniMessageQueue
Alternatively, you can use the blocking collection implementation from OtlCollections instead. A blocking collection would be appropriate in case you have to handle large number of work requests or responses stored in a queue as a blocking collection grows and shrinks dynamically.
The only important change to the code would be in part 1 as you’d have to create an event observer manually while TOmniMessageQueue does it automatically. For details you can check TOmniMessageQueue.AttachWinEventObserver and TOmniMessageQueue.Destroy.
No comments:
Post a Comment