A-A+

python 使用elasticsearch模块获取ELK系统上的索引数据

2016年07月20日 Python 暂无评论 阅读 13,617 views 次

这次想要开发用户行为分析系统,所以要把网站日志导入到数据库中,而刚好我们部署了EKL日志系统,所以此次写了个脚本,直接写python脚本,利用elasticsearch模块,直接从ELK上获取所需的数据。

 

以下所示范内容与真实需求有出入,这只是为了方便理解脚本代码的意思

EKL中,索引文件名如是:logstash-apache-www.linuxyw.com-2016.7.20这样的格式,

ELK中的日志格式如下图所示:

KEL日志格式

KEL日志格式

 

我们只要在elasticsearch中获取时间,用户IP,用户访问的路由等(其实真实场景还有更多)。

代码如下:

  1. #!/usr/bin/env python
  2. #coding=utf-8
  3. """
  4. __author__ = '戴儒锋'
  5. 使用elasticsearch模块获取昨天某站点访问日志的所有数据
  6. elasticsearch模块中指定使用scroll用来避免深度分页查找数据时的性能消耗
  7. scan(扫描)搜索类型是和scroll(滚屏)API一起使用来从Elasticsearch里高效地取回巨大数量的结果而不需要付出深分页的代价。
  8. size被应用到每一个分片上,所以我们在每个批次里最多或获得size * number_of_primary_shards(size*主分片数)
  9. scroll= "1m" 指定快照时间为1分钟
  10. """
  11. import datetime
  12. from elasticsearch import Elasticsearch
  13. # 格式为:2016.7.19 的昨日日期
  14. yesterday = (datetime.datetime.now()  + datetime.timedelta(days = -1)).strftime("%Y.%m.%d")
  15. # 格式为:2016-7-19 的昨日日期
  16. filter_yesterday = (datetime.datetime.now()  + datetime.timedelta(days = -1)).strftime("%Y-%m-%d")
  17. # 格式为:2016.7.18 的前天日期
  18. before_yesterday = (datetime.datetime.now()  + datetime.timedelta(days = -2)).strftime("%Y.%m.%d")
  19. # 请求elasticsearch节点的url
  20. url = "http://192.168.1.41:9200/"
  21. # 使用的索引,因日期时区问题,所以要指定昨天和前天的索引名
  22. index_name = "logstash-apache-www.linuxyw.com-{date},logstash-apache-www.linuxyw.com-{b_date}".format(date=yesterday,b_date=before_yesterday)
  23. # 实例化Elasticsearch类,并设置超时间为120秒,默认是10秒的,如果数据量很大,时间设置更长一些
  24. es = Elasticsearch(url,timeout=120)
  25. # DSL查询语法,在下面es.search使用
  26. data = {
  27.     "size": 10000000,   #指定每个分片最大返回的数据量,可根据日志量进行设置
  28.     "query" : {
  29.         "bool":{
  30.             # 指定要匹配的字符,这里是查找所有数据
  31.             "must" : {"match_all":{}},
  32.             # 过滤,指定时间范围,这里设置成昨天0点到24点,代码上||-8h,因为ELK用的是UTC时间,跟北京时间误差8小时,所以要减8小时,这就是日志里的北京时间了
  33.             "filter" : {
  34.                 "range" : { "@timestamp" : {
  35.                     "gt" : "{date}T00:00:00||-8h".format(date=filter_yesterday),
  36.                     "lt" : "{date}T23:59:59||-8h".format(date=filter_yesterday),
  37.                     }
  38.                 }
  39.             }
  40.         }
  41.     }
  42. }
  43. # 设置要过滤返回的字段值,要什么字段,就在这里添加,这样可以节约返回的数据量(带宽,内存等)
  44. return_fields = [
  45.     '_scroll_id',
  46.     'hits.hits._source.timestamp',
  47.     'hits.hits._source.@timestamp',
  48.     'hits.hits._source.clientip',
  49.     'hits.hits._source.request',
  50. ]
  51. def main():
  52.     # 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用
  53.     res = es.search(
  54.             index=index_name,
  55.             body=data,
  56.             search_type="scan",
  57.             scroll="1m"
  58.         )
  59.     scrollId=res["_scroll_id"]  # 获取scrollID
  60.     response= es.scroll(scroll_id=scrollId, scroll= "1m",filter_path=return_fields,)
  61.     print len(response['hits']['hits']) # 打印获取到的日志数量
  62.     # for hit in response['hits']['hits']:
  63.     #     print hit['_source']
  64. if __name__ == "__main__":
  65.     main()

 

for hit in response['hits']['hits']:

print hit

输出的结果如:

  1. {u'timestamp': u'19/Jul/2016:08:00:34 +0800', u'@timestamp': u'2016-07-19T00:00:35.380Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'60.221.255.44'}
  2. {u'timestamp': u'19/Jul/2016:08:00:34 +0800', u'@timestamp': u'2016-07-19T00:00:36.507Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'112.5.16.13'}
  3. {u'timestamp': u'19/Jul/2016:08:00:36 +0800', u'@timestamp': u'2016-07-19T00:00:36.508Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'115.238.250.237'}
  4. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.934Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'115.231.218.18'}
  5. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.934Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'123.134.186.178'}
  6. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.934Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'218.61.8.10'}
  7. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.934Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'118.212.147.71'}
  8. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'210.32.125.68'}
  9. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'222.163.80.18'}
  10. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'222.186.33.170'}
  11. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'218.199.110.39'}
  12. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'114.80.201.18'}
  13. {u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'115.238.250.237'}
  14. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.209Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'58.215.186.208'}
  15. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.209Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'122.227.164.103'}
  16. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.209Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'222.186.33.170'}
  17. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.208Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'223.100.7.69'}
  18. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.209Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'218.6.9.4'}
  19. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.115Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'183.136.232.136'}
  20. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.114Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'221.203.236.213'}
  21. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.115Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'124.119.87.204'}
  22. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.115Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'43.254.144.227'}
  23. {u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.115Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'59.173.18.243'}

 

其实这些脚本,最终入库的,但是这里就不写入库过程了

最后推荐些Elasticsearch教程资料:

Elasticsearch 权威指南(中文版):http://es.xiaoleilu.com/index.html

Python Elasticsearch Client:http://elasticsearch-py.readthedocs.io/en/master/

 

 

关键词:

给我留言

Copyright © linux系统运维 保留所有权利.   Theme  Ality 粤ICP备13023035号-1

用户登录

分享到: