- IPC
- OBCon IPC 설계
- 환경 설정
- Interface 설계
- Message Passing
- Master
- Worker
- OBCon의 메시지 타입
- 메시지 송수신
- Message Queue
- appl.worker
- publish_command topic
- trade topic
- Pipe
- Stream
- Pipe
- FIFO
- Direct Communication
- Indirect Communication
IPC
Inter Process Communication
OBCon IPC 설계
환경 설정
//--- 환경 설정
config.service = {
name: 'service_cms_bluestone' //--- 서비스 이름
guid: 'cms' //--- GUID, 서비스 구분에 사용
};
config.ipc = {
isUse: false, //--- true. IPC 사용
ipcInfo: { //--- IPC 종류와 접속 정보
//--- IPC 종류
//--- MQTT : Message Queue - Mosquitto
//--- Cluster : Message Passing - Cluster
//--- Pipe : Stream (Reserved)
//--- FIFO : Named pipe (Reserved)
//--- Direct : Tcp socket (Reserved)
//--- Indirect : Redis (Reserved)
type: 'MQTT', //--- IPC 종류
protocol: 'mqtt',
host: '211.251.236.86', //--- www.obcon.biz,KT Cloud
port: 1883, //--- 1883/tcp. MQTT, 8883/tcp. MQTTS
user: 'bluestoneuser', //--- 사용자 아이디
passwd: 'ppp', //--- 사용자 비밀번호
}
};
//--- Application 실행 후 설정되는 worker 정보
appl.worker = {
id: 1, //--- Process ID (1, 2, ...)
type: 'http', //--- Process Type (http, tcp, udp, ...)
seq: 1 //--- Type내에서 Sequence (1, 2, ...)
};
Interface 설계
//--- 동일한 service, serviceGuid를 가진 process만 통신
class InterProcessCommunication {
constructor(ipcInfo, processInfo) {
//--- IPC 종류와 접속 정보
this._ipcInfo = {
...getDefaultIpcInfo(),
...ipcInfo
};
//--- Process 정보
this._processInfo = {
...this.getDefaultProcessInfo(),
...processInfo
};
}
getDefaultIpcInfo() {
return config.ipc.ipcInfo;
}
getDefaultProcessInfo() {
return {
service: config.service.name, //--- 서비스
guid: config.service.guid, //--- 서비스 구분에 사용
id: app.worker.id, //--- Process ID (1, 2, ...)
type: appl.worker.type, //--- Process Type
seq: appl.worker.seq //--- Type내에서 Sequence (1, 2, ...)
};
}
//--- Message 송신
//--- topic : 메시지 주제
//--- target : 수신자 process id, -1은 broadcast
//--- json : 메시지
//--- 사례) send('topic 1', 2, {});
send(topic, target, json, serviceType = null,
service = null, guid = null) {
const req = {
service: service || this._processInfo.service, //--- 요청자 서비스
guid: guid || this._processInfo.guid, //--- 요청자 서비스 GUID
source: this._processInfo.id, //--- 발송자 ID
type: serviceType,//--- 수신자 Process Type
target: target, //--- 수신자 ID (id or seq)
topic: topic, //--- 메시지 주제
json: json //--- 메시지
};
}
//--- Topic에 해당하는 메시지 수신
//--- 사례) onReceive('topic 1', funtion(req) { ~ });
onReceive(topic, cb, service = null, guid = null) {
}
}
function cb(req, ipc) {
//--- 메시지 구조에서 req 참조
const res = {
service: req.service, //--- 서비스
guid: req.guid, //--- 응답자 서비스 GUID
source: this._processInfo.id, //--- 발송자 ID
target: req.source, //--- 수신자 ID
topic: req.topic, //--- 메시지 주제
json: { //--- 메시지
code: 0, //--- 응답 코드 (0. 정상)
message: 'ok', //--- 오류 메시지
data: {}, //--- 전송 데이터
err: {} //--- 오류 데이터
}
};
//--- null을 반환하면 처리 종료
//--- res를 반환하면 응답 메시지 발송
return res;
}
Message Passing
Cluster
include/cluster.js
Master
Event
master: setup > master: fork > master: online > master: listening
worker: disconnect > master: disconnect > master: exit
cluster.fork() 명령 실행시
setup
fork : worker 생성
online : worker와 연결
listening : 메시지 수신 대기
message : 메시지 수신
disconnect : worker와 연결 끊어짐
exit : master 종료
Worker로 데이터 송신
cluster.workers[idx].send(~)
worker.send(~)
Worker
online : master와 연결
listening : 메시지 수신 대기
message : 메시지 수신
error : 오류 발생
disconnect : master와 연결 끊어짐
exit : worker 종료
Master로 데이터 송신
process.send(~)
OBCon의 메시지 타입
type
master:~ : worker가 master에게 직접 보낸 메시지
worker:~ : master가 worker에게 직접 보낸 메시지
worker:init
worker 생성시 호출 한다.
appl.worker에서 다음 정보를 저장 한다.
id : worker의 생성된 순서 (1, 2, 3, ...)
type : 프로세스 타입 (http, tcp, udp, modbus, proxy, ...)
seq : type내에서 순서 (1, 2, 3, ...)
기타 : Broadcast 메시지
setLoggerLevel : 로그레벨 재설정
process.send({ type: 'setLoggerLevel', level: 'info' });
SystemTrading_initialize
tradeSecrets(접속 정보)를 각 instance에서 초기화 한다.
modules/tradeSecrets/view.js ajaxInit()
메시지 송수신
global.clusters
boradcast(type, param)
sendMaster(type, param)
Message Queue
MQTT
modules/mqtt_client.js
config.mqtt 설정할 것
global.mqtt_client
type : pm2, master, http, tcp, udp, modbus, proxy, ...
clientId : config.mqtt.clientIdPrefix + worker.id
publish_command에 가입
connect
reconnect
message
mqtt_client.subscribe(topic) : 구독
mqtt_client.publish(~) : 메시지 전달
packetsend
packetreceive
error
close
end
offline
appl.worker
id : 전체 순서 (1, 2, 3, ...)
type : pm2, master, http, tcp, udp, modbus, proxy, ...
seq : type내에서 순서
publish_command topic
모든 process에게 전달되는 메시지
rebuild idxDevices : global.idxDevices 정보를 갱신
udp caches : UDP 장비의 접속 정보를 cache에 저장
stop checkAlive : watchDogTimer에서 checkAlive() 함수의 실행을 중단
sendRequest : 요청 처리 handler 호출
sendResponse : 응답 처리 handler 호출
trade topic
Reserved
Pipe
Stream
import fs from 'fs';
const String filename = 'stream.txt';
// process.stdin;
const ReadStream streamInp = fs.createReadStream(filename, 'utf8');
streamInp.on('readable', function() {
var chunk = streamInp.read();
while (chunk != null) {
chunk = streamInp.read();
}
});
streamInp.on('data', function(chunk) {
const String data = chunk.toString();
console.log(`data : ${chunk}`);
});
streamInp.on('pause', () => console.log('pause'));
streamInp.on('resume', () => console.log('resume'));
streamInp.on('error', err => console.log(err));
streamInp.on('close', () => console.log('close'));
streamInp.on('end', () => console.log('end'));
// process.stdout;
const WriteStream streamOut = fs.createWriteStream(filename);
streamOut.setDefaultEncoding('utf8');
streamOut.on('pipe', (src) => console.log(src));
streamOut.on('unpipe', (src) => console.log(src));
streamOut.on('error', err => console.log(err));
streamOut.on('close', () => console.log('close'));
streamOut.on('drain', () => console.log('drain'));
streamOut.on('finish', () => console.log('finish'));
streamOut.write('Hello World!');
streamOut.end('The end.');
Pipe
// import { pipeline } from 'stream';
import fs from 'fs';
import zlib from 'zlib';
const String fileInp = 'streamRead.txt';
const String fileOut = 'streamWrite.txt';
const ReadStream streamInp = fs.createReadStream(fileInp, 'utf8');
const WriteStream streamOut = fs.createWriteStream(fileOut);
streamInp.pipe(streamOut);
//--- Pipeline
const String fileOutGzip = `${fileInp}.gz`;
const WriteStream streamOutGzip = fs.createWriteStream(fileOutGzip);
streamInp.pipe(zlib.createGzip()).pipe(streamOutGzip);
FIFO
First Input First Output
Named Pipe
주의: read하는 곳이 없으면 write시 block 된다.
#--- filename에 해당하는 파일을 읽고 쓸 수 있다.
mkfifo $filename
# mknod $filename p
#--- 사용 사례
ls -alF > $filename
cat < $filename
Direct Communication
TCP/IP Socket
Indirect Communication
Redis
Memcache
Database
File