赞
踩
1、 需求
将 Elasticsearch 中的数据增量导入 Kafka。
2、 解决方式
1) 自定义一个 Flume 的 ES Source
2) 使用 Spark 的 ES RDD
3) 自定义 Flink 的 ES Source
3、解决问题
1) 思路:ES 中的每条数据都有一个 send_time 字段,也就是数据发送到 ES 的时间,我们就根据这个时间来增量采集数据。采集时使用 ES 的 transport API,并使用 scroll API 来分页。具体做法是自定义一个 ES Source:首先继承 SourceFunction 这个类,然后在 run 方法中实现查询逻辑。
2)注意点
假如程序挂掉了怎么办?怎么知道我们已经采集到了哪个时间段呢?
对于这个问题,我的思路是这样的:首先每 5 分钟采集一次,并记录每 5 分钟采集到的条数、ES 的 index 以及采集的时间段。采集成功就写入 MySQL 表中做记录;采集失败也会写入一条失败记录。如果是因为异常导致采集失败,就重新采集;连续三次都失败,程序就直接退出,然后检查原因再重新启动程序。重新启动时先去 MySQL 读取上一次采集的位置,然后从下一个时间段开始采集。
3) 代码:ES Source 是 Scala 代码
-
package com.rongan.source
-
-
import java.util.Date
-
-
import com.rongan.commos.{DateUtils, EsUtils, PropertiesUtil}
-
import com.rongan.constants.Constants
-
import com.rongan.dao.EsExportRecordDAO
-
import com.rongan.model.EsExportRecord
-
import org.apache.flink.streaming.api.functions.source.SourceFunction
-
import org.elasticsearch.search.SearchHit
-
-
import scala.util.control.Breaks.{
break, breakable}
-
-
/**
 * Custom Flink source that incrementally reads documents from Elasticsearch.
 *
 * The source walks forward through consecutive 5-minute windows of the `send_time` field,
 * each `[start, start + 5 minutes - 1 second]`. It emits every matching document's source
 * (`SearchHit.getSourceAsString`) and records each window's outcome through
 * `EsExportRecordDAO.updateRecord`:
 *  - status 1 with the hit count on success;
 *  - status 0 with placeholder values on failure.
 *
 * A failed window is retried immediately. After three consecutive failures the source cancels
 * itself and `run()` returns.
 *
 * Hits emitted before a failure are emitted again when the window is retried, so a retried
 * window can produce duplicates.
 *
 * @param clusterName Elasticsearch cluster name
 * @param esNode      Elasticsearch node host
 * @param esPort      Elasticsearch transport port
 * @param index       index name to query
 * @param type1       document type to query
 * @param fromDate    start of the first window, as a string understood by `DateUtils.strToDate`
 */
