目次
ハイブ データベースを読み取るための実際のビジネス コード
コードの説明と理解
ホームページ バックエンド開発 Python チュートリアル Python を使用して Hive データベースを読み取るにはどうすればよいですか?

Python を使用して Hive データベースを読み取るにはどうすればよいですか?

May 09, 2023 pm 04:28 PM
python hive

ハイブ データベースを読み取るための実際のビジネス コード

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class HiveHelper(object):
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.logger = logger
        self.impala_conn = None
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
    def create_table_code(self, file_name):
        '''创建表类代码'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn
    def get_conn(self):
        '''创建连接或获取连接'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn
    def get_impala_conn(self):
        '''创建连接或获取连接'''
        if self.impala_conn is None:
            self.impala_conn = connect(
                host=self.host,
                port=self.port,
                database=self.database,
                auth_mechanism=self.auth_mechanism,
                user=self.user,
                password=self.password
                )
        return self.impala_conn
    def get_engine(self):
        '''创建连接或获取连接'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)
        return self.engine
    def get_cursor(self):
        '''创建连接或获取连接'''
        if self.cursor is None:
            self.cursor = self.conn.cursor()
        return self.cursor
    def get_session(self) -> sessionmaker:
        '''创建连接或获取连接'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session
    def close_conn(self):
        '''关闭连接'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()
    def close_impala_conn(self):
        '''关闭impala连接'''
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None
    def close_session(self):
        '''关闭连接'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()
    def dispose_engine(self):
        '''释放engine'''
        if self.engine is not None:
            # self.engine.dispose(close=False)
            self.engine.dispose()
            self.engine = None
    def close_cursor(self):
        '''关闭cursor'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''查询数据'''
        conn = self.get_conn()
        data = None
        try:
            # 异常重试3次
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex # 往外抛出异常
                    time.sleep(60) # 一分钟后重试
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            if auto_close:
                self.close_conn()
        return data
pass
class VarsHelper():
    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)
                f.close()
    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()
    def get_value(self, key):
        return self.values[key]
    def has_key(self, key):
        return key in self.values.keys()
    def save_file(self):
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
            f.close()
pass
class GlobalShareArgs():
    args = {
        "debug": False
    }
    def get_args():
        return GlobalShareArgs.args
    def set_args(args):
        GlobalShareArgs.args = args
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in GlobalShareArgs.args.keys()
    def update(args):
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录
        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共
        "only_predict": False, # 只识别,不训练
        "delete_model": True, # 先删除模型,仅在训练时使用
        "export_excel": False, # 导出excel
        "classes": 12, # 聚类数
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0, # 4.0
        "lr_callback_patience": 4, 
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }
    def get_args():
        return ShareArgs.args
    def set_args(args):
        ShareArgs.args = args
    def set_args_value(key, value):
        ShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in ShareArgs.args.keys()
    def update(args):
        ShareArgs.args.update(args)
