Skip to content
This repository was archived by the owner on Jun 11, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/Microsoft.Azure.EventHubs.Processor/EventHubPartitionPump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,30 @@ async Task OpenClientsAsync() // throws EventHubsException, IOException, Interru
ProcessorEventSource.Log.PartitionPumpCreateClientsStart(this.Host.Id, this.PartitionContext.PartitionId, epoch, startAt?.ToString());
this.eventHubClient = EventHubClient.CreateFromConnectionString(this.Host.EventHubConnectionString);

var receiverOptions = new ReceiverOptions()
{
// Enable receiver metrics?
EnableReceiverRuntimeMetric = this.Host.EventProcessorOptions.EnableReceiverRuntimeMetric
};

// Create new receiver and set options
if (startAt is string)
{
this.partitionReceiver = this.eventHubClient.CreateEpochReceiver(this.PartitionContext.ConsumerGroupName, this.PartitionContext.PartitionId, (string)startAt, epoch);
this.partitionReceiver = this.eventHubClient.CreateEpochReceiver(
this.PartitionContext.ConsumerGroupName,
this.PartitionContext.PartitionId,
(string)startAt,
epoch,
receiverOptions);
}
else if (startAt is DateTime)
{
this.partitionReceiver = this.eventHubClient.CreateEpochReceiver(this.PartitionContext.ConsumerGroupName, this.PartitionContext.PartitionId, (DateTime)startAt, epoch);
this.partitionReceiver = this.eventHubClient.CreateEpochReceiver(
this.PartitionContext.ConsumerGroupName,
this.PartitionContext.PartitionId,
(DateTime)startAt,
epoch,
receiverOptions);
}

this.partitionReceiver.PrefetchCount = this.Host.EventProcessorOptions.PrefetchCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ public void SetExceptionHandler(Action<ExceptionReceivedEventArgs> exceptionHand
/// </summary>
public TimeSpan ReceiveTimeout { get; set; }

/// <summary> Gets or sets a value indicating whether the runtime metric of a receiver is enabled. </summary>
/// <value> true if a client wants to access <see cref="ReceiverRuntimeInformation"/> using <see cref="PartitionContext"/>.</value>
public bool EnableReceiverRuntimeMetric
{
get;
set;
}

/// <summary>
/// Gets or sets the current prefetch count for the underlying client.
/// The default is 300.
Expand Down
11 changes: 11 additions & 0 deletions src/Microsoft.Azure.EventHubs.Processor/PartitionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal PartitionContext(EventProcessorHost host, string partitionId, string ev
this.ThisLock = new object();
this.Offset = PartitionReceiver.StartOfStream;
this.SequenceNumber = 0;
this.RuntimeInformation = new ReceiverRuntimeInformation(partitionId);
}

/// <summary>
Expand Down Expand Up @@ -50,6 +51,16 @@ public string Owner
}
}

/// <summary>
/// Gets the approximate receiver runtime information for a logical partition of an Event Hub.
/// To enable the setting, refer to <see cref="EventProcessorOptions.EnableReceiverRuntimeMetric"/>
/// </summary>
public ReceiverRuntimeInformation RuntimeInformation
{
get;
private set;
}

internal string Offset { get; set; }

internal long SequenceNumber { get; set; }
Expand Down
7 changes: 7 additions & 0 deletions src/Microsoft.Azure.EventHubs.Processor/PartitionPump.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ protected async Task ProcessEventsAsync(IEnumerable<EventData> events)
this.PartitionContext.PartitionId,
"Updating offset in partition context with end of batch " + last.SystemProperties.Offset + "/" + last.SystemProperties.SequenceNumber);
this.PartitionContext.SetOffsetAndSequenceNumber(last);
if (this.Host.EventProcessorOptions.EnableReceiverRuntimeMetric)
{
this.PartitionContext.RuntimeInformation.LastSequenceNumber = last.LastSequenceNumber;
this.PartitionContext.RuntimeInformation.LastEnqueuedOffset = last.LastEnqueuedOffset;
this.PartitionContext.RuntimeInformation.LastEnqueuedTimeUtc = last.LastEnqueuedTime;
this.PartitionContext.RuntimeInformation.RetrievalTime = last.RetrievalTime;
}
}

await this.Processor.ProcessEventsAsync(this.PartitionContext, events).ConfigureAwait(false);
Expand Down
6 changes: 6 additions & 0 deletions src/Microsoft.Azure.EventHubs/Amqp/AmqpClientConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class AmqpClientConstants
public static readonly AmqpSymbol BatchFlushIntervalName = AmqpConstants.Vendor + ":batch-flush-interval";
public static readonly AmqpSymbol EntityTypeName = AmqpConstants.Vendor + ":entity-type";
public static readonly AmqpSymbol TimeoutName = AmqpConstants.Vendor + ":timeout";
public static readonly AmqpSymbol EnableReceiverRuntimeMetricName = AmqpConstants.Vendor + ":enable-receiver-runtime-metric";

// Error codes
public static readonly AmqpSymbol DeadLetterName = AmqpConstants.Vendor + ":dead-letter";
Expand Down Expand Up @@ -64,6 +65,11 @@ class AmqpClientConstants
public const string ManagementEventHubCreatedAt = "created_at";
public const string ManagementEventHubPartitionCount = "partition_count";
public const string ManagementEventHubPartitionIds = "partition_ids";
public const string ManagementPartitionBeginSequenceNumber = "begin_sequence_number";
public const string ManagementPartitionLastEnqueuedSequenceNumber = "last_enqueued_sequence_number";
public const string ManagementPartitionLastEnqueuedOffset = "last_enqueued_offset";
public const string ManagementPartitionLastEnqueuedTimeUtc = "last_enqueued_time_utc";
public const string ManagementPartitionRuntimeInfoRetrievalTimeUtc = "runtime_info_retrieval_time_utc";

