상단

IPC


Inter Process Communication

 

OBCon IPC 설계


 

환경 설정

//--- 환경 설정
config.service = {
    name: 'service_cms_bluestone'       //--- 서비스 이름
    guid: 'cms'                         //--- GUID, 서비스 구분에 사용
};

config.ipc = {
    isUse: false,                       //--- true. IPC 사용
    ipcInfo: {                          //--- IPC 종류와 접속 정보
        //--- IPC 종류
        //---    MQTT : Message Queue - Mosquitto
        //---    Cluster : Message Passing - Cluster
        //---    Pipe : Stream (Reserved)
        //---    FIFO : Named pipe (Reserved)
        //---    Direct : Tcp socket (Reserved)
        //---    Indirect : Redis (Reserved)
        type: 'MQTT',                   //--- IPC 종류
        
        protocol: 'mqtt',
        host: '211.251.236.86',  //--- www.obcon.biz,KT Cloud
        port: 1883,           //--- 1883/tcp. MQTT, 8883/tcp. MQTTS
        user: 'bluestoneuser',          //--- 사용자 아이디
        passwd: 'ppp',                  //--- 사용자 비밀번호
    }
};

//--- Application 실행 후 설정되는 worker 정보
appl.worker = {
    id: 1,                    //--- Process ID (1, 2, ...)
    type: 'http',             //--- Process Type (http, tcp, udp, ...)
    seq: 1                    //--- Type내에서 Sequence (1, 2, ...)
};
 

Interface 설계

//--- 동일한 service, serviceGuid를 가진 process만 통신
class InterProcessCommunication {
    constructor(ipcInfo, processInfo) {
        //--- IPC 종류와 접속 정보
        this._ipcInfo = { 
            ...getDefaultIpcInfo(), 
            ...ipcInfo 
        };
      
        //--- Process 정보
        this._processInfo = { 
            ...this.getDefaultProcessInfo(),
            ...processInfo
        };
    }
    
    getDefaultIpcInfo() {
        return config.ipc.ipcInfo;
    }
    
    getDefaultProcessInfo() {
        return {
            service: config.service.name,        //--- 서비스
            guid: config.service.guid, //--- 서비스 구분에 사용

            id: app.worker.id,         //--- Process ID (1, 2, ...)
            type: appl.worker.type,    //--- Process Type
            seq: appl.worker.seq       //--- Type내에서 Sequence (1, 2, ...)
        };
    }
  
    //--- Message 송신
    //---     topic : 메시지 주제
    //---     target : 수신자 process id, -1은 broadcast
    //---     json : 메시지
    //--- 사례) send('topic 1', 2, {});
    send(topic, target, json, serviceType = null, 
            service = null, guid = null) {
        const req = {
            service: service || this._processInfo.service, //--- 요청자 서비스
            guid: guid || this._processInfo.guid,  //--- 요청자 서비스 GUID

            source: this._processInfo.id,         //--- 발송자 ID
            type: serviceType,//--- 수신자 Process Type
            target: target,   //--- 수신자 ID (id or seq)
  
            topic: topic,     //--- 메시지 주제
            json: json        //--- 메시지
        };
    }
  
    //--- Topic에 해당하는 메시지 수신
    //--- 사례) onReceive('topic 1', funtion(req) { ~ });
    onReceive(topic, cb, service = null, guid = null) {
    }
}

function cb(req, ipc) {
    //--- 메시지 구조에서 req 참조
    
    const res = {
        service: req.service, //--- 서비스
        guid: req.guid,       //--- 응답자 서비스 GUID

        source: this._processInfo.id,   //--- 발송자 ID
        target: req.source,   //--- 수신자 ID
  
        topic: req.topic,     //--- 메시지 주제
        json: {               //--- 메시지
            code: 0,          //--- 응답 코드 (0. 정상)
            message: 'ok',    //--- 오류 메시지
            data: {},         //--- 전송 데이터
            err: {}           //--- 오류 데이터
        }
    };
  
    //--- null을 반환하면 처리 종료
    //--- res를 반환하면 응답 메시지 발송
    return res;
}
 

Message Passing


  • Cluster

    • include/cluster.js

 

Master

  • Event

    • master: setup > master: fork > master: online > master: listening

    • worker: disconnect > master: disconnect > master: exit

  • cluster.fork() 명령 실행시

    • setup

    • fork : worker 생성

    • online : worker와 연결

    • listening : 메시지 수신 대기

  • message : 메시지 수신

  • disconnect : worker와 연결 끊어짐

  • exit : master 종료

  • Worker로 데이터 송신

    • cluster.workers[idx].send(~)

    • worker.send(~)

 