class EsSource(val clusterName: String, val esNode: String, val esPort: Int, val index: String, val type1: String,
               var fromDate: String)
  extends SourceFunction[String] {

  import scala.util.control.NonFatal

  /** Loop flag. Cleared by `cancel()`, which runs on a different thread than `run()`, hence volatile. */
  @volatile var isRunning = true

  /** Job configuration; provides the pause between windows (`Constants.ES_COLLECT_INTERVAL`). */
  val properties = PropertiesUtil.getProperties(Constants.PROPERTIES_PATH)

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    // Create the Elasticsearch client once, here, instead of also in the constructor;
    // the constructor's client was never the one run() ended up using.
    EsUtils.getClient(clusterName, esNode, esPort)

    // Read once up front. A missing or malformed interval now fails fast. Previously it was
    // reported as a failure of (and caused re-collection of) a window already recorded as successful.
    val collectIntervalMs = properties.getProperty(Constants.ES_COLLECT_INTERVAL).toLong

    var firstWindow = true
    var toDate = fromDate
    var fromDate1 = fromDate
    var errorCount = 0

    while (isRunning) {
      // Choose the window start. The first window starts at fromDate; each later one starts
      // 5 minutes after the previous start. After a failure, fromDate1 was moved back 5 minutes,
      // so this step lands on the failed window again and retries it.
      if (firstWindow) {
        fromDate1 = toDate
        firstWindow = false
      } else {
        fromDate1 = DateUtils.targetFormat(DateUtils.add5Minute(DateUtils.strToDate(fromDate1)))
      }
      // Inclusive end of the window: start + 5 minutes - 1 second.
      // NOTE(review): the window is queried without waiting for its end time to pass. A window
      // that still lies in the future would miss documents that arrive later — confirm that
      // fromDate and the collect interval rule this out.
      toDate = DateUtils.targetFormat(DateUtils.subtraction1second(DateUtils.add5Minute(DateUtils.strToDate(fromDate1))))

      var windowRecorded = false
      try {
        val startTime = DateUtils.targetFormat(new Date())
        println("start collection data index = " + index + " send_time (start)= " + fromDate1 +
          " send_time (end)= " + toDate + " currentTime" + startTime)

        val count: Int = collect(sourceContext, fromDate1, toDate)

        // collect() stops early when the source is cancelled. A partially read window must not
        // be recorded as successful, otherwise a restart resumes after it and skips the remainder.
        if (isRunning) {
          val endTime = DateUtils.targetFormat(new Date())
          EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, toDate, count, startTime, endTime, 1, index))
          errorCount = 0
          windowRecorded = true
          println("end of data collection index = " + index + " send_time (start)= " + fromDate1 +
            " send_time (end)= " + toDate + " currentTime " + endTime + " count data = " + count)
        }
      } catch {
        case NonFatal(_) if !isRunning =>
          // Failed while being cancelled: leave the window unrecorded so a restart re-collects it.
        case NonFatal(e) =>
          e.printStackTrace()
          errorCount += 1
          println("采集数据出错 index = " + index + " send_time (开始)= " + fromDate1 + " send_time (结束) ")
          EsExportRecordDAO.updateRecord(EsExportRecord(fromDate1, "00000000", 0, "00000000", "00000000", 0, index))
          // Step back one window so the next iteration retries the same range.
          fromDate1 = DateUtils.targetFormat(DateUtils.subtraction5Minute(DateUtils.strToDate(fromDate1)))
          // Give up after three consecutive failures; run() then returns normally.
          if (errorCount >= 3) {
            cancel()
          }
      }

      // Pause only after a recorded window (failures are retried immediately, as before).
      // The sleep is outside the try so an interruption cannot be mistaken for a collection failure.
      if (windowRecorded) {
        try Thread.sleep(collectIntervalMs)
        catch {
          // The task thread may be interrupted on cancellation; that simply ends the loop.
          case e: InterruptedException => if (isRunning) throw e
        }
      }
    }
  }

  /**
   * Emits every document whose `send_time.keyword` lies in `[fromDate, toDate]`.
   *
   * Pages through the results with the scroll API and stops early if the source is cancelled.
   * The scroll context is always cleared, including when a page request throws.
   *
   * @return number of hits emitted
   */
  def collect(sourceContext: SourceFunction.SourceContext[String], fromDate: String, toDate: String): Int = {
    val (firstPage, firstScrollId) =
      EsUtils.searchByScrollRangeQuery(index, type1, "send_time.keyword", fromDate, toDate)
    var scrollID = firstScrollId
    var count = 0
    try {
      firstPage.foreach(hit => sourceContext.collect(hit.getSourceAsString))
      count = firstPage.length

      var exhausted = false
      while (isRunning && !exhausted) {
        val (page, nextScrollId) = EsUtils.searchByScrollId(scrollID)
        if (page.isEmpty) {
          exhausted = true
        } else {
          page.foreach(hit => sourceContext.collect(hit.getSourceAsString))
          count += page.length
          scrollID = nextScrollId
        }
      }
    } finally {
      EsUtils.clearScroll(scrollID)
    }
    count
  }

  /** Asks `run()` to stop after its current step. */
  override def cancel(): Unit = {
    isRunning = false
  }

}
-
-
// Kafka topic: roi-center.incident.detail.topic
-
-
/**
 * Currently empty; it defines no members.
 * Presumably meant to host the ES-to-Kafka collection job — TODO confirm.
 */
