It was a life of wanting to reset a thread-safe associative counter

I recently had a hard time adding a thread-safe associative counter to a microservice. The requirements were as follows:

- Increment the counts of various keys on each request
- Periodically log the counts of all keys and reset them to 0
- Do not keep keys that are no longer being incremented in memory

I first considered the metrics feature of Finagle, the framework I use. However, there didn't seem to be a way to reset its counters, and it didn't seem designed for registering hundreds of thousands of keys, so I decided to build the counter myself. Looking at the documentation for Java's thread-safe associative array, ConcurrentHashMap, I found this:

A ConcurrentHashMap can be used as a scalable frequency map (a form of histogram or multiset) by using LongAdder values and initializing via computeIfAbsent. For example, to add a count to a ConcurrentHashMap<String,LongAdder> freqs, you can use freqs.computeIfAbsent(k -> new LongAdder()).increment();

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html
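For reference, the Javadoc idiom can be called from Scala more or less directly. This is a minimal sketch. The object name FreqMap and the count helper are hypothetical and only illustrate the idea. The key is passed explicitly, as computeIfAbsent's signature requires.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

// Hypothetical frequency map following the Javadoc idiom.
object FreqMap {
  val freqs = new ConcurrentHashMap[String, LongAdder]()

  // computeIfAbsent atomically installs a LongAdder the first time a key is seen.
  // The increment itself is then done on the returned adder.
  def add(key: String): Unit =
    freqs.computeIfAbsent(key, (_: String) => new LongAdder()).increment()

  // Current total for a key, or 0 if the key was never added.
  def count(key: String): Long = {
    val adder = freqs.get(key)
    if (adder == null) 0L else adder.sum()
  }
}
```

After add("a") twice and add("b") once, count("a") should be 2 and count("b") 1.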

That idiom seemed to be exactly what I was looking for. I wondered why it uses LongAdder rather than AtomicLong, but it turns out the two are meant for different purposes:

This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html
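To make the Javadoc's point concrete: under contention, both classes still produce exact totals. The difference it describes is throughput and memory, not correctness. The following rough sketch increments each counter from several threads. The object AdderVsAtomic and its helpers are hypothetical, and the code makes no attempt to measure throughput.

```scala
import java.util.concurrent.atomic.{AtomicLong, LongAdder}

// Hypothetical demo: both counters end with the exact total under contention.
object AdderVsAtomic {
  // Runs `body` perThread times on each of `threads` threads and waits for all of them.
  def hammer(threads: Int, perThread: Int)(body: => Unit): Unit = {
    val ts = (1 to threads).map { _ =>
      new Thread(() => (1 to perThread).foreach(_ => body))
    }
    ts.foreach(_.start())
    ts.foreach(_.join())
  }

  // Returns (LongAdder total, AtomicLong total).
  def run(threads: Int, perThread: Int): (Long, Long) = {
    val adder = new LongAdder()
    val atomic = new AtomicLong()
    hammer(threads, perThread)(adder.increment())
    hammer(threads, perThread)(atomic.incrementAndGet())
    (adder.sum(), atomic.get())
  }
}
```

With 4 threads doing 10,000 increments each, both totals should be 40,000. The sum() call happens only after every thread has been joined, so no updates are concurrent with it.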

Scala also has its own thread-safe associative array, TrieMap, so this time I decided to implement the counter with TrieMap and LongAdder. That is, declare the counter like this,

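import java.util.concurrent.atomic.LongAdder
import scala.collection.concurrent.TrieMap

// Key is the application's key type
val counter = TrieMap[Key, LongAdder]()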

increment it in each request handler, allocating a new LongAdder when the key does not exist yet,

counter.getOrElseUpdate(key, new LongAdder()).increment()

and separately, on a timer, periodically log each count while removing its entry:

for (key <- counter.keys) {
  // remove returns Option[LongAdder]; the entry may already be gone
  counter.remove(key).foreach { adder =>
    logger.debug(s"$key: ${adder.sum()}")
  }
}

I implemented it thinking "OK, OK, this is easy," but it isn't actually an accurate counter, is it? Each method I'm using is atomic, but a combination of atomic operations is not itself atomic. I haven't observed it in practice, but an interleaving like this should be possible:

  1. Thread A: `getOrElseUpdate(key, new LongAdder())`
  2. Thread B: `remove(key)`
  3. Thread B: `sum()`
  4. Thread A: `increment()`

If the same key is processed in this order, the LongAdder that receives the final `increment()` has already been removed from the TrieMap, and its sum has already been logged, so that increment simply vanishes into the dark. There is also a cost in computational load. The TrieMap is emptied all at once every time the log is written and then regrows with every increment, so load spikes around each log output.
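Only the order of the calls matters here, so the four steps above can be replayed deterministically on one thread. This is a sketch. The key "k" and the single earlier increment are made-up setup, and the object name is hypothetical.

```scala
import java.util.concurrent.atomic.LongAdder
import scala.collection.concurrent.TrieMap

// Replays steps 1 to 4 on a single thread, in exactly that order.
object LostIncrementReplay {
  // Returns (value written to the log, final value of the adder, key still in map?)
  def replay(): (Long, Long, Boolean) = {
    val counter = TrieMap[String, LongAdder]()
    counter.getOrElseUpdate("k", new LongAdder()).increment() // an earlier request

    val adderA = counter.getOrElseUpdate("k", new LongAdder()) // 1. Thread A gets the adder
    val removed = counter.remove("k").get                      // 2. Thread B removes it
    val logged = removed.sum()                                 // 3. Thread B logs the sum
    adderA.increment()                                         // 4. Thread A increments

    (logged, adderA.sum(), counter.contains("k"))
  }
}
```

replay() should return (1, 2, false). The log saw 1 and the adder actually counted 2, but the adder is no longer reachable from the map, so the second count is never logged.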

Doing the reset and the deletion at the same time seems to be the awkward part. So, for the time being, I considered resetting each counter in place when writing the log, without deleting anything:

for {
  key <- counter.keys
  value <- counter.get(key)
} {
  val sum = value.sumThenReset()
  logger.debug(s"$key: $sum")
}

That seemed like it should be enough, but the documentation says:

public long sumThenReset() Equivalent in effect to sum() followed by reset(). This method may apply for example during quiescent points between multithreaded computations. If there are updates concurrent with this method, the returned value is not guaranteed to be the final value occurring before the reset.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/LongAdder.html#sumThenReset--

So increments can still be lost this way. This seems unavoidable given how LongAdder works internally, since it spreads its count across several cells. So I looked at AtomicLong instead:

public final long getAndSet(long newValue) Atomically sets to the given value and returns the old value.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/atomic/AtomicLong.html#getAndSet-long-

This one is atomic. So I switched the counter's value type to AtomicLong:

import java.util.concurrent.atomic.AtomicLong

val counter = TrieMap[Key, AtomicLong]()
for {
  key <- counter.keys
  value <- counter.get(key)
} {
  val v = value.getAndSet(0)
  logger.debug(s"$key: $v")
}
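The AtomicLong version above can be packaged as a small class. This is a sketch that assumes the flush never removes keys. The class name ResettableCounter and the Map return type of flush are hypothetical choices, not part of the original code.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

// Hypothetical wrapper: per-key AtomicLong, flushed with getAndSet(0).
final class ResettableCounter[K] {
  private val counter = TrieMap[K, AtomicLong]()

  def increment(key: K): Unit = {
    counter.getOrElseUpdate(key, new AtomicLong()).incrementAndGet()
    ()
  }

  // Returns each key's count since the previous flush. Each swap with 0 is atomic,
  // so an increment is counted either in this flush or in the next one.
  def flush(): Map[K, Long] =
    (for {
      key <- counter.keys
      value <- counter.get(key)
    } yield key -> value.getAndSet(0)).toMap
}
```

Suppose increment("a") is called twice and increment("b") once. flush() should then return Map(a -> 2, b -> 1), and a second flush with no increments in between should return 0 for both keys. Keys whose count is 0 still stay in memory, though.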

Then, on a timer separate from the log output, it seems reasonable to delete entries whose count is 0. An implementation like this:

for {
  key <- counter.keys
  value <- counter.get(key)
} {
  val v = value.get()
  if (v == 0) counter.remove(key)
}

Of course, if an increment lands between get() and remove(key), yes, it vanishes into the dark! (Good work!) In other words, checking the value and deleting the entry must happen as a single atomic operation. Is there anything that convenient?
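This race can also be replayed on one thread by running the steps in the problematic order. This is a sketch; the key "k" and the object name are made up for the example.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

// Single-threaded replay of the check-then-remove race in the cleanup loop.
object CheckThenRemoveReplay {
  // Returns (key still in map?, count held by the AtomicLong that was removed)
  def replay(): (Boolean, Long) = {
    val counter = TrieMap[String, AtomicLong]("k" -> new AtomicLong(0))
    val value = counter("k")

    val v = value.get()                                              // cleanup timer: reads 0
    counter.getOrElseUpdate("k", new AtomicLong()).incrementAndGet() // request: count becomes 1
    if (v == 0) counter.remove("k")                                  // cleanup timer: deletes the entry

    (counter.contains("k"), value.get())
  }
}
```

replay() should return (false, 1). The entry is gone, even though the AtomicLong it held had counted 1 that was never logged.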

def remove(k: K, v: V): Boolean Removes the entry for the specified key if it's currently mapped to the specified value.

https://www.scala-lang.org/api/2.12.7/scala/collection/concurrent/TrieMap.html#remove(k:K,v:V):Boolean

There is, but it decides by equality. AtomicLong is a mutable reference type that does not override equals, so two instances holding the same count are still not equal:

scala> new AtomicLong(0) == new AtomicLong(0)
res0: Boolean = false

They are not equal. The same goes for ConcurrentHashMap:

public boolean remove(Object key, Object value) Removes the entry for a key only if currently mapped to a given value. This is equivalent to checking the current mapping and then removing it, except that the action is performed atomically.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#remove-java.lang.Object-java.lang.Object-

Therefore, to use this remove, the value type must be an immutable Long, which is compared by value, rather than an AtomicLong. The atomic increment itself must then rely on ConcurrentHashMap.compute or something similar.
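With Long values instead of AtomicLong, TrieMap's remove(k, v) does the check and the deletion in one call. This is a minimal sketch; the keys and the object name are made up for illustration.

```scala
import scala.collection.concurrent.TrieMap

// With immutable Long values, remove(k, v) compares by value, so
// "delete only if the count is still 0" is a single atomic call.
object ConditionalRemove {
  def demo(): (Boolean, Boolean, Map[String, Long]) = {
    val counts = TrieMap[String, Long]("idle" -> 0L, "busy" -> 3L)
    val removedIdle = counts.remove("idle", 0L) // value is 0, so the entry is removed
    val removedBusy = counts.remove("busy", 0L) // value is 3, so the entry is kept
    (removedIdle, removedBusy, counts.toMap)
  }
}
```

demo() should return (true, false, Map(busy -> 3)). The catch is that a Long cannot be incremented in place, so the increment needs its own atomic read-modify-write.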

public V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction) Attempts to compute a mapping for the specified key and its current mapped value (or null if there is no current mapping). The entire method invocation is performed atomically. Some attempted update operations on this map by other threads may be blocked while computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this Map.

https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/ConcurrentHashMap.html#computeIfAbsent-K-java.util.function.Function-
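Combining these gives a counter that keeps immutable boxed Long values in a ConcurrentHashMap. It increments through compute, resets through computeIfPresent, and evicts through remove(key, 0L). This is only a sketch of the general idea, not Guava's implementation. The class ComputeCounter and its method names are hypothetical.

```scala
import java.lang.{Long => JLong}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical counter: every read-modify-write runs inside an atomic
// ConcurrentHashMap method, so each one is atomic per key.
final class ComputeCounter[K] {
  private val map = new ConcurrentHashMap[K, JLong]()

  // compute passes null for an absent key, so the first increment installs 1.
  def increment(key: K): Unit = {
    map.compute(key, (_: K, old: JLong) =>
      if (old == null) JLong.valueOf(1L) else JLong.valueOf(old.longValue + 1L))
    ()
  }

  // Atomically swaps the count with 0 and returns the previous value (0 if absent).
  // The key is kept, so the map does not shrink and regrow on every flush.
  def getAndReset(key: K): Long = {
    var previous = 0L
    map.computeIfPresent(key, (_: K, old: JLong) => {
      previous = old.longValue
      JLong.valueOf(0L)
    })
    previous
  }

  // Removes the entry only if it still maps to 0 (Long.equals compares by value).
  def removeIfZero(key: K): Boolean = map.remove(key, JLong.valueOf(0L))

  def get(key: K): Long = {
    val v = map.get(key)
    if (v == null) 0L else v.longValue
  }

  def contains(key: K): Boolean = map.containsKey(key)
}
```

A log timer would call getAndReset for each key, and a separate cleanup timer would call removeIfZero. If an increment lands between the two calls, the value is no longer 0, so removeIfZero returns false and the count survives until the next flush.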

In fact, I only noticed this after racking my brain over it for a while, but Guava already has an implementation along exactly these lines:

public final class AtomicLongMap<K> implements Serializable {
  private final ConcurrentHashMap<K, Long> map;
  // ...
}

https://github.com/google/guava/blob/v27.0.1/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L59

It also seems to have used AtomicLong values in the past:

public final class AtomicLongMap<K> {
  private final ConcurrentHashMap<K, AtomicLong> map;
  // ...
}

https://github.com/google/guava/blob/v20.0/guava/src/com/google/common/util/concurrent/AtomicLongMap.java#L55

I suspect there is a story behind that switch.

In conclusion, it seems best to use Guava's AtomicLongMap or to implement something modeled on it. Concurrency is hard.
