Skip to content

Commit

Permalink
Add core-java-concurrency to main pom (eugenp#2507)
Browse files Browse the repository at this point in the history
* Add core-java-concurrency

* Refactor codebase
  • Loading branch information
pivovarit authored Aug 27, 2017
1 parent 0b85b0a commit 515e4ca
Show file tree
Hide file tree
Showing 21 changed files with 74 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
public class SafeCounterWithLock {
private volatile int counter;

public int getValue() {
int getValue() {
return counter;
}

public synchronized void increment() {
synchronized void increment() {
counter++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
public class SafeCounterWithoutLock {
private final AtomicInteger counter = new AtomicInteger(0);

public int getValue() {
int getValue() {
return counter.get();
}

public void increment() {
void increment() {
while(true) {
int existingValue = getValue();
int newValue = existingValue + 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.baeldung.concurrent.atomic;

public class UnsafeCounter {
int counter;
private int counter;

public int getValue() {
int getValue() {
return counter;
}

public void increment() {
void increment() {
counter++;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
public class Context {
private final String userName;

public Context(String userName) {
Context(String userName) {
this.userName = userName;
}

@Override
public String toString() {
return "Context{" +
"userNameSecret='" + userName + '\'' +
'}';
"userNameSecret='" + userName + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import java.util.concurrent.ConcurrentHashMap;

public class SharedMapWithUserContext implements Runnable {
public final static Map<Integer, Context> userContextPerUserId = new ConcurrentHashMap<>();
final static Map<Integer, Context> userContextPerUserId = new ConcurrentHashMap<>();
private final Integer userId;
private UserRepository userRepository = new UserRepository();

public SharedMapWithUserContext(Integer userId) {
SharedMapWithUserContext(Integer userId) {
this.userId = userId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class ThreadLocalWithUserContext implements Runnable {
private final Integer userId;
private UserRepository userRepository = new UserRepository();

public ThreadLocalWithUserContext(Integer userId) {
ThreadLocalWithUserContext(Integer userId) {
this.userId = userId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


public class UserRepository {
public String getUserNameForUserId(Integer userId) {
String getUserNameForUserId(Integer userId) {
return UUID.randomUUID().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@

import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

public class CountingTask extends RecursiveTask<Integer> {

private final TreeNode node;

public CountingTask(TreeNode node) {
CountingTask(TreeNode node) {
this.node = node;
}

@Override
protected Integer compute() {
return node.value + node.children.stream().map(childNode -> new CountingTask(childNode).fork()).collect(Collectors.summingInt(ForkJoinTask::join));
return node.getValue() + node.getChildren().stream()
.map(childNode -> new CountingTask(childNode).fork())
.mapToInt(ForkJoinTask::join)
.sum();
}

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package com.baeldung.threadpool;

import java.util.Set;

import com.google.common.collect.Sets;

import java.util.Set;

public class TreeNode {

int value;
private int value;

Set<TreeNode> children;
private Set<TreeNode> children;

public TreeNode(int value, TreeNode... children) {
TreeNode(int value, TreeNode... children) {
this.value = value;
this.children = Sets.newHashSet(children);
}

public int getValue() {
return value;
}

public Set<TreeNode> getChildren() {
return children;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ public class Consumer implements Runnable {

private final TransferQueue<String> transferQueue;
private final String name;
private final int numberOfMessagesToConsume;
public final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
final int numberOfMessagesToConsume;
final AtomicInteger numberOfConsumedMessages = new AtomicInteger();

public Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ public class Producer implements Runnable {

private final TransferQueue<String> transferQueue;
private final String name;
private final Integer numberOfMessagesToProduce;
public final AtomicInteger numberOfProducedMessages = new AtomicInteger();
final Integer numberOfMessagesToProduce;
final AtomicInteger numberOfProducedMessages = new AtomicInteger();

public Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToProduce = numberOfMessagesToProduce;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() thro
// Given
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream.generate(() -> new Thread(new Worker(outputScraper, countDownLatch))).limit(5).collect(toList());
List<Thread> workers = Stream.generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

// When
workers.forEach(Thread::start);
countDownLatch.await(); // Block until workers finish
outputScraper.add("Latch released");

// Then
outputScraper.forEach(Object::toString);
assertThat(outputScraper).containsExactly("Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released");
}

Expand All @@ -35,7 +36,9 @@ public void whenFailingToParallelProcess_thenMainThreadShouldTimeout() throws In
// Given
List<String> outputScraper = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(5);
List<Thread> workers = Stream.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))).limit(5).collect(toList());
List<Thread> workers = Stream.generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
.limit(5)
.collect(toList());

// When
workers.forEach(Thread::start);
Expand Down Expand Up @@ -63,7 +66,6 @@ public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws
outputScraper.add("Workers complete");

// Then
outputScraper.forEach(Object::toString);
assertThat(outputScraper).containsExactly("Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public class SquareCalculatorIntegrationTest {

private static final Logger LOG = LoggerFactory.getLogger(SquareCalculatorIntegrationTest.class);


@Rule
public TestName name = new TestName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ private void executeWriterThreads(SynchronizedHashMapWithRWLock object, int thre

private void executeReaderThreads(SynchronizedHashMapWithRWLock object, int threadCount, ExecutorService service) {
for (int i = 0; i < threadCount; i++)
service.execute(() -> {
object.get("key" + threadCount);
});
service.execute(() -> object.get("key" + threadCount));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

public class ConcurrentMapNullKeyValueManualTest {

ConcurrentMap<String, Object> concurrentMap;
private ConcurrentMap<String, Object> concurrentMap;

@Before
public void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class ConcurrentNavigableMapManualTest {
public void givenSkipListMap_whenAccessInMultiThreads_thenOrderingStable() throws InterruptedException {
NavigableMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();

updateMapConcurrently(skipListMap, 4);
updateMapConcurrently(skipListMap);

Iterator<Integer> skipListIter = skipListMap.keySet().iterator();
int previous = skipListIter.next();
Expand All @@ -28,9 +28,9 @@ public void givenSkipListMap_whenAccessInMultiThreads_thenOrderingStable() throw
}
}

private void updateMapConcurrently(NavigableMap<Integer, String> navigableMap, int concurrencyLevel) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(concurrencyLevel);
for (int i = 0; i < concurrencyLevel; i++) {
private void updateMapConcurrently(NavigableMap<Integer, String> navigableMap) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
executorService.execute(() -> {
ThreadLocalRandom random = ThreadLocalRandom.current();
for (int j = 0; j < 10000; j++) {
Expand All @@ -45,26 +45,26 @@ private void updateMapConcurrently(NavigableMap<Integer, String> navigableMap, i
@Test
public void givenSkipListMap_whenNavConcurrently_thenCountCorrect() throws InterruptedException {
NavigableMap<Integer, Integer> skipListMap = new ConcurrentSkipListMap<>();
int count = countMapElementByPollingFirstEntry(skipListMap, 10000, 4);
int count = countMapElementByPollingFirstEntry(skipListMap);
assertEquals(10000 * 4, count);
}

@Test
public void givenTreeMap_whenNavConcurrently_thenCountError() throws InterruptedException {
NavigableMap<Integer, Integer> treeMap = new TreeMap<>();
int count = countMapElementByPollingFirstEntry(treeMap, 10000, 4);
int count = countMapElementByPollingFirstEntry(treeMap);
assertNotEquals(10000 * 4, count);
}

private int countMapElementByPollingFirstEntry(NavigableMap<Integer, Integer> navigableMap, int elementCount, int concurrencyLevel) throws InterruptedException {
for (int i = 0; i < elementCount * concurrencyLevel; i++) {
private int countMapElementByPollingFirstEntry(NavigableMap<Integer, Integer> navigableMap) throws InterruptedException {
for (int i = 0; i < 10000 * 4; i++) {
navigableMap.put(i, i);
}
AtomicInteger counter = new AtomicInteger(0);
ExecutorService executorService = Executors.newFixedThreadPool(concurrencyLevel);
for (int j = 0; j < concurrencyLevel; j++) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int j = 0; j < 4; j++) {
executorService.execute(() -> {
for (int i = 0; i < elementCount; i++) {
for (int i = 0; i < 10000; i++) {
if (navigableMap.pollFirstEntry() != null) {
counter.incrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.junit.Test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
Expand All @@ -28,9 +27,9 @@ public void givenIterating_whenUsingIteratorRemove_thenNoError() throws Interrup

List<Integer> integers = newArrayList(1, 2, 3);

for (Iterator<Integer> iterator = integers.iterator(); iterator.hasNext();) {
for (Iterator<Integer> iterator = integers.iterator(); iterator.hasNext(); ) {
Integer integer = iterator.next();
if(integer == 2) {
if (integer == 2) {
iterator.remove();
}
}
Expand All @@ -45,7 +44,7 @@ public void givenIterating_whenUsingRemovalList_thenNoError() throws Interrupted
List<Integer> toRemove = newArrayList();

for (Integer integer : integers) {
if(integer == 2) {
if (integer == 2) {
toRemove.add(integer);
}
}
Expand All @@ -69,10 +68,10 @@ public void whenUsingStream_thenRemoveElements() {
Collection<Integer> integers = newArrayList(1, 2, 3);

List<String> collected = integers
.stream()
.filter(i -> i != 2)
.map(Object::toString)
.collect(toList());
.stream()
.filter(i -> i != 2)
.map(Object::toString)
.collect(toList());

assertThat(collected).containsExactly("1", "3");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void creationSubmittingTasksShuttingDownNow_whenShutDownAfterAwating_then

assertTrue(threadPoolExecutor.isShutdown());
assertFalse(notExecutedTasks.isEmpty());
assertTrue(notExecutedTasks.size() > 0 && notExecutedTasks.size() < 98);
assertTrue(notExecutedTasks.size() < 98);
}

private List<Runnable> smartShutdown(ExecutorService executorService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed().collect(Collectors.toList());

ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(() -> aList.parallelStream().reduce(0L, Long::sum)).get();
long actualTotal = customThreadPool
.submit(() -> aList.parallelStream()
.reduce(0L, Long::sum))
.get();

assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<module>cdi</module>
<!-- <module>core-java-9</module> -->
<module>core-java</module>
<module>core-java-concurrency</module>
<module>couchbase-sdk</module>

<module>deltaspike</module>
Expand Down

0 comments on commit 515e4ca

Please sign in to comment.