Linux Note / 运维笔记

Python Elasticsearch DSL

Einic Yeo · 5月16日 · 2020年

一、Elasticsearch的基本概念

  • Index:Elasticsearch用来存储数据的逻辑区域,它类似于关系型数据库中的database 概念。一个index可以在一个或者多个shard上面,同时一个shard也可能会有多个replicas。
  • Document:Elasticsearch里面存储的实体数据,类似于关系数据中一个table里面的一行数据。 document由多个field组成,不同的document里面同名的field一定具有相同的类型。document里面field可以重复出现,也就是一个field会有多个值,即multivalued。
  • Document type:为了查询需要,一个index可能会有多种document,也就是document type. 它类似版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!于关系型数据库中的 table 概念。但需要注意,不同document里面同名的field一定要是相同类型的。
  • Mapping:它类似于关系型数据库中的 schema 定义概念。存储field的相关映射信息,不同document type会有不同的mapping。

下图是ElasticSearch和关系型数据库的一些术语比较:

Relationnal databaseElasticsearch
DatabaseIndex
TableType
RowDocument
ColumnFie版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!ld
SchemaMap版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!ping
IndexEverything is indexed
SQLQuery DSL
SELECT * FROM table…GET http://…
UPDATE table SETPUT http://…

二、Elasticsearch DSL 简介

1、Install

$ pip install elasticsearch-dsl

2、Create Index and Document

from datetime import datetime
from elasticsearch_dsl import DocType, Date, Integer, Keyword, Text
from elasticsearch_dsl.connections import connections
# Define a default Elasticsearch client
connections.create_connection(hosts=['localhost'])
class Article(DocType):
    title = Text(analyzer='snowball', fields={'raw': Keyword()})
    body = Text(analyzer='snowball')
    tags = Keyword()
    published_from = Date()
    lines = Integer()
    class Meta:
        index = 'blog'
    def save(self, ** kwargs):
        self.lines = len(self.body.split())
        return super(Article, self).save(** kwargs)
    def is_published(self):
        return datetime.now() >= self.published_from
# create the mappings in elasticsearch
Article.init()

创建了一个索引为blog,文档为article的Elasticsearch数据库和表。
必须执行Article.init()方法。 这样Elasticsearch才会根据你的DocType产生对应的Mapping。否则Elasticsearch就会在你第一次创建Index和Type的时候根据你的内容建立对应的Mapping。

现在我们可以通过Elasticsearch Restful API来检查

http GET http://127.0.0.1:9200/blog/_mapping/
{"blog":
	{"mappings":
		{"article":
			{"properties":{
				"body":{"type":"text","analyzer":"snowball"},
				"lines":{"type":"integer"},
				"published_from":{"type":"date"},
				"tags":{"type":"keyword"},
				"title":{"type":"text","fields":{"raw":{"type":"keyword"}},"analyzer":"snowball"}
			}
		}}
	}
}

三、Elasticsearch CRUD 操作

1、Create an article

# create and save and article
article = Article(meta={'id': 1}, title='Hello elasticsearch!', tags=['elasticsearch'])
article.body = ''' looong text '''
article.published_from = datetime.now()
article.save()

=>Restful API

http POST http://127.0.0.1:9200/blog/article/1 title="hello elasticsearch" tags:='["elasticsearch"]'
HTTP/1.1 201 Created
Content-Length: 73
Content-Type: application/json; charset=UTF-8
{
    "_id": "1", 
    "_index": "blog", 
    "_type": "article", 
    "_version": 1, 
    "created": true
}

2、Get a article

article = Article.get(id=1)
# 如果获取一个不存在的文章则返回None
a = Article.get(id='no-in-es')
a is None
# 还可以获取多个文章
articles = Article.mget([1, 2, 3])

=>版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!;Restful API

http GET http://127.0.0.1:9200/blog/article/1
HTTP/1.1 200 OK
Content-Length: 141
Content-Type: application/json; charset=UTF-8
{
    "_id": "1", 
    "_index": "blog", 
    "_source": {
        "tags": [
            "elasticsearch"
        ], 
        "title": "hello elasticsearch"
    }, 
    "_type": "article", 
    "_version": 1, 
    "found": true
}

3、Update a article

article = Article.get(id=1)
article.tags = ['elasticsearch', 'hello']
article.save()
# 或者
article.update(body='Today is good day!', published_by='me')

=>Restful API

http PUT http://127.0.0.1:9200/blog/article/1 title="hello elasticsearch" tags:='["elasticsearch", "hello"]'
HTTP/1.1 200 OK
Content-Length: 74
Content-Type: application/json; charset=UTF-8
{
    "_id": "1", 
    "_index": "blog", 
    "_type": "article", 
    "_version": 2, 
    "created": false
}

4、Delete a article

article = Article.get(id=1)
article.delete()

=> Restful API

http DELETE http://127.0.0.1:9200/blog/article/1
HTTP/1.1 200 OK
Content-Length: 71
Content-Type: application/json; charset=UTF-8
{
    "_id": "1", 
    "_index": "blog", 
    "_type": "article", 
    "_version": 4, 
    "found": true
}
http HEAD  http://127.0.0.1:9200/blog/article/1
HTTP/1.1 404 Not Found
Content-Length: 0
Content-Type: text/plain; charset=UTF-8

