如何从慰借中获得所有队列和主题

问题描述 投票:0回答:3

我想从安慰(队列和主题)发现所有目的地

我尝试使用MBeanServerConnection并在名称后查询(但我没有找到正确的方法来使用它)或JNDI查找目标dest =(目标)context.lookup(Dest_name),但我没有队列的名称/话题。我正在使用安慰 - jms库。

我正在寻找像这样的smth :(但是为了安慰,不是活跃的Mq)get all Queue from activeMQ

java jms solace
3个回答
0
投票

您需要在管理界面上使用SEMP。

示例命令:

curl -d '<rpc><show><queue><name>*</name></queue></show></rpc>' -u semp_username:semp_password http://your_management_ip:your_management_port/SEMP
curl -d '<rpc><show><topic-endpoint><name>*</name></topic-endpoint></show></rpc>' -u semp_username:semp_password http://your_management_ip:your_management_port/SEMP

请注意,我使用curl是为了简单,但任何应用程序都可以执行HTTP POST来执行这些命令。如果您使用的是Java,则可以参考Solace API示例中的SempHttpSetRequest示例。

有关SEMP的文档可以在here找到。


但是,这里更大的问题是为什么你需要发现所有目的地?

消息代理的一个功能是将发布者和使用者分离。

如果您需要知道是否将持久性消息发布到没有消费者的主题,则可以使用发布应用程序的客户端配置文件中的reject-msg-to-sender-no-subscription-match设置。这意味着,如果发布者尝试在没有匹配订阅者的主题上发布消息,则发布者将获得否定确认。

有关详细信息,请参阅https://docs.solace.com/Configuring-and-Managing/Configuring-Client-Profiles.htm上的“处理无匹配保证消息”。


0
投票

