You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

440 regels
11 KiB

  1. unit uutlThreads;
  2. { Package: Utils
  3. Prefix: utl - UTiLs
  4. Beschreibung: diese Unit implementiert Hilfsklassen für multithreaded Anwendungen }
  5. {$mode objfpc}{$H+}
  6. {$modeswitch nestedprocvars}
  7. interface
  8. uses
  9. Classes, SysUtils, syncobjs, uutlGenerics;
  10. type
  // TEvent descendant whose constructor always requests a non-manual-reset
  // (auto-reset) event and an empty name.
  TAutoResetEvent = class(TEvent)
  public
    // aInitial: initial signalled state forwarded to TEvent.Create (defaults to false).
    constructor Create(aInitial: boolean = false);
  end;

  // utl-prefixed alias, kept so the name stays in the LCL naming scheme used for TSimpleEvent.
  TutlAutoResetEvent = TAutoResetEvent;
  // Busy-waiting lock that the owning thread may re-enter.
  // Every Enter must be paired with a Leave on the same thread.
  TutlSpinLock = class
  private
    // 0 while the lock is free, otherwise the id of the owning thread.
    // NOTE(review): ThreadID is stored in a DWord; confirm this is wide enough on all targets.
    fLock: DWord;
    // Number of additional (nested) Enter calls made by the current owner.
    fLockReused: integer;
  public
    constructor Create;
    // Acquires the lock before the instance is destroyed.
    destructor Destroy; override;
    // Spins until the lock is owned by the calling thread.
    procedure Enter;
    // Undoes one Enter of the calling thread; does nothing if the caller is not the owner.
    procedure Leave;
  end;
  // Unit of work accepted by TutlThreadPool.Queue.
  IutlThreadPoolRunnable = interface
    ['{DFEC1832-30DA-4D12-A321-7762057D2D17}']
    // Performs the work; the returned value is what the pool later reports as the task result.
    function Execute: PtrUInt;
  end;
  // Handle returned by TutlThreadPool.Queue for a submitted task.
  IutlThreadPoolWaitable = interface
    ['{8FEB8A6B-B659-4A48-929F-650CE7455BC7}']
    // Blocks until the task has been executed or cancelled.
    function WaitFor: TWaitResult;
    // Value produced by the task; 0 if the task was cancelled.
    function TaskResult: PtrUInt;
  end;
  // Nested-function types that can be handed to TutlThreadPool.Queue.
  // Parameterless variant:
  TAsyncFuncNested = function: PtrUInt is nested;
  // Variant receiving one caller-supplied PtrUInt argument:
  TAsyncFuncParamNested = function(Param: PtrUInt): PtrUInt is nested;
  // Pool of worker threads that pull queued tasks and execute them.
  // Tasks are submitted through the Queue overloads, which return a waitable handle.
  TutlThreadPool = class
  private type
    // One submitted task plus the event that signals its completion or cancellation.
    TQueueItem = class(TInterfacedObject, IutlThreadPoolWaitable)
    private
      fTask: IutlThreadPoolRunnable;  // the work to perform
      fDoneEvent: TEvent;             // manual-reset; set by Execute and Cancel
      fResult: PtrUInt;               // value returned by the task, 0 after Cancel
    public
      constructor Create(const aTask: IutlThreadPoolRunnable);
      destructor Destroy; override;
      // Runs the task, stores its result and sets the done event.
      procedure Execute;
      // Sets the result to 0 and sets the done event without running the task.
      procedure Cancel;
      function WaitFor: TWaitResult;
      function TaskResult: PtrUInt;
    end;

    // Adapter that exposes a parameterless nested function as a runnable.
    TRunnableFuncNested = class(TInterfacedObject, IutlThreadPoolRunnable)
    private
      fProc: TAsyncFuncNested;
    public
      constructor Create(Proc: TAsyncFuncNested);
      function Execute: PtrUInt;
    end;

    // Adapter that exposes a one-argument nested function plus its argument as a runnable.
    TRunnableFuncParamNested = class(TInterfacedObject, IutlThreadPoolRunnable)
    private
      fProc: TAsyncFuncParamNested;
      fParam: PtrUInt;
    public
      constructor Create(Proc: TAsyncFuncParamNested; Param: PtrUInt);
      function Execute: PtrUInt;
    end;

    // Worker thread: loops fetching items from the owning pool until terminated.
    TWorker = class(TThread)
    private
      fOwner: TutlThreadPool;
    public
      constructor Create(Owner: TutlThreadPool);
      destructor Destroy; override;
      procedure Execute; override;
      // Adjusts the owner's count of unblocked workers while this worker waits on a task.
      procedure BlockedByQueueWait(const IsBlocked: boolean);
    end;

    TWorkerThreadList = specialize TutlList<TThread>;
    TWorkItemList = specialize TutlList<TQueueItem>;
  private
    fTerminating: Boolean;                 // set by Shutdown; no new tasks are accepted afterwards
    fThreads: TWorkerThreadList;           // the worker threads
    fUnblockedThreads: LongInt;            // workers not currently waiting on another task
    fThreadMgmtSection: TCriticalSection;  // guards changes to fThreads
    fQueue: TWorkItemList;                 // pending items, taken from index 0
    fQueueSection: TutlSpinLock;           // guards fQueue
    fNewItemEvent: TEvent;                 // auto-reset; signalled when an item is queued
    procedure SetMaxThreads(AValue: integer);
    procedure Shutdown;
    function GetMaxThreads: integer;
  protected
    // Removes and returns the oldest pending item, or nil if none (or if terminating).
    function FetchWork: TQueueItem;
    // Blocks the calling worker until fNewItemEvent is signalled.
    procedure ThreadWaitForEvent;
  public
    // ThreadCount = 0 selects TThread.ProcessorCount + 2 workers.
    constructor Create(const ThreadCount: integer = 0);
    destructor Destroy; override;
    // TODO: attention: if tasks queue more tasks and then wait for them, recursing more than MaxThreads levels is a deadlock ATM. This will be fixed later
    property MaxThreads: integer read GetMaxThreads write SetMaxThreads;
    // All three overloads carry the overload directive so the declarations are consistent.
    function Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable; overload;
    function Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable; overload;
    function Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable; overload;
  end;
  102. implementation
  103. { TAutoResetEvent }
  // Creates an unnamed event with default attributes and manual reset disabled.
  // aInitial is passed on as the initial state.
  constructor TAutoResetEvent.Create(aInitial: boolean);
  const
    MANUAL_RESET = false;
  begin
    inherited Create(nil, MANUAL_RESET, aInitial, '');
  end;
  108. { TutlSpinLock }
  // Starts out unlocked: no owner and no nested acquisitions.
  constructor TutlSpinLock.Create;
  begin
    inherited Create;
    fLockReused := 0;
    fLock := 0;
  end;
  // Takes the lock one last time before the instance goes away, so destruction
  // waits for any other thread that still holds it.
  destructor TutlSpinLock.Destroy;
  begin
    Self.Enter;
    inherited Destroy;
  end;
  // Acquires the lock for the calling thread, spinning while another thread owns it.
  // A thread that already owns the lock only bumps the nesting counter.
  procedure TutlSpinLock.Enter;
  var
    me: dword;
  begin
    // NOTE(review): ThreadID is narrowed to dword here; confirm that is acceptable on all targets.
    me := ThreadID;
    // Compare-exchanging the value with itself is an atomic read of the current owner.
    if InterlockedCompareExchange(fLock, me, me) <> me then begin
      // Not ours: keep trying to swap 0 (free) for our id until the swap succeeds.
      repeat
      until InterlockedCompareExchange(fLock, me, 0) = 0;
    end else begin
      // Already ours, so no other thread can be changing the counter concurrently
      // (assuming Enter/Leave are bracketed correctly); a plain increment suffices.
      inc(fLockReused);
    end;
  end;
  // Releases one level of the lock held by the calling thread.
  // Calls from a thread that does not own the lock are ignored.
  procedure TutlSpinLock.Leave;
  var
    me: DWord;
  begin
    me := ThreadID;
    // Atomic "is the owner me?" check; bail out if somebody else (or nobody) owns the lock.
    if InterlockedCompareExchange(fLock, me, me) <> me then
      Exit;
    if fLockReused <> 0 then
      dec(fLockReused)               // leaving a nested acquisition
    else
      InterLockedExchange(fLock, 0); // outermost level: hand the lock back
  end;
  149. { TutlThreadPool }
  // Builds the pool's queue, locks and wake-up event, then starts the workers.
  // ThreadCount = 0 means "TThread.ProcessorCount + 2"; any other value is used as given.
  constructor TutlThreadPool.Create(const ThreadCount: integer);
  var
    wanted: integer;
  begin
    inherited Create;
    fTerminating := false;
    fQueue := TWorkItemList.Create(False);
    fQueueSection := TutlSpinLock.Create;
    fThreads := TWorkerThreadList.Create(true);
    fThreadMgmtSection := TCriticalSection.Create;
    // auto-reset event, created in the signalled state
    fNewItemEvent := TEvent.Create(nil, false, true, '');
    fUnblockedThreads := 0;

    if ThreadCount = 0 then
      wanted := TThread.ProcessorCount + 2
    else
      wanted := ThreadCount;
    // Assigning the property spawns the workers (see SetMaxThreads).
    MaxThreads := wanted;
  end;
  // Stops all workers and cancels pending work via Shutdown, then releases the
  // pool's helper objects.
  destructor TutlThreadPool.Destroy;
  begin
    Self.Shutdown;

    // Workers are gone at this point, so nothing uses these objects any more.
    FreeAndNil(fNewItemEvent);
    FreeAndNil(fThreads);
    FreeAndNil(fThreadMgmtSection);
    FreeAndNil(fQueue);
    FreeAndNil(fQueueSection);

    inherited Destroy;
  end;
  // Getter for MaxThreads: the number of worker threads currently in the list.
  function TutlThreadPool.GetMaxThreads: integer;
  begin
    Exit(fThreads.Count);
  end;
  // Setter for MaxThreads: grows or shrinks the worker list to AValue entries.
  // AValue: requested number of workers; negative values are treated as 0.
  procedure TutlThreadPool.SetMaxThreads(AValue: integer);
  var
    idx: integer;
  begin
    // A negative request would drive the shrink loop below into negative list
    // indexes and decrement fUnblockedThreads once too often, so clamp it.
    if AValue < 0 then
      AValue := 0;

    fThreadMgmtSection.Enter;
    try
      if MaxThreads = AValue then
        Exit;
      if AValue < MaxThreads then begin
        // Shrink: drop workers from the end of the list.
        for idx := MaxThreads - 1 downto AValue do begin
          InterLockedDecrement(fUnblockedThreads);
          // The list was created with Create(true) and presumably owns its items, so
          // Delete frees the worker, whose destructor terminates and waits for the thread.
          fThreads.Delete(idx);
        end;
      end else begin
        // Grow: start additional workers and count each one as unblocked.
        for idx := MaxThreads to AValue - 1 do begin
          fThreads.Add(TWorker.Create(Self));
          InterLockedIncrement(fUnblockedThreads);
        end;
      end;
    finally
      fThreadMgmtSection.Leave;
    end;
  end;
  // Stops the pool: refuses new tasks, asks every worker to terminate, cancels
  // everything still queued and waits for the workers to finish.
  procedure TutlThreadPool.Shutdown;
  var
    idx: integer;
    pending: TQueueItem;
  begin
    // From here on Queue and FetchWork hand out nothing.
    fTerminating := true;

    fThreadMgmtSection.Enter;
    try
      // Flag every worker for termination.
      for idx := 0 to fThreads.Count - 1 do
        fThreads[idx].Terminate;

      // Workers may be parked waiting for new work; wake one, which re-signals
      // the event for the next one (see ThreadWaitForEvent).
      fNewItemEvent.SetEvent;

      // Cancel whatever is still queued so that waiters are released, and drop
      // the reference the queue held on each item.
      fQueueSection.Enter;
      try
        for idx := 0 to fQueue.Count - 1 do begin
          pending := fQueue[idx];
          pending.Cancel;
          pending._Release;
        end;
        fQueue.Clear;
      finally
        fQueueSection.Leave;
      end;

      // All events are set now; wait for the workers to leave Execute.
      for idx := 0 to fThreads.Count - 1 do
        fThreads[idx].WaitFor;
      fThreads.Clear;
    finally
      fThreadMgmtSection.Leave;
    end;
  end;
  // Submits Task to the pool.
  // Returns a waitable handle for the task, or nil if the pool is shutting down.
  // If no worker is currently unblocked, the task is run synchronously in the
  // calling thread instead of being queued.
  function TutlThreadPool.Queue(const Task: IutlThreadPoolRunnable): IutlThreadPoolWaitable;
  var
    entry: TQueueItem;
  begin
    if fTerminating then
      Exit(nil);

    entry := TQueueItem.Create(Task);
    Result := entry;

    // Compare-exchange of 0 with 0 is used as an atomic read of the counter.
    if InterlockedCompareExchange(fUnblockedThreads, 0, 0) <> 0 then begin
      // Normal operation: at least one worker can pick the item up.
      fQueueSection.Enter;
      try
        entry._AddRef;   // reference owned by the queue; released by the worker or Shutdown
        fQueue.Add(entry);
        fNewItemEvent.SetEvent;
      finally
        fQueueSection.Leave;
      end;
    end else begin
      // Every worker is waiting on some task; run this one right here to avoid a deadlock.
      // Result holds the only reference, so the caller ends up freeing the item.
      entry.Execute;
    end;
  end;
  // Takes the first item off the queue.
  // Returns nil when the pool is terminating or the queue is empty.
  function TutlThreadPool.FetchWork: TQueueItem;
  begin
    Result := nil;
    if fTerminating then
      Exit;

    fQueueSection.Enter;
    try
      if fQueue.Count > 0 then begin
        Result := fQueue[0];
        fQueue.Delete(0);
      end;
    finally
      fQueueSection.Leave;
    end;
  end;
  // Parks the calling worker until the new-item event is signalled.
  procedure TutlThreadPool.ThreadWaitForEvent;
  begin
    fNewItemEvent.WaitFor(INFINITE);
    // During shutdown the woken worker is about to leave Execute; pass the
    // signal on so the next waiting worker wakes up as well.
    if fTerminating then begin
      fNewItemEvent.SetEvent;
    end;
  end;
  // Submits a parameterless nested function as a task.
  // Returns the waitable handle of the task, or nil if the pool is shutting down.
  function TutlThreadPool.Queue(const Task: TAsyncFuncNested): IutlThreadPoolWaitable;
  var
    runnable: IutlThreadPoolRunnable;
  begin
    // Hold the adapter in a local interface variable so a reference is owned for
    // the duration of the call. The callee takes the runnable as a const parameter,
    // so without this nobody may own a reference when Queue returns nil, and the
    // adapter would then never be released.
    runnable := TRunnableFuncNested.Create(Task);
    Result := Queue(runnable);
  end;
  // Submits a nested function together with the argument it should be called with.
  // Returns the waitable handle of the task, or nil if the pool is shutting down.
  function TutlThreadPool.Queue(const Task: TAsyncFuncParamNested; const Param: PtrUInt): IutlThreadPoolWaitable;
  var
    runnable: IutlThreadPoolRunnable;
  begin
    // Hold the adapter in a local interface variable so a reference is owned for
    // the duration of the call. The callee takes the runnable as a const parameter,
    // so without this nobody may own a reference when Queue returns nil, and the
    // adapter would then never be released.
    runnable := TRunnableFuncParamNested.Create(Task, Param);
    Result := Queue(runnable);
  end;
  288. { TutlThreadPool.TWorker }
  // Creates a worker bound to the given pool. The thread is created non-suspended.
  // NOTE(review): fOwner is assigned after the inherited constructor; this relies on the
  // thread not entering Execute before construction has completed - confirm for the target RTL.
  constructor TutlThreadPool.TWorker.Create(Owner: TutlThreadPool);
  const
    START_SUSPENDED = false;
  begin
    inherited Create(START_SUSPENDED);
    fOwner := Owner;
  end;
  // Stops the worker thread and waits for it before the object is destroyed.
  destructor TutlThreadPool.TWorker.Destroy;
  begin
    Terminate;
    // An idle worker is parked in the owner's fNewItemEvent.WaitFor(INFINITE), and
    // Terminate alone does not wake it. The event is auto-reset, so one SetEvent
    // wakes only one (arbitrary) waiter; keep signalling until this thread has left
    // Execute. Other workers woken along the way find no work and go back to waiting.
    if Assigned(fOwner) and Assigned(fOwner.fNewItemEvent) then
      while not Finished do begin
        fOwner.fNewItemEvent.SetEvent;
        Sleep(1); // yield instead of spinning while a running task completes
      end;
    WaitFor;
    inherited Destroy;
  end;
  // Thread body: repeatedly fetches an item from the owning pool and runs it,
  // or waits for the new-item event when the queue is empty, until terminated.
  procedure TutlThreadPool.TWorker.Execute;
  var
    item: TQueueItem;
  begin
    while not Terminated do begin
      item := fOwner.FetchWork;
      if not Assigned(item) then begin
        fOwner.ThreadWaitForEvent;
        Continue;
      end;
      try
        item.Execute;
      finally
        // Drop the reference the queue held, even if the task raised; otherwise
        // the item would never be freed.
        item._Release;
      end;
    end;
  end;
  // Tells the owning pool that this worker starts (IsBlocked = true) or stops
  // (IsBlocked = false) waiting on a task, by atomically adjusting the pool's
  // count of unblocked workers.
  procedure TutlThreadPool.TWorker.BlockedByQueueWait(const IsBlocked: boolean);
  begin
    if not IsBlocked then
      InterLockedIncrement(fOwner.fUnblockedThreads)
    else
      InterLockedDecrement(fOwner.fUnblockedThreads);
  end;
  320. { TutlThreadPool.TQueueItem }
  // Wraps aTask in a queue item with an unsignalled, manual-reset done event.
  constructor TutlThreadPool.TQueueItem.Create(const aTask: IutlThreadPoolRunnable);
  begin
    inherited Create;
    fTask := aTask;
    fDoneEvent := TEvent.Create(nil, true, false, '');
  end;
  // Frees the done event owned by this item.
  destructor TutlThreadPool.TQueueItem.Destroy;
  begin
    FreeAndNil(Self.fDoneEvent);
    inherited Destroy;
  end;
  // Blocks until this item has been executed or cancelled and returns the wait result.
  // When called from a pool worker thread, the worker is reported as blocked for
  // the duration of the wait.
  function TutlThreadPool.TQueueItem.WaitFor: TWaitResult;
  var
    current: TThread;
    worker: TutlThreadPool.TWorker;
  begin
    current := TThread.CurrentThread;

    // Callers that are not pool workers simply wait.
    if not (current is TutlThreadPool.TWorker) then
      Exit(fDoneEvent.WaitFor(INFINITE));

    worker := TutlThreadPool.TWorker(current);
    worker.BlockedByQueueWait(true);
    try
      Result := fDoneEvent.WaitFor(INFINITE);
    finally
      // Always count the worker as unblocked again, even if the wait raised.
      worker.BlockedByQueueWait(false);
    end;
  end;
  // Runs the wrapped task, stores its return value and signals completion.
  // An exception raised by the task propagates to the caller; fResult is then
  // left unchanged.
  procedure TutlThreadPool.TQueueItem.Execute;
  begin
    try
      fResult := fTask.Execute;
    finally
      // Signal in every case: if the task raises, threads blocked in WaitFor
      // would otherwise never be released.
      fDoneEvent.SetEvent;
    end;
  end;
  // Returns the stored result: the task's return value after Execute, 0 after Cancel.
  function TutlThreadPool.TQueueItem.TaskResult: PtrUInt;
  begin
    Exit(fResult);
  end;
  // Marks the item as finished without running its task.
  procedure TutlThreadPool.TQueueItem.Cancel;
  const
    CANCELLED_RESULT = 0;
  begin
    // Store the result first, then release anyone blocked in WaitFor.
    fResult := CANCELLED_RESULT;
    fDoneEvent.SetEvent;
  end;
  363. { TutlThreadPool.TRunnableFuncNested }
  // Remembers the nested function that Execute will call.
  constructor TutlThreadPool.TRunnableFuncNested.Create(Proc: TAsyncFuncNested);
  begin
    inherited Create;
    Self.fProc := Proc;
  end;
  // Calls the stored nested function and returns its result.
  function TutlThreadPool.TRunnableFuncNested.Execute: PtrUInt;
  begin
    Exit(fProc());
  end;
  373. { TutlThreadPool.TRunnableFuncParamNested }
  // Remembers the nested function and the argument Execute will pass to it.
  constructor TutlThreadPool.TRunnableFuncParamNested.Create(
    Proc: TAsyncFuncParamNested; Param: PtrUInt);
  begin
    inherited Create;
    fParam := Param;
    fProc := Proc;
  end;
  // Calls the stored nested function with the stored argument and returns its result.
  function TutlThreadPool.TRunnableFuncParamNested.Execute: PtrUInt;
  begin
    Exit(fProc(fParam));
  end;
  385. end.