You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

369 lines
9.0 KiB

  1. unit uutlThreads;
  2. { Package: Utils
  3. Prefix: utl - UTiLs
  4. Description: this unit implements helper classes for multithreaded applications }
  5. {$mode objfpc}{$H+}
  6. {$modeswitch nestedprocvars}
  7. interface
  8. uses
  9. Classes, SysUtils, syncobjs, uutlGenerics, uutlSyncObjs;
  10. type
  { A unit of work the pool can run. The value returned by Execute is what
    the pool stores as the task's result. }
  IutlThreadPoolRunnable = interface
    ['{DFEC1832-30DA-4D12-A321-7762057D2D17}']
    function Execute: PtrUInt;
  end;
  { Handle returned by TutlThreadPool.Queue.
    WaitFor blocks until the task has been executed or cancelled;
    TaskResult returns the value stored for the task. }
  IutlThreadPoolWaitable = interface
    ['{8FEB8A6B-B659-4A48-929F-650CE7455BC7}']
    function WaitFor: TWaitResult;
    function TaskResult: PtrUInt;
  end;
  { Nested-procedure task signatures accepted by the Queue overloads:
    one without argument, one receiving a caller-supplied PtrUInt. }
  TAsyncFuncNested      = function: PtrUInt is nested;
  TAsyncFuncParamNested = function(Param: PtrUInt): PtrUInt is nested;
  { Pool of worker threads that executes queued IutlThreadPoolRunnable tasks
    and hands out IutlThreadPoolWaitable handles for them. }
  TutlThreadPool = class
  private type
    { One queued task together with the event its waiters block on. }
    TQueueItem = class(TInterfacedObject, IutlThreadPoolWaitable)
    private
      fTask: IutlThreadPoolRunnable;  // the work to run
      fDoneEvent: TEvent;             // set by Execute and by Cancel
      fResult: PtrUInt;               // value returned by fTask.Execute, 0 after Cancel
    public
      constructor Create(const aTask: IutlThreadPoolRunnable);
      destructor Destroy; override;
      procedure Execute;
      procedure Cancel;
      function WaitFor: TWaitResult;
      function TaskResult: PtrUInt;
    end;

    { Adapter: runs a parameterless nested function as a task. }
    TRunnableFuncNested = class(TInterfacedObject, IutlThreadPoolRunnable)
    private
      fProc: TAsyncFuncNested;
    public
      constructor Create(Proc: TAsyncFuncNested);
      function Execute: PtrUInt;
    end;

    { Adapter: runs a nested function with one PtrUInt argument as a task. }
    TRunnableFuncParamNested = class(TInterfacedObject, IutlThreadPoolRunnable)
    private
      fProc: TAsyncFuncParamNested;
      fParam: PtrUInt;
    public
      constructor Create(Proc: TAsyncFuncParamNested; Param: PtrUInt);
      function Execute: PtrUInt;
    end;

    { Worker thread: fetches items from its owning pool and executes them. }
    TWorker = class(TThread)
    private
      fOwner: TutlThreadPool;
    public
      constructor Create(Owner: TutlThreadPool);
      destructor Destroy; override;
      procedure Execute; override;
      // adjusts the owner's fUnblockedThreads counter around a blocking wait
      procedure BlockedByQueueWait(const IsBlocked: boolean);
    end;

    TWorkerThreadList = specialize TutlList<TThread>;
    TWorkItemList = specialize TutlList<TQueueItem>;
  private
    fTerminating: Boolean;                 // set by Shutdown; Queue/FetchWork then return nil
    fThreads: TWorkerThreadList;           // the worker threads
    fUnblockedThreads: LongInt;            // workers not currently blocked in TQueueItem.WaitFor
    fThreadMgmtSection: TCriticalSection;  // held while fThreads is resized or shut down
    fQueue: TWorkItemList;                 // pending items, fetched from index 0
    fQueueSection: TutlSpinLock;           // guards fQueue
    fNewItemEvent: TEvent;                 // signalled when an item is queued and on shutdown
    procedure SetMaxThreads(AValue: integer);
    procedure Shutdown;
    function GetMaxThreads: integer;
  protected
    function FetchWork: TQueueItem;
    procedure ThreadWaitForEvent;
  public
    constructor Create(const ThreadCount: integer = 0);
    destructor Destroy; override;
    // TODO: attention: if tasks queue more tasks and then wait for them, recursing more than MaxThreads levels is a deadlock ATM. This will be fixed later
    property MaxThreads: integer read GetMaxThreads write SetMaxThreads;
    function Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable; overload;
    function Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable;
    function Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable;
  end;
  86. implementation
  87. { TutlThreadPool }
  { Creates the pool's queue, locks and wake-up event and starts the workers.
    ThreadCount = 0 selects TThread.ProcessorCount + 2 workers; any other
    value is used as given. }
  constructor TutlThreadPool.Create(const ThreadCount: integer);
  begin
    inherited Create;
    fTerminating:= false;
    fUnblockedThreads:= 0;
    fQueue:= TWorkItemList.Create(False);      // presumably "does not own its items" - confirm in uutlGenerics
    fQueueSection:= TutlSpinLock.Create;
    fThreads:= TWorkerThreadList.Create(true); // deleting an entry frees the thread object
    fThreadMgmtSection:= TCriticalSection.Create;
    // auto-reset event (AManualReset = false) that starts out signalled
    fNewItemEvent:= TEvent.Create(nil, false, true, '');
    // assigning MaxThreads spawns the workers via SetMaxThreads
    if ThreadCount <> 0 then
      MaxThreads:= ThreadCount
    else
      MaxThreads:= TThread.ProcessorCount + 2;
  end;
  { Stops all workers and cancels pending items via Shutdown, then releases
    the pool's helper objects. }
  destructor TutlThreadPool.Destroy;
  begin
    // must come first: the workers use every object freed below
    Shutdown;
    FreeAndNil(fNewItemEvent);
    FreeAndNil(fThreads);
    FreeAndNil(fThreadMgmtSection);
    FreeAndNil(fQueue);
    FreeAndNil(fQueueSection);
    inherited Destroy;
  end;
  { Getter of MaxThreads: the number of worker threads currently in the pool. }
  function TutlThreadPool.GetMaxThreads: integer;
  begin
    Exit(fThreads.Count);
  end;
  { Setter of MaxThreads: grows or shrinks the pool to AValue workers.
    Negative values are treated as 0. Runs under fThreadMgmtSection.
    When shrinking, the call returns only after each removed worker has
    finished its current task and left its Execute loop. }
  procedure TutlThreadPool.SetMaxThreads(AValue: integer);
  var
    i: integer;
    victim: TThread;
  begin
    // a negative count is meaningless and would index fThreads out of range below
    if AValue < 0 then
      AValue:= 0;
    fThreadMgmtSection.Enter;
    try
      if MaxThreads = AValue then
        Exit;
      if AValue < MaxThreads then begin
        for i:= MaxThreads - 1 downto AValue do begin
          victim:= fThreads[i];
          InterLockedDecrement(fUnblockedThreads);
          victim.Terminate;
          // An idle worker sleeps in ThreadWaitForEvent without a timeout and the
          // event is auto-reset, so a single SetEvent may wake a different worker.
          // Keep nudging until this one has really left Execute; otherwise the
          // WaitFor in TWorker.Destroy would block until someone queues new work.
          while not victim.Finished do begin
            fNewItemEvent.SetEvent;
            Sleep(1);
          end;
          fThreads.Delete(i); // frees the item, which causes the thread to Terminate, WaitFor and Destroy
        end;
      end else begin
        for i:= MaxThreads to AValue - 1 do begin
          fThreads.Add(TWorker.Create(Self));
          InterLockedIncrement(fUnblockedThreads);
        end;
      end;
    finally
      fThreadMgmtSection.Leave;
    end;
  end;
  { Stops the pool: refuses new work, asks every worker to terminate, cancels
    all still-queued items (waking their waiters) and waits for the workers
    to exit before dropping them from fThreads. }
  procedure TutlThreadPool.Shutdown;
  var
    idx: integer;
    pending: TQueueItem;
  begin
    // from here on Queue and FetchWork return nil
    fTerminating:= true;
    fThreadMgmtSection.Enter;
    try
      // flag every worker as terminated
      for idx:= 0 to fThreads.Count - 1 do
        fThreads[idx].Terminate;
      // some threads may be waiting in ThreadWaitForEvent; wake one, it wakes the next
      fNewItemEvent.SetEvent;
      // cancel everything still queued and notify the waitables
      fQueueSection.Enter;
      try
        for idx:= 0 to fQueue.Count - 1 do begin
          pending:= fQueue[idx];
          pending.Cancel;
          pending._Release; // drop the reference the queue held
        end;
        fQueue.Clear;
      finally
        fQueueSection.Leave;
      end;
      // every event is set now, wait for the workers to finish
      for idx:= 0 to fThreads.Count - 1 do
        fThreads[idx].WaitFor;
      fThreads.Clear;
    finally
      fThreadMgmtSection.Leave;
    end;
  end;
  { Schedules Task and returns a handle to wait on.
    Returns nil once the pool is shutting down. If no worker is currently
    unblocked, the task is executed synchronously in the calling thread. }
  function TutlThreadPool.Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable;
  var
    entry: TQueueItem;
  begin
    Result:= nil;
    if fTerminating then
      Exit;
    entry:= TQueueItem.Create(Task);
    Result:= entry; // the caller's handle holds one reference
    // atomic read of the counter: is any worker free to pick this up?
    if InterlockedCompareExchange(fUnblockedThreads, 0, 0) <> 0 then begin
      // normal operation: hand the item to the workers
      fQueueSection.Enter;
      try
        entry._AddRef; // reference owned by the queue, released by the worker or by Shutdown
        fQueue.Add(entry);
        fNewItemEvent.SetEvent;
      finally
        fQueueSection.Leave;
      end;
    end else
      // everyone is waiting for a task: run it right here to prevent a deadlock;
      // Result holds the only reference, so the caller frees the item
      entry.Execute;
  end;
  { Removes and returns the first queued item, or nil when the queue is
    empty or the pool is terminating. The queue's reference to the item
    passes to the caller. }
  function TutlThreadPool.FetchWork: TQueueItem;
  begin
    Result:= nil;
    if fTerminating then
      Exit;
    fQueueSection.Enter;
    try
      if fQueue.Count = 0 then
        Exit; // finally still releases the lock
      Result:= fQueue[0];
      fQueue.Delete(0);
    finally
      fQueueSection.Leave;
    end;
  end;
  { Blocks the calling worker until fNewItemEvent is signalled. During
    shutdown the event is signalled again so that the next waiting worker
    wakes up as well. }
  procedure TutlThreadPool.ThreadWaitForEvent;
  begin
    fNewItemEvent.WaitFor(INFINITE);
    if not fTerminating then
      Exit;
    // this worker will soon leave Execute; pass the wake-up on to the others
    fNewItemEvent.SetEvent;
  end;
  { Convenience overload: wraps a parameterless nested function in a
    runnable and queues it. Returns nil when the pool is shutting down. }
  function TutlThreadPool.Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable;
  var
    runnable: IutlThreadPoolRunnable;
  begin
    // Hold the wrapper in an interface variable of our own: a const interface
    // parameter takes no reference, so if Queue bails out early (pool
    // terminating) nothing else would ever release the object.
    runnable:= TRunnableFuncNested.Create(Task);
    Result:= Queue(runnable);
  end;
  { Convenience overload: wraps a nested function and its PtrUInt argument
    in a runnable and queues it. Returns nil when the pool is shutting down. }
  function TutlThreadPool.Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable;
  var
    runnable: IutlThreadPoolRunnable;
  begin
    // Hold the wrapper in an interface variable of our own: a const interface
    // parameter takes no reference, so if Queue bails out early (pool
    // terminating) nothing else would ever release the object.
    runnable:= TRunnableFuncParamNested.Create(Task, Param);
    Result:= Queue(runnable);
  end;
  226. { TutlThreadPool.TWorker }
  { Creates a worker bound to Owner; the thread is created non-suspended. }
  constructor TutlThreadPool.TWorker.Create(Owner: TutlThreadPool);
  begin
    // Set the owner before the thread is created: with CreateSuspended = false
    // Execute may begin before control returns here (depends on RTL/platform),
    // and it dereferences fOwner immediately.
    fOwner:= Owner;
    inherited Create(false);
  end;
  { Requests termination and blocks until the thread has left Execute
    before the object is torn down. }
  destructor TutlThreadPool.TWorker.Destroy;
  begin
    Terminate; // makes the Execute loop condition false
    WaitFor;   // returns once the thread has ended
    inherited Destroy;
  end;
  { Worker loop: runs queued items until the thread is terminated and
    sleeps on the pool's event while the queue is empty. }
  procedure TutlThreadPool.TWorker.Execute;
  var
    qi: TQueueItem;
  begin
    while not Terminated do begin
      qi:= fOwner.FetchWork;
      if Assigned(qi) then begin
        try
          try
            qi.Execute;
          finally
            // always drop the reference taken over from the queue, even when
            // the task raised, otherwise the item leaks
            qi._Release;
          end;
        except
          // A failing task must not kill the worker: the pool would lose a
          // thread while fUnblockedThreads still counts it, so later items
          // could sit in the queue without anyone left to run them.
        end;
      end else
        fOwner.ThreadWaitForEvent;
    end;
  end;
  { Bookkeeping for the owner's fUnblockedThreads counter:
    IsBlocked = true removes this worker from the count, false adds it back. }
  procedure TutlThreadPool.TWorker.BlockedByQueueWait(const IsBlocked: boolean);
  begin
    if not IsBlocked then
      InterLockedIncrement(fOwner.fUnblockedThreads)
    else
      InterLockedDecrement(fOwner.fUnblockedThreads);
  end;
  258. { TutlThreadPool.TQueueItem }
  { Wraps aTask in a queue item whose completion event starts unsignalled. }
  constructor TutlThreadPool.TQueueItem.Create(const aTask: IutlThreadPoolRunnable);
  begin
    inherited Create;
    // manual-reset event, initially not set: stays signalled once Execute/Cancel fire
    fDoneEvent:= TEvent.Create(nil, true, false, '');
    fTask:= aTask;
  end;
  { Frees the completion event. }
  destructor TutlThreadPool.TQueueItem.Destroy;
  begin
    FreeAndNil(fDoneEvent);
    inherited Destroy;
  end;
  { Blocks until the item has been executed or cancelled and returns the
    result of the event wait. When called from a pool worker, that worker is
    reported as blocked for the duration of the wait. }
  function TutlThreadPool.TQueueItem.WaitFor: TWaitResult;
  var
    current: TThread;
    worker: TutlThreadPool.TWorker;
  begin
    current:= TThread.CurrentThread;
    // callers outside the pool simply wait
    if not (current is TutlThreadPool.TWorker) then
      Exit(fDoneEvent.WaitFor(INFINITE));
    worker:= TutlThreadPool.TWorker(current);
    worker.BlockedByQueueWait(true);
    try
      Result:= fDoneEvent.WaitFor(INFINITE);
    finally
      worker.BlockedByQueueWait(false);
    end;
  end;
  { Runs the wrapped task, stores its return value and signals completion.
    If the task raises, the exception propagates to the caller, but the
    completion event is still set so that waiters are released; fResult then
    keeps its previous value. }
  procedure TutlThreadPool.TQueueItem.Execute;
  begin
    try
      fResult:= fTask.Execute;
    finally
      // without this a raising task would leave every WaitFor blocked forever
      fDoneEvent.SetEvent;
    end;
  end;
  { Returns the stored result value; it does not wait for completion. }
  function TutlThreadPool.TQueueItem.TaskResult: PtrUInt;
  begin
    Exit(fResult);
  end;
  { Marks the item as finished without running it: the result becomes 0
    and all waiters are released. }
  procedure TutlThreadPool.TQueueItem.Cancel;
  begin
    fResult:= 0;          // store the result before signalling
    fDoneEvent.SetEvent;
  end;
  301. { TutlThreadPool.TRunnableFuncNested }
  { Remembers the nested function to call from Execute. }
  constructor TutlThreadPool.TRunnableFuncNested.Create(Proc: TAsyncFuncNested);
  begin
    inherited Create;
    fProc:= Proc;
  end;
  { Calls the stored nested function and returns its value. }
  function TutlThreadPool.TRunnableFuncNested.Execute: PtrUInt;
  begin
    Exit(fProc());
  end;
  311. { TutlThreadPool.TRunnableFuncParamNested }
  { Remembers the nested function and the argument to pass to it from Execute. }
  constructor TutlThreadPool.TRunnableFuncParamNested.Create(
    Proc: TAsyncFuncParamNested; Param: PtrUInt);
  begin
    inherited Create;
    fParam:= Param;
    fProc:= Proc;
  end;
  { Calls the stored nested function with the stored argument and returns its value. }
  function TutlThreadPool.TRunnableFuncParamNested.Execute: PtrUInt;
  begin
    Exit(fProc(fParam));
  end;
  323. end.