본문 바로가기

Develop/.NET 가이드

[C#] 병렬 프로그래밍 Parallel Programming (2) - 작업 병렬화

반응형

Parallel Programming
Parallel Programming

작업 병렬화 Task Parallelism

작업 병렬화는 일반적으로 생각하는 병렬 처리를 방식입니다. 작업 병렬화는 데이터 병렬화와 달리, 데이터마다 동일한 처리를 하는 방법이 아닌, 독립적인 작업들을 동시에 처리하는 방식입니다. 데이터 병렬화와 마찬가지로 작업 병렬화도 TPL 이 개발자를 대신하여 스레드를 관리합니다. 따라서 개발자는 병렬 프로그래밍을 Parallel.Invoke 메서드를 사용해 아래와 같이 손쉽게 구현할 수 있게 되었습니다.

Parallel.Invoke(() => DoSomeWork(), () => DoSomeOtherWork());

예제 코드는 Parallel.Invoke 메서드로 DoSomeWork() 과 DoSomeOtherWork()을 병렬로 처리하고 있습니다. 코드의 내부 동작을 추측할 때, 막연히 Parallel.Invoke 메서드에 전달된 Action 수만큼 스레드가 생성될 거라 추측하기 쉽습니다. 하지만 TPL 이 내부적으로 스레드를 관리해주는 역할을 하므로, TPL의 판단에 따라 스레드 수는 얼마든지 달라질 수 있습니다.

병렬 처리 제어하기

만약 병렬 작업을 제어하길 원한다면 명시적으로 Task 클래스를 활용하면 됩니다. Task 클래스로 생성된 작업은 스케줄러에 의해 같은 스레드 내에서 비동기로 처리되거나, 새로운 스레드에서 병렬 처리됩니다.

Task 클래스는 비동기 프로그래밍에도 활용됩니다.

아래 예제 코드는 Task.Factory.StartNew 메서드로 작업을 한번에 생성하여 처리합니다. 그리고 모든 작업이 완료되길 기다립니다. 마지막으로 작업이 처리될 때 스레드 ID 와 시간을 Console 로 출력합니다.

      Task[] taskArray = new Task[10];
      for (int i = 0; i < taskArray.Length; i++) {
         // Task.Factory.StartNew 메서드로 새로운 작업을 생성하고 시작한다.
         // 새로운 작업을 생성할 때 스레드 풀에 등록되고 스케줄러에 의해 처리가 된다.
         taskArray[i] = Task.Factory.StartNew( (Object obj ) => {
                                                  CustomData data = obj as CustomData;
                                                  if (data == null) 
                                                     return;

                                                  data.ThreadNum = Thread.CurrentThread.ManagedThreadId;
                                               },
                                               new CustomData() {Name = i, CreationTime = DateTime.Now.Ticks} );
      }

      // 모든 taskArray 배열 내 모든 작업이 종료되길 기다린다.
      Task.WaitAll(taskArray);     

      // taskArray 배열에 있는 작업 정보를 출력한다.
      foreach (var task in taskArray) {
         var data = task.AsyncState as CustomData;
         if (data != null)
            Console.WriteLine("Task #{0} created at {1}, ran on thread #{2}.",
                              data.Name, data.CreationTime, data.ThreadNum);
      } 

      // 결과:
      //       Task #0 created at 635116412924597583 on thread #3.
      //       Task #1 created at 635116412924607584 on thread #4.
      //       Task #3 created at 635116412924607584 on thread #4.
      //       Task #4 created at 635116412924607584 on thread #4.
      //       Task #2 created at 635116412924607584 on thread #3.
      //       Task #6 created at 635116412924607584 on thread #3.
      //       Task #5 created at 635116412924607584 on thread #4.
      //       Task #8 created at 635116412924607584 on thread #4.
      //       Task #7 created at 635116412924607584 on thread #3.
      //       Task #9 created at 635116412924607584 on thread #4.

결과를 자세히 살펴보면 모든 작업이 거의 동일한 시간에 시작되었고, 2개의 스레드에 나뉘어 병렬 처리되었습니다.

연속하여 작업 처리하기

만약 작업을 연속하여 처리해야 한다면 Task.ContinueWith 메서드로 이전 작업 결과를 활용하는 새로운 작업을 기존 작업에 연결하며 예약할 수 있습니다. 또한 기존 작업의 결과를 넘겨받아 연속 작업에서 처리할 수도 있습니다.

        var getData = Task.Factory.StartNew(() => { 
                                             Random rnd = new Random(); 
                                             int[] values = new int[100];
                                             for (int ctr = 0; ctr <= values.GetUpperBound(0); ctr++)
                                                values[ctr] = rnd.Next();

                                             return values;
                                          } );  
          var processData = getData.ContinueWith((x) => {
                                                int n = x.Result.Length;
                                                long sum = 0;
                                                double mean;

                                                for (int ctr = 0; ctr <= x.Result.GetUpperBound(0); ctr++)
                                                   sum += x.Result[ctr];

                                                mean = sum / (double) n;
                                                return Tuple.Create(n, sum, mean);
                                             } ); 
          var displayData = processData.ContinueWith((x) => {
                                                    return String.Format("N={0:N0}, Total = {1:N0}, Mean = {2:N2}",
                                                                         x.Result.Item1, x.Result.Item2, 
                                                                         x.Result.Item3);
                                                 } );

작업을 조건에 따라 연속 처리 여부를 결정하려면 TaskContinuationOptions 열거형을 Task.ContinueWith 메서드의 매개변수로 전달하면 됩니다. TaskContinuationOptions 열거형의 예시는 아래와 같습니다.

  • NotOnFaulted 선행 작업이 예외를 발생시킨 경우 연속 작업이 예약되지 않도록 합니다.
  • OnlyOnFaulted 선행 작업이 예외를 발생시킨 경우에만 연속 작업이 예약되도록 합니다.

작업 완료 기다리기

만약 작업 처리를 모두 완료시켜야 다음 작업을 진행할 수 있다면, Task.Wait 메서드로 작업을 기다리면 됩니다.

Task[] tasks = new Task[3]
{
    Task.Factory.StartNew(() => MethodA()),
    Task.Factory.StartNew(() => MethodB()),
    Task.Factory.StartNew(() => MethodC())
};

//모든 작업이 완료되길 대기
Task.WaitAll(tasks);

작업 중첩 시 주의할 점

작업 내에 작업을 생성하여 처리하면 작업을 중첩하여 처리하게 됩니다. 일반적인 순차 동기 코드에선 외부 코드는 내부 코드가 완료되어야 완료될 수 있습니다. 하지만 비동기 병렬 작업에선 외부 작업이 내부 작업을 기다리지 않고 종료됩니다. 이러한 특징은 연속 작업에도 영향을 미칩니다. 연속 작업은 내부 작업을 기다리지 않고 외부 작업이 완료되면 바로 시작하게 됩니다.

// 외부 작업 시작
var outer = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Outer task beginning.");

    // 내부 작업 시작
    var inner = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(5000000);
        Console.WriteLine("Inner task completed.");
    });
});

