If you been looking for a use case for a Document Database, I came to the realization my favorite dead simple one is the ability to query a pile of JSON, right along side my other data with sql without really doing much. Which is the dream realized from the powerful Multi Model InterSystems Data Platform, and shown here in a simple notebook to visualize my geo location data my Rivian R1S is emitting for DeezWatts ( A Rivian Data Adventure ).
So here is the 2 step approach, Ingestion to and Visualization from InterSystems Cloud Document, using the JDBC document driver.
For starters, I fired up a small Cloud Document deployment on the InterSystems Cloud Services Portal, with an enabled listener.
I downloaded the ssl certificate, and snagged the drivers for JDBC and accompanying document driver as well.
For ingestion, I wanted to get a grip on how to lift a JSON document from the file system and persist it as a collection in the document database over the listener, for this I wrote a standalone java app. This was more utility as the fun all happened in the notebook after the data was up in there.
RivianDocDB.java
package databricks_rivian_irisdocdb; import java.sql.SQLException; import com.intersystems.document.*; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.*; import java.io.IOException; import java.io.InputStream; import java.io.File; import java.io.FileInputStream; import org.apache.commons.io.IOUtils; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.stream.Stream; public <span>class RivianDocDb </span>{ <span>public static void main(String[] args) </span>{ String directoryPath = "/home/sween/Desktop/POP2/DEEZWATTS/rivian-iris-docdb/databricks_rivian_irisdocdb/in/json/"; DataSource datasrc = DataSource.createDataSource(); datasrc.setPortNumber(443); datasrc.setServerName("k8s-05868f04-a88b7ecb-5c5e41660d-404345a22ba1370c.elb.us-east-1.amazonaws.com"); datasrc.setDatabaseName("USER"); datasrc.setUser("SQLAdmin"); datasrc.setPassword("REDACTED"); try { datasrc.setConnectionSecurityLevel(10); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("\nCreated datasrc\n"); System.out.println(datasrc); datasrc.preStart(2); System.out.println("\nDataSource size =" + datasrc.getSize()); // creates the collection if it dont exist Collection collectedDocs = Collection.getCollection(datasrc,"deezwatts2"); try (Stream<Path> paths = Files.list(Paths.get(directoryPath))) { paths.filter(Files::isRegularFile) .forEach(path -> { File file = path.toFile(); }); } catch (IOException e) { e.printStackTrace(); } File directory = new File(directoryPath); if (directory.isDirectory()) { File[] files = directory.listFiles(); if (files != null) { for (File file : files) { if (file.isFile()) { try (InputStream is = new FileInputStream("/home/sween/Desktop/POP2/DEEZWATTS/rivian-iris-docdb/databricks_rivian_irisdocdb/in/json/" + file.getName())) { String jsonTxt = IOUtils.toString(is, "UTF-8"); Document doc2 = JSONObject.fromJSONString(jsonTxt); // top level key is whip2 Document doc3 = new JSONObject().put("whip2",doc2); collectedDocs.insert(doc3); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } long size = collectedDocs.size(); System.out.println(Long.toString(size)); System.out.println("\nIngested Documents =" + datasrc.getSize());
The above is quite close to JAVA trash, but worked, we can see the collection in the collection browser in the deployment.
Now this takes a little bit of Databricks setup, but is well worth it to work with pyspark for the fun part.
I added the two InterSystems drivers to the cluster, and put the certificate in the import_cloudsql_certficiate.sh cluster init script so it gets added to the keystore.
For completeness, the cluster is running Databricks 16, Spark 3.5.0 and Scala 2.12
So we should be set to run a PySpark job and plot where my whip has been in the subset of data Ill drag in.
We are using geopandas and geodatasets for a straight forward approach to plotting.
package databricks_rivian_irisdocdb; import java.sql.SQLException; import com.intersystems.document.*; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.*; import java.io.IOException; import java.io.InputStream; import java.io.File; import java.io.FileInputStream; import org.apache.commons.io.IOUtils; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.stream.Stream; public <span>class RivianDocDb </span>{ <span>public static void main(String[] args) </span>{ String directoryPath = "/home/sween/Desktop/POP2/DEEZWATTS/rivian-iris-docdb/databricks_rivian_irisdocdb/in/json/"; DataSource datasrc = DataSource.createDataSource(); datasrc.setPortNumber(443); datasrc.setServerName("k8s-05868f04-a88b7ecb-5c5e41660d-404345a22ba1370c.elb.us-east-1.amazonaws.com"); datasrc.setDatabaseName("USER"); datasrc.setUser("SQLAdmin"); datasrc.setPassword("REDACTED"); try { datasrc.setConnectionSecurityLevel(10); } catch (SQLException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("\nCreated datasrc\n"); System.out.println(datasrc); datasrc.preStart(2); System.out.println("\nDataSource size =" + datasrc.getSize()); // creates the collection if it dont exist Collection collectedDocs = Collection.getCollection(datasrc,"deezwatts2"); try (Stream<Path> paths = Files.list(Paths.get(directoryPath))) { paths.filter(Files::isRegularFile) .forEach(path -> { File file = path.toFile(); }); } catch (IOException e) { e.printStackTrace(); } File directory = new File(directoryPath); if (directory.isDirectory()) { File[] files = directory.listFiles(); if (files != null) { for (File file : files) { if (file.isFile()) { try (InputStream is = new FileInputStream("/home/sween/Desktop/POP2/DEEZWATTS/rivian-iris-docdb/databricks_rivian_irisdocdb/in/json/" + file.getName())) { String jsonTxt = IOUtils.toString(is, "UTF-8"); Document doc2 = JSONObject.fromJSONString(jsonTxt); // top level key is whip2 Document doc3 = new JSONObject().put("whip2",doc2); collectedDocs.insert(doc3); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } long size = collectedDocs.size(); System.out.println(Long.toString(size)); System.out.println("\nIngested Documents =" + datasrc.getSize());
Now, this takes a little bit to get used to, but here is the query to InterSystems Cloud Document using the JSON paths syntax and JSON_TABLE.
import geopandas as gpd import geodatasets from shapely.geometry import Polygon
I did manage to find a site that made it dead simple to create the json path @ jsonpath.com.
Next we setup the connection to the IRIS Document Database Deployment and read it into a dataframe.
dbtablequery = f"(SELECT TOP 1000 lat,longitude FROM JSON_TABLE(deezwatts2 FORMAT COLLECTION, '$' COLUMNS (lat VARCHAR(20) path '$.whip2.data.vehicleState.gnssLocation.latitude', longitude VARCHAR(20) path '$.whip2.data.vehicleState.gnssLocation.longitude' ))) AS temp_table;"
Next we grab an available map from geodatasets, the sdoh one is great for generic use of the united states.
# Read data from InterSystems Document Database via query above df = (spark.read.format("jdbc") \ .option("url", "jdbc:IRIS://k8s-05868f04-a88b7ecb-5c5e41660d-404345a22ba1370c.elb.us-east-1.amazonaws.com:443/USER") \ .option("jars", "/Volumes/cloudsql/iris/irisvolume/intersystems-document-1.0.1.jar") \ .option("driver", "com.intersystems.jdbc.IRISDriver") \ .option("dbtable", dbtablequery) \ .option("sql", "SELECT * FROM temp_table;") \ .option("user", "SQLAdmin") \ .option("password", "REDACTED") \ .option("connection security level","10") \ .option("sslConnection","true") \ .load())
Now the cool part, we want to zoom in on where we want to contain the geo location points of where the R1S has driven, for this we need a bounding box for the state of Michigan.
For this I used a really slick tool from Keene to draw the geo fence bounding box and it gives me the coordinates array!
Now that we have the coordinates array of the bounding box, we need slap them into a Polygon object.
# sdoh map is fantastic with bounding boxes michigan = gpd.read_file(geodatasets.get_path("geoda.us_sdoh")) gdf = gpd.GeoDataFrame( df.toPandas(), geometry=gpd.points_from_xy(df.toPandas()['longitude'].astype(float), df.toPandas()['lat'].astype(float)), crs=michigan.crs #"EPSG:4326" )
Now, lets plot the trail of the Rivian R1S! This will be for about 10,000 records (I used a top statement above to limit the results)
polygon = Polygon([ ( -87.286377, 45.9664245 ), ( -81.6503906, 45.8134865 ), ( -82.3864746, 42.1063737 ), ( -84.7814941, 41.3520721 ), ( -87.253418, 42.5045029 ), ( -87.5610352, 45.8823607 ) ])
And there we have it... Detroit, Traverse City, Silver Lake Sand Dunes, Holland, Mullet Lake, Interlachen... Pure Michigan, Rivian style.
The above is the detailed content of Rivian GeoLocation Plotting with IRIS Cloud Document and Databricks. For more information, please follow other related articles on the PHP Chinese website!