|
- unit uutlMessageThread;
-
- { Package: Utils
- Prefix: utl - UTiLs
- Beschreibung: diese Unit definiert einen Thread, der mit Hilfe von Messages Daten synchronisiert
- mit anderen Threads austauschen kann }
-
- {$mode objfpc}{$H+}
-
- interface
-
- uses
- Classes, SysUtils, syncobjs, uutlMessages, uutlGenerics;
-
- type
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TutlMessageThread = class(TThread, IUnknown)
- public type
- TMessageProgressCallback = procedure(const aMsg: TutlMessage) of Object;
- TMessageQueue = class(specialize TutlSyncQueue<TutlMessage>)
- private
- fEvent: TSimpleEvent;
- public
- procedure Push(const aItem: TutlMessage); override;
- function Pop(out aItem: TutlMessage): Boolean; override;
-
- function WaitForMessages(const aWaitTime: Cardinal = INFINITE): Boolean;
- function ProcessMessages(const aProgressCallback: TMessageProgressCallback): Boolean;
-
- constructor Create(const aOwnsObjects: Boolean = true);
- destructor Destroy; override;
- end;
- protected
- fMessages: TMessageQueue;
- fRefCount : longint;
- { implement methods of IUnknown }
- function QueryInterface({$IFDEF FPC_HAS_CONSTREF}constref{$ELSE}const{$ENDIF} iid : tguid;out obj) : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF};
- function _AddRef : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; virtual;
- function _Release : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; virtual;
- protected
- function CreateMessageQueue: TMessageQueue; virtual;
- function WaitForMessages(const aWaitTime: Cardinal): Boolean;
- function ProcessMessages: Boolean; virtual;
- procedure ProcessMessage(const {%H-}aMessage: TutlMessage); virtual;
- public
- //Messages Objects passed to PostMessage will be freed automatically
- procedure PostMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt); overload;
- procedure PostMessage(const aID: Cardinal; const aArgs: TObject); overload;
- procedure PostMessage(const aMsg: TutlMessage); virtual; overload;
-
- //Messages Objects passed to SendMessage must be freed by user when WaitResult is wrSignaled (otherwise the thread will handle it)
- function SendMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt;
- const aWaitTime: Cardinal = INFINITE): TWaitResult; overload;
- function SendMessage(const aID: Cardinal; const aArgs: TObject;
- const aWaitTime: Cardinal = INFINITE): TWaitResult; overload;
- function SendMessage(const aMsg: TutlSynchronousMessage;
- const aWaitTime: Cardinal = INFINITE): TWaitResult; virtual; overload;
-
- constructor Create(CreateSuspended: Boolean; const StackSize: SizeUInt=DefaultStackSize);
- destructor Destroy; override;
- end;
-
- //Messages Objects passed to PostMessage will be freed automatically
- function utlPostMessage(const aThreadID: TThreadID; const aID: Cardinal; const aWParam, aLParam: PtrInt): Boolean; overload;
- function utlPostMessage(const aThreadID: TThreadID; const aID: Cardinal; const aArgs: TObject): Boolean; overload;
- function utlPostMessage(const aThreadID: TThreadID; const aMsg: TutlMessage): Boolean; overload;
-
- //Messages Objects passed to SendMessage must be freed by user when WaitResult is wrSignaled (otherwise the thread will handle it)
- function utlSendMessage(const aThreadID: TThreadID; const aID: Cardinal; const aWParam, aLParam: PtrInt;
- const aWaitTime: Cardinal = INFINITE): TWaitResult; overload;
- function utlSendMessage(const aThreadID: TThreadID; const aID: Cardinal; const aArgs: TObject;
- const aWaitTime: Cardinal = INFINITE): TWaitResult; overload;
- function utlSendMessage(const aThreadID: TThreadID; const aMsg: TutlSynchronousMessage;
- const aWaitTime: Cardinal = INFINITE): TWaitResult; overload;
-
- implementation
-
- uses
- uutlLogger, uutlExceptions;
-
- type
- TutlMessageThreadMap = class(specialize TutlMap<TThreadID, TutlMessageThread>)
- private
- fCS: TCriticalSection;
- public
- procedure Lock;
- procedure Release;
- constructor Create(const aOwnsObjects: Boolean = true);
- destructor Destroy; override;
- end;
-
- var
- Threads: TutlMessageThreadMap;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function utlPostMessage(const aThreadID: TThreadID; const aID: Cardinal; const aWParam, aLParam: PtrInt): Boolean;
- begin
- result := utlPostMessage(aThreadID, TutlMessage.Create(aID, aWParam, aLParam));
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function utlPostMessage(const aThreadID: TThreadID; const aID: Cardinal; const aArgs: TObject): Boolean;
- begin
- result := utlPostMessage(aThreadID, TutlMessage.Create(aID, aArgs));
- end;
-
- function utlPostMessage(const aThreadID: TThreadID; const aMsg: TutlMessage): Boolean;
- var
- t: TutlMessageThread;
- begin
- Threads.Lock;
- try
- t := Threads[aThreadID];
- finally
- Threads.Release;
- end;
- result := Assigned(t);
- if (result) then
- t.PostMessage(aMsg)
- else
- aMsg.Free;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function utlSendMessage(const aThreadID: TThreadID; const aID: Cardinal; const aWParam, aLParam: PtrInt; const aWaitTime: Cardinal): TWaitResult;
- begin
- result := utlSendMessage(aThreadID, TutlSynchronousMessage.Create(aID, aWParam, aLParam), aWaitTime);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function utlSendMessage(const aThreadID: TThreadID; const aID: Cardinal; const aArgs: TObject; const aWaitTime: Cardinal): TWaitResult;
- begin
- result := utlSendMessage(aThreadID, TutlSynchronousMessage.Create(aID, aArgs), aWaitTime);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function utlSendMessage(const aThreadID: TThreadID; const aMsg: TutlSynchronousMessage; const aWaitTime: Cardinal): TWaitResult;
- var
- t: TutlMessageThread;
- begin
- Threads.Lock;
- try
- t := Threads[aThreadID];
- finally
- Threads.Release;
- end;
- if Assigned(t) then
- result := t.SendMessage(aMsg)
- else begin
- result := wrError;
- aMsg.Free;
- end;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- //TutlMessageThread.TMessageQueue///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- procedure TutlMessageThread.TMessageQueue.Push(const aItem: TutlMessage);
- begin
- inherited Push(aItem);
- fEvent.SetEvent;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.TMessageQueue.Pop(out aItem: TutlMessage): Boolean;
- begin
- result := inherited Pop(aItem);
- if (Count <= 0) then
- fEvent.ResetEvent;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.TMessageQueue.WaitForMessages(const aWaitTime: Cardinal): Boolean;
- var
- wr: TWaitResult;
- begin
- wr := fEvent.WaitFor(aWaitTime);
- result := (wr = wrSignaled);
- if not result and (wr <> wrTimeout) then
- raise EWait.Create('Error while waiting for messages', wr);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.TMessageQueue.ProcessMessages(const aProgressCallback: TMessageProgressCallback): Boolean;
- var
- m: TutlMessage;
- empty: Boolean;
- begin
- empty := false;
- result := false;
- if not Assigned(aProgressCallback) then
- exit;
- repeat
- try
- if Pop(m) then begin
- result := true;
- try
- aProgressCallback(m);
- finally
- if (m is TutlSynchronousMessage) then
- (m as TutlSynchronousMessage).Finish
- else
- FreeAndNil(m);
- end;
- end else
- empty := true;
- except
- on e: Exception do begin
- utlLogger.Error(self, 'error while progressing message: %s - %s', [e.ClassName, e.Message]);
- end;
- end;
- until empty;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- constructor TutlMessageThread.TMessageQueue.Create(const aOwnsObjects: Boolean);
- begin
- inherited Create(aOwnsObjects);
- fEvent := TSimpleEvent.Create;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- destructor TutlMessageThread.TMessageQueue.Destroy;
- begin
- inherited Destroy;
- FreeAndNil(fEvent); // do not free event before all messages has been deleted
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- //TutlMessageThreadMap//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- procedure TutlMessageThreadMap.Lock;
- begin
- fCS.Acquire;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- procedure TutlMessageThreadMap.Release;
- begin
- fCS.Release;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- constructor TutlMessageThreadMap.Create(const aOwnsObjects: Boolean);
- begin
- inherited;
- fCS:= TCriticalSection.Create;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- destructor TutlMessageThreadMap.Destroy;
- begin
- fCS.Acquire;
- try
- inherited Destroy;
- finally
- fCS.Release;
- end;
- FreeAndNil(fCS);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- //TutlMessageThread/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.QueryInterface(constref iid: tguid; out obj): longint; {$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF};
- begin
- if getinterface(iid,obj) then
- result := S_OK
- else
- result := longint(E_NOINTERFACE);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread._AddRef: longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF};
- begin
- result := InterLockedIncrement(fRefCount);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread._Release: longint; {$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF};
- begin
- result := InterLockedDecrement(fRefCount);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.CreateMessageQueue: TMessageQueue;
- begin
- result := TMessageQueue.Create(true);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.WaitForMessages(const aWaitTime: Cardinal): Boolean;
- begin
- result := fMessages.WaitForMessages(aWaitTime);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.ProcessMessages: Boolean;
- begin
- result := fMessages.ProcessMessages(@ProcessMessage);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- procedure TutlMessageThread.ProcessMessage(const aMessage: TutlMessage);
- begin
- case aMessage.ID of
- MSG_CALLBACK:
- (aMessage as TutlCallbackMsg).ExecuteCallback;
- MSG_SYNC_CALLBACK:
- (aMessage as TutlSyncCallbackMsg).ExecuteCallback;
- end;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- procedure TutlMessageThread.PostMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt);
- begin
- fMessages.Push(TutlMessage.Create(aID, aWParam, aLParam));
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- procedure TutlMessageThread.PostMessage(const aID: Cardinal; const aArgs: TObject);
- begin
- fMessages.Push(TutlMessage.Create(aID, aArgs));
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- procedure TutlMessageThread.PostMessage(const aMsg: TutlMessage);
- begin
- fMessages.Push(aMsg);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.SendMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt; const aWaitTime: Cardinal): TWaitResult;
- begin
- result := SendMessage(TutlSynchronousMessage.Create(aID, aWParam, aLParam), aWaitTime);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.SendMessage(const aID: Cardinal; const aArgs: TObject; const aWaitTime: Cardinal): TWaitResult;
- begin
- result := SendMessage(TutlSynchronousMessage.Create(aID, aArgs), aWaitTime);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- function TutlMessageThread.SendMessage(const aMsg: TutlSynchronousMessage; const aWaitTime: Cardinal): TWaitResult;
- begin
- fMessages.Push(aMsg);
- result := aMsg.WaitFor(aWaitTime);
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- constructor TutlMessageThread.Create(CreateSuspended: Boolean; const StackSize: SizeUInt);
- begin
- inherited Create(CreateSuspended, StackSize);
- fMessages := CreateMessageQueue;
- Threads.Lock;
- try
- Threads.Add(ThreadID, self);
- finally
- Threads.Release;
- end;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- destructor TutlMessageThread.Destroy;
- begin
- Threads.Lock;
- try
- Threads.Delete(ThreadID);
- finally
- Threads.Release;
- end;
- FreeAndNil(fMessages);
- inherited Destroy;
- end;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- initialization
- Threads := TutlMessageThreadMap.Create(false);
-
- finalization
- Threads.Lock;
- try
- while (Threads.Count > 0) do
- Threads.ValueAt[Threads.Count-1].Free;
- finally
- Threads.Release;
- end;
- FreeAndNil(Threads);
-
- end.
|