Strava Activities Project - Data Preparation and Server Side Processing
Summary
This personal project displays my up-to-date Strava fitness activity information on an interactive, mobile-friendly data dashboard, using Leaflet to display geographical data and Chart.js to display graphs. Data can be filtered and explored using buttons, date selections, searches, and by selecting geographical features.
Strava is a fitness tracking mobile application which logs GPS data recorded during outdoor fitness activities. I have been logging rides, walks, runs, and hikes for years to the application and have accumulated over 400 total activities. Strava provides an API to access these records and additional data that the service calculates. I saw this as an opportunity to expand my server and client side development skills by accessing, processing, and presenting the data in a map/dashboard.
Historic Strava data were initially pulled from the Strava API and processed using Python, then a webhook subscription was created which updates my server when new activities are available for processing. Strava activity data are processed in Python using PostGIS functions to remove private areas and to simplify geometries to reduce file sizes, at the cost of spatial accuracy. Data are pre-calculated and served to the Leaflet map in the TopoJSON format to further reduce file sizes and server response times.
The description below discusses the Strava activity processing workflow and server-side processing scripts. You can view the Python files for this project in its GitHub project folder.
I have not yet finished the write-up for the client-side JavaScript/HTML aspect of this project, but the Javascript source code and HTML are available within my Flask Application folder.
Access Activities on Strava API - stravalib
The Python library stravalib provides useful functions to query the Strava API and parse results into Python objects. Instead of using the library's documentation for my server's authorization and authentication, I ended up following this guide on Medium's Analytics Vidhya, which was clearer and provided example code for refreshing the API access token. Initially I followed the guide's method of pickling credentials; however, I made the process scalable, in case I want to add more athletes in the future, and removed the dependence on local files by storing the credentials in a database.
This process uses SQLAlchemy to access authentication details stored in Postgres, generates and updates the access token if needed, then populates an authorized stravalib client instance for an athlete:
```python
def getAuth():
    """
    Loads Strava client authentication details from Postgres and creates an authorized client
    instance. Checks if the access token is expired; if so, it is refreshed and updated.

    Returns
    -------
    client. Stravalib model client instance. Contains access token to Strava API for the athlete,
    ID is hard coded for now.
    """
    # Build empty stravalib client instance
    client = Client()
    # Create db session
    session = Session()
    # Hard coded athlete id
    athleteID = 7170058
    authDict = {}
    # Load tokens and expiration time from Postgres
    query = session.query(athletes).filter(athletes.athlete_id == athleteID)
    for i in query:
        authDict["Access_Token"] = i.access_token
        authDict["Expiration"] = i.access_token_exp
        authDict["Refresh_Token"] = i.refresh_token
    # Check if access token has expired, if so request a new one and update Postgres
    if time.time() > authDict["Expiration"]:
        refresh_response = client.refresh_access_token(
            client_id=int(os.environ.get('STRAVA_CLIENT_ID')),
            client_secret=os.environ.get('STRAVA_CLIENT_SECRET'),
            refresh_token=authDict["Refresh_Token"])
        # Update access token and expiration date
        session.query(athletes).filter(athletes.athlete_id == athleteID). \
            update({athletes.access_token: refresh_response['access_token'],
                    athletes.access_token_exp: refresh_response['expires_at']})
        # Commit update
        session.commit()
        # Set Strava auth details
        client.access_token = refresh_response['access_token']
        client.refresh_token = authDict["Refresh_Token"]
        client.token_expires_at = refresh_response['expires_at']
    else:
        # Access token is up-to-date, set client details
        client.access_token = authDict["Access_Token"]
        client.refresh_token = authDict["Refresh_Token"]
        client.token_expires_at = authDict["Expiration"]
    # Close out session
    session.close()
    return client
```
For comparison, the original implementation used the Pickle file created during the one-time authentication and was called before every request to the Strava API:
```python
from application.stravalib.client import Client
import os
import time
import pickle

def gettoken():
    # Build empty stravalib client instance
    client = Client()
    # Load access token from the Pickle file
    with open(os.path.join(app.root_path, 'access_token.pickle'), 'rb') as f:
        access_token = pickle.load(f)
    # Check if access token has expired
    if time.time() > access_token['expires_at']:
        # Use client ID, secret, and refresh token to generate a new access token with Strava API
        refresh_response = client.refresh_access_token(
            client_id=os.getenv("STRAVA_CLIENT_ID"),
            client_secret=os.getenv("STRAVA_CLIENT_SECRET"),
            refresh_token=access_token['refresh_token'])
        # Open Pickle file and update with new access token
        with open(os.path.join(app.root_path, 'access_token.pickle'), 'wb') as f:
            pickle.dump(refresh_response, f)
        # Set new access token in client instance
        client.access_token = refresh_response['access_token']
        # Set refresh token in client instance
        client.refresh_token = refresh_response['refresh_token']
        # Set access token expiration time for client instance
        client.token_expires_at = refresh_response['expires_at']
    else:
        # Access token is still valid, set token in client instance
        client.access_token = access_token['access_token']
        client.refresh_token = access_token['refresh_token']
        client.token_expires_at = access_token['expires_at']
    return client
```
Now that I have full scope access to my account through the Strava API, I can begin downloading activities. The API, and stravalib, offer a few different ways to download activities.
- List Athlete Activities - client.get_activities() using the after argument. Provides a list of activities after the argument date. However, this does not contain full activity details; only certain summary information is returned.
- Get Activity - client.get_activity() using an activity ID argument. Provides full activity details. However, the polyline coordinate information is encoded following the Google Encoded Polyline Algorithm Format.
- Get Activity Streams - client.get_activity_streams(). Allows access to the plain text recorded sensor data for an activity, recorded about every second. This option provides data access to a variety of the sensors on your phone and external connected devices.
There are also options to access routes, segments, efforts, and other account details.
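To make the encoding mentioned above concrete, a Get Activity polyline can be decoded with a short pure-Python routine implementing the Google Encoded Polyline Algorithm (the third-party `polyline` package on PyPI does the same job; this sketch is just for illustration):

```python
def decode_polyline(encoded):
    """Decode a Google-encoded polyline string into a list of (lat, lng) tuples."""
    coords, index, lat, lng = [], 0, 0, 0
    while index < len(encoded):
        # Each point stores a latitude delta followed by a longitude delta
        for is_lng in (False, True):
            result, shift = 0, 0
            while True:
                # Each character carries 5 bits, offset by 63 to stay printable
                b = ord(encoded[index]) - 63
                index += 1
                result |= (b & 0x1f) << shift
                shift += 5
                if b < 0x20:  # high bit clear means this was the last chunk
                    break
            # Zig-zag decode: low bit is the sign flag
            delta = ~(result >> 1) if result & 1 else (result >> 1)
            if is_lng:
                lng += delta
            else:
                lat += delta
        coords.append((lat / 1e5, lng / 1e5))
    return coords

# Worked example from Google's documentation
print(decode_polyline("_p~iF~ps|U_ulLnnqC_mqNvxq`@"))
# [(38.5, -120.2), (40.7, -120.95), (43.252, -126.453)]
```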
My first goal was to download all my historic activities on Strava and add them to a Postgres/PostGIS database. Considering the API methods available, I decided on the following approach:
Use the List Athlete Activities after date method, set to a date before I started using Strava, to return the activity IDs for all my recorded activities, then generate a list using these IDs.
```python
def getListIds(client, days):
    """
    Gets a list of all Strava Activity IDs since (days) ago from Strava API.

    Parameters
    ----------
    client. Stravalib model client object. Contains access token to Strava API for the user.
    days. Int. How many days to look back, queries all activities since this calculated date.

    Returns
    -------
    List. List of int IDs of all Strava activities for the user.
    """
    # Use current datetime and timedelta to calculate previous datetime
    after = datetime.today() - timedelta(days=days)
    # after = datetime(year=2019, month=8, day=1)
    actList = []
    # Get all activities since after time and add to list
    acts = client.get_activities(after=after)
    for i in acts:
        actList.append(i.id)
    return actList
```
Iterate over the activity ID list, passing each activity ID into Get Activity and Get Activity Streams. Parse the results by structuring the data, removing uninteresting/null details, calculating ancillary data, and combining the GPS coordinates and time (provided as seconds since the start of the activity) into a PostGIS EWKT LINESTRINGM format. Even though I bring in the time information as an M-value, I am not using this time dimension in this project.
```python
def getFullDetails(client, actId):
    """
    Gets the full details of Strava activities using get_activity() to query flat data and
    get_activity_streams() to get GPS coordinates and times. Coordinates are formatted to be
    inserted into PostGIS following ST_GeomFromEWKT.

    Parameters
    ----------
    client. Stravalib model client object. Contains access token to Strava API for the user.
    actId. Int. Activity ID.

    Returns
    -------
    Dict. Activity and coordinate information formatted to be inserted into Postgres/PostGIS.
    """
    # Set logger to suppress debug errors, these messages aren't important and pollute the console
    Log = logging.getLogger()
    Log.setLevel('ERROR')
    # Stream data to get from activity streams
    types = ['time', 'latlng', 'altitude', 'velocity_smooth', 'grade_smooth', "distance",
             "heartrate", "cadence", "temp"]
    # Get activity details as a dictionary
    act = client.get_activity(actId).to_dict()
    # Get the activity stream details for the activity id
    stream = client.get_activity_streams(actId, types=types)
    # Get athlete ID directly from API call, instead of digging into the nested result provided by get_activity
    athId = client.get_athlete().id
    # Extract latlng and time information from activity stream
    latlng = stream['latlng'].data
    time = stream['time'].data
    lineStringData = []
    wktList = []
    # Iterate over time and latlng streams, combining them into a list containing sublists with lat, lng, time
    for i in range(0, len(latlng)):
        # Create new entry, swapping (lat, lon) to (lon, lat), then append time
        # (provided as seconds since the start of the activity)
        # newEntry = [latlng[i][1], latlng[i][0], (starttime + timedelta(seconds=time[i])).timestamp()]
        newEntry = [latlng[i][1], latlng[i][0], time[i]]
        # Append data as nested list
        lineStringData.append(newEntry)
        # Take newEntry list and create a string with a space delimiter between list items, add to list of wkt
        # This formats data to be friendly with geoalchemy ST_GeomFromEWKT
        wktList.append(" ".join(str(v) for v in newEntry))
    # print(wktList)
    # Format entire list to be friendly with geoalchemy ST_GeomFromEWKT
    sep = ", "
    wktStr = f"SRID=4326;LINESTRINGM({sep.join(wktList)})"
    # Add lat, lng, time as geom key to dict
    act['geom'] = lineStringData
    act['actId'] = actId
    act['geom_wkt'] = wktStr
    # Add athlete id to dict
    act['athlete_id'] = athId
    # Extend type to account for mtb and road rides
    act['type_extended'] = None
    # Calculate type of riding activity, using gear IDs
    if act['gear_id'] in ["b4317610", "b2066194"]:
        act['type_extended'] = "Mountain Bike"
    elif act['gear_id'] == "b5970935":
        act['type_extended'] = "Road Cycling"
    elif act['type'] == "Walk":
        act['type_extended'] = "Walk"
    elif act['type'] == "Run":
        act['type_extended'] = "Run"
    elif act['type'] == "Hike":
        act['type_extended'] = "Walk"
    # Wahoo Bolt provides additional data, check if populated, if not set to null
    wahooList = ["average_temp", "has_heartrate", "max_heartrate", "average_heartrate",
                 "average_cadence"]
    for i in wahooList:
        if act[i] == "":
            act[i] = None
    # List of dictionary keys to remove, these are null or uninteresting
    remove_keys = ['guid', 'external_id', 'athlete', 'location_city', 'location_state',
                   'location_country', 'kudos_count', 'comment_count', 'athlete_count',
                   'photo_count', 'total_photo_count', 'map', 'trainer', 'commute', 'gear',
                   'device_watts', 'has_kudoed', 'best_efforts', 'segment_efforts',
                   'splits_metric', 'splits_standard', 'weighted_average_watts', 'suffer_score',
                   'embed_token', 'photos', 'instagram_primary_photo', 'partner_logo_url',
                   'partner_brand_tag', 'from_accepted_tag', 'segment_leaderboard_opt_out',
                   'highlighted_kudosers', 'laps']
    # Iterate over dict keys, removing unnecessary/unwanted keys
    for key in list(act.keys()):
        if key in remove_keys:
            del act[key]
    return {"act": act, "stream": stream}
```
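The EWKT assembly in the function above boils down to this pattern (coordinates here are made up):

```python
# Each vertex is "lon lat m", where m is seconds since the start of the activity
points = [[-120.2, 38.5, 0], [-120.3, 38.6, 12]]
wktStr = "SRID=4326;LINESTRINGM({})".format(
    ", ".join(" ".join(str(v) for v in p) for p in points))
print(wktStr)  # SRID=4326;LINESTRINGM(-120.2 38.5 0, -120.3 38.6 12)
```

PostGIS's ST_GeomFromEWKT can parse this string directly, which is why the Python side builds it by hand rather than constructing a geometry object.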
Next, insert full activity data into Postgres:
```python
def insertOriginalAct(actDict):
    """
    Inserts new activity into database, POSTed by Strava webhook update or by manually
    triggering the process activity event route.

    Parameters
    ----------
    actDict. Dict. Generated by StravaWebHook.handle_sub_update() or by getStravaActivities.processActs().

    Returns
    -------
    Nothing. Data are inserted into Postgres/PostGIS.
    """
    insert = strava_activities(actID=actDict['actId'],
                               upload_id=actDict['upload_id'],
                               name=actDict['name'],
                               distance=actDict['distance'],
                               moving_time=actDict['moving_time'],
                               elapsed_time=actDict['elapsed_time'],
                               total_elevation_gain=actDict['total_elevation_gain'],
                               elev_high=actDict['elev_high'],
                               elev_low=actDict['elev_low'],
                               type=actDict['type'],
                               start_date=actDict['start_date'],
                               start_date_local=actDict['start_date_local'],
                               timezone=actDict['timezone'],
                               utc_offset=actDict['utc_offset'],
                               start_latlng=actDict['start_latlng'],
                               end_latlng=actDict['end_latlng'],
                               start_latitude=actDict['start_latitude'],
                               start_longitude=actDict['start_longitude'],
                               achievement_count=actDict['achievement_count'],
                               pr_count=actDict['pr_count'],
                               private=actDict['private'],
                               gear_id=actDict['gear_id'],
                               average_speed=actDict['average_speed'],
                               max_speed=actDict['max_speed'],
                               average_watts=actDict['average_watts'],
                               kilojoules=actDict['kilojoules'],
                               description=actDict['description'],
                               workout_type=actDict['workout_type'],
                               calories=actDict['calories'],
                               device_name=actDict['device_name'],
                               manual=actDict['manual'],
                               athlete_id=actDict['athlete_id'],
                               type_extended=actDict['type_extended'],
                               avgtemp=actDict['average_temp'],
                               has_heartrate=actDict['has_heartrate'],
                               average_cadence=actDict["average_cadence"],
                               average_heartrate=actDict['average_heartrate'],
                               max_heartrate=actDict['max_heartrate'],
                               geom=actDict['geom_wkt'])
    session = Session()
    session.add(insert)
    session.commit()
    session.close()
    application.logger.debug(f"New webhook update for activity {actDict['actId']} has been added to Postgres!")
```
Obfuscate Sensitive Locations - SQLAlchemy/GeoAlchemy2
Now I have the details and coordinates of every Strava activity on my account stored in my Postgres database, ready to be served to a Leaflet application. However, this creates another problem: since I stored the full coordinate information for each activity, any personal locations, such as my home and the homes of friends and family, will be visible if I share the data publicly. Strava's solution to this issue is to allow users to create privacy zones, which remove any sections of publicly visible activities that start or end within the zones. This solution is bypassed in my dataset since I queried the full coordinates of my activities using full scope access.
To maintain my privacy, I decided to create my own privacy zones in QGIS and store them within my database. A second, public-friendly dataset was generated using SQLAlchemy and GeoAlchemy2 PostGIS functions, which removed all sections that crossed these privacy areas. Also, since the dataset from Strava contains a coordinate vertex for about every second of recorded time, I simplified the data to reduce the overall number of vertices.
Here you can see the SQLAlchemy/GeoAlchemy2 ORM expressions used to initially populate the obfuscated public friendly table:
```python
# Import GeoAlchemy2 and extended SQLAlchemy functions
from sqlalchemy import func as sqlfunc
# Import session factory
from application import Session
# Table holding all geometry and attribute data from Strava API
import strava_activities
# Table holding masked, public friendly, data
import strava_activities_masked

def processActivitiesPublic(recordID):
    """
    Processes Strava activity by simplifying geometry and removing private areas. This prepares
    the activity to be shared publicly on a Leaflet map. These functions greatly reduce the number
    of vertices, reducing JSON file size, and process the data to be TopoJSON friendly, preventing
    geometries from failing to be converted.

    Parameters
    ----------
    recordID. Int. Strava activity record ID.

    Returns
    -------
    Nothing. Data are processed and committed to PostgreSQL/PostGIS database.
    """
    # Create database session
    session = Session()
    simplifyFactor = 15
    geometricProj = 32610
    webSRID = 4326
    nonNodedSnap = 0.0001  # Fine grid snap to fix non-noded intersection errors
    gridSnap = 5
    collectionExtract = 3
    # Create CTE to query privacy zone polygons, combine them, extract polygons,
    # and transform to geometricProj
    privacy_cte = session.query(
        sqlfunc.ST_Transform(
            sqlfunc.ST_CollectionExtract(
                sqlfunc.ST_Collect(AOI.geom), collectionExtract),
            geometricProj).label("priv_aoi")).filter(AOI.privacy == "Yes").cte("privacy_aoi")
    # Processes all records in the strava_activities table, used for initial masked table setup only
    privacyClipQuery = session.query(
        strava_activities.actID,
        sqlfunc.ST_AsEWKB(
            sqlfunc.ST_Transform(
                sqlfunc.ST_MakeValid(
                    sqlfunc.ST_Multi(
                        sqlfunc.ST_Simplify(
                            sqlfunc.ST_SnapToGrid(
                                sqlfunc.ST_Difference(
                                    sqlfunc.ST_SnapToGrid(
                                        sqlfunc.ST_Transform(strava_activities.geom, geometricProj),
                                        nonNodedSnap),
                                    privacy_cte.c.priv_aoi),
                                gridSnap),
                            simplifyFactor))),
                webSRID)))
    for i in privacyClipQuery:
        session.add(strava_activities_masked(actID=i[0], geom=i[1]))
    session.commit()
    session.close()
```
The above ORM select query is equivalent to the following PostgreSQL/PostGIS SQL select query:
```sql
WITH privacy_cte AS (
    SELECT ST_Transform(ST_CollectionExtract(ST_Collect("AOI".geom), 3), 32610) AS priv_aoi
    FROM "AOI"
    WHERE "AOI".privacy = 'Yes'
)
SELECT strava_activities."actID",
       ST_AsEWKB(
           ST_Transform(
               ST_MakeValid(
                   ST_Multi(
                       ST_Simplify(
                           ST_SnapToGrid(
                               ST_Difference(
                                   ST_SnapToGrid(ST_Transform(strava_activities.geom, 32610), 0.0001),
                                   privacy_cte.priv_aoi),
                               5),
                           15))),
               4326))
FROM strava_activities, privacy_cte;
```
This query does the following:
- Create a common table expression (CTE) to select the privacy zone geometry. This CTE is used to create a single multi-part polygon containing all privacy zones. This ensures that ST_Difference calculates the difference between each activity and the privacy zones only once. If the privacy zones were not combined, the difference between each individual privacy zone record and the activity would be calculated, resulting in duplicated results.
- Select AOI polygons flagged as privacy zones.
- Combine polygons into a single multi-part polygon contained inside a geometry collection (ST_Collect).
- Extract multi-polygon from geometry collection (ST_CollectionExtract). Even though this collection only contains the multi-polygon, it still needs to be extracted.
- Transform geometry to the projected coordinate system geometricProj (ST_Transform). Using a projected coordinate allows for faster geometric calculations and allows for meters to be used in PostGIS function parameters, which use the geometry's unit system.
- Select strava_activities activity linestring geometry based on Record ID and transform (ST_Transform) to geometricProj.
- Snap activity linestrings to a 0.0001m grid (ST_SnapToGrid, variant 3). This solves a non-noded intersection error when running ST_Difference. See this StackExchange thread for an explanation of the problem and solution.
- Calculate difference (ST_Difference) between activity linestring and privacy zone CTE result. ST_Difference subtracts geometry B from A, removing the vertices from A that are within B and segments that touch B.
- Snap activity linestring vertices to a 5m grid (ST_SnapToGrid, variant 3). This removes some messy areas by combining and removing excess vertices while also reducing the resulting geometry memory/file size. It also solves geometric errors when exporting data to a TopoJSON format. However, the resulting linestring geometries have a step-shaped appearance resembling the grid.
- Simplify activity linestring with a 15m tolerance (ST_Simplify). This further removes messy areas and bends in the linestring by removing vertices to create longer straight line segments. This provides large reductions in resulting geometry memory/file sizes and mitigates the step-shaped results created by ST_SnapToGrid.
- Convert linestrings to multi-linestrings (ST_Multi). Geometries in the strava_activities table are stored as linestrings since activity data provided by Strava are contiguous and don't need to be stored in a multi-part format. However, ST_Difference may create multi-linestrings that must be stored as such, so all geometries are converted to this format.
- Fix any invalid activity linestring geometries (ST_MakeValid) that were generated during prior processing.
- Transform activity linestring geometry (ST_Transform) back into WGS 1984, SRID 4326. WGS 1984 is best for this project since it's required for display in Leaflet.
- Convert the linestring geometry representation to Extended Well-Known Binary (ST_AsEWKB). This ensures that data can be easily inserted into the strava_activities_masked table.
- Query the Activity ID of the strava_activities record. This will be inserted as a foreign key in the strava_activities_masked table.
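To build intuition for the two vertex-reduction steps above, here is a small pure-Python sketch of grid snapping and of the Douglas-Peucker algorithm that PostGIS's ST_Simplify implements. This is an illustration on planar coordinates only; the project itself does all of this server-side in PostGIS:

```python
import math

def snap_to_grid(points, size):
    """Snap each vertex to the nearest grid intersection, dropping consecutive duplicates."""
    snapped = []
    for x, y in points:
        p = (round(x / size) * size, round(y / size) * size)
        if not snapped or snapped[-1] != p:
            snapped.append(p)
    return snapped

def simplify(points, tol):
    """Douglas-Peucker: drop vertices closer than tol to the chord between endpoints."""
    if len(points) < 3:
        return points
    (x1, y1), (x2, y2) = points[0], points[-1]
    dmax, idx = 0.0, 0
    for i in range(1, len(points) - 1):
        x0, y0 = points[i]
        # Perpendicular distance from points[i] to the line through the endpoints
        num = abs((y2 - y1) * x0 - (x2 - x1) * y0 + x2 * y1 - y2 * x1)
        den = math.hypot(x2 - x1, y2 - y1) or 1.0
        if num / den > dmax:
            dmax, idx = num / den, i
    if dmax > tol:
        # Farthest vertex is significant: keep it and recurse on both halves
        left = simplify(points[:idx + 1], tol)
        return left[:-1] + simplify(points[idx:], tol)
    # All intermediate vertices are within tolerance: collapse to a straight segment
    return [points[0], points[-1]]

# Near-duplicate vertices collapse onto the grid...
print(snap_to_grid([(0.4, 0.6), (1.1, 0.9), (1.2, 1.1)], 1.0))  # [(0.0, 1.0), (1.0, 1.0)]
# ...and small wiggles are simplified away while real corners survive
print(simplify([(0, 0), (1, 0.1), (2, 0), (3, 5), (4, 0)], 1.0))
```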
Process Activity Streams - GeoAlchemy2 & Boto3
Next, it's time to query the Strava Activity Stream data. These data are recorded every second and contain time, distance, elevation, latlng, and external sensor data. This type of data lends itself well to a tabular format, and I wanted these data available in a CSV so they can be viewed as profiles over the course of an activity.
The full details of the activity are passed into this function, or are queried if not provided. I recently acquired a bike computer which records additional data that are made available through the API; because of this, I query all of these additional stream types for all activities, including those which were not recorded with the computer. If stream data are absent, the API simply returns nothing for that particular stream type. The following function uses the results from the getFullDetails function shown above:
```python
def generateAndUploadCSVStream(client, actID, activity=None):
    """
    Generates and uploads a privacy zone masked Strava Stream CSV.

    @param client: stravalib client instance with valid access token
    @param actID: Int. Activity ID of Strava activity to process
    @param activity: Dictionary. Optional. Dictionary of full Strava Activity details, generated if not provided
    @return: Nothing. Uploads file to S3 Bucket
    """
    if not activity:
        # Get all activity details for newly created activity, including stream data
        activity = getFullDetails(client, actID)
    # Create in-memory buffer csv of stream data
    csvBuff = StravaAWSS3.writeMemoryCSV(activity["stream"])
    # Get WKT formatted latlng stream data
    wktStr = formatStreamData(activity["stream"])
    # Get list of coordinates which cross privacy areas, these will be removed from the latlng stream CSV data
    removeCoordList = DBQueriesStrava.getIntersectingPoints(wktStr)
    # Trim/remove rows from latlng CSV stream which have coordinates that intersect the privacy areas
    trimmedMemCSV = trimStreamCSV(removeCoordList, csvBuff)
    # Upload trimmed buffer csv to AWS S3 bucket
    StravaAWSS3.uploadToS3(trimmedMemCSV, activity["act"]["actId"])
```
Next, the activity stream data are written into a CSV stored in the memory buffer:
```python
def writeMemoryCSV(streamData):
    """
    Converts activity stream data dictionary to an in-memory text buffer, avoids needing to
    write a local file since data will be uploaded to S3.

    :param streamData: Dict. Formatted Strava Stream Data with lat/longs removed
    :return: In-memory text buffer. Activity stream CSV
    """
    # Create in-memory text buffer
    memOutput = StringIO()
    dataDict = {}
    # Stream types to include, latlngs in privacy zones will be removed
    csvTypes = ['time', 'latlng', 'altitude', 'velocity_smooth', 'grade_smooth', "distance",
                "heartrate", "cadence", "temp"]
    # Extract data from stream dictionary
    for streamType in csvTypes:
        try:
            dataDict[streamType] = streamData[streamType].data
        except KeyError:
            application.logger.debug(f"The stream type {streamType} doesn't exist, skipping")
    # Iterate over latlngs, each a [lat, lng] list, converting to a "lat,lng" string
    for c, i in enumerate(dataDict['latlng']):
        dataDict['latlng'][c] = ",".join(str(x) for x in i)
    # See: https://stackoverflow.com/questions/23613426/write-dictionary-of-lists-to-a-csv-file
    # Open buffer and populate with csv data
    writer = csv.writer(memOutput)
    # Write column names
    writer.writerow(dataDict.keys())
    # Each key:value(list) in the dictionary is a column; zip(*values) transposes
    # the column lists into row tuples, which are written into the CSV
    writer.writerows(zip(*dataDict.values()))
    return memOutput
```
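The writerows call with zip does the heavy lifting here: zip with argument unpacking transposes the per-column lists into per-row tuples. A minimal self-contained illustration:

```python
import csv
from io import StringIO

# Two columns stored as lists, as in the stream dictionary
cols = {"time": [0, 1, 2], "altitude": [10.0, 10.5, 11.0]}

buf = StringIO()
writer = csv.writer(buf)
writer.writerow(cols.keys())           # header row: time,altitude
writer.writerows(zip(*cols.values()))  # each tuple is one row: (time, altitude)

print(buf.getvalue())
```

This prints a CSV with a header line followed by one row per index: `time,altitude`, `0,10.0`, `1,10.5`, `2,11.0`.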
This helper function is used to format the point coordinates into an Extended Well-Known Text string:
```python
def formatStreamData(stream):
    """
    Formats Strava Activity Stream latlng data into an EWKT string. The string is constructed
    using string manipulation; consider finding a library which can convert a list of
    coordinates into EWKT or WKT.

    @param stream: Strava Activity Stream with latlng data
    @return: String. EWKT representation of Strava Activity Stream data.
    """
    # Pull out latlngs
    latlng = stream['latlng'].data
    # Format first part of EWKT LINESTRING string, in 4326, WGS 1984
    wktStr = "SRID=4326;LINESTRING("
    # Iterate over latlng records
    for c, i in enumerate(latlng):
        # Split based on comma
        lat, lng = latlng[c].split(",")
        # Make string of new lat lng value
        newEntry = f"{lat} {lng},"
        # Add new record to existing string
        wktStr += newEntry
    # Remove last comma
    wktStr = wktStr[:-1]
    # Close out wktStr
    wktStr += ")"
    return wktStr
```
The previously generated EWKT string is used in a GeoAlchemy2 PostGIS query to determine which point coordinates reside within privacy areas:
```python
def getIntersectingPoints(wktStr):
    """
    Takes an EWKT string of a Strava Activity Stream's latlngs and returns a list of float
    points which reside within the privacy areas.

    @param wktStr: String. EWKT representation of Strava Activity Stream latlngs
    @return: List of strings. Points are returned as WGS 1984 coordinate strings in the format lon,lat
    """
    # geometricProj = 32610
    collectionExtract = 3
    # Open session
    session = Session()
    # Get coordinates from within privacy zones
    try:
        # Create a labeled common table expression to query privacy zone geometries
        # collected into a single multi-polygon
        privacy_cte = session.query(
            sqlfunc.ST_CollectionExtract(
                sqlfunc.ST_Collect(AOI.geom), collectionExtract).label("ctelab")).filter(
            AOI.privacy == "Yes").cte()
        # Take provided EWKT string and convert to a GeoAlchemy geometry
        lineString = sqlfunc.ST_GeomFromEWKT(wktStr)
        # Get a list of points from the linestring which fall inside the privacy zone.
        # ST_DumpPoints provides a point geometry per row, which is converted to a text
        # representation using ST_AsText
        pointQuery = session.query(
            sqlfunc.ST_AsText(
                sqlfunc.ST_DumpPoints(
                    sqlfunc.ST_Intersection(lineString, privacy_cte.c.ctelab)).geom))
        coordinateList = []
        for i in pointQuery:
            # Strip out the WKT parts of the coordinates, only want list of [lon,lat]
            coordinateList.append(formatPointResponse(i))
    finally:
        session.close()
    return coordinateList
```
These overlapping points, and their corresponding data, are removed from the buffer CSV:
```python
def trimStreamCSV(coordList, memCSV):
    """
    Trims out all records from the Strava stream CSV that fall within privacy zones, ensuring
    that the stream data do not reveal locations within sensitive areas. Coordinates are
    included in the stream data such that they can be used to draw point markers on the map on
    chart mouseover.

    @param coordList: List. Coordinates which fall within privacy zones
    @param memCSV: StringIO CSV. Contains original, unaltered activity stream details
    @return: StringIO CSV. Memory CSV with sensitive locations removed
    """
    # See https://stackoverflow.com/a/41978062
    # Reset seek to 0 for memory CSV, after writing it the file pointer is still at the end and must be reset
    memCSV.seek(0)
    # Open original memory csv with a reader
    reader = csv.reader(memCSV)
    # Create new memory CSV to hold results
    trimmedMemOutput = StringIO()
    # Create csv writer on memory csv
    trimmedWriter = csv.writer(trimmedMemOutput)
    # Iterate over original CSV
    for c, row in enumerate(reader):
        # Write header row
        if c == 0:
            trimmedWriter.writerow(row)
        else:
            # Split row into [lat, lng]
            coord = row[1].split(",")
            # Check if the lat or lng appears in the privacy-zone coordinate list
            latCheck = any(coord[0] in x for x in coordList)
            lngCheck = any(coord[1] in x for x in coordList)
            # Keep the row unless both the lat and lng were found within a privacy zone
            if not latCheck or not lngCheck:
                trimmedWriter.writerow(row)
    return trimmedMemOutput
```
Finally, the buffer CSV is uploaded to an S3 Bucket where it can be shared publicly (currently the Flask application grants temporary access to individual activities as needed):
```python
def connectToS3():
    """
    Establish connection to AWS S3 using environmental variables.

    :return: S3 service client.
    """
    s3_client = boto3.client(service_name='s3',
                             aws_access_key_id=os.getenv("BOTO3_Flask_ID"),
                             aws_secret_access_key=os.getenv("BOTO3_Flask_KEY"))
    return s3_client

def uploadToS3(file, actID=None):
    """
    Uploads file to S3 Bucket. This bucket is not public but all activities are accessible to
    the public through the API with pre-signed temporary URLs. If the Act ID is None then the
    input is the TopoJSON file.

    :param file: Buffer/memory file to be uploaded, either JSON or CSV.
    :param actID: Strava Activity ID, used to name uploaded file, if empty then TopoJSON is assumed, which has a static name
    :return: Nothing, file is uploaded
    """
    # Get bucket details from environmental variable
    bucket = os.getenv("S3_TRIMMED_STREAM_BUCKET")
    # Establish connection to S3 API
    conn = connectToS3()
    try:
        if actID:
            # Add in-memory buffer csv to bucket
            # Using getvalue() with put_object solves an issue with the StringIO object not
            # being compatible with other boto3 object creation methods
            fileName = f"stream_{actID}.csv"
            conn.put_object(Body=file.getvalue(), Bucket=bucket, Key=fileName)
        else:
            # Add in-memory buffer TopoJSON file to bucket, file name is static
            fileName = "topoJSONPublicActivities.json"
            conn.put_object(Body=file, Bucket=bucket, Key=fileName)
    except Exception as e:
        application.logger.error(f"Upload to S3 bucket failed with the error: {e}")
    finally:
        # Close in-memory buffer file, removing it from memory
        file.close()
```
Prepare Data for Leaflet - TopoJSON
GeoJSON is a standard and convenient format for transferring geospatial data over the web, especially since it's supported by Leaflet. However, it's not very efficient at storing data, largely because it stores a full list of coordinates and contains unnecessary spacing. Currently, my masked GeoJSON dataset exports to a 2.8 MB JSON file, which is fairly large to transfer on every page load. Fortunately, there's the TopoJSON format, which, in addition to encoding a topology (not useful for this multi-linestring dataset), stores coordinates as deltas from an origin coordinate, resulting in a large reduction of stored information. Using the Topojson Python library allowed me to reduce the JSON file size to about 1.3 MB, still large but under half the original size. While other encoding techniques are available, this format meets the project's needs: it reduces file size, is easily usable in Leaflet, and retains all the attribute information that will be needed in the web map/viewer.
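The delta-encoding idea is simple to sketch: TopoJSON first quantizes coordinates onto an integer grid, then stores each arc as an absolute first point followed by small integer offsets. This is a simplification of the actual spec, which also shares arcs between geometries, but it shows why the deltas compress so well:

```python
def quantize(coords, factor=10000):
    # Map float coordinates onto an integer grid (roughly TopoJSON's prequantize step)
    return [(round(x * factor), round(y * factor)) for x, y in coords]

def delta_encode(coords):
    # First point absolute, remaining points as offsets from their predecessor
    out = [coords[0]]
    for prev, cur in zip(coords, coords[1:]):
        out.append((cur[0] - prev[0], cur[1] - prev[1]))
    return out

arc = quantize([(-121.5001, 38.1002), (-121.5003, 38.1004), (-121.5004, 38.1004)])
print(delta_encode(arc))  # [(-1215001, 381002), (-2, 2), (-1, 0)]
```

After the first vertex, every subsequent vertex is a pair of tiny integers instead of two long floats, which is where most of the size reduction comes from.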
Process to generate TopoJSON:
```python
def createStravaPublicActTopoJSON():
    """
    Creates an in-memory TopoJSON file containing all database stored Strava Activities. This
    file will be uploaded to a S3 Bucket, replacing the existing file. A pre-generated file is
    used to speed up response time, as generating the file may take a few seconds. This function
    is called whenever a new subscription update adds a new activity to the database or when
    triggered on the admin page.

    Returns
    -------
    In-memory TopoJSON file.
    """
    # Create Postgres connection
    session = Session()
    # Query geom as GeoJSON and other attribute information
    query = session.query(sqlfunc.ST_AsGeoJSON(strava_activities_masked.geom, 5),
                          strava_activities.name, strava_activities.actID,
                          strava_activities.type, strava_activities.distance,
                          strava_activities.private, strava_activities.calories,
                          strava_activities.start_date, strava_activities.elapsed_time,
                          strava_activities.moving_time, strava_activities.average_watts,
                          strava_activities.start_date_local,
                          strava_activities.total_elevation_gain,
                          strava_activities.average_speed, strava_activities.max_speed,
                          strava_activities.type_extended, strava_activities.has_heartrate,
                          strava_activities.average_cadence, strava_activities.max_heartrate,
                          strava_activities.average_heartrate, strava_gear.gear_name) \
        .join(strava_activities_masked.act_rel) \
        .join(strava_activities.gear_rel, isouter=True) \
        .order_by(strava_activities.start_date.desc())
    features = []
    for row in query:
        # Build a dictionary of the attribute information
        propDict = {"name": row.name, "actID": row.actID, "type": row.type,
                    "distance": round(row.distance), "private": row.private,
                    "calories": round(row.calories),
                    "startDate": row.start_date_local.isoformat(),
                    "elapsed_time": row.elapsed_time.seconds,
                    "total_elevation_gain": round(row.total_elevation_gain),
                    "average_speed": round(row.average_speed, 1),
                    "max_speed": row.max_speed, "gear_name": row.gear_name,
                    "type_extended": row.type_extended,
                    "moving_time": row.moving_time.seconds,
                    "average_watts": row.average_watts,
                    "has_heartrate": row.has_heartrate,
                    "average_cadence": row.average_cadence,
                    "max_heartrate": row.max_heartrate,
                    "average_heartrate": row.average_heartrate}
        # Take ST_AsGeoJSON() result and load as geojson object
        geojsonGeom = geojson.loads(row[0])
        # Build the feature and add to feature list
        features.append(Feature(geometry=MultiLineString(geojsonGeom), properties=propDict))
    session.close()
    # Build the feature collection result
    feature_collection = FeatureCollection(features)
    # Create TopoJSON from the GeoJSON Feature Collection. Don't create a topology, it doesn't
    # matter for polylines, and prequantize the data, which reduces file size at the cost of
    # processing time. A prequantize of 1e7 is used over the default, 1e6, to avoid errors in
    # which data were placed in the South Pacific Ocean
    return tp.Topology(feature_collection, topology=False, prequantize=10000000).to_json()
```
This script queries the masked activities as GeoJSON, loads and parses each record into a GeoJSON MultiLineString Feature, combines all records into a Feature Collection, and finally creates a TopoJSON file which is uploaded to an S3 bucket using the upload function shown above.
The Topology function is very picky about incoming geometries and kept removing records without any explanation as to why, even though they passed PostGIS ST_MakeValid and ST_IsValid checks. All original, non-masked, GeoJSON records converted properly, so I assume that ST_Difference broke some geometries during masking. The additional processing steps applied during masking, in particular ST_SnapToGrid, appear to have resolved these issues, although they may need more fine tuning to ensure that no geometries fail to convert to TopoJSON in the future.
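Grid snapping and prequantization both work the same way at heart: coordinates are rounded onto a fixed grid, which collapses near-duplicate vertices and removes the degenerate segments that can trip up topology construction, at the cost of precision. The idea can be sketched in plain Python (the grid size below is illustrative only, not the value used in this project):

```python
def snap_to_grid(coords, grid=1e-5):
    """Round each (lon, lat) pair onto a regular grid, dropping the
    consecutive duplicate vertices produced by the rounding. Similar
    in spirit to PostGIS ST_SnapToGrid."""
    snapped = []
    for lon, lat in coords:
        pt = (round(lon / grid) * grid, round(lat / grid) * grid)
        if not snapped or snapped[-1] != pt:
            snapped.append(pt)
    return snapped

# Two GPS fixes a few millionths of a degree apart collapse into one vertex,
# so three raw points become two grid vertices
track = [(-120.123456, 39.654321), (-120.123460, 39.654322), (-120.200000, 39.700000)]
print(snap_to_grid(track))
```

Fewer, coarser vertices mean smaller GeoJSON/TopoJSON payloads, which is exactly the trade-off made on the server side here.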
Get New Activities - Strava API Webhook
Now that all my data have been processed and made available to the application, I need to keep the dataset up-to-date with newly added activities. To accomplish this I created a Strava webhook/push subscription using stravalib. This enables my server to receive updates from the Strava API whenever I add a new activity, without needing to poll the API for changes. An update is sent whenever a new activity is added, an existing activity's title, type, or privacy is changed, or when the account revokes access to the application. As this is my own account, I do not handle requests to revoke application authorization. Also note that new activity updates include activity IDs only; it's my server's responsibility to call the API for any further details.
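For reference, a subscription update body is small and flat. The sketch below uses a made-up payload (the field names follow Strava's documented webhook event format, but all IDs and values are invented) and shows the kind of dispatch decision an update handler has to make:

```python
import json

# Hypothetical example payload; field names follow Strava's webhook event format
raw = json.dumps({
    "object_type": "activity",   # "activity" or "athlete"
    "object_id": 1234567890,     # activity ID only -- full details need a separate API call
    "aspect_type": "create",     # "create", "update", or "delete"
    "owner_id": 9999,
    "subscription_id": 120475,
    "event_time": 1516126040,
    "updates": {}                # e.g. {"title": "New name"} on update events
})

def classify(event: dict) -> str:
    """Return which handler a subscription update should route to."""
    if event["object_type"] == "activity" and event["aspect_type"] == "create":
        return "fetch-and-process"      # call the API with object_id for full details
    if event["object_type"] == "activity" and event["aspect_type"] == "update":
        return "update-existing-record"
    return "ignore"                     # deletes/deauthorizations not handled here

print(classify(json.loads(raw)))        # -> fetch-and-process
```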
While stravalib has functions dedicated to webhooks, they are minimally documented, with no examples provided. Also, as of the time I started work on this project, the version of stravalib on PyPI, 0.10.2, did not support the newest version of the Strava API. Fortunately, the stravalib team has an updated version on their GitHub page which supports it.
Here is the conceptual process of creating a new webhook subscription:
The server issues a subscription creation POST request to the Strava API, containing the Client ID and Secret in addition to an optional user/server created verification token and a required callback URL configured to handle GET and POST requests. The verification token is used by the server to confirm that further setup requests are being sent by Strava. The creation request can be issued in stravalib using the following (this is taken from an administration page I created and is called from an HTML POST request):
def addwebhooksub():
    """
    Adds a new Strava webhook subscription to the database and Strava API. Kicks off the callback
    verification process.

    Called by Strava Activity admin page inputs.
    """
    # Get POST request info
    # athID = int(request.form['athID'])
    # callbackurl = str(request.form['callbackURL'])
    # Generate 14 character verify token string
    verifytoken = secrets.token_hex(7)
    # Insert token into database; the row will be updated with the subID if successful, otherwise it will be deleted
    DBQueriesStrava.insertVerifyToken(verifytoken)
    # Get Strava API access credentials
    client = OAuthStrava.getAuth()
    try:
        # Send request to create webhook subscription, will be given the new subscription ID in response
        application.logger.debug(f"Callback url is {os.getenv('FULL_STRAVA_CALLBACK_URL')}")
        response = client.create_subscription(client_id=os.getenv("STRAVA_CLIENT_ID"),
                                              client_secret=os.getenv("STRAVA_CLIENT_SECRET"),
                                              callback_url=os.getenv('FULL_STRAVA_CALLBACK_URL'),
                                              verify_token=verifytoken)
        application.logger.debug(f"New sub id is {response.id}, updating database")
        # Update database with new sub id
        DBQueriesStrava.updateSubId(response.id, verifytoken)
        return Response(status=200)
    except Exception as e:
        DBQueriesStrava.deleteVerifyTokenRecord(verifytoken)
        return Response(status=400)
The above script kicks off the process of creating a new webhook subscription, using client information, a pre-generated verification token, and a full callback URL address. After the subscription has been successfully created (the callback functions are shown further below), the stravalib "create_subscription" function will issue a GET request to the Strava API to get the ID of the newly created subscription. This ID is used to update the subscription entry in my database and to verify that webhook subscription updates are from Strava. The following updates the active subscription with the ID, and since an active webhook includes all athletes an application is authorized to access, this record's foreign key is applied to all athletes:
def updateSubId(subId, verifytoken):
    """
    Updates webhook subscriptions table with the new subscription id provided by Strava then updates all
    athletes with the new subID foreign key.

    @param subId: Int. Webhook subscription ID provided by Strava API
    @param verifytoken: String. Script generated verification token
    @return: Nothing. Database is updated
    """
    session = Session()
    try:
        # Update recently created record which only has the verify token populated
        session.query(webhook_subs).filter(webhook_subs.verify_token == verifytoken) \
            .update({webhook_subs.sub_id: subId, webhook_subs.activesub: "Yes"})
        session.commit()
        # Get the primary key from the new webhook subscription
        record = session.query(webhook_subs).filter(webhook_subs.verify_token == verifytoken).first()
        # Update all athletes with the new subscription entry foreign key
        session.query(athletes).update({athletes.sub_id: record.id})
        session.commit()
        session.close()
    except Exception as e:
        application.logger.debug(f"Update Strava athlete sub Id failed with the exception: {e}")
During initial setup a GET request is sent to the server's callback URL and after successful setup POST requests will be issued when specific account updates occur. The following code is used to handle Strava API webhook subscription GET and POST requests:
@stravaActDashAPI_BP.route(os.environ.get("STRAVA_CALLBACK_URL"), methods=['GET', 'POST'])
def subCallback():
    """
    Strava subscription callback URL.

    Returns
    -------
    GET request: JSON, echoed Strava challenge text.
    POST request: Success code if data are successfully added to Postgres/PostGIS. Strava must receive a
    200 code in response to POST.
    """
    res = WebHookFunctionsStrava.handleSubCallback(request)
    if res:
        return res
    else:
        return Response(status=200)


def handleSubCallback(request):
    """
    Handles requests to the Strava subscription callback URL.

    GET: Webhook subscription creation process. The callback URL is sent a GET request containing a
    challenge code. This code is sent back to the requester to verify the callback. The initial request to
    create a new webhook subscription is then provided with verification and the new subscription ID.

    POST: Webhook subscription update message. Sent when an activity on a subscribed account is created,
    updated, or deleted, or when a privacy related profile setting is changed. All update messages are
    inserted into Postgres. Currently, only activity creation events are handled; additional development is
    needed to handle other events.

    Returns
    -------
    GET request: JSON, echoed Strava challenge text.
    POST request: Success code if data are successfully added to Postgres/PostGIS. Strava must receive a
    200 code in response to POST.
    """
    # Get application access credentials
    client = OAuthStrava.getAuth()
    # Check if request is a GET callback request, part of webhook subscription process
    if request.method == 'GET':
        # Extract challenge and verification tokens
        callBackContent = request.args.get("hub.challenge")
        callBackVerifyToken = request.args.get("hub.verify_token")
        # Form callback response as dict
        callBackResponse = {"hub.challenge": callBackContent}
        # Check if verification tokens match, i.e. if GET request is from Strava
        if DBQueriesStrava.checkVerificationToken(callBackVerifyToken):
            # Verification succeeded, return challenge code as dict
            # Using Flask Response API automatically converts it to JSON with HTTP 200 success code
            return callBackResponse
        else:
            # Verification failed, raise error
            raise ValueError('Strava token verification failed, no match found.')
    # POST request containing webhook subscription update message, new activity or other change to Strava account
    elif request.method == 'POST':
        try:
            # Convert JSON body to dict
            callbackContent = json.loads(request.data, strict=False)
            # Call function to handle update message and process new activity, if applicable
            handleSubUpdate(client, callbackContent)
        except Exception as e:
            application.logger.error(f"Strava subscription update failed with the error {e}")
Now that the subscription has been created and callbacks are handled, update messages can be processed. The following code processes the Strava subscription update messages by inserting them into Postgres then triggers a threaded function for activity processing, if applicable:
def handleSubUpdate(client, updateContent):
    """
    Handles a Strava webhook subscription update. This function is called by a valid Strava POST request to
    the webhook subscription callback URL.

    Parameters
    ----------
    client. Stravalib model client object. Contains access token to Strava API for the user.
    updateContent. Dict. POST request JSON data formatted by Flask as a dict.

    Returns
    -------
    Nothing. Data are inserted into Postgres/PostGIS.
    """
    # Parse update information into a model using stravalib
    update = client.handle_subscription_update(updateContent)
    # Verify that the athlete(s) and subscription ID contained in the message are in Postgres
    if DBQueriesStrava.checkAthleteAndSub(update.owner_id, update.subscription_id):
        application.logger.debug("Sub update from Strava appears valid")
        # Insert subscription update message details into Postgres
        DBQueriesStrava.insertSubUpdate(update)
        # Verify that the update is an activity creation event
        if update.aspect_type == "create" and update.object_type == "activity":
            application.logger.debug("This is an activity create event, creating thread to process activity")
            try:
                # Create a thread to handle async processing of the activity and its derivatives.
                # Threading lets the long-running processing continue while a quick 200 code is sent
                # to the Strava API
                Thread(target=APIFunctionsStrava.singleActivityProcessing,
                       args=(client, update.object_id)).start()
            except Exception as e:
                application.logger.error(f"Creating a thread to process new activity failed with the error: {e}")
        elif update.aspect_type == "update" and update.object_type == "activity":
            application.logger.debug("This is an activity update event, updating existing record")
            # Update existing activity title
            DBQueriesStrava.updateExistingActivity(update)
        else:
            # Write logic to handle delete events
            application.logger.debug("Sub update message contains a delete event, skipping request")
    else:
        application.logger.debug("POST request is invalid, user ID or subscription ID don't match those in database!")
Insert subscription update details into Postgres:
def insertSubUpdate(content):
    """
    Inserts Strava webhook subscription data into the Postgres database. This information will be used to
    get full activity information from another query.

    Parameters
    ----------
    content. Subscription update object of Strava webhook update generated by stravalib

    Returns
    -------
    Nothing. Updates database.
    """
    # Verify if the activity title is in the update data, if not set it to None. Some activities may have
    # empty titles.
    if "title" in content.updates.keys():
        title = content.updates['title']
        application.logger.debug(f"Title of new activity is {title}")
    else:
        title = None
    session = Session()
    insert = sub_update(aspect=content.aspect_type,
                        event_time=datetime.fromtimestamp(content.event_time.timestamp()),
                        object_id=content.object_id,
                        object_type=content.object_type,
                        owner_id=content.owner_id,
                        subscription_id=content.subscription_id,
                        update_title=title)
    session.add(insert)
    session.commit()
    session.close()
    application.logger.debug(f"New webhook update has been added to Postgres!")
The Strava API requires a success response within 2 seconds, or else it will attempt 2 more requests before timing out. Since my process currently exceeds this time allowance, I needed a way to process activities asynchronously. I did not want to spend the time setting up background processing and task queuing; instead I went with a multithreaded approach, which allows the data processing to occur concurrently. While not truly asynchronous, this enables Flask to return a 200 success code while still working on the threaded process:
def singleActivityProcessing(client, actID):
    """
    Processes a single Strava activity by placing the full activity in the database, making a simplified
    and masked public version, and creating a privacy masked stream CSV which is added to an S3 bucket.
    Finally a TopoJSON of the public activities is generated and uploaded to the S3 bucket.

    @param client: stravalib client instance with valid access token
    @param actID: Int. ID of Strava activity to be processed
    @return: Email. Message states if process was successful or failed
    """
    try:
        application.logger.debug("Getting full activity details")
        # Get all activity details for newly created activity, including stream data
        activity = getFullDetails(client, actID)
        application.logger.debug("Inserting activity details")
        # Insert original, non-masked, coordinates and attribute details into Postgres/PostGIS
        DBQueriesStrava.insertOriginalAct(activity['act'])
        # Calculate masked, publicly sharable, activities and insert into Postgres masked table
        application.logger.debug("Processing and inserting masked geometries")
        DBQueriesStrava.processActivitiesPublic(activity["act"]["actId"])
        # Handle CSV stream processing
        generateAndUploadCSVStream(client, actID, activity)
        # Create topoJSON file
        topoJSON = DBQueriesStrava.createStravaPublicActTopoJSON()
        # Upload topoJSON to AWS S3
        StravaAWSS3.uploadToS3(topoJSON)
        application.logger.debug("Strava activity has been processed!")
    except Exception as e:
        application.logger.error(f"Handling and inserting new webhook activity inside a thread failed with the error {e}")
        # Re-raise the exception, this will signal the route function to return an error 500
        raise
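Stripped of the Strava-specific steps, the thread-then-return pattern reduces to a few lines of standard-library Python: the handler starts a thread and returns immediately, while the slow work finishes in the background (a generic sketch, not the project's actual route):

```python
import threading
import time

results = []

def slow_processing(act_id: int) -> None:
    """Stand-in for fetching an activity and running the PostGIS/S3 pipeline."""
    time.sleep(0.1)          # simulate slow API calls and database work
    results.append(act_id)

def handle_webhook(act_id: int) -> int:
    """Kick off processing and return a 200-style response right away."""
    t = threading.Thread(target=slow_processing, args=(act_id,), daemon=True)
    t.start()
    return 200               # returned well inside Strava's 2-second window

status = handle_webhook(42)
print(status)  # 200 is printed before slow_processing finishes
```

The trade-off versus a real task queue is that work in a daemon thread is lost if the process dies mid-task; for a single-athlete personal dashboard that risk is acceptable.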
Final Thoughts
A process flow is now set up to automatically process new Strava activities, which are consumed by a public-facing dashboard using Leaflet to display geographic data. The most recent version of my dashboard is visible at the top of this page, and a full page dashboard is available here. I haven't had the motivation to finish the write-up for the client-side HTML/JavaScript for this project, however the GitHub repo project folder can be found here.
This was a fun, challenging, and rewarding project to work on. I was able to get my first experience working with GeoAlchemy and PostGIS functions to manipulate spatial data. I also learned, through much trial and error, that spatial datasets need to be aggregated for some PostGIS functions to return desired results.
Source: https://leavittmapping.com/projectpages/stravamapserverside