unit uutlMessageThread; { Package: Utils Prefix: utl - UTiLs Beschreibung: diese Unit definiert einen Thread, der mit Hilfe von Messages Daten synchronisiert mit anderen Threads austauschen kann } {$mode objfpc}{$H+} interface uses Classes, SysUtils, syncobjs, uutlMessages, uutlGenerics; type //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TutlMessageThread = class(TThread, IUnknown) public type TMessageProgressCallback = procedure(const aMsg: TutlMessage) of Object; TMessageQueue = class(specialize TutlSyncQueue) private fEvent: TSimpleEvent; public procedure Push(const aItem: TutlMessage); override; function Pop(out aItem: TutlMessage): Boolean; override; function WaitForMessages(const aWaitTime: Cardinal = INFINITE): Boolean; function ProcessMessages(const aProgressCallback: TMessageProgressCallback): Boolean; constructor Create(const aOwnsObjects: Boolean = true); destructor Destroy; override; end; protected fMessages: TMessageQueue; fRefCount : longint; { implement methods of IUnknown } function QueryInterface({$IFDEF FPC_HAS_CONSTREF}constref{$ELSE}const{$ENDIF} iid : tguid;out obj) : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; function _AddRef : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; virtual; function _Release : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; virtual; protected function CreateMessageQueue: TMessageQueue; virtual; function WaitForMessages(const aWaitTime: Cardinal): Boolean; function ProcessMessages: Boolean; virtual; procedure ProcessMessage(const {%H-}aMessage: TutlMessage); virtual; public //Messages Objects passed to PostMessage will be freed automatically procedure PostMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt); overload; procedure PostMessage(const aID: Cardinal; const aArgs: TObject); overload; procedure PostMessage(const aMsg: TutlMessage); virtual; overload; //Messages Objects passed to SendMessage must be freed by user when WaitResult is wrSignaled (otherwise the thread will handle it) function SendMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt; const aWaitTime: Cardinal = INFINITE): TWaitResult; overload; function SendMessage(const aID: Cardinal; const aArgs: TObject; const aWaitTime: Cardinal = INFINITE): TWaitResult; overload; function SendMessage(const aMsg: TutlSynchronousMessage; const aWaitTime: Cardinal = INFINITE): TWaitResult; virtual; overload; constructor Create(CreateSuspended: Boolean; const StackSize: SizeUInt=DefaultStackSize); destructor Destroy; override; end; //Messages Objects passed to PostMessage will be freed automatically function utlPostMessage(const aThreadID: TThreadID; const aID: Cardinal; const aWParam, aLParam: PtrInt): Boolean; overload; function utlPostMessage(const aThreadID: TThreadID; const aID: Cardinal; const aArgs: TObject): Boolean; overload; function utlPostMessage(const aThreadID: TThreadID; const aMsg: TutlMessage): Boolean; overload; //Messages Objects passed to SendMessage must be freed by user when WaitResult is wrSignaled (otherwise the thread will handle it) function utlSendMessage(const aThreadID: TThreadID; const aID: Cardinal; const aWParam, aLParam: PtrInt; const aWaitTime: Cardinal = INFINITE): TWaitResult; overload; function utlSendMessage(const aThreadID: TThreadID; const aID: Cardinal; const aArgs: TObject; const aWaitTime: Cardinal = INFINITE): TWaitResult; overload; function utlSendMessage(const aThreadID: TThreadID; const aMsg: TutlSynchronousMessage; const aWaitTime: Cardinal = INFINITE): TWaitResult; overload; implementation uses uutlLogger, uutlExceptions; type TutlMessageThreadMap = class(specialize TutlMap) private fCS: TCriticalSection; public procedure Lock; procedure Release; constructor Create(const aOwnsObjects: Boolean = true); destructor Destroy; override; end; var Threads: TutlMessageThreadMap; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function utlPostMessage(const aThreadID: TThreadID; const aID: Cardinal; const aWParam, aLParam: PtrInt): Boolean; begin result := utlPostMessage(aThreadID, TutlMessage.Create(aID, aWParam, aLParam)); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function utlPostMessage(const aThreadID: TThreadID; const aID: Cardinal; const aArgs: TObject): Boolean; begin result := utlPostMessage(aThreadID, TutlMessage.Create(aID, aArgs)); end; function utlPostMessage(const aThreadID: TThreadID; const aMsg: TutlMessage): Boolean; var t: TutlMessageThread; begin Threads.Lock; try t := Threads[aThreadID]; finally Threads.Release; end; result := Assigned(t); if (result) then t.PostMessage(aMsg) else aMsg.Free; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function utlSendMessage(const aThreadID: TThreadID; const aID: Cardinal; const aWParam, aLParam: PtrInt; const aWaitTime: Cardinal): TWaitResult; begin result := utlSendMessage(aThreadID, TutlSynchronousMessage.Create(aID, aWParam, aLParam), aWaitTime); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function utlSendMessage(const aThreadID: TThreadID; const aID: Cardinal; const aArgs: TObject; const aWaitTime: Cardinal): TWaitResult; begin result := utlSendMessage(aThreadID, TutlSynchronousMessage.Create(aID, aArgs), aWaitTime); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function utlSendMessage(const aThreadID: TThreadID; const aMsg: TutlSynchronousMessage; const aWaitTime: Cardinal): TWaitResult; var t: TutlMessageThread; begin Threads.Lock; try t := Threads[aThreadID]; finally Threads.Release; end; if Assigned(t) then result := t.SendMessage(aMsg) else begin result := wrError; aMsg.Free; end; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //TutlMessageThread.TMessageQueue/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThread.TMessageQueue.Push(const aItem: TutlMessage); begin inherited Push(aItem); fEvent.SetEvent; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.TMessageQueue.Pop(out aItem: TutlMessage): Boolean; begin result := inherited Pop(aItem); if (Count <= 0) then fEvent.ResetEvent; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.TMessageQueue.WaitForMessages(const aWaitTime: Cardinal): Boolean; var wr: TWaitResult; begin wr := fEvent.WaitFor(aWaitTime); result := (wr = wrSignaled); if not result and (wr <> wrTimeout) then raise EWait.Create('Error while waiting for messages', wr); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.TMessageQueue.ProcessMessages(const aProgressCallback: TMessageProgressCallback): Boolean; var m: TutlMessage; empty: Boolean; begin empty := false; result := false; if not Assigned(aProgressCallback) then exit; repeat try if Pop(m) then begin result := true; try aProgressCallback(m); finally if (m is TutlSynchronousMessage) then (m as TutlSynchronousMessage).Finish else FreeAndNil(m); end; end else empty := true; except on e: Exception do begin utlLogger.Error(self, 'error while progressing message: %s - %s', [e.ClassName, e.Message]); end; end; until empty; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// constructor TutlMessageThread.TMessageQueue.Create(const aOwnsObjects: Boolean); begin inherited Create(aOwnsObjects); fEvent := TSimpleEvent.Create; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// destructor TutlMessageThread.TMessageQueue.Destroy; begin inherited Destroy; FreeAndNil(fEvent); // do not free event before all messages has been deleted end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //TutlMessageThreadMap////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThreadMap.Lock; begin fCS.Acquire; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThreadMap.Release; begin fCS.Release; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// constructor TutlMessageThreadMap.Create(const aOwnsObjects: Boolean); begin inherited; fCS:= TCriticalSection.Create; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// destructor TutlMessageThreadMap.Destroy; begin fCS.Acquire; try inherited Destroy; finally fCS.Release; end; FreeAndNil(fCS); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //TutlMessageThread///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.QueryInterface(constref iid: tguid; out obj): longint; {$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; begin if getinterface(iid,obj) then result := S_OK else result := longint(E_NOINTERFACE); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread._AddRef: longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; begin result := InterLockedIncrement(fRefCount); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread._Release: longint; {$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; begin result := InterLockedDecrement(fRefCount); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.CreateMessageQueue: TMessageQueue; begin result := TMessageQueue.Create(true); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.WaitForMessages(const aWaitTime: Cardinal): Boolean; begin result := fMessages.WaitForMessages(aWaitTime); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.ProcessMessages: Boolean; begin result := fMessages.ProcessMessages(@ProcessMessage); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThread.ProcessMessage(const aMessage: TutlMessage); begin case aMessage.ID of MSG_CALLBACK: (aMessage as TutlCallbackMsg).ExecuteCallback; MSG_SYNC_CALLBACK: (aMessage as TutlSyncCallbackMsg).ExecuteCallback; end; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThread.PostMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt); begin fMessages.Push(TutlMessage.Create(aID, aWParam, aLParam)); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThread.PostMessage(const aID: Cardinal; const aArgs: TObject); begin fMessages.Push(TutlMessage.Create(aID, aArgs)); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThread.PostMessage(const aMsg: TutlMessage); begin fMessages.Push(aMsg); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.SendMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt; const aWaitTime: Cardinal): TWaitResult; begin result := SendMessage(TutlSynchronousMessage.Create(aID, aWParam, aLParam), aWaitTime); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.SendMessage(const aID: Cardinal; const aArgs: TObject; const aWaitTime: Cardinal): TWaitResult; begin result := SendMessage(TutlSynchronousMessage.Create(aID, aArgs), aWaitTime); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.SendMessage(const aMsg: TutlSynchronousMessage; const aWaitTime: Cardinal): TWaitResult; begin fMessages.Push(aMsg); result := aMsg.WaitFor(aWaitTime); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// constructor TutlMessageThread.Create(CreateSuspended: Boolean; const StackSize: SizeUInt); begin inherited Create(CreateSuspended, StackSize); fMessages := CreateMessageQueue; Threads.Lock; try Threads.Add(ThreadID, self); finally Threads.Release; end; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// destructor TutlMessageThread.Destroy; begin Threads.Lock; try Threads.Delete(ThreadID); finally Threads.Release; end; FreeAndNil(fMessages); inherited Destroy; end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// initialization Threads := TutlMessageThreadMap.Create(false); finalization Threads.Lock; try while (Threads.Count > 0) do Threads.ValueAt[Threads.Count-1].Free; finally Threads.Release; end; FreeAndNil(Threads); end.