การทำ ETL ด้วย Talend และ Python เชื่อม RESTful API เพื่อการนำเสนอข้อมูล (Visualization)

(ขอขอบคุณรูปภาพจาก fullvector / Freepik)
บทความนี้ผู้เขียนจะขอนำเสนอกระบวนการหนึ่งที่สำคัญของ Data Engineer นั่นคือ การนำเข้า จัดระเบียบ และนำข้อมูลไปใช้ประโยชน์เชิงวิเคราะห์ (Extract-Transform-Load: ETL) โดยดึงข้อมูลผ่าน RESTful API ด้วยเครื่องมือต่าง ๆ เช่น Talend และ Python ซึ่งในระหว่างการทำ ETL นั้นเราจะได้ทำความรู้จักกับเครื่องมือที่เกี่ยวข้องด้วย เช่น PostMan, PostgreSQL, PowerBI และ Tableau
ในสถานการณ์วิกฤตโรคระบาดโควิด-19 ที่มีผลกระทบทั่วโลก และยังส่งผลโดยตรงต่อเศรษฐกิจ สุขภาพ และชีวิตประจำวันของพี่น้องประชาชนคนไทยทุกคนนั้น ภาครัฐเองก็ไม่ได้นิ่งนอนใจ มีความตระหนักและให้ความสำคัญอย่างยิ่งในการทำงานบูรณาการข้อมูลร่วมกันเพื่อแก้ปัญหาที่เกิดขึ้นนี้อย่างเร่งด่วน ทุกภาคส่วนล้วนช่วยกันเพื่อรับมือกับวิกฤตโรคระบาดครั้งนี้ หากจะกล่าวถึงงานทางด้าน Data Engineer ผู้เขียนมองว่าการนำข้อมูลที่ภาครัฐเปิดเผยไปใช้ประโยชน์นั้น นอกจากจะสามารถช่วยหลากหลายหน่วยงานในการรับมือกับสถานการณ์ครั้งนี้ ยังช่วยให้ประชาชนสามารถเข้าถึงข้อมูลที่มีความถูกต้องได้อีกด้วย
มาเริ่มกันเลยดีกว่า หากท่านผู้อ่านเคยศึกษาลักษณะงาน Data Analytic มาบ้างแล้ว ท่านจะทราบว่าลักษณะงานของ Data Engineer นั้น คาบเกี่ยวกับงานหลายส่วน พอจะสรุปได้สั้น ๆ คือ สนับสนุนงานทางด้านเทคนิค วางโครงสร้างระบบ จัดการโครงสร้างข้อมูล ประมวลผลข้อมูลในระดับทั่วไป หรือข้อมูลในระดับบิ๊กดาต้า รวมไปถึงจัดเตรียมช่องทางให้เกิดการการเผยแพร่ผลลัพธ์จากการวิเคราะห์หรือการคาดการณ์สิ่งที่จะเกิดขึ้นในอนาคต ยกตัวอย่างเช่น การแสดงผลลัพธ์ข้อมูลจำนวนผู้ป่วยในอนาคต การแสดงผลด้วยวิธีที่ง่ายและใกล้ตัวที่ทุกท่านพอจะนึกถึงได้อาจจะเป็นการทำรายงานแบบทั่วไป แต่การทำรายงานที่มีประสิทธิภาพทุกวันนี้ คงไม่่เป็นเพียงตารางเพื่อแสดงข้อมูลเท่านั้น การแสดงข้อมูลในรูปแบบกราฟต่าง ๆ ที่สามารถโต้ตอบผู้ใช้งานได้ จะช่วยให้ผู้ใช้งานเข้าใจได้ง่ายขึ้น อีกทั้งยังสามารถช่วยให้ดูข้อมูลในเชิงลึกได้ได้ดีกว่า อย่างไรก็ตาม หากเราต้องการพัฒนาการแสดงผลในรูปแบบนี้ให้เสร็จด้วยเวลาที่มีอยู่อย่างจำกัด แน่นอนว่าเราต้องอาศัยเครื่องมือที่เรียกว่า Business Intelligence (BI) ที่จะตอบโจทย์ได้เป็นอย่างดีโดยไม่ต้องใช้กำลังในการพัฒนาซอฟท์แวร์มากนัก แต่สิ่งที่สำคัญมากที่จะทำให้เราจะได้ผลลัพธ์ที่ดีออกมานั้น คือ เราจะต้องอาศัยข้อมูลต้นทางที่ดี มีความน่าเชื่อถือ มีความถูกต้อง และมีที่มาของข้อมูลอย่างชัดเจน
ข้อมูลเกี่ยวกับโรคระบาดโควิด-19 ที่เปิดเผยบน Open Data
ตัวอย่างข้อมูลที่จะนำมาใช้ประกอบบทความนี้ เพื่อให้สอดคล้องกับในวิกฤตสถานการณ์โรคระบาดโควิด-19 คือ ข้อมูลจริงที่เผยแพร่อย่างเป็นทางการจากหน่วยงานรัฐ โดยที่ข้อมูลได้ถูกเผยแพร่ใน Platform ที่เรียกว่า Open Data บนเว็บไซต์ https://data.go.th/ ซึ่งเป็นช่องทางในการเผยแพร่ข้อมูลที่เป็นทางการของประเทศไทยจากภาครัฐสู่ภาคเอกชนและประชาชน ทั้งนี้ ข้อมูลที่มีการเผยแพร่เกี่ยวกับโรคโควิด-19 ได้แก่ ข้อมูล “รายงาน COVID-19 ประจำวัน” โดยมีรายละเอียดตามภาพด้านล่างนี้ ข้อมูลนี้สามารถเรียกออกมาใช้งานได้ผ่าน Restful API ซึ่งเป็นข้อมูลที่มีการปรับปรุงแบบรายวัน


