Worker Thread 패턴

패턴 명칭

Worker Thread

필요한 상황

어떤 데이터가 생성되면, 이 데이터를 또 다른 여러 개의 스레드에서 처리한다. 물론 데이터의 생성 역시 또 다른 여러 개의 스레드에서 처리한다. 데이터의 처리를 동시에 처리하면서 처리하는 방식은 한가지로 정해진 것이 아닌 다양한 방식으로 수행되며, 간단이 추가될 수 있어야 한다. 이때 사용할 수 있는 패턴이다. 사실, 데이터의 다양한 방식의 처리는 Worker Thread의 응용이다.

예제 코드

Client 클래스는 처리할 데이터를 생성한다. 이렇게 생성된 데이터는 Request라는 클래스에 담기게 되는데 이 Request는 추상 클래스이며, 이 클래스를 상속받아 데이터에 대한 처리 방식을 정의할 수 있다. Client가 생성한 Request는 바로 처리되는게 아니고 Channel 클래스에 저장된다. 이렇게 저장된 데이터에 대한 Request는 Worker 클래스에의해 스레드로 처리된다. 이 Worker 클래스는 WorkerPool이라는 스레드 저장소에 미리 생성되어 관리된다. 이러한 클래스들을 사용하는 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Channel channel = new Channel(10);
		WorkerPool workers = new WorkerPool(5, channel);
		workers.start();
		
		new Client("ClientA", channel).start();
		new Client("ClientB", channel).start();
		new Client("ClientC", channel).start();
	}
}

Channel에 최대로 저장할 수 있는 Request의 개수는 10개, 데이터를 처리하는 Worker의 개수는 5개로 정했으며, 데이터를 생성하는 Client 스레드의 개수는 3개이다. Client 클래스의 코드는 다음과 같다.

package tstThread;

import java.util.Random;

public class Client extends Thread {
	private final Channel channel;
	private static final Random random = new Random();
	
