Maison > base de données > tutoriel mysql > Explication détaillée des exemples de méthodes d'implémentation de séquence dans Mysql

Explication détaillée des exemples de méthodes d'implémentation de séquence dans Mysql

黄舟
Libérer: 2017-09-08 11:31:43
original
2868 Les gens l'ont consulté

L'éditeur suivant vous proposera un article sur la façon d'implémenter Sequence basé sur Mysql. L'éditeur le trouve plutôt bon, je vais donc le partager avec vous maintenant et le donner comme référence pour tout le monde. Suivons l'éditeur pour y jeter un œil

L'équipe a remplacé le nouveau framework. Toutes les nouvelles entreprises utilisent de nouveaux frameworks et même de nouvelles bases de données – Mysql.

Oracle a déjà été utilisé ici. Divers numéros de commande, numéros de série, numéros de lot, etc., utilisent tous directement les numéros de série numériques fournis par la séquence d'Oracle. Maintenant que la base de données a été remplacée par Mysql, l'ancienne méthode n'est évidemment plus applicable.

Besoin d'en écrire un nouveau :

•Utilisé dans des scénarios distribués

•Répondre à certaines exigences de concurrence

J'ai recherché des informations pertinentes et découvert que l'implémentation de MySQL est basée sur un enregistrement de base de données et met continuellement à jour sa valeur. Ensuite, la plupart des solutions d’implémentation utilisent des fonctions.

Publier le code depuis Internet :

Implémentation de la table

sur la fonction mysql Structure