สำหรับข้อมูลรายงาน COVID-19 ประจำวันนี้ ทางผู้จัดทำข้อมูลได้จัดเตรียมข้อมูลรายละเอียดข้อมูลที่เราเรียกว่า Metadata ไว้ให้ และยังมีตัวอย่าง API การเรียกใช้ข้อมูลให้ด้วย ซึ่งต้องขอชื่นชม DGA หรือ สำนักงานพัฒนารัฐบาลดิจิทัล (องค์การมหาชน) ที่ได้จัดทำบริการ Open Data ขึ้นมา เพราะนอกจากจะเป็นประโยชน์อย่างมากแล้ว ยังช่วยอำนวยความสะดวกให้ผู้ที่ต้องการนำข้อมูลไปใช้งานสามารถทำงานได้ง่ายขึ้น โดยเมื่อกดที่ “Data API” จะมีตัวอย่างการเรียก API ปรากฏขึ้นมาเป็นตัวอย่าง ซึ่งสามารถนำไปเรียกใช้งานได้ทันที ดังตัวอย่างที่แสดงในรูปภาพด้านล่างนี้


การใช้ Postman เพื่อเรียกดูข้อมูลผ่าน RESTful API
เมื่อเราได้ข้อมูลตั้งต้นแล้ว ผู้เขียนแนะนำว่าให้ใช้ Postman ในการเรียกดูข้อมูลผ่าน API ดังกล่าว เครื่องมือที่แนะนำนี้สามารถดาวน์โหลดใช้งานได้ฟรีที่ https://www.postman.com/ ซึ่งเมื่อติดตั้งเสร็จแล้วให้ทดสอบเรียก API เพื่อดึงข้อมูล แต่การเรียกข้อมูลจาก data.go.th นั้นจะต้องทำการลงทะเบียนก่อน โดยสามารถเข้าไปลงทะเบียนได้ที่ data.go.th จากนั้นท่านจะได้ API KEY สำหรับใช้ดึงข้อมูล โดยการกรอกลงในโปรแกรม Postman ดังตัวอย่างตามรูปภาพข้างล่างนี้ แล้ว API จะตอบกลับมาด้วยข้อมูลในรูปแบบ JSON (JavaScript Object Notation)