// 외부 작업이 완료되면서 내부 작업도 완료될거라 생각하지만,
// 내부 작업 완료 여부와 상관없이 코드는 진행된다.
outer.Wait();
Console.WriteLine("Outer task completed.");

// 결과:
//    Outer task beginning.
//    Outer task completed.
//    Inner task completed.

만약 외부 작업과 함께 내부 작업의 완료를 기다리고 싶다면, TaskCreationOptions.AttachedToParent를 사용하면 됩니다. 이때 연속 작업도 또한 내부 작업 완료를 기다리게 됩니다.

var parent = Task.Factory.StartNew(() =>
{
    var child = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(5000000);
    }, 
    // 부모 작업에 연결
    TaskCreationOptions.AttachedToParent);

    // 자식 작업도 완료되길 기다린다.
    parent.Wait();
});

여러 작업을 묶어서 구성하기

Task.WhenAll

여러 작업을 묶어서 하나의 작업으로 만들고 묶인 작업들이 다 끝나야 완료됩니다. 여러 작업이 끝나야 진행할 수 있는 코드 흐름에 유용합니다.

      foreach (var value in works) {
         tasks.Add(Task.Run( () => { DoSomeWork(value) }));
      }

      // 모든 작업이 마무리되어야 다음 코드로 진행할 수 있다.
      Task t = Task.WhenAll(tasks);

      try {
         t.Wait();
      }
      catch {}   

Task.WhenAny

여러 작업을 묶어서 하나의 작업으로 만들고 묶인 작업들 중 하나라도 완료되면 끝납니다. 여러 작업 중 하나만 완료되면 되는 작업에 유용합니다.

      foreach (var value in works) {
         tasks.Add(Task.Run( () => { DoSomeWork(value) }));
      }

      // 하나의 작업만 완료되어도 다음 코드로 진행할 수 있다.
      Task t = Task.WhenAny(tasks);

      try {
         t.Wait();
      }
      catch {}   

Task.WhenAny / Task.WhenAll 과 Task.WaitAny / Task.WaitAll 의 차이는 스레드의 코드 실행을 막는지 여부입니다. When 메서드는 Task 객체를 반환하기 때문에 await 키워드로 비동기 흐름을 만들 수 있지만, Wait 메서드는 작업이 완료되길 기다려야 합니다.

