赞
踩
先创建一个es测试表test_wd
插入一条数据
import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.RequestOptions; public class InsertData { public static void main(String[] args) { addOneData(); } private static void addOneData() { Map<String, Object> map = new HashMap<String, Object>(); map.put("company", "测试公司哈哈哈"); map.put("desc", "这是一个测试公司"); map.put("person", "小测"); map.put("title", "hello world!"); map.put("createTime", new Date().getTime()); IndexRequest request = new IndexRequest("test_wd").source(map);//.id可设置自定义id IndexResponse response = null; try { response = ESClient.getES7Client().index(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } ESClient.close(); } }
import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; public class UpdateEsData { public static void main(String[] args) throws IOException { RestHighLevelClient client = ESClient.getES7Client(); Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("createTime", new Date().getTime()); jsonMap.put("company", "测试公司"); jsonMap.put("person", "测试"); UpdateRequest request = new UpdateRequest("test_wd", "trcAhHYBOd26bujRs8yo").doc(jsonMap); try { UpdateResponse response = client.update(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } client.close(); } }
import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; public class UpdateEsData { public static void main(String[] args) throws IOException { RestHighLevelClient client = ESClient.getES7Client(); BulkRequest request = new BulkRequest(); // 第一条 Map<String, Object> map = new HashMap<>(); map.put("title", "测试批量修改"); map.put("desc", "这里是测试公司"); request.add(new UpdateRequest("test_wd", "37cjhHYBOd26bujR8sya").doc(map)); // 第二条 map.put("title", "正式修改"); map.put("desc", "这里是正式公司"); request.add(new UpdateRequest("test_wd", "3rcjhHYBOd26bujR6syQ").doc(map)); // 添加多个提交... // TODO:批量修改:new UpdateRequest // TODO:批量增加:new IndexRequest // request.add(new IndexRequest("test_wd").source(map)); // 提交 BulkResponse bulkResponse = null; try { bulkResponse = client.bulk(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } System.out.println("批量更新状态:" + bulkResponse.status()); client.close(); } }
import java.io.*; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.index.IndexRequest; import com.alibaba.fastjson.JSONObject; import com.es.util.ESBulkProcessor; public class BulkAdd { public static String indexName = "test_wd"; public static void main(String[] args) { buldAddData(); } private static void buldAddData() { ESBulkProcessor bulk = new ESBulkProcessor(); bulk.init(); for (int i = 1; i < 1000; i++) { if (i % 100 == 0) { System.out.println(i); } Map<String, Object> map = new HashMap<String, Object>(); map.put("title", "test_title" + i); map.put("company", "测试数据" + i); map.put("person", "小" + i); map.put("desc", "这里是测试" + i + "公司"); map.put("createTime", new Date().getTime()); IndexRequest ir = new IndexRequest(indexName, "_doc", "" + i).source(map); bulk.addRequest(ir); } bulk.closeBulkProcessor();//一定要close,不然会出现程序执行完毕未报错,但es并没有进数据的情况 } }
import java.io.IOException; import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.http.HttpHost; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetMappingsRequest; import org.elasticsearch.client.indices.GetMappingsResponse; import org.elasticsearch.client.indices.PutMappingRequest; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import com.alibaba.fastjson.JSON; public class ESBulkProcessor { static final Logger logger = LogManager.getLogger("ESBulkProcessor"); RestHighLevelClient client = null; BulkProcessor bulkProcessor = null; MappingMetaData mmd = null; /** * 依据HBase表名进行初始化工作 * * @param tableName */ public synchronized void init() { initESClient(); initESBulkProcessor(); } /** * 初始化ES客户端 */ private void initESClient() { logger.info("开始初始化Elastic Search客户端"); client = ESClient.getES7Client(); } /** * 初始化ES批量处理器 */ private void initESBulkProcessor() { logger.info("开始初始化批量操作处理器 BulkProcessor:bulkAsync"); BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { int numberOfActions = request.numberOfActions(); logger.debug("executionId=" + executionId + ", numberOfActions=" + numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { logger.error("Bulk executed with failures, executionId=" + executionId); } else { logger.debug("Bulk completed in milliseconds=" + response.getTook().getMillis() + "executionId=" + executionId); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("BulkProcessor.Listener.afterBulk :: " + executionId); logger.error(failure); } }; BulkProcessor.Builder builder = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); builder.setBulkActions(1000); builder.setBulkSize(new ByteSizeValue(1024L, ByteSizeUnit.MB)); builder.setConcurrentRequests(1); builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(10L), // 3)); bulkProcessor = builder.build(); logger.info("BulkProcessor:bulkAsync 初始化完毕"); } /** * 初始/刷新获取ES索引中的Mapping信息 * * @param indexName */ private void initESMapping(String indexName) { logger.info("获取ES中索引的信息, indexName=" + indexName); GetMappingsRequest request = new GetMappingsRequest(); request.indices(indexName); try { GetMappingsResponse getMappingResponse = client.indices().getMapping(request, RequestOptions.DEFAULT); if (getMappingResponse != null && getMappingResponse.mappings().size() > 0) { mmd = getMappingResponse.mappings().get(indexName); logger.info(JSON.toJSONString(mmd)); } else { logger.info(MessageFormat.format("ES中未找到对应的索引 {0}", indexName)); } } catch (Exception e) { logger.error(e); logger.info("相应索引不存在,该异常不影响程序继续执行"); } } /** * 检查所以中是否存在响应的字段 * * @param indexName * @param fieldName * @return */ @SuppressWarnings("unchecked") public boolean checkField2(String indexName, String fieldName) { if (mmd == null) return false; Object fields = mmd.getSourceAsMap().get("properties"); // 存在直接返回 if (fields != null && ((Map<String, Object>) fields).containsKey(fieldName)) { return true; } else { return false; } } /** * 新增 * * @param ir */ public void addRequest(IndexRequest ir) { bulkProcessor.add(ir); } /** * 更新 * * @param ur */ public void addRequest(UpdateRequest ur) { bulkProcessor.add(ur); } /** * 删除 * * @param dr */ public void addRequest(DeleteRequest dr) { bulkProcessor.add(dr); } /** * 关闭批量操作,关闭客户端 */ public void closeBulkProcessor() { logger.info("开始开始关闭 ES BulkProcessor"); try { if (bulkProcessor != null) bulkProcessor.awaitClose(60L, TimeUnit.SECONDS); } catch (Exception e) { logger.error("ESManager.closeClient:bulkProcessor", e); } finally { try { if (client != null) client.close(); } catch (Exception e) { logger.error("ESManager.closeClient:client", e); } } } }
/* * 删除整个索引:慎用 */ public static void deleteIndex(String indexName) { try { RestHighLevelClient client = ESClient.getES7Client(); try { System.out.println("【严重警告,慎重操作,sleep500s】正在删除索引!!!!!"+indexName); Thread.sleep(500000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } DeleteIndexRequest request = new DeleteIndexRequest(indexName); client.indices().delete(request, RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } }
/**
* @param indexName:索引
* @param id:主键
*/
private static void deleteById(String indexName, String id) throws IOException {
RestHighLevelClient client = ESClient.getES7Client();
DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
client.delete(deleteRequest, RequestOptions.DEFAULT);
client.close();
System.out.println("------删除成功!------");
}
/* * 根据条件删除 */ private static void deleteByQuery(String indexName) throws Exception { RestHighLevelClient client = ESClient.getES7Client(); DeleteByQueryRequest deleteByQuery = new DeleteByQueryRequest(indexName);// _all // term查询 deleteByQuery.setQuery(new TermQueryBuilder("_id", "c7a45799d5131e19af28bf3db6b832dd")); // prefix查询 deleteByQuery.setQuery(new PrefixQueryBuilder("lanmu", "5ea5")); // queryString查询 deleteByQuery.setQuery(new QueryStringQueryBuilder("深圳市南山区").field("company")); // 根据时间删除 deleteByQuery.setQuery(new RangeQueryBuilder("createTime").gt(sdf_day.parse("20200526").getTime())); client.deleteByQuery(deleteByQuery, RequestOptions.DEFAULT); ESClient.close(); System.out.println("------删除成功!------"); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。