Flink Elastic Search Sink Type Mismatch error













I'm connecting Flink to Elasticsearch. Following the documentation, I created a sink in Scala:



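import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

def getEsSink: ElasticsearchSink[CustomerSegementation] = {
  val httpHosts = new util.ArrayList[HttpHost]
  httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"))
  val esSinkBuilder = new ElasticsearchSink.Builder[CustomerSegementation](httpHosts, new ElasticsearchSinkFunction[CustomerSegementation]() {
    // Build one index request per stream element.
    def createIndexRequest(element: CustomerSegementation): IndexRequest = {
      val json = new java.util.HashMap[String, String]
      json.put("_id", element._id)
      json.put("last_ordered_date", element.last_ordered_date.toString)
      Requests.indexRequest.index("customerSegementation").`type`("test_type").source(json)
    }

    override def process(element: CustomerSegementation, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  })
  // Flush after every element instead of buffering bulk requests.
  esSinkBuilder.setBulkFlushMaxActions(1)
  esSinkBuilder.build()
}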


Then I try to add the sink to the stream:



stream.addSink(getEsSink())


I get a type mismatch error (shown in the original post as a screenshot, not reproduced here).



I'm not sure exactly where I'm going wrong. I'm using Flink 1.6.0, Elasticsearch 6, and Scala 2.11.



Please help me.










Check the element type of the stream. It looks like the stream's element type is ObjectNode, which does not match CustomerSegementation.

– David, Nov 14 '18 at 2:28
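If the stream really is a DataStream[ObjectNode], as the comment suggests, one way to line the types up is to map each element to CustomerSegementation before attaching the sink. The following is only a sketch under assumptions: the case class definition is not shown in the question, so its fields here are hypothetical; the ObjectNode is assumed to be Flink's shaded Jackson class; and the JSON field names are assumed to match those used in the sink. The names TypedStreamSketch, toCustomerSegementation, and typed are made up for illustration.

```scala
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.scala._

// Hypothetical shape of the case class; the question does not show its definition.
case class CustomerSegementation(_id: String, last_ordered_date: String)

object TypedStreamSketch {
  // Convert one JSON node into the element type the sink expects.
  // Assumes both fields are present at the top level of the node.
  def toCustomerSegementation(node: ObjectNode): CustomerSegementation =
    CustomerSegementation(
      node.get("_id").asText(),
      node.get("last_ordered_date").asText()
    )

  // Map the stream so that its element type matches
  // ElasticsearchSink[CustomerSegementation].
  def typed(stream: DataStream[ObjectNode]): DataStream[CustomerSegementation] =
    stream.map(toCustomerSegementation _)
}
```

With something like this in place, the sink would be attached to the mapped stream, for example TypedStreamSketch.typed(stream).addSink(getEsSink). Note that getEsSink is declared without a parameter list, so it is referenced without parentheses.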
















Tags: elasticsearch, apache-flink, flink-streaming






asked Nov 12 '18 at 16:49 by bhargava ravali koganti
edited Nov 13 '18 at 6:41 by Navin Rawat







