unit uutlThreads;

{ Package:      Utils
  Prefix:       utl - UTiLs
  Description:  this unit implements helper classes for multithreaded applications }

{$mode objfpc}{$H+}
{$modeswitch nestedprocvars}

interface

uses
  Classes, SysUtils, syncobjs, uutlGenerics, uutlSyncObjs;

type
  { A unit of work that can be handed to TutlThreadPool.Queue. }
  IutlThreadPoolRunnable = interface
    ['{DFEC1832-30DA-4D12-A321-7762057D2D17}']
    // runs the task; the returned value is exposed through IutlThreadPoolWaitable.TaskResult
    function Execute: PtrUInt;
  end;

  { Handle returned by TutlThreadPool.Queue to wait for a task and read its result. }
  IutlThreadPoolWaitable = interface
    ['{8FEB8A6B-B659-4A48-929F-650CE7455BC7}']
    // blocks until the task has been executed or cancelled
    function WaitFor: TWaitResult;
    // value returned by the task; 0 if the task was cancelled
    function TaskResult: PtrUInt;
  end;

  TAsyncFuncNested = function : PtrUInt is nested;
  TAsyncFuncParamNested = function (Param: PtrUInt) : PtrUInt is nested;

  { Pool of worker threads that execute queued IutlThreadPoolRunnable tasks. }
  TutlThreadPool = class
  private type
    { Queue entry: wraps a task together with its completion event and result. }
    TQueueItem = class(TInterfacedObject, IutlThreadPoolWaitable)
    private
      fTask: IutlThreadPoolRunnable;
      fDoneEvent: TEvent;
      fResult: PtrUInt;
    public
      constructor Create(const aTask: IutlThreadPoolRunnable);
      destructor Destroy; override;
      procedure Execute;
      procedure Cancel;
      function WaitFor: TWaitResult;
      function TaskResult: PtrUInt;
    end;

    { Adapter: runs a parameterless nested function as a task. }
    TRunnableFuncNested = class(TInterfacedObject, IutlThreadPoolRunnable)
    private
      fProc: TAsyncFuncNested;
    public
      constructor Create(Proc: TAsyncFuncNested);
      function Execute: PtrUInt;
    end;

    { Adapter: runs a nested function with one PtrUInt parameter as a task. }
    TRunnableFuncParamNested = class(TInterfacedObject, IutlThreadPoolRunnable)
    private
      fProc: TAsyncFuncParamNested;
      fParam: PtrUInt;
    public
      constructor Create(Proc: TAsyncFuncParamNested; Param: PtrUInt);
      function Execute: PtrUInt;
    end;

    { Worker thread: fetches items from the owning pool and executes them. }
    TWorker = class(TThread)
    private
      fOwner: TutlThreadPool;
    public
      constructor Create(Owner: TutlThreadPool);
      destructor Destroy; override;
      procedure Execute; override;
      procedure BlockedByQueueWait(const IsBlocked: boolean);
    end;

    // element types restored from usage (fThreads[i].Terminate, fQueue[i].Cancel)
    TWorkerThreadList = specialize TutlList<TWorker>;
    TWorkItemList = specialize TutlList<TQueueItem>;
  private
    fTerminating: Boolean;
    fThreads: TWorkerThreadList;
    fUnblockedThreads: LongInt;           // workers that are not blocked waiting for another task
    fThreadMgmtSection: TCriticalSection; // guards fThreads
    fQueue: TWorkItemList;
    fQueueSection: TutlSpinLock;          // guards fQueue
    fNewItemEvent: TEvent;                // auto-reset; signalled when work arrives or on shutdown
    procedure SetMaxThreads(AValue: integer);
    procedure Shutdown;
    function GetMaxThreads: integer;
  protected
    function FetchWork: TQueueItem;
    procedure ThreadWaitForEvent;
  public
    constructor Create(const ThreadCount: integer = 0);
    destructor Destroy; override;

    // TODO: attention: if tasks queue more tasks and then wait for them, recursing more than MaxThreads levels is a deadlock ATM. This will be fixed later
    property MaxThreads: integer read GetMaxThreads write SetMaxThreads;
    function Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable; overload;
    function Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable; overload;
    function Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable; overload;
  end;

implementation

{ TutlThreadPool }

{ Creates the pool. ThreadCount = 0 selects TThread.ProcessorCount + 2 workers;
  any other value is passed to MaxThreads. }
constructor TutlThreadPool.Create(const ThreadCount: integer);
begin
  inherited Create;
  fTerminating:= false;
  fQueue:= TWorkItemList.Create(False);
  fQueueSection:= TutlSpinLock.Create;
  fThreads:= TWorkerThreadList.Create(true);
  fThreadMgmtSection:= TCriticalSection.Create;
  // auto-reset event, created in the signalled state
  fNewItemEvent:= TEvent.Create(nil, false, true, '');
  fUnblockedThreads:= 0;
  if ThreadCount = 0 then
    MaxThreads:= TThread.ProcessorCount + 2
  else
    MaxThreads:= ThreadCount;
