Jadual Kandungan
Kod perniagaan sebenar untuk membaca pangkalan data hive
Perihalan dan pemahaman kod
Lampiran: kod contoh yang diubah suai untuk MySQL
Laman Utama > Pembangunan Bahagian Belakang > Tutorial Python > Bagaimana untuk membaca pangkalan data Hive menggunakan Python?

Bagaimana untuk membaca pangkalan data Hive menggunakan Python?

May 09, 2023 pm 04:28 PM
python hive

Kod perniagaan sebenar untuk membaca pangkalan data hive

import datetime
import logging
import os
import pickle
import subprocess
import threading
import time
from typing import Dict, List
from urllib.parse import quote_plus

import pandas as pd
import sqlalchemy
from dateutil.relativedelta import relativedelta
from impala.dbapi import connect
from sqlalchemy.orm import sessionmaker
class HiveHelper(object):
    '''Lazily create and cache the objects needed to query Impala/Hive.

    One instance owns at most one raw impyla connection, one SQLAlchemy
    engine built on top of it, one SQLAlchemy connection, one DBAPI cursor
    and one ORM session. Every ``get_*`` method creates its object on first
    use and returns the cached one afterwards; every ``close_*`` method
    releases the object and resets the cache.

    NOTE(review): the default host/user/password are hard-coded in source.
    They are kept so existing callers keep working, but they should be
    moved to configuration and the password rotated.
    '''

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        # Fall back to a module logger so error paths can always log instead
        # of failing with AttributeError on a None logger.
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.impala_conn = None
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None

    @property
    def connection_str(self):
        '''SQLAlchemy URL describing this connection.

        User and password are percent-encoded because the default password
        contains ``#``, which would otherwise start the URL fragment.

        NOTE(review): assumes the impyla SQLAlchemy dialect forwards URL
        query parameters (``auth_mechanism``) to ``connect()`` - confirm.
        '''
        credentials = ''
        if self.user:
            credentials = quote_plus(str(self.user))
            if self.password:
                credentials += ':' + quote_plus(str(self.password))
            credentials += '@'
        return 'impala://%s%s:%s/%s?auth_mechanism=%s' % (
            credentials,
            self.host,
            self.port,
            self.database,
            quote_plus(str(self.auth_mechanism)),
        )

    def create_table_code(self, file_name):
        '''Generate SQLAlchemy table classes with ``sqlacodegen``.

        The tool's standard output is written to ``file_name``. The command
        is run without a shell, so neither the URL nor the file name is
        subject to shell interpretation. The tool's exit status is not
        checked.

        Returns the cached SQLAlchemy connection (possibly ``None``); this
        return value is kept only for backward compatibility.

        Raises:
            FileNotFoundError: if the ``sqlacodegen`` executable is missing.
        '''
        with open(file_name, 'wb') as out:
            subprocess.run(['sqlacodegen', self.connection_str], stdout=out, check=False)
        return self.conn

    def get_conn(self):
        '''Return the cached SQLAlchemy connection, creating it if needed.'''
        if self.conn is None:
            self.conn = self.get_engine().connect()
        return self.conn

    def get_impala_conn(self):
        '''Return the cached raw impyla connection, creating it if needed.'''
        if self.impala_conn is None:
            self.impala_conn = connect(
                host=self.host,
                port=self.port,
                database=self.database,
                auth_mechanism=self.auth_mechanism,
                user=self.user,
                password=self.password
                )
        return self.impala_conn

    def get_engine(self):
        '''Return the cached SQLAlchemy engine, creating it if needed.

        The engine's ``creator`` is ``get_impala_conn``, so every call of
        the creator returns the same cached impyla connection.
        '''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)
        return self.engine

    def get_cursor(self):
        '''Return the cached DBAPI cursor, creating it if needed.

        The cursor is opened on the raw impyla connection, which is created
        on demand. (Previously this read ``self.conn``, which could still be
        ``None`` and is a SQLAlchemy connection rather than a DBAPI one.)
        '''
        if self.cursor is None:
            self.cursor = self.get_impala_conn().cursor()
        return self.cursor

    def get_session(self) -> "sqlalchemy.orm.Session":
        '''Return the cached ORM session, creating it if needed.'''
        if self.session is None:
            session_factory = sessionmaker(bind=self.get_engine())
            self.session = session_factory()
        return self.session

    def close_conn(self):
        '''Close the SQLAlchemy connection, the engine and the impyla connection.'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()

    def close_impala_conn(self):
        '''Close the raw impyla connection, if one is open.'''
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None

    def close_session(self):
        '''Close the ORM session, if any, and dispose of the engine.'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()

    def dispose_engine(self):
        '''Dispose of the SQLAlchemy engine, if one exists.'''
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def close_cursor(self):
        '''Close the cached cursor, if one is open.'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None

    def get_data(self, sql, auto_close=True, retries=3, retry_delay=60) -> pd.DataFrame:
        '''Run ``sql`` and return the result as a DataFrame.

        Args:
            sql: query text passed to ``pandas.read_sql``.
            auto_close: when true, ``close_conn()`` is called afterwards,
                whether the query succeeded or failed.
            retries: total number of attempts (at least one is made).
            retry_delay: seconds to wait between failed attempts.

        Raises:
            Exception: the error of the last attempt, after it was logged.
        '''
        conn = self.get_conn()
        attempts = max(1, int(retries))
        data = None
        try:
            for attempt in range(1, attempts + 1):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: let the caller see the error
                    time.sleep(retry_delay)
        except Exception as ex:
            self.logger.exception(ex)
            raise  # bare raise keeps the original traceback
        finally:
            if auto_close:
                self.close_conn()
        return data
class VarsHelper():
    '''Small pickle-backed key/value store.

    Despite its name, ``save_dir`` is the path of the pickle FILE. Its
    parent directory is created when missing, and an existing file is
    loaded on construction.

    Args:
        save_dir: path of the pickle file holding the values.
        auto_save: when true, every ``set_value`` rewrites the file.
    '''

    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        parent_dir = os.path.dirname(self.save_dir)
        # A bare file name has an empty dirname; os.makedirs('') would raise.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        if os.path.exists(self.save_dir):
            # NOTE(review): pickle.load can execute arbitrary code; only
            # point this at files written by this application.
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        '''Store ``value`` under ``key`` and persist it when auto_save is on.'''
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        '''Return the value stored under ``key``; raises KeyError if absent.'''
        return self.values[key]

    def has_key(self, key):
        '''Return True when ``key`` is present in the store.'''
        return key in self.values

    def save_file(self):
        '''Write all values to the pickle file.'''
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
class GlobalShareArgs():
    '''Process-wide settings shared through a single class-level dict.

    All accessors are static methods, so they work both on the class
    (``GlobalShareArgs.get_args_value('debug')``) and on an instance.
    '''
    args = {
        "debug": False
    }

    @staticmethod
    def get_args():
        '''Return the shared settings dict itself (not a copy).'''
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        '''Replace the shared settings dict with ``args``.'''
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        '''Set a single setting.'''
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        '''Return the setting ``key``, or ``default_value`` when it is absent.'''
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        '''Return True when ``key`` is a known setting.'''
        return key in GlobalShareArgs.args

    @staticmethod
    def update(args):
        '''Merge ``args`` into the shared settings.'''
        GlobalShareArgs.args.update(args)
class ShareArgs():
    '''Task-level settings shared through a single class-level dict.

    All accessors are static methods, so they work both on the class
    (``ShareArgs.get_args_value('classes')``) and on an instance.
    '''
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # labels directory
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # directory for labels exported by clustering
        "common_datas_dir":"./hjx/data", # shared data directory (common ur_bi_dw data)
        "only_predict": False, # predict only, do not train
        "delete_model": True, # delete the model first; only used when training
        "export_excel": False, # export to Excel
        "classes": 12, # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        '''Return the shared settings dict itself (not a copy).'''
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        '''Replace the shared settings dict with ``args``.'''
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        '''Set a single setting.'''
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        '''Return the setting ``key``, or ``default_value`` when it is absent.'''
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        '''Return True when ``key`` is a known setting.'''
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        '''Merge ``args`` into the shared settings.'''
        ShareArgs.args.update(args)
class UrBiGetDatasBase():
    '''Base class for exporters that cache warehouse query results on disk.

    It provides three services to subclasses: a per-path thread lock
    (``get_lock``), a per-path freshness check (``get_last_time`` /
    ``save_last_time``) and an incremental, date-windowed export to CSV
    files (``get_data_of_date``). The bookkeeping dicts are class
    attributes, so they are shared by every instance in the process.
    '''
    # Thread locks keyed by absolute save path; the same path shares one lock.
    lock_dict:Dict[str, threading.Lock] = {}
    # Time of the last recorded refresh, keyed by absolute save path.
    time_dict:Dict[str, datetime.datetime] = {}
    # Result of the latest get_last_time() call per path; tells
    # save_last_time() whether the refresh time has to be updated.
    get_data_timeout_dict:Dict[str, bool] = {}

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        # Fall back to a module logger: every method logs unconditionally.
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=self.logger
            )
        # Create the output directory up front.
        if self.save_dir is not None:
            os.makedirs(self.save_dir, exist_ok=True)
        # Refresh times are persisted to disk only in debug mode.
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas')

    def close(self):
        '''Close the database connection.'''
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        '''Report whether the data cached under ``key_name`` must be refreshed.

        Returns True when no refresh time is recorded for the path or when
        the recorded time is older than the timeout (12 hours, or 24 hours
        in debug mode). The answer is also remembered for
        ``save_last_time``.
        '''
        # Use the absolute path so the same target always maps to one key.
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
        timeout = 12 # hours
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24 # hours
        last_time = UrBiGetDatasBase.time_dict.get(key_name)
        get_data_timeout = (
            last_time is None
            or (datetime.datetime.today() - last_time).total_seconds() > (timeout*60*60)
        )
        if get_data_timeout:
            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)
        else:
            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        '''Record "now" as the refresh time of ``key_name``.

        The time is updated when the latest ``get_last_time`` call for this
        path reported a timeout. A path that was never checked is treated
        as timed out instead of raising KeyError. In debug mode the time is
        always updated and the whole table is persisted.
        '''
        # Use the absolute path so the same target always maps to one key.
        key_name = os.path.abspath(key_name)
        if UrBiGetDatasBase.get_data_timeout_dict.get(key_name, True):
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:
        '''Return the lock shared by everyone working on ``key_name``.'''
        # Use the absolute path so the same target always maps to one key.
        key_name = os.path.abspath(key_name)
        # setdefault inserts and returns in one step, so two threads asking
        # for the same new path cannot end up with two different locks.
        return UrBiGetDatasBase.lock_dict.setdefault(key_name, threading.Lock())

    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=(-1,), # indexes (into the sorted file list) of files to re-fetch
        start_date = datetime.datetime(2017, 1, 1), # first window start
        offset = relativedelta(months=3), # window length
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # formats the dates substituted into sql
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # formats the per-window file name
        stop_date = '20700101', # give up once a window starts after this date
        data_format_fun = None, # optional DataFrame post-processing
        ):
        '''Export query results incrementally, one CSV file per date window.

        ``sql`` must contain two ``%s`` placeholders, filled with the
        formatted start and end date of each window. Windows whose file
        already exists are skipped. Before querying, the files selected by
        ``del_index_list`` (indexes into the sorted directory listing, by
        default the last file) are deleted so they are fetched again.

        The walk stops at the first empty window that follows a non-empty
        one, or when the helper returns ``None``.

        Raises:
            Exception: when only empty windows were seen and the window
                start has passed ``stop_date``.
        '''
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            # Delete the trailing files so they are fetched again. Indexes
            # that do not exist (fewer files than indexes) are ignored, and
            # each file is removed only once.
            file_list = sorted(os.listdir(save_dir))
            stale_files = set()
            for del_index in del_index_list:
                try:
                    stale_files.add(file_list[del_index])
                except IndexError:
                    continue
            for stale_file in sorted(stale_files):
                os.remove(os.path.join(save_dir, stale_file))
                self.logger.info('删除最后一个文件: %s', stale_file)
        select_index = -1 # stays -1 until a window returned rows
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                if len(data)>0:
                    select_index+=1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    # First empty window after real data: nothing newer exists.
                    break
                elif stop_date < start_date_str:
                    # String comparison; relies on both being the same sortable format.
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
class UrBiGetDatas(UrBiGetDatasBase):
    '''Exports the shared ur_bi warehouse tables to CSV files under ``save_dir``.

    Every public ``get_*`` method follows the same protocol: take the lock
    of its target path, skip the work while ``get_last_time`` reports the
    cached copy as fresh, otherwise query, write and record the refresh
    time. Errors are logged and re-raised. The shared protocol lives in
    ``_refresh_table`` (one table, one file) and ``_refresh_by_date`` (one
    directory, one file per date window).
    '''

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_dw_data',
        logger:logging.Logger=None
        ):
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )

    @staticmethod
    def _add_prefix(prefix):
        '''Return a column renamer that prepends ``prefix``.'''
        return lambda column: prefix + column

    @staticmethod
    def _strip_text(text):
        '''Return a column renamer that removes every occurrence of ``text``.'''
        return lambda column: column.replace(text, '')

    def _refresh_table(self, file_name, sql, rename_fun, sort_columns=None, index=True):
        '''Query one table and cache it as ``save_dir/file_name``.

        Columns are renamed with ``rename_fun``, rows are sorted by
        ``sort_columns`` when given, and ``index`` is passed to ``to_csv``.
        '''
        file_path = os.path.join(self.save_dir, file_name)
        with self.get_lock(file_path):
            try:
                # Skip the query while the cached copy is still fresh.
                if not self.get_last_time(file_path):
                    return
                data:pd.DataFrame = self.db_helper.get_data(sql)
                data = data.rename(columns={c: rename_fun(c) for c in data.columns})
                if sort_columns:
                    data = data.sort_values(sort_columns)
                data.to_csv(file_path, index=index)
                self.save_last_time(file_path)
            except Exception as ex:
                self.logger.exception(ex)
                raise

    def _refresh_by_date(self, dir_name, sql, sort_columns, **kwargs):
        '''Export a date-windowed query into ``save_dir/dir_name``.

        Extra keyword arguments are forwarded to ``get_data_of_date``.
        '''
        sub_dir = os.path.join(self.save_dir, dir_name)
        with self.get_lock(sub_dir):
            try:
                # Skip the query while the cached copy is still fresh.
                if not self.get_last_time(sub_dir):
                    return
                self.get_data_of_date(
                    save_dir=sub_dir,
                    sql=sql,
                    sort_columns=sort_columns,
                    **kwargs
                )
                self.save_last_time(sub_dir)
            except Exception as ex:
                self.logger.exception(ex)
                raise

    def get_dim_date(self):
        '''Date dimension data.'''
        self._refresh_table(
            'ur_bi_dw.dim_date.csv',
            'SELECT * FROM ur_bi_dw.dim_date',
            self._add_prefix('dim_date.'),
            sort_columns=['dim_date.date_key'],
        )

    def get_dim_shop(self):
        '''Shop data.'''
        self._refresh_table(
            'ur_bi_dw.dim_shop.csv',
            'SELECT * FROM ur_bi_dw.dim_shop',
            self._add_prefix('dim_shop.'),
            sort_columns=['dim_shop.shop_no'],
        )

    def get_dim_vip(self):
        '''Member (VIP) data, one file per year of card creation.'''
        sql = '''SELECT dv.*, dd.date_key, dd.date_name2 
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date=dd.date_name2 
            where dd.date_key >= %s
            and dd.date_key < %s'''
        # NOTE(review): the sort column is 'dv.vip_no' but no renaming is
        # applied to the result; confirm the driver returns that column name.
        self._refresh_by_date(
            'vip_no',
            sql,
            ['dv.vip_no'],
            start_date=datetime.datetime(2017, 1, 1),
            offset=relativedelta(years=1),
        )

    def get_weather(self):
        '''Weather data; the last two files are always fetched again.'''
        sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key>=%s and weather.date_key<%s
            """
        add_prefix = self._add_prefix('weather.')

        def data_format_fun(data):
            return data.rename(columns={c: add_prefix(c) for c in data.columns})

        self._refresh_by_date(
            'weather',
            sql,
            ['weather.date_key', 'weather.areaid'],
            del_index_list=[-2, -1],
            data_format_fun=data_format_fun,
        )

    def get_weather_city(self):
        '''Weather city data.'''
        self._refresh_table(
            'ur_bi_dw.weather_city.csv',
            'SELECT * FROM ur_bi_dw.dim_weather_city as weather_city',
            self._add_prefix('weather_city.'),
        )

    def get_dim_goods(self):
        '''Goods data.'''
        self._refresh_table(
            'ur_bi_dw.dim_goods.csv',
            'SELECT * FROM ur_bi_dw.dim_goods',
            self._add_prefix('dim_goods.'),
        )

    def get_dim_goods_market_shop_date(self):
        '''Per-shop goods lifecycle data.'''
        sql = '''
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
            '''
        # NOTE(review): the stripped text 'lifecycle_end_date.' looks like a
        # copy/paste of a column name rather than a table prefix - confirm.
        self._refresh_table(
            'ur_bi_dw.dim_goods_market_shop_date.csv',
            sql,
            self._strip_text('lifecycle_end_date.'),
            sort_columns=['shop_market_date'],
            index=False,
        )

    def get_dim_goods_market_date(self):
        '''Nationwide goods lifecycle data.'''
        sql = '''
            select * FROM ur_bi_dw.dim_goods_market_date
            '''
        self._refresh_table(
            'ur_bi_dw.dim_goods_market_date.csv',
            sql,
            self._add_prefix('dim_goods_market_date.'),
            sort_columns=['dim_goods_market_date.sku_no'],
            index=False,
        )

    def get_dim_goods_color_dev_sizes(self):
        '''Goods developed-sizes data.'''
        self._refresh_table(
            'dim_goods_color_dev_sizes.csv',
            'SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes',
            self._strip_text('dim_goods_color_dev_sizes.'),
            index=False,
        )

    def get_dwd_daily_sales_size(self):
        '''Actual sales amounts, aggregated per shop, SKU, day and size.'''
        sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key>=%s and sales.date_key<%s
                and sales.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
        self._refresh_by_date(
            'dwd_daily_sales_size_all',
            sql,
            ['date_key', 'shop_no', 'sku_no'],
            start_date=datetime.datetime(2017, 1, 1),
        )

    def get_dwd_daily_delivery_size(self):
        '''Actual delivery amounts, aggregated per shop, SKU, day and size.'''
        sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key>=%s and delivery.date_key<%s
                and delivery.currency_code='CNY'
            group by shop_no,sku_no,date_key,`size`
            """
        self._refresh_by_date(
            'dwd_daily_delivery_size_all',
            sql,
            ['date_key', 'shop_no', 'sku_no'],
            start_date=datetime.datetime(2017, 1, 1),
        )

    def get_v_last_nation_sales_status(self):
        '''Goods best-selling / slow-selling status data.'''
        self._refresh_table(
            'v_last_nation_sales_status.csv',
            'SELECT * FROM ur_bi_dw.v_last_nation_sales_status',
            self._strip_text('v_last_nation_sales_status.'),
            index=False,
        )

    def get_dwd_daily_finacial_goods(self):
        '''Goods cost price data (row with the latest date_key per SKU and size).'''
        sql = """
            select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no,`size`,max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code='CNY' and country_code='CN'
                group by sku_no,`size`
            ) as t2
            on t2.sku_no=t1.sku_no
                and t2.`size`=t1.`size`
                and t2.date_key=t1.date_key
            where t1.currency_code='CNY' and t1.country_code='CN'
            """
        self._refresh_table(
            'dwd_daily_finacial_goods.csv',
            sql,
            self._strip_text('t1.'),
            index=False,
        )

    def get_dim_size_group(self):
        '''Size mapping data.'''
        self._refresh_table(
            'dim_size_group.csv',
            """select * from ur_bi_dw.dim_size_group""",
            self._strip_text('dim_size_group.'),
            index=False,
        )
def get_common_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    logger:logging.Logger=None):
    '''Refresh the shared ur_bi data files used by all tasks.

    The files are written to ``<common_datas_dir>/ur_bi_data``, where
    ``common_datas_dir`` comes from ``ShareArgs``. The exports run in a
    fixed order; the first failure is logged and re-raised, and the
    database connection is closed in every case.
    '''
    # The progress messages below are logged unconditionally.
    if logger is None:
        logger = logging.getLogger(__name__)
    common_datas_dir = ShareArgs.get_args_value('common_datas_dir')
    common_ur_bi_dir = os.path.join(common_datas_dir, 'ur_bi_data')
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger
    )
    # (label used in the progress messages, export to run)
    steps = (
        ('日期数据', ur_bi_get_datas.get_dim_date),
        ('店铺数据', ur_bi_get_datas.get_dim_shop),
        ('天气数据', ur_bi_get_datas.get_weather),
        ('天气城市数据', ur_bi_get_datas.get_weather_city),
        ('货品数据', ur_bi_get_datas.get_dim_goods),
        ('实际销量数据', ur_bi_get_datas.get_dwd_daily_sales_size),
    )
    try:
        for label, fetch in steps:
            logger.info('正在查询%s...', label)
            fetch()
            logger.info('查询%s完成!', label)
    except Exception as ex:
        logger.exception(ex)
        raise
    finally:
        ur_bi_get_datas.close()
class CustomUrBiGetDatas(UrBiGetDatasBase):
    """Project-specific ur_bi queries: sales-goal amounts and shop/series areas.

    Each public method runs one aggregate query and writes the result to a
    CSV file under ``save_dir``. The per-file lock and the timeout check come
    from the base class (``get_lock`` / ``get_last_time`` / ``save_last_time``).

    NOTE(review): the constructor defaults embed real-looking credentials;
    callers should pass them explicitly.
    """

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir='./hjx/data/ur_bi_data',
        logger:logging.Logger=None
        ):
        """Store the output directory and logger, then initialise the base class."""
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )

    def _export_query(self, file_name, sql, rename_map, sort_columns):
        """Run ``sql`` and write the renamed, sorted result to ``save_dir/file_name``.

        Does nothing when ``get_last_time`` reports that the cached file has
        not timed out yet. Any exception is logged and re-raised.
        """
        file_path = os.path.join(self.save_dir, file_name)
        # The lock is released on every exit path, including the early return.
        with self.get_lock(file_path):
            try:
                # Only re-query once the cached file has timed out.
                if not self.get_last_time(file_path):
                    return
                data:pd.DataFrame = self.db_helper.get_data(sql)
                data = data.rename(columns=rename_map)
                data = data.sort_values(sort_columns)
                data.to_csv(file_path)
                # Record the refresh time for the next timeout check.
                self.save_last_time(file_path)
            except Exception as ex:
                # A missing logger must not hide the real error.
                if self.logger is not None:
                    self.logger.exception(ex)
                raise  # re-raise with the original traceback

    def get_sales_goal_amt(self):
        """Export the sales-goal amount per shop, series and month of year.

        Writes ``month_of_year_sales_goal_amt.csv`` under ``save_dir``.
        """
        sql = '''
            select sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial='Y','W',sales_goal.serial),
                dates.month_of_year
            '''
        self._export_query(
            'month_of_year_sales_goal_amt.csv',
            sql,
            {
                'shop_no':'sales_goal.shop_no',
                'serial':'sales_goal.serial',
                'month_of_year':'dates.month_of_year',
            },
            ['sales_goal.shop_no','sales_goal.serial','dates.month_of_year'],
        )

    def get_shop_serial_area(self):
        """Export the floor area per shop, series and month of year.

        Writes ``shop_serial_area.csv`` under ``save_dir``.
        """
        sql = '''
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year
            '''
        self._export_query(
            'shop_serial_area.csv',
            sql,
            {
                'shop_no':'shop_serial_area.shop_no',
                'serial':'shop_serial_area.serial',
                'month_of_year':'shop_serial_area.month_of_year',
                'area':'shop_serial_area.area',
            },
            ['shop_serial_area.shop_no','shop_serial_area.serial','shop_serial_area.month_of_year'],
        )
def get_datas(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger:logging.Logger=None):
    """Export the monthly sales-goal amounts into ``save_dir``.

    Args:
        host, port, database, auth_mechanism, user, password: connection
            settings forwarded unchanged to ``CustomUrBiGetDatas``.
        save_dir: directory that receives the CSV output.
        logger: logger for progress and error messages; the module logger is
            used when omitted so the function does not crash on ``None``.

    Raises:
        Exception: whatever the export raises; it is logged and re-raised.
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # Shop, series, year-month, sales-goal amount.
        logger.info('正在查询年月销售目标金额数据...')
        ur_bi_get_datas.get_sales_goal_amt()
        logger.info('查询年月销售目标金额数据完成!')
    except Exception as ex:
        logger.exception(ex)
        raise  # re-raise with the original traceback
    finally:
        # Always release the database connection.
        ur_bi_get_datas.close()
def getdata_ur_bi_dw(
    host='10.2.32.22',
    port=21051,
    database='ur_ai_dw',
    auth_mechanism='LDAP',
    user='urbi',
    password='Ur#730xd',
    save_dir='./data/sales_forecast/ur_bi_dw_data',
    logger=None
):
    """Entry point: refresh the shared datasets, then the project-specific ones.

    Calls ``get_common_datas`` followed by ``get_datas`` with the same
    connection settings. ``save_dir`` is only used by ``get_datas``.
    Exceptions from either call propagate to the caller.
    """
    connection_kwargs = dict(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
    )
    get_common_datas(logger=logger, **connection_kwargs)
    get_datas(save_dir=save_dir, logger=logger, **connection_kwargs)
# 代码入口
# getdata_ur_bi_dw(
#     host=ur_bi_dw_host,
#     port=ur_bi_dw_port,
#     database=ur_bi_dw_database,
#     auth_mechanism=ur_bi_dw_auth_mechanism,
#     user=ur_bi_dw_user,
#     password=ur_bi_dw_password,
#     save_dir=ur_bi_dw_save_dir,
#     logger=logger
#     )
Salin selepas log masuk

Perihalan dan pemahaman kod

Penerangan fungsi khusus setiap kelas, kod tersebut perlu "dimakan" mengikut penerangan teks berikut:

(Lapisan pertama) HiveHelper melengkapkan fungsi seperti menyambung ke pangkalan data, menutup sambungan pangkalan data, menjana transaksi, pelaksanaan, enjin, sambungan, dll.

VarsHelper menyediakan fungsi kegigihan mudah yang boleh menjimatkan objek sebagai fail yang disimpan pada cakera. Dan menyediakan kaedah untuk menetapkan nilai, mendapatkan nilai dan menilai sama ada nilai wujud

GlobalShareArgs menyediakan kamus, dan menyediakan kaedah untuk mendapatkan kamus, menetapkan kamus, menetapkan pasangan nilai kunci kamus, menetapkan nilai kunci kamus dan menilai sama ada kunci Dalam kamus, kemas kini kamus dan kaedah lain

ShareArgs adalah serupa dengan GlobalShareArgs, kecuali terdapat lebih banyak pasangan nilai kunci untuk permulaan kamus pada permulaan

(Lapisan kedua) Kelas UrBiGetDatasBase menyediakan kamus kunci bebenang (thread lock), kamus masa dan kamus penentuan tamat masa; kesemuanya ialah pemboleh ubah kelas, tetapi ambil perhatian bahawa ia tidak diwarisi. Untuk setiap bacaan SQL tertentu, kelas ini menyediakan penguncian bebenang dan semakan tamat masa.

(Lapisan ketiga) Kelas UrBiGetDatas membaca daripada pangkalan data Hive: data tarikh, data kedai, data ahli, data cuaca, data bandar cuaca, data kitaran hayat kedai, data kitaran hayat produk nasional, data kod pembangunan produk, jumlah jualan sebenar, jumlah pengedaran sebenar, data produk tidak terjual, data harga kos produk, data pemetaan saiz, dan sebagainya.

(Lapisan keempat) Fungsi get_common_datas menggunakan kelas UrBiGetDatas untuk membaca data tarikh, kedai, cuaca, bandar cuaca, produk dan jualan sebenar, lalu menyimpannya sebagai cache di bawah folder ./yongjian/data/ur_bi_data

Kelas CustomUrBiGetDatas mewarisi kelas UrBiGetDatasBase dan membaca jumlah sasaran jualan serta data keluasan kedai-siri.

(Ini juga merupakan lapisan keempat) Fungsi get_datas membaca jumlah sasaran jualan mengikut tahun dan bulan melalui kelas CustomUrBiGetDatas.

Fungsi umum: (Ini ialah fungsi kemasukan panggilan umum) fungsi getdata_ur_bi_dw, yang memanggil fungsi get_common_datas dan get_datas untuk membaca data, dan kemudian menyimpan data ke direktori folder tertentu.

Secara analogi, jika pangkalan data anda bukan Hive, anda boleh menggantikan lapisan pertama dengan MySQL; kod contoh di bawah menunjukkan cara membuat penggantian itu. Lapisan kedua tidak perlu diubah. Lapisan ketiga ialah SQL untuk jadual data yang ingin anda baca.

Kelebihan kaedah ini ialah data tidak akan dibaca berulang kali, dan data yang dibaca boleh digunakan dengan cekap.

Lampirkan kod contoh yang diubah suai kepada mysql

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class MySqlHelper(object):
    """Small SQLAlchemy wrapper for one MySQL database.

    Lazily creates and caches the engine, connection, cursor and session, and
    runs queries into pandas DataFrames with a simple retry.

    NOTE(review): the constructor defaults embed real-looking credentials;
    callers should pass them explicitly.
    """

    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='7cmoP3QDtueVJQj2q4Az',
        logger:logging.Logger=None
        ):
        """Store the connection settings and build the SQLAlchemy URL.

        No connection is opened here. When ``logger`` is omitted the module
        logger is used, so error logging in ``get_data`` cannot fail on None.
        """
        from urllib.parse import quote

        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        # Percent-encode the credentials: characters such as '@', '/', '#' or
        # '%' in a password would otherwise corrupt the URL. Values without
        # special characters are left unchanged.
        self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(
            quote(str(self.user), safe=''),
            quote(str(self.password), safe=''),
            self.host, self.port, self.database
        )
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None

    def create_table_code(self, file_name):
        """Generate table classes with ``sqlacodegen`` into ``file_name``.

        Returns the cached connection (``None`` if none has been opened).

        NOTE(review): the command is run through the shell and carries the
        password on the command line; ``file_name`` must be a trusted value.
        """
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn

    def get_conn(self):
        """Return the cached connection, opening it on first use."""
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn

    def get_engine(self):
        """Return the cached engine, creating it on first use."""
        if self.engine is None:
            self.engine = sqlalchemy.create_engine(self.connection_str)
        return self.engine

    def get_cursor(self):
        """Return the cached DBAPI cursor, creating it on first use.

        The cursor is taken from the DBAPI connection behind the SQLAlchemy
        connection, which is opened here if needed (previously this failed
        when no connection was open yet).
        """
        if self.cursor is None:
            self.cursor = self.get_conn().connection.cursor()
        return self.cursor

    def get_session(self) -> 'sqlalchemy.orm.Session':
        """Return the cached ORM session, creating it on first use."""
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session

    def close_conn(self):
        """Close the cached connection (if any) and dispose the engine."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()

    def close_session(self):
        """Close the cached session (if any) and dispose the engine."""
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()

    def dispose_engine(self):
        """Dispose the cached engine (if any)."""
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def close_cursor(self):
        """Close the cached cursor (if any)."""
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None

    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        """Run ``sql`` and return the result as a DataFrame.

        The query is attempted up to three times, sleeping 60 seconds between
        attempts. The last failure is logged and re-raised.

        Args:
            sql: query passed to ``pandas.read_sql``.
            auto_close: close the connection and dispose the engine once the
                call finishes, whether it succeeded or not.
        """
        conn = self.get_conn()
        max_attempts = 3
        try:
            for attempt in range(1, max_attempts + 1):
                try:
                    return pd.read_sql(sql, conn)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of retries: propagate the last error
                    time.sleep(60)  # wait one minute before retrying
        except Exception as ex:
            self.logger.exception(ex)
            raise  # re-raise with the original traceback
        finally:
            if auto_close:
                self.close_conn()
class VarsHelper():
    """Tiny key/value store persisted to a single pickle file."""

    def __init__(self, save_dir, auto_save=True):
        """Open the store, loading existing values when the file exists.

        Args:
            save_dir: path of the pickle FILE (despite the name). Its parent
                directory is created when missing.
            auto_save: write the file after every ``set_value``.
        """
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        parent_dir = os.path.dirname(self.save_dir)
        # A bare file name has no parent component; os.makedirs('') would raise.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        if os.path.exists(self.save_dir):
            # NOTE: pickle.load can execute arbitrary code from the file; only
            # load files written by a trusted process.
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        """Store ``value`` under ``key`` and persist it when auto_save is on."""
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        """Return the value stored under ``key``; raises KeyError if absent."""
        return self.values[key]

    def has_key(self, key):
        """Return True when ``key`` is present in the store."""
        return key in self.values

    def save_file(self):
        """Write all values to the pickle file, replacing its content."""
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
class GlobalShareArgs():
    """Process-wide settings dictionary shared through class-level state.

    All accessors are static methods, so they work both on the class and on
    an instance (without the decorator an instance call would fail).
    """

    # Shared settings; "debug" is the only key present initially.
    args = {
        "debug": False
    }

    @staticmethod
    def get_args():
        """Return the shared dictionary itself (not a copy)."""
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared dictionary with ``args``."""
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single key in the shared dictionary."""
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return the value for ``key``, or ``default_value`` when absent."""
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return True when ``key`` is present in the shared dictionary."""
        return key in GlobalShareArgs.args

    @staticmethod
    def update(args):
        """Merge ``args`` into the shared dictionary."""
        GlobalShareArgs.args.update(args)
class ShareArgs():
    """Shared settings dictionary pre-populated with project defaults.

    All accessors are static methods, so they work both on the class and on
    an instance (without the decorator an instance call would fail).
    """

    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # labels directory
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # directory for exported clustering labels
        "common_datas_dir":"./hjx/data", # shared data directory (common to ur_bi_dw)
        "only_predict": False, # predict only, do not train
        "delete_model": True, # delete the model first; only used when training
        "export_excel": False, # export to Excel
        "classes": 12, # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        """Return the shared dictionary itself (not a copy)."""
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        """Replace the shared dictionary with ``args``."""
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        """Set a single key in the shared dictionary."""
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        """Return the value for ``key``, or ``default_value`` when absent."""
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        """Return True when ``key`` is present in the shared dictionary."""
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        """Merge ``args`` into the shared dictionary."""
        ShareArgs.args.update(args)
class IMSGetDatasBase():
    """Base class for MySQL exporters with per-file locks and a refresh timeout.

    The three dictionaries below are class attributes keyed by absolute file
    path, so they are shared by every instance in the process.

    NOTE(review): the constructor defaults embed real-looking credentials;
    callers should pass them explicitly.
    """

    # Thread locks; exporters writing to the same path share one lock.
    lock_dict:Dict[str, threading.Lock] = {}
    # Time of the last successful refresh per path, used by the timeout check.
    time_dict:Dict[str, datetime.datetime] = {}
    # Result of the latest timeout check per path (True = must refresh).
    get_data_timeout_dict:Dict[str, bool] = {}

    def __init__(
        self,
        host='192.168.15.144',
        port=3306,
        database='test_ims',
        user='spkjz_writer',
        password='Ur#7cmoP3QDtueVJQj2q4Az',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        """Create the database helper and the output directory.

        Args:
            host, port, database, user, password: forwarded to ``MySqlHelper``.
            save_dir: output directory, created when missing; may be None.
            logger: logger for progress/error messages. The module logger is
                used when omitted, because the methods log unconditionally.
        """
        self.save_dir = save_dir
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.db_helper = MySqlHelper(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            logger=self.logger
            )
        # Create the output directory.
        if self.save_dir is not None:
            os.makedirs(self.save_dir, exist_ok=True)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            # Debug only: persist the refresh times to a file across runs.
            self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas')

    def close(self):
        """Close the database connection."""
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        """Return True when ``key_name`` must be refreshed.

        A path must be refreshed when it has no recorded refresh time or when
        that time is more than 4 hours old. The result is also remembered in
        ``get_data_timeout_dict`` for ``save_last_time``.
        """
        # Use the absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'):
            IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list')
        # The enforced threshold has always been 4 hours; the log used to
        # report 12 (or 24 in debug mode), which never matched the check.
        timeout = 4
        last_time = IMSGetDatasBase.time_dict.get(key_name)
        get_data_timeout = (
            last_time is None
            or (datetime.datetime.today() - last_time).total_seconds() > timeout*60*60
        )
        if get_data_timeout:
            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)
        else:
            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)
        IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        """Record "now" as the refresh time of ``key_name``.

        The time is updated when the latest ``get_last_time`` check reported a
        timeout, and always when the debug ``vars_helper`` is active (which
        also persists the whole time dictionary).

        Raises:
            KeyError: when ``get_last_time`` was never called for this path.
        """
        # Use the absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        if IMSGetDatasBase.get_data_timeout_dict[key_name]:
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None :
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:
        """Return the lock shared by everyone using ``key_name``."""
        # Use the absolute path so the key is unique.
        key_name = os.path.abspath(key_name)
        # setdefault inserts and returns in one step, so two threads asking
        # for the same new key cannot end up holding different locks.
        return IMSGetDatasBase.lock_dict.setdefault(key_name, threading.Lock())

    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=(-1,), # indexes (in sorted order) of files to delete first; default: the last one
        start_date = datetime.datetime(2017, 1, 1), # first period start
        offset = relativedelta(months=3), # length of one period
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # formats the dates substituted into the query
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # formats the output file name of a period
        stop_date = '20700101', # give up once the period start passes this value
        ):
        """Export a query period by period, one CSV file per period.

        ``sql`` must contain two ``%s`` placeholders that receive the
        formatted start and end date of each period. Periods whose file
        already exists are skipped. When ``save_dir`` already exists, the
        files selected by ``del_index_list`` are deleted first so they are
        queried again.

        The loop stops when a query returns None, or when an empty result
        follows at least one non-empty result.

        Raises:
            Exception: when no data was found and the period start has passed
                ``stop_date``.
        """
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            # Delete the selected files (by default the last one).
            file_list = os.listdir(save_dir)
            if len(file_list)>0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir,file_list[del_index]))
                    print('删除最后一个文件:', file_list[del_index])
        select_index = -1  # stays -1 until a period returned rows
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                if len(data)>0:
                    select_index+=1
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    # Empty period after data was found: the end was reached.
                    break
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
class CustomIMSGetDatas(IMSGetDatasBase):
    """Exporter for the ``ims_w_amt_pro`` table.

    NOTE(review): the constructor defaults embed real-looking credentials;
    callers should pass them explicitly.
    """

    def __init__(
        self,
        host='192.168.13.134',
        port=4000,
        database='test_ims',
        user='root',
        password='rootimmsadmin',
        save_dir='./hjx/data/export_ims_data',
        logger:logging.Logger=None
        ):
        """Forward all settings to the base class, which stores save_dir and logger."""
        super().__init__(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )

    def get_ims_w_amt_pro(self):
        """Export the year-month series proportion table to ``ims_w_amt_pro.csv``.

        The whole table is read on every call; the timeout check used by other
        exporters is not applied here. Any exception is logged and re-raised.
        """
        file_path = os.path.join(self.save_dir,'ims_w_amt_pro.csv')
        # The lock is released on every exit path.
        with self.get_lock(file_path):
            try:
                sql = 'SELECT * FROM ims_w_amt_pro'
                data:pd.DataFrame = self.db_helper.get_data(sql)
                data = data.rename(columns={
                    'serial_forecast_proportion': 'forecast_proportion',
                })
                data.to_csv(file_path)
            except Exception as ex:
                # A missing logger must not hide the real error.
                if self.logger is not None:
                    self.logger.exception(ex)
                raise  # re-raise with the original traceback
def get_datas(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    """Export the year-month series proportion data into ``save_dir``.

    Args:
        host, port, database, user, password: connection settings forwarded
            unchanged to ``CustomIMSGetDatas``.
        save_dir: directory that receives the CSV output.
        logger: logger for progress and error messages; the module logger is
            used when omitted so the function does not crash on ``None``.

    Raises:
        Exception: whatever the export raises; it is logged and re-raised.
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    ur_bi_get_datas = CustomIMSGetDatas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # Year-month series proportion data.
        logger.info('正在查询年月系列占比数据...')
        ur_bi_get_datas.get_ims_w_amt_pro()
        logger.info('查询年月系列占比数据完成!')
    except Exception as ex:
        logger.exception(ex)
        raise  # re-raise with the original traceback
    finally:
        # Always release the database connection.
        ur_bi_get_datas.close()
def getdata_export_ims(
    host='192.168.13.134',
    port=4000,
    database='test_ims',
    user='root',
    password='rootimmsadmin',
    save_dir='./hjx/data/export_ims_data',
    logger:logging.Logger=None
    ):
    """Entry point for the IMS export: forwards every argument to ``get_datas``.

    Exceptions raised by ``get_datas`` propagate to the caller.
    """
    get_datas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
Salin selepas log masuk

Atas ialah kandungan terperinci Bagaimana untuk membaca pangkalan data Hive menggunakan Python?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn

Alat AI Hot

Undresser.AI Undress

Undresser.AI Undress

Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover

AI Clothes Remover

Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool

Undress AI Tool

Gambar buka pakaian secara percuma

Clothoff.io

Clothoff.io

Penyingkiran pakaian AI

AI Hentai Generator

AI Hentai Generator

Menjana ai hentai secara percuma.

Alat panas

Notepad++7.3.1

Notepad++7.3.1

Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina

SublimeText3 versi Cina

Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1

Hantar Studio 13.0.1

Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6

Dreamweaver CS6

Alat pembangunan web visual

SublimeText3 versi Mac

SublimeText3 versi Mac

Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

PHP dan Python: Contoh dan perbandingan kod PHP dan Python: Contoh dan perbandingan kod Apr 15, 2025 am 12:07 AM

PHP dan Python mempunyai kelebihan dan kekurangan mereka sendiri, dan pilihannya bergantung kepada keperluan projek dan keutamaan peribadi. 1.PHP sesuai untuk pembangunan pesat dan penyelenggaraan aplikasi web berskala besar. 2. Python menguasai bidang sains data dan pembelajaran mesin.

Python vs JavaScript: Komuniti, Perpustakaan, dan Sumber Python vs JavaScript: Komuniti, Perpustakaan, dan Sumber Apr 15, 2025 am 12:16 AM

Python dan JavaScript mempunyai kelebihan dan kekurangan mereka sendiri dari segi komuniti, perpustakaan dan sumber. 1) Komuniti Python mesra dan sesuai untuk pemula, tetapi sumber pembangunan depan tidak kaya dengan JavaScript. 2) Python berkuasa dalam bidang sains data dan perpustakaan pembelajaran mesin, sementara JavaScript lebih baik dalam perpustakaan pembangunan dan kerangka pembangunan depan. 3) Kedua -duanya mempunyai sumber pembelajaran yang kaya, tetapi Python sesuai untuk memulakan dengan dokumen rasmi, sementara JavaScript lebih baik dengan MDNWebDocs. Pilihan harus berdasarkan keperluan projek dan kepentingan peribadi.

Cara menjalankan program di terminal vscode Cara menjalankan program di terminal vscode Apr 15, 2025 pm 06:42 PM

Dalam kod VS, anda boleh menjalankan program di terminal melalui langkah -langkah berikut: Sediakan kod dan buka terminal bersepadu untuk memastikan bahawa direktori kod selaras dengan direktori kerja terminal. Pilih arahan Run mengikut bahasa pengaturcaraan (seperti python python your_file_name.py) untuk memeriksa sama ada ia berjalan dengan jayanya dan menyelesaikan kesilapan. Gunakan debugger untuk meningkatkan kecekapan debug.

Bolehkah kod studio visual digunakan dalam python Bolehkah kod studio visual digunakan dalam python Apr 15, 2025 pm 08:18 PM

Kod VS boleh digunakan untuk menulis Python dan menyediakan banyak ciri yang menjadikannya alat yang ideal untuk membangunkan aplikasi python. Ia membolehkan pengguna untuk: memasang sambungan python untuk mendapatkan fungsi seperti penyempurnaan kod, penonjolan sintaks, dan debugging. Gunakan debugger untuk mengesan kod langkah demi langkah, cari dan selesaikan kesilapan. Mengintegrasikan Git untuk Kawalan Versi. Gunakan alat pemformatan kod untuk mengekalkan konsistensi kod. Gunakan alat linting untuk melihat masalah yang berpotensi lebih awal.

Penjelasan terperinci mengenai Prinsip Docker Penjelasan terperinci mengenai Prinsip Docker Apr 14, 2025 pm 11:57 PM

Docker menggunakan ciri -ciri kernel Linux untuk menyediakan persekitaran berjalan yang cekap dan terpencil. Prinsip kerjanya adalah seperti berikut: 1. Cermin digunakan sebagai templat baca sahaja, yang mengandungi semua yang anda perlukan untuk menjalankan aplikasi; 2. Sistem Fail Kesatuan (Unionfs) menyusun pelbagai sistem fail, hanya menyimpan perbezaan, menjimatkan ruang dan mempercepatkan; 3. Daemon menguruskan cermin dan bekas, dan pelanggan menggunakannya untuk interaksi; 4. Ruang nama dan cgroups melaksanakan pengasingan kontena dan batasan sumber; 5. Pelbagai mod rangkaian menyokong interkoneksi kontena. Hanya dengan memahami konsep -konsep teras ini, anda boleh menggunakan Docker dengan lebih baik.

Adakah sambungan vscode berniat jahat? Adakah sambungan vscode berniat jahat? Apr 15, 2025 pm 07:57 PM

Sambungan kod VS menimbulkan risiko yang berniat jahat, seperti menyembunyikan kod jahat, mengeksploitasi kelemahan, dan melancap sebagai sambungan yang sah. Kaedah untuk mengenal pasti sambungan yang berniat jahat termasuk: memeriksa penerbit, membaca komen, memeriksa kod, dan memasang dengan berhati -hati. Langkah -langkah keselamatan juga termasuk: kesedaran keselamatan, tabiat yang baik, kemas kini tetap dan perisian antivirus.

Boleh kod vs dijalankan di Windows 8 Boleh kod vs dijalankan di Windows 8 Apr 15, 2025 pm 07:24 PM

Kod VS boleh dijalankan pada Windows 8, tetapi pengalaman mungkin tidak hebat. Mula -mula pastikan sistem telah dikemas kini ke patch terkini, kemudian muat turun pakej pemasangan kod VS yang sepadan dengan seni bina sistem dan pasangnya seperti yang diminta. Selepas pemasangan, sedar bahawa beberapa sambungan mungkin tidak sesuai dengan Windows 8 dan perlu mencari sambungan alternatif atau menggunakan sistem Windows yang lebih baru dalam mesin maya. Pasang sambungan yang diperlukan untuk memeriksa sama ada ia berfungsi dengan betul. Walaupun kod VS boleh dilaksanakan pada Windows 8, disyorkan untuk menaik taraf ke sistem Windows yang lebih baru untuk pengalaman dan keselamatan pembangunan yang lebih baik.

Python: Automasi, skrip, dan pengurusan tugas Python: Automasi, skrip, dan pengurusan tugas Apr 16, 2025 am 12:14 AM

Python cemerlang dalam automasi, skrip, dan pengurusan tugas. 1) Automasi: Sandaran fail direalisasikan melalui perpustakaan standard seperti OS dan Shutil. 2) Penulisan Skrip: Gunakan Perpustakaan Psutil untuk memantau sumber sistem. 3) Pengurusan Tugas: Gunakan perpustakaan jadual untuk menjadualkan tugas. Kemudahan penggunaan Python dan sokongan perpustakaan yang kaya menjadikannya alat pilihan di kawasan ini.

See all articles