object EsCollect
-
4、如需整个项目的代码请留言~ 暂时就实现了这么多,如有更好的想法欢迎讨论~
EsUtils 代码:
-
package rongan.
util
-
-
import org.
elasticsearch.
action.
search.{
ClearScrollResponse,
SearchRequestBuilder,
SearchResponse}
-
import org.
elasticsearch.
client.
transport.
TransportClient
-
import org.
elasticsearch.
common.
transport.
TransportAddress
-
import org.
elasticsearch.
common.
unit.
TimeValue
-
import org.
elasticsearch.
index.
query.
QueryBuilders
-
import org.
elasticsearch.
search.
SearchHit
-
import org.
elasticsearch.
search.
sort.
SortOrder
-
import rongan.
business.
tornado.
RsdTornadoIpcDeviceEsToHbase.
properties
-
import rongan.
config.
Constans
-
-
import scala.
util.
control.
Breaks.{
break, breakable}
-
-
object EsUtils {

  import java.net.InetAddress

  import org.elasticsearch.common.settings.Settings
  import org.elasticsearch.transport.client.PreBuiltTransportClient

  /** Default scroll keep-alive in milliseconds, used when a caller does not pass one. */
  private val DefaultScrollKeepAliveMs: Long = 30000L

  /** Default number of hits requested per scroll page. */
  private val DefaultPageSize: Int = 10000

  /** Shared transport client used by every query in this object; installed by [[getClient]]. */
  var client: TransportClient = _

  // The (clusterName, host, port) that `client` was built for, so repeated calls can reuse it.
  private var clientKey: (String, String, Int) = _

  /**
   * Returns a transport client for `host:port` in cluster `clusterName` and installs it as [[client]].
   *
   * Calling again with the same arguments reuses the existing client instead of building (and
   * leaking) a new one each time. Calling with different arguments builds a new client and
   * closes the previous one.
   *
   * @param clusterName value for the `cluster.name` setting
   * @param host        node host name or address
   * @param port        transport port
   * @return the installed client
   */
  def getClient(clusterName: String, host: String, port: Int): TransportClient = synchronized {
    val key = (clusterName, host, port)
    if (client == null || clientKey != key) {
      val settings: Settings = Settings.builder().put("cluster.name", clusterName).build()
      val fresh: TransportClient = new PreBuiltTransportClient(settings)
        .addTransportAddress(new TransportAddress(InetAddress.getByName(host), port))
      val previous = client
      client = fresh
      clientKey = key
      if (previous != null) previous.close()
    }
    client
  }

  /** Closes the shared client, if any, so the next [[getClient]] call builds a fresh one. */
  def close(): Unit = synchronized {
    if (client != null) {
      client.close()
      client = null
      clientKey = null
    }
  }

  /**
   * Starts a scrolled range query: hits whose `field` lies in `[fromData, toData]`.
   *
   * Both bounds are inclusive. Results are sorted ascending by the same `field`; the sort field
   * was previously hard-coded to `send_time.keyword`, which is what every caller here passes.
   *
   * @param index             index name (wildcards allowed)
   * @param `type`            document type name
   * @param field             field the range (and the sort) applies to
   * @param fromData          lower bound, inclusive
   * @param toData            upper bound, inclusive
   * @param pageSize          hits per page
   * @param scrollKeepAliveMs how long the scroll context is kept alive between requests
   * @return the first page of hits together with the scroll id for fetching the next page
   */
  def searchByScrollRangeQuery(index: String, `type`: String, field: String, fromData: Any, toData: Any,
                               pageSize: Int = DefaultPageSize,
                               scrollKeepAliveMs: Long = DefaultScrollKeepAliveMs): (Array[SearchHit], String) = {
    val request: SearchRequestBuilder = client.prepareSearch()
      .setIndices(index)
      .setTypes(`type`)
      .setScroll(new TimeValue(scrollKeepAliveMs))
      .setQuery(QueryBuilders.rangeQuery(field).from(fromData).to(toData))
      .setSize(pageSize)
      .addSort(field, SortOrder.ASC)
    val searchResponse: SearchResponse = request.get()
    (searchResponse.getHits.getHits, searchResponse.getScrollId)
  }