end;

{ Stops all workers, cancels pending tasks and releases all resources. }
destructor TutlThreadPool.Destroy;
begin
  Shutdown;
  FreeAndNil(fNewItemEvent);
  FreeAndNil(fThreads);
  FreeAndNil(fThreadMgmtSection);
  FreeAndNil(fQueue);
  FreeAndNil(fQueueSection);
  inherited Destroy;
end;

{ Returns the current number of worker threads. }
function TutlThreadPool.GetMaxThreads: integer;
begin
  Result:= fThreads.Count;
end;

{ Grows or shrinks the pool to AValue workers. Negative values are treated as 0. }
procedure TutlThreadPool.SetMaxThreads(AValue: integer);
var
  i: integer;
begin
  // a negative count would make the shrink loop delete index -1
  if AValue < 0 then
    AValue:= 0;
  fThreadMgmtSection.Enter;
  try
    if MaxThreads = AValue then
      Exit;
    if AValue < MaxThreads then begin
      for i:= MaxThreads - 1 downto AValue do begin
        InterLockedDecrement(fUnblockedThreads);
        fThreads.Delete(i); // frees the item, which causes the thread to Terminate, Waitfor and Destroy
      end;
    end else begin
      for i:= MaxThreads to AValue - 1 do begin
        fThreads.Add(TWorker.Create(Self));
        InterLockedIncrement(fUnblockedThreads);
      end;
    end;
  finally
    fThreadMgmtSection.Leave;
  end;
end;

{ Stops accepting tasks, terminates all workers and cancels everything still queued. }
procedure TutlThreadPool.Shutdown;
var
  i: integer;
begin
  // don't give out any new Tasks
  fTerminating:= true;
  fThreadMgmtSection.Enter;
  try
    // kill all threads
    for i:= 0 to fThreads.Count - 1 do
      fThreads[i].Terminate;
    // some threads may be waiting for FetchWork or a Task that they recursively spawned
    fNewItemEvent.SetEvent;
    // kill all remaining tasks and notify Waitables
    fQueueSection.Enter;
    try
      for i:= 0 to fQueue.Count - 1 do begin
        fQueue[i].Cancel;
        fQueue[i]._Release; // drop the reference that Queue added for the list
      end;
      fQueue.Clear;
    finally
      fQueueSection.Leave;
    end;
    // every possible event is set now, wait for workers to finish
    for i:= 0 to fThreads.Count - 1 do
      fThreads[i].WaitFor;
    fThreads.Clear;
  finally
    fThreadMgmtSection.Leave;
  end;
end;

{ Schedules Task for execution and returns a handle to wait on.
  Returns nil if the pool is shutting down. If no worker is currently unblocked,
  the task is executed synchronously in the calling thread. }
function TutlThreadPool.Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable;
var
  itm: TQueueItem;
  accepted: Boolean;
begin
  if fTerminating then
    Exit(nil);
  itm:= TQueueItem.Create(Task);
  Result:= itm;
  // do we have a thread, or is everyone waiting for a task?
  if InterlockedCompareExchange(fUnblockedThreads, 0, 0) = 0 then begin
    // waiting for task, prevent deadlock by running this Task in the current thread
    itm.Execute;
    // Result has 1 Ref, so the caller frees the QueueItem
  end else begin
    // normal operation
    fQueueSection.Enter;
    try
      // re-check under the lock: Shutdown sets fTerminating before it empties the
      // queue under this lock, so an item added here is either seen by Shutdown
      // or rejected - it can never be left behind unexecuted and uncancelled
      accepted:= not fTerminating;
      if accepted then begin
        itm._AddRef; // reference held by the queue
        fQueue.Add(itm);
        fNewItemEvent.SetEvent;
      end;
    finally
      fQueueSection.Leave;
    end;
    if not accepted then
      Result:= nil; // same outcome as the early exit above; releases itm
  end;
end;

