日志监控告警系统的设计与实现

栏目: 数据库 · 发布时间: 6年前

内容简介:日志监控告警系统的设计与实现

日志监控告警系统

基于的日志进行监控,监控需要一定规则,对触发监控规则的日志信息进行告警,告警的方式,是短信和邮件。

log4j---->error,info,debug 应用程序程序的日志   error 级别 TimeOutException 角标越界 IndexXXXException ......Error

com.alibaba.jstorm.daemon.worker.WorkerData]-[INFO] Current worker taskList:[1, 2, 3, 4, 5, 6, 7]

String.contains.(" taskList ")--------------> 当订单量触发一千万时,告警通知,让大家庆祝下。

OrdertotalNum 1000

kafaka生成集群的原理、分区

kafka消费者的负载均衡,kfakaSpout

Kafka broker(核心机制,topic,分片,文件存储机制)

Redis API学习

spout:从外部数据源中读取数据,然后转换为topology

架构图:

DataSource:外部数据源

Spout:接收外部数据源的组件,将外部数据源转化成storm内部的数据,以Tuple为基本的传输单元下发给Bolt.

Bolt:接受Spout发送的数据,或上游的bolt的发送的数据,根据业务逻辑进行处理,发送给下一个Bolt或者是存储到某种介质上,例如Redis。

Tuple:Storm内部中数据传输的基本单元,里面封装了一个List对象,用来保存数据。

StreamGroup:数据分钟策略,7种,shuffleGrouping,Non Grouping,FieldGrouping,Local or ShuffleGrouping.

Nimbus:任务分配

Supervisor:接受任务,并启动worker,worker的数量是根据端口号来的。

Worker:执行任务的具体组件(JVM),可以执行两种类型的任务,Spout任务或者bolt任务

Task:一个task属于一个Spout或者Bolt并发任务。

zk:保存任务分配的信息,心跳信息,元数据信息。

1 、背景知识

一款优秀的软件需要具备的特点

l 软件的实用性。

所谓有的放矢,软件的诞生是为了解决特定的问题,比如现在流行的 MVC 框架,早期的没有 MVC 开发的时候,耦合度很大,后期维护更新成本高,难度大,这样 MVC 框架就孕育而生;比如陌陌这种社交软件,是为了解决陌生人之间交流的问题;比如疼醒这种软件是为了解决人们远程沟通的问题;比如 OA 系统为了解决公司协同流程、项目管理、知识管理等问题……所以一款优秀的软件必须能够解决一个领域内的问题。

l 软件的稳定性。

软件的实用性问题解决之后,急需要解决的问题就是软件的稳定性。一般线上系统都会承载企业的某项业务,系统的稳定性直接影响了业务是否能够正常运营。很多创业公司在前期只注重业务的发展,不太在意系统的稳定性,一旦用户两比较大的之后,就会出现很多性能的问题。这种情况就好比,你找了一个妹子,并准备深入交往后结婚,却发现这个妹子总是有很多异性朋友在联系 ……

l 代码的规范性

铁打的营盘流水的兵,一款优秀的软件不仅仅是功能的实现。 整体架构、功能模块、代码注释、扩展性等问题也也需要考虑 ,毕竟在一个软件的生命周期过程中,参与的人实在是太多了,主创人员也可能随时流式。所以代码的规范性就难能可贵了。

l 升级保持向前兼容性。

如果一个软件平常使用挺好的,但是升级却越来越费劲,或者升级后稳定性大打折扣,也难以称得上一个好的软件。

l 基本的使用手册

文档、文档、文档、一个简单有效的使用手册,才是程序的王道,知其然才能知其所以然。能让用户一目了然,功能、架构、设计思路、代码等等。

2 、需求分析

随着公司业务发展,支撑公司业务的各种系统越来越多,为了保证公司的业务正常发展,急需要对这些线上系统的运行进行监控,做到问题的及时发现和处理,最大程度减少对业务的影响。

目前系统分类有:

1) 有基于 Tomcat web 应用

2) 有独立的 Java Application 应用

3) 有运行在 linux 上的脚本程序

4) 有大规模的集群框架( zookeeper Hadoop Storm SRP ……)

5) 有操作系统的运行日志

主要功能需求分为:

监控系统日志中的内容,按照一定规则进行过滤

发现问题之后通过短信和邮件进行告警

3 、功能分析

l 数据输入

使用 flume 客户端获取个系统的数据;

用户通过页面输入系统名称、负责人触发规则等信息

l 数据存储