เมื่อทดสอบการดึงข้อมูลแล้ว เป็นอันว่าเราสามารถดึงข้อมูลกลับมาได้ตามโครงสร้างที่อยู่ในรูปแบบ JSON ขั้นตอนต่อไปจะเป็นการพัฒนา ETL (Extract, Transform, Load) เพื่อดึงข้อมูลนี้และจัดเก็บลงระบบฐานข้อมูล เพื่อให้ระบบ BI สามารถเชื่อมต่อเพื่อนำข้อมูลไปสรุปและแสดงผลต่อไป การเลือกระบบฐานข้อมูลมาใช้งานนั้น ขึ้นอยู่กับหน่วยงานหรือความต้องการใช้งานแต่ละที่ หากท่านผู้อ่านยังไม่ได้มีระบบฐานข้อมูลที่เลือกไว้ สามารถเลือกใช้งานระบบฐานข้อมูลที่สามารถใช้งานได้ฟรี เช่น PostgreSQL โดยสามารถดาวน์โหลดได้ที่ https://www.postgresql.org/ ส่วนเครื่องมือในการทำ ETL นั้น บทความนี้จะขอยกตัวอย่าง 2 วิธี คือ ด้วยวิธีการเขียนโปรแกรมภาษา Python และการใช้เครื่องมือ Talend Open Studio for Data Integration ซึ่งเป็นเครื่องมือที่สามารถใช้งานได้ฟรี ดาวน์โหลดได้ที่ https://www.talend.com/download/ นอกจากนี้ก็ยังมีโปรแกรมและเครื่องมือต่าง ๆ ที่ผู้อ่านสามารถศึกษาได้เพิ่มเติม ซึ่งผู้เขียนขออนุญาตไม่กล่าวรายละเอียดไว้ ณ ที่นี้ ยกตัวอย่าง ภาษาเช่น Shell Script, JAVA หรือแม้กระทั่งเครื่องมืออื่น ๆ เช่น Alteryx Self-Service Data Analytics Platform ทั้งนี้การเลือกใช้จะขึ้นอยู่ที่ลักษณะของโครงการและงบประมาณ ตัวอย่างที่ผู้เขียนนำมาแสดงนี้จะเป็นการพัฒนาที่ยังไม่มีค่าใช้จ่ายเพื่อส่งเสริมให้เกิดความรู้ความเข้าใจและนำไปใช้งานได้โดยที่ไม่ติดขัดเรื่องค่าใช้จ่าย
การทำ ETL ด้วย Talend Software
สำหรับวิธีการใช้เครื่องมือ Talend Software ที่เกริ่นไปก่อนหน้านี้ ท่านสามารถศึกษาข้อมูลใช้งานได้ที่ TOS 7.3 User Guide โดยภาพด้านล่างนี้เป็นตัวอย่างของ Flow ที่ใช้ในการดึงข้อมูลมาจาก DGA Restful API และเก็บข้อมูลลงระบบฐานข้อมูล PostgreSQL

เริ่มต้นโดยการสร้าง tRest_1 และ tLogRow2 และ Config ตามรูปภาพ โดยกำหนด URL และ api-key ตามรูปโดยให้ใส่ API Key ที่ท่านผู้อ่านได้มาจาก data.go.th ซึ่งจะต้องกด Edit schema ใน tREST_1 ให้มีส่วน Body และ ERROR_CODE ตามภาพ

การเพิ่ม tLogRow2 นั้นช่วยให้ภาพชัดเจนว่ามีข้อมูลไหนเข้ามาอย่างไรบ้าง

เนื่องจากข้อมูลที่ได้กลับมาจาก RESTful API นั้นจะมีส่วนของข้อมูลอื่นที่ไม่ใช้ส่วนข้อมูลที่เป็น record ที่เราจะจัดเก็บลงในฐานข้อมูลจึงจะต้องทำการ Extract JSON ออกมาให้อยู่ในรูปแบบ Column โดยให้ทำการ Config tExtractJSONFields_1 ตามภาพเพื่อทำการ Mapping ว่าจะได้ column อะไรออกมาบ้าง เมื่อกดดูที่ Edit schema ควรได้ fields list ตามภาพด้านล่างนี้


สำหรับ tLogRow3 นั้นเราใส่เพื่อให้เห็น fields ที่ออกจาก tExtractJSONFields_1 และจะไปเข้าที่ tMap_1ที่ต้อง Transform ข้อมูลเกี่ยวกับวันที่ “Notification date” และ “Announce Date”

เนื่องด้วยข้อมูล field วันที่ “Notification date” และ “Announce Date” ที่รับมาจาก JSON นั้นไม่สามารถจัดเก็บลงในระบบฐานข้อมูลได้ตรง ๆ เนื่องด้วยจะทำให้มีการจัดเก็บข้อมูลปีผิด จึงต้องใช้ tMap_1 เข้ามาช่วยโดยการใช้ฟังก์ชันเพื่อแก้ไขข้อมูลปี โดยการ Config ตามภาพ


และสุดท้ายคือการเพิ่ม tDBOutput_2 เพื่อเขียนข้อมูลลงไปจัดเก็บในฐานข้อมูล PostgreSQL โดยให้กรอกข้อมูลการเชื่อมต่อฐานข้อมูลให้สมบูรณ์ ส่วนการ Edit Schema ให้ระวังว่าได้ใส่รูปแบบวันที่ให้ถูกต้องตามภาพ

