Skip to content

Commit f72e494

Browse files
More pipe improvements by @Youssef1313 in #6498 (backport to rel/3.10) (#6539)
Co-authored-by: Youssef1313 <youssefvictor00@gmail.com>
1 parent ac62226 commit f72e494

File tree

2 files changed

+36
-26
lines changed

2 files changed

+36
-26
lines changed

src/Platform/Microsoft.Testing.Platform/IPC/NamedPipeClient.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ namespace Microsoft.Testing.Platform.IPC;
1818
#pragma warning disable CA1416 // Validate platform compatibility
1919
internal sealed class NamedPipeClient : NamedPipeBase, IClient
2020
{
21+
private const PipeOptions CurrentUserPipeOptions = PipeOptions.None
22+
#if NET
23+
| PipeOptions.CurrentUserOnly
24+
#endif
25+
;
26+
2127
private readonly NamedPipeClientStream _namedPipeClientStream;
2228
private readonly SemaphoreSlim _lock = new(1, 1);
2329

@@ -36,7 +42,7 @@ public NamedPipeClient(string name)
3642
public NamedPipeClient(string name, IEnvironment environment)
3743
{
3844
Guard.NotNull(name);
39-
_namedPipeClientStream = new(".", name, PipeDirection.InOut);
45+
_namedPipeClientStream = new(".", name, PipeDirection.InOut, CurrentUserPipeOptions);
4046
PipeName = name;
4147
_environment = environment;
4248
}

src/Platform/Microsoft.Testing.Platform/IPC/NamedPipeServer.cs

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

4-
#if NET
5-
using System.Buffers;
6-
#endif
74
using System.IO.Pipes;
85

96
using Microsoft.Testing.Platform.Helpers;
@@ -17,6 +14,14 @@ namespace Microsoft.Testing.Platform.IPC;
1714

1815
internal sealed class NamedPipeServer : NamedPipeBase, IServer
1916
{
17+
#pragma warning disable CA1416 // Validate platform compatibility
18+
private const PipeOptions AsyncCurrentUserPipeOptions = PipeOptions.Asynchronous
19+
#if NET
20+
| PipeOptions.CurrentUserOnly
21+
#endif
22+
;
23+
#pragma warning restore CA1416 // Validate platform compatibility
24+
2025
private readonly Func<IRequest, Task<IResponse>> _callback;
2126
private readonly IEnvironment _environment;
2227
private readonly NamedPipeServerStream _namedPipeServerStream;
@@ -26,6 +31,9 @@ internal sealed class NamedPipeServer : NamedPipeBase, IServer
2631
private readonly MemoryStream _serializationBuffer = new();
2732
private readonly MemoryStream _messageBuffer = new();
2833
private readonly byte[] _readBuffer = new byte[250000];
34+
#if NET
35+
private readonly byte[] _sizeOfIntArray = new byte[sizeof(int)];
36+
#endif
2937
private Task? _loopTask;
3038
private bool _disposed;
3139

@@ -62,7 +70,7 @@ public NamedPipeServer(
6270
{
6371
Guard.NotNull(pipeNameDescription);
6472
#pragma warning disable CA1416 // Validate platform compatibility
65-
_namedPipeServerStream = new((PipeName = pipeNameDescription).Name, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
73+
_namedPipeServerStream = new((PipeName = pipeNameDescription).Name, PipeDirection.InOut, maxNumberOfServerInstances, PipeTransmissionMode.Byte, AsyncCurrentUserPipeOptions);
6674
#pragma warning restore CA1416
6775
_callback = callback;
6876
_environment = environment;
@@ -142,6 +150,11 @@ private async Task InternalLoopAsync(CancellationToken cancellationToken)
142150
if (currentMessageSize == 0)
143151
{
144152
// We need to read the message size, first 4 bytes
153+
if (currentReadBytes < sizeof(int))
154+
{
155+
throw ApplicationStateGuard.Unreachable();
156+
}
157+
145158
currentMessageSize = BitConverter.ToInt32(_readBuffer, 0);
146159
missingBytesToReadOfCurrentChunk = currentReadBytes - sizeof(int);
147160
missingBytesToReadOfWholeMessage = currentMessageSize;
@@ -159,6 +172,11 @@ private async Task InternalLoopAsync(CancellationToken cancellationToken)
159172
missingBytesToReadOfWholeMessage -= missingBytesToReadOfCurrentChunk;
160173
}
161174

175+
if (missingBytesToReadOfWholeMessage < 0)
176+
{
177+
throw ApplicationStateGuard.Unreachable();
178+
}
179+
162180
// If we have read all the message, we can deserialize it
163181
if (missingBytesToReadOfWholeMessage == 0)
164182
{
@@ -196,34 +214,20 @@ private async Task InternalLoopAsync(CancellationToken cancellationToken)
196214

197215
// Write the message size
198216
#if NET
199-
byte[] bytes = ArrayPool<byte>.Shared.Rent(sizeof(int));
200-
try
201-
{
202-
ApplicationStateGuard.Ensure(BitConverter.TryWriteBytes(bytes, sizeOfTheWholeMessage), PlatformResources.UnexpectedExceptionDuringByteConversionErrorMessage);
203-
204-
await _messageBuffer.WriteAsync(bytes.AsMemory(0, sizeof(int)), cancellationToken).ConfigureAwait(false);
205-
}
206-
finally
207-
{
208-
ArrayPool<byte>.Shared.Return(bytes);
209-
}
217+
byte[] bytes = _sizeOfIntArray;
218+
ApplicationStateGuard.Ensure(BitConverter.TryWriteBytes(bytes, sizeOfTheWholeMessage), PlatformResources.UnexpectedExceptionDuringByteConversionErrorMessage);
219+
ApplicationStateGuard.Ensure(bytes.Length == sizeof(int));
220+
await _messageBuffer.WriteAsync(bytes, cancellationToken).ConfigureAwait(false);
210221
#else
211222
await _messageBuffer.WriteAsync(BitConverter.GetBytes(sizeOfTheWholeMessage), 0, sizeof(int), cancellationToken).ConfigureAwait(false);
212223
#endif
213224

214225
// Write the serializer id
215226
#if NET
216-
bytes = ArrayPool<byte>.Shared.Rent(sizeof(int));
217-
try
218-
{
219-
ApplicationStateGuard.Ensure(BitConverter.TryWriteBytes(bytes, responseNamedPipeSerializer.Id), PlatformResources.UnexpectedExceptionDuringByteConversionErrorMessage);
227+
bytes = _sizeOfIntArray;
228+
ApplicationStateGuard.Ensure(BitConverter.TryWriteBytes(bytes, responseNamedPipeSerializer.Id), PlatformResources.UnexpectedExceptionDuringByteConversionErrorMessage);
220229

221-
await _messageBuffer.WriteAsync(bytes.AsMemory(0, sizeof(int)), cancellationToken).ConfigureAwait(false);
222-
}
223-
finally
224-
{
225-
ArrayPool<byte>.Shared.Return(bytes);
226-
}
230+
await _messageBuffer.WriteAsync(bytes.AsMemory(0, sizeof(int)), cancellationToken).ConfigureAwait(false);
227231
#else
228232
await _messageBuffer.WriteAsync(BitConverter.GetBytes(responseNamedPipeSerializer.Id), 0, sizeof(int), cancellationToken).ConfigureAwait(false);
229233
#endif

0 commit comments

Comments
 (0)