概述
ActorMailbox:
internal static class MailboxStatus
{
public const int Idle = 0;
public const int Busy = 1;
}
public class UnboundedMailboxQueue
{
private readonly ConcurrentQueue<object> _messages = new ConcurrentQueue<object>();
public void Push(object message)
{
_messages.Enqueue(message);
}
public object Pop()
{
object message;
return _messages.TryDequeue(out message) ? message : null;
}
public bool HasMessages { get { return !_messages.IsEmpty; } }
}
public interface IMessageInvoker
{
Task InvokeMessageAsync(object msg);
void EscalateFailure(Exception reason, object message);
}
public interface IDispatcher
{
int Throughput { get; }
void Schedule(Func<Task> runner);
}
public sealed class ThreadPoolDispatcher : IDispatcher
{
public ThreadPoolDispatcher()
{
Throughput = 300;
}
public void Schedule(Func<Task> runner) => Task.Factory.StartNew(runner, TaskCreationOptions.None);
public int Throughput { get; set; }
}
public static class Dispatchers
{
public static ThreadPoolDispatcher DefaultDispatcher { get; } = new ThreadPoolDispatcher();
}
public class ActorMailbox
{
private readonly UnboundedMailboxQueue _mailbox;
private IDispatcher _dispatcher;
private IMessageInvoker _invoker;
private int _status = MailboxStatus.Idle;
public ActorMailbox()
{
_mailbox = new UnboundedMailboxQueue();
}
public void RegisterHandlers(IMessageInvoker invoker)
{
_invoker = invoker;
_dispatcher = Dispatchers.DefaultDispatcher;
}
public void PostMessage(object msg)
{
_mailbox.Push(msg);
Schedule();
}
private Task RunAsync()
{
var done = ProcessMessages();
if (!done)
// mailbox is halted, awaiting completion of a message task, upon which mailbox will be rescheduled
return Task.FromResult(0);
Interlocked.Exchange(ref _status, MailboxStatus.Idle);
if (_mailbox.HasMessages)
{
Schedule();
}
return Task.FromResult(0);
}
private bool ProcessMessages()
{
object msg = null;
try
{
for (var i = 0; i < _dispatcher.Throughput; i++)
{
if ((msg = _mailbox.Pop()) != null)
{
var t = _invoker.InvokeMessageAsync(msg);
if (t.IsFaulted)
{
_invoker.EscalateFailure(t.Exception, msg);
continue;
}
if (!t.IsCompleted)
{
// if task didn't complete immediately, halt processing and reschedule a new run when task completes
t.ContinueWith(RescheduleOnTaskComplete, msg);
return false;
}
}
else
{
break;
}
}
}
catch (Exception e)
{
_invoker.EscalateFailure(e, msg);
}
return true;
}
private void RescheduleOnTaskComplete(Task task, object message)
{
if (task.IsFaulted)
{
_invoker.EscalateFailure(task.Exception, message);
}
_dispatcher.Schedule(RunAsync);
}
protected void Schedule()
{
if (Interlocked.CompareExchange(ref _status, MailboxStatus.Busy, MailboxStatus.Idle) == MailboxStatus.Idle)
{
_dispatcher.Schedule(RunAsync);
}
}
}
使用方法:
class Program
{
public static AutoResetEvent resetEvent = new AutoResetEvent(false);
public static int maxCount = 10000000;
static void Main(string[] args)
{
RoomData room = new RoomData();
Stopwatch watch = new Stopwatch();
watch.Start();
int j = 1;
for (int i = 0; i < maxCount; i++)
{
room.Tell(i);
//j++;
//if (j >= Program.maxCount)
//
Program.resetEvent.Set();
}
resetEvent.WaitOne();
watch.Stop();
Console.WriteLine("{0},{1}", maxCount * 1000.0 / watch.ElapsedMilliseconds, watch.ElapsedMilliseconds);
Console.Read();
}
}
public class TestMessage
{
public string Message { get; set; }
}
public class MailboxHandler : IMessageInvoker
{
int i = 1;
public Task InvokeMessageAsync(object msg)
{
//return ((TestMessage)msg).TaskCompletionSource.Task;
//Console.Write(msg);
//throw new Exception(msg.ToString());
i++;
if (i >= Program.maxCount)
Program.resetEvent.Set();
return Task.FromResult(0);
}
public void EscalateFailure(Exception reason, object message)
{
//EscalatedFailures.Add(reason);
Console.WriteLine("执行异常:{0},{1},{2}", message, reason.Message, reason.StackTrace);
}
}
public class RoomData
{
private ActorMailbox _mailbox;
private MailboxHandler _handler;
public RoomData()
{
_mailbox = new ActorMailbox();
_handler = new MailboxHandler();
_mailbox.RegisterHandlers(_handler);
}
public void Tell(object msg)
{
_mailbox.PostMessage(msg);
}
}
转载于:https://www.cnblogs.com/Jeece/p/8094039.html
最后
以上就是微笑乐曲为你收集整理的Actor Mailbox的全部内容,希望文章能够帮你解决Actor Mailbox所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复