Browse Source

* split queue in normal queue and spinlock synchronized queue

* use TutlSpinLock in SyncQueue
master
Bergmann89 9 years ago
parent
commit
c94b02d546
4 changed files with 155 additions and 108 deletions
  1. +68
    -35
      uutlGenerics.pas
  2. +1
    -1
      uutlMessageThread.pas
  3. +85
    -0
      uutlSyncObjs.pas
  4. +1
    -72
      uutlThreads.pas

+ 68
- 35
uutlGenerics.pas View File

@@ -10,7 +10,7 @@ unit uutlGenerics;
interface

uses
Classes, SysUtils, typinfo, syncobjs;
Classes, SysUtils, typinfo, uutlSyncObjs;

type
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -328,8 +328,6 @@ type
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
{ Lock-Free Queue for single Producer / Consumer calls;
Producer and Consumer are synchronized with SpinLocks }
generic TutlQueue<T> = class(TObject)
public type
PListItem = ^TListItem;
@@ -342,8 +340,6 @@ type
protected
fFirst: PListItem;
fLast: PListItem;
fFirstLock: Cardinal;
fLastLock: Cardinal;
fCount: Integer;
fOwnsObjects: Boolean;
public
@@ -358,6 +354,19 @@ type
destructor Destroy; override;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
generic TutlSyncQueue<T> = class(specialize TutlQueue<T>)
private
fPushLock: TutlSpinLock;
fPopLock: TutlSpinLock;
public
procedure Push(const aItem: T); override;
function Pop(out aItem: T): Boolean; override;

constructor Create(const aOwnsObjects: Boolean = true);
destructor Destroy; override;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
generic TutlInterfaceList<T> = class(TInterfaceList)
private type
@@ -410,7 +419,7 @@ type
fFillState: integer;
fWritePtr, fReadPtr: integer;
fWrittenEvent,
fReadEvent: TEvent;
fReadEvent: TutlAutoResetEvent;
public
constructor Create(const Elements: Integer);
destructor Destroy; override;
@@ -426,7 +435,7 @@ type
implementation

uses
uutlExceptions, uutlThreads;
uutlExceptions, syncobjs;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//Helper////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1288,19 +1297,12 @@ procedure TutlQueue.Push(const aItem: T);
var
p: PListItem;
begin
// do as much as possible outside of the lock
new(p);
p^.data := aItem;
p^.next := nil;

while (InterLockedExchange(fLastLock, 1) <> 0) do;
try
fLast^.next := p; // is protected by fCount (if fCount = 0 then fLast = fFirst,
fLast := fLast^.next; // so pop must always check fCount, before touching fFirst)
InterLockedIncrement(fCount);
finally
InterLockedExchange(fLastLock, 0);
end;
fLast^.next := p;
fLast := fLast^.next;
InterLockedIncrement(fCount);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1308,22 +1310,15 @@ function TutlQueue.Pop(out aItem: T): Boolean;
var
old: PListItem;
begin
// do as much as possible outside of the lock
result := false;
FillByte(aItem{%H-}, SizeOf(aItem), 0);

while (InterLockedExchange(fFirstLock, 1) <> 0) do;
try
if (Count <= 0) then
exit;
result := true;
old := fFirst;
fFirst := fFirst^.next;
aItem := fFirst^.data;
InterLockedDecrement(fCount);
finally
InterLockedExchange(fFirstLock, 0);
end;
if (Count <= 0) then
exit;
result := true;
old := fFirst;
fFirst := fFirst^.next;
aItem := fFirst^.data;
InterLockedDecrement(fCount);
Dispose(old);
end;

@@ -1349,8 +1344,6 @@ begin
new(fFirst);
FillByte(fFirst^, SizeOf(fFirst^), 0);
fLast := fFirst;
fFirstLock := 0;
fLastLock := 0;
fCount := 0;
fOwnsObjects := aOwnsObjects;
end;
@@ -1366,6 +1359,46 @@ begin
inherited Destroy;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//TutlSyncQueue/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlSyncQueue.Push(const aItem: T);
begin
fPushLock.Enter;
try
inherited Push(aItem);
finally
fPushLock.Leave;
end;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlSyncQueue.Pop(out aItem: T): Boolean;
begin
fPopLock.Enter;
try
result := inherited Pop(aItem);
finally
fPopLock.Leave;
end;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
constructor TutlSyncQueue.Create(const aOwnsObjects: Boolean);
begin
inherited Create(aOwnsObjects);
fPushLock := TutlSpinLock.Create;
fPopLock := TutlSpinLock.Create;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
destructor TutlSyncQueue.Destroy;
begin
inherited Destroy; //inherited will pop all remaining items, so do not destroy spinlock before!
FreeAndNil(fPushLock);
FreeAndNil(fPopLock);
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//TutlInterfaceList.TInterfaceEnumerator////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1565,8 +1598,8 @@ begin
fWritePtr:= 1;
fReadPtr:= 0;
fFillState:= 0;
fReadEvent:= TAutoResetEvent.Create;
fWrittenEvent:= TAutoResetEvent.Create;
fReadEvent:= TutlAutoResetEvent.Create;
fWrittenEvent:= TutlAutoResetEvent.Create;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////


