# Deduplicated imports (logging, os and pandas were each imported twice),
# grouped stdlib / third-party per PEP 8.
import datetime
import logging
import os
import pickle
import threading
import time
from typing import Dict, List

import pandas as pd
import sqlalchemy
from dateutil.relativedelta import relativedelta
from impala.dbapi import connect
from sqlalchemy.orm import sessionmaker
class HiveHelper(object):
    """Thin helper around an Impala/Hive database.

    Lazily creates and caches a SQLAlchemy engine/connection/session plus a
    raw ``impala.dbapi`` connection, and offers a retrying ``get_data`` that
    returns query results as a pandas DataFrame.
    """

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger: logging.Logger = None,
    ):
        # NOTE(review): credentials are hard-coded defaults — consider moving
        # them to configuration / environment variables.
        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.logger = logger
        self.impala_conn = None
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
        # BUG FIX: self.connection_str was used by create_table_code() but
        # never defined.  Build the SQLAlchemy URL here; user and password are
        # URL-quoted because the default password contains '#', which would
        # otherwise be parsed as a URL fragment.
        from urllib.parse import quote_plus
        self.connection_str = (
            f'impala://{quote_plus(str(user))}:{quote_plus(str(password))}'
            f'@{host}:{port}/{database}'
        )

    def create_table_code(self, file_name):
        '''Generate SQLAlchemy model code for the database via sqlacodegen.'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn

    def get_conn(self):
        '''Create (on first use) and return the SQLAlchemy connection.'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn

    def get_impala_conn(self):
        '''Create (on first use) and return the raw impala dbapi connection.'''
        if self.impala_conn is None:
            self.impala_conn = connect(
                host=self.host,
                port=self.port,
                database=self.database,
                auth_mechanism=self.auth_mechanism,
                user=self.user,
                password=self.password,
            )
        return self.impala_conn

    def get_engine(self):
        '''Create (on first use) and return the SQLAlchemy engine.'''
        if self.engine is None:
            # NOTE(review): the original URL literal was truncated in the
            # source ('impala:...'); rebuilt from the connection parameters —
            # TODO confirm against the deployed configuration.
            self.engine = sqlalchemy.create_engine(self.connection_str)
        return self.engine

    def get_cursor(self):
        '''Create (on first use) and return a cursor on the connection.'''
        if self.cursor is None:
            self.cursor = self.conn.cursor()
        return self.cursor

    def get_session(self) -> 'sessionmaker':
        '''Create (on first use) and return an ORM session.'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session

    def close_conn(self):
        '''Close the SQLAlchemy connection, then the engine and raw connection.'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()

    def close_impala_conn(self):
        '''Close the raw impala connection.'''
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None

    def close_session(self):
        '''Close the ORM session and dispose of the engine.'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()

    def dispose_engine(self):
        '''Dispose of the engine (returns pooled connections).'''
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def close_cursor(self):
        '''Close the cursor.'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None

    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''Run *sql* and return the result as a DataFrame.

        Retries up to 3 times, sleeping 60 seconds between attempts, and
        re-raises the last error.  When *auto_close* is true the connection
        is closed in all cases.
        '''
        conn = self.get_conn()
        data = None
        try:
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex  # give up after the third failure
                    time.sleep(60)  # wait a minute, then retry
        except Exception as ex:
            self.logger.exception(ex)
            raise ex  # propagate to the caller
        finally:
            if auto_close:
                self.close_conn()
        return data
class VarsHelper():
    """Tiny key/value store persisted to disk with pickle.

    Values live in ``self.values`` and are (by default) re-pickled to
    ``save_dir`` after every ``set_value``.
    """

    def __init__(self, save_dir, auto_save=True):
        # save_dir is the pickle *file* path, despite the name.
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        parent_dir = os.path.dirname(self.save_dir)
        # BUG FIX: guard against an empty dirname (bare filename), which made
        # os.makedirs('') raise FileNotFoundError in the original.
        if parent_dir and not os.path.exists(parent_dir):
            os.makedirs(parent_dir)
        if os.path.exists(self.save_dir):
            # Load previously persisted values.  (Redundant f.close() inside
            # the with-block removed.)
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        '''Store *value* under *key*, persisting immediately when auto_save.'''
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        '''Return the stored value for *key* (raises KeyError when absent).'''
        return self.values[key]

    def has_key(self, key):
        '''Return True when *key* has a stored value.'''
        return key in self.values

    def save_file(self):
        '''Pickle the current values to disk.'''
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
class GlobalShareArgs():
    """Process-wide argument store accessed through class attributes."""

    # Default arguments; mutated in place by the setters below.
    args = {
        "debug": False
    }

    @staticmethod
    def get_args():
        '''Return the whole args dict.'''
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        '''Replace the whole args dict.'''
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        '''Set a single argument.'''
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        '''Return the argument for *key*, or *default_value* when absent.'''
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        '''Return True when *key* is present in the args.'''
        return key in GlobalShareArgs.args

    @staticmethod
    def update(args):
        '''Merge *args* into the stored arguments.'''
        GlobalShareArgs.args.update(args)
class ShareArgs():
    """Shared configuration/hyper-parameter store for the forecast jobs."""

    args = {
        "labels_dir": "./hjx/shop_group/month_w_amt/data/labels",  # label directory
        "labels_output_dir": "./hjx/shop_group/month_w_amt/data/labels_output",  # clustering label-export directory
        "common_datas_dir": "./hjx/data",  # shared data directory (public ur_bi_dw data)
        "only_predict": False,  # inference only, no training
        "delete_model": True,  # delete the model first; only applies when training
        "export_excel": False,  # export to excel
        "classes": 12,  # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        '''Return the whole args dict.'''
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        '''Replace the whole args dict.'''
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        '''Set a single argument.'''
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        '''Return the argument for *key*, or *default_value* when absent.'''
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        '''Return True when *key* is present in the args.'''
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        '''Merge *args* into the stored arguments.'''
        ShareArgs.args.update(args)
class UrBiGetDatasBase():
    """Base class for incremental ur_bi data extraction.

    The class-level dicts are shared across instances so that extractors
    working on the same save path share one lock and one freshness record.
    """

    # Per-path locks; paths with the same save location share a lock.
    lock_dict: Dict[str, threading.Lock] = {}
    # Per-path timestamp of the last successful fetch (freshness checks).
    time_dict: Dict[str, datetime.datetime] = {}
    # Per-path flag: did the last freshness check request a re-fetch?
    get_data_timeout_dict: Dict[str, bool] = {}

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir=None,
        logger: logging.Logger = None,
    ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger,
        )
        # Create the target directory when missing.
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            # In debug mode, persist fetch timestamps across runs.
            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas')

    def close(self):
        '''Close the underlying database connection.'''
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        '''Return True when the data for *key_name* is stale and must be re-fetched.'''
        # Use the absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
        timeout = 12  # hours
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24  # hours
        get_data_timeout = False
        if key_name not in UrBiGetDatasBase.time_dict \
                or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds() > (timeout * 60 * 60):
            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)
            get_data_timeout = True
        else:
            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)
        # BUG FIX: record the flag unconditionally.  The original only stored
        # it when vars_helper was set (debug mode), so save_last_time() raised
        # KeyError in normal runs.
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        '''Record "fetched now" for *key_name* after a successful re-fetch.'''
        key_name = os.path.abspath(key_name)
        # .get() tolerates a key that was never checked via get_last_time().
        if UrBiGetDatasBase.get_data_timeout_dict.get(key_name):
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:
        '''Return the shared lock for *key_name*, creating it on first use.'''
        key_name = os.path.abspath(key_name)
        if key_name not in UrBiGetDatasBase.lock_dict:
            UrBiGetDatasBase.lock_dict[key_name] = threading.Lock()
        return UrBiGetDatasBase.lock_dict[key_name]

    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns: List[str],
        del_index_list=None,  # indexes of files to delete before resuming; default [-1]
        start_date=datetime.datetime(2017, 1, 1),  # first window start
        offset=None,  # window size; default relativedelta(months=3)
        date_format_fun=lambda d: '%04d%02d01' % (d.year, d.month),  # date -> SQL placeholder
        filename_format_fun=lambda d: '%04d%02d.csv' % (d.year, d.month),  # date -> CSV name
        stop_date='20700101',  # raise when the window start passes this
        data_format_fun=None,  # optional DataFrame post-processor
    ):
        '''Fetch data incrementally, one *offset*-sized time window per CSV.

        *sql* must contain two ``%s`` placeholders for the window start/end.
        Existing files are kept (resume); the newest file(s) named by
        *del_index_list* are deleted first since they may be incomplete.
        '''
        # BUG FIX: avoid a mutable default argument and a def-time
        # relativedelta default; both are now created per call.
        if del_index_list is None:
            del_index_list = [-1]
        if offset is None:
            offset = relativedelta(months=3)
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            # Delete the most recent file(s): they may be partial.
            file_list = os.listdir(save_dir)
            if len(file_list) > 0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir, file_list[del_index]))
                    print('删除最后一个文件:', file_list[del_index])
        select_index = -1
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data: pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                if len(data) > 0:
                    select_index += 1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index != -1:
                    # Data was seen earlier; an empty window means we reached
                    # the present.
                    break
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
class UrBiGetDatas(UrBiGetDatasBase):
    """Extracts the standard ur_bi_dw tables to CSV files under *save_dir*.

    Every getter follows the same pattern: take the per-path lock, skip when
    the data is still fresh (``get_last_time``), query, prefix/clean column
    names, sort, write CSV, then record freshness (``save_last_time``).
    """

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_dw_data',
        logger: logging.Logger = None,
    ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    def get_dim_date(self):
        '''Date dimension data.'''
        file_path = os.path.join(self.save_dir, 'ur_bi_dw.dim_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            # Skip unless the refresh timeout has elapsed (12h/24h; see base).
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_date'
            data: pd.DataFrame = self.db_helper.get_data(sql)
            # Prefix every column with the table alias.
            columns = {c: 'dim_date.' + c for c in list(data.columns)}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_date.date_key'])
            data.to_csv(file_path)
            # Record freshness.
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex  # propagate
        finally:
            now_lock.release()  # unlock

    def get_dim_shop(self):
        '''Shop dimension data.'''
        file_path = os.path.join(self.save_dir, 'ur_bi_dw.dim_shop.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_shop'
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = {c: 'dim_shop.' + c for c in list(data.columns)}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_shop.shop_no'])
            data.to_csv(file_path)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_dim_vip(self):
        '''Member (VIP) data, fetched one year per file.'''
        sub_dir = os.path.join(self.save_dir, 'vip_no')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(sub_dir):
                return
            sql = '''SELECT dv.*, dd.date_key, dd.date_name2
                FROM ur_bi_dw.dim_vip as dv
                INNER JOIN ur_bi_dw.dim_date as dd
                ON dv.card_create_date=dd.date_name2
                where dd.date_key >= %s
                and dd.date_key < %s'''
            sort_columns = ['dv.vip_no']
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1),
                offset=relativedelta(years=1),
            )
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_weather(self):
        '''Weather data, fetched incrementally by date window.'''
        sub_dir = os.path.join(self.save_dir, 'weather')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(sub_dir):
                return
            sql = """
                select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
                where weather.date_key>=%s
                and weather.date_key<%s
            """
            sort_columns = ['weather.date_key', 'weather.areaid']

            def data_format_fun(data):
                # Prefix every column with the table alias.
                columns = {c: 'weather.' + c for c in list(data.columns)}
                return data.rename(columns=columns)

            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                del_index_list=[-2, -1],  # last two files may be incomplete
                data_format_fun=data_format_fun,
            )
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_weather_city(self):
        '''Weather-city mapping data.'''
        file_path = os.path.join(self.save_dir, 'ur_bi_dw.weather_city.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_weather_city as weather_city'
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = {c: 'weather_city.' + c for c in list(data.columns)}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_dim_goods(self):
        '''Goods dimension data.'''
        file_path = os.path.join(self.save_dir, 'ur_bi_dw.dim_goods.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.dim_goods'
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = {c: 'dim_goods.' + c for c in list(data.columns)}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_dim_goods_market_shop_date(self):
        '''Per-shop goods lifecycle data.'''
        file_path = os.path.join(self.save_dir, 'ur_bi_dw.dim_goods_market_shop_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'
            sql = '''
                select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
                FROM ur_bi_dw.dim_goods_market_shop_date
                where lifecycle_end_date is not null
            '''
            data: pd.DataFrame = self.db_helper.get_data(sql)
            # Strip the (unexpected) alias prefix from column names.
            columns = {c: c.replace('lifecycle_end_date.', '') for c in list(data.columns)}
            data = data.rename(columns=columns)
            data = data.sort_values(['shop_market_date'])
            data.to_csv(file_path, index=False)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_dim_goods_market_date(self):
        '''Nationwide goods lifecycle data.'''
        file_path = os.path.join(self.save_dir, 'ur_bi_dw.dim_goods_market_date.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            sql = '''
                select * FROM ur_bi_dw.dim_goods_market_date
            '''
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = {c: 'dim_goods_market_date.' + c for c in list(data.columns)}
            data = data.rename(columns=columns)
            data = data.sort_values(['dim_goods_market_date.sku_no'])
            data.to_csv(file_path, index=False)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_dim_goods_color_dev_sizes(self):
        '''Goods developed-sizes (per color) data.'''
        file_path = os.path.join(self.save_dir, 'dim_goods_color_dev_sizes.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes'
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = {c: c.replace('dim_goods_color_dev_sizes.', '') for c in list(data.columns)}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_dwd_daily_sales_size(self):
        '''Actual daily sales amounts, fetched incrementally by date window.'''
        sub_dir = os.path.join(self.save_dir, 'dwd_daily_sales_size_all')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(sub_dir):
                return
            sql = """
                select shop_no,sku_no,date_key,`size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
                from ur_bi_dw.dwd_daily_sales_size as sales
                where sales.date_key>=%s
                and sales.date_key<%s
                and sales.currency_code='CNY'
                group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = ['date_key', 'shop_no', 'sku_no']
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1),
            )
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_dwd_daily_delivery_size(self):
        '''Actual daily delivery amounts, fetched incrementally by date window.'''
        sub_dir = os.path.join(self.save_dir, 'dwd_daily_delivery_size_all')
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(sub_dir):
                return
            sql = """
                select shop_no,sku_no,date_key,`size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
                from ur_bi_dw.dwd_daily_delivery_size as delivery
                where delivery.date_key>=%s
                and delivery.date_key<%s
                and delivery.currency_code='CNY'
                group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = ['date_key', 'shop_no', 'sku_no']
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1),
            )
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_v_last_nation_sales_status(self):
        '''Nationwide fast/slow-seller status data.'''
        file_path = os.path.join(self.save_dir, 'v_last_nation_sales_status.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            sql = 'SELECT * FROM ur_bi_dw.v_last_nation_sales_status'
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = {c: c.replace('v_last_nation_sales_status.', '') for c in list(data.columns)}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_dwd_daily_finacial_goods(self):
        '''Latest goods cost prices (CNY / CN only).'''
        file_path = os.path.join(self.save_dir, 'dwd_daily_finacial_goods.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            sql = """
                select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
                inner join (
                    select sku_no,`size`,max(date_key) as date_key
                    from ur_bi_dw.dwd_daily_finacial_goods
                    where currency_code='CNY'
                    and country_code='CN'
                    group by sku_no,`size`
                ) as t2
                on t2.sku_no=t1.sku_no
                and t2.`size`=t1.`size`
                and t2.date_key=t1.date_key
                where t1.currency_code='CNY'
                and t1.country_code='CN'
            """
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = {c: c.replace('t1.', '') for c in list(data.columns)}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock

    def get_dim_size_group(self):
        '''Size-group mapping data.'''
        file_path = os.path.join(self.save_dir, 'dim_size_group.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            sql = """select * from ur_bi_dw.dim_size_group"""
            data: pd.DataFrame = self.db_helper.get_data(sql)
            columns = {c: c.replace('dim_size_group.', '') for c in list(data.columns)}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex
        finally:
            now_lock.release()  # unlock
def get_common_datas(
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger: logging.Logger = None):
    """Fetch the shared ur_bi_dw datasets into the common data directory.

    Builds a ``UrBiGetDatas`` pointed at ``<common_datas_dir>/ur_bi_data`` and
    runs the standard extractions (dates, shops, weather, weather cities,
    goods, daily sales); the connection is always closed at the end.
    """
    shared_root = ShareArgs.get_args_value('common_datas_dir')
    common_ur_bi_dir = os.path.join(shared_root, 'ur_bi_data')
    fetcher = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger,
    )
    # (start-message, extraction callable, done-message) triples, run in order.
    steps = [
        ('正在查询日期数据...', fetcher.get_dim_date, '查询日期数据完成!'),
        ('正在查询店铺数据...', fetcher.get_dim_shop, '查询店铺数据完成!'),
        ('正在查询天气数据...', fetcher.get_weather, '查询天气数据完成!'),
        ('正在查询天气城市数据...', fetcher.get_weather_city, '查询天气城市数据完成!'),
        ('正在查询货品数据...', fetcher.get_dim_goods, '查询货品数据完成!'),
        ('正在查询实际销量数据...', fetcher.get_dwd_daily_sales_size, '查询实际销量数据完成!'),
    ]
    try:
        for begin_msg, step, done_msg in steps:
            logger.info(begin_msg)
            step()
            logger.info(done_msg)
    except Exception as ex:
        logger.exception(ex)
        raise ex  # propagate to the caller
    finally:
        fetcher.close()
class CustomUrBiGetDatas(UrBiGetDatasBase):
    """Job-specific ur_bi extractions (sales goals, shop serial areas)."""

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_data',
        logger: logging.Logger = None,
    ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger,
        )

    def get_sales_goal_amt(self):
        '''Monthly sales-goal amounts per shop/serial.'''
        file_path = os.path.join(self.save_dir, 'month_of_year_sales_goal_amt.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            # Skip unless the refresh timeout has elapsed (12h/24h; see base).
            if not self.get_last_time(file_path):
                return
            # Serial 'Y' is folded into 'W' before aggregation.
            sql = '''
                select sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
                from ur_bi_dw.dwd_sales_goal_west as sales_goal
                inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
                group by sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial),
                dates.month_of_year
            '''
            data: pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'shop_no': 'sales_goal.shop_no',
                'serial': 'sales_goal.serial',
                'month_of_year': 'dates.month_of_year',
            })
            # Sort for stable output.
            data = data.sort_values(['sales_goal.shop_no', 'sales_goal.serial', 'dates.month_of_year'])
            data.to_csv(file_path)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex  # propagate
        finally:
            now_lock.release()  # unlock

    def get_shop_serial_area(self):
        '''Shop floor area per serial and month.'''
        file_path = os.path.join(self.save_dir, 'shop_serial_area.csv')
        now_lock = self.get_lock(file_path)
        now_lock.acquire()  # lock
        try:
            if not self.get_last_time(file_path):
                return
            # Serial 'Y' is folded into 'W' before aggregation.
            sql = '''
                select shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
                from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
                where shop_serial_area.area is not null
                group by shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year
            '''
            data: pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                'shop_no': 'shop_serial_area.shop_no',
                'serial': 'shop_serial_area.serial',
                'month_of_year': 'shop_serial_area.month_of_year',
                'area': 'shop_serial_area.area',
            })
            # Sort for stable output.
            data = data.sort_values(['shop_serial_area.shop_no', 'shop_serial_area.serial', 'shop_serial_area.month_of_year'])
            data.to_csv(file_path)
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex  # propagate
        finally:
            now_lock.release()  # unlock
def get_datas(
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./data/sales_forecast/ur_bi_dw_data',
        logger: logging.Logger = None):
    """Fetch the job-specific datasets (sales goals) into *save_dir*.

    Always closes the database connection, even on failure.
    """
    fetcher = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger,
    )
    try:
        # shop, serial, category, year-month -> sales goal amount
        logger.info('正在查询年月销售目标金额数据...')
        fetcher.get_sales_goal_amt()
        logger.info('查询年月销售目标金额数据完成!')
    except Exception as ex:
        logger.exception(ex)
        raise ex  # propagate to the caller
    finally:
        fetcher.close()
def getdata_ur_bi_dw(
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./data/sales_forecast/ur_bi_dw_data',
        logger=None,
):
    """Run the full ur_bi_dw extraction: shared datasets, then job-specific ones."""
    common_kwargs = dict(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        logger=logger,
    )
    get_common_datas(**common_kwargs)
    get_datas(save_dir=save_dir, **common_kwargs)
# Code entry point (example invocation, kept commented out):
# getdata_ur_bi_dw(
# host=ur_bi_dw_host,
# port=ur_bi_dw_port,
# database=ur_bi_dw_database,
# auth_mechanism=ur_bi_dw_auth_mechanism,
# user=ur_bi_dw_user,
# password=ur_bi_dw_password,
# save_dir=ur_bi_dw_save_dir,
# logger=logger
# )