Dask high memory consumption when loading multiple pandas DataFrames into a dictionary
I have a folder (7.7 GB) with multiple pandas DataFrames stored in Parquet format. I need to load all these DataFrames into a Python dictionary, but since I only have 32 GB of RAM, I use .loc to keep only the data that I need.
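For reference, .loc['2015':'2015'] on a sorted DatetimeIndex uses pandas partial-string indexing, so the slice covers the whole calendar year 2015, including both ends. Note that pd.read_parquet still reads the entire file into memory before .loc discards the other rows. The slice therefore reduces what is kept, not the peak memory of each read. The following is a minimal illustration on an in-memory frame; the daily data and the column name close are made up:

```python
import pandas as pd

# Hypothetical daily data spanning the end of 2014 to the start of 2016
idx = pd.date_range('2014-12-30', '2016-01-02', freq='D')
df = pd.DataFrame({'close': range(len(idx))}, index=idx)

# Partial-string slice: '2015':'2015' keeps every row dated in 2015 (both ends inclusive)
year_2015 = df.loc['2015':'2015']

print(year_2015.index.min(), year_2015.index.max(), len(year_2015))
```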
When all the DataFrames are loaded into the dictionary, I create a common index from the indexes of all the data, then I reindex every DataFrame with the new index.
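The two steps above can be sketched with pandas alone: first take the union of all indexes, then forward-fill each frame onto it. One detail matters here. Index.union returns a new Index rather than modifying the one it is called on, so the result has to be assigned back; functools.reduce does that implicitly. The symbols AAA and BBB and their values are hypothetical:

```python
import functools

import pandas as pd

# Two hypothetical symbols whose timestamps only partially overlap
frames = {
    'AAA': pd.DataFrame({'close': [1.0, 2.0, 3.0]},
                        index=pd.to_datetime(['2015-01-02 09:30', '2015-01-02 09:31', '2015-01-02 09:33'])),
    'BBB': pd.DataFrame({'close': [10.0, 11.0]},
                        index=pd.to_datetime(['2015-01-02 09:31', '2015-01-02 09:32'])),
}

# Index.union returns a new (sorted) Index, so fold the results together
common_index = functools.reduce(lambda left, right: left.union(right),
                                (df.index for df in frames.values()))

# method='pad' forward-fills each frame onto the common index;
# timestamps before a frame's first row stay NaN
aligned = {name: df.reindex(index=common_index, method='pad') for name, df in frames.items()}
```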
I wrote two scripts to do this: the first one works in a classic sequential way, and the second one uses Dask in order to get some performance improvement from all the cores of my Threadripper 1920X.
Sequential code:
# Standard library imports
import os
import pathlib
import time

# Third party imports
import pandas as pd

# Local application imports


class DataProvider:
    def __init__(self):
        self.data = dict()

    def load_parquet(self, source_dir: str, timeframe_start: str, timeframe_end: str) -> None:
        t = time.perf_counter()
        symbol_list = [file for file in os.listdir(source_dir) if file.endswith('.parquet')]
        # updating containers
        for symbol in symbol_list:
            path = pathlib.Path(source_dir) / symbol
            name = symbol.replace('.parquet', '')
            # read_parquet loads the whole file; .loc then keeps only the requested time range
            self.data[name] = pd.read_parquet(path).loc[timeframe_start:timeframe_end]
        print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')
        t = time.perf_counter()
        # building index
        index = None
        for symbol in self.data:
            if index is not None:
                # Index.union returns a new Index, so the result must be assigned back
                index = index.union(self.data[symbol].index)
            else:
                index = self.data[symbol].index
        print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')
        t = time.perf_counter()
        # reindexing data
        for symbol in self.data:
            self.data[symbol] = self.data[symbol].reindex(index=index, method='pad').itertuples()
        print(f'Indexed data in {round(time.perf_counter() - t, 3)} seconds.')


if __name__ == '__main__' or __name__ == 'builtins':
    source = r'WindowsPath'
    x = DataProvider()
    x.load_parquet(source_dir=source, timeframe_start='2015', timeframe_end='2015')
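Memory figures like the ones reported below can be compared with what pandas itself holds. One option is to sum DataFrame.memory_usage(deep=True) over the dictionary. This counts only the data owned by the frames. It does not include interpreter overhead or memory the allocator has not yet returned to the operating system, which may be part of why Task Manager numbers differ. The sketch below assumes a dict of DataFrames like self.data before the reindexing step; after that step the values are itertuples() iterators, which this helper cannot measure. The function name dict_memory_mib and the example frames are hypothetical:

```python
import pandas as pd


def dict_memory_mib(frames):
    """Return the total deep memory usage, in MiB, of a dict of DataFrames."""
    total_bytes = sum(int(df.memory_usage(deep=True).sum()) for df in frames.values())
    return total_bytes / 1024 ** 2


# Hypothetical example: two frames of one million float64 values each (about 7.6 MiB of data per frame)
example = {
    'AAA': pd.DataFrame({'close': [0.0] * 1_000_000}),
    'BBB': pd.DataFrame({'close': [1.0] * 1_000_000}),
}
print(f'{dict_memory_mib(example):.1f} MiB')
```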
Dask code:
# Standard library imports
import os
import pathlib
import time

# Third party imports
from dask.distributed import Client
import pandas as pd

# Local application imports


def __load_parquet__(directory, timeframe_start, timeframe_end):
    return pd.read_parquet(directory).loc[timeframe_start:timeframe_end]


def __reindex__(new_index, df):
    return df.reindex(index=new_index, method='pad').itertuples()


if __name__ == '__main__' or __name__ == 'builtins':
    client = Client()
    source = r'WindowsPath'
    start = '2015'
    end = '2015'
    t = time.perf_counter()
    file_list = [file for file in os.listdir(source) if file.endswith('.parquet')]
    # build data
    data = dict()
    for file in file_list:
        path = pathlib.Path(source) / file
        symbol = file.replace('.parquet', '')
        # submit() returns a Future immediately; the file is read asynchronously on a worker
        data[symbol] = client.submit(__load_parquet__, path, start, end)
    print(f'Loaded data in {round(time.perf_counter() - t, 3)} seconds.')
    t = time.perf_counter()
    # build index
    index = None
    for symbol in data:
        # .result() waits for the task and transfers the whole DataFrame to the client
        if index is not None:
            # Index.union returns a new Index, so the result must be assigned back
            index = index.union(data[symbol].result().index)
        else:
            index = data[symbol].result().index
    print(f'Built index in {round(time.perf_counter() - t, 3)} seconds.')
    t = time.perf_counter()
    # reindex
    for symbol in data:
        # index and the DataFrame from .result() are concrete objects, so both are serialized into each new task
        data[symbol] = client.submit(__reindex__, index, data[symbol].result())
    print(f'Indexed data in {round(time.perf_counter() - t, 3)} seconds.')
I found the results pretty weird.
Sequential code:
- max memory consumption during computations: 30.2GB
- memory consumption at the end of computations: 15.6GB
- total memory consumption (without Windows and others): 11.6GB
- Loaded data in 54.289 seconds.
- Built index in 0.428 seconds.
- Reindexed data in 9.666 seconds.
Dask code:
- max memory consumption during computations: 25.2GB
- memory consumption at the end of computations: 22.6GB
- total memory consumption (without Windows and others): 18.9GB
- Loaded data in 0.638 seconds.
- Built index in 27.541 seconds.
- Reindexed data in 30.179 seconds.
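There is a likely reason the Dask "Loaded data" time is so short. client.submit returns a future without waiting for the task to finish, so that timer only measures task submission. The reading cost then shows up in the index loop, where each .result() blocks. The same submit/result pattern can be demonstrated with the standard library's concurrent.futures. It is used here only as an analogy, not as Dask itself, and the function slow_load is a hypothetical stand-in for reading a file:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_load(seconds):
    # Hypothetical stand-in for a slow read_parquet call
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=4) as pool:
    t = time.perf_counter()
    futures = [pool.submit(slow_load, 0.5) for _ in range(4)]
    submit_time = time.perf_counter() - t  # only the submission is timed here

    t = time.perf_counter()
    results = [f.result() for f in futures]  # blocks until every task is done
    wait_time = time.perf_counter() - t

print(f'submit: {submit_time:.3f} s, wait for results: {wait_time:.3f} s')
```

With Dask, the analogous way to time the load itself would be to call dask.distributed.wait on the list of futures before reading the timer.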
My questions:
- Why is the memory consumption at the end of the computation so much higher with Dask?
- Why do building the common index and reindexing all the DataFrames take so much longer with Dask?
Also, when running the Dask code, the console prints the following warning:
C:\Users\edit\Anaconda3\envs\edit\lib\site-packages\distributed\worker.py:901: UserWarning: Large object of size 5.41 MB detected in task graph:
(DatetimeIndex(['2015-01-02 09:30:00', '2015-01-02 ... s x 5 columns])
Consider scattering large objects ahead of time with client.scatter to reduce scheduler burden and keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s))
Even though the warning's suggestions look good, I don't understand what is wrong with my code. Why does it say to keep data on the workers? I thought that with the submit method I was sending all the data to my client, so the workers would have easy access to all of it. Thank you all for the help.
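The warning refers to the pattern in the last two loops. data[symbol].result() transfers each full DataFrame from a worker to the client, even in the index loop where only .index is needed. Passing that concrete DataFrame (and the index) to client.submit then serializes it back into the task graph, so every frame makes a round trip through the client process.

The sketch below shows the pattern the warning suggests. It keeps the frames on the workers by passing futures instead of results, fetches only the indexes, and scatters the shared index once. It makes several assumptions. It uses a hypothetical make_frame in place of __load_parquet__ so it runs without files. It returns DataFrames instead of itertuples() so they can be gathered. The helpers get_index, reindex_pad and align_on_workers are illustrative names. It has not been benchmarked on the original data.

```python
import functools

import pandas as pd
from dask.distributed import Client


def make_frame(n_rows, offset_minutes):
    # Hypothetical stand-in for __load_parquet__: builds a small minute-bar frame in memory
    idx = pd.date_range('2015-01-02 09:30', periods=n_rows, freq='min') + pd.Timedelta(minutes=offset_minutes)
    return pd.DataFrame({'close': range(n_rows)}, index=idx)


def get_index(df):
    # Runs on a worker, so only the index (not the whole frame) is sent to the client
    return df.index


def reindex_pad(new_index, df):
    # Returns a DataFrame rather than itertuples() so the result can be gathered
    return df.reindex(index=new_index, method='pad')


def align_on_workers(client, specs):
    # specs: hypothetical {symbol: (n_rows, offset_minutes)} mapping
    frames = {name: client.submit(make_frame, *args) for name, args in specs.items()}

    # Pull back only the indexes and build the common index on the client
    indexes = client.gather([client.submit(get_index, fut) for fut in frames.values()])
    common = functools.reduce(lambda left, right: left.union(right), indexes)

    # Scatter the shared index once instead of embedding it in every task
    common_future = client.scatter(common, broadcast=True)

    # Pass futures, not .result() values: Dask resolves them on the workers
    return {name: client.submit(reindex_pad, common_future, fut) for name, fut in frames.items()}


if __name__ == '__main__':
    with Client(processes=False) as client:
        aligned = align_on_workers(client, {'AAA': (5, 0), 'BBB': (5, 2)})
        print(client.gather(aligned)['BBB'])
```

In this layout the client only ever receives the indexes and, at the very end, whatever is explicitly gathered; the per-symbol DataFrames stay in worker memory.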
python pandas parallel-processing parquet dask
asked Nov 11 at 22:00
ilpomo
why not use Dask's read_parquet with the filters argument?
– moshevi
Nov 12 at 8:26
There are a few reasons why I'm not using dask.dataframe: 1) The documentation says "This reads a directory of Parquet data into a Dask.dataframe, one file per partition." I'm not loading an entire folder as a single big dataframe, but a separate dataframe for each file in that folder. 2) I don't understand how to apply filters. How can I write something like dd.read_parquet(path=path, filters=[data.index = '2015')]?
– ilpomo
Nov 12 at 9:33
Update: using any filter raises NotImplementedError: Predicate pushdown not implemented. It looks like the filters option is not implemented in Dask 0.20.
– ilpomo
Nov 12 at 9:56
they are implemented, however I think only for fastparquet. are you using pyarrow?
– moshevi
Nov 12 at 10:00
please also note this answer about the filters argument,
– moshevi
Nov 12 at 10:03