目录
conf/example/instance.properties" >conf/example/instance.properties
启动" >启动
RocketMQ部分" >RocketMQ部分
首页 Java Java基础 便捷的统计订单收益(一)

便捷的统计订单收益(一)

Oct 21, 2020 pm 05:18 PM

java基础教程栏目带大家了解如何便捷的统计订单收益。

便捷的统计订单收益(一)

引言

统计订单收益是做电商类型的APP老生常谈的问题.常规需求大致有用户收益日报/月报/年报.这些报表型的数据对表设计和程序设计有着不小的挑战.常规的聚合查询语句的查询时间会随着收益表数据日渐庞大而逐渐变长.这时候就需要思考如何设计收益表可以更高效的查询?怎样的设计才可以让统计收益变得简单?

需求

效果图

具体需求

  • 收益类型分为:自购订单收益,分享订单收益,分销收益,活动收益
  • 统计当日收益,当月收益
  • 根据筛选的时间统计出时间段的收益.

思考

设计思路

订单表是肯定需要的.在写入或者修改订单表的时候同步写入修改收益表.只有自购和分享订单会记录到订单表中,分销以及活动赠送收益只在特殊业务中写入收益表.再以日为维度,创建一张用户收益日报表.单行记录写入用户当天收益情况.降低查询用户日/月/年收益统计时的数据量.以单用户为例,通过拆分用户一个月只会产生最多31条数据.属于可控增长速度.如果沿用收益表,因为收益表的数据量跟用户下单的数量一一对应,如果用户下单量多那么表会非常庞大.在前期用户量初见增长时,可用此方法规避大的数据量统计,后期如果用户量增大导致日报表数据变多可以再考虑分表.

可见问题

  • 同步收益日报表的时机问题,因为原本订单的操作就很复杂需要同步写入收益和计算写入收益日报数据,代码耦合度太高.有没有什么方法通过收益表异构出收益日报表呢?
  • 虽然收益被写入到了日报表中,但是要满足效果图要求的效果,可能需要多次查询SQL语句,有没有办法在不影响程序效率的情况下尽量少些一些聚合SQL呢?

实现

总结出上面这些问题.我开始了资料收集.最终采用canal RocketMQ做为异构方案.

技术栈

简单介绍下这两款技术框架:

  • canal:主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
  • RocketMQ:一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。

注:我用的aliyun的全家桶,MQ和mysql都是阿里云的,如果是自建服务器的可能有区别,我在后面尽量标出

方案流程

  1. 在写入或修改收益表的同时通过canal监控mysql收益表的binlog日志.
  2. canal检测到变更,组装变更的JSON报文,发送RocketMQ中事先定义好的TOPIC.
  3. 程序消费该TOPIC,异构收益日报表.

canal配置部分

canal的安装请参考官方文档 解压后可得到一个canal文件夹,包含三个目录

  • bin:存放启动重启脚本
  • conf:存放核心配置文件
  • lib:存放核心jar包

我们需要重点关注conf文件夹里的conf/canal.properties核心配置文件以及conf/example/instance.properties单个监控节点配置文件

conf/canal.properties

# tcp, kafka, RocketMQ,这里默认是tcp读取模式,采用RocketMQ需要将其改变为RocketMQ模式
canal.serverMode = RocketMQ
# 如果是aliyun的RocketMQ需要配置以下两个KEY,ak/sk
canal.aliyun.accessKey =xxxxxxx
canal.aliyun.secretKey =xxxxxxx
# 监控的节点名称.这个默认就是example如果有多节点可以逗号隔开,如下方的例子
canal.destinations = example,sign
# 如果是aliyun的RocketMQ需要修改canal.mq.accessChannel为cloud默认为local
canal.mq.accessChannel = cloud
#MQ的地址,需要注意这里是不带http://,但是需要带端口号
canal.mq.servers = 
#rocketmq实例id
canal.mq.namespace =
登录后复制

conf/example/instance.properties

#mysql地址
canal.instance.master.address=
#以下两个参数需要在开启数据库binlog日志后得到,在数据库查询界面输入查询语句`show master status`,canal.instance.master.journal.name对应File参数,canal.instance.master.position对应Position参数
canal.instance.master.journal.name=
canal.instance.master.position=
#数据库的账号密码
canal.instance.dbUsername=
canal.instance.dbPassword=
#需要监控变动的表
canal.instance.filter.regex=xxx.t_user_order,xxx.t_user_cash_out
#定义发送的mq生产组
canal.mq.producerGroup = 
#定义发送到mq的指定主题
canal.mq.topic=
登录后复制

注:监控表的书写规则格式参照监控表书写规则

启动

cd /canal/bin
./start.sh
登录后复制

这时候会发现canal目录中多了一个log文件,进入可以看到canal主日志文件和example节点启动日志.

canal日志中出现
 the canal server is running now ......
example日志中出现
 init table filter : ^tablename
 xxxxxxxxx , the next step is binlog dump
登录后复制

表示你已经成功了一大步,canal监控已正常运行.

RocketMQ部分

如果用的aliyun的RocketMQ,配置代码部分直接可参考文档 自建的RocketMQ也可参照简单的消费例子监控对应的TOPIC即可 消费Canal发来的数据,格式如下:

