diff --git a/uutlGenerics.pas b/uutlGenerics.pas index 70f0355..32396e7 100644 --- a/uutlGenerics.pas +++ b/uutlGenerics.pas @@ -10,7 +10,7 @@ unit uutlGenerics; interface uses - Classes, SysUtils, typinfo; + Classes, SysUtils, typinfo, syncobjs; type //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -400,12 +400,33 @@ type class function Values: TValueArray; end; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + generic TutlRingBuffer = class + private + fAborted: boolean; + fData: packed array of T; + fDataLen: Integer; + fDataSize: integer; + fFillState: integer; + fWritePtr, fReadPtr: integer; + fWrittenEvent, + fReadEvent: TEvent; + public + constructor Create(const Elements: Integer); + destructor Destroy; override; + function Read(Buf: Pointer; Items: integer; BlockUntilAvail: boolean): integer; + function Write(Buf: Pointer; Items: integer; BlockUntilDone: boolean): integer; + procedure BreakPipe; + property FillState: Integer read fFillState; + property Size: integer read fDataLen; + end; + function utlFreeOrFinalize(var obj; const aTypeInfo: PTypeInfo; const aFreeObj: Boolean = true): Boolean; implementation uses - uutlExceptions; + uutlExceptions, uutlThreads; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //Helper//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -1531,5 +1552,106 @@ begin end; end; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//TutlRingBuffer//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +constructor TutlRingBuffer.Create(const Elements: Integer); +begin + inherited Create; + fAborted:= false; + fDataLen:= Elements; + fDataSize:= SizeOf(T); + SetLength(fData, fDataLen); + fWritePtr:= 1; + fReadPtr:= 0; + fFillState:= 0; + fReadEvent:= TAutoResetEvent.Create; + fWrittenEvent:= TAutoResetEvent.Create; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +destructor TutlRingBuffer.Destroy; +begin + BreakPipe; + FreeAndNil(fReadEvent); + FreeAndNil(fWrittenEvent); + SetLength(fData, 0); + inherited Destroy; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +function TutlRingBuffer.Read(Buf: Pointer; Items: integer; BlockUntilAvail: boolean): integer; +var + wp, c, r: Integer; +begin + Result:= 0; + while Items > 0 do begin + if fAborted then + exit; + + InterLockedExchange(wp{%H-}, fWritePtr); + r:= (fReadPtr + 1) mod fDataLen; + if wp < r then + wp:= fDataLen; + c:= wp - r; + if c > Items then + c:= Items; + if c > 0 then begin + Move(fData[r], Buf^, c * fDataSize); + Dec(Items, c); + inc(Result, c); + dec(fFillState, c); + inc(PByte(Buf), c * fDataSize); + InterLockedExchange(fReadPtr, (fReadPtr + c) mod fDataLen); + fReadEvent.SetEvent; + end else begin + if not BlockUntilAvail then + break; + fWrittenEvent.WaitFor(INFINITE); + end; + end; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +function TutlRingBuffer.Write(Buf: Pointer; Items: integer; BlockUntilDone: boolean): integer; +var + rp, c: integer; +begin + Result:= 0; + while Items > 0 do begin + if fAborted then + exit; + + InterLockedExchange(rp{%H-}, fReadPtr); + if rp < fWritePtr then + rp:= fDataLen; + c:= rp - fWritePtr; + if c > Items then + c:= Items; + if c > 0 then begin + Move(Buf^, fData[fWritePtr], c * fDataSize); + dec(Items, c); + inc(Result, c); + inc(fFillState, c); + inc(PByte(Buf), c * fDataSize); + InterLockedExchange(fWritePtr, (fWritePtr + c) mod fDataLen); + fWrittenEvent.SetEvent; + end else begin + if not BlockUntilDone then + Break; + fReadEvent.WaitFor(INFINITE); + end; + end; +end; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +procedure TutlRingBuffer.BreakPipe; +begin + fAborted:= true; + fWrittenEvent.SetEvent; + fReadEvent.SetEvent; +end; + + end. diff --git a/uutlThreads.pas b/uutlThreads.pas new file mode 100644 index 0000000..5f153c4 --- /dev/null +++ b/uutlThreads.pas @@ -0,0 +1,399 @@ +unit uutlThreads; + +{ Package: Utils + Prefix: utl - UTiLs + Beschreibung: diese Unit implementiert Hilfsklassen für multithreaded Anwendungen } + +{$mode objfpc}{$H+} +{$modeswitch nestedprocvars} + +interface + +uses + Classes, SysUtils, syncobjs, uutlGenerics; + +type + TAutoResetEvent = class(TEvent) + public + constructor Create(aInitial: boolean = false); + end; + // aliased to stay in LCL naming scheme for TSimpleEvent + TutlAutoResetEvent = TAutoResetEvent; + + TutlSpinLock = class + private + fLock: DWord; + fLockReused: integer; + public + constructor Create; + destructor Destroy; override; + procedure Enter; + procedure Leave; + end; + + IutlThreadPoolRunnable = interface + ['{DFEC1832-30DA-4D12-A321-7762057D2D17}'] + function Execute: PtrUInt; + end; + IutlThreadPoolWaitable = interface + ['{8FEB8A6B-B659-4A48-929F-650CE7455BC7}'] + function WaitFor: TWaitResult; + function TaskResult: PtrUInt; + end; + + TAsyncFuncNested = function : PtrUInt is nested; + TAsyncFuncParamNested = function (Param: PtrUInt) : PtrUInt is nested; + + TutlThreadPool = class + private type + TQueueItem = class(TInterfacedObject, IutlThreadPoolWaitable) + private + fTask: IutlThreadPoolRunnable; + fDoneEvent: TEvent; + fResult: PtrUInt; + public + constructor Create(const aTask: IutlThreadPoolRunnable); + destructor Destroy; override; + procedure Execute; + procedure Cancel; + function WaitFor: TWaitResult; + function TaskResult: PtrUInt; + end; + + TRunnableFuncNested = class(TInterfacedObject, IutlThreadPoolRunnable) + private + fProc: TAsyncFuncNested; + public + constructor Create(Proc: TAsyncFuncNested); + function Execute: PtrUInt; + end; + + TRunnableFuncParamNested = class(TInterfacedObject, IutlThreadPoolRunnable) + private + fProc: TAsyncFuncParamNested; + fParam: PtrUInt; + public + constructor Create(Proc: TAsyncFuncParamNested; Param: PtrUInt); + function Execute: PtrUInt; + end; + + TWorker = class(TThread) + private + fOwner: TutlThreadPool; + public + constructor Create(Owner: TutlThreadPool); + destructor Destroy; override; + procedure Execute; override; + end; + + TWorkerThreadList = specialize TutlList; + TWorkItemList = specialize TutlList; + private + fTerminating: Boolean; + fThreads: TWorkerThreadList; + fThreadMgmtSection: TCriticalSection; + fQueue: TWorkItemList; + fQueueSection: TutlSpinLock; + fNewItemEvent: TEvent; + procedure SetMaxThreads(AValue: integer); + procedure Shutdown; + function GetMaxThreads: integer; + protected + function FetchWork: TQueueItem; + procedure ThreadWaitForEvent; + public + constructor Create; + destructor Destroy; override; + // TODO: attention: if tasks queue more tasks and then wait for them, recursing more than MaxThreads levels is a deadlock ATM. This will be fixed later + property MaxThreads: integer read GetMaxThreads write SetMaxThreads; + function Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable; overload; + function Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable; + function Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable; + end; + +implementation + +{ TAutoResetEvent } + +constructor TAutoResetEvent.Create(aInitial: boolean); +begin + inherited Create(Nil, false, aInitial, ''); +end; + +{ TutlSpinLock } + +constructor TutlSpinLock.Create; +begin + inherited Create; + fLock:= 0; + fLockReused:= 0; +end; + +destructor TutlSpinLock.Destroy; +begin + Enter; + inherited Destroy; +end; + +procedure TutlSpinLock.Enter; +var + ti: dword; +begin + ti:= ThreadID; + if ti = InterlockedCompareExchange(fLock, ti, ti) then begin + { + The lock is already held by this thread. This means it cannot be modified by a concurrent + operation (assuming Enter/Leave bracket correctly), and we can act non-atomar on other variables. + } + inc(fLockReused); + end else begin + while InterlockedCompareExchange(fLock, ti, 0) <> 0 do ; + end; +end; + +procedure TutlSpinLock.Leave; +var + ti: DWord; +begin + ti:= ThreadID; + // Unlock only if we hold the lock + if ti = InterlockedCompareExchange(fLock, ti, ti) then begin + // our lock, but we haven't yet done anything (note the above is essentially a threadsafe CMP if successful) + if fLockReused = 0 then + InterLockedExchange(fLock, 0) // normal lock + else + dec(fLockReused); // nested locks + end; +end; + +{ TutlThreadPool } + +constructor TutlThreadPool.Create; +begin + inherited Create; + fTerminating:= false; + fQueue:= TWorkItemList.Create(False); + fQueueSection:= TutlSpinLock.Create; + fThreads:= TWorkerThreadList.Create(true); + fThreadMgmtSection:= TCriticalSection.Create; + fNewItemEvent:= TEvent.Create(nil, false, true, ''); + MaxThreads:= 2; +end; + +destructor TutlThreadPool.Destroy; +begin + Shutdown; + FreeAndNil(fNewItemEvent); + FreeAndNil(fThreads); + FreeAndNil(fThreadMgmtSection); + FreeAndNil(fQueue); + FreeAndNil(fQueueSection); + inherited Destroy; +end; + +function TutlThreadPool.GetMaxThreads: integer; +begin + Result:= fThreads.Count; +end; + +procedure TutlThreadPool.SetMaxThreads(AValue: integer); +var + i: integer; +begin + fThreadMgmtSection.Enter; + try + if MaxThreads=AValue then Exit; + if AValue < MaxThreads then begin + for i:= MaxThreads - 1 downto AValue do begin + fThreads.Delete(i); // frees the item, which causes the thread to Terminate, Waitfor and Destroy + end; + end else begin + for i:= MaxThreads to AValue - 1 do begin + fThreads.Add(TWorker.Create(Self)); + end; + end; + finally + fThreadMgmtSection.Leave; + end; +end; + +procedure TutlThreadPool.Shutdown; +var + i: integer; +begin + fTerminating:= true; + // kill all threads + fThreadMgmtSection.Enter; + try + for i:= 0 to fThreads.Count - 1 do + fThreads[i].Terminate; + fNewItemEvent.SetEvent; + for i:= 0 to fThreads.Count - 1 do + fThreads[i].WaitFor; + fThreads.Clear; + finally + fThreadMgmtSection.Leave; + end; + // kill all remaining tasks and notifiy Waitables + fQueueSection.Enter; + try + for i:= 0 to fQueue.Count - 1 do begin + fQueue[i].Cancel; + fQueue[i]._Release; + end; + fQueue.Clear; + finally + fQueueSection.Leave; + end; +end; + +function TutlThreadPool.Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable; +var + itm: TQueueItem; +begin + if fTerminating then + Exit(nil); + + itm:= TQueueItem.Create(Task); + Result:= itm; + fQueueSection.Enter; + try + itm._AddRef; + fQueue.Add(itm); + fNewItemEvent.SetEvent; + finally + fQueueSection.Leave; + end; +end; + +function TutlThreadPool.FetchWork:TQueueItem; +begin + if fTerminating then + Exit(nil); + + fQueueSection.Enter; + try + if fQueue.Count > 0 then begin + Result:= fQueue[0]; + fQueue.Delete(0); + end + else + Result:= nil; + finally + fQueueSection.Leave; + end; +end; + +procedure TutlThreadPool.ThreadWaitForEvent; +begin + fNewItemEvent.WaitFor(INFINITE); + if fTerminating then // this one will soon leave .Execute, wake all others + fNewItemEvent.SetEvent; +end; + +function TutlThreadPool.Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable; +begin + Result:= Queue(TRunnableFuncNested.Create(Task)); +end; + +function TutlThreadPool.Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable; +begin + Result:= Queue(TRunnableFuncParamNested.Create(Task, Param)); +end; + +{ TutlThreadPool.TWorker } + +constructor TutlThreadPool.TWorker.Create(Owner: TutlThreadPool); +begin + inherited Create(false); + fOwner:= Owner; +end; + +destructor TutlThreadPool.TWorker.Destroy; +begin + Terminate; + WaitFor; + inherited Destroy; +end; + +procedure TutlThreadPool.TWorker.Execute; +var + qi: TQueueItem; +begin + while not Terminated do begin + qi:= fOwner.FetchWork; + if Assigned(qi) then begin + qi.Execute; + qi._Release; + end else + fOwner.ThreadWaitForEvent; + end; +end; + +{ TutlThreadPool.TQueueItem } + +constructor TutlThreadPool.TQueueItem.Create(const aTask: IutlThreadPoolRunnable); +begin + inherited Create; + fDoneEvent:= TEvent.Create(nil, true, false, ''); + fTask:= aTask; +end; + +destructor TutlThreadPool.TQueueItem.Destroy; +begin + FreeAndNil(fDoneEvent); + inherited Destroy; +end; + +function TutlThreadPool.TQueueItem.WaitFor: TWaitResult; +begin + Result:= fDoneEvent.WaitFor(INFINITE); +end; + +procedure TutlThreadPool.TQueueItem.Execute; +begin + fResult:= fTask.Execute; + fDoneEvent.SetEvent; +end; + +function TutlThreadPool.TQueueItem.TaskResult: PtrUInt; +begin + Result:= fResult; +end; + +procedure TutlThreadPool.TQueueItem.Cancel; +begin + fResult:= 0; + fDoneEvent.SetEvent; +end; + +{ TutlThreadPool.TRunnableFuncNested } + +constructor TutlThreadPool.TRunnableFuncNested.Create(Proc: TAsyncFuncNested); +begin + inherited Create; + fProc:= Proc; +end; + +function TutlThreadPool.TRunnableFuncNested.Execute: PtrUInt; +begin + Result:= fProc(); +end; + +{ TutlThreadPool.TRunnableFuncParamNested } + +constructor TutlThreadPool.TRunnableFuncParamNested.Create( + Proc: TAsyncFuncParamNested; Param: PtrUInt); +begin + inherited Create; + fProc:= Proc; + fParam:= Param; +end; + +function TutlThreadPool.TRunnableFuncParamNested.Execute: PtrUInt; +begin + Result:= fProc(fParam); +end; + +end. +