티스토리 뷰

Java

Fork/Join Framework in Java

§무명소졸§ 2020. 6. 16. 12:37

Fork/Join Framework 는 효율적인 병렬처리를 위해 자바 7에서 소개된 새로운 기능이다. Fork/Join은 자바에서만 쓰이는 용어는 아니고 병렬 처리를 위한 공통된 모델이고 분할 정복 알고리즘을 통해서 재귀적으로 처리된다.
괜히 어려워지는 것 같은데 간단한 예제를 통해 Java에서 Fork/Join Framework 을 사용 하는법을 알아보자  

Fork/Join  나눴다/합쳐지다

Fork/Join 을 네이버에서 영어 뜻을 찾아보면 아래와 같다.

Fork 

Join


단어 뜻대로 보면 일을 나누고 합치는 것이다. 
더 풀어 쓰면 어떤 작업을 여러개의 쓰레드가 나누어 처리하게 하고 작업이 끝나면 작업들을 합치는 것이다.

만약 100개의 랜덤한 숫자가 있고 이를 합산하는 프로그램을 Fork/Join을 통해 처리 한다면 아래 그림과 같을 것이다.

RecursiveTask<T>, RecursiveAction

이제 위 작업을 실제 자바 코드로 구현해 보겠다. 먼저 자바에서 ForkJoin 처리를 위한 Task클래스를 만들려면 추상클래스인 RecursiveTask<T> 또는 RecursiveAction 을 상속 받아야 된다.  RecursiveTask<T> 와 RecursiveAction의 차이는 처리 작업의 리턴 값 유무이다. 숫자를 합산하는 작업은 리턴값이 필요하기 때문에 RecursiveTask<T>를 상속해서 클래스를 만들겠다.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.Collectors;

public class SumNumberTask extends RecursiveTask<Integer> {
    private final List<Integer> numbers;

    public SumNumberTask(List<Integer> numbers) {
        this.numbers = numbers;
    }

    @Override
    protected Integer compute() {
        if (numbers.size() <= 25) {
            return numbers.stream().mapToInt(Integer::intValue).sum();
        }
        List<SumNumberTask> subTasks = partitioningTask(this.numbers);

        return ForkJoinTask.invokeAll(subTasks)
                .stream()
                .mapToInt(ForkJoinTask::join)
                .sum();
    }

    private List<SumNumberTask> partitioningTask(List<Integer> numbers) {
        List<Integer> sharedFrist = numbers.subList(0, numbers.size() / 2);
        List<Integer> sharedSecond = numbers.subList(numbers.size() / 2, numbers.size());
        SumNumberTask firstTask = new SumNumberTask(sharedFrist);
        SumNumberTask secondTask = new SumNumberTask(sharedSecond);
        List<SumNumberTask> subtasks = new ArrayList<>();
        subtasks.add(firstTask);
        subtasks.add(secondTask);
        return subtasks;
    }
    private static List<Integer> makeRandomNumbers() {
        return new Random().ints(1000, 1, 100)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<Integer> numbers = makeRandomNumbers();
        int sum = numbers.stream().mapToInt(Integer::intValue).sum();
        System.out.println(sum + "<== single thread");

        SumNumberTask sumNumberTask = new SumNumberTask(numbers);
        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        ForkJoinTask<Integer> submit = forkJoinPool.submit(sumNumberTask);
        Integer integer = submit.get();
        System.out.println(integer + "<== multi thread");
    }
}

 

재귀처리

100 개의 숫자를 25개씩 나누어 4개의 쓰레드로 처리 해야되기 때문에 숫자가 25개가 넘지 않을 때까지 Task를 분할 한다. 

   @Override
    protected Integer compute() {
        //25개 이하면 실제 작업을 진행한다.
        if (numbers.size() <= 25) {
            return numbers.stream().mapToInt(Integer::intValue).sum();
        }
        
        //subtask 생성
        List<SumNumberTask> subTasks = partitioningTask(this.numbers);

	//task 재귀 호출
        return ForkJoinTask.invokeAll(subTasks)
                .stream()
                .mapToInt(ForkJoinTask::join)
                .sum();
    }

SubTask생성

숫자 리스트를 입력받아 절반으로 나누고 2개의 task를 생성 반환한다.

 private List<SumNumberTask> partitioningTask(List<Integer> numbers) {
        List<Integer> sharedFrist = numbers.subList(0, numbers.size() / 2);
        List<Integer> sharedSecond = numbers.subList(numbers.size() / 2, numbers.size());
        SumNumberTask firstTask = new SumNumberTask(sharedFrist);
        SumNumberTask secondTask = new SumNumberTask(sharedSecond);
        List<SumNumberTask> subtasks = new ArrayList<>();
        subtasks.add(firstTask);
        subtasks.add(secondTask);
        return subtasks;
    }

결과

메인 함수를 실행시켜 결과를 출력해보면 단일 쓰레드로 처리한 결과와 ForkJoinPool을 이용한 결과가 일치 하는걸 확인할 수 있다.

50821<== single thread
50821<== multi thread

RecursiveTaskRecursive

마치면

사실 위 예제와 같은 케이스의 작업은 굳이  병렬 처리를 하기 보다는 단일 쓰레드로 합산 처리하는게 더 빠르다. 작업을 분할하는 것 또한 리소스가 들어가기 때문이다. 그렇기 때문에 Fork/Join Framework 사용 여부에 대한 개발자의 적절한 선택이 필요하다.  Fork/Join Framework에 대해서 간단한 예제를 통해 사용하는 법을 알아봤다. Fork/Join Framework에 더 깊은 내용은 아래 링크를 참고 하면 된다.

  • https://www.baeldung.com/java-fork-join
  • http://tutorials.jenkov.com/java-util-concurrent/java-fork-and-join-forkjoinpool.html
  • https://www.pluralsight.com/guides/introduction-to-the-fork-join-framework

 

 

 

'Java' 카테고리의 다른 글

Java8 New features #2(default, static 메서드의 추가 )  (0) 2020.08.18
Java8 New features #1(Lambda Expressions)  (0) 2020.08.14
Random Integer List in Java8  (0) 2020.06.08
MessageFormat with Java  (0) 2020.05.26
Run vs Start in Java Thread  (0) 2020.05.22
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크