How to use PySpark to stream data into MySQL database?
I am currently working on a single-page web app that allows users to upload large CSV files (currently testing with a ~7GB file) to a Flask server, and then stream that dataset into a database. The upload takes about a minute, and the file gets fully saved to a temporary file on the Flask server. Now I need to be able to stream this file and store it in a database. I did some research and found that PySpark is great for streaming data, and I am choosing MySQL as the database to stream the CSV data into (but I am open to other databases and streaming methods). I am a junior dev and new to PySpark, so I'm not sure how to go about this. The Spark Streaming guide says that data must be ingested through a source like Kafka, Flume, TCP sockets, etc., so I am wondering if I have to use one of those methods to get my CSV file into Spark. However, I came across a great example where they stream CSV data into an Azure SQL database, and it looks like they just read the file directly using Spark, without needing to ingest it through a streaming source like Kafka. The only thing that confuses me about that example is that they use an HDInsight Spark cluster to stream data into the database, and I'm not sure how to incorporate all of that with a Flask server. I apologize for the lack of code; currently I just have a Flask server file with one route doing the file upload. Any examples, tutorials, or advice would be greatly appreciated.
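For reference, here is a minimal sketch of what I have so far (route and field names are placeholders):
from flask import Flask, request, jsonify
import os
import tempfile

app = Flask(__name__)

@app.route('/upload', methods=['POST'])
def upload():
    # the CSV file sent by the single-page app
    f = request.files['file']
    # save the upload to a temporary file on disk
    # (werkzeug streams it in chunks, so the 7GB file never sits in memory)
    fd, path = tempfile.mkstemp(suffix='.csv')
    os.close(fd)
    f.save(path)
    # TODO: load this file into MySQL without reading it all into memory
    return jsonify(saved_to=path), 201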
python mysql flask pyspark spark-streaming
asked Nov 12 '18 at 18:50 by Mario (edited Nov 12 '18 at 19:13)
1 Answer
I am not sure about the streaming part, but Spark can handle large files efficiently, and writing to a database table is done in parallel. So, without knowing much about your details, and provided that the uploaded file is already on your server, I'd say:
If I wanted to save a big structured file like a CSV to a table, I would start like this:
# imports needed to run this on its own
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# start with some basic spark configuration, e.g. we want the timezone to be UTC
conf = SparkConf()
conf.set('spark.sql.session.timeZone', 'UTC')
# this is important: you need the MySQL Connector/J (JDBC) jar matching your MySQL
# version; you can download it from https://dev.mysql.com/downloads/connector/j/
conf.set('spark.jars', '/path/to/mysql-connector-java.jar')

# instantiate a spark session: the first time it will take a few seconds
spark = (SparkSession.builder
         .config(conf=conf)
         .appName('Huge File uploader')
         .getOrCreate())

# read the file first as a dataframe
df = spark.read.csv('/path/to/the/huge/file.csv')

# optionally, add a filename column
df = df.withColumn('filename', F.lit('thecurrentfilename'))

# write it to the table over JDBC
df.write.format('jdbc').options(
    url='jdbc:mysql://localhost:3306/yourdatabase',  # jdbc:mysql://host:port/database
    driver='com.mysql.cj.jdbc.Driver',  # the driver class for MySQL Connector/J
    dbtable='the_table_to_save_to',
    user='user',
    password='secret',
).mode('append').save()
Note the mode 'append' here: the catch is that Spark cannot perform updates on a table; it can either append new rows or replace everything that is in the table.
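If you instead wanted each upload to replace the table, a sketch (note that mode 'overwrite' drops and recreates the table by default; the JDBC writer's truncate option keeps the existing table definition):
# replace the table contents instead of appending;
# truncate='true' empties the table rather than dropping and recreating it
df.write.format('jdbc').options(
    url='jdbc:mysql://localhost:3306/yourdatabase',
    driver='com.mysql.cj.jdbc.Driver',
    dbtable='the_table_to_save_to',
    user='user',
    password='secret',
    truncate='true',
).mode('overwrite').save()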
So, if your csv is like this:
id, name, address....
You will end up with a table with the same fields.
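For example, a quick way to check what Spark read before writing it out (assuming the first line of your file is a header row; keep in mind that inferSchema costs an extra pass over a 7GB file):
# use the first line as column names; without header=True the columns are _c0, _c1, ...
df = spark.read.csv('/path/to/the/huge/file.csv', header=True, inferSchema=True)
df.printSchema()  # verify the column names and inferred types
df.show(5)        # eyeball a few rows before writing to MySQL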
This is the most basic example I could think of to get you started with Spark, without any considerations about a Spark cluster or anything else related. I would suggest you take this for a spin and figure out whether it suits your needs :)
Also, keep in mind that this might take anywhere from a few seconds to much longer, depending on your data, where the database is located, your machine, and the database load, so it might be a good idea to keep things asynchronous with your API; again, I don't know any of your other details.
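For instance, a minimal sketch of keeping the API responsive (this reuses the Flask app, request, and the spark session from above; a background thread is the simplest option, though a task queue like Celery would be more robust):
import threading

from flask import jsonify, request

def load_csv_to_mysql(path, table):
    # runs the Spark read/write outside the request cycle
    df = spark.read.csv(path, header=True)
    df.write.format('jdbc').options(
        url='jdbc:mysql://localhost:3306/yourdatabase',
        driver='com.mysql.cj.jdbc.Driver',
        dbtable=table,
        user='user',
        password='secret',
    ).mode('append').save()

@app.route('/load', methods=['POST'])
def load():
    # 'path' would come from the earlier upload step; names are placeholders
    path = request.json['path']
    threading.Thread(target=load_csv_to_mysql,
                     args=(path, 'uploads'), daemon=True).start()
    return jsonify(status='load started'), 202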
Hope this helps. Good luck!
– mkaran
answered Nov 13 '18 at 11:18
Thank you for your detailed response! @mkaran I gave it a shot, but it gives me an error at the last step, on the df.write() method, saying: Row size too large (> 8126). Changing some columns to TEXT or BLOB may help. In current row format, BLOB prefix of 0 bytes is stored inline.
– Mario
Nov 15 '18 at 16:39
It looks like it is a MySQL error. I was using MySQL version 8.0 and tried everything suggested in this post and in other posts, and I also restarted my computer and the MySQL server several times, but nothing worked. I also downgraded to MySQL version 5.7, but I was still getting the same error.
– Mario
Nov 15 '18 at 16:44
I cannot seem to write into my MySQL database, so my only other option would be to use another database that PySpark can connect to.
– Mario
Nov 15 '18 at 16:46
Hi @Mario, can you please tell me what the output of print(len(df.columns)) and print(df.count()) is after the read.csv?
– mkaran
Nov 16 '18 at 12:26