Kafka Streams DSL: add an optional parameter to disable repartitioning when using `map`, `selectKey`, `groupBy`
According to the documentation, a stream is marked for repartitioning whenever `map`, `selectKey`, or `groupBy` is applied, even if the new key is already partitioned appropriately. Is it possible to add an optional parameter to disable repartitioning?
Here is my use case: there is a topic that is partitioned by `user_id`.
# topic 'user', format '%key,%value'
partition-1:
user1,{'user_id':'user1', 'device_id':'device1'}
user1,{'user_id':'user1', 'device_id':'device1'}
user1,{'user_id':'user1', 'device_id':'device2'}
partition-2:
user2,{'user_id':'user2', 'device_id':'device3'}
user2,{'user_id':'user2', 'device_id':'device4'}
I want to count (`user_id`, `device_id`) pairs using the DSL as follows:
stream
    // Re-key each record by "user_id&device_id", derived from the JSON value.
    .groupBy((user_id, value) -> {
        JSONObject event = new JSONObject(value);
        String userId = event.getString("user_id");
        String deviceId = event.getString("device_id");
        return String.format("%s&%s", userId, deviceId);
    })
    .count();
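In effect, the new key is already partitioned correctly: the topic is partitioned by `user_id`, so all records with the same `user_id` and `device_id` pair are already in the same partition. There is no need to repartition.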
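To make the claim above concrete, here is a minimal plain-Java sketch (not Kafka Streams code). It assumes a hypothetical `partitionFor` function standing in for whatever deterministic partitioner placed records by `user_id`, counts the pairs inside each partition independently, and fails if any pair turns up in more than one partition. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CoPartitionCheck {
    // Assumption: a stand-in for the producer's partitioner. Any deterministic
    // function of user_id works for this argument; this is not Kafka's own hash.
    static int partitionFor(String userId, int numPartitions) {
        return Math.floorMod(userId.hashCode(), numPartitions);
    }

    // Counts "user_id&device_id" pairs separately inside each partition, then
    // merges the local results. Throws if the same pair appears in two
    // partitions, which is the only case where a shuffle would be required.
    static Map<String, Long> countPerPartition(List<String[]> events, int numPartitions) {
        List<Map<String, Long>> local = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            local.add(new TreeMap<>());
        }
        for (String[] e : events) {
            String pair = e[0] + "&" + e[1]; // e[0] = user_id, e[1] = device_id
            local.get(partitionFor(e[0], numPartitions)).merge(pair, 1L, Long::sum);
        }
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> counts : local) {
            for (Map.Entry<String, Long> entry : counts.entrySet()) {
                if (merged.putIfAbsent(entry.getKey(), entry.getValue()) != null) {
                    throw new IllegalStateException("pair split across partitions: " + entry.getKey());
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // The five records from the example topic above.
        List<String[]> events = List.of(
            new String[] {"user1", "device1"},
            new String[] {"user1", "device1"},
            new String[] {"user1", "device2"},
            new String[] {"user2", "device3"},
            new String[] {"user2", "device4"});
        System.out.println(countPerPartition(events, 2));
        // prints {user1&device1=2, user1&device2=1, user2&device3=1, user2&device4=1}
    }
}
```

Because the pair key contains `user_id`, two records with the same pair always share a `user_id` and therefore a partition, so the per-partition counts are already the final counts. This only holds while the composite key fully determines the original partitioning key.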
apache-kafka-streams
asked Nov 13 '18 at 4:47
Lifei Chen
edited Nov 15 '18 at 2:17
1 Answer
If you use `groupBy()`, it always causes data repartitioning. If possible, use `groupByKey()` instead, which repartitions data only if required.
In your case, you are changing the key anyway, so a repartition topic will be created.
answered Nov 14 '18 at 18:51
Nishu Tayal
In fact, the data is already partitioned indirectly: it is partitioned by user_id, so records with the same user_id_device_id are also in the same partition, and repartitioning is not needed. Besides, I do not want to change the key of the Kafka record; I just want the key used for aggregation in Kafka Streams to be user_id_device_id rather than user_id.
– Lifei Chen
Nov 15 '18 at 2:14
groupByKey/aggregation won't change the key of the source topic. Instead, it creates a repartition topic as an internal topic, which belongs to the application.id.
– Nishu Tayal
Nov 15 '18 at 8:36
Yes. My suggestion is that the Kafka Streams developers add an optional parameter to disable repartitioning (because it generates repartition topics), so that the aggregation key does not have to be the record key itself but can also be a key derived from the record value.
– Lifei Chen
Nov 15 '18 at 9:33
No, that is not possible in the Streams DSL directly at the moment. You can use the low-level Processor API to implement custom logic that avoids the repartitioning.
– Nishu Tayal
Nov 15 '18 at 10:00
Thanks, I will accept the answer. If features for this are added later, please mention them in the comments. :)
– Lifei Chen
Nov 15 '18 at 10:06