|
22 | 22 | import io.netty.channel.ChannelHandlerContext; |
23 | 23 | import io.netty.channel.ChannelOption; |
24 | 24 | import io.netty.channel.ChannelPipeline; |
| 25 | +import io.netty.channel.EventLoop; |
25 | 26 | import io.netty.handler.codec.LengthFieldBasedFrameDecoder; |
26 | 27 | import io.netty.handler.logging.LogLevel; |
27 | 28 | import io.netty.handler.logging.LoggingHandler; |
|
55 | 56 | import reactor.core.publisher.Mono; |
56 | 57 | import reactor.core.publisher.Operators; |
57 | 58 | import reactor.core.publisher.Sinks; |
| 59 | +import reactor.core.scheduler.Scheduler; |
58 | 60 | import reactor.core.scheduler.Schedulers; |
59 | 61 | import reactor.netty.Connection; |
60 | 62 | import reactor.netty.channel.AbortedException; |
@@ -108,6 +110,8 @@ public final class ReactorNettyClient implements Client { |
108 | 110 |
|
109 | 111 | private final Connection connection; |
110 | 112 |
|
| 113 | + private final Scheduler scheduler; |
| 114 | + |
111 | 115 | private ConnectionContext context; |
112 | 116 |
|
113 | 117 | private final Sinks.Many<Publisher<FrontendMessage>> requestSink = Sinks.many().unicast().onBackpressureBuffer(); |
@@ -167,6 +171,9 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) { |
167 | 171 |
|
168 | 172 | this.context = connectionContext; |
169 | 173 |
|
| 174 | + EventLoop eventLoop = connection.channel().eventLoop(); |
| 175 | + this.scheduler = Schedulers.fromExecutorService(eventLoop, eventLoop.toString()); |
| 176 | + |
170 | 177 | AtomicReference<Throwable> receiveError = new AtomicReference<>(); |
171 | 178 |
|
172 | 179 | connection.inbound().receive() |
@@ -470,6 +477,11 @@ public Optional<Integer> getProcessId() { |
470 | 477 | return Optional.ofNullable(this.processId); |
471 | 478 | } |
472 | 479 |
|
| 480 | + @Override |
| 481 | + public Scheduler getScheduler() { |
| 482 | + return this.scheduler; |
| 483 | + } |
| 484 | + |
473 | 485 | @Override |
474 | 486 | public Optional<Integer> getSecretKey() { |
475 | 487 | return Optional.ofNullable(this.secretKey); |
|
0 commit comments