diff --git a/uutlGenerics.pas b/uutlGenerics.pas index 77d6a9e..fa42bed 100644 --- a/uutlGenerics.pas +++ b/uutlGenerics.pas @@ -327,6 +327,36 @@ type constructor Create(const aOwnsObjects: Boolean = true); end; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +{ Lock-Free Queue for single Producer / Consumer calls; + Producer and Consumer are synchronized with SpinLocks } + generic TutlQueue = class(TObject) + private type + PListItem = ^TListItem; + TListItem = packed record + data: T; + next: PListItem; + end; + private + fFirst: PListItem; + fLast: PListItem; + fFirstLock: Cardinal; + fLastLock: Cardinal; + fCount: Integer; + fOwnsObjects: Boolean; + function GetCount: Integer; + public + property Count: Integer read GetCount; + + procedure Push(const aItem: T); virtual; + function Pop(out aItem: T): Boolean; virtual; + function Pop: Boolean; + procedure Clear; + + constructor Create(const aOwnsObjects: Boolean = true); + destructor Destroy; override; + end; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// generic TutlInterfaceList = class(TInterfaceList) private type @@ -1223,6 +1253,97 @@ begin inherited Create(TComparer.Create, aOwnsObjects); end; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//TutlQueue///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +function TutlQueue.GetCount: Integer; +begin + InterLockedExchange(result{%H-}, fCount); +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +procedure TutlQueue.Push(const aItem: T); +var + p: PListItem; +begin + // do as much as possible outside of the lock + new(p); + p^.data := aItem; + p^.next := nil; + + while (InterLockedExchange(fLastLock, 1) <> 0) do; + try + fLast^.next := p; // is protected by fCount (if fCount = 0 then fLast = fFirst, + fLast := fLast^.next; // so pop must always check fCount, before touching fFirst) + InterLockedIncrement(fCount); + finally + InterLockedExchange(fLastLock, 0); + end; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +function TutlQueue.Pop(out aItem: T): Boolean; +var + old: PListItem; +begin + // do as much as possible outside of the lock + result := false; + FillByte(aItem{%H-}, SizeOf(aItem), 0); + + while (InterLockedExchange(fFirstLock, 1) <> 0) do; + try + if (Count <= 0) then + exit; + result := true; + old := fFirst; + fFirst := fFirst^.next; + aItem := fFirst^.data; + InterLockedDecrement(fCount); + finally + InterLockedExchange(fFirstLock, 0); + end; + Dispose(old); +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +function TutlQueue.Pop: Boolean; +var + tmp: T; +begin + result := Pop(tmp); + utlFreeOrFinalize(tmp, TypeInfo(tmp), fOwnsObjects); +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +procedure TutlQueue.Clear; +begin + while Pop do; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +constructor TutlQueue.Create(const aOwnsObjects: Boolean); +begin + inherited Create; + new(fFirst); + FillByte(fFirst^, SizeOf(fFirst^), 0); + fLast := fFirst; + fFirstLock := 0; + fLastLock := 0; + fCount := 0; + fOwnsObjects := aOwnsObjects; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +destructor TutlQueue.Destroy; +begin + Clear; + if Assigned(fLast) then begin + Dispose(fLast); + fLast := nil; + end; + inherited Destroy; +end; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //TutlInterfaceList.TInterfaceEnumerator//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/uutlMessageThread.pas b/uutlMessageThread.pas index 8ff7d10..b67344a 100644 --- a/uutlMessageThread.pas +++ b/uutlMessageThread.pas @@ -6,31 +6,29 @@ unit uutlMessageThread; mit anderen Threads austauschen kann } {$mode objfpc}{$H+} -{$DEFINE USE_SPINLOCK} interface uses - Classes, SysUtils, syncobjs, uutlMessages; + Classes, SysUtils, syncobjs, uutlMessages, uutlGenerics; type //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TutlMessageThread = class(TThread, IUnknown) - protected type - TSingleLinkedListItem = class - msg: TutlMessage; - next: TSingleLinkedListItem; + private type + TMessageQueue = class(specialize TutlQueue) + private + fEvent: TSimpleEvent; + public + procedure Push(const aItem: T); override; + function Pop(out aItem: T): Boolean; override; + function WaitForMessages(const aWaitTime: Cardinal = INFINITE): Boolean; + + constructor Create(const aOwnsObjects: Boolean = true); + destructor Destroy; override; end; private - {$IFDEF USE_SPINLOCK} - fLocked: Cardinal; - {$ELSE} - fCritSec: TCriticalSection; - {$ENDIF} - fMsgEvent: TEvent; - procedure PushMsg(aMessage: TutlMessage); - function PullMsg: TutlMessage; - procedure ClearMessages; + fMessages: TMessageQueue; protected fRefCount : longint; { implement methods of IUnknown } @@ -38,12 +36,7 @@ type function _AddRef : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; virtual; function _Release : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; virtual; protected - fFirst: TSingleLinkedListItem; - fLast: TSingleLinkedListItem; - - procedure LockMessages; - procedure UnlockMessages; - function WaitForMessages(const aWaitTime: Cardinal = INFINITE): Boolean; + function WaitForMessages(const aWaitTime: Cardinal): Boolean; function ProcessMessages: Boolean; virtual; procedure ProcessMessage(const {%H-}aMessage: TutlMessage); virtual; public @@ -80,7 +73,7 @@ type implementation uses - uutlLogger, uutlGenerics, uutlExceptions; + uutlLogger, uutlExceptions; type TutlMessageThreadMap = class(specialize TutlMap) @@ -158,6 +151,48 @@ begin end; end; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//TutlMessageThread.TMessageQueue/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +procedure TutlMessageThread.TMessageQueue.Push(const aItem: T); +begin + inherited Push(aItem); + fEvent.SetEvent; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +function TutlMessageThread.TMessageQueue.Pop(out aItem: T): Boolean; +begin + result := inherited Pop(aItem); + if (Count <= 0) then + fEvent.ResetEvent; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +function TutlMessageThread.TMessageQueue.WaitForMessages(const aWaitTime: Cardinal): Boolean; +var + wr: TWaitResult; +begin + wr := fEvent.WaitFor(aWaitTime); + result := (wr = wrSignaled); + if not result and (wr <> wrTimeout) then + raise EWait.Create('Error while waiting for messages', wr); +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +constructor TutlMessageThread.TMessageQueue.Create(const aOwnsObjects: Boolean); +begin + inherited Create(aOwnsObjects); + fEvent := TSimpleEvent.Create; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +destructor TutlMessageThread.TMessageQueue.Destroy; +begin + inherited Destroy; + FreeAndNil(fEvent); // do not free event before all messages has been deleted +end; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //TutlMessageThreadMap////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -193,55 +228,6 @@ end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //TutlMessageThread///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -procedure TutlMessageThread.PushMsg(aMessage: TutlMessage); -begin - LockMessages; - try - if not Assigned(fLast) then - exit; - fLast.next := TSingleLinkedListItem.Create; - fLast.next.msg := aMessage; - fLast := fLast.next; - fMsgEvent.SetEvent; - finally - UnlockMessages; - end; -end; - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -function TutlMessageThread.PullMsg: TutlMessage; -var - old: TSingleLinkedListItem; -begin - result := nil; - LockMessages; - try - if Assigned(fFirst) and Assigned(fFirst.next) then begin - old := fFirst; - fFirst := old.next; - result := fFirst.msg; - old.Free; - if not Assigned(fFirst.next) then - fMsgEvent.ResetEvent; - end; - finally - UnlockMessages; - end; -end; - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -procedure TutlMessageThread.ClearMessages; -var - m: TutlMessage; -begin - repeat - m := PullMsg; - if Assigned(m) then - m.Free; - until not Assigned(m); -end; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.QueryInterface(constref iid: tguid; out obj): longint; {$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; begin @@ -263,40 +249,10 @@ begin result := InterLockedDecrement(fRefCount); end; -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -procedure TutlMessageThread.LockMessages; -{$IFDEF USE_SPINLOCK} -var - lock: Cardinal; -begin - repeat - lock := InterLockedExchange(fLocked, 1); - until (lock = 0); -{$ELSE} -begin - fCritSec.Enter; -{$ENDIF} -end; - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -procedure TutlMessageThread.UnlockMessages; -begin - {$IFDEF USE_SPINLOCK} - InterLockedExchange(fLocked, 0); - {$ELSE} - fCritSec.Leave; - {$ENDIF} -end; - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.WaitForMessages(const aWaitTime: Cardinal): Boolean; -var - wr: TWaitResult; begin - wr := fMsgEvent.WaitFor(aWaitTime); - result := (wr = wrSignaled); - if not result and (wr <> wrTimeout) then - raise EWait.Create('Error while waiting for messages', wr); + result := fMessages.WaitForMessages(aWaitTime); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -309,8 +265,7 @@ begin result := false; repeat try - m := PullMsg; //nur beim holen einer Message Locken sonst evtl. DeadLock - if Assigned(m) then begin + if fMessages.Pop(m) then begin result := true; try ProcessMessage(m); @@ -349,50 +304,38 @@ end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThread.PostMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt); -var - m: TutlMessage; begin - m := TutlMessage.Create(aID, aWParam, aLParam); - PushMsg(m); + fMessages.Push(TutlMessage.Create(aID, aWParam, aLParam)); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThread.PostMessage(const aID: Cardinal; const aArgs: TObject); -var - m: TutlMessage; begin - m := TutlMessage.Create(aID, aArgs); - PushMsg(m); + fMessages.Push(TutlMessage.Create(aID, aArgs)); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// procedure TutlMessageThread.PostMessage(const aMsg: TutlMessage); begin - PushMsg(aMsg); + fMessages.Push(aMsg); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.SendMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt; const aWaitTime: Cardinal): TWaitResult; -var - m: TutlSynchronousMessage; begin - m := TutlSynchronousMessage.Create(aID, aWParam, aLParam); - result := SendMessage(m, aWaitTime); + result := SendMessage(TutlSynchronousMessage.Create(aID, aWParam, aLParam), aWaitTime); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.SendMessage(const aID: Cardinal; const aArgs: TObject; const aWaitTime: Cardinal): TWaitResult; -var - m: TutlSynchronousMessage; begin - m := TutlSynchronousMessage.Create(aID, aArgs); - result := SendMessage(m, aWaitTime); + result := SendMessage(TutlSynchronousMessage.Create(aID, aArgs), aWaitTime); end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// function TutlMessageThread.SendMessage(const aMsg: TutlSynchronousMessage; const aWaitTime: Cardinal): TWaitResult; begin - PushMsg(aMsg); + fMessages.Push(aMsg); result := aMsg.WaitFor(aWaitTime); end; @@ -400,18 +343,13 @@ end; constructor TutlMessageThread.Create(CreateSuspended: Boolean; const StackSize: SizeUInt); begin inherited Create(CreateSuspended, StackSize); - fMsgEvent := TEvent.Create(nil, true, false, ''); - fFirst := TSingleLinkedListItem.Create; - fLast := fFirst; + fMessages := TMessageQueue.Create; Threads.Lock; try Threads.Add(ThreadID, self); finally Threads.Release; end; - {$IFNDEF USE_SPINLOCK} - fCritSec := TCriticalSection.Create; - {$ENDIF} end; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -423,13 +361,7 @@ begin finally Threads.Release; end; - ClearMessages; - FreeAndNil(fFirst); - fLast := nil; - {$IFNDEF USE_SPINLOCK} - FreeAndNil(fCritSec); - {$ENDIF} - FreeAndNil(fMsgEvent); + FreeAndNil(fMessages); inherited Destroy; end;