如何终止等待返回的线程

问题描述 投票:0回答:1

我需要以不会导致错误或延迟的方式终止正在运行的线程。问题出在函数“lMsg := lMsgQueue.Get(FQueueGetTimeout);”中,它将等待定义的时间(通常是5000毫秒)。因此,如果我需要调用外部终止,我的应用程序将陷入等待终止状态。

在进程中终止它的最佳方法是什么?

{ TConsumerThread }

constructor TConsumerThread.Create;
begin
  FreeOnTerminate := True;
  InitializeVars;
  inherited Create(True);
end;

procedure TConsumerThread.Execute;
var
  lMsgQueue: TAMQPMessageQueue;
  lMsg: TAMQPMessage;
  lStartTime: TDateTime;
begin
  lMsgQueue := TAMQPMessageQueue.Create;
  FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
  try
    try
      FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
      lStartTime := Now;
      repeat
        try

          try
            if not(FConnectionAMQP.IsOpen) then
            BEGIN
              FConnectionAMQP.Connect;
              FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
              FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer';
            END;
          except
            on E: Exception do
              Break;
          end;

          lMsg := lMsgQueue.Get(FQueueGetTimeout);
          if (lMsg = nil) and not(Terminated) then
          begin
            if Assigned(FChannelAMQPThread) then
            begin
              FConnectionAMQP.CloseChannel(FChannelAMQPThread);
              FChannelAMQPThread := nil;
            end;

            FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
            FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
          end;

          if not(Terminated) then
          begin
            try
              if not(FConnectionAMQP.IsOpen) then
                FConnectionAMQP.Connect;
            except
              on E: Exception do
                Break;
            end;
          end;

          if not(Terminated) then
          begin
            if ValidateFilter(lMsg) then
            begin
              FCorrelationID := lMsg.Header.PropertyList.CorrelationID.Value;

              FReceivedMessage := lMsg.Body.asString[TEncoding.ASCII];
              lMsg.Ack;
              lMsg.Free;
              Terminate;
            end
            else
            begin
              lMsg.Reject;
              lMsg.Free;

              if not(FTimeout = INFINITE) then
              begin
                if (MilliSecondsBetween(Now, lStartTime) >= (Int64(FTimeout))) then
                begin
                  FReceivedMessage := '';
                  Terminate;
                end;
              end;
            end;
          end
          else
          begin
            Terminate;
          end;
        except
          on E: Exception do
          begin
            if Assigned(lMsg) then
            begin
              lMsg.Free;
              lMsg := nil;
            end;
          end;
        end;
      until (Terminated);
    except
      on E: Exception do
      begin
        FReceivedMessage := '';

        if not(Terminated) then
          Terminate;
      end;
    end;
  finally
    lMsgQueue.Free;
  end;
end;

procedure TConsumerThread.TerminatedSet;
begin
  inherited;
  if Assigned(FChannelAMQPThread) then
  begin
    try
      if FConnectionAMQP.IsOpen then
        FConnectionAMQP.CloseChannel(FChannelAMQPThread);
    except
      on E: Exception do
    end;

    FChannelAMQPThread := nil;
  end;
end;

function TConsumerThread.ValidateFilter(pMsg: TAMQPMessage): Boolean;
begin
  Result := False;

  case FMsgFilter of
    fmsgNone:
      Result := True;
    fmsgMessageID:
      Result := (pMsg.Header.PropertyList.MessageID.Value = FFilterValue);
    fmsgCorrelationID:
      Result := (pMsg.Header.PropertyList.CorrelationID.Value = FFilterValue);
  end;
end;

procedure TConsumerThread.InitializeVars;
begin
  FConnectionAMQP := nil;
  FChannelAMQPThread := nil;
  FQueue := '';
  FTimeout := INFINITE;
  FQueueGetTimeout := 5000;
  FQueuePrefetchSize := 0;
  FQueuePrefetchCount := 10;
  FMsgFilter := fmsgNone;
  FFilterValue := '';
  FReceivedMessage := '';
end;

为了检查返回的消息或异常,我在 OnTerminate 中使用了一个函数。

此外,在这种情况下将其设置为“FreeOnTerminate”是最好的选择吗?

我启动它时暂停了,因为我在启动之前设置了属性(在InitializeVars中初始化)。

这段代码来自“Get”函数,不是我写的,但如果需要的话我可以编辑它。

{$I AMQP.Options.inc}
unit AMQP.Classes;

interface

