SparkSQL subquery and performance























To let users of the system dynamically create (via the application's web UI) different data dictionaries with auxiliary data, I use DataFrames and expose them as temporary views, for example:



Seq("Italy", "France", "United States", "Spain").toDF("country").createOrReplaceTempView("big_countries")
Seq("Poland", "Hungary", "Spain").toDF("country").createOrReplaceTempView("medium_countries")


The number of these dictionaries is limited only by the users' imagination and business needs.



After that, users also create different queries that may use conditions based on the previously defined auxiliary data, for example these SQL WHERE conditions:



Q1: country IN (FROM medium_countries)
Q2: (TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL) AND phone = '+91-9111999998'
Q3: TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL
......
Qn: name = 'Donald' AND email = 'donald@example.com' AND phone = '+1-2222222222'


The number of these queries is limited only by the users' imagination and business needs.



My biggest worry right now is subqueries like country IN (FROM medium_countries).



According to the system design I can't use an explicit JOIN here, so I am limited to subqueries. These auxiliary data tables should typically be relatively small: a few thousand rows each in the worst case, and a few hundred tables in total in the worst case. Taking this into account, can this approach lead to performance issues, and are there any techniques that can optimize the process, such as caching these dictionaries in memory?



UPDATED



Right now I can test it only in Spark Local Mode



Query:



country IN (FROM big_countries)


Execution plan:



tag: big
== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) BroadcastHashJoin [country#22], [country#3], LeftSemi, BuildRight
   :- *(1) Project [country#22, unique_id#27L]
   :  +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- LocalTableScan [country#3]


Query:



TRUE = ((country IN (FROM medium_countries)) AND (country IN (FROM big_countries))) AND EMAIL IS NOT NULL


Execution plan:



tag: big
== Physical Plan ==
*(1) Project [unique_id#27L]
+- *(1) Filter (true = (exists#53 && exists#54))
   +- *(1) BroadcastHashJoin [country#22], [country#3], ExistenceJoin(exists#54), BuildRight
      :- *(1) BroadcastHashJoin [country#22], [country#8], ExistenceJoin(exists#53), BuildRight
      :  :- *(1) Project [country#22, unique_id#27L]
      :  :  +- *(1) Filter isnotnull(EMAIL#20)
      :  :     +- LocalTableScan [name#19, email#20, phone#21, country#22, unique_id#27L]
      :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      :     +- LocalTableScan [country#8]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
         +- LocalTableScan [country#3]

































  • Could you post the physical execution plan of the DataFrame, i.e. the output of DataFrame.explain?
    – alexeipab
    Nov 11 at 8:36










  • Sure, updated the question
    – alexanoid
    Nov 11 at 8:44















apache-spark apache-spark-sql














edited Nov 11 at 12:26

























asked Nov 11 at 7:50









alexanoid













1 Answer




















accepted










I think that:



CACHE TABLE tbl  as in sql("CACHE TABLE tbl")


is what you need to execute after your:



...createOrReplaceTempView....


but before the larger queries of course.
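
As a rough sketch of that ordering (not a definitive implementation), the following registers one of the dictionaries from the question, caches it, and then runs a query against it. The `users` view, its rows, the object name, and the app name are hypothetical and exist only for illustration; the subquery is written as `SELECT country FROM ...` rather than the shorthand used in the question.

```scala
import org.apache.spark.sql.SparkSession

object CachedDictionaryExample {
  // Registers a small dictionary as a temp view, caches it, and returns
  // the names whose country appears in that dictionary.
  def namesInMediumCountries(spark: SparkSession): Seq[String] = {
    import spark.implicits._

    Seq("Poland", "Hungary", "Spain").toDF("country")
      .createOrReplaceTempView("medium_countries")
    // CACHE TABLE materializes the cache immediately unless LAZY is specified.
    spark.sql("CACHE TABLE medium_countries")

    // Hypothetical main data set, for illustration only.
    Seq(("Donald", "Spain"), ("Anna", "Italy")).toDF("name", "country")
      .createOrReplaceTempView("users")

    spark.sql(
      "SELECT name FROM users WHERE country IN (SELECT country FROM medium_countries)"
    ).as[String].collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cached-dictionary-example") // hypothetical app name
      .master("local[*]")
      .getOrCreate()
    try {
      println(namesInMediumCountries(spark))
      println(spark.catalog.isCached("medium_countries"))
    } finally {
      // Release the cached dictionary once it is no longer needed.
      spark.catalog.uncacheTable("medium_countries")
      spark.stop()
    }
  }
}
```

With a few hundred dictionaries it may be worth uncaching the ones that are no longer referenced, as the `finally` block does here.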



In current Spark versions, the CACHE TABLE statement above is eager by default, not lazy. As the manual states, you don't need to trigger cache materialization manually anymore; that is, there is no longer any need to execute a df.show or df.count.



Once the data is in memory, it does not need to be fetched again for every query until you refresh it explicitly. Here it looks like there is no filtering on the dictionaries, so the whole limited set of data is simply loaded once.



Without knowing your design, but judging from what you posted, the sub-query should be fine. Try this approach and look at the physical plan. In a traditional RDBMS, this type of limited sub-query is, from what I can see, also not a deal breaker.



You can also see from the physical plan that the Catalyst optimizer has already converted your IN sub-query to a JOIN (a BroadcastHashJoin of type LeftSemi or ExistenceJoin), a typical performance improvement for larger data sets.
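
To make that rewrite more concrete, here is a conceptual plain-Scala sketch of what a broadcast hash left semi join does: the small side is turned into a hash lookup once, and each row of the large side is probed against it. This is not Spark's actual implementation, it ignores SQL NULL semantics, and `Person`, `leftSemi`, and the sample rows are hypothetical names used only for illustration.

```scala
object SemiJoinSketch {
  final case class Person(name: String, country: String)

  // Conceptual equivalent of a broadcast hash left semi join:
  // build a hash set from the small side once, then probe it per row.
  def leftSemi(people: Seq[Person], dictionary: Seq[String]): Seq[Person] = {
    val lookup: Set[String] = dictionary.toSet
    people.filter(p => lookup.contains(p.country))
  }

  def main(args: Array[String]): Unit = {
    val bigCountries = Seq("Italy", "France", "United States", "Spain")
    val people = Seq(Person("Donald", "Spain"), Person("Anna", "Poland"))
    println(leftSemi(people, bigCountries))
  }
}
```

Each probe is a constant-time lookup on average, which is why a dictionary of a few thousand rows adds little cost per row of the main data set.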



As a result, the smaller tables are broadcast to the executors on the worker nodes, which improves performance as well. You probably do not need to set any broadcast threshold; you could set it explicitly, but my take is that this is not required, based on what I observe.
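
If you did want to set the threshold explicitly, a minimal sketch could look like the following. It uses the `spark.sql.autoBroadcastJoinThreshold` setting, whose value is in bytes; the 10 MB shown matches Spark's documented default, and the object and app names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastThresholdExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-threshold-example") // hypothetical app name
      .master("local[*]")
      // Tables estimated to be smaller than this many bytes are broadcast.
      // Setting the value to -1 disables automatic broadcast joins.
      .config("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
      .getOrCreate()

    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.stop()
  }
}
```

For dictionaries of a few thousand short strings the default should already be large enough, so this is only relevant if the plan stops showing BroadcastHashJoin.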



























    Your Answer






    StackExchange.ifUsing("editor", function () {
    StackExchange.using("externalEditor", function () {
    StackExchange.using("snippets", function () {
    StackExchange.snippets.init();
    });
    });
    }, "code-snippets");

    StackExchange.ready(function() {
    var channelOptions = {
    tags: "".split(" "),
    id: "1"
    };
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function() {
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled) {
    StackExchange.using("snippets", function() {
    createEditor();
    });
    }
    else {
    createEditor();
    }
    });

    function createEditor() {
    StackExchange.prepareEditor({
    heartbeatType: 'answer',
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader: {
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    },
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    });


    }
    });














    draft saved

    draft discarded


















    StackExchange.ready(
    function () {
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53246811%2fsparksql-subquery-and-performance%23new-answer', 'question_page');
    }
    );

    Post as a guest















    Required, but never shown

























    1 Answer
    1






    active

    oldest

    votes








    1 Answer
    1






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes








    up vote
    1
    down vote



    accepted










    I think that:



    CACHE TABLE tbl  as in sql("CACHE TABLE tbl")


    is what you need to be executed after your:



    ...createOrReplaceTempView....


    but before the larger queries of course.



    In SPARK now, the above statement on "caching" is now eager by default, not lazy. As the manual states you don’t need to trigger cache materialization manually anymore. That is to say, no longer the need to execute a df.show or df.count.



    Once in memory - the until you refresh explicitly, this data need not be gotten every time again and here it looks like no filtering, rather just load all the limited set of data once.



    Not knowing your design but looking at it, the sub-query should be fine. Try this approach and look at the Physical Plan. In traditional RDBMS's, this type of limited sub-query - from what I can see - is also not a deal breaker.



    You can also see in the Physical Plan that the Catalyst Optimizer has already converted your IN sub-query to a JOIN, a typical performance improvement for larger data sets.
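To check this on your own queries, something along these lines could be used. It is only a sketch: the people view and its rows are hypothetical stand-ins for your main data set, and the sub-query is written out as SELECT country FROM big_countries rather than the shorter form used in the question:

```scala
import org.apache.spark.sql.SparkSession

object ExplainInSubqueryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("explain-in-subquery-example") // hypothetical name
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Dictionary from the question.
    Seq("Italy", "France", "United States", "Spain")
      .toDF("country")
      .createOrReplaceTempView("big_countries")

    // Hypothetical main table; the real one comes from your application.
    Seq(("Donald", "United States"), ("Anna", "Poland"))
      .toDF("name", "country")
      .createOrReplaceTempView("people")

    val filtered = spark.sql(
      """SELECT *
        |FROM people
        |WHERE country IN (SELECT country FROM big_countries)""".stripMargin)

    // Prints the physical plan. The IN sub-query normally shows up as a
    // left semi join, often a broadcast hash join when the dictionary is small.
    filtered.explain()

    spark.stop()
  }
}
```

The exact operator names in the output depend on the Spark version and on table sizes, so treat the comment about the join type as what is usually seen rather than a guarantee.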



    As a result, the smaller tables are broadcast to the executors on the worker nodes, which improves performance as well. You probably do not need to set a broadcast limit. You could set one explicitly, but my take, based on what I observe, is that this is not required.
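If you do want to inspect or change that limit, the relevant setting is spark.sql.autoBroadcastJoinThreshold, which is documented as 10 MB by default. A small sketch, assuming a local session; the 20 MB value is an arbitrary example, not a recommendation:

```scala
import org.apache.spark.sql.SparkSession

object BroadcastThresholdExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-threshold-example") // hypothetical name
      .master("local[*]")
      .getOrCreate()

    val key = "spark.sql.autoBroadcastJoinThreshold"

    // Current limit: tables estimated below this size are broadcast automatically.
    println(s"current threshold: ${spark.conf.get(key)}")

    // Raise the limit to 20 MB for this session (arbitrary example value).
    spark.conf.set(key, 20L * 1024 * 1024)
    println(s"new threshold: ${spark.conf.get(key)}")

    // Setting the value to -1 would disable automatic broadcasting altogether.

    spark.stop()
  }
}
```

Dictionaries of a few thousand short strings are far below the default, which is why leaving the setting alone is likely fine here.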
















        answered Nov 12 at 18:43









        thebluephantom






























