概述
例子中使用的消息类型是：MapMessage。代码比较简单，贴出来给大家看看：
接收消息:
/// <summary>
/// Receives up to <c>Program.MessageCount</c> map messages from the queue named by
/// <c>Program.NormalQueueDestination</c> and writes a throughput line to the debug
/// output every time <c>Program.StatisticsMessageCountSpan</c> messages have been received.
/// </summary>
/// <remarks>
/// Each receive attempt waits at most 10 seconds. An attempt that times out yields
/// <c>null</c>; it consumes one loop iteration but is not counted as a received message.
/// Exceptions raised while handling a single message are written to the debug output
/// and the loop continues with the next attempt.
/// </remarks>
private void Receive()
{
    var factory = new ConnectionFactory(Program.BrokerUri);
    var timeout = new TimeSpan(0, 0, 10);
    using (var connection = factory.CreateConnection())
    using (var session = connection.CreateSession())
    {
        var destination = SessionUtil.GetDestination(session, Program.NormalQueueDestination);
        using (var consumer = session.CreateConsumer(destination))
        {
            connection.Start();
            var stopwatch = new Stopwatch();
            // Number of messages received in the current statistics window.
            var received = 0;
            for (var i = 0; i < Program.MessageCount; i++)
            {
                if (received == 0)
                {
                    // Calling Start on a running stopwatch is a no-op, so repeated
                    // timeouts at the start of a window do not restart the timing.
                    stopwatch.Start();
                }
                try
                {
                    var message = (IMapMessage)consumer.Receive(timeout);
                    if (message == null)
                    {
                        // Timed out without a message: nothing to convert or count.
                        continue;
                    }
                    received++;
                    // The converted object is not used; the call is kept so that the
                    // measured throughput still includes the conversion cost.
                    Common.GetMessageObjByIMapMessage(message);
                    if (received == Program.StatisticsMessageCountSpan)
                    {
                        stopwatch.Stop();
                        var spendSeconds = stopwatch.Elapsed.TotalSeconds;
                        var speed = Program.StatisticsMessageCountSpan / spendSeconds;
                        Debug.WriteLine("Receive " + Program.StatisticsMessageCountSpan + " Messages Spend:" + spendSeconds + " Seconds. (" + speed.ToString("0.00") + "/s)");
                        stopwatch.Reset();
                        received = 0;
                    }
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex);
                }
            }
        }
    }
}
发送消息:
/// <summary>
/// Sends <c>Program.MessageCount</c> map messages to the queue named by
/// <c>Program.NormalQueueDestination</c>, writes a throughput line to the debug output
/// every time <c>Program.StatisticsMessageCountSpan</c> messages have been sent, and
/// shows a "Send Done!" message box when finished.
/// </summary>
/// <remarks>
/// A message whose creation or send throws is logged to the debug output and is not
/// counted in the statistics; the loop then continues with the next task id.
/// </remarks>
private void Send()
{
    var factory = new ConnectionFactory(Program.BrokerUri);
    using (var connection = factory.CreateConnection())
    using (var session = connection.CreateSession())
    {
        var destination = SessionUtil.GetDestination(session, Program.NormalQueueDestination);
        using (var producer = session.CreateProducer(destination))
        {
            connection.Start();
            // Base value for the generated task ids; incremented before each send.
            var mediaTaskId = 100000000000000;
            var stopwatch = new Stopwatch();
            // Number of messages successfully sent in the current statistics window.
            var sent = 0;
            for (var i = 1; i <= Program.MessageCount; i++)
            {
                if (sent == 0)
                {
                    // Calling Start on a running stopwatch is a no-op.
                    stopwatch.Start();
                }
                try
                {
                    mediaTaskId++;
                    var message = session.CreateMapMessage();
                    Common.SetMapMessage(message, Common.GetMessageObj(mediaTaskId.ToString()));
                    producer.Send(message);
                    sent++;
                    if (sent == Program.StatisticsMessageCountSpan)
                    {
                        stopwatch.Stop();
                        var spendSeconds = stopwatch.Elapsed.TotalSeconds;
                        var speed = Program.StatisticsMessageCountSpan / spendSeconds;
                        Debug.WriteLine("Send " + Program.StatisticsMessageCountSpan + " Messages Spend:" + spendSeconds + " Seconds. (" + speed.ToString("0.00") + "/s)");
                        stopwatch.Reset();
                        sent = 0;
                    }
                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex);
                }
            }
        }
    }
    MessageBox.Show(@"Send Done!");
}
简单监听封装类:
using System;
using System.Diagnostics;
using System.Threading;
using Apache.NMS;
namespace ActiveMQ.PerformanceTest
{
/// <summary>
/// Polls an <see cref="IMessageConsumer"/> in a loop and raises
/// <see cref="MessageReceived"/> for every message it returns.
/// </summary>
/// <remarks>
/// <see cref="Start"/> runs the polling loop on the calling thread and only returns
/// after <see cref="Stop"/> has been called (from another thread or from an event
/// handler). When a receive attempt times out, the loop sleeps for the configured
/// number of minutes before polling again; a stop request is only observed once
/// that sleep or the pending receive has finished.
/// </remarks>
public class ActiveMqListener
{
    /// <summary>Raised on the polling thread for each received message.</summary>
    public event MessageReceivedEventHandler MessageReceived;