การตั้งค่า tDBOutput_2 เพื่อจัดเก็บข้อมูลลงฐานข้อมูล PostgreSQL นั้นท่านสามารถเลือกโหมดได้ตามรูปภาพ โดยแนะนำว่าให้เลือก “Drop table if exist and create”เนื่องด้วยข้อมูลเป็นข้อมูลที่มาแบบ Snapshot วันต่อวัน และในกรณีที่โครงสร้างของข้อมูลมีการเปลี่ยนแปลงแนะนำให้ใช้ “Create table if does not exist” ทั้งนี้ทั้งนั้นขึ้นอยู่กับลักษณข้อมูลที่ท่านจะนำ Talend Tool ไปใช้งาน

จากนั้นท่านสามารถกดรันได้ที่ “Run” ด้านล่าง Flow การทำงานตามรู้ภาพ

ตารางข้อมูลจะถูกสร้างในระบบฐานข้อมูล โดยที่ท่านสามารถดาวน์โหลดโปรแกรม เช่น DBeaver เพื่อใช้ Browse และ Query ข้อมูลจาก PostgreSQL ดังภาพ

การทำ ETL ด้วยการพัฒนาโปรแกรม Python
สำหรับตัวอย่างการพัฒนาโปรแกรม Python เพื่อดึงข้อมูลผ่าน Restful API ไปเก็บไว้ที่ระบบฐานข้อมูล PostgreSQL ท่านจะต้องติดตั้ง Python Library ที่จำเป็นเพื่อให้ตัวโปรแกรมสามารถเรียกประมวลผลได้ ได้แก่ sqlalchemy, tabulate, requests, pandas, json โดยใช้คำสั่ง pip install ตามด้วยชื่อ Library
ตัวอย่างโค้ดสำหรับการ Import Library ที่จำเป็น
import json
import datetime
import requests
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
from pandas.io.json import json_normalize
from tabulate import tabulate
ตัวอย่างโค้ดสำหรับฟังก์ชันการดึงข้อมูลผ่าน Restful API และจัดการ JSON ด้วย Pandas Library
def get_api_data():
start_time_func = datetime.datetime.now()
resp = requests.get('https://opend.data.go.th/get-ckan/datastore_search?resource_id=93f74e67-6f76-4b25-8f5d-b485083100b6&limit=1500',headers={'api-key': '< ระบุ API Key ของท่าน >'})
jdata = resp.json()
df = pd.json_normalize(jdata['result'],record_path =['records'])
df = df[['no', 'age', 'sex', 'nation', 'occ_new', 'Risk', 'District', 'Notification date', 'Announce Date']]
pd.set_option('expand_frame_repr', False)
pd.set_option('display.max_columns', 10)
print(tabulate(df.head(10), headers='keys', tablefmt='psql'))
end_time_func = datetime.datetime.now()
print('get_resful_data() {}. (used:{})'.format("https://opend.data.go.th",end_time_func - start_time_func))
return df
ตัวอย่างโค้ดสำหรับฟังก์ชันการเก็บข้อมูลที่ได้จาก Restful API ไปไว้ในฐานข้อมูล PostgreSQL และนับจำนวน record
def sync_to_db(dataframe,pg_table):
start_time_func = datetime.datetime.now()
tengine = 'postgresql://' + PG_USERNAME + ':' + PG_PASSWORD + '@' + PG_HOST + ':' + PG_PORT + '/' + PG_DB_NAME
engine = create_engine(tengine)
pd.set_option('display.max_columns', 30)
result = df.to_sql(pg_table, engine, if_exists='replace', chunksize=1000 , index = False ,
dtype={'no': sqlalchemy.types.Integer, #integer
'age': sqlalchemy.types.String(length=10),
'sex': sqlalchemy.types.String(length=10),
'nation': sqlalchemy.types.String(length=50),
'occ_new': sqlalchemy.types.String(length=50),
'province': sqlalchemy.types.String(length=50),
'risk': sqlalchemy.types.String(length=50),
'district': sqlalchemy.types.String(length=50),
'notification_date': sqlalchemy.DateTime(),
'announce_date': sqlalchemy.DateTime()})
print(result)
end_time_func = datetime.datetime.now()
inserted = pd.read_sql('SELECT count(1) from %s'%(pg_table), engine)
print('sync_to_db() {}:{} {} records. (used:{})'.format(PG_DB_NAME,
pg_table,inserted.iloc[0]['count'],end_time_func - start_time_func))
ตัวอย่างโค้ดสำหรับการกำหนดค่าการเชื่มต่อระบบฐานข้อมูลและการเรียกใช้ฟังก์ชันการดึงข้อมูลและเก็บข้อมูลเข้าระบบฐานข้อมูล
#PostgreSQL Configure
PG_DB_NAME = " < Your PG Database> "
PG_USERNAME = " < Your PG User> "
PG_PASSWORD = " < Your PG Password> "
PG_HOST = " < Your PG Host IP> "
PG_PORT = "5432"
PG_TNAME = 'daily_data'
#Main Program
df = get_api_data()
sync_to_db(df,PG_TNAME)
หลังจากที่ได้ข้อมูลอยู่ในระบบฐานข้อมูลแล้ว ไม่ว่าจะด้วย Python หรือ Talend Tool ก็สามารถที่จะใช้ Tool ด้าน BI มาดึงข้อมูลไปทำ Dashboard ได้ ซึ่งทั้ง Tableau และ PowerBI Tool นั้นรองรับการดึงข้อมูลจากฐานข้อมูล PostgreSQL เพื่อไปเป็นข้อมูลตั้งต้นในการสร้างกราฟต่าง ๆ และ Dashboard ซึ่ง หากท่านต้องการให้ข้อมูลสามารถ update ได้เองในทุกวันท่านสามารถตั้ง Scheduler ทั้งบน Linux และ Window ให้เรียกทำงาน Python ขึ้นมาดึงข้อข้อมูลตามเวลาที่กำหนด ส่วน Talend เวอร์ชันที่ใช้ได้ฟรีนั้นไม่สามารถตั้ง Scheduler ได้ แต่ก็มีข้อดีคือผู้พัฒนางาน ETL ไม่จำเป็นต้องมีทักษะในการเขียนโปรแกรมก็สามารถติดตั้งเครื่องมือนี้เพื่อใช้งานได้ซึ่งสามารถพัฒนาได้เร็วกว่าการไปเริ่มศึกษาการเขียนโปรแกรม ซึ่งจะต้องพิจารณาตามความเหมาะสมและข้อจำกัดในการเลือกใช้งาน

