`
absolute
  • 浏览: 188238 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

一段同步接收和发送MQ消息的代码

阅读更多
java 代码
  1. package com.sdb.payment.core.mq;   
  2.   
  3. import org.apache.log4j.Logger;   
  4.   
  5. import com.ibm.mq.MQC;   
  6. import com.ibm.mq.MQEnvironment;   
  7. import com.ibm.mq.MQException;   
  8. import com.ibm.mq.MQGetMessageOptions;   
  9. import com.ibm.mq.MQMessage;   
  10. import com.ibm.mq.MQPutMessageOptions;   
  11. import com.ibm.mq.MQQueue;   
  12. import com.ibm.mq.MQQueueManager;   
  13.   
  14. public class MessageQueueService {   
  15.     private static Logger logger = Logger.getLogger(MessageQueueService.class);   
  16.   
  17.     private String hostname = "192.168.0.117";   
  18.   
  19.     private String channel = "CHL.SVRCONN";   
  20.   
  21.     private String queueManager = "QM_SERVER";   
  22.   
  23.     private String sendQueue = "OMP.QRMT";   
  24.   
  25.     private String recvQueue = "OMP.QLCA";   
  26.   
  27.     private int port = 24100;   
  28.   
  29.     private int ccsid = 1381;   
  30.   
  31.     private int failedCount = 5;   
  32.   
  33.     private int intervalTime = 1000;   
  34.   
  35.     public MessageQueueService() {   
  36.         MQEnvironment.hostname = hostname;   
  37.         MQEnvironment.channel = channel;   
  38.         MQEnvironment.CCSID = ccsid;   
  39.         MQEnvironment.port = port;   
  40.     }   
  41.   
  42.     public String send(String sendMsg) throws Exception {   
  43.         MQQueueManager qManager = new MQQueueManager(queueManager);   
  44.            
  45.         // send message   
  46.         int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;   
  47.         MQQueue sQueue = qManager.accessQueue(sendQueue, openOptions);   
  48.         MQPutMessageOptions pmo = new MQPutMessageOptions();   
  49.         MQMessage send = new MQMessage();   
  50.         send.write(sendMsg.getBytes());   
  51.         System.out.println("send message : " + sendMsg);   
  52.         sQueue.put(send, pmo);   
  53.         sQueue.close();   
  54.         System.out.println("send message Id");   
  55.         for (int i = 0; i<send.messageId.length; i++) {   
  56.             System.out.print(send.messageId[i]);   
  57.         }   
  58.         System.out.println();   
  59.         System.out.println("send message Id");   
  60.         // fetch message   
  61.         openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING   
  62.                 + MQC.MQOO_INPUT_SHARED;   
  63.         MQQueue rQueue = qManager.accessQueue(recvQueue, openOptions);   
  64.         MQGetMessageOptions getOptions = new MQGetMessageOptions();   
  65.         getOptions.options = MQC.MQGMO_WAIT;   
  66.         getOptions.waitInterval = intervalTime;   
  67.            
  68.         MQMessage recvMsg = new MQMessage();   
  69.         recvMsg.messageId = send.messageId;//这里是关键,要保持接收的msgid跟发送的msgid值是一样的,   
  70.                                            //这样就会根据msgId来取队列的消息了,而不会取到别的消息   
  71.         send.clearMessage();   
  72.            
  73.         boolean received = false;   
  74.         int fetchCount = 0;   
  75.         while (!received) {   
  76.             try {   
  77.                 fetchCount++;   
  78.                 rQueue.get(recvMsg, getOptions);   
  79.                 //logger.debug("the " + fetchCount + " time fetch message!");   
  80.                 System.out.println("fetch message !!!");   
  81.                 received = true;   
  82.             } catch (MQException me) {   
  83.                 if (me.reasonCode == MQException.MQRC_NO_MSG_AVAILABLE) {   
  84.                     if (fetchCount > failedCount) {   
  85.                         recvMsg.clearMessage();   
  86.                         rQueue.close();   
  87.                         qManager.disconnect();   
  88.                         //logger.error("can't fetch message for " + me.getMessage());   
  89.                         return null;   
  90.                     }   
  91.                 }   
  92.             } catch (Exception ex) {   
  93.                 recvMsg.clearMessage();   
  94.                 rQueue.close();   
  95.                 qManager.disconnect();   
  96.                 //logger.error("can't fetch message for " + ex.getMessage());   
  97.                 return null;   
  98.             }   
  99.         }   
  100.            
  101.         byte[] bMsg = new byte[recvMsg.getMessageLength()];   
  102.         recvMsg.readFully(bMsg);   
  103.         System.out.println("rec correlationId Id");   
  104.         for (int i = 0; i<recvMsg.correlationId.length; i++) {   
  105.             System.out.print(recvMsg.correlationId[i]);   
  106.         }   
  107.         System.out.println();   
  108.         System.out.println("rec correlationId Id");   
  109.         String recv = new String(bMsg);   
  110.         recvMsg.clearMessage();   
  111.         rQueue.close();   
  112.            
  113.         qManager.disconnect();   
  114.            
  115.         return recv;   
  116.     }   
  117.   
  118.     public void setChannel(String channel) {   
  119.         this.channel = channel;   
  120.     }   
  121.   
  122.     public void setHostname(String hostname) {   
  123.         this.hostname = hostname;   
  124.     }   
  125.   
  126.     public void setQueueManager(String queueManager) {   
  127.         this.queueManager = queueManager;   
  128.     }   
  129.   
  130.     public void setPort(int port) {   
  131.         this.port = port;   
  132.     }   
  133.   
  134.     public void setIntervalTime(int intervalTime) {   
  135.         this.intervalTime = intervalTime;   
  136.     }   
  137.   
  138.     public void setFailedCount(int failedCount) {   
  139.         this.failedCount = failedCount;   
  140.     }   
  141.   
  142.     public void setRecvQueue(String recvQueue) {   
  143.         this.recvQueue = recvQueue;   
  144.     }   
  145.   
  146.     public void setSendQueue(String sendQueue) {   
  147.         this.sendQueue = sendQueue;   
  148.     }   
  149. }  
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics