Skip to content

Commit 7e7f8f2

Browse files
committed
Refactor sending and receiving of RPCs to not use entities, thereby preserving send/receive order without having to encode an order value and reorder messages on the receive side.
1 parent 78874ff commit 7e7f8f2

File tree

2 files changed

+65
-63
lines changed

2 files changed

+65
-63
lines changed

com.unity.netcode.gameobjects/Runtime/Core/NetworkManager.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,9 +472,10 @@ public void NetworkUpdate(NetworkUpdateStage updateStage)
472472

473473
// This should be invoked just prior to the MessageManager processes its outbound queue.
474474
SceneManager.CheckForAndSendNetworkObjectSceneChanged();
475-
475+
#if !UNIFIED_NETCODE
476476
// Process outbound messages
477477
MessageManager.ProcessSendQueues();
478+
#endif
478479

479480
// Metrics update needs to be driven by NetworkConnectionManager's update to assure metrics are dispatched after the send queue is processed.
480481
MetricsManager.UpdateMetrics();

com.unity.netcode.gameobjects/Runtime/Transports/Unified/UnifiedNetcodeTransport.cs

Lines changed: 63 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -43,37 +43,41 @@ public static NativeArray<byte> ToNativeArray(in FixedBytes1280 data)
4343
}
4444
}
4545

46+
internal struct TransportRpcData : IBufferElementData
47+
{
48+
public FixedBytes1280 Buffer;
49+
}
50+
4651
[BurstCompile]
4752
internal struct TransportRpc : IOutOfBandRpcCommand, IRpcCommandSerializer<TransportRpc>
4853
{
49-
public FixedBytes1280 Buffer;
50-
public ulong Order;
54+
public TransportRpcData Value;
5155

5256
public unsafe void Serialize(ref DataStreamWriter writer, in RpcSerializerState state, in TransportRpc data)
5357
{
54-
writer.WriteULong(data.Order);
55-
writer.WriteInt(data.Buffer.Length);
56-
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Buffer), data.Buffer.Length);
58+
writer.WriteInt(data.Value.Buffer.Length);
59+
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), data.Value.Buffer.Length);
5760
writer.WriteBytes(span);
5861
}
5962

6063
public unsafe void Deserialize(ref DataStreamReader reader, in RpcDeserializerState state, ref TransportRpc data)
6164
{
62-
data.Order = reader.ReadULong();
6365
var length = reader.ReadInt();
64-
data.Buffer = new FixedBytes1280
66+
data.Value.Buffer = new FixedBytes1280
6567
{
6668
Length = length
6769
};
6870

69-
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Buffer), length);
71+
var span = new Span<byte>(FixedBytes1280.GetUnsafePtr(data.Value.Buffer), length);
7072
reader.ReadBytes(span);
7173
}
7274

7375
[BurstCompile(DisableDirectCall = true)]
7476
private static void InvokeExecute(ref RpcExecutor.Parameters parameters)
7577
{
76-
RpcExecutor.ExecuteCreateRequestComponent<TransportRpc, TransportRpc>(ref parameters);
78+
var element = new TransportRpc();
79+
element.Deserialize(ref parameters.Reader, parameters.DeserializerState, ref element);
80+
parameters.CommandBuffer.AppendToBuffer(parameters.JobIndex, parameters.Connection, element.Value);
7781
}
7882

7983
private static readonly PortableFunctionPointer<RpcExecutor.ExecuteDelegate> k_InvokeExecuteFunctionPointer = new PortableFunctionPointer<RpcExecutor.ExecuteDelegate>(InvokeExecute);
@@ -115,33 +119,59 @@ public void OnUpdate(ref SystemState state)
115119
}
116120
}
117121