Uses
  SysUtils, Classes, SyncObjs, Generics.Collections,
  AMQP.Frame, AMQP.Message, AMQP.Method, AMQP.Types
  {$IfDef fpc}
  , AMQP.SyncObjs
  {$EndIf}
  ;

Type
  AMQPException = Class(Exception);
  AMQPTimeout  = class(AMQPException);

  TAMQPServerProperties = Class
  Strict Private
    FCapabilities : TStringList;
    FMechanisms   : TStringList;
    FLocales      : TStringList;
    FClusterName  : String;
    FCopyright    : String;
    FInformation  : String;
    FPlatform     : String;
    FProduct      : String;
    FVersion      : String;
    FKnownHosts   : String;
    FVersionMajor : Integer;
    FVersionMinor : Integer;
    FChannelMax   : Integer;
    FFrameMax     : Integer;
    FHeartbeat    : Integer;
  Public
    Property Capabilities         : TStringList read FCapabilities;
    Property Mechanisms           : TStringList read FMechanisms;
    Property Locales              : TStringList read FLocales;
    Property ClusterName          : String      read FClusterName;
    Property Copyright            : String      read FCopyright;
    Property Information          : String      read FInformation;
    Property &Platform            : String      read FPlatform;
    Property Product              : String      read FProduct;
    Property Version              : String      read FVersion;
    Property KnownHosts           : String      read FKnownHosts;
    Property ProtocolVersionMajor : Integer     read FVersionMajor;
    Property ProtocolVersionMinor : Integer     read FVersionMinor;
    Property ChannelMax           : Integer     read FChannelMax;
    Property FrameMax             : Integer     read FFrameMax;
    Property Heartbeat            : Integer     read FHeartbeat;

    Procedure ReadConnectionStart( AConnectionStart: TAMQPMethod );
    Procedure ReadConnectionTune( AConnectionTune: TAMQPMethod );
    Procedure ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );

    Constructor Create;
    Destructor Destroy; Override;
  End;

  TBlockingQueue<T> = Class
  Strict Protected
    FGuard     : {$IFDEF FPC}TRTLCriticalSection{$ELSE}TCriticalSection{$ENDIF};
    FCondition : TConditionVariableCS;
    FQueue     : TQueue<T>;
  Public
    Function Count: Integer; Virtual;
    Function Get(ATimeOut: LongWord): T; Virtual;
    Procedure Put( Item: T ); Virtual;

    Constructor Create; Virtual;
    Destructor Destroy; Override;
  End;

  TAMQPQueue = TBlockingQueue<TAMQPFrame>;

  TAMQPMessageQueue = TBlockingQueue<TAMQPMessage>;

implementation

{ TAMQPServerProperties }


constructor TAMQPServerProperties.Create;
begin
  FCapabilities := TStringList.Create;
  FMechanisms   := TStringList.Create;
  FLocales      := TStringList.Create;
  FMechanisms.StrictDelimiter := True;
  FMechanisms.Delimiter       := ' ';
  FLocales.StrictDelimiter    := True;
  FLocales.Delimiter          := ' ';
  FClusterName  := '';
  FCopyright    := '';
  FInformation  := '';
  FPlatform     := '';
  FProduct      := '';
  FVersion      := '';
  FKnownHosts   := '';
  FVersionMajor := 0;
  FVersionMinor := 0;
  FChannelMax   := 0;
  FFrameMax     := 0;
  FHeartbeat    := 0;
end;

Procedure TAMQPServerProperties.ReadConnectionStart( AConnectionStart: TAMQPMethod );
var
  ServerProperties: TFieldTable;
  ServerCapabilities: TFieldTable;
  Pair: TFieldValuePair;
begin
  FMechanisms.DelimitedText := AConnectionStart.Field['mechanisms'].AsLongString.Value;
  FLocales.DelimitedText    := AConnectionStart.Field['locales'].AsLongString.Value;
  ServerProperties          := AConnectionStart.Field['server-properties'].AsFieldTable;
  FVersionMajor             := AConnectionStart.Field['version-major'].AsShortShortUInt.Value;
  FVersionMinor             := AConnectionStart.Field['version-minor'].AsShortShortUInt.Value;
  FClusterName              := ServerProperties.Field['cluster_name'].AsShortString.Value;
  FCopyright                := ServerProperties.Field['copyright'].AsShortString.Value;
  FInformation              := ServerProperties.Field['information'].AsShortString.Value;
  FPlatform                 := ServerProperties.Field['platform'].AsShortString.Value;
  FProduct                  := ServerProperties.Field['product'].AsShortString.Value;
  FVersion                  := ServerProperties.Field['version'].AsShortString.Value;
  ServerCapabilities        := ServerProperties.Field['capabilities'].AsFieldTable;
  for Pair in ServerCapabilities do
    FCapabilities.Values[ Pair.Name.Value ] := Pair.Value.AsString('');