	public Client(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			for(int i = 0; true; i++) {
				Request request;

				Thread.sleep(random.nextInt(1000));
				if(random.nextInt(2) == 0) { 
					request = new OneRequest(getName(), i);
				} else {
					request = new TwoRequest(getName(), i);
				}
				
				channel.putRequest(request);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

처리해야할 데이터는 정수값이며, 이 정수값의 데이터에 대한 처리는 무작위로 결정되는데, 실제 처리에 대한 코드는 OneRequest와 TwoRequest 클래스에 정의되어 있다. Request 추상 클래스에 대한 코드는 다음과 같다.

package tstThread;

public abstract class Request {
	protected final String clentName;
	protected final int number;
	
	public Request(String clentName, int number) {
		this.clentName = clentName;
		this.number = number;
	}
	
	public abstract void execute();
}

이 추상클래스를 구현하는 OneRequest 클래스는 다음과 같다.

package tstThread;

public class OneRequest extends Request {

	public OneRequest(String name, int number) {
		super(name, number);
	}

	public void execute() {
		String result = "ECHO: " + number + "@" + clentName;
		System.out.println(Thread.currentThread().getName() + " -> " + result);
	}
}

또 다른 처리 방식인 TwoRequest 클래스는 다음과 같다.

package tstThread;

public class TwoRequest extends Request {

	public TwoRequest(String name, int number) {
		super(name, number);
	}

	public void execute() {
		String result = "POWER: " + (number*number) + "@" + clentName;
		System.out.println(Thread.currentThread().getName() + " -> " + result);
	}
}

이 Request 클래스에 대한 객체는 Client가 생성하여 Channel에 저장되는데, Channel 클래스의 코드는 다음과 같다.

package tstThread;

import java.util.LinkedList;

public class Channel {
	private final int maxCountRequests;
	private final LinkedList<Request> requestQueue = new LinkedList<Request>();
	
	public Channel(int maxCountRequests) {
		this.maxCountRequests = maxCountRequests;
	}
	
	public synchronized void putRequest(Request request) {
		while(requestQueue.size() >= maxCountRequests) {
			try {
				wait();
			} catch(InterruptedException e) {
				//.
			}
		}
		
		requestQueue.addLast(request);
		notifyAll();
	}
	
	public synchronized Request takeRequest() {
		while(requestQueue.size() <= 0) {
			try {
				wait();
			} catch(InterruptedException e) {
				//.
			}
		}
		
		Request request = requestQueue.pollFirst();
		notifyAll();

		return request;
	}
}

데이터를 처리하는 Worker 클래스는 다음과 같다.

package tstThread;

public class Worker extends Thread {
	private final Channel channel;
	
	public Worker(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		while(true) {
			Request request = channel.takeRequest();
			request.execute();
		}
	}
}

Worker 클래스의 객체는 WorkerPool이라는 클래스를 통해 생성되어 관리되며 코드는 다음과 같다.

package tstThread;

public class WorkerPool {
	private final Worker[] threadPool;
	
	public WorkerPool(int countThreads, Channel channel) {
		threadPool = new Worker[countThreads];
		for(int i=0; i<threadPool.length; i++) {
			threadPool[i] = new Worker("Worker-" + i, channel);
		}
	}
	
	public void start() {
		for(int i=0; i<threadPool.length; i++) {
			threadPool[i].start();
		}
	}
}

실행 결과는 다음과 같다.

Worker-0 -> ECHO: 0@ClientA
Worker-4 -> ECHO: 0@ClientB
Worker-3 -> POWER: 0@ClientC
Worker-2 -> POWER: 1@ClientB
Worker-3 -> POWER: 4@ClientB
Worker-2 -> POWER: 1@ClientC

.
.
.

orker-0 -> POWER: 625@ClientC
Worker-2 -> POWER: 784@ClientB
Worker-0 -> POWER: 841@ClientB
Worker-2 -> POWER: 784@ClientA
Worker-0 -> POWER: 841@ClientA
Worker-2 -> ECHO: 30@ClientB
Worker-0 -> POWER: 676@ClientC
Worker-2 -> ECHO: 30@ClientA
Worker-3 -> POWER: 961@ClientA
Worker-2 -> ECHO: 32@ClientA
Worker-3 -> POWER: 961@ClientB

Producer-Consumer 패턴

패턴 명칭

Producer-Consumer

필요한 상황

어떤 데이터가 생성되면, 그 생성된 데이터를 받아 처리하는 상황에서 멀티 스레드를 활용하여 처리량과 속도를 늘리고자할 적용할 수 있는 패턴이다. 데이터를 생성하는 스레드와 이를 처리하는 스레드를 분리할 수 있으며 각각은 1개 이상의 스레드로 구성될 수 있다. 데이터를 생성하는 스레드를 Producer, 데이터를 처리하는 스레드를 Consumer이다. Producer는 처리하고자 하는 데이터를 생성해 저장소에 저장해 두면 Consumer는 이 저장소에서 데이터를 받아 처리한다. 여기서 저장소는 Channel이라고 한다. Channel은 데이터를 저장할 수 있는 물리적 한계가 있으며, Producer는 이 물리적 한계 내에서 데이터를 저장하고 저장할 수 있는 공간이 없을 경우 대기한다. Consumer는 Channel에 저장된 데이터를 가져와 처리하는데, 만약 Channel에 데이터가 없을 경우 대기한다.

예제 코드

위의 클래스 다이어그램에서 언급된 클래스들은 앞서 설명한 내용과 같다. 먼저 이 클래스들을 사용하는 예제 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Channel channel = new Channel(10);
		
		new Producer("PRODUCER-1", channel).start();
		new Producer("PRODUCER-2", channel).start();
		
		new Consumer("CONSUMER-A", channel).start();
		new Consumer("CONSUMER-B", channel).start();
		new Consumer("CONSUMER-C", channel).start();
		new Consumer("CONSUMER-D", channel).start();
	}
}

Producer 스레드는 2개, Consumer 스레드는 4개를 구성시킨다. Channel에 저장할 수 있는 데이터의 최대 용량은 10으로 지정했다. 이 예제에서 Producer가 생성하는 데이터는 문자열이며, Producer 클래스는 다음과 같다.

package tstThread;

public class Producer extends Thread {
	private final Channel channel;
	private int number = 0;
	
	public Producer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				//Thread.sleep(200);
				
				String data = "[DATA-" + number + "@" + getName() + "]";
				channel.put(data);
				number++;
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

생성된 데이터를 받아 처리하는 Consumer 클래스는 다음과 같은데, 받은 문자열 데이터를 조합한 새로운 문자열을 출력한다.

package tstThread;

public class Consumer extends Thread {
	private final Channel channel;
	
