Parcourir la source

* implemeted generic (almost lock-free) queue

* use new generic list for MessageThread
master
Bergmann89 il y a 9 ans
Parent
révision
e94a825f41
2 fichiers modifiés avec 188 ajouts et 135 suppressions
  1. +121
    -0
      uutlGenerics.pas
  2. +67
    -135
      uutlMessageThread.pas

+ 121
- 0
uutlGenerics.pas Voir le fichier

@@ -327,6 +327,36 @@ type
constructor Create(const aOwnsObjects: Boolean = true);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
{ Lock-Free Queue for single Producer / Consumer calls;
Producer and Consumer are synchronized with SpinLocks }
generic TutlQueue<T> = class(TObject)
private type
PListItem = ^TListItem;
TListItem = packed record
data: T;
next: PListItem;
end;
private
fFirst: PListItem;
fLast: PListItem;
fFirstLock: Cardinal;
fLastLock: Cardinal;
fCount: Integer;
fOwnsObjects: Boolean;
function GetCount: Integer;
public
property Count: Integer read GetCount;

procedure Push(const aItem: T); virtual;
function Pop(out aItem: T): Boolean; virtual;
function Pop: Boolean;
procedure Clear;

constructor Create(const aOwnsObjects: Boolean = true);
destructor Destroy; override;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
generic TutlInterfaceList<T> = class(TInterfaceList)
private type
@@ -1223,6 +1253,97 @@ begin
inherited Create(TComparer.Create, aOwnsObjects);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//TutlQueue/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlQueue.GetCount: Integer;
begin
InterLockedExchange(result{%H-}, fCount);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlQueue.Push(const aItem: T);
var
p: PListItem;
begin
// do as much as possible outside of the lock
new(p);
p^.data := aItem;
p^.next := nil;

while (InterLockedExchange(fLastLock, 1) <> 0) do;
try
fLast^.next := p; // is protected by fCount (if fCount = 0 then fLast = fFirst,
fLast := fLast^.next; // so pop must always check fCount, before touching fFirst)
InterLockedIncrement(fCount);
finally
InterLockedExchange(fLastLock, 0);
end;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlQueue.Pop(out aItem: T): Boolean;
var
old: PListItem;
begin
// do as much as possible outside of the lock
result := false;
FillByte(aItem{%H-}, SizeOf(aItem), 0);

while (InterLockedExchange(fFirstLock, 1) <> 0) do;
try
if (Count <= 0) then
exit;
result := true;
old := fFirst;
fFirst := fFirst^.next;
aItem := fFirst^.data;
InterLockedDecrement(fCount);
finally
InterLockedExchange(fFirstLock, 0);
end;
Dispose(old);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlQueue.Pop: Boolean;
var
tmp: T;
begin
result := Pop(tmp);
utlFreeOrFinalize(tmp, TypeInfo(tmp), fOwnsObjects);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlQueue.Clear;
begin
while Pop do;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
constructor TutlQueue.Create(const aOwnsObjects: Boolean);
begin
inherited Create;
new(fFirst);
FillByte(fFirst^, SizeOf(fFirst^), 0);
fLast := fFirst;
fFirstLock := 0;
fLastLock := 0;
fCount := 0;
fOwnsObjects := aOwnsObjects;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
destructor TutlQueue.Destroy;
begin
Clear;
if Assigned(fLast) then begin
Dispose(fLast);
fLast := nil;
end;
inherited Destroy;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//TutlInterfaceList.TInterfaceEnumerator////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////


+ 67
- 135
uutlMessageThread.pas Voir le fichier

@@ -6,31 +6,29 @@ unit uutlMessageThread;
mit anderen Threads austauschen kann }

{$mode objfpc}{$H+}
{$DEFINE USE_SPINLOCK}

interface

uses
Classes, SysUtils, syncobjs, uutlMessages;
Classes, SysUtils, syncobjs, uutlMessages, uutlGenerics;