Worker

  • online : master와 연결

  • listening : 메시지 수신 대기

  • message : 메시지 수신

  • error : 오류 발생

  • disconnect : master와 연결 끊어짐

  • exit : worker 종료

  • Master로 데이터 송신

    • process.send(~)

 

OBCon의 메시지 타입

  • type

    • master:~ : worker가 master에게 직접 보낸 메시지

    • worker:~ : master가 worker에게 직접 보낸 메시지

      • worker:init

        • worker 생성시 호출 한다.

        • appl.worker에서 다음 정보를 저장 한다.

          • id : worker의 생성된 순서 (1, 2, 3, ...)

          • type : 프로세스 타입 (http, tcp, udp, modbus, proxy, ...)

          • seq : type내에서 순서 (1, 2, 3, ...)

    • 기타 : Broadcast 메시지

      • setLoggerLevel : 로그레벨 재설정

        • process.send({ type: 'setLoggerLevel', level: 'info' });

      • SystemTrading_initialize

        • tradeSecrets(접속 정보)를 각 instance에서 초기화 한다.

        • modules/tradeSecrets/view.js ajaxInit()

메시지 송수신

  • global.clusters

    • boradcast(type, param)

    • sendMaster(type, param)

 

Message Queue


  • MQTT

    • modules/mqtt_client.js

  • config.mqtt 설정할 것

  • global.mqtt_client

    • type : pm2, master, http, tcp, udp, modbus, proxy, ...

    • clientId : config.mqtt.clientIdPrefix + worker.id

    • publish_command에 가입

  • connect

  • reconnect

  • message

    • mqtt_client.subscribe(topic) : 구독

    • mqtt_client.publish(~) : 메시지 전달

  • packetsend

  • packetreceive

  • error

  • close

  • end

  • offline

 

appl.worker

  • id : 전체 순서 (1, 2, 3, ...)

  • type : pm2, master, http, tcp, udp, modbus, proxy, ...

  • seq : type내에서 순서

 

publish_command topic

  • 모든 process에게 전달되는 메시지

  • rebuild idxDevices : global.idxDevices 정보를 갱신

  • udp caches : UDP 장비의 접속 정보를 cache에 저장

  • stop checkAlive : watchDogTimer에서 checkAlive() 함수의 실행을 중단

  • sendRequest : 요청 처리 handler 호출

  • sendResponse : 응답 처리 handler 호출

 

trade topic

Reserved

 

Pipe


 

Stream

import fs from 'fs';

const String filename = 'stream.txt';

// process.stdin;
const ReadStream streamInp = fs.createReadStream(filename, 'utf8');

streamInp.on('readable', function() {
    var chunk = streamInp.read();
    while (chunk != null) {
        chunk = streamInp.read();       
    }
});
streamInp.on('data', function(chunk) {
    const String data = chunk.toString();
    console.log(`data : ${chunk}`);
});
streamInp.on('pause', () => console.log('pause'));
streamInp.on('resume', () => console.log('resume'));
streamInp.on('error', err => console.log(err));
streamInp.on('close', () => console.log('close'));
streamInp.on('end', () => console.log('end'));

// process.stdout;
const WriteStream streamOut = fs.createWriteStream(filename);
streamOut.setDefaultEncoding('utf8');

streamOut.on('pipe', (src) => console.log(src));
streamOut.on('unpipe', (src) => console.log(src));
streamOut.on('error', err => console.log(err));
streamOut.on('close', () => console.log('close'));
streamOut.on('drain', () => console.log('drain'));
streamOut.on('finish', () => console.log('finish'));

streamOut.write('Hello World!');
streamOut.end('The end.');
 

Pipe

// import { pipeline } from 'stream';
import fs from 'fs';
import zlib from 'zlib';

const String fileInp = 'streamRead.txt';
const String fileOut = 'streamWrite.txt';

const ReadStream streamInp = fs.createReadStream(fileInp, 'utf8');
const WriteStream streamOut = fs.createWriteStream(fileOut);

streamInp.pipe(streamOut);

//--- Pipeline
const String fileOutGzip = `${fileInp}.gz`;
const WriteStream streamOutGzip = fs.createWriteStream(fileOutGzip);
streamInp.pipe(zlib.createGzip()).pipe(streamOutGzip);

FIFO


  • First Input First Output

  • Named Pipe

    • 주의: read하는 곳이 없으면 write시 block 된다.

 
 
 
#--- filename에 해당하는 파일을 읽고 쓸 수 있다.
mkfifo  $filename
# mknod  $filename  p

#--- 사용 사례
ls  -alF  >  $filename
cat  <  $filename
 

Direct Communication


 

Indirect Communication


  • Redis

    • Memcache

  • Database

  • File

최종 수정일: 2024-09-30 12:26:20

이전글 :
다음글 :