How to Write Python Code to Read a Hive Database


This article shares how to write Python code that reads data from a Hive database. The content is detailed and the logic is clear. Since many readers are probably not yet familiar with this topic, it is shared here for reference; hopefully you will take something away after reading it.

Code for reading a Hive database in an actual business project

import logging
import os
import time
import datetime
import pickle
import threading
from typing import Dict, List

import pandas as pd
import sqlalchemy
from sqlalchemy.orm import sessionmaker
from impala.dbapi import connect
from dateutil.relativedelta import relativedelta
class HiveHelper(object):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger: logging.Logger = None
    ):
        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.logger = logger
        # Connection URL used by create_table_code; the impala:// URL format
        # is an assumption, adjust it for your environment.
        self.connection_str = f'impala://{host}:{port}/{database}'
        self.impala_conn = None
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None

    def create_table_code(self, file_name):
        '''Generate SQLAlchemy model classes for the tables.'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn

    def get_conn(self):
        '''Create or reuse the SQLAlchemy connection.'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn

    def get_impala_conn(self):
        '''Create or reuse the impala DB-API connection.'''
        if self.impala_conn is None:
            self.impala_conn = connect(
                host=self.host,
                port=self.port,
                database=self.database,
                auth_mechanism=self.auth_mechanism,
                user=self.user,
                password=self.password
            )
        return self.impala_conn

    def get_engine(self):
        '''Create or reuse the SQLAlchemy engine backed by the impala connection.'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)
        return self.engine

    def get_cursor(self):
        '''Create or reuse a cursor.'''
        if self.cursor is None:
            self.cursor = self.get_conn().cursor()
        return self.cursor

    def get_session(self) -> sessionmaker:
        '''Create or reuse the ORM session.'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session

    def close_conn(self):
        '''Close the connection.'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()

    def close_impala_conn(self):
        '''Close the impala connection.'''
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None

    def close_session(self):
        '''Close the session.'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()

    def dispose_engine(self):
        '''Dispose of the engine.'''
        if self.engine is not None:
            # self.engine.dispose(close=False)
            self.engine.dispose()
            self.engine = None

    def close_cursor(self):
        '''Close the cursor.'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None

    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''Run a query and return the result as a DataFrame.'''
        conn = self.get_conn()
        data = None
        try:
            # Retry up to 3 times on error
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex  # re-raise to the caller
                    time.sleep(60)  # retry after one minute
        except Exception as ex:
            self.logger.exception(ex)
            raise ex  # re-raise to the caller
        finally:
            if auto_close:
                self.close_conn()
        return data
    pass
class VarsHelper():
    '''Persist a small key/value store to disk with pickle.'''
    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        return self.values[key]

    def has_key(self, key):
        return key in self.values.keys()

    def save_file(self):
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
    pass
class GlobalShareArgs():
    args = {
        "debug": False
    }

    @staticmethod
    def get_args():
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        return key in GlobalShareArgs.args.keys()

    @staticmethod
    def update(args):
        GlobalShareArgs.args.update(args)
    pass
class ShareArgs():
    args = {
        "labels_dir": "./hjx/shop_group/month_w_amt/data/labels",  # label directory
        "labels_output_dir": "./hjx/shop_group/month_w_amt/data/labels_output",  # directory for labels exported by clustering
        "common_datas_dir": "./hjx/data",  # shared data directory (common ur_bi_dw data)
        "only_predict": False,  # predict only, no training
        "delete_model": True,  # delete the model first; only used during training
        "export_excel": False,  # export to Excel
        "classes": 12,  # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        return key in ShareArgs.args.keys()

    @staticmethod
    def update(args):
        ShareArgs.args.update(args)
    pass
class UrBiGetDatasBase():
    # Thread locks, one per save path (the same path shares one lock)
    lock_dict: Dict[str, threading.Lock] = {}
    # Timestamps used to decide whether the cached data has expired
    time_dict: Dict[str, datetime.datetime] = {}
    # Records whether the timeout timestamp needs to be refreshed
    get_data_timeout_dict: Dict[str, bool] = {}

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir=None,
        logger: logging.Logger = None,
    ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger
        )
        # Create the save directory if needed
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas')

    def close(self):
        '''Close the connection.'''
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        '''Return True if the cached data for key_name has timed out.'''
        # Use the absolute path so the key is unique
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
        timeout = 12  # hours
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24  # hours
        get_data_timeout = False
        if key_name not in UrBiGetDatasBase.time_dict.keys() \
                or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds() > (timeout * 60 * 60):
            self.logger.info('Timed out after %d hours, re-querying data: %s', timeout, key_name)
            # UrBiGetDatasBase.time_list[key_name] = datetime.datetime.today()
            get_data_timeout = True
        else:
            self.logger.info('Not yet timed out (%d hours), skipping query: %s', timeout, key_name)
        # if self.vars_helper is not None:
        #     self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_list)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        '''Refresh the timeout timestamp.'''
        # Use the absolute path so the key is unique
        key_name = os.path.abspath(key_name)
        if UrBiGetDatasBase.get_data_timeout_dict[key_name]:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:
        '''Get the lock for a save path.'''
        # Use the absolute path so the key is unique
        key_name = os.path.abspath(key_name)
        if key_name not in UrBiGetDatasBase.lock_dict.keys():
            UrBiGetDatasBase.lock_dict[key_name] = threading.Lock()
        return UrBiGetDatasBase.lock_dict[key_name]

    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns: List[str],
        del_index_list=[-1],  # indexes of the trailing files to delete
        start_date=datetime.datetime(2017, 1, 1),  # start date
        offset=relativedelta(months=3),  # interval per query
        date_format_fun=lambda d: '%04d%02d01' % (d.year, d.month),  # formats the date placeholders in the SQL
        filename_format_fun=lambda d: '%04d%02d.csv' % (d.year, d.month),  # formats the output file name
        stop_date='20700101',  # stop once the date exceeds this value
        data_format_fun=None,  # optional post-processing of the data
    ):
        '''Read data incrementally, one date range at a time.'''
        # Create the output folder
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            # Delete the most recent file(s) so they are re-fetched
            file_list = os.listdir(save_dir)
            if len(file_list) > 0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir, file_list[del_index]))
                    print('Deleted file:', file_list[del_index])
        select_index = -1
        # start_date = datetime.datetime(2017, 1, 1)
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            # self.logger.info('file_path: %s', file_path)
            if not os.path.exists(file_path):
                data: pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                # self.logger.info('data: %d', data.columns)
                if len(data) > 0:
                    select_index += 1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    # Sort before saving
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index != -1:
                    break
                elif stop_date < start_date_str:
                    raise Exception('Error reading data: the date exceeded the maximum value!')
            start_date = end_date
    pass
class UrBiGetDatas(UrBiGetDatasBase):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_dw_data',
        logger: logging.Logger = None
    ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
        )

    def get_dim_date(self):
        '''Date dimension data.'''
        file_path = os.path.join(self.save_dir, 'ur_bi_dw.dim_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # acquire the lock
        try:
            # Only re-query once the timeout has elapsed
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_date'
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c: 'dim_date.' + c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_date.date_key'])
            data.to_csv(file_path)
            # Refresh the timeout timestamp
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex  # re-raise to the caller
        finally:
            now_lock.release()  # release the lock

    def get_dim_shop(self):
        '''Shop dimension data.'''
        file_path = os.path.join(self.save_dir, 'ur_bi_dw.dim_shop.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # acquire the lock
        try:
            # Only re-query once the timeout has elapsed
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_shop'
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c: 'dim_shop.' + c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_shop.shop_no'])
            data.to_csv(file_path)
            # Refresh the timeout timestamp
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex  # re-raise to the caller
        finally:
            now_lock.release()  # release the lock

    def get_dim_vip(self):
        '''Member (VIP) data.'''
        sub_dir = os.path.join(self.save_dir, 'vip_no')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire()  # acquire the lock
        try:
            # Only re-query once the timeout has elapsed
            if not self.get_last_time(sub_dir):
                return
            sql = '''SELECT dv.*, dd.date_key, dd.date_name2
                FROM ur_bi_dw.dim_vip as dv
                INNER JOIN ur_bi_dw.dim_date as dd
                ON dv.card_create_date=dd.date_name2
                where dd.date_key >= %s
                and dd.date_key < %s'''
            # data: pd.DataFrame = self.db_helper.get_data(sql)
            sort_columns = ['dv.vip_no']
            # TODO:
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1),  # start date
                offset=relativedelta(years=1)
            )
            # Refresh the timeout timestamp
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex  # re-raise to the caller
        finally:
            now_lock.release()  # release the lock

    def get_weather(self):
        '''Weather data.'''
        sub_dir = os.path.join(self.save_dir, 'weather')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire()  # acquire the lock
        try:
            # Only re-query once the timeout has elapsed
            if not self.get_last_time(sub_dir):
                return
            sql = """
                select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
                where weather.date_key>=%s and weather.date_key<%s
            """
            sort_columns = ['weather.date_key', 'weather.areaid']

            def data_format_fun(data):
                columns = list(data.columns)
                columns = {c: 'weather.' + c for c in columns}
                data = data.rename(columns=columns)
                return data

            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                del_index_list=[-2, -1],  # delete the last two files
                data_format_fun=data_format_fun,
            )
            # Refresh the timeout timestamp
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex  # re-raise to the caller
        finally:
            now_lock.release()  # release the lock
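
For reference, here is a minimal usage sketch of the classes above. It assumes the snippet lives in the same module as the listing, that the third-party packages the listing imports (pandas, SQLAlchemy, impyla, python-dateutil) are installed, and that the hard-coded Impala/Hive connection parameters are reachable from your environment; in practice you would pass in your own host, credentials, and save_dir. The logger setup is only an illustration.

import logging

# Basic logger; the classes above expect a logging.Logger instance
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('ur_bi_get_datas')

# Optional: debug mode makes UrBiGetDatasBase persist its timeout state via VarsHelper
GlobalShareArgs.set_args_value('debug', False)

# Ad-hoc query through the low-level helper (connection defaults come from the listing)
hive = HiveHelper(logger=logger)
df = hive.get_data('SELECT * FROM ur_bi_dw.dim_date LIMIT 10')
print(df.head())

# Cached, incremental exports through the high-level helper
fetcher = UrBiGetDatas(save_dir='./hjx/data/ur_bi_dw_data', logger=logger)
try:
    fetcher.get_dim_date()   # full table, written to ur_bi_dw.dim_date.csv
    fetcher.get_dim_shop()   # full table, written to ur_bi_dw.dim_shop.csv
    fetcher.get_dim_vip()    # incremental by year, written under vip_no/
    fetcher.get_weather()    # incremental, written under weather/
finally:
    fetcher.close()

Note the design choice in HiveHelper: the SQLAlchemy engine is created with creator=get_impala_conn, so pd.read_sql runs through impyla's DB-API connection while SQLAlchemy manages the connection object, and each get_data call retries up to three times and closes the connection afterwards when auto_close=True.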

That concludes the detailed walkthrough of how to write Python code to read a Hive database. For more material on this topic, please follow other related articles on 九品源码!