These days you get time series data from multiple sources; you may also argue that a traditional relational database may not work well with this kind of data, for the following reasons:
In this tutorial I will show you how to use InfluxDB. In particular, I like it because it offers out-of-the-box integration with other tools (like Grafana and Python 3), and it has a powerful yet simple query language called Flux.
This is maybe the easiest way to get started. We will use an external volume to persist the data across container restarts and upgrades (please check the container page to see all the possible options):
podman pull influxdb:latest
podman run --detach --volume /data/influxdb:/var/lib/influxdb --volume /data:/data:rw --name influxdb_raspberrypi --restart always --publish 8086:8086 influxdb:latest
podman logs --follow influxdb_raspberrypi
Also, we are mapping an additional volume to the /data directory inside the container, so we can import some CSV files later.
Next, go to the machine where you are running InfluxDB (say http://localhost:8086) and complete the installation steps by:
The API token will look similar to this: nFIaywYgg0G6oYtZCDZdp0StZqYmz4eYg23KGKcr5tau05tb7T2DY-LQSIgRK66QG9mkkKuK2nNCoIKJ0zoyng==
Keep it safe, as it will be used to read and write data in your USTS bucket.
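To confirm the token works, a quick sanity check with the Python client is enough. This is a minimal sketch, not part of the official setup: it assumes you installed the influxdb-client package, that the server runs on http://localhost:8086, and that your organization is the one used later in this tutorial (Kodegeek).

```python
# Minimal connectivity check with the freshly generated API token (pip install influxdb-client).
from influxdb_client import InfluxDBClient

token = "REPLACE_WITH_YOUR_API_TOKEN"  # The token generated during the setup above
with InfluxDBClient(url="http://localhost:8086", token=token, org="Kodegeek") as client:
    print(client.ping())  # True if the InfluxDB server is reachable
    print(client.buckets_api().find_buckets().buckets)  # Fails if the token is not valid
```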
In this tutorial I will use publicly available data from the Connecticut Open Data Portal.
Specifically, we will use the Underground Storage Tanks (USTs) - Facility and Tank Details dataset: the underground storage tank regulations and the Connecticut underground storage tank enforcement program have been in effect since November 1985. This list is based on notification information submitted by the public since 1985, and is updated weekly.
I like this dataset for the following reasons:
You can grab a copy with curl, for example:
[josevnz@dmaf5 influxdb_intro]$ curl --location --silent --fail --show-error --output ~/Downloads/ust.csv 'https://data.ct.gov/api/views/utni-rddb/rows.csv?accessType=DOWNLOAD'
[josevnz@dmaf5 influxdb_intro]$ wc -l ~/Downloads/ust.csv
49090 /home/josevnz/Downloads/ust.csv
Next, I will show you how to import the data into your bucket, and some issues you may face along the way.
It is good practice to ask yourself what kind of questions you can answer with the data before deciding what to import and what to ignore. A few examples we will try to answer soon:
Based on that, we check the available columns and make a quick call on what to ignore during the import process:
Number | Column Name | Type | Remarks |
---|---|---|---|
1 | UST Site ID Number | Plain Text | Ignoring |
2 | Site Name | Plain Text | Ignoring |
3 | Site Address | Plain Text | Ignoring, do not care about street |
4 | Site City | Plain Text | |
5 | Site Zip | Plain Text | Ignoring |
6 | Tank No. | Plain Text | Ignoring |
7 | Status of Tank | Plain Text | |
8 | Compartment | Plain Text | |
9 | Estimated Total Capacity (gallons) | Number | |
10 | Substance Currently Stored | Plain Text | |
11 | Last Used Date | Date & Time | |
12 | Closure Type | Plain Text | |
13 | Construction Type - Tank | Plain Text | |
14 | Tank Details | Plain Text | Ignoring |
15 | Construction Type - Piping | Plain Text | |
16 | Piping Details | Plain Text | Ignoring |
17 | Installation Date | Date & Time | Ignoring |
18 | Spill Protection | Plain Text | |
19 | Overfill Protection | Plain Text | |
20 | Tank Latitude | Number | |
21 | Tank Longitude | Number | |
22 | Tank Collection Method | Plain Text | Ignoring |
23 | Tank Reference Point Type | Plain Text | Ignoring |
24 | UST Site Latitude | Number | Ignoring, redundant |
25 | UST Site Longitude | Number | Ignoring, redundant |
26 | Site Collection Method | Plain Text | Ignoring |
27 | Site Reference Point Type | Plain Text | Ignoring, redundant |
Let’s take a peek at our data:
UST Site ID Number,Site Name,Site Address,Site City,Site Zip,Tank No.,Status of Tank,Compartment,Estimated Total Capacity (gallons),Substance Currently Stored,Last Used Date,Closure Type,Construction Type - Tank,Tank Details,Construction Type - Piping,Piping Details,Installation Date,Spill Protection,Overfill Protection,Tank Latitude,Tank Longitude,Tank Collection Method,Tank Reference Point Type,UST Site Latitude,UST Site Longitude,Site Collection Method,Site Reference Point Type
50-11456,Brewer Dauntless Marina,9 NOVELTY LN,ESSEX,06426,1,Permanently Closed,,4000,Gasoline,10/18/2018,Tank was Removed From Ground,Coated & Cathodically Protected Steel (sti-P3),Double Walled,Flexible Plastic,"Containment Sumps @ Dispensers,Containment Sumps @ Tanks,Double Walled,Metallic fittings isolated from soil and water",06/01/1999,Spill Bucket,Ball Float Device,41.350018,-72.385442,Address Matching,Approximate Location,41.350018,-72.385442,Address Matching,Approximate Location
106-1923,FOOD BAG #509,1652 BOSTON POST RD,OLD SAYBROOK,06475,D1,Permanently Closed,a,10000,Diesel,03/01/1998,Tank was Removed From Ground,Coated & Cathodically Protected Steel (sti-P3),,Rigid Fiberglass Reinforced Plastic,,02/01/1983,,,41.286115,-72.414762,Address Matching,Approximate Location,41.286115,-72.414762,Address Matching,Approximate Location
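If you prefer to inspect the file programmatically instead of eyeballing the raw CSV, a short sketch with Python's standard csv module does the trick (the path is the one we downloaded with curl above):

```python
# Print each column name next to the value from the first data row.
import csv

with open("/home/josevnz/Downloads/ust.csv", newline="", encoding="utf-8") as handle:
    reader = csv.reader(handle)
    header = next(reader)
    first_row = next(reader)
    for position, (name, value) in enumerate(zip(header, first_row), start=1):
        print(f"{position:02d} {name}: {value}")
```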
As a first take, we can use the InfluxDB bulk importer with the line protocol; it basically means we help InfluxDB digest the data as follows:
Using CSV annotations is one way to go, but keep in mind there are limits to how much you can manipulate the data during the import process.
Then comes the decision on where and how to store the data. InfluxDB has the concepts of tags, fields, and measurements, explained in understanding-Influxdb-basics: a measurement groups related points (here, fuel_tanks), tags are indexed key/value metadata you filter and group by (like city or status), and fields hold the actual values (like the estimated capacity or the coordinates).
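To make the distinction concrete, here is how the first sample row above could be modeled with the Python client. This is only a sketch; the snake_case tag and field names are the ones we will use during the import, not something imposed by InfluxDB.

```python
# One CSV row expressed as an InfluxDB point: measurement + tags + fields + timestamp.
from datetime import datetime, timezone

from influxdb_client import Point

point = (
    Point("fuel_tanks")
    .tag("city", "ESSEX")
    .tag("status", "permanently closed")
    .tag("substance_stored", "Gasoline")
    .field("estimated_total_capacity", 4000)
    .field("lat", 41.350018)
    .field("lon", -72.385442)
    .time(datetime(2018, 10, 18, tzinfo=timezone.utc))  # Last Used Date
)
print(point.to_line_protocol())
```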
If you notice, we have not one but two date/time columns:
Number | Column Name | Type | Remarks |
---|---|---|---|
11 | Last Used Date | Date & Time | |
17 | Installation Date | Date & Time | |
And, as with any time series, there can be only one dateTime column.
So what to do? Well, we can either split the data into two different buckets depending on what we want to track, store one of the dates as a tag (not very useful, as it makes the data harder to query), or just ignore it completely.
For our analysis we care more about the last used date, so we will ignore the installation date.
How does that look for our data?
#constant measurement,fuel_tanks
#datatype ignored,ignored,ignored,tag,ignored,ignored,tag,tag,long,tag,dateTime:01/02/2006,tag,tag,ignored,tag,ignored,ignored,tag,tag,double,double,ignored,ignored,ignored,ignored,ignored,ignored
ID,Name,Address,City,Zip,TankNo,Status,Compartment,EstimatedTotalCapacity,SubstanceStored,LastUsed,ClosureType,ConstructionType,Details,ConstructionType,PipingDetails,InstallationDate,SpillProtection,OverfillProtection,Latitude,Longitude,CollectionMethod,ReferencePointType,USTLatitude,USTLongitude,CollectionMethod,ReferencePointType
The full command would look something like this (import_ust.sh):
/usr/bin/podman run --interactive --tty --volume "$header_file:/data/headers.csv" --volume "$csv_file:/data/tanks.csv" influxdb influx write "$dryrun" --bucket $BUCKET --org $ORG --format csv --skipHeader=1 --url "$url" --file "/data/headers.csv" --file "/data/tanks.csv"
If you run it in dry-run mode, you will be able to see the line protocol that will be used to import the data:
fuel_tanks,City=BRISTOL,ConstructionType=Flexible\ Plastic,OverfillProtection=Audible\ Alarm,SpillProtection=Spill\ Bucket,Status=Currently\ In\ Use,SubstanceStored=Gasoline EstimatedTotalCapacity=15000i,Latitude=41.65641,Longitude=-72.91408
fuel_tanks,City=Hartford,ConstructionType=Flexible\ Plastic,OverfillProtection=Audible\ Alarm,SpillProtection=Spill\ Bucket,Status=Currently\ In\ Use,SubstanceStored=Diesel EstimatedTotalCapacity=1000i,Latitude=41.75538,Longitude=-72.680618
fuel_tanks,City=BERLIN,ConstructionType=Flexible\ Plastic,OverfillProtection=Audible\ Alarm,SpillProtection=Spill\ Bucket,Status=Currently\ In\ Use,SubstanceStored=Gasoline EstimatedTotalCapacity=10000i,Latitude=41.646417,Longitude=-72.729937
...
But even with this flexibility, there are times when we have to write our own custom importer.
The influx write command is pretty flexible, but our data presents a few challenges that cannot be solved by the tool alone:
I ended up writing a custom importer script (import_ust.py) to handle those cases gracefully:
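The real logic lives in import_ust.py; as an illustration only, a simplified sketch of that approach could look like the code below. It assumes the column names shown in the table above, skips incomplete rows instead of aborting, and normalizes the tag names to snake_case, which is why the Flux queries later use names like status and substance_stored.

```python
# Simplified sketch of a custom CSV importer (the real script is import_ust.py).
import csv
from datetime import datetime, timezone

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

URL, ORG, BUCKET = "http://raspberrypi:8086", "Kodegeek", "USTS"
TOKEN = "REPLACE_WITH_YOUR_API_TOKEN"


def parse_rows(csv_file: str):
    """Turn each usable CSV row into an InfluxDB point, skipping incomplete rows."""
    with open(csv_file, newline="", encoding="utf-8") as handle:
        for row in csv.DictReader(handle):
            try:
                last_used = datetime.strptime(
                    row["Last Used Date"], "%m/%d/%Y"
                ).replace(tzinfo=timezone.utc)
                capacity = int(row["Estimated Total Capacity (gallons)"])
                lat = float(row["Tank Latitude"])
                lon = float(row["Tank Longitude"])
            except (KeyError, ValueError):
                continue  # Missing date, capacity or coordinates: skip the row
            yield (
                Point("fuel_tanks")
                .tag("city", row["Site City"])
                .tag("status", row["Status of Tank"].lower())
                .tag("substance_stored", row["Substance Currently Stored"])
                .field("estimated_total_capacity", capacity)
                .field("lat", lat)
                .field("lon", lon)
                .time(last_used)
            )


with InfluxDBClient(url=URL, token=TOKEN, org=ORG) as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    write_api.write(bucket=BUCKET, record=list(parse_rows("/home/josevnz/Downloads/ust.csv")))
```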
Finally, it is time to run a few queries:
from(bucket: "USTS")
|> range(start: -100y)
|> filter(fn: (r) => r["_measurement"] == "fuel_tanks")
|> filter(fn: (r) => r._field == "estimated_total_capacity")
|> group(columns: ["status"])
|> count(column: "_value")
|> group(columns: ["_value", "status"], mode: "except")
|> sort(columns: ["_value"], desc: true)
And we can see that we have three status categories, with 25,831 permanently closed tanks in CT.
That is quite a lot of ‘permanently closed’ tanks; are they distributed differently over time?
from(bucket: "USTS")
|> range(start: -100y)
|> filter(fn: (r) => r._measurement == "fuel_tanks" and r._field == "estimated_total_capacity" and r.status == "permanently closed")
|> truncateTimeColumn(unit: 1y)
|> group(columns: ["city", "_time"])
|> count(column: "_value")
|> drop(columns: ["closure_type", "construction_type", "overfill_protection", "substance_stored", "s2_cell_id_token", "lat", "lon"])
|> group(columns: ["city"])
This will generate a group of tables (towns) over time (we use truncateTimeColumn to drop the date and time granularity in our series data):
The leading town has 80 tanks in the graphic.
Gasoline? Oil? Let’s see what is currently in use over the last five years:
from(bucket: "USTS")
|> range(start: -5y)
|> filter(fn: (r) => r._measurement == "fuel_tanks" and r._field == "estimated_total_capacity" and r.status == "currently in use")
|> group(columns: ["substance_stored"])
|> count(column: "_value")
|> drop(columns: ["city", "closure_type", "construction_type", "overfill_protection", "s2_cell_id", "lat", "lon", "_time", "spill_protection", "status"])
|> group()
|> sort(columns: ["_value"], desc: true)
And the results:
At the time of this writing (InfluxDB 2.4.0) this is labeled as experimental in InfluxDB; native support for geolocation capabilities inside the database is a very useful feature, so let’s explore it next.
If you want to get a quick taste of the geolocation capabilities, you could run the following in a notebook:
import "influxdata/influxdb/sample"
import "experimental/geo"
sampleGeoData = sample.data(set: "birdMigration")
sampleGeoData
|> geo.filterRows(region: {lat: 30.04, lon: 31.23, radius: 200.0}, strict: true)
First, a bit of geography:
Hartford, CT, USA: the latitude and longitude coordinates are 41.763710, -72.685097.
Count the tanks within a 30-mile (48.28032 kilometers) radius of our Hartford coordinates, over the last 5 years:
import "experimental/geo"
from(bucket: "USTS")
|> range(start: -5y)
|> filter(fn: (r) => r._measurement == "fuel_tanks" and (r._field == "lat" or r._field == "lon") and r.status == "currently in use")
|> geo.filterRows(region: {lat: 41.763710, lon: -72.685097, radius: 48.28032}, strict: false)
|> drop(columns: ["closure_time", "construction_type", "overfill_protection", "substance_stored", "s2_cell_id_token", "lat", "lon", "spill_protection", "s2_cell_id", "_time", "status", "closure_type"])
|> count(column: "estimated_total_capacity")
|> group(columns: ["city"])
|> group()
|> sort(columns: ["estimated_total_capacity"], desc: true)
And here are the partial results:
Let’s take the query we used to get how many tanks per substance type are available for a given time period, and run it from Python.
from influxdb_client import InfluxDBClient
# You can generate a Token from the "Tokens Tab" in the UI
token = "pP25Y9broJWTPfj_nPpSnGtFsoUtutOKsxP-AynRXJAz6fZzdhLCD4NqJC0eg_ImKDczbMQxMSTuhmsJHN7ikA=="
org = "Kodegeek"
bucket = "USTS"
with InfluxDBClient(url="http://raspberrypi:8086", token=token, org=org) as client:
    query = """from(bucket: "USTS")
    |> range(start: -15y)
    |> filter(fn: (r) => r._measurement == "fuel_tanks" and r._field == "estimated_total_capacity" and r.status == "currently in use")
    |> group(columns: ["substance_stored"])
    |> count(column: "_value")
    |> drop(
        columns: [
            "city",
            "closure_type",
            "construction_type",
            "overfill_protection",
            "s2_cell_id",
            "lat",
            "lon",
            "_time",
            "spill_protection",
            "status",
        ],
    )
    |> group()
    |> sort(columns: ["_value"], desc: true)"""
    tables = client.query_api().query(query, org=org)
    for table in tables:
        for record in table.records:
            print(record)
We can then tweak the Python code and make it better:
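One possible refinement is sketched below; it is not the only way to do it. It assumes the API token is exported as an INFLUXDB_TOKEN environment variable (so it is no longer hard-coded) and that pandas is installed, since query_data_frame() returns a DataFrame; it also replaces the long drop() list with keep().

```python
"""Count currently-in-use tanks per substance, without hard-coding the API token."""
import os

from influxdb_client import InfluxDBClient

QUERY = """from(bucket: "USTS")
|> range(start: -15y)
|> filter(fn: (r) => r._measurement == "fuel_tanks" and r._field == "estimated_total_capacity" and r.status == "currently in use")
|> group(columns: ["substance_stored"])
|> count(column: "_value")
|> keep(columns: ["substance_stored", "_value"])
|> group()
|> sort(columns: ["_value"], desc: true)"""


def main() -> None:
    token = os.environ["INFLUXDB_TOKEN"]  # Fail fast if the token is not exported
    with InfluxDBClient(url="http://raspberrypi:8086", token=token, org="Kodegeek") as client:
        # query_data_frame() returns a pandas DataFrame, easier to print, plot, or export
        frame = client.query_api().query_data_frame(QUERY)
        print(frame[["substance_stored", "_value"]])


if __name__ == "__main__":
    main()
```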
Not bad at all. The same can be done in other languages, like Java.