安全攻防 / 运维笔记

DataCon DNS恶意流量检测

Einic Yeo · 5月30日 · 2019年 ·

数据结构化处理

  • 原始pcap上传至服务器,使用 tshark -r q1_final.pcap -T ek > output_ek.json 解包并按照elasticsearch格式导出json。
  • 由于题目要求提交packet index,再将解出的39G json文件使用python脚本添加index列。
import json
path = 'output_ek.json'
output = 'output_ek_index.json'
with open(path) as f:
    with open(output, 'w') as w:
        index = 1
        sep = '$$$$$'
        while True:
            line = f.readline()
            if not line:
                break
            if 'timestamp' in line:
                out_line = str(index) + sep + line.strip() + '\n'
                w.write(out_line)
                if index % 10000 == 0:
                    print index
                index += 1
        print 'total:', index
  • 数据上传到阿里云大数据分析服务 MaxCompute 做包解析和结构化分析。
  • 版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!析后的数据阿里云机器学习版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!平台 PAI 做算法分析和可视化。

策略解析

通过对数据的初步人工浏览和简单可视化分析发现:

  1. 数据经过脱敏,因此部分字符分布、语义、信息熵等特征会受到影响。
  2. 时间区间很短,因此并不适合用”对历史行为建模以检测未知”的思路来做。
  3. 数据完整,不存在缺失值填充的问题。
  4. 以此说明存在五种攻击方式,且提交的是DNS query的packet id。
  5. eth层、frame层、UDP层、TCP层的特征高度统一,没有留下漏洞,因此重点分析DNS层即可。

据此,我的解题策略为:

  • 原始日志->特征工程->异常检测->人工验证(得到部分答案)->pattern提取->规则匹配->全部答案。

特征工程

接下来将DNS攻击分为三种建模:

  1. 密集请求型:例如随机子域名DDoS、反射型DDoS。其特征为QPS高、时序特征强,一般能够可视化观察到波峰。
  2. 漏洞攻击型:例如针对DNS server的已知漏洞攻击。其特征为数量少、受DNS type影响,适合分类统计。如果批量PoC的话,则特征同1。
  3. 数据传输型:例如DNS Tunnel、Malware DGA、PoC中的DNS回显、SSRF重绑定等。其特征在于域名文本特征明显、适用于规则匹配。

将DNS日志的Request和Response join到一起,然后做统计特征和文本特征:

  1. DNS请求时序分布
  2. QPS min/max/avg
  3. QPS均值
  4. QPS波动性
  5. 连接成功率
  6. DNS响应率
  7. TCP报文占比
  8. 请求响应比
  9. 单域名平均访问次数
  10. 单目标高频访问
  11. 多级子域名变化率
  12. DNS type时序分布
  13. DNS type源IP分布
  14. 长随机域名
  15. DNS Tunnel特征
  16. 部分DNS RCE
  17. 心跳包

代码示例:SRC_IP维度的部分统计特征

select 
  src_ip,
  max(index_rsp_cnt) as max_index_rsp_cnt,
  avg(index_rsp_cnt) as avg_index_rsp_cnt,
  stddev(index_rsp_cnt) as stddev_index_rsp_cnt,
  count(distinct index) as all_req_cnt,
  count(distinct text_dns_qry_name) as domain_req_cnt,
  count(distinct timestamp_sec) as alive_seconds,
  abs(max(timestamp_sec)-min(timestamp_sec)) as alive_period,
  cast(count(distinct timestamp_sec) as double)/cast(abs(max(timestamp_sec)-min(timestamp_sec)+1) as double) as alive_density,
  count(distinct text_dns_qry_name) as uniq_domain_cnt,
  count(distinct text_dns_qry_type) as uniq_qr_type_cnt,
  case when count(index_rqs_success) > 0 then sum(index_rqs_success)/cast(count(index_rqs_success) as double) else 0 end as rqs_success_rate,
  case when count(index_resp_success) > 0 then sum(index_resp_success)/cast(count(index_resp_success) as double) else 0 end as resp_success_rate,
  max(dns_dns_count_queries) as max_dns_count_queries,
  avg(dns_dns_count_queries) as avg_dns_count_queries,
  stddev(dns_dns_count_queries) as stddev_dns_count_queries
from (  
select *,
    unix_timestamp(_time) timestamp_sec,
    trim(json_extractor(layer_dns,'text_dns_qry_type')) as text_dns_qry_type,
    trim(json_extractor(layer_dns,'dns_dns_count_queries')) as dns_dns_count_queries,
    count(distinct r_index) over (partition by index) as index_rsp_cnt,
    case when r_index is not null then '1' else '0' end as index_rqs_success,
    case when trim(json_extractor(r_layer_dns,'dns_flags_dns_flags_rcode')) = '3' then '0' else '1' end as index_resp_success
from ${t1}
where layer_dns <> '' -- 去除TCP握手包
) _
group by src_ip

版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!常检测

  • 将以上统计特征通过全量数据建立基线,然后在每个特征维度滤出超越3sigma的异常值。

以下是针对时频异常的基线(stddev)和过滤示版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!例代版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!码:

-- 分母拉长到全量时间线
select /*+mapjoin(a)*/
  _time,
  src_ip
from (
select
    _time
from ${t1}
) a join (
  select src_ip
  from ${t2}
) b on 1=1

-- stddev计算
select 
  *,
  sqrt(pow_sum/16273.0) as stddev_qps
from (
  select 
    *,
    sum(pow_qps) over (PARTITION by src_ip) as pow_sum
  from (
    select 
      a.*,
      b.avg_qps,
      pow(abs(a.normalized_qry_cnt-b.avg_qps),2) as pow_qps
    from (
      select * from ${t1}
    ) a join (
      select * from ${t2}
    ) b on a.src_ip = b.src_ip
  ) _
) __

-- 3sigma过滤
select 
    *
from ${t1}
where qry_qps_stddev  > 0.08776535453791778*3 
order by qry_qps_stddev desc limit 999
  • 验证过程中,每一轮在确认了每种攻击流量之后,将其从全量流量中去除并重新计算baseline和异常。

人工验证及过滤

将以上异常检测滤出的IP按照异常维度数量排序,依次人工确认是否为攻击行为,然后通过规则滤出存在攻击的数据包。

xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",5" | wc -l
     72
xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",4" | wc -l
   33200
xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",3" | wc -l
    5055
xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",2" | wc -l
    5292
xy@x-8 ~/D/D/a/finall_100_2> cat traffic.csv | grep ",1" | wc -l
   34184

这里前四种攻击(子域名爆破、域传送、非法域更新、反射DDoS),而最后一种攻击没有找到,于是排序出三种最明显的数据提交进行fuzz,碰撞出最后一种。

总结

从结果来看,最高效的特征如下:

  • DNS type。
  • src_ip维度的统计分析特征(QPS、域名数量、请求响应数),而src_ip的行为做的非常干净,那么找到了IP就找到了攻击。

分析方法只用了3sigma异常基线一种,人工排序观察Top的异常结果,确认攻击后写规则捞出全部同类攻击。

参考文献

Presentation: PDF

Transport: 
阿里云-cdxy【乐枕
0 条回应