四、ElasticSearch DSL 搜索

Search主要包括:

  • 查询(queries)
  • 过滤器(filters)版权声明:本文遵循 CC 4.0 BY-SA 版权协议,若要转载请务必附上原文出处链接及本声明,谢谢合作!
  • 聚合(aggreations)
  • 排序(sort)
  • 分页(pagination)
  • 额外的参数(additional parameters)
  • 相关性(associated)

创建一个查询对象

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
client = Elasticsearch()
s = Search(using=client)

初始化测试数据

def add_article(id_, title, body, tags):
    article = Article(meta={'id': id_}, title=title, tags=tags)
    article.body = body
    article.published_from = datetime.now()
    article.save()
def init_test_data():
    add_article(2, 'Python is good!', 'Python is good!', ['python'])
    add_article(3, 'Elasticsearch', 'Distributed, open source search and analytics engine', ['elasticsearch'])
    add_article(4, 'Python very quickly', 'Python very quickly', ['python'])
    add_article(5, 'Django', 'Python Web framework', ['python', 'django'])
# 创建一个查询语句
s = Search().using(client).query("match", title="python")
# 查看查询语句对应的字典结构
print(s.to_dict())
# {'query': {'match': {'title': 'python'}}}
# 发送查询请求到Elasticsearch
response = s.execute()
# 打印查询结果
for hit in s:
    print(hit.title)
# Out:
Python is good!
Python very quickly
# 删除查询
s.delete()

1、Queries

# 创建一个多字段查询
multi_match = MultiMatch(query='python', fields=['title', 'body'])
s = Search().query(multi_match)
print(s.to_dict())
# {'query': {'multi_match': {'fields': ['title', 'body'], 'query': 'python'}}}
# 使用Q语句
q = Q("multi_match", query='python', fields=['title', 'body'])
# 或者
q = Q({"multi_match": {"query": "python", "fields": ["title", "body"]}})
s = Search().query(q)
print(s.to_dict())
# If you already have a query object, or a dict 
# representing one, you can just override the query used 
# in the Search object:
s.query = Q('bool', must=[Q('match', title='python'), Q('match', body='best')])
print(s.to_dict())
# 查询组合
q = Q("match", title='python') | Q("match", title='django')
s = Search().query(q)
print(s.to_dict())
# {"bool": {"should": [...]}}
q = Q("match", title='python') & Q("match", title='django')
s = Search().query(q)
print(s.to_dict())
# {"bool": {"must": [...]}}
q = ~Q("match", title="python")
s = Search().query(q)
print(s.to_dict())
# {"bool": {"must_not": [...]}}

2、Filters

s = Search()
s = s.filter('terms', tags=['search', 'python'])
print(s.to_dict())
# {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
s = s.query('bool', filter=[Q('terms', tags=['search', 'python'])])
print(s.to_dict())
# {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
s = s.exclude('terms', tags=['search', 'python'])
# 或者
s = s.query('bool', filter=[~Q('terms', tags=['search', 'python'])])
print(s.to_dict())
# {'query': {'bool': {'filter': [{'bool': {'must_not': [{'terms': {'tags': ['search', 'python']}}]}}]}}}

3、Aggregations

s = Search()
a = A('terms', filed='title')
s.aggs.bucket('title_terms', a)
print(s.to_dict())
# {
# 'query': {
#   'match_all': {}
#  },
#  'aggs': {
#       'title_terms': {
#            'terms': {'filed': 'title'}
#        }
#    }
# }
# 或者
s = Search()
s.aggs.bucket('articles_per_day', 'date_histogram', field='publish_date', interval='day') \
    .metric('clicks_per_day', 'sum', field='clicks') \
    .pipeline('moving_click_average', 'moving_avg', buckets_path='clicks_per_day') \
    .bucket('tags_per_day', 'terms', field='tags')
s.to_dict()
# {
#   "aggs": {
#     "articles_per_day": {
#       "date_histogram": { "interval": "day", "field": "publish_date" },
#       "aggs": {
#         "clicks_per_day": { "sum": { "field": "clicks" } },
#         "moving_click_average": { "moving_avg": { "buckets_path": "clicks_per_day" } },
#         "tags_per_day": { "terms": { "field": "tags" } }
#       }
#     }
#   }
# }

4、Sorting

s = Search().sort(
    'category',
    '-title',
    {"lines" : {"order" : "asc", "mode" : "avg"}}
)

5、Pagination

s = s[10:20]
# {"from": 10, "size": 10}

6、Extra Properties and parameters

s = Search()
# 设置扩展属性使用`.extra()`方法
s = s.extra(explain=True)
# 设置参数使用`.params()`
s = s.params(search_type="count")
# 如要要限制返回字段,可以使用`source()`方法
# only return the selected fields
s = s.source(['title', 'body'])
# don't return any fields, just the metadata
s = s.source(False)
# explicitly include/exclude fields
s = s.source(include=["title"], exclude=["user.*"])
# reset the field selection
s = s.source(None)
# 使用dict序列化一个查询
s = Search.from_dict({"query": {"match": {"title": "python"}}})
# 修改已经存在的查询
s.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42})

参考文献

https://elasticsearch-dsl.readthedocs.io/en/latest/

0 条回应