I've been struggling lately with putting a thread-safe associative counter into a microservice. The requirements are roughly as follows:
- Increment the count for various keys on each request
- Periodically log the counts of all keys and reset them to 0
- Do not keep keys that are no longer being incremented in memory
I first considered the metrics feature of Finagle, the framework I'm using, but there didn't seem to be a way to reset values, and it didn't seem intended for registering hundreds of thousands of keys, so I decided to build it myself. Looking at the documentation for Java's thread-safe associative array, ConcurrentHashMap:
A ConcurrentHashMap can be used as a scalable frequency map (a form of histogram or multiset) by using LongAdder values and initializing via computeIfAbsent. For example, to add a count to a ConcurrentHashMap<String, LongAdder> freqs, you can use freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html
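In Scala, the quoted pattern would translate roughly like this (a minimal sketch of my own; the map name freqs follows the quote, and count is just a hypothetical helper):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder
import java.util.function.{Function => JFunction}

val freqs = new ConcurrentHashMap[String, LongAdder]()

// computeIfAbsent atomically creates the LongAdder the first time a key is seen;
// increment() then records the count without any explicit locking.
val newAdder: JFunction[String, LongAdder] = _ => new LongAdder()

def count(key: String): Unit =
  freqs.computeIfAbsent(key, newAdder).increment()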
This computeIfAbsent pattern seemed to be exactly what I was looking for. I did wonder why it uses LongAdder rather than AtomicLong, but apparently the two are meant for different purposes.
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html
Scala, for its part, has a thread-safe associative array called TrieMap, so this time I decided to implement the counter with TrieMap and LongAdder. That is, declare an associative counter like this,
val counter = TrieMap[Key, LongAdder]()
increment it wherever needed in each request, allocating a new entry when the key does not exist yet,
counter.getOrElseUpdate(key, new LongAdder()).increment()
and separately, on a timer, periodically write the counts to the log while deleting the entries:
for (key <- counter.keys) {
  counter.remove(key).foreach { adder =>
    val sum = adder.sum()
    logger.debug(s"$key: $sum")
  }
}
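Putting the pieces together, a minimal self-contained version of this first attempt might look like the following (a sketch only; the String key type and the log callback are stand-ins for whatever the service actually uses):

import java.util.concurrent.atomic.LongAdder
import scala.collection.concurrent.TrieMap

object RequestCounter {
  private val counter = TrieMap[String, LongAdder]()

  // Called from request-handling threads.
  def increment(key: String): Unit =
    counter.getOrElseUpdate(key, new LongAdder()).increment()

  // Called from a periodic timer: log every count and drop the entry.
  def flush(log: String => Unit): Unit =
    for (key <- counter.keys) {
      counter.remove(key).foreach(adder => log(s"$key: ${adder.sum()}"))
    }
}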
I implemented it thinking "great, that was easy", but this is not actually an accurate counter, is it? Every method I'm using is atomic, but combining atomic operations does not make the combination atomic. I haven't reproduced it in practice, but there should be cases like the following:
getOrElseUpdate(key, new LongAdder())
remove(key)
sum()
If the same key is processed in this order, then by the time the final increment() runs, its LongAdder has already been removed from the TrieMap and its value has already been logged, so that increment silently vanishes into the dark. There is also a load concern: the TrieMap is emptied all at once when the log is written and then grows back with every increment, so the load spikes around each log output.
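To spell out the lost-update scenario, here is an illustrative interleaving (hypothetical timing expressed as comments, not something I have actually observed):

// Request thread                                   Logging timer thread
// ---------------------------------------------------------------------
// val adder = counter.getOrElseUpdate(key,
//                       new LongAdder())
//                                                   val removed = counter.remove(key) // adder leaves the map
//                                                   log(removed.get.sum())            // current sum is logged
// adder.increment()  // updates an adder nothing will ever read again:
//                    // this increment is never logged and is silently lost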
Resetting and deleting at the same time seems to be what makes this awkward, so for the time being I figured that, when writing the log,
for {
key <- counter.keys
value <- counter.get(key)
} {
val sum = value.sumThenReset()
logger.debug(s"$key: $sum")
}
I would simply reset each value in place instead of removing it. However,
public long sumThenReset() Equivalent in effect to sum() followed by reset(). This method may apply for example during quiescent points between multithreaded computations. If there are updates concurrent with this method, the returned value is not guaranteed to be the final value occurring before the reset.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html#sumThenReset--
So here too, increments can be swallowed by the dark. This seems unavoidable given LongAdder's data structure, so turning to AtomicLong instead:
public final long getAndSet(long newValue) Atomically sets to the given value and returns the old value.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/AtomicLong.html#getAndSet-long-
This one is atomic. So, switching the associative counter's data structure:
val counter = TrieMap[Key, AtomicLong]()
for {
key <- counter.keys
value <- counter.get(key)
} {
val v = value.getAndSet(0)
logger.debug(s"$key: $v")
}
Then, on a timer separate from the log output, it seems reasonable to delete entries whose count is 0, with an implementation like this:
for {
key <- counter.keys
value <- counter.get(key)
} {
val v = value.get()
if (v == 0) counter.remove(key)
}
Of course, if an increment slips in between the get() and the remove(key), then yes, into the dark it goes again! (Nice try.)
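Again spelled out as an illustrative interleaving (hypothetical timing expressed as comments):

// Request thread                                   Cleanup timer thread
// ---------------------------------------------------------------------
//                                                   val v = value.get()              // reads 0
// value.incrementAndGet()  // count becomes 1
//                                                   if (v == 0) counter.remove(key)  // removes the entry anyway,
//                                                                                    // and the new count of 1 is lost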
In other words, the value check and the element removal need to happen atomically. Wondering whether something that convenient exists, I found this:
def remove(k: K, v: V): Boolean Removes the entry for the specified key if it's currently mapped to the specified value.
https://www.scala-lang.org/api/2.12.7/scala/collection/concurrent/TrieMap.html#remove(k:K,v:V):Boolean
So it does exist, but it decides by value equality. Since AtomicLong is a reference type, two instances are not equal even when they hold the same count:
scala> new AtomicLong(0) == new AtomicLong(0)
res0: Boolean = false
ConcurrentHashMap is no different in this respect:
public boolean remove(Object key, Object value) Removes the entry for a key only if currently mapped to a given value.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#remove-java.lang.Object-java.lang.Object-
So in order to use this remove, the value type has to be a plain Long (which compares by value) rather than an AtomicLong, and the atomicity of increments has to be entrusted to ConcurrentHashMap.compute or the like instead (see the sketch after the quote below).
compute public V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) Attempts to compute a mapping for the specified key and its current mapped value (or null if there is no current mapping). The entire method invocation is performed atomically. Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this map.
https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#compute-K-java.util.function.BiFunction-
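A sketch along those lines might look like this (my own assumption of how the pieces would fit together, not code from the service): increments go through compute, and the cleanup uses remove(key, value), which only succeeds if the key is still mapped to the value we observed.

import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction
import scala.collection.JavaConverters._

val counter = new ConcurrentHashMap[String, java.lang.Long]()

// The whole remapping runs atomically inside compute, so increments cannot
// race with each other or with removals.
val addOne = new BiFunction[String, java.lang.Long, java.lang.Long] {
  override def apply(k: String, v: java.lang.Long): java.lang.Long =
    if (v == null) 1L else v + 1L
}

def increment(key: String): Unit = counter.compute(key, addOne)

// Cleanup timer: remove(key, value) compares boxed Longs by equals (value
// equality), and fails harmlessly if an increment slipped in after we read 0.
def removeZeros(): Unit =
  for (key <- counter.keySet().asScala) {
    val v = counter.get(key)
    if (v != null && v.longValue() == 0L) counter.remove(key, v)
  }

For the log-and-reset pass, a plain remove(key) should also be safe under this scheme: if an increment races in right after the removal, compute simply recreates the entry and the count shows up in the next period.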
And in fact, as I realized after scratching my head a little more, there is already an implementation of exactly this in Guava.
public final class AtomicLongMap<K> implements Serializable {
private final ConcurrentHashMap<K, Long> map;
https://github.com/google/guava/blob/v27.0.1/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L59
In the past this was apparently implemented with AtomicLong values,
public final class AtomicLongMap<K> {
private final ConcurrentHashMap<K, AtomicLong> map;
https://github.com/google/guava/blob/v20.0/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L55
so presumably there was some reason behind the switch.
In conclusion, it seems best to either use Guava's AtomicLongMap or write something modeled on it. Concurrent programming is hard.
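For reference, a rough sketch of how the counter could sit on top of Guava's AtomicLongMap (assuming a Guava dependency; the flush strategy of removing each key as it is logged is my own choice, relying on the ConcurrentHashMap<K, Long>-backed implementation linked above):

import com.google.common.util.concurrent.AtomicLongMap
import scala.collection.JavaConverters._

object RequestCounter {
  private val counter: AtomicLongMap[String] = AtomicLongMap.create[String]()

  // Request threads: create-if-absent and increment happen atomically inside the map.
  def increment(key: String): Unit = counter.incrementAndGet(key)

  // Periodic timer: remove(key) takes the value out of the map and returns it, so a
  // racing increment either lands before the removal (and is logged now) or
  // recreates the entry afterwards (and is logged in the next period).
  def flush(log: String => Unit): Unit =
    for (key <- counter.asMap().keySet().asScala) {
      val sum = counter.remove(key)
      if (sum != 0L) log(s"$key: $sum")
    }
}

If you would rather keep hot keys in the map and only drop idle ones, AtomicLongMap also offers removeAllZeros() for a separate cleanup pass.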