【elasticsearch】ES数据库重建索引 — Reindex(数据迁移)

【elasticsearch】ES数据库重建索引 — Reindex(数据迁移)本文详细介绍了 Elasticsearc 的 Reindex 功能 用于数据迁移和索引重建

大家好,欢迎来到IT知识分享网。

在这里插入图片描述

1.应用背景:

1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。

2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,

所以这种情况下也可以考虑尝试使用Reindex。

ES提供了_reindex这个API。相对于我们重新导入数据肯定会快不少,实测速度大概是bulk导入数据的5-10倍。

数据迁移步骤:

  • 创建新的索引(可以通过java程序也可以直接在head插件上创建)
    • 注意:在创建索引的时候要把表结构也要创建好(也就是mapping)
  • 复制数据

1.1 注意事项

  • 源和目的不能相同, 比如不能将数据流 Reindex 给它自身。
  • 源索引的文档中 _source 字段必须开启。
  • Reindex 不会复制源的 setting 和源所匹配的模板, 因此在调用 _reindex 前, 你需要设置好目的索引 (action.auto_create_index 为 false 或者 -.* 时)。
  • 目标索引的 mapping, 主分片数, 副本数等推荐提前配置。

1.2 前置要求

如果 Elasticsearch 集群配置了安全策略和权限策略, 则进行 Reindex 必须拥有以下权限:

  • 读取源的数据流、 索引、 索引别名等索引级别权限。
  • 对于目的数据流、 索引、 索引别名的写权限。
  • 如果需要使用 Reindex API 自动创建数据流和索引, 则必须拥有对目的数据流、 索引、 索引别名的 auto_configure、 create_index 或者 manage 等索引级别权限。
  • 如果源为远程的集群, 则 source.remote.user 用户必须拥有集群监控权限, 和读取源索引、 源索引别名、 源数据流的权限。
  • 如果 Reindex 的源为远程集群, 必须在当前集群的请求节点 elasticsearch.yml文件配置远程白名单 reindex.remote.whitelist。

创建数据流, 需要提前配置好数据流的匹配索引模板, 详情可参看 Set up a data stream: https://www.elastic.co/guide/en/elasticsearch/reference/7.11/set-up-a-data-stream.html

2.Reindex

2.1 使用

2.1.1 基本使用

最简单、基本的方式:

1)代码请求:

POST _reindex { 
    "source": { 
    "index": "old_index" }, "dest": { 
    "index": "new_index" } } 

2)利用命令:

curl _XPOST 'ES数据库请求地址:9200/_reindex' -d{ 
   "source":{ 
   "index":"old_index"},"dest":{ 
   "index":"new_index"}} 

2.1.2 将多个索引reindex到一个目标

POST _reindex { 
    "source": { 
    "index": ["twitter", "blog"], "type": ["tweet", "post"] }, "dest": { 
    "index": "all_together" } } 

这里要注意的是,如果twitter和blog中有document的id是一样的,则无法保证最终出现在all_together里面的document是哪个,因为迭代是随机的。(最后一个会覆盖前一个)

2.1.3 只复制特定的field

POST _reindex { 
    "source": { 
    "index": "twitter", "_source": ["user", "tweet"] }, "dest": { 
    "index": "new_twitter" } } 

2.1.4 使用script

POST _reindex { 
    "source": { 
    "index": "twitter" }, "dest": { 
    "index": "new_twitter", "version_type": "external" }, "script": { 
    "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", "lang": "painless" } } 

2.1.5 使用Ingest Node

这个功能应该说是最好用的了。当你的source是因为不合理的结构,需要重新结构化所有的数据时,通过ingest node, 可以很方便的在新的index中获得不一眼的mapping和value:

POST _reindex { 
    "source": { 
    "index": "source" }, "dest": { 
    "index": "dest", "pipeline": "some_ingest_pipeline" } } 

2.1.6 reindex远程服务器的索引到本地

使用remote属性:

POST _reindex { 
    "source": { 
    "remote": { 
    "host": "http://otherhost:9200", "username": "user", "password": "pass" }, "index": "source", "query": { 
    "match": { 
    "test": "data" } } }, "dest": { 
    "index": "dest" } } 

对于在其他集群上的index,就不存在本地镜像复制的便利。需要从网络上下载数据再写到本地,默认的,buffer的size是100M。在scroll size是1000的情况下,如果单个document的平均大小超过100Kb,则有可能会报错。因此在在遇到非常大的documents,需要减小batch的size:

POST _reindex { 
    "source": { 
    "remote": { 
    "host": "http://otherhost:9200" }, "index": "source", "size": 100, "query": { 
    "match": { 
    "test": "data" } } }, "dest": { 
    "index": "dest" } } 

2.2 参数

2.2.1 version_type

但如果新的index中有数据,并且可能发生冲突,那么可以设置version_type”version_type”: “internal”或者不设置,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和ID的任何内容:

POST _reindex { 
    "source": { 
    "index": "old_index" }, "dest": { 
    "index": "new_index", "version_type": "internal" } } 

就像_update_by_query,_reindex会生成源索引的快照(snapshot),但它的目标必须是一个不同的索引,以便避免版本冲突。dest对象可以像index API一样进行配置,以乐观锁控制并发。像上面那样,不设置version_type或设置它将设置为internal。Elasticsearch将会直接将文档转储到dest中,覆盖任何发生的具有相同类型和id的document:

POST _reindex { 
    "source": { 
    "index": "twitter" }, "dest": { 
    "index": "new_twitter", "version_type": "internal" } } 

如果把version_type设置为external.则elasticsearch会从source读取version字段,当遇到具有相同类型和id的documents,只更新newer verion。

POST _reindex { 
    "source": { 
    "index": "twitter" }, "dest": { 
    "index": "new_twitter", "version_type": "external" } } 

上面说起来似乎有点不好理解。简单说来,就是在redinex的时候,你的dest index可以不是一个新的index,而是包含有数据的。如果你的source indexh和dest index里面有相同类型和id的document.对于使用internal,是直接覆盖。使用external的话,只有当source的version更加新的时候,才更新。

2.2.2 op_type

把op_type设置为create,_reindex API,只在dest index中添加不不存在的doucments。如果相同的documents已经存在,则会报version confilct的错误。

POST _reindex { 
    "source": { 
    "index": "twitter" }, "dest": { 
    "index": "new_twitter", "op_type": "create" } } 

2.2.3 conflicts

默认情况下,当发生version conflict的时候,_reindex会被abort。除非把conflicts设置为“proceed”:

POST _reindex { 
    "conflicts": "proceed", "source": { 
    "index": "twitter" }, "dest": { 
    "index": "new_twitter", "op_type": "create" } } 

2.2.4 query

我们还可以通过query,把需要_reindex的document限定在一定的范围。下面的例子,就只copy了作者是’kimchy’的twitter到new_twitter:

POST _reindex { 
    "source": { 
    "index": "twitter", "type": "tweet", "query": { 
    "term": { 
    "user": "kimchy" } } }, "dest": { 
    "index": "new_twitter" } } 

2.2.5 size设置

通过size可以控制复制多少内容,下例只复制了一个document:

POST _reindex { 
    "size": 1, "source": { 
    "index": "twitter" }, "dest": { 
    "index": "new_twitter" } } 

2.2.6 sort配置

把时间排序前10000个document,复制到目标:

POST _reindex { 
    "size": 10000, "source": { 
    "index": "twitter", "sort": { 
    "date": "desc" } }, "dest": { 
    "index": "new_twitter" } } 

2.2.7 refresh

2.2.8 timeout

2.2.9 wait_for_active_shards

2.3 查看进度

GET _tasks?detailed=true&actions=*reindex 

返回结果如下:

{ 
    "nodes" : { 
    "r1A2WoRbTwKZ516z6NEs5A" : { 
    "name" : "r1A2WoR", "transport_address" : "127.0.0.1:9300", "host" : "127.0.0.1", "ip" : "127.0.0.1:9300", "attributes" : { 
    "testattr" : "test", "portsfile" : "true" }, "tasks" : { 
    "r1A2WoRbTwKZ516z6NEs5A:36619" : { 
    "node" : "r1A2WoRbTwKZ516z6NEs5A", "id" : 36619, "type" : "transport", "action" : "indices:data/write/reindex", "status" : { 
    "total" : 6154, "updated" : 3500, "created" : 0, "deleted" : 0, "batches" : 4, "version_conflicts" : 0, "noops" : 0, "retries": { 
    "bulk": 0, "search": 0 }, "throttled_millis": 0 }, "description" : "" } } } } } 

上面表示,一共需要处理6154个document,现在已经update了3500个。

2.4 取消reindex

射出去的箭还是可以收回的,通过_tasks API:

POST _tasks/task_id:1/_cancel 

这里的task_id,可以通过上面的tasks API获得。

2.5 补充

在重建索引之前,首先要考虑一下重建索引的必要性,因为重建索引是非常耗时的。

ES 的 reindex api 不会去尝试设置目标索引,不会复制源索引的设置,所以我们应该在运行_reindex 操作之前设置目标索引,包括设置映射(mapping),分片,副本等。

