본문 바로가기

Develop/.NET 가이드

[C#] 병렬 프로그래밍 Parallel Programming (4) - 데이터 흐름 예제

반응형

parallel programming
parallel programming

데이터 흐름 예제

기초 예제

아래 예제는 비동기로 BufferBlock<T> 데이터 흐름 블록에 데이터를 입력했다가 출력하는 예제입니다.


var bufferBlock = new BufferBlock<int>();

// bufferBlock 에 비동기로 데이터 넣기
for (int i = 0; i < 3; i++)
{
   await bufferBlock.SendAsync(i);
}

// bufferBlock 에 비동기로 데이터 빼기
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(await bufferBlock.ReceiveAsync());
}

/* 결과:
   0
   1
   2
 */

아래 예제는 병렬로 데이터 흐름 블록에 데이터를 입력했다가 출력하는 예제입니다.


var bufferBlock = new BufferBlock<int>();

// post01, receive, post2 작업 정의
var post01 = Task.Run(() =>
{
    bufferBlock.Post(0);
    bufferBlock.Post(1);
});

var receive = Task.Run(() => 
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine(bufferBlock.Receive());
    }
});

var post2 = Task.Run(() => 
{
    bufferBlock.Post(2);
});

// post01, receive, post2 작업을 동시 실행
Task.WaitAll(post01, receive, post2);

/* 가능한 출력:
    2
    0
    1
*/

간단한 데이터 흐름 파이프라인 예제

웹 사이트에서 영문 텍스트를 다운로드 받아, 역순으로 배열하여 만든 단어가 기존 텍스트에 존재하는 단어만 검색하여 출력하는 콘솔 프로그램을 데이터 흐름으로 작성해봅시다.

데이터 흐름 블록 만들기

먼저, 파이프라인에 참여할 데이터 흐름 블록을 구현합니다.

// 웹 사이트에서 영문 텍스트를 다운로드하는 데이터 흐름 블록
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient().GetStringAsync(uri);
});
// 텍스트를 단어 별로 나누는 데이터 흐름 블록
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters 
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// 너무 짧거나 중복되는 단어를 제거하는 데이터 흐름 블록
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});
// 역순으로 배열한 단어가 기존 텍스트에 존재하는지 검색하는 데이터흐름 블록
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

위 데이터 흐름 블록이 다른 데이터흐름 블록과 달리, TransformMayBlock 클래스를 사용한 이유는 string[] 배열 하나를 입력으로 받아, 여러 string 출력을 만들어 내기 때문입니다. 즉 입력 하나에 출력 다수를 만드는 데이터 흐름 블록이 필요하기 때문입니다.

// 결과를 출력하는 데이터흐름 블록   
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});

위 데이터 흐름 블록이 다른 데이터 흐름 블록과 달리, ActionBlock<TInput> 클래스를 사용한 이유는 데이터 출력이 필요하지 않기 때문입니다.

파이프라인 만들기

// 데이터 흐름 블록을 연결하여 파이프라인 구성

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);

파이프라인 머리에 데이터 입력하기

// 파이프라인의 시작인 downloadString 데이터 흐름 블록에 데이터 전달
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

파이프라인 닫기

// 파이프라인의 시작인 downloadString 데이터 흐름 블록에 데이터 전달 차단
downloadString.Complete();

파이프라인이 완료되길 기다리기

// 파이프라인의 끝인 printReversedWords 데이터 흐름 블록 완료 대기
printReversedWords.Completion.Wait();
반응형