import logging import pandas as pd from impala.dbapi import connect import sqlalchemy from sqlalchemy.orm import sessionmaker import os import time import os import datetime from dateutil.relativedelta import relativedelta from typing import Dict, List import logging import threading import pandas as pd import pickle class HiveHelper(object): def __init__( self, host='10.2.32.22', port=21051, database='ur_ai_dw', auth_mechanism='LDAP', user='urbi', password='Ur#730xd', logger:logging.Logger=None ): self.host = host self.port = port self.database = database self.auth_mechanism = auth_mechanism self.user = user self.password = password self.logger = logger self.impala_conn = None self.conn = None self.cursor = None self.engine = None self.session = None def create_table_code(self, file_name): '''创建表类代码''' os.system(f'sqlacodegen {self.connection_str} > {file_name}') return self.conn def get_conn(self): '''创建连接或获取连接''' if self.conn is None: engine = self.get_engine() self.conn = engine.connect() return self.conn def get_impala_conn(self): '''创建连接或获取连接''' if self.impala_conn is None: self.impala_conn = connect( host=self.host, port=self.port, database=self.database, auth_mechanism=self.auth_mechanism, user=self.user, password=self.password ) return self.impala_conn def get_engine(self): '''创建连接或获取连接''' if self.engine is None: self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn) return self.engine def get_cursor(self): '''创建连接或获取连接''' if self.cursor is None: self.cursor = self.conn.cursor() return self.cursor def get_session(self) -> sessionmaker: '''创建连接或获取连接''' if self.session is None: engine = self.get_engine() Session = sessionmaker(bind=engine) self.session = Session() return self.session def close_conn(self): '''关闭连接''' if self.conn is not None: self.conn.close() self.conn = None self.dispose_engine() self.close_impala_conn() def close_impala_conn(self): '''关闭impala连接''' if self.impala_conn is not None: self.impala_conn.close() self.impala_conn = None def close_session(self): '''关闭连接''' if self.session is not None: self.session.close() self.session = None self.dispose_engine() def dispose_engine(self): '''释放engine''' if self.engine is not None: # self.engine.dispose(close=False) self.engine.dispose() self.engine = None def close_cursor(self): '''关闭cursor''' if self.cursor is not None: self.cursor.close() self.cursor = None def get_data(self, sql, auto_close=True) -> pd.DataFrame: '''查询数据''' conn = self.get_conn() data = None try: # 异常重试3次 for i in range(3): try: data = pd.read_sql(sql, conn) break except Exception as ex: if i == 2: raise ex # 往外抛出异常 time.sleep(60) # 一分钟后重试 except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: if auto_close: self.close_conn() return data pass class VarsHelper(): def __init__(self, save_dir, auto_save=True): self.save_dir = save_dir self.auto_save = auto_save self.values = {} if not os.path.exists(os.path.dirname(self.save_dir)): os.makedirs(os.path.dirname(self.save_dir)) if os.path.exists(self.save_dir): with open(self.save_dir, 'rb') as f: self.values = pickle.load(f) f.close() def set_value(self, key, value): self.values[key] = value if self.auto_save: self.save_file() def get_value(self, key): return self.values[key] def has_key(self, key): return key in self.values.keys() def save_file(self): with open(self.save_dir, 'wb') as f: pickle.dump(self.values, f) f.close() pass class GlobalShareArgs(): args = { "debug": False } def get_args(): return GlobalShareArgs.args def set_args(args): GlobalShareArgs.args = args def set_args_value(key, value): GlobalShareArgs.args[key] = value def get_args_value(key, default_value=None): return GlobalShareArgs.args.get(key, default_value) def contain_key(key): return key in GlobalShareArgs.args.keys() def update(args): GlobalShareArgs.args.update(args) pass class ShareArgs(): args = { "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录 "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录 "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共 "only_predict": False, # 只识别,不训练 "delete_model": True, # 先删除模型,仅在训练时使用 "export_excel": False, # 导出excel "classes": 12, # 聚类数 "batch_size": 16, "hidden_size": 32, "max_nrof_epochs": 100, "learning_rate": 0.0005, "loss_type": "categorical_crossentropy", "avg_model_num": 10, "steps_per_epoch": 4.0, # 4.0 "lr_callback_patience": 4, "lr_callback_cooldown": 1, "early_stopping_callback_patience": 6, "get_data": True, } def get_args(): return ShareArgs.args def set_args(args): ShareArgs.args = args def set_args_value(key, value): ShareArgs.args[key] = value def get_args_value(key, default_value=None): return ShareArgs.args.get(key, default_value) def contain_key(key): return key in ShareArgs.args.keys() def update(args): ShareArgs.args.update(args) pass class UrBiGetDatasBase(): # 线程锁列表,同保存路径共用锁 lock_dict:Dict[str, threading.Lock] = {} # 时间列表,用于判断是否超时 time_dict:Dict[str, datetime.datetime] = {} # 用于记录是否需要更新超时时间 get_data_timeout_dict:Dict[str, bool] = {} def __init__( self, host='10.2.32.22', port=21051, database='ur_ai_dw', auth_mechanism='LDAP', user='urbi', password='Ur#730xd', save_dir=None, logger:logging.Logger=None, ): self.save_dir = save_dir self.logger = logger self.db_helper = HiveHelper( host=host, port=port, database=database, auth_mechanism=auth_mechanism, user=user, password=password, logger=logger ) # 创建子目录 if self.save_dir is not None and not os.path.exists(self.save_dir): os.makedirs(self.save_dir) self.vars_helper = None if GlobalShareArgs.get_args_value('debug'): self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas') def close(self): '''关闭连接''' self.db_helper.close_conn() def get_last_time(self, key_name) -> bool: '''获取是否超时''' # 转静态路径,确保唯一性 key_name = os.path.abspath(key_name) if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'): UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list') timeout = 12 # 12小时 if GlobalShareArgs.get_args_value('debug'): timeout = 24 # 24小时 get_data_timeout = False if key_name not in UrBiGetDatasBase.time_dict.keys() or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds()>(timeout*60*60): self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name) # UrBiGetDatasBase.time_list[key_name] = datetime.datetime.today() get_data_timeout = True else: self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name) # if self.vars_helper is not None : # self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_list) UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout return get_data_timeout def save_last_time(self, key_name): '''更新状态超时''' # 转静态路径,确保唯一性 key_name = os.path.abspath(key_name) if UrBiGetDatasBase.get_data_timeout_dict[key_name]: UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today() if self.vars_helper is not None : UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today() self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict) def get_lock(self, key_name) -> threading.Lock: '''获取锁''' # 转静态路径,确保唯一性 key_name = os.path.abspath(key_name) if key_name not in UrBiGetDatasBase.lock_dict.keys(): UrBiGetDatasBase.lock_dict[key_name] = threading.Lock() return UrBiGetDatasBase.lock_dict[key_name] def get_data_of_date( self, save_dir, sql, sort_columns:List[str], del_index_list=[-1], # 删除最后下标 start_date = datetime.datetime(2017, 1, 1), # 开始时间 offset = relativedelta(months=3), # 时间间隔 date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化 filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化 stop_date = '20700101', # 超过时间则停止 data_format_fun = None, # 格式化数据 ): '''分时间增量读取数据''' # 创建文件夹 if not os.path.exists(save_dir): os.makedirs(save_dir) else: #删除最后一个文件 file_list = os.listdir(save_dir) if len(file_list)>0: file_list.sort() for del_index in del_index_list: os.remove(os.path.join(save_dir,file_list[del_index])) print('删除最后一个文件:', file_list[del_index]) select_index = -1 # start_date = datetime.datetime(2017, 1, 1) while True: end_date = start_date + offset start_date_str = date_format_fun(start_date) end_date_str = date_format_fun(end_date) self.logger.info('date: %s-%s', start_date_str, end_date_str) file_path = os.path.join(save_dir, filename_format_fun(start_date)) # self.logger.info('file_path: %s', file_path) if not os.path.exists(file_path): data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str)) if data is None: break self.logger.info('data: %d', len(data)) # self.logger.info('data: %d', data.columns) if len(data)>0: select_index+=1 if data_format_fun is not None: data = data_format_fun(data) # 排序 data = data.sort_values(sort_columns) data.to_csv(file_path) elif select_index!=-1: break elif stop_date < start_date_str: raise Exception("读取数据异常,时间超出最大值!") start_date = end_date pass class UrBiGetDatas(UrBiGetDatasBase): def __init__( self, host='10.2.32.22', port=21051, database='ur_ai_dw', auth_mechanism='LDAP', user='urbi', password='Ur#730xd', save_dir='./hjx/data/ur_bi_dw_data', logger:logging.Logger=None ): self.save_dir = save_dir self.logger = logger super().__init__( host=host, port=port, database=database, auth_mechanism=auth_mechanism, user=user, password=password, save_dir=save_dir, logger=logger ) def get_dim_date(self): '''日期数据''' file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_date.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = 'SELECT * FROM ur_bi_dw.dim_date' data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:'dim_date.'+c for c in columns} data = data.rename(columns=columns) data = data.sort_values(['dim_date.date_key']) data.to_csv(file_path) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dim_shop(self): '''店铺数据''' file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_shop.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = 'SELECT * FROM ur_bi_dw.dim_shop' data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:'dim_shop.'+c for c in columns} data = data.rename(columns=columns) data = data.sort_values(['dim_shop.shop_no']) data.to_csv(file_path) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dim_vip(self): '''会员数据''' sub_dir = os.path.join(self.save_dir,'vip_no') now_lock = self.get_lock(sub_dir) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(sub_dir): return sql = '''SELECT dv.*, dd.date_key, dd.date_name2 FROM ur_bi_dw.dim_vip as dv INNER JOIN ur_bi_dw.dim_date as dd ON dv.card_create_date=dd.date_name2 where dd.date_key >= %s and dd.date_key < %s''' # data:pd.DataFrame = self.db_helper.get_data(sql) sort_columns = ['dv.vip_no'] # TODO: self.get_data_of_date( save_dir=sub_dir, sql=sql, sort_columns=sort_columns, start_date=datetime.datetime(2017, 1, 1), # 开始时间 offset=relativedelta(years=1) ) # 更新超时时间 self.save_last_time(sub_dir) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_weather(self): '''天气数据''' sub_dir = os.path.join(self.save_dir,'weather') now_lock = self.get_lock(sub_dir) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(sub_dir): return sql = """ select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather where weather.date_key>=%s and weather.date_key<%s """ sort_columns = ['weather.date_key','weather.areaid'] def data_format_fun(data): columns = list(data.columns) columns = {c:'weather.'+c for c in columns} data = data.rename(columns=columns) return data self.get_data_of_date( save_dir=sub_dir, sql=sql, sort_columns=sort_columns, del_index_list=[-2, -1], # 删除最后下标 data_format_fun=data_format_fun, ) # 更新超时时间 self.save_last_time(sub_dir) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_weather_city(self): '''天气城市数据''' file_path = os.path.join(self.save_dir,'ur_bi_dw.weather_city.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = 'SELECT * FROM ur_bi_dw.dim_weather_city as weather_city' data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:'weather_city.'+c for c in columns} data = data.rename(columns=columns) data.to_csv(file_path) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dim_goods(self): '''货品数据''' file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = 'SELECT * FROM ur_bi_dw.dim_goods' data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:'dim_goods.'+c for c in columns} data = data.rename(columns=columns) data.to_csv(file_path) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dim_goods_market_shop_date(self): '''店铺商品生命周期数据''' file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_shop_date.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date' sql = ''' select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days FROM ur_bi_dw.dim_goods_market_shop_date where lifecycle_end_date is not null ''' data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:c.replace('lifecycle_end_date.','') for c in columns} data = data.rename(columns=columns) data = data.sort_values(['shop_market_date']) data.to_csv(file_path, index=False) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dim_goods_market_date(self): '''全国商品生命周期数据''' file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_date.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = ''' select * FROM ur_bi_dw.dim_goods_market_date ''' data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:'dim_goods_market_date.'+c for c in columns} data = data.rename(columns=columns) data = data.sort_values(['dim_goods_market_date.sku_no']) data.to_csv(file_path, index=False) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dim_goods_color_dev_sizes(self): '''商品开发码数数据''' file_path = os.path.join(self.save_dir,'dim_goods_color_dev_sizes.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date' sql = 'SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes' data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:c.replace('dim_goods_color_dev_sizes.','') for c in columns} data = data.rename(columns=columns) data.to_csv(file_path, index=False) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dwd_daily_sales_size(self): '''实际销售金额''' sub_dir = os.path.join(self.save_dir,'dwd_daily_sales_size_all') now_lock = self.get_lock(sub_dir) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(sub_dir): return sql = """ select shop_no,sku_no,date_key,`size`, sum(tag_price) as `tag_price`, sum(sales_qty) as `sales_qty`, sum(sales_tag_amt) as `sales_tag_amt`, sum(sales_amt) as `sales_amt`, count(0) as `sales_count` from ur_bi_dw.dwd_daily_sales_size as sales where sales.date_key>=%s and sales.date_key<%s and sales.currency_code='CNY' group by shop_no,sku_no,date_key,`size` """ sort_columns = ['date_key','shop_no','sku_no'] self.get_data_of_date( save_dir=sub_dir, sql=sql, sort_columns=sort_columns, start_date=datetime.datetime(2017, 1, 1), # 开始时间 ) # 更新超时时间 self.save_last_time(sub_dir) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dwd_daily_delivery_size(self): '''实际配货金额''' sub_dir = os.path.join(self.save_dir,'dwd_daily_delivery_size_all') now_lock = self.get_lock(sub_dir) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(sub_dir): return sql = """ select shop_no,sku_no,date_key,`size`, sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`, sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`, sum(delivery.online_distr_received_qty) as `online_distr_received_qty`, sum(delivery.online_distr_received_amt) as `online_distr_received_amt`, sum(delivery.pr_received_qty) as `pr_received_qty`, count(0) as `delivery_count` from ur_bi_dw.dwd_daily_delivery_size as delivery where delivery.date_key>=%s and delivery.date_key<%s and delivery.currency_code='CNY' group by shop_no,sku_no,date_key,`size` """ sort_columns = ['date_key','shop_no','sku_no'] self.get_data_of_date( save_dir=sub_dir, sql=sql, sort_columns=sort_columns, start_date=datetime.datetime(2017, 1, 1), # 开始时间 ) # 更新超时时间 self.save_last_time(sub_dir) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_v_last_nation_sales_status(self): '''商品畅滞销数据''' file_path = os.path.join(self.save_dir,'v_last_nation_sales_status.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = 'SELECT * FROM ur_bi_dw.v_last_nation_sales_status' data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:c.replace('v_last_nation_sales_status.','') for c in columns} data = data.rename(columns=columns) data.to_csv(file_path, index=False) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dwd_daily_finacial_goods(self): '''商品成本价数据''' file_path = os.path.join(self.save_dir,'dwd_daily_finacial_goods.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = """ select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1 inner join ( select sku_no,`size`,max(date_key) as date_key from ur_bi_dw.dwd_daily_finacial_goods where currency_code='CNY' and country_code='CN' group by sku_no,`size` ) as t2 on t2.sku_no=t1.sku_no and t2.`size`=t1.`size` and t2.date_key=t1.date_key where t1.currency_code='CNY' and t1.country_code='CN' """ data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:c.replace('t1.','') for c in columns} data = data.rename(columns=columns) data.to_csv(file_path, index=False) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_dim_size_group(self): '''尺码映射数据''' file_path = os.path.join(self.save_dir,'dim_size_group.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = """select * from ur_bi_dw.dim_size_group""" data:pd.DataFrame = self.db_helper.get_data(sql) columns = list(data.columns) columns = {c:c.replace('dim_size_group.','') for c in columns} data = data.rename(columns=columns) data.to_csv(file_path, index=False) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 pass def get_common_datas( host='10.2.32.22', port=21051, database='ur_ai_dw', auth_mechanism='LDAP', user='urbi', password='Ur#730xd', logger:logging.Logger=None): # 共用文件 common_datas_dir = ShareArgs.get_args_value('common_datas_dir') common_ur_bi_dir = os.path.join(common_datas_dir, 'ur_bi_data') ur_bi_get_datas = UrBiGetDatas( host=host, port=port, database=database, auth_mechanism=auth_mechanism, user=user, password=password, save_dir=common_ur_bi_dir, logger=logger ) try: logger.info('正在查询日期数据...') ur_bi_get_datas.get_dim_date() logger.info('查询日期数据完成!') logger.info('正在查询店铺数据...') ur_bi_get_datas.get_dim_shop() logger.info('查询店铺数据完成!') logger.info('正在查询天气数据...') ur_bi_get_datas.get_weather() logger.info('查询天气数据完成!') logger.info('正在查询天气城市数据...') ur_bi_get_datas.get_weather_city() logger.info('查询天气城市数据完成!') logger.info('正在查询货品数据...') ur_bi_get_datas.get_dim_goods() logger.info('查询货品数据完成!') logger.info('正在查询实际销量数据...') ur_bi_get_datas.get_dwd_daily_sales_size() logger.info('查询实际销量数据完成!') except Exception as ex: logger.exception(ex) raise ex # 往外抛出异常 finally: ur_bi_get_datas.close() pass class CustomUrBiGetDatas(UrBiGetDatasBase): def __init__( self, host='10.2.32.22', port=21051, database='ur_ai_dw', auth_mechanism='LDAP', user='urbi', password='Ur#730xd', save_dir='./hjx/data/ur_bi_data', logger:logging.Logger=None ): self.save_dir = save_dir self.logger = logger super().__init__( host=host, port=port, database=database, auth_mechanism=auth_mechanism, user=user, password=password, save_dir=save_dir, logger=logger ) def get_sales_goal_amt(self): '''销售目标金额''' file_path = os.path.join(self.save_dir,'month_of_year_sales_goal_amt.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = ''' select sales_goal.shop_no, if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`, dates.month_of_year, sum(sales_goal.sales_goal_amt) as sales_goal_amt from ur_bi_dw.dwd_sales_goal_west as sales_goal inner join ur_bi_dw.dim_date as dates on sales_goal.date_key = dates.date_key group by sales_goal.shop_no, if(sales_goal.serial='Y','W',sales_goal.serial), dates.month_of_year ''' data:pd.DataFrame = self.db_helper.get_data(sql) data = data.rename(columns={ 'shop_no':'sales_goal.shop_no', 'serial':'sales_goal.serial', 'month_of_year':'dates.month_of_year', }) # 排序 data = data.sort_values(['sales_goal.shop_no','sales_goal.serial','dates.month_of_year']) data.to_csv(file_path) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 def get_shop_serial_area(self): '''店-系列面积''' file_path = os.path.join(self.save_dir,'shop_serial_area.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 if not self.get_last_time(file_path): return sql = ''' select shop_serial_area.shop_no, if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`, shop_serial_area.month_of_year, sum(shop_serial_area.area) as `shop_serial_area.area` from ur_bi_dw.dwd_shop_serial_area as shop_serial_area where shop_serial_area.area is not null group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year ''' data:pd.DataFrame = self.db_helper.get_data(sql) data = data.rename(columns={ 'shop_no':'shop_serial_area.shop_no', 'serial':'shop_serial_area.serial', 'month_of_year':'shop_serial_area.month_of_year', 'area':'shop_serial_area.area', }) # 排序 data = data.sort_values(['shop_serial_area.shop_no','shop_serial_area.serial','shop_serial_area.month_of_year']) data.to_csv(file_path) # 更新超时时间 self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 pass def get_datas( host='10.2.32.22', port=21051, database='ur_ai_dw', auth_mechanism='LDAP', user='urbi', password='Ur#730xd', save_dir='./data/sales_forecast/ur_bi_dw_data', logger:logging.Logger=None): ur_bi_get_datas = CustomUrBiGetDatas( host=host, port=port, database=database, auth_mechanism=auth_mechanism, user=user, password=password, save_dir=save_dir, logger=logger ) try: # 店,系列,品类,年月,销售目标金额 logger.info('正在查询年月销售目标金额数据...') ur_bi_get_datas.get_sales_goal_amt() logger.info('查询年月销售目标金额数据完成!') except Exception as ex: logger.exception(ex) raise ex # 往外抛出异常 finally: ur_bi_get_datas.close() pass def getdata_ur_bi_dw( host='10.2.32.22', port=21051, database='ur_ai_dw', auth_mechanism='LDAP', user='urbi', password='Ur#730xd', save_dir='./data/sales_forecast/ur_bi_dw_data', logger=None ): get_common_datas( host=host, port=port, database=database, auth_mechanism=auth_mechanism, user=user, password=password, logger=logger ) get_datas( host=host, port=port, database=database, auth_mechanism=auth_mechanism, user=user, password=password, save_dir=save_dir, logger=logger ) pass # 代码入口 # getdata_ur_bi_dw( # host=ur_bi_dw_host, # port=ur_bi_dw_port, # database=ur_bi_dw_database, # auth_mechanism=ur_bi_dw_auth_mechanism, # user=ur_bi_dw_user, # password=ur_bi_dw_password, # save_dir=ur_bi_dw_save_dir, # logger=logger # )
각 클래스의 구체적인 기능 설명, 코드는 다음 텍스트 설명에 따라 "먹어야" 합니다.
(첫 번째 레이어) HiveHelper는 데이터베이스 연결, 데이터베이스 연결 닫기, 트랜잭션 생성, 실행, 엔진, 연결 및 기타 기능
VarsHelper는 디스크에 파일 형식으로 개체를 저장할 수 있는 간단한 지속성 기능을 제공합니다. 그리고 값을 설정하고, 값을 가져오고, 값이 존재하는지 판단하는 방법을 제공합니다.
GlobalShareArgs는 사전을 제공하고, 사전을 가져오고, 사전을 설정하고, 사전 키-값 쌍을 설정하고, 값을 설정하는 방법을 제공합니다. 사전 키 및 키가 사전에 있는지 판단합니다. 사전 업데이트와 같은 방법
ShareArgs는 사전 초기화에 더 많은 키-값 쌍이 있다는 점을 제외하고
(두 번째 레이어) UrBiGetDataBase class는 스레드 잠금 사전, 시간 사전, 시간 초과 판단 사전을 제공하며 모두 클래스 변수입니다. HiveHelper 클래스를 사용하지만 상속되지는 않습니다. 특정 SQL 판독에서는 스레드 고정 및 시간 판단이 제공됩니다
(세 번째 계층) UrBiGetDatas 클래스를 사용하여 날짜 데이터, 매장 데이터, 회원 데이터, 날씨 데이터, 날씨 도시 데이터, 제품 데이터 및 하이브 데이터베이스에서 저장 라이프사이클 데이터를 가져옵니다. , 국내 제품 라이프사이클 데이터, 제품 개발 코드 데이터, 실제 판매량, 실제 유통량, 베스트셀러 제품 데이터, 제품 원가 데이터, 사이즈 매핑 데이터 등
(4번째 레이어) get_common_data 함수, URBiGetData 클래스를 사용하여 날짜, 매장, 날씨, 날씨 도시, 상품, 실제 판매 데이터를 읽고 이를 ./yongjian/data/ur_bi_data 폴더 아래에 캐시합니다.
CustomUrBiGetData 클래스, 상속된 UrBiGetDatasBase 클래스 , 판매 목표 금액 및 포인트 계열 영역 데이터를 읽습니다.
(이것도 네 번째 레이어입니다.) get_datas 함수는 CustomUrBiGetData 클래스를 통해 연간 및 월간 판매 목표 금액을 읽어옵니다.
일반 함수: (일반 호출 입력 함수입니다) get_data_ur_bi_dw 함수, get_common_data 및 get_datas 함수를 호출하여 데이터를 읽은 다음 해당 데이터를 특정 폴더 디렉터리에 저장합니다.
유추하자면, 하이브 데이터베이스가 아닌 경우 첫 번째 레이어를 mysql로 대체하면 됩니다. 홈 페이지에서는 교체 방법을 설명합니다. 두 번째 레이어는 변경할 필요가 없습니다. 세 번째 레이어는 읽고 싶은 데이터 테이블입니다. 데이터베이스마다 읽고 싶은 데이터 테이블이 다르기 때문에 여기에 SQL을 작성하면 됩니다. 기본적으로 내부에 메소드를 적용하면 됩니다. SQL.
이 방법의 장점은 데이터를 반복해서 읽지 않고 읽은 데이터를 효율적으로 사용할 수 있다는 것입니다.
import logging import pandas as pd from impala.dbapi import connect import sqlalchemy from sqlalchemy.orm import sessionmaker import os import time import os import datetime from dateutil.relativedelta import relativedelta from typing import Dict, List import logging import threading import pandas as pd import pickle class MySqlHelper(object): def __init__( self, host='192.168.15.144', port=3306, database='test_ims', user='spkjz_writer', password='7cmoP3QDtueVJQj2q4Az', logger:logging.Logger=None ): self.host = host self.port = port self.database = database self.user = user self.password = password self.logger = logger self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %( self.user, self.password, self.host, self.port, self.database ) self.conn = None self.cursor = None self.engine = None self.session = None def create_table_code(self, file_name): '''创建表类代码''' os.system(f'sqlacodegen {self.connection_str} > {file_name}') return self.conn def get_conn(self): '''创建连接或获取连接''' if self.conn is None: engine = self.get_engine() self.conn = engine.connect() return self.conn def get_engine(self): '''创建连接或获取连接''' if self.engine is None: self.engine = sqlalchemy.create_engine(self.connection_str) return self.engine def get_cursor(self): '''创建连接或获取连接''' if self.cursor is None: self.cursor = self.conn.cursor() return self.cursor def get_session(self) -> sessionmaker: '''创建连接或获取连接''' if self.session is None: engine = self.get_engine() Session = sessionmaker(bind=engine) self.session = Session() return self.session def close_conn(self): '''关闭连接''' if self.conn is not None: self.conn.close() self.conn = None self.dispose_engine() def close_session(self): '''关闭连接''' if self.session is not None: self.session.close() self.session = None self.dispose_engine() def dispose_engine(self): '''释放engine''' if self.engine is not None: # self.engine.dispose(close=False) self.engine.dispose() self.engine = None def close_cursor(self): '''关闭cursor''' if self.cursor is not None: self.cursor.close() self.cursor = None def get_data(self, sql, auto_close=True) -> pd.DataFrame: '''查询数据''' conn = self.get_conn() data = None try: # 异常重试3次 for i in range(3): try: data = pd.read_sql(sql, conn) break except Exception as ex: if i == 2: raise ex # 往外抛出异常 time.sleep(60) # 一分钟后重试 except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: if auto_close: self.close_conn() return data pass class VarsHelper(): def __init__(self, save_dir, auto_save=True): self.save_dir = save_dir self.auto_save = auto_save self.values = {} if not os.path.exists(os.path.dirname(self.save_dir)): os.makedirs(os.path.dirname(self.save_dir)) if os.path.exists(self.save_dir): with open(self.save_dir, 'rb') as f: self.values = pickle.load(f) f.close() def set_value(self, key, value): self.values[key] = value if self.auto_save: self.save_file() def get_value(self, key): return self.values[key] def has_key(self, key): return key in self.values.keys() def save_file(self): with open(self.save_dir, 'wb') as f: pickle.dump(self.values, f) f.close() pass class GlobalShareArgs(): args = { "debug": False } def get_args(): return GlobalShareArgs.args def set_args(args): GlobalShareArgs.args = args def set_args_value(key, value): GlobalShareArgs.args[key] = value def get_args_value(key, default_value=None): return GlobalShareArgs.args.get(key, default_value) def contain_key(key): return key in GlobalShareArgs.args.keys() def update(args): GlobalShareArgs.args.update(args) pass class ShareArgs(): args = { "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录 "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录 "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共 "only_predict": False, # 只识别,不训练 "delete_model": True, # 先删除模型,仅在训练时使用 "export_excel": False, # 导出excel "classes": 12, # 聚类数 "batch_size": 16, "hidden_size": 32, "max_nrof_epochs": 100, "learning_rate": 0.0005, "loss_type": "categorical_crossentropy", "avg_model_num": 10, "steps_per_epoch": 4.0, # 4.0 "lr_callback_patience": 4, "lr_callback_cooldown": 1, "early_stopping_callback_patience": 6, "get_data": True, } def get_args(): return ShareArgs.args def set_args(args): ShareArgs.args = args def set_args_value(key, value): ShareArgs.args[key] = value def get_args_value(key, default_value=None): return ShareArgs.args.get(key, default_value) def contain_key(key): return key in ShareArgs.args.keys() def update(args): ShareArgs.args.update(args) pass class IMSGetDatasBase(): # 线程锁列表,同保存路径共用锁 lock_dict:Dict[str, threading.Lock] = {} # 时间列表,用于判断是否超时 time_dict:Dict[str, datetime.datetime] = {} # 用于记录是否需要更新超时时间 get_data_timeout_dict:Dict[str, bool] = {} def __init__( self, host='192.168.15.144', port=3306, database='test_ims', user='spkjz_writer', password='Ur#7cmoP3QDtueVJQj2q4Az', save_dir=None, logger:logging.Logger=None, ): self.save_dir = save_dir self.logger = logger self.db_helper = MySqlHelper( host=host, port=port, database=database, user=user, password=password, logger=logger ) # 创建子目录 if self.save_dir is not None and not os.path.exists(self.save_dir): os.makedirs(self.save_dir) self.vars_helper = None if GlobalShareArgs.get_args_value('debug'): self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas') # 把超时时间保存到文件,注释该行即可停掉,只用于调试 def close(self): '''关闭连接''' self.db_helper.close_conn() def get_last_time(self, key_name) -> bool: '''获取是否超时''' # 转静态路径,确保唯一性 key_name = os.path.abspath(key_name) if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'): IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list') timeout = 12 # 12小时 if GlobalShareArgs.get_args_value('debug'): timeout = 24 # 24小时 get_data_timeout = False if key_name not in IMSGetDatasBase.time_dict.keys() or (datetime.datetime.today() - IMSGetDatasBase.time_dict[key_name]).total_seconds()>(4*60*60): self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name) # IMSGetDatasBase.time_list[key_name] = datetime.datetime.today() get_data_timeout = True else: self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name) # if self.vars_helper is not None : # self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_list) IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout return get_data_timeout def save_last_time(self, key_name): '''更新状态超时''' # 转静态路径,确保唯一性 key_name = os.path.abspath(key_name) if IMSGetDatasBase.get_data_timeout_dict[key_name]: IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today() if self.vars_helper is not None : IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today() self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict) def get_lock(self, key_name) -> threading.Lock: '''获取锁''' # 转静态路径,确保唯一性 key_name = os.path.abspath(key_name) if key_name not in IMSGetDatasBase.lock_dict.keys(): IMSGetDatasBase.lock_dict[key_name] = threading.Lock() return IMSGetDatasBase.lock_dict[key_name] def get_data_of_date( self, save_dir, sql, sort_columns:List[str], del_index_list=[-1], # 删除最后下标 start_date = datetime.datetime(2017, 1, 1), # 开始时间 offset = relativedelta(months=3), # 时间间隔 date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化 filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化 stop_date = '20700101', # 超过时间则停止 ): '''分时间增量读取数据''' # 创建文件夹 if not os.path.exists(save_dir): os.makedirs(save_dir) else: #删除最后一个文件 file_list = os.listdir(save_dir) if len(file_list)>0: file_list.sort() for del_index in del_index_list: os.remove(os.path.join(save_dir,file_list[del_index])) print('删除最后一个文件:', file_list[del_index]) select_index = -1 # start_date = datetime.datetime(2017, 1, 1) while True: end_date = start_date + offset start_date_str = date_format_fun(start_date) end_date_str = date_format_fun(end_date) self.logger.info('date: %s-%s', start_date_str, end_date_str) file_path = os.path.join(save_dir, filename_format_fun(start_date)) # self.logger.info('file_path: %s', file_path) if not os.path.exists(file_path): data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str)) if data is None: break self.logger.info('data: %d', len(data)) # self.logger.info('data: %d', data.columns) if len(data)>0: select_index+=1 # 排序 data = data.sort_values(sort_columns) data.to_csv(file_path) elif select_index!=-1: break elif stop_date < start_date_str: raise Exception("读取数据异常,时间超出最大值!") start_date = end_date pass class CustomIMSGetDatas(IMSGetDatasBase): def __init__( self, host='192.168.13.134', port=4000, database='test_ims', user='root', password='rootimmsadmin', save_dir='./hjx/data/export_ims_data', logger:logging.Logger=None ): self.save_dir = save_dir self.logger = logger super().__init__( host=host, port=port, database=database, user=user, password=password, save_dir=save_dir, logger=logger ) def get_ims_w_amt_pro(self): '''年月系列占比数据''' file_path = os.path.join(self.save_dir,'ims_w_amt_pro.csv') now_lock = self.get_lock(file_path) now_lock.acquire() # 加锁 try: # 设置超时4小时才重新查数据 # if not self.get_last_time(file_path): # return sql = 'SELECT * FROM ims_w_amt_pro' data:pd.DataFrame = self.db_helper.get_data(sql) data = data.rename(columns={ 'serial_forecast_proportion': 'forecast_proportion', }) data.to_csv(file_path) # # 更新超时时间 # self.save_last_time(file_path) except Exception as ex: self.logger.exception(ex) raise ex # 往外抛出异常 finally: now_lock.release() # 释放锁 pass def get_datas( host='192.168.13.134', port=4000, database='test_ims', user='root', password='rootimmsadmin', save_dir='./hjx/data/export_ims_data', logger:logging.Logger=None ): ur_bi_get_datas = CustomIMSGetDatas( host=host, port=port, database=database, user=user, password=password, save_dir=save_dir, logger=logger ) try: # 年月系列占比数据 logger.info('正在查询年月系列占比数据...') ur_bi_get_datas.get_ims_w_amt_pro() logger.info('查询年月系列占比数据完成!') except Exception as ex: logger.exception(ex) raise ex # 往外抛出异常 finally: ur_bi_get_datas.close() pass def getdata_export_ims( host='192.168.13.134', port=4000, database='test_ims', user='root', password='rootimmsadmin', save_dir='./hjx/data/export_ims_data', logger:logging.Logger=None ): get_datas( host=host, port=port, database=database, user=user, password=password, save_dir=save_dir, logger=logger ) pass
위 내용은 Python을 사용하여 Hive 데이터베이스를 읽는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!