随着集团MHA集群的日渐增长,MHA管理平台话越来越迫切。而MHA平台的建设第一步就是将这些成百上千套的MHA集群信息收集起来,便于查询和管理。
MHA主要信息如下:
(1)基础配置信息;
(2)运行状态信息;
(3)启动及FailOver的log信息。
集团目前数据库的管理平台是在Archery的基础上打造,所以,需要将此功能嵌入到既有平台上。通过Archery系统进行查询展示。

简单来说,通过 Filebeat + Logstash + MySQL 架构 来收集保存各个集群的配置信息、启动及FailOver的log信息 和运行状态信息。
运行状态信息是通过一个小程序获取的,这个小程序每分钟执行一次,会把执行结果输出到文件中。当然这个文件是被failbeat监控的。
文件为mha_checkstatus.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
import io
import re
import ConfigParser
Path='/etc/mha'
#fout=open('输出文件名','w')
for Name in os.listdir(Path) :
Pathname= os.path.join(Path,Name)
## print(Pathname)
## print(Name)
config =ConfigParser.ConfigParser()
try:
config.read(Pathname)
server_item = config.sections()
server1_host = '' ##MHA cnf 配置文件中的节点1
server2_host = '' ##MHA cnf 配置文件中的节点2
server3_host = '' ##MHA cnf 配置文件中的节点3
mha_cnf_remark = ''
if 'server1' in server_item:
server1_host = config.get('server1','hostname')
else:
mha_cnf_remark = mha_cnf_remark + 'Server1未配置;'
if 'server2' in server_item:
server2_host = config.get('server2','hostname')
else:
mha_cnf_remark = mha_cnf_remark + 'Server2未配置;'
if 'server3' in server_item:
server3_host = config.get('server3','hostname')
##print(mha_cnf_remark)
except Exception as e:
print(e)
mha_status_result =''
###20190330
Name = Name.replace(".cnf", "")
###集群一主一从
if server1_host <> '' and server2_host <> '' and server3_host == '':
cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname
with os.popen(cmd_mha_status) as mha_status:
mha_status_result = mha_status.read()
if 'running(0:PING_OK)' in mha_status_result:
print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result)
print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result)
if 'stopped(2:NOT_RUNNING)' in mha_status_result:
print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result)
print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result)
####集群一主两从
if server1_host <> '' and server2_host <> '' and server3_host <> '':
cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname
with os.popen(cmd_mha_status) as mha_status:
mha_status_result = mha_status.read()
if 'running(0:PING_OK)' in mha_status_result:
print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result)
print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result)
print(Name+':::'+Pathname+':::0:::'+server3_host+':::'+mha_status_result)
if 'stopped(2:NOT_RUNNING)' in mha_status_result:
print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result)
print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result)
print(Name+':::'+Pathname+':::None:::'+server3_host+':::'+mha_status_result)
概况说明,就是到存放MHA配置的文件夹,根据每个集群的配置文档,去逐一执行下masterha_check_status,把结果格式化,输出到指定的文件中。这个就是每个集群的状态数据。通过filebeat实时汇报上去。
触发的方式可以是crontab,每分钟执行一次。再本案中是输出到 /???/checkmhastatus/masterha_check_status.log 中。
形式类似如下:
*/1 * * * * python /???/????/mha_checkstatus.py >> /???/????/masterha_check_status.log
CREATE TABLE `dbmha_status` (
`id` int NOT NULL AUTO_INCREMENT,
`host` varchar(100) NOT NULL,
`clustername` varchar(200) NOT NULL,
`logpath` varchar(500) NOT NULL,
`confpath` varchar(500) NOT NULL,
`mhstatus` varchar(100) NOT NULL,
`serverip` varchar(100) NOT NULL,
`info` varchar(2000) NOT NULL,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
PRIMARY KEY (`id`),
KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `dbmha_log` (
`id` int NOT NULL AUTO_INCREMENT,
`host` varchar(100) NOT NULL,
`clustername` varchar(200) NOT NULL,
`filename` varchar(200) NOT NULL,
`logpath` varchar(500) NOT NULL,
`message` longtext NOT NULL,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `dbmha_conf_info` (
`id` int NOT NULL AUTO_INCREMENT,
`host` varchar(100) NOT NULL,
`clustername` varchar(200) NOT NULL DEFAULT '',
`confpath` varchar(500) NOT NULL DEFAULT '',
`manager_log` varchar(500) NOT NULL DEFAULT '',
`manager_workdir` varchar(500) NOT NULL DEFAULT '',
`master_binlog_dir` varchar(500) NOT NULL DEFAULT '',
`failover_script` varchar(500) NOT NULL DEFAULT '',
`online_change_script` varchar(500) NOT NULL DEFAULT '',
`password` varchar(128) NOT NULL DEFAULT '',
`ping_interval` varchar(100) NOT NULL DEFAULT '',
`remote_workdir` varchar(100) NOT NULL DEFAULT '',
`repl_password` varchar(128) NOT NULL DEFAULT '',
`repl_user` varchar(20) NOT NULL DEFAULT '',
`ssh_user` varchar(20) NOT NULL DEFAULT '',
`user` varchar(20) NOT NULL DEFAULT '',
`serverip1` varchar(100) NOT NULL DEFAULT '',
`port1` varchar(10) NOT NULL DEFAULT '',
`candidate_master1` varchar(5) NOT NULL DEFAULT '',
`check_repl_delay1` varchar(20) NOT NULL DEFAULT '',
`serverip2` varchar(100) NOT NULL DEFAULT '',
`port2` varchar(10) NOT NULL DEFAULT '',
`candidate_master2` varchar(5) NOT NULL DEFAULT '',
`check_repl_delay2` varchar(20) NOT NULL DEFAULT '',
`serverip3` varchar(100) NOT NULL DEFAULT '',
`port3` varchar(10) NOT NULL DEFAULT '',
`candidate_master3` varchar(5) NOT NULL DEFAULT '',
`check_repl_delay3` varchar(20) NOT NULL DEFAULT '',
`info` longtext NOT NULL,
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
PRIMARY KEY (`id`),
KEY `idx_createtime` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
..............
- type: log
paths:
- /???/????/masterha_check_status.log
fields:
log_type: mha-status
db_host: 111.111.XXX.1XX ###这个IP为mha Mnaager所在serverip
- type: log
paths:
- /???/mhaconf/*.cnf
fields:
log_type: mha-cnf
db_host: 111.111.XXX.XXX
multiline.type: pattern
multiline.pattern: '^\[server [[:space:]] default'
multiline.negate: true
multiline.match: after
- type: log
paths:
- /???/????/mha/*/*.log
fields:
log_type: mysql-mha
db_host: 111.111.XXX.XXX
................
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5044
}
}
filter {
if [fields][log_type] == "mysql-mha" {
grok {
match => ["message", "(?m)^%{TIMESTAMP_ISO8601:timestamp} %{BASE10NUM} \[%{WORD:error_level}\] %{GREEDYDATA:error_msg}$"]
}
grok {
match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
}
grok {
match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
}
date {
match=> ["timestamp", "ISO8601"]
remove_field => ["timestamp"]
}
mutate {
copy => { "[log][file][path]" => "logpath"
"[fields][db_host]" => "manager_ip" }
}
mutate {
remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
}
}
if [fields][log_type] == "mha-cnf" {
mutate {
split => ["message","server"]
add_field => {"message1" => "%{[message][1]}"}
add_field => {"messages1" => "%{[message][2]}"}
add_field => {"messages2" => "%{[message][3]}"}
add_field => {"messages3" => "%{[message][4]}"}
add_field => {"dft_password" => "*********"}
add_field => {"dft_repl_password" => "*********"}
}
kv {
source => "message1"
field_split => "\n"
include_keys => ["manager_log", "manager_workdir", "master_binlog_dir", "master_ip_failover_script", "master_ip_online_change_script", "ping_interval", "remote_workdir", "repl_user", "ssh_user", "user" ]
prefix => "dft_"
remove_char_value => "<>\[\],"
}
kv {
source => "messages1"
field_split => "\n"
include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
prefix => "s1_"
}
kv {
source => "messages2"
field_split => "\n"
default_keys => [ "s2_candidate_master", "",
"s2_check_repl_delay", "",
"s2_hostname","",
"s2_port",""
]
include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
prefix => "s2_"
}
kv {
source => "messages3"
field_split => "\n"
default_keys => [ "s3_candidate_master", "",
"s3_check_repl_delay", "",
"s3_hostname","",
"s3_port",""
]
include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
prefix => "s3_"
}
grok {
match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
}
mutate {
copy => { "[fields][db_host]" => "manager_ip" }
copy => { "[log][file][path]" => "conf_path" }
gsub => [
"message", "需要加密的***密***码", "*********",
"message", "需要加密的其他字符", "*********"
]
}
date {
match=> ["timestamp", "ISO8601"]
remove_field => ["timestamp"]
}
mutate {
remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
}
}
if [fields][log_type] == "mha-status" {
mutate {
split => ["message",":::"]
add_field => {"cluster_name" => "%{[message][0]}"}
add_field => {"conf_path" => "%{[message][1]}"}
add_field => {"masterha_check_status" => "%{[message][2]}"}
add_field => {"server" => "%{[message][3]}"}
add_field => {"info" => "%{[message][4]}"}
}
grok {
match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
}
mutate {
copy => { "[fields][db_host]" => "manager_ip" }
}
date {
match=> ["timestamp", "ISO8601"]
remove_field => ["timestamp"]
}
mutate {
remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
}
}
}
output {
if [fields][log_type] == "mysql-mha" {
jdbc {
driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
statement => ["INSERT INTO dbmha_log (host,clustername,filename,logpath, message) VALUES(?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{filename}","%{logpath}","%{message}"]
}
}
if [fields][log_type] == "mha-status" {
jdbc {
driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
statement => ["INSERT INTO dbmha_status (host,clustername,logpath,confpath,mhstatus,serverip,info) VALUES(?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{cluster_name}","%{filename}","%{conf_path}","%{masterha_check_status}","%{server}","%{info}"]
}
}
if [fields][log_type] == "mha-cnf" {
jdbc {
driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
statement => ["INSERT INTO dbmha_conf_info (host,clustername,confpath,manager_log,manager_workdir,master_binlog_dir,failover_script,online_change_script,password,ping_interval,remote_workdir,repl_password,repl_user,ssh_user,user,serverip1,port1,candidate_master1,check_repl_delay1,serverip2,port2,candidate_master2,check_repl_delay2,serverip3,port3,candidate_master3,check_repl_delay3,info) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{conf_path}","%{dft_manager_log}","%{dft_manager_workdir}","%{dft_master_binlog_dir}","%{dft_master_ip_failover_script}","%{dft_master_ip_online_change_script}","%{dft_password}","%{dft_ping_interval}","%{dft_remote_workdir}","%{dft_repl_password}","%{dft_repl_user}","%{dft_ssh_user}","%{dft_user}","%{s1_hostname}","%{s1_port}","%{s1_candidate_master}","%{s1_check_repl_delay}","%{s2_hostname}","%{s2_port}","%{s2_candidate_master}","%{s2_check_repl_delay}","%{s3_hostname}","%{s3_port}","%{s3_candidate_master}","%{s3_check_repl_delay}","%{message}"]
}
}
}
这个配置还是相对复杂难懂的。这个文件配置了对三种文件的读取,我们就看读取mha配置文件的部分【[fields][log_type] == "mha-cnf"】,我们挑其中的几个点说下,更多的内容可参照logstash官网--https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
首先,我们是 “server” 关键字,把文件中的配置信息,分割成不同的部分。
接着,因为配置文件的格式是 key=value的样式,所以需要借助 kv{},其中的参数说下:field_split---定义字段间的分隔符;include_keys--定义只读去规定的特定key;prefix---格式化字段名字,加个前缀名字,主要是用来区分server 1 部分和 server2、、、之间的分别。
通过【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}】,获取product字段,我们是通过mha的配置文件的名字来定义集群的名字,即规范了mha配置文件的名字的命名来自于集群的名字,反推得知了配置文件的名字,就知道了集群的名字。【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}】这个地方的filename包含了文件的后缀。
我们是把此项目嵌入到既有的Archery平台中,增加了3个查询界面,界面的实现,在此就不具体展开了。
需要注意的是,界面需要支持模糊查询,例如支持MHA Manager Server IP查询(方便查询各个Manager节点上有多少集群);支持集群名字的模糊查询;支持节点serverIP的模糊查询。
Q.1 为什么用MySQL存储信息,ELK是更成熟的架构啊?
是的,用elasticsearch来存储这种文本信息更常见。我们用MySQL替代elasticsearch基于以下考虑:(1)我们既有的管理平台使用的是MySQL,把他们保存到MySQL 便于集成;(2)这些数据,不仅仅是Log,还有些是基础数据,放到MySQL便于相互管理、聚合展示(3)这是数据量并不大,例如mha.log,只有在启动或者failover时才有变化,conf信息也是很少的,所以,从数据量也一点考虑,也不需要保存到MySQL。
Q.2 Logstash 可以把数据写入到MySql中吗?
是可以的。主要是logstash-output-jdbc、logstash-codec-plain插件的安装。
如果是离线的环境下安装,可以参考 《logstash 离线安装logstash-output-jdbc》
https://blog.csdn.net/sxw1065430201/article/details/123663108
Q.3 MHA log 文件夹中 原有一个 .health ,里面是MHA每分钟的健康性报告,那为什么还要自己写Python程序获取呢?
因为.healthy 的内容不是换行符结尾,而filebeat是以换行符来判断的(https://www.elastic.co/guide/en/beats/filebeat/7.4/newline-character-required-eof.html 有详细说明)。
简单来说,filebeat读取不了。
Q.4 MHA 健康性检查的原理
具体的原理可以参考此文章的分析说明--《mha检测mysql状况方式》
https://blog.csdn.net/weixin_35411438/article/details/113455263
Q.5 历史数据的删除
mha log 信息表 dbmha_log
MHA 基础配置表 dbmha_conf_info
以上两张表基本上很少变化,量不大,其数据无需定期删除。
运行状态表 dbmha_status,此表每个集群每分钟(具体crontab定义)都会有新数据插入,数据量增长较大,应设置定时任务,定期删除历史数据,例如删除7天前的数据。
大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList()Obt
文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co
我遇到了这个奇怪的错误.../Users/gideon/Documents/ca_ruby/rubytactoe/lib/player.rb:13:in`gets':Isadirectory-spec(Errno::EISDIR)player_spec.rb:require_relative'../spec_helper'#theuniverseisvastandinfinite...itcontainsagame....butnoplayersdescribe"tictactoegame"docontext"theplayerclass"doit"musthaveahumanplay
我看到其他人也遇到过类似的问题,但没有一个解决方案对我有用。0.3.14gem与其他gem文件一起存在。我已经完全按照此处指示完成了所有操作:https://github.com/brianmario/mysql2.我仍然得到以下信息。我不知道为什么安装程序指示它找不到include目录,因为我已经检查过它存在。thread.h文件存在,但不在ruby目录中。相反,它在这里:C:\RailsInstaller\DevKit\lib\perl5\5.8\msys\CORE\我正在运行Windows7并尝试在Aptana3中构建我的Rails项目。我的Ruby是1.9.3。$gemin
我已经开始使用mysql2gem。我试图弄清楚一些基本的事情——其中之一是如何明确地执行事务(对于批处理操作,比如多个INSERT/UPDATE查询)。在旧的ruby-mysql中,这是我的方法:client=Mysql.real_connect(...)inserts=["INSERTINTO...","UPDATE..WHEREid=..",#etc]client.autocommit(false)inserts.eachdo|ins|beginclient.query(ins)rescue#handleerrorsorabortentirelyendendclient.commi
我有两个文本文件,master.txt和926.txt。如果926.txt中有一行不在master.txt中,我想写入一个新文件notinbook.txt。我写了我能想到的最好的东西,但考虑到我是一个糟糕的/新手程序员,它失败了。这是我的东西g=File.new("notinbook.txt","w")File.open("926.txt","r")do|f|while(line=f.gets)x=line.chompifFile.open("master.txt","w")do|h|endwhile(line=h.gets)ifline.chomp!=xputslineendende
有没有人得到Logstash在Rails上使用ruby?我的客户告诉我将Logstash用于日志收集器等。我正在使用rubyonrails技术。大部分都快完成了。但要求是将日志记录到logstash中。请让我知道这可能吗? 最佳答案 我为此编写了一个gem-logstasher.它将Rails日志写入一个单独的文件,采用纯json格式,无需任何处理即可由logstash使用。查看我的blog有关如何设置Logstash和Kibana的完整说明 关于ruby-on-rails-Lo
在我的场景中,Logstash收到的系统日志行的“时间戳”是UTC,我们在Elasticsearch输出中使用事件“时间戳”:output{elasticsearch{embedded=>falsehost=>localhostport=>9200protocol=>httpcluster=>'elasticsearch'index=>"syslog-%{+YYYY.MM.dd}"}}我的问题是,在UTC午夜,Logstash在外时区(GMT-4=>America/Montreal)结束前将日志发送到不同的索引,并且索引在20小时(晚上8点)之后没有日志,因为“时间戳”是UTC。我们已