Delphi: Cool Little Fork/Join Threading lib

Posted by Bart van der Werf

unit Silk;
{
sppedup options:
reuse TSilkContext instances, creating the mutex is a bit expensive
}
interface
uses
{$IFDEF COMTECTHREADDEBUG}
  ComtecThreadDebugger,
{$ENDIF}
  Classes,
  SysUtils,
  ComtecCollection,
  ComtecArrayList,
  ComtecInterfaceList,
  ComtecThreadsafeList,
  OrtecException,
  OrtecThreadPool,
  SyncObjs,
  Windows;
procedure Enter;
procedure Leave;
procedure Spawn(Func: TFunction; Argument: TStorage; var Result: TStorage);
procedure Sync;
implementation
uses Math;
const
  TIMEOUT = INFINITE;
type
  TSilkContext = class
  private
    FSemaphore: TEvent;
    FCount: Integer;
    procedure Invoke(SilkCommand: TStorage; var Result: TStorage);
  public
    constructor Create;
    destructor Destroy; override;
    procedure Spawn(Func: TFunction; Argument: TStorage; var Result: TStorage);
    procedure Sync;
  end;
  TSetupTeardown = class
  public
    procedure SetupThread(Argument: TStorage; var Result: TStorage);
    procedure TeardownThread(Argument: TStorage; var Result: TStorage);
  end;
  TSilkCommand = record
    Func: TFunction;
    Argument: TStorage;
  end;
  PSilkCommand = ^TSilkCommand;
var
  ThreadPool: TOrtecThreadPool;
  SetupTeardown: TSetupTeardown;
threadvar
  SilkStack: TComtecList;  {used as stack}
{ TSilkContext }
procedure TSilkContext.Invoke(SilkCommand: TStorage; var Result: TStorage);
begin
  try
    PSilkCommand(SilkCommand).Func(PSilkCommand(SilkCommand).Argument, Result);
  finally
    if InterlockedDecrement(FCount) = 0 then
      FSemaphore.SetEvent;
    Dispose(PSilkCommand(SilkCommand));
  end;
end;
constructor TSilkContext.Create;
begin
  inherited;
  FSemaphore := TEvent.Create(nil, true, false, '');
  FSemaphore.SetEvent;
  FCount := 0;
end;
destructor TSilkContext.Destroy;
begin
  FSemaphore.Free;
  inherited;
end;
procedure TSilkContext.Spawn(Func: TFunction; Argument: TStorage; var Result: TStorage);
var
  Store: PSilkCommand;
begin
  New(Store);
  Store.Func := Func;
  Store.Argument := Argument;
  InterlockedIncrement(FCount);
  FSemaphore.ResetEvent;
  if not ThreadPool.InvokeIfAvailable(Invoke, TStorage(Store), Result) then
    Invoke(TStorage(Store), Result);
end;
procedure TSilkContext.Sync;
begin
  if FSemaphore.WaitFor(TIMEOUT)  wrSignaled then
    raise EIllegalState.Create;
end;
{ TSetupTeardown }
procedure TSetupTeardown.SetupThread(Argument: TStorage; var Result: TStorage);
begin
  SilkStack := TComtecArrayList.Create(TComtecObjectComparator.Instance);
end;
procedure TSetupTeardown.TeardownThread(Argument: TStorage; var Result: TStorage);
begin
  SilkStack.Free;
end;
{ Unit functions }
procedure Enter;
begin
  {reserve some stack, but lazyinit when needed}
  SilkStack.AddLast(nil);
end;
procedure Leave;
begin
  SilkStack.RemoveLast.Free;
end;
procedure Spawn(Func: TFunction; Argument: TStorage; var Result: TStorage);
var
  Context: TSilkContext;
begin
  if ThreadPool.HasThreadAvailable then
  begin
    Context := SilkStack.Last as TSilkContext;
    if not Assigned(Context) then
    begin
      Context := TSilkContext.Create;
      SilkStack.RemoveLast;
      SilkStack.AddLast(Context);
    end;
    Context.Spawn(Func, Argument, Result);
  end
  else
    Func(Argument, Result);
end;
procedure Sync;
var
  Context: TSilkContext;
begin
  Context := SilkStack.Last as TSilkContext;
  if Assigned(Context) then
    Context.Sync;
end;
initialization
  SetupTeardown := TSetupTeardown.Create;
  ThreadPool := TOrtecThreadPool.Create(0);
  ThreadPool.Setup(SetupTeardown.SetupThread, NONE, NONE);
  SetupTeardown.SetupThread(NONE, NONE);
finalization
  if Assigned(SetupTeardown) then
    SetupTeardown.TeardownThread(NONE, NONE);
  if Assigned(ThreadPool) then
    ThreadPool.Teardown(SetupTeardown.TeardownThread, NONE, NONE);
  FreeAndNil(ThreadPool);
  FreeAndNil(SetupTeardown);
end.

Related articles

       

Related Articles and Replies