pass
class UrBiGetDatasBase():
    # 线程锁列表,同保存路径共用锁
    lock_dict:Dict[str, threading.Lock] = {}
    # 时间列表,用于判断是否超时
    time_dict:Dict[str, datetime.datetime] = {}
    # 用于记录是否需要更新超时时间
    get_data_timeout_dict:Dict[str, bool] = {}
    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger
            )
        # 创建子目录
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas') 
    def close(self):
        '''关闭连接'''
        self.db_helper.close_conn()
    def get_last_time(self, key_name) -> bool:
        '''获取是否超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
        timeout = 12 # 12小时
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24 # 24小时
        get_data_timeout = False
        if key_name not in UrBiGetDatasBase.time_dict.keys() or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds()>(timeout*60*60):
            self.logger.info('超时%d小时,重新查数据:%s', timeout, key_name)
            # UrBiGetDatasBase.time_list[key_name] = datetime.datetime.today()
            get_data_timeout = True
        else:
            self.logger.info('未超时%d小时,跳过查数据:%s', timeout, key_name)
        # if self.vars_helper is not None :
        #     self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_list)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout
    def save_last_time(self, key_name):
        '''更新状态超时'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if UrBiGetDatasBase.get_data_timeout_dict[key_name]:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None :
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)
    def get_lock(self, key_name) -> threading.Lock:
        '''获取锁'''
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if key_name not in UrBiGetDatasBase.lock_dict.keys():
            UrBiGetDatasBase.lock_dict[key_name] = threading.Lock()
        return UrBiGetDatasBase.lock_dict[key_name]
    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=[-1], # 删除最后下标
        start_date = datetime.datetime(2017, 1, 1), # 开始时间
        offset = relativedelta(months=3), # 时间间隔
        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # 查询语句中替代时间参数的格式化
        stop_date = '20700101', # 超过时间则停止
        data_format_fun = None, # 格式化数据
        ):
        '''分时间增量读取数据'''
        # 创建文件夹
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            #删除最后一个文件
            file_list = os.listdir(save_dir)
            if len(file_list)>0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir,file_list[del_index]))
                    print('删除最后一个文件:', file_list[del_index])
        select_index = -1
        # start_date = datetime.datetime(2017, 1, 1)
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            # self.logger.info('file_path: %s', file_path)
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                # self.logger.info('data: %d', data.columns)
                if len(data)>0:
                    select_index+=1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    # 排序
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    break
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
pass
class UrBiGetDatas(UrBiGetDatasBase):
    def __init__(
        self,
        host=&#39;10.2.32.22&#39;,
        port=21051,
        database=&#39;ur_ai_dw&#39;,
        auth_mechanism=&#39;LDAP&#39;,
        user=&#39;urbi&#39;,
        password=&#39;Ur#730xd&#39;,
        save_dir=&#39;./hjx/data/ur_bi_dw_data&#39;,
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_dim_date(self):
        &#39;&#39;&#39;日期数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;ur_bi_dw.dim_date.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = &#39;SELECT * FROM ur_bi_dw.dim_date&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:&#39;dim_date.&#39;+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values([&#39;dim_date.date_key&#39;])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_shop(self):
        &#39;&#39;&#39;店铺数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;ur_bi_dw.dim_shop.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = &#39;SELECT * FROM ur_bi_dw.dim_shop&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:&#39;dim_shop.&#39;+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values([&#39;dim_shop.shop_no&#39;])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_vip(self):
        &#39;&#39;&#39;会员数据&#39;&#39;&#39;
        sub_dir = os.path.join(self.save_dir,&#39;vip_no&#39;)
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = &#39;&#39;&#39;SELECT dv.*, dd.date_key, dd.date_name2 
            FROM ur_bi_dw.dim_vip as dv
            INNER JOIN ur_bi_dw.dim_date as dd
            ON dv.card_create_date=dd.date_name2 
            where dd.date_key >= %s
            and dd.date_key < %s&#39;&#39;&#39;
            # data:pd.DataFrame = self.db_helper.get_data(sql)
            sort_columns = [&#39;dv.vip_no&#39;]
            # TODO:
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 开始时间
                offset=relativedelta(years=1)
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_weather(self):
        &#39;&#39;&#39;天气数据&#39;&#39;&#39;
        sub_dir = os.path.join(self.save_dir,&#39;weather&#39;)
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather
            where weather.date_key>=%s and weather.date_key<%s
            """
            sort_columns = [&#39;weather.date_key&#39;,&#39;weather.areaid&#39;]
            def data_format_fun(data):
                columns = list(data.columns)
                columns = {c:&#39;weather.&#39;+c for c in columns}
                data = data.rename(columns=columns)
                return data
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                del_index_list=[-2, -1], # 删除最后下标
                data_format_fun=data_format_fun,
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_weather_city(self):
        &#39;&#39;&#39;天气城市数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;ur_bi_dw.weather_city.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = &#39;SELECT * FROM ur_bi_dw.dim_weather_city as weather_city&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:&#39;weather_city.&#39;+c for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods(self):
        &#39;&#39;&#39;货品数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;ur_bi_dw.dim_goods.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = &#39;SELECT * FROM ur_bi_dw.dim_goods&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:&#39;dim_goods.&#39;+c for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods_market_shop_date(self):
        &#39;&#39;&#39;店铺商品生命周期数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;ur_bi_dw.dim_goods_market_shop_date.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            # sql = &#39;SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date&#39;
            sql = &#39;&#39;&#39;
            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days
            FROM ur_bi_dw.dim_goods_market_shop_date
            where lifecycle_end_date is not null
            &#39;&#39;&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace(&#39;lifecycle_end_date.&#39;,&#39;&#39;) for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values([&#39;shop_market_date&#39;])
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods_market_date(self):
        &#39;&#39;&#39;全国商品生命周期数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;ur_bi_dw.dim_goods_market_date.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = &#39;&#39;&#39;
            select * FROM ur_bi_dw.dim_goods_market_date
            &#39;&#39;&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:&#39;dim_goods_market_date.&#39;+c for c in columns}
            data = data.rename(columns=columns)
            data = data.sort_values([&#39;dim_goods_market_date.sku_no&#39;])
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_goods_color_dev_sizes(self):
        &#39;&#39;&#39;商品开发码数数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;dim_goods_color_dev_sizes.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            # sql = &#39;SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date&#39;
            sql = &#39;SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace(&#39;dim_goods_color_dev_sizes.&#39;,&#39;&#39;) for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dwd_daily_sales_size(self):
        &#39;&#39;&#39;实际销售金额&#39;&#39;&#39;
        sub_dir = os.path.join(self.save_dir,&#39;dwd_daily_sales_size_all&#39;)
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(tag_price) as `tag_price`,
                sum(sales_qty) as `sales_qty`,
                sum(sales_tag_amt) as `sales_tag_amt`,
                sum(sales_amt) as `sales_amt`,
                count(0) as `sales_count`
            from ur_bi_dw.dwd_daily_sales_size as sales
            where sales.date_key>=%s and sales.date_key<%s
                and sales.currency_code=&#39;CNY&#39;
            group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = [&#39;date_key&#39;,&#39;shop_no&#39;,&#39;sku_no&#39;]
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 开始时间
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dwd_daily_delivery_size(self):
        &#39;&#39;&#39;实际配货金额&#39;&#39;&#39;
        sub_dir = os.path.join(self.save_dir,&#39;dwd_daily_delivery_size_all&#39;)
        now_lock = self.get_lock(sub_dir)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(sub_dir):
                return
            sql = """
            select shop_no,sku_no,date_key,`size`,
                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,
                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,
                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,
                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,
                sum(delivery.pr_received_qty) as `pr_received_qty`,
                count(0) as `delivery_count`
            from ur_bi_dw.dwd_daily_delivery_size as delivery
            where delivery.date_key>=%s and delivery.date_key<%s
                and delivery.currency_code=&#39;CNY&#39;
            group by shop_no,sku_no,date_key,`size`
            """
            sort_columns = [&#39;date_key&#39;,&#39;shop_no&#39;,&#39;sku_no&#39;]
            self.get_data_of_date(
                save_dir=sub_dir,
                sql=sql,
                sort_columns=sort_columns,
                start_date=datetime.datetime(2017, 1, 1), # 开始时间
            )
            # 更新超时时间
            self.save_last_time(sub_dir)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_v_last_nation_sales_status(self):
        &#39;&#39;&#39;商品畅滞销数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;v_last_nation_sales_status.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = &#39;SELECT * FROM ur_bi_dw.v_last_nation_sales_status&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace(&#39;v_last_nation_sales_status.&#39;,&#39;&#39;) for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dwd_daily_finacial_goods(self):
        &#39;&#39;&#39;商品成本价数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;dwd_daily_finacial_goods.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = """
            select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1
            inner join (
                select sku_no,`size`,max(date_key) as date_key
                from ur_bi_dw.dwd_daily_finacial_goods
                where currency_code=&#39;CNY&#39; and country_code=&#39;CN&#39;
                group by sku_no,`size`
            ) as t2
            on t2.sku_no=t1.sku_no
                and t2.`size`=t1.`size`
                and t2.date_key=t1.date_key
            where t1.currency_code=&#39;CNY&#39; and t1.country_code=&#39;CN&#39;
            """
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace(&#39;t1.&#39;,&#39;&#39;) for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_dim_size_group(self):
        &#39;&#39;&#39;尺码映射数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;dim_size_group.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = """select * from ur_bi_dw.dim_size_group"""
            data:pd.DataFrame = self.db_helper.get_data(sql)
            columns = list(data.columns)
            columns = {c:c.replace(&#39;dim_size_group.&#39;,&#39;&#39;) for c in columns}
            data = data.rename(columns=columns)
            data.to_csv(file_path, index=False)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
pass
def get_common_datas(
    host=&#39;10.2.32.22&#39;,
    port=21051,
    database=&#39;ur_ai_dw&#39;,
    auth_mechanism=&#39;LDAP&#39;,
    user=&#39;urbi&#39;,
    password=&#39;Ur#730xd&#39;,
    logger:logging.Logger=None):
    # 共用文件
    common_datas_dir = ShareArgs.get_args_value(&#39;common_datas_dir&#39;)
    common_ur_bi_dir = os.path.join(common_datas_dir, &#39;ur_bi_data&#39;)
    ur_bi_get_datas = UrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=common_ur_bi_dir,
        logger=logger
    )
    try:
        logger.info(&#39;正在查询日期数据...&#39;)
        ur_bi_get_datas.get_dim_date()
        logger.info(&#39;查询日期数据完成!&#39;)
        logger.info(&#39;正在查询店铺数据...&#39;)
        ur_bi_get_datas.get_dim_shop()
        logger.info(&#39;查询店铺数据完成!&#39;)
        logger.info(&#39;正在查询天气数据...&#39;)
        ur_bi_get_datas.get_weather()
        logger.info(&#39;查询天气数据完成!&#39;)
        logger.info(&#39;正在查询天气城市数据...&#39;)
        ur_bi_get_datas.get_weather_city()
        logger.info(&#39;查询天气城市数据完成!&#39;)
        logger.info(&#39;正在查询货品数据...&#39;)
        ur_bi_get_datas.get_dim_goods()
        logger.info(&#39;查询货品数据完成!&#39;)
        logger.info(&#39;正在查询实际销量数据...&#39;)
        ur_bi_get_datas.get_dwd_daily_sales_size()
        logger.info(&#39;查询实际销量数据完成!&#39;)
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外抛出异常
    finally:
        ur_bi_get_datas.close()
pass
class CustomUrBiGetDatas(UrBiGetDatasBase):
    def __init__(
        self,
        host=&#39;10.2.32.22&#39;,
        port=21051,
        database=&#39;ur_ai_dw&#39;,
        auth_mechanism=&#39;LDAP&#39;,
        user=&#39;urbi&#39;,
        password=&#39;Ur#730xd&#39;,
        save_dir=&#39;./hjx/data/ur_bi_data&#39;,
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_sales_goal_amt(self):
        &#39;&#39;&#39;销售目标金额&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;month_of_year_sales_goal_amt.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = &#39;&#39;&#39;
            select sales_goal.shop_no,
                if(sales_goal.serial=&#39;Y&#39;,&#39;W&#39;,sales_goal.serial) as `sales_goal.serial`,
                dates.month_of_year,
                sum(sales_goal.sales_goal_amt) as sales_goal_amt
            from ur_bi_dw.dwd_sales_goal_west as sales_goal
            inner join ur_bi_dw.dim_date as dates
                on sales_goal.date_key = dates.date_key
            group by sales_goal.shop_no,
                if(sales_goal.serial=&#39;Y&#39;,&#39;W&#39;,sales_goal.serial),
                dates.month_of_year
            &#39;&#39;&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                &#39;shop_no&#39;:&#39;sales_goal.shop_no&#39;,
                &#39;serial&#39;:&#39;sales_goal.serial&#39;,
                &#39;month_of_year&#39;:&#39;dates.month_of_year&#39;,
            })
            # 排序
            data = data.sort_values([&#39;sales_goal.shop_no&#39;,&#39;sales_goal.serial&#39;,&#39;dates.month_of_year&#39;])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
    def get_shop_serial_area(self):
        &#39;&#39;&#39;店-系列面积&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;shop_serial_area.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            if not self.get_last_time(file_path):
                return
            sql = &#39;&#39;&#39;
            select shop_serial_area.shop_no,
                if(shop_serial_area.serial=&#39;Y&#39;,&#39;W&#39;,shop_serial_area.serial) as `shop_serial_area.serial`,
                shop_serial_area.month_of_year,
                sum(shop_serial_area.area) as `shop_serial_area.area`
            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area
            where shop_serial_area.area is not null
            group by shop_serial_area.shop_no,if(shop_serial_area.serial=&#39;Y&#39;,&#39;W&#39;,shop_serial_area.serial),shop_serial_area.month_of_year
            &#39;&#39;&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                &#39;shop_no&#39;:&#39;shop_serial_area.shop_no&#39;,
                &#39;serial&#39;:&#39;shop_serial_area.serial&#39;,
                &#39;month_of_year&#39;:&#39;shop_serial_area.month_of_year&#39;,
                &#39;area&#39;:&#39;shop_serial_area.area&#39;,
            })
            # 排序
            data = data.sort_values([&#39;shop_serial_area.shop_no&#39;,&#39;shop_serial_area.serial&#39;,&#39;shop_serial_area.month_of_year&#39;])
            data.to_csv(file_path)
            # 更新超时时间
            self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
pass
def get_datas(
    host=&#39;10.2.32.22&#39;,
    port=21051,
    database=&#39;ur_ai_dw&#39;,
    auth_mechanism=&#39;LDAP&#39;,
    user=&#39;urbi&#39;,
    password=&#39;Ur#730xd&#39;,
    save_dir=&#39;./data/sales_forecast/ur_bi_dw_data&#39;,
    logger:logging.Logger=None):
    ur_bi_get_datas = CustomUrBiGetDatas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # 店,系列,品类,年月,销售目标金额
        logger.info(&#39;正在查询年月销售目标金额数据...&#39;)
        ur_bi_get_datas.get_sales_goal_amt()
        logger.info(&#39;查询年月销售目标金额数据完成!&#39;)
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外抛出异常
    finally:
        ur_bi_get_datas.close()
pass
def getdata_ur_bi_dw(
    host=&#39;10.2.32.22&#39;,
    port=21051,
    database=&#39;ur_ai_dw&#39;,
    auth_mechanism=&#39;LDAP&#39;,
    user=&#39;urbi&#39;,
    password=&#39;Ur#730xd&#39;,
    save_dir=&#39;./data/sales_forecast/ur_bi_dw_data&#39;,
    logger=None
):
    get_common_datas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        logger=logger
    )
    get_datas(
        host=host,
        port=port,
        database=database,
        auth_mechanism=auth_mechanism,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
pass
# 代码入口
# getdata_ur_bi_dw(
#     host=ur_bi_dw_host,
#     port=ur_bi_dw_port,
#     database=ur_bi_dw_database,
#     auth_mechanism=ur_bi_dw_auth_mechanism,
#     user=ur_bi_dw_user,
#     password=ur_bi_dw_password,
#     save_dir=ur_bi_dw_save_dir,
#     logger=logger
#     )
ログイン後にコピー

コードの説明と理解

各クラスの特定の関数の説明。コードは、次のテキストの説明に従って「食べる」必要があります。

(第 1 層) HiveHelper は、データベースへの接続、データベース接続の終了、トランザクションの生成、実行、エンジン、接続などの機能を完了します。

VarsHelper は、保存できる単純な永続化関数を提供します。オブジェクトをファイルに変換し、ディスクに保存します。値の設定、値の取得、および値が存在するかどうかの判断のためのメソッドを提供します。

GlobalShareArgs は辞書を提供し、辞書の取得、辞書の設定、辞書のキーと値のペアの設定、辞書のキーと値のペアの設定を行うためのメソッドを提供します。辞書キーの値、およびキーが辞書内にあるかどうかを判断し、辞書を更新します。その他のメソッド

ShareArgs は GlobalShareArgs と似ていますが、辞書の初期化でより多くのキーと値のペアが含まれる点が異なります

(第2層) UrBiGetDataBaseクラスが提供するスレッドロック辞書、時間辞書、タイムアウト判定辞書はいずれもクラス変数であり、HiveHelperクラスを使用しますが、継承されないので注意してください。具体的なSQL読み込みでは、スレッド固定や時刻判定を提供する

(第3層) UrBiGetDatasクラスで、ハイブデータベースから日付データ、店舗データ、会員データ、天気データ、気象都市データなどを取得します。データ、店舗ライフサイクルデータ、国家製品ライフサイクルデータ、商品開発コードデータ、販売実数データ、流通実数データ、売れ残りデータ、製品原価データ、サイズマッピングデータなど

(第4層) get_common_data関数、URBiGetDataクラスを使用して日付、店舗、天気、気象都市、商品、販売実績データを読み込み、フォルダー ./yongjian/data/ur_bi_data

#にキャッシュします。 ##CustomUrBiGetDataクラスは、UrBiGetDatasBaseクラスを継承し、売上目標金額と点系列エリアデータを読み込みます。

(これも第 4 層です) get_datas 関数は、CustomUrBiGetData クラスを通じて年間および月間の売上目標金額を読み取ります。

一般関数: (これは一般的な呼び出しエントリ関数です) get_data_ur_bi_dw 関数は、get_common_data 関数と get_datas 関数を呼び出してデータを読み取り、データを特定のフォルダー ディレクトリに保存します。

同様に、ハイブ データベースでない場合は、最初の層を mysql に置き換えることができます。ホームページには交換方法が説明されています。 2層目は変更する必要はありません。3層目は読みたいデータテーブルです。データベースが異なれば読みたいデータテーブルも異なるため、ここにSQLを書く必要があります。内部のメソッドを適用するだけです。基本的には変更するだけです。 SQL。

この方法の利点は、データを繰り返し読み取らず、読み取ったデータを効率的に利用できることです。

mysql

import logging
import pandas as pd
from impala.dbapi import connect
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import os
import time
import os
import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict, List
import logging
import threading
import pandas as pd
import pickle
class MySqlHelper(object):
    def __init__(
        self,
        host=&#39;192.168.15.144&#39;,
        port=3306,
        database=&#39;test_ims&#39;,
        user=&#39;spkjz_writer&#39;,
        password=&#39;7cmoP3QDtueVJQj2q4Az&#39;,
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.user = user
        self.password = password
        self.logger = logger
        self.connection_str = &#39;mysql+pymysql://%s:%s@%s:%d/%s&#39; %(
            self.user, self.password, self.host, self.port, self.database
        )
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None
    def create_table_code(self, file_name):
        &#39;&#39;&#39;创建表类代码&#39;&#39;&#39;
        os.system(f&#39;sqlacodegen {self.connection_str} > {file_name}&#39;)
        return self.conn
    def get_conn(self):
        &#39;&#39;&#39;创建连接或获取连接&#39;&#39;&#39;
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn
    def get_engine(self):
        &#39;&#39;&#39;创建连接或获取连接&#39;&#39;&#39;
        if self.engine is None:
            self.engine = sqlalchemy.create_engine(self.connection_str)
        return self.engine
    def get_cursor(self):
        &#39;&#39;&#39;创建连接或获取连接&#39;&#39;&#39;
        if self.cursor is None:
            self.cursor = self.conn.cursor()
        return self.cursor
    def get_session(self) -> sessionmaker:
        &#39;&#39;&#39;创建连接或获取连接&#39;&#39;&#39;
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session
    def close_conn(self):
        &#39;&#39;&#39;关闭连接&#39;&#39;&#39;
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
    def close_session(self):
        &#39;&#39;&#39;关闭连接&#39;&#39;&#39;
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()
    def dispose_engine(self):
        &#39;&#39;&#39;释放engine&#39;&#39;&#39;
        if self.engine is not None:
            # self.engine.dispose(close=False)
            self.engine.dispose()
            self.engine = None
    def close_cursor(self):
        &#39;&#39;&#39;关闭cursor&#39;&#39;&#39;
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        &#39;&#39;&#39;查询数据&#39;&#39;&#39;
        conn = self.get_conn()
        data = None
        try:
            # 异常重试3次
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception as ex:
                    if i == 2:
                        raise ex # 往外抛出异常
                    time.sleep(60) # 一分钟后重试
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            if auto_close:
                self.close_conn()
        return data
pass
class VarsHelper():
    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, &#39;rb&#39;) as f:
                self.values = pickle.load(f)
                f.close()
    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()
    def get_value(self, key):
        return self.values[key]
    def has_key(self, key):
        return key in self.values.keys()
    def save_file(self):
        with open(self.save_dir, &#39;wb&#39;) as f:
            pickle.dump(self.values, f)
            f.close()
pass
class GlobalShareArgs():
    args = {
        "debug": False
    }
    def get_args():
        return GlobalShareArgs.args
    def set_args(args):
        GlobalShareArgs.args = args
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in GlobalShareArgs.args.keys()
    def update(args):
        GlobalShareArgs.args.update(args)
pass
class ShareArgs():
    args = {
        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # 标签目录
        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # 聚类导出标签目录
        "common_datas_dir":"./hjx/data", # 共用数据目录。ur_bi_dw的公共
        "only_predict": False, # 只识别,不训练
        "delete_model": True, # 先删除模型,仅在训练时使用
        "export_excel": False, # 导出excel
        "classes": 12, # 聚类数
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0, # 4.0
        "lr_callback_patience": 4, 
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }
    def get_args():
        return ShareArgs.args
    def set_args(args):
        ShareArgs.args = args
    def set_args_value(key, value):
        ShareArgs.args[key] = value
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)
    def contain_key(key):
        return key in ShareArgs.args.keys()
    def update(args):
        ShareArgs.args.update(args)
pass
class IMSGetDatasBase():
    # 线程锁列表,同保存路径共用锁
    lock_dict:Dict[str, threading.Lock] = {}
    # 时间列表,用于判断是否超时
    time_dict:Dict[str, datetime.datetime] = {}
    # 用于记录是否需要更新超时时间
    get_data_timeout_dict:Dict[str, bool] = {}
    def __init__(
        self,
        host=&#39;192.168.15.144&#39;,
        port=3306,
        database=&#39;test_ims&#39;,
        user=&#39;spkjz_writer&#39;,
        password=&#39;Ur#7cmoP3QDtueVJQj2q4Az&#39;,
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = MySqlHelper(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            logger=logger
            )
        # 创建子目录
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value(&#39;debug&#39;):
            self.vars_helper = VarsHelper(&#39;./hjx/data/vars/IMSGetDatas&#39;) # 把超时时间保存到文件,注释该行即可停掉,只用于调试
    def close(self):
        &#39;&#39;&#39;关闭连接&#39;&#39;&#39;
        self.db_helper.close_conn()
    def get_last_time(self, key_name) -> bool:
        &#39;&#39;&#39;获取是否超时&#39;&#39;&#39;
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key(&#39;IMSGetDatasBase.time_list&#39;):
            IMSGetDatasBase.time_dict = self.vars_helper.get_value(&#39;IMSGetDatasBase.time_list&#39;)
        timeout = 12 # 12小时
        if GlobalShareArgs.get_args_value(&#39;debug&#39;):
            timeout = 24 # 24小时
        get_data_timeout = False
        if key_name not in IMSGetDatasBase.time_dict.keys() or (datetime.datetime.today() - IMSGetDatasBase.time_dict[key_name]).total_seconds()>(4*60*60):
            self.logger.info(&#39;超时%d小时,重新查数据:%s&#39;, timeout, key_name)
            # IMSGetDatasBase.time_list[key_name] = datetime.datetime.today()
            get_data_timeout = True
        else:
            self.logger.info(&#39;未超时%d小时,跳过查数据:%s&#39;, timeout, key_name)
        # if self.vars_helper is not None :
        #     self.vars_helper.set_value(&#39;IMSGetDatasBase.time_list&#39;, IMSGetDatasBase.time_list)
        IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout
    def save_last_time(self, key_name):
        &#39;&#39;&#39;更新状态超时&#39;&#39;&#39;
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if IMSGetDatasBase.get_data_timeout_dict[key_name]:
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None :
            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value(&#39;IMSGetDatasBase.time_list&#39;, IMSGetDatasBase.time_dict)
    def get_lock(self, key_name) -> threading.Lock:
        &#39;&#39;&#39;获取锁&#39;&#39;&#39;
        # 转静态路径,确保唯一性
        key_name = os.path.abspath(key_name)
        if key_name not in IMSGetDatasBase.lock_dict.keys():
            IMSGetDatasBase.lock_dict[key_name] = threading.Lock()
        return IMSGetDatasBase.lock_dict[key_name]
    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=[-1], # 删除最后下标
        start_date = datetime.datetime(2017, 1, 1), # 开始时间
        offset = relativedelta(months=3), # 时间间隔
        date_format_fun = lambda d: &#39;%04d%02d01&#39; % (d.year, d.month), # 查询语句中替代时间参数的格式化
        filename_format_fun = lambda d: &#39;%04d%02d.csv&#39; % (d.year, d.month), # 查询语句中替代时间参数的格式化
        stop_date = &#39;20700101&#39;, # 超过时间则停止
        ):
        &#39;&#39;&#39;分时间增量读取数据&#39;&#39;&#39;
        # 创建文件夹
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            #删除最后一个文件
            file_list = os.listdir(save_dir)
            if len(file_list)>0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir,file_list[del_index]))
                    print(&#39;删除最后一个文件:&#39;, file_list[del_index])
        select_index = -1
        # start_date = datetime.datetime(2017, 1, 1)
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info(&#39;date: %s-%s&#39;, start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            # self.logger.info(&#39;file_path: %s&#39;, file_path)
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info(&#39;data: %d&#39;, len(data))
                # self.logger.info(&#39;data: %d&#39;, data.columns)
                if len(data)>0:
                    select_index+=1
                    # 排序
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index!=-1:
                    break
                elif stop_date < start_date_str:
                    raise Exception("读取数据异常,时间超出最大值!")
            start_date = end_date
pass
class CustomIMSGetDatas(IMSGetDatasBase):
    def __init__(
        self,
        host=&#39;192.168.13.134&#39;,
        port=4000,
        database=&#39;test_ims&#39;,
        user=&#39;root&#39;,
        password=&#39;rootimmsadmin&#39;,
        save_dir=&#39;./hjx/data/export_ims_data&#39;,
        logger:logging.Logger=None
        ):
        self.save_dir = save_dir
        self.logger = logger
        super().__init__(
            host=host,
            port=port,
            database=database,
            user=user,
            password=password,
            save_dir=save_dir,
            logger=logger
            )
    def get_ims_w_amt_pro(self):
        &#39;&#39;&#39;年月系列占比数据&#39;&#39;&#39;
        file_path = os.path.join(self.save_dir,&#39;ims_w_amt_pro.csv&#39;)
        now_lock = self.get_lock(file_path)
        now_lock.acquire() # 加锁
        try:
            # 设置超时4小时才重新查数据
            # if not self.get_last_time(file_path):
            #     return
            sql = &#39;SELECT * FROM ims_w_amt_pro&#39;
            data:pd.DataFrame = self.db_helper.get_data(sql)
            data = data.rename(columns={
                &#39;serial_forecast_proportion&#39;: &#39;forecast_proportion&#39;,
            })
            data.to_csv(file_path)
            # # 更新超时时间
            # self.save_last_time(file_path)
        except Exception as ex:
            self.logger.exception(ex)
            raise ex # 往外抛出异常
        finally:
            now_lock.release() # 释放锁
pass
def get_datas(
    host=&#39;192.168.13.134&#39;,
    port=4000,
    database=&#39;test_ims&#39;,
    user=&#39;root&#39;,
    password=&#39;rootimmsadmin&#39;,
    save_dir=&#39;./hjx/data/export_ims_data&#39;,
    logger:logging.Logger=None
    ):
    ur_bi_get_datas = CustomIMSGetDatas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
    try:
        # 年月系列占比数据
        logger.info(&#39;正在查询年月系列占比数据...&#39;)
        ur_bi_get_datas.get_ims_w_amt_pro()
        logger.info(&#39;查询年月系列占比数据完成!&#39;)
    except Exception as ex:
        logger.exception(ex)
        raise ex # 往外抛出异常
    finally:
        ur_bi_get_datas.close()
pass
def getdata_export_ims(
    host=&#39;192.168.13.134&#39;,
    port=4000,
    database=&#39;test_ims&#39;,
    user=&#39;root&#39;,
    password=&#39;rootimmsadmin&#39;,
    save_dir=&#39;./hjx/data/export_ims_data&#39;,
    logger:logging.Logger=None
    ):
    get_datas(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password,
        save_dir=save_dir,
        logger=logger
    )
pass
ログイン後にコピー
に変更されたコード例を添付します。

以上がPython を使用して Hive データベースを読み取るにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

PHPおよびPython:コードの例と比較 PHPおよびPython:コードの例と比較 Apr 15, 2025 am 12:07 AM

PHPとPythonには独自の利点と短所があり、選択はプロジェクトのニーズと個人的な好みに依存します。 1.PHPは、大規模なWebアプリケーションの迅速な開発とメンテナンスに適しています。 2。Pythonは、データサイエンスと機械学習の分野を支配しています。

CentosのPytorchのGPUサポートはどのようにサポートされていますか CentosのPytorchのGPUサポートはどのようにサポートされていますか Apr 14, 2025 pm 06:48 PM

Pytorch GPUアクセラレーションを有効にすることで、CentOSシステムでは、PytorchのCUDA、CUDNN、およびGPUバージョンのインストールが必要です。次の手順では、プロセスをガイドします。CUDAおよびCUDNNのインストールでは、CUDAバージョンの互換性が決定されます。NVIDIA-SMIコマンドを使用して、NVIDIAグラフィックスカードでサポートされているCUDAバージョンを表示します。たとえば、MX450グラフィックカードはCUDA11.1以上をサポートする場合があります。 cudatoolkitのダウンロードとインストール:nvidiacudatoolkitの公式Webサイトにアクセスし、グラフィックカードでサポートされている最高のCUDAバージョンに従って、対応するバージョンをダウンロードしてインストールします。 cudnnライブラリをインストールする:

Python vs. JavaScript:コミュニティ、ライブラリ、リソース Python vs. JavaScript:コミュニティ、ライブラリ、リソース Apr 15, 2025 am 12:16 AM

PythonとJavaScriptには、コミュニティ、ライブラリ、リソースの観点から、独自の利点と短所があります。 1)Pythonコミュニティはフレンドリーで初心者に適していますが、フロントエンドの開発リソースはJavaScriptほど豊富ではありません。 2)Pythonはデータサイエンスおよび機械学習ライブラリで強力ですが、JavaScriptはフロントエンド開発ライブラリとフレームワークで優れています。 3)どちらも豊富な学習リソースを持っていますが、Pythonは公式文書から始めるのに適していますが、JavaScriptはMDNWebDocsにより優れています。選択は、プロジェクトのニーズと個人的な関心に基づいている必要があります。

Dockerの原則の詳細な説明 Dockerの原則の詳細な説明 Apr 14, 2025 pm 11:57 PM

DockerはLinuxカーネル機能を使用して、効率的で孤立したアプリケーションランニング環境を提供します。その作業原則は次のとおりです。1。ミラーは、アプリケーションを実行するために必要なすべてを含む読み取り専用テンプレートとして使用されます。 2。ユニオンファイルシステム(UnionFS)は、違いを保存するだけで、スペースを節約し、高速化する複数のファイルシステムをスタックします。 3.デーモンはミラーとコンテナを管理し、クライアントはそれらをインタラクションに使用します。 4。名前空間とcgroupsは、コンテナの分離とリソースの制限を実装します。 5.複数のネットワークモードは、コンテナの相互接続をサポートします。これらのコア概念を理解することによってのみ、Dockerをよりよく利用できます。

ミニオペンCentosの互換性 ミニオペンCentosの互換性 Apr 14, 2025 pm 05:45 PM

MINIOオブジェクトストレージ:CENTOSシステムの下での高性能展開Minioは、Amazons3と互換性のあるGO言語に基づいて開発された高性能の分散オブジェクトストレージシステムです。 Java、Python、JavaScript、Goなど、さまざまなクライアント言語をサポートしています。この記事では、CentosシステムへのMinioのインストールと互換性を簡単に紹介します。 Centosバージョンの互換性Minioは、Centos7.9を含むがこれらに限定されない複数のCentosバージョンで検証されています。

CentosでPytorchの分散トレーニングを操作する方法 CentosでPytorchの分散トレーニングを操作する方法 Apr 14, 2025 pm 06:36 PM

Pytorchの分散トレーニングでは、Centosシステムでトレーニングには次の手順が必要です。Pytorchのインストール:PythonとPipがCentosシステムにインストールされていることです。 CUDAバージョンに応じて、Pytorchの公式Webサイトから適切なインストールコマンドを入手してください。 CPUのみのトレーニングには、次のコマンドを使用できます。PipinstalltorchtorchtorchvisionTorchaudioGPUサポートが必要な場合は、CUDAとCUDNNの対応するバージョンがインストールされ、インストールに対応するPytorchバージョンを使用してください。分散環境構成:分散トレーニングには、通常、複数のマシンまたは単一マシンの複数GPUが必要です。場所

CentosでPytorchバージョンを選択する方法 CentosでPytorchバージョンを選択する方法 Apr 14, 2025 pm 06:51 PM

PytorchをCentosシステムにインストールする場合、適切なバージョンを慎重に選択し、次の重要な要因を検討する必要があります。1。システム環境互換性:オペレーティングシステム:Centos7以上を使用することをお勧めします。 Cuda and Cudnn:PytorchバージョンとCudaバージョンは密接に関連しています。たとえば、pytorch1.9.0にはcuda11.1が必要ですが、pytorch2.0.1にはcuda11.3が必要です。 CUDNNバージョンは、CUDAバージョンとも一致する必要があります。 Pytorchバージョンを選択する前に、互換性のあるCUDAおよびCUDNNバージョンがインストールされていることを確認してください。 Pythonバージョン:Pytorch公式支店

Python:自動化、スクリプト、およびタスク管理 Python:自動化、スクリプト、およびタスク管理 Apr 16, 2025 am 12:14 AM

Pythonは、自動化、スクリプト、およびタスク管理に優れています。 1)自動化:OSやShutilなどの標準ライブラリを介してファイルバックアップが実現されます。 2)スクリプトの書き込み:Psutilライブラリを使用してシステムリソースを監視します。 3)タスク管理:スケジュールライブラリを使用してタスクをスケジュールします。 Pythonの使いやすさと豊富なライブラリサポートにより、これらの分野で優先ツールになります。

See all articles