Real-Time Security Detection with Flink

Published: July 4, 2023


Scenario

The employee access logs of an internal system (here, a mail system) are written to Kafka. We want to detect, for a given user account on a single IP, the pattern "20 consecutive failed logins to the mail system within any 3-minute span, followed by a successful login", and promptly push an alert to a designated security contact on WeCom (Enterprise WeChat). Each Kafka record contains a keyword that identifies whether the access is a login event, along with the access time (i.e., the event time). As soon as an account matching the pattern is detected, a WeCom alert is pushed and the account's access records for that period are written to downstream ElasticSearch.

Component versions

  • Flink-1.14.4

  • Java8

  • ElasticSearch-7.3.2

  • Kafka-2.12_2.8.1

Log structure

The IPs and accounts below are test data only.

```json
{
  "user": "wangxm",
  "client_ip": "110.68.6.182",
  "source": "login",
  "loginname": "wangxm@test.com",
  "IP": "110.8.148.58",
  "timestamp": "17:58:12",
  "@timestamp": "2022-04-20T09:58:13.647Z",
  "ip": "110.7.231.25",
  "clienttype": "POP3",
  "result": "success",
  "@version": "1"
}
```

Technical approach

Two candidate implementations for this scenario are FlinkCEP and Flink's sliding windows. After writing and debugging a FlinkCEP version, I found it could not satisfy the requirement, so I switched to a sliding-window implementation.

Key code

Main entry class

The main entry class creates the Flink environment, sets the basic parameters, and builds the Kafka source. After ingesting messages it maps and filters them, assigns watermarks, keys the stream, applies a sliding window, runs the event-counting logic inside each window, and collects the qualifying events and writes them to ElasticSearch.

The map, filter, keyBy, and window operators are each implemented in their own class; they are listed one by one below.

```java
package com.data.dev.flink.mailTopic.main;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.elasticsearch.ElasticSearchInfo;
import com.data.dev.elasticsearch.SinkToEs;
import com.data.dev.flink.FlinkEnv;
import com.data.dev.flink.mailTopic.OperationForLoginFailCheck.*;
import com.data.dev.kafka.KafkaSourceBuilder;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Duration;

/**
 * Flink job for the scenario "20 consecutive failed logins within 3 minutes,
 * followed by a successful login", implemented with sliding windows.
 * @author wangxiaomin 2022-06-01
 */
@Slf4j
public class MailMsg extends BaseBean {
    /** Flink job name. */
    public static final String JobName = "Alarm platform - alert on successful login after consecutive failures";
    /** Kafka source name. */
    public static final String KafkaSourceName = "Kafka Source for AlarmPlatform About Mail Topic";

    public MailMsg() {
        log.info("Initializing the sliding-window alerting job");
    }

    /** Build and run the detection pipeline, pushing alerts on matches. */
    public static void execute() {
        // ① Create the Flink execution environment and set checkpointing and other base parameters.
        StreamExecutionEnvironment env = FlinkEnv.getFlinkEnv();
        KafkaSource<String> kafkaSource = KafkaSourceBuilder.getKafkaSource(ConfigurationKey.KAFKA_MAIL_TOPIC_NAME, ConfigurationKey.KAFKA_MAIL_CONSUMER_GROUP_ID);
        // Note: the strategy given here is superseded by assignTimestampsAndWatermarks() below;
        // WatermarkStrategy.noWatermarks() would make that explicit.
        DataStreamSource<String> kafkaMailMsg = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(10)), KafkaSourceName);

        // ② Map raw messages to beans and keep only login events.
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginMapDs = kafkaMailMsg.map(new MsgToBeanMapper()).name("Map to bean");
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginFilterDs = loginMapDs.filter(new MailMsgForLoginFilter()).name("Filter login events");

        // ③ Assign event-time timestamps and watermarks (1 minute of allowed out-of-orderness).
        WatermarkStrategy<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> watermarkStrategy = WatermarkStrategy.<com.data.dev.common.javabean.kafkaMailTopic.MailMsg>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                .withTimestampAssigner((mailMsg, timestamp) -> TimeUtils.switchUTCToBeijingTimestamp(mailMsg.getTimestamp_datetime()));
        SingleOutputStreamOperator<com.data.dev.common.javabean.kafkaMailTopic.MailMsg> loginWmDs = loginFilterDs.assignTimestampsAndWatermarks(watermarkStrategy.withIdleness(Duration.ofMinutes(3))).name("Assign watermarks");

        // ④ Key the stream by user + client IP.
        KeyedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String> loginKeyedDs = loginWmDs.keyBy(new LoginKeySelector());

        // ⑤ Apply a sliding event-time window: size 180 s, slide 90 s.
        WindowedStream<com.data.dev.common.javabean.kafkaMailTopic.MailMsg, String, TimeWindow> loginWindowDs = loginKeyedDs.window(SlidingEventTimeWindows.of(Time.seconds(180L), Time.seconds(90L)));

        // ⑥ Run the detection logic inside each window.
        SingleOutputStreamOperator<MailMsgAlarm> loginWindowsDealDs = loginWindowDs.process(new WindowProcessFuncImpl()).name("Window processing");

        // ⑦ Serialize the results to a generic DataStream<String>.
        SingleOutputStreamOperator<String> resultDs = loginWindowsDealDs.map(new AlarmMsgToStringMapper()).name("Serialize window results");

        // ⑧ Write the final results to ES.
        resultDs.addSink(SinkToEs.getEsSinkBuilder(ElasticSearchInfo.ES_LOGIN_FAIL_INDEX_NAME, ElasticSearchInfo.ES_INDEX_TYPE_DEFAULT).build());

        // ⑨ Submit the job to the Flink cluster.
        FlinkEnv.envExec(env, JobName);
    }
}
```
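For intuition on the sliding-window parameters (size 180 s, slide 90 s): every event belongs to size/slide = 2 overlapping windows, so each login sequence is evaluated twice as the window advances. The helper below re-derives the assignment arithmetic that Flink's `SlidingEventTimeWindows` uses (with offset 0) in plain Java, purely for illustration; it is not part of the job.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowDemo {
    /**
     * Returns the start timestamps (ms) of the sliding windows that contain
     * the given event timestamp, mirroring Flink's assignment rule (offset 0):
     * the latest window start is the timestamp rounded down to a slide boundary,
     * and earlier starts follow at slide intervals while the window still covers
     * the timestamp.
     */
    static List<Long> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start); // window covers [start, start + sizeMs)
        }
        return starts;
    }
}
```

With size 180 000 ms and slide 90 000 ms, an event at t = 200 000 ms lands in the windows starting at 180 000 ms and 90 000 ms: two evaluations per event, which is what lets a failure burst that straddles a window boundary still be caught by the next window.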

Map operator

The listing the original article places here is a duplicate of `AlarmMsgToStringMapper`, the final map operator, which is shown in its own section below. The first-stage mapper `MsgToBeanMapper`, which the main class uses to turn each raw Kafka JSON record into a `MailMsg` bean, is not included in the article.
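Since `MsgToBeanMapper` itself is not listed, here is a rough idea of what it presumably does. The article imports fastjson elsewhere, so the real implementation most likely calls `JSON.parseObject(value, MailMsg.class)`; the dependency-free sketch below instead pulls a few fields out of the raw record with a regex, just to keep the illustration self-contained. `SimpleMailMsg` and its field set are stand-ins, not the article's actual bean.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MsgToBeanSketch {
    /** Minimal stand-in for the article's MailMsg bean (hypothetical). */
    static class SimpleMailMsg {
        String user, clientIp, source, result, timestampDatetime;
    }

    /** Extracts one string field ("key": "value") from a flat JSON object. */
    static String field(String json, String key) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    /** Maps one raw Kafka record to the bean, as MsgToBeanMapper presumably does. */
    static SimpleMailMsg map(String json) {
        SimpleMailMsg msg = new SimpleMailMsg();
        msg.user = field(json, "user");
        msg.clientIp = field(json, "client_ip");
        msg.source = field(json, "source");
        msg.result = field(json, "result");
        msg.timestampDatetime = field(json, "@timestamp");
        return msg;
    }
}
```

A production mapper should use a real JSON parser; the regex approach here cannot handle nesting or escaped quotes and exists only to keep the sketch free of dependencies.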

Filter operator

```java
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FilterFunction;

/**
 * ② Consumes the mail topic and keeps only the "login" events.
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class MailMsgForLoginFilter extends BaseBean implements FilterFunction<MailMsg> {
    @Override
    public boolean filter(MailMsg mailMsg) {
        boolean isLogin = "login".equals(mailMsg.getSource());
        if (isLogin) {
            log.info("Picked up raw login event: [" + mailMsg + "]");
        }
        return isLogin;
    }
}
```

KeyBy operator

```java
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * Selects the key user@client_ip, so that windowing runs per account/IP pair.
 */
@Slf4j
public class LoginKeySelector extends BaseBean implements KeySelector<MailMsg, String> {
    @Override
    public String getKey(MailMsg mailMsg) {
        return mailMsg.getUser() + "@" + mailMsg.getClient_ip();
    }
}
```

Window function (core code)

The idea is to keep an event list per window that accumulates the failed logins seen so far. Each login-failure event is appended to the list. When a login-success event arrives but fewer than 20 failures have accumulated, `loginEventList` is cleared and scanning continues. When a success arrives after 20 or more failures in the window, the pattern has matched: the list is cleared and the successful login is pushed as an alert.

```java
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.data.dev.common.javabean.kafkaMailTopic.MailMsg;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import com.data.dev.utils.HttpUtils;
import com.data.dev.utils.IPUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Pattern-detection logic inside each sliding window.
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class WindowProcessFuncImpl extends ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow> implements Serializable {
    @Override
    public void process(String key, ProcessWindowFunction<MailMsg, MailMsgAlarm, String, TimeWindow>.Context context, Iterable<MailMsg> iterable, Collector<MailMsgAlarm> collector) {
        List<MailMsg> loginEventList = new ArrayList<>();
        MailMsgAlarm mailMsgAlarm;
        for (MailMsg mailMsg : iterable) {
            log.info("Login event in window: [" + mailMsg + "]");
            if (mailMsg.getResult().equals("fail")) {
                // Collect each failed login in the current window.
                loginEventList.add(mailMsg);
            } else if (mailMsg.getResult().equals("success") && loginEventList.size() < 20) {
                // A success with fewer than 20 accumulated failures: reset and keep scanning.
                log.info("Login success after only [" + loginEventList.size() + "] failures; clearing loginEventList");
                loginEventList.clear();
            } else if (mailMsg.getResult().equals("success") && loginEventList.size() >= 20) {
                mailMsgAlarm = getMailMsgAlarm(loginEventList, mailMsg);
                log.info("Login success after [" + mailMsgAlarm.getFailTimes() + "] consecutive failures in this window");
                // 20+ failures followed by a success: clear the list and push the alarm.
                loginEventList.clear();
                doAlarmPush(mailMsgAlarm);
                collector.collect(mailMsgAlarm); // emit the qualifying success event downstream
            } else {
                log.info(mailMsg.getUser() + " has now failed [" + loginEventList.size() + "] consecutive logins");
            }
        }
    }

    /**
     * @param eventList    failed-login events collected in the current window
     * @param eventCurrent the successful login event
     * @return mailMsgAlarm: the alarm message
     */
    public static MailMsgAlarm getMailMsgAlarm(List<MailMsg> eventList, MailMsg eventCurrent) {
        String alarmKey = eventCurrent.getUser() + "@" + eventCurrent.getClient_ip();
        String loginFailStartTime = eventList.get(0).getTimestamp_datetime();
        String loginSuccessTime = eventCurrent.getTimestamp_datetime();
        int loginFailTimes = eventList.size();
        MailMsgAlarm mailMsgAlarm = new MailMsgAlarm();
        mailMsgAlarm.setMailMsg(eventCurrent);
        mailMsgAlarm.setAlarmKey(alarmKey);
        mailMsgAlarm.setStartTime(loginFailStartTime);
        mailMsgAlarm.setEndTime(loginSuccessTime);
        mailMsgAlarm.setFailTimes(loginFailTimes);
        return mailMsgAlarm;
    }

    /**
     * @param mailMsgAlarm the alarm event to push
     */
    public void doAlarmPush(MailMsgAlarm mailMsgAlarm) {
        String userKey = mailMsgAlarm.getAlarmKey();
        String clientIp = mailMsgAlarm.mailMsg.getClient_ip();
        if (IPUtils.isWhiteListIp(clientIp)) {
            // Whitelisted IPs never alert.
            log.info("User [" + userKey + "] logged in from a whitelisted IP");
        } else {
            // Look up the IP's owner and push the alarm to WeCom (Enterprise WeChat).
            String user = HttpUtils.getUserByClientIp(clientIp);
            HttpUtils.pushAlarmMsgToWechatWork(user, mailMsgAlarm.toString());
        }
    }
}
```
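The per-window scan can be exercised without a Flink runtime. The stand-alone sketch below reproduces the branch logic on a plain list of result strings; the event and alarm beans are reduced to their `result`/`failTimes` essentials, and all names here are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowScanDemo {
    /**
     * Scans one window's events in order: failures accumulate; a success with
     * fewer than `threshold` accumulated failures resets the list; a success
     * at or above the threshold triggers the alarm. Returns the number of
     * failures preceding the alerting success, or -1 if no alarm fires.
     */
    static int scan(List<String> results, int threshold) {
        List<String> fails = new ArrayList<>();
        for (String r : results) {
            if (r.equals("fail")) {
                fails.add(r);
            } else if (r.equals("success") && fails.size() < threshold) {
                fails.clear();                // too few failures: reset and keep scanning
            } else if (r.equals("success")) {
                int failTimes = fails.size(); // threshold reached: raise the alarm
                fails.clear();
                return failTimes;
            }
        }
        return -1; // no qualifying success in this window
    }
}
```

One caveat worth noting: the scan counts the failures in iteration order, so "consecutive" holds only to the extent that the window's elements are processed in event-time order; heavily out-of-order events inside a window could still be counted toward the same streak.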

Final map operator

```java
package com.data.dev.flink.mailTopic.OperationForLoginFailCheck;

import com.alibaba.fastjson.JSON;
import com.data.dev.common.javabean.BaseBean;
import com.data.dev.common.javabean.kafkaMailTopic.MailMsgAlarm;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;

/**
 * Serializes the alarm bean to the JSON body pushed to ES.
 * @author wangxiaoming-ghq 2022-06-01
 */
@Slf4j
public class AlarmMsgToStringMapper extends BaseBean implements MapFunction<MailMsgAlarm, String> {
    @Override
    public String map(MailMsgAlarm mailMsgAlarm) throws Exception {
        return JSON.toJSONString(mailMsgAlarm);
    }
}
```

ElasticSearch utility class

```java
package com.data.dev.elasticsearch;

import com.data.dev.common.javabean.BaseBean;
import com.data.dev.key.ConfigurationKey;
import com.data.dev.key.ElasticSearchKey;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * Shared helper for writing Flink results to ElasticSearch.
 * @author wangxiaoming-ghq
 */
@Slf4j
public class SinkToEs extends BaseBean {
    public static final long serialVersionUID = 2L;
    private static final HashMap<String, String> ES_PROPS_MAP = ConfigurationKey.getApplicationProps();
    private static final String HOST = ES_PROPS_MAP.get(ConfigurationKey.ES_HOST);
    private static final String PASSWORD = ES_PROPS_MAP.get(ConfigurationKey.ES_PASSWORD);
    private static final String USERNAME = ES_PROPS_MAP.get(ConfigurationKey.ES_USERNAME);
    private static final String PORT = ES_PROPS_MAP.get(ConfigurationKey.ES_PORT);

    /**
     * Collect the ES connection settings.
     * @return esInfoMap: the connection info
     */
    public static HashMap<String, String> getElasticSearchInfo() {
        log.info("ES connection info: [ HOST=" + HOST + ", PORT=" + PORT + ", USERNAME=" + USERNAME + ", PASSWORD=******** ]");
        HashMap<String, String> esInfoMap = new HashMap<>();
        esInfoMap.put(ElasticSearchKey.HOST, HOST);
        esInfoMap.put(ElasticSearchKey.PASSWORD, PASSWORD);
        esInfoMap.put(ElasticSearchKey.USERNAME, USERNAME);
        esInfoMap.put(ElasticSearchKey.PORT, PORT);
        return esInfoMap;
    }

    /**
     * @param esIndexName target index name
     * @param esType      target index type
     * @return ElasticsearchSink.Builder<String>: the sink builder
     */
    public static ElasticsearchSink.Builder<String> getEsSinkBuilder(String esIndexName, String esType) {
        HashMap<String, String> esInfoMap = getElasticSearchInfo();
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(esInfoMap.get(ElasticSearchKey.HOST), Integer.parseInt(esInfoMap.get(ElasticSearchKey.PORT)), "http"));
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                new ElasticsearchSinkFunction<String>() {
                    private IndexRequest createIndexRequest(String element) {
                        // Index the serialized alarm itself; the original listing built the
                        // request from an empty map and silently dropped the element.
                        return Requests.indexRequest()
                                .index(esIndexName)
                                .type(esType)
                                .source(element, XContentType.JSON);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );
        // REST client configuration with username/password auth.
        RestClientFactory restClientFactory = restClientBuilder -> {
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(
                    AuthScope.ANY,
                    new UsernamePasswordCredentials(
                            esInfoMap.get(ElasticSearchKey.USERNAME),
                            esInfoMap.get(ElasticSearchKey.PASSWORD)
                    )
            );
            restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> {
                httpAsyncClientBuilder.disableAuthCaching();
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            });
        };
        esSinkBuilder.setRestClientFactory(restClientFactory);
        return esSinkBuilder;
    }
}
```

Alarm event entity class

```java
package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;
import java.util.Objects;

/**
 * Alarm event for the detection scenario.
 * @author wangxiaoming-ghq 2022-05-15
 */
@Data
public class MailMsgAlarm extends BaseBean {
    /** The successful login event that triggered the alarm. */
    public MailMsg mailMsg;
    /** Alarm key: username@client_ip. */
    public String alarmKey;
    /** Event time of the first failed login. */
    public String startTime;
    /** Event time of the successful login that followed the failures. */
    public String endTime;
    /** Number of consecutive failed logins. */
    public int failTimes;

    public MailMsgAlarm() {
    }

    @Override
    public String toString() {
        return "{" +
                " 'mailMsg_login_success':'" + mailMsg + "'" +
                ", 'alarmKey':'" + alarmKey + "'" +
                ", 'start_login_time_in3min':'" + startTime + "'" +
                ", 'end_login_time_in3min':'" + endTime + "'" +
                ", 'login_fail_times':'" + failTimes + "'" +
                "}";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsgAlarm)) return false;
        MailMsgAlarm that = (MailMsgAlarm) o;
        return getFailTimes() == that.getFailTimes()
                && getMailMsg().equals(that.getMailMsg())
                && getAlarmKey().equals(that.getAlarmKey())
                && getStartTime().equals(that.getStartTime())
                && getEndTime().equals(that.getEndTime());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getMailMsg(), getAlarmKey(), getStartTime(), getEndTime(), getFailTimes());
    }
}
```

Message entity class

```java
package com.data.dev.common.javabean.kafkaMailTopic;

import com.data.dev.common.javabean.BaseBean;
import lombok.Data;
import java.util.Objects;

/**
 * {
 *   "user": "wangxm",
 *   "client_ip": "110.68.6.182",
 *   "source": "login",
 *   "loginname": "wangxm@test.com",
 *   "IP": "110.8.148.58",
 *   "timestamp": "17:58:12",
 *   "@timestamp": "2022-04-20T09:58:13.647Z",
 *   "ip": "110.7.231.25",
 *   "clienttype": "POP3",
 *   "result": "success",
 *   "@version": "1"
 * }
 *
 * user          login account
 * client_ip     source IP of the login
 * source        event type
 * loginname     the account's mail address
 * ip            target frontend IP
 * timestamp     send time
 * &#064;timestamp   send date-time
 * IP            source IP recorded by the mail log sender
 * clienttype    client login type
 * result        login result
 */
@Data
public class MailMsg extends BaseBean {
    public String user;
    public String client_ip;
    public String source;
    public String loginName;
    public String mailSenderSourceIp;
    public String timestamp_time;
    public String timestamp_datetime;
    public String ip;
    public String clientType;
    public String result;
    public String version;

    public MailMsg() {
    }

    public MailMsg(String user, String client_ip, String source, String loginName, String mailSenderSourceIp, String timestamp_time, String timestamp_datetime, String ip, String clientType, String result, String version) {
        this.user = user;
        this.client_ip = client_ip;
        this.source = source;
        this.loginName = loginName;
        this.mailSenderSourceIp = mailSenderSourceIp;
        this.timestamp_time = timestamp_time;
        this.timestamp_datetime = timestamp_datetime;
        this.ip = ip;
        this.clientType = clientType;
        this.result = result;
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MailMsg)) return false;
        MailMsg mailMsg = (MailMsg) o;
        return getUser().equals(mailMsg.getUser()) && getClient_ip().equals(mailMsg.getClient_ip()) && getSource().equals(mailMsg.getSource()) && getLoginName().equals(mailMsg.getLoginName()) && getMailSenderSourceIp().equals(mailMsg.getMailSenderSourceIp()) && getTimestamp_time().equals(mailMsg.getTimestamp_time()) && getTimestamp_datetime().equals(mailMsg.getTimestamp_datetime()) && getIp().equals(mailMsg.getIp()) && getClientType().equals(mailMsg.getClientType()) && getResult().equals(mailMsg.getResult()) && getVersion().equals(mailMsg.getVersion());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getUser(), getClient_ip(), getSource(), getLoginName(), getMailSenderSourceIp(), getTimestamp_time(), getTimestamp_datetime(), getIp(), getClientType(), getResult(), getVersion());
    }

    @Override
    public String toString() {
        return "{" +
                " 'user':'" + user + "'" +
                ", 'client_ip':'" + client_ip + "'" +
                ", 'source':'" + source + "'" +
                ", 'loginName':'" + loginName + "'" +
                ", 'IP':'" + mailSenderSourceIp + "'" +
                ", 'timestamp':'" + timestamp_time + "'" +
                ", '@timestamp':'" + timestamp_datetime + "'" +
                ", 'ip':'" + ip + "'" +   // the original listing dropped the ip value here
                ", 'clientType':'" + clientType + "'" +
                ", 'result':'" + result + "'" +
                ", 'version':'" + version + "'" +
                "}";
    }
}
```
