데이터 흐름
데이터 흐름은 프로그래밍 패러다임이 다른 병렬 프로그래밍입니다. 일반적으로 배우는 프로그래밍 언어는 코드로 절차를 정의하고 실행해 결과를 만듭니다. 절차는 메서드로 구현되고 메서드의 실행 순서에 따라 프로그램이 동작합니다.
반면, 데이터 흐름은 절차가 아닌 데이터가 중심입니다.
데이터 중심이란 데이터로 프로그램의 동작을 정의하게 되는걸 의미합니다. 데이터 흐름에서 프로그램은 필요한 데이터를 준비시키고, 데이터가 준비되어야 다음 연산을 실행합니다. 다시말해 프로그램은 데이터가 준비되어야만 진행됩니다. 예를 들어 여러 이미지를 합성해 하나의 이미지로 만드는 프로그램은 소스가 되는 이미지들이 준비되었는지를 먼저 확인합니다. 소스 이미지들이 준비되었으면 프로그램은 이미지를 합성하는 연산을 진행합니다. 마지막으로 연산 결과인 합성된 이미지를 소스 데이터로 필요로 하는 다음 연산에 알려 프로그램이 진행되도록 합니다.
필요한 데이터만 준비되면 작업을 진행할 수 있어, 각 스레드에 필요한 데이터만 준비되면 동시에 실행할 수 있습니다. 따라서 데이터 흐름은 기존 프로그래밍 방식보다 병렬 프로그래밍을 손쉽게 구현할 수 있습니다.
.NET에서 데이터 흐름
이제 데이터 흐름을 .NET 언어에서 사용하는 방법을 알려드리겠습니다. 먼저 .NET 언어는 TPL 데이터 흐름 라이브러리인 System.Threading.Tasks.Dataflow NuGet 패키지를 설치하셔야 합니다. Visual Studio 에서 'NuGet 패키지 관리'에서 설치하시거나, 아래 명령어로 설치하시면 됩니다.
dotnet add package System.Threading.Tasks.Dataflow
TPL (Task Parallel Library) 에서 데이터 흐름 블록 (Dataflow bolock) 을 개발자에게 제공하는데, 이름 그대로 데이터의 흐름을 정의하는데 사용되는 객체입니다. 데이터 흐름 블록 을 이해하고 활용할 줄 알아야 데이터 흐름을 구현할 수 있습니다. 여러 데이터 흐름 블록들을 연결하여 데이터 흐름 네트워크를 구성하는게 데이터 흐름 구현의 핵심입니다. 데이터 흐름 블록은 역할에 따라 아래와 같이 종류가 나눠지게 됩니다.
데이터 흐름 블록 종류
- Source 블록 : 소스 데이터 역할을 하는 블록. 데이터를 읽는데 활용된다.
- Target 블록 결과 데이터를 저장하는 블록. 데이터를 쓰는데 활용된다.
- Propagator 블록 데이터를 전파하는 블록. 데이터를 읽거나 쓰는데 활용된다.
데이터 흐름 블록의 연결
ISourceBlock<TOutput>.LinkTo 메서드로 Source 블록을 Target 블록에 연결하면 데이터 흐름을 만들 수 있습니다. 그리고 소스 데이터 값에 따라 데이터를 전달할지 말지 결정하는 필터링도 가능합니다. ISourceBlock<TOutput>.LinkTo 메서드에 대리자 메서드를 제공하면 소스 데이터를 연결된 Target 블록에 전달할지 말지를 결정할 수 있습니다. 이렇게 연결된 데이터 흐름 블록은 서로 메시지를 전달하며 작동하게 됩니다.
메시지 전달
데이터 흐름 블록이 연결되었다면 이제 데이터 흐름 블록 간에 데이터를 어떻게 주고 받는지를 설명드리도록 하겠습니다.
먼저 데이터 흐름 네트워크의 맨 처음 블록에 초기 데이터를 전달할 때는 Post 와 SendAsync 메서드를 사용합니다. 그리고 데이터 흐름 네트워크의 맨 마지막 블록에서 결과 데이터를 받을 때는 Receive 와 ReceiveAsync 메서드를 사용합니다.
데이터 흐름 블록 간에 메시지 전달은 ITargetBlock<TInput>.OfferMessage 메서드를 통해 이루어집니다.
ITargetBlock<TInput> 는 OfferMessage 메서드를 통해 메시지 제안을 받으면 DataflowMessageStatus를 반환합니다. DataflowMessageStatus 는 열거형으로 'Accepted', 'Declined', 'Postponed', 'NotAvailable', 'DecliningPermanently' 이 있습니다. 각 반환값 이름은 ITargetBlock<TInput> 가 메시지를 어떻게 처리했는지를 의미하며, 'DecliningPermanently' 이 반환되었을 경우 자동으로 데이트 흐름 간 연결이 끊어지게 됩니다.
데이터 흐름 완료
비동기 처리와 병렬 처리를 위해 데이터 흐름 블록 내부에 Task 객체가 있습니다. 데이터 흐름 블록의 Completion 속성을 통해 Task 객체에 접근 가능합니다. 즉 Completion 속성으로 데이터 흐름 블록 완료 여부를 확인하거나, 연속 작업을 만들거나, 작업 대기가 가능합니다. 자세한 내용은 [C#] 병렬 프로그래밍 Parallel Programming (2) - 작업 병렬화 글에 설명되어 있으니, 읽어보시길 바랍니다.
TPL 에서 제공하는 유용한 데이터 흐름 블록
병렬 프로그래밍 개발자가 데이터 흐름을 더욱 원활히 구현할 수 있도록 TPL 이 여러 데이터 흐름 블록을 제공하고 있습니다. '이런 것도 있구나' 하고 살펴보시길 바랍니다.
버퍼링 블록
메시지를 입력하고 출력하는 큐. 여러 메시지를 전달할 때 유용하게 사용할 수 있습니다.
var bufferBlock = new BufferBlock<int>();
// bufferBlock에 여러 메시지 전달
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// bufferBlock 메시지 처리
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* 결과:
0
1
2
*/
메시지를 읽은 후에도 제거되지 않습니다. 동일한 값을 계속 사용할 수 있습니다.
var broadcastBlock = new BroadcastBlock<double>(null);
broadcastBlock.Post(Math.PI);
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
/* 결과:
3.14159265358979
3.14159265358979
3.14159265358979
*/
메시지를 한번만 전달받을 수 있습니다. BroadcastBlock<T> 처럼 메시지가 제거되지 않아, 여러번 사용할 수 있습니다.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// 여러 메시지를 병렬로 전달.
// 제일 먼저 전달된 메시지만 기록되고 나머지 메시지는 버려진다.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
Console.WriteLine(writeOnceBlock.Receive());
/* 예제 결과:
Message 2
*/
실행 블록
메세지가 전달될 때마다 대리자 메서드를 실행하는 블록입니다. 즉 비동기적으로 소스 데이터가 준비되는대로 실행되는 데이터 흐름 블록이라 생각하시면 됩니다.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));
for (int i = 0; i < 3; i++)
{
actionBlock.Post(i * 10);
}
// actionBlock 이 현재 작업만 마무리하도록 설정
actionBlock.Complete();
// 나머지 작업이 마무리되도록 대기
actionBlock.Completion.Wait();
/* 결과:
0
10
20
*/
작은 팁을 드리자면 Func<TInput, Task> 를 대리자 메서드 대신 전달하면 내부 동작도 비동기로 처리 가능합니다.
ActionBlock<TInput> 와 마찬가지로 전달된 대리자 메서드를 실행합니다. 다른 점은 Source 와 Target 역할을 모두 수행하기 때문에 Receive 메서드로 결과를 받을 수 있습니다.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
/* 결과:
3.16227766016838
4.47213595499958
5.47722557505166
*/
TransformBlock<TInput, TOutput> 마찬가지로 동작하지만, 결과를 시퀀스로 반환할 수 있습니다.
var transformManyBlock = new TransformManyBlock<string, char>(s => s.ToCharArray());
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
/* 결과:
H
e
l
l
o
W
o
r
l
d
*/
수행 블록의 내부 처리를 동기로 진행할 때와 비동기로 진행할 때 사용해야 하는 대리자 메서드를 아래 정리해 놓았으니 참고바랍니다.
Type | Synchronous Delegate | Asynchronous Delegate |
ActionBlock<TInput> | Action | Func<TInput, Task> |
TransformBlock<TInput, TOutput> | Func<TInput, TOutput> | Func<TInput, Task<Toutput> |
TransformManyBlock<TInput, TOutput> | Func<TInput, IEnumberable<TOutput>> | Func<TInput, Task<IEnumberable<TOutput>>> |
그룹 블록
// int 값을 10개 묶음으로 반환하는 BatchBlock 생성
var batchBlock = new BatchBlock<int>(10);
// 0 부터 12 까지 batchBlock 에 메시지로 전달
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
batchBlock.Complete();
// 0 부터 12 까지 총 13 개
// BatchBlock 은 10개 묶음으로 인식
// 따라서, 10개를 batch 1으로, 나머지 3개인 batch 2로 묶는다.
Console.WriteLine("The sum of the elements in batch 1 is {0}.",
batchBlock.Receive().Sum());
Console.WriteLine("The sum of the elements in batch 2 is {0}.",
batchBlock.Receive().Sum());
/* 결과:
// 0 + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9
The sum of the elements in batch 1 is 45.
// 10 + 11 + 12
The sum of the elements in batch 2 is 33.
*/
BatchBlock<T> 는 데이터베이스에 자료를 입력할 때 사용하면 유용합니다.
소스 데이터를 받아서 Tuple 타입으로 묶어서 반환합니다.
var joinBlock = new JoinBlock<int, int, char>();
joinBlock.Target1.Post(3);
joinBlock.Target2.Post(5);
joinBlock.Target3.Post('+');
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('-');
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine("{0} + {1} = {2}",
data.Item1, data.Item2, data.Item1 + data.Item2);
break;
case '-':
Console.WriteLine("{0} - {1} = {2}",
data.Item1, data.Item2, data.Item1 - data.Item2);
break;
default:
Console.WriteLine("Unknown operator '{0}'.", data.Item3);
break;
}
}
/* 결과:
3 + 5 = 8
6 - 4 = 2
*/
BatchBlock<T> 과 JoinBlock<T1,T2> 의 특징을 합한 데이터 흐름 블록입니다. 여러 데이터를 정해진 수만큼 그룹으로 만들고, 묶은 데이터를 Tuple로 연결합니다.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
batchedJoinBlock.Target2.Post(e);
}
}
var results = batchedJoinBlock.Receive();
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
/* 결과:
5
6
13
55
0
Specified argument was out of the range of valid values.
Specified argument was out of the range of valid values.
*/
데이터 흐름 블록을 실제로 활용하는 예제는 [C#] 병렬 프로그래밍 Parallel Programming (4) - 데이터 흐름 예제 글에서 설명드리도록 하겠습니다.
'Develop > .NET 가이드' 카테고리의 다른 글
[C#] 리플렉션 Reflection (1) : 왜 알아야 할까? (0) | 2020.06.18 |
---|---|
[C#] 병렬 프로그래밍 Parallel Programming (4) - 데이터 흐름 예제 (0) | 2020.02.16 |
[C#] 병렬 프로그래밍 Parallel Programming (2) - 작업 병렬화 (2) | 2020.02.08 |
[C#] 병렬 프로그래밍 Parallel Programming (1) - 데이터 병렬화 (0) | 2020.02.05 |
[C#] LINQ 사용방법 - 조인 작업 Join, GroupJoin (0) | 2020.02.02 |
꾸준히 노력하는 개발자 "김예건" 입니다.