type
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
TutlMessageThread = class(TThread, IUnknown)
protected type
TSingleLinkedListItem = class
msg: TutlMessage;
next: TSingleLinkedListItem;
private type
TMessageQueue = class(specialize TutlQueue<TutlMessage>)
private
fEvent: TSimpleEvent;
public
procedure Push(const aItem: T); override;
function Pop(out aItem: T): Boolean; override;
function WaitForMessages(const aWaitTime: Cardinal = INFINITE): Boolean;

constructor Create(const aOwnsObjects: Boolean = true);
destructor Destroy; override;
end;
private
{$IFDEF USE_SPINLOCK}
fLocked: Cardinal;
{$ELSE}
fCritSec: TCriticalSection;
{$ENDIF}
fMsgEvent: TEvent;
procedure PushMsg(aMessage: TutlMessage);
function PullMsg: TutlMessage;
procedure ClearMessages;
fMessages: TMessageQueue;
protected
fRefCount : longint;
{ implement methods of IUnknown }
@@ -38,12 +36,7 @@ type
function _AddRef : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; virtual;
function _Release : longint;{$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF}; virtual;
protected
fFirst: TSingleLinkedListItem;
fLast: TSingleLinkedListItem;

procedure LockMessages;
procedure UnlockMessages;
function WaitForMessages(const aWaitTime: Cardinal = INFINITE): Boolean;
function WaitForMessages(const aWaitTime: Cardinal): Boolean;
function ProcessMessages: Boolean; virtual;
procedure ProcessMessage(const {%H-}aMessage: TutlMessage); virtual;
public
@@ -80,7 +73,7 @@ type
implementation

uses
uutlLogger, uutlGenerics, uutlExceptions;
uutlLogger, uutlExceptions;