รูปด้านล่างแสดง feature ของ Talend – Open Studio for Data Integration ที่สามารถใช้งานได้ฟรี สามารถดูข้อมูลเพิ่มเติมได้ที่ Talend.com

จากกระบวนการ ETL ที่ได้ยกมาเป็นตัวอย่างนี้ ผู้เขียนหวังว่าผู้อ่านจะได้ทำความคุ้นเคยกับเครื่องมือต่าง ๆ ของ Data Engineer สำหรับกระบวนการนำเข้า จัดระเบียบ และแสดงผลข้อมูล โดยผู้อ่านสามารถนำไปใช้ประโยชน์ในองค์กรของผู้อ่านได้จริง และสามารถเริ่มต้นได้โดยไม่มีค่าใช้จ่าย
งานด้าน Data Engineer นั้นยังมีอีกหลายเรื่องที่เกี่ยวข้อง ซึ่งเราจะเขียนในบทความต่อ ๆ ไปเช่นเรื่อง Cloud Service, Big Data Architecture, Infrastructure, Data Privacy, Machine Learning Model Deployment, Data Catalog และอีกมากมายที่เกี่ยวข้อง ในบทความนี้เรายังไม่ได้กล่าวถึงเรื่อง Big Data เนื่องจากคณะผู้เขียนเล็งเห็นว่าการดึงข้อมูลที่เกี่ยวกับโรคระบาดโควิด-19 นั้นเป็นประเด็นเร่งด่วนที่จะต้องเผยแพร่วิธีสู่สาธารณะ คณะผู้จัดทำขอขอบคุณท่านผู้อ่านที่ได้ติดตามอ่านจนจบบทความนี้ ขอให้ท่านดูแลรักษาสุขภาพ มีสุขภาพที่ดี และรักษาระยะห่างระหว่างกัน (Social Distancing) ไม่ให้โควิด-19 มาทำอะไรท่านได้ แล้วพบกันใหม่ในบทความต่อไปครับ
เนื้อหาโดย กิตติศักดิ์ สะอาดเอี่ยม
ตรวจทานและปรับปรุงโดย ปพจน์ ธรรมเจริญพร