使用 flume 采集数据并存放在 kafka 集群中

l 数据计算

使用 storm 编写程序对日志进行过滤,将满足过滤规则的信息,通过邮件短信告警并保存到数据库中

l 数据展示

管理页面可以查看触发规则的信息,系统负责人,联系方式,触发信息明细等

4 、原型设计

产品经理设计产品原形

5 、架构设计

5.1 、整体架构设计

主要架构为应用 +flume+kafka+storm+mysql+Java web 。数据流程如下:

1. 应用程序使用 log4j 产生日志

2. 部署 flume 客户端监控应用程序产生的日志信息,并发送到 kafka 集群中

3. storm spout 拉去 kafka 的数据进行消费,逐条过滤每条日志的进行规则判断,对符合规则的日志进行邮件告警。

4. 最后将告警的信息保存到 mysql 数据库中,用来进行管理。

5.2 Flume 设计

l Flume 说明

Flume 是一个分布式、可靠地、可用的服务,用来收集、聚合、传输日志数据。

它是一个基于流式数据的架构,简单而灵活。具有健壮性、容错机制、故障转移、恢复机制。

它提供一个简单的可扩展的数据模型,容许在线分析程序。 F

Flume 作为  cloudera  开发的实时日志收集系统,受到了业界的认可与广泛应用。

l Flume 设计摘要

使用 Flume EXEC 执行一个 linux 命令来生成数据源。例如,可以用 tail 命令监控一个文件,那么,只要文件增加内容, EXEC 就可以将增加的内容作为数据源发送出去。

使用 org.apache.flume.plugins.KafkaSink ,将 Flume EXEC 产生的数据源发送到 Kafka 中。

5.3 Kafka 设计

l Kafka 说明

kafka 是一个分布式消息队列:生产者、消费者的功能。

l Kakfa 设计摘要

部署 kafka 集群,在集群中添加一个 Topic monitor_realtime_javaxy

5.4 Storm 设计

l KafkaSpout 读取数据,需要配置 Topic monitor_realtime_javaxy

l FilterBolt 判断规则

l NotifyBolt 用来发送邮件或短信息

l Save2DB 用来将告警信息写入 mysql 数据库

5.5、数据模型设计

5.5.1 、用户表

用来保存用户的信息,包括账号、手机号码、邮箱、是否有效等信息

日志监控告警系统的设计与实现

5.5.2 、应用表

用来保存应用的信息,包括应用名称、应用描述、应用是否在线等信息

日志监控告警系统的设计与实现

5.5.3 、应用类型表

用来保存应用的类型等信息

日志监控告警系统的设计与实现

5.5.4 、规则表

用来保存规则的信息,包括规则名称,规则描述,规则关键词等信息

日志监控告警系统的设计与实现

5.5.5 、规则记录表

用来保存触发规则后的记录,包括告警编号、是否短信告知、是否邮件告知、告警明细等信息。

日志监控告警系统的设计与实现

6、代码开发

6.1、整体结构

日志监控告警系统的设计与实现

6.2、 LogMonitorTopologyMain 驱动类

public class LogMonitorTopologyMain {
    private static Logger logger = Logger.getLogger(LogMonitorTopologyMain.class);

    public static void main(String[] args) throws  Exception{
        // 使用TopologyBuilder进行构建驱动类
        TopologyBuilder builder = new TopologyBuilder();

//         设置kafka的zookeeper集群
//        BrokerHosts hosts = new ZkHosts("zk01:2181,zk02:2181,zk03:2181");
////        // 初始化配置信息
//        SpoutConfig spoutConfig = new SpoutConfig(hosts, "logmonitor", "/aaa", "log_monitor");
        // 在topology中设置spout
//        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig),3);
        builder.setSpout("kafka-spout",new RandomSpout(new StringScheme()),2);
        builder.setBolt("filter-bolt",new FilterBolt(),3).shuffleGrouping("kafka-spout");
        builder.setBolt("prepareRecord-bolt",new PrepareRecordBolt(),2).fieldsGrouping("filter-bolt", new Fields("appId"));
        builder.setBolt("saveMessage-bolt",new SaveMessage2MySql(),2).shuffleGrouping("prepareRecord-bolt");

        //启动topology的配置信息
        Config topologConf = new Config();
        //TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每个组件所发射的每条消息。
        //这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。
        topologConf.setDebug(true);
        //storm的运行有两种模式: 本地模式和分布式模式.
        if (args != null && args.length > 0) {
            //定义你希望集群分配多少个工作进程给你来执行这个topology
            topologConf.setNumWorkers(2);
            //向集群提交topology
            StormSubmitter.submitTopologyWithProgressBar(args[0], topologConf, builder.createTopology());
        } else {
            topologConf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", topologConf, builder.createTopology());
         Utils.sleep(10000000);
         cluster.shutdown();
        }
    }
}