以下是一些可能有用的源代码。在正确配置设备的情况下,SEMP也可通过JMS主题“#SEMP /(router)/ SHOW”获得。

  /**
   * Return the SolTopicInfo for this topic (or all topics if 'topic' is null).
   * 
   * @param session
   * @param endpointName
   * @return
   */
  public static SolTopicInfo[] getTopicInfo(JCSMPSession session, String endpointName, String vpn,
      String sempVersion) {
    XMLMessageConsumer cons = null;
    XMLMessageProducer prod = null;
    Map<String, SolTopicInfo> tiMap = new HashMap<String, SolTopicInfo>();
    try {
      // Create a producer and a consumer, and connect to appliance.
      prod = session.getMessageProducer(new PubCallback());
      cons = session.getMessageConsumer(new SubCallback());
      cons.start();

      if (vpn == null) vpn = (String) session.getProperty(JCSMPProperties.VPN_NAME);
      if (sempVersion == null) sempVersion = getSempVersion(session);

      // Extract the router name.
      final String SEMP_SHOW_TE_TOPICS = "<rpc semp-version=\""
          + sempVersion
          + "\"><show><topic-endpoint><name>"
          + endpointName
          + "</name><vpn-name>"+ vpn + "</vpn-name></topic-endpoint></show></rpc>";

      RpcReply teTopics =  sendRequest(session, SEMP_SHOW_TE_TOPICS);

      for (TopicEndpoint2 te : teTopics.getRpc().getShow().getTopicEndpoint().getTopicEndpoints()
          .getTopicEndpointArray()) {
        SolTopicInfo ti = new SolTopicInfo();
        ti.setBindCount(te.getInfo().getBindCount());
        //qi.setDescription(qt.getInfo().getNetworkTopic());
        ti.setEndpoint(te.getName());
        ti.setMessageVPN(te.getInfo().getMessageVpn());
        ti.setTopic(te.getInfo().getDestination());
        ti.setDurable(te.getInfo().getDurable());
        ti.setInSelPres(te.getInfo().getIngressSelectorPresent());
        ti.setHwmMB(formatter.format(te.getInfo().getHighWaterMarkInMb()));
        ti.setSpoolUsageMB(formatter.format(te.getInfo().getCurrentSpoolUsageInMb()));
        ti.setMessagesSpooled(te.getInfo().getNumMessagesSpooled().longValue());
        String status = te.getInfo().getIngressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + te.getInfo().getEgressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + te.getInfo().getIngressSelectorPresent().substring(0, 1).toUpperCase();
        status += " " + te.getInfo().getType().substring(0, 1).toUpperCase();
        ti.setStatus(status);

        tiMap.put(ti.getEndpoint(), ti);
      }

    } catch (JCSMPException e) {

      throw new RuntimeException(e.getMessage(), e);
    } finally {
      if (cons != null)
        cons.close();
      if (prod != null)
        prod.close();
    }
    return tiMap.values().toArray(new SolTopicInfo[0]);
  }

  /**
   * Return the SolQueueInfo for this queue (or all queues if 'queue' is null).
   * 
   * @param session
   * @param queue
   * @param vpn (if null, use the session's vpn name)
   * @param sempVersion, if null use 'soltr/7_1_1'
   * @return
   */
  public static SolQueueInfo[] getQueueInfo(JCSMPSession session, String queue, String vpn,
      String sempVersion) {
    XMLMessageConsumer cons = null;
    XMLMessageProducer prod = null;
    Map<String, SolQueueInfo> qiMap = new HashMap<String, SolQueueInfo>();
    try {
      // Create a producer and a consumer, and connect to appliance.
      prod = session.getMessageProducer(new PubCallback());
      cons = session.getMessageConsumer(new SubCallback());
      cons.start();

      if (vpn == null) vpn = (String) session.getProperty(JCSMPProperties.VPN_NAME);
      if (sempVersion == null) sempVersion = getSempVersion(session);

      // Extract the router name.

      final String SEMP_SHOW_QUEUE_SUBS = "<rpc semp-version=\""
          + sempVersion
          + "\"><show><queue><name>"
          + queue
          + "</name><vpn-name>"+ vpn + "</vpn-name><subscriptions/><count/><num-elements>200</num-elements></queue></show></rpc>";

      RpcReply queueSubs = sendRequest(session, SEMP_SHOW_QUEUE_SUBS);

      for (QueueType qt : queueSubs.getRpc().getShow().getQueue().getQueues().getQueueArray()) {
        SolQueueInfo qi = new SolQueueInfo();
        qi.setBindCount(qt.getInfo().getBindCount());
        //qi.setDescription(qt.getInfo().getNetworkTopic());
        qi.setName(qt.getName());
        qi.setMessageVPN(qt.getInfo().getMessageVpn());
        qi.setDurable(qt.getInfo().getDurable());
        qi.setEgSelPres(qt.getInfo().getEgressSelectorPresent());
        qi.setHwmMB(formatter.format(qt.getInfo().getHighWaterMarkInMb()));
        qi.setMessagesSpooled(qt.getInfo().getNumMessagesSpooled().longValue());
        qi.setSpoolUsageMB(formatter.format(qt.getInfo().getCurrentSpoolUsageInMb()));
        String status = qt.getInfo().getIngressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getEgressConfigStatus().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getAccessType().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getEgressSelectorPresent().substring(0, 1).toUpperCase();
        status += " " + qt.getInfo().getType().substring(0, 1).toUpperCase();
        status += qt.getInfo().getDurable() ? " D" : " N";
        qi.setStatus(status);

        for (Subscription sub : qt.getSubscriptions().getSubscriptionArray()) {
          qi.addSubscription(sub.getTopic());
        }

        qiMap.put(qi.getName(), qi);
      }

    } catch (JCSMPException e) {

      throw new RuntimeException(e.getMessage(), e);
    } finally {
      if (cons != null)
        cons.close();
      if (prod != null)
        prod.close();
    }
    return qiMap.values().toArray(new SolQueueInfo[0]);
  }

  private static String getSempVersion(JCSMPSession session)
  {
    String retval = "soltr/7_1_1";
    try {
      String peerVersion = (String)session.getCapability(CapabilityType.PEER_SOFTWARE_VERSION);
      if (peerVersion != null)
      {
        retval = "soltr/";
        String[] version = peerVersion.split("\\.");
        retval += version[0];
        retval += "_" + version[1];
        if (!version[2].equals("0")) retval += "_" + version[2];
      }
    } catch (Throwable e) {
      System.err.println(e);
    }
    return retval;
  }

  private static RpcReply sendRequest(JCSMPSession session,
      final String requestStr)  {
    try {
      // Set up the requestor and request message.
      String routerName = (String) session
          .getCapability(CapabilityType.PEER_ROUTER_NAME);

      final String SEMP_TOPIC_STRING = String.format("#SEMP/%s/SHOW",
          routerName);
      final Topic SEMP_TOPIC = JCSMPFactory.onlyInstance().createTopic(
          SEMP_TOPIC_STRING);
      Requestor requestor = session.createRequestor();
      BytesXMLMessage requestMsg = JCSMPFactory.onlyInstance().createMessage(
          BytesXMLMessage.class);
      requestMsg.writeAttachment(requestStr.getBytes());

      BytesXMLMessage replyMsg = requestor
          .request(requestMsg, 5000, SEMP_TOPIC);

      String replyStr = new String();
      if (replyMsg.getAttachmentContentLength() > 0) {
        byte[] bytes = new byte[replyMsg.getAttachmentContentLength()];
        replyMsg.readAttachmentBytes(bytes);
        replyStr = new String(bytes, "US-ASCII");
      }

      RpcReplyDocument doc = RpcReplyDocument.Factory.parse(replyStr);

      RpcReply reply = doc.getRpcReply();

      if (reply.isSetPermissionError()) {
        throw new RuntimeException(
            "Permission Error: Make sure SEMP over message bus SHOW commands are enabled for this VPN");
      }

      if( reply.isSetParseError() ) {
        throw new RuntimeException( "SEMP Parse Error: " + reply.getParseError() );
      }

      if( reply.isSetLimitError() ) {
        throw new RuntimeException( "SEMP Limit Error: " + reply.getLimitError() );
      }

      if( reply.isSetExecuteResult() && reply.getExecuteResult().isSetReason() ) { // axelp: encountered this error on invalid 'queue' name
        throw new RuntimeException( "SEMP Execution Error: " + reply.getExecuteResult().getReason() );
      }

      return reply;
    } catch (JCSMPException e) {

      throw new RuntimeException(e.getMessage(), e);
    } catch (UnsupportedEncodingException e) {

      throw new RuntimeException(e.getMessage(), e);
    } catch (XmlException e) {

      throw new RuntimeException(e.getMessage(), e);
    }
  }

0
投票

您可以使用以下SEMPv2命令获取消息VPN特定队列和主题。

curl -s -X GET -u semp_user:semp_pass management_host:management_port/SEMP/v2/monitor/msgVpns/{vpn-name}/queues?select="queueName"

curl -s -X GET -u semp_user:semp_pass management_host:management_port/SEMP/v2/monitor/msgVpns/{vpn-name}/topicEndpoints?select="topicEndpointName"
© www.soinside.com 2019 - 2024. All rights reserved.