122+
[WorldSystemFilter(WorldSystemFilterFlags.ServerSimulation | WorldSystemFilterFlags.ClientSimulation | WorldSystemFilterFlags.ThinClientSimulation)]
123+
[UpdateInGroup(typeof(SimulationSystemGroup), OrderLast = true)]
124+
[UpdateBefore(typeof(RpcSystem))]
118125
internal partial class UnifiedNetcodeUpdateSystem : SystemBase
119126
{
127+
public void OnCreate(ref SystemState state)
128+
{
129+
state.RequireForUpdate<RpcCollection>();
130+
state.RequireForUpdate<NetworkId>();
131+
}
132+
120133
public UnifiedNetcodeTransport Transport;
134+
public NetworkManager NetworkManager;
121135

122136
public List<Connection> DisconnectQueue = new List<Connection>();
123137

124138
public void Disconnect(Connection connection)
125139
{
126140
DisconnectQueue.Add(connection);
127141
}
142+
143+
public void SendRpc(TransportRpc rpc)
144+
{
145+
var rpcQueue = SystemAPI.GetSingleton<RpcCollection>().GetRpcQueue<TransportRpc, TransportRpc>();
146+
var ghostInstance = GetComponentLookup<GhostInstance>();
147+
foreach (var rpcDataStreamBuffer in SystemAPI.Query<DynamicBuffer<OutgoingRpcDataStreamBuffer>>())
148+
{
149+
rpcQueue.Schedule(rpcDataStreamBuffer, ghostInstance, rpc);
150+
}
151+
}
128152

129153
protected override void OnUpdate()
130154
{
155+
NetworkManager.MessageManager.ProcessSendQueues();
156+
131157
using var commandBuffer = new EntityCommandBuffer(Allocator.Temp);
132-
foreach (var (request, rpc, entity) in SystemAPI.Query<RefRO<ReceiveRpcCommandRequest>, RefRO<TransportRpc>>().WithEntityAccess())
158+
foreach(var (networkId, _, entity) in SystemAPI.Query<RefRO<NetworkId>, RefRO<NetworkStreamConnection>>().WithEntityAccess())
133159
{
134-
var connectionId = SystemAPI.GetComponent<NetworkId>(request.ValueRO.SourceConnection).Value;
135-
136-
var buffer = rpc.ValueRO.Buffer;
137-
try
160+
var connectionId = networkId.ValueRO.Value;
161+
DynamicBuffer<TransportRpcData> rpcs = EntityManager.GetBuffer<TransportRpcData>(entity);
162+
foreach (var rpc in rpcs)
138163
{
139-
Transport.DispatchMessage(connectionId, buffer, rpc.ValueRO.Order);
140-
}
141-
finally
142-
{
143-
commandBuffer.DestroyEntity(entity);
164+
var buffer = rpc.Buffer;
165+
try
166+
{
167+
Transport.DispatchMessage(connectionId, buffer);
168+
}
169+
catch(Exception e)
170+
{
171+
Debug.LogException(e);
172+
}
144173
}
174+
rpcs.Clear();
145175
}
146176

147177
foreach (var connection in DisconnectQueue)
@@ -171,34 +201,15 @@ private class ConnectionInfo
171201
public BatchedSendQueue SendQueue;
172202
public BatchedReceiveQueue ReceiveQueue;
173203
public Connection Connection;
174-
public ulong LastSent;
175-
public ulong LastReceived;
176204
public Dictionary<ulong, FixedBytes1280> DeferredMessages;
177205
}
178206

179207
private Dictionary<int, ConnectionInfo> m_Connections;
180208

181-
internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong order)
209+
internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer)
182210
{
183211
var connectionInfo = m_Connections[connectionId];
184212

185-
if (order <= connectionInfo.LastReceived)
186-
{
187-
Debug.LogWarning("Received duplicate message, ignoring.");
188-
return;
189-
}
190-
191-
if (order != connectionInfo.LastReceived + 1)
192-
{
193-
if (connectionInfo.DeferredMessages == null)
194-
{
195-
connectionInfo.DeferredMessages = new Dictionary<ulong, FixedBytes1280>();
196-
}
197-
198-
connectionInfo.DeferredMessages[order] = buffer;
199-
return;
200-
}
201-
202213
using var arr = FixedBytes1280.ToNativeArray(buffer);
203214
var reader = new DataStreamReader(arr);
204215
if (connectionInfo.ReceiveQueue == null)
@@ -209,20 +220,7 @@ internal void DispatchMessage(int connectionId, in FixedBytes1280 buffer, ulong
209220
{
210221
connectionInfo.ReceiveQueue.PushReader(reader);
211222
}
212-
213-
connectionInfo.LastReceived = order;
214-
if (connectionInfo.DeferredMessages != null)
215-
{
216-
var next = order + 1;
217-
while (connectionInfo.DeferredMessages.Remove(next, out var nextBuffer))
218-
{
219-
reader = new DataStreamReader(FixedBytes1280.ToNativeArray(nextBuffer));
220-
connectionInfo.ReceiveQueue.PushReader(reader);
221-
connectionInfo.LastReceived = next;
222-
++next;
223-
}
224-
}
225-
223+
226224
var message = connectionInfo.ReceiveQueue.PopMessage();
227225
while (message.Count != 0)
228226
{
@@ -243,18 +241,15 @@ public override unsafe void Send(ulong clientId, ArraySegment<byte> payload, Net
243241

244242
while (!connectionInfo.SendQueue.IsEmpty)
245243
{
246-
var rpc = new TransportRpc
247-
{
248-
Buffer = new FixedBytes1280(),
249-
};
244+
var rpc = new TransportRpc();
250245

251-
var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Buffer), k_MaxPacketSize);
246+
var writer = new DataStreamWriter(FixedBytes1280.GetUnsafePtr(rpc.Value.Buffer), k_MaxPacketSize);
252247

253248
var amount = connectionInfo.SendQueue.FillWriterWithBytes(ref writer, k_MaxPacketSize);
254-
rpc.Buffer.Length = amount;
255-
rpc.Order = ++connectionInfo.LastSent;
256-
257-
connectionInfo.Connection.SendOutOfBandMessage(rpc);
249+
rpc.Value.Buffer.Length = amount;
250+
251+
var updateSystem = NetCode.Netcode.GetWorld(false).GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
252+
updateSystem.SendRpc(rpc);
258253

259254
connectionInfo.SendQueue.Consume(amount);
260255
}
@@ -278,6 +273,8 @@ private void OnClientConnectedToServer(Connection connection, NetCodeConnectionE
278273
};
279274
m_ServerClientId = connection.NetworkId.Value;
280275
InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup);
276+
var updateSystem = NetCode.Netcode.GetWorld(false).GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
277+
updateSystem.EntityManager.AddBuffer<TransportRpcData>(connection.ConnectionEntity);
281278
}
282279

