Procházet zdrojové kódy

* add generic RingBuffer; SpinLock, Threadpool

master
Martok před 9 roky
rodič
revize
ec562830af
2 změnil soubory, kde provedl 523 přidání a 2 odebrání
  1. +124
    -2
      uutlGenerics.pas
  2. +399
    -0
      uutlThreads.pas

+ 124
- 2
uutlGenerics.pas Zobrazit soubor

@@ -10,7 +10,7 @@ unit uutlGenerics;
interface

uses
Classes, SysUtils, typinfo;
Classes, SysUtils, typinfo, syncobjs;

type
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -400,12 +400,33 @@ type
class function Values: TValueArray;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
generic TutlRingBuffer<T> = class
private
fAborted: boolean;
fData: packed array of T;
fDataLen: Integer;
fDataSize: integer;
fFillState: integer;
fWritePtr, fReadPtr: integer;
fWrittenEvent,
fReadEvent: TEvent;
public
constructor Create(const Elements: Integer);
destructor Destroy; override;
function Read(Buf: Pointer; Items: integer; BlockUntilAvail: boolean): integer;
function Write(Buf: Pointer; Items: integer; BlockUntilDone: boolean): integer;
procedure BreakPipe;
property FillState: Integer read fFillState;
property Size: integer read fDataLen;
end;

function utlFreeOrFinalize(var obj; const aTypeInfo: PTypeInfo; const aFreeObj: Boolean = true): Boolean;

implementation

uses
uutlExceptions;
uutlExceptions, uutlThreads;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//Helper////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1531,5 +1552,106 @@ begin
end;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//TutlRingBuffer////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
constructor TutlRingBuffer.Create(const Elements: Integer);
begin
inherited Create;
fAborted:= false;
fDataLen:= Elements;
fDataSize:= SizeOf(T);
SetLength(fData, fDataLen);
fWritePtr:= 1;
fReadPtr:= 0;
fFillState:= 0;
fReadEvent:= TAutoResetEvent.Create;
fWrittenEvent:= TAutoResetEvent.Create;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
destructor TutlRingBuffer.Destroy;
begin
BreakPipe;
FreeAndNil(fReadEvent);
FreeAndNil(fWrittenEvent);
SetLength(fData, 0);
inherited Destroy;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlRingBuffer.Read(Buf: Pointer; Items: integer; BlockUntilAvail: boolean): integer;
var
wp, c, r: Integer;
begin
Result:= 0;
while Items > 0 do begin
if fAborted then
exit;

InterLockedExchange(wp{%H-}, fWritePtr);
r:= (fReadPtr + 1) mod fDataLen;
if wp < r then
wp:= fDataLen;
c:= wp - r;
if c > Items then
c:= Items;
if c > 0 then begin
Move(fData[r], Buf^, c * fDataSize);
Dec(Items, c);
inc(Result, c);
dec(fFillState, c);
inc(PByte(Buf), c * fDataSize);
InterLockedExchange(fReadPtr, (fReadPtr + c) mod fDataLen);
fReadEvent.SetEvent;
end else begin
if not BlockUntilAvail then
break;
fWrittenEvent.WaitFor(INFINITE);
end;
end;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
function TutlRingBuffer.Write(Buf: Pointer; Items: integer; BlockUntilDone: boolean): integer;
var
rp, c: integer;
begin
Result:= 0;
while Items > 0 do begin
if fAborted then
exit;

InterLockedExchange(rp{%H-}, fReadPtr);
if rp < fWritePtr then
rp:= fDataLen;
c:= rp - fWritePtr;
if c > Items then
c:= Items;
if c > 0 then begin
Move(Buf^, fData[fWritePtr], c * fDataSize);
dec(Items, c);
inc(Result, c);
inc(fFillState, c);
inc(PByte(Buf), c * fDataSize);
InterLockedExchange(fWritePtr, (fWritePtr + c) mod fDataLen);
fWrittenEvent.SetEvent;
end else begin
if not BlockUntilDone then
Break;
fReadEvent.WaitFor(INFINITE);
end;
end;
end;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
procedure TutlRingBuffer.BreakPipe;
begin
fAborted:= true;
fWrittenEvent.SetEvent;
fReadEvent.SetEvent;
end;


end.


+ 399
- 0
uutlThreads.pas Zobrazit soubor

@@ -0,0 +1,399 @@
unit uutlThreads;

{ Package: Utils
Prefix: utl - UTiLs
Beschreibung: diese Unit implementiert Hilfsklassen für multithreaded Anwendungen }

{$mode objfpc}{$H+}
{$modeswitch nestedprocvars}

interface

uses
Classes, SysUtils, syncobjs, uutlGenerics;

type
TAutoResetEvent = class(TEvent)
public
constructor Create(aInitial: boolean = false);
end;
// aliased to stay in LCL naming scheme for TSimpleEvent
TutlAutoResetEvent = TAutoResetEvent;