작업에서 발생한 예외 처리하기

작업 내에서 예외가 발생하면 작업을 호출한 스레드로 예외가 전달됩니다. 예외는 AggregateException 로 래핑되어 전달됩니다. AggregateException 의 InnerExceptions 속성으로 실제 발생한 예외를 확인할 수 있습니다.

아래 예제로 예외가 전파되는 양상을 알 수 있습니다.

      var task1 = Task.Run( () => { throw new CustomException("This exception is expected!"); } );

      try
      {
          task1.Wait();
      }
      catch (AggregateException ae)
      {
          foreach (var e in ae.InnerExceptions) {
              // 작업으로부터 전파된 예외 처리
              if (e is CustomException) {
                  Console.WriteLine(e.Message);
              }
              // 처리되지 않은 예외는 다시 던진다.
              else {
                  throw;
              }
          }
      }

만약 중첩된 작업에서 발생한 예외로 인해 AggregateException 도 중첩되면, 예외를 처리하는 코드 구간에서 전판된 AggregateException 객체의 Flatten 메서드를 호출하면 중첩을 없앨 수 있습니다.

중첩된 작업 구조에서 자식 작업이 부모 작업과 분리되어 생성되었다면 예외가 누락될 수 있으니 주의바랍니다. 자식 작업은 부모 작업과 연결하지 않으면 분리되어 생성됩니다.

작업 취소하기

병렬 프로그래밍이든 비동기 프로그래밍이든 Task 클래스를 기반으로 작업을 관리합니다. 따라서 작업을 취소하는 방법도 동일합니다. .NET 에서 작업 취소는 취소 토큰 을 통해 이루어집니다. 취소 토큰은 CancellationTokenSource 클래스로 생성할 수 있습니다. 아래 순서로 작업 취소를 위한 코드를 작성하시면 됩니다.

  1. CancellationTokenSource 개체를 생성합니다.
  2. 작업 취소에 영향을 받는 모든 작업에 CancellationTokenSource.Token 속성을 전달합니다.
  3. 각 작업 별로 취소에 응답하는 코드를 작성합니다.
  4. CancellationTokenSource.Cancel 메서드를 호출해 작업 취소를 시작합니다.

CancellationTokenSource 클래스는 개발자가 직접 Dispose 하여 자원을 해제해야 합니다.

각 작업 별로 취소에 응답하려면 CancellationTokenSource 개체로부터 전달된 CancellationToken.IsCancellationRequested 속성 값을 확인해야 합니다. 만약 취소가 어디서 부터 시작되었는지 파악하길 원한다면 OperationCanceledException 예외를 던지면 됩니다. 자세한 내용은 공식 문서를 확인해주시길 바랍니다. 아래 예제 코드는 간단한 작업 취소 코드입니다.

        var tokenSource = new CancellationTokenSource();
        CancellationToken ct = tokenSource.Token;

        var task = Task.Run(() =>
        {
            bool moreToDo = true;
            while (moreToDo)
            {
                if (ct.IsCancellationRequested)
                {
                    ct.ThrowIfCancellationRequested();
                }
            }
        }, tokenSource.Token);

        tokenSource.Cancel();

        try
        {
            await task;
        }
        catch (OperationCanceledException e)
        {

        }
        finally
        {
            tokenSource2.Dispose();
        }

예제 : 병렬로 이진 트리 탐색

public class TreeWalk
{
    static void Main()
    {
        Tree<MyClass> tree = new Tree<MyClass>();

        // ...populate tree (left as an exercise)

        // Define the Action to perform on each node.
        Action<MyClass> myAction = x => Console.WriteLine("{0} : {1}", x.Name, x.Number);

        // Traverse the tree with parallel tasks.
        DoTree(tree, myAction);
    }

    public class MyClass
    {
        public string Name { get; set; }
        public int Number { get; set; }
    }
    public class Tree<T>
    {
        public Tree<T> Left;
        public Tree<T> Right;
        public T Data;
    }

    // By using tasks explcitly.
    public static void DoTree<T>(Tree<T> tree, Action<T> action)
    {
        if (tree == null) return;
        var left = Task.Factory.StartNew(() => DoTree(tree.Left, action));
        var right = Task.Factory.StartNew(() => DoTree(tree.Right, action));
        action(tree.Data);

        try
        {
            Task.WaitAll(left, right);
        }
        catch (AggregateException )
        {
            //handle exceptions here
        }
    }

    // By using Parallel.Invoke
    public static void DoTree2<T>(Tree<T> tree, Action<T> action)
    {
        if (tree == null) return;
        Parallel.Invoke(
            () => DoTree2(tree.Left, action),
            () => DoTree2(tree.Right, action),
            () => action(tree.Data)
        );
    }
}
반응형