tldr; Tired of working with large CSV files? Give GeoParquet a try!
“Parquet is a powerful column-oriented data format, built from the ground up to as a modern alternative to CSV files.”https://geoparquet.org/
(Geo)Parquet is both smaller and faster than CSV. Additionally, (Geo)Parquet columns are typed. Text, numeric values, dates, geometries retain their data types. GeoParquet also stores CRS information and support in GIS solutions is growing.
I’ll be giving a quick overview using AIS data in GeoPandas 1.0.1 (with pyarrow) and QGIS 3.38 (with GDAL 3.9.2).
File size
The example AIS dataset for this demo contains ~10 million rows with 22 columns. I’ve converted the original zipped CSV into GeoPackage and GeoParquet using GeoPandas to illustrate the huge difference in file size: ~470 MB for GeoParquet and zipped CSV, 1.6 GB for CSV, and a whopping 2.6 GB for GeoPackage:
Reading performance
Pandas and GeoPandas both support selective reading of files, i.e. we can specify the specific columns to be loaded. This does speed up reading, even from CSV files:
Whole file
Selected columns
CSV
27.9 s
13.1 s
Geopackage
2min 12s
20.2 s
GeoParquet
7.2 s
4.1 s
Indeed, reading the whole GeoPackage is getting quite painful.
Here’s the code I used for timing the read times:
As you can see, these times include the creation of the GeoPandas.GeoDataFrame.
If we don’t need a GeoDataFrame, we can read the files even faster:
Non-spatial DataFrames
GeoParquet files can be read by non-GIS tools, such as Pandas. This makes it easier to collaborate with people who may not be familiar with geospatial data stacks.
And reading plain DataFrames is much faster than creating GeoDataFrames:
But back to GIS …
GeoParquet in QGIS
In QGIS, GeoParquet files can be loaded like any other vector layer, thanks to GDAL:
Loading the GeoParquet and GeoPackage files is pretty quick, especially if we zoom into a small region of interest (even though, unfortunately, it doesn’t seem possible to restrict the columns to further speed up loading). Loading the CSV, however, is pretty painful due to the lack of spatial indexing, which becomes apparent very quickly in the direct comparison:
As far as I can tell, my QGIS 3.38 ‘Grenoble’ does not support writing to or editing of GeoParquet files. So I’m limited to reading GeoParquet for now.
However, seeing how much smaller GeoParquets are compared to GeoPackages (and also faster to write), I hope that we will soon get the option to export to GeoParquet.
For now, I’ll start by converting my large CSV files to GeoParquet using GeoPandas.
Yesterday, I had the pleasure to speak at the RGS-IBG GIScience Research Group seminar. The talk presents methods for the exploration of movement patterns in massive quasi-continuous GPS tracking datasets containing billions of records using distributed computing approaches.
Here’s the full recording of my talk and follow-up discussion:
In the previous post, we explored how hvPlot and Datashader can help us to visualize large CSVs with point data in interactive map plots. Of course, the spatial distribution of points usually only shows us one part of the whole picture. Today, we’ll therefore look into how to explore other data attributes by linking other (non-spatial) plots to the map.
This functionality, referred to as “linked brushing” or “crossfiltering” is under active development and the following experiment was prompted by a recent thread on Twitter launched by @plotlygraphs announcement of HoloViews 1.14:
Dash HoloViews was just released as part of @HoloViews version 1.14!
To link the two plots, we use HoloViews’ link_selections function:
from holoviews.selection import link_selections
linked_plots = link_selections(map_plot + hist_plot)
That’s all! We can now perform spatial filters in the map and attribute value filters in the histogram and the filters are automatically applied to the linked plots:
Linked brushing demo using ship movement data (AIS): filtering records by speed (SOG) reveals spatial patterns of fast and slow movement.
You’ve probably noticed that there is no background map in the above plot. I had to remove the background map tiles to get rid of an error in Holoviews 1.13.4. This error has been fixed in 1.14.0 but I ran into other issues with the datashaded Scatterplot.
Even with all their downsides, CSV files are still a common data exchange format – particularly between disciplines with different tech stacks. Indeed, “How to Specify Data Types of CSV Columns for Use in QGIS” (originally written in 2011) is still one of the most popular posts on this blog. QGIS continues to be quite handy for visualizing CSV file contents. However, there are times when it’s just not enough, particularly when the number of rows in the CSV is in the range of multiple million. The following example uses a 12 million point CSV:
To give you an idea of the waiting times in QGIS, I’ve run the following script which loads and renders the CSV:
Panning and zooming around are no fun either since rendering takes so long. Changing from a single symbol renderer to, for example, a heatmap renderer does not improve the rendering times. So we need a different solutions when we want to efficiently explore large point CSV files.
The Pandas data analysis library is well-know for being a convenient tool for handling CSVs. However, it’s less clear how to use it as a replacement for desktop GIS for exploring large CSVs with point coordinates. My favorite solution so far uses hvPlot + HoloViews + Datashader to provide interactive Bokeh plots in Jupyter notebooks.
hvPlot provides a high-level plotting API built on HoloViews that provides a general and consistent API for plotting data in (Geo)Pandas, xarray, NetworkX, dask, and others. (Image source: https://hvplot.holoviz.org)
But first things first! Loading the CSV as a Pandas Dataframe takes 10.7 seconds. Pandas’ default plotting function (based on Matplotlib), however, takes around 13 seconds and only produces a static scatter plot.
Loading and plotting the CSV with Pandas
hvPlot to the rescue!
We only need two more steps to get faster and interactive map plots (plus background maps!): First, we need to reproject the lat/lon values. (There’s a warning here, most likely since some of the input lat/lon values are invalid.) Then, we replace plot() with hvplot() and voilà:
Plotting the CSV with Datashader
As you can see from the above GIF, the whole process barely takes 2 seconds and the resulting map plot is interactive and very responsive.
12 million points are far from the limit. As long as the Pandas DataFrame fits into memory, we are good and when the datasets get bigger than that, there are Dask DataFrames. But that’s a story for another day.
Rendering large sets of trajectory lines gets messy fast. Different aggregation approaches have been developed to address this issue. However, most approaches, such as mobility graphs or generalized flow maps, cannot handle large input datasets. Building on M³ prototypes, the following approach can be used in distributed computing environments to extracts flows from large datasets.
This flow extraction is based on a two-step process, conceptually similar to Andrienko flow maps: first, we extract M³ prototypes from the movement data. In the second step, we determine flows between these prototypes, including information about: distribution of travel speeds and number of observed transitions. The resulting flows can be visualized, for example, to explore the popularity of different paths of movement:
After the prototypes have been computed, the flow algorithm computes transitions between pairs of prototypes. An object moving from prototype A to prototype B triggers an update of the corresponding flow. To allow for distributed processing, each node in the distributed computing environment needs a copy of the previously computed prototypes. Additionally, the raw movement data records need to be converted into trajectories. Afterwards, each trajectory is processed independently, going through its records in chronological order:
Find the best matching prototype for the current record
Ensure that the distance to the match is below the distance threshold and that the matched prototype is different from the previous prototype
Get or create the flow between the two prototypes
Ensure that the prototype and flow directions are a good match for the current record’s direction
Update the flow properties: travel speed and number of transitions, as well as the previous prototype reference
This approach scales to large datasets since only the prototypes, the (intermediate) flow results, and the trajectory currently being worked on have to be kept in memory for each iteration. However, this algorithm does not allow for continuous updates. Flows would have to be recomputed (at least locally) whenever prototypes changed. Therefore, the algorithm does not support exploration of continuous data streams. However, it can be used to explore large historical datasets:
Visualizations of raw movement data records, that is, simple point maps or point density (“heat”) maps provide very limited data exploration capabilities. Therefore, we need clever aggregation approaches that can actually reveal movement patterns. Many existing aggregation approaches, however, do not scale to large datasets. We therefore developed the M³ Massive Movement Model [1] which supports distributed computing environments and can be incrementally updated with new data.
Using state-of-the-art big gespatial tools, such as GeoMesa, it is quite straightforward to ingest, index and query large amounts of timestamped location records. Thanks to GeoMesa’s GeoServer integration, it is also possible to publish GeoMesa tables as WMS and WFS which can be visualized in QGIS and explored (for more about GeoMesa, see Scalable spatial vector data processing ).So far so good! But with this basic setup, we only get point maps and point density maps which don’t tell us much about important movement characteristics like speed and direction (particularly if the reporting interval between consecutive location records is irregular). Therefore, we developed an aggregation method which models local record density, as well as movement speed and direction which we call M³.
For distributed computation, we need to split large datasets into chunks. To build models of local movement characteristics, it makes sense to create spatial or spatiotemporal chunks that can be processed independently. We therefore split the data along a regular grid but instead of computing one average value per grid cell, we create a flexible number of prototypes that describe the movement in the cell. Each prototype models a location, speed, and direction distribution (mean and sigma).
In our paper, we used M³ to explore ship movement data. We turned roughly 4 billion AIS records into prototypes:
M³ for ship movement data during January to December 2017 (3.9 billion records turned into 3.4 million prototypes; computing time: 41 minutes)
The above plot really only gives a first impression of the spatial distribution of ship movement records. The real value of M³ becomes clearer when we zoom in and start exploring regional patterns. Then we can discover vessel routes, speeds, and movement directions:
The prototype details on the right side, in particular, show the strength of the prototype idea: even though the grid cells we use are rather large, the prototypes clearly form along vessel routes. We can see exactly where these routes are and what speeds ship travel there, without having to increase the grid resolution to impractical values. Slow prototypes with high direction sigma (red+black markers) are clear indicators of ports. The marker size shows the number of records per prototype and thus helps distinguish heavily traveled routes from minor ones.
M³ is implemented in Spark. We read raw location records from GeoMesa and write prototypes to GeoMesa. All maps have been created in QGIS using prototype data published as GeoServer WFS.
If you want to dive deeper, here’s the full paper:
To explore travel patterns like origin-destination relationships, we need to identify individual trips with their start/end locations and trajectories between them. Extracting these trajectories from large datasets can be challenging, particularly if the records of individual moving objects don’t fit into memory anymore and if the spatial and temporal extent varies widely (as is the case with ship data, where individual vessel journeys can take weeks while crossing multiple oceans).
Roughly speaking, trip trajectories can be generated by first connecting consecutive records into continuous tracks and then splitting them at stops. This general approach applies to many different movement datasets. However, the processing details (e.g. stop detection parameters) and preprocessing steps (e.g. removing outliers) vary depending on input dataset characteristics.
For example, in our paper [1], we extracted vessel journeys from AIS data which meant that we also had to account for observation gaps when ships leave the observable (usually coastal) areas. In the accompanying 10-minute talk, I went through a 4-step trajectory exploration workflow for assessing our dataset’s potential for travel time prediction:
Click to watch the recorded talk
Like the M³ prototype computation presented in part 1, our trajectory aggregation approach is implemented in Spark. The challenges are both the massive amounts of trajectory data and the fact that operations only produce correct results if applied to a complete and chronologically sorted set of location records.This is challenging because Spark core libraries (version 2.4.5 at the time) are mostly geared towards dealing with unsorted data. This means that, when using high-level Spark core functionality incorrectly, an aggregator needs to collect and sort the entire track in the main memory of a single processing node. Consequently, when dealing with large datasets, out-of-memory errors are frequently encountered.
To solve this challenge, our implementation is based on the Secondary Sort pattern and on Spark’s aggregator concept. Secondary Sort takes care to first group records by a key (e.g. the moving object id), and only in the second step, when iterating over the records of a group, the records are sorted (e.g. chronologically). The resulting iterator can be used by an aggregator that implements the logic required to build trajectories based on gaps and stops detected in the dataset.
If you want to dive deeper, here’s the full paper:
Exploring large movement datasets is hard because visualizations of movement data quickly get cluttered and hard to interpret. Therefore, we need to aggregate the data. Density maps are commonly used since they are readily available and quick to compute but they provide only very limited insight. In contrast, meaningful aggregations that can help discover patterns are computationally expensive and therefore slow to generate.
This post serves as a starting point for a series of new approaches to exploring massive movement data. This series will summarize parts of my PhD research and – for those of you who are interested in more details – there will be links to the relevant papers.
Starting with the raw location records, we use different forms of aggregation to learn more about what information a movement dataset contains:
Besides clever aggregation approaches, massive movement datasets also require appropriate computing resources. To ensure that we can efficiently explore large datasets, we have implemented the above mentioned aggregation steps in Spark. This enables us to run the computations on general purpose computing clusters that can be scaled according to the dataset size.
Over the last years, many data analysis platforms have added spatial support to their portfolio. Just two days ago, Databricks have published an extensive post on spatial analysis. I took their post as a sign that it is time to look into how PySpark and GeoPandas can work together to achieve scalable spatial analysis workflows.
If you sign up for Databricks Community Edition, you get access to a toy cluster for experimenting with (Py)Spark. This considerably lowers the entry barrier to Spark since you don’t need to bother with installing anything yourself. They also provide a notebook environment:
I’ve followed the official Databricks GeoPandas example notebook but expanded it to read from a real geodata format (GeoPackage) rather than from CSV.
I’m using test data from the MovingPandas repository: demodata_geolife.gpkg contains a hand full of trajectories from the Geolife dataset. Demodata_grid.gpkg contains a simple 3×4 grid that covers the same geographic extent as the geolife sample:
Once the files are downloaded, we can use GeoPandas to read the GeoPackages:
Note that the display() function is used to show the plot.
The same applies to the grid data:
When the GeoDataFrames are ready, we can start using them in PySpark. To do so, it is necessary to convert from GeoDataFrame to PySpark DataFrame. Therefore, I’ve implemented a simple function that performs the conversion and turn the Point geometries into lon and lat columns:
To compute new values for our DataFrame, we can use existing or user-defined functions (UDF). Here’s a simple hello world function and associated UDF:
A spatial UDF is a little more involved. For example, here’s an UDF that finds the first polygon that intersects the specified lat/lon and returns that polygon’s ID. Note how we first broadcast the grid DataFrame to ensure that it is available on all computation nodes:
It’s worth noting that PySpark has its peculiarities. Since it’s a Python wrapper of a strongly typed language, we need to pay close attention to types in our Python code. For example, when defining UDFs, if the specified return type (Integertype in the above example) does not match the actual value returned by the find_intersection() function, this will cause rather cryptic errors.
To plot the results, I’m converting the joined PySpark DataFrame back to GeoDataFrame:
I’ve published this notebook so you can give it a try. (Any notebook published on Databricks is supposed to stay online for six months, so if you’re trying to access it after June 2020, this link may be broken.)
Working with movement data analysis, I’ve banged my head against performance issues every once in a while. For example, PostgreSQL – and therefore PostGIS – run queries in a single thread of execution. This is now changing, with more and more functionality being parallelized. PostgreSQL version 9.6 (released on 2016-09-29) included important steps towards parallelization, including parallel execution of sequential scans, joins and aggregates. Still, there is no parallel processing in PostGIS so far (but it is under development as described by Paul Ramsey in his posts “Parallel PostGIS II” and “PostGIS Scaling” from late 2017).
At the FOSS4G2016 in Bonn, I had the pleasure to chat with Shoaib Burq who ran the “An intro to Apache PySpark for Big Data GeoAnalysis” workshop. Back home, I downloaded the workshop material and gave it a try but since I wanted a scalable system for storing, analyzing, and visualizing spatial data, it didn’t really seem to fit the bill.
Around one year ago, my search grew more serious since we needed a solution that would support our research group’s new projects where we expected to work with billions of location records (timestamped points and associated attributes). I was happy to find that the fine folks at LocationTech have some very promising open source projects focusing on big spatial data, most notably GeoMesa and GeoWave. Both tools take care of storing and querying big spatio-temporal datasets and integrate into GeoServer for publication and visualization. (A good – if already slightly outdated – comparison of the two has been published by Azavea.)
My understanding at the time was that GeoMesa had a stronger vector data focus while GeoWave was more focused on raster data. This lead me to try out GeoMesa. I published my first steps in “Getting started with GeoMesa using Geodocker” but things only really started to take off once I joined the developer chats and was pointed towards CCRI’s cloud-local “a collection of bash scripts to set up a single-node cloud on your desktop, laptop, or NUC”. This enabled me to skip most of the setup pains and go straight to testing GeoMesa’s functionality.
The learning curve is rather significant: numerous big data stack components (including HDFS, Accumulo, and GeoMesa), a most likely new language (Scala), as well as the Spark computing system require some getting used to. One thing that softened the blow is the fact that writing queries in SparkSQL + GeoMesa is pretty close to writing PostGIS queries. It’s also rather impressive to browse hundreds of millions of points by connecting QGIS TimeManager to a GeoServer WMS-T with GeoMesa backend.
Spatial big data stack with GeoMesa
One of the first big datasets I’ve tested are taxi floating car data (FCD). At one million records per day, the three years in the following example amount to a total of around one billion timestamped points. A query for travel times between arbitrary start and destination locations took a couple of seconds:
Travel time statistics with GeoMesa (left) compared to Google Maps predictions (right)
Besides travel time predictions, I’m also looking into the potential for predicting future movement. After all, it seems not unreasonable to assume that an object would move in a similar fashion as other similar objects did in the past.
Early results of a proof of concept for GeoMesa based movement prediction
Big spatial data – both vector and raster – are an exciting challenge bringing new tools and approaches to our ever expanding spatial toolset. Development of components in open source big data stacks is rapid – not unlike the development speed of QGIS. This can make it challenging to keep up but it also holds promises for continuous improvements and quick turn-around times.
If you are using GeoMesa to work with spatio-temporal data, I’d love to hear about your experiences.