66import io .envoyproxy .envoy .api .v2 .DiscoveryRequest ;
77import java .util .Collection ;
88import java .util .HashMap ;
9- import java .util .HashSet ;
109import java .util .Map ;
1110import java .util .Objects ;
1211import java .util .Set ;
12+ import java .util .concurrent .ConcurrentHashMap ;
13+ import java .util .concurrent .ConcurrentMap ;
14+ import java .util .concurrent .atomic .AtomicLong ;
1315import java .util .concurrent .locks .Lock ;
1416import java .util .concurrent .locks .ReadWriteLock ;
1517import java .util .concurrent .locks .ReentrantReadWriteLock ;
@@ -40,11 +42,9 @@ public class SimpleCache<T> implements SnapshotCache<T> {
4042
4143 @ GuardedBy ("lock" )
4244 private final Map <T , Snapshot > snapshots = new HashMap <>();
43- @ GuardedBy ("lock" )
44- private final Map <T , CacheStatusInfo <T >> statuses = new HashMap <>();
45+ private final ConcurrentMap <T , CacheStatusInfo <T >> statuses = new ConcurrentHashMap <>();
4546
46- @ GuardedBy ("lock" )
47- private long watchCount ;
47+ private AtomicLong watchCount = new AtomicLong ();
4848
4949 /**
5050 * Constructs a simple cache.
@@ -58,9 +58,10 @@ public SimpleCache(NodeGroup<T> groups) {
5858 /**
5959 * {@inheritDoc}
6060 */
61- @ Override public boolean clearSnapshot (T group ) {
61+ @ Override
62+ public boolean clearSnapshot (T group ) {
63+ // we take a writeLock to prevent watches from being created
6264 writeLock .lock ();
63-
6465 try {
6566 CacheStatusInfo <T > status = statuses .get (group );
6667
@@ -91,12 +92,11 @@ public Watch createWatch(
9192 Consumer <Response > responseConsumer ) {
9293
9394 T group = groups .hash (request .getNode ());
94-
95- writeLock . lock ();
96-
95+ // even though we're modifying, we take a readLock to allow multiple watches to be created in parallel since it
96+ // doesn't conflict
97+ readLock . lock ();
9798 try {
9899 CacheStatusInfo <T > status = statuses .computeIfAbsent (group , g -> new CacheStatusInfo <>(group ));
99-
100100 status .setLastWatchRequestTime (System .currentTimeMillis ());
101101
102102 Snapshot snapshot = snapshots .get (group );
@@ -105,7 +105,7 @@ public Watch createWatch(
105105 Watch watch = new Watch (ads , request , responseConsumer );
106106
107107 if (snapshot != null ) {
108- HashSet <String > requestedResources = new HashSet <> (request .getResourceNamesList ());
108+ Set <String > requestedResources = ImmutableSet . copyOf (request .getResourceNamesList ());
109109
110110 // If the request is asking for resources we haven't sent to the proxy yet, see if we have additional resources.
111111 if (!knownResourceNames .equals (requestedResources )) {
@@ -126,9 +126,7 @@ public Watch createWatch(
126126
127127 // If the requested version is up-to-date or missing a response, leave an open watch.
128128 if (snapshot == null || request .getVersionInfo ().equals (version )) {
129- watchCount ++;
130-
131- long watchId = watchCount ;
129+ long watchId = watchCount .incrementAndGet ();
132130
133131 if (LOGGER .isDebugEnabled ()) {
134132 LOGGER .debug ("open watch {} for {}[{}] from node {} for version {}" ,
@@ -150,33 +148,33 @@ public Watch createWatch(
150148 boolean responded = respond (watch , snapshot , group );
151149
152150 if (!responded ) {
153- watchCount ++ ;
151+ long watchId = watchCount . incrementAndGet () ;
154152
155153 if (LOGGER .isDebugEnabled ()) {
156154 LOGGER .debug ("did not respond immediately, leaving open watch {} for {}[{}] from node {} for version {}" ,
157- watchCount ,
155+ watchId ,
158156 request .getTypeUrl (),
159157 String .join (", " , request .getResourceNamesList ()),
160158 group ,
161159 request .getVersionInfo ());
162160 }
163161
164- status .setWatch (watchCount , watch );
162+ status .setWatch (watchId , watch );
165163
166- watch .setStop (() -> status .removeWatch (watchCount ));
164+ watch .setStop (() -> status .removeWatch (watchId ));
167165 }
168166
169167 return watch ;
170-
171168 } finally {
172- writeLock .unlock ();
169+ readLock .unlock ();
173170 }
174171 }
175172
176173 /**
177174 * {@inheritDoc}
178175 */
179- @ Override public Snapshot getSnapshot (T group ) {
176+ @ Override
177+ public Snapshot getSnapshot (T group ) {
180178 readLock .lock ();
181179
182180 try {
@@ -189,56 +187,51 @@ public Watch createWatch(
189187 /**
190188 * {@inheritDoc}
191189 */
192- @ Override public Collection <T > groups () {
193- readLock .lock ();
194-
195- try {
196- return ImmutableSet .copyOf (statuses .keySet ());
197- } finally {
198- readLock .unlock ();
199- }
190+ @ Override
191+ public Collection <T > groups () {
192+ return ImmutableSet .copyOf (statuses .keySet ());
200193 }
201194
202195 /**
203196 * {@inheritDoc}
204197 */
205198 @ Override
206199 public void setSnapshot (T group , Snapshot snapshot ) {
200+ // we take a writeLock to prevent watches from being created while we update the snapshot
201+ CacheStatusInfo <T > status ;
207202 writeLock .lock ();
208-
209203 try {
210204 // Update the existing snapshot entry.
211205 snapshots .put (group , snapshot );
206+ status = statuses .get (group );
207+ } finally {
208+ writeLock .unlock ();
209+ }
212210
213- CacheStatusInfo <T > status = statuses .get (group );
214-
215- if (status == null ) {
216- return ;
217- }
211+ if (status == null ) {
212+ return ;
213+ }
218214
219- status .watchesRemoveIf ((id , watch ) -> {
220- String version = snapshot .version (watch .request ().getTypeUrl (), watch .request ().getResourceNamesList ());
215+ status .watchesRemoveIf ((id , watch ) -> {
216+ String version = snapshot .version (watch .request ().getTypeUrl (), watch .request ().getResourceNamesList ());
221217
222- if (!watch .request ().getVersionInfo ().equals (version )) {
223- if (LOGGER .isDebugEnabled ()) {
224- LOGGER .debug ("responding to open watch {}[{}] with new version {}" ,
225- id ,
226- String .join (", " , watch .request ().getResourceNamesList ()),
227- version );
228- }
218+ if (!watch .request ().getVersionInfo ().equals (version )) {
219+ if (LOGGER .isDebugEnabled ()) {
220+ LOGGER .debug ("responding to open watch {}[{}] with new version {}" ,
221+ id ,
222+ String .join (", " , watch .request ().getResourceNamesList ()),
223+ version );
224+ }
229225
230- respond (watch , snapshot , group );
226+ respond (watch , snapshot , group );
231227
232- // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response.
233- return true ;
234- }
228+ // Discard the watch. A new watch will be created for future snapshots once envoy ACKs the response.
229+ return true ;
230+ }
235231
236- // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
237- return false ;
238- });
239- } finally {
240- writeLock .unlock ();
241- }
232+ // Do not discard the watch. The request version is the same as the snapshot version, so we wait to respond.
233+ return false ;
234+ });
242235 }
243236
244237 /**
@@ -259,9 +252,9 @@ private Response createResponse(DiscoveryRequest request, Map<String, ? extends
259252 Collection <? extends Message > filtered = request .getResourceNamesList ().isEmpty ()
260253 ? resources .values ()
261254 : request .getResourceNamesList ().stream ()
262- .map (resources ::get )
263- .filter (Objects ::nonNull )
264- .collect (Collectors .toList ());
255+ .map (resources ::get )
256+ .filter (Objects ::nonNull )
257+ .collect (Collectors .toList ());
265258
266259 return Response .create (request , filtered , version );
267260 }
0 commit comments