diff --git a/uutlThreads.pas b/uutlThreads.pas index 5f153c4..80941c3 100644 --- a/uutlThreads.pas +++ b/uutlThreads.pas @@ -84,6 +84,7 @@ type constructor Create(Owner: TutlThreadPool); destructor Destroy; override; procedure Execute; override; + procedure BlockedByQueueWait(const IsBlocked: boolean); end; TWorkerThreadList = specialize TutlList; @@ -91,6 +92,7 @@ type private fTerminating: Boolean; fThreads: TWorkerThreadList; + fUnblockedThreads: LongInt; fThreadMgmtSection: TCriticalSection; fQueue: TWorkItemList; fQueueSection: TutlSpinLock; @@ -102,7 +104,7 @@ type function FetchWork: TQueueItem; procedure ThreadWaitForEvent; public - constructor Create; + constructor Create(const ThreadCount: integer = 0); destructor Destroy; override; // TODO: attention: if tasks queue more tasks and then wait for them, recursing more than MaxThreads levels is a deadlock ATM. This will be fixed later property MaxThreads: integer read GetMaxThreads write SetMaxThreads; @@ -168,7 +170,7 @@ end; { TutlThreadPool } -constructor TutlThreadPool.Create; +constructor TutlThreadPool.Create(const ThreadCount: integer); begin inherited Create; fTerminating:= false; @@ -177,7 +179,11 @@ begin fThreads:= TWorkerThreadList.Create(true); fThreadMgmtSection:= TCriticalSection.Create; fNewItemEvent:= TEvent.Create(nil, false, true, ''); - MaxThreads:= 2; + fUnblockedThreads:= 0; + if ThreadCount = 0 then + MaxThreads:= TThread.ProcessorCount + 2 + else + MaxThreads:= ThreadCount; end; destructor TutlThreadPool.Destroy; @@ -205,11 +211,13 @@ begin if MaxThreads=AValue then Exit; if AValue < MaxThreads then begin for i:= MaxThreads - 1 downto AValue do begin + InterLockedDecrement(fUnblockedThreads); fThreads.Delete(i); // frees the item, which causes the thread to Terminate, Waitfor and Destroy end; end else begin for i:= MaxThreads to AValue - 1 do begin fThreads.Add(TWorker.Create(Self)); + InterLockedIncrement(fUnblockedThreads); end; end; finally @@ -221,30 +229,33 @@ procedure TutlThreadPool.Shutdown; var i: integer; begin + // don't give out any new Tasks fTerminating:= true; - // kill all threads fThreadMgmtSection.Enter; try + // kill all threads for i:= 0 to fThreads.Count - 1 do fThreads[i].Terminate; + // some thready may be waiting for FetchWork or a Task that they recursively spawned fNewItemEvent.SetEvent; + // kill all remaining tasks and notifiy Waitables + fQueueSection.Enter; + try + for i:= 0 to fQueue.Count - 1 do begin + fQueue[i].Cancel; + fQueue[i]._Release; + end; + fQueue.Clear; + finally + fQueueSection.Leave; + end; + // every possible event is set now, wait for workers to finish for i:= 0 to fThreads.Count - 1 do fThreads[i].WaitFor; fThreads.Clear; finally fThreadMgmtSection.Leave; end; - // kill all remaining tasks and notifiy Waitables - fQueueSection.Enter; - try - for i:= 0 to fQueue.Count - 1 do begin - fQueue[i].Cancel; - fQueue[i]._Release; - end; - fQueue.Clear; - finally - fQueueSection.Leave; - end; end; function TutlThreadPool.Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable; @@ -256,13 +267,21 @@ begin itm:= TQueueItem.Create(Task); Result:= itm; - fQueueSection.Enter; - try - itm._AddRef; - fQueue.Add(itm); - fNewItemEvent.SetEvent; - finally - fQueueSection.Leave; + // do we have a thread, or is everyone waiting for a task? + if InterlockedCompareExchange(fUnblockedThreads, 0, 0) = 0 then begin + // waiting for task, prevent deadlock by running this Task in the current thread + itm.Execute; + // Result has 1 Ref, so the caller frees the QueueItem + end else begin + // normal operation + fQueueSection.Enter; + try + itm._AddRef; + fQueue.Add(itm); + fNewItemEvent.SetEvent; + finally + fQueueSection.Leave; + end; end; end; @@ -330,6 +349,14 @@ begin end; end; +procedure TutlThreadPool.TWorker.BlockedByQueueWait(const IsBlocked: boolean); +begin + if IsBlocked then + InterLockedDecrement(fOwner.fUnblockedThreads) + else + InterLockedIncrement(fOwner.fUnblockedThreads); +end; + { TutlThreadPool.TQueueItem } constructor TutlThreadPool.TQueueItem.Create(const aTask: IutlThreadPoolRunnable); @@ -346,8 +373,21 @@ begin end; function TutlThreadPool.TQueueItem.WaitFor: TWaitResult; +var + ct: TThread; + ctw: TutlThreadPool.TWorker; begin - Result:= fDoneEvent.WaitFor(INFINITE); + ct:= TThread.CurrentThread; + if ct is TutlThreadPool.TWorker then begin + ctw:= TutlThreadPool.TWorker(ct); + ctw.BlockedByQueueWait(true); + try + Result:= fDoneEvent.WaitFor(INFINITE); + finally + ctw.BlockedByQueueWait(false); + end; + end else + Result:= fDoneEvent.WaitFor(INFINITE); end; procedure TutlThreadPool.TQueueItem.Execute;