TutlSpinLock = class
private
fLock: DWord;
fLockReused: integer;
public
constructor Create;
destructor Destroy; override;
procedure Enter;
procedure Leave;
end;

IutlThreadPoolRunnable = interface
['{DFEC1832-30DA-4D12-A321-7762057D2D17}']
function Execute: PtrUInt;
end;
IutlThreadPoolWaitable = interface
['{8FEB8A6B-B659-4A48-929F-650CE7455BC7}']
function WaitFor: TWaitResult;
function TaskResult: PtrUInt;
end;

TAsyncFuncNested = function : PtrUInt is nested;
TAsyncFuncParamNested = function (Param: PtrUInt) : PtrUInt is nested;

TutlThreadPool = class
private type
TQueueItem = class(TInterfacedObject, IutlThreadPoolWaitable)
private
fTask: IutlThreadPoolRunnable;
fDoneEvent: TEvent;
fResult: PtrUInt;
public
constructor Create(const aTask: IutlThreadPoolRunnable);
destructor Destroy; override;
procedure Execute;
procedure Cancel;
function WaitFor: TWaitResult;
function TaskResult: PtrUInt;
end;

TRunnableFuncNested = class(TInterfacedObject, IutlThreadPoolRunnable)
private
fProc: TAsyncFuncNested;
public
constructor Create(Proc: TAsyncFuncNested);
function Execute: PtrUInt;
end;

TRunnableFuncParamNested = class(TInterfacedObject, IutlThreadPoolRunnable)
private
fProc: TAsyncFuncParamNested;
fParam: PtrUInt;
public
constructor Create(Proc: TAsyncFuncParamNested; Param: PtrUInt);
function Execute: PtrUInt;
end;

TWorker = class(TThread)
private
fOwner: TutlThreadPool;
public
constructor Create(Owner: TutlThreadPool);
destructor Destroy; override;
procedure Execute; override;
end;

TWorkerThreadList = specialize TutlList<TThread>;
TWorkItemList = specialize TutlList<TQueueItem>;
private
fTerminating: Boolean;
fThreads: TWorkerThreadList;
fThreadMgmtSection: TCriticalSection;
fQueue: TWorkItemList;
fQueueSection: TutlSpinLock;
fNewItemEvent: TEvent;
procedure SetMaxThreads(AValue: integer);
procedure Shutdown;
function GetMaxThreads: integer;
protected
function FetchWork: TQueueItem;
procedure ThreadWaitForEvent;
public
constructor Create;
destructor Destroy; override;
// TODO: attention: if tasks queue more tasks and then wait for them, recursing more than MaxThreads levels is a deadlock ATM. This will be fixed later
property MaxThreads: integer read GetMaxThreads write SetMaxThreads;
function Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable; overload;
function Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable;
function Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable;
end;

implementation

{ TAutoResetEvent }

constructor TAutoResetEvent.Create(aInitial: boolean);
begin
inherited Create(Nil, false, aInitial, '');
end;

{ TutlSpinLock }

constructor TutlSpinLock.Create;
begin
inherited Create;
fLock:= 0;
fLockReused:= 0;
end;

destructor TutlSpinLock.Destroy;
begin
Enter;
inherited Destroy;
end;

procedure TutlSpinLock.Enter;
var
ti: dword;
begin
ti:= ThreadID;
if ti = InterlockedCompareExchange(fLock, ti, ti) then begin
{
The lock is already held by this thread. This means it cannot be modified by a concurrent
operation (assuming Enter/Leave bracket correctly), and we can act non-atomar on other variables.
}
inc(fLockReused);
end else begin
while InterlockedCompareExchange(fLock, ti, 0) <> 0 do ;
end;
end;

procedure TutlSpinLock.Leave;
var
ti: DWord;
begin
ti:= ThreadID;
// Unlock only if we hold the lock
if ti = InterlockedCompareExchange(fLock, ti, ti) then begin
// our lock, but we haven't yet done anything (note the above is essentially a threadsafe CMP if successful)
if fLockReused = 0 then
InterLockedExchange(fLock, 0) // normal lock
else
dec(fLockReused); // nested locks
end;
end;

{ TutlThreadPool }

constructor TutlThreadPool.Create;
begin
inherited Create;
fTerminating:= false;
fQueue:= TWorkItemList.Create(False);
fQueueSection:= TutlSpinLock.Create;
fThreads:= TWorkerThreadList.Create(true);
fThreadMgmtSection:= TCriticalSection.Create;
fNewItemEvent:= TEvent.Create(nil, false, true, '');
MaxThreads:= 2;
end;

destructor TutlThreadPool.Destroy;
begin
Shutdown;
FreeAndNil(fNewItemEvent);
FreeAndNil(fThreads);
FreeAndNil(fThreadMgmtSection);
FreeAndNil(fQueue);
FreeAndNil(fQueueSection);
inherited Destroy;
end;

function TutlThreadPool.GetMaxThreads: integer;
begin
Result:= fThreads.Count;
end;