type
TutlMessageThreadMap = class(specialize TutlMap<TThreadID, TutlMessageThread>)
@@ -158,6 +151,48 @@ begin
end;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//TutlMessageThread.TMessageQueue///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlMessageThread.TMessageQueue.Push(const aItem: T);
begin
inherited Push(aItem);
fEvent.SetEvent;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlMessageThread.TMessageQueue.Pop(out aItem: T): Boolean;
begin
result := inherited Pop(aItem);
if (Count <= 0) then
fEvent.ResetEvent;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlMessageThread.TMessageQueue.WaitForMessages(const aWaitTime: Cardinal): Boolean;
var
wr: TWaitResult;
begin
wr := fEvent.WaitFor(aWaitTime);
result := (wr = wrSignaled);
if not result and (wr <> wrTimeout) then
raise EWait.Create('Error while waiting for messages', wr);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
constructor TutlMessageThread.TMessageQueue.Create(const aOwnsObjects: Boolean);
begin
inherited Create(aOwnsObjects);
fEvent := TSimpleEvent.Create;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
destructor TutlMessageThread.TMessageQueue.Destroy;
begin
inherited Destroy;
FreeAndNil(fEvent); // do not free event before all messages has been deleted
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//TutlMessageThreadMap//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -193,55 +228,6 @@ end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//TutlMessageThread/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlMessageThread.PushMsg(aMessage: TutlMessage);
begin
LockMessages;
try
if not Assigned(fLast) then
exit;
fLast.next := TSingleLinkedListItem.Create;
fLast.next.msg := aMessage;
fLast := fLast.next;
fMsgEvent.SetEvent;
finally
UnlockMessages;
end;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlMessageThread.PullMsg: TutlMessage;
var
old: TSingleLinkedListItem;
begin
result := nil;
LockMessages;
try
if Assigned(fFirst) and Assigned(fFirst.next) then begin
old := fFirst;
fFirst := old.next;
result := fFirst.msg;
old.Free;
if not Assigned(fFirst.next) then
fMsgEvent.ResetEvent;
end;
finally
UnlockMessages;
end;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlMessageThread.ClearMessages;
var
m: TutlMessage;
begin
repeat
m := PullMsg;
if Assigned(m) then
m.Free;
until not Assigned(m);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlMessageThread.QueryInterface(constref iid: tguid; out obj): longint; {$IFNDEF WINDOWS}cdecl{$ELSE}stdcall{$ENDIF};
begin
@@ -263,40 +249,10 @@ begin
result := InterLockedDecrement(fRefCount);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlMessageThread.LockMessages;
{$IFDEF USE_SPINLOCK}
var
lock: Cardinal;
begin
repeat
lock := InterLockedExchange(fLocked, 1);
until (lock = 0);
{$ELSE}
begin
fCritSec.Enter;
{$ENDIF}
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlMessageThread.UnlockMessages;
begin
{$IFDEF USE_SPINLOCK}
InterLockedExchange(fLocked, 0);
{$ELSE}
fCritSec.Leave;
{$ENDIF}
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlMessageThread.WaitForMessages(const aWaitTime: Cardinal): Boolean;
var
wr: TWaitResult;
begin
wr := fMsgEvent.WaitFor(aWaitTime);
result := (wr = wrSignaled);
if not result and (wr <> wrTimeout) then
raise EWait.Create('Error while waiting for messages', wr);
result := fMessages.WaitForMessages(aWaitTime);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -309,8 +265,7 @@ begin
result := false;
repeat
try
m := PullMsg; //nur beim holen einer Message Locken sonst evtl. DeadLock
if Assigned(m) then begin
if fMessages.Pop(m) then begin
result := true;
try
ProcessMessage(m);
@@ -349,50 +304,38 @@ end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlMessageThread.PostMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt);
var
m: TutlMessage;
begin
m := TutlMessage.Create(aID, aWParam, aLParam);
PushMsg(m);
fMessages.Push(TutlMessage.Create(aID, aWParam, aLParam));
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlMessageThread.PostMessage(const aID: Cardinal; const aArgs: TObject);
var
m: TutlMessage;
begin
m := TutlMessage.Create(aID, aArgs);
PushMsg(m);
fMessages.Push(TutlMessage.Create(aID, aArgs));
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlMessageThread.PostMessage(const aMsg: TutlMessage);
begin
PushMsg(aMsg);
fMessages.Push(aMsg);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlMessageThread.SendMessage(const aID: Cardinal; const aWParam, aLParam: PtrInt; const aWaitTime: Cardinal): TWaitResult;
var
m: TutlSynchronousMessage;
begin
m := TutlSynchronousMessage.Create(aID, aWParam, aLParam);
result := SendMessage(m, aWaitTime);
result := SendMessage(TutlSynchronousMessage.Create(aID, aWParam, aLParam), aWaitTime);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlMessageThread.SendMessage(const aID: Cardinal; const aArgs: TObject; const aWaitTime: Cardinal): TWaitResult;
var
m: TutlSynchronousMessage;
begin
m := TutlSynchronousMessage.Create(aID, aArgs);
result := SendMessage(m, aWaitTime);
result := SendMessage(TutlSynchronousMessage.Create(aID, aArgs), aWaitTime);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlMessageThread.SendMessage(const aMsg: TutlSynchronousMessage; const aWaitTime: Cardinal): TWaitResult;
begin
PushMsg(aMsg);
fMessages.Push(aMsg);
result := aMsg.WaitFor(aWaitTime);
end;

@@ -400,18 +343,13 @@ end;
constructor TutlMessageThread.Create(CreateSuspended: Boolean; const StackSize: SizeUInt);
begin
inherited Create(CreateSuspended, StackSize);
fMsgEvent := TEvent.Create(nil, true, false, '');
fFirst := TSingleLinkedListItem.Create;
fLast := fFirst;
fMessages := TMessageQueue.Create;
Threads.Lock;
try
Threads.Add(ThreadID, self);
finally
Threads.Release;
end;
{$IFNDEF USE_SPINLOCK}
fCritSec := TCriticalSection.Create;
{$ENDIF}
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -423,13 +361,7 @@ begin
finally
Threads.Release;
end;
ClearMessages;
FreeAndNil(fFirst);
fLast := nil;
{$IFNDEF USE_SPINLOCK}
FreeAndNil(fCritSec);
{$ENDIF}
FreeAndNil(fMsgEvent);
FreeAndNil(fMessages);
inherited Destroy;
end;



Chargement…
Annuler
Enregistrer