// Response codes
public const string ResponseStatusCode = "status-code";
Expand Down
4 changes: 2 additions & 2 deletions src/Microsoft.Azure.EventHubs/Amqp/AmqpEventHubClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ internal override EventDataSender OnCreateEventSender(string partitionId)
}

protected override PartitionReceiver OnCreateReceiver(
string consumerGroupName, string partitionId, string startOffset, bool offsetInclusive, DateTime? startTime, long? epoch)
string consumerGroupName, string partitionId, string startOffset, bool offsetInclusive, DateTime? startTime, long? epoch, ReceiverOptions receiverOptions)
{
return new AmqpPartitionReceiver(
this, consumerGroupName, partitionId, startOffset, offsetInclusive, startTime, epoch);
this, consumerGroupName, partitionId, startOffset, offsetInclusive, startTime, epoch, receiverOptions);
}

protected override Task OnCloseAsync()
Expand Down
44 changes: 27 additions & 17 deletions src/Microsoft.Azure.EventHubs/Amqp/AmqpMessageConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,23 +175,6 @@ static void UpdateEventDataHeaderAndProperties(AmqpMessage amqpMessage, EventDat
}
}

// Custom override for EventHub scenario. Note that these
// "can" override existing properties, which is intentional as
// in the EH these system properties take precedence over Amqp data.
//string publisher;
//if (amqpMessage.MessageAnnotations.Map.TryGetValue<string>(PublisherName, out publisher))
//{
// data.Publisher = publisher;
//}

//#if DEBUG
// short partitionId;
// if (amqpMessage.MessageAnnotations.Map.TryGetValue<short>(PartitionIdName, out partitionId))
// {
// data.PartitionId = partitionId;
// }
//#endif

if (data.SystemProperties == null)
{
data.SystemProperties = new EventData.SystemPropertiesCollection();
Expand Down Expand Up @@ -222,6 +205,33 @@ static void UpdateEventDataHeaderAndProperties(AmqpMessage amqpMessage, EventDat
}
}

if ((sections & SectionFlag.DeliveryAnnotations) != 0)
{
long lastSequenceNumber;
if (amqpMessage.DeliveryAnnotations.Map.TryGetValue<long>(AmqpClientConstants.ManagementPartitionLastEnqueuedSequenceNumber, out lastSequenceNumber))
{
data.LastSequenceNumber = lastSequenceNumber;
}

string lastEnqueuedOffset;
if (amqpMessage.DeliveryAnnotations.Map.TryGetValue<string>(AmqpClientConstants.ManagementPartitionLastEnqueuedOffset, out lastEnqueuedOffset))
{
data.LastEnqueuedOffset = lastEnqueuedOffset;
}

DateTime lastEnqueuedTime;
if (amqpMessage.DeliveryAnnotations.Map.TryGetValue<DateTime>(AmqpClientConstants.ManagementPartitionLastEnqueuedTimeUtc, out lastEnqueuedTime))
{
data.LastEnqueuedTime = lastEnqueuedTime;
}

DateTime retrievalTime;
if (amqpMessage.DeliveryAnnotations.Map.TryGetValue<DateTime>(AmqpClientConstants.ManagementPartitionRuntimeInfoRetrievalTimeUtc, out retrievalTime))
{
data.RetrievalTime = retrievalTime;
}
}

if ((sections & SectionFlag.ApplicationProperties) != 0)
{
foreach (KeyValuePair<MapKey, object> pair in amqpMessage.ApplicationProperties.Map)
Expand Down
15 changes: 13 additions & 2 deletions src/Microsoft.Azure.EventHubs/Amqp/AmqpPartitionReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.EventHubs.Amqp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;

class AmqpPartitionReceiver : PartitionReceiver
Expand All @@ -26,8 +27,9 @@ public AmqpPartitionReceiver(
string startOffset,
bool offsetInclusive,
DateTime? startTime,
long? epoch)
: base(eventHubClient, consumerGroupName, partitionId, startOffset, offsetInclusive, startTime, epoch)
long? epoch,
ReceiverOptions receiverOptions)
: base(eventHubClient, consumerGroupName, partitionId, startOffset, offsetInclusive, startTime, epoch, receiverOptions)
{
string entityPath = eventHubClient.ConnectionStringBuilder.EntityPath;
this.Path = $"{entityPath}/ConsumerGroups/{consumerGroupName}/Partitions/{partitionId}";
Expand Down Expand Up @@ -215,6 +217,15 @@ async Task<ReceivingAmqpLink> CreateLinkAsync(TimeSpan timeout)
linkSettings.Target = new Target { Address = this.ClientId };
linkSettings.SettleType = SettleMode.SettleOnSend;

// Receiver metrics enabled?
if (this.ReceiverRuntimeMetricEnabled)
{
linkSettings.DesiredCapabilities = new Multiple<AmqpSymbol>(new List<AmqpSymbol>
{
AmqpClientConstants.EnableReceiverRuntimeMetricName
});
}

if (this.Epoch.HasValue)
{
linkSettings.AddProperty(AmqpClientConstants.AttachEpoch, this.Epoch.Value);
Expand Down
8 changes: 8 additions & 0 deletions src/Microsoft.Azure.EventHubs/EventData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ public SystemPropertiesCollection SystemProperties

internal AmqpMessage AmqpMessage { get; set; }

internal long LastSequenceNumber { get; set; }

internal string LastEnqueuedOffset { get; set; }

internal DateTime LastEnqueuedTime { get; set; }

internal DateTime RetrievalTime { get; set; }

/// <summary>
/// Disposes resources attached to an Event Data
/// </summary>
Expand Down
Loading