手把手教你搭建一個 Python 連接數據庫,快速取數工具
大家好,我是安果!
在數據生產應用部門,取數分析是一個很常見的需求,實際上業務人員需求時刻變化,最高效的方式是讓業務部門自己來取,減少不必要的重複勞動,一般情況下,業務部門數據庫表結構一般是固定的,根據實際業務將取數需求做成 sql 腳本,快速完成數據獲取 --- 授人以漁的方式,提供平臺或工具
那如何實現一個自助取數查詢工具?
基於底層數據來開發不難,無非是將用戶輸入變量作爲篩選條件,將參數映射到 sql 語句,並生成一個 sql 語句然後再去數據庫執行
最後再利用 QT 開發一個 GUI 界面,用戶界面的點擊和篩選條件,信號觸發對應按鈕與綁定的傳參槽函數執行
具體思路:
一、數據庫連接類
此處利用 pandas 讀寫操作 oracle 數據庫
二、主函數模塊
1)輸入參數模塊,外部輸入條件參數,建立數據庫關鍵字段映射
-- 注:讀取外部 txt 文件,將篩選字段可能需要進行鍵值對轉換
2)sql 語句集合模塊,將待執行的業務 sql 語句統一存放到這裏
3)數據處理函數工廠
4)使用多線程提取數據
一、數據庫連接類
cx_Oracle 是一個 Python 擴展模塊,相當於 python 的 Oracle 數據庫的驅動,通過使用所有數據庫訪問模塊通用的數據庫 API 來實現 Oracle 數據庫的查詢和更新
Pandas 是基於 NumPy 開發,爲了解決數據分析任務的模塊,Pandas 引入了大量庫和一些標準的數據模型,提供了高效地操作大型數據集所需的方法類和函數
pandas 調用數據庫主要有 read_sql_table,read_sql_query,read_sql 三種方式
本文主要介紹一下 Pandas 中 read_sql_query 方法的使用
1:pd.read_sql_query()
讀取自定義數據,返還DataFrame格式,通過SQL查詢腳本包括增刪改查。
pd.read_sql_query(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None,chunksize=None)
sql:要執行的sql腳本,文本類型
con:數據庫連接
index_col:選擇返回結果集索引的列,文本/文本列表
coerce_float:非常有用,將數字形式的字符串直接以float型讀入
parse_dates:將某一列日期型字符串轉換爲datetime型數據,與pd.to_datetime函數功能類似。
params:向sql腳本中傳入的參數,官方類型有列表,元組和字典。用於傳遞參數的語法是數據庫驅動程序相關的。
chunksize:如果提供了一個整數值,那麼就會返回一個generator,每次輸出的行數就是提供的值的大小
read_sql_query()中可以接受SQL語句,DELETE,INSERT INTO、UPDATE操作沒有返回值(但是會在數據庫中執行),程序會拋出SourceCodeCloseError,並終止程序。SELECT會返回結果。如果想繼續運行,可以try捕捉此異常。
2:pd.read_sql_table()
讀取數據庫中的表,返還DataFrame格式(通過表名)
import pandas as pd
pd.read_sql_table(table_name, con, schema=None,index_col=None, coerce_float=True, parse_dates=None, columns=None,chunksize=None)
3:pd.read_sql()
讀數據庫通過SQL腳本或者表名
import pandas as pd
pd.read_sql(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None)
以下創建連接 oracel 數據庫的連接類 Oracle_DB
主要提供 2 種操作數據的函數方法。
import cx_Oracle
# Pandas讀寫操作Oracle數據庫
import pandas as pd
# 避免編碼問題帶來的亂碼
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
class Oracle_DB(object):
def __init__(self):
try:
# 連接oracle
# 方法1:sqlalchemy 提供的create_engine()
# from sqlalchemy import create_engine
# engine = create_engine('oracle+cx_oracle://username:password@ip:1521/ORCL')
# #方法2:cx_Oracle.connect()
self.engine = cx_Oracle.connect('username', 'password', 'ip:1521/database')
except cx_Oracle.Error as e:
print("Error %d:%s" % (e.args[0], e.args[1]))
exit()
# 查詢部分信息
def search_one(self, sql,sparm):
try:
# #查詢獲取數據用sql語句
# 代傳參數:sparm--查詢指定字段參數
df = pd.read_sql_query(sql, self.engine,params=sparm)
self.engine.close()
except Exception as e:
return "Error " + e.args[0]
return df
# 查詢全部信息
def search_all(self, sql):
try:
# #查詢獲取數據用sql語句
df = pd.read_sql_query(sql, self.engine)
self.engine.close()
except Exception as e:
return "Error " + e.args[0]
return df
二、數據提取主函數模塊
cx_Oracle 是一個 Python 擴展模塊,相當於 python 的 Oracle 數據庫的驅動,通過使用所有數據庫訪問模塊通用的數據庫 API 來實現 Oracle 數據庫的查詢和更新。
1)外部輸入參數模塊
txt 文本中,就包含一列數據,第一行列名,讀取的時候忽略第一行
#建立ID——編號字典
def buildid():
sqlid = """select * from b_build_info"""
db = Oracle_DB() # 實例化一個對象
b_build_info = db.search_all(sqlid)
ID_bUILDCODE = b_build_info.set_index("BUILDCODE")["ID"].to_dict()
return ID_bUILDCODE
#通過文本傳入待導出數據清單
def read_task_list():
build_code=buildid()
tasklist=[]
is_first_line=True
with open("./b_lst.txt") as lst:
for line in lst:
if is_first_line:
is_first_line=False
continue
tasklist.append(build_code.get(line.strip('\n'))) #鍵值對轉換
return tasklist
2)業務 sql 語句集合
注意 in 後面 {0} 不要加引號,這裏傳入爲元組,params 參數傳入 sparm
= {'Start_time':'2021-04-01','End_time':'2021-05-01'},此處參數可根據需要改變
def sql_d(lst):
# 逐月數據
sql_d_energy_item_month = """select * from d_energy_item_month
where recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
and recorddate < to_date(:End_time, 'yyyy-MM-dd')
and buildid in {0}
order by recorddate asc""".format(lst)
# 逐月數據
sql_d_energy_month = """select d.*,t.name from d_energy_month d join t_device_info t on d.branchid = t.id
where d.recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
and d.recorddate < to_date(:End_time, 'yyyy-MM-dd')
and d.buildid = '{0}'
order by d.recorddate asc""".format(lst)
# 查詢當日數據
sql_energy_item_hour_cheak = """select * from d_energy_item_hour
where trunc(sysdate)=trunc(recorddate)
order by recorddate asc""".format(lst)
sql_collection = [sql_d_energy_item_month, sql_d_energy_item_day, sql_d_energy_item_hour, sql_d_energy_month,
sql_d_energy_day, sql_d_energy_hour, sql_energy_hour_cheak]
#此處省略部分sql語句
return sql_collection
3)業務數據處理
業務數據處理流程,原始數據後處理,這裏不作介紹:
def db_extranction(lst,sparm,sql_type):
"""sql_type--輸入需要操作的sql業務序號"""
sql_=sql_d(lst)[sql_type] #輸出sql語句
db = Oracle_DB() # 實例化一個對象
res=db.search_one(sql_,sparm)
# 數據處理加工
RES=Data_item_factory(res) #此處省略
# res = db.search_all(sql_d_energy_item_month)
print(RES)
return RES
多線程提取數據部分,這裏 tasklist 列表多線程提取數據
import threading
# Pandas讀寫操作Oracle數據庫
from tools.Data_Update_oracle import Oracle_DB
import pandas as pd
from concurrent import futures
if __name__ == '__main__':
#外部傳入
tasklist= read_task_list()
print(tasklist)
# 輸入時間查找範圍參數,可手動修改
sparm = {'Start_time':'2021-04-01','End_time':'2021-05-01'}
lst = tuple(list(tasklist))
#業務類型序號,可手動修改
sql_type=0
#全部提取
db_extranction(lst,sparm,sql_type)
#多線程按字段分批提取
方法一:使用threading模塊的Thread類的構造器創建線程
#threads=[threading.Thread(target=db_extranction,args=(lst,sparm,sql_type)) for lst in tasklist]
# [threads[i].start() for i in range(len(threads))]
方法二:使用python的concurrent庫,這是官方基於 threading 封裝,先安裝該庫
# with futures.ThreadPoolExecutor(len(tasklist)) as executor:
# executor.map([db_extranction(lst,sparm,sql_type) for lst in tasklist],tasklist)
到此整個數據庫取數工具開發流程介紹完畢,就差最後一步分享給小夥伴使用了,做成 GUI 應用此處不做詳細介紹,構建獨立的 python 環境,快速發佈你的應用
本文由 Readfog 進行 AMP 轉碼,版權歸原作者所有。
來源:https://mp.weixin.qq.com/s/uUHECcIqbMuImGsGg7IsXQ