6.3 FilterBolt 用来过滤日志信息

主要是过滤格式和校验 appId是否合法。

public void execute(Tuple input, BasicOutputCollector collector) {
        //获取KafkaSpout发送出来的数据
        String line = input.getString(0);
        //获取kafka发送的数据,是一个byte数组
//        byte[] value = (byte[]) input.getValue(0);
        //将数组转化成字符串
//        String line = new String(value);
        //对数据进行解析
        // appid   content
        //1  error: Caused by: java.lang.NoClassDefFoundError: com/starit/gejie/dao/SysNameDao
        Message message = MonitorHandler.parser(line);
        if (message == null) {
            return;
        }
        if (MonitorHandler.trigger(message)) {
            collector.emit(new Values(message.getAppId(), message));
        }
        //定时更新规则信息
        MonitorHandler.scheduleLoad();
    }

6.4 PrepareRecordBolt 发送邮件告警和短信告警

public void execute(Tuple input, BasicOutputCollector collector) {
    Message message = (Message) input.getValueByField("message");
    String appId = input.getStringByField("appId");
    //将触发规则的信息进行通知
    MonitorHandler.notifly(appId, message);
    Record record = new Record();
    try {
        BeanUtils.copyProperties(record, message);
        collector.emit(new Values(record));
    } catch (Exception e) {

    }
}

6.6、 SaveMessage2MySq 保存到数据库