  /**
   * Fetches the next page of an open scroll.
   *
   * An empty page means the scroll is exhausted. A `null` scroll id yields an empty page and a
   * `null` id without contacting the cluster.
   *
   * @param scrollId1         scroll id returned by the previous request
   * @param scrollKeepAliveMs new keep-alive for the scroll context
   * @return the page of hits and the scroll id for the following request
   */
  def searchByScrollId(scrollId1: String,
                       scrollKeepAliveMs: Long = DefaultScrollKeepAliveMs): (Array[SearchHit], String) =
    if (scrollId1 == null) {
      (Array.empty[SearchHit], null)
    } else {
      val response = client.prepareSearchScroll(scrollId1)
        .setScroll(new TimeValue(scrollKeepAliveMs))
        .get()
      (response.getHits.getHits, response.getScrollId)
    }

  /**
   * Releases a scroll context on the cluster.
   *
   * @param scrollId id to clear; `null` is ignored
   * @return whether the cluster reported success (`false` when `scrollId` is `null`)
   */
  def clearScroll(scrollId: String): Boolean =
    if (scrollId == null) {
      false
    } else {
      val response: ClearScrollResponse = client.prepareClearScroll().addScrollId(scrollId).get()
      response.isSucceeded
    }

  /**
   * Runs a scrolled range query to completion and calls `onHit` for every hit.
   * The scroll context is cleared even if a request or `onHit` throws.
   *
   * @return total number of hits visited
   */
  private def scrollRange(index: String, `type`: String, field: String, fromData: Any, toData: Any)
                         (onHit: SearchHit => Unit): Int = {
    val (firstPage, firstScrollId) = searchByScrollRangeQuery(index, `type`, field, fromData, toData)
    var page = firstPage
    var scrollId = firstScrollId
    var count = 0
    try {
      while (page.nonEmpty) {
        page.foreach(onHit)
        count += page.length
        val (nextPage, nextScrollId) = searchByScrollId(scrollId)
        page = nextPage
        // Keep the last known id if a response omits one, so it can still be cleared.
        scrollId = Option(nextScrollId).getOrElse(scrollId)
      }
    } finally {
      clearScroll(scrollId)
    }
    count
  }

  /**
   * Ad-hoc check: prints every matching document, then the total hit count.
   * (Previously the total was printed from unreachable code after an endless loop.)
   */
  def main(args: Array[String]): Unit = {
    EsUtils.getClient(properties.getProperty(Constans.ES_CLUSTER_NAME), properties.getProperty(Constans.ES_NODE),
      properties.getProperty(Constans.ES_PORT).toInt)
    try {
      // Both range ends are inclusive, so consecutive 5-minute windows end one second before the
      // next window starts.
      // NOTE(review): the upper bound "2019-09-28 19:15:2" looks truncated compared with the
      // lower bound — confirm the intended value.
      val count = scrollRange("firewall.ipc.info*", "alert", "send_time.keyword",
        "2019-01-28 19:15:20", "2019-09-28 19:15:2")(hit => println(hit.getSourceAsString))
      println(count)
    } finally {
      // Release the shared client once the ad-hoc query is done.
      close()
    }
  }

}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。