Csharp/C#教程:Azure Service Bus – 使用OnMessage()方法接收消息分享


Azure Service Bus – 使用OnMessage()方法接收消息

在MS文档之后,从订阅接收消息并不困难。 但是,如果我希望我的应用程序每次发布新消息时都会收到消息 – 持续轮询。 因此, SubscriptionClient类的OnMessage()方法。

MS文档说:……当调用OnMessage时,客户端启动一个内部消息泵,不断轮询队列或订阅。这个消息泵包含一个发出Receive()调用的无限循环。如果调用超时,它发出下一个Receive()调用….

但是当应用程序运行时,调用OnMessage()方法时,只接收最新消息。 发布新消息时,常量轮询似乎不起作用。 在尝试了许多不同的方法之后,我可以使这项工作成为唯一的方法并让应用程序在收到新消息时做出反应,即将代码放入具有无限循环的单独任务中。 在如此多的层面上,这似乎完全错了! (见下面的代码)。

任何人都可以帮我修正我的代码或发布一个工作样本来完成相同的function而不需要循环吗? 谢谢!

public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter) { var newMessage = new MessageQueue(); int i = 0; Task listener = Task.Factory.StartNew(() => { while (true) { SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter); Dictionary retrievedMessage = new Dictionary(); OnMessageOptions options = new OnMessageOptions(); options.AutoComplete = false; options.AutoRenewTimeout = TimeSpan.FromMinutes(1); Client.OnMessage((message) => { try { retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString()); retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString()); retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString()); retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString()); retrievedMessage.Add("message", message.Properties["Message"].ToString()); newMessage.AnnounceNewMessage(retrievedMessage); // event -> message.Complete(); // Remove message from subscription. } catch (Exception ex) { string exmes = ex.Message; message.Abandon(); } }, options); retrievedMessage.Clear(); i++; Thread.Sleep(3000); } }); } 

您的代码有一些问题要解决 –

下面应该会看到你走向成功的道路。 祝好运。

  ManualResetEvent CompletedResetEvent = new ManualResetEvent(false); SubscriptionClient Client; public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString) { Task listener = Task.Factory.StartNew(() => { // You only need to set up the below once. Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter); OnMessageOptions options = new OnMessageOptions(); options.AutoComplete = false; options.AutoRenewTimeout = TimeSpan.FromMinutes(1); options.ExceptionReceived += LogErrors; Client.OnMessage((message) => { try { Trace.WriteLine("Got the message with ID {0}", message.MessageId); message.Complete(); // Remove message from subscription. } catch (Exception ex) { Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString()); message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters. } }, options); CompletedResetEvent.WaitOne(); }); } ///  /// Added in rudimentary exception handling . ///  /// The sender. /// The  instance containing the event data. private void LogErrors(object sender, ExceptionReceivedEventArgs ex) { Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString()); } ///  /// Call this to stop the messages arriving from subscription. ///  public void StopMessagesFromSubscription() { Client.Close(); // Close the message pump down gracefully CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully } 

或者,您可以使用ReceiveBatch以更传统的方式将消息分块:

上述就是C#学习教程:Azure Service Bus – 使用OnMessage()方法接收消息分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!

 var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30), cancellationToken); 

本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。

ctvol管理联系方式QQ:251552304

本文章地址:https://www.ctvol.com/cdevelopment/1011877.html

(0)
上一篇 2021年12月29日
下一篇 2021年12月29日

精彩推荐