第一步,和创建普通索引一样创建新索引。当数据量很大的时候,需要设置刷新时间间隔,把 refresh_intervals 设置为-1,即不刷新,number_of_replicas 副本数设置为 0(因为副本数可以动态调整,这样有助于提升速度)。

{ 
    "settings": { 
    "number_of_shards": "50", "number_of_replicas": "0", "index": { 
    "refresh_interval": "-1" } } "mappings": { 
    } } 

第二步,调用 reindex 接口,建议加上 wait_for_completion=false 的参数条件,这样 reindex 将直接返回 taskId。

POST _reindex?wait_for_completion=false { 
    "source": { 
    "index": "old_index", //原有索引 "size": 5000 //一个批次处理的数据量 }, "dest": { 
    "index": "new_index", //目标索引 } } 

第三步,等待。可以通过 GET _tasks?detailed=true&actions=*reindex 来查询重建的进度。如果要取消 task 则调用_tasks/node_id:task_id/_cancel。

第四步,删除旧索引,释放磁盘空间。更多细节可以查看 ES 官网的 reindex api。

POST /_reindex { 
    "conflicts": "proceed", //意思是冲突以旧索引为准,直接跳过冲突,否则会抛出异常,停止task "source": { 
    "index": "old_index" //旧索引 "query": { 
    "constant_score" : { 
    "filter" : { 
    "range" : { 
    "data_update_time" : { 
    "gte" :  //reindex开始时刻前的毫秒时间戳 } } } } } }, "dest": { 
    "index": "new_index", //新索引 "version_type": "external" //以旧索引的数据为准 } } 

3.问题发现:

常规的如果我们只是进行少量的数据迁移利用普通的reindex就可以很好的达到要求,但是当我们发现我们需要迁移的数据量过大时,我们会发现reindex的速度会变得很慢

可行方案:

1)提升批量写入大小值

默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size。

POST _reindex { 
    "source": { 
    "index": "source", "size": 5000 }, "dest": { 
    "index": "dest", "routing": "=cat" } } 

批量大小设置的依据:

1、使用批量索引请求以获得最佳性能。

批量大小取决于数据、分析和集群配置,但一个好的起点是每批处理5-15 MB。

注意,这是物理大小。文档数量不是度量批量大小的好指标。例如,如果每批索引1000个文档:

1)每个1kb的1000个文档是1mb。

2)每个100kb的1000个文档是100 MB。

这些是完全不同的体积大小。

2、逐步递增文档容量大小的方式调优。

1)从大约5-15 MB的大容量开始,慢慢增加,直到你看不到性能的提升。然后开始增加批量写入的并发性(多线程等等)。

2)使用kibana、cerebro或iostat、top和ps等工具监视节点,以查看资源何时开始出现瓶颈。如果您开始接收EsRejectedExecutionException,您的集群就不能再跟上了:至少有一个资源达到了容量。

要么减少并发性,或者提供更多有限的资源(例如从机械硬盘切换到ssd固态硬盘),要么添加更多节点。

2)借助scroll的sliced提升写入效率

Reindex支持Sliced Scroll以并行化重建索引过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。

4.sliced原理(from medcl)

slicing使用举例

POST _reindex?slices=5&refresh { 
    "source": { 
    "index": "twitter" }, "dest": { 
    "index": "new_twitter" } } 

slices大小设置注意事项:

效果

实践证明,比默认设置reindex速度能提升10倍+。

4.3 ES副本数设置为0

如果要进行大量批量导入,请考虑通过设置index.number_of_replicas来禁用副本:0

主要原因在于:复制文档时,将整个文档发送到副本节点,并逐字重复索引过程。 这意味着每个副本都将执行分析,索引和潜在合并过程。

相反,如果您使用零副本进行索引,然后在提取完成时启用副本,则恢复过程本质上是逐字节的网络传输。 这比复制索引过程更有效。

PUT /my_logs/_settings { 
    "number_of_replicas": 1 } 

4.4 增加refresh间隔

如果你的搜索结果不需要接近实时的准确性,考虑先不要急于索引刷新refresh。可以将每个索引的refresh_interval到30s。

如果正在进行大量数据导入,可以通过在导入期间将此值设置为-1来禁用刷新。完成后不要忘记重新启用它!

设置方法:

PUT /my_logs/_settings { 
    "refresh_interval": -1 } 

参考:【elasticsearch】ES数据库重建索引 — Reindex(数据迁移)

Reindex API 详解

M.扩展阅读

Elastic Stack 实战手册(早鸟版).pdf 有更详细的介绍。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/112821.html

(0)
上一篇 2026-01-15 20:46
下一篇 2026-01-15 21:10

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信