    private readonly IMessageConsumer _messageConsumer;
    private readonly TimeSpan _timeout;
    private readonly int _sleepMinutes;

    // volatile: written by Stop() on one thread while the polling loop reads it on
    // another; without it the loop is allowed to keep using a stale value.
    private volatile bool _listen;

    /// <summary>Creates a listener over the given consumer.</summary>
    /// <param name="messageConsumer">Consumer that is polled for messages.</param>
    /// <param name="timeout">Maximum time a single receive attempt waits.</param>
    /// <param name="sleepMinutes">Minutes to sleep after a receive attempt returns no message.</param>
    public ActiveMqListener(IMessageConsumer messageConsumer, TimeSpan timeout, int sleepMinutes)
    {
        _messageConsumer = messageConsumer;
        _timeout = timeout;
        _sleepMinutes = sleepMinutes;
    }

    /// <summary>Starts polling; blocks the caller until <see cref="Stop"/> is called.</summary>
    public void Start()
    {
        _listen = true;
        StartListening();
    }

    /// <summary>Requests the polling loop to end after its current iteration.</summary>
    public void Stop()
    {
        _listen = false;
    }

    // Polling loop: receive, then either dispatch the message or back off.
    private void StartListening()
    {
        while (_listen)
        {
            var message = _messageConsumer.Receive(_timeout);
            if (message == null)
            {
                Debug.WriteLine("[%Notice:ActiveMqListener Start " + _sleepMinutes + " Minutes Sleep.%]");
                // Compute in 64-bit and clamp so a large minute count cannot wrap
                // around int and turn into a short or negative sleep.
                var sleepMilliseconds = Math.Min((long)_sleepMinutes * 60 * 1000, int.MaxValue);
                Thread.Sleep((int)sleepMilliseconds);
            }
            else
            {
                FireReceiveEvent(message);
            }
        }
    }

    // Raises MessageReceived if anyone is subscribed.
    private void FireReceiveEvent(object message)
    {
        // Copy the delegate first so a handler unsubscribing on another thread
        // between the null check and the call cannot cause a NullReferenceException.
        var handler = MessageReceived;
        if (handler != null)
        {
            handler(this, new MessageEventArgs(message));
        }
    }
}
/// <summary>Signature of handlers for a listener's message-received event.</summary>
/// <param name="sender">Object that raised the event.</param>
/// <param name="args">Event data carrying the received message.</param>
public delegate void MessageReceivedEventHandler(object sender, MessageEventArgs args);

/// <summary>Event data that carries a single received message.</summary>
public class MessageEventArgs : EventArgs
{
    /// <summary>Wraps the given message.</summary>
    /// <param name="message">The received message; stored as supplied.</param>
    public MessageEventArgs(object message)
    {
        Message = message;
    }

    /// <summary>The message passed to the constructor.</summary>
    public object Message { get; private set; }
}
}
完整代码下载地址:
http://files.cnblogs.com/CopyPaster/ActiveMQ.PerformanceTest.rar
转载于:https://www.cnblogs.com/CopyPaster/archive/2012/04/27/2473205.html
最后
以上就是优美荔枝为你收集整理的[MQ]ActiveMQ消息收发简单例子的全部内容,希望文章能够帮你解决[MQ]ActiveMQ消息收发简单例子所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复