Flink Elastic Search Sink Type Mismatch error
I'm connecting Flink and Elastic Search. With the help of documentation. I created a sink using Scala.
def getEsSink:ElasticsearchSink[CustomerSegementation] = {
val httpHosts = new util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[CustomerSegementation](httpHosts, new ElasticsearchSinkFunction[CustomerSegementation]() {
def createIndexRequest(element: CustomerSegementation): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("_id", element._id)
json.put("last_ordered_date", element.last_ordered_date.toString)
Requests.indexRequest.index("customerSegementation").`type`("test_type").source(json)
}
@Override
def process(element: CustomerSegementation, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
indexer.add(createIndexRequest(element))
}
})
esSinkBuilder.setBulkFlushMaxActions(1)
esSinkBuilder.build()
}
Then I am trying to add the sink to the stream
stream.addSink(getEsSink())
I get error saying
I'm not sure where I'm exactly going wrong. I'm using Flink version 1.6.0 and ElasticSearch version 6 and Scala version 2.11.
Please help me.
elasticsearch apache-flink flink-streaming
add a comment |
I'm connecting Flink and Elastic Search. With the help of documentation. I created a sink using Scala.
def getEsSink:ElasticsearchSink[CustomerSegementation] = {
val httpHosts = new util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[CustomerSegementation](httpHosts, new ElasticsearchSinkFunction[CustomerSegementation]() {
def createIndexRequest(element: CustomerSegementation): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("_id", element._id)
json.put("last_ordered_date", element.last_ordered_date.toString)
Requests.indexRequest.index("customerSegementation").`type`("test_type").source(json)
}
@Override
def process(element: CustomerSegementation, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
indexer.add(createIndexRequest(element))
}
})
esSinkBuilder.setBulkFlushMaxActions(1)
esSinkBuilder.build()
}
Then I am trying to add the sink to the stream
stream.addSink(getEsSink())
I get error saying
I'm not sure where I'm exactly going wrong. I'm using Flink version 1.6.0 and ElasticSearch version 6 and Scala version 2.11.
Please help me.
elasticsearch apache-flink flink-streaming
1
Check the element type fromstream
. Looks like the type fromstream
isObjectNode
, which does not match withCustomerSegmentation
.
– David
Nov 14 '18 at 2:28
add a comment |
I'm connecting Flink and Elastic Search. With the help of documentation. I created a sink using Scala.
def getEsSink:ElasticsearchSink[CustomerSegementation] = {
val httpHosts = new util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[CustomerSegementation](httpHosts, new ElasticsearchSinkFunction[CustomerSegementation]() {
def createIndexRequest(element: CustomerSegementation): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("_id", element._id)
json.put("last_ordered_date", element.last_ordered_date.toString)
Requests.indexRequest.index("customerSegementation").`type`("test_type").source(json)
}
@Override
def process(element: CustomerSegementation, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
indexer.add(createIndexRequest(element))
}
})
esSinkBuilder.setBulkFlushMaxActions(1)
esSinkBuilder.build()
}
Then I am trying to add the sink to the stream
stream.addSink(getEsSink())
I get error saying
I'm not sure where I'm exactly going wrong. I'm using Flink version 1.6.0 and ElasticSearch version 6 and Scala version 2.11.
Please help me.
elasticsearch apache-flink flink-streaming
I'm connecting Flink and Elastic Search. With the help of documentation. I created a sink using Scala.
def getEsSink:ElasticsearchSink[CustomerSegementation] = {
val httpHosts = new util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[CustomerSegementation](httpHosts, new ElasticsearchSinkFunction[CustomerSegementation]() {
def createIndexRequest(element: CustomerSegementation): IndexRequest = {
val json = new java.util.HashMap[String, String]
json.put("_id", element._id)
json.put("last_ordered_date", element.last_ordered_date.toString)
Requests.indexRequest.index("customerSegementation").`type`("test_type").source(json)
}
@Override
def process(element: CustomerSegementation, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
indexer.add(createIndexRequest(element))
}
})
esSinkBuilder.setBulkFlushMaxActions(1)
esSinkBuilder.build()
}
Then I am trying to add the sink to the stream
stream.addSink(getEsSink())
I get error saying
I'm not sure where I'm exactly going wrong. I'm using Flink version 1.6.0 and ElasticSearch version 6 and Scala version 2.11.
Please help me.
elasticsearch apache-flink flink-streaming
elasticsearch apache-flink flink-streaming
edited Nov 13 '18 at 6:41
Navin Rawat
2,76911427
2,76911427
asked Nov 12 '18 at 16:49
bhargava ravali kogantibhargava ravali koganti
9310
9310
1
Check the element type fromstream
. Looks like the type fromstream
isObjectNode
, which does not match withCustomerSegmentation
.
– David
Nov 14 '18 at 2:28
add a comment |
1
Check the element type fromstream
. Looks like the type fromstream
isObjectNode
, which does not match withCustomerSegmentation
.
– David
Nov 14 '18 at 2:28
1
1
Check the element type from
stream
. Looks like the type from stream
is ObjectNode
, which does not match with CustomerSegmentation
.– David
Nov 14 '18 at 2:28
Check the element type from
stream
. Looks like the type from stream
is ObjectNode
, which does not match with CustomerSegmentation
.– David
Nov 14 '18 at 2:28
add a comment |
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53266655%2fflink-elastic-search-sink-type-mismatch-error%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53266655%2fflink-elastic-search-sink-type-mismatch-error%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
1
Check the element type from
stream
. Looks like the type fromstream
isObjectNode
, which does not match withCustomerSegmentation
.– David
Nov 14 '18 at 2:28