+ 1
- 1
uutlMessageThread.pas View File

@@ -16,7 +16,7 @@ type
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
TutlMessageThread = class(TThread, IUnknown)
public type
TMessageQueue = class(specialize TutlQueue<TutlMessage>)
TMessageQueue = class(specialize TutlSyncQueue<TutlMessage>)
private
fEvent: TSimpleEvent;
public


+ 85
- 0
uutlSyncObjs.pas View File

@@ -0,0 +1,85 @@
unit uutlSyncObjs;

{$mode objfpc}{$H+}

interface

uses
Classes, SysUtils, syncobjs;

type
TAutoResetEvent = class(TEvent)
public
constructor Create(aInitial: boolean = false);
end;
// aliased to stay in LCL naming scheme for TSimpleEvent
TutlAutoResetEvent = TAutoResetEvent;

TutlSpinLock = class
private
fLock: DWord;
fLockReused: integer;
public
constructor Create;
destructor Destroy; override;
procedure Enter;
procedure Leave;
end;

implementation

{ TAutoResetEvent }

constructor TAutoResetEvent.Create(aInitial: boolean);
begin
inherited Create(Nil, false, aInitial, '');
end;

{ TutlSpinLock }

constructor TutlSpinLock.Create;
begin
inherited Create;
fLock:= 0;
fLockReused:= 0;
end;

destructor TutlSpinLock.Destroy;
begin
Enter;
inherited Destroy;
end;

procedure TutlSpinLock.Enter;
var
ti: dword;
begin
ti:= ThreadID;
if ti = InterlockedCompareExchange(fLock, ti, ti) then begin
{
The lock is already held by this thread. This means it cannot be modified by a concurrent
operation (assuming Enter/Leave bracket correctly), and we can act non-atomar on other variables.
}
inc(fLockReused);
end else begin
while InterlockedCompareExchange(fLock, ti, 0) <> 0 do ;
end;
end;

procedure TutlSpinLock.Leave;
var
ti: DWord;
begin
ti:= ThreadID;
// Unlock only if we hold the lock
if ti = InterlockedCompareExchange(fLock, ti, ti) then begin
// our lock, but we haven't yet done anything (note the above is essentially a threadsafe CMP if successful)
if fLockReused = 0 then
InterLockedExchange(fLock, 0) // normal lock
else
dec(fLockReused); // nested locks
end;
end;

end.


+ 1
- 72
uutlThreads.pas View File

@@ -10,27 +10,9 @@ unit uutlThreads;
interface

uses
Classes, SysUtils, syncobjs, uutlGenerics;
Classes, SysUtils, syncobjs, uutlGenerics, uutlSyncObjs;

type
TAutoResetEvent = class(TEvent)
public
constructor Create(aInitial: boolean = false);
end;
// aliased to stay in LCL naming scheme for TSimpleEvent
TutlAutoResetEvent = TAutoResetEvent;

TutlSpinLock = class
private
fLock: DWord;
fLockReused: integer;
public
constructor Create;
destructor Destroy; override;
procedure Enter;
procedure Leave;
end;

IutlThreadPoolRunnable = interface
['{DFEC1832-30DA-4D12-A321-7762057D2D17}']
function Execute: PtrUInt;
@@ -115,59 +97,6 @@ type

implementation

{ TAutoResetEvent }

constructor TAutoResetEvent.Create(aInitial: boolean);
begin
inherited Create(Nil, false, aInitial, '');
end;

{ TutlSpinLock }

constructor TutlSpinLock.Create;
begin
inherited Create;
fLock:= 0;
fLockReused:= 0;
end;

destructor TutlSpinLock.Destroy;
begin
Enter;
inherited Destroy;
end;

procedure TutlSpinLock.Enter;
var
ti: dword;
begin
ti:= ThreadID;
if ti = InterlockedCompareExchange(fLock, ti, ti) then begin
{
The lock is already held by this thread. This means it cannot be modified by a concurrent
operation (assuming Enter/Leave bracket correctly), and we can act non-atomar on other variables.
}
inc(fLockReused);
end else begin
while InterlockedCompareExchange(fLock, ti, 0) <> 0 do ;
end;
end;

procedure TutlSpinLock.Leave;
var
ti: DWord;
begin
ti:= ThreadID;
// Unlock only if we hold the lock
if ti = InterlockedCompareExchange(fLock, ti, ti) then begin
// our lock, but we haven't yet done anything (note the above is essentially a threadsafe CMP if successful)
if fLockReused = 0 then
InterLockedExchange(fLock, 0) // normal lock
else
dec(fLockReused); // nested locks
end;
end;

{ TutlThreadPool }

constructor TutlThreadPool.Create(const ThreadCount: integer);


Loading…
Cancel
Save