public class SaveMessage2MySql extends BaseBasicBolt {
    private static Logger logger = Logger.getLogger(SaveMessage2MySql.class);
    public void execute(Tuple input, BasicOutputCollector collector) {
        Record record = (Record) input.getValueByField("record");
        MonitorHandler.save(record);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

6.7 、核心类  MonitorHandler   所有流程处理的核心代码

public class MonitorHandler {

    private static Logger logger = Logger.getLogger(MonitorHandler.class);
    //定义一个map,其中appId为Key,以该appId下的所有rule为Value
    private static Map<String, List<Rule>> ruleMap;
    //定义一个map,其中appId为Key,以该appId下的所有user为Value
    private static Map<String, List<User>> userMap;
    //定义一个list,用来封装所有的应用信息
    private static List<App> applist;
    //定义一个list,用来封装所有的用户信息
    private static List<User> userList;
    //定时加载配置文件的标识
    private static boolean reloaded = false;
    //定时加载配置文件的标识
    private static long nextReload = 0l;

    static {
        load();
    }

    /**
     * 解析输入的日志,将数据按照一定的规则进行分割。
     * 判断日志是否合法,主要校验日志所属应用的appId是否存在
     *
     * @param line 一条日志
     * @return
     */
    public static Message parser(String line) {
        //日志内容分为两个部分:由5个$$$$$符号作为分隔符,第一部分为appid,第二部分为日志内容。
        String[] messageArr = line.split("\\$\\$\\$\\$\\$");
        //对日志进行校验
        if (messageArr.length != 2) {
            return null;
        }
        if (StringUtils.isBlank(messageArr[0]) || StringUtils.isBlank(messageArr[1])) {
            return null;
        }
        //检验当前日志所属的appid是否是经过授权的。
        if (apppIdisValid(messageArr[0].trim())) {
            Message message = new Message();
            message.setAppId(messageArr[0].trim());
            message.setLine(messageArr[1]);
            return message;
        }
        return null;
    }

    /**
     * 验证appid是否经过授权
     */
    private static boolean apppIdisValid(String appId) {
        try {
            for (App app : applist) {
                if (app.getId() == Integer.parseInt(appId)) {
                    return true;
                }
            }
        } catch (Exception e) {
            return false;
        }
        return false;
    }

    /**
     * 对日志进行规制判定,看看是否触发规则
     * @param message
     * @return
     */
    public static boolean trigger(Message message) {
        //如果规则模型为空,需要初始化加载规则模型
        if (ruleMap == null) {
            load();
        }
        //从规则模型中获取当前appid配置的规则
        System.out.println(message.getAppId());
        List<Rule> keywordByAppIdList = ruleMap.get(message.getAppId());
        for (Rule rule : keywordByAppIdList) {
            //如果日志中包含过滤过的关键词,即为匹配成功
            if (message.getLine().contains(rule.getKeyword())) {
                message.setRuleId(rule.getId() + "");
                message.setKeyword(rule.getKeyword());
                return true;
            }
        }
        return false;
    }

    /**
     * 加载数据模型,主要是用户列表、应用管理表、组合规则模型、组合用户模型。
     */
    public static synchronized void load() {
        if (userList == null) {
            userList = loadUserList();
        }
        if (applist == null) {
            applist = loadAppList();
        }
        if (ruleMap == null) {
            ruleMap = loadRuleMap();
        }
        if (userMap == null) {
            userMap = loadUserMap();
        }
    }

    /**
     * 访问数据库获取所有有效的app列表
     * @return
     */
    private static List<App> loadAppList() {
        return new LogMonitorDao().getAppList();
    }

    /**
     * 访问数据库获取所有有效用户的列表
     * @return
     */
    private static List<User> loadUserList() {
        return new LogMonitorDao().getUserList();
    }

    /**
     * 封装应用与用户对应的map
     * @return
     */
    private static Map<String, List<User>> loadUserMap() {
        //以应用的appId为key,以应用的所有负责人的userList对象为value。
        //HashMap<String, List<User>>
        HashMap<String, List<User>> map = new HashMap<String, List<User>>();
        for (App app : applist) {
            String userIds = app.getUserId();
            List<User> userListInApp = map.get(app.getId());
            if (userListInApp == null) {
                userListInApp = new ArrayList<User>();
                map.put(app.getId() + "", userListInApp);
            }
            String[] userIdArr = userIds.split(",");
            for (String userId : userIdArr) {
                userListInApp.add(queryUserById(userId));
            }
            map.put(app.getId() + "", userListInApp);
        }
        return map;
    }

    /**
     *  封装应用与规则的map
     * @return
     */
    private static Map<String, List<Rule>> loadRuleMap() {
        Map<String, List<Rule>> map = new HashMap<String, List<Rule>>();
        LogMonitorDao logMonitorDao = new LogMonitorDao();
        List<Rule> ruleList = logMonitorDao.getRuleList();
        //将代表rule的list转化成一个map,转化的逻辑是,
        // 从rule.getAppId作为map的key,然后将rule对象作为value传入map
        //Map<appId,ruleList>  一个appid的规则信息,保存在一个list中。
        for (Rule rule : ruleList) {
            List<Rule> ruleListByAppId = map.get(rule.getAppId()+"");
            if (ruleListByAppId == null) {
                ruleListByAppId = new ArrayList<Rule>();
                map.put(rule.getAppId() + "", ruleListByAppId);
            }
            ruleListByAppId.add(rule);
            map.put(rule.getAppId() + "", ruleListByAppId);
        }
        return map;
    }

    /**
     * 通过用户编号获取用户的JavaBean
     * @param userId
     * @return
     */
    private static User queryUserById(String userId) {
        for (User user : userList) {
            if (user.getId() == Integer.parseInt(userId)) {
                return user;
            }
        }
        return null;
    }

    /**
     * 通过app编号,获取当前app的所有负责人列表
     * @param appId
     * @return
     */
    public static List<User> getUserIdsByAppId(String appId) {
        return userMap.get(appId);
    }

    /**
     * 告警模块,用来发送邮件和短信
     * 短信功能由于短信资源匮乏,目前默认返回已发送。
     * @param appId
     * @param message
     */
    public static void notifly(String appId, Message message) {
        //通过appId获取应用负责人的对象
        List<User> users = getUserIdsByAppId(appId);
        //发送邮件
        if (sendMail(appId, users, message)) {
            message.setIsEmail(1);
        }
        //发送短信
        if (sendSMS(appId, users, message)) {
            message.setIsPhone(1);
        }
    }

    /**
     * 发送短信的模块
     * 由于短信资源匮乏,目前该功能不开启,默认true,即短信发送成功。
     * 目前发送短信功能使用的是外部接口,外面接口的并发性没法保证,会影响storm程序运行的效率。
     *  后期可以改造为将短信数据发送到外部的消息队里中,然后创建一个worker去发送短信。
     * @param appId
     * @param users
     * @param message
     * @return
     */
    private static boolean sendSMS(String appId, List<User> users, Message message) {
//        return true;
        List<String> mobileList = new ArrayList<String>();
        for (User user : users) {
            mobileList.add(user.getMobile());
        }
        for (App app : applist) {
            if (app.getId() == Integer.parseInt(appId.trim())) {
                message.setAppName(app.getName());
                break;
            }
        }
        String content = "系统【" + message.getAppName() + "】在 " + DateUtils.getDateTime() + " 触发规则 " + message.getRuleId() + ",关键字:" + message.getKeyword();
        return SMSBase.sendSms(listToStringFormat(mobileList), content);
    }

    /**
     * 发送邮件
     * 后期可以改造为将邮件数据发送到外部的消息队里中,然后创建一个worker去发送短信。
     * @param appId
     * @param userList
     * @param message
     * @return
     */
    private static boolean sendMail(String appId, List<User> userList, Message message) {
        List<String> receiver = new ArrayList<String>();
        for (User user : userList) {
            receiver.add(user.getEmail());
        }
        for (App app : applist) {
            if (app.getId() == Integer.parseInt(appId.trim())) {
                message.setAppName(app.getName());
                break;
            }
        }
        if (receiver.size() >= 1) {
            String date = DateUtils.getDateTime();
            String content = "系统【" + message.getAppName() + "】在 " + date + " 触发规则 " + message.getRuleId() + " ,过滤关键字为:" + message.getKeyword() + "  错误内容:" + message.getLine();
            MailInfo mailInfo = new MailInfo("系统运行日志监控", content, receiver, null);
            return MessageSender.sendMail(mailInfo);
        }
        return false;
    }

    /**
     * 保存触发规则的信息,将触发信息写入到 mysql 数据库中。
     *
     * @param record
     */
    public static void save(Record record) {
        new LogMonitorDao().saveRecord(record);
    }

    /**
     * 将list转换为String
     * @param list
     * @return
     */
    private static String listToStringFormat(List<String> list) {
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < list.size(); i++) {
            if (i == list.size() - 1) {
                stringBuilder.append(list.get(i));
            } else {
                stringBuilder.append(list.get(i)).append(",");
            }
        }
        return stringBuilder.toString();
    }

    /**
     * 配置scheduleLoad重新加载底层数据模型。
     */
    /**
     * thread 4
     * thread 3
     * thread 2
     */
    public static synchronized void reloadDataModel() {
//        * thread 1  reloaded = true   ----> reloaded = false
//        * thread 2  reloaded = false
//        * thread 2  reloaded = false
 //        * thread 2  reloaded = false
        if (reloaded) {
            long start = System.currentTimeMillis();
            userList = loadUserList();
            applist = loadAppList();
            ruleMap = loadRuleMap();
            userMap = loadUserMap();
            reloaded = false;
            nextReload = 0l;
            logger.info("配置文件reload完成,时间:"+DateUtils.getDateTime()+" 耗时:"+ (System.currentTimeMillis()-start));
        }


    }

    /**
     * 定时加载配置信息
     * 配合reloadDataModel模块一起使用。
     * 主要实现原理如下:
     * 1,获取分钟的数据值,当分钟数据是10的倍数,就会触发reloadDataModel方法,简称reload时间。
     * 2,reloadDataModel方式是线程安全的,在当前worker中只有一个线程能够操作。
     * 3,为了保证当前线程操作完毕之后,其他线程不再重复操作,设置了一个标识符reloaded。
     *      在非reload时间段时,reloaded一直被置为true;
     *      在reload时间段时,第一个线程进入reloadDataModel后,加载完毕之后会将reloaded置为false。
     */
    public static void scheduleLoad() {
//        String date = DateUtils.getDateTime();
//        int now = Integer.parseInt(date.split(":")[1]);
//        if (now % 10 == 0) {//每10分钟加载一次
//            //1,2,3,4,5,6
//            reloadDataModel();
//        }else {
//            reloaded = true;
//        }

        if (System.currentTimeMillis()==nextReload){
            //thread 1,2,3,
            reloadDataModel();
        }


    }
}

7、运行结果

邮件发送

日志监控告警系统的设计与实现


以上所述就是小编给大家介绍的《日志监控告警系统的设计与实现》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

A Common-Sense Guide to Data Structures and Algorithms

A Common-Sense Guide to Data Structures and Algorithms

Jay Wengrow / Pragmatic Bookshelf / 2017-8-13 / USD 45.95

If you last saw algorithms in a university course or at a job interview, you’re missing out on what they can do for your code. Learn different sorting and searching techniques, and when to use each. F......一起来看看 《A Common-Sense Guide to Data Structures and Algorithms》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码