3030import java .util .Set ;
3131import java .util .concurrent .ConcurrentHashMap ;
3232import java .util .concurrent .Executor ;
33- import java .util .concurrent .atomic .AtomicBoolean ;
3433import java .util .concurrent .atomic .AtomicLong ;
3534import java .util .stream .Collectors ;
3635import org .slf4j .Logger ;
3736import org .slf4j .LoggerFactory ;
3837
3938public class DiscoveryServer {
4039
41- private static final Logger LOGGER = LoggerFactory .getLogger (DiscoveryServer .class );
42-
4340 static final String ANY_TYPE_URL = "" ;
44-
41+ private static final Logger LOGGER = LoggerFactory . getLogger ( DiscoveryServer . class );
4542 private final List <DiscoveryServerCallbacks > callbacks ;
4643 private final ConfigWatcher configWatcher ;
4744 private final ExecutorGroup executorGroup ;
@@ -58,7 +55,8 @@ public DiscoveryServer(DiscoveryServerCallbacks callbacks, ConfigWatcher configW
5855
5956 /**
6057 * Creates the server.
61- * @param callbacks server callbacks
58+ *
59+ * @param callbacks server callbacks
6260 * @param configWatcher source of configuration updates
6361 */
6462 public DiscoveryServer (List <DiscoveryServerCallbacks > callbacks , ConfigWatcher configWatcher ) {
@@ -67,9 +65,10 @@ public DiscoveryServer(List<DiscoveryServerCallbacks> callbacks, ConfigWatcher c
6765
6866 /**
6967 * Creates the server.
70- * @param callbacks server callbacks
71- * @param configWatcher source of configuration updates
72- * @param executorGroup executor group to use for responding stream requests
68+ *
69+ * @param callbacks server callbacks
70+ * @param configWatcher source of configuration updates
71+ * @param executorGroup executor group to use for responding stream requests
7372 * @param protoResourcesSerializer serializer of proto buffer messages
7473 */
7574 public DiscoveryServer (List <DiscoveryServerCallbacks > callbacks ,
@@ -162,7 +161,8 @@ public StreamObserver<DiscoveryRequest> streamRoutes(
162161 */
163162 public SecretDiscoveryServiceGrpc .SecretDiscoveryServiceImplBase getSecretDiscoveryServiceImpl () {
164163 return new SecretDiscoveryServiceGrpc .SecretDiscoveryServiceImplBase () {
165- @ Override public StreamObserver <DiscoveryRequest > streamSecrets (
164+ @ Override
165+ public StreamObserver <DiscoveryRequest > streamSecrets (
166166 StreamObserver <DiscoveryResponse > responseObserver ) {
167167 return createRequestHandler (responseObserver , false , Resources .SECRET_TYPE_URL );
168168 }
@@ -201,7 +201,7 @@ private class DiscoveryRequestStreamObserver implements StreamObserver<Discovery
201201 private final long streamId ;
202202 private final boolean ads ;
203203 private final Executor executor ;
204- private final AtomicBoolean isClosing = new AtomicBoolean () ;
204+ private boolean isClosing ;
205205
206206 private AtomicLong streamNonce ;
207207
@@ -214,10 +214,10 @@ public DiscoveryRequestStreamObserver(String defaultTypeUrl,
214214 this .responseObserver = responseObserver ;
215215 this .streamId = streamId ;
216216 this .ads = ads ;
217- watches = new ConcurrentHashMap <>(Resources .TYPE_URLS .size ());
218- latestResponse = new ConcurrentHashMap <>(Resources .TYPE_URLS .size ());
219- ackedResources = new ConcurrentHashMap <>(Resources .TYPE_URLS .size ());
220- streamNonce = new AtomicLong ();
217+ this . watches = new ConcurrentHashMap <>(Resources .TYPE_URLS .size ());
218+ this . latestResponse = new ConcurrentHashMap <>(Resources .TYPE_URLS .size ());
219+ this . ackedResources = new ConcurrentHashMap <>(Resources .TYPE_URLS .size ());
220+ this . streamNonce = new AtomicLong ();
221221 this .executor = executor ;
222222 }
223223
@@ -228,10 +228,15 @@ public void onNext(DiscoveryRequest request) {
228228
229229 if (defaultTypeUrl .equals (ANY_TYPE_URL )) {
230230 if (requestTypeUrl .isEmpty ()) {
231- responseObserver .onError (
232- Status .UNKNOWN
233- .withDescription (String .format ("[%d] type URL is required for ADS" , streamId ))
234- .asRuntimeException ());
231+ synchronized (responseObserver ) {
232+ if (!isClosing ) {
233+ isClosing = true ;
234+ responseObserver .onError (
235+ Status .UNKNOWN
236+ .withDescription (String .format ("[%d] type URL is required for ADS" , streamId ))
237+ .asRuntimeException ());
238+ }
239+ }
235240
236241 return ;
237242 }
@@ -292,7 +297,12 @@ public void onError(Throwable t) {
292297
293298 try {
294299 callbacks .forEach (cb -> cb .onStreamCloseWithError (streamId , defaultTypeUrl , t ));
295- responseObserver .onError (Status .fromThrowable (t ).asException ());
300+ synchronized (responseObserver ) {
301+ if (!isClosing ) {
302+ isClosing = true ;
303+ responseObserver .onError (Status .fromThrowable (t ).asException ());
304+ }
305+ }
296306 } finally {
297307 cancel ();
298308 }
@@ -304,7 +314,12 @@ public void onCompleted() {
304314
305315 try {
306316 callbacks .forEach (cb -> cb .onStreamClose (streamId , defaultTypeUrl ));
307- responseObserver .onCompleted ();
317+ synchronized (responseObserver ) {
318+ if (!isClosing ) {
319+ isClosing = true ;
320+ responseObserver .onCompleted ();
321+ }
322+ }
308323 } finally {
309324 cancel ();
310325 }
@@ -316,8 +331,11 @@ void onCancelled() {
316331 }
317332
318333 private void closeWithError (Throwable exception ) {
319- if (isClosing .compareAndSet (false , true )) {
320- responseObserver .onError (exception );
334+ synchronized (responseObserver ) {
335+ if (!isClosing ) {
336+ isClosing = true ;
337+ responseObserver .onError (exception );
338+ }
321339 }
322340 cancel ();
323341 }
@@ -345,11 +363,15 @@ private void send(Response response, String typeUrl) {
345363 // is processed the map is guaranteed to be updated. Doing it afterwards leads to a race conditions
346364 // which may see the incoming request arrive before the map is updated, failing the nonce check erroneously.
347365 latestResponse .put (typeUrl , discoveryResponse );
348- try {
349- responseObserver .onNext (discoveryResponse );
350- } catch (StatusRuntimeException e ) {
351- if (!Status .CANCELLED .getCode ().equals (e .getStatus ().getCode ())) {
352- throw e ;
366+ synchronized (responseObserver ) {
367+ if (!isClosing ) {
368+ try {
369+ responseObserver .onNext (discoveryResponse );
370+ } catch (StatusRuntimeException e ) {
371+ if (!Status .CANCELLED .getCode ().equals (e .getStatus ().getCode ())) {
372+ throw e ;
373+ }
374+ }
353375 }
354376 }
355377 }
0 commit comments