CREATE TABLE `t_sequence` (
`sequence_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '序列名称' ,
`value` int(11) NULL DEFAULT NULL COMMENT '当前值' ,
PRIMARY KEY (`sequence_name`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
ROW_FORMAT=COMPACT
;
Copier après la connexion

Obtenez la valeur suivante


CREATE DEFINER = `root`@`localhost` FUNCTION `nextval`(sequence_name varchar(64))
 RETURNS int(11)
BEGIN
 declare current integer;
 set current = 0;
 
 update t_sequence t set t.value = t.value + 1 where t.sequence_name = sequence_name;
 select t.value into current from t_sequence t where t.sequence_name = sequence_name;

 return current;
end;
Copier après la connexion

Des problèmes peuvent survenir dans des scénarios simultanés. Bien que le verrouillage puisse être effectué au niveau de la couche métier, cela ne peut pas être garanti dans les scénarios distribués et l'efficacité ne sera pas élevée.

Implémentez-en un par vous-même, version Java

Principe :

•Lire un enregistrement, mettre en cache un segment de données, tel que : 0-100, modifier la valeur actuelle du enregistrement de 0 Modifier à 100

•Mise à jour du verrouillage optimiste de la base de données, autoriser les tentatives

•Lire les données du cache, relire la base de données une fois terminé

Pas de bêtises, c'est parti Code :

Basé sur l'implémentation Java

Structure de la table

Chaque fois qu'elle est mise à jour, SEQ_VALUE est défini sur SEQ_VALUE+STEP


CREATE TABLE `t_pub_sequence` (
 `SEQ_NAME` varchar(128) CHARACTER SET utf8 NOT NULL COMMENT '序列名称',
 `SEQ_VALUE` bigint(20) NOT NULL COMMENT '目前序列值',
 `MIN_VALUE` bigint(20) NOT NULL COMMENT '最小值',
 `MAX_VALUE` bigint(20) NOT NULL COMMENT '最大值',
 `STEP` bigint(20) NOT NULL COMMENT '每次取值的数量',
 `TM_CREATE` datetime NOT NULL COMMENT '创建时间',
 `TM_SMP` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
 PRIMARY KEY (`SEQ_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='流水号生成表';
Copier après la connexion

interface de séquence


/**
 * <p></p>
 * @author coderzl
 * @Title MysqlSequence
 * @Description 基于mysql数据库实现的序列
 * @date 2017/6/6 23:03
 */
public interface MysqlSequence {
 /**
  * <p>
  * 获取指定sequence的序列号
  * </p>
  * @param seqName sequence名
  * @return String 序列号
  */
 public String nextVal(String seqName);
}
Copier après la connexion

Intervalle de séquence

est utilisé pour mettre en cache une séquence localement, de l'intervalle min à l'intervalle max


/**
 * <p></p>
 *
 * @author coderzl
 * @Title SequenceRange
 * @Description 序列区间,用于缓存序列
 * @date 2017/6/6 22:58
 */
 @Data
public class SequenceRange {
 private final long  min;
 private final long  max;
 /** */
 private final AtomicLong value;
 /** 是否超限 */
 private volatile boolean over = false;

 /**
  * 构造.
  *
  * @param min 
  * @param max 
  */
 public SequenceRange(long min, long max) {
  this.min = min;
  this.max = max;
  this.value = new AtomicLong(min);
 }

 /**
  * <p>Gets and increment</p>
  *
  * @return 
  */
 public long getAndIncrement() {
  long currentValue = value.getAndIncrement();
  if (currentValue > max) {
   over = true;
   return -1;
  }

  return currentValue;
 }

}
Copier après la connexion

BO

correspond à l'enregistrement de la base de données


@Data
public class MysqlSequenceBo {
 /**
  * seq名
  */
 private String seqName;
 /**
  * 当前值
  */
 private Long seqValue;
 /**
  * 最小值
  */
 private Long minValue;
 /**
  * 最大值
  */
 private Long maxValue;
 /**
  * 每次取值的数量
  */
 private Long step;
 /** */
 private Date tmCreate;
 /** */
 private Date tmSmp;

 public boolean validate(){
  //一些简单的校验。如当前值必须在最大最小值之间。step值不能大于max与min的差
  if (StringUtil.isBlank(seqName) || minValue < 0 || maxValue <= 0 || step <= 0 || minValue >= maxValue || maxValue - minValue <= step ||seqValue < minValue || seqValue > maxValue ) {
   return false;
  }
  return true; 
 }
}
Copier après la connexion

DAO

ajouter, supprimer, modifier et vérifier En fait, modification. et check

< sont utilisés 🎜>


public interface MysqlSequenceDAO {
 /**
 * 
 */
 public int createSequence(MysqlSequenceBo bo);

 public int updSequence(@Param("seqName") String seqName, @Param("oldValue") long oldValue ,@Param("newValue") long newValue);

 public int delSequence(@Param("seqName") String seqName);

 public MysqlSequenceBo getSequence(@Param("seqName") String seqName);

 public List<MysqlSequenceBo> getAll();
}
Copier après la connexion
Mapper


<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.xxxxx.core.sequence.impl.dao.MysqlSequenceDAO" >
 <resultMap id="BaseResultMap" type="com.xxxxx.core.sequence.impl.MysqlSequenceBo" >
  <result column="SEQ_NAME" property="seqName" jdbcType="VARCHAR" />
  <result column="SEQ_VALUE" property="seqValue" jdbcType="BIGINT" />
  <result column="MIN_VALUE" property="minValue" jdbcType="BIGINT" />
  <result column="MAX_VALUE" property="maxValue" jdbcType="BIGINT" />
  <result column="STEP" property="step" jdbcType="BIGINT" />
  <result column="TM_CREATE" property="tmCreate" jdbcType="TIMESTAMP" />
  <result column="TM_SMP" property="tmSmp" jdbcType="TIMESTAMP" />
 </resultMap>
 <delete id="delSequence" parameterType="java.lang.String" >
  delete from t_pub_sequence
  where SEQ_NAME = #{seqName,jdbcType=VARCHAR}
 </delete>
 <insert id="createSequence" parameterType="com.xxxxx.core.sequence.impl.MysqlSequenceBo" >
  insert into t_pub_sequence (SEQ_NAME,SEQ_VALUE,MIN_VALUE,MAX_VALUE,STEP,TM_CREATE)
  values (#{seqName,jdbcType=VARCHAR}, #{seqValue,jdbcType=BIGINT},
  #{minValue,jdbcType=BIGINT}, #{maxValue,jdbcType=BIGINT}, #{step,jdbcType=BIGINT},
  now())
 </insert>
 <update id="updSequence" parameterType="com.xxxxx.core.sequence.impl.MysqlSequenceBo" >
  update t_pub_sequence
  set SEQ_VALUE = #{newValue,jdbcType=BIGINT}
  where SEQ_NAME = #{seqName,jdbcType=VARCHAR} and SEQ_VALUE = #{oldValue,jdbcType=BIGINT}
 </update>

 <select id="getAll" resultMap="BaseResultMap" >
  select SEQ_NAME, SEQ_VALUE, MIN_VALUE, MAX_VALUE, STEP
  from t_pub_sequence
 </select>

 <select id="getSequence" resultMap="BaseResultMap" >
  select SEQ_NAME, SEQ_VALUE, MIN_VALUE, MAX_VALUE, STEP
  from t_pub_sequence
  where SEQ_NAME = #{seqName,jdbcType=VARCHAR}
 </select>
</mapper>
Copier après la connexion
Implémentation de l'interface


@Repository("mysqlSequence")
public class MysqlSequenceImpl implements MysqlSequence{

 @Autowired
 private MysqlSequenceFactory mysqlSequenceFactory;
 /**
  * <p>
  * 获取指定sequence的序列号
  * </p>
  *
  * @param seqName sequence名
  * @return String 序列号
  * @author coderzl
  */
 @Override
 public String nextVal(String seqName) {
  return Objects.toString(mysqlSequenceFactory.getNextVal(seqName));
 }
}
Copier après la connexion
Usine

L'usine ne fait que deux choses

•Lorsque le service démarre, initialisez toutes les séquences de la base de données [Cache d'intervalle de séquence complet]

•Obtenir la séquence Valeur suivante


@Component
public class MysqlSequenceFactory {

 private final Lock lock = new ReentrantLock();

 /** */
 private Map<String,MysqlSequenceHolder> holderMap = new ConcurrentHashMap<>();

 @Autowired
 private MysqlSequenceDAO msqlSequenceDAO;
 /** 单个sequence初始化乐观锁更新失败重试次数 */
 @Value("${seq.init.retry:5}")
 private int initRetryNum;
 /** 单个sequence更新序列区间乐观锁更新失败重试次数 */
 @Value("${seq.get.retry:20}")
 private int getRetryNum;

 @PostConstruct
 private void init(){
  //初始化所有sequence
  initAll();
 }


 /**
  * <p> 加载表中所有sequence,完成初始化 </p>
  * @return void
  * @author coderzl
  */
 private void initAll(){
  try {
   lock.lock();
   List<MysqlSequenceBo> boList = msqlSequenceDAO.getAll();
   if (boList == null) {
    throw new IllegalArgumentException("The sequenceRecord is null!");
   }
   for (MysqlSequenceBo bo : boList) {
    MysqlSequenceHolder holder = new MysqlSequenceHolder(msqlSequenceDAO, bo,initRetryNum,getRetryNum);
    holder.init();
    holderMap.put(bo.getSeqName(), holder);
   }
  }finally {
   lock.unlock();
  }
 }


 /**
  * <p> </p>
  * @param seqName
  * @return long
  * @author coderzl
  */
 public long getNextVal(String seqName){
  MysqlSequenceHolder holder = holderMap.get(seqName);
  if (holder == null) {
   try {
    lock.lock();
    holder = holderMap.get(seqName);
    if (holder != null){
     return holder.getNextVal();
    }
    MysqlSequenceBo bo = msqlSequenceDAO.getSequence(seqName);
    holder = new MysqlSequenceHolder(msqlSequenceDAO, bo,initRetryNum,getRetryNum);
    holder.init();
    holderMap.put(seqName, holder);
   }finally {
    lock.unlock();
   }
  }
  return holder.getNextVal();
 }

}
Copier après la connexion
Titulaire d'une seule séquence

•Initialisation init(), qui inclut la vérification des paramètres, mise à jour de l'enregistrement de la base de données et création d'un intervalle de séquence

•getNextVal() obtient la valeur suivante


public class MysqlSequenceHolder {

 private final Lock lock    = new ReentrantLock();

 /** seqName */
 private String seqName;

 /** sequenceDao */
 private MysqlSequenceDAO sequenceDAO;

 private MysqlSequenceBo sequenceBo;
 /** */
 private SequenceRange sequenceRange;
 /** 是否初始化 */
 private volatile boolean  isInitialize  = false;
 /** sequence初始化重试次数 */
 private int initRetryNum;
 /** sequence获取重试次数 */
 private int getRetryNum;

 /**
  * <p> 构造方法 </p>
  * @Title MysqlSequenceHolder
  * @param sequenceDAO 
  * @param sequenceBo
  * @param initRetryNum 初始化时,数据库更新失败后重试次数
  * @param getRetryNum 获取nextVal时,数据库更新失败后重试次数
  * @return
  * @author coderzl
  */
 public MysqlSequenceHolder(MysqlSequenceDAO sequenceDAO, MysqlSequenceBo sequenceBo,int initRetryNum,int getRetryNum) {
  this.sequenceDAO = sequenceDAO;
  this.sequenceBo = sequenceBo;
  this.initRetryNum = initRetryNum;
  this.getRetryNum = getRetryNum;
  if(sequenceBo != null)
   this.seqName = sequenceBo.getSeqName();
 }

 /**
  * <p> 初始化 </p>
  * @Title init
  * @param
  * @return void
  * @author coderzl
  */
 public void init(){
  if (isInitialize == true) {
   throw new SequenceException("[" + seqName + "] the MysqlSequenceHolder has inited");
  }
  if (sequenceDAO == null) {
   throw new SequenceException("[" + seqName + "] the sequenceDao is null");
  }
  if (seqName == null || seqName.trim().length() == 0) {
   throw new SequenceException("[" + seqName + "] the sequenceName is null");
  }
  if (sequenceBo == null) {
   throw new SequenceException("[" + seqName + "] the sequenceBo is null");
  }
  if (!sequenceBo.validate()){
   throw new SequenceException("[" + seqName + "] the sequenceBo validate fail. BO:"+sequenceBo);
  }
  // 初始化该sequence
  try {
   initSequenceRecord(sequenceBo);
  } catch (SequenceException e) {
   throw e;
  }
  isInitialize = true;
 }

 /**
  * <p> 获取下一个序列号 </p>
  * @Title getNextVal
  * @param
  * @return long
  * @author coderzl
  */
 public long getNextVal(){
  if(isInitialize == false){
   throw new SequenceException("[" + seqName + "] the MysqlSequenceHolder not inited");
  }
  if(sequenceRange == null){
   throw new SequenceException("[" + seqName + "] the sequenceRange is null");
  }
  long curValue = sequenceRange.getAndIncrement();

  if(curValue == -1){
   try{
    lock.lock();
    curValue = sequenceRange.getAndIncrement();
    if(curValue != -1){
     return curValue;
    }
    sequenceRange = retryRange();
    curValue = sequenceRange.getAndIncrement();
   }finally {
    lock.unlock();
   }
  }
  return curValue;
 }

 /**
  * <p> 初始化当前这条记录 </p>
  * @Title initSequenceRecord
  * @Description
  * @param sequenceBo
  * @return void
  * @author coderzl
  */
 private void initSequenceRecord(MysqlSequenceBo sequenceBo){
  //在限定次数内,乐观锁更新数据库记录
  for(int i = 1; i < initRetryNum; i++){
   //查询bo
   MysqlSequenceBo curBo = sequenceDAO.getSequence(sequenceBo.getSeqName());
   if(curBo == null){
    throw new SequenceException("[" + seqName + "] the current sequenceBo is null");
   }
   if (!curBo.validate()){
    throw new SequenceException("[" + seqName + "] the current sequenceBo validate fail");
   }
   //改变当前值
   long newValue = curBo.getSeqValue()+curBo.getStep();
   //检查当前值
   if(!checkCurrentValue(newValue,curBo)){
    newValue = resetCurrentValue(curBo);
   }
   int result = sequenceDAO.updSequence(sequenceBo.getSeqName(),curBo.getSeqValue(),newValue);
   if(result > 0){
    sequenceRange = new SequenceRange(curBo.getSeqValue(),newValue - 1);
    curBo.setSeqValue(newValue);
    this.sequenceBo = curBo;
    return;
   }else{
    continue;
   }
  }
  //限定次数内,更新失败,抛出异常
  throw new SequenceException("[" + seqName + "] sequenceBo update error");
 }

 /**
  * <p> 检查新值是否合法 新的当前值是否在最大最小值之间</p>
  * @param curValue
  * @param curBo
  * @return boolean
  * @author coderzl
  */
 private boolean checkCurrentValue(long curValue,MysqlSequenceBo curBo){
  if(curValue > curBo.getMinValue() && curValue <= curBo.getMaxValue()){
   return true;
  }
  return false;
 }

 /**
  * <p> 重置sequence当前值 :当前sequence达到最大值时,重新从最小值开始 </p>
  * @Title resetCurrentValue
  * @param curBo
  * @return long
  * @author coderzl
  */
 private long resetCurrentValue(MysqlSequenceBo curBo){
  return curBo.getMinValue();
 }

 /**
  * <p> 缓存区间使用完毕时,重新读取数据库记录,缓存新序列段 </p>
  * @Title retryRange
  * @param SequenceRange
  * @author coderzl
  */
 private SequenceRange retryRange(){
  for(int i = 1; i < getRetryNum; i++){
   //查询bo
   MysqlSequenceBo curBo = sequenceDAO.getSequence(sequenceBo.getSeqName());
   if(curBo == null){
    throw new SequenceException("[" + seqName + "] the current sequenceBo is null");
   }
   if (!curBo.validate()){
    throw new SequenceException("[" + seqName + "] the current sequenceBo validate fail");
   }
   //改变当前值
   long newValue = curBo.getSeqValue()+curBo.getStep();
   //检查当前值
   if(!checkCurrentValue(newValue,curBo)){
    newValue = resetCurrentValue(curBo);
   }
   int result = sequenceDAO.updSequence(sequenceBo.getSeqName(),curBo.getSeqValue(),newValue);
   if(result > 0){
    sequenceRange = new SequenceRange(curBo.getSeqValue(),newValue - 1);
    curBo.setSeqValue(newValue);
    this.sequenceBo = curBo;
    return sequenceRange;
   }else{
    continue;
   }
  }
  throw new SequenceException("[" + seqName + "] sequenceBo update error");

 }
}
Copier après la connexion
Résumé

•Quand le service est redémarré ou anormal, la valeur actuelle sera perdue La séquence inutilisée mise en cache par le service

• Dans les scénarios distribués, lorsque plusieurs services sont initialisés en même temps, ou lorsque la séquence est réacquise, verrouillage optimiste veillera à ce qu’ils n’entrent pas en conflit les uns avec les autres. Le service A obtiendra 0-99, le service B obtiendra 100-199, et ainsi de suite

•Lorsque la séquence est obtenue plus fréquemment, augmenter la valeur du pas peut améliorer les performances. Mais en même temps, lorsque le service est anormal, davantage de séquences sont perdues

• Modifiez certaines valeurs d'attribut de la séquence dans la base de données, comme step, max, etc., et les nouveaux paramètres seront être activé la prochaine fois qu'il sera obtenu à partir de la base de données

•sequence ne fournit qu'un nombre limité de numéros de séquence (max-min au maximum). Après avoir atteint le maximum, le cycle recommencera depuis le début.

•Puisque la séquence bouclera, après avoir atteint le maximum, elle ne sera pas unique si elle est obtenue à nouveau. Il est recommandé d'utiliser la séquence pour traiter les numéros de série et le temps d'épissage. Par exemple : 20170612235101+numéro de série

Méthode d'épissage de l'ID d'entreprise


Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal