Light Mode

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Copy the PublishPacket and then queue it#2115

Open
xljiulang wants to merge 4 commits intodotnet:masterfrom
xljiulang:sessions-manager
Open

Copy the PublishPacket and then queue it#2115
xljiulang wants to merge 4 commits intodotnet:masterfrom
xljiulang:sessions-manager

Conversation

Copy link
Contributor

xljiulang commented Nov 22, 2024 *
edited
Loading

This PR fixes #2113

xljiulang changed the title Retain the copy message #2113 Retain the copy message Nov 22, 2024
Copy link
Contributor Author

xljiulang commented Nov 22, 2024 *
edited
Loading

I found that this is a stubborn bug. After making the above fix, if I modify the logic of MqttChannelAdapter receiving data packets, the bug still exists.

So once a packet is received, we have to copy its payload out instead of using the Pool memory? In Mqttnet.AspNetCore, using Pool memory is the default behavior, which means that the project has never handled this situation correctly.

async Task<ReceivedMqttPacket> ReceiveAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return ReceivedMqttPacket.Empty;
}

var readFixedHeaderResult = await ReadFixedHeaderAsync(cancellationToken).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
{
return ReceivedMqttPacket.Empty;
}

if (readFixedHeaderResult.IsConnectionClosed)
{
return ReceivedMqttPacket.Empty;
}

var fixedHeader = readFixedHeaderResult.FixedHeader;
if (fixedHeader.RemainingLength == 0)
{
return new ReceivedMqttPacket(fixedHeader.Flags, ReadOnlySequence<byte>.Empty, 2);
}

var bodyLength = fixedHeader.RemainingLength;

// Return and clear the previous body buffer
_bodyOwner?.Dispose();

// Re-rent a body buffer
_bodyOwner = BufferOwner.Rent(bodyLength);
var body = _bodyOwner.Buffer;

var bodyOffset = 0;
var chunkSize = Math.Min(ReadBufferSize, bodyLength);

do
{
var bytesLeft = bodyLength - bodyOffset;
if (chunkSize > bytesLeft)
{
chunkSize = bytesLeft;
}

var readBytes = await _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
{
return ReceivedMqttPacket.Empty;
}

if (readBytes == 0)
{
return ReceivedMqttPacket.Empty;
}

bodyOffset += readBytes;
} while (bodyOffset < bodyLength);

PacketInspector?.FillReceiveBuffer(body.AsSpan(0, bodyLength));

var bodySegment = body.AsMemory(0, bodyLength);
var bodySequence = new ReadOnlySequence<byte>(bodySegment);
return new ReceivedMqttPacket(fixedHeader.Flags, bodySequence, fixedHeader.TotalLength);
}
private sealed class BufferOwner : IDisposable
{
private bool _disposed = false;

public byte[] Buffer { get; private set; }

public static BufferOwner Rent(int bufferSieze)
{
return new BufferOwner()
{
Buffer = ArrayPool<byte>.Shared.Rent(bufferSieze)
};
}

public void Dispose()
{
if (!_disposed)
{
_disposed = true;
ArrayPool<byte>.Shared.Return(Buffer);
}
}
}

xljiulang changed the title Retain the copy message Copy the PublishPacket and then queue it Dec 5, 2024
xljiulang commented Dec 5, 2024
///
/// When enabled, received ApplicationMessage will be deep cloned and enqueued.
///
public bool ReceivedApplicationMessageQueueable { get; set; } = true;
Copy link
Contributor Author

xljiulang Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to keep this switch configuration in Options and let users choose. Can we radically change the default to false to reduce GC pressure?

xljiulang commented Dec 5, 2024
if (Options.ReceivedApplicationMessageQueueable)
{
// publishPacket must be copied
EnqueueReceivedPublishPacket(publishPacket.Clone());
Copy link
Contributor Author

xljiulang Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The publishPacket here must be cloned, otherwise problems will occur after replacing it with IMqttClientAdapterFactory of mqttnet.asp.
We can consider using Pool memory for MQTTnet.Adapter.MqttChannelAdapter to avoid unnecessary cloning of publishPacket.

Copy link
Contributor Author

xljiulang commented Dec 5, 2024

Hi @chkr1011
This PR is at the final stage, please take the time to review and give comments. After this PR is completed, I will focus on #2109, and finally #2103.

xljiulang mentioned this pull request Dec 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Reviewers

No reviews

Assignees

No one assigned

Labels

None yet

Projects

None yet

Milestone

No milestone

Development

Successfully merging this pull request may close these issues.

About the memory error problem of InjectApplicationMessage of MqttServer in client.WithCleanSession(false)

2 participants