Pyspark: PicklingError: Could not serialize object:























I have the following two data frames: df_whitelist and df_text



+-------+--------------------+
|keyword| whitelist_terms |
+-------+--------------------+
| LA| LA city|
| LA| US LA in da |
| client|this client has i...|
| client|our client has do...|
+-------+--------------------+
+--------------------+----------+
| Text| Keywords|
+--------------------+----------+
|the client as ada...|client;ada|
|this client has l...| client;LA|
+--------------------+----------+


In df_whitelist, each keyword corresponds to a set of terms, e.g. keyword LA corresponds to “LA city” and “US LA in da”.
In df_text, I have text and some keywords found in that text.
What I want to do is, for each piece of text such as “the client has ada...”, and for each of its keywords (“client” and “ada”), check through all the whitelist terms for that keyword to see how many times each term occurs in the text.
What I have tried is the following:



import pyspark.sql.functions as F
import pyspark.sql.types as T
import re

def whitelisting(text, listOfKeyword, df_whitelist):
    keywords = listOfKeyword.split(";")
    found_whiteterms_count = 0
    for k in keywords:
        if df_whitelist.filter(df_whitelist.keyword == k).count() == 0:
            found_whiteterms_count = found_whiteterms_count + 0
        else:
            df = df_whitelist.filter(df_whitelist.keyword == k).select("whitelist_terms")
            n = df.rdd.map(lambda x: len(re.findall(x["whitelist_terms"], text))).reduce(lambda x, y: x + y)
            found_whiteterms_count = found_whiteterms_count + n
    return found_whiteterms_count

whitelisting_udf = F.udf(lambda text, listOfKeyword: whitelisting(text, listOfKeyword, df_whitelist), T.IntegerType())
text.withColumn("whitelist_counts", whitelisting_udf(text.Text, text.Keywords))


and I got the error:



PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1153.__getstate__. Trace:
py4j.Py4JException: Method __getstate__() does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.base/java.lang.Thread.run(Thread.java:844)


I cannot figure it out after trying for some time. Could anybody kindly point out the problem and how to fix it? Thanks.




































      pyspark pickle user-defined-functions






asked Nov 12 '17 at 13:21 by user1740987
1 Answer

















accepted










You are passing a PySpark DataFrame, df_whitelist, to a UDF; PySpark DataFrames cannot be pickled. You are also performing DataFrame operations inside the UDF, which is not possible. Keep in mind that your function will be called as many times as there are rows in your DataFrame, so you should keep the computation simple, and resort to a UDF only when the task cannot be done with PySpark SQL functions.
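If you did need per-row access to the whitelist inside a UDF, one common workaround (a sketch, assuming the whitelist is small enough to collect to driver memory; the helper names below are hypothetical) is to collect df_whitelist into a plain Python dict and close over that instead of the DataFrame — a dict pickles fine:

```python
# Sketch: collect the whitelist to the driver as {keyword: [terms]},
# then let the UDF capture only that plain dict. Assumes df_whitelist
# is small; the function names here are illustrative, not from the post.
from collections import defaultdict

def build_whitelist_map(rows):
    # rows: iterable of (keyword, whitelist_term) pairs, e.g. obtained via
    # df_whitelist.rdd.map(lambda r: (r.keyword, r.whitelist_terms)).collect()
    terms_by_keyword = defaultdict(list)
    for keyword, term in rows:
        terms_by_keyword[keyword].append(term)
    return dict(terms_by_keyword)

def count_whitelist_terms(text, keyword_list, terms_by_keyword):
    # Pure-Python counting: for each keyword attached to the text,
    # count occurrences of every whitelisted term for that keyword.
    total = 0
    for k in keyword_list.split(";"):
        for term in terms_by_keyword.get(k, []):
            total += text.count(term)
    return total

# The UDF would then capture only the dict, avoiding the PicklingError:
# whitelisting_udf = F.udf(
#     lambda text, kws: count_whitelist_terms(text, kws, terms_by_keyword),
#     T.IntegerType())
```

That said, the join-based approach below keeps all the work in Spark and scales to whitelists that do not fit on the driver.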



What you need to do instead is join the two DataFrames on keyword.
Let's start with the two sample DataFrames you provided:



df_whitelist = spark.createDataFrame(
    [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]],
    ["keyword", "whitelist_terms"])
df_text = spark.createDataFrame(
    [["the client as ada", "client;ada"], ["this client has l", "client;LA"]],
    ["Text", "Keywords"])


The Keywords column in df_text needs some processing: we have to transform the string into an array and then explode it so that we have one keyword per row:



          import pyspark.sql.functions as F
          df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))

          +-----------------+-------+
          | Text|keyword|
          +-----------------+-------+
          |the client as ada| client|
          |the client as ada| ada|
          |this client has l| client|
          |this client has l| LA|
          +-----------------+-------+


          Now we can join the two data frames on keyword:



          df = df_text.join(df_whitelist, "keyword", "leftouter")

          +-------+-----------------+-----------------+
          |keyword| Text| whitelist_terms|
          +-------+-----------------+-----------------+
          | LA|this client has l| LA city|
          | LA|this client has l| US LA in da|
          | ada|the client as ada| null|
          | client|the client as ada|this client has i|
          | client|the client as ada| our client|
          | client|this client has l|this client has i|
          | client|this client has l| our client|
          +-------+-----------------+-----------------+



• The first condition in your UDF translates as follows: if a keyword in df_text is not present in df_whitelist, count 0. In the left join, this is equivalent to the df_whitelist columns being NULL, since those keywords appear only in the left data frame.


• The second condition: you count the number of times each whitelist_terms value appears in Text, i.e. Text.count(whitelist_terms).



          We'll write a UDF to do this:



          from pyspark.sql.types import IntegerType
          count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
df = df.select(
    "Text",
    "keyword",
    F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))

          +-----------------+-------+----------------+
          | Text|keyword|whitelist_counts|
          +-----------------+-------+----------------+
          |this client has l| LA| 0|
          |this client has l| LA| 0|
          |the client as ada| ada| 0|
          |the client as ada| client| 0|
          |the client as ada| client| 0|
          |this client has l| client| 0|
          |this client has l| client| 0|
          +-----------------+-------+----------------+


Finally, we can aggregate to get back to a data frame with one row per distinct Text:



res = df.groupBy("Text").agg(
    F.collect_set("keyword").alias("Keywords"),
    F.sum("whitelist_counts").alias("whitelist_counts"))
res.show()
          res.show()

          +-----------------+-------------+----------------+
          | Text| Keywords|whitelist_counts|
          +-----------------+-------------+----------------+
          |this client has l| [client, LA]| 0|
          |the client as ada|[ada, client]| 0|
          +-----------------+-------------+----------------+


























• Many thanks MaFF, I much appreciate the neat solution and the clear explanation. Just one point: given that there are millions of text items, 50 keywords, and each keyword may have a thousand terms, would the Cartesian join be the most efficient solution, or are there alternatives in Spark with better efficiency?
  – user1740987
  Nov 12 '17 at 16:47










• I am not sure what you mean exactly. A Cartesian product will be slower than a join on a key, since it would drastically increase the number of rows in the resulting data frame. If one of your DataFrames is "small", you should consider broadcasting it (pyspark.sql.functions.broadcast) inside the join expression; it will increase efficiency.
  – MaFF
  Nov 12 '17 at 17:44
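As a concrete illustration of the broadcast hint mentioned in the comment above (a sketch only, reusing the sample DataFrames from the answer and assuming an existing SparkSession; requires a running PySpark environment):

```python
import pyspark.sql.functions as F

# Broadcasting the small whitelist lets Spark execute the join as a
# map-side (broadcast hash) join, avoiding a shuffle of the large df_text.
df = df_text.join(F.broadcast(df_whitelist), "keyword", "leftouter")
```

The rest of the answer's pipeline (the count_terms select and the groupBy aggregation) is unchanged by this hint.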











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f47249292%2fpyspark-picklingerror-could-not-serialize-object%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          4
          down vote



          accepted










          You are passing a pyspark dataframe, df_whitelist to a UDF, pyspark dataframes cannot be pickled. You are also doing computations on a dataframe inside a UDF which is not acceptable (not possible). Keep in mind that your function is going to be called as many times as the number of rows in your dataframe, so you should keep computations simple. and do it only if it couldn't be done with pyspark sql functions.



          What you need to do instead, is to join the two dataframes on keyword.
          Let's start with the two sample dataframes you provided:



          df_whitelist = spark.createDataFrame(
          [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]],
          ["keyword", "whitelist_terms"])
          df_text = spark.createDataFrame(
          [["the client as ada", "client;ada"], ["this client has l", "client;LA"]],
          ["Text", "Keywords"])


          Column Keywords in df_text needs some processing, we have to transform the string into an array and then explode it so that we only have one item per line:



          import pyspark.sql.functions as F
          df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))

          +-----------------+-------+
          | Text|keyword|
          +-----------------+-------+
          |the client as ada| client|
          |the client as ada| ada|
          |this client has l| client|
          |this client has l| LA|
          +-----------------+-------+


          Now we can join the two data frames on keyword:



          df = df_text.join(df_whitelist, "keyword", "leftouter")

          +-------+-----------------+-----------------+
          |keyword| Text| whitelist_terms|
          +-------+-----------------+-----------------+
          | LA|this client has l| LA city|
          | LA|this client has l| US LA in da|
          | ada|the client as ada| null|
          | client|the client as ada|this client has i|
          | client|the client as ada| our client|
          | client|this client has l|this client has i|
          | client|this client has l| our client|
          +-------+-----------------+-----------------+



          • The first condition you invoke in your UDF can be translated as follows: if keyword in df_text is not present in df_whitelist then 0. It is equivalent to saying the value for df_whitelist columns are going to be NULL in the left join since they only appear in the left data frame


          • The second condition: you count the number of times whitelist_terms appear in Text: Text.count(whitelist_terms)



          We'll write a UDF to do this:



          from pyspark.sql.types import IntegerType
          count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
          df = df.select(
          "Text",
          "keyword",
          F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))

          +-----------------+-------+----------------+
          | Text|keyword|whitelist_counts|
          +-----------------+-------+----------------+
          |this client has l| LA| 0|
          |this client has l| LA| 0|
          |the client as ada| ada| 0|
          |the client as ada| client| 0|
          |the client as ada| client| 0|
          |this client has l| client| 0|
          |this client has l| client| 0|
          +-----------------+-------+----------------+


          Finally we can aggregate to get back to a dataframe with only distinct Text:



          res = df.groupBy("Text").agg(
          F.collect_set("keyword").alias("Keywords"),
          F.sum("whitelist_counts").alias("whitelist_counts"))
          res.show()

          +-----------------+-------------+----------------+
          | Text| Keywords|whitelist_counts|
          +-----------------+-------------+----------------+
          |this client has l| [client, LA]| 0|
          |the client as ada|[ada, client]| 0|
          +-----------------+-------------+----------------+





          share|improve this answer





















          • Many many thanks MaFF, much appreciate the neat solution with clear explanation. Just one point, given that there are millions of text items, 50 keywords, and each keyword might be with a thousand terms. Would be the Cartesian join most efficient solution? Or is there still some alternatives in spark for better efficiency?
            – user1740987
            Nov 12 '17 at 16:47










          • I am not sure what you mean exactly. A cartesian product will be slower than a join with a key since you would drastically increase the number of lines in your resulting data frame. If one of your dataframes is "small" you should consider broadcasting it (pyspark.sql.functions.broadcast) inside the join expression, it will increase efficiency
            – MaFF
            Nov 12 '17 at 17:44















          up vote
          4
          down vote



          accepted










          You are passing a pyspark dataframe, df_whitelist to a UDF, pyspark dataframes cannot be pickled. You are also doing computations on a dataframe inside a UDF which is not acceptable (not possible). Keep in mind that your function is going to be called as many times as the number of rows in your dataframe, so you should keep computations simple. and do it only if it couldn't be done with pyspark sql functions.



          What you need to do instead, is to join the two dataframes on keyword.
          Let's start with the two sample dataframes you provided:



          df_whitelist = spark.createDataFrame(
          [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]],
          ["keyword", "whitelist_terms"])
          df_text = spark.createDataFrame(
          [["the client as ada", "client;ada"], ["this client has l", "client;LA"]],
          ["Text", "Keywords"])


          Column Keywords in df_text needs some processing, we have to transform the string into an array and then explode it so that we only have one item per line:



          import pyspark.sql.functions as F
          df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))

          +-----------------+-------+
          | Text|keyword|
          +-----------------+-------+
          |the client as ada| client|
          |the client as ada| ada|
          |this client has l| client|
          |this client has l| LA|
          +-----------------+-------+


          Now we can join the two data frames on keyword:



          df = df_text.join(df_whitelist, "keyword", "leftouter")

          +-------+-----------------+-----------------+
          |keyword| Text| whitelist_terms|
          +-------+-----------------+-----------------+
          | LA|this client has l| LA city|
          | LA|this client has l| US LA in da|
          | ada|the client as ada| null|
          | client|the client as ada|this client has i|
          | client|the client as ada| our client|
          | client|this client has l|this client has i|
          | client|this client has l| our client|
          +-------+-----------------+-----------------+



          • The first condition you invoke in your UDF can be translated as follows: if keyword in df_text is not present in df_whitelist then 0. It is equivalent to saying the value for df_whitelist columns are going to be NULL in the left join since they only appear in the left data frame


          • The second condition: you count the number of times whitelist_terms appear in Text: Text.count(whitelist_terms)



          We'll write a UDF to do this:



          from pyspark.sql.types import IntegerType
          count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
          df = df.select(
          "Text",
          "keyword",
          F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))

          +-----------------+-------+----------------+
          | Text|keyword|whitelist_counts|
          +-----------------+-------+----------------+
          |this client has l| LA| 0|
          |this client has l| LA| 0|
          |the client as ada| ada| 0|
          |the client as ada| client| 0|
          |the client as ada| client| 0|
          |this client has l| client| 0|
          |this client has l| client| 0|
          +-----------------+-------+----------------+


          Finally we can aggregate to get back to a dataframe with only distinct Text:



          res = df.groupBy("Text").agg(
          F.collect_set("keyword").alias("Keywords"),
          F.sum("whitelist_counts").alias("whitelist_counts"))
          res.show()

          +-----------------+-------------+----------------+
          | Text| Keywords|whitelist_counts|
          +-----------------+-------------+----------------+
          |this client has l| [client, LA]| 0|
          |the client as ada|[ada, client]| 0|
          +-----------------+-------------+----------------+





          share|improve this answer





















          • Many many thanks MaFF, much appreciate the neat solution with clear explanation. Just one point, given that there are millions of text items, 50 keywords, and each keyword might be with a thousand terms. Would be the Cartesian join most efficient solution? Or is there still some alternatives in spark for better efficiency?
            – user1740987
            Nov 12 '17 at 16:47










          • I am not sure what you mean exactly. A cartesian product will be slower than a join with a key since you would drastically increase the number of lines in your resulting data frame. If one of your dataframes is "small" you should consider broadcasting it (pyspark.sql.functions.broadcast) inside the join expression, it will increase efficiency
            – MaFF
            Nov 12 '17 at 17:44













          up vote
          4
          down vote



          accepted







          up vote
          4
          down vote



          accepted






          You are passing a pyspark dataframe, df_whitelist to a UDF, pyspark dataframes cannot be pickled. You are also doing computations on a dataframe inside a UDF which is not acceptable (not possible). Keep in mind that your function is going to be called as many times as the number of rows in your dataframe, so you should keep computations simple. and do it only if it couldn't be done with pyspark sql functions.



          What you need to do instead, is to join the two dataframes on keyword.
          You are passing a PySpark dataframe, df_whitelist, to a UDF; PySpark dataframes cannot be pickled, which is exactly what raises the PicklingError. You are also running dataframe operations inside the UDF, which is not possible: the DataFrame API lives on the driver and is not available inside functions executed on the workers. Keep in mind that your function will be called once per row of your dataframe, so keep the computation simple, and reach for a UDF only when the job cannot be done with PySpark SQL functions.



          What you need to do instead is join the two dataframes on keyword.
          Let's start with the two sample dataframes you provided:



          df_whitelist = spark.createDataFrame(
              [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]],
              ["keyword", "whitelist_terms"])
          df_text = spark.createDataFrame(
              [["the client as ada", "client;ada"], ["this client has l", "client;LA"]],
              ["Text", "Keywords"])


          The Keywords column in df_text needs some processing: we have to split the string into an array and then explode it so that we have one keyword per line:



          import pyspark.sql.functions as F
          df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))

          +-----------------+-------+
          | Text|keyword|
          +-----------------+-------+
          |the client as ada| client|
          |the client as ada| ada|
          |this client has l| client|
          |this client has l| LA|
          +-----------------+-------+
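          Without a Spark session at hand, the split-and-explode step can be sketched in plain Python (this is only an illustration of the logic, not Spark code; the rows mirror the example dataframes above):

          ```python
          # Plain-Python sketch of F.split + F.explode: split Keywords on ";"
          # and emit one (Text, keyword) row per item.
          rows = [("the client as ada", "client;ada"), ("this client has l", "client;LA")]

          exploded = [(text, kw) for text, kws in rows for kw in kws.split(";")]
          print(exploded)
          ```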


          Now we can join the two data frames on keyword:



          df = df_text.join(df_whitelist, "keyword", "leftouter")

          +-------+-----------------+-----------------+
          |keyword| Text| whitelist_terms|
          +-------+-----------------+-----------------+
          | LA|this client has l| LA city|
          | LA|this client has l| US LA in da|
          | ada|the client as ada| null|
          | client|the client as ada|this client has i|
          | client|the client as ada| our client|
          | client|this client has l|this client has i|
          | client|this client has l| our client|
          +-------+-----------------+-----------------+
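          The left outer join semantics can likewise be checked with a plain-Python sketch (an illustration only, using the same sample rows): every row of the left dataframe survives, matched against all whitelist terms, with a null term when there is no match.

          ```python
          # Plain-Python sketch of a left outer join on keyword.
          df_text_rows = [("the client as ada", "client"), ("the client as ada", "ada"),
                          ("this client has l", "client"), ("this client has l", "LA")]
          whitelist_rows = [("LA", "LA city"), ("LA", "US LA in da"),
                            ("client", "this client has i"), ("client", "our client")]

          joined = []
          for text, kw in df_text_rows:
              # unmatched left rows keep a single None, like the null above
              terms = [t for k, t in whitelist_rows if k == kw] or [None]
              joined.extend((kw, text, t) for t in terms)

          print(len(joined))  # 7 rows, matching the table above
          ```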



          • The first condition you invoke in your UDF can be translated as follows: if the keyword in df_text is not present in df_whitelist, return 0. In the left join this is equivalent to the df_whitelist columns being NULL, since those keywords only appear in the left dataframe.


          • The second condition: you count the number of times whitelist_terms appears in Text, i.e. Text.count(whitelist_terms)



          We'll write a UDF to do this:



          from pyspark.sql.types import IntegerType
          count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
          df = df.select(
              "Text",
              "keyword",
              F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))

          +-----------------+-------+----------------+
          | Text|keyword|whitelist_counts|
          +-----------------+-------+----------------+
          |this client has l| LA| 0|
          |this client has l| LA| 0|
          |the client as ada| ada| 0|
          |the client as ada| client| 0|
          |the client as ada| client| 0|
          |this client has l| client| 0|
          |this client has l| client| 0|
          +-----------------+-------+----------------+
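          The UDF body is just Python's str.count with a guard for null terms; as a standalone sketch:

          ```python
          # Plain-Python version of the counting UDF: number of non-overlapping
          # occurrences of term in text, 0 when the term is null.
          def count_terms(text, term):
              return text.count(term) if term is not None else 0

          print(count_terms("the client as ada", "client"))
          print(count_terms("the client as ada", None))
          ```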


          Finally, we can aggregate to get back to a dataframe with one row per distinct Text:



          res = df.groupBy("Text").agg(
              F.collect_set("keyword").alias("Keywords"),
              F.sum("whitelist_counts").alias("whitelist_counts"))
          res.show()

          +-----------------+-------------+----------------+
          | Text| Keywords|whitelist_counts|
          +-----------------+-------------+----------------+
          |this client has l| [client, LA]| 0|
          |the client as ada|[ada, client]| 0|
          +-----------------+-------------+----------------+
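          The aggregation (collect_set of keywords plus sum of counts per Text) can also be sketched in plain Python, again purely as an illustration of the logic:

          ```python
          # Plain-Python sketch of groupBy("Text").agg(collect_set, sum).
          rows = [("this client has l", "LA", 0), ("the client as ada", "ada", 0),
                  ("the client as ada", "client", 0), ("this client has l", "client", 0)]

          agg = {}
          for text, kw, n in rows:
              kws, total = agg.get(text, (set(), 0))
              agg[text] = (kws | {kw}, total + n)  # sets deduplicate like collect_set

          print(agg)
          ```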






          answered Nov 12 '17 at 15:46 by MaFF

          • Many thanks MaFF, much appreciated: a neat solution with a clear explanation. One point: given millions of text items, 50 keywords, and up to a thousand terms per keyword, would the Cartesian join be the most efficient solution, or are there alternatives in Spark with better efficiency?
            – user1740987
            Nov 12 '17 at 16:47










          • I am not sure what you mean exactly. A Cartesian product will be slower than a join on a key, since it would drastically increase the number of rows in the resulting dataframe. If one of your dataframes is "small", you should consider broadcasting it (pyspark.sql.functions.broadcast) inside the join expression; it will increase efficiency.
            – MaFF
            Nov 12 '17 at 17:44

