283280
private void OnServerNewClientConnection(Connection connection, NetCodeConnectionEvent connectionEvent)
@@ -289,6 +286,8 @@ private void OnServerNewClientConnection(Connection connection, NetCodeConnectio
289286
Connection = connection
290287
}; ;
291288
InvokeOnTransportEvent(NetworkEvent.Connect, (ulong)connection.NetworkId.Value, default, m_RealTimeProvider.RealTimeSinceStartup);
289+
var updateSystem = NetCode.Netcode.GetWorld(false).GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
290+
updateSystem.EntityManager.AddBuffer<TransportRpcData>(connection.ConnectionEntity);
292291
}
293292

294293
private const string k_InvalidRpcMessage = "An invalid RPC was received";
@@ -376,6 +375,7 @@ public override bool StartClient()
376375
NetCode.Netcode.Client.OnDisconnect = OnClientDisconnectFromServer;
377376
var updateSystem = NetCode.Netcode.GetWorld(false).GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
378377
updateSystem.Transport = this;
378+
updateSystem.NetworkManager = m_NetworkManager;
379379
return true;
380380
}
381381

@@ -390,6 +390,7 @@ public override bool StartServer()
390390
NetCode.Netcode.Server.OnDisconnect = OnServerClientDisconnected;
391391
var updateSystem = NetCode.Netcode.GetWorld(true).GetExistingSystemManaged<UnifiedNetcodeUpdateSystem>();
392392
updateSystem.Transport = this;
393+
updateSystem.NetworkManager = m_NetworkManager;
393394
return true;
394395
}
395396

0 commit comments

Comments
 (0)