How to use PySpark to stream data into a MySQL database?

I am working on a single-page web app that lets users upload large CSV files (I am currently testing with a ~7 GB file) to a Flask server, which should then stream the dataset into a database. The upload takes about a minute, and the file is fully saved to a temporary file on the Flask server. Now I need to stream this file and store it in a database.

I did some research and found that PySpark is great for streaming data, and I chose MySQL as the target database (but I am open to other databases and streaming methods). I am a junior dev and new to PySpark, so I'm not sure how to go about this. The Spark Streaming guide says that data must be ingested through a source like Kafka, Flume, TCP sockets, etc., so I am wondering whether I have to use one of those to get my CSV file into Spark.

However, I came across this great example where they stream CSV data into an Azure SQL database, and it looks like they just read the file directly with Spark, without ingesting it through a streaming source like Kafka. The only thing that confuses me about that example is that they use an HDInsight Spark cluster to stream data into the database, and I'm not sure how to incorporate all of that with a Flask server.

I apologize for the lack of code, but currently I just have a Flask server file with one route that handles the file upload. Any examples, tutorials, or advice would be greatly appreciated.


      python mysql flask pyspark spark-streaming







      asked Nov 12 '18 at 18:50 by Mario (edited Nov 12 '18 at 19:13)

          1 Answer

          I am not sure about the streaming part, but Spark handles large files efficiently, and it writes to a database table in parallel. So, without knowing more about your setup, and provided that the uploaded file is available on your server, this is how I would start saving a big structured file such as a CSV into a table:

          from pyspark import SparkConf
          from pyspark.sql import SparkSession
          from pyspark.sql import functions as F

          # start with some basic spark configuration, e.g. we want the timezone to be UTC
          conf = SparkConf()
          conf.set('spark.sql.session.timeZone', 'UTC')
          # this is important: you need the MySQL JDBC connector jar (Connector/J) for the right mysql version:
          conf.set('spark.jars', 'path to mysql connector jar you can download from here: https://dev.mysql.com/downloads/connector/j/')
          # instantiate a spark session: the first time it will take a few seconds
          spark = (
              SparkSession.builder
              .config(conf=conf)
              .appName('Huge File uploader')
              .getOrCreate()
          )

          # read the file first as a dataframe; header=True takes the column names from the first line
          df = spark.read.csv('path to 7GB/ huge csv file', header=True)

          # optionally, add a filename column
          df = df.withColumn('filename', F.lit('thecurrentfilename'))

          # write it to the table
          df.write.format('jdbc').options(
              url='jdbc:mysql://host:port/database',  # a JDBC URL; MySQL's default port is 3306
              driver='com.mysql.cj.jdbc.Driver',  # the driver for MySQL
              dbtable='the table name to save to',
              user='user',
              password='secret',
          ).mode('append').save()

          Note the mode 'append' here: the catch is that Spark cannot update existing rows in a table. It can either append the new rows or, with mode 'overwrite', replace what is in the table.

          So, if your CSV has a header like this:

          id, name, address....

          you will end up with a table that has the same fields (plus the optional filename column).
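To see in advance which fields the table would get, one option is to peek at the first line of the file with Python's standard `csv` module before handing it to Spark. The sketch below is only an illustration: `peek_header` is a hypothetical helper name, and the in-memory sample stands in for the uploaded file.

```python
import csv
import io


def peek_header(lines):
    """Return the stripped column names from the first CSV row."""
    reader = csv.reader(lines)
    header = next(reader)
    return [name.strip() for name in header]


# A tiny in-memory stand-in for the uploaded file (made-up sample data).
sample = io.StringIO("id, name, address\n1, Ann, 1 Main St\n")
columns = peek_header(sample)
print(len(columns), columns)  # prints: 3 ['id', 'name', 'address']
```

For a real upload you would pass an open file instead, e.g. `open(path, newline='')`. Only the first row is read, so this stays cheap even for a multi-gigabyte file.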



          This is the most basic example I could think of to get you started with Spark, leaving aside Spark clusters and anything else related. I would suggest you take this for a spin and see whether it suits your needs :)

          Also, keep in mind that this might take a few seconds or more, depending on your data, where the database is located, your machine, and your database load, so it might be a good idea to keep things asynchronous with your API. Again, I don't know about any of your other details.
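As a rough illustration of keeping things asynchronous, the sketch below starts the load on a background thread and lets the API poll for its status. It uses only the standard library. The names `load_csv_into_db`, `submit_load` and `job_status` are hypothetical, the load function is a placeholder for the Spark job, and the single worker thread is an assumption so that only one heavy load runs at a time.

```python
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

# One worker so that only a single heavy load runs at a time (an assumption).
executor = ThreadPoolExecutor(max_workers=1)
jobs = {}  # job id -> Future


def load_csv_into_db(path):
    """Hypothetical placeholder for the Spark read + JDBC write."""
    time.sleep(0.1)  # stands in for the long-running load
    return path


def submit_load(path):
    """Start the load in the background and return a job id immediately."""
    job_id = uuid.uuid4().hex
    jobs[job_id] = executor.submit(load_csv_into_db, path)
    return job_id


def job_status(job_id):
    """Report 'running', 'done' or 'failed' for a previously submitted job."""
    future = jobs[job_id]
    if not future.done():
        return 'running'
    return 'failed' if future.exception() else 'done'
```

An upload route could call `submit_load(path)` once the file is saved and return the job id right away, while a second route returns `job_status(job_id)`. For anything beyond a single server process, a proper task queue would probably be a better fit than an in-memory dict.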



          Hope this helps. Good luck!






          answered Nov 13 '18 at 11:18 by mkaran

          • Thank you for your detailed response, @mkaran! I gave it a shot, but the last step (df.write) fails with this error: "Row size too large (> 8126). Changing some columns to TEXT or BLOB may help. In current row format, BLOB prefix of 0 bytes is stored inline."

            – Mario
            Nov 15 '18 at 16:39













          • It looks like it is a MySQL error. I was using MySQL version 8.0 and tried everything suggested on this post and on other posts, and I also restarted my computer and the MySQL server several times, but nothing worked. I also downgraded to MySQL version 5.7, but I still got the same error.

            – Mario
            Nov 15 '18 at 16:44













          • I cannot seem to write into my MySQL database so my only other option would be to use another db that pyspark can connect to.

            – Mario
            Nov 15 '18 at 16:46











          • Hi @Mario, can you please tell me what the output of print(len(df.columns)) and print(df.count()) is after the read.csv?

            – mkaran
            Nov 16 '18 at 12:26




