procedure TutlThreadPool.SetMaxThreads(AValue: integer);
var
i: integer;
begin
fThreadMgmtSection.Enter;
try
if MaxThreads=AValue then Exit;
if AValue < MaxThreads then begin
for i:= MaxThreads - 1 downto AValue do begin
fThreads.Delete(i); // frees the item, which causes the thread to Terminate, Waitfor and Destroy
end;
end else begin
for i:= MaxThreads to AValue - 1 do begin
fThreads.Add(TWorker.Create(Self));
end;
end;
finally
fThreadMgmtSection.Leave;
end;
end;

procedure TutlThreadPool.Shutdown;
var
i: integer;
begin
fTerminating:= true;
// kill all threads
fThreadMgmtSection.Enter;
try
for i:= 0 to fThreads.Count - 1 do
fThreads[i].Terminate;
fNewItemEvent.SetEvent;
for i:= 0 to fThreads.Count - 1 do
fThreads[i].WaitFor;
fThreads.Clear;
finally
fThreadMgmtSection.Leave;
end;
// kill all remaining tasks and notifiy Waitables
fQueueSection.Enter;
try
for i:= 0 to fQueue.Count - 1 do begin
fQueue[i].Cancel;
fQueue[i]._Release;
end;
fQueue.Clear;
finally
fQueueSection.Leave;
end;
end;

function TutlThreadPool.Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable;
var
itm: TQueueItem;
begin
if fTerminating then
Exit(nil);

itm:= TQueueItem.Create(Task);
Result:= itm;
fQueueSection.Enter;
try
itm._AddRef;
fQueue.Add(itm);
fNewItemEvent.SetEvent;
finally
fQueueSection.Leave;
end;
end;

function TutlThreadPool.FetchWork:TQueueItem;
begin
if fTerminating then
Exit(nil);

fQueueSection.Enter;
try
if fQueue.Count > 0 then begin
Result:= fQueue[0];
fQueue.Delete(0);
end
else
Result:= nil;
finally
fQueueSection.Leave;
end;
end;

procedure TutlThreadPool.ThreadWaitForEvent;
begin
fNewItemEvent.WaitFor(INFINITE);
if fTerminating then // this one will soon leave .Execute, wake all others
fNewItemEvent.SetEvent;
end;

function TutlThreadPool.Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable;
begin
Result:= Queue(TRunnableFuncNested.Create(Task));
end;

function TutlThreadPool.Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable;
begin
Result:= Queue(TRunnableFuncParamNested.Create(Task, Param));
end;

{ TutlThreadPool.TWorker }

constructor TutlThreadPool.TWorker.Create(Owner: TutlThreadPool);
begin
inherited Create(false);
fOwner:= Owner;
end;

destructor TutlThreadPool.TWorker.Destroy;
begin
Terminate;
WaitFor;
inherited Destroy;
end;

procedure TutlThreadPool.TWorker.Execute;
var
qi: TQueueItem;
begin
while not Terminated do begin
qi:= fOwner.FetchWork;
if Assigned(qi) then begin
qi.Execute;
qi._Release;
end else
fOwner.ThreadWaitForEvent;
end;
end;

{ TutlThreadPool.TQueueItem }

constructor TutlThreadPool.TQueueItem.Create(const aTask: IutlThreadPoolRunnable);
begin
inherited Create;
fDoneEvent:= TEvent.Create(nil, true, false, '');
fTask:= aTask;
end;

destructor TutlThreadPool.TQueueItem.Destroy;
begin
FreeAndNil(fDoneEvent);
inherited Destroy;
end;

function TutlThreadPool.TQueueItem.WaitFor: TWaitResult;
begin
Result:= fDoneEvent.WaitFor(INFINITE);
end;

procedure TutlThreadPool.TQueueItem.Execute;
begin
fResult:= fTask.Execute;
fDoneEvent.SetEvent;
end;

function TutlThreadPool.TQueueItem.TaskResult: PtrUInt;
begin
Result:= fResult;
end;

procedure TutlThreadPool.TQueueItem.Cancel;
begin
fResult:= 0;
fDoneEvent.SetEvent;
end;

{ TutlThreadPool.TRunnableFuncNested }

constructor TutlThreadPool.TRunnableFuncNested.Create(Proc: TAsyncFuncNested);
begin
inherited Create;
fProc:= Proc;
end;

function TutlThreadPool.TRunnableFuncNested.Execute: PtrUInt;
begin
Result:= fProc();
end;

{ TutlThreadPool.TRunnableFuncParamNested }

constructor TutlThreadPool.TRunnableFuncParamNested.Create(
Proc: TAsyncFuncParamNested; Param: PtrUInt);
begin
inherited Create;
fProc:= Proc;
fParam:= Param;
end;

function TutlThreadPool.TRunnableFuncParamNested.Execute: PtrUInt;
begin
Result:= fProc(fParam);
end;

end.


Načítá se…
Zrušit
Uložit