22
33import static com .google .common .base .Strings .isNullOrEmpty ;
44
5+ import com .google .common .base .Preconditions ;
56import com .google .protobuf .Any ;
67import envoy .api .v2 .ClusterDiscoveryServiceGrpc .ClusterDiscoveryServiceImplBase ;
78import envoy .api .v2 .Discovery .DiscoveryRequest ;
1819import io .grpc .Status ;
1920import io .grpc .stub .StreamObserver ;
2021import java .util .Collections ;
22+ import java .util .List ;
2123import java .util .Map ;
2224import java .util .Set ;
2325import java .util .concurrent .ConcurrentHashMap ;
2830
2931public class DiscoveryServer {
3032
31- private static final DiscoveryServerCallbacks DEFAULT_CALLBACKS = new DiscoveryServerCallbacks () { };
3233 private static final Logger LOGGER = LoggerFactory .getLogger (DiscoveryServer .class );
3334
3435 static final String ANY_TYPE_URL = "" ;
3536
36- private final DiscoveryServerCallbacks callbacks ;
37+ private final List < DiscoveryServerCallbacks > callbacks ;
3738 private final ConfigWatcher configWatcher ;
3839 private final AtomicLong streamCount = new AtomicLong ();
3940
41+ public DiscoveryServer (ConfigWatcher configWatcher ) {
42+ this (Collections .emptyList (), configWatcher );
43+ }
44+
4045 public DiscoveryServer (DiscoveryServerCallbacks callbacks , ConfigWatcher configWatcher ) {
41- this .callbacks = callbacks ;
42- this .configWatcher = configWatcher ;
46+ this (Collections .singletonList (callbacks ), configWatcher );
4347 }
4448
45- public DiscoveryServer (ConfigWatcher configWatcher ) {
46- this (DEFAULT_CALLBACKS , configWatcher );
49+ /**
50+ * Creates the server.
51+ * @param callbacks server callbacks
52+ * @param configWatcher source of configuration updates
53+ */
54+ public DiscoveryServer (List <DiscoveryServerCallbacks > callbacks , ConfigWatcher configWatcher ) {
55+ Preconditions .checkNotNull (callbacks , "callbacks cannot be null" );
56+ Preconditions .checkNotNull (configWatcher , "configWatcher cannot be null" );
57+
58+ this .callbacks = callbacks ;
59+ this .configWatcher = configWatcher ;
4760 }
4861
4962 /**
@@ -137,7 +150,7 @@ private StreamObserver<DiscoveryRequest> createRequestHandler(
137150
138151 LOGGER .info ("[{}] open stream from {}" , streamId , defaultTypeUrl );
139152
140- callbacks .onStreamOpen (streamId , defaultTypeUrl );
153+ callbacks .forEach ( cb -> cb . onStreamOpen (streamId , defaultTypeUrl ) );
141154
142155 return new StreamObserver <DiscoveryRequest >() {
143156
@@ -173,7 +186,7 @@ public void onNext(DiscoveryRequest request) {
173186 nonce ,
174187 request .getVersionInfo ());
175188
176- callbacks .onStreamRequest (streamId , request );
189+ callbacks .forEach ( cb -> cb . onStreamRequest (streamId , request ) );
177190
178191 for (String typeUrl : Resources .TYPE_URLS ) {
179192 DiscoveryResponse response = latestResponse .get (typeUrl );
@@ -213,7 +226,7 @@ public void onError(Throwable t) {
213226 }
214227
215228 try {
216- callbacks .onStreamCloseWithError (streamId , defaultTypeUrl , t );
229+ callbacks .forEach ( cb -> cb . onStreamCloseWithError (streamId , defaultTypeUrl , t ) );
217230 responseObserver .onError (Status .fromThrowable (t ).asException ());
218231 } finally {
219232 cancel ();
@@ -225,7 +238,7 @@ public void onCompleted() {
225238 LOGGER .info ("[{}] stream closed" , streamId );
226239
227240 try {
228- callbacks .onStreamClose (streamId , defaultTypeUrl );
241+ callbacks .forEach ( cb -> cb . onStreamClose (streamId , defaultTypeUrl ) );
229242 responseObserver .onCompleted ();
230243 } finally {
231244 cancel ();
@@ -248,7 +261,7 @@ private void send(Response response, String typeUrl) {
248261
249262 LOGGER .info ("[{}] response {} with nonce {} version {}" , streamId , typeUrl , nonce , response .version ());
250263
251- callbacks .onStreamResponse (streamId , response .request (), discoveryResponse );
264+ callbacks .forEach ( cb -> cb . onStreamResponse (streamId , response .request (), discoveryResponse ) );
252265
253266 // Store the latest response *before* we send the response. This ensures that by the time the request
254267 // is processed the map is guaranteed to be updated. Doing it afterwards leads to a race conditions
0 commit comments