- 2019 빅데이터 6일 완성
- 1차시 - 데이터 수집 및 저장 (크롤링과 Kafka 맛보기)
- 2차시 - 데이터 전처리 (SQL, Pandas 활용하기)
- 3차시 - Zeppelin을 이용한 시각화
- 4차시 - Zeppelin을 이용한 실무형 데이터 전처리
- 5차시 - Zeppelin, 현업에서 어떻게 쓰이고 있는가
- 6차시 - 융합 실습 프로젝트 (Data 수집부터 Insight 도출까지)
- 샘플 코드
- 참고 문헌
2019 빅데이터 6일 완성
기간 : 2019년 11월 4일부터 2019년 11월 9일
1차시 - 데이터 수집 및 저장 (크롤링과 Kafka 맛보기)
Windows에 Python 통합개발환경 (IDE) 구성
Python과 PyCharm Community 버전을 다운로드하여 설치 합니다.
"Windows x86-64 executable installer" 링크를 선택하여 설치 파일을 다운로드 한다.
다운로드한 설치 파일로 설치 한다. (참조: https://psklog.tistory.com/44)
Community 아래에 있는 "DOWNLOAD" 버튼을 선택하여 설피 파일을 다운로드 한다.
다운로드한 설치 파일로 설치 한다.
데이터베이스 관리 도구인 DBeaver Community 구성
"Windows 64 bit (installer + JRE)" 링크를 선택하여 설치 파일을 다운로드 한다.
다운로드한 설치 파이롤 설치 한다.
2차시 - 데이터 전처리 (SQL, Pandas 활용하기)
3차시 - Zeppelin을 이용한 시각화
4차시 - Zeppelin을 이용한 실무형 데이터 전처리
MIMIC-III Clinical Data 다운로드
#--- https://physionet.org/content/mimiciii-demo/1.4/
cd /work/files
wget -r -N -c -np https://alpha.physionet.org/files/mimiciii-demo/1.4/
5차시 - Zeppelin, 현업에서 어떻게 쓰이고 있는가
6차시 - 융합 실습 프로젝트 (Data 수집부터 Insight 도출까지)
챠트 종류
Table
Bar Chart, Area Chart, Line Chart
Pie Chart
Scatter Chart
Download : CSV
샘플 코드
Database를 먼저 생성한 후에 작업 하세요.
실행할 함수를 run() 함수에서 호출하면 해당 함수가 실행 됩니다.
class를 사용하지 않고 실행하고자 하는 경우, import 문과 함수내의 코드만 복사하여 실행하면 됩니다.
아래 "단독 실행 샘플 코드"를 참조 하세요.
# -*- coding: utf-8 -*-
# Author : gye hyun james kim [pnuskgh@gmail.com]
# Copyright (c) 2018 ~ 2019, pnuskgh. All rights reserved.
import sys
import traceback
import requests
from bs4 import BeautifulSoup
import pymysql
class Crawling:
def __init__(self, List, p_str=None):
#--- 공공데이터 포털 접속 정보
self.ServiceKey = '인증키'
#--- Database 접속 정보
self.host = '211.43.14.168'
self.port = 3306
self.database = 'crawlingdb'
self.user = 'crawling'
self.passwd = '비밀번호'
def __del__(self):
pass
def run(self):
#--- 실행할 함수명을 여기에 명시하여 실행하세요.
self.day2_002()
#--- requests 사용
def day1_001(self):
res = requests.get('http://www.naver.com')
print(res.content)
print(res.text)
#--- bs4 사용
def day1_002(self):
res = requests.get('http://www.naver.com')
soup = BeautifulSoup(res.text, "html.parser")
title = soup.find('h1') #--- 첫번째 h1 태크를 찾는다.
print(title)
#--- 네이버 실시간 검색어
def day1_003(self):
res = requests.get('http://naver.com')
soup = BeautifulSoup(res.text, "html.parser")
data = soup.select('span.ah_k') #--- span 태그중 class로 ah_k를 가진 모든 태그를 찾는다.
for item in data:
print('-', item.string)
#--- 네이버 뉴스 제목 crawling
def day1_004(self):
res = requests.get('https://news.naver.com')
soup = BeautifulSoup(res.text, "html.parser")
data = soup.select('div.com_list > div > ul > li > a > strong')
for item in data:
print('-', item.string)
#--- 네이버 가장 많이 본 뉴스
def day1_005(self):
res = requests.get('https://news.naver.com')
soup = BeautifulSoup(res.text, "html.parser")
data = soup.select('ul.section_list_ranking > li > a')
for item in data:
print('-', item.string)
#--- 네이버 금융 TOP 종목
def day1_006(self):
res = requests.get('https://finance.naver.com')
soup = BeautifulSoup(res.text, "html.parser")
topItem = soup.find('tbody', id='_topItems1')
data = topItem.find_all('tr')
for item in data:
print(item.th.string, end="\t")
data_td = item.find_all('td')
for item_td in data_td:
print(item_td.string, end='\t')
print()
#--- 실습
def day1_007(self):
url = "https://finance.naver.com/"
res = requests.get(url)
soup = BeautifulSoup(res.text, "html.parser")
data = soup.select("#container > div.aside > div.group_aside > div.aside_area.aside_stock > table > tbody > tr")
for item in data:
print('-', item.find('th').text, end="\t")
print(item.find_all('td')[0].text, end="\t")
print(item.find_all('td')[1].text)
print()
data = soup.select("#container > div.aside > div.group_aside > div.aside_area.aside_popular > table > tbody > tr")
for item in data:
print('-', item.find('th').text, end="\t")
print(item.find_all('td')[0].text, end="\t")
print(item.find_all('td')[1].text)
print()
#--- WSDL 반환
def day1_101(self):
end_point = 'http://openapi.molit.go.kr/OpenAPI_ToolInstallPackage/service/rest/RTMSOBJSvc/getRTMSDataSvcLandTrade?_wadl&type=xml'
res = requests.get(end_point)
print(res.text)
#--- 부동산 매매 신고 자료
def day1_102(self):
end_point = 'http://openapi.molit.go.kr/OpenAPI_ToolInstallPackage/service/rest/RTMSOBJSvc/getRTMSDataSvcNrgTrade'
LAWD_CD = '11110' #--- LAWD_CD, 각 지역별 코드
DEAL_YMD = '201512' #--- DEAL_YMD, 월 단위 신고자료
url = end_point + '?ServiceKey=' + self.ServiceKey
url = url + '&LAWD_CD=' + LAWD_CD
url = url + '&DEAL_YMD=' + DEAL_YMD
res = requests.get(url)
soup = BeautifulSoup(res.text, "xml")
resultCode = soup.find('resultCode')
resultMsg = soup.find('resultMsg')
print(resultCode.string, resultMsg.string)
data = soup.find_all('item')
for item in data:
print(item.법정동.string)
#--- 아파트 분양권 전매 신고자료
def day1_103(self):
end_point = 'http://openapi.molit.go.kr/OpenAPI_ToolInstallPackage/service/rest/RTMSOBJSvc/getRTMSDataSvcSilvTrade'
LAWD_CD = '11110' #--- LAWD_CD, 각 지역별 코드
DEAL_YMD = '201512' #--- DEAL_YMD, 월 단위 신고자료
url = end_point + '?ServiceKey=' + self.ServiceKey
url = url + '&LAWD_CD=' + LAWD_CD
url = url + '&DEAL_YMD=' + DEAL_YMD
res = requests.get(url)
soup = BeautifulSoup(res.text, "xml")
resultCode = soup.find('resultCode')
resultMsg = soup.find('resultMsg')
print(resultCode.string, resultMsg.string)
data = soup.find_all('item')
for item in data:
print(item.법정동.string)
#--- 아파트 매매 실거래 상세 자료
def day1_104(self):
end_point = 'http://openapi.molit.go.kr/OpenAPI_ToolInstallPackage/service/rest/RTMSOBJSvc/getRTMSDataSvcAptTradeDev'
LAWD_CD = '11110' #--- LAWD_CD, 각 지역별 코드
DEAL_YMD = '201512' #--- DEAL_YMD, 월 단위 신고자료
url = end_point + '?ServiceKey=' + self.ServiceKey
url = url + '&LAWD_CD=' + LAWD_CD
url = url + '&DEAL_YMD=' + DEAL_YMD
url = url + '&pageNo=1'
url = url + '&numOfRows=10'
res = requests.get(url)
soup = BeautifulSoup(res.text, "xml")
resultCode = soup.find('resultCode')
resultMsg = soup.find('resultMsg')
print(resultCode.string, resultMsg.string)
data = soup.find_all('item')
for item in data:
print(item.법정동.string)
#--- mysql 연동
def day2_001(self):
db = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.database, charset='utf8')
cursor = db.cursor()
cntRows = cursor.execute('show databases')
print('count rows:', cntRows)
print(cursor.fetchall())
#--- Table 생성
def day2_002(self):
sql_dropTable = '''
DROP TABLE IF EXISTS `openapi_estate2`;
'''
sql_createTable = '''
CREATE TABLE openapi_estate2 (
`no` INT NOT NULL AUTO_INCREMENT,
building_size INT NOT NULL,
building_use varchar(100) NOT NULL,
building_year INT NOT NULL,
`year` INT NOT NULL,
land_size FLOAT NOT NULL,
location_dong varchar(100) NOT NULL DEFAULT '',
location_gu varchar(100) NOT NULL,
location_use varchar(100) NOT NULL,
`month` INT NOT NULL,
`type` varchar(100) NOT NULL,
`day` INT NOT NULL,
location_code INT NOT NULL,
PRIMARY KEY (`no`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8
'''
db = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.database, charset='utf8')
cursor = db.cursor()
#--- Table 생성
cursor.execute(sql_dropTable)
cursor.execute(sql_createTable)
cntRows = cursor.execute('show tables')
print('count rows:', cntRows)
print(cursor.fetchall())
#--- Table 정의 조회
cntRows = cursor.execute('DESCRIBE openapi_estate2')
print(cursor.fetchall())
if __name__ == "__main__":
try:
app = Crawling(sys.argv)
app.run()
except KeyboardInterrupt:
print(traceback.format_exc())
단독 실행 샘플 코드
# -*- coding: utf-8 -*-
# Author : gye hyun james kim [pnuskgh@gmail.com]
# Copyright (c) 2018 ~ 2019, pnuskgh. All rights reserved.
import sys
import traceback
import requests
from bs4 import BeautifulSoup
import pymysql
#--- 공공데이터 포털 접속 정보
ServiceKey = '인증키'
#--- Database 접속 정보
host = '211.43.14.168'
port = 3306
database = 'crawlingdb'
user = 'crawling'
passwd = '비밀번호'
#--- 여기에 실행할 코드를 입력하여 실행 하세요.
#--- 주의: "self."은 모두 삭제한 후에 입력하여야 합니다.
Zeppelin 샘플 코드
# -*- coding: utf-8 -*-
# Author : gye hyun james kim [pnuskgh@gmail.com]
# Copyright (c) 2018 ~ 2019, pnuskgh. All rights reserved.
import numpy as np
import pandas as pd
from pandas import Series, DataFrame
import datetime
#--- 데이터 읽기
#--- https://mimic.physionet.org/mimictables/admissions/
#--- 'row_id'
#--- 'subject_id' : 환자 아이디
#--- 'hadm_id'
#--- 'admittime' : 입원 일시
#--- 'dischtime' : 퇴원 일시
#--- 'deathtime' : 죽은 일시
#--- 'admission_type', 'admission_location'
#--- 'discharge_location', 'insurance',
#--- 'language', 'religion', 'marital_status'
#--- 'ethnicity'
#--- 'edregtime' : 응급실 입원 일시
#--- 'edouttime' : 응급실 퇴원 일시
#--- 'diagnosis'
#--- 'hospital_expire_flag', 'has_chartevents_data'
#--- https://mimic.physionet.org/mimictables/patients/
#--- 'row_id'
#--- 'subject_id' : 환자 아이디
#--- 'gender' : 성별 (M, F)
#--- 'dob' : 태어난 일시
#--- 'dod' : 죽은 일시
#--- 'dod_hosp' : 죽은 일시 (병원 기록)
#--- 'dod_ssn' : 죽은 일시 (사회보장 기록)
#--- 'expire_flag'
folderData = '/work/files/alpha.physionet.org/files/mimiciii-demo/1.4/'
admission_df = pd.read_csv(folderData + 'admissions.csv')
patients_df = pd.read_csv(folderData + 'patients.csv')
#--- 데이터 탐색: 기본 정보 표시
print('shape', admission_df.shape)
print('columns', admission_df.columns)
# print('values', admission_df.values)
#--- 기초 데이터 가공
#--- 문자열을 날자로 변환
admission_df.admittime = pd.to_datetime(admission_df.admittime)
admission_df.dischtime = pd.to_datetime(admission_df.dischtime)
admission_df.deathtime = pd.to_datetime(admission_df.deathtime)
admission_df.edregtime = pd.to_datetime(admission_df.edregtime)
admission_df.edouttime = pd.to_datetime(admission_df.edouttime)
patients_df.dob = pd.to_datetime(patients_df.dob)
patients_df.dod = pd.to_datetime(patients_df.dod)
patients_df.dod_hosp = pd.to_datetime(patients_df.dod_hosp)
patients_df.dod_ssn = pd.to_datetime(patients_df.dod_ssn)
#--- 두개의 데이터 병합
merged_df = pd.merge(admission_df, patients_df, on='subject_id')
#--- 테이블로 표시
columns = [
'admittime', 'dischtime', 'deathtime', 'edregtime', 'edouttime',
'dob', 'dod', 'dod_hosp', 'dod_ssn'
]
z.show(merged_df.loc[:, columns])
#--- 데이터 가공 1 : 나이 계산
def toYear(x):
return int(x.days / 365)
merged_df.loc[:, 'ages'] = merged_df.admittime.dt.date - merged_df.dob.dt.date
merged_df.ages = merged_df.ages.apply(toYear)
z.show(merged_df.loc[:, ['admittime', 'dob', 'ages']])
#--- 데이터 가공 2 : 기간별 재입원 환자 계산
merged_df.loc[:, 'admitduration'] = merged_df.dischtime - merged_df.admittime
terms = [30, 60, 90, 180, 365]
numOfReadmission = []
oldTerm = 0
for term in terms:
dataset = merged_df[(datetime.timedelta(days=oldTerm) <= merged_df.admitduration) & (merged_df.admitduration < datetime.timedelta(days=term))]
colName = 're_admission_' + str(term)
numOfReadmission.append(dataset.shape[0])
oldTerm = term
print(numOfReadmission)
admission_df['subject_id'].value_counts()
g_df = admission_df.groupby('subject_id').head(1)
g_religion_df = g_df.religion.value_counts().reset_index()
g_religion_df.columns = ['religion','cnt']
z.show(g_religion_df)
admission_df.religion.shape
religion_df = admission_df.religion.value_counts().reset_index()
religion_df.columns = ['religion','cnt']
religion_df.shape
z.show(admission_df)
#--- admissions 처리 2
SUBJECT_ID = 1
ADMITTIME = 3
DISCHTIME = 4
patient_ids = admission_df.subject_id.unique()
new_patients = []
wrong_patients = []
for patient_id in patient_ids:
patient_df = admission_df.loc[admission_df['subject_id'] == patient_id]
patient_list = patient_df.values.tolist()
# 입원을 한번만 했을 경우
if len(patient_list) == 1:
new_patients.append(patient_list[0])
continue
else:
for i in range(len(patient_list)):
checker_wrong_1 = (pd.to_datetime(patient_list[i][DISCHTIME]) - pd.to_datetime(patient_list[i][ADMITTIME]))
# print(type(checker_wrong_1.days))
# print(checker_wrong_1)
wrong_flag = 0
if checker_wrong_1 > pd.Timedelta(np.timedelta64(1, 'ms')):
wrong_patients.append(patient_list[i])
else:
wrong_flag = 0
new_patients.append(patient_list[i])
terms = [30]
for term in terms:
for i in range(len(new_patients) - 1):
next_same_flag = new_patients[i][SUBJECT_ID] == new_patients[i + 1][SUBJECT_ID]
readmission_days = 0
if next_same_flag:
readmission_days = (pd.to_datetime(new_patients[i + 1][ADMITTIME]) - pd.to_datetime(new_patients[i][DISCHTIME]))
readmission_days = readmission_days.days
if readmission_days <= term:
new_patients[i].append(1)
continue
else:
new_patients[i].append(0)
continue
else:
new_patients[i].append(0)
admission_df['admittime'] = pd.to_datetime(admission_df['admittime'])
admission_df['dischtime'] = pd.to_datetime(admission_df['dischtime'])
def get_visit_days(days):
res = ((admission_df.dischtime.values >= admission_df.admittime.values[:, None] - pd.to_timedelta('%d days' % days))
& (admission_df.admittime.values < admission_df.admittime.values[:, None])
& (admission_df.subject_id.values == admission_df.subject_id.values[:, None]))
res = np.sum(res, axis=1)
return pd.Series(res, name="%dd" % days)
visit_days = [30, 60, 90, 180, 365]
visit_df_list = []
for day in visit_days:
visit_df_list.append(get_visit_days(day))
visit_df = pd.concat(visit_df_list, axis=1)
admission_labeled_df = pd.concat([admission_df,visit_df], axis=1)
admission_labeled_df
pd.DataFrame(new_patients)
#--- admissions 처리 3
SUBJECT_ID = 1
ADMITTIME = 3
DISCHTIME = 4
patient_ids = admission_df.subject_id.unique()
new_patients = []
wrong_patients = []
for patient_id in patient_ids:
patient_df = admission_df.loc[admission_df['subject_id'] == patient_id]
patient_list = patient_df.values.tolist()
# 입원을 한번만 했을 경우
if len(patient_list) == 1:
new_patients.append(patient_list[0])
continue
else:
for i in range(len(patient_list)):
checker_wrong_1 = (pd.to_datetime(patient_list[i][DISCHTIME]) - pd.to_datetime(patient_list[i][ADMITTIME]))
wrong_flag = 0
if checker_wrong_1 < pd.Timedelta(np.timedelta64(1, 'ms')):
wrong_patients.append(patient_list[i])
elif checker_wrong_1 > pd.Timedelta(np.timedelta64(1, 'ms')):
wrong_flag = 0
new_patients.append(patient_list[i])
terms = [30]
for term in terms:
for i in range(len(new_patients) - 1):
readmission_days = 0
if new_patients[i][SUBJECT_ID] == new_patients[i + 1][SUBJECT_ID]:
readmission_days = (pd.to_datetime(new_patients[i + 1][ADMITTIME]) - pd.to_datetime(new_patients[i][DISCHTIME]))
readmission_days = readmission_days.days
if readmission_days <= term:
new_patients[i].append(1)
continue
else:
new_patients[i].append(0)
continue
else:
new_patients[i].append(0)
for i in range(len(terms)):
new_patients[-1].append(0)
new_patients_df = pd.DataFrame(new_patients)
new_patients_df.columns = [
'ROW_ID', 'subject_id', 'hadm_id', 'ADMITTIME', 'DISCHTIME',
'DEATHTIME', 'ADMISSION_TYPE', 'ADMISSION_LOCATION', 'DISCHARGE_LOCATION',
'INSURANCE', 'LANGUAGE', 'RELIGION', 'MARITAL_STATUS', 'ETHNICITY',
'EDREGTIME', 'EDOUTTIME', 'DIAGNOSIS', 'HOSPITAL_EXPIRE_FLAG',
'HAS_CHARTEVENTS_DATA', 're_admission_30'
]
target_df = new_patients_df[['subject_id', 'hadm_id', 're_admission_30']]
pie_df = target_df.re_admission_30.value_counts().reset_index()
pie_df.colums=['re_admission_30','cnt']
z.show(pie_df)
new_patients_df
참고 문헌
최종 수정일: 2024-09-30 12:26:19
이전글 :
다음글 :