end;

Procedure TAMQPServerProperties.ReadConnectionTune( AConnectionTune: TAMQPMethod );
begin
  FChannelMax               := AConnectionTune.Field['channel-max'].AsShortUInt.Value;
  FFrameMax                 := AConnectionTune.Field['frame-max'].AsLongUInt.Value;
  FHeartbeat                := AConnectionTune.Field['heartbeat'].AsShortUInt.Value;
end;

Procedure TAMQPServerProperties.ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );
begin
  FKnownHosts               := AConnectionOpenOK.Field['known-hosts'].AsShortString.Value;
end;

destructor TAMQPServerProperties.Destroy;
begin
  FCapabilities.Free;
  FMechanisms.Free;
  FLocales.Free;
  inherited;
end;

{ TBlockingQueue<T> }

function TBlockingQueue<T>.Count: Integer;
begin
  {$IFDEF FPC}
  EnterCriticalSection(FGuard);
  {$ELSE}
  FGuard.Acquire;
  {$ENDIF}
  try
    Result := FQueue.Count;
  finally
    {$IFDEF FPC}
     LeaveCriticalSection(FGuard);
    {$ELSE}
    FGuard.Release;
    {$ENDIF}
  end;
end;

constructor TBlockingQueue<T>.Create;
begin
  inherited;
  {$IFDEF FPC}
  InitCriticalSection(FGuard);
  {$ELSE}
  FGuard     := TCriticalSection.Create;
  {$ENDIF}
  FCondition := TConditionVariableCS.Create;
  FQueue     := TQueue<T>.Create;
end;

destructor TBlockingQueue<T>.Destroy;
begin
  FQueue.Free;
  FQueue := nil;
  FCondition.Free;
  FCondition := nil;
  {$IFDEF FPC}
  DoneCriticalSection(FGuard);
  {$ELSE}
  FGuard.Free;
  FGuard := nil;
  {$ENDIF}
  inherited;
end;

function TBlockingQueue<T>.Get(ATimeOut: LongWord): T;
begin
  {$IFDEF FPC}
  EnterCriticalSection(FGuard);
  {$ELSE}
  FGuard.Acquire;
  {$ENDIF}
  try
    while FQueue.Count = 0 do
    begin
     {$IFDEF FPC}
      if FCondition.WaitForRTL(FGuard, ATimeOut) = wrTimeout then
     {$Else}
      if FCondition.WaitFor(FGuard, ATimeOut) = wrTimeout then
     {$EndIf}
       raise AMQPTimeout.Create('Timeout!');
    end;
    Result := FQueue.Dequeue
  finally
  {$IFDEF FPC}
   LeaveCriticalSection(FGuard);
  {$ELSE}
  FGuard.Release;
  {$ENDIF}
  end;
end;

procedure TBlockingQueue<T>.Put(Item: T);
begin
  {$IFDEF FPC}
  EnterCriticalSection(FGuard);
  {$ELSE}
  FGuard.Acquire;
  {$ENDIF}
  try
    FQueue.Enqueue( Item );
    FCondition.ReleaseAll;
  finally
    {$IFDEF FPC}
     LeaveCriticalSection(FGuard);
    {$ELSE}
    FGuard.Release;
    {$ENDIF}
  end;
end;

end.
multithreading delphi delphi-10.3-rio
1个回答
0
投票

TBlockingQueue<T>.Get()
正在等待
TConditionVariableCS
在指定的超时时间内收到信号。 为了更快地退出,即使队列中没有任何内容,您也必须向 ConditionVariable 发出信号,因此您需要在出队之前在某个地方有一个单独的标志,以便
Get()
可以查看。 然后您只需设置该标志并发出 ConditionVariable 信号即可。

因此,使用 ConditionVariable 可能不是最佳选择。 不幸的是,Delphi 没有为此目的实现任何类型的取消令牌,因此我可能只会使用 2 个

TEvent
来代替,然后对它们执行一次等待。 当队列准备好时向一个发出信号,当需要取消时向另一个发出信号。 等待操作会告诉您哪一个满足了等待,以便您可以采取相应的行动。

© www.soinside.com 2019 - 2024. All rights reserved.