Method FetchNoWaitAsync
FetchNoWaitAsync<T>(NatsJSFetchOpts, INatsDeserialize<T>?, CancellationToken)
Consume a set number of messages from the stream using this consumer. Returns immediately if no messages are available.
public IAsyncEnumerable<NatsJSMsg<T>> FetchNoWaitAsync<T>(NatsJSFetchOpts opts, INatsDeserialize<T>? serializer = null, CancellationToken cancellationToken = default)
Parameters
opts
NatsJSFetchOptsFetch options. (default:
MaxMsgs
1,000 and timeout is ignored)serializer
INatsDeserialize<T>Serializer to use for the message type.
cancellationToken
CancellationTokenA CancellationToken used to cancel the call.
Returns
- IAsyncEnumerable<NatsJSMsg<T>>
Async enumerable of messages which can be used in a
await foreach
loop.
Type Parameters
T
Message type to deserialize.
Examples
However, there are scenarios where this method is useful. For example if your application is
processing messages in batches infrequently (for example every 5 minutes) you might want to
consider FetchNoWait
. You must make sure to count your messages and stop fetching
if you received all of them in one call, meaning when count < MaxMsgs
.
const int max = 10;
var count = 0;
await foreach (var msg in consumer.FetchAllNoWaitAsync<int>(new NatsJSFetchOpts { MaxMsgs = max }))
{
count++;
Process(msg);
await msg.AckAsync();
}
if (count < max)
{
// No more messages. Pause for more.
await Task.Delay(TimeSpan.FromMinutes(5));
}
Remarks
This method will return immediately if no messages are available.
Using this method is discouraged because it might create an unnecessary load on your cluster.
Use Consume
or Fetch
instead.
Exceptions
- NatsJSProtocolException
Consumer is deleted, it's push based or request sent to server is invalid.
- NatsJSException
There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier.