lludp replace BeginReceiveFrom/AsyncEndReceive

This commit is contained in:
UbitUmarov
2024-04-27 14:30:02 +01:00
parent 800f47d718
commit 16fa47e4d3

View File

@@ -32,6 +32,8 @@ using log4net;
using OpenSim.Framework;
using OpenMetaverse;
using OpenMetaverse.Packets;
using System.Threading;
using System.Threading.Tasks;
namespace OpenSim.Region.ClientStack.LindenUDP
{
@@ -72,6 +74,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Returns true if the server is currently listening for inbound packets, otherwise false</summary>
public bool IsRunningInbound { get; private set; }
public CancellationTokenSource InboundCancellationSource = new();
/// <summary>Returns true if the server is currently sending outbound packets, otherwise false</summary>
/// <remarks>If IsRunningOut = false, then any request to send a packet is simply dropped.</remarks>
@@ -240,10 +244,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
IsRunningInbound = true;
// kick off an async receive. The Start() method will return, the
// actual receives will occur asynchronously and will be caught in
// AsyncEndRecieve().
AsyncBeginReceive();
// kick start the receiver tasks dance.
Task.Run(AsyncBeginReceive);
}
}
@@ -264,6 +266,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_log.DebugFormat("[UDPBASE]: Stopping inbound UDP loop");
IsRunningInbound = false;
InboundCancellationSource.Cancel();
m_udpSocket.Close();
m_udpSocket = null;
}
@@ -276,119 +279,63 @@ namespace OpenSim.Region.ClientStack.LindenUDP
IsRunningOutbound = false;
}
private void AsyncBeginReceive()
private static IPEndPoint dummyIP = new IPEndPoint(IPAddress.Any, 0);
private static readonly ExpiringCacheOS<SocketAddress,EndPoint> IPEndpointsCache = new(300000);
private async void AsyncBeginReceive()
{
SocketAddress workSktAddress = new(m_udpSocket.AddressFamily);
while (IsRunningInbound)
{
UDPPacketBuffer buf = GetNewUDPBuffer(new IPEndPoint(IPAddress.Any, 0)); // we need a fresh one here, for now at least
UDPPacketBuffer buf = GetNewUDPBuffer(null); // we need a fresh one here, for now at least
try
{
// kick off an async read
IAsyncResult iar = m_udpSocket.BeginReceiveFrom(
buf.Data,
0,
buf.Data.Length,
SocketFlags.None,
ref buf.RemoteEndPoint,
AsyncEndReceive,
buf);
if (!iar.CompletedSynchronously)
int nbytes = await m_udpSocket.ReceiveFromAsync(buf.Data.AsMemory(), SocketFlags.None, workSktAddress, InboundCancellationSource.Token);
if (!IsRunningInbound)
{
FreeUDPBuffer(buf);
return;
}
catch (SocketException e)
{
if (e.SocketErrorCode == SocketError.ConnectionReset)
{
m_log.Warn("[UDPBASE]: SIO_UDP_CONNRESET was ignored, attempting to salvage the UDP listener on port " + m_udpPort);
{
try
{
IAsyncResult iar = m_udpSocket.BeginReceiveFrom(
buf.Data,
0,
buf.Data.Length,
SocketFlags.None,
ref buf.RemoteEndPoint,
AsyncEndReceive,
buf);
if (!iar.CompletedSynchronously)
return;
}
catch (SocketException) { }
catch (ObjectDisposedException) { return; }
}
m_log.Warn("[UDPBASE]: Salvaged the UDP listener on port " + m_udpPort);
}
}
catch (Exception e)
{
m_log.Error(
string.Format("[UDPBASE]: Error processing UDP begin receive {0}. Exception ", UdpReceives), e);
}
}
}
private void AsyncEndReceive(IAsyncResult iar)
{
if (IsRunningInbound)
{
bool sync = iar.CompletedSynchronously;
try
{
// get the buffer that was created in AsyncBeginReceive
// this is the received data
UDPPacketBuffer buffer = (UDPPacketBuffer)iar.AsyncState;
int startTick = Util.EnvironmentTickCount();
// get the length of data actually read from the socket, store it with the
// buffer
buffer.DataLength = m_udpSocket.EndReceiveFrom(iar, ref buffer.RemoteEndPoint);
UdpReceives++;
// call the abstract method PacketReceived(), passing the buffer that
// has just been filled from the socket read.
PacketReceived(buffer);
// If more than one thread can be calling AsyncEndReceive() at once (e.g. if m_asyncPacketHandler)
// then a particular stat may be inaccurate due to a race condition. We won't worry about this
// since this should be rare and won't cause a runtime problem.
if (m_currentReceiveTimeSamples >= s_receiveTimeSamples)
if (nbytes > 0)
{
AverageReceiveTicksForLastSamplePeriod
= (float)m_receiveTicksInCurrentSamplePeriod / s_receiveTimeSamples;
int startTick = Util.EnvironmentTickCount();
if(!IPEndpointsCache.TryGetValue(workSktAddress, 300000, out EndPoint ep))
{
ep = dummyIP.Create(workSktAddress);
IPEndpointsCache.AddOrUpdate(workSktAddress, ep, 300000);
}
m_receiveTicksInCurrentSamplePeriod = 0;
m_currentReceiveTimeSamples = 0;
buf.RemoteEndPoint = ep;
buf.DataLength = nbytes;
UdpReceives++;
PacketReceived(buf);
if (m_currentReceiveTimeSamples >= s_receiveTimeSamples)
{
AverageReceiveTicksForLastSamplePeriod
= (float)m_receiveTicksInCurrentSamplePeriod / s_receiveTimeSamples;
m_receiveTicksInCurrentSamplePeriod = 0;
m_currentReceiveTimeSamples = 0;
}
else
{
m_receiveTicksInCurrentSamplePeriod += Util.EnvironmentTickCountSubtract(startTick);
m_currentReceiveTimeSamples++;
}
}
else
{
m_receiveTicksInCurrentSamplePeriod += Util.EnvironmentTickCountSubtract(startTick);
m_currentReceiveTimeSamples++;
}
FreeUDPBuffer(buf);
}
catch (SocketException se)
catch (OperationCanceledException)
{
m_log.Error(
string.Format(
"[UDPBASE]: Error processing UDP end receive {0}, socket error code {1}. Exception ",
UdpReceives, se.ErrorCode),
se);
}
catch(ObjectDisposedException) { }
catch (Exception e)
{
m_log.Error(
string.Format("[UDPBASE]: Error processing UDP end receive {0}. Exception ", UdpReceives), e);
}
finally
{
if (IsRunningInbound && !sync)
AsyncBeginReceive();
m_log.Error($"[UDPBASE]: Error processing UDP receiveFrom. Exception ", e);
}
}
}