{ Removes and returns the oldest queued item, or nil if the queue is empty or
  the pool is shutting down. The caller takes over the queue's reference. }
function TutlThreadPool.FetchWork: TQueueItem;
begin
  if fTerminating then
    Exit(nil);
  fQueueSection.Enter;
  try
    if fQueue.Count > 0 then begin
      Result:= fQueue[0];
      fQueue.Delete(0);
      // the event is auto-reset, so several Queue calls may have collapsed into
      // one signal; wake another worker while there is still work left
      if fQueue.Count > 0 then
        fNewItemEvent.SetEvent;
    end else
      Result:= nil;
  finally
    fQueueSection.Leave;
  end;
end;

{ Blocks the calling worker until the new-item event is signalled. }
procedure TutlThreadPool.ThreadWaitForEvent;
begin
  fNewItemEvent.WaitFor(INFINITE);
  if fTerminating then // this one will soon leave .Execute, wake all others
    fNewItemEvent.SetEvent;
end;

{ Schedules a parameterless nested function. }
function TutlThreadPool.Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable;
begin
  Result:= Queue(TRunnableFuncNested.Create(Task));
end;

{ Schedules a nested function that receives Param. }
function TutlThreadPool.Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable;
begin
  Result:= Queue(TRunnableFuncParamNested.Create(Task, Param));
end;

{ TutlThreadPool.TWorker }

{ Creates a worker for Owner; the thread is created non-suspended. }
constructor TutlThreadPool.TWorker.Create(Owner: TutlThreadPool);
begin
  inherited Create(false);
  fOwner:= Owner;
end;

{ Terminates the worker, wakes it if it is idle, and waits for it to finish. }
destructor TutlThreadPool.TWorker.Destroy;
begin
  Terminate;
  // An idle worker blocks on the owner's auto-reset event and would never notice
  // Terminated. Keep signalling until this thread has left Execute; other workers
  // woken by a signal just find no work and go back to waiting.
  // fOwner is only assigned after the inherited constructor succeeded, so this
  // is skipped for a thread that could not be created.
  if Assigned(fOwner) then
    while not Finished do begin
      fOwner.fNewItemEvent.SetEvent;
      Sleep(1);
    end;
  WaitFor;
  inherited Destroy;
end;

{ Thread body: execute queued items until terminated, sleeping while the queue is empty. }
procedure TutlThreadPool.TWorker.Execute;
var
  qi: TQueueItem;
begin
  while not Terminated do begin
    qi:= fOwner.FetchWork;
    if Assigned(qi) then begin
      try
        qi.Execute;
      finally
        // always drop the queue's reference, even if the task raised
        qi._Release;
      end;
    end else
      fOwner.ThreadWaitForEvent;
  end;
end;

{ Adjusts the owner's count of unblocked workers while this worker waits for another task. }
procedure TutlThreadPool.TWorker.BlockedByQueueWait(const IsBlocked: boolean);
begin
  if IsBlocked then
    InterLockedDecrement(fOwner.fUnblockedThreads)
  else
    InterLockedIncrement(fOwner.fUnblockedThreads);
end;

{ TutlThreadPool.TQueueItem }

{ Wraps aTask and creates its completion event (manual-reset, initially unset). }
constructor TutlThreadPool.TQueueItem.Create(const aTask: IutlThreadPoolRunnable);
begin
  inherited Create;
  fDoneEvent:= TEvent.Create(nil, true, false, '');
  fTask:= aTask;
end;

destructor TutlThreadPool.TQueueItem.Destroy;
begin
  FreeAndNil(fDoneEvent);
  inherited Destroy;
end;

{ Waits until the task was executed or cancelled. When called from a pool worker,
  the worker is marked as blocked for the duration of the wait. }
function TutlThreadPool.TQueueItem.WaitFor: TWaitResult;
var
  ct: TThread;
  ctw: TutlThreadPool.TWorker;
begin
  ct:= TThread.CurrentThread;
  if ct is TutlThreadPool.TWorker then begin
    ctw:= TutlThreadPool.TWorker(ct);
    ctw.BlockedByQueueWait(true);
    try
      Result:= fDoneEvent.WaitFor(INFINITE);
    finally
      ctw.BlockedByQueueWait(false);
    end;
  end else
    Result:= fDoneEvent.WaitFor(INFINITE);
end;

{ Runs the task and stores its result. The done event is signalled even if the
  task raises, so waiters are never left blocked; the exception still propagates. }
procedure TutlThreadPool.TQueueItem.Execute;
begin
  try
    fResult:= fTask.Execute;
  finally
    fDoneEvent.SetEvent;
  end;
end;

{ Returns the value stored by Execute, or 0 after Cancel. }
function TutlThreadPool.TQueueItem.TaskResult: PtrUInt;
begin
  Result:= fResult;
end;

{ Marks the item as done without running the task; the result is set to 0. }
procedure TutlThreadPool.TQueueItem.Cancel;
begin
  fResult:= 0;
  fDoneEvent.SetEvent;
end;

{ TutlThreadPool.TRunnableFuncNested }

constructor TutlThreadPool.TRunnableFuncNested.Create(Proc: TAsyncFuncNested);
begin
  inherited Create;
  fProc:= Proc;
end;

function TutlThreadPool.TRunnableFuncNested.Execute: PtrUInt;
begin
  Result:= fProc();
end;

{ TutlThreadPool.TRunnableFuncParamNested }

constructor TutlThreadPool.TRunnableFuncParamNested.Create(
  Proc: TAsyncFuncParamNested; Param: PtrUInt);
begin
  inherited Create;
  fProc:= Proc;
  fParam:= Param;
end;

function TutlThreadPool.TRunnableFuncParamNested.Execute: PtrUInt;
begin
  Result:= fProc(fParam);
end;

end.