当前位置 博文首页 > wang785994599的博客:5.elasticsearch数据输入和输出
GET /website/blog/123?pretty
GET /website/blog/123?_source=title,text
如果你只想得到?_source
?字段,不需要任何元数据,你能使用?_source
?:
GET /website/blog/123/_source
curl -i -XHEAD http://localhost:9200/website/blog/123
在 Elasticsearch 中文档是?不可改变?的,不能修改它们。?相反,如果想要更新现有的文档,需要?重建索引或者进行替换,?我们可以使用相同的?index
?API 进行实现,在?索引文档?中已经进行了讨论。
PUT /website/blog/123
{
"title": "My first blog entry",
"text": "I am starting to get the hang of this...",
"date": "2014/01/02"
}
在响应体中,我们能看到 Elasticsearch 已经增加了?_version
?字段值:
{
"_index" : "website",
"_type" : "blog",
"_id" : "123",
"_version" : 2,
"created": false
}
created
?标志设置成?false
?,是因为相同的索引、类型和 ID 的文档已经存在。
在内部,Elasticsearch 已将旧文档标记为已删除,并增加一个全新的文档。?尽管你不能再对旧版本的文档进行访问,但它并不会立即消失。当继续索引更多的数据,Elasticsearch 会在后台清理这些已删除文档。
DELETE /website/blog/123
正如已经在更新整个文档中提到的,删除文档不会立即将文档从磁盘中删除,只是将文档标记为已删除状态。随着你不断的索引更多的数据,Elasticsearch 将会在后台清理标记为已删除的文档。
当我们使用?index
?API 更新文档?,可以一次性读取原始文档,做我们的修改,然后重新索引?整个文档?。 最近的索引请求将获胜:无论最后哪一个文档被索引,都将被唯一存储在 Elasticsearch 中。如果其他人同时更改这个文档,他们的更改将丢失。
很多时候这是没有问题的。也许我们的主数据存储是一个关系型数据库,我们只是将数据复制到 Elasticsearch 中并使其可被搜索。 也许两个人同时更改相同的文档的几率很小。或者对于我们的业务来说偶尔丢失更改并不是很严重的问题。
但有时丢失了一个变更就是?非常严重的?。试想我们使用 Elasticsearch 存储我们网上商城商品库存的数量, 每次我们卖一个商品的时候,我们在 Elasticsearch 中将库存数量减少。
有一天,管理层决定做一次促销。突然地,我们一秒要卖好几个商品。 假设有两个 web 程序并行运行,每一个都同时处理所有商品的销售,如图??“Consequence of no concurrency control”?所示。
?Consequence of no concurrency control
?
?
web_1
?对?stock_count
?所做的更改已经丢失,因为?web_2
?不知道它的?stock_count
?的拷贝已经过期。 结果我们会认为有超过商品的实际数量的库存,因为卖给顾客的库存商品并不存在,我们将让他们非常失望。
变更越频繁,读数据和更新数据的间隙越长,也就越可能丢失变更。
在数据库领域中,有两种方法通常被用来确保并发更新时变更不会丢失:
悲观并发控制
这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改。
乐观并发控制
Elasticsearch 中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的操作。 然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何解决冲突。 例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。
Elasticsearch 是分布式的。当文档创建、更新或删除时,?新版本的文档必须复制到集群中的其他节点。Elasticsearch 也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许?顺序是乱的。 Elasticsearch 需要一种方法确保文档的旧版本不会覆盖新的版本。
当我们之前讨论?index
?,?GET
?和?delete
?请求时,我们指出每个文档都有一个?_version
?(版本)号,当文档被修改时版本号递增。 Elasticsearch 使用这个?_version
?号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。
我们可以利用?_version
?号来确保?应用中相互冲突的变更不会导致数据丢失。我们通过指定想要修改文档的?version
?号来达到这个目的。 如果该版本不是当前版本号,我们的请求将会失败。
让我们创建一个新的博客文章:
PUT /website/blog/1/_create
{
"title": "My first blog entry",
"text": "Just trying this out..."
}
响应体告诉我们,这个新创建的文档?_version
?版本号是?1
?。现在假设我们想编辑这个文档:我们加载其数据到 web 表单中, 做一些修改,然后保存新的版本。
首先我们检索文档:
GET /website/blog/1
响应体包含相同的?_version
?版本号?1
?:
{
"_index" : "website",
"_type" : "blog",
"_id" : "1",
"_version" : 1,
"found" : true,
"_source" : {
"title": "My first blog entry",
"text": "Just trying this out..."
}
}
现在,当我们尝试通过重建文档的索引来保存修改,我们指定?version
?为我们的修改会被应用的版本:
PUT /website/blog/1?version=1
{
"title": "My first blog entry",
"text": "Starting to get the hang of this..."
}
我们想这个在我们索引中的文档只有现在的? |
此请求成功,并且响应体告诉我们?_version
?已经递增到?2
?
{
"_index": "website",
"_type": "blog",
"_id": "1",
"_version": 2
"created": false
}
然而,如果我们重新运行相同的索引请求,仍然指定?version=1
?, Elasticsearch 返回?409 Conflict
HTTP 响应码,和一个如下所示的响应体:
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[blog][1]: version conflict, current [2], provided [1]",
"index": "website",
"shard": "3"
}
],
"type": "version_conflict_engine_exception",
"reason": "[blog][1]: version conflict, current [2], provided [1]",
"index": "website",
"shard": "3"
},
"status": 409
}
这告诉我们在 Elasticsearch 中这个文档的当前?_version
?号是?2
?,但我们指定的更新版本号为?1
?。
我们现在怎么做取决于我们的应用需求。我们可以告诉用户说其他人已经修改了文档,并且在再次保存之前检查这些修改内容。 或者,在之前的商品?stock_count
?场景,我们可以获取到最新的文档并尝试重新应用这些修改。
所有文档的更新或删除 API,都可以接受?version
?参数,这允许你在代码中使用乐观的并发控制,这是一种明智的做法。
一个常见的设置是使用其它数据库作为主要的数据存储,使用 Elasticsearch 做数据检索,?这意味着主数据库的所有更改发生时都需要被复制到 Elasticsearch ,如果多个进程负责这一数据同步,你可能遇到类似于之前描述的并发问题。
如果你的主数据库已经有了版本号?—?或一个能作为版本号的字段值比如?timestamp
?—?那么你就可以在 Elasticsearch 中通过增加?version_type=external
?到查询字符串的方式重用这些相同的版本号,?版本号必须是大于零的整数, 且小于?9.2E+18
?—?一个 Java 中?long
?类型的正值。
外部版本号的处理方式和我们之前讨论的内部版本号的处理方式有些不同, Elasticsearch 不是检查当前?_version
?和请求中指定的版本号是否相同, 而是检查当前?_version
?是否?小于?指定的版本号。 如果请求成功,外部的版本号作为文档的新?_version
?进行存储。
外部版本号不仅在索引和删除请求是可以指定,而且在?创建?新文档时也可以指定。
例如,要创建一个新的具有外部版本号?5
?的博客文章,我们可以按以下方法进行:
PUT /website/blog/2?version=5&version_type=external
{
"title": "My first external blog entry",
"text": "Starting to get the hang of this..."
}
在响应中,我们能看到当前的?_version
?版本号是?5
?:
{
"_index": "website",
"_type": "blog",
"_id": "2",
"_version": 5,
"created": true
}
现在我们更新这个文档,指定一个新的?version
?号是?10
?:
PUT /website/blog/2?version=10&version_type=external
{
"title": "My first external blog entry",
"text": "This is a piece of cake..."
}
请求成功并将当前?_version
?设为?10
?:
{
"_index": "website",
"_type": "blog",
"_id": "2",
"_version": 10,
"created": false
}
如果你要重新运行此请求时,它将会失败,并返回像我们之前看到的同样的冲突错误, 因为指定的外部版本号不大于 Elasticsearch 的当前版本号。
在?更新整个文档?, 我们已经介绍过?更新一个文档的方法是检索并修改它,然后重新索引整个文档,这的确如此。然而,使用?update
?API 我们还可以部分更新文档,例如在某个请求时对计数器进行累加。
我们也介绍过文档是不可变的:他们不能被修改,只能被替换。?update
?API 必须遵循同样的规则。 从外部来看,我们在一个文档的某个位置进行部分更新。然而在内部,?update
?API 简单使用与之前描述相同的?检索-修改-重建索引?的处理过程。 区别在于这个过程发生在分片内部,这样就避免了多次请求的网络开销。通过减少检索和重建索引步骤之间的时间,我们也减少了其他进程的变更带来冲突的可能性。
update
?请求最简单的一种形式是接收文档的一部分作为?doc
?的参数, 它只是与现有的文档进行合并。对象被合并到一起,覆盖现有的字段,增加新的字段。 例如,我们增加字段?tags
?和?views
?到我们的博客文章,如下所示:
POST /website/blog/1/_update
{
"doc" : {
"tags" : [ "testing" ],
"views": 0
}
}
如果请求成功,我们看到类似于?index
?请求的响应:
{
"_index" : "website",
"_id" : "1",
"_type" : "blog",
"_version" : 3
}
脚本可以在?update
?API中用来改变?_source
?的字段内容,?它在更新脚本中称为?ctx._source
?。 例如,我们可以使用脚本来增加博客文章中?views
?的数量:
POST /website/blog/1/_update
{
"script" : "ctx._source.views+=1"
}
我们也可以通过使用脚本给?tags
?数组添加一个新的标签。在这个例子中,我们指定新的标签作为参数,而不是硬编码到脚本内部。 这使得 Elasticsearch 可以重用这个脚本,而不是每次我们想添加标签时都要对新脚本重新编译:
POST /website/blog/1/_update
{
"script" : "ctx._source.tags+=new_tag",
"params" : {
"new_tag" : "search"
}
}
我们甚至可以选择通过设置?ctx.op
?为?delete
?来删除基于其内容的文档:
POST /website/blog/1/_update
{
"script" : "ctx.op = ctx._source.views == count ? 'delete' : 'none'",
"params" : {
"count": 1
}
}
假设我们需要?在 Elasticsearch 中存储一个页面访问量计数器。 每当有用户浏览网页,我们对该页面的计数器进行累加。但是,如果它是一个新网页,我们不能确定计数器已经存在。 如果我们尝试更新一个不存在的文档,那么更新操作将会失败。
在这样的情况下,我们可以使用?upsert
?参数,指定如果文档不存在就应该先创建它:
POST /website/pageviews/1/_update
{
"script" : "ctx._source.views+=1",
"upsert": {
"views": 1
}
}
我们第一次运行这个请求时,?upsert
?值作为新文档被索引,初始化?views
?字段为?1
?。 在后续的运行中,由于文档已经存在,?script
?更新操作将替代?upsert
?进行应用,对?views
?计数器进行累加。
在本节的介绍中,我们说明?检索?和?重建索引?步骤的间隔越小,变更冲突的机会越小。 但是它并不能完全消除冲突的可能性。 还是有可能在?update
?设法重新索引之前,来自另一进程的请求修改了文档。
为了避免数据丢失,?update
?API 在?检索?步骤时检索得到文档当前的?_version
?号,并传递版本号到?重建索引?步骤的?index
?请求。 如果另一个进程修改了处于检索和重新索引步骤之间的文档,那么?_version
?号将不匹配,更新请求将会失败。
对于部分更新的很多使用场景,文档已经被改变也没有关系。 例如,如果两个进程都对页面访问量计数器进行递增操作,它们发生的先后顺序其实不太重要; 如果冲突发生了,我们唯一需要做的就是尝试再次更新。
这可以通过?设置参数?retry_on_conflict
?来自动完成, 这个参数规定了失败之前?update
?应该重试的次数,它的默认值为?0
?。
POST /website/pageviews/1/_update?retry_on_conflict=5
{
"script" : "ctx._source.views+=1",
"upsert": {
"views": 0
}
}
失败之前重试该更新5次。
在增量操作无关顺序的场景,例如递增计数器等这个方法十分有效,但是在其他情况下变更的顺序?是?非常重要的。 类似?index
?API?,?update
?API 默认采用?最终写入生效?的方案,但它也接受一个?version
?参数来允许你使用?optimistic concurrency control?指定想要更新文档的版本。
Elasticsearch 的速度已经很快了,但甚至能更快。?将多个请求合并成一个,避免单独处理每个请求花费的网络延时和开销。 如果你需要从 Elasticsearch 检索很多文档,那么使用?multi-get?或者?mget
?API?来将这些检索请求放在一个请求中,将比逐个文档请求更快地检索到全部文档。
mget
?API 要求有一个?docs
?数组作为参数,每个?元素包含需要检索文档的元数据, 包括?_index
?、?_type
和?_id
?。如果你想检索一个或者多个特定的字段,那么你可以通过?_source
?参数来指定这些字段的名字:
GET /_mget
{
"docs" : [
{
"_index" : "website",
"_type" : "blog",
"_id" : 2
},
{
"_index" : "website",
"_type" : "pageviews",
"_id" : 1,
"_source": "views"
}
]
}
该响应体也包含一个?docs
?数组?, 对于每一个在请求中指定的文档,这个数组中都包含有一个对应的响应,且顺序与请求中的顺序相同。 其中的每一个响应都和使用单个?get
?request?请求所得到的响应体相同:
{
"docs" : [
{
"_index" : "website",
"_id" : "2",
"_type" : "blog",
"found" : true,
"_source" : {
"text" : "This is a piece of cake...",
"title" : "My first external blog entry"
},
"_version" : 10
},
{
"_index" : "website",
"_id" : "1",
"_type" : "pageviews",
"found" : true,
"_version" : 2,
"_source" : {
"views" : 2
}
}
]
}
如果想检索的数据都在相同的?_index
?中(甚至相同的?_type
?中),则可以在 URL 中指定默认的?/_index
或者默认的?/_index/_type
?。
你仍然可以通过单独请求覆盖这些值:
GET /website/blog/_mget
{
"docs" : [
{ "_id" : 2 },
{ "_type" : "pageviews", "_id" : 1 }
]
}
事实上,如果所有文档的?_index
?和?_type
?都是相同的,你可以只传一个?ids
?数组,而不是整个?docs
?数组:
GET /website/blog/_mget
{
"ids" : [ "2", "1" ]
}
注意,我们请求的第二个文档是不存在的。我们指定类型为?blog
?,但是文档 ID?1
?的类型是?pageviews
,这个不存在的情况将在响应体中被报告:
{
"docs" : [
{
"_index" : "website",
"_type" : "blog",
"_id" : "2",
"_version" : 10,
"found" : true,
"_source" : {
"title": "My first external blog entry",
"text": "This is a piece of cake..."
}
},
{
"_index" : "website",
"_type" : "blog",
"_id" : "1",
"found" : false
}
]
}
未找到该文档。 |
事实上第二个文档未能找到并不妨碍第一个文档被检索到。每个文档都是单独检索和报告的。
即使有某个文档没有找到,上述请求的 HTTP 状态码仍然是?200
?。事实上,即使请求?没有找到任何文档,它的状态码依然是?200
?--因为?mget
?请求本身已经成功执行。 为了确定某个文档查找是成功或者失败,你需要检查?found
?标记。
https://www.elastic.co/guide/cn/elasticsearch/guide/current/bulk.html
cs