Table of Contents
Actual business code for reading the Hive database
Code walkthrough and explanation

How do I read a Hive database with Python?

May 09, 2023, 4:28 PM

Actual business code for reading the Hive database

import datetime
import logging
import os
import pickle
import threading
import time
from typing import Dict, List

import pandas as pd
import sqlalchemy
from dateutil.relativedelta import relativedelta
from impala.dbapi import connect
from sqlalchemy.orm import sessionmaker

class HiveHelper(object):

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        logger:logging.Logger=None
        ):
        self.host = host
        self.port = port
        self.database = database
        self.auth_mechanism = auth_mechanism
        self.user = user
        self.password = password
        self.logger = logger
        # Connection URL consumed by sqlacodegen in create_table_code
        self.connection_str = f'impala://{user}:{password}@{host}:{port}/{database}'
        self.impala_conn = None
        self.conn = None
        self.cursor = None
        self.engine = None
        self.session = None

    def create_table_code(self, file_name):
        '''Generate table model classes with sqlacodegen'''
        os.system(f'sqlacodegen {self.connection_str} > {file_name}')
        return self.conn

    def get_conn(self):
        '''Create or reuse the SQLAlchemy connection'''
        if self.conn is None:
            engine = self.get_engine()
            self.conn = engine.connect()
        return self.conn

    def get_impala_conn(self):
        '''Create or reuse the raw Impala connection'''
        if self.impala_conn is None:
            self.impala_conn = connect(
                host=self.host,
                port=self.port,
                database=self.database,
                auth_mechanism=self.auth_mechanism,
                user=self.user,
                password=self.password
                )
        return self.impala_conn

    def get_engine(self):
        '''Create or reuse the SQLAlchemy engine'''
        if self.engine is None:
            self.engine = sqlalchemy.create_engine('impala://', creator=self.get_impala_conn)
        return self.engine

    def get_cursor(self):
        '''Create or reuse the cursor'''
        if self.cursor is None:
            self.cursor = self.get_conn().cursor()
        return self.cursor

    def get_session(self) -> sessionmaker:
        '''Create or reuse the session'''
        if self.session is None:
            engine = self.get_engine()
            Session = sessionmaker(bind=engine)
            self.session = Session()
        return self.session

    def close_conn(self):
        '''Close the connection'''
        if self.conn is not None:
            self.conn.close()
            self.conn = None
        self.dispose_engine()
        self.close_impala_conn()

    def close_impala_conn(self):
        '''Close the Impala connection'''
        if self.impala_conn is not None:
            self.impala_conn.close()
            self.impala_conn = None

    def close_session(self):
        '''Close the session'''
        if self.session is not None:
            self.session.close()
            self.session = None
        self.dispose_engine()

    def dispose_engine(self):
        '''Dispose of the engine'''
        if self.engine is not None:
            self.engine.dispose()
            self.engine = None

    def close_cursor(self):
        '''Close the cursor'''
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None

    def get_data(self, sql, auto_close=True) -> pd.DataFrame:
        '''Query data'''
        conn = self.get_conn()
        data = None
        try:
            # Retry up to 3 times on failure
            for i in range(3):
                try:
                    data = pd.read_sql(sql, conn)
                    break
                except Exception:
                    if i == 2:
                        raise # give up after the last attempt
                    time.sleep(60) # retry after one minute
        except Exception as ex:
            self.logger.exception(ex)
            raise # propagate to the caller
        finally:
            if auto_close:
                self.close_conn()
        return data
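`get_data` wraps `pd.read_sql` in a simple retry loop: three attempts, a pause between failures, and a re-raise after the final attempt. The pattern can be sketched on its own (the attempt count, delay, and the `flaky` helper are illustrative, not part of the original code):

```python
import time

def with_retry(fn, attempts=3, delay=0.0):
    """Call fn(), retrying on any exception; re-raise after the last attempt."""
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise          # give up after the final attempt
            time.sleep(delay)  # back off before retrying

calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient")
    return "ok"

print(with_retry(flaky))  # succeeds on the third attempt -> ok
```

In the original, `delay` is 60 seconds, which is reasonable for a warehouse query but far too long for a unit test.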

class VarsHelper():

    def __init__(self, save_dir, auto_save=True):
        self.save_dir = save_dir  # actually a file path; name kept from the original
        self.auto_save = auto_save
        self.values = {}
        if not os.path.exists(os.path.dirname(self.save_dir)):
            os.makedirs(os.path.dirname(self.save_dir))
        if os.path.exists(self.save_dir):
            with open(self.save_dir, 'rb') as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            self.save_file()

    def get_value(self, key):
        return self.values[key]

    def has_key(self, key):
        return key in self.values

    def save_file(self):
        with open(self.save_dir, 'wb') as f:
            pickle.dump(self.values, f)
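`VarsHelper` is a small pickle-backed key-value store: values survive process restarts because every `set_value` rewrites the pickle file, and the constructor reloads it. The round trip can be sketched with the standard library alone (the temp path and the `last_run` key are illustrative):

```python
import os
import pickle
import tempfile

path = os.path.join(tempfile.mkdtemp(), 'vars.pkl')

# Save: the same write VarsHelper.save_file performs
values = {'last_run': '20230509'}
with open(path, 'wb') as f:
    pickle.dump(values, f)

# Load: the same read VarsHelper.__init__ performs when the file exists
with open(path, 'rb') as f:
    restored = pickle.load(f)

print(restored['last_run'])  # -> 20230509
```

Note that pickle is Python-specific and not safe against untrusted input; it fits here because the file is written and read by the same trusted process.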

class GlobalShareArgs():
    args = {
        "debug": False
    }

    @staticmethod
    def get_args():
        return GlobalShareArgs.args

    @staticmethod
    def set_args(args):
        GlobalShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        GlobalShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        return GlobalShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        return key in GlobalShareArgs.args

    @staticmethod
    def update(args):
        GlobalShareArgs.args.update(args)

class ShareArgs():
    args = {
        "labels_dir": "./hjx/shop_group/month_w_amt/data/labels", # labels directory
        "labels_output_dir": "./hjx/shop_group/month_w_amt/data/labels_output", # clustered-label export directory
        "common_datas_dir": "./hjx/data", # shared data directory (common ur_bi_dw data)
        "only_predict": False, # predict only, no training
        "delete_model": True, # delete the model first; only used during training
        "export_excel": False, # export to Excel
        "classes": 12, # number of clusters
        "batch_size": 16,
        "hidden_size": 32,
        "max_nrof_epochs": 100,
        "learning_rate": 0.0005,
        "loss_type": "categorical_crossentropy",
        "avg_model_num": 10,
        "steps_per_epoch": 4.0,
        "lr_callback_patience": 4,
        "lr_callback_cooldown": 1,
        "early_stopping_callback_patience": 6,
        "get_data": True,
    }

    @staticmethod
    def get_args():
        return ShareArgs.args

    @staticmethod
    def set_args(args):
        ShareArgs.args = args

    @staticmethod
    def set_args_value(key, value):
        ShareArgs.args[key] = value

    @staticmethod
    def get_args_value(key, default_value=None):
        return ShareArgs.args.get(key, default_value)

    @staticmethod
    def contain_key(key):
        return key in ShareArgs.args

    @staticmethod
    def update(args):
        ShareArgs.args.update(args)
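`GlobalShareArgs` and `ShareArgs` are the same idea twice: process-wide configuration held in a class-level dict, so every module that imports the class sees one shared state without passing it around. A minimal equivalent (class and key names here are illustrative):

```python
class Config:
    # Class-level dict: shared by every caller in the process
    args = {"debug": False}

    @staticmethod
    def get(key, default=None):
        return Config.args.get(key, default)

    @staticmethod
    def set(key, value):
        Config.args[key] = value

Config.set("classes", 12)
print(Config.get("classes"))      # -> 12
print(Config.get("debug"))        # -> False
print(Config.get("missing", -1))  # -> -1 (default for an absent key)
```

The trade-off is the usual one for global state: convenient to read anywhere, but mutations are visible everywhere, which is why the original gates debug-only behaviour on a single well-known key.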

class UrBiGetDatasBase():
    # Lock table: calls that share a save path share a lock
    lock_dict:Dict[str, threading.Lock] = {}
    # Timestamps used to decide whether a cached query has expired
    time_dict:Dict[str, datetime.datetime] = {}
    # Records whether the timeout timestamp needs refreshing
    get_data_timeout_dict:Dict[str, bool] = {}

    def __init__(
        self,
        host='10.2.32.22',
        port=21051,
        database='ur_ai_dw',
        auth_mechanism='LDAP',
        user='urbi',
        password='Ur#730xd',
        save_dir=None,
        logger:logging.Logger=None,
        ):
        self.save_dir = save_dir
        self.logger = logger
        self.db_helper = HiveHelper(
            host=host,
            port=port,
            database=database,
            auth_mechanism=auth_mechanism,
            user=user,
            password=password,
            logger=logger
            )
        # Create the save directory if needed
        if self.save_dir is not None and not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
        self.vars_helper = None
        if GlobalShareArgs.get_args_value('debug'):
            self.vars_helper = VarsHelper('./hjx/data/vars/UrBiGetDatas')

    def close(self):
        '''Close the connection'''
        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:
        '''Check whether the cached query has timed out'''
        # Convert to an absolute path to ensure uniqueness
        key_name = os.path.abspath(key_name)
        if self.vars_helper is not None and self.vars_helper.has_key('UrBiGetDatasBase.time_list'):
            UrBiGetDatasBase.time_dict = self.vars_helper.get_value('UrBiGetDatasBase.time_list')
        timeout = 12 # hours
        if GlobalShareArgs.get_args_value('debug'):
            timeout = 24 # hours
        get_data_timeout = False
        if key_name not in UrBiGetDatasBase.time_dict or (datetime.datetime.today() - UrBiGetDatasBase.time_dict[key_name]).total_seconds() > (timeout*60*60):
            self.logger.info('Timed out after %d hours, re-querying: %s', timeout, key_name)
            get_data_timeout = True
        else:
            self.logger.info('Not yet timed out (%d hours), skipping query: %s', timeout, key_name)
        UrBiGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout
        return get_data_timeout

    def save_last_time(self, key_name):
        '''Refresh the timeout timestamp'''
        # Convert to an absolute path to ensure uniqueness
        key_name = os.path.abspath(key_name)
        if UrBiGetDatasBase.get_data_timeout_dict[key_name]:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
        if self.vars_helper is not None:
            UrBiGetDatasBase.time_dict[key_name] = datetime.datetime.today()
            self.vars_helper.set_value('UrBiGetDatasBase.time_list', UrBiGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:
        '''Get the per-path lock'''
        # Convert to an absolute path to ensure uniqueness
        key_name = os.path.abspath(key_name)
        if key_name not in UrBiGetDatasBase.lock_dict:
            UrBiGetDatasBase.lock_dict[key_name] = threading.Lock()
        return UrBiGetDatasBase.lock_dict[key_name]

    def get_data_of_date(
        self,
        save_dir,
        sql,
        sort_columns:List[str],
        del_index_list=[-1], # indexes of the trailing files to delete
        start_date=datetime.datetime(2017, 1, 1), # start date
        offset=relativedelta(months=3), # window size
        date_format_fun=lambda d: '%04d%02d01' % (d.year, d.month), # format of the date substituted into the SQL
        filename_format_fun=lambda d: '%04d%02d.csv' % (d.year, d.month), # format of the per-window file name
        stop_date='20700101', # stop past this date
        data_format_fun=None, # optional post-processing of the data
        ):
        '''Read data incrementally, one time window at a time'''
        # Create the folder
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        else:
            # Delete the trailing (possibly incomplete) files
            file_list = os.listdir(save_dir)
            if len(file_list) > 0:
                file_list.sort()
                for del_index in del_index_list:
                    os.remove(os.path.join(save_dir, file_list[del_index]))
                    print('Deleted trailing file:', file_list[del_index])
        select_index = -1
        while True:
            end_date = start_date + offset
            start_date_str = date_format_fun(start_date)
            end_date_str = date_format_fun(end_date)
            self.logger.info('date: %s-%s', start_date_str, end_date_str)
            file_path = os.path.join(save_dir, filename_format_fun(start_date))
            if not os.path.exists(file_path):
                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))
                if data is None:
                    break
                self.logger.info('data: %d', len(data))
                if len(data) > 0:
                    select_index += 1
                    if data_format_fun is not None:
                        data = data_format_fun(data)
                    # Sort before writing
                    data = data.sort_values(sort_columns)
                    data.to_csv(file_path)
                elif select_index != -1:
                    break
                elif stop_date < start_date_str:
                    raise Exception('Data read error: date exceeded the maximum!')
            start_date = end_date
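`get_data_of_date` walks forward from `start_date` in fixed-size offsets, formatting each window's endpoints into the SQL (`%s` placeholders) and into a per-window CSV filename. The windowing itself can be sketched with the standard library only (the original uses `dateutil.relativedelta`; here month arithmetic is done by hand, and `month_windows` is an illustrative helper, not part of the original):

```python
import datetime

def month_windows(start, months, count):
    """Yield (start_str, end_str) pairs of fixed-size month windows,
    formatted like get_data_of_date's default date_format_fun ('%Y%m01')."""
    fmt = lambda d: '%04d%02d01' % (d.year, d.month)
    for _ in range(count):
        # stdlib-only month arithmetic; relativedelta(months=months) in the original
        m = start.month - 1 + months
        end = datetime.datetime(start.year + m // 12, m % 12 + 1, 1)
        yield fmt(start), fmt(end)
        start = end  # each window starts where the previous one ended

for w in month_windows(datetime.datetime(2017, 1, 1), 3, 3):
    print(w)
# -> ('20170101', '20170401'), ('20170401', '20170701'), ('20170701', '20171001')
```

Each pair would be substituted into a query such as `where date_key >= %s and date_key < %s`, so windows are half-open and never overlap.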

class UrBiGetDatas(UrBiGetDatasBase):

    def __init__(

        self,

        host='10.2.32.22',

        port=21051,

        database='ur_ai_dw',

        auth_mechanism='LDAP',

        user='urbi',

        password='Ur#730xd',

        save_dir='./hjx/data/ur_bi_dw_data',

        logger:logging.Logger=None

        ):

        self.save_dir = save_dir

        self.logger = logger

        super().__init__(

            host=host,

            port=port,

            database=database,

            auth_mechanism=auth_mechanism,

            user=user,

            password=password,

            save_dir=save_dir,

            logger=logger

            )

    def get_dim_date(self):

        '''日期数据'''

        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_date.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(file_path):

                return

            sql = 'SELECT * FROM ur_bi_dw.dim_date'

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:'dim_date.'+c for c in columns}

            data = data.rename(columns=columns)

            data = data.sort_values(['dim_date.date_key'])

            data.to_csv(file_path)

            # 更新超时时间

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_dim_shop(self):

        '''店铺数据'''

        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_shop.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(file_path):

                return

            sql = 'SELECT * FROM ur_bi_dw.dim_shop'

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:'dim_shop.'+c for c in columns}

            data = data.rename(columns=columns)

            data = data.sort_values(['dim_shop.shop_no'])

            data.to_csv(file_path)

            # 更新超时时间

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_dim_vip(self):

        '''会员数据'''

        sub_dir = os.path.join(self.save_dir,'vip_no')

        now_lock = self.get_lock(sub_dir)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(sub_dir):

                return

            sql = '''SELECT dv.*, dd.date_key, dd.date_name2

            FROM ur_bi_dw.dim_vip as dv

            INNER JOIN ur_bi_dw.dim_date as dd

            ON dv.card_create_date=dd.date_name2

            where dd.date_key >= %s

            and dd.date_key < %s'''

            # data:pd.DataFrame = self.db_helper.get_data(sql)

            sort_columns = ['dv.vip_no']

            # TODO:

            self.get_data_of_date(

                save_dir=sub_dir,

                sql=sql,

                sort_columns=sort_columns,

                start_date=datetime.datetime(2017, 1, 1), # 开始时间

                offset=relativedelta(years=1)

            )

            # 更新超时时间

            self.save_last_time(sub_dir)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_weather(self):

        '''天气数据'''

        sub_dir = os.path.join(self.save_dir,'weather')

        now_lock = self.get_lock(sub_dir)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(sub_dir):

                return

            sql = """

            select weather.* from ur_bi_ods.ods_base_weather_data_1200 as weather

            where weather.date_key>=%s and weather.date_key<%s

            """

            sort_columns = ['weather.date_key','weather.areaid']

            def data_format_fun(data):

                columns = list(data.columns)

                columns = {c:'weather.'+c for c in columns}

                data = data.rename(columns=columns)

                return data

            self.get_data_of_date(

                save_dir=sub_dir,

                sql=sql,

                sort_columns=sort_columns,

                del_index_list=[-2, -1], # 删除最后下标

                data_format_fun=data_format_fun,

            )

            # 更新超时时间

            self.save_last_time(sub_dir)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_weather_city(self):

        '''天气城市数据'''

        file_path = os.path.join(self.save_dir,'ur_bi_dw.weather_city.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(file_path):

                return

            sql = 'SELECT * FROM ur_bi_dw.dim_weather_city as weather_city'

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:'weather_city.'+c for c in columns}

            data = data.rename(columns=columns)

            data.to_csv(file_path)

            # 更新超时时间

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_dim_goods(self):

        '''货品数据'''

        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(file_path):

                return

            sql = 'SELECT * FROM ur_bi_dw.dim_goods'

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:'dim_goods.'+c for c in columns}

            data = data.rename(columns=columns)

            data.to_csv(file_path)

            # 更新超时时间

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_dim_goods_market_shop_date(self):

        '''店铺商品生命周期数据'''

        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_shop_date.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(file_path):

                return

            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'

            sql = '''

            select shop_no, sku_no, shop_market_date, lifecycle_end_date, lifecycle_days

            FROM ur_bi_dw.dim_goods_market_shop_date

            where lifecycle_end_date is not null

            '''

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:c.replace('lifecycle_end_date.','') for c in columns}

            data = data.rename(columns=columns)

            data = data.sort_values(['shop_market_date'])

            data.to_csv(file_path, index=False)

            # 更新超时时间

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_dim_goods_market_date(self):

        '''全国商品生命周期数据'''

        file_path = os.path.join(self.save_dir,'ur_bi_dw.dim_goods_market_date.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(file_path):

                return

            sql = '''

            select * FROM ur_bi_dw.dim_goods_market_date

            '''

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:'dim_goods_market_date.'+c for c in columns}

            data = data.rename(columns=columns)

            data = data.sort_values(['dim_goods_market_date.sku_no'])

            data.to_csv(file_path, index=False)

            # 更新超时时间

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_dim_goods_color_dev_sizes(self):

        '''商品开发码数数据'''

        file_path = os.path.join(self.save_dir,'dim_goods_color_dev_sizes.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(file_path):

                return

            # sql = 'SELECT * FROM ur_bi_dw.dim_goods_market_shop_date as goods_shop_date'

            sql = 'SELECT * FROM ur_bi_dm.dim_goods_color_dev_sizes'

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:c.replace('dim_goods_color_dev_sizes.','') for c in columns}

            data = data.rename(columns=columns)

            data.to_csv(file_path, index=False)

            # 更新超时时间

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_dwd_daily_sales_size(self):

        '''实际销售金额'''

        sub_dir = os.path.join(self.save_dir,'dwd_daily_sales_size_all')

        now_lock = self.get_lock(sub_dir)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(sub_dir):

                return

            sql = """

            select shop_no,sku_no,date_key,`size`,

                sum(tag_price) as `tag_price`,

                sum(sales_qty) as `sales_qty`,

                sum(sales_tag_amt) as `sales_tag_amt`,

                sum(sales_amt) as `sales_amt`,

                count(0) as `sales_count`

            from ur_bi_dw.dwd_daily_sales_size as sales

            where sales.date_key>=%s and sales.date_key<%s

                and sales.currency_code='CNY'

            group by shop_no,sku_no,date_key,`size`

            """

            sort_columns = ['date_key','shop_no','sku_no']

            self.get_data_of_date(

                save_dir=sub_dir,

                sql=sql,

                sort_columns=sort_columns,

                start_date=datetime.datetime(2017, 1, 1), # 开始时间

            )

            # 更新超时时间

            self.save_last_time(sub_dir)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_dwd_daily_delivery_size(self):

        '''实际配货金额'''

        sub_dir = os.path.join(self.save_dir,'dwd_daily_delivery_size_all')

        now_lock = self.get_lock(sub_dir)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(sub_dir):

                return

            sql = """

            select shop_no,sku_no,date_key,`size`,

                sum(delivery.shop_distr_received_qty) as `shop_distr_received_qty`,

                sum(delivery.shop_distr_received_amt) as `shop_distr_received_amt`,

                sum(delivery.online_distr_received_qty) as `online_distr_received_qty`,

                sum(delivery.online_distr_received_amt) as `online_distr_received_amt`,

                sum(delivery.pr_received_qty) as `pr_received_qty`,

                count(0) as `delivery_count`

            from ur_bi_dw.dwd_daily_delivery_size as delivery

            where delivery.date_key>=%s and delivery.date_key<%s

                and delivery.currency_code='CNY'

            group by shop_no,sku_no,date_key,`size`

            """

            sort_columns = ['date_key','shop_no','sku_no']

            self.get_data_of_date(

                save_dir=sub_dir,

                sql=sql,

                sort_columns=sort_columns,

                start_date=datetime.datetime(2017, 1, 1), # 开始时间

            )

            # 更新超时时间

            self.save_last_time(sub_dir)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_v_last_nation_sales_status(self):

        '''商品畅滞销数据'''

        file_path = os.path.join(self.save_dir,'v_last_nation_sales_status.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # 加锁

        try:

            # 设置超时4小时才重新查数据

            if not self.get_last_time(file_path):

                return

            sql = 'SELECT * FROM ur_bi_dw.v_last_nation_sales_status'

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:c.replace('v_last_nation_sales_status.','') for c in columns}

            data = data.rename(columns=columns)

            data.to_csv(file_path, index=False)

            # 更新超时时间

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # 往外抛出异常

        finally:

            now_lock.release() # 释放锁

    def get_dwd_daily_finacial_goods(self):

        '''Product cost-price data'''

        file_path = os.path.join(self.save_dir,'dwd_daily_finacial_goods.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # acquire the lock

        try:

            # only re-query the data after the 4-hour cache timeout

            if not self.get_last_time(file_path):

                return

            sql = """

            select t1.sku_no,t1.`size`,t1.cost_tax_incl from ur_bi_dw.dwd_daily_finacial_goods as t1

            inner join (

                select sku_no,`size`,max(date_key) as date_key

                from ur_bi_dw.dwd_daily_finacial_goods

                where currency_code='CNY' and country_code='CN'

                group by sku_no,`size`

            ) as t2

            on t2.sku_no=t1.sku_no

                and t2.`size`=t1.`size`

                and t2.date_key=t1.date_key

            where t1.currency_code='CNY' and t1.country_code='CN'

            """

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:c.replace('t1.','') for c in columns}

            data = data.rename(columns=columns)

            data.to_csv(file_path, index=False)

            # refresh the cache timestamp

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # re-raise for the caller

        finally:

            now_lock.release() # release the lock

    def get_dim_size_group(self):

        '''Size-mapping data'''

        file_path = os.path.join(self.save_dir,'dim_size_group.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # acquire the lock

        try:

            # only re-query the data after the 4-hour cache timeout

            if not self.get_last_time(file_path):

                return

            sql = """select * from ur_bi_dw.dim_size_group"""

            data:pd.DataFrame = self.db_helper.get_data(sql)

            columns = list(data.columns)

            columns = {c:c.replace('dim_size_group.','') for c in columns}

            data = data.rename(columns=columns)

            data.to_csv(file_path, index=False)

            # refresh the cache timestamp

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # re-raise for the caller

        finally:

            now_lock.release() # release the lock

pass

def get_common_datas(

    host='10.2.32.22',

    port=21051,

    database='ur_ai_dw',

    auth_mechanism='LDAP',

    user='urbi',

    password='Ur#730xd',

    logger:logging.Logger=None):

    # shared data files

    common_datas_dir = ShareArgs.get_args_value('common_datas_dir')

    common_ur_bi_dir = os.path.join(common_datas_dir, 'ur_bi_data')

    ur_bi_get_datas = UrBiGetDatas(

        host=host,

        port=port,

        database=database,

        auth_mechanism=auth_mechanism,

        user=user,

        password=password,

        save_dir=common_ur_bi_dir,

        logger=logger

    )

    try:

        logger.info('Querying date data...')

        ur_bi_get_datas.get_dim_date()

        logger.info('Date data query finished!')

        logger.info('Querying shop data...')

        ur_bi_get_datas.get_dim_shop()

        logger.info('Shop data query finished!')

        logger.info('Querying weather data...')

        ur_bi_get_datas.get_weather()

        logger.info('Weather data query finished!')

        logger.info('Querying weather-city data...')

        ur_bi_get_datas.get_weather_city()

        logger.info('Weather-city data query finished!')

        logger.info('Querying goods data...')

        ur_bi_get_datas.get_dim_goods()

        logger.info('Goods data query finished!')

        logger.info('Querying actual sales data...')

        ur_bi_get_datas.get_dwd_daily_sales_size()

        logger.info('Actual sales data query finished!')

    except Exception as ex:

        logger.exception(ex)

        raise ex # re-raise for the caller

    finally:

        ur_bi_get_datas.close()

pass

class CustomUrBiGetDatas(UrBiGetDatasBase):

    def __init__(

        self,

        host='10.2.32.22',

        port=21051,

        database='ur_ai_dw',

        auth_mechanism='LDAP',

        user='urbi',

        password='Ur#730xd',

        save_dir='./hjx/data/ur_bi_data',

        logger:logging.Logger=None

        ):

        self.save_dir = save_dir

        self.logger = logger

        super().__init__(

            host=host,

            port=port,

            database=database,

            auth_mechanism=auth_mechanism,

            user=user,

            password=password,

            save_dir=save_dir,

            logger=logger

            )

    def get_sales_goal_amt(self):

        '''Sales target amounts'''

        file_path = os.path.join(self.save_dir,'month_of_year_sales_goal_amt.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # acquire the lock

        try:

            # only re-query the data after the 4-hour cache timeout

            if not self.get_last_time(file_path):

                return

            sql = '''

            select sales_goal.shop_no,

                if(sales_goal.serial='Y','W',sales_goal.serial) as `sales_goal.serial`,

                dates.month_of_year,

                sum(sales_goal.sales_goal_amt) as sales_goal_amt

            from ur_bi_dw.dwd_sales_goal_west as sales_goal

            inner join ur_bi_dw.dim_date as dates

                on sales_goal.date_key = dates.date_key

            group by sales_goal.shop_no,

                if(sales_goal.serial='Y','W',sales_goal.serial),

                dates.month_of_year

            '''

            data:pd.DataFrame = self.db_helper.get_data(sql)

            data = data.rename(columns={

                'shop_no':'sales_goal.shop_no',

                'serial':'sales_goal.serial',

                'month_of_year':'dates.month_of_year',

            })

            # sort

            data = data.sort_values(['sales_goal.shop_no','sales_goal.serial','dates.month_of_year'])

            data.to_csv(file_path)

            # refresh the cache timestamp

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # re-raise for the caller

        finally:

            now_lock.release() # release the lock

    def get_shop_serial_area(self):

        '''Shop-series area'''

        file_path = os.path.join(self.save_dir,'shop_serial_area.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # acquire the lock

        try:

            # only re-query the data after the 4-hour cache timeout

            if not self.get_last_time(file_path):

                return

            sql = '''

            select shop_serial_area.shop_no,

                if(shop_serial_area.serial='Y','W',shop_serial_area.serial) as `shop_serial_area.serial`,

                shop_serial_area.month_of_year,

                sum(shop_serial_area.area) as `shop_serial_area.area`

            from ur_bi_dw.dwd_shop_serial_area as shop_serial_area

            where shop_serial_area.area is not null

            group by shop_serial_area.shop_no,if(shop_serial_area.serial='Y','W',shop_serial_area.serial),shop_serial_area.month_of_year

            '''

            data:pd.DataFrame = self.db_helper.get_data(sql)

            data = data.rename(columns={

                'shop_no':'shop_serial_area.shop_no',

                'serial':'shop_serial_area.serial',

                'month_of_year':'shop_serial_area.month_of_year',

                'area':'shop_serial_area.area',

            })

            # sort

            data = data.sort_values(['shop_serial_area.shop_no','shop_serial_area.serial','shop_serial_area.month_of_year'])

            data.to_csv(file_path)

            # refresh the cache timestamp

            self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # re-raise for the caller

        finally:

            now_lock.release() # release the lock

pass

def get_datas(

    host='10.2.32.22',

    port=21051,

    database='ur_ai_dw',

    auth_mechanism='LDAP',

    user='urbi',

    password='Ur#730xd',

    save_dir='./data/sales_forecast/ur_bi_dw_data',

    logger:logging.Logger=None):

    ur_bi_get_datas = CustomUrBiGetDatas(

        host=host,

        port=port,

        database=database,

        auth_mechanism=auth_mechanism,

        user=user,

        password=password,

        save_dir=save_dir,

        logger=logger

    )

    try:

        # shop, series, category, year-month, sales target amount

        logger.info('Querying monthly sales target amounts...')

        ur_bi_get_datas.get_sales_goal_amt()

        logger.info('Monthly sales target amount query finished!')

    except Exception as ex:

        logger.exception(ex)

        raise ex # re-raise for the caller

    finally:

        ur_bi_get_datas.close()

pass

def getdata_ur_bi_dw(

    host='10.2.32.22',

    port=21051,

    database='ur_ai_dw',

    auth_mechanism='LDAP',

    user='urbi',

    password='Ur#730xd',

    save_dir='./data/sales_forecast/ur_bi_dw_data',

    logger=None

):

    get_common_datas(

        host=host,

        port=port,

        database=database,

        auth_mechanism=auth_mechanism,

        user=user,

        password=password,

        logger=logger

    )

    get_datas(

        host=host,

        port=port,

        database=database,

        auth_mechanism=auth_mechanism,

        user=user,

        password=password,

        save_dir=save_dir,

        logger=logger

    )

pass

# entry point

# getdata_ur_bi_dw(

#     host=ur_bi_dw_host,

#     port=ur_bi_dw_port,

#     database=ur_bi_dw_database,

#     auth_mechanism=ur_bi_dw_auth_mechanism,

#     user=ur_bi_dw_user,

#     password=ur_bi_dw_password,

#     save_dir=ur_bi_dw_save_dir,

#     logger=logger

#     )


Code description and understanding

The responsibilities of each class are described layer by layer below; read the code together with this description:

(First layer) HiveHelper implements connecting to the database, closing connections, and creating sessions, transactions, executions, engines, connections, and so on.
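The core of this first layer can be sketched independently of any one backend: open a DB-API connection, read a DataFrame, always close. This is a simplified sketch, not the article's full HiveHelper; the `connect_fn` parameter is an illustrative addition that lets any DB-API driver be injected:

```python
import pandas as pd

def read_db(sql, connect_fn):
    """Minimal stand-in for HiveHelper.get_data: open a fresh DB-API
    connection via connect_fn, fetch a DataFrame, and always close.
    For Hive, connect_fn could wrap impala.dbapi.connect(
        host=..., port=21051, auth_mechanism='LDAP', ...)."""
    conn = connect_fn()
    try:
        return pd.read_sql(sql, conn)
    finally:
        conn.close()
```

Because the backend is injected, the same helper shape works against impyla, pymysql, or sqlite3 alike, which is exactly why the upper layers of the article's design survive a backend swap unchanged.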

VarsHelper provides a simple persistence mechanism: objects are saved to disk as files, with methods to set a value, get a value, and check whether a value exists. GlobalShareArgs wraps a dictionary and offers methods to get and set the value of a key, check whether a key is in the dictionary, update the dictionary, and so on.
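The persistence idea behind VarsHelper can be sketched in a few lines. The class name SimpleVars and the reduced interface are illustrative; the full VarsHelper implementation appears in the MySQL code attached below:

```python
import os
import pickle

class SimpleVars:
    """Pickle-backed key-value store, persisted to a single file."""
    def __init__(self, save_path, auto_save=True):
        self.save_path = save_path
        self.auto_save = auto_save
        self.values = {}
        # reload previously persisted values, if any
        if os.path.exists(save_path):
            with open(save_path, 'rb') as f:
                self.values = pickle.load(f)

    def set_value(self, key, value):
        self.values[key] = value
        if self.auto_save:
            with open(self.save_path, 'wb') as f:
                pickle.dump(self.values, f)

    def get_value(self, key):
        return self.values[key]

    def has_key(self, key):
        return key in self.values
```

A fresh instance pointed at the same file sees the values written by an earlier one, which is how the article's code keeps its cache timestamps across process restarts during debugging.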

ShareArgs is similar to GlobalShareArgs, except that its dictionary is initialized with more key-value pairs.

(Second layer) The UrBiGetDatasBase class provides a thread-lock dictionary, a timestamp dictionary, and a timeout-flag dictionary, all of them class variables (shared state, not per-instance). For each concrete SQL read it supplies the per-file locking and the timeout check.
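A stripped-down sketch of that locking-plus-timeout layer (the class name CacheGate is hypothetical; the real base class additionally persists its timestamps via VarsHelper in debug mode):

```python
import datetime
import threading
from typing import Dict

class CacheGate:
    """Per-key thread locks plus a timestamp check, mirroring the
    lock/timeout pattern of the UrBiGetDatasBase layer."""
    lock_dict: Dict[str, threading.Lock] = {}
    time_dict: Dict[str, datetime.datetime] = {}

    @classmethod
    def get_lock(cls, key: str) -> threading.Lock:
        # one lock per key, so readers of the same file serialize
        if key not in cls.lock_dict:
            cls.lock_dict[key] = threading.Lock()
        return cls.lock_dict[key]

    @classmethod
    def timed_out(cls, key: str, hours: float = 4) -> bool:
        # True means the cached file is stale and must be re-queried
        last = cls.time_dict.get(key)
        if last is None:
            return True
        return (datetime.datetime.today() - last).total_seconds() > hours * 3600

    @classmethod
    def touch(cls, key: str):
        # record a successful refresh
        cls.time_dict[key] = datetime.datetime.today()
```

Each data-fetching method then follows the same shape: acquire the key's lock, return early if not timed out, otherwise query, write the CSV, and touch the timestamp.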

(Third layer) The UrBiGetDatas class fetches, from the Hive database: date data, shop data, member data, weather data, weather-city data, goods data, shop life-cycle data, national goods life-cycle data, goods development-code data, actual sales quantities, actual distribution quantities, best-seller data, goods cost-price data, size-mapping data, and so on.
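One detail worth noting in this layer: the Hive driver returns column names qualified with the table name (e.g. `dim_size_group.size`), so each method strips that prefix before caching the CSV. The recurring pattern, extracted here into a hypothetical helper:

```python
import pandas as pd

def strip_prefix(data: pd.DataFrame, prefix: str) -> pd.DataFrame:
    """Remove a 'table.' qualifier from every column name, as each
    UrBiGetDatas method does before writing its CSV cache."""
    return data.rename(columns={c: c.replace(prefix, '') for c in data.columns})
```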

(Fourth layer) The get_common_datas function uses the UrBiGetDatas class to read the date, shop, weather, weather-city, goods, and actual-sales data and caches it under the shared data directory (the common_datas_dir from ShareArgs, plus ur_bi_data).

The CustomUrBiGetDatas class inherits from UrBiGetDatasBase and reads the sales-target amounts and the shop-series area data.

(Also fourth layer) The get_datas function reads the monthly sales-target amounts through the CustomUrBiGetDatas class.

Overall entry point: the getdata_ur_bi_dw function calls get_common_datas and get_datas to read the data and then save it under the given directory.

By analogy, if you do not have a Hive database, you can swap the first layer for MySQL. The second layer needs no changes at all. The third layer is just the tables you want to read, so the only thing to rewrite there is the SQL.
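Concretely, the only first-layer change is how the connection is produced: the Hive helper calls impala.dbapi.connect, while the MySQL helper attached below builds a SQLAlchemy URL instead (pymysql driver assumed). The URL construction, pulled out as a small function for illustration:

```python
def mysql_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Connection string for the MySQL variant of the first layer
    (matches the connection_str built in MySqlHelper.__init__ below)."""
    return 'mysql+pymysql://%s:%s@%s:%d/%s' % (user, password, host, port, database)
```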

The advantage of this approach is that data is never read twice unnecessarily, and the cached data can be reused efficiently.

Attached is the sample code adapted for MySQL:


import logging

import pandas as pd

from impala.dbapi import connect

import sqlalchemy

from sqlalchemy.orm import sessionmaker

import os

import time

import os

import datetime

from dateutil.relativedelta import relativedelta

from typing import Dict, List

import logging

import threading

import pandas as pd

import pickle

class MySqlHelper(object):

    def __init__(

        self,

        host='192.168.15.144',

        port=3306,

        database='test_ims',

        user='spkjz_writer',

        password='7cmoP3QDtueVJQj2q4Az',

        logger:logging.Logger=None

        ):

        self.host = host

        self.port = port

        self.database = database

        self.user = user

        self.password = password

        self.logger = logger

        self.connection_str = 'mysql+pymysql://%s:%s@%s:%d/%s' %(

            self.user, self.password, self.host, self.port, self.database

        )

        self.conn = None

        self.cursor = None

        self.engine = None

        self.session = None

    def create_table_code(self, file_name):

        '''Generate table model classes with sqlacodegen'''

        os.system(f'sqlacodegen {self.connection_str} > {file_name}')

        return self.conn

    def get_conn(self):

        '''Create or reuse the connection'''

        if self.conn is None:

            engine = self.get_engine()

            self.conn = engine.connect()

        return self.conn

    def get_engine(self):

        '''Create or reuse the engine'''

        if self.engine is None:

            self.engine = sqlalchemy.create_engine(self.connection_str)

        return self.engine

    def get_cursor(self):

        '''Create or reuse the cursor'''

        if self.cursor is None:

            self.cursor = self.get_conn().cursor()

        return self.cursor

    def get_session(self) -> sessionmaker:

        '''Create or reuse the session'''

        if self.session is None:

            engine = self.get_engine()

            Session = sessionmaker(bind=engine)

            self.session = Session()

        return self.session

    def close_conn(self):

        '''Close the connection'''

        if self.conn is not None:

            self.conn.close()

            self.conn = None

        self.dispose_engine()

    def close_session(self):

        '''Close the session'''

        if self.session is not None:

            self.session.close()

            self.session = None

        self.dispose_engine()

    def dispose_engine(self):

        '''Dispose of the engine'''

        if self.engine is not None:

            # self.engine.dispose(close=False)

            self.engine.dispose()

            self.engine = None

    def close_cursor(self):

        '''Close the cursor'''

        if self.cursor is not None:

            self.cursor.close()

            self.cursor = None

    def get_data(self, sql, auto_close=True) -> pd.DataFrame:

        '''Query data'''

        conn = self.get_conn()

        data = None

        try:

            # retry up to 3 times on failure

            for i in range(3):

                try:

                    data = pd.read_sql(sql, conn)

                    break

                except Exception as ex:

                    if i == 2:

                        raise ex # re-raise for the caller

                    time.sleep(60) # retry after one minute

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # re-raise for the caller

        finally:

            if auto_close:

                self.close_conn()

        return data

pass

class VarsHelper():

    def __init__(self, save_dir, auto_save=True):

        self.save_dir = save_dir

        self.auto_save = auto_save

        self.values = {}

        if not os.path.exists(os.path.dirname(self.save_dir)):

            os.makedirs(os.path.dirname(self.save_dir))

        if os.path.exists(self.save_dir):

            with open(self.save_dir, 'rb') as f:

                self.values = pickle.load(f)

                f.close()

    def set_value(self, key, value):

        self.values[key] = value

        if self.auto_save:

            self.save_file()

    def get_value(self, key):

        return self.values[key]

    def has_key(self, key):

        return key in self.values.keys()

    def save_file(self):

        with open(self.save_dir, 'wb') as f:

            pickle.dump(self.values, f)

            f.close()

pass

class GlobalShareArgs():

    args = {

        "debug": False

    }

    def get_args():

        return GlobalShareArgs.args

    def set_args(args):

        GlobalShareArgs.args = args

    def set_args_value(key, value):

        GlobalShareArgs.args[key] = value

    def get_args_value(key, default_value=None):

        return GlobalShareArgs.args.get(key, default_value)

    def contain_key(key):

        return key in GlobalShareArgs.args.keys()

    def update(args):

        GlobalShareArgs.args.update(args)

pass

class ShareArgs():

    args = {

        "labels_dir":"./hjx/shop_group/month_w_amt/data/labels", # label directory

        "labels_output_dir":"./hjx/shop_group/month_w_amt/data/labels_output", # clustering label export directory

        "common_datas_dir":"./hjx/data", # shared data directory, common to ur_bi_dw

        "only_predict": False, # predict only, no training

        "delete_model": True, # delete the model first; only used during training

        "export_excel": False, # export to Excel

        "classes": 12, # number of clusters

        "batch_size": 16,

        "hidden_size": 32,

        "max_nrof_epochs": 100,

        "learning_rate": 0.0005,

        "loss_type": "categorical_crossentropy",

        "avg_model_num": 10,

        "steps_per_epoch": 4.0, # 4.0

        "lr_callback_patience": 4,

        "lr_callback_cooldown": 1,

        "early_stopping_callback_patience": 6,

        "get_data": True,

    }

    def get_args():

        return ShareArgs.args

    def set_args(args):

        ShareArgs.args = args

    def set_args_value(key, value):

        ShareArgs.args[key] = value

    def get_args_value(key, default_value=None):

        return ShareArgs.args.get(key, default_value)

    def contain_key(key):

        return key in ShareArgs.args.keys()

    def update(args):

        ShareArgs.args.update(args)

pass

class IMSGetDatasBase():

    # thread-lock dict; readers sharing a save path share one lock

    lock_dict:Dict[str, threading.Lock] = {}

    # timestamp dict, used for the timeout check

    time_dict:Dict[str, datetime.datetime] = {}

    # records whether the timeout timestamp needs updating

    get_data_timeout_dict:Dict[str, bool] = {}

    def __init__(

        self,

        host='192.168.15.144',

        port=3306,

        database='test_ims',

        user='spkjz_writer',

        password='Ur#7cmoP3QDtueVJQj2q4Az',

        save_dir=None,

        logger:logging.Logger=None,

        ):

        self.save_dir = save_dir

        self.logger = logger

        self.db_helper = MySqlHelper(

            host=host,

            port=port,

            database=database,

            user=user,

            password=password,

            logger=logger

            )

        # create the subdirectory

        if self.save_dir is not None and not os.path.exists(self.save_dir):

            os.makedirs(self.save_dir)

        self.vars_helper = None

        if GlobalShareArgs.get_args_value('debug'):

            self.vars_helper = VarsHelper('./hjx/data/vars/IMSGetDatas') # persist the timeouts to a file; comment this line out to disable (debugging only)

    def close(self):

        '''Close the connection'''

        self.db_helper.close_conn()

    def get_last_time(self, key_name) -> bool:

        '''Check whether the cache has timed out'''

        # convert to an absolute path to guarantee uniqueness

        key_name = os.path.abspath(key_name)

        if self.vars_helper is not None and self.vars_helper.has_key('IMSGetDatasBase.time_list'):

            IMSGetDatasBase.time_dict = self.vars_helper.get_value('IMSGetDatasBase.time_list')

        timeout = 12 # 12 hours

        if GlobalShareArgs.get_args_value('debug'):

            timeout = 24 # 24 hours

        get_data_timeout = False

        if key_name not in IMSGetDatasBase.time_dict.keys() or (datetime.datetime.today() - IMSGetDatasBase.time_dict[key_name]).total_seconds()>(timeout*60*60):

            self.logger.info('Timed out after %d hours, re-querying: %s', timeout, key_name)

            # IMSGetDatasBase.time_list[key_name] = datetime.datetime.today()

            get_data_timeout = True

        else:

            self.logger.info('Not yet timed out (%d hours), skipping query: %s', timeout, key_name)

        # if self.vars_helper is not None :

        #     self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_list)

        IMSGetDatasBase.get_data_timeout_dict[key_name] = get_data_timeout

        return get_data_timeout

    def save_last_time(self, key_name):

        '''Refresh the timeout timestamp'''

        # convert to an absolute path to guarantee uniqueness

        key_name = os.path.abspath(key_name)

        if IMSGetDatasBase.get_data_timeout_dict[key_name]:

            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()

        if self.vars_helper is not None :

            IMSGetDatasBase.time_dict[key_name] = datetime.datetime.today()

            self.vars_helper.set_value('IMSGetDatasBase.time_list', IMSGetDatasBase.time_dict)

    def get_lock(self, key_name) -> threading.Lock:

        '''Get the lock for a key'''

        # convert to an absolute path to guarantee uniqueness

        key_name = os.path.abspath(key_name)

        if key_name not in IMSGetDatasBase.lock_dict.keys():

            IMSGetDatasBase.lock_dict[key_name] = threading.Lock()

        return IMSGetDatasBase.lock_dict[key_name]

    def get_data_of_date(

        self,

        save_dir,

        sql,

        sort_columns:List[str],

        del_index_list=[-1], # indices of files to delete (default: the last one)

        start_date = datetime.datetime(2017, 1, 1), # start date

        offset = relativedelta(months=3), # interval per query

        date_format_fun = lambda d: '%04d%02d01' % (d.year, d.month), # formats the date parameters substituted into the query

        filename_format_fun = lambda d: '%04d%02d.csv' % (d.year, d.month), # formats the per-interval file name

        stop_date = '20700101', # stop once this date is exceeded

        ):

        '''Read data incrementally, one time interval at a time'''

        # create the folder

        if not os.path.exists(save_dir):

            os.makedirs(save_dir)

        else:

            # delete the most recent file(s)

            file_list = os.listdir(save_dir)

            if len(file_list)>0:

                file_list.sort()

                for del_index in del_index_list:

                    os.remove(os.path.join(save_dir,file_list[del_index]))

                    print('Deleted the last file:', file_list[del_index])

        select_index = -1

        # start_date = datetime.datetime(2017, 1, 1)

        while True:

            end_date = start_date + offset

            start_date_str = date_format_fun(start_date)

            end_date_str = date_format_fun(end_date)

            self.logger.info('date: %s-%s', start_date_str, end_date_str)

            file_path = os.path.join(save_dir, filename_format_fun(start_date))

            # self.logger.info('file_path: %s', file_path)

            if not os.path.exists(file_path):

                data:pd.DataFrame = self.db_helper.get_data(sql % (start_date_str, end_date_str))

                if data is None:

                    break

                self.logger.info('data: %d', len(data))

                # self.logger.info('data: %d', data.columns)

                if len(data)>0:

                    select_index+=1

                    # sort

                    data = data.sort_values(sort_columns)

                    data.to_csv(file_path)

                elif select_index!=-1:

                    break

                elif stop_date < start_date_str:

                    raise Exception("Data read error: the date exceeded the maximum!")

            start_date = end_date

pass

class CustomIMSGetDatas(IMSGetDatasBase):

    def __init__(

        self,

        host='192.168.13.134',

        port=4000,

        database='test_ims',

        user='root',

        password='rootimmsadmin',

        save_dir='./hjx/data/export_ims_data',

        logger:logging.Logger=None

        ):

        self.save_dir = save_dir

        self.logger = logger

        super().__init__(

            host=host,

            port=port,

            database=database,

            user=user,

            password=password,

            save_dir=save_dir,

            logger=logger

            )

    def get_ims_w_amt_pro(self):

        '''Year-month series proportion data'''

        file_path = os.path.join(self.save_dir,'ims_w_amt_pro.csv')

        now_lock = self.get_lock(file_path)

        now_lock.acquire() # acquire the lock

        try:

            # only re-query the data after the 4-hour cache timeout

            # if not self.get_last_time(file_path):

            #     return

            sql = 'SELECT * FROM ims_w_amt_pro'

            data:pd.DataFrame = self.db_helper.get_data(sql)

            data = data.rename(columns={

                'serial_forecast_proportion': 'forecast_proportion',

            })

            data.to_csv(file_path)

            # # refresh the cache timestamp

            # self.save_last_time(file_path)

        except Exception as ex:

            self.logger.exception(ex)

            raise ex # re-raise for the caller

        finally:

            now_lock.release() # release the lock

pass

def get_datas(

    host='192.168.13.134',

    port=4000,

    database='test_ims',

    user='root',

    password='rootimmsadmin',

    save_dir='./hjx/data/export_ims_data',

    logger:logging.Logger=None

    ):

    ur_bi_get_datas = CustomIMSGetDatas(

        host=host,

        port=port,

        database=database,

        user=user,

        password=password,

        save_dir=save_dir,

        logger=logger

    )

    try:

        # year-month series proportion data

        logger.info('Querying year-month series proportion data...')

        ur_bi_get_datas.get_ims_w_amt_pro()

        logger.info('Year-month series proportion data query finished!')

    except Exception as ex:

        logger.exception(ex)

        raise ex # re-raise for the caller

    finally:

        ur_bi_get_datas.close()

pass

def getdata_export_ims(

    host='192.168.13.134',

    port=4000,

    database='test_ims',

    user='root',

    password='rootimmsadmin',

    save_dir='./hjx/data/export_ims_data',

    logger:logging.Logger=None

    ):

    get_datas(

        host=host,

        port=port,

        database=database,

        user=user,

        password=password,

        save_dir=save_dir,

        logger=logger

    )

pass


The above is the detailed content of "How do I read a Hive database with Python?". For more information, please follow other related articles on the PHP Chinese website!
