[FLINK-38783][runtime] Improved TieredStorageResourceRegistry Thread-Safety/Concurrency#27459
[FLINK-38783][runtime] Improved TieredStorageResourceRegistry Thread-Safety/Concurrency#27459rionmonster wants to merge 5 commits intoapache:masterfrom
Conversation
…Test Cases to Verify Potential Issue
…rrency Tests to Avoid Redundancy
…uctures to Improve Concurrency Support
…gistry Concurrency Tests
|
CC: @TanYuxin-tyx for review (please feel free to tag someone else). You seemed to be the most appropriate given the history. |
7c8a412 to
7715029
Compare
…rceRegistry Concurrency Tests
7715029 to
448adde
Compare
|
@flinkbot run azure |
| private final Map<TieredStorageDataIdentifier, List<TieredStorageResource>> | ||
| registeredResources = new HashMap<>(); | ||
| private final ConcurrentHashMap< | ||
| TieredStorageDataIdentifier, CopyOnWriteArrayList<TieredStorageResource>> |
There was a problem hiding this comment.
I am curious, why do we need the CopyOnWriteArrayList, is the introduction of ConcurrentHashMap not enough to solve this?
There was a problem hiding this comment.
That's a good question!
I thought the same when I initially applied the fix (e.g., only swapped out the external map for its thread-safe brethren), however realized the tests that were added would still fail.
The ConcurrentHashMap handles the thread-safety for the map operations but not for the internal values within the map. This makes it possible to have multiple separate threads acting upon the non thread-safe list, which can lead to some inconsistency:
registeredResources
.computeIfAbsent(owner, (ignore) -> new ArrayList<>())
// Concurrent callers could be working with the same thread-safe map, but
// the underlying list is not thread-safe
.add(tieredStorageResource);
Without the extra thread-safety on the list, many of the existing tests can fail with ConcurrentModificationException, NullPointerException, and lost entries (which testConcurrentRegisterWithSameIdentifier specifically checks for). Making the swap to the CopyOnWriteArrayList (or some other thread-safe collection like Collections.synchronizedList()) makes the behavior consistent.
What is the purpose of the change
This pull request addresses the issue detailed in FLINK-38783 which detailed how the registration process within
TieredStorageResourceRegistrywas not properly handling concurrent operations and as such could throw aConcurrentModificationExceptionunder concurrent load.Brief change log
registeredResourcesgeneral-purpose hash map (Map<..., List<...>>) with corresponding thread-safe structures (ConcurrentHashMap<..., CopyOnWriteArrayList<...>>) to better handle concurrent operations.Verifying this change
This change added a series of tests in
TieredStorageResourceRegistryTestto originally reproduce the issue and later confirm the fix worked as expected including:testConcurrentRegisterResourceto test concurrent resource registration across separate threads (10 total with same owner/identifier)testConcurrentRegisterResourceWithDifferentOwnersto test concurrent registration across separate threads (10 total with different owners/identifiers)testConcurrentRegisterAndClearto test concurrent registration and clearing across separate threadsExample Tests
Does this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation