Incremental Updates of Polygon Events
Goal: retrieve only the events that have changed (added, updated, or deleted) since the last run, without downloading the entire dataset each time.
Context: The AXSMarine API exposes two GraphQL endpoints:
- polygonEvents: the active events.
- deletedPolygonEvents: the events that have been removed.
1. Prerequisites
| # | Requirement | Notes |
|---|---|---|
| 1️⃣ | Access Token | Obtain your bearer token from AXSMarine and replace the placeholder TOKEN in the script (or load it from the environment, as sketched below). |
| 2️⃣ | Python 3.9+ | The script uses requests and pandas plus the standard library. |
| 3️⃣ | Persistence | A local CSV cache is used; you can swap it for a database or cloud storage if desired. |
| 4️⃣ | Timestamp handling | The last run is derived from the cache file's modification time, minus 24 h so updates on the boundary day are not missed. |
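For the token row, a minimal sketch of loading the credential from an environment variable instead of hard-coding it (the variable name AXS_TOKEN is an illustrative choice, not something the API prescribes):

import os

# Read the bearer token from the environment; AXS_TOKEN is an arbitrary example name.
TOKEN = os.environ.get("AXS_TOKEN")
if not TOKEN:
    raise RuntimeError("Set the AXS_TOKEN environment variable before running the script.")

HEADERS = {"Authorization": f"Bearer {TOKEN}"}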
2. API Architecture
# 1. polygonEvents
type Query {
  polygonEvents(
    first: Int,
    last: Int,
    before: String,
    after: String,
    isOpen: Boolean,
    polygonIds: [Int],
    polygonTypes: [String],
    entryDate: RangeDate,
    outDate: RangeDate,
    entryDraft: RangeFloat,
    entryHeading: RangeInt,
    entrySpeed: RangeFloat,
    outDraft: RangeFloat,
    outHeading: RangeInt,
    outSpeed: RangeFloat,
    duration: RangeInt,
    lastUpdated: RangeDate,
    vesselBuilt: RangeDate,
    vesselBeam: RangeFloat,
    vesselLoa: RangeFloat,
    vesselDraft: RangeFloat,
    vesselDwt: RangeInt,
    vesselTeu: RangeInt,
    vesselCubic: RangeInt,
    vesselIds: [Int],
    vesselImos: [Int],
    vesselSegments: [String],
    vesselTypes: [String],
    vesselSubtypes: [String],
    vesselLinerServiceIds: [Int],
    vesselLinerRegionIds: [Int]
  ): PolygonEventCursorConnection
}

# 2. deletedPolygonEvents
type Query {
  deletedPolygonEvents(
    first: Int,
    last: Int,
    before: String,
    after: String,
    polygonIds: [Int],
    polygonTypes: [String],
    deletedAt: RangeDate,
    vesselIds: [Int],
    vesselImos: [Int],
    vesselSegments: [String],
    vesselTypes: [String],
    vesselSubtypes: [String]
  ): deletedPolygonEventCursorConnection
}

Pagination: each response returns a pageInfo (endCursor, startCursor) and a list of edges. Use after to iterate over the entire set.
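As a rough sketch of that cursor loop in isolation (assuming a post_graphql callable such as the gql_query helper shown in section 4, and using polygonEvents as the example field), pagination boils down to passing the previous page's endCursor as after until a short page is returned:

def iterate_pages(post_graphql, query, variables, field, page_size):
    """Yield the edges of every page of a cursor-based connection."""
    cursor = None
    while True:
        variables = {**variables, "afterCursor": cursor}
        payload = post_graphql(query, variables)
        connection = payload[field]        # e.g. "polygonEvents"
        edges = connection["edges"]
        yield from edges
        if len(edges) < page_size:         # last (short) page reached
            break
        cursor = connection["pageInfo"]["endCursor"]

# Example usage (with the gql_query helper from section 4):
# for edge in iterate_pages(gql_query, query, variables, "polygonEvents", 5000):
#     process(edge["node"])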
3. Workflow Overview
- Load the local cache (or initialize an empty dataframe).
- Determine the update window:
  - If a file exists, the last run is its modification time minus 24 h.
  - Otherwise, this is the first run, and we'll request all events.
- Query polygonEvents with the filters polygonTypes, entryDate, and lastUpdated.
- Loop over pages until the number of records is < pageSize.
- Merge the new data into the dataframe and deduplicate on _id.
- Query deletedPolygonEvents for the same window (deletedAt).
- Remove rows whose _id appears in the deletion list.
- Save the final DataFrame to a CSV file.
4. Complete Python Example
Tip: Store your token in a .env file or use environment variables; the script uses it in plain text for demonstration.
#!/usr/bin/env python3
import os
import logging
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional

# ------------------------------------------------------------------
# 1️⃣ Configuration
# ------------------------------------------------------------------
TOKEN = "INSERT MY TOKEN"  # ← Replace with your token
HEADERS = {"Authorization": f"Bearer {TOKEN}"}
BASE_URL = "https://apihub.axsmarine.com/global/events/v1"

# Example: only retrieve "shipyard" events.
POLYGON_TYPES = ["shipyard"]
ENTRY_START_DATE = "2025-01-01"  # ISO-8601
CSV_FILE = "2025_shipyard_events.csv"

# ------------------------------------------------------------------
# 2️⃣ Load cache (if present)
# ------------------------------------------------------------------
last_execution_time: Optional[str] = None
data = pd.DataFrame()

if os.path.exists(CSV_FILE):
    # Ensure we don't miss events updated the same day.
    last_exec_ts = datetime.fromtimestamp(os.path.getmtime(CSV_FILE)) - timedelta(days=1)
    last_execution_time = last_exec_ts.strftime("%Y-%m-%d")
    data = pd.read_csv(CSV_FILE)
# ------------------------------------------------------------------
# 3️⃣ Generic GraphQL query helper
# ------------------------------------------------------------------
def gql_query(query: str, variables: dict) -> dict:
    """Execute a GraphQL request and return the JSON payload."""
    logging.debug(f"Request variables: {variables}")
    resp = requests.post(BASE_URL, json={"query": query, "variables": variables}, headers=HEADERS)
    resp.raise_for_status()
    return resp.json()["data"]
# ------------------------------------------------------------------
# 4️⃣ Retrieve active polygon events
# ------------------------------------------------------------------
def fetch_polygon_events() -> pd.DataFrame:
    """Fetch active events, paginating until exhaustion."""
    page_size = 5000
    variables = {
        "pageSize": page_size,
        "polygonTypes": POLYGON_TYPES,
        "entryDate": {"from": ENTRY_START_DATE},
        "lastUpdated": {"from": last_execution_time} if last_execution_time else None,
    }
    query = """
    query polygonEvents(
        $pageSize: Int,
        $afterCursor: String,
        $polygonTypes: [String],
        $entryDate: RangeDate,
        $lastUpdated: RangeDate
    ) {
        polygonEvents(
            first: $pageSize,
            after: $afterCursor,
            polygonTypes: $polygonTypes,
            entryDate: $entryDate,
            lastUpdated: $lastUpdated
        ) {
            pageInfo { endCursor }
            edges {
                node {
                    _id
                    polygon { _id, name }
                    vessel { _id, imo, name, type }
                    entryAis { time }
                    outAis { time }
                    lastUpdated
                }
            }
        }
    }
    """

    all_rows = []
    cursor = None
    while True:
        variables["afterCursor"] = cursor
        payload = gql_query(query, variables)
        edges = payload["polygonEvents"]["edges"]
        rows = pd.json_normalize([e["node"] for e in edges])
        all_rows.append(rows)
        if len(edges) < page_size:
            break
        cursor = payload["polygonEvents"]["pageInfo"]["endCursor"]

    if all_rows:
        df = pd.concat(all_rows, ignore_index=True)
        return df
    return pd.DataFrame()
# ------------------------------------------------------------------
# 5️⃣ Remove deleted events
# ------------------------------------------------------------------
def purge_deleted_events(df: pd.DataFrame) -> pd.DataFrame:
    """Delete rows that match events reported as deleted."""
    if last_execution_time is None:
        # First run: nothing to purge
        return df

    page_size = 1000
    variables = {
        "pageSize": page_size,
        "polygonTypes": POLYGON_TYPES,
        "deletedAt": {"from": last_execution_time},
    }
    query = """
    query deletedPolygonEvents(
        $pageSize: Int,
        $afterCursor: String,
        $polygonTypes: [String],
        $deletedAt: RangeDate
    ) {
        deletedPolygonEvents(
            first: $pageSize,
            after: $afterCursor,
            polygonTypes: $polygonTypes,
            deletedAt: $deletedAt
        ) {
            pageInfo { endCursor }
            edges {
                node { _id }
            }
        }
    }
    """

    cursor = None
    deleted_ids = set()
    while True:
        variables["afterCursor"] = cursor
        payload = gql_query(query, variables)
        edges = payload["deletedPolygonEvents"]["edges"]
        deleted_ids.update(e["node"]["_id"] for e in edges)
        if len(edges) < page_size:
            break
        cursor = payload["deletedPolygonEvents"]["pageInfo"]["endCursor"]

    if deleted_ids:
        df = df[~df["_id"].isin(deleted_ids)].reset_index(drop=True)
    return df
# ------------------------------------------------------------------
# 6️⃣ Main integration
# ------------------------------------------------------------------
def main(data):
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    # 6.1 Fetch new/updated events
    new_events = fetch_polygon_events()
    logging.info(f"Fetched {len(new_events)} new/updated events")

    # 6.2 Merge with existing cache
    if not data.empty:
        data = pd.concat([data, new_events], ignore_index=True)
        data = data.drop_duplicates(subset=["_id"], keep="last")
    else:
        data = new_events

    # 6.3 Remove events that were deleted
    data = purge_deleted_events(data)
    logging.info(f"Dataset after purge: {len(data)} records")

    # 6.4 Persist
    data.to_csv(CSV_FILE, index=False)
    logging.info(f"Persisted {CSV_FILE}")


if __name__ == "__main__":
    main(data)

Implementation notes
- pandas.json_normalize flattens the nested GraphQL structure into a tidy dataframe (see the sketch below).
- The lastUpdated filter is inclusive; subtracting 24 h from the file's mtime guarantees that we don't miss updates on the boundary day.
- pageSize can be tuned to match network capacity and available memory.
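To make the first note concrete, here is a small self-contained sketch (the field values are invented, not real API output) showing how json_normalize turns a nested node into dotted column names such as polygon._id and vessel.name:

import pandas as pd

# Hypothetical node payload, shaped like the GraphQL "node" objects above.
sample_nodes = [
    {
        "_id": 1,
        "polygon": {"_id": 42, "name": "Example Shipyard"},
        "vessel": {"_id": 7, "imo": 9999999, "name": "EXAMPLE VESSEL", "type": "Bulker"},
        "entryAis": {"time": "2025-01-02T03:04:05Z"},
        "outAis": {"time": None},
        "lastUpdated": "2025-01-03",
    }
]

df = pd.json_normalize(sample_nodes)
print(sorted(df.columns))
# ['_id', 'entryAis.time', 'lastUpdated', 'outAis.time', 'polygon._id',
#  'polygon.name', 'vessel._id', 'vessel.imo', 'vessel.name', 'vessel.type']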
5. Best Practices
| Practice | Recommendation |
|---|---|
| Secure token | Store it in a .env file or an environment variable rather than in the source code. |
| Limit pageSize | Prevent server/client time-outs. |
| Logging | Enable debugging output (logging.DEBUG) when troubleshooting. |
| Error handling | Catch HTTP errors (raise_for_status) and handle failures explicitly. |
| Unit tests | Mock API responses to validate merge and purge logic (see the sketch below). |
| Monitoring | Run via cron/Argo workflow and track record counts per run. |
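For the unit-testing row, one possible pytest-style sketch; it assumes the script above has been saved as a module named incremental_events (a hypothetical name) and stubs out gql_query with a canned payload:

from unittest import mock
import pandas as pd
import incremental_events  # hypothetical module name for the script above

def test_purge_deleted_events_removes_reported_ids():
    # Canned GraphQL payload: the API reports _id 1 as deleted.
    fake_payload = {
        "deletedPolygonEvents": {
            "pageInfo": {"endCursor": "abc"},
            "edges": [{"node": {"_id": 1}}],
        }
    }
    df = pd.DataFrame([{"_id": 1}, {"_id": 2}])

    # Replace the network call and pretend a previous run exists.
    with mock.patch.object(incremental_events, "gql_query", return_value=fake_payload), \
         mock.patch.object(incremental_events, "last_execution_time", "2025-01-01"):
        result = incremental_events.purge_deleted_events(df)

    assert result["_id"].tolist() == [2]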
6. Advanced Use-Cases

| Use-case | Approach |
|---|---|
| Sync to SQL | Use pandas' DataFrame.to_sql to load the dataframe into a relational database (see the sketch below). |
| Partitioning | Separate CSVs or tables by year/month. |
| Integrity check | Store a SHA-256 hash of the payload and verify it each time. |
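As an illustration of the "Sync to SQL" idea, a minimal sketch using SQLite; the database and table names are arbitrary examples, not part of the AXSMarine workflow:

import sqlite3
import pandas as pd

# Load the CSV cache produced by the script above.
df = pd.read_csv("2025_shipyard_events.csv")

# Mirror it into a local SQLite table instead of (or alongside) the CSV.
conn = sqlite3.connect("polygon_events.db")
try:
    df.to_sql("shipyard_events", conn, if_exists="replace", index=False)
finally:
    conn.close()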
7. Conclusion
This approach lets you:
- Significantly cut down network traffic (only changed data).
- Keep your local cache fresh (add, update, delete).
- Easily automate the process with a clear, maintainable script.
By following this guide, you can integrate AXSMarine's polygon events into your document-management ecosystem while keeping bandwidth and storage usage minimal.