大家好,欢迎来到IT知识分享网。
1.应用背景:
1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。
2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,
所以这种情况下也可以考虑尝试使用Reindex。
ES提供了_reindex这个API。相对于我们重新导入数据肯定会快不少,实测速度大概是bulk导入数据的5-10倍。
数据迁移步骤:
- 创建新的索引(可以通过java程序也可以直接在head插件上创建)
- 注意:在创建索引的时候要把表结构也要创建好(也就是mapping)
- 复制数据
1.1 注意事项
- 源和目的不能相同, 比如不能将数据流 Reindex 给它自身。
- 源索引的文档中 _source 字段必须开启。
- Reindex 不会复制源的 setting 和源所匹配的模板, 因此在调用 _reindex 前, 你需要设置好目的索引 (action.auto_create_index 为 false 或者 -.* 时)。
- 目标索引的 mapping, 主分片数, 副本数等推荐提前配置。
1.2 前置要求
如果 Elasticsearch 集群配置了安全策略和权限策略, 则进行 Reindex 必须拥有以下权限:
- 读取源的数据流、 索引、 索引别名等索引级别权限。
- 对于目的数据流、 索引、 索引别名的写权限。
- 如果需要使用 Reindex API 自动创建数据流和索引, 则必须拥有对目的数据流、 索引、 索引别名的 auto_configure、 create_index 或者 manage 等索引级别权限。
- 如果源为远程的集群, 则 source.remote.user 用户必须拥有集群监控权限, 和读取源索引、 源索引别名、 源数据流的权限。
- 如果 Reindex 的源为远程集群, 必须在当前集群的请求节点 elasticsearch.yml文件配置远程白名单 reindex.remote.whitelist。
创建数据流, 需要提前配置好数据流的匹配索引模板, 详情可参看 Set up a data stream: https://www.elastic.co/guide/en/elasticsearch/reference/7.11/set-up-a-data-stream.html
2.Reindex
2.1 使用
2.1.1 基本使用
最简单、基本的方式:
1)代码请求:
POST _reindex {
"source": {
"index": "old_index" }, "dest": {
"index": "new_index" } }
2)利用命令:
curl _XPOST 'ES数据库请求地址:9200/_reindex' -d{
"source":{
"index":"old_index"},"dest":{
"index":"new_index"}}
2.1.2 将多个索引reindex到一个目标
POST _reindex {
"source": {
"index": ["twitter", "blog"], "type": ["tweet", "post"] }, "dest": {
"index": "all_together" } }
这里要注意的是,如果twitter和blog中有document的id是一样的,则无法保证最终出现在all_together里面的document是哪个,因为迭代是随机的。(最后一个会覆盖前一个)
2.1.3 只复制特定的field
POST _reindex {
"source": {
"index": "twitter", "_source": ["user", "tweet"] }, "dest": {
"index": "new_twitter" } }
2.1.4 使用script
POST _reindex {
"source": {
"index": "twitter" }, "dest": {
"index": "new_twitter", "version_type": "external" }, "script": {
"source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}", "lang": "painless" } }
2.1.5 使用Ingest Node
这个功能应该说是最好用的了。当你的source是因为不合理的结构,需要重新结构化所有的数据时,通过ingest node, 可以很方便的在新的index中获得不一眼的mapping和value:
POST _reindex {
"source": {
"index": "source" }, "dest": {
"index": "dest", "pipeline": "some_ingest_pipeline" } }
2.1.6 reindex远程服务器的索引到本地
使用remote属性:
POST _reindex {
"source": {
"remote": {
"host": "http://otherhost:9200", "username": "user", "password": "pass" }, "index": "source", "query": {
"match": {
"test": "data" } } }, "dest": {
"index": "dest" } }
对于在其他集群上的index,就不存在本地镜像复制的便利。需要从网络上下载数据再写到本地,默认的,buffer的size是100M。在scroll size是1000的情况下,如果单个document的平均大小超过100Kb,则有可能会报错。因此在在遇到非常大的documents,需要减小batch的size:
POST _reindex {
"source": {
"remote": {
"host": "http://otherhost:9200" }, "index": "source", "size": 100, "query": {
"match": {
"test": "data" } } }, "dest": {
"index": "dest" } }
2.2 参数
2.2.1 version_type
但如果新的index中有数据,并且可能发生冲突,那么可以设置version_type”version_type”: “internal”或者不设置,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和ID的任何内容:
POST _reindex {
"source": {
"index": "old_index" }, "dest": {
"index": "new_index", "version_type": "internal" } }
就像_update_by_query,_reindex会生成源索引的快照(snapshot),但它的目标必须是一个不同的索引,以便避免版本冲突。dest对象可以像index API一样进行配置,以乐观锁控制并发。像上面那样,不设置version_type或设置它将设置为internal。Elasticsearch将会直接将文档转储到dest中,覆盖任何发生的具有相同类型和id的document:
POST _reindex {
"source": {
"index": "twitter" }, "dest": {
"index": "new_twitter", "version_type": "internal" } }
如果把version_type设置为external.则elasticsearch会从source读取version字段,当遇到具有相同类型和id的documents,只更新newer verion。
POST _reindex {
"source": {
"index": "twitter" }, "dest": {
"index": "new_twitter", "version_type": "external" } }
上面说起来似乎有点不好理解。简单说来,就是在redinex的时候,你的dest index可以不是一个新的index,而是包含有数据的。如果你的source indexh和dest index里面有相同类型和id的document.对于使用internal,是直接覆盖。使用external的话,只有当source的version更加新的时候,才更新。
2.2.2 op_type
把op_type设置为create,_reindex API,只在dest index中添加不不存在的doucments。如果相同的documents已经存在,则会报version confilct的错误。
POST _reindex {
"source": {
"index": "twitter" }, "dest": {
"index": "new_twitter", "op_type": "create" } }
2.2.3 conflicts
默认情况下,当发生version conflict的时候,_reindex会被abort。除非把conflicts设置为“proceed”:
POST _reindex {
"conflicts": "proceed", "source": {
"index": "twitter" }, "dest": {
"index": "new_twitter", "op_type": "create" } }
2.2.4 query
我们还可以通过query,把需要_reindex的document限定在一定的范围。下面的例子,就只copy了作者是’kimchy’的twitter到new_twitter:
POST _reindex {
"source": {
"index": "twitter", "type": "tweet", "query": {
"term": {
"user": "kimchy" } } }, "dest": {
"index": "new_twitter" } }
2.2.5 size设置
通过size可以控制复制多少内容,下例只复制了一个document:
POST _reindex {
"size": 1, "source": {
"index": "twitter" }, "dest": {
"index": "new_twitter" } }
2.2.6 sort配置
把时间排序前10000个document,复制到目标:
POST _reindex {
"size": 10000, "source": {
"index": "twitter", "sort": {
"date": "desc" } }, "dest": {
"index": "new_twitter" } }
2.2.7 refresh
2.2.8 timeout
2.2.9 wait_for_active_shards
2.3 查看进度
GET _tasks?detailed=true&actions=*reindex
返回结果如下:
{
"nodes" : {
"r1A2WoRbTwKZ516z6NEs5A" : {
"name" : "r1A2WoR", "transport_address" : "127.0.0.1:9300", "host" : "127.0.0.1", "ip" : "127.0.0.1:9300", "attributes" : {
"testattr" : "test", "portsfile" : "true" }, "tasks" : {
"r1A2WoRbTwKZ516z6NEs5A:36619" : {
"node" : "r1A2WoRbTwKZ516z6NEs5A", "id" : 36619, "type" : "transport", "action" : "indices:data/write/reindex", "status" : {
"total" : 6154, "updated" : 3500, "created" : 0, "deleted" : 0, "batches" : 4, "version_conflicts" : 0, "noops" : 0, "retries": {
"bulk": 0, "search": 0 }, "throttled_millis": 0 }, "description" : "" } } } } }
上面表示,一共需要处理6154个document,现在已经update了3500个。
2.4 取消reindex
射出去的箭还是可以收回的,通过_tasks API:
POST _tasks/task_id:1/_cancel
这里的task_id,可以通过上面的tasks API获得。
2.5 补充
在重建索引之前,首先要考虑一下重建索引的必要性,因为重建索引是非常耗时的。
ES 的 reindex api 不会去尝试设置目标索引,不会复制源索引的设置,所以我们应该在运行_reindex 操作之前设置目标索引,包括设置映射(mapping),分片,副本等。
第一步,和创建普通索引一样创建新索引。当数据量很大的时候,需要设置刷新时间间隔,把 refresh_intervals 设置为-1,即不刷新,number_of_replicas 副本数设置为 0(因为副本数可以动态调整,这样有助于提升速度)。
{
"settings": {
"number_of_shards": "50", "number_of_replicas": "0", "index": {
"refresh_interval": "-1" } } "mappings": {
} }
第二步,调用 reindex 接口,建议加上 wait_for_completion=false 的参数条件,这样 reindex 将直接返回 taskId。
POST _reindex?wait_for_completion=false {
"source": {
"index": "old_index", //原有索引 "size": 5000 //一个批次处理的数据量 }, "dest": {
"index": "new_index", //目标索引 } }
第三步,等待。可以通过 GET _tasks?detailed=true&actions=*reindex 来查询重建的进度。如果要取消 task 则调用_tasks/node_id:task_id/_cancel。
第四步,删除旧索引,释放磁盘空间。更多细节可以查看 ES 官网的 reindex api。
POST /_reindex {
"conflicts": "proceed", //意思是冲突以旧索引为准,直接跳过冲突,否则会抛出异常,停止task "source": {
"index": "old_index" //旧索引 "query": {
"constant_score" : {
"filter" : {
"range" : {
"data_update_time" : {
"gte" : //reindex开始时刻前的毫秒时间戳 } } } } } }, "dest": {
"index": "new_index", //新索引 "version_type": "external" //以旧索引的数据为准 } }
3.问题发现:
常规的如果我们只是进行少量的数据迁移利用普通的reindex就可以很好的达到要求,但是当我们发现我们需要迁移的数据量过大时,我们会发现reindex的速度会变得很慢
可行方案:
1)提升批量写入大小值
默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size。
POST _reindex {
"source": {
"index": "source", "size": 5000 }, "dest": {
"index": "dest", "routing": "=cat" } }
批量大小设置的依据:
1、使用批量索引请求以获得最佳性能。
批量大小取决于数据、分析和集群配置,但一个好的起点是每批处理5-15 MB。
注意,这是物理大小。文档数量不是度量批量大小的好指标。例如,如果每批索引1000个文档:
1)每个1kb的1000个文档是1mb。
2)每个100kb的1000个文档是100 MB。
这些是完全不同的体积大小。
2、逐步递增文档容量大小的方式调优。
1)从大约5-15 MB的大容量开始,慢慢增加,直到你看不到性能的提升。然后开始增加批量写入的并发性(多线程等等)。
2)使用kibana、cerebro或iostat、top和ps等工具监视节点,以查看资源何时开始出现瓶颈。如果您开始接收EsRejectedExecutionException,您的集群就不能再跟上了:至少有一个资源达到了容量。
要么减少并发性,或者提供更多有限的资源(例如从机械硬盘切换到ssd固态硬盘),要么添加更多节点。
2)借助scroll的sliced提升写入效率
Reindex支持Sliced Scroll以并行化重建索引过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。
4.sliced原理(from medcl)
slicing使用举例
POST _reindex?slices=5&refresh {
"source": {
"index": "twitter" }, "dest": {
"index": "new_twitter" } }
slices大小设置注意事项:
效果
实践证明,比默认设置reindex速度能提升10倍+。
4.3 ES副本数设置为0
如果要进行大量批量导入,请考虑通过设置index.number_of_replicas来禁用副本:0。
主要原因在于:复制文档时,将整个文档发送到副本节点,并逐字重复索引过程。 这意味着每个副本都将执行分析,索引和潜在合并过程。
相反,如果您使用零副本进行索引,然后在提取完成时启用副本,则恢复过程本质上是逐字节的网络传输。 这比复制索引过程更有效。
PUT /my_logs/_settings {
"number_of_replicas": 1 }
4.4 增加refresh间隔
如果你的搜索结果不需要接近实时的准确性,考虑先不要急于索引刷新refresh。可以将每个索引的refresh_interval到30s。
如果正在进行大量数据导入,可以通过在导入期间将此值设置为-1来禁用刷新。完成后不要忘记重新启用它!
设置方法:
PUT /my_logs/_settings {
"refresh_interval": -1 }
参考:【elasticsearch】ES数据库重建索引 — Reindex(数据迁移)
Reindex API 详解
M.扩展阅读
Elastic Stack 实战手册(早鸟版).pdf 有更详细的介绍。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/112821.html