	public Consumer(String name, Channel channel) {
		super(name);
		this.channel = channel;
	}
	
	public void run() {
		try {
			while(true) {
				String data = channel.get();
				
				//Thread.sleep(100);
				System.out.println("#: " + getName() + " consumes " + data);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

다음은 Producer가 생성한 데이터를 저장하는 Channel 클래스이다.

package tstThread;

import java.util.LinkedList;

public class Channel {
	private final LinkedList<String> dataBuffer = new LinkedList<String>();
	private final int maxSize;
	
	public Channel(int maxSize) {
		this.maxSize = maxSize;
	}
	
	public synchronized void put(String data) throws InterruptedException {
		while(dataBuffer.size() >= maxSize) {
			wait();
		}
		
		dataBuffer.addLast(data);
		
		notifyAll();
	}
	
	public synchronized String get() throws InterruptedException {
		while(dataBuffer.size() <= 0) {
			wait();
		}
		
		String data = dataBuffer.pollFirst();
		notifyAll();
		
		//System.out.println("\t" + dataBuffer.size());
		
		return data;
	}
}

실행결과는 다음과 같다.

#: CONSUMER-C consumes [DATA-58449@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59980@PRODUCER-2]
#: CONSUMER-B consumes [DATA-58456@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59981@PRODUCER-2]
#: CONSUMER-B consumes [DATA-59982@PRODUCER-2]
#: CONSUMER-C consumes [DATA-59983@PRODUCER-2]

.
.
.

#: CONSUMER-B consumes [DATA-58476@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58477@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58478@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58479@PRODUCER-1]
#: CONSUMER-B consumes [DATA-58480@PRODUCER-1]
#: CONSUMER-A consumes [DATA-58481@PRODUCER-1]
#: CONSUMER-C consumes [DATA-58475@PRODUCER-1]
#: CONSUMER-C consumes [DATA-59988@PRODUCER-2]
#: CONSUMER-D consumes [DATA-58472@PRODUCER-1]

Read-Write Lock 패턴

패턴 명칭

Read-Write Lock

필요한 상황

Database에서 어떤 테이블이 있다고 하자. 이 테이블은 다수의 클라이언트에서 동시에 읽고 쓰이는 대상이다. 어떤 클라이언트는 이 테이블에 데이터를 쓰고, 어떤 또 다른 클라이언트는 이 테이블에서 데이터를 읽는다. 그런데 만약 2개의 클라이언트에서 특정 레코드 데이터를 동시에 각각 쓰고 읽기를 수행하면 읽는 쓰레드 쪽에서는 망가진 데이터를 읽을 수 있다. 또한 만약 2개의 클라이언트에서 동시에 특정 레코드 데이터를 쓰려고 할때 망가진 데이터가 저장될 수 있다. 문제가 없는 경우는 2개의 클라이언트가 동시에 특정 레코드의 데이터를 읽을 때 뿐이다. 이 패턴은 어떤 데이터에 대해 동시에 읽고 쓸때의 상황에서, 또는 동시에 데이터를 쓰는 상황에서 문제가 발생하지 않도록 해주는데 목적이 있다.

예제 코드

위의 클래스 다이어그램에서 언급된 클래스 중 Reader와 Writer는 각각 어떤 데이터를 읽고 쓰는 스레드이며, Data는 이 쓰레드가 읽거나 쓰는 대상이 되는 데이터를 담고 있는 클래스이다. Lock은 읽기와 쓰기에 대한 스레드 제어를 위한 클래스이다. 이 클래스들 사용하는 예제 코드는 다음과 같다.

package tstThread;

public class Main {
	public static void main(String[] args) {
		Data data = new Data(10);
		
		new Reader(data).start();
		new Reader(data).start();
		new Reader(data).start();
		
		String[] weeks = {"월", "화", "수", "목", "금", "토", "일"};
		new Writer(data, weeks).start();
		
		String[] numbers = {"ONE", "TWO", "THREE", "FOUR", "FIVE", "SIX", "SEVEN", "EIGHT", "NINE"};
		new Writer(data, numbers).start();
		
		String[] digits = {"1", "2", "3", "4", "5", "6", "7", "8", "9"};
		new Writer(data, digits).start();
	}
}

Reader 클래스는 다음과 같다.

package tstThread;

public class Reader extends Thread {
	private final Data data;
	
	public Reader(Data data) {
		this.data = data;
	}
	
	public void run() {
		try {
			while(true) {
				String v = data.read();
				System.out.println(Thread.currentThread().getName() + " -> " + v);
			}
		} catch(InterruptedException e) {
			//.
		}
	}
}

Writer 클래스는 다음과 같다.

package tstThread;

import java.util.Random;

public class Writer extends Thread {
	private static final Random random = new Random();
	private final Data data;
	private final String[] inputs;
	private int index = 0;
	
	public Writer(Data data, String[] inputs) {
		this.data = data;
		this.inputs = inputs;
	}
	
	public void run() {
		try {
			while(true) {
				String input = inputs[index];
				index = (index + 1) % inputs.length;
				data.write(input);
				Thread.sleep(random.nextInt(1000));
			}
		} catch(InterruptedException e) {
			//.
		}
	}	
}

Data 클래스는 다음과 같다.

package tstThread;

public class Data {
	private final StringBuilder buffer;
	private final Lock lock = new Lock();
	
	public Data(int size) {
		this.buffer = new StringBuilder("#empty");
	}
	
	public String read() throws InterruptedException {
		lock.readLock();
		try {
			Thread.sleep(100);		
			return buffer.toString();
		} finally {
			lock.readUnlock();
		}
	}
	
	public void write(String v) throws InterruptedException {
		lock.writeLock();
		try {
			Thread.sleep(100);
			buffer.setLength(0);
			buffer.append(v);
		} finally {
			lock.writeUnlock();
		}
	}
}

Lock 클래스는 다음과 같다.

package tstThread;

public class Lock {
	private int readingReaders = 0;
	private int waitingReaders = 0;
	private boolean preferReader = false;
	
	private int waitingWriters = 0;
	private int writingWriters = 0;
	private boolean preferWriter = true;
	
	public synchronized void readLock() throws InterruptedException {
		waitingReaders++;
		try {
			while(writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
				wait();
			}
		} finally {
			waitingReaders--;
		}
		
		readingReaders++;
	}
	
	public synchronized void readUnlock() {
		readingReaders--;
		preferWriter = true;
		preferReader = false;
		notifyAll();
	}
	
	public synchronized void writeLock() throws InterruptedException {
		waitingWriters++;
		try {
			while(readingReaders > 0 || writingWriters > 0 || (preferReader && waitingReaders > 0)) {
				wait();
			}
			
		} finally {
			waitingWriters--;
		}
		
		writingWriters++;
	}
	
	public synchronized void writeUnlock() {
		writingWriters--;
		preferWriter = false;
		preferReader = true;
		notifyAll();
	}
}

실행 결과는 다음과 같다.

Thread-2 -> #empty
Thread-1 -> #empty
Thread-0 -> #empty
Thread-0 -> 월
Thread-2 -> 월
Thread-1 -> 월
Thread-0 -> 1

.
.
.

Thread-0 -> 3
Thread-0 -> FOUR
Thread-2 -> FOUR
Thread-1 -> FOUR

Booch and Rumbaugh’s Unified Notation 0.8

UML 탄생 이전에 존재했다고 판단되는, 부치의 Diagram이라고 생각됩니다. 간단하고 명확하며, 특히 개발자에게는 꼭 필요한 내용을 전달하는 좋은 다이어그램 같습니다.


별다른 설명이 없이도 명확합니다. 비록 클래스 관계만을 제시하지만, UML에서 개발자에게 클래스 관계도가 가장 최고의 설계문서 아니겠습니까?

정리하는 차원에서 살펴보면… Base Class를 기준으로하여 .. “Base Class”가(주어) “Used”를 사용하며.. “Had by Reference”와 “Had By Value”를 맴버 변수로 가지고 있는데, 각각 참조(C++에서는 포인터*나 참조형&)으로 가지고 있다는 의미이고 “Derived 1″과 “Derived 2″에 대한 부모 클래스라는 것입니다. 지금의 클래스 관계도(UML에서)를 처음 접하는 초보 사용자에게 있어서 Composition이냐? Aggregation이냐? 라는 애매모한 정의가 없다는 점이 매우 매력적입니다.

한번쯤 숙지해 놓고, 사용해 볼만한 다이어그램이라고 생각됩니다.

Patterns in “PATTERN-ORIENTED SOFTWARE ARCHITECTURE”

Layer A.P. 어플케이션을 구조화하기 위해 서브 태스크(Subtask)들을 그룹으로 묶기 위해 분해한다. 공통된 추상 레벨에 있는 서브 태스크들끼리 묶어서 그룹으로 분류한다.

Pipes and Filters A.P. 데이터 스트림을 처리하는 시스템 구조를 제공한다. 각 프로세싱 단계는 필터 컴포넌트로 추상화한다. 데이터는 파이프를 통해 연관된 필터들에게 전달된다. 필터들을 다양하게 재조합하여 시스템을 재구축할 수 있다.

Blackboard A.P. 정의되지 않은 도메인에서의 문제를 해결할때 유용하다. 솔루션에 대한 부분적이거나 대략적인 해법을 수립하기 위해 몇가지 특수한 서브시스템들의 지식을 조합한다.

Broker A.P. 분산 소프트웨어 시스템을 구조화할때 유용하다. 분산 소프트웨어 시스템은 분리된 컴포넌트들이 서로 유기적으로 조합되어 운영되는 시스템으로, 이러한 컴포넌트들 간의 통신을 관장하는 역활을 하는 것이 Broker이다.

Model-View-Controller A.P. 모델은 핵심기능과 데이터를 의미하고 뷰는 기능에 의한 데이터의 표현이며 컨트롤은 사용자의 입력에 대한 처리이다. 뷰와 컨트롤러는 사용자의 인터페이스를 구성하며 사용자 인터페이스와 모델간의 일관성 및 정합성을 보장한다.

Presentation-Abstraction-Control A.P. 계층구조를 이루는 에이전트들이 상호작용하는 소프트웨어 시스템에 대한 패턴. 각각의 에이전트는 하나의 어플리케이션의 특정 부분을 전담하며 에이전트는 프리젠테이션/추상/컨트롤로 구성된다.

Microkernel A.P. 변화하는 시스템에 대한 요구사항을 수용할 수 있도록 하는 패턴. 시스템에서 가장 최하단에 위치하는 핵심 기능을 추출해 내며, 추가된 요구사항에 대해 확장기능으로 정의하여 시스템에 손쉽게 추가할 수 있도록 한다.

Reflection A.P. 소프트웨어 시스템의 구조와 동작을 동적으로 변경할 수 있는 메커니즘을 제공.

Whole-Part D.P. 전체(Whole) 객체를 구성하는 컴포넌트(Part)를 정의한다. Whole 객체를 통해 Part 컴포넌트들의 관계를 맺으며, 이 Whole 객체를 통해서만 Part 컴포넌트와 통신할 수 있다.

Master-Slave D.P. 마스터 컴포넌트는 슬레이브 컴포넌트에게 작업을 분산시켜서 최종적으로 슬레이브로부터 그 결과를 취합한다.

Proxy D.P. 실제 컴포넌트가 아닌 대리자를 앞단에 두어 이 대리자를 통해 실제 컴포넌트와 통신을 한다. 실제 컴포넌트의 위치 추상화, 실제 컴포넌트를 사용하기 위한 인증 등과 같은 전처리는 물론 후처리에 대한 기능 추가가 용이하다.

Command Processor D.P. 사용자의 요청을 하나의 객체로 정의하여 관리하며 Undo/Redo와 같은 처리가 가능하다.

View Handler D.P. 시스템의 모든 뷰를 관리하는 책임을 분리하여 뷰들 간의 관계성과 연관된 작업을 쉽게 처리할 수 있도록 한다.

Forwarder-Receiver D.P. 투명한 IPC를 제공하고 Peer를 분리하기 위해 Forwarder와 Receiver를 분리한다.

Client-Dispatcher-Server D.P. 클라이언트와 서버 사이에 디스패처 레이어를 도입한다. 위치 투명성을 제공하고 클라이언트와 서버간의 통신에 대한 세부적인 구현을 캡출화한다.

Publisher-Subscriber D.P. 서로 긴밀하게 관계를 맺고 있는 컴포넌트들 간의 상태에 대해 정합성을 유지하는데 용이하다. Publisher가 책임을 지고 하나의 변경에 대해 다수의 Subscriber에게 변경을 통지한다.