{
    "data":[
        {
            //单个修改后表数据,如果同一时间有多个表变动会有多个该JSON对象        }
    ],
    "database":"监控的表所在数据库",
    "es":表变动时间,
    "id":canal生成的id,
    "isDdl":Boolean类型,表示是否DDL语句,
    "mysqlType":{
        表结构
    },
    "old":如果是修改类型会填充修改前的值,
    "pkNames":[
        该表的主键,如"id"
    ],
    "sql":"执行的SQL",
    "sqlType":{
        字段对应的sqlType,一般使用mysqlType即可
    },
    "table":"监控的表名",
    "ts":canal记录发送时间,
    "type":"表的修改类型,入INSERT,UPDATE,DELETE"
}
登录后复制

MQ消费代码主要用了反射,映射到对应的表

//这里的body就是Canal发来的数据
public Action process(String body) {
        boolean result = Boolean.FALSE;
        JSONObject data = JSONObject.parseObject(body);
        log.info("数据库操作日志记录:data:{}",data.toString());
        Class c = null;
        try {
            //这里监控了订单和收益表分别做订单统计和收益日报统计
            c = Class.forName(getClassName(data.getString("table")));
        } catch (ClassNotFoundException e) {
            log.error("error {}",e);
        }
        if (null != c) {
            JSONArray dataArray = data.getJSONArray("data");
            if (dataArray != null) {
                //把获取到的data部分转换为反射后的实体集合
                List list = dataArray.toJavaList(c);
                if (CollUtil.isNotEmpty(list)) {
                    //对修改和写入操作分别进行逻辑操作
                    String type = data.getString("type");
                    if ("UPDATE".equals(type)) {
                        result = uppHistory(list);
                    } else if ("INSERT".equals(type)) {
                        result = saveHistory(list);
                    }
                }
            }
        }
        return result ? Action.CommitMessage : Action.ReconsumeLater;
    }
    
    /**
     * @description: 获取反射ClassName
     * @author: chenyunxuan
     */
    private String getClassName(String tableName) {
        StringBuilder sb = new StringBuilder();
        //判断是哪张表的数据
        if (tableName.equals("t_user_income_detail")) {
            sb.append("cn.mc.core.model.order");
        } else if (tableName.equals("t_user_cash_out")) {
            sb.append("cn.mc.sync.model");
        }
        String className = StrUtil.toCamelCase(tableName).substring(1);
        return sb.append(".").append(className).toString();
    }
    
    /**
     * @description: 写入对应类型的统计表
     * @author: chenyunxuan
     */
    private <T> Boolean saveHistory(List<T> orderList) {
        boolean result = Boolean.FALSE;
        Object dataType = orderList.get(0);
        //用instanceof判断类型进入不同的逻辑处理代码
        if (dataType instanceof TUserIncomeDetail) {
            result = userOrderHistoryService.saveIncomeDaily(orderList);
        } else if (dataType instanceof UserCashOut) {
            result = userCashOutHistoryService.delSaveHistoryList(orderList);
        }
        return result;
    }
登录后复制

saveIncomeDaily伪代码

  public synchronized Boolean saveIncomeDaily(List orderList) {
    //循环收益明细记录
    .......
    //通过创建时间和用户id查询收益日报表中是否有当日数据
    if(不存在当日数据){
        //创建当日的收益日报表记录
        .....
    }
    //因为不存在当日记录也会立即写入当日的空数据,所以下面的流程都是走更新流程
    //更新当日数据
    .......
    return Boolean.TRUE;
    }
登录后复制

注:代码中应该多打一些日志,方便产生异常收益数据后的校对

后记

至此一个基于canal+RocketMQ的收益日报统计异构方案就完成了,下一篇会围绕本文提到的第二个问题减少聚合SQL的产生展开.敬请关注.

相关免费学习推荐:java基础教程

以上是便捷的统计订单收益(一)的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
4 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

如何使用Java的Try-with-Resources语句进行自动资源管理? 如何使用Java的Try-with-Resources语句进行自动资源管理? Mar 14, 2025 pm 04:59 PM

Java的Try-with-Resources通过自动关闭文件流或数据库连接等资源来简化资源管理,从而提高代码可读性和可维护性。

如何使用Java的Nashorn Engine用JavaScript脚本? 如何使用Java的Nashorn Engine用JavaScript脚本? Mar 14, 2025 pm 05:00 PM

Java的Nashorn Engine可以在Java应用程序中启用JavaScript脚本。关键步骤包括设置Nashorn,管理脚本和优化性能。主要问题涉及安全性,内存管理和未来兼容性

什么是Java虚拟机(JVM),它在内部如何工作? 什么是Java虚拟机(JVM),它在内部如何工作? Mar 14, 2025 pm 05:05 PM

本文讨论了Java虚拟机(JVM),详细介绍了其在不同平台运行Java程序中的作用。它说明了JVM的内部流程,密钥组件,内存管理,垃圾收集和性能Optimizatio

Java中有哪些不同的垃圾收集算法(串行,并行,CMS,G1,ZGC)? Java中有哪些不同的垃圾收集算法(串行,并行,CMS,G1,ZGC)? Mar 14, 2025 pm 05:06 PM

本文讨论了各种Java垃圾收集算法(串行,并行,CMS,G1,ZGC),它们的性能影响和适合大量堆的应用。

如何使用Java的枚举来表示固定的值集? 如何使用Java的枚举来表示固定的值集? Mar 14, 2025 pm 04:57 PM

Java枚举代表固定的值集,通过自定义方法和构造函数提供类型安全性,可读性和其他功能。它们增强了代码组织,可用于开关语句中以进行有效的价值处理。

See all articles