ActiveMQ는 JMS 사양에서 지속성 메시지와 비지속성 메시지를 지원합니다.
지속성 메시지는 일반적으로 소비자의 사용 여부에 관계없이 소비자가 메시지를 소비하도록 보장하는 데 사용됩니다. 온라인이든 아니든. 메시지가 소비된 것으로 확인되면 저장소에서 삭제됩니다
비지속성 메시지는 일반적으로 알림 및 실시간 데이터를 보내는 데 사용되며 일반적으로 성능이 먼저 요구되며 메시지 신뢰성은 필수가 아닙니다.
MQ는 메모리, 파일, 관계형 데이터베이스 등과 같은 플러그형 풀아웃 메시지 저장을 지원합니다.
ActiveMQ의 큐 메시지 모델 저장
FIFO(선입선출) 사용 하나의 메시지는 한 명의 소비자만 소비할 수 있으며, 메시지는 소비가 확인된 후에만 삭제됩니다.
주제 메시지 모델(지속적 구독용)
각 구독자가 얻은 메시지는 실제로 메시지 복사본 하나만 저장됩니다. MQ는 메시지 저장소를 가리키는 포인터를 제공하고 메시지 복사본을 배포합니다. 그렇지 않으면 모든 영구 구독자가 메시지를 수신할 때까지 메시지를 삭제할 수 없습니다.
영구 저장 방법:
KahaDB 메시지 저장
AMQ 메시지 저장
JDBC 메시지 저장
인 메모리 메시지 저장
KahaDB는 ActiveMQ 5.4부터 기본 지속성 플러그인입니다. KahaDb의 복구 시간은 이전 AMQ보다 훨씬 짧고 더 적은 수의 데이터 파일을 사용하므로 AMQ를 완전히 대체할 수 있습니다. KahaDB의 지속성 메커니즘도 로그 파일, 인덱스 및 캐시를 기반으로 합니다.
(1) KahaDB 주요 기능:
메시지를 로그 형식으로 저장합니다.
메시지 인덱스는 B-Tree 구조에 저장되며 빠르게 업데이트할 수 있습니다.
다양한 복구 메커니즘을 지원합니다.
(2) 적용 가능한 시나리오:
처리량이 많은 애플리케이션
대용량 메시지 저장
( 3) 구성 방법 conf/activemq.xml:
<persistenceAdapter><kahaDB directory="${activemq.data}/kahadb"/></persistenceAdapter>
(四)、KahaDB存储原理:
当有活动消费者时,用于临时存储,消息会被发送给消费着,同时被安排将被存储,如果消息及时被确认,就不需要写入到磁盘。写入到磁盘中的数据消息,在后续的消息活动中,如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。
1、KahaDB内部结构
Data logs:消息日志包含了消息日志和一些命令
Cache:当有活动消费者时,用于临时存储,消息会被发送给消费着,同时被安排将被存储,如果消息及时被确认,这不需要写入到磁盘
Btree indexes(消息索引):用于引用消息日志(message id),它存储在内存中,这样能快速定位到。MQ会定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。
Redo log用于在非正常关机情况下维护索引完整性。
2、目录结构:
Db log files:用于存储消息(默认大小32M),当log日志满了,会创建一个新的,当log日志中的消息都被删除,该日志文件会被删除或者归档。
Archive directory:当datalog不在被kahadb需要会被归档(通过archiveDataLogs属性控制)。
Db.data:存放Btree indexs。
Db.redo:存放redo file,用于恢复Btree indexs。
写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档(取决于配置)。主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。而且由于索引巨大,一旦Broker崩溃,重建索引的速度会非常慢。
特点:类似KahaDB,也包含了事务日志,每个destination都包含一个index文件,AMQ适用于高吞吐量的应用场景,但是不适合多个队列的场景。
配置方式conf/activemq.xml:
<!--AMQ directory:数据存储路径 syncOnWrite:是否同步写入 maxFileLength:日志文件大小 --><persistenceAdapter><amqPersistenceAdapterdirectory="${activemq.data}/AMQdb"syncOnWrite="true"maxFileLength="10mb" /></persistenceAdapter>
1、AMQ内部结构:
Data logs:消息日志包含了消息日志
Cache:用于消息的快速检索
Reference store indexes:用于引用datalogs中的消息,通过message ID 关联
2、目录结构:
Lock:保证同一时间只有一个borker访问文件目录
temp-storag:用于存储非持久化消息(当不在被存储在内存中),如等待慢消费者处理消息
Kr-store:用于存储引用消息日志数据
journal directory:包含了消息文件、消息日志和消息控制信息
Archive:归档的数据日志
支持通过JDBC将消息存储到关系数据库,性能上不如文件存储,能通过关系型数据库查询到消息的信息。
MQ支持的数据库:Apache Derby、MYsql、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。
存储表结构:
A、ACTIVEMQ_MSGS:用于存储消息,Queue和Topic都存储在这个表中:
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高
B、ACTIVEMQ_ACKS:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:
主要的数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID:记录消费过的消息的ID。
C、ACTIVEMQ_LOCK(消息锁,保证同一时间只能有一个broker访问这些表结构):
表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。
配置方式:
1、配置数据源 conf/acticvemq.xml文件:
<!-- 配置数据源--> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/><property name="username" value="root"/><property name="password" value="111111"/><property name="maxActive" value="200"/><property name="poolPreparedStatements" value="true"/> </bean>
2、配置broke中的persistenceAdapter :
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。
<!-- JDBC配置 --><persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/></persistenceAdapter>
ps:数据库activemq 需要手动创建。
内存消息存储,会将所有的持久化消息存储在内存中,必须注意JVM使用情况以及内存限制,适用于一些能快速消费的数据量不大的小消息,当MQ关闭或者宕机,未被消费的内存消息会被清空。
配置方式 设置 broker属性值 persistent="false":
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false">
위 내용은 JMS Active MQ 메시지 저장 인스턴스에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!