我是靠谱客的博主 微笑乐曲,这篇文章主要介绍Actor Mailbox,现在分享给大家,希望可以做个参考。

ActorMailbox:

实现代码如下(C#):
/// <summary>Scheduling states of an <see cref="ActorMailbox"/>.</summary>
internal static class MailboxStatus
{
    /// <summary>No run is scheduled or in progress.</summary>
    public const int Idle = 0;

    /// <summary>A run is scheduled, in progress, or suspended on an incomplete message task.</summary>
    public const int Busy = 1;
}

/// <summary>Unbounded FIFO message queue backed by a <see cref="ConcurrentQueue{T}"/>.</summary>
public class UnboundedMailboxQueue
{
    private readonly ConcurrentQueue<object> _messages = new ConcurrentQueue<object>();

    /// <summary>Appends <paramref name="message"/> to the end of the queue.</summary>
    public void Push(object message)
    {
        _messages.Enqueue(message);
    }

    /// <summary>Removes and returns the oldest queued message.</summary>
    /// <returns>The dequeued message, or <c>null</c> when the queue is empty.</returns>
    public object Pop()
    {
        object message;
        return _messages.TryDequeue(out message) ? message : null;
    }

    /// <summary>Gets a value indicating whether at least one message is queued.</summary>
    public bool HasMessages
    {
        get { return !_messages.IsEmpty; }
    }
}

/// <summary>Receives the messages a mailbox dequeues and the failures it observes.</summary>
public interface IMessageInvoker
{
    /// <summary>Handles one message; the returned task may complete later.</summary>
    Task InvokeMessageAsync(object msg);

    /// <summary>Called by the mailbox when handling <paramref name="message"/> failed with <paramref name="reason"/>.</summary>
    void EscalateFailure(Exception reason, object message);
}

/// <summary>Decides where and how mailbox runs are executed.</summary>
public interface IDispatcher
{
    /// <summary>Gets the maximum number of messages a mailbox processes in one scheduled run.</summary>
    int Throughput { get; }

    /// <summary>Queues <paramref name="runner"/> for execution.</summary>
    void Schedule(Func<Task> runner);
}

/// <summary>Dispatcher that executes mailbox runs on the .NET thread pool.</summary>
public sealed class ThreadPoolDispatcher : IDispatcher
{
    /// <summary>Creates a dispatcher with a throughput of 300 messages per run.</summary>
    public ThreadPoolDispatcher()
    {
        Throughput = 300;
    }

    /// <summary>Queues <paramref name="runner"/> on the thread pool.</summary>
    /// <remarks>
    /// Task.Run always targets the default (thread-pool) scheduler, whereas
    /// Task.Factory.StartNew would inherit whatever scheduler the caller is running on.
    /// </remarks>
    public void Schedule(Func<Task> runner) => Task.Run(runner);

    /// <inheritdoc />
    public int Throughput { get; set; }
}

/// <summary>Well-known dispatcher instances.</summary>
public static class Dispatchers
{
    /// <summary>Gets the shared thread-pool dispatcher.</summary>
    public static ThreadPoolDispatcher DefaultDispatcher { get; } = new ThreadPoolDispatcher();
}

/// <summary>
/// Queues posted messages and hands them one at a time to the registered
/// <see cref="IMessageInvoker"/>; at most one run is scheduled per mailbox at any time.
/// </summary>
public class ActorMailbox
{
    private readonly UnboundedMailboxQueue _mailbox;
    private IDispatcher _dispatcher;
    private IMessageInvoker _invoker;
    private int _status = MailboxStatus.Idle;

    public ActorMailbox()
    {
        _mailbox = new UnboundedMailboxQueue();
    }

    /// <summary>Attaches the message handler and selects the default dispatcher.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="invoker"/> is null.</exception>
    public void RegisterHandlers(IMessageInvoker invoker)
    {
        if (invoker == null)
        {
            throw new ArgumentNullException(nameof(invoker));
        }

        _invoker = invoker;
        _dispatcher = Dispatchers.DefaultDispatcher;
    }

    /// <summary>Enqueues <paramref name="msg"/> and schedules a run if none is pending.</summary>
    /// <exception cref="InvalidOperationException">No handlers have been registered yet.</exception>
    public void PostMessage(object msg)
    {
        EnsureHandlersRegistered();
        _mailbox.Push(msg);
        Schedule();
    }

    /// <summary>One scheduled run: drains up to a batch of messages, then releases the mailbox.</summary>
    private Task RunAsync()
    {
        var done = ProcessMessages();
        if (!done)
        {
            // Halted on an incomplete message task: stay Busy, the task's
            // continuation schedules the next run.
            return Task.FromResult(0);
        }

        Interlocked.Exchange(ref _status, MailboxStatus.Idle);

        // A message posted while the status was still Busy had its Schedule()
        // rejected, so pick it up here.
        if (_mailbox.HasMessages)
        {
            Schedule();
        }

        return Task.FromResult(0);
    }

    /// <summary>Processes at most <see cref="IDispatcher.Throughput"/> messages.</summary>
    /// <returns>
    /// <c>false</c> when processing was suspended on a message task that has not completed yet;
    /// otherwise <c>true</c>.
    /// </returns>
    private bool ProcessMessages()
    {
        object msg = null;
        try
        {
            for (var i = 0; i < _dispatcher.Throughput; i++)
            {
                msg = _mailbox.Pop();
                if (msg == null)
                {
                    break;
                }

                var t = _invoker.InvokeMessageAsync(msg);
                if (t.IsFaulted)
                {
                    _invoker.EscalateFailure(t.Exception, msg);
                    continue;
                }

                if (!t.IsCompleted)
                {
                    // The task did not complete immediately: halt processing and
                    // schedule a new run once it finishes.
                    t.ContinueWith(RescheduleOnTaskComplete, msg);
                    return false;
                }
            }
        }
        catch (Exception e)
        {
            _invoker.EscalateFailure(e, msg);
        }

        return true;
    }

    /// <summary>Continuation for a suspended message task: reports a fault, then resumes processing.</summary>
    private void RescheduleOnTaskComplete(Task task, object message)
    {
        if (task.IsFaulted)
        {
            _invoker.EscalateFailure(task.Exception, message);
        }

        _dispatcher.Schedule(RunAsync);
    }

    /// <summary>Schedules a run unless one is already pending (status Busy).</summary>
    /// <exception cref="InvalidOperationException">No handlers have been registered yet.</exception>
    protected void Schedule()
    {
        // Checked before the status flips: failing after the flip would leave the
        // mailbox Busy with no run queued, and it could never be scheduled again.
        EnsureHandlersRegistered();

        if (Interlocked.CompareExchange(ref _status, MailboxStatus.Busy, MailboxStatus.Idle) == MailboxStatus.Idle)
        {
            _dispatcher.Schedule(RunAsync);
        }
    }

    /// <summary>Throws when <see cref="RegisterHandlers"/> has not been called successfully.</summary>
    private void EnsureHandlersRegistered()
    {
        if (_invoker == null || _dispatcher == null)
        {
            throw new InvalidOperationException("RegisterHandlers must be called before messages can be posted or scheduled.");
        }
    }
}

  使用方法:

示例代码如下(C#):
/// <summary>
/// Throughput benchmark: posts <see cref="maxCount"/> messages to one mailbox and
/// reports messages per second once the handler signals completion.
/// </summary>
class Program
{
    /// <summary>Signalled by <see cref="MailboxHandler"/> when the last message has been handled.</summary>
    public static AutoResetEvent resetEvent = new AutoResetEvent(false);

    /// <summary>Number of messages posted by the benchmark.</summary>
    public static int maxCount = 10000000;

    static void Main(string[] args)
    {
        RoomData room = new RoomData();
        Stopwatch watch = new Stopwatch();
        watch.Start();

        for (int i = 0; i < maxCount; i++)
        {
            room.Tell(i);
        }

        // Posting only enqueues; wait until the handler has actually seen every message.
        resetEvent.WaitOne();
        watch.Stop();

        Console.WriteLine("{0},{1}", maxCount * 1000.0 / watch.ElapsedMilliseconds, watch.ElapsedMilliseconds);
        Console.Read();
    }
}

/// <summary>Sample message type carrying a text payload.</summary>
public class TestMessage
{
    public string Message { get; set; }
}

/// <summary>Benchmark handler that counts messages and signals when all have arrived.</summary>
public class MailboxHandler : IMessageInvoker
{
    // Number of messages handled so far. Starts at zero so the completion signal
    // fires on message number maxCount, not one message early.
    private int _handled;

    /// <summary>Counts the message and sets <see cref="Program.resetEvent"/> once the count reaches <see cref="Program.maxCount"/>.</summary>
    /// <returns>An already-completed task.</returns>
    public Task InvokeMessageAsync(object msg)
    {
        _handled++;
        if (_handled >= Program.maxCount)
        {
            Program.resetEvent.Set();
        }

        return Task.FromResult(0);
    }

    /// <summary>Writes the failed message, the exception message and its stack trace to the console.</summary>
    public void EscalateFailure(Exception reason, object message)
    {
        Console.WriteLine("执行异常:{0},{1},{2}", message, reason.Message, reason.StackTrace);
    }
}

/// <summary>Minimal actor-like object that owns a mailbox wired to a <see cref="MailboxHandler"/>.</summary>
public class RoomData
{
    private readonly ActorMailbox _mailbox;
    private readonly MailboxHandler _handler;

    public RoomData()
    {
        _mailbox = new ActorMailbox();
        _handler = new MailboxHandler();
        _mailbox.RegisterHandlers(_handler);
    }

    /// <summary>Posts <paramref name="msg"/> to this object's mailbox.</summary>
    public void Tell(object msg)
    {
        _mailbox.PostMessage(msg);
    }
}

  

转载于:https://www.cnblogs.com/Jeece/p/8094039.html

最后

以上就是微笑乐曲最近收集整理的关于Actor Mailbox的全部内容,更多相关Actor内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(44)

评论列表共有 0 